LangGraph
LangGraph workflow patterns for state management, routing, parallel execution, supervisor-worker, tool calling, checkpointing, human-in-loop, streaming, subgraphs, and functional API. Use when building LangGraph pipelines, multi-agent systems, or AI workflows.
Primary Agent: workflow-architect
LangGraph Workflow Patterns
Comprehensive patterns for building production LangGraph workflows. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| State Management | 4 | CRITICAL | Designing workflow state schemas, accumulators, reducers |
| Routing & Branching | 4 | HIGH | Dynamic routing, retry loops, semantic routing, cross-graph |
| Parallel Execution | 3 | HIGH | Fan-out/fan-in, map-reduce, concurrent agents |
| Supervisor Patterns | 3 | HIGH | Central coordinators, round-robin, priority dispatch |
| Tool Calling | 4 | CRITICAL | Binding tools, ToolNode, dynamic selection, approvals |
| Checkpointing | 3 | HIGH | Persistence, recovery, cross-thread Store memory |
| Human-in-Loop | 3 | MEDIUM | Approval gates, feedback loops, interrupt/resume |
| Streaming | 3 | MEDIUM | Real-time updates, token streaming, custom events |
| Subgraphs | 3 | MEDIUM | Modular composition, nested graphs, state mapping |
| Functional API | 3 | MEDIUM | @entrypoint/@task decorators, migration from StateGraph |
| Platform | 3 | HIGH | Deployment, RemoteGraph, double-texting strategies |
Total: 36 rules across 11 categories
State Management
State schemas determine how data flows between nodes. Wrong schemas cause silent data loss.
| Rule | File | Key Pattern |
|---|---|---|
| TypedDict State | rules/state-typeddict.md | TypedDict + Annotated[list, add] for accumulators |
| Pydantic Validation | rules/state-pydantic.md | BaseModel at boundaries, TypedDict internally |
| MessagesState | rules/state-messages.md | MessagesState or add_messages reducer |
| Custom Reducers | rules/state-reducers.md | Annotated[T, reducer_fn] for merge/overwrite |
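The reducer contract can be sketched without the library: a reducer is any `(existing, update) -> merged` function attached via `Annotated`. A minimal sketch (the `merge_dicts` helper is illustrative, not a LangGraph API):

```python
from operator import add
from typing import Annotated, TypedDict

def merge_dicts(existing: dict, update: dict) -> dict:
    """Custom reducer: merge incoming keys instead of overwriting the whole dict."""
    return {**existing, **update}

class State(TypedDict):
    results: Annotated[list[str], add]       # accumulates writes from all nodes
    metadata: Annotated[dict, merge_dicts]   # merges writes instead of replacing

# LangGraph calls the reducer for every node write to the annotated key:
print(merge_dicts({"a": 1}, {"b": 2}))  # {'a': 1, 'b': 2}
print(add(["x"], ["y"]))                # ['x', 'y']
```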
Routing & Branching
Control flow between nodes. Always include END fallback to prevent hangs.
| Rule | File | Key Pattern |
|---|---|---|
| Conditional Edges | rules/routing-conditional.md | add_conditional_edges with explicit mapping |
| Retry Loops | rules/routing-retry-loops.md | Loop-back edges with max retry counter |
| Semantic Routing | rules/routing-semantic.md | Embedding similarity or Command API routing |
| Cross-Graph Navigation | rules/routing-cross-graph.md | Command(graph=Command.PARENT) for parent/sibling routing |
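The router behind `add_conditional_edges` is a plain function from state to a node name. A minimal sketch combining the END-fallback and max-retry rules above (`MAX_RETRIES` and the node names are illustrative):

```python
MAX_RETRIES = 3  # illustrative cap

def route_after_validation(state: dict) -> str:
    """Conditional-edge router: return the name of the next node.
    The terminal fallback guarantees the graph cannot hang."""
    if state.get("valid"):
        return "publish"
    if state.get("retries", 0) < MAX_RETRIES:
        return "retry"
    return "__end__"  # END fallback after too many retries

print(route_after_validation({"valid": True}))                 # publish
print(route_after_validation({"valid": False, "retries": 1}))  # retry
print(route_after_validation({"valid": False, "retries": 3}))  # __end__
```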
Parallel Execution
Run independent nodes concurrently. Use Annotated[list, add] to accumulate results.
| Rule | File | Key Pattern |
|---|---|---|
| Fan-Out/Fan-In | rules/parallel-fanout-fanin.md | Send API for dynamic parallel branches |
| Map-Reduce | rules/parallel-map-reduce.md | asyncio.gather + result aggregation |
| Error Isolation | rules/parallel-error-isolation.md | return_exceptions=True + per-branch timeout |
Supervisor Patterns
Central coordinator routes to specialized workers. Workers return to supervisor.
| Rule | File | Key Pattern |
|---|---|---|
| Basic Supervisor | rules/supervisor-basic.md | Command API for state update + routing |
| Priority Routing | rules/supervisor-priority.md | Priority dict ordering agent execution |
| Round-Robin | rules/supervisor-round-robin.md | Completion tracking with agents_completed |
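Priority routing plus completion tracking can be sketched as a plain function (agent names and priorities are illustrative; in a real graph this logic lives in the supervisor node and returns a `Command`):

```python
def next_agent(state: dict, priorities: dict[str, int]) -> str:
    """Pick the highest-priority worker that has not completed yet."""
    done = set(state.get("agents_completed", []))
    pending = [name for name in priorities if name not in done]
    if not pending:
        return "__end__"  # every worker has reported back — finish
    return max(pending, key=priorities.__getitem__)

priorities = {"researcher": 3, "writer": 2, "reviewer": 1}
print(next_agent({"agents_completed": []}, priorities))                        # researcher
print(next_agent({"agents_completed": ["researcher", "writer"]}, priorities))  # reviewer
```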
Tool Calling
Integrate function calling into LangGraph agents. Keep tools under 10 per agent.
| Rule | File | Key Pattern |
|---|---|---|
| Tool Binding | rules/tools-bind.md | model.bind_tools(tools) + tool_choice |
| ToolNode Execution | rules/tools-toolnode.md | ToolNode(tools) prebuilt parallel executor |
| Dynamic Selection | rules/tools-dynamic.md | Embedding-based tool relevance filtering |
| Tool Interrupts | rules/tools-interrupts.md | interrupt() for approval gates on tools |
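Dynamic selection can be sketched without embeddings: rank tools by relevance to the query and bind only the top k. The term-overlap scoring below is a stand-in for the embedding similarity the rule file describes:

```python
def select_tools(query: str, tool_descriptions: dict[str, str], k: int = 5) -> list[str]:
    """Rank tools by naive term overlap with the query and keep the top k.
    A production version would score with embedding similarity instead."""
    terms = set(query.lower().split())
    def score(name: str) -> int:
        return len(terms & set(tool_descriptions[name].lower().split()))
    ranked = sorted(tool_descriptions, key=score, reverse=True)
    return ranked[:k]

tools = {
    "search_flights": "search for flights between two cities",
    "book_hotel": "book a hotel room in a city",
    "get_weather": "get the current weather for a city",
}
print(select_tools("what is the weather in Paris", tools, k=1))  # ['get_weather']
```

Then bind only the selected subset with `model.bind_tools(...)`, keeping the agent under the ~10-tool budget.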
Checkpointing
Persist workflow state for recovery and debugging.
| Rule | File | Key Pattern |
|---|---|---|
| Checkpointer Setup | rules/checkpoints-setup.md | MemorySaver dev / PostgresSaver prod |
| State Recovery | rules/checkpoints-recovery.md | thread_id resume + get_state_history |
| Cross-Thread Store | rules/checkpoints-store.md | Store for long-term memory across threads |
Human-in-Loop
Pause workflows for human intervention. Requires checkpointer for state persistence.
| Rule | File | Key Pattern |
|---|---|---|
| Interrupt/Resume | rules/human-in-loop-interrupt.md | interrupt() function + Command(resume=) |
| Approval Gate | rules/human-in-loop-approval.md | interrupt_before + state update + resume |
| Feedback Loop | rules/human-in-loop-feedback.md | Iterative interrupt until approved |
Streaming
Real-time updates and progress tracking for workflows.
| Rule | File | Key Pattern |
|---|---|---|
| Stream Modes | rules/streaming-modes.md | 5 modes: values, updates, messages, custom, debug |
| Token Streaming | rules/streaming-tokens.md | messages mode with node/tag filtering |
| Custom Events | rules/streaming-custom-events.md | get_stream_writer() for progress events |
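When streaming with a list of modes, each chunk arrives as a `(mode, payload)` tuple. A sketch of demultiplexing them for a UI (the payload shapes are illustrative):

```python
def handle_chunk(mode: str, payload) -> str:
    """Demultiplex chunks from app.stream(..., stream_mode=["updates", "custom"])."""
    if mode == "updates":
        node = next(iter(payload))       # payload maps node name -> state delta
        return f"node '{node}' finished"
    if mode == "custom":
        return f"progress: {payload}"    # events emitted via get_stream_writer()
    return ""                            # ignore modes the UI doesn't render

print(handle_chunk("updates", {"worker": {"results": ["ok"]}}))  # node 'worker' finished
print(handle_chunk("custom", {"pct": 50}))                       # progress: {'pct': 50}
```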
Subgraphs
Compose modular, reusable workflow components with nested graphs.
| Rule | File | Key Pattern |
|---|---|---|
| Invoke from Node | rules/subgraphs-invoke.md | Different schemas, explicit state mapping |
| Add as Node | rules/subgraphs-add-as-node.md | Shared state, add_node(name, compiled_graph) |
| State Mapping | rules/subgraphs-state-mapping.md | Boundary transforms between parent/child |
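Boundary transforms are plain functions on either side of the subgraph call. A sketch assuming illustrative field names:

```python
def to_child(parent_state: dict) -> dict:
    """Project parent state into the subgraph's (different) schema."""
    return {"question": parent_state["user_input"]}

def from_child(parent_state: dict, child_result: dict) -> dict:
    """Merge the subgraph's result back without leaking its internal keys."""
    return {**parent_state, "answer": child_result["response"]}

# Inside a parent node:
#   result = subgraph.invoke(to_child(state))
#   return from_child(state, result)
parent = {"user_input": "hi", "history": []}
print(from_child(parent, {"response": "hello", "scratch": "internal"}))
# {'user_input': 'hi', 'history': [], 'answer': 'hello'}
```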
Functional API
Build workflows using @entrypoint and @task decorators instead of explicit graph construction.
| Rule | File | Key Pattern |
|---|---|---|
| @entrypoint | rules/functional-entrypoint.md | Workflow entry point with optional checkpointer |
| @task | rules/functional-task.md | Returns futures, .result() to block |
| Migration | rules/functional-migration.md | StateGraph to Functional API conversion |
Platform
Deploy graphs as managed APIs with persistence, streaming, and multi-tenancy.
| Rule | File | Key Pattern |
|---|---|---|
| Deployment | rules/platform-deployment.md | langgraph.json + CLI + Assistants API |
| RemoteGraph | rules/platform-remote-graph.md | RemoteGraph for calling deployed graphs |
| Double Texting | rules/platform-double-texting.md | 4 strategies: reject, rollback, enqueue, interrupt |
Quick Start Example
```python
from typing import TypedDict, Annotated, Literal
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command

class State(TypedDict):
    input: str
    results: Annotated[list[str], add]

# Note: Literal needs the literal node name "__end__"; END itself is a
# runtime constant and is not valid inside Literal[...]
def supervisor(state) -> Command[Literal["worker", "__end__"]]:
    if not state.get("results"):
        return Command(update={"input": state["input"]}, goto="worker")
    return Command(goto=END)

def worker(state) -> dict:
    return {"results": [f"Processed: {state['input']}"]}

graph = StateGraph(State)
graph.add_node("supervisor", supervisor)
graph.add_node("worker", worker)
graph.add_edge(START, "supervisor")
graph.add_edge("worker", "supervisor")
app = graph.compile()
```
2026 Key Patterns
- Command API: Use `Command(update=..., goto=...)` when updating state AND routing together
- context_schema: Pass runtime config (temperature, provider) without polluting state
- CachePolicy: Cache expensive node results with TTL via `InMemoryCache`
- RemainingSteps: Proactively handle recursion limits
- Store: Cross-thread memory separate from Checkpointer (thread-scoped)
- interrupt(): Dynamic interrupts inside node logic (replaces `interrupt_before` for conditional cases)
- add_edge(START, node): Not `set_entry_point()` (deprecated)
Key Decisions
| Decision | Recommendation |
|---|---|
| State type | TypedDict internally, Pydantic at boundaries |
| Entry point | add_edge(START, node) not set_entry_point() |
| Routing + state update | Command API |
| Routing only | Conditional edges |
| Accumulators | Annotated[list[T], add] always |
| Dev checkpointer | MemorySaver |
| Prod checkpointer | PostgresSaver |
| Short-term memory | Checkpointer (thread-scoped) |
| Long-term memory | Store (cross-thread, namespaced) |
| Max parallel branches | 5-10 concurrent |
| Tools per agent | 5-10 max (dynamic selection for more) |
| Approval gates | interrupt() for high-risk operations |
| Stream modes | ["updates", "custom"] for most UIs |
| Subgraph pattern | Invoke for isolation, Add-as-Node for shared state |
| Functional vs Graph | Functional for simple flows, Graph for complex topology |
Common Mistakes
- Forgetting `add` reducer (overwrites instead of accumulates)
- Mutating state in place (breaks checkpointing)
- No END fallback in routing (workflow hangs)
- Infinite retry loops (no max counter)
- Side effects in router functions
- Too many tools per agent (context overflow)
- Raising exceptions in tools (crashes agent loop)
- No checkpointer in production (lose progress on crash)
- Wrapping `interrupt()` in try/except (breaks the mechanism)
- Not transforming state at subgraph boundaries
- Forgetting `.result()` on Functional API tasks
- Using `set_entry_point()` (deprecated, use `add_edge(START, ...)`)
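The first mistake in this list can be demonstrated without the library: without a reducer, concurrent writes to the same key overwrite each other (last write wins), while `operator.add` accumulates them:

```python
from operator import add

branch_a = ["result-a"]   # written by one parallel branch
branch_b = ["result-b"]   # written by another

overwritten = branch_b                 # no reducer: branch_a is silently lost
accumulated = add(branch_a, branch_b)  # Annotated[list, add]: both survive

print(overwritten)   # ['result-b']
print(accumulated)   # ['result-a', 'result-b']
```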
Evaluations
See test-cases.json for consolidated test cases across all categories.
Related Skills
- `ork:agent-orchestration` - Higher-level multi-agent coordination, ReAct loop patterns, and framework comparisons
- `temporal-io` - Durable execution alternative
- `ork:llm-integration` - General LLM function calling
- `type-safety-validation` - Pydantic model patterns
Rules (36)
Recover interrupted workflow state and debug checkpoint history — HIGH
State Recovery and Debugging
Resume interrupted workflows and inspect checkpoint history for debugging.
Incorrect — no recovery handling:
```python
# If this crashes at step 5 of 10, all progress is lost
result = app.invoke(initial_state)
```
Correct — automatic recovery:
```python
import logging

async def run_with_recovery(workflow_id: str, initial_state: dict):
    """Run workflow with automatic recovery from checkpoint."""
    config = {"configurable": {"thread_id": workflow_id}}
    try:
        state = app.get_state(config)
        if state.values:
            logging.info(f"Resuming workflow {workflow_id}")
            return app.invoke(None, config=config)  # None = resume from checkpoint
    except Exception:
        pass  # No existing checkpoint
    logging.info(f"Starting new workflow {workflow_id}")
    return app.invoke(initial_state, config=config)
```
Debugging with checkpoint history:
```python
# Get all checkpoints for a workflow
config = {"configurable": {"thread_id": "analysis-123"}}
for checkpoint in app.get_state_history(config):
    print(f"Step: {checkpoint.metadata['step']}")
    print(f"Node: {checkpoint.metadata['source']}")
    print(f"State: {checkpoint.values}")

# Rollback to previous checkpoint
history = list(app.get_state_history(config))
previous = history[1]  # One step back
app.update_state(config, previous.values)
```
Graph Migrations (2026): LangGraph handles topology changes automatically — adding/removing nodes, adding/removing state keys. Limitation: can't remove a node if a thread is interrupted at that node.
Key rules:
- Pass `None` as input to resume from checkpoint
- Use `get_state_history()` to inspect all checkpoints
- Use `update_state()` for rollback/manual state correction
- Clean up old checkpoints (TTL-based or keep-latest-N)
Reference: LangGraph Persistence
Configure persistent checkpointer to survive crashes in production workflows — HIGH
Checkpointer Setup
Use MemorySaver for development, PostgresSaver for production.
Incorrect — no checkpointer:
```python
app = workflow.compile()  # No checkpointer — progress lost on crash
result = app.invoke(state)  # Can't resume if interrupted
```
Correct — environment-appropriate checkpointer:
```python
from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver

# Development: In-memory (fast, no setup)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# Production: PostgreSQL (shared, durable)
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
app = workflow.compile(checkpointer=checkpointer)

# Invoke with thread_id for resumability
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(initial_state, config=config)
```
Key rules:
- Always use a checkpointer in production
- Use deterministic `thread_id` (not random UUID) so you can resume
- Checkpointer saves state after each node execution
- Add `interrupt_before` for manual review points
Reference: LangGraph Persistence
Use Store for cross-thread memory instead of per-thread checkpoints — HIGH
Cross-Thread Store Memory
Checkpointer = short-term (thread-scoped). Store = long-term (cross-thread, namespaced).
Incorrect — preferences in checkpointer only:
```python
# User preferences stored in thread-1 state
# When user starts thread-2, preferences are lost!
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
app = workflow.compile(checkpointer=checkpointer)
```
Correct — Store for cross-thread memory:
```python
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore

# Checkpointer = SHORT-TERM (thread-scoped)
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)

# Store = LONG-TERM (cross-thread, namespaced)
store = PostgresStore.from_conn_string(DATABASE_URL)

# Compile with BOTH
app = workflow.compile(checkpointer=checkpointer, store=store)
```
Using Store in nodes:
```python
from datetime import datetime
from langgraph.store.base import BaseStore

async def agent_with_memory(state: AgentState, *, store: BaseStore):
    user_id = state["user_id"]
    update = {}
    # Read cross-thread memory
    memories = await store.aget(namespace=("users", user_id), key="preferences")
    if memories and memories.value.get("prefers_concise"):
        # Return a new value rather than mutating state in place
        update["system_prompt"] = state["system_prompt"] + "\nBe concise."
    # Write cross-thread memory
    await store.aput(
        namespace=("users", user_id),
        key="last_topic",
        value={"topic": state["current_topic"], "timestamp": datetime.now().isoformat()},
    )
    return update
```
Memory architecture:
```
Thread 1 (chat-001)        Thread 2 (chat-002)
┌─────────────────┐        ┌─────────────────┐
│ Checkpointer    │        │ Checkpointer    │
│ - msg history   │        │ - msg history   │
│ - workflow pos  │        │ - workflow pos  │
└─────────────────┘        └─────────────────┘
         ↓        shared        ↓
┌─────────────────────────────────────┐
│ Store (cross-thread)                │
│ namespace=("users", "alice")        │
│ - preferences, last_topic           │
└─────────────────────────────────────┘
```
Key rules:
- Checkpointer for conversation history and workflow position (thread-scoped)
- Store for user preferences, learned facts, settings (cross-thread)
- Always use namespaces in Store to prevent data collisions
- Clean up old checkpoints but keep Store data persistent
Reference: LangGraph Memory
Configure @entrypoint decorator with checkpointer for resumable workflows — MEDIUM
@entrypoint Decorator
Define workflow entry points with optional checkpointing. Simpler than explicit StateGraph construction.
Incorrect — no checkpointer for resumable workflow:
```python
@entrypoint()
def my_workflow(data: str) -> str:
    result = expensive_task(data).result()
    return result
# If interrupted, all progress lost — no checkpointer
```
Correct — with checkpointer:
```python
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def my_workflow(data: str) -> str:
    result = expensive_task(data).result()
    return result

# Invoke with thread_id for resumability
config = {"configurable": {"thread_id": "session-123"}}
result = my_workflow.invoke("input", config)
```
Human-in-the-loop with @entrypoint:
```python
from langgraph.types import interrupt, Command

@entrypoint(checkpointer=checkpointer)
def approval_workflow(request: dict) -> dict:
    result = analyze_request(request).result()
    approved = interrupt({"question": "Approve?", "details": result})
    if approved:
        return execute_action(result).result()
    return {"status": "rejected"}

# Resume after human review
for chunk in approval_workflow.stream(Command(resume=True), config):
    print(chunk)
```
Graph API vs Functional API:
- Functional: Sequential workflows, orchestrator-worker, simpler debugging
- Graph: Complex topology, dynamic routing, subgraph composition
Key rules:
- Add `checkpointer` when workflow needs persistence/resume
- Functional API builds graph implicitly from task call order
- `@entrypoint` is the workflow entry — orchestrates `@task` functions
- Regular functions (no decorator) execute normally, not tracked
Reference: LangGraph Functional API
Migrate from StateGraph to Functional API while preserving routing flexibility — MEDIUM
StateGraph to Functional API Migration
Convert simple StateGraph workflows to Functional API. Keep complex topologies as StateGraph.
Before — Graph API:
```python
from langgraph.graph import StateGraph

def node_a(state):
    return {"data": process(state["input"])}

def node_b(state):
    return {"result": transform(state["data"])}

graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge("a", "b")
app = graph.compile()
```
After — Functional API:
```python
from langgraph.func import entrypoint, task

@task
def process_data(input: str) -> str:
    return process(input)

@task
def transform_data(data: str) -> str:
    return transform(data)

@entrypoint()
def workflow(input: str) -> str:
    data = process_data(input).result()
    return transform_data(data).result()
```
Orchestrator-worker migration:
```python
@task
def plan(topic: str) -> list[str]:
    return planner.invoke(f"Create outline for: {topic}")

@task
def write_section(section: str) -> str:
    return llm.invoke(f"Write section: {section}")

@entrypoint()
def report_workflow(topic: str) -> str:
    sections = plan(topic).result()
    section_futures = [write_section(s) for s in sections]  # Fan-out
    completed = [f.result() for f in section_futures]  # Fan-in
    return "\n\n".join(completed)
```
TypeScript equivalent:
```typescript
import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

const processData = task("processData", async (data: string) => transform(data));

const workflow = entrypoint(
  { name: "myWorkflow", checkpointer: new MemorySaver() },
  async (input: string) => {
    const result = await processData(input);
    return result;
  }
);
```
When NOT to migrate:
- Complex dynamic routing (conditional edges, semantic routing)
- Subgraph composition with different state schemas
- Graph topologies that aren't linear/tree-shaped
Incorrect — using Graph API for simple linear workflow:
```python
from langgraph.graph import StateGraph

def node_a(state):
    return {"data": process(state["input"])}

def node_b(state):
    return {"result": transform(state["data"])}

# Verbose for simple linear flow
graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge("a", "b")
app = graph.compile()
```
Correct — Functional API for simple workflows:
```python
from langgraph.func import entrypoint, task

@task
def process_data(input: str) -> str:
    return process(input)

@task
def transform_data(data: str) -> str:
    return transform(data)

@entrypoint()
def workflow(input: str) -> str:
    data = process_data(input).result()  # Clean, simple
    return transform_data(data).result()
```
Key rules:
- Functional API is best for sequential and orchestrator-worker patterns
- Use Graph API when you need complex topology (loops, diamonds, dynamic routing)
- Both APIs support checkpointing, streaming, and human-in-the-loop
- Futures enable implicit parallelism without explicit Send API
Reference: LangGraph Functional API
Use @task decorator correctly and always call .result() on futures — MEDIUM
@task Decorator and Futures
@task functions return futures. Call .result() to get the value. Launch multiple tasks before calling .result() for parallelism.
Incorrect — forgetting .result():
```python
@task
def process(data: str) -> str:
    return transform(data)

@entrypoint()
def workflow(data: str) -> str:
    result = process(data)  # result is a Future, not a string!
    return result  # Returns Future object, not the processed string
```
Correct — parallel execution with futures:
```python
@task
def fetch_source_a(query: str) -> dict:
    return api_a.search(query)

@task
def fetch_source_b(query: str) -> dict:
    return api_b.search(query)

@entrypoint()
def parallel_search(query: str) -> dict:
    # Launch in parallel — futures start immediately
    future_a = fetch_source_a(query)
    future_b = fetch_source_b(query)
    # Block on both results
    results = [future_a.result(), future_b.result()]
    return {"combined": results}
```
Map over collection:
```python
@task
def process_item(item: dict) -> dict:
    return transform(item)

@entrypoint()
def batch_workflow(items: list[dict]) -> list[dict]:
    futures = [process_item(item) for item in items]  # All launch in parallel
    return [f.result() for f in futures]  # Collect all results
```
Key rules:
- `@task` returns a future — always call `.result()` to get the value
- Launch tasks before blocking on `.result()` for parallel execution
- Tasks inside `@entrypoint` are tracked for persistence and streaming
- Don't nest `@entrypoint` inside `@entrypoint`
Reference: LangGraph Functional API
Implement approval gate pattern with reject paths to prevent dead-end workflows — MEDIUM
Approval Gate Pattern
Use interrupt_before for static approval points. Update state and resume.
Incorrect — no reject path:
```python
app = workflow.compile(interrupt_before=["publish"])
# Human reviews...
state.values["approved"] = True
app.update_state(config, state.values)
result = app.invoke(None, config)
# What if human rejects? No path!
```
Correct — approval with approve/reject paths:
```python
def approval_gate(state) -> dict:
    if not state.get("human_reviewed"):
        return state  # Pauses here due to interrupt_before
    if state["approved"]:
        return {"next": "publish"}
    else:
        return {"next": "revise"}

# Compile with interrupt
app = workflow.compile(interrupt_before=["approval_gate"])

# Step 1: Run until approval gate
config = {"configurable": {"thread_id": "doc-123"}}
result = app.invoke({"topic": "AI"}, config=config)

# Step 2: Human reviews
state = app.get_state(config)
print(f"Draft: {state.values['draft']}")

# Step 3: Human decides
app.update_state(config, {
    "approved": True,
    "feedback": "Looks good",
    "human_reviewed": True,
})

# Step 4: Resume
result = app.invoke(None, config=config)
```
Multiple approval points:
```python
app = workflow.compile(interrupt_before=["first_review", "final_review"])
```
Key rules:
- Always include both approve and reject paths
- Set timeout for human review (24-48h, auto-reject after)
- Send notification when workflow pauses (email/Slack)
- Use `get_state()` to show current state for review
Reference: LangGraph Human-in-the-Loop
Build iterative feedback loops that refine output across multiple human rounds — MEDIUM
Feedback Loop Pattern
Repeatedly interrupt until human approves, incorporating feedback each iteration.
Incorrect — single feedback, no iteration:
```python
def review(state):
    feedback = get_human_feedback()
    state["feedback"] = feedback
    # Proceeds regardless of whether human approved
```
Correct — iterative feedback loop:
```python
import uuid
from langgraph.types import interrupt, Command

async def run_with_feedback(initial_state: dict):
    config = {"configurable": {"thread_id": str(uuid.uuid4())}}
    next_input = initial_state
    while True:
        result = app.invoke(next_input, config=config)
        if "__interrupt__" not in result:
            return result  # Completed without interrupt
        info = result["__interrupt__"][0].value
        print(f"Output: {info.get('output', 'N/A')}")
        feedback = input("Approve? (yes/no/feedback): ")
        if feedback.lower() == "yes":
            return app.invoke(Command(resume={"approved": True}), config=config)
        elif feedback.lower() == "no":
            return {"status": "rejected"}
        else:
            # Incorporate feedback and resume — loop until approved
            next_input = Command(resume={"approved": False, "feedback": feedback})
```
Input validation loop:
```python
def get_valid_age(state):
    prompt = "What is your age?"
    while True:
        answer = interrupt(prompt)
        if isinstance(answer, int) and 0 < answer < 150:
            return {"age": answer}
        prompt = f"'{answer}' is not valid. Enter 1-150."
```
Key rules:
- Track `review_count` to limit iterations
- Pass rejection feedback back to generation node
- Preserve approved partial results between iterations
- Timeout after max iterations with best-effort result
Reference: LangGraph Human-in-the-Loop
Use interrupt() for conditional pausing and Command for resuming workflows — MEDIUM
Dynamic Interrupt and Resume
Use interrupt() for conditional pausing and Command(resume=) for resuming.
Incorrect — interrupt in try/except:
```python
def approval_node(state):
    try:
        response = interrupt({"question": "Approve?"})  # interrupt raises!
    except Exception:
        response = {"approved": False}  # Catches the interrupt exception — broken
```
Correct — dynamic interrupt (2026 pattern):
```python
from langgraph.types import interrupt, Command

def approval_node(state):
    """Conditionally interrupt based on risk level."""
    if state["risk_level"] == "high":
        response = interrupt({
            "question": "High-risk action. Approve?",
            "action": state["proposed_action"],
            "risk_level": state["risk_level"],
        })
        if not response.get("approved"):
            return {"status": "rejected", "action": None}
    return {"status": "approved", "action": state["proposed_action"]}
```
Resume with Command:
```python
config = {"configurable": {"thread_id": "workflow-123"}}

# Initial run — stops at interrupt
result = graph.invoke(initial_state, config)

# Check for interrupt
if "__interrupt__" in result:
    info = result["__interrupt__"][0].value
    print(f"Question: {info['question']}")
    # Resume with user response
    final = graph.invoke(Command(resume={"approved": True}), config)
```
Critical rules:
- DO: Place side effects AFTER interrupt calls
- DO: Make pre-interrupt side effects idempotent (upsert, not create)
- DO: Keep interrupt call order consistent across executions
- DON'T: Wrap interrupt in bare try/except
- DON'T: Conditionally skip interrupt calls (breaks determinism)
- DON'T: Pass functions or class instances to interrupt()
Reference: LangGraph Human-in-the-Loop
Isolate errors in parallel branches to preserve completed results on failure — HIGH
Error Isolation in Parallel Execution
Isolate failures in parallel branches. Use return_exceptions=True and per-branch timeouts.
Incorrect — one failure kills all:
```python
async def run_agents(state):
    tasks = [agent.analyze(state["content"]) for agent in agents]
    results = await asyncio.gather(*tasks)  # One exception crashes everything
    return {"results": results}
```
Correct — isolated failures with timeout:
```python
import asyncio

async def parallel_with_isolation(agents: list, content: str, timeout: int = 30):
    """Run agents with per-agent timeout and error isolation."""
    async def run_with_timeout(agent):
        try:
            return await asyncio.wait_for(agent.analyze(content), timeout=timeout)
        except asyncio.TimeoutError:
            return {"agent": agent.name, "error": "timeout"}
        except Exception as e:
            return {"agent": agent.name, "error": str(e)}

    tasks = [run_with_timeout(a) for a in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    successes = [r for r in results if not isinstance(r, Exception) and "error" not in r]
    failures = [r for r in results if isinstance(r, Exception) or "error" in r]
    return {"successes": successes, "failures": failures}
```
Key rules:
- Always use `return_exceptions=True` in `asyncio.gather`
- Add per-branch timeout (30-60s) to prevent slow branches blocking
- Separate successes from failures — partial results are still valuable
- Log failures for debugging but continue with available results
Reference: LangGraph Parallel Execution
Use Send API for dynamic fan-out and fan-in parallel branches — HIGH
Fan-Out/Fan-In Pattern
Use Send API to dispatch dynamic parallel branches. Results accumulate via Annotated[list, add].
Incorrect — sequential execution disguised as parallel:
```python
def process_all(state):
    results = []
    for task in state["tasks"]:
        result = process(task)  # Sequential!
        results.append(result)
    return {"results": results}
```
Correct — true parallel with Send API:
```python
from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], add]  # Accumulates from parallel branches

class JokeState(TypedDict):
    subject: str

def continue_to_jokes(state: OverallState) -> list[Send]:
    """Fan-out: create parallel branch for each subject."""
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def generate_joke(state: JokeState) -> dict:
    """Worker: each runs in parallel, writes to accumulator."""
    joke = llm.invoke(f"Tell a joke about {state['subject']}")
    return {"jokes": [f"{state['subject']}: {joke.content}"]}

builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes)
builder.add_edge("generate_joke", END)  # All branches converge automatically
```
Key rules:
- `Send` API creates true parallel branches (not async, graph-level parallelism)
- Worker state can differ from overall state (separate TypedDict)
- Use `Annotated[list, add]` on the accumulating field in overall state
- All branches converge automatically when connected to a common next node or END
Reference: LangGraph Send API
Apply map-reduce with asyncio.gather for independent parallel task processing — HIGH
Map-Reduce Pattern
Use asyncio.gather for async parallel processing within a single node, or Send for graph-level parallelism.
Incorrect — sequential in async context:
```python
async def process_all(state):
    results = []
    for item in state["items"]:
        result = await process_item(item)  # Sequential await!
        results.append(result)
    return {"results": results}
```
Correct — parallel with asyncio.gather:
```python
import asyncio

async def parallel_map(items: list, process_fn) -> list:
    """Map: process all items concurrently."""
    tasks = [asyncio.create_task(process_fn(item)) for item in items]
    return await asyncio.gather(*tasks, return_exceptions=True)

def reduce_results(results: list) -> dict:
    """Reduce: combine all results."""
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return {
        "total": len(results),
        "passed": len(successes),
        "failed": len(failures),
        "results": successes,
        "errors": [str(e) for e in failures],
    }

async def map_reduce_node(state: State) -> dict:
    results = await parallel_map(state["items"], process_item_async)
    return {"summary": reduce_results(results)}
```
Key rules:
- Use `asyncio.gather` for I/O-bound parallel work within a node
- Use `Send` API for graph-level parallelism across nodes
- Always use `return_exceptions=True` to prevent one failure killing all
- Max 5-10 concurrent tasks to avoid overwhelming APIs
Reference: LangGraph Parallel Execution
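The 5-10 concurrency cap above can be enforced with a semaphore around `asyncio.gather`; a self-contained sketch (the `work` coroutine stands in for a real API call):

```python
import asyncio

async def bounded_gather(coros, limit: int = 5):
    """Run coroutines with at most `limit` in flight at once."""
    sem = asyncio.Semaphore(limit)

    async def run(coro):
        async with sem:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros), return_exceptions=True)

async def work(i: int) -> int:
    await asyncio.sleep(0)  # stand-in for an API call
    return i * 2

print(asyncio.run(bounded_gather([work(i) for i in range(4)], limit=2)))
# [0, 2, 4, 6]
```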
Configure LangGraph Platform for local dev, Docker builds, and cloud deployment — HIGH
Platform Deployment
LangGraph Platform provides infrastructure for deploying graphs as APIs with persistence, streaming, and multi-tenancy built in.
Incorrect — no configuration file:
```shell
# Running graph directly without Platform config
python my_graph.py  # No API server, no persistence, no multi-tenancy
```
Correct — langgraph.json + CLI:
```json
{
  "dependencies": ["langchain_openai", "./my_package"],
  "graphs": {
    "my_agent": "./my_package/agent.py:graph",
    "my_workflow": "./my_package/workflow.py:app"
  },
  "env": "./.env"
}
```
```shell
# Local development (hot reload, in-memory)
langgraph dev

# Build Docker image for self-hosted deployment
langgraph build -t my-agent:latest

# Deploy to LangGraph Cloud
langgraph deploy
```
Assistants API — multiple configs from one graph:
```python
from langgraph_sdk import get_client

client = get_client(url="http://localhost:2024")

# Create assistants with different configs from the same graph
creative = await client.assistants.create(
    graph_id="my_agent",
    config={"configurable": {"temperature": 0.9, "model": "claude-sonnet-4-5-20250929"}},
    name="creative-writer",
)
precise = await client.assistants.create(
    graph_id="my_agent",
    config={"configurable": {"temperature": 0.1, "model": "claude-sonnet-4-5-20250929"}},
    name="precise-analyst",
)
```
Background runs and webhooks:
# Fire-and-forget with webhook callback
run = await client.runs.create(
thread_id=thread["thread_id"],
assistant_id=creative["assistant_id"],
input={"messages": [{"role": "user", "content": "Write a story"}]},
webhook="https://myapp.com/api/langgraph-callback",
background=True,
)
`@entrypoint` for Platform deployments:
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver
@entrypoint(path="my_workflow", checkpointer=MemorySaver())
def my_workflow(inputs: dict) -> dict:
result = analyze_task(inputs["query"]).result()
    return {"answer": result}
Authentication middleware:
from langgraph_sdk import Auth
auth = Auth()
@auth.authenticate
async def authenticate(authorization: str) -> str:
# Validate token, return user_id
user = await verify_token(authorization)
return user["id"]
@auth.on
async def authorize(params, user_id: str):
# Filter resources by user ownership
    params["metadata"] = {"owner": user_id}
Key rules:
- `langgraph.json` is required — defines graphs, dependencies, and env vars
- Use `langgraph dev` for local development (hot reload, in-memory state)
- Use `langgraph build` for Docker images, `langgraph deploy` for cloud
- Assistants API creates multiple configurations from a single deployed graph
- `@entrypoint(path=...)` registers functional API workflows with Platform
- Background runs return immediately; use webhooks for completion callbacks
- Add authentication middleware for multi-tenant deployments
Reference: LangGraph Platform
Handle concurrent user messages with double-texting strategies — MEDIUM
Double Texting
When a user sends a new message while a previous run is still executing, LangGraph Platform provides four strategies to handle the conflict.
Problem — no strategy (race condition):
User: "Analyze Q1 data" → Run starts...
User: "Actually use Q2 data" → Second run starts on same thread
# Both runs mutate state concurrently — corrupted results
Strategy 1 — Reject (simplest, safest):
from langgraph_sdk import get_client
client = get_client(url="http://localhost:2024")
# New input rejected while run is active
run = await client.runs.create(
thread_id=thread["thread_id"],
assistant_id="my_agent",
input={"messages": [{"role": "user", "content": "Analyze Q2"}]},
multitask_strategy="reject",
)
# Raises 409 Conflict if a run is already active
Use case: Strict turn-based workflows, form submissions, payment processing.
Strategy 2 — Rollback (cancel and restart):
# Cancel current run, roll back state, start fresh with new input
run = await client.runs.create(
thread_id=thread["thread_id"],
assistant_id="my_agent",
input={"messages": [{"role": "user", "content": "Use Q2 data instead"}]},
multitask_strategy="rollback",
)
# Previous run cancelled, state rolled back to before it started
Use case: Chatbots where user corrections should override in-progress work.
Strategy 3 — Enqueue (process sequentially):
# Queue new input to run after current run completes
run = await client.runs.create(
thread_id=thread["thread_id"],
assistant_id="my_agent",
input={"messages": [{"role": "user", "content": "Also check Q3"}]},
multitask_strategy="enqueue",
)
# Runs sequentially: current run finishes, then queued run starts
Use case: Task queues, sequential pipelines, batch processing interfaces.
Strategy 4 — Interrupt (stop and continue):
# Interrupt current run but keep its state, then continue with new input
run = await client.runs.create(
thread_id=thread["thread_id"],
assistant_id="my_agent",
input={"messages": [{"role": "user", "content": "Focus on revenue"}]},
multitask_strategy="interrupt",
)
# Current run stops mid-execution, new run builds on existing state
Use case: Interactive agents where context should accumulate across interruptions.
Key rules:
- Set `multitask_strategy` on every `runs.create` call (no global default)
- `reject` — safest, returns 409 if busy; client must retry
- `rollback` — cancels active run, resets state, starts new run
- `enqueue` — queues new run after current completes; preserves ordering
- `interrupt` — stops active run mid-execution, keeps partial state, starts new run
- Choose based on UX: corrections → rollback, queuing → enqueue, strict → reject
- Only relevant for same-thread concurrent runs; different threads are independent
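With the `reject` strategy the client owns the retry. A minimal, framework-free sketch of that loop — `ConflictError`, `send_with_retry`, and the backoff values are illustrative assumptions (the real SDK surfaces the 409 through its HTTP client's exception type):

```python
import time

class ConflictError(Exception):
    """Stand-in for the 409 raised while a run is already active."""

def send_with_retry(send_message, payload, retries=3, delay=0.01):
    """Retry a rejected run a few times before giving up."""
    for attempt in range(retries):
        try:
            return send_message(payload)
        except ConflictError:
            time.sleep(delay * (attempt + 1))  # brief linear backoff between attempts
    raise RuntimeError("thread still busy after retries")

# Simulated server: busy for the first two calls, then accepts.
calls = {"n": 0}
def fake_send(payload):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConflictError()
    return {"status": "accepted", "input": payload}

result = send_with_retry(fake_send, "Analyze Q2")
```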
Reference: Double Texting
Use RemoteGraph to invoke deployed LangGraph instances from client code — HIGH
RemoteGraph
RemoteGraph provides a graph-compatible interface for calling deployed LangGraph instances. It supports sync/async invocation, streaming, and state management.
Incorrect — manual HTTP calls:
import httpx
# Manual API calls lose type safety and streaming support
response = httpx.post(
"http://localhost:2024/runs",
json={"input": {"messages": [{"role": "user", "content": "hello"}]}},
)
Correct — RemoteGraph (sync):
from langgraph.pregel.remote import RemoteGraph
remote = RemoteGraph(
"my_agent",
url="http://localhost:2024",
)
# Invoke like a local graph
result = remote.invoke(
{"messages": [{"role": "user", "content": "Analyze this data"}]},
config={"configurable": {"thread_id": "remote-123"}},
)
Async invocation:
from langgraph.pregel.remote import RemoteGraph
remote = RemoteGraph(
"my_agent",
url="http://localhost:2024",
)
result = await remote.ainvoke(
{"messages": [{"role": "user", "content": "Analyze this data"}]},
config={"configurable": {"thread_id": "remote-123"}},
)
Streaming from remote graphs:
# Stream updates in real-time
async for chunk in remote.astream(
{"messages": [{"role": "user", "content": "Write a report"}]},
config={"configurable": {"thread_id": "stream-456"}},
stream_mode="updates",
):
print(chunk)
# Stream events for fine-grained control
async for event in remote.astream_events(
{"messages": [{"role": "user", "content": "Summarize"}]},
config={"configurable": {"thread_id": "events-789"}},
version="v2",
):
if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")
Use as a subgraph node:
from langgraph.graph import StateGraph, START, END
from langgraph.pregel.remote import RemoteGraph
remote_analyzer = RemoteGraph("analyzer", url="http://analyzer:2024")
builder = StateGraph(OrchestratorState)
builder.add_node("analyze", remote_analyzer) # Remote graph as local node
builder.add_node("summarize", summarize_node)
builder.add_edge(START, "analyze")
builder.add_edge("analyze", "summarize")
builder.add_edge("summarize", END)
Key rules:
- Import with `from langgraph.pregel.remote import RemoteGraph`
- First argument is the `graph_id` matching the key in `langgraph.json`
- Supports `invoke`, `ainvoke`, `stream`, `astream`, `astream_events`
- Use `thread_id` in config for stateful conversations
- Can be added as a node in a parent graph for distributed orchestration
- Handle `httpx.ConnectError` and `httpx.TimeoutException` for network failures
Reference: RemoteGraph
Define conditional edge routing with explicit mappings and END fallback — HIGH
Conditional Edge Routing
Route workflow execution dynamically based on state. Always include END or fallback.
Incorrect:
def route(state) -> str:
if state["quality_score"] >= 0.8:
return "publish"
elif state["retry_count"] < 3:
return "retry"
# No fallback — if quality < 0.8 AND retries exhausted, returns None → runtime error
workflow.add_conditional_edges("check", route)
# No explicit mapping — unclear what routes exist
Correct:
from langgraph.graph import END
def route_based_on_quality(state: WorkflowState) -> str:
if state["quality_score"] >= 0.8:
return "publish"
elif state["retry_count"] < 3:
return "retry"
else:
return "manual_review" # Always have a fallback
workflow.add_conditional_edges(
"quality_check",
route_based_on_quality,
{
"publish": "publish_node",
"retry": "generator",
"manual_review": "review_queue",
}
)
Routing patterns reference:
Sequential: A → B → C (simple edges)
Branching: A → (B or C) (conditional edges)
Looping: A → B → A (retry logic)
Convergence: (A or B) → C (multiple inputs)
Diamond: A → (B, C) → D (parallel then merge)
Key rules:
- Router functions must be pure — no side effects
- Always provide explicit edge mapping dict for clarity
- Always include END or fallback condition
- Keep router logic lightweight
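The explicit-mapping rule can be checked mechanically. A tiny helper (illustrative, not part of LangGraph) that verifies every route a router may return has an entry in the edge map:

```python
def validate_edge_map(possible_routes, edge_map):
    """Return routes the router may emit that the mapping does not cover."""
    return sorted(set(possible_routes) - set(edge_map))

edge_map = {"publish": "publish_node", "retry": "generator"}
missing = validate_edge_map(["publish", "retry", "manual_review"], edge_map)
# → ["manual_review"] — add it to the map, or routing fails at runtime
```

Running this in a unit test catches uncovered routes before the graph is deployed.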
Reference: LangGraph Conditional Edges
Use Command(graph=...) for cross-graph navigation between parent and sibling subgraphs — HIGH
Cross-Graph Navigation
Use Command(graph=Command.PARENT) to navigate from a subgraph node to the parent graph. Use Command(graph="sibling_name") for cross-graph routing between sibling subgraphs. Available in LangGraph v0.2.58+.
Incorrect — routing stays within subgraph:
def child_node(state: ChildState) -> Command[Literal["escalate"]]:
if state["needs_escalation"]:
        return Command(goto="escalate")  # Looks for "escalate" in child graph — fails!
Correct — Command.PARENT for child-to-parent:
from langgraph.types import Command
from typing_extensions import Literal
def child_node(state: ChildState) -> Command[Literal["supervisor"]]:
if state["needs_escalation"]:
return Command(
update={"escalation_reason": "Complexity exceeded threshold"},
goto="supervisor",
graph=Command.PARENT, # Navigate to parent graph
)
    return Command(update={"result": "handled"}, goto="next_step")
Parent graph setup with child subgraph:
from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add
class ParentState(TypedDict):
query: str
escalation_reason: str
results: Annotated[list[str], add] # Reducer required for shared keys
class ChildState(TypedDict):
query: str
escalation_reason: str
# Child subgraph
child_builder = StateGraph(ChildState)
child_builder.add_node("analyze", child_node)
child_builder.add_edge(START, "analyze")
child_graph = child_builder.compile()
# Parent graph
parent_builder = StateGraph(ParentState)
parent_builder.add_node("child_agent", child_graph)
parent_builder.add_node("supervisor", supervisor_node)
parent_builder.add_node("specialist", specialist_node)
parent_builder.add_edge(START, "child_agent")
parent_builder.add_edge("supervisor", "specialist")
parent_builder.add_edge("specialist", END)
app = parent_builder.compile()
Cross-graph routing to sibling subgraph:
def agent_a_node(state: AgentAState) -> Command[Literal["agent_b"]]:
if state["needs_different_expertise"]:
return Command(
update={"handoff_context": state["partial_result"]},
goto="agent_b", # Sibling node in parent graph
graph=Command.PARENT, # Navigate up to parent first
)
    return Command(update={"result": "done"}, goto=END)
State mapping between different schemas:
class ParentState(TypedDict):
query: str
context: Annotated[list[str], add] # Must have reducer for updates from children
class ResearcherState(TypedDict):
query: str
context: str # Different type than parent — mapping happens at boundary
def researcher_node(state: ResearcherState) -> Command[Literal["writer"]]:
finding = research(state["query"])
return Command(
update={"context": [finding]}, # Match parent's list type with reducer
goto="writer",
graph=Command.PARENT,
    )
Key rules:
- `Command.PARENT` navigates to the closest parent graph
- For sibling routing: go to parent first, then target the sibling node name
- Shared state keys updated via `Command.PARENT` must have reducers in parent state
- State types can differ between parent and child — map values at the boundary
- Use cases: escalation, delegation, multi-agent handoff, error propagation
- Requires LangGraph v0.2.58+ (stable in v0.3+)
- Do not wrap `Command` returns in try/except — it uses exceptions internally
Reference: Cross-graph navigation
Implement retry loops with max counter to prevent infinite resource consumption — HIGH
Retry Loop Pattern
Loop-back edges for retrying failed operations. Always include a max retry counter.
Incorrect:
def should_retry(state) -> str:
if state.get("output"):
return "success"
return "retry" # No max counter — infinite loop if LLM keeps failing
workflow.add_conditional_edges("llm_call", should_retry, {
"success": "next_step",
"retry": "llm_call", # Loops forever
})
Correct:
def llm_call_with_retry(state):
try:
result = call_llm(state["input"])
return {"output": result, "retry_count": 0}
except Exception as e:
return {
"retry_count": state.get("retry_count", 0) + 1,
"error": str(e),
}
def should_retry(state) -> str:
if state.get("output"):
return "success"
elif state["retry_count"] < 3:
return "retry"
else:
return "failed" # Max retries exceeded
workflow.add_conditional_edges("llm_call", should_retry, {
"success": "next_step",
"retry": "llm_call",
"failed": "error_handler",
})
Key rules:
- Always track `retry_count` in state
- Set max retries (2-3 for LLM calls)
- Include explicit "failed" path for max retries exceeded
- Consider exponential backoff for API calls
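The backoff suggestion can be sketched as a pure helper; the base delay, cap, and jitter range here are illustrative choices, not LangGraph defaults:

```python
import random

def backoff_delay(attempt, base=1.0, cap=30.0, jitter=0.1):
    """Exponential backoff with a ceiling and a little jitter.

    attempt 0 → ~1s, attempt 1 → ~2s, attempt 2 → ~4s, capped at 30s.
    """
    delay = min(cap, base * (2 ** attempt))
    return delay + random.uniform(0, jitter * delay)

# A retrying node would sleep for backoff_delay(state["retry_count"])
# before re-calling the LLM or API.
delays = [backoff_delay(a, jitter=0.0) for a in range(6)]
# → [1.0, 2.0, 4.0, 8.0, 16.0, 30.0]
```

The jitter spreads out retries from concurrent workflow runs so they don't hammer the API in lockstep.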
Reference: LangGraph Conditional Edges
Route with embedding similarity or Command API instead of conditional edges — HIGH
Semantic & Command Routing
Use Command API when routing AND updating state. Use semantic routing for intent classification.
Incorrect — split routing + state update:
def router(state):
state["route_reason"] = "high score" # State update in router
if state["score"] > 0.8:
return "approve"
return "reject"
workflow.add_conditional_edges("evaluate", router)
# State update happens but routing is separate — hard to reason about
Correct — Command API (2026 pattern):
from langgraph.types import Command
from typing import Literal
def router_with_state(state) -> Command[Literal["approve", "reject"]]:
if state["score"] > 0.8:
return Command(
update={"route_reason": "high score", "routed_at": time.time()},
goto="approve"
)
return Command(
update={"route_reason": "low score", "routed_at": time.time()},
goto="reject"
)
workflow.add_node("evaluate", router_with_state)
# No conditional edges needed — Command handles both state + routing
Semantic routing — embedding-based intent classification:
from sentence_transformers import SentenceTransformer
import numpy as np
embedder = SentenceTransformer("all-MiniLM-L6-v2")
ROUTE_EMBEDDINGS = {
"technical": embedder.encode("technical code programming"),
"business": embedder.encode("business strategy revenue"),
"support": embedder.encode("help troubleshoot error fix"),
}
def semantic_router(state) -> str:
query_embedding = embedder.encode(state["query"])
similarities = {
route: np.dot(query_embedding, emb) / (
np.linalg.norm(query_embedding) * np.linalg.norm(emb)
)
for route, emb in ROUTE_EMBEDDINGS.items()
}
best = max(similarities, key=similarities.get)
    return "general" if similarities[best] < 0.3 else best
Key rules:
- Use Command when updating state AND routing together
- Use conditional edges when routing only (no state updates)
- Pre-compute embeddings for semantic routing (don't embed on every call)
- Always include a fallback/general route
Reference: LangGraph Command API
Use MessagesState with add_messages reducer to preserve conversation history — CRITICAL
MessagesState Pattern
Use MessagesState or add_messages reducer for any workflow handling conversation messages.
Incorrect:
class AgentState(TypedDict):
messages: list # No reducer — each node REPLACES entire message history
user_id: str
def agent_node(state):
response = llm.invoke(state["messages"])
    return {"messages": [response]}  # Overwrites all previous messages!
Correct:
from langgraph.graph import MessagesState
from langgraph.graph.message import add_messages
from typing import Annotated
# Option 1: Extend built-in MessagesState (recommended)
class AgentState(MessagesState):
user_id: str
context: dict
# Option 2: Manual add_messages reducer
class AgentState(TypedDict):
messages: Annotated[list, add_messages] # Smart append/update by ID
user_id: str
def agent_node(state):
response = llm.invoke(state["messages"])
    return {"messages": [response]}  # Appends, doesn't overwrite
Why add_messages matters:
- Appends new messages (doesn't overwrite)
- Updates existing messages by matching ID
- Handles message deduplication automatically
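The append/update-by-ID semantics can be mimicked in a few lines of plain Python. This is a simplified stand-in for illustration only, not the real `add_messages` implementation:

```python
def add_messages_sketch(left, right):
    """Simplified reducer: append new messages, replace ones whose id matches."""
    merged = list(left)
    index_by_id = {m["id"]: i for i, m in enumerate(merged)}
    for msg in right:
        if msg["id"] in index_by_id:
            merged[index_by_id[msg["id"]]] = msg  # update in place by ID
        else:
            merged.append(msg)                    # append new message
    return merged

history = [{"id": "1", "content": "hi"}]
update = [{"id": "1", "content": "hi (edited)"}, {"id": "2", "content": "hello"}]
history = add_messages_sketch(history, update)
# → [{"id": "1", "content": "hi (edited)"}, {"id": "2", "content": "hello"}]
```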
Note: MessageGraph is deprecated in LangGraph v1.0.0. Use StateGraph with a messages key instead.
Reference: LangGraph MessagesState
Apply Pydantic state validation at boundaries only to minimize runtime overhead — HIGH
Pydantic State Validation
Use TypedDict internally (lightweight), Pydantic BaseModel at system boundaries (user input, API output).
Incorrect:
from pydantic import BaseModel
# Using Pydantic for ALL internal state — unnecessary runtime validation overhead
class WorkflowState(BaseModel):
input: str
intermediate_result: str = ""
agent_responses: list[dict] = []
# Every node update triggers Pydantic validation
Correct:
from typing import TypedDict, Annotated
from operator import add
from pydantic import BaseModel, Field
# Internal state: TypedDict (no runtime overhead)
class WorkflowState(TypedDict):
input: str
output: str
agent_responses: Annotated[list[dict], add]
# Boundary validation: Pydantic (validates external data)
class WorkflowInput(BaseModel):
input: str = Field(description="User input", min_length=1)
config: dict = Field(default_factory=dict)
class WorkflowOutput(BaseModel):
output: str
    confidence: float = Field(ge=0, le=1)
Key rules:
- TypedDict for internal graph state (lightweight, no runtime cost)
- Pydantic at boundaries: API inputs, user-facing outputs, LLM structured output
- `Annotated[list, add]` works with TypedDict, not with Pydantic BaseModel
Reference: LangGraph State Concepts
Define custom Annotated reducers for merge, dedup, and last-value state patterns — HIGH
Custom Annotated Reducers
Define custom reducers when operator.add doesn't fit. Common patterns: merge dicts, keep latest, dedup.
Incorrect:
class State(TypedDict):
config: dict # No reducer — later nodes overwrite entire dict
status: str # No reducer — same issue
def node_a(state):
return {"config": {"key_a": "value_a"}}
def node_b(state):
return {"config": {"key_b": "value_b"}}
# config is now {"key_b": "value_b"} — key_a is LOST
Correct:
from typing import Annotated
def merge_dicts(a: dict, b: dict) -> dict:
    """Custom reducer that shallow-merges dictionaries (later keys win)."""
return {**a, **b}
def last_value(a, b):
"""Keep only the latest value."""
return b
class State(TypedDict):
config: Annotated[dict, merge_dicts] # Merges updates from all nodes
status: Annotated[str, last_value] # Explicit: keeps latest value
def node_a(state):
return {"config": {"key_a": "value_a"}}
def node_b(state):
return {"config": {"key_b": "value_b"}}
# config is now {"key_a": "value_a", "key_b": "value_b"}
RemainingSteps (2026 pattern) — proactive recursion handling:
from langgraph.types import RemainingSteps
def agent_node(state: WorkflowState, remaining: RemainingSteps):
if remaining.steps < 5:
return {"action": "summarize_and_exit"}
    return {"action": "continue"}
Key rules:
- Use `Annotated[dict, merge_dicts]` for config-style fields
- Use `Annotated[str, last_value]` to make overwrites explicit
- Always return new state from nodes, never mutate in place (breaks checkpointing)
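The dedup pattern mentioned in this rule's intro isn't shown above; a minimal order-preserving sketch, framework-free:

```python
from typing import Annotated, TypedDict

def dedup_append(left: list, right: list) -> list:
    """Reducer: append new items, skip ones already accumulated, keep order."""
    seen = set(left)
    merged = list(left)
    for item in right:
        if item not in seen:
            merged.append(item)
            seen.add(item)
    return merged

class State(TypedDict):
    sources: Annotated[list[str], dedup_append]  # e.g. URLs gathered by parallel agents

merged = dedup_append(["a", "b"], ["b", "c", "a", "d"])
# → ["a", "b", "c", "d"]
```

Useful when parallel agents may report the same source twice.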
Reference: LangGraph Reducers
Design TypedDict state with Annotated accumulators to prevent silent data loss — CRITICAL
TypedDict State Pattern
Use TypedDict for lightweight internal state. Use Annotated[list[T], add] for any field that multiple nodes write to.
Incorrect:
from typing import TypedDict
class WorkflowState(TypedDict):
findings: list[dict] # No reducer — each node REPLACES the list
def agent_a(state):
return {"findings": [{"source": "a", "result": "..."}]}
def agent_b(state):
return {"findings": [{"source": "b", "result": "..."}]}
# agent_a's findings are LOST
Correct:
from typing import TypedDict, Annotated
from operator import add
class WorkflowState(TypedDict):
input: str
output: str
findings: Annotated[list[dict], add] # Accumulates across nodes
metadata: dict
def agent_a(state):
return {"findings": [{"source": "a", "result": "..."}]}
def agent_b(state):
return {"findings": [{"source": "b", "result": "..."}]}
# Both findings are preserved: [agent_a's, agent_b's]
Context Schema (2026 pattern) — pass runtime config without polluting state:
from dataclasses import dataclass
@dataclass
class ContextSchema:
llm_provider: str = "anthropic"
temperature: float = 0.7
graph = StateGraph(WorkflowState, context_schema=ContextSchema)
def my_node(state: WorkflowState, context: ContextSchema):
    return {"output": call_llm(state["input"], context.temperature)}
Node Caching (2026 pattern):
from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy
builder.add_node("embed", embed_node, cache_policy=CachePolicy(ttl=300))
graph = builder.compile(cache=InMemoryCache())
Key rules:
- Always use `Annotated[list[T], add]` for multi-agent accumulation
- Return partial state updates from nodes, never mutate in place
- Use `context_schema` for runtime config (temperature, provider)
- Use `CachePolicy` for expensive operations (embeddings, API calls)
Reference: LangGraph State Concepts
Emit custom streaming events to provide client visibility into node progress — MEDIUM
Custom Event Streaming
Emit progress events from nodes using get_stream_writer(). Consume via stream_mode="custom".
Incorrect — no progress visibility:
def process_items(state):
    results = []
    for item in state["items"]:
        results.append(process(item))  # Client sees nothing until ALL items done
    return {"results": results}
Correct — custom progress events:
from langgraph.config import get_stream_writer
def node_with_progress(state):
writer = get_stream_writer()
    results = []
    for i, item in enumerate(state["items"]):
        writer({
            "type": "progress",
            "current": i + 1,
            "total": len(state["items"]),
            "status": f"Processing {item}",
        })
        results.append(process(item))
writer({"type": "complete", "message": "All items processed"})
return {"results": results}
# Consume custom events
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
if mode == "custom":
if chunk.get("type") == "progress":
print(f"Progress: {chunk['current']}/{chunk['total']}")
elif mode == "updates":
        print(f"State updated: {list(chunk.keys())}")
FastAPI SSE integration:
@app.post("/stream")
async def stream_workflow(request: WorkflowRequest):
async def event_generator():
async for mode, chunk in graph.astream(
request.inputs, stream_mode=["updates", "custom"]
):
yield f"data: {json.dumps({'mode': mode, 'data': chunk})}\n\n"
    return StreamingResponse(event_generator(), media_type="text/event-stream")
Key rules:
- Use `get_stream_writer()` to emit custom events from any node
- Events should have a `type` field for client-side routing
- Include progress data (current, total) for progress bars
- Combine with `stream_mode=["updates", "custom"]` for comprehensive output
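The `type`-field routing rule can be sketched as a small client-side dispatcher; the handler names and event shapes are illustrative assumptions:

```python
def dispatch_event(event, handlers, fallback=None):
    """Route a custom stream event to a handler keyed by its 'type' field."""
    handler = handlers.get(event.get("type"))
    if handler is None:
        return fallback(event) if fallback else None
    return handler(event)

log = []
handlers = {
    "progress": lambda e: log.append(f"{e['current']}/{e['total']}"),
    "complete": lambda e: log.append("done"),
}
dispatch_event({"type": "progress", "current": 1, "total": 3}, handlers)
dispatch_event({"type": "complete"}, handlers)
dispatch_event({"type": "unknown"}, handlers, fallback=lambda e: log.append("skipped"))
# log → ["1/3", "done", "skipped"]
```

A fallback handler keeps the client resilient when new event types are added server-side.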
Reference: LangGraph Streaming
Choose the right stream mode for efficient real-time UI integration — MEDIUM
Stream Modes
LangGraph provides 5 stream modes. Choose based on your use case.
| Mode | Purpose | Use Case |
|---|---|---|
| `values` | Full state after each step | Debugging, state inspection |
| `updates` | State deltas after each step | Efficient UI updates |
| `messages` | LLM tokens + metadata | Chat interfaces, typing indicators |
| `custom` | User-defined events | Progress bars, status updates |
| `debug` | Maximum information | Development, troubleshooting |
Incorrect — default mode only:
for chunk in graph.stream(inputs): # defaults to "values" — full state every step
    print(chunk)  # Massive output, hard to parse
Correct — multiple modes for comprehensive feedback:
async for mode, chunk in graph.astream(
inputs,
stream_mode=["updates", "custom", "messages"]
):
match mode:
case "updates":
update_ui_state(chunk)
case "custom":
show_progress(chunk)
case "messages":
            append_to_chat(chunk)
Subgraph streaming:
for namespace, chunk in graph.stream(inputs, subgraphs=True, stream_mode="updates"):
    print(f"[{'/'.join(namespace) or 'root'}] {chunk}")
Key rules:
- Use `["updates", "custom"]` for most UIs
- Use `"messages"` for chat interfaces
- Enable `subgraphs=True` for complex nested workflows
- Combine multiple modes in a list for comprehensive output
Reference: LangGraph Streaming
Filter LLM token streams by node or tags to avoid cross-node noise — MEDIUM
LLM Token Streaming
Stream LLM tokens in real-time. Filter by node name or model tags to control output.
Incorrect — no filtering, noisy output:
for msg, meta in graph.stream(inputs, stream_mode="messages"):
    print(msg.content, end="")  # Shows tokens from ALL LLM calls — mixed output
Correct — filtered by node:
for msg, meta in graph.stream(inputs, stream_mode="messages"):
if meta["langgraph_node"] == "writer_agent":
        print(msg.content, end="", flush=True)
Filtered by tags (more flexible):
model = init_chat_model("claude-sonnet-4-20250514", tags=["main_response"])
for msg, meta in graph.stream(inputs, stream_mode="messages"):
if "main_response" in meta.get("tags", []):
        print(msg.content, end="", flush=True)
Non-LangChain LLM streaming:
from langgraph.config import get_stream_writer
def call_custom_llm(state):
    writer = get_stream_writer()
    chunks = []
    for chunk in your_streaming_client.generate(state["prompt"]):
        writer({"type": "llm_token", "content": chunk.text})
        chunks.append(chunk.text)
    return {"response": "".join(chunks)}
Key rules:
- Use `stream_mode="messages"` for token-by-token streaming
- Filter by `meta["langgraph_node"]` or `meta["tags"]` to isolate output
- Use `flush=True` on print for real-time display
- Use `get_stream_writer()` for non-LangChain LLM APIs
Reference: LangGraph Streaming
Add subgraph directly as node when parent and child share the same state — MEDIUM
Add Subgraph as Node (Shared State)
Use when parent and subgraph share the same state schema. Add compiled graph directly as a node.
Incorrect — unnecessary state mapping for shared schema:
def call_agent(state: SharedState):
# Unnecessary transformation when schemas match
input = {"messages": state["messages"], "context": state["context"]}
output = agent_subgraph.invoke(input)
    return {"messages": output["messages"], "context": output["context"]}
Correct — add compiled graph directly:
from langgraph.graph.message import add_messages
class SharedState(TypedDict):
messages: Annotated[list, add_messages]
context: dict
# Build subgraph with SAME state
agent_builder = StateGraph(SharedState)
agent_builder.add_node("think", think_node)
agent_builder.add_node("act", act_node)
agent_builder.add_edge(START, "think")
agent_builder.add_edge("think", "act")
agent_builder.add_edge("act", END)
agent_subgraph = agent_builder.compile()
# Add compiled subgraph directly as node — no wrapper needed
parent_builder = StateGraph(SharedState)
parent_builder.add_node("agent_team", agent_subgraph)
parent_builder.add_edge(START, "agent_team")
parent_builder.add_edge("agent_team", END)
Key rules:
- Use when parent and subgraph have identical or overlapping state schemas
- No state transformation needed — shared keys pass through automatically
- Ideal for agent coordination via message passing
- Reducers (like `add_messages`) work across the boundary
Reference: LangGraph Subgraphs
Invoke subgraph from node with state transformation to prevent schema mismatch — MEDIUM
Invoke Subgraph from Node
Use when subgraph needs completely isolated state (different schema from parent).
Incorrect — no state transformation:
def call_analysis(state: ParentState):
result = analysis_subgraph.invoke(state) # Schema mismatch! KeyError
    return result  # Wrong schema returned to parent
Correct — explicit state mapping at boundaries:
class ParentState(TypedDict):
query: str
analysis_result: dict
class AnalysisState(TypedDict):
input_text: str
findings: list[str]
score: float
# Build and compile subgraph
analysis_builder = StateGraph(AnalysisState)
analysis_builder.add_node("analyze", analyze_node)
analysis_builder.add_node("score", score_node)
analysis_builder.add_edge(START, "analyze")
analysis_builder.add_edge("analyze", "score")
analysis_builder.add_edge("score", END)
analysis_subgraph = analysis_builder.compile()
def call_analysis(state: ParentState) -> dict:
# Map parent → subgraph state
subgraph_input = {"input_text": state["query"], "findings": [], "score": 0.0}
# Invoke subgraph
subgraph_output = analysis_subgraph.invoke(subgraph_input)
# Map subgraph → parent state
return {
"analysis_result": {
"findings": subgraph_output["findings"],
"score": subgraph_output["score"],
}
}
parent_builder.add_node("analysis", call_analysis)
Key rules:
- Transform state at both boundaries (parent→subgraph AND subgraph→parent)
- Compile subgraph separately before adding to parent
- Use for different schemas, private message histories, multi-level nesting
Reference: LangGraph Subgraphs
Propagate config through subgraph state mapping for tracing and checkpointing — MEDIUM
Subgraph State Mapping
Explicit state transforms at boundaries with proper config propagation for tracing and checkpointing.
Incorrect — no config propagation:
def call_subgraph(state: ParentState):
result = subgraph.invoke({"query": state["query"]}) # No config — breaks tracing
    return {"result": result["output"]}
Correct — config propagation and explicit mapping:
from langgraph.config import get_runnable_config
def call_subgraph_with_mapping(state: ParentState) -> dict:
# 1. Extract relevant data
subgraph_input = {
"query": state["user_query"],
"context": state.get("context", {}),
"history": [],
}
# 2. Propagate config for tracing/checkpointing
config = get_runnable_config()
result = subgraph.invoke(subgraph_input, config)
# 3. Transform output back
return {
"subgraph_result": result["output"],
"metadata": {"subgraph": "analysis", "steps": result.get("step_count", 0)},
    }
Checkpointing strategies:
# Parent-only (recommended) — propagates to all subgraphs
parent = parent_builder.compile(checkpointer=PostgresSaver(...))
# Independent subgraph memory — for persistent agent histories
agent_subgraph = agent_builder.compile(checkpointer=True)
Streaming nested graphs:
for namespace, chunk in graph.stream(inputs, subgraphs=True, stream_mode="updates"):
depth = len(namespace)
    print(f"{' ' * depth}[{'/'.join(namespace) or 'root'}] {chunk}")
Key rules:
- Always propagate config with `get_runnable_config()` for tracing
- Parent-only checkpointer is sufficient for most cases
- Use `subgraphs=True` in stream/get_state for nested visibility
- Keep state mapping explicit and documented at each boundary
Reference: LangGraph Subgraphs
Build supervisor-worker pattern with Command API for coordinated state routing — HIGH
Supervisor-Worker Pattern
Central supervisor routes to specialized workers. Use Command API for combined state update + routing.
Incorrect — missing edges and deprecated API:
workflow.set_entry_point("supervisor") # Deprecated!
def supervisor(state):
state["next"] = "analyzer" # Mutating state directly
return state
# No worker → supervisor edges — workers exit after running
Correct — Command API (2026 pattern):
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from typing import Literal
def supervisor(state) -> Command[Literal["analyzer", "validator", END]]:
if "analyzer" not in state["agents_completed"]:
return Command(
update={"current_agent": "analyzer"},
goto="analyzer"
)
elif "validator" not in state["agents_completed"]:
return Command(
update={"current_agent": "validator"},
goto="validator"
)
return Command(update={"status": "complete"}, goto=END)
def analyzer(state):
result = analyze(state["input"])
return {"results": [result], "agents_completed": ["analyzer"]}
graph = StateGraph(WorkflowState)
graph.add_node("supervisor", supervisor)
graph.add_node("analyzer", analyzer)
graph.add_node("validator", validator)
graph.add_edge(START, "supervisor") # Entry point
graph.add_edge("analyzer", "supervisor") # Workers return to supervisor
graph.add_edge("validator", "supervisor")
# No conditional edges needed — Command handles routing
app = graph.compile()
Key rules:
- Use `add_edge(START, "supervisor")` not `set_entry_point()` (deprecated)
- Use `Command` when updating state AND routing together
- Always include END condition to prevent infinite loops
- Keep supervisor logic lightweight (routing only, no heavy computation)
Reference: LangGraph Supervisor
Route supervisor tasks by priority to ensure critical-first execution order — HIGH
Priority-Based Supervisor Routing
Route by priority instead of round-robin when execution order matters (e.g., security before implementation).
Incorrect — round-robin ignores priority:
ALL_AGENTS = ["tutorial", "security", "tech", "implementation"]
def supervisor(state):
completed = set(state["agents_completed"])
available = [a for a in ALL_AGENTS if a not in completed]
state["next"] = available[0] if available else END # tutorial runs before security!
return stateCorrect — priority-ordered execution:
```python
AGENT_PRIORITIES = {
    "security": 1,        # Run first — block on vulnerabilities
    "tech": 2,
    "implementation": 3,
    "tutorial": 4,        # Run last
}

def priority_supervisor(state) -> Command[Literal["security", "tech", "implementation", "tutorial", END]]:
    completed = set(state["agents_completed"])
    available = [a for a in AGENT_PRIORITIES if a not in completed]
    if not available:
        return Command(update={"status": "complete"}, goto=END)
    next_agent = min(available, key=lambda a: AGENT_PRIORITIES[a])
    return Command(
        update={"current_agent": next_agent},
        goto=next_agent
    )
```

LLM-Based Supervisor (2026 pattern):
```python
from pydantic import BaseModel, Field

class SupervisorDecision(BaseModel):
    next_agent: Literal["security", "tech", "DONE"]
    reasoning: str = Field(description="Brief routing rationale")

async def llm_supervisor(state):
    decision = await llm.with_structured_output(SupervisorDecision).ainvoke(prompt)
    if decision.next_agent == "DONE":
        return Command(goto=END)
    return Command(update={"routing_reasoning": decision.reasoning}, goto=decision.next_agent)
```

Key rules:
- Use priority dict when execution order matters
- Use LLM-based routing when priorities are dynamic/context-dependent
- Track the `agents_completed` list to prevent infinite loops
- 3-8 specialists max per supervisor (avoid coordination overhead)
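The priority selection step is pure Python and easy to sanity-check in isolation. A small sketch mirroring the priority table above (the `pick_next` helper name is ours, not from LangGraph):

```python
from typing import Optional

AGENT_PRIORITIES = {"security": 1, "tech": 2, "implementation": 3, "tutorial": 4}

def pick_next(completed: set) -> Optional[str]:
    # Lowest priority number wins among not-yet-completed agents
    available = [a for a in AGENT_PRIORITIES if a not in completed]
    return min(available, key=AGENT_PRIORITIES.get) if available else None

print(pick_next(set()))          # security runs first
print(pick_next({"security"}))   # then tech
```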
Reference: LangGraph Supervisor
Dispatch agents in round-robin order with completion tracking to avoid repeats — MEDIUM
Round-Robin Supervisor Dispatch
Visit all agents exactly once before finishing. Track completion to prevent re-dispatch.
Incorrect — no completion tracking:
ALL_AGENTS = ["security", "tech", "implementation"]
def supervisor(state):
# No tracking — may dispatch same agent multiple times
state["next"] = ALL_AGENTS[0]
return stateCorrect — completion-tracked round-robin:
```python
ALL_AGENTS = ["security", "tech", "implementation", "tutorial"]

def supervisor(state) -> Command[Literal[*ALL_AGENTS, "quality_gate", END]]:
    completed = set(state["agents_completed"])
    available = [a for a in ALL_AGENTS if a not in completed]
    if not available:
        return Command(goto="quality_gate")
    return Command(
        update={"current_agent": available[0]},
        goto=available[0]
    )

def agent_node_factory(agent_name: str):
    """Create agent node that tracks completion."""
    async def node(state):
        result = await agents[agent_name].run(state["input"])
        return {
            "results": [result],
            "agents_completed": [agent_name],
            "current_agent": None,
        }
    return node

# Register all agents
for name in ALL_AGENTS:
    workflow.add_node(name, agent_node_factory(name))
    workflow.add_edge(name, "supervisor")
```

Key rules:
- Track `agents_completed` as `Annotated[list[str], add]` in state
- Check available vs. completed to determine the next agent
- Route to quality gate or END when all agents done
- Use factory function for consistent agent node creation
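One reason the factory pattern in the last rule matters: Python closures late-bind loop variables, so defining the node inline in the `for` loop would make every registered node run the last agent. A LangGraph-free illustration of the difference:

```python
names = ["security", "tech", "tutorial"]

# Late binding: every closure sees the final value of `name`
broken = [lambda: name for name in names]

# A factory binds each value at creation time
def make(agent_name):
    return lambda: agent_name

fixed = [make(name) for name in names]

print([f() for f in broken])  # ['tutorial', 'tutorial', 'tutorial']
print([f() for f in fixed])   # ['security', 'tech', 'tutorial']
```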
Reference: LangGraph Multi-Agent
Bind tools to LLMs correctly with proper tool_choice to ensure invocation — CRITICAL
Tool Binding to LLMs
Bind tools to models with bind_tools(). Use tool_choice to control selection.
Incorrect — tools defined but not bound:
@tool
def search_database(query: str) -> str:
"""Search the database."""
return db.search(query)
def agent_node(state):
response = model.invoke(state["messages"]) # Model doesn't know about tools!
return {"messages": [response]}Correct — tools bound to model:
```python
from langchain_core.tools import tool
from langchain_anthropic import ChatAnthropic

@tool
def search_database(query: str) -> str:
    """Search the database for information."""
    return db.search(query)

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email to a recipient."""
    email_service.send(to, subject, body)
    return f"Email sent to {to}"

tools = [search_database, send_email]
model = ChatAnthropic(model="claude-sonnet-4-20250514")
model_with_tools = model.bind_tools(tools)

def agent_node(state):
    response = model_with_tools.invoke(state["messages"])
    return {"messages": [response]}
```

Force specific tool:
```python
model.bind_tools(tools, tool_choice="any")              # At least one tool
model.bind_tools(tools, tool_choice="search_database")  # Specific tool
```

Key rules:
- Always `bind_tools()` before invoking the model
- Use descriptive `@tool` docstrings — the LLM uses them to decide which tool to call
- Keep 5-10 tools max per agent (use dynamic selection for more)
- Use `tool_choice` when a specific tool is required
Reference: LangGraph Tool Calling
Filter tools dynamically by relevance to avoid context overflow from large sets — HIGH
Dynamic Tool Selection
When you have many tools, select the most relevant subset per query using embeddings.
Incorrect — all tools always bound:
# 50 tools bound — floods context, LLM makes poor choices
model_with_all = model.bind_tools(all_50_tools)Correct — dynamic selection by relevance:
```python
import numpy as np
from sentence_transformers import SentenceTransformer

embedder = SentenceTransformer("all-MiniLM-L6-v2")

# Pre-compute tool embeddings (once at startup)
TOOL_EMBEDDINGS = {
    tool.name: embedder.encode(tool.description)
    for tool in all_tools
}

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def select_relevant_tools(query: str, all_tools: list, top_k: int = 5) -> list:
    query_embedding = embedder.encode(query)
    similarities = [
        (tool, cosine_similarity(query_embedding, TOOL_EMBEDDINGS[tool.name]))
        for tool in all_tools
    ]
    sorted_tools = sorted(similarities, key=lambda x: x[1], reverse=True)
    return [tool for tool, _ in sorted_tools[:top_k]]

def agent_with_dynamic_tools(state):
    relevant_tools = select_relevant_tools(
        state["messages"][-1].content, all_tools, top_k=5
    )
    model_bound = model.bind_tools(relevant_tools)
    response = model_bound.invoke(state["messages"])
    return {"messages": [response]}
```

Key rules:
- Pre-compute embeddings at startup (not per request)
- Select top 5-10 tools per query
- Use cosine similarity for relevance scoring
- Fall back to general tools if no strong match
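The fall-back rule can be implemented with a similarity threshold. A hedged sketch, where the `0.3` cutoff and the `general_tools` argument are illustrative assumptions rather than values from the source:

```python
def select_with_fallback(scored, general_tools, top_k=5, min_score=0.3):
    """scored: list of (tool, similarity) pairs, in any order."""
    ranked = sorted(scored, key=lambda x: x[1], reverse=True)
    # If even the best match is weak, fall back to the general-purpose set
    if not ranked or ranked[0][1] < min_score:
        return general_tools
    return [tool for tool, _ in ranked[:top_k]]

print(select_with_fallback([("search", 0.9), ("email", 0.1)], ["help"], top_k=1))
```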
Reference: LangGraph Tool Calling
Gate dangerous tool execution with interrupt-based human approval before proceeding — CRITICAL
Tool Interrupt Approval Gates
Use interrupt() inside tool functions for human approval before destructive operations.
Incorrect — no approval for dangerous operation:
@tool
def delete_user(user_id: str) -> str:
"""Delete a user account."""
db.delete_user(user_id) # Executes immediately without approval!
return f"User {user_id} deleted"Correct — interrupt for approval:
```python
from langgraph.types import interrupt

@tool
def delete_user(user_id: str) -> str:
    """Delete a user account. Requires approval."""
    response = interrupt({
        "action": "delete_user",
        "user_id": user_id,
        "message": f"Approve deletion of user {user_id}?",
        "risk_level": "high",
    })
    if response.get("approved"):
        db.delete_user(user_id)
        return f"User {user_id} deleted successfully"
    return "Deletion cancelled by user"

@tool
def transfer_funds(from_account: str, to_account: str, amount: float) -> str:
    """Transfer funds. Requires approval for large amounts."""
    if amount > 1000:
        response = interrupt({
            "action": "transfer_funds",
            "amount": amount,
            "message": f"Approve transfer of ${amount}?",
        })
        if not response.get("approved"):
            return "Transfer cancelled"
    execute_transfer(from_account, to_account, amount)
    return f"Transferred ${amount}"
```

Streaming from tools:
```python
from langgraph.config import get_stream_writer

@tool
def long_running_analysis(data: str) -> str:
    writer = get_stream_writer()
    writer({"status": "starting", "progress": 0})
    for i, chunk in enumerate(process_chunks(data)):
        writer({"status": "processing", "progress": (i + 1) * 10})
    return "Analysis complete"
```

Key rules:
- Use `interrupt()` for any destructive or high-risk tool
- Return error strings from tools, don't raise exceptions (lets the agent recover)
- Place side effects AFTER interrupt calls (not before)
- Use `get_stream_writer()` for long-running tool progress
Reference: LangGraph Human-in-the-Loop
Use ToolNode for automatic parallel tool execution with built-in error handling — CRITICAL
ToolNode Execution
Use ToolNode for automatic tool execution with parallel support and error handling.
Incorrect — manual tool dispatch:
def execute_tools(state):
last_message = state["messages"][-1]
results = []
for tool_call in last_message.tool_calls:
# Manual dispatch — sequential, no error handling
tool = tools_dict[tool_call["name"]]
result = tool.invoke(tool_call["args"])
results.append(result)
return {"messages": results}Correct — ToolNode handles everything:
```python
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, START, END, MessagesState

tool_node = ToolNode(tools)  # Parallel execution built-in

builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)

def should_continue(state) -> str:
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return END

builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
builder.add_edge("tools", "agent")  # Return to agent after tool execution

graph = builder.compile()
```

Key rules:
- `ToolNode` executes multiple tool calls in parallel automatically
- Results are returned in order matching the original tool_calls
- Always add a `tools → agent` edge for the ReAct loop
- Route based on `tool_calls` presence in the last message
Reference: LangGraph ToolNode