Pulling the Thread on Kafka's Compacted Topics

January 11, 2017

At Heroku, we’re always working towards improving operational stability with the services we offer. As we recently launched Apache Kafka on Heroku, we’ve been increasingly focused on hardening Apache Kafka, as well as our automation around it. This particular improvement in stability concerns Kafka’s compacted topics, which we haven’t talked about before. Compacted topics are a powerful and important feature of Kafka, and as of 0.9, provide the capabilities supporting a number of important features.

Meet the Bug

The bug we had been seeing is that an internal thread Kafka uses to implement compacted topics (explained in more detail below) can die in certain use cases without any notification. This leads to long-term failures and instability of the Kafka cluster, because old data isn’t cleaned up as expected.

To set the stage for the changes we made and a deeper explanation of the bug, we’ll briefly cover what log compaction is and how it works.

Just What Are Compacted Topics Anyway?

In the default case, Kafka topics are a stream of messages:

[ a:1, b:2, c:3 ]

There is no way to change or delete previous messages, except that messages that are too old get deleted after a specified “retention time.”

Compacted topics provide a very different type of stream, maintaining only the most recent message of a given key. This produces something like a materialized or table-like view of a stream, with up-to-date values for all things in the key space. These compacted topics work by assigning each message a “key” (a simple Java byte[]), with Kafka periodically tombstoning or deleting messages in the topic with superseded keys, or by applying a time-based retention window. This tombstoning of repeated keys provides you with a sort of eventual consistency, with the implication that duplicate messages or keys may be present before the cleaning or compaction process has completed.

While this doesn’t give you real infinite storage – you now have to care about what keys you assign and how big the space of keys grows – it’s a useful primitive for many systems.
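To make the "most recent message per key" idea concrete, here is a minimal Python sketch of what a fully compacted topic would retain. It is an illustration only, not Kafka's code: the `compact` function and the list-of-tuples representation are assumptions made for this example.

```python
def compact(messages):
    """Return the messages a fully compacted topic would retain.

    `messages` is a list of (key, value) pairs in log order. Only the last
    message for each key survives, and survivors keep their log order.
    """
    latest_index = {}
    for index, (key, _value) in enumerate(messages):
        latest_index[key] = index  # later messages overwrite earlier ones
    return [
        message
        for index, message in enumerate(messages)
        if latest_index[message[0]] == index
    ]


log = [("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)]
compacted = compact(log)
print(compacted)  # [('c', 3), ('a', 4), ('b', 5)]
```

Until compaction has run, a consumer reading the topic would still see all five messages, which is the eventual consistency described above.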

At Heroku we use compacted topics pretty sparingly. They’re a much more special purpose tool than regular Kafka topics. The largest user is the team who work on Heroku’s Metrics feature, where they power Threshold Alerts. Heroku Connect is also starting to use them.

Even when end users aren’t taking advantage of compacted topics, Kafka makes extensive use of them internally: they provide the persistence and tracking of which offsets consumers and consumer groups have processed. This makes them an essential part of the codebase, so the reliability of compacted topics matters a lot.

How Do Compacted Topics Really Work?

Given the goal of “removing duplicate keys”, how does Kafka go about implementing this? There are a few important elements. First is that, on disk, Kafka breaks messages up into “segments”, which are plain files:

my_topic_my_partition_1: [ a:1, b:2, c:3 ]
my_topic_my_partition_4: [ a:4, b:5]

This notation uses key:offset to represent a message, as these are the primary attributes being manipulated for this task. Compaction doesn’t care about message values, except that the most recent value for each key is preserved.

Secondly, a periodic process – the log cleaner thread – comes along and removes messages with duplicate keys. It does this by deleting duplicates only for new messages that have arrived since the last compaction. This leads to a nice tradeoff where Kafka only requires a relatively small amount of memory to remove duplicates from a large amount of data.

The cleaner runs in two phases. In phase 1, it builds an “offset map”, from keys to the latest offset for that key. This offset map is only built for “new” messages - the log cleaner marks where it got to when it finished. In phase 2, it starts from the beginning of the log, and rewrites one segment at a time, removing any message which has a lower offset than the message with that key in the offset map.
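The two phases can be sketched in a few lines of Python. This is a simplified model rather than Kafka's implementation: segments are plain lists of (key, offset) pairs, and the function names and the `first_new_offset` parameter are hypothetical.

```python
def build_offset_map(segments, first_new_offset):
    """Phase 1: map each key to its latest offset among the new messages."""
    offset_map = {}
    for segment in segments:
        for key, offset in segment:
            if offset >= first_new_offset:
                offset_map[key] = offset  # later offsets overwrite earlier ones
    return offset_map


def clean_segments(segments, offset_map):
    """Phase 2: rewrite every segment, dropping superseded messages."""
    cleaned = []
    for segment in segments:
        kept = [
            (key, offset)
            for key, offset in segment
            # Keys missing from the map have no newer duplicate, so keep them.
            if offset >= offset_map.get(key, offset)
        ]
        cleaned.append(kept)
    return cleaned


segments = [
    [("a", 1), ("b", 2), ("c", 3)],  # my_topic_my_partition_1
    [("a", 4), ("b", 5)],            # my_topic_my_partition_4
]
offset_map = build_offset_map(segments, first_new_offset=1)
cleaned = clean_segments(segments, offset_map)
print(offset_map)  # {'a': 4, 'b': 5, 'c': 3}
print(cleaned)     # [[('c', 3)], [('a', 4), ('b', 5)]]
```

The same example is worked through by hand in the rest of this section.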

Phase 1

Data:

my_topic_my_partition_1: [ a:1, b:2, c:3 ]
my_topic_my_partition_4: [ a:4, b:5 ]

Offset map produced:

{
  a: 4,
  b: 5,
  c: 3,
}

Phase 2

my_topic_my_partition_1: [ a:1, b:2, c:3 ]
my_topic_my_partition_4: [ a:4, b:5 ]

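Breaking this cleaning down message-by-message for the first segment:

a:1 is removed, because the offset map holds a later offset (4) for key a.
b:2 is removed, because the offset map holds a later offset (5) for key b.
c:3 is kept, because 3 is the latest offset for key c in the offset map.

That’s the end of the first segment, so the output is: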

my_topic_my_partition_1: [ c:3 ]

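Then we clean the second segment:

a:4 is kept, because 4 is the latest offset for key a in the offset map.
b:5 is kept, because 5 is the latest offset for key b in the offset map.

That’s the end of this segment, so the output is: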

my_topic_my_partition_4: [ a:4, b:5 ]

So now, we’ve cleaned up to the end of the topic. We’ve elided a few details here. First, Kafka has a relatively complex protocol that enables rewriting whole topics in a crash-safe way. Second, Kafka never builds an offset map for the latest segment in the log. This just prevents doing the same work over and over: the latest log segment sees a lot of new messages, so there’s no sense in continually recompacting using it. Lastly, there are some optimizations that merge small log segments into larger files, which avoids littering the filesystem with lots of small files.

The last part of the puzzle is that Kafka records how far it got: the highest offset, across all keys, covered by the offset map it just built. In this case, that is offset 5.

Let’s see what happens when we add some more messages (again ignoring the fact that Kafka never compacts using the last segment). In this case c:6 and a:7 are the new messages:

my_topic_my_partition_1: [ c:3 ]
my_topic_my_partition_4: [ a:4, b:5 ]
my_topic_my_partition_6: [ c:6, a:7 ]

Phase 1

Build the offset map:

{
  a: 7,
  c: 6,
}

Note that the offset map doesn’t include b:5! We already built the offset map (in the previous clean) up to that message, and no new message has the key b, so our new offset map has no entry for b at all and the existing b:5 is left untouched. This means the compaction process can use much less memory than you’d expect to remove duplicates.

Phase 2

Clean the log:

my_topic_my_partition_4: [ b:5 ]
my_topic_my_partition_6: [ c:6, a:7 ]
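Both cleans can be replayed with a small Python model that remembers the highest offset it has already mapped. This is a sketch under simplifying assumptions, not Kafka's code: the `LogCleaner` class and its `last_mapped_offset` attribute are hypothetical names, segments that end up empty are simply dropped, and the rule about never mapping the latest segment is ignored, as in the walkthrough.

```python
class LogCleaner:
    """Toy cleaner that remembers the highest offset it has already mapped."""

    def __init__(self):
        self.last_mapped_offset = -1  # nothing mapped yet

    def clean(self, segments):
        # Phase 1: only messages newer than the recorded offset enter the map.
        offset_map = {}
        for segment in segments:
            for key, offset in segment:
                if offset > self.last_mapped_offset:
                    offset_map[key] = offset
        # Phase 2: rewrite the whole log from the beginning.
        cleaned = []
        for segment in segments:
            kept = [(k, o) for k, o in segment if o >= offset_map.get(k, o)]
            if kept:  # simplification: drop segments that end up empty
                cleaned.append(kept)
        if offset_map:
            self.last_mapped_offset = max(offset_map.values())
        return offset_map, cleaned


cleaner = LogCleaner()
log = [[("a", 1), ("b", 2), ("c", 3)], [("a", 4), ("b", 5)]]
first_map, log = cleaner.clean(log)
log.append([("c", 6), ("a", 7)])  # new messages arrive in a new segment
second_map, log = cleaner.clean(log)
print(second_map)  # {'c': 6, 'a': 7}
print(log)         # [[('b', 5)], [('c', 6), ('a', 7)]]
```

The second offset map holds two entries even though the log contains three distinct keys, which is the memory saving described above.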

What Is the Bug Again?

Prior to the most recent version of Kafka, the offset map had to hold the keys of a whole segment at once. This simplified some internal accounting, but caused pretty gnarly problems, because the thread crashed if the map didn’t have enough space. With the default settings, log segments grow up to 1GB of data, which at a very small message size can overwhelm the offset map with the sheer number of keys. Once the offset map runs out of space without fitting in a full segment, an assertion fires and the thread crashes.
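A rough back-of-the-envelope calculation shows how easily this can happen. Every number below except the 1GB segment size is an assumption made for illustration: a 128MB dedupe buffer, 24 bytes per offset map entry (a 16-byte key hash plus an 8-byte offset), and a 0.9 load factor. Check your own broker configuration before relying on them. The calculation also treats every message as having a unique key and ignores per-message overhead.

```python
SEGMENT_BYTES = 1024 ** 3                # 1GB segment size, from the text above
DEDUPE_BUFFER_BYTES = 128 * 1024 * 1024  # assumed size of the cleaner's buffer
BYTES_PER_ENTRY = 24                     # assumed: 16-byte key hash + 8-byte offset
LOAD_FACTOR = 0.9                        # assumed fraction of the buffer usable


def offset_map_capacity(buffer_bytes, bytes_per_entry, load_factor):
    """Number of distinct keys the offset map can hold."""
    return int(buffer_bytes / bytes_per_entry * load_factor)


def messages_per_segment(segment_bytes, message_bytes):
    """Worst case: every message in the segment has a unique key."""
    return segment_bytes // message_bytes


capacity = offset_map_capacity(DEDUPE_BUFFER_BYTES, BYTES_PER_ENTRY, LOAD_FACTOR)
for message_bytes in (1000, 100, 50):
    keys = messages_per_segment(SEGMENT_BYTES, message_bytes)
    status = "fits" if keys <= capacity else "overflows the offset map"
    print(f"{message_bytes:>5}-byte messages: {keys:>10,} keys, {status}")
```

Under these assumptions the map holds roughly five million keys, so a full segment of unique 100-byte messages already exceeds it.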

What makes this especially bad is Kafka’s handling of the thread crashing: there’s no notification to an operator, and the process itself carries on running. This violates a good fundamental principle that if you’re going to fail, fail loudly and publicly.
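The failure mode is easy to reproduce in miniature. The sketch below is a Python analogy, not Kafka's JVM code: a background thread hits an assertion and dies while the rest of the process carries on. Installing a `threading.excepthook` (available since Python 3.8) is one way to make such a failure visible; the thread name and error message here are made up for the example.

```python
import threading

failures = []


def record_failure(args):
    # Called for any uncaught exception in a thread. A real service might
    # page an operator or exit the process here instead of appending to a list.
    failures.append((args.thread.name, repr(args.exc_value)))


threading.excepthook = record_failure


def cleaner_loop():
    # Stand-in for the cleaner hitting its assertion.
    raise AssertionError("offset map is full")


cleaner = threading.Thread(target=cleaner_loop, name="log-cleaner")
cleaner.start()
cleaner.join()

# The main thread is unaffected: without the hook or an explicit liveness
# check, nothing would tell an operator that the cleaner is gone.
print("cleaner alive:", cleaner.is_alive())
print("recorded failures:", failures)
```

A periodic `is_alive()` check on critical threads, exported as a metric, gives a similar signal.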

With a broker running without this thread in the long term, data that is meant to be compacted grows and grows. This threatens the stability of the node, and if the crash impacts other nodes, the whole cluster.

What Is the Fix?

The fix was relatively simple, and a common theme in software: “stop doing that bad thing”. After we spent quite some time understanding the compaction behavior (as explained above), the code change itself was a simple 100-line patch. The fix means Kafka no longer tries to fit a whole segment in the offset map; instead, it can mark “I got partway through a log segment when building the map”.

The first step was to remove the assertion that caused the log cleaner thread to die. Then, we reworked the internal tracking such that we can record a partial segment load and recover from that point.
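The idea behind the rework can be sketched as an offset map builder that stops when it is full and reports where to resume, instead of asserting. This is a simplified Python model and not the actual patch: the function name, the `capacity` parameter, and the flat list of (key, offset) pairs are assumptions made for illustration.

```python
def build_bounded_offset_map(messages, start_offset, capacity):
    """Build an offset map holding at most `capacity` keys.

    Returns (offset_map, next_start_offset). If the map fills up partway
    through the log, building stops and the caller can resume from
    next_start_offset on the next clean instead of crashing.
    """
    offset_map = {}
    next_start_offset = start_offset
    for key, offset in messages:
        if offset < start_offset:
            continue  # already covered by an earlier clean
        if key not in offset_map and len(offset_map) >= capacity:
            break  # map is full: record a partial load rather than assert
        offset_map[key] = offset
        next_start_offset = offset + 1
    return offset_map, next_start_offset


messages = [("a", 1), ("b", 2), ("c", 3), ("a", 4), ("b", 5)]
first = build_bounded_offset_map(messages, start_offset=1, capacity=2)
second = build_bounded_offset_map(messages, start_offset=first[1], capacity=2)
print(first)   # ({'a': 1, 'b': 2}, 3)
print(second)  # ({'c': 3, 'a': 4}, 5)
```

A partial map is still safe to clean with, because phase 2 only ever removes a message when the map holds a later offset for the same key.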

The outcome is that the log cleaner thread no longer dies silently. This was a huge stress reliever for us: we’d seen this happen in production multiple times, and recovering from it is quite tricky.

Conclusion

Working with the Kafka community on this bug was a great experience. We filed a Jira ticket and talked through potential solutions. After a short while, Jun Rao and Jay Kreps had a suggested solution, which was what we implemented. After some back and forth with code review, the patch was committed and made it into the latest release of Kafka.

This fix is in Kafka 0.10.1.1, which is now available and the default version on Heroku. You can provision a new cluster like so:

$ heroku addons:create heroku-kafka

For existing customers, you can upgrade to this release of Kafka like so:

$ heroku kafka:upgrade heroku-kafka --version 0.10