Agent Orchestration
Agent orchestration patterns for agentic loops, multi-agent coordination, alternative frameworks, and multi-scenario workflows. Use when building autonomous agent loops, coordinating multiple agents, evaluating CrewAI/AutoGen/Swarm, or orchestrating complex multi-step scenarios.
Auto-activated — this skill loads automatically when Claude detects matching context.
Agent Orchestration
Comprehensive patterns for building and coordinating AI agents -- from single-agent reasoning loops to multi-agent systems and framework selection. Each category has individual rule files in rules/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Agent Loops | 2 | HIGH | ReAct reasoning, plan-and-execute, self-correction |
| Multi-Agent Coordination | 3 | CRITICAL | Supervisor routing, agent debate, result synthesis |
| Alternative Frameworks | 3 | HIGH | CrewAI crews, AutoGen teams, framework comparison |
| Multi-Scenario | 2 | MEDIUM | Parallel scenario orchestration, difficulty routing |
Total: 10 rules across 4 categories
Quick Start
# ReAct agent loop
async def react_loop(question: str, tools: dict, max_steps: int = 10) -> str:
history = REACT_PROMPT.format(tools=list(tools.keys()), question=question)
for step in range(max_steps):
        response = await llm.chat([{"role": "user", "content": history}])
        history += response.content  # keep the model's reasoning in context
        if "Final Answer:" in response.content:
            return response.content.split("Final Answer:")[-1].strip()
        if "Action:" in response.content:
            action = parse_action(response.content)
            result = await tools[action.name](*action.args)
            history += f"\nObservation: {result}\n"
    return "Max steps reached without answer"

# Supervisor with fan-out/fan-in
async def multi_agent_analysis(content: str) -> dict:
agents = [("security", security_agent), ("perf", perf_agent)]
tasks = [agent(content) for _, agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
    return await synthesize_findings(results)
Agent Loops
Patterns for autonomous LLM reasoning: ReAct (Reasoning + Acting), Plan-and-Execute with replanning, self-correction loops, and sliding-window memory management.
Key decisions: Max steps 5-15, temperature 0.3-0.7, memory window 10-20 messages.
Multi-Agent Coordination
Fan-out/fan-in parallelism, supervisor routing with dependency ordering, conflict resolution (confidence-based or LLM arbitration), result synthesis, and CC Agent Teams (mesh topology for peer messaging in CC 2.1.33+).
Key decisions: 3-8 specialists, parallelize independent agents, use Task tool (star) for simple work, Agent Teams (mesh) for cross-cutting concerns.
Alternative Frameworks
CrewAI hierarchical crews with Flows (1.8+), OpenAI Agents SDK handoffs and guardrails (0.12+), Microsoft Agent Framework (AutoGen + SK merger), GPT-5.2-Codex for long-horizon coding, and AG2 for open-source flexibility.
Key decisions: Match framework to team expertise + use case. LangGraph for state machines, CrewAI for role-based teams, OpenAI SDK for handoff workflows, MS Agent for enterprise compliance.
Multi-Scenario
Orchestrate a single skill across 3 parallel scenarios (simple/medium/complex) with progressive difficulty scaling (1x/3x/8x), milestone synchronization, and cross-scenario result aggregation.
Key decisions: Free-running with checkpoints, always 3 scenarios, 1x/3x/8x exponential scaling, 30s/90s/300s time budgets.
Key Decisions
| Decision | Recommendation |
|---|---|
| Single vs multi-agent | Single for focused tasks, multi for decomposable work |
| Max loop steps | 5-15 (prevent infinite loops) |
| Agent count | 3-8 specialists per workflow |
| Framework | Match to team expertise + use case |
| Topology | Task tool (star) for simple; Agent Teams (mesh) for complex |
| Scenario count | Always 3: simple, medium, complex |
Common Mistakes
- No step limit in agent loops (infinite loops)
- No memory management (context overflow)
- No error isolation in multi-agent (one failure crashes all)
- Missing synthesis step (raw agent outputs not useful)
- Mixing frameworks in one project (complexity explosion)
- Using Agent Teams for simple sequential work (use Task tool)
- Sequential instead of parallel scenarios (defeats purpose)
Related Skills
- ork:langgraph - LangGraph workflow patterns (supervisor, routing, state)
- function-calling - Tool definitions and execution
- ork:task-dependency-patterns - Task management with Agent Teams workflow
Capability Details
react-loop
Keywords: react, reason, act, observe, loop, agent Solves:
- Implement ReAct pattern
- Create reasoning loops
- Build iterative agents
plan-execute
Keywords: plan, execute, replan, multi-step, autonomous Solves:
- Create plan then execute steps
- Implement replanning on failure
- Build goal-oriented agents
supervisor-coordination
Keywords: supervisor, route, coordinate, fan-out, fan-in, parallel Solves:
- Route tasks to specialized agents
- Run agents in parallel
- Aggregate multi-agent results
agent-debate
Keywords: debate, conflict, resolution, arbitration, consensus Solves:
- Resolve agent disagreements
- Implement LLM arbitration
- Handle conflicting outputs
result-synthesis
Keywords: synthesize, combine, aggregate, merge, summary Solves:
- Combine outputs from multiple agents
- Create executive summaries
- Score confidence across findings
crewai-patterns
Keywords: crewai, crew, hierarchical, delegation, role-based, flows Solves:
- Build role-based agent teams
- Implement hierarchical coordination
- Use Flows for event-driven orchestration
autogen-patterns
Keywords: autogen, microsoft, agent framework, teams, enterprise, a2a Solves:
- Build enterprise agent systems
- Use AutoGen/SK merged framework
- Implement A2A protocol
framework-selection
Keywords: choose, compare, framework, decision, which, crewai, autogen, openai Solves:
- Select appropriate framework
- Compare framework capabilities
- Match framework to requirements
scenario-orchestrator
Keywords: scenario, parallel, fan-out, difficulty, progressive, demo Solves:
- Run skill across multiple difficulty levels
- Implement parallel scenario execution
- Aggregate cross-scenario results
scenario-routing
Keywords: route, synchronize, milestone, checkpoint, scaling Solves:
- Route tasks by difficulty level
- Synchronize at milestones
- Scale inputs progressively
Rules (10)
Frameworks: Microsoft Agent Framework / AutoGen — HIGH
Microsoft Agent Framework (AutoGen + Semantic Kernel)
Enterprise multi-agent systems with RoundRobin/Selector teams, termination conditions, A2A protocol, and tool integration.
Team Setup
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
from autogen_ext.models.openai import OpenAIChatCompletionClient
# Create model client
model_client = OpenAIChatCompletionClient(model="gpt-5.2")
# Define agents
planner = AssistantAgent(
name="planner",
description="Plans complex tasks and breaks them into steps",
model_client=model_client,
system_message="You are a planning expert. Break tasks into actionable steps."
)
executor = AssistantAgent(
name="executor",
description="Executes planned tasks",
model_client=model_client,
system_message="You execute tasks according to the plan."
)
reviewer = AssistantAgent(
name="reviewer",
description="Reviews work and provides feedback",
model_client=model_client,
system_message="You review work. Say 'APPROVED' if satisfactory."
)
# Create team with termination condition
termination = TextMentionTermination("APPROVED")
team = RoundRobinGroupChat(
participants=[planner, executor, reviewer],
termination_condition=termination
)
# Run team
result = await team.run(task="Create a marketing strategy")Selector Group Chat
from autogen_agentchat.teams import SelectorGroupChat
# Selector chooses next speaker based on context
team = SelectorGroupChat(
participants=[analyst, writer, reviewer],
model_client=model_client,
termination_condition=termination
)
Termination Conditions
from autogen_agentchat.conditions import (
TextMentionTermination,
MaxMessageTermination,
TokenUsageTermination,
TimeoutTermination,
OrTerminationCondition,
)
termination = OrTerminationCondition(
TextMentionTermination("DONE"),
MaxMessageTermination(max_messages=20),
TimeoutTermination(timeout_seconds=300)
)
Tool Integration
from autogen_core.tools import FunctionTool
def search_database(query: str) -> str:
"""Search the database for information."""
results = db.search(query)
return json.dumps(results)
search_tool = FunctionTool(search_database, description="Search the database")
researcher = AssistantAgent(
name="researcher",
description="Researches information",
model_client=model_client,
tools=[search_tool],
system_message="Use the search tool to find information."
)
State Management
# Save state
state = await team.save_state()
# Restore state
await team.load_state(state)
# Resume conversation
result = await team.run(task="Continue from where we left off")Agent-to-Agent Protocol (A2A)
from autogen_agentchat.protocols import A2AProtocol
protocol = A2AProtocol(
agent=my_agent,
endpoint="https://api.example.com/agent",
auth_token=os.environ["A2A_TOKEN"]
)
response = await protocol.send(
to="external-agent-id",
message="Process this request"
)
Streaming
async for message in team.run_stream(task="Analyze this data"):
print(f"{message.source}: {message.content}")OpenAI Agents SDK (0.12+)
Alternative for OpenAI-native ecosystems:
from agents import Agent, Runner, handoff, RunConfig
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
researcher_agent = Agent(
name="researcher",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a research specialist. Gather information and facts.
When research is complete, hand off to the writer.""",
model="gpt-5.2"
)
writer_agent = Agent(
name="writer",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a content writer. Create compelling content from research.""",
model="gpt-5.2"
)
orchestrator = Agent(
name="orchestrator",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You coordinate research and writing tasks.""",
model="gpt-5.2",
handoffs=[handoff(agent=researcher_agent), handoff(agent=writer_agent)]
)
async def run_workflow(task: str):
runner = Runner()
config = RunConfig(nest_handoff_history=True)
result = await runner.run(orchestrator, task, run_config=config)
    return result.final_output
Migration from AutoGen 0.2
# Old AutoGen 0.2
# from autogen import AssistantAgent, UserProxyAgent
# New AutoGen 0.4+
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
# Key differences:
# - No UserProxyAgent needed for simple tasks
# - Teams replace GroupChat
# - Explicit termination conditions required
# - Model client separate from agent
Best Practices
- Always set explicit termination conditions
- Team size: 3-5 agents optimal
- Clear role definitions in system_message
- One function per tool with clear descriptions
- Use try/except around team.run()
- Use run_stream() for real-time feedback
Incorrect — team without termination condition runs indefinitely:
team = RoundRobinGroupChat(
participants=[planner, executor, reviewer]
# No termination condition - infinite loop risk
)
result = await team.run(task="Complete task")Correct — explicit termination prevents infinite loops:
termination = OrTerminationCondition(
TextMentionTermination("APPROVED"),
MaxMessageTermination(max_messages=20)
)
team = RoundRobinGroupChat(
participants=[planner, executor, reviewer],
termination_condition=termination
)
result = await team.run(task="Complete task")Compare and select multi-agent frameworks based on complexity, use case, and production needs — HIGH
Framework Comparison & Selection
Decision matrix for choosing between multi-agent frameworks.
Framework Overview
| Framework | Best For | Key Features | Status |
|---|---|---|---|
| LangGraph 1.0.6 | Complex stateful workflows | Persistence, streaming, human-in-loop | Production |
| CrewAI 1.8.x | Role-based collaboration | Flows, hierarchical crews, a2a, HITL | Production |
| OpenAI Agents SDK 0.12+ | Provider-agnostic (100+ LLMs) | Handoffs, guardrails, tool search, MCPServerManager | Production |
| GPT-5.2-Codex | Long-horizon coding | Context compaction, project-scale, security | Production |
| MS Agent Framework | Enterprise | AutoGen+SK merger, A2A, compliance | Public Preview |
| AG2 | Open-source, flexible | Community fork of AutoGen | Active |
Feature Comparison
| Feature | LangGraph | CrewAI | OpenAI SDK | MS Agent |
|---|---|---|---|---|
| State Management | Excellent | Good | Basic | Good |
| Persistence | Built-in | Plugin | Manual | Built-in |
| Streaming | Native | Limited | Native | Native |
| Human-in-Loop | Native | Manual | Manual | Native |
| Memory | Via Store | Built-in | Manual | Manual |
| Observability | Langfuse/LangSmith | Limited | Tracing | Azure Monitor |
| Learning Curve | Steep | Easy | Medium | Medium |
| Production Ready | Yes | Yes | Yes | Q1 2026 |
Decision Tree
Start
|
+-- Need complex state machines?
| +-- Yes --> LangGraph
| +-- No
| |
+-- Role-based collaboration?
| +-- Yes --> CrewAI
| +-- No
| |
+-- OpenAI ecosystem only?
| +-- Yes --> OpenAI Agents SDK
| +-- No
| |
+-- Enterprise requirements?
| +-- Yes --> Microsoft Agent Framework
| +-- No
| |
+-- Long-horizon coding tasks?
| +-- Yes --> GPT-5.2-Codex
| +-- No
| |
+-- Open-source priority?
+-- Yes --> AG2
+-- No --> LangGraph (default)
Use Case Matrix
| Use Case | Best Framework | Why |
|---|---|---|
| Complex state machines | LangGraph | Native StateGraph, persistence |
| Role-based teams | CrewAI | Built-in delegation, backstories |
| OpenAI-only projects | OpenAI SDK | Native integration, handoffs |
| Enterprise/compliance | MS Agent | Azure integration, A2A |
| Long-horizon coding | GPT-5.2-Codex | Context compaction, security |
| Quick prototypes | CrewAI | Minimal boilerplate |
| Long-running workflows | LangGraph | Checkpointing, recovery |
| Customer support bots | OpenAI SDK | Handoffs, guardrails |
| Research/experiments | AG2 | Open-source, flexible |
Cost Considerations
| Framework | Licensing | Infra Cost | LLM Cost |
|---|---|---|---|
| LangGraph | MIT | Self-host / LangGraph Cloud | Any LLM |
| CrewAI | MIT | Self-host | Any LLM |
| OpenAI SDK | MIT | Self-host | Any LLM (via LiteLLM) |
| MS Agent | MIT | Self-host / Azure | Any LLM |
| AG2 | Apache 2.0 | Self-host | Any LLM |
Performance Characteristics
| Framework | Cold Start | Latency | Throughput |
|---|---|---|---|
| LangGraph | ~100ms | Low | High |
| CrewAI | ~200ms | Medium | Medium |
| OpenAI SDK | ~50ms | Low | High |
| MS Agent | ~150ms | Medium | High |
Team Expertise Requirements
| Framework | Python | LLM | Infra |
|---|---|---|---|
| LangGraph | Expert | Expert | Medium |
| CrewAI | Beginner | Beginner | Low |
| OpenAI SDK | Medium | Medium | Low |
| MS Agent | Medium | Medium | High |
Migration Paths
From AutoGen to MS Agent Framework
# AutoGen 0.2 (old)
from autogen import AssistantAgent, UserProxyAgent
agent = AssistantAgent(name="assistant", llm_config=config)
# MS Agent Framework (new)
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
model_client = OpenAIChatCompletionClient(model="gpt-5.2")
agent = AssistantAgent(name="assistant", model_client=model_client)From Custom to LangGraph
# Custom orchestration (old)
async def workflow(task):
step1 = await agent1.run(task)
step2 = await agent2.run(step1)
return step2
# LangGraph (new)
from langgraph.graph import StateGraph
workflow = StateGraph(State)
workflow.add_node("agent1", agent1_node)
workflow.add_node("agent2", agent2_node)
workflow.add_edge("agent1", "agent2")GPT-5.2-Codex Decision Matrix
Task Duration > 1 hour?
+-- Yes --> GPT-5.2-Codex
+-- No
+-- Multiple files affected?
| +-- Yes --> GPT-5.2-Codex
| +-- No
| +-- Security review needed?
| +-- Yes --> GPT-5.2-Codex
| +-- No --> GPT-5.2 (standard)
Recommendation Summary
- Default choice: LangGraph (most capable, production-proven)
- Fastest to prototype: CrewAI (minimal code, intuitive)
- OpenAI shops: OpenAI Agents SDK (native integration)
- Enterprise: Microsoft Agent Framework (compliance, Azure)
- Long-horizon coding: GPT-5.2-Codex (context compaction)
- Research: AG2 (open community, experimental features)
Common Mistakes
- Mixing frameworks in one project (complexity explosion)
- Ignoring framework maturity (beta vs production)
- No fallback strategy (framework lock-in)
- Overcomplicating simple tasks (use single agent)
Incorrect — using multi-agent framework for simple task:
# Overkill: LangGraph for single-step task
workflow = StateGraph(State)
workflow.add_node("summarize", summarize_node)
workflow.add_edge(START, "summarize")
workflow.add_edge("summarize", END)
app = workflow.compile()
result = await app.ainvoke({"text": "..."})
Correct — single LLM call for simple task:
# Simple task = single agent
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-5.2")
result = await llm.ainvoke("Summarize this text: ...")
Build role-based agent collaboration with CrewAI Flows, hierarchical crews, and structured outputs — HIGH
CrewAI Patterns (v1.8+)
Role-based multi-agent collaboration with Flows architecture, hierarchical crews, MCP tools, and async execution.
Hierarchical Crew
from crewai import Agent, Crew, Task, Process
from crewai.flow.flow import Flow, listen, start
# Manager coordinates the team
manager = Agent(
role="Project Manager",
goal="Coordinate team efforts and ensure project success",
backstory="Experienced project manager skilled at delegation",
allow_delegation=True,
memory=True,
verbose=True
)
# Specialist agents
researcher = Agent(
role="Researcher",
goal="Provide accurate research and analysis",
backstory="Expert researcher with deep analytical skills",
allow_delegation=False,
verbose=True
)
writer = Agent(
role="Writer",
goal="Create compelling content",
backstory="Skilled writer who creates engaging content",
allow_delegation=False,
verbose=True
)
# Manager-led task
project_task = Task(
description="Create a comprehensive market analysis report",
expected_output="Executive summary, analysis, recommendations",
agent=manager
)
# Hierarchical crew
crew = Crew(
agents=[manager, researcher, writer],
tasks=[project_task],
process=Process.hierarchical,
manager_llm="gpt-5.2",
memory=True,
verbose=True
)
result = crew.kickoff()
Flows Architecture (1.8+)
Event-driven orchestration with state management:
from crewai.flow.flow import Flow, listen, start, router
class ResearchFlow(Flow):
@start()
def generate_topic(self):
return "AI Safety"
@listen(generate_topic)
def research_topic(self, topic):
return f"Research findings on {topic}"
@router(research_topic)
def route_result(self, result):
if "sufficient" in result:
return "success"
return "retry"
@listen("success")
def handle_success(self):
return "Workflow completed"
@listen("retry")
def handle_retry(self):
return "Retrying..."
flow = ResearchFlow()
result = flow.kickoff()
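Flows can also carry typed state between steps. A minimal sketch, assuming CrewAI's pydantic-backed structured state (Flow[StateModel] accessed via self.state); the field names are illustrative:
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start

class ResearchState(BaseModel):
    topic: str = ""
    findings: str = ""

class StatefulResearchFlow(Flow[ResearchState]):
    @start()
    def pick_topic(self):
        self.state.topic = "AI Safety"  # state persists across steps

    @listen(pick_topic)
    def research(self):
        self.state.findings = f"Findings on {self.state.topic}"
        return self.state.findings

result = StatefulResearchFlow().kickoff()
Parallel Execution with and_/or_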
from crewai.flow.flow import Flow, listen, start, and_, or_
class ParallelFlow(Flow):
@start()
def task_a(self):
return "Result A"
@start()
def task_b(self):
return "Result B"
@listen(and_(task_a, task_b))
def combine_results(self):
"""Triggers when BOTH complete"""
return "Combined results"Structured Output
from pydantic import BaseModel
class ReportOutput(BaseModel):
title: str
summary: str
findings: list[str]
confidence: float
task = Task(
description="Analyze market trends",
expected_output="Structured market analysis",
agent=analyst,
output_pydantic=ReportOutput
)
result = crew.kickoff()
report = result.pydantic
Task Guardrails
from typing import Any
from crewai.tasks import TaskOutput
def validate_length(result: TaskOutput) -> tuple[bool, Any]:
if len(result.raw.split()) < 100:
return (False, "Content too brief, expand analysis")
return (True, result.raw)
task = Task(
description="Write comprehensive analysis",
expected_output="Detailed analysis (100+ words)",
agent=writer,
guardrail=validate_length,
guardrail_max_retries=3
)
MCP Tool Support (1.8+)
from crewai import Agent
agent = Agent(
role="Research Analyst",
goal="Research and analyze information",
backstory="Expert analyst",
mcps=[
"https://mcp.example.com/mcp?api_key=your_key",
"crewai-amp:financial-data",
]
)
Decorator-Based Crew (Recommended)
from crewai import Agent, Crew, Task, Process
from crewai.project import CrewBase, agent, task, crew
@CrewBase
class ResearchCrew:
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
@agent
def researcher(self) -> Agent:
return Agent(config=self.agents_config['researcher'], tools=[search_tool])
@task
def research_task(self) -> Task:
        return Task(config=self.tasks_config['research'])
@crew
def crew(self) -> Crew:
return Crew(agents=self.agents, tasks=self.tasks, process=Process.sequential)
result = ResearchCrew().crew().kickoff(inputs={"topic": "AI Safety"})
Best Practices
- Use Flows for complex multi-step workflows
- Prefer @CrewBase decorator-based definition
- Enable structured outputs with output_pydantic
- Add guardrails for output validation
- Use async_execution=True for independent tasks
- Role clarity: each agent has a distinct, non-overlapping role
- One clear deliverable per task
Incorrect — sequential process without hierarchical manager:
crew = Crew(
agents=[researcher, writer, reviewer],
tasks=[research_task, write_task, review_task],
process=Process.sequential # No delegation, rigid order
)
Correct — hierarchical process with manager delegation:
crew = Crew(
agents=[manager, researcher, writer, reviewer],
tasks=[project_task],
process=Process.hierarchical, # Manager delegates dynamically
manager_llm="gpt-5.2"
)
Generate multi-step plans before execution with replanning when conditions change — HIGH
Plan-and-Execute Pattern
Generate a multi-step plan first, then execute each step sequentially with the option to replan when conditions change.
Core Implementation
async def plan_and_execute(goal: str) -> str:
"""Create plan first, then execute steps."""
# 1. Generate plan
plan = await llm.chat([{
"role": "user",
"content": f"Create a step-by-step plan to: {goal}\n\nFormat as numbered list."
}])
steps = parse_plan(plan.content)
results = []
# 2. Execute each step
for i, step in enumerate(steps):
result = await execute_step(step, context=results)
results.append({"step": step, "result": result})
# 3. Check if replanning needed
if should_replan(results):
return await plan_and_execute(
f"{goal}\n\nProgress so far: {results}"
)
# 4. Synthesize final answer
    return await synthesize(goal, results)
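The helpers parse_plan, execute_step, should_replan, and synthesize are assumed above. A minimal sketch of parse_plan for the numbered-list format the prompt requests; the regex and plain-line fallback are illustrative choices:
import re

def parse_plan(plan_text: str) -> list[str]:
    """Extract steps from a numbered-list plan like '1. Do X'."""
    steps = [
        m.group(1).strip()
        for line in plan_text.splitlines()
        if (m := re.match(r"^\s*\d+[.)]\s+(.*)", line))
    ]
    # Fallback: treat non-empty lines as steps if the LLM skipped numbering
    return steps or [line.strip() for line in plan_text.splitlines() if line.strip()]
TypeScript ReAct Agent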
interface AgentStep {
thought: string;
action?: string;
actionInput?: unknown;
observation?: string;
}
interface AgentResult {
answer: string;
steps: AgentStep[];
totalCost: number;
iterations: number;
}
export async function reactAgent(
task: string,
options: { maxIterations?: number; verbose?: boolean } = {}
): Promise<AgentResult> {
const { maxIterations = 10, verbose = false } = options;
const steps: AgentStep[] = [];
let totalCost = 0;
const systemPrompt = `You are an autonomous agent that can use tools.
Available tools:
${tools.map(t => `- ${t.name}: ${t.description}`).join('\n')}
Use this format:
Thought: [Your reasoning]
Action: [tool name]
Action Input: {"param": "value"}
Observation: [Tool result]
When done: Answer: [Final answer]`;
const messages = [
{ role: 'system' as const, content: systemPrompt },
{ role: 'user' as const, content: task }
];
for (let i = 0; i < maxIterations; i++) {
const response = await openai.chat.completions.create({
model: 'gpt-5.2', messages, temperature: 0.1
});
const content = response.choices[0].message.content!;
totalCost += (response.usage!.total_tokens / 1000) * 0.01;
if (content.includes('Answer:')) {
const answer = content.split('Answer:')[1].trim();
return { answer, steps, totalCost, iterations: i + 1 };
}
const parsed = parseAgentResponse(content);
const step: AgentStep = { thought: parsed.thought };
if (parsed.action && parsed.actionInput) {
step.action = parsed.action;
step.actionInput = JSON.parse(parsed.actionInput);
step.observation = await executeTool(parsed.action, step.actionInput);
}
steps.push(step);
messages.push({ role: 'assistant', content });
if (step.observation) {
messages.push({ role: 'user', content: `Observation: ${step.observation}` });
}
}
throw new Error(`Agent exceeded max iterations (${maxIterations})`);
}
Function Calling Agent (Alternative)
export async function functionCallingAgent(task: string): Promise<AgentResult> {
const steps: AgentStep[] = [];
let totalCost = 0;
const messages = [
{ role: 'system' as const, content: 'You are a helpful assistant with access to tools.' },
{ role: 'user' as const, content: task }
];
const openaiTools = tools.map(tool => ({
type: 'function' as const,
function: { name: tool.name, description: tool.description, parameters: tool.parameters }
}));
let iteration = 0;
while (iteration < 10) {
const response = await openai.chat.completions.create({
model: 'gpt-5.2', messages, tools: openaiTools
});
const message = response.choices[0].message;
totalCost += (response.usage!.total_tokens / 1000) * 0.01;
// No tool calls = final answer
if (!message.tool_calls) {
return { answer: message.content!, steps, totalCost, iterations: iteration + 1 };
}
messages.push(message as any);
for (const toolCall of message.tool_calls) {
const tool = tools.find(t => t.name === toolCall.function.name);
if (!tool) continue;
const args = JSON.parse(toolCall.function.arguments);
const result = await tool.execute(args);
steps.push({
thought: `Calling ${toolCall.function.name}`,
action: toolCall.function.name,
actionInput: args,
observation: JSON.stringify(result)
});
messages.push({ role: 'tool', tool_call_id: toolCall.id, content: JSON.stringify(result) });
}
iteration++;
}
throw new Error('Agent exceeded max iterations');
}
When to Choose Which Pattern
| Pattern | Best For | Trade-off |
|---|---|---|
| ReAct | Exploratory tasks, research | More flexible, harder to control |
| Plan-Execute | Well-defined goals | Structured, but replanning adds cost |
| Function Calling | Production APIs | Most reliable, requires tool schemas |
| Self-Correction | Quality-critical output | Higher cost, better quality |
Incorrect — executing without planning first:
async def execute_goal(goal: str):
steps = ["step1", "step2", "step3"] # Hardcoded, no reasoning
for step in steps:
        await execute_step(step)
Correct — LLM generates plan before execution:
async def plan_and_execute(goal: str):
plan = await llm.chat([{"role": "user", "content": f"Create plan for: {goal}"}])
steps = parse_plan(plan.content)
for step in steps:
result = await execute_step(step)
if should_replan(result):
            return await plan_and_execute(f"{goal}\nProgress: {result}")
Implement ReAct pattern where agents reason, act via tools, observe, and iterate — HIGH
ReAct Pattern (Reasoning + Acting)
LLMs reason step-by-step, take actions via tools, observe results, and iterate until a final answer is reached.
ReAct Prompt Template
REACT_PROMPT = """You are an agent that reasons step by step.
For each step, respond with:
Thought: [your reasoning about what to do next]
Action: [tool_name(arg1, arg2)]
Observation: [you'll see the result here]
When you have the final answer:
Thought: I now have enough information
Final Answer: [your response]
Available tools: {tools}
Question: {question}
"""Python Implementation
async def react_loop(question: str, tools: dict, max_steps: int = 10) -> str:
"""Execute ReAct reasoning loop."""
history = REACT_PROMPT.format(tools=list(tools.keys()), question=question)
for step in range(max_steps):
response = await llm.chat([{"role": "user", "content": history}])
history += response.content
# Check for final answer
if "Final Answer:" in response.content:
return response.content.split("Final Answer:")[-1].strip()
# Extract and execute action
if "Action:" in response.content:
action = parse_action(response.content)
result = await tools[action.name](*action.args)
history += f"\nObservation: {result}\n"
return "Max steps reached without answer"Self-Correction Loop
async def self_correcting_agent(task: str, max_retries: int = 3) -> str:
"""Agent that validates and corrects its own output."""
for attempt in range(max_retries):
response = await llm.chat([{"role": "user", "content": task}])
# Self-validate
validation = await llm.chat([{
"role": "user",
"content": f"""Validate this response for the task: {task}
Response: {response.content}
Check for:
1. Correctness - Is it factually accurate?
2. Completeness - Does it fully answer the task?
3. Format - Is it properly formatted?
If valid, respond: VALID
If invalid, respond: INVALID: [what's wrong and how to fix]"""
}])
if "VALID" in validation.content:
return response.content
# Correct based on feedback
task = f"{task}\n\nPrevious attempt had issues: {validation.content}"
    return response.content  # Return best attempt
Memory Management
class AgentMemory:
"""Sliding window memory for agents."""
def __init__(self, max_messages: int = 20):
self.messages = []
self.max_messages = max_messages
self.summary = ""
def add(self, role: str, content: str):
self.messages.append({"role": role, "content": content})
if len(self.messages) > self.max_messages:
self._compress()
def _compress(self):
"""Summarize oldest messages."""
old = self.messages[:10]
self.messages = self.messages[10:]
summary = summarize(old)
self.summary = f"{self.summary}\n{summary}"
def get_context(self) -> list:
"""Get messages with summary prefix."""
context = []
if self.summary:
context.append({
"role": "system",
"content": f"Previous context summary: {self.summary}"
})
        return context + self.messages
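summarize is assumed by _compress. A minimal synchronous sketch that truncates rather than calling a model (the document's llm.chat client is async, so an LLM-backed version would need a sync interface); the 200-character cap is an illustrative choice:
def summarize(messages: list[dict]) -> str:
    """Cheap placeholder: first ~200 chars of each message, role-tagged."""
    return " | ".join(f"{m['role']}: {m['content'][:200]}" for m in messages)
Key Decisions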
| Decision | Recommendation |
|---|---|
| Max steps | 5-15 (prevent infinite loops) |
| Temperature | 0.3-0.7 (balance creativity/focus) |
| Memory window | 10-20 messages |
| Validation frequency | Every 3-5 steps |
Common Mistakes
- No step limit (infinite loops)
- No memory management (context overflow)
- No error recovery (crashes on tool failure)
- Over-complex prompts (agent gets confused)
Incorrect — ReAct loop without step limit:
async def react_loop(question: str, tools: dict):
history = f"Question: {question}"
while True: # Infinite loop risk
response = await llm.chat([{"role": "user", "content": history}])
if "Final Answer:" in response.content:
            return response.content
Correct — max_steps prevents infinite loops:
async def react_loop(question: str, tools: dict, max_steps: int = 10):
history = f"Question: {question}"
for step in range(max_steps): # Bounded loop
response = await llm.chat([{"role": "user", "content": history}])
if "Final Answer:" in response.content:
return response.content.split("Final Answer:")[-1]
return "Max steps reached without answer"Resolve agent disagreements through confidence scores, LLM arbitration, or majority voting — HIGH
Agent Debate & Conflict Resolution
Patterns for handling disagreements between agents and establishing communication channels.
Conflict Resolution
async def resolve_conflicts(findings: list[dict]) -> list[dict]:
"""When agents disagree, resolve by confidence or LLM."""
conflicts = detect_conflicts(findings)
if not conflicts:
return findings
for conflict in conflicts:
# Option 1: Higher confidence wins
winner = max(conflict.agents, key=lambda a: a.confidence)
# Option 2: LLM arbitration
resolution = await llm.chat([{
"role": "user",
"content": f"""Two agents disagree:
Agent A ({conflict.agent_a.name}): {conflict.agent_a.finding}
Agent B ({conflict.agent_b.name}): {conflict.agent_b.finding}
Which is more likely correct and why?"""
}])
conflict.resolution = parse_resolution(resolution.content)
    return apply_resolutions(findings, conflicts)
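detect_conflicts, parse_resolution, and apply_resolutions are assumed above. A minimal sketch of detect_conflicts shaped to match the .agents, .name, .finding, and .confidence accesses in the loop; the containers and the recommendation key are illustrative assumptions:
from dataclasses import dataclass

@dataclass
class AgentFinding:
    name: str
    finding: str
    confidence: float = 0.0

@dataclass
class Conflict:
    agent_a: AgentFinding
    agent_b: AgentFinding
    resolution: str | None = None

    @property
    def agents(self) -> list[AgentFinding]:
        return [self.agent_a, self.agent_b]

def detect_conflicts(findings: list[dict]) -> list[Conflict]:
    """Pair up findings whose recommendations disagree."""
    wrapped = [
        AgentFinding(f.get("agent", "?"), f.get("recommendation", ""), f.get("confidence", 0.0))
        for f in findings
    ]
    return [
        Conflict(agent_a=a, agent_b=b)
        for i, a in enumerate(wrapped)
        for b in wrapped[i + 1:]
        if a.finding != b.finding
    ]
Structured Conflict Detection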
async def resolve_agent_conflicts(
findings: list[dict], llm: Any
) -> dict:
"""Resolve conflicts between agent outputs."""
conflicts = []
for i, f1 in enumerate(findings):
for f2 in findings[i+1:]:
if f1.get("recommendation") != f2.get("recommendation"):
conflicts.append((f1, f2))
if not conflicts:
return {"status": "no_conflicts", "findings": findings}
# LLM arbitration
resolution = await llm.ainvoke(f"""
Agents disagree. Determine best recommendation:
Agent 1: {conflicts[0][0]}
Agent 2: {conflicts[0][1]}
Provide: winner, reasoning, confidence (0-1)
""")
return {"status": "resolved", "resolution": resolution}Agent Communication Bus
import time

class AgentBus:
"""Message passing between agents."""
def __init__(self):
self.messages = []
self.subscribers = {}
def publish(self, from_agent: str, message: dict):
"""Broadcast message to all agents."""
msg = {"from": from_agent, "data": message, "ts": time.time()}
self.messages.append(msg)
for callback in self.subscribers.values():
callback(msg)
def subscribe(self, agent_id: str, callback):
"""Register agent to receive messages."""
self.subscribers[agent_id] = callback
def get_history(self, agent_id: str = None) -> list:
"""Get message history, optionally filtered."""
if agent_id:
return [m for m in self.messages if m["from"] == agent_id]
        return self.messages
Resolution Strategies
| Strategy | When to Use | Trade-off |
|---|---|---|
| Confidence-based | Agents provide confidence scores | Fast but requires calibrated scores |
| LLM arbitration | Complex disagreements | Higher quality but adds LLM cost |
| Majority voting | 3+ agents on same question | Simple but requires odd count |
| Weighted consensus | Agents have different expertise | Best for specialized teams |
| Human-in-the-loop | High-stakes decisions | Most reliable but slowest |
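For the majority-voting row, a minimal sketch; it assumes three or more agents answered the same question and that answers compare as strings:
from collections import Counter

def majority_vote(answers: list[str]) -> str:
    """Most common answer wins; ties resolve to the first answer seen."""
    return Counter(answers).most_common(1)[0][0]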
Common Mistakes
- No timeout per agent (one slow agent blocks all)
- No error isolation (one failure crashes workflow)
- Over-coordination (too much overhead)
- Using Agent Teams for simple sequential work (use Task tool)
- Broadcasting when a direct message suffices (wastes tokens)
Incorrect — no conflict resolution strategy:
async def multi_agent_analysis(task: str):
results = await asyncio.gather(agent1(task), agent2(task))
    return results  # Return conflicting results without resolution
Correct — LLM arbitration resolves conflicts:
async def multi_agent_analysis(task: str):
results = await asyncio.gather(agent1(task), agent2(task))
if results[0] != results[1]: # Conflict detected
resolution = await llm.chat([{
"role": "user",
"content": f"Agent 1: {results[0]}\nAgent 2: {results[1]}\nWhich is correct?"
}])
return resolution.content
    return results[0]
Coordinate specialist agents through a central supervisor with parallel execution and timeouts — CRITICAL
Supervisor Pattern
Central coordinator that routes tasks to specialist agents, supports parallel and sequential execution, and aggregates results.
Fan-Out/Fan-In
async def multi_agent_analysis(content: str) -> dict:
"""Fan-out to specialists, fan-in to synthesize."""
agents = [
("security", security_agent),
("performance", performance_agent),
("code_quality", quality_agent),
("architecture", architecture_agent),
]
# Fan-out: Run all agents in parallel
tasks = [agent(content) for _, agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Filter successful results
findings = [
{"agent": name, "result": result}
for (name, _), result in zip(agents, results)
if not isinstance(result, Exception)
]
# Fan-in: Synthesize findings
    return await synthesize_findings(findings)
Supervisor with Routing
class Supervisor:
"""Central coordinator that routes to specialists."""
def __init__(self, agents: dict):
self.agents = agents # {"security": agent, "performance": agent}
self.completed = []
async def run(self, task: str) -> dict:
"""Route task through appropriate agents."""
# 1. Determine which agents to use
plan = await self.plan_routing(task)
# 2. Execute in dependency order
results = {}
for agent_name in plan.execution_order:
if plan.can_parallelize(agent_name):
batch = plan.get_parallel_batch(agent_name)
batch_results = await asyncio.gather(*[
self.agents[name](task, context=results)
for name in batch
])
results.update(dict(zip(batch, batch_results)))
else:
results[agent_name] = await self.agents[agent_name](
task, context=results
)
return results
async def plan_routing(self, task: str) -> RoutingPlan:
"""Use LLM to determine agent routing."""
response = await llm.chat([{
"role": "user",
"content": f"""Task: {task}
Available agents: {list(self.agents.keys())}
Which agents should handle this task?
What order? Can any run in parallel?"""
}])
        return parse_routing_plan(response.content)
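RoutingPlan and parse_routing_plan are assumed by the supervisor. A minimal sketch of the plan container; representing the plan as ordered batches (parallel within a batch) is an illustrative design choice, and the run loop above should skip agents already present in results so a batch is not gathered twice:
from dataclasses import dataclass, field

@dataclass
class RoutingPlan:
    """Agents grouped into batches; members of a batch may run in parallel."""
    batches: list[list[str]] = field(default_factory=list)

    @property
    def execution_order(self) -> list[str]:
        return [name for batch in self.batches for name in batch]

    def can_parallelize(self, agent_name: str) -> bool:
        return any(agent_name in batch and len(batch) > 1 for batch in self.batches)

    def get_parallel_batch(self, agent_name: str) -> list[str]:
        for batch in self.batches:
            if agent_name in batch:
                return batch
        return [agent_name]
Supervisor-Worker with Timeout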
class SupervisorCoordinator:
"""Central supervisor that routes tasks to worker agents."""
def __init__(self, workers: dict[str, Agent]):
self.workers = workers
self.execution_log: list[dict] = []
async def route_and_execute(
self, task: str, required_agents: list[str], parallel: bool = True
) -> dict[str, Any]:
context = {"task": task, "results": {}}
if parallel:
tasks = [self._run_worker(name, task, context) for name in required_agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
return dict(zip(required_agents, results))
else:
for name in required_agents:
context["results"][name] = await self._run_worker(name, task, context)
return context["results"]
async def _run_worker(self, name: str, task: str, context: dict) -> dict:
"""Execute single worker with timeout."""
try:
result = await asyncio.wait_for(
self.workers[name].run(task, context), timeout=30.0
)
self.execution_log.append({"agent": name, "status": "success", "result": result})
return result
except asyncio.TimeoutError:
return {"error": f"{name} timed out"}CC Agent Teams (CC 2.1.33+)
CC 2.1.33 introduces native Agent Teams with peer-to-peer messaging and mesh topology.
Star vs Mesh Topology
Star (Task tool): Mesh (Agent Teams):
Lead Lead (delegate)
/||\ / | \
/ || \ / | \
A B C D A <-> B <-> C
(no cross-talk)      (peer messaging)
Dual-Mode Decision Tree
Complexity Assessment:
+-- Score < 3.0 -> Task tool subagents (cheaper, simpler)
+-- Score 3.0-3.5 -> User choice (recommend Teams for cross-cutting)
+-- Score > 3.5 -> Agent Teams (GA since CC 2.1.33)
Team Formation
# 1. Create team with shared task list
TeamCreate(team_name="feature-auth", description="User auth implementation")
# 2. Create tasks in shared list
TaskCreate(subject="Design API schema", description="...")
TaskCreate(subject="Build React components", description="...", addBlockedBy=["1"])
# 3. Spawn teammates
Agent(prompt="You are the backend architect...",
team_name="feature-auth", name="backend-dev",
subagent_type="backend-system-architect")Peer Messaging
# Direct message (default)
SendMessage(type="message", recipient="frontend-dev",
content="API contract: GET /users/:id -> {id, name, email}",
summary="API contract ready")
# Broadcast (expensive -- use sparingly)
SendMessage(type="broadcast",
content="Auth header format changed to Bearer",
summary="Breaking auth change")Cost Comparison
| Scenario | Task Tool | Agent Teams | Ratio |
|---|---|---|---|
| 3-agent review | ~150K tokens | ~400K tokens | 2.7x |
| 8-agent feature | ~500K tokens | ~1.2M tokens | 2.4x |
| 6-agent research | ~300K tokens | ~800K tokens | 2.7x |
Key Decisions
| Decision | Recommendation |
|---|---|
| Agent count | 3-8 specialists |
| Parallelism | Parallelize independent agents |
| Worker timeout | 30s default |
| Communication | Shared state, message bus, or SendMessage (CC 2.1.33+) |
| Topology | Task tool (star) for simple; Agent Teams (mesh) for complex |
Incorrect — sequential execution of independent agents:
async def analyze(content: str):
security_result = await security_agent(content) # Wait
perf_result = await performance_agent(content) # Wait
quality_result = await quality_agent(content) # Wait
    return [security_result, perf_result, quality_result]
Correct — parallel fan-out for independent agents:
async def analyze(content: str):
tasks = [
security_agent(content),
performance_agent(content),
quality_agent(content)
]
results = await asyncio.gather(*tasks) # Run in parallel
    return results
Synthesize parallel agent outputs into coherent actionable results with quality metrics — HIGH
Result Synthesis
Combine outputs from multiple parallel agents into coherent, actionable results.
Synthesis Pattern
async def synthesize_findings(findings: list[dict]) -> dict:
"""Combine multiple agent outputs into coherent result."""
# Group by category
by_category = {}
for f in findings:
cat = f.get("category", "general")
by_category.setdefault(cat, []).append(f)
# Synthesize each category
synthesis = await llm.chat([{
"role": "user",
"content": f"""Synthesize these agent findings into a coherent summary:
{json.dumps(by_category, indent=2)}
Output format:
- Executive summary (2-3 sentences)
- Key findings by category
- Recommendations
- Confidence score (0-1)"""
}])
    return parse_synthesis(synthesis.content)
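parse_synthesis is assumed above. A minimal sketch that keeps the synthesis text whole and extracts a confidence value when the model emitted one; the regex and field names are illustrative:
import re

def parse_synthesis(content: str) -> dict:
    """Wrap raw synthesis text; pull out a 0-1 confidence if present."""
    match = re.search(r"[Cc]onfidence[^\d]*(\d*\.?\d+)", content)
    return {
        "summary": content.strip(),
        "confidence": float(match.group(1)) if match else None,
    }
Multi-Agent Collaboration (TypeScript)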
export async function multiAgentCollaboration(
task: string,
agents: Agent[]
): Promise<AgentResult> {
const steps: AgentStep[] = [];
let totalCost = 0;
// 1. Coordinator plans the task
const planResponse = await openai.chat.completions.create({
model: 'gpt-5.2',
messages: [
{
role: 'system',
content: `You are a coordinator. Break down tasks and assign to agents:
${agents.map(a => `- ${a.name}: ${a.role}`).join('\n')}
Provide a numbered plan with agent assignments.`
},
{ role: 'user', content: `Task: ${task}\n\nProvide a step-by-step plan.` }
]
});
const plan = planResponse.choices[0].message.content!;
steps.push({ thought: 'Coordinator planning', observation: plan });
// 2. Execute agent subtasks in parallel
const agentResults = await Promise.all(
agents.map(async (agent) => {
const response = await openai.chat.completions.create({
model: 'gpt-5.2',
messages: [
{ role: 'system', content: agent.systemPrompt },
{ role: 'user', content: `Task: ${task}\n\nPlan:\n${plan}\n\nComplete your part.` }
]
});
return { agent: agent.name, result: response.choices[0].message.content! };
})
);
// 3. Synthesize results
const synthesisResponse = await openai.chat.completions.create({
model: 'gpt-5.2',
messages: [
{ role: 'system', content: 'Synthesize agent results into a coherent final answer.' },
{ role: 'user', content: `Task: ${task}\n\nAgent Results:\n${JSON.stringify(agentResults, null, 2)}` }
]
});
return {
answer: synthesisResponse.choices[0].message.content!,
steps,
totalCost,
iterations: agents.length + 2
};
}
Aggregation Strategies
Comparative (Default)
Compare metrics across all agents:
{
"quality_comparison": {
"security_agent": {"score": 0.92, "findings": 3},
"perf_agent": {"score": 0.88, "findings": 5}
},
"consensus_items": ["Use parameterized queries", "Add rate limiting"],
"disagreements": []
}
Pattern Extraction
Find common patterns:
{
"success_patterns": ["Caching strategy effective", "Batch size 50+ preferred"],
"failure_patterns": ["Timeout at >5000 items per batch"]
}
Recommendation Engine
Generate actionable recommendations:
{
"priority_1": "Fix SQL injection in auth module (security + quality agree)",
"priority_2": "Add connection pooling (performance agent)",
"confidence": 0.91
}
Cost Optimization
- Batch similar tasks to reduce overhead
- Cache agent results by task hash (see the sketch after this list)
- Use cheaper models for simple agents
- Parallelize independent agents always
- Max parallel agents: 8
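A minimal sketch of caching by task hash, as suggested above; the in-process dict and the agent(task) call shape are assumptions (swap in Redis or a database for cross-process reuse):
import hashlib

_result_cache: dict[str, dict] = {}

async def cached_agent_call(agent, task: str) -> dict:
    """Reuse a prior result when the same agent sees the same task."""
    key = hashlib.sha256(f"{agent.__name__}:{task}".encode()).hexdigest()
    if key not in _result_cache:
        _result_cache[key] = await agent(task)
    return _result_cache[key]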
Orchestration Checklist
- Define agent responsibilities (single responsibility per agent)
- Plan communication patterns (shared state, message bus, or SendMessage)
- Set coordination strategy (central orchestrator with task queue)
- Design failure handling (timeout per agent, error isolation)
- Agent health checks and performance metrics
Incorrect — returning raw agent outputs without synthesis:
async def multi_agent_analysis(task: str):
results = await asyncio.gather(agent1(task), agent2(task), agent3(task))
    return results  # Returns list of disconnected findings
Correct — synthesizing results into coherent output:
async def multi_agent_analysis(task: str):
results = await asyncio.gather(agent1(task), agent2(task), agent3(task))
synthesis = await llm.chat([{
"role": "user",
"content": f"Synthesize these findings: {json.dumps(results)}"
}])
    return parse_synthesis(synthesis.content)
Test skills across three parallel scenarios with progressive difficulty and synchronized execution — MEDIUM
Multi-Scenario Orchestrator
Run a single skill across 3 parallel scenarios (simple/medium/complex) with synchronized execution and progressive difficulty.
Core Pattern
+---------------------------------------------------------------------+
| MULTI-SCENARIO ORCHESTRATOR |
+---------------------------------------------------------------------+
| [Coordinator] --+--> [Scenario 1: Simple] (Easy) |
| ^ | +--> [Skill Instance 1] |
| | +--> [Scenario 2: Medium] (Intermediate) |
| | | +--> [Skill Instance 2] |
| | +--> [Scenario 3: Complex] (Advanced) |
| | +--> [Skill Instance 3] |
| | |
| [State Manager] <---- All instances report progress |
| [Aggregator] --> Cross-scenario synthesis |
+---------------------------------------------------------------------+
When to Use
| Scenario | Example |
|---|---|
| Skill demos | Show /ork:implement on simple, medium, complex tasks |
| Progressive testing | Validate skill scales with complexity |
| Comparative analysis | How does approach differ by difficulty? |
| Training/tutorials | Show skill progression from easy to hard |
LangGraph Implementation
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command, Send
async def scenario_supervisor(state: ScenarioOrchestratorState) -> list[Send]:
"""Route to all 3 scenarios in parallel."""
for scenario_id in ["simple", "medium", "complex"]:
state[f"progress_{scenario_id}"] = ScenarioProgress(
scenario_id=scenario_id, status="pending",
start_time_ms=int(time.time() * 1000)
)
return [
Send("scenario_worker", {"scenario_id": "simple", **state}),
Send("scenario_worker", {"scenario_id": "medium", **state}),
Send("scenario_worker", {"scenario_id": "complex", **state}),
]
async def scenario_worker(state: ScenarioOrchestratorState) -> dict:
"""Execute one scenario."""
scenario_id = state.get("scenario_id")
progress = state[f"progress_{scenario_id}"]
scenario_def = state[f"scenario_{scenario_id}"]
progress.status = "running"
try:
result = await execute_skill_with_milestones(
skill_name=state["skill_name"],
scenario_def=scenario_def,
progress=progress, state=state
)
progress.status = "complete"
progress.elapsed_ms = int(time.time() * 1000) - progress.start_time_ms
return {f"progress_{scenario_id}": progress}
except Exception as e:
progress.status = "failed"
progress.errors.append({"message": str(e)})
return {f"progress_{scenario_id}": progress}
async def scenario_aggregator(state: ScenarioOrchestratorState) -> dict:
"""Collect all results and synthesize findings."""
aggregated = {
"orchestration_id": state["orchestration_id"],
"metrics": {},
"comparison": {},
"recommendations": []
}
for scenario_id in ["simple", "medium", "complex"]:
progress = state[f"progress_{scenario_id}"]
aggregated["metrics"][scenario_id] = {
"elapsed_ms": progress.elapsed_ms,
"items_processed": progress.items_processed,
"quality_scores": progress.quality_scores,
}
return {"final_results": aggregated}
# Build graph
graph = StateGraph(ScenarioOrchestratorState)
graph.add_node("supervisor", scenario_supervisor)
graph.add_node("scenario_worker", scenario_worker)
graph.add_node("aggregator", scenario_aggregator)
graph.add_edge(START, "supervisor")
graph.add_edge("scenario_worker", "aggregator")
graph.add_edge("aggregator", END)
app = graph.compile(checkpointer=checkpointer)
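ScenarioOrchestratorState and ScenarioProgress are assumed by the graph above. A minimal sketch of the progress record, with fields taken from the worker and aggregator usages; the defaults are illustrative:
from dataclasses import asdict, dataclass, field

@dataclass
class ScenarioProgress:
    scenario_id: str
    status: str = "pending"  # pending | running | complete | failed
    start_time_ms: int = 0
    elapsed_ms: int = 0
    items_processed: int = 0
    quality_scores: dict = field(default_factory=dict)
    milestones_reached: list[int] = field(default_factory=list)
    errors: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        return asdict(self)
Skill-Agnostic Template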
import asyncio
import time
from abc import ABC, abstractmethod
class SkillOrchestrator(ABC):
"""Abstract orchestrator for any user-invocable skill."""
def __init__(self, skill_name: str, skill_version: str):
self.skill_name = skill_name
self.skill_version = skill_version
@abstractmethod
async def invoke_skill(self, input_data: list[dict], scenario_params: dict) -> dict:
"""Invoke your skill on input data."""
pass
@abstractmethod
def get_scenario_configs(self) -> dict[str, dict]:
"""Return configs for simple/medium/complex."""
pass
@abstractmethod
def calculate_quality_metrics(self, results: list[dict], metric_names: list[str]) -> dict:
"""Calculate quality metrics from results."""
pass
async def orchestrate(self, orchestration_id: str) -> dict:
"""Run all 3 scenarios in parallel and aggregate."""
results = await asyncio.gather(
self.run_scenario("simple", orchestration_id),
self.run_scenario("medium", orchestration_id),
self.run_scenario("complex", orchestration_id),
return_exceptions=True
)
        return self.aggregate_results(results)
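run_scenario and aggregate_results are left to subclasses above. A minimal sketch of run_scenario wired to the abstract hooks; the config keys (input_data, params, metrics) and the timing plumbing are illustrative assumptions:
    async def run_scenario(self, scenario_id: str, orchestration_id: str) -> dict:
        """Run one difficulty level through the skill and score the output."""
        config = self.get_scenario_configs()[scenario_id]
        start = time.time()
        result = await self.invoke_skill(config["input_data"], config["params"])
        metrics = self.calculate_quality_metrics(
            result.get("results", []), config.get("metrics", [])
        )
        return {
            "scenario_id": scenario_id,
            "orchestration_id": orchestration_id,
            "elapsed_ms": int((time.time() - start) * 1000),
            "metrics": metrics,
        }
Difficulty Scaling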
| Level | Complexity | Input Size | Time Budget | Quality |
|---|---|---|---|---|
| Simple | 1x | Small (10-100) | 30s | Basic |
| Medium | 3x | Medium (30-300) | 90s | Good |
| Complex | 8x | Large (80-800) | 300s | Excellent |
Output Example
{
"orchestration_id": "demo-001",
"quality_comparison": {
"simple": 0.92, "medium": 0.88, "complex": 0.84
},
"scaling_analysis": {
"time_per_item_ms": {
"simple": 0.012, "medium": 0.012, "complex": 0.032
},
"recommendation": "Sublinear scaling up to 3x, superlinear at 8x"
}
}
Common Mistakes
- Sequential instead of parallel (defeats purpose)
- No synchronization (results appear disjointed)
- Unclear difficulty scaling (differ in scale, not approach)
- Missing aggregation (individual results lack comparative insights)
Incorrect — running scenarios sequentially:
async def orchestrate(skill_name: str):
simple = await run_scenario("simple", skill_name) # Wait
medium = await run_scenario("medium", skill_name) # Wait
complex = await run_scenario("complex", skill_name) # Wait
    return [simple, medium, complex]
Correct — parallel execution of all scenarios:
async def orchestrate(skill_name: str):
results = await asyncio.gather(
run_scenario("simple", skill_name),
run_scenario("medium", skill_name),
run_scenario("complex", skill_name)
)
    return aggregate_results(results)
Synchronize milestones, scale difficulty, and recover from failures across multi-scenario orchestration — MEDIUM
Scenario Routing & Synchronization
Milestone synchronization modes, difficulty scaling strategies, checkpointing, and failure recovery for multi-scenario orchestration.
Synchronization Modes
| Mode | Description | Use When |
|---|---|---|
| Free-running | All run independently | Demo videos, production |
| Milestone-sync | Wait at 30%, 70%, 100% | Comparative analysis |
| Lock-step | All proceed together | Training, tutorials |
Milestone Synchronization
async def synchronize_at_milestone(
milestone_pct: int,
state: ScenarioOrchestratorState,
timeout_seconds: int = 30
) -> bool:
"""Wait for all scenarios to reach milestone."""
start = time.time()
while time.time() - start < timeout_seconds:
simple_at = milestone_pct in state["progress_simple"].milestones_reached
medium_at = milestone_pct in state["progress_medium"].milestones_reached
complex_at = milestone_pct in state["progress_complex"].milestones_reached
if simple_at and medium_at and complex_at:
print(f"[SYNC] All scenarios reached {milestone_pct}%")
return True
if any(state[f"progress_{s}"].status == "failed"
for s in ["simple", "medium", "complex"]):
print(f"[SYNC] A scenario failed, proceeding without sync")
return False
await asyncio.sleep(0.5)
print(f"[SYNC] Timeout at {milestone_pct}%, proceeding")
    return False
Input Scaling Strategies
Linear Scaling (I/O-bound skills)
Simple: 100 items
Medium: 300 items (3x)
Complex: 800 items (8x)
Time: O(n) -- expected medium ~3x simple
Adaptive Scaling (per-skill tuning)
SKILL_SCALING_PROFILES = {
"performance-testing": {
"scaling": "linear",
"simple": 10, "medium": 30, "complex": 80
},
"security-scanning": {
"scaling": "sublinear",
"simple": 20, "medium": 100, "complex": 500
},
"data-transformation": {
"scaling": "quadratic",
"simple": 100, "medium": 200, "complex": 300
}
}
Complexity Detection
# Calculate actual time complexity
simple_tpi = simple_time / simple_size # time per item
medium_tpi = medium_time / medium_size
complex_tpi = complex_time / complex_size
ratio = complex_tpi / simple_tpi  # >2 = superlinear
Failure Recovery
One Scenario Fails (Independent)
try:
result = await invoke_skill(batch)
except Exception as e:
progress.errors.append({"message": str(e), "batch_index": i})
    # Don't raise -- let other scenarios continue
Timeout Handling
async def invoke_skill_with_timeout(skill, input_data, timeout_seconds):
try:
return await asyncio.wait_for(
invoke_skill(skill, input_data),
timeout=timeout_seconds
)
except asyncio.TimeoutError:
return {
"processed": len(input_data),
"results": [],
"error": "timeout",
"quality_score": 0.0,
        }
All Scenarios Fail (Systematic)
async def orchestrator_with_recovery(initial_state):
result = await app.ainvoke(initial_state)
all_failed = all(
state[f"progress_{s}"].status == "failed"
for s in ["simple", "medium", "complex"]
)
if all_failed:
# 1. Reduce resource contention
# 2. Retry with smaller batches
# 3. Or abort with diagnostic info
        return retry_with_reduced_load(initial_state)
Checkpointing
Scenario-Level Checkpoints
INSERT INTO scenario_checkpoints (
orchestration_id, scenario_id, milestone_pct, elapsed_ms, state_snapshot
) VALUES (
'demo-001', 'medium', 30, 3200, '{"items": 90, "results": [...]}'
);
Full-State Snapshots
async def checkpoint_full_state(state: ScenarioOrchestratorState):
checkpoint_data = {
"orchestration_id": state["orchestration_id"],
"timestamp": datetime.now().isoformat(),
"progress_simple": state["progress_simple"].to_dict(),
"progress_medium": state["progress_medium"].to_dict(),
"progress_complex": state["progress_complex"].to_dict(),
}
    await db.insert("full_state_checkpoints", checkpoint_data)
Quality Metrics Framework
Functional Metrics (per-skill)
{
"performance-testing": {
"latency_p95_ms": {"target": "<500ms", "weight": 0.5},
"error_rate": {"target": "<1%", "weight": 0.5},
},
"security-scanning": {
"vulnerabilities_found": {"target": ">0", "weight": 0.3},
"coverage_pct": {"target": "100%", "weight": 0.7},
}
}
Comparative Metrics
{
"quality_scaling": {
"formula": "complex_quality / simple_quality",
"expected": 1.0,
"acceptable": ">0.8"
},
"time_efficiency": {
"formula": "simple_tpi / complex_tpi",
"expected": 1.0,
"acceptable": ">0.5"
}
}
Multi-Host Execution
For greater parallelism, run scenarios on different machines sharing the same database:
# Host 1: Coordinator + Simple
python coordinator.py
# Host 2: Medium (different machine, same DB)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=medium
python run_scenario.py
# Host 3: Complex
export SCENARIO_ID=complex
python run_scenario.py
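A minimal sketch of the per-host entry point, assuming the run_scenario coroutine from the timeout example below is importable; the orchestration module name and SKILL_NAME variable are illustrative:
# run_scenario.py (sketch)
import asyncio
import os

from orchestration import run_scenario  # hypothetical module path

async def main():
    scenario_id = os.environ["SCENARIO_ID"]            # e.g. "medium"
    skill_name = os.environ.get("SKILL_NAME", "demo")  # assumed variable
    result = await run_scenario(scenario_id, skill_name)
    print(f"[{scenario_id}] {result}")

if __name__ == "__main__":
    asyncio.run(main())
Key Decisions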
| Decision | Recommendation |
|---|---|
| Synchronization mode | Free-running with checkpoints |
| Scenario count | Always 3: simple, medium, complex |
| Input scaling | 1x, 3x, 8x (exponential) |
| Time budgets | 30s, 90s, 300s |
| Checkpoint frequency | Every milestone + completion |
Incorrect — no timeout on skill invocation:
async def run_scenario(scenario_id: str, skill_name: str):
result = await invoke_skill(skill_name, get_input(scenario_id)) # Hangs forever
    return result
Correct — timeout prevents infinite hangs:
async def run_scenario(scenario_id: str, skill_name: str):
timeout = {"simple": 30, "medium": 90, "complex": 300}[scenario_id]
try:
result = await asyncio.wait_for(
invoke_skill(skill_name, get_input(scenario_id)),
timeout=timeout
)
return result
except asyncio.TimeoutError:
return {"error": "timeout", "scenario_id": scenario_id}References (11)
Architectural Patterns
Architectural Patterns for Multi-Scenario Orchestration
Deep patterns and design decisions for production multi-scenario demos.
Pattern 1: Three-Tier Synchronization
Tier 1: Free-Running (Baseline)
Each scenario runs independently, no blocking.
Time →
─────────────────────────────────────────────────┐
Simple ███████████████ Complete at 1.2s │
└─────────────────────────────────────┘ │
│
Medium ██████████████████████░░░░ In progress at 3.5s
└──────────────────────────────────┘ │
│
Complex ██████████░░░░░░░░░░░░░░░░ In progress at 25.7s
└─────────────────────────────────────┘ │
─────────────────────────────────────────────────┘
Advantages:
- Realistic—shows natural skill behavior
- Tolerates slowness in one scenario
- Lower synchronization overhead
Implementation:
# Each scenario runs its own event loop
# No waiting between scenarios
# Checkpoints are independent
Tier 2: Milestone Synchronization
Scenarios pause at checkpoints (30%, 50%, 70%, 90%) to allow others to catch up.
Time →
─────────────────────────────────────────────────┐
Simple ███ PAUSE ███ PAUSE ███ Complete │
└───┬────────┬────────┬────────────────┘│
│ │ │ │
Medium ██ PAUSE ██ PAUSE ██████░░░░ In-prog │
└────┬────────┬──────────────────────┘│
│ │ │
Complex █ PAUSE █ PAUSE ██░░░░░░░░░░░░░░░ │
└──┬────────┬──────────────────────────┘│
─────────────────────────────────────────────────┘
Advantages:
- Synchronized checkpoints for state capture
- Better for demos (shows progression together)
- Easier to explain ("all at 30%")
Implementation:
async def synchronize_at_milestone(milestone_pct, timeout_seconds=60):
    start = time.time()
    while time.time() - start < timeout_seconds:
        if all_scenarios_at_milestone(milestone_pct):  # queries the shared checkpoint store
            return True
        await asyncio.sleep(0.5)
    # Timeout: proceed anyway (don't block forever)
    return False
Tier 3: Lock-Step (Strict Synchronization)
All scenarios advance together, slowest determines pace.
Time →
─────────────────────────────────────────────────┐
Step 1: Simple ███ | Medium ███ | Complex █ │
└────────────────────────────────────────┘│
Step 2: Simple ███ | Medium ███ | Complex █ │
└────────────────────────────────────────┘│
Step 3: All complete together │
─────────────────────────────────────────────────┘
Advantages:
- Perfect synchronization for demos
- Easy to explain ("all scenarios complete together")
Disadvantages:
- Complex scenario blocks others (1-2 min delays)
- Unrealistic performance representation
Recommendation: Use Tier 1 (Free-Running) for production, Tier 2 (Milestone) for interactive demos.
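For the recommended free-running mode, the launcher can simply start all three scenario coroutines together and let each finish on its own schedule. A minimal sketch, assuming a run_scenario coroutine as the per-scenario entry point:
import asyncio
async def launch_free_running(scenarios: list[str]) -> dict:
    # Tier 1: start every scenario concurrently; no cross-scenario waiting.
    tasks = {s: asyncio.create_task(run_scenario(s)) for s in scenarios}
    # Each scenario completes independently; collect results as they land.
    return {s: await t for s, t in tasks.items()}
results = asyncio.run(launch_free_running(["simple", "medium", "complex"]))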
Pattern 2: Input Scaling Strategies
Strategy A: Linear Scaling (Additive)
Simple: 100 items
Medium: 100 + 200 = 300 items (+200%)
Complex: 300 + 500 = 800 items (+167%)
Time complexity: O(n)
Expected medium time ≈ 3x simple
Expected complex time ≈ 8x simple
Best for: I/O-bound skills (API calls, database queries)
Strategy B: Exponential Scaling (Multiplicative)
Simple: 100 items
Medium: 100 × 3 = 300 items (3x)
Complex: 100 × 8 = 800 items (8x)
Time complexity: O(n) or O(n log n)
Expected medium time ≈ 3x simple (if linear)
Expected complex time ≈ 8x simple (if linear)
Best for: Batch processing, LLM calls
Strategy C: Quadratic Scaling
Simple: 100 items
Medium: 300 items (3x)
Complex: 800 items (8x)
But if algorithm is O(n²):
Expected medium time ≈ 9x simple
Expected complex time ≈ 64x simple
Detection:
# Calculate actual time complexity
simple_time = 1.2 # seconds
medium_time = 3.5
complex_time = 25.7
simple_size = 100
medium_size = 300
complex_size = 800
# Time per item
simple_tpi = simple_time / simple_size # 0.012 s/item
medium_tpi = medium_time / medium_size # 0.012 s/item
complex_tpi = complex_time / complex_size # 0.032 s/item
# Ratio indicates scaling behavior
ratio = complex_tpi / simple_tpi  # 2.67 → O(n log n) or worse
Strategy D: Adaptive Scaling
Choose scaling based on skill characteristics:
SKILL_SCALING_PROFILES = {
"performance-testing": {
"scaling": "linear",
"simple": 10,
"medium": 30,
"complex": 80
},
"security-scanning": {
"scaling": "sublinear", # Gets faster with caching
"simple": 20,
"medium": 100,
"complex": 500
},
"data-transformation": {
"scaling": "quadratic", # O(n²) worst case
"simple": 100,
"medium": 200, # Limit increase
"complex": 300
}
}
Pattern 3: Quality Metrics Framework
Metric Category 1: Functional Metrics
What the skill is designed to measure:
{
"performance-testing": {
"latency_p95_ms": {"target": "<500ms", "weight": 0.5},
"error_rate": {"target": "<1%", "weight": 0.5},
},
"security-scanning": {
"vulnerabilities_found": {"target": ">0", "weight": 0.3},
"coverage_pct": {"target": "100%", "weight": 0.7},
}
}
Metric Category 2: Comparative Metrics
How scenarios compare:
{
"quality_scaling": {
"formula": "complex_quality / simple_quality",
"expected": 1.0, # Expect no degradation
"acceptable": ">0.8"
},
"time_efficiency": {
"formula": "simple_tpi / complex_tpi",
"expected": 1.0, # Linear scaling
"acceptable": ">0.5"
},
"resource_efficiency": {
"formula": "quality_per_second_complex / quality_per_second_simple",
"expected": 0.8, # Complex less efficient (higher overhead)
"acceptable": ">0.5"
}
}
Metric Category 3: Stability Metrics
Consistency across scenarios:
{
"quality_variance": {
"formula": "stdev(simple_quality, medium_quality, complex_quality)",
"expected": "<0.05",
"interpretation": "Low variance = stable algorithm"
},
"error_consistency": {
"formula": "all_scenarios_error_rate < threshold",
"expected": True,
"interpretation": "Same error rate across loads"
}
}
Pattern 4: Failure Modes & Recovery
Failure Mode 1: One Scenario Fails (Independent)
Simple ███████████████ Complete ✓
Medium ██████████ FAILED ✗
Complex ███ In progress...
Recovery:
• Medium stores checkpoint at failure point
• Can be restarted independently
• Simple/Complex continue
• Aggregator combines partial results
Implementation:
# Isolate failures
try:
result = await invoke_skill(batch)
except Exception as e:
progress.errors.append({"message": str(e), "batch_index": i})
# Don't raise—let other scenarios continue
# Report but don't block
if progress.errors:
print(f"⚠ {scenario_id} had {len(progress.errors)} errors")Failure Mode 2: All Scenarios Fail (Systematic)
Simple ███ FAILED ✗
Medium ███ FAILED ✗
Complex ███ FAILED ✗
Possible causes:
• Skill has a bug
• Resource limit exceeded
• Network/database unavailable
Recovery:
async def orchestrator_with_recovery(initial_state):
    """Attempt recovery if all scenarios fail."""
    result = await app.ainvoke(initial_state)
    all_failed = all(
        result[f"progress_{s}"].status == "failed"
        for s in ["simple", "medium", "complex"]
    )
    if all_failed:
        print("All scenarios failed—attempting recovery...")
        # 1. Reduce resource contention
        # 2. Retry with smaller batches
        # 3. Or abort with diagnostic info
        return await retry_with_reduced_load(initial_state)
    return result
Failure Mode 3: Timeout (Skill Takes Too Long)
Simple ███████████ Complete in 1.2s (budget: 30s) ✓
Medium ██████░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ TIMEOUT ✗ (budget: 90s)
Complex █░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░░ IN PROGRESS
Recovery:
• Medium: Cancel at 90s, return partial results
• Complex: Continue until 300s timeout
Implementation:
async def invoke_skill_with_timeout(skill, input_data, timeout_seconds):
try:
return await asyncio.wait_for(
invoke_skill(skill, input_data),
timeout=timeout_seconds
)
except asyncio.TimeoutError:
print(f"Timeout after {timeout_seconds}s, returning partial results")
return {
"processed": len(input_data),
"results": [],
"error": "timeout",
"quality_score": 0.0,
}
Pattern 5: Observability & Monitoring
Monitoring Strategy 1: Real-Time Progress
# Stream progress from PostgreSQL checkpoints
async def monitor_real_time():
while orchestration_running:
progress = await db.query("""
SELECT scenario_id, MAX(progress_pct), MAX(elapsed_ms)
FROM scenario_checkpoints
WHERE orchestration_id = $1
GROUP BY scenario_id
""", orchestration_id)
for scenario_id, progress_pct, elapsed_ms in progress:
bar = "█" * int(progress_pct / 5) + "░" * (20 - int(progress_pct / 5))
print(f"{scenario_id}: │{bar}│ {progress_pct:.0f}%")
await asyncio.sleep(2)
Monitoring Strategy 2: Comparative Timeline
0s 10s 20s 30s 40s
Simple: |████████| (complete)
Medium: | |██████████| (in progress)
Complex: | |████| (in progress)
|─────────────────────────────────────|
Milestones:
Simple: ✓ 30% @ 0.4s ✓ 50% @ 0.6s ✓ 70% @ 0.9s ✓ 100% @ 1.2s
Medium: ✓ 30% @ 3.2s ✓ 50% @ 5.1s ⏳ 70% @ 8.3s ⏳ In progress
Complex: ✓ 30% @ 9.1s ⏳ 50% in progress
Monitoring Strategy 3: Quality Trend
Quality Score (0-1)
1.0 ├─────────────────
│ Simple ████░░░░░░
0.8 ├───────────────────
│ Medium ██████░░
0.6 ├───────────────────
│ Complex ███░░░░
0.4 ├───────────────────
│
0.2 ├───────────────────
└─────────────────────
0% 30% 50% 70% 100%
Progress
Pattern 6: Result Aggregation Strategies
Aggregation Type 1: Comparative (Default)
Compare metrics across all 3 scenarios:
{
"quality_comparison": {
"simple": {"latency_p95": 120, "score": 0.92},
"medium": {"latency_p95": 145, "score": 0.88},
"complex": {"latency_p95": 185, "score": 0.84}
},
"scaling_analysis": {
"quality_degradation": "8% from simple to complex",
"time_growth": "linear (as expected)",
"recommendation": "Quality acceptable, can scale to complex"
}
}
Aggregation Type 2: Pattern Extraction
Find common patterns across scenarios:
{
"success_patterns": [
"Caching strategy effective at all scales",
"Batch size of 50+ preferred",
"Memory usage stays below 512MB"
],
"failure_patterns": [
"Timeout at >5000 items per batch",
"Quality drops with skewed data distribution"
]
}
Aggregation Type 3: Recommendation Engine
Suggest optimal difficulty for production:
{
"recommended_difficulty": "medium",
"reasoning": [
"Simple: Insufficient load to detect bottlenecks",
"Medium: Good balance of realism and speed",
"Complex: Takes too long for frequent testing (300s)"
],
"production_scaling": {
"estimated_items_per_request": 50,
"estimated_response_time_ms": 450,
"required_concurrency_support": 10
}
}
Pattern 7: Checkpointing Strategy
Checkpoint Type 1: Scenario-Level
Save progress at each scenario milestone:
INSERT INTO scenario_checkpoints (
orchestration_id, scenario_id, milestone_pct, elapsed_ms, state_snapshot
) VALUES (
'demo-001', 'medium', 30, 3200, '{"items": 90, "results": [...]}'
);
Use case: Resume interrupted scenario
Checkpoint Type 2: Milestone-Level
Save synchronized state across all scenarios:
INSERT INTO orchestration_milestones (
orchestration_id, milestone_pct, timestamp, simple_status, medium_status, complex_status
) VALUES (
'demo-001', 30, NOW(), 'complete', 'paused', 'in_progress'
);
Use case: Track synchronization progress
Checkpoint Type 3: Full-State
Periodic snapshots for recovery:
async def checkpoint_full_state(state: ScenarioOrchestratorState):
"""Save complete state to disk."""
checkpoint_data = {
"orchestration_id": state["orchestration_id"],
"timestamp": datetime.now().isoformat(),
"progress_simple": state["progress_simple"].to_dict(),
"progress_medium": state["progress_medium"].to_dict(),
"progress_complex": state["progress_complex"].to_dict(),
}
await db.insert("full_state_checkpoints", checkpoint_data)
Use case: Complete recovery from any point
Pattern 8: Cost & Performance Analysis
Cost Analysis
def estimate_orchestration_cost(
scenarios: dict[str, ScenarioDefinition]
) -> dict:
"""Estimate total execution cost."""
# LLM cost (if using Claude)
llm_cost_per_scenario = {
"simple": estimate_tokens(100) * 0.001, # ~$0.002
"medium": estimate_tokens(300) * 0.001, # ~$0.005
"complex": estimate_tokens(800) * 0.001, # ~$0.010
}
# Compute cost (if cloud)
compute_cost = {
"simple": 30 / 3600 * 0.10, # 30s @ $0.10/hour
"medium": 90 / 3600 * 0.10, # 90s @ $0.10/hour
"complex": 300 / 3600 * 0.10, # 300s @ $0.10/hour
}
# Database cost
db_cost = 0.001 # Negligible for checkpointing
return {
"llm_cost": sum(llm_cost_per_scenario.values()),
"compute_cost": sum(compute_cost.values()),
"db_cost": db_cost,
"total_cost": sum(llm_cost_per_scenario.values()) + sum(compute_cost.values()) + db_cost,
"cost_per_scenario": llm_cost_per_scenario,
}
Performance Analysis
def analyze_performance(
    results: dict
) -> dict:
    """Analyze orchestration performance."""
    total_seconds = sum(r["elapsed_ms"] for r in results.values()) / 1000
    critical_path_seconds = max(r["elapsed_ms"] for r in results.values()) / 1000
    avg_quality_score = sum(r["quality_score"] for r in results.values()) / len(results)
    return {
        "total_execution_time_minutes": total_seconds / 60,
        "critical_path_seconds": critical_path_seconds,
        "parallel_efficiency": total_seconds / critical_path_seconds,
        "cost_per_quality_point": estimate_orchestration_cost({})["total_cost"] / avg_quality_score,
    }
Key Architectural Decisions
| Decision | Choice | Rationale |
|---|---|---|
| Synchronization | Milestone-based (Tier 2) | Balance between realism and demo experience |
| Input Scaling | 1x, 3x, 8x (exponential) | Exponential because most skills have overhead |
| Quality Metrics | Multiple per-skill metrics | Single metric insufficient to assess quality |
| Failure Recovery | Isolation + checkpointing | Partial results preferable to total failure |
| Monitoring | Real-time DB queries + Langfuse | Distributed state requires DB |
| Checkpoint Frequency | Every milestone + completion | Balance between safety and overhead |
| Aggregation | Comparative + recommendations | Provide actionable insights |
| Skill Abstraction | Generic orchestrator base class | Template for ANY skill |
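The last row implies a reusable template: one orchestrator base class that any skill plugs into. A minimal sketch of that abstraction (class and method names here are illustrative, not the actual template API):
from abc import ABC, abstractmethod
import asyncio
class BaseScenarioOrchestrator(ABC):
    """Skill-agnostic orchestrator skeleton (illustrative)."""
    def __init__(self, skill_name: str):
        self.skill_name = skill_name
    @abstractmethod
    async def build_input(self, scenario_id: str) -> dict:
        """Produce scenario-scaled input (1x/3x/8x)."""
    @abstractmethod
    def score(self, result: dict) -> float:
        """Compute the skill's functional quality score."""
    async def run(self, scenario_id: str, timeout_seconds: float) -> dict:
        data = await self.build_input(scenario_id)
        result = await asyncio.wait_for(
            invoke_skill(self.skill_name, data), timeout_seconds
        )
        return {"scenario": scenario_id, "quality": self.score(result)}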
References
- langgraph-implementation.md - Python implementation details
- claude-code-instance-management.md - Multi-terminal setup
- state-machine-design.md - Detailed state transitions
- skill-agnostic-template.md - Template for new skills
Claude Code Instance Management
Claude Code Instance Management: Multi-Scenario Demos
Structure 3 parallel Claude Code terminal instances for simultaneous scenario execution with shared state synchronization.
Instance Architecture
┌─────────────────────────────────────────────────────────────────────┐
│ COORDINATOR PROCESS (Python) │
│ (Runs orchestrator graph) │
├─────────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Terminal 1 │ │ Terminal 2 │ │ Terminal 3 │ │
│ │ (Simple) │ │ (Medium) │ │ (Complex) │ │
│ │ │ │ │ │ │ │
│ │ Session: │ │ Session: │ │ Session: │ │
│ │ simple-123 │ │ medium-123 │ │ complex-123│ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │ │ │ │
│   Claude Code          Claude Code          Claude Code            │
│   instance             instance             instance               │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ PostgreSQL Checkpoint Table │ │
│ │ (Shared state synchronization across instances) │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────────┘
Setup Instructions
Step 1: Prepare the Project
Ensure your project has the orchestrator graph and shared utilities:
# At project root
mkdir -p backend/app/workflows/multi_scenario
cp src/skills/multi-scenario-orchestration/references/langgraph-implementation.py \
backend/app/workflows/multi_scenario/orchestrator.py
# Create coordinator script
cat > backend/app/workflows/multi_scenario/coordinator.py << 'EOF'
"""
Main coordinator that launches and monitors 3 Claude Code instances.
"""
import asyncio
import subprocess
import os
from pathlib import Path
PROJECT_ROOT = Path(__file__).parent.parent.parent.parent
SCENARIOS = ["simple", "medium", "complex"]
SKILL_NAME = "your-skill-name" # Change this
async def launch_scenario_instance(scenario_id: str, orchestration_id: str):
"""Launch one Claude Code instance for a scenario."""
env = os.environ.copy()
env["SCENARIO_ID"] = scenario_id
env["ORCHESTRATION_ID"] = orchestration_id
env["PROJECT_ROOT"] = str(PROJECT_ROOT)
# Launch Claude Code instance
process = subprocess.Popen(
[
"claude", "code",
str(PROJECT_ROOT),
"--session", f"scenario-{scenario_id}-{orchestration_id}",
"--skill", SKILL_NAME,
],
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
print(f"[COORDINATOR] Launched {scenario_id} instance (PID: {process.pid})")
return process
async def monitor_instances(processes: dict):
"""Monitor all instances for completion."""
while any(p.poll() is None for p in processes.values()):
for scenario_id, process in processes.items():
if process.poll() is not None:
print(f"[COORDINATOR] {scenario_id} instance completed")
await asyncio.sleep(1)
async def main():
orchestration_id = "demo-001"
print(f"[COORDINATOR] Starting orchestration {orchestration_id}")
print(f"[COORDINATOR] Launching 3 parallel instances...")
# Launch all instances
processes = {}
for scenario_id in SCENARIOS:
process = await launch_scenario_instance(scenario_id, orchestration_id)
processes[scenario_id] = process
print(f"[COORDINATOR] All instances launched. Monitoring...")
# Monitor
await monitor_instances(processes)
print(f"[COORDINATOR] All instances completed")
if __name__ == "__main__":
asyncio.run(main())
EOF
Step 2: Create Scenario Runner Script
Create /backend/app/workflows/multi_scenario/run_scenario.py:
"""
Runner for single scenario. Invoked by Claude Code instance.
Sets up environment from SCENARIO_ID and ORCHESTRATION_ID env vars.
"""
import os
import asyncio
import time
from orchestrator import (
build_scenario_orchestrator,
ScenarioOrchestratorState,
ScenarioDefinition,
ScenarioProgress,
)
from langgraph.checkpoint.postgres import PostgresSaver
async def run_scenario():
# Read from environment
scenario_id = os.getenv("SCENARIO_ID", "simple")
orchestration_id = os.getenv("ORCHESTRATION_ID", "demo-001")
project_root = os.getenv("PROJECT_ROOT", ".")
print(f"[{scenario_id.upper()}] Starting scenario execution")
print(f"[{scenario_id.upper()}] Orchestration ID: {orchestration_id}")
# Setup checkpointer
db_url = os.getenv("DATABASE_URL", "postgresql://localhost/orchestkit")
checkpointer = PostgresSaver.from_conn_string(db_url)
# Build orchestrator
app = build_scenario_orchestrator(checkpointer=checkpointer)
# Prepare scenario definitions
configs = {
"simple": {
"complexity_multiplier": 1.0,
"input_size": 100,
"time_budget_seconds": 30,
"skill_params": {"batch_size": 10, "cache_enabled": True}
},
"medium": {
"complexity_multiplier": 3.0,
"input_size": 300,
"time_budget_seconds": 90,
"skill_params": {"batch_size": 50, "cache_enabled": True}
},
"complex": {
"complexity_multiplier": 8.0,
"input_size": 800,
"time_budget_seconds": 300,
"skill_params": {"batch_size": 100, "cache_enabled": True, "parallel_workers": 4}
}
}
cfg = configs[scenario_id]
# Build initial state
initial_state: ScenarioOrchestratorState = {
"orchestration_id": orchestration_id,
"start_time_unix": int(time.time()),
"skill_name": "your-skill-name",
"skill_version": "1.0.0",
# Current scenario only
"scenario_simple": None,
"scenario_medium": None,
"scenario_complex": None,
"progress_simple": None,
"progress_medium": None,
"progress_complex": None,
}
# Set only the relevant scenario
initial_state[f"scenario_{scenario_id}"] = ScenarioDefinition(
name=scenario_id,
difficulty={"simple": "easy", "medium": "intermediate", "complex": "advanced"}[scenario_id],
complexity_multiplier=cfg["complexity_multiplier"],
input_size=cfg["input_size"],
dataset_characteristics={"distribution": "uniform"},
time_budget_seconds=cfg["time_budget_seconds"],
memory_limit_mb={"simple": 256, "medium": 512, "complex": 1024}[scenario_id],
error_tolerance={"simple": 0.0, "medium": 0.05, "complex": 0.1}[scenario_id],
skill_params=cfg["skill_params"],
expected_quality={"simple": "basic", "medium": "good", "complex": "excellent"}[scenario_id],
quality_metrics=["accuracy", "coverage"]
)
initial_state[f"progress_{scenario_id}"] = ScenarioProgress(scenario_id=scenario_id)
# Run orchestrator
config = {"configurable": {"thread_id": f"orch-{orchestration_id}"}}
print(f"[{scenario_id.upper()}] Invoking orchestrator...")
try:
# Stream progress
async for update in app.astream(initial_state, config=config, stream_mode="updates"):
if f"progress_{scenario_id}" in update:
progress = update[f"progress_{scenario_id}"]
print(f"[{scenario_id.upper()}] Progress: {progress.progress_pct:.1f}% "
f"({progress.items_processed} items, {progress.elapsed_ms}ms)")
print(f"[{scenario_id.upper()}] Scenario complete")
except Exception as e:
print(f"[{scenario_id.upper()}] Error: {e}")
raise
if __name__ == "__main__":
    asyncio.run(run_scenario())
Execution: Three-Terminal Mode
Terminal 1: Coordinator
cd /path/to/project
python backend/app/workflows/multi_scenario/coordinator.py
Output:
[COORDINATOR] Starting orchestration demo-001
[COORDINATOR] Launching 3 parallel instances...
[COORDINATOR] Launched simple instance (PID: 1234)
[COORDINATOR] Launched medium instance (PID: 1235)
[COORDINATOR] Launched complex instance (PID: 1236)
[COORDINATOR] All instances launched. Monitoring...
Terminal 2: Simple Scenario
cd /path/to/project
export SCENARIO_ID=simple
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
Output:
[SIMPLE] Starting scenario execution
[SIMPLE] Orchestration ID: demo-001
[SIMPLE] Invoking orchestrator...
[SIMPLE] Progress: 10.0% (10 items, 100ms)
[SIMPLE] Progress: 20.0% (20 items, 200ms)
...
[SIMPLE] Progress: 100.0% (100 items, 1050ms)
[SIMPLE] Scenario complete
Terminal 3: Medium Scenario
cd /path/to/project
export SCENARIO_ID=medium
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
Output:
[MEDIUM] Starting scenario execution
[MEDIUM] Orchestration ID: demo-001
[MEDIUM] Invoking orchestrator...
[MEDIUM] Progress: 3.3% (10 items, 100ms)
[MEDIUM] Progress: 6.7% (20 items, 200ms)
...
[MEDIUM] Progress: 100.0% (300 items, 3100ms)
[MEDIUM] Scenario complete
Terminal 4 (Optional): Complex Scenario
If you have 4 terminals, run complex in parallel:
export SCENARIO_ID=complex
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
Shared State Synchronization
PostgreSQL Checkpoint Schema
-- Create checkpoint table (run once)
CREATE TABLE IF NOT EXISTS scenario_orchestration_checkpoints (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
orchestration_id VARCHAR(255) NOT NULL,
scenario_id VARCHAR(50) NOT NULL,
milestone_name VARCHAR(100),
progress_pct FLOAT,
timestamp_unix BIGINT NOT NULL,
state_snapshot JSONB,
metrics JSONB,
created_at TIMESTAMP DEFAULT NOW()
);
-- PostgreSQL has no inline INDEX clause; create indexes separately
CREATE INDEX IF NOT EXISTS idx_orchestration_id ON scenario_orchestration_checkpoints (orchestration_id);
CREATE INDEX IF NOT EXISTS idx_scenario_id ON scenario_orchestration_checkpoints (scenario_id);
CREATE INDEX IF NOT EXISTS idx_timestamp ON scenario_orchestration_checkpoints (timestamp_unix);
-- View progress across all scenarios
SELECT
orchestration_id,
scenario_id,
progress_pct,
milestone_name,
timestamp_unix,
(timestamp_unix / 1000.0) as seconds_elapsed
FROM scenario_orchestration_checkpoints
WHERE orchestration_id = 'demo-001'
ORDER BY scenario_id, progress_pct;
-- Example output:
-- orchestration_id | scenario_id | progress_pct | milestone_name | seconds_elapsed
-- demo-001 | simple | 30 | checkpoint_1 | 1.2
-- demo-001 | simple | 70 | checkpoint_2 | 2.8
-- demo-001 | simple | 100 | completion | 3.1
-- demo-001 | medium | 30 | checkpoint_1 | 3.5
-- demo-001 | medium | 70 | checkpoint_2 | 8.2
-- demo-001 | medium | 100 | completion | 9.3
-- demo-001 | complex | 30 | checkpoint_1 | 9.1
-- demo-001 | complex | 70 | checkpoint_2 | 22.5
-- demo-001 | complex | 100 | completion | 25.7
Monitor Progress from Coordinator
"""Monitor script to watch progress across all instances."""
import asyncio
import time
from datetime import datetime
import psycopg2
async def monitor_orchestration(orchestration_id: str, interval: int = 2):
"""Watch progress of all scenarios."""
conn = psycopg2.connect("dbname=orchestkit user=postgres")
cursor = conn.cursor()
print(f"Monitoring orchestration {orchestration_id}...\n")
while True:
cursor.execute("""
SELECT
scenario_id,
MAX(progress_pct) as progress,
MAX(timestamp_unix) as last_update
FROM scenario_orchestration_checkpoints
WHERE orchestration_id = %s
GROUP BY scenario_id
ORDER BY scenario_id
""", (orchestration_id,))
rows = cursor.fetchall()
if not rows:
print("No progress yet...")
await asyncio.sleep(interval)
continue
# Clear screen and print progress
print(f"\r{datetime.now().strftime('%H:%M:%S')}")
print("-" * 50)
all_complete = True
for scenario_id, progress, timestamp in rows:
bar_length = int(progress / 5) # 20-char bar
bar = "█" * bar_length + "░" * (20 - bar_length)
print(f"{scenario_id:10} │{bar}│ {progress:3.0f}%")
if progress < 100:
all_complete = False
if all_complete:
print("\n✓ All scenarios complete!")
break
await asyncio.sleep(interval)
conn.close()
if __name__ == "__main__":
asyncio.run(monitor_orchestration("demo-001"))
Synchronization at Milestones
To enable forced synchronization at milestones (all scenarios pause and wait):
# In run_scenario.py
async def wait_for_milestone_sync(
orchestration_id: str,
scenario_id: str,
milestone_pct: int,
timeout_seconds: int = 30
):
"""Wait for all scenarios to reach milestone."""
checkpointer = PostgresSaver.from_conn_string(DATABASE_URL)
start = time.time()
while time.time() - start < timeout_seconds:
# Query checkpoint status
async with checkpointer.get_connection() as conn:
result = await conn.fetch("""
SELECT DISTINCT scenario_id, MAX(progress_pct)
FROM scenario_orchestration_checkpoints
WHERE orchestration_id = $1
GROUP BY scenario_id
""", orchestration_id)
scenarios_at_milestone = {
row["scenario_id"]: row["max"] >= milestone_pct
for row in result
}
if all(scenarios_at_milestone.values()):
print(f"[{scenario_id.upper()}] All scenarios reached {milestone_pct}%")
return True
await asyncio.sleep(0.5)
print(f"[{scenario_id.upper()}] Sync timeout at {milestone_pct}%")
return False
Advanced: Multi-Host Execution
For even greater parallelism, run scenarios on different machines:
# Host 1: Coordinator + Simple
python backend/app/workflows/multi_scenario/coordinator.py
# Host 2: Medium (different machine, same DB)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=medium
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
# Host 3: Complex (different machine, same DB)
export DATABASE_URL="postgresql://user:pass@coordinator-host/orchestkit"
export SCENARIO_ID=complex
export ORCHESTRATION_ID=demo-001
python backend/app/workflows/multi_scenario/run_scenario.py
PostgreSQL checkpoints serve as the distributed state store.
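Each host reports progress by inserting milestone rows into the shared table. A minimal write helper might look like this (a sketch; column names follow the schema above, and the connection string is an assumption):
import time
import psycopg2
def write_checkpoint(orchestration_id: str, scenario_id: str, milestone: str, pct: float):
    conn = psycopg2.connect("dbname=orchestkit user=postgres")  # assumed DSN
    with conn, conn.cursor() as cur:
        # Timestamp stored in milliseconds, matching the monitoring queries above.
        cur.execute(
            """INSERT INTO scenario_orchestration_checkpoints
               (orchestration_id, scenario_id, milestone_name, progress_pct, timestamp_unix)
               VALUES (%s, %s, %s, %s, %s)""",
            (orchestration_id, scenario_id, milestone, pct, int(time.time() * 1000)),
        )
    conn.close()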
Best Practices
- Unique Orchestration IDs: Use timestamp or UUID for each demo run (see the sketch after this list)
- Session Isolation: Each instance gets its own Claude Code session
- Checkpointing: Always enable PostgreSQL persistence
- Monitoring: Watch progress via checkpoint table queries
- Timeout Handling: Allow asynchronous completion, don't force lock-step
- Error Recovery: Failed instances can be restarted without resetting state
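A simple generator for the unique orchestration IDs recommended above (a sketch):
from datetime import datetime
from uuid import uuid4
def new_orchestration_id(prefix: str = "demo") -> str:
    # Timestamp keeps IDs sortable; the UUID suffix prevents collisions.
    return f"{prefix}-{datetime.now():%Y%m%d-%H%M%S}-{uuid4().hex[:6]}"
# e.g. "demo-20260115-143022-a3f9c1"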
Troubleshooting
Instances get stuck at milestone:
→ Increase timeout_seconds in wait_for_milestone_sync()
Database connection errors:
→ Check DATABASE_URL environment variable, ensure PostgreSQL is running
One instance much slower than others:
→ This is expected! Use Tier 1 (free-running), not lock-step. The slower instance will eventually complete.
Memory usage grows over time:
→ Enable checkpointing to disk, reduce batch sizes for the complex scenario
Coordination Patterns
Agent Coordination Patterns
Patterns for coordinating multiple specialized agents in complex workflows.
Supervisor-Worker Pattern
from typing import Protocol, Any
import asyncio
class Agent(Protocol):
async def run(self, task: str, context: dict) -> dict: ...
class SupervisorCoordinator:
"""Central supervisor that routes tasks to worker agents."""
def __init__(self, workers: dict[str, Agent]):
self.workers = workers
self.execution_log: list[dict] = []
async def route_and_execute(
self,
task: str,
required_agents: list[str],
parallel: bool = True
) -> dict[str, Any]:
"""Route task to specified agents."""
context = {"task": task, "results": {}}
if parallel:
tasks = [
self._run_worker(name, task, context)
for name in required_agents
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return dict(zip(required_agents, results))
else:
for name in required_agents:
context["results"][name] = await self._run_worker(
name, task, context
)
return context["results"]
async def _run_worker(
self, name: str, task: str, context: dict
) -> dict:
"""Execute single worker with timeout."""
try:
result = await asyncio.wait_for(
self.workers[name].run(task, context),
timeout=30.0
)
self.execution_log.append({
"agent": name, "status": "success", "result": result
})
return result
except asyncio.TimeoutError:
return {"error": f"{name} timed out"}Conflict Resolution
async def resolve_agent_conflicts(
findings: list[dict],
llm: Any
) -> dict:
"""Resolve conflicts between agent outputs."""
conflicts = []
for i, f1 in enumerate(findings):
for f2 in findings[i+1:]:
if f1.get("recommendation") != f2.get("recommendation"):
conflicts.append((f1, f2))
if not conflicts:
return {"status": "no_conflicts", "findings": findings}
# LLM arbitration
resolution = await llm.ainvoke(f"""
Agents disagree. Determine best recommendation:
Agent 1: {conflicts[0][0]}
Agent 2: {conflicts[0][1]}
Provide: winner, reasoning, confidence (0-1)
""")
return {"status": "resolved", "resolution": resolution}Configuration
- Worker timeout: 30s default
- Max parallel agents: 8
- Retry failed agents: 1 attempt (see the sketch after this list)
- Log all executions for debugging
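The single-retry policy can be layered over the supervisor's _run_worker method shown above (a sketch; detecting failure via an "error" key is an assumption):
async def run_with_retry(
    coordinator: SupervisorCoordinator, name: str, task: str, context: dict
) -> dict:
    # One retry, matching the "Retry failed agents: 1 attempt" setting.
    result = await coordinator._run_worker(name, task, context)
    if "error" in result:
        result = await coordinator._run_worker(name, task, context)
    return result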
Cost Optimization
- Batch similar tasks to reduce overhead
- Cache agent results by task hash (see the sketch after this list)
- Use cheaper models for simple agents
- Parallelize independent agents always
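Caching by task hash might look like this (a sketch; keying on agent name plus task text is an assumption):
import hashlib
_result_cache: dict[str, dict] = {}
async def cached_run(agent: Agent, agent_name: str, task: str, context: dict) -> dict:
    # Identical (agent, task) pairs skip a model call entirely.
    key = hashlib.sha256(f"{agent_name}:{task}".encode()).hexdigest()
    if key not in _result_cache:
        _result_cache[key] = await agent.run(task, context)
    return _result_cache[key]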
CrewAI Patterns
CrewAI Patterns (v1.8+)
CrewAI patterns for role-based multi-agent collaboration with Flows architecture, hierarchical crews, MCP tools, and async execution.
Version: This document covers CrewAI 1.8.x - 1.9.x (2026). For earlier versions, patterns may differ.
Table of Contents
- Flows Architecture (1.8+)
- MCP Tool Support (1.8+)
- Hierarchical Process
- Agent Configuration (1.8+)
- Task Configuration (1.8+)
- Async Execution
- Streaming Output
- Knowledge Sources (1.8+)
- Memory Configuration
- Custom Tools
- Decorator-Based Crew Definition
- Human-in-the-Loop (Flows)
- Configuration Summary
- Best Practices
- Migration from 0.x
Flows Architecture (1.8+)
Flows provide event-driven orchestration with state management. This is the major 1.x feature for complex multi-step workflows.
Basic Flow
from crewai.flow.flow import Flow, listen, start
class ResearchFlow(Flow):
@start()
def generate_topic(self):
"""Entry point - marked with @start()"""
return "AI Safety"
@listen(generate_topic)
def research_topic(self, topic):
"""Triggered when generate_topic completes"""
return f"Research findings on {topic}"
@listen(research_topic)
def summarize(self, findings):
"""Chain multiple listeners"""
return f"Summary: {findings[:100]}..."
# Execute flow
flow = ResearchFlow()
result = flow.kickoff()
Structured State (Pydantic)
from pydantic import BaseModel
from crewai.flow.flow import Flow, listen, start
class WorkflowState(BaseModel):
topic: str = ""
research: str = ""
summary: str = ""
iteration: int = 0
class StatefulFlow(Flow[WorkflowState]):
@start()
def initialize(self):
self.state.topic = "Machine Learning"
self.state.iteration = 1
@listen(initialize)
def process(self):
self.state.research = f"Research on {self.state.topic}"
self.state.iteration += 1
return self.state.research
Router for Conditional Branching
from crewai.flow.flow import Flow, listen, start, router
class ConditionalFlow(Flow):
@start()
def evaluate(self):
# Returns condition result
return {"score": 85, "passed": True}
@router(evaluate)
def route_result(self, result):
"""Route based on evaluation"""
if result["passed"]:
return "success"
return "retry"
@listen("success")
def handle_success(self):
return "Workflow completed successfully"
@listen("retry")
def handle_retry(self):
return "Retrying workflow..."Parallel Execution with and_/or_
from crewai.flow.flow import Flow, listen, start, and_, or_
class ParallelFlow(Flow):
@start()
def task_a(self):
return "Result A"
@start()
def task_b(self):
return "Result B"
@listen(and_(task_a, task_b))
def combine_results(self):
"""Triggers when BOTH complete"""
return "Combined results"
@listen(or_(task_a, task_b))
def first_result(self):
"""Triggers when EITHER completes"""
return "First result received"Integrating Crews with Flows
from crewai.flow.flow import Flow, listen, start
from crewai import Crew, Agent, Task
class CrewFlow(Flow):
@start()
def prepare_inputs(self):
return {"topic": "AI Agents", "depth": "detailed"}
@listen(prepare_inputs)
def run_research_crew(self, inputs):
researcher = Agent(
role="Researcher",
goal="Research the given topic thoroughly",
backstory="Expert researcher with domain knowledge"
)
task = Task(
description=f"Research {inputs['topic']} at {inputs['depth']} level",
expected_output="Comprehensive research report",
agent=researcher
)
crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
return result.raw
MCP Tool Support (1.8+)
CrewAI supports Model Context Protocol (MCP) for external tool integration.
Simple DSL (Recommended)
from crewai import Agent
# URL-based MCP server
agent = Agent(
role="Research Analyst",
goal="Research and analyze information",
backstory="Expert analyst",
mcps=[
"https://mcp.example.com/mcp?api_key=your_key",
"crewai-amp:financial-data", # CrewAI marketplace
"crewai-amp:research-tools#pubmed_search" # Specific tool
]
)
Transport-Specific Configuration
from crewai import Agent
from crewai.mcp import MCPServerStdio, MCPServerHTTP, MCPServerSSE
from crewai.mcp.tool_filter import create_static_tool_filter
# Local server via stdio
agent = Agent(
role="File Analyst",
goal="Analyze local files",
backstory="File processing expert",
mcps=[
MCPServerStdio(
command="npx",
args=["-y", "@modelcontextprotocol/server-filesystem"],
tool_filter=create_static_tool_filter(
allowed_tool_names=["read_file", "list_directory"]
)
)
]
)
# Remote HTTP server
agent = Agent(
role="API Analyst",
goal="Query external APIs",
backstory="Integration specialist",
mcps=[
MCPServerHTTP(
url="https://api.example.com/mcp",
headers={"Authorization": "Bearer token"},
connect_timeout=60
)
]
)
# Server-Sent Events (streaming)
agent = Agent(
role="Real-time Analyst",
goal="Monitor streaming data",
backstory="Real-time data specialist",
mcps=[
MCPServerSSE(
url="https://stream.example.com/mcp",
headers={"Authorization": "Bearer token"}
)
]
)
MCPServerAdapter (Advanced)
from crewai import Agent, Crew, Task
from crewai_tools import MCPServerAdapter
# Context manager for manual connection management
with MCPServerAdapter(server_params, connect_timeout=60) as mcp_tools:
agent = Agent(
role="MCP Tool User",
goal="Use MCP tools effectively",
backstory="Tool specialist",
tools=mcp_tools,
verbose=True
)
# Or filter specific tools
filtered_tools = mcp_tools["specific_tool_name"]
Hierarchical Process
from crewai import Agent, Crew, Task, Process
manager = Agent(
role="Project Manager",
goal="Coordinate team and ensure deliverables",
backstory="Senior PM with 10 years experience",
allow_delegation=True,
verbose=True
)
researcher = Agent(
role="Researcher",
goal="Find accurate information",
backstory="Expert researcher",
allow_delegation=False
)
writer = Agent(
role="Content Writer",
goal="Create compelling content",
backstory="Professional writer"
)
crew = Crew(
agents=[manager, researcher, writer],
tasks=[research_task, write_task, review_task],
process=Process.hierarchical,
manager_llm="gpt-4o", # Required for hierarchical
memory=True,
verbose=True
)
Agent Configuration (1.8+)
from crewai import Agent
agent = Agent(
# Core identity
role="Senior Data Scientist",
goal="Analyze data and provide insights",
backstory="Expert with 10 years experience",
# LLM configuration
llm="gpt-4o",
function_calling_llm="gpt-4o-mini", # Cheaper model for tools
use_system_prompt=True,
# Execution control
max_iter=20,
max_rpm=100,
max_execution_time=300, # seconds
max_retry_limit=2,
# Advanced features (1.8+)
reasoning=True, # Enable reflection before tasks
max_reasoning_attempts=3,
multimodal=True, # Text and visual processing
inject_date=True,
date_format="%Y-%m-%d",
# Memory and context
memory=True,
respect_context_window=True, # Auto-summarize on limit
# Tools
tools=[tool1, tool2],
cache=True,
# Delegation
allow_delegation=True,
verbose=True
)
Task Configuration (1.8+)
Structured Output
from pydantic import BaseModel
from crewai import Task
class ReportOutput(BaseModel):
title: str
summary: str
findings: list[str]
confidence: float
task = Task(
description="Analyze market trends and create report",
expected_output="Structured market analysis report",
agent=analyst,
output_pydantic=ReportOutput # Structured output
)
# Access structured result
result = crew.kickoff()
report = result.pydantic
print(report.title, report.confidence)
Async Task Execution
from crewai import Task
# Parallel research tasks
research_task1 = Task(
description="Research topic A",
expected_output="Research findings",
agent=researcher,
async_execution=True # Non-blocking
)
research_task2 = Task(
description="Research topic B",
expected_output="Research findings",
agent=researcher,
async_execution=True
)
# Dependent task waits for async tasks
synthesis_task = Task(
description="Synthesize all research",
expected_output="Integrated analysis",
agent=analyst,
context=[research_task1, research_task2] # Waits for completion
)
Task Guardrails (Validation)
from crewai import Task
from crewai.tasks import TaskOutput
def validate_length(result: TaskOutput) -> tuple[bool, str]:
"""Validate output meets requirements"""
if len(result.raw.split()) < 100:
return (False, "Content too brief, expand analysis")
return (True, result.raw)
task = Task(
description="Write comprehensive analysis",
expected_output="Detailed analysis (100+ words)",
agent=writer,
guardrail=validate_length,
guardrail_max_retries=3
)
# Multiple guardrails
task = Task(
description="Generate report",
expected_output="Validated report",
agent=analyst,
guardrails=[
validate_length,
validate_sources,
"Content must be objective and data-driven" # LLM-based
]
)
Human Input Tasks
task = Task(
description="Review and approve recommendations",
expected_output="Approved recommendations",
agent=reviewer,
human_input=True # Pauses for human verification
)
Task Callbacks
from crewai.tasks import TaskOutput
def task_callback(output: TaskOutput):
print(f"Task completed: {output.description}")
print(f"Result: {output.raw[:100]}...")
# Send notifications, log metrics, etc.
task = Task(
description="Analyze data",
expected_output="Analysis results",
agent=analyst,
callback=task_callback
)
Async Execution
Async Crew Kickoff
import asyncio
from crewai import Crew
async def run_crews_parallel():
crew1 = Crew(agents=[agent1], tasks=[task1])
crew2 = Crew(agents=[agent2], tasks=[task2])
# Run multiple crews in parallel
results = await asyncio.gather(
crew1.kickoff_async(),
crew2.kickoff_async()
)
return results
# Execute
results = asyncio.run(run_crews_parallel())
Async Flow Kickoff
from crewai.flow.flow import Flow, start, listen
class AsyncFlow(Flow):
@start()
async def fetch_data(self):
# Async operations supported
data = await external_api.fetch()
return data
@listen(fetch_data)
async def process_data(self, data):
result = await process_async(data)
return result
# Async execution
async def main():
flow = AsyncFlow()
result = await flow.kickoff_async()
return result
asyncio.run(main())
Streaming Output
from crewai import Crew
# Enable streaming on crew
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
stream=True # Enable real-time output
)
# Stream results
result = crew.kickoff()
# Flow streaming
flow = ExampleFlow()
flow.stream = True
streaming = flow.kickoff()
for chunk in streaming:
print(chunk.content, end="", flush=True)
final_result = streaming.result
Knowledge Sources (1.8+)
from crewai import Agent, Crew
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.knowledge.source.pdf_knowledge_source import PDFKnowledgeSource
from crewai.knowledge.source.crew_docling_source import CrewDoclingSource
from crewai.knowledge.knowledge_config import KnowledgeConfig
# String knowledge
company_info = StringKnowledgeSource(
content="Company policies and guidelines..."
)
# PDF knowledge
docs = PDFKnowledgeSource(file_paths=["manual.pdf", "guide.pdf"])
# Web knowledge
web_source = CrewDoclingSource(
file_paths=["https://example.com/docs"]
)
# Configure retrieval
config = KnowledgeConfig(
results_limit=10, # Documents returned (default: 3)
score_threshold=0.5 # Relevance minimum (default: 0.35)
)
# Agent-level knowledge
agent = Agent(
role="Support Agent",
goal="Answer questions using company knowledge",
backstory="Expert support representative",
knowledge_sources=[company_info]
)
# Crew-level knowledge (all agents)
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
knowledge_sources=[docs, web_source]
)
Memory Configuration
from crewai import Crew
from crewai.memory import ShortTermMemory, LongTermMemory, EntityMemory
# Simple memory
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
memory=True # Enable all memory types
)
# Custom memory configuration
crew = Crew(
agents=[agent1, agent2],
tasks=[task1, task2],
short_term_memory=ShortTermMemory(),
long_term_memory=LongTermMemory(
storage=ChromaStorage(collection_name="crew_memory")
),
entity_memory=EntityMemory()
)
Custom Tools
import json
import aiohttp
from crewai.tools import tool
@tool("Search Database")
def search_database(query: str) -> str:
"""Search the internal database for relevant information.
Args:
query: The search query string
"""
results = db.search(query)
return json.dumps(results)
# Async tool
@tool("Fetch API Data")
async def fetch_api_data(endpoint: str) -> str:
"""Fetch data from external API asynchronously.
Args:
endpoint: API endpoint to query
"""
async with aiohttp.ClientSession() as session:
async with session.get(endpoint) as response:
return await response.text()
# Assign tools to agent
researcher = Agent(
role="Researcher",
goal="Find accurate information",
backstory="Expert researcher",
tools=[search_database, fetch_api_data],
verbose=True
)
Decorator-Based Crew Definition (Recommended)
from crewai import Agent, Crew, Process, Task, CrewBase, agent, task, crew
@CrewBase
class ResearchCrew:
agents_config = 'config/agents.yaml'
tasks_config = 'config/tasks.yaml'
@agent
def researcher(self) -> Agent:
return Agent(
config=self.agents_config['researcher'],
tools=[search_tool]
)
@agent
def analyst(self) -> Agent:
return Agent(config=self.agents_config['analyst'])
@task
def research_task(self) -> Task:
return Task(config=self.tasks_config['research'])
@task
def analysis_task(self) -> Task:
return Task(
config=self.tasks_config['analysis'],
context=[self.research_task()]
)
@crew
def crew(self) -> Crew:
return Crew(
agents=self.agents, # Auto-collected
tasks=self.tasks, # Auto-collected
process=Process.sequential
)
# Execute
result = ResearchCrew().crew().kickoff(inputs={"topic": "AI Safety"})
Human-in-the-Loop (Flows)
from crewai.flow.flow import Flow, listen, start
from crewai.flow.human_feedback import human_feedback, HumanFeedbackResult
class ReviewFlow(Flow):
@start()
@human_feedback(
message="Do you approve this content?",
emit=["approved", "rejected"],
llm="gpt-4o-mini"
)
def generate_content(self):
return "Content for human review..."
@listen("approved")
def handle_approval(self, result: HumanFeedbackResult):
print(f"Approved with feedback: {result.feedback}")
return "Processing approved content"
@listen("rejected")
def handle_rejection(self, result: HumanFeedbackResult):
print(f"Rejected: {result.feedback}")
return "Revising content"Configuration Summary
| Feature | Parameter | Default |
|---|---|---|
| Process types | process | sequential, hierarchical |
| Manager LLM | manager_llm | Required for hierarchical |
| Memory | memory | False |
| Streaming | stream | False |
| Verbose | verbose | False |
| Max RPM | max_rpm | Unlimited |
| Planning | planning | False |
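Several of these parameters combined on one crew (a sketch; the agents and tasks are assumed defined as in the earlier examples):
from crewai import Crew, Process
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    process=Process.hierarchical,
    manager_llm="gpt-4o",  # required for hierarchical
    memory=True,
    stream=True,
    max_rpm=60,
    verbose=True,
)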
Best Practices
- Use Flows for complex workflows: Multi-step processes benefit from Flows architecture
- Prefer decorator-based definition: Use @CrewBase for maintainable crew definitions
- Leverage MCP for external tools: Use the simple DSL for quick MCP integration
- Enable structured outputs: Use output_pydantic for type-safe results
- Add guardrails: Validate outputs with function or LLM-based guardrails
- Use async for parallel work: async_execution=True for independent tasks
- Configure knowledge sources: Add crew/agent-level knowledge for context
- Role clarity: Each agent has distinct, non-overlapping role
- Task granularity: One clear deliverable per task
- Memory scope: Use short-term for session, long-term for persistent knowledge
Migration from 0.x
| 0.x Pattern | 1.8+ Pattern |
|---|---|
| Manual agent/task lists | @CrewBase with @agent, @task decorators |
| Synchronous only | Async support with kickoff_async() |
| No streaming | stream=True parameter |
| Basic tools | MCP integration with mcps parameter |
| No validation | Task guardrails |
| No flow control | Flows with @start, @listen, @router |
Framework Comparison
Framework Comparison
Decision matrix for choosing between multi-agent frameworks.
Feature Comparison
| Feature | LangGraph | CrewAI | OpenAI SDK | MS Agent |
|---|---|---|---|---|
| State Management | Excellent | Good | Basic | Good |
| Persistence | Built-in | Plugin | Manual | Built-in |
| Streaming | Native | Limited | Native | Native |
| Human-in-Loop | Native | Manual | Manual | Native |
| Memory | Via Store | Built-in | Manual | Manual |
| Observability | Langfuse/LangSmith | Limited | Tracing | Azure Monitor |
| Learning Curve | Steep | Easy | Medium | Medium |
| Production Ready | Yes | Yes | Yes | Q1 2026 |
Use Case Matrix
| Use Case | Best Framework | Why |
|---|---|---|
| Complex state machines | LangGraph | Native StateGraph, persistence |
| Role-based teams | CrewAI | Built-in delegation, backstories |
| OpenAI-only projects | OpenAI SDK | Native integration, handoffs |
| Enterprise/compliance | MS Agent | Azure integration, A2A |
| Research/experiments | AG2 | Open-source, flexible |
| Quick prototypes | CrewAI | Minimal boilerplate |
| Long-running workflows | LangGraph | Checkpointing, recovery |
| Customer support bots | OpenAI SDK | Handoffs, guardrails |
Decision Tree
Start
|
+-- Need complex state machines?
| |
| +-- Yes --> LangGraph
| |
| +-- No
| |
+-- Role-based collaboration?
| |
| +-- Yes --> CrewAI
| |
| +-- No
| |
+-- OpenAI ecosystem only?
| |
| +-- Yes --> OpenAI Agents SDK
| |
| +-- No
| |
+-- Enterprise requirements?
| |
| +-- Yes --> Microsoft Agent Framework
| |
| +-- No
| |
+-- Open-source priority?
|
+-- Yes --> AG2
|
+-- No --> LangGraph (default)
Migration Paths
From AutoGen to MS Agent Framework
# AutoGen 0.2 (old)
from autogen import AssistantAgent, UserProxyAgent
agent = AssistantAgent(name="assistant", llm_config=config)
# MS Agent Framework (new)
from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
model_client = OpenAIChatCompletionClient(model="gpt-5.2")
agent = AssistantAgent(name="assistant", model_client=model_client)From Custom to LangGraph
# Custom orchestration (old)
async def workflow(task):
step1 = await agent1.run(task)
step2 = await agent2.run(step1)
return step2
# LangGraph (new)
from langgraph.graph import StateGraph
workflow = StateGraph(State)
workflow.add_node("agent1", agent1_node)
workflow.add_node("agent2", agent2_node)
workflow.add_edge("agent1", "agent2")
workflow.set_entry_point("agent1")
app = workflow.compile()
Cost Considerations
| Framework | Licensing | Infra Cost | LLM Cost |
|---|---|---|---|
| LangGraph | MIT | Self-host / LangGraph Cloud | Any LLM |
| CrewAI | MIT | Self-host | Any LLM |
| OpenAI SDK | MIT | Self-host | OpenAI only |
| MS Agent | MIT | Self-host / Azure | Any LLM |
| AG2 | Apache 2.0 | Self-host | Any LLM |
Performance Characteristics
| Framework | Cold Start | Latency | Throughput |
|---|---|---|---|
| LangGraph | ~100ms | Low | High |
| CrewAI | ~200ms | Medium | Medium |
| OpenAI SDK | ~50ms | Low | High |
| MS Agent | ~150ms | Medium | High |
Team Expertise Requirements
| Framework | Python | LLM | Infra |
|---|---|---|---|
| LangGraph | Expert | Expert | Medium |
| CrewAI | Beginner | Beginner | Low |
| OpenAI SDK | Medium | Medium | Low |
| MS Agent | Medium | Medium | High |
Recommendation Summary
- Default choice: LangGraph (most capable, production-proven)
- Fastest to prototype: CrewAI (minimal code, intuitive)
- OpenAI shops: OpenAI Agents SDK (native integration)
- Enterprise: Microsoft Agent Framework (compliance, Azure)
- Research: AG2 (open community, experimental features)
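The decision tree above condenses into a first-match routing helper (a sketch):
def choose_framework(
    state_machines: bool = False,
    role_based: bool = False,
    openai_only: bool = False,
    enterprise: bool = False,
    open_source: bool = False,
) -> str:
    # Mirrors the decision tree: the first matching requirement wins.
    if state_machines:
        return "LangGraph"
    if role_based:
        return "CrewAI"
    if openai_only:
        return "OpenAI Agents SDK"
    if enterprise:
        return "Microsoft Agent Framework"
    if open_source:
        return "AG2"
    return "LangGraph"  # default choice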
GPT-5.2-Codex
GPT-5.2-Codex
OpenAI's specialized agentic coding model (January 2026) optimized for long-horizon software engineering tasks.
Overview
GPT-5.2-Codex is a specialized variant of GPT-5.2 purpose-built for agentic coding workflows. Unlike the general-purpose GPT-5.2, Codex is optimized for:
- Extended autonomous operation: Hours-long coding sessions without degradation
- Context compaction: Intelligent summarization for long-running tasks
- Project-scale understanding: Full codebase comprehension and refactoring
- Tool reliability: Deterministic file operations and terminal commands
Key Differences from GPT-5.2
| Capability | GPT-5.2 | GPT-5.2-Codex |
|---|---|---|
| Context Window | 256K tokens | 256K + compaction |
| Session Duration | Single request | Hours/days |
| Tool Execution | General | Code-optimized |
| File Operations | Basic | Atomic, rollback-aware |
| Terminal Access | Sandboxed | Full with safety rails |
| Vision | General | Code/diagram-aware |
| Cost per 1M tokens | $2.50/$10 | $5.00/$20 |
Key Capabilities
Long-Horizon Work Through Context Compaction
Codex automatically compacts context during extended sessions, preserving critical information while discarding ephemeral details.
from openai import OpenAI
client = OpenAI()
# Codex maintains context across many tool calls
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{"role": "system", "content": "You are a senior software engineer."},
{"role": "user", "content": "Refactor the authentication module to use JWT."}
],
# Codex-specific parameters
extra_body={
"codex_config": {
"compaction_strategy": "semantic", # semantic, aggressive, minimal
"preserve_file_state": True,
"max_session_hours": 8
}
}
)
Compaction Strategies:
| Strategy | Use Case | Retention |
|---|---|---|
| semantic | General development | Code structure, decisions, errors |
| aggressive | Very long tasks | Only current focus + critical history |
| minimal | Short tasks | Full context, no compaction |
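When expected session length is known up front, strategy selection can be automated (a sketch; the hour thresholds are assumptions, not documented cutoffs):
def pick_compaction_strategy(expected_hours: float) -> str:
    # Short tasks keep full context; very long runs compact aggressively.
    if expected_hours < 0.5:
        return "minimal"
    if expected_hours <= 4:
        return "semantic"
    return "aggressive"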
Project-Scale Tasks
Codex excels at large-scale operations that span entire codebases:
from agents import Agent, tool
# Define codebase navigation tools
@tool
def search_codebase(query: str, file_types: list[str] | None = None) -> str:
"""Search across the entire codebase for patterns or definitions."""
# Implementation
pass
@tool
def apply_refactor(pattern: str, replacement: str, scope: str = "project") -> dict:
"""Apply a refactoring pattern across multiple files with preview."""
# Returns affected files and changes for approval
pass
codex_agent = Agent(
name="refactor-engineer",
model="gpt-5.2-codex",
instructions="""You are a senior engineer performing large-scale refactors.
Guidelines:
1. Analyze impact before changes
2. Create rollback points
3. Run tests after each file change
4. Document breaking changes""",
tools=[search_codebase, apply_refactor]
)
Supported Project Tasks:
- Full codebase migrations (Python 2 to 3, React class to hooks)
- Dependency upgrades with breaking change resolution
- Architecture refactors (monolith to microservices)
- Test coverage expansion across modules
- Security vulnerability remediation
Enhanced Cybersecurity Capabilities
Codex includes specialized training for security-aware coding:
# Security-focused agent configuration
security_agent = Agent(
name="security-engineer",
model="gpt-5.2-codex",
instructions="""You are a security engineer. When writing or reviewing code:
1. Identify OWASP Top 10 vulnerabilities
2. Check for secrets/credentials in code
3. Validate input sanitization
4. Review authentication/authorization flows
5. Check dependency vulnerabilities via CVE databases""",
extra_config={
"security_mode": True, # Enables security-focused reasoning
"cve_lookup": True # Real-time CVE database access
}
)
Security Capabilities:
| Feature | Description |
|---|---|
| Vulnerability Detection | SAST-like scanning during code review |
| CVE Awareness | Real-time vulnerability database lookups |
| Secrets Detection | Identifies hardcoded credentials, API keys |
| Threat Modeling | Suggests security improvements |
| Compliance Hints | GDPR, HIPAA, SOC2 pattern recognition |
Vision for Code Artifacts
Codex processes visual inputs with code-aware understanding:
import base64
# Process architecture diagram
with open("architecture.png", "rb") as f:
image_data = base64.standard_b64encode(f.read()).decode("utf-8")
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{
"role": "user",
"content": [
{
"type": "text",
"text": "Implement the microservices shown in this architecture diagram."
},
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{image_data}"}
}
]
}
]
)
Vision Use Cases:
- Architecture diagrams to code scaffolding
- UI mockups to component implementation
- Error screenshots to debugging steps
- Whiteboard sketches to technical specs
- Database schemas (ERD) to migrations
Benchmark Performance
GPT-5.2-Codex achieves state-of-the-art results on coding benchmarks:
| Benchmark | GPT-5.2 | GPT-5.2-Codex | Previous SOTA |
|---|---|---|---|
| SWE-Bench Pro | 61.2% | 78.4% | 68.1% (Claude Opus 4.6) |
| Terminal-Bench 2.0 | 72.8% | 89.3% | 81.2% (Gemini 2.5) |
| HumanEval+ | 94.1% | 96.8% | 95.2% (GPT-5.2) |
| MBPP+ | 89.7% | 93.2% | 91.4% (Claude Opus 4.6) |
| CodeContests | 45.2% | 58.7% | 52.3% (Gemini 2.5) |
SWE-Bench Pro Notes:
- Tests real GitHub issues requiring multi-file changes
- Codex excels at test-writing and edge case handling
- Strong performance on legacy codebase understanding
Terminal-Bench 2.0 Notes:
- Tests long-horizon terminal tasks (setup, deploy, debug)
- Codex maintains coherent state across 50+ commands
- Superior error recovery and alternative path exploration
API Usage Patterns
Basic Completion
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{
"role": "system",
"content": "You are an expert Python developer."
},
{
"role": "user",
"content": "Write a connection pool manager with health checks."
}
],
temperature=0.2, # Lower temperature for code generation
max_tokens=4096
)
print(response.choices[0].message.content)
Streaming with Tool Use
import json
# Define tools
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read contents of a file",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"}
},
"required": ["path"]
}
}
},
{
"type": "function",
"function": {
"name": "write_file",
"description": "Write contents to a file",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"content": {"type": "string", "description": "File content"}
},
"required": ["path", "content"]
}
}
},
{
"type": "function",
"function": {
"name": "run_command",
"description": "Execute a shell command",
"parameters": {
"type": "object",
"properties": {
"command": {"type": "string", "description": "Command to run"},
"timeout": {"type": "integer", "description": "Timeout in seconds"}
},
"required": ["command"]
}
}
}
]
# Streaming with tool calls
stream = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{"role": "user", "content": "Set up a new FastAPI project with tests"}
],
tools=tools,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.tool_calls:
tool_call = chunk.choices[0].delta.tool_calls[0]
print(f"Tool: {tool_call.function.name}")
elif chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
Async Operations
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def refactor_module(module_path: str) -> str:
response = await async_client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{
"role": "system",
"content": "Refactor code for readability and performance."
},
{
"role": "user",
"content": f"Refactor the module at {module_path}"
}
],
extra_body={
"codex_config": {
"preserve_behavior": True,
"add_type_hints": True
}
}
)
return response.choices[0].message.content
# Parallel refactoring
async def refactor_project(modules: list[str]):
tasks = [refactor_module(m) for m in modules]
results = await asyncio.gather(*tasks)
return dict(zip(modules, results))
Integration with Agents SDK
GPT-5.2-Codex integrates seamlessly with OpenAI Agents SDK 0.12+:
from agents import Agent, Runner, handoff, tool
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
# File operation tools
@tool
def read_file(path: str) -> str:
"""Read file contents."""
with open(path) as f:
return f.read()
@tool
def write_file(path: str, content: str) -> str:
"""Write content to file."""
with open(path, "w") as f:
f.write(content)
return f"Wrote {len(content)} bytes to {path}"
@tool
def run_tests(path: str = ".") -> str:
"""Run pytest on the specified path."""
import subprocess
result = subprocess.run(
["pytest", path, "-v", "--tb=short"],
capture_output=True,
text=True
)
return f"Exit code: {result.returncode}\n{result.stdout}\n{result.stderr}"
# Specialized agents using Codex
architect_agent = Agent(
name="architect",
model="gpt-5.2-codex",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a software architect. Analyze requirements and design solutions.
Hand off to implementer for coding tasks.
Hand off to reviewer for code review.""",
tools=[read_file]
)
implementer_agent = Agent(
name="implementer",
model="gpt-5.2-codex",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a senior developer. Implement designs with clean, tested code.
Hand off to reviewer when implementation is complete.""",
tools=[read_file, write_file, run_tests]
)
reviewer_agent = Agent(
name="reviewer",
model="gpt-5.2-codex",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are a code reviewer. Check for bugs, security issues, and style.
Request changes from implementer if needed.
Approve and hand back to architect when satisfied.""",
tools=[read_file]
)
# Wire up handoffs
architect_agent.handoffs = [
handoff(agent=implementer_agent),
handoff(agent=reviewer_agent)
]
implementer_agent.handoffs = [
handoff(agent=reviewer_agent),
handoff(agent=architect_agent)
]
reviewer_agent.handoffs = [
handoff(agent=implementer_agent),
handoff(agent=architect_agent)
]
# Run development workflow
async def develop_feature(requirement: str):
runner = Runner()
result = await runner.run(
architect_agent,
f"Design and implement: {requirement}"
)
    return result.final_output
RealtimeRunner for Interactive Sessions
from agents import Agent
from agents.realtime import RealtimeRunner
# Interactive coding session
codex_agent = Agent(
name="pair-programmer",
model="gpt-5.2-codex",
instructions="You are a pair programming partner. Help write and debug code."
)
async def interactive_session():
async with RealtimeRunner(codex_agent) as runner:
# Continuous conversation with context preservation
while True:
user_input = input("> ")
if user_input == "exit":
break
async for chunk in runner.stream(user_input):
print(chunk.content, end="", flush=True)
            print()
IDE Integrations
GPT-5.2-Codex powers several IDE integrations:
Cursor
// .cursor/settings.json
{
"ai.model": "gpt-5.2-codex",
"ai.features": {
"composer": true,
"agent": true,
"codebaseIndexing": true
},
"ai.codex": {
"sessionDuration": "8h",
"compactionStrategy": "semantic"
}
}
Cursor Features with Codex:
- Composer for multi-file generation
- Agent mode for autonomous tasks
- Background indexing for codebase awareness
- Inline completions with project context
Windsurf (Codeium)
# .windsurf/config.yaml
model:
provider: openai
name: gpt-5.2-codex
cascade:
enabled: true
max_depth: 10
auto_apply: false # Review changes before applying
features:
flows: true # Multi-step guided workflows
supercomplete: true
  terminal_agent: true
Windsurf Features:
- Cascade for chained operations
- Flows for guided development
- Terminal integration for full-stack tasks
GitHub Copilot Workspace
# .github/copilot-workspace.yml
model: gpt-5.2-codex
workspace:
scope: repository
features:
- issue-to-pr
- multi-file-edit
- test-generation
review:
auto_suggest: true
  security_scan: true
Copilot Workspace Features:
- Issue-to-PR automation
- Multi-repository awareness
- Integrated CI feedback
Factory (VSCode Extension)
// .vscode/settings.json
{
"factory.model": "gpt-5.2-codex",
"factory.drafter": {
"enabled": true,
"autoContext": true
},
"factory.pilot": {
"enabled": true,
"approvalRequired": true
}
}
When to Use Codex vs Standard GPT-5.2
Use GPT-5.2-Codex When:
| Scenario | Why Codex |
|---|---|
| Multi-file refactors | Project-scale context management |
| Long debugging sessions | Context compaction prevents degradation |
| Security reviews | Specialized vulnerability detection |
| Test generation at scale | Understands test patterns across codebase |
| Architecture migrations | Maintains coherence across many changes |
| CI/CD pipeline work | Terminal-optimized tool execution |
Use Standard GPT-5.2 When:
| Scenario | Why Standard |
|---|---|
| Single-file tasks | No need for compaction overhead |
| Code explanation | General language understanding sufficient |
| Quick prototypes | Faster, cheaper for short tasks |
| Non-code tasks | Writing docs, emails, general Q&A |
| Cost-sensitive workloads | 50% cheaper than Codex |
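The two tables collapse into a simple routing rule. A minimal sketch (the function name and thresholds are illustrative, mirroring the decision matrix below):

def choose_model(duration_hours: float, files_affected: int, security_review: bool) -> str:
    """Route a task to Codex or standard GPT-5.2 per the tables above."""
    if duration_hours > 1 or files_affected > 1 or security_review:
        return "gpt-5.2-codex"
    return "gpt-5.2"  # short, single-file, non-security work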
Decision Matrix
Task Duration > 1 hour?
|
+-- Yes --> GPT-5.2-Codex
|
+-- No
    |
    +-- Multiple files affected?
        |
        +-- Yes --> GPT-5.2-Codex
        |
        +-- No
            |
            +-- Security review needed?
                |
                +-- Yes --> GPT-5.2-Codex
                |
                +-- No --> GPT-5.2 (standard)
Pricing Considerations
Token Pricing (January 2026)
| Model | Input (per 1M) | Output (per 1M) | Cached Input |
|---|---|---|---|
| gpt-5.2 | $2.50 | $10.00 | $1.25 |
| gpt-5.2-codex | $5.00 | $20.00 | $2.50 |
| gpt-5.2-mini | $0.15 | $0.60 | $0.075 |
Cost Optimization Strategies
# 1. Use caching for repeated context
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=messages,
extra_body={
"cache_control": {
"system_prompt": "ephemeral", # Cache system prompt
"file_contents": "persistent" # Cache file reads
}
}
)
# 2. Batch similar operations
# Instead of separate calls per file:
files_to_refactor = ["auth.py", "users.py", "api.py"]
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=[
{"role": "user", "content": f"Refactor these files: {files_to_refactor}"}
]
)
# 3. Use gpt-5.2-mini for preprocessing
# Filter/classify tasks before sending to Codex
classification = client.chat.completions.create(
model="gpt-5.2-mini",
messages=[{"role": "user", "content": f"Is this task complex? {task}"}]
)
if "complex" in classification.choices[0].message.content.lower():
# Use Codex for complex tasks
use_model = "gpt-5.2-codex"
else:
# Use standard for simple tasks
use_model = "gpt-5.2"Estimated Costs by Task Type
| Task | Est. Tokens | Codex Cost | Standard Cost |
|---|---|---|---|
| Single file fix | ~5K | $0.12 | $0.06 |
| Module refactor | ~50K | $1.25 | $0.63 |
| Full codebase migration | ~500K | $12.50 | $6.25 |
| 8-hour dev session | ~2M | $50.00 | $25.00 |
Note: Codex typically requires fewer iterations for complex tasks, often making total cost comparable to standard GPT-5.2.
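A rough break-even check on that note, using the module-refactor row above (the iteration counts are assumptions):

# Module refactor (~50K tokens): per-run costs from the table above
codex_cost_per_run = 1.25
standard_cost_per_run = 0.63

# Assumption: Codex converges in 1 pass, standard needs 2 review/fix passes
codex_total = 1 * codex_cost_per_run        # $1.25
standard_total = 2 * standard_cost_per_run  # $1.26 -- effectively a wash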
Configuration Reference
Codex-Specific Parameters
response = client.chat.completions.create(
model="gpt-5.2-codex",
messages=messages,
extra_body={
"codex_config": {
# Context management
"compaction_strategy": "semantic", # semantic, aggressive, minimal
"preserve_file_state": True, # Remember file contents
"max_session_hours": 8, # Session duration limit
# Code behavior
"preserve_behavior": True, # Ensure refactors don't change behavior
"add_type_hints": True, # Add type hints when refactoring
"follow_style_guide": "project", # project, google, pep8
# Safety
"security_mode": True, # Enable security scanning
"dry_run": False, # Preview changes without applying
"require_tests": True, # Require test coverage for changes
# Tools
"shell_timeout": 300, # Max seconds for shell commands
"file_size_limit": 1048576 # Max file size to read (1MB)
}
}
)
Best Practices
- Start with clear goals: Define what "done" looks like upfront
- Provide project context: Include README, architecture docs, coding standards
- Use semantic compaction: Best balance of context and performance
- Enable security mode: Catch vulnerabilities during development
- Set session limits: Prevent runaway costs with max_session_hours
- Review before applying: Use dry_run for large refactors
- Batch related operations: Reduce API calls by grouping similar tasks
- Cache file contents: Use persistent caching for frequently read files
Related Resources
Related Resources
LangGraph Implementation
LangGraph Implementation: Multi-Scenario Orchestration
Complete Python implementation of the multi-scenario orchestration pattern using LangGraph 1.0.6+.
1. State Definition
from typing import TypedDict, Annotated, Literal
from dataclasses import dataclass, field, asdict
from operator import add
import time
from datetime import datetime
@dataclass
class ScenarioProgress:
"""Track execution state for one scenario."""
scenario_id: str
status: Literal["pending", "running", "paused", "complete", "failed"]
progress_pct: float = 0.0
# Milestones
milestones_reached: list[str] = field(default_factory=list)
current_milestone: str = "start"
# Timing
start_time_ms: int = 0
elapsed_ms: int = 0
elapsed_checkpoints: dict = field(default_factory=dict) # {milestone: time_ms}
# Metrics
memory_used_mb: int = 0
items_processed: int = 0
batch_count: int = 0
# Results
partial_results: list[dict] = field(default_factory=list)
quality_scores: dict = field(default_factory=dict)
# Errors
errors: list[dict] = field(default_factory=list)
def to_dict(self):
return asdict(self)
@dataclass
class ScenarioDefinition:
"""Configuration for one scenario."""
name: str # "simple", "medium", "complex"
difficulty: Literal["easy", "intermediate", "advanced"]
complexity_multiplier: float # 1.0, 3.0, 8.0
# Inputs
input_size: int
dataset_characteristics: dict # {"distribution": "uniform"}
# Constraints
time_budget_seconds: int
memory_limit_mb: int
error_tolerance: float # 0-1
# Skill params
skill_params: dict
# Expectations
expected_quality: Literal["basic", "good", "excellent"]
quality_metrics: list[str]
def to_dict(self):
return asdict(self)
class ScenarioOrchestratorState(TypedDict, total=False):
"""State for the entire orchestration."""
# Orchestration metadata
orchestration_id: str
start_time_unix: int
skill_name: str
skill_version: str
# Scenario definitions
scenario_simple: ScenarioDefinition
scenario_medium: ScenarioDefinition
scenario_complex: ScenarioDefinition
# Progress tracking
progress_simple: ScenarioProgress
progress_medium: ScenarioProgress
progress_complex: ScenarioProgress
# Synchronization
sync_points: dict # {milestone: bool}
last_sync_time: int
# Aggregated results
    final_results: dict
2. Node Implementations
Supervisor Node
import asyncio

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

async def scenario_supervisor(state: ScenarioOrchestratorState) -> dict:
    """
    Initialize progress tracking for all 3 scenarios.
    The fan-out itself happens in route_scenarios() below, which returns
    Send commands from a conditional edge (LangGraph's parallel dispatch pattern).
    """
    print(f"[SUPERVISOR] Starting orchestration {state['orchestration_id']}")
    # Initialize progress for each scenario
    updates = {}
    for scenario_id in ["simple", "medium", "complex"]:
        updates[f"progress_{scenario_id}"] = ScenarioProgress(
            scenario_id=scenario_id,
            status="pending",
            start_time_ms=int(time.time() * 1000)
        )
    return updates

def route_scenarios(state: ScenarioOrchestratorState) -> list[Send]:
    """Conditional-edge router: dispatch each scenario to a parallel worker."""
    return [
        Send("scenario_worker", {**state, "scenario_id": sid})
        for sid in ["simple", "medium", "complex"]
    ]
async def scenario_worker(state: ScenarioOrchestratorState) -> dict:
"""
Execute one scenario (simple, medium, or complex).
Receives scenario_id from supervisor via Send.
"""
scenario_id = state.get("scenario_id")
progress = state[f"progress_{scenario_id}"]
scenario_def = state[f"scenario_{scenario_id}"]
print(f"[SCENARIO {scenario_id.upper()}] Starting ({scenario_def.complexity_multiplier}x complexity)")
progress.status = "running"
progress.start_time_ms = int(time.time() * 1000)
try:
# Execute skill for this scenario
result = await execute_skill_with_milestones(
skill_name=state["skill_name"],
scenario_def=scenario_def,
progress=progress,
state=state
)
progress.status = "complete"
progress.elapsed_ms = int(time.time() * 1000) - progress.start_time_ms
progress.partial_results.append(result)
print(f"[SCENARIO {scenario_id.upper()}] Complete in {progress.elapsed_ms}ms")
return {f"progress_{scenario_id}": progress}
except Exception as e:
progress.status = "failed"
progress.errors.append({
"timestamp": datetime.now().isoformat(),
"message": str(e),
"severity": "error"
})
print(f"[SCENARIO {scenario_id.upper()}] Failed: {e}")
return {f"progress_{scenario_id}": progress}
async def execute_skill_with_milestones(
skill_name: str,
scenario_def: ScenarioDefinition,
progress: ScenarioProgress,
state: ScenarioOrchestratorState
) -> dict:
"""
Execute skill, recording milestones and checkpoints.
This is where you call YOUR SKILL.
"""
milestones = [0, 30, 50, 70, 90, 100] # Percentage checkpoints
results = {"batches": [], "quality": {}}
input_items = generate_test_data(
size=scenario_def.input_size,
characteristics=scenario_def.dataset_characteristics
)
batch_size = scenario_def.skill_params.get("batch_size", 10)
for batch_idx, batch in enumerate(chunks(input_items, batch_size)):
# Execute skill on this batch
# Replace this with your actual skill invocation
batch_result = await invoke_skill(
skill_name=skill_name,
input_data=batch,
params=scenario_def.skill_params
)
results["batches"].append(batch_result)
progress.batch_count += 1
progress.items_processed += len(batch)
# Update progress percentage
progress.progress_pct = (progress.items_processed / scenario_def.input_size) * 100
# Check if we've reached a milestone
reached_milestones = [m for m in milestones if m <= progress.progress_pct]
new_milestones = [m for m in reached_milestones if m not in progress.milestones_reached]
for milestone in new_milestones:
progress.milestones_reached.append(milestone)
elapsed = int(time.time() * 1000) - progress.start_time_ms
progress.elapsed_checkpoints[f"milestone_{milestone}"] = elapsed
print(f" [{progress.scenario_id}] Reached {milestone}% at {elapsed}ms")
# Optional: Wait for other scenarios at major milestones
if milestone in [30, 70]:
await synchronize_at_milestone(milestone, state)
# Score results
results["quality"] = calculate_quality_metrics(results["batches"], scenario_def.quality_metrics)
progress.quality_scores = results["quality"]
    return results
Synchronization Node
async def synchronize_at_milestone(
milestone_pct: int,
state: ScenarioOrchestratorState,
timeout_seconds: int = 30
) -> bool:
"""
Optional: Wait for other scenarios at major milestones.
Returns True if all scenarios reached milestone, False if timeout.
"""
start = time.time()
milestone_key = f"checkpoint_{milestone_pct}"
while time.time() - start < timeout_seconds:
simple_at_milestone = milestone_pct in state["progress_simple"].milestones_reached
medium_at_milestone = milestone_pct in state["progress_medium"].milestones_reached
complex_at_milestone = milestone_pct in state["progress_complex"].milestones_reached
all_reached = simple_at_milestone and medium_at_milestone and complex_at_milestone
if all_reached:
state["sync_points"][milestone_key] = True
print(f"[SYNC] All scenarios reached {milestone_pct}%")
return True
# Check if any scenario failed
if any(state[f"progress_{s}"].status == "failed" for s in ["simple", "medium", "complex"]):
print(f"[SYNC] A scenario failed, proceeding without sync")
return False
await asyncio.sleep(0.5)
print(f"[SYNC] Timeout at {milestone_pct}%, proceeding")
    return False
Aggregator Node
async def scenario_aggregator(state: ScenarioOrchestratorState) -> dict:
"""
Collect all scenario results and synthesize findings.
"""
print("[AGGREGATOR] Combining results from all scenarios")
aggregated = {
"orchestration_id": state["orchestration_id"],
"skill": state["skill_name"],
"timestamp": datetime.now().isoformat(),
# Raw results
"results_by_scenario": {
"simple": state["progress_simple"].partial_results[-1] if state["progress_simple"].partial_results else {},
"medium": state["progress_medium"].partial_results[-1] if state["progress_medium"].partial_results else {},
"complex": state["progress_complex"].partial_results[-1] if state["progress_complex"].partial_results else {},
},
# Metrics
"metrics": {},
# Comparison
"comparison": {},
# Recommendations
"recommendations": []
}
# Calculate comparative metrics
for scenario_id in ["simple", "medium", "complex"]:
progress = state[f"progress_{scenario_id}"]
aggregated["metrics"][scenario_id] = {
"elapsed_ms": progress.elapsed_ms,
"items_processed": progress.items_processed,
"quality_scores": progress.quality_scores,
"errors": len(progress.errors)
}
# Compare quality vs. complexity
simple_quality = state["progress_simple"].quality_scores.get("overall", 0)
medium_quality = state["progress_medium"].quality_scores.get("overall", 0)
complex_quality = state["progress_complex"].quality_scores.get("overall", 0)
aggregated["comparison"]["quality_ranking"] = {
"best": max(
("simple", simple_quality),
("medium", medium_quality),
("complex", complex_quality),
key=lambda x: x[1]
)[0],
"scores": {
"simple": simple_quality,
"medium": medium_quality,
"complex": complex_quality
}
}
# Time complexity analysis
simple_time = state["progress_simple"].elapsed_ms
medium_time = state["progress_medium"].elapsed_ms
complex_time = state["progress_complex"].elapsed_ms
    simple_size = state["scenario_simple"].input_size
    medium_size = state["scenario_medium"].input_size
    complex_size = state["scenario_complex"].input_size
aggregated["comparison"]["time_per_item_ms"] = {
"simple": simple_time / simple_size,
"medium": medium_time / medium_size,
"complex": complex_time / complex_size,
}
    # Identify scaling issues (compare per-item time, not total time)
    if complex_time / complex_size > simple_time / simple_size * 2:
        aggregated["recommendations"].append("Superlinear scaling: per-item cost grows with load, investigate bottlenecks")
    elif complex_time / complex_size < simple_time / simple_size * 0.8:
        aggregated["recommendations"].append("Sublinear scaling: per-item cost shrinks with load (e.g., batching or caching wins)")
    # Success patterns
    success_patterns = [
        scenario_id
        for scenario_id in ["simple", "medium", "complex"]
        if state[f"progress_{scenario_id}"].status == "complete"
        and not state[f"progress_{scenario_id}"].errors
    ]
    if success_patterns:
        aggregated["recommendations"].append(f"Completed without errors: {', '.join(success_patterns)}")
return {"final_results": aggregated}3. Graph Construction
from typing import Any

from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.postgres import PostgresSaver
def build_scenario_orchestrator(
checkpointer: PostgresSaver | None = None
) -> Any:
"""
Build the complete orchestration graph.
"""
graph = StateGraph(ScenarioOrchestratorState)
# Nodes
graph.add_node("supervisor", scenario_supervisor)
graph.add_node("scenario_worker", scenario_worker)
graph.add_node("aggregator", scenario_aggregator)
# Edges
graph.add_edge(START, "supervisor")
    # Fan-out: the conditional edge returns Send commands for 3 parallel workers
    graph.add_conditional_edges("supervisor", route_scenarios)
# Workers converge at aggregator
graph.add_edge("scenario_worker", "aggregator")
graph.add_edge("aggregator", END)
# Compile with checkpointing
    return graph.compile(checkpointer=checkpointer)
4. Invocation Example
import asyncio
import uuid
from langgraph.checkpoint.postgres import PostgresSaver
async def main():
# Setup checkpointing
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:password@localhost/orchestkit"
)
# Build orchestrator
app = build_scenario_orchestrator(checkpointer=checkpointer)
# Prepare initial state
initial_state: ScenarioOrchestratorState = {
"orchestration_id": f"demo-{uuid.uuid4().hex[:8]}",
"start_time_unix": int(time.time()),
"skill_name": "your-skill-name",
"skill_version": "1.0.0",
# Scenarios
"scenario_simple": ScenarioDefinition(
name="simple",
difficulty="easy",
complexity_multiplier=1.0,
input_size=100,
dataset_characteristics={"distribution": "uniform"},
time_budget_seconds=30,
memory_limit_mb=256,
error_tolerance=0.0,
skill_params={"batch_size": 10, "cache_enabled": True},
expected_quality="basic",
quality_metrics=["accuracy", "coverage"]
),
"scenario_medium": ScenarioDefinition(
name="medium",
difficulty="intermediate",
complexity_multiplier=3.0,
input_size=300,
dataset_characteristics={"distribution": "uniform"},
time_budget_seconds=90,
memory_limit_mb=512,
error_tolerance=0.05,
skill_params={"batch_size": 50, "cache_enabled": True},
expected_quality="good",
quality_metrics=["accuracy", "coverage"]
),
"scenario_complex": ScenarioDefinition(
name="complex",
difficulty="advanced",
complexity_multiplier=8.0,
input_size=800,
dataset_characteristics={"distribution": "skewed"},
time_budget_seconds=300,
memory_limit_mb=1024,
error_tolerance=0.1,
skill_params={"batch_size": 100, "cache_enabled": True, "parallel_workers": 4},
expected_quality="excellent",
quality_metrics=["accuracy", "coverage", "latency"]
),
# Progress tracking
"progress_simple": ScenarioProgress(scenario_id="simple"),
"progress_medium": ScenarioProgress(scenario_id="medium"),
"progress_complex": ScenarioProgress(scenario_id="complex"),
# Synchronization
"sync_points": {},
"last_sync_time": 0,
}
# Run with thread_id for checkpointing
config = {"configurable": {"thread_id": f"orch-{initial_state['orchestration_id']}"}}
print("Starting multi-scenario orchestration...")
result = await app.ainvoke(initial_state, config=config)
# Print results
final = result["final_results"]
print("\n" + "="*60)
print("ORCHESTRATION RESULTS")
print("="*60)
print(f"Orchestration ID: {final['orchestration_id']}")
print(f"Skill: {final['skill']}")
print("\nQuality Comparison:")
for scenario, score in final["comparison"]["quality_ranking"]["scores"].items():
print(f" {scenario}: {score:.2f}")
print("\nTime per Item (ms):")
    # Loop variable renamed so it does not shadow the time module
    for scenario, per_item_ms in final["comparison"]["time_per_item_ms"].items():
        print(f"  {scenario}: {per_item_ms:.2f}ms")
print("\nRecommendations:")
for rec in final["recommendations"]:
print(f" • {rec}")
if __name__ == "__main__":
    asyncio.run(main())
5. Helper Functions
def chunks(items: list, size: int):
"""Split items into chunks."""
for i in range(0, len(items), size):
yield items[i:i + size]
def generate_test_data(size: int, characteristics: dict) -> list:
"""Generate test data based on scenario characteristics."""
import random
distribution = characteristics.get("distribution", "uniform")
if distribution == "uniform":
return [{"id": i, "value": random.random()} for i in range(size)]
elif distribution == "skewed":
# Zipfian distribution
return [
{"id": i, "value": random.random() ** 2}
for i in range(size)
]
else:
return [{"id": i, "value": random.random()} for i in range(size)]
async def invoke_skill(
skill_name: str,
input_data: list,
params: dict
) -> dict:
"""
Invoke your skill here.
Replace with actual skill invocation.
"""
    # Simulate processing (local imports keep this snippet self-contained)
    import asyncio
    import random
    await asyncio.sleep(0.1)  # 100ms per batch
    return {
        "processed": len(input_data),
        "quality_score": 0.85 + (random.random() * 0.15),
        "timestamp": datetime.now().isoformat()
    }
def calculate_quality_metrics(batches: list, metrics: list[str]) -> dict:
"""Calculate quality metrics across batches."""
if not batches:
return {metric: 0.0 for metric in metrics}
scores = {
"accuracy": sum(b.get("quality_score", 0) for b in batches) / len(batches),
"coverage": 1.0,
}
    return {metric: scores.get(metric, 0.0) for metric in metrics}
6. Streaming Results (Real-time Progress)
async def stream_orchestration_progress(
app,
initial_state: ScenarioOrchestratorState,
config: dict
):
"""
Stream progress updates as scenarios execute.
"""
async for step in app.astream(initial_state, config=config, stream_mode="updates"):
print(f"\n[UPDATE] {step}")
# Extract progress from step
if "progress_simple" in step:
p = step["progress_simple"]
print(f" Simple: {p.progress_pct:.1f}% ({p.items_processed} items)")
if "progress_medium" in step:
p = step["progress_medium"]
print(f" Medium: {p.progress_pct:.1f}% ({p.items_processed} items)")
if "progress_complex" in step:
p = step["progress_complex"]
print(f" Complex: {p.progress_pct:.1f}% ({p.items_processed} items)")Key Features
- Fan-Out/Fan-In: All 3 scenarios execute in parallel
- Milestone Tracking: Progress recorded at key checkpoints
- Synchronization: Optional wait points at 30% and 70%
- Error Isolation: One scenario's failure doesn't block others
- Checkpointing: State saved to PostgreSQL for recovery (resume sketch below)
- Aggregation: Cross-scenario analysis and recommendations
- Streaming: Real-time progress updates
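Because every step is checkpointed, a crashed or interrupted orchestration can be resumed from its last saved state. A minimal sketch, assuming the PostgresSaver setup from the invocation example and LangGraph's convention of passing None as input to resume from a checkpoint (the thread ID here is hypothetical):

# Resume a prior run by reusing its thread_id
config = {"configurable": {"thread_id": "orch-demo-abc123"}}
result = await app.ainvoke(None, config=config)  # None input -> continue from checkpoint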
Testing
import pytest
from langgraph.checkpoint.memory import MemorySaver

@pytest.mark.asyncio
async def test_multi_scenario_orchestration():
    # In-memory checkpointer keeps the test self-contained
    app = build_scenario_orchestrator(checkpointer=MemorySaver())
initial_state = {...} # Setup
config = {"configurable": {"thread_id": "test-123"}}
result = await app.ainvoke(initial_state, config=config)
assert result["final_results"]["orchestration_id"]
assert "simple" in result["final_results"]["metrics"]
assert "medium" in result["final_results"]["metrics"]
assert "complex" in result["final_results"]["metrics"]Microsoft Agent Framework
Microsoft Agent Framework
Microsoft Agent Framework (AutoGen + Semantic Kernel merger) patterns for enterprise multi-agent systems.
AssistantAgent Setup
import os

from autogen_agentchat.agents import AssistantAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient

# Create model client
model_client = OpenAIChatCompletionClient(
model="gpt-5.2",
api_key=os.environ["OPENAI_API_KEY"]
)
# Define assistant agent
assistant = AssistantAgent(
name="assistant",
description="A helpful AI assistant",
model_client=model_client,
system_message="You are a helpful assistant. Answer questions concisely."
)
Team Patterns
Round Robin Chat
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
# Define team members
planner = AssistantAgent(
name="planner",
description="Plans tasks",
model_client=model_client,
system_message="You plan tasks. When done, say 'PLAN_COMPLETE'."
)
executor = AssistantAgent(
name="executor",
description="Executes tasks",
model_client=model_client,
system_message="You execute the plan. When done, say 'EXECUTION_COMPLETE'."
)
reviewer = AssistantAgent(
name="reviewer",
description="Reviews work",
model_client=model_client,
system_message="Review the work. Say 'APPROVED' if satisfactory."
)
# Create team with termination
termination = TextMentionTermination("APPROVED")
team = RoundRobinGroupChat(
participants=[planner, executor, reviewer],
termination_condition=termination
)
# Run team
result = await team.run(task="Create a marketing strategy")Selector Group Chat
from autogen_agentchat.teams import SelectorGroupChat
# Selector chooses next speaker based on context
team = SelectorGroupChat(
participants=[analyst, writer, reviewer],
model_client=model_client, # For selection decisions
termination_condition=termination
)
Tool Integration
import json

from autogen_core.tools import FunctionTool

# Define tool function
def search_database(query: str) -> str:
    """Search the database for information."""
    results = db.search(query)  # `db` is your application's database client
    return json.dumps(results)
# Create tool
search_tool = FunctionTool(search_database, description="Search the database")
# Agent with tools
researcher = AssistantAgent(
name="researcher",
description="Researches information",
model_client=model_client,
tools=[search_tool],
system_message="Use the search tool to find information."
)
Termination Conditions
from autogen_agentchat.conditions import (
TextMentionTermination,
MaxMessageTermination,
TokenUsageTermination,
TimeoutTermination
)
# Combine termination conditions with the | (OR) operator
termination = (
    TextMentionTermination("DONE")
    | MaxMessageTermination(max_messages=20)
    | TimeoutTermination(timeout_seconds=300)
)
Streaming
# Stream team responses
async for message in team.run_stream(task="Analyze this data"):
print(f"{message.source}: {message.content}")State Management
# Save state (returns a serializable mapping)
state = await team.save_state()
# Restore state
await team.load_state(state)
# Resume conversation
result = await team.run(task="Continue from where we left off")Agent-to-Agent Protocol (A2A)
from autogen_agentchat.protocols import A2AProtocol
# Enable A2A for cross-organization agent communication
protocol = A2AProtocol(
agent=my_agent,
endpoint="https://api.example.com/agent",
auth_token=os.environ["A2A_TOKEN"]
)
# Send message to external agent
response = await protocol.send(
to="external-agent-id",
message="Process this request"
)
Migration from AutoGen 0.2
# Old AutoGen 0.2 pattern
# from autogen import AssistantAgent, UserProxyAgent
# New AutoGen 0.4+ pattern
from autogen_agentchat.agents import AssistantAgent
from autogen_agentchat.teams import RoundRobinGroupChat
# Key differences:
# - No UserProxyAgent needed for simple tasks
# - Teams replace GroupChat
# - Explicit termination conditions required
# - Model client separate from agent
Configuration
- Model clients: OpenAI, Azure OpenAI, Anthropic supported (Azure sketch below)
- Teams: RoundRobin, Selector, Custom
- Termination: Text mention, max messages, timeout, token usage
- Tools: FunctionTool wrapper for Python functions
- State: Full state serialization for persistence
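For the Azure OpenAI path, a minimal client sketch (this assumes autogen_ext ships AzureOpenAIChatCompletionClient; the endpoint, deployment, and API version are placeholders):

import os
from autogen_ext.models.openai import AzureOpenAIChatCompletionClient

azure_client = AzureOpenAIChatCompletionClient(
    model="gpt-5.2",
    azure_deployment="your-deployment-name",                  # placeholder
    azure_endpoint="https://your-resource.openai.azure.com",  # placeholder
    api_version="2024-10-21",                                 # placeholder
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
)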
Best Practices
- Termination conditions: Always set explicit termination
- Team size: 3-5 agents optimal for most workflows
- System messages: Clear role definitions in system_message
- Tool design: One function per tool, clear descriptions
- Error handling: Use try/except around team.run() (see the sketch below)
- Streaming: Use run_stream() for real-time feedback
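A minimal sketch of the error-handling practice (the logger name is illustrative):

import logging

logger = logging.getLogger("agent_teams")

async def run_team_safely(team, task: str):
    """Wrap team.run() so a failed run is logged instead of crashing the caller."""
    try:
        return await team.run(task=task)
    except Exception:
        logger.exception("Team run failed for task: %s", task)
        return None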
OpenAI Agents SDK
OpenAI Agents SDK
OpenAI Agents SDK (v0.12+) patterns for handoffs, guardrails, agents-as-tools, sessions, MCP servers, tool search, and tracing. Provider-agnostic — supports 100+ LLMs via LiteLLM.
Requirements
# Install (requires Python 3.10+, supports up to 3.14)
pip install "openai-agents>=0.12.0"
# Note: Requires openai v2.x (v1.x no longer supported)
# openai>=2.9.0,<3 is required
Basic Agent Definition
from agents import Agent, Runner
agent = Agent(
name="assistant",
instructions="You are a helpful assistant that answers questions.",
model="gpt-5.2"
)
# Synchronous run
runner = Runner()
result = runner.run_sync(agent, "What is the capital of France?")
print(result.final_output)
Sessions (v0.6.6+)
Sessions provide automatic conversation history management across agent runs.
from agents import Agent, Runner
from agents.sessions import SQLiteSession
# Create a session store
session = SQLiteSession(db_path="conversations.db")
# Agent with automatic history management
agent = Agent(
name="assistant",
instructions="You are a helpful assistant.",
model="gpt-5.2"
)
runner = Runner()
# Sessions automatically:
# - Retrieve conversation history before each run
# - Store new messages after each run
result = await runner.run(
agent,
"Remember my name is Alice",
session=session,
session_id="user-123"
)
# Later conversation - history is automatic
result = await runner.run(
agent,
"What is my name?", # Agent recalls "Alice"
session=session,
session_id="user-123"
)
Session Types
from agents.sessions import (
SQLiteSession, # File-based persistence
AsyncSQLiteSession, # Async SQLite (v0.6.6+)
SQLAlchemySession, # Database-agnostic ORM
RedisSession, # Redis-backed sessions
EncryptedSession, # Encrypted storage
)
# AsyncSQLiteSession for async workflows
async_session = AsyncSQLiteSession(db_path="async_conversations.db")
# Auto-compaction for long conversations (v0.6.6+)
# Use responses.compact: "auto" to manage context lengthHandoffs Between Agents
from agents import Agent, handoff, RunConfig
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
# Specialist agents
billing_agent = Agent(
name="billing",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You handle billing inquiries. Check account status and payment issues.
Hand back to triage when billing issue is resolved.""",
model="gpt-5.2"
)
support_agent = Agent(
name="support",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You handle technical support. Troubleshoot issues and provide solutions.
Hand back to triage when support issue is resolved.""",
model="gpt-5.2"
)
# Triage agent with handoffs
triage_agent = Agent(
name="triage",
instructions=f"""{RECOMMENDED_PROMPT_PREFIX}
You are the first point of contact. Determine the nature of inquiries.
- Billing questions: hand off to billing
- Technical issues: hand off to support
- General questions: answer directly""",
model="gpt-5.2",
handoffs=[
handoff(agent=billing_agent),
handoff(agent=support_agent)
]
)
Handoff History Packaging (v0.7.0)
In v0.7.0, nested handoffs are opt-in (previously enabled by default). When enabled, conversation history is collapsed into a single assistant message wrapped in a <CONVERSATION HISTORY> block.
from agents import RunConfig
# Enable nested handoff history (opt-in in v0.7.0)
config = RunConfig(nest_handoff_history=True)
result = await runner.run(
triage_agent,
"I have a billing question",
run_config=config
)
# History is packaged as a single assistant message:
# <CONVERSATION HISTORY>
# [collapsed prior transcript]
# </CONVERSATION HISTORY>
Handoff Input Filters
from agents import handoff
from agents.extensions.handoff_filters import remove_all_tools
# Filter what the receiving agent sees
billing_handoff = handoff(
agent=billing_agent,
input_filter=remove_all_tools # Strip tool calls from history
)
# Custom input filter
def custom_filter(data):
# Modify HandoffInputData before passing to receiving agent
return data
support_handoff = handoff(
agent=support_agent,
input_filter=custom_filter,
nest_handoff_history=True # Per-handoff override
)
MCPServerManager (v0.7.0)
Manage multiple MCP server instances with improved lifecycle safety.
from agents import Agent
from agents.mcp import MCPServerManager, MCPServerStdio, MCPServerStreamableHTTP
# Create MCP server manager for multiple servers
async with MCPServerManager() as manager:
# Add stdio-based MCP server
filesystem_server = await manager.add_server(
MCPServerStdio(
name="filesystem",
command="npx",
args=["-y", "@anthropic/mcp-filesystem", "/path/to/dir"]
)
)
# Add HTTP-based MCP server
api_server = await manager.add_server(
MCPServerStreamableHTTP(
name="api-tools",
url="http://localhost:8080/mcp"
)
)
# Get tools from all servers
all_tools = await manager.list_tools()
# Create agent with MCP tools
agent = Agent(
name="mcp_agent",
instructions="Use available tools to help the user.",
model="gpt-5.2",
mcp_servers=[filesystem_server, api_server]
)
runner = Runner()
result = await runner.run(agent, "List files in the current directory")Single MCP Server
from agents import Agent
from agents.mcp import MCPServerStdio
# Single MCP server (simpler pattern)
async with MCPServerStdio(
name="filesystem",
command="npx",
args=["-y", "@anthropic/mcp-filesystem", "/tmp"]
) as server:
agent = Agent(
name="file_agent",
instructions="Help with file operations.",
model="gpt-5.2",
mcp_servers=[server]
)
result = await runner.run(agent, "What files are available?")Agents as Tools
from agents import Agent, tool
# Define tool functions
@tool
def search_knowledge_base(query: str) -> str:
"""Search the knowledge base for relevant information."""
# Implementation
return search_results
@tool
def create_ticket(title: str, description: str, priority: str) -> str:
"""Create a support ticket in the system."""
ticket_id = ticket_system.create(title, description, priority)
return f"Created ticket {ticket_id}"
# Agent with tools
support_agent = Agent(
name="support",
instructions="Help users with technical issues. Search knowledge base first.",
model="gpt-5.2",
tools=[search_knowledge_base, create_ticket]
)
Guardrails
from agents import Agent, InputGuardrail, OutputGuardrail
from agents.exceptions import InputGuardrailException
# Input guardrail
class ContentFilter(InputGuardrail):
async def check(self, input_text: str) -> str:
if contains_pii(input_text):
raise InputGuardrailException("PII detected in input")
return input_text
# Output guardrail
class ResponseValidator(OutputGuardrail):
async def check(self, output_text: str) -> str:
if contains_harmful_content(output_text):
return "I cannot provide that information."
return output_text
# Agent with guardrails
agent = Agent(
name="safe_assistant",
instructions="You are a helpful assistant.",
model="gpt-5.2",
input_guardrails=[ContentFilter()],
output_guardrails=[ResponseValidator()]
)
Tool Guardrails (v0.6.5+)
from agents import tool, tool_guardrail
@tool_guardrail
def validate_ticket_priority(priority: str) -> str:
"""Validate ticket priority before creation."""
valid = ["low", "medium", "high", "critical"]
if priority.lower() not in valid:
raise ValueError(f"Priority must be one of: {valid}")
return priority
@tool
@validate_ticket_priority
def create_ticket(title: str, description: str, priority: str) -> str:
"""Create a support ticket."""
return f"Created ticket with priority {priority}"Tracing and Observability
from agents import Agent, Runner, trace
# Enable tracing
runner = Runner(trace=True)
# Custom trace spans
async def complex_workflow(task: str):
with trace.span("research_phase"):
research = await runner.run(researcher, task)
with trace.span("writing_phase"):
content = await runner.run(writer, research.final_output)
return content
# Access trace data
result = await runner.run(agent, "Process this request")
print(result.trace_id) # For debugging
# Per-run tracing API key (v0.6.5+)
config = RunConfig(tracing={"api_key": "sk-tracing-123"})
result = await runner.run(agent, "Task", run_config=config)Streaming Responses
from agents import Agent, Runner
agent = Agent(
name="streamer",
instructions="Provide detailed explanations.",
model="gpt-5.2"
)
runner = Runner()
# Stream response chunks
async for chunk in runner.run_streamed(agent, "Explain quantum computing"):
print(chunk.content, end="", flush=True)Multi-Agent Conversation
from agents import Agent, Runner
# Manual conversation management
runner = Runner()
# Initial run
result1 = await runner.run(triage_agent, "I need help with my account")
# Continue with result's input list
result2 = await runner.run(
result1.handoff_to or triage_agent,
"Can you check my billing?",
input=result1.to_input_list() # Carries conversation history
)
# Or use Sessions for automatic management (recommended)
from agents.sessions import SQLiteSession
session = SQLiteSession(db_path="conversations.db")
result = await runner.run(
triage_agent,
"I need help",
session=session,
session_id="conv-123"
)
Realtime and Voice
from agents import Agent
from agents.realtime import RealtimeRunner, OpenAIRealtimeWebSocketModel
# WebSocket-based realtime agent
model = OpenAIRealtimeWebSocketModel(
model="gpt-5.2-realtime",
# Custom WebSocket options (v0.7.0+)
websocket_options={"ping_interval": 30}
)
agent = Agent(
name="voice_assistant",
instructions="You are a voice assistant.",
model=model
)
# Realtime runner for voice interactions
realtime_runner = RealtimeRunner()
await realtime_runner.run(agent, audio_stream)
Configuration
Model Settings (v0.7.0)
from agents import Agent, RunConfig
# Note: Default reasoning.effort for gpt-5.1/5.2 is now "none"
# (previously "low"). Set explicitly if needed:
config = RunConfig(
model_settings={
"reasoning": {"effort": "low"} # Restore previous behavior
}
)
agent = Agent(
name="assistant",
instructions="You are helpful.",
model="gpt-5.2"
)
result = await runner.run(agent, "Complex task", run_config=config)Provider Support
The SDK supports 100+ LLMs via LiteLLM integration:
from agents import Agent, set_default_openai_api
# Use Chat Completions API (for non-OpenAI providers)
set_default_openai_api("chat_completions")
# Or use LiteLLM for other providers
# pip install openai-agents[litellm]
from agents.extensions.litellm import LiteLLMModel
agent = Agent(
name="claude_agent",
instructions="You are helpful.",
model=LiteLLMModel(model="anthropic/claude-sonnet-4-20250514")
)
Best Practices
- Handoff clarity: Use RECOMMENDED_PROMPT_PREFIX for reliable handoffs
- Tool documentation: Clear docstrings improve tool selection accuracy
- Guardrail layers: Combine input + output guardrails for defense-in-depth
- Tracing: Always enable in production for debugging
- Error handling: Catch guardrail exceptions gracefully (see the sketch below)
- Sessions: Use for multi-turn conversations instead of manual history
- MCP servers: Use MCPServerManager for multiple servers with proper lifecycle
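A minimal sketch of graceful guardrail handling, reusing the InputGuardrailException import from the Guardrails section (the helper name is illustrative):

from agents.exceptions import InputGuardrailException

async def safe_run(runner, agent, user_input: str):
    """Return a friendly message instead of propagating guardrail rejections."""
    try:
        return await runner.run(agent, user_input)
    except InputGuardrailException as exc:
        return f"Request rejected by input guardrail: {exc}"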
Breaking Changes in v0.7.0
| Change | Previous | v0.7.0 |
|---|---|---|
| Nested handoffs | Enabled by default | Opt-in via nest_handoff_history=True |
| Reasoning effort | Default "low" | Default "none" for gpt-5.1/5.2 |
| Session input | Required callback | Auto-append (callback optional) |
| OpenAI library | v1.x supported | Requires v2.x (>=2.9.0) |
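To keep pre-0.7.0 behavior after upgrading, the two opt-ins shown earlier can be combined in one RunConfig (a sketch based on the options documented above):

from agents import RunConfig

legacy_config = RunConfig(
    nest_handoff_history=True,                        # was the default before v0.7.0
    model_settings={"reasoning": {"effort": "low"}},  # effort previously defaulted to "low"
)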
Version History
- v0.12.3 (Mar 2026): Tool search with namespaces, gpt-5.4 computer use support
- v0.12.0 (Mar 2026): Provider-agnostic (100+ LLMs via LiteLLM), tool search tool
- v0.7.0 (Jan 2026): MCPServerManager, opt-in nested handoffs, session input auto-append
- v0.6.6 (Jan 2026): Auto-compaction, AsyncSQLiteSession
- v0.6.5 (Jan 2026): Per-run tracing, tool guardrails decorator
- v0.6.0: Sessions feature, improved handoff history
Skill Agnostic Template
Skill-Agnostic Template Abstraction
Reusable template framework for applying multi-scenario orchestration to ANY user-invocable skill.
Template Architecture
┌─────────────────────────────────────────────────────────────┐
│ SKILL ORCHESTRATION TEMPLATE │
├─────────────────────────────────────────────────────────────┤
│ │
│ [Abstract Orchestrator] │
│ ├─ Scenario Definition (parameterized) │
│ ├─ State Machine (generic) │
│ ├─ Synchronization (milestone-based) │
│ └─ Aggregation (cross-scenario) │
│ │
│ [Skill Adapter Layer] ◄── Pluggable per skill │
│ ├─ invoke_skill() │
│ ├─ calculate_quality_metrics() │
│ ├─ scenario_configurations() │
│ └─ expected_outcomes() │
│ │
│ [Target Skill] ◄── Your skill here │
│ ├─ skill-name │
│ ├─ skill-version │
│ └─ skill-params │
│ │
└─────────────────────────────────────────────────────────────┘
Generic Orchestrator Class
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Any, Generic, TypeVar

T = TypeVar("T")  # Result type
class SkillOrchestrator(ABC, Generic[T]):
"""
Abstract orchestrator for any user-invocable skill.
Subclass for each skill and implement abstract methods.
"""
def __init__(self, skill_name: str, skill_version: str):
self.skill_name = skill_name
self.skill_version = skill_version
# ──────────────────────────────────────────────────────────
# Abstract Methods (MUST implement per skill)
# ──────────────────────────────────────────────────────────
@abstractmethod
async def invoke_skill(
self,
input_data: list[dict],
scenario_params: dict,
) -> dict[str, Any]:
"""
Invoke your skill on input data.
Args:
input_data: Items to process
scenario_params: Skill-specific parameters (batch_size, etc.)
Returns:
dict with keys:
- "processed": int (items successfully processed)
- "results": list[dict] (processed items)
- "quality_score": float (0-1)
- "metrics": dict (skill-specific metrics)
- "errors": list[str] (any errors encountered)
"""
pass
@abstractmethod
def get_scenario_configs(self) -> dict[str, dict]:
"""
Return scenario configurations for simple/medium/complex.
Returns:
{
"simple": {
"input_size": 100,
"time_budget_seconds": 30,
"batch_size": 10,
"skill_params": {...}
},
"medium": {...},
"complex": {...}
}
"""
pass
@abstractmethod
def calculate_quality_metrics(
self,
results: list[dict],
metric_names: list[str]
) -> dict[str, float]:
"""
Calculate quality metrics from results.
Args:
results: Processed items from skill
metric_names: ["accuracy", "coverage", ...]
Returns:
{
"accuracy": 0.92,
"coverage": 0.98,
...
}
"""
pass
@abstractmethod
def generate_test_data(
self,
size: int,
characteristics: dict
) -> list[dict]:
"""
Generate test data for this skill.
Args:
size: Number of items
characteristics: {"distribution": "uniform|skewed|clustered"}
Returns:
list of test items matching skill's input format
"""
pass
# ──────────────────────────────────────────────────────────
# Concrete Methods (reusable across all skills)
# ──────────────────────────────────────────────────────────
async def run_scenario(
self,
scenario_id: str,
orchestration_id: str,
) -> dict:
"""
Execute one scenario (simple/medium/complex).
Concrete implementation—uses abstract methods.
"""
config = self.get_scenario_configs()[scenario_id]
# Generate test data
test_data = self.generate_test_data(
size=config["input_size"],
characteristics=config.get("dataset_characteristics", {})
)
# Process in batches
all_results = []
batch_size = config.get("batch_size", 10)
        # chunks() is the batching helper defined in the LangGraph implementation
        for batch in chunks(test_data, batch_size):
# Invoke skill (abstract—implemented per skill)
result = await self.invoke_skill(batch, config["skill_params"])
all_results.extend(result.get("results", []))
progress_pct = (len(all_results) / len(test_data)) * 100
print(f"[{scenario_id}] Progress: {progress_pct:.1f}%")
# Calculate quality (abstract—implemented per skill)
quality = self.calculate_quality_metrics(
all_results,
config.get("quality_metrics", ["accuracy"])
)
return {
"scenario_id": scenario_id,
"orchestration_id": orchestration_id,
"items_processed": len(all_results),
"quality": quality,
"results": all_results,
}
async def orchestrate(
self,
orchestration_id: str
) -> dict:
"""
Run all 3 scenarios in parallel and aggregate results.
Concrete implementation—reusable for all skills.
"""
import asyncio
# Run scenarios in parallel
results = await asyncio.gather(
self.run_scenario("simple", orchestration_id),
self.run_scenario("medium", orchestration_id),
self.run_scenario("complex", orchestration_id),
return_exceptions=True
)
# Aggregate
aggregated = self.aggregate_results(results)
return aggregated
def aggregate_results(self, scenario_results: list[dict]) -> dict:
"""
Combine results from all 3 scenarios.
Generic aggregation—works for any skill.
"""
# Extract results by scenario
by_scenario = {r.get("scenario_id"): r for r in scenario_results if not isinstance(r, Exception)}
# Quality comparison
quality_comparison = {
sid: r.get("quality", {})
for sid, r in by_scenario.items()
}
# Time complexity (simulated—customize per skill)
simple_items = by_scenario.get("simple", {}).get("items_processed", 1)
medium_items = by_scenario.get("medium", {}).get("items_processed", 1)
complex_items = by_scenario.get("complex", {}).get("items_processed", 1)
        scaling_ratio = complex_items / simple_items if simple_items > 0 else 1
        first_ok = next(iter(by_scenario.values()), {})  # skip scenarios that raised
        return {
            "orchestration_id": first_ok.get("orchestration_id"),
"skill": self.skill_name,
"timestamp": datetime.now().isoformat(),
"results_by_scenario": by_scenario,
"quality_comparison": quality_comparison,
"scaling_analysis": {
"simple_vs_complex": scaling_ratio,
"recommendation": self._scaling_recommendation(scaling_ratio)
},
"success_rate": sum(1 for r in scenario_results if not isinstance(r, Exception)) / len(scenario_results),
}
@staticmethod
def _scaling_recommendation(ratio: float) -> str:
"""Recommend based on scaling behavior."""
if ratio < 1.5:
return "Excellent sublinear scaling"
elif ratio < 3:
return "Good linear scaling"
elif ratio < 10:
return "Acceptable superlinear scaling"
else:
return "Poor scaling—investigate bottlenecks"Example Implementation: Performance Testing
class PerformanceTestingOrchestrator(SkillOrchestrator):
"""Multi-scenario orchestration for performance-testing skill."""
async def invoke_skill(
self,
input_data: list[dict],
scenario_params: dict,
) -> dict:
"""
Invoke k6 or Locust performance test.
"""
import asyncio
# Mock: simulate running k6 with duration
duration_ms = scenario_params.get("duration_ms", 5000)
await asyncio.sleep(duration_ms / 1000) # Simulate k6 execution
return {
"processed": len(input_data),
"results": [
{
"endpoint": item["url"],
"response_time_ms": 100 + (i * 50),
"status": "200" if i % 10 != 0 else "500",
}
for i, item in enumerate(input_data)
],
"quality_score": 0.85,
"metrics": {
"p95_latency_ms": 450,
"p99_latency_ms": 500,
"error_rate": 0.01,
},
"errors": [],
}
def get_scenario_configs(self) -> dict:
return {
"simple": {
"input_size": 10, # 10 endpoints
"time_budget_seconds": 30,
"batch_size": 5,
"dataset_characteristics": {"distribution": "uniform"},
"quality_metrics": ["latency", "error_rate"],
"skill_params": {
"duration_ms": 5000,
"ramp_up_seconds": 2,
"load_profile": "steady"
}
},
"medium": {
"input_size": 30, # 30 endpoints
"time_budget_seconds": 90,
"batch_size": 15,
"dataset_characteristics": {"distribution": "uniform"},
"quality_metrics": ["latency", "error_rate"],
"skill_params": {
"duration_ms": 30000,
"ramp_up_seconds": 5,
"load_profile": "ramp_and_hold"
}
},
"complex": {
"input_size": 80, # 80 endpoints
"time_budget_seconds": 300,
"batch_size": 40,
"dataset_characteristics": {"distribution": "skewed"},
"quality_metrics": ["latency", "error_rate", "throughput"],
"skill_params": {
"duration_ms": 120000,
"ramp_up_seconds": 10,
"load_profile": "spike"
}
}
}
def calculate_quality_metrics(
self,
results: list[dict],
metric_names: list[str]
) -> dict:
"""Calculate latency and error rate metrics."""
if not results:
return {m: 0.0 for m in metric_names}
latencies = [r.get("response_time_ms", 0) for r in results]
errors = sum(1 for r in results if r.get("status", "200") != "200")
scores = {}
if "latency" in metric_names:
# p95 latency score: higher is better (inverse of latency)
p95 = sorted(latencies)[int(len(latencies) * 0.95)]
# Score: 1.0 if p95 < 100ms, 0.0 if > 500ms
scores["latency"] = max(0, min(1.0, (500 - p95) / 400))
if "error_rate" in metric_names:
# Error rate score: 1.0 if 0%, 0.0 if > 5%
error_pct = (errors / len(results)) * 100 if results else 0
scores["error_rate"] = max(0, min(1.0, (5 - error_pct) / 5))
if "throughput" in metric_names:
# Throughput score (normalized)
rps = len(results) / 30 # Assume 30-second test
scores["throughput"] = min(1.0, rps / 100) # Max 100 RPS = 1.0
return scores
def generate_test_data(
self,
size: int,
characteristics: dict
) -> list[dict]:
"""Generate endpoints to test."""
endpoints = [
{"url": f"https://api.example.com/endpoint/{i}", "method": "GET"}
for i in range(size)
]
if characteristics.get("distribution") == "skewed":
# Repeat 20% of endpoints (hot paths)
hot_endpoints = endpoints[:int(size * 0.2)]
return endpoints + hot_endpoints
        return endpoints
Example Implementation: Security Scanning
class SecurityScanningOrchestrator(SkillOrchestrator):
"""Multi-scenario orchestration for security-scanning skill."""
async def invoke_skill(
self,
input_data: list[dict],
scenario_params: dict,
) -> dict:
"""Invoke security scanner on code files."""
import random
# Simulate scanning files
results = []
for item in input_data:
findings = []
# Simulate vulnerability detection
if random.random() > 0.9:
findings.append({
"type": "SQL_INJECTION",
"severity": "HIGH",
"line": random.randint(1, 100)
})
results.append({
"file": item["path"],
"scanned": True,
"findings": findings,
"score": 1.0 - (len(findings) * 0.1),
})
return {
"processed": len(input_data),
"results": results,
"quality_score": 0.95,
"metrics": {
"vulnerabilities_found": sum(len(r.get("findings", [])) for r in results),
"files_scanned": len(results),
},
"errors": [],
}
def get_scenario_configs(self) -> dict:
return {
"simple": {
"input_size": 20, # 20 files
"time_budget_seconds": 45,
"batch_size": 10,
"dataset_characteristics": {"distribution": "uniform"},
"quality_metrics": ["coverage", "accuracy"],
"skill_params": {
"scan_depth": "shallow",
"rules": "OWASP_TOP_10"
}
},
"medium": {
"input_size": 100, # 100 files
"time_budget_seconds": 120,
"batch_size": 50,
"dataset_characteristics": {"distribution": "uniform"},
"quality_metrics": ["coverage", "accuracy"],
"skill_params": {
"scan_depth": "medium",
"rules": "OWASP_TOP_10 + CWE_TOP_25"
}
},
"complex": {
"input_size": 500, # 500 files
"time_budget_seconds": 600,
"batch_size": 100,
"dataset_characteristics": {"distribution": "skewed"},
"quality_metrics": ["coverage", "accuracy", "false_positive_rate"],
"skill_params": {
"scan_depth": "deep",
"rules": "ALL",
"enable_ml_detection": True
}
}
}
def calculate_quality_metrics(
self,
results: list[dict],
metric_names: list[str]
) -> dict:
"""Calculate security scanning quality metrics."""
total_files = len(results)
scanned_files = sum(1 for r in results if r.get("scanned"))
total_findings = sum(len(r.get("findings", [])) for r in results)
scores = {}
if "coverage" in metric_names:
# Coverage: percentage of files successfully scanned
scores["coverage"] = scanned_files / max(1, total_files)
if "accuracy" in metric_names:
# Accuracy: average finding score (file-level quality)
avg_score = sum(r.get("score", 0) for r in results) / max(1, total_files)
scores["accuracy"] = avg_score
if "false_positive_rate" in metric_names:
# FPR: simulated (would need manual validation in production)
scores["false_positive_rate"] = 1.0 - (total_findings / (total_findings + 10))
return scores
def generate_test_data(
self,
size: int,
characteristics: dict
) -> list[dict]:
"""Generate Python/JS files to scan."""
files = [
{"path": f"src/module_{i:03d}.py"}
for i in range(size)
]
if characteristics.get("distribution") == "skewed":
# Add some large files (skew toward complexity)
large_files = [
{"path": f"src/legacy_module_{i:03d}.py", "size_lines": 5000}
for i in range(int(size * 0.1))
]
return files + large_files
        return files
Registration and Factory Pattern
class OrchestratorRegistry:
"""Register orchestrators for each skill."""
_registry: dict[str, type[SkillOrchestrator]] = {}
@classmethod
def register(cls, skill_name: str, orchestrator_class: type):
"""Register an orchestrator for a skill."""
cls._registry[skill_name] = orchestrator_class
    @classmethod
    def get(cls, skill_name: str) -> SkillOrchestrator:
        """Instantiate orchestrator for a skill."""
        if skill_name not in cls._registry:
            raise ValueError(f"No orchestrator registered for {skill_name}")
        return cls._registry[skill_name](skill_name, skill_version="1.0.0")
# Registration
OrchestratorRegistry.register("performance-testing", PerformanceTestingOrchestrator)
OrchestratorRegistry.register("security-scanning", SecurityScanningOrchestrator)
# Usage
orchestrator = OrchestratorRegistry.get("performance-testing")
result = await orchestrator.orchestrate("demo-001")Integration with LangGraph
async def scenario_worker_generic(state: ScenarioOrchestratorState) -> dict:
"""Generic worker that uses skill-agnostic orchestrator."""
scenario_id = state.get("scenario_id")
skill_name = state["skill_name"]
# Get orchestrator for this skill
orchestrator = OrchestratorRegistry.get(skill_name)
# Run scenario
result = await orchestrator.run_scenario(
scenario_id=scenario_id,
orchestration_id=state["orchestration_id"]
)
return {f"progress_{scenario_id}": result}Adding a New Skill
To orchestrate a NEW skill, follow these steps:
Step 1: Create Orchestrator Class
# file: backend/app/workflows/multi_scenario/my_skill_orchestrator.py
from skill_agnostic_template import SkillOrchestrator
class MySkillOrchestrator(SkillOrchestrator):
async def invoke_skill(self, input_data, scenario_params):
# TODO: Call your skill here
pass
def get_scenario_configs(self):
# TODO: Define simple/medium/complex configs
pass
def calculate_quality_metrics(self, results, metric_names):
# TODO: Implement quality scoring
pass
def generate_test_data(self, size, characteristics):
# TODO: Generate test data for your skill
        pass
Step 2: Register
OrchestratorRegistry.register("my-skill", MySkillOrchestrator)Step 3: Run
orchestrator = OrchestratorRegistry.get("my-skill")
result = await orchestrator.orchestrate("demo-001")Total implementation time: 15-30 minutes per skill
Benefits of Abstraction
| Benefit | Impact |
|---|---|
| No state machine boilerplate | 10x faster to add new skill |
| Consistent patterns | Team learns once, applies everywhere |
| Automatic comparison | All skills compared same way |
| Easy to extend | Override only the methods you need |
| LangGraph integration ready | Works with checkpointing, streaming, etc. |
Testing Template
@pytest.fixture
def orchestrator():
return PerformanceTestingOrchestrator("performance-testing", "1.0.0")
@pytest.mark.asyncio
async def test_simple_scenario(orchestrator):
result = await orchestrator.run_scenario("simple", "test-001")
assert result["scenario_id"] == "simple"
assert result["items_processed"] > 0
assert "quality" in result
assert result["quality"]["latency"] > 0
@pytest.mark.asyncio
async def test_full_orchestration(orchestrator):
result = await orchestrator.orchestrate("test-002")
assert "results_by_scenario" in result
assert len(result["results_by_scenario"]) == 3
assert result["success_rate"] > 0.5State Machine Design
State Machine Design: Multi-Scenario Orchestration
Detailed state machine and abstraction patterns for ANY user-invocable skill.
Core State Machine
┌────────────────────────────────────────────────────────────────────────────┐
│ SCENARIO STATE MACHINE │
├────────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────┐ │
│ │ PENDING │ │
│ │ (awaiting start signal) │ │
│ └──────┬───────────────────────┘ │
│ │ supervisor.route_scenarios() │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ RUNNING │ │
│ │ (executing skill batches) │◄──────────┐ │
│ └──────┬───────────────────────┘ │ │
│ │ │ │
│ ┌────────────┼────────────┐ │ │
│ │ │ │ │ │
│ [pause] [milestone] [error] [resume] │
│ │ │ │ │ │
│ ▼ ▼ ▼ │ │
│ ┌────────┐ ┌─────────┐ ┌──────────┐ │ │
│ │ PAUSED │ │MILESTONE│ │ FAILED │ │ │
│ │(waiting)│ │(sync pt)│ │(error) │ │ │
│ └────┬───┘ └────┬────┘ └──────────┘ │ │
│ │ │ │ │
│ │[resume] │[continue] │ │
│ │ │ │ │
│ └─────┬─────┘ │ │
│ │ │ │
│ [error recovery]──────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ COMPLETE │ │
│ │ (all batches processed) │ │
│ └──────────────────────────────┘ │
│ │ │
│ │ aggregator.collect_and_aggregate() │
│ ▼ │
│ ┌──────────────────────────────┐ │
│ │ FINAL_RESULTS │ │
│ │ (ready for comparison) │ │
│ └──────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘
Legend:
─────────────────
→ Transition
[action] = trigger/activity
Detailed State Definitions
State: PENDING
Entry Point: Created at orchestration start
@dataclass
class PendingState:
    scenario_id: str  # "simple" | "medium" | "complex"
    orchestration_id: str
    created_at: datetime
    definition: ScenarioDefinition
    # Awaiting signal from supervisor (defaulted fields must come last)
    waiting_for_start: bool = True

# Supervisor signal triggers transition
# Event: supervisor.route_scenarios() returns Send("scenario_worker", ...)
# Transition: PENDING → RUNNING
Exit Condition: Supervisor sends routing command
State: RUNNING
Active Work State: Executing skill batches
@dataclass
class RunningState:
scenario_id: str
status: Literal["running"] = "running"
# Progress tracking
progress_pct: float = 0.0 # 0-100
current_milestone: str = "batch_1"
milestones_reached: list[str] = field(default_factory=list)
# Metrics (accumulating)
items_processed: int = 0
batch_count: int = 0
elapsed_ms: int = 0
memory_used_mb: int = 0
# Results (partial)
partial_results: list[dict] = field(default_factory=list)
quality_scores: dict = field(default_factory=dict)
# Error tracking
errors: list[dict] = field(default_factory=list)
# Transition flags
should_pause: bool = False
should_continue: bool = True
Activities:
- Execute skill batch N
- Update progress_pct = (items_processed / input_size) * 100
- Check if progress_pct reached milestone (30%, 50%, 70%, 90%)
- Record partial results
- Monitor memory and elapsed time
Exit Conditions:
- progress_pct >= 100 → COMPLETE
- error_occurred and recovery_possible → PAUSED (for retry)
- error_occurred and recovery_failed → FAILED
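Collapsed into code, the RUNNING exit logic is a small pure function. A sketch (the function name and `can_recover` flag are illustrative, not part of the template's API):

```python
def next_status_from_running(
    state: RunningState,
    error: Exception | None = None,
    can_recover: bool = False,
) -> str:
    """Map a RUNNING scenario to its next state per the exit conditions above."""
    if error is not None:
        # Recoverable errors pause for a retry; anything else is terminal
        return "paused" if can_recover else "failed"
    if state.progress_pct >= 100:
        return "complete"
    return "running"  # keep processing batches
```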
State: PAUSED
Waiting State: At milestone synchronization point or for recovery
@dataclass
class PausedState:
    scenario_id: str
    # Checkpoint (for resuming)
    checkpoint_index: int  # Which batch to resume from
    checkpoint_state: dict  # Full state snapshot
    # Sync info (for milestone pause)
    milestone_pct: int
    other_scenarios_status: dict  # {"simple": 45, "medium": 30, "complex": 10}
    status: Literal["paused"] = "paused"
    # Why paused?
    pause_reason: Literal["sync_point", "waiting_for_recovery", "user_halt"] = "sync_point"
    time_paused_ms: int = 0
    # Recovery info (for error pause)
    retry_count: int = 0
    max_retries: int = 3
    last_error: str = ""
Activities:
- Wait for other scenarios (if sync_point)
- Wait for recovery decision (if error)
- Monitor timeout (max 60 seconds)
- Record checkpoint for resumption
Exit Conditions:
- all_scenarios_at_milestone → RUNNING (resume)
- timeout and waiting_for_others → RUNNING (proceed anyway)
- retry_count < max_retries → RUNNING (retry)
- retry_count >= max_retries → FAILED
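The same treatment for PAUSED. A sketch, assuming the sync flags are computed by the milestone monitor:

```python
def next_status_from_paused(
    state: PausedState,
    all_at_milestone: bool,
    timeout_expired: bool,
) -> str:
    """Resolve the PAUSED exit conditions listed above."""
    if state.pause_reason == "sync_point":
        # Resume when siblings catch up, or proceed anyway on timeout
        return "running" if (all_at_milestone or timeout_expired) else "paused"
    # Error-recovery pause: retry while budget remains
    return "running" if state.retry_count < state.max_retries else "failed"
```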
State: FAILED
Terminal Error State: Unrecoverable error
@dataclass
class FailedState:
    scenario_id: str
    # Error details
    error_message: str
    error_type: str  # "timeout", "exception", "memory", "validation"
    stack_trace: str
    # Progress at failure
    progress_pct: float
    items_processed: int
    batch_failed: int
    status: Literal["failed"] = "failed"
    # Attempted recovery
    recovery_attempted: bool = False
    recovery_method: str = ""
    # Partial results (if any)
    partial_results: list[dict] = field(default_factory=list)
Activities:
- Log error with full context
- Store partial results (won't complete, but data preserved)
- Notify coordinator
- Update checkpoint for debugging
Exit Condition: Terminal state (no transition out)
State: COMPLETE
Success State: All batches processed
@dataclass
class CompleteState:
    scenario_id: str
    # Final metrics
    total_items_processed: int
    total_batches: int
    total_elapsed_ms: int
    peak_memory_mb: int
    # Final results
    results: dict  # Full aggregated output
    quality_scores: dict
    # Quality assessment
    quality_rank: Literal["basic", "good", "excellent"]
    recommendation: str
    status: Literal["complete"] = "complete"
    error_count: int = 0
    # Comparison data
    time_per_item_ms: float = 0.0
    items_per_second: float = 0.0
Activities:
- Calculate final metrics
- Score results
- Prepare for comparison
- Signal readiness for aggregation
Exit Condition: Transition to aggregator
State: FINAL_RESULTS
Aggregated State: All scenarios combined
@dataclass
class AggregatedFinalResults:
orchestration_id: str
timestamp: datetime
# Per-scenario results
results: dict # {"simple": {...}, "medium": {...}, "complex": {...}}
# Comparative analysis
quality_ranking: dict
time_complexity: dict # {"simple": 1.2ms/item, "medium": 1.8ms/item, ...}
scaling_efficiency: float # simple_time / complex_time / (simple_size / complex_size)
# Cross-scenario patterns
success_patterns: list[str]
failure_modes: list[str]
optimization_opportunities: list[str]
# Recommendations
best_difficulty: Literal["simple", "medium", "complex"]
resource_requirements: dict
estimated_production_scaling: dict
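The scaling_efficiency comment above is a ratio of ratios. As a worked function (the numbers in the asserts are illustrative):

```python
def scaling_efficiency(
    simple_ms: float, complex_ms: float, simple_size: int, complex_size: int
) -> float:
    """1.0 = perfectly linear scaling; below 1.0, cost grows superlinearly."""
    return (simple_ms / complex_ms) / (simple_size / complex_size)

# 8x the items in exactly 8x the time -> 1.0 (linear)
assert scaling_efficiency(1_000, 8_000, 100, 800) == 1.0
# 8x the items in 16x the time -> 0.5 (superlinear cost)
assert scaling_efficiency(1_000, 16_000, 100, 800) == 0.5
```
Transition Rules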
PENDING → RUNNING
Trigger: supervisor.route_scenarios() returns Send commands
async def supervisor_trigger(state):
return [
Send("scenario_worker", {"scenario_id": "simple", ...}),
Send("scenario_worker", {"scenario_id": "medium", ...}),
Send("scenario_worker", {"scenario_id": "complex", ...}),
]
Guard: None (always allowed)
Actions on Entry:
- Set status = "running"
- Initialize start_time_ms = now()
- Start processing first batch
RUNNING → MILESTONE (PAUSED at sync point)
Trigger: progress_pct in [30, 50, 70, 90] AND synchronize_at_milestone
# In scenario_worker
milestone = int(progress_pct)  # exact float equality would rarely fire
if milestone in [30, 50, 70, 90]:
    should_sync = await synchronize_at_milestone(milestone, state)
    if should_sync:
        # Transition to PAUSED
        state.status = "paused"
        state.pause_reason = "sync_point"
        state.milestone_pct = milestone
Guard: Other scenarios must be within 5% of milestone OR timeout expires
Actions on Entry:
- Record checkpoint
- Wait for other scenarios (max 60s)
- Monitor progress of siblings
PAUSED → RUNNING (Resume after sync)
Trigger: All scenarios reached milestone OR timeout
# Monitor loop in sync node
if all_at_milestone or timeout_expired:
# Resume
state.status = "running"
state.pause_reason = None
# Continue from checkpoint
Guard: None (always allowed)
Actions on Entry:
- Load checkpoint
- Resume from interrupted batch
- Continue processing
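The checkpoint referenced here can be as simple as a snapshot of the running state keyed by batch index. A hedged sketch (in a LangGraph deployment, the built-in checkpointer would typically replace this):

```python
from dataclasses import asdict

def save_checkpoint(state: RunningState) -> dict:
    # Snapshot everything needed to resume mid-scenario
    return {"batch_index": state.batch_count, "state": asdict(state)}

def resume_batches(checkpoint: dict, batches: list) -> list:
    # Skip batches that completed before the pause
    return batches[checkpoint["batch_index"]:]
```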
RUNNING → COMPLETE
Trigger: items_processed >= input_size
# In scenario_worker
for batch_idx, batch in enumerate(batches):
await invoke_skill(batch)
items_processed += len(batch)
if items_processed >= scenario_def.input_size:
state.status = "complete"
break
Guard: None
Actions on Entry:
- Calculate final metrics
- Score all results
- Prepare aggregation data
RUNNING → PAUSED (Error recovery pause)
Trigger: Exception in skill invocation, recovery possible
try:
result = await invoke_skill(batch)
except SkillException as e:
if can_recover(e):
state.status = "paused"
state.pause_reason = "waiting_for_recovery"
state.last_error = str(e)
state.retry_count += 1
# Decision logic for retry
Guard: retry_count < max_retries (usually 3)
Actions on Entry:
- Save checkpoint before error
- Log error details
- Increment retry counter
- Wait for retry decision
PAUSED → RUNNING (Retry after error)
Trigger: Manual retry decision or automatic retry eligible
if error_state.retry_count < max_retries:
state.status = "running"
# Load checkpoint, try again
Guard: retry_count < max_retries
RUNNING → FAILED
Trigger: Exception and recovery not possible
try:
result = await invoke_skill(batch)
except FatalException as e:
state.status = "failed"
state.error_message = str(e)
state.error_type = type(e).__name__
Guard: retry_count >= max_retries OR is_fatal_error
Actions on Entry:
- Log full error context
- Store partial results (if any)
- Notify aggregator about failure
- Don't block other scenarios
COMPLETE → FINAL_RESULTS
Trigger: All 3 scenarios reached COMPLETE or FAILED
# In aggregator_node
simple_complete = state.progress_simple.status in ["complete", "failed"]
medium_complete = state.progress_medium.status in ["complete", "failed"]
complex_complete = state.progress_complex.status in ["complete", "failed"]
if simple_complete and medium_complete and complex_complete:
return aggregate_results(state)
Guard: All scenarios in terminal state
Actions on Entry:
- Merge results from all scenarios
- Calculate comparisons
- Extract patterns
- Generate recommendations
Abstraction for ANY Skill
The state machine remains skill-agnostic. Customization happens in:
- Skill Invocation: invoke_skill() (your skill here)
- Quality Metrics: calculate_quality_metrics() (what to measure)
- Scenario Parameters: batch size, timeout, memory (adjust per skill)
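How the generic machine wires those three hooks together per scenario -- a simplified sketch of the RUNNING loop (run_scenario_batches and the `* 10` sizing are illustrative; the hook signatures match the template at the top of this section):

```python
async def run_scenario_batches(orch: "SkillOrchestrator", scenario_id: str) -> dict:
    # Hook 1: scenario parameters
    cfg = orch.get_scenario_configs()[scenario_id]
    data = orch.generate_test_data(cfg["batch_size"] * 10, characteristics={})
    results = []
    for i in range(0, len(data), cfg["batch_size"]):
        batch = data[i : i + cfg["batch_size"]]
        # Hook 2: skill invocation
        results.append(await orch.invoke_skill(batch, cfg))
    # Hook 3: quality scoring
    quality = orch.calculate_quality_metrics(results, ["accuracy", "coverage"])
    return {"scenario_id": scenario_id, "results": results, "quality": quality}
```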
Skill Integration Point
async def invoke_skill(
skill_name: str, # "performance-testing", "security-scanning", etc.
input_data: list, # Items to process
skill_params: dict, # Skill-specific config
) -> dict:
"""
Call any user-invocable skill.
This is the ONLY skill-specific code in the state machine.
"""
# Match skill_name and invoke appropriately
if skill_name == "performance-testing":
return await invoke_performance_testing(input_data, skill_params)
elif skill_name == "security-scanning":
return await invoke_security_scanning(input_data, skill_params)
elif skill_name == "your-skill":
return await invoke_your_skill(input_data, skill_params)
else:
raise ValueError(f"Unknown skill: {skill_name}")Quality Metrics Adapter
def calculate_quality_metrics(
    skill_name: str,
    results: list[dict],
    metric_names: list[str],
    expected_count: int | None = None,
) -> dict:
    """
    Calculate quality metrics (skill-specific).
    """
    scores = {}
    # Skill-agnostic metrics (always available).
    # Completion must be measured against the expected item count;
    # len(results) / len(results) would always be 1.0.
    expected = expected_count if expected_count else len(results)
    scores["completion_rate"] = len(results) / max(1, expected)
    scores["error_rate"] = sum(1 for r in results if r.get("error")) / max(1, len(results))
    # Skill-specific metrics
    if "accuracy" in metric_names:
        # For security-scanning: vulnerability count vs. expected
        # For performance-testing: actual latency vs. threshold
        scores["accuracy"] = calculate_accuracy(skill_name, results)
    if "coverage" in metric_names:
        # For security-scanning: percentage of codebase scanned
        # For performance-testing: percentage of endpoints tested
        scores["coverage"] = calculate_coverage(skill_name, results)
    return scores
Scenario Parameter Templates
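Usage, with hypothetical scan results (the expected_count of 4 signals that one batch never returned; skill-specific metric names are omitted so only the agnostic metrics run):

```python
results = [
    {"vulnerabilities": 3},
    {"vulnerabilities": 0},
    {"error": "timeout scanning module"},
]
scores = calculate_quality_metrics("security-scanning", results, [], expected_count=4)
assert scores["completion_rate"] == 0.75       # 3 of 4 expected batches returned
assert round(scores["error_rate"], 2) == 0.33  # 1 of 3 returned batches errored
```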
SCENARIO_TEMPLATES = {
# Generic: 1x, 3x, 8x scaling
"default": {
"simple": {"multiplier": 1.0, "timeout": 30, "batch_size": 10},
"medium": {"multiplier": 3.0, "timeout": 90, "batch_size": 50},
"complex": {"multiplier": 8.0, "timeout": 300, "batch_size": 100},
},
# Performance-testing: lighter loads
"performance-testing": {
"simple": {"multiplier": 1.0, "timeout": 60, "batch_size": 5},
"medium": {"multiplier": 2.0, "timeout": 180, "batch_size": 20},
"complex": {"multiplier": 4.0, "timeout": 600, "batch_size": 50},
},
# Security-scanning: heavier loads
"security-scanning": {
"simple": {"multiplier": 1.0, "timeout": 45, "batch_size": 20},
"medium": {"multiplier": 2.5, "timeout": 120, "batch_size": 100},
"complex": {"multiplier": 10.0, "timeout": 900, "batch_size": 500},
},
}
def get_scenario_config(skill_name: str, difficulty: str) -> dict:
template = SCENARIO_TEMPLATES.get(skill_name, SCENARIO_TEMPLATES["default"])
return template[difficulty]
Key State Machine Patterns
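Lookup behavior, including the fallback to the generic template for unregistered skills:

```python
# Known skill: uses its own template
assert get_scenario_config("security-scanning", "complex") == {
    "multiplier": 10.0, "timeout": 900, "batch_size": 500
}
# Unregistered skill: falls back to the generic 1x/3x/8x template
assert get_scenario_config("my-new-skill", "medium") == {
    "multiplier": 3.0, "timeout": 90, "batch_size": 50
}
```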
Pattern 1: Optimistic Completion
Assume success, handle errors reactively:
# Default behavior: run until complete
for batch in batches:
result = await invoke_skill(batch)
# No error checking—complete normally
# Only if exception: enter error recovery
Pro: Efficient for reliable skills
Con: Slower error detection
Pattern 2: Pessimistic Validation
Validate each batch before continuing:
for batch in batches:
result = await invoke_skill(batch)
# Validate result
if not validate_result(result, scenario_def.expected_quality):
# Error recovery
retry_count += 1
Pro: Catches issues early
Con: Higher overhead
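validate_result is left undefined in the snippet above. A minimal sketch, assuming results carry an error flag and a numeric quality score:

```python
def validate_result(result: dict, expected_quality: float) -> bool:
    """Reject batches that errored or scored below the scenario's quality bar."""
    if result.get("error"):
        return False
    return result.get("quality_score", 0.0) >= expected_quality
```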
Pattern 3: Timeout-Based State Transitions
Use elapsed time to trigger state changes:
import time

def now_ms() -> int:
    return int(time.time() * 1000)

@dataclass
class TimedState:
    created_at: int  # epoch milliseconds
    timeout_seconds: int

    def is_expired(self) -> bool:
        return (now_ms() - self.created_at) > self.timeout_seconds * 1000

# Check in state machine
if paused_state.is_expired():
    ...  # Force transition (to RUNNING or FAILED)
Visualizing State Transitions
# Generate state transition diagram for debugging
import graphviz
def generate_state_diagram():
dot = graphviz.Digraph(comment="Scenario State Machine")
states = ["PENDING", "RUNNING", "PAUSED", "COMPLETE", "FAILED", "FINAL_RESULTS"]
for state in states:
dot.node(state, shape="ellipse")
transitions = [
("PENDING", "RUNNING", "supervisor.route()"),
("RUNNING", "PAUSED", "milestone reached"),
("RUNNING", "COMPLETE", "all items processed"),
("RUNNING", "FAILED", "fatal error"),
("PAUSED", "RUNNING", "sync complete or timeout"),
("PAUSED", "FAILED", "max retries exceeded"),
("COMPLETE", "FINAL_RESULTS", "aggregation trigger"),
("FAILED", "FINAL_RESULTS", "aggregation trigger"),
]
for src, dst, label in transitions:
dot.edge(src, dst, label=label)
dot.render("/tmp/state_machine", format="png", view=True)Testing State Transitions
@pytest.mark.asyncio
async def test_state_transition_pending_to_running():
state = ScenarioOrchestratorState(...)
assert state.progress_simple.status == "pending"
# Trigger supervisor
commands = await scenario_supervisor(state)
assert len(commands) == 3 # 3 Send commands
@pytest.mark.asyncio
async def test_state_transition_running_to_complete():
state = ScenarioOrchestratorState(...)
state.progress_simple.status = "running"
# Simulate processing all items
state.progress_simple.items_processed = 100
# Invoke worker
result = await scenario_worker(state)
assert result["progress_simple"]["status"] == "complete"Checklists (2)
Framework Selection
Framework Selection Checklist
Choose the right multi-agent framework.
Requirements Analysis
- Use case clearly defined
- Complexity level assessed (single vs multi-agent)
- State management needs identified
- Human-in-the-loop requirements defined
- Observability needs documented
Framework Evaluation
LangGraph
- Need complex stateful workflows
- Require persistence and checkpoints
- Want streaming support
- Need human-in-the-loop
- Already using LangChain ecosystem
CrewAI
- Role-based collaboration pattern
- Hierarchical team structure
- Agent delegation needed
- Quick prototyping required
- Built-in memory preferred
OpenAI Agents SDK
- OpenAI-native ecosystem
- Handoff pattern fits use case
- Need built-in guardrails
- Want OpenAI tracing
- Simpler agent definition preferred
Microsoft Agent Framework
- Enterprise compliance requirements
- Using Azure ecosystem
- Need A2A protocol support
- Want AutoGen+SK merger features
- Long-term Microsoft support preferred
AG2 (Community AutoGen)
- Open-source flexibility priority
- Community-driven development OK
- AutoGen familiarity exists
- Custom modifications needed
Technical Considerations
- Team expertise with framework
- Framework maturity level acceptable
- Community support adequate
- Documentation quality sufficient
- Production readiness validated
Integration Assessment
- Observability tool compatibility (Langfuse, etc.)
- LLM provider compatibility
- Existing codebase integration
- Testing framework support
- CI/CD pipeline compatibility
Risk Mitigation
- Fallback strategy defined
- Framework lock-in assessed
- Migration path understood
- Version update strategy
- Community health evaluated
Decision Documentation
- Framework choice documented
- Rationale recorded
- Alternatives considered listed
- Trade-offs acknowledged
- Review date scheduled
Orchestration Checklist
Multi-Agent Orchestration Checklist
Architecture
- Define agent responsibilities
- Plan communication patterns
- Set coordination strategy
- Design failure handling
Agent Design
- Single responsibility per agent
- Clear input/output contracts
- Independent operation
- Stateless when possible
Communication
- Message format definition
- Async message passing
- Result aggregation
- Error propagation
Coordination
- Central orchestrator
- Task queue management
- Priority handling
- Deadlock prevention
Monitoring
- Agent health checks
- Task completion tracking
- Performance metrics
- Error rates