首页/AI 智能体核心开发/langgraph-human-in-the-loop
L

langgraph-human-in-the-loop

by @langchain-aiv
4.5(20)

实现LangGraph的人机协作(Human-in-the-Loop),支持中断执行、恢复指令和状态检查点。

langgraphhuman-in-the-loop-(hitl)ai-supervisionworkflow-automationdecision-makingGitHub
安装方式
npx skills add langchain-ai/langchain-skills --skill langgraph-human-in-the-loop
compare_arrows

Before / After 效果对比

1
使用前

自动化流程缺乏人工干预机制,无法处理异常。难以中断执行或恢复指令,影响复杂任务的灵活性。

使用后

实现LangGraph人机协作,支持中断与恢复。增强流程控制,显著提升复杂任务的灵活性和可靠性。

SKILL.md

langgraph-human-in-the-loop

interrupt(value) — pauses execution, surfaces a value to the caller Command(resume=value) — resumes execution, providing the value back to interrupt() Checkpointer — required to save state while paused Thread ID — required to identify which paused execution to resume Requirements Three things are required for interrupts to work: Checkpointer — compile with checkpointer=InMemorySaver() (dev) or PostgresSaver (prod) Thread ID — pass {"configurable": {"thread_id": "..."}} to every invoke/stream call JSON-serializable payload — the value passed to interrupt() must be JSON-serializable Basic Interrupt + Resume interrupt(value) pauses the graph. The value surfaces in the result under interrupt. Command(resume=value) resumes — the resume value becomes the return value of interrupt(). Critical: when the graph resumes, the node restarts from the beginning — all code before interrupt() re-runs. class State(TypedDict): approved: bool def approval_node(state: State): # Pause and ask for approval approved = interrupt("Do you approve this action?") # When resumed, Command(resume=...) returns that value here return {"approved": approved} checkpointer = InMemorySaver() graph = ( StateGraph(State) .add_node("approval", approval_node) .add_edge(START, "approval") .add_edge("approval", END) .compile(checkpointer=checkpointer) ) config = {"configurable": {"thread_id": "thread-1"}} Initial run — hits interrupt and pauses result = graph.invoke({"approved": False}, config) print(result["interrupt"]) [Interrupt(value='Do you approve this action?')] Resume with the human's response result = graph.invoke(Command(resume=True), config) print(result["approved"]) # True Pause execution for human review and resume with Command. typescript import { interrupt, Command, MemorySaver, StateGraph, StateSchema, START, END } from "@langchain/langgraph"; import { z } from "zod"; const State = new StateSchema({ approved: z.boolean().default(false), }); const approvalNode = async (state: typeof State.State) => { // Pause and ask for approval const approved = interrupt("Do you approve this action?"); // When resumed, Command({ resume }) returns that value here return { approved }; }; const checkpointer = new MemorySaver(); const graph = new StateGraph(State) .addNode("approval", approvalNode) .addEdge(START, "approval") .addEdge("approval", END) .compile({ checkpointer }); const config = { configurable: { thread_id: "thread-1" } }; // Initial run — hits interrupt and pauses let result = await graph.invoke({ approved: false }, config); console.log(result.__interrupt__); // [{ value: 'Do you approve this action?', ... }] // Resume with the human's response result = await graph.invoke(new Command({ resume: true }), config); console.log(result.approved); // true Approval Workflow A common pattern: interrupt to show a draft, then route based on the human's decision. class EmailAgentState(TypedDict): email_content: str draft_response: str classification: dict def human_review(state: EmailAgentState) -> Command[Literal["send_reply", "end"]]: """Pause for human review using interrupt and route based on decision.""" classification = state.get("classification", {}) # interrupt() must come first — any code before it will re-run on resume human_decision = interrupt({ "email_id": state.get("email_content", ""), "draft_response": state.get("draft_response", ""), "urgency": classification.get("urgency"), "action": "Please review and approve/edit this response" }) # Process the human's decision if human_decision.get("approved"): return Command( update={"draft_response": human_decision.get("edited_response", state.get("draft_response", ""))}, goto="send_reply" ) else: # Rejection — human will handle directly return Command(update={}, goto=END) </python> <typescript> Interrupt for human review, then route to send or end based on the decision. typescript import { interrupt, Command, END, GraphNode } from "@langchain/langgraph"; const humanReview: GraphNode = async (state) => { const classification = state.classification!; // interrupt() must come first — any code before it will re-run on resume const humanDecision = interrupt({ emailId: state.emailContent, draftResponse: state.responseText, urgency: classification.urgency, action: "Please review and approve/edit this response", }); // Process the human's decision if (humanDecision.approved) { return new Command({ update: { responseText: humanDecision.editedResponse || state.responseText }, goto: "sendReply", }); } else { return new Command({ update: {}, goto: END }); } }; Validation Loop Use interrupt() in a loop to validate human input and re-prompt if invalid. def get_age_node(state): prompt = "What is your age?" while True: answer = interrupt(prompt) # Validate the input if isinstance(answer, int) and answer > 0: break else: # Invalid input — ask again with a more specific prompt prompt = f"'{answer}' is not a valid age. Please enter a positive number." return {"age": answer} Each Command(resume=...) call provides the next answer. If invalid, the loop re-interrupts with a clearer message. python config = {"configurable": {"thread_id": "form-1"}} first = graph.invoke({"age": None}, config) # __interrupt__: "What is your age?" retry = graph.invoke(Command(resume="thirty"), config) # __interrupt__: "'thirty' is not a valid age..." final = graph.invoke(Command(resume=30), config) print(final["age"]) # 30 const getAgeNode = (state: typeof State.State) => { let prompt = "What is your age?"; while (true) { const answer = interrupt(prompt); // Validate the input if (typeof answer === "number" &#x26;&#x26; answer > 0) { return { age: answer }; } else { // Invalid input — ask again with a more specific prompt prompt = `'${answer}' is not a valid age. Please enter a positive number.`; } } }; </typescript> </ex-validation-loop> --- ## Multiple Interrupts When parallel branches each call `interrupt()`, resume all of them in a single invocation by mapping each interrupt ID to its resume value. <ex-multiple-interrupts> <python> Resume multiple parallel interrupts by mapping interrupt IDs to values. python from typing import Annotated, TypedDict import operator from langgraph.checkpoint.memory import InMemorySaver from langgraph.graph import START, END, StateGraph from langgraph.types import Command, interrupt class State(TypedDict): vals: Annotated[list[str], operator.add] def node_a(state): answer = interrupt("question_a") return {"vals": [f"a:{answer}"]} def node_b(state): answer = interrupt("question_b") return {"vals": [f"b:{answer}"]} graph = ( StateGraph(State) .add_node("a", node_a) .add_node("b", node_b) .add_edge(START, "a") .add_edge(START, "b") .add_edge("a", END) .add_edge("b", END) .compile(checkpointer=InMemorySaver()) ) config = {"configurable": {"thread_id": "1"}} # Both parallel nodes hit interrupt() and pause result = graph.invoke({"vals": []}, config) # result["interrupt"] contains both Interrupt objects with IDs # Resume all pending interrupts at once using a map of id -> value resume_map = { i.id: f"answer for {i.value}" for i in result["interrupt"] } result = graph.invoke(Command(resume=resume_map), config) # result["vals"] = ["a:answer for question_a", "b:answer for question_b"] const State = Annotation.Root({ vals: Annotation<string[]>({ reducer: (left, right) => left.concat(Array.isArray(right) ? right : [right]), default: () => [], }), }); function nodeA(_state: typeof State.State) { const answer = interrupt("question_a") as string; return { vals: [a:${answer}] }; } function nodeB(_state: typeof State.State) { const answer = interrupt("question_b") as string; return { vals: [b:${answer}] }; } const graph = new StateGraph(State) .addNode("a", nodeA) .addNode("b", nodeB) .addEdge(START, "a") .addEdge(START, "b") .addEdge("a", END) .addEdge("b", END) .compile({ checkpointer: new MemorySaver() }); const config = { configurable: { thread_id: "1" } }; const interruptedResult = await graph.invoke({ vals: [] }, config); // Resume all pending interrupts at once const resumeMap: Record<string, string> = {}; if (isInterrupted(interruptedResult)) { for (const i of interruptedResult[INTERRUPT]) { if (i.id != null) { resumeMap[i.id] = answer for ${i.value}; } } } const result = await graph.invoke(new Command({ resume: resumeMap }), config); // result.vals = ["a:answer for question_a", "b:answer for question_b"] User-fixable errors use interrupt() to pause and collect missing data — that's the pattern covered by this skill. For the full 4-tier error handling strategy (RetryPolicy, Command error loops, etc.), see the fundamentals skill. --- ## Side Effects Before Interrupt Must Be Idempotent When the graph resumes, the node restarts from the beginning — ALL code before interrupt() re-runs. In subgraphs, BOTH the parent node and the subgraph node re-execute. Do: - Use upsert (not insert) operations before interrupt() - Use check-before-create patterns - Place side effects after interrupt() when possible - Separate side effects into their own nodes Don't: - Create new records before interrupt() — duplicates on each resume - Append to lists before interrupt() — duplicate entries on each resume Idempotent operations before interrupt vs non-idempotent (wrong). python # GOOD: Upsert is idempotent — safe before interrupt def node_a(state: State): db.upsert_user(user_id=state["user_id"], status="pending_approval") approved = interrupt("Approve this change?") return {"approved": approved} # GOOD: Side effect AFTER interrupt — only runs once def node_a(state: State): approved = interrupt("Approve this change?") if approved: db.create_audit_log(user_id=state["user_id"], action="approved") return {"approved": approved} # BAD: Insert creates duplicates on each resume! def node_a(state: State): audit_id = db.create_audit_log({ # Runs again on resume! "user_id": state["user_id"], "action": "pending_approval", }) approved = interrupt("Approve this change?") return {"approved": approved} // GOOD: Side effect AFTER interrupt — only runs once const nodeA = async (state: typeof State.State) => { const approved = interrupt("Approve this change?"); if (approved) { await db.createAuditLog({ userId: state.userId, action: "approved" }); } return { approved }; }; // BAD: Insert creates duplicates on each resume! const nodeA = async (state: typeof State.State) => { await db.createAuditLog({ // Runs again on resume! userId: state.userId, action: "pending_approval", }); const approved = interrupt("Approve this change?"); return { approved }; }; </typescript> </ex-idempotent-patterns> <subgraph-interrupt-re-execution> ### Subgraph re-execution on resume When a subgraph contains an `interrupt()`, resuming re-executes BOTH the parent node (that invoked the subgraph) AND the subgraph node (that called `interrupt()`): <python> python def node_in_parent_graph(state: State): some_code() # <-- Re-executes on resume subgraph_result = subgraph.invoke(some_input) # ... def node_in_subgraph(state: State): some_other_code() # <-- Also re-executes on resume result = interrupt("What's your name?") # ... async function nodeInSubgraph(state: State) { someOtherCode(); // <-- Also re-executes on resume const result = interrupt("What's your name?"); // ... } --- ## Command(resume) Warning Command(resume=...) is the only Command pattern intended as input to invoke()/stream(). Do NOT pass Command(update=...) as input — it resumes from the latest checkpoint and the graph appears stuck. See the fundamentals skill for the full antipattern explanation. --- ## Fixes Checkpointer required for interrupt functionality. python # WRONG graph = builder.compile() # CORRECT graph = builder.compile(checkpointer=InMemorySaver()) // CORRECT const graph = builder.compile({ checkpointer: new MemorySaver() }); </typescript> </fix-checkpointer-required-for-interrupts> <fix-resume-with-command> <python> Use Command to resume from an interrupt (regular dict restarts graph). python # WRONG graph.invoke({"resume_data": "approve"}, config) # CORRECT graph.invoke(Command(resume="approve"), config) // CORRECT await graph.invoke(new Command({ resume: "approve" }), config); ### What You Should NOT Do - Use interrupts without a checkpointer — will fail - Resume without the same thread_id — creates a new thread instead of resuming - Pass Command(update=...) as invoke input — graph appears stuck (use plain dict) - Perform non-idempotent side effects before interrupt() — creates duplicates on resume - Assume code before interrupt() only runs once — it re-runs every resume Weekly Installs1.6KRepositorylangchain-ai/la…n-skillsGitHub Stars376First Seen14 days agoSecurity AuditsGen Agent Trust HubPassSocketPassSnykPassInstalled onclaude-code1.4Kcursor1.2Kcodex1.2Kgithub-copilot1.2Kopencode1.2Kgemini-cli1.2K

用户评价 (0)

发表评价

效果
易用性
文档
兼容性

暂无评价

统计数据

安装量7.2K
评分4.5 / 5.0
版本
更新日期2026年5月23日
对比案例1 组

用户评分

4.5(20)
5
25%
4
50%
3
25%
2
0%
1
0%

为此 Skill 评分

0.0

兼容平台

🔧Claude Code
🔧OpenClaw
🔧OpenCode
🔧Codex
🔧Gemini CLI
🔧GitHub Copilot
🔧Amp
🔧Kimi CLI

时间线

创建2026年3月17日
最后更新2026年5月23日