Codementor Events

Asynchronous Task with RabbitMQ, Celery, and Django

Published May 24, 2017

Installing RabbitMQ

RabbitMQ is a complete, stable, and durable message broker that works well with Celery. On Ubuntu-based systems, install it with:

$ sudo apt-get install rabbitmq-server
On Fedora or Red Hat-based systems, the command is:

$ sudo dnf install rabbitmq-server

Once the installation is complete, the broker starts automatically and runs in the background.

Installing Celery

Celery is on the Python Package Index and can be installed using pip:
$ pip install celery

Sending Email as Background Task Using Celery

First, create a Django application for sending email. Then configure your Django project to use Celery and RabbitMQ:

In your Django project directory, create a file named celery.py and add the following code:

import os

from celery import Celery
from django.conf import settings

# Set the default Django settings module for the 'celery' program
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'celeryProj.settings')

app = Celery('celeryProj')

# Using a string here means the worker loads the configuration by name
# instead of having to serialize the settings object
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

In your project's settings.py file, add the following configuration information:

# For RabbitMQ
BROKER_URL = 'amqp://[ipaddress]'
CELERY_RESULT_BACKEND = 'amqp://[ipaddress]'
# Celery Data Format
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Kolkata'
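Because CELERY_TASK_SERIALIZER is set to json, every argument you pass to a task must be JSON-serializable. A quick standalone check of what that means (plain Python, no Celery or broker required):

```python
import json
from datetime import datetime

# Strings, numbers, lists, and dicts survive the JSON round trip,
# so they are safe to pass as task arguments
payload = {'name': 'Ankur', 'email': 'ankur@xyz.com', 'message': 'Hello'}
assert json.loads(json.dumps(payload)) == payload

# Objects such as datetime are not JSON-serializable; pass an ISO
# string instead and parse it inside the task
try:
    json.dumps({'sent_at': datetime.now()})
except TypeError:
    print('datetime is not JSON-serializable; send an ISO string instead')
```

This is why the feedback task below takes plain strings (name, email, message) rather than model instances.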

Now, to ensure that the Celery app is loaded when the Django app starts, add the following lines to the project's __init__.py file:

from __future__ import absolute_import
from .celery import app as celery_app

The Project Walk-Through

Here’s the code from forms.py that is used for creating the form in the template:

from django import forms

class FeedbackForm(forms.Form):
    name = forms.CharField(label="Name")
    email = forms.EmailField(label="Email Address")
    message = forms.CharField(label="Message", widget=forms.Textarea(attrs={'rows': 5}))

Creating a Celery task: Celery uses the concept of tasks, which are just functions that Celery calls and schedules. Create a separate tasks.py file in your app directory and add the following code to it:

from celery import shared_task
from celery.utils.log import get_task_logger

from celeryapp.emails import send_feedback_email

logger = get_task_logger(__name__)

# The shared_task decorator lets workers use the task without a concrete app instance
@shared_task(name="send_feedback_email_task")
def send_feedback_email_task(name, email, message):
    logger.info("Sent feedback email")
    return send_feedback_email(name, email, message)

The code for sending the email, in the app's emails.py file, is:

from django.core.mail import send_mail

def send_feedback_email(name, email, message):
    # send_mail(subject, message, from_email, recipient_list)
    send_mail(name, message + "\n" + email, email, ["recipient's email"], fail_silently=False)

The SMTP configuration in Django's settings.py file is as follows:

EMAIL_HOST = 'smtp.gmail.com'
EMAIL_HOST_USER = 'username'
EMAIL_HOST_PASSWORD = 'password'
EMAIL_PORT = 587
EMAIL_USE_TLS = True
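While developing, you may not want to send real email at all. As an aside (not part of the original project), Django ships a console email backend that prints outgoing messages to the terminal instead of connecting to an SMTP server:

```python
# settings.py (development only): print emails to the console
# instead of opening an SMTP connection
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
```

With this setting, the worker's log will show the full message body whenever the task runs, which makes debugging the pipeline much easier.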

The code from the views.py file that is calling the function for sending the email is as follows:

from django.http import HttpResponseRedirect
from django.shortcuts import render

from .forms import FeedbackForm
from .tasks import send_feedback_email_task

def index(request):
    if request.method == 'POST':
        form = FeedbackForm(request.POST)
        if form.is_valid():
            name = form.cleaned_data['name']
            email = form.cleaned_data['email']
            msg = form.cleaned_data['message']
            # delay() queues the task for asynchronous processing
            send_feedback_email_task.delay(name, email, msg)
            return HttpResponseRedirect('/')
    else:
        form = FeedbackForm()
    return render(request, 'index.html', {'form': form})

To run the task asynchronously, we have to start a Celery worker from the terminal:

$ celery -A celeryProj worker -l info

The worker will pick up tasks from the broker queue and process them.
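Conceptually, delay() does not run the function at all: it only publishes a message containing the task name and arguments, and a worker later pops that message and executes the registered function. A toy illustration of that flow (plain Python with hypothetical names, not Celery itself):

```python
import json
from collections import deque

# Stand-in for the broker queue (RabbitMQ holds these messages in reality)
broker_queue = deque()

def delay(task_name, *args):
    """Publish a serialized message instead of running the task immediately."""
    broker_queue.append(json.dumps({'task': task_name, 'args': args}))

def worker_step(registry):
    """One iteration of a worker loop: pop a message and run the named task."""
    message = json.loads(broker_queue.popleft())
    return registry[message['task']](*message['args'])

# A trivial "task" registered under its name, as the worker would see it
registry = {'send_feedback_email_task': lambda name, email, msg: f"sent to {email}"}

delay('send_feedback_email_task', 'Ankur', 'ankur@xyz.com', 'Hello')
print(worker_step(registry))  # prints "sent to ankur@xyz.com"
```

The view returns immediately after delay() because only the enqueue happens in the request; the slow SMTP work happens in the worker process.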

Periodic Tasks

Celery can also run tasks on a schedule, which enables you to send periodic emails as automated tasks.

In the celery.py file, the beat schedule can be defined using the following entries:

from celery import Celery
from celery.schedules import crontab
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

# Connecting to this signal registers the periodic tasks with the beat scheduler
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Send the email every 10 seconds
    sender.add_periodic_task(
        10.0,
        send_feedback_email_task.s('Ankur', 'ankur@xyz.com', 'Hello'),
        name='add every 10',
    )
    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        send_feedback_email_task.s('Ankur', 'ankur@xyz.com', 'Hello'),
    )

# The task to be processed by the worker
@app.task
def send_feedback_email_task(name, email, message):
    send_feedback_email(name, email, message)
    logger.info("Sent email")

Now, run the worker and beat on separate terminals as follows:

For the worker:
$ celery -A celeryProj worker -l info

For the beat:
$ celery -A celeryProj beat -l info
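As an alternative to registering tasks through the signal handler, the same schedule can be declared as configuration with Celery's beat_schedule setting. A sketch (task name and arguments taken from the example above):

```python
from celery.schedules import crontab

# celery.py: declarative equivalent of the add_periodic_task calls above
app.conf.beat_schedule = {
    'feedback-every-10-seconds': {
        'task': 'send_feedback_email_task',
        'schedule': 10.0,
        'args': ('Ankur', 'ankur@xyz.com', 'Hello'),
    },
    'feedback-monday-morning': {
        'task': 'send_feedback_email_task',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': ('Ankur', 'ankur@xyz.com', 'Hello'),
    },
}
```

The declarative form keeps the schedule in one place and is easy to adjust without touching task code.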

Comments
opteamix bot
6 years ago

Hi, can we execute the periodic task by starting the Django server, instead of typing these commands every time?
$ celery -A celeryProj worker -l info

For the beat:
$ celery -A celeryProj beat -l info

Ankur Rathore
6 years ago

Right now it's not possible to run the Celery worker and beat together with the Django server, but what you can do is daemonize the processes once you go into production. You can find the steps required in this link http://docs.celeryproject.org/en/latest/userguide/daemonizing.html#daemonizing

Abhay Katiyar
6 years ago

Hi there
I found this article very helpful. Could you tell me how to daemonise the task in the background using celery?

Ankur Rathore
6 years ago

Thanks Abhay. I have not implemented daemonization yet, but I am sure you can get useful information from the official Celery docs http://docs.celeryproject.org/en/latest/userguide/daemonizing.html

Abhay Katiyar
6 years ago

Could you tell me how to specify the maximum number of retries in celery.py if a task fails?

Ankur Rathore
6 years ago

There are various techniques for retries. You can specify autoretry for a list or tuple of exceptions in the @app.task decorator. If any of these exceptions occur during execution of the task, the task will be retried. The same decorator accepts the max_retries option. Like:

@app.task(autoretry_for=(Exception,),
          retry_kwargs={'max_retries': 5})
def refresh_data(data):
    return refresh_callData(data)

Hope that solves your problem.

Abhay Katiyar
6 years ago

Thank you so much.

Abhay Katiyar
6 years ago

Hi there

I am using django_celery_results as a backend, whose table contains the id, task_id, status, content_type, content_encoding, result, date_done, traceback, hidden, and meta attributes of the task, but it does not have a task name column. I gave a name to each Celery task and want to show its name and status on an HTML page using Django. Please let me know how I can use this model to get the name of the task, because I am importing it as a model in my views.py.
