In Part 1 we explored the DevOps angle:

“How do you roll out a new version of a Django + Celery monolith without killing in-flight tasks?”

The answer was to let Celery shut down gracefully and bring up new workers alongside old ones. That’s already a big win.

But there’s another angle to this story: how you design your tasks themselves. That’s where Celery’s workflows come in, especially chains.

The problem with “giant tasks”

It’s tempting to write one big Celery task that does everything:

1
2
3
4
5
6
@shared_task
def process_video(video_id):
    extract_metadata(video_id)
    transcode(video_id)
    generate_thumbnails(video_id)
    notify_user(video_id)

It looks simple, but it causes headaches:

  • If the worker dies halfway, the whole job is lost.
  • If a deploy happens mid-task, the job gets killed.
  • You can’t monitor progress, it’s all or nothing.

That’s brittle, and it makes deployments risky.

Enter: Chains

Celery lets you break a big job into a sequence of smaller, linked tasks. This is called a chain. Each step only does one thing, then enqueues the next step.

Example:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from celery import shared_task, chain

@shared_task
def extract(video_id):
    # get metadata
    return {"video_id": video_id, "streams": ["1080p", "720p"]}

@shared_task
def transcode(ctx):
    # convert to desired formats
    ctx["transcoded"] = True
    return ctx

@shared_task
def thumbnail(ctx):
    # create thumbnails
    ctx["thumbs"] = True
    return ctx

@shared_task
def notify(ctx):
    print(f"✅ Video {ctx['video_id']} processed")
    return True

def start_pipeline(video_id):
    return chain(
        extract.s(video_id),
        transcode.s(),
        thumbnail.s(),
        notify.s()
    ).apply_async()

Now, each step is its own task.

Why chains help with deployments

Chains make deployments smoother:

  1. Short tasks finish quickly. If you deploy while extract is running, it’s done in seconds. By the time you stop the old worker, it’s already finished.

  2. The workflow continues on new workers. The next step (transcode) just lands in the queue. If new workers are already up, they pick it up. Old workers don’t need to hang around forever.

  3. No wasted work. If a task crashes or a worker dies, only that step retries. You don’t lose everything done so far.

  4. Better visibility. You can track progress step by step, instead of guessing where a giant task failed.

A concrete example

Let’s compare two scenarios.

Big task style

  • Deploy happens while process_video is running.
  • Worker dies.
  • The job is lost. User gets nothing.

Chain style

  • Deploy happens while transcode is running.
  • Worker finishes or retries.
  • Next step (thumbnail) is queued.
  • New worker (new code) takes over.
  • Job continues - no user impact.

Best practices with chains

To make this work well:

  • Keep tasks small and idempotent. Each should be safe to rerun if retried.
  • Persist state between steps (DB, Redis, S3, etc.), so you don’t rely on in-memory context.
  • Use retries. Add autoretry_for=(Exception,) and retry_backoff=True.
  • Control prefetch. Set worker_prefetch_multiplier = 1 so workers don’t hog a bunch of chained tasks.
  • Acknowledge late. Use acks_late=True so tasks aren’t marked “done” until they really finish.

Let’s expand a bit…

1. Persist state between steps

When you pass data from one task to the next in a chain, Celery serializes the return value of Task A (say a Python dict) and hands it to Task B.

That works fine for small, short-lived data e.g. an email address, a filename, a counter.

But there are pitfalls:

  • If your return value is large (like a full CSV, or a binary blob), it bloats the broker (Redis/RabbitMQ).
  • If your chain breaks (e.g. worker crash, task retries), you might lose context if you only kept it in-memory.
  • If you need to inspect progress later (in Django admin, monitoring, etc.), that data is gone if it was only passed inline.

👉 That’s why it’s recommended to persist the state in a durable store:

  • Database (Postgres/MySQL): store intermediate job state in a model (ImportJob, EmailJob, etc.).
  • Redis (separate keys from broker): good for temporary state you don’t want in the broker payload.
  • S3 / File storage: if you’re moving large files between steps.

Example: instead of returning the entire CSV rows from load_manifest, you:

  • save them in ImportJob.raw_file (DB or S3),
  • return just the job_id.

