Saga Orchestration
Patterns for managing distributed transactions and long-running business processes.
Saga Types
Choreography
┌─────┐ ┌─────┐ ┌─────┐ │Svc A│─►│Svc B│─►│Svc C│ └─────┘ └─────┘ └─────┘ │ │ │ ▼ ▼ ▼ Event Event Event
-
Services react to events
-
Decentralized control
-
Good for simple flows
Orchestration
┌─────────────┐
│ Orchestrator│
└──────┬──────┘
│
┌─────┼─────┐
▼ ▼ ▼
┌────┐┌────┐┌────┐ │Svc1││Svc2││Svc3│ └────┘└────┘└────┘
-
Central coordinator
-
Explicit control flow
-
Better for complex flows
Saga States
State Description
Started Saga initiated
Pending Waiting for step
Compensating Rolling back
Completed All steps succeeded
Failed Failed after compensation
Orchestrator Implementation
@dataclass class SagaStep: name: str action: str compensation: str status: str = "pending"
class SagaOrchestrator: async def execute(self, data: dict) -> SagaResult: completed_steps = [] context = {"data": data}
for step in self.steps:
result = await step.action(context)
if not result.success:
await self.compensate(completed_steps, context)
return SagaResult(status="failed", error=result.error)
completed_steps.append(step)
context.update(result.data)
return SagaResult(status="completed", data=context)
async def compensate(self, completed_steps, context):
for step in reversed(completed_steps):
await step.compensation(context)
Order Fulfillment Saga Example
class OrderFulfillmentSaga(SagaOrchestrator): def define_steps(self, data): return [ SagaStep("reserve_inventory", action=self.reserve_inventory, compensation=self.release_inventory), SagaStep("process_payment", action=self.process_payment, compensation=self.refund_payment), SagaStep("create_shipment", action=self.create_shipment, compensation=self.cancel_shipment), ]
Choreography Example
class OrderChoreographySaga: def init(self, event_bus): self.event_bus = event_bus event_bus.subscribe("OrderCreated", self._on_order_created) event_bus.subscribe("InventoryReserved", self._on_inventory_reserved) event_bus.subscribe("PaymentFailed", self._on_payment_failed)
async def _on_order_created(self, event):
await self.event_bus.publish("ReserveInventory", {
"order_id": event["order_id"],
"items": event["items"]
})
async def _on_payment_failed(self, event):
# Compensation
await self.event_bus.publish("ReleaseInventory", {
"reservation_id": event["reservation_id"]
})
Best Practices
-
Make steps idempotent - Safe to retry
-
Design compensations carefully - Must always work
-
Use correlation IDs - For tracing across services
-
Implement timeouts - Don't wait forever
-
Log everything - For debugging failures
When to Use
Orchestration:
-
Complex multi-step workflows
-
Need visibility into saga state
-
Central error handling
Choreography:
-
Simple event flows
-
Loose coupling required
-
Independent service teams