Amazon Redshift has served us very well at Amplitude. Redshift is a cloud-based, managed data warehousing solution that we use to give our customers direct access to their raw data (you can read more about why we chose it over other Redshift alternatives in another post from a couple months ago).
This allows them to write SQL queries to answer ad hoc questions about user behavior in their apps. But, as we scaled the number of customers and amount of data stored, issues began emerging in our original schema. Namely, sometimes our customer’s queries took a long time to complete, and we started getting some support tickets like this:
It was clearly time for an overhaul. The main issue we had to solve was our data schema. We were storing all of the events for an app in a single table. While this standardized the schema across all of our customers, it meant that querying on a particular event or property often required scanning through billions of irrelevant rows.
After a few months of work, we’ve retired our old static schema, and now have dynamic schemas that update as new events and properties are sent to Redshift. The new dynamic schema makes querying far more efficient and has drastically reduced query times – we’ve seen speed improvements of 10-30X.
We also decided to give each Amplitude customer a dedicated Redshift cluster to improve load performance and prevent customers using up each other’s resources. We thought we’d share some of the changes that we made and how we manage clusters with dynamic schemas.
We also learned a lot about the ins and outs of Redshift, and want to share those lessons as well.
Our Legacy Redshift Schema
At Amplitude, we collect event data. As opposed to traditional pageview-centric web analytics, we track actions that your users take. You can define whatever event types you like, as well as user and event properties.
User properties are tied to a specific user ID (things like device type and subscription plan level), while event properties are tied to a specific event type. We provide Redshift to give our customers direct access to their data so that they can perform ad-hoc queries and answer any question about user behavior they can think up. Our old Redshift structure did the job, but not always as quickly as our customers would like.
One massive table per app
Probably the biggest problem with our old structure was that we didn’t partition the data – all of the events for an app went into one massive table. Imagine querying billions of rows of data when you’re only interested in getting a list of users of people who just did a specific event, like ‘Buy From Cart’ or ‘Complete Game’. A lot of time was being wasted scanning over irrelevant rows.
Storing properties in JSON strings
Another shortcoming of the previous schema, which slowed down query times considerably, was that all of the user and event properties were dumped into JSON columns. So we had one column, user_properties, that held a JSON string of user properties, and another column, event_properties, containing a JSON string of event properties.
If you wanted to query on any user or event property, you had to use the
json_extract_path_text function to parse the entire JSON and extract the specific property you were interested in. We observed that this
json_extract_path_text function was consistently the most CPU-intensive computation on our Redshift clusters.
A portion of our old Redshift schema. As you can see, the event_properties and user_properties were each a giant JSON string that could contain up to 65535 characters. To query on one of these properties, a customer had to use the expensive and slow json_extract_path_text function.
Another issue that sometimes affected query performance was that we had multi-tenant clusters, meaning that multiple Amplitude customers shared the same Redshift cluster. This left us vulnerable to an obvious problem: if one user started overloading the system with a giant query, it would slow down query performance for everyone else on the cluster.
A small test query gone wrong from a single customer, like a
select * over a billion rows, would take up the bulk of the resources and slow down performance for the rest. These were the major problems we were looking to solve as we redesigned our Redshift schemas.
Introducing: Dynamic Dedicated Redshift
We’re calling our new release Dynamic Dedicated Redshift (internally we just call it DDR). (Supposedly our VP of Engineering has some serious DDR skills. The rumors are currently unconfirmed.) Dynamic Dedicated Redshift solves the previously identified issues in our old Redshift structure in the following ways:
Breaking out individual tables for each event type
Rather than having one table for all of an app’s events, each app’s data is now partitioned by event type. Events belonging to a specific event type will go to their own table; for example, there would be one table containing all ‘Add To Cart’ events, and another table containing all ‘Checkout’ events. This means that when you want to query on a specific event type, the query will only need to scan through the data for that event type, rather than all of them.
Now, this next part wouldn’t have been possible without our first decision to split each event type into its own table. Remember how we were storing all event properties in a single JSON string? Now that we are splitting events into their own tables, we can extract event properties into their own individual columns in their corresponding event type table.
This cuts out the computationally expensive step of using
json_extract_path_text. If we hadn’t decided to create event type tables, we never could have extracted event properties into their own columns.
Imagine an app with 200 event types, where each event type has 10 properties: adding 2000 event property columns to your already massive table would have been a disaster. But, if you have 200 separate tables for each event type, adding 10 event property columns to each table makes a lot of sense and is totally doable.
Figuring out how to automate this process and create a truly dynamic data schema was the biggest challenge we faced in this process. We wanted each event type table in Redshift to have its own schema, with columns for each of its event properties. These tables would then update as new event properties were sent, adding columns as needed.
In our architecture, we have raw event files (in JSON format) in Amazon S3 that we then load into Redshift every hour. To create a dynamic schema, we have a transformation job that takes these raw files and does the following:
- Read all the events in the raw file and write them to an individual event type file .
- Create a schema file for each event type, with individual user and event properties pulled into their own columns.
In the example below, we’re looking at an e-commerce app with events like add_to_cart and checkout_cart.
This diagram illustrates how we construct and maintain our new dynamic schemas.
Once the transformation job is done, it’s time to load the data into Redshift. First, there’s a check to see whether the Redshift transform schema for each event type matches the current schema in Redshift. If the schema has changed, the current Redshift schema will be updated, with new columns added as needed. Then, data from each event type file in S3 will load into the corresponding event type table in Redshift.
In addition, each customer now has their own Redshift cluster, as opposed to the previous multi-tenant system. This means a customer’s query will never be impacted by another customer running a monster-sized query, or from loading another customer’s data into Redshift.
Old Redshift vs. DDR: Performance Improvements
We ran some queries to compare performance between our legacy Redshift schema and the new dynamic schema and found significant improvements in query time.
Example 1: Number of users who have played the ‘Slots’ game in the last three months.
select count(distinct amplitude_id) from events123 where event_type = 'Played' and event_time BETWEEN '2015-03-01' and '2015-06-01' and json_extract_path_text(event_properties, 'title') = 'Slots';
Time: 752570.259 ms
Dynamic Dedicated Redshift
select count(distinct amplitude_id) from played where e_title = 'Slots' and event_time BETWEEN '2015-03-01' and '2015-06-01';
Time: 69602.186 ms
DDR: 10.8x Faster
Example 2: Find the most common game type, game level, and gender combinations in May**.
select json_extract_path_text(event_properties, 'game type') as e_game_type, json_extract_path_text(event_properties, 'game level') as e_game_level, json_extract_path_text(user_properties, 'gender') as u_gender, count(distinct amplitude_id) from events123 where event_type = 'Played' and event_time between '2015-05-01' AND '2015-05-30' group by e_game_type, e_game_level, u_gender order by count desc;
Time: 663722.052 ms
Dynamic Dedicated Redshift
select e_game_type, e_game_level, u_gender, count(distinct amplitude_id) from played where event_time BETWEEN '2015-05-01' AND '2015-05-30' group by e_game_type, e_game_level, u_gender order by count desc;
Time: 21371.673 ms
DDR: 31.1x faster
In this case, we’re looking at a query time of 11 minutes versus 21 seconds. That’s the difference between leaving your desk to wander around and bother your co-workers while you wait for a query to finish running, and just checking Facebook for a few seconds.
**Example 3: For all people who have registered in the last five months, group by locale, account status, gender, and age.
select json_extract_path_text(user_properties, 'locale') as u_locale, json_extract_path_text(user_properties, 'account status') as u_account_status, json_extract_path_text(user_properties, 'gender’) as u_gender, json_extract_path_text(user_properties, 'age') as u_age, count(distinct amplitude_id) from events123 where event_type = 'Register' and event_time BETWEEN '2015-01-01' and '2015-06-01' group by u_locale, u_account_status, u_gender, u_age;
Time: We stopped this query after it had been running for an hour.
Dynamic Dedicated Redshift
select u_locale, u_account_status, u_gender, u_age, count(distinct amplitude_id) from register where event_time between '2015-01-01' AND '2015-06-01' group by u_locale, u_account_status, u_gender, u_age;
Time: 8146.598 ms
DDR: 8 seconds vs > 1 hour. Success!
Redshift Lessons Learned
As you can imagine, our engineering team spent a lot of time working with the ins and outs of Redshift throughout this process. Here are some of the lessons we learned (the hard way) which we hope are helpful to others using Redshift!
1. Optimize files for efficient loading into Redshift.
Every hour, we load millions of rows of data into thousands of event type tables in Redshift, so it’s important that we make this process as efficient as possible. One factor to consider when loading files into Redshift is the file size.
There’s an overhead associated with each load job and each commit statement, so it’s more efficient to merge smaller files into larger files for loading. We have found that a merged file greater than 1MB in size loads much faster than individual smaller files. In general we load file sizes between 1MB and 1GB.
The number of files loaded per transaction is also important. The smallest computation unit in Redshift is a slice. Each small node has two slices and the bigger 8xl nodes have 16 or 32 slices, depending on whether you are using dense storage (HDD) or dense compute (SSD) nodes.
All the parallelization in Redshift is in terms of the number of slices being used for a job. To load data from S3 into Redshift, we use the Redshift COPY command, which can load multiple files at a time. To maximize your Redshift resources, it’s best to load a number of files that is a multiple of the number of slices in your cluster -- otherwise, you’ll be wasting slices.
2. Incorporate as many commands as you can into a single transaction.
You can achieve an incredibly high degree of parallelization with Redshift, but there’s a single commit queue at the heart of it. This means that actually committing a transaction in Redshift is sequential.
In addition, Redshift commits are very computationally expensive. The single commit queue is the bottleneck in loading your data into Redshift. You can think of the commit queue like a ferry. A commit is like sending the ferry across the bay and dropping people off on the other side. The more people you load up on the ferry at a time, the faster all of the people will get to the other side.
You’ll save time and computing power if you combine many commands into a single transaction (i.e. fill the ferry to max capacity), as opposed to committing after every single command (send the ferry across with only 1 person in it).
3. Take advantage of Redshift’s workload management (WLM) feature.
Amazon Redshift’s workload management (WLM) helps you allocate resources to certain user groups or query groups.
By adjusting your WLM queue configurations, you can drastically improve performance and query speed. For our Redshift clusters, we use WLM to set what percentage of memory goes to a customer’s queries, versus loading data and other maintenance tasks. If we give a lot of memory to our customers and don’t leave much for loading new data, loading will never finish; if we do the opposite, customer queries will never finish.
There has to be a good balance when allocating resources to our customers relative to internal functions. WLM also allows us to define how many queries can be run in parallel at the same time for a given group. If the maximum number of queries are already running in parallel, any new queries will enter a queue to wait for the first one to finish. Of course, the optimal configuration for this will depend on your specific use case, but it’s a tool that anyone using Amazon Redshift should definitely take advantage of.
4. Achieve higher compression ratios by specifying compression encoding per column.
For us, the most expensive part of Redshift is the data storage. Thus, the more we can compress the data to save space, the better. We can typically compress our data about 3x in Redshift. This means that Redshift’s cost of $1000/TB/year (for 3 year reserved nodes) is actually more like $350 per uncompressed TB per year. Redshift allows you to specify a different compression type for each column, which is great.
Instead of using a generic strategy to compress all of your data, you should use the compression type that best suits the data in the column. In addition to compression, if you know you’ll be using Redshift on a continuous, long-term basis, another way to save costs is to use reserved instances. By using reserved instances, we save about 60% of the cost compared to on-demand Redshift.
5. Use CloudWatch alarms to keep an eye on your cluster’s key metrics.
CloudWatch is an Amazon feature that helps us keep tabs on key metrics and alerts us if something is wrong. For example, if the load on our system crosses a certain threshold, we get an email alert and can check our clusters, instead of having to actively monitor performance. CloudWatch is a great tool for ensuring that Redshift is healthy.
Overall, the process of creating a dynamic data schema and rolling it out to our customers took three solid months of work – but it’s been time well spent to improve the experience of using Redshift for Amplitude customers. By splitting event types into their own tables and creating dynamic schemas that pull properties into their own columns, we’ve seen query times improve by up to 30X – oftentimes the difference between several minutes and several seconds.
This is huge for our customers, who don’t want to pause whatever they’re working on while they wait for a lengthy query to run. In addition, by providing dedicated Redshift clusters for each of our customers, we can ensure that no customers are negatively affected by the actions or data requirements of other customers.