dual-stream-architecture

Dual-stream event publishing combining Kafka for durability with Redis Pub/Sub for real-time delivery. Use when building event-driven systems needing both guaranteed delivery and low-latency updates. Triggers on dual stream, event publishing, Kafka Redis, real-time events, pub/sub, streaming architecture.

Safety Notice

This listing is from the official public ClawHub registry. Review SKILL.md and referenced scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "dual-stream-architecture" with this command: npx skills add wpank/dual-stream-architecture

Dual-Stream Architecture

Publish events to Kafka (durability) and Redis Pub/Sub (real-time) simultaneously for systems needing both guaranteed delivery and instant updates.

Installation

OpenClaw / Moltbot / Clawbot

npx clawhub@latest install dual-stream-architecture

When to Use

  • Event-driven systems needing both durability AND real-time
  • WebSocket/SSE backends that push live updates
  • Dashboards showing events as they happen
  • Kafka consumers have lag but users expect instant updates

Core Pattern

type DualPublisher struct {
    kafka  *kafka.Writer
    redis  *redis.Client
    logger *slog.Logger
}

func (p *DualPublisher) Publish(ctx context.Context, event Event) error {
    // 1. Kafka: Critical path - must succeed
    payload, _ := json.Marshal(event)
    err := p.kafka.WriteMessages(ctx, kafka.Message{
        Key:   []byte(event.SourceID),
        Value: payload,
    })
    if err != nil {
        return fmt.Errorf("kafka publish failed: %w", err)
    }

    // 2. Redis: Best-effort - don't fail the operation
    p.publishToRedis(ctx, event)

    return nil
}

func (p *DualPublisher) publishToRedis(ctx context.Context, event Event) {
    // Lightweight payload (full event in Kafka)
    notification := map[string]interface{}{
        "id":        event.ID,
        "type":      event.Type,
        "source_id": event.SourceID,
    }

    payload, _ := json.Marshal(notification)
    channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)

    // Fire and forget - log errors but don't propagate
    if err := p.redis.Publish(ctx, channel, payload).Err(); err != nil {
        p.logger.Warn("redis publish failed", "error", err)
    }
}

Architecture

┌──────────────┐     ┌─────────────────┐     ┌──────────────┐
│   Ingester   │────▶│  DualPublisher  │────▶│    Kafka     │──▶ Event Processor
│              │     │                 │     │  (durable)   │
└──────────────┘     │                 │     └──────────────┘
                     │                 │     ┌──────────────┐
                     │                 │────▶│ Redis PubSub │──▶ WebSocket Gateway
                     │                 │     │ (real-time)  │
                     └─────────────────┘     └──────────────┘

Channel Naming Convention

events:{source_type}:{source_id}

Examples:
- events:user:octocat      - Events for user octocat
- events:repo:owner/repo   - Events for a repository
- events:org:microsoft     - Events for an organization

Batch Publishing

For high throughput:

func (p *DualPublisher) PublishBatch(ctx context.Context, events []Event) error {
    // 1. Batch to Kafka
    messages := make([]kafka.Message, len(events))
    for i, event := range events {
        payload, _ := json.Marshal(event)
        messages[i] = kafka.Message{
            Key:   []byte(event.SourceID),
            Value: payload,
        }
    }

    if err := p.kafka.WriteMessages(ctx, messages...); err != nil {
        return fmt.Errorf("kafka batch failed: %w", err)
    }

    // 2. Redis: Pipeline for efficiency
    pipe := p.redis.Pipeline()
    for _, event := range events {
        channel := fmt.Sprintf("events:%s:%s", event.SourceType, event.SourceID)
        notification, _ := json.Marshal(map[string]interface{}{
            "id":   event.ID,
            "type": event.Type,
        })
        pipe.Publish(ctx, channel, notification)
    }
    
    if _, err := pipe.Exec(ctx); err != nil {
        p.logger.Warn("redis batch failed", "error", err)
    }

    return nil
}

Decision Tree

RequirementStreamWhy
Must not lose eventKafka onlyAck required, replicated
User sees immediatelyRedis onlySub-ms delivery
Both durability + real-timeDual streamThis pattern
High volume (>10k/sec)Kafka, batch RedisRedis can bottleneck
Many subscribers per channelRedis + local fan-outDon't hammer Redis

Related Skills


NEVER Do

  • NEVER fail on Redis errors — Redis is best-effort. Log and continue.
  • NEVER send full payload to Redis — Send IDs only, clients fetch from API.
  • NEVER create one Redis channel per event — Use source-level channels.
  • NEVER skip Kafka for "unimportant" events — All events go to Kafka for replay.
  • NEVER use Redis Pub/Sub for persistence — Messages are fire-and-forget.

Edge Cases

CaseSolution
Redis downLog warning, continue with Kafka only
Client connects mid-streamQuery API for recent events, then subscribe
High channel cardinalityUse wildcard patterns or aggregate channels
Kafka backpressureBuffer in memory with timeout, fail if full
Need event replayConsume from Kafka from offset, not Redis

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Automation

database-specialist

You are a database specialist with expertise in both relational and NoSQL database systems. Use when: relational databases, nosql databases, database design,...

Registry SourceRecently Updated
Automation

Snaplii AI Agent Cashback Payment

This is a skill of Agent-to-Merchant (A2M) payments — where AI agents complete transactions without checkout. Snaplii uses pre-funded gift cards as a payment...

Registry SourceRecently Updated
Automation

deployment-engineer

Expert deployment engineer specializing in CI/CD pipelines, release automation, and deployment strategies. Masters blue-green, canary, and rolling deployment...

Registry SourceRecently Updated
Automation

Almured Connection

Agent-to-agent consultation marketplace via MCP. Ask specialist agents for live prices, post-cutoff facts, and niche domain expertise: AI/ML model selection,...

Registry SourceRecently Updated