For almost two years now, the Heroku Dashboard has provided a metrics page to display information about memory usage and CPU load for all of the dynos running an application. Additionally, we’ve been providing aggregate error metrics, as well as metrics from the Heroku router about incoming requests: average and P95 response time, counts by status, etc.
Almost all of this information is being slurped out of an application’s log stream via the Log Runtime Metrics labs feature. For applications that don’t have this flag enabled, which are most applications on the platform, the relevant logs are still generated but bypass Logplex and are instead sent directly to our metrics processing pipeline.
Since its beta release, Dashboard Metrics has been a good product. Upgrading it from good to great unearthed some interesting performance hurdles and doubled our CPU requirements, but those hurdles became mere bumps once we slapped a “Powered by Kafka” sticker on it. What follows is a look back at the events and decisions that led us to enlightenment.
Historically speaking, if a Heroku user wanted system level metrics about their apps, they had two choices:
In August of 2014, a third option became available: we shipped visualizations of the past 24 hours of 10-minute roll-up data right in the dashboard!
The architecture for the initial system was quite simple. We received incoming log lines containing metrics (the same log lines a customer sees when they turn on log-runtime-metrics) and turned them into points, which we then persisted to a 21-shard InfluxDB cluster using consistent hashing. The dashboard then queried the relevant metrics via a simple API proxy layer and rendered them.
Our InfluxDB setup just worked, so well, in fact, that for nearly two years the only maintenance we did was upgrade the operating system to Ubuntu Trusty! We were still running a nearly 2-year-old release! (Note: all opinions are based on the 0.7.3 release, which has long been dropped from support. InfluxDB served us incredibly well, and we are eternally grateful for their effort.)
As we sat back and collected user experiences about Dashboard Metrics, it was clear that our users wanted more. And we wanted to deliver more! We spun off a new team charged with building a better operational experience for our customers, with metrics being at the forefront.
Almost immediately, we shipped 2 hours of data at 1-minute resolution and started enhancing the events we can show by querying internal APIs. We’ve since added restart events, scaling events, deployment markers, and configuration changes, and have lots of other enhancements planned. But as we started making these changes, we realized that our current infrastructure wasn’t going to cut it.
We set off researching ideas for how to build our next generation metrics pipeline with a few overarching goals:
The last goal is most important for us. Yes, of course, we want to ensure that whatever system we put in place is tolerant against failures and that it can be extended to drive new features. But, we also want to understand what operational headaches our customers have firsthand so we can look at developing features that address them, instead of building features that look nice on paper but solve nothing in practice.
Plus, our coworkers already operate the hell out of databases and our runtimes. It’d be foolish to not leverage their skills!
The data ingestion path for a metrics aggregation system is the most critical. Not seeing a spike in 5xx errors during a flash sale makes businesses sad. Being blind to increased latency makes customers sad. A system which drops data on the floor due to a downstream error makes me sad.
But in our previous system, this could, and did, happen any time we had to restart a node. We were not robust to restarts, crashes, or really any failures.
The New System Had to Be.
With our previous setup, when data was successfully stored in InfluxDB, that was pretty much the end of the line. Sure, one can build systems to query the data again, and we have, but with millions of constantly changing dynos, and metric streams as a result, knowing what to query all the time isn’t all that easy.
Data in the New System Should Never Rest.
Our summarization of raw data with InfluxDB relied on continuous queries, which, while convenient, were fairly expensive. When we added continuous queries for 1-minute resolution, our CPU load doubled.
We Should Only Persist Rolled-Up Data.
With these three properties in mind, we found stream processing, specifically with Apache Kafka, to fit our needs quite well. With Kafka as the bus between each of the following components, our architecture was set.
We ran into a number of problems, not least of which was the volume and size of the HTTP requests we get from Logplex and the other metrics-producing systems. Pushing this traffic through the shared Heroku router was something we wanted to avoid.
Fortunately, Private Spaces was entering beta and looking for testers. We became customers.
Overall, our architecture looks like this:
Let’s dive into the different pieces.
With Private Spaces, a customer gets an isolated “mini-Heroku” into which to deploy apps. That isolation comes with an elastic routing layer, dedicated runtime instances, and access to the same exact Heroku experience one gets in the common runtime (pipelines, buildpacks, CLI tools, metrics, logging, etc). It was only natural for us to deploy our new system into a space.
Our space runs 4 different apps, all of which share the same Go code base. We share a common code base to reduce the pain of package dependencies in Go and to better ensure compatibility between services.
By design, our ingress app is simple. It speaks the Logplex drain protocol over HTTP and converts each log line into an appropriate Protocol Buffers-encoded message. Log lines representing router requests, for instance, are then produced to the ROUTER_REQUESTS Kafka topic.
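Heroku router log lines are logfmt-style `key=value` pairs (for example `dyno=web.1 service=12ms status=200`). The sketch below shows only the parsing half of the ingress step. The real service encodes a Protocol Buffers message, which requires generated code, so this illustration substitutes a hand-written, hypothetical `RouterRequest` struct; the field selection and function names are assumptions.

```go
package main

import (
	"strconv"
	"strings"
)

// RouterRequest is a hypothetical stand-in for the Protocol Buffers message
// the ingress app would produce to the ROUTER_REQUESTS topic.
type RouterRequest struct {
	Dyno      string
	Status    int
	ServiceMs int
}

// parseLogfmt splits space-separated key=value pairs. Values may be
// double-quoted; escape sequences are not handled in this sketch.
func parseLogfmt(line string) map[string]string {
	out := make(map[string]string)
	for len(line) > 0 {
		line = strings.TrimLeft(line, " ")
		eq := strings.IndexByte(line, '=')
		if eq < 0 {
			break
		}
		key := line[:eq]
		line = line[eq+1:]
		var val string
		if strings.HasPrefix(line, `"`) {
			end := strings.IndexByte(line[1:], '"')
			if end < 0 {
				val, line = line[1:], "" // unterminated quote: take the rest
			} else {
				val, line = line[1:end+1], line[end+2:]
			}
		} else {
			sp := strings.IndexByte(line, ' ')
			if sp < 0 {
				val, line = line, ""
			} else {
				val, line = line[:sp], line[sp:]
			}
		}
		out[key] = val
	}
	return out
}

// ParseRouterLine extracts a few router fields (e.g. service=12ms -> 12).
func ParseRouterLine(line string) (RouterRequest, error) {
	kv := parseLogfmt(line)
	status, err := strconv.Atoi(kv["status"])
	if err != nil {
		return RouterRequest{}, err
	}
	service, err := strconv.Atoi(strings.TrimSuffix(kv["service"], "ms"))
	if err != nil {
		return RouterRequest{}, err
	}
	return RouterRequest{Dyno: kv["dyno"], Status: status, ServiceMs: service}, nil
}
```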
We custom-built our aggregation layer instead of opting to run something like Spark on the platform. The big stream processing frameworks all make assumptions about the underlying orchestration layer, and some of them even require arbitrary TCP connections, which Heroku, on the whole, doesn’t currently support. Also, at the time we were making this decision, Kafka Streams was not ready for prime time, so instead we built a simple aggregation framework in Go that runs in a dyno and plays well with Heroku Kafka.
In reality, building a simple aggregation framework is pretty straightforward for our use case. Each of our consumers is a large map of accumulators that either just count stuff, or perform simple streaming computations like min, max, and sum. Orchestrating which dynos consume which partitions was done with an upfront deterministic mapping based on the number of dynos we want to run and the number of partitions we have. We trust Heroku’s ability to keep the right number of dynos running, and slightly over-provision to avoid delays.
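The post describes the dyno-to-partition mapping as upfront and deterministic but doesn’t give the formula. A minimal sketch, assuming round-robin assignment (partition `p` goes to dyno `p mod n`) and reading the dyno’s index from a name like `aggregator.3`, which is the shape of the `DYNO` environment variable Heroku sets (the `aggregator` process name is hypothetical):

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// dynoIndex turns a dyno name such as "aggregator.3" into a zero-based
// index. Heroku numbers dynos of a process type starting at 1.
func dynoIndex(dyno string) (int, error) {
	dot := strings.LastIndexByte(dyno, '.')
	if dot < 0 {
		return 0, fmt.Errorf("unexpected dyno name %q", dyno)
	}
	n, err := strconv.Atoi(dyno[dot+1:])
	if err != nil || n < 1 {
		return 0, fmt.Errorf("unexpected dyno name %q", dyno)
	}
	return n - 1, nil
}

// AssignedPartitions returns the partitions that dyno `index` (out of
// `dynos` total) consumes under an assumed round-robin mapping. Every
// partition is owned by exactly one dyno, with no coordination needed.
func AssignedPartitions(index, dynos, partitions int) []int32 {
	var out []int32
	for p := index; p < partitions; p += dynos {
		out = append(out, int32(p))
	}
	return out
}
```

With 32 partitions and 8 aggregator dynos, `aggregator.3` would consume partitions 2, 10, 18, and 26. The trade-off of a static mapping is that changing the dyno count requires every dyno to agree on the new count, which is one reason to slightly over-provision.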
While straightforward, there were a few hurdles we encountered building this framework, including late arrival of data and managing offsets after an aggregation process restarts.
While it’d be amazing if we could always guarantee that data never arrives late, it’s just not possible. We regularly receive measurements as old as 2 minutes, and will count measurements up to 5 minutes old.
We handle this by keeping the last 5 summary accumulators around and flushing them on change (after a timeout, of course) to the appropriate compacted summary topic.
When the aggregation process restarts, the in-memory accumulators need to reflect the last state that they were in before the restart occurred. Because of this, we can’t simply start consuming from the latest offset, and it would be wrong to mark an offset that wasn’t yet committed to a summary.
Therefore, we keep track of the first offset seen for every new time frame (e.g. each minute) and mark the offset from 5 minutes ago after flushing the summaries for the current minute. This ensures that our state is rebuilt correctly, at the expense of extra topic publishes. (Did I mention our use of compacted topics?)
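The offset bookkeeping can be sketched as follows for a single partition. This is an illustration under assumptions: time frames are whole minutes identified by their start in Unix seconds, the struct and method names are hypothetical, and the actual commit call to Kafka is left out.

```go
package main

// OffsetTracker remembers the first Kafka offset seen in each minute for
// one partition. Committing the first offset of the minute from 5 minutes
// ago means a restarted consumer re-reads enough messages to rebuild every
// accumulator that could still change.
type OffsetTracker struct {
	firstOffset map[int64]int64 // minute start (Unix seconds) -> first offset
}

func NewOffsetTracker() *OffsetTracker {
	return &OffsetTracker{firstOffset: make(map[int64]int64)}
}

// Observe records a message's offset under the minute it belongs to,
// keeping only the smallest offset seen for that minute.
func (t *OffsetTracker) Observe(minute, offset int64) {
	if cur, ok := t.firstOffset[minute]; !ok || offset < cur {
		t.firstOffset[minute] = offset
	}
}

// CommitPoint is called after the summaries for minute `current` have been
// flushed. It returns the offset that is safe to commit, or false if no
// message from 5 minutes earlier has been seen yet.
func (t *OffsetTracker) CommitPoint(current int64) (int64, bool) {
	target := current - 5*60
	off, ok := t.firstOffset[target]
	if ok {
		for m := range t.firstOffset {
			if m < target {
				delete(t.firstOffset, m) // older minutes are no longer needed
			}
		}
	}
	return off, ok
}
```

The cost is the one the post mentions: after a restart, up to 5 minutes of messages are replayed and their summaries re-published, which compaction then collapses.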
Each sink process consumes a number of partitions for each topic, batches the summaries up by shard and type, and uses a Postgres COPY FROM statement to write 1024 summaries at a time. This seems to perform quite well and is certainly faster than issuing an INSERT per summary.
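The batching side of the sink might look like the sketch below. It groups summaries by (shard, type) and hands each batch of 1024 to a flush callback; in a real sink that callback would run the COPY (with the lib/pq driver, for example, via `pq.CopyIn`), and on failure would publish to the RETRY topic. The `Summary` fields and all names here are hypothetical.

```go
package main

// Summary is a hypothetical, trimmed-down rolled-up measurement.
type Summary struct {
	Owner string
	Type  string // e.g. "dyno_load"
	Shard int
}

type batchKey struct {
	shard int
	typ   string
}

// Batcher groups summaries by (shard, type) so each COPY targets one table
// on one shard.
type Batcher struct {
	size    int
	pending map[batchKey][]Summary
	flush   func(shard int, typ string, rows []Summary) error
}

func NewBatcher(size int, flush func(int, string, []Summary) error) *Batcher {
	return &Batcher{size: size, pending: make(map[batchKey][]Summary), flush: flush}
}

// Add queues s and flushes its batch once the batch reaches the configured
// size. If flush fails, the caller is responsible for retrying the rows
// (the post routes them to a RETRY topic).
func (b *Batcher) Add(s Summary) error {
	k := batchKey{s.Shard, s.Type}
	b.pending[k] = append(b.pending[k], s)
	if len(b.pending[k]) >= b.size {
		rows := b.pending[k]
		delete(b.pending, k)
		return b.flush(k.shard, k.typ, rows)
	}
	return nil
}

// FlushAll writes out partial batches, e.g. on a timer or at shutdown.
func (b *Batcher) FlushAll() error {
	for k, rows := range b.pending {
		if err := b.flush(k.shard, k.typ, rows); err != nil {
			return err
		}
		delete(b.pending, k)
	}
	return nil
}
```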
Our database schemas more or less match the layout of log-runtime-metrics data, but include additional columns like min, max, sum, sum of squares, and count for the metrics to aid in downsampling and storytelling purposes.
The API is a simple, read-only JSON-over-HTTP service. In addition to selecting the appropriate data from the appropriate shards and authenticating against the Heroku API, the Metrics API can downsample the 1-minute data on demand. This is how we continue to serve 10-minute roll-up data for the 24-hour views.
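On-demand downsampling reduces to bucketing 1-minute rows by a larger step and merging their aggregates. This sketch assumes rows carry a minute timestamp plus sum, count, min, and max; the `Point` type and `Downsample` function are illustrative names, not the Metrics API’s.

```go
package main

import "sort"

// Point is a hypothetical 1-minute summary row.
type Point struct {
	Minute   int64 // bucket start, Unix seconds
	Sum      float64
	Count    int64
	Min, Max float64
}

// Downsample merges points into buckets of `step` seconds (600 for the
// 10-minute roll-ups) and returns them in time order. Points are assumed
// to be non-empty (Count > 0) with non-negative timestamps.
func Downsample(points []Point, step int64) []Point {
	buckets := make(map[int64]*Point)
	for _, p := range points {
		start := p.Minute - p.Minute%step
		b, ok := buckets[start]
		if !ok {
			cp := p
			cp.Minute = start
			buckets[start] = &cp
			continue
		}
		b.Sum += p.Sum
		b.Count += p.Count
		if p.Min < b.Min {
			b.Min = p.Min
		}
		if p.Max > b.Max {
			b.Max = p.Max
		}
	}
	out := make([]Point, 0, len(buckets))
	for _, b := range buckets {
		out = append(out, *b)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Minute < out[j].Minute })
	return out
}
```

Twenty consecutive 1-minute points starting on a 10-minute boundary collapse into two 10-minute points; the mean of each is `Sum / Count`.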
We would like to eventually expose these endpoints officially as part of the Heroku API, but that work is not currently scheduled.
That left Kafka. Until a few weeks ago, there wasn’t really an option for getting Kafka as a service, but as Heroku insiders, we were alpha and now beta testers of Heroku Kafka long before the public beta was announced.
We don’t complicate our usage of Kafka. Each measurement type, dyno load average for instance, has 3 topics associated with it.
DYNO_LOAD.<version> holds the raw measurements.
A compacted DYNO_LOAD_SUMMARY.<version> topic holds the summarized, or rolled-up, measurements. The rollup period is contained within each message, making it possible for us to store 1-minute and (potentially) 15-minute rollups in the same topic if we need to.
Lastly, the RETRY.DYNO_LOAD_SUMMARY.<version> topic is written to when we fail to write a summary to our Postgres sink. Another process consumes the RETRY topics and attempts to persist their summaries to Postgres indefinitely.
Each of these topics is created with 32 partitions, and a replication factor of 3, giving us a bit of wiggle room for failure. We partition messages to those 32 partitions based on the application’s UUID, which we internally call owner since a measurement is “owned” by an app.
Even though we continue to use owner for partitioning, our compacted summary topics produce messages with a key that also includes the time period the summary covers, so compaction keeps the latest summary for each app and period. We do this with a custom partitioner that simply ignores the parts of the key that aren’t the owner.
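A custom partitioner of this kind only needs to hash the owner portion of the key. In this sketch the key layout (`owner|periodStart`) and the FNV-1a hash are assumptions; Kafka’s Java client defaults to murmur2, so whichever hash is chosen, the producers for the raw and summary topics must use the same one for an app to land on the same partition in both.

```go
package main

import (
	"hash/fnv"
	"strconv"
	"strings"
)

const numPartitions = 32 // matches the partition count in the post

// SummaryKey builds a hypothetical compacted-topic key: owner plus the
// rollup period start, so each (app, period) keeps only its latest summary.
func SummaryKey(owner string, periodStart int64) string {
	return owner + "|" + strconv.FormatInt(periodStart, 10)
}

// PartitionFor hashes only the owner part of a key and ignores the rest, so
// raw measurements (keyed by owner) and summaries (keyed by owner + time)
// for the same app map to the same partition.
func PartitionFor(key string) int32 {
	owner := key
	if i := strings.IndexByte(key, '|'); i >= 0 {
		owner = key[:i]
	}
	h := fnv.New32a()
	h.Write([]byte(owner))
	return int32(h.Sum32() % numPartitions)
}
```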
Given our plan of storing only summarized data, we felt (and still believe) that a partitioned and sharded Postgres setup would work and made sense. As such, we have 7 shards, each running on the Heroku Postgres private-4 plan. We create a new schema each day with fresh tables for each metric type. This allows us to easily rein in our retention strategy with simple DROP SCHEMA statements, which are less expensive than DELETE statements on large tables.
The owner column, as it has throughout, continues to be the shard key.
While our performance is acceptable for our query load, it’s not exactly where we’d like it to be, and we feel it could be better. We will be looking at ways to further optimize our use of Postgres in the future.
No system is perfect, and ours isn’t some magical exception. We’ve learned some things in this exercise that we think are worth pointing out.
Our strategy of using the owner column as our shard/partitioning key was a bit unfortunate, and is now hard to change. While we don’t currently see any ill effects from this, there are hypothetical situations, such as a single very large app concentrating its load on one partition and one shard, in which this could pose a problem. For now, we have dashboards and metrics that we watch to ensure this doesn’t happen, and a lot of confidence that the systems we’ve built upon will handle it in stride.
Even so, a better strategy would likely have been to shard on owner + process_type (e.g., web), which would have spread the load more evenly across the system. In addition to the more even distribution of data, from a product perspective it would mean that in a partial outage, some of an application’s metrics would remain available.
The performance of our Postgres cluster doesn’t worry us. As mentioned, it’s acceptable for now, and our architecture makes it trivial to swap out, or simply add, another data store to increase query throughput when it becomes necessary. We can do this by spinning up another Heroku app that uses shareable add-ons, starts consuming the summary topics, and writes them to a new data store, with no impact on the Postgres store!
Our system is more powerful and more extensible because of Kafka.
While we’re pretty confident about the data once it has been committed to Kafka, the story up until then is murkier. Heroku’s logging pipeline, on which metrics continue to be built, is built on top of lossy-by-design systems. The logging team, of course, monitors this loss and maintains it at an acceptable level, but it means that we may miss some measurements from time to time. Small loss events are typically absorbed via our 1-minute rollup strategy. Larger loss events due to system outages are, historically and fortunately, very rare.
As we look to build more features on top of our metrics pipeline that require greater reliability in the underlying metrics, we’re also looking at ways in which we can ensure our metrics end up in Kafka. This isn’t the end of the discussion on reliability, but rather just the beginning.
This isn’t the end of the story, but rather just the humble beginnings. We’ll continue to evolve this system based on internal monitoring and user feedback.
In addition, we rebuilt our metrics pipeline because there are operational experiences we wish to deliver that now become dramatically easier. We’ve prototyped a few of them and hope to start delivering on them rapidly.
Finally, it goes without saying that we think Kafka is a big deal. We hope this will inspire you to wonder what types of things Kafka can enable for your apps. And, of course, we can only hope that you’ll trust Heroku to run that cluster for you!