langgraph-human-in-the-loop

- interrupt(value) — pauses execution, surfaces a value to the caller

Safety Notice

This listing is imported from skills.sh public index metadata. Review upstream SKILL.md and repository scripts before running.

Copy this and send it to your AI assistant to learn

Install skill "langgraph-human-in-the-loop" with this command: npx skills add langchain-ai/langchain-skills/langchain-ai-langchain-skills-langgraph-human-in-the-loop

  • interrupt(value) — pauses execution, surfaces a value to the caller

  • Command(resume=value) — resumes execution, providing the value back to interrupt()

  • Checkpointer — required to save state while paused

  • Thread ID — required to identify which paused execution to resume

Requirements

Three things are required for interrupts to work:

  • Checkpointer — compile with checkpointer=InMemorySaver() (dev) or PostgresSaver (prod)

  • Thread ID — pass {"configurable": {"thread_id": "..."}} to every invoke /stream call

  • JSON-serializable payload — the value passed to interrupt() must be JSON-serializable

Basic Interrupt + Resume

interrupt(value) pauses the graph. The value surfaces in the result under interrupt . Command(resume=value) resumes — the resume value becomes the return value of interrupt() .

Critical: when the graph resumes, the node restarts from the beginning — all code before interrupt() re-runs.

class State(TypedDict): approved: bool

def approval_node(state: State):

Pause and ask for approval

approved = interrupt("Do you approve this action?")

When resumed, Command(resume=...) returns that value here

return {"approved": approved}

checkpointer = InMemorySaver() graph = ( StateGraph(State) .add_node("approval", approval_node) .add_edge(START, "approval") .add_edge("approval", END) .compile(checkpointer=checkpointer) )

config = {"configurable": {"thread_id": "thread-1"}}

Initial run — hits interrupt and pauses

result = graph.invoke({"approved": False}, config) print(result["interrupt"])

[Interrupt(value='Do you approve this action?')]

Resume with the human's response

result = graph.invoke(Command(resume=True), config) print(result["approved"]) # True

</python> <typescript> Pause execution for human review and resume with Command.

import { interrupt, Command, MemorySaver, StateGraph, StateSchema, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  approved: z.boolean().default(false),
});

const approvalNode = async (state: typeof State.State) => {
  // Pause and ask for approval
  const approved = interrupt("Do you approve this action?");
  // When resumed, Command({ resume }) returns that value here
  return { approved };
};

const checkpointer = new MemorySaver();
const graph = new StateGraph(State)
  .addNode("approval", approvalNode)
  .addEdge(START, "approval")
  .addEdge("approval", END)
  .compile({ checkpointer });

const config = { configurable: { thread_id: "thread-1" } };

// Initial run — hits interrupt and pauses
let result = await graph.invoke({ approved: false }, config);
console.log(result.__interrupt__);
// [{ value: 'Do you approve this action?', ... }]

// Resume with the human's response
result = await graph.invoke(new Command({ resume: true }), config);
console.log(result.approved);  // true

Approval Workflow

A common pattern: interrupt to show a draft, then route based on the human's decision.

class EmailAgentState(TypedDict):
email_content: str
draft_response: str
classification: dict

def human_review(state: EmailAgentState) -> Command[Literal["send_reply", "end"]]:
"""Pause for human review using interrupt and route based on decision."""
classification = state.get("classification", {})

# interrupt() must come first — any code before it will re-run on resume
human_decision = interrupt({
    "email_id": state.get("email_content", ""),
    "draft_response": state.get("draft_response", ""),
    "urgency": classification.get("urgency"),
    "action": "Please review and approve/edit this response"
})

# Process the human's decision
if human_decision.get("approved"):
    return Command(
        update={"draft_response": human_decision.get("edited_response", state.get("draft_response", ""))},
        goto="send_reply"
    )
else:
    # Rejection — human will handle directly
    return Command(update={}, goto=END)

&#x3C;/python>
&#x3C;typescript>
Interrupt for human review, then route to send or end based on the decision.
```typescript
import { interrupt, Command, END, GraphNode } from "@langchain/langgraph";

const humanReview: GraphNode&#x3C;typeof EmailAgentState> = async (state) => {
  const classification = state.classification!;

  // interrupt() must come first — any code before it will re-run on resume
  const humanDecision = interrupt({
    emailId: state.emailContent,
    draftResponse: state.responseText,
    urgency: classification.urgency,
    action: "Please review and approve/edit this response",
  });

  // Process the human's decision
  if (humanDecision.approved) {
    return new Command({
      update: { responseText: humanDecision.editedResponse || state.responseText },
      goto: "sendReply",
    });
  } else {
    return new Command({ update: {}, goto: END });
  }
};

Validation Loop

Use interrupt()
 in a loop to validate human input and re-prompt if invalid.

def get_age_node(state):
prompt = "What is your age?"

while True:
    answer = interrupt(prompt)

    # Validate the input
    if isinstance(answer, int) and answer > 0:
        break
    else:
        # Invalid input — ask again with a more specific prompt
        prompt = f"'{answer}' is not a valid age. Please enter a positive number."

return {"age": answer}

Each `Command(resume=...)` call provides the next answer. If invalid, the loop re-interrupts with a clearer message.
```python
config = {"configurable": {"thread_id": "form-1"}}
first = graph.invoke({"age": None}, config)
# __interrupt__: "What is your age?"

