Agent Orchestration
Agent orchestration patterns for agentic loops, multi-agent coordination, alternative frameworks, and multi-scenario workflows. Use when building autonomous agent loops, coordinating multiple agents, evaluating CrewAI/AutoGen/Swarm, or orchestrating complex multi-step scenarios.
Primary Agent: workflow-architect
Agent Orchestration
Comprehensive patterns for building and coordinating AI agents -- from single-agent reasoning loops to multi-agent systems and framework selection. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Agent Loops | 2 | HIGH | ReAct reasoning, plan-and-execute, self-correction |
| Multi-Agent Coordination | 3 | CRITICAL | Supervisor routing, agent debate, result synthesis |
| Alternative Frameworks | 3 | HIGH | CrewAI crews, AutoGen teams, framework comparison |
| Multi-Scenario | 2 | MEDIUM | Parallel scenario orchestration, difficulty routing |
Total: 10 rules across 4 categories
Quick Start
# ReAct agent loop
async def react_loop(question: str, tools: dict, max_steps: int = 10) -> str:
    history = REACT_PROMPT.format(tools=list(tools.keys()), question=question)
    for step in range(max_steps):
        response = await llm.chat([{"role": "user", "content": history}])
        history += response.content  # keep the model's reasoning in context
        if "Final Answer:" in response.content:
            return response.content.split("Final Answer:")[-1].strip()
        if "Action:" in response.content:
            action = parse_action(response.content)
            result = await tools[action.name](*action.args)
            history += f"\nObservation: {result}\n"
    return "Max steps reached without answer"

# Supervisor with fan-out/fan-in
async def multi_agent_analysis(content: str) -> dict:
    agents = [("security", security_agent), ("perf", perf_agent)]
    tasks = [agent(content) for _, agent in agents]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return await synthesize_findings(results)
Agent Loops
Patterns for autonomous LLM reasoning: ReAct (Reasoning + Acting), Plan-and-Execute with replanning, self-correction loops, and sliding-window memory management.
Key decisions: Max steps 5-15, temperature 0.3-0.7, memory window 10-20 messages.
Multi-Agent Coordination
Fan-out/fan-in parallelism, supervisor routing with dependency ordering, conflict resolution (confidence-based or LLM arbitration), result synthesis, and CC Agent Teams (mesh topology for peer messaging in CC 2.1.33+).
Key decisions: 3-8 specialists, parallelize independent agents, use Task tool (star) for simple work, Agent Teams (mesh) for cross-cutting concerns.
Alternative Frameworks
CrewAI hierarchical crews with Flows (1.8+), OpenAI Agents SDK handoffs and guardrails (0.7.0), Microsoft Agent Framework (AutoGen + SK merger), GPT-5.2-Codex for long-horizon coding, and AG2 for open-source flexibility.
Key decisions: Match framework to team expertise + use case. LangGraph for state machines, CrewAI for role-based teams, OpenAI SDK for handoff workflows, MS Agent for enterprise compliance.
Multi-Scenario
Orchestrate a single skill across 3 parallel scenarios (simple/medium/complex) with progressive difficulty scaling (1x/3x/8x), milestone synchronization, and cross-scenario result aggregation.
Key decisions: Free-running with checkpoints, always 3 scenarios, 1x/3x/8x exponential scaling, 30s/90s/300s time budgets.
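The decisions above can be sketched as a small fan-out: one skill run across three free-running scenarios with 1x/3x/8x input scaling and 30s/90s/300s budgets. This is an illustrative sketch — `run_skill`, `SCENARIOS`, and scaling a string payload by repetition are assumptions, not a specific framework's API.

```python
import asyncio

# (name, input scale, time budget in seconds) per the 1x/3x/8x convention above
SCENARIOS = [
    ("simple", 1, 30),
    ("medium", 3, 90),
    ("complex", 8, 300),
]

async def run_scenario(run_skill, payload, name, scale, budget):
    """Run one scenario under its own time budget; a timeout fails only this scenario."""
    try:
        result = await asyncio.wait_for(run_skill(payload * scale), timeout=budget)
        return {"scenario": name, "ok": True, "result": result}
    except asyncio.TimeoutError:
        return {"scenario": name, "ok": False, "error": f"exceeded {budget}s budget"}

async def orchestrate(run_skill, payload):
    """Fan out all three scenarios in parallel and aggregate their results."""
    tasks = [run_scenario(run_skill, payload, *s) for s in SCENARIOS]
    return await asyncio.gather(*tasks)
```

Because each scenario carries its own timeout, the aggregation step always receives three entries — parallel execution is preserved even when the complex scenario overruns.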
Key Decisions
| Decision | Recommendation |
|---|---|
| Single vs multi-agent | Single for focused tasks, multi for decomposable work |
| Max loop steps | 5-15 (prevent infinite loops) |
| Agent count | 3-8 specialists per workflow |
| Framework | Match to team expertise + use case |
| Topology | Task tool (star) for simple; Agent Teams (mesh) for complex |
| Scenario count | Always 3: simple, medium, complex |
Common Mistakes
- No step limit in agent loops (infinite loops)
- No memory management (context overflow)
- No error isolation in multi-agent (one failure crashes all)
- Missing synthesis step (raw agent outputs not useful)
- Mixing frameworks in one project (complexity explosion)
- Using Agent Teams for simple sequential work (use Task tool)
- Sequential instead of parallel scenarios (defeats purpose)
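The "missing synthesis step" mistake above is worth a sketch: the `synthesize_findings` call in Quick Start is never defined in this overview. A hedged version, assuming an async `llm.chat` client (passed explicitly here for self-containment) and results produced by `gather(..., return_exceptions=True)`:

```python
async def synthesize_findings(results: list, llm) -> dict:
    """Collapse raw agent outputs into one summary, isolating failed agents."""
    findings = [r for r in results if not isinstance(r, Exception)]
    errors = [str(r) for r in results if isinstance(r, Exception)]
    prompt = "Combine these agent findings into a single summary:\n" + \
        "\n".join(f"- {f}" for f in findings)
    response = await llm.chat([{"role": "user", "content": prompt}])
    return {"summary": response.content, "errors": errors, "count": len(findings)}
```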
Related Skills
- ork:langgraph - LangGraph workflow patterns (supervisor, routing, state)
- function-calling - Tool definitions and execution
- ork:task-dependency-patterns - Task management with Agent Teams workflow
Capability Details
react-loop
Keywords: react, reason, act, observe, loop, agent
Solves:
- Implement ReAct pattern
- Create reasoning loops
- Build iterative agents
plan-execute
Keywords: plan, execute, replan, multi-step, autonomous
Solves:
- Create plan then execute steps
- Implement replanning on failure
- Build goal-oriented agents
supervisor-coordination
Keywords: supervisor, route, coordinate, fan-out, fan-in, parallel
Solves:
- Route tasks to specialized agents
- Run agents in parallel
- Aggregate multi-agent results
agent-debate
Keywords: debate, conflict, resolution, arbitration, consensus
Solves:
- Resolve agent disagreements
- Implement LLM arbitration
- Handle conflicting outputs
result-synthesis
Keywords: synthesize, combine, aggregate, merge, summary
Solves:
- Combine outputs from multiple agents
- Create executive summaries
- Score confidence across findings
crewai-patterns
Keywords: crewai, crew, hierarchical, delegation, role-based, flows
Solves:
- Build role-based agent teams
- Implement hierarchical coordination
- Use Flows for event-driven orchestration
autogen-patterns
Keywords: autogen, microsoft, agent framework, teams, enterprise, a2a
Solves:
- Build enterprise agent systems
- Use AutoGen/SK merged framework
- Implement A2A protocol
framework-selection
Keywords: choose, compare, framework, decision, which, crewai, autogen, openai
Solves:
- Select appropriate framework
- Compare framework capabilities
- Match framework to requirements
scenario-orchestrator
Keywords: scenario, parallel, fan-out, difficulty, progressive, demo
Solves:
- Run skill across multiple difficulty levels
- Implement parallel scenario execution
- Aggregate cross-scenario results
scenario-routing
Keywords: route, synchronize, milestone, checkpoint, scaling
Solves:
- Route tasks by difficulty level
- Synchronize at milestones
- Scale inputs progressively
Rules (10)
Frameworks: Microsoft Agent Framework / AutoGen — HIGH
Microsoft Agent Framework (AutoGen + Semantic Kernel)
Enterprise multi-agent systems with RoundRobin/Selector teams, termination conditions, A2A protocol, and tool integration.
Team Setup
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient
# Create model client
model_client = OpenAIChatCompletionClient(model="gpt-5.2")
# Define agents
planner = AssistantAgent(
name="planner",
description="Plans complex tasks and breaks them into steps",
model_client=model_client,
system_message="You are a planning expert. Break tasks into actionable steps."
)
executor = AssistantAgent(
name="executor",
description="Executes planned tasks",
model_client=model_client,
system_message="You execute tasks according to the plan."
)
reviewer = AssistantAgent(
name="reviewer",
description="Reviews work and provides feedback",
model_client=model_client,
system_message="You review work. Say 'APPROVED' if satisfactory."
)
# Create team with termination condition
termination = TextMentionTermination("APPROVED")
team = RoundRobinGroupChat(
participants=[planner, executor, reviewer],
termination_condition=termination
)
# Run team
result = await team.run(task="Create a marketing strategy")
Selector Group Chat
from autogen_agentchat.teams import SelectorGroupChat
# Selector chooses next speaker based on context
team = SelectorGroupChat(
participants=[analyst, writer, reviewer],
model_client=model_client,
termination_condition=termination
)
Termination Conditions
from autogen_agentchat.conditions import (
TextMentionTermination,
MaxMessageTermination,
TokenUsageTermination,
TimeoutTermination,
OrTerminationCondition,
)
termination = OrTerminationCondition(
TextMentionTermination("DONE"),
MaxMessageTermination(max_messages=20),
TimeoutTermination(timeout_seconds=300)
)
Tool Integration
from autogen_core.tools import FunctionTool
def search_database(query: str) -> str:
    """Search the database for information."""
    results = db.search(query)
    return json.dumps(results)
search_tool = FunctionTool(search_database, description="Search the database")
researcher = AssistantAgent(
    name="researcher",
    description="Researches information",
    model_client=model_client,
    tools=[search_tool],
    system_message="Use the search tool to find information."
)
State Management
# Save state
state = await team.save_state()
# Restore state
await team.load_state(state)
# Resume conversation
result = await team.run(task="Continue from where we left off")
Agent-to-Agent Protocol (A2A)
from autogen_agentchat.protocols import A2AProtocol
protocol = A2AProtocol(
agent=my_agent,
endpoint="https://api.example.com/agent",
auth_token=os.environ["A2A_TOKEN"]
)
response = await protocol.send(
to="external-agent-id",
message="Process this request"
)
Streaming
async for message in team.run_stream(task="Analyze this data"):
    print(f"{message.source}: {message.content}")
OpenAI Agents SDK (0.7.0)
Alternative for OpenAI-native ecosystems:
from agents import Agent, Runner, handoff, RunConfig
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
researcher_agent = Agent(
name="researcher",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a research specialist. Gather information and facts.
When research is complete, hand off to the writer.""",
model="gpt-5.2"
)
writer_agent = Agent(
name="writer",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a content writer. Create compelling content from research.""",
model="gpt-5.2"
)
orchestrator = Agent(
name="orchestrator",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You coordinate research and writing tasks.""",
model="gpt-5.2",
handoffs=[handoff(agent=researcher_agent), handoff(agent=writer_agent)]
)
async def run_workflow(task: str):
    runner = Runner()
    config = RunConfig(nest_handoff_history=True)
    result = await runner.run(orchestrator, task, run_config=config)
    return result.final_output
Migration from AutoGen 0.2
# Old AutoGen 0.2
# from autogen import AssistantAgent, UserProxyAgent
# New AutoGen 0.4+
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
# Key differences:
# - No UserProxyAgent needed for simple tasks
# - Teams replace GroupChat
# - Explicit termination conditions required
# - Model client separate from agent
Best Practices
- Always set explicit termination conditions
- Team size: 3-5 agents optimal
- Clear role definitions in system_message
- One function per tool with clear descriptions
- Use try/except around team.run()
- Use run_stream() for real-time feedback
Incorrect — team without termination condition runs indefinitely:
team = RoundRobinGroupChat(
participants=[planner, executor, reviewer]
# No termination condition - infinite loop risk
)
result = await team.run(task="Complete task")
Correct — explicit termination prevents infinite loops:
termination = OrTerminationCondition(
TextMentionTermination("APPROVED"),
MaxMessageTermination(max_messages=20)
)
team = RoundRobinGroupChat(
participants=[planner, executor, reviewer],
termination_condition=termination
)
result = await team.run(task="Complete task")
Compare and select multi-agent frameworks based on complexity, use case, and production needs — HIGH
Framework Comparison & Selection
Decision matrix for choosing between multi-agent frameworks.
Framework Overview
| Framework | Best For | Key Features | Status |
|---|---|---|---|
| LangGraph 1.0.6 | Complex stateful workflows | Persistence, streaming, human-in-loop | Production |
| CrewAI 1.8.x | Role-based collaboration | Flows, hierarchical crews, a2a, HITL | Production |
| OpenAI Agents SDK 0.7.0 | OpenAI ecosystem | Handoffs, guardrails, MCPServerManager | Production |
| GPT-5.2-Codex | Long-horizon coding | Context compaction, project-scale, security | Production |
| MS Agent Framework | Enterprise | AutoGen+SK merger, A2A, compliance | Public Preview |
| AG2 | Open-source, flexible | Community fork of AutoGen | Active |
Feature Comparison
| Feature | LangGraph | CrewAI | OpenAI SDK | MS Agent |
|---|---|---|---|---|
| State Management | Excellent | Good | Basic | Good |
| Persistence | Built-in | Plugin | Manual | Built-in |
| Streaming | Native | Limited | Native | Native |
| Human-in-Loop | Native | Manual | Manual | Native |
| Memory | Via Store | Built-in | Manual | Manual |
| Observability | Langfuse/LangSmith | Limited | Tracing | Azure Monitor |
| Learning Curve | Steep | Easy | Medium | Medium |
| Production Ready | Yes | Yes | Yes | Q1 2026 |
Decision Tree
Start
|
+-- Need complex state machines?
| +-- Yes --> LangGraph
| +-- No
| |
+-- Role-based collaboration?
| +-- Yes --> CrewAI
| +-- No
| |
+-- OpenAI ecosystem only?
| +-- Yes --> OpenAI Agents SDK
| +-- No
| |
+-- Enterprise requirements?
| +-- Yes --> Microsoft Agent Framework
| +-- No
| |
+-- Long-horizon coding tasks?
| +-- Yes --> GPT-5.2-Codex
| +-- No
| |
+-- Open-source priority?
+-- Yes --> AG2
+-- No --> LangGraph (default)
Use Case Matrix
| Use Case | Best Framework | Why |
|---|---|---|
| Complex state machines | LangGraph | Native StateGraph, persistence |
| Role-based teams | CrewAI | Built-in delegation, backstories |
| OpenAI-only projects | OpenAI SDK | Native integration, handoffs |
| Enterprise/compliance | MS Agent | Azure integration, A2A |
| Long-horizon coding | GPT-5.2-Codex | Context compaction, security |
| Quick prototypes | CrewAI | Minimal boilerplate |
| Long-running workflows | LangGraph | Checkpointing, recovery |
| Customer support bots | OpenAI SDK | Handoffs, guardrails |
| Research/experiments | AG2 | Open-source, flexible |
Cost Considerations
| Framework | Licensing | Infra Cost | LLM Cost |
|---|---|---|---|
| LangGraph | MIT | Self-host / LangGraph Cloud | Any LLM |
| CrewAI | MIT | Self-host | Any LLM |
| OpenAI SDK | MIT | Self-host | OpenAI only |
| MS Agent | MIT | Self-host / Azure | Any LLM |
| AG2 | Apache 2.0 | Self-host | Any LLM |
Performance Characteristics
| Framework | Cold Start | Latency | Throughput |
|---|---|---|---|
| LangGraph | ~100ms | Low | High |
| CrewAI | ~200ms | Medium | Medium |
| OpenAI SDK | ~50ms | Low | High |
| MS Agent | ~150ms | Medium | High |
Team Expertise Requirements
| Framework | Python | LLM | Infra |
|---|---|---|---|
| LangGraph | Expert | Expert | Medium |
| CrewAI | Beginner | Beginner | Low |
| OpenAI SDK | Medium | Medium | Low |
| MS Agent | Medium | Medium | High |
Migration Paths
From AutoGen to MS Agent Framework
# AutoGen 0.2 (old)
from autogen import AssistantAgent, UserProxyAgent
agent = AssistantAgent(name="assistant", llm_config=config)
# MS Agent Framework (new)
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
model_client = OpenAIChatCompletionClient(model="gpt-5.2")
agent = AssistantAgent(name="assistant", model_client=model_client)
From Custom to LangGraph
# Custom orchestration (old)
async def workflow(task):
    step1 = await agent1.run(task)
    step2 = await agent2.run(step1)
    return step2
# LangGraph (new)
from langgraph.graph import StateGraph
workflow = StateGraph(State)
workflow.add_node("agent1", agent1_node)
workflow.add_node("agent2", agent2_node)
workflow.add_edge("agent1", "agent2")
GPT-5.2-Codex Decision Matrix
Task Duration > 1 hour?
+-- Yes --> GPT-5.2-Codex
+-- No
+-- Multiple files affected?
| +-- Yes --> GPT-5.2-Codex
| +-- No
| +-- Security review needed?
| +-- Yes --> GPT-5.2-Codex
| +-- No --> GPT-5.2 (standard)
Recommendation Summary
- Default choice: LangGraph (most capable, production-proven)
- Fastest to prototype: CrewAI (minimal code, intuitive)
- OpenAI shops: OpenAI Agents SDK (native integration)
- Enterprise: Microsoft Agent Framework (compliance, Azure)
- Long-horizon coding: GPT-5.2-Codex (context compaction)
- Research: AG2 (open community, experimental features)
Common Mistakes
- Mixing frameworks in one project (complexity explosion)
- Ignoring framework maturity (beta vs production)
- No fallback strategy (framework lock-in)
- Overcomplicating simple tasks (use single agent)
Incorrect — using multi-agent framework for simple task:
# Overkill: LangGraph for single-step task
workflow = StateGraph(State)
workflow.add_node("summarize", summarize_node)
workflow.add_edge(START, "summarize")
workflow.add_edge("summarize", END)
app = workflow.compile()
result = await app.ainvoke({"text": "..."})
Correct — single LLM call for simple task:
# Simple task = single agent
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-5.2")
result = await llm.ainvoke("Summarize this text: ...")
Build role-based agent collaboration with CrewAI Flows, hierarchical crews, and structured outputs — HIGH
CrewAI Patterns (v1.8+)
Role-based multi-agent collaboration with Flows architecture, hierarchical crews, MCP tools, and async execution.
Hierarchical Crew
from crewai import Agent, Crew, Task, Process
from crewai.flow.flow import Flow, listen, start
# Manager coordinates the team
manager = Agent(
role="Project Manager",
goal="Coordinate team efforts and ensure project success",
backstory="Experienced project manager skilled at delegation",
allow_delegation=True,
memory=True,
verbose=True
)
# Specialist agents
researcher = Agent(
role="Researcher",
goal="Provide accurate research and analysis",
backstory="Expert researcher with deep analytical skills",
allow_delegation=False,
verbose=True
)
writer = Agent(
role="Writer",
goal="Create compelling content",
backstory="Skilled writer who creates engaging content",
allow_delegation=False,
verbose=True
)
# Manager-led task
project_task = Task(
description="Create a comprehensive market analysis report",
expected_output="Executive summary, analysis, recommendations",
agent=manager
)
# Hierarchical crew
crew = Crew(
agents=[manager, researcher, writer],
tasks=[project_task],
process=Process.hierarchical,
manager_llm="gpt-5.2",
memory=True,
verbose=True
)
result = crew.kickoff()
Flows Architecture (1.8+)
Event-driven orchestration with state management:
from crewai.flow.flow import Flow, listen, start, router
class ResearchFlow(Flow):
    @start()
    def generate_topic(self):
        return "AI Safety"
    @listen(generate_topic)
    def research_topic(self, topic):
        return f"Research findings on {topic}"
    @router(research_topic)
    def route_result(self, result):
        if "sufficient" in result:
            return "success"
        return "retry"
    @listen("success")
    def handle_success(self):
        return "Workflow completed"
    @listen("retry")
    def handle_retry(self):
        return "Retrying..."
flow = ResearchFlow()
result = flow.kickoff()
Parallel Execution with and_/or_
from crewai.flow.flow import Flow, listen, start, and_, or_
class ParallelFlow(Flow):
    @start()
    def task_a(self):
        return "Result A"
    @start()
    def task_b(self):
        return "Result B"
    @listen(and_(task_a, task_b))
    def combine_results(self):
        """Triggers when BOTH complete."""
        return "Combined results"
Structured Output
from pydantic import BaseModel
class ReportOutput(BaseModel):
    title: str
    summary: str
    findings: list[str]
    confidence: float
task = Task(
    description="Analyze market trends",
    expected_output="Structured market analysis",
    agent=analyst,
    output_pydantic=ReportOutput
)
result = crew.kickoff()
report = result.pydantic
Task Guardrails
from crewai.tasks import TaskOutput
def validate_length(result: TaskOutput) -> tuple[bool, any]:
    if len(result.raw.split()) < 100:
        return (False, "Content too brief, expand analysis")
    return (True, result.raw)
task = Task(
    description="Write comprehensive analysis",
    expected_output="Detailed analysis (100+ words)",
    agent=writer,
    guardrail=validate_length,
    guardrail_max_retries=3
)
MCP Tool Support (1.8+)
from crewai import Agent
agent = Agent(
role="Research Analyst",
goal="Research and analyze information",
backstory="Expert analyst",
mcps=[
"https://mcp.example.com/mcp?api_key=your_key",
"crewai-amp:financial-data",
]
)
Decorator-Based Crew (Recommended)
from crewai import Agent, Crew, Task, CrewBase, agent, task, crew
@CrewBase
class ResearchCrew:
    agents_config = 'config/agents.yaml'
    tasks_config = 'config/tasks.yaml'
    @agent
    def researcher(self) -> Agent:
        return Agent(config=self.agents_config['researcher'], tools=[search_tool])
    @task
    def research_task(self) -> Task:
        return Task(config=self.tasks_config['research'])
    @crew
    def crew(self) -> Crew:
        return Crew(agents=self.agents, tasks=self.tasks, process=Process.sequential)
result = ResearchCrew().crew().kickoff(inputs={"topic": "AI Safety"})
Best Practices
- Use Flows for complex multi-step workflows
- Prefer @CrewBase decorator-based definition
- Enable structured outputs with output_pydantic
- Add guardrails for output validation
- Use async_execution=True for independent tasks
- Role clarity: each agent has a distinct, non-overlapping role
- One clear deliverable per task
Incorrect — sequential process without hierarchical manager:
crew = Crew(
    agents=[researcher, writer, reviewer],
    tasks=[research_task, write_task, review_task],
    process=Process.sequential  # No delegation, rigid order
)
Correct — hierarchical process with manager delegation:
crew = Crew(
    agents=[manager, researcher, writer, reviewer],
    tasks=[project_task],
    process=Process.hierarchical,  # Manager delegates dynamically
    manager_llm="gpt-5.2"
)
Generate multi-step plans before execution with replanning when conditions change — HIGH
Plan-and-Execute Pattern
Generate a multi-step plan first, then execute each step sequentially with the option to replan when conditions change.
Core Implementation
async def plan_and_execute(goal: str) -> str:
    """Create plan first, then execute steps."""
    # 1. Generate plan
    plan = await llm.chat([{
        "role": "user",
        "content": f"Create a step-by-step plan to: {goal}\n\nFormat as numbered list."
    }])
    steps = parse_plan(plan.content)
    results = []
    # 2. Execute each step
    for i, step in enumerate(steps):
        result = await execute_step(step, context=results)
        results.append({"step": step, "result": result})
        # 3. Check if replanning needed
        if should_replan(results):
            return await plan_and_execute(
                f"{goal}\n\nProgress so far: {results}"
            )
    # 4. Synthesize final answer
    return await synthesize(goal, results)
TypeScript ReAct Agent
interface AgentStep {
  thought: string;
  action?: string;
  actionInput?: unknown;
  observation?: string;
}
interface AgentResult {
  answer: string;
  steps: AgentStep[];
  totalCost: number;
  iterations: number;
}
export async function reactAgent(
  task: string,
  options: { maxIterations?: number; verbose?: boolean } = {}
): Promise<AgentResult> {
  const { maxIterations = 10, verbose = false } = options;
  const steps: AgentStep[] = [];
  let totalCost = 0;
  const systemPrompt = `You are an autonomous agent that can use tools.
Available tools:
${tools.map(t => `- ${t.name}: ${t.description}`).join('\n')}
Use this format:
Thought: [Your reasoning]
Action: [tool name]
Action Input: {"param": "value"}
Observation: [Tool result]
When done: Answer: [Final answer]`;
  const messages = [
    { role: 'system' as const, content: systemPrompt },
    { role: 'user' as const, content: task }
  ];
  for (let i = 0; i < maxIterations; i++) {
    const response = await openai.chat.completions.create({
      model: 'gpt-5.2', messages, temperature: 0.1
    });
    const content = response.choices[0].message.content!;
    totalCost += (response.usage!.total_tokens / 1000) * 0.01;
    if (content.includes('Answer:')) {
      const answer = content.split('Answer:')[1].trim();
      return { answer, steps, totalCost, iterations: i + 1 };
    }
    const parsed = parseAgentResponse(content);
    const step: AgentStep = { thought: parsed.thought };
    if (parsed.action && parsed.actionInput) {
      step.action = parsed.action;
      step.actionInput = JSON.parse(parsed.actionInput);
      step.observation = await executeTool(parsed.action, step.actionInput);
    }
    steps.push(step);
    messages.push({ role: 'assistant', content });
    if (step.observation) {
      messages.push({ role: 'user', content: `Observation: ${step.observation}` });
    }
  }
  throw new Error(`Agent exceeded max iterations (${maxIterations})`);
}
Function Calling Agent (Alternative)
export async function functionCallingAgent(task: string): Promise<AgentResult> {
  const steps: AgentStep[] = [];
  let totalCost = 0;
  const messages = [
    { role: 'system' as const, content: 'You are a helpful assistant with access to tools.' },
    { role: 'user' as const, content: task }
  ];
  const openaiTools = tools.map(tool => ({
    type: 'function' as const,
    function: { name: tool.name, description: tool.description, parameters: tool.parameters }
  }));
  let iteration = 0;
  while (iteration < 10) {
    const response = await openai.chat.completions.create({
      model: 'gpt-5.2', messages, tools: openaiTools
    });
    const message = response.choices[0].message;
    totalCost += (response.usage!.total_tokens / 1000) * 0.01;
    // No tool calls = final answer
    if (!message.tool_calls) {
      return { answer: message.content!, steps, totalCost, iterations: iteration + 1 };
    }
    messages.push(message as any);
    for (const toolCall of message.tool_calls) {
      const tool = tools.find(t => t.name === toolCall.function.name);
      if (!tool) continue;
      const args = JSON.parse(toolCall.function.arguments);
      const result = await tool.execute(args);
      steps.push({
        thought: `Calling ${toolCall.function.name}`,
        action: toolCall.function.name,
        actionInput: args,
        observation: JSON.stringify(result)
      });
      messages.push({ role: 'tool', tool_call_id: toolCall.id, content: JSON.stringify(result) });
    }
    iteration++;
  }
  throw new Error('Agent exceeded max iterations');
}
When to Choose Which Pattern
| Pattern | Best For | Trade-off |
|---|---|---|
| ReAct | Exploratory tasks, research | More flexible, harder to control |
| Plan-Execute | Well-defined goals | Structured, but replanning adds cost |
| Function Calling | Production APIs | Most reliable, requires tool schemas |
| Self-Correction | Quality-critical output | Higher cost, better quality |
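The plan-and-execute loop above calls `parse_plan` and `should_replan` without defining them. A minimal sketch, under two stated assumptions: plans come back as numbered lists ("1. …" or "2) …"), and a failed step embeds the word "error" in its result.

```python
import re

def parse_plan(text: str) -> list[str]:
    """Extract step descriptions from a numbered list like '1. Do X'."""
    steps = []
    for line in text.splitlines():
        m = re.match(r"\s*\d+[.)]\s+(.*)", line)
        if m:
            steps.append(m.group(1).strip())
    return steps

def should_replan(results: list[dict]) -> bool:
    """Replan if the most recent step reported an error (assumed convention)."""
    return bool(results) and "error" in str(results[-1].get("result", "")).lower()
```

In practice, `should_replan` might instead ask the LLM whether progress still matches the plan; the string check keeps the sketch deterministic.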
Incorrect — executing without planning first:
async def execute_goal(goal: str):
    steps = ["step1", "step2", "step3"]  # Hardcoded, no reasoning
    for step in steps:
        await execute_step(step)
Correct — LLM generates plan before execution:
async def plan_and_execute(goal: str):
    plan = await llm.chat([{"role": "user", "content": f"Create plan for: {goal}"}])
    steps = parse_plan(plan.content)
    for step in steps:
        result = await execute_step(step)
        if should_replan(result):
            return await plan_and_execute(f"{goal}\nProgress: {result}")
Implement ReAct pattern where agents reason, act via tools, observe, and iterate — HIGH
ReAct Pattern (Reasoning + Acting)
LLMs reason step-by-step, take actions via tools, observe results, and iterate until a final answer is reached.
ReAct Prompt Template
REACT_PROMPT = """You are an agent that reasons step by step.
For each step, respond with:
Thought: [your reasoning about what to do next]
Action: [tool_name(arg1, arg2)]
Observation: [you'll see the result here]
When you have the final answer:
Thought: I now have enough information
Final Answer: [your response]
Available tools: {tools}
Question: {question}
"""
Python Implementation
async def react_loop(question: str, tools: dict, max_steps: int = 10) -> str:
    """Execute ReAct reasoning loop."""
    history = REACT_PROMPT.format(tools=list(tools.keys()), question=question)
    for step in range(max_steps):
        response = await llm.chat([{"role": "user", "content": history}])
        history += response.content
        # Check for final answer
        if "Final Answer:" in response.content:
            return response.content.split("Final Answer:")[-1].strip()
        # Extract and execute action
        if "Action:" in response.content:
            action = parse_action(response.content)
            result = await tools[action.name](*action.args)
            history += f"\nObservation: {result}\n"
    return "Max steps reached without answer"
Self-Correction Loop
async def self_correcting_agent(task: str, max_retries: int = 3) -> str:
    """Agent that validates and corrects its own output."""
    for attempt in range(max_retries):
        response = await llm.chat([{"role": "user", "content": task}])
        # Self-validate
        validation = await llm.chat([{
            "role": "user",
            "content": f"""Validate this response for the task: {task}
Response: {response.content}
Check for:
1. Correctness - Is it factually accurate?
2. Completeness - Does it fully answer the task?
3. Format - Is it properly formatted?
If valid, respond: VALID
If invalid, respond: INVALID: [what's wrong and how to fix]"""
        }])
        # startswith, not `in`: "INVALID" also contains "VALID"
        if validation.content.strip().startswith("VALID"):
            return response.content
        # Correct based on feedback
        task = f"{task}\n\nPrevious attempt had issues: {validation.content}"
    return response.content  # Return best attempt
Memory Management
class AgentMemory:
    """Sliding window memory for agents."""
    def __init__(self, max_messages: int = 20):
        self.messages = []
        self.max_messages = max_messages
        self.summary = ""
    def add(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})
        if len(self.messages) > self.max_messages:
            self._compress()
    def _compress(self):
        """Summarize oldest messages."""
        old = self.messages[:10]
        self.messages = self.messages[10:]
        summary = summarize(old)
        self.summary = f"{self.summary}\n{summary}"
    def get_context(self) -> list:
        """Get messages with summary prefix."""
        context = []
        if self.summary:
            context.append({
                "role": "system",
                "content": f"Previous context summary: {self.summary}"
            })
        return context + self.messages
Key Decisions
| Decision | Recommendation |
|---|---|
| Max steps | 5-15 (prevent infinite loops) |
| Temperature | 0.3-0.7 (balance creativity/focus) |
| Memory window | 10-20 messages |
| Validation frequency | Every 3-5 steps |
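Both loops on this page call `parse_action` without defining it. A minimal regex-based sketch, assuming actions are emitted as `Action: tool_name("arg1", "arg2")`; the `Action` dataclass is illustrative, not part of any library.

```python
import re
from dataclasses import dataclass, field

@dataclass
class Action:
    name: str
    args: list = field(default_factory=list)

def parse_action(text: str) -> Action:
    """Pull the tool name and string arguments out of an 'Action:' line."""
    m = re.search(r"Action:\s*(\w+)\((.*?)\)", text)
    if not m:
        raise ValueError("No action found in response")
    name = m.group(1)
    args = [a.strip().strip('"\'') for a in m.group(2).split(",") if a.strip()]
    return Action(name, args)
```

Production agents usually prefer structured tool calls (function calling) over regex parsing, since free-text action lines are easy for the model to malform.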
Common Mistakes
- No step limit (infinite loops)
- No memory management (context overflow)
- No error recovery (crashes on tool failure)
- Over-complex prompts (agent gets confused)
Incorrect — ReAct loop without step limit:
async def react_loop(question: str, tools: dict):
    history = f"Question: {question}"
    while True:  # Infinite loop risk
        response = await llm.chat([{"role": "user", "content": history}])
        if "Final Answer:" in response.content:
            return response.content
Correct — max_steps prevents infinite loops:
async def react_loop(question: str, tools: dict, max_steps: int = 10):
    history = f"Question: {question}"
    for step in range(max_steps):  # Bounded loop
        response = await llm.chat([{"role": "user", "content": history}])
        if "Final Answer:" in response.content:
            return response.content.split("Final Answer:")[-1]
    return "Max steps reached without answer"
Resolve agent disagreements through confidence scores, LLM arbitration, or majority voting — HIGH
Agent Debate & Conflict Resolution
Patterns for handling disagreements between agents and establishing communication channels.
Conflict Resolution
async def resolve_conflicts(findings: list[dict]) -> list[dict]:
    """When agents disagree, resolve by confidence or LLM."""
    conflicts = detect_conflicts(findings)
    if not conflicts:
        return findings
    for conflict in conflicts:
        # Option 1: Higher confidence wins
        winner = max(conflict.agents, key=lambda a: a.confidence)
        # Option 2: LLM arbitration
        resolution = await llm.chat([{
            "role": "user",
            "content": f"""Two agents disagree:
Agent A ({conflict.agent_a.name}): {conflict.agent_a.finding}
Agent B ({conflict.agent_b.name}): {conflict.agent_b.finding}
Which is more likely correct and why?"""
        }])
        conflict.resolution = parse_resolution(resolution.content)
    return apply_resolutions(findings, conflicts)
Structured Conflict Detection
async def resolve_agent_conflicts(
findings: list[dict], llm: Any
) -> dict:
"""Resolve conflicts between agent outputs."""
conflicts = []
for i, f1 in enumerate(findings):
for f2 in findings[i+1:]:
if f1.get("recommendation") != f2.get("recommendation"):
conflicts.append((f1, f2))
if not conflicts:
return {"status": "no_conflicts", "findings": findings}
# LLM arbitration
resolution = await llm.ainvoke(f"""
Agents disagree. Determine best recommendation:
Agent 1: {conflicts[0][0]}
Agent 2: {conflicts[0][1]}
Provide: winner, reasoning, confidence (0-1)
""")
return {"status": "resolved", "resolution": resolution}
Agent Communication Bus
class AgentBus:
"""Message passing between agents."""
def __init__(self):
self.messages = []
self.subscribers = {}
def publish(self, from_agent: str, message: dict):
"""Broadcast message to all agents."""
msg = {"from": from_agent, "data": message, "ts": time.time()}
self.messages.append(msg)
for callback in self.subscribers.values():
callback(msg)
def subscribe(self, agent_id: str, callback):
"""Register agent to receive messages."""
self.subscribers[agent_id] = callback
def get_history(self, agent_id: str = None) -> list:
"""Get message history, optionally filtered."""
if agent_id:
return [m for m in self.messages if m["from"] == agent_id]
return self.messages
Resolution Strategies
| Strategy | When to Use | Trade-off |
|---|---|---|
| Confidence-based | Agents provide confidence scores | Fast but requires calibrated scores |
| LLM arbitration | Complex disagreements | Higher quality but adds LLM cost |
| Majority voting | 3+ agents on same question | Simple but requires odd count |
| Weighted consensus | Agents have different expertise | Best for specialized teams |
| Human-in-the-loop | High-stakes decisions | Most reliable but slowest |
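For three or more agents, the majority-voting strategy in the table reduces to a few lines; a minimal sketch (the `majority_vote` helper and the agreement-ratio-as-confidence convention are illustrative, not from any framework):

```python
from collections import Counter

def majority_vote(recommendations: list[str]) -> tuple[str, float]:
    """Return the most common recommendation plus the agreement ratio.

    The ratio doubles as a rough confidence score; ties resolve to the
    first-seen recommendation, so use an odd agent count in practice.
    """
    counts = Counter(recommendations)
    winner, votes = counts.most_common(1)[0]
    return winner, votes / len(recommendations)
```

With `["cache", "cache", "index"]` this yields `"cache"` with agreement 2/3; below a chosen agreement threshold, escalate to LLM arbitration instead.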
Common Mistakes
- No timeout per agent (one slow agent blocks all)
- No error isolation (one failure crashes workflow)
- Over-coordination (too much overhead)
- Using Agent Teams for simple sequential work (use Task tool)
- Broadcasting when a direct message suffices (wastes tokens)
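The first two mistakes above (no per-agent timeout, no error isolation) are avoidable with one wrapper; a minimal sketch, assuming each agent is an async callable taking the task string (`run_isolated` and its signature are illustrative):

```python
import asyncio
from typing import Any, Awaitable, Callable

AgentFn = Callable[[str], Awaitable[Any]]

async def run_isolated(
    agents: dict[str, AgentFn], task: str, timeout_s: float = 30.0
) -> dict[str, Any]:
    """Run agents in parallel; a slow or crashing agent cannot block the rest."""
    async def guarded(name: str, fn: AgentFn) -> Any:
        try:
            return await asyncio.wait_for(fn(task), timeout=timeout_s)
        except asyncio.TimeoutError:
            return {"error": f"{name} timed out after {timeout_s}s"}
        except Exception as exc:  # isolate failures instead of crashing the workflow
            return {"error": f"{name} failed: {exc}"}

    results = await asyncio.gather(*(guarded(n, f) for n, f in agents.items()))
    return dict(zip(agents, results))
```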
Incorrect — no conflict resolution strategy:
async def multi_agent_analysis(task: str):
results = await asyncio.gather(agent1(task), agent2(task))
return results # Return conflicting results without resolution
Correct — LLM arbitration resolves conflicts:
async def multi_agent_analysis(task: str):
results = await asyncio.gather(agent1(task), agent2(task))
if results[0] != results[1]: # Conflict detected
resolution = await llm.chat([{
"role": "user",
"content": f"Agent 1: {results[0]}\nAgent 2: {results[1]}\nWhich is correct?"
}])
return resolution.content
return results[0]
Coordinate specialist agents through a central supervisor with parallel execution and timeouts — CRITICAL
Supervisor Pattern
Central coordinator that routes tasks to specialist agents, supports parallel and sequential execution, and aggregates results.
Fan-Out/Fan-In
async def multi_agent_analysis(content: str) -> dict:
"""Fan-out to specialists, fan-in to synthesize."""
agents = [
("security", security_agent),
("performance", performance_agent),
("code_quality", quality_agent),
("architecture", architecture_agent),
]
# Fan-out: Run all agents in parallel
tasks = [agent(content) for _, agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter successful results
findings = [
{"agent": name, "result": result}
for (name, _), result in zip(agents, results)
if not isinstance(result, Exception)
]
# Fan-in: Synthesize findings
return await synthesize_findings(findings)
Supervisor with Routing
class Supervisor:
"""Central coordinator that routes to specialists."""
def __init__(self, agents: dict):
self.agents = agents # {"security": agent, "performance": agent}
self.completed = []
async def run(self, task: str) -> dict:
"""Route task through appropriate agents."""
# 1. Determine which agents to use
plan = await self.plan_routing(task)
# 2. Execute in dependency order
results = {}
for agent_name in plan.execution_order:
if plan.can_parallelize(agent_name):
batch = plan.get_parallel_batch(agent_name)
batch_results = await asyncio.gather(*[
self.agents[name](task, context=results)
for name in batch
])
results.update(dict(zip(batch, batch_results)))
else:
results[agent_name] = await self.agents[agent_name](
task, context=results
)
return results
async def plan_routing(self, task: str) -> RoutingPlan:
"""Use LLM to determine agent routing."""
response = await llm.chat([{
"role": "user",
"content": f"""Task: {task}
Available agents: {list(self.agents.keys())}
Which agents should handle this task?
What order? Can any run in parallel?"""
}])
return parse_routing_plan(response.content)
Supervisor-Worker with Timeout
class SupervisorCoordinator:
"""Central supervisor that routes tasks to worker agents."""
def __init__(self, workers: dict[str, Agent]):
self.workers = workers
self.execution_log: list[dict] = []
async def route_and_execute(
self, task: str, required_agents: list[str], parallel: bool = True
) -> dict[str, Any]:
context = {"task": task, "results": {}}
if parallel:
tasks = [self._run_worker(name, task, context) for name in required_agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
return dict(zip(required_agents, results))
else:
for name in required_agents:
context["results"][name] = await self._run_worker(name, task, context)
return context["results"]
async def _run_worker(self, name: str, task: str, context: dict) -> dict:
"""Execute single worker with timeout."""
try:
result = await asyncio.wait_for(
self.workers[name].run(task, context), timeout=30.0
)
self.execution_log.append({"agent": name, "status": "success", "result": result})
return result
except asyncio.TimeoutError:
return {"error": f"{name} timed out"}
CC Agent Teams (CC 2.1.33+)
CC 2.1.33 introduces native Agent Teams with peer-to-peer messaging and mesh topology.
Star vs Mesh Topology
Star (Task tool): Mesh (Agent Teams):
Lead Lead (delegate)
/||\ / | \
/ || \ / | \
A B C D A <-> B <-> C
(no cross-talk) (peer messaging)
Dual-Mode Decision Tree
Complexity Assessment:
+-- Score < 3.0 -> Task tool subagents (cheaper, simpler)
+-- Score 3.0-3.5 -> User choice (recommend Teams for cross-cutting)
+-- Score > 3.5 -> Agent Teams (if CLAUDE_CODE_EXPERIMENTAL_AGENT_TEAMS=1)
Team Formation
# 1. Create team with shared task list
TeamCreate(team_name="feature-auth", description="User auth implementation")
# 2. Create tasks in shared list
TaskCreate(subject="Design API schema", description="...")
TaskCreate(subject="Build React components", description="...", addBlockedBy=["1"])
# 3. Spawn teammates
Task(prompt="You are the backend architect...",
team_name="feature-auth", name="backend-dev",
subagent_type="backend-system-architect")
Peer Messaging
# Direct message (default)
SendMessage(type="message", recipient="frontend-dev",
content="API contract: GET /users/:id -> {id, name, email}",
summary="API contract ready")
# Broadcast (expensive -- use sparingly)
SendMessage(type="broadcast",
content="Auth header format changed to Bearer",
summary="Breaking auth change")
Cost Comparison
| Scenario | Task Tool | Agent Teams | Ratio |
|---|---|---|---|
| 3-agent review | ~150K tokens | ~400K tokens | 2.7x |
| 8-agent feature | ~500K tokens | ~1.2M tokens | 2.4x |
| 6-agent research | ~300K tokens | ~800K tokens | 2.7x |
Key Decisions
| Decision | Recommendation |
|---|---|
| Agent count | 3-8 specialists |
| Parallelism | Parallelize independent agents |
| Worker timeout | 30s default |
| Communication | Shared state, message bus, or SendMessage (CC 2.1.33+) |
| Topology | Task tool (star) for simple; Agent Teams (mesh) for complex |
Incorrect — sequential execution of independent agents:
async def analyze(content: str):
security_result = await security_agent(content) # Wait
perf_result = await performance_agent(content) # Wait
quality_result = await quality_agent(content) # Wait
return [security_result, perf_result, quality_result]
Correct — parallel fan-out for independent agents:
async def analyze(content: str):
tasks = [
security_agent(content),
performance_agent(content),
quality_agent(content)
]
results = await asyncio.gather(*tasks) # Run in parallel
return results
Synthesize parallel agent outputs into coherent actionable results with quality metrics — HIGH
Result Synthesis
Combine outputs from multiple parallel agents into coherent, actionable results.
Synthesis Pattern
async def synthesize_findings(findings: list[dict]) -> dict:
"""Combine multiple agent outputs into coherent result."""
# Group by category
by_category = {}
for f in findings:
cat = f.get("category", "general")
by_category.setdefault(cat, []).append(f)
# Synthesize each category
synthesis = await llm.chat([{
"role": "user",
"content": f"""Synthesize these agent findings into a coherent summary:
{json.dumps(by_category, indent=2)}
Output format:
- Executive summary (2-3 sentences)
- Key findings by category
- Recommendations
- Confidence score (0-1)"""
}])
return parse_synthesis(synthesis.content)
Multi-Agent Collaboration (TypeScript)
export async function multiAgentCollaboration(
task: string,
agents: Agent[]
): Promise<AgentResult> {
const steps: AgentStep[] = [];
let totalCost = 0;
// 1. Coordinator plans the task
const planResponse = await openai.chat.completions.create({
model: 'gpt-5.2',
messages: [
{
role: 'system',
content: `You are a coordinator. Break down tasks and assign to agents:
${agents.map(a => `- ${a.name}: ${a.role}`).join('\n')}
Provide a numbered plan with agent assignments.`
},
{ role: 'user', content: `Task: ${task}\n\nProvide a step-by-step plan.` }
]
});
const plan = planResponse.choices[0].message.content!;
steps.push({ thought: 'Coordinator planning', observation: plan });
// 2. Execute agent subtasks in parallel
const agentResults = await Promise.all(
agents.map(async (agent) => {
const response = await openai.chat.completions.create({
model: 'gpt-5.2',
messages: [
{ role: 'system', content: agent.systemPrompt },
{ role: 'user', content: `Task: ${task}\n\nPlan:\n${plan}\n\nComplete your part.` }
]
});
return { agent: agent.name, result: response.choices[0].message.content! };
})
);
// 3. Synthesize results
const synthesisResponse = await openai.chat.completions.create({
model: 'gpt-5.2',
messages: [
{ role: 'system', content: 'Synthesize agent results into a coherent final answer.' },
{ role: 'user', content: `Task: ${task}\n\nAgent Results:\n${JSON.stringify(agentResults, null, 2)}` }
]
});
return {
answer: synthesisResponse.choices[0].message.content!,
steps,
totalCost,
iterations: agents.length + 2
};
}
Aggregation Strategies
Comparative (Default)
Compare metrics across all agents:
{
"quality_comparison": {
"security_agent": {"score": 0.92, "findings": 3},
"perf_agent": {"score": 0.88, "findings": 5}
},
"consensus_items": ["Use parameterized queries", "Add rate limiting"],
"disagreements": []
}
Pattern Extraction
Find common patterns:
{
"success_patterns": ["Caching strategy effective", "Batch size 50+ preferred"],
"failure_patterns": ["Timeout at >5000 items per batch"]
}
Recommendation Engine
Generate actionable recommendations:
{
"priority_1": "Fix SQL injection in auth module (security + quality agree)",
"priority_2": "Add connection pooling (performance agent)",
"confidence": 0.91
}
Cost Optimization
- Batch similar tasks to reduce overhead
- Cache agent results by task hash
- Use cheaper models for simple agents
- Parallelize independent agents always
- Max parallel agents: 8
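A minimal sketch combining two of the bullets above, result caching keyed by task hash and a semaphore capping parallel agents at 8 (the names `cached_agent_call`, `_cache`, and `_sem` are illustrative):

```python
import asyncio
import hashlib
import json

_cache: dict[str, object] = {}
_sem = asyncio.Semaphore(8)  # max parallel agents

async def cached_agent_call(agent, task: dict):
    """Skip the agent entirely when an identical task was already answered."""
    key = hashlib.sha256(json.dumps(task, sort_keys=True).encode()).hexdigest()
    if key in _cache:
        return _cache[key]
    async with _sem:  # bound concurrency across all callers
        result = await agent(task)
    _cache[key] = result
    return result
```

In production you would bound the cache (e.g. LRU with TTL) rather than letting the dict grow without limit.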
Orchestration Checklist
- Define agent responsibilities (single responsibility per agent)
- Plan communication patterns (shared state, message bus, or SendMessage)
- Set coordination strategy (central orchestrator with task queue)
- Design failure handling (timeout per agent, error isolation)
- Agent health checks and performance metrics
Incorrect — returning raw agent outputs without synthesis:
async def multi_agent_analysis(task: str):
results = await asyncio.gather(agent1(task), agent2(task), agent3(task))
return results # Returns list of disconnected findings
Correct — synthesizing results into coherent output:
async def multi_agent_analysis(task: str):
results = await asyncio.gather(agent1(task), agent2(task), agent3(task))
synthesis = await llm.chat([{
"role": "user",
"content": f"Synthesize these findings: {json.dumps(results)}"
}])
return parse_synthesis(synthesis.content)
Test skills across three parallel scenarios with progressive difficulty and synchronized execution — MEDIUM
Multi-Scenario Orchestrator
Run a single skill across 3 parallel scenarios (simple/medium/complex) with synchronized execution and progressive difficulty.
Core Pattern
+---------------------------------------------------------------------+
| MULTI-SCENARIO ORCHESTRATOR |
+---------------------------------------------------------------------+
| [Coordinator] --+--> [Scenario 1: Simple] (Easy) |
| ^ | +--> [Skill Instance 1] |
| | +--> [Scenario 2: Medium] (Intermediate) |
| | | +--> [Skill Instance 2] |
| | +--> [Scenario 3: Complex] (Advanced) |
| | +--> [Skill Instance 3] |
| | |
| [State Manager] <---- All instances report progress |
| [Aggregator] --> Cross-scenario synthesis |
+---------------------------------------------------------------------+
When to Use
| Scenario | Example |
|---|---|
| Skill demos | Show /ork:implement on simple, medium, complex tasks |
| Progressive testing | Validate skill scales with complexity |
| Comparative analysis | How does approach differ by difficulty? |
| Training/tutorials | Show skill progression from easy to hard |
LangGraph Implementation
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send
async def scenario_supervisor(state: ScenarioOrchestratorState) -> Command:
    """Fan out to all 3 scenarios in parallel."""
    progress = {
        f"progress_{sid}": ScenarioProgress(
            scenario_id=sid, status="pending",
            start_time_ms=int(time.time() * 1000)
        )
        for sid in ["simple", "medium", "complex"]
    }
    # Command.update persists the progress records in graph state;
    # Command.goto fans out one Send per scenario worker.
    return Command(
        update=progress,
        goto=[
            Send("scenario_worker", {"scenario_id": sid, **state, **progress})
            for sid in ["simple", "medium", "complex"]
        ],
    )
async def scenario_worker(state: ScenarioOrchestratorState) -> dict:
"""Execute one scenario."""
scenario_id = state.get("scenario_id")
progress = state[f"progress_{scenario_id}"]
scenario_def = state[f"scenario_{scenario_id}"]
progress.status = "running"
try:
result = await execute_skill_with_milestones(
skill_name=state["skill_name"],
scenario_def=scenario_def,
progress=progress, state=state
)
progress.status = "complete"
progress.elapsed_ms = int(time.time() * 1000) - progress.start_time_ms
return {f"progress_{scenario_id}": progress}
except Exception as e:
progress.status = "failed"
progress.errors.append({"message": str(e)})
return {f"progress_{scenario_id}": progress}
async def scenario_aggregator(state: ScenarioOrchestratorState) -> dict:
"""Collect all results and synthesize findings."""
aggregated = {
"orchestration_id": state["orchestration_id"],
"metrics": {},
"comparison": {},
"recommendations": []
}
for scenario_id in ["simple", "medium", "complex"]:
progress = state[f"progress_{scenario_id}"]
aggregated["metrics"][scenario_id] = {
"elapsed_ms": progress.elapsed_ms,
"items_processed": progress.items_processed,
"quality_scores": progress.quality_scores,
}
return {"final_results": aggregated}
# Build graph
graph = StateGraph(ScenarioOrchestratorState)
graph.add_node("supervisor", scenario_supervisor)
graph.add_node("scenario_worker", scenario_worker)
graph.add_node("aggregator", scenario_aggregator)
graph.add_edge(START, "supervisor")
graph.add_edge("scenario_worker", "aggregator")
graph.add_edge("aggregator", END)
app = graph.compile(checkpointer=checkpointer)
Skill-Agnostic Template
from abc import ABC, abstractmethod
class SkillOrchestrator(ABC):
"""Abstract orchestrator for any user-invocable skill."""
def __init__(self, skill_name: str, skill_version: str):
self.skill_name = skill_name
self.skill_version = skill_version
@abstractmethod
async def invoke_skill(self, input_data: list[dict], scenario_params: dict) -> dict:
"""Invoke your skill on input data."""
pass
@abstractmethod
def get_scenario_configs(self) -> dict[str, dict]:
"""Return configs for simple/medium/complex."""
pass
@abstractmethod
def calculate_quality_metrics(self, results: list[dict], metric_names: list[str]) -> dict:
"""Calculate quality metrics from results."""
pass
async def orchestrate(self, orchestration_id: str) -> dict:
"""Run all 3 scenarios in parallel and aggregate."""
results = await asyncio.gather(
self.run_scenario("simple", orchestration_id),
self.run_scenario("medium", orchestration_id),
self.run_scenario("complex", orchestration_id),
return_exceptions=True
)
return self.aggregate_results(results)
Difficulty Scaling
| Level | Complexity | Input Size | Time Budget | Quality |
|---|---|---|---|---|
| Simple | 1x | Small (10-100) | 30s | Basic |
| Medium | 3x | Medium (30-300) | 90s | Good |
| Complex | 8x | Large (80-800) | 300s | Excellent |
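The scaling table above can live in one config dict so workers and the aggregator share a single source of truth; `SCENARIO_BUDGETS` and `budget_for` are illustrative names, not part of any framework:

```python
SCENARIO_BUDGETS = {
    "simple":  {"multiplier": 1, "items": (10, 100), "time_budget_s": 30},
    "medium":  {"multiplier": 3, "items": (30, 300), "time_budget_s": 90},
    "complex": {"multiplier": 8, "items": (80, 800), "time_budget_s": 300},
}

def budget_for(scenario_id: str) -> int:
    """Time budget in seconds for a scenario; raises KeyError on typos."""
    return SCENARIO_BUDGETS[scenario_id]["time_budget_s"]
```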
Output Example
{
"orchestration_id": "demo-001",
"quality_comparison": {
"simple": 0.92, "medium": 0.88, "complex": 0.84
},
"scaling_analysis": {
"time_per_item_ms": {
"simple": 0.012, "medium": 0.012, "complex": 0.032
},
"recommendation": "Linear scaling up to 3x, superlinear at 8x"
}
}
Common Mistakes
- Sequential instead of parallel (defeats purpose)
- No synchronization (results appear disjointed)
- Unclear difficulty scaling (differ in scale, not approach)
- Missing aggregation (individual results lack comparative insights)
Incorrect — running scenarios sequentially:
async def orchestrate(skill_name: str):
simple = await run_scenario("simple", skill_name) # Wait
medium = await run_scenario("medium", skill_name) # Wait
complex = await run_scenario("complex", skill_name) # Wait
return [simple, medium, complex]
Correct — parallel execution of all scenarios:
async def orchestrate(skill_name: str):
results = await asyncio.gather(
run_scenario("simple", skill_name),
run_scenario("medium", skill_name),
run_scenario("complex", skill_name)
)
return aggregate_results(results)
Synchronize milestones, scale difficulty, and recover from failures across multi-scenario orchestration — MEDIUM
Scenario Routing & Synchronization
Milestone synchronization modes, difficulty scaling strategies, checkpointing, and failure recovery for multi-scenario orchestration.
Synchronization Modes
| Mode | Description | Use When |
|---|---|---|
| Free-running | All run independently | Demo videos, production |
| Milestone-sync | Wait at 30%, 70%, 100% | Comparative analysis |
| Lock-step | All proceed together | Training, tutorials |
Milestone Synchronization
async def synchronize_at_milestone(
milestone_pct: int,
state: ScenarioOrchestratorState,
timeout_seconds: int = 30
) -> bool:
"""Wait for all scenarios to reach milestone."""
start = time.time()
while time.time() - start < timeout_seconds:
simple_at = milestone_pct in state["progress_simple"].milestones_reached
medium_at = milestone_pct in state["progress_medium"].milestones_reached
complex_at = milestone_pct in state["progress_complex"].milestones_reached
if simple_at and medium_at and complex_at:
print(f"[SYNC] All scenarios reached {milestone_pct}%")
return True
if any(state[f"progress_{s}"].status == "failed"
for s in ["simple", "medium", "complex"]):
print(f"[SYNC] A scenario failed, proceeding without sync")
return False
await asyncio.sleep(0.5)
print(f"[SYNC] Timeout at {milestone_pct}%, proceeding")
return False
Input Scaling Strategies
Linear Scaling (I/O-bound skills)
Simple: 100 items
Medium: 300 items (3x)
Complex: 800 items (8x)
Time: O(n) -- expected medium ~3x simple
Adaptive Scaling (per-skill tuning)
SKILL_SCALING_PROFILES = {
"performance-testing": {
"scaling": "linear",
"simple": 10, "medium": 30, "complex": 80
},
"security-scanning": {
"scaling": "sublinear",
"simple": 20, "medium": 100, "complex": 500
},
"data-transformation": {
"scaling": "quadratic",
"simple": 100, "medium": 200, "complex": 300
}
}
Complexity Detection
# Calculate actual time complexity
simple_tpi = simple_time / simple_size # time per item
medium_tpi = medium_time / medium_size
complex_tpi = complex_time / complex_size
ratio = complex_tpi / simple_tpi # >2 = superlinear
Failure Recovery
One Scenario Fails (Independent)
try:
result = await invoke_skill(batch)
except Exception as e:
progress.errors.append({"message": str(e), "batch_index": i})
# Don't raise -- let other scenarios continue
Timeout Handling
async def invoke_skill_with_timeout(skill, input_data, timeout_seconds):
try:
return await asyncio.wait_for(
invoke_skill(skill, input_data),
timeout=timeout_seconds
)
except asyncio.TimeoutError:
return {
"processed": len(input_data),
"results": [],
"error": "timeout",
"quality_score": 0.0,
}
All Scenarios Fail (Systematic)
async def orchestrator_with_recovery(initial_state):
result = await app.ainvoke(initial_state)
all_failed = all(
state[f"progress_{s}"].status == "failed"
for s in ["simple", "medium", "complex"]
)
if all_failed:
# 1. Reduce resource contention
# 2. Retry with smaller batches
# 3. Or abort with diagnostic info
return retry_with_reduced_load(initial_state)
Checkpointing
Scenario-Level Checkpoints
INSERT INTO scenario_checkpoints (
orchestration_id, scenario_id, milestone_pct, elapsed_ms, state_snapshot
) VALUES (
'demo-001', 'medium', 30, 3200, '{"items": 90, "results": [...]}'
);
Full-State Snapshots
async def checkpoint_full_state(state: ScenarioOrchestratorState):
checkpoint_data = {
"orchestration_id": state["orchestration_id"],
"timestamp": datetime.now().isoformat(),
"progress_simple": state["progress_simple"].to_dict(),
"progress_medium": state["progress_medium"].to_dict(),
"progress_complex": state["progress_complex"].to_dict(),
}
await db.insert("full_state_checkpoints", checkpoint_data)
Quality Metrics Framework
Functional Metrics (per-skill)
{
"performance-testing": {
"latency_p95_ms": {"target": "<500ms", "weight": 0.5},
"error_rate": {"target": "<1%", "weight": 0.5},
},
"security-scanning": {
"vulnerabilities_found": {"target": ">0", "weight": 0.3},
"coverage_pct": {"target": "100%", "weight": 0.7},
}
}
Comparative Metrics
{
"quality_scaling": {
"formula": "complex_quality / simple_quality",
"expected": 1.0,
"acceptable": ">0.8"
},
"time_efficiency": {
"formula": "simple_tpi / complex_tpi",
"expected": 1.0,
"acceptable": ">0.5"
}
}
Multi-Host Execution
For greater parallelism, run scenarios on different machines sharing the same database:
# Host 1: Coordinator + Simple
python coordinator.py
# Host 2: Medium (different machine, same DB)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=medium
python run_scenario.py
# Host 3: Complex
export SCENARIO_ID=complex
python run_scenario.py
Key Decisions
| Decision | Recommendation |
|---|---|
| Synchronization mode | Free-running with checkpoints |
| Scenario count | Always 3: simple, medium, complex |
| Input scaling | 1x, 3x, 8x (exponential) |
| Time budgets | 30s, 90s, 300s |
| Checkpoint frequency | Every milestone + completion |
Incorrect — no timeout on skill invocation:
async def run_scenario(scenario_id: str, skill_name: str):
result = await invoke_skill(skill_name, get_input(scenario_id)) # Hangs forever
return result
Correct — timeout prevents infinite hangs:
async def run_scenario(scenario_id: str, skill_name: str):
timeout = {"simple": 30, "medium": 90, "complex": 300}[scenario_id]
try:
result = await asyncio.wait_for(
invoke_skill(skill_name, get_input(scenario_id)),
timeout=timeout
)
return result
except asyncio.TimeoutError:
return {"error": "timeout", "scenario_id": scenario_id}
References (11)
Architectural Patterns
Architectural Patterns for Multi-Scenario Orchestration
Deep patterns and design decisions for production multi-scenario demos.
Pattern 1: Three-Tier Synchronization
Tier 1: Free-Running (Baseline)
Each scenario runs independently, no blocking.
Time →
─────────────────────────────────────────────────┐
Simple ███████████████ Complete at 1.2s │
└─────────────────────────────────────┘ │
│
Medium ██████████████████████░░░░ In progress at 3.5s
└──────────────────────────────────┘ │
│
Complex ██████████░░░░░░░░░░░░░░░░ In progress at 25.7s
└─────────────────────────────────────┘ │
─────────────────────────────────────────────────┘
Advantages:
- Realistic—shows natural skill behavior
- Tolerates slowness in one scenario
- Lower synchronization overhead
Implementation:
# Each scenario runs its own event loop
# No waiting between scenarios
# Checkpoints are independent
Tier 2: Milestone Synchronization
Scenarios pause at checkpoints (30%, 50%, 70%, 90%) to allow others to catch up.
Time →
─────────────────────────────────────────────────┐
Simple ███ PAUSE ███ PAUSE ███ Complete │
└───┬────────┬────────┬────────────────┘│
│ │ │ │
Medium ██ PAUSE ██ PAUSE ██████░░░░ In-prog │
└────┬────────┬──────────────────────┘│
│ │ │
Complex █ PAUSE █ PAUSE ██░░░░░░░░░░░░░░░ │
└──┬────────┬──────────────────────────┘│
─────────────────────────────────────────────────┘
Advantages:
- Synchronized checkpoints for state capture
- Better for demos (shows progression together)
- Easier to explain ("all at 30%")
Implementation:
async def synchronize_at_milestone(milestone_pct, timeout_seconds=60):
while time.time() - start < timeout_seconds:
if all_scenarios_at_milestone:
return True
await asyncio.sleep(0.5)
# Timeout: proceed anyway (don't block forever)
return False
Tier 3: Lock-Step (Strict Synchronization)
All scenarios advance together, slowest determines pace.
Time →
─────────────────────────────────────────────────┐
Step 1: Simple ███ | Medium ███ | Complex █ │
└────────────────────────────────────────┘│
Step 2: Simple ███ | Medium ███ | Complex █ │
└────────────────────────────────────────┘│
Step 3: All complete together │
─────────────────────────────────────────────────┘
Advantages:
- Perfect synchronization for demos
- Easy to explain ("all scenarios complete together")
Disadvantages:
- Complex scenario blocks others (1-2 min delays)
- Unrealistic performance representation
Recommendation: Use Tier 1 (Free-Running) for production, Tier 2 (Milestone) for interactive demos.
Pattern 2: Input Scaling Strategies
Strategy A: Linear Scaling (Additive)
Simple: 100 items
Medium: 100 + 200 = 300 items (+200%)
Complex: 300 + 500 = 800 items (+167%)
Time complexity: O(n)
Expected medium time ≈ 3x simple
Expected complex time ≈ 8x simple
Best for: I/O-bound skills (API calls, database queries)
Strategy B: Exponential Scaling (Multiplicative)
Simple: 100 items
Medium: 100 × 3 = 300 items (3x)
Complex: 100 × 8 = 800 items (8x)
Time complexity: O(n) or O(n log n)
Expected medium time ≈ 3x simple (if linear)
Expected complex time ≈ 8x simple (if linear)
Best for: Batch processing, LLM calls
Strategy C: Quadratic Scaling
Simple: 100 items
Medium: 300 items (3x)
Complex: 800 items (8x)
But if algorithm is O(n²):
Expected medium time ≈ 9x simple
Expected complex time ≈ 64x simple
Detection:
# Calculate actual time complexity
simple_time = 1.2 # seconds
medium_time = 3.5
complex_time = 25.7
simple_size = 100
medium_size = 300
complex_size = 800
# Time per item
simple_tpi = simple_time / simple_size # 0.012 s/item
medium_tpi = medium_time / medium_size # 0.012 s/item
complex_tpi = complex_time / complex_size # 0.032 s/item
# Ratio indicates scaling behavior
ratio = complex_tpi / simple_tpi # 2.67 → O(n log n) or worse
Strategy D: Adaptive Scaling
Choose scaling based on skill characteristics:
SKILL_SCALING_PROFILES = {
"performance-testing": {
"scaling": "linear",
"simple": 10,
"medium": 30,
"complex": 80
},
"security-scanning": {
"scaling": "sublinear", # Gets faster with caching
"simple": 20,
"medium": 100,
"complex": 500
},
"data-transformation": {
"scaling": "quadratic", # O(n²) worst case
"simple": 100,
"medium": 200, # Limit increase
"complex": 300
}
}
Pattern 3: Quality Metrics Framework
Metric Category 1: Functional Metrics
What the skill is designed to measure:
{
"performance-testing": {
"latency_p95_ms": {"target": "<500ms", "weight": 0.5},
"error_rate": {"target": "<1%", "weight": 0.5},
},
"security-scanning": {
"vulnerabilities_found": {"target": ">0", "weight": 0.3},
"coverage_pct": {"target": "100%", "weight": 0.7},
}
}
Metric Category 2: Comparative Metrics
How scenarios compare:
{
"quality_scaling": {
"formula": "complex_quality / simple_quality",
"expected": 1.0, # Expect no degradation
"acceptable": ">0.8"
},
"time_efficiency": {
"formula": "simple_tpi / complex_tpi",
"expected": 1.0, # Linear scaling
"acceptable": ">0.5"
},
"resource_efficiency": {
"formula": "quality_per_second_complex / quality_per_second_simple",
"expected": 0.8, # Complex less efficient (higher overhead)
"acceptable": ">0.5"
}
}
Metric Category 3: Stability Metrics
Consistency across scenarios:
{
"quality_variance": {
"formula": "stdev(simple_quality, medium_quality, complex_quality)",
"expected": "<0.05",
"interpretation": "Low variance = stable algorithm"
},
"error_consistency": {
"formula": "all_scenarios_error_rate < threshold",
"expected": True,
"interpretation": "Same error rate across loads"
}
}
Pattern 4: Failure Modes & Recovery
Failure Mode 1: One Scenario Fails (Independent)
Simple ███████████████ Complete ✓
Medium ██████████ FAILED ✗
Complex ███ In progress...
Recovery:
• Medium stores checkpoint at failure point
• Can be restarted independently
• Simple/Complex continue
• Aggregator combines partial results
Implementation:
# Isolate failures
try:
result = await invoke_skill(batch)
except Exception as e:
progress.errors.append({"message": str(e), "batch_index": i})
# Don't raise—let other scenarios continue
# Report but don't block
if progress.errors:
print(f"⚠ {scenario_id} had {len(progress.errors)} errors")
Failure Mode 2: All Scenarios Fail (Systematic)
Simple ███ FAILED ✗
Medium ███ FAILED ✗
Complex ███ FAILED ✗
Possible causes:
• Skill has a bug
• Resource limit exceeded
• Network/database unavailable
Recovery:
async def orchestrator_with_recovery(initial_state):
"""Attempt recovery if all scenarios fail."""
result = await app.ainvoke(initial_state)
all_failed = all(
state[f"progress_{s}"].status == "failed"
for s in ["simple", "medium", "complex"]
)
if all_failed:
print("All scenarios failed—attempting recovery...")
# 1. Reduce resource contention
# 2. Retry with smaller batches
# 3. Or abort with diagnostic info
return retry_with_reduced_load(initial_state)
Failure Mode 3: Timeout (Skill Takes Too Long)
Simple ███████████ Complete in 1.2s (budget: 30s) ✓
Medium ██████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ TIMEOUT ✗ (budget: 90s)
Complex █░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ IN PROGRESS
Recovery:
• Medium: Cancel at 90s, return partial results
• Complex: Continue until 300s timeout
Implementation:
async def invoke_skill_with_timeout(skill, input_data, timeout_seconds):
try:
return await asyncio.wait_for(
invoke_skill(skill, input_data),
timeout=timeout_seconds
)
except asyncio.TimeoutError:
print(f"Timeout after {timeout_seconds}s, returning partial results")
return {
"processed": len(input_data),
"results": [],
"error": "timeout",
"quality_score": 0.0,
}
Pattern 5: Observability & Monitoring
Monitoring Strategy 1: Real-Time Progress
# Stream progress from PostgreSQL checkpoints
async def monitor_real_time():
while orchestration_running:
progress = await db.query("""
SELECT scenario_id, MAX(progress_pct), MAX(elapsed_ms)
FROM scenario_checkpoints
WHERE orchestration_id = $1
GROUP BY scenario_id
""", orchestration_id)
for scenario_id, progress_pct, elapsed_ms in progress:
bar = "█" * int(progress_pct / 5) + "░" * (20 - int(progress_pct / 5))
print(f"{scenario_id}: │{bar}│ {progress_pct:.0f}%")
await asyncio.sleep(2)
Monitoring Strategy 2: Comparative Timeline
0s 10s 20s 30s 40s
Simple: |████████| (complete)
Medium: | |██████████| (in progress)
Complex: | |████| (in progress)
|─────────────────────────────────────|
Milestones:
Simple: ✓ 30% @ 0.4s ✓ 50% @ 0.6s ✓ 70% @ 0.9s ✓ 100% @ 1.2s
Medium: ✓ 30% @ 3.2s ✓ 50% @ 5.1s ⏳ 70% @ 8.3s ⏳ In progress
Complex: ✓ 30% @ 9.1s ⏳ 50% in progress
Monitoring Strategy 3: Quality Trend
Quality Score (0-1)
1.0 ├─────────────────
│ Simple ████░░░░░░
0.8 ├───────────────────
│ Medium ██████░░
0.6 ├───────────────────
│ Complex ███░░░░
0.4 ├───────────────────
│
0.2 ├───────────────────
└─────────────────────
0% 30% 50% 70% 100%
          Progress
Pattern 6: Result Aggregation Strategies
Aggregation Type 1: Comparative (Default)
Compare metrics across all 3 scenarios:
{
"quality_comparison": {
"simple": {"latency_p95": 120, "score": 0.92},
"medium": {"latency_p95": 145, "score": 0.88},
"complex": {"latency_p95": 185, "score": 0.84}
},
"scaling_analysis": {
"quality_degradation": "8% from simple to complex",
"time_growth": "linear (as expected)",
"recommendation": "Quality acceptable, can scale to complex"
}
}
Aggregation Type 2: Pattern Extraction
Find common patterns across scenarios:
{
"success_patterns": [
"Caching strategy effective at all scales",
"Batch size of 50+ preferred",
"Memory usage stays below 512MB"
],
"failure_patterns": [
"Timeout at >5000 items per batch",
"Quality drops with skewed data distribution"
]
}
Aggregation Type 3: Recommendation Engine
Suggest optimal difficulty for production:
{
"recommended_difficulty": "medium",
"reasoning": [
"Simple: Insufficient load to detect bottlenecks",
"Medium: Good balance of realism and speed",
"Complex: Takes too long for frequent testing (300s)"
],
"production_scaling": {
"estimated_items_per_request": 50,
"estimated_response_time_ms": 450,
"required_concurrency_support": 10
}
}
Pattern 7: Checkpointing Strategy
Checkpoint Type 1: Scenario-Level
Save progress at each scenario milestone:
INSERT INTO scenario_checkpoints (
orchestration_id, scenario_id, milestone_pct, elapsed_ms, state_snapshot
) VALUES (
    'demo-001', 'medium', 30, 3200, '{"items": 90, "results": [...]}'
);
Use case: Resume interrupted scenario
Checkpoint Type 2: Milestone-Level
Save synchronized state across all scenarios:
INSERT INTO orchestration_milestones (
orchestration_id, milestone_pct, timestamp, simple_status, medium_status, complex_status
) VALUES (
'demo-001', 30, NOW(), 'complete', 'paused', 'in_progress'
);
Use case: Track synchronization progress
Checkpoint Type 3: Full-State
Periodic snapshots for recovery:
async def checkpoint_full_state(state: ScenarioOrchestratorState):
"""Save complete state to disk."""
checkpoint_data = {
"orchestration_id": state["orchestration_id"],
"timestamp": datetime.now().isoformat(),
"progress_simple": state["progress_simple"].to_dict(),
"progress_medium": state["progress_medium"].to_dict(),
"progress_complex": state["progress_complex"].to_dict(),
}
    await db.insert("full_state_checkpoints", checkpoint_data)
Use case: Complete recovery from any point
Pattern 8: Cost & Performance Analysis
Cost Analysis
def estimate_orchestration_cost(
scenarios: dict[str, ScenarioDefinition]
) -> dict:
"""Estimate total execution cost."""
# LLM cost (if using Claude)
llm_cost_per_scenario = {
"simple": estimate_tokens(100) * 0.001, # ~$0.002
"medium": estimate_tokens(300) * 0.001, # ~$0.005
"complex": estimate_tokens(800) * 0.001, # ~$0.010
}
# Compute cost (if cloud)
compute_cost = {
"simple": 30 / 3600 * 0.10, # 30s @ $0.10/hour
"medium": 90 / 3600 * 0.10, # 90s @ $0.10/hour
"complex": 300 / 3600 * 0.10, # 300s @ $0.10/hour
}
# Database cost
db_cost = 0.001 # Negligible for checkpointing
return {
"llm_cost": sum(llm_cost_per_scenario.values()),
"compute_cost": sum(compute_cost.values()),
"db_cost": db_cost,
"total_cost": sum(llm_cost_per_scenario.values()) + sum(compute_cost.values()) + db_cost,
"cost_per_scenario": llm_cost_per_scenario,
    }
Performance Analysis
def analyze_performance(
results: dict
) -> dict:
"""Analyze orchestration performance."""
return {
"total_execution_time_minutes": (sum(r["elapsed_ms"] for r in results.values()) / 60000),
"critical_path_seconds": max(r["elapsed_ms"] for r in results.values()) / 1000,
"parallel_efficiency": (
(sum(r["elapsed_ms"] for r in results.values()) / 1000) /
(max(r["elapsed_ms"] for r in results.values()) / 1000)
),
"cost_per_quality_point": estimate_orchestration_cost({}) / avg_quality_score,
}Key Architectural Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Synchronization | Milestone-based (Tier 2) | Balance between realism and demo experience |
| Input Scaling | 1x, 3x, 8x (exponential) | Exponential because most skills have overhead |
| Quality Metrics | Multiple per-skill metrics | Single metric insufficient to assess quality |
| Failure Recovery | Isolation + checkpointing | Partial results preferable to total failure |
| Monitoring | Real-time DB queries + Langfuse | Distributed state requires DB |
| Checkpoint Frequency | Every milestone + completion | Balance between safety and overhead |
| Aggregation | Comparative + recommendations | Provide actionable insights |
| Skill Abstraction | Generic orchestrator base class | Template for ANY skill |
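The "generic orchestrator base class" in the last row can be sketched as a small ABC that any skill subclasses; the class and method names here are illustrative, not the actual template from skill-agnostic-template.md:

```python
from abc import ABC, abstractmethod

class SkillOrchestrator(ABC):
    """Skill-agnostic base: subclasses implement only run_scenario()."""

    def __init__(self, skill_name: str):
        self.skill_name = skill_name
        self.checkpoints: list[dict] = []

    @abstractmethod
    def run_scenario(self, scenario_id: str, input_size: int) -> dict:
        """Execute one scenario and return its metrics."""

    def checkpoint(self, scenario_id: str, pct: int) -> None:
        # Shared checkpointing logic lives in the base class
        self.checkpoints.append({"scenario": scenario_id, "pct": pct})

class EchoOrchestrator(SkillOrchestrator):
    """Trivial subclass used to demonstrate the template."""
    def run_scenario(self, scenario_id: str, input_size: int) -> dict:
        self.checkpoint(scenario_id, 100)
        return {"processed": input_size, "quality_score": 1.0}

orch = EchoOrchestrator("echo")
metrics = orch.run_scenario("simple", 100)
```

The base class owns checkpointing and bookkeeping; a new skill only supplies its `run_scenario` body.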
References
- langgraph-implementation.md - Python implementation details
- claude-code-instance-management.md - Multi-terminal setup
- state-machine-design.md - Detailed state transitions
- skill-agnostic-template.md - Template for new skills
Claude Code Instance Management
Claude Code Instance Management: Multi-Scenario Demos
Structure 3 parallel Claude Code terminal instances for simultaneous scenario execution with shared state synchronization.
Instance Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ COORDINATOR PROCESS (Python) │
│ (Runs orchestrator graph) │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Terminal 1 │ │ Terminal 2 │ │ Terminal 3 │ │
│ │ (Simple) │ │ (Medium) │ │ (Complex) │ │
│ │ │ │ │ │ │ │
│ │ Session: │ │ Session: │ │ Session: │ │
│ │ simple-123 │ │ medium-123 │ │ complex-123│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│   Claude Code instance   Claude Code instance   Claude Code instance │
│              (3 parallel processes, one per scenario)                │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ PostgreSQL Checkpoint Table │ │
│ │ (Shared state synchronization across instances) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Setup Instructions
Step 1: Prepare the Project
Ensure your project has the orchestrator graph and shared utilities:
# At project root
mkdir -p backend/app/workflows/multi_scenario
cp src/skills/multi-scenario-orchestration/references/langgraph-implementation.py \
backend/app/workflows/multi_scenario/orchestrator.py
# Create coordinator script
cat > backend/app/workflows/multi_scenario/coordinator.py << 'EOF'
"""
Main coordinator that launches and monitors 3 Claude Code instances.
"""
import asyncio
import subprocess
import os
from pathlib import Path
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
SCENARIOS = ["simple", "medium", "complex"]
SKILL_NAME = "your-skill-name" # Change this
async def launch_scenario_instance(scenario_id: str, orchestration_id: str):
"""Launch one Claude Code instance for a scenario."""
env = os.environ.copy()
env["SCENARIO_ID"] = scenario_id
env["ORCHESTRATION_ID"] = orchestration_id
env["PROJECT_ROOT"] = str(PROJECT_ROOT)
# Launch Claude Code instance
process = subprocess.Popen(
[
"claude", "code",
str(PROJECT_ROOT),
"--session", f"scenario-{scenario_id}-{orchestration_id}",
"--skill", SKILL_NAME,
],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
print(f"[COORDINATOR] Launched {scenario_id} instance (PID: {process.pid})")
return process
async def monitor_instances(processes: dict):
"""Monitor all instances for completion."""
while any(p.poll() is None for p in processes.values()):
for scenario_id, process in processes.items():
if process.poll() is not None:
print(f"[COORDINATOR] {scenario_id} instance completed")
await asyncio.sleep(1)
async def main():
orchestration_id = "demo-001"
print(f"[COORDINATOR] Starting orchestration {orchestration_id}")
print(f"[COORDINATOR] Launching 3 parallel instances...")
# Launch all instances
processes = {}
for scenario_id in SCENARIOS:
process = await launch_scenario_instance(scenario_id, orchestration_id)
processes[scenario_id] = process
print(f"[COORDINATOR] All instances launched. Monitoring...")
# Monitor
await monitor_instances(processes)
print(f"[COORDINATOR] All instances completed")
if __name__ == "__main__":
asyncio.run(main())
EOF
Step 2: Create Scenario Runner Script
Create /backend/app/workflows/multi_scenario/run_scenario.py:
"""
Runner for single scenario. Invoked by Claude Code instance.
Sets up environment from SCENARIO_ID and ORCHESTRATION_ID env vars.
"""
import os
import asyncio
import time
from orchestrator import (
build_scenario_orchestrator,
ScenarioOrchestratorState,
ScenarioDefinition,
ScenarioProgress,
)
from langgraph.checkpoint.postgres import PostgresSaver
async def run_scenario():
# Read from environment
scenario_id = os.getenv("SCENARIO_ID", "simple")
orchestration_id = os.getenv("ORCHESTRATION_ID", "demo-001")
project_root = os.getenv("PROJECT_ROOT", ".")
print(f"[{scenario_id.upper()}] Starting scenario execution")
print(f"[{scenario_id.upper()}] Orchestration ID: {orchestration_id}")
# Setup checkpointer
db_url = os.getenv("DATABASE_URL", "postgresql://localhost/orchestkit")
checkpointer = PostgresSaver.from_conn_string(db_url)
# Build orchestrator
app = build_scenario_orchestrator(checkpointer=checkpointer)
# Prepare scenario definitions
configs = {
"simple": {
"complexity_multiplier": 1.0,
"input_size": 100,
"time_budget_seconds": 30,
"skill_params": {"batch_size": 10, "cache_enabled": True}
},
"medium": {
"complexity_multiplier": 3.0,
"input_size": 300,
"time_budget_seconds": 90,
"skill_params": {"batch_size": 50, "cache_enabled": True}
},
"complex": {
"complexity_multiplier": 8.0,
"input_size": 800,
"time_budget_seconds": 300,
"skill_params": {"batch_size": 100, "cache_enabled": True, "parallel_workers": 4}
}
}
cfg = configs[scenario_id]
# Build initial state
initial_state: ScenarioOrchestratorState = {
"orchestration_id": orchestration_id,
"start_time_unix": int(time.time()),
"skill_name": "your-skill-name",
"skill_version": "1.0.0",
# Current scenario only
"scenario_simple": None,
"scenario_medium": None,
"scenario_complex": None,
"progress_simple": None,
"progress_medium": None,
"progress_complex": None,
}
# Set only the relevant scenario
initial_state[f"scenario_{scenario_id}"] = ScenarioDefinition(
name=scenario_id,
difficulty={"simple": "easy", "medium": "intermediate", "complex": "advanced"}[scenario_id],
complexity_multiplier=cfg["complexity_multiplier"],
input_size=cfg["input_size"],
dataset_characteristics={"distribution": "uniform"},
time_budget_seconds=cfg["time_budget_seconds"],
memory_limit_mb={"simple": 256, "medium": 512, "complex": 1024}[scenario_id],
error_tolerance={"simple": 0.0, "medium": 0.05, "complex": 0.1}[scenario_id],
skill_params=cfg["skill_params"],
expected_quality={"simple": "basic", "medium": "good", "complex": "excellent"}[scenario_id],
quality_metrics=["accuracy", "coverage"]
)
initial_state[f"progress_{scenario_id}"] = ScenarioProgress(scenario_id=scenario_id)
# Run orchestrator
config = {"configurable": {"thread_id": f"orch-{orchestration_id}"}}
print(f"[{scenario_id.upper()}] Invoking orchestrator...")
try:
# Stream progress
async for update in app.astream(initial_state, config=config, stream_mode="updates"):
if f"progress_{scenario_id}" in update:
progress = update[f"progress_{scenario_id}"]
print(f"[{scenario_id.upper()}] Progress: {progress.progress_pct:.1f}% "
f"({progress.items_processed} items, {progress.elapsed_ms}ms)")
print(f"[{scenario_id.upper()}] Scenario complete")
except Exception as e:
print(f"[{scenario_id.upper()}] Error: {e}")
raise
if __name__ == "__main__":
import time
    asyncio.run(run_scenario())
Execution: Three-Terminal Mode
Terminal 1: Coordinator
cd /path/to/project
python backend/app/workflows/multi_scenario/coordinator.py
Output:
[COORDINATOR] Starting orchestration demo-001
[COORDINATOR] Launching 3 parallel instances...
[COORDINATOR] Launched simple instance (PID: 1234)
[COORDINATOR] Launched medium instance (PID: 1235)
[COORDINATOR] Launched complex instance (PID: 1236)
[COORDINATOR] All instances launched. Monitoring...
Terminal 2: Simple Scenario
cd /path/to/project
export SCENARIO_ID=simple
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
Output:
[SIMPLE] Starting scenario execution
[SIMPLE] Orchestration ID: demo-001
[SIMPLE] Invoking orchestrator...
[SIMPLE] Progress: 10.0% (10 items, 100ms)
[SIMPLE] Progress: 20.0% (20 items, 200ms)
...
[SIMPLE] Progress: 100.0% (100 items, 1050ms)
[SIMPLE] Scenario complete
Terminal 3: Medium Scenario
cd /path/to/project
export SCENARIO_ID=medium
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
Output:
[MEDIUM] Starting scenario execution
[MEDIUM] Orchestration ID: demo-001
[MEDIUM] Invoking orchestrator...
[MEDIUM] Progress: 3.3% (10 items, 100ms)
[MEDIUM] Progress: 6.7% (20 items, 200ms)
...
[MEDIUM] Progress: 100.0% (300 items, 3100ms)
[MEDIUM] Scenario complete
Terminal 4 (Optional): Complex Scenario
If you have 4 terminals, run complex in parallel:
export SCENARIO_ID=complex
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
Shared State Synchronization
PostgreSQL Checkpoint Schema
-- Create checkpoint table (run once)
CREATE TABLE IF NOT EXISTS scenario_orchestration_checkpoints (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
orchestration_id VARCHAR(255) NOT NULL,
scenario_id VARCHAR(50) NOT NULL,
milestone_name VARCHAR(100),
progress_pct FLOAT,
timestamp_unix BIGINT NOT NULL,
state_snapshot JSONB,
metrics JSONB,
    created_at TIMESTAMP DEFAULT NOW()
);
-- Inline INDEX clauses are MySQL syntax; PostgreSQL uses separate statements
CREATE INDEX IF NOT EXISTS idx_orchestration_id ON scenario_orchestration_checkpoints (orchestration_id);
CREATE INDEX IF NOT EXISTS idx_scenario_id ON scenario_orchestration_checkpoints (scenario_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON scenario_orchestration_checkpoints (timestamp_unix);
-- View progress across all scenarios
SELECT
orchestration_id,
scenario_id,
progress_pct,
milestone_name,
timestamp_unix,
(timestamp_unix / 1000.0) as seconds_elapsed
FROM scenario_orchestration_checkpoints
WHERE orchestration_id = 'demo-001'
ORDER BY scenario_id, progress_pct;
-- Example output:
-- orchestration_id | scenario_id | progress_pct | milestone_name | seconds_elapsed
-- demo-001 | simple | 30 | checkpoint_1 | 1.2
-- demo-001 | simple | 70 | checkpoint_2 | 2.8
-- demo-001 | simple | 100 | completion | 3.1
-- demo-001 | medium | 30 | checkpoint_1 | 3.5
-- demo-001 | medium | 70 | checkpoint_2 | 8.2
-- demo-001 | medium | 100 | completion | 9.3
-- demo-001 | complex | 30 | checkpoint_1 | 9.1
-- demo-001 | complex | 70 | checkpoint_2 | 22.5
-- demo-001 | complex | 100 | completion | 25.7
Monitor Progress from Coordinator
"""Monitor script to watch progress across all instances."""
import asyncio
import time
from datetime import datetime
import psycopg2
async def monitor_orchestration(orchestration_id: str, interval: int = 2):
"""Watch progress of all scenarios."""
conn = psycopg2.connect("dbname=orchestkit user=postgres")
cursor = conn.cursor()
print(f"Monitoring orchestration {orchestration_id}...\n")
while True:
cursor.execute("""
SELECT
scenario_id,
MAX(progress_pct) as progress,
MAX(timestamp_unix) as last_update
FROM scenario_orchestration_checkpoints
WHERE orchestration_id = %s
GROUP BY scenario_id
ORDER BY scenario_id
""", (orchestration_id,))
rows = cursor.fetchall()
if not rows:
print("No progress yet...")
await asyncio.sleep(interval)
continue
# Clear screen and print progress
print(f"\r{datetime.now().strftime('%H:%M:%S')}")
print("-" * 50)
all_complete = True
for scenario_id, progress, timestamp in rows:
bar_length = int(progress / 5) # 20-char bar
bar = "█" * bar_length + "░" * (20 - bar_length)
print(f"{scenario_id:10} │{bar}│ {progress:3.0f}%")
if progress < 100:
all_complete = False
if all_complete:
print("\n✓ All scenarios complete!")
break
await asyncio.sleep(interval)
conn.close()
if __name__ == "__main__":
    asyncio.run(monitor_orchestration("demo-001"))
Synchronization at Milestones
To enable forced synchronization at milestones (all scenarios pause and wait):
# In run_scenario.py
async def wait_for_milestone_sync(
orchestration_id: str,
scenario_id: str,
milestone_pct: int,
timeout_seconds: int = 30
):
"""Wait for all scenarios to reach milestone."""
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
start = time.time()
while time.time() - start < timeout_seconds:
# Query checkpoint status
async with checkpointer.get_connection() as conn:
result = await conn.fetch("""
SELECT DISTINCT scenario_id, MAX(progress_pct)
FROM scenario_orchestration_checkpoints
WHERE orchestration_id = $1
GROUP BY scenario_id
""", orchestration_id)
scenarios_at_milestone = {
row["scenario_id"]: row["max"] >= milestone_pct
for row in result
}
if all(scenarios_at_milestone.values()):
print(f"[{scenario_id.upper()}] All scenarios reached {milestone_pct}%")
return True
await asyncio.sleep(0.5)
print(f"[{scenario_id.upper()}] Sync timeout at {milestone_pct}%")
    return False
Advanced: Multi-Host Execution
For even greater parallelism, run scenarios on different machines:
# Host 1: Coordinator + Simple
python backend/app/workflows/multi_scenario/coordinator.py
# Host 2: Medium (different machine, same DB)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=medium
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
# Host 3: Complex (different machine, same DB)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=complex
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
PostgreSQL checkpoints serve as the distributed state store.
Best Practices
- Unique Orchestration IDs: Use timestamp or UUID for each demo run
- Session Isolation: Each instance gets its own Claude Code session
- Checkpointing: Always enable PostgreSQL persistence
- Monitoring: Watch progress via checkpoint table queries
- Timeout Handling: Allow asynchronous completion, don't force lock-step
- Error Recovery: Failed instances can be restarted without resetting state
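The first best practice matters because a fixed ID like demo-001 collides with earlier runs' checkpoint rows. A minimal sketch of a timestamp-plus-UUID generator (the prefix and format are arbitrary choices, not part of the project):

```python
import uuid
from datetime import datetime, timezone

def new_orchestration_id(prefix: str = "demo") -> str:
    """Sortable by timestamp, collision-free via a short UUID suffix."""
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    return f"{prefix}-{ts}-{uuid.uuid4().hex[:8]}"

run_id = new_orchestration_id()
```

Because the timestamp leads, rows in the checkpoint table sort chronologically by orchestration_id, while the suffix keeps concurrent runs distinct.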
Troubleshooting
Instances get stuck at milestone:
→ Increase timeout_seconds in wait_for_milestone_sync()
Database connection errors:
→ Check DATABASE_URL environment variable, ensure PostgreSQL is running
One instance much slower than others:
→ This is expected! Use Mode A (free-running), not lock-step; the slower instance will eventually complete.
Memory usage grows over time:
→ Enable checkpointing to disk, reduce batch sizes for the complex scenario
Coordination Patterns
Agent Coordination Patterns
Patterns for coordinating multiple specialized agents in complex workflows.
Supervisor-Worker Pattern
from typing import Protocol, Any
import asyncio
class Agent(Protocol):
async def run(self, task: str, context: dict) -> dict: ...
class SupervisorCoordinator:
"""Central supervisor that routes tasks to worker agents."""
def __init__(self, workers: dict[str, Agent]):
self.workers = workers
self.execution_log: list[dict] = []
async def route_and_execute(
self,
task: str,
required_agents: list[str],
parallel: bool = True
) -> dict[str, Any]:
"""Route task to specified agents."""
context = {"task": task, "results": {}}
if parallel:
tasks = [
self._run_worker(name, task, context)
for name in required_agents
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return dict(zip(required_agents, results))
else:
for name in required_agents:
context["results"][name] = await self._run_worker(
name, task, context
)
return context["results"]
async def _run_worker(
self, name: str, task: str, context: dict
) -> dict:
"""Execute single worker with timeout."""
try:
result = await asyncio.wait_for(
self.workers[name].run(task, context),
timeout=30.0
)
self.execution_log.append({
"agent": name, "status": "success", "result": result
})
return result
except asyncio.TimeoutError:
return {"error": f"{name} timed out"}Conflict Resolution
async def resolve_agent_conflicts(
findings: list[dict],
llm: Any
) -> dict:
"""Resolve conflicts between agent outputs."""
conflicts = []
for i, f1 in enumerate(findings):
for f2 in findings[i+1:]:
if f1.get("recommendation") != f2.get("recommendation"):
conflicts.append((f1, f2))
if not conflicts:
return {"status": "no_conflicts", "findings": findings}
# LLM arbitration
resolution = await llm.ainvoke(f"""
Agents disagree. Determine best recommendation:
Agent 1: {conflicts[0][0]}
Agent 2: {conflicts[0][1]}
Provide: winner, reasoning, confidence (0-1)
""")
return {"status": "resolved", "resolution": resolution}Configuration
- Worker timeout: 30s default
- Max parallel agents: 8
- Retry failed agents: 1 attempt
- Log all executions for debugging
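These defaults can be carried in one frozen config object so the supervisor and workers share a single source of truth; the class and field names are illustrative:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class CoordinationConfig:
    worker_timeout_s: float = 30.0   # per-worker timeout passed to asyncio.wait_for
    max_parallel_agents: int = 8     # cap on simultaneous fan-out
    retry_attempts: int = 1          # retries for failed agents
    log_executions: bool = True      # keep execution_log populated

cfg = CoordinationConfig()
```

Freezing the dataclass prevents individual workers from mutating shared limits mid-run.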
Cost Optimization
- Batch similar tasks to reduce overhead
- Cache agent results by task hash
- Use cheaper models for simple agents
- Parallelize independent agents always
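"Cache agent results by task hash" can be sketched as a wrapper that memoizes on a SHA-256 of the task plus context; `CachedAgentRunner` and the stub agent are illustrative names, not part of the supervisor code above:

```python
import asyncio
import hashlib
import json

class CachedAgentRunner:
    """Memoizes agent results keyed by a hash of (task, context)."""

    def __init__(self, agent):
        self.agent = agent
        self._cache: dict[str, dict] = {}

    @staticmethod
    def _key(task: str, context: dict) -> str:
        # sort_keys makes the hash stable across dict orderings
        payload = json.dumps({"task": task, "context": context}, sort_keys=True)
        return hashlib.sha256(payload.encode()).hexdigest()

    async def run(self, task: str, context: dict) -> dict:
        key = self._key(task, context)
        if key not in self._cache:
            self._cache[key] = await self.agent.run(task, context)
        return self._cache[key]

class CountingAgent:
    """Stub agent that records how many times it actually ran."""
    def __init__(self):
        self.calls = 0
    async def run(self, task: str, context: dict) -> dict:
        self.calls += 1
        return {"result": task.upper()}

async def demo():
    agent = CountingAgent()
    runner = CachedAgentRunner(agent)
    a = await runner.run("review code", {})
    b = await runner.run("review code", {})  # served from cache
    return agent.calls, a, b

calls, first, second = asyncio.run(demo())
```

Note the context must be JSON-serializable for the key to be computed; non-serializable context should be reduced to a stable fingerprint first.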
CrewAI Patterns
CrewAI Patterns (v1.8+)
CrewAI patterns for role-based multi-agent collaboration with Flows architecture, hierarchical crews, MCP tools, and async execution.
Version: This document covers CrewAI 1.8.x - 1.9.x (2026). For earlier versions, patterns may differ.
Table of Contents
- Flows Architecture (1.8+)
- MCP Tool Support (1.8+)
- Hierarchical Process
- Agent Configuration (1.8+)
- Task Configuration (1.8+)
- Async Execution
- Streaming Output
- Knowledge Sources (1.8+)
- Memory Configuration
- Custom Tools
- Decorator-Based Crew Definition
- Human-in-the-Loop (Flows)
- Configuration Summary
- Best Practices
- Migration from 0.x
Flows Architecture (1.8+)
Flows provide event-driven orchestration with state management. This is the major 1.x feature for complex multi-step workflows.
Basic Flow
from crewai.flow.flow import Flow, listen, start
class ResearchFlow(Flow):
@start()
def generate_topic(self):
"""Entry point - marked with @start()"""
return "AI Safety"
@listen(generate_topic)
def research_topic(self, topic):
"""Triggered when generate_topic completes"""
return f"Research findings on {topic}"
@listen(research_topic)
def summarize(self, findings):
"""Chain multiple listeners"""
return f"Summary: {findings[:100]}..."
# Execute flow
flow = ResearchFlow()
result = flow.kickoff()
Structured State (Pydantic)
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
class WorkflowState(BaseModel):
topic: str = ""
research: str = ""
summary: str = ""
iteration: int = 0
class StatefulFlow(Flow[WorkflowState]):
@start()
def initialize(self):
self.state.topic = "Machine Learning"
self.state.iteration = 1
@listen(initialize)
def process(self):
self.state.research = f"Research on {self.state.topic}"
self.state.iteration += 1
        return self.state.research
Router for Conditional Branching
from crewai.flow.flow import Flow, listen, start, router
class ConditionalFlow(Flow):
@start()
def evaluate(self):
# Returns condition result
return {"score": 85, "passed": True}
@router(evaluate)
def route_result(self, result):
"""Route based on evaluation"""
if result["passed"]:
return "success"
return "retry"
@listen("success")
def handle_success(self):
return "Workflow completed successfully"
@listen("retry")
def handle_retry(self):
return "Retrying workflow..."Parallel Execution with and_/or_
from crewai.flow.flow import Flow, listen, start, and_, or_
class ParallelFlow(Flow):
@start()
def task_a(self):
return "Result A"
@start()
def task_b(self):
return "Result B"
@listen(and_(task_a, task_b))
def combine_results(self):
"""Triggers when BOTH complete"""
return "Combined results"
@listen(or_(task_a, task_b))
def first_result(self):
"""Triggers when EITHER completes"""
return "First result received"Integrating Crews with Flows
from crewai.flow.flow import Flow, listen, start
from crewai import Crew, Agent, Task
class CrewFlow(Flow):
@start()
def prepare_inputs(self):
return {"topic": "AI Agents", "depth": "detailed"}
@listen(prepare_inputs)
def run_research_crew(self, inputs):
researcher = Agent(
role="Researcher",
goal="Research the given topic thoroughly",
backstory="Expert researcher with domain knowledge"
)
task = Task(
description=f"Research {inputs['topic']} at {inputs['depth']} level",
expected_output="Comprehensive research report",
agent=researcher
)
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
        return result.raw
MCP Tool Support (1.8+)
CrewAI supports Model Context Protocol (MCP) for external tool integration.
Simple DSL (Recommended)
from crewai import Agent
# URL-based MCP server
agent = Agent(
role="Research Analyst",
goal="Research and analyze information",
backstory="Expert analyst",
mcps=[
"https://mcp.example.com/mcp?api_key=your_key",
"crewai-amp:financial-data", # CrewAI marketplace
"crewai-amp:research-tools#pubmed_search" # Specific tool
]
)
Transport-Specific Configuration
from crewai import Agent
from crewai.mcp import MCPServerStdio, MCPServerHTTP, MCPServerSSE
from crewai.mcp.tool_filter import create_static_tool_filter
# Local server via stdio
agent = Agent(
role="File Analyst",
goal="Analyze local files",
backstory="File processing expert",
mcps=[
MCPServerStdio(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"],
tool_filter=create_static_tool_filter(
allowed_tool_names=["read_file", "list_directory"]
)
)
]
)
# Remote HTTP server
agent = Agent(
role="API Analyst",
goal="Query external APIs",
backstory="Integration specialist",
mcps=[
MCPServerHTTP(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer token"},
connect_timeout=60
)
]
)
# Server-Sent Events (streaming)
agent = Agent(
role="Real-time Analyst",
goal="Monitor streaming data",
backstory="Real-time data specialist",
mcps=[
MCPServerSSE(
url="https://stream.example.com/mcp",
headers={"Authorization": "Bearer token"}
)
]
)
MCPServerAdapter (Advanced)
from crewai import Agent, Crew, Task
from crewai_tools import MCPServerAdapter
# Context manager for manual connection management
with MCPServerAdapter(server_params, connect_timeout=60) as mcp_tools:
agent = Agent(
role="MCP Tool User",
goal="Use MCP tools effectively",
backstory="Tool specialist",
tools=mcp_tools,
verbose=True
)
# Or filter specific tools
    filtered_tools = mcp_tools["specific_tool_name"]
Hierarchical Process
from crewai import Agent, Crew, Task, Process
manager = Agent(
role="Project Manager",
goal="Coordinate team and ensure deliverables",
backstory="Senior PM with 10 years experience",
allow_delegation=True,
verbose=True
)
researcher = Agent(
role="Researcher",
goal="Find accurate information",
backstory="Expert researcher",
allow_delegation=False
)
writer = Agent(
role="Content Writer",
goal="Create compelling content",
backstory="Professional writer"
)
crew = Crew(
agents=[manager, researcher, writer],
tasks=[research_task, write_task, review_task],
process=Process.hierarchical,
manager_llm="gpt-4o", # Required for hierarchical
memory=True,
verbose=True
)
Agent Configuration (1.8+)
from crewai import Agent
agent = Agent(
# Core identity
role="Senior Data Scientist",
goal="Analyze data and provide insights",
backstory="Expert with 10 years experience",
# LLM configuration
llm="gpt-4o",
function_calling_llm="gpt-4o-mini", # Cheaper model for tools
use_system_prompt=True,
# Execution control
max_iter=20,
max_rpm=100,
max_execution_time=300, # seconds
max_retry_limit=2,
# Advanced features (1.8+)
reasoning=True, # Enable reflection before tasks
max_reasoning_attempts=3,
multimodal=True, # Text and visual processing
inject_date=True,
date_format="%Y-%m-%d",
# Memory and context
memory=True,
respect_context_window=True, # Auto-summarize on limit
# Tools
tools=[tool1, tool2],
cache=True,
# Delegation
allow_delegation=True,
verbose=True
)
Task Configuration (1.8+)
Structured Output
from pydantic import BaseModel
from crewai import Task
class ReportOutput(BaseModel):
title: str
summary: str
findings: list[str]
confidence: float
task = Task(
description="Analyze market trends and create report",
expected_output="Structured market analysis report",
agent=analyst,
output_pydantic=ReportOutput # Structured output
)
# Access structured result
result = crew.kickoff()
report = result.pydantic
print(report.title, report.confidence)
Async Task Execution
from crewai import Task
# Parallel research tasks
research_task1 = Task(
description="Research topic A",
expected_output="Research findings",
agent=researcher,
async_execution=True # Non-blocking
)
research_task2 = Task(
description="Research topic B",
expected_output="Research findings",
agent=researcher,
async_execution=True
)
# Dependent task waits for async tasks
synthesis_task = Task(
description="Synthesize all research",
expected_output="Integrated analysis",
agent=analyst,
context=[research_task1, research_task2] # Waits for completion
)Task Guardrails (Validation)
from crewai import Task
from crewai.tasks import TaskOutput
def validate_length(result: TaskOutput) -> tuple[bool, str]:
"""Validate output meets requirements"""
if len(result.raw.split()) < 100:
return (False, "Content too brief, expand analysis")
return (True, result.raw)
task = Task(
description="Write comprehensive analysis",
expected_output="Detailed analysis (100+ words)",
agent=writer,
guardrail=validate_length,
guardrail_max_retries=3
)
# Multiple guardrails
task = Task(
description="Generate report",
expected_output="Validated report",
agent=analyst,
guardrails=[
validate_length,
validate_sources,
"Content must be objective and data-driven" # LLM-based
]
)
Human Input Tasks
task = Task(
description="Review and approve recommendations",
expected_output="Approved recommendations",
agent=reviewer,
human_input=True # Pauses for human verification
)
Task Callbacks
from crewai.tasks import TaskOutput
def task_callback(output: TaskOutput):
print(f"Task completed: {output.description}")
print(f"Result: {output.raw[:100]}...")
# Send notifications, log metrics, etc.
task = Task(
description="Analyze data",
expected_output="Analysis results",
agent=analyst,
callback=task_callback
)
Async Execution
Async Crew Kickoff
import asyncio
from crewai import Crew
async def run_crews_parallel():
crew1 = Crew(agents=[agent1], tasks=[task1])
crew2 = Crew(agents=[agent2], tasks=[task2])
# Run multiple crews in parallel
results = await asyncio.gather(
crew1.kickoff_async(),
crew2.kickoff_async()
)
return results
# Execute
results = asyncio.run(run_crews_parallel())
Async Flow Kickoff
from crewai.flow.flow import Flow, start, listen
class AsyncFlow(Flow):
@start()
async def fetch_data(self):
# Async operations supported
data = await external_api.fetch()
return data
@listen(fetch_data)
async def process_data(self, data):
result = await process_async(data)
return result
# Async execution
async def main():
flow = AsyncFlow()
result = await flow.kickoff_async()
return result
asyncio.run(main())
Streaming Output
from crewai import Crew
# Enable streaming on crew
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
stream=True # Enable real-time output
)
# Stream results
result = crew.kickoff()
# Flow streaming
flow = ExampleFlow()
flow.stream = True
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
final_result = streaming.result
Knowledge Sources (1.8+)
from crewai import Agent, Crew
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
from crewai.knowledge.knowledge_config import KnowledgeConfig
# String knowledge
company_info = StringKnowledgeSource(
content="Company policies and guidelines..."
)
# PDF knowledge
docs = PDFKnowledgeSource(file_paths=["manual.pdf", "guide.pdf"])
# Web knowledge
web_source = CrewDoclingSource(
file_paths=["https://example.com/docs"]
)
# Configure retrieval
config = KnowledgeConfig(
results_limit=10, # Documents returned (default: 3)
score_threshold=0.5 # Relevance minimum (default: 0.35)
)
# Agent-level knowledge
agent = Agent(
role="Support Agent",
goal="Answer questions using company knowledge",
backstory="Expert support representative",
knowledge_sources=[company_info]
)
# Crew-level knowledge (all agents)
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
knowledge_sources=[docs, web_source]
)
Memory Configuration
from crewai import Crew
from crewai.memory import ShortTermMemory, LongTermMemory, EntityMemory
# Simple memory
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
memory=True # Enable all memory types
)
# Custom memory configuration
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
short_term_memory=ShortTermMemory(),
long_term_memory=LongTermMemory(
storage=ChromaStorage(collection_name="crew_memory")
),
entity_memory=EntityMemory()
)
Custom Tools
import json
import aiohttp
from crewai.tools import tool
@tool("Search Database")
def search_database(query: str) -> str:
"""Search the internal database for relevant information.
Args:
query: The search query string
"""
results = db.search(query)
return json.dumps(results)
# Async tool
@tool("Fetch API Data")
async def fetch_api_data(endpoint: str) -> str:
"""Fetch data from external API asynchronously.
Args:
endpoint: API endpoint to query
"""
async with aiohttp.ClientSession() as session:
async with session.get(endpoint) as response:
return await response.text()
# Assign tools to agent
researcher = Agent(
role="Researcher",
goal="Find accurate information",
backstory="Expert researcher",
tools=[search_database, fetch_api_data],
verbose=True
)
Decorator-Based Crew Definition (Recommended)
from crewai import Agent, Crew, Process, Task
from crewai.project import CrewBase, agent, crew, task
@CrewBase
class ResearchCrew:
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
tools=[search_tool]
)
@agent
def analyst(self) -> Agent:
return Agent(config=self.agents_config['analyst'])
@task
def research_task(self) -> Task:
return Task(config=self.tasks_config['research'])
@task
def analysis_task(self) -> Task:
return Task(
config=self.tasks_config['analysis'],
context=[self.research_task()]
)
@crew
def crew(self) -> Crew:
return Crew(
agents=self.agents, # Auto-collected
tasks=self.tasks, # Auto-collected
process=Process.sequential
)
# Execute
result = ResearchCrew().crew().kickoff(inputs={"topic": "AI Safety"})
Human-in-the-Loop (Flows)
from crewai.flow.flow import Flow, listen, start
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Do you approve this content?",
emit=["approved", "rejected"],
llm="gpt-4o-mini"
)
def generate_content(self):
return "Content for human review..."
@listen("approved")
def handle_approval(self, result: HumanFeedbackResult):
print(f"Approved with feedback: {result.feedback}")
return "Processing approved content"
@listen("rejected")
def handle_rejection(self, result: HumanFeedbackResult):
print(f"Rejected: {result.feedback}")
return "Revising content"
Configuration Summary
| Feature | Parameter | Default |
|---|---|---|
| Process types | process | sequential (options: sequential, hierarchical) |
| Manager LLM | manager_llm | Required for hierarchical |
| Memory | memory | False |
| Streaming | stream | False |
| Verbose | verbose | False |
| Max RPM | max_rpm | Unlimited |
| Planning | planning | False |
Best Practices
- Use Flows for complex workflows: Multi-step processes benefit from Flows architecture
- Prefer decorator-based definition: Use `@CrewBase` for maintainable crew definitions
- Leverage MCP for external tools: Use the simple DSL for quick MCP integration
- Enable structured outputs: Use `output_pydantic` for type-safe results
- Add guardrails: Validate outputs with function or LLM-based guardrails
- Use async for parallel work: `async_execution=True` for independent tasks
- Configure knowledge sources: Add crew/agent-level knowledge for context
- Role clarity: Each agent has distinct, non-overlapping role
- Task granularity: One clear deliverable per task
- Memory scope: Use short-term for session, long-term for persistent knowledge
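The guardrail bullet above can be a plain function. A minimal sketch using the `(success, payload)` tuple convention CrewAI documents for function-based guardrails; the JSON check itself is just an illustrative validation:

```python
import json

def validate_json_output(task_output) -> tuple[bool, str]:
    """Guardrail: accept the task output only if it parses as JSON.

    Returns (True, raw_payload) on success, or (False, reason) so the
    agent can retry using the reason string as feedback.
    """
    # TaskOutput objects expose .raw; allow plain strings for testing
    raw = getattr(task_output, "raw", task_output)
    try:
        json.loads(raw)
        return True, raw
    except json.JSONDecodeError as e:
        return False, f"Output was not valid JSON: {e}"
```

Attached via `Task(..., guardrail=validate_json_output)`; a `False` result triggers a retry with the feedback message included in the agent's context.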
Migration from 0.x
| 0.x Pattern | 1.8+ Pattern |
|---|---|
| Manual agent/task lists | @CrewBase with @agent, @task decorators |
| Synchronous only | Async support with kickoff_async() |
| No streaming | stream=True parameter |
| Basic tools | MCP integration with mcps parameter |
| No validation | Task guardrails |
| No flow control | Flows with @start, @listen, @router |
Framework Comparison
Decision matrix for choosing between multi-agent frameworks.
Feature Comparison
| Feature | LangGraph | CrewAI | OpenAI SDK | MS Agent |
|---|---|---|---|---|
| State Management | Excellent | Good | Basic | Good |
| Persistence | Built-in | Plugin | Manual | Built-in |
| Streaming | Native | Limited | Native | Native |
| Human-in-Loop | Native | Manual | Manual | Native |
| Memory | Via Store | Built-in | Manual | Manual |
| Observability | Langfuse/LangSmith | Limited | Tracing | Azure Monitor |
| Learning Curve | Steep | Easy | Medium | Medium |
| Production Ready | Yes | Yes | Yes | Q1 2026 |
Use Case Matrix
| Use Case | Best Framework | Why |
|---|---|---|
| Complex state machines | LangGraph | Native StateGraph, persistence |
| Role-based teams | CrewAI | Built-in delegation, backstories |
| OpenAI-only projects | OpenAI SDK | Native integration, handoffs |
| Enterprise/compliance | MS Agent | Azure integration, A2A |
| Research/experiments | AG2 | Open-source, flexible |
| Quick prototypes | CrewAI | Minimal boilerplate |
| Long-running workflows | LangGraph | Checkpointing, recovery |
| Customer support bots | OpenAI SDK | Handoffs, guardrails |
Decision Tree
Start
|
+-- Need complex state machines?
| |
| +-- Yes --> LangGraph
| |
| +-- No
| |
+-- Role-based collaboration?
| |
| +-- Yes --> CrewAI
| |
| +-- No
| |
+-- OpenAI ecosystem only?
| |
| +-- Yes --> OpenAI Agents SDK
| |
| +-- No
| |
+-- Enterprise requirements?
| |
| +-- Yes --> Microsoft Agent Framework
| |
| +-- No
| |
+-- Open-source priority?
|
+-- Yes --> AG2
|
+-- No --> LangGraph (default)
Migration Paths
From AutoGen to MS Agent Framework
# AutoGen 0.2 (old)
from autogen import AssistantAgent, UserProxyAgent
agent = AssistantAgent(name="assistant", llm_config=config)
# MS Agent Framework (new)
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
model_client = OpenAIChatCompletionClient(model="gpt-5.2")
agent = AssistantAgent(name="assistant", model_client=model_client)
From Custom to LangGraph
# Custom orchestration (old)
async def workflow(task):
step1 = await agent1.run(task)
step2 = await agent2.run(step1)
return step2
# LangGraph (new)
from langgraph.graph import StateGraph, START, END
workflow = StateGraph(State)
workflow.add_node("agent1", agent1_node)
workflow.add_node("agent2", agent2_node)
workflow.add_edge(START, "agent1")
workflow.add_edge("agent1", "agent2")
workflow.add_edge("agent2", END)
app = workflow.compile()
Cost Considerations
| Framework | Licensing | Infra Cost | LLM Cost |
|---|---|---|---|
| LangGraph | MIT | Self-host / LangGraph Cloud | Any LLM |
| CrewAI | MIT | Self-host | Any LLM |
| OpenAI SDK | MIT | Self-host | OpenAI only |
| MS Agent | MIT | Self-host / Azure | Any LLM |
| AG2 | Apache 2.0 | Self-host | Any LLM |
Performance Characteristics
| Framework | Cold Start | Latency | Throughput |
|---|---|---|---|
| LangGraph | ~100ms | Low | High |
| CrewAI | ~200ms | Medium | Medium |
| OpenAI SDK | ~50ms | Low | High |
| MS Agent | ~150ms | Medium | High |
Team Expertise Requirements
| Framework | Python | LLM | Infra |
|---|---|---|---|
| LangGraph | Expert | Expert | Medium |
| CrewAI | Beginner | Beginner | Low |
| OpenAI SDK | Medium | Medium | Low |
| MS Agent | Medium | Medium | High |
Recommendation Summary
- Default choice: LangGraph (most capable, production-proven)
- Fastest to prototype: CrewAI (minimal code, intuitive)
- OpenAI shops: OpenAI Agents SDK (native integration)
- Enterprise: Microsoft Agent Framework (compliance, Azure)
- Research: AG2 (open community, experimental features)
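The recommendation summary and decision tree above can be collapsed into a small routing helper; the priority order mirrors the tree (the function and parameter names are this document's conventions, not a library API):

```python
def pick_framework(
    complex_state: bool = False,
    role_based: bool = False,
    openai_only: bool = False,
    enterprise: bool = False,
    open_source_priority: bool = False,
) -> str:
    """Walk the decision tree top-down and return the first match."""
    if complex_state:
        return "LangGraph"
    if role_based:
        return "CrewAI"
    if openai_only:
        return "OpenAI Agents SDK"
    if enterprise:
        return "Microsoft Agent Framework"
    if open_source_priority:
        return "AG2"
    return "LangGraph"  # default choice
```

Because checks run in tree order, earlier criteria win: a project that is both OpenAI-only and enterprise routes to the OpenAI Agents SDK.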
GPT-5.2-Codex
OpenAI's specialized agentic coding model (January 2026) optimized for long-horizon software engineering tasks.
Overview
GPT-5.2-Codex is a specialized variant of GPT-5.2 purpose-built for agentic coding workflows. Unlike the general-purpose GPT-5.2, Codex is optimized for:
- Extended autonomous operation: Hours-long coding sessions without degradation
- Context compaction: Intelligent summarization for long-running tasks
- Project-scale understanding: Full codebase comprehension and refactoring
- Tool reliability: Deterministic file operations and terminal commands
Key Differences from GPT-5.2
| Capability | GPT-5.2 | GPT-5.2-Codex |
|---|---|---|
| Context Window | 256K tokens | 256K + compaction |
| Session Duration | Single request | Hours/days |
| Tool Execution | General | Code-optimized |
| File Operations | Basic | Atomic, rollback-aware |
| Terminal Access | Sandboxed | Full with safety rails |
| Vision | General | Code/diagram-aware |
| Cost per 1M tokens | $2.50/$10 | $5.00/$20 |
Key Capabilities
Long-Horizon Work Through Context Compaction
Codex automatically compacts context during extended sessions, preserving critical information while discarding ephemeral details.
from openai import OpenAI
client = OpenAI()
# Codex maintains context across many tool calls
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{"role": "system", "content": "You are a senior software engineer."},
{"role": "user", "content": "Refactor the authentication module to use JWT."}
],
# Codex-specific parameters
extra_body={
"codex_config": {
"compaction_strategy": "semantic", # semantic, aggressive, minimal
"preserve_file_state": True,
"max_session_hours": 8
}
}
)
Compaction Strategies:
| Strategy | Use Case | Retention |
|---|---|---|
| `semantic` | General development | Code structure, decisions, errors |
| `aggressive` | Very long tasks | Only current focus + critical history |
| `minimal` | Short tasks | Full context, no compaction |
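The trade-offs in the table suggest a simple selection rule; a sketch of one such heuristic (the thresholds are assumptions for illustration, not documented limits):

```python
def choose_compaction_strategy(expected_hours: float, context_tokens: int) -> str:
    """Map expected session length and context size to a compaction strategy."""
    if expected_hours < 0.5 and context_tokens < 200_000:
        return "minimal"      # short task: full context fits, no compaction needed
    if expected_hours > 4:
        return "aggressive"   # very long task: keep only the current focus
    return "semantic"         # default balance of retention and headroom
```

The chosen value would be passed as `codex_config["compaction_strategy"]` as shown above.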
Project-Scale Tasks
Codex excels at large-scale operations that span entire codebases:
from agents import Agent, tool
# Define codebase navigation tools
@tool
def search_codebase(query: str, file_types: list[str] = None) -> str:
"""Search across the entire codebase for patterns or definitions."""
# Implementation
pass
@tool
def apply_refactor(pattern: str, replacement: str, scope: str = "project") -> dict:
"""Apply a refactoring pattern across multiple files with preview."""
# Returns affected files and changes for approval
pass
codex_agent = Agent(
name="refactor-engineer",
model="gpt-5.2-codex",
instructions="""You are a senior engineer performing large-scale refactors.
Guidelines:
1. Analyze impact before changes
2. Create rollback points
3. Run tests after each file change
4. Document breaking changes""",
tools=[search_codebase, apply_refactor]
)
Supported Project Tasks:
- Full codebase migrations (Python 2 to 3, React class to hooks)
- Dependency upgrades with breaking change resolution
- Architecture refactors (monolith to microservices)
- Test coverage expansion across modules
- Security vulnerability remediation
Enhanced Cybersecurity Capabilities
Codex includes specialized training for security-aware coding:
# Security-focused agent configuration
security_agent = Agent(
name="security-engineer",
model="gpt-5.2-codex",
instructions="""You are a security engineer. When writing or reviewing code:
1. Identify OWASP Top 10 vulnerabilities
2. Check for secrets/credentials in code
3. Validate input sanitization
4. Review authentication/authorization flows
5. Check dependency vulnerabilities via CVE databases""",
extra_config={
"security_mode": True, # Enables security-focused reasoning
"cve_lookup": True # Real-time CVE database access
}
)
Security Capabilities:
| Feature | Description |
|---|---|
| Vulnerability Detection | SAST-like scanning during code review |
| CVE Awareness | Real-time vulnerability database lookups |
| Secrets Detection | Identifies hardcoded credentials, API keys |
| Threat Modeling | Suggests security improvements |
| Compliance Hints | GDPR, HIPAA, SOC2 pattern recognition |
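The secrets-detection row can also be approximated client-side as a pre-flight check before code ever reaches the model. A naive regex sketch; the patterns are illustrative only (production scanners such as gitleaks ship hundreds of rules):

```python
import re

# Illustrative patterns: an AWS-style key ID and a generic credential assignment
SECRET_PATTERNS = [
    (re.compile(r"AKIA[0-9A-Z]{16}"), "AWS access key ID"),
    (re.compile(r"(?i)(api[_-]?key|secret|token)\s*[:=]\s*['\"][^'\"]{16,}['\"]"),
     "hardcoded credential assignment"),
]

def scan_for_secrets(source: str) -> list[str]:
    """Return findings for suspicious literals, one per matching line."""
    findings = []
    for lineno, line in enumerate(source.splitlines(), 1):
        for pattern, label in SECRET_PATTERNS:
            if pattern.search(line):
                findings.append(f"line {lineno}: possible {label}")
    return findings
```

Running this before submission keeps obvious credentials out of prompts regardless of what the model itself detects.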
Vision for Code Artifacts
Codex processes visual inputs with code-aware understanding:
import base64
# Process architecture diagram
with open("architecture.png", "rb") as f:
image_data = base64.standard_b64encode(f.read()).decode("utf-8")
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": "Implement the microservices shown in this architecture diagram."
},
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{image_data}"}
}
]
}
]
)
Vision Use Cases:
- Architecture diagrams to code scaffolding
- UI mockups to component implementation
- Error screenshots to debugging steps
- Whiteboard sketches to technical specs
- Database schemas (ERD) to migrations
Benchmark Performance
GPT-5.2-Codex achieves state-of-the-art results on coding benchmarks:
| Benchmark | GPT-5.2 | GPT-5.2-Codex | Previous SOTA |
|---|---|---|---|
| SWE-Bench Pro | 61.2% | 78.4% | 68.1% (Claude Opus 4.6) |
| Terminal-Bench 2.0 | 72.8% | 89.3% | 81.2% (Gemini 2.5) |
| HumanEval+ | 94.1% | 96.8% | 95.2% (GPT-5.2) |
| MBPP+ | 89.7% | 93.2% | 91.4% (Claude Opus 4.6) |
| CodeContests | 45.2% | 58.7% | 52.3% (Gemini 2.5) |
SWE-Bench Pro Notes:
- Tests real GitHub issues requiring multi-file changes
- Codex excels at test-writing and edge case handling
- Strong performance on legacy codebase understanding
Terminal-Bench 2.0 Notes:
- Tests long-horizon terminal tasks (setup, deploy, debug)
- Codex maintains coherent state across 50+ commands
- Superior error recovery and alternative path exploration
API Usage Patterns
Basic Completion
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{
"role": "system",
"content": "You are an expert Python developer."
},
{
"role": "user",
"content": "Write a connection pool manager with health checks."
}
],
temperature=0.2, # Lower temperature for code generation
max_tokens=4096
)
print(response.choices[0].message.content)
Streaming with Tool Use
import json
# Define tools
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read contents of a file",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "write_file",
"description": "Write contents to a file",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"content": {"type": "string", "description": "File content"}
},
"required": ["path", "content"]
}
}
},
{
"type": "function",
"function": {
"name": "run_command",
"description": "Execute a shell command",
"parameters": {
"type": "object",
"properties": {
"command": {"type": "string", "description": "Command to run"},
"timeout": {"type": "integer", "description": "Timeout in seconds"}
},
"required": ["command"]
}
}
}
]
# Streaming with tool calls
stream = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{"role": "user", "content": "Set up a new FastAPI project with tests"}
],
tools=tools,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.tool_calls:
tool_call = chunk.choices[0].delta.tool_calls[0]
print(f"Tool: {tool_call.function.name}")
elif chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
Async Operations
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def refactor_module(module_path: str) -> str:
response = await async_client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{
"role": "system",
"content": "Refactor code for readability and performance."
},
{
"role": "user",
"content": f"Refactor the module at {module_path}"
}
],
extra_body={
"codex_config": {
"preserve_behavior": True,
"add_type_hints": True
}
}
)
return response.choices[0].message.content
# Parallel refactoring
async def refactor_project(modules: list[str]):
tasks = [refactor_module(m) for m in modules]
results = await asyncio.gather(*tasks)
return dict(zip(modules, results))
Integration with Agents SDK
GPT-5.2-Codex integrates seamlessly with OpenAI Agents SDK 0.6.x:
from agents import Agent, Runner, handoff, tool
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
# File operation tools
@tool
def read_file(path: str) -> str:
"""Read file contents."""
with open(path) as f:
return f.read()
@tool
def write_file(path: str, content: str) -> str:
"""Write content to file."""
with open(path, "w") as f:
f.write(content)
return f"Wrote {len(content)} bytes to {path}"
@tool
def run_tests(path: str = ".") -> str:
"""Run pytest on the specified path."""
import subprocess
result = subprocess.run(
["pytest", path, "-v", "--tb=short"],
capture_output=True,
text=True
)
return f"Exit code: {result.returncode}\n{result.stdout}\n{result.stderr}"
# Specialized agents using Codex
architect_agent = Agent(
name="architect",
model="gpt-5.2-codex",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a software architect. Analyze requirements and design solutions.
Hand off to implementer for coding tasks.
Hand off to reviewer for code review.""",
tools=[read_file]
)
implementer_agent = Agent(
name="implementer",
model="gpt-5.2-codex",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a senior developer. Implement designs with clean, tested code.
Hand off to reviewer when implementation is complete.""",
tools=[read_file, write_file, run_tests]
)
reviewer_agent = Agent(
name="reviewer",
model="gpt-5.2-codex",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a code reviewer. Check for bugs, security issues, and style.
Request changes from implementer if needed.
Approve and hand back to architect when satisfied.""",
tools=[read_file]
)
# Wire up handoffs
architect_agent.handoffs = [
handoff(agent=implementer_agent),
handoff(agent=reviewer_agent)
]
implementer_agent.handoffs = [
handoff(agent=reviewer_agent),
handoff(agent=architect_agent)
]
reviewer_agent.handoffs = [
handoff(agent=implementer_agent),
handoff(agent=architect_agent)
]
# Run development workflow
async def develop_feature(requirement: str):
runner = Runner()
result = await runner.run(
architect_agent,
f"Design and implement: {requirement}"
)
return result.final_output
RealtimeRunner for Interactive Sessions
from agents import Agent
from agents.realtime import RealtimeRunner
# Interactive coding session
codex_agent = Agent(
name="pair-programmer",
model="gpt-5.2-codex",
instructions="You are a pair programming partner. Help write and debug code."
)
async def interactive_session():
async with RealtimeRunner(codex_agent) as runner:
# Continuous conversation with context preservation
while True:
user_input = input("> ")
if user_input == "exit":
break
async for chunk in runner.stream(user_input):
print(chunk.content, end="", flush=True)
print()
IDE Integrations
GPT-5.2-Codex powers several IDE integrations:
Cursor
// .cursor/settings.json
{
"ai.model": "gpt-5.2-codex",
"ai.features": {
"composer": true,
"agent": true,
"codebaseIndexing": true
},
"ai.codex": {
"sessionDuration": "8h",
"compactionStrategy": "semantic"
}
}
Cursor Features with Codex:
- Composer for multi-file generation
- Agent mode for autonomous tasks
- Background indexing for codebase awareness
- Inline completions with project context
Windsurf (Codeium)
# .windsurf/config.yaml
model:
provider: openai
name: gpt-5.2-codex
cascade:
enabled: true
max_depth: 10
auto_apply: false # Review changes before applying
features:
flows: true # Multi-step guided workflows
supercomplete: true
terminal_agent: true
Windsurf Features:
- Cascade for chained operations
- Flows for guided development
- Terminal integration for full-stack tasks
GitHub Copilot Workspace
# .github/copilot-workspace.yml
model: gpt-5.2-codex
workspace:
scope: repository
features:
- issue-to-pr
- multi-file-edit
- test-generation
review:
auto_suggest: true
security_scan: true
Copilot Workspace Features:
- Issue-to-PR automation
- Multi-repository awareness
- Integrated CI feedback
Factory (VSCode Extension)
// .vscode/settings.json
{
"factory.model": "gpt-5.2-codex",
"factory.drafter": {
"enabled": true,
"autoContext": true
},
"factory.pilot": {
"enabled": true,
"approvalRequired": true
}
}
When to Use Codex vs Standard GPT-5.2
Use GPT-5.2-Codex When:
| Scenario | Why Codex |
|---|---|
| Multi-file refactors | Project-scale context management |
| Long debugging sessions | Context compaction prevents degradation |
| Security reviews | Specialized vulnerability detection |
| Test generation at scale | Understands test patterns across codebase |
| Architecture migrations | Maintains coherence across many changes |
| CI/CD pipeline work | Terminal-optimized tool execution |
Use Standard GPT-5.2 When:
| Scenario | Why Standard |
|---|---|
| Single-file tasks | No need for compaction overhead |
| Code explanation | General language understanding sufficient |
| Quick prototypes | Faster, cheaper for short tasks |
| Non-code tasks | Writing docs, emails, general Q&A |
| Cost-sensitive workloads | 50% cheaper than Codex |
Decision Matrix
Task Duration > 1 hour?
|
+-- Yes --> GPT-5.2-Codex
|
+-- No
|
+-- Multiple files affected?
| |
| +-- Yes --> GPT-5.2-Codex
| |
| +-- No
| |
| +-- Security review needed?
| | |
| | +-- Yes --> GPT-5.2-Codex
| | |
| | +-- No --> GPT-5.2 (standard)
Pricing Considerations
Token Pricing (January 2026)
| Model | Input (per 1M) | Output (per 1M) | Cached Input |
|---|---|---|---|
| gpt-5.2 | $2.50 | $10.00 | $1.25 |
| gpt-5.2-codex | $5.00 | $20.00 | $2.50 |
| gpt-5.2-mini | $0.15 | $0.60 | $0.075 |
Cost Optimization Strategies
# 1. Use caching for repeated context
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=messages,
extra_body={
"cache_control": {
"system_prompt": "ephemeral", # Cache system prompt
"file_contents": "persistent" # Cache file reads
}
}
)
# 2. Batch similar operations
# Instead of separate calls per file:
files_to_refactor = ["auth.py", "users.py", "api.py"]
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{"role": "user", "content": f"Refactor these files: {files_to_refactor}"}
]
)
# 3. Use gpt-5.2-mini for preprocessing
# Filter/classify tasks before sending to Codex
classification = client.chat.completions.create(
model="gpt-5.2-mini",
messages=[{"role": "user", "content": f"Is this task complex? {task}"}]
)
if "complex" in classification.choices[0].message.content.lower():
# Use Codex for complex tasks
use_model = "gpt-5.2-codex"
else:
# Use standard for simple tasks
use_model = "gpt-5.2"
Estimated Costs by Task Type
| Task | Est. Tokens | Codex Cost | Standard Cost |
|---|---|---|---|
| Single file fix | ~5K | $0.12 | $0.06 |
| Module refactor | ~50K | $1.25 | $0.63 |
| Full codebase migration | ~500K | $12.50 | $6.25 |
| 8-hour dev session | ~2M | $50.00 | $25.00 |
Note: Codex typically requires fewer iterations for complex tasks, often making total cost comparable to standard GPT-5.2.
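A small calculator over the pricing table makes the per-task arithmetic explicit. The input/output split is an assumption (real ratios vary by task), and the table's estimates also fold in iterations, so exact agreement is not expected:

```python
# Prices in USD per 1M tokens, from the pricing table above (January 2026)
PRICES = {
    "gpt-5.2":       {"input": 2.50, "output": 10.00},
    "gpt-5.2-codex": {"input": 5.00, "output": 20.00},
    "gpt-5.2-mini":  {"input": 0.15, "output": 0.60},
}

def estimate_cost(model: str, total_tokens: int, output_ratio: float = 0.2) -> float:
    """Estimate USD cost for a task, splitting tokens into input and output."""
    p = PRICES[model]
    output_tokens = total_tokens * output_ratio
    input_tokens = total_tokens - output_tokens
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000
```

For example, a 50/50 split over 1M total tokens costs 12.50 USD on Codex versus 6.25 USD on standard GPT-5.2, matching the 2x pricing ratio.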
Configuration Reference
Codex-Specific Parameters
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=messages,
extra_body={
"codex_config": {
# Context management
"compaction_strategy": "semantic", # semantic, aggressive, minimal
"preserve_file_state": True, # Remember file contents
"max_session_hours": 8, # Session duration limit
# Code behavior
"preserve_behavior": True, # Ensure refactors don't change behavior
"add_type_hints": True, # Add type hints when refactoring
"follow_style_guide": "project", # project, google, pep8
# Safety
"security_mode": True, # Enable security scanning
"dry_run": False, # Preview changes without applying
"require_tests": True, # Require test coverage for changes
# Tools
"shell_timeout": 300, # Max seconds for shell commands
"file_size_limit": 1048576 # Max file size to read (1MB)
}
}
)
Best Practices
- Start with clear goals: Define what "done" looks like upfront
- Provide project context: Include README, architecture docs, coding standards
- Use semantic compaction: Best balance of context and performance
- Enable security mode: Catch vulnerabilities during development
- Set session limits: Prevent runaway costs with `max_session_hours`
- Review before applying: Use `dry_run` for large refactors
- Batch related operations: Reduce API calls by grouping similar tasks
- Cache file contents: Use persistent caching for frequently read files
LangGraph Implementation: Multi-Scenario Orchestration
Complete Python implementation of the multi-scenario orchestration pattern using LangGraph 1.0.6+.
1. State Definition
from typing import TypedDict, Annotated, Literal
from dataclasses import dataclass, field, asdict
from operator import add
import time
from datetime import datetime
@dataclass
class ScenarioProgress:
"""Track execution state for one scenario."""
scenario_id: str
status: Literal["pending", "running", "paused", "complete", "failed"]
progress_pct: float = 0.0
# Milestones
milestones_reached: list[str] = field(default_factory=list)
current_milestone: str = "start"
# Timing
start_time_ms: int = 0
elapsed_ms: int = 0
elapsed_checkpoints: dict = field(default_factory=dict) # {milestone: time_ms}
# Metrics
memory_used_mb: int = 0
items_processed: int = 0
batch_count: int = 0
# Results
partial_results: list[dict] = field(default_factory=list)
quality_scores: dict = field(default_factory=dict)
# Errors
errors: list[dict] = field(default_factory=list)
def to_dict(self):
return asdict(self)
@dataclass
class ScenarioDefinition:
"""Configuration for one scenario."""
name: str # "simple", "medium", "complex"
difficulty: Literal["easy", "intermediate", "advanced"]
complexity_multiplier: float # 1.0, 3.0, 8.0
# Inputs
input_size: int
dataset_characteristics: dict # {"distribution": "uniform"}
# Constraints
time_budget_seconds: int
memory_limit_mb: int
error_tolerance: float # 0-1
# Skill params
skill_params: dict
# Expectations
expected_quality: Literal["basic", "good", "excellent"]
quality_metrics: list[str]
def to_dict(self):
return asdict(self)
class ScenarioOrchestratorState(TypedDict, total=False):
"""State for the entire orchestration."""
# Orchestration metadata
orchestration_id: str
start_time_unix: int
skill_name: str
skill_version: str
# Scenario definitions
scenario_simple: ScenarioDefinition
scenario_medium: ScenarioDefinition
scenario_complex: ScenarioDefinition
# Progress tracking
progress_simple: ScenarioProgress
progress_medium: ScenarioProgress
progress_complex: ScenarioProgress
# Synchronization
sync_points: dict # {milestone: bool}
last_sync_time: int
# Aggregated results
final_results: dict
2. Node Implementations
Supervisor Node
import asyncio
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send
async def scenario_supervisor(state: ScenarioOrchestratorState) -> list[Send]:
"""
Route to all 3 scenarios in parallel.
Returns Send commands that trigger parallel execution.
"""
print(f"[SUPERVISOR] Starting orchestration {state['orchestration_id']}")
# Initialize progress for each scenario
for scenario_id in ["simple", "medium", "complex"]:
progress = ScenarioProgress(
scenario_id=scenario_id,
status="pending",
start_time_ms=int(time.time() * 1000)
)
state[f"progress_{scenario_id}"] = progress
# Return Send commands for parallel execution
return [
Send("scenario_worker", {"scenario_id": "simple", **state}),
Send("scenario_worker", {"scenario_id": "medium", **state}),
Send("scenario_worker", {"scenario_id": "complex", **state}),
]
async def scenario_worker(state: ScenarioOrchestratorState) -> dict:
"""
Execute one scenario (simple, medium, or complex).
Receives scenario_id from supervisor via Send.
"""
scenario_id = state.get("scenario_id")
progress = state[f"progress_{scenario_id}"]
scenario_def = state[f"scenario_{scenario_id}"]
print(f"[SCENARIO {scenario_id.upper()}] Starting ({scenario_def.complexity_multiplier}x complexity)")
progress.status = "running"
progress.start_time_ms = int(time.time() * 1000)
try:
# Execute skill for this scenario
result = await execute_skill_with_milestones(
skill_name=state["skill_name"],
scenario_def=scenario_def,
progress=progress,
state=state
)
progress.status = "complete"
progress.elapsed_ms = int(time.time() * 1000) - progress.start_time_ms
progress.partial_results.append(result)
print(f"[SCENARIO {scenario_id.upper()}] Complete in {progress.elapsed_ms}ms")
return {f"progress_{scenario_id}": progress}
except Exception as e:
progress.status = "failed"
progress.errors.append({
"timestamp": datetime.now().isoformat(),
"message": str(e),
"severity": "error"
})
print(f"[SCENARIO {scenario_id.upper()}] Failed: {e}")
return {f"progress_{scenario_id}": progress}
async def execute_skill_with_milestones(
skill_name: str,
scenario_def: ScenarioDefinition,
progress: ScenarioProgress,
state: ScenarioOrchestratorState
) -> dict:
"""
Execute skill, recording milestones and checkpoints.
This is where you call YOUR SKILL.
"""
milestones = [0, 30, 50, 70, 90, 100] # Percentage checkpoints
results = {"batches": [], "quality": {}}
input_items = generate_test_data(
size=scenario_def.input_size,
characteristics=scenario_def.dataset_characteristics
)
batch_size = scenario_def.skill_params.get("batch_size", 10)
for batch_idx, batch in enumerate(chunks(input_items, batch_size)):
# Execute skill on this batch
# Replace this with your actual skill invocation
batch_result = await invoke_skill(
skill_name=skill_name,
input_data=batch,
params=scenario_def.skill_params
)
results["batches"].append(batch_result)
progress.batch_count += 1
progress.items_processed += len(batch)
# Update progress percentage
progress.progress_pct = (progress.items_processed / scenario_def.input_size) * 100
# Check if we've reached a milestone
reached_milestones = [m for m in milestones if m <= progress.progress_pct]
new_milestones = [m for m in reached_milestones if m not in progress.milestones_reached]
for milestone in new_milestones:
progress.milestones_reached.append(milestone)
elapsed = int(time.time() * 1000) - progress.start_time_ms
progress.elapsed_checkpoints[f"milestone_{milestone}"] = elapsed
print(f" [{progress.scenario_id}] Reached {milestone}% at {elapsed}ms")
# Optional: Wait for other scenarios at major milestones
if milestone in [30, 70]:
await synchronize_at_milestone(milestone, state)
# Score results
results["quality"] = calculate_quality_metrics(results["batches"], scenario_def.quality_metrics)
progress.quality_scores = results["quality"]
return results
Synchronization Node
async def synchronize_at_milestone(
milestone_pct: int,
state: ScenarioOrchestratorState,
timeout_seconds: int = 30
) -> bool:
"""
Optional: Wait for other scenarios at major milestones.
Returns True if all scenarios reached milestone, False if timeout.
"""
start = time.time()
milestone_key = f"checkpoint_{milestone_pct}"
while time.time() - start < timeout_seconds:
simple_at_milestone = milestone_pct in state["progress_simple"].milestones_reached
medium_at_milestone = milestone_pct in state["progress_medium"].milestones_reached
complex_at_milestone = milestone_pct in state["progress_complex"].milestones_reached
all_reached = simple_at_milestone and medium_at_milestone and complex_at_milestone
if all_reached:
state["sync_points"][milestone_key] = True
print(f"[SYNC] All scenarios reached {milestone_pct}%")
return True
# Check if any scenario failed
if any(state[f"progress_{s}"].status == "failed" for s in ["simple", "medium", "complex"]):
print(f"[SYNC] A scenario failed, proceeding without sync")
return False
await asyncio.sleep(0.5)
print(f"[SYNC] Timeout at {milestone_pct}%, proceeding")
return False
Aggregator Node
async def scenario_aggregator(state: ScenarioOrchestratorState) -> dict:
"""
Collect all scenario results and synthesize findings.
"""
print("[AGGREGATOR] Combining results from all scenarios")
aggregated = {
"orchestration_id": state["orchestration_id"],
"skill": state["skill_name"],
"timestamp": datetime.now().isoformat(),
# Raw results
"results_by_scenario": {
"simple": state["progress_simple"].partial_results[-1] if state["progress_simple"].partial_results else {},
"medium": state["progress_medium"].partial_results[-1] if state["progress_medium"].partial_results else {},
"complex": state["progress_complex"].partial_results[-1] if state["progress_complex"].partial_results else {},
},
# Metrics
"metrics": {},
# Comparison
"comparison": {},
# Recommendations
"recommendations": []
}
# Calculate comparative metrics
for scenario_id in ["simple", "medium", "complex"]:
progress = state[f"progress_{scenario_id}"]
aggregated["metrics"][scenario_id] = {
"elapsed_ms": progress.elapsed_ms,
"items_processed": progress.items_processed,
"quality_scores": progress.quality_scores,
"errors": len(progress.errors)
}
# Compare quality vs. complexity
simple_quality = state["progress_simple"].quality_scores.get("overall", 0)
medium_quality = state["progress_medium"].quality_scores.get("overall", 0)
complex_quality = state["progress_complex"].quality_scores.get("overall", 0)
aggregated["comparison"]["quality_ranking"] = {
"best": max(
("simple", simple_quality),
("medium", medium_quality),
("complex", complex_quality),
key=lambda x: x[1]
)[0],
"scores": {
"simple": simple_quality,
"medium": medium_quality,
"complex": complex_quality
}
}
# Time complexity analysis
simple_time = state["progress_simple"].elapsed_ms
medium_time = state["progress_medium"].elapsed_ms
complex_time = state["progress_complex"].elapsed_ms
    # input_size × complexity_multiplier from the scenario definitions (100×1, 100×3, 100×8)
    simple_size = 100 * 1.0
    medium_size = 100 * 3.0
    complex_size = 100 * 8.0
aggregated["comparison"]["time_per_item_ms"] = {
"simple": simple_time / simple_size,
"medium": medium_time / medium_size,
"complex": complex_time / complex_size,
}
# Identify scaling issues
    if complex_time / complex_size > simple_time / simple_size * 2:
        aggregated["recommendations"].append("Superlinear scaling: per-item cost grows with load; investigate bottlenecks")
    elif complex_time / complex_size < simple_time / simple_size * 0.8:
        aggregated["recommendations"].append("Sublinear scaling: excellent performance with increased load")
    # Success patterns
    success_patterns = []
    for scenario_id in ["simple", "medium", "complex"]:
        if state[f"progress_{scenario_id}"].status == "complete" and state[f"progress_{scenario_id}"].errors == []:
            success_patterns.append(scenario_id)
    if success_patterns:
        aggregated["recommendations"].append(f"Scenarios completed without errors: {', '.join(success_patterns)}")
    return {"final_results": aggregated}

3. Graph Construction
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
from langgraph.types import Send
def build_scenario_orchestrator(
checkpointer: PostgresSaver | None = None
) -> Any:
"""
Build the complete orchestration graph.
"""
graph = StateGraph(ScenarioOrchestratorState)
# Nodes
graph.add_node("supervisor", scenario_supervisor)
graph.add_node("scenario_worker", scenario_worker)
graph.add_node("aggregator", scenario_aggregator)
# Edges
graph.add_edge(START, "supervisor")
# Fan-out: supervisor sends to 3 parallel workers
    graph.add_conditional_edges(
        "supervisor",
        # Send carries a distinct payload per branch; repeating the bare node
        # name three times would not fan out with different scenarios
        lambda state: [
            Send("scenario_worker", {"scenario_id": s})
            for s in ("simple", "medium", "complex")
        ]
    )
# Workers converge at aggregator
graph.add_edge("scenario_worker", "aggregator")
graph.add_edge("aggregator", END)
# Compile with checkpointing
    return graph.compile(checkpointer=checkpointer)

4. Invocation Example
import asyncio
import uuid
from langgraph.checkpoint.postgres import PostgresSaver
async def main():
# Setup checkpointing
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:password@localhost/orchestkit"
)
# Build orchestrator
app = build_scenario_orchestrator(checkpointer=checkpointer)
# Prepare initial state
initial_state: ScenarioOrchestratorState = {
"orchestration_id": f"demo-{uuid.uuid4().hex[:8]}",
"start_time_unix": int(time.time()),
"skill_name": "your-skill-name",
"skill_version": "1.0.0",
# Scenarios
"scenario_simple": ScenarioDefinition(
name="simple",
difficulty="easy",
complexity_multiplier=1.0,
input_size=100,
dataset_characteristics={"distribution": "uniform"},
time_budget_seconds=30,
memory_limit_mb=256,
error_tolerance=0.0,
skill_params={"batch_size": 10, "cache_enabled": True},
expected_quality="basic",
quality_metrics=["accuracy", "coverage"]
),
"scenario_medium": ScenarioDefinition(
name="medium",
difficulty="intermediate",
complexity_multiplier=3.0,
input_size=300,
dataset_characteristics={"distribution": "uniform"},
time_budget_seconds=90,
memory_limit_mb=512,
error_tolerance=0.05,
skill_params={"batch_size": 50, "cache_enabled": True},
expected_quality="good",
quality_metrics=["accuracy", "coverage"]
),
"scenario_complex": ScenarioDefinition(
name="complex",
difficulty="advanced",
complexity_multiplier=8.0,
input_size=800,
dataset_characteristics={"distribution": "skewed"},
time_budget_seconds=300,
memory_limit_mb=1024,
error_tolerance=0.1,
skill_params={"batch_size": 100, "cache_enabled": True, "parallel_workers": 4},
expected_quality="excellent",
quality_metrics=["accuracy", "coverage", "latency"]
),
# Progress tracking
"progress_simple": ScenarioProgress(scenario_id="simple"),
"progress_medium": ScenarioProgress(scenario_id="medium"),
"progress_complex": ScenarioProgress(scenario_id="complex"),
# Synchronization
"sync_points": {},
"last_sync_time": 0,
}
# Run with thread_id for checkpointing
config = {"configurable": {"thread_id": f"orch-{initial_state['orchestration_id']}"}}
print("Starting multi-scenario orchestration...")
result = await app.ainvoke(initial_state, config=config)
# Print results
final = result["final_results"]
print("\n" + "="*60)
print("ORCHESTRATION RESULTS")
print("="*60)
print(f"Orchestration ID: {final['orchestration_id']}")
print(f"Skill: {final['skill']}")
print("\nQuality Comparison:")
for scenario, score in final["comparison"]["quality_ranking"]["scores"].items():
print(f" {scenario}: {score:.2f}")
    print("\nTime per Item (ms):")
    # Avoid shadowing the `time` module used elsewhere in main()
    for scenario, per_item_ms in final["comparison"]["time_per_item_ms"].items():
        print(f"  {scenario}: {per_item_ms:.2f}ms")
print("\nRecommendations:")
for rec in final["recommendations"]:
print(f" • {rec}")
if __name__ == "__main__":
    asyncio.run(main())

5. Helper Functions
def chunks(items: list, size: int):
"""Split items into chunks."""
for i in range(0, len(items), size):
yield items[i:i + size]
def generate_test_data(size: int, characteristics: dict) -> list:
"""Generate test data based on scenario characteristics."""
import random
distribution = characteristics.get("distribution", "uniform")
if distribution == "uniform":
return [{"id": i, "value": random.random()} for i in range(size)]
elif distribution == "skewed":
# Zipfian distribution
return [
{"id": i, "value": random.random() ** 2}
for i in range(size)
]
else:
return [{"id": i, "value": random.random()} for i in range(size)]
async def invoke_skill(
skill_name: str,
input_data: list,
params: dict
) -> dict:
"""
Invoke your skill here.
Replace with actual skill invocation.
"""
    import random  # local import for the simulated quality score
    # Simulate processing
    await asyncio.sleep(0.1)  # 100ms per batch
return {
"processed": len(input_data),
"quality_score": 0.85 + (random.random() * 0.15),
"timestamp": datetime.now().isoformat()
}
def calculate_quality_metrics(batches: list, metrics: list[str]) -> dict:
"""Calculate quality metrics across batches."""
if not batches:
return {metric: 0.0 for metric in metrics}
scores = {
"accuracy": sum(b.get("quality_score", 0) for b in batches) / len(batches),
"coverage": 1.0,
}
    return {metric: scores.get(metric, 0.0) for metric in metrics}

6. Streaming Results (Real-time Progress)
async def stream_orchestration_progress(
app,
initial_state: ScenarioOrchestratorState,
config: dict
):
"""
Stream progress updates as scenarios execute.
"""
async for step in app.astream(initial_state, config=config, stream_mode="updates"):
print(f"\n[UPDATE] {step}")
# Extract progress from step
if "progress_simple" in step:
p = step["progress_simple"]
print(f" Simple: {p.progress_pct:.1f}% ({p.items_processed} items)")
if "progress_medium" in step:
p = step["progress_medium"]
print(f" Medium: {p.progress_pct:.1f}% ({p.items_processed} items)")
if "progress_complex" in step:
p = step["progress_complex"]
            print(f"  Complex: {p.progress_pct:.1f}% ({p.items_processed} items)")

Key Features
- Fan-Out/Fan-In: All 3 scenarios execute in parallel
- Milestone Tracking: Progress recorded at key checkpoints
- Synchronization: Optional wait points at 30% and 70%
- Error Isolation: One scenario's failure doesn't block others
- Checkpointing: State saved to PostgreSQL for recovery
- Aggregation: Cross-scenario analysis and recommendations
- Streaming: Real-time progress updates
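The error-isolation property comes from `asyncio.gather(..., return_exceptions=True)`: a failing scenario becomes an exception entry in the results list instead of cancelling its siblings. A standalone sketch of that behavior (worker names are illustrative):

```python
import asyncio

async def scenario(name: str, fail: bool) -> str:
    # Simulates one scenario worker; `fail` forces an isolated failure
    if fail:
        raise RuntimeError(f"{name} failed")
    return f"{name} complete"

async def run_all() -> list:
    # return_exceptions=True converts a failure into a result entry,
    # so sibling scenarios keep running and still report back
    return await asyncio.gather(
        scenario("simple", False),
        scenario("medium", True),
        scenario("complex", False),
        return_exceptions=True,
    )

results = asyncio.run(run_all())
completed = [r for r in results if not isinstance(r, Exception)]
```

The aggregator then filters exceptions exactly like `completed` does here, which is why a single failed scenario still produces a partial report.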
Testing
@pytest.mark.asyncio
async def test_multi_scenario_orchestration():
# Mock checkpointer
from langgraph.checkpoint.memory import MemorySaver
app = build_scenario_orchestrator(checkpointer=MemorySaver())
initial_state = {...} # Setup
config = {"configurable": {"thread_id": "test-123"}}
result = await app.ainvoke(initial_state, config=config)
assert result["final_results"]["orchestration_id"]
assert "simple" in result["final_results"]["metrics"]
assert "medium" in result["final_results"]["metrics"]
    assert "complex" in result["final_results"]["metrics"]

Microsoft Agent Framework
Microsoft Agent Framework (AutoGen + Semantic Kernel merger) patterns for enterprise multi-agent systems.
AssistantAgent Setup
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
# Create model client
model_client = OpenAIChatCompletionClient(
model="gpt-5.2",
api_key=os.environ["OPENAI_API_KEY"]
)
# Define assistant agent
assistant = AssistantAgent(
name="assistant",
description="A helpful AI assistant",
model_client=model_client,
system_message="You are a helpful assistant. Answer questions concisely."
)

Team Patterns
Round Robin Chat
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
# Define team members
planner = AssistantAgent(
name="planner",
description="Plans tasks",
model_client=model_client,
system_message="You plan tasks. When done, say 'PLAN_COMPLETE'."
)
executor = AssistantAgent(
name="executor",
description="Executes tasks",
model_client=model_client,
system_message="You execute the plan. When done, say 'EXECUTION_COMPLETE'."
)
reviewer = AssistantAgent(
name="reviewer",
description="Reviews work",
model_client=model_client,
system_message="Review the work. Say 'APPROVED' if satisfactory."
)
# Create team with termination
termination = TextMentionTermination("APPROVED")
team = RoundRobinGroupChat(
participants=[planner, executor, reviewer],
termination_condition=termination
)
# Run team
result = await team.run(task="Create a marketing strategy")

Selector Group Chat
from autogen_agentchat.teams import SelectorGroupChat
# Selector chooses next speaker based on context
team = SelectorGroupChat(
participants=[analyst, writer, reviewer],
model_client=model_client, # For selection decisions
termination_condition=termination
)

Tool Integration
from autogen_core.tools import FunctionTool
# Define tool function
def search_database(query: str) -> str:
"""Search the database for information."""
results = db.search(query)
return json.dumps(results)
# Create tool
search_tool = FunctionTool(search_database, description="Search the database")
# Agent with tools
researcher = AssistantAgent(
name="researcher",
description="Researches information",
model_client=model_client,
tools=[search_tool],
system_message="Use the search tool to find information."
)

Termination Conditions
from autogen_agentchat.conditions import (
TextMentionTermination,
MaxMessageTermination,
TokenUsageTermination,
TimeoutTermination
)
# Combine termination conditions
from autogen_agentchat.conditions import OrTerminationCondition
termination = OrTerminationCondition(
TextMentionTermination("DONE"),
MaxMessageTermination(max_messages=20),
TimeoutTermination(timeout_seconds=300)
)

Streaming
# Stream team responses
async for message in team.run_stream(task="Analyze this data"):
    print(f"{message.source}: {message.content}")

State Management
from autogen_agentchat.state import TeamState
# Save state
state = await team.save_state()
# Restore state
await team.load_state(state)
# Resume conversation
result = await team.run(task="Continue from where we left off")

Agent-to-Agent Protocol (A2A)
from autogen_agentchat.protocols import A2AProtocol
# Enable A2A for cross-organization agent communication
protocol = A2AProtocol(
agent=my_agent,
endpoint="https://api.example.com/agent",
auth_token=os.environ["A2A_TOKEN"]
)
# Send message to external agent
response = await protocol.send(
to="external-agent-id",
message="Process this request"
)

Migration from AutoGen 0.2
# Old AutoGen 0.2 pattern
# from autogen import AssistantAgent, UserProxyAgent
# New AutoGen 0.4+ pattern
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
# Key differences:
# - No UserProxyAgent needed for simple tasks
# - Teams replace GroupChat
# - Explicit termination conditions required
# - Model client separate from agent

Configuration
- Model clients: OpenAI, Azure OpenAI, Anthropic supported
- Teams: RoundRobin, Selector, Custom
- Termination: Text mention, max messages, timeout, token usage
- Tools: FunctionTool wrapper for Python functions
- State: Full state serialization for persistence
Best Practices
- Termination conditions: Always set explicit termination
- Team size: 3-5 agents optimal for most workflows
- System messages: Clear role definitions in system_message
- Tool design: One function per tool, clear descriptions
- Error handling: Use try/except around team.run()
- Streaming: Use run_stream() for real-time feedback
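The error-handling bullet can be sketched framework-agnostically; `run_with_guard` and `run_team` below are hypothetical names, with `run_team` standing in for a zero-argument coroutine that wraps `team.run(task=...)`:

```python
import asyncio

async def run_with_guard(run_team, retries: int = 2):
    # run_team: zero-argument coroutine function wrapping team.run(task=...)
    for attempt in range(retries + 1):
        try:
            return await run_team()
        except (TimeoutError, RuntimeError) as exc:
            if attempt == retries:
                # Surface a structured failure instead of raising
                return {"error": str(exc), "attempts": attempt + 1}
            await asyncio.sleep(0.01 * (attempt + 1))  # simple backoff

# Demo: a flaky run that fails twice, then succeeds on the third attempt
calls = {"n": 0}
async def flaky_run():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("model endpoint unavailable")
    return "APPROVED"

result = asyncio.run(run_with_guard(flaky_run))
```

Returning a structured error dict (rather than re-raising) keeps a multi-team pipeline moving; adjust the caught exception types to whatever your model client actually raises.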
OpenAI Agents SDK
OpenAI Agents SDK (v0.7.0) patterns for handoffs, guardrails, agents-as-tools, sessions, MCP servers, and tracing.
Requirements
# Install (requires Python 3.9+, supports up to 3.14)
pip install "openai-agents>=0.7.0"
# Note: Requires openai v2.x (v1.x no longer supported)
# openai>=2.9.0,<3 is required

Basic Agent Definition
from agents import Agent, Runner
agent = Agent(
name="assistant",
instructions="You are a helpful assistant that answers questions.",
model="gpt-5.2"
)
# Synchronous run
runner = Runner()
result = runner.run_sync(agent, "What is the capital of France?")
print(result.final_output)

Sessions (v0.6.6+)
Sessions provide automatic conversation history management across agent runs.
from agents import Agent, Runner
from agents.sessions import SQLiteSession
# Create a session store
session = SQLiteSession(db_path="conversations.db")
# Agent with automatic history management
agent = Agent(
name="assistant",
instructions="You are a helpful assistant.",
model="gpt-5.2"
)
runner = Runner()
# Sessions automatically:
# - Retrieve conversation history before each run
# - Store new messages after each run
result = await runner.run(
agent,
"Remember my name is Alice",
session=session,
session_id="user-123"
)
# Later conversation - history is automatic
result = await runner.run(
agent,
"What is my name?", # Agent recalls "Alice"
session=session,
session_id="user-123"
)

Session Types
from agents.sessions import (
SQLiteSession, # File-based persistence
AsyncSQLiteSession, # Async SQLite (v0.6.6+)
SQLAlchemySession, # Database-agnostic ORM
RedisSession, # Redis-backed sessions
EncryptedSession, # Encrypted storage
)
# AsyncSQLiteSession for async workflows
async_session = AsyncSQLiteSession(db_path="async_conversations.db")
# Auto-compaction for long conversations (v0.6.6+)
# Use responses.compact: "auto" to manage context length

Handoffs Between Agents
from agents import Agent, handoff, RunConfig
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
# Specialist agents
billing_agent = Agent(
name="billing",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You handle billing inquiries. Check account status and payment issues.
Hand back to triage when billing issue is resolved.""",
model="gpt-5.2"
)
support_agent = Agent(
name="support",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You handle technical support. Troubleshoot issues and provide solutions.
Hand back to triage when support issue is resolved.""",
model="gpt-5.2"
)
# Triage agent with handoffs
triage_agent = Agent(
name="triage",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are the first point of contact. Determine the nature of inquiries.
- Billing questions: hand off to billing
- Technical issues: hand off to support
- General questions: answer directly""",
model="gpt-5.2",
handoffs=[
handoff(agent=billing_agent),
handoff(agent=support_agent)
]
)

Handoff History Packaging (v0.7.0)
In v0.7.0, nested handoffs are opt-in (previously enabled by default). When enabled, conversation history is collapsed into a single assistant message wrapped in a <CONVERSATION HISTORY> block.
from agents import RunConfig
# Enable nested handoff history (opt-in in v0.7.0)
config = RunConfig(nest_handoff_history=True)
result = await runner.run(
triage_agent,
"I have a billing question",
run_config=config
)
# History is packaged as a single assistant message:
# <CONVERSATION HISTORY>
# [collapsed prior transcript]
# </CONVERSATION HISTORY>

Handoff Input Filters
from agents import handoff
from agents.extensions.handoff_filters import remove_all_tools
# Filter what the receiving agent sees
billing_handoff = handoff(
agent=billing_agent,
input_filter=remove_all_tools # Strip tool calls from history
)
# Custom input filter
def custom_filter(data):
# Modify HandoffInputData before passing to receiving agent
return data
support_handoff = handoff(
agent=support_agent,
input_filter=custom_filter,
nest_handoff_history=True # Per-handoff override
)

MCPServerManager (v0.7.0)
Manage multiple MCP server instances with improved lifecycle safety.
from agents import Agent
from agents.mcp import MCPServerManager, MCPServerStdio, MCPServerStreamableHTTP
# Create MCP server manager for multiple servers
async with MCPServerManager() as manager:
# Add stdio-based MCP server
filesystem_server = await manager.add_server(
MCPServerStdio(
name="filesystem",
command="npx",
args=["-y", "@anthropic/mcp-filesystem", "/path/to/dir"]
)
)
# Add HTTP-based MCP server
api_server = await manager.add_server(
MCPServerStreamableHTTP(
name="api-tools",
url="http://localhost:8080/mcp"
)
)
# Get tools from all servers
all_tools = await manager.list_tools()
# Create agent with MCP tools
agent = Agent(
name="mcp_agent",
instructions="Use available tools to help the user.",
model="gpt-5.2",
mcp_servers=[filesystem_server, api_server]
)
runner = Runner()
    result = await runner.run(agent, "List files in the current directory")

Single MCP Server
from agents import Agent
from agents.mcp import MCPServerStdio
# Single MCP server (simpler pattern)
async with MCPServerStdio(
name="filesystem",
command="npx",
args=["-y", "@anthropic/mcp-filesystem", "/tmp"]
) as server:
agent = Agent(
name="file_agent",
instructions="Help with file operations.",
model="gpt-5.2",
mcp_servers=[server]
)
    result = await runner.run(agent, "What files are available?")

Agents as Tools
from agents import Agent, tool
# Define tool functions
@tool
def search_knowledge_base(query: str) -> str:
"""Search the knowledge base for relevant information."""
# Implementation
return search_results
@tool
def create_ticket(title: str, description: str, priority: str) -> str:
"""Create a support ticket in the system."""
ticket_id = ticket_system.create(title, description, priority)
return f"Created ticket {ticket_id}"
# Agent with tools
support_agent = Agent(
name="support",
instructions="Help users with technical issues. Search knowledge base first.",
model="gpt-5.2",
tools=[search_knowledge_base, create_ticket]
)

Guardrails
from agents import Agent, InputGuardrail, OutputGuardrail
from agents.exceptions import InputGuardrailException
# Input guardrail
class ContentFilter(InputGuardrail):
async def check(self, input_text: str) -> str:
if contains_pii(input_text):
raise InputGuardrailException("PII detected in input")
return input_text
# Output guardrail
class ResponseValidator(OutputGuardrail):
async def check(self, output_text: str) -> str:
if contains_harmful_content(output_text):
return "I cannot provide that information."
return output_text
# Agent with guardrails
agent = Agent(
name="safe_assistant",
instructions="You are a helpful assistant.",
model="gpt-5.2",
input_guardrails=[ContentFilter()],
output_guardrails=[ResponseValidator()]
)

Tool Guardrails (v0.6.5+)
from agents import tool, tool_guardrail
@tool_guardrail
def validate_ticket_priority(priority: str) -> str:
"""Validate ticket priority before creation."""
valid = ["low", "medium", "high", "critical"]
if priority.lower() not in valid:
raise ValueError(f"Priority must be one of: {valid}")
return priority
@tool
@validate_ticket_priority
def create_ticket(title: str, description: str, priority: str) -> str:
"""Create a support ticket."""
    return f"Created ticket with priority {priority}"

Tracing and Observability
from agents import Agent, Runner, trace
# Enable tracing
runner = Runner(trace=True)
# Custom trace spans
async def complex_workflow(task: str):
with trace.span("research_phase"):
research = await runner.run(researcher, task)
with trace.span("writing_phase"):
content = await runner.run(writer, research.final_output)
return content
# Access trace data
result = await runner.run(agent, "Process this request")
print(result.trace_id) # For debugging
# Per-run tracing API key (v0.6.5+)
config = RunConfig(tracing={"api_key": "sk-tracing-123"})
result = await runner.run(agent, "Task", run_config=config)

Streaming Responses
from agents import Agent, Runner
agent = Agent(
name="streamer",
instructions="Provide detailed explanations.",
model="gpt-5.2"
)
runner = Runner()
# Stream response chunks
async for chunk in runner.run_streamed(agent, "Explain quantum computing"):
    print(chunk.content, end="", flush=True)

Multi-Agent Conversation
from agents import Agent, Runner
# Manual conversation management
runner = Runner()
# Initial run
result1 = await runner.run(triage_agent, "I need help with my account")
# Continue with result's input list
result2 = await runner.run(
result1.handoff_to or triage_agent,
"Can you check my billing?",
input=result1.to_input_list() # Carries conversation history
)
# Or use Sessions for automatic management (recommended)
from agents.sessions import SQLiteSession
session = SQLiteSession(db_path="conversations.db")
result = await runner.run(
triage_agent,
"I need help",
session=session,
session_id="conv-123"
)

Realtime and Voice
from agents import Agent
from agents.realtime import RealtimeRunner, OpenAIRealtimeWebSocketModel
# WebSocket-based realtime agent
model = OpenAIRealtimeWebSocketModel(
model="gpt-5.2-realtime",
# Custom WebSocket options (v0.7.0+)
websocket_options={"ping_interval": 30}
)
agent = Agent(
name="voice_assistant",
instructions="You are a voice assistant.",
model=model
)
# Realtime runner for voice interactions
realtime_runner = RealtimeRunner()
await realtime_runner.run(agent, audio_stream)

Configuration
Model Settings (v0.7.0)
from agents import Agent, RunConfig
# Note: Default reasoning.effort for gpt-5.1/5.2 is now "none"
# (previously "low"). Set explicitly if needed:
config = RunConfig(
model_settings={
"reasoning": {"effort": "low"} # Restore previous behavior
}
)
agent = Agent(
name="assistant",
instructions="You are helpful.",
model="gpt-5.2"
)
result = await runner.run(agent, "Complex task", run_config=config)

Provider Support
The SDK supports 100+ LLMs via LiteLLM integration:
from agents import Agent, set_default_openai_api
# Use Chat Completions API (for non-OpenAI providers)
set_default_openai_api("chat_completions")
# Or use LiteLLM for other providers
# pip install openai-agents[litellm]
from agents.extensions.litellm import LiteLLMModel
agent = Agent(
name="claude_agent",
instructions="You are helpful.",
model=LiteLLMModel(model="anthropic/claude-sonnet-4-20250514")
)

Best Practices
- Handoff clarity: Use RECOMMENDED_PROMPT_PREFIX for reliable handoffs
- Tool documentation: Clear docstrings improve tool selection accuracy
- Guardrail layers: Combine input + output guardrails for defense-in-depth
- Tracing: Always enable in production for debugging
- Error handling: Catch guardrail exceptions gracefully
- Sessions: Use for multi-turn conversations instead of manual history
- MCP servers: Use MCPServerManager for multiple servers with proper lifecycle
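The guardrail and error-handling bullets combine into a small wrapper. This is a self-contained sketch: the exception class is a local stand-in mirroring the name `agents.exceptions.InputGuardrailException`, and `run_agent` is a hypothetical coroutine in place of `runner.run(...)`:

```python
import asyncio

class InputGuardrailException(Exception):
    """Local stand-in mirroring agents.exceptions.InputGuardrailException."""

async def safe_run(run_agent, user_input: str) -> str:
    # Catch guardrail trips and degrade gracefully instead of crashing
    try:
        return await run_agent(user_input)
    except InputGuardrailException as exc:
        return f"Request blocked by input guardrail: {exc}"

async def run_agent(text: str) -> str:
    # Simulated agent run that trips the guardrail on PII-looking input
    if "ssn" in text.lower():
        raise InputGuardrailException("PII detected in input")
    return f"echo: {text}"

blocked = asyncio.run(safe_run(run_agent, "my SSN is 123-45-6789"))
allowed = asyncio.run(safe_run(run_agent, "hello"))
```

In a real deployment the `safe_run` boundary is also where you would log the guardrail trip for tracing.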
Breaking Changes in v0.7.0
| Change | Previous | v0.7.0 |
|---|---|---|
| Nested handoffs | Enabled by default | Opt-in via nest_handoff_history=True |
| Reasoning effort | Default "low" | Default "none" for gpt-5.1/5.2 |
| Session input | Required callback | Auto-append (callback optional) |
| OpenAI library | v1.x supported | Requires v2.x (>=2.9.0) |
Version History
- v0.7.0 (Jan 2026): MCPServerManager, opt-in nested handoffs, session input auto-append
- v0.6.6 (Jan 2026): Auto-compaction, AsyncSQLiteSession
- v0.6.5 (Jan 2026): Per-run tracing, tool guardrails decorator
- v0.6.0: Sessions feature, improved handoff history
Skill-Agnostic Template Abstraction
Reusable template framework for applying multi-scenario orchestration to ANY user-invocable skill.
Template Architecture
┌─────────────────────────────────────────────────────────────┐
│ SKILL ORCHESTRATION TEMPLATE │
├─────────────────────────────────────────────────────────────┤
│ │
│ [Abstract Orchestrator] │
│ ├─ Scenario Definition (parameterized) │
│ ├─ State Machine (generic) │
│ ├─ Synchronization (milestone-based) │
│ └─ Aggregation (cross-scenario) │
│ │
│ [Skill Adapter Layer] ◄── Pluggable per skill │
│ ├─ invoke_skill() │
│ ├─ calculate_quality_metrics() │
│ ├─ scenario_configurations() │
│ └─ expected_outcomes() │
│ │
│ [Target Skill] ◄── Your skill here │
│ ├─ skill-name │
│ ├─ skill-version │
│ └─ skill-params │
│ │
└─────────────────────────────────────────────────────────────┘

Generic Orchestrator Class
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar
from dataclasses import dataclass
T = TypeVar("T") # Result type
class SkillOrchestrator(ABC, Generic[T]):
"""
Abstract orchestrator for any user-invocable skill.
Subclass for each skill and implement abstract methods.
"""
def __init__(self, skill_name: str, skill_version: str):
self.skill_name = skill_name
self.skill_version = skill_version
# ──────────────────────────────────────────────────────────
# Abstract Methods (MUST implement per skill)
# ──────────────────────────────────────────────────────────
@abstractmethod
async def invoke_skill(
self,
input_data: list[dict],
scenario_params: dict,
) -> dict[str, Any]:
"""
Invoke your skill on input data.
Args:
input_data: Items to process
scenario_params: Skill-specific parameters (batch_size, etc.)
Returns:
dict with keys:
- "processed": int (items successfully processed)
- "results": list[dict] (processed items)
- "quality_score": float (0-1)
- "metrics": dict (skill-specific metrics)
- "errors": list[str] (any errors encountered)
"""
pass
@abstractmethod
def get_scenario_configs(self) -> dict[str, dict]:
"""
Return scenario configurations for simple/medium/complex.
Returns:
{
"simple": {
"input_size": 100,
"time_budget_seconds": 30,
"batch_size": 10,
"skill_params": {...}
},
"medium": {...},
"complex": {...}
}
"""
pass
@abstractmethod
def calculate_quality_metrics(
self,
results: list[dict],
metric_names: list[str]
) -> dict[str, float]:
"""
Calculate quality metrics from results.
Args:
results: Processed items from skill
metric_names: ["accuracy", "coverage", ...]
Returns:
{
"accuracy": 0.92,
"coverage": 0.98,
...
}
"""
pass
@abstractmethod
def generate_test_data(
self,
size: int,
characteristics: dict
) -> list[dict]:
"""
Generate test data for this skill.
Args:
size: Number of items
characteristics: {"distribution": "uniform|skewed|clustered"}
Returns:
list of test items matching skill's input format
"""
pass
# ──────────────────────────────────────────────────────────
# Concrete Methods (reusable across all skills)
# ──────────────────────────────────────────────────────────
async def run_scenario(
self,
scenario_id: str,
orchestration_id: str,
) -> dict:
"""
Execute one scenario (simple/medium/complex).
Concrete implementation—uses abstract methods.
"""
config = self.get_scenario_configs()[scenario_id]
# Generate test data
test_data = self.generate_test_data(
size=config["input_size"],
characteristics=config.get("dataset_characteristics", {})
)
# Process in batches
all_results = []
batch_size = config.get("batch_size", 10)
for batch_idx, batch in enumerate(chunks(test_data, batch_size)):
# Invoke skill (abstract—implemented per skill)
result = await self.invoke_skill(batch, config["skill_params"])
all_results.extend(result.get("results", []))
progress_pct = (len(all_results) / len(test_data)) * 100
print(f"[{scenario_id}] Progress: {progress_pct:.1f}%")
# Calculate quality (abstract—implemented per skill)
quality = self.calculate_quality_metrics(
all_results,
config.get("quality_metrics", ["accuracy"])
)
return {
"scenario_id": scenario_id,
"orchestration_id": orchestration_id,
"items_processed": len(all_results),
"quality": quality,
"results": all_results,
}
async def orchestrate(
self,
orchestration_id: str
) -> dict:
"""
Run all 3 scenarios in parallel and aggregate results.
Concrete implementation—reusable for all skills.
"""
import asyncio
# Run scenarios in parallel
results = await asyncio.gather(
self.run_scenario("simple", orchestration_id),
self.run_scenario("medium", orchestration_id),
self.run_scenario("complex", orchestration_id),
return_exceptions=True
)
# Aggregate
aggregated = self.aggregate_results(results)
return aggregated
def aggregate_results(self, scenario_results: list[dict]) -> dict:
"""
Combine results from all 3 scenarios.
Generic aggregation—works for any skill.
"""
# Extract results by scenario
by_scenario = {r.get("scenario_id"): r for r in scenario_results if not isinstance(r, Exception)}
# Quality comparison
quality_comparison = {
sid: r.get("quality", {})
for sid, r in by_scenario.items()
}
# Time complexity (simulated—customize per skill)
simple_items = by_scenario.get("simple", {}).get("items_processed", 1)
medium_items = by_scenario.get("medium", {}).get("items_processed", 1)
complex_items = by_scenario.get("complex", {}).get("items_processed", 1)
scaling_ratio = complex_items / simple_items if simple_items > 0 else 1
        return {
            # Pull the id from a surviving scenario result; scenario_results[0]
            # may be an Exception when return_exceptions=True was used
            "orchestration_id": next(iter(by_scenario.values()), {}).get("orchestration_id"),
"skill": self.skill_name,
"timestamp": datetime.now().isoformat(),
"results_by_scenario": by_scenario,
"quality_comparison": quality_comparison,
"scaling_analysis": {
"simple_vs_complex": scaling_ratio,
"recommendation": self._scaling_recommendation(scaling_ratio)
},
"success_rate": sum(1 for r in scenario_results if not isinstance(r, Exception)) / len(scenario_results),
}
@staticmethod
def _scaling_recommendation(ratio: float) -> str:
"""Recommend based on scaling behavior."""
if ratio < 1.5:
return "Excellent sublinear scaling"
elif ratio < 3:
return "Good linear scaling"
elif ratio < 10:
return "Acceptable superlinear scaling"
else:
            return "Poor scaling—investigate bottlenecks"

Example Implementation: Performance Testing
class PerformanceTestingOrchestrator(SkillOrchestrator):
"""Multi-scenario orchestration for performance-testing skill."""
async def invoke_skill(
self,
input_data: list[dict],
scenario_params: dict,
) -> dict:
"""
Invoke k6 or Locust performance test.
"""
import asyncio
# Mock: simulate running k6 with duration
duration_ms = scenario_params.get("duration_ms", 5000)
await asyncio.sleep(duration_ms / 1000) # Simulate k6 execution
return {
"processed": len(input_data),
"results": [
{
"endpoint": item["url"],
"response_time_ms": 100 + (i * 50),
"status": "200" if i % 10 != 0 else "500",
}
for i, item in enumerate(input_data)
],
"quality_score": 0.85,
"metrics": {
"p95_latency_ms": 450,
"p99_latency_ms": 500,
"error_rate": 0.01,
},
"errors": [],
}
def get_scenario_configs(self) -> dict:
return {
"simple": {
"input_size": 10, # 10 endpoints
"time_budget_seconds": 30,
"batch_size": 5,
"dataset_characteristics": {"distribution": "uniform"},
"quality_metrics": ["latency", "error_rate"],
"skill_params": {
"duration_ms": 5000,
"ramp_up_seconds": 2,
"load_profile": "steady"
}
},
"medium": {
"input_size": 30, # 30 endpoints
"time_budget_seconds": 90,
"batch_size": 15,
"dataset_characteristics": {"distribution": "uniform"},
"quality_metrics": ["latency", "error_rate"],
"skill_params": {
"duration_ms": 30000,
"ramp_up_seconds": 5,
"load_profile": "ramp_and_hold"
}
},
"complex": {
"input_size": 80, # 80 endpoints
"time_budget_seconds": 300,
"batch_size": 40,
"dataset_characteristics": {"distribution": "skewed"},
"quality_metrics": ["latency", "error_rate", "throughput"],
"skill_params": {
"duration_ms": 120000,
"ramp_up_seconds": 10,
"load_profile": "spike"
}
}
}
def calculate_quality_metrics(
self,
results: list[dict],
metric_names: list[str]
) -> dict:
"""Calculate latency and error rate metrics."""
if not results:
return {m: 0.0 for m in metric_names}
latencies = [r.get("response_time_ms", 0) for r in results]
errors = sum(1 for r in results if r.get("status", "200") != "200")
scores = {}
if "latency" in metric_names:
# p95 latency score: higher is better (inverse of latency)
p95 = sorted(latencies)[int(len(latencies) * 0.95)]
# Score: 1.0 if p95 < 100ms, 0.0 if > 500ms
scores["latency"] = max(0, min(1.0, (500 - p95) / 400))
if "error_rate" in metric_names:
# Error rate score: 1.0 if 0%, 0.0 if > 5%
error_pct = (errors / len(results)) * 100 if results else 0
scores["error_rate"] = max(0, min(1.0, (5 - error_pct) / 5))
if "throughput" in metric_names:
# Throughput score (normalized)
rps = len(results) / 30 # Assume 30-second test
scores["throughput"] = min(1.0, rps / 100) # Max 100 RPS = 1.0
return scores
def generate_test_data(
self,
size: int,
characteristics: dict
) -> list[dict]:
"""Generate endpoints to test."""
endpoints = [
{"url": f"https://api.example.com/endpoint/{i}", "method": "GET"}
for i in range(size)
]
if characteristics.get("distribution") == "skewed":
# Repeat 20% of endpoints (hot paths)
hot_endpoints = endpoints[:int(size * 0.2)]
return endpoints + hot_endpoints
return endpoints
Example Implementation: Security Scanning
class SecurityScanningOrchestrator(SkillOrchestrator):
"""Multi-scenario orchestration for security-scanning skill."""
async def invoke_skill(
self,
input_data: list[dict],
scenario_params: dict,
) -> dict:
"""Invoke security scanner on code files."""
import random
# Simulate scanning files
results = []
for item in input_data:
findings = []
# Simulate vulnerability detection
if random.random() > 0.9:
findings.append({
"type": "SQL_INJECTION",
"severity": "HIGH",
"line": random.randint(1, 100)
})
results.append({
"file": item["path"],
"scanned": True,
"findings": findings,
"score": 1.0 - (len(findings) * 0.1),
})
return {
"processed": len(input_data),
"results": results,
"quality_score": 0.95,
"metrics": {
"vulnerabilities_found": sum(len(r.get("findings", [])) for r in results),
"files_scanned": len(results),
},
"errors": [],
}
def get_scenario_configs(self) -> dict:
return {
"simple": {
"input_size": 20, # 20 files
"time_budget_seconds": 45,
"batch_size": 10,
"dataset_characteristics": {"distribution": "uniform"},
"quality_metrics": ["coverage", "accuracy"],
"skill_params": {
"scan_depth": "shallow",
"rules": "OWASP_TOP_10"
}
},
"medium": {
"input_size": 100, # 100 files
"time_budget_seconds": 120,
"batch_size": 50,
"dataset_characteristics": {"distribution": "uniform"},
"quality_metrics": ["coverage", "accuracy"],
"skill_params": {
"scan_depth": "medium",
"rules": "OWASP_TOP_10 + CWE_TOP_25"
}
},
"complex": {
"input_size": 500, # 500 files
"time_budget_seconds": 600,
"batch_size": 100,
"dataset_characteristics": {"distribution": "skewed"},
"quality_metrics": ["coverage", "accuracy", "false_positive_rate"],
"skill_params": {
"scan_depth": "deep",
"rules": "ALL",
"enable_ml_detection": True
}
}
}
def calculate_quality_metrics(
self,
results: list[dict],
metric_names: list[str]
) -> dict:
"""Calculate security scanning quality metrics."""
total_files = len(results)
scanned_files = sum(1 for r in results if r.get("scanned"))
total_findings = sum(len(r.get("findings", [])) for r in results)
scores = {}
if "coverage" in metric_names:
# Coverage: percentage of files successfully scanned
scores["coverage"] = scanned_files / max(1, total_files)
if "accuracy" in metric_names:
# Accuracy: average finding score (file-level quality)
avg_score = sum(r.get("score", 0) for r in results) / max(1, total_files)
scores["accuracy"] = avg_score
if "false_positive_rate" in metric_names:
# FPR: simulated (would need manual validation in production)
scores["false_positive_rate"] = 1.0 - (total_findings / (total_findings + 10))
return scores
def generate_test_data(
self,
size: int,
characteristics: dict
) -> list[dict]:
"""Generate Python/JS files to scan."""
files = [
{"path": f"src/module_{i:03d}.py"}
for i in range(size)
]
if characteristics.get("distribution") == "skewed":
# Add some large files (skew toward complexity)
large_files = [
{"path": f"src/legacy_module_{i:03d}.py", "size_lines": 5000}
for i in range(int(size * 0.1))
]
return files + large_files
return files
Registration and Factory Pattern
class OrchestratorRegistry:
"""Register orchestrators for each skill."""
_registry: dict[str, type[SkillOrchestrator]] = {}
@classmethod
def register(cls, skill_name: str, orchestrator_class: type):
"""Register an orchestrator for a skill."""
cls._registry[skill_name] = orchestrator_class
@classmethod
def get(cls, skill_name: str) -> SkillOrchestrator:
"""Instantiate orchestrator for a skill."""
if skill_name not in cls._registry:
raise ValueError(f"No orchestrator registered for {skill_name}")
return cls._registry[skill_name](skill_name, version="1.0.0")
# Registration
OrchestratorRegistry.register("performance-testing", PerformanceTestingOrchestrator)
OrchestratorRegistry.register("security-scanning", SecurityScanningOrchestrator)
# Usage
orchestrator = OrchestratorRegistry.get("performance-testing")
result = await orchestrator.orchestrate("demo-001")
Integration with LangGraph
async def scenario_worker_generic(state: ScenarioOrchestratorState) -> dict:
"""Generic worker that uses skill-agnostic orchestrator."""
scenario_id = state.get("scenario_id")
skill_name = state["skill_name"]
# Get orchestrator for this skill
orchestrator = OrchestratorRegistry.get(skill_name)
# Run scenario
result = await orchestrator.run_scenario(
scenario_id=scenario_id,
orchestration_id=state["orchestration_id"]
)
return {f"progress_{scenario_id}": result}Adding a New Skill
To orchestrate a NEW skill, follow these steps:
Step 1: Create Orchestrator Class
# file: backend/app/workflows/multi_scenario/my_skill_orchestrator.py
from skill_agnostic_template import SkillOrchestrator
class MySkillOrchestrator(SkillOrchestrator):
async def invoke_skill(self, input_data, scenario_params):
# TODO: Call your skill here
pass
def get_scenario_configs(self):
# TODO: Define simple/medium/complex configs
pass
def calculate_quality_metrics(self, results, metric_names):
# TODO: Implement quality scoring
pass
def generate_test_data(self, size, characteristics):
# TODO: Generate test data for your skill
pass
Step 2: Register
OrchestratorRegistry.register("my-skill", MySkillOrchestrator)Step 3: Run
orchestrator = OrchestratorRegistry.get("my-skill")
result = await orchestrator.orchestrate("demo-001")
Total implementation time: 15-30 minutes per skill
Benefits of Abstraction
| Benefit | Impact |
|---|---|
| No state machine boilerplate | 10x faster to add new skill |
| Consistent patterns | Team learns once, applies everywhere |
| Automatic comparison | All skills compared same way |
| Easy to extend | Override only the methods you need |
| LangGraph integration ready | Works with checkpointing, streaming, etc. |
Testing Template
@pytest.fixture
def orchestrator():
return PerformanceTestingOrchestrator("performance-testing", "1.0.0")
@pytest.mark.asyncio
async def test_simple_scenario(orchestrator):
result = await orchestrator.run_scenario("simple", "test-001")
assert result["scenario_id"] == "simple"
assert result["items_processed"] > 0
assert "quality" in result
assert result["quality"]["latency"] > 0
@pytest.mark.asyncio
async def test_full_orchestration(orchestrator):
result = await orchestrator.orchestrate("test-002")
assert "results_by_scenario" in result
assert len(result["results_by_scenario"]) == 3
assert result["success_rate"] > 0.5State Machine Design
State Machine Design: Multi-Scenario Orchestration
Detailed state machine and abstraction patterns for ANY user-invocable skill.
Core State Machine
┌────────────────────────────────────────────────────────────────────────────┐
│ SCENARIO STATE MACHINE │
├────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────┐ │
│ │ PENDING │ │
│ │ (awaiting start signal) │ │
│ └──────┬───────────────────────┘ │
│ │ supervisor.route_scenarios() │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ RUNNING │ │
│ │ (executing skill batches) │◄──────────┐ │
│ └──────┬───────────────────────┘ │ │
│ │ │ │
│ ┌────────────┼────────────┐ │ │
│ │ │ │ │ │
│ [pause] [milestone] [error] [resume] │
│ │ │ │ │ │
│ ▼ ▼ ▼ │ │
│ ┌────────┐ ┌─────────┐ ┌──────────┐ │ │
│ │ PAUSED │ │MILESTONE│ │ FAILED │ │ │
│ │(waiting)│ │(sync pt)│ │(error) │ │ │
│ └────┬───┘ └────┬────┘ └──────────┘ │ │
│ │ │ │ │
│ │[resume] │[continue] │ │
│ │ │ │ │
│ └─────┬─────┘ │ │
│ │ │ │
│ [error recovery]──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ COMPLETE │ │
│ │ (all batches processed) │ │
│ └──────────────────────────────┘ │
│ │ │
│ │ aggregator.collect_and_aggregate() │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ FINAL_RESULTS │ │
│ │ (ready for comparison) │ │
│ └──────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘
Legend:
─────────────────
→ Transition
[action] = trigger/activity
Detailed State Definitions
State: PENDING
Entry Point: Created at orchestration start
@dataclass
class PendingState:
scenario_id: str = "simple|medium|complex"
orchestration_id: str
created_at: datetime
definition: ScenarioDefinition
# Awaiting signal from supervisor
waiting_for_start: bool = True
# Supervisor signal triggers transition
# Event: supervisor.route_scenarios() returns Send("scenario_worker", ...)
# Transition: PENDING → RUNNING
Exit Condition: Supervisor sends routing command
State: RUNNING
Active Work State: Executing skill batches
@dataclass
class RunningState:
scenario_id: str
status: Literal["running"] = "running"
# Progress tracking
progress_pct: float = 0.0 # 0-100
current_milestone: str = "batch_1"
milestones_reached: list[str] = field(default_factory=list)
# Metrics (accumulating)
items_processed: int = 0
batch_count: int = 0
elapsed_ms: int = 0
memory_used_mb: int = 0
# Results (partial)
partial_results: list[dict] = field(default_factory=list)
quality_scores: dict = field(default_factory=dict)
# Error tracking
errors: list[dict] = field(default_factory=list)
# Transition flags
should_pause: bool = False
should_continue: bool = True
Activities:
- Execute skill batch N
- Update progress_pct = (items_processed / input_size) * 100
- Check if progress_pct reached milestone (30%, 50%, 70%, 90%)
- Record partial results
- Monitor memory and elapsed time
Exit Conditions:
- progress_pct >= 100 → COMPLETE
- error_occurred and recovery_possible → PAUSED (or retry)
- error_occurred and recovery_failed → FAILED
State: PAUSED
Waiting State: At milestone synchronization point or for recovery
@dataclass
class PausedState:
scenario_id: str
status: Literal["paused"] = "paused"
# Why paused?
pause_reason: Literal["sync_point", "waiting_for_recovery", "user_halt"] = "sync_point"
# Checkpoint (for resuming)
checkpoint_index: int = 0  # Which batch to resume from
checkpoint_state: dict = field(default_factory=dict)  # Full state snapshot
# Sync info (for milestone pause)
milestone_pct: int = 0
other_scenarios_status: dict = field(default_factory=dict)  # {"simple": 45, "medium": 30, "complex": 10}
time_paused_ms: int = 0
# Recovery info (for error pause)
retry_count: int = 0
max_retries: int = 3
last_error: str = ""Activities:
- Wait for other scenarios (if sync_point)
- Wait for recovery decision (if error)
- Monitor timeout (max 60 seconds)
- Record checkpoint for resumption
Exit Conditions:
- all_scenarios_at_milestone → RUNNING (resume)
- timeout and waiting_for_others → RUNNING (proceed anyway)
- retry_count < max_retries → RUNNING (retry)
- retry_count >= max_retries → FAILED
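The PAUSED exit conditions above can be expressed as a single decision function (a sketch; the function name and signature are assumptions made for illustration):

```python
def next_state_from_paused(
    pause_reason: str,
    all_at_milestone: bool,
    timeout_expired: bool,
    retry_count: int,
    max_retries: int = 3,
) -> str:
    """Apply the PAUSED exit conditions in order and return the next status."""
    if pause_reason == "sync_point":
        # Resume when every sibling arrived, or proceed anyway on timeout
        if all_at_milestone or timeout_expired:
            return "running"
        return "paused"
    if pause_reason == "waiting_for_recovery":
        # Retry while budget remains; otherwise fail terminally
        return "running" if retry_count < max_retries else "failed"
    return "paused"  # user_halt: wait for an explicit resume signal
```

Centralizing the transition logic like this keeps the worker loop free of scattered status assignments.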
State: FAILED
Terminal Error State: Unrecoverable error
@dataclass
class FailedState:
scenario_id: str
status: Literal["failed"] = "failed"
# Error details
error_message: str = ""
error_type: str = ""  # "timeout", "exception", "memory", "validation"
stack_trace: str = ""
# Progress at failure
progress_pct: float = 0.0
items_processed: int = 0
batch_failed: int = 0
# Attempted recovery
recovery_attempted: bool = False
recovery_method: str = ""
# Partial results (if any)
partial_results: list[dict] = field(default_factory=list)
Activities:
- Log error with full context
- Store partial results (won't complete, but data preserved)
- Notify coordinator
- Update checkpoint for debugging
Exit Condition: Terminal state (no transition out)
State: COMPLETE
Success State: All batches processed
@dataclass
class CompleteState:
scenario_id: str
status: Literal["complete"] = "complete"
# Final metrics
total_items_processed: int = 0
total_batches: int = 0
total_elapsed_ms: int = 0
peak_memory_mb: int = 0
# Final results
results: dict = field(default_factory=dict)  # Full aggregated output
quality_scores: dict = field(default_factory=dict)
error_count: int = 0
# Comparison data
time_per_item_ms: float = 0.0
items_per_second: float = 0.0
# Quality assessment
quality_rank: Literal["basic", "good", "excellent"] = "basic"
recommendation: str = ""
Activities:
- Calculate final metrics
- Score results
- Prepare for comparison
- Signal readiness for aggregation
Exit Condition: Transition to aggregator
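The "calculate final metrics" activity reduces to simple arithmetic over the accumulated counters. A sketch (the quality-rank thresholds of 0.9 and 0.7 are assumptions chosen for illustration, not values defined by the state machine):

```python
def finalize_metrics(total_items: int, total_elapsed_ms: int,
                     quality_scores: dict[str, float]) -> dict:
    """Derive the comparison fields on CompleteState from raw counters."""
    elapsed_ms = max(1, total_elapsed_ms)  # guard against zero-duration runs
    avg_quality = (sum(quality_scores.values()) / len(quality_scores)) if quality_scores else 0.0
    if avg_quality >= 0.9:
        rank = "excellent"
    elif avg_quality >= 0.7:
        rank = "good"
    else:
        rank = "basic"
    return {
        "time_per_item_ms": elapsed_ms / max(1, total_items),
        "items_per_second": total_items / (elapsed_ms / 1000),
        "quality_rank": rank,
    }
```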
State: FINAL_RESULTS
Aggregated State: All scenarios combined
@dataclass
class AggregatedFinalResults:
orchestration_id: str
timestamp: datetime
# Per-scenario results
results: dict # {"simple": {...}, "medium": {...}, "complex": {...}}
# Comparative analysis
quality_ranking: dict
time_complexity: dict # {"simple": 1.2ms/item, "medium": 1.8ms/item, ...}
scaling_efficiency: float # simple_time / complex_time / (simple_size / complex_size)
# Cross-scenario patterns
success_patterns: list[str]
failure_modes: list[str]
optimization_opportunities: list[str]
# Recommendations
best_difficulty: Literal["simple", "medium", "complex"]
resource_requirements: dict
estimated_production_scaling: dict
Transition Rules
PENDING → RUNNING
Trigger: supervisor.route_scenarios() returns Send commands
async def supervisor_trigger(state):
return [
Send("scenario_worker", {"scenario_id": "simple", ...}),
Send("scenario_worker", {"scenario_id": "medium", ...}),
Send("scenario_worker", {"scenario_id": "complex", ...}),
]
Guard: None (always allowed)
Actions on Entry:
- Set status = "running"
- Initialize start_time_ms = now()
- Start processing first batch
RUNNING → MILESTONE (PAUSED at sync point)
Trigger: progress_pct in [30, 50, 70, 90] AND synchronize_at_milestone
# In scenario_worker
if progress_pct in [30, 50, 70, 90]:
should_sync = await synchronize_at_milestone(milestone_pct, state)
if should_sync:
# Transition to PAUSED
state.status = "paused"
state.pause_reason = "sync_point"
state.milestone_pct = progress_pct
Guard: Other scenarios must be within 5% of milestone OR timeout expires
Actions on Entry:
- Record checkpoint
- Wait for other scenarios (max 60s)
- Monitor progress of siblings
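The 5% guard above can be written as a small predicate over sibling progress (a sketch; `siblings_within_window` is a hypothetical helper name, and treating siblings already past the milestone as satisfying the guard is an assumption):

```python
def siblings_within_window(milestone_pct: int,
                           sibling_progress: dict[str, float],
                           window_pct: float = 5.0) -> bool:
    """Milestone guard: every sibling scenario must be within window_pct
    of the milestone (or already past it) for the sync to succeed."""
    return all(p >= milestone_pct - window_pct
               for p in sibling_progress.values())
```

If the predicate stays False until the 60-second timeout expires, the scenario resumes anyway, as described in the PAUSED → RUNNING transition.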
PAUSED → RUNNING (Resume after sync)
Trigger: All scenarios reached milestone OR timeout
# Monitor loop in sync node
if all_at_milestone or timeout_expired:
# Resume
state.status = "running"
state.pause_reason = None
# Continue from checkpoint
Guard: None (always allowed)
Actions on Entry:
- Load checkpoint
- Resume from interrupted batch
- Continue processing
RUNNING → COMPLETE
Trigger: items_processed >= input_size
# In scenario_worker
for batch_idx, batch in enumerate(batches):
await invoke_skill(batch)
items_processed += len(batch)
if items_processed >= scenario_def.input_size:
state.status = "complete"
break
Guard: None
Actions on Entry:
- Calculate final metrics
- Score all results
- Prepare aggregation data
RUNNING → PAUSED (Error recovery pause)
Trigger: Exception in skill invocation, recovery possible
try:
result = await invoke_skill(batch)
except SkillException as e:
if can_recover(e):
state.status = "paused"
state.pause_reason = "waiting_for_recovery"
state.last_error = str(e)
state.retry_count += 1
# Decision logic for retry
Guard: retry_count < max_retries (usually 3)
Actions on Entry:
- Save checkpoint before error
- Log error details
- Increment retry counter
- Wait for retry decision
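The retry path can be sketched as a wrapper around the skill invocation with exponential backoff (an illustration; the wrapper name, the backoff schedule, and re-raising on exhaustion are assumptions layered on the transitions described here):

```python
import asyncio

async def retry_with_backoff(invoke, batch, max_retries: int = 3,
                             base_delay_s: float = 1.0):
    """Retry a failed batch with exponential backoff; re-raise once retries
    are exhausted so the worker can transition to FAILED."""
    for attempt in range(max_retries + 1):
        try:
            return await invoke(batch)
        except Exception:
            if attempt == max_retries:
                raise  # recovery failed: caller marks the scenario FAILED
            await asyncio.sleep(base_delay_s * (2 ** attempt))
```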
PAUSED → RUNNING (Retry after error)
Trigger: Manual retry decision or automatic retry eligible
if error_state.retry_count < max_retries:
state.status = "running"
# Load checkpoint, try again
Guard: retry_count < max_retries
RUNNING → FAILED
Trigger: Exception and recovery not possible
try:
result = await invoke_skill(batch)
except FatalException as e:
state.status = "failed"
state.error_message = str(e)
state.error_type = type(e).__name__
Guard: retry_count >= max_retries OR is_fatal_error
Actions on Entry:
- Log full error context
- Store partial results (if any)
- Notify aggregator about failure
- Don't block other scenarios
COMPLETE → FINAL_RESULTS
Trigger: All 3 scenarios reached COMPLETE or FAILED
# In aggregator_node
simple_complete = state.progress_simple.status in ["complete", "failed"]
medium_complete = state.progress_medium.status in ["complete", "failed"]
complex_complete = state.progress_complex.status in ["complete", "failed"]
if simple_complete and medium_complete and complex_complete:
return aggregate_results(state)
Guard: All scenarios in terminal state
Actions on Entry:
- Merge results from all scenarios
- Calculate comparisons
- Extract patterns
- Generate recommendations
Abstraction for ANY Skill
The state machine remains skill-agnostic. Customization happens in:
- Skill Invocation: invoke_skill() (your skill here)
- Quality Metrics: calculate_quality_metrics() (what to measure)
- Scenario Parameters: batch size, timeout, memory (adjust per skill)
Skill Integration Point
async def invoke_skill(
skill_name: str, # "performance-testing", "security-scanning", etc.
input_data: list, # Items to process
skill_params: dict, # Skill-specific config
) -> dict:
"""
Call any user-invocable skill.
This is the ONLY skill-specific code in the state machine.
"""
# Match skill_name and invoke appropriately
if skill_name == "performance-testing":
return await invoke_performance_testing(input_data, skill_params)
elif skill_name == "security-scanning":
return await invoke_security_scanning(input_data, skill_params)
elif skill_name == "your-skill":
return await invoke_your_skill(input_data, skill_params)
else:
raise ValueError(f"Unknown skill: {skill_name}")Quality Metrics Adapter
def calculate_quality_metrics(
skill_name: str,
results: list[dict],
metric_names: list[str]
) -> dict:
"""
Calculate quality metrics (skill-specific).
"""
scores = {}
# Skill-agnostic metrics (always available)
scores["completion_rate"] = len(results) / max(1, len(results))
scores["error_rate"] = sum(1 for r in results if r.get("error")) / max(1, len(results))
# Skill-specific metrics
if "accuracy" in metric_names:
# For security-scanning: vulnerability count vs. expected
# For performance-testing: actual latency vs. threshold
scores["accuracy"] = calculate_accuracy(skill_name, results)
if "coverage" in metric_names:
# For security-scanning: percentage of codebase scanned
# For performance-testing: percentage of endpoints tested
scores["coverage"] = calculate_coverage(skill_name, results)
return scores
Scenario Parameter Templates
SCENARIO_TEMPLATES = {
# Generic: 1x, 3x, 8x scaling
"default": {
"simple": {"multiplier": 1.0, "timeout": 30, "batch_size": 10},
"medium": {"multiplier": 3.0, "timeout": 90, "batch_size": 50},
"complex": {"multiplier": 8.0, "timeout": 300, "batch_size": 100},
},
# Performance-testing: lighter loads
"performance-testing": {
"simple": {"multiplier": 1.0, "timeout": 60, "batch_size": 5},
"medium": {"multiplier": 2.0, "timeout": 180, "batch_size": 20},
"complex": {"multiplier": 4.0, "timeout": 600, "batch_size": 50},
},
# Security-scanning: heavier loads
"security-scanning": {
"simple": {"multiplier": 1.0, "timeout": 45, "batch_size": 20},
"medium": {"multiplier": 2.5, "timeout": 120, "batch_size": 100},
"complex": {"multiplier": 10.0, "timeout": 900, "batch_size": 500},
},
}
def get_scenario_config(skill_name: str, difficulty: str) -> dict:
template = SCENARIO_TEMPLATES.get(skill_name, SCENARIO_TEMPLATES["default"])
return template[difficulty]
Key State Machine Patterns
Pattern 1: Optimistic Completion
Assume success, handle errors reactively:
# Default behavior: run until complete
for batch in batches:
result = await invoke_skill(batch)
# No error checking—complete normally
# Only if exception: enter error recovery
Pro: Efficient for reliable skills
Con: Slower error detection
Pattern 2: Pessimistic Validation
Validate each batch before continuing:
for batch in batches:
result = await invoke_skill(batch)
# Validate result
if not validate_result(result, scenario_def.expected_quality):
# Error recovery
retry_count += 1
Pro: Catches issues early
Con: Higher overhead
Pattern 3: Timeout-Based State Transitions
Use elapsed time to trigger state changes:
@dataclass
class TimedState:
created_at: int
timeout_seconds: int
def is_expired(self) -> bool:
return (now() - self.created_at) > self.timeout_seconds * 1000
# Check in state machine
if paused_state.is_expired():
# Force transition (to RUNNING or FAILED)
Visualizing State Transitions
# Generate state transition diagram for debugging
import graphviz
def generate_state_diagram():
dot = graphviz.Digraph(comment="Scenario State Machine")
states = ["PENDING", "RUNNING", "PAUSED", "COMPLETE", "FAILED", "FINAL_RESULTS"]
for state in states:
dot.node(state, shape="ellipse")
transitions = [
("PENDING", "RUNNING", "supervisor.route()"),
("RUNNING", "PAUSED", "milestone reached"),
("RUNNING", "COMPLETE", "all items processed"),
("RUNNING", "FAILED", "fatal error"),
("PAUSED", "RUNNING", "sync complete or timeout"),
("PAUSED", "FAILED", "max retries exceeded"),
("COMPLETE", "FINAL_RESULTS", "aggregation trigger"),
("FAILED", "FINAL_RESULTS", "aggregation trigger"),
]
for src, dst, label in transitions:
dot.edge(src, dst, label=label)
dot.render("/tmp/state_machine", format="png", view=True)Testing State Transitions
@pytest.mark.asyncio
async def test_state_transition_pending_to_running():
state = ScenarioOrchestratorState(...)
assert state.progress_simple.status == "pending"
# Trigger supervisor
commands = await scenario_supervisor(state)
assert len(commands) == 3 # 3 Send commands
@pytest.mark.asyncio
async def test_state_transition_running_to_complete():
state = ScenarioOrchestratorState(...)
state.progress_simple.status = "running"
# Simulate processing all items
state.progress_simple.items_processed = 100
# Invoke worker
result = await scenario_worker(state)
assert result["progress_simple"]["status"] == "complete"Checklists (2)
Framework Selection
Framework Selection Checklist
Choose the right multi-agent framework.
Requirements Analysis
- Use case clearly defined
- Complexity level assessed (single vs multi-agent)
- State management needs identified
- Human-in-the-loop requirements defined
- Observability needs documented
Framework Evaluation
LangGraph
- Need complex stateful workflows
- Require persistence and checkpoints
- Want streaming support
- Need human-in-the-loop
- Already using LangChain ecosystem
CrewAI
- Role-based collaboration pattern
- Hierarchical team structure
- Agent delegation needed
- Quick prototyping required
- Built-in memory preferred
OpenAI Agents SDK
- OpenAI-native ecosystem
- Handoff pattern fits use case
- Need built-in guardrails
- Want OpenAI tracing
- Simpler agent definition preferred
Microsoft Agent Framework
- Enterprise compliance requirements
- Using Azure ecosystem
- Need A2A protocol support
- Want AutoGen+SK merger features
- Long-term Microsoft support preferred
AG2 (Community AutoGen)
- Open-source flexibility priority
- Community-driven development OK
- AutoGen familiarity exists
- Custom modifications needed
Technical Considerations
- Team expertise with framework
- Framework maturity level acceptable
- Community support adequate
- Documentation quality sufficient
- Production readiness validated
Integration Assessment
- Observability tool compatibility (Langfuse, etc.)
- LLM provider compatibility
- Existing codebase integration
- Testing framework support
- CI/CD pipeline compatibility
Risk Mitigation
- Fallback strategy defined
- Framework lock-in assessed
- Migration path understood
- Version update strategy
- Community health evaluated
Decision Documentation
- Framework choice documented
- Rationale recorded
- Alternatives considered listed
- Trade-offs acknowledged
- Review date scheduled
Orchestration Checklist
Multi-Agent Orchestration Checklist
Architecture
- Define agent responsibilities
- Plan communication patterns
- Set coordination strategy
- Design failure handling
Agent Design
- Single responsibility per agent
- Clear input/output contracts
- Independent operation
- Stateless when possible
Communication
- Message format definition
- Async message passing
- Result aggregation
- Error propagation
Coordination
- Central orchestrator
- Task queue management
- Priority handling
- Deadlock prevention
Monitoring
- Agent health checks
- Task completion tracking
- Performance metrics
- Error rates