Testing complex Celery pipelines

Yair Morgenstern
Jul 10, 2022 · 4 min read

Some of our tasks involve complex pipelines running in Celery, often with multiple subtasks running in parallel. One of our goals is to ensure our users can query our API with a job ID and receive the state and return values of that job, ignoring the underlying complexity.

To this end, we need to resolve what could be a complex situation in our tasks (one is finished, one errored, one restarting) into a few simple states for the overarching job: Pending, Running, Finished, or Failure.

The entire code can be found here.

Starting out — a basic chain

Celery provides built-in app and worker fixtures, but its documentation doesn't go much beyond running the most straightforward tasks.
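For reference, here's roughly what using those built-in fixtures looks like. This is a minimal sketch based on Celery's testing docs; the in-memory broker/backend config and the inline mul task are just placeholders:

```python
# Minimal sketch of Celery's built-in pytest fixtures (celery_app, celery_worker).
# Uses Celery's pytest plugin (celery.contrib.pytest); run pytest with
# `-p celery.contrib.pytest` if it isn't loaded automatically.
import pytest

@pytest.fixture(scope="session")
def celery_config():
    # In-memory broker and result backend, so no real RabbitMQ/Redis is needed.
    return {"broker_url": "memory://", "result_backend": "cache+memory://"}

def test_basic_task(celery_app, celery_worker):
    # Define a task on the test app, then let the running test worker pick it up.
    @celery_app.task
    def mul(x, y):
        return x * y

    celery_worker.reload()
    assert mul.delay(4, 4).get(timeout=10) == 16
```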

This is already a big step forward: there's a real difference between testing a function with some inputs to check that it runs the desired code, and verifying that it actually runs through Celery. Still, that's not enough for our purposes.

To get started, let's create our basic building blocks: two functions which we will then combine in various ways.

Since we want to ensure that both tasks happened sequentially in a specific order, we’ll have their inputs and outputs be lists of numbers, with each function adding a number to the result. That way, we’ll be able to tell by the final result what steps we passed along the way.

Since we're trying to simulate a real environment, we'll add a sleep() call to the second task.
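Here's a sketch of what those building blocks might look like, wrapped in a fixture so the tests below can share them. The names ab_tasks, task_a and task_b are ours, and the 2-second sleep is an arbitrary choice; the linked repo may do this differently:

```python
# conftest.py (sketch): two tasks that each append their own marker to the list.
import time
import pytest

@pytest.fixture()
def ab_tasks(celery_app, celery_worker):
    @celery_app.task(name="task_a")
    def task_a(numbers):
        numbers.append(1)      # proof that A ran
        return numbers

    @celery_app.task(name="task_b")
    def task_b(numbers):
        time.sleep(2)          # simulate a slower, real-world task
        numbers.append(2)      # proof that B ran, after A
        return numbers

    celery_worker.reload()     # make the running test worker pick up both tasks
    return task_a, task_b
```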

So let's write our first test: chain the two together and see whether they did indeed run sequentially!

A gets an empty list, adds 1, passes [1] to B, which adds 2 and returns [1,2] as the final result

If we send the task to execute asynchronously, we’ll see that its result is None — it won’t get a result until it fully resolves.
We can make it resolve by waiting for it explicitly, using result.get(). From here on, the stored result should not change.
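Putting this together, the whole first test might look roughly like this (a sketch using the hypothetical ab_tasks fixture from above):

```python
from celery import chain

def test_chain_runs_sequentially(ab_tasks):
    task_a, task_b = ab_tasks

    result = chain(task_a.s([]), task_b.s()).apply_async()

    # The chain hasn't resolved yet (B is still sleeping), so nothing is stored.
    assert result.result is None

    # Waiting explicitly resolves it; the marker order proves A ran before B.
    assert result.get(timeout=10) == [1, 2]
```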

Complex graphs and supplying IDs

Now that we’ve understood the fundamentals of testing our asynchronous tasks, let’s get serious!

Since we want our users to get a job ID and be able to retrieve the task status from there, we’ll want a single object that represents the final results.
In most of our cases, there will be multiple tasks running in parallel which do not interact further, so what we’ll return to the user will be the ID of a GroupResult.
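The post doesn't show the lookup side, but as a rough illustration of the idea: a GroupResult can be persisted with save() after dispatch and later rebuilt from the job ID alone with restore(), which is then easy to collapse into the simple states from the introduction. The get_job_state helper below is our own hypothetical sketch, not the original code:

```python
# Hypothetical API-side helper: map a job ID back to one of our simple states.
def get_job_state(celery_app, job_id):
    # restore() only works if the GroupResult was saved (result.save()) after dispatch.
    result = celery_app.GroupResult.restore(job_id)
    if result is None:
        return "Pending"
    if result.failed():
        return "Failure"
    if result.successful():
        return "Finished"
    return "Running"
```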

In order to keep a unified ID format across our objects, we don't want to use Celery's autogenerated UUID, so we'll need to tell Celery to use our own ID. How we do this differs between use cases.

If we’re applying a Group, we can pass this in as a parameter to apply_async.
For example, here we’re creating a Group of Chains — but since we’re still applying a Group in the end, we can pass the job ID directly.

Both chains operate completely independently
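In test form, that might look something like this (a sketch; job_id here is just a random UUID standing in for our real unified ID format):

```python
import uuid
from celery import chain, group

def test_group_of_chains_with_our_id(ab_tasks):
    task_a, task_b = ab_tasks
    job_id = str(uuid.uuid4())   # stand-in for our own unified ID

    result = group(
        chain(task_a.s([]), task_b.s()),
        chain(task_a.s([]), task_b.s()),
    ).apply_async(task_id=job_id)

    # The GroupResult carries the ID we chose, not an autogenerated one.
    assert result.id == job_id

    # Both chains are independent, so each produces the same [1, 2] trace.
    assert result.get(timeout=10) == [[1, 2], [1, 2]]
```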

But what if we're not applying the Group directly? In cases where we run A once and then many instances of B (for example, bulk-saving to the DB and then running post-processing on each item), our Group is only a task linked to the preceding task. In this case, we need to add the task ID to the options of the group:

The result of A is sent to all instances of B in the group as input
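A sketch of that case: since we never apply the group ourselves, the ID goes into its options. Freezing the group up front, to get a GroupResult handle the test can wait on, is our own choice here; the original code may assert on this differently:

```python
import uuid
from celery import group

def test_linked_group_gets_our_id(ab_tasks):
    task_a, task_b = ab_tasks
    job_id = str(uuid.uuid4())

    post_processing = group(task_b.s(), task_b.s())
    # We never call apply_async on the group ourselves, so the ID has to be
    # set through its options instead of passed as an apply_async parameter.
    post_processing.options["task_id"] = job_id

    # freeze() gives us the GroupResult (with our ID) before anything runs.
    group_result = post_processing.freeze()
    assert group_result.id == job_id

    # A runs first; its return value is fed to every B in the linked group.
    task_a.apply_async(args=([],), link=post_processing)
    assert group_result.get(timeout=10) == [[1, 2], [1, 2]]
```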

Errors and in-flight control

Now that we know we can handle complex task graphs, on to the meaty part — how can we simulate errors and check what happens if only part of the tasks finish?

Simulating an error in our tasks is very simple: since they try to append to a given list, we can pass them a non-list parameter and they won't be able to append to it.

We can't just wrap the call in pytest.raises() to keep the task's exceptions from failing our test, since these functions are running asynchronously!
Instead, we tell Celery not to re-raise these errors, by setting propagate=False on the .get() call.

The group’s first child returns an AttributeError, and the second returns successfully
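A sketch of such a test: the first child gets something it can't append to, and propagate=False on get() hands us the exception back as a value instead of raising it in the test:

```python
from celery import group

def test_group_with_partial_failure(ab_tasks):
    task_a, task_b = ab_tasks

    result = group(
        task_a.s(None),   # None has no .append(), so this child fails
        task_a.s([]),     # this child succeeds normally
    ).apply_async()

    # propagate=False returns the exception instead of re-raising it here.
    outcomes = result.get(timeout=10, propagate=False)
    assert isinstance(outcomes[0], AttributeError)
    assert outcomes[1] == [1]
```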

If we want to simulate what it looks like when only part of the tasks have finished, we need to wait for one specific task to finish. This is where the sleep() on the second task comes in handy: if it's too short, both tasks may have finished by the time we get to what we're trying to assert!

Only the first child has finished, the second is still pending — hence only the result of the first is available
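And a sketch of that half-finished case, relying on B's sleep() being long enough that it's still running when we check:

```python
from celery import group

def test_partially_finished_group(ab_tasks):
    task_a, task_b = ab_tasks

    result = group(task_a.s([]), task_b.s([])).apply_async()

    # Wait only for the fast child; the slow one is still sleeping.
    assert result.children[0].get(timeout=10) == [1]

    assert result.completed_count() == 1
    assert result.children[1].ready() is False
    # No result has been stored for the unfinished child yet.
    assert result.children[1].result is None
```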

Conclusion

Celery's documentation may lack the depth that real-world applications require, but that's certainly not for lack of support in the library itself. Testing how your infrastructure behaves when running tasks, and not just unit-testing the tasks themselves, is imperative for understanding the complex distributed states you may find yourself in, and for ensuring you've thought through how your task graph behaves under various situations.

