event-driven-architect

Event-Driven Architect

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "event-driven-architect" with this command: npx skills add monkey1sai/openai-cli/monkey1sai-openai-cli-event-driven-architect

Event-Driven Architect

Build decoupled, scalable systems with event-driven patterns.

Core Workflow

  • Identify domain events: Define what happened

  • Design event schema: Structure event payloads

  • Implement event bus: Publish and subscribe

  • Add event handlers: React to events

  • Consider CQRS: Separate reads and writes

  • Enable event sourcing: Store event history

Event Fundamentals

Event Structure

// events/base.ts

/**
 * Envelope shared by every domain event.
 *
 * `T` is the event-specific payload; the remaining fields identify the event
 * and the aggregate it belongs to.
 */
export interface DomainEvent<T = unknown> {
  id: string;
  type: string;
  aggregateId: string;
  aggregateType: string;
  payload: T;
  metadata: {
    timestamp: Date;
    version: number;
    correlationId?: string;
    causationId?: string;
    userId?: string;
  };
}

/**
 * Type-safe event creator.
 *
 * Builds a `DomainEvent<T>` with a freshly generated UUID as `id`.
 *
 * @param type - Event name stored in `type`.
 * @param aggregateType - Kind of aggregate the event belongs to.
 * @param aggregateId - Identifier of that aggregate.
 * @param payload - Event-specific data.
 * @param metadata - Optional overrides / additions for the metadata block.
 * @returns The assembled event; `timestamp` defaults to now and `version` to 1.
 */
export function createEvent<T>(
  type: string,
  aggregateType: string,
  aggregateId: string,
  payload: T,
  metadata?: Partial<DomainEvent['metadata']>
): DomainEvent<T> {
  return {
    id: crypto.randomUUID(),
    type,
    aggregateType,
    aggregateId,
    payload,
    metadata: {
      ...metadata,
      // Resolve the required fields last so that an explicitly `undefined`
      // override (allowed by `Partial`) cannot erase the defaults.
      timestamp: metadata?.timestamp ?? new Date(),
      version: metadata?.version ?? 1,
    },
  };
}

Define Domain Events

// events/order.events.ts

/** Payload carried by the `OrderCreated` event. */
export interface OrderCreatedPayload {
  customerId: string;
  items: Array<{
    productId: string;
    quantity: number;
    price: number;
  }>;
  totalAmount: number;
  // NOTE(review): `Address` is neither declared nor imported in the visible
  // snippet — confirm where it comes from.
  shippingAddress: Address;
}

/** Payload carried by the `OrderPaid` event. */
export interface OrderPaidPayload {
  paymentId: string;
  amount: number;
  method: 'card' | 'bank' | 'wallet';
}

/** Payload carried by the `OrderShipped` event. */
export interface OrderShippedPayload {
  trackingNumber: string;
  carrier: string;
  estimatedDelivery: string;
}

/** Payload carried by the `OrderCancelled` event. */
export interface OrderCancelledPayload {
  reason: string;
  cancelledBy: string;
  /** Optional; omitted when nothing is refunded. */
  refundAmount?: number;
}

// Event types

/**
 * Union of every order event, each member pairing a payload type with its
 * literal `type` tag.
 *
 * NOTE(review): `DomainEvent` is not imported in the visible part of this
 * snippet — confirm the import from `./base` exists.
 */
export type OrderEvent =
  | (DomainEvent<OrderCreatedPayload> & { type: 'OrderCreated' })
  | (DomainEvent<OrderPaidPayload> & { type: 'OrderPaid' })
  | (DomainEvent<OrderShippedPayload> & { type: 'OrderShipped' })
  | (DomainEvent<OrderCancelledPayload> & { type: 'OrderCancelled' });

// Event creators

/**
 * Factory functions for order events. Each one delegates to `createEvent`
 * with the aggregate type fixed to `'Order'` and the order id used as the
 * aggregate id.
 */
export const OrderEvents = {
  created: (orderId: string, payload: OrderCreatedPayload): DomainEvent<OrderCreatedPayload> =>
    createEvent('OrderCreated', 'Order', orderId, payload),

  paid: (orderId: string, payload: OrderPaidPayload): DomainEvent<OrderPaidPayload> =>
    createEvent('OrderPaid', 'Order', orderId, payload),

  shipped: (orderId: string, payload: OrderShippedPayload): DomainEvent<OrderShippedPayload> =>
    createEvent('OrderShipped', 'Order', orderId, payload),

  cancelled: (orderId: string, payload: OrderCancelledPayload): DomainEvent<OrderCancelledPayload> =>
    createEvent('OrderCancelled', 'Order', orderId, payload),
};

