# Event-Driven Architecture

Implement event-driven systems with event sourcing, CQRS, and message queues. This skill covers distributed patterns for scalable, resilient applications.

## Purpose

Build loosely coupled, scalable systems:
- Implement event sourcing for audit trails
- Apply CQRS for read/write optimization
- Use message queues for async processing
- Handle distributed transactions with sagas
- Ensure eventual consistency
- Build replay and recovery capabilities
Features
- Event Sourcing
// Event definitions
// Envelope persisted to the event store for every recorded state change.
interface DomainEvent {
  eventId: string;           // unique id for this event instance
  eventType: string;         // e.g. 'OrderCreated'
  aggregateId: string;       // id of the aggregate stream this event belongs to
  aggregateType: string;     // e.g. 'Order'
  timestamp: Date;
  version: number;           // position within the aggregate's event stream
  data: Record<string, any>; // event-type-specific payload
  metadata: {
    userId?: string;         // who triggered the change, when known
    correlationId?: string;  // ties together all messages of one request
    causationId?: string;    // id of the message that directly caused this event
  };
}
// Order aggregate events
// Discriminated union of every event the Order aggregate can emit;
// `type` is the discriminant, named in the past tense (events are facts).
type OrderEvent =
  | { type: 'OrderCreated'; data: { customerId: string; items: OrderItem[] } }
  | { type: 'OrderItemAdded'; data: { item: OrderItem } }
  | { type: 'OrderItemRemoved'; data: { itemId: string } }
  | { type: 'OrderSubmitted'; data: { submittedAt: Date } }
  | { type: 'PaymentReceived'; data: { paymentId: string; amount: number } }
  | { type: 'OrderShipped'; data: { trackingNumber: string; carrier: string } }
  | { type: 'OrderDelivered'; data: { deliveredAt: Date } }
  | { type: 'OrderCancelled'; data: { reason: string } };
// Event store
// Append-only persistence for DomainEvents, one stream per aggregate,
// guarded by per-stream version numbers (optimistic concurrency).
class EventStore {
  /**
   * Appends `events` to the aggregate's stream.
   *
   * @param aggregateId      Stream to append to.
   * @param events           New events, in order.
   * @param expectedVersion  Stream version the caller last observed.
   * @throws ConcurrencyError when another writer advanced the stream.
   */
  async append(
    aggregateId: string,
    events: DomainEvent[],
    expectedVersion: number
  ): Promise<void> {
    // Optimistic concurrency check
    // NOTE(review): this read and the insert below run in separate
    // transactions, so a concurrent writer can slip in between — a unique
    // (aggregateId, version) DB constraint should back this up; confirm schema.
    const currentVersion = await this.getVersion(aggregateId);

    if (currentVersion !== expectedVersion) {
      throw new ConcurrencyError(
        `Expected version ${expectedVersion}, but found ${currentVersion}`
      );
    }

    // Append events atomically; versions continue from expectedVersion.
    await db.$transaction(async (tx) => {
      for (let i = 0; i < events.length; i++) {
        await tx.event.create({
          data: {
            ...events[i],
            version: expectedVersion + i + 1,
          },
        });
      }
    });

    // Publish to event bus
    // NOTE(review): publishing is not atomic with the commit above — a crash
    // here leaves stored-but-unpublished events. Consider an outbox/relay if
    // delivery must be guaranteed.
    for (const event of events) {
      await eventBus.publish(event);
    }
  }

  /**
   * Loads a stream in ascending version order. When `fromVersion` is given,
   * only events strictly AFTER that version are returned (`gt`, not `gte`).
   */
  async getEvents(
    aggregateId: string,
    fromVersion?: number
  ): Promise<DomainEvent[]> {
    return db.event.findMany({
      where: {
        aggregateId,
        version: fromVersion ? { gt: fromVersion } : undefined,
      },
      orderBy: { version: 'asc' },
    });
  }

