elixir-otp

OTP design patterns, supervision trees, and process management with KISS principle emphasis

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "elixir-otp" with this command: npx skills add vircung/opencode-config/vircung-opencode-config-elixir-otp

Elixir OTP Skill

Essential OTP Patterns (KISS First)

Use these patterns for 80% of use cases. Start simple, optimize later.

GenServer: Stateful Processes

When to use GenServer:

  • Need to maintain state across function calls
  • State needs to be shared across processes
  • Need to serialize access to a resource
  • Long-running processes with periodic tasks

Basic GenServer Pattern:

defmodule MyApp.Counter do
  use GenServer
  
  # Client API (public interface)
  def start_link(initial_value \\ 0) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end
  
  def get_count do
    GenServer.call(__MODULE__, :get_count)
  end
  
  def increment do
    GenServer.cast(__MODULE__, :increment)
  end
  
  def increment_by(amount) do
    GenServer.call(__MODULE__, {:increment_by, amount})
  end
  
  # Server Callbacks (private implementation)
  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end
  
  @impl true
  def handle_call(:get_count, _from, count) do
    {:reply, count, count}
  end
  
  @impl true
  def handle_call({:increment_by, amount}, _from, count) do
    new_count = count + amount
    {:reply, new_count, new_count}
  end
  
  @impl true
  def handle_cast(:increment, count) do
    {:noreply, count + 1}
  end
end

Common GenServer Patterns:

1. Resource Pool:

defmodule MyApp.ConnectionPool do
  use GenServer
  
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  def checkout do
    GenServer.call(__MODULE__, :checkout)
  end
  
  def checkin(connection) do
    GenServer.cast(__MODULE__, {:checkin, connection})
  end
  
  @impl true
  def init(opts) do
    pool_size = Keyword.get(opts, :pool_size, 10)
    connections = Enum.map(1..pool_size, fn _ -> create_connection() end)
    {:ok, %{available: connections, in_use: []}}
  end
  
  @impl true
  def handle_call(:checkout, from, %{available: [conn | rest]} = state) do
    monitor_ref = Process.monitor(from)
    new_state = %{
      state | 
      available: rest, 
      in_use: [{conn, from, monitor_ref} | state.in_use]
    }
    {:reply, {:ok, conn}, new_state}
  end
  
  def handle_call(:checkout, _from, %{available: []} = state) do
    {:reply, {:error, :no_connections}, state}
  end
end

2. Cache with TTL:

defmodule MyApp.Cache do
  use GenServer
  
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end
  
  def put(key, value, ttl \\ 60_000) do
    GenServer.cast(__MODULE__, {:put, key, value, ttl})
  end
  
  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end
  
  @impl true
  def init(_) do
    # Schedule cleanup every minute
    :timer.send_interval(60_000, :cleanup)
    {:ok, %{}}
  end
  
  @impl true
  def handle_cast({:put, key, value, ttl}, cache) do
    expires_at = System.system_time(:millisecond) + ttl
    {:noreply, Map.put(cache, key, {value, expires_at})}
  end
  
  @impl true
  def handle_call({:get, key}, _from, cache) do
    now = System.system_time(:millisecond)
    case Map.get(cache, key) do
      {value, expires_at} when expires_at > now -> {:reply, {:ok, value}, cache}
      _ -> {:reply, :not_found, cache}
    end
  end
  
  @impl true
  def handle_info(:cleanup, cache) do
    now = System.system_time(:millisecond)
    clean_cache = cache |> Enum.reject(fn {_key, {_value, expires_at}} -> expires_at <= now end) |> Map.new()
    {:noreply, clean_cache}
  end
end

Supervisor: Process Supervision

When to use Supervisor:

  • ALWAYS supervise your processes (GenServers, Tasks)
  • Need fault tolerance and automatic restart
  • Managing a group of related processes

Basic Supervision Strategies:

1. :one_for_one (Most Common - 90% of cases)

defmodule MyApp.Supervisor do
  use Supervisor
  
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end
  
  @impl true
  def init(_opts) do
    children = [
      MyApp.Counter,
      MyApp.Cache,
      {MyApp.Worker, [arg1: "value"]}
    ]
    
    # If one child crashes, restart only that child
    Supervisor.init(children, strategy: :one_for_one)
  end
end

2. :rest_for_one (When Order Matters)

defmodule MyApp.DatabaseSupervisor do
  use Supervisor
  
  @impl true
  def init(_opts) do
    children = [
      MyApp.Repo,           # Database connection
      MyApp.Cache,          # Depends on DB
      MyApp.DataProcessor   # Depends on both
    ]
    
    # If DataProcessor crashes, restart it only
    # If Cache crashes, restart Cache AND DataProcessor
    # If Repo crashes, restart ALL children
    Supervisor.init(children, strategy: :rest_for_one)
  end
end

3. :one_for_all (All or Nothing)

