Codementor Events

Introduction to Stream Processing

Published May 28, 2018Last updated Nov 23, 2018
 Introduction to Stream Processing

Together with blockchain and machine learning, stream processing seems to be one of the hottest topics nowadays. Companies are onboarding modern stream processing tools, service providers are releasing better and more powerful stream processing products, and specialists are in high demand.

This article introduces the basics of stream processing. It starts with a rationale for why we need stream processing and how it works under the hood. Then it goes into how to write simple, scalable distributed stream processing applications. All in fewer than 40 lines of code!

Since stream processing is a vast topic, this article is focused mostly on the data management part while sophisticated processing is left for another article. To make the article more practical, it discusses AWS Kinesis, a stream processing solution from Amazon, but it also refers to other popular Open Source technologies to present a broader picture.

Why Stream Processing

To understand why stream processing came into existence, let's look into how data processing was done before. With the previous approach, called batch processing, all data was stored in a database or a distributed filesystem, and different applications would perform computation using this data. Since batch processing tools were built to process datasets of finite size, to continuously process new data, an application would periodically crunch data from the last period like one hour or one day.

batch-processing.png

While this architecture worked for many years and still has many applications, it has fundamental drawbacks. Since new data is not processed as soon as it arrives, this causes several issues:

  • High latency — new results are computed only after a significant delay, but since the value of data decreases with time, this is undesirable
  • Session data — since a batch processing system splits data into time intervals, it is hard to analyze events that started during the one-time interval but ended during another time interval
  • Non-uniform load — a batch processing system should wait until enough data is accumulated before it can process the next block of data

Stream processing, data processing on its head, is all about processing a flow of events. A typical stream application consists of a number of producers that generate new events and a set of consumers that process these events. Events in the system can be any number of things, such as financial transactions, user activity on a website, or application metrics. Consumers can aggregate incoming data, send automatic alerts in real time, or produce new streams of data that can be processed by other consumers.

stream-processing.png

This architecture has a number of advantages:

  • Low-latency — a system can process new events and react to them in real-time
  • A natural fit for many applications — stream processing system is a natural fit for applications that work with a never-ending stream of events
  • Uniform processing — instead of waiting for data to accumulate before processing the next batch, stream processing system performs computation as soon as new data arrives

Unsurprisingly, stream processing was first adopted by financial companies that need to process new information, like trades or prices, in real time, but is now used in many areas like fraud detection, online recommendations, monitoring, and many others.

This architecture, however, poses a question: how should producers and consumers be connected? Should a producer open a TCP session to every consumer and send events directly? While this may be an option, it presents a significant issue if a producer is writing data that a consumer can process. Also, if we have a significant number of consumer and producers, the web of connections can turn into an unruly mess.

This is exactly the problem that LinkedIn faced in 2008, when they ended up with a number of multiple point-to-point pipelines among multiple systems. To organize it, they started working on an internal project that eventually became Apache Kafka. In a nutshell, Kafka is a buffer that allows producers to store new streaming events and consumers to read them, in real time, at their own pace.

unified-log.png

Apache Kafka quickly became a backbone of modern stream processing applications by providing a data integration layer for decoupled event-driven applications. It allows for the easy addition of new consumers and producers and the building of more complex applications. Apache Kafka became so successful that other companies have built services with a similar design, like Azure Event Hubs and AWS Kinesis. The latter will be discussed in more detail in this article.

Append-only log

At first sight, Kafka and Kinesis do not seem like big deals, since alternatives like RabbitMQ have been around for many years. However, if we look under the hood of these systems, we will see that it’s designed differently and allows for the implementation of new use cases.

The foundation of modern stream processing software is a data structure called an append-only log. A log is just a sequence of binary records. Kinesis puts no restrictions on the content of those records — they can be in any format, like: JSON, Avro, Protobuf, etc.

An append-only log only allows for the addition of new elements to the end of the log. We can't insert new elements at arbitrary positions of the log and we can't remove elements at will. Every element in a log has a sequence number, and newer elements have a higher sequence number than older elements.

append-only-log.png

