ð crystal-concurrency
Use when implementing concurrent programming in Crystal using fibers, channels, and parallel execution patterns for high-performance, non-blocking applications.
Overview
You are Claude Code, an expert in Crystal's concurrency model. You specialize in building high-performance, concurrent applications using fibers, channels, and Crystal's lightweight concurrency primitives.
Your core responsibilities:
- Implement fiber-based concurrent operations for non-blocking execution
- Design channel-based communication patterns for inter-fiber coordination
- Build parallel processing pipelines with proper synchronization
- Implement worker pools and task distribution systems
- Handle concurrent resource access with mutexes and atomic operations
- Design fault-tolerant concurrent systems with proper error handling
- Optimize fiber scheduling and resource utilization
- Implement backpressure and flow control mechanisms
- Build real-time data processing systems
- Design concurrent I/O operations for network and file systems
Fibers: Lightweight Concurrency
Crystal uses fibers (also known as green threads or coroutines) for concurrency. Fibers are cooperatively scheduled by the Crystal runtime and are much lighter weight than OS threads.
Basic Fiber Spawning
# Simple fiber spawning
spawn do
puts "Running in a fiber"
sleep 1
puts "Fiber completed"
end
# Fiber with arguments
def process_data(id : Int32, data : String)
puts "Processing #{data} with id #{id}"
sleep 0.5
puts "Completed #{id}"
end
spawn process_data(1, "task A")
spawn process_data(2, "task B")
# Wait for fibers to complete
sleep 1
Fiber with Return Values via Channels
# Fibers don't return values directly, use channels instead
result_channel = Channel(Int32).new
spawn do
result = expensive_computation(42)
result_channel.send(result)
end
# Do other work...
puts "Doing other work"
# Wait for result
result = result_channel.receive
puts "Got result: #{result}"
def expensive_computation(n : Int32) : Int32
sleep 1
n * 2
end
Named Fibers for Debugging
# Give fibers descriptive names for debugging
spawn(name: "data-processor") do
process_large_dataset
end
spawn(name: "cache-updater") do
update_cache_periodically
end
# Fiber names appear in exception backtraces
spawn(name: "failing-worker") do
raise "Something went wrong"
end
Channels: Inter-Fiber Communication
Channels are the primary mechanism for communication between fibers. They provide thread-safe message passing with optional buffering.
Unbuffered Channels
# Unbuffered channel - blocks until both sender and receiver are ready
channel = Channel(String).new
spawn do
puts "Sending message"
channel.send("Hello")
puts "Message sent"
end
spawn do
sleep 0.1 # Small delay
puts "Receiving message"
msg = channel.receive
puts "Received: #{msg}"
end
sleep 1
Buffered Channels
# Buffered channel - allows sending without blocking up to buffer size
channel = Channel(Int32).new(capacity: 3)
# These sends won't block
channel.send(1)
channel.send(2)
channel.send(3)
# This would block until someone receives
# channel.send(4)
# Receive values
puts channel.receive # 1
puts channel.receive # 2
puts channel.receive # 3
Channel Closing and Iteration
# Producer-consumer with channel closing
channel = Channel(Int32).new
# Producer
spawn do
5.times do |i|
channel.send(i)
sleep 0.1
end
channel.close # Signal no more values
end
# Consumer - iterate until channel is closed
spawn do
channel.each do |value|
puts "Received: #{value}"
end
puts "Channel closed, consumer exiting"
end
sleep 1
Checking if Channel is Closed
channel = Channel(String).new
spawn do
channel.send("message 1")
channel.send("message 2")
channel.close
end
sleep 0.1
# Check before receiving
unless channel.closed?
puts channel.receive
end
# Or handle the exception
begin
puts channel.receive
puts channel.receive
puts channel.receive # Will raise Channel::ClosedError
rescue Channel::ClosedError
puts "Channel is closed"
end
Select: Multiplexing Channels
The select statement allows waiting on multiple channel operations simultaneously,
similar to Go's select statement.
Basic Select with Multiple Channels
ch1 = Channel(String).new
ch2 = Channel(Int32).new
spawn do
sleep 0.2
ch1.send("from channel 1")
end
spawn do
sleep 0.1
ch2.send(42)
end
# Wait for whichever channel is ready first
select
when msg = ch1.receive
puts "Got string: #{msg}"
when num = ch2.receive
puts "Got number: #{num}"
end
sleep 1
Select with Timeout
channel = Channel(String).new
spawn do
sleep 2 # Takes too long
channel.send("delayed message")
end
# Wait with timeout
select
when msg = channel.receive
puts "Received: #{msg}"
when timeout(1.second)
puts "Timed out waiting for message"
end
Select with Default Case (Non-blocking)
channel = Channel(Int32).new
# Non-blocking receive
select
when value = channel.receive
puts "Got value: #{value}"
else
puts "No value available, continuing immediately"
end
Select in a Loop
results = Channel(String).new
done = Channel(Nil).new
output = [] of String
# Multiple workers sending results
3.times do |i|
spawn do
sleep rand(0.5..1.5)
results.send("Worker #{i} done")
end
end
# Collector fiber
spawn do
3.times do
output << results.receive
end
done.send(nil)
end
# Wait for completion with timeout
select
when done.receive
puts "All workers completed"
output.each { |msg| puts msg }
when timeout(5.seconds)
puts "Timeout - not all workers completed"
end
Worker Pools
Worker pools distribute tasks across a fixed number of concurrent workers.
Basic Worker Pool
class WorkerPool(T, R)
def initialize(@size : Int32)
@tasks = Channel(T).new
@results = Channel(R).new
@workers = [] of Fiber
@size.times do |i|
@workers << spawn(name: "worker-#{i}") do
worker_loop
end
end
end
private def worker_loop
@tasks.each do |task|
result = process(task)
@results.send(result)
end
end
def process(task : T) : R
# Override in subclass or pass block
raise "Not implemented"
end
def submit(task : T)
@tasks.send(task)
end
def get_result : R
@results.receive
end
def shutdown
@tasks.close
end
end
# Usage example
class IntSquarePool < WorkerPool(Int32, Int32)
def process(task : Int32) : Int32
sleep 0.1 # Simulate work
task * task
end
end
pool = IntSquarePool.new(size: 3)
# Submit tasks
10.times { |i| pool.submit(i) }
# Collect results
results = [] of Int32
10.times { results << pool.get_result }
pool.shutdown
puts results.sort
Worker Pool with Error Handling
struct Task
property id : Int32
property data : String
def initialize(@id, @data)
end
end
struct Result
property task_id : Int32
property success : Bool
property value : String?
property error : String?
def initialize(@task_id, @success, @value = nil, @error = nil)
end
end
class RobustWorkerPool
def initialize(@worker_count : Int32)
@tasks = Channel(Task).new(capacity: 100)
@results = Channel(Result).new(capacity: 100)
@worker_count.times do |i|
spawn(name: "worker-#{i}") do
process_tasks
end
end
end
private def process_tasks
@tasks.each do |task|
begin
result_value = process_task(task)
@results.send(Result.new(
task_id: task.id,
success: true,
value: result_value
))
rescue ex
@results.send(Result.new(
task_id: task.id,
success: false,
error: ex.message
))
end
end
end
private def process_task(task : Task) : String
# Simulate processing that might fail
raise "Invalid data" if task.data.empty?
sleep 0.1
"Processed: #{task.data}"
end
def submit(task : Task)
@tasks.send(task)
end
def get_result : Result
@results.receive
end
def shutdown
@tasks.close
end
end
Parallel Map and Reduce
Implement parallel processing of collections.
Parallel Map
def parallel_map(collection : Array(T), workers : Int32 = 4, &block : T -> R) : Array(R) forall T, R
tasks = Channel(Tuple(Int32, T)).new
results = Channel(Tuple(Int32, R)).new
# Spawn workers
workers.times do
spawn do
tasks.each do |index, item|
result = yield item
results.send({index, result})
end
end
end
# Send tasks
spawn do
collection.each_with_index do |item, index|
tasks.send({index, item})
end
tasks.close
end
# Collect results in order
result_map = {} of Int32 => R
collection.size.times do
index, result = results.receive
result_map[index] = result
end
collection.indices.map { |i| result_map[i] }
end
# Usage
numbers = (1..100).to_a
squares = parallel_map(numbers, workers: 8) do |n|
sleep 0.01 # Simulate work
n * n
end
puts squares.first(10)
Parallel Reduce with Pipeline
def parallel_reduce(collection : Array(T), workers : Int32 = 4, initial : R, &block : R, T -> R) : R forall T, R
chunk_size = (collection.size / workers.to_f).ceil.to_i
chunks = collection.each_slice(chunk_size).to_a
results = Channel(R).new
chunks.each do |chunk|
spawn do
chunk_result = chunk.reduce(initial) { |acc, item| yield acc, item }
results.send(chunk_result)
end
end
# Reduce the partial results
final_result = initial
chunks.size.times do
final_result = yield final_result, results.receive
end
final_result
end
# Usage - sum of squares
numbers = (1..1000).to_a
sum = parallel_reduce(numbers, initial: 0) do |acc, n|
acc + n * n
end
puts "Sum of squares: #{sum}"
Mutex: Protecting Shared State
When fibers need to share mutable state, use mutexes to prevent race conditions.
Basic Mutex Usage
require "mutex"
class Counter
def initialize
@count = 0
@mutex = Mutex.new
end
def increment
@mutex.synchronize do
current = @count
sleep 0.001 # Simulate some work
@count = current + 1
end
end
def value : Int32
@mutex.synchronize { @count }
end
end
counter = Counter.new
# Spawn 100 fibers that each increment 10 times
100.times do
spawn do
10.times { counter.increment }
end
end
sleep 2
puts "Final count: #{counter.value}" # Should be 1000
Read-Write Lock Pattern
require "mutex"
class CachedData
def initialize
@data = {} of String => String
@mutex = Mutex.new
@version = 0
end
def read(key : String) : String?
@mutex.synchronize do
@data[key]?
end
end
def write(key : String, value : String)
@mutex.synchronize do
@data[key] = value
@version += 1
end
end
def batch_update(updates : Hash(String, String))
@mutex.synchronize do
updates.each do |key, value|
@data[key] = value
end
@version += 1
end
end
def snapshot : Hash(String, String)
@mutex.synchronize do
@data.dup
end
end
end
Atomic Operations
For simple counters and flags, atomic operations are more efficient than mutexes.
Atomic Counter
require "atomic"
class AtomicCounter
def initialize(initial : Int32 = 0)
@count = Atomic(Int32).new(initial)
end
def increment : Int32
@count.add(1)
end
def decrement : Int32
@count.sub(1)
end
def value : Int32
@count.get
end
def compare_and_set(expected : Int32, new_value : Int32) : Bool
@count.compare_and_set(expected, new_value)
end
end
counter = AtomicCounter.new
# Safe concurrent increments without mutex
1000.times do
spawn { counter.increment }
end
sleep 1
puts "Count: #{counter.value}"
Atomic Flag for Coordination
require "atomic"
class ShutdownCoordinator
def initialize
@shutdown_flag = Atomic(Int32).new(0)
end
def shutdown!
@shutdown_flag.set(1)
end
def shutdown? : Bool
@shutdown_flag.get == 1
end
def run_until_shutdown(&block)
until shutdown?
yield
sleep 0.1
end
end
end
coordinator = ShutdownCoordinator.new
# Worker that checks shutdown flag
spawn(name: "worker") do
coordinator.run_until_shutdown do
puts "Working..."
end
puts "Worker shutdown gracefully"
end
sleep 1
coordinator.shutdown!
sleep 0.5
When to Use This Skill
Use the crystal-concurrency skill when you need to:
- Process multiple I/O operations concurrently (network requests, file operations)
- Implement real-time data processing pipelines
- Build worker pools for parallel task processing
- Handle multiple client connections simultaneously (web servers, chat systems)
- Perform background processing without blocking main execution
- Aggregate results from multiple concurrent operations
- Implement producer-consumer patterns
- Build rate limiters and backpressure mechanisms
- Process large datasets in parallel
- Coordinate multiple asynchronous operations
- Implement timeout and cancellation patterns
- Build concurrent caches with synchronized access
- Stream data processing with multiple stages
- Implement fan-out/fan-in patterns
Best Practices
- Always Close Channels: Close channels when done sending to signal completion to receivers
- Use Buffered Channels for Performance: Buffer channels when producers/consumers run at different speeds
- Limit Fiber Count: Don't spawn unlimited fibers; use worker pools for bounded concurrency
- Handle Channel Closure: Always handle
Channel::ClosedErroror checkclosed?before operations - Use Select for Timeouts: Implement timeouts with
selectandtimeout()to prevent infinite blocking - Prefer Channels Over Shared State: Use message passing (channels) instead of shared memory when possible
- Synchronize Shared State: Always use
Mutexor atomics when sharing mutable state between fibers - Clean Up Resources: Use
ensureblocks to guarantee resource cleanup even on errors - Name Your Fibers: Give fibers descriptive names for easier debugging and profiling
- Avoid Blocking Operations in Fibers: Use non-blocking I/O; blocking operations prevent other fibers from running
- Use Atomic Operations for Counters: Atomics are more efficient than mutexes for simple counters and flags
- Implement Graceful Shutdown: Design systems to shut down cleanly, draining channels and waiting for fibers
- Handle Fiber Panics: Wrap fiber code in exception handlers to prevent silent failures
- Size Channel Buffers Appropriately: Too small causes blocking; too large wastes memory
- Use Select Default for Polling: Non-blocking checks with
select ... elsefor polling patterns
Common Pitfalls
- Forgetting to Close Channels: Receivers will wait forever if channels aren't closed after sending completes
- Deadlocks from Unbuffered Channels: Sending to unbuffered channel blocks until receiver is ready
- Race Conditions on Shared State: Not using mutexes/atomics when multiple fibers access same data
- Channel Buffer Overflow: Sending more items than buffer capacity without receivers causes blocking
- Not Handling Closed Channels: Receiving from closed channel raises exception; always handle it
- Spawning Too Many Fibers: Unlimited fiber spawning exhausts memory; use worker pools instead
- Blocking the Scheduler: CPU-intensive work in fibers prevents other fibers from running
- Resource Leaks: Not closing channels, files, or connections in all code paths including errors
- Order Assumptions: Fibers execute in non-deterministic order; don't assume execution sequence
- Timeout Too Short: Aggressive timeouts cause false failures; balance responsiveness with reliability
- Mutex Held Too Long: Long critical sections reduce concurrency; minimize mutex hold time
- Send/Receive Mismatch: Imbalanced producers/consumers leads to memory buildup or starvation
- Ignoring Fiber Exceptions: Exceptions in fibers don't propagate to spawner; handle explicitly
- Nested Mutex Locks: Can cause deadlocks; avoid acquiring multiple mutexes or use consistent order
- Not Using
synchronize: Forgetting to wrap mutex usage insynchronizeblock causes race conditions