In the world of “big data”, businesses that can quickly discover and act upon insights from their users’ events have a decisive advantage. It is no longer sufficient for analytics systems to rely solely on daily batch processing. This is why our new column store, Nova, continues to use a lambda architecture. In addition to a batch layer, this architecture has a real-time layer that processes event data as they come in, and the real-time layer only needs to maintain the last day’s events. In a previous post, we focused on the batch layer of Nova. Designing the real-time layer to support incremental updates for a column store creates a different set of requirements and challenges. We will discuss our approach in this post.
Events are the units of data in our system. We think of an event as a database row with columns such as upload time, Amplitude ID, user ID, etc. As these events come in, they are partitioned and distributed across multiple real-time nodes.
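As a rough illustration of the row-like event and its assignment to a node, here is a minimal Python sketch. The `event_id` field, the `node_for` helper, and the choice of hashing the Amplitude ID as the partition key are all assumptions made for the example; the post does not say how Nova actually partitions events.

```python
import hashlib
from dataclasses import dataclass


@dataclass(frozen=True)
class Event:
    upload_time: int    # epoch seconds (assumed unit)
    amplitude_id: int
    user_id: str
    event_id: str       # hypothetical unique identifier, not named in the post


def node_for(event: Event, num_nodes: int) -> int:
    """Pick a real-time node by hashing the Amplitude ID (assumed partition key)."""
    digest = hashlib.sha256(str(event.amplitude_id).encode()).digest()
    # Use a stable hash so the same ID always maps to the same node.
    return int.from_bytes(digest[:8], "big") % num_nodes
```

A stable hash keeps all events for one Amplitude ID on the same node, which is one common way to keep per-user data together.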
There are four main requirements for Nova’s real-time layer:
- Store and support queries on events from the most recent day.
- Be optimized for both reads and writes. The write requirement is especially crucial because a real-time node typically updates thousands of times per second as new events come in.
- De-duplicate events from the last 10 minutes. This is because a real-time node reads from a message bus such as Kafka, and the message bus could repeatedly send the same batch of events in case of failure.
- Fault-tolerant. A real-time node should be able to recover from a reboot to its previous state.
Druid, a high-performance, real-time column store, inspired a lot of our final design. Most prominently, our real-time layer also has two main components: an in-memory store and a disk store. Druid’s real-time layer, however, does not satisfy the third requirement. For example, it would end up with duplicated events in any of the following scenarios:
- A failure happens between a persist (a Druid operation) and its corresponding offset update on the message bus.
- The message bus fails to stream events for more than 10 minutes (the interval in the Druid paper) and then resumes streaming events, but the first batch of incoming events was already ingested by the real-time layer before the failure.
The ability to de-duplicate events directly determines how reliable and actionable our real-time insights are. Because of this, we decided not to entirely adopt Druid’s approach. Our final design differs in how long event data stay in the in-memory store, and how these data’s format changes once they move into the disk store.
To satisfy the deduplication requirement without sacrificing write speed, we must maintain at least 10 minutes’ worth of recent events in memory. This specification leads us to implement the in-memory store as an LRU cache. The entries in this cache are called “blocks”, which are groups of events based on upload times. To be more exact, we divide the timeline into five-minute intervals and group events whose upload times fall in the same interval into a block. Note that blocks are row-oriented, so we can easily de-duplicate incoming events against them.
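The grouping and the duplicate check can be sketched in a few lines of Python. This is only an illustration under assumptions: events are plain dictionaries, upload times are epoch seconds, and a hypothetical `event_id` field identifies duplicates (the post does not say which key Nova compares).

```python
from collections import defaultdict

BLOCK_SECONDS = 5 * 60  # five-minute intervals, as described above


def block_start(upload_time: int) -> int:
    """Round an upload time down to the start of its five-minute interval."""
    return upload_time - upload_time % BLOCK_SECONDS


class BlockStore:
    """Row-oriented blocks keyed by interval start (illustrative only)."""

    def __init__(self):
        # block start -> {event_id: event row}
        self.blocks = defaultdict(dict)

    def add(self, event: dict) -> bool:
        """Add the event to its block; return False if it is a duplicate."""
        block = self.blocks[block_start(event["upload_time"])]
        if event["event_id"] in block:
            return False
        block[event["event_id"]] = event
        return True
```

Because each block keeps whole rows indexed by ID, checking an incoming event against its block is a single dictionary lookup.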
Let’s go over the ingestion workflow of the in-memory store. Whenever we see new incoming events (after deduplication) that belong to a block, we add these events to the block. As its events continue to show up in the stream, this block will stay in the cache and keep updating. After all its events have passed, the block will not be accessed again unless there are duplicated events. The time period between this block’s last update and its eviction from the cache must be no shorter than our real-time layer’s deduplication window (10 minutes). Therefore, we configure the cache to hold 15 minutes’ worth of event data. Since a block spans at most five minutes, it will not be evicted for at least another 10 (15 – 5) minutes after its last update.
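To make the sizing concrete, here is a small Python sketch of an LRU cache of blocks. It assumes that "15 minutes of data" can be approximated as a fixed count of three five-minute blocks, and the `BlockCache` class and its `on_evict` callback are hypothetical names for the example; it is not Caffeine and not Nova's implementation.

```python
from collections import OrderedDict

BLOCK_MINUTES = 5
CACHE_MINUTES = 15
MAX_BLOCKS = CACHE_MINUTES // BLOCK_MINUTES  # 3 blocks (assumed count-based sizing)


class BlockCache:
    """LRU cache of blocks; the least recently updated block is evicted first."""

    def __init__(self, max_blocks=MAX_BLOCKS, on_evict=None):
        self.max_blocks = max_blocks
        self.on_evict = on_evict or (lambda key, block: None)
        self._blocks = OrderedDict()  # block key -> list of events

    def append(self, block_key, event):
        block = self._blocks.setdefault(block_key, [])
        block.append(event)
        # Mark this block as the most recently used.
        self._blocks.move_to_end(block_key)
        # Evict from the least recently used end once over capacity.
        while len(self._blocks) > self.max_blocks:
            old_key, old_block = self._blocks.popitem(last=False)
            self.on_evict(old_key, old_block)

    def keys(self):
        return list(self._blocks)
```

With this sizing, appending to the blocks starting at 0, 300, 600, and 900 seconds evicts the block starting at 0 only once the fourth block appears, which is at least 10 minutes after the first block's interval ended.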
One of the biggest challenges of maintaining an in-memory store is recovery. To make sure our cache-based system is fault-tolerant, each time new events are added to a block, we append these events to a corresponding file on disk. This file grows as the block updates: it is essentially a write-ahead log for the block. When a node restarts, we can reconstruct the LRU cache by reading all the persisted files. We optimize these append operations so that they do not compromise the write performance of our store. Once the cache evicts a block, its corresponding file gets moved to the disk store to be converted to a columnar format at a later stage.
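The append-and-replay idea might look like the following Python sketch. The `BlockLog` class, the `block-<key>.log` file naming, and the JSON-lines encoding are assumptions chosen for readability; the post does not describe Nova's on-disk format or how its appends are optimized.

```python
import json
import os


class BlockLog:
    """One append-only file per block, replayed on restart (illustrative only)."""

    def __init__(self, directory):
        self.directory = directory
        os.makedirs(directory, exist_ok=True)

    def _path(self, block_key):
        return os.path.join(self.directory, f"block-{block_key}.log")

    def append(self, block_key, events):
        """Append events to the block's file and force them to disk."""
        with open(self._path(block_key), "a", encoding="utf-8") as f:
            for event in events:
                f.write(json.dumps(event) + "\n")
            f.flush()
            os.fsync(f.fileno())

    def recover(self):
        """Rebuild all blocks by reading every persisted file."""
        blocks = {}
        for name in sorted(os.listdir(self.directory)):
            if not (name.startswith("block-") and name.endswith(".log")):
                continue
            key = int(name[len("block-"):-len(".log")])
            with open(os.path.join(self.directory, name), encoding="utf-8") as f:
                blocks[key] = [json.loads(line) for line in f if line.strip()]
        return blocks
```

Calling `fsync` on every append is the simplest way to make the log durable, but it is also the slowest; a production system would likely batch or group these writes.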
We chose to heavily utilize Caffeine Cache in our implementation because it has several nice properties:
- Caffeine is highly concurrent, so it easily satisfies our requirement on write-optimization.
- We can configure Caffeine as a write-through cache, which greatly simplifies our logic around synchronizing blocks and their persisted files.
- Caffeine uses a sophisticated eviction policy called Windowed-TinyLFU that maintains both a “window cache” and a “main cache”. In our event-data-streaming setting, blocks from the most recent five-minute interval will always be admitted into the main cache; so the eviction policy is effectively just Segmented LRU, which serves our needs.
The disk store has two main tasks:
- Repeatedly merging a fixed number of block files into a single column-oriented file that we call a “chunk”.
- Periodically deleting chunks whose views are ready in the batch layer.
Chunk creation happens whenever enough blocks have been evicted from the in-memory store. By converting multiple small, row-oriented blocks to a single column-oriented chunk, we significantly improve the real-time layer’s query speed. To facilitate handoff between real-time and batch layers, we label chunks with a global batch number when they are created. This number increments every time the batch layer finishes processing one day’s events. The disk store runs an hourly task to delete chunks with old batch numbers. This simple handoff scheme enables the real-time layer to only store recent events and to transition fluidly with the batch layer.
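The two disk-store tasks can be illustrated with a short Python sketch. The `Chunk` class and the `build_chunk` and `delete_old_chunks` helpers are hypothetical, and treating a chunk as "old" when its batch number is strictly smaller than the current one is an assumption about the handoff rule, not something the post spells out.

```python
from dataclasses import dataclass


@dataclass
class Chunk:
    batch_number: int
    columns: dict  # column name -> list of values, one entry per event


def build_chunk(blocks, batch_number):
    """Merge row-oriented blocks into one column-oriented chunk."""
    rows = [event for block in blocks for event in block]
    names = sorted({name for row in rows for name in row})
    # Pivot the rows: one list per column, aligned by row position.
    columns = {name: [row.get(name) for row in rows] for name in names}
    return Chunk(batch_number, columns)


def delete_old_chunks(chunks, current_batch_number):
    """Keep only chunks labeled with the current batch number or later (assumed rule)."""
    return [c for c in chunks if c.batch_number >= current_batch_number]
```

Storing each column as its own list means a query that touches one column reads only that list, which is where the query speedup of a columnar layout comes from.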
Querying on the real-time layer
To compute a query result, we simultaneously fetch data from both the in-memory store and the disk store across all real-time nodes. The in-memory store has the drawback of being row-oriented, but it gains speed by having all its data in memory. On the other hand, the disk store has the disadvantage of having to read from disk, which it makes up for by having all its data in an optimized columnar format. When reading from the in-memory store, we make sure to do it through a view of the LRU cache (which Caffeine supports). This way, queries do not affect the access history of the cache.
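As a sketch of how one query might combine the two stores on a single node, the Python example below counts events per user. The `count_by_user` function and the data shapes are assumptions: in-memory blocks are lists of row dictionaries held in an `OrderedDict`, and chunks are dictionaries of column lists. Iterating over an `OrderedDict` does not reorder it, which stands in here for reading through a view that leaves the access history untouched.

```python
from collections import Counter, OrderedDict


def count_by_user(memory_blocks: OrderedDict, chunks: list) -> Counter:
    """Count events per user across row-oriented blocks and columnar chunks."""
    counts = Counter()
    # Row-oriented path: visit every event in every in-memory block.
    # Plain iteration does not change the OrderedDict's order.
    for block in memory_blocks.values():
        for event in block:
            counts[event["user_id"]] += 1
    # Columnar path: only the user_id column of each chunk is read.
    for columns in chunks:
        counts.update(columns["user_id"])
    return counts
```

In a multi-node setup, each node would compute a partial result like this and a coordinator would add the counters together.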
Towards faster insights
Our real-time layer strikes a balance among speed, robustness, and data integrity. These are the prerequisites for turning real-time analytics into fast, reliable insights. Here at Amplitude, we strive to reduce our customers’ time to insight. We are excited to build new features on top of the real-time layer to further this goal. If you are interested in working on these types of technical challenges, we would love to speak with you!