Each downstream task then queries the DB (or S3) for what it needs. This way your workflow can resume safely even if workers crash or you redeploy mid-way.

2. Use retries (autoretry_for + retry_backoff)

Background jobs run in a messy world:

  • Networks hiccup,
  • APIs throw 502s,
  • DB deadlocks happen.

If you don’t retry, your whole workflow breaks on one transient glitch.

Celery can do automatic retries for you:

1
2
3
4
5
6
7
8
9
@shared_task(
    autoretry_for=(Exception,),   # which errors trigger retry
    retry_backoff=True,           # exponential backoff: 1s, 2s, 4s, ...
    max_retries=5                 # optional safety cap
)
def fetch_url(url):
    r = requests.get(url, timeout=10)
    r.raise_for_status()
    return r.text
  • If the first attempt fails → Celery retries later.
  • The chain continues only once the retry succeeds.
  • If retries are exhausted, the error bubbles up → chain error handler runs (.on_error(...)).

This makes your chain much more robust without manual try/except boilerplate.

3. Control prefetch (worker_prefetch_multiplier = 1)

By default, each Celery worker process asks the broker for multiple tasks at once (the prefetch count).

  • Example: if concurrency=4 and prefetch_multiplier=4, each worker process can grab 16 tasks at once.
  • This can cause task hoarding: one worker reserves a whole chain of tasks, even if it’s only actively working on the first one.

Why is that bad for chains and deployments?

  • During a deploy, you want tasks to be distributed evenly between old and new workers.
  • If old workers prefetch a pile of tasks, they’ll keep holding onto them even if you’re trying to drain them.
  • It increases latency for other tasks, because they’re “reserved” but not yet running.

Setting:

1
2
# settings.py
CELERY_WORKER_PREFETCH_MULTIPLIER = 1

Means:

  • Each worker process only fetches as many tasks as it has free slots (e.g. 1 task per concurrency slot).
  • Tasks move smoothly between workers.
  • Graceful shutdown + chains play nicely: old workers only finish what they already started, and the rest is left for new workers.

TL;DR

  • Persist state: don’t rely on inline data between chain steps; store it in DB/Redis/S3 so your workflow survives crashes and redeploys.
  • Retries: let Celery auto-retry transient failures with backoff so a flaky API doesn’t break the whole chain.
  • Prefetch: set multiplier = 1 so workers don’t hoard tasks, making rolling deploys smoother and latency lower.

Zoom in Redis persistance

Before jumping to some code, let’s quickly clarify persistance in Redis - where it’s relevant and where not so much.

Redis can be used as a sidecar scratchpad for Celery workflows, separate from the broker payload. The broker (Redis or RabbitMQ) is meant to carry small task messages, not big blobs or transient work-in-progress data.

Here are some good candidates for Redis temporary state that you don’t want to shove into the task’s return value:

1. Large parsed-but-not-yet-processed data

Example: You download a 50 MB CSV file in step 1. Passing it inline to step 2 would mean serializing 50 MB into the broker payload → clogging Redis/RabbitMQ, slowing everything down.

Instead:

  • Step 1 downloads the CSV, puts the parsed rows into a Redis key: SET import:123:rows [...] (maybe compressed JSON).
  • Step 2 just gets the job_id, pulls rows from Redis.

Redis is fast, and the data can expire automatically after a TTL.

2. Intermediate progress counters

Example: In a long workflow (like sending 10,000 emails), you want to track how many have been sent so far.

  • Don’t pass the growing counter dict through each task’s return value.
  • Instead, keep a Redis hash: HINCRBY email_job:456 progress.sent 1.
  • Any task can update it, and you can show progress in Django admin.

3. Caching heavy computations between steps

Example: Step 1 queries an external API that returns a huge JSON (~5 MB). Step 2 and Step 3 both need that JSON.

  • Instead of re-fetching or passing it inline, Step 1 stores it in Redis under job:789:apiresponse.
  • Later steps pull it as needed.

4. Fan-out/fan-in scratch data (without chords)

Sometimes you split a job into many subtasks, then need to collect results. You don’t want each subtask’s payload bloating the broker.