  /** Returns the stream's latest version, or 0 for an empty/unknown stream. */
  async getVersion(aggregateId: string): Promise<number> {
    const lastEvent = await db.event.findFirst({
      where: { aggregateId },
      orderBy: { version: 'desc' },
    });

    return lastEvent?.version ?? 0;
  }
}
// Aggregate with event sourcing
// Commands never mutate state directly: they validate, then emit events;
// `apply` folds events into state both during replay and for new changes.
class OrderAggregate {
  private id: string;
  private state: OrderState;                     // current folded state
  private version: number = 0;                   // version of last applied event
  private uncommittedEvents: OrderEvent[] = [];  // new events not yet persisted
  // NOTE(review): no constructor is visible in this excerpt, but
  // `new OrderAggregate(id)` below implies one that assigns `this.id` —
  // confirm it exists elsewhere in the file.

  /** Rebuilds an aggregate by replaying its full event stream. */
  static async load(eventStore: EventStore, id: string): Promise<OrderAggregate> {
    const aggregate = new OrderAggregate(id);
    const events = await eventStore.getEvents(id);

    for (const event of events) {
      // isNew=false: replayed events must not be queued for saving again.
      aggregate.apply(event, false);
    }

    return aggregate;
  }

  // Command handlers

  /** Creates the order; rejects if this aggregate already has state. */
  create(customerId: string, items: OrderItem[]): void {
    if (this.state) {
      throw new Error('Order already exists');
    }

    this.applyChange({
      type: 'OrderCreated',
      data: { customerId, items },
    });
  }

  /** Adds an item; only allowed while the order is a draft. */
  addItem(item: OrderItem): void {
    // NOTE(review): ensureState is not defined in this excerpt — presumably
    // it throws unless this.state.status is one of the given values; verify.
    this.ensureState(['draft']);

    this.applyChange({
      type: 'OrderItemAdded',
      data: { item },
    });
  }

  /** Submits a non-empty draft order. */
  submit(): void {
    this.ensureState(['draft']);

    if (this.state.items.length === 0) {
      throw new Error('Cannot submit empty order');
    }

    this.applyChange({
      type: 'OrderSubmitted',
      data: { submittedAt: new Date() },
    });
  }

  // Event application
  // Folds one event into state. `isNew` distinguishes fresh changes (queued
  // for saving) from replayed history (not queued). Version advances either way.
  private apply(event: OrderEvent, isNew: boolean): void {
    switch (event.type) {
      case 'OrderCreated':
        this.state = {
          status: 'draft',
          customerId: event.data.customerId,
          items: event.data.items,
          total: this.calculateTotal(event.data.items),
        };
        break;
      case 'OrderItemAdded':
        this.state.items.push(event.data.item);
        this.state.total = this.calculateTotal(this.state.items);
        break;
      case 'OrderSubmitted':
        this.state.status = 'submitted';
        this.state.submittedAt = event.data.submittedAt;
        break;
      // ... other event handlers
    }

    this.version++;

    if (isNew) {
      this.uncommittedEvents.push(event);
    }
  }

  /** Records a brand-new event: applies it and queues it for persistence. */
  private applyChange(event: OrderEvent): void {
    this.apply(event, true);
  }

  /**
   * Wraps the uncommitted events in DomainEvent envelopes and appends them.
   * The expectedVersion passed to the store is the version BEFORE the
   * uncommitted events, so the store's concurrency check catches lost updates.
   */
  async save(eventStore: EventStore): Promise<void> {
    const domainEvents = this.uncommittedEvents.map((e, i) => ({
      eventId: uuid(),
      eventType: e.type,
      aggregateId: this.id,
      aggregateType: 'Order',
      timestamp: new Date(),
      version: this.version - this.uncommittedEvents.length + i + 1,
      data: e.data,
      metadata: {},
    }));

    await eventStore.append(
      this.id,
      domainEvents,
      this.version - this.uncommittedEvents.length
    );

    this.uncommittedEvents = [];
  }
}
- CQRS Pattern
// Command side (writes)
// A command is an imperative request ("do this"); unlike an event, it can
// be rejected by its handler.
interface Command {
  type: string;  // dispatch key matched by CommandBus
  payload: any;  // NOTE(review): consider `unknown` + validation at the handler
  metadata: {
    userId: string;        // who issued the command
    timestamp: Date;
    correlationId: string; // ties the command to the events it causes
  };
}
/** Routes each command to the single handler registered for its type. */
class CommandBus {
  private registry = new Map<string, CommandHandler>();

