Enjoy this post? Give Samuel Morhaim a like if it's helpful.

Creating a simple, yet powerful queue system

Published Aug 11, 2015. Last updated Feb 09, 2017.

Over the last several months we faced a challenging business requirement: run multiple microservices for thousands of requests as efficiently as possible. Using a queue was our first instinct (far better than creating an x-SQL table with your own statuses, creation dates, priorities, etc.) and we were right. However, there are far too many options out there.

Out of personal preference (and not a lot of technical backing) here are our findings:

  • RabbitMQ: Appears to be very powerful, fast and reliable. After months of using it, though, we were overwhelmed by the different options and terminology. The learning curve was steep and we just needed a queue!

  • Amazon SQS: Excellent queue, with the legendary "you need a PhD to navigate the AWS world" issues. To our surprise, we were able to get it up and running in a very short time. However, there is latency when getting messages, and a documented "up to a minute or more" delay between publishing and consuming.

  • Beanstalkd: After some more research we ran into Beanstalkd. It is stupidly simple and fast to set up, the terminology is simple, and it gets the job done. Drawbacks: absolutely no security (lock the ports and pray), and I have yet to find a decent-looking monitoring tool to help us track messages.

So I am going to show you how we set this up. I assume that you:

  • Know just enough PHP
  • Have a basic working knowledge of Composer
  • Understand PHP OOP
  • At least follow PSR-0 or use any decent framework

Installing (Ubuntu)

aptitude install -y beanstalkd

Done. Go get some coffee. Good job.

Using it

First things first. We need what I call a publisher and then a number of consumers / workers.

Add this library to the "require" section of your composer.json:

"pda/pheanstalk": "~3.0.2",

I created two simple classes, Queue\Publisher.php and Queue\Consumer.php. Here is the publisher:

<?php

namespace Queue;

use Pheanstalk\Pheanstalk;

class Publisher
{
    private $config;
    private $queue;
    private $client;

    public function __construct(array $args)
    {
        // Don't mind this. I typically have a config file, but I moved it here
        // for purposes of this tutorial. Use an IP or domain for the host.
        $this->config = ['queue' => ['host' => 'yourdomain.com']];

        // Just a name for the queue, passed in as a parameter. Beanstalkd calls
        // queues "tubes". If the tube doesn't exist, it gets created.
        $this->queue = $args['queue'];

        // Instantiate the client.
        $this->client = new Pheanstalk($this->config['queue']['host']);
    }

    public function send($request)
    {
        // Send anything you want, JSON encoded, and that's it!
        return $this->client
            ->useTube($this->queue)
            ->put(json_encode($request));
    }
}

And for the consumer:

<?php

namespace Queue;

use Pheanstalk\Pheanstalk;

class Consumer
{
    private $config;
    private $queue;
    private $client;

    public function __construct(array $args)
    {
        // Same inline config as the publisher. Use an IP or domain for the host.
        $this->config = ['queue' => ['host' => 'yourdomain.com']];

        $this->queue = $args['queue'];
        $this->client = new Pheanstalk($this->config['queue']['host']);
    }

    public function listen()
    {
        // Pass the name of the queue again.
        $this->client->watch($this->queue);

        // Do this forever, so it's always listening.
        while ($job = $this->client->reserve()) {
            // Decode the message.
            $message = json_decode($job->getData(), true);

            $status = $this->process($message);

            if ($status) {
                $this->client->delete($job);
            } else {
                // Delete anyway. You could bury() the job instead, which sets it
                // aside until it is kicked, or release() it so it gets retried.
                $this->client->delete($job);
            }
        }
    }

    public function process($msg)
    {
        // Do some operation and return true on success or false on failure.
        return true;
    }
}
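The consumer above deletes a job even when processing fails. If you would rather retry, one option is to release the job a few times with a growing delay and bury it after that. The sketch below is only an illustration: failureAction() and retryDelaySeconds() are hypothetical helpers (not part of Pheanstalk), the retry limit and delays are arbitrary, and the commented wiring assumes the release(), bury() and statsJob() methods of the Pheanstalk 3.x client, so check the signatures against the version you install.

```php
<?php
// Hypothetical helper: decide what to do with a job whose processing failed.
// $timesReleased is how often the job has already been put back on the queue.
function failureAction(int $timesReleased, int $maxRetries = 3): string
{
    return $timesReleased < $maxRetries ? 'release' : 'bury';
}

// Hypothetical helper: wait longer before each retry (10s, 20s, 40s, ...).
function retryDelaySeconds(int $timesReleased, int $baseDelay = 10): int
{
    return $baseDelay * (2 ** $timesReleased);
}

// Inside Consumer::listen(), the else branch could then look roughly like this
// (assumed Pheanstalk 3.x calls, untested here):
//
//   $released = (int) $this->client->statsJob($job)['releases'];
//   if (failureAction($released) === 'release') {
//       $this->client->release(
//           $job,
//           \Pheanstalk\PheanstalkInterface::DEFAULT_PRIORITY,
//           retryDelaySeconds($released)
//       );
//   } else {
//       $this->client->bury($job);
//   }
```

A released job goes back to the ready queue once its delay expires, while a buried job stays put until someone kicks it, which makes bury a reasonable parking spot for messages that keep failing.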

Finally, of course, you need to call this:

$publish = new Queue\Publisher(['queue' => "someQueue"]);
$publish->send("Do this..."); // This could be an array, a class name, etc.

Then, on the other side, retrieve it:

$consumer = new Queue\Consumer(['queue' => "someQueue"]); // This will usually run via the console.
$consumer->listen(); // Blocks and processes jobs as they arrive.
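Since the message can be an array, one way to let a single consumer handle several kinds of work is to send a small envelope with a type and a payload, and have process() route on the type. This is a minimal sketch of that idea, not what our application actually does: the 'type' and 'payload' keys, buildMessage(), dispatch() and the send_email handler are all made up for illustration, and the queue round trip is simulated with json_encode() and json_decode() so the snippet runs without Beanstalkd.

```php
<?php
// Hypothetical message shape: 'type' names the work, 'payload' carries its data.
function buildMessage(string $type, array $payload): array
{
    return ['type' => $type, 'payload' => $payload];
}

// What a process($msg) body could do: route the decoded message to a handler.
// Returns false for malformed or unknown messages.
function dispatch($message, array $handlers): bool
{
    if (!is_array($message) || !isset($message['type']) || !is_string($message['type'])) {
        return false;
    }
    if (!isset($handlers[$message['type']])) {
        return false;
    }
    $payload = (isset($message['payload']) && is_array($message['payload']))
        ? $message['payload']
        : [];

    return (bool) call_user_func($handlers[$message['type']], $payload);
}

$handlers = [
    'send_email' => function (array $payload) {
        // Real work would go here; this stub only checks its input.
        return isset($payload['to']);
    },
];

// Simulate the trip through the queue: send() encodes, listen() decodes.
$wire = json_encode(buildMessage('send_email', ['to' => 'user@example.com']));
var_dump(dispatch(json_decode($wire, true), $handlers));           // bool(true)
var_dump(dispatch(json_decode('"Do this..."', true), $handlers));  // bool(false)
```

With the classes above you would pass the array from buildMessage() straight to send(), because send() already JSON-encodes whatever it is given.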

And that's it! Our application runs on PhalconPHP, which has the concept of Tasks. We use these tasks to run the Consumer, while a cron job "publishes" messages. In the same way, we can run a large number of consumer tasks to process the queue faster if we need to.

For even more scalability, we could set up any number of servers with the same code, point the consumers at the same Beanstalkd IP/domain, and run even more consumers, so we have virtually unlimited room to grow.

As a tip, we run our tasks via Supervisor. For example, we have 10 workers running, which clear our queues in about 15 minutes. (The delay comes from the external APIs we query for each of the thousands of messages we process.) If your operations are local, or you can control their performance, you could be processing hundreds of messages per second as needed.
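For reference, a Supervisor program section that keeps 10 consumers alive could look something like the fragment below. Treat it as a sketch rather than our production config: the program name, the command path and the task arguments are placeholders you would swap for your own console entry point, and the stop timeout is an arbitrary value.

```ini
[program:queue-consumer]
; Hypothetical command: replace with whatever starts your consumer task.
command=php /var/www/app/cli.php consumer listen
; Run 10 copies; process_name must include process_num when numprocs > 1.
numprocs=10
process_name=%(program_name)s_%(process_num)02d
autostart=true
; Restart a worker if it exits or crashes.
autorestart=true
; Give a worker up to 30 seconds to finish its current job on shutdown.
stopwaitsecs=30
```

Because every worker reserves jobs from the same tube, adding or removing workers is just a matter of changing numprocs and reloading Supervisor.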

Thank you.
