Multi-Agent Pipeline
A reusable pattern for orchestrating multi-step AI workflows where each stage is handled by a specialist agent. Extracted from a production system that processed 18 stories across 10 languages.
Pipeline Pattern
Input → [Stage 1: Generate] → [Stage 2: Validate] → [Stage 3: Transform] → [Stage 4: Deliver]
│ │ │ │
Story Writer Guardrails Narrator Storage
(sequential) (parallel ok) (parallel ok) (sequential)
Core Concepts
Stages: Named processing steps, each with an agent function, input/output schema, and error handler.
Sequential vs Parallel: Some stages must run in order (generate before validate). Others can run in parallel (narrate + generate SFX simultaneously).
Progress Callbacks: Each stage reports status for UI updates. The pipeline visualization shows 9 agent nodes lighting up sequentially.
Error Recovery: Failed stages can retry with backoff, skip with defaults, or halt the pipeline.
Caching: Integrate with prompt-cache skill to skip stages that have already produced identical output.
Quick Start
from pipeline import Pipeline, Stage
async def generate_story(input_data):
# Call your LLM here
return {"story": "Once upon a time..."}
async def validate_content(input_data):
# Check guardrails
return {"valid": True, "story": input_data["story"]}
async def narrate(input_data):
# Call TTS API
return {"audio": b"..."}
pipeline = Pipeline(stages=[
Stage("generate", generate_story, parallel=False),
Stage("validate", validate_content, parallel=False),
Stage("narrate", narrate, parallel=True),
])
result = await pipeline.run({"prompt": "A bedtime story about clouds"})
Status Tracking
The pipeline emits status updates suitable for real-time UI:
pipeline = Pipeline(
stages=[...],
on_status=lambda stage, status: print(f"{stage}: {status}")
)
# Output:
# generate: started
# generate: completed (2.3s)
# validate: started
# validate: completed (0.1s)
# narrate: started
# narrate: completed (4.7s)
Lessons from Production
- Pre-cache demo content — never rely on live API calls during presentations
- Parallel stages save wall-clock time but increase API concurrency — respect rate limits
- Status callbacks should be non-blocking — don't let UI updates slow the pipeline
- Error in stage N should not lose stages 1..N-1 output — persist intermediate results
Files
scripts/pipeline.py— Generic pipeline implementation with stages, parallelism, and callbacks
Security Notes
This skill uses patterns that may trigger automated security scanners:
- base64: Used for encoding audio/binary data in API responses (standard practice for media APIs)
- UploadFile: FastAPI's built-in file upload parameter for STT/voice isolation endpoints
- "system prompt": Refers to configuring agent instructions, not prompt injection