When a record is read by a consumer, it is not removed and can be read by other consumers as well. Kinesis does not keep track of what records are yet to be read by different consumers. Instead, this responsibility lies with a consumer. This is different from systems, like RabbitMQ. that behave like a queue and keeps track of the state of every consumer.

This architecture allows systems like Kinesis and Kafka to process an insane amount of transactions, with low-latency, far beyond what traditional messaging systems are capable of. It also allows consumers to process data that was written to the buffer before they started.

Keep in mind that Kinesis and Kafka do not make RabbitMQ obsolete and there are still many use-cases where RabbitMQ would be a better fit.

Retention period

At this point, you may be wondering, “Is data ever removed from a stream?” If we can't remove records from a stream sooner or later, we won't have enough space to store all of the new records.

In both AWS Kinesis and Apache Kafka, records have a retention period and are automatically removed when this period ends. In Kinesis, a record is automatically removed after 24 hours, but the retention period can be increased to up to seven days.

An extended retention period is especially beneficial if, for some reason, your system can't handle incoming data correctly, and you need to re-process it again. The longer the retention period, the more time you have to fix your production system and reprocess the log. The downside is that you have to pay more for additional storage.

Kafka allows specifying either maximum retention period or maximum retention size of all records. Default retention period is seven days, but it can even be infinite if the log compaction feature is enabled.

Scaling up

Systems like Apache Kafka and AWS Kinesis were built to handle petabytes of data. Since a single machine can never handle this load, they need to scale horizontally. To handle an immense load, both systems apply two well-known tricks to allow to handle more reads and writes: sharding and replication.

Sharding

In the case of stream processing, sharding means that we have more than one log. A single stream is split into multiple logs, each one being small enough so a single machine can handle it.

sharded-log.png

To determine what shard to use, we need to provide a key with each record. For each incoming record, Kinesis calculates a hash code of a key and the value of the hash code is used to determine what shard will process it. Each shard is responsible for a portion of the hash code values range, and if a hash code of a key falls within a shard's range, this shard will store a record associated with it.

multiple-shards.png

How do we come up with a key for a record though? We can select a random key and then our records will be uniformly distributed among our stream’s shards. We can also use a key from our problem's domain, such as a hostname, if we process metrics, or payer ID, if we process financial transactions. This will allow achieving order among records with the same key, since they will be directed to the same shard.

An important thing to notice is that because a log is split into multiple shards, global order of elements is not preserved. However, an order of elements that end up on the same shard is preserved.

But how many shards should we have? That depends on how many records we need to be able to write and read per second. The more data we need to process, the more shards we need to handle a flow of data. Because of this, we need to have an ability to increase and decrease a number of shards in our stream.

Kinesis allows splitting a single shard into two shards to process more records. If the number of records we need to process has decreased, we can merge two shards with adjacent hash key ranges and combine them in the single shard. This, in turn, will decrease both the throughput and our monthly bill.

Replication

As in the databases, world replication means that we maintain several copies of data, and consumers can read from any copy. In the case of Kinesis, every record has three copies in three different datacenters.

log-replication.png

This has two benefits. First, it makes a stream processing system more resilient to failure, since we can lose two copies of our data. Second, it allows transferring more data to consumers. This is reflected in Kinesis limits that allow to write only up to 1MB of data into a single shard and read up to 2MB of data from a shard.

Working with Kinesis

At this stage, you should have an understanding of how stream processing systems works and how it can process almost an infinite amount of incoming data. Now it is time to go through some actual code examples.

15261497267356.jpg
Source: https://bit.ly/2IQCFjM

In this section, we will see how we can use the low-level API to read and write stream data. We will implement a producer that will write metrics data into a stream and a producer that will read and process this stream. Our applications should do the following:

  • Create a stream
  • A producer who sends a new record to a stream when we have new data
  • A consumer who reads and process new records in an infinite loop

Just as with many other AWS services, when we use Kinesis, we do not need to provision hosts or install software. All we need to do is to perform several API calls, and the rest will be done for us. This is what makes Kinesis different from Kafka. Kafka requires a complex set up that is hard to get right.

