Painting on a Distributed Canvas: An Advanced Guide to Celery Workflows
All right, my name is David Gouldin, and I'm an employee at Heroku. I'm going to be talking to you today about the Celery canvas module.
Celery ships with a lot of tools to compose tasks together into workflows. But as I've talked to people about their use of Celery, I've found that a lot of them are either unfamiliar with these tools or afraid of them, because they're not quite sure how they work or they don't really trust them. So my goal today is to pull back the curtain, show you what's going on with these workflow tools, and show you how you can even extend them and create your own.
But before we get to that, I want to talk about some design principles of Celery tasks in general, apart from the canvas workflows.
In my opinion, a good Celery task is atomic. In a concurrent system, you don't want to leak intermediate state out to other parts of the system. So you should have clear success and failure scenarios, where any state change is applied all at once, as late as possible, and only when the task succeeds. That keeps your tasks from interfering with other tasks or other parts of your system.
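One common way to get this "all at once, only on success" behavior for file output is to stage the full result and then publish it in a single step. The sketch below is plain Python rather than anything Celery provides; `write_report_atomically` is a hypothetical helper, and the atomicity assumes `os.replace` is renaming within a single filesystem.

```python
import json
import os
import tempfile


def write_report_atomically(path, data):
    """Write all of `data` to `path`, or leave `path` untouched on failure."""
    directory = os.path.dirname(os.path.abspath(path))
    # Stage the complete result in a temporary file in the same directory...
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(data, tmp)
        # ...then publish it in one step. os.replace is atomic on POSIX when
        # source and destination are on the same filesystem.
        os.replace(tmp_path, path)
    except BaseException:
        # Nothing was published, so only the staging file needs cleaning up.
        os.unlink(tmp_path)
        raise
```

If serialization fails halfway through, readers of `path` still see the previous complete version rather than a half-written file.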
I think tasks should also be idempotent. Tasks will fail in creative ways: you lose a process, an API goes down, or something like that. You want to be able to pick up where you left off and execute your task over again, so that you don't need manual intervention when a task fails. If your tasks are idempotent, you get this for free.
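A minimal sketch of one way to make a side-effecting task idempotent: derive a key from the task's inputs and skip work already recorded under that key. `PaymentLedger` and `charge_task` are hypothetical, and the in-memory set stands in for a database; in a real store the check and the write would need to happen in one transaction.

```python
class PaymentLedger:
    """Toy in-memory store standing in for a real database."""

    def __init__(self):
        self.balance = 0
        self.applied = set()  # idempotency keys that have already been processed

    def charge(self, idempotency_key, amount):
        # A re-run with the same key is a no-op, so retrying the task after
        # a crash cannot charge twice.
        if idempotency_key in self.applied:
            return self.balance
        self.balance += amount
        self.applied.add(idempotency_key)
        return self.balance


def charge_task(ledger, order_id, amount):
    # The key comes from the task's inputs, not from a random value, so every
    # re-execution of the same logical task maps to the same key.
    return ledger.charge(f"charge:{order_id}", amount)
```

Running `charge_task(ledger, 42, 100)` twice leaves the balance at 100, which is exactly the property that lets a failed task simply be retried.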
And for the purpose of our workflows, tasks should be composable. This is the same idea as in functional programming: think of your tasks like functions that have one concern, take arguments for all of the state they need (or for ways to access that state), and return something that's useful for other functions to consume.
A lot of people focus on their choice of Celery broker, but they overlook the result backend. As an architectural choice, the result backend is really important for these workflows,
so it's worth taking some time to talk about what it does and which one you should choose. First of all, the result backend is in charge of tracking task state.
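So when a task is sent to the broker, Celery marks the task as pending. When it's started by a worker, it's marked started, and then success or failure. (Strictly speaking, Celery reports any task ID it has no record of as PENDING, and it only records STARTED if task tracking is enabled.)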
You need a result backend in order to know what state a task is in and, of course, what it returned: the value or object the function returned when the task ran.
So which backend should you choose?
I implore you, if you take away nothing else from this talk, please do not use RabbitMQ as your result store.
What's actually going on here is that Celery uses a message queue in Rabbit, because queues are Rabbit's only tool, so everything looks like a queue. It stores the result or state as a message, and when a client asks for that result or state, Rabbit actually dequeues the message from the queue.
The first one who asks for the task's state will consume that message, and the next one who asks will get nothing. There's nothing left to get, so the next time you ask for the task state, it says "I don't know": it's pending.
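The consume-on-read problem can be seen in a toy model. Neither class below is Celery's backend code; they are hypothetical stand-ins contrasting a queue-backed store, where reading a result removes it, with a key/value store such as Redis, where reads are repeatable.

```python
from collections import defaultdict, deque


class QueueResultStore:
    """Toy model of a result store built on queues: reading consumes."""

    def __init__(self):
        self._queues = defaultdict(deque)

    def store(self, task_id, state):
        self._queues[task_id].append(state)

    def get_state(self, task_id):
        queue = self._queues[task_id]
        # Reading dequeues the message. Once it is gone the store has no
        # record of the task, and an unknown task is reported as PENDING.
        return queue.popleft() if queue else "PENDING"


class KeyValueResultStore:
    """Toy model of a key/value store: reading leaves the value in place."""

    def __init__(self):
        self._data = {}

    def store(self, task_id, state):
        self._data[task_id] = state

    def get_state(self, task_id):
        return self._data.get(task_id, "PENDING")
```

Asking the queue-backed store twice returns SUCCESS and then PENDING; the key/value store returns SUCCESS both times.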
You can also use a database for your result store by using django-celery, which provides Django ORM models for results. But pulling in a database can be kind of expensive, and it doesn't support all the operations that make our workflows work well, as we'll see later.
Also, if you happen to be unfortunate enough to be using MySQL, you could have problems with your transaction isolation level, where one process sees one answer and another process sees a different one to the question "what state am I in?"
Memcache is a pretty good choice for a result backend, but it's not persistent. So if you care about your results sticking around when the process restarts, Memcache is probably not your best choice for a result backend.
It does, however, have atomic counters, which are nice, and it supports expiry for keys, since you don't want your results hanging around forever.
But that persistence problem usually leads me to choose Redis for my result backend. It has all the same properties as Memcache, with the exception of sharding, but you can usually keep it pretty persistent. You could lose a little bit of data, but generally not much.
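Choosing Redis with result expiry comes down to a couple of settings. This is a sketch of a `celeryconfig.py` module using Celery 4+ lowercase setting names (Celery 3.1 spelled them `CELERY_RESULT_BACKEND` and `CELERY_TASK_RESULT_EXPIRES`); the Redis URL is a placeholder for your own host, port, and database number.

```python
# celeryconfig.py: sketch using Celery 4+ setting names.

# Redis as the result backend; host, port and DB number are placeholders.
result_backend = "redis://localhost:6379/1"

# Expire stored results after one day (in seconds) so they don't pile up.
result_expires = 60 * 60 * 24
```

An application would load this with `app.config_from_object("celeryconfig")`.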
All right, so now let's get to the actual building blocks of canvas itself. The most fundamental building block of the Celery canvas tools is the signature.
It's important to remember as we go forward that everything in the Celery canvas module is based on signatures. So what is a signature? A signature is simply a serializable representation of a task.
It is a subclass of a dictionary, so it contains the name of the task itself (by default, its import path), any positional and keyword arguments that need to be passed to that task, and other options.
So you see, we can json.dumps a signature, and signature also accepts a dictionary. So we can dump it, load it back in, and pass it to signature to rehydrate a signature. You can see how this would be useful when passing it across the message bus and consuming it on the other side. This is basically how task serialization works in general.
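The dump/load/rehydrate round trip can be sketched with a toy dict subclass. This `Signature` is a hypothetical stand-in, not `celery.Signature`; its field names mirror the ones that show up when a Celery signature is dumped, and `from_dict` plays the role of passing a plain dict back to `signature`.

```python
import json


class Signature(dict):
    """Toy stand-in for a Celery signature: a dict describing one task call."""

    def __init__(self, task, args=(), kwargs=None, options=None,
                 subtask_type=None, immutable=False):
        super().__init__(
            task=task,                  # registered task name, e.g. "tasks.add"
            args=list(args),
            kwargs=dict(kwargs or {}),
            options=dict(options or {}),
            subtask_type=subtask_type,  # None for a plain single-task signature
            immutable=immutable,
        )

    @classmethod
    def from_dict(cls, d):
        """Rehydrate a signature from its plain-dict (e.g. JSON-decoded) form."""
        return cls(d["task"], d.get("args", ()), d.get("kwargs"),
                   d.get("options"), d.get("subtask_type"),
                   d.get("immutable", False))


sig = Signature("tasks.add", args=(1, 2))
wire = json.dumps(sig)  # a dict subclass serializes like any other dict
restored = Signature.from_dict(json.loads(wire))
```

With real Celery the equivalent would be `add.s(1, 2)` on one side and `celery.signature(json.loads(wire))` on the other.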
So when you send a task over the wire, it's always creating a signature and passing that signature through the message broker and back to the worker to be consumed.
I'm sorry, so, subtask_type: we'll get to it a bit later,
but for normal tasks it's None, or null in JSON. For the workflow types, which aren't actually tasks but serialized representations of a workflow or a group of tasks, subtask_type is a key that tells Celery which registered signature type to use in order to reconstruct this signature object. So there's a mapping in Celery from subtask_type to
the signature class. If it's None, it just creates a normal signature. But if there's something like chain or group in there, then signature knows about those registered types and uses subtask_type as the key for which signature subclass to rehydrate.
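The subtask_type lookup can be sketched as a registry on a toy base class. This is not Celery's source: `TYPES`, `register_type`, and `_rehydrate` are hypothetical names chosen for illustration, and `Chain` is a toy workflow type that stores its member signatures under `kwargs["tasks"]`, as the dumped chain does in the talk.

```python
import json


class Signature(dict):
    """Toy signature base class with a registry keyed by subtask_type."""

    TYPES = {}  # subtask_type -> Signature subclass

    def __init__(self, task, args=(), kwargs=None, subtask_type=None):
        super().__init__(task=task, args=list(args),
                         kwargs=dict(kwargs or {}), subtask_type=subtask_type)

    @classmethod
    def register_type(cls, name):
        def decorator(subclass):
            cls.TYPES[name] = subclass
            return subclass
        return decorator

    @classmethod
    def from_dict(cls, d):
        # A missing or None subtask_type means a plain single-task signature;
        # any registered name selects the subclass that knows how to rebuild it.
        target = cls.TYPES.get(d.get("subtask_type"), cls)
        return target._rehydrate(d)

    @classmethod
    def _rehydrate(cls, d):
        return Signature(d["task"], d.get("args", ()), d.get("kwargs"))


@Signature.register_type("chain")
class Chain(Signature):
    def __init__(self, *tasks):
        super().__init__("celery.chain", kwargs={"tasks": list(tasks)},
                         subtask_type="chain")

    @classmethod
    def _rehydrate(cls, d):
        # Rebuild each member signature first, then the chain around them.
        return cls(*(Signature.from_dict(t) for t in d["kwargs"]["tasks"]))
```

Round-tripping a `Chain` through JSON therefore yields a `Chain` again, while a dict with no subtask_type comes back as a plain `Signature`.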
There are convenience methods provided on signatures so you can treat them much like you would tasks; if it's just a single-task signature, those pass through to the task normally. So you can run delay,
and you can run apply_async.
There are also convenience methods on tasks themselves which you can use to get a signature. I think in Celery 3.1 it is still .subtask(), but in 3.2 it is .signature(), and .s() for short works in both 3.1 and 3.2. You can pass those methods positional and keyword arguments that are then stored in the signature and serialized. You can even build up arguments incrementally, continuing to add positional and keyword arguments, with the exception of immutable signatures.
So signatures can be marked as immutable, and then their arguments are frozen. This is useful in situations where something else is trying to add task arguments for you, as we'll see later.
If you want to prevent that from happening, just make your signature immutable (for example, with .si() instead of .s()). So now that we are able to serialize tasks, we can begin to compose them by constructing tasks that interact with other tasks.
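The difference between a partial and an immutable signature can be sketched with a toy class. It is a hypothetical model, not Celery's implementation; it does follow Celery's documented behavior that arguments added to a partial signature are prepended to the ones it already holds, which is why a parent's result lands in the callback's first position.

```python
class Signature:
    """Toy signature showing partial (prepended) args vs immutable args."""

    def __init__(self, func, args=(), immutable=False):
        self.func = func
        self.args = tuple(args)
        self.immutable = immutable

    def clone(self, extra_args=()):
        # Mutable signatures prepend the extra args; immutable ones ignore
        # them and keep their frozen arguments.
        args = self.args if self.immutable else tuple(extra_args) + self.args
        return Signature(self.func, args, self.immutable)

    def __call__(self):
        return self.func(*self.args)


def add(x, y):
    return x + y


partial = Signature(add, (10,))                  # roughly like add.s(10)
frozen = Signature(add, (2, 2), immutable=True)  # roughly like add.si(2, 2)
```

Cloning `partial` with `(5,)` yields the arguments `(5, 10)`, while cloning `frozen` with `(5,)` still calls `add(2, 2)`.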
The simplest unit of composition is a callback, which is a task that is launched when another task finishes. The result of the parent task is passed to the child as that task's first argument.
So you can see here we have a couple of signatures for a task that just adds numbers together, but the second one only has a single argument. When we call .link(), a method on the signature, and then convert it to a dictionary, you can see that the link option is set to the list of tasks linked to it. So when Celery finishes the first task, it asks, "Are there any linked tasks for me to run?" If so, it runs them, passing its result as an argument into those tasks.
So here the result of the child task is six: one plus two is three, and then three added to three gives six. But the AsyncResult object we get back from calling apply_async on the original signature is the parent task's result, so result.get() will be three. We have to navigate down to the children property to see all of the dependent, or child, tasks, and then to a child's result to see the result of the callback task.
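A toy, in-process model of the callback mechanism makes the parent/child result relationship concrete. Everything here is hypothetical: tasks run inline rather than on workers, and `AsyncResult` and `Signature` are simplified stand-ins that only keep what the example needs.

```python
import uuid


class AsyncResult:
    """Toy result handle: an id, a value, and results of linked callbacks."""

    def __init__(self):
        self.id = str(uuid.uuid4())
        self.value = None
        self.children = []

    def get(self):
        return self.value


class Signature:
    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self.links = []  # callbacks, like the "link" option in Celery

    def link(self, callback):
        self.links.append(callback)
        return callback

    def apply_async(self, extra_args=()):
        # Run the task (inline here), then fire each linked callback with
        # this task's return value prepended to the callback's own args.
        result = AsyncResult()
        result.value = self.func(*extra_args, *self.args)
        for callback in self.links:
            result.children.append(callback.apply_async((result.value,)))
        return result


def add(x, y):
    return x + y


first = Signature(add, 1, 2)
first.link(Signature(add, 3))
res = first.apply_async()
# res.get() is the parent's result (3); res.children[0].get() is the callback's (6)
```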
We get one step further with chains. Chains are multiple tasks run in series, and they are actually chained together via this callback mechanism. So think of a callback as one task launching another; a chain is one task launching another, which launches another, for as many tasks as you specify.
When we construct a chain, we can give it any number of task signatures, and when we cast it to a dictionary, those get put into a tasks keyword argument. The result from the chain is actually the AsyncResult of the last task in the chain. That's an important distinction, because the way you treat that result is different from how you would treat a normal callback.
It makes sense for a chain, because when you chain things together you're basically saying "this is a set of dependencies of the last task," so you really want to know your end result. So Celery passes back to you the AsyncResult of the last task in the chain. If you were to inspect its state, you would see pending, pending, pending while the other tasks in the chain were executing. But it would have populated the parent properties, so you could navigate using .parent.parent.parent back up the chain to the very first task and inspect the state of the chain as it's being executed, if you wanted to.
So what's going on under the hood here? (These are not complete code slices, just as much as I can fit on a slide.) When you run the chain, it goes through a process, prepare_steps, that walks through each of the tasks and assigns each one to the link of the previous task. So it takes the task before it and says, "I want to link you to the current task," and that creates this chain of callbacks.
Remember, .link() is the way we assign one task to be a callback of another. prepare_steps then returns all the tasks and their AsyncResult instances.
And the run method just takes the first one and calls apply_async on it. It's like lining up all the dominoes and then tipping the first one over: all the other dominoes continue to fall in series.
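The "line up the dominoes, tip the first" structure can be sketched in a few lines. This is a toy model, not Celery's chain code: `prepare_steps` here is a hypothetical function named after the step described above, each step's result knows its parent, and only the first step is ever applied directly.

```python
class AsyncResult:
    """Toy result handle that remembers the result of the previous step."""

    def __init__(self, parent=None):
        self.parent = parent
        self.state = "PENDING"
        self.value = None


class Step:
    def __init__(self, func, *args):
        self.func = func
        self.args = args
        self.link = None    # the next step, i.e. this step's callback
        self.result = None  # AsyncResult created by prepare_steps

    def apply_async(self, extra_args=()):
        self.result.value = self.func(*extra_args, *self.args)
        self.result.state = "SUCCESS"
        if self.link is not None:  # tip over the next domino
            self.link.apply_async((self.result.value,))


def prepare_steps(steps):
    """Link each step to the one before it and pre-create result handles."""
    previous = None
    for step in steps:
        parent = previous.result if previous is not None else None
        step.result = AsyncResult(parent=parent)
        if previous is not None:
            previous.link = step
        previous = step
    return steps


def run_chain(*steps):
    steps = prepare_steps(list(steps))
    steps[0].apply_async()   # only the first task is sent
    return steps[-1].result  # the chain hands back the *last* result


def add(x, y):
    return x + y
```

For `run_chain(Step(add, 1, 2), Step(add, 3), Step(add, 4))`, the returned handle holds 10, and walking `.parent` twice reaches the first step's 3.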
So next after chain we have group. Group is entirely different: it's not built on callbacks or chains, and it's intended for a group of tasks that are meant to run in parallel as opposed to in series.
Now, one important difference between group and chain is that with a chain you care about the last result in the series, because you want the end result. In a group, all of the tasks are treated as equals, because they all run in parallel. So what Celery decided to do was create a new result class called GroupResult, whose .results (or .children; in GroupResult they're the same) represent the AsyncResult instances for each child task.
The group result, you'll notice, has its own UUID that is distinct from any of the UUIDs of the child tasks. That is important because normally, when you ask Celery for the result or state of a job, it uses the task's UUID, which it tracks in the result store. But there is no state for a group result, because this UUID doesn't actually map to a task that's being executed; it's an abstract concept for a group of tasks that will be executed. You can see here that the serialized version looks very similar to a chain.
The only real difference is the subtask_type of group, so the signature is registered as type group. Celery sees that group and rehydrates it as a group instance. So if we iterate through the child results, we get the results of each task.
There is a bit of coordination within Celery here because of this distinction between an AsyncResult and a GroupResult, so Celery has a lot of branching around "if this is a group, do this thing; if it's just a regular AsyncResult, do this other thing." But you can see that apply_async actually constructs a GroupResult from the list of applied subtasks. This apply-tasks generator just goes through, calls apply_async on the individual task signatures, and yields each one's AsyncResult. So what you get is a group ID, which is the UUID of the group result I mentioned, plus all of the AsyncResult instances from the subtasks.
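A toy model of a group result: a handle with its own ID that wraps one result per member task, built from a generator that applies each member. The classes are hypothetical simplifications, and the members run inline one after another rather than in parallel on workers.

```python
import uuid


class AsyncResult:
    def __init__(self, value):
        self.id = str(uuid.uuid4())
        self.value = value

    def get(self):
        return self.value


class GroupResult:
    """Toy group result: its own id plus one AsyncResult per member task."""

    def __init__(self, results):
        self.id = str(uuid.uuid4())  # not the id of any member task
        self.results = list(results)

    @property
    def children(self):
        return self.results  # the same list under another name

    def get(self):
        return [r.get() for r in self.results]


def apply_tasks(tasks):
    """Apply each member task and yield its AsyncResult (inline here)."""
    for func, args in tasks:
        yield AsyncResult(func(*args))


def apply_group(tasks):
    return GroupResult(apply_tasks(tasks))


def double(x):
    return 2 * x
```

Because the group's ID matches none of its members' IDs, there is no single stored state to look up for it; its status is derived from the member results.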
So now that we've covered chains and groups, what happens when you want to combine them? The chord is the most complex of all the Celery canvas workflows, and basically it's a group chained to a callback: you have a group of tasks executing in parallel, and when all of the tasks are done (really, when all of them have succeeded), it launches the callback task.
It calls the group the header and the callback task the body, so you can see the body and the header in the dictionary version of the signature. The body task gets a list of all of the results from each of the header tasks, so this tasks.tsum here would get a list passed to it as its argument by Celery, and in this case it would just sum all the numbers together. The AsyncResult returned from executing the chord is similar to the chain's:
it's the result of the body task. So when you call .get() or .state on it, you'll see the result or state of the body task, and if you navigate to its parent, it's linked to the GroupResult corresponding to the group of header tasks. So you have this kind of asymmetric relationship, where the AsyncResult's parent is the group, but the group's children are all of the group's subtasks: you can't actually get from the header group back to the body result. That's kind of an odd family tree. But you can still inspect each task and result in the group, get the individual results, and see them summed together in this callback task. All of this requires a fair bit of coordination from the result backend in order to work.
The chord knows how many tasks are in its header group, and it stores that as its chord size. Then, any time a member of a group that is part of a chord completes successfully, the task tracer reacts in a way that depends on the result backend, to determine whether the chord's callback should be executed at that point.
As I said, different backends work in different ways, and this is where the atomic counter comes in handy. With Memcache and Redis, once the header group has been executed, any time a member of that group succeeds, it increments a counter keyed on the group UUID (remember, that group ID is distinct). Once that counter reaches the chord size, it knows it's okay to run the callback task. It can gather all the results from the header tasks and pass them to the body callback.
For all the other backends, Celery actually launches a separate task whose only job is to poll the group for the results of its children. If all the children are in a success state, it knows it can call the callback task. If they are not, it retries with a certain countdown. I can't remember how long it is, but it'll wait and continue to poll and poll until the group is finished. So you can see how, if you want that callback to run quickly, it's advantageous to use a result store that supports atomic counters.
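The two chord strategies can be contrasted in a toy model. Neither function is Celery code: `CounterBackend` stands in for the Redis/Memcache approach (a plain dict increment plays the role of an atomic INCR), and `poll_chord` stands in for the polling task (in Celery, the built-in `celery.chord_unlock` task), where each loop iteration represents one retry and `finish_at` is simulated timing.

```python
class CounterBackend:
    """Toy Redis/Memcache-style chord support built on a counter per group."""

    def __init__(self):
        self.counters = {}
        self.values = {}

    def mark_success(self, group_id, task_id, value, header_ids, body):
        self.values[task_id] = value
        # Stands in for one atomic INCR keyed on the group id; with a real
        # atomic counter, exactly one finishing task sees the final count.
        self.counters[group_id] = self.counters.get(group_id, 0) + 1
        if self.counters[group_id] == len(header_ids):  # the chord size
            return body([self.values[t] for t in header_ids])
        return None


def poll_chord(finish_at, values, header_ids, body, max_checks=10):
    """Toy polling strategy for backends without atomic counters.

    finish_at[t] is the (simulated) check number by which task t succeeds;
    each loop iteration stands for one retry of the polling task.
    """
    for check in range(1, max_checks + 1):
        if all(finish_at[t] <= check for t in header_ids):
            return check, body([values[t] for t in header_ids])
    raise TimeoutError("chord header did not finish in time")
```

With the counter, the body fires the moment the last header task reports in; with polling, it fires only at the first check after everything has finished, which is the latency cost described above.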
So these are all of the primitives that Celery provides out of the box for composing workflows. But if you wanted to, you could have built something like chain yourself without any special support from Celery: it's just a subclass of signature, and Celery's support for the link function for linking callbacks gives you everything you need to chain tasks together.
So I have created something that I call a weave, which is just an example of how you could extend the existing primitives to create your own distributed workflows.
So a weave is just a custom signature subclass that takes a task and a numeric list size. It accepts a large list as its first argument and splits it into smaller lists according to the size you specify. It passes each of the smaller lists into the task you gave it, and then it calls a join task to join the resulting smaller lists back together into a single big list, which it returns to you.
So in this case, range does exactly what you expect and gives you a list of numbers, and multiply takes each member of the list, multiplies it by 2, and returns the new list. You can trace the path of what the weave does in order to return, at the end, a list that is the same as the original list with each member multiplied by 2.
There's a little more code than I could easily fit on the slide here, but you can see I call the class decorator register_type on the signature subclass, and I provide a subtask_type
of weave. That tells Celery to register this signature as type weave, so any time it sees subtask_type weave, it's going to construct a weave instance.
I pass "weave" to the __init__ here, in the slot that represents the associated task. But since there is no real task associated with any of these signatures, I could just as easily have put None there, because Celery is never going to try to launch an actual task for a signature subclass
that doesn't have a one-to-one relationship with an actual task. So in my apply_async, what I'm basically doing is taking an iterable from the args, which I assume will be applied to the signature at some point after its __init__, because I only take the task and the list size in the __init__. I create a chord out of the join task, which I use as the body, and a group of header tasks that are cloned from the original task I passed in. You see down here at the end, task.clone, where I provide it an arg that's a subset of the original list as I chunk the iterable into the size I want. So I end up with a group of tasks that are all the same task with different sublists passed to them, plus a join task that joins all the lists together. I create a chord out of that, return the chord, and that's my custom signature subclass.
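The weave's data flow (chunk, apply the task to each chunk, join) can be checked without Celery. This is an in-process sketch with hypothetical helper names; in the Celery version described above, the list comprehension becomes a group of `task.clone(...)` signatures and `join` becomes the chord body.

```python
from itertools import islice


def chunks(iterable, size):
    """Yield successive lists of at most `size` items."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def join(list_of_lists):
    """The join step: flatten the per-chunk results back into one list."""
    return [item for sub in list_of_lists for item in sub]


def weave(task, size, iterable):
    """In-process version of the weave idea.

    Header: one call of `task` per chunk (a group in the real thing).
    Body: `join` over all header results (the chord callback).
    """
    header_results = [task(chunk) for chunk in chunks(iterable, size)]
    return join(header_results)


def multiply(numbers):
    return [n * 2 for n in numbers]
```

`weave(multiply, 3, range(10))` produces the same list as doubling each element of `range(10)` directly, split across four header calls.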
So why would I do this?
Well, let me give you a practical example with a Twitter API integration.
The Twitter API allows you to ask for friend or follower IDs in groups of up to 5,000 IDs per API call, and we want to consume the API as efficiently as possible.
So if we're trying to get all of your friends or followers, of course we would ask for the maximum number of IDs we can get. But if we want any information on those users other than their IDs, we have to request the full user objects, and Twitter limits those calls to 100 IDs per call. So in order to get a full list of 5,000 usernames, we would have to make a single API call for all the IDs and then 50 API calls of 100 apiece to get the usernames out of the user objects.
You can see how this would be a natural fit for the weave concept that I’ve created .
So without any special coordination, with just this weave signature subclass and those basic tasks, I can now construct a workflow that results in a single list of all of my Twitter friends' usernames, by chaining this friend-IDs call to a weave containing a username task with size 100. So I'm making calls with 100 IDs at a time to the users endpoint, getting the user objects and extracting the usernames, and the join task joins them all back together into a single list. This is just an example of the kinds of things you can do with Celery canvas workflows by creating your own signature subclasses.
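The call arithmetic (one 5,000-ID call, then 50 calls of 100) can be checked with fake API functions. `fake_friend_ids` and `fake_users_lookup` are hypothetical stand-ins that only record calls and enforce the per-call limits quoted in the talk; the pipeline runs inline in place of the chain-plus-weave workflow.

```python
from itertools import islice

IDS_PER_PAGE = 5000   # friend/follower IDs per call, per the talk
USERS_PER_CALL = 100  # full user objects per call, per the talk

calls = []  # records every simulated API request


def fake_friend_ids(user):
    """Hypothetical stand-in for the friend-IDs endpoint (one full page)."""
    calls.append("friends/ids")
    return list(range(IDS_PER_PAGE))


def fake_users_lookup(ids):
    """Hypothetical stand-in for the users endpoint; enforces the 100-ID cap."""
    assert len(ids) <= USERS_PER_CALL
    calls.append("users/lookup")
    return [{"id": i, "screen_name": f"user{i}"} for i in ids]


def usernames_task(ids):
    return [u["screen_name"] for u in fake_users_lookup(ids)]


def friend_usernames(user):
    ids = fake_friend_ids(user)  # first link in the chain
    it = iter(ids)
    batches = iter(lambda: list(islice(it, USERS_PER_CALL)), [])
    # The weave part: one usernames_task per batch of 100, then a join.
    return [name for batch in batches for name in usernames_task(batch)]
```

Running `friend_usernames` once returns 5,000 names and records 51 requests in `calls`.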
So it's important to note what happens when one or more tasks fail in these workflows, because it may be kind of unintuitive, and it may be catastrophic depending on how you architect your tasks. When you run tasks in parallel, it's pretty benign: you can get the state, success or failure, for each of the tasks individually.
You can inspect the results. Remember, in a group all tasks are created equal, so nothing is going to prevent anything else from running; you just see that some tasks failed and some succeeded.
If you're running tasks in series, then it's kind of like flicking one domino out of a line of dominoes. Each domino needs the one before it to fall onto it, and if you've got a gap (a missing domino being the analogy for a task failure), nothing knocks over the next domino, so none of the subsequent tasks will run.
This makes logical sense, because things in series create a dependency chain, and if a prerequisite isn't fulfilled, you can't run the thing that depends on it. But it's something to be aware of: you should be very defensive in your coding about retrying tasks that have transient failures until they succeed, and understand that any chained tasks or callbacks won't run if the parent task fails.
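The difference in failure behavior can be shown with two toy runners. They are hypothetical in-process stand-ins, not Celery APIs: the group-style runner records every task's outcome, while the chain-style runner stops at the first failure and leaves every later step never launched (which a Celery client would see as PENDING).

```python
def run_group(tasks):
    """Parallel-style: every task runs; failures are recorded, not fatal."""
    states = []
    for func, arg in tasks:
        try:
            func(arg)
            states.append("SUCCESS")
        except Exception:
            states.append("FAILURE")
    return states


def run_chain(funcs, value):
    """Series-style: a failure stops the chain; later steps never run."""
    states = []
    for func in funcs:
        if states and states[-1] != "SUCCESS":
            states.append("PENDING")  # never launched
            continue
        try:
            value = func(value)
            states.append("SUCCESS")
        except Exception:
            states.append("FAILURE")
    return states


def inverse(x):
    return 1 / x  # raises ZeroDivisionError for 0


def decrement(x):
    return x - 1
```

In real Celery, retrying a transient failure is typically done inside a bound task with `self.retry(exc=exc, countdown=...)`, so the chain only breaks once retries are exhausted.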
So I've put together a few rules of thumb that aren't directly related to workflows, but are some best practices from my years of using Celery.
The first one I feel is pretty obvious: never block on a task from a task, or from any process, really. The whole idea of Celery is that it's meant to be concurrent, and if you have one process waiting on another process to finish and doing nothing, then you are wasting your concurrency. So always find ways to compose tasks together rather than blocking on one task and getting its result synchronously before moving on with your other tasks or your other process.
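Why blocking wastes concurrency can be shown with a standard-library analogy: a thread pool with a single slot. This is not Celery; `ThreadPoolExecutor(max_workers=1)` stands in for a fully busy worker pool. The blocking parent occupies the only slot while waiting, so its child cannot start, whereas the composed parent hands the follow-up to a callback and frees the slot.

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

pool = ThreadPoolExecutor(max_workers=1)  # a "worker pool" with one slot


def child():
    return 42


def blocking_parent():
    # Anti-pattern: wait on another task from inside a task. The only slot
    # is busy running this function, so child() cannot start in time.
    future = pool.submit(child)
    try:
        return future.result(timeout=0.5)
    except TimeoutError:
        return "starved"


def composed_parent(done):
    # Composition instead: register a callback for when child() finishes
    # and return immediately, freeing the slot for child() to run.
    pool.submit(child).add_done_callback(lambda f: done.append(f.result()))
    return "scheduled"
```

With more slots the blocking version would "work," but each blocked parent still holds a slot doing nothing, which is the waste the talk describes.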
That should be fairly obvious. The next one: you want to design your tasks to do the smallest useful amount of work.
The reason I put "useful" in there is that there is a fair bit of overhead involved in serializing your task, sending it over the network to your message broker, sending it back over the network as a worker consumes it, and the worker deserializing it, all before you even begin to run your function. I once heard a company say that they were putting all of their database writes into Celery tasks, one database write per task, and of course they were seeing terrible performance, because the overhead of just doing the database write in-process is less than doing all that work to make it a task. But if you're doing an API integration, like the Twitter example I showed, it's a really good rule of thumb to have, say, one API call per task. The smallest useful unit of work you can create will give you the best return on your concurrency buck.
A lot of people don't know about the soft and hard time limits in Celery, but they can be super useful for getting some guarantee about how your worker pool resources will be used. If you have tasks that can, in some circumstances, take a really long time (maybe because Facebook is slow right now, or some other resource is constrained), then it can be really useful to interrupt those tasks. If you've made them idempotent, there's no downside to running them over again, and if you've already made them small units of work, you haven't lost a whole lot. You can see all these design principles begin to work together to create a system where you use your concurrency well and where you can fail without worrying about it. And the time limits give you good guarantees about how long your tasks will take, at most, before they just get axed.
It's important to set both soft and hard limits because, as I've shown here, some tasks don't respond to the exception that gets raised for the soft time limit and have to actually be killed, like when a thread is waiting to join.
All right, acks_late is another underused feature in Celery. What commonly happens when a worker consumes a task from a broker is that the broker sets a reservation for how long it will hold the task before reissuing it into the queue for another worker, and the way a worker tells the broker it's all right to remove the task from the queue is by acknowledging, or acking, that it has received that task.
Celery workers by default ack when they start working on a task, but you can conceive of workers dying mid-process without being able to communicate back to the broker. Celery will try to revoke the task and send it back to the queue, but that's not always possible: there could be a network outage or a power outage; all kinds of things can happen, and things will just die. So when you're acking early, you can lose tasks from workers that partially worked the task and then went away without being able to revoke it, so it never went back to the queue, and now your task drops on the floor.
If you use acks_late, the worker will not ack the task until it reaches a terminal state, either success or failure, and in that case you're guaranteed that the task will be re-executed if something happens to the worker. The risk is that if the reservation time on the broker expires before the worker can ack the task, it will go back into the queue and be worked over again by another worker. So you can see how this works in tandem with your soft and hard time limits, so that you can guarantee the execution window for a task, and if you are making tasks idempotent and quick and all that good stuff, then acks_late will give you much better safety against tasks being lost midstream. That little note from the Celery docs explains it pretty well.
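Late acks and both time limits can be set globally. This is a sketch of a `celeryconfig.py` fragment using Celery 4+ setting names (Celery 3.1 used `CELERY_ACKS_LATE`, `CELERYD_TASK_SOFT_TIME_LIMIT`, and `CELERYD_TASK_TIME_LIMIT`); the numbers are placeholders to be chosen from your tasks' actual runtimes.

```python
# celeryconfig.py: sketch using Celery 4+ setting names; numbers are placeholders.

# Ack only after the task finishes (success or failure), so a worker that
# dies mid-task leaves the message to be redelivered.
task_acks_late = True

# Soft limit: raises celery.exceptions.SoftTimeLimitExceeded inside the task,
# giving it a chance to clean up.
task_soft_time_limit = 60  # seconds

# Hard limit: the worker process running the task is killed and replaced.
task_time_limit = 90  # seconds; must exceed the soft limit to be useful
```

The same options can be set per task, for example `@app.task(acks_late=True, soft_time_limit=60, time_limit=90)`.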
There's another one that I didn't make a slide for, but I want to cover it. In Celery 3.1, at least, the default serialization method is pickle. I don't know if any of you saw Alex Gaynor's talk at PyCon this past year, but pickle can do very bad things to software, and it's not only a security concern. Say, for instance, that you want to send a Django model instance to a Celery task, which is already a bad idea to begin with. If you're using pickle, you send your task to the broker, it serializes everything as well as it can, and let's say that while it's in flight you do a database migration or a deploy or something that changes the model somehow.
The model instance will get unpickled with its old state, and you now have Celery tasks running against a class definition that doesn't match it, and bad things will happen. This is the problem with pickle in general, so the easiest solution is to just use the JSON serializer. Not only will that avoid all of the security concerns with arbitrary code execution that pickle potentially has, but it will force you to use task arguments that are JSON-serializable, and I feel like that's just good for everybody.
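Switching to JSON is a configuration change (Celery 4.0 later made JSON the default). Below is a sketch using Celery 4+ setting names (3.1 used `CELERY_TASK_SERIALIZER`, `CELERY_RESULT_SERIALIZER`, and `CELERY_ACCEPT_CONTENT`), plus a hypothetical `check_task_args` helper that fails fast when arguments such as a model instance are not JSON-serializable.

```python
import json

# celeryconfig.py fragment (Celery 4+ setting names).
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]  # refuse pickled messages outright


def check_task_args(*args, **kwargs):
    """Hypothetical helper: raise TypeError early if args aren't JSON-safe."""
    json.dumps([args, kwargs])
    return True
```

Passing an ID (for example `user_id=7`) passes the check, while passing a model object raises `TypeError`, which nudges you toward re-fetching fresh state inside the task.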