Amazon Kinesis: the best event queue you’re not using

Instrumental receives a lot of raw data, upwards of 1,000,000 metrics per second. Because of this, we’ve always used an event queue to aggregate the data before we permanently store it.

Before switching to AWS Kinesis, this aggregation was based on many processes writing to AWS Simple Queue Service (SQS), with a one-at-a-time reader that would aggregate data and push it into another SQS queue, where multiple readers would store the data in MongoDB. This batching queue allowed us to combine data from many writers into single operations.

But as our incoming data volume grew, so did the amount of non-aggregatable data. Our single reader process would soon have more unique data than it could hope to process in real time. It also took longer than we desired for events to move through the pipeline.

Apache Kafka was a logical replacement for our old process, and is used by some of our competitors, but its operational burden was not something to take lightly.

When AWS announced Kinesis we saw something that gave us everything we’d get from Kafka, plus resharding and zero system maintenance! Kinesis would also cost substantially less than both our historical system and alternative replacement systems. We’ve been running Kinesis in our production environment for a while now, and have been thrilled with the results.

Without an understanding of the difficulty of scaling queue systems, it may not be immediately obvious why some Kinesis features exist, or how to use them correctly. What follows is a rundown of the problems we’ve encountered with queue systems in the past and the lessons learned implementing Kinesis.

Scaling data ingestion is hard

In the beginning, you had one machine. That machine processes as much data as it can until it needs a queue. You add a queue. At first, you just have multiple processes putting data into the queue and a worker processing the data. Eventually, you need multiple workers. More workers put more strain on the database, so you need bigger and faster databases. When it’s impractical or impossible to buy more database power, you batch a few writes together to optimize your database writes. Batching means you’ll have partial or duplicate writes, so you need fancy error handling. Eventually, your original queue can’t handle all the incoming data anymore, which leads to sharding the queue… Then life gets more complicated!

It’s a story as old as time. So let’s enumerate the problems and see how Kinesis can help you deal with them:

  • Too much incoming data
  • Too many database writes
  • Failures that are tricky to handle
  • Too much to queue and process
  • A system that is too difficult to understand
  • A system that is too hard to scale
  • A system on which it is difficult to add features

Kinesis has solutions to every single one of these problems. It is a great example of how a few limitations can turn an insurmountable problem into a completely deterministic, safe, and easy-to-run system.

How does it do that? With two features:

  • A router that maps items onto append-only queues
  • Always increasing identifiers attached to every item in the queue

It may be hard to see how this small set of features could solve all the problems above, but let’s explore an example evolution of a system to see how each one of them comes into play.

Too much incoming data

Let’s add a queue!

This is almost always the first problem in a growing system. There’s more data coming in than a single process can handle. In our initial collector version, that limit was in the low thousands of operations per second to a single-threaded Ruby process.

At this point, you separate accepting data from persisting data by running multiple data ingress points and putting the data into a queue, so you can absorb data spikes that far outpace your ability to persist them immediately.
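
To make the ingress side concrete, here is a minimal sketch of an ingress process writing straight to a Kinesis stream with boto3 (the stream name, payload format, and function name are illustrative, not Instrumental’s actual code; partition keys are explained further down):

    import json
    import time

    import boto3

    kinesis = boto3.client("kinesis")

    def accept_metric(name, value):
        # Accept the data at the edge and hand it to the queue immediately,
        # instead of writing it straight to the database.
        payload = json.dumps({"name": name, "value": value, "time": time.time()})
        kinesis.put_record(
            StreamName="metrics",           # illustrative stream name
            Data=payload.encode("utf-8"),
            PartitionKey=name,              # routes all data for one metric to one queue
        )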

If you have a queue, you’ll need workers. At some point, those workers start straining your database. You’ll do the easy thing and upgrade the database box as much as possible, but eventually there are too many writes for one box.

Too many DB writes

Let’s batch some writes!

Now that you’re able to accept many more write requests on your front end, you’ve hopefully isolated your incoming data rate from your overall application load. How did you do that? Unfortunately, by shedding a lot of that load onto your queue, your workers, and ultimately your database.

Intuitively, this makes sense. You’re still accepting the same amount of data; you’ve just changed where you deal with it: from the moment a customer hits your application to the moment a worker receives the data from the queue. And of course, as your database load rises, your workers take longer to insert data, which causes the queue to back up…

…and at some point you find yourself making calculations about which century it will be when your system could feasibly even catch up with all the outstanding data. At least, this is where Instrumental ended up pretty quickly.

There are two techniques we considered when reaching this point:

  1. Increase database capacity
  2. Increase database write efficiency

Increasing database capacity can happen via a number of routes, whether by scaling up (buying bigger database machines), scaling out (adding more nodes that can accept writes, or sharding), or shifting to a database whose cost-per-write efficiency is dramatically better than your current one. Each of those options can carry relatively high costs, both in acquisition (buying more machines) and in operations (adding shard awareness to your app and building the operational knowledge).

Alternatively, if you can increase write efficiency without changing how the system operates, you can get higher overall throughput while reducing the load that your system experiences. There isn’t a universally applicable technique here: the nature of your data will strongly affect what methods you can employ.

In our case, the incoming data could be merged logically into a smaller batch of database operations. This allowed us to reduce both the number of database writes and the total amount of write traffic being sent to the database. As you might imagine, this dramatically increased our overall database throughput, such that incoming customer data appears on the page even faster.

An interesting byproduct of this batching is that it automatically balances latency against batch size. When the system is nearly caught up, batches stay small and end-to-end latency is low. When queues grow, batches become much larger and the number of DB operations drops, at the price of increased end-to-end latency.
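
As a toy illustration of the kind of merging we mean (not our actual pipeline): counter-style increments for the same key can be summed in memory, so a batch of queue items collapses into one database operation per unique key, and bigger batches mean proportionally fewer writes.

    from collections import defaultdict

    def merge_batch(items):
        # Collapse a batch of (key, increment) queue items into one total per
        # key, so the database sees one write per key instead of one per item.
        totals = defaultdict(int)
        for key, increment in items:
            totals[key] += increment
        return dict(totals)

    # Six queue items become two database operations:
    batch = [("a", 1), ("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)]
    print(merge_batch(batch))   # {'a': 4, 'b': 2}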

Failures are tricky

Let’s make sure we can retry batches!

Things are working pretty well now, at a scale beyond your initial hopes, but how do you handle the failure cases in a batch? What happens if a single write fails? If your database is mid-failover and inoperable for 10 seconds? If your application code needs to restart? If this machine gets isolated on the network? What happens when all of these occur simultaneously?

Network isolation is one of the most difficult situations to handle, which makes it one of the best for showing how deep this hole can go. Here are a few different approaches to handling it:

  1. We can try resubmitting failed data back into the queue, to be picked up by a new batch. In some cases this may be an appropriate strategy, but if the network is down, you won’t be able to reach the queue.
  2. We can use a queue that offers at-least-once delivery semantics. Each queue item has a timeout after it is read that will cause the item to automatically re-enter the queue if the worker cannot successfully process the data. This is awesome, but now we have a big opportunity to write duplicate data from the stream. If the processor was 10% done but we retry the whole batch, 10% of the data will be duplicated, and due to variable batch sizes and ordering of operations, we will have a difficult time detecting and handling that.

There are many approaches to fixing this class of problems; let’s inspect the approach Kinesis takes and how its very simple design provides incredible failure protection.

First, let’s accept that errors in our case mean either overwriting (at-least-once) or underwriting (at-most-once) data. The appearance of exactly-once delivery is the ideal we’re after. (Even if it’s just an ideal!)

At its base, Kinesis gives you an ordered queue where each record has a unique and ever-increasing identifier (sequence number). Now these sequence numbers don’t look like 1, 2, 3; they’re actually huge numbers that can increase by millions between each record. It’s not important what these numbers are, just that they are integers, always increasing, and never repeated.

So, how does this help us handle errors?

There are a couple of approaches to making write-once guarantees, but mostly they revolve around having the final data store only do something if a certain condition is met (in other words, variations on a compare-and-swap operation), like:

  • Only insert this record if it doesn’t exist
  • Only update this record if this value is X

With these ever-increasing, never-repeating numbers, we can build a very simple system that ensures we never write the same data twice (a small sketch follows the list below):

  • Only insert this record if it doesn’t exist
  • Only update this record if the new sequence id is > old sequence id
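
Here is a minimal in-memory sketch of those two rules (illustrative only; in a real system the check-and-write would be a single atomic conditional operation in the data store):

    def write_if_newer(store, key, value, sequence):
        # Apply a write only if it carries a newer Kinesis sequence number
        # than whatever is already stored for this key.
        existing = store.get(key)
        if existing is None:
            # Only insert this record if it doesn't exist.
            store[key] = {"value": value, "sequence": sequence}
            return True
        if sequence > existing["sequence"]:
            # Only update if the new sequence number is greater than the old one.
            store[key] = {"value": value, "sequence": sequence}
            return True
        # A replayed (duplicate) record: safely ignored.
        return False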

This may all be a bit difficult to visualize, so let’s look at some examples of operations happening on different machines, and how we arrive at our failure cases. First, let’s examine the sequence of operations that would lead to incorrectly duplicating data in our database:

  Time | Node A       | Node B       | State
  t0   | increment a  |              | a = 1
  t1   | network disruption: Node B takes over processing duties from Node A
  t2   |              | increment a  | a = 2 (duplicating Node A’s operation)

So a is now incorrectly set to 2, because the network was lost before Node A could record that it had finished processing.

Now, let’s examine what happens if we introduce a compare-and-swap style operation using the sequence numbers provided by Kinesis:

  Time | Node A | Node B | State
  t0   | increment a and set sequence number to 1, only if existing sequence number < 1 | | [a = 1, sequence = 1]
  t1   | network disruption: Node B takes over processing duties from Node A
  t2   | | increment a and set sequence number to 1, only if existing sequence number < 1 | [a = 1, sequence = 1]

Great, no duplicates! We can expand this idea from a single item being processed to a batch of items by using the sequence number of the last item in the batch and picking a fixed batch size (let’s say 3 for simplicity). The basic technique can also be modified to handle the variable batch sizes mentioned above, but that muddies this example, so tweet at us if you want to hear about it.

Assuming we have a batch that contains 3 commands, increment a, increment b and increment c, our duplicate operation failure mid-batch looks like:

  Time | Node A       | Node B       | State
  t0   | increment a  |              | a = 1
  t1   | increment b  |              | a = 1, b = 1
  t2   | network disruption: Node B takes over processing duties from Node A
  t3   |              | increment a  | a = 2, b = 1 (duplicating Node A’s operation)
  t4   |              | increment b  | a = 2, b = 2 (duplicating Node A’s operation)
  t5   |              | increment c  | a = 2, b = 2, c = 1

This is really bad. The closer the failure happens to the end of a batch, the more data will be duplicated. This might cause you to try to reduce batch sizes to reduce the window of failure possibility (bad!), which in turn causes your overall batch processing to be less efficient.

So, let’s apply our compare-and-swap operation technique using Kinesis sequence numbers and make it so that batches can be safely retried:

  Time | Node A | Node B | State
  t0   | increment a and set sequence number to 3, only if existing sequence number < 3 | | [a = 1, sequence = 3]
  t1   | increment b and set sequence number to 3, only if existing sequence number < 3 | | [a = 1, sequence = 3], [b = 1, sequence = 3]
  t2   | network disruption: Node B takes over processing duties from Node A
  t3   | | increment a and set sequence number to 3, only if existing sequence number < 3 | [a = 1, sequence = 3], [b = 1, sequence = 3]
  t4   | | increment b and set sequence number to 3, only if existing sequence number < 3 | [a = 1, sequence = 3], [b = 1, sequence = 3]
  t5   | | increment c and set sequence number to 3, only if existing sequence number < 3 | [a = 1, sequence = 3], [b = 1, sequence = 3], [c = 1, sequence = 3]

So even when doing batching across multiple persistent records, the idempotency guarantees are preserved and easy to comprehend.
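
A sketch of the batch variant from the tables above, again with an in-memory dictionary standing in for the database: every write in the batch carries the sequence number of the batch’s last record, so replaying the whole batch after a failover is harmless.

    def apply_batch(store, batch):
        # batch: list of (key, increment, sequence_number) in queue order.
        # Tag every write with the last record's sequence number so a retried
        # batch can never double-apply records that already made it in.
        batch_sequence = batch[-1][2]
        for key, increment, _ in batch:
            current = store.get(key, {"value": 0, "sequence": -1})
            if current["sequence"] < batch_sequence:
                store[key] = {"value": current["value"] + increment,
                              "sequence": batch_sequence}

    store = {}
    batch = [("a", 1, 101), ("b", 1, 102), ("c", 1, 103)]
    apply_batch(store, batch)   # first attempt
    apply_batch(store, batch)   # full retry after a failover: no duplicates
    print(store)                # every value is 1, every sequence is 103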

You may have noticed this guarantee does not hold if you have more than one processor gathering batches. If two processors were reading the same batch from the same queue (“at-least-once,” remember?), they may both end up writing the same database record. Due to our write-once logic, if the newer batch gets written first, the older data will be silently discarded. So these simple operations and guarantees are only reliable when a single processor is working on a queue. No sweat, we’ll handle that in the next section!

Lastly, Kinesis provides a checkpoint operation a client can send that says “I’ve processed all the data up until sequence ID X and I never need to see it again.” This is really an optimization so you can handle failures without incurring millions of operations that you know will be ignored. Once you’re sure you’ve reliably persisted or processed your data, checkpoint and you won’t have to process it again.
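
In Python, this is roughly what a record processor looks like when run under the MultiLangDaemon with the amazon_kclpy library (method names follow that library’s sample app; treat the details as an illustrative sketch rather than production code):

    import base64

    from amazon_kclpy import kcl

    class MetricProcessor(kcl.RecordProcessorBase):
        def initialize(self, shard_id):
            self.shard_id = shard_id

        def process_records(self, records, checkpointer):
            for record in records:
                data = base64.b64decode(record.get("data"))
                # ... persist the data via the conditional writes described above ...
            # Checkpoint only after the batch is safely persisted; Kinesis
            # will never hand these sequence numbers to us again.
            checkpointer.checkpoint()

        def shutdown(self, checkpointer, reason):
            if reason == "TERMINATE":
                # The shard is being split or merged; checkpoint before exiting.
                checkpointer.checkpoint()

    if __name__ == "__main__":
        kcl.KCLProcess(MetricProcessor()).run()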

Too much to queue and process

Let’s shard some queues!

Alright, with failures squarely tucked away, the next problem is the issue of overwhelming a single queue system. Because this is all running on real physical hardware, at some scale a single machine is just not capable of handling any more. This is true for both queuing and processing, but you’re more likely to hit this issue first on processing, because that operation is more CPU-intensive.

So the obvious solution is: what if we had more queues and just randomly pushed the data into one of N queues? Easy, right?

This is pretty much what Kinesis does, except for the random bit, which we’ll get to in a second. It’s worth noting that the limits for a single Kinesis queue are pretty low: 1 MB/s write, 2 MB/s read, and 1,000 records/sec. These low limits give the queues a high granularity, which eases the operational aspects.

The system is too difficult to understand

Let’s make data flow deterministic!

Oh no! Our fancy error handling and batching requires that only a single processor work on a given piece of data at a time, so randomly assigning records to queues has undone all the safety of our system. We’re not likely to keep the sequence number guarantees across machines at high scale, so now everything we use a sequence number for is messed up.

Kinesis addresses this by making every queue responsible for a mutually exclusive subset of the data. That is, every record has a method for determining which queue it belongs to, known in advance of it being queued. Kinesis calls this the “partition key.” Internally, Kinesis md5s this key, then maps the resulting value onto the queues:

  q0 = [2^0, 2^16)
  q1 = [2^16, 2^32)
  q2 = [2^32, 2^48)
  q3 = [2^48, 2^64)
  q4 = [2^64, 2^80)
  q5 = [2^80, 2^96)
  q6 = [2^96, 2^112)
  q7 = [2^112, 2^128)

Kinesis allows you to assign the md5 space to each queue however you like; there is no requirement that the space be split evenly. This lets you deal with some types of hotspots, but not all.

Warning: It’s worth noting this system will have problems with data that contains hotspots. It divides the md5 space into queues, and if a single partition key exceeds the limits of one queue, you will not be able to use Kinesis without figuring out another way to handle that hot spot.

A good way to look at this: the md5 space maps onto some number of queues. These queues are not known to the inserting system, but Kinesis understands the relationship between the partition key and the destination queue. This is important for our next section.
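
A small sketch of that routing (the shard ranges here are hypothetical; in the real API, DescribeStream reports each shard’s StartingHashKey and EndingHashKey):

    import hashlib

    def kinesis_hash(partition_key):
        # Kinesis md5s the partition key and treats the digest as a
        # 128-bit integer in [0, 2^128).
        return int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)

    def queue_for(partition_key, shard_ranges):
        # shard_ranges: (shard_id, start, end) tuples that together cover the
        # whole md5 space; exactly one range matches any given key.
        h = kinesis_hash(partition_key)
        for shard_id, start, end in shard_ranges:
            if start <= h <= end:
                return shard_id

    # Two shards splitting the space in half (hypothetical ranges):
    ranges = [("shardId-000", 0, 2**127 - 1),
              ("shardId-001", 2**127, 2**128 - 1)]
    print(queue_for("some-metric-name", ranges))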

The system is too hard to scale

Let’s split those shards between workers!

Everything is great again, right? Yes, right up until the system needs to handle 2, 10, or 100 times the data volume. This is where Kinesis shows off some incredible design features. We’re going to subdivide our queue system into way more queues, and do it in a deterministic and safe way.

Essentially we’re going to take a single section of the md5space and split it from one queue into two; maintaining sequence ids, ordering, matchability and everything else all the while.

Kinesis considers the operation of a split like this:

  Shard 1 -> Shard 2 + Shard 3

It’s important to see that while Shards 2 and 3 cover the same md5 space as Shard 1, they do so at different times. Only one queue is ever responsible for a given place in the md5 space at any given time. When Shards 2 and 3 come up, they have sequence numbers larger than the largest sequence number in Shard 1, and they are only allowed to start processing after Shard 1 has been processed entirely. Otherwise, there would be a race condition for writes to the same record by two different processors on Shard 1 and Shard 2.

Merges are identical to splits, just in reverse; the same guarantees apply.
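
For reference, a split and a merge are each a single API call; here is a hedged boto3 sketch (the stream name, shard IDs, and hash key are illustrative):

    import boto3

    kinesis = boto3.client("kinesis")

    # Split one shard into two at a point inside its hash key range.
    kinesis.split_shard(
        StreamName="metrics",
        ShardToSplit="shardId-000000000001",
        NewStartingHashKey=str(2**127),   # must fall within the parent shard's range
    )

    # Merge two adjacent shards back into one.
    kinesis.merge_shards(
        StreamName="metrics",
        ShardToMerge="shardId-000000000002",
        AdjacentShardToMerge="shardId-000000000003",
    )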

Another thing worth noting here is that only one split or merge operation can happen at a time on your stream, and each takes about 15-30 seconds to apply. So as you have more shards, the time to resize your whole stream goes up, because each shard may require its own split/merge operation. Practically, this is not a problem, because you’ll be running with a reasonable amount of spare capacity to handle things like fast catch-up of processors and spiky loads. Shards cost about $0.36/day, so it’s silly not to have a few shards of extra capacity in the system.

A system on which it is difficult to add features

Let’s duplicate the stream to different worker types!

An interesting feature of Kinesis is that checkpoints and workers are not concepts of the service; they’re implemented in the client code of the Amazon libraries. In fact, checkpoints and worker leases are just records in a DynamoDB table the client library keeps for you. Here’s what a Kinesis metadata table looks like:

(Image: Kinesis metadata DynamoDB table)

If you want more than one thing to process the stream, you just tell it to use a different table name, and boom: you now have a duplicate, but completely separate and non-interfering, stream for the price of a single DynamoDB table with just a few records in it.

If you’re interested in learning more about this technique of delegating client state to a separate system, Kafka does very similar things with how clients coordinate state in Zookeeper.

Gotchas

Let’s admit tools are usually only good in the scenarios they’re designed for!

  • How it works best – Kinesis is a good fit for high-volume, analyzed data. It’s a bad fit for job queues, or things where a single record might be unprocessable; these are still better handled by job queue systems.
  • Variable batch sizes – Implementing variable batches is possible, but a little tricky. It can easily undo all the safety guarantees if you’re not careful.
  • Data Retention Limits – Data is only available in Kinesis for 24 hours, after which it is discarded. If you want to do something with your data, you should get on that right away.
  • Usage Limits – You can write only 1,000 records (totaling up to 1 MB) per second to each shard, and read up to 2 MB per second from it. Splitting shards is the easiest way to stay under the limits, and it’s very safe and easy to do.

Addendum

Let’s tell you some other important things!

  • Worker Resilience – The client library automatically maintains the relationships between workers and their shards. This is simple to reason about once you understand the idempotency guarantees of the lower levels: design your workers so that failures correctly signal the Kinesis API when work needs to be transferred to another worker.
  • Worker Names for Fast Restart – We restart the processors on every application deploy, which in normal circumstances would cause a worker “lease” to time out and fail over to another host. By setting the workerName so that each machine has a distinct, stable worker name, we can speed this operation up considerably: when a restart happens, the client library sees the old lease for that worker and simply resumes working on the shard, with no failover required.
  • Error Handling Using Exits for the MultiLangDaemon – Occasionally the client code needs to gracefully handle a non-recoverable error. The MultiLangDaemon does not provide a method for this yet, but it’s easy to work around if you have something monitoring your MultiLangDaemon and restarting it if it dies. Calling exit(1) will cause your processor to die; the MultiLangDaemon will then decide things have gone sideways and die as well, taking any other processors it’s responsible for with it. When the MultiLangDaemon restarts, it takes over its old leases and starts up new workers. Hopefully this will be improved in the future, but it’s more than good enough for most situations, introducing only enough latency for your processes to restart.

How Do I Make This Work?

This sure seems like a lot to understand and do!

There are a lot of concepts involved in understanding how to apply Kinesis correctly to your application. Many of the concepts we’ve presented are techniques that, as your scale increases, you’ll very likely be employing whether or not you use Kinesis. We think Kinesis enables you to use these concepts safely and cheaply, without spending a lot of time rebuilding the infrastructure that gets you there.

In short, we’re pretty big fans, and would recommend Kinesis to other engineers working on processing streaming data. There’s plenty to enjoy both in its operation, and in observation of how it works!

Warning: sales pitch! Since you read this entire post, you’re probably the kind of person who wants to deeply understand the real-time performance of your application. We built Instrumental application and server monitoring for you. Also, feel free to tweet questions and comment at @Instrumental.