Event Bus

In-Memory Event Bus

// events/event-bus.ts import { EventEmitter } from 'events'; import { DomainEvent } from './base';

/** Async callback invoked with every event delivered to a subscription. */
type EventHandler<T = unknown> = (event: DomainEvent<T>) => Promise<void>;

/**
 * In-process event bus built on Node's `EventEmitter`.
 *
 * `publish` first persists the event through `db.event`, then emits it under
 * its own `type` and under the `'*'` wildcard. `emit` starts the handlers but
 * does not wait for the promises they return, so `publish` can resolve before
 * subscribers have finished.
 *
 * NOTE(review): `db` is not imported in the visible part of this snippet —
 * confirm the import exists.
 */
class EventBus {
  private readonly emitter = new EventEmitter();

  /**
   * Persists `event`, then dispatches it to type-specific and wildcard
   * subscribers.
   */
  async publish<T>(event: DomainEvent<T>): Promise<void> {
    console.log(`Publishing event: ${event.type}`, event);

    // Store event (for event sourcing)
    await this.storeEvent(event);

    // Emit to handlers
    this.emitter.emit(event.type, event);
    this.emitter.emit('*', event); // Wildcard for all events
  }

  /** Publishes `events` one after another, in array order. */
  async publishAll(events: DomainEvent[]): Promise<void> {
    for (const event of events) {
      await this.publish(event);
    }
  }

  /**
   * Registers `handler` for `eventType`.
   *
   * The handler is wrapped so that a rejection is logged instead of escaping
   * as an unhandled promise rejection.
   *
   * @returns A function that removes this subscription.
   */
  subscribe<T>(eventType: string, handler: EventHandler<T>): () => void {
    const wrappedHandler = async (event: DomainEvent<T>): Promise<void> => {
      try {
        await handler(event);
      } catch (error) {
        console.error(`Error handling ${eventType}:`, error);
        // Could emit to dead letter queue here
      }
    };

    this.emitter.on(eventType, wrappedHandler);

    // Return unsubscribe function
    return () => {
      this.emitter.off(eventType, wrappedHandler);
    };
  }

  /** Registers `handler` on the `'*'` wildcard, i.e. for every published event. */
  subscribeAll(handler: EventHandler): () => void {
    return this.subscribe('*', handler);
  }

  /**
   * Writes the event to the event table.
   *
   * Uses an upsert with an empty `update` so that an event which is already
   * stored (for example by `EventStoreRepository.save`, which writes events
   * before publishing them) is left untouched instead of failing on a
   * duplicate `id`.
   */
  private async storeEvent(event: DomainEvent): Promise<void> {
    await db.event.upsert({
      where: { id: event.id },
      update: {},
      create: {
        id: event.id,
        type: event.type,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        payload: event.payload as any,
        metadata: event.metadata as any,
        createdAt: event.metadata.timestamp,
      },
    });
  }
}

export const eventBus = new EventBus();

Redis-Based Event Bus

// events/redis-event-bus.ts import { Redis } from 'ioredis'; import { DomainEvent } from './base';

// Pub/sub needs a dedicated connection for subscribing, hence two clients.
const publisher = new Redis(process.env.REDIS_URL!);
const subscriber = new Redis(process.env.REDIS_URL!);

/**
 * Event bus backed by Redis pub/sub, with every event also appended to a
 * per-aggregate-type stream (`stream:<aggregateType>`) for replay.
 */
class RedisEventBus {
  private handlers = new Map<string, Set<(event: DomainEvent) => Promise<void>>>();

  constructor() {
    subscriber.on('message', async (channel, message) => {
      const event = RedisEventBus.parseEvent(channel, message);
      if (!event) return;

      const handlers = this.handlers.get(channel);
      if (!handlers) return;

      // Handlers run sequentially; one failing handler does not stop the rest.
      for (const handler of handlers) {
        try {
          await handler(event);
        } catch (error) {
          console.error(`Error handling ${event.type}:`, error);
        }
      }
    });
  }

  /**
   * Appends the event to its replay stream, then publishes it on
   * `events:<type>`. The stream write comes first so that subscribers are
   * never notified of an event that failed to be stored.
   */
  async publish(event: DomainEvent): Promise<void> {
    const serialized = JSON.stringify(event);

    // Store in stream for replay
    await publisher.xadd(
      `stream:${event.aggregateType}`,
      '*',
      'event',
      serialized
    );

    const channel = `events:${event.type}`;
    await publisher.publish(channel, serialized);
  }