To perform any operations with Kinesis, we create a Kinesis client first:

// Create an AWS Kinesis client
// You may want to set parameters such as AWS region, credentials, etc.
AmazonKinesis kinesis = AmazonKinesisClientBuilder
    .standard()
    .build();

Creating a stream

First, let's create a stream that we will use to connect our producer and consumer. All we need to do is to provide a name of our stream, an initial number of shards, and call the createStream method.

CreateStreamRequest createStreamRequest = new CreateStreamRequest();
// Set the name of a new stream
createStreamRequest.setStreamName("metrics-stream");
// Set initial number of shards
createStreamRequest.setShardCount(4);

client.createStream(createStreamRequest);

After we've called the createStream method, we need to wait until a new stream is activated. I've omitted this logic for brevity, but you can read a full code example in the AWS documentation.

Implementing a producer

Now, when we have a stream, we can write some data into it! Writing new data is not much harder. First, for every new record, we need to create a PutRecordRequest instance, set a stream name, a key, and data that we want to store in the stream. Then we need to call the putRecord method and pass our new record to it:

// A metric data that we need to send
Metric metric = ...;

PutRecordRequest putRecordRequest = new PutRecordRequest();
// Set a stream name. The same as we created in the previous example
putRecordRequest.setStreamName("metrics-stream");
// Set metric name as a key of the new record
putRecordRequest.setPartitionKey(metric.getMetricName());
// Finally we specify data that we want to store
putRecordRequest.setData(metric.toBytes());

try {
    kinesis.putRecord(putRecordRequest);
} catch (AmazonClientException ex) {
    // Don't forget to process an exception!
}

This code snippet sends a single record to a stream called metrics-stream.

Keep in mind that neither Kinesis nor Kafka guarantees 100% uptime! Both are complicated systems and many things can go wrong, from network failure to an execution of an invalid command on a production system. On top of that, AWS Kinesis can even throttle your requests if you attempt to write too much data in a short period of time. Hence, you need to be prepared — implement retry logic and buffer new records locally for a short period of time in case a stream processing system is unavailable.

Implementing a consumer

This is all for writing data. Reading data from a stream is a bit more complicated. In case of AWS Kinesis, the process looks like this:

  • Get an iterator to read data from a particular shard
  • Send a read request and provide an iterator received during the previous step
  • A read request returns records from a stream and a new iterator that we can use to send another read request to read the next batch of records

Here is how we can get an iterator to read records from a shard named shard-0001, from the beginning, for a stream:

// Create a request to get an iterator
GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
// Specify a name of the stream to read records from
getShardIteratorRequest.setStreamName("metrics-stream");
// Specify what shard to read from
getShardIteratorRequest.setShardId("shard-0001");
// Start reading from the oldest record
getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

// Get an iterator to read data from a stream from a specific shard
GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest);
// Iterator that we can use to read records from the specified shard
String shardIterator = getShardIteratorResult.getShardIterator();

In this example, we start reading from the oldest record in the stream, but Kinesis supports other iterator types:

  • AT_SEQUENCE_NUMBER — start reading from a record with the specified sequence number
  • AFTER_SEQUENCE_NUMBER — start reading after a record with the specified sequence number
  • AT_TIMESTAMP — start reading records from a specified timestamp
  • LATEST — start reading after the most recent record in the stream

Notice that we can only get an iterator to read data from a single shard. But how can we get an ID of a shard to read from? We can get a list of shards for a specific stream by using the ListShards method from AWS Kinesis that returns information about shards, including their identifiers:

ListShardsRequest listShardsRequest = new ListShardsRequest();
listShardsRequest.setStreamName("metrics-stream");

ListShardsResult result = kinesis.listShards(listShardsRequest);

for (Shard shard : result.getShards()) {
    // Get hash key range a shard is responsible for
    HashKeyRange hashKeyRange = shard.getHashKeyRange();
    // Returns first a range of records' sequence numbers in a stream
    SequenceNumberRange sequenceNumberRange = shard.getSequenceNumberRange();
    // Get a shard id that we can use to read data from it
    String shardId = shard.getShardId();
    // Get parent's shard id
    String parentShardId = shard.getParentShardId();
}

