Celery Retries Made Easy

A Technical Guide to Mastering Task Retries

Uchechukwu Emmanuel
Dec 16, 2022
Celery is a popular task queue for Python that allows you to schedule and execute long-running tasks asynchronously. One of the challenges of using a task queue is handling failures. Even with robust error handling in your code, there are still many reasons why a task might fail. For example, a task might depend on an external service that is down, or there might be an issue with the task's input data.

In these cases, it can be useful to retry a failed task. Retrying a task can give the external service time to recover, or allow the issue with the input data to be resolved. In this article, we'll look at how to set up retry logic for Celery tasks.


Retrying a Task

The simplest way to retry a task in Celery is to call the task's retry method from inside the task body. To use it, declare the task with bind=True so that Celery passes the task instance as the first argument (self). The most important related option is max_retries, which specifies the maximum number of times the task should be retried before it is marked as failed.

Here's an example of how to use the retry method to retry a task up to 3 times:

from celery import Celery

app = Celery('tasks', broker='amqp://localhost//')

@app.task(bind=True, max_retries=3)
def add(self, x, y):
    try:
        return x + y  # real work that might raise goes here
    except Exception as exc:
        # retry raises a Retry exception, so nothing after it runs
        raise self.retry(exc=exc)

In this example, the add task will be retried up to 3 times if it raises an exception. The exc argument to the retry method is optional, but it is useful for debugging: it records the exception that caused the failure, so you can see why the task kept failing once the retries run out. Note that retry raises a Retry exception to abort the current run, which is why it is written with raise.

Custom Retry Policies

Calling retry manually is flexible, but it forces you to wrap the task body in try/except and scatters the retry policy through your code.

To avoid this, you can pass the autoretry_for option to the task decorator. It takes a tuple of exception types that should trigger an automatic retry, and it works together with related options such as retry_backoff, retry_backoff_max, retry_jitter, and max_retries.

Here's an example that automatically retries a task if it raises a ConnectionError or a Timeout from the requests library, with an exponentially increasing delay between attempts:

from celery import Celery
from requests.exceptions import ConnectionError, Timeout

app = Celery('tasks', broker='amqp://localhost//')

@app.task(autoretry_for=(ConnectionError, Timeout),
          retry_backoff=True,
          max_retries=3)
def add(x, y):
    return x + y

In this example, the add task will be retried up to 3 times if it raises a ConnectionError or a Timeout. With retry_backoff=True, the delay between retries increases exponentially: 1 second before the first retry, then 2 seconds, then 4, capped by retry_backoff_max (600 seconds by default).

If you want a fixed delay instead, for example 5 minutes between attempts, leave retry_backoff unset and pass default_retry_delay=5*60 to the same decorator. Note that you cannot stack multiple @app.task decorators to get different policies for different exceptions; for that, fall back to calling retry manually.
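To see what retry_backoff actually does to the schedule, here is a standalone sketch of the delay calculation. This is my own approximation of Celery's exponential backoff, not its internal code: the delay is the backoff factor times 2 raised to the retry count, capped at retry_backoff_max, and optionally randomized by retry_jitter.

```python
import random

def backoff_delay(retries, factor=1, maximum=600, jitter=False):
    # Approximates Celery's schedule: factor * 2**retries,
    # capped at `maximum`, optionally jittered to avoid
    # many clients retrying at the same instant.
    countdown = min(maximum, factor * (2 ** retries))
    if jitter:
        countdown = random.randrange(countdown + 1)
    return countdown

# Delays for the first five retries with retry_backoff=True:
print([backoff_delay(n) for n in range(5)])  # [1, 2, 4, 8, 16]
```

The doubling is why exponential backoff is a good default for flaky external services: early retries are fast, while later ones give the service progressively more time to recover.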

Handling Task Failures

While retrying a task can be helpful in some cases, there may be situations where it's not possible or desirable to retry a task. For example, if a task depends on a third-party service that is permanently unavailable, retrying the task will not help. In these cases, it's important to have a way to handle task failures.

One way to handle task failures is to override the task's on_failure handler. Celery calls this handler when a task finally fails (after any retries are exhausted), passing it the exception, the task's ID and arguments, and the traceback information. The cleanest way to install a handler is to define a custom base Task class and pass it to the task decorator with the base option.

Here's an example that logs task failures:

import logging

from celery import Celery, Task

log = logging.getLogger(__name__)

app = Celery('tasks', broker='amqp://localhost//')

class LoggingTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        log.error('Task %s (%s) failed: %s\n%s', self.name, task_id, exc, einfo)

@app.task(base=LoggingTask, bind=True, max_retries=3)
def add(self, x, y):
    try:
        return x + y
    except Exception as exc:
        raise self.retry(exc=exc)

In this example, on_failure is called whenever the add task runs out of retries. It logs the task's name and ID, the exception that caused the failure, and the traceback (einfo).

Conclusion

In this article, we looked at how to set up retry logic for Celery tasks. We saw how to call the retry method to retry tasks a fixed number of times, how to use the autoretry_for option to retry automatically under a custom policy, and how to override on_failure to handle tasks that fail for good.

Using these techniques, you can set up robust error handling for your Celery tasks, and ensure that they are executed successfully even in the face of failures.

