crystal-concurrency

You are Claude Code, an expert in Crystal's concurrency model. You specialize in building high-performance, concurrent applications using fibers, channels, and Crystal's lightweight concurrency primitives.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "crystal-concurrency" with this command: npx skills add thebushidocollective/han/thebushidocollective-han-crystal-concurrency

Crystal Concurrency

You are Claude Code, an expert in Crystal's concurrency model. You specialize in building high-performance, concurrent applications using fibers, channels, and Crystal's lightweight concurrency primitives.

Your core responsibilities:

  • Implement fiber-based concurrent operations for non-blocking execution

  • Design channel-based communication patterns for inter-fiber coordination

  • Build parallel processing pipelines with proper synchronization

  • Implement worker pools and task distribution systems

  • Handle concurrent resource access with mutexes and atomic operations

  • Design fault-tolerant concurrent systems with proper error handling

  • Optimize fiber scheduling and resource utilization

  • Implement backpressure and flow control mechanisms

  • Build real-time data processing systems

  • Design concurrent I/O operations for network and file systems

Fibers: Lightweight Concurrency

Crystal uses fibers (also known as green threads or coroutines) for concurrency. Fibers are cooperatively scheduled by the Crystal runtime and are much lighter weight than OS threads.

Basic Fiber Spawning

Simple fiber spawning

spawn do puts "Running in a fiber" sleep 1 puts "Fiber completed" end

Fiber with arguments

def process_data(id : Int32, data : String) puts "Processing #{data} with id #{id}" sleep 0.5 puts "Completed #{id}" end

spawn process_data(1, "task A") spawn process_data(2, "task B")

Wait for fibers to complete

sleep 1

Fiber with Return Values via Channels

Fibers don't return values directly, use channels instead

result_channel = Channel(Int32).new

spawn do result = expensive_computation(42) result_channel.send(result) end

Do other work...

puts "Doing other work"

Wait for result

result = result_channel.receive puts "Got result: #{result}"

def expensive_computation(n : Int32) : Int32 sleep 1 n * 2 end

Named Fibers for Debugging

Give fibers descriptive names for debugging

spawn(name: "data-processor") do process_large_dataset end

spawn(name: "cache-updater") do update_cache_periodically end

Fiber names appear in exception backtraces

spawn(name: "failing-worker") do raise "Something went wrong" end

Channels: Inter-Fiber Communication

Channels are the primary mechanism for communication between fibers. They provide thread-safe message passing with optional buffering.

Unbuffered Channels

Unbuffered channel - blocks until both sender and receiver are ready

channel = Channel(String).new

spawn do puts "Sending message" channel.send("Hello") puts "Message sent" end

spawn do sleep 0.1 # Small delay puts "Receiving message" msg = channel.receive puts "Received: #{msg}" end

sleep 1

Buffered Channels

Buffered channel - allows sending without blocking up to buffer size

channel = Channel(Int32).new(capacity: 3)

These sends won't block

channel.send(1) channel.send(2) channel.send(3)

This would block until someone receives

channel.send(4)

Receive values

puts channel.receive # 1 puts channel.receive # 2 puts channel.receive # 3

Channel Closing and Iteration

Producer-consumer with channel closing

channel = Channel(Int32).new

Producer

spawn do 5.times do |i| channel.send(i) sleep 0.1 end channel.close # Signal no more values end

Consumer - iterate until channel is closed

spawn do channel.each do |value| puts "Received: #{value}" end puts "Channel closed, consumer exiting" end

sleep 1

Checking if Channel is Closed

channel = Channel(String).new

spawn do channel.send("message 1") channel.send("message 2") channel.close end

sleep 0.1

Check before receiving

unless channel.closed? puts channel.receive end

Or handle the exception

begin puts channel.receive puts channel.receive puts channel.receive # Will raise Channel::ClosedError rescue Channel::ClosedError puts "Channel is closed" end

Select: Multiplexing Channels

The select statement allows waiting on multiple channel operations simultaneously, similar to Go's select statement.

Basic Select with Multiple Channels

ch1 = Channel(String).new ch2 = Channel(Int32).new

spawn do sleep 0.2 ch1.send("from channel 1") end

spawn do sleep 0.1 ch2.send(42) end

Wait for whichever channel is ready first

select when msg = ch1.receive puts "Got string: #{msg}" when num = ch2.receive puts "Got number: #{num}" end

sleep 1

Select with Timeout

channel = Channel(String).new

spawn do sleep 2 # Takes too long channel.send("delayed message") end

Wait with timeout

select when msg = channel.receive puts "Received: #{msg}" when timeout(1.second) puts "Timed out waiting for message" end

Select with Default Case (Non-blocking)

channel = Channel(Int32).new

Non-blocking receive

select when value = channel.receive puts "Got value: #{value}" else puts "No value available, continuing immediately" end

Select in a Loop

results = Channel(String).new done = Channel(Nil).new output = [] of String

Multiple workers sending results

3.times do |i| spawn do sleep rand(0.5..1.5) results.send("Worker #{i} done") end end

Collector fiber

spawn do 3.times do output << results.receive end done.send(nil) end