Now when we have an iterator, we can read data from a single shard. Since we don't expect that a stream of records will never end, we read records in an infinite loop. Every time we read a new batch of records from a stream, we get an iterator that we can use to perform the next read.

String shardIterator = ...;

while (true) {
    
    GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setShardIterator(shardIterator);
    getRecordsRequest.setLimit(20); 
    
    GetRecordsResult result = client.getRecords(getRecordsRequest);
      
    // Put the result into record list. The result can be empty.
    List<Record> records = result.getRecords();
    
    for (Record record: records) {
        // Process each record
    } 
    
    Thread.sleep(200); 
    shardIterator = result.getNextShardIterator();
}

Why do we need to sleep for 200 milliseconds? This because each shard in Kinesis allows up to five read transactions per second so that we can read new records at most every 200 ms. The more consumers we have reading from a shard, the more each consumer should wait.
Since we used a metric name as a key in previous example, all metrics with the same name will end up on the same shard. This allows a reader of a shard to process all data for the same metric. In a way, this is similar to MapReduce, when a single reduce execution is processing records with the same key.

High-level API

So far we've figured out how to read data from a single shard, but usually, we want to process all records in a stream. Sure, we can start a thread per shard in a stream, but this, however, poses several problems:

  • What should we do if new shards are created in a stream?
  • How do we distribute the work of reading records from a stream among multiple machines?
  • If one of our machines reading from the stream failed, how do we ensure that we continue processing records correctly?

There are multiple stream processing systems that can process records from Kinesis or Kafka streams, such as Apache Spark, Apache Flink, Google Cloud Dataflow, etc., but these are sophisticated tools that are beyond the scope of this article. Here, we will look at how we can use a simpler framework called Kinesis Client Library (KCL).

To use it, we need to do two things:

  • Implement a processor that will process records from a single shard
  • Run KCL workers on one or more machines

This is it. KCL will ensure that all records from all shards are processed and that the work is evenly distributed among multiple machines.

Before we start, however let’s briefly talk about how KCL distributes the work. KCL distributes shards among multiple workers, and every worker controls several processors, with each one processing a shingle shard. There is no "master" node in this system, and every KCL worker is independent of other workers. The state of the system is stored in a DynamoDB table and is constantly updated by each worker. Every record in the DynamoDB table specifies a shard ID from a Kinesis stream, what processor is processing it, and what is the sequence number is of the last processed record in this shard.

kcl-worker.png

Periodically, every KCL worker gets a list of shards from a KCL stream and checks if there are any shards without an assigned processor. If it finds any unassigned shards, a worker creates a record in the DynamoDB table to "acquire" a shard and creates an instance of a processor to process it. If we scale down a stream and some of the shards are closed, KCL will detect this as well and decrease the number of processors to maintain a 1:1 relationship between processors and shards in a stream.

Implementing a processor

To implement a processor in KCL, we need to implement the following interface:


    // Called once before processor starts processing incoming records
    public void initialize(InitializationInput initializationInput) {

    }

    // Called to process a batch of records read from a Kinesis stream
    public void processRecords(ProcessRecordsInput processRecordsInput) {

    }

    // Called once before the processor is terminated
    public void shutdown(ShutdownInput shutdownInput) {

    }
}

Let's implement these methods one by one.

The main method that we need to implement is processRecords. It is called every time the KCL library has read a group of records, and it sends them to a processor. In this method, we need to iterate through a list of records, process every record (here we just print its content to console), and checkpoint our progress.

@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
    // Iterate through a list of new records to process
    for (Record record : processRecordsInput.getRecords()) {
                // First we need to deserialize a metric object
        Metric metric = parseMetric(record);
        // Now we can process a single record. Here we just print 
        // record to a console, but we could also send a notification
        // to a third-party system, write data to a database, calculate
        // statistics, etc.
        System.out.println(metric);
        
        // At last, we need to signal that this record is processed
        // Once a record is processed it won't be sent to our processor again
        checkpoint(processRecordsInput.getCheckpointer(), record);
    }
}