retry = graph.invoke(Command(resume="thirty"), config)
# __interrupt__: "'thirty' is not a valid age..."

final = graph.invoke(Command(resume=30), config)
print(final["age"])  # 30

const getAgeNode = (state: typeof State.State) => {
let prompt = "What is your age?";

while (true) {
const answer = interrupt(prompt);

// Validate the input
if (typeof answer === "number" &#x26;&#x26; answer > 0) {
  return { age: answer };
} else {
  // Invalid input — ask again with a more specific prompt
  prompt = `'${answer}' is not a valid age. Please enter a positive number.`;
}

}
};

&#x3C;/typescript>
&#x3C;/ex-validation-loop>

---

## Multiple Interrupts

When parallel branches each call `interrupt()`, resume all of them in a single invocation by mapping each interrupt ID to its resume value.

&#x3C;ex-multiple-interrupts>
&#x3C;python>
Resume multiple parallel interrupts by mapping interrupt IDs to values.
```python
from typing import Annotated, TypedDict
import operator
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt

class State(TypedDict):
    vals: Annotated[list[str], operator.add]

def node_a(state):
    answer = interrupt("question_a")
    return {"vals": [f"a:{answer}"]}

def node_b(state):
    answer = interrupt("question_b")
    return {"vals": [f"b:{answer}"]}

graph = (
    StateGraph(State)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge(START, "b")
    .add_edge("a", END)
    .add_edge("b", END)
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# Both parallel nodes hit interrupt() and pause
result = graph.invoke({"vals": []}, config)
# result["__interrupt__"] contains both Interrupt objects with IDs

# Resume all pending interrupts at once using a map of id -> value
resume_map = {
    i.id: f"answer for {i.value}"
    for i in result["__interrupt__"]
}
result = graph.invoke(Command(resume=resume_map), config)
# result["vals"] = ["a:answer for question_a", "b:answer for question_b"]

const State = Annotation.Root({
vals: Annotation&#x3C;string[]>({
reducer: (left, right) => left.concat(Array.isArray(right) ? right : [right]),
default: () => [],
}),
});

function nodeA(_state: typeof State.State) {
const answer = interrupt("question_a") as string;
return { vals: [a:${answer}
] };
}

function nodeB(_state: typeof State.State) {
const answer = interrupt("question_b") as string;
return { vals: [b:${answer}
] };
}

const graph = new StateGraph(State)
.addNode("a", nodeA)
.addNode("b", nodeB)
.addEdge(START, "a")
.addEdge(START, "b")
.addEdge("a", END)
.addEdge("b", END)
.compile({ checkpointer: new MemorySaver() });

const config = { configurable: { thread_id: "1" } };

const interruptedResult = await graph.invoke({ vals: [] }, config);

// Resume all pending interrupts at once
const resumeMap: Record&#x3C;string, string> = {};
if (isInterrupted(interruptedResult)) {
for (const i of interruptedResult[INTERRUPT]) {
if (i.id != null) {
resumeMap[i.id] = answer for ${i.value}
;
}
}
}
const result = await graph.invoke(new Command({ resume: resumeMap }), config);
// result.vals = ["a:answer for question_a", "b:answer for question_b"]

&#x3C;/typescript>
&#x3C;/ex-multiple-interrupts>

User-fixable errors use `interrupt()` to pause and collect missing data — that's the pattern covered by this skill. For the full 4-tier error handling strategy (RetryPolicy, Command error loops, etc.), see the **fundamentals** skill.

---

## Side Effects Before Interrupt Must Be Idempotent

When the graph resumes, the node restarts from the **beginning** — ALL code before `interrupt()` re-runs. In subgraphs, BOTH the parent node and the subgraph node re-execute.

&#x3C;idempotency-rules>

**Do:**
- Use **upsert** (not insert) operations before `interrupt()`
- Use **check-before-create** patterns
- Place side effects **after** `interrupt()` when possible
- Separate side effects into their own nodes

**Don't:**
- Create new records before `interrupt()` — duplicates on each resume
- Append to lists before `interrupt()` — duplicate entries on each resume

&#x3C;/idempotency-rules>

&#x3C;ex-idempotent-patterns>
&#x3C;python>
Idempotent operations before interrupt vs non-idempotent (wrong).
```python
# GOOD: Upsert is idempotent — safe before interrupt
def node_a(state: State):
    db.upsert_user(user_id=state["user_id"], status="pending_approval")
    approved = interrupt("Approve this change?")
    return {"approved": approved}

