confluent-ksqldb

Confluent ksqlDB Skill

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "confluent-ksqldb" with this command: npx skills add anton-abyzov/specweave/anton-abyzov-specweave-confluent-ksqldb

Confluent ksqlDB Skill

Expert knowledge of ksqlDB - Confluent's event streaming database for building real-time applications with SQL-like queries on Kafka topics.

What I Know

Core Concepts

Streams (Unbounded, Append-Only):

  • Represents immutable event sequences

  • Every row is a new event

  • Cannot be updated or deleted

  • Example: Click events, sensor readings, transactions

Tables (Mutable, Latest State):

  • Represents current state

  • Updates override previous values (by key)

  • Compacted topic under the hood

  • Example: User profiles, product inventory, account balances

Key Difference:

-- STREAM: Every event is independent INSERT INTO clicks_stream (user_id, page, timestamp) VALUES (1, 'homepage', CURRENT_TIMESTAMP()); -- Creates NEW row

-- TABLE: Latest value wins (by key) INSERT INTO users_table (user_id, name, email) VALUES (1, 'John', 'john@example.com'); -- UPDATES existing row with user_id=1

Query Types

  1. Streaming Queries (Continuous, Real-Time):

-- Filter events in real-time SELECT user_id, page, timestamp FROM clicks_stream WHERE page = 'checkout' EMIT CHANGES;

-- Transform on the fly SELECT user_id, UPPER(page) AS page_upper, TIMESTAMPTOSTRING(timestamp, 'yyyy-MM-dd') AS date FROM clicks_stream EMIT CHANGES;

  1. Materialized Views (Pre-Computed Tables):

-- Aggregate clicks per user (updates continuously) CREATE TABLE user_click_counts AS SELECT user_id, COUNT(*) AS click_count FROM clicks_stream GROUP BY user_id EMIT CHANGES;

-- Query the table (instant results!) SELECT * FROM user_click_counts WHERE user_id = 123;

  1. Pull Queries (Point-in-Time Reads):

-- Query current state (like traditional SQL) SELECT * FROM users_table WHERE user_id = 123;

-- No EMIT CHANGES = pull query (returns once)

When to Use This Skill

Activate me when you need help with:

  • ksqlDB syntax ("How to create ksqlDB stream?")

  • Stream vs table concepts ("When to use stream vs table?")

  • Joins ("Join stream with table")

  • Aggregations ("Count events per user")

  • Windowing ("Tumbling window aggregation")

  • Real-time transformations ("Filter and enrich events")

  • Materialized views ("Create pre-computed aggregates")

Common Patterns

Pattern 1: Filter Events

Use Case: Drop irrelevant events early

-- Create filtered stream CREATE STREAM important_clicks AS SELECT * FROM clicks_stream WHERE page IN ('checkout', 'payment', 'confirmation') EMIT CHANGES;

Pattern 2: Enrich Events (Stream-Table Join)

Use Case: Add user details to click events

-- Users table (current state) CREATE TABLE users ( user_id BIGINT PRIMARY KEY, name VARCHAR, email VARCHAR ) WITH ( kafka_topic='users', value_format='AVRO' );

-- Enrich clicks with user data CREATE STREAM enriched_clicks AS SELECT c.user_id, c.page, c.timestamp, u.name, u.email FROM clicks_stream c LEFT JOIN users u ON c.user_id = u.user_id EMIT CHANGES;

Pattern 3: Real-Time Aggregation

Use Case: Count events per user, per 5-minute window

CREATE TABLE user_clicks_per_5min AS SELECT user_id, WINDOWSTART AS window_start, WINDOWEND AS window_end, COUNT(*) AS click_count FROM clicks_stream WINDOW TUMBLING (SIZE 5 MINUTES) GROUP BY user_id EMIT CHANGES;

-- Query current window SELECT * FROM user_clicks_per_5min WHERE user_id = 123 AND window_start >= NOW() - INTERVAL 5 MINUTES;

Pattern 4: Detect Anomalies

Use Case: Alert when user clicks >100 times in 1 minute

CREATE STREAM high_click_alerts AS SELECT user_id, COUNT() AS click_count FROM clicks_stream WINDOW TUMBLING (SIZE 1 MINUTE) GROUP BY user_id HAVING COUNT() > 100 EMIT CHANGES;

Pattern 5: Change Data Capture (CDC)

Use Case: Track changes to user table

-- Create table from CDC topic (Debezium) CREATE TABLE users_cdc ( user_id BIGINT PRIMARY KEY, name VARCHAR, email VARCHAR, op VARCHAR -- INSERT, UPDATE, DELETE ) WITH ( kafka_topic='mysql.users.cdc', value_format='AVRO' );

-- Stream of changes only CREATE STREAM user_changes AS SELECT * FROM users_cdc WHERE op IN ('UPDATE', 'DELETE') EMIT CHANGES;

Join Types

  1. Stream-Stream Join

Use Case: Correlate related events within time window

-- Join page views with clicks within 10 minutes CREATE STREAM page_view_with_clicks AS SELECT v.user_id, v.page AS viewed_page, c.page AS clicked_page FROM page_views v INNER JOIN clicks c WITHIN 10 MINUTES ON v.user_id = c.user_id EMIT CHANGES;

Window Types:

  • WITHIN 10 MINUTES

  • Events must be within 10 minutes of each other

  • GRACE PERIOD 5 MINUTES

  • Late-arriving events accepted for 5 more minutes

  1. Stream-Table Join

Use Case: Enrich events with current state

-- Add product details to order events CREATE STREAM enriched_orders AS SELECT o.order_id, o.product_id, p.product_name, p.price FROM orders_stream o LEFT JOIN products_table p ON o.product_id = p.product_id EMIT CHANGES;

  1. Table-Table Join

Use Case: Combine two tables (latest state)

-- Join users with their current cart CREATE TABLE user_with_cart AS SELECT u.user_id, u.name, c.cart_total FROM users u LEFT JOIN shopping_carts c ON u.user_id = c.user_id EMIT CHANGES;

Windowing Types

Tumbling Window (Non-Overlapping)

Use Case: Aggregate per fixed time period

-- Count events every 5 minutes SELECT user_id, COUNT(*) AS event_count FROM events WINDOW TUMBLING (SIZE 5 MINUTES) GROUP BY user_id;

-- Windows: [0:00-0:05), [0:05-0:10), [0:10-0:15)

Hopping Window (Overlapping)

Use Case: Moving average over time

-- Count events in 10-minute windows, advancing every 5 minutes SELECT user_id, COUNT(*) AS event_count FROM events WINDOW HOPPING (SIZE 10 MINUTES, ADVANCE BY 5 MINUTES) GROUP BY user_id;

-- Windows: [0:00-0:10), [0:05-0:15), [0:10-0:20)

Session Window (Event-Based)

Use Case: Group events by user session (gap-based)

-- Session ends after 30 minutes of inactivity SELECT user_id, COUNT(*) AS session_events FROM events WINDOW SESSION (30 MINUTES) GROUP BY user_id;

Best Practices

  1. Use Appropriate Data Types

✅ DO:

CREATE STREAM orders ( order_id BIGINT, user_id BIGINT, total DECIMAL(10, 2), -- Precise currency timestamp TIMESTAMP );

❌ DON'T:

-- WRONG: Using DOUBLE for currency (precision loss!) total DOUBLE

  1. Always Specify Keys

✅ DO:

CREATE TABLE users ( user_id BIGINT PRIMARY KEY, -- Explicit key name VARCHAR ) WITH (kafka_topic='users');

❌ DON'T:

-- WRONG: No key specified (can't join!) CREATE TABLE users ( user_id BIGINT, name VARCHAR );

  1. Use Windowing for Aggregations

✅ DO:

-- Windowed aggregation (bounded memory) SELECT COUNT(*) FROM events WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY user_id;

❌ DON'T:

-- WRONG: Non-windowed aggregation (unbounded memory!) SELECT COUNT(*) FROM events GROUP BY user_id;

  1. Set Retention Policies

-- Limit table size (keep last 7 days) CREATE TABLE user_stats ( user_id BIGINT PRIMARY KEY, click_count BIGINT ) WITH ( kafka_topic='user_stats', retention_ms=604800000 -- 7 days );

Performance Optimization

  1. Partition Alignment

Ensure joined streams/tables have same partition key:

-- GOOD: Both keyed by user_id (co-partitioned) CREATE STREAM clicks (user_id BIGINT KEY, ...) CREATE TABLE users (user_id BIGINT PRIMARY KEY, ...)

-- Join works efficiently (no repartitioning) SELECT * FROM clicks c JOIN users u ON c.user_id = u.user_id;

  1. Use Materialized Views

Pre-compute expensive queries:

-- BAD: Compute on every request SELECT COUNT(*) FROM orders WHERE user_id = 123;

-- GOOD: Materialized table (instant lookup) CREATE TABLE user_order_counts AS SELECT user_id, COUNT(*) AS order_count FROM orders GROUP BY user_id;

-- Query is now instant SELECT order_count FROM user_order_counts WHERE user_id = 123;

  1. Filter Early

-- GOOD: Filter before join CREATE STREAM important_events AS SELECT * FROM events WHERE event_type = 'purchase';

SELECT * FROM important_events e JOIN users u ON e.user_id = u.user_id;

-- BAD: Join first, filter later (processes all events!) SELECT * FROM events e JOIN users u ON e.user_id = u.user_id WHERE e.event_type = 'purchase';

Common Issues & Solutions

Issue 1: Query Timing Out

Error: Query timed out

Root Cause: Non-windowed aggregation on large stream

Solution: Add time window:

-- WRONG SELECT COUNT(*) FROM events GROUP BY user_id;

-- RIGHT SELECT COUNT(*) FROM events WINDOW TUMBLING (SIZE 1 HOUR) GROUP BY user_id;

Issue 2: Partition Mismatch

Error: Cannot join streams (different partition keys)

Solution: Repartition stream:

-- Repartition stream by user_id CREATE STREAM clicks_by_user AS SELECT * FROM clicks PARTITION BY user_id;

-- Now join works SELECT * FROM clicks_by_user c JOIN users u ON c.user_id = u.user_id;

Issue 3: Late-Arriving Events

Solution: Use grace period:

SELECT COUNT(*) FROM events WINDOW TUMBLING (SIZE 5 MINUTES, GRACE PERIOD 1 MINUTE) GROUP BY user_id; -- Accepts events up to 1 minute late

References

Invoke me when you need stream processing, real-time analytics, or SQL-like queries on Kafka!

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

technical-writing

No summary provided by upstream source.

Repository SourceNeeds Review
General

spec-driven-brainstorming

No summary provided by upstream source.

Repository SourceNeeds Review
General

kafka-architecture

No summary provided by upstream source.

Repository SourceNeeds Review
General

frontend

No summary provided by upstream source.

Repository SourceNeeds Review