
LangGraph

LangGraph workflow patterns for state management, routing, parallel execution, supervisor-worker, tool calling, checkpointing, human-in-loop, streaming, subgraphs, and functional API. Use when building LangGraph pipelines, multi-agent systems, or AI workflows.

Reference · Impact: high

Primary Agent: workflow-architect

LangGraph Workflow Patterns

Comprehensive patterns for building production LangGraph workflows. Each category has individual rule files in rules/ loaded on-demand.

Quick Reference

| Category | Rules | Impact | When to Use |
|---|---|---|---|
| State Management | 4 | CRITICAL | Designing workflow state schemas, accumulators, reducers |
| Routing & Branching | 4 | HIGH | Dynamic routing, retry loops, semantic routing, cross-graph |
| Parallel Execution | 3 | HIGH | Fan-out/fan-in, map-reduce, concurrent agents |
| Supervisor Patterns | 3 | HIGH | Central coordinators, round-robin, priority dispatch |
| Tool Calling | 4 | CRITICAL | Binding tools, ToolNode, dynamic selection, approvals |
| Checkpointing | 3 | HIGH | Persistence, recovery, cross-thread Store memory |
| Human-in-Loop | 3 | MEDIUM | Approval gates, feedback loops, interrupt/resume |
| Streaming | 3 | MEDIUM | Real-time updates, token streaming, custom events |
| Subgraphs | 3 | MEDIUM | Modular composition, nested graphs, state mapping |
| Functional API | 3 | MEDIUM | @entrypoint/@task decorators, migration from StateGraph |
| Platform | 3 | HIGH | Deployment, RemoteGraph, double-texting strategies |

Total: 36 rules across 11 categories

State Management

State schemas determine how data flows between nodes. Wrong schemas cause silent data loss.

| Rule | File | Key Pattern |
|---|---|---|
| TypedDict State | rules/state-typeddict.md | TypedDict + Annotated[list, add] for accumulators |
| Pydantic Validation | rules/state-pydantic.md | BaseModel at boundaries, TypedDict internally |
| MessagesState | rules/state-messages.md | MessagesState or add_messages reducer |
| Custom Reducers | rules/state-reducers.md | Annotated[T, reducer_fn] for merge/overwrite |

Routing & Branching

Control flow between nodes. Always include END fallback to prevent hangs.

| Rule | File | Key Pattern |
|---|---|---|
| Conditional Edges | rules/routing-conditional.md | add_conditional_edges with explicit mapping |
| Retry Loops | rules/routing-retry-loops.md | Loop-back edges with max retry counter |
| Semantic Routing | rules/routing-semantic.md | Embedding similarity or Command API routing |
| Cross-Graph Navigation | rules/routing-cross-graph.md | Command(graph=Command.PARENT) for parent/sibling routing |

Parallel Execution

Run independent nodes concurrently. Use Annotated[list, add] to accumulate results.

| Rule | File | Key Pattern |
|---|---|---|
| Fan-Out/Fan-In | rules/parallel-fanout-fanin.md | Send API for dynamic parallel branches |
| Map-Reduce | rules/parallel-map-reduce.md | asyncio.gather + result aggregation |
| Error Isolation | rules/parallel-error-isolation.md | return_exceptions=True + per-branch timeout |

Supervisor Patterns

A central coordinator routes work to specialized workers; workers return control to the supervisor. A minimal wiring sketch follows the table.

| Rule | File | Key Pattern |
|---|---|---|
| Basic Supervisor | rules/supervisor-basic.md | Command API for state update + routing |
| Priority Routing | rules/supervisor-priority.md | Priority dict ordering agent execution |
| Round-Robin | rules/supervisor-round-robin.md | Completion tracking with agents_completed |
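
A minimal sketch of the basic supervisor shape, assuming two illustrative workers (researcher, writer) and an agents_completed accumulator in state:

from typing import Literal
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command

def supervisor(state) -> Command[Literal["researcher", "writer", END]]:
    # Dispatch to the first worker that hasn't reported back yet
    done = set(state.get("agents_completed", []))
    for worker in ("researcher", "writer"):
        if worker not in done:
            return Command(update={"current_worker": worker}, goto=worker)
    return Command(goto=END)

builder = StateGraph(State)  # State assumed to define agents_completed with a reducer
builder.add_node("supervisor", supervisor)
builder.add_node("researcher", researcher_node)  # Worker nodes assumed defined
builder.add_node("writer", writer_node)
builder.add_edge(START, "supervisor")
builder.add_edge("researcher", "supervisor")  # Workers always return to the supervisor
builder.add_edge("writer", "supervisor")
app = builder.compile()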

Tool Calling

Integrate function calling into LangGraph agents. Keep tools under 10 per agent. A minimal wiring sketch follows the table.

| Rule | File | Key Pattern |
|---|---|---|
| Tool Binding | rules/tools-bind.md | model.bind_tools(tools) + tool_choice |
| ToolNode Execution | rules/tools-toolnode.md | ToolNode(tools) prebuilt parallel executor |
| Dynamic Selection | rules/tools-dynamic.md | Embedding-based tool relevance filtering |
| Tool Interrupts | rules/tools-interrupts.md | interrupt() for approval gates on tools |
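
A minimal sketch of the bind_tools + ToolNode loop (the weather tool is an illustrative stub; model is assumed to be an initialized chat model):

from langchain_core.tools import tool
from langgraph.graph import StateGraph, MessagesState, START
from langgraph.prebuilt import ToolNode, tools_condition

@tool
def get_weather(city: str) -> str:
    """Return current weather for a city."""
    return f"Sunny in {city}"  # Illustrative stub

tools = [get_weather]
model_with_tools = model.bind_tools(tools)

def agent(state: MessagesState) -> dict:
    return {"messages": [model_with_tools.invoke(state["messages"])]}

builder = StateGraph(MessagesState)
builder.add_node("agent", agent)
builder.add_node("tools", ToolNode(tools))
builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", tools_condition)  # Routes to "tools" or END
builder.add_edge("tools", "agent")  # Tool results flow back to the agent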

Checkpointing

Persist workflow state for recovery and debugging.

| Rule | File | Key Pattern |
|---|---|---|
| Checkpointer Setup | rules/checkpoints-setup.md | MemorySaver dev / PostgresSaver prod |
| State Recovery | rules/checkpoints-recovery.md | thread_id resume + get_state_history |
| Cross-Thread Store | rules/checkpoints-store.md | Store for long-term memory across threads |

Human-in-Loop

Pause workflows for human intervention. Requires checkpointer for state persistence.

| Rule | File | Key Pattern |
|---|---|---|
| Interrupt/Resume | rules/human-in-loop-interrupt.md | interrupt() function + Command(resume=) |
| Approval Gate | rules/human-in-loop-approval.md | interrupt_before + state update + resume |
| Feedback Loop | rules/human-in-loop-feedback.md | Iterative interrupt until approved |

Streaming

Real-time updates and progress tracking for workflows.

| Rule | File | Key Pattern |
|---|---|---|
| Stream Modes | rules/streaming-modes.md | 5 modes: values, updates, messages, custom, debug |
| Token Streaming | rules/streaming-tokens.md | messages mode with node/tag filtering |
| Custom Events | rules/streaming-custom-events.md | get_stream_writer() for progress events |

Subgraphs

Compose modular, reusable workflow components with nested graphs. A boundary-mapping sketch follows the table.

| Rule | File | Key Pattern |
|---|---|---|
| Invoke from Node | rules/subgraphs-invoke.md | Different schemas, explicit state mapping |
| Add as Node | rules/subgraphs-add-as-node.md | Shared state, add_node(name, compiled_graph) |
| State Mapping | rules/subgraphs-state-mapping.md | Boundary transforms between parent/child |
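
For the invoke-from-node pattern, the boundary transform looks roughly like this (state keys and the compiled research_subgraph are illustrative):

def call_research_agent(state: ParentState) -> dict:
    # Map parent state into the subgraph's input schema at the boundary
    sub_input = {"query": state["question"]}
    sub_output = research_subgraph.invoke(sub_input)
    # Map the subgraph's output back into parent state keys
    return {"findings": [sub_output["result"]]}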

Functional API

Build workflows using @entrypoint and @task decorators instead of explicit graph construction.

| Rule | File | Key Pattern |
|---|---|---|
| @entrypoint | rules/functional-entrypoint.md | Workflow entry point with optional checkpointer |
| @task | rules/functional-task.md | Returns futures, .result() to block |
| Migration | rules/functional-migration.md | StateGraph to Functional API conversion |

Platform

Deploy graphs as managed APIs with persistence, streaming, and multi-tenancy.

| Rule | File | Key Pattern |
|---|---|---|
| Deployment | rules/platform-deployment.md | langgraph.json + CLI + Assistants API |
| RemoteGraph | rules/platform-remote-graph.md | RemoteGraph for calling deployed graphs |
| Double Texting | rules/platform-double-texting.md | 4 strategies: reject, rollback, enqueue, interrupt |

Quick Start Example

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from typing import TypedDict, Annotated, Literal
from operator import add

class State(TypedDict):
    input: str
    results: Annotated[list[str], add]

def supervisor(state) -> Command[Literal["worker", END]]:
    if not state.get("results"):
        return Command(update={"input": state["input"]}, goto="worker")
    return Command(goto=END)

def worker(state) -> dict:
    return {"results": [f"Processed: {state['input']}"]}

graph = StateGraph(State)
graph.add_node("supervisor", supervisor)
graph.add_node("worker", worker)
graph.add_edge(START, "supervisor")
graph.add_edge("worker", "supervisor")
app = graph.compile()
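
Invoking the compiled graph (the input value is illustrative; results starts empty and accumulates via the add reducer):

result = app.invoke({"input": "quarterly-report.csv"})
print(result["results"])  # ["Processed: quarterly-report.csv"]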

2026 Key Patterns

  • Command API: Use Command(update=..., goto=...) when updating state AND routing together
  • context_schema: Pass runtime config (temperature, provider) without polluting state
  • CachePolicy: Cache expensive node results with TTL via InMemoryCache
  • RemainingSteps: Proactively handle recursion limits
  • Store: Cross-thread memory separate from Checkpointer (thread-scoped)
  • interrupt(): Dynamic interrupts inside node logic (replaces interrupt_before for conditional cases)
  • add_edge(START, node): Not set_entry_point() (deprecated)

Key Decisions

| Decision | Recommendation |
|---|---|
| State type | TypedDict internally, Pydantic at boundaries |
| Entry point | add_edge(START, node) not set_entry_point() |
| Routing + state update | Command API |
| Routing only | Conditional edges |
| Accumulators | Annotated[list[T], add] always |
| Dev checkpointer | MemorySaver |
| Prod checkpointer | PostgresSaver |
| Short-term memory | Checkpointer (thread-scoped) |
| Long-term memory | Store (cross-thread, namespaced) |
| Max parallel branches | 5-10 concurrent |
| Tools per agent | 5-10 max (dynamic selection for more) |
| Approval gates | interrupt() for high-risk operations |
| Stream modes | ["updates", "custom"] for most UIs |
| Subgraph pattern | Invoke for isolation, Add-as-Node for shared state |
| Functional vs Graph | Functional for simple flows, Graph for complex topology |

Common Mistakes

  1. Forgetting add reducer (overwrites instead of accumulates)
  2. Mutating state in place (breaks checkpointing)
  3. No END fallback in routing (workflow hangs)
  4. Infinite retry loops (no max counter)
  5. Side effects in router functions
  6. Too many tools per agent (context overflow)
  7. Raising exceptions in tools (crashes the agent loop; see the sketch after this list)
  8. No checkpointer in production (lose progress on crash)
  9. Wrapping interrupt() in try/except (breaks the mechanism)
  10. Not transforming state at subgraph boundaries
  11. Forgetting .result() on Functional API tasks
  12. Using set_entry_point() (deprecated, use add_edge(START, ...))
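
For mistake 7, return failures as tool output instead of raising, so the model can see the error and self-correct. A sketch, where orders_api is a hypothetical backend client:

from langchain_core.tools import tool

@tool
def look_up_order(order_id: str) -> str:
    """Look up an order by id."""
    try:
        return orders_api.get(order_id)  # Hypothetical backend call
    except Exception as e:
        # Surface the failure as content so the agent loop keeps running
        return f"Tool error: {e}. Check the order id and try again."

ToolNode's handle_tool_errors option achieves the same effect for prebuilt executors by converting raised exceptions into error ToolMessages.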

Evaluations

See test-cases.json for consolidated test cases across all categories.

Related Skills

  • ork:agent-orchestration - Higher-level multi-agent coordination, ReAct loop patterns, and framework comparisons
  • temporal-io - Durable execution alternative
  • ork:llm-integration - General LLM function calling
  • type-safety-validation - Pydantic model patterns

Rules (36)

Recover interrupted workflow state and debug checkpoint history — HIGH

State Recovery and Debugging

Resume interrupted workflows and inspect checkpoint history for debugging.

Incorrect — no recovery handling:

# If this crashes at step 5 of 10, all progress is lost
result = app.invoke(initial_state)

Correct — automatic recovery:

import logging

async def run_with_recovery(workflow_id: str, initial_state: dict):
    """Run workflow with automatic recovery from checkpoint."""
    config = {"configurable": {"thread_id": workflow_id}}

    try:
        state = await app.aget_state(config)
        if state.values:
            logging.info(f"Resuming workflow {workflow_id}")
            return await app.ainvoke(None, config=config)  # None = resume from checkpoint
    except Exception:
        pass  # No existing checkpoint

    logging.info(f"Starting new workflow {workflow_id}")
    return await app.ainvoke(initial_state, config=config)

Debugging with checkpoint history:

# Get all checkpoints for a workflow
config = {"configurable": {"thread_id": "analysis-123"}}
for checkpoint in app.get_state_history(config):
    print(f"Step: {checkpoint.metadata['step']}")
    print(f"Node: {checkpoint.metadata['source']}")
    print(f"State: {checkpoint.values}")

# Rollback to previous checkpoint
history = list(app.get_state_history(config))
previous = history[1]  # One step back
app.update_state(config, previous.values)

Graph Migrations (2026): LangGraph handles topology changes automatically — adding/removing nodes, adding/removing state keys. Limitation: can't remove a node if a thread is interrupted at that node.

Key rules:

  • Pass None as input to resume from checkpoint
  • Use get_state_history() to inspect all checkpoints
  • Use update_state() for rollback/manual state correction
  • Clean up old checkpoints (TTL-based or keep-latest-N)

Reference: LangGraph Persistence

Configure persistent checkpointer to survive crashes in production workflows — HIGH

Checkpointer Setup

Use MemorySaver for development, PostgresSaver for production.

Incorrect — no checkpointer:

app = workflow.compile()  # No checkpointer — progress lost on crash
result = app.invoke(state)  # Can't resume if interrupted

Correct — environment-appropriate checkpointer:

from langgraph.checkpoint.memory import MemorySaver
from langgraph.checkpoint.postgres import PostgresSaver

# Development: In-memory (fast, no setup)
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)

# Production: PostgreSQL (shared, durable)
checkpointer = PostgresSaver.from_conn_string("postgresql://...")
app = workflow.compile(checkpointer=checkpointer)

# Invoke with thread_id for resumability
config = {"configurable": {"thread_id": "analysis-123"}}
result = app.invoke(initial_state, config=config)

Key rules:

  • Always use a checkpointer in production
  • Use deterministic thread_id (not random UUID) so you can resume (see the sketch after this list)
  • Checkpointer saves state after each node execution
  • Add interrupt_before for manual review points
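
A sketch of deriving a deterministic thread_id from stable business keys (names are illustrative):

# The same job always maps to the same thread, so a crashed run resumes its own checkpoints
thread_id = f"analysis-{user_id}-{document_id}"
config = {"configurable": {"thread_id": thread_id}}
result = app.invoke(initial_state, config=config)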

Reference: LangGraph Persistence

Use Store for cross-thread memory instead of per-thread checkpoints — HIGH

Cross-Thread Store Memory

Checkpointer = short-term (thread-scoped). Store = long-term (cross-thread, namespaced).

Incorrect — preferences in checkpointer only:

# User preferences stored in thread-1 state
# When user starts thread-2, preferences are lost!
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
app = workflow.compile(checkpointer=checkpointer)

Correct — Store for cross-thread memory:

from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.store.postgres import PostgresStore

# Checkpointer = SHORT-TERM (thread-scoped)
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)

# Store = LONG-TERM (cross-thread, namespaced)
store = PostgresStore.from_conn_string(DATABASE_URL)

# Compile with BOTH
app = workflow.compile(checkpointer=checkpointer, store=store)

Using Store in nodes:

from datetime import datetime
from langgraph.store.base import BaseStore

async def agent_with_memory(state: AgentState, *, store: BaseStore):
    user_id = state["user_id"]

    # Read cross-thread memory
    memories = await store.aget(namespace=("users", user_id), key="preferences")
    prompt = state["system_prompt"]
    if memories and memories.value.get("prefers_concise"):
        prompt += "\nBe concise."  # Build a new value; don't mutate state in place

    # Write cross-thread memory
    await store.aput(
        namespace=("users", user_id),
        key="last_topic",
        value={"topic": state["current_topic"], "timestamp": datetime.now().isoformat()}
    )
    return {"system_prompt": prompt}

Memory architecture:

Thread 1 (chat-001)    Thread 2 (chat-002)
┌─────────────────┐    ┌─────────────────┐
│ Checkpointer    │    │ Checkpointer    │
│ - msg history   │    │ - msg history   │
│ - workflow pos  │    │ - workflow pos  │
└─────────────────┘    └─────────────────┘
         ↓ shared ↓
┌─────────────────────────────────────┐
│     Store (cross-thread)            │
│  namespace=("users", "alice")       │
│  - preferences, last_topic          │
└─────────────────────────────────────┘

Key rules:

  • Checkpointer for conversation history and workflow position (thread-scoped)
  • Store for user preferences, learned facts, settings (cross-thread)
  • Always use namespaces in Store to prevent data collisions
  • Clean up old checkpoints but keep Store data persistent

Reference: LangGraph Memory

Configure @entrypoint decorator with checkpointer for resumable workflows — MEDIUM

@entrypoint Decorator

Define workflow entry points with optional checkpointing. Simpler than explicit StateGraph construction.

Incorrect — no checkpointer for resumable workflow:

@entrypoint()
def my_workflow(data: str) -> str:
    # If interrupted, all progress is lost — no checkpointer
    result = expensive_task(data).result()
    return result

Correct — with checkpointer:

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def my_workflow(data: str) -> str:
    result = expensive_task(data).result()
    return result

# Invoke with thread_id for resumability
config = {"configurable": {"thread_id": "session-123"}}
result = my_workflow.invoke("input", config)

Human-in-the-loop with @entrypoint:

from langgraph.types import interrupt, Command

@entrypoint(checkpointer=checkpointer)
def approval_workflow(request: dict) -> dict:
    result = analyze_request(request).result()

    approved = interrupt({"question": "Approve?", "details": result})

    if approved:
        return execute_action(result).result()
    return {"status": "rejected"}

# Resume after human review
for chunk in approval_workflow.stream(Command(resume=True), config):
    print(chunk)

Graph API vs Functional API:

  • Functional: Sequential workflows, orchestrator-worker, simpler debugging
  • Graph: Complex topology, dynamic routing, subgraph composition

Key rules:

  • Add checkpointer when workflow needs persistence/resume
  • Functional API builds graph implicitly from task call order
  • @entrypoint is the workflow entry — orchestrates @task functions
  • Regular functions (no decorator) execute normally, not tracked

Reference: LangGraph Functional API

Migrate from StateGraph to Functional API while preserving routing flexibility — MEDIUM

StateGraph to Functional API Migration

Convert simple StateGraph workflows to Functional API. Keep complex topologies as StateGraph.

Before — Graph API:

from langgraph.graph import StateGraph

def node_a(state):
    return {"data": process(state["input"])}

def node_b(state):
    return {"result": transform(state["data"])}

graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge("a", "b")
app = graph.compile()

After — Functional API:

from langgraph.func import entrypoint, task

@task
def process_data(input: str) -> str:
    return process(input)

@task
def transform_data(data: str) -> str:
    return transform(data)

@entrypoint()
def workflow(input: str) -> str:
    data = process_data(input).result()
    return transform_data(data).result()

Orchestrator-worker migration:

@task
def plan(topic: str) -> list[str]:
    return planner.invoke(f"Create outline for: {topic}")

@task
def write_section(section: str) -> str:
    return llm.invoke(f"Write section: {section}")

@entrypoint()
def report_workflow(topic: str) -> str:
    sections = plan(topic).result()
    section_futures = [write_section(s) for s in sections]  # Fan-out
    completed = [f.result() for f in section_futures]         # Fan-in
    return "\n\n".join(completed)

TypeScript equivalent:

import { entrypoint, task, MemorySaver } from "@langchain/langgraph";

const processData = task("processData", async (data: string) => transform(data));

const workflow = entrypoint(
  { name: "myWorkflow", checkpointer: new MemorySaver() },
  async (input: string) => {
    const result = await processData(input);
    return result;
  }
);

When NOT to migrate:

  • Complex dynamic routing (conditional edges, semantic routing)
  • Subgraph composition with different state schemas
  • Graph topologies that aren't linear/tree-shaped

Incorrect — using Graph API for simple linear workflow:

from langgraph.graph import StateGraph

def node_a(state):
    return {"data": process(state["input"])}

def node_b(state):
    return {"result": transform(state["data"])}

# Verbose for simple linear flow
graph = StateGraph(State)
graph.add_node("a", node_a)
graph.add_node("b", node_b)
graph.add_edge("a", "b")
app = graph.compile()

Correct — Functional API for simple workflows:

from langgraph.func import entrypoint, task

@task
def process_data(input: str) -> str:
    return process(input)

@task
def transform_data(data: str) -> str:
    return transform(data)

@entrypoint()
def workflow(input: str) -> str:
    data = process_data(input).result()  # Clean, simple
    return transform_data(data).result()

Key rules:

  • Functional API is best for sequential and orchestrator-worker patterns
  • Use Graph API when you need complex topology (loops, diamonds, dynamic routing)
  • Both APIs support checkpointing, streaming, and human-in-the-loop
  • Futures enable implicit parallelism without explicit Send API

Reference: LangGraph Functional API

Use @task decorator correctly and always call .result() on futures — MEDIUM

@task Decorator and Futures

@task functions return futures. Call .result() to get the value. Launch multiple tasks before calling .result() for parallelism.

Incorrect — forgetting .result():

@task
def process(data: str) -> str:
    return transform(data)

@entrypoint()
def workflow(data: str) -> str:
    result = process(data)  # result is a Future, not a string!
    return result  # Returns Future object, not the processed string

Correct — parallel execution with futures:

@task
def fetch_source_a(query: str) -> dict:
    return api_a.search(query)

@task
def fetch_source_b(query: str) -> dict:
    return api_b.search(query)

@entrypoint()
def parallel_search(query: str) -> dict:
    # Launch in parallel — futures start immediately
    future_a = fetch_source_a(query)
    future_b = fetch_source_b(query)

    # Block on both results
    results = [future_a.result(), future_b.result()]
    return {"combined": results}

Map over collection:

@task
def process_item(item: dict) -> dict:
    return transform(item)

@entrypoint()
def batch_workflow(items: list[dict]) -> list[dict]:
    futures = [process_item(item) for item in items]  # All launch in parallel
    return [f.result() for f in futures]  # Collect all results

Key rules:

  • @task returns a future — always call .result() to get the value
  • Launch tasks before blocking on .result() for parallel execution
  • Tasks inside @entrypoint are tracked for persistence and streaming
  • Don't nest @entrypoint inside @entrypoint

Reference: LangGraph Functional API

Implement approval gate pattern with reject paths to prevent dead-end workflows — MEDIUM

Approval Gate Pattern

Use interrupt_before for static approval points. Update state and resume.

Incorrect — no reject path:

app = workflow.compile(interrupt_before=["publish"])

# Human reviews...
state.values["approved"] = True
app.update_state(config, state.values)
result = app.invoke(None, config)
# What if human rejects? No path!

Correct — approval with approve/reject paths:

def approval_gate(state) -> dict:
    # interrupt_before pauses the graph BEFORE this node runs; by the time
    # it executes, the human has already reviewed and updated state
    if state["approved"]:
        return {"next": "publish"}
    return {"next": "revise"}

# Compile with interrupt
app = workflow.compile(interrupt_before=["approval_gate"])

# Step 1: Run until approval gate
config = {"configurable": {"thread_id": "doc-123"}}
result = app.invoke({"topic": "AI"}, config=config)

# Step 2: Human reviews
state = app.get_state(config)
print(f"Draft: {state.values['draft']}")

# Step 3: Human decides
app.update_state(config, {
    "approved": True,
    "feedback": "Looks good",
    "human_reviewed": True,
})

# Step 4: Resume
result = app.invoke(None, config=config)

Multiple approval points:

app = workflow.compile(interrupt_before=["first_review", "final_review"])

Key rules:

  • Always include both approve and reject paths
  • Set timeout for human review (24-48h, auto-reject after)
  • Send notification when workflow pauses (email/Slack)
  • Use get_state() to show current state for review

Reference: LangGraph Human-in-the-Loop

Build iterative feedback loops that refine output across multiple human rounds — MEDIUM

Feedback Loop Pattern

Repeatedly interrupt until human approves, incorporating feedback each iteration.

Incorrect — single feedback, no iteration:

def review(state):
    feedback = get_human_feedback()
    state["feedback"] = feedback
    # Proceeds regardless of whether human approved

Correct — iterative feedback loop:

import uuid

from langgraph.types import interrupt, Command

def run_with_feedback(initial_state: dict):
    config = {"configurable": {"thread_id": str(uuid.uuid4())}}
    result = app.invoke(initial_state, config=config)

    while "__interrupt__" in result:
        info = result["__interrupt__"][0].value
        print(f"Output: {info.get('output', 'N/A')}")
        feedback = input("Approve? (yes/no/feedback): ")

        if feedback.lower() == "yes":
            return app.invoke(Command(resume={"approved": True}), config=config)
        elif feedback.lower() == "no":
            return {"status": "rejected"}
        else:
            # Feed the rejection back so the generation node can retry
            result = app.invoke(
                Command(resume={"approved": False, "feedback": feedback}),
                config=config,
            )

    return result  # Completed without needing approval

Input validation loop:

def get_valid_age(state):
    prompt = "What is your age?"
    while True:
        answer = interrupt(prompt)
        if isinstance(answer, int) and 0 < answer < 150:
            return {"age": answer}
        prompt = f"'{answer}' is not valid. Enter 1-150."

Key rules:

  • Track review_count to limit iterations
  • Pass rejection feedback back to generation node
  • Preserve approved partial results between iterations
  • Timeout after max iterations with best-effort result

Reference: LangGraph Human-in-the-Loop

Use interrupt() for conditional pausing and Command for resuming workflows — MEDIUM

Dynamic Interrupt and Resume

Use interrupt() for conditional pausing and Command(resume=) for resuming.

Incorrect — interrupt in try/except:

def approval_node(state):
    try:
        response = interrupt({"question": "Approve?"})  # interrupt raises!
    except Exception:
        response = {"approved": False}  # Catches the interrupt exception — broken

Correct — dynamic interrupt (2026 pattern):

from langgraph.types import interrupt, Command

def approval_node(state):
    """Conditionally interrupt based on risk level."""
    if state["risk_level"] == "high":
        response = interrupt({
            "question": "High-risk action. Approve?",
            "action": state["proposed_action"],
            "risk_level": state["risk_level"],
        })
        if not response.get("approved"):
            return {"status": "rejected", "action": None}

    return {"status": "approved", "action": state["proposed_action"]}

Resume with Command:

config = {"configurable": {"thread_id": "workflow-123"}}

# Initial run — stops at interrupt
result = graph.invoke(initial_state, config)

# Check for interrupt
if "__interrupt__" in result:
    info = result["__interrupt__"][0].value
    print(f"Question: {info['question']}")

    # Resume with user response
    final = graph.invoke(Command(resume={"approved": True}), config)

Critical rules:

  • DO: Place side effects AFTER interrupt calls
  • DO: Make pre-interrupt side effects idempotent (upsert, not create); see the sketch after this list
  • DO: Keep interrupt call order consistent across executions
  • DON'T: Wrap interrupt in bare try/except
  • DON'T: Conditionally skip interrupt calls (breaks determinism)
  • DON'T: Pass functions or class instances to interrupt()
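
A sketch of the side-effect placement rules, using hypothetical upsert_review_ticket and notify_requester helpers:

def approval_node_with_effects(state):
    # The node body re-runs from the top on resume, so anything BEFORE the
    # interrupt must be idempotent: upsert keyed on a stable id, never blind-create
    upsert_review_ticket(ticket_id=state["request_id"], payload=state["proposed_action"])

    decision = interrupt({"question": "Approve?", "action": state["proposed_action"]})

    # Effects AFTER the interrupt run exactly once, after the human resumes
    notify_requester(state["request_id"], approved=decision["approved"])
    return {"status": "approved" if decision["approved"] else "rejected"}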

Reference: LangGraph Human-in-the-Loop

Isolate errors in parallel branches to preserve completed results on failure — HIGH

Error Isolation in Parallel Execution

Isolate failures in parallel branches. Use return_exceptions=True and per-branch timeouts.

Incorrect — one failure kills all:

async def run_agents(state):
    tasks = [agent.analyze(state["content"]) for agent in agents]
    results = await asyncio.gather(*tasks)  # One exception crashes everything
    return {"results": results}

Correct — isolated failures with timeout:

import asyncio

async def parallel_with_isolation(agents: list, content: str, timeout: int = 30):
    """Run agents with per-agent timeout and error isolation."""
    async def run_with_timeout(agent):
        try:
            return await asyncio.wait_for(agent.analyze(content), timeout=timeout)
        except asyncio.TimeoutError:
            return {"agent": agent.name, "error": "timeout"}
        except Exception as e:
            return {"agent": agent.name, "error": str(e)}

    tasks = [run_with_timeout(a) for a in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = [r for r in results if not isinstance(r, Exception) and "error" not in r]
    failures = [r for r in results if isinstance(r, Exception) or "error" in r]

    return {"successes": successes, "failures": failures}

Key rules:

  • Always use return_exceptions=True in asyncio.gather
  • Add per-branch timeout (30-60s) to prevent slow branches blocking
  • Separate successes from failures — partial results are still valuable
  • Log failures for debugging but continue with available results

Reference: LangGraph Parallel Execution

Use Send API for dynamic fan-out and fan-in parallel branches — HIGH

Fan-Out/Fan-In Pattern

Use Send API to dispatch dynamic parallel branches. Results accumulate via Annotated[list, add].

Incorrect — sequential execution disguised as parallel:

def process_all(state):
    results = []
    for task in state["tasks"]:
        result = process(task)  # Sequential!
        results.append(result)
    return {"results": results}

Correct — true parallel with Send API:

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from typing import TypedDict, Annotated
from operator import add

class OverallState(TypedDict):
    subjects: list[str]
    jokes: Annotated[list[str], add]  # Accumulates from parallel branches

class JokeState(TypedDict):
    subject: str

def continue_to_jokes(state: OverallState) -> list[Send]:
    """Fan-out: create parallel branch for each subject."""
    return [Send("generate_joke", {"subject": s}) for s in state["subjects"]]

def generate_joke(state: JokeState) -> dict:
    """Worker: each runs in parallel, writes to accumulator."""
    joke = llm.invoke(f"Tell a joke about {state['subject']}")
    return {"jokes": [f"{state['subject']}: {joke.content}"]}

builder = StateGraph(OverallState)
builder.add_node("generate_topics", generate_topics)
builder.add_node("generate_joke", generate_joke)
builder.add_edge(START, "generate_topics")
builder.add_conditional_edges("generate_topics", continue_to_jokes)
builder.add_edge("generate_joke", END)  # All branches converge automatically

Key rules:

  • Send API creates true parallel branches (not async, graph-level parallelism)
  • Worker state can differ from overall state (separate TypedDict)
  • Use Annotated[list, add] on the accumulating field in overall state
  • All branches converge automatically when connected to a common next node or END

Reference: LangGraph Send API

Apply map-reduce with asyncio.gather for independent parallel task processing — HIGH

Map-Reduce Pattern

Use asyncio.gather for async parallel processing within a single node, or Send for graph-level parallelism.

Incorrect — sequential in async context:

async def process_all(state):
    results = []
    for item in state["items"]:
        result = await process_item(item)  # Sequential await!
        results.append(result)
    return {"results": results}

Correct — parallel with asyncio.gather:

import asyncio

async def parallel_map(items: list, process_fn) -> list:
    """Map: process all items concurrently."""
    tasks = [asyncio.create_task(process_fn(item)) for item in items]
    return await asyncio.gather(*tasks, return_exceptions=True)

def reduce_results(results: list) -> dict:
    """Reduce: combine all results."""
    successes = [r for r in results if not isinstance(r, Exception)]
    failures = [r for r in results if isinstance(r, Exception)]
    return {
        "total": len(results),
        "passed": len(successes),
        "failed": len(failures),
        "results": successes,
        "errors": [str(e) for e in failures],
    }

async def map_reduce_node(state: State) -> dict:
    results = await parallel_map(state["items"], process_item_async)
    return {"summary": reduce_results(results)}

Key rules:

  • Use asyncio.gather for I/O-bound parallel work within a node
  • Use Send API for graph-level parallelism across nodes
  • Always use return_exceptions=True to prevent one failure killing all
  • Max 5-10 concurrent tasks to avoid overwhelming APIs (see the semaphore sketch after this list)
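
A sketch of bounding fan-out with a semaphore (the limit of 8 is illustrative):

import asyncio

async def bounded_map(items: list, process_fn, limit: int = 8) -> list:
    # The semaphore caps in-flight tasks so fan-out stays inside API rate limits
    sem = asyncio.Semaphore(limit)

    async def run_one(item):
        async with sem:
            return await process_fn(item)

    return await asyncio.gather(*(run_one(i) for i in items), return_exceptions=True)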

Reference: LangGraph Parallel Execution

Configure LangGraph Platform for local dev, Docker builds, and cloud deployment — HIGH

Platform Deployment

LangGraph Platform provides infrastructure for deploying graphs as APIs with persistence, streaming, and multi-tenancy built in.

Incorrect — no configuration file:

# Running graph directly without Platform config
python my_graph.py  # No API server, no persistence, no multi-tenancy

Correct — langgraph.json + CLI:

{
  "dependencies": ["langchain_openai", "./my_package"],
  "graphs": {
    "my_agent": "./my_package/agent.py:graph",
    "my_workflow": "./my_package/workflow.py:app"
  },
  "env": "./.env"
}

# Local development (hot reload, in-memory)
langgraph dev

# Build Docker image for self-hosted deployment
langgraph build -t my-agent:latest

# Deploy to LangGraph Cloud
langgraph deploy

Assistants API — multiple configs from one graph:

from langgraph_sdk import get_client

client = get_client(url="http://localhost:2024")

# Create assistants with different configs from the same graph
creative = await client.assistants.create(
    graph_id="my_agent",
    config={"configurable": {"temperature": 0.9, "model": "claude-sonnet-4-5-20250929"}},
    name="creative-writer",
)

precise = await client.assistants.create(
    graph_id="my_agent",
    config={"configurable": {"temperature": 0.1, "model": "claude-sonnet-4-5-20250929"}},
    name="precise-analyst",
)

Background runs and webhooks:

# Fire-and-forget with webhook callback
run = await client.runs.create(
    thread_id=thread["thread_id"],
    assistant_id=creative["assistant_id"],
    input={"messages": [{"role": "user", "content": "Write a story"}]},
    webhook="https://myapp.com/api/langgraph-callback",
    background=True,
)

@entrypoint for Platform deployments:

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import MemorySaver

@entrypoint(path="my_workflow", checkpointer=MemorySaver())
def my_workflow(inputs: dict) -> dict:
    result = analyze_task(inputs["query"]).result()
    return {"answer": result}

Authentication middleware:

from langgraph_sdk import Auth

auth = Auth()

@auth.authenticate
async def authenticate(authorization: str) -> str:
    # Validate token, return user_id
    user = await verify_token(authorization)
    return user["id"]

@auth.on
async def authorize(params, user_id: str):
    # Filter resources by user ownership
    params["metadata"] = {"owner": user_id}

Key rules:

  • langgraph.json is required — defines graphs, dependencies, and env vars
  • Use langgraph dev for local development (hot reload, in-memory state)
  • Use langgraph build for Docker images, langgraph deploy for cloud
  • Assistants API creates multiple configurations from a single deployed graph
  • @entrypoint(path=...) registers functional API workflows with Platform
  • Background runs return immediately; use webhooks for completion callbacks
  • Add authentication middleware for multi-tenant deployments

Reference: LangGraph Platform

Handle concurrent user messages with double-texting strategies — MEDIUM

Double Texting

When a user sends a new message while a previous run is still executing, LangGraph Platform provides four strategies to handle the conflict.

Problem — no strategy (race condition):

User: "Analyze Q1 data"     → Run starts...
User: "Actually use Q2 data" → Second run starts on same thread
# Both runs mutate state concurrently — corrupted results

Strategy 1 — Reject (simplest, safest):

from langgraph_sdk import get_client

client = get_client(url="http://localhost:2024")

# New input rejected while run is active
run = await client.runs.create(
    thread_id=thread["thread_id"],
    assistant_id="my_agent",
    input={"messages": [{"role": "user", "content": "Analyze Q2"}]},
    multitask_strategy="reject",
)
# Raises 409 Conflict if a run is already active

Use case: Strict turn-based workflows, form submissions, payment processing.

Strategy 2 — Rollback (cancel and restart):

# Cancel current run, roll back state, start fresh with new input
run = await client.runs.create(
    thread_id=thread["thread_id"],
    assistant_id="my_agent",
    input={"messages": [{"role": "user", "content": "Use Q2 data instead"}]},
    multitask_strategy="rollback",
)
# Previous run cancelled, state rolled back to before it started

Use case: Chatbots where user corrections should override in-progress work.

Strategy 3 — Enqueue (process sequentially):

# Queue new input to run after current run completes
run = await client.runs.create(
    thread_id=thread["thread_id"],
    assistant_id="my_agent",
    input={"messages": [{"role": "user", "content": "Also check Q3"}]},
    multitask_strategy="enqueue",
)
# Runs sequentially: current run finishes, then queued run starts

Use case: Task queues, sequential pipelines, batch processing interfaces.

Strategy 4 — Interrupt (stop and continue):

# Interrupt current run but keep its state, then continue with new input
run = await client.runs.create(
    thread_id=thread["thread_id"],
    assistant_id="my_agent",
    input={"messages": [{"role": "user", "content": "Focus on revenue"}]},
    multitask_strategy="interrupt",
)
# Current run stops mid-execution, new run builds on existing state

Use case: Interactive agents where context should accumulate across interruptions.

Key rules:

  • Set multitask_strategy on every runs.create call (no global default)
  • reject — safest, returns 409 if busy; client must retry (see the sketch after this list)
  • rollback — cancels active run, resets state, starts new run
  • enqueue — queues new run after current completes; preserves ordering
  • interrupt — stops active run mid-execution, keeps partial state, starts new run
  • Choose based on UX: corrections → rollback, queuing → enqueue, strict → reject
  • Only relevant for same-thread concurrent runs; different threads are independent
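
With reject, the client owns the retry. A sketch that assumes the SDK surfaces HTTP failures as httpx.HTTPStatusError (retry count and delay are illustrative):

import asyncio
import httpx

async def send_or_wait(client, thread_id: str, message: dict):
    for _ in range(5):
        try:
            return await client.runs.create(
                thread_id=thread_id,
                assistant_id="my_agent",
                input={"messages": [message]},
                multitask_strategy="reject",
            )
        except httpx.HTTPStatusError as e:
            if e.response.status_code != 409:
                raise
            await asyncio.sleep(2)  # Thread busy; back off and retry
    raise RuntimeError("Thread stayed busy after retries")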

Reference: Double Texting

Use RemoteGraph to invoke deployed LangGraph instances from client code — HIGH

RemoteGraph

RemoteGraph provides a graph-compatible interface for calling deployed LangGraph instances. It supports sync/async invocation, streaming, and state management.

Incorrect — manual HTTP calls:

import httpx

# Manual API calls lose type safety and streaming support
response = httpx.post(
    "http://localhost:2024/runs",
    json={"input": {"messages": [{"role": "user", "content": "hello"}]}},
)

Correct — RemoteGraph (sync):

from langgraph.pregel.remote import RemoteGraph

remote = RemoteGraph(
    "my_agent",
    url="http://localhost:2024",
)

# Invoke like a local graph
result = remote.invoke(
    {"messages": [{"role": "user", "content": "Analyze this data"}]},
    config={"configurable": {"thread_id": "remote-123"}},
)

Async invocation:

from langgraph.pregel.remote import RemoteGraph

remote = RemoteGraph(
    "my_agent",
    url="http://localhost:2024",
)

result = await remote.ainvoke(
    {"messages": [{"role": "user", "content": "Analyze this data"}]},
    config={"configurable": {"thread_id": "remote-123"}},
)

Streaming from remote graphs:

# Stream updates in real-time
async for chunk in remote.astream(
    {"messages": [{"role": "user", "content": "Write a report"}]},
    config={"configurable": {"thread_id": "stream-456"}},
    stream_mode="updates",
):
    print(chunk)

# Stream events for fine-grained control
async for event in remote.astream_events(
    {"messages": [{"role": "user", "content": "Summarize"}]},
    config={"configurable": {"thread_id": "events-789"}},
    version="v2",
):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")

Use as a subgraph node:

from langgraph.graph import StateGraph, START, END
from langgraph.pregel.remote import RemoteGraph

remote_analyzer = RemoteGraph("analyzer", url="http://analyzer:2024")

builder = StateGraph(OrchestratorState)
builder.add_node("analyze", remote_analyzer)  # Remote graph as local node
builder.add_node("summarize", summarize_node)
builder.add_edge(START, "analyze")
builder.add_edge("analyze", "summarize")
builder.add_edge("summarize", END)

Key rules:

  • Import RemoteGraph from langgraph.pregel.remote
  • First argument is the graph_id matching the key in langgraph.json
  • Supports invoke, ainvoke, stream, astream, astream_events
  • Use thread_id in config for stateful conversations
  • Can be added as a node in a parent graph for distributed orchestration
  • Handle httpx.ConnectError and httpx.TimeoutException for network failures

Reference: RemoteGraph

Define conditional edge routing with explicit mappings and END fallback — HIGH

Conditional Edge Routing

Route workflow execution dynamically based on state. Always include END or fallback.

Incorrect:

def route(state) -> str:
    if state["quality_score"] >= 0.8:
        return "publish"
    elif state["retry_count"] < 3:
        return "retry"
    # No fallback — if quality < 0.8 AND retries exhausted, returns None → runtime error

workflow.add_conditional_edges("check", route)
# No explicit mapping — unclear what routes exist

Correct:

from langgraph.graph import END

def route_based_on_quality(state: WorkflowState) -> str:
    if state["quality_score"] >= 0.8:
        return "publish"
    elif state["retry_count"] < 3:
        return "retry"
    else:
        return "manual_review"  # Always have a fallback

workflow.add_conditional_edges(
    "quality_check",
    route_based_on_quality,
    {
        "publish": "publish_node",
        "retry": "generator",
        "manual_review": "review_queue",
    }
)

Routing patterns reference:

Sequential:    A → B → C              (simple edges)
Branching:     A → (B or C)           (conditional edges)
Looping:       A → B → A              (retry logic)
Convergence:   (A or B) → C           (multiple inputs)
Diamond:       A → (B, C) → D         (parallel then merge)

Key rules:

  • Router functions must be pure — no side effects
  • Always provide explicit edge mapping dict for clarity
  • Always include END or fallback condition
  • Keep router logic lightweight

Reference: LangGraph Conditional Edges

Use Command(graph=...) for cross-graph navigation between parent and sibling subgraphs — HIGH

Cross-Graph Navigation

Use Command(graph=Command.PARENT) to navigate from a subgraph node to the parent graph. Use Command(graph="sibling_name") for cross-graph routing between sibling subgraphs. Available in LangGraph v0.2.58+.

Incorrect — routing stays within subgraph:

def child_node(state: ChildState) -> Command[Literal["escalate"]]:
    if state["needs_escalation"]:
        return Command(goto="escalate")  # Looks for "escalate" in child graph — fails!

Correct — Command.PARENT for child-to-parent:

from langgraph.types import Command
from typing_extensions import Literal

def child_node(state: ChildState) -> Command[Literal["supervisor"]]:
    if state["needs_escalation"]:
        return Command(
            update={"escalation_reason": "Complexity exceeded threshold"},
            goto="supervisor",
            graph=Command.PARENT,  # Navigate to parent graph
        )
    return Command(update={"result": "handled"}, goto="next_step")

Parent graph setup with child subgraph:

from langgraph.graph import StateGraph, START, END
from typing import TypedDict, Annotated
from operator import add

class ParentState(TypedDict):
    query: str
    escalation_reason: str
    results: Annotated[list[str], add]  # Reducer required for shared keys

class ChildState(TypedDict):
    query: str
    escalation_reason: str

# Child subgraph
child_builder = StateGraph(ChildState)
child_builder.add_node("analyze", child_node)
child_builder.add_edge(START, "analyze")
child_graph = child_builder.compile()

# Parent graph
parent_builder = StateGraph(ParentState)
parent_builder.add_node("child_agent", child_graph)
parent_builder.add_node("supervisor", supervisor_node)
parent_builder.add_node("specialist", specialist_node)
parent_builder.add_edge(START, "child_agent")
parent_builder.add_edge("supervisor", "specialist")
parent_builder.add_edge("specialist", END)

app = parent_builder.compile()

Cross-graph routing to sibling subgraph:

def agent_a_node(state: AgentAState) -> Command[Literal["agent_b"]]:
    if state["needs_different_expertise"]:
        return Command(
            update={"handoff_context": state["partial_result"]},
            goto="agent_b",          # Sibling node in parent graph
            graph=Command.PARENT,    # Navigate up to parent first
        )
    return Command(update={"result": "done"}, goto=END)

State mapping between different schemas:

class ParentState(TypedDict):
    query: str
    context: Annotated[list[str], add]  # Must have reducer for updates from children

class ResearcherState(TypedDict):
    query: str
    context: str  # Different type than parent — mapping happens at boundary

def researcher_node(state: ResearcherState) -> Command[Literal["writer"]]:
    finding = research(state["query"])
    return Command(
        update={"context": [finding]},  # Match parent's list type with reducer
        goto="writer",
        graph=Command.PARENT,
    )

Key rules:

  • Command.PARENT navigates to the closest parent graph
  • For sibling routing: go to parent first, then target the sibling node name
  • Shared state keys updated via Command.PARENT must have reducers in parent state
  • State types can differ between parent and child — map values at the boundary
  • Use cases: escalation, delegation, multi-agent handoff, error propagation
  • Requires LangGraph v0.2.58+ (stable in v0.3+)
  • Do not wrap Command returns in try/except — it uses exceptions internally

Reference: Cross-graph navigation

Implement retry loops with max counter to prevent infinite resource consumption — HIGH

Retry Loop Pattern

Loop-back edges for retrying failed operations. Always include a max retry counter.

Incorrect:

def should_retry(state) -> str:
    if state.get("output"):
        return "success"
    return "retry"  # No max counter — infinite loop if LLM keeps failing

workflow.add_conditional_edges("llm_call", should_retry, {
    "success": "next_step",
    "retry": "llm_call",  # Loops forever
})

Correct:

def llm_call_with_retry(state):
    try:
        result = call_llm(state["input"])
        return {"output": result, "retry_count": 0}
    except Exception as e:
        return {
            "retry_count": state.get("retry_count", 0) + 1,
            "error": str(e),
        }

def should_retry(state) -> str:
    if state.get("output"):
        return "success"
    elif state["retry_count"] < 3:
        return "retry"
    else:
        return "failed"  # Max retries exceeded

workflow.add_conditional_edges("llm_call", should_retry, {
    "success": "next_step",
    "retry": "llm_call",
    "failed": "error_handler",
})

Key rules:

  • Always track retry_count in state
  • Set max retries (2-3 for LLM calls)
  • Include explicit "failed" path for max retries exceeded
  • Consider exponential backoff for API calls (see the sketch after this list)
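
A sketch of the retrying node with exponential backoff layered in (delays are illustrative):

import time

def llm_call_with_backoff(state):
    attempt = state.get("retry_count", 0)
    if attempt:
        time.sleep(min(2 ** attempt, 30))  # 2s, 4s, 8s... capped at 30s
    try:
        return {"output": call_llm(state["input"]), "retry_count": 0}
    except Exception as e:
        return {"retry_count": attempt + 1, "error": str(e)}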

Reference: LangGraph Conditional Edges

Route with embedding similarity or Command API instead of conditional edges — HIGH

Semantic & Command Routing

Use Command API when routing AND updating state. Use semantic routing for intent classification.

Incorrect — split routing + state update:

def router(state):
    state["route_reason"] = "high score"  # State update in router
    if state["score"] > 0.8:
        return "approve"
    return "reject"

workflow.add_conditional_edges("evaluate", router)
# State update happens but routing is separate — hard to reason about

Correct — Command API (2026 pattern):

import time

from langgraph.types import Command
from typing import Literal

def router_with_state(state) -> Command[Literal["approve", "reject"]]:
    if state["score"] > 0.8:
        return Command(
            update={"route_reason": "high score", "routed_at": time.time()},
            goto="approve"
        )
    return Command(
        update={"route_reason": "low score", "routed_at": time.time()},
        goto="reject"
    )

workflow.add_node("evaluate", router_with_state)
# No conditional edges needed — Command handles both state + routing

Semantic routing — embedding-based intent classification:

from sentence_transformers import SentenceTransformer
import numpy as np

embedder = SentenceTransformer("all-MiniLM-L6-v2")

ROUTE_EMBEDDINGS = {
    "technical": embedder.encode("technical code programming"),
    "business": embedder.encode("business strategy revenue"),
    "support": embedder.encode("help troubleshoot error fix"),
}

def semantic_router(state) -> str:
    query_embedding = embedder.encode(state["query"])
    similarities = {
        route: np.dot(query_embedding, emb) / (
            np.linalg.norm(query_embedding) * np.linalg.norm(emb)
        )
        for route, emb in ROUTE_EMBEDDINGS.items()
    }
    best = max(similarities, key=similarities.get)
    return "general" if similarities[best] < 0.3 else best

Key rules:

  • Use Command when updating state AND routing together
  • Use conditional edges when routing only (no state updates)
  • Pre-compute embeddings for semantic routing (don't embed on every call)
  • Always include a fallback/general route

Reference: LangGraph Command API

Use MessagesState with add_messages reducer to preserve conversation history — CRITICAL

MessagesState Pattern

Use MessagesState or add_messages reducer for any workflow handling conversation messages.

Incorrect:

class AgentState(TypedDict):
    messages: list  # No reducer — each node REPLACES entire message history
    user_id: str

def agent_node(state):
    response = llm.invoke(state["messages"])
    return {"messages": [response]}  # Overwrites all previous messages!

Correct:

from langgraph.graph import MessagesState
from langgraph.graph.message import add_messages
from typing import Annotated

# Option 1: Extend built-in MessagesState (recommended)
class AgentState(MessagesState):
    user_id: str
    context: dict

# Option 2: Manual add_messages reducer
class AgentState(TypedDict):
    messages: Annotated[list, add_messages]  # Smart append/update by ID
    user_id: str

def agent_node(state):
    response = llm.invoke(state["messages"])
    return {"messages": [response]}  # Appends, doesn't overwrite

Why add_messages matters:

  • Appends new messages (doesn't overwrite)
  • Updates existing messages by matching ID
  • Handles message deduplication automatically

Note: MessageGraph is deprecated in LangGraph v1.0.0. Use StateGraph with a messages key instead.

Reference: LangGraph MessagesState

Apply Pydantic state validation at boundaries only to minimize runtime overhead — HIGH

Pydantic State Validation

Use TypedDict internally (lightweight), Pydantic BaseModel at system boundaries (user input, API output).

Incorrect:

from pydantic import BaseModel

# Using Pydantic for ALL internal state — unnecessary runtime validation overhead
class WorkflowState(BaseModel):
    input: str
    intermediate_result: str = ""
    agent_responses: list[dict] = []
    # Every node update triggers Pydantic validation

Correct:

from typing import TypedDict, Annotated
from operator import add
from pydantic import BaseModel, Field

# Internal state: TypedDict (no runtime overhead)
class WorkflowState(TypedDict):
    input: str
    output: str
    agent_responses: Annotated[list[dict], add]

# Boundary validation: Pydantic (validates external data)
class WorkflowInput(BaseModel):
    input: str = Field(description="User input", min_length=1)
    config: dict = Field(default_factory=dict)

class WorkflowOutput(BaseModel):
    output: str
    confidence: float = Field(ge=0, le=1)
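
A sketch tying the two together at the boundary (app is the compiled graph; the confidence value is illustrative):

def handle_request(raw_payload: dict) -> dict:
    validated = WorkflowInput.model_validate(raw_payload)  # Validate external input once
    final = app.invoke({"input": validated.input})         # TypedDict state inside the graph
    return WorkflowOutput(output=final["output"], confidence=1.0).model_dump()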

Key rules:

  • TypedDict for internal graph state (lightweight, no runtime cost)
  • Pydantic at boundaries: API inputs, user-facing outputs, LLM structured output
  • Annotated[list, add] works with TypedDict, not with Pydantic BaseModel

Reference: LangGraph State Concepts

Define custom Annotated reducers for merge, dedup, and last-value state patterns — HIGH

Custom Annotated Reducers

Define custom reducers when operator.add doesn't fit. Common patterns: merge dicts, keep latest, dedup.

Incorrect:

class State(TypedDict):
    config: dict  # No reducer — later nodes overwrite entire dict
    status: str   # No reducer — same issue

def node_a(state):
    return {"config": {"key_a": "value_a"}}

def node_b(state):
    return {"config": {"key_b": "value_b"}}
    # config is now {"key_b": "value_b"} — key_a is LOST

Correct:

from typing import Annotated

def merge_dicts(a: dict, b: dict) -> dict:
    """Custom reducer that deep-merges dictionaries."""
    return {**a, **b}

def last_value(a, b):
    """Keep only the latest value."""
    return b

class State(TypedDict):
    config: Annotated[dict, merge_dicts]   # Merges updates from all nodes
    status: Annotated[str, last_value]     # Explicit: keeps latest value

def node_a(state):
    return {"config": {"key_a": "value_a"}}

def node_b(state):
    return {"config": {"key_b": "value_b"}}
    # config is now {"key_a": "value_a", "key_b": "value_b"}

RemainingSteps (2026 pattern) — proactive recursion handling:

from typing import TypedDict
from langgraph.managed import RemainingSteps

class WorkflowState(TypedDict):
    action: str
    remaining_steps: RemainingSteps  # Managed value that counts down to the recursion limit

def agent_node(state: WorkflowState):
    if state["remaining_steps"] < 5:
        return {"action": "summarize_and_exit"}
    return {"action": "continue"}

Key rules:

  • Use Annotated[dict, merge_dicts] for config-style fields
  • Use Annotated[str, last_value] to make overwrites explicit
  • Always return new state from nodes, never mutate in place (breaks checkpointing)

Reference: LangGraph Reducers

Design TypedDict state with Annotated accumulators to prevent silent data loss — CRITICAL

TypedDict State Pattern

Use TypedDict for lightweight internal state. Use Annotated[list[T], add] for any field that multiple nodes write to.

Incorrect:

from typing import TypedDict

class WorkflowState(TypedDict):
    findings: list[dict]  # No reducer — each node REPLACES the list

def agent_a(state):
    return {"findings": [{"source": "a", "result": "..."}]}

def agent_b(state):
    return {"findings": [{"source": "b", "result": "..."}]}
    # agent_a's findings are LOST

Correct:

from typing import TypedDict, Annotated
from operator import add

class WorkflowState(TypedDict):
    input: str
    output: str
    findings: Annotated[list[dict], add]  # Accumulates across nodes
    metadata: dict

def agent_a(state):
    return {"findings": [{"source": "a", "result": "..."}]}

def agent_b(state):
    return {"findings": [{"source": "b", "result": "..."}]}
    # Both findings are preserved: [agent_a's, agent_b's]

Context Schema (2026 pattern) — pass runtime config without polluting state:

from dataclasses import dataclass
from langgraph.runtime import Runtime

@dataclass
class ContextSchema:
    llm_provider: str = "anthropic"
    temperature: float = 0.7

graph = StateGraph(WorkflowState, context_schema=ContextSchema)

def my_node(state: WorkflowState, runtime: Runtime[ContextSchema]):
    return {"output": call_llm(state["input"], runtime.context.temperature)}

Node Caching (2026 pattern):

from langgraph.cache.memory import InMemoryCache
from langgraph.types import CachePolicy

builder.add_node("embed", embed_node, cache_policy=CachePolicy(ttl=300))
graph = builder.compile(cache=InMemoryCache())

Key rules:

  • Always use Annotated[list[T], add] for multi-agent accumulation
  • Return partial state updates from nodes, never mutate in place
  • Use context_schema for runtime config (temperature, provider)
  • Use CachePolicy for expensive operations (embeddings, API calls)

Reference: LangGraph State Concepts

Emit custom streaming events to provide client visibility into node progress — MEDIUM

Custom Event Streaming

Emit progress events from nodes using get_stream_writer(). Consume via stream_mode="custom".

Incorrect — no progress visibility:

def process_items(state):
    results = []
    for item in state["items"]:
        results.append(process(item))  # Client sees nothing until ALL items finish
    return {"results": results}

Correct — custom progress events:

from langgraph.config import get_stream_writer

def node_with_progress(state):
    writer = get_stream_writer()
    results = []

    for i, item in enumerate(state["items"]):
        writer({
            "type": "progress",
            "current": i + 1,
            "total": len(state["items"]),
            "status": f"Processing {item}",
        })
        results.append(process(item))

    writer({"type": "complete", "message": "All items processed"})
    return {"results": results}

# Consume custom events
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
    if mode == "custom":
        if chunk.get("type") == "progress":
            print(f"Progress: {chunk['current']}/{chunk['total']}")
    elif mode == "updates":
        print(f"State updated: {list(chunk.keys())}")

FastAPI SSE integration:

@app.post("/stream")
async def stream_workflow(request: WorkflowRequest):
    async def event_generator():
        async for mode, chunk in graph.astream(
            request.inputs, stream_mode=["updates", "custom"]
        ):
            yield f"data: {json.dumps({'mode': mode, 'data': chunk})}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

Key rules:

  • Use get_stream_writer() to emit custom events from any node
  • Events should have a type field for client-side routing
  • Include progress data (current, total) for progress bars
  • Combine with stream_mode=["updates", "custom"] for comprehensive output

Reference: LangGraph Streaming

Choose the right stream mode for efficient real-time UI integration — MEDIUM

Stream Modes

LangGraph provides 5 stream modes. Choose based on your use case.

| Mode | Purpose | Use Case |
|------|---------|----------|
| values | Full state after each step | Debugging, state inspection |
| updates | State deltas after each step | Efficient UI updates |
| messages | LLM tokens + metadata | Chat interfaces, typing indicators |
| custom | User-defined events | Progress bars, status updates |
| debug | Maximum information | Development, troubleshooting |

Incorrect — default mode only:

for chunk in graph.stream(inputs):  # defaults to "values" — full state every step
    print(chunk)  # Massive output, hard to parse

Correct — multiple modes for comprehensive feedback:

async for mode, chunk in graph.astream(
    inputs,
    stream_mode=["updates", "custom", "messages"]
):
    match mode:
        case "updates":
            update_ui_state(chunk)
        case "custom":
            show_progress(chunk)
        case "messages":
            append_to_chat(chunk)

Subgraph streaming:

for namespace, chunk in graph.stream(inputs, subgraphs=True, stream_mode="updates"):
    print(f"[{'/'.join(namespace) or 'root'}] {chunk}")

Key rules:

  • Use ["updates", "custom"] for most UIs
  • Use "messages" for chat interfaces
  • Enable subgraphs=True for complex nested workflows
  • Combine multiple modes in a list for comprehensive output

Reference: LangGraph Streaming

Filter LLM token streams by node or tags to avoid cross-node noise — MEDIUM

LLM Token Streaming

Stream LLM tokens in real-time. Filter by node name or model tags to control output.

Incorrect — no filtering, noisy output:

for msg, meta in graph.stream(inputs, stream_mode="messages"):
    print(msg.content, end="")  # Shows tokens from ALL LLM calls — mixed output

Correct — filtered by node:

for msg, meta in graph.stream(inputs, stream_mode="messages"):
    if meta["langgraph_node"] == "writer_agent":
        print(msg.content, end="", flush=True)

Filtered by tags (more flexible):

from langchain.chat_models import init_chat_model

model = init_chat_model("claude-sonnet-4-20250514", tags=["main_response"])

for msg, meta in graph.stream(inputs, stream_mode="messages"):
    if "main_response" in meta.get("tags", []):
        print(msg.content, end="", flush=True)

Non-LangChain LLM streaming:

from langgraph.config import get_stream_writer

def call_custom_llm(state):
    writer = get_stream_writer()
    parts = []
    for chunk in your_streaming_client.generate(state["prompt"]):
        writer({"type": "llm_token", "content": chunk.text})
        parts.append(chunk.text)
    return {"response": "".join(parts)}

Key rules:

  • Use stream_mode="messages" for token-by-token streaming
  • Filter by meta["langgraph_node"] or meta["tags"] to isolate output
  • Use flush=True on print for real-time display
  • Use get_stream_writer() for non-LangChain LLM APIs

Reference: LangGraph Streaming

Add subgraph directly as node when parent and child share the same state — MEDIUM

Add Subgraph as Node (Shared State)

Use when parent and subgraph share the same state schema. Add compiled graph directly as a node.

Incorrect — unnecessary state mapping for shared schema:

def call_agent(state: SharedState):
    # Unnecessary transformation when schemas match
    input = {"messages": state["messages"], "context": state["context"]}
    output = agent_subgraph.invoke(input)
    return {"messages": output["messages"], "context": output["context"]}

Correct — add compiled graph directly:

from typing import Annotated, TypedDict

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

class SharedState(TypedDict):
    messages: Annotated[list, add_messages]
    context: dict

# Build subgraph with SAME state
agent_builder = StateGraph(SharedState)
agent_builder.add_node("think", think_node)
agent_builder.add_node("act", act_node)
agent_builder.add_edge(START, "think")
agent_builder.add_edge("think", "act")
agent_builder.add_edge("act", END)
agent_subgraph = agent_builder.compile()

# Add compiled subgraph directly as node — no wrapper needed
parent_builder = StateGraph(SharedState)
parent_builder.add_node("agent_team", agent_subgraph)
parent_builder.add_edge(START, "agent_team")
parent_builder.add_edge("agent_team", END)

Key rules:

  • Use when parent and subgraph have identical or overlapping state schemas
  • No state transformation needed — shared keys pass through automatically
  • Ideal for agent coordination via message passing
  • Reducers (like add_messages) work across the boundary

Reference: LangGraph Subgraphs

Invoke subgraph from node with state transformation to prevent schema mismatch — MEDIUM

Invoke Subgraph from Node

Use when subgraph needs completely isolated state (different schema from parent).

Incorrect — no state transformation:

def call_analysis(state: ParentState):
    result = analysis_subgraph.invoke(state)  # Schema mismatch! KeyError
    return result  # Wrong schema returned to parent

Correct — explicit state mapping at boundaries:

class ParentState(TypedDict):
    query: str
    analysis_result: dict

class AnalysisState(TypedDict):
    input_text: str
    findings: list[str]
    score: float

# Build and compile subgraph
analysis_builder = StateGraph(AnalysisState)
analysis_builder.add_node("analyze", analyze_node)
analysis_builder.add_node("score", score_node)
analysis_builder.add_edge(START, "analyze")
analysis_builder.add_edge("analyze", "score")
analysis_builder.add_edge("score", END)
analysis_subgraph = analysis_builder.compile()

def call_analysis(state: ParentState) -> dict:
    # Map parent → subgraph state
    subgraph_input = {"input_text": state["query"], "findings": [], "score": 0.0}

    # Invoke subgraph
    subgraph_output = analysis_subgraph.invoke(subgraph_input)

    # Map subgraph → parent state
    return {
        "analysis_result": {
            "findings": subgraph_output["findings"],
            "score": subgraph_output["score"],
        }
    }

parent_builder.add_node("analysis", call_analysis)

Key rules:

  • Transform state at both boundaries (parent→subgraph AND subgraph→parent)
  • Compile subgraph separately before adding to parent
  • Use for different schemas, private message histories, multi-level nesting

Reference: LangGraph Subgraphs

Propagate config through subgraph state mapping for tracing and checkpointing — MEDIUM

Subgraph State Mapping

Explicit state transforms at boundaries with proper config propagation for tracing and checkpointing.

Incorrect — no config propagation:

def call_subgraph(state: ParentState):
    result = subgraph.invoke({"query": state["query"]})  # No config — breaks tracing
    return {"result": result["output"]}

Correct — config propagation and explicit mapping:

from langchain_core.runnables import RunnableConfig

def call_subgraph_with_mapping(state: ParentState, config: RunnableConfig) -> dict:
    # 1. Extract relevant data
    subgraph_input = {
        "query": state["user_query"],
        "context": state.get("context", {}),
        "history": [],
    }

    # 2. Propagate the node's config for tracing/checkpointing
    result = subgraph.invoke(subgraph_input, config)

    # 3. Transform output back
    return {
        "subgraph_result": result["output"],
        "metadata": {"subgraph": "analysis", "steps": result.get("step_count", 0)},
    }

Checkpointing strategies:

from langgraph.checkpoint.postgres import PostgresSaver

# Parent-only (recommended) — propagates to all subgraphs
parent = parent_builder.compile(checkpointer=PostgresSaver(...))

# Independent subgraph memory — for persistent agent histories
agent_subgraph = agent_builder.compile(checkpointer=True)

Streaming nested graphs:

for namespace, chunk in graph.stream(inputs, subgraphs=True, stream_mode="updates"):
    depth = len(namespace)
    print(f"{'  ' * depth}[{'/'.join(namespace) or 'root'}] {chunk}")

Key rules:

  • Propagate the node's config (accept config: RunnableConfig and pass it to subgraph.invoke()) for tracing
  • Parent-only checkpointer is sufficient for most cases
  • Use subgraphs=True in stream/get_state for nested visibility
  • Keep state mapping explicit and documented at each boundary

Reference: LangGraph Subgraphs

Build supervisor-worker pattern with Command API for coordinated state routing — HIGH

Supervisor-Worker Pattern

Central supervisor routes to specialized workers. Use Command API for combined state update + routing.

Incorrect — missing edges and deprecated API:

workflow.set_entry_point("supervisor")  # Deprecated!

def supervisor(state):
    state["next"] = "analyzer"  # Mutating state directly
    return state

# No worker → supervisor edges — workers exit after running

Correct — Command API (2026 pattern):

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command
from typing import Literal

def supervisor(state) -> Command[Literal["analyzer", "validator", END]]:
    if "analyzer" not in state["agents_completed"]:
        return Command(
            update={"current_agent": "analyzer"},
            goto="analyzer"
        )
    elif "validator" not in state["agents_completed"]:
        return Command(
            update={"current_agent": "validator"},
            goto="validator"
        )
    return Command(update={"status": "complete"}, goto=END)

def analyzer(state):
    result = analyze(state["input"])
    return {"results": [result], "agents_completed": ["analyzer"]}

graph = StateGraph(WorkflowState)
graph.add_node("supervisor", supervisor)
graph.add_node("analyzer", analyzer)
graph.add_node("validator", validator)

graph.add_edge(START, "supervisor")        # Entry point
graph.add_edge("analyzer", "supervisor")   # Workers return to supervisor
graph.add_edge("validator", "supervisor")
# No conditional edges needed — Command handles routing

app = graph.compile()
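
The snippet assumes a WorkflowState along these lines — a sketch, with accumulators so workers can append without clobbering each other:

from typing import Annotated, TypedDict
from operator import add

class WorkflowState(TypedDict):
    input: str
    status: str
    current_agent: str
    results: Annotated[list, add]
    agents_completed: Annotated[list[str], add]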

Key rules:

  • Use add_edge(START, "supervisor") not set_entry_point() (deprecated)
  • Use Command when updating state AND routing together
  • Every worker must have an edge back to supervisor
  • Always include END condition to prevent infinite loops
  • Keep supervisor logic lightweight (routing only, no heavy computation)

Reference: LangGraph Supervisor

Route supervisor tasks by priority to ensure critical-first execution order — HIGH

Priority-Based Supervisor Routing

Route by priority instead of round-robin when execution order matters (e.g., security before implementation).

Incorrect — round-robin ignores priority:

ALL_AGENTS = ["tutorial", "security", "tech", "implementation"]

def supervisor(state):
    completed = set(state["agents_completed"])
    available = [a for a in ALL_AGENTS if a not in completed]
    state["next"] = available[0] if available else END  # tutorial runs before security!
    return state

Correct — priority-ordered execution:

AGENT_PRIORITIES = {
    "security": 1,         # Run first — block on vulnerabilities
    "tech": 2,
    "implementation": 3,
    "tutorial": 4,         # Run last
}

def priority_supervisor(state) -> Command[Literal["security", "tech", "implementation", "tutorial", END]]:
    completed = set(state["agents_completed"])
    available = [a for a in AGENT_PRIORITIES if a not in completed]

    if not available:
        return Command(update={"status": "complete"}, goto=END)

    next_agent = min(available, key=lambda a: AGENT_PRIORITIES[a])
    return Command(
        update={"current_agent": next_agent},
        goto=next_agent
    )

LLM-Based Supervisor (2026 pattern):

from pydantic import BaseModel, Field

class SupervisorDecision(BaseModel):
    next_agent: Literal["security", "tech", "DONE"]
    reasoning: str = Field(description="Brief routing rationale")

async def llm_supervisor(state):
    decision = await llm.with_structured_output(SupervisorDecision).ainvoke(prompt)
    if decision.next_agent == "DONE":
        return Command(goto=END)
    return Command(update={"routing_reasoning": decision.reasoning}, goto=decision.next_agent)

Key rules:

  • Use priority dict when execution order matters
  • Use LLM-based routing when priorities are dynamic/context-dependent
  • Track agents_completed list to prevent infinite loops
  • 3-8 specialists max per supervisor (avoid coordination overhead)

Reference: LangGraph Supervisor

Dispatch agents in round-robin order with completion tracking to avoid repeats — MEDIUM

Round-Robin Supervisor Dispatch

Visit all agents exactly once before finishing. Track completion to prevent re-dispatch.

Incorrect — no completion tracking:

ALL_AGENTS = ["security", "tech", "implementation"]

def supervisor(state):
    # No tracking — may dispatch same agent multiple times
    state["next"] = ALL_AGENTS[0]
    return state

Correct — completion-tracked round-robin:

ALL_AGENTS = ["security", "tech", "implementation", "tutorial"]

def supervisor(state) -> Command[Literal[*ALL_AGENTS, "quality_gate", END]]:
    completed = set(state["agents_completed"])
    available = [a for a in ALL_AGENTS if a not in completed]

    if not available:
        return Command(goto="quality_gate")

    return Command(
        update={"current_agent": available[0]},
        goto=available[0]
    )

def agent_node_factory(agent_name: str):
    """Create agent node that tracks completion."""
    async def node(state):
        result = await agents[agent_name].run(state["input"])
        return {
            "results": [result],
            "agents_completed": [agent_name],
            "current_agent": None,
        }
    return node

# Register all agents
for name in ALL_AGENTS:
    workflow.add_node(name, agent_node_factory(name))
    workflow.add_edge(name, "supervisor")

Key rules:

  • Track agents_completed as Annotated[list[str], add] in state
  • Check available vs completed to determine next agent
  • Route to quality gate or END when all agents done
  • Use factory function for consistent agent node creation

Reference: LangGraph Multi-Agent

Bind tools to LLMs correctly with proper tool_choice to ensure invocation — CRITICAL

Tool Binding to LLMs

Bind tools to models with bind_tools(). Use tool_choice to control selection.

Incorrect — tools defined but not bound:

@tool
def search_database(query: str) -> str:
    """Search the database."""
    return db.search(query)

def agent_node(state):
    response = model.invoke(state["messages"])  # Model doesn't know about tools!
    return {"messages": [response]}

Correct — tools bound to model:

from langchain_core.tools import tool
from langchain_anthropic import ChatAnthropic

@tool
def search_database(query: str) -> str:
    """Search the database for information."""
    return db.search(query)

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """Send an email to a recipient."""
    email_service.send(to, subject, body)
    return f"Email sent to {to}"

tools = [search_database, send_email]
model = ChatAnthropic(model="claude-sonnet-4-20250514")
model_with_tools = model.bind_tools(tools)

def agent_node(state):
    response = model_with_tools.invoke(state["messages"])
    return {"messages": [response]}

Force specific tool:

model.bind_tools(tools, tool_choice="any")             # At least one tool
model.bind_tools(tools, tool_choice="search_database")  # Specific tool

Key rules:

  • Always bind_tools() before invoking the model
  • Use descriptive @tool docstrings — LLM uses them to decide which tool to call
  • Keep 5-10 tools max per agent (use dynamic selection for more)
  • Use tool_choice when a specific tool is required

Reference: LangGraph Tool Calling

Filter tools dynamically by relevance to avoid context overflow from large sets — HIGH

Dynamic Tool Selection

When you have many tools, select the most relevant subset per query using embeddings.

Incorrect — all tools always bound:

# 50 tools bound — floods context, LLM makes poor choices
model_with_all = model.bind_tools(all_50_tools)

Correct — dynamic selection by relevance:

import numpy as np
from sentence_transformers import SentenceTransformer

embedder = SentenceTransformer("all-MiniLM-L6-v2")

# Pre-compute tool embeddings (once at startup)
TOOL_EMBEDDINGS = {
    tool.name: embedder.encode(tool.description)
    for tool in all_tools
}

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def select_relevant_tools(query: str, all_tools: list, top_k: int = 5) -> list:
    query_embedding = embedder.encode(query)
    similarities = [
        (tool, cosine_similarity(query_embedding, TOOL_EMBEDDINGS[tool.name]))
        for tool in all_tools
    ]
    sorted_tools = sorted(similarities, key=lambda x: x[1], reverse=True)
    return [tool for tool, _ in sorted_tools[:top_k]]

def agent_with_dynamic_tools(state):
    relevant_tools = select_relevant_tools(
        state["messages"][-1].content, all_tools, top_k=5
    )
    model_bound = model.bind_tools(relevant_tools)
    response = model_bound.invoke(state["messages"])
    return {"messages": [response]}

Key rules:

  • Pre-compute embeddings at startup (not per request)
  • Select top 5-10 tools per query
  • Use cosine similarity for relevance scoring
  • Fall back to general tools if no strong match

Reference: LangGraph Tool Calling

Gate dangerous tool execution with interrupt-based human approval before proceeding — CRITICAL

Tool Interrupt Approval Gates

Use interrupt() inside tool functions for human approval before destructive operations.

Incorrect — no approval for dangerous operation:

@tool
def delete_user(user_id: str) -> str:
    """Delete a user account."""
    db.delete_user(user_id)  # Executes immediately without approval!
    return f"User {user_id} deleted"

Correct — interrupt for approval:

from langgraph.types import interrupt

@tool
def delete_user(user_id: str) -> str:
    """Delete a user account. Requires approval."""
    response = interrupt({
        "action": "delete_user",
        "user_id": user_id,
        "message": f"Approve deletion of user {user_id}?",
        "risk_level": "high",
    })

    if response.get("approved"):
        db.delete_user(user_id)
        return f"User {user_id} deleted successfully"
    return "Deletion cancelled by user"

@tool
def transfer_funds(from_account: str, to_account: str, amount: float) -> str:
    """Transfer funds. Requires approval for large amounts."""
    if amount > 1000:
        response = interrupt({
            "action": "transfer_funds",
            "amount": amount,
            "message": f"Approve transfer of ${amount}?",
        })
        if not response.get("approved"):
            return "Transfer cancelled"

    execute_transfer(from_account, to_account, amount)
    return f"Transferred ${amount}"

Streaming from tools:

from langgraph.config import get_stream_writer

@tool
def long_running_analysis(data: str) -> str:
    writer = get_stream_writer()
    writer({"status": "starting", "progress": 0})
    for i, chunk in enumerate(process_chunks(data)):
        writer({"status": "processing", "progress": (i + 1) * 10})
    return "Analysis complete"

Key rules:

  • Use interrupt() for any destructive or high-risk tool
  • Return error strings from tools, don't raise exceptions (lets agent recover)
  • Place side effects AFTER interrupt calls (not before)
  • Use get_stream_writer() for long-running tool progress

Reference: LangGraph Human-in-the-Loop

Use ToolNode for automatic parallel tool execution with built-in error handling — CRITICAL

ToolNode Execution

Use ToolNode for automatic tool execution with parallel support and error handling.

Incorrect — manual tool dispatch:

def execute_tools(state):
    last_message = state["messages"][-1]
    results = []
    for tool_call in last_message.tool_calls:
        # Manual dispatch — sequential, no error handling
        tool = tools_dict[tool_call["name"]]
        result = tool.invoke(tool_call["args"])
        results.append(result)
    return {"messages": results}

Correct — ToolNode handles everything:

from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, START, END, MessagesState

tool_node = ToolNode(tools)  # Parallel execution built-in

builder = StateGraph(MessagesState)
builder.add_node("agent", agent_node)
builder.add_node("tools", tool_node)

def should_continue(state) -> str:
    last_message = state["messages"][-1]
    if last_message.tool_calls:
        return "tools"
    return END

builder.add_edge(START, "agent")
builder.add_conditional_edges("agent", should_continue, {"tools": "tools", END: END})
builder.add_edge("tools", "agent")  # Return to agent after tool execution

graph = builder.compile()
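
By default ToolNode also catches tool exceptions and returns them to the agent as error ToolMessages so it can recover; disable this when failures should propagate (flag name per langgraph.prebuilt — verify against your version):

strict_tool_node = ToolNode(tools, handle_tool_errors=False)  # exceptions now raise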

Key rules:

  • ToolNode executes multiple tool calls in parallel automatically
  • Results returned in order matching the original tool_calls
  • Always add tools → agent edge for the ReAct loop
  • Route based on tool_calls presence in last message

Reference: LangGraph ToolNode
