streams

This skill should be used when the user asks about "Effect Stream", "Stream.from", "Stream.map", "Stream.filter", "Stream.run", "streaming data", "async iteration", "Sink", "Channel", "Stream.concat", "Stream.merge", "backpressure", "Stream.fromIterable", "chunked processing", "real-time data", or needs to understand how Effect handles streaming data processing.


Streams in Effect

Overview

Effect Streams provide:

  • Lazy evaluation - Elements produced on demand
  • Resource safety - Automatic cleanup
  • Backpressure - Producer/consumer coordination
  • Composition - Transform, filter, merge streams
  • Error handling - Typed errors in stream pipeline
Stream<A, E, R>
// Produces values of type A
// May fail with error E
// Requires environment R

Creating Streams

From Values

import { Stream } from "effect";

const numbers = Stream.make(1, 2, 3, 4, 5);

const fromArray = Stream.fromIterable([1, 2, 3]);

const empty = Stream.empty;

const single = Stream.succeed(42);

const infinite = Stream.iterate(1, (n) => n + 1);

From Effects

const fromEffect = Stream.fromEffect(fetchData());

const polling = Stream.repeatEffect(checkStatus());

const scheduled = Stream.repeatEffectWithSchedule(checkStatus(), Schedule.spaced("5 seconds"));

From Async Sources

// From async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(asyncGenerator(), (error) => new StreamError({ cause: error }));

// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
  const handler = (value: number) => emit.single(value);
  eventEmitter.on("data", handler);
  return Effect.sync(() => eventEmitter.off("data", handler));
});

// From queue
const fromQueue = Stream.fromQueue(queue);

Generating Streams

const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]));

const range = Stream.range(1, 100);

const repeated = Stream.forever(Stream.succeed("ping")).pipe(Stream.take(5));

Transforming Streams

map - Transform Elements

const doubled = numbers.pipe(Stream.map((n) => n * 2));

const enriched = users.pipe(Stream.mapEffect((user) => fetchProfile(user.id)));

const parallel = items.pipe(Stream.mapEffect(process, { concurrency: 10 }));

filter - Select Elements

const evens = numbers.pipe(Stream.filter((n) => n % 2 === 0));

const valid = items.pipe(Stream.filterEffect((item) => validate(item)));

flatMap - Nested Streams

const expanded = numbers.pipe(Stream.flatMap((n) => Stream.make(n, n * 10, n * 100)));
// 1, 10, 100, 2, 20, 200, ...

take/drop

const first5 = numbers.pipe(Stream.take(5));
const skip5 = numbers.pipe(Stream.drop(5));
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10));
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10));

Combining Streams

concat - Sequential

const combined = Stream.concat(stream1, stream2);
// or, pipeline style
const combinedPiped = stream1.pipe(Stream.concat(stream2));

merge - Interleaved

// Interleave elements from both
const merged = Stream.merge(stream1, stream2);

// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 });

zip - Pair Elements

const zipped = Stream.zip(names, ages);
// Stream<[string, number]>

// With function
const combined = Stream.zipWith(names, ages, (name, age) => ({ name, age }));

interleave

const interleaved = Stream.interleave(stream1, stream2);
// a1, b1, a2, b2, ...

Consuming Streams

Running to Collection

const array = yield* Stream.runCollect(numbers);

const first = yield* Stream.runHead(numbers);

const sum = yield* Stream.runFold(numbers, 0, (acc, n) => acc + n);

Running for Effects

yield* numbers.pipe(Stream.runForEach((n) => Effect.log(`Got: ${n}`)));

yield* numbers.pipe(Stream.runDrain);

Running to Sink

import { Sink } from "effect";

const sum = yield* numbers.pipe(Stream.run(Sink.sum));

const array = yield* numbers.pipe(Stream.run(Sink.collectAll()));

Chunking

Streams process elements in chunks for efficiency:

const chunked = numbers.pipe(Stream.grouped(10));

const processed = numbers.pipe(Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2)));

const rechunked = numbers.pipe(Stream.rechunk(100));

Error Handling

const safe = stream.pipe(Stream.catchAll((error) => Stream.succeed(fallbackValue)));

const handled = stream.pipe(Stream.catchTag("NetworkError", (error) => Stream.succeed(cachedValue)));

const resilient = stream.pipe(Stream.retry(Schedule.exponential("1 second")));

const withFallback = stream.pipe(Stream.orElse(() => fallbackStream));

Resource Management

// Stream with resource lifecycle (assumes Node's fs module)
import * as fs from "node:fs";

const fileStream = Stream.acquireRelease(
  Effect.sync(() => fs.openSync("data.txt", "r")),
  (fd) => Effect.sync(() => fs.closeSync(fd)),
).pipe(
  Stream.flatMap((fd) =>
    Stream.repeatEffectOption(
      // repeatEffectOption ends the stream when the effect fails with Option.none()
      Effect.suspend(() => {
        const buffer = Buffer.alloc(1024);
        const bytes = fs.readSync(fd, buffer);
        return bytes > 0
          ? Effect.succeed(buffer.subarray(0, bytes))
          : Effect.fail(Option.none());
      }),
    ),
  ),
);

// Scoped streams
const scoped = Stream.scoped(Effect.acquireRelease(openConnection, closeConnection));

Sinks

Sinks consume stream elements:

import { Sink } from "effect";

Sink.sum;
Sink.count;
Sink.head;
Sink.last;
Sink.collectAll();
Sink.forEach(f);

const maxSink = Sink.foldLeft(Number.NEGATIVE_INFINITY, (max, n: number) => Math.max(max, n));

Common Patterns

Batched Processing

const batchProcess = stream.pipe(
  Stream.grouped(100),
  Stream.mapEffect((batch) => Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))),
);

Rate Limiting

const rateLimited = stream.pipe(
  Stream.throttle({
    units: 1,
    duration: "100 millis",
    strategy: "shape",
  }),
);

Debouncing

const debounced = stream.pipe(Stream.debounce("500 millis"));

Windowing

// Time-based windows
const windows = stream.pipe(Stream.groupedWithin(1000, "1 second"));

Best Practices

  1. Use chunking for efficiency - Batch operations when possible
  2. Handle backpressure - Use appropriate buffer strategies
  3. Clean up resources - Use acquireRelease for external resources
  4. Process in parallel - Use concurrency option in mapEffect
  5. Handle errors early - Catch/retry before final consumption

Additional Resources

For comprehensive stream documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.

Search for these sections:

  • "Creating Streams" for stream construction
  • "Consuming Streams" for running streams
  • "Operations" for transformations
  • "Error Handling in Streams" for error patterns
  • "Resourceful Streams" for resource management
  • "Sink" for custom sinks
