Building a Broadcast Platform in Python

Published Mar 20, 2018. Last updated Sep 16, 2018.

Abstract - In this article, we describe a development solution for data-streaming services. In contrast to Big-Data paid platforms, we show how a lean solution can be built with a simple, yet effective, 3-tiered architecture that results in a highly customizable software product. We show code samples in Python that use various state-of-the-art tools, such as Django, Django-channels, and websockets.

Edit - The work described in this article is now live at katiedj.com

Keywords: Python, Django, MVP, Websockets

Introduction

Scientists and software researchers often find that their work has produced interesting data-analytics results that they would like to share with the community.

In fact, open platforms, such as GitHub, allow users to publish software in return for visibility and (perhaps more importantly) feedback. Moreover, sharing datasets is a well known method for improving data quality as well as software analysis.

On the other hand, it is often the case that the source of data cannot be made public — for example, data can be generated by sensors (in this case we will speak of "real" data), or by a proprietary software system that generates a real time log (in this case we will speak of "synthetic" data).

One of the most interesting scenarios arises when a start-up has, almost unintentionally, built a lean product while building a larger one, and it becomes possible to get some initial revenue by selling the former. Selling it directly, though, would compromise the bigger start-up plans, potentially revealing secrets to competitors.

Thus, a solution is to build a broadcast system that keeps private the way data are generated, but sells access to the broadcast itself.

The above reasoning is the main motivation for our work. This article is going to be idea-dense, so we start with a content overview to make reading easier.

There will be three sections, each describing one part of the architecture (see also the figure below):

  • The broadcast system. We like to think about it as a radio that sends messages to whoever is listening to the right channels. However, to improve modularization of the entire system, the radio is also built in order to receive, from an external system, the data it will then broadcast. In our application, we built this module in Python, using Django and Django-channels.
  • The listener(s). They would be the clients, if we were talking about a standard client-server architecture — they are the agents listening to the radio channels. We built this part in Python, using the websockets module.
  • The data generator system. It is the private module whose implementation details are not shared with the listeners of the radio. It produces new data with an undisclosed software and then sends it over to the radio in a format that can be shared (sold, maybe). This part can be built in any language, or can even be a real system of sensors, or any “internet of things” device.

[Figure: the three parts of the architecture: Data Generator, Broadcast Server (Radio), and Listeners]

Before going into the details of the implementation, we would like to emphasize some of the good points of such an architecture:

  • The clear separation between the Data Generator and the Radio (i.e., the broadcast server), makes them highly flexible and customizable.
  • As a consequence, different people can work on the two modules: it is easy to imagine data scientists working on the Data Generator and web developers working on the Radio.
  • As a further consequence, the architecture is completely language independent: we said we built everything in Python, but in fact it is entirely possible to build the Data Generator in R or C++ (maybe if it is a high-computation system), and the Broadcast in Ruby or Java, just to name a few alternatives. The two modules only need to agree on the format of the exchanged data, which is likely to be JSON or XML.
  • As a business model, we said the most natural would be to sell the access to public channels to receive the data. In fact, even a more subtle model makes sense: selling the access to the private API that publishes the data.
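
The agreed-upon format can be as small as a two-field JSON envelope. The sketch below assumes the envelope that the code later in this article exchanges (a `network` field naming the channel and a `data` field carrying the sample). The helper names `encode_sample` and `decode_sample`, as well as the sample values, are hypothetical and only serve the illustration.

```python
import json


def encode_sample(network, data):
    """Serialize one sample into the JSON envelope shared by both modules."""
    return json.dumps({'network': network, 'data': data})


def decode_sample(text):
    """Parse an envelope and return (network, data); raise ValueError if malformed."""
    envelope = json.loads(text)
    if not isinstance(envelope, dict):
        raise ValueError('envelope must be a JSON object')
    if 'network' not in envelope or 'data' not in envelope:
        raise ValueError('envelope needs both "network" and "data"')
    return envelope['network'], envelope['data']


# Hypothetical sample: average vehicle density per street
message = encode_sample('sample_net', {'street_1': 0.42, 'street_2': 0.1})
network, data = decode_sample(message)
```

Because both sides only touch this envelope, either module could be rewritten in another language as long as it keeps producing or consuming the same two fields.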

Let us now continue the article by discussing more technical details about the implementation of the different modules.

The Broadcast Server (Radio)

The main objectives of the server must be to receive data from the Data Generator and to publish them in order to be retrieved by clients. Therefore, these requirements translate into:

  • An API endpoint that is used by the Data Generator each time a new sample is ready.
  • Public channels where the clients register in order to get the new data once they are published.

It is worth noticing that the two points above are based on very different behaviors: on one hand (first point), an API is used, which means it must be called by the other side. On the other hand (second point), data is simply sent to the broadcast, no matter who is listening to it.

Why such a difference? Simply put, it would be very expensive to build an endpoint that each listener has to call in order to get new data. Especially when there are a lot of connected listeners, a standard REST API approach is not the best design choice, so we used WebSocket for this part.

On the contrary, the Data Generator is a controlled system, therefore it is free to invoke a RESTful endpoint, whenever a new sample is ready to be consumed.

We used Python to build this module, and pure Django 2.0 for the API. We have a Django-app called publisher, whose urls.py file is very simple:

# urls.py
from django.urls import path
from . import views

app_name = 'publisher'

urlpatterns = [
    path('publish', views.PublisherView.as_view(), name='main'),
]

Notice that we use the function path, new in Django 2.0. This way, the endpoint is exposed at the /publish path. Let us now show the Django View that receives data from the Data Generator and publishes it to the channels.

# views.py
import json

from channels import Group  # Django-channels 1.x API
from django.conf import settings
from django.http import JsonResponse
from django.views import View


class PublisherView(View):

    def post(self, request):
        # The API-KEY header is exposed by Django as HTTP_API_KEY
        try:
            key = request.META['HTTP_API_KEY']
        except KeyError:
            return JsonResponse(
                {'error': 'API-KEY missing in headers'}, status=400)
        if key not in settings.API_KEYS:
            return JsonResponse(
                {'error': 'API-KEY not valid'}, status=400)
        # The body must be well-formed JSON
        try:
            body = json.loads(request.body.decode('utf-8'))
        except ValueError:
            return JsonResponse(
                {'error': 'POST data is not JSON'}, status=400)
        # The body must name the target channel
        try:
            group = body['network']
        except (KeyError, TypeError):
            return JsonResponse(
                {'error': '*network* key missing'}, status=400)
        # The key must be allowed to publish to that channel
        if group not in settings.API_KEYS[key]:
            return JsonResponse(
                {'error': 'You cannot broadcast to this channel'}, status=403)
        if 'data' not in body:
            return JsonResponse(
                {'error': '*data* key missing'}, status=400)
        if not isinstance(body['data'], dict):
            return JsonResponse(
                {'error': 'Can only broadcast json data'}, status=400)
        # all tests are OK
        Group(group).send({'text': json.dumps(body['data'])})
        return JsonResponse({'message': 'OK'}, status=200)

Let us also comment on the behavior of this View step by step:

  1. The endpoint is enabled for POST requests.
  2. The header of the request must contain a field called API-KEY. Moreover, this field must correspond to one of the enabled keys (stored in settings.API_KEYS). This is a very standard way to protect an endpoint: only users who hold a secret personal alphanumeric key can actually use the API.
  3. The body of the request must be in well-formed JSON format.
  4. The body must contain, in the network field, the name of a valid group, that is, the name of the channel to which data will be sent.
  5. The name of this channel must be among the channels that are enabled for this particular API-KEY. This prevents an authorized user from sending data to a channel she should not be sending it to.
  6. The data to be broadcast must be a JSON object (a Python dict once decoded).
  7. Finally, after all tests are passed, data is sent over to the channel. Notice that we use the Group object of the Django-channels package.
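
The checks above can also be read as a pure function, which makes them easy to unit-test outside Django. This is a sketch and not the author's code: it assumes that `settings.API_KEYS` is a dict mapping each key to the list of channels it may publish to (which is consistent with how the View uses it). The key value, the channel names, and the `validate` helper are made up for illustration.

```python
import json

# Hypothetical stand-in for settings.API_KEYS: key -> channels it may publish to
API_KEYS = {
    'k3y-abc123': ['sample_net'],
}


def validate(headers, raw_body, api_keys=API_KEYS):
    """Return (status, payload): (200, data dict) or (error status, message)."""
    key = headers.get('API-KEY')
    if key is None:
        return 400, 'API-KEY missing in headers'
    if key not in api_keys:
        return 400, 'API-KEY not valid'
    try:
        body = json.loads(raw_body)
    except ValueError:
        return 400, 'POST data is not JSON'
    if not isinstance(body, dict) or 'network' not in body:
        return 400, '*network* key missing'
    if body['network'] not in api_keys[key]:
        return 403, 'You cannot broadcast to this channel'
    if 'data' not in body:
        return 400, '*data* key missing'
    if not isinstance(body['data'], dict):
        return 400, 'Can only broadcast json data'
    return 200, body['data']


good = json.dumps({'network': 'sample_net', 'data': {'density': 0.3}})
status, payload = validate({'API-KEY': 'k3y-abc123'}, good)
```

Keeping the checks in this order means the cheapest rejections (missing or unknown key) happen before the body is parsed at all.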

Let us now “connect the dots” and show the module that handles the connections, that is, the channels. As we have already mentioned, we used the Django-channels package, which basically implements the Websocket protocol in a very Django-ish way. In fact, using Django-channels, it becomes extremely easy to handle simple listener connections. Here is the entire module (very few lines!):

# consumers.py
from channels.generic import websockets

class MyBroadcast(websockets.WebsocketConsumer):

    http_user = True
    strict_ordering = False

    def connection_groups(self, **kwargs):
        """
        Called to return the list of groups to automatically add/remove
        this connection to/from.
        """
        return [self.channel_name]

    def receive(self, text=None, bytes=None, **kwargs):
        # Not talking!
        pass

class SampleBroadcast(MyBroadcast):
    channel_name = "sample_net"

To give more insight, every subclass of MyBroadcast simply needs to define the name of its own channel (sample_net in the above code); the rest is handled automatically by the Django-channels package.
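
To make the fan-out idea concrete without any Django machinery, here is a toy in-memory model of a group. It only illustrates the behavior (every connection currently in the group gets a copy of each message) and says nothing about how Django-channels is implemented. The `ToyGroup` class, its methods, and the connection names are hypothetical.

```python
class ToyGroup:
    """In-memory stand-in for a broadcast group (illustration only)."""

    def __init__(self, name):
        self.name = name
        self._inboxes = {}  # connection id -> list of received messages

    def add(self, connection_id):
        # Called when a listener connects
        self._inboxes.setdefault(connection_id, [])

    def discard(self, connection_id):
        # Called when a listener disconnects; its inbox is dropped
        self._inboxes.pop(connection_id, None)

    def send(self, message):
        # Every connected listener receives the same message
        for inbox in self._inboxes.values():
            inbox.append(message)

    def inbox(self, connection_id):
        # Copy of what a connection has received so far (empty if unknown)
        return list(self._inboxes.get(connection_id, []))


group = ToyGroup('sample_net')
group.add('alice')
group.add('bob')
group.send('{"density": 0.3}')
group.discard('bob')
group.send('{"density": 0.5}')
```

Note that the sender never needs to know who is connected, which is exactly what keeps the publishing side of the Radio so small.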

The Broadcast Listener

Even though the most interesting part of the architecture is the Broadcast Server, a Radio is useless when nobody listens to it. Therefore, in our concept, it is also important to provide ready-to-use client modules that users can simply download and run to get the data.

Ideally, these modules would be published (for example, on GitHub) so that experienced users can customize the behavior according to their needs, submit merge requests, and contribute further.

Also, ideally, we would provide client modules in several languages (Python, R, MatLab, etc.) so that each user can use her favorite environment for data analysis.

Let us now show how to write such a module in Python. We used the great package websockets, built on top of the standard library asyncio.

# radio_listener.py
import asyncio
import logging
import socket

import websockets

logger = logging.getLogger(__name__)


class WSClient():

    def __init__(self, url):
        self.url = url
        # constant values, but can be passed as params
        self.reply_timeout = 10
        self.ping_timeout = 5
        self.sleep_time = 5

    async def listen_forever(self):
        while True:
            logger.debug('Creating new connection...')
            try:
                async with websockets.connect(self.url) as ws:
                    while True:
                        try:
                            reply = await asyncio.wait_for(ws.recv(), timeout=self.reply_timeout)
                        except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed):
                            try:
                                pong = await ws.ping()
                                await asyncio.wait_for(pong, timeout=self.ping_timeout)
                                logger.debug('Ping OK, keeping connection alive...')
                                continue
                            except Exception:
                                logger.debug('Ping error - retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                                await asyncio.sleep(self.sleep_time)
                                break
                        # Here do something with the data
                        logger.debug('Got data > {}'.format(reply))
            except socket.gaierror:
                logger.debug('Socket error - retrying connection in {} sec (Ctrl-C to quit)'.format(self.sleep_time))
                await asyncio.sleep(self.sleep_time)
                continue
            except ConnectionRefusedError:
                logger.debug('Nobody seems to listen to this URL')
                logger.debug('Exiting...')
                break

The above code defines a class WSClient and uses standard techniques of asynchronous programming and connection retry when dealing with web applications.

In short, it waits for data from the server in an infinite loop. Whenever no data arrives within the timeout (or the connection seems to have failed), it first sends a ping; if the ping also fails, it waits a few seconds and opens a new connection. As per good programming practice, almost every event is logged (using the standard Python library logging; we intentionally skipped details about it).
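
The "receive with a timeout, then ping before giving up" step can be isolated and tried without a server. The sketch below is a simplification built on the standard library only: `recv_or_ping` and `FakeSocket` are hypothetical names, and the fake folds the ping and the wait for its pong into a single coroutine, whereas the websockets client in the listing above awaits them in two steps.

```python
import asyncio


async def recv_or_ping(ws, reply_timeout, ping_timeout):
    """Return the next message, or None if the link is idle but alive.

    Raises ConnectionError when the keepalive ping gets no answer in time.
    `ws` is any object with recv() and ping() coroutines (an assumption).
    """
    try:
        return await asyncio.wait_for(ws.recv(), timeout=reply_timeout)
    except asyncio.TimeoutError:
        try:
            await asyncio.wait_for(ws.ping(), timeout=ping_timeout)
        except asyncio.TimeoutError:
            raise ConnectionError('ping timed out')
        return None


class FakeSocket:
    """Hypothetical stand-in for a websocket, used only to exercise the helper."""

    def __init__(self, messages, alive=True):
        self._messages = list(messages)
        self._alive = alive

    async def recv(self):
        if self._messages:
            return self._messages.pop(0)
        await asyncio.sleep(3600)  # idle: nothing to deliver

    async def ping(self):
        if not self._alive:
            await asyncio.sleep(3600)  # dead link: the pong never arrives


async def demo():
    ws = FakeSocket(['hello'])
    first = await recv_or_ping(ws, reply_timeout=0.05, ping_timeout=0.05)
    second = await recv_or_ping(ws, reply_timeout=0.05, ping_timeout=0.05)
    return first, second


result = asyncio.run(demo())
```

A caller would treat `None` as "keep waiting" and a `ConnectionError` as the signal to sleep and reconnect, mirroring the `continue` and `break` branches of `listen_forever`.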

The Data Generator

Although we kept the Data Generator as the last section in this article, this is very likely the core of the application. Even more so, it is probably something that was built for a totally different reason, and only later was it realized that more people could be interested in using the same data.

As we mentioned earlier, the Data Generator can be practically anything: a system of sensors retrieving data from a physical system, a network of mobile phones sending data about GPS locations, and/or software built by scientists that computes some complex mathematical model.

In our case, the Data Generator was a Python program capable of simulating vehicles’ behavior, predicting turns at intersections, taking into account traffic lights, and eventually computing the average density of vehicles in each street of a large town.

Basically, it is a road traffic simulator. The overall broadcast idea is actually based on the fact that we would like to share the actual data and not the mathematics behind the model. Interested readers can find a (very) comprehensive discussion in this manuscript (do not be misled by the cover page: the manuscript is in English).

A simple approach for periodically publishing the data computed by the generator is the following:

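import json
import logging
import time

import requests

import secret  # local module holding URL and API_KEYS (kept private)

logger = logging.getLogger(__name__)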

def publish(net_name, jdata):
    url = secret.URL
    headers = {'content-type': 'application/json'}
    headers['API-KEY'] = secret.API_KEYS[net_name]
    payload = {'network': net_name}
    payload['data'] = jdata
    resp = requests.post(
        url,
        data=json.dumps(payload),
        headers=headers)
    return resp.status_code

def radio_publish():
    # initialize "net" object
    …
    #
    while True:
        jdata = net.dump_state()
        try:
            resp = publish(net.name, jdata)
            time.sleep(net.step)
        except:
            break
        net.next()
    logger.debug('Last published data is > {}'.format(jdata))
    return jdata

These two functions should be very clear when thinking back to the Broadcast Server API. Notice that, within the radio_publish function, there is an infinite loop that first calls net.dump_state to retrieve the last computed data sample, then publishes it, and then calls net.next to trigger the computation of a new sample. Different, more complex, paradigms are also possible, of course, such as event-based programming.
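
Since the initialization of the net object is elided above, the following sketch uses a hypothetical stand-in, `ToyNet`, that exposes the same four members the loop relies on (`name`, `step`, `dump_state`, `next`). The publishing function is injected so that the loop can be exercised without a server, and a `max_samples` bound replaces the infinite loop; both are assumptions made for the illustration and are not part of the original code.

```python
import time


class ToyNet:
    """Hypothetical stand-in for the undisclosed simulator."""

    def __init__(self, name, step=0.0):
        self.name = name
        self.step = step      # seconds to wait between two samples
        self._tick = 0

    def dump_state(self):
        # A real generator would return densities computed by the model
        return {'tick': self._tick}

    def next(self):
        # Advance the simulation by one step
        self._tick += 1


def publish_loop(net, publish, max_samples):
    """Publish max_samples consecutive states; return the status codes."""
    statuses = []
    for _ in range(max_samples):
        statuses.append(publish(net.name, net.dump_state()))
        time.sleep(net.step)
        net.next()
    return statuses


sent = []


def fake_publish(net_name, jdata):
    # Records the sample instead of POSTing it; pretends the server said 200
    sent.append((net_name, jdata))
    return 200


codes = publish_loop(ToyNet('sample_net'), fake_publish, max_samples=3)
```

Passing the real `publish` function instead of `fake_publish` would send the same samples to the Radio, and collecting the status codes gives the caller a chance to react to rejected samples.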

Conclusions and outlook

In this article, we have shown the journey towards building a data-streaming platform. We have expressed practical motivations by means of real-world business examples, and we have also shown the backbone of such a streaming platform built in Python.

We have underlined several times the flexibility given by our design choices: the clear separation between the three modules allows fast prototyping and development, as well as a clean division of work among team members.

Finally, numerous paths remain worth investigating further: what is the best infrastructure to support deployment of such an architecture, what is the best communication paradigm between the Data Generator and the Broadcast Server, and, perhaps more interestingly, how would the open-source community receive and contribute to this type of design?

Discover and read more posts from Pietro Grandinetti, PhD
Puxin Xu
6 years ago

Hi Pietro,
Thank you for sharing this post, would you mind to share your whole codes in Github? When I go through this I still feel some confusing due to the missing part.
Thank you!

Pietro Grandinetti, PhD
6 years ago

Hello there, thank you for reaching out!

Code is already in GitHub since quite some time. It seems frozen, but just because I am working on a new simulator, that I will later integrate.
So:

Looking forward to receiving more feedback of yours!
Cheers

Puxin Xu
6 years ago

Thank you!

Pietro Grandinetti, PhD
5 years ago

Hello, just wanted to let you know I have updated the README in the repo with a brief “developer’s guide”, after I received a request about how to clone and reproduce the server.
It’s not the main use case in my opinion (one only needs to clone and reproduce the listeners, to fetch data), but I can understand someone may want to build upon katiedj’s server.

Puxin Xu
5 years ago

Thank you!