Testing complex Celery pipelines
Some of our tasks involve complex pipelines running in Celery, often running multiple subtasks in parallel. One of our goals is to ensure our users can query our API with a job ID and receive the state and return values of that task, ignoring the underlying complexity.
To this end, we need to resolve what could be a complex situation in our tasks (one is finished, one errored, one restarting) into a few simple states for the overarching job: Pending, Running, Finished, or Failure.
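One way to picture this reduction is a small pure function over the states of the subtasks. The sketch below is not taken from our actual code: the name resolve_job_state and the precedence rules (any failure wins, then all-success, then any sign of activity) are assumptions made for illustration. Only the state strings are Celery's own.

```python
from typing import Iterable

# Celery's built-in state names, written as plain strings so this
# sketch has no dependencies.
IN_PROGRESS = {"RECEIVED", "STARTED", "RETRY"}


def resolve_job_state(child_states: Iterable[str]) -> str:
    """Collapse many task states into one job state (hypothetical rules)."""
    states = list(child_states)
    if any(s == "FAILURE" for s in states):
        return "Failure"   # assumption: one failed subtask fails the whole job
    if states and all(s == "SUCCESS" for s in states):
        return "Finished"
    if any(s == "SUCCESS" or s in IN_PROGRESS for s in states):
        return "Running"   # some work has started, but not all of it is done
    return "Pending"       # nothing has been picked up yet


# The situation from the text: one finished, one errored, one restarting.
print(resolve_job_state(["SUCCESS", "FAILURE", "RETRY"]))
print(resolve_job_state(["SUCCESS", "PENDING"]))
```

Whether a single failure should outrank work that is still running is a product decision; the point is only that the mapping lives in one place.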
The entire code can be found here
Starting out — a basic chain
Celery provides built-in app and worker fixtures, but its documentation lacks anything beyond the most straightforward task running.
These fixtures are already a big step forward, since checking that a function runs the desired code for given inputs is very different from checking that it runs correctly in Celery. Still, that's not enough for our purposes.
To get started, let’s create our basic building blocks: two functions that we will then combine in various ways.
Since we want to ensure that both tasks happened sequentially in a specific order, we’ll have their inputs and outputs be lists of numbers, with each function adding a number to the result. That way, we’ll be able to tell by the final result what steps we passed along the way.
So let’s write our first test — let’s try chaining these together and see if they did indeed run sequentially!
If we send the task to execute asynchronously, we’ll see that its result is None: it won’t get a result until it fully resolves.
We can make it resolve by waiting for it explicitly, using result.get(). From here on, the stored result should not change.
Complex graphs and supplying IDs
Now that we’ve understood the fundamentals of testing our asynchronous tasks, let’s get serious!
Since we want our users to get a job ID and be able to retrieve the task status from there, we’ll want a single object that represents the final results.
In most of our cases, there will be multiple tasks running in parallel which do not interact further, so what we’ll return to the user will be the ID of a GroupResult.
In order to keep a unified ID format across our objects, we don’t want to use Celery’s autogenerated UUID, so we’ll need to tell Celery to use our own ID. How we do this differs between use cases.
If we’re applying a Group, we can pass our ID in as the task_id parameter to apply_async.
For example, here we’re creating a Group of Chains — but since we’re still applying a Group in the end, we can pass the job ID directly.
But what if we’re not applying the Group directly? In cases where we activate A once, and then activate many instances of B — for example, we bulk-save to the DB and then run post-processing on the items — our Group is only a linked task of the preceding tasks. In this case, we need to add the task ID to the options of the group:
Errors and in-flight control
Now that we know we can handle complex task graphs, on to the meaty part — how can we simulate errors and check what happens if only part of the tasks finish?
Simulating an error in our tasks is very simple: since they try to append to a given list, we can pass them a non-list parameter and the append will fail.
To keep the exceptions raised in the tasks from failing our test, we can’t just wrap the call in pytest.raises(), because these functions are running asynchronously! Instead, we tell Celery not to re-raise these errors by passing propagate=False to .get().
If we want to simulate what it looks like when only part of the tasks have finished, we need to wait for a specific one to finish. This is where the sleep() on the second task comes in handy: if the delay is too short, both tasks may finish by the time we get to what we’re trying to assert!
Celery’s documentation may lack the depth that real-world applications require, but this is certainly not for lack of support. Testing your infrastructure behavior when running tasks — and not just unit-testing the tasks themselves — is imperative for understanding the complex nature of distributed state you may find yourself in, and for ensuring you’ve thought out how your task graph acts under various situations.