  /**
   * Registers `handler` for `eventType`, subscribing to the Redis channel on
   * the first registration for that type.
   *
   * @returns A function that removes the handler and releases the Redis
   * subscription once no handlers remain for the channel.
   */
  subscribe(eventType: string, handler: (event: DomainEvent) => Promise<void>): () => void {
    const channel = `events:${eventType}`;

    if (!this.handlers.has(channel)) {
      this.handlers.set(channel, new Set());
      void subscriber.subscribe(channel).catch((error: unknown) => {
        console.error(`Failed to subscribe to ${channel}:`, error);
      });
    }

    this.handlers.get(channel)!.add(handler);

    return () => {
      const registered = this.handlers.get(channel);
      if (!registered) return;

      registered.delete(handler);

      // Last handler gone: drop the bookkeeping entry and the Redis subscription.
      if (registered.size === 0) {
        this.handlers.delete(channel);
        void subscriber.unsubscribe(channel).catch((error: unknown) => {
          console.error(`Failed to unsubscribe from ${channel}:`, error);
        });
      }
    };
  }

  /**
   * Decodes a pub/sub message. Returns `null` (after logging) when the message
   * is not valid JSON or is not an object.
   */
  private static parseEvent(channel: string, message: string): DomainEvent | null {
    try {
      const parsed = JSON.parse(message) as DomainEvent | null;
      if (parsed === null || typeof parsed !== 'object') {
        console.error(`Discarding non-object message on ${channel}`);
        return null;
      }
      // JSON has no Date type, so the timestamp arrives as a string.
      if (parsed.metadata?.timestamp !== undefined) {
        parsed.metadata.timestamp = new Date(parsed.metadata.timestamp);
      }
      return parsed;
    } catch (error) {
      console.error(`Discarding malformed message on ${channel}:`, error);
      return null;
    }
  }
}

export const eventBus = new RedisEventBus();

Event Handlers

Handler Registration

// handlers/order.handlers.ts import { eventBus } from '../events/event-bus'; import { OrderEvent } from '../events/order.events';

// Email notification on order created
eventBus.subscribe<OrderCreatedPayload>('OrderCreated', async (event) => {
  await emailService.send({
    to: await getUserEmail(event.payload.customerId),
    template: 'order-confirmation',
    data: {
      orderId: event.aggregateId,
      items: event.payload.items,
      total: event.payload.totalAmount,
    },
  });
});

// Update inventory on order created
eventBus.subscribe<OrderCreatedPayload>('OrderCreated', async (event) => {
  for (const item of event.payload.items) {
    await inventoryService.reserve(item.productId, item.quantity);
  }
});

// Analytics tracking
eventBus.subscribe<OrderPaidPayload>('OrderPaid', async (event) => {
  await analytics.track('order_completed', {
    orderId: event.aggregateId,
    amount: event.payload.amount,
    paymentMethod: event.payload.method,
  });
});

// Notify shipping on order paid
eventBus.subscribe<OrderPaidPayload>('OrderPaid', async (event) => {
  await shippingService.createShipment(event.aggregateId);
});

// Handle cancellation
eventBus.subscribe<OrderCancelledPayload>('OrderCancelled', async (event) => {
  const order = await orderRepository.findById(event.aggregateId);
  if (!order) {
    // findById returns null when no events exist for the id; fail with a
    // descriptive error rather than a TypeError on `order.items`.
    throw new Error(`Order ${event.aggregateId} not found while handling OrderCancelled`);
  }

  // Release inventory
  for (const item of order.items) {
    await inventoryService.release(item.productId, item.quantity);
  }

  // Process refund
  if (event.payload.refundAmount) {
    await paymentService.refund(event.aggregateId, event.payload.refundAmount);
  }

  // Send cancellation email
  // NOTE(review): requires a public `customerId` accessor on the order
  // aggregate — verify it exists.
  await emailService.send({
    to: await getUserEmail(order.customerId),
    template: 'order-cancelled',
    data: {
      orderId: event.aggregateId,
      reason: event.payload.reason,
    },
  });
});

Event Sourcing

Aggregate with Events

// aggregates/order.aggregate.ts import { DomainEvent } from '../events/base'; import { OrderEvents, OrderCreatedPayload, OrderPaidPayload } from '../events/order.events';

/** A single line of an order. */
interface OrderItem {
  productId: string;
  quantity: number;
  price: number;
}

