OrchestKit v6.7.1 — 67 skills, 38 agents, 77 hooks with Opus 4.6 support

Async Jobs

Async job processing patterns for background tasks, Celery workflows, task scheduling, retry strategies, and distributed task execution. Use when implementing background job processing, task queues, or scheduled task systems.

Reference medium

Primary Agent: python-performance-engineer

Async Jobs

Patterns for background task processing with Celery, ARQ, and Redis. Covers task queues, canvas workflows, scheduling, retry strategies, rate limiting, and production monitoring. Each category has individual rule files in references/ loaded on-demand.

Quick Reference

| Category | Rules | Impact | When to Use |
|---|---|---|---|
| Configuration | celery-config | HIGH | Celery app setup, broker, serialization, worker tuning |
| Task Routing | task-routing | HIGH | Priority queues, multi-queue workers, dynamic routing |
| Canvas Workflows | canvas-workflows | HIGH | Chain, group, chord, nested workflows |
| Retry Strategies | retry-strategies | HIGH | Exponential backoff, idempotency, dead letter queues |
| Scheduling | scheduled-tasks | MEDIUM | Celery Beat, crontab, database-backed schedules |
| Monitoring | monitoring-health | MEDIUM | Flower, custom events, health checks, metrics |
| Result Backends | result-backends | MEDIUM | Redis results, custom states, progress tracking |
| ARQ Patterns | arq-patterns | MEDIUM | Async Redis Queue for FastAPI, lightweight jobs |
| Temporal Workflows | temporal-workflows | HIGH | Durable workflow definitions, sagas, signals, queries |
| Temporal Activities | temporal-activities | HIGH | Activity patterns, workers, heartbeats, testing |

Total: 10 rules across 10 categories

Quick Start

# Celery task with retry
from celery import shared_task

@shared_task(
    bind=True,
    max_retries=3,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,
)
def process_order(self, order_id: str) -> dict:
    result = do_processing(order_id)
    return {"order_id": order_id, "status": "completed", "result": result}

# ARQ task with FastAPI
from arq import create_pool
from arq.connections import RedisSettings

async def generate_report(ctx: dict, report_id: str) -> dict:
    data = await ctx["db"].fetch_report_data(report_id)
    pdf = await render_pdf(data)
    return {"report_id": report_id, "size": len(pdf)}

@router.post("/api/v1/reports")
async def create_report(data: ReportRequest, arq: ArqRedis = Depends(get_arq_pool)):
    job = await arq.enqueue_job("generate_report", data.report_id)
    return {"job_id": job.job_id}

Configuration

Production Celery app configuration with secure defaults and worker tuning.

Key Patterns

  • JSON serialization with task_serializer="json" for safety
  • Late acknowledgment with task_acks_late=True to prevent task loss on crash
  • Time limits with both task_time_limit (hard) and task_soft_time_limit (soft)
  • Fair distribution with worker_prefetch_multiplier=1
  • Reject on lost with task_reject_on_worker_lost=True

Key Decisions

| Decision | Recommendation |
|---|---|
| Serializer | JSON (never pickle) |
| Ack mode | Late ack (task_acks_late=True) |
| Prefetch | 1 for fair, 4-8 for throughput |
| Time limit | soft < hard (e.g., 540/600) |
| Timezone | UTC always |

Task Routing

Priority queue configuration with multi-queue workers and dynamic routing.

Key Patterns

  • Named queues for critical/high/default/low/bulk separation
  • Redis priority with queue_order_strategy: "priority" and 0-9 levels
  • Task router classes for dynamic routing based on task attributes
  • Per-queue workers with tuned concurrency and prefetch settings
  • Content-based routing for dynamic workflow dispatch

Key Decisions

| Decision | Recommendation |
|---|---|
| Queue count | 3-5 (critical/high/default/low/bulk) |
| Priority levels | 0-9 via Redis priority_steps (x-max-priority on RabbitMQ) |
| Worker assignment | Dedicated workers per queue |
| Prefetch | 1 for critical, 4-8 for bulk |
| Routing | Router class for 5+ routing rules |
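A minimal sketch of these decisions, assuming a Redis broker; the queue names and task paths are illustrative, not part of the skill:

```python
from celery import Celery
from kombu import Queue

celery_app = Celery("orchestkit", broker="redis://redis:6379/0")

# Named queues; workers subscribe with `celery worker -Q critical,default`
celery_app.conf.task_queues = (
    Queue("critical"),
    Queue("default"),
    Queue("bulk"),
)
celery_app.conf.task_default_queue = "default"

# Redis priority support: 0-9 levels, consumed highest-first
celery_app.conf.broker_transport_options = {
    "queue_order_strategy": "priority",
    "priority_steps": list(range(10)),
}

# Static routes; switch to a Router class once rules grow past ~5
celery_app.conf.task_routes = {
    "app.workers.tasks.charge_card": {"queue": "critical"},
    "app.workers.tasks.send_newsletter": {"queue": "bulk"},
}
```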

Canvas Workflows

Celery canvas primitives for sequential, parallel, and fan-in/fan-out workflows.

Key Patterns

  • Chain for sequential ETL pipelines with result passing
  • Group for parallel execution of independent tasks
  • Chord for fan-out/fan-in with aggregation callback
  • Immutable signatures (si()) for steps that ignore input
  • Nested workflows combining groups inside chains
  • Link error callbacks for workflow-level error handling

Key Decisions

| Decision | Recommendation |
|---|---|
| Sequential | Chain with s() |
| Parallel | Group for independent tasks |
| Fan-in | Chord (all must succeed for callback) |
| Ignore input | Use si() immutable signature |
| Error in chain | Reject stops chain, retry continues |
| Partial failures | Return error dict in chord tasks |

Retry Strategies

Retry patterns with exponential backoff, idempotency, and dead letter queues.

Key Patterns

  • Exponential backoff with retry_backoff=True and retry_backoff_max
  • Jitter with retry_jitter=True to prevent thundering herd
  • Idempotency keys in Redis to prevent duplicate processing
  • Dead letter queues for failed tasks requiring manual review
  • Task locking to prevent concurrent execution of singleton tasks
  • Base task classes with shared retry configuration

Key Decisions

| Decision | Recommendation |
|---|---|
| Retry delay | Exponential backoff with jitter |
| Max retries | 3-5 for transient, 0 for permanent |
| Idempotency | Redis key with TTL |
| Failed tasks | DLQ for manual review |
| Singleton | Redis lock with TTL |
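A runnable sketch of the idempotency-key pattern; FakeRedis is an in-memory stand-in for redis-py's SET NX EX call, and the key scheme and task name are illustrative:

```python
import hashlib

class FakeRedis:
    """In-memory stand-in for Redis, only for this sketch."""
    def __init__(self):
        self._store = {}

    def set(self, key, value, nx=False, ex=None):
        # Mirrors redis-py: SET with NX returns None when the key exists
        if nx and key in self._store:
            return None
        self._store[key] = value  # `ex` (TTL) omitted in this stand-in
        return True

redis = FakeRedis()

def idempotency_key(task_name: str, payload: str) -> str:
    return "idem:" + hashlib.sha256(f"{task_name}:{payload}".encode()).hexdigest()

def process_once(order_id: str) -> str:
    """Do the work only for the first delivery; retries become no-ops."""
    key = idempotency_key("create_order", order_id)
    if not redis.set(key, "1", nx=True, ex=3600):
        return "duplicate"   # a retry re-delivered the same payload
    return "processed"       # first delivery does the real work

print(process_once("order-42"))  # processed
print(process_once("order-42"))  # duplicate
```

The TTL bounds how long duplicates are suppressed; pick it longer than the worst-case retry window.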

Scheduling

Celery Beat periodic task configuration with crontab, database-backed schedules, and overlap prevention.

Key Patterns

  • Crontab for time-based schedules (daily, weekly, monthly)
  • Interval for fixed-frequency tasks (every N seconds)
  • Database scheduler with django-celery-beat for dynamic schedules
  • Schedule locks to prevent overlapping long-running scheduled tasks
  • Adaptive polling with self-rescheduling tasks

Key Decisions

| Decision | Recommendation |
|---|---|
| Schedule type | Crontab for time-based, interval for frequency |
| Dynamic | Database scheduler (django-celery-beat) |
| Overlap | Redis lock with timeout |
| Beat process | Separate process (not embedded) |
| Timezone | UTC always |
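The overlap lock can be sketched without a broker; the dict below stands in for a Redis SET NX EX call, and the task body is illustrative:

```python
import time

_locks: dict[str, float] = {}  # key -> expiry timestamp (stand-in for Redis)

def acquire_lock(key: str, ttl: int) -> bool:
    """SET key NX EX ttl, sketched with an in-process dict."""
    now = time.monotonic()
    if key in _locks and _locks[key] > now:
        return False
    _locks[key] = now + ttl
    return True

def cleanup_sessions() -> str:
    # Beat may fire the schedule again while the previous run is still
    # going; the lock turns the overlapping run into a no-op.
    if not acquire_lock("lock:cleanup_sessions", ttl=300):
        return "skipped"
    return "ran"  # ... real cleanup here; lock expires via TTL ...

print(cleanup_sessions())  # ran
print(cleanup_sessions())  # skipped
```

Set the TTL slightly above the task's worst-case runtime so a crashed run cannot block the schedule forever.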

Monitoring

Production monitoring with Flower, custom signals, health checks, and Prometheus metrics.

Key Patterns

  • Flower dashboard for real-time task monitoring
  • Celery signals (task_prerun, task_postrun, task_failure) for metrics
  • Health check endpoint verifying broker connection and active workers
  • Queue depth monitoring for autoscaling decisions
  • Beat monitoring for scheduled task dispatch tracking

Key Decisions

| Decision | Recommendation |
|---|---|
| Dashboard | Flower with persistent storage |
| Metrics | Prometheus via Celery signals |
| Health | Broker + worker + queue depth |
| Alerting | Signal on task_failure |
| Autoscale | Queue depth > threshold |

Result Backends

Task result storage, custom states, and progress tracking patterns.

Key Patterns

  • Redis backend for task status and small results
  • Custom task states (VALIDATING, PROCESSING, UPLOADING) for progress
  • update_state() for real-time progress reporting
  • S3/database for large result storage (never Redis)
  • AsyncResult for querying task state and progress

Key Decisions

| Decision | Recommendation |
|---|---|
| Status storage | Redis result backend |
| Large results | S3 or database (never Redis) |
| Progress | Custom states with update_state() |
| Result query | AsyncResult with state checks |

ARQ Patterns

Lightweight async Redis Queue for FastAPI and simple background tasks.

Key Patterns

  • Native async/await with arq for FastAPI integration
  • Worker lifecycle with startup/shutdown hooks for resource management
  • Job enqueue from FastAPI routes with enqueue_job()
  • Job status tracking with Job.status() and Job.result()
  • Delayed tasks with _defer_by=timedelta() or _defer_until for deferred execution

Key Decisions

| Decision | Recommendation |
|---|---|
| Simple async | ARQ (native async) |
| Complex workflows | Celery (chains, chords) |
| In-process quick | FastAPI BackgroundTasks |
| LLM workflows | LangGraph (not Celery) |

Tool Selection

| Tool | Best For | Complexity |
|---|---|---|
| ARQ | FastAPI, simple async jobs | Low |
| Celery | Complex workflows, enterprise | High |
| RQ | Simple Redis queues | Low |
| Dramatiq | Reliable messaging | Medium |
| FastAPI BackgroundTasks | In-process quick tasks | Minimal |

Anti-Patterns (FORBIDDEN)

# NEVER run long tasks synchronously in request handlers
@router.post("/api/v1/reports")
async def create_report(data: ReportRequest):
    pdf = await generate_pdf(data)  # Blocks for minutes!

# NEVER block on results inside tasks (causes deadlock)
@celery_app.task
def bad_task():
    result = other_task.delay()
    return result.get()  # Blocks worker!

# NEVER store large results in Redis
@shared_task
def process_file(file_id: str) -> bytes:
    return large_file_bytes  # Store in S3/DB instead!

# NEVER skip idempotency for retried tasks
@celery_app.task(max_retries=3)
def create_order(order):
    Order.create(order)  # Creates duplicates on retry!

# NEVER use BackgroundTasks for distributed work
background_tasks.add_task(long_running_job)  # Lost if server restarts

# NEVER ignore task acknowledgment settings
celery_app.conf.task_acks_late = False  # Default loses tasks on crash

# ALWAYS use immutable signatures for chord header tasks
chord([task.si(x) for x in items], callback.s())  # si() prevents arg pollution
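The large-results anti-pattern is typically fixed by writing the payload to object storage and returning only a reference; MemoryStorage below is a stand-in for S3, and the key scheme is illustrative:

```python
class MemoryStorage:
    """In-memory stand-in for S3, only for this sketch."""
    def __init__(self):
        self.blobs: dict[str, bytes] = {}

    def put(self, key: str, data: bytes) -> None:
        self.blobs[key] = data

def process_file(file_id: str, data: bytes, storage: MemoryStorage) -> dict:
    # Store the heavy payload externally; the task result stays tiny,
    # so the Redis result backend holds only the reference
    key = f"reports/{file_id}.bin"
    storage.put(key, data)
    return {"file_id": file_id, "storage_key": key, "size": len(data)}

storage = MemoryStorage()
result = process_file("f1", b"x" * 1024, storage)
print(result)  # {'file_id': 'f1', 'storage_key': 'reports/f1.bin', 'size': 1024}
```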

Temporal Workflows

Durable execution engine for reliable distributed applications with Temporal.io.

Key Patterns

  • Workflow definitions with @workflow.defn and deterministic code
  • Saga pattern with compensation for multi-step transactions
  • Signals and queries for external interaction with running workflows
  • Timers with workflow.wait_condition() for human-in-the-loop
  • Parallel activities via asyncio.gather inside workflows

Key Decisions

| Decision | Recommendation |
|---|---|
| Workflow ID | Business-meaningful, idempotent |
| Determinism | Use workflow.random(), workflow.now() |
| I/O | Always via activities, never directly |

Temporal Activities

Activity and worker patterns for Temporal.io I/O operations.

Key Patterns

  • Activity definitions with @activity.defn for all I/O
  • Heartbeating for long-running activities (> 60s)
  • Error classification with ApplicationError(non_retryable=True) for business errors
  • Worker configuration with dedicated task queues
  • Testing with WorkflowEnvironment.start_local()

Key Decisions

| Decision | Recommendation |
|---|---|
| Activity timeout | start_to_close for most cases |
| Error handling | Non-retryable for business errors |
| Testing | WorkflowEnvironment for integration tests |

Related Skills

  • ork:python-backend - FastAPI, asyncio, SQLAlchemy patterns
  • ork:langgraph - LangGraph workflow patterns (use for LLM workflows, not Celery)
  • ork:distributed-systems - Resilience patterns, circuit breakers
  • ork:monitoring-observability - Metrics and alerting

Capability Details

celery-config

Keywords: celery, configuration, broker, worker, setup

Solves:

  • Production Celery app configuration
  • Broker and backend setup
  • Worker tuning and time limits

task-routing

Keywords: priority, queue, routing, high priority, worker

Solves:

  • Premium user task prioritization
  • Multi-queue worker deployment
  • Dynamic task routing

canvas-workflows

Keywords: chain, group, chord, signature, canvas, workflow, pipeline

Solves:

  • Complex multi-step task pipelines
  • Parallel task execution with aggregation
  • Sequential task dependencies

retry-strategies

Keywords: retry, backoff, idempotency, dead letter, resilience

Solves:

  • Exponential backoff with jitter
  • Duplicate prevention for retried tasks
  • Failed task handling with DLQ

scheduled-tasks

Keywords: periodic, scheduled, cron, celery beat, interval

Solves:

  • Run tasks on schedule (crontab)
  • Dynamic schedule management
  • Overlap prevention for long tasks

monitoring-health

Keywords: flower, monitoring, health check, metrics, alerting

Solves:

  • Production task monitoring dashboard
  • Worker health checks
  • Queue depth autoscaling

result-backends

Keywords: result, state, progress, AsyncResult, status

Solves:

  • Task progress tracking with custom states
  • Result storage strategies
  • Job status API endpoints

arq-patterns

Keywords: arq, async queue, redis queue, fastapi background

Solves:

  • Lightweight async background tasks for FastAPI
  • Simple Redis job queue with async/await
  • Job status tracking

Rules (6)

Compose multi-step task workflows using Celery canvas primitives and chains — HIGH

Canvas Workflows

Chains (Sequential)

from celery import chain

workflow = chain(
    extract_data.s(source_id),      # Returns raw_data
    transform_data.s(),              # Receives raw_data
    load_data.s(destination_id),     # Receives clean_data
)
result = workflow.apply_async()

Groups (Parallel)

from celery import group

parallel = group(process_chunk.s(chunk) for chunk in chunks)
group_result = parallel.apply_async()
results = group_result.get()  # List of results

Chords (Parallel + Callback)

from celery import chord

workflow = chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s()  # Receives list of all results
)
result = workflow.apply_async()

Signatures

from celery import signature

# Reusable task signature
sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})

# Immutable signature (won't receive results from previous task)
sig = process_order.si(order_id)

# Partial signature (curry arguments)
partial_sig = send_email.s(subject="Order Update")

Map and Starmap

workflow = process_item.map([item1, item2, item3])
workflow = send_email.starmap([("user1@ex.com", "S1"), ("user2@ex.com", "S2")])
workflow = process_item.chunks(items, 100)  # split items into batches of 100

Incorrect — Mutable signatures when the chord is chained:

# If this chord follows another task in a chain, each .s() header
# signature also receives that task's result as an extra argument
chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s()
)

Correct — Immutable signatures in chord:

# Header tasks take only their own arguments
chord(
    [process_chunk.si(chunk) for chunk in chunks],  # .si() = immutable
    aggregate_results.s()  # Receives [result1, result2, result3]
)

Error Handling

  • Chain stops when any task fails; subsequent tasks don't run
  • If any chord header task fails, the body won't execute
  • Always use si() (immutable signatures) in chords to prevent arg pollution

Track background job status and execution metrics for operational visibility — HIGH

Job Status Tracking

Job Status Enum

from enum import Enum

class JobStatus(Enum):
    PENDING = "pending"
    STARTED = "started"
    PROGRESS = "progress"
    SUCCESS = "success"
    FAILURE = "failure"
    REVOKED = "revoked"

ARQ Status Endpoint

from arq.jobs import Job, JobStatus  # arq's JobStatus, not the enum above

@router.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str, arq: ArqRedis = Depends(get_arq_pool)):
    job = Job(job_id, arq)
    status = await job.status()
    result = await job.result() if status == JobStatus.complete else None
    return {"job_id": job_id, "status": status.value, "result": result}

Celery Progress Updates

@shared_task(bind=True)
def generate_report(self, report_id: str) -> dict:
    self.update_state(state="PROGRESS", meta={"step": "fetching"})
    data = fetch_report_data(report_id)

    self.update_state(state="PROGRESS", meta={"step": "rendering"})
    pdf = render_pdf(data)

    return {"report_id": report_id, "size": len(pdf)}

Incorrect — No progress updates:

@shared_task
def generate_report(report_id: str):
    # Long-running task with no feedback
    data = fetch_report_data(report_id)
    pdf = render_pdf(data)
    return {"report_id": report_id}

Correct — Progress updates:

@shared_task(bind=True)
def generate_report(self, report_id: str):
    self.update_state(state="PROGRESS", meta={"step": "fetching", "percent": 25})
    data = fetch_report_data(report_id)

    self.update_state(state="PROGRESS", meta={"step": "rendering", "percent": 75})
    pdf = render_pdf(data)
    return {"report_id": report_id}

Celery Status Endpoint

from celery.result import AsyncResult

@router.get("/api/v1/jobs/{job_id}")
async def get_job(job_id: str):
    result = AsyncResult(job_id, app=celery_app)
    return {
        "job_id": job_id,
        "status": result.status,
        "result": result.result if result.ready() else None,
        "progress": result.info if result.status == "PROGRESS" else None,
    }

Schedule reliable periodic background tasks without overlap or timing drift — HIGH

Scheduling & Background Tasks

Celery Beat (Periodic Tasks)

from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "cleanup-expired-sessions": {
        "task": "app.workers.tasks.cleanup_sessions",
        "schedule": crontab(minute=0, hour="*/6"),  # Every 6 hours
    },
    "generate-daily-report": {
        "task": "app.workers.tasks.daily_report",
        "schedule": crontab(minute=0, hour=2),  # 2 AM daily
    },
    "sync-external-data": {
        "task": "app.workers.tasks.sync_data",
        "schedule": 300.0,  # Every 5 minutes
    },
}

FastAPI BackgroundTasks (In-Process)

from fastapi import BackgroundTasks

@router.post("/api/v1/users")
async def create_user(data: UserCreate, background_tasks: BackgroundTasks):
    user = await service.create_user(data)
    background_tasks.add_task(send_welcome_email, user.email)
    return user

FastAPI + Distributed Queue

@router.post("/api/v1/exports")
async def create_export(data: ExportRequest, arq: ArqRedis = Depends(get_arq_pool)):
    job = await arq.enqueue_job("export_data", data.dict())
    return {"job_id": job.job_id}

Incorrect — Using BackgroundTasks for long jobs:

# In-process task blocks other requests
@router.post("/export")
async def create_export(background_tasks: BackgroundTasks):
    background_tasks.add_task(generate_large_export)  # 5+ minutes!
    return {"status": "started"}

Correct — Use distributed queue:

# Offload to worker, instant response
@router.post("/export")
async def create_export(arq: ArqRedis = Depends(get_arq_pool)):
    job = await arq.enqueue_job("generate_large_export")
    return {"job_id": job.job_id}

Key Decisions

| Scenario | Use |
|---|---|
| Quick, non-critical | FastAPI BackgroundTasks |
| Periodic/scheduled | Celery Beat |
| Distributed, durable | ARQ or Celery |
| LLM workflows | LangGraph (not Celery) |

Set up task queues as the foundation for reliable background job processing — HIGH

Task Queue Setup

ARQ (Async Redis Queue)

from arq import create_pool
from arq.connections import RedisSettings

async def startup(ctx: dict):
    ctx["db"] = await create_db_pool()
    ctx["http"] = httpx.AsyncClient()

async def shutdown(ctx: dict):
    await ctx["db"].close()
    await ctx["http"].aclose()

class WorkerSettings:
    redis_settings = RedisSettings(host="redis", port=6379)
    functions = [send_email, generate_report, process_webhook]
    on_startup = startup
    on_shutdown = shutdown
    max_jobs = 10
    job_timeout = 300

ARQ Task Definition

async def send_email(ctx: dict, to: str, subject: str, body: str) -> dict:
    http = ctx["http"]
    response = await http.post("https://api.sendgrid.com/v3/mail/send",
        json={"to": to, "subject": subject, "html": body},
        headers={"Authorization": f"Bearer {SENDGRID_KEY}"})
    return {"status": response.status_code, "to": to}

Celery Setup

from celery import Celery

celery_app = Celery("orchestkit",
    broker="redis://redis:6379/0", backend="redis://redis:6379/1")

celery_app.conf.update(
    task_serializer="json",
    task_track_started=True,
    task_time_limit=600,
    task_soft_time_limit=540,
    worker_prefetch_multiplier=1,
    task_acks_late=True,
    task_reject_on_worker_lost=True,
)

Celery Task with Retry

@shared_task(bind=True, max_retries=3, default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError))
def send_email(self, to: str, subject: str, body: str) -> dict:
    try:
        response = requests.post(url, json=data, timeout=30)
        response.raise_for_status()
        return {"status": "sent", "to": to}
    except Exception as exc:
        raise self.retry(exc=exc)

Incorrect — No retry strategy:

# Fails permanently on first error
@shared_task
def send_email(to: str, subject: str):
    response = requests.post(url, json=data)  # Network error = lost job
    return response.json()

Correct — Retry with exponential backoff:

# Retries with backoff
@shared_task(bind=True, max_retries=3, default_retry_delay=60,
    autoretry_for=(ConnectionError, TimeoutError))
def send_email(self, to: str, subject: str):
    try:
        response = requests.post(url, json=data, timeout=30)
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

Tool Selection

| Tool | Best For | Complexity |
|---|---|---|
| ARQ | FastAPI, async jobs | Low |
| Celery | Complex workflows | High |
| RQ | Simple Redis queues | Low |
| FastAPI BackgroundTasks | Quick in-process | None |

Configure Temporal activity timeouts, heartbeats, and retry policies to prevent data loss — HIGH

Temporal Activity and Worker Patterns

Incorrect — missing heartbeat and error classification:

@activity.defn
async def process_payment(input: PaymentInput) -> PaymentResult:
    # WRONG: No heartbeat for long operation
    # WRONG: No error classification (all errors retry)
    response = await httpx.post("https://payments.example.com/charge",
        json={"order_id": input.order_id, "amount": input.amount})
    return PaymentResult(**response.json())

Correct — heartbeat, error classification, and proper worker setup:

from temporalio import activity
from temporalio.exceptions import ApplicationError

@activity.defn
async def process_payment(input: PaymentInput) -> PaymentResult:
    activity.logger.info(f"Processing payment for order {input.order_id}")
    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                "https://payments.example.com/charge",
                json={"order_id": input.order_id, "amount": input.amount})
            response.raise_for_status()
            return PaymentResult(**response.json())
    except httpx.HTTPStatusError as e:
        if e.response.status_code == 402:
            # Non-retryable: payment declined is a business error
            raise ApplicationError("Payment declined",
                non_retryable=True, type="PaymentDeclined")
        raise  # Retryable: transient HTTP errors

@activity.defn
async def send_notification(input: NotificationInput) -> None:
    for i, recipient in enumerate(input.recipients):
        # Heartbeat for long operations (required for activities > 60s)
        activity.heartbeat(f"Sending {i+1}/{len(input.recipients)}")
        await send_email(recipient, input.subject, input.body)

Worker Configuration

from temporalio.client import Client
from temporalio.worker import Worker

async def main():
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="order-processing",
        workflows=[OrderWorkflow],
        activities=[create_order, process_payment, reserve_inventory, cancel_order_activity],
    )
    await worker.run()

async def start_order_workflow(order_data: OrderInput) -> str:
    client = await Client.connect("localhost:7233")
    handle = await client.start_workflow(
        OrderWorkflow.run, order_data,
        id=f"order-{order_data.order_id}",
        task_queue="order-processing",
    )
    return handle.id

Testing with WorkflowEnvironment

import pytest
from temporalio.testing import WorkflowEnvironment

@pytest.fixture
async def workflow_env():
    async with await WorkflowEnvironment.start_local() as env:
        yield env

@pytest.mark.asyncio
async def test_order_workflow(workflow_env):
    async with Worker(workflow_env.client, task_queue="test",
            workflows=[OrderWorkflow],
            activities=[create_order, process_payment]):
        result = await workflow_env.client.execute_workflow(
            OrderWorkflow.run, OrderInput(id="test-1", total=100),
            id="test-order-1", task_queue="test",
        )
        assert result.order_id == "test-1"

Key Decisions

| Decision | Recommendation |
|---|---|
| Activity timeout | start_to_close for most cases |
| Retry policy | 3 attempts default, exponential backoff |
| Heartbeating | Required for activities > 60s |
| Error handling | ApplicationError(non_retryable=True) for business errors |
| Worker deployment | Separate workers per task queue in production |
| Testing | WorkflowEnvironment.start_local() for integration tests |

Define deterministic Temporal workflows with correct signal and query patterns — HIGH

Temporal Workflow Definitions

Incorrect — non-deterministic workflow operations:

@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order_data: OrderInput) -> OrderResult:
        # WRONG: Non-deterministic in workflow code
        if random.random() > 0.5:
            await do_something()
        if datetime.now() > deadline:
            await cancel()
        # WRONG: Direct I/O in workflow
        response = await httpx.get("https://api.example.com")

Correct — deterministic workflow with proper APIs:

import asyncio
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

@workflow.defn
class OrderWorkflow:
    def __init__(self):
        self._status = "pending"
        self._order_id: str | None = None

    @workflow.run
    async def run(self, order_data: OrderInput) -> OrderResult:
        self._order_id = await workflow.execute_activity(
            create_order, order_data,
            start_to_close_timeout=timedelta(seconds=30),
            retry_policy=RetryPolicy(maximum_attempts=3, initial_interval=timedelta(seconds=1)),
        )
        self._status = "processing"

        # Parallel activities via asyncio.gather
        payment, inventory = await asyncio.gather(
            workflow.execute_activity(process_payment, PaymentInput(order_id=self._order_id),
                start_to_close_timeout=timedelta(minutes=5)),
            workflow.execute_activity(reserve_inventory, InventoryInput(order_id=self._order_id),
                start_to_close_timeout=timedelta(minutes=2)),
        )

        self._status = "completed"
        return OrderResult(order_id=self._order_id, payment_id=payment.id)

    @workflow.query
    def get_status(self) -> str:
        return self._status

    @workflow.signal
    async def cancel_order(self, reason: str):
        self._status = "cancelling"
        await workflow.execute_activity(cancel_order_activity,
            CancelInput(order_id=self._order_id),
            start_to_close_timeout=timedelta(seconds=30))
        self._status = "cancelled"

Saga Pattern with Compensation

from typing import Any
from collections.abc import Callable

@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order: OrderInput) -> OrderResult:
        compensations: list[tuple[Callable, Any]] = []

        try:
            reservation = await workflow.execute_activity(
                reserve_inventory, order.items,
                start_to_close_timeout=timedelta(minutes=2))
            compensations.append((release_inventory, reservation.id))

            payment = await workflow.execute_activity(
                charge_payment, PaymentInput(order_id=order.id),
                start_to_close_timeout=timedelta(minutes=5))
            compensations.append((refund_payment, payment.id))

            shipment = await workflow.execute_activity(
                create_shipment, ShipmentInput(order_id=order.id),
                start_to_close_timeout=timedelta(minutes=3))
            return OrderResult(order_id=order.id, payment_id=payment.id, shipment_id=shipment.id)

        except Exception:
            workflow.logger.warning(f"Saga failed, running {len(compensations)} compensations")
            for compensate_fn, compensate_arg in reversed(compensations):
                try:
                    await workflow.execute_activity(compensate_fn, compensate_arg,
                        start_to_close_timeout=timedelta(minutes=2))
                except Exception as e:
                    workflow.logger.error(f"Compensation failed: {e}")
            raise

Key Decisions

| Decision | Recommendation |
|---|---|
| Workflow ID | Business-meaningful, idempotent (e.g., order-{order_id}) |
| Task queue | Per-service or per-workflow-type |
| Determinism | Use workflow.random(), workflow.now() — never stdlib |
| I/O | Always via activities, never directly in workflows |
| Timers | workflow.wait_condition() with timeout for human-in-the-loop |

References (8)

Arq Patterns

ARQ Patterns

Lightweight async Redis Queue patterns for FastAPI and simple background tasks.

Worker Setup

# backend/app/workers/arq_worker.py
from arq import create_pool
from arq.connections import RedisSettings

async def startup(ctx: dict):
    """Initialize worker resources on startup."""
    ctx["db"] = await create_db_pool()
    ctx["http"] = httpx.AsyncClient()

async def shutdown(ctx: dict):
    """Cleanup worker resources on shutdown."""
    await ctx["db"].close()
    await ctx["http"].aclose()

class WorkerSettings:
    redis_settings = RedisSettings(host="redis", port=6379)
    functions = [
        send_email,
        generate_report,
        process_webhook,
    ]
    on_startup = startup
    on_shutdown = shutdown
    max_jobs = 10
    job_timeout = 300  # 5 minutes

Task Definition

async def send_email(
    ctx: dict,
    to: str,
    subject: str,
    body: str,
) -> dict:
    """Send email task using shared HTTP client."""
    http = ctx["http"]
    response = await http.post(
        "https://api.sendgrid.com/v3/mail/send",
        json={"to": to, "subject": subject, "html": body},
        headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
    )
    return {"status": response.status_code, "to": to}

async def generate_report(
    ctx: dict,
    report_id: str,
    format: str = "pdf",
) -> dict:
    """Generate report asynchronously using shared DB pool."""
    db = ctx["db"]
    data = await db.fetch_report_data(report_id)
    pdf_bytes = await render_pdf(data)
    await db.save_report_file(report_id, pdf_bytes)
    return {"report_id": report_id, "size": len(pdf_bytes)}

FastAPI Integration

Enqueue Jobs

from arq import create_pool
from arq.connections import RedisSettings, ArqRedis

# Dependency
async def get_arq_pool() -> ArqRedis:
    return await create_pool(RedisSettings(host="redis"))

@router.post("/api/v1/reports")
async def create_report(
    data: ReportRequest,
    arq: ArqRedis = Depends(get_arq_pool),
):
    report = await service.create_report(data)

    # Enqueue background job
    job = await arq.enqueue_job(
        "generate_report",
        report.id,
        format=data.format,
    )

    return {"report_id": report.id, "job_id": job.job_id}

Job Status Endpoint

from arq.jobs import Job, JobStatus

@router.get("/api/v1/jobs/{job_id}")
async def get_job_status(
    job_id: str,
    arq: ArqRedis = Depends(get_arq_pool),
):
    job = Job(job_id, arq)
    status = await job.status()
    result = await job.result() if status == JobStatus.complete else None

    return {
        "job_id": job_id,
        "status": status.value,
        "result": result,
    }

Delayed Tasks

from datetime import timedelta

# Run in 1 hour
job = await arq.enqueue_job(
    "send_reminder",
    user_id="123",
    _defer_by=timedelta(hours=1),
)

# Run at specific time
from datetime import datetime, timezone

job = await arq.enqueue_job(
    "send_notification",
    user_id="123",
    _defer_until=datetime(2024, 12, 25, 9, 0, tzinfo=timezone.utc),
)

FastAPI Lifespan Integration

from contextlib import asynccontextmanager
from arq import create_pool
from arq.connections import RedisSettings

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialize ARQ pool at startup, close at shutdown."""
    app.state.arq_pool = await create_pool(RedisSettings(host="redis"))
    yield
    await app.state.arq_pool.close()

app = FastAPI(lifespan=lifespan)

# Access pool in routes
async def get_arq(request: Request) -> ArqRedis:
    return request.app.state.arq_pool

FastAPI BackgroundTasks (In-Process)

from fastapi import BackgroundTasks

@router.post("/api/v1/users")
async def create_user(
    data: UserCreate,
    background_tasks: BackgroundTasks,
):
    user = await service.create_user(data)

    # Simple in-process background task
    # WARNING: Lost if server restarts — only for non-critical work
    background_tasks.add_task(send_welcome_email, user.email)

    return user

ARQ vs Celery vs BackgroundTasks

| Feature | ARQ | Celery | BackgroundTasks |
|---|---|---|---|
| Async native | Yes | No (gevent/eventlet) | Yes |
| Broker | Redis only | Redis, RabbitMQ, SQS | None (in-process) |
| Workflows | No | Chain, group, chord | No |
| Monitoring | Basic | Flower, events | None |
| Persistence | Redis | Redis, RabbitMQ | None |
| Complexity | Low | High | Minimal |

When to Use ARQ

  • Building with FastAPI and native async/await
  • Simple background tasks (email, notifications, reports)
  • Redis is already in the stack
  • Want minimal dependencies and configuration
  • Tasks are independent (no complex workflows)

When NOT to Use ARQ

  • Need complex workflows (chains, chords) -- use Celery
  • Need RabbitMQ or SQS -- use Celery
  • Need LLM workflows -- use LangGraph
  • Need guaranteed delivery with dead letter queues -- use Celery
  • Need per-task rate limiting -- use Celery

Canvas Workflows

Celery canvas primitives for building complex task pipelines with chain, group, chord, and nested workflows.

Signatures

from celery import signature, chain, group, chord

# Create a reusable task signature
sig = signature("tasks.process_order", args=[order_id], kwargs={"priority": "high"})

# Immutable signature — won't receive results from previous task
sig = process_order.si(order_id)

# Partial signature — curry arguments
partial_sig = send_email.s(subject="Order Update")
# Later: partial_sig.delay(to="user@example.com", body="...")

Chains (Sequential Execution)

from celery import chain

# Tasks execute sequentially, each receiving the previous result
workflow = chain(
    extract_data.s(source_id),      # Returns raw_data
    transform_data.s(),              # Receives raw_data, returns clean_data
    load_data.s(destination_id),     # Receives clean_data
)
result = workflow.apply_async()

# Access results
chain_result = result.get()          # Final result
parent_result = result.parent.get()  # Previous task result
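
Conceptually, a chain pipes each task's return value into the next task's first positional argument. A pure-Python sketch of that data flow (illustrative only — not how Celery executes chains, and the ETL step functions here are hypothetical):

```python
from functools import reduce

def run_chain(steps, initial):
    """Feed each step's return value into the next, like a Celery chain."""
    return reduce(lambda acc, step: step(acc), steps, initial)

# Hypothetical ETL steps mirroring the chain above
extract = lambda source_id: {"source": source_id, "rows": [1, 2, 3]}
transform = lambda raw: {**raw, "rows": [r * 2 for r in raw["rows"]]}
load = lambda clean: len(clean["rows"])

result = run_chain([transform, load], extract("s1"))
```

This is why intermediate steps that should ignore the previous result need `si()` — an immutable signature breaks the piping for that step.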

Chain Error Handling

from celery.exceptions import Reject

@celery_app.task(bind=True, max_retries=3)
def extract_data(self, source_id: str) -> dict:
    try:
        return fetch_from_source(source_id)
    except ConnectionError as exc:
        # Retryable: chain pauses, retries this step
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    except ValidationError as exc:
        # Non-retryable: reject and stop entire chain
        raise Reject(str(exc), requeue=False)

# Error callback for the whole chain
def create_etl_pipeline(source_id: str, dest: str) -> AsyncResult:
    pipeline = chain(
        extract_data.s(source_id),
        transform_data.s(schema="v2"),
        load_data.s(destination=dest),
    )
    return pipeline.apply_async(
        link_error=handle_pipeline_error.s(source_id=source_id)
    )

@celery_app.task
def handle_pipeline_error(request, exc, traceback, source_id: str):
    alert_team(f"ETL failed for {source_id}: {exc}")

Groups (Parallel Execution)

from celery import group

# Execute tasks in parallel
parallel = group(
    process_chunk.s(chunk) for chunk in chunks
)
group_result = parallel.apply_async()

# Wait for all to complete
results = group_result.get()

# Check completion status
group_result.ready()      # All completed?
group_result.successful() # All succeeded?
group_result.failed()     # Any failed?

Chords (Parallel + Callback)

from celery import chord

# Parallel execution with callback when ALL complete
workflow = chord(
    [process_chunk.s(chunk) for chunk in chunks],
    aggregate_results.s()  # Receives list of all results
)
result = workflow.apply_async()

# IMPORTANT: If any header task fails, callback won't run

Chord with Partial Failure Tolerance

@celery_app.task(bind=True)
def process_chunk(self, chunk_id: int, data: list) -> dict:
    """Return error dict instead of raising to allow chord to complete."""
    try:
        result = expensive_computation(data)
        return {"chunk_id": chunk_id, "status": "success", "result": result}
    except Exception as exc:
        return {"chunk_id": chunk_id, "status": "error", "error": str(exc)}

@celery_app.task
def aggregate_with_errors(results: list[dict]) -> dict:
    """Aggregate results, handling partial failures gracefully."""
    successes = [r for r in results if r["status"] == "success"]
    failures = [r for r in results if r["status"] == "error"]
    return {
        "total": len(results),
        "succeeded": len(successes),
        "failed": len(failures),
        "aggregated": sum(r["result"] for r in successes),
        "failed_chunks": [r["chunk_id"] for r in failures],
    }

Map, Starmap, and Chunks

# Map: apply same task to each item
workflow = process_item.map([item1, item2, item3])

# Starmap: unpack args for each call
workflow = send_email.starmap([
    ("user1@example.com", "Subject 1"),
    ("user2@example.com", "Subject 2"),
])

# Chunks: split a large list of argument tuples into batches
workflow = process_item.chunks([(item,) for item in items], 100)
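
`chunks` batches the argument list so one worker invocation handles many items. The splitting itself is plain slicing — a sketch of the batching arithmetic, not Celery internals:

```python
def make_batches(items, n):
    """Slice a list into consecutive batches of at most n items."""
    return [items[i:i + n] for i in range(0, len(items), n)]

batches = make_batches(list(range(10)), 3)
```

Larger batches amortize per-task overhead; smaller batches parallelize better — hence the "balance chunk size vs overhead" guidance below.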

Nested Workflows

def create_order_workflow(order_id: str) -> AsyncResult:
    """Complex workflow combining chain, group, and immutable signatures."""
    return chain(
        # Step 1: Validate
        validate_order.s(order_id),

        # Step 2: Parallel inventory checks (group inside chain)
        group(
            check_inventory.s(item_id)
            for item_id in get_order_items(order_id)
        ),

        # Step 3: Aggregate inventory results
        aggregate_inventory.s(),

        # Step 4: Payment (ignores input from aggregate)
        process_payment.si(order_id),

        # Step 5: Parallel notifications
        group(
            send_confirmation_email.si(order_id),
            send_sms_notification.si(order_id),
            update_analytics.si(order_id),
        ),
    ).apply_async()

Result Inspection

from celery.result import AsyncResult, GroupResult

def inspect_chain_result(task_id: str) -> dict:
    """Traverse chain results from leaf to root."""
    result = AsyncResult(task_id)
    chain_results = []

    current = result
    while current is not None:
        chain_results.append({
            "task_id": current.id,
            "state": current.state,
            "result": current.result if current.ready() else None,
        })
        current = current.parent

    return {"results": list(reversed(chain_results))}

Canvas Best Practices

| Pattern | When to Use | Key Consideration |
|---|---|---|
| Chain | Sequential steps | Use si() for steps that ignore input |
| Group | Parallel independent tasks | Monitor memory with large groups |
| Chord | Fan-out/fan-in | Callback runs only if ALL succeed |
| Starmap | Same task, different args | More efficient than group |
| Chunks | Large datasets | Balance chunk size vs overhead |

Error Recovery Strategies

  1. Retry transient: Use autoretry_for with backoff
  2. Reject permanent: Use Reject(requeue=False) to stop chain
  3. Soft fail: Return error dict instead of raising (for chords)
  4. Link error: Use link_error callback for notifications

Celery Configuration

Production Celery app setup with secure defaults, broker configuration, and worker tuning.

Application Setup

# backend/app/workers/celery_app.py
from celery import Celery

celery_app = Celery(
    "myapp",
    broker="redis://redis:6379/0",
    backend="redis://redis:6379/1",
)

celery_app.conf.update(
    # Serialization — JSON only (never pickle)
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",

    # Timezone
    timezone="UTC",
    enable_utc=True,

    # Task tracking
    task_track_started=True,

    # Time limits (seconds)
    task_time_limit=600,       # 10 min hard kill
    task_soft_time_limit=540,  # 9 min soft limit (raises SoftTimeLimitExceeded)

    # Worker behavior
    worker_prefetch_multiplier=1,  # Fair task distribution
    worker_max_tasks_per_child=1000,  # Restart worker after N tasks (leak protection)

    # Reliability
    task_acks_late=True,            # Ack after completion (not before)
    task_reject_on_worker_lost=True,  # Re-queue if worker dies mid-task

    # Result backend
    result_expires=86400,  # 24 hours
    result_backend_transport_options={
        "global_keyprefix": "celery_result:",
    },
)

Broker Configuration

# Redis broker options
celery_app.conf.broker_transport_options = {
    "visibility_timeout": 43200,  # 12 hours (must exceed longest task)
    "retry_policy": {
        "timeout": 5.0,
    },
    "max_retries": 3,
}

# Connection pool
celery_app.conf.broker_pool_limit = 10
celery_app.conf.broker_connection_retry_on_startup = True

Worker Tuning

# Production worker startup
celery -A app worker \
    --loglevel=INFO \
    --concurrency=4 \
    --prefetch-multiplier=1 \
    --max-tasks-per-child=1000 \
    --without-heartbeat \
    --without-gossip \
    --without-mingle \
    -Ofair

Concurrency Guidelines

| Workload Type | Concurrency | Prefetch | Notes |
|---|---|---|---|
| CPU-bound | N cores | 1 | Process pool |
| I/O-bound | 2-4x cores | 1-4 | Gevent/eventlet or prefork |
| Mixed | N cores | 1 | Process pool, fair scheduling |
| Bulk/batch | N cores | 4-8 | Higher prefetch for throughput |
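
The guidelines can be encoded as a small lookup helper (hypothetical: the function name and the concrete multipliers are illustrative choices taken from the table):

```python
import os

def worker_settings(workload, cores=None):
    """Suggest --concurrency and --prefetch-multiplier per workload type."""
    cores = cores or os.cpu_count() or 1
    table = {
        "cpu": {"concurrency": cores, "prefetch": 1},
        "io": {"concurrency": cores * 3, "prefetch": 2},
        "mixed": {"concurrency": cores, "prefetch": 1},
        "bulk": {"concurrency": cores, "prefetch": 8},
    }
    return table[workload]

settings = worker_settings("io", cores=4)
```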

Docker Compose

# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  celery-worker:
    build: .
    command: celery -A app worker --loglevel=INFO --concurrency=4
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1

  celery-beat:
    build: .
    command: celery -A app beat --loglevel=INFO
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

volumes:
  redis-data:

Configuration Anti-Patterns

# NEVER use pickle serializer (security risk)
celery_app.conf.task_serializer = "pickle"  # FORBIDDEN

# NEVER disable late ack in production
celery_app.conf.task_acks_late = False  # Tasks lost on crash

# NEVER set visibility_timeout shorter than longest task
celery_app.conf.broker_transport_options = {
    "visibility_timeout": 60,  # Task re-dispatched if still running!
}

# NEVER skip time limits
# Without limits, a hung task blocks the worker slot forever
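
These anti-patterns are cheap to catch with a startup sanity check. A sketch (the function and message strings are assumptions; the checks mirror the four rules above):

```python
def validate_celery_conf(conf: dict, longest_task_seconds: int) -> list:
    """Return a list of configuration problems, empty if safe."""
    problems = []
    if conf.get("task_serializer") == "pickle":
        problems.append("pickle serializer is a security risk")
    if not conf.get("task_acks_late", False):
        problems.append("task_acks_late disabled: tasks lost on crash")
    visibility = conf.get("broker_transport_options", {}).get("visibility_timeout", 0)
    if visibility <= longest_task_seconds:
        problems.append("visibility_timeout shorter than longest task")
    if "task_time_limit" not in conf:
        problems.append("no task_time_limit: hung tasks block workers")
    return problems
```

Calling this at app startup (and failing fast on a non-empty list) turns silent production hazards into deploy-time errors.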

Monitoring & Health Checks

Production monitoring with Flower, Celery signals, health checks, and Prometheus metrics.

Flower Dashboard

# Install and run Flower
pip install flower

# Basic setup
celery -A app flower --port=5555 --basic_auth=admin:password

# With persistent storage
celery -A app flower --persistent=True --db=flower.db

# Prometheus metrics: Flower serves them at /metrics by default;
# --broker_api is only needed for the RabbitMQ management API
celery -A app flower --port=5555

Celery Signal Metrics

from celery import signals
from prometheus_client import Counter, Histogram, Gauge
import time

# Counters
tasks_started = Counter(
    "celery_tasks_started_total",
    "Tasks started",
    ["task_name"],
)

tasks_completed = Counter(
    "celery_tasks_completed_total",
    "Tasks completed",
    ["task_name", "state"],
)

tasks_failed = Counter(
    "celery_tasks_failed_total",
    "Tasks failed",
    ["task_name", "exception_type"],
)

# Histogram
task_duration = Histogram(
    "celery_task_duration_seconds",
    "Task execution duration",
    ["task_name"],
    buckets=[0.1, 0.5, 1, 5, 10, 30, 60, 300, 600],
)

# Gauge
active_tasks = Gauge(
    "celery_active_tasks",
    "Currently executing tasks",
    ["task_name"],
)

# Wire up signals
@signals.task_prerun.connect
def on_task_start(sender, task_id, task, args, kwargs, **_):
    tasks_started.labels(task_name=task.name).inc()
    active_tasks.labels(task_name=task.name).inc()
    # Store start time in task request
    task.request._start_time = time.time()

@signals.task_postrun.connect
def on_task_complete(sender, task_id, task, args, kwargs, retval, state, **_):
    tasks_completed.labels(task_name=task.name, state=state).inc()
    active_tasks.labels(task_name=task.name).dec()

    # Record duration
    start_time = getattr(task.request, "_start_time", None)
    if start_time:
        duration = time.time() - start_time
        task_duration.labels(task_name=task.name).observe(duration)

@signals.task_failure.connect
def on_task_failure(sender, task_id, exception, args, kwargs, traceback, einfo, **_):
    tasks_failed.labels(
        task_name=sender.name,
        exception_type=type(exception).__name__,
    ).inc()

    # Alert on critical task failures
    if sender.name.startswith("tasks.payment"):
        alerting.send_alert(
            f"Payment task {sender.name} failed: {exception}",
            severity="critical",
        )

@signals.task_retry.connect
def on_task_retry(sender, reason, **kwargs):
    tasks_completed.labels(task_name=sender.name, state="RETRY").inc()

Health Check Endpoint

from celery import current_app

def celery_health_check() -> dict:
    """Comprehensive Celery health check."""
    try:
        # Check broker connection
        conn = current_app.connection()
        conn.ensure_connection(max_retries=3)

        # Check workers responding
        inspector = current_app.control.inspect()
        active_workers = inspector.active()

        if not active_workers:
            return {"status": "unhealthy", "reason": "No active workers"}

        # Check queue depths
        import redis
        r = redis.from_url(REDIS_URL)
        queue_depths = {}
        for queue in ["critical", "high", "default", "low"]:
            queue_depths[queue] = r.llen(queue)

        return {
            "status": "healthy",
            "workers": list(active_workers.keys()),
            "active_tasks": sum(len(tasks) for tasks in active_workers.values()),
            "queue_depths": queue_depths,
        }
    except Exception as e:
        return {"status": "unhealthy", "reason": str(e)}

# FastAPI health endpoint
@router.get("/health/celery")
async def celery_health():
    result = celery_health_check()
    status_code = 200 if result["status"] == "healthy" else 503
    return JSONResponse(content=result, status_code=status_code)

Worker Inspection

from celery import current_app

def inspect_workers() -> dict:
    """Get detailed worker status."""
    inspector = current_app.control.inspect()

    return {
        "active": inspector.active(),        # Currently executing
        "reserved": inspector.reserved(),    # Prefetched, waiting
        "scheduled": inspector.scheduled(),  # ETA/countdown tasks
        "registered": inspector.registered_tasks(),  # Available task types
        "stats": inspector.stats(),          # Worker statistics
    }

Queue Depth Monitoring

import redis
from prometheus_client import Gauge

queue_depth_gauge = Gauge(
    "celery_queue_depth",
    "Number of pending tasks in queue",
    ["queue_name"],
)

def update_queue_metrics(redis_url: str):
    """Update queue depth metrics (call periodically)."""
    r = redis.from_url(redis_url)

    for queue in ["critical", "high", "default", "low", "bulk"]:
        depth = r.llen(queue)
        queue_depth_gauge.labels(queue_name=queue).set(depth)

# Schedule this as a periodic task
@celery_app.task
def collect_queue_metrics():
    update_queue_metrics(REDIS_URL)

celery_app.conf.beat_schedule["collect-metrics"] = {
    "task": "tasks.collect_queue_metrics",
    "schedule": 30.0,  # Every 30 seconds
}

Alerting Rules

# Alert configuration
ALERT_RULES = {
    "queue_depth_critical": {
        "queue": "critical",
        "threshold": 100,
        "severity": "critical",
        "message": "Critical queue depth exceeds {depth}",
    },
    "queue_depth_default": {
        "queue": "default",
        "threshold": 5000,
        "severity": "warning",
        "message": "Default queue backing up: {depth} pending",
    },
    "worker_down": {
        "min_workers": 1,
        "severity": "critical",
        "message": "No active Celery workers",
    },
}

def check_alerts(health: dict):
    """Evaluate alert rules against current health."""
    alerts = []
    for rule_name, rule in ALERT_RULES.items():
        if "queue" in rule:
            depth = health.get("queue_depths", {}).get(rule["queue"], 0)
            if depth > rule["threshold"]:
                alerts.append({
                    "rule": rule_name,
                    "severity": rule["severity"],
                    "message": rule["message"].format(depth=depth),
                })
        elif "min_workers" in rule:
            worker_count = len(health.get("workers", []))
            if worker_count < rule["min_workers"]:
                alerts.append({
                    "rule": rule_name,
                    "severity": rule["severity"],
                    "message": rule["message"],
                })
    return alerts

Key Metrics Summary

| Metric | Type | Description |
|---|---|---|
| Queue depth | Gauge | Pending tasks per queue |
| Task duration | Histogram | p50/p95/p99 execution time |
| Tasks started | Counter | Total tasks dispatched |
| Tasks completed | Counter | Total tasks finished (by state) |
| Tasks failed | Counter | Total failures (by exception type) |
| Active tasks | Gauge | Currently executing tasks |
| Worker count | Gauge | Number of active workers |

Result Backends

Task result storage, custom states, progress tracking, and rate limiting patterns.

Redis Result Backend

# celery_config.py
celery_app.conf.update(
    result_backend="redis://redis:6379/1",
    result_serializer="json",
    result_expires=86400,  # 24 hours
    result_backend_transport_options={
        "global_keyprefix": "celery_result:",
    },
)

Custom Task States

from celery import states

# Define custom states
VALIDATING = "VALIDATING"
PROCESSING = "PROCESSING"
UPLOADING = "UPLOADING"

@celery_app.task(bind=True)
def long_running_task(self, data: dict) -> dict:
    """Task with progress tracking via custom states."""
    self.update_state(
        state=VALIDATING,
        meta={"step": 1, "total": 3, "description": "Validating input"},
    )
    validated = validate(data)

    self.update_state(
        state=PROCESSING,
        meta={"step": 2, "total": 3, "description": "Processing data"},
    )
    result = process(validated)

    self.update_state(
        state=UPLOADING,
        meta={"step": 3, "total": 3, "description": "Uploading results"},
    )
    upload(result)

    return {"status": "complete", "url": result.url}

Progress Tracking API

from celery.result import AsyncResult

@router.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str) -> dict:
    """Query task progress and result."""
    result = AsyncResult(job_id, app=celery_app)

    response = {
        "job_id": job_id,
        "status": result.status,
    }

    if result.status in ("PROGRESS", VALIDATING, PROCESSING, UPLOADING):
        response["progress"] = result.info
    elif result.ready():
        if result.successful():
            response["result"] = result.result
        else:
            response["error"] = str(result.result)

    return response

# Batch status check
@router.post("/api/v1/jobs/batch-status")
async def batch_job_status(job_ids: list[str]) -> dict:
    """Check status of multiple jobs at once."""
    statuses = {}
    for job_id in job_ids:
        result = AsyncResult(job_id, app=celery_app)
        statuses[job_id] = {
            "status": result.status,
            "ready": result.ready(),
        }
    return {"jobs": statuses}

Large Result Handling

import boto3
from datetime import datetime, timezone

@celery_app.task(bind=True)
def generate_report(self, report_id: str) -> dict:
    """Store large results in S3, return reference only."""
    self.update_state(state="PROCESSING", meta={"step": "generating"})

    # Generate large report
    pdf_bytes = render_pdf(report_id)

    # Store in S3 (NEVER store large data in Redis result backend)
    s3 = boto3.client("s3")
    s3_key = f"reports/{report_id}/{datetime.now(timezone.utc).isoformat()}.pdf"
    s3.put_object(
        Bucket="reports-bucket",
        Key=s3_key,
        Body=pdf_bytes,
        ContentType="application/pdf",
    )

    # Return only the reference
    return {
        "report_id": report_id,
        "s3_key": s3_key,
        "size_bytes": len(pdf_bytes),
        "generated_at": datetime.now(timezone.utc).isoformat(),
    }

Job Status Lifecycle

PENDING → STARTED → SUCCESS
                  → FAILURE
                  → RETRY → STARTED → ...
                  → REVOKED

Custom states:
PENDING → STARTED → VALIDATING → PROCESSING → UPLOADING → SUCCESS
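
The lifecycle can be expressed as an allowed-transitions map and used to validate status updates before persisting job state (a sketch; the transition set is inferred from the diagrams above):

```python
# Allowed state transitions, including the custom states defined earlier
TRANSITIONS = {
    "PENDING": {"STARTED", "REVOKED"},
    "STARTED": {"VALIDATING", "SUCCESS", "FAILURE", "RETRY", "REVOKED"},
    "VALIDATING": {"PROCESSING", "FAILURE"},
    "PROCESSING": {"UPLOADING", "FAILURE"},
    "UPLOADING": {"SUCCESS", "FAILURE"},
    "RETRY": {"STARTED"},
}

def can_transition(current: str, new: str) -> bool:
    """True if moving from current to new follows the lifecycle."""
    return new in TRANSITIONS.get(current, set())
```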

Rate Limiting

Static Rate Limits

@celery_app.task(rate_limit="100/m")  # 100 per minute
def call_external_api(endpoint: str) -> dict:
    return requests.get(endpoint, timeout=30).json()

@celery_app.task(rate_limit="10/s")   # 10 per second
def send_push_notification(user_id: str, message: str):
    push_service.send(user_id, message)

Dynamic Rate Limiting

from celery import current_app

class RateLimitManager:
    def __init__(self, app):
        self.app = app
        self.defaults = {
            "tasks.call_external_api": "100/m",
            "tasks.send_notification": "10/s",
        }

    def adjust_rate(self, task_name: str, new_rate: str, workers=None):
        self.app.control.rate_limit(task_name, new_rate, destination=workers)

    def reduce_for_high_load(self, factor: float = 0.5):
        for task, rate in self.defaults.items():
            value, unit = rate.split("/")
            new_rate = f"{int(int(value) * factor)}/{unit}"
            self.adjust_rate(task, new_rate)

    def restore_defaults(self):
        for task, rate in self.defaults.items():
            self.adjust_rate(task, rate)
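
`reduce_for_high_load` parses Celery rate strings of the form `"N/unit"`. The scaling step in isolation, as a testable helper (hypothetical name):

```python
def scale_rate(rate: str, factor: float) -> str:
    """Scale a Celery rate string like '100/m' by a factor."""
    value, unit = rate.split("/")
    return f"{int(int(value) * factor)}/{unit}"
```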

Token Bucket Rate Limiter

from celery import Task
import time

class TokenBucketTask(Task):
    """Distributed token bucket rate limiting."""
    abstract = True
    rate_limit_key: str = None
    tokens_per_second: float = 10.0
    bucket_size: int = 100

    def acquire_token(self) -> bool:
        key = f"rate_limit:{self.rate_limit_key}"
        now = time.time()

        # Atomic Lua script for token bucket
        script = """
        local key = KEYS[1]
        local now = tonumber(ARGV[1])
        local rate = tonumber(ARGV[2])
        local capacity = tonumber(ARGV[3])

        local bucket = redis.call('HMGET', key, 'tokens', 'last_update')
        local tokens = tonumber(bucket[1]) or capacity
        local last_update = tonumber(bucket[2]) or now

        local elapsed = now - last_update
        tokens = math.min(capacity, tokens + elapsed * rate)

        if tokens >= 1 then
            tokens = tokens - 1
            redis.call('HMSET', key, 'tokens', tokens, 'last_update', now)
            redis.call('EXPIRE', key, 3600)
            return 1
        end
        return 0
        """
        return bool(redis_client.eval(script, 1, key, now, self.tokens_per_second, self.bucket_size))

    def __call__(self, *args, **kwargs):
        if not self.acquire_token():
            countdown = 2 ** min(self.request.retries, 6)
            raise self.retry(countdown=countdown)
        return super().__call__(*args, **kwargs)

# Usage
@celery_app.task(
    base=TokenBucketTask,
    bind=True,
    rate_limit_key="stripe_api",
    tokens_per_second=25,
    bucket_size=100,
)
def charge_customer(self, customer_id: str, amount: int):
    return stripe.PaymentIntent.create(customer=customer_id, amount=amount, currency="usd")
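
The Lua refill arithmetic can be sanity-checked with an in-memory Python mirror (for unit tests only — production must keep the atomic Redis path):

```python
class TokenBucket:
    """In-memory mirror of the Redis Lua token bucket."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum bucket size
        self.tokens = float(capacity)
        self.last = 0.0

    def acquire(self, now: float) -> bool:
        # Refill based on elapsed time, capped at capacity
        elapsed = now - self.last
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

Passing `now` explicitly makes the refill math deterministic and trivial to test.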

Rate Limit Patterns Summary

| Pattern | Use Case | Implementation |
|---|---|---|
| Static | Simple API quotas | @task(rate_limit="100/m") |
| Dynamic | Adaptive to load | app.control.rate_limit() |
| Token bucket | Smooth burst handling | Custom Task base class |
| Per-user | Multi-tenant fairness | User-keyed counters |

Retry Strategies

Exponential backoff, idempotency, dead letter queues, and task locking patterns.

Exponential Backoff with Jitter

from celery import shared_task

@shared_task(
    bind=True,
    max_retries=5,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,        # Exponential: 1s, 2s, 4s, 8s, 16s
    retry_backoff_max=600,     # Cap at 10 minutes
    retry_jitter=True,         # Randomize to prevent thundering herd
)
def call_external_api(self, endpoint: str) -> dict:
    """Task with automatic retry on transient failures."""
    response = requests.get(endpoint, timeout=30)
    response.raise_for_status()
    return response.json()
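
With `retry_backoff=True` the delay doubles per attempt until `retry_backoff_max`, plus jitter. The deterministic part of the schedule (jitter omitted; a sketch of the arithmetic, not Celery's implementation):

```python
def backoff_schedule(max_retries: int, base: int = 1, cap: int = 600) -> list:
    """Exponential backoff delays in seconds, capped, jitter omitted."""
    return [min(base * 2 ** n, cap) for n in range(max_retries)]

delays = backoff_schedule(5)
```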

Manual Retry with Custom Delay

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def send_email(self, to: str, subject: str, body: str) -> dict:
    """Manual retry with explicit error handling."""
    try:
        response = requests.post(
            "https://api.sendgrid.com/v3/mail/send",
            json={"to": to, "subject": subject, "html": body},
            headers={"Authorization": f"Bearer {SENDGRID_KEY}"},
            timeout=30,
        )
        response.raise_for_status()
        return {"status": "sent", "to": to}
    except requests.ConnectionError as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 10)
    except requests.HTTPError as exc:
        if exc.response.status_code >= 500:
            raise self.retry(exc=exc)  # Server error: retry
        raise  # Client error: fail immediately

Base Task with Retry Configuration

from celery import Task

class RetryableTask(Task):
    """Reusable base task with exponential backoff."""
    abstract = True
    autoretry_for = (ConnectionError, TimeoutError)
    max_retries = 5
    retry_backoff = True
    retry_backoff_max = 600
    retry_jitter = True

class DatabaseTask(Task):
    """Base task with database session management."""
    abstract = True
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = create_session()
        return self._db

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self._db:
            self._db.close()
            self._db = None

# Usage
@celery_app.task(base=RetryableTask)
def call_flaky_api(endpoint: str):
    return requests.get(endpoint, timeout=30).json()

@celery_app.task(base=DatabaseTask)
def query_database(query: str):
    return query_database.db.execute(query)

Idempotency Protection

@shared_task(bind=True, max_retries=3, retry_backoff=True)
def process_payment(self, payment_id: str, amount: int) -> dict:
    """Idempotent task — safe to retry without duplicates."""
    idempotency_key = f"processed:payment:{payment_id}"

    # Check if already processed
    if redis_client.get(idempotency_key):
        return {"status": "already_processed", "payment_id": payment_id}

    try:
        # Process payment
        result = stripe.PaymentIntent.create(
            customer=get_customer(payment_id),
            amount=amount,
            currency="usd",
            idempotency_key=f"celery:{payment_id}",  # Stripe-level idempotency
        )

        # Mark as processed with 24h TTL
        redis_client.setex(idempotency_key, 86400, "1")

        return {"status": "success", "payment_id": payment_id, "intent_id": result.id}
    except stripe.error.RateLimitError as exc:
        raise self.retry(exc=exc, countdown=60)
    except stripe.error.InvalidRequestError:
        raise  # Non-retryable
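
The check-then-set guard can be modeled in memory for unit tests (a sketch; a production guard should prefer an atomic Redis `SET NX`, since a separate GET-then-SETEX leaves a small race window between workers):

```python
class IdempotencyGuard:
    """In-memory stand-in for the Redis idempotency key check."""
    def __init__(self):
        self.seen = set()

    def first_time(self, key: str) -> bool:
        """True only on the first call for a given key."""
        if key in self.seen:
            return False
        self.seen.add(key)
        return True
```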

Dead Letter Queue

import json

@shared_task(
    bind=True,
    max_retries=3,
    retry_backoff=True,
)
def important_task(self, data: dict) -> dict:
    """Task that moves to DLQ after exhausting retries."""
    try:
        return do_processing(data)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            move_to_dlq(self, data, exc)
            raise  # Let Celery mark as failed
        raise self.retry(exc=exc)

def move_to_dlq(task, data: dict, error: Exception):
    """Move failed task to dead letter queue for manual review."""
    from datetime import datetime, timezone

    dlq_entry = {
        "task_name": task.name,
        "task_id": task.request.id,
        "args": data,
        "error": str(error),
        "retries": task.request.retries,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    redis_client.lpush(f"dlq:{task.name}", json.dumps(dlq_entry))

# DLQ processing
def process_dlq(task_name: str, limit: int = 10):
    """Process dead letter queue entries."""
    dlq_key = f"dlq:{task_name}"
    entries = []

    for _ in range(limit):
        raw = redis_client.rpop(dlq_key)
        if not raw:
            break
        entries.append(json.loads(raw))

    return entries

Task Locking (Singleton Tasks)

@shared_task(bind=True)
def singleton_task(self, resource_id: str) -> dict:
    """Only one instance runs per resource at a time."""
    lock_key = f"lock:{self.name}:{resource_id}"

    if not redis_client.set(lock_key, "1", nx=True, ex=300):
        return {"status": "already_running", "resource_id": resource_id}

    try:
        result = do_exclusive_work(resource_id)
        return {"status": "completed", "result": result}
    finally:
        redis_client.delete(lock_key)

Retry Pattern Summary

| Pattern | Use Case | Implementation |
|---|---|---|
| Auto retry | Transient errors | autoretry_for + retry_backoff |
| Manual retry | Conditional retry | self.retry(exc=exc, countdown=N) |
| Idempotency | Payment, creation | Redis key check before processing |
| DLQ | Failed after retries | Move to Redis list for review |
| Singleton | Exclusive tasks | Redis lock with TTL |
| Base class | Shared config | Custom Task subclass |

Scheduled Tasks

Celery Beat periodic task configuration with crontab, database-backed schedules, and overlap prevention.

Basic Beat Schedule

# celery_config.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    # Interval-based (every N seconds)
    "process-pending-orders": {
        "task": "tasks.process_pending_orders",
        "schedule": 300.0,  # Every 5 minutes
    },

    # Daily at specific time
    "daily-report": {
        "task": "tasks.generate_daily_report",
        "schedule": crontab(hour=6, minute=0),
    },

    # Weekly
    "weekly-cleanup": {
        "task": "tasks.cleanup_old_records",
        "schedule": crontab(hour=2, minute=0, day_of_week="sunday"),
    },

    # Monthly
    "monthly-billing": {
        "task": "tasks.process_monthly_billing",
        "schedule": crontab(hour=0, minute=0, day_of_month=1),
    },

    # With arguments
    "sync-inventory": {
        "task": "tasks.sync_inventory",
        "schedule": crontab(minute="*/15"),
        "args": ("warehouse-1",),
        "kwargs": {"full_sync": False},
    },
}

celery_app.conf.timezone = "UTC"

Crontab Reference

from celery.schedules import crontab

SCHEDULES = {
    # Every minute
    "every_minute": crontab(),

    # Every hour at :00
    "hourly": crontab(minute=0),

    # Daily at midnight
    "daily_midnight": crontab(hour=0, minute=0),

    # Weekdays at 9 AM
    "weekday_morning": crontab(hour=9, minute=0, day_of_week="mon-fri"),

    # Every 15 min during business hours
    "business_hours": crontab(
        minute="*/15",
        hour="9-17",
        day_of_week="mon-fri",
    ),

    # First Monday of each month
    "first_monday": crontab(
        hour=0, minute=0,
        day_of_week="monday",
        day_of_month="1-7",
    ),

    # Quarterly (Jan 1, Apr 1, Jul 1, Oct 1)
    "quarterly": crontab(
        hour=0, minute=0,
        day_of_month=1,
        month_of_year="1,4,7,10",
    ),
}
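
Crontab fields like `minute="*/15"` or `hour="9-17"` are simple predicates over integers. A minimal matcher for the field forms used above (a sketch; Celery's parser supports more syntax, such as stepped ranges):

```python
def field_matches(spec, value: int) -> bool:
    """Match a crontab field: int, '*', '*/n', 'a-b' range, or 'a,b,c' list."""
    if isinstance(spec, int):
        return value == spec
    if spec == "*":
        return True
    if spec.startswith("*/"):
        return value % int(spec[2:]) == 0
    if "-" in spec:
        lo, hi = map(int, spec.split("-"))
        return lo <= value <= hi
    return value in {int(p) for p in spec.split(",")}
```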

Database-Backed Schedules

# Using django-celery-beat for dynamic schedules
# pip install django-celery-beat

# settings.py
INSTALLED_APPS = [..., "django_celery_beat"]

# celery_config.py
celery_app.conf.beat_scheduler = "django_celery_beat.schedulers:DatabaseScheduler"

# Create schedules programmatically
from django_celery_beat.models import PeriodicTask, CrontabSchedule

def create_user_report_schedule(user_id: str, hour: int):
    """Create user-specific scheduled report."""
    schedule, _ = CrontabSchedule.objects.get_or_create(
        hour=hour, minute=0, timezone="UTC",
    )
    PeriodicTask.objects.update_or_create(
        name=f"user-report-{user_id}",
        defaults={
            "task": "tasks.generate_user_report",
            "crontab": schedule,
            "args": json.dumps([user_id]),
            "enabled": True,
        },
    )

def disable_schedule(name: str):
    PeriodicTask.objects.filter(name=name).update(enabled=False)

Overlap Prevention

import redis
from contextlib import contextmanager

redis_client = redis.from_url(REDIS_URL)

class ScheduleLockError(Exception):
    """Raised when a scheduled task is already running."""

@contextmanager
def schedule_lock(task_name: str, timeout: int = 3600):
    """Prevent overlapping scheduled task runs."""
    lock_key = f"schedule_lock:{task_name}"
    lock = redis_client.lock(lock_key, timeout=timeout)

    acquired = lock.acquire(blocking=False)
    if not acquired:
        raise ScheduleLockError(f"Task {task_name} already running")

    try:
        yield
    finally:
        lock.release()

@celery_app.task(bind=True)
def long_running_scheduled_task(self):
    """Scheduled task that must not overlap."""
    with schedule_lock(self.name, timeout=7200):
        perform_long_operation()

# Alternative: simple Redis lock
@celery_app.task(bind=True, acks_late=True, reject_on_worker_lost=True)
def exclusive_scheduled_task(self):
    lock_id = f"{self.name}-lock"

    if not redis_client.set(lock_id, "1", nx=True, ex=3600):
        return {"status": "skipped", "reason": "already running"}

    try:
        return perform_operation()
    finally:
        redis_client.delete(lock_id)
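The `SET NX EX` pattern above is what gives the skip-if-held behavior. A toy in-memory stand-in (the `FakeLockStore` class is purely illustrative, not a real distributed lock) shows the semantics without needing a Redis server:

```python
import time

class FakeLockStore:
    """Toy stand-in for Redis SET NX EX: illustrates skip-if-held semantics."""

    def __init__(self):
        self._store = {}  # key -> expiry timestamp

    def set_nx_ex(self, key: str, ttl: int) -> bool:
        now = time.monotonic()
        exp = self._store.get(key)
        if exp is not None and exp > now:
            return False  # lock still held: caller should skip this run
        self._store[key] = now + ttl
        return True

    def delete(self, key: str):
        self._store.pop(key, None)

store = FakeLockStore()
first = store.set_nx_ex("task-lock", ttl=3600)   # acquires
second = store.set_nx_ex("task-lock", ttl=3600)  # overlap prevented
store.delete("task-lock")
third = store.set_nx_ex("task-lock", ttl=3600)   # reacquires after release
```

The TTL matters: if a worker dies mid-run, the expiry prevents the lock from blocking all future runs forever.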

Adaptive Polling

from celery import shared_task

@shared_task(bind=True)
def adaptive_poll(self, resource_id: str, delay: int = 30):
    """Self-rescheduling task with adaptive interval."""
    result = check_resource(resource_id)

    if result.has_changes:
        # Activity detected: reset to frequent polling
        self.apply_async(args=(resource_id,), kwargs={"delay": 30}, countdown=30)
    else:
        # No activity: exponential backoff, capped at 5 minutes
        next_delay = min(delay * 2, 300)
        self.apply_async(
            args=(resource_id,),
            kwargs={"delay": next_delay},
            countdown=next_delay,
        )
    return result
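The backoff arithmetic can be factored into a pure helper, which also makes it trivial to unit-test. A minimal sketch (helper name and defaults are illustrative):

```python
def next_poll_delay(current: int, active: bool, base: int = 30, cap: int = 300) -> int:
    """Next polling interval: reset to base on activity, else double up to cap."""
    return base if active else min(current * 2, cap)

# Idle resource: delays grow 30 -> 60 -> 120 -> 240 -> 300 and plateau.
delays, d = [], 30
for _ in range(5):
    d = next_poll_delay(d, active=False)
    delays.append(d)
```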

Running Beat

# Standalone beat process (recommended for production)
celery -A app beat --loglevel=INFO

# With database scheduler
celery -A app beat --scheduler django_celery_beat.schedulers:DatabaseScheduler

# With PID file for process management
celery -A app beat --pidfile=/var/run/celery/beat.pid

# Embedded beat in worker (development only)
celery -A app worker --beat --loglevel=INFO

Beat Monitoring

from celery import signals
from prometheus_client import Counter

beat_tasks_sent = Counter(
    "celery_beat_tasks_sent_total",
    "Scheduled tasks sent by beat",
    ["task_name"],
)

# Celery has no beat-specific "sent" signal. Because beat runs as its own
# process, every task it publishes is a scheduled one, so hooking
# after_task_publish inside the beat process counts exactly those.
@signals.after_task_publish.connect
def on_task_publish(sender=None, **_):
    beat_tasks_sent.labels(task_name=sender).inc()

Best Practices

| Practice | Reason |
|---|---|
| Use UTC timezone | Avoid DST issues |
| Add schedule locks | Prevent overlap for long tasks |
| Use database scheduler | Dynamic schedule management |
| Monitor beat health | Detect missed schedules |
| Separate beat process | Better reliability than embedded |
| Set task time limits | Prevent runaway scheduled tasks |

Task Routing

Task Routing & Priority Queues

Multi-queue configuration, priority levels, and dynamic routing for Celery 5.x.

Queue Definition

# celery_config.py
from kombu import Queue, Exchange

default_exchange = Exchange("default", type="direct")
priority_exchange = Exchange("priority", type="direct")

celery_app.conf.task_queues = (
    Queue(
        "critical",
        exchange=priority_exchange,
        routing_key="critical",
        queue_arguments={"x-max-priority": 10},
    ),
    Queue(
        "high",
        exchange=priority_exchange,
        routing_key="high",
        queue_arguments={"x-max-priority": 10},
    ),
    Queue(
        "default",
        exchange=default_exchange,
        routing_key="default",
        queue_arguments={"x-max-priority": 10},
    ),
    Queue(
        "low",
        exchange=default_exchange,
        routing_key="low",
    ),
    Queue(
        "bulk",
        exchange=default_exchange,
        routing_key="bulk",
        # No priority for bulk — FIFO is fine
    ),
)

celery_app.conf.task_default_queue = "default"
celery_app.conf.task_default_priority = 5

# Redis priority support (required)
celery_app.conf.broker_transport_options = {
    "priority_steps": list(range(10)),  # 0-9 priority levels
    "sep": ":",
    "queue_order_strategy": "priority",
}
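With `queue_order_strategy="priority"`, my reading of Kombu's Redis transport is that each logical queue fans out into per-priority lists named `<queue><sep><priority>`, with step 0 kept under the bare queue name. A sketch of that naming, assuming the `sep=":"` and `priority_steps=range(10)` settings above — useful to know because tooling that only inspects the bare queue name will miss the higher-priority sub-lists:

```python
def priority_queue_keys(queue: str, steps=range(10), sep: str = ":") -> list:
    # Step 0 lives under the bare queue name; each higher step gets
    # its own "<queue><sep><step>" list in Redis.
    return [queue if step == 0 else f"{queue}{sep}{step}" for step in steps]

keys = priority_queue_keys("critical")
```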

Static Task Routing

# Route by task name pattern
celery_app.conf.task_routes = {
    "tasks.payment.*": {"queue": "critical"},
    "tasks.notification.*": {"queue": "high"},
    "tasks.analytics.*": {"queue": "low"},
    "tasks.report.*": {"queue": "bulk"},
    "tasks.*": {"queue": "default"},
}

# Route by task decorator
@celery_app.task(queue="critical", priority=9)
def process_payment(payment_id: str):
    """Always runs on critical queue."""
    pass

# Route at call time
process_order.apply_async(args=[order_id], queue="high", priority=8)

Dynamic Router Class

import fnmatch
from enum import IntEnum

class TaskPriority(IntEnum):
    BULK = 0
    LOW = 2
    NORMAL = 5
    HIGH = 7
    CRITICAL = 9

class TaskRouter:
    """Route tasks based on name, args, and kwargs."""

    ROUTES = {
        "tasks.payment.*": {"queue": "critical", "priority": TaskPriority.CRITICAL},
        "tasks.notification.*": {"queue": "high", "priority": TaskPriority.HIGH},
        "tasks.analytics.*": {"queue": "low", "priority": TaskPriority.LOW},
        "tasks.report.*": {"queue": "bulk", "priority": TaskPriority.BULK},
    }

    def route_for_task(self, task, args=None, kwargs=None):
        # Check kwargs for urgency override
        if kwargs and kwargs.get("urgent"):
            return {"queue": "critical", "priority": TaskPriority.CRITICAL}

        # Match by task name pattern
        for pattern, route in self.ROUTES.items():
            if fnmatch.fnmatch(task, pattern):
                return route

        return {"queue": "default", "priority": TaskPriority.NORMAL}

celery_app.conf.task_routes = [TaskRouter()]
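The pattern-matching core of the router can be exercised standalone with `fnmatch`. A self-contained sketch mirroring `route_for_task` (routes trimmed to two entries for brevity):

```python
import fnmatch

ROUTES = {
    "tasks.payment.*": {"queue": "critical"},
    "tasks.analytics.*": {"queue": "low"},
}

def resolve(task_name: str, urgent: bool = False) -> dict:
    """Urgency override first, then first matching name pattern, then default."""
    if urgent:
        return {"queue": "critical"}
    for pattern, route in ROUTES.items():
        if fnmatch.fnmatch(task_name, pattern):
            return route
    return {"queue": "default"}
```

Note that `fnmatch`'s `*` also matches dots, so `tasks.payment.*` covers nested names like `tasks.payment.refunds.issue`.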

User-Tier Priority

from celery.result import AsyncResult

def submit_with_priority(
    task,
    args: tuple = (),
    kwargs: dict = None,
    user_tier: str = "standard",
    is_urgent: bool = False,
) -> AsyncResult:
    """Submit task with priority based on user tier."""
    kwargs = kwargs or {}

    if is_urgent:
        priority, queue = TaskPriority.CRITICAL, "critical"
    elif user_tier == "enterprise":
        priority, queue = TaskPriority.CRITICAL, "critical"
    elif user_tier == "premium":
        priority, queue = TaskPriority.HIGH, "high"
    else:
        priority, queue = TaskPriority.NORMAL, "default"

    return task.apply_async(
        args=args,
        kwargs=kwargs,
        queue=queue,
        priority=priority,
    )

Per-Queue Worker Configuration

# Critical: low latency, high concurrency, one-at-a-time prefetch
celery -A app worker -Q critical -c 8 --prefetch-multiplier=1 --hostname=critical@%h

# High priority: balanced
celery -A app worker -Q high -c 4 --prefetch-multiplier=2 --hostname=high@%h

# Default: standard processing
celery -A app worker -Q default -c 4 --prefetch-multiplier=4 --hostname=default@%h

# Low/Bulk: high throughput, batch prefetch
celery -A app worker -Q low,bulk -c 2 --prefetch-multiplier=8 --hostname=bulk@%h

Queue Depth Monitoring

import redis

def queue_depth(r: redis.Redis, queue: str) -> int:
    """Depth across the base list plus Redis priority sub-lists (queue:1..queue:9)."""
    keys = [queue] + [f"{queue}:{p}" for p in range(1, 10)]
    return sum(r.llen(k) for k in keys)

def get_queue_stats(redis_url: str) -> dict:
    """Get queue depths for monitoring and autoscaling."""
    r = redis.from_url(redis_url)
    queues = ["critical", "high", "default", "low", "bulk"]

    stats = {}
    for queue in queues:
        depth = queue_depth(r, queue)
        stats[queue] = {
            "depth": depth,
            "alert": depth > 1000,
        }
    return stats

def should_scale_workers(queue: str, threshold: int = 500) -> bool:
    """Check if queue depth warrants worker scaling."""
    stats = get_queue_stats(REDIS_URL)
    return stats.get(queue, {}).get("depth", 0) > threshold
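`should_scale_workers` yields only a boolean; a slightly richer sketch turns depth into a target worker count. The function name, per-worker throughput, and clamp bounds here are illustrative assumptions, not part of any OrchestKit or Celery API:

```python
import math

def target_workers(depth: int, per_worker: int = 100,
                   min_workers: int = 1, max_workers: int = 16) -> int:
    """Suggest a worker count from queue depth, clamped to a sane range."""
    needed = math.ceil(depth / per_worker) if depth else min_workers
    return max(min_workers, min(needed, max_workers))
```

Feeding this into an autoscaler instead of a raw threshold avoids flapping between one- and two-worker states on a noisy queue.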

Configuration Summary

| Setting | Purpose | Recommended |
|---|---|---|
| x-max-priority | Enable queue priority | 10 |
| priority_steps | Redis priority levels | range(10) |
| queue_order_strategy | Redis priority mode | "priority" |
| prefetch_multiplier | Tasks per worker fetch | 1 (critical), 4-8 (bulk) |
| Queue count | Separation granularity | 3-5 queues |