Asynchronous Task with RabbitMQ, Celery, and Django

Published May 24, 2017
Asynchronous Task with RabbitMQ, Celery, and Django

Installing RabbitMQ

RabbitMQ is a complete, stable, and durable message broker that can be used with Celery. Installing RabbitMQ on Ubuntu based systems is done through the following command:

$ sudo apt-get install rabbitmq-server
On Fedora or Red Hat based system, the command for installing is:
$ sudo dnf install rabbitmq-server
When the installation is complete, the broker will always start running in the background.

Installing Celery

Celery is on the Python Package index and can be installed using pip:
$ pip install celery

Sending Email as Background Task Using Celery

First, create a Django application for sending an email. Now, here’s how to configure your Django project to use Celery and RabbitMQ:

In your Django project directory, create a file by the name celery.py, and in this file, add the following code:

from celery import Celery

# Setting the Default Django settings module
os.environ.setdefault('DJANGO_SETTINGS_MODULE','celeryProj.settings')
app=Celery('celeryProj')

# Using a String here means the worker will always find the configuration information
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

In your project's settings.py file, add the following configuration information:

# For RabbitMQ
BROKER_URL = 'amqp://[ipaddress]'
CELERY_RESULT_BACKEND = 'amqp://[ipaddress]'
# Celery Data Format
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Kolkata'

Now, to ensure that the celery app is loaded when the Django app starts, add the following lines to the init.py file:

from __future__ import absolute_import
from .celery import app as celery_app

The Project Walk-Through

Here’s the code from forms.py that is used for creating the form in the template:

from django import forms

class FeedbackForm(forms.Form):
    name=forms.CharField(label='Name')
    email = forms.EmailField(label="Email Address")
    message = forms.CharField(label="Message", widget=forms.Textarea(attrs={'rows': 5}))

Creating a Celery task: Celery uses the concept of tasks, which are just functions that are called by Celery for scheduling. Create a separate tasks.py file and add the following code to it:

from celery.decorators import shared_task
from celery.utils.log import get_task_logger
from celeryapp.emails import send_feedback_email

logger=get_task_logger(__name__)

# This is the decorator which a celery worker uses
@shared_task(name="send_feedback_email_task")
def send_feedback_email_task(name,email,message):
    logger.info("Sent email")
    return send_feedback_email(name,email,message)

The code for sending the email is:

from django.core.mail import send_mail

def send_feedback_email(name,email,message):
    send_mail(name,message+" \n "+email,email,['recepients email'],fail_silently=False)

The configuration in the django's setting.py file for SMTP is as follows:

EMAIL_HOST = 'smtp.gmail.com'
EMAIL_HOST_USER='username'
EMAIL_HOST_PASSWORD='password'
EMAIL_PORT = 587
EMAIL_USE_TLS=True

The code from the views.py file that is calling the function for sending the email is as follows:

def index(request):

    if request.method=='POST':
        form=FeedbackForm(request.POST)
        if form.is_valid():
            name=form.cleaned_data['name']
            email=form.cleaned_data['email']
            msg=form.cleaned_data['message']
            # The delay is used to asynchronously process the task
            send_feedback_email_task.delay(name,email,msg)
            return HttpResponseRedirect('/')
    else:
        form=FeedbackForm
    return render(request,'index.html',{'form':form})

To run the task asynchronously, we have to run a Celery worker from the terminal.

celery -A celeryProj worker -l info

The worker will search for tasks in the broker queue and process it.

This will enable you to send periodic emails as automated tasks.

In the celery.py file, the beat schedule can be defined using the following entry

from celery import Celery
from celery.schedules import crontab

#The decorator is used for recognizing a periodic task
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):

    #Sending the email every 10 Seconds
    sender.add_periodic_task(10.0, send_feedback_email_task.s('Ankur','ankur@xyz.com','Hello'), name='add every 10')
  # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        send_feedback_email_task.s('Ankur','ankur@xyz.com','Hello'),)

#The task to be processed by the worker
@app.task
def send_feedback_email_task(name,email,message):
    send_feedback_email(name,email,message)
    logger.info("Sent email")

Now, run the worker and beat on separate terminals as follows:

For the worker:
$ celery -A celeryProj worker -l info

For the beat:
$ celery -A celeryProj beat -l info

Discover and read more posts from Ankur Rathore
get started
Enjoy this post?

Leave a like and comment for Ankur

2