type OrderStatus = 'pending' | 'paid' | 'shipped' | 'delivered' | 'cancelled';

/**
 * Event-sourced order aggregate.
 *
 * Commands (`create`, `pay`, `cancel`) validate the current state and record
 * an event; state itself only ever changes inside `applyEvent`, which is also
 * what `fromEvents` uses to rebuild an order from its history.
 *
 * NOTE(review): events are created with `metadata.version` fixed at 1, while
 * `version` here counts applied events — confirm which meaning the event store
 * expects before filtering on it.
 */
export class OrderAggregate {
  private _id: string;
  private _status: OrderStatus = 'pending';
  private _items: OrderItem[] = [];
  private _totalAmount: number = 0;
  private _customerId: string = '';
  private _version: number = 0;

  // Events recorded by commands but not yet handed out via getUncommittedEvents().
  private uncommittedEvents: DomainEvent[] = [];

  get id() {
    return this._id;
  }

  get status() {
    return this._status;
  }

  /** Returns a shallow copy so callers cannot mutate the internal array. */
  get items() {
    return [...this._items];
  }

  get version() {
    return this._version;
  }

  /** Customer that placed the order; empty until an `OrderCreated` event is applied. */
  get customerId() {
    return this._customerId;
  }

  /** @param id - Existing aggregate id; a new UUID is generated when omitted or empty. */
  constructor(id?: string) {
    this._id = id || crypto.randomUUID();
  }

  /**
   * Command: Create order.
   *
   * Computes the total as the sum of `price * quantity` over `items` and
   * records an `OrderCreated` event on a new aggregate.
   */
  static create(customerId: string, items: OrderItem[], shippingAddress: Address): OrderAggregate {
    const order = new OrderAggregate();
    const totalAmount = items.reduce((sum, item) => sum + item.price * item.quantity, 0);

    order.apply(
      OrderEvents.created(order._id, {
        customerId,
        items,
        totalAmount,
        shippingAddress,
      })
    );

    return order;
  }

  /**
   * Command: Pay order.
   *
   * @throws Error when the order is not `pending` or `amount` differs from the
   * order total.
   */
  pay(paymentId: string, amount: number, method: 'card' | 'bank' | 'wallet'): void {
    if (this._status !== 'pending') {
      throw new Error('Order cannot be paid in current status');
    }

    if (amount !== this._totalAmount) {
      throw new Error('Payment amount does not match order total');
    }

    this.apply(OrderEvents.paid(this._id, { paymentId, amount, method }));
  }

  /**
   * Command: Cancel order.
   *
   * A refund of the full total is attached only when the order was already paid.
   *
   * @throws Error when the order is shipped, delivered or already cancelled.
   */
  cancel(reason: string, cancelledBy: string): void {
    if (['shipped', 'delivered', 'cancelled'].includes(this._status)) {
      throw new Error('Order cannot be cancelled in current status');
    }

    const refundAmount = this._status === 'paid' ? this._totalAmount : undefined;

    this.apply(OrderEvents.cancelled(this._id, { reason, cancelledBy, refundAmount }));
  }

  /** Apply event and track for persistence. */
  private apply(event: DomainEvent): void {
    this.applyEvent(event);
    this.uncommittedEvents.push(event);
  }

  /**
   * Apply event to state (used for replay too).
   *
   * Event types without a case leave the state untouched but still advance
   * the version counter.
   */
  private applyEvent(event: DomainEvent): void {
    switch (event.type) {
      case 'OrderCreated': {
        const created = event.payload as OrderCreatedPayload;
        this._customerId = created.customerId;
        this._items = created.items;
        this._totalAmount = created.totalAmount;
        this._status = 'pending';
        break;
      }

      case 'OrderPaid':
        this._status = 'paid';
        break;

      case 'OrderShipped':
        this._status = 'shipped';
        break;

      case 'OrderCancelled':
        this._status = 'cancelled';
        break;
    }

    this._version++;
  }

  /** Get and clear uncommitted events. */
  getUncommittedEvents(): DomainEvent[] {
    const events = [...this.uncommittedEvents];
    this.uncommittedEvents = [];
    return events;
  }

  /**
   * Rebuild from events.
   *
   * The aggregate id is taken from the first event; events are applied in
   * array order.
   *
   * @throws Error when `events` is empty.
   */
  static fromEvents(events: DomainEvent[]): OrderAggregate {
    if (events.length === 0) {
      throw new Error('Cannot rebuild aggregate from empty events');
    }

    const order = new OrderAggregate(events[0].aggregateId);

    for (const event of events) {
      order.applyEvent(event);
    }

    return order;
  }
}

