implementing-pubsub-pattern

A guide to Pub-Sub patterns for event-based asynchronous communication in .NET.

Install skill "implementing-pubsub-pattern" with this command: npx skills add christian289/dotnet-with-claudecode/christian289-dotnet-with-claudecode-implementing-pubsub-pattern

.NET Pub-Sub Pattern

Quick Reference: See QUICKREF.md for essential patterns at a glance.

1. Core APIs

| API | Purpose | NuGet |
|-----|---------|-------|
| System.Reactive (Rx.NET) | Reactive event streams | System.Reactive |
| System.Threading.Channels | Async Producer-Consumer | BCL |
| IObservable<T> | Observable sequence | BCL |

2. System.Threading.Channels

2.1 Basic Usage

using System.Threading.Channels;

public sealed class MessageProcessor
{
    private readonly Channel<Message> _channel = Channel.CreateUnbounded<Message>();

    // Producer - Send message
    public async Task SendAsync(Message message)
    {
        await _channel.Writer.WriteAsync(message);
    }

    // Consumer - Process messages until the channel is completed
    public async Task ProcessAsync(CancellationToken ct)
    {
        await foreach (var message in _channel.Reader.ReadAllAsync(ct))
        {
            await HandleMessage(message);
        }
    }

    // Channel completion signal
    public void Complete() => _channel.Writer.Complete();
}
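The flow above can be sketched end to end as follows. This is a minimal illustration, not the author's implementation: the `Message` record and the `BasicChannelDemo` class are hypothetical names introduced here, and the handler simply records message IDs in a list.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical message type; the guide does not define Message.
public sealed record Message(int Id, string Text);

public static class BasicChannelDemo
{
    public static async Task<List<int>> RunAsync()
    {
        var channel = Channel.CreateUnbounded<Message>();
        var handled = new List<int>();

        // Consumer: drains the channel until the writer is completed.
        var consumer = Task.Run(async () =>
        {
            await foreach (var message in channel.Reader.ReadAllAsync())
            {
                handled.Add(message.Id);
            }
        });

        // Producer: writes three messages, then signals completion.
        for (var id = 1; id <= 3; id++)
        {
            await channel.Writer.WriteAsync(new Message(id, $"payload {id}"));
        }
        channel.Writer.Complete();

        // Finishes once every buffered message has been read.
        await consumer;
        return handled;
    }
}
```

Without the call to `Complete()`, `ReadAllAsync` would keep waiting for further messages and the consumer task would end only through cancellation.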

2.2 Bounded Channel (Backpressure Control)

// Backpressure control with a buffer size limit
var options = new BoundedChannelOptions(capacity: 100)
{
    FullMode = BoundedChannelFullMode.Wait, // Writers wait when the buffer is full
    SingleReader = true,
    SingleWriter = false
};

var channel = Channel.CreateBounded<Message>(options);

// WriteAsync waits until space is available
await channel.Writer.WriteAsync(message);
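To make the waiting behavior concrete, here is a small sketch with a capacity of 2 (chosen only to keep the example short). `BoundedChannelDemo` is a hypothetical name. It shows that `TryWrite` is rejected when the buffer is full, while `WriteAsync` stays pending until a reader frees a slot.

```csharp
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BoundedChannelDemo
{
    public static async Task<(bool AcceptedWhileFull, bool PendingBeforeRead, bool CompletedAfterRead)> RunAsync()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 2)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

        // Fill the buffer.
        channel.Writer.TryWrite(1);
        channel.Writer.TryWrite(2);

        // The buffer is full: the non-blocking TryWrite is rejected.
        bool acceptedWhileFull = channel.Writer.TryWrite(3);

        // WriteAsync does not fail; it stays pending until a slot frees up.
        Task pendingWrite = channel.Writer.WriteAsync(3).AsTask();
        bool pendingBeforeRead = !pendingWrite.IsCompleted;

        // Reading one item frees a slot, which lets the pending write finish.
        await channel.Reader.ReadAsync();
        await pendingWrite;

        return (acceptedWhileFull, pendingBeforeRead, pendingWrite.IsCompletedSuccessfully);
    }
}
```

This is the backpressure effect: a fast producer awaiting `WriteAsync` is slowed to the pace of the consumer instead of growing the buffer without limit.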

2.3 Multiple Consumer Pattern

using System.Linq;
using System.Threading.Channels;

public sealed class WorkerPool
{
    private readonly Channel<WorkItem> _channel;
    private readonly int _workerCount;

    public WorkerPool(int workerCount = 4)
    {
        _workerCount = workerCount;
        _channel = Channel.CreateUnbounded<WorkItem>();
    }

    // Starts the workers and waits for all of them to finish
    public async Task StartAsync(CancellationToken ct)
    {
        var workers = Enumerable.Range(0, _workerCount)
            .Select(_ => ProcessAsync(ct));

        await Task.WhenAll(workers);
    }

    // Workers compete for items: each item is delivered to exactly one worker
    private async Task ProcessAsync(CancellationToken ct)
    {
        await foreach (var item in _channel.Reader.ReadAllAsync(ct))
        {
            await ProcessItem(item);
        }
    }

    public ValueTask EnqueueAsync(WorkItem item) =>
        _channel.Writer.WriteAsync(item);
}
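The multiple-consumer pattern can be checked with a self-contained sketch. `CompetingConsumersDemo` is a hypothetical name and integers stand in for `WorkItem`. Several workers read from one channel, and every item ends up processed exactly once, although the order across workers is not deterministic.

```csharp
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class CompetingConsumersDemo
{
    public static async Task<int[]> RunAsync(int workerCount = 4, int itemCount = 20)
    {
        var channel = Channel.CreateUnbounded<int>();
        var processed = new ConcurrentBag<int>(); // safe for concurrent Add calls

        // Start the workers; ToArray materializes the sequence so each
        // worker task is created exactly once.
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(async () =>
            {
                await foreach (var item in channel.Reader.ReadAllAsync())
                {
                    processed.Add(item);
                }
            }))
            .ToArray();

        for (var i = 0; i < itemCount; i++)
        {
            await channel.Writer.WriteAsync(i);
        }

        // Completing the writer lets every worker's loop end.
        channel.Writer.Complete();
        await Task.WhenAll(workers);

        // Sorted, because the processing order is not deterministic.
        return processed.OrderBy(x => x).ToArray();
    }
}
```

Note that this is work distribution rather than broadcast: if every subscriber must see every message, each subscriber needs its own channel or an observable such as the one in the next section.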

3. System.Reactive (Rx.NET)

3.1 EventAggregator Pattern

using System.Reactive.Linq;
using System.Reactive.Subjects;

public sealed class EventAggregator : IDisposable
{
    private readonly Subject<object> _subject = new();

    // Subscribe to specific event type
    public IObservable<T> GetEvent<T>() =>
        _subject.OfType<T>().AsObservable();

    // Publish event
    public void Publish<T>(T @event) =>
        _subject.OnNext(@event!);

    public void Dispose() => _subject.Dispose();
}

3.2 Usage Example

// Event definitions
public record UserLoggedIn(string UserId);
public record OrderPlaced(int OrderId);

// Subscription
var aggregator = new EventAggregator();

aggregator.GetEvent<UserLoggedIn>()
    .Subscribe(e => Console.WriteLine($"User logged in: {e.UserId}"));

aggregator.GetEvent<OrderPlaced>()
    .Where(e => e.OrderId > 100)
    .Subscribe(e => Console.WriteLine($"Large order: {e.OrderId}"));

// Publish
aggregator.Publish(new UserLoggedIn("user123"));
aggregator.Publish(new OrderPlaced(150));

3.3 Rx Operators

// Debounce - Rx.NET's Throttle emits a value only after 300 ms of silence,
// so only the last event of a burst is processed
searchInput
    .Throttle(TimeSpan.FromMilliseconds(300))
    .DistinctUntilChanged()
    .Subscribe(query => Search(query));

// Buffer - Collect events for a period and process them as a batch
events
    .Buffer(TimeSpan.FromSeconds(5))
    .Subscribe(batch => ProcessBatch(batch));

// Retry - Retry on failure
observable
    .Retry(3)
    .Subscribe(
        onNext: data => Process(data),
        onError: ex => LogError(ex)
    );
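Time-based operators are awkward to demonstrate deterministically, so the following sketch swaps in the count-based `Buffer(3)` overload instead of `Buffer(TimeSpan)`, combined with `DistinctUntilChanged`. It assumes the System.Reactive package is referenced. `RxOperatorDemo` is a hypothetical name, and a `Subject<int>` stands in for a real event source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RxOperatorDemo
{
    public static List<string> Run()
    {
        var batches = new List<string>();
        using var source = new Subject<int>();

        using var subscription = source
            .DistinctUntilChanged()   // drops consecutive duplicates
            .Buffer(3)                // groups values into batches of three
            .Subscribe(batch => batches.Add(string.Join(",", batch)));

        foreach (var value in new[] { 1, 1, 2, 3, 3, 4, 5 })
        {
            source.OnNext(value);
        }

        // Completion flushes the final, partially filled batch.
        source.OnCompleted();

        return batches; // "1,2,3" followed by "4,5"
    }
}
```

The operators compose in order: duplicates are removed first, so the batches are built from 1, 2, 3, 4, 5 rather than from the seven raw values.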

4. Comparison: Channels vs Rx

| Feature | Channels | Rx.NET |
|---------|----------|--------|
| Purpose | Producer-Consumer | Event streams |
| Backpressure | Built-in (Bounded) | Separate implementation |
| Operators | Basic | Rich |
| Learning curve | Low | High |
| Dependency | BCL | NuGet |

5. DI Integration

// Program.cs - register one shared channel and expose its two ends
services.AddSingleton(Channel.CreateUnbounded<Message>());
services.AddSingleton(sp => sp.GetRequiredService<Channel<Message>>().Reader);
services.AddSingleton(sp => sp.GetRequiredService<Channel<Message>>().Writer);

// Producer (primary constructors require C# 12)
public sealed class Producer(ChannelWriter<Message> writer)
{
    public ValueTask SendAsync(Message msg) => writer.WriteAsync(msg);
}

// Consumer
public sealed class Consumer(ChannelReader<Message> reader)
{
    public async Task ProcessAsync(CancellationToken ct)
    {
        await foreach (var msg in reader.ReadAllAsync(ct))
        {
            await Handle(msg);
        }
    }
}

6. Required NuGet Package

<ItemGroup>
  <PackageReference Include="System.Reactive" Version="6.0.*" />
</ItemGroup>

7. Important Notes

Memory Leaks

// Subscription disposal is required
var subscription = observable.Subscribe(handler);

// After use
subscription.Dispose();
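The effect of disposal can be observed directly. The sketch below assumes the System.Reactive package is referenced. `SubscriptionLifetimeDemo` is a hypothetical name. It shows that a disposed subscription no longer receives events and that the subject no longer holds a reference to the handler.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class SubscriptionLifetimeDemo
{
    public static (List<int> Received, bool HasObserversAfterDispose) Run()
    {
        var received = new List<int>();
        using var subject = new Subject<int>();

        IDisposable subscription = subject.Subscribe(value => received.Add(value));
        subject.OnNext(1); // delivered

        // Disposing detaches the handler from the subject.
        subscription.Dispose();
        subject.OnNext(2); // not delivered

        return (received, subject.HasObservers);
    }
}
```

As long as a subscription is not disposed, the source keeps the handler and everything it captures reachable, which is how long-lived sources such as a singleton aggregator end up leaking short-lived subscribers.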

Thread Safety

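  • Channels are thread-safe by default: multiple producers and consumers can write and read concurrently

  • Subject<T> does not serialize concurrent OnNext calls; wrap it with Subject.Synchronize(...) when several threads publish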
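A sketch of the second point, assuming the System.Reactive package is referenced (`SynchronizedSubjectDemo` is a hypothetical name): several threads publish through the wrapper returned by `Subject.Synchronize`, so the deliberately non-atomic counter in the subscriber is only ever touched by one thread at a time.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading.Tasks;

public static class SynchronizedSubjectDemo
{
    public static int Run(int publishers = 8, int eventsPerPublisher = 10_000)
    {
        using var subject = new Subject<int>();

        // Wrapper that serializes OnNext calls coming from different threads.
        ISubject<int> gate = Subject.Synchronize(subject);

        var count = 0;
        // count++ is not atomic; it is only safe here because calls are serialized.
        using var subscription = subject.Subscribe(_ => count++);

        Parallel.For(0, publishers, _ =>
        {
            for (var i = 0; i < eventsPerPublisher; i++)
            {
                gate.OnNext(i); // publish through the wrapper, not the raw subject
            }
        });

        return count; // publishers * eventsPerPublisher
    }
}
```

Publishing on the raw `subject` from the same parallel loop would allow overlapping `OnNext` calls, and the count could come up short.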

Backpressure Handling

// Prevent memory explosion with a Bounded Channel
var channel = Channel.CreateBounded<Message>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.DropOldest // Drop old messages
});
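What DropOldest does to the buffer contents can be seen with a tiny capacity. In this sketch (`DropOldestDemo` is a hypothetical name, and a capacity of 3 is used only for brevity) five values are written without any reader, and only the newest three survive.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;

public static class DropOldestDemo
{
    public static List<int> Run()
    {
        var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 3)
        {
            FullMode = BoundedChannelFullMode.DropOldest
        });

        for (var i = 1; i <= 5; i++)
        {
            // Never rejected in DropOldest mode: when the buffer is full,
            // the oldest buffered item is discarded to make room.
            channel.Writer.TryWrite(i);
        }
        channel.Writer.Complete();

        // Buffered items remain readable after completion.
        var remaining = new List<int>();
        while (channel.Reader.TryRead(out var item))
        {
            remaining.Add(item);
        }
        return remaining; // 3, 4, 5
    }
}
```

The trade-off is that writers are never slowed down, but data is silently lost. This suits telemetry or UI updates where only recent values matter, while `Wait` is the safer choice when every message must be processed.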
