effect-concurrency

Master concurrent execution in Effect using fibers. This skill covers forking, joining, interruption, parallel execution, and advanced concurrency patterns for building high-performance Effect applications.

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "effect-concurrency" with this command: npx skills add thebushidocollective/han/thebushidocollective-han-effect-concurrency

Effect Concurrency

Master concurrent execution in Effect using fibers. This skill covers forking, joining, interruption, parallel execution, and advanced concurrency patterns for building high-performance Effect applications.

Fibers Fundamentals

What are Fibers?

Fibers are lightweight virtual threads that execute effects concurrently:

import { Effect, Fiber } from "effect"

// Every effect runs on a fiber const effect = Effect.succeed(42) // When run, this executes on a fiber

// Effects are descriptions - fibers are executions // Effect: lazy, immutable description // Fiber: running execution with state

Forking Effects

Create independent concurrent fibers:

import { Effect, Fiber } from "effect"

const task = Effect.gen(function* () { yield* Effect.sleep("1 second") yield* Effect.log("Task completed") return 42 })

const program = Effect.gen(function* () { // Fork creates a new fiber const fiber = yield* Effect.fork(task) // fiber: RuntimeFiber<number, never>

yield* Effect.log("Main fiber continues")

// Join waits for fiber to complete const result = yield* Fiber.join(fiber) yield* Effect.log(Result: ${result})

return result })

Fiber Operations

import { Effect, Fiber } from "effect"