Event Store Repository

// repositories/event-store.repository.ts import { db } from '../lib/db'; import { DomainEvent } from '../events/base'; import { eventBus } from '../events/event-bus';

/**
 * Generic repository that persists aggregates as a list of events and
 * rebuilds them by replaying that list.
 *
 * @typeParam T - Aggregate type; it must expose its `id` and hand over its
 * pending events through `getUncommittedEvents()`.
 */
export class EventStoreRepository<T extends { id: string; getUncommittedEvents(): DomainEvent[] }> {
  /**
   * @param aggregateType - Value matched against the stored `aggregateType`.
   * @param reconstruct - Builds an aggregate from its ordered events.
   */
  constructor(
    private aggregateType: string,
    private reconstruct: (events: DomainEvent[]) => T
  ) {}

  /**
   * Stores the aggregate's pending events, then publishes them on the event
   * bus. Does nothing when there are no pending events.
   *
   * NOTE(review): the in-memory `EventBus.publish` also writes each event to
   * the store; confirm that write is idempotent, otherwise it will collide
   * with the rows inserted here.
   */
  async save(aggregate: T): Promise<void> {
    const events = aggregate.getUncommittedEvents();

    if (events.length === 0) return;

    // Store events
    await db.event.createMany({
      data: events.map((event) => ({
        id: event.id,
        type: event.type,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        payload: event.payload as any,
        metadata: event.metadata as any,
        createdAt: event.metadata.timestamp,
      })),
    });

    // Publish events
    await eventBus.publishAll(events);
  }

  /**
   * Loads every event for `id` (oldest first by `createdAt`) and rebuilds the
   * aggregate.
   *
   * @returns The aggregate, or `null` when no events exist for `id`.
   */
  async findById(id: string): Promise<T | null> {
    const events = await db.event.findMany({
      where: {
        aggregateId: id,
        aggregateType: this.aggregateType,
      },
      orderBy: { createdAt: 'asc' },
    });

    if (events.length === 0) return null;

    return this.reconstruct(events.map((row) => this.toDomainEvent(row)));
  }

  /**
   * Returns the stored events of one aggregate, oldest first.
   *
   * @param fromVersion - When given, only events whose `metadata.version` is
   * greater than or equal to this value are returned.
   */
  async getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]> {
    const events = await db.event.findMany({
      where: {
        aggregateId,
        aggregateType: this.aggregateType,
        ...(fromVersion !== undefined && {
          metadata: { path: ['version'], gte: fromVersion },
        }),
      },
      orderBy: { createdAt: 'asc' },
    });

    return events.map((row) => this.toDomainEvent(row));
  }

  /**
   * Maps a stored row back to a `DomainEvent`.
   *
   * `metadata` is filtered by JSON path in `getEvents`, so it is read here as
   * JSON: the timestamp is revived into a `Date`, falling back to the row's
   * `createdAt` when the metadata carries none.
   */
  private toDomainEvent(row: {
    id: string;
    type: string;
    aggregateId: string;
    aggregateType: string;
    payload: unknown;
    metadata: unknown;
    createdAt: Date;
  }): DomainEvent {
    const metadata = row.metadata as DomainEvent['metadata'] | null;

    return {
      id: row.id,
      type: row.type,
      aggregateId: row.aggregateId,
      aggregateType: row.aggregateType,
      payload: row.payload,
      metadata: {
        ...(metadata as DomainEvent['metadata']),
        timestamp: new Date(metadata?.timestamp ?? row.createdAt),
      },
    };
  }
}

// Usage
export const orderRepository = new EventStoreRepository(
  'Order',
  OrderAggregate.fromEvents
);

CQRS Pattern

Separate Command and Query

// commands/create-order.command.ts

/**
 * Input for creating an order. Items carry no price; the command handler
 * looks prices up itself.
 */
export interface CreateOrderCommand {
  customerId: string;
  items: Array<{ productId: string; quantity: number }>;
  // NOTE(review): `Address` is neither declared nor imported in the visible
  // snippet — confirm where it comes from.
  shippingAddress: Address;
}

/**
 * Command handler: validates the customer, prices every item from the product
 * repository, creates the order aggregate and saves it.
 *
 * @returns The id of the newly created order.
 * @throws Error when the customer or any referenced product does not exist.
 */
