Celery is a popular task queue for Python that allows you to schedule and execute long-running tasks asynchronously. One of the challenges of using a task queue is handling failures. Even with robust error handling in your code, there are still many reasons why a task might fail. For example, a task might depend on an external service that is down, or there might be an issue with the task's input data.
In these cases, it can be useful to retry a failed task. Retrying a task can give the external service time to recover, or allow the issue with the input data to be resolved. In this article, we'll look at how to set up retry logic for Celery tasks.
Retrying a Task
The simplest way to retry a task in Celery is to call the task's `retry()` method. To use it, declare the task with `bind=True` so the function receives a reference to the task instance as its first argument. The `max_retries` task option specifies the maximum number of times the task will be retried.

Here's an example that retries a task up to 3 times:
```python
from celery import Celery

app = Celery('tasks', broker='amqp://localhost//')

@app.task(bind=True, max_retries=3)
def add(self, x, y):
    try:
        result = x + y
    except Exception as exc:
        # retry() raises a Retry exception, so nothing after this line runs
        raise self.retry(exc=exc)
    return result
```
In this example, the `add` task will be retried up to 3 times if it raises an exception. The `exc` argument to the `retry()` method is optional, but it is useful for debugging: it passes along the exception that caused the task to fail, which can help you understand why the task keeps failing.
Custom Retry Policies
Calling `retry()` by hand works, but it forces you to wrap the task body in a try/except block, mixing retry logic in with the task's own code.

To avoid this, you can use the `autoretry_for` task option. It takes a tuple of exception types that should trigger an automatic retry, and it can be combined with options such as `retry_backoff`, `default_retry_delay`, and `max_retries` to control the retry policy.
Here's an example that automatically retries a task with exponential backoff if it raises a `ConnectionError` or a `Timeout` from the requests library, and a second task that retries with a fixed 5-minute delay when a custom `ServiceUnavailableError` is raised:

```python
from celery import Celery
from requests.exceptions import ConnectionError, Timeout

app = Celery('tasks', broker='amqp://localhost//')

class ServiceUnavailableError(Exception):
    """Raised when the upstream service reports an outage."""

@app.task(autoretry_for=(ConnectionError, Timeout),
          retry_backoff=True, max_retries=3)
def add(x, y):
    return x + y

@app.task(autoretry_for=(ServiceUnavailableError,),
          default_retry_delay=5 * 60, max_retries=3)
def query_service():
    ...  # call the external service here
```

In this example, the `add` task will be retried up to 3 times if it raises a `ConnectionError` or a `Timeout`, with a delay between retries that increases exponentially because `retry_backoff` is set to `True`.

The `query_service` task shows a fixed delay instead: when `autoretry_for` triggers a retry and `retry_backoff` is not enabled, Celery waits `default_retry_delay` seconds (here 5 minutes) before each attempt.
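To make the exponential schedule concrete, here is a small sketch of the approximate delays produced by `retry_backoff=True`: the n-th retry waits about 2**n seconds, capped by `retry_backoff_max` (Celery's default cap is 600 seconds). Note that Celery also applies jitter by default (`retry_jitter=True`), so real delays are randomized up to these values:

```python
# Approximate delays (in seconds) for retry_backoff=True with jitter
# disabled: 1, 2, 4, 8, ... capped at retry_backoff_max.
def backoff_schedule(retries, cap=600):
    return [min(2 ** n, cap) for n in range(retries)]

print(backoff_schedule(5))  # [1, 2, 4, 8, 16]
```

The cap matters for long retry chains: without it, a tenth retry would wait over 17 minutes.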
Handling Task Failures
While retrying a task can be helpful in some cases, there may be situations where it's not possible or desirable to retry a task. For example, if a task depends on a third-party service that is permanently unavailable, retrying the task will not help. In these cases, it's important to have a way to handle task failures.
One way to handle task failures is to use the `on_failure` handler. Every Celery task class has an `on_failure` method that the worker calls when the task fails (after any retries are exhausted). You can supply your own handler by passing an `on_failure` function as a task option, or by overriding the method on a custom base `Task` class. The handler receives the exception that was raised, the task's ID and arguments, and an `ExceptionInfo` object containing the traceback.

Here's an example that uses an `on_failure` handler to log task failures:
```python
import logging

from celery import Celery

app = Celery('tasks', broker='amqp://localhost//')
log = logging.getLogger(__name__)

def log_failure(self, exc, task_id, args, kwargs, einfo):
    # called by the worker once the task has failed for good
    log.error('Task %s (%s) failed: %s\n%s',
              task_id, self.name, exc, einfo.traceback)

@app.task(bind=True, max_retries=3, on_failure=log_failure)
def add(self, x, y):
    try:
        result = x + y
    except Exception as exc:
        raise self.retry(exc=exc)
    return result
```
In this example, Celery calls the `log_failure` handler whenever the `add` task ultimately fails. The handler logs the task's ID and name, along with the exception and traceback that caused the failure.
Conclusion
In this article, we looked at how to set up retry logic for Celery tasks. We saw how to use the `retry()` method to retry tasks a fixed number of times, how to use the `autoretry_for` option to retry automatically on specific exceptions, and how to use the `on_failure` handler to respond to task failures.
Using these techniques, you can set up robust error handling for your Celery tasks, and ensure that they are executed successfully even in the face of failures.