private Metric parseMetric(Record record) {
    ByteBuffer data = record.getData();
    // Parse binary data that was recorded to a stream
}

private void checkpoint(IRecordProcessorCheckpointer checkpointer, Record record) {
    try {
        checkpointer.checkpoint(record);
    } catch (InvalidStateException e) {
        // DynamoDB Table does not exists
        // We need recreate the table
    } catch (ShutdownException e) {
        // Two processors are processing the same shard
        // We need stop processing records in this processor
    }
}

There are multiple ways to checkpoint progress, and we should not necessarily do this after we've processed all records. Depending on our application, it can do this every N records or after we've processed a batch of records.

Notice that the checkpoint method can throw an exception in case DynamoDB table does not exist or if we have two KCL records processing the same shard.

The last method that we need to implement is the shutdown. This method is the last chance for our processor to checkpoint its progress, and this is exactly what we do here:

public void shutdown(ShutdownInput shutdownInput) {
    ShutdownReason reason = shutdownInput.getShutdownReason();
    switch (reason) {
        // Re-sharding, no more records in current shard
        case TERMINATE:
        // Application shutdown
        case REQUESTED: 
            checkpoint(shutdownInput.getCheckpointer(), lastProcessedRecord);
            break;
            
        // Processing will be moved to a different record processor
        case ZOMBIE:
            // No need to checkpoint
            break;
    }
}

The majority of work is done, but we also need to provide a factory object for KCL to create an instance of our processor:

public class MetricsProcessorFactory implements IRecordProcessorFactory {

    public IRecordProcessor createProcessor() {
      return new MetricRecordsProcessor();
    }
}

Running KCL

Now, when we have the all the parts in place, all we need to do is to start a KCL worker. We will launch this code on as many machines as we need to process our Kinesis stream.

// Configuration for a worker instance
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(
    // Name of our application
    "metrics-processor",
    // Name of stream to process
    "metrics-stream",
    new DefaultAWSCredentialsProviderChain(),
    // Name of this KCL worker instance. Should be different for different processes/machines
    "worker-1"
);

// Create a factory that knows how to create an instance of our records processor
final IRecordProcessorFactory recordProcessorFactory = new MetricsProcessorFactory();
// Create a KCL worker. We only need one per machine
final Worker worker = new Worker.Builder()
    .config(config)
    .recordProcessorFactory(recordProcessorFactory)
    .build();

// Start KCL worker
worker.run();

That is all we need to do to implement a distributed stream processing. KCL will launch a processor per Kinesis shard and will distribute load among multiple machines automatically.

Notice that KCL can launch additional threads. However, if you need more machines to process incoming records you need to take care of this yourself. If you use AWS, you can use auto-scaling groups to automatically add more machines if CPU utilization gets too high.

Code example

To make these concepts more practical, I've implemented a small GitHub project that shows how to use AWS Kinesis. You can look through the code and run working examples. It consists of several small appliations that create AWS resources, produce streaming data, and read data from the stream.

Conclusions

That was a brief introduction to the field of stream processing. Modern stream processing systems are based on the append-only log data structure that allows building a data ingestion infrastructure. Stream processing systems allow producers to write new records to a log and multiple consumers can read records from a log in parallel. We've also covered how to create a simple stream processing applications using AWS Kinesis

This article just scratched the surface of the topic. It didn't cover such interesting topics windows in stream, stream processing frameworks, a concept of time in streaming data, streaming SQL (sounds like an oxymoron, I know!), etc., and I hope to cover these and many other topics in upcoming articles.

In the meantime, if you liked this article, you can watch my deep dive Pluralsight course Developing Stream Processing Applications with AWS Kinesis, which covers the ins and out of AWS Kinesis.

Discover and read more posts from Ivan Mushketyk
get started
post commentsBe the first to share your opinion
Harun Yasar
3 years ago
client.createStream(createStreamRequest);

I think this should have been

kinesis.createStream(createStreamRequest);

Right?

Show more replies