langgraph-parallel

LangGraph Parallel Execution

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "langgraph-parallel" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-langgraph-parallel

LangGraph Parallel Execution

Run independent nodes concurrently for performance.

Fan-Out/Fan-In Pattern

from langgraph.graph import StateGraph

def fan_out(state): """Split work into parallel tasks.""" state["tasks"] = [{"id": 1}, {"id": 2}, {"id": 3}] return state

def worker(state): """Process one task.""" task = state["current_task"] result = process(task) return {"results": [result]}

def fan_in(state): """Combine parallel results.""" combined = aggregate(state["results"]) return {"final": combined}

workflow = StateGraph(State) workflow.add_node("fan_out", fan_out) workflow.add_node("worker", worker) workflow.add_node("fan_in", fan_in)

workflow.add_edge("fan_out", "worker") workflow.add_edge("worker", "fan_in") # Waits for all workers

Using Send API

from langgraph.constants import Send

def router(state): """Route to multiple workers in parallel.""" return [ Send("worker", {"task": task}) for task in state["tasks"] ]

workflow.add_conditional_edges("router", router)

Complete Send API Example (2026 Pattern)

from langgraph.graph import StateGraph, START, END from langgraph.constants import Send from typing import TypedDict, Annotated from operator import add

class OverallState(TypedDict): subjects: list[str] jokes: Annotated[list[str], add] # Accumulates from parallel branches

class JokeState(TypedDict): subject: str

def generate_topics(state: OverallState) -> dict: """Initial node: create list of subjects.""" return {"subjects": ["cats", "dogs", "programming", "coffee"]}

def continue_to_jokes(state: OverallState) -> list[Send]: """Fan-out: create parallel branch for each subject.""" return [ Send("generate_joke", {"subject": s}) for s in state["subjects"] ]

def generate_joke(state: JokeState) -> dict: """Worker: process one subject, return to accumulator.""" joke = llm.invoke(f"Tell a short joke about {state['subject']}") return {"jokes": [f"{state['subject']}: {joke.content}"]}

Build graph

builder = StateGraph(OverallState) builder.add_node("generate_topics", generate_topics) builder.add_node("generate_joke", generate_joke)

builder.add_edge(START, "generate_topics") builder.add_conditional_edges("generate_topics", continue_to_jokes) builder.add_edge("generate_joke", END) # All branches converge automatically

graph = builder.compile()

Invoke

result = graph.invoke({"subjects": [], "jokes": []})

result["jokes"] contains all 4 jokes

Parallel Agent Analysis

from typing import Annotated from operator import add

class AnalysisState(TypedDict): content: str findings: Annotated[list[dict], add] # Accumulates

async def run_parallel_agents(state: AnalysisState): """Run multiple agents in parallel.""" agents = [security_agent, tech_agent, quality_agent]

# Run all concurrently
tasks = [agent.analyze(state["content"]) for agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)

# Filter successful results
findings = [r for r in results if not isinstance(r, Exception)]

return {"findings": findings}

Map-Reduce Pattern (True Parallel)

import asyncio

async def parallel_map(items: list, process_fn) -> list: """Map: Process all items concurrently.""" tasks = [asyncio.create_task(process_fn(item)) for item in items] return await asyncio.gather(*tasks, return_exceptions=True)

def reduce_results(results: list) -> dict: """Reduce: Combine all results.""" successes = [r for r in results if not isinstance(r, Exception)] failures = [r for r in results if isinstance(r, Exception)]

return {
    "total": len(results),
    "passed": len(successes),
    "failed": len(failures),
    "results": successes,
    "errors": [str(e) for e in failures]
}

async def map_reduce_node(state: State) -> dict: """Combined map-reduce in single node.""" results = await parallel_map(state["items"], process_item_async) summary = reduce_results(results) return {"summary": summary}

Alternative: Using Send API for true parallelism in graph

def fan_out_to_mappers(state: State) -> list[Send]: """Fan-out pattern for parallel map.""" return [ Send("mapper", {"item": item, "index": i}) for i, item in enumerate(state["items"]) ]

All mappers write to accumulating state key

Reducer runs after all mappers complete (automatic fan-in)

Error Isolation

async def parallel_with_isolation(tasks: list): """Run parallel tasks, isolate failures.""" results = await asyncio.gather(*tasks, return_exceptions=True)

successes = []
failures = []

for task, result in zip(tasks, results):
    if isinstance(result, Exception):
        failures.append({"task": task, "error": str(result)})
    else:
        successes.append(result)

return {"successes": successes, "failures": failures}

Timeout per Branch

import asyncio

async def parallel_with_timeout(agents: list, content: str, timeout: int = 30): """Run agents with per-agent timeout.""" async def run_with_timeout(agent): try: return await asyncio.wait_for( agent.analyze(content), timeout=timeout ) except asyncio.TimeoutError: return {"agent": agent.name, "error": "timeout"}

tasks = [run_with_timeout(a) for a in agents]
return await asyncio.gather(*tasks)

Key Decisions

Decision Recommendation

Max parallel 5-10 concurrent (avoid overwhelming APIs)

Error handling return_exceptions=True (don't fail all)

Timeout 30-60s per branch

Accumulator Use Annotated[list, add] for results

Common Mistakes

  • No error isolation (one failure kills all)

  • No timeout (one slow branch blocks)

  • Sequential where parallel possible

  • Forgetting to wait for all branches

Evaluations

See references/evaluations.md for test cases.

Related Skills

  • langgraph-state

  • Accumulating state with Annotated[list, add] reducer

  • langgraph-supervisor

  • Supervisor dispatching to parallel workers

  • langgraph-subgraphs

  • Parallel subgraph execution

  • langgraph-streaming

  • Stream progress from parallel branches

  • langgraph-checkpoints

  • Checkpoint parallel execution for recovery

  • multi-agent-orchestration

  • Higher-level coordination patterns

Capability Details

fanout-pattern

Keywords: fanout, parallel, concurrent, scatter Solves:

  • Run agents in parallel

  • Implement fan-out pattern

  • Distribute work across workers

fanin-pattern

Keywords: fanin, gather, aggregate, collect Solves:

  • Aggregate parallel results

  • Implement fan-in pattern

  • Collect worker outputs

parallel-template

Keywords: template, implementation, parallel, agent Solves:

  • Parallel agent fanout template

  • Production-ready code

  • Copy-paste implementation

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

responsive-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
General

domain-driven-design

No summary provided by upstream source.

Repository SourceNeeds Review
General

dashboard-patterns

No summary provided by upstream source.

Repository SourceNeeds Review