Example:

  • Step 1 splits CSV into 100 chunks.
  • Each subtask parses its chunk and writes partial stats to Redis (HINCRBY job:111 stats.ok 42).
  • Final task just reads the aggregate from Redis and writes to DB.

5. Large binary blobs

Anything like PDFs, images, or video metadata. You don’t want this inside Celery’s broker messages.

Example:

  • Task extracts thumbnail images → stores them as base64 blobs in Redis with TTL.
  • Next step picks them up and sends them as email attachments.
  • If something goes wrong and the chain dies, Redis will auto-expire the blobs.

Why Redis works well for this

  • Fast and in-memory: great for ephemeral, high-volume data.
  • TTL support: auto-cleanup avoids bloat.
  • Separate from broker: you don’t overload Celery’s messaging layer.

A small code sketch

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
import redis, json
from django.conf import settings

r = redis.Redis.from_url(settings.REDIS_URL)

@shared_task
def step1_download(job_id, url):
    content = requests.get(url).text
    # store in redis with 1h TTL
    r.setex(f"job:{job_id}:raw", 3600, content)
    return job_id  # only pass lightweight ID in the chain

@shared_task
def step2_parse(job_id):
    raw = r.get(f"job:{job_id}:raw")
    if not raw:
        raise Exception("Missing raw data in Redis")
    rows = [line.split(",") for line in raw.decode().splitlines()]
    # store parsed rows
    r.setex(f"job:{job_id}:rows", 3600, json.dumps(rows))
    return job_id

Good candidate: big, short-lived, or aggregatable data (CSV rows, large API responses, binary blobs, counters). Not good: long-term business data (that belongs in Postgres or S3).

A real-world Django chain: importing a CSV

Let’s make it concrete with something you’ve probably faced in a Django project: users upload a CSV of data (say, product records or contacts). You want to parse it, validate rows, insert into the DB, and then send the user a summary email.

Here’s how a Celery chain helps:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
from celery import shared_task, chain
from django.core.mail import send_mail
import csv, io
from myapp.models import Contact


@shared_task
def parse_csv(file_path):
    """Read CSV and return list of dicts."""
    with open(file_path, newline="") as f:
        reader = csv.DictReader(f)
        rows = [row for row in reader]
    return rows


@shared_task
def validate_rows(rows):
    """Filter out invalid rows (e.g. missing email)."""
    valid = [r for r in rows if r.get("email")]
    return valid


@shared_task
def insert_into_db(rows):
    """Bulk insert into Django model."""
    objs = [Contact(name=r["name"], email=r["email"]) for r in rows]
    Contact.objects.bulk_create(objs)
    return len(objs)


@shared_task
def send_summary(count, user_email):
    """Email the user with a report."""
    send_mail(
        "CSV Import Complete",
        f"✅ Imported {count} contacts successfully.",
        "noreply@example.com",
        [user_email],
    )
    return True


def start_import(file_path, user_email):
    return chain(
        parse_csv.s(file_path),
        validate_rows.s(),
        insert_into_db.s(),
        send_summary.s(user_email),
    ).apply_async()

What happens here?

  1. parse_csv reads the file and hands off rows.
  2. validate_rows cleans them up.
  3. insert_into_db writes to the database.
  4. send_summary emails the user.

Each step is its own Celery task, so:

  • If a deploy happens mid-import, the remaining steps just run on the new workers.
  • If a step fails, you retry only that step.
  • You can see progress in your Celery dashboard instead of treating it as one big black box.

This is the same philosophy we discussed earlier, but in a workflow you might actually ship in production.


Wrapping up

Celery isn’t just about running things in the background. It’s about composing workflows.

By breaking long jobs into chains of small, resilient tasks, you:

  • Make deployments safer (tasks hop naturally to new workers).
  • Reduce wasted work on crashes.
  • Gain better visibility and retry behavior.
  • Keep DevOps simple - no need for fancy orchestration.

If Part 1 was about how you deploy, Part 2 is about what you deploy. A bit of thought in task design goes a long way in making your system robust, scalable, and developer-friendly.