langgraph-functional

LangGraph Functional API

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "langgraph-functional" with this command: npx skills add yonatangross/orchestkit/yonatangross-orchestkit-langgraph-functional

LangGraph Functional API

Build workflows using decorators instead of explicit graph construction.

Overview

  • Sequential workflows with conditional branching

  • Orchestrator-worker patterns with parallel execution

  • Workflows needing persistence and checkpointing

  • Human-in-the-loop approval flows

  • Simpler alternative to explicit StateGraph construction

Core Concepts

Graph API vs Functional API

Graph API (explicit): Functional API (implicit): StateGraph → add_node → @task functions + add_edge → compile @entrypoint orchestration

When to Use Functional API:

  • Sequential workflows with conditional logic

  • Orchestrator-worker patterns

  • Simpler debugging (regular Python functions)

  • Parallel task execution

Quick Start

Basic Pattern

from langgraph.func import entrypoint, task

@task def step_one(data: str) -> str: """Task returns a future - call .result() to block""" return process(data)

@task def step_two(result: str) -> str: return transform(result)

@entrypoint() def my_workflow(input_data: str) -> str: # Tasks return futures - enables parallel execution result1 = step_one(input_data).result() result2 = step_two(result1).result() return result2

Invoke

output = my_workflow.invoke("hello")

Key Rules

  • @task functions return futures - call .result() to get value

  • @entrypoint is the workflow entry point - orchestrates tasks

  • Tasks inside entrypoint are tracked for persistence/streaming

  • Regular functions (no decorator) execute normally

Parallel Execution

Fan-Out Pattern

@task def fetch_source_a(query: str) -> dict: return api_a.search(query)

@task def fetch_source_b(query: str) -> dict: return api_b.search(query)

@task def merge_results(results: list[dict]) -> dict: return {"combined": results}

@entrypoint() def parallel_search(query: str) -> dict: # Launch in parallel - futures start immediately future_a = fetch_source_a(query) future_b = fetch_source_b(query)

# Block on both results
results = [future_a.result(), future_b.result()]

return merge_results(results).result()

Map Over Collection

@task def process_item(item: dict) -> dict: return transform(item)

@entrypoint() def batch_workflow(items: list[dict]) -> list[dict]: # Launch all in parallel futures = [process_item(item) for item in items]

# Collect results
return [f.result() for f in futures]

Persistence & Checkpointing

Enable Checkpointing

from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer) def resumable_workflow(data: str) -> str: # Workflow state is automatically saved after each task result = expensive_task(data).result() return result

Use thread_id for persistence

config = {"configurable": {"thread_id": "session-123"}} result = resumable_workflow.invoke("input", config)

Access Previous State

from typing import Optional

@entrypoint(checkpointer=checkpointer) def stateful_workflow(data: str, previous: Optional[dict] = None) -> dict: """previous contains last return value for this thread_id"""

if previous and previous.get("step") == "complete":
    return previous  # Already done

result = process(data).result()
return {"step": "complete", "result": result}

Human-in-the-Loop

Interrupt for Approval

from langgraph.types import interrupt, Command

@entrypoint(checkpointer=checkpointer) def approval_workflow(request: dict) -> dict: # Process request result = analyze_request(request).result()

# Pause for human approval
approved = interrupt({
    "question": "Approve this action?",
    "details": result
})

if approved:
    return execute_action(result).result()
else:
    return {"status": "rejected"}

Initial run - pauses at interrupt

config = {"configurable": {"thread_id": "approval-1"}} for chunk in approval_workflow.stream(request, config): print(chunk)

Resume after human review

for chunk in approval_workflow.stream(Command(resume=True), config): print(chunk)

Conditional Logic

Branching

@task def classify(text: str) -> str: return llm.invoke(f"Classify: {text}") # "positive" or "negative"

@task def handle_positive(text: str) -> str: return "Thank you for the positive feedback!"

@task def handle_negative(text: str) -> str: return "We're sorry to hear that. Creating support ticket..."

@entrypoint() def feedback_workflow(text: str) -> str: sentiment = classify(text).result()

if sentiment == "positive":
    return handle_positive(text).result()
else:
    return handle_negative(text).result()

Loop Until Done

@task def call_llm(messages: list) -> dict: return llm_with_tools.invoke(messages)

@task def call_tool(tool_call: dict) -> str: tool = tools[tool_call["name"]] return tool.invoke(tool_call["args"])

@entrypoint() def agent_loop(query: str) -> str: messages = [{"role": "user", "content": query}]

