Asynchronous Tasks using Celery with Django

Published Aug 23, 2016. Last updated Jan 18, 2017.

Prerequisites

Introduction

Celery is a task queue based on distributed message passing. It is used to run long-running tasks asynchronously. RabbitMQ is a message broker that Celery uses to send and receive messages. Celery is well suited to tasks that take some time to execute and that should not block the request while they are processed. Typical examples are sending emails or SMS messages and making remote API calls.

Target

  • For this tutorial, we will start a worker in Celery to process long tasks and keep track of tasks as they transition through different states.
  • Let’s go!

Using the Local Django Application

Local File Directory Structure

We are going to use a Django application called mycelery. Its directory is structured as follows. The root of our Django application is ‘mycelery’.

  • mycelery
    • manage.py
    • mycelery
      • __init__.py
      • settings.py
      • urls.py
      • wsgi.py

Add the following lines to settings.py to tell Celery that we will use RabbitMQ as our message broker and accept data in JSON format.

BROKER_URL = 'amqp://guest:guest@localhost//'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
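  • BROKER_URL is the URL of the message broker, here a RabbitMQ server on localhost accessed with the guest account.
  • CELERY_ACCEPT_CONTENT is the list of content types (serializers) that are accepted for incoming messages.
  • CELERY_TASK_SERIALIZER is a string that identifies the default serialization method for tasks.
  • CELERY_RESULT_SERIALIZER is the serialization format used for task results.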
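One practical consequence of choosing JSON is that task arguments and return values have to be JSON-encodable. The sketch below uses the standard library json module as a stand-in for Celery's serializer (which may accept a few extra types depending on the version); the helper is_json_safe and the class Measurement are hypothetical names used only for illustration.

```python
import json


class Measurement(object):
    """Hypothetical custom object, standing in for e.g. a model instance."""

    def __init__(self, pk):
        self.pk = pk


def is_json_safe(value):
    """Return True if `value` can be encoded as JSON."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


# Plain numbers, strings, lists and dicts are fine as task arguments.
print(is_json_safe({"n": 1000, "tags": ["fft", "demo"]}))  # True
# An arbitrary object is not; pass an identifier such as its pk instead.
print(is_json_safe(Measurement(pk=7)))     # False
print(is_json_safe(Measurement(pk=7).pk))  # True
```

A common pattern is therefore to pass a primary key to the task and load the object inside the task itself.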

After configuring the message broker, add the following lines to a new file, celery.py, in the same folder as settings.py. This file tells Celery to use the settings defined in settings.py above.

from __future__ import absolute_import
import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mycelery.settings')
    
from django.conf import settings
from celery import Celery
    
app = Celery('mycelery',
    backend='amqp',
    broker='amqp://guest@localhost//')
    
# This reads, e.g., CELERY_ACCEPT_CONTENT = ['json'] from settings.py:
app.config_from_object('django.conf:settings')
    
# For autodiscover_tasks to work, you must define your tasks in a file called 'tasks.py'.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))
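For the @shared_task decorator to pick up this app, the Celery documentation for Django also has the project package import the app when Django starts. A minimal sketch of mycelery/mycelery/__init__.py, assuming celery.py sits next to settings.py and a Celery 3.1-style setup (which the setting names above suggest):

```python
# mycelery/mycelery/__init__.py
from __future__ import absolute_import

# Import the Celery app when Django starts so that @shared_task uses it.
# The alias celery_app follows the convention in the Celery documentation.
from .celery import app as celery_app
```

With a RabbitMQ server running, a worker can then be started from the project root with: celery -A mycelery worker --loglevel=info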

Create tasks in Celery

Create an app named myceleryapp, add it to INSTALLED_APPS so that autodiscover_tasks can find it, and create a tasks.py file in the app’s folder. All the tasks will be defined in this file.

In tasks.py, the task repeatedly computes the FFT of random data. The numbers themselves do not matter; the loop simply simulates a long-running job.

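from celery import shared_task, current_task
from numpy import random
from scipy.fftpack import fft

@shared_task
def fft_random(n):
    """Run n FFTs on random data, reporting progress every 30 iterations."""
    for i in range(n):
        # Dummy workload: FFT of 2000 normally distributed samples.
        x = random.normal(0, 0.1, 2000)
        y = fft(x)
        if i % 30 == 0:
            # Publish a custom PROGRESS state with the percentage done so far.
            process_percent = int(100 * float(i) / float(n))
            current_task.update_state(state='PROGRESS',
                meta={'process_percent': process_percent})
    return random.random()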

Using the current_task.update_state() method, the task publishes its progress, as a percentage, to the result backend (RabbitMQ in this setup) every 30 iterations.
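To see which values a client would actually receive, the reporting logic can be replayed without Celery. The function progress_reports below is a hypothetical helper that mirrors the loop in fft_random and collects the (iteration, percent) pairs instead of calling update_state().

```python
def progress_reports(n, every=30):
    """Return the (iteration, percent) pairs that the task loop would report."""
    reports = []
    for i in range(n):
        if i % every == 0:
            # Same arithmetic as in fft_random: truncate to a whole percent.
            reports.append((i, int(100 * float(i) / float(n))))
    return reports


print(progress_reports(90))  # [(0, 0), (30, 33), (60, 66)]
```

Note that the loop never reports 100 percent. Once the task returns, its state becomes SUCCESS, and the client can treat that as complete.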

Calling tasks in Django

To call the above task, the following lines of code are required. You can put them wherever you want to start the task, for example in a view.

from .tasks import fft_random
job = fft_random.delay(int(n))

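Import the task and call its delay() method. That’s it! Your operation is now running in the background. The returned job object exposes the task id as job.id, which is what the status check below needs.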
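The call above can be wrapped in a small function that validates the input and hands the task id back to the caller. This is only a sketch: start_job is a hypothetical helper, and FakeTask and FakeResult are stand-ins so the example runs without a broker. In the real project, fft_random would be passed in instead.

```python
def start_job(task, raw_n):
    """Queue `task` with an integer argument and return a JSON-friendly dict.

    `task` is anything with a Celery-style delay() method, for example
    the fft_random task from tasks.py.
    """
    try:
        n = int(raw_n)
    except (TypeError, ValueError):
        return {'error': 'n must be an integer'}
    if n <= 0:
        return {'error': 'n must be positive'}
    job = task.delay(n)         # returns immediately; a worker does the work
    return {'task_id': job.id}  # the id is what the status check needs later


# Stand-ins so the sketch runs without Celery; not part of Celery itself.
class FakeResult(object):
    id = 'demo-task-id'


class FakeTask(object):
    def delay(self, n):
        return FakeResult()


print(start_job(FakeTask(), '1000'))  # {'task_id': 'demo-task-id'}
print(start_job(FakeTask(), 'abc'))   # {'error': 'n must be an integer'}
```

Returning the task id to the browser is what allows the JavaScript side to ask for the task's status afterwards.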

Get the status of the task

To get the status of the task above, define the following function in views.py:

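import json

from celery.result import AsyncResult
from django.http import HttpResponse

# Create your views here.
def task_state(request):
    """Return the result or state of a task as JSON for an AJAX caller."""
    data = 'Fail'
    # Note: request.is_ajax() was deprecated in Django 3.1 and removed in 4.0.
    if request.is_ajax():
        task_id = request.POST.get('task_id')
        if task_id:
            task = AsyncResult(task_id)
            # Progress meta or return value if available, else the state name.
            data = task.result or task.state
        else:
            data = 'No task_id in the request'
    else:
        data = 'This is not an ajax request'

    json_data = json.dumps(data)
    return HttpResponse(json_data, content_type='application/json')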

The task_id is sent to this view from JavaScript. The view checks the status of the task with that id and returns it to JavaScript in JSON format. We can then call this view periodically from our JavaScript and show a corresponding progress bar.
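Because task.result or task.state can yield a dict (while the task is in PROGRESS), a number (on success), or a state name such as PENDING, the JavaScript side has to cope with several shapes. One possible way to smooth this over is sketched below. The helper describe_task is hypothetical, not part of Celery or of the view above; it maps a state and result to a single predictable dict.

```python
import json


def describe_task(state, result):
    """Map a task's state and result to one predictable dict."""
    if state == 'PROGRESS' and isinstance(result, dict):
        # Custom state set by update_state(); result holds the meta dict.
        return {'state': state, 'percent': result.get('process_percent', 0)}
    if state == 'SUCCESS':
        return {'state': state, 'percent': 100, 'result': result}
    if state == 'FAILURE':
        # Here result is an exception instance, which json.dumps cannot encode.
        return {'state': state, 'percent': 0, 'error': str(result)}
    # PENDING, STARTED, RETRY and so on: nothing to report yet.
    return {'state': state, 'percent': 0}


payload = describe_task('PROGRESS', {'process_percent': 33})
print(json.dumps(payload, sort_keys=True))
# {"percent": 33, "state": "PROGRESS"}
```

In the view, this would be called as describe_task(task.state, task.result) before json.dumps, so the progress bar can always read the percent key.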

Conclusion

We can use Celery to run different types of tasks, from sending emails to scraping a website. In the case of long-running tasks, we’d like to show the status of the task to our user, and we can use a simple JavaScript progress bar that polls the task status URL and displays the percentage completed. With the help of Celery, a user’s experience on Django websites can be improved dramatically.

Discover and read more posts from Udit Agarwal
5 Replies
Patricia Infante
5 months ago

Hello Sir, I am currently doing something similar, but I’m using Django as the results backend. What I did was manually set the progress percentage in update_state depending on which part of the task is running, because I put some subprocesses there. I access the task status using AsyncResult, but I don’t know how to access it repeatedly and show it in the templates. After accessing the status, I plan to just pass it as part of the context in the contents page. Hope for your help. :)

Udit Agarwal
5 months ago

You have to use a JavaScript AJAX call to hit a URL that fetches the current status of the task.

Stefan Milutinovic
7 months ago

Hi.

I have one question.

I am working with Django REST Framework, and I want to start a Celery task in perform_create of an APIView class.

I set everything up and there is no error, but when I create an object through the REST API, there is still a delay in perform_create.

Help?

Udit Agarwal
7 months ago

Hey! You should add .delay after your function name to get it to execute on Celery’s queue.

FerumFlex
10 months ago

Hi, please check your code; it seems you have a bug.

In settings you set BROKER_URL, but the other properties use the CELERY_ prefix. So I think they have no effect on your application, because config_from_object has a special parameter that sets the prefix for settings. If you set the prefix to CELERY, you also need to rename the variable to CELERY_BROKER_URL.
