saga-patterns

Saga Patterns for Distributed Transactions

Install skill "saga-patterns" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-saga-patterns

Maintain consistency across microservices without distributed locks.

Overview

Use sagas for:

  • Multi-service business transactions (order -> payment -> inventory -> shipping)

  • Operations that must eventually succeed or roll back completely

  • Long-running business processes (minutes to days)

  • Microservices avoiding 2PC (two-phase commit)

When NOT to Use

  • Single database operations (use transactions)

  • Real-time consistency requirements (use synchronous calls)

  • When eventual consistency is unacceptable

Orchestration vs Choreography

Aspect     | Orchestration                   | Choreography
-----------|---------------------------------|------------------------------
Control    | Central orchestrator            | Distributed events
Coupling   | Services depend on orchestrator | Loosely coupled
Visibility | Single point of observation     | Requires distributed tracing
Best for   | Complex, ordered workflows      | Simple, parallel flows

Orchestration Pattern

from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Callable, Optional


class SagaStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"
    FAILED = "failed"


@dataclass
class SagaStep:
    name: str
    action: Callable
    compensation: Callable
    status: SagaStatus = SagaStatus.PENDING
    result: Any = None
    error: Optional[str] = None  # set when this step's compensation fails


@dataclass
class SagaContext:
    saga_id: str
    data: dict = field(default_factory=dict)
    steps: list[SagaStep] = field(default_factory=list)
    status: SagaStatus = SagaStatus.PENDING
    current_step: int = 0


class SagaOrchestrator:
    def __init__(self, saga_repository, event_publisher):
        self.repo = saga_repository
        self.publisher = event_publisher

    async def execute(self, saga: SagaContext) -> SagaContext:
        saga.status = SagaStatus.RUNNING
        await self.repo.save(saga)

        for i, step in enumerate(saga.steps):
            saga.current_step = i
            try:
                step.result = await step.action(saga.data)
                saga.data.update(step.result or {})
                step.status = SagaStatus.COMPLETED
                # Persist progress after every step so a crashed saga can be recovered.
                await self.repo.save(saga)
            except Exception:
                step.status = SagaStatus.FAILED
                await self._compensate(saga, i)
                return saga

        saga.status = SagaStatus.COMPLETED
        await self.repo.save(saga)
        return saga

    async def _compensate(self, saga: SagaContext, failed_step: int):
        """Undo completed steps in reverse order, starting just before failed_step."""
        saga.status = SagaStatus.COMPENSATING
        all_compensated = True
        for i in range(failed_step - 1, -1, -1):
            step = saga.steps[i]
            if step.status == SagaStatus.COMPLETED:
                try:
                    await step.compensation(saga.data)
                    step.status = SagaStatus.COMPENSATED
                except Exception as e:
                    step.error = f"Compensation failed: {e}"
                    all_compensated = False
        # A saga whose compensation failed is not consistent; mark it FAILED for follow-up.
        saga.status = SagaStatus.COMPENSATED if all_compensated else SagaStatus.FAILED
        await self.repo.save(saga)
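To see the compensation order in action, here is a condensed, dependency-free sketch of the same control flow. The `run_saga` and `make_step` helpers and the step names are illustrative assumptions, not part of the orchestrator above; persistence and status tracking are left out for brevity.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stripped-down step shape: (name, action, compensation).
Step = tuple[str, Callable[[dict], Awaitable[Any]], Callable[[dict], Awaitable[Any]]]


async def run_saga(steps: list[Step], data: dict, log: list[str]) -> bool:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    completed: list[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            data.update(await action(data) or {})
            completed.append(step)
            log.append(f"done:{name}")
        except Exception:
            log.append(f"failed:{name}")
            for done_name, _, compensation in reversed(completed):
                await compensation(data)
                log.append(f"undone:{done_name}")
            return False
    return True


def make_step(name: str, fail: bool = False) -> Step:
    """Build a fake step whose action optionally raises."""

    async def action(data: dict) -> dict:
        if fail:
            raise RuntimeError(f"{name} failed")
        return {f"{name}_id": name}

    async def compensation(data: dict) -> None:
        data.pop(f"{name}_id", None)

    return (name, action, compensation)


def demo() -> list[str]:
    """Fail the third step and record what the saga did."""
    log: list[str] = []
    steps = [make_step("inventory"), make_step("payment"), make_step("shipping", fail=True)]
    asyncio.run(run_saga(steps, {}, log))
    return log
```

Calling `demo()` records the two successful steps, the shipping failure, and then the payment and inventory compensations in that order, i.e. the reverse of execution. The failed step itself is not compensated, which matches the `range(failed_step - 1, -1, -1)` loop in the orchestrator.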

Order Saga Example

class OrderSaga:
    def __init__(self, payment_service, inventory_service, shipping_service):
        self.payment = payment_service
        self.inventory = inventory_service
        self.shipping = shipping_service

    def create_saga(self, order: Order) -> SagaContext:
        # Order is not defined here; it is assumed to be a Pydantic-style model with .dict().
        return SagaContext(
            saga_id=f"order-{order.id}",
            data={"order": order.dict()},
            steps=[
                SagaStep("reserve_inventory", self._reserve_inventory, self._release_inventory),
                SagaStep("process_payment", self._process_payment, self._refund_payment),
                SagaStep("create_shipment", self._create_shipment, self._cancel_shipment),
            ],
        )

    # Each action returns a dict that the orchestrator merges into saga.data,
    # so the matching compensation can look up the ID it needs.

    async def _reserve_inventory(self, data: dict) -> dict:
        reservation = await self.inventory.reserve(items=data["order"]["items"])
        return {"reservation_id": reservation.id}

    async def _release_inventory(self, data: dict):
        await self.inventory.release(data["reservation_id"])

    async def _process_payment(self, data: dict) -> dict:
        payment = await self.payment.charge(amount=data["order"]["total"])
        return {"payment_id": payment.id}

    async def _refund_payment(self, data: dict):
        await self.payment.refund(data["payment_id"])

    async def _create_shipment(self, data: dict) -> dict:
        shipment = await self.shipping.create(order_id=data["order"]["id"])
        return {"shipment_id": shipment.id}

    async def _cancel_shipment(self, data: dict):
        if "shipment_id" in data:
            await self.shipping.cancel(data["shipment_id"])

Choreography Pattern

class OrderChoreography:
    """Event handlers for order saga choreography."""

    def __init__(self, event_bus, order_repo):
        self.bus = event_bus
        self.repo = order_repo

    async def handle_order_created(self, event):
        await self.bus.publish("inventory.reserve.requested", {
            "saga_id": event.saga_id,
            "items": event.payload["order"]["items"],
        })

    async def handle_inventory_reserved(self, event):
        await self.bus.publish("payment.charge.requested", {
            "saga_id": event.saga_id,
            "amount": event.payload["amount"],
        })

    async def handle_payment_failed(self, event):
        # Compensation: release inventory
        await self.bus.publish("inventory.release.requested", {
            "saga_id": event.saga_id,
            "reservation_id": event.payload["reservation_id"],
        })

    async def handle_shipment_created(self, event):
        order = await self.repo.get(event.payload["order_id"])
        order.status = "shipped"
        await self.repo.save(order)
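The event chain above can be exercised without a real broker. The following is a minimal sketch, assuming a hypothetical `InMemoryBus` and simplified topic names; the fake inventory and payment handlers stand in for separate services and are not part of the original skill.

```python
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class Event:
    topic: str
    saga_id: str
    payload: dict = field(default_factory=dict)


Handler = Callable[[Event], Awaitable[None]]


class InMemoryBus:
    """Hypothetical stand-in for a real broker; delivers events in-process."""

    def __init__(self) -> None:
        self.handlers: dict[str, list[Handler]] = defaultdict(list)
        self.history: list[str] = []  # topics in publication order

    def subscribe(self, topic: str, handler: Handler) -> None:
        self.handlers[topic].append(handler)

    async def publish(self, topic: str, saga_id: str, payload: dict) -> None:
        self.history.append(topic)
        for handler in self.handlers[topic]:
            await handler(Event(topic, saga_id, payload))


def wire(bus: InMemoryBus, payment_succeeds: bool) -> None:
    """Register fake inventory and payment services on the bus."""

    async def reserve(event: Event) -> None:
        payload = {**event.payload, "reservation_id": "r-1"}
        await bus.publish("inventory.reserved", event.saga_id, payload)

    async def request_charge(event: Event) -> None:
        await bus.publish("payment.charge.requested", event.saga_id, event.payload)

    async def charge(event: Event) -> None:
        topic = "payment.succeeded" if payment_succeeds else "payment.failed"
        await bus.publish(topic, event.saga_id, event.payload)

    async def release(event: Event) -> None:
        # Compensation: undo the reservation made earlier in the saga.
        await bus.publish("inventory.released", event.saga_id, event.payload)

    bus.subscribe("order.created", reserve)
    bus.subscribe("inventory.reserved", request_charge)
    bus.subscribe("payment.charge.requested", charge)
    bus.subscribe("payment.failed", release)


def run(payment_succeeds: bool) -> list[str]:
    """Publish one order and return the topics that were emitted."""
    bus = InMemoryBus()
    wire(bus, payment_succeeds)
    asyncio.run(bus.publish("order.created", "order-1", {"amount": 10}))
    return bus.history
```

With `run(False)` the history ends in `payment.failed` followed by `inventory.released`, showing that compensation in choreography is just another event reaction. No component holds the whole flow, which is why the comparison table lists distributed tracing as a requirement for visibility.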

Timeout and Recovery

import asyncio
from datetime import datetime, timedelta, timezone


class SagaRecovery:
    # NOTE: orchestrator.resume() is not defined in the orchestrator shown earlier.
    # It is assumed to continue execution from saga.current_step (or from_step).

    def __init__(self, saga_repo, orchestrator):
        self.repo = saga_repo
        self.orchestrator = orchestrator

    async def recover_stuck_sagas(self, timeout: timedelta = timedelta(hours=1)):
        cutoff = datetime.now(timezone.utc) - timeout
        stuck_sagas = await self.repo.find_by_status_and_age(SagaStatus.RUNNING, cutoff)

        for saga in stuck_sagas:
            try:
                await self.orchestrator.resume(saga)
            except Exception:
                await self.orchestrator._compensate(saga, saga.current_step)

    async def retry_failed_step(self, saga_id: str, max_retries: int = 3):
        saga = await self.repo.get(saga_id)
        failed_step = saga.steps[saga.current_step]

        for attempt in range(max_retries):
            try:
                failed_step.result = await failed_step.action(saga.data)
            except Exception:
                await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s
                continue
            # Resume outside the try block so a later failure does not re-run this step.
            saga.data.update(failed_step.result or {})
            failed_step.status = SagaStatus.COMPLETED
            await self.orchestrator.resume(saga, from_step=saga.current_step + 1)
            return

        # All retries failed: undo the steps that completed before this one.
        await self.orchestrator._compensate(saga, saga.current_step)
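The recovery code above handles sagas that are already stuck; a per-step timeout keeps a single slow call from stalling the saga in the first place. A minimal sketch using `asyncio.wait_for` follows. The `run_with_timeout` helper, the `StepTimeout` exception, and the two sample steps are hypothetical names introduced for illustration.

```python
import asyncio
from typing import Any, Awaitable, Callable


class StepTimeout(Exception):
    """Raised when a saga step exceeds its time budget (hypothetical name)."""


async def run_with_timeout(
    action: Callable[[dict], Awaitable[Any]],
    data: dict,
    timeout_seconds: float,
) -> Any:
    """Await action(data), converting an asyncio timeout into StepTimeout."""
    try:
        return await asyncio.wait_for(action(data), timeout=timeout_seconds)
    except asyncio.TimeoutError as exc:
        raise StepTimeout(f"step exceeded {timeout_seconds}s") from exc


async def slow_step(data: dict) -> dict:
    """Simulates a remote call that takes too long."""
    await asyncio.sleep(1.0)
    return {"ok": True}


async def fast_step(data: dict) -> dict:
    """Simulates a remote call that returns immediately."""
    return {"ok": True}
```

In the orchestrator loop, `await step.action(saga.data)` could be wrapped as `await run_with_timeout(step.action, saga.data, 30.0)` so that a timeout takes the normal failure path. A timeout does not prove the remote service skipped the work, so a timed-out step should be idempotent before it is retried.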

Idempotency

from datetime import timedelta
from typing import Callable


class IdempotentSagaStep:
    def __init__(self, step_name: str, idempotency_store):
        self.step_name = step_name
        self.store = idempotency_store

    async def execute(self, saga_id: str, action: Callable, *args, **kwargs):
        # One key per saga and step, so a retry of the same step finds the earlier result.
        idempotency_key = f"{saga_id}:{self.step_name}"
        existing = await self.store.get(idempotency_key)
        if existing is not None:
            return existing["result"]

        result = await action(*args, **kwargs)
        await self.store.set(idempotency_key, {"result": result}, ttl=timedelta(days=7))
        return result
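The effect of the idempotency key is easiest to see by running the same step twice. The sketch below assumes a hypothetical `InMemoryIdempotencyStore` (a real deployment would need a durable store) and a `run_once` helper that mirrors the check-then-execute logic above without the TTL.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


class InMemoryIdempotencyStore:
    """Hypothetical store for demonstration only; not durable."""

    def __init__(self) -> None:
        self._items: dict[str, dict] = {}

    async def get(self, key: str) -> Optional[dict]:
        return self._items.get(key)

    async def set(self, key: str, value: dict) -> None:
        self._items[key] = value


async def run_once(
    store: InMemoryIdempotencyStore,
    saga_id: str,
    step_name: str,
    action: Callable[[], Awaitable[Any]],
) -> Any:
    """Run action at most once per (saga_id, step_name); replay the stored result after."""
    key = f"{saga_id}:{step_name}"
    existing = await store.get(key)
    if existing is not None:
        return existing["result"]
    result = await action()
    await store.set(key, {"result": result})
    return result


def demo() -> tuple[list[str], int]:
    """Call the same payment step twice and count how often it really ran."""
    charges: list[str] = []

    async def charge() -> str:
        charges.append("charged")
        return f"payment-{len(charges)}"

    async def main() -> list[str]:
        store = InMemoryIdempotencyStore()
        first = await run_once(store, "order-1", "process_payment", charge)
        second = await run_once(store, "order-1", "process_payment", charge)
        return [first, second]

    return asyncio.run(main()), len(charges)
```

Both calls return the same payment ID and the charge runs once. The check and the write are separate operations, so two concurrent attempts could still both execute the action; closing that gap needs an atomic reservation in the store, which this sketch does not attempt.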

Key Decisions

Decision       | Recommendation
---------------|-------------------------------------------------------------
Pattern choice | Orchestration for complex flows, choreography for simple ones
State storage  | Persistent store (PostgreSQL) for saga state
Idempotency    | Required for all saga steps
Timeouts       | Per-step timeouts with recovery
Compensation   | Always implement, test thoroughly
Observability  | Trace saga ID across all services

Anti-Patterns (FORBIDDEN)

NEVER skip compensation logic

async def _process_payment(self, data: dict):
    return await self.payment.charge(data)  # Missing compensation!

NEVER rely on synchronous calls across services

async def _reserve_and_pay(self, data: dict):
    await self.inventory.reserve(data)
    await self.payment.charge(data)  # If this fails, inventory stays reserved!

NEVER ignore idempotency

async def _create_order(self, data: dict):
    return await self.db.insert(Order(**data))  # Duplicate on retry!

NEVER use in-memory saga state

sagas = {} # Lost on restart!

ALWAYS persist saga state and test compensation paths
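As a contrast to the in-memory dictionary, here is a minimal sketch of a persistent saga store. It uses the standard-library `sqlite3` module as a stand-in for PostgreSQL and is synchronous for brevity, whereas the repository used by the orchestrator is awaited. The `SqliteSagaStore` class, its table layout, and the `demo` helper are assumptions made for illustration.

```python
import json
import os
import sqlite3
import tempfile
from typing import Optional


class SqliteSagaStore:
    """Hypothetical saga store; SQLite stands in for PostgreSQL here."""

    def __init__(self, path: str) -> None:
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS sagas ("
            "saga_id TEXT PRIMARY KEY, status TEXT NOT NULL, "
            "current_step INTEGER NOT NULL, data TEXT NOT NULL)"
        )
        self.conn.commit()

    def save(self, saga_id: str, status: str, current_step: int, data: dict) -> None:
        # Replace the previous snapshot so saving after every step stays cheap.
        self.conn.execute(
            "INSERT OR REPLACE INTO sagas (saga_id, status, current_step, data) "
            "VALUES (?, ?, ?, ?)",
            (saga_id, status, current_step, json.dumps(data)),
        )
        self.conn.commit()

    def get(self, saga_id: str) -> Optional[dict]:
        row = self.conn.execute(
            "SELECT status, current_step, data FROM sagas WHERE saga_id = ?",
            (saga_id,),
        ).fetchone()
        if row is None:
            return None
        status, current_step, data = row
        return {
            "saga_id": saga_id,
            "status": status,
            "current_step": current_step,
            "data": json.loads(data),
        }

    def close(self) -> None:
        self.conn.close()


def demo() -> Optional[dict]:
    """Save a snapshot, reopen the database as if after a restart, and read it back."""
    path = os.path.join(tempfile.mkdtemp(), "sagas.db")
    store = SqliteSagaStore(path)
    store.save("order-1", "running", 1, {"reservation_id": "r-1"})
    store.close()

    reopened = SqliteSagaStore(path)
    try:
        return reopened.get("order-1")
    finally:
        reopened.close()
```

Because the snapshot includes `current_step` and the accumulated `data`, a recovery job can tell which steps completed and which IDs the compensations need, even after the process that started the saga is gone.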

Related Skills

  • temporal-io: Durable workflow execution

  • event-sourcing: Event-driven state management

  • idempotency-patterns: Idempotent operations