# GOOD: Side effect AFTER interrupt — only runs once
def node_a(state: State):
    approved = interrupt("Approve this change?")
    if approved:
        db.create_audit_log(user_id=state["user_id"], action="approved")
    return {"approved": approved}

# BAD: Insert creates duplicates on each resume!
def node_a(state: State):
    audit_id = db.create_audit_log({  # Runs again on resume!
        "user_id": state["user_id"],
        "action": "pending_approval",
    })
    approved = interrupt("Approve this change?")
    return {"approved": approved}

// GOOD: Side effect AFTER interrupt — only runs once
const nodeA = async (state: typeof State.State) => {
const approved = interrupt("Approve this change?");
if (approved) {
await db.createAuditLog({ userId: state.userId, action: "approved" });
}
return { approved };
};

// BAD: Insert creates duplicates on each resume!
const nodeA = async (state: typeof State.State) => {
await db.createAuditLog({  // Runs again on resume!
userId: state.userId,
action: "pending_approval",
});
const approved = interrupt("Approve this change?");
return { approved };
};

&#x3C;/typescript>
&#x3C;/ex-idempotent-patterns>

&#x3C;subgraph-interrupt-re-execution>

### Subgraph re-execution on resume

When a subgraph contains an `interrupt()`, resuming re-executes BOTH the parent node (that invoked the subgraph) AND the subgraph node (that called `interrupt()`):

&#x3C;python>
```python
def node_in_parent_graph(state: State):
    some_code()  # &#x3C;-- Re-executes on resume
    subgraph_result = subgraph.invoke(some_input)
    # ...

def node_in_subgraph(state: State):
    some_other_code()  # &#x3C;-- Also re-executes on resume
    result = interrupt("What's your name?")
    # ...

async function nodeInSubgraph(state: State) {
someOtherCode();  // &#x3C;-- Also re-executes on resume
const result = interrupt("What's your name?");
// ...
}

&#x3C;/typescript>
&#x3C;/subgraph-interrupt-re-execution>

---

## Command(resume) Warning

`Command(resume=...)` is the **only** Command pattern intended as input to `invoke()`/`stream()`. Do NOT pass `Command(update=...)` as input — it resumes from the latest checkpoint and the graph appears stuck. See the fundamentals skill for the full antipattern explanation.

---

## Fixes

&#x3C;fix-checkpointer-required-for-interrupts>
&#x3C;python>
Checkpointer required for interrupt functionality.
```python
# WRONG
graph = builder.compile()

# CORRECT
graph = builder.compile(checkpointer=InMemorySaver())

// CORRECT
const graph = builder.compile({ checkpointer: new MemorySaver() });

&#x3C;/typescript>
&#x3C;/fix-checkpointer-required-for-interrupts>

&#x3C;fix-resume-with-command>
&#x3C;python>
Use Command to resume from an interrupt (regular dict restarts graph).
```python
# WRONG
graph.invoke({"resume_data": "approve"}, config)

# CORRECT
graph.invoke(Command(resume="approve"), config)

// CORRECT
await graph.invoke(new Command({ resume: "approve" }), config);

&#x3C;/typescript>
&#x3C;/fix-resume-with-command>

&#x3C;boundaries>
### What You Should NOT Do

- Use interrupts without a checkpointer — will fail
- Resume without the same thread_id — creates a new thread instead of resuming
- Pass `Command(update=...)` as invoke input — graph appears stuck (use plain dict)
- Perform non-idempotent side effects before `interrupt()` — creates duplicates on resume
- Assume code before `interrupt()` only runs once — it re-runs every resume
&#x3C;/boundaries>

Source Transparency

This detail page is rendered from real SKILL.md content. Trust labels are metadata-based hints, not a safety guarantee.

Related Skills

Related by shared tags or category signals.

Web3

deep-agents-memory

No summary provided by upstream source.

Repository SourceNeeds Review
Web3

langgraph fundamentals

No summary provided by upstream source.

Repository SourceNeeds Review
Web3

langchain fundamentals

No summary provided by upstream source.

Repository SourceNeeds Review
Web3

langchain-rag

No summary provided by upstream source.

Repository SourceNeeds Review