const program = Effect.gen(function* () { const fiber = yield* Effect.fork(longRunningTask)

// Join - wait for result const result = yield* Fiber.join(fiber)

// Await - get Exit value (success/failure/interruption) const exit = yield* Fiber.await(fiber)

// Interrupt - cancel execution yield* Fiber.interrupt(fiber)

// Poll - check if complete (non-blocking) const status = yield* Fiber.poll(fiber) })

Parallel Execution

Effect.all - Run Multiple Effects

import { Effect } from "effect"

// Parallel execution (default) const program = Effect.gen(function* () { const results = yield* Effect.all([ fetchUser("1"), fetchUser("2"), fetchUser("3") ]) // All requests run concurrently return results })

// Sequential execution const sequential = Effect.gen(function* () { const results = yield* Effect.all([ fetchUser("1"), fetchUser("2"), fetchUser("3") ], { concurrency: 1 }) return results })

// Limited concurrency const limited = Effect.gen(function* () { const results = yield* Effect.all( Array.from({ length: 100 }, (_, i) => fetchUser(${i})), { concurrency: 10 } // Max 10 concurrent ) return results })

Effect.all with Batching

import { Effect } from "effect"

// Batching for efficiency const batchFetch = Effect.gen(function* () { const userIds = Array.from({ length: 1000 }, (_, i) => ${i})

const results = yield* Effect.all( userIds.map(id => fetchUser(id)), { concurrency: 50, // 50 concurrent requests batching: true // Enable batching optimization } )

return results })

Effect.forEach - Concurrent Iteration

import { Effect } from "effect"

const processUsers = (userIds: string[]) => Effect.forEach( userIds, (id) => Effect.gen(function* () { const user = yield* fetchUser(id) const processed = yield* processUser(user) return processed }), { concurrency: "unbounded" } // No limit )

// With concurrency limit const processUsersLimited = (userIds: string[]) => Effect.forEach( userIds, (id) => processUser(id), { concurrency: 10 } )

Racing Effects

Effect.race - First to Complete

import { Effect } from "effect"

const fetchWithFallback = (id: string) => Effect.race( fetchFromPrimaryDb(id), fetchFromSecondaryDb(id) ) // Returns whichever completes first

// Racing multiple effects const fastestSource = Effect.race( fetchFromSource1(), fetchFromSource2(), fetchFromSource3() )

Effect.raceAll - Race Multiple Effects

import { Effect } from "effect"

const sources = [ fetchFromSource1(), fetchFromSource2(), fetchFromSource3() ]

// First to succeed wins const fastest = Effect.raceAll(sources)

Timeout Racing

import { Effect } from "effect"

const withTimeout = <A, E, R>( effect: Effect.Effect<A, E, R>, duration: Duration.Duration ) => Effect.race( effect, Effect.sleep(duration).pipe( Effect.andThen(Effect.fail({ _tag: "Timeout" })) ) )

const program = Effect.gen(function* () { const result = yield* withTimeout( slowOperation(), Duration.seconds(5) ) return result })

Interruption

Fiber Interruption

import { Effect, Fiber } from "effect"

const program = Effect.gen(function* () { const fiber = yield* Effect.fork(longRunningTask)

// Cancel after 1 second yield* Effect.sleep("1 second") yield* Fiber.interrupt(fiber)

yield* Effect.log("Task cancelled") })

// Automatic interruption on parent exit const autoInterrupt = Effect.gen(function* () { const fiber = yield* Effect.fork(infiniteLoop) // fiber will be interrupted when this effect completes })

Uninterruptible Regions

import { Effect } from "effect"

const criticalSection = Effect.gen(function* () { // This region cannot be interrupted yield* Effect.uninterruptible( Effect.gen(function* () { yield* beginTransaction() yield* updateDatabase() yield* commitTransaction() }) ) })

// Interruptible regions within uninterruptible const mixed = Effect.uninterruptible( Effect.gen(function* () { yield* criticalOperation1()

// Allow interruption here
yield* Effect.interruptible(
  nonCriticalOperation()
)

yield* criticalOperation2()

}) )

Daemon Fibers

Fork Daemon - Independent Fibers

import { Effect } from "effect"

const program = Effect.gen(function* () { // Regular fork - interrupted when parent exits const regularFiber = yield* Effect.fork(task)

// Daemon fork - survives parent exit const daemonFiber = yield* Effect.forkDaemon(backgroundTask)

// Parent exits, regularFiber interrupted, daemonFiber continues })

// Background worker example const startBackgroundWorker = Effect.gen(function* () { yield* Effect.forkDaemon( Effect.gen(function* () { while (true) { yield* processQueue() yield* Effect.sleep("1 second") } }) ) })

Scoped Concurrency

Effect.forkScoped - Fiber Cleanup

import { Effect, Scope } from "effect"

const program = Effect.gen(function* () { yield* Effect.scoped( Effect.gen(function* () { // Fibers are tied to scope const fiber1 = yield* Effect.forkScoped(task1) const fiber2 = yield* Effect.forkScoped(task2)

  // Do work
  yield* doWork()

  // Scope exit automatically interrupts fibers
})

) // fiber1 and fiber2 are interrupted here })

Fork In Scope

import { Effect } from "effect"

const managedConcurrency = Effect.gen(function* () { const scope = yield* Scope.make()

// Fork in specific scope const fiber = yield* Effect.forkIn(task, scope)

// Work continues yield* doWork()

// Close scope, interrupt fiber yield* Scope.close(scope, Exit.succeed(undefined)) })

Advanced Patterns

Worker Pool

import { Effect, Queue } from "effect"

interface Task { id: string data: unknown }

const createWorkerPool = (workers: number) => Effect.gen(function* () { const queue = yield* Queue.bounded<Task>(100)

// Start workers
const workerFibers = yield* Effect.all(
  Array.from({ length: workers }, () =>
    Effect.fork(
      Effect.forever(
        Effect.gen(function* () {
          const task = yield* Queue.take(queue)
          yield* processTask(task)
        })
      )
    )
  )
)

return {
  submit: (task: Task) => Queue.offer(queue, task),
  shutdown: () =>
    Effect.all(
      workerFibers.map(fiber => Fiber.interrupt(fiber))
    )
}

})

Parallel Map-Reduce

import { Effect, Chunk } from "effect"

const parallelMapReduce = <A, B, E, R>( items: A[], map: (item: A) => Effect.Effect<B, E, R>, reduce: (acc: B, item: B) => B, initial: B, concurrency: number ) => Effect.gen(function* () { const mapped = yield* Effect.forEach( items, map, { concurrency } )

return mapped.reduce(reduce, initial)

})

Request Deduplication

import { Effect, Request, RequestResolver } from "effect"

interface GetUser extends Request.Request<User, UserNotFound> { readonly _tag: "GetUser" readonly id: string }

const GetUserResolver = RequestResolver.makeBatched( (requests: GetUser[]) => Effect.gen(function* () { const ids = requests.map(r => r.id) const users = yield* fetchUsersBatch(ids)

  // Resolve all requests
  return Effect.forEach(requests, (request) => {
    const user = users.find(u => u.id === request.id)
    return user
      ? Request.complete(request, user)
      : Request.fail(request, { _tag: "UserNotFound", id: request.id })
  })
})

)

// Multiple concurrent requests for same ID deduplicated const program = Effect.gen(function* () { const results = yield* Effect.all([ Effect.request(GetUser({ id: "1" }), GetUserResolver), Effect.request(GetUser({ id: "1" }), GetUserResolver), Effect.request(GetUser({ id: "1" }), GetUserResolver) ]) // Only one actual fetch for ID "1" })

Best Practices

Use Effect.all for Parallel Work: Don't fork manually when Effect.all suffices.

Limit Concurrency: Set appropriate concurrency limits to avoid resource exhaustion.

Handle Interruption: Ensure cleanup code runs in uninterruptible regions.

Use Scoped Forks: Tie fiber lifetime to scopes for automatic cleanup.

Avoid Infinite Loops: Use Effect.forever with sleep for background tasks.

Batch Requests: Use request resolvers to batch and deduplicate.

Timeout Long Operations: Add timeouts to prevent hanging.

Monitor Fiber Status: Use Fiber.await and Fiber.poll for status checks.

Use Daemon Sparingly: Only fork daemons when truly independent.

Test Concurrent Code: Write tests for race conditions and interruption.

Common Pitfalls

Forgetting to Join: Forking without joining loses results.

No Concurrency Limits: Unbounded concurrency can exhaust resources.

Not Handling Interruption: Missing cleanup in interruptible regions.

Race Conditions: Sharing mutable state between fibers.

Deadlocks: Circular dependencies between fibers.

Ignoring Failures: Not checking fiber exit status.

Memory Leaks: Daemon fibers that never terminate.

Over-Forking: Creating too many fibers unnecessarily.

Missing Timeouts: Long-running operations without limits.

Wrong Execution Mode: Using sequential when parallel is intended.

When to Use This Skill

Use effect-concurrency when you need to:

  • Execute multiple operations in parallel

  • Build high-performance data pipelines

  • Handle concurrent user requests

  • Implement background workers

  • Race multiple data sources

  • Add timeouts to operations

  • Build concurrent job processors

  • Manage fiber lifecycles

  • Implement request deduplication

  • Optimize throughput with batching

Resources

Official Documentation

  • Concurrency

  • Fibers

  • Concurrency Options

  • Racing

  • Interruption

Related Skills

  • effect-core-patterns - Basic Effect operations

  • effect-resource-management - Resource cleanup with scopes

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

android-jetpack-compose

No summary provided by upstream source.

Repository SourceNeeds Review
General

fastapi-async-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
General

storybook-story-writing

No summary provided by upstream source.

Repository SourceNeeds Review
General

atomic-design-fundamentals

No summary provided by upstream source.

Repository SourceNeeds Review