defmodule MyApp.ClusterSupervisor do
  use Supervisor
  
  @impl true  
  def init(_opts) do
    children = [
      MyApp.ClusterNode1,
      MyApp.ClusterNode2,
      MyApp.ClusterCoordinator
    ]
    
    # If ANY child crashes, restart ALL children
    # Use when children are tightly coupled
    Supervisor.init(children, strategy: :one_for_all)
  end
end

Task: One-off Async Work

When to use Task:

  • One-off async operations
  • Fire-and-forget work
  • Parallel processing without state

Basic Task Patterns:

1. Simple Async/Await:

# Fire and forget
Task.start(fn -> 
  send_email(user.email, "Welcome!")
end)

# Async with result
task = Task.async(fn ->
  expensive_calculation(data)
end)

result = Task.await(task, 10_000)  # 10 second timeout

2. Supervised Tasks:

defmodule MyApp.TaskSupervisor do
  use Task.Supervisor
  
  def start_link(_opts) do
    Task.Supervisor.start_link(name: __MODULE__)
  end
end

# In your application supervisor
children = [
  MyApp.TaskSupervisor,
  # other children...
]

# Use supervised tasks
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  process_large_file(file_path)
end)

3. Parallel Processing:

defmodule MyApp.BatchProcessor do
  def process_batch(items) do
    items
    |> Task.async_stream(&process_item/1, max_concurrency: 10, timeout: 30_000)
    |> Enum.to_list()
  end
  
  defp process_item(item) do
    # Process individual item
    {:ok, processed_item}
  end
end

Agent: Simple Shared State

When to use Agent:

  • Simple shared state (get/update operations)
  • No complex logic or async operations
  • No need for custom message handling

Agent vs GenServer Decision:

  • Use Agent: Simple key-value store, counters, flags
  • Use GenServer: Complex state logic, async operations, custom messages
defmodule MyApp.Settings do
  use Agent
  
  def start_link(initial_settings \\ %{}) do
    Agent.start_link(fn -> initial_settings end, name: __MODULE__)
  end
  
  def get(key) do
    Agent.get(__MODULE__, &Map.get(&1, key))
  end
  
  def put(key, value) do
    Agent.update(__MODULE__, &Map.put(&1, key, value))
  end
  
  def get_all do
    Agent.get(__MODULE__, & &1)
  end
end

Process Communication

Message Passing Patterns

1. Direct Send/Receive:

defmodule MyApp.DirectMessaging do
  def start_receiver do
    spawn(fn -> message_loop() end)
  end
  
  defp message_loop do
    receive do
      {:hello, sender_pid} -> 
        send(sender_pid, {:reply, "Hello back!"})
        message_loop()
      :stop -> 
        :ok
      _ -> 
        message_loop()
    end
  end
  
  def send_message(receiver_pid, message) do
    send(receiver_pid, message)
    
    receive do
      {:reply, response} -> response
    after
      5000 -> :timeout
    end
  end
end

2. GenServer Calls (Preferred):

# Synchronous communication
result = GenServer.call(MyWorker, {:process, data})

# Asynchronous communication  
GenServer.cast(MyWorker, {:update_state, new_data})

Process Monitoring and Linking

1. Process Monitoring:

defmodule MyApp.ProcessMonitor do
  use GenServer
  
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end
  
  def monitor_process(pid) do
    GenServer.cast(__MODULE__, {:monitor, pid})
  end
  
  @impl true
  def handle_cast({:monitor, pid}, monitored) do
    ref = Process.monitor(pid)
    {:noreply, Map.put(monitored, ref, pid)}
  end
  
  @impl true
  def handle_info({:DOWN, ref, :process, pid, reason}, monitored) do
    IO.puts("Process #{inspect(pid)} died with reason: #{inspect(reason)}")
    {:noreply, Map.delete(monitored, ref)}
  end
end

2. Process Linking (Use with Caution):

# Bidirectional link - if one dies, both die
Process.link(other_pid)

# Better: Use supervision for managed restarts

Advanced OTP Patterns (Use When Needed)

These patterns solve specific scaling problems. Don't use unless you have measured the need.

DynamicSupervisor: Runtime Child Spawning

When you outgrow static supervision:

  • Need to spawn processes at runtime
  • Don't know number of processes at startup
  • Managing user sessions, connections