  /** Registers (or replaces) the handler for `commandType`. */
  register(commandType: string, handler: CommandHandler): void {
    this.registry.set(commandType, handler);
  }

  /** Looks up and invokes the matching handler; throws if none exists. */
  async dispatch(command: Command): Promise<void> {
    const registered = this.registry.get(command.type);

    if (registered === undefined) {
      throw new Error(`No handler for command: ${command.type}`);
    }

    await registered.handle(command);
  }
}
// Command handler
// Creates a fresh Order aggregate and persists its initial event(s).
class CreateOrderHandler implements CommandHandler {
  constructor(
    private eventStore: EventStore,
    // NOTE(review): orderRepository is injected but unused in the code shown —
    // confirm whether it is needed or can be dropped.
    private orderRepository: OrderRepository
  ) {}

  async handle(command: CreateOrderCommand): Promise<void> {
    // New aggregate gets a fresh id; `create` records OrderCreated,
    // `save` appends it to the store (which also publishes it).
    const order = new OrderAggregate(uuid());
    order.create(command.payload.customerId, command.payload.items);
    await order.save(this.eventStore);
  }
}
// Query side (reads)
// A query never mutates state; it is answered from the read model.
interface Query {
  type: string; // dispatch key matched by QueryBus
  params: any;
}
/** Routes each query to the single handler registered for its type. */
class QueryBus {
  private registry = new Map<string, QueryHandler>();

  /** Registers (or replaces) the handler for `queryType`. */
  register(queryType: string, handler: QueryHandler): void {
    this.registry.set(queryType, handler);
  }

  /** Resolves the handler for `query.type` and returns its result. */
  async execute<T>(query: Query): Promise<T> {
    const found = this.registry.get(query.type);

    if (found === undefined) {
      throw new Error(`No handler for query: ${query.type}`);
    }

    return found.handle(query);
  }
}
// Read model projection
// Maintains the denormalized orderView table (the CQRS read side) by
// folding domain events into it.
class OrderReadModel {
  /** Applies one event to the read model; unknown event types are ignored. */
  async project(event: DomainEvent): Promise<void> {
    switch (event.eventType) {
      case 'OrderCreated':
        await db.orderView.create({
          data: {
            id: event.aggregateId,
            customerId: event.data.customerId,
            status: 'draft',
            itemCount: event.data.items.length,
            // NOTE(review): per the OrderEvent union, OrderCreated carries
            // only customerId and items — data.total looks undefined here;
            // confirm the producer adds it or derive it from the items.
            total: event.data.total,
            createdAt: event.timestamp,
          },
        });
        break;
      case 'OrderSubmitted':
        await db.orderView.update({
          where: { id: event.aggregateId },
          data: {
            status: 'submitted',
            submittedAt: event.data.submittedAt,
          },
        });
        break;
      case 'OrderShipped':
        await db.orderView.update({
          where: { id: event.aggregateId },
          data: {
            status: 'shipped',
            trackingNumber: event.data.trackingNumber,
          },
        });
        break;
    }
  }