export async function handleCreateOrder(command: CreateOrderCommand): Promise<string> {
  // Validate
  const customer = await customerRepository.findById(command.customerId);
  if (!customer) throw new Error('Customer not found');

  // Get product prices
  const items = await Promise.all(
    command.items.map(async (item) => {
      const product = await productRepository.findById(item.productId);
      // Fail with a clear message instead of a TypeError on `product.price`.
      if (!product) throw new Error(`Product not found: ${item.productId}`);

      return {
        productId: item.productId,
        quantity: item.quantity,
        price: product.price,
      };
    })
  );

  // Create aggregate and save
  const order = OrderAggregate.create(
    command.customerId,
    items,
    command.shippingAddress
  );

  await orderRepository.save(order);

  return order.id;
}

// queries/order.queries.ts

/**
 * Read model - denormalized for fast queries.
 *
 * NOTE(review): the projector in this document also writes `customerId` and
 * `trackingNumber`, and `getOrdersByCustomer` filters on `customerId`; neither
 * field is declared here — confirm whether they should be exposed.
 */
export interface OrderReadModel {
  id: string;
  status: string;
  customerName: string;
  customerEmail: string;
  items: Array<{
    productName: string;
    quantity: number;
    price: number;
  }>;
  totalAmount: number;
  createdAt: Date;
  paidAt?: Date;
  shippedAt?: Date;
}

/**
 * Query handler: looks up a single order in the read model.
 *
 * @returns The read-model row, or `null` when no row has that id.
 */
export async function getOrderById(orderId: string): Promise<OrderReadModel | null> {
  return db.orderReadModel.findUnique({
    where: { id: orderId },
  });
}

/** Query handler: lists a customer's orders from the read model, newest first. */
export async function getOrdersByCustomer(customerId: string): Promise<OrderReadModel[]> {
  return db.orderReadModel.findMany({
    where: { customerId },
    orderBy: { createdAt: 'desc' },
  });
}

Read Model Projector

// projectors/order.projector.ts import { eventBus } from '../events/event-bus';

// Project events to read model

// OrderCreated: insert the denormalized row, copying customer details into it.
eventBus.subscribe<OrderCreatedPayload>('OrderCreated', async (event) => {
  const { customerId, items, totalAmount } = event.payload;
  const customer = await db.customer.findUnique({ where: { id: customerId } });
  if (!customer) {
    // Without this guard a missing customer surfaces as a TypeError on `customer.name`.
    throw new Error(`Customer ${customerId} not found while projecting order ${event.aggregateId}`);
  }

  await db.orderReadModel.create({
    data: {
      id: event.aggregateId,
      status: 'pending',
      customerId,
      customerName: customer.name,
      customerEmail: customer.email,
      items: await enrichItems(items),
      totalAmount,
      createdAt: event.metadata.timestamp,
    },
  });
});

// OrderPaid: mark the row as paid and record when.
eventBus.subscribe<OrderPaidPayload>('OrderPaid', async (event) => {
  await db.orderReadModel.update({
    where: { id: event.aggregateId },
    data: {
      status: 'paid',
      paidAt: event.metadata.timestamp,
    },
  });
});

// OrderShipped: mark the row as shipped and store the tracking number.
eventBus.subscribe<OrderShippedPayload>('OrderShipped', async (event) => {
  await db.orderReadModel.update({
    where: { id: event.aggregateId },
    data: {
      status: 'shipped',
      shippedAt: event.metadata.timestamp,
      trackingNumber: event.payload.trackingNumber,
    },
  });
});

Best Practices

  • Immutable events: Never modify stored events

  • Descriptive event names: Past tense (OrderCreated, not CreateOrder)

  • Include all context: Events should be self-contained

  • Version events: Handle schema evolution

  • Idempotent handlers: Handle duplicate events gracefully

  • Separate concerns: Commands mutate, queries read

  • Event versioning: Support backward compatibility

  • Dead letter queue: Handle failed events

Output Checklist

Every event-driven system should include:

  • Well-defined domain events

  • Type-safe event payloads

  • Event bus (in-memory or distributed)

  • Event handlers with error handling

  • Event store for persistence

  • Aggregate with event sourcing (if needed)

  • CQRS separation (if needed)

  • Read model projectors

  • Dead letter handling

  • Event replay capability

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Coding

eslint-prettier-config

No summary provided by upstream source.

Repository Source · Needs Review
Coding

api-docs-generator

No summary provided by upstream source.

Repository Source · Needs Review
Coding

rate-limiting-abuse-protection

No summary provided by upstream source.

Repository Source · Needs Review