Wait for completion with timeout

select when done.receive puts "All workers completed" output.each { |msg| puts msg } when timeout(5.seconds) puts "Timeout - not all workers completed" end

Worker Pools

Worker pools distribute tasks across a fixed number of concurrent workers.

Basic Worker Pool

class WorkerPool(T, R) def initialize(@size : Int32) @tasks = Channel(T).new @results = Channel(R).new @workers = [] of Fiber

@size.times do |i|
  @workers &#x3C;&#x3C; spawn(name: "worker-#{i}") do
    worker_loop
  end
end

end

private def worker_loop @tasks.each do |task| result = process(task) @results.send(result) end end

def process(task : T) : R # Override in subclass or pass block raise "Not implemented" end

def submit(task : T) @tasks.send(task) end

def get_result : R @results.receive end

def shutdown @tasks.close end end

Usage example

class IntSquarePool < WorkerPool(Int32, Int32) def process(task : Int32) : Int32 sleep 0.1 # Simulate work task * task end end

pool = IntSquarePool.new(size: 3)

Submit tasks

10.times { |i| pool.submit(i) }

Collect results

results = [] of Int32 10.times { results << pool.get_result }

pool.shutdown puts results.sort

Worker Pool with Error Handling

struct Task property id : Int32 property data : String

def initialize(@id, @data) end end

struct Result property task_id : Int32 property success : Bool property value : String? property error : String?

def initialize(@task_id, @success, @value = nil, @error = nil) end end

class RobustWorkerPool def initialize(@worker_count : Int32) @tasks = Channel(Task).new(capacity: 100) @results = Channel(Result).new(capacity: 100)

@worker_count.times do |i|
  spawn(name: "worker-#{i}") do
    process_tasks
  end
end

end

private def process_tasks @tasks.each do |task| begin result_value = process_task(task) @results.send(Result.new( task_id: task.id, success: true, value: result_value )) rescue ex @results.send(Result.new( task_id: task.id, success: false, error: ex.message )) end end end

private def process_task(task : Task) : String # Simulate processing that might fail raise "Invalid data" if task.data.empty? sleep 0.1 "Processed: #{task.data}" end

def submit(task : Task) @tasks.send(task) end

def get_result : Result @results.receive end

def shutdown @tasks.close end end

Parallel Map and Reduce

Implement parallel processing of collections.

Parallel Map

def parallel_map(collection : Array(T), workers : Int32 = 4, &block : T -> R) : Array(R) forall T, R tasks = Channel(Tuple(Int32, T)).new results = Channel(Tuple(Int32, R)).new

Spawn workers

workers.times do spawn do tasks.each do |index, item| result = yield item results.send({index, result}) end end end

Send tasks

spawn do collection.each_with_index do |item, index| tasks.send({index, item}) end tasks.close end

Collect results in order

result_map = {} of Int32 => R collection.size.times do index, result = results.receive result_map[index] = result end

collection.indices.map { |i| result_map[i] } end

Usage

numbers = (1..100).to_a squares = parallel_map(numbers, workers: 8) do |n| sleep 0.01 # Simulate work n * n end

puts squares.first(10)

Parallel Reduce with Pipeline

def parallel_reduce(collection : Array(T), workers : Int32 = 4, initial : R, &block : R, T -> R) : R forall T, R chunk_size = (collection.size / workers.to_f).ceil.to_i chunks = collection.each_slice(chunk_size).to_a

results = Channel(R).new

chunks.each do |chunk| spawn do chunk_result = chunk.reduce(initial) { |acc, item| yield acc, item } results.send(chunk_result) end end

Reduce the partial results

final_result = initial chunks.size.times do final_result = yield final_result, results.receive end

final_result end

Usage - sum of squares

numbers = (1..1000).to_a sum = parallel_reduce(numbers, initial: 0) do |acc, n| acc + n * n end

puts "Sum of squares: #{sum}"

Mutex: Protecting Shared State

When fibers need to share mutable state, use mutexes to prevent race conditions.

Basic Mutex Usage

require "mutex"

class Counter def initialize @count = 0 @mutex = Mutex.new end

def increment @mutex.synchronize do current = @count sleep 0.001 # Simulate some work @count = current + 1 end end

def value : Int32 @mutex.synchronize { @count } end end

counter = Counter.new

Spawn 100 fibers that each increment 10 times

100.times do spawn do 10.times { counter.increment } end end

sleep 2 puts "Final count: #{counter.value}" # Should be 1000

Read-Write Lock Pattern

require "mutex"

class CachedData def initialize @data = {} of String => String @mutex = Mutex.new @version = 0 end

def read(key : String) : String? @mutex.synchronize do @data[key]? end end

def write(key : String, value : String) @mutex.synchronize do @data[key] = value @version += 1 end end

def batch_update(updates : Hash(String, String)) @mutex.synchronize do updates.each do |key, value| @data[key] = value end @version += 1 end end

def snapshot : Hash(String, String) @mutex.synchronize do @data.dup end end end

Atomic Operations

For simple counters and flags, atomic operations are more efficient than mutexes.

Atomic Counter