defmodule MyApp.SessionSupervisor do
  use DynamicSupervisor
  
  def start_link(_opts) do
    DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
  end
  
  @impl true
  def init(_opts) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
  
  def start_session(user_id) do
    spec = {MyApp.UserSession, user_id}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end
  
  def stop_session(user_id) do
    case Registry.lookup(MyApp.SessionRegistry, user_id) do
      [{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
      [] -> :ok
    end
  end
end

PartitionSupervisor: Reducing GenServer Bottlenecks

When single GenServer becomes bottleneck:

  • High-frequency operations
  • Contention on single process
  • Need to distribute load
defmodule MyApp.CounterSupervisor do
  use PartitionSupervisor
  
  def start_link(_opts) do
    PartitionSupervisor.start_link(__MODULE__, [], name: __MODULE__)
  end
  
  @impl true
  def init(_opts) do
    PartitionSupervisor.init(MyApp.Counter, strategy: :one_for_one)
  end
  
  def increment(key) do
    PartitionSupervisor.get_child_pid(__MODULE__, key)
    |> MyApp.Counter.increment()
  end
end

Registry: Process Discovery

When you have many named processes:

  • Dynamic process naming
  • Process lookups by custom keys
  • Avoiding atom exhaustion
defmodule MyApp.GameRegistry do
  def start_link do
    Registry.start_link(keys: :unique, name: __MODULE__)
  end
  
  def register_game(game_id, pid) do
    Registry.register(__MODULE__, game_id, pid)
  end
  
  def find_game(game_id) do
    case Registry.lookup(__MODULE__, game_id) do
      [{pid, _}] -> {:ok, pid}
      [] -> {:error, :not_found}
    end
  end
  
  def list_games do
    Registry.select(__MODULE__, [{{:"$1", :"$2", :"$3"}, [], [{{:"$1", :"$2"}}]}])
  end
end

OTP Design Principles (KISS Emphasis)

When to Use What (KISS Guide)

Start Simple (80% of cases):

Need state? → GenServer
Need supervision? → Supervisor (one_for_one)
Need one-off async? → Task
Need simple shared state? → Agent

Advance When Needed (specific problems only):

Spawning processes at runtime? → DynamicSupervisor
GenServer bottleneck (proven via profiling)? → PartitionSupervisor  
Many named processes? → Registry
Need supervised async tasks? → Task.Supervisor

Red Flags (Premature Optimization)

Don't do these without measuring:

  • Using PartitionSupervisor "just in case"
  • Registry when you have < 100 processes
  • Complex supervision trees without proven need
  • DynamicSupervisor when static children work fine

Decision Framework

1. Start with simplest pattern that works

# Start here
defmodule MyApp.SimpleCounter do
  use GenServer
  # Basic implementation
end

# Not here
defmodule MyApp.DistributedPartitionedCounter do
  use PartitionSupervisor
  # Complex implementation you don't need yet
end

2. Profile to find actual bottlenecks

# Use Observer to find real problems
:observer.start()

# Use telemetry for metrics
:telemetry.execute([:my_app, :counter, :increment], %{value: 1})

3. Optimize specific problem with specific solution

# If profiling shows GenServer is bottleneck:
# THEN consider PartitionSupervisor

# If you're running out of atoms for process names:
# THEN consider Registry

# If you need runtime process spawning:
# THEN consider DynamicSupervisor

4. Measure improvement, iterate

OTP Testing Patterns

Testing GenServers

defmodule MyApp.CounterTest do
  use ExUnit.Case
  
  setup do
    {:ok, pid} = MyApp.Counter.start_link(0)
    %{counter: pid}
  end
  
  test "increments count", %{counter: pid} do
    assert MyApp.Counter.get_count(pid) == 0
    MyApp.Counter.increment(pid)
    assert MyApp.Counter.get_count(pid) == 1
  end
  
  test "handles concurrent increments" do
    {:ok, pid} = MyApp.Counter.start_link(0)
    
    tasks = for _ <- 1..100 do
      Task.async(fn -> MyApp.Counter.increment(pid) end)
    end
    
    Enum.each(tasks, &Task.await/1)
    assert MyApp.Counter.get_count(pid) == 100
  end
end

Testing Supervision Trees

defmodule MyApp.SupervisorTest do
  use ExUnit.Case
  
  test "restarts failed children" do
    {:ok, supervisor_pid} = MyApp.Supervisor.start_link([])
    
    # Find counter process
    [{counter_pid, _}] = Registry.lookup(MyApp.Registry, MyApp.Counter)
    
    # Kill the process
    Process.exit(counter_pid, :kill)
    
    # Wait for restart
    :timer.sleep(100)
    
    # Verify new process is running
    [{new_counter_pid, _}] = Registry.lookup(MyApp.Registry, MyApp.Counter)
    assert new_counter_pid != counter_pid
    assert Process.alive?(new_counter_pid)
  end
end

Cross-References

System Architecture

For high-level system design, context boundaries, and distributed architecture patterns, see the elixir-architecture skill.

Phoenix Integration

For integrating OTP patterns with Phoenix contexts, LiveView, and PubSub systems, see the elixir-phoenix-framework skill.

Performance Review

For profiling OTP applications, identifying process bottlenecks, and optimization strategies, see the elixir-review skill.

Summary

Remember KISS:

  1. Start simple: GenServer + Supervisor covers most needs
  2. Profile before optimizing: Measure actual problems, not theoretical ones
  3. Advance when needed: Use complex patterns to solve specific measured problems
  4. Test concurrent behavior: OTP makes concurrency easier but testing is still important

The BEAM VM and OTP give you powerful tools for building fault-tolerant, concurrent systems. Use them wisely by starting simple and evolving complexity only when requirements demand it.

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

python-architecture

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

elixir-ecto

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

elixir-development

No summary provided by upstream source.

Repository SourceNeeds Review