Suspense
Suspense is a way of putting the task to sleep - the current execution will be stopped and a new execution will be started, executing from the beginning of the task.
The suspense can be either explicit or implicit. In either case, it's important that it's safe for the code up to the point of suspense can be re-executed - i.e., any side-effects need to be idempotent. (An easy way to achieve this is to ensure that any tasks called by the execution are memoised.)
Suspense is useful as a way of freeing up resources used by a waiting execution.
Explicit suspense
To explicitly suspend an execution, simply call the suspend
function, passing either a delay (as a number of seconds, or as a datetime.timedelta
), or a future timestamp (as a datetime.datetime
):
@cf.workflow()
def my_workflow():
# (Some code that is safe to be re-run)
if not some_condition():
cf.suspend(60) # Restart the task in one minute
# (Do something)
It's important to suspend based on some condition that will eventually succeed, otherwise the task will be repeatedly suspended and re-run.
Implicit suspense
Implicit suspense is implemented by entering a 'suspense' context, optionally specifying a timeout. If the timeout is reached while waiting for the result of another execution, the execution will be suspended and automatically re-started (from the beginning) after the required result becomes available. If no timeout is specified, the execution will suspend if the result isn't available immediately.
With implicit suspense, the task will automatically wait for the dependency (the execution that is being suspended for) to complete before the original task is restarted.
@cf.task()
def create_order(user_execution, product_execution):
with cf.suspense():
# If the result isn't ready immediately, the `create_order` execution will suspend
user = user_execution.result()
# This is outside the suspense context, so there's no timeout
product = product_execution.result()
# ...
A timeout can be specified to give the execution time to complete before suspending:
with cf.suspense(10):
# The execution is given 10 seconds to complete before suspending
user = user_execution.result()
It's important that any tasks that are called within the suspense block or before it are memoised (or cached). Otherwise the task is likely to keep suspending as a new task will be spawned on each execution.