dual-stream-architecture

Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture.

Safety Notice

This listing is from the official public ClawHub registry. Review SKILL.md and referenced scripts before running.


Dual-Stream Architecture

Publish events to Kafka (durability) and Redis Pub/Sub (real-time) simultaneously for systems needing both guaranteed delivery and instant updates.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install dual-stream-architecture

When to Use

  • Event-driven systems needing both durability AND real-time
  • WebSocket/SSE backends that push live updates
  • Dashboards showing events as they happen
  • Systems where Kafka consumers lag but users expect instant updates

Core Pattern

type DualPublisher struct {
    kafka  *kafka.Writer
    redis  *redis.Client
    logger *slog.Logger
}

func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
    // 1. Kafka: critical path - must succeed
    payload, err := json.Marshal(event)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }
    if err := p.kafka.WriteMessages(ctx, kafka.Message{
        Key:   []byte(event.SourceID),
        Value: payload,
    }); err != nil {
        return fmt.Errorf("kafka publish failed: %w", err)
    }

    // 2. Redis: Best-effort - don't fail the operation
    p.publishToRedis(ctx, event)

    return nil
}

func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
    // Lightweight payload (full event in Kafka)
    notification := map[string]interface{}{
        "id":        event.ID,
        "type":      event.Type,
        "source_id": event.SourceID,
    }

    payload, err := json.Marshal(notification)
    if err != nil {
        p.logger.Warn("marshal notification failed", "error", err)
        return
    }
    channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)

    // Fire and forget - log errors but don't propagate
    if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
        p.logger.Warn("redis publish failed", "error", err)
    }
}

Architecture

┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│   Ingester   │────▶│  DualPublisher  │────▶│    Kafka     │──▶ Event Processor
│              │     │                 │     │  (durable)   │
└──────────────┘     │                 │     └──────────────┘
                     │                 │     ┌──────────────┐
                     │                 │────▶│ Redis PubSub │──▶ WebSocket Gateway
                     │                 │     │ (real-time)  │
                     └─────────────────┘     └──────────────┘

Channel Naming Convention

events:{source_type}:{source_id}

Examples:
- events:user:octocat      - Events for user octocat
- events:repo:owner/repo   - Events for a repository
- events:org:microsoft     - Events for an organization
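
The convention above can be sketched as a tiny helper. Function and variable names here (`Channel`) are illustrative, not part of the skill's API; the skill only fixes the string format:

```go
package main

import "fmt"

// Channel builds a Pub/Sub channel name following the
// events:{source_type}:{source_id} convention. Gateways that need
// every channel of one source type can match with the Redis
// PSUBSCRIBE pattern "events:user:*".
func Channel(sourceType, sourceID string) string {
	return fmt.Sprintf("events:%s:%s", sourceType, sourceID)
}

func main() {
	fmt.Println(Channel("user", "octocat")) // events:user:octocat
	fmt.Println(Channel("repo", "owner/repo"))
	fmt.Println(Channel("org", "microsoft"))
}
```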

Batch Publishing

For high throughput:

func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
    // 1. Batch to Kafka
    messages := make([]kafka.Message, len(events))
    for i, event := range events {
        payload, err := json.Marshal(event)
        if err != nil {
            return fmt.Errorf("marshal event %s: %w", event.ID, err)
        }
        messages[i] = kafka.Message{
            Key:   []byte(event.SourceID),
            Value: payload,
        }
    }

    if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
        return fmt.Errorf("kafka batch failed: %w", err)
    }

    // 2. Redis: Pipeline for efficiency
    pipe := p.redis.Pipeline()
    for _, event := range events {
        channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
        // Marshal of a flat string map cannot fail; the error is safely ignored.
        notification, _ := json.Marshal(map[string]interface{}{
            "id":        event.ID,
            "type":      event.Type,
            "source_id": event.SourceID,
        })
        pipe.Publish(ctx, channel, notification)
    }
    
    if _, err := pipe.Exec(ctx); err != nil {
        p.logger.Warn("redis batch failed", "error", err)
    }

    return nil
}

Decision Tree

| Requirement | Stream | Why |
|---|---|---|
| Must not lose event | Kafka only | Ack required, replicated |
| User sees immediately | Redis only | Sub-ms delivery |
| Both durability + real-time | Dual stream | This pattern |
| High volume (>10k/sec) | Kafka, batch Redis | Redis can bottleneck |
| Many subscribers per channel | Redis + local fan-out | Don't hammer Redis |
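
The "Redis + local fan-out" row can be sketched as follows: one upstream Redis subscription per process, rebroadcast in-process to every local client. The `FanOut` type and its methods are illustrative names, not part of the skill:

```go
package main

import (
	"fmt"
	"sync"
)

// FanOut holds one upstream subscription (e.g. a single Redis PSUBSCRIBE
// per process) and rebroadcasts each message to every local subscriber,
// so N WebSocket clients cost one Redis subscription instead of N.
type FanOut struct {
	mu   sync.Mutex
	subs map[chan string]struct{}
}

func NewFanOut() *FanOut {
	return &FanOut{subs: make(map[chan string]struct{})}
}

// Subscribe registers a local listener; the returned channel is buffered
// so a slow client cannot block the broadcast loop.
func (f *FanOut) Subscribe() chan string {
	ch := make(chan string, 16)
	f.mu.Lock()
	f.subs[ch] = struct{}{}
	f.mu.Unlock()
	return ch
}

// Broadcast delivers msg to every subscriber, dropping it for any
// subscriber whose buffer is full (best-effort, like Pub/Sub itself).
func (f *FanOut) Broadcast(msg string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for ch := range f.subs {
		select {
		case ch <- msg:
		default: // slow subscriber: drop rather than block
		}
	}
}

func main() {
	f := NewFanOut()
	a, b := f.Subscribe(), f.Subscribe()
	f.Broadcast(`{"id":"evt-1","type":"push"}`)
	fmt.Println(<-a) // both local subscribers receive the one message
	fmt.Println(<-b)
}
```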

NEVER Do

  • NEVER fail on Redis errors — Redis is best-effort. Log and continue.
  • NEVER send full payload to Redis — Send IDs only, clients fetch from API.
  • NEVER create one Redis channel per event — Use source-level channels.
  • NEVER skip Kafka for "unimportant" events — All events go to Kafka for replay.
  • NEVER use Redis Pub/Sub for persistence — Messages are fire-and-forget.
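
The "IDs only" rule above can be sketched as a projection from the full Kafka event to the lightweight Redis payload. The `Event` and `Notification` types here are hypothetical shapes, not the skill's actual schema:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Event is a hypothetical full event as stored in Kafka.
type Event struct {
	ID       string `json:"id"`
	Type     string `json:"type"`
	SourceID string `json:"source_id"`
	Payload  string `json:"payload"` // potentially large; never sent over Redis
}

// Notification is the lightweight Redis payload: identifiers only,
// so clients fetch the full event from the API by ID.
type Notification struct {
	ID       string `json:"id"`
	Type     string `json:"type"`
	SourceID string `json:"source_id"`
}

// ToNotification strips the event down to what a subscriber needs
// in order to fetch the rest.
func ToNotification(e Event) ([]byte, error) {
	return json.Marshal(Notification{ID: e.ID, Type: e.Type, SourceID: e.SourceID})
}

func main() {
	e := Event{ID: "evt-1", Type: "push", SourceID: "octocat", Payload: "large body"}
	b, err := ToNotification(e)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"id":"evt-1","type":"push","source_id":"octocat"}
}
```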

Edge Cases

| Case | Solution |
|---|---|
| Redis down | Log warning, continue with Kafka only |
| Client connects mid-stream | Query API for recent events, then subscribe |
| High channel cardinality | Use wildcard patterns or aggregate channels |
| Kafka backpressure | Buffer in memory with timeout, fail if full |
| Need event replay | Consume from Kafka from offset, not Redis |
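
The mid-stream connect case involves one subtlety: an event can arrive via both the API catch-up query and the live subscription. A minimal dedupe sketch, with the function name `MergeCatchUp` and ID-slice representation chosen purely for illustration:

```go
package main

import "fmt"

// MergeCatchUp merges event IDs fetched from the API on connect
// with IDs arriving on the live subscription, dropping live IDs
// already delivered by the catch-up query.
func MergeCatchUp(recent, live []string) []string {
	seen := make(map[string]struct{}, len(recent))
	out := make([]string, 0, len(recent)+len(live))
	for _, id := range recent {
		seen[id] = struct{}{}
		out = append(out, id)
	}
	for _, id := range live {
		if _, dup := seen[id]; dup {
			continue // already delivered via the API catch-up
		}
		seen[id] = struct{}{}
		out = append(out, id)
	}
	return out
}

func main() {
	recent := []string{"evt-1", "evt-2"}
	live := []string{"evt-2", "evt-3"} // evt-2 arrived on both paths
	fmt.Println(MergeCatchUp(recent, live)) // [evt-1 evt-2 evt-3]
}
```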