require "atomic"

class AtomicCounter def initialize(initial : Int32 = 0) @count = Atomic(Int32).new(initial) end

def increment : Int32 @count.add(1) end

def decrement : Int32 @count.sub(1) end

def value : Int32 @count.get end

def compare_and_set(expected : Int32, new_value : Int32) : Bool @count.compare_and_set(expected, new_value) end end

counter = AtomicCounter.new

Safe concurrent increments without mutex

1000.times do spawn { counter.increment } end

sleep 1 puts "Count: #{counter.value}"

Atomic Flag for Coordination

require "atomic"

class ShutdownCoordinator def initialize @shutdown_flag = Atomic(Int32).new(0) end

def shutdown! @shutdown_flag.set(1) end

def shutdown? : Bool @shutdown_flag.get == 1 end

def run_until_shutdown(&block) until shutdown? yield sleep 0.1 end end end

coordinator = ShutdownCoordinator.new

Worker that checks shutdown flag

spawn(name: "worker") do coordinator.run_until_shutdown do puts "Working..." end puts "Worker shutdown gracefully" end

sleep 1 coordinator.shutdown! sleep 0.5

When to Use This Skill

Use the crystal-concurrency skill when you need to:

  • Process multiple I/O operations concurrently (network requests, file operations)

  • Implement real-time data processing pipelines

  • Build worker pools for parallel task processing

  • Handle multiple client connections simultaneously (web servers, chat systems)

  • Perform background processing without blocking main execution

  • Aggregate results from multiple concurrent operations

  • Implement producer-consumer patterns

  • Build rate limiters and backpressure mechanisms

  • Process large datasets in parallel

  • Coordinate multiple asynchronous operations

  • Implement timeout and cancellation patterns

  • Build concurrent caches with synchronized access

  • Stream data processing with multiple stages

  • Implement fan-out/fan-in patterns

Best Practices

  • Always Close Channels: Close channels when done sending to signal completion to receivers

  • Use Buffered Channels for Performance: Buffer channels when producers/consumers run at different speeds

  • Limit Fiber Count: Don't spawn unlimited fibers; use worker pools for bounded concurrency

  • Handle Channel Closure: Always handle Channel::ClosedError or check closed? before operations

  • Use Select for Timeouts: Implement timeouts with select and timeout() to prevent infinite blocking

  • Prefer Channels Over Shared State: Use message passing (channels) instead of shared memory when possible

  • Synchronize Shared State: Always use Mutex or atomics when sharing mutable state between fibers

  • Clean Up Resources: Use ensure blocks to guarantee resource cleanup even on errors

  • Name Your Fibers: Give fibers descriptive names for easier debugging and profiling

  • Avoid Blocking Operations in Fibers: Use non-blocking I/O; blocking operations prevent other fibers from running

  • Use Atomic Operations for Counters: Atomics are more efficient than mutexes for simple counters and flags

  • Implement Graceful Shutdown: Design systems to shut down cleanly, draining channels and waiting for fibers

  • Handle Fiber Panics: Wrap fiber code in exception handlers to prevent silent failures

  • Size Channel Buffers Appropriately: Too small causes blocking; too large wastes memory

  • Use Select Default for Polling: Non-blocking checks with select ... else for polling patterns

Common Pitfalls

  • Forgetting to Close Channels: Receivers will wait forever if channels aren't closed after sending completes

  • Deadlocks from Unbuffered Channels: Sending to unbuffered channel blocks until receiver is ready

  • Race Conditions on Shared State: Not using mutexes/atomics when multiple fibers access same data

  • Channel Buffer Overflow: Sending more items than buffer capacity without receivers causes blocking

  • Not Handling Closed Channels: Receiving from closed channel raises exception; always handle it

  • Spawning Too Many Fibers: Unlimited fiber spawning exhausts memory; use worker pools instead

  • Blocking the Scheduler: CPU-intensive work in fibers prevents other fibers from running

  • Resource Leaks: Not closing channels, files, or connections in all code paths including errors

  • Order Assumptions: Fibers execute in non-deterministic order; don't assume execution sequence

  • Timeout Too Short: Aggressive timeouts cause false failures; balance responsiveness with reliability

  • Mutex Held Too Long: Long critical sections reduce concurrency; minimize mutex hold time

  • Send/Receive Mismatch: Imbalanced producers/consumers leads to memory buildup or starvation

  • Ignoring Fiber Exceptions: Exceptions in fibers don't propagate to spawner; handle explicitly

  • Nested Mutex Locks: Can cause deadlocks; avoid acquiring multiple mutexes or use consistent order

  • Not Using synchronize : Forgetting to wrap mutex usage in synchronize block causes race conditions

Resources

  • Crystal Concurrency Guide

  • Crystal API - Fiber

  • Crystal API - Channel

  • Crystal API - Mutex

  • Crystal API - Atomic

  • Crystal Book - Concurrency

  • Effective Crystal - Concurrency Patterns

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

typescript-type-system

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

typescript-async-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

c-systems-programming

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

cpp-templates-metaprogramming

No summary provided by upstream source.

Repository SourceNeeds Review