Overview
Master concurrent execution in Effect using fibers. This skill covers forking, joining, interruption, parallel execution, and advanced concurrency patterns for building high-performance Effect applications.
Fibers Fundamentals
What are Fibers?
Fibers are lightweight virtual threads that execute effects concurrently:
import { Effect, Fiber } from "effect"
// Every effect runs on a fiber
const effect = Effect.succeed(42)
// When run, this executes on a fiber
// Effects are descriptions - fibers are executions
// Effect: lazy, immutable description
// Fiber: running execution with state
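The description/execution split can be seen in a minimal sketch, assuming Effect 3.x. The names `runs`, `increment`, and `runsBeforeExecution` are hypothetical and exist only for this illustration.

```typescript
import { Effect } from "effect"

let runs = 0

// Building the effect does not execute the callback; it only describes the work.
const increment = Effect.sync(() => {
  runs += 1
  return runs
})

// Nothing has run yet, so the counter is still 0.
const runsBeforeExecution = runs

// Each run executes the same description again.
const first = Effect.runSync(increment) // 1
const second = Effect.runSync(increment) // 2
```

Because the effect is only a description, it can be passed around, combined, and run any number of times without hidden side effects at definition time.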
Forking Effects
Create independent concurrent fibers:
import { Effect, Fiber } from "effect"
const task = Effect.gen(function* () {
yield* Effect.sleep("1 second")
yield* Effect.log("Task completed")
return 42
})
const program = Effect.gen(function* () {
// Fork creates a new fiber
const fiber = yield* Effect.fork(task)
// fiber: RuntimeFiber<number, never>
yield* Effect.log("Main fiber continues")
// Join waits for fiber to complete
const result = yield* Fiber.join(fiber)
yield* Effect.log(`Result: ${result}`)
return result
})
Fiber Operations
import { Effect, Fiber } from "effect"
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(longRunningTask)
// Join - wait for result
const result = yield* Fiber.join(fiber)
// Await - get Exit value (success/failure/interruption)
const exit = yield* Fiber.await(fiber)
// Interrupt - cancel execution
yield* Fiber.interrupt(fiber)
// Poll - non-blocking check; returns Option<Exit> (None while still running)
const status = yield* Fiber.poll(fiber)
})
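To make the return values of these operations concrete, here is a small sketch, assuming Effect 3.x. The names `inspectFiber` and `fiberStatus` are hypothetical; the forked task is just a long sleep standing in for real work.

```typescript
import { Effect, Exit, Fiber, Option } from "effect"

const inspectFiber = Effect.gen(function* () {
  // A task that would take far longer than this example runs
  const fiber = yield* Effect.fork(
    Effect.sleep("10 seconds").pipe(Effect.as("done"))
  )

  // poll does not wait: None means the fiber has not finished yet
  const before = yield* Fiber.poll(fiber)

  // interrupt waits for the fiber to stop and returns its Exit
  const exit = yield* Fiber.interrupt(fiber)

  // once the fiber is done, poll returns Some(exit)
  const after = yield* Fiber.poll(fiber)

  return {
    runningBefore: Option.isNone(before),
    wasInterrupted: Exit.isInterrupted(exit),
    doneAfter: Option.isSome(after)
  }
})

const fiberStatus = Effect.runPromise(inspectFiber)
```

All three flags are expected to be `true`: the fiber is still running at the first poll, its Exit records the interruption, and the second poll sees a finished fiber.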
Parallel Execution
Effect.all - Run Multiple Effects
import { Effect } from "effect"
// Parallel execution (opt in: Effect.all is sequential by default)
const program = Effect.gen(function* () {
  const results = yield* Effect.all([
    fetchUser("1"),
    fetchUser("2"),
    fetchUser("3")
  ], { concurrency: "unbounded" })
  // All requests run concurrently
  return results
})
// Sequential execution (the default; concurrency: 1 makes it explicit)
const sequential = Effect.gen(function* () {
  const results = yield* Effect.all([
    fetchUser("1"),
    fetchUser("2"),
    fetchUser("3")
  ], { concurrency: 1 })
  return results
})
// Limited concurrency
const limited = Effect.gen(function* () {
const results = yield* Effect.all(
Array.from({ length: 100 }, (_, i) => fetchUser(`${i}`)),
{ concurrency: 10 } // Max 10 concurrent
)
return results
})
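The effect of the `concurrency` option can be observed by counting how many effects are in flight at once. This is a measurement sketch, assuming Effect 3.x; `peakInFlight`, `request`, and `peaks` are hypothetical names, and the 20 ms sleep stands in for a real request.

```typescript
import { Effect } from "effect"

// Runs ten simulated requests and reports the highest number in flight at once.
const peakInFlight = (concurrency?: number | "unbounded") =>
  Effect.suspend(() => {
    let inFlight = 0
    let peak = 0

    const request = (n: number) =>
      Effect.gen(function* () {
        inFlight += 1
        peak = Math.max(peak, inFlight)
        yield* Effect.sleep("20 millis")
        inFlight -= 1
        return n
      })

    const requests = Array.from({ length: 10 }, (_, i) => request(i))
    return Effect.all(requests, { concurrency }).pipe(Effect.map(() => peak))
  })

const peaks = Effect.runPromise(
  Effect.all({
    byDefault: peakInFlight(), // no concurrency option
    limited: peakInFlight(3),
    unbounded: peakInFlight("unbounded")
  })
)
// resolves to { byDefault: 1, limited: 3, unbounded: 10 }
```

A counter like this is also a cheap way to check in tests that a concurrency limit is actually being respected.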
Effect.all with Batching
import { Effect } from "effect"
// Request batching: only affects effects built with Effect.request and a RequestResolver
const batchFetch = Effect.gen(function* () {
const userIds = Array.from({ length: 1000 }, (_, i) => `${i}`)
const results = yield* Effect.all(
userIds.map(id => fetchUser(id)),
{
concurrency: 50, // 50 concurrent requests
batching: true // Batch the Effect.request calls issued by these effects
}
)
return results
})
Effect.forEach - Concurrent Iteration
import { Effect } from "effect"
const processUsers = (userIds: string[]) =>
Effect.forEach(
userIds,
(id) => Effect.gen(function* () {
const user = yield* fetchUser(id)
const processed = yield* processUser(user)
return processed
}),
{ concurrency: "unbounded" } // No limit
)
// With concurrency limit
const processUsersLimited = (userIds: string[]) =>
Effect.forEach(
userIds,
(id) => fetchUser(id).pipe(Effect.flatMap(processUser)),
{ concurrency: 10 }
)
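One property worth knowing when iterating concurrently is that results come back in input order, not completion order. A minimal sketch, assuming Effect 3.x; `delaysMs`, `completionOrder`, `doubled`, and `doubledResults` are hypothetical names.

```typescript
import { Duration, Effect } from "effect"

const delaysMs = [60, 20, 40]
const completionOrder: number[] = []

const doubled = Effect.forEach(
  delaysMs,
  (ms) =>
    Effect.sleep(Duration.millis(ms)).pipe(
      Effect.map(() => {
        // Record when each item actually finishes
        completionOrder.push(ms)
        return ms * 2
      })
    ),
  { concurrency: "unbounded" }
)

const doubledResults = Effect.runPromise(doubled)
// resolves to [120, 40, 80], matching the order of delaysMs
```

`completionOrder` will typically end up sorted by delay, which shows that the items really did overlap even though the result array keeps the original positions.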
Racing Effects
Effect.race - First to Complete
import { Effect } from "effect"
const fetchWithFallback = (id: string) =>
Effect.race(
fetchFromPrimaryDb(id),
fetchFromSecondaryDb(id)
)
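// Returns the first effect to succeed; if one side fails, the other is awaited
// Effect.race takes exactly two effects, so nest calls (or use Effect.raceAll) for more
const fastestSource = Effect.race(
  fetchFromSource1(),
  Effect.race(
    fetchFromSource2(),
    fetchFromSource3()
  )
)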
Effect.raceAll - Race Multiple Effects
import { Effect } from "effect"
const sources = [
fetchFromSource1(),
fetchFromSource2(),
fetchFromSource3()
]
// First to succeed wins
const fastest = Effect.raceAll(sources)
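The difference between "first to succeed" and "first to finish" matters when the fastest source fails. The sketch below, assuming Effect 3.x, contrasts `Effect.race` with `Effect.raceFirst`; `fastFailure`, `slowSuccess`, and `raceOutcomes` are hypothetical names.

```typescript
import { Effect } from "effect"

// Fails quickly, like a primary source that is down
const fastFailure = Effect.fail("primary unavailable").pipe(
  Effect.delay("10 millis")
)

// Succeeds, but later
const slowSuccess = Effect.succeed("secondary result").pipe(
  Effect.delay("50 millis")
)

// race: the early failure does not win, the later success does
const firstSuccess = Effect.race(fastFailure, slowSuccess)

// raceFirst: whichever finishes first wins, success or failure
const firstToFinish = Effect.raceFirst(fastFailure, slowSuccess).pipe(
  Effect.match({
    onFailure: (error) => `failed: ${error}`,
    onSuccess: (value) => value
  })
)

const raceOutcomes = Effect.runPromise(
  Effect.all({ firstSuccess, firstToFinish })
)
// resolves to { firstSuccess: "secondary result", firstToFinish: "failed: primary unavailable" }
```

For fallback-style lookups `Effect.race` or `Effect.raceAll` is usually the better fit; `Effect.raceFirst` suits cases where a fast failure should abort the whole race.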
Timeout Racing
import { Duration, Effect } from "effect"
const withTimeout = <A, E, R>(
effect: Effect.Effect<A, E, R>,
duration: Duration.Duration
) =>
Effect.race(
effect,
Effect.sleep(duration).pipe(
Effect.andThen(Effect.fail({ _tag: "Timeout" }))
)
)
const program = Effect.gen(function* () {
const result = yield* withTimeout(
slowOperation(),
Duration.seconds(5)
)
return result
})
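The same behaviour is also available without a hand-written race. The sketch below uses `Effect.timeoutFail`, assuming Effect 3.x; the `Timeout` class, the simulated `slowOperation`, and the durations are hypothetical and chosen only so the example finishes quickly.

```typescript
import { Effect } from "effect"

// Hypothetical error type for this example
class Timeout {
  readonly _tag = "Timeout"
}

// Stand-in for a slow call
const slowOperation = Effect.sleep("200 millis").pipe(Effect.as("finished"))

// Fail with Timeout if the operation takes longer than 20 ms
const guarded = slowOperation.pipe(
  Effect.timeoutFail({
    duration: "20 millis",
    onTimeout: () => new Timeout()
  })
)

const timeoutOutcome = Effect.runPromise(
  guarded.pipe(
    Effect.match({
      onFailure: (error) => error._tag,
      onSuccess: (value) => value
    })
  )
)
// resolves to "Timeout"
```

When the timeout fires, the slow operation is interrupted rather than left running in the background.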
Interruption
Fiber Interruption
import { Effect, Fiber } from "effect"
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(longRunningTask)
// Cancel after 1 second
yield* Effect.sleep("1 second")
yield* Fiber.interrupt(fiber)
yield* Effect.log("Task cancelled")
})
// Automatic interruption on parent exit
const autoInterrupt = Effect.gen(function* () {
const fiber = yield* Effect.fork(infiniteLoop)
// fiber is interrupted automatically when the parent fiber terminates
})
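Interruption is a good place to hook in cleanup. The following sketch, assuming Effect 3.x, registers a handler with `Effect.onInterrupt` and checks that it runs; `cleanupLog`, `worker`, and `cleanupResult` are hypothetical names.

```typescript
import { Effect, Fiber } from "effect"

const cleanupLog: string[] = []

// A long-running worker that records when it is interrupted
const worker = Effect.sleep("10 seconds").pipe(
  Effect.onInterrupt(() =>
    Effect.sync(() => {
      cleanupLog.push("cleanup")
    })
  )
)

const cancelWorker = Effect.gen(function* () {
  const fiber = yield* Effect.fork(worker)
  // Give the worker time to start before cancelling it
  yield* Effect.sleep("10 millis")
  // interrupt returns after the worker and its handler have finished
  yield* Fiber.interrupt(fiber)
  return cleanupLog
})

const cleanupResult = Effect.runPromise(cancelWorker)
// resolves to ["cleanup"]
```

`Effect.ensuring` is the alternative when the cleanup should run on every exit, not only on interruption.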
Uninterruptible Regions
import { Effect } from "effect"
const criticalSection = Effect.gen(function* () {
// This region cannot be interrupted
yield* Effect.uninterruptible(
Effect.gen(function* () {
yield* beginTransaction()
yield* updateDatabase()
yield* commitTransaction()
})
)
})
// Interruptible regions within uninterruptible
const mixed = Effect.uninterruptible(
Effect.gen(function* () {
yield* criticalOperation1()
// Allow interruption here
yield* Effect.interruptible(
nonCriticalOperation()
)
yield* criticalOperation2()
})
)
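To see that an uninterruptible region really runs to completion, the sketch below interrupts a fiber in the middle of one. It assumes Effect 3.x; `steps`, `transaction`, and `transactionSteps` are hypothetical names, and the sleep stands in for database work.

```typescript
import { Effect, Fiber } from "effect"

const steps: string[] = []

// The whole block is protected from interruption
const transaction = Effect.uninterruptible(
  Effect.gen(function* () {
    steps.push("begin")
    yield* Effect.sleep("50 millis")
    steps.push("commit")
  })
)

const interruptMidway = Effect.gen(function* () {
  const fiber = yield* Effect.fork(transaction)
  // Let the transaction start, then try to cancel it
  yield* Effect.sleep("10 millis")
  // The interrupt request is deferred until the region has finished
  yield* Fiber.interrupt(fiber)
  return steps
})

const transactionSteps = Effect.runPromise(interruptMidway)
// resolves to ["begin", "commit"]
```

The trade-off is that the caller of `Fiber.interrupt` waits for the region to finish, so uninterruptible sections are best kept short.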
Daemon Fibers
Fork Daemon - Independent Fibers
import { Effect } from "effect"
const program = Effect.gen(function* () {
// Regular fork - interrupted when parent exits
const regularFiber = yield* Effect.fork(task)
// Daemon fork - survives parent exit
const daemonFiber = yield* Effect.forkDaemon(backgroundTask)
// Parent exits, regularFiber interrupted, daemonFiber continues
})
// Background worker example
const startBackgroundWorker = Effect.gen(function* () {
yield* Effect.forkDaemon(
Effect.gen(function* () {
while (true) {
yield* processQueue()
yield* Effect.sleep("1 second")
}
})
)
})
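The lifetime difference between `Effect.fork` and `Effect.forkDaemon` can be checked with a parent that exits immediately. A sketch assuming Effect 3.x; `finished`, `mark`, `parent`, and `settled` are hypothetical names, and the 150 ms wait exists only to give the daemon time to finish.

```typescript
import { Effect } from "effect"

const finished: string[] = []

// Records its name after a short delay, unless it is interrupted first
const mark = (name: string) =>
  Effect.sleep("30 millis").pipe(
    Effect.andThen(
      Effect.sync(() => {
        finished.push(name)
      })
    )
  )

// The parent forks both fibers and exits right away
const parent = Effect.gen(function* () {
  yield* Effect.fork(mark("child"))
  yield* Effect.forkDaemon(mark("daemon"))
})

const settled = Effect.runPromise(parent)
  .then(() => new Promise<void>((resolve) => setTimeout(resolve, 150)))
  .then(() => finished)
// resolves to ["daemon"]: the regular child was interrupted with its parent
```

Since nothing interrupts a daemon automatically, keeping the returned fiber around is the simplest way to retain the ability to stop it later.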
Scoped Concurrency
Effect.forkScoped - Fiber Cleanup
import { Effect } from "effect"
const program = Effect.gen(function* () {
yield* Effect.scoped(
Effect.gen(function* () {
// Fibers are tied to scope
const fiber1 = yield* Effect.forkScoped(task1)
const fiber2 = yield* Effect.forkScoped(task2)
// Do work
yield* doWork()
// Scope exit automatically interrupts fibers
})
)
// fiber1 and fiber2 are interrupted here
})
Fork In Scope
import { Effect, Exit, Scope } from "effect"
const managedConcurrency = Effect.gen(function* () {
const scope = yield* Scope.make()
// Fork in specific scope
const fiber = yield* Effect.forkIn(task, scope)
// Work continues
yield* doWork()
// Close scope, interrupt fiber
yield* Scope.close(scope, Exit.succeed(undefined))
})
Advanced Patterns
Worker Pool
import { Effect, Fiber, Queue } from "effect"
interface Task {
id: string
data: unknown
}
const createWorkerPool = (workers: number) =>
Effect.gen(function* () {
const queue = yield* Queue.bounded<Task>(100)
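// Start workers (Effect.fork ties them to the calling fiber; use Effect.forkScoped or Effect.forkDaemon if the pool must outlive it)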
const workerFibers = yield* Effect.all(
Array.from({ length: workers }, () =>
Effect.fork(
Effect.forever(
Effect.gen(function* () {
const task = yield* Queue.take(queue)
yield* processTask(task)
})
)
)
)
)
return {
submit: (task: Task) => Queue.offer(queue, task),
shutdown: () =>
Effect.all(
workerFibers.map(fiber => Fiber.interrupt(fiber))
)
}
})
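A complete, runnable variant of the worker-pool idea is sketched below, assuming Effect 3.x. It is a simplified illustration rather than the pool above: `runPool`, the squaring "work", and the second `results` queue are hypothetical choices made so the outcome can be collected and checked.

```typescript
import { Effect, Fiber, Queue } from "effect"

// Squares each input using a fixed number of worker fibers.
const runPool = (inputs: ReadonlyArray<number>, workers: number) =>
  Effect.gen(function* () {
    const tasks = yield* Queue.bounded<number>(16)
    const results = yield* Queue.unbounded<number>()

    // Each worker repeatedly takes a task and publishes its result
    const worker = Queue.take(tasks).pipe(
      Effect.flatMap((n) => Queue.offer(results, n * n)),
      Effect.forever
    )

    const fibers = yield* Effect.all(
      Array.from({ length: workers }, () => Effect.fork(worker))
    )

    yield* Queue.offerAll(tasks, inputs)

    // Wait until every task has produced a result
    const squares = yield* Queue.takeN(results, inputs.length)

    // Stop the workers once the work is done
    yield* Fiber.interruptAll(fibers)

    // Completion order is not guaranteed, so sort for a stable result
    return Array.from(squares).sort((a, b) => a - b)
  })

const poolResult = Effect.runPromise(runPool([1, 2, 3, 4], 2))
// resolves to [1, 4, 9, 16]
```

The bounded task queue provides back-pressure: `Queue.offerAll` suspends when the queue is full instead of letting pending work grow without limit.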
Parallel Map-Reduce
import { Effect } from "effect"
const parallelMapReduce = <A, B, E, R>(
items: A[],
map: (item: A) => Effect.Effect<B, E, R>,
reduce: (acc: B, item: B) => B,
initial: B,
concurrency: number
) =>
Effect.gen(function* () {
const mapped = yield* Effect.forEach(
items,
map,
{ concurrency }
)
return mapped.reduce(reduce, initial)
})
Request Deduplication
import { Effect, Request, RequestResolver } from "effect"
interface GetUser extends Request.Request<User, UserNotFound> {
  readonly _tag: "GetUser"
  readonly id: string
}
// Constructor for GetUser requests
const GetUser = Request.tagged<GetUser>("GetUser")
// Assumes fetchUsersBatch cannot fail: a batched resolver must handle its own errors
const GetUserResolver = RequestResolver.makeBatched(
  (requests: ReadonlyArray<GetUser>) =>
    Effect.gen(function* () {
      // Fetch each distinct ID once, even if it was requested several times
      const ids = [...new Set(requests.map(r => r.id))]
      const users = yield* fetchUsersBatch(ids)
      // Resolve every request in the batch
      yield* Effect.forEach(requests, (request) => {
        const user = users.find(u => u.id === request.id)
        return user
          ? Request.succeed(request, user)
          : Request.fail(request, { _tag: "UserNotFound", id: request.id })
      }, { discard: true })
    })
)
// Requests issued together are handed to the resolver as one batch
const program = Effect.gen(function* () {
  const results = yield* Effect.all([
    Effect.request(GetUser({ id: "1" }), GetUserResolver),
    Effect.request(GetUser({ id: "1" }), GetUserResolver),
    Effect.request(GetUser({ id: "1" }), GetUserResolver)
  ], { concurrency: "unbounded", batching: true })
  // One fetchUsersBatch call, containing ID "1" only once
  return results
})
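Batching can be verified by counting how often the resolver runs. The sketch below assumes Effect 3.x and uses an in-memory resolver instead of a real data source; `User`, `UserNotFound`, `batchCalls`, `getUser`, and `batchOutcome` are hypothetical names for this illustration.

```typescript
import { Effect, Request, RequestResolver } from "effect"

interface User {
  readonly id: string
  readonly name: string
}

class UserNotFound {
  readonly _tag = "UserNotFound"
  constructor(readonly id: string) {}
}

interface GetUser extends Request.Request<User, UserNotFound> {
  readonly _tag: "GetUser"
  readonly id: string
}
const GetUser = Request.tagged<GetUser>("GetUser")

// Counts how many times the resolver is invoked
let batchCalls = 0

const GetUserResolver = RequestResolver.makeBatched(
  (requests: ReadonlyArray<GetUser>) =>
    Effect.gen(function* () {
      batchCalls += 1
      // In-memory stand-in for a real batched lookup
      yield* Effect.forEach(
        requests,
        (request) =>
          Request.succeed(request, {
            id: request.id,
            name: `user-${request.id}`
          }),
        { discard: true }
      )
    })
)

const getUser = (id: string) =>
  Effect.request(GetUser({ id }), GetUserResolver)

const batchOutcome = Effect.runPromise(
  Effect.all([getUser("1"), getUser("2"), getUser("3")], {
    concurrency: "unbounded",
    batching: true
  }).pipe(
    Effect.map((users) => ({
      names: users.map((user) => user.name),
      batchCalls
    }))
  )
)
```

With batching enabled the three lookups are expected to reach the resolver as a single batch, so `batchCalls` should be 1 while `names` still lists all three users in request order.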
Best Practices
- Use Effect.all for Parallel Work: Don't fork manually when Effect.all with a concurrency option suffices.
- Limit Concurrency: Set appropriate concurrency limits to avoid resource exhaustion.
- Handle Interruption: Register cleanup with finalizers such as Effect.ensuring or Effect.onInterrupt, and keep critical sections in uninterruptible regions.
- Use Scoped Forks: Tie fiber lifetime to scopes for automatic cleanup.
- Avoid Busy Loops: Use Effect.forever with a sleep between iterations for background tasks.
- Batch Requests: Use request resolvers to batch and deduplicate.
- Timeout Long Operations: Add timeouts to prevent hanging.
- Monitor Fiber Status: Use Fiber.await and Fiber.poll for status checks.
- Use Daemon Sparingly: Only fork daemons when truly independent.
- Test Concurrent Code: Write tests for race conditions and interruption.
Common Pitfalls
- Forgetting to Join: Forking without joining loses results.
- No Concurrency Limits: Unbounded concurrency can exhaust resources.
- Not Handling Interruption: Missing cleanup in interruptible regions.
- Race Conditions: Sharing mutable state between fibers.
- Deadlocks: Circular dependencies between fibers.
- Ignoring Failures: Not checking fiber exit status.
- Memory Leaks: Daemon fibers that never terminate.
- Over-Forking: Creating too many fibers unnecessarily.
- Missing Timeouts: Long-running operations without limits.
- Wrong Execution Mode: Using sequential when parallel is intended.
When to Use This Skill
Use effect-concurrency when you need to:
- Execute multiple operations in parallel
- Build high-performance data pipelines
- Handle concurrent user requests
- Implement background workers
- Race multiple data sources
- Add timeouts to operations
- Build concurrent job processors
- Manage fiber lifecycles
- Implement request deduplication
- Optimize throughput with batching
Resources
Official Documentation
Related Skills
- effect-core-patterns - Basic Effect operations
- effect-resource-management - Resource cleanup with scopes