Complete Django + Celery example with RabbitMQ broker for asynchronous task processing.
- Docker and Docker Compose installed
- Make utility
- Copy environment file:
cp .env.local .env
- Start all services (Django, PostgreSQL, RabbitMQ, Celery Worker, Flower):
make run
This will build images on first run and start:
- Django app: http://localhost:8000
- PostgreSQL: Database on port 5432
- RabbitMQ: Broker on port 5672 (Management UI: http://localhost:15672)
- Celery Worker: Background task processor
- Flower: Task monitoring dashboard at http://localhost:5555
- Create a superuser to access the Django admin:
make createsuperuser
- Access the application:
  - Django Admin: http://localhost:8000/admin/
  - Flower Dashboard: http://localhost:5555
Run `make help` to see all available commands:

- `make run` - Start all services
- `make stop` - Stop all containers
- `make bash` - Access bash inside core container
- `make shell` - Access Django shell
- `make migrate` - Run database migrations
- `make makemigrations` - Create new migrations
- `make reset-db` - Reset database (⚠️ deletes all data)
- `make rebuild-core` - Rebuild core container from scratch
- `make rebuild-all` - Rebuild everything
├── apps/
│   ├── client/            # Client app
│   │   ├── tasks.py       # Celery tasks definitions
│   │   ├── admin.py       # Admin actions that trigger tasks
│   │   └── views.py       # Views (can trigger tasks)
│   ├── post/              # Post app
│   └── user/              # User app
├── core/
│   ├── celery.py          # Celery configuration
│   └── settings/          # Django settings
├── docker-compose.yml     # All services orchestration
├── Makefile               # Helper commands
└── .env.local             # Environment variables template
If you want to add Celery to your own Django project, follow these steps:
Add to your requirements/_base.txt or requirements.txt:
celery==5.3.6 # Core Celery library
django-celery-results==2.5.1 # Optional: Store results in DB
flower==2.0.1 # Optional: Monitoring dashboard

Install:

pip install -r requirements.txt

Add Celery configuration to core/settings/base.py:
# Celery Configuration
# https://docs.celeryproject.org/en/stable/django/first-steps-with-django.html
CELERY_BROKER_URL = env.str("CELERY_BROKER_URL") # RabbitMQ or Redis URL
CELERY_RESULT_BACKEND = env.str("CELERY_RESULT_BACKEND") # Optional: "django-db" to store results
CELERY_WORKER_PREFETCH_MULTIPLIER = env.int("CELERY_WORKER_PREFETCH_MULTIPLIER", default=1)
CELERY_RESULT_EXTENDED = env.bool("CELERY_RESULT_EXTENDED", default=False) # Optional: Extended result info

Important notes:

- `CELERY_RESULT_BACKEND` and `CELERY_RESULT_EXTENDED` are optional - only needed if you want to store task results in the database
- If using `django-celery-results`, add `"django_celery_results"` to `INSTALLED_APPS` (see the sketch below)
Reference: core/settings/base.py:138-143
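A minimal sketch of that `INSTALLED_APPS` change, assuming the standard django-celery-results setup (your existing app list will differ):

```python
# core/settings/base.py - only needed when using the database result backend
INSTALLED_APPS = [
    # ... your existing apps
    "django_celery_results",
]
```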
Create core/celery.py:
import os
from celery import Celery
from django.conf import settings
# Set default Django settings module
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings.local")
# Create Celery app
app = Celery("core")
# Load config from Django settings with CELERY_ namespace
app.config_from_object(settings, namespace="CELERY")
# Auto-discover tasks.py in all Django apps
app.autodiscover_tasks()

Reference: core/celery.py
Modify core/__init__.py to import the Celery app:
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)

Create tasks.py in any Django app (e.g., apps/client/tasks.py):
from celery import shared_task

@shared_task
def my_async_task(param):
    # Your task logic here
    print(f"Processing: {param}")
    return "Task completed"

Magic: Celery automatically discovers any tasks.py file in your Django apps!
Reference: apps/client/tasks.py
Add to your .env file:
# Celery Configuration
CELERY_BROKER_URL=amqp://admin:admin@broker:5672/vhost # RabbitMQ
# OR
# CELERY_BROKER_URL=redis://localhost:6379/0 # Redis
CELERY_RESULT_BACKEND=django-db # Optional: Store results in database

Start the Celery worker:

# Development
celery -A core.celery worker -l INFO
# Production (with more options)
celery -A core.celery worker -l INFO --concurrency 4 --max-tasks-per-child 100

In Docker (see docker-compose.yml:53-70):
celeryworker:
  image: django-example/core
  command: celery -A core.celery worker -l INFO --concurrency 1
  depends_on:
    - db
    - broker

From anywhere in your Django code:
from apps.client.tasks import my_async_task
# Asynchronous execution (non-blocking)
task = my_async_task.delay("parameter")
print(f"Task ID: {task.id}")
# With custom options
task = my_async_task.apply_async(
    args=["parameter"],
    countdown=10,  # Execute after 10 seconds
)

This project demonstrates the complete Celery workflow for asynchronous task processing:
Django View/Admin → Celery Task (.delay) → RabbitMQ Broker → Celery Worker → Result Backend (DB)
Message broker that queues tasks between Django and workers.
- Configuration (`.env.local`): `CELERY_BROKER_URL=amqp://admin:admin@broker:5672/vhost`
- Docker service (`docker-compose.yml:20-30`): RabbitMQ with management plugin (see the sketch below)
- Ports:
  - `5672` - AMQP protocol
  - `15672` - Management UI (http://localhost:15672, user: admin, pass: admin)
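For orientation, the broker service looks roughly like the sketch below (the image tag and environment values are assumptions inferred from the broker URL above; the actual definition lives at docker-compose.yml:20-30):

```yaml
broker:
  image: rabbitmq:3-management      # assumed tag; includes the management UI
  ports:
    - "5672:5672"                   # AMQP
    - "15672:15672"                 # Management UI
  environment:
    RABBITMQ_DEFAULT_USER: admin    # matches amqp://admin:admin@broker:5672/vhost
    RABBITMQ_DEFAULT_PASS: admin
    RABBITMQ_DEFAULT_VHOST: vhost
```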
Main Celery application configuration.
app = Celery("core")
app.config_from_object(settings, namespace="CELERY")
app.autodiscover_tasks()  # Finds tasks.py in all Django apps

- Auto-discovers tasks in `tasks.py` from all registered Django apps
- Loaded on Django startup via `core/__init__.py:3`
- The `CELERY_` namespace means all config uses this prefix in `.env` (mapping illustrated below)
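To illustrate the namespace mapping, `config_from_object(settings, namespace="CELERY")` strips the prefix and lowercases the rest, so the settings used in this project map as follows:

```python
# CELERY_BROKER_URL                  -> app.conf.broker_url
# CELERY_RESULT_BACKEND              -> app.conf.result_backend
# CELERY_WORKER_PREFETCH_MULTIPLIER  -> app.conf.worker_prefetch_multiplier
```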
Asynchronous task definitions using @shared_task decorator.
Example task (apps/client/tasks.py:7-17):
@shared_task
def collect_post_by_client(client_id):
    client = Client.objects.get(id=client_id)
    print(f'Request: {client.username}')
    for i in range(1, 200):
        Post.objects.create(
            client=client,
            source="twitter",
            description=f"Lorem {i}"
        )

This task creates 200 posts asynchronously, preventing request timeout.
Stores task results and status in database.
- Configuration (`.env.local:23`): `CELERY_RESULT_BACKEND="django-db"`
- Package: `django-celery-results` (installed in `requirements.txt:6`)
- Purpose: Track task status (PENDING, STARTED, SUCCESS, FAILURE) and retrieve results (see the shell sketch below)
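With the database backend enabled, stored results can be inspected directly from the Django shell; a quick sketch using django-celery-results' TaskResult model:

```python
from django_celery_results.models import TaskResult

# The five most recently finished tasks, with status and stored return value
for tr in TaskResult.objects.order_by("-date_done")[:5]:
    print(tr.task_id, tr.status, tr.result)
```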
Background process that executes tasks from the queue.
celeryworker:
  command: celery -A core.celery worker -l INFO --concurrency 1

- Runs in a separate Docker container
- Shares the same codebase as the Django app
- Configuration (combined command shown below):
  - `--concurrency 1`: Process 1 task at a time
  - `--max-tasks-per-child 1`: Restart the worker after each task (prevents memory leaks)
  - `--prefetch-multiplier 1`: Don't prefetch tasks
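Put together, the equivalent worker invocation with all three flags on the command line (flag names as in the Celery 5 worker CLI) would be:

```bash
celery -A core.celery worker -l INFO \
  --concurrency 1 \
  --max-tasks-per-child 1 \
  --prefetch-multiplier 1
```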
Real-time monitoring dashboard for Celery.
flower:
  command: celery -A core.celery flower
  ports:
    - "5555:5555"

- Access: http://localhost:5555
- Features: Monitor tasks, workers, queue status, task history, and execution times
The project includes a Django admin action in apps/client/admin.py:10-19:
@admin.action(description="Run collect post task")
def collect_post(modeladmin, request, queryset):
    for c in queryset.all():
        collect_post_by_client.delay(c.id)  # Sends task to broker
        messages.add_message(
            request,
            messages.INFO,
            f"Run collect post task for client {c.username}",
        )

Usage Steps:
- Go to http://localhost:8000/admin/client/client/
- Select one or more clients from the list
- Choose "Run collect post task" from the actions dropdown
- Click "Go"
- The task is sent to RabbitMQ and processed by Celery worker
- Monitor progress in Flower: http://localhost:5555
You can trigger tasks from any Django view. Here's a complete example:
# apps/client/views.py
from django.http import JsonResponse
from celery.result import AsyncResult
from .tasks import collect_post_by_client
def trigger_collect_posts(request, client_id):
    """Trigger async task to collect posts for a client"""
    task = collect_post_by_client.delay(client_id)
    return JsonResponse({
        'task_id': task.id,
        'status': 'Task sent to broker',
        'client_id': client_id,
        'monitor_url': f'http://localhost:5555/task/{task.id}'
    })

def check_task_status(request, task_id):
    """Check the status of a running task"""
    task = AsyncResult(task_id)
    response_data = {
        'task_id': task_id,
        'status': task.status,  # PENDING, STARTED, SUCCESS, FAILURE, RETRY
        'ready': task.ready(),
    }
    if task.ready():
        if task.successful():
            response_data['result'] = task.result
        else:
            response_data['error'] = str(task.info)
    return JsonResponse(response_data)

Add to core/urls.py:
from django.urls import path
from apps.client import views
urlpatterns = [
    # ... existing patterns
    path('api/collect-posts/<int:client_id>/', views.trigger_collect_posts, name='trigger_collect_posts'),
    path('api/task-status/<str:task_id>/', views.check_task_status, name='check_task_status'),
]

Usage:
# Trigger task
curl http://localhost:8000/api/collect-posts/1/
# Response:
# {
# "task_id": "a7f3e0b2-5c8d-4e9f-b1a2-3d4e5f6a7b8c",
# "status": "Task sent to broker",
# "client_id": 1,
# "monitor_url": "http://localhost:5555/task/a7f3e0b2-5c8d-4e9f-b1a2-3d4e5f6a7b8c"
# }
# Check task status
curl http://localhost:8000/api/task-status/a7f3e0b2-5c8d-4e9f-b1a2-3d4e5f6a7b8c/
# Response:
# {
# "task_id": "a7f3e0b2-5c8d-4e9f-b1a2-3d4e5f6a7b8c",
# "status": "SUCCESS",
# "ready": true
# }

You can also trigger tasks from the Django shell. Open it with make shell, then:

from apps.client.tasks import collect_post_by_client
# Trigger task
task = collect_post_by_client.delay(1)
print(f"Task ID: {task.id}")
# Check status
print(f"Status: {task.status}")
# Wait for result (blocking)
result = task.get(timeout=60)

Here's what happens when you trigger a task:
- User Action: Admin selects a client and clicks "Run collect post task"
- Django: Calls `collect_post_by_client.delay(client_id)` (non-blocking)
- Celery: Serializes the task (function name + arguments) and sends it to RabbitMQ
- RabbitMQ: Stores the task message in the queue
- Celery Worker: Picks up the task from the queue and executes `collect_post_by_client()`
- Task Execution: Creates 200 posts in the PostgreSQL database
- Result Storage: Saves the task result/status in the `django_celery_results` table
- Monitoring: View real-time progress in the Flower dashboard
Timing:
- Django response: ~50ms (immediately returns after queuing)
- Task execution: ~5-10 seconds (running in background)
- Database operations: 200 inserts
Key Celery settings in .env.local:
# Celery Configuration
CELERY_RESULT_BACKEND="django-db" # Store results in PostgreSQL
CELERY_BROKER_URL=amqp://admin:admin@broker:5672/vhost # RabbitMQ connection
CELERY_WORKER_CONCURRENCY=1 # Number of worker processes
CELERY_WORKER_PREFETCH_MULTIPLIER=1 # Tasks to prefetch per worker
CELERY_WORKER_MAX_TASKS_PER_CHILD=10 # Restart worker after N tasks

Access http://localhost:5555 to:
- View active workers
- Monitor task queue
- Check task execution history
- See task success/failure rates
- Inspect individual task details
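Flower also exposes a small HTTP API that is handy for scripting; for example (endpoints per Flower's documentation, response shape may vary by version):

```bash
# List workers known to Flower
curl http://localhost:5555/api/workers

# Inspect a single task by its ID
curl http://localhost:5555/api/task/info/<task_id>
```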
Access http://localhost:15672 (user: admin, pass: admin) to:
- View message queues
- Check connection status
- Monitor message rates
- Inspect queue bindings
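The same queue information is available from the command line via rabbitmqctl inside the broker container (the container name below is an assumption - check docker ps for the real one):

```bash
# List queues with their current message counts
docker exec django-example-broker rabbitmqctl list_queues name messages
```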
View the logs:

docker logs django-example-celeryworker -f
docker logs django-example-core -f

Workers not processing tasks?
# Check worker status
docker ps | grep celeryworker
# Restart worker
docker restart django-example-celeryworker

RabbitMQ connection issues?
# Check broker is running
docker ps | grep broker
# Check environment variable
docker exec django-example-core env | grep CELERY_BROKER_URL

Tasks stuck in PENDING?
- Verify the worker is running: `docker ps | grep celeryworker`
- Check the worker logs: `docker logs django-example-celeryworker`
- Ensure the broker URL is correct in `.env` (the inspect commands below can also help confirm the connection)
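If the worker looks healthy but tasks stay PENDING, Celery's built-in inspect commands (run inside the container, e.g. via make bash) can confirm that the worker reaches the broker and has the task registered:

```bash
# Ping workers through the broker
celery -A core.celery inspect ping

# List tasks registered with the worker (collect_post_by_client should appear)
celery -A core.celery inspect registered

# Show tasks currently being executed
celery -A core.celery inspect active
```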
Database errors in tasks?
- Check PostgreSQL is running: `docker ps | grep db`
- Verify database migrations: `make migrate`
- Port: 8000
- Path: `/healthcheck/` (if implemented)
- Expected Status: 200
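If the endpoint is not implemented yet, a minimal sketch could look like this (view placement and URL wiring are hypothetical):

```python
# e.g. in any app's views.py (hypothetical placement)
from django.http import JsonResponse

def healthcheck(request):
    # Returns 200 as long as the Django process is serving requests
    return JsonResponse({"status": "ok"})

# core/urls.py
# urlpatterns += [path("healthcheck/", healthcheck, name="healthcheck")]
```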