
Elixir OTP and Concurrency

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.


Install skill "elixir-otp-concurrency" with this command: npx skills add vinnie357/claude-skills/vinnie357-claude-skills-elixir-otp-concurrency


This skill activates when working with OTP behaviors, building concurrent systems, managing processes, or implementing fault-tolerant architectures in Elixir.

When to Use This Skill

Activate when:

  • Implementing GenServer, GenStage, Supervisor, or other OTP behaviors

  • Designing supervision trees and fault-tolerance strategies

  • Working with Tasks, Agents, or process management

  • Building concurrent or distributed systems

  • Managing application state

  • Troubleshooting process-related issues

OTP Behaviors

GenServer - Generic Server

Use GenServer for stateful processes:

defmodule MyApp.Counter do
  use GenServer

  # Client API

  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  def increment do
    GenServer.call(__MODULE__, :increment)
  end

  def get_value do
    GenServer.call(__MODULE__, :get)
  end

  # Server callbacks

  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end

  @impl true
  def handle_call(:increment, _from, state) do
    {:reply, state + 1, state + 1}
  end

  @impl true
  def handle_call(:get, _from, state) do
    {:reply, state, state}
  end
end

GenServer Best Practices

  • Use call for synchronous requests that need a response

  • Use cast for asynchronous fire-and-forget messages

  • Use handle_info for receiving regular messages

  • Keep server callbacks fast - delegate heavy work to Tasks

  • Name processes with via tuples or Registry for dynamic naming

  • Implement timeouts to prevent client processes from hanging
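To make the cast/call and timeout advice concrete, here is a minimal sketch (the module `MyApp.EventLog` and its functions are hypothetical, not part of this skill's codebase):

```elixir
# Hypothetical sketch: cast for fire-and-forget, call with an
# explicit client-side timeout (the default is 5_000 ms).
defmodule MyApp.EventLog do
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Fire-and-forget: the caller does not wait for a reply
  def record(event), do: GenServer.cast(__MODULE__, {:record, event})

  # Synchronous, but give up after 1 second instead of the default 5
  def recent, do: GenServer.call(__MODULE__, :recent, 1_000)

  @impl true
  def init(events), do: {:ok, events}

  @impl true
  def handle_cast({:record, event}, events), do: {:noreply, [event | events]}

  @impl true
  def handle_call(:recent, _from, events), do: {:reply, Enum.take(events, 10), events}
end
```

Because messages from one sender are delivered in order, a `call` issued after a `cast` from the same process is guaranteed to observe the cast's effect.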

GenServer Patterns

Background Work:

def init(state) do
  schedule_work()
  {:ok, state}
end

def handle_info(:work, state) do
  do_work(state)
  schedule_work()
  {:noreply, state}
end

defp schedule_work do
  Process.send_after(self(), :work, 5_000)
end

Idle Timeouts:

def handle_call(:get, _from, state) do
  # Stop the server if no message arrives within 30 seconds
  {:reply, state, state, 30_000}
end

# :timeout is delivered only if no other message arrived in time
def handle_info(:timeout, state) do
  {:stop, :normal, state}
end

(Note: `:state_timeout` and `handle_state_timeout` belong to `gen_statem`, not GenServer. GenServer expresses idle timeouts as a fourth tuple element and delivers them as a `:timeout` message to `handle_info/2`.)

Supervisor - Process Supervision

Build supervision trees for fault tolerance:

defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Database connection pool
      {MyApp.Repo, []},

      # PubSub system
      {Phoenix.PubSub, name: MyApp.PubSub},

      # Custom supervisor
      {MyApp.WorkerSupervisor, []},

      # Individual workers
      {MyApp.Cache, []},
      {MyApp.RateLimiter, []},

      # Web endpoint
      MyAppWeb.Endpoint
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Supervision Strategies

:one_for_one - If a child dies, only that child is restarted

Supervisor.start_link(children, strategy: :one_for_one)

:one_for_all - If any child dies, all children are terminated and restarted

Supervisor.start_link(children, strategy: :one_for_all)

:rest_for_one - If a child dies, it and all children started after it are restarted

Supervisor.start_link(children, strategy: :rest_for_one)

Dynamic Supervisors

For dynamically creating processes:

defmodule MyApp.WorkerSupervisor do
  use DynamicSupervisor

  def start_link(init_arg) do
    DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  def start_worker(args) do
    spec = {MyApp.Worker, args}
    DynamicSupervisor.start_child(__MODULE__, spec)
  end

  @impl true
  def init(_init_arg) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end
end

Restart Strategies

Configure child restart behavior:

children = [
  # Always restart (default)
  MyApp.CriticalWorker,

  # Never restart
  Supervisor.child_spec(MyApp.OneTimeTask, restart: :temporary),

  # Only restart on abnormal exit
  Supervisor.child_spec(MyApp.OptionalWorker, restart: :transient)
]

Note that `restart:` is a child-spec option, not a start argument, so it must be set via `Supervisor.child_spec/2` (or via `use GenServer, restart: :temporary` in the child module).

Task - Concurrent Work

Fire-and-forget Tasks

For concurrent work without needing results:

Task.start(fn -> send_email(user, "Welcome!") end)

Awaited Tasks

For concurrent work with results:

task = Task.async(fn -> expensive_computation() end)

# Do other work...

result = Task.await(task, 5000)  # 5-second timeout

Supervised Tasks

For long-running tasks under supervision:

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.TaskSupervisor}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

# Use the supervised task
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
  long_running_operation()
end)

Concurrent Map

Process collections concurrently:

# Sequential
results = Enum.map(urls, &fetch_url/1)

# Concurrent
results =
  urls
  |> Task.async_stream(&fetch_url/1, max_concurrency: 10)
  |> Enum.to_list()

Agent - Simple State Management

Use Agent for simple state:

{:ok, agent} = Agent.start_link(fn -> %{} end, name: MyApp.Cache)

# Get state
value = Agent.get(MyApp.Cache, fn state -> Map.get(state, :key) end)

# Update state
Agent.update(MyApp.Cache, fn state -> Map.put(state, :key, value) end)

# Get and update atomically
Agent.get_and_update(MyApp.Cache, fn state ->
  {Map.get(state, :key), Map.delete(state, :key)}
end)

When to use Agent vs GenServer:

  • Use Agent for simple key-value state

  • Use GenServer when you need complex logic, callbacks, or process lifecycle management

Process Communication

send/receive

Basic message passing:

# Send a message
send(pid, {:hello, "world"})

# Receive a message
receive do
  {:hello, msg} -> IO.puts(msg)
after
  5000 -> IO.puts("Timeout")
end

Process Registration

Register processes by name:

# Local registration
Process.register(self(), :my_process)
send(:my_process, :hello)

# Via Registry
{:ok, _} = Registry.start_link(keys: :unique, name: MyApp.Registry)

{:ok, pid} =
  GenServer.start_link(MyWorker, nil,
    name: {:via, Registry, {MyApp.Registry, "worker_1"}}
  )

# Look up a process
[{pid, _}] = Registry.lookup(MyApp.Registry, "worker_1")

Process Links and Monitors

Links - Bidirectional, propagate exits:

# Link an existing process
Process.link(pid)

# Spawn linked
spawn_link(fn -> do_work() end)

Monitors - Unidirectional, receive DOWN messages:

ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} ->
    IO.puts("Process died: #{inspect(reason)}")
end

Concurrency Patterns

Pipeline Pattern

Chain operations with concurrency:

defmodule Pipeline do
  # Dependent steps cannot overlap, but independent steps over the
  # same input can run concurrently and be awaited together.
  def process(data) do
    [&step1/1, &step2/1, &step3/1]
    |> Enum.map(fn fun -> async(data, fun) end)
    |> await_all()
  end

  defp async(input, fun) do
    Task.async(fn -> fun.(input) end)
  end

  defp await_all(tasks) when is_list(tasks) do
    Enum.map(tasks, &Task.await/1)
  end
end

Worker Pool

Implement a worker pool:

defmodule MyApp.WorkerPool do
  use GenServer

  def start_link(opts) do
    pool_size = Keyword.get(opts, :size, 10)
    GenServer.start_link(__MODULE__, pool_size, name: __MODULE__)
  end

  # Check out a supervisor round-robin, then start the task from the
  # calling process so the caller owns it and can Task.await/1 it.
  def execute(fun) do
    supervisor = GenServer.call(__MODULE__, :checkout)
    Task.Supervisor.async_nolink(supervisor, fun)
  end

  @impl true
  def init(pool_size) do
    supervisors =
      for _ <- 1..pool_size do
        {:ok, pid} = Task.Supervisor.start_link()
        pid
      end

    {:ok, %{supervisors: supervisors, index: 0}}
  end

  @impl true
  def handle_call(:checkout, _from, state) do
    supervisor = Enum.at(state.supervisors, state.index)
    new_index = rem(state.index + 1, length(state.supervisors))
    {:reply, supervisor, %{state | index: new_index}}
  end
end

Backpressure with GenStage

For producer-consumer pipelines:

defmodule Producer do
  use GenStage

  def start_link(initial) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(initial) do
    {:producer, initial}
  end

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end
end

defmodule Consumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    # Subscribe to the producer so demand flows upstream
    {:consumer, :ok, subscribe_to: [Producer]}
  end

  def handle_events(events, _from, state) do
    Enum.each(events, &process_event/1)
    {:noreply, [], state}
  end
end

ETS - Erlang Term Storage

In-memory key-value storage:

# Create a table
:ets.new(:my_table, [:named_table, :public, read_concurrency: true])

# Insert
:ets.insert(:my_table, {:key, "value"})

# Lookup
[{:key, value}] = :ets.lookup(:my_table, :key)

# Delete
:ets.delete(:my_table, :key)

# Match patterns
:ets.match(:my_table, {:"$1", "value"})

# Iterate
:ets.foldl(fn {k, v}, acc -> [{k, v} | acc] end, [], :my_table)

ETS Best Practices

  • Use read_concurrency: true for read-heavy workloads

  • Use write_concurrency: true for write-heavy workloads

  • Prefer :set (default) for unique keys

  • Use :bag or :duplicate_bag for multiple values per key

  • Always own ETS tables in a GenServer or Supervisor to prevent data loss
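The last point above can be sketched as follows (the module `MyApp.CacheOwner` and table name `:my_cache` are illustrative assumptions):

```elixir
# Hypothetical sketch: a GenServer owns the table, so the table lives
# (and is re-created under supervision) together with this process.
defmodule MyApp.CacheOwner do
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  end

  @impl true
  def init(nil) do
    # A :public table lets other processes read and write directly,
    # while ownership (and table lifetime) stays with this server.
    table = :ets.new(:my_cache, [:named_table, :public, read_concurrency: true])
    {:ok, table}
  end
end
```

If the owner crashes, the table is deleted and recreated on restart, so callers should treat the cache as disposable.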

Error Handling and Fault Tolerance

Let It Crash Philosophy

Design for failure:

# Don't do defensive programming
def process_order(order_id) do
  # Let it crash if the order doesn't exist
  order = Repo.get!(Order, order_id)

  # Let it crash if processing fails
  {:ok, processed} = process(order)

  processed
end

Proper Error Handling

When to handle errors vs let crash:

# Handle expected errors
def fetch_user(id) do
  case HTTPoison.get("#{@api_url}/users/#{id}") do
    {:ok, %{status_code: 200, body: body}} ->
      Jason.decode(body)

    {:ok, %{status_code: 404}} ->
      {:error, :not_found}

    {:ok, %{status_code: status}} ->
      {:error, {:unexpected_status, status}}

    {:error, reason} ->
      {:error, {:network_error, reason}}
  end
end

# Let unexpected errors crash
def update_user!(id, params) do
  user = Repo.get!(User, id)  # Crash if not found

  user
  |> User.changeset(params)
  |> Repo.update!()  # Crash if invalid
end

Circuit Breaker

Prevent cascading failures:

defmodule CircuitBreaker do
  use GenServer

  def start_link(_) do
    GenServer.start_link(__MODULE__, %{status: :closed, failures: 0}, name: __MODULE__)
  end

  def call(fun) do
    case GenServer.call(__MODULE__, :status) do
      :open -> {:error, :circuit_open}
      :closed -> execute(fun)
    end
  end

  defp execute(fun) do
    try do
      result = fun.()
      GenServer.cast(__MODULE__, :success)
      {:ok, result}
    rescue
      e ->
        GenServer.cast(__MODULE__, :failure)
        {:error, e}
    end
  end

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, state.status, state}
  end

  @impl true
  def handle_cast(:success, state) do
    {:noreply, %{state | failures: 0, status: :closed}}
  end

  @impl true
  def handle_cast(:failure, state) do
    new_failures = state.failures + 1

    if new_failures >= 5 do
      # Open the circuit; allow calls again after 30 seconds.
      # (A fuller breaker would add a :half_open probing state.)
      Process.send_after(self(), :reset, 30_000)
      {:noreply, %{state | failures: new_failures, status: :open}}
    else
      {:noreply, %{state | failures: new_failures}}
    end
  end

  @impl true
  def handle_info(:reset, state) do
    {:noreply, %{state | status: :closed, failures: 0}}
  end
end

Testing Concurrent Systems

Testing GenServers

defmodule MyApp.CounterTest do
  use ExUnit.Case, async: true

  test "increments counter" do
    # Counter registers itself under __MODULE__, so the client
    # functions take no pid argument
    {:ok, _pid} = MyApp.Counter.start_link(0)

    assert MyApp.Counter.increment() == 1
    assert MyApp.Counter.increment() == 2
    assert MyApp.Counter.get_value() == 2
  end
end

Testing Asynchronous Processes

test "process receives message" do
  parent = self()

  pid =
    spawn(fn ->
      receive do
        :ping -> send(parent, :pong)
      end
    end)

  send(pid, :ping)

  assert_receive :pong, 1000
end

Testing Supervision

test "supervisor restarts crashed worker" do
  {:ok, sup} = Supervisor.start_link([MyApp.Worker], strategy: :one_for_one)

  [{_, worker_pid, _, _}] = Supervisor.which_children(sup)

  # Crash the worker
  Process.exit(worker_pid, :kill)

  # Wait for the restart
  Process.sleep(100)

  # Verify a new worker started
  [{_, new_pid, _, _}] = Supervisor.which_children(sup)
  assert new_pid != worker_pid
  assert Process.alive?(new_pid)
end

Debugging Concurrent Systems

Observer

Launch Observer for visual process inspection:

:observer.start()

Process Info

Inspect running processes:

# List all processes
Process.list()

# Process information
Process.info(pid)

# Message queue length
{:message_queue_len, count} = Process.info(pid, :message_queue_len)

# Current function
{:current_function, {mod, fun, arity}} = Process.info(pid, :current_function)

Tracing

Use :sys module for debugging:

# Enable tracing
:sys.trace(pid, true)

# Get state
:sys.get_state(pid)

# Get status
:sys.get_status(pid)

Performance Considerations

Process Spawning

  • Processes are lightweight (a few kilobytes of memory each)

  • Spawning thousands/millions of processes is normal

  • Use process pools when spawn rate is very high
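To see how cheap spawning is, a quick sketch: start 100,000 short-lived processes and collect a reply from each (this completes in well under a second on typical hardware).

```elixir
# Spawn many short-lived processes; each sends one message and exits.
parent = self()

for _ <- 1..100_000 do
  spawn(fn -> send(parent, :done) end)
end

# Drain the replies
for _ <- 1..100_000 do
  receive do
    :done -> :ok
  end
end
```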

Message Passing

  • Messages are copied between processes

  • Large messages are expensive - consider ETS or persistent_term

  • Use binary for efficient large data transfer
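As a sketch of the `:persistent_term` option mentioned above (the key `{MyApp, :config}` is an arbitrary example), large read-mostly data can be stored once and read without per-process copying:

```elixir
# Store large, read-mostly data once; reads share the stored term
# instead of copying it into each reader's heap.
large_config = %{pool_size: 10, endpoints: ["a.example", "b.example"]}
:persistent_term.put({MyApp, :config}, large_config)

config = :persistent_term.get({MyApp, :config})
```

Note that updating or deleting a persistent term is expensive (it can trigger a scan of all processes), so reserve it for data that rarely changes.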

Bottlenecks

  • Single GenServer can become bottleneck

  • Solution: shard state across multiple processes

  • Use ETS with read_concurrency for read-heavy workloads
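The sharding idea above can be sketched as follows (the module `MyApp.Sharding`, the shard count, and the `MyApp.Shard<N>` naming scheme are illustrative assumptions):

```elixir
# Hypothetical sketch: hash each key onto one of @shards registered
# processes so no single GenServer serializes all requests.
defmodule MyApp.Sharding do
  @shards 8

  # :erlang.phash2/2 is deterministic, so the same key always
  # routes to the same shard.
  def shard_name(key) do
    :"Elixir.MyApp.Shard#{:erlang.phash2(key, @shards)}"
  end
end
```

Each shard would be a GenServer registered under its `shard_name/1` atom, typically started by a supervisor at boot; callers route with `GenServer.call(MyApp.Sharding.shard_name(key), msg)`.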

Key Principles

  • Embrace concurrency: Use processes liberally, they're cheap

  • Let it crash: Don't write defensive code, use supervision

  • Isolate failures: Design supervision trees to contain failures

  • Communicate via messages: Avoid shared state between processes

  • Use the right tool: GenServer for state, Task for work, Agent for simple state

  • Test at boundaries: Test process APIs, not internal implementation

  • Monitor and observe: Use Observer and logging to understand system behavior
