OrchestKit v6.7.1 — 67 skills, 38 agents, 77 hooks with Opus 4.6 support

Agent Orchestration

Agent orchestration patterns for agentic loops, multi-agent coordination, alternative frameworks, and multi-scenario workflows. Use when building autonomous agent loops, coordinating multiple agents, evaluating CrewAI/AutoGen/Swarm, or orchestrating complex multi-step scenarios.

Reference high

Primary Agent: workflow-architect

Agent Orchestration

Comprehensive patterns for building and coordinating AI agents -- from single-agent reasoning loops to multi-agent systems and framework selection. Each category has individual rule files in rules/ loaded on-demand.

Quick Reference

| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Agent Loops | 2 | HIGH | ReAct reasoning, plan-and-execute, self-correction |
| Multi-Agent Coordination | 3 | CRITICAL | Supervisor routing, agent debate, result synthesis |
| Alternative Frameworks | 3 | HIGH | CrewAI crews, AutoGen teams, framework comparison |
| Multi-Scenario | 2 | MEDIUM | Parallel scenario orchestration, difficulty routing |

Total: 10 rules across 4 categories

Quick Start

# ReAct agent loop
async def react_loop(question: str, tools: dict, max_steps: int = 10) -> str:
    history = REACT_PROMPT.format(tools=list(tools.keys()), question=question)
    for step in range(max_steps):
        response = await llm.chat([{"role": "user", "content": history}])
        history += response.content
        if "Final Answer:" in response.content:
            return response.content.split("Final Answer:")[-1].strip()
        if "Action:" in response.content:
            action = parse_action(response.content)
            result = await tools[action.name](*action.args)
            history += f"\nObservation: {result}\n"
    return "Max steps reached without answer"

# Supervisor with fan-out/fan-in
async def multi_agent_analysis(content: str) -> dict:
    agents = [("security", security_agent), ("perf", perf_agent)]
    tasks = [agent(content) for _, agent in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return await synthesize_findings(results)
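
With return_exceptions=True as above, failed agents come back as Exception objects mixed into the results list. A minimal helper (name and shape illustrative, not part of any framework) to separate them before synthesis:

```python
def split_results(names: list[str], results: list) -> tuple[dict, dict]:
    """Separate successful agent outputs from raised exceptions."""
    ok, failed = {}, {}
    for name, res in zip(names, results):
        if isinstance(res, Exception):
            failed[name] = str(res)  # keep the error text for reporting
        else:
            ok[name] = res
    return ok, failed
```

Passing only the `ok` dict to synthesis keeps one agent's failure from poisoning the combined result.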

Agent Loops

Patterns for autonomous LLM reasoning: ReAct (Reasoning + Acting), Plan-and-Execute with replanning, self-correction loops, and sliding-window memory management.

Key decisions: Max steps 5-15, temperature 0.3-0.7, memory window 10-20 messages.

Multi-Agent Coordination

Fan-out/fan-in parallelism, supervisor routing with dependency ordering, conflict resolution (confidence-based or LLM arbitration), result synthesis, and CC Agent Teams (mesh topology for peer messaging in CC 2.1.33+).

Key decisions: 3-8 specialists, parallelize independent agents, use Task tool (star) for simple work, Agent Teams (mesh) for cross-cutting concerns.
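
The dependency ordering mentioned above can be sketched as a batched fan-out: run every agent whose prerequisites are satisfied, in parallel, then repeat. The agent signature and dependency map are assumptions for illustration, not tied to any framework:

```python
import asyncio

async def run_with_dependencies(agents: dict, deps: dict, content: str) -> dict:
    """agents: name -> async callable(content, results); deps: name -> set of prerequisites."""
    results: dict = {}
    remaining = set(agents)
    while remaining:
        # A batch is every agent whose prerequisites have all completed
        batch = [n for n in remaining if deps.get(n, set()) <= results.keys()]
        if not batch:
            raise RuntimeError(f"Dependency cycle among: {remaining}")
        outputs = await asyncio.gather(
            *(agents[n](content, results) for n in batch),
            return_exceptions=True,  # isolate per-agent failures
        )
        for name, out in zip(batch, outputs):
            results[name] = out
        remaining -= set(batch)
    return results
```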

Alternative Frameworks

CrewAI hierarchical crews with Flows (1.8+), OpenAI Agents SDK handoffs and guardrails (0.7.0), Microsoft Agent Framework (AutoGen + SK merger), GPT-5.2-Codex for long-horizon coding, and AG2 for open-source flexibility.

Key decisions: Match framework to team expertise + use case. LangGraph for state machines, CrewAI for role-based teams, OpenAI SDK for handoff workflows, MS Agent for enterprise compliance.

Multi-Scenario

Orchestrate a single skill across 3 parallel scenarios (simple/medium/complex) with progressive difficulty scaling (1x/3x/8x), milestone synchronization, and cross-scenario result aggregation.

Key decisions: Free-running with checkpoints, always 3 scenarios, 1x/3x/8x exponential scaling, 30s/90s/300s time budgets.
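
The three-scenario fan-out can be sketched as follows. The scenario runner and its result shape are assumptions; the scaling factors and time budgets mirror the stated defaults (1x/3x/8x, 30s/90s/300s):

```python
import asyncio

SCENARIOS = [("simple", 1, 30), ("medium", 3, 90), ("complex", 8, 300)]

async def orchestrate_scenarios(run_scenario) -> dict:
    """Run all three scenarios in parallel, each under its own time budget."""
    async def bounded(name, scale, budget):
        try:
            return await asyncio.wait_for(run_scenario(name, scale), timeout=budget)
        except asyncio.TimeoutError:
            return {"scenario": name, "status": "timeout"}

    results = await asyncio.gather(*(bounded(n, s, b) for n, s, b in SCENARIOS))
    return {name: r for (name, _, _), r in zip(SCENARIOS, results)}
```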

Key Decisions

| Decision | Recommendation |
|---|---|
| Single vs multi-agent | Single for focused tasks, multi for decomposable work |
| Max loop steps | 5-15 (prevent infinite loops) |
| Agent count | 3-8 specialists per workflow |
| Framework | Match to team expertise + use case |
| Topology | Task tool (star) for simple; Agent Teams (mesh) for complex |
| Scenario count | Always 3: simple, medium, complex |

Common Mistakes

  • No step limit in agent loops (infinite loops)
  • No memory management (context overflow)
  • No error isolation in multi-agent (one failure crashes all)
  • Missing synthesis step (raw agent outputs not useful)
  • Mixing frameworks in one project (complexity explosion)
  • Using Agent Teams for simple sequential work (use Task tool)
  • Sequential instead of parallel scenarios (defeats purpose)

Related Skills

  • ork:langgraph - LangGraph workflow patterns (supervisor, routing, state)
  • function-calling - Tool definitions and execution
  • ork:task-dependency-patterns - Task management with Agent Teams workflow

Capability Details

react-loop

Keywords: react, reason, act, observe, loop, agent

Solves:

  • Implement ReAct pattern
  • Create reasoning loops
  • Build iterative agents

plan-execute

Keywords: plan, execute, replan, multi-step, autonomous

Solves:

  • Create plan then execute steps
  • Implement replanning on failure
  • Build goal-oriented agents

supervisor-coordination

Keywords: supervisor, route, coordinate, fan-out, fan-in, parallel

Solves:

  • Route tasks to specialized agents
  • Run agents in parallel
  • Aggregate multi-agent results

agent-debate

Keywords: debate, conflict, resolution, arbitration, consensus

Solves:

  • Resolve agent disagreements
  • Implement LLM arbitration
  • Handle conflicting outputs

result-synthesis

Keywords: synthesize, combine, aggregate, merge, summary

Solves:

  • Combine outputs from multiple agents
  • Create executive summaries
  • Score confidence across findings

crewai-patterns

Keywords: crewai, crew, hierarchical, delegation, role-based, flows

Solves:

  • Build role-based agent teams
  • Implement hierarchical coordination
  • Use Flows for event-driven orchestration

autogen-patterns

Keywords: autogen, microsoft, agent framework, teams, enterprise, a2a

Solves:

  • Build enterprise agent systems
  • Use AutoGen/SK merged framework
  • Implement A2A protocol

framework-selection

Keywords: choose, compare, framework, decision, which, crewai, autogen, openai

Solves:

  • Select appropriate framework
  • Compare framework capabilities
  • Match framework to requirements

scenario-orchestrator

Keywords: scenario, parallel, fan-out, difficulty, progressive, demo

Solves:

  • Run skill across multiple difficulty levels
  • Implement parallel scenario execution
  • Aggregate cross-scenario results

scenario-routing

Keywords: route, synchronize, milestone, checkpoint, scaling

Solves:

  • Route tasks by difficulty level
  • Synchronize at milestones
  • Scale inputs progressively

Rules (10)

Frameworks: Microsoft Agent Framework / AutoGen — HIGH

Microsoft Agent Framework (AutoGen + Semantic Kernel)

Enterprise multi-agent systems with RoundRobin/Selector teams, termination conditions, A2A protocol, and tool integration.

Team Setup

from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create model client
model_client = OpenAIChatCompletionClient(model="gpt-5.2")

# Define agents
planner = AssistantAgent(
    name="planner",
    description="Plans complex tasks and breaks them into steps",
    model_client=model_client,
    system_message="You are a planning expert. Break tasks into actionable steps."
)

executor = AssistantAgent(
    name="executor",
    description="Executes planned tasks",
    model_client=model_client,
    system_message="You execute tasks according to the plan."
)

reviewer = AssistantAgent(
    name="reviewer",
    description="Reviews work and provides feedback",
    model_client=model_client,
    system_message="You review work. Say 'APPROVED' if satisfactory."
)

# Create team with termination condition
termination = TextMentionTermination("APPROVED")
team = RoundRobinGroupChat(
    participants=[planner, executor, reviewer],
    termination_condition=termination
)

# Run team
result = await team.run(task="Create a marketing strategy")

Selector Group Chat

from autogen_agentchat.teams import SelectorGroupChat

# Selector chooses next speaker based on context
team = SelectorGroupChat(
    participants=[analyst, writer, reviewer],
    model_client=model_client,
    termination_condition=termination
)

Termination Conditions

from autogen_agentchat.conditions import (
    TextMentionTermination,
    MaxMessageTermination,
    TokenUsageTermination,
    TimeoutTermination,
    OrTerminationCondition,
)

termination = OrTerminationCondition(
    TextMentionTermination("DONE"),
    MaxMessageTermination(max_messages=20),
    TimeoutTermination(timeout_seconds=300)
)

Tool Integration

import json

from autogen_core.tools import FunctionTool

def search_database(query: str) -> str:
    """Search the database for information."""
    results = db.search(query)
    return json.dumps(results)

search_tool = FunctionTool(search_database, description="Search the database")

researcher = AssistantAgent(
    name="researcher",
    description="Researches information",
    model_client=model_client,
    tools=[search_tool],
    system_message="Use the search tool to find information."
)

State Management

# Save state
state = await team.save_state()

# Restore state
await team.load_state(state)

# Resume conversation
result = await team.run(task="Continue from where we left off")
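
Persisting that state between sessions is a natural extension. This sketch assumes save_state() returns a JSON-serializable mapping, as in the snippet above; the function names are illustrative:

```python
import json
from pathlib import Path

async def checkpoint_team(team, path: str = "team_state.json") -> None:
    """Serialize team state to disk so a later session can resume it."""
    state = await team.save_state()
    Path(path).write_text(json.dumps(state))

async def resume_team(team, path: str = "team_state.json") -> None:
    """Restore previously checkpointed state into a fresh team."""
    state = json.loads(Path(path).read_text())
    await team.load_state(state)
```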

Agent-to-Agent Protocol (A2A)

from autogen_agentchat.protocols import A2AProtocol

protocol = A2AProtocol(
    agent=my_agent,
    endpoint="https://api.example.com/agent",
    auth_token=os.environ["A2A_TOKEN"]
)

response = await protocol.send(
    to="external-agent-id",
    message="Process this request"
)

Streaming

async for message in team.run_stream(task="Analyze this data"):
    print(f"{message.source}: {message.content}")

OpenAI Agents SDK (0.7.0)

Alternative for OpenAI-native ecosystems:

from agents import Agent, Runner, handoff, RunConfig
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX

researcher_agent = Agent(
    name="researcher",
    instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a research specialist. Gather information and facts.
When research is complete, hand off to the writer.""",
    model="gpt-5.2"
)

writer_agent = Agent(
    name="writer",
    instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a content writer. Create compelling content from research.""",
    model="gpt-5.2"
)

orchestrator = Agent(
    name="orchestrator",
    instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You coordinate research and writing tasks.""",
    model="gpt-5.2",
    handoffs=[handoff(agent=researcher_agent), handoff(agent=writer_agent)]
)

async def run_workflow(task: str):
    runner = Runner()
    config = RunConfig(nest_handoff_history=True)
    result = await runner.run(orchestrator, task, run_config=config)
    return result.final_output

Migration from AutoGen 0.2

# Old AutoGen 0.2
# from autogen import AssistantAgent, UserProxyAgent

# New AutoGen 0.4+
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat

# Key differences:
# - No UserProxyAgent needed for simple tasks
# - Teams replace GroupChat
# - Explicit termination conditions required
# - Model client separate from agent

Best Practices

  1. Always set explicit termination conditions
  2. Team size: 3-5 agents optimal
  3. Clear role definitions in system_message
  4. One function per tool with clear descriptions
  5. Use try/except around team.run()
  6. Use run_stream() for real-time feedback
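
Practice 5 can be sketched as a small wrapper so one failed run does not crash the caller. The assumption that the result exposes a messages list matches the snippets above; exception handling and the fallback are illustrative:

```python
async def run_team_safely(team, task: str, fallback: str = "") -> str:
    """Run a team, returning the last message content or a fallback on failure."""
    try:
        result = await team.run(task=task)
        return result.messages[-1].content  # last message as the outcome
    except TimeoutError:
        return fallback or "Team run timed out"
    except Exception as exc:
        return fallback or f"Team run failed: {exc}"
```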

Incorrect — team without termination condition runs indefinitely:

team = RoundRobinGroupChat(
    participants=[planner, executor, reviewer]
    # No termination condition - infinite loop risk
)
result = await team.run(task="Complete task")

Correct — explicit termination prevents infinite loops:

termination = OrTerminationCondition(
    TextMentionTermination("APPROVED"),
    MaxMessageTermination(max_messages=20)
)
team = RoundRobinGroupChat(
    participants=[planner, executor, reviewer],
    termination_condition=termination
)
result = await team.run(task="Complete task")

Compare and select multi-agent frameworks based on complexity, use case, and production needs — HIGH

Framework Comparison & Selection

Decision matrix for choosing between multi-agent frameworks.

Framework Overview

| Framework | Best For | Key Features | Status |
|---|---|---|---|
| LangGraph 1.0.6 | Complex stateful workflows | Persistence, streaming, human-in-loop | Production |
| CrewAI 1.8.x | Role-based collaboration | Flows, hierarchical crews, a2a, HITL | Production |
| OpenAI Agents SDK 0.7.0 | OpenAI ecosystem | Handoffs, guardrails, MCPServerManager | Production |
| GPT-5.2-Codex | Long-horizon coding | Context compaction, project-scale, security | Production |
| MS Agent Framework | Enterprise | AutoGen+SK merger, A2A, compliance | Public Preview |
| AG2 | Open-source, flexible | Community fork of AutoGen | Active |

Feature Comparison

| Feature | LangGraph | CrewAI | OpenAI SDK | MS Agent |
|---|---|---|---|---|
| State Management | Excellent | Good | Basic | Good |
| Persistence | Built-in | Plugin | Manual | Built-in |
| Streaming | Native | Limited | Native | Native |
| Human-in-Loop | Native | Manual | Manual | Native |
| Memory | Via Store | Built-in | Manual | Manual |
| Observability | Langfuse/LangSmith | Limited | Tracing | Azure Monitor |
| Learning Curve | Steep | Easy | Medium | Medium |
| Production Ready | Yes | Yes | Yes | Q1 2026 |

Decision Tree

Start
  |
  +-- Need complex state machines?
  |     +-- Yes --> LangGraph
  |     +-- No
  |           |
  +-- Role-based collaboration?
  |     +-- Yes --> CrewAI
  |     +-- No
  |           |
  +-- OpenAI ecosystem only?
  |     +-- Yes --> OpenAI Agents SDK
  |     +-- No
  |           |
  +-- Enterprise requirements?
  |     +-- Yes --> Microsoft Agent Framework
  |     +-- No
  |           |
  +-- Long-horizon coding tasks?
  |     +-- Yes --> GPT-5.2-Codex
  |     +-- No
  |           |
  +-- Open-source priority?
        +-- Yes --> AG2
        +-- No --> LangGraph (default)

Use Case Matrix

| Use Case | Best Framework | Why |
|---|---|---|
| Complex state machines | LangGraph | Native StateGraph, persistence |
| Role-based teams | CrewAI | Built-in delegation, backstories |
| OpenAI-only projects | OpenAI SDK | Native integration, handoffs |
| Enterprise/compliance | MS Agent | Azure integration, A2A |
| Long-horizon coding | GPT-5.2-Codex | Context compaction, security |
| Quick prototypes | CrewAI | Minimal boilerplate |
| Long-running workflows | LangGraph | Checkpointing, recovery |
| Customer support bots | OpenAI SDK | Handoffs, guardrails |
| Research/experiments | AG2 | Open-source, flexible |

Cost Considerations

| Framework | Licensing | Infra Cost | LLM Cost |
|---|---|---|---|
| LangGraph | MIT | Self-host / LangGraph Cloud | Any LLM |
| CrewAI | MIT | Self-host | Any LLM |
| OpenAI SDK | MIT | Self-host | OpenAI only |
| MS Agent | MIT | Self-host / Azure | Any LLM |
| AG2 | Apache 2.0 | Self-host | Any LLM |

Performance Characteristics

| Framework | Cold Start | Latency | Throughput |
|---|---|---|---|
| LangGraph | ~100ms | Low | High |
| CrewAI | ~200ms | Medium | Medium |
| OpenAI SDK | ~50ms | Low | High |
| MS Agent | ~150ms | Medium | High |

Team Expertise Requirements

| Framework | Python | LLM | Infra |
|---|---|---|---|
| LangGraph | Expert | Expert | Medium |
| CrewAI | Beginner | Beginner | Low |
| OpenAI SDK | Medium | Medium | Low |
| MS Agent | Medium | Medium | High |

Migration Paths

From AutoGen to MS Agent Framework

# AutoGen 0.2 (old)
from autogen import AssistantAgent, UserProxyAgent
agent = AssistantAgent(name="assistant", llm_config=config)

# MS Agent Framework (new)
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
model_client = OpenAIChatCompletionClient(model="gpt-5.2")
agent = AssistantAgent(name="assistant", model_client=model_client)

From Custom to LangGraph

# Custom orchestration (old)
async def workflow(task):
    step1 = await agent1.run(task)
    step2 = await agent2.run(step1)
    return step2

# LangGraph (new)
from langgraph.graph import StateGraph, START, END
workflow = StateGraph(State)
workflow.add_node("agent1", agent1_node)
workflow.add_node("agent2", agent2_node)
workflow.add_edge(START, "agent1")
workflow.add_edge("agent1", "agent2")
workflow.add_edge("agent2", END)
app = workflow.compile()

GPT-5.2-Codex Decision Matrix

Task Duration > 1 hour?
  +-- Yes --> GPT-5.2-Codex
  +-- No
        +-- Multiple files affected?
        |     +-- Yes --> GPT-5.2-Codex
        |     +-- No
        |           +-- Security review needed?
        |                 +-- Yes --> GPT-5.2-Codex
        |                 +-- No --> GPT-5.2 (standard)

Recommendation Summary

  1. Default choice: LangGraph (most capable, production-proven)
  2. Fastest to prototype: CrewAI (minimal code, intuitive)
  3. OpenAI shops: OpenAI Agents SDK (native integration)
  4. Enterprise: Microsoft Agent Framework (compliance, Azure)
  5. Long-horizon coding: GPT-5.2-Codex (context compaction)
  6. Research: AG2 (open community, experimental features)

Common Mistakes

  • Mixing frameworks in one project (complexity explosion)
  • Ignoring framework maturity (beta vs production)
  • No fallback strategy (framework lock-in)
  • Overcomplicating simple tasks (use single agent)

Incorrect — using multi-agent framework for simple task:

# Overkill: LangGraph for single-step task
workflow = StateGraph(State)
workflow.add_node("summarize", summarize_node)
workflow.add_edge(START, "summarize")
workflow.add_edge("summarize", END)
app = workflow.compile()
result = await app.ainvoke({"text": "..."})

Correct — single LLM call for simple task:

# Simple task = single agent
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-5.2")
result = await llm.ainvoke("Summarize this text: ...")

Build role-based agent collaboration with CrewAI Flows, hierarchical crews, and structured outputs — HIGH

CrewAI Patterns (v1.8+)

Role-based multi-agent collaboration with Flows architecture, hierarchical crews, MCP tools, and async execution.

Hierarchical Crew

from crewai import Agent, Crew, Task, Process
from crewai.flow.flow import Flow, listen, start

# Manager coordinates the team
manager = Agent(
    role="Project Manager",
    goal="Coordinate team efforts and ensure project success",
    backstory="Experienced project manager skilled at delegation",
    allow_delegation=True,
    memory=True,
    verbose=True
)

# Specialist agents
researcher = Agent(
    role="Researcher",
    goal="Provide accurate research and analysis",
    backstory="Expert researcher with deep analytical skills",
    allow_delegation=False,
    verbose=True
)

writer = Agent(
    role="Writer",
    goal="Create compelling content",
    backstory="Skilled writer who creates engaging content",
    allow_delegation=False,
    verbose=True
)

# Manager-led task
project_task = Task(
    description="Create a comprehensive market analysis report",
    expected_output="Executive summary, analysis, recommendations",
    agent=manager
)

# Hierarchical crew
crew = Crew(
    agents=[manager, researcher, writer],
    tasks=[project_task],
    process=Process.hierarchical,
    manager_llm="gpt-5.2",
    memory=True,
    verbose=True
)

result = crew.kickoff()

Flows Architecture (1.8+)

Event-driven orchestration with state management:

from crewai.flow.flow import Flow, listen, start, router

class ResearchFlow(Flow):
    @start()
    def generate_topic(self):
        return "AI Safety"

    @listen(generate_topic)
    def research_topic(self, topic):
        return f"Research findings on {topic}"

    @router(research_topic)
    def route_result(self, result):
        if "sufficient" in result:
            return "success"
        return "retry"

    @listen("success")
    def handle_success(self):
        return "Workflow completed"

    @listen("retry")
    def handle_retry(self):
        return "Retrying..."

flow = ResearchFlow()
result = flow.kickoff()

Parallel Execution with and_/or_

from crewai.flow.flow import Flow, listen, start, and_, or_

class ParallelFlow(Flow):
    @start()
    def task_a(self):
        return "Result A"

    @start()
    def task_b(self):
        return "Result B"

    @listen(and_(task_a, task_b))
    def combine_results(self):
        """Triggers when BOTH complete"""
        return "Combined results"

    @listen(or_(task_a, task_b))
    def first_result(self):
        """Triggers when EITHER completes"""
        return "First result"

Structured Output

from pydantic import BaseModel

class ReportOutput(BaseModel):
    title: str
    summary: str
    findings: list[str]
    confidence: float

task = Task(
    description="Analyze market trends",
    expected_output="Structured market analysis",
    agent=analyst,
    output_pydantic=ReportOutput
)

result = crew.kickoff()
report = result.pydantic

Task Guardrails

from typing import Any

from crewai.tasks import TaskOutput

def validate_length(result: TaskOutput) -> tuple[bool, Any]:
    if len(result.raw.split()) < 100:
        return (False, "Content too brief, expand analysis")
    return (True, result.raw)

task = Task(
    description="Write comprehensive analysis",
    expected_output="Detailed analysis (100+ words)",
    agent=writer,
    guardrail=validate_length,
    guardrail_max_retries=3
)

MCP Tool Support (1.8+)

from crewai import Agent

agent = Agent(
    role="Research Analyst",
    goal="Research and analyze information",
    backstory="Expert analyst",
    mcps=[
        "https://mcp.example.com/mcp?api_key=your_key",
        "crewai-amp:financial-data",
    ]
)

Decorator-Based Crews (@CrewBase)

from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task

@CrewBase
class ResearchCrew:
    agents_config = 'config/agents.yaml'
    tasks_config = 'config/tasks.yaml'

    @agent
    def researcher(self) -> Agent:
        return Agent(config=self.agents_config['researcher'], tools=[search_tool])

    @task
    def research_task(self) -> Task:
        return Task(config=self.tasks_config['research'])

    @crew
    def crew(self) -> Crew:
        return Crew(agents=self.agents, tasks=self.tasks, process=Process.sequential)

result = ResearchCrew().crew().kickoff(inputs={"topic": "AI Safety"})

Best Practices

  1. Use Flows for complex multi-step workflows
  2. Prefer @CrewBase decorator-based definition
  3. Enable structured outputs with output_pydantic
  4. Add guardrails for output validation
  5. Use async_execution=True for independent tasks
  6. Role clarity: each agent has distinct, non-overlapping role
  7. One clear deliverable per task

Incorrect — sequential process without hierarchical manager:

crew = Crew(
    agents=[researcher, writer, reviewer],
    tasks=[research_task, write_task, review_task],
    process=Process.sequential  # No delegation, rigid order
)

Correct — hierarchical process with manager delegation:

crew = Crew(
    agents=[manager, researcher, writer, reviewer],
    tasks=[project_task],
    process=Process.hierarchical,  # Manager delegates dynamically
    manager_llm="gpt-5.2"
)

Generate multi-step plans before execution with replanning when conditions change — HIGH

Plan-and-Execute Pattern

Generate a multi-step plan first, then execute each step sequentially with the option to replan when conditions change.

Core Implementation

async def plan_and_execute(goal: str) -> str:
    """Create plan first, then execute steps."""
    # 1. Generate plan
    plan = await llm.chat([{
        "role": "user",
        "content": f"Create a step-by-step plan to: {goal}\n\nFormat as numbered list."
    }])

    steps = parse_plan(plan.content)
    results = []

    # 2. Execute each step
    for i, step in enumerate(steps):
        result = await execute_step(step, context=results)
        results.append({"step": step, "result": result})

        # 3. Check if replanning needed
        if should_replan(results):
            return await plan_and_execute(
                f"{goal}\n\nProgress so far: {results}"
            )

    # 4. Synthesize final answer
    return await synthesize(goal, results)
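
The helpers referenced above are left undefined; hypothetical sketches (heuristics illustrative, not prescriptive) might look like:

```python
import re

def parse_plan(text: str) -> list[str]:
    """Extract numbered steps like '1. Do X' or '2) Do Y' from plan text."""
    steps = re.findall(r"^\s*\d+[\.\)]\s*(.+)$", text, flags=re.MULTILINE)
    return [s.strip() for s in steps]

def should_replan(results: list[dict]) -> bool:
    """Replan when the most recent step reported an error."""
    if not results:
        return False
    last = str(results[-1].get("result", "")).lower()
    return "error" in last or "failed" in last
```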

TypeScript ReAct Agent

interface AgentStep {
  thought: string;
  action?: string;
  actionInput?: unknown;
  observation?: string;
}

interface AgentResult {
  answer: string;
  steps: AgentStep[];
  totalCost: number;
  iterations: number;
}

export async function reactAgent(
  task: string,
  options: { maxIterations?: number; verbose?: boolean } = {}
): Promise<AgentResult> {
  const { maxIterations = 10, verbose = false } = options;
  const steps: AgentStep[] = [];
  let totalCost = 0;

  const systemPrompt = `You are an autonomous agent that can use tools.

Available tools:
${tools.map(t => `- ${t.name}: ${t.description}`).join('\n')}

Use this format:
Thought: [Your reasoning]
Action: [tool name]
Action Input: {"param": "value"}
Observation: [Tool result]

When done: Answer: [Final answer]`;

  const messages = [
    { role: 'system' as const, content: systemPrompt },
    { role: 'user' as const, content: task }
  ];

  for (let i = 0; i < maxIterations; i++) {
    const response = await openai.chat.completions.create({
      model: 'gpt-5.2', messages, temperature: 0.1
    });

    const content = response.choices[0].message.content!;
    totalCost += (response.usage!.total_tokens / 1000) * 0.01;

    if (content.includes('Answer:')) {
      const answer = content.split('Answer:')[1].trim();
      return { answer, steps, totalCost, iterations: i + 1 };
    }

    const parsed = parseAgentResponse(content);
    const step: AgentStep = { thought: parsed.thought };

    if (parsed.action && parsed.actionInput) {
      step.action = parsed.action;
      step.actionInput = JSON.parse(parsed.actionInput);
      step.observation = await executeTool(parsed.action, step.actionInput);
    }

    steps.push(step);
    messages.push({ role: 'assistant', content });
    if (step.observation) {
      messages.push({ role: 'user', content: `Observation: ${step.observation}` });
    }
  }

  throw new Error(`Agent exceeded max iterations (${maxIterations})`);
}

Function Calling Agent (Alternative)

export async function functionCallingAgent(task: string): Promise<AgentResult> {
  const steps: AgentStep[] = [];
  let totalCost = 0;

  const messages = [
    { role: 'system' as const, content: 'You are a helpful assistant with access to tools.' },
    { role: 'user' as const, content: task }
  ];

  const openaiTools = tools.map(tool => ({
    type: 'function' as const,
    function: { name: tool.name, description: tool.description, parameters: tool.parameters }
  }));

  let iteration = 0;
  while (iteration < 10) {
    const response = await openai.chat.completions.create({
      model: 'gpt-5.2', messages, tools: openaiTools
    });

    const message = response.choices[0].message;
    totalCost += (response.usage!.total_tokens / 1000) * 0.01;

    // No tool calls = final answer
    if (!message.tool_calls) {
      return { answer: message.content!, steps, totalCost, iterations: iteration + 1 };
    }

    messages.push(message as any);
    for (const toolCall of message.tool_calls) {
      const tool = tools.find(t => t.name === toolCall.function.name);
      if (!tool) continue;
      const args = JSON.parse(toolCall.function.arguments);
      const result = await tool.execute(args);
      steps.push({
        thought: `Calling ${toolCall.function.name}`,
        action: toolCall.function.name,
        actionInput: args,
        observation: JSON.stringify(result)
      });
      messages.push({ role: 'tool', tool_call_id: toolCall.id, content: JSON.stringify(result) });
    }
    iteration++;
  }

  throw new Error('Agent exceeded max iterations');
}

When to Choose Which Pattern

| Pattern | Best For | Trade-off |
|---|---|---|
| ReAct | Exploratory tasks, research | More flexible, harder to control |
| Plan-Execute | Well-defined goals | Structured, but replanning adds cost |
| Function Calling | Production APIs | Most reliable, requires tool schemas |
| Self-Correction | Quality-critical output | Higher cost, better quality |

Incorrect — executing without planning first:

async def execute_goal(goal: str):
    steps = ["step1", "step2", "step3"]  # Hardcoded, no reasoning
    for step in steps:
        await execute_step(step)

Correct — LLM generates plan before execution:

async def plan_and_execute(goal: str):
    plan = await llm.chat([{"role": "user", "content": f"Create plan for: {goal}"}])
    steps = parse_plan(plan.content)
    for step in steps:
        result = await execute_step(step)
        if should_replan(result):
            return await plan_and_execute(f"{goal}\nProgress: {result}")

Implement ReAct pattern where agents reason, act via tools, observe, and iterate — HIGH

ReAct Pattern (Reasoning + Acting)

LLMs reason step-by-step, take actions via tools, observe results, and iterate until a final answer is reached.

ReAct Prompt Template

REACT_PROMPT = """You are an agent that reasons step by step.

For each step, respond with:
Thought: [your reasoning about what to do next]
Action: [tool_name(arg1, arg2)]
Observation: [you'll see the result here]

When you have the final answer:
Thought: I now have enough information
Final Answer: [your response]

Available tools: {tools}

Question: {question}
"""

Python Implementation

async def react_loop(question: str, tools: dict, max_steps: int = 10) -> str:
    """Execute ReAct reasoning loop."""
    history = REACT_PROMPT.format(tools=list(tools.keys()), question=question)

    for step in range(max_steps):
        response = await llm.chat([{"role": "user", "content": history}])
        history += response.content

        # Check for final answer
        if "Final Answer:" in response.content:
            return response.content.split("Final Answer:")[-1].strip()

        # Extract and execute action
        if "Action:" in response.content:
            action = parse_action(response.content)
            result = await tools[action.name](*action.args)
            history += f"\nObservation: {result}\n"

    return "Max steps reached without answer"

Self-Correction Loop

async def self_correcting_agent(task: str, max_retries: int = 3) -> str:
    """Agent that validates and corrects its own output."""
    for attempt in range(max_retries):
        response = await llm.chat([{"role": "user", "content": task}])

        # Self-validate
        validation = await llm.chat([{
            "role": "user",
            "content": f"""Validate this response for the task: {task}

Response: {response.content}

Check for:
1. Correctness - Is it factually accurate?
2. Completeness - Does it fully answer the task?
3. Format - Is it properly formatted?

If valid, respond: VALID
If invalid, respond: INVALID: [what's wrong and how to fix]"""
        }])

        if "VALID" in validation.content:
            return response.content

        # Correct based on feedback
        task = f"{task}\n\nPrevious attempt had issues: {validation.content}"

    return response.content  # Return best attempt

Memory Management

class AgentMemory:
    """Sliding window memory for agents."""

    def __init__(self, max_messages: int = 20):
        self.messages = []
        self.max_messages = max_messages
        self.summary = ""

    def add(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        if len(self.messages) > self.max_messages:
            self._compress()

    def _compress(self):
        """Summarize oldest messages."""
        old = self.messages[:10]
        self.messages = self.messages[10:]
        summary = summarize(old)
        self.summary = f"{self.summary}\n{summary}"

    def get_context(self) -> list:
        """Get messages with summary prefix."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Previous context summary: {self.summary}"
            })
        return context + self.messages

Key Decisions

| Decision | Recommendation |
|---|---|
| Max steps | 5-15 (prevent infinite loops) |
| Temperature | 0.3-0.7 (balance creativity/focus) |
| Memory window | 10-20 messages |
| Validation frequency | Every 3-5 steps |

Common Mistakes

  • No step limit (infinite loops)
  • No memory management (context overflow)
  • No error recovery (crashes on tool failure)
  • Over-complex prompts (agent gets confused)
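
The "no error recovery" mistake can be addressed by wrapping each tool so a failure becomes an observation instead of a crash. The retry count and backoff are illustrative:

```python
import asyncio

def with_recovery(tool, retries: int = 2):
    """Wrap an async tool: retry on failure, return an error string as a last resort."""
    async def safe_tool(*args):
        for attempt in range(retries + 1):
            try:
                return await tool(*args)
            except Exception as exc:
                if attempt == retries:
                    return f"Tool error after {retries + 1} attempts: {exc}"
                await asyncio.sleep(0.1 * (attempt + 1))  # simple linear backoff
    return safe_tool
```

Wrapped tools slot into the loop unchanged: tools = {name: with_recovery(fn) for name, fn in tools.items()}.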

Incorrect — ReAct loop without step limit:

async def react_loop(question: str, tools: dict):
    history = f"Question: {question}"
    while True:  # Infinite loop risk
        response = await llm.chat([{"role": "user", "content": history}])
        if "Final Answer:" in response.content:
            return response.content

Correct — max_steps prevents infinite loops:

async def react_loop(question: str, tools: dict, max_steps: int = 10):
    history = f"Question: {question}"
    for step in range(max_steps):  # Bounded loop
        response = await llm.chat([{"role": "user", "content": history}])
        if "Final Answer:" in response.content:
            return response.content.split("Final Answer:")[-1].strip()
    return "Max steps reached without answer"

Resolve agent disagreements through confidence scores, LLM arbitration, or majority voting — HIGH

Agent Debate & Conflict Resolution

Patterns for handling disagreements between agents and establishing communication channels.

Conflict Resolution

async def resolve_conflicts(findings: list[dict]) -> list[dict]:
    """When agents disagree, resolve by confidence or LLM."""
    conflicts = detect_conflicts(findings)

    if not conflicts:
        return findings

    for conflict in conflicts:
        # Option 1: Higher confidence wins
        winner = max(conflict.agents, key=lambda a: a.confidence)

        # Option 2: LLM arbitration
        resolution = await llm.chat([{
            "role": "user",
            "content": f"""Two agents disagree:

Agent A ({conflict.agent_a.name}): {conflict.agent_a.finding}
Agent B ({conflict.agent_b.name}): {conflict.agent_b.finding}

Which is more likely correct and why?"""
        }])

        conflict.resolution = parse_resolution(resolution.content)

    return apply_resolutions(findings, conflicts)

Structured Conflict Detection

async def resolve_agent_conflicts(
    findings: list[dict], llm: Any
) -> dict:
    """Resolve conflicts between agent outputs."""
    conflicts = []
    for i, f1 in enumerate(findings):
        for f2 in findings[i+1:]:
            if f1.get("recommendation") != f2.get("recommendation"):
                conflicts.append((f1, f2))

    if not conflicts:
        return {"status": "no_conflicts", "findings": findings}

    # LLM arbitration
    resolution = await llm.ainvoke(f"""
        Agents disagree. Determine best recommendation:
        Agent 1: {conflicts[0][0]}
        Agent 2: {conflicts[0][1]}
        Provide: winner, reasoning, confidence (0-1)
    """)
    return {"status": "resolved", "resolution": resolution}

Agent Communication Bus

class AgentBus:
    """Message passing between agents."""

    def __init__(self):
        self.messages = []
        self.subscribers = {}

    def publish(self, from_agent: str, message: dict):
        """Broadcast message to all agents."""
        msg = {"from": from_agent, "data": message, "ts": time.time()}
        self.messages.append(msg)
        for callback in self.subscribers.values():
            callback(msg)

    def subscribe(self, agent_id: str, callback):
        """Register agent to receive messages."""
        self.subscribers[agent_id] = callback

    def get_history(self, agent_id: str | None = None) -> list:
        """Get message history, optionally filtered."""
        if agent_id:
            return [m for m in self.messages if m["from"] == agent_id]
        return self.messages

Resolution Strategies

| Strategy | When to Use | Trade-off |
| --- | --- | --- |
| Confidence-based | Agents provide confidence scores | Fast but requires calibrated scores |
| LLM arbitration | Complex disagreements | Higher quality but adds LLM cost |
| Majority voting | 3+ agents on same question | Simple but requires odd count |
| Weighted consensus | Agents have different expertise | Best for specialized teams |
| Human-in-the-loop | High-stakes decisions | Most reliable but slowest |
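The table's voting strategies can be sketched in a few lines; `majority_vote` and `weighted_consensus` below are illustrative helpers, not OrchestKit APIs:

```python
from collections import Counter

def majority_vote(findings: list[str]) -> str:
    """Majority voting: needs 3+ agents, ideally an odd count."""
    winner, votes = Counter(findings).most_common(1)[0]
    if votes <= len(findings) // 2:
        raise ValueError("No strict majority -- escalate to LLM arbitration")
    return winner

def weighted_consensus(findings: list[str], weights: list[float]) -> str:
    """Weighted consensus: expertise weights bias the vote."""
    totals: dict[str, float] = {}
    for finding, weight in zip(findings, weights):
        totals[finding] = totals.get(finding, 0.0) + weight
    return max(totals, key=totals.get)

majority_vote(["approve", "approve", "reject"])        # "approve"
weighted_consensus(["approve", "reject"], [0.3, 0.9])  # "reject"
```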

Common Mistakes

  • No timeout per agent (one slow agent blocks all)
  • No error isolation (one failure crashes workflow)
  • Over-coordination (too much overhead)
  • Using Agent Teams for simple sequential work (use Task tool)
  • Broadcasting when a direct message suffices (wastes tokens)

Incorrect — no conflict resolution strategy:

async def multi_agent_analysis(task: str):
    results = await asyncio.gather(agent1(task), agent2(task))
    return results  # Return conflicting results without resolution

Correct — LLM arbitration resolves conflicts:

async def multi_agent_analysis(task: str):
    results = await asyncio.gather(agent1(task), agent2(task))
    if results[0] != results[1]:  # Conflict detected
        resolution = await llm.chat([{
            "role": "user",
            "content": f"Agent 1: {results[0]}\nAgent 2: {results[1]}\nWhich is correct?"
        }])
        return resolution.content
    return results[0]

Coordinate specialist agents through a central supervisor with parallel execution and timeouts — CRITICAL

Supervisor Pattern

Central coordinator that routes tasks to specialist agents, supports parallel and sequential execution, and aggregates results.

Fan-Out/Fan-In

async def multi_agent_analysis(content: str) -> dict:
    """Fan-out to specialists, fan-in to synthesize."""
    agents = [
        ("security", security_agent),
        ("performance", performance_agent),
        ("code_quality", quality_agent),
        ("architecture", architecture_agent),
    ]

    # Fan-out: Run all agents in parallel
    tasks = [agent(content) for _, agent in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter successful results
    findings = [
        {"agent": name, "result": result}
        for (name, _), result in zip(agents, results)
        if not isinstance(result, Exception)
    ]

    # Fan-in: Synthesize findings
    return await synthesize_findings(findings)

Supervisor with Routing

class Supervisor:
    """Central coordinator that routes to specialists."""

    def __init__(self, agents: dict):
        self.agents = agents  # {"security": agent, "performance": agent}
        self.completed = []

    async def run(self, task: str) -> dict:
        """Route task through appropriate agents."""
        # 1. Determine which agents to use
        plan = await self.plan_routing(task)

        # 2. Execute in dependency order
        results = {}
        for agent_name in plan.execution_order:
            if plan.can_parallelize(agent_name):
                batch = plan.get_parallel_batch(agent_name)
                batch_results = await asyncio.gather(*[
                    self.agents[name](task, context=results)
                    for name in batch
                ])
                results.update(dict(zip(batch, batch_results)))
            else:
                results[agent_name] = await self.agents[agent_name](
                    task, context=results
                )

        return results

    async def plan_routing(self, task: str) -> RoutingPlan:
        """Use LLM to determine agent routing."""
        response = await llm.chat([{
            "role": "user",
            "content": f"""Task: {task}

Available agents: {list(self.agents.keys())}

Which agents should handle this task?
What order? Can any run in parallel?"""
        }])
        return parse_routing_plan(response.content)

Supervisor-Worker with Timeout

class SupervisorCoordinator:
    """Central supervisor that routes tasks to worker agents."""

    def __init__(self, workers: dict[str, Agent]):
        self.workers = workers
        self.execution_log: list[dict] = []

    async def route_and_execute(
        self, task: str, required_agents: list[str], parallel: bool = True
    ) -> dict[str, Any]:
        context = {"task": task, "results": {}}

        if parallel:
            tasks = [self._run_worker(name, task, context) for name in required_agents]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return dict(zip(required_agents, results))
        else:
            for name in required_agents:
                context["results"][name] = await self._run_worker(name, task, context)
            return context["results"]

    async def _run_worker(self, name: str, task: str, context: dict) -> dict:
        """Execute single worker with timeout."""
        try:
            result = await asyncio.wait_for(
                self.workers[name].run(task, context), timeout=30.0
            )
            self.execution_log.append({"agent": name, "status": "success", "result": result})
            return result
        except asyncio.TimeoutError:
            self.execution_log.append({"agent": name, "status": "timeout"})
            return {"error": f"{name} timed out"}

CC Agent Teams (CC 2.1.33+)

CC 2.1.33 introduces native Agent Teams with peer-to-peer messaging and mesh topology.

Star vs Mesh Topology

Star (Task tool):              Mesh (Agent Teams):
      Lead                           Lead (delegate)
     /||\                          /  |  \
    / || \                        /   |   \
   A  B  C  D                   A <-> B <-> C
   (no cross-talk)              (peer messaging)

Dual-Mode Decision Tree

Complexity Assessment:
+-- Score < 3.0  -> Task tool subagents (cheaper, simpler)
+-- Score 3.0-3.5 -> User choice (recommend Teams for cross-cutting)
+-- Score > 3.5  -> Agent Teams (if CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS=1)
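The decision tree maps directly onto a small routing helper (a sketch: the thresholds are the ones above, the function name is ours):

```python
def choose_topology(complexity_score: float) -> str:
    """Map a complexity score onto the dual-mode decision tree."""
    if complexity_score < 3.0:
        return "task-tool"    # cheaper, simpler star topology
    if complexity_score <= 3.5:
        return "user-choice"  # recommend Teams for cross-cutting work
    return "agent-teams"      # requires CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS=1
```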

Team Formation

# 1. Create team with shared task list
TeamCreate(team_name="feature-auth", description="User auth implementation")

# 2. Create tasks in shared list
TaskCreate(subject="Design API schema", description="...")
TaskCreate(subject="Build React components", description="...", addBlockedBy=["1"])

# 3. Spawn teammates
Task(prompt="You are the backend architect...",
     team_name="feature-auth", name="backend-dev",
     subagent_type="backend-system-architect")

Peer Messaging

# Direct message (default)
SendMessage(type="message", recipient="frontend-dev",
  content="API contract: GET /users/:id -> {id, name, email}",
  summary="API contract ready")

# Broadcast (expensive -- use sparingly)
SendMessage(type="broadcast",
  content="Auth header format changed to Bearer",
  summary="Breaking auth change")

Cost Comparison

| Scenario | Task Tool | Agent Teams | Ratio |
| --- | --- | --- | --- |
| 3-agent review | ~150K tokens | ~400K tokens | 2.7x |
| 8-agent feature | ~500K tokens | ~1.2M tokens | 2.4x |
| 6-agent research | ~300K tokens | ~800K tokens | 2.7x |

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Agent count | 3-8 specialists |
| Parallelism | Parallelize independent agents |
| Worker timeout | 30s default |
| Communication | Shared state, message bus, or SendMessage (CC 2.1.33+) |
| Topology | Task tool (star) for simple; Agent Teams (mesh) for complex |

Incorrect — sequential execution of independent agents:

async def analyze(content: str):
    security_result = await security_agent(content)  # Wait
    perf_result = await performance_agent(content)   # Wait
    quality_result = await quality_agent(content)    # Wait
    return [security_result, perf_result, quality_result]

Correct — parallel fan-out for independent agents:

async def analyze(content: str):
    tasks = [
        security_agent(content),
        performance_agent(content),
        quality_agent(content)
    ]
    results = await asyncio.gather(*tasks)  # Run in parallel
    return results

Synthesize parallel agent outputs into coherent actionable results with quality metrics — HIGH

Result Synthesis

Combine outputs from multiple parallel agents into coherent, actionable results.

Synthesis Pattern

async def synthesize_findings(findings: list[dict]) -> dict:
    """Combine multiple agent outputs into coherent result."""
    # Group by category
    by_category = {}
    for f in findings:
        cat = f.get("category", "general")
        by_category.setdefault(cat, []).append(f)

    # Synthesize each category
    synthesis = await llm.chat([{
        "role": "user",
        "content": f"""Synthesize these agent findings into a coherent summary:

{json.dumps(by_category, indent=2)}

Output format:
- Executive summary (2-3 sentences)
- Key findings by category
- Recommendations
- Confidence score (0-1)"""
    }])

    return parse_synthesis(synthesis.content)

Multi-Agent Collaboration (TypeScript)

export async function multiAgentCollaboration(
  task: string,
  agents: Agent[]
): Promise<AgentResult> {
  const steps: AgentStep[] = [];
  let totalCost = 0;  // accumulate per-call usage costs here if cost tracking is enabled

  // 1. Coordinator plans the task
  const planResponse = await openai.chat.completions.create({
    model: 'gpt-5.2',
    messages: [
      {
        role: 'system',
        content: `You are a coordinator. Break down tasks and assign to agents:
${agents.map(a => `- ${a.name}: ${a.role}`).join('\n')}

Provide a numbered plan with agent assignments.`
      },
      { role: 'user', content: `Task: ${task}\n\nProvide a step-by-step plan.` }
    ]
  });

  const plan = planResponse.choices[0].message.content!;
  steps.push({ thought: 'Coordinator planning', observation: plan });

  // 2. Execute agent subtasks in parallel
  const agentResults = await Promise.all(
    agents.map(async (agent) => {
      const response = await openai.chat.completions.create({
        model: 'gpt-5.2',
        messages: [
          { role: 'system', content: agent.systemPrompt },
          { role: 'user', content: `Task: ${task}\n\nPlan:\n${plan}\n\nComplete your part.` }
        ]
      });
      return { agent: agent.name, result: response.choices[0].message.content! };
    })
  );

  // 3. Synthesize results
  const synthesisResponse = await openai.chat.completions.create({
    model: 'gpt-5.2',
    messages: [
      { role: 'system', content: 'Synthesize agent results into a coherent final answer.' },
      { role: 'user', content: `Task: ${task}\n\nAgent Results:\n${JSON.stringify(agentResults, null, 2)}` }
    ]
  });

  return {
    answer: synthesisResponse.choices[0].message.content!,
    steps,
    totalCost,
    iterations: agents.length + 2
  };
}

Aggregation Strategies

Comparative (Default)

Compare metrics across all agents:

{
  "quality_comparison": {
    "security_agent": {"score": 0.92, "findings": 3},
    "perf_agent": {"score": 0.88, "findings": 5}
  },
  "consensus_items": ["Use parameterized queries", "Add rate limiting"],
  "disagreements": []
}

Pattern Extraction

Find common patterns:

{
  "success_patterns": ["Caching strategy effective", "Batch size 50+ preferred"],
  "failure_patterns": ["Timeout at >5000 items per batch"]
}

Recommendation Engine

Generate actionable recommendations:

{
  "priority_1": "Fix SQL injection in auth module (security + quality agree)",
  "priority_2": "Add connection pooling (performance agent)",
  "confidence": 0.91
}

Cost Optimization

  • Batch similar tasks to reduce overhead
  • Cache agent results by task hash
  • Use cheaper models for simple agents
  • Parallelize independent agents always
  • Max parallel agents: 8

Orchestration Checklist

  • Define agent responsibilities (single responsibility per agent)
  • Plan communication patterns (shared state, message bus, or SendMessage)
  • Set coordination strategy (central orchestrator with task queue)
  • Design failure handling (timeout per agent, error isolation)
  • Agent health checks and performance metrics

Incorrect — returning raw agent outputs without synthesis:

async def multi_agent_analysis(task: str):
    results = await asyncio.gather(agent1(task), agent2(task), agent3(task))
    return results  # Returns list of disconnected findings

Correct — synthesizing results into coherent output:

async def multi_agent_analysis(task: str):
    results = await asyncio.gather(agent1(task), agent2(task), agent3(task))
    synthesis = await llm.chat([{
        "role": "user",
        "content": f"Synthesize these findings: {json.dumps(results)}"
    }])
    return parse_synthesis(synthesis.content)

Test skills across three parallel scenarios with progressive difficulty and synchronized execution — MEDIUM

Multi-Scenario Orchestrator

Run a single skill across 3 parallel scenarios (simple/medium/complex) with synchronized execution and progressive difficulty.

Core Pattern

+---------------------------------------------------------------------+
|                   MULTI-SCENARIO ORCHESTRATOR                        |
+---------------------------------------------------------------------+
|  [Coordinator] --+--> [Scenario 1: Simple]       (Easy)             |
|       ^          |      +--> [Skill Instance 1]                     |
|       |          +--> [Scenario 2: Medium]       (Intermediate)     |
|       |          |      +--> [Skill Instance 2]                     |
|       |          +--> [Scenario 3: Complex]      (Advanced)         |
|       |                 +--> [Skill Instance 3]                     |
|       |                                                             |
|   [State Manager] <---- All instances report progress               |
|   [Aggregator] --> Cross-scenario synthesis                         |
+---------------------------------------------------------------------+

When to Use

| Scenario | Example |
| --- | --- |
| Skill demos | Show /ork:implement on simple, medium, complex tasks |
| Progressive testing | Validate skill scales with complexity |
| Comparative analysis | How does approach differ by difficulty? |
| Training/tutorials | Show skill progression from easy to hard |

LangGraph Implementation

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send

async def scenario_supervisor(state: ScenarioOrchestratorState) -> list[Command]:
    """Route to all 3 scenarios in parallel."""
    for scenario_id in ["simple", "medium", "complex"]:
        state[f"progress_{scenario_id}"] = ScenarioProgress(
            scenario_id=scenario_id, status="pending",
            start_time_ms=int(time.time() * 1000)
        )

    return [
        Send("scenario_worker", {"scenario_id": "simple", **state}),
        Send("scenario_worker", {"scenario_id": "medium", **state}),
        Send("scenario_worker", {"scenario_id": "complex", **state}),
    ]

async def scenario_worker(state: ScenarioOrchestratorState) -> dict:
    """Execute one scenario."""
    scenario_id = state.get("scenario_id")
    progress = state[f"progress_{scenario_id}"]
    scenario_def = state[f"scenario_{scenario_id}"]

    progress.status = "running"
    try:
        result = await execute_skill_with_milestones(
            skill_name=state["skill_name"],
            scenario_def=scenario_def,
            progress=progress, state=state
        )
        progress.status = "complete"
        progress.elapsed_ms = int(time.time() * 1000) - progress.start_time_ms
        return {f"progress_{scenario_id}": progress}
    except Exception as e:
        progress.status = "failed"
        progress.errors.append({"message": str(e)})
        return {f"progress_{scenario_id}": progress}

async def scenario_aggregator(state: ScenarioOrchestratorState) -> dict:
    """Collect all results and synthesize findings."""
    aggregated = {
        "orchestration_id": state["orchestration_id"],
        "metrics": {},
        "comparison": {},
        "recommendations": []
    }

    for scenario_id in ["simple", "medium", "complex"]:
        progress = state[f"progress_{scenario_id}"]
        aggregated["metrics"][scenario_id] = {
            "elapsed_ms": progress.elapsed_ms,
            "items_processed": progress.items_processed,
            "quality_scores": progress.quality_scores,
        }

    return {"final_results": aggregated}

# Build graph
graph = StateGraph(ScenarioOrchestratorState)
graph.add_node("supervisor", scenario_supervisor)
graph.add_node("scenario_worker", scenario_worker)
graph.add_node("aggregator", scenario_aggregator)
graph.add_edge(START, "supervisor")
graph.add_edge("scenario_worker", "aggregator")
graph.add_edge("aggregator", END)
app = graph.compile(checkpointer=checkpointer)

Skill-Agnostic Template

from abc import ABC, abstractmethod

class SkillOrchestrator(ABC):
    """Abstract orchestrator for any user-invocable skill."""

    def __init__(self, skill_name: str, skill_version: str):
        self.skill_name = skill_name
        self.skill_version = skill_version

    @abstractmethod
    async def invoke_skill(self, input_data: list[dict], scenario_params: dict) -> dict:
        """Invoke your skill on input data."""
        pass

    @abstractmethod
    def get_scenario_configs(self) -> dict[str, dict]:
        """Return configs for simple/medium/complex."""
        pass

    @abstractmethod
    def calculate_quality_metrics(self, results: list[dict], metric_names: list[str]) -> dict:
        """Calculate quality metrics from results."""
        pass

    async def orchestrate(self, orchestration_id: str) -> dict:
        """Run all 3 scenarios in parallel and aggregate.

        run_scenario() and aggregate_results() wrap the abstract hooks
        above; implement them in the subclass or a shared mixin.
        """
        results = await asyncio.gather(
            self.run_scenario("simple", orchestration_id),
            self.run_scenario("medium", orchestration_id),
            self.run_scenario("complex", orchestration_id),
            return_exceptions=True
        )
        return self.aggregate_results(results)

Difficulty Scaling

| Level | Complexity | Input Size | Time Budget | Quality |
| --- | --- | --- | --- | --- |
| Simple | 1x | Small (10-100) | 30s | Basic |
| Medium | 3x | Medium (30-300) | 90s | Good |
| Complex | 8x | Large (80-800) | 300s | Excellent |

Output Example

{
  "orchestration_id": "demo-001",
  "quality_comparison": {
    "simple": 0.92, "medium": 0.88, "complex": 0.84
  },
  "scaling_analysis": {
    "time_per_item_ms": {
      "simple": 0.012, "medium": 0.012, "complex": 0.032
    },
    "recommendation": "Sublinear scaling up to 3x, superlinear at 8x"
  }
}

Common Mistakes

  • Sequential instead of parallel (defeats purpose)
  • No synchronization (results appear disjointed)
  • Unclear difficulty scaling (levels should differ in scale, not approach)
  • Missing aggregation (individual results lack comparative insights)

Incorrect — running scenarios sequentially:

async def orchestrate(skill_name: str):
    simple = await run_scenario("simple", skill_name)  # Wait
    medium = await run_scenario("medium", skill_name)  # Wait
    complex = await run_scenario("complex", skill_name) # Wait
    return [simple, medium, complex]

Correct — parallel execution of all scenarios:

async def orchestrate(skill_name: str):
    results = await asyncio.gather(
        run_scenario("simple", skill_name),
        run_scenario("medium", skill_name),
        run_scenario("complex", skill_name)
    )
    return aggregate_results(results)

Synchronize milestones, scale difficulty, and recover from failures across multi-scenario orchestration — MEDIUM

Scenario Routing & Synchronization

Milestone synchronization modes, difficulty scaling strategies, checkpointing, and failure recovery for multi-scenario orchestration.

Synchronization Modes

| Mode | Description | Use When |
| --- | --- | --- |
| Free-running | All run independently | Demo videos, production |
| Milestone-sync | Wait at 30%, 70%, 100% | Comparative analysis |
| Lock-step | All proceed together | Training, tutorials |

Milestone Synchronization

async def synchronize_at_milestone(
    milestone_pct: int,
    state: ScenarioOrchestratorState,
    timeout_seconds: int = 30
) -> bool:
    """Wait for all scenarios to reach milestone."""
    start = time.time()

    while time.time() - start < timeout_seconds:
        simple_at = milestone_pct in state["progress_simple"].milestones_reached
        medium_at = milestone_pct in state["progress_medium"].milestones_reached
        complex_at = milestone_pct in state["progress_complex"].milestones_reached

        if simple_at and medium_at and complex_at:
            print(f"[SYNC] All scenarios reached {milestone_pct}%")
            return True

        if any(state[f"progress_{s}"].status == "failed"
               for s in ["simple", "medium", "complex"]):
            print(f"[SYNC] A scenario failed, proceeding without sync")
            return False

        await asyncio.sleep(0.5)

    print(f"[SYNC] Timeout at {milestone_pct}%, proceeding")
    return False

Input Scaling Strategies

Linear Scaling (I/O-bound skills)

Simple:  100 items
Medium:  300 items (3x)
Complex: 800 items (8x)
Time: O(n) -- expected medium ~3x simple

Adaptive Scaling (per-skill tuning)

SKILL_SCALING_PROFILES = {
    "performance-testing": {
        "scaling": "linear",
        "simple": 10, "medium": 30, "complex": 80
    },
    "security-scanning": {
        "scaling": "sublinear",
        "simple": 20, "medium": 100, "complex": 500
    },
    "data-transformation": {
        "scaling": "quadratic",
        "simple": 100, "medium": 200, "complex": 300
    }
}
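A lookup helper for these profiles (a sketch: `PROFILES` repeats one entry from the table above so the example runs standalone, and unknown skills fall back to the default 1x/3x/8x curve):

```python
# One profile repeated from SKILL_SCALING_PROFILES above for a standalone example
PROFILES = {
    "performance-testing": {"scaling": "linear", "simple": 10, "medium": 30, "complex": 80},
}
DEFAULT_SIZES = {"simple": 100, "medium": 300, "complex": 800}  # default 1x/3x/8x curve

def input_size(skill_name: str, scenario_id: str) -> int:
    """Resolve a scenario's input size, falling back to the default curve."""
    return PROFILES.get(skill_name, DEFAULT_SIZES)[scenario_id]

input_size("performance-testing", "complex")  # 80
input_size("unknown-skill", "medium")         # 300
```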

Complexity Detection

# Calculate actual time complexity
simple_tpi = simple_time / simple_size    # time per item
medium_tpi = medium_time / medium_size
complex_tpi = complex_time / complex_size
ratio = complex_tpi / simple_tpi  # >2 = superlinear
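Wrapping the ratio check in a helper (illustrative; the threshold follows the >2 rule above):

```python
def classify_scaling(time_per_item: dict[str, float]) -> str:
    """Apply the >2x rule: per-item time growing past 2x flags superlinear scaling."""
    ratio = time_per_item["complex"] / time_per_item["simple"]
    return "superlinear" if ratio > 2 else "roughly linear"

classify_scaling({"simple": 0.012, "complex": 0.032})  # "superlinear" (ratio ~2.67)
```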

Failure Recovery

One Scenario Fails (Independent)

try:
    result = await invoke_skill(batch)
except Exception as e:
    progress.errors.append({"message": str(e), "batch_index": i})
    # Don't raise -- let other scenarios continue

Timeout Handling

async def invoke_skill_with_timeout(skill, input_data, timeout_seconds):
    try:
        return await asyncio.wait_for(
            invoke_skill(skill, input_data),
            timeout=timeout_seconds
        )
    except asyncio.TimeoutError:
        return {
            "processed": len(input_data),
            "results": [],
            "error": "timeout",
            "quality_score": 0.0,
        }

All Scenarios Fail (Systematic)

async def orchestrator_with_recovery(initial_state):
    result = await app.ainvoke(initial_state)

    all_failed = all(
        state[f"progress_{s}"].status == "failed"
        for s in ["simple", "medium", "complex"]
    )

    if all_failed:
        # 1. Reduce resource contention
        # 2. Retry with smaller batches
        # 3. Or abort with diagnostic info
        return retry_with_reduced_load(initial_state)
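Step 2 ("retry with smaller batches") might look like this; the `input_size_*` state keys are assumptions, not a fixed schema:

```python
def reduce_load(state: dict, factor: float = 0.5) -> dict:
    """Hypothetical recovery step: shrink every scenario's batch size before retrying."""
    reduced = dict(state)
    for scenario in ("simple", "medium", "complex"):
        key = f"input_size_{scenario}"  # assumed state key, not a fixed schema
        if key in reduced:
            reduced[key] = max(1, int(reduced[key] * factor))
    return reduced
```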

Checkpointing

Scenario-Level Checkpoints

INSERT INTO scenario_checkpoints (
    orchestration_id, scenario_id, milestone_pct, elapsed_ms, state_snapshot
) VALUES (
    'demo-001', 'medium', 30, 3200, '{"items": 90, "results": [...]}'
);

Full-State Snapshots

async def checkpoint_full_state(state: ScenarioOrchestratorState):
    checkpoint_data = {
        "orchestration_id": state["orchestration_id"],
        "timestamp": datetime.now().isoformat(),
        "progress_simple": state["progress_simple"].to_dict(),
        "progress_medium": state["progress_medium"].to_dict(),
        "progress_complex": state["progress_complex"].to_dict(),
    }
    await db.insert("full_state_checkpoints", checkpoint_data)

Quality Metrics Framework

Functional Metrics (per-skill)

{
    "performance-testing": {
        "latency_p95_ms": {"target": "<500ms", "weight": 0.5},
        "error_rate": {"target": "<1%", "weight": 0.5},
    },
    "security-scanning": {
        "vulnerabilities_found": {"target": ">0", "weight": 0.3},
        "coverage_pct": {"target": "100%", "weight": 0.7},
    }
}

Comparative Metrics

{
    "quality_scaling": {
        "formula": "complex_quality / simple_quality",
        "expected": 1.0,
        "acceptable": ">0.8"
    },
    "time_efficiency": {
        "formula": "simple_tpi / complex_tpi",
        "expected": 1.0,
        "acceptable": ">0.5"
    }
}
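Evaluating the comparative formulas from per-scenario measurements (a sketch using the sample numbers from the Output Example above):

```python
def comparative_metrics(quality: dict[str, float], tpi: dict[str, float]) -> dict[str, float]:
    """Evaluate the quality_scaling and time_efficiency formulas above."""
    return {
        "quality_scaling": quality["complex"] / quality["simple"],
        "time_efficiency": tpi["simple"] / tpi["complex"],
    }

comparative_metrics({"simple": 0.92, "complex": 0.84}, {"simple": 0.012, "complex": 0.032})
# quality_scaling ~0.91 (acceptable: >0.8), time_efficiency ~0.38 (below the >0.5 bar)
```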

Multi-Host Execution

For greater parallelism, run scenarios on different machines sharing the same database:

# Host 1: Coordinator + Simple
python coordinator.py

# Host 2: Medium (different machine, same DB)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=medium
python run_scenario.py

# Host 3: Complex (same setup as Host 2)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=complex
python run_scenario.py

Key Decisions

| Decision | Recommendation |
| --- | --- |
| Synchronization mode | Free-running with checkpoints |
| Scenario count | Always 3: simple, medium, complex |
| Input scaling | 1x, 3x, 8x (exponential) |
| Time budgets | 30s, 90s, 300s |
| Checkpoint frequency | Every milestone + completion |

Incorrect — no timeout on skill invocation:

async def run_scenario(scenario_id: str, skill_name: str):
    result = await invoke_skill(skill_name, get_input(scenario_id))  # Hangs forever
    return result

Correct — timeout prevents infinite hangs:

async def run_scenario(scenario_id: str, skill_name: str):
    timeout = {"simple": 30, "medium": 90, "complex": 300}[scenario_id]
    try:
        result = await asyncio.wait_for(
            invoke_skill(skill_name, get_input(scenario_id)),
            timeout=timeout
        )
        return result
    except asyncio.TimeoutError:
        return {"error": "timeout", "scenario_id": scenario_id}

References (11)

Architectural Patterns

Architectural Patterns for Multi-Scenario Orchestration

Deep patterns and design decisions for production multi-scenario demos.

Pattern 1: Three-Tier Synchronization

Tier 1: Free-Running (Baseline)

Each scenario runs independently, no blocking.

Time →
Simple   ███████████████            Complete at 1.2s
Medium   ██████████████████████░░░░ In progress at 3.5s
Complex  ██████████░░░░░░░░░░░░░░░░ In progress at 25.7s

Advantages:

  • Realistic—shows natural skill behavior
  • Tolerates slowness in one scenario
  • Lower synchronization overhead

Implementation:

# Each scenario runs its own event loop
# No waiting between scenarios
# Checkpoints are independent

Tier 2: Milestone Synchronization

Scenarios pause at checkpoints (30%, 50%, 70%, 90%) to allow others to catch up.

Time →
Simple   ███ PAUSE ███ PAUSE ███ Complete
Medium   ██  PAUSE ██  PAUSE ██████░░░░ In progress
Complex  █   PAUSE █   PAUSE ██░░░░░░░░░░░░░░░ In progress
         (each scenario waits at the milestone until the others catch up)

Advantages:

  • Synchronized checkpoints for state capture
  • Better for demos (shows progression together)
  • Easier to explain ("all at 30%")

Implementation:

async def synchronize_at_milestone(milestone_pct, timeout_seconds=60):
    start = time.time()
    while time.time() - start < timeout_seconds:
        if all_scenarios_at_milestone(milestone_pct):  # check shared progress state
            return True
        await asyncio.sleep(0.5)

    # Timeout: proceed anyway (don't block forever)
    return False

Tier 3: Lock-Step (Strict Synchronization)

All scenarios advance together, slowest determines pace.

Time →
Step 1:  Simple ███ | Medium ███ | Complex █
Step 2:  Simple ███ | Medium ███ | Complex █
Step 3:  All complete together

Advantages:

  • Perfect synchronization for demos
  • Easy to explain ("all scenarios complete together")

Disadvantages:

  • Complex scenario blocks others (1-2 min delays)
  • Unrealistic performance representation

Recommendation: Use Tier 1 (Free-Running) for production, Tier 2 (Milestone) for interactive demos.


Pattern 2: Input Scaling Strategies

Strategy A: Linear Scaling (Additive)

Simple:  100 items
Medium:  100 + 200 = 300 items (+200%)
Complex: 300 + 500 = 800 items (+167%)

Time complexity: O(n)
Expected medium time ≈ 3x simple
Expected complex time ≈ 8x simple

Best for: I/O-bound skills (API calls, database queries)

Strategy B: Exponential Scaling (Multiplicative)

Simple:  100 items
Medium:  100 × 3 = 300 items (3x)
Complex: 100 × 8 = 800 items (8x)

Time complexity: O(n) or O(n log n)
Expected medium time ≈ 3x simple (if linear)
Expected complex time ≈ 8x simple (if linear)

Best for: Batch processing, LLM calls

Strategy C: Quadratic Scaling

Simple:  100 items
Medium:  300 items (3x)
Complex: 800 items (8x)

But if algorithm is O(n²):
Expected medium time ≈ 9x simple
Expected complex time ≈ 64x simple

Detection:

# Calculate actual time complexity
simple_time = 1.2  # seconds
medium_time = 3.5
complex_time = 25.7

simple_size = 100
medium_size = 300
complex_size = 800

# Time per item
simple_tpi = simple_time / simple_size        # 0.012 s/item
medium_tpi = medium_time / medium_size        # 0.012 s/item
complex_tpi = complex_time / complex_size     # 0.032 s/item

# Ratio indicates scaling behavior
ratio = complex_tpi / simple_tpi  # 2.67 → O(n log n) or worse
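
The same measurements also support a direct estimate of the scaling exponent b in t ≈ a·n^b via a log-log ratio:

```python
import math

def scaling_exponent(n1: int, t1: float, n2: int, t2: float) -> float:
    """Estimate b in t ~ a * n**b from two (input_size, time) samples."""
    return math.log(t2 / t1) / math.log(n2 / n1)

b = scaling_exponent(100, 1.2, 800, 25.7)
# b ~= 1.47 for the measurements above: clearly superlinear,
# between O(n) and O(n**2)
```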

Strategy D: Adaptive Scaling

Choose scaling based on skill characteristics:

SKILL_SCALING_PROFILES = {
    "performance-testing": {
        "scaling": "linear",
        "simple": 10,
        "medium": 30,
        "complex": 80
    },
    "security-scanning": {
        "scaling": "sublinear",  # Gets faster with caching
        "simple": 20,
        "medium": 100,
        "complex": 500
    },
    "data-transformation": {
        "scaling": "quadratic",  # O(n²) worst case
        "simple": 100,
        "medium": 200,  # Limit increase
        "complex": 300
    }
}
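
A resolver can map a skill name to its per-scenario input sizes, with a fallback profile for unregistered skills (the fallback values here are an assumption):

```python
SKILL_SCALING_PROFILES = {
    "performance-testing": {"scaling": "linear", "simple": 10, "medium": 30, "complex": 80},
    "security-scanning": {"scaling": "sublinear", "simple": 20, "medium": 100, "complex": 500},
    "data-transformation": {"scaling": "quadratic", "simple": 100, "medium": 200, "complex": 300},
}

# Assumed default: the 1x/3x/8x exponential profile used elsewhere
DEFAULT_PROFILE = {"scaling": "linear", "simple": 100, "medium": 300, "complex": 800}

def input_sizes(skill_name: str) -> dict[str, int]:
    """Per-scenario input sizes for a skill, falling back to the default."""
    profile = SKILL_SCALING_PROFILES.get(skill_name, DEFAULT_PROFILE)
    return {k: profile[k] for k in ("simple", "medium", "complex")}
```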

Pattern 3: Quality Metrics Framework

Metric Category 1: Functional Metrics

What the skill is designed to measure:

{
    "performance-testing": {
        "latency_p95_ms": {"target": "<500ms", "weight": 0.5},
        "error_rate": {"target": "<1%", "weight": 0.5},
    },
    "security-scanning": {
        "vulnerabilities_found": {"target": ">0", "weight": 0.3},
        "coverage_pct": {"target": "100%", "weight": 0.7},
    }
}

Metric Category 2: Comparative Metrics

How scenarios compare:

{
    "quality_scaling": {
        "formula": "complex_quality / simple_quality",
        "expected": 1.0,  # Expect no degradation
        "acceptable": ">0.8"
    },
    "time_efficiency": {
        "formula": "simple_tpi / complex_tpi",
        "expected": 1.0,  # Linear scaling
        "acceptable": ">0.5"
    },
    "resource_efficiency": {
        "formula": "quality_per_second_complex / quality_per_second_simple",
        "expected": 0.8,  # Complex less efficient (higher overhead)
        "acceptable": ">0.5"
    }
}
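
These formulas translate directly into a small helper; a sketch assuming each scenario result carries quality_score, elapsed_s, and input_size fields (field names are assumptions):

```python
def comparative_metrics(simple: dict, complex_: dict) -> dict:
    """Scaling ratios between the simple and complex runs."""
    simple_tpi = simple["elapsed_s"] / simple["input_size"]       # time per item
    complex_tpi = complex_["elapsed_s"] / complex_["input_size"]
    return {
        "quality_scaling": complex_["quality_score"] / simple["quality_score"],
        "time_efficiency": simple_tpi / complex_tpi,
    }

m = comparative_metrics(
    {"quality_score": 0.92, "elapsed_s": 1.2, "input_size": 100},
    {"quality_score": 0.84, "elapsed_s": 25.7, "input_size": 800},
)
# quality_scaling ~= 0.91 (> 0.8, acceptable);
# time_efficiency ~= 0.37 (< 0.5, flags superlinear time growth)
```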

Metric Category 3: Stability Metrics

Consistency across scenarios:

{
    "quality_variance": {
        "formula": "stdev(simple_quality, medium_quality, complex_quality)",
        "expected": "<0.05",
        "interpretation": "Low variance = stable algorithm"
    },
    "error_consistency": {
        "formula": "all_scenarios_error_rate < threshold",
        "expected": True,
        "interpretation": "Same error rate across loads"
    }
}
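
Both stability metrics map onto the standard library; a sketch using statistics.stdev:

```python
from statistics import stdev

def stability_metrics(qualities: list[float], error_rates: list[float],
                      error_threshold: float = 0.05) -> dict:
    """Consistency across scenarios: low variance = stable algorithm."""
    variance = stdev(qualities)
    return {
        "quality_variance": variance,
        "quality_stable": variance < 0.05,
        "error_consistency": all(e < error_threshold for e in error_rates),
    }

s = stability_metrics([0.92, 0.88, 0.84], [0.0, 0.01, 0.02])
```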

Pattern 4: Failure Modes & Recovery

Failure Mode 1: One Scenario Fails (Independent)

Simple   ███████████████ Complete ✓
Medium   ██████████ FAILED ✗
Complex  ███ In progress...

Recovery:
• Medium stores checkpoint at failure point
• Can be restarted independently
• Simple/Complex continue
• Aggregator combines partial results

Implementation:

# Isolate failures
try:
    result = await invoke_skill(batch)
except Exception as e:
    progress.errors.append({"message": str(e), "batch_index": i})
    # Don't raise—let other scenarios continue

# Report but don't block
if progress.errors:
    print(f"⚠ {scenario_id} had {len(progress.errors)} errors")

Failure Mode 2: All Scenarios Fail (Systematic)

Simple   ███ FAILED ✗
Medium   ███ FAILED ✗
Complex  ███ FAILED ✗

Possible causes:
• Skill has a bug
• Resource limit exceeded
• Network/database unavailable

Recovery:

async def orchestrator_with_recovery(initial_state):
    """Attempt recovery if all scenarios fail."""

    result = await app.ainvoke(initial_state)

    all_failed = all(
        state[f"progress_{s}"].status == "failed"
        for s in ["simple", "medium", "complex"]
    )

    if all_failed:
        print("All scenarios failed—attempting recovery...")

        # 1. Reduce resource contention
        # 2. Retry with smaller batches
        # 3. Or abort with diagnostic info

        return retry_with_reduced_load(initial_state)

Failure Mode 3: Timeout (Skill Takes Too Long)

Simple   ███████████ Complete in 1.2s (budget: 30s) ✓
Medium   ██████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ TIMEOUT ✗ (budget: 90s)
Complex  █░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ IN PROGRESS

Recovery:
• Medium: Cancel at 90s, return partial results
• Complex: Continue until 300s timeout

Implementation:

async def invoke_skill_with_timeout(skill, input_data, timeout_seconds):
    try:
        return await asyncio.wait_for(
            invoke_skill(skill, input_data),
            timeout=timeout_seconds
        )
    except asyncio.TimeoutError:
        print(f"Timeout after {timeout_seconds}s, returning partial results")
        return {
            "processed": len(input_data),
            "results": [],
            "error": "timeout",
            "quality_score": 0.0,
        }

Pattern 5: Observability & Monitoring

Monitoring Strategy 1: Real-Time Progress

# Stream progress from PostgreSQL checkpoints
async def monitor_real_time():
    while orchestration_running:
        progress = await db.query("""
            SELECT scenario_id, MAX(progress_pct), MAX(elapsed_ms)
            FROM scenario_checkpoints
            WHERE orchestration_id = $1
            GROUP BY scenario_id
        """, orchestration_id)

        for scenario_id, progress_pct, elapsed_ms in progress:
            bar = "█" * int(progress_pct / 5) + "░" * (20 - int(progress_pct / 5))
            print(f"{scenario_id}: │{bar}│ {progress_pct:.0f}%")

        await asyncio.sleep(2)

Monitoring Strategy 2: Comparative Timeline

                   0s      10s      20s      30s      40s
Simple:            |████████|                            (complete)
Medium:            |              |██████████|           (in progress)
Complex:           |                   |████|            (in progress)
                   |─────────────────────────────────────|

Milestones:
Simple:  ✓ 30% @ 0.4s   ✓ 50% @ 0.6s   ✓ 70% @ 0.9s   ✓ 100% @ 1.2s
Medium:  ✓ 30% @ 3.2s   ✓ 50% @ 5.1s   ⏳ 70% @ 8.3s  ⏳ In progress
Complex: ✓ 30% @ 9.1s   ⏳ 50% in progress

Monitoring Strategy 3: Quality Trend

Quality Score (0-1)
1.0 ├─────────────────
    │ Simple    ████░░░░░░
0.8 ├───────────────────
    │ Medium      ██████░░
0.6 ├───────────────────
    │ Complex       ███░░░░
0.4 ├───────────────────

0.2 ├───────────────────
    └─────────────────────
    0%  30%  50%  70%  100%
         Progress

Pattern 6: Result Aggregation Strategies

Aggregation Type 1: Comparative (Default)

Compare metrics across all 3 scenarios:

{
  "quality_comparison": {
    "simple": {"latency_p95": 120, "score": 0.92},
    "medium": {"latency_p95": 145, "score": 0.88},
    "complex": {"latency_p95": 185, "score": 0.84}
  },
  "scaling_analysis": {
    "quality_degradation": "8% from simple to complex",
    "time_growth": "linear (as expected)",
    "recommendation": "Quality acceptable, can scale to complex"
  }
}

Aggregation Type 2: Pattern Extraction

Find common patterns across scenarios:

{
  "success_patterns": [
    "Caching strategy effective at all scales",
    "Batch size of 50+ preferred",
    "Memory usage stays below 512MB"
  ],
  "failure_patterns": [
    "Timeout at >5000 items per batch",
    "Quality drops with skewed data distribution"
  ]
}

Aggregation Type 3: Recommendation Engine

Suggest optimal difficulty for production:

{
  "recommended_difficulty": "medium",
  "reasoning": [
    "Simple: Insufficient load to detect bottlenecks",
    "Medium: Good balance of realism and speed",
    "Complex: Takes too long for frequent testing (300s)"
  ],
  "production_scaling": {
    "estimated_items_per_request": 50,
    "estimated_response_time_ms": 450,
    "required_concurrency_support": 10
  }
}
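
A rule-based chooser that mirrors this reasoning might look as follows; the time and quality thresholds are illustrative assumptions, not part of any skill spec:

```python
def recommend_difficulty(results: dict[str, dict],
                         max_time_s: float = 120.0,
                         min_quality: float = 0.8) -> str:
    """Pick the hardest scenario that stays within time and quality limits."""
    for difficulty in ("complex", "medium", "simple"):
        r = results[difficulty]
        if r["elapsed_s"] <= max_time_s and r["quality_score"] >= min_quality:
            return difficulty
    return "simple"  # conservative fallback

choice = recommend_difficulty({
    "simple": {"elapsed_s": 1.2, "quality_score": 0.92},
    "medium": {"elapsed_s": 9.3, "quality_score": 0.88},
    "complex": {"elapsed_s": 300.0, "quality_score": 0.84},
})
# complex exceeds the 120s budget, so "medium" is recommended
```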

Pattern 7: Checkpointing Strategy

Checkpoint Type 1: Scenario-Level

Save progress at each scenario milestone:

INSERT INTO scenario_checkpoints (
    orchestration_id, scenario_id, milestone_pct, elapsed_ms, state_snapshot
) VALUES (
    'demo-001', 'medium', 30, 3200, '{"items": 90, "results": [...]}'
);

Use case: Resume interrupted scenario
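
Resuming then amounts to reading the latest checkpoint row for the scenario and skipping already-processed items; a sketch over rows shaped like the INSERT above (field names assumed):

```python
def resume_offset(checkpoints: list[dict], scenario_id: str) -> int:
    """Items already processed per the latest checkpoint for this scenario."""
    rows = [c for c in checkpoints if c["scenario_id"] == scenario_id]
    if not rows:
        return 0  # no checkpoint: start from the beginning
    latest = max(rows, key=lambda c: c["milestone_pct"])
    return latest["state_snapshot"].get("items", 0)

offset = resume_offset(
    [{"scenario_id": "medium", "milestone_pct": 30, "state_snapshot": {"items": 90}},
     {"scenario_id": "medium", "milestone_pct": 50, "state_snapshot": {"items": 150}}],
    "medium",
)
# processing restarts at item 150
```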

Checkpoint Type 2: Milestone-Level

Save synchronized state across all scenarios:

INSERT INTO orchestration_milestones (
    orchestration_id, milestone_pct, timestamp, simple_status, medium_status, complex_status
) VALUES (
    'demo-001', 30, NOW(), 'complete', 'paused', 'in_progress'
);

Use case: Track synchronization progress

Checkpoint Type 3: Full-State

Periodic snapshots for recovery:

async def checkpoint_full_state(state: ScenarioOrchestratorState):
    """Save complete state to disk."""

    checkpoint_data = {
        "orchestration_id": state["orchestration_id"],
        "timestamp": datetime.now().isoformat(),
        "progress_simple": state["progress_simple"].to_dict(),
        "progress_medium": state["progress_medium"].to_dict(),
        "progress_complex": state["progress_complex"].to_dict(),
    }

    await db.insert("full_state_checkpoints", checkpoint_data)

Use case: Complete recovery from any point


Pattern 8: Cost & Performance Analysis

Cost Analysis

def estimate_orchestration_cost(
    scenarios: dict[str, ScenarioDefinition]
) -> dict:
    """Estimate total execution cost."""

    # LLM cost (if using Claude)
    llm_cost_per_scenario = {
        "simple": estimate_tokens(100) * 0.001,   # ~$0.002
        "medium": estimate_tokens(300) * 0.001,   # ~$0.005
        "complex": estimate_tokens(800) * 0.001,  # ~$0.010
    }

    # Compute cost (if cloud)
    compute_cost = {
        "simple": 30 / 3600 * 0.10,   # 30s @ $0.10/hour
        "medium": 90 / 3600 * 0.10,   # 90s @ $0.10/hour
        "complex": 300 / 3600 * 0.10, # 300s @ $0.10/hour
    }

    # Database cost
    db_cost = 0.001  # Negligible for checkpointing

    return {
        "llm_cost": sum(llm_cost_per_scenario.values()),
        "compute_cost": sum(compute_cost.values()),
        "db_cost": db_cost,
        "total_cost": sum(llm_cost_per_scenario.values()) + sum(compute_cost.values()) + db_cost,
        "cost_per_scenario": llm_cost_per_scenario,
    }

Performance Analysis

def analyze_performance(
    results: dict
) -> dict:
    """Analyze orchestration performance."""

    elapsed_s = [r["elapsed_ms"] / 1000 for r in results.values()]
    avg_quality_score = sum(r["quality_score"] for r in results.values()) / len(results)
    total_cost = estimate_orchestration_cost({})["total_cost"]

    return {
        "total_execution_time_minutes": sum(elapsed_s) / 60,
        "critical_path_seconds": max(elapsed_s),
        "parallel_efficiency": sum(elapsed_s) / max(elapsed_s),
        "cost_per_quality_point": total_cost / avg_quality_score,
    }

Key Architectural Decisions

| Decision | Choice | Rationale |
|---|---|---|
| Synchronization | Milestone-based (Tier 2) | Balance between realism and demo experience |
| Input Scaling | 1x, 3x, 8x (exponential) | Exponential because most skills have overhead |
| Quality Metrics | Multiple per-skill metrics | Single metric insufficient to assess quality |
| Failure Recovery | Isolation + checkpointing | Partial results preferable to total failure |
| Monitoring | Real-time DB queries + Langfuse | Distributed state requires DB |
| Checkpoint Frequency | Every milestone + completion | Balance between safety and overhead |
| Aggregation | Comparative + recommendations | Provide actionable insights |
| Skill Abstraction | Generic orchestrator base class | Template for ANY skill |

References

  • langgraph-implementation.md - Python implementation details
  • claude-code-instance-management.md - Multi-terminal setup
  • state-machine-design.md - Detailed state transitions
  • skill-agnostic-template.md - Template for new skills

Claude Code Instance Management

Claude Code Instance Management: Multi-Scenario Demos

Run three parallel Claude Code terminal instances for simultaneous scenario execution with shared state synchronization.

Instance Architecture

┌─────────────────────────────────────────────────────────────────────┐
│                    COORDINATOR PROCESS (Python)                      │
│                   (Runs orchestrator graph)                          │
├─────────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐         │
│  │  Terminal 1 │      │  Terminal 2 │      │  Terminal 3 │         │
│  │  (Simple)   │      │  (Medium)   │      │  (Complex)  │         │
│  │             │      │             │      │             │         │
│  │  Session:   │      │  Session:   │      │  Session:   │         │
│  │  simple-123 │      │  medium-123 │      │  complex-123│         │
│  └─────────────┘      └─────────────┘      └─────────────┘         │
│       │                    │                    │                   │
│   One Claude Code instance per terminal (3 parallel processes)     │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────┐       │
│  │   PostgreSQL Checkpoint Table                            │       │
│  │   (Shared state synchronization across instances)        │       │
│  └─────────────────────────────────────────────────────────┘       │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Setup Instructions

Step 1: Prepare the Project

Ensure your project has the orchestrator graph and shared utilities:

# At project root
mkdir -p backend/app/workflows/multi_scenario
cp src/skills/multi-scenario-orchestration/references/langgraph-implementation.py \
   backend/app/workflows/multi_scenario/orchestrator.py

# Create coordinator script
cat > backend/app/workflows/multi_scenario/coordinator.py << 'EOF'
"""
Main coordinator that launches and monitors 3 Claude Code instances.
"""
import asyncio
import subprocess
import os
from pathlib import Path

PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
SCENARIOS = ["simple", "medium", "complex"]
SKILL_NAME = "your-skill-name"  # Change this

async def launch_scenario_instance(scenario_id: str, orchestration_id: str):
    """Launch one Claude Code instance for a scenario."""

    env = os.environ.copy()
    env["SCENARIO_ID"] = scenario_id
    env["ORCHESTRATION_ID"] = orchestration_id
    env["PROJECT_ROOT"] = str(PROJECT_ROOT)

    # Launch Claude Code instance
    process = subprocess.Popen(
        [
            "claude", "code",
            str(PROJECT_ROOT),
            "--session", f"scenario-{scenario_id}-{orchestration_id}",
            "--skill", SKILL_NAME,
        ],
        env=env,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )

    print(f"[COORDINATOR] Launched {scenario_id} instance (PID: {process.pid})")
    return process

async def monitor_instances(processes: dict):
    """Monitor all instances for completion."""

    completed: set[str] = set()
    while any(p.poll() is None for p in processes.values()):
        for scenario_id, process in processes.items():
            if process.poll() is not None and scenario_id not in completed:
                completed.add(scenario_id)
                print(f"[COORDINATOR] {scenario_id} instance completed")

        await asyncio.sleep(1)

async def main():
    orchestration_id = "demo-001"

    print(f"[COORDINATOR] Starting orchestration {orchestration_id}")
    print(f"[COORDINATOR] Launching 3 parallel instances...")

    # Launch all instances
    processes = {}
    for scenario_id in SCENARIOS:
        process = await launch_scenario_instance(scenario_id, orchestration_id)
        processes[scenario_id] = process

    print(f"[COORDINATOR] All instances launched. Monitoring...")

    # Monitor
    await monitor_instances(processes)

    print(f"[COORDINATOR] All instances completed")

if __name__ == "__main__":
    asyncio.run(main())
EOF

Step 2: Create Scenario Runner Script

Create /backend/app/workflows/multi_scenario/run_scenario.py:

"""
Runner for single scenario. Invoked by Claude Code instance.
Sets up environment from SCENARIO_ID and ORCHESTRATION_ID env vars.
"""
import os
import time
import asyncio
from orchestrator import (
    build_scenario_orchestrator,
    ScenarioOrchestratorState,
    ScenarioDefinition,
    ScenarioProgress,
)
from langgraph.checkpoint.postgres import PostgresSaver

async def run_scenario():
    # Read from environment
    scenario_id = os.getenv("SCENARIO_ID", "simple")
    orchestration_id = os.getenv("ORCHESTRATION_ID", "demo-001")
    project_root = os.getenv("PROJECT_ROOT", ".")

    print(f"[{scenario_id.upper()}] Starting scenario execution")
    print(f"[{scenario_id.upper()}] Orchestration ID: {orchestration_id}")

    # Setup checkpointer
    db_url = os.getenv("DATABASE_URL", "postgresql://localhost/orchestkit")
    checkpointer = PostgresSaver.from_conn_string(db_url)

    # Build orchestrator
    app = build_scenario_orchestrator(checkpointer=checkpointer)

    # Prepare scenario definitions
    configs = {
        "simple": {
            "complexity_multiplier": 1.0,
            "input_size": 100,
            "time_budget_seconds": 30,
            "skill_params": {"batch_size": 10, "cache_enabled": True}
        },
        "medium": {
            "complexity_multiplier": 3.0,
            "input_size": 300,
            "time_budget_seconds": 90,
            "skill_params": {"batch_size": 50, "cache_enabled": True}
        },
        "complex": {
            "complexity_multiplier": 8.0,
            "input_size": 800,
            "time_budget_seconds": 300,
            "skill_params": {"batch_size": 100, "cache_enabled": True, "parallel_workers": 4}
        }
    }

    cfg = configs[scenario_id]

    # Build initial state
    initial_state: ScenarioOrchestratorState = {
        "orchestration_id": orchestration_id,
        "start_time_unix": int(time.time()),
        "skill_name": "your-skill-name",
        "skill_version": "1.0.0",

        # Current scenario only
        "scenario_simple": None,
        "scenario_medium": None,
        "scenario_complex": None,
        "progress_simple": None,
        "progress_medium": None,
        "progress_complex": None,
    }

    # Set only the relevant scenario
    initial_state[f"scenario_{scenario_id}"] = ScenarioDefinition(
        name=scenario_id,
        difficulty={"simple": "easy", "medium": "intermediate", "complex": "advanced"}[scenario_id],
        complexity_multiplier=cfg["complexity_multiplier"],
        input_size=cfg["input_size"],
        dataset_characteristics={"distribution": "uniform"},
        time_budget_seconds=cfg["time_budget_seconds"],
        memory_limit_mb={"simple": 256, "medium": 512, "complex": 1024}[scenario_id],
        error_tolerance={"simple": 0.0, "medium": 0.05, "complex": 0.1}[scenario_id],
        skill_params=cfg["skill_params"],
        expected_quality={"simple": "basic", "medium": "good", "complex": "excellent"}[scenario_id],
        quality_metrics=["accuracy", "coverage"]
    )

    initial_state[f"progress_{scenario_id}"] = ScenarioProgress(scenario_id=scenario_id)

    # Run orchestrator
    config = {"configurable": {"thread_id": f"orch-{orchestration_id}"}}

    print(f"[{scenario_id.upper()}] Invoking orchestrator...")

    try:
        # Stream progress
        async for update in app.astream(initial_state, config=config, stream_mode="updates"):
            if f"progress_{scenario_id}" in update:
                progress = update[f"progress_{scenario_id}"]
                print(f"[{scenario_id.upper()}] Progress: {progress.progress_pct:.1f}% "
                      f"({progress.items_processed} items, {progress.elapsed_ms}ms)")

        print(f"[{scenario_id.upper()}] Scenario complete")

    except Exception as e:
        print(f"[{scenario_id.upper()}] Error: {e}")
        raise

if __name__ == "__main__":
    import time
    asyncio.run(run_scenario())

Execution: Three-Terminal Mode

Terminal 1: Coordinator

cd /path/to/project
python backend/app/workflows/multi_scenario/coordinator.py

Output:

[COORDINATOR] Starting orchestration demo-001
[COORDINATOR] Launching 3 parallel instances...
[COORDINATOR] Launched simple instance (PID: 1234)
[COORDINATOR] Launched medium instance (PID: 1235)
[COORDINATOR] Launched complex instance (PID: 1236)
[COORDINATOR] All instances launched. Monitoring...

Terminal 2: Simple Scenario

cd /path/to/project
export SCENARIO_ID=simple
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py

Output:

[SIMPLE] Starting scenario execution
[SIMPLE] Orchestration ID: demo-001
[SIMPLE] Invoking orchestrator...
[SIMPLE] Progress: 10.0% (10 items, 100ms)
[SIMPLE] Progress: 20.0% (20 items, 200ms)
...
[SIMPLE] Progress: 100.0% (100 items, 1050ms)
[SIMPLE] Scenario complete

Terminal 3: Medium Scenario

cd /path/to/project
export SCENARIO_ID=medium
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py

Output:

[MEDIUM] Starting scenario execution
[MEDIUM] Orchestration ID: demo-001
[MEDIUM] Invoking orchestrator...
[MEDIUM] Progress: 3.3% (10 items, 100ms)
[MEDIUM] Progress: 6.7% (20 items, 200ms)
...
[MEDIUM] Progress: 100.0% (300 items, 3100ms)
[MEDIUM] Scenario complete

Terminal 4 (Optional): Complex Scenario

If you have 4 terminals, run complex in parallel:

export SCENARIO_ID=complex
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py

Shared State Synchronization

PostgreSQL Checkpoint Schema

-- Create checkpoint table (run once)
CREATE TABLE IF NOT EXISTS scenario_orchestration_checkpoints (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    orchestration_id VARCHAR(255) NOT NULL,
    scenario_id VARCHAR(50) NOT NULL,
    milestone_name VARCHAR(100),
    progress_pct FLOAT,
    timestamp_unix BIGINT NOT NULL,
    state_snapshot JSONB,
    metrics JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);

-- PostgreSQL requires separate CREATE INDEX statements
CREATE INDEX IF NOT EXISTS idx_orchestration_id
    ON scenario_orchestration_checkpoints (orchestration_id);
CREATE INDEX IF NOT EXISTS idx_scenario_id
    ON scenario_orchestration_checkpoints (scenario_id);
CREATE INDEX IF NOT EXISTS idx_timestamp
    ON scenario_orchestration_checkpoints (timestamp_unix);

-- View progress across all scenarios
SELECT
    orchestration_id,
    scenario_id,
    progress_pct,
    milestone_name,
    timestamp_unix,
    (timestamp_unix / 1000.0) as seconds_elapsed
FROM scenario_orchestration_checkpoints
WHERE orchestration_id = 'demo-001'
ORDER BY scenario_id, progress_pct;

-- Example output:
-- orchestration_id | scenario_id | progress_pct | milestone_name | seconds_elapsed
-- demo-001         | simple      | 30           | checkpoint_1   | 1.2
-- demo-001         | simple      | 70           | checkpoint_2   | 2.8
-- demo-001         | simple      | 100          | completion     | 3.1
-- demo-001         | medium      | 30           | checkpoint_1   | 3.5
-- demo-001         | medium      | 70           | checkpoint_2   | 8.2
-- demo-001         | medium      | 100          | completion     | 9.3
-- demo-001         | complex     | 30           | checkpoint_1   | 9.1
-- demo-001         | complex     | 70           | checkpoint_2   | 22.5
-- demo-001         | complex     | 100          | completion     | 25.7

Monitor Progress from Coordinator

"""Monitor script to watch progress across all instances."""
import asyncio
import time
from datetime import datetime
import psycopg2

async def monitor_orchestration(orchestration_id: str, interval: int = 2):
    """Watch progress of all scenarios."""

    conn = psycopg2.connect("dbname=orchestkit user=postgres")
    cursor = conn.cursor()

    print(f"Monitoring orchestration {orchestration_id}...\n")

    while True:
        cursor.execute("""
            SELECT
                scenario_id,
                MAX(progress_pct) as progress,
                MAX(timestamp_unix) as last_update
            FROM scenario_orchestration_checkpoints
            WHERE orchestration_id = %s
            GROUP BY scenario_id
            ORDER BY scenario_id
        """, (orchestration_id,))

        rows = cursor.fetchall()
        if not rows:
            print("No progress yet...")
            await asyncio.sleep(interval)
            continue

        # Print a timestamped progress snapshot
        print(f"\r{datetime.now().strftime('%H:%M:%S')}")
        print("-" * 50)

        all_complete = True
        for scenario_id, progress, timestamp in rows:
            bar_length = int(progress / 5)  # 20-char bar
            bar = "█" * bar_length + "░" * (20 - bar_length)

            print(f"{scenario_id:10}{bar}{progress:3.0f}%")

            if progress < 100:
                all_complete = False

        if all_complete:
            print("\n✓ All scenarios complete!")
            break

        await asyncio.sleep(interval)

    conn.close()

if __name__ == "__main__":
    asyncio.run(monitor_orchestration("demo-001"))

Synchronization at Milestones

To enable forced synchronization at milestones (all scenarios pause and wait):

# In run_scenario.py

async def wait_for_milestone_sync(
    orchestration_id: str,
    scenario_id: str,
    milestone_pct: int,
    timeout_seconds: int = 30
):
    """Wait for all scenarios to reach milestone."""

    checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
    start = time.time()

    while time.time() - start < timeout_seconds:
        # Query checkpoint status
        async with checkpointer.get_connection() as conn:
            result = await conn.fetch("""
                SELECT DISTINCT scenario_id, MAX(progress_pct)
                FROM scenario_orchestration_checkpoints
                WHERE orchestration_id = $1
                GROUP BY scenario_id
            """, orchestration_id)

            scenarios_at_milestone = {
                row["scenario_id"]: row["max"] >= milestone_pct
                for row in result
            }

            if all(scenarios_at_milestone.values()):
                print(f"[{scenario_id.upper()}] All scenarios reached {milestone_pct}%")
                return True

        await asyncio.sleep(0.5)

    print(f"[{scenario_id.upper()}] Sync timeout at {milestone_pct}%")
    return False

Advanced: Multi-Host Execution

For even greater parallelism, run scenarios on different machines:

# Host 1: Coordinator + Simple
python backend/app/workflows/multi_scenario/coordinator.py

# Host 2: Medium (different machine, same DB)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=medium
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py

# Host 3: Complex (different machine, same DB)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=complex
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py

PostgreSQL checkpoints serve as the distributed state store.

Best Practices

  1. Unique Orchestration IDs: Use timestamp or UUID for each demo run
  2. Session Isolation: Each instance gets its own Claude Code session
  3. Checkpointing: Always enable PostgreSQL persistence
  4. Monitoring: Watch progress via checkpoint table queries
  5. Timeout Handling: Allow asynchronous completion, don't force lock-step
  6. Error Recovery: Failed instances can be restarted without resetting state

Troubleshooting

Instances get stuck at milestone: → Increase timeout_seconds in wait_for_milestone_sync()

Database connection errors: → Check DATABASE_URL environment variable, ensure PostgreSQL is running

One instance much slower than others: → This is expected! Use Tier 1 (free-running), not lock-step. The slower instance will eventually complete.

Memory usage grows over time: → Enable checkpointing to disk, reduce batch sizes for complex scenario

Coordination Patterns

Agent Coordination Patterns

Patterns for coordinating multiple specialized agents in complex workflows.

Supervisor-Worker Pattern

from typing import Protocol, Any
import asyncio

class Agent(Protocol):
    async def run(self, task: str, context: dict) -> dict: ...

class SupervisorCoordinator:
    """Central supervisor that routes tasks to worker agents."""

    def __init__(self, workers: dict[str, Agent]):
        self.workers = workers
        self.execution_log: list[dict] = []

    async def route_and_execute(
        self,
        task: str,
        required_agents: list[str],
        parallel: bool = True
    ) -> dict[str, Any]:
        """Route task to specified agents."""
        context = {"task": task, "results": {}}

        if parallel:
            tasks = [
                self._run_worker(name, task, context)
                for name in required_agents
            ]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return dict(zip(required_agents, results))
        else:
            for name in required_agents:
                context["results"][name] = await self._run_worker(
                    name, task, context
                )
            return context["results"]

    async def _run_worker(
        self, name: str, task: str, context: dict
    ) -> dict:
        """Execute single worker with timeout."""
        try:
            result = await asyncio.wait_for(
                self.workers[name].run(task, context),
                timeout=30.0
            )
            self.execution_log.append({
                "agent": name, "status": "success", "result": result
            })
            return result
        except asyncio.TimeoutError:
            self.execution_log.append({
                "agent": name, "status": "timeout"
            })
            return {"error": f"{name} timed out"}

Conflict Resolution

async def resolve_agent_conflicts(
    findings: list[dict],
    llm: Any
) -> dict:
    """Resolve conflicts between agent outputs."""
    conflicts = []
    for i, f1 in enumerate(findings):
        for f2 in findings[i+1:]:
            if f1.get("recommendation") != f2.get("recommendation"):
                conflicts.append((f1, f2))

    if not conflicts:
        return {"status": "no_conflicts", "findings": findings}

    # LLM arbitration
    resolution = await llm.ainvoke(f"""
        Agents disagree. Determine best recommendation:
        Agent 1: {conflicts[0][0]}
        Agent 2: {conflicts[0][1]}
        Provide: winner, reasoning, confidence (0-1)
    """)
    return {"status": "resolved", "resolution": resolution}

Configuration

  • Worker timeout: 30s default
  • Max parallel agents: 8
  • Retry failed agents: 1 attempt
  • Log all executions for debugging

Cost Optimization

  • Batch similar tasks to reduce overhead
  • Cache agent results by task hash
  • Use cheaper models for simple agents
  • Parallelize independent agents always
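
The caching bullet can be a thin wrapper around any agent callable; a sketch keyed on a SHA-256 hash of agent name and task:

```python
import asyncio
import hashlib

class CachedAgentRunner:
    """Memoize agent results keyed by a hash of (agent name, task)."""

    def __init__(self):
        self.cache: dict[str, dict] = {}
        self.hits = 0

    def _key(self, agent_name: str, task: str) -> str:
        return hashlib.sha256(f"{agent_name}:{task}".encode()).hexdigest()

    async def run(self, agent_name: str, agent, task: str) -> dict:
        key = self._key(agent_name, task)
        if key in self.cache:
            self.hits += 1
            return self.cache[key]
        result = await agent(task)
        self.cache[key] = result
        return result

async def _demo() -> tuple[int, int]:
    calls = {"n": 0}

    async def fake_agent(task: str) -> dict:
        calls["n"] += 1
        return {"task": task, "ok": True}

    runner = CachedAgentRunner()
    await runner.run("security", fake_agent, "scan repo")
    await runner.run("security", fake_agent, "scan repo")  # cache hit
    return runner.hits, calls["n"]

hits, underlying_calls = asyncio.run(_demo())
```

Note this only helps for deterministic agents; non-deterministic LLM calls may warrant a TTL or explicit invalidation.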

Crewai Patterns

CrewAI Patterns (v1.8+)

CrewAI patterns for role-based multi-agent collaboration with Flows architecture, hierarchical crews, MCP tools, and async execution.

Version: This document covers CrewAI 1.8.x - 1.9.x (2026). For earlier versions, patterns may differ.


Flows Architecture (1.8+)

Flows provide event-driven orchestration with state management. This is the major 1.x feature for complex multi-step workflows.

Basic Flow

from crewai.flow.flow import Flow, listen, start

class ResearchFlow(Flow):
    @start()
    def generate_topic(self):
        """Entry point - marked with @start()"""
        return "AI Safety"

    @listen(generate_topic)
    def research_topic(self, topic):
        """Triggered when generate_topic completes"""
        return f"Research findings on {topic}"

    @listen(research_topic)
    def summarize(self, findings):
        """Chain multiple listeners"""
        return f"Summary: {findings[:100]}..."

# Execute flow
flow = ResearchFlow()
result = flow.kickoff()

Structured State (Pydantic)

from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start

class WorkflowState(BaseModel):
    topic: str = ""
    research: str = ""
    summary: str = ""
    iteration: int = 0

class StatefulFlow(Flow[WorkflowState]):
    @start()
    def initialize(self):
        self.state.topic = "Machine Learning"
        self.state.iteration = 1

    @listen(initialize)
    def process(self):
        self.state.research = f"Research on {self.state.topic}"
        self.state.iteration += 1
        return self.state.research

Router for Conditional Branching

from crewai.flow.flow import Flow, listen, start, router

class ConditionalFlow(Flow):
    @start()
    def evaluate(self):
        # Returns condition result
        return {"score": 85, "passed": True}

    @router(evaluate)
    def route_result(self, result):
        """Route based on evaluation"""
        if result["passed"]:
            return "success"
        return "retry"

    @listen("success")
    def handle_success(self):
        return "Workflow completed successfully"

    @listen("retry")
    def handle_retry(self):
        return "Retrying workflow..."

Parallel Execution with and_/or_

from crewai.flow.flow import Flow, listen, start, and_, or_

class ParallelFlow(Flow):
    @start()
    def task_a(self):
        return "Result A"

    @start()
    def task_b(self):
        return "Result B"

    @listen(and_(task_a, task_b))
    def combine_results(self):
        """Triggers when BOTH complete"""
        return "Combined results"

    @listen(or_(task_a, task_b))
    def first_result(self):
        """Triggers when EITHER completes"""
        return "First result received"

Integrating Crews with Flows

from crewai.flow.flow import Flow, listen, start
from crewai import Crew, Agent, Task

class CrewFlow(Flow):
    @start()
    def prepare_inputs(self):
        return {"topic": "AI Agents", "depth": "detailed"}

    @listen(prepare_inputs)
    def run_research_crew(self, inputs):
        researcher = Agent(
            role="Researcher",
            goal="Research the given topic thoroughly",
            backstory="Expert researcher with domain knowledge"
        )

        task = Task(
            description=f"Research {inputs['topic']} at {inputs['depth']} level",
            expected_output="Comprehensive research report",
            agent=researcher
        )

        crew = Crew(agents=[researcher], tasks=[task])
        result = crew.kickoff()
        return result.raw

MCP Tool Support (1.8+)

CrewAI supports Model Context Protocol (MCP) for external tool integration.

from crewai import Agent

# URL-based MCP server
agent = Agent(
    role="Research Analyst",
    goal="Research and analyze information",
    backstory="Expert analyst",
    mcps=[
        "https://mcp.example.com/mcp?api_key=your_key",
        "crewai-amp:financial-data",  # CrewAI marketplace
        "crewai-amp:research-tools#pubmed_search"  # Specific tool
    ]
)

Transport-Specific Configuration

from crewai import Agent
from crewai.mcp import MCPServerStdio, MCPServerHTTP, MCPServerSSE
from crewai.mcp.tool_filter import create_static_tool_filter

# Local server via stdio
agent = Agent(
    role="File Analyst",
    goal="Analyze local files",
    backstory="File processing expert",
    mcps=[
        MCPServerStdio(
            command="npx",
            args=["-y", "@modelcontextprotocol/server-filesystem"],
            tool_filter=create_static_tool_filter(
                allowed_tool_names=["read_file", "list_directory"]
            )
        )
    ]
)

# Remote HTTP server
agent = Agent(
    role="API Analyst",
    goal="Query external APIs",
    backstory="Integration specialist",
    mcps=[
        MCPServerHTTP(
            url="https://api.example.com/mcp",
            headers={"Authorization": "Bearer token"},
            connect_timeout=60
        )
    ]
)

# Server-Sent Events (streaming)
agent = Agent(
    role="Real-time Analyst",
    goal="Monitor streaming data",
    backstory="Real-time data specialist",
    mcps=[
        MCPServerSSE(
            url="https://stream.example.com/mcp",
            headers={"Authorization": "Bearer token"}
        )
    ]
)

MCPServerAdapter (Advanced)

from crewai import Agent, Crew, Task
from crewai_tools import MCPServerAdapter

# Context manager for manual connection management
with MCPServerAdapter(server_params, connect_timeout=60) as mcp_tools:
    agent = Agent(
        role="MCP Tool User",
        goal="Use MCP tools effectively",
        backstory="Tool specialist",
        tools=mcp_tools,
        verbose=True
    )

    # Or filter specific tools
    filtered_tools = mcp_tools["specific_tool_name"]

Hierarchical Process

from crewai import Agent, Crew, Task, Process

manager = Agent(
    role="Project Manager",
    goal="Coordinate team and ensure deliverables",
    backstory="Senior PM with 10 years experience",
    allow_delegation=True,
    verbose=True
)

researcher = Agent(
    role="Researcher",
    goal="Find accurate information",
    backstory="Expert researcher",
    allow_delegation=False
)

writer = Agent(
    role="Content Writer",
    goal="Create compelling content",
    backstory="Professional writer"
)

crew = Crew(
    agents=[manager, researcher, writer],
    tasks=[research_task, write_task, review_task],
    process=Process.hierarchical,
    manager_llm="gpt-4o",  # Required for hierarchical
    memory=True,
    verbose=True
)

Agent Configuration (1.8+)

from crewai import Agent

agent = Agent(
    # Core identity
    role="Senior Data Scientist",
    goal="Analyze data and provide insights",
    backstory="Expert with 10 years experience",

    # LLM configuration
    llm="gpt-4o",
    function_calling_llm="gpt-4o-mini",  # Cheaper model for tools
    use_system_prompt=True,

    # Execution control
    max_iter=20,
    max_rpm=100,
    max_execution_time=300,  # seconds
    max_retry_limit=2,

    # Advanced features (1.8+)
    reasoning=True,  # Enable reflection before tasks
    max_reasoning_attempts=3,
    multimodal=True,  # Text and visual processing
    inject_date=True,
    date_format="%Y-%m-%d",

    # Memory and context
    memory=True,
    respect_context_window=True,  # Auto-summarize on limit

    # Tools
    tools=[tool1, tool2],
    cache=True,

    # Delegation
    allow_delegation=True,
    verbose=True
)

Task Configuration (1.8+)

Structured Output

from pydantic import BaseModel
from crewai import Task

class ReportOutput(BaseModel):
    title: str
    summary: str
    findings: list[str]
    confidence: float

task = Task(
    description="Analyze market trends and create report",
    expected_output="Structured market analysis report",
    agent=analyst,
    output_pydantic=ReportOutput  # Structured output
)

# Access structured result
result = crew.kickoff()
report = result.pydantic
print(report.title, report.confidence)

Async Task Execution

from crewai import Task

# Parallel research tasks
research_task1 = Task(
    description="Research topic A",
    expected_output="Research findings",
    agent=researcher,
    async_execution=True  # Non-blocking
)

research_task2 = Task(
    description="Research topic B",
    expected_output="Research findings",
    agent=researcher,
    async_execution=True
)

# Dependent task waits for async tasks
synthesis_task = Task(
    description="Synthesize all research",
    expected_output="Integrated analysis",
    agent=analyst,
    context=[research_task1, research_task2]  # Waits for completion
)

Task Guardrails (Validation)

from typing import Any

from crewai import Task
from crewai.tasks import TaskOutput

def validate_length(result: TaskOutput) -> tuple[bool, Any]:
    """Validate output meets requirements"""
    if len(result.raw.split()) < 100:
        return (False, "Content too brief, expand analysis")
    return (True, result.raw)

task = Task(
    description="Write comprehensive analysis",
    expected_output="Detailed analysis (100+ words)",
    agent=writer,
    guardrail=validate_length,
    guardrail_max_retries=3
)

# Multiple guardrails
task = Task(
    description="Generate report",
    expected_output="Validated report",
    agent=analyst,
    guardrails=[
        validate_length,
        validate_sources,
        "Content must be objective and data-driven"  # LLM-based
    ]
)

Human Input Tasks

task = Task(
    description="Review and approve recommendations",
    expected_output="Approved recommendations",
    agent=reviewer,
    human_input=True  # Pauses for human verification
)

Task Callbacks

from crewai.tasks import TaskOutput

def task_callback(output: TaskOutput):
    print(f"Task completed: {output.description}")
    print(f"Result: {output.raw[:100]}...")
    # Send notifications, log metrics, etc.

task = Task(
    description="Analyze data",
    expected_output="Analysis results",
    agent=analyst,
    callback=task_callback
)

Async Execution

Async Crew Kickoff

import asyncio
from crewai import Crew

async def run_crews_parallel():
    crew1 = Crew(agents=[agent1], tasks=[task1])
    crew2 = Crew(agents=[agent2], tasks=[task2])

    # Run multiple crews in parallel
    results = await asyncio.gather(
        crew1.kickoff_async(),
        crew2.kickoff_async()
    )
    return results

# Execute
results = asyncio.run(run_crews_parallel())

Async Flow Kickoff

from crewai.flow.flow import Flow, start, listen

class AsyncFlow(Flow):
    @start()
    async def fetch_data(self):
        # Async operations supported
        data = await external_api.fetch()
        return data

    @listen(fetch_data)
    async def process_data(self, data):
        result = await process_async(data)
        return result

# Async execution
async def main():
    flow = AsyncFlow()
    result = await flow.kickoff_async()
    return result

asyncio.run(main())

Streaming Output

from crewai import Crew

# Enable streaming on crew
crew = Crew(
    agents=[agent1, agent2],
    tasks=[task1, task2],
    stream=True  # Enable real-time output
)

# Stream results
result = crew.kickoff()

# Flow streaming
flow = ExampleFlow()
flow.stream = True
streaming = flow.kickoff()

for chunk in streaming:
    print(chunk.content, end="", flush=True)

final_result = streaming.result

Knowledge Sources (1.8+)

from crewai import Agent, Crew
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
from crewai.knowledge.knowledge_config import KnowledgeConfig

# String knowledge
company_info = StringKnowledgeSource(
    content="Company policies and guidelines..."
)

# PDF knowledge
docs = PDFKnowledgeSource(file_paths=["manual.pdf", "guide.pdf"])

# Web knowledge
web_source = CrewDoclingSource(
    file_paths=["https://example.com/docs"]
)

# Configure retrieval
config = KnowledgeConfig(
    results_limit=10,      # Documents returned (default: 3)
    score_threshold=0.5    # Relevance minimum (default: 0.35)
)

# Agent-level knowledge
agent = Agent(
    role="Support Agent",
    goal="Answer questions using company knowledge",
    backstory="Expert support representative",
    knowledge_sources=[company_info]
)

# Crew-level knowledge (all agents)
crew = Crew(
    agents=[agent1, agent2],
    tasks=[task1, task2],
    knowledge_sources=[docs, web_source]
)

Memory Configuration

from crewai import Crew
from crewai.memory import ShortTermMemory, LongTermMemory, EntityMemory

# Simple memory
crew = Crew(
    agents=[agent1, agent2],
    tasks=[task1, task2],
    memory=True  # Enable all memory types
)

# Custom memory configuration
crew = Crew(
    agents=[agent1, agent2],
    tasks=[task1, task2],
    short_term_memory=ShortTermMemory(),
    long_term_memory=LongTermMemory(
        storage=ChromaStorage(collection_name="crew_memory")
    ),
    entity_memory=EntityMemory()
)

Custom Tools

import json

import aiohttp
from crewai.tools import tool

@tool("Search Database")
def search_database(query: str) -> str:
    """Search the internal database for relevant information.

    Args:
        query: The search query string
    """
    results = db.search(query)
    return json.dumps(results)

# Async tool
@tool("Fetch API Data")
async def fetch_api_data(endpoint: str) -> str:
    """Fetch data from external API asynchronously.

    Args:
        endpoint: API endpoint to query
    """
    async with aiohttp.ClientSession() as session:
        async with session.get(endpoint) as response:
            return await response.text()

# Assign tools to agent
researcher = Agent(
    role="Researcher",
    goal="Find accurate information",
    backstory="Expert researcher",
    tools=[search_database, fetch_api_data],
    verbose=True
)

Decorator-Based Crews (@CrewBase)

from crewai import Agent, Crew, Task, Process
from crewai.project import CrewBase, agent, task, crew

@CrewBase
class ResearchCrew:
    agents_config = 'config/agents.yaml'
    tasks_config = 'config/tasks.yaml'

    @agent
    def researcher(self) -> Agent:
        return Agent(
            config=self.agents_config['researcher'],
            tools=[search_tool]
        )

    @agent
    def analyst(self) -> Agent:
        return Agent(config=self.agents_config['analyst'])

    @task
    def research_task(self) -> Task:
        return Task(config=self.tasks_config['research'])

    @task
    def analysis_task(self) -> Task:
        return Task(
            config=self.tasks_config['analysis'],
            context=[self.research_task()]
        )

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,  # Auto-collected
            tasks=self.tasks,    # Auto-collected
            process=Process.sequential
        )

# Execute
result = ResearchCrew().crew().kickoff(inputs={"topic": "AI Safety"})

Human-in-the-Loop (Flows)

from crewai.flow.flow import Flow, listen, start
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult

class ReviewFlow(Flow):
    @start()
    @human_feedback(
        message="Do you approve this content?",
        emit=["approved", "rejected"],
        llm="gpt-4o-mini"
    )
    def generate_content(self):
        return "Content for human review..."

    @listen("approved")
    def handle_approval(self, result: HumanFeedbackResult):
        print(f"Approved with feedback: {result.feedback}")
        return "Processing approved content"

    @listen("rejected")
    def handle_rejection(self, result: HumanFeedbackResult):
        print(f"Rejected: {result.feedback}")
        return "Revising content"

Configuration Summary

| Feature | Parameter | Default |
|---|---|---|
| Process types | process | sequential, hierarchical |
| Manager LLM | manager_llm | Required for hierarchical |
| Memory | memory | False |
| Streaming | stream | False |
| Verbose | verbose | False |
| Max RPM | max_rpm | Unlimited |
| Planning | planning | False |
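As a sketch, these parameters come together on a single Crew. The agent and task names are assumed to be defined as in the earlier examples, and the values are illustrative rather than recommended defaults.

```python
from crewai import Crew, Process

# Illustrative wiring of the parameters in the table above.
# `researcher`, `writer`, and the tasks are assumed to exist.
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.hierarchical,
    manager_llm="gpt-4o",  # required whenever process is hierarchical
    memory=True,           # default False
    stream=True,           # default False
    max_rpm=60,            # default unlimited
    planning=True,         # default False
    verbose=True,          # default False
)
```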

Best Practices

  1. Use Flows for complex workflows: Multi-step processes benefit from Flows architecture
  2. Prefer decorator-based definition: Use @CrewBase for maintainable crew definitions
  3. Leverage MCP for external tools: Use the simple DSL for quick MCP integration
  4. Enable structured outputs: Use output_pydantic for type-safe results
  5. Add guardrails: Validate outputs with function or LLM-based guardrails
  6. Use async for parallel work: async_execution=True for independent tasks
  7. Configure knowledge sources: Add crew/agent-level knowledge for context
  8. Role clarity: Each agent has distinct, non-overlapping role
  9. Task granularity: One clear deliverable per task
  10. Memory scope: Use short-term for session, long-term for persistent knowledge

Migration from 0.x

| 0.x Pattern | 1.8+ Pattern |
|---|---|
| Manual agent/task lists | @CrewBase with @agent, @task decorators |
| Synchronous only | Async support with kickoff_async() |
| No streaming | stream=True parameter |
| Basic tools | MCP integration with mcps parameter |
| No validation | Task guardrails |
| No flow control | Flows with @start, @listen, @router |

Framework Comparison

Decision matrix for choosing between multi-agent frameworks.

Feature Comparison

| Feature | LangGraph | CrewAI | OpenAI SDK | MS Agent |
|---|---|---|---|---|
| State Management | Excellent | Good | Basic | Good |
| Persistence | Built-in | Plugin | Manual | Built-in |
| Streaming | Native | Limited | Native | Native |
| Human-in-Loop | Native | Manual | Manual | Native |
| Memory | Via Store | Built-in | Manual | Manual |
| Observability | Langfuse/LangSmith | Limited | Tracing | Azure Monitor |
| Learning Curve | Steep | Easy | Medium | Medium |
| Production Ready | Yes | Yes | Yes | Q1 2026 |

Use Case Matrix

| Use Case | Best Framework | Why |
|---|---|---|
| Complex state machines | LangGraph | Native StateGraph, persistence |
| Role-based teams | CrewAI | Built-in delegation, backstories |
| OpenAI-only projects | OpenAI SDK | Native integration, handoffs |
| Enterprise/compliance | MS Agent | Azure integration, A2A |
| Research/experiments | AG2 | Open-source, flexible |
| Quick prototypes | CrewAI | Minimal boilerplate |
| Long-running workflows | LangGraph | Checkpointing, recovery |
| Customer support bots | OpenAI SDK | Handoffs, guardrails |

Decision Tree

Start
  |
  +-- Need complex state machines?
  |     |
  |     +-- Yes --> LangGraph
  |     |
  |     +-- No
  |           |
  +-- Role-based collaboration?
  |     |
  |     +-- Yes --> CrewAI
  |     |
  |     +-- No
  |           |
  +-- OpenAI ecosystem only?
  |     |
  |     +-- Yes --> OpenAI Agents SDK
  |     |
  |     +-- No
  |           |
  +-- Enterprise requirements?
  |     |
  |     +-- Yes --> Microsoft Agent Framework
  |     |
  |     +-- No
  |           |
  +-- Open-source priority?
        |
        +-- Yes --> AG2
        |
        +-- No --> LangGraph (default)
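The tree above reads naturally as a first-match-wins chain. A minimal encoding (the `choose_framework` helper and its flag names are illustrative, not part of any framework):

```python
def choose_framework(
    *,
    complex_state: bool = False,
    role_based: bool = False,
    openai_only: bool = False,
    enterprise: bool = False,
    open_source_priority: bool = False,
) -> str:
    """Walk the decision tree top-down; first matching branch wins."""
    if complex_state:
        return "LangGraph"
    if role_based:
        return "CrewAI"
    if openai_only:
        return "OpenAI Agents SDK"
    if enterprise:
        return "Microsoft Agent Framework"
    if open_source_priority:
        return "AG2"
    return "LangGraph"  # default
```

Because branch order matters (complex state beats role-based collaboration), the chain must mirror the tree's top-down order.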

Migration Paths

From AutoGen to MS Agent Framework

# AutoGen 0.2 (old)
from autogen import AssistantAgent, UserProxyAgent
agent = AssistantAgent(name="assistant", llm_config=config)

# MS Agent Framework (new)
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
model_client = OpenAIChatCompletionClient(model="gpt-5.2")
agent = AssistantAgent(name="assistant", model_client=model_client)

From Custom to LangGraph

# Custom orchestration (old)
async def workflow(task):
    step1 = await agent1.run(task)
    step2 = await agent2.run(step1)
    return step2

# LangGraph (new)
from langgraph.graph import StateGraph, START, END

workflow = StateGraph(State)
workflow.add_node("agent1", agent1_node)
workflow.add_node("agent2", agent2_node)
workflow.add_edge(START, "agent1")
workflow.add_edge("agent1", "agent2")
workflow.add_edge("agent2", END)
app = workflow.compile()  # graph must be compiled before invocation

Cost Considerations

| Framework | Licensing | Infra Cost | LLM Cost |
|---|---|---|---|
| LangGraph | MIT | Self-host / LangGraph Cloud | Any LLM |
| CrewAI | MIT | Self-host | Any LLM |
| OpenAI SDK | MIT | Self-host | OpenAI only |
| MS Agent | MIT | Self-host / Azure | Any LLM |
| AG2 | Apache 2.0 | Self-host | Any LLM |

Performance Characteristics

| Framework | Cold Start | Latency | Throughput |
|---|---|---|---|
| LangGraph | ~100ms | Low | High |
| CrewAI | ~200ms | Medium | Medium |
| OpenAI SDK | ~50ms | Low | High |
| MS Agent | ~150ms | Medium | High |

Team Expertise Requirements

| Framework | Python | LLM | Infra |
|---|---|---|---|
| LangGraph | Expert | Expert | Medium |
| CrewAI | Beginner | Beginner | Low |
| OpenAI SDK | Medium | Medium | Low |
| MS Agent | Medium | Medium | High |

Recommendation Summary

  1. Default choice: LangGraph (most capable, production-proven)
  2. Fastest to prototype: CrewAI (minimal code, intuitive)
  3. OpenAI shops: OpenAI Agents SDK (native integration)
  4. Enterprise: Microsoft Agent Framework (compliance, Azure)
  5. Research: AG2 (open community, experimental features)

GPT-5.2-Codex

OpenAI's specialized agentic coding model (January 2026) optimized for long-horizon software engineering tasks.

Overview

GPT-5.2-Codex is a specialized variant of GPT-5.2 purpose-built for agentic coding workflows. Unlike the general-purpose GPT-5.2, Codex is optimized for:

  • Extended autonomous operation: Hours-long coding sessions without degradation
  • Context compaction: Intelligent summarization for long-running tasks
  • Project-scale understanding: Full codebase comprehension and refactoring
  • Tool reliability: Deterministic file operations and terminal commands

Key Differences from GPT-5.2

| Capability | GPT-5.2 | GPT-5.2-Codex |
|---|---|---|
| Context Window | 256K tokens | 256K + compaction |
| Session Duration | Single request | Hours/days |
| Tool Execution | General | Code-optimized |
| File Operations | Basic | Atomic, rollback-aware |
| Terminal Access | Sandboxed | Full with safety rails |
| Vision | General | Code/diagram-aware |
| Cost per 1M tokens | $2.50/$10 | $5.00/$20 |

Key Capabilities

Long-Horizon Work Through Context Compaction

Codex automatically compacts context during extended sessions, preserving critical information while discarding ephemeral details.

from openai import OpenAI

client = OpenAI()

# Codex maintains context across many tool calls
response = client.chat.completions.create(
    model="gpt-5.2-codex",
    messages=[
        {"role": "system", "content": "You are a senior software engineer."},
        {"role": "user", "content": "Refactor the authentication module to use JWT."}
    ],
    # Codex-specific parameters
    extra_body={
        "codex_config": {
            "compaction_strategy": "semantic",  # semantic, aggressive, minimal
            "preserve_file_state": True,
            "max_session_hours": 8
        }
    }
)

Compaction Strategies:

| Strategy | Use Case | Retention |
|---|---|---|
| semantic | General development | Code structure, decisions, errors |
| aggressive | Very long tasks | Only current focus + critical history |
| minimal | Short tasks | Full context, no compaction |

Project-Scale Tasks

Codex excels at large-scale operations that span entire codebases:

from agents import Agent, tool

# Define codebase navigation tools
@tool
def search_codebase(query: str, file_types: list[str] = None) -> str:
    """Search across the entire codebase for patterns or definitions."""
    # Implementation
    pass

@tool
def apply_refactor(pattern: str, replacement: str, scope: str = "project") -> dict:
    """Apply a refactoring pattern across multiple files with preview."""
    # Returns affected files and changes for approval
    pass

codex_agent = Agent(
    name="refactor-engineer",
    model="gpt-5.2-codex",
    instructions="""You are a senior engineer performing large-scale refactors.

    Guidelines:
    1. Analyze impact before changes
    2. Create rollback points
    3. Run tests after each file change
    4. Document breaking changes""",
    tools=[search_codebase, apply_refactor]
)

Supported Project Tasks:

  • Full codebase migrations (Python 2 to 3, React class to hooks)
  • Dependency upgrades with breaking change resolution
  • Architecture refactors (monolith to microservices)
  • Test coverage expansion across modules
  • Security vulnerability remediation

Enhanced Cybersecurity Capabilities

Codex includes specialized training for security-aware coding:

# Security-focused agent configuration
security_agent = Agent(
    name="security-engineer",
    model="gpt-5.2-codex",
    instructions="""You are a security engineer. When writing or reviewing code:

    1. Identify OWASP Top 10 vulnerabilities
    2. Check for secrets/credentials in code
    3. Validate input sanitization
    4. Review authentication/authorization flows
    5. Check dependency vulnerabilities via CVE databases""",
    extra_config={
        "security_mode": True,  # Enables security-focused reasoning
        "cve_lookup": True      # Real-time CVE database access
    }
)

Security Capabilities:

| Feature | Description |
|---|---|
| Vulnerability Detection | SAST-like scanning during code review |
| CVE Awareness | Real-time vulnerability database lookups |
| Secrets Detection | Identifies hardcoded credentials, API keys |
| Threat Modeling | Suggests security improvements |
| Compliance Hints | GDPR, HIPAA, SOC2 pattern recognition |

Vision for Code Artifacts

Codex processes visual inputs with code-aware understanding:

import base64

# Process architecture diagram
with open("architecture.png", "rb") as f:
    image_data = base64.standard_b64encode(f.read()).decode("utf-8")

response = client.chat.completions.create(
    model="gpt-5.2-codex",
    messages=[
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": "Implement the microservices shown in this architecture diagram."
                },
                {
                    "type": "image_url",
                    "image_url": {"url": f"data:image/png;base64,{image_data}"}
                }
            ]
        }
    ]
)

Vision Use Cases:

  • Architecture diagrams to code scaffolding
  • UI mockups to component implementation
  • Error screenshots to debugging steps
  • Whiteboard sketches to technical specs
  • Database schemas (ERD) to migrations

Benchmark Performance

GPT-5.2-Codex achieves state-of-the-art results on coding benchmarks:

| Benchmark | GPT-5.2 | GPT-5.2-Codex | Previous SOTA |
|---|---|---|---|
| SWE-Bench Pro | 61.2% | 78.4% | 68.1% (Claude Opus 4.6) |
| Terminal-Bench 2.0 | 72.8% | 89.3% | 81.2% (Gemini 2.5) |
| HumanEval+ | 94.1% | 96.8% | 95.2% (GPT-5.2) |
| MBPP+ | 89.7% | 93.2% | 91.4% (Claude Opus 4.6) |
| CodeContests | 45.2% | 58.7% | 52.3% (Gemini 2.5) |

SWE-Bench Pro Notes:

  • Tests real GitHub issues requiring multi-file changes
  • Codex excels at test-writing and edge case handling
  • Strong performance on legacy codebase understanding

Terminal-Bench 2.0 Notes:

  • Tests long-horizon terminal tasks (setup, deploy, debug)
  • Codex maintains coherent state across 50+ commands
  • Superior error recovery and alternative path exploration

API Usage Patterns

Basic Completion

from openai import OpenAI

client = OpenAI()

response = client.chat.completions.create(
    model="gpt-5.2-codex",
    messages=[
        {
            "role": "system",
            "content": "You are an expert Python developer."
        },
        {
            "role": "user",
            "content": "Write a connection pool manager with health checks."
        }
    ],
    temperature=0.2,  # Lower temperature for code generation
    max_tokens=4096
)

print(response.choices[0].message.content)

Streaming with Tool Use

import json

# Define tools
tools = [
    {
        "type": "function",
        "function": {
            "name": "read_file",
            "description": "Read contents of a file",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "File path"}
                },
                "required": ["path"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "write_file",
            "description": "Write contents to a file",
            "parameters": {
                "type": "object",
                "properties": {
                    "path": {"type": "string", "description": "File path"},
                    "content": {"type": "string", "description": "File content"}
                },
                "required": ["path", "content"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "run_command",
            "description": "Execute a shell command",
            "parameters": {
                "type": "object",
                "properties": {
                    "command": {"type": "string", "description": "Command to run"},
                    "timeout": {"type": "integer", "description": "Timeout in seconds"}
                },
                "required": ["command"]
            }
        }
    }
]

# Streaming with tool calls
stream = client.chat.completions.create(
    model="gpt-5.2-codex",
    messages=[
        {"role": "user", "content": "Set up a new FastAPI project with tests"}
    ],
    tools=tools,
    stream=True
)

for chunk in stream:
    if chunk.choices[0].delta.tool_calls:
        tool_call = chunk.choices[0].delta.tool_calls[0]
        print(f"Tool: {tool_call.function.name}")
    elif chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

Async Operations

import asyncio
from openai import AsyncOpenAI

async_client = AsyncOpenAI()

async def refactor_module(module_path: str) -> str:
    response = await async_client.chat.completions.create(
        model="gpt-5.2-codex",
        messages=[
            {
                "role": "system",
                "content": "Refactor code for readability and performance."
            },
            {
                "role": "user",
                "content": f"Refactor the module at {module_path}"
            }
        ],
        extra_body={
            "codex_config": {
                "preserve_behavior": True,
                "add_type_hints": True
            }
        }
    )
    return response.choices[0].message.content

# Parallel refactoring
async def refactor_project(modules: list[str]):
    tasks = [refactor_module(m) for m in modules]
    results = await asyncio.gather(*tasks)
    return dict(zip(modules, results))

Integration with Agents SDK

GPT-5.2-Codex integrates seamlessly with OpenAI Agents SDK 0.6.x:

from agents import Agent, Runner, handoff, tool
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX

# File operation tools
@tool
def read_file(path: str) -> str:
    """Read file contents."""
    with open(path) as f:
        return f.read()

@tool
def write_file(path: str, content: str) -> str:
    """Write content to file."""
    with open(path, "w") as f:
        f.write(content)
    return f"Wrote {len(content)} bytes to {path}"

@tool
def run_tests(path: str = ".") -> str:
    """Run pytest on the specified path."""
    import subprocess
    result = subprocess.run(
        ["pytest", path, "-v", "--tb=short"],
        capture_output=True,
        text=True
    )
    return f"Exit code: {result.returncode}\n{result.stdout}\n{result.stderr}"

# Specialized agents using Codex
architect_agent = Agent(
    name="architect",
    model="gpt-5.2-codex",
    instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a software architect. Analyze requirements and design solutions.
Hand off to implementer for coding tasks.
Hand off to reviewer for code review.""",
    tools=[read_file]
)

implementer_agent = Agent(
    name="implementer",
    model="gpt-5.2-codex",
    instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a senior developer. Implement designs with clean, tested code.
Hand off to reviewer when implementation is complete.""",
    tools=[read_file, write_file, run_tests]
)

reviewer_agent = Agent(
    name="reviewer",
    model="gpt-5.2-codex",
    instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a code reviewer. Check for bugs, security issues, and style.
Request changes from implementer if needed.
Approve and hand back to architect when satisfied.""",
    tools=[read_file]
)

# Wire up handoffs
architect_agent.handoffs = [
    handoff(agent=implementer_agent),
    handoff(agent=reviewer_agent)
]
implementer_agent.handoffs = [
    handoff(agent=reviewer_agent),
    handoff(agent=architect_agent)
]
reviewer_agent.handoffs = [
    handoff(agent=implementer_agent),
    handoff(agent=architect_agent)
]

# Run development workflow
async def develop_feature(requirement: str):
    runner = Runner()
    result = await runner.run(
        architect_agent,
        f"Design and implement: {requirement}"
    )
    return result.final_output

RealtimeRunner for Interactive Sessions

from agents import Agent
from agents.realtime import RealtimeRunner

# Interactive coding session
codex_agent = Agent(
    name="pair-programmer",
    model="gpt-5.2-codex",
    instructions="You are a pair programming partner. Help write and debug code."
)

async def interactive_session():
    async with RealtimeRunner(codex_agent) as runner:
        # Continuous conversation with context preservation
        while True:
            user_input = input("> ")
            if user_input == "exit":
                break

            async for chunk in runner.stream(user_input):
                print(chunk.content, end="", flush=True)
            print()

IDE Integrations

GPT-5.2-Codex powers several IDE integrations:

Cursor

// .cursor/settings.json
{
  "ai.model": "gpt-5.2-codex",
  "ai.features": {
    "composer": true,
    "agent": true,
    "codebaseIndexing": true
  },
  "ai.codex": {
    "sessionDuration": "8h",
    "compactionStrategy": "semantic"
  }
}

Cursor Features with Codex:

  • Composer for multi-file generation
  • Agent mode for autonomous tasks
  • Background indexing for codebase awareness
  • Inline completions with project context

Windsurf (Codeium)

# .windsurf/config.yaml
model:
  provider: openai
  name: gpt-5.2-codex

cascade:
  enabled: true
  max_depth: 10
  auto_apply: false  # Review changes before applying

features:
  flows: true        # Multi-step guided workflows
  supercomplete: true
  terminal_agent: true

Windsurf Features:

  • Cascade for chained operations
  • Flows for guided development
  • Terminal integration for full-stack tasks

GitHub Copilot Workspace

# .github/copilot-workspace.yml
model: gpt-5.2-codex

workspace:
  scope: repository
  features:
    - issue-to-pr
    - multi-file-edit
    - test-generation

review:
  auto_suggest: true
  security_scan: true

Copilot Workspace Features:

  • Issue-to-PR automation
  • Multi-repository awareness
  • Integrated CI feedback

Factory (VSCode Extension)

// .vscode/settings.json
{
  "factory.model": "gpt-5.2-codex",
  "factory.drafter": {
    "enabled": true,
    "autoContext": true
  },
  "factory.pilot": {
    "enabled": true,
    "approvalRequired": true
  }
}

When to Use Codex vs Standard GPT-5.2

Use GPT-5.2-Codex When:

| Scenario | Why Codex |
|---|---|
| Multi-file refactors | Project-scale context management |
| Long debugging sessions | Context compaction prevents degradation |
| Security reviews | Specialized vulnerability detection |
| Test generation at scale | Understands test patterns across codebase |
| Architecture migrations | Maintains coherence across many changes |
| CI/CD pipeline work | Terminal-optimized tool execution |

Use Standard GPT-5.2 When:

| Scenario | Why Standard |
|---|---|
| Single-file tasks | No need for compaction overhead |
| Code explanation | General language understanding sufficient |
| Quick prototypes | Faster, cheaper for short tasks |
| Non-code tasks | Writing docs, emails, general Q&A |
| Cost-sensitive workloads | 50% cheaper than Codex |

Decision Matrix

Task Duration > 1 hour?
  |
  +-- Yes --> GPT-5.2-Codex
  |
  +-- No
        |
        +-- Multiple files affected?
        |     |
        |     +-- Yes --> GPT-5.2-Codex
        |     |
        |     +-- No
        |           |
        |           +-- Security review needed?
        |           |     |
        |           |     +-- Yes --> GPT-5.2-Codex
        |           |     |
        |           |     +-- No --> GPT-5.2 (standard)
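The branching above can be captured in a small routing helper. This is a sketch; the model names follow this guide's examples, and the thresholds mirror the matrix exactly:

```python
def choose_model(duration_hours: float, files_affected: int,
                 needs_security_review: bool) -> str:
    """Walk the decision matrix: long, multi-file, or security-sensitive
    tasks go to Codex; everything else uses standard GPT-5.2."""
    if duration_hours > 1:
        return "gpt-5.2-codex"
    if files_affected > 1:
        return "gpt-5.2-codex"
    if needs_security_review:
        return "gpt-5.2-codex"
    return "gpt-5.2"
```

For example, `choose_model(0.5, 1, False)` returns `"gpt-5.2"`, while any single "yes" branch routes to Codex.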

Pricing Considerations

Token Pricing (January 2026)

| Model | Input (per 1M) | Output (per 1M) | Cached Input |
|---|---|---|---|
| gpt-5.2 | $2.50 | $10.00 | $1.25 |
| gpt-5.2-codex | $5.00 | $20.00 | $2.50 |
| gpt-5.2-mini | $0.15 | $0.60 | $0.075 |
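Per-request cost estimates follow directly from the table above. A sketch with the January 2026 prices hard-coded, including the cached-input discount:

```python
# Price per 1M tokens (January 2026), from the table above
PRICES = {
    "gpt-5.2":       {"input": 2.50, "output": 10.00, "cached_input": 1.25},
    "gpt-5.2-codex": {"input": 5.00, "output": 20.00, "cached_input": 2.50},
    "gpt-5.2-mini":  {"input": 0.15, "output": 0.60,  "cached_input": 0.075},
}

def estimate_cost(model: str, input_tokens: int, output_tokens: int,
                  cached_input_tokens: int = 0) -> float:
    """Estimated cost of one request in USD."""
    p = PRICES[model]
    return (
        (input_tokens - cached_input_tokens) * p["input"]
        + cached_input_tokens * p["cached_input"]
        + output_tokens * p["output"]
    ) / 1_000_000
```

For example, `estimate_cost("gpt-5.2-codex", 40_000, 10_000)` gives $0.40 for a 40K-in/10K-out request.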

Cost Optimization Strategies

# 1. Use caching for repeated context
response = client.chat.completions.create(
    model="gpt-5.2-codex",
    messages=messages,
    extra_body={
        "cache_control": {
            "system_prompt": "ephemeral",  # Cache system prompt
            "file_contents": "persistent"   # Cache file reads
        }
    }
)

# 2. Batch similar operations
# Instead of separate calls per file:
files_to_refactor = ["auth.py", "users.py", "api.py"]
response = client.chat.completions.create(
    model="gpt-5.2-codex",
    messages=[
        {"role": "user", "content": f"Refactor these files: {files_to_refactor}"}
    ]
)

# 3. Use gpt-5.2-mini for preprocessing
# Filter/classify tasks before sending to Codex
classification = client.chat.completions.create(
    model="gpt-5.2-mini",
    messages=[{"role": "user", "content": f"Is this task complex? {task}"}]
)
if "complex" in classification.choices[0].message.content.lower():
    # Use Codex for complex tasks
    use_model = "gpt-5.2-codex"
else:
    # Use standard for simple tasks
    use_model = "gpt-5.2"

Estimated Costs by Task Type

| Task | Est. Tokens | Codex Cost | Standard Cost |
|---|---|---|---|
| Single file fix | ~5K | $0.12 | $0.06 |
| Module refactor | ~50K | $1.25 | $0.63 |
| Full codebase migration | ~500K | $12.50 | $6.25 |
| 8-hour dev session | ~2M | $50.00 | $25.00 |

Note: Codex typically requires fewer iterations for complex tasks, often making total cost comparable to standard GPT-5.2.

Configuration Reference

Codex-Specific Parameters

response = client.chat.completions.create(
    model="gpt-5.2-codex",
    messages=messages,
    extra_body={
        "codex_config": {
            # Context management
            "compaction_strategy": "semantic",  # semantic, aggressive, minimal
            "preserve_file_state": True,        # Remember file contents
            "max_session_hours": 8,             # Session duration limit

            # Code behavior
            "preserve_behavior": True,          # Ensure refactors don't change behavior
            "add_type_hints": True,             # Add type hints when refactoring
            "follow_style_guide": "project",    # project, google, pep8

            # Safety
            "security_mode": True,              # Enable security scanning
            "dry_run": False,                   # Preview changes without applying
            "require_tests": True,              # Require test coverage for changes

            # Tools
            "shell_timeout": 300,               # Max seconds for shell commands
            "file_size_limit": 1048576          # Max file size to read (1MB)
        }
    }
)

Best Practices

  1. Start with clear goals: Define what "done" looks like upfront
  2. Provide project context: Include README, architecture docs, coding standards
  3. Use semantic compaction: Best balance of context and performance
  4. Enable security mode: Catch vulnerabilities during development
  5. Set session limits: Prevent runaway costs with max_session_hours
  6. Review before applying: Use dry_run for large refactors
  7. Batch related operations: Reduce API calls by grouping similar tasks
  8. Cache file contents: Use persistent caching for frequently read files

LangGraph Implementation: Multi-Scenario Orchestration

Complete Python implementation of the multi-scenario orchestration pattern using LangGraph 1.0.6+.

1. State Definition

from typing import TypedDict, Annotated, Literal
from dataclasses import dataclass, field, asdict
from operator import add
import time
from datetime import datetime

@dataclass
class ScenarioProgress:
    """Track execution state for one scenario."""
    scenario_id: str
    status: Literal["pending", "running", "paused", "complete", "failed"] = "pending"
    progress_pct: float = 0.0

    # Milestones
    milestones_reached: list[str] = field(default_factory=list)
    current_milestone: str = "start"

    # Timing
    start_time_ms: int = 0
    elapsed_ms: int = 0
    elapsed_checkpoints: dict = field(default_factory=dict)  # {milestone: time_ms}

    # Metrics
    memory_used_mb: int = 0
    items_processed: int = 0
    batch_count: int = 0

    # Results
    partial_results: list[dict] = field(default_factory=list)
    quality_scores: dict = field(default_factory=dict)

    # Errors
    errors: list[dict] = field(default_factory=list)

    def to_dict(self):
        return asdict(self)

@dataclass
class ScenarioDefinition:
    """Configuration for one scenario."""
    name: str  # "simple", "medium", "complex"
    difficulty: Literal["easy", "intermediate", "advanced"]
    complexity_multiplier: float  # 1.0, 3.0, 8.0

    # Inputs
    input_size: int
    dataset_characteristics: dict  # {"distribution": "uniform"}

    # Constraints
    time_budget_seconds: int
    memory_limit_mb: int
    error_tolerance: float  # 0-1

    # Skill params
    skill_params: dict

    # Expectations
    expected_quality: Literal["basic", "good", "excellent"]
    quality_metrics: list[str]

    def to_dict(self):
        return asdict(self)

class ScenarioOrchestratorState(TypedDict, total=False):
    """State for the entire orchestration."""

    # Orchestration metadata
    orchestration_id: str
    start_time_unix: int
    skill_name: str
    skill_version: str

    # Scenario definitions
    scenario_simple: ScenarioDefinition
    scenario_medium: ScenarioDefinition
    scenario_complex: ScenarioDefinition

    # Progress tracking
    progress_simple: ScenarioProgress
    progress_medium: ScenarioProgress
    progress_complex: ScenarioProgress

    # Synchronization
    sync_points: dict  # {milestone: bool}
    last_sync_time: int

    # Aggregated results
    final_results: dict

2. Node Implementations

Supervisor Node

from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send

async def scenario_supervisor(state: ScenarioOrchestratorState) -> dict:
    """
    Initialize progress tracking for all 3 scenarios.

    The fan-out to parallel workers is wired in the graph's conditional
    edges (see Graph Construction), so this node only seeds state.
    """
    print(f"[SUPERVISOR] Starting orchestration {state['orchestration_id']}")

    # Initialize progress for each scenario
    updates = {}
    for scenario_id in ["simple", "medium", "complex"]:
        updates[f"progress_{scenario_id}"] = ScenarioProgress(
            scenario_id=scenario_id,
            status="pending",
            start_time_ms=int(time.time() * 1000)
        )
    return updates

async def scenario_worker(state: ScenarioOrchestratorState) -> dict:
    """
    Execute one scenario (simple, medium, or complex).

    Receives scenario_id from supervisor via Send.
    """
    scenario_id = state.get("scenario_id")
    progress = state[f"progress_{scenario_id}"]
    scenario_def = state[f"scenario_{scenario_id}"]

    print(f"[SCENARIO {scenario_id.upper()}] Starting ({scenario_def.complexity_multiplier}x complexity)")

    progress.status = "running"
    progress.start_time_ms = int(time.time() * 1000)

    try:
        # Execute skill for this scenario
        result = await execute_skill_with_milestones(
            skill_name=state["skill_name"],
            scenario_def=scenario_def,
            progress=progress,
            state=state
        )

        progress.status = "complete"
        progress.elapsed_ms = int(time.time() * 1000) - progress.start_time_ms
        progress.partial_results.append(result)

        print(f"[SCENARIO {scenario_id.upper()}] Complete in {progress.elapsed_ms}ms")

        return {f"progress_{scenario_id}": progress}

    except Exception as e:
        progress.status = "failed"
        progress.errors.append({
            "timestamp": datetime.now().isoformat(),
            "message": str(e),
            "severity": "error"
        })
        print(f"[SCENARIO {scenario_id.upper()}] Failed: {e}")

        return {f"progress_{scenario_id}": progress}


async def execute_skill_with_milestones(
    skill_name: str,
    scenario_def: ScenarioDefinition,
    progress: ScenarioProgress,
    state: ScenarioOrchestratorState
) -> dict:
    """
    Execute skill, recording milestones and checkpoints.

    This is where you call YOUR SKILL.
    """

    milestones = [0, 30, 50, 70, 90, 100]  # Percentage checkpoints
    results = {"batches": [], "quality": {}}

    input_items = generate_test_data(
        size=scenario_def.input_size,
        characteristics=scenario_def.dataset_characteristics
    )

    batch_size = scenario_def.skill_params.get("batch_size", 10)

    for batch_idx, batch in enumerate(chunks(input_items, batch_size)):
        # Execute skill on this batch
        # Replace this with your actual skill invocation
        batch_result = await invoke_skill(
            skill_name=skill_name,
            input_data=batch,
            params=scenario_def.skill_params
        )

        results["batches"].append(batch_result)
        progress.batch_count += 1
        progress.items_processed += len(batch)

        # Update progress percentage
        progress.progress_pct = (progress.items_processed / scenario_def.input_size) * 100

        # Check if we've reached a milestone
        reached_milestones = [m for m in milestones if m <= progress.progress_pct]
        new_milestones = [m for m in reached_milestones if m not in progress.milestones_reached]

        for milestone in new_milestones:
            progress.milestones_reached.append(milestone)
            elapsed = int(time.time() * 1000) - progress.start_time_ms
            progress.elapsed_checkpoints[f"milestone_{milestone}"] = elapsed

            print(f"  [{progress.scenario_id}] Reached {milestone}% at {elapsed}ms")

            # Optional: Wait for other scenarios at major milestones
            if milestone in [30, 70]:
                await synchronize_at_milestone(milestone, state)

    # Score results
    results["quality"] = calculate_quality_metrics(results["batches"], scenario_def.quality_metrics)
    progress.quality_scores = results["quality"]

    return results

Synchronization Node

import asyncio

async def synchronize_at_milestone(
    milestone_pct: int,
    state: ScenarioOrchestratorState,
    timeout_seconds: int = 30
) -> bool:
    """
    Optional: Wait for other scenarios at major milestones.

    Returns True if all scenarios reached milestone, False if timeout.
    """

    start = time.time()
    milestone_key = f"checkpoint_{milestone_pct}"

    while time.time() - start < timeout_seconds:
        simple_at_milestone = milestone_pct in state["progress_simple"].milestones_reached
        medium_at_milestone = milestone_pct in state["progress_medium"].milestones_reached
        complex_at_milestone = milestone_pct in state["progress_complex"].milestones_reached

        all_reached = simple_at_milestone and medium_at_milestone and complex_at_milestone

        if all_reached:
            state["sync_points"][milestone_key] = True
            print(f"[SYNC] All scenarios reached {milestone_pct}%")
            return True

        # Check if any scenario failed
        if any(state[f"progress_{s}"].status == "failed" for s in ["simple", "medium", "complex"]):
            print(f"[SYNC] A scenario failed, proceeding without sync")
            return False

        await asyncio.sleep(0.5)

    print(f"[SYNC] Timeout at {milestone_pct}%, proceeding")
    return False

Aggregator Node

async def scenario_aggregator(state: ScenarioOrchestratorState) -> dict:
    """
    Collect all scenario results and synthesize findings.
    """

    print("[AGGREGATOR] Combining results from all scenarios")

    aggregated = {
        "orchestration_id": state["orchestration_id"],
        "skill": state["skill_name"],
        "timestamp": datetime.now().isoformat(),

        # Raw results
        "results_by_scenario": {
            "simple": state["progress_simple"].partial_results[-1] if state["progress_simple"].partial_results else {},
            "medium": state["progress_medium"].partial_results[-1] if state["progress_medium"].partial_results else {},
            "complex": state["progress_complex"].partial_results[-1] if state["progress_complex"].partial_results else {},
        },

        # Metrics
        "metrics": {},

        # Comparison
        "comparison": {},

        # Recommendations
        "recommendations": []
    }

    # Calculate comparative metrics
    for scenario_id in ["simple", "medium", "complex"]:
        progress = state[f"progress_{scenario_id}"]

        aggregated["metrics"][scenario_id] = {
            "elapsed_ms": progress.elapsed_ms,
            "items_processed": progress.items_processed,
            "quality_scores": progress.quality_scores,
            "errors": len(progress.errors)
        }

    # Compare quality vs. complexity
    simple_quality = state["progress_simple"].quality_scores.get("overall", 0)
    medium_quality = state["progress_medium"].quality_scores.get("overall", 0)
    complex_quality = state["progress_complex"].quality_scores.get("overall", 0)

    aggregated["comparison"]["quality_ranking"] = {
        "best": max(
            ("simple", simple_quality),
            ("medium", medium_quality),
            ("complex", complex_quality),
            key=lambda x: x[1]
        )[0],
        "scores": {
            "simple": simple_quality,
            "medium": medium_quality,
            "complex": complex_quality
        }
    }

    # Time complexity analysis
    simple_time = state["progress_simple"].elapsed_ms
    medium_time = state["progress_medium"].elapsed_ms
    complex_time = state["progress_complex"].elapsed_ms

    # Items per scenario (input_size scales with the complexity multiplier)
    simple_size = state["scenario_simple"].input_size
    medium_size = state["scenario_medium"].input_size
    complex_size = state["scenario_complex"].input_size

    aggregated["comparison"]["time_per_item_ms"] = {
        "simple": simple_time / simple_size,
        "medium": medium_time / medium_size,
        "complex": complex_time / complex_size,
    }

    # Identify scaling issues (compare per-item time at high vs. low load)
    if complex_time / complex_size > simple_time / simple_size * 2:
        aggregated["recommendations"].append("Superlinear scaling: per-item time grows with load; investigate bottlenecks")
    elif complex_time / complex_size < simple_time / simple_size * 0.8:
        aggregated["recommendations"].append("Sublinear scaling: excellent performance with increased load")

    # Scenarios that completed cleanly
    success_patterns = []
    for scenario_id in ["simple", "medium", "complex"]:
        progress = state[f"progress_{scenario_id}"]
        if progress.status == "complete" and not progress.errors:
            success_patterns.append(scenario_id)

    aggregated["recommendations"].append(
        f"Completed without errors: {', '.join(success_patterns) or 'none'}"
    )

    return {"final_results": aggregated}

3. Graph Construction

from typing import Any

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import Send

def build_scenario_orchestrator(
    checkpointer: PostgresSaver | None = None
) -> Any:
    """
    Build the complete orchestration graph.
    """

    graph = StateGraph(ScenarioOrchestratorState)

    # Nodes
    graph.add_node("supervisor", scenario_supervisor)
    graph.add_node("scenario_worker", scenario_worker)
    graph.add_node("aggregator", scenario_aggregator)

    # Edges
    graph.add_edge(START, "supervisor")

    # Fan-out: the conditional edge dispatches Send commands to 3 parallel
    # workers, each carrying its scenario_id
    graph.add_conditional_edges(
        "supervisor",
        lambda state: [
            Send("scenario_worker", {"scenario_id": sid, **state})
            for sid in ["simple", "medium", "complex"]
        ]
    )

    # Workers converge at aggregator
    graph.add_edge("scenario_worker", "aggregator")
    graph.add_edge("aggregator", END)

    # Compile with checkpointing
    return graph.compile(checkpointer=checkpointer)

4. Invocation Example

import asyncio
import uuid
from langgraph.checkpoint.postgres import PostgresSaver

async def main():
    # Setup checkpointing
    checkpointer = PostgresSaver.from_conn_string(
        "postgresql://user:password@localhost/orchestkit"
    )

    # Build orchestrator
    app = build_scenario_orchestrator(checkpointer=checkpointer)

    # Prepare initial state
    initial_state: ScenarioOrchestratorState = {
        "orchestration_id": f"demo-{uuid.uuid4().hex[:8]}",
        "start_time_unix": int(time.time()),
        "skill_name": "your-skill-name",
        "skill_version": "1.0.0",

        # Scenarios
        "scenario_simple": ScenarioDefinition(
            name="simple",
            difficulty="easy",
            complexity_multiplier=1.0,
            input_size=100,
            dataset_characteristics={"distribution": "uniform"},
            time_budget_seconds=30,
            memory_limit_mb=256,
            error_tolerance=0.0,
            skill_params={"batch_size": 10, "cache_enabled": True},
            expected_quality="basic",
            quality_metrics=["accuracy", "coverage"]
        ),
        "scenario_medium": ScenarioDefinition(
            name="medium",
            difficulty="intermediate",
            complexity_multiplier=3.0,
            input_size=300,
            dataset_characteristics={"distribution": "uniform"},
            time_budget_seconds=90,
            memory_limit_mb=512,
            error_tolerance=0.05,
            skill_params={"batch_size": 50, "cache_enabled": True},
            expected_quality="good",
            quality_metrics=["accuracy", "coverage"]
        ),
        "scenario_complex": ScenarioDefinition(
            name="complex",
            difficulty="advanced",
            complexity_multiplier=8.0,
            input_size=800,
            dataset_characteristics={"distribution": "skewed"},
            time_budget_seconds=300,
            memory_limit_mb=1024,
            error_tolerance=0.1,
            skill_params={"batch_size": 100, "cache_enabled": True, "parallel_workers": 4},
            expected_quality="excellent",
            quality_metrics=["accuracy", "coverage", "latency"]
        ),

        # Progress tracking
        "progress_simple": ScenarioProgress(scenario_id="simple"),
        "progress_medium": ScenarioProgress(scenario_id="medium"),
        "progress_complex": ScenarioProgress(scenario_id="complex"),

        # Synchronization
        "sync_points": {},
        "last_sync_time": 0,
    }

    # Run with thread_id for checkpointing
    config = {"configurable": {"thread_id": f"orch-{initial_state['orchestration_id']}"}}

    print("Starting multi-scenario orchestration...")
    result = await app.ainvoke(initial_state, config=config)

    # Print results
    final = result["final_results"]
    print("\n" + "="*60)
    print("ORCHESTRATION RESULTS")
    print("="*60)
    print(f"Orchestration ID: {final['orchestration_id']}")
    print(f"Skill: {final['skill']}")
    print("\nQuality Comparison:")
    for scenario, score in final["comparison"]["quality_ranking"]["scores"].items():
        print(f"  {scenario}: {score:.2f}")
    print("\nTime per Item (ms):")
    for scenario, per_item_ms in final["comparison"]["time_per_item_ms"].items():
        print(f"  {scenario}: {per_item_ms:.2f}ms")
    print("\nRecommendations:")
    for rec in final["recommendations"]:
        print(f"  • {rec}")

if __name__ == "__main__":
    asyncio.run(main())

5. Helper Functions

import asyncio
import random
from datetime import datetime

def chunks(items: list, size: int):
    """Split items into fixed-size chunks."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

def generate_test_data(size: int, characteristics: dict) -> list:
    """Generate test data based on scenario characteristics."""
    distribution = characteristics.get("distribution", "uniform")

    if distribution == "uniform":
        return [{"id": i, "value": random.random()} for i in range(size)]
    elif distribution == "skewed":
        # Power-law-like skew toward small values
        return [
            {"id": i, "value": random.random() ** 2}
            for i in range(size)
        ]
    else:
        return [{"id": i, "value": random.random()} for i in range(size)]

async def invoke_skill(
    skill_name: str,
    input_data: list,
    params: dict
) -> dict:
    """
    Invoke your skill here.

    Replace with actual skill invocation.
    """
    # Simulate processing
    await asyncio.sleep(0.1)  # 100ms per batch

    return {
        "processed": len(input_data),
        "quality_score": 0.85 + (random.random() * 0.15),
        "timestamp": datetime.now().isoformat()
    }

def calculate_quality_metrics(batches: list, metrics: list[str]) -> dict:
    """Calculate quality metrics across batches."""
    if not batches:
        return {metric: 0.0 for metric in metrics}

    scores = {
        "accuracy": sum(b.get("quality_score", 0) for b in batches) / len(batches),
        "coverage": 1.0,
    }

    return {metric: scores.get(metric, 0.0) for metric in metrics}

6. Streaming Results (Real-time Progress)

async def stream_orchestration_progress(
    app,
    initial_state: ScenarioOrchestratorState,
    config: dict
):
    """
    Stream progress updates as scenarios execute.
    """

    async for step in app.astream(initial_state, config=config, stream_mode="updates"):
        print(f"\n[UPDATE] {step}")

        # Extract progress from step
        if "progress_simple" in step:
            p = step["progress_simple"]
            print(f"  Simple: {p.progress_pct:.1f}% ({p.items_processed} items)")

        if "progress_medium" in step:
            p = step["progress_medium"]
            print(f"  Medium: {p.progress_pct:.1f}% ({p.items_processed} items)")

        if "progress_complex" in step:
            p = step["progress_complex"]
            print(f"  Complex: {p.progress_pct:.1f}% ({p.items_processed} items)")

Key Features

  1. Fan-Out/Fan-In: All 3 scenarios execute in parallel
  2. Milestone Tracking: Progress recorded at key checkpoints
  3. Synchronization: Optional wait points at 30% and 70%
  4. Error Isolation: One scenario's failure doesn't block others
  5. Checkpointing: State saved to PostgreSQL for recovery
  6. Aggregation: Cross-scenario analysis and recommendations
  7. Streaming: Real-time progress updates
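Error isolation (feature 4) does not depend on LangGraph itself; it falls out of `asyncio.gather(return_exceptions=True)`, as this self-contained sketch shows:

```python
import asyncio

async def run_scenario(scenario_id: str) -> str:
    # For illustration, the "medium" scenario fails; the others complete
    if scenario_id == "medium":
        raise RuntimeError(f"{scenario_id} hit its error budget")
    return f"{scenario_id}: complete"

async def run_all_isolated() -> dict:
    ids = ["simple", "medium", "complex"]
    # return_exceptions=True keeps one failure from cancelling the others
    results = await asyncio.gather(
        *(run_scenario(s) for s in ids), return_exceptions=True
    )
    return {
        s: ("failed" if isinstance(r, Exception) else "complete")
        for s, r in zip(ids, results)
    }

statuses = asyncio.run(run_all_isolated())
# statuses == {"simple": "complete", "medium": "failed", "complex": "complete"}
```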

Testing

import pytest

@pytest.mark.asyncio
async def test_multi_scenario_orchestration():
    # Mock checkpointer
    from langgraph.checkpoint.memory import MemorySaver

    app = build_scenario_orchestrator(checkpointer=MemorySaver())

    initial_state = {...}  # Setup
    config = {"configurable": {"thread_id": "test-123"}}

    result = await app.ainvoke(initial_state, config=config)

    assert result["final_results"]["orchestration_id"]
    assert "simple" in result["final_results"]["metrics"]
    assert "medium" in result["final_results"]["metrics"]
    assert "complex" in result["final_results"]["metrics"]

Microsoft Agent Framework

Microsoft Agent Framework (AutoGen + Semantic Kernel merger) patterns for enterprise multi-agent systems.

AssistantAgent Setup

import os

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create model client
model_client = OpenAIChatCompletionClient(
    model="gpt-5.2",
    api_key=os.environ["OPENAI_API_KEY"]
)

# Define assistant agent
assistant = AssistantAgent(
    name="assistant",
    description="A helpful AI assistant",
    model_client=model_client,
    system_message="You are a helpful assistant. Answer questions concisely."
)

Team Patterns

Round Robin Chat

from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination

# Define team members
planner = AssistantAgent(
    name="planner",
    description="Plans tasks",
    model_client=model_client,
    system_message="You plan tasks. When done, say 'PLAN_COMPLETE'."
)

executor = AssistantAgent(
    name="executor",
    description="Executes tasks",
    model_client=model_client,
    system_message="You execute the plan. When done, say 'EXECUTION_COMPLETE'."
)

reviewer = AssistantAgent(
    name="reviewer",
    description="Reviews work",
    model_client=model_client,
    system_message="Review the work. Say 'APPROVED' if satisfactory."
)

# Create team with termination
termination = TextMentionTermination("APPROVED")
team = RoundRobinGroupChat(
    participants=[planner, executor, reviewer],
    termination_condition=termination
)

# Run team
result = await team.run(task="Create a marketing strategy")

Selector Group Chat

from autogen_agentchat.teams import SelectorGroupChat

# Selector chooses next speaker based on context
team = SelectorGroupChat(
    participants=[analyst, writer, reviewer],
    model_client=model_client,  # For selection decisions
    termination_condition=termination
)

Tool Integration

import json

from autogen_core.tools import FunctionTool

# Define tool function
def search_database(query: str) -> str:
    """Search the database for information."""
    results = db.search(query)
    return json.dumps(results)

# Create tool
search_tool = FunctionTool(search_database, description="Search the database")

# Agent with tools
researcher = AssistantAgent(
    name="researcher",
    description="Researches information",
    model_client=model_client,
    tools=[search_tool],
    system_message="Use the search tool to find information."
)

Termination Conditions

from autogen_agentchat.conditions import (
    TextMentionTermination,
    MaxMessageTermination,
    TokenUsageTermination,
    TimeoutTermination
)

# Combine termination conditions with | (OR) or & (AND)
termination = (
    TextMentionTermination("DONE")
    | MaxMessageTermination(max_messages=20)
    | TimeoutTermination(timeout_seconds=300)
)

Streaming

# Stream team responses
async for message in team.run_stream(task="Analyze this data"):
    print(f"{message.source}: {message.content}")

State Management

from autogen_agentchat.state import TeamState

# Save state
state = await team.save_state()

# Restore state
await team.load_state(state)

# Resume conversation
result = await team.run(task="Continue from where we left off")

Agent-to-Agent Protocol (A2A)

from autogen_agentchat.protocols import A2AProtocol

# Enable A2A for cross-organization agent communication
protocol = A2AProtocol(
    agent=my_agent,
    endpoint="https://api.example.com/agent",
    auth_token=os.environ["A2A_TOKEN"]
)

# Send message to external agent
response = await protocol.send(
    to="external-agent-id",
    message="Process this request"
)

Migration from AutoGen 0.2

# Old AutoGen 0.2 pattern
# from autogen import AssistantAgent, UserProxyAgent

# New AutoGen 0.4+ pattern
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat

# Key differences:
# - No UserProxyAgent needed for simple tasks
# - Teams replace GroupChat
# - Explicit termination conditions required
# - Model client separate from agent

Configuration

  • Model clients: OpenAI, Azure OpenAI, Anthropic supported
  • Teams: RoundRobin, Selector, Custom
  • Termination: Text mention, max messages, timeout, token usage
  • Tools: FunctionTool wrapper for Python functions
  • State: Full state serialization for persistence

Best Practices

  1. Termination conditions: Always set explicit termination
  2. Team size: 3-5 agents optimal for most workflows
  3. System messages: Clear role definitions in system_message
  4. Tool design: One function per tool, clear descriptions
  5. Error handling: Use try/except around team.run()
  6. Streaming: Use run_stream() for real-time feedback

OpenAI Agents SDK

OpenAI Agents SDK (v0.7.0) patterns for handoffs, guardrails, agents-as-tools, sessions, MCP servers, and tracing.

Requirements

# Install (requires Python 3.9+, supports up to 3.14)
pip install openai-agents>=0.7.0

# Note: Requires openai v2.x (v1.x no longer supported)
# openai>=2.9.0,<3 is required

Basic Agent Definition

from agents import Agent, Runner

agent = Agent(
    name="assistant",
    instructions="You are a helpful assistant that answers questions.",
    model="gpt-5.2"
)

# Synchronous run
runner = Runner()
result = runner.run_sync(agent, "What is the capital of France?")
print(result.final_output)

Sessions (v0.6.6+)

Sessions provide automatic conversation history management across agent runs.

from agents import Agent, Runner
from agents.sessions import SQLiteSession

# Create a session store
session = SQLiteSession(db_path="conversations.db")

# Agent with automatic history management
agent = Agent(
    name="assistant",
    instructions="You are a helpful assistant.",
    model="gpt-5.2"
)

runner = Runner()

# Sessions automatically:
# - Retrieve conversation history before each run
# - Store new messages after each run
result = await runner.run(
    agent,
    "Remember my name is Alice",
    session=session,
    session_id="user-123"
)

# Later conversation - history is automatic
result = await runner.run(
    agent,
    "What is my name?",  # Agent recalls "Alice"
    session=session,
    session_id="user-123"
)

Session Types

from agents.sessions import (
    SQLiteSession,          # File-based persistence
    AsyncSQLiteSession,     # Async SQLite (v0.6.6+)
    SQLAlchemySession,      # Database-agnostic ORM
    RedisSession,           # Redis-backed sessions
    EncryptedSession,       # Encrypted storage
)

# AsyncSQLiteSession for async workflows
async_session = AsyncSQLiteSession(db_path="async_conversations.db")

# Auto-compaction for long conversations (v0.6.6+)
# Use responses.compact: "auto" to manage context length

Handoffs Between Agents

from agents import Agent, handoff, RunConfig
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX

# Specialist agents
billing_agent = Agent(
    name="billing",
    instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You handle billing inquiries. Check account status and payment issues.
Hand back to triage when billing issue is resolved.""",
    model="gpt-5.2"
)

support_agent = Agent(
    name="support",
    instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You handle technical support. Troubleshoot issues and provide solutions.
Hand back to triage when support issue is resolved.""",
    model="gpt-5.2"
)

# Triage agent with handoffs
triage_agent = Agent(
    name="triage",
    instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are the first point of contact. Determine the nature of inquiries.
- Billing questions: hand off to billing
- Technical issues: hand off to support
- General questions: answer directly""",
    model="gpt-5.2",
    handoffs=[
        handoff(agent=billing_agent),
        handoff(agent=support_agent)
    ]
)

Handoff History Packaging (v0.7.0)

In v0.7.0, nested handoffs are opt-in (previously enabled by default). When enabled, conversation history is collapsed into a single assistant message wrapped in a <CONVERSATION HISTORY> block.

from agents import RunConfig

# Enable nested handoff history (opt-in in v0.7.0)
config = RunConfig(nest_handoff_history=True)

result = await runner.run(
    triage_agent,
    "I have a billing question",
    run_config=config
)

# History is packaged as a single assistant message:
# <CONVERSATION HISTORY>
# [collapsed prior transcript]
# </CONVERSATION HISTORY>

Handoff Input Filters

from agents import handoff
from agents.extensions.handoff_filters import remove_all_tools

# Filter what the receiving agent sees
billing_handoff = handoff(
    agent=billing_agent,
    input_filter=remove_all_tools  # Strip tool calls from history
)

# Custom input filter
def custom_filter(data):
    # Modify HandoffInputData before passing to receiving agent
    return data

support_handoff = handoff(
    agent=support_agent,
    input_filter=custom_filter,
    nest_handoff_history=True  # Per-handoff override
)

MCPServerManager (v0.7.0)

Manage multiple MCP server instances with improved lifecycle safety.

from agents import Agent
from agents.mcp import MCPServerManager, MCPServerStdio, MCPServerStreamableHTTP

# Create MCP server manager for multiple servers
async with MCPServerManager() as manager:
    # Add stdio-based MCP server
    filesystem_server = await manager.add_server(
        MCPServerStdio(
            name="filesystem",
            command="npx",
            args=["-y", "@anthropic/mcp-filesystem", "/path/to/dir"]
        )
    )

    # Add HTTP-based MCP server
    api_server = await manager.add_server(
        MCPServerStreamableHTTP(
            name="api-tools",
            url="http://localhost:8080/mcp"
        )
    )

    # Get tools from all servers
    all_tools = await manager.list_tools()

    # Create agent with MCP tools
    agent = Agent(
        name="mcp_agent",
        instructions="Use available tools to help the user.",
        model="gpt-5.2",
        mcp_servers=[filesystem_server, api_server]
    )

    runner = Runner()
    result = await runner.run(agent, "List files in the current directory")

Single MCP Server

from agents import Agent
from agents.mcp import MCPServerStdio

# Single MCP server (simpler pattern)
async with MCPServerStdio(
    name="filesystem",
    command="npx",
    args=["-y", "@anthropic/mcp-filesystem", "/tmp"]
) as server:
    agent = Agent(
        name="file_agent",
        instructions="Help with file operations.",
        model="gpt-5.2",
        mcp_servers=[server]
    )
    result = await runner.run(agent, "What files are available?")

Function Tools

from agents import Agent, tool

# Define tool functions
@tool
def search_knowledge_base(query: str) -> str:
    """Search the knowledge base for relevant information."""
    # Implementation
    return search_results

@tool
def create_ticket(title: str, description: str, priority: str) -> str:
    """Create a support ticket in the system."""
    ticket_id = ticket_system.create(title, description, priority)
    return f"Created ticket {ticket_id}"

# Agent with tools
support_agent = Agent(
    name="support",
    instructions="Help users with technical issues. Search knowledge base first.",
    model="gpt-5.2",
    tools=[search_knowledge_base, create_ticket]
)

Guardrails

from agents import Agent, InputGuardrail, OutputGuardrail
from agents.exceptions import InputGuardrailException

# Input guardrail
class ContentFilter(InputGuardrail):
    async def check(self, input_text: str) -> str:
        if contains_pii(input_text):
            raise InputGuardrailException("PII detected in input")
        return input_text

# Output guardrail
class ResponseValidator(OutputGuardrail):
    async def check(self, output_text: str) -> str:
        if contains_harmful_content(output_text):
            return "I cannot provide that information."
        return output_text

# Agent with guardrails
agent = Agent(
    name="safe_assistant",
    instructions="You are a helpful assistant.",
    model="gpt-5.2",
    input_guardrails=[ContentFilter()],
    output_guardrails=[ResponseValidator()]
)

Tool Guardrails (v0.6.5+)

from agents import tool, tool_guardrail

@tool_guardrail
def validate_ticket_priority(priority: str) -> str:
    """Validate ticket priority before creation."""
    valid = ["low", "medium", "high", "critical"]
    if priority.lower() not in valid:
        raise ValueError(f"Priority must be one of: {valid}")
    return priority

@tool
@validate_ticket_priority
def create_ticket(title: str, description: str, priority: str) -> str:
    """Create a support ticket."""
    return f"Created ticket with priority {priority}"

Tracing and Observability

from agents import Agent, Runner, RunConfig, trace

# Enable tracing
runner = Runner(trace=True)

# Custom trace spans
async def complex_workflow(task: str):
    with trace.span("research_phase"):
        research = await runner.run(researcher, task)

    with trace.span("writing_phase"):
        content = await runner.run(writer, research.final_output)

    return content

# Access trace data
result = await runner.run(agent, "Process this request")
print(result.trace_id)  # For debugging

# Per-run tracing API key (v0.6.5+)
config = RunConfig(tracing={"api_key": "sk-tracing-123"})
result = await runner.run(agent, "Task", run_config=config)

Streaming Responses

from agents import Agent, Runner

agent = Agent(
    name="streamer",
    instructions="Provide detailed explanations.",
    model="gpt-5.2"
)

runner = Runner()

# Stream response chunks
async for chunk in runner.run_streamed(agent, "Explain quantum computing"):
    print(chunk.content, end="", flush=True)

Multi-Agent Conversation

from agents import Agent, Runner

# Manual conversation management
runner = Runner()

# Initial run
result1 = await runner.run(triage_agent, "I need help with my account")

# Continue with result's input list
result2 = await runner.run(
    result1.handoff_to or triage_agent,
    "Can you check my billing?",
    input=result1.to_input_list()  # Carries conversation history
)

# Or use Sessions for automatic management (recommended)
from agents.sessions import SQLiteSession

session = SQLiteSession(db_path="conversations.db")
result = await runner.run(
    triage_agent,
    "I need help",
    session=session,
    session_id="conv-123"
)

Realtime and Voice

from agents import Agent
from agents.realtime import RealtimeRunner, OpenAIRealtimeWebSocketModel

# WebSocket-based realtime agent
model = OpenAIRealtimeWebSocketModel(
    model="gpt-5.2-realtime",
    # Custom WebSocket options (v0.7.0+)
    websocket_options={"ping_interval": 30}
)

agent = Agent(
    name="voice_assistant",
    instructions="You are a voice assistant.",
    model=model
)

# Realtime runner for voice interactions
realtime_runner = RealtimeRunner()
await realtime_runner.run(agent, audio_stream)

Configuration

Model Settings (v0.7.0)

from agents import Agent, RunConfig

# Note: Default reasoning.effort for gpt-5.1/5.2 is now "none"
# (previously "low"). Set explicitly if needed:
config = RunConfig(
    model_settings={
        "reasoning": {"effort": "low"}  # Restore previous behavior
    }
)

agent = Agent(
    name="assistant",
    instructions="You are helpful.",
    model="gpt-5.2"
)

result = await runner.run(agent, "Complex task", run_config=config)

Provider Support

The SDK supports 100+ LLMs via LiteLLM integration:

from agents import Agent, set_default_openai_api

# Use Chat Completions API (for non-OpenAI providers)
set_default_openai_api("chat_completions")

# Or use LiteLLM for other providers
# pip install openai-agents[litellm]
from agents.extensions.litellm import LiteLLMModel

agent = Agent(
    name="claude_agent",
    instructions="You are helpful.",
    model=LiteLLMModel(model="anthropic/claude-sonnet-4-20250514")
)

Best Practices

  1. Handoff clarity: Use RECOMMENDED_PROMPT_PREFIX for reliable handoffs
  2. Tool documentation: Clear docstrings improve tool selection accuracy
  3. Guardrail layers: Combine input + output guardrails for defense-in-depth
  4. Tracing: Always enable in production for debugging
  5. Error handling: Catch guardrail exceptions gracefully
  6. Sessions: Use for multi-turn conversations instead of manual history
  7. MCP servers: Use MCPServerManager for multiple servers with proper lifecycle
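For item 5, a minimal sketch of catching a guardrail exception gracefully. The exception class and runner here are stubs so the example is self-contained; in real code you would catch `agents.exceptions.InputGuardrailException` around `runner.run(...)`.

```python
import asyncio

class InputGuardrailException(Exception):
    """Stand-in for agents.exceptions.InputGuardrailException."""

async def guarded_run(agent_name: str, text: str) -> str:
    # Stand-in for runner.run(agent, text) with an input guardrail attached
    if "ssn" in text.lower():
        raise InputGuardrailException("PII detected in input")
    return f"{agent_name}: handled"

async def run_safely(agent_name: str, text: str) -> str:
    try:
        return await guarded_run(agent_name, text)
    except InputGuardrailException as exc:
        # Degrade gracefully instead of propagating to the caller
        return f"Blocked by input guardrail: {exc}"

print(asyncio.run(run_safely("assistant", "My SSN is 123-45-6789")))
```

The caller always receives a string, so downstream rendering never has to special-case a guardrail failure.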

Breaking Changes in v0.7.0

| Change | Previous | v0.7.0 |
|---|---|---|
| Nested handoffs | Enabled by default | Opt-in via `nest_handoff_history=True` |
| Reasoning effort | Default `"low"` | Default `"none"` for gpt-5.1/5.2 |
| Session input | Required callback | Auto-append (callback optional) |
| OpenAI library | v1.x supported | Requires v2.x (>=2.9.0) |

Version History

  • v0.7.0 (Jan 2026): MCPServerManager, opt-in nested handoffs, session input auto-append
  • v0.6.6 (Jan 2026): Auto-compaction, AsyncSQLiteSession
  • v0.6.5 (Jan 2026): Per-run tracing, tool guardrails decorator
  • v0.6.0: Sessions feature, improved handoff history

Skill Agnostic Template

Skill-Agnostic Template Abstraction

Reusable template framework for applying multi-scenario orchestration to ANY user-invocable skill.

Template Architecture

┌─────────────────────────────────────────────────────────────┐
│               SKILL ORCHESTRATION TEMPLATE                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  [Abstract Orchestrator]                                   │
│   ├─ Scenario Definition (parameterized)                   │
│   ├─ State Machine (generic)                               │
│   ├─ Synchronization (milestone-based)                     │
│   └─ Aggregation (cross-scenario)                          │
│                                                             │
│  [Skill Adapter Layer]  ◄── Pluggable per skill            │
│   ├─ invoke_skill()                                        │
│   ├─ calculate_quality_metrics()                           │
│   ├─ scenario_configurations()                             │
│   └─ expected_outcomes()                                   │
│                                                             │
│  [Target Skill] ◄── Your skill here                         │
│   ├─ skill-name                                            │
│   ├─ skill-version                                         │
│   └─ skill-params                                          │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Generic Orchestrator Class

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Generic, TypeVar

T = TypeVar("T")  # Result type

def chunks(items: list, size: int):
    """Yield successive `size`-item batches from a list."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

class SkillOrchestrator(ABC, Generic[T]):
    """
    Abstract orchestrator for any user-invocable skill.

    Subclass for each skill and implement abstract methods.
    """

    def __init__(self, skill_name: str, skill_version: str):
        self.skill_name = skill_name
        self.skill_version = skill_version

    # ──────────────────────────────────────────────────────────
    # Abstract Methods (MUST implement per skill)
    # ──────────────────────────────────────────────────────────

    @abstractmethod
    async def invoke_skill(
        self,
        input_data: list[dict],
        scenario_params: dict,
    ) -> dict[str, Any]:
        """
        Invoke your skill on input data.

        Args:
            input_data: Items to process
            scenario_params: Skill-specific parameters (batch_size, etc.)

        Returns:
            dict with keys:
                - "processed": int (items successfully processed)
                - "results": list[dict] (processed items)
                - "quality_score": float (0-1)
                - "metrics": dict (skill-specific metrics)
                - "errors": list[str] (any errors encountered)
        """
        pass

    @abstractmethod
    def get_scenario_configs(self) -> dict[str, dict]:
        """
        Return scenario configurations for simple/medium/complex.

        Returns:
            {
                "simple": {
                    "input_size": 100,
                    "time_budget_seconds": 30,
                    "batch_size": 10,
                    "skill_params": {...}
                },
                "medium": {...},
                "complex": {...}
            }
        """
        pass

    @abstractmethod
    def calculate_quality_metrics(
        self,
        results: list[dict],
        metric_names: list[str]
    ) -> dict[str, float]:
        """
        Calculate quality metrics from results.

        Args:
            results: Processed items from skill
            metric_names: ["accuracy", "coverage", ...]

        Returns:
            {
                "accuracy": 0.92,
                "coverage": 0.98,
                ...
            }
        """
        pass

    @abstractmethod
    def generate_test_data(
        self,
        size: int,
        characteristics: dict
    ) -> list[dict]:
        """
        Generate test data for this skill.

        Args:
            size: Number of items
            characteristics: {"distribution": "uniform|skewed|clustered"}

        Returns:
            list of test items matching skill's input format
        """
        pass

    # ──────────────────────────────────────────────────────────
    # Concrete Methods (reusable across all skills)
    # ──────────────────────────────────────────────────────────

    async def run_scenario(
        self,
        scenario_id: str,
        orchestration_id: str,
    ) -> dict:
        """
        Execute one scenario (simple/medium/complex).

        Concrete implementation—uses abstract methods.
        """

        config = self.get_scenario_configs()[scenario_id]

        # Generate test data
        test_data = self.generate_test_data(
            size=config["input_size"],
            characteristics=config.get("dataset_characteristics", {})
        )

        # Process in batches
        all_results = []
        batch_size = config.get("batch_size", 10)

        for batch_idx, batch in enumerate(chunks(test_data, batch_size)):
            # Invoke skill (abstract—implemented per skill)
            result = await self.invoke_skill(batch, config["skill_params"])

            all_results.extend(result.get("results", []))

            progress_pct = (len(all_results) / len(test_data)) * 100
            print(f"[{scenario_id}] Progress: {progress_pct:.1f}%")

        # Calculate quality (abstract—implemented per skill)
        quality = self.calculate_quality_metrics(
            all_results,
            config.get("quality_metrics", ["accuracy"])
        )

        return {
            "scenario_id": scenario_id,
            "orchestration_id": orchestration_id,
            "items_processed": len(all_results),
            "quality": quality,
            "results": all_results,
        }

    async def orchestrate(
        self,
        orchestration_id: str
    ) -> dict:
        """
        Run all 3 scenarios in parallel and aggregate results.

        Concrete implementation—reusable for all skills.
        """

        import asyncio

        # Run scenarios in parallel
        results = await asyncio.gather(
            self.run_scenario("simple", orchestration_id),
            self.run_scenario("medium", orchestration_id),
            self.run_scenario("complex", orchestration_id),
            return_exceptions=True
        )

        # Aggregate
        aggregated = self.aggregate_results(results)

        return aggregated

    def aggregate_results(self, scenario_results: list[dict]) -> dict:
        """
        Combine results from all 3 scenarios.

        Generic aggregation—works for any skill.
        """

        # Extract results by scenario
        by_scenario = {r.get("scenario_id"): r for r in scenario_results if not isinstance(r, Exception)}

        # Quality comparison
        quality_comparison = {
            sid: r.get("quality", {})
            for sid, r in by_scenario.items()
        }

        # Time complexity (simulated—customize per skill)
        simple_items = by_scenario.get("simple", {}).get("items_processed", 1)
        medium_items = by_scenario.get("medium", {}).get("items_processed", 1)
        complex_items = by_scenario.get("complex", {}).get("items_processed", 1)

        scaling_ratio = complex_items / simple_items if simple_items > 0 else 1

        return {
            "orchestration_id": scenario_results[0].get("orchestration_id"),
            "skill": self.skill_name,
            "timestamp": datetime.now().isoformat(),
            "results_by_scenario": by_scenario,
            "quality_comparison": quality_comparison,
            "scaling_analysis": {
                "simple_vs_complex": scaling_ratio,
                "recommendation": self._scaling_recommendation(scaling_ratio)
            },
            "success_rate": sum(1 for r in scenario_results if not isinstance(r, Exception)) / len(scenario_results),
        }

    @staticmethod
    def _scaling_recommendation(ratio: float) -> str:
        """Recommend based on scaling behavior."""
        if ratio < 1.5:
            return "Excellent sublinear scaling"
        elif ratio < 3:
            return "Good linear scaling"
        elif ratio < 10:
            return "Acceptable superlinear scaling"
        else:
            return "Poor scaling—investigate bottlenecks"

Example Implementation: Performance Testing

class PerformanceTestingOrchestrator(SkillOrchestrator):
    """Multi-scenario orchestration for performance-testing skill."""

    async def invoke_skill(
        self,
        input_data: list[dict],
        scenario_params: dict,
    ) -> dict:
        """
        Invoke k6 or Locust performance test.
        """
        import asyncio

        # Mock: simulate running k6 with duration
        duration_ms = scenario_params.get("duration_ms", 5000)

        await asyncio.sleep(duration_ms / 1000)  # Simulate k6 execution

        return {
            "processed": len(input_data),
            "results": [
                {
                    "endpoint": item["url"],
                    "response_time_ms": 100 + (i * 50),
                    "status": "200" if i % 10 != 0 else "500",
                }
                for i, item in enumerate(input_data)
            ],
            "quality_score": 0.85,
            "metrics": {
                "p95_latency_ms": 450,
                "p99_latency_ms": 500,
                "error_rate": 0.01,
            },
            "errors": [],
        }

    def get_scenario_configs(self) -> dict:
        return {
            "simple": {
                "input_size": 10,  # 10 endpoints
                "time_budget_seconds": 30,
                "batch_size": 5,
                "dataset_characteristics": {"distribution": "uniform"},
                "quality_metrics": ["latency", "error_rate"],
                "skill_params": {
                    "duration_ms": 5000,
                    "ramp_up_seconds": 2,
                    "load_profile": "steady"
                }
            },
            "medium": {
                "input_size": 30,  # 30 endpoints
                "time_budget_seconds": 90,
                "batch_size": 15,
                "dataset_characteristics": {"distribution": "uniform"},
                "quality_metrics": ["latency", "error_rate"],
                "skill_params": {
                    "duration_ms": 30000,
                    "ramp_up_seconds": 5,
                    "load_profile": "ramp_and_hold"
                }
            },
            "complex": {
                "input_size": 80,  # 80 endpoints
                "time_budget_seconds": 300,
                "batch_size": 40,
                "dataset_characteristics": {"distribution": "skewed"},
                "quality_metrics": ["latency", "error_rate", "throughput"],
                "skill_params": {
                    "duration_ms": 120000,
                    "ramp_up_seconds": 10,
                    "load_profile": "spike"
                }
            }
        }

    def calculate_quality_metrics(
        self,
        results: list[dict],
        metric_names: list[str]
    ) -> dict:
        """Calculate latency and error rate metrics."""

        if not results:
            return {m: 0.0 for m in metric_names}

        latencies = [r.get("response_time_ms", 0) for r in results]
        errors = sum(1 for r in results if r.get("status", "200") != "200")

        scores = {}

        if "latency" in metric_names:
            # p95 latency score: higher is better (inverse of latency)
            p95 = sorted(latencies)[int(len(latencies) * 0.95)]
            # Score: 1.0 if p95 < 100ms, 0.0 if > 500ms
            scores["latency"] = max(0, min(1.0, (500 - p95) / 400))

        if "error_rate" in metric_names:
            # Error rate score: 1.0 if 0%, 0.0 if > 5%
            error_pct = (errors / len(results)) * 100 if results else 0
            scores["error_rate"] = max(0, min(1.0, (5 - error_pct) / 5))

        if "throughput" in metric_names:
            # Throughput score (normalized)
            rps = len(results) / 30  # Assume 30-second test
            scores["throughput"] = min(1.0, rps / 100)  # Max 100 RPS = 1.0

        return scores

    def generate_test_data(
        self,
        size: int,
        characteristics: dict
    ) -> list[dict]:
        """Generate endpoints to test."""

        endpoints = [
            {"url": f"https://api.example.com/endpoint/{i}", "method": "GET"}
            for i in range(size)
        ]

        if characteristics.get("distribution") == "skewed":
            # Repeat 20% of endpoints (hot paths)
            hot_endpoints = endpoints[:int(size * 0.2)]
            return endpoints + hot_endpoints

        return endpoints
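As a sanity check on the scoring bands in calculate_quality_metrics above, the linear mappings can be exercised in isolation (a standalone sketch, independent of the orchestrator class):

```python
def latency_score(p95_ms: float) -> float:
    # Linear band: 1.0 at p95 <= 100 ms, 0.0 at p95 >= 500 ms
    return max(0.0, min(1.0, (500 - p95_ms) / 400))

def error_rate_score(errors: int, total: int) -> float:
    # Linear band: 1.0 at 0% errors, 0.0 at >= 5% errors
    error_pct = (errors / total) * 100 if total else 0.0
    return max(0.0, min(1.0, (5 - error_pct) / 5))

assert latency_score(100) == 1.0   # fast enough for a perfect score
assert latency_score(300) == 0.5   # midpoint of the band
assert latency_score(600) == 0.0   # beyond the 500 ms cutoff
assert error_rate_score(0, 100) == 1.0
assert error_rate_score(5, 100) == 0.0
```

Keeping the bands as pure functions like this makes the thresholds easy to unit-test and tune per skill.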

Example Implementation: Security Scanning

class SecurityScanningOrchestrator(SkillOrchestrator):
    """Multi-scenario orchestration for security-scanning skill."""

    async def invoke_skill(
        self,
        input_data: list[dict],
        scenario_params: dict,
    ) -> dict:
        """Invoke security scanner on code files."""

        import random

        # Simulate scanning files
        results = []
        for item in input_data:
            findings = []

            # Simulate vulnerability detection
            if random.random() > 0.9:
                findings.append({
                    "type": "SQL_INJECTION",
                    "severity": "HIGH",
                    "line": random.randint(1, 100)
                })

            results.append({
                "file": item["path"],
                "scanned": True,
                "findings": findings,
                "score": 1.0 - (len(findings) * 0.1),
            })

        return {
            "processed": len(input_data),
            "results": results,
            "quality_score": 0.95,
            "metrics": {
                "vulnerabilities_found": sum(len(r.get("findings", [])) for r in results),
                "files_scanned": len(results),
            },
            "errors": [],
        }

    def get_scenario_configs(self) -> dict:
        return {
            "simple": {
                "input_size": 20,  # 20 files
                "time_budget_seconds": 45,
                "batch_size": 10,
                "dataset_characteristics": {"distribution": "uniform"},
                "quality_metrics": ["coverage", "accuracy"],
                "skill_params": {
                    "scan_depth": "shallow",
                    "rules": "OWASP_TOP_10"
                }
            },
            "medium": {
                "input_size": 100,  # 100 files
                "time_budget_seconds": 120,
                "batch_size": 50,
                "dataset_characteristics": {"distribution": "uniform"},
                "quality_metrics": ["coverage", "accuracy"],
                "skill_params": {
                    "scan_depth": "medium",
                    "rules": "OWASP_TOP_10 + CWE_TOP_25"
                }
            },
            "complex": {
                "input_size": 500,  # 500 files
                "time_budget_seconds": 600,
                "batch_size": 100,
                "dataset_characteristics": {"distribution": "skewed"},
                "quality_metrics": ["coverage", "accuracy", "false_positive_rate"],
                "skill_params": {
                    "scan_depth": "deep",
                    "rules": "ALL",
                    "enable_ml_detection": True
                }
            }
        }

    def calculate_quality_metrics(
        self,
        results: list[dict],
        metric_names: list[str]
    ) -> dict:
        """Calculate security scanning quality metrics."""

        total_files = len(results)
        scanned_files = sum(1 for r in results if r.get("scanned"))
        total_findings = sum(len(r.get("findings", [])) for r in results)

        scores = {}

        if "coverage" in metric_names:
            # Coverage: percentage of files successfully scanned
            scores["coverage"] = scanned_files / max(1, total_files)

        if "accuracy" in metric_names:
            # Accuracy: average finding score (file-level quality)
            avg_score = sum(r.get("score", 0) for r in results) / max(1, total_files)
            scores["accuracy"] = avg_score

        if "false_positive_rate" in metric_names:
            # FPR: simulated (would need manual validation in production)
            scores["false_positive_rate"] = 1.0 - (total_findings / (total_findings + 10))

        return scores

    def generate_test_data(
        self,
        size: int,
        characteristics: dict
    ) -> list[dict]:
        """Generate Python/JS files to scan."""

        files = [
            {"path": f"src/module_{i:03d}.py"}
            for i in range(size)
        ]

        if characteristics.get("distribution") == "skewed":
            # Add some large files (skew toward complexity)
            large_files = [
                {"path": f"src/legacy_module_{i:03d}.py", "size_lines": 5000}
                for i in range(int(size * 0.1))
            ]
            return files + large_files

        return files

Registration and Factory Pattern

class OrchestratorRegistry:
    """Register orchestrators for each skill."""

    _registry: dict[str, type[SkillOrchestrator]] = {}

    @classmethod
    def register(cls, skill_name: str, orchestrator_class: type):
        """Register an orchestrator for a skill."""
        cls._registry[skill_name] = orchestrator_class

    @classmethod
    def get(cls, skill_name: str) -> SkillOrchestrator:
        """Instantiate orchestrator for a skill."""
        if skill_name not in cls._registry:
            raise ValueError(f"No orchestrator registered for {skill_name}")

        return cls._registry[skill_name](skill_name, skill_version="1.0.0")

# Registration
OrchestratorRegistry.register("performance-testing", PerformanceTestingOrchestrator)
OrchestratorRegistry.register("security-scanning", SecurityScanningOrchestrator)

# Usage
orchestrator = OrchestratorRegistry.get("performance-testing")
result = await orchestrator.orchestrate("demo-001")
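The `await` in the usage line above assumes an async context; from a synchronous entrypoint, wrap the call with `asyncio.run`. Sketched here with a stub orchestrator so the pattern is self-contained:

```python
import asyncio

class StubOrchestrator:
    """Stand-in for OrchestratorRegistry.get("performance-testing")."""
    async def orchestrate(self, orchestration_id: str) -> dict:
        return {"orchestration_id": orchestration_id, "success_rate": 1.0}

async def main() -> dict:
    orchestrator = StubOrchestrator()
    return await orchestrator.orchestrate("demo-001")

result = asyncio.run(main())
print(result["orchestration_id"])  # → demo-001
```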

Integration with LangGraph

async def scenario_worker_generic(state: ScenarioOrchestratorState) -> dict:
    """Generic worker that uses skill-agnostic orchestrator."""

    scenario_id = state.get("scenario_id")
    skill_name = state["skill_name"]

    # Get orchestrator for this skill
    orchestrator = OrchestratorRegistry.get(skill_name)

    # Run scenario
    result = await orchestrator.run_scenario(
        scenario_id=scenario_id,
        orchestration_id=state["orchestration_id"]
    )

    return {f"progress_{scenario_id}": result}

Adding a New Skill

To orchestrate a NEW skill, follow these steps:

Step 1: Create Orchestrator Class

# file: backend/app/workflows/multi_scenario/my_skill_orchestrator.py

from skill_agnostic_template import SkillOrchestrator

class MySkillOrchestrator(SkillOrchestrator):

    async def invoke_skill(self, input_data, scenario_params):
        # TODO: Call your skill here
        pass

    def get_scenario_configs(self):
        # TODO: Define simple/medium/complex configs
        pass

    def calculate_quality_metrics(self, results, metric_names):
        # TODO: Implement quality scoring
        pass

    def generate_test_data(self, size, characteristics):
        # TODO: Generate test data for your skill
        pass

Step 2: Register

OrchestratorRegistry.register("my-skill", MySkillOrchestrator)

Step 3: Run

orchestrator = OrchestratorRegistry.get("my-skill")
result = await orchestrator.orchestrate("demo-001")

Total implementation time: 15-30 minutes per skill

Benefits of Abstraction

| Benefit | Impact |
|---|---|
| No state machine boilerplate | 10x faster to add a new skill |
| Consistent patterns | Team learns once, applies everywhere |
| Automatic comparison | All skills compared the same way |
| Easy to extend | Override only the methods you need |
| LangGraph integration ready | Works with checkpointing, streaming, etc. |

Testing Template

@pytest.fixture
def orchestrator():
    return PerformanceTestingOrchestrator("performance-testing", "1.0.0")

@pytest.mark.asyncio
async def test_simple_scenario(orchestrator):
    result = await orchestrator.run_scenario("simple", "test-001")

    assert result["scenario_id"] == "simple"
    assert result["items_processed"] > 0
    assert "quality" in result
    assert result["quality"]["latency"] > 0

@pytest.mark.asyncio
async def test_full_orchestration(orchestrator):
    result = await orchestrator.orchestrate("test-002")

    assert "results_by_scenario" in result
    assert len(result["results_by_scenario"]) == 3
    assert result["success_rate"] > 0.5

State Machine Design

State Machine Design: Multi-Scenario Orchestration

Detailed state machine and abstraction patterns for ANY user-invocable skill.
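One plausible encoding of the scenario lifecycle as data, before walking through the diagram (a sketch: state names follow the diagram in this section, the error-recovery edge is omitted, and FAILED is treated as terminal):

```python
from enum import Enum

class ScenarioState(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    MILESTONE = "milestone"
    FAILED = "failed"
    COMPLETE = "complete"

# Allowed transitions, mirroring the state machine diagram
TRANSITIONS: dict[ScenarioState, set[ScenarioState]] = {
    ScenarioState.PENDING: {ScenarioState.RUNNING},
    ScenarioState.RUNNING: {ScenarioState.PAUSED, ScenarioState.MILESTONE,
                            ScenarioState.FAILED, ScenarioState.COMPLETE},
    ScenarioState.PAUSED: {ScenarioState.RUNNING},      # [resume]
    ScenarioState.MILESTONE: {ScenarioState.RUNNING},   # [continue]
    ScenarioState.FAILED: set(),     # add an error-recovery edge if retries are supported
    ScenarioState.COMPLETE: set(),
}

def can_transition(src: ScenarioState, dst: ScenarioState) -> bool:
    return dst in TRANSITIONS[src]

assert can_transition(ScenarioState.PENDING, ScenarioState.RUNNING)
assert not can_transition(ScenarioState.FAILED, ScenarioState.RUNNING)
```

A transition table like this lets the orchestrator reject illegal state changes with one membership check.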

Core State Machine

┌─────────────────────────────────────────────────────────────────┐
│                     SCENARIO STATE MACHINE                      │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│              ┌────────────────────────────┐                     │
│              │  PENDING                   │                     │
│              │  (awaiting start signal)   │                     │
│              └─────────────┬──────────────┘                     │
│                            │ supervisor.route_scenarios()       │
│                            ▼                                    │
│              ┌────────────────────────────┐                     │
│              │  RUNNING                   │◄──────────┐         │
│              │  (executing skill batches) │           │         │
│              └─────────────┬──────────────┘           │         │
│                            │                          │         │
│               ┌────────────┼────────────┐             │         │
│               │            │            │             │         │
│            [pause]    [milestone]    [error]      [resume]      │
│               ▼            ▼            ▼             │         │
│          ┌─────────┐  ┌─────────┐  ┌─────────┐        │         │
│          │  PAUSED │  │MILESTONE│  │ FAILED  │        │         │
│          │(waiting)│  │(sync pt)│  │ (error) │        │         │
│          └────┬────┘  └────┬────┘  └─────────┘        │         │
│               │            │                          │         │
│               │[resume]    │[continue]                │         │
│               └─────┬──────┘                          │         │
│                     │                                 │         │
│              [error recovery]─────────────────────────┘         │
│                     │                                           │
│                     ▼                                           │
│       ┌────────────────────────────┐                            │
│       │  COMPLETE                  │                            │
│       │  (all batches processed)   │                            │
│       └─────────────┬──────────────┘                            │
│                     │ aggregator.collect_and_aggregate()        │
│                     ▼                                           │
│       ┌────────────────────────────┐                            │
│       │  FINAL_RESULTS             │                            │
│       │  (ready for comparison)    │                            │
│       └────────────────────────────┘                            │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Legend:
─────────────────
→ Transition
[action] = trigger/activity
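
The diagram can also be encoded as an explicit transition table, which makes illegal moves fail fast instead of silently corrupting state. A minimal sketch (the `ScenarioState` enum and `transition` helper are illustrative names, not part of the orchestrator API):

```python
from enum import Enum

class ScenarioState(str, Enum):
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    FAILED = "failed"
    COMPLETE = "complete"
    FINAL_RESULTS = "final_results"

# Allowed transitions, transcribed from the diagram above.
TRANSITIONS: dict[ScenarioState, set[ScenarioState]] = {
    ScenarioState.PENDING: {ScenarioState.RUNNING},
    ScenarioState.RUNNING: {ScenarioState.PAUSED, ScenarioState.COMPLETE, ScenarioState.FAILED},
    ScenarioState.PAUSED: {ScenarioState.RUNNING, ScenarioState.FAILED},
    ScenarioState.FAILED: {ScenarioState.FINAL_RESULTS},
    ScenarioState.COMPLETE: {ScenarioState.FINAL_RESULTS},
    ScenarioState.FINAL_RESULTS: set(),
}

def transition(current: ScenarioState, target: ScenarioState) -> ScenarioState:
    """Validate a state transition against the table; raise on illegal moves."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"Illegal transition: {current.value} -> {target.value}")
    return target
```

Guards and entry actions from the sections below can then hang off this table rather than being scattered through worker code.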

Detailed State Definitions

State: PENDING

Entry Point: Created at orchestration start

@dataclass
class PendingState:
    scenario_id: str  # "simple" | "medium" | "complex"
    orchestration_id: str
    created_at: datetime
    definition: ScenarioDefinition

    # Awaiting signal from supervisor
    waiting_for_start: bool = True

# Supervisor signal triggers transition
# Event: supervisor.route_scenarios() returns Send("scenario_worker", ...)
# Transition: PENDING → RUNNING

Exit Condition: Supervisor sends routing command

State: RUNNING

Active Work State: Executing skill batches

@dataclass
class RunningState:
    scenario_id: str
    status: Literal["running"] = "running"

    # Progress tracking
    progress_pct: float = 0.0  # 0-100
    current_milestone: str = "batch_1"
    milestones_reached: list[str] = field(default_factory=list)

    # Metrics (accumulating)
    items_processed: int = 0
    batch_count: int = 0
    elapsed_ms: int = 0
    memory_used_mb: int = 0

    # Results (partial)
    partial_results: list[dict] = field(default_factory=list)
    quality_scores: dict = field(default_factory=dict)

    # Error tracking
    errors: list[dict] = field(default_factory=list)

    # Transition flags
    should_pause: bool = False
    should_continue: bool = True

Activities:

  • Execute skill batch N
  • Update progress_pct = (items_processed / input_size) * 100
  • Check if progress_pct reached milestone (30%, 50%, 70%, 90%)
  • Record partial results
  • Monitor memory and elapsed time

Exit Conditions:

  1. progress_pct >= 100 → COMPLETE
  2. error_occurred and recovery_possible → PAUSE (or retry)
  3. error_occurred and recovery_failed → FAILED
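
Because progress advances in batch-sized jumps, an exact `progress_pct in [30, 50, 70, 90]` check can step over a milestone entirely (e.g. 28% → 33%). A crossed-threshold check avoids that; a minimal sketch (the helper name is illustrative):

```python
MILESTONES = (30, 50, 70, 90)

def milestones_crossed(prev_pct: float, new_pct: float) -> list[int]:
    """Return the sync milestones passed between two progress readings.

    Comparing the interval (prev_pct, new_pct] rather than testing exact
    equality means a milestone is never missed when a single batch moves
    progress from, say, 28% to 33%.
    """
    return [m for m in MILESTONES if prev_pct < m <= new_pct]
```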

State: PAUSED

Waiting State: At milestone synchronization point or for recovery

@dataclass
class PausedState:
    scenario_id: str
    status: Literal["paused"] = "paused"

    # Why paused?
    pause_reason: Literal["sync_point", "waiting_for_recovery", "user_halt"] = "sync_point"

    # Checkpoint (for resuming)
    # Defaults required: dataclass fields after a defaulted field must also have defaults
    checkpoint_index: int = 0  # Which batch to resume from
    checkpoint_state: dict = field(default_factory=dict)  # Full state snapshot

    # Sync info (for milestone pause)
    milestone_pct: int = 0
    other_scenarios_status: dict = field(default_factory=dict)  # {"simple": 45, "medium": 30, "complex": 10}
    time_paused_ms: int = 0

    # Recovery info (for error pause)
    retry_count: int = 0
    max_retries: int = 3
    last_error: str = ""

Activities:

  • Wait for other scenarios (if sync_point)
  • Wait for recovery decision (if error)
  • Monitor timeout (max 60 seconds)
  • Record checkpoint for resumption

Exit Conditions:

  1. all_scenarios_at_milestone → RUNNING (resume)
  2. timeout and waiting_for_others → RUNNING (proceed anyway)
  3. retry_count < max_retries → RUNNING (retry)
  4. retry_count >= max_retries → FAILED
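
Exit conditions 1 and 2 amount to "wait for the siblings, but never forever". A minimal asyncio sketch, assuming the coordinator exposes an event that fires when all scenarios reach the milestone (the names here are illustrative, not orchestrator API):

```python
import asyncio

async def wait_for_siblings(
    at_milestone: asyncio.Event,  # set by the coordinator when all scenarios arrive
    timeout_s: float = 60.0,      # matches the 60-second pause cap above
) -> str:
    """Block at a sync point until siblings catch up, or proceed on timeout.

    Returns the exit reason so the caller can log why the pause ended.
    """
    try:
        await asyncio.wait_for(at_milestone.wait(), timeout=timeout_s)
        return "all_scenarios_at_milestone"
    except asyncio.TimeoutError:
        return "timeout_proceed_anyway"
```

Either return value transitions the scenario back to RUNNING; only exhausted retries lead to FAILED.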

State: FAILED

Terminal Error State: Unrecoverable error

@dataclass
class FailedState:
    # Required fields first: dataclass fields without defaults cannot follow defaulted ones
    scenario_id: str

    # Error details
    error_message: str
    error_type: str  # "timeout", "exception", "memory", "validation"
    stack_trace: str

    # Progress at failure
    progress_pct: float
    items_processed: int
    batch_failed: int

    status: Literal["failed"] = "failed"

    # Attempted recovery
    recovery_attempted: bool = False
    recovery_method: str = ""

    # Partial results (if any)
    partial_results: list[dict] = field(default_factory=list)

Activities:

  • Log error with full context
  • Store partial results (won't complete, but data preserved)
  • Notify coordinator
  • Update checkpoint for debugging

Exit Condition: Terminal for the scenario itself (no recovery transition); its partial results are still collected into FINAL_RESULTS during aggregation

State: COMPLETE

Success State: All batches processed

@dataclass
class CompleteState:
    # Required fields first: dataclass fields without defaults cannot follow defaulted ones
    scenario_id: str

    # Final metrics
    total_items_processed: int
    total_batches: int
    total_elapsed_ms: int
    peak_memory_mb: int

    # Final results
    results: dict  # Full aggregated output
    quality_scores: dict

    # Quality assessment
    quality_rank: Literal["basic", "good", "excellent"]
    recommendation: str

    status: Literal["complete"] = "complete"
    error_count: int = 0

    # Comparison data
    time_per_item_ms: float = 0.0
    items_per_second: float = 0.0

Activities:

  • Calculate final metrics
  • Score results
  • Prepare for comparison
  • Signal readiness for aggregation

Exit Condition: Transition to aggregator

State: FINAL_RESULTS

Aggregated State: All scenarios combined

@dataclass
class AggregatedFinalResults:
    orchestration_id: str
    timestamp: datetime

    # Per-scenario results
    results: dict  # {"simple": {...}, "medium": {...}, "complex": {...}}

    # Comparative analysis
    quality_ranking: dict
    time_complexity: dict  # {"simple": 1.2ms/item, "medium": 1.8ms/item, ...}
    scaling_efficiency: float  # simple_time / complex_time / (simple_size / complex_size)

    # Cross-scenario patterns
    success_patterns: list[str]
    failure_modes: list[str]
    optimization_opportunities: list[str]

    # Recommendations
    best_difficulty: Literal["simple", "medium", "complex"]
    resource_requirements: dict
    estimated_production_scaling: dict
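
The comparison fields above can be derived mechanically from per-scenario results. A sketch, assuming each entry carries `total_elapsed_ms` and `total_items_processed` as in the `CompleteState` dataclass (the helper name is illustrative):

```python
def comparative_metrics(results: dict[str, dict]) -> dict:
    """Derive cross-scenario comparison figures from per-scenario results."""
    time_per_item = {
        name: r["total_elapsed_ms"] / max(1, r["total_items_processed"])
        for name, r in results.items()
    }
    # Per-item ratio: ~1.0 means cost per item stays flat as input grows;
    # < 1.0 means the complex scenario costs MORE per item than the simple one.
    scaling_efficiency = time_per_item["simple"] / time_per_item["complex"]
    return {
        "time_per_item_ms": time_per_item,
        "scaling_efficiency": scaling_efficiency,
    }
```

This per-item ratio is algebraically the same as the `(simple_time / complex_time) / (simple_size / complex_size)` formula noted on `scaling_efficiency`.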

Transition Rules

PENDING → RUNNING

Trigger: supervisor.route_scenarios() returns Send commands

async def supervisor_trigger(state):
    return [
        Send("scenario_worker", {"scenario_id": "simple", ...}),
        Send("scenario_worker", {"scenario_id": "medium", ...}),
        Send("scenario_worker", {"scenario_id": "complex", ...}),
    ]

Guard: None (always allowed)

Actions on Entry:

  • Set status = "running"
  • Initialize start_time_ms = now()
  • Start processing first batch

RUNNING → MILESTONE (PAUSED at sync point)

Trigger: progress_pct in [30, 50, 70, 90] AND synchronize_at_milestone

# In scenario_worker
# Note: exact membership only fires when progress_pct lands precisely on a
# milestone value; use a crossed-threshold check if batches advance unevenly
if progress_pct in [30, 50, 70, 90]:
    should_sync = await synchronize_at_milestone(milestone_pct, state)
    if should_sync:
        # Transition to PAUSED
        state.status = "paused"
        state.pause_reason = "sync_point"
        state.milestone_pct = progress_pct

Guard: Other scenarios must be within 5% of milestone OR timeout expires

Actions on Entry:

  • Record checkpoint
  • Wait for other scenarios (max 60s)
  • Monitor progress of siblings

PAUSED → RUNNING (Resume after sync)

Trigger: All scenarios reached milestone OR timeout

# Monitor loop in sync node
if all_at_milestone or timeout_expired:
    # Resume
    state.status = "running"
    state.pause_reason = None
    # Continue from checkpoint

Guard: None (always allowed)

Actions on Entry:

  • Load checkpoint
  • Resume from interrupted batch
  • Continue processing

RUNNING → COMPLETE

Trigger: items_processed >= input_size

# In scenario_worker
for batch_idx, batch in enumerate(batches):
    await invoke_skill(batch)
    items_processed += len(batch)

    if items_processed >= scenario_def.input_size:
        state.status = "complete"
        break

Guard: None

Actions on Entry:

  • Calculate final metrics
  • Score all results
  • Prepare aggregation data

RUNNING → PAUSED (Error recovery pause)

Trigger: Exception in skill invocation, recovery possible

try:
    result = await invoke_skill(batch)
except SkillException as e:
    if can_recover(e):
        state.status = "paused"
        state.pause_reason = "waiting_for_recovery"
        state.last_error = str(e)
        state.retry_count += 1
        # Decision logic for retry

Guard: retry_count < max_retries (usually 3)

Actions on Entry:

  • Save checkpoint before error
  • Log error details
  • Increment retry counter
  • Wait for retry decision

PAUSED → RUNNING (Retry after error)

Trigger: Manual retry decision or automatic retry eligible

if error_state.retry_count < max_retries:
    state.status = "running"
    # Load checkpoint, try again

Guard: retry_count < max_retries

RUNNING → FAILED

Trigger: Exception and recovery not possible

try:
    result = await invoke_skill(batch)
except FatalException as e:
    state.status = "failed"
    state.error_message = str(e)
    state.error_type = type(e).__name__

Guard: retry_count >= max_retries OR is_fatal_error

Actions on Entry:

  • Log full error context
  • Store partial results (if any)
  • Notify aggregator about failure
  • Don't block other scenarios

COMPLETE → FINAL_RESULTS

Trigger: All 3 scenarios reached COMPLETE or FAILED

# In aggregator_node
simple_complete = state.progress_simple.status in ["complete", "failed"]
medium_complete = state.progress_medium.status in ["complete", "failed"]
complex_complete = state.progress_complex.status in ["complete", "failed"]

if simple_complete and medium_complete and complex_complete:
    return aggregate_results(state)

Guard: All scenarios in terminal state

Actions on Entry:

  • Merge results from all scenarios
  • Calculate comparisons
  • Extract patterns
  • Generate recommendations

Abstraction for ANY Skill

The state machine remains skill-agnostic. Customization happens in:

  1. Skill Invocation: invoke_skill() (your skill here)
  2. Quality Metrics: calculate_quality_metrics() (what to measure)
  3. Scenario Parameters: Batch size, timeout, memory (adjust per skill)

Skill Integration Point

async def invoke_skill(
    skill_name: str,           # "performance-testing", "security-scanning", etc.
    input_data: list,          # Items to process
    skill_params: dict,        # Skill-specific config
) -> dict:
    """
    Call any user-invocable skill.

    This is the ONLY skill-specific code in the state machine.
    """

    # Match skill_name and invoke appropriately
    if skill_name == "performance-testing":
        return await invoke_performance_testing(input_data, skill_params)
    elif skill_name == "security-scanning":
        return await invoke_security_scanning(input_data, skill_params)
    elif skill_name == "your-skill":
        return await invoke_your_skill(input_data, skill_params)
    else:
        raise ValueError(f"Unknown skill: {skill_name}")

Quality Metrics Adapter

def calculate_quality_metrics(
    skill_name: str,
    results: list[dict],
    metric_names: list[str],
    expected_count: int,  # how many items the scenario was supposed to process
) -> dict:
    """
    Calculate quality metrics (skill-specific).
    """

    scores = {}

    # Skill-agnostic metrics (always available)
    # Completion is measured against the expected item count; dividing
    # len(results) by itself would always yield 1.0.
    scores["completion_rate"] = len(results) / max(1, expected_count)
    scores["error_rate"] = sum(1 for r in results if r.get("error")) / max(1, len(results))

    # Skill-specific metrics
    if "accuracy" in metric_names:
        # For security-scanning: vulnerability count vs. expected
        # For performance-testing: actual latency vs. threshold
        scores["accuracy"] = calculate_accuracy(skill_name, results)

    if "coverage" in metric_names:
        # For security-scanning: percentage of codebase scanned
        # For performance-testing: percentage of endpoints tested
        scores["coverage"] = calculate_coverage(skill_name, results)

    return scores

Scenario Parameter Templates

SCENARIO_TEMPLATES = {
    # Generic: 1x, 3x, 8x scaling
    "default": {
        "simple": {"multiplier": 1.0, "timeout": 30, "batch_size": 10},
        "medium": {"multiplier": 3.0, "timeout": 90, "batch_size": 50},
        "complex": {"multiplier": 8.0, "timeout": 300, "batch_size": 100},
    },

    # Performance-testing: lighter loads
    "performance-testing": {
        "simple": {"multiplier": 1.0, "timeout": 60, "batch_size": 5},
        "medium": {"multiplier": 2.0, "timeout": 180, "batch_size": 20},
        "complex": {"multiplier": 4.0, "timeout": 600, "batch_size": 50},
    },

    # Security-scanning: heavier loads
    "security-scanning": {
        "simple": {"multiplier": 1.0, "timeout": 45, "batch_size": 20},
        "medium": {"multiplier": 2.5, "timeout": 120, "batch_size": 100},
        "complex": {"multiplier": 10.0, "timeout": 900, "batch_size": 500},
    },
}

def get_scenario_config(skill_name: str, difficulty: str) -> dict:
    template = SCENARIO_TEMPLATES.get(skill_name, SCENARIO_TEMPLATES["default"])
    return template[difficulty]
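
A usage sketch turning a template entry into concrete run parameters (the template here is a trimmed, self-contained copy of the default above; `scaled_workload` is an illustrative helper, not part of the orchestrator API):

```python
SCENARIO_TEMPLATES = {
    "default": {
        "simple": {"multiplier": 1.0, "timeout": 30, "batch_size": 10},
        "medium": {"multiplier": 3.0, "timeout": 90, "batch_size": 50},
        "complex": {"multiplier": 8.0, "timeout": 300, "batch_size": 100},
    },
}

def get_scenario_config(skill_name: str, difficulty: str) -> dict:
    template = SCENARIO_TEMPLATES.get(skill_name, SCENARIO_TEMPLATES["default"])
    return template[difficulty]

def scaled_workload(base_size: int, skill_name: str, difficulty: str) -> dict:
    """Expand a template entry into the numbers a scenario worker needs."""
    cfg = get_scenario_config(skill_name, difficulty)
    input_size = int(base_size * cfg["multiplier"])
    return {
        "input_size": input_size,
        "batch_count": -(-input_size // cfg["batch_size"]),  # ceiling division
        "timeout": cfg["timeout"],
    }
```

For example, a base size of 100 at "complex" difficulty under the default template yields 800 items in 8 batches with a 300-second timeout.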

Key State Machine Patterns

Pattern 1: Optimistic Completion

Assume success, handle errors reactively:

# Default behavior: run until complete
for batch in batches:
    result = await invoke_skill(batch)
    # No error checking—complete normally

# Only if exception: enter error recovery

Pro: Efficient for reliable skills. Con: Slower error detection.

Pattern 2: Pessimistic Validation

Validate each batch before continuing:

for batch in batches:
    result = await invoke_skill(batch)

    # Validate result
    if not validate_result(result, scenario_def.expected_quality):
        # Error recovery
        retry_count += 1

Pro: Catches issues early. Con: Higher overhead.

Pattern 3: Timeout-Based State Transitions

Use elapsed time to trigger state changes:

@dataclass
class TimedState:
    created_at: int        # epoch milliseconds
    timeout_seconds: int

    def is_expired(self) -> bool:
        # now() returns epoch milliseconds, matching created_at
        return (now() - self.created_at) > self.timeout_seconds * 1000

# Check in state machine
if paused_state.is_expired():
    # Force transition (to RUNNING or FAILED)

Visualizing State Transitions

# Generate state transition diagram for debugging
import graphviz

def generate_state_diagram():
    dot = graphviz.Digraph(comment="Scenario State Machine")

    states = ["PENDING", "RUNNING", "PAUSED", "COMPLETE", "FAILED", "FINAL_RESULTS"]
    for state in states:
        dot.node(state, shape="ellipse")

    transitions = [
        ("PENDING", "RUNNING", "supervisor.route()"),
        ("RUNNING", "PAUSED", "milestone reached"),
        ("RUNNING", "COMPLETE", "all items processed"),
        ("RUNNING", "FAILED", "fatal error"),
        ("PAUSED", "RUNNING", "sync complete or timeout"),
        ("PAUSED", "FAILED", "max retries exceeded"),
        ("COMPLETE", "FINAL_RESULTS", "aggregation trigger"),
        ("FAILED", "FINAL_RESULTS", "aggregation trigger"),
    ]

    for src, dst, label in transitions:
        dot.edge(src, dst, label=label)

    dot.render("/tmp/state_machine", format="png", view=True)

Testing State Transitions

@pytest.mark.asyncio
async def test_state_transition_pending_to_running():
    state = ScenarioOrchestratorState(...)
    assert state.progress_simple.status == "pending"

    # Trigger supervisor
    commands = await scenario_supervisor(state)

    assert len(commands) == 3  # 3 Send commands

@pytest.mark.asyncio
async def test_state_transition_running_to_complete():
    state = ScenarioOrchestratorState(...)
    state.progress_simple.status = "running"

    # Simulate processing all items
    state.progress_simple.items_processed = 100

    # Invoke worker
    result = await scenario_worker(state)

    assert result["progress_simple"]["status"] == "complete"

Checklists (2)

Framework Selection

Framework Selection Checklist

Choose the right multi-agent framework.

Requirements Analysis

  • Use case clearly defined
  • Complexity level assessed (single vs multi-agent)
  • State management needs identified
  • Human-in-the-loop requirements defined
  • Observability needs documented

Framework Evaluation

LangGraph

  • Need complex stateful workflows
  • Require persistence and checkpoints
  • Want streaming support
  • Need human-in-the-loop
  • Already using LangChain ecosystem

CrewAI

  • Role-based collaboration pattern
  • Hierarchical team structure
  • Agent delegation needed
  • Quick prototyping required
  • Built-in memory preferred

OpenAI Agents SDK

  • OpenAI-native ecosystem
  • Handoff pattern fits use case
  • Need built-in guardrails
  • Want OpenAI tracing
  • Simpler agent definition preferred

Microsoft Agent Framework

  • Enterprise compliance requirements
  • Using Azure ecosystem
  • Need A2A protocol support
  • Want AutoGen+SK merger features
  • Long-term Microsoft support preferred

AG2 (Community AutoGen)

  • Open-source flexibility priority
  • Community-driven development OK
  • AutoGen familiarity exists
  • Custom modifications needed

Technical Considerations

  • Team expertise with framework
  • Framework maturity level acceptable
  • Community support adequate
  • Documentation quality sufficient
  • Production readiness validated

Integration Assessment

  • Observability tool compatibility (Langfuse, etc.)
  • LLM provider compatibility
  • Existing codebase integration
  • Testing framework support
  • CI/CD pipeline compatibility

Risk Mitigation

  • Fallback strategy defined
  • Framework lock-in assessed
  • Migration path understood
  • Version update strategy
  • Community health evaluated

Decision Documentation

  • Framework choice documented
  • Rationale recorded
  • Alternatives considered listed
  • Trade-offs acknowledged
  • Review date scheduled

Orchestration Checklist

Multi-Agent Orchestration Checklist

Architecture

  • Define agent responsibilities
  • Plan communication patterns
  • Set coordination strategy
  • Design failure handling

Agent Design

  • Single responsibility per agent
  • Clear input/output contracts
  • Independent operation
  • Stateless when possible

Communication

  • Message format definition
  • Async message passing
  • Result aggregation
  • Error propagation

Coordination

  • Central orchestrator
  • Task queue management
  • Priority handling
  • Deadlock prevention

Monitoring

  • Agent health checks
  • Task completion tracking
  • Performance metrics
  • Error rates