Async Jobs
Async job processing patterns for background tasks, Celery workflows, task scheduling, retry strategies, and distributed task execution. Use when implementing background job processing, task queues, or scheduled task systems.
Primary Agent: python-performance-engineer
Async Jobs
Patterns for background task processing with Celery, ARQ, and Redis. Covers task queues, canvas workflows, scheduling, retry strategies, rate limiting, and production monitoring. Each category has individual rule files in references/ loaded on-demand.
Quick Reference
| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Configuration | celery-config | HIGH | Celery app setup, broker, serialization, worker tuning |
| Task Routing | task-routing | HIGH | Priority queues, multi-queue workers, dynamic routing |
| Canvas Workflows | canvas-workflows | HIGH | Chain, group, chord, nested workflows |
| Retry Strategies | retry-strategies | HIGH | Exponential backoff, idempotency, dead letter queues |
| Scheduling | scheduled-tasks | MEDIUM | Celery Beat, crontab, database-backed schedules |
| Monitoring | monitoring-health | MEDIUM | Flower, custom events, health checks, metrics |
| Result Backends | result-backends | MEDIUM | Redis results, custom states, progress tracking |
| ARQ Patterns | arq-patterns | MEDIUM | Async Redis Queue for FastAPI, lightweight jobs |
| Temporal Workflows | temporal-workflows | HIGH | Durable workflow definitions, sagas, signals, queries |
| Temporal Activities | temporal-activities | HIGH | Activity patterns, workers, heartbeats, testing |
Total: 10 rules across 10 categories
Quick Start
# Celery task with retry
from celery import shared_task
@shared_task(
bind=True,
max_retries=3,
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True,
)
def process_order(self, order_id: str) -> dict:
result = do_processing(order_id)
return {"order_id": order_id, "status": "completed"}# ARQ task with FastAPI
from arq import create_pool
from arq.connections import RedisSettings
async def generate_report(ctx: dict, report_id: str) -> dict:
data = await ctx["db"].fetch_report_data(report_id)
pdf = await render_pdf(data)
return {"report_id": report_id, "size": len(pdf)}
@router.post("/api/v1/reports")
async def create_report(data: ReportRequest, arq: ArqRedis = Depends(get_arq_pool)):
job = await arq.enqueue_job("generate_report", data.report_id)
return {"job_id": job.job_id}Configuration
Production Celery app configuration with secure defaults and worker tuning.
Key Patterns
- JSON serialization with `task_serializer="json"` for safety
- Late acknowledgment with `task_acks_late=True` to prevent task loss on crash
- Time limits with both `task_time_limit` (hard) and `task_soft_time_limit` (soft)
- Fair distribution with `worker_prefetch_multiplier=1`
- Reject on lost with `task_reject_on_worker_lost=True`
Key Decisions
| Decision | Recommendation |
|---|---|
| Serializer | JSON (never pickle) |
| Ack mode | Late ack (task_acks_late=True) |
| Prefetch | 1 for fair, 4-8 for throughput |
| Time limit | soft < hard (e.g., 540/600) |
| Timezone | UTC always |
Task Routing
Priority queue configuration with multi-queue workers and dynamic routing.
Key Patterns
- Named queues for critical/high/default/low/bulk separation
- Redis priority with `queue_order_strategy: "priority"` and 0-9 levels
- Task router classes for dynamic routing based on task attributes (see the sketch after this list)
- Per-queue workers with tuned concurrency and prefetch settings
- Content-based routing for dynamic workflow dispatch
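These routing patterns have no worked example elsewhere in this skill, so here is a minimal sketch assuming the `celery_app` from the Configuration section; queue names, task names, and the `is_premium` kwarg are illustrative assumptions:
# Sketch: named queues, static routes, and a dynamic router (illustrative names)
from kombu import Queue

celery_app.conf.task_queues = (
    Queue("critical"),
    Queue("high"),
    Queue("default"),
    Queue("bulk"),
)
celery_app.conf.task_default_queue = "default"
# Redis broker: workers drain queues in the order listed above
celery_app.conf.broker_transport_options = {"queue_order_strategy": "priority"}

def route_task(name, args, kwargs, options, task=None, **kw):
    """Dynamic router: premium traffic to 'high', bulk-prefixed tasks to 'bulk'."""
    if kwargs.get("is_premium"):  # assumed task kwarg
        return {"queue": "high"}
    if name.startswith("tasks.bulk_"):
        return {"queue": "bulk"}
    return None  # fall through to the static routes below

celery_app.conf.task_routes = (
    route_task,
    {"tasks.charge_payment": {"queue": "critical"}},
)
# Dedicated workers per queue: celery -A app worker -Q critical --concurrency=4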
Key Decisions
| Decision | Recommendation |
|---|---|
| Queue count | 3-5 (critical/high/default/low/bulk) |
| Priority levels | 0-9 with Redis x-max-priority |
| Worker assignment | Dedicated workers per queue |
| Prefetch | 1 for critical, 4-8 for bulk |
| Routing | Router class for 5+ routing rules |
Canvas Workflows
Celery canvas primitives for sequential, parallel, and fan-in/fan-out workflows.
Key Patterns
- Chain for sequential ETL pipelines with result passing
- Group for parallel execution of independent tasks
- Chord for fan-out/fan-in with aggregation callback
- Immutable signatures (`si()`) for steps that ignore input
- Nested workflows combining groups inside chains
- Link error callbacks for workflow-level error handling
Key Decisions
| Decision | Recommendation |
|---|---|
| Sequential | Chain with s() |
| Parallel | Group for independent tasks |
| Fan-in | Chord (all must succeed for callback) |
| Ignore input | Use si() immutable signature |
| Error in chain | Reject stops chain, retry continues |
| Partial failures | Return error dict in chord tasks |
Retry Strategies
Retry patterns with exponential backoff, idempotency, and dead letter queues.
Key Patterns
- Exponential backoff with `retry_backoff=True` and `retry_backoff_max`
- Jitter with `retry_jitter=True` to prevent thundering herd
- Idempotency keys in Redis to prevent duplicate processing
- Dead letter queues for failed tasks requiring manual review
- Task locking to prevent concurrent execution of singleton tasks
- Base task classes with shared retry configuration
Key Decisions
| Decision | Recommendation |
|---|---|
| Retry delay | Exponential backoff with jitter |
| Max retries | 3-5 for transient, 0 for permanent |
| Idempotency | Redis key with TTL |
| Failed tasks | DLQ for manual review |
| Singleton | Redis lock with TTL |
Scheduling
Celery Beat periodic task configuration with crontab, database-backed schedules, and overlap prevention.
Key Patterns
- Crontab for time-based schedules (daily, weekly, monthly)
- Interval for fixed-frequency tasks (every N seconds)
- Database scheduler with `django-celery-beat` for dynamic schedules
- Schedule locks to prevent overlapping long-running scheduled tasks
- Adaptive polling with self-rescheduling tasks
Key Decisions
| Decision | Recommendation |
|---|---|
| Schedule type | Crontab for time-based, interval for frequency |
| Dynamic | Database scheduler (django-celery-beat) |
| Overlap | Redis lock with timeout |
| Beat process | Separate process (not embedded) |
| Timezone | UTC always |
Monitoring
Production monitoring with Flower, custom signals, health checks, and Prometheus metrics.
Key Patterns
- Flower dashboard for real-time task monitoring
- Celery signals (`task_prerun`, `task_postrun`, `task_failure`) for metrics
- Health check endpoint verifying broker connection and active workers
- Queue depth monitoring for autoscaling decisions
- Beat monitoring for scheduled task dispatch tracking (see the heartbeat sketch below)
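A hedged sketch of the Beat-monitoring bullet: a scheduled heartbeat task plus a staleness check. The key name, 60s interval, and `REDIS_URL` are assumptions:
import time
import redis

redis_client = redis.from_url(REDIS_URL)

@celery_app.task
def beat_heartbeat():
    """Scheduled every minute; records the last successful Beat dispatch."""
    redis_client.set("beat:last_heartbeat", time.time())

celery_app.conf.beat_schedule["beat-heartbeat"] = {
    "task": "tasks.beat_heartbeat",
    "schedule": 60.0,
}

def beat_is_healthy(max_age: float = 180.0) -> bool:
    """Beat is considered stalled if no heartbeat landed within max_age seconds."""
    last = redis_client.get("beat:last_heartbeat")
    return last is not None and time.time() - float(last) < max_age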
Key Decisions
| Decision | Recommendation |
|---|---|
| Dashboard | Flower with persistent storage |
| Metrics | Prometheus via celery signals |
| Health | Broker + worker + queue depth |
| Alerting | Signal on task_failure |
| Autoscale | Queue depth > threshold |
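For the autoscale row, one minimal approach (thresholds and worker names are assumptions) is to grow or shrink worker pools from the measured queue depth via Celery's control API:
# Sketch: adjust a worker's pool size from queue depth
def autoscale_from_depth(depth: int, worker: str, low: int = 100, high: int = 1000):
    if depth > high:
        celery_app.control.pool_grow(2, destination=[worker])  # add 2 pool processes
    elif depth < low:
        celery_app.control.pool_shrink(1, destination=[worker])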
Result Backends
Task result storage, custom states, and progress tracking patterns.
Key Patterns
- Redis backend for task status and small results
- Custom task states (VALIDATING, PROCESSING, UPLOADING) for progress
- `update_state()` for real-time progress reporting
- S3/database for large result storage (never Redis)
- AsyncResult for querying task state and progress
Key Decisions
| Decision | Recommendation |
|---|---|
| Status storage | Redis result backend |
| Large results | S3 or database (never Redis) |
| Progress | Custom states with update_state() |
| Result query | AsyncResult with state checks |
ARQ Patterns
Lightweight async Redis Queue for FastAPI and simple background tasks.
Key Patterns
- Native async/await with `arq` for FastAPI integration
- Worker lifecycle with `startup`/`shutdown` hooks for resource management
- Job enqueue from FastAPI routes with `enqueue_job()`
- Job status tracking with `Job.status()` and `Job.result()`
- Delayed tasks with `_defer_by=timedelta()` for deferred execution
Key Decisions
| Decision | Recommendation |
|---|---|
| Simple async | ARQ (native async) |
| Complex workflows | Celery (chains, chords) |
| In-process quick | FastAPI BackgroundTasks |
| LLM workflows | LangGraph (not Celery) |
Tool Selection
| Tool | Best For | Complexity |
|---|---|---|
| ARQ | FastAPI, simple async jobs | Low |
| Celery | Complex workflows, enterprise | High |
| RQ | Simple Redis queues | Low |
| Dramatiq | Reliable messaging | Medium |
| FastAPI BackgroundTasks | In-process quick tasks | Minimal |
Anti-Patterns (FORBIDDEN)
# NEVER run long tasks synchronously in request handlers
@router.post("/api/v1/reports")
async def create_report(data: ReportRequest):
pdf = await generate_pdf(data) # Blocks for minutes!
# NEVER block on results inside tasks (causes deadlock)
@celery_app.task
def bad_task():
result = other_task.delay()
return result.get() # Blocks worker!
# NEVER store large results in Redis
@shared_task
def process_file(file_id: str) -> bytes:
return large_file_bytes # Store in S3/DB instead!
# NEVER skip idempotency for retried tasks
@celery_app.task(max_retries=3)
def create_order(order):
Order.create(order) # Creates duplicates on retry!
# NEVER use BackgroundTasks for distributed work
background_tasks.add_task(long_running_job) # Lost if server restarts
# NEVER ignore task acknowledgment settings
celery_app.conf.task_acks_late = False # Default loses tasks on crash
# ALWAYS use immutable signatures in chords
chord([task.si(x) for x in items], callback.s())  # si() in the header prevents arg pollution
Temporal Workflows
Durable execution engine for reliable distributed applications with Temporal.io.
Key Patterns
- Workflow definitions with `@workflow.defn` and deterministic code
- Saga pattern with compensation for multi-step transactions
- Signals and queries for external interaction with running workflows
- Timers with `workflow.wait_condition()` for human-in-the-loop
- Parallel activities via `asyncio.gather` inside workflows
Key Decisions
| Decision | Recommendation |
|---|---|
| Workflow ID | Business-meaningful, idempotent |
| Determinism | Use workflow.random(), workflow.now() |
| I/O | Always via activities, never directly |
Temporal Activities
Activity and worker patterns for Temporal.io I/O operations.
Key Patterns
- Activity definitions with `@activity.defn` for all I/O
- Heartbeating for long-running activities (> 60s)
- Error classification with `ApplicationError(non_retryable=True)` for business errors
- Worker configuration with dedicated task queues
- Testing with `WorkflowEnvironment.start_local()`
Key Decisions
| Decision | Recommendation |
|---|---|
| Activity timeout | start_to_close for most cases |
| Error handling | Non-retryable for business errors |
| Testing | WorkflowEnvironment for integration tests |
Related Skills
- `ork:python-backend` - FastAPI, asyncio, SQLAlchemy patterns
- `ork:langgraph` - LangGraph workflow patterns (use for LLM workflows, not Celery)
- `ork:distributed-systems` - Resilience patterns, circuit breakers
- `ork:monitoring-observability` - Metrics and alerting
Capability Details
celery-config
Keywords: celery, configuration, broker, worker, setup
Solves:
- Production Celery app configuration
- Broker and backend setup
- Worker tuning and time limits
task-routing
Keywords: priority, queue, routing, high priority, worker
Solves:
- Premium user task prioritization
- Multi-queue worker deployment
- Dynamic task routing
canvas-workflows
Keywords: chain, group, chord, signature, canvas, workflow, pipeline
Solves:
- Complex multi-step task pipelines
- Parallel task execution with aggregation
- Sequential task dependencies
retry-strategies
Keywords: retry, backoff, idempotency, dead letter, resilience
Solves:
- Exponential backoff with jitter
- Duplicate prevention for retried tasks
- Failed task handling with DLQ
scheduled-tasks
Keywords: periodic, scheduled, cron, celery beat, interval
Solves:
- Run tasks on schedule (crontab)
- Dynamic schedule management
- Overlap prevention for long tasks
monitoring-health
Keywords: flower, monitoring, health check, metrics, alerting
Solves:
- Production task monitoring dashboard
- Worker health checks
- Queue depth autoscaling
result-backends
Keywords: result, state, progress, AsyncResult, status
Solves:
- Task progress tracking with custom states
- Result storage strategies
- Job status API endpoints
arq-patterns
Keywords: arq, async queue, redis queue, fastapi background
Solves:
- Lightweight async background tasks for FastAPI
- Simple Redis job queue with async/await
- Job status tracking
Rules (6)
Compose multi-step task workflows using Celery canvas primitives and chains — HIGH
Canvas Workflows
Chains (Sequential)
from celery import chain
workflow = chain(
extract_data.s(source_id), # Returns raw_data
transform_data.s(), # Receives raw_data
load_data.s(destination_id), # Receives clean_data
)
result = workflow.apply_async()
Groups (Parallel)
from celery import group
parallel = group(process_chunk.s(chunk) for chunk in chunks)
group_result = parallel.apply_async()
results = group_result.get() # List of results
Chords (Parallel + Callback)
from celery import chord
workflow = chord(
[process_chunk.s(chunk) for chunk in chunks],
aggregate_results.s() # Receives list of all results
)
result = workflow.apply_async()
Signatures
from celery import signature
# Reusable task signature
sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})
# Immutable signature (won't receive results from previous task)
sig = process_order.si(order_id)
# Partial signature (curry arguments)
partial_sig = send_email.s(subject="Order Update")
Map and Starmap
workflow = process_item.map([item1, item2, item3])
workflow = send_email.starmap([("user1@ex.com", "S1"), ("user2@ex.com", "S2")])
workflow = process_item.chunks(items, batch_size=100)
Incorrect — Mutable signature in chord:
# Body receives polluted args from each parallel task
chord(
[process_chunk.s(chunk) for chunk in chunks],
aggregate_results.s() # Receives all chunk results concatenated!
)
Correct — Immutable signature in chord:
# Body receives clean list of results
chord(
[process_chunk.si(chunk) for chunk in chunks], # .si() = immutable
aggregate_results.s() # Receives [result1, result2, result3]
)
Error Handling
- Chain stops when any task fails; subsequent tasks don't run
- If any chord header task fails, the body won't execute
- Always use `si()` (immutable signatures) in chords to prevent arg pollution
Track background job status and execution metrics for operational visibility — HIGH
Job Status Tracking
Job Status Enum
from enum import Enum
class JobStatus(Enum):
PENDING = "pending"
STARTED = "started"
PROGRESS = "progress"
SUCCESS = "success"
FAILURE = "failure"
REVOKED = "revoked"ARQ Status Endpoint
@router.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str, arq: ArqRedis = Depends(get_arq_pool)):
job = Job(job_id, arq)
status = await job.status()
result = await job.result() if status == JobStatus.complete else None
return {"job_id": job_id, "status": status, "result": result}Celery Progress Updates
@shared_task(bind=True)
def generate_report(self, report_id: str) -> dict:
self.update_state(state="PROGRESS", meta={"step": "fetching"})
data = fetch_report_data(report_id)
self.update_state(state="PROGRESS", meta={"step": "rendering"})
pdf = render_pdf(data)
return {"report_id": report_id, "size": len(pdf)}Incorrect — No progress updates:
@shared_task
def generate_report(report_id: str):
# Long-running task with no feedback
data = fetch_report_data(report_id)
pdf = render_pdf(data)
return {"report_id": report_id}Correct — Progress updates:
@shared_task(bind=True)
def generate_report(self, report_id: str):
self.update_state(state="PROGRESS", meta={"step": "fetching", "percent": 25})
data = fetch_report_data(report_id)
self.update_state(state="PROGRESS", meta={"step": "rendering", "percent": 75})
pdf = render_pdf(data)
return {"report_id": report_id}Celery Status Endpoint
@router.get("/api/v1/jobs/{job_id}")
async def get_job(job_id: str):
result = AsyncResult(job_id, app=celery_app)
return {
"job_id": job_id,
"status": result.status,
"result": result.result if result.ready() else None,
"progress": result.info if result.status == "PROGRESS" else None,
}
Schedule reliable periodic background tasks without overlap or timing drift — HIGH
Scheduling & Background Tasks
Celery Beat (Periodic Tasks)
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
"cleanup-expired-sessions": {
"task": "app.workers.tasks.cleanup_sessions",
"schedule": crontab(minute=0, hour="*/6"), # Every 6 hours
},
"generate-daily-report": {
"task": "app.workers.tasks.daily_report",
"schedule": crontab(minute=0, hour=2), # 2 AM daily
},
"sync-external-data": {
"task": "app.workers.tasks.sync_data",
"schedule": 300.0, # Every 5 minutes
},
}
FastAPI BackgroundTasks (In-Process)
from fastapi import BackgroundTasks
@router.post("/api/v1/users")
async def create_user(data: UserCreate, background_tasks: BackgroundTasks):
user = await service.create_user(data)
background_tasks.add_task(send_welcome_email, user.email)
return user
FastAPI + Distributed Queue
@router.post("/api/v1/exports")
async def create_export(data: ExportRequest, arq: ArqRedis = Depends(get_arq_pool)):
job = await arq.enqueue_job("export_data", data.dict())
return {"job_id": job.job_id}Incorrect — Using BackgroundTasks for long jobs:
# In-process task blocks other requests
@router.post("/export")
async def create_export(background_tasks: BackgroundTasks):
background_tasks.add_task(generate_large_export) # 5+ minutes!
return {"status": "started"}Correct — Use distributed queue:
# Offload to worker, instant response
@router.post("/export")
async def create_export(arq: ArqRedis = Depends(get_arq_pool)):
job = await arq.enqueue_job("generate_large_export")
return {"job_id": job.job_id}Key Decisions
| Scenario | Use |
|---|---|
| Quick, non-critical | FastAPI BackgroundTasks |
| Periodic/scheduled | Celery Beat |
| Distributed, durable | ARQ or Celery |
| LLM workflows | LangGraph (not Celery) |
Set up task queues as the foundation for reliable background job processing — HIGH
Task Queue Setup
ARQ (Async Redis Queue)
from arq import create_pool
from arq.connections import RedisSettings
async def startup(ctx: dict):
ctx["db"] = await create_db_pool()
ctx["http"] = httpx.AsyncClient()
async def shutdown(ctx: dict):
await ctx["db"].close()
await ctx["http"].aclose()
class WorkerSettings:
redis_settings = RedisSettings(host="redis", port=6379)
functions = [send_email, generate_report, process_webhook]
on_startup = startup
on_shutdown = shutdown
max_jobs = 10
job_timeout = 300
ARQ Task Definition
async def send_email(ctx: dict, to: str, subject: str, body: str) -> dict:
http = ctx["http"]
response = await http.post("https://api.sendgrid.com/v3/mail/send",
json={"to": to, "subject": subject, "html": body},
headers={"Authorization": f"Bearer {SENDGRID_KEY}"})
return {"status": response.status_code, "to": to}Celery Setup
from celery import Celery
celery_app = Celery("orchestkit",
broker="redis://redis:6379/0", backend="redis://redis:6379/1")
celery_app.conf.update(
task_serializer="json",
task_track_started=True,
task_time_limit=600,
task_soft_time_limit=540,
worker_prefetch_multiplier=1,
task_acks_late=True,
task_reject_on_worker_lost=True,
)
Celery Task with Retry
@shared_task(bind=True, max_retries=3, default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError))
def send_email(self, to: str, subject: str, body: str) -> dict:
try:
response = requests.post(url, json=data, timeout=30)
response.raise_for_status()
return {"status": "sent", "to": to}
except Exception as exc:
raise self.retry(exc=exc)
Incorrect — No retry strategy:
# Fails permanently on first error
@shared_task
def send_email(to: str, subject: str):
response = requests.post(url, json=data) # Network error = lost job
return response.json()
Correct — Retry with exponential backoff:
# Retries with backoff
@shared_task(bind=True, max_retries=3, default_retry_delay=60,
autoretry_for=(ConnectionError, TimeoutError))
def send_email(self, to: str, subject: str):
try:
response = requests.post(url, json=data, timeout=30)
response.raise_for_status()
return response.json()
except Exception as exc:
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
Tool Selection
| Tool | Best For | Complexity |
|---|---|---|
| ARQ | FastAPI, async jobs | Low |
| Celery | Complex workflows | High |
| RQ | Simple Redis queues | Low |
| FastAPI BackgroundTasks | Quick in-process | None |
Configure Temporal activity timeouts, heartbeats, and retry policies to prevent data loss — HIGH
Temporal Activity and Worker Patterns
Incorrect — missing heartbeat and error classification:
@activity.defn
async def process_payment(input: PaymentInput) -> PaymentResult:
# WRONG: No heartbeat for long operation
# WRONG: No error classification (all errors retry)
response = await httpx.post("https://payments.example.com/charge",
json={"order_id": input.order_id, "amount": input.amount})
return PaymentResult(**response.json())
Correct — heartbeat, error classification, and proper worker setup:
from temporalio import activity
from temporalio.exceptions import ApplicationError
@activity.defn
async def process_payment(input: PaymentInput) -> PaymentResult:
activity.logger.info(f"Processing payment for order {input.order_id}")
try:
async with httpx.AsyncClient() as client:
response = await client.post(
"https://payments.example.com/charge",
json={"order_id": input.order_id, "amount": input.amount})
response.raise_for_status()
return PaymentResult(**response.json())
except httpx.HTTPStatusError as e:
if e.response.status_code == 402:
# Non-retryable: payment declined is a business error
raise ApplicationError("Payment declined",
non_retryable=True, type="PaymentDeclined")
raise # Retryable: transient HTTP errors
@activity.defn
async def send_notification(input: NotificationInput) -> None:
for i, recipient in enumerate(input.recipients):
# Heartbeat for long operations (required for activities > 60s)
activity.heartbeat(f"Sending {i+1}/{len(input.recipients)}")
await send_email(recipient, input.subject, input.body)
Worker Configuration
from temporalio.client import Client
from temporalio.worker import Worker
async def main():
client = await Client.connect("localhost:7233")
worker = Worker(
client,
task_queue="order-processing",
workflows=[OrderWorkflow],
activities=[create_order, process_payment, reserve_inventory, cancel_order_activity],
)
await worker.run()
async def start_order_workflow(order_data: OrderInput) -> str:
client = await Client.connect("localhost:7233")
handle = await client.start_workflow(
OrderWorkflow.run, order_data,
id=f"order-{order_data.order_id}",
task_queue="order-processing",
)
return handle.id
Testing with WorkflowEnvironment
import pytest
from temporalio.testing import WorkflowEnvironment
@pytest.fixture
async def workflow_env():
async with await WorkflowEnvironment.start_local() as env:
yield env
@pytest.mark.asyncio
async def test_order_workflow(workflow_env):
async with Worker(workflow_env.client, task_queue="test",
workflows=[OrderWorkflow],
activities=[create_order, process_payment]):
result = await workflow_env.client.execute_workflow(
OrderWorkflow.run, OrderInput(id="test-1", total=100),
id="test-order-1", task_queue="test",
)
assert result.order_id == "test-1"
Key Decisions
| Decision | Recommendation |
|---|---|
| Activity timeout | start_to_close for most cases |
| Retry policy | 3 attempts default, exponential backoff |
| Heartbeating | Required for activities > 60s |
| Error handling | ApplicationError(non_retryable=True) for business errors |
| Worker deployment | Separate workers per task queue in production |
| Testing | WorkflowEnvironment.start_local() for integration tests |
Define deterministic Temporal workflows with correct signal and query patterns — HIGH
Temporal Workflow Definitions
Incorrect — non-deterministic workflow operations:
@workflow.defn
class OrderWorkflow:
@workflow.run
async def run(self, order_data: OrderInput) -> OrderResult:
# WRONG: Non-deterministic in workflow code
if random.random() > 0.5:
await do_something()
if datetime.now() > deadline:
await cancel()
# WRONG: Direct I/O in workflow
response = await httpx.get("https://api.example.com")
Correct — deterministic workflow with proper APIs:
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
@workflow.defn
class OrderWorkflow:
def __init__(self):
self._status = "pending"
self._order_id: str | None = None
@workflow.run
async def run(self, order_data: OrderInput) -> OrderResult:
self._order_id = await workflow.execute_activity(
create_order, order_data,
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=1)),
)
self._status = "processing"
# Parallel activities via asyncio.gather
payment, inventory = await asyncio.gather(
workflow.execute_activity(process_payment, PaymentInput(order_id=self._order_id),
start_to_close_timeout=timedelta(minutes=5)),
workflow.execute_activity(reserve_inventory, InventoryInput(order_id=self._order_id),
start_to_close_timeout=timedelta(minutes=2)),
)
self._status = "completed"
return OrderResult(order_id=self._order_id, payment_id=payment.id)
@workflow.query
def get_status(self) -> str:
return self._status
@workflow.signal
async def cancel_order(self, reason: str):
self._status = "cancelling"
await workflow.execute_activity(cancel_order_activity,
CancelInput(order_id=self._order_id),
start_to_close_timeout=timedelta(seconds=30))
self._status = "cancelled"Saga Pattern with Compensation
@workflow.defn
class OrderSagaWorkflow:
@workflow.run
async def run(self, order: OrderInput) -> OrderResult:
compensations: list[tuple[Callable, Any]] = []
try:
reservation = await workflow.execute_activity(
reserve_inventory, order.items,
start_to_close_timeout=timedelta(minutes=2))
compensations.append((release_inventory, reservation.id))
payment = await workflow.execute_activity(
charge_payment, PaymentInput(order_id=order.id),
start_to_close_timeout=timedelta(minutes=5))
compensations.append((refund_payment, payment.id))
shipment = await workflow.execute_activity(
create_shipment, ShipmentInput(order_id=order.id),
start_to_close_timeout=timedelta(minutes=3))
return OrderResult(order_id=order.id, payment_id=payment.id, shipment_id=shipment.id)
except Exception:
workflow.logger.warning(f"Saga failed, running {len(compensations)} compensations")
for compensate_fn, compensate_arg in reversed(compensations):
try:
await workflow.execute_activity(compensate_fn, compensate_arg,
start_to_close_timeout=timedelta(minutes=2))
except Exception as e:
workflow.logger.error(f"Compensation failed: {e}")
raise
Key Decisions
| Decision | Recommendation |
|---|---|
| Workflow ID | Business-meaningful, idempotent (e.g., order-{order_id}) |
| Task queue | Per-service or per-workflow-type |
| Determinism | Use workflow.random(), workflow.now() — never stdlib |
| I/O | Always via activities, never directly in workflows |
| Timers | workflow.wait_condition() with timeout for human-in-the-loop |
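The timers row above is the only mention of `workflow.wait_condition()` in this skill, so a hedged human-in-the-loop sketch follows; the workflow name and the 24h timeout are assumptions:
import asyncio
from datetime import timedelta
from temporalio import workflow

@workflow.defn
class ApprovalWorkflow:
    """Hypothetical approval flow: resume on a signal or expire after 24h."""

    def __init__(self) -> None:
        self._approved = False

    @workflow.signal
    def approve(self) -> None:
        self._approved = True

    @workflow.run
    async def run(self, request_id: str) -> str:
        try:
            # Durable timer: wakes when the predicate flips or the timeout fires
            await workflow.wait_condition(
                lambda: self._approved, timeout=timedelta(hours=24)
            )
        except asyncio.TimeoutError:
            return "expired"
        return "approved"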
References (8)
Arq Patterns
ARQ Patterns
Lightweight async Redis Queue patterns for FastAPI and simple background tasks.
Worker Setup
# backend/app/workers/arq_worker.py
from arq import create_pool
from arq.connections import RedisSettings
async def startup(ctx: dict):
"""Initialize worker resources on startup."""
ctx["db"] = await create_db_pool()
ctx["http"] = httpx.AsyncClient()
async def shutdown(ctx: dict):
"""Cleanup worker resources on shutdown."""
await ctx["db"].close()
await ctx["http"].aclose()
class WorkerSettings:
redis_settings = RedisSettings(host="redis", port=6379)
functions = [
send_email,
generate_report,
process_webhook,
]
on_startup = startup
on_shutdown = shutdown
max_jobs = 10
job_timeout = 300 # 5 minutes
Task Definition
from arq import func
async def send_email(
ctx: dict,
to: str,
subject: str,
body: str,
) -> dict:
"""Send email task using shared HTTP client."""
http = ctx["http"]
response = await http.post(
"https://api.sendgrid.com/v3/mail/send",
json={"to": to, "subject": subject, "html": body},
headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
)
return {"status": response.status_code, "to": to}
async def generate_report(
ctx: dict,
report_id: str,
format: str = "pdf",
) -> dict:
"""Generate report asynchronously using shared DB pool."""
db = ctx["db"]
data = await db.fetch_report_data(report_id)
pdf_bytes = await render_pdf(data)
await db.save_report_file(report_id, pdf_bytes)
return {"report_id": report_id, "size": len(pdf_bytes)}FastAPI Integration
Enqueue Jobs
from arq import create_pool
from arq.connections import RedisSettings, ArqRedis
# Dependency
async def get_arq_pool() -> ArqRedis:
return await create_pool(RedisSettings(host="redis"))
@router.post("/api/v1/reports")
async def create_report(
data: ReportRequest,
arq: ArqRedis = Depends(get_arq_pool),
):
report = await service.create_report(data)
# Enqueue background job
job = await arq.enqueue_job(
"generate_report",
report.id,
format=data.format,
)
return {"report_id": report.id, "job_id": job.job_id}Job Status Endpoint
from arq.jobs import Job, JobStatus
@router.get("/api/v1/jobs/{job_id}")
async def get_job_status(
job_id: str,
arq: ArqRedis = Depends(get_arq_pool),
):
job = Job(job_id, arq)
status = await job.status()
result = await job.result() if status == JobStatus.complete else None
return {
"job_id": job_id,
"status": status.value,
"result": result,
}
Delayed Tasks
from datetime import timedelta
# Run in 1 hour
job = await arq.enqueue_job(
"send_reminder",
user_id="123",
_defer_by=timedelta(hours=1),
)
# Run at specific time
from datetime import datetime, timezone
job = await arq.enqueue_job(
"send_notification",
user_id="123",
_defer_until=datetime(2024, 12, 25, 9, 0, tzinfo=timezone.utc),
)
FastAPI Lifespan Integration
from contextlib import asynccontextmanager
from arq import create_pool
from arq.connections import RedisSettings
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Initialize ARQ pool at startup, close at shutdown."""
app.state.arq_pool = await create_pool(RedisSettings(host="redis"))
yield
await app.state.arq_pool.close()
app = FastAPI(lifespan=lifespan)
# Access pool in routes
async def get_arq(request: Request) -> ArqRedis:
return request.app.state.arq_pool
FastAPI BackgroundTasks (In-Process)
from fastapi import BackgroundTasks
@router.post("/api/v1/users")
async def create_user(
data: UserCreate,
background_tasks: BackgroundTasks,
):
user = await service.create_user(data)
# Simple in-process background task
# WARNING: Lost if server restarts — only for non-critical work
background_tasks.add_task(send_welcome_email, user.email)
return user
ARQ vs Celery vs BackgroundTasks
| Feature | ARQ | Celery | BackgroundTasks |
|---|---|---|---|
| Async native | Yes | No (gevent/eventlet) | Yes |
| Broker | Redis only | Redis, RabbitMQ, SQS | None (in-process) |
| Workflows | No | Chain, group, chord | No |
| Monitoring | Basic | Flower, events | None |
| Persistence | Redis | Redis, RabbitMQ | None |
| Complexity | Low | High | Minimal |
When to Use ARQ
- Building with FastAPI and native async/await
- Simple background tasks (email, notifications, reports)
- Redis is already in the stack
- Want minimal dependencies and configuration
- Tasks are independent (no complex workflows)
When NOT to Use ARQ
- Need complex workflows (chains, chords) -- use Celery
- Need RabbitMQ or SQS -- use Celery
- Need LLM workflows -- use LangGraph
- Need guaranteed delivery with dead letter queues -- use Celery
- Need per-task rate limiting -- use Celery
Canvas Workflows
Canvas Workflows
Celery canvas primitives for building complex task pipelines with chain, group, chord, and nested workflows.
Signatures
from celery import signature, chain, group, chord
# Create a reusable task signature
sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})
# Immutable signature — won't receive results from previous task
sig = process_order.si(order_id)
# Partial signature — curry arguments
partial_sig = send_email.s(subject="Order Update")
# Later: partial_sig.delay(to="user@example.com", body="...")
Chains (Sequential Execution)
from celery import chain
# Tasks execute sequentially, each receiving the previous result
workflow = chain(
extract_data.s(source_id), # Returns raw_data
transform_data.s(), # Receives raw_data, returns clean_data
load_data.s(destination_id), # Receives clean_data
)
result = workflow.apply_async()
# Access results
chain_result = result.get() # Final result
parent_result = result.parent.get() # Previous task result
Chain Error Handling
from celery.exceptions import Reject
@celery_app.task(bind=True, max_retries=3)
def extract_data(self, source_id: str) -> dict:
try:
return fetch_from_source(source_id)
except ConnectionError as exc:
# Retryable: chain pauses, retries this step
raise self.retry(exc=exc, countdown=2 ** self.request.retries)
except ValidationError as exc:
# Non-retryable: reject and stop entire chain
raise Reject(str(exc), requeue=False)
# Error callback for the whole chain
def create_etl_pipeline(source_id: str, dest: str) -> AsyncResult:
pipeline = chain(
extract_data.s(source_id),
transform_data.s(schema="v2"),
load_data.s(destination=dest),
)
return pipeline.apply_async(
link_error=handle_pipeline_error.s(source_id=source_id)
)
@celery_app.task
def handle_pipeline_error(request, exc, traceback, source_id: str):
alert_team(f"ETL failed for {source_id}: {exc}")
Groups (Parallel Execution)
from celery import group
# Execute tasks in parallel
parallel = group(
process_chunk.s(chunk) for chunk in chunks
)
group_result = parallel.apply_async()
# Wait for all to complete
results = group_result.get()
# Check completion status
group_result.ready() # All completed?
group_result.successful() # All succeeded?
group_result.failed() # Any failed?
Chords (Parallel + Callback)
from celery import chord
# Parallel execution with callback when ALL complete
workflow = chord(
[process_chunk.s(chunk) for chunk in chunks],
aggregate_results.s() # Receives list of all results
)
result = workflow.apply_async()
# IMPORTANT: If any header task fails, callback won't run
Chord with Partial Failure Tolerance
@celery_app.task(bind=True)
def process_chunk(self, chunk_id: int, data: list) -> dict:
"""Return error dict instead of raising to allow chord to complete."""
try:
result = expensive_computation(data)
return {"chunk_id": chunk_id, "status": "success", "result": result}
except Exception as exc:
return {"chunk_id": chunk_id, "status": "error", "error": str(exc)}
@celery_app.task
def aggregate_with_errors(results: list[dict]) -> dict:
"""Aggregate results, handling partial failures gracefully."""
successes = [r for r in results if r["status"] == "success"]
failures = [r for r in results if r["status"] == "error"]
return {
"total": len(results),
"succeeded": len(successes),
"failed": len(failures),
"aggregated": sum(r["result"] for r in successes),
"failed_chunks": [r["chunk_id"] for r in failures],
}
Map, Starmap, and Chunks
# Map: apply same task to each item
workflow = process_item.map([item1, item2, item3])
# Starmap: unpack args for each call
workflow = send_email.starmap([
("user1@example.com", "Subject 1"),
("user2@example.com", "Subject 2"),
])
# Chunks: split large list into batches
workflow = process_item.chunks(items, batch_size=100)
Nested Workflows
def create_order_workflow(order_id: str) -> AsyncResult:
"""Complex workflow combining chain, group, and immutable signatures."""
return chain(
# Step 1: Validate
validate_order.s(order_id),
# Step 2: Parallel inventory checks (group inside chain)
group(
check_inventory.s(item_id)
for item_id in get_order_items(order_id)
),
# Step 3: Aggregate inventory results
aggregate_inventory.s(),
# Step 4: Payment (ignores input from aggregate)
process_payment.si(order_id),
# Step 5: Parallel notifications
group(
send_confirmation_email.si(order_id),
send_sms_notification.si(order_id),
update_analytics.si(order_id),
),
).apply_async()
Result Inspection
from celery.result import AsyncResult, GroupResult
def inspect_chain_result(task_id: str) -> dict:
"""Traverse chain results from leaf to root."""
result = AsyncResult(task_id)
chain_results = []
current = result
while current is not None:
chain_results.append({
"task_id": current.id,
"state": current.state,
"result": current.result if current.ready() else None,
})
current = current.parent
return {"results": list(reversed(chain_results))}Canvas Best Practices
| Pattern | When to Use | Key Consideration |
|---|---|---|
| Chain | Sequential steps | Use si() for steps that ignore input |
| Group | Parallel independent tasks | Monitor memory with large groups |
| Chord | Fan-out/fan-in | Callback runs only if ALL succeed |
| Starmap | Same task, different args | More efficient than group |
| Chunks | Large datasets | Balance chunk size vs overhead |
Error Recovery Strategies
- Retry transient: Use `autoretry_for` with backoff
- Reject permanent: Use `Reject(requeue=False)` to stop chain
- Soft fail: Return error dict instead of raising (for chords)
- Link error: Use `link_error` callback for notifications
Celery Config
Celery Configuration
Production Celery app setup with secure defaults, broker configuration, and worker tuning.
Application Setup
# backend/app/workers/celery_app.py
from celery import Celery
celery_app = Celery(
"myapp",
broker="redis://redis:6379/0",
backend="redis://redis:6379/1",
)
celery_app.conf.update(
# Serialization — JSON only (never pickle)
task_serializer="json",
accept_content=["json"],
result_serializer="json",
# Timezone
timezone="UTC",
enable_utc=True,
# Task tracking
task_track_started=True,
# Time limits (seconds)
task_time_limit=600, # 10 min hard kill
task_soft_time_limit=540, # 9 min soft limit (raises SoftTimeLimitExceeded)
# Worker behavior
worker_prefetch_multiplier=1, # Fair task distribution
worker_max_tasks_per_child=1000, # Restart worker after N tasks (leak protection)
# Reliability
task_acks_late=True, # Ack after completion (not before)
task_reject_on_worker_lost=True, # Re-queue if worker dies mid-task
# Result backend
result_expires=86400, # 24 hours
result_backend_transport_options={
"global_keyprefix": "celery_result:",
},
)
Broker Configuration
# Redis broker options
celery_app.conf.broker_transport_options = {
"visibility_timeout": 43200, # 12 hours (must exceed longest task)
"retry_policy": {
"timeout": 5.0,
},
"max_retries": 3,
}
# Connection pool
celery_app.conf.broker_pool_limit = 10
celery_app.conf.broker_connection_retry_on_startup = True
Worker Tuning
# Production worker startup
celery -A app worker \
--loglevel=INFO \
--concurrency=4 \
--prefetch-multiplier=1 \
--max-tasks-per-child=1000 \
--without-heartbeat \
--without-gossip \
--without-mingle \
-Ofair
Concurrency Guidelines
| Workload Type | Concurrency | Prefetch | Notes |
|---|---|---|---|
| CPU-bound | N cores | 1 | Process pool |
| I/O-bound | 2-4x cores | 1-4 | Gevent/eventlet or prefork |
| Mixed | N cores | 1 | Process pool, fair scheduling |
| Bulk/batch | N cores | 4-8 | Higher prefetch for throughput |
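For the I/O-bound row above, switching pools is a one-flag change on the same worker command (assumes gevent is installed; the concurrency value is illustrative):
celery -A app worker --pool=gevent --concurrency=200 --prefetch-multiplier=4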
Docker Compose
# docker-compose.yml
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
volumes:
- redis-data:/data
celery-worker:
build: .
command: celery -A app worker --loglevel=INFO --concurrency=4
depends_on:
- redis
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
celery-beat:
build: .
command: celery -A app beat --loglevel=INFO
depends_on:
- redis
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
volumes:
redis-data:
Configuration Anti-Patterns
# NEVER use pickle serializer (security risk)
celery_app.conf.task_serializer = "pickle" # FORBIDDEN
# NEVER disable late ack in production
celery_app.conf.task_acks_late = False # Tasks lost on crash
# NEVER set visibility_timeout shorter than longest task
celery_app.conf.broker_transport_options = {
"visibility_timeout": 60, # Task re-dispatched if still running!
}
# NEVER skip time limits
# Without limits, a hung task blocks the worker slot forever
Monitoring Health
Monitoring & Health Checks
Production monitoring with Flower, Celery signals, health checks, and Prometheus metrics.
Flower Dashboard
# Install and run Flower
pip install flower
# Basic setup
celery -A app flower --port=5555 --basic_auth=admin:password
# With persistent storage
celery -A app flower --persistent=True --db=flower.db
# Flower 1.0+ serves Prometheus metrics at /metrics by default
celery -A app flower --port=5555
Celery Signal Metrics
from celery import signals
from prometheus_client import Counter, Histogram, Gauge
import time
# Counters
tasks_started = Counter(
"celery_tasks_started_total",
"Tasks started",
["task_name"],
)
tasks_completed = Counter(
"celery_tasks_completed_total",
"Tasks completed",
["task_name", "state"],
)
tasks_failed = Counter(
"celery_tasks_failed_total",
"Tasks failed",
["task_name", "exception_type"],
)
# Histogram
task_duration = Histogram(
"celery_task_duration_seconds",
"Task execution duration",
["task_name"],
buckets=[0.1, 0.5, 1, 5, 10, 30, 60, 300, 600],
)
# Gauge
active_tasks = Gauge(
"celery_active_tasks",
"Currently executing tasks",
["task_name"],
)
# Wire up signals
@signals.task_prerun.connect
def on_task_start(sender, task_id, task, args, kwargs, **_):
tasks_started.labels(task_name=task.name).inc()
active_tasks.labels(task_name=task.name).inc()
# Store start time in task request
task.request._start_time = time.time()
@signals.task_postrun.connect
def on_task_complete(sender, task_id, task, args, kwargs, retval, state, **_):
tasks_completed.labels(task_name=task.name, state=state).inc()
active_tasks.labels(task_name=task.name).dec()
# Record duration
start_time = getattr(task.request, "_start_time", None)
if start_time:
duration = time.time() - start_time
task_duration.labels(task_name=task.name).observe(duration)
@signals.task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **_):
tasks_failed.labels(
task_name=sender.name,
exception_type=type(exception).__name__,
).inc()
# Alert on critical task failures
if sender.name.startswith("tasks.payment"):
alerting.send_alert(
f"Payment task {sender.name} failed: {exception}",
severity="critical",
)
@signals.task_retry.connect
def on_task_retry(sender, reason, **kwargs):
tasks_completed.labels(task_name=sender.name, state="RETRY").inc()Health Check Endpoint
from celery import current_app
def celery_health_check() -> dict:
"""Comprehensive Celery health check."""
try:
# Check broker connection
conn = current_app.connection()
conn.ensure_connection(max_retries=3)
# Check workers responding
inspector = current_app.control.inspect()
active_workers = inspector.active()
if not active_workers:
return {"status": "unhealthy", "reason": "No active workers"}
# Check queue depths
import redis
r = redis.from_url(REDIS_URL)
queue_depths = {}
for queue in ["critical", "high", "default", "low"]:
queue_depths[queue] = r.llen(queue)
return {
"status": "healthy",
"workers": list(active_workers.keys()),
"active_tasks": sum(len(tasks) for tasks in active_workers.values()),
"queue_depths": queue_depths,
}
except Exception as e:
return {"status": "unhealthy", "reason": str(e)}
# FastAPI health endpoint
@router.get("/health/celery")
async def celery_health():
result = celery_health_check()
status_code = 200 if result["status"] == "healthy" else 503
return JSONResponse(content=result, status_code=status_code)
Worker Inspection
from celery import current_app
def inspect_workers() -> dict:
"""Get detailed worker status."""
inspector = current_app.control.inspect()
return {
"active": inspector.active(), # Currently executing
"reserved": inspector.reserved(), # Prefetched, waiting
"scheduled": inspector.scheduled(), # ETA/countdown tasks
"registered": inspector.registered_tasks(), # Available task types
"stats": inspector.stats(), # Worker statistics
}
Queue Depth Monitoring
import redis
from prometheus_client import Gauge
queue_depth_gauge = Gauge(
"celery_queue_depth",
"Number of pending tasks in queue",
["queue_name"],
)
def update_queue_metrics(redis_url: str):
"""Update queue depth metrics (call periodically)."""
r = redis.from_url(redis_url)
for queue in ["critical", "high", "default", "low", "bulk"]:
depth = r.llen(queue)
queue_depth_gauge.labels(queue_name=queue).set(depth)
# Schedule this as a periodic task
@celery_app.task
def collect_queue_metrics():
update_queue_metrics(REDIS_URL)
celery_app.conf.beat_schedule["collect-metrics"] = {
"task": "tasks.collect_queue_metrics",
"schedule": 30.0, # Every 30 seconds
}
Alerting Rules
# Alert configuration
ALERT_RULES = {
"queue_depth_critical": {
"queue": "critical",
"threshold": 100,
"severity": "critical",
"message": "Critical queue depth exceeds {depth}",
},
"queue_depth_default": {
"queue": "default",
"threshold": 5000,
"severity": "warning",
"message": "Default queue backing up: {depth} pending",
},
"worker_down": {
"min_workers": 1,
"severity": "critical",
"message": "No active Celery workers",
},
}
def check_alerts(health: dict):
"""Evaluate alert rules against current health."""
alerts = []
for rule_name, rule in ALERT_RULES.items():
if "queue" in rule:
depth = health.get("queue_depths", {}).get(rule["queue"], 0)
if depth > rule["threshold"]:
alerts.append({
"rule": rule_name,
"severity": rule["severity"],
"message": rule["message"].format(depth=depth),
})
elif "min_workers" in rule:
worker_count = len(health.get("workers", []))
if worker_count < rule["min_workers"]:
alerts.append({
"rule": rule_name,
"severity": rule["severity"],
"message": rule["message"],
})
return alerts
Key Metrics Summary
| Metric | Type | Description |
|---|---|---|
| Queue depth | Gauge | Pending tasks per queue |
| Task duration | Histogram | p50/p95/p99 execution time |
| Tasks started | Counter | Total tasks dispatched |
| Tasks completed | Counter | Total tasks finished (by state) |
| Tasks failed | Counter | Total failures (by exception type) |
| Active tasks | Gauge | Currently executing tasks |
| Worker count | Gauge | Number of active workers |
Result Backends
Result Backends
Task result storage, custom states, progress tracking, and rate limiting patterns.
Redis Result Backend
# celery_config.py
celery_app.conf.update(
result_backend="redis://redis:6379/1",
result_serializer="json",
result_expires=86400, # 24 hours
result_backend_transport_options={
"global_keyprefix": "celery_result:",
},
)
Custom Task States
from celery import states
# Define custom states
VALIDATING = "VALIDATING"
PROCESSING = "PROCESSING"
UPLOADING = "UPLOADING"
@celery_app.task(bind=True)
def long_running_task(self, data: dict) -> dict:
"""Task with progress tracking via custom states."""
self.update_state(
state=VALIDATING,
meta={"step": 1, "total": 3, "description": "Validating input"},
)
validated = validate(data)
self.update_state(
state=PROCESSING,
meta={"step": 2, "total": 3, "description": "Processing data"},
)
result = process(validated)
self.update_state(
state=UPLOADING,
meta={"step": 3, "total": 3, "description": "Uploading results"},
)
upload(result)
return {"status": "complete", "url": result.url}Progress Tracking API
from celery.result import AsyncResult
@router.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str) -> dict:
"""Query task progress and result."""
result = AsyncResult(job_id, app=celery_app)
response = {
"job_id": job_id,
"status": result.status,
}
if result.status == "PROGRESS" or result.status in (VALIDATING, PROCESSING, UPLOADING):
response["progress"] = result.info
elif result.ready():
if result.successful():
response["result"] = result.result
else:
response["error"] = str(result.result)
return response
# Batch status check
@router.post("/api/v1/jobs/batch-status")
async def batch_job_status(job_ids: list[str]) -> dict:
"""Check status of multiple jobs at once."""
statuses = {}
for job_id in job_ids:
result = AsyncResult(job_id, app=celery_app)
statuses[job_id] = {
"status": result.status,
"ready": result.ready(),
}
return {"jobs": statuses}Large Result Handling
import boto3
from datetime import datetime, timezone
@celery_app.task(bind=True)
def generate_report(self, report_id: str) -> dict:
"""Store large results in S3, return reference only."""
self.update_state(state="PROCESSING", meta={"step": "generating"})
# Generate large report
pdf_bytes = render_pdf(report_id)
# Store in S3 (NEVER store large data in Redis result backend)
s3 = boto3.client("s3")
s3_key = f"reports/{report_id}/{datetime.now().isoformat()}.pdf"
s3.put_object(
Bucket="reports-bucket",
Key=s3_key,
Body=pdf_bytes,
ContentType="application/pdf",
)
# Return only the reference
return {
"report_id": report_id,
"s3_key": s3_key,
"size_bytes": len(pdf_bytes),
"generated_at": datetime.now(timezone.utc).isoformat(),
}
Job Status Lifecycle
PENDING → STARTED → SUCCESS
→ FAILURE
→ RETRY → STARTED → ...
→ REVOKED
Custom states:
PENDING → STARTED → VALIDATING → PROCESSING → UPLOADING → SUCCESS
Rate Limiting
Static Rate Limits
@celery_app.task(rate_limit="100/m") # 100 per minute
def call_external_api(endpoint: str) -> dict:
return requests.get(endpoint, timeout=30).json()
@celery_app.task(rate_limit="10/s") # 10 per second
def send_push_notification(user_id: str, message: str):
push_service.send(user_id, message)
Dynamic Rate Limiting
from celery import current_app
class RateLimitManager:
def __init__(self, app):
self.app = app
self.defaults = {
"tasks.call_external_api": "100/m",
"tasks.send_notification": "10/s",
}
def adjust_rate(self, task_name: str, new_rate: str, workers=None):
self.app.control.rate_limit(task_name, new_rate, destination=workers)
def reduce_for_high_load(self, factor: float = 0.5):
for task, rate in self.defaults.items():
value, unit = int(rate.split("/")[0]), rate.split("/")[1]
new_rate = f"{int(value * factor)}/{unit}"
self.adjust_rate(task, new_rate)
def restore_defaults(self):
for task, rate in self.defaults.items():
self.adjust_rate(task, rate)
Token Bucket Rate Limiter
from celery import Task
import time
class TokenBucketTask(Task):
"""Distributed token bucket rate limiting."""
abstract = True
rate_limit_key: str = None
tokens_per_second: float = 10.0
bucket_size: int = 100
def acquire_token(self) -> bool:
key = f"rate_limit:{self.rate_limit_key}"
now = time.time()
# Atomic Lua script for token bucket
script = """
local key = KEYS[1]
local now = tonumber(ARGV[1])
local rate = tonumber(ARGV[2])
local capacity = tonumber(ARGV[3])
local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
local tokens = tonumber(bucket[1]) or capacity
local last_update = tonumber(bucket[2]) or now
local elapsed = now - last_update
tokens = math.min(capacity, tokens + elapsed * rate)
if tokens >= 1 then
tokens = tokens - 1
redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
redis.call('EXPIRE', key, 3600)
return 1
end
return 0
"""
return bool(redis_client.eval(script, 1, key, now, self.tokens_per_second, self.bucket_size))
def __call__(self, *args, **kwargs):
if not self.acquire_token():
countdown = 2 ** min(self.request.retries, 6)
raise self.retry(countdown=countdown)
return super().__call__(*args, **kwargs)
# Usage
@celery_app.task(
base=TokenBucketTask,
bind=True,
rate_limit_key="stripe_api",
tokens_per_second=25,
bucket_size=100,
)
def charge_customer(self, customer_id: str, amount: int):
return stripe.PaymentIntent.create(customer=customer_id, amount=amount, currency="usd")
Rate Limit Patterns Summary
| Pattern | Use Case | Implementation |
|---|---|---|
| Static | Simple API quotas | @task(rate_limit="100/m") |
| Dynamic | Adaptive to load | app.control.rate_limit() |
| Token bucket | Smooth burst handling | Custom Task base class |
| Per-user | Multi-tenant fairness | User-keyed counters |
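The per-user row has no example elsewhere in this skill; a minimal fixed-window sketch follows, reusing the `redis_client` convention from earlier blocks. The key format, the 30/minute cap, and `push_service` are assumptions:
import time

@celery_app.task(bind=True, max_retries=None)
def send_user_notification(self, user_id: str, message: str):
    """Per-user fixed-window rate limit: defer to the next window when over cap."""
    window = int(time.time() // 60)  # 1-minute windows
    key = f"rate:user:{user_id}:{window}"
    count = redis_client.incr(key)
    redis_client.expire(key, 120)  # let stale windows expire
    if count > 30:  # assumed per-user cap per minute
        raise self.retry(countdown=60)  # retry in the next window
    push_service.send(user_id, message)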
Retry Strategies
Retry Strategies
Exponential backoff, idempotency, dead letter queues, and task locking patterns.
Exponential Backoff with Jitter
from celery import shared_task
@shared_task(
bind=True,
max_retries=5,
autoretry_for=(ConnectionError, TimeoutError),
retry_backoff=True, # Exponential: 1s, 2s, 4s, 8s, 16s
retry_backoff_max=600, # Cap at 10 minutes
retry_jitter=True, # Randomize to prevent thundering herd
)
def call_external_api(self, endpoint: str) -> dict:
"""Task with automatic retry on transient failures."""
response = requests.get(endpoint, timeout=30)
response.raise_for_status()
return response.json()Manual Retry with Custom Delay
@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to: str, subject: str, body: str) -> dict:
"""Manual retry with explicit error handling."""
try:
response = requests.post(
"https://api.sendgrid.com/v3/mail/send",
json={"to": to, "subject": subject, "html": body},
headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
timeout=30,
)
response.raise_for_status()
return {"status": "sent", "to": to}
except requests.ConnectionError as exc:
raise self.retry(exc=exc, countdown=2 ** self.request.retries * 10)
except requests.HTTPError as exc:
if exc.response.status_code >= 500:
raise self.retry(exc=exc) # Server error: retry
raise # Client error: fail immediately
Base Task with Retry Configuration
from celery import Task
class RetryableTask(Task):
"""Reusable base task with exponential backoff."""
abstract = True
autoretry_for = (ConnectionError, TimeoutError)
max_retries = 5
retry_backoff = True
retry_backoff_max = 600
retry_jitter = True
class DatabaseTask(Task):
"""Base task with database session management."""
abstract = True
_db = None
@property
def db(self):
if self._db is None:
self._db = create_session()
return self._db
def after_return(self, status, retval, task_id, args, kwargs, einfo):
if self._db:
self._db.close()
self._db = None
# Usage
@celery_app.task(base=RetryableTask)
def call_flaky_api(endpoint: str):
return requests.get(endpoint, timeout=30).json()
@celery_app.task(base=DatabaseTask)
def query_database(query: str):
return query_database.db.execute(query)
Idempotency Protection
@shared_task(bind=True, max_retries=3, retry_backoff=True)
def process_payment(self, payment_id: str, amount: int) -> dict:
"""Idempotent task — safe to retry without duplicates."""
idempotency_key = f"processed:payment:{payment_id}"
# Check if already processed
if redis_client.get(idempotency_key):
return {"status": "already_processed", "payment_id": payment_id}
try:
# Process payment
result = stripe.PaymentIntent.create(
customer=get_customer(payment_id),
amount=amount,
currency="usd",
idempotency_key=f"celery:{payment_id}", # Stripe-level idempotency
)
# Mark as processed with 24h TTL
redis_client.setex(idempotency_key, 86400, "1")
return {"status": "success", "payment_id": payment_id, "intent_id": result.id}
except stripe.error.RateLimitError as exc:
raise self.retry(exc=exc, countdown=60)
except stripe.error.InvalidRequestError:
raise # Non-retryable
Dead Letter Queue
import json
from celery import signals
@shared_task(
bind=True,
max_retries=3,
retry_backoff=True,
)
def important_task(self, data: dict) -> dict:
"""Task that moves to DLQ after exhausting retries."""
try:
return do_processing(data)
except Exception as exc:
if self.request.retries >= self.max_retries:
move_to_dlq(self, data, exc)
raise # Let Celery mark as failed
raise self.retry(exc=exc)
def move_to_dlq(task, data: dict, error: Exception):
"""Move failed task to dead letter queue for manual review."""
from datetime import datetime, timezone
dlq_entry = {
"task_name": task.name,
"task_id": task.request.id,
"args": data,
"error": str(error),
"retries": task.request.retries,
"timestamp": datetime.now(timezone.utc).isoformat(),
}
redis_client.lpush(f"dlq:{task.name}", json.dumps(dlq_entry))
# DLQ processing
def process_dlq(task_name: str, limit: int = 10):
"""Process dead letter queue entries."""
dlq_key = f"dlq:{task_name}"
entries = []
for _ in range(limit):
raw = redis_client.rpop(dlq_key)
if not raw:
break
entries.append(json.loads(raw))
return entries
Task Locking (Singleton Tasks)
@shared_task(bind=True)
def singleton_task(self, resource_id: str) -> dict:
"""Only one instance runs per resource at a time."""
lock_key = f"lock:{self.name}:{resource_id}"
if not redis_client.set(lock_key, "1", nx=True, ex=300):
return {"status": "already_running", "resource_id": resource_id}
try:
result = do_exclusive_work(resource_id)
return {"status": "completed", "result": result}
finally:
redis_client.delete(lock_key)
Retry Pattern Summary
| Pattern | Use Case | Implementation |
|---|---|---|
| Auto retry | Transient errors | autoretry_for + retry_backoff |
| Manual retry | Conditional retry | self.retry(exc=exc, countdown=N) |
| Idempotency | Payment, creation | Redis key check before processing |
| DLQ | Failed after retries | Move to Redis list for review |
| Singleton | Exclusive tasks | Redis lock with TTL |
| Base class | Shared config | Custom Task subclass |
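Entries popped by `process_dlq` above are meant for manual review; once the underlying issue is fixed, a hypothetical helper can re-enqueue them (the entry format matches `move_to_dlq`):
# Sketch: re-enqueue a reviewed DLQ entry by task name
def requeue_dlq_entry(entry: dict) -> str:
    result = celery_app.send_task(entry["task_name"], args=[entry["args"]])
    return result.id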
Scheduled Tasks
Scheduled Tasks
Celery Beat periodic task configuration with crontab, database-backed schedules, and overlap prevention.
Basic Beat Schedule
# celery_config.py
from celery.schedules import crontab
celery_app.conf.beat_schedule = {
# Interval-based (every N seconds)
"process-pending-orders": {
"task": "tasks.process_pending_orders",
"schedule": 300.0, # Every 5 minutes
},
# Daily at specific time
"daily-report": {
"task": "tasks.generate_daily_report",
"schedule": crontab(hour=6, minute=0),
},
# Weekly
"weekly-cleanup": {
"task": "tasks.cleanup_old_records",
"schedule": crontab(hour=2, minute=0, day_of_week="sunday"),
},
# Monthly
"monthly-billing": {
"task": "tasks.process_monthly_billing",
"schedule": crontab(hour=0, minute=0, day_of_month=1),
},
# With arguments
"sync-inventory": {
"task": "tasks.sync_inventory",
"schedule": crontab(minute="*/15"),
"args": ("warehouse-1",),
"kwargs": {"full_sync": False},
},
}
celery_app.conf.timezone = "UTC"
Crontab Reference
from celery.schedules import crontab
SCHEDULES = {
# Every minute
"every_minute": crontab(),
# Every hour at :00
"hourly": crontab(minute=0),
# Daily at midnight
"daily_midnight": crontab(hour=0, minute=0),
# Weekdays at 9 AM
"weekday_morning": crontab(hour=9, minute=0, day_of_week="mon-fri"),
# Every 15 min during business hours
"business_hours": crontab(
minute="*/15",
hour="9-17",
day_of_week="mon-fri",
),
# First Monday of each month
"first_monday": crontab(
hour=0, minute=0,
day_of_week="monday",
day_of_month="1-7",
),
# Quarterly (Jan 1, Apr 1, Jul 1, Oct 1)
"quarterly": crontab(
hour=0, minute=0,
day_of_month=1,
month_of_year="1,4,7,10",
),
}
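The reference dict plugs straight into the beat schedule; for example, reusing the business-hours entry for the sync_inventory task defined earlier:
celery_app.conf.beat_schedule["business-hours-sync"] = {
    "task": "tasks.sync_inventory",
    "schedule": SCHEDULES["business_hours"],
}
Database-Backed Schedules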
# Using django-celery-beat for dynamic schedules
# pip install django-celery-beat
# settings.py
INSTALLED_APPS = [..., "django_celery_beat"]
# celery_config.py
celery_app.conf.beat_scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
# Create schedules programmatically
from django_celery_beat.models import PeriodicTask, CrontabSchedule
def create_user_report_schedule(user_id: str, hour: int):
"""Create user-specific scheduled report."""
schedule, _ = CrontabSchedule.objects.get_or_create(
hour=hour, minute=0, timezone="UTC",
)
PeriodicTask.objects.update_or_create(
name=f"user-report-{user_id}",
defaults={
"task": "tasks.generate_user_report",
"crontab": schedule,
"args": json.dumps([user_id]),
"enabled": True,
},
)
def disable_schedule(name: str):
PeriodicTask.objects.filter(name=name).update(enabled=False)
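Usage — schedule a daily 07:00 UTC report for one user, then pause it (the user ID is illustrative):
create_user_report_schedule("user-42", hour=7)
disable_schedule("user-report-user-42")
Overlap Prevention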
import redis
from contextlib import contextmanager
redis_client = redis.from_url(REDIS_URL)
class ScheduleLockError(Exception):
    """Raised when a scheduled run would overlap an active one."""
@contextmanager
def schedule_lock(task_name: str, timeout: int = 3600):
"""Prevent overlapping scheduled task runs."""
lock_key = f"schedule_lock:{task_name}"
lock = redis_client.lock(lock_key, timeout=timeout)
acquired = lock.acquire(blocking=False)
if not acquired:
raise ScheduleLockError(f"Task {task_name} already running")
try:
yield
finally:
lock.release()
@celery_app.task(bind=True)
def long_running_scheduled_task(self):
"""Scheduled task that must not overlap."""
with schedule_lock(self.name, timeout=7200):
perform_long_operation()
# Alternative: simple Redis lock
@celery_app.task(bind=True, acks_late=True, reject_on_worker_lost=True)
def exclusive_scheduled_task(self):
lock_id = f"{self.name}-lock"
if not redis_client.set(lock_id, "1", nx=True, ex=3600):
return {"status": "skipped", "reason": "already running"}
try:
return perform_operation()
finally:
redis_client.delete(lock_id)
Adaptive Polling
from celery import shared_task
@shared_task(bind=True)
def adaptive_poll(self, resource_id: str, delay: int = 30):
    """Self-rescheduling task with adaptive interval."""
    result = check_resource(resource_id)
    if result.has_changes:
        # Activity detected: reset to the fastest interval (30s default)
        self.apply_async(args=(resource_id,), countdown=30)
    else:
        # No activity: exponential backoff, capped at 5 minutes
        next_delay = min(delay * 2, 300)
        self.apply_async(
            args=(resource_id,),
            kwargs={"delay": next_delay},
            countdown=next_delay,
        )
    return result
Running Beat
# Standalone beat process (recommended for production)
celery -A app beat --loglevel=INFO
# With database scheduler
celery -A app beat --scheduler django_celery_beat.schedulers:DatabaseScheduler
# With PID file for process management
celery -A app beat --pidfile=/var/run/celery/beat.pid
# Embedded beat in worker (development only)
celery -A app worker --beat --loglevel=INFO
Beat Monitoring
from celery import signals
from prometheus_client import Counter
beat_sent = Counter(
    "celery_beat_tasks_sent_total",
    "Scheduled tasks sent by beat",
    ["task_name"],
)
# Celery has no beat-specific "sent" signal; after_task_publish fires in the
# publishing process, which for scheduled tasks is the beat process itself.
@signals.after_task_publish.connect
def on_task_publish(sender=None, **_):
    beat_sent.labels(task_name=sender).inc()
Best Practices
| Practice | Reason |
|---|---|
| Use UTC timezone | Avoid DST issues |
| Add schedule locks | Prevent overlap for long tasks |
| Use database scheduler | Dynamic schedule management |
| Monitor beat health | Detect missed schedules (see heartbeat sketch below) |
| Separate beat process | Better reliability than embedded |
| Set task time limits | Prevent runaway scheduled tasks |
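A minimal heartbeat for the "Monitor beat health" row, assuming the redis_client and celery_app objects from earlier examples: beat publishes a trivial task every minute, and a stale key means the beat-to-worker path has missed at least two runs.
@celery_app.task(name="tasks.beat_heartbeat")
def beat_heartbeat():
    # TTL of two intervals: one missed run is tolerated, two trips the alarm
    redis_client.setex("beat:heartbeat", 120, "1")

celery_app.conf.beat_schedule["beat-heartbeat"] = {
    "task": "tasks.beat_heartbeat",
    "schedule": 60.0,
}

def beat_is_healthy() -> bool:
    return redis_client.exists("beat:heartbeat") == 1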
Task Routing
Task Routing & Priority Queues
Multi-queue configuration, priority levels, and dynamic routing for Celery 5.x.
Queue Definition
# celery_config.py
from kombu import Queue, Exchange
default_exchange = Exchange("default", type="direct")
priority_exchange = Exchange("priority", type="direct")
celery_app.conf.task_queues = (
Queue(
"critical",
exchange=priority_exchange,
routing_key="critical",
queue_arguments={"x-max-priority": 10},
),
Queue(
"high",
exchange=priority_exchange,
routing_key="high",
queue_arguments={"x-max-priority": 10},
),
Queue(
"default",
exchange=default_exchange,
routing_key="default",
queue_arguments={"x-max-priority": 10},
),
Queue(
"low",
exchange=default_exchange,
routing_key="low",
),
Queue(
"bulk",
exchange=default_exchange,
routing_key="bulk",
# No priority for bulk — FIFO is fine
),
)
celery_app.conf.task_default_queue = "default"
celery_app.conf.task_default_priority = 5
# Redis priority support (required)
celery_app.conf.broker_transport_options = {
"priority_steps": list(range(10)), # 0-9 priority levels
"sep": ":",
"queue_order_strategy": "priority",
}
Static Task Routing
# Route by task name pattern
celery_app.conf.task_routes = {
"tasks.payment.*": {"queue": "critical"},
"tasks.notification.*": {"queue": "high"},
"tasks.analytics.*": {"queue": "low"},
"tasks.report.*": {"queue": "bulk"},
"tasks.*": {"queue": "default"},
}
# Route by task decorator
@celery_app.task(queue="critical", priority=9)
def process_payment(payment_id: str):
"""Always runs on critical queue."""
pass
# Route at call time
process_order.apply_async(args=[order_id], queue="high", priority=8)
Dynamic Router Class
import fnmatch
from enum import IntEnum
class TaskPriority(IntEnum):
BULK = 0
LOW = 2
NORMAL = 5
HIGH = 7
CRITICAL = 9
class TaskRouter:
"""Route tasks based on name, args, and kwargs."""
ROUTES = {
"tasks.payment.*": {"queue": "critical", "priority": TaskPriority.CRITICAL},
"tasks.notification.*": {"queue": "high", "priority": TaskPriority.HIGH},
"tasks.analytics.*": {"queue": "low", "priority": TaskPriority.LOW},
"tasks.report.*": {"queue": "bulk", "priority": TaskPriority.BULK},
}
def route_for_task(self, task, args=None, kwargs=None):
# Check kwargs for urgency override
if kwargs and kwargs.get("urgent"):
return {"queue": "critical", "priority": TaskPriority.CRITICAL}
# Match by task name pattern
for pattern, route in self.ROUTES.items():
if fnmatch.fnmatch(task, pattern):
return route
return {"queue": "default", "priority": TaskPriority.NORMAL}
celery_app.conf.task_routes = [TaskRouter()]
User-Tier Priority
from celery.result import AsyncResult
def submit_with_priority(
    task,
    args: tuple = (),
    kwargs: dict | None = None,
    user_tier: str = "standard",
    is_urgent: bool = False,
) -> AsyncResult:
"""Submit task with priority based on user tier."""
kwargs = kwargs or {}
if is_urgent:
priority, queue = TaskPriority.CRITICAL, "critical"
elif user_tier == "enterprise":
priority, queue = TaskPriority.CRITICAL, "critical"
elif user_tier == "premium":
priority, queue = TaskPriority.HIGH, "high"
else:
priority, queue = TaskPriority.NORMAL, "default"
return task.apply_async(
args=args,
kwargs=kwargs,
queue=queue,
priority=priority,
)
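Usage — an enterprise user's order routes to the critical queue at the highest priority:
result = submit_with_priority(
    process_order,
    args=(order_id,),
    user_tier="enterprise",
)
Per-Queue Worker Configuration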
# Critical: low latency, high concurrency, one-at-a-time prefetch
celery -A app worker -Q critical -c 8 --prefetch-multiplier=1 --hostname=critical@%h
# High priority: balanced
celery -A app worker -Q high -c 4 --prefetch-multiplier=2 --hostname=high@%h
# Default: standard processing
celery -A app worker -Q default -c 4 --prefetch-multiplier=4 --hostname=default@%h
# Low/Bulk: high throughput, batch prefetch
celery -A app worker -Q low,bulk -c 2 --prefetch-multiplier=8 --hostname=bulk@%h
Queue Depth Monitoring
import redis
def get_queue_stats(redis_url: str) -> dict:
"""Get queue depths for monitoring and autoscaling."""
r = redis.from_url(redis_url)
queues = ["critical", "high", "default", "low", "bulk"]
stats = {}
for queue in queues:
    # With queue_order_strategy="priority" and sep=":", kombu shards each
    # queue into per-priority lists ("critical", "critical:1", ... "critical:9"),
    # so sum all shards for the true depth.
    shard_keys = [queue] + [f"{queue}:{p}" for p in range(1, 10)]
    depth = sum(r.llen(key) for key in shard_keys)
    stats[queue] = {
        "depth": depth,
        "alert": depth > 1000,
    }
return stats
def should_scale_workers(queue: str, threshold: int = 500) -> bool:
"""Check if queue depth warrants worker scaling."""
stats = get_queue_stats(REDIS_URL)
return stats.get(queue, {}).get("depth", 0) > threshold
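These helpers pair naturally with a periodic check; a sketch, where scale_workers is a hypothetical infrastructure hook (Kubernetes HPA, ECS API, or similar):
@celery_app.task
def autoscale_check():
    for queue in ("critical", "high", "default", "low", "bulk"):
        if should_scale_workers(queue):
            scale_workers(queue)  # hypothetical: trigger your autoscaler
Configuration Summary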
| Setting | Purpose | Recommended |
|---|---|---|
| x-max-priority | Enable queue priority | 10 |
| priority_steps | Redis priority levels | range(10) |
| queue_order_strategy | Redis priority mode | "priority" |
| prefetch_multiplier | Tasks per worker fetch | 1 (critical), 4-8 (bulk) |
| Queue count | Separation granularity | 3-5 queues |