Mcp Patterns
MCP server building, advanced patterns, and security hardening. Use when building MCP servers, implementing tool handlers, adding authentication, creating interactive UIs, hardening MCP security, or debugging MCP integrations.
MCP Patterns
Patterns for building, composing, and securing Model Context Protocol servers. Based on the 2025-11-25 specification — the latest stable release maintained by the Agentic AI Foundation (Linux Foundation), co-founded by Anthropic, Block, and OpenAI.
Scaffolding a new server? Use Anthropic's
mcp-builderskill (claude install anthropics/skills) for project setup and evaluation creation. This skill focuses on patterns, security, and advanced features after initial setup.Deploying to Cloudflare? See the
building-mcp-server-on-cloudflareskill for Workers-specific deployment patterns.
Decision Tree — Which Rule to Read
What are you building?
│
├── New MCP server
│ ├── Setup & primitives ──────► rules/server-setup.md
│ ├── Transport selection ─────► rules/server-transport.md
│ └── Scaffolding ─────────────► mcp-builder skill (anthropics/skills)
│
├── Authentication & authorization
│ └── OAuth 2.1 + OIDC ───────► rules/auth-oauth21.md
│
├── Advanced server features
│ ├── Tool composition ────────► rules/advanced-composition.md
│ ├── Resource caching ────────► rules/advanced-resources.md
│ ├── Elicitation (user input) ► rules/elicitation.md
│ ├── Sampling (agent loops) ──► rules/sampling-tools.md
│ └── Interactive UI ──────────► rules/apps-ui.md
│
├── Client-side consumption
│ └── Connecting to servers ───► rules/client-patterns.md
│
├── Security hardening
│ ├── Prompt injection defense ► rules/security-injection.md
│ └── Zero-trust & verification ► rules/security-hardening.md
│
├── Testing & debugging
│ └── Inspector + unit tests ──► rules/testing-debugging.md
│
├── Discovery & ecosystem
│ └── Registries & catalogs ──► rules/registry-discovery.md
│
└── Browser-native tools
└── WebMCP (W3C) ───────────► rules/webmcp-browser.mdQuick Reference
| Category | Rule | Impact | Key Pattern |
|---|---|---|---|
| Server | server-setup.md | HIGH | FastMCP lifespan, Tool/Resource/Prompt primitives |
| Server | server-transport.md | HIGH | stdio for CLI, Streamable HTTP for production |
| Auth | auth-oauth21.md | HIGH | PKCE, RFC 8707 resource indicators, token validation |
| Advanced | advanced-composition.md | MEDIUM | Pipeline, parallel, and branching tool composition |
| Advanced | advanced-resources.md | MEDIUM | Resource caching with TTL, LRU eviction, lifecycle |
| Advanced | elicitation.md | MEDIUM | Server-initiated structured input from users |
| Advanced | sampling-tools.md | MEDIUM | Server-side agent loops with tool calling |
| Advanced | apps-ui.md | MEDIUM | Interactive UI via MCP Apps + @mcp-ui/* SDK |
| Client | client-patterns.md | MEDIUM | TypeScript/Python MCP client connection patterns |
| Security | security-injection.md | HIGH | Description sanitization, encoding normalization |
| Security | security-hardening.md | HIGH | Zero-trust allowlist, hash verification, rug pull detection |
| Quality | testing-debugging.md | MEDIUM | MCP Inspector, unit tests, transport debugging |
| Ecosystem | registry-discovery.md | LOW | Official registry API, server metadata |
| Ecosystem | webmcp-browser.md | LOW | W3C browser-native agent tools (complementary) |
Total: 14 rules across 6 categories
Key Decisions
| Decision | Recommendation |
|---|---|
| Transport | stdio for CLI/Desktop, Streamable HTTP for production (SSE deprecated) |
| Language | TypeScript for production (better SDK support, type safety) |
| Auth | OAuth 2.1 with PKCE (S256) + RFC 8707 resource indicators |
| Server lifecycle | Always use FastMCP lifespan for resource management |
| Error handling | Return errors as text content (Claude can interpret and retry) |
| Tool composition | Pipeline for sequential, asyncio.gather for parallel |
| Resource caching | TTL + LRU eviction with memory cap |
| Tool trust model | Zero-trust: explicit allowlist + hash verification |
| User input | Elicitation for runtime input; never request PII via elicitation |
| Interactive UI | MCP Apps with @mcp-ui/* SDK; sandbox all iframes |
| Token handling | Never pass through client tokens to downstream services |
Spec & Governance
- Protocol: Model Context Protocol, spec version 2025-11-25
- Governance: Agentic AI Foundation (Linux Foundation, Dec 2025)
- Platinum members: AWS, Anthropic, Block, Bloomberg, Cloudflare, Google, Microsoft, OpenAI
- Adoption: 10,000+ servers; Claude, Cursor, Copilot, Gemini, ChatGPT, VS Code
- Spec URL: https://modelcontextprotocol.io/specification/2025-11-25
Feature Maturity
| Feature | Spec Version | Status |
|---|---|---|
| Tools, Resources, Prompts | 2024-11-05 | Stable |
| Streamable HTTP transport | 2025-03-26 | Stable (replaces SSE) |
| OAuth 2.1 + Elicitation (form) | 2025-06-18 | Stable |
| Sampling with tool calling | 2025-11-25 | Stable |
| Elicitation URL mode | 2025-11-25 | Stable |
| MCP Apps (UI extension) | 2026-01-26 | Extension (ext-apps) |
| WebMCP (browser-native) | 2026-02-14 | W3C Community Draft |
Example
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("my-server")
@mcp.tool()
async def search(query: str) -> str:
"""Search documents. Returns matching results."""
results = await db.search(query)
return "\n".join(r.title for r in results[:10])Common Mistakes
- No lifecycle management (connection/resource leaks on shutdown)
- Missing input validation on tool arguments
- Returning secrets in tool output (API keys, credentials)
- Unbounded response sizes (Claude has context limits)
- Trusting tool descriptions without sanitization (injection risk)
- No hash verification on tool invocations (rug pull vulnerability)
- Storing auth tokens in session IDs (credential leak)
- Blocking synchronous code in async server (use
asyncio.to_thread()) - Using SSE transport instead of Streamable HTTP (deprecated since March 2025)
- Passing through client tokens to downstream services (confused deputy)
Ecosystem
| Resource | What For |
|---|---|
mcp-builder skill (anthropics/skills) | Scaffold new MCP servers + create evals |
building-mcp-server-on-cloudflare skill | Deploy MCP servers on Cloudflare Workers |
@mcp-ui/* packages (npm) | Implement MCP Apps UI standard |
| MCP Registry | Discover servers: https://registry.modelcontextprotocol.io/ |
| MCP Inspector | Debug and test servers interactively |
Related Skills
ork:llm-integration— LLM function calling patternsork:security-patterns— General input sanitization and layered securityork:api-design— REST/GraphQL API design patterns
Rules (14)
Compose multi-tool MCP workflows with error isolation to avoid brittle spaghetti code — MEDIUM
Advanced Composition
Compose multiple MCP tools into pipelines, parallel fans, or conditional branches.
Incorrect -- manual sequential calls with no error handling:
result1 = await tool_a(data)
result2 = await tool_b(result1) # Crashes if tool_a fails
result3 = await tool_c(result2) # No way to recoverCorrect -- pipeline composition with error propagation:
from dataclasses import dataclass, field
from typing import Any, Callable, Awaitable
@dataclass
class ToolResult:
success: bool
data: Any
error: str | None = None
@dataclass
class ComposedTool:
name: str
tools: dict[str, Callable[..., Awaitable[ToolResult]]]
pipeline: list[str]
async def execute(self, input_data: dict[str, Any]) -> ToolResult:
result = ToolResult(success=True, data=input_data)
for tool_name in self.pipeline:
if not result.success:
break
try:
result = await self.tools[tool_name](result.data)
except Exception as e:
result = ToolResult(success=False, data=None,
error=f"'{tool_name}' failed: {e}")
return result
# Usage: search then summarize
search_summarize = ComposedTool(
name="search_and_summarize",
tools={"search": search_docs, "summarize": summarize_content},
pipeline=["search", "summarize"],
)Correct -- parallel composition with error isolation:
import asyncio
async def parallel_execute(
tools: dict[str, Callable],
input_data: dict,
) -> list[ToolResult]:
tasks = [
asyncio.create_task(tool(input_data))
for tool in tools.values()
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return [
ToolResult(success=False, data=None, error=str(r))
if isinstance(r, Exception) else r
for r in results
]Correct -- conditional branching:
def content_router(data: dict) -> str:
return {
"text": "text_processor",
"image": "image_analyzer",
"audio": "audio_transcriber",
}.get(data.get("type", "text"), "text_processor")
# Route to the right tool based on input
tool_name = content_router(input_data)
result = await tools[tool_name](input_data)Key rules:
- Pipeline: stop on first failure, propagate error context
- Parallel: use
return_exceptions=Trueto isolate failures - Branching: always include a default/fallback route
- Keep composition depth shallow (3-4 steps max)
Manage MCP resource caching and lifecycles to prevent memory leaks and redundant calls — MEDIUM
Advanced Resources
Cache MCP resources with TTL and LRU eviction. Always track memory usage and clean up expired entries.
Incorrect -- no caching, no cleanup:
@mcp.resource("user://{id}/profile")
async def get_profile(id: str) -> dict:
return await db.query(f"SELECT * FROM users WHERE id = {id}") # SQL injection + no cacheCorrect -- resource manager with TTL and LRU eviction:
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
import asyncio
@dataclass
class CachedResource:
data: Any
created_at: datetime
last_accessed: datetime
size_bytes: int = 0
def touch(self) -> None:
self.last_accessed = datetime.now()
class MCPResourceManager:
def __init__(
self,
cache_ttl: timedelta = timedelta(minutes=15),
max_cache_size: int = 100,
max_memory_bytes: int = 100 * 1024 * 1024, # 100MB
):
self.cache_ttl = cache_ttl
self.max_cache_size = max_cache_size
self.max_memory_bytes = max_memory_bytes
self._cache: dict[str, CachedResource] = {}
self._lock = asyncio.Lock()
async def get(self, uri: str, loader: callable) -> Any:
async with self._lock:
if uri in self._cache:
resource = self._cache[uri]
if datetime.now() - resource.created_at <= self.cache_ttl:
resource.touch()
return resource.data
del self._cache[uri] # Expired
data = await loader(uri)
await self._store(uri, data)
return data
async def _store(self, uri: str, data: Any) -> None:
import sys
size = sys.getsizeof(data)
# Evict LRU entries if needed
while (len(self._cache) >= self.max_cache_size
or self._total_size() + size > self.max_memory_bytes):
if not self._cache:
break
lru_uri = min(self._cache, key=lambda k: self._cache[k].last_accessed)
del self._cache[lru_uri]
now = datetime.now()
self._cache[uri] = CachedResource(
data=data, created_at=now, last_accessed=now, size_bytes=size,
)
def _total_size(self) -> int:
return sum(r.size_bytes for r in self._cache.values())
async def cleanup_expired(self) -> int:
async with self._lock:
now = datetime.now()
expired = [
uri for uri, r in self._cache.items()
if now - r.created_at > self.cache_ttl
]
for uri in expired:
del self._cache[uri]
return len(expired)Correct -- FastMCP lifespan with resource lifecycle:
from contextlib import asynccontextmanager
from mcp.server.fastmcp import FastMCP
@asynccontextmanager
async def app_lifespan(server: FastMCP):
resources = MCPResourceManager(
cache_ttl=timedelta(minutes=10),
max_memory_bytes=50 * 1024 * 1024,
)
try:
yield {"resources": resources}
finally:
await resources.cleanup_expired() # Final cleanup
mcp = FastMCP("cached-server", lifespan=app_lifespan)Key rules:
- Always set
max_cache_sizeandmax_memory_bytescaps - Use
asyncio.Lockfor thread-safe cache access - Run
cleanup_expired()on shutdown and periodically - Parameterize queries -- never interpolate user input into SQL
Configure MCP Apps UI sandboxing, CSP declarations, and visibility controls correctly — MEDIUM
MCP Apps UI
MCP Apps (SEP-1865) let tools return interactive UIs rendered in sandboxed iframes. Declare ui:// resources, link them to tools via _meta.ui.resourceUri, and configure CSP domains for secure external access.
Incorrect -- no CSP, no sandbox awareness, no visibility control:
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
const server = new McpServer({ name: "my-app", version: "1.0.0" });
// BAD: resource uses generic mimeType, no ui:// scheme
server.registerResource("dashboard", "https://my-app.com/dashboard", {
mimeType: "text/html",
});
// BAD: no _meta.ui linkage, no visibility — internal tool exposed to model
server.registerTool("refresh_dashboard", {
description: "Refresh dashboard data",
inputSchema: { type: "object" },
}, async () => ({
content: [{ type: "text", text: "refreshed" }],
}));Correct -- registerAppTool/registerAppResource with CSP and visibility:
import {
registerAppTool,
registerAppResource,
RESOURCE_MIME_TYPE, // "text/html;profile=mcp-app"
} from "@modelcontextprotocol/ext-apps/server";
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import type { CallToolResult, ReadResourceResult } from "@modelcontextprotocol/sdk/types.js";
const server = new McpServer({ name: "my-app", version: "1.0.0" });
const RESOURCE_URI = "ui://my-app/dashboard";
// Declare CSP domains for external tile/API access
const cspMeta = {
ui: {
csp: {
connectDomains: ["https://api.example.com"], // fetch/XHR/WebSocket
resourceDomains: ["https://cdn.jsdelivr.net"], // scripts, images, styles
frameDomains: ["https://www.youtube.com"], // nested iframes
},
prefersBorder: true,
},
};
// Register UI resource with CSP metadata
registerAppResource(server, RESOURCE_URI, RESOURCE_URI,
{ mimeType: RESOURCE_MIME_TYPE },
async (): Promise<ReadResourceResult> => ({
contents: [{
uri: RESOURCE_URI,
mimeType: RESOURCE_MIME_TYPE,
text: htmlContent,
_meta: cspMeta,
}],
}),
);
// Tool visible to both model and app (default)
registerAppTool(server, "get-dashboard", {
title: "Get Dashboard",
description: "Show interactive analytics dashboard.",
inputSchema: {},
_meta: { ui: { resourceUri: RESOURCE_URI } },
}, async (): Promise<CallToolResult> => ({
content: [{ type: "text", text: JSON.stringify(data) }],
}));
// App-only tool — hidden from model, callable only by the UI
registerAppTool(server, "refresh_data", {
title: "Refresh Data",
description: "Refresh dashboard data (internal).",
inputSchema: {},
_meta: {
ui: {
resourceUri: RESOURCE_URI,
visibility: ["app"], // hidden from model tool list
},
},
}, async (): Promise<CallToolResult> => ({
content: [{ type: "text", text: JSON.stringify(freshData) }],
}));Correct -- React app using @modelcontextprotocol/ext-apps/react:
import { useToolResult } from "@modelcontextprotocol/ext-apps/react";
function Dashboard() {
const result = useToolResult(); // receives tool call data
const data = JSON.parse(result?.content?.[0]?.text ?? "{}");
return <div>{/* render interactive UI from data */}</div>;
}Key rules:
- Use
ui://URI scheme for all UI resources, withtext/html;profile=mcp-appmimeType - Use
registerAppToolandregisterAppResourcefrom@modelcontextprotocol/ext-apps/server - Link tools to UIs via
_meta.ui.resourceUrion the tool definition - Declare CSP domains explicitly:
connectDomains(fetch),resourceDomains(CDN),frameDomains(iframes) - Omitting CSP defaults to
connect-src 'none'-- no external network access - Set
visibility: ["app"]for tools only the UI should call (hides from model) - Default visibility is
["model", "app"]-- tool visible to both model and UI - Host renders UI in sandboxed iframe; never assume permissions are granted
- Content MUST be valid HTML5 provided via
text(string) orblob(base64)
Reference: MCP Apps Extension (SEP-1865)
OAuth 2.1 Authorization for MCP Servers — HIGH
OAuth 2.1 Authorization for MCP Servers
MCP servers are OAuth 2.1 Resource Servers (spec 2025-11-25). Clients MUST use PKCE with S256, bind tokens to the target resource via RFC 8707, and never pass tokens through to downstream services.
Incorrect -- no PKCE, no resource indicator, token passthrough:
// BAD: Missing PKCE and resource parameter
const authUrl = `${authServer}/authorize?client_id=${clientId}&redirect_uri=${redirect}`;
// BAD: Passing client's token to upstream API (confused deputy)
async function callUpstreamApi(clientToken: string) {
return fetch("https://api.example.com/data", {
headers: { Authorization: `Bearer ${clientToken}` }, // NEVER DO THIS
});
}
// BAD: No audience validation on the resource server
function validateToken(token: string) {
const decoded = jwt.verify(token, publicKey);
return decoded; // Missing audience check — accepts ANY valid token
}Correct -- PKCE S256 + RFC 8707 resource binding:
import crypto from "node:crypto";
// 1. PKCE: Generate verifier and S256 challenge
function createPkce() {
const verifier = crypto.randomBytes(32).toString("base64url");
const challenge = crypto.createHash("sha256").update(verifier).digest("base64url");
return { verifier, challenge };
}
// 2. Authorization request with resource indicator (RFC 8707)
function buildAuthUrl(
authServer: string, clientId: string, redirectUri: string,
mcpServerUri: string, scopes: string[],
) {
const { verifier, challenge } = createPkce();
const state = crypto.randomBytes(16).toString("base64url");
const params = new URLSearchParams({
response_type: "code",
client_id: clientId,
redirect_uri: redirectUri,
code_challenge: challenge,
code_challenge_method: "S256",
resource: mcpServerUri, // MUST match MCP server's canonical URI
scope: scopes.join(" "),
state,
});
return { url: `${authServer}/authorize?${params}`, verifier, state };
}
// 3. Token exchange — resource parameter MUST match authorization request
async function exchangeCode(
tokenEndpoint: string, code: string, verifier: string,
clientId: string, redirectUri: string, mcpServerUri: string,
) {
const res = await fetch(tokenEndpoint, {
method: "POST",
headers: { "Content-Type": "application/x-www-form-urlencoded" },
body: new URLSearchParams({
grant_type: "authorization_code", code,
code_verifier: verifier, client_id: clientId,
redirect_uri: redirectUri, resource: mcpServerUri,
}),
});
return res.json();
}Correct -- token validation + confused deputy prevention:
// 4. MCP server validates audience (RFC 8707 + RFC 9068)
function validateAccessToken(token: string, expectedAudience: string) {
const decoded = jwt.verify(token, publicKey, {
algorithms: ["RS256"],
audience: expectedAudience, // MUST be this server's canonical URI
issuer: trustedIssuer,
});
return decoded;
}
// 5. Upstream calls use a SEPARATE token — never forward the client's token
async function callUpstream(upstreamTokenEndpoint: string) {
const { access_token } = await fetch(upstreamTokenEndpoint, {
method: "POST",
body: new URLSearchParams({ grant_type: "client_credentials", scope: "upstream:read" }),
}).then((r) => r.json());
return access_token; // Scoped to upstream, NOT the client's token
}Correct -- discovery, registration, and incremental scope consent:
// 6. Protected Resource Metadata discovery (RFC 9728)
async function discoverAuthServer(mcpServerUrl: string) {
const origin = new URL(mcpServerUrl).origin;
const meta = await fetch(`${origin}/.well-known/oauth-protected-resource`).then((r) => r.json());
const asUrl = meta.authorization_servers[0];
// Try OAuth 2.0 AS Metadata, then OIDC Discovery
for (const p of ["/.well-known/oauth-authorization-server", "/.well-known/openid-configuration"]) {
const res = await fetch(`${asUrl}${p}`);
if (res.ok) return res.json();
}
throw new Error("No authorization server metadata found");
}
// 7. Dynamic Client Registration (RFC 7591) — fallback when no pre-registration
async function registerClient(registrationEndpoint: string) {
return fetch(registrationEndpoint, {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
client_name: "My MCP Client",
redirect_uris: ["http://127.0.0.1:3000/callback"],
grant_types: ["authorization_code"],
token_endpoint_auth_method: "none",
}),
}).then((r) => r.json());
}
// 8. Incremental scope consent — handle 403 insufficient_scope
function handleInsufficientScope(wwwAuth: string) {
const match = wwwAuth.match(/scope="([^"]+)"/);
if (match) return match[1].split(" "); // Re-authorize with these scopes
}Key rules:
- PKCE with S256 is mandatory; refuse to proceed if AS lacks
code_challenge_methods_supported - Include
resourceparameter (RFC 8707) in both authorization and token requests, set to the MCP server's canonical URI - MCP servers MUST validate the
audclaim matches their own URI — reject all other tokens - NEVER pass the client's access token to upstream APIs (confused deputy); obtain a separate token via client credentials or token exchange (RFC 8693)
- Use Protected Resource Metadata (RFC 9728) for AS discovery; support both OAuth 2.0 AS Metadata and OIDC Discovery
- Prefer Client ID Metadata Documents over Dynamic Client Registration (RFC 7591) for new implementations
- Handle
403 insufficient_scopeby re-authorizing with scopes from theWWW-Authenticateheader - For high-security deployments, bind tokens to client certificates via mTLS (RFC 8705) to prevent token theft and replay
Implement MCP client patterns for reliable connections and multi-server orchestration — MEDIUM
Client Patterns
Set up MCP clients with proper session management, error handling, and reconnection. Covers TypeScript and Python SDKs for consuming MCP servers from applications.
Incorrect -- no error handling, no cleanup:
import { Client, StreamableHTTPClientTransport } from "@modelcontextprotocol/client";
const client = new Client({ name: "app", version: "1.0.0" });
const transport = new StreamableHTTPClientTransport(new URL("http://localhost:3000/mcp"));
await client.connect(transport);
const result = await client.callTool({ name: "search", arguments: { q: "test" } });
console.log(result.content[0].text); // Crashes if tool errors or content empty
// Transport never closed -- connection leakedfrom mcp.client.streamable_http import streamable_http_client
from mcp import ClientSession
# No context manager -- session never cleaned up
read, write = await streamable_http_client("http://localhost:3000/mcp").__aenter__()
session = ClientSession(read, write)
await session.initialize()
result = await session.call_tool("search", arguments={"q": "test"})
print(result.content[0].text) # No type check, no error handlingCorrect -- TypeScript client with reconnection and capability negotiation:
import { Client, StreamableHTTPClientTransport } from "@modelcontextprotocol/client";
const transport = new StreamableHTTPClientTransport(
new URL("http://localhost:3000/mcp"),
{
sessionId: cachedSessionId, // Reconnect to existing session
reconnectionOptions: {
maxRetries: 5,
initialReconnectionDelay: 1000,
maxReconnectionDelay: 30000,
reconnectionDelayGrowFactor: 1.5,
},
}
);
const client = new Client(
{ name: "my-app", version: "1.0.0" },
{ capabilities: { sampling: {} } } // Declare client capabilities
);
try {
await client.connect(transport);
const caps = client.getServerCapabilities();
// Discover tools before calling
const { tools } = await client.listTools();
const hasTool = tools.some((t) => t.name === "search");
if (!hasTool) throw new Error("Required tool 'search' not available");
const result = await client.callTool({ name: "search", arguments: { q: "test" } });
for (const content of result.content) {
if (content.type === "text") console.log(content.text);
}
} finally {
await transport.terminateSession();
await transport.close();
}Correct -- Python client with context managers:
import asyncio
from mcp import ClientSession, StdioServerParameters, types
from mcp.client.stdio import stdio_client
from mcp.client.streamable_http import streamable_http_client
async def run_stdio_client():
server_params = StdioServerParameters(
command="python", args=["my_server.py"]
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
tools = await session.list_tools()
result = await session.call_tool("add", arguments={"a": 5, "b": 3})
for content in result.content:
if isinstance(content, types.TextContent):
print(content.text)
async def run_http_client():
async with streamable_http_client("http://localhost:8000/mcp") as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
tools = await session.list_tools()
print([t.name for t in tools.tools])Correct -- multi-server orchestration (TypeScript):
async function connectServers(urls: string[]) {
const clients = await Promise.all(
urls.map(async (url) => {
const transport = new StreamableHTTPClientTransport(new URL(url));
const client = new Client({ name: "orchestrator", version: "1.0.0" });
await client.connect(transport);
const { tools } = await client.listTools();
return { client, transport, tools, url };
})
);
// Build unified tool registry across servers
const toolMap = new Map<string, typeof clients[0]>();
for (const entry of clients) {
for (const tool of entry.tools) {
toolMap.set(`${tool.name}@${entry.url}`, entry);
}
}
return { clients, toolMap };
}Key rules:
- Always close transports in
finallyblocks (TS) or use context managers (Python) - Call
initialize()before any other session method in Python - Discover tools with
listTools()before calling -- never assume tool availability - Use
reconnectionOptionswith exponential backoff for remote HTTP servers - Cache
sessionIdto resume sessions after reconnection - Check
content.typebefore accessing.text-- tools may return images or errors - For multi-server setups, namespace tools by server to avoid name collisions
- Declare client capabilities (
sampling,elicitation) during construction
Reference: https://modelcontextprotocol.io/specification/2025-11-25/architecture
Use MCP elicitation safely with consent handling and secure form-mode data collection — MEDIUM
Elicitation
MCP elicitation lets servers request structured input from users at runtime via form mode (JSON Schema) or URL mode (external flows). Form mode collects non-sensitive data in-band; URL mode redirects users to secure pages for credentials, OAuth, or payments.
Incorrect -- requesting secrets via form mode, ignoring decline/cancel:
@mcp.tool()
async def connect_api(ctx: Context) -> str:
# WRONG: form mode exposes secrets to the LLM context
result = await ctx.session.create_elicitation(
mode="form",
message="Enter your API key",
requestedSchema={
"type": "object",
"properties": {
"api_key": {"type": "string"},
# WRONG: nested objects not allowed in elicitation schemas
"config": {"type": "object", "properties": {"timeout": {"type": "number"}}},
},
},
)
# WRONG: assumes accept, crashes on decline/cancel
return call_api(result.content["api_key"])Correct -- form mode for non-sensitive data, flat schema, handle all actions:
@mcp.tool()
async def configure_search(ctx: Context) -> str:
result = await ctx.session.create_elicitation(
mode="form",
message="Configure your search preferences",
requestedSchema={
"type": "object",
"properties": {
"query": {"type": "string", "minLength": 1, "description": "Search terms"},
"category": {
"type": "string",
"enum": ["docs", "code", "issues"],
"default": "docs",
},
"max_results": {
"type": "integer",
"minimum": 1,
"maximum": 50,
"default": 10,
},
},
"required": ["query"],
},
)
if result.action == "accept":
return search(result.content)
elif result.action == "decline":
return "Search cancelled. Let me know if you'd like to try different options."
else: # cancel
return "Search dismissed. I can search with defaults if you'd like."Correct -- URL mode for sensitive data (API keys, OAuth):
@mcp.tool()
async def connect_service(ctx: Context) -> str:
elicitation_id = str(uuid.uuid4())
result = await ctx.session.create_elicitation(
mode="url",
message="Please authorize access to your account.",
elicitation_id=elicitation_id,
url=f"https://myserver.example.com/connect?eid={elicitation_id}",
)
if result.action == "accept":
# User consented to open URL -- interaction happens out-of-band.
# Server sends notifications/elicitation/complete when done.
return "Authorization started. I'll proceed once you complete the flow."
elif result.action == "decline":
return "Authorization declined. Some features will be unavailable."
else: # cancel
return "Authorization dismissed."Correct -- client declares elicitation capabilities:
const client = new Client({
name: "my-client",
version: "1.0.0",
}, {
capabilities: {
elicitation: { form: {}, url: {} }, // declare supported modes
},
});Key rules:
- Never request secrets (API keys, passwords, tokens) via form mode -- use URL mode instead
- Schemas must be flat objects with primitive properties only (string, number, integer, boolean, enum) -- no nested objects or
$ref - Always handle all three response actions:
accept,decline,cancel - URL mode
acceptmeans user consented to open the URL, not that the flow is complete -- listen fornotifications/elicitation/complete - Clients must show the full URL and get explicit consent before opening; never auto-fetch or auto-navigate
- Servers must verify the user who completes a URL flow is the same user who initiated it (prevent phishing/account takeover)
- Check client capabilities before sending elicitation requests -- clients may support only
form, onlyurl, or both
Reference: https://modelcontextprotocol.io/specification/2025-11-25/client/elicitation
Vet MCP servers from registries to prevent supply-chain attacks and data exfiltration — LOW
Registry Discovery
Use the official MCP Registry API for programmatic server discovery and apply a vetting checklist before installing any third-party server.
Incorrect -- blindly install unvetted servers:
# Grabbed a random server name from a blog post
config = {"mcpServers": {"sketchy-db": {"command": "npx", "args": ["@unknown/mcp-db"]}}}
# No source review, no version pinning, no permission auditCorrect -- query the official registry and vet before installing:
import httpx
REGISTRY = "https://registry.modelcontextprotocol.io"
async def discover_servers(query: str) -> list[dict]:
"""Search the official MCP Registry API."""
async with httpx.AsyncClient() as client:
resp = await client.get(f"{REGISTRY}/v0.1/servers", params={
"search": query, "version": "latest", "limit": 20,
})
resp.raise_for_status()
return resp.json()["servers"]
async def get_server_detail(name: str, version: str = "latest") -> dict:
"""Fetch full metadata for a specific server."""
async with httpx.AsyncClient() as client:
resp = await client.get(f"{REGISTRY}/v0.1/servers/{name}/versions/{version}")
resp.raise_for_status()
return resp.json()
def vet_server(server: dict) -> list[str]:
"""Return warnings if server fails vetting checks."""
warnings = []
s = server.get("server", server)
if not s.get("repository", {}).get("url"):
warnings.append("No public source repository")
if not s.get("packages"):
warnings.append("Not published to any package registry")
meta = server.get("_meta", {}).get("io.modelcontextprotocol.registry/official", {})
if meta.get("status") != "active":
warnings.append(f"Registry status: {meta.get('status', 'unknown')}")
return warningsCommunity directories for broader discovery:
| Directory | URL | Notes |
|---|---|---|
| Official Registry | registry.modelcontextprotocol.io | API-accessible, moderation |
| mcp.run | mcp.run | Hosted runtime, sandboxed |
| Smithery | smithery.ai | Install counts, reviews |
| Glama | glama.ai/mcp/servers | Curated catalog |
| MCP Servers | mcpservers.org | Community-maintained list |
Vetting checklist before installing any server:
- [ ] Source code in a public repository with commit history
- [ ] Published to npm/PyPI (not just a git clone)
- [ ] Version pinned in config (no `@latest` in production)
- [ ] README documents all tools, resources, and required permissions
- [ ] No overly broad capabilities (filesystem root, network wildcard)
- [ ] Active maintenance (commits within last 90 days)
- [ ] Listed in official registry or reputable directoryIcon metadata (spec 2025-11-25) -- expose icons for tools/resources:
@mcp.tool(metadata={"icon": "https://example.com/icons/search.svg"})
def search(query: str) -> str:
"""Search documents."""
...Key rules:
- Always query the official registry at
registry.modelcontextprotocol.io/v0.1/serversfirst - Never install a server without checking its source repository and package provenance
- Pin exact versions in MCP server configurations -- avoid
@latestin production - Cross-reference multiple directories (registry, smithery, mcp.run) for trust signals
- Treat community servers as untrusted by default; apply allowlist patterns from security-hardening
- Use
vet_server()checks programmatically when building multi-server orchestrations
Bound MCP sampling loops with user approval to prevent unbounded LLM call chains — MEDIUM
Sampling with Tool Calling
MCP sampling lets servers request LLM completions from clients, with optional tool definitions for agentic multi-turn loops. The client controls model access and user approval throughout.
Incorrect -- no iteration cap, skips user approval:
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("agent-server")
@mcp.tool()
async def run_agent(task: str, ctx) -> str:
messages = [{"role": "user", "content": {"type": "text", "text": task}}]
tools = [{"name": "search", "description": "Search docs",
"inputSchema": {"type": "object", "properties": {"q": {"type": "string"}}, "required": ["q"]}}]
# Unbounded loop -- runs forever if LLM keeps calling tools
while True:
result = await ctx.session.create_message(
messages=messages, tools=tools, max_tokens=2000
)
if result.stop_reason != "toolUse":
return result.content.text
# Blindly append and continue without any limit
messages.append({"role": "assistant", "content": result.content})
tool_results = [execute_tool(tc) for tc in result.content]
messages.append({"role": "user", "content": tool_results})Correct -- bounded loop, tool choice control, proper message structure:
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("agent-server")
MAX_ITERATIONS = 5
TOOLS = [{
"name": "search",
"description": "Search documentation by keyword",
"inputSchema": {
"type": "object",
"properties": {"q": {"type": "string", "description": "Search query"}},
"required": ["q"],
},
}]
@mcp.tool()
async def run_agent(task: str, ctx: Context) -> str:
"""Run a bounded agent loop with tool access via sampling."""
messages = [{"role": "user", "content": {"type": "text", "text": task}}]
for i in range(MAX_ITERATIONS):
# Force text-only response on final iteration
tool_choice = (
{"mode": "none"} if i == MAX_ITERATIONS - 1
else {"mode": "auto"}
)
result = await ctx.session.create_message(
messages=messages,
tools=TOOLS,
tool_choice=tool_choice,
max_tokens=2000,
)
# LLM chose not to use tools -- return final answer
if result.stop_reason != "toolUse":
return result.content.text if hasattr(result.content, "text") else str(result.content)
# Execute each tool call, build tool_result messages
assistant_content = result.content if isinstance(result.content, list) else [result.content]
messages.append({"role": "assistant", "content": assistant_content})
# Tool results MUST be in their own user message -- no mixed content
tool_results = []
for block in assistant_content:
if block.type == "tool_use":
output = await execute_tool(block.name, block.input)
tool_results.append({
"type": "tool_result",
"toolUseId": block.id,
"content": [{"type": "text", "text": str(output)}],
})
messages.append({"role": "user", "content": tool_results})
return "Agent reached iteration limit without a final answer."Declaring sampling capability with tool support (client-side):
# Client must advertise sampling.tools capability during initialization
capabilities = {
"sampling": {
"tools": {} # Required for tool-enabled sampling requests
}
}Key rules:
- Always cap iteration count and use
toolChoice: \{mode: "none"\}on the final turn to force a text response - Tool result messages MUST contain only
tool_resultblocks -- never mix with text or image content - Every
tool_useblock (byid) must have a matchingtool_result(bytoolUseId) before the next assistant turn - Clients MUST declare
sampling.toolscapability; servers MUST NOT send tool-enabled requests without it - Human-in-the-loop: clients SHOULD present sampling requests and tool calls for user review before execution
- Use
toolChoicemodes:auto(LLM decides),required(must call a tool),none(text only) - Parallel tool calls are supported -- handle arrays of
tool_useblocks in a single assistant message - Implement rate limiting on the client side to prevent runaway sampling loops
Apply zero-trust verification to MCP servers to prevent rug-pull and data exfiltration — HIGH
Security Hardening
Verify every tool with hash-based integrity checks. Use zero-trust allowlists, capability enforcement, and secure sessions.
Incorrect -- trust all tools without verification:
tools = await mcp.list_tools() # No vetting!
result = await mcp.call_tool(name, args) # No integrity check!
session_id = f"{user_id}:{auth_token}" # CREDENTIAL LEAK in session ID!Correct -- zero-trust tool allowlist with hash verification:
from hashlib import sha256
from dataclasses import dataclass
from datetime import datetime, timezone
@dataclass
class AllowedTool:
name: str
description_hash: str
capabilities: list[str]
approved_by: str
max_calls_per_minute: int = 60
class MCPToolAllowlist:
def __init__(self):
self._allowed: dict[str, AllowedTool] = {}
self._call_counts: dict[str, list[datetime]] = {}
def register(self, tool: AllowedTool) -> None:
self._allowed[tool.name] = tool
self._call_counts[tool.name] = []
def validate(self, name: str, description: str) -> tuple[bool, str]:
if name not in self._allowed:
return False, f"Tool '{name}' not in allowlist"
expected = self._allowed[name]
actual_hash = sha256(description.encode('utf-8')).hexdigest()
if actual_hash != expected.description_hash:
return False, "Description changed (possible rug pull)"
# Rate limit
now = datetime.now(timezone.utc)
recent = [t for t in self._call_counts[name]
if (now - t).total_seconds() < 60]
if len(recent) >= expected.max_calls_per_minute:
return False, "Rate limit exceeded"
self._call_counts[name] = recent + [now]
return True, "OK"Correct -- capability enforcement (least privilege):
from enum import Enum
class ToolCapability(Enum):
READ_FILE = "read:file"
WRITE_FILE = "write:file"
EXECUTE_COMMAND = "execute:command"
NETWORK_REQUEST = "network:request"
SENSITIVE_PATHS = ["/etc/passwd", "~/.ssh", ".env", "credentials"]
class CapabilityEnforcer:
def __init__(self):
self._declarations: dict[str, set[ToolCapability]] = {}
def register(self, tool_name: str, caps: set[ToolCapability]) -> None:
self._declarations[tool_name] = caps
def check(self, tool_name: str, cap: ToolCapability, resource: str = "") -> tuple[bool, str]:
if tool_name not in self._declarations:
return False, "No capability declaration"
if cap not in self._declarations[tool_name]:
return False, f"Capability {cap.value} not allowed"
if cap in (ToolCapability.READ_FILE, ToolCapability.WRITE_FILE):
if any(s in resource for s in SENSITIVE_PATHS):
return False, "Sensitive path denied"
return True, "Allowed"Correct -- secure session management:
import secrets
def generate_session_id() -> str:
return secrets.token_urlsafe(32) # 256 bits of entropy
# NEVER: session_id = f"{user_id}:{auth_token}"
# ALWAYS: session_id = secrets.token_urlsafe(32)Rug pull detection -- hash comparison on every call:
class ToolIntegrityMonitor:
def __init__(self):
self._fingerprints: dict[str, str] = {}
def register(self, tool: dict) -> None:
desc = tool.get("description", "")
params = json.dumps(tool.get("parameters", {}), sort_keys=True)
combined = sha256(f"{desc}:{params}".encode()).hexdigest()
self._fingerprints[tool["name"]] = combined
def verify(self, tool: dict) -> tuple[bool, str | None]:
name = tool["name"]
if name not in self._fingerprints:
return False, "Tool not registered"
desc = tool.get("description", "")
params = json.dumps(tool.get("parameters", {}), sort_keys=True)
current = sha256(f"{desc}:{params}".encode()).hexdigest()
if current != self._fingerprints[name]:
return False, f"Tool '{name}' modified since registration"
return True, NoneKey rules:
- Every tool must be explicitly vetted before use (zero-trust)
- Hash-verify description + parameters on every invocation
- Use
secrets.token_urlsafe(32)for session IDs, never embed auth tokens - Enforce least-privilege capabilities per tool
- Rate limit tool calls (per-tool and per-session)
- Auto-suspend tools that fail integrity checks
Defend against prompt injection in MCP tool descriptions that can hijack LLM behavior — HIGH
Security Injection Defense
Treat ALL tool descriptions as untrusted input. Normalize encodings, detect injection patterns, and sanitize before LLM exposure.
Incorrect -- raw tool description passed to LLM:
# INJECTION RISK: description may contain "ignore previous instructions..."
prompt = f"Use this tool: {tool.description}"
tools = await mcp.list_tools() # No validation!Correct -- sanitize tool descriptions before use:
import re
FORBIDDEN_PATTERNS = {
"critical": [
(r"ignore\s+(all\s+)?previous", "instruction_override"),
(r"you\s+are\s+now", "role_hijack"),
(r"forget\s+(everything|all|above)", "context_wipe"),
(r"system\s*prompt", "system_access"),
],
"high": [
(r"IMPORTANT\s*:", "attention_hijack"),
(r"override\s+(all\s+)?settings", "config_override"),
(r"<\|.*?\|>", "delimiter_attack"),
(r"reveal\s+(your|the)\s+(prompt|instructions)", "prompt_extraction"),
],
}
def sanitize_description(description: str) -> tuple[str, list[str]]:
"""Sanitize tool description. Returns (sanitized, detected_threats)."""
if not description:
return "", []
threats = []
sanitized = normalize_encodings(description)
for level in ["critical", "high"]:
for pattern, name in FORBIDDEN_PATTERNS[level]:
if re.search(pattern, sanitized, re.I):
threats.append(f"{level}:{name}")
sanitized = re.sub(pattern, "[REDACTED]", sanitized, flags=re.I)
return sanitized.strip(), threatsCorrect -- normalize encodings to reveal hidden attacks:
import html
import urllib.parse
import unicodedata
HOMOGLYPHS = {
'\u0430': 'a', '\u0435': 'e', '\u043e': 'o',
'\u0440': 'p', '\u0441': 'c', '\u0443': 'y',
}
def normalize_encodings(text: str) -> str:
"""Decode HTML entities, URL encoding, hex escapes, homoglyphs."""
result = html.unescape(text) # I -> I
result = urllib.parse.unquote(result) # %69 -> i
result = re.sub( # \x69 -> i
r'\\x([0-9a-fA-F]{2})',
lambda m: chr(int(m.group(1), 16)),
result,
)
result = unicodedata.normalize('NFKC', result) # Unicode normalization
for glyph, latin in HOMOGLYPHS.items(): # Cyrillic -> Latin
result = result.replace(glyph, latin)
return resultCorrect -- filter sensitive data from tool responses:
RESPONSE_FILTERS = [
(r"api[_-]?key\s*[:=]\s*\S+", "[API_KEY_REDACTED]"),
(r"password\s*[:=]\s*\S+", "[PASSWORD_REDACTED]"),
(r"bearer\s+\S+", "[TOKEN_REDACTED]"),
(r"-----BEGIN.*KEY-----[\s\S]*-----END.*KEY-----", "[PRIVATE_KEY_REDACTED]"),
]
def filter_tool_response(response: str) -> str:
for pattern, replacement in RESPONSE_FILTERS:
response = re.sub(pattern, replacement, response, flags=re.I)
return responseKey rules:
- Always normalize encodings BEFORE pattern matching
- Block on critical threats (instruction override, role hijack)
- Redact high-severity patterns but allow the tool through
- Filter tool responses for secrets before they reach the LLM
- Test with known attack payloads: base64, homoglyphs, HTML entities
Set up MCP servers with proper lifecycle management and structured error handling — HIGH
Server Setup
Use FastMCP with lifespan context for shared resources. Define tools with explicit schemas and return errors as text content.
Incorrect -- no lifecycle, raw exception:
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("my-server")
db = Database.connect() # Global -- never cleaned up
@mcp.tool()
def query(sql: str) -> str:
return db.query(sql) # Crashes on connection failureCorrect -- FastMCP with lifespan and error handling:
from contextlib import asynccontextmanager
from collections.abc import AsyncIterator
from dataclasses import dataclass
from mcp.server.fastmcp import Context, FastMCP
@dataclass
class AppContext:
db: Database
cache: CacheService
@asynccontextmanager
async def app_lifespan(server: FastMCP) -> AsyncIterator[AppContext]:
db = await Database.connect()
cache = await CacheService.connect()
try:
yield AppContext(db=db, cache=cache)
finally:
await cache.disconnect()
await db.disconnect()
mcp = FastMCP("my-server", lifespan=app_lifespan)
@mcp.tool()
def query(sql: str, ctx: Context) -> str:
"""Execute a read-only SQL query. Returns up to 100 rows."""
try:
app = ctx.request_context.lifespan_context
return app.db.query(sql)
except DatabaseError as e:
return f"Error: {e}" # Claude sees and can retryTool definition best practices:
from mcp.types import Tool
Tool(
name="search_products",
description="Search product catalog. Returns up to 10 results.",
inputSchema={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search terms"},
"category": {
"type": "string",
"enum": ["electronics", "clothing", "books"],
},
"max_results": {
"type": "integer", "minimum": 1, "maximum": 50, "default": 10,
},
},
"required": ["query"],
},
)Key rules:
- Always use lifespan for database connections, caches, HTTP clients
- Return errors as
TextContent-- never raise unhandled exceptions - Include
descriptionfor every schema property - Use
enumfor fixed option sets,minimum/maximumfor numbers - Use
asyncio.to_thread()for blocking synchronous operations - Limit response sizes (Claude has context limits)
Choose the right MCP transport for production multi-client and deployment requirements — HIGH
Server Transport
Choose stdio for CLI/Desktop, Streamable HTTP for web apps and production multi-client. SSE is deprecated.
Transport decision matrix:
| Transport | Use Case | Pros | Cons |
|---|---|---|---|
| stdio | CLI, Claude Desktop | Simple, no network | Single client only |
| SSE | Deprecated | Browser-compatible | Deprecated since March 2025 |
| Streamable HTTP | Web apps, production APIs | Multi-client, scalable, stateless option | More setup |
Incorrect -- hardcoded transport, no configuration:
# Forces stdio -- can't switch to web deployment
from mcp.server.stdio import stdio_server
async def main():
async with stdio_server() as (read, write):
await server.run(read, write, server.create_initialization_options())Correct -- Python stdio server:
from mcp.server import Server
from mcp.server.stdio import stdio_server
server = Server("my-tools")
# Register handlers...
async def main():
async with stdio_server() as (read, write):
await server.run(read, write, server.create_initialization_options())
if __name__ == "__main__":
import asyncio
asyncio.run(main())Correct -- TypeScript stdio server:
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
const server = new Server(
{ name: "my-tools", version: "1.0.0" },
{ capabilities: { tools: {} } }
);
// Register handlers...
await server.connect(new StdioServerTransport());Deprecated -- SSE for web deployment (use Streamable HTTP instead):
SSE transport was deprecated in March 2025. Migrate to Streamable HTTP for new projects. SSE remains functional but receives no new features.
from mcp.server.sse import SseServerTransport
from starlette.applications import Starlette
from starlette.routing import Route
sse = SseServerTransport("/messages")
async def handle_sse(request):
async with sse.connect_sse(
request.scope, request.receive, request._send
) as streams:
await server.run(
streams[0], streams[1],
server.create_initialization_options()
)
app = Starlette(routes=[
Route("/sse", endpoint=handle_sse),
Route("/messages", endpoint=sse.handle_post_message, methods=["POST"]),
])Correct -- Streamable HTTP server (Python, recommended):
from mcp.server.mcpserver import MCPServer
mcp = MCPServer("my-tools")
@mcp.tool()
def greet(name: str = "World") -> str:
"""Greet someone by name."""
return f"Hello, {name}!"
if __name__ == "__main__":
# Stateless with JSON responses -- best for production
mcp.run(transport="streamable-http", stateless_http=True, json_response=True)
# Stateful with session persistence (when needed):
# mcp.run(transport="streamable-http")Correct -- Streamable HTTP server (TypeScript, recommended):
import { createServer } from "node:http";
import { NodeStreamableHTTPServerTransport } from "@modelcontextprotocol/node";
import { McpServer } from "@modelcontextprotocol/server";
const server = new McpServer({ name: "my-tools", version: "1.0.0" });
// Register handlers...
createServer(async (req, res) => {
const transport = new NodeStreamableHTTPServerTransport({
sessionIdGenerator: undefined, // stateless; use () => randomUUID() for sessions
});
await server.connect(transport);
await transport.handleRequest(req, res);
}).listen(3000);Migrating SSE → Streamable HTTP:
- Python: Replace
SseServerTransportwithMCPServer.run(transport="streamable-http") - TypeScript: Replace
SSEServerTransportwithNodeStreamableHTTPServerTransport - Client endpoint changes from
/sse+/messagesto single/mcppath - Streamable HTTP supports both stateless (scalable) and stateful (session) modes
Claude Desktop configuration:
{
"mcpServers": {
"my-tools": {
"command": "npx",
"args": ["-y", "@myorg/my-tools"],
"env": { "DATABASE_URL": "postgres://..." }
},
"python-tools": {
"command": "uv",
"args": ["run", "python", "-m", "my_mcp_server"],
"cwd": "/path/to/project"
}
}
}Key rules:
- Use Streamable HTTP for all new web/production deployments (SSE is deprecated)
- Use
uv(notpip) for Python MCP server commands in Claude Desktop config - Set
cwdwhen the server needs access to project files - Pass secrets via
env, never hardcode in args - TypeScript servers: use
npx -yfor zero-install execution - Prefer stateless mode (
stateless_http=True) unless session persistence is required
Test and debug MCP servers to catch broken tools and transport failures before production — MEDIUM
Testing & Debugging
Write automated tests for every tool using the SDK's in-process Client, and use MCP Inspector for interactive debugging of transports and auth.
Incorrect -- manual testing only, no assertions:
# "I'll just test it in Claude Desktop"
mcp = FastMCP("my-server")
@mcp.tool()
def search(query: str) -> str:
return db.search(query)
# No tests, no fixtures, no CI -- bugs found by end usersCorrect -- unit tests with in-process Client:
import pytest
from mcp import Client
from mcp.types import CallToolResult, TextContent
from server import app
@pytest.fixture
def anyio_backend():
return "asyncio"
@pytest.fixture
async def client():
async with Client(app, raise_exceptions=True) as c:
yield c
@pytest.mark.anyio
async def test_search_returns_results(client: Client):
result = await client.call_tool("search", {"query": "test"})
assert isinstance(result, CallToolResult)
assert len(result.content) > 0
assert result.content[0].type == "text"
@pytest.mark.anyio
async def test_search_empty_query(client: Client):
result = await client.call_tool("search", {"query": ""})
assert "Error" in result.content[0].text # Graceful error, not crashCorrect -- parametrized edge-case tests:
@pytest.mark.anyio
@pytest.mark.parametrize("args", [{"query": ""}, {"max_results": -1}, {}])
async def test_invalid_inputs_return_errors(client: Client, args):
result = await client.call_tool("search", args)
assert result.isError or "Error" in result.content[0].textCorrect -- integration test with stdio transport:
import subprocess, json
def test_stdio_transport_connects():
"""Verify the server starts and responds to initialize over stdio."""
proc = subprocess.Popen(
["uv", "run", "server.py"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
)
init_msg = {"jsonrpc": "2.0", "id": 1, "method": "initialize",
"params": {"capabilities": {}, "clientInfo": {"name": "test"},
"protocolVersion": "2025-03-26"}}
proc.stdin.write(json.dumps(init_msg).encode() + b"\n")
proc.stdin.flush()
line = proc.stdout.readline()
assert b'"result"' in line # Server responded to init
proc.terminate()Interactive debugging with MCP Inspector:
# Inspect a local Python server
npx @modelcontextprotocol/inspector uv run server.py
# Inspect a PyPI package
npx @modelcontextprotocol/inspector uvx mcp-server-git --repository ~/repo
# Inspect with environment variables
npx @modelcontextprotocol/inspector -e API_KEY=xxx uv run server.py
# Use Inspector to: list tools/resources, test tool calls with custom
# inputs, check capability negotiation, and view server logs.
# For scaffolding new servers, see the mcp-builder skill.Debug common connection failures:
# Timeout: slow lifespan init blocks connection -- keep lifespan under 5s
# Auth 401: pass secrets via Inspector's -e flag or .env file
# "Connection refused": wrong transport -- match stdio vs Streamable HTTP
# Hang on tool call: blocking sync code -- wrap with asyncio.to_thread()Key rules:
- Use
Client(app, raise_exceptions=True)for unit tests -- no transport overhead - Test both valid inputs and edge cases (empty, missing, out-of-range)
- Use
@pytest.mark.anyiowithanyio_backendfixture for async tests - Use MCP Inspector (
npx @modelcontextprotocol/inspector) for interactive debugging - Keep lifespan initialization under 5s so Inspector and clients can connect
- Test stdio transport separately with
subprocessfor integration coverage - Install test deps:
pip install inline-snapshot pytest anyio
Integrate WebMCP browser mediation correctly to avoid confusing it with standard MCP — LOW
WebMCP Browser Integration
WebMCP is a W3C Community Group standard that exposes structured tools to AI agents inside the browser via navigator.modelContext. It complements MCP (not a replacement) — MCP handles AI-to-backend over JSON-RPC, WebMCP handles AI-to-browser-UI via in-page callbacks.
Incorrect -- registering tools without input schema or user mediation:
// No schema, no description, no user interaction handling
navigator.modelContext.registerTool({
name: "submit-order",
description: "Submit order",
execute: async (input) => {
// Directly mutates state with no user confirmation
await fetch("/api/orders", { method: "POST", body: JSON.stringify(input) });
return { status: "submitted" };
},
});Correct -- full schema, annotations, and user interaction request:
navigator.modelContext.registerTool({
name: "submit-order",
description: "Submit the current shopping cart as an order. Requires user confirmation.",
inputSchema: {
type: "object",
properties: {
cartId: { type: "string", description: "Cart identifier" },
shipping: { type: "string", enum: ["standard", "express"] },
},
required: ["cartId"],
},
annotations: { readOnlyHint: false },
execute: async (input, client) => {
// Request explicit user confirmation before mutating state
const confirmed = await client.requestUserInteraction(async () => {
return window.confirm(`Place order for cart ${input.cartId}?`);
});
if (!confirmed) return { status: "cancelled_by_user" };
const res = await fetch("/api/orders", {
method: "POST",
body: JSON.stringify(input),
});
return { status: "submitted", orderId: (await res.json()).id };
},
});Read-only tool with annotations:
navigator.modelContext.registerTool({
name: "get-product-details",
description: "Retrieve product name, price, and availability from the current page.",
inputSchema: {
type: "object",
properties: {
productId: { type: "string", description: "Product ID visible on page" },
},
required: ["productId"],
},
annotations: { readOnlyHint: true },
execute: async (input) => {
const el = document.querySelector(`[data-product-id="${input.productId}"]`);
return el ? { name: el.dataset.name, price: el.dataset.price } : { error: "Not found" };
},
});When to use MCP vs WebMCP:
| Concern | MCP | WebMCP |
|---|---|---|
| Transport | JSON-RPC (stdio / SSE / HTTP) | In-page callbacks |
| Runs on | Server / backend | Browser (SecureContext) |
| Use case | DB queries, APIs, file I/O | DOM access, form fill, UI actions |
| Auth | OAuth 2.1 / tokens | Browser-mediated permission |
Key rules:
- WebMCP complements MCP — use MCP for backend services, WebMCP for browser-side UI tools
- Always provide
inputSchemawith property descriptions so agents understand parameters - Set
annotations.readOnlyHint: trueon tools that only read data (no side effects) - Use
client.requestUserInteraction()before any state-mutating operation - WebMCP requires
SecureContext(HTTPS only) —navigator.modelContextis undefined on HTTP - Call
unregisterTool(name)orclearContext()during SPA route teardown to prevent stale tools - Keep tool descriptions specific — agents select tools by description, not by probing
Checklists (1)
Mcp Server Checklist
MCP Server Pre-Deployment Checklist
Server Setup
- FastMCP lifespan used for resource management
- Transport selected (stdio for CLI, SSE for web, Streamable HTTP for production)
- All tools have descriptive docstrings
- Input validation on all tool arguments
- Error responses return text content (not exceptions)
Security Hardening
- Zero-trust tool allowlist configured
- Tool description sanitization enabled
- Hash verification on tool invocations
- No secrets in tool output (API keys, credentials)
- Human-in-the-loop for high-risk operations
- Encoding normalization applied to inputs
- Injection pattern detection active
Resource Management
- Response sizes bounded (Claude context limits)
- Resource caching with TTL and LRU eviction
- Memory cap configured for resource cache
- No blocking synchronous code in async server
Testing
- Tool invocations tested with valid and invalid inputs
- Security rules validated against injection payloads
- Transport failover tested (reconnect behavior)
- Lifecycle cleanup verified (no leaked connections)
Llm Integration
LLM integration patterns for function calling, streaming responses, local inference with Ollama, and fine-tuning customization. Use when implementing tool use, SSE streaming, local model deployment, LoRA/QLoRA fine-tuning, or multi-provider LLM APIs.
Memory
Read-side memory operations: search, recall, load, sync, history, visualize. Use when searching past decisions, loading session context, or viewing the knowledge graph.
Last updated on