while True:
    response = call_llm(messages).result()

    if not response.get("tool_calls"):
        return response["content"]

    # Execute tools in parallel
    tool_futures = [call_tool(tc) for tc in response["tool_calls"]]
    tool_results = [f.result() for f in tool_futures]

    messages.extend([response, *tool_results])

Streaming

Stream Updates

@entrypoint() def streaming_workflow(data: str) -> str: step1 = task_one(data).result() step2 = task_two(step1).result() return step2

Stream task completion updates

for update in streaming_workflow.stream("input", stream_mode="updates"): print(f"Task completed: {update}")

Stream Modes

"updates" - task completion events

for chunk in workflow.stream(input, stream_mode="updates"): print(chunk)

"values" - full state after each task

for chunk in workflow.stream(input, stream_mode="values"): print(chunk)

"custom" - custom events from your code

for chunk in workflow.stream(input, stream_mode="custom"): print(chunk)

TypeScript

Basic Pattern

import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

const processData = task("processData", async (data: string) => { return await transform(data); });

const workflow = entrypoint( { name: "myWorkflow", checkpointer: new MemorySaver() }, async (input: string) => { const result = await processData(input); return result; } );

// Invoke const config = { configurable: { thread_id: "session-1" } }; const result = await workflow.invoke("hello", config);

Parallel Execution

const fetchA = task("fetchA", async (q: string) => api.fetchA(q)); const fetchB = task("fetchB", async (q: string) => api.fetchB(q));

const parallelWorkflow = entrypoint("parallel", async (query: string) => { // Launch in parallel using Promise.all const [resultA, resultB] = await Promise.all([ fetchA(query), fetchB(query) ]); return { a: resultA, b: resultB }; });

Common Patterns

Orchestrator-Worker

@task def plan(topic: str) -> list[str]: """Orchestrator creates work items""" sections = planner.invoke(f"Create outline for: {topic}") return sections

@task def write_section(section: str) -> str: """Worker processes one item""" return llm.invoke(f"Write section: {section}")

@task def synthesize(sections: list[str]) -> str: """Combine results""" return "\n\n".join(sections)

@entrypoint() def report_workflow(topic: str) -> str: sections = plan(topic).result()

# Fan-out to workers
section_futures = [write_section(s) for s in sections]
completed = [f.result() for f in section_futures]

# Fan-in
return synthesize(completed).result()

Retry Pattern

@task def unreliable_api(data: str) -> dict: return external_api.call(data)

@entrypoint() def retry_workflow(data: str, max_retries: int = 3) -> dict: for attempt in range(max_retries): try: return unreliable_api(data).result() except Exception as e: if attempt == max_retries - 1: raise continue

Anti-Patterns

  • Forgetting .result(): Tasks return futures, must call .result()

  • Blocking in tasks: Keep tasks focused, don't nest entrypoints

  • Missing checkpointer: Without it, can't resume interrupted workflows

  • Sequential when parallel: Launch tasks before blocking on results

Migration from Graph API

Graph API (before)

from langgraph.graph import StateGraph

def node_a(state): return {"data": process(state["input"])} def node_b(state): return {"result": transform(state["data"])}

graph = StateGraph(State) graph.add_node("a", node_a) graph.add_node("b", node_b) graph.add_edge("a", "b") app = graph.compile()

Functional API (after)

@task def process_data(input: str) -> str: return process(input)

@task def transform_data(data: str) -> str: return transform(data)

@entrypoint() def workflow(input: str) -> str: data = process_data(input).result() return transform_data(data).result()

Evaluations

See references/evaluations.md for test cases.

Related Skills

  • langgraph-state

  • State management patterns for complex workflow data

  • langgraph-routing

  • Conditional routing and branching decisions

  • langgraph-parallel

  • Advanced parallel execution and fan-out patterns

  • langgraph-checkpoints

  • Persistence and recovery for long-running workflows

  • langgraph-human-in-loop

  • Human approval with Functional API

  • langgraph-subgraphs

  • Compose functional workflows as subgraphs

Key Decisions

Decision Choice Rationale

API Style Functional over Graph Simpler debugging, familiar Python patterns, implicit graph construction

Task Returns Futures with .result() Enables parallel execution without explicit async/await

Checkpointing Optional per-entrypoint Flexibility for stateless vs. resumable workflows

Human-in-Loop interrupt() function Clean pause/resume semantics with Command pattern

Resources

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

General

responsive-patterns

No summary provided by upstream source.

Repository SourceNeeds Review
General

domain-driven-design

No summary provided by upstream source.

Repository SourceNeeds Review
General

dashboard-patterns

No summary provided by upstream source.

Repository SourceNeeds Review