  // Rebuild projection from events
  /**
   * Drops and replays the entire read model. Queries served while a rebuild
   * is in progress will see partial data.
   */
  async rebuild(): Promise<void> {
    // Clear existing read model
    await db.orderView.deleteMany();

    // Replay all events
    // NOTE(review): getAllEvents is not part of the EventStore shown in this
    // excerpt — confirm it exists and returns events in global order.
    const events = await eventStore.getAllEvents();
    for (const event of events) {
      await this.project(event);
    }
  }
}
- Message Queues with RabbitMQ
import amqp from 'amqplib';
class RabbitMQBroker { private connection: amqp.Connection; private channel: amqp.Channel;
async connect(): Promise<void> { this.connection = await amqp.connect(process.env.RABBITMQ_URL!); this.channel = await this.connection.createChannel();
// Setup exchanges
await this.channel.assertExchange('events', 'topic', { durable: true });
await this.channel.assertExchange('commands', 'direct', { durable: true });
await this.channel.assertExchange('dlx', 'fanout', { durable: true });
}
async publish(exchange: string, routingKey: string, message: any): Promise<void> { const content = Buffer.from(JSON.stringify(message));
this.channel.publish(exchange, routingKey, content, {
persistent: true,
contentType: 'application/json',
messageId: uuid(),
timestamp: Date.now(),
});
}
async subscribe(
queue: string,
exchange: string,
routingKey: string,
handler: (message: any) => Promise<void>
): Promise<void> {
// Setup queue with dead letter exchange
await this.channel.assertQueue(queue, {
durable: true,
deadLetterExchange: 'dlx',
deadLetterRoutingKey: ${queue}.dlq,
});
await this.channel.bindQueue(queue, exchange, routingKey);
// Consume messages
await this.channel.consume(queue, async (msg) => {
if (!msg) return;
try {
const content = JSON.parse(msg.content.toString());
await handler(content);
this.channel.ack(msg);
} catch (error) {
console.error('Message processing failed:', error);
// Retry or dead-letter
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0) + 1;
if (retryCount < 3) {
// Retry with exponential backoff
setTimeout(() => {
this.channel.publish(exchange, routingKey, msg.content, {
...msg.properties,
headers: {
...msg.properties.headers,
'x-retry-count': retryCount,
},
});
this.channel.ack(msg);
}, Math.pow(2, retryCount) * 1000);
} else {
// Send to dead letter queue
this.channel.reject(msg, false);
}
}
});
} }
// Event publishing class EventPublisher { constructor(private broker: RabbitMQBroker) {}
async publish(event: DomainEvent): Promise<void> {
const routingKey = ${event.aggregateType}.${event.eventType};
await this.broker.publish('events', routingKey, event);
}
}
// Event consumer
// Subscribes the order read-model projector to every Order.* event.
class OrderEventConsumer {
  constructor(
    private broker: RabbitMQBroker,
    private readModel: OrderReadModel
  ) {}

  /** Binds the 'order-projector' queue to all Order events and projects them. */
  async start(): Promise<void> {
    await this.broker.subscribe(
      'order-projector',
      'events',
      'Order.*', // topic pattern: any event type emitted by the Order aggregate
      async (event) => {
        await this.readModel.project(event);
      }
    );
  }
}
- Saga Pattern for Distributed Transactions
// Saga orchestrator
// One unit of work in a saga: `execute` moves the workflow forward,
// `compensate` undoes that work if a later step fails.
interface SagaStep {
  name: string;
  execute: (context: SagaContext) => Promise<void>;
  compensate: (context: SagaContext) => Promise<void>;
}
class SagaOrchestrator { private steps: SagaStep[] = []; private executedSteps: SagaStep[] = [];
addStep(step: SagaStep): this { this.steps.push(step); return this; }
async execute(context: SagaContext): Promise<void> {
try {
for (const step of this.steps) {
console.log(Executing step: ${step.name});
await step.execute(context);
this.executedSteps.push(step);
}
} catch (error) {
console.error('Saga failed, compensating...', error);
await this.compensate(context);
throw error;
}
}
private async compensate(context: SagaContext): Promise<void> {
// Execute compensation in reverse order
for (const step of this.executedSteps.reverse()) {
try {
console.log(Compensating step: ${step.name});
await step.compensate(context);
} catch (error) {
console.error(Compensation failed for ${step.name}:, error);
// Log for manual intervention
await this.logCompensationFailure(step, context, error);
}
}
}
}
// Order saga example
// Each step records the id it produced on the shared context so its
// compensation can undo exactly that work (compensations are guarded so a
// failure before the step ran is a no-op).
const createOrderSaga = new SagaOrchestrator()
  .addStep({
    name: 'Reserve Inventory',
    execute: async (ctx) => {
      const reservation = await inventoryService.reserve(ctx.items);
      ctx.reservationId = reservation.id;
    },
    compensate: async (ctx) => {
      if (ctx.reservationId) {
        await inventoryService.releaseReservation(ctx.reservationId);
      }
    },
  })
  .addStep({
    name: 'Process Payment',
    execute: async (ctx) => {
      const payment = await paymentService.charge(ctx.customerId, ctx.total);
      ctx.paymentId = payment.id;
    },
    compensate: async (ctx) => {
      if (ctx.paymentId) {
        await paymentService.refund(ctx.paymentId);
      }
    },
  })
  .addStep({
    name: 'Create Order',
    execute: async (ctx) => {
      const order = await orderService.create({
        customerId: ctx.customerId,
        items: ctx.items,
        paymentId: ctx.paymentId,
        reservationId: ctx.reservationId,
      });
      ctx.orderId = order.id;
    },
    compensate: async (ctx) => {
      if (ctx.orderId) {
        await orderService.cancel(ctx.orderId);
      }
    },
  })
  .addStep({
    name: 'Send Confirmation',
    execute: async (ctx) => {
      await notificationService.sendOrderConfirmation(ctx.orderId);
    },
    compensate: async (ctx) => {
      // No compensation needed for notifications
    },
  });
// Execute saga
// Builds the shared context and runs the order saga; any step failure
// triggers compensation inside execute() and the error propagates here.
async function handleCreateOrder(command: CreateOrderCommand): Promise<void> {
  const context: SagaContext = {
    customerId: command.customerId,
    items: command.items,
    total: calculateTotal(command.items),
  };

  await createOrderSaga.execute(context);
}
- Kafka Streaming
import { Kafka, Producer, Consumer, EachMessagePayload } from 'kafkajs';
// Kafka wrapper: one shared producer, one consumer per consumer group.
class KafkaService {
  private kafka: Kafka;
  private producer: Producer;
  private consumers: Map<string, Consumer> = new Map(); // keyed by groupId

  constructor() {
    this.kafka = new Kafka({
      clientId: process.env.SERVICE_NAME,
      brokers: (process.env.KAFKA_BROKERS || '').split(','),
    });
  }

  /** Creates and connects the producer. Must be called before publish(). */
  async connect(): Promise<void> {
    this.producer = this.kafka.producer({
      idempotent: true, // broker deduplicates producer retries
      maxInFlightRequests: 5,
    });

    await this.producer.connect();
  }

  /** Sends a batch of JSON-serialized messages to `topic`. */
  async publish(topic: string, messages: KafkaMessage[]): Promise<void> {
    await this.producer.send({
      topic,
      messages: messages.map(m => ({
        key: m.key, // same key => same partition => per-key ordering
        value: JSON.stringify(m.value),
        headers: m.headers,
        partition: m.partition,
      })),
    });
  }

  /**
   * Starts a consumer group over `topics`, invoking `handler` per message.
   * fromBeginning:false — a brand-new group starts at the latest offsets.
   */
  async subscribe(
    groupId: string,
    topics: string[],
    handler: (payload: EachMessagePayload) => Promise<void>
  ): Promise<void> {
    const consumer = this.kafka.consumer({
      groupId,
      sessionTimeout: 30000,
      heartbeatInterval: 3000,
    });

    await consumer.connect();
    await consumer.subscribe({ topics, fromBeginning: false });

    await consumer.run({
      eachMessage: async (payload) => {
        try {
          await handler(payload);
        } catch (error) {
          // NOTE(review): swallowing the error here means the offset still
          // advances, so the failed message is effectively dropped — confirm
          // that is intended before wiring retry/DLQ.
          console.error('Message processing failed:', error);
          // Implement retry/DLQ logic
        }
      },
    });

    this.consumers.set(groupId, consumer);
  }

  /** Disconnects the producer and every consumer (graceful shutdown). */
  async disconnect(): Promise<void> {
    await this.producer.disconnect();
    for (const consumer of this.consumers.values()) {
      await consumer.disconnect();
    }
  }
}
// Stream processing
// Consumes order events from Kafka and fans work out to analytics and
// downstream command topics.
class OrderStreamProcessor {
  constructor(private kafka: KafkaService) {}

  /** Subscribes to 'order-events' and dispatches by event type. */
  async start(): Promise<void> {
    await this.kafka.subscribe(
      'order-processor',
      ['order-events'],
      async ({ topic, partition, message }) => {
        // A missing/empty payload decodes to {}, whose undefined `type`
        // falls through the switch and is ignored.
        const event = JSON.parse(message.value?.toString() || '{}');

        switch (event.type) {
          case 'OrderCreated':
            await this.handleOrderCreated(event);
            break;
          case 'OrderCompleted':
            // NOTE(review): handleOrderCompleted is referenced but not
            // defined in this excerpt — confirm it exists.
            await this.handleOrderCompleted(event);
            break;
        }
      }
    );
  }

  /** Records the order in analytics and kicks off inventory reservation. */
  private async handleOrderCreated(event: any): Promise<void> {
    // Update analytics
    await analyticsService.recordOrder(event.data);

    // Trigger downstream processes. Keying by aggregateId keeps all commands
    // for one order on the same partition (preserves per-order ordering).
    await this.kafka.publish('inventory-commands', [{
      key: event.aggregateId,
      value: {
        type: 'ReserveInventory',
        orderId: event.aggregateId,
        items: event.data.items,
      },
    }]);
  }
}
Use Cases
- Order Processing System
// Complete order workflow
// Composes a saga from prebuilt steps and runs it for one order.
// NOTE(review): the step constants referenced here are not defined in this
// excerpt — confirm they exist elsewhere.
async function processOrder(orderId: string): Promise<void> {
  const saga = new SagaOrchestrator()
    .addStep(reserveInventoryStep)
    .addStep(processPaymentStep)
    .addStep(createShipmentStep)
    .addStep(sendNotificationStep);

  await saga.execute({ orderId });
}
- Real-time Analytics
// Stream aggregation
// NOTE(review): KafkaService.subscribe hands its handler an
// EachMessagePayload (not a decoded event) and returns Promise<void> —
// this callback's use of `event.data` and the 'Stream' naming do not match
// that signature; verify against the subscribe implementation above.
const orderTotalsStream = kafka.subscribe(
  'analytics-aggregator',
  ['order-events'],
  async (event) => {
    await updateDailySales(event.data.total);
    await updateProductMetrics(event.data.items);
  }
);
Best Practices
Do's
- **Design events as facts** — immutable, past-tense naming
- **Implement idempotent handlers** — handle duplicates gracefully
- **Plan for event versioning** — schema evolution
- **Use dead letter queues** — handle failures
- **Monitor queue depths** — alert on backlogs
- **Test with chaos** — simulate failures
Don'ts
- Don't couple services through shared databases
- Don't ignore message ordering requirements
- Don't skip compensation logic
- Don't forget about exactly-once semantics
- Don't over-engineer for simple use cases
- Don't ignore backpressure
Related Skills
- redis — Pub/sub and caching
- real-time-systems — WebSocket integration
- backend-development — Service architecture
Reference Resources
- Event Sourcing Pattern
- CQRS Pattern
- RabbitMQ Documentation
- Apache Kafka Documentation