implementing-io-pipelines

.NET Streaming (System.IO.Pipelines)

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "implementing-io-pipelines" with this command: npx skills add christian289/dotnet-with-claudecode/christian289-dotnet-with-claudecode-implementing-io-pipelines

.NET Streaming (System.IO.Pipelines)

A guide for System.IO.Pipelines API for high-performance I/O pipelines.

Quick Reference: See QUICKREF.md for essential patterns at a glance.

  1. Core Concepts

Concept Description

Pipe

Memory buffer-based read/write pipe

PipeReader

Read data from pipe

PipeWriter

Write data to pipe

ReadOnlySequence<T>

Non-contiguous memory sequence

  1. Advantages
  • Zero-copy: Minimizes unnecessary memory copying

  • Backpressure control: Speed regulation between producer and consumer

  • Memory pooling: Automatic buffer reuse

  • Async I/O: Efficient asynchronous processing

  1. Basic Usage

using System.IO.Pipelines;

public sealed class PipelineProcessor { public async Task ProcessAsync(Stream stream) { var pipe = new Pipe();

    // Run Writer and Reader concurrently
    var writing = FillPipeAsync(stream, pipe.Writer);
    var reading = ReadPipeAsync(pipe.Reader);

    await Task.WhenAll(writing, reading);
}

private async Task FillPipeAsync(Stream stream, PipeWriter writer)
{
    const int minimumBufferSize = 512;

    while (true)
    {
        // Acquire buffer from memory pool
        Memory&#x3C;byte> memory = writer.GetMemory(minimumBufferSize);

        int bytesRead = await stream.ReadAsync(memory);

        if (bytesRead == 0)
            break;

        // Notify bytes written
        writer.Advance(bytesRead);

        // Flush data and notify Reader
        FlushResult result = await writer.FlushAsync();

        if (result.IsCompleted)
            break;
    }

    // Signal write completion
    await writer.CompleteAsync();
}

private async Task ReadPipeAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence&#x3C;byte> buffer = result.Buffer;

        // Process buffer
        ProcessBuffer(buffer);

        // Notify consumption up to processed position
        reader.AdvanceTo(buffer.End);

        if (result.IsCompleted)
            break;
    }

    // Signal read completion
    await reader.CompleteAsync();
}

}

  1. Line-by-Line Parsing

private async Task ReadLinesAsync(PipeReader reader) { while (true) { ReadResult result = await reader.ReadAsync(); ReadOnlySequence<byte> buffer = result.Buffer;

    while (TryReadLine(ref buffer, out ReadOnlySequence&#x3C;byte> line))
    {
        ProcessLine(line);
    }

    // Notify unprocessed data position
    reader.AdvanceTo(buffer.Start, buffer.End);

    if (result.IsCompleted)
        break;
}

await reader.CompleteAsync();

}

private bool TryReadLine( ref ReadOnlySequence<byte> buffer, out ReadOnlySequence<byte> line) { // Find newline SequencePosition? position = buffer.PositionOf((byte)'\n');

if (position is null)
{
    line = default;
    return false;
}

// Slice up to newline
line = buffer.Slice(0, position.Value);

// Move buffer past newline
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));

return true;

}

  1. Processing ReadOnlySequence

private void ProcessBuffer(ReadOnlySequence<byte> buffer) { if (buffer.IsSingleSegment) { // Single segment - direct access ProcessSpan(buffer.FirstSpan); } else { // Multiple segments - iteration required foreach (var segment in buffer) { ProcessSpan(segment.Span); } } }

  1. Network I/O Integration

public async Task ProcessSocketAsync(Socket socket) { var pipe = new Pipe();

var writing = ReceiveAsync(socket, pipe.Writer);
var reading = ProcessAsync(pipe.Reader);

await Task.WhenAll(writing, reading);

}

private async Task ReceiveAsync(Socket socket, PipeWriter writer) { while (true) { Memory<byte> memory = writer.GetMemory(4096);

    int bytesReceived = await socket.ReceiveAsync(
        memory,
        SocketFlags.None);

    if (bytesReceived == 0)
        break;

    writer.Advance(bytesReceived);

    FlushResult result = await writer.FlushAsync();

    if (result.IsCompleted)
        break;
}

await writer.CompleteAsync();

}

  1. PipeOptions Configuration

var pipeOptions = new PipeOptions( pool: MemoryPool<byte>.Shared, // Memory pool readerScheduler: PipeScheduler.ThreadPool, // Reader scheduler writerScheduler: PipeScheduler.ThreadPool, // Writer scheduler pauseWriterThreshold: 64 * 1024, // Writer pause threshold resumeWriterThreshold: 32 * 1024, // Writer resume threshold minimumSegmentSize: 4096, // Minimum segment size useSynchronizationContext: false );

var pipe = new Pipe(pipeOptions);

  1. Required NuGet Package

<ItemGroup> <!-- Included in BCL for .NET Core 3.0+ --> <PackageReference Include="System.IO.Pipelines" Version="9.0.*" /> </ItemGroup>

  1. Important Notes

AdvanceTo Call Required

// Must call AdvanceTo after ReadAsync ReadResult result = await reader.ReadAsync(); // ... processing ... reader.AdvanceTo(consumed, examined);

Buffer Lifetime

// ❌ Bad example: Saving buffer after ReadAsync ReadOnlySequence<byte> saved; var result = await reader.ReadAsync(); saved = result.Buffer; // Dangerous! Invalidated after AdvanceTo

// ✅ Good example: Copy needed data var copy = result.Buffer.ToArray(); reader.AdvanceTo(result.Buffer.End);

Completion Calls

// Must call CompleteAsync for both Writer and Reader await writer.CompleteAsync(); await reader.CompleteAsync();

  1. References
  • System.IO.Pipelines

  • High-performance I/O

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

converting-html-css-to-wpf-xaml

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

publishing-wpf-apps

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

using-xaml-property-element-syntax

No summary provided by upstream source.

Repository SourceNeeds Review
Coding

managing-styles-resourcedictionary

No summary provided by upstream source.

Repository SourceNeeds Review