
Asynchronous Tasks using Celery with Django

Published Aug 23, 2016. Last updated Jan 18, 2017.

Prerequisites

Introduction

Celery is a task queue based on distributed message passing. It is used to run long-running tasks asynchronously. RabbitMQ is a message broker that Celery uses to send and receive messages. Celery is well suited to tasks that take some time to execute when we don’t want our requests to be blocked while they are processed. Typical examples are sending emails or SMS messages and making remote API calls.

Target

  • For this tutorial, we will start a worker in Celery to process long tasks and keep track of tasks as they transition through different states.
  • Let’s go!

Using the Local Django Application

Local File Directory Structure

We are going to use a Django project called mycelery, with the directory structure shown below. The root of our Django project is ‘mycelery’.

  • mycelery
    • manage.py
    • mycelery
      • __init__.py
      • settings.py
      • urls.py
      • wsgi.py

Add the following lines to settings.py to tell Celery that we will use RabbitMQ as our message broker and accept data in JSON format.

BROKER_URL = 'amqp://guest:guest@localhost//'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
  • CELERY_ACCEPT_CONTENT is the list of content types (serializers) that Celery is allowed to accept.
  • CELERY_TASK_SERIALIZER is a string naming the default serialization method for task messages.
  • CELERY_RESULT_SERIALIZER is the serialization format used for task results.
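
One practical consequence of choosing JSON is that task arguments and return values have to survive a JSON round trip. The sketch below uses only the standard library's json module, not Celery itself, so it is a rough approximation of what happens between the caller and the worker. The helper name json_round_trip is made up for this example.

```python
import json


def json_round_trip(value):
    """Encode a value to JSON and decode it again (hypothetical helper).

    This approximates what a JSON serializer does to task arguments
    on their way from the caller to the worker.
    """
    return json.loads(json.dumps(value))


# Plain numbers, strings, lists and dicts come back unchanged.
simple_args = json_round_trip({"n": 1000, "label": "fft"})

# Tuples are not a JSON type, so they come back as lists.
tuple_arg = json_round_trip((1, 2, 3))

# A set has no JSON representation, so json.dumps raises TypeError.
try:
    json_round_trip({1, 2, 3})
    set_is_serializable = True
except TypeError:
    set_is_serializable = False
```

This is why the task later in this post takes a plain integer and returns a plain float: both pass through JSON without surprises.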

After configuring the message broker, create a new file celery.py (conventionally next to settings.py). It creates the Celery app and tells Celery to use the settings we defined in settings.py above.

from __future__ import absolute_import
import os

# Make sure Django settings can be found before the Celery app is created.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mycelery.settings')

from django.conf import settings
from celery import Celery

# Note: newer Celery releases drop the 'amqp' result backend in favour of 'rpc://'.
app = Celery('mycelery',
    backend='amqp',
    broker='amqp://guest@localhost//')

# This reads, e.g., CELERY_ACCEPT_CONTENT = ['json'] from settings.py:
app.config_from_object('django.conf:settings')

# For autodiscover_tasks to work, you must define your tasks in a file called 'tasks.py'.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

# A small task that prints its own request, useful for checking the setup.
@app.task(bind=True)
def debug_task(self):
    print("Request: {0!r}".format(self.request))
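
The Celery documentation for Django projects of this era also recommends importing the app in the project package's __init__.py, so that it is loaded whenever Django starts and @shared_task binds to it. The post does not show this step, so treat the fragment below as an assumption about the layout, with celery.py sitting next to settings.py.

```python
# mycelery/mycelery/__init__.py (assumed location, next to settings.py)
from __future__ import absolute_import

# Load the Celery app when Django starts so that @shared_task uses it.
from .celery import app as celery_app
```

With RabbitMQ running, a worker is typically started from the directory containing manage.py with a command along the lines of `celery -A mycelery worker -l info`.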

Create tasks in Celery

Create an app named myceleryapp and add a tasks.py file to this app’s folder. All the tasks will be defined in this file.

In tasks.py, we simply run FFTs on random numbers in a loop, which stands in for a long-running task for now.

from celery import shared_task, current_task
from numpy import random
from scipy.fftpack import fft

@shared_task
def fft_random(n):
    """Run n FFTs over random data, reporting progress every 30 iterations."""
    for i in range(n):
        x = random.normal(0, 0.1, 2000)
        y = fft(x)
        if i % 30 == 0:
            process_percent = int(100 * float(i) / float(n))
            # Record a custom PROGRESS state with the percentage completed.
            current_task.update_state(state='PROGRESS',
                meta={'process_percent': process_percent})
    return random.random()

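Using the current_task.update_state() method, we report how much of the task has completed every 30 iterations. The update is stored as a custom PROGRESS state in the result backend, which in this setup goes through the message broker.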
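
To see which values a caller can expect, the progress arithmetic can be run on its own, without Celery or NumPy. This is a minimal sketch: progress_reports is a hypothetical helper that repeats the condition and the percentage formula from fft_random and collects the results instead of calling update_state.

```python
def progress_reports(n, every=30):
    """Return (iteration, percent) pairs for the iterations at which
    the loop in fft_random would call update_state."""
    reports = []
    for i in range(n):
        if i % every == 0:
            percent = int(100 * float(i) / float(n))
            reports.append((i, percent))
    return reports


reports = progress_reports(100)
print(reports)  # [(0, 0), (30, 30), (60, 60), (90, 90)]
```

Note that the loop never reports 100 percent. Completion is signalled by the task leaving the PROGRESS state and returning its result.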

Calling tasks in Django

To call the above task, the following lines of code are required. You can put these lines wherever you want to start the task, for example in a view.

from .tasks import fft_random
job = fft_random.delay(int(n))

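Import the task and call its delay() method. That’s it! Your operation is now running in the background on a worker. The call returns immediately with an AsyncResult, and its id (job.id) is what identifies the task when we ask for its status later.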

Get the status of the task

To get the status of the task above, define the following view in views.py:

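import json

from celery.result import AsyncResult
from django.http import HttpResponse

# Create your views here.
def task_state(request):
    data = 'Fail'
    # Note: request.is_ajax() was removed in Django 4.0.
    if request.is_ajax():
        if 'task_id' in request.POST.keys() and request.POST['task_id']:
            task_id = request.POST['task_id']
            task = AsyncResult(task_id)
            # The progress dict or final result if there is one, else the state name.
            data = task.result or task.state
        else:
            data = 'No task_id in the request'
    else:
        data = 'This is not an ajax request'

    json_data = json.dumps(data)
    return HttpResponse(json_data, content_type='application/json')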

The task_id is sent to this view from the JavaScript. The view checks the status of the task with that id and returns it to the JavaScript in JSON format. We can then call this view from our JavaScript and show a corresponding progress bar.
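
Because the view returns task.result or task.state, the decoded JSON can take several shapes: a dict with process_percent while the task is in the PROGRESS state, a state name such as 'PENDING' before any progress is reported, or the final number once the task is done. The sketch below shows one way a client could map these shapes to a bar value. It is written in Python for illustration, although the post's client is JavaScript, and percent_from_status is a hypothetical helper, not part of Celery or Django.

```python
def percent_from_status(data):
    """Map the JSON-decoded response of task_state to a 0-100 value
    (hypothetical helper for illustration)."""
    if isinstance(data, dict):
        # PROGRESS state: the meta dict passed to update_state.
        return int(data.get('process_percent', 0))
    if isinstance(data, str):
        # A bare state name, or one of the view's error messages.
        return 100 if data == 'SUCCESS' else 0
    if isinstance(data, (int, float)):
        # The value returned by fft_random, so the task has finished.
        return 100
    # Anything unexpected is treated as "no progress yet".
    return 0


print(percent_from_status({'process_percent': 30}))
print(percent_from_status('PENDING'))
print(percent_from_status(0.4213))
```

The same branching would apply in the browser: check for an object first, then a string, then a number.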

Conclusion

We can use Celery to run many kinds of tasks, from sending emails to scraping a website. In the case of long-running tasks, we’d like to show the status of the task to our user, and we can use a simple JavaScript progress bar that polls the task status URL and shows how much of the task has completed. With the help of Celery, a user’s experience on Django websites can be improved dramatically.

Discover and read more posts from Udit Agarwal