
Optimizing Redshift Performance with Dynamic Schemas

Amazon Redshift has served us very well at Amplitude. Redshift is a cloud-based, managed data warehousing solution that we use to give our customers direct access to their raw data (you can read more about why we chose it over the alternatives in a post from a couple of months ago). This allows them to write SQL queries to answer ad hoc questions about user behavior in their apps.

But as we scaled the number of customers and the amount of data stored, issues began emerging in our original schema. Some of our customers’ queries were taking a long time to complete, and we started getting support tickets like this:

[Image: customer support tickets about slow Redshift queries]

It was clearly time for an overhaul.

The main issue we had to solve was our data schema. We were storing all of the events for an app in a single table. While this standardized the schema across all of our customers, it meant that querying on a particular event or property often required scanning through billions of irrelevant rows.

After a few months of work, we’ve retired our old static schema, and now have dynamic schemas that update as new events and properties are sent to Redshift. The new dynamic schema makes querying far more efficient and has drastically reduced query times — we’ve seen speed improvements of 10-30X. We also decided to give each Amplitude customer a dedicated Redshift cluster to improve load performance and prevent customers from using up each other’s resources.

We thought we’d share some of the changes that we made and how we manage clusters with dynamic schemas. We also learned a lot about the ins and outs of Redshift, and want to share those lessons as well.

Our Legacy Redshift Schema

At Amplitude, we collect event data. As opposed to traditional pageview-centric web analytics, we track actions that your users take. You can define whatever event types you like, as well as user and event properties. User properties are tied to a specific user ID (things like device type and subscription plan level), while event properties are tied to a specific event type.

We provide Redshift to give our customers direct access to their data so that they can perform ad-hoc queries and answer any question about user behavior they can think up. Our old Redshift structure did the job, but not always as quickly as our customers would like.

One massive table per app

Probably the biggest problem with our old structure was that we didn’t partition the data — all of the events for an app went into one massive table. Imagine querying billions of rows when you’re only interested in the list of users who performed a specific event, like ‘Buy From Cart’ or ‘Complete Game’. A lot of time was wasted scanning over irrelevant rows.

Storing properties in JSON strings

Another shortcoming of the previous schema, which slowed down query times considerably, was that all of the user and event properties were dumped into JSON columns. So we had one column, user_properties, that held a JSON string of user properties, and another column, event_properties, containing a JSON string of event properties.

If you wanted to query on any user or event property, you had to use the json_extract_path_text function to parse the entire JSON and extract the specific property you were interested in. We observed that this json_extract_path_text function was consistently the most CPU-intensive computation on our Redshift clusters.
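
For example, pulling a single event property out of the JSON column looked something like this (the table and property names are taken from the query examples later in this post):

select json_extract_path_text(event_properties, 'title') as e_title
from events123
limit 10;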

[Image: a portion of our old Redshift schema]

A portion of our old Redshift schema. As you can see, the event_properties and user_properties were each a giant JSON string that could contain up to 65535 characters. To query on one of these properties, a customer had to use the expensive and slow json_extract_path_text function.

Multi-tenant clusters

Another issue that sometimes affected query performance was that we had multi-tenant clusters, meaning that multiple Amplitude customers shared the same Redshift cluster. This left us vulnerable to an obvious problem: if one user started overloading the system with a giant query, it would slow down query performance for everyone else on the cluster. A small test query gone wrong from a single customer, like a select * over a billion rows, would take up the bulk of the resources and slow down performance for the rest.

These were the major problems we were looking to solve as we redesigned our Redshift schemas.

Introducing: Dynamic Dedicated Redshift

[Image: table comparing old Redshift and Dynamic Dedicated Redshift]

We’re calling our new release Dynamic Dedicated Redshift (internally we just call it DDR).

(Supposedly our VP of Engineering has some serious DDR skills. The rumors are currently unconfirmed.)

Dynamic Dedicated Redshift solves the previously identified issues in our old Redshift structure in the following ways:

Breaking out individual tables for each event type

Rather than having one table for all of an app’s events, each app’s data is now partitioned by event type. Events belonging to a specific event type will go to their own table; for example, there would be one table containing all ‘Add To Cart’ events, and another table containing all ‘Checkout’ events. This means that when you want to query on a specific event type, the query will only need to scan through the data for that event type, rather than all of them.

Dynamic schemas

Now, this next part wouldn’t have been possible without our first decision to split each event type into its own table. Remember how we were storing all event properties in a single JSON string? Now that we are splitting events into their own tables, we can extract event properties into their own individual columns in their corresponding event type table. This cuts out the computationally expensive step of using json_extract_path_text.
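
To make this concrete, here’s a sketch of what one of these event type tables might look like (the column names are borrowed from the query examples later in this post; the exact types and sort key shown here are illustrative):

create table played (
  amplitude_id  bigint,        -- the end user who performed the event
  event_time    timestamp,
  e_game_type   varchar(256),  -- event property, now a plain column
  e_game_level  varchar(256),  -- event property
  u_gender      varchar(256)   -- user property
)
sortkey (event_time);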

If we hadn’t decided to create event type tables, we never could have extracted event properties into their own columns. Imagine an app with 200 event types, where each event type has 10 properties: adding 2000 event property columns to your already massive table would have been a disaster. But, if you have 200 separate tables for each event type, adding 10 event property columns to each table makes a lot of sense and is totally doable.

Figuring out how to automate this and create a truly dynamic data schema was the biggest challenge we faced. We wanted each event type table in Redshift to have its own schema, with columns for each of its event properties. These tables would then update as new event properties were sent, adding columns as needed.

In our architecture, we have raw event files (in JSON format) in Amazon S3 that we then load into Redshift every hour. To create a dynamic schema, we have a transformation job that takes these raw files and does the following:

  1. Read all the events in the raw file and write each one to a file for its event type.
  2. Create a schema file for each event type, with individual user and event properties pulled into their own columns.

In the example below, we’re looking at an e-commerce app with events like add_to_cart and checkout_cart.

[Image: diagram of the dynamic schema transformation and load pipeline]

This diagram illustrates how we construct and maintain our new dynamic schemas.

Once the transformation job is done, it’s time to load the data into Redshift. First, we check whether the schema file generated for each event type matches the current table schema in Redshift. If the schema has changed, the Redshift table is updated, with new columns added as needed. Then, the data from each event type file in S3 is loaded into the corresponding event type table in Redshift.
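
In SQL terms, an hourly load for one event type boils down to something like this (bucket, paths, and IAM role are illustrative):

-- The transform job saw a new event property this hour: add a matching column.
alter table played add column e_title varchar(256);

-- Load the hour's event type file from S3 into its table.
copy played
from 's3://our-bucket/transformed/app123/played/2015-06-01-00.gz'
credentials 'aws_iam_role=arn:aws:iam::123456789012:role/RedshiftCopy'
gzip json 'auto';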

Dedicated

In addition, each customer now has their own Redshift cluster, as opposed to the previous multi-tenant system. This means a customer’s query will never be impacted by another customer running a monster-sized query, or by the load of another customer’s data into Redshift.

Old Redshift vs. DDR: Performance Improvements

We ran some queries to compare performance between our legacy Redshift schema and the new dynamic schema and found significant improvements in query time.

Example 1: Number of users who have played the ‘Slots’ game in the last three months.

Old Redshift

select count(distinct amplitude_id)
from events123
where event_type = 'Played'
and event_time BETWEEN '2015-03-01' and '2015-06-01'
and json_extract_path_text(event_properties, 'title') = 'Slots';

Time: 752570.259 ms

Dynamic Dedicated Redshift

select count(distinct amplitude_id)
from played
where e_title = 'Slots'
and event_time BETWEEN '2015-03-01' and '2015-06-01';

Time: 69602.186 ms

DDR: 10.8x faster

Example 2: Find the most common game type, game level, and gender combinations in May.

Old Redshift

select json_extract_path_text(event_properties, 'game type') as e_game_type,
       json_extract_path_text(event_properties, 'game level') as e_game_level,
       json_extract_path_text(user_properties, 'gender') as u_gender,
       count(distinct amplitude_id)
from events123
where event_type = 'Played'
and event_time between '2015-05-01' AND '2015-05-30'
group by e_game_type, e_game_level, u_gender
order by count desc;

Time: 663722.052 ms

Dynamic Dedicated Redshift

select e_game_type, e_game_level, u_gender, count(distinct amplitude_id) 
from played 
where event_time BETWEEN '2015-05-01' AND '2015-05-30' 
group by e_game_type, e_game_level, u_gender 
order by count desc;

Time: 21371.673 ms

DDR: 31.1x faster

In this case, we’re looking at a query time of 11 minutes versus 21 seconds. That’s the difference between leaving your desk to wander around and bother your co-workers while you wait for a query to finish running, and just checking Facebook for a few seconds.

Example 3: For all people who have registered in the last five months, group by locale, account status, gender, and age.

Old Redshift

select json_extract_path_text(user_properties, 'locale') as u_locale,
       json_extract_path_text(user_properties, 'account status') as u_account_status,
       json_extract_path_text(user_properties, 'gender') as u_gender,
       json_extract_path_text(user_properties, 'age') as u_age,
       count(distinct amplitude_id)
from events123
where event_type = 'Register'
and event_time BETWEEN '2015-01-01' and '2015-06-01'
group by u_locale, u_account_status, u_gender, u_age;

Time: we stopped this query after it had been running for an hour.

Dynamic Dedicated Redshift

select u_locale, u_account_status, u_gender, u_age, 
count(distinct amplitude_id)
from register
where event_time between '2015-01-01' AND '2015-06-01' 
group by u_locale, u_account_status, u_gender, u_age;

Time: 8146.598 ms

DDR: 8 seconds vs > 1 hour. Success!

Redshift Lessons Learned

As you can imagine, our engineering team spent a lot of time working with the ins and outs of Redshift throughout this process. Here are some of the lessons we learned (the hard way) which we hope are helpful to others using Redshift!

1. Optimize files for efficient loading into Redshift.

Every hour, we load millions of rows of data into thousands of event type tables in Redshift, so it’s important to make this process as efficient as possible. One factor to consider when loading files into Redshift is file size. There’s overhead associated with each load job and each commit statement, so it’s more efficient to merge smaller files into larger files for loading. We have found that a merged file greater than 1MB in size loads much faster than the individual smaller files. In general, we load files between 1MB and 1GB in size.

The number of files loaded per transaction is also important. The smallest computation unit in Redshift is a slice. Each small node has two slices and the bigger 8xl nodes have 16 or 32 slices, depending on whether you are using dense storage (HDD) or dense compute (SSD) nodes. All the parallelization in Redshift is in terms of the number of slices being used for a job.

To load data from S3 into Redshift, we use the Redshift COPY command, which can load multiple files at a time. To maximize your Redshift resources, it’s best to load a number of files that is a multiple of the number of slices in your cluster — otherwise, you’ll be wasting slices.
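
For example, a manifest lets one COPY load a whole batch of files in parallel across slices (file names and IAM role are illustrative):

copy add_to_cart
from 's3://our-bucket/transformed/app123/add_to_cart/2015-06-01-00.manifest'
credentials 'aws_iam_role=arn:aws:iam::123456789012:role/RedshiftCopy'
manifest gzip json 'auto';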

2. Incorporate as many commands as you can into a single transaction.

You can achieve an incredibly high degree of parallelization with Redshift, but there’s a single commit queue at the heart of it. This means that actually committing a transaction in Redshift is sequential. In addition, Redshift commits are very computationally expensive.

The single commit queue is the bottleneck in loading your data into Redshift. You can think of the commit queue like a ferry. A commit is like sending the ferry across the bay and dropping people off on the other side. The more people you load up on the ferry at a time, the faster all of the people will get to the other side.

You’ll save time and computing power if you combine many commands into a single transaction (i.e. fill the ferry to max capacity), as opposed to committing after every single command (i.e. send the ferry across with one person in it).
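
As a sketch, that means wrapping an hour’s worth of loads in one explicit transaction instead of letting each COPY commit on its own (paths and IAM role are illustrative):

begin;
copy add_to_cart from 's3://our-bucket/app123/add_to_cart/00.manifest' credentials 'aws_iam_role=arn:aws:iam::123456789012:role/RedshiftCopy' manifest gzip json 'auto';
copy checkout_cart from 's3://our-bucket/app123/checkout_cart/00.manifest' credentials 'aws_iam_role=arn:aws:iam::123456789012:role/RedshiftCopy' manifest gzip json 'auto';
commit;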

3. Take advantage of Redshift’s workload management (WLM) feature.

Amazon Redshift’s workload management (WLM) helps you allocate resources to certain user groups or query groups. By adjusting your WLM queue configurations, you can drastically improve performance and query speed.

For our Redshift clusters, we use WLM to set what percentage of memory goes to customers’ queries versus loading data and other maintenance tasks. If we give too much memory to customer queries and don’t leave enough for loading new data, loads will never finish; if we do the opposite, customer queries will never finish. There has to be a good balance when allocating resources to our customers relative to internal functions. WLM also lets us define how many queries can run in parallel for a given group; if the maximum number of queries is already running, any new query waits in a queue until a slot frees up.
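
One concrete lever here: a session can tag its statements with a query group so that WLM routes them to the matching queue. The group name below ('ingest') is an illustrative assumption; it’s whatever your WLM configuration defines:

set query_group to 'ingest';  -- route to the queue configured for loads
copy played from 's3://our-bucket/app123/played/00.manifest' credentials 'aws_iam_role=arn:aws:iam::123456789012:role/RedshiftCopy' manifest gzip json 'auto';
reset query_group;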

Of course, the optimal configuration for this will depend on your specific use case, but it’s a tool that anyone using Amazon Redshift should definitely take advantage of.

4. Achieve higher compression ratios by specifying compression encoding per column.

For us, the most expensive part of Redshift is the data storage. Thus, the more we can compress the data to save space, the better. We can typically compress our data about 3x in Redshift. This means that Redshift’s cost of $1000/TB/year (for 3-year reserved nodes) is actually more like $350 per uncompressed TB per year.

Redshift allows you to specify a different compression type for each column, which is great. Instead of using a generic strategy to compress all of your data, you should use the compression type that best suits the data in the column.
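
Redshift can even suggest encodings for you: ANALYZE COMPRESSION samples a table’s data and recommends an encoding per column, which you can then bake into the table definition. A sketch (the encoding choices below are illustrative, not a recommendation):

-- Sample the table and print a suggested encoding for each column.
analyze compression played;

-- Encodings are set explicitly when (re)creating the table.
create table played_compressed (
  amplitude_id  bigint        encode delta,
  event_time    timestamp     encode delta32k,
  e_game_type   varchar(256)  encode lzo
);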

In addition to compression, if you know you’ll be using Redshift on a continuous, long-term basis, another way to save costs is to use reserved instances. By using reserved instances, we save about 60% of the cost compared to on-demand Redshift.

5. Use CloudWatch alarms to keep an eye on your cluster’s key metrics.

CloudWatch is an AWS monitoring service that helps us keep tabs on key metrics and alerts us if something is wrong. For example, if the load on our system crosses a certain threshold, we get an email alert and can check our clusters, instead of having to actively monitor performance. CloudWatch is a great tool for ensuring that Redshift stays healthy.


Overall, the process of creating a dynamic data schema and rolling it out to our customers took three solid months of work — but it’s been time well spent to improve the experience of using Redshift for Amplitude customers. By splitting event types into their own tables and creating dynamic schemas that pull properties into their own columns, we’ve seen query times improve by up to 30X — oftentimes the difference between several minutes and several seconds. This is huge for our customers, who don’t want to pause whatever they’re working on while they wait for a lengthy query to run. In addition, by providing dedicated Redshift clusters for each of our customers, we can ensure that no customers are negatively affected by the actions or data requirements of other customers.

Written by: Nirmal Utwani & Alicia Shiu


Interested in learning more about what we do for our customers at Amplitude? Get in touch.

By the way, if you liked this article – we’re hiring! See https://amplitude.com/careers.

  • Hasib

    Would appreciate it if you could shed further detail on the dedicated cluster for each customer. How can you afford, say, 100 clusters for 100 customers?

    • Alicia Shiu

      Redshift has a lot of distinct node configurations you can choose from based on your requirements. We customize the cluster size for each customer based on data volume and compute requirements. We also leverage reserved pricing from Amazon which reduces the cost significantly.

    • Alicia Shiu

      see Nirmal’s comment for your answer!

  • lenguyenthedat

    This is a typical case where Hadoop was historically one of the few choices due to its ability to handle unstructured data. I’m glad you guys made it work with Redshift. Awesome post!

  • Yogesh Gowdra

    Nirmal, great post, thanks. Could you shed more light on managing so many dedicated clusters? Also, how good is it from a near-real-time data ingestion point of view? I hear S3 becomes a bottleneck in a multi-tenant ecosystem, especially from a data bandwidth point of view.

  • Ting Jia

    Hi Nirmal, thanks a lot for your article.
    1. Can you give some details of the transformation job in the diagram? Very interested in that.

    2. Are the properties of every event fixed in your configuration, or generated dynamically from the data?
    Hoping for your comments, and thank you again.

    • Nirmal Utwani

      1. The transformation job is an hourly job that runs on the data from the previous hour. It goes through all the events and distributes those events into different event-type files and updates the schema of the event-type if it sees a new property there.
      The load, on the other hand, checks the schemas before dumping data; if a property has been added to the event-type schema, a new column is added to the corresponding table in Redshift. Once this is set up, we dump the event-type files created in the transformation job into their corresponding tables.

      2. All the events begin with a base schema, and we keep adding properties dynamically as we see them.

      Feel free to follow-up with further questions.

      • Unicorn

        Hi Nirmal, did you use an ETL tool for this transform & load or the AWS data pipeline? Any further information will be helpful. Thanks

  • ummu

    Hi, my issue with having an event type per table is that you can’t do some analytics where you want to see the last x events per user. I see in the docs that you still have an ‘eventsxxx’ table. Are the docs outdated, or does that table actually still exist? Does that mean this new method duplicated all the data or does it work some other way?

    • Yoni

      +1 on this comment

      I would also like to know how you handle queries on multiple event types?
      Do you have views which UNION ALL the type-specific tables?
      What is the performance like for these types of queries vs “Old Redshift”?

      • Nirmal Utwani

        Hi @ummu & @yoni,

        We create an ‘eventsxxx’ view, which is a UNION ALL on top of event-type tables. Queries that span multiple event-types are slower when compared to Old Redshift. We have event_time and event_type as our SORT_KEYS which helps queries on the ‘eventsxxx’ view.

        • Yoni

          Thanks for your reply, appreciate it.

  • sandeep

    Hi Nirmal, We have a similar kind of requirement and this was the very first solution that came to my mind. The only constraint I feel is the limit on the maximum number of tables in a Redshift cluster, which is around 220 if I remember correctly. In a world where I might want to support lots of event types, it’s easy enough to cross this limit. Have you guys worked around this?

    • Nirmal Utwani

      Hi Sandeep,
      The maximum number of tables allowed in a single Redshift cluster is 9900, which is good enough for most use-cases.

      Just to safeguard us against hitting those limits, we limit the number of distinct event-types that get their own individual tables.

      For example, assuming an app has 200 event-types: the 20 biggest event-types get their own tables, and the remaining 180 are loaded into generic tables. Each generic table can fit 20 event-types, so in the above case we were able to store 200 event-types in just 29 Redshift tables. We create views on top of these generic tables to keep our customers abstracted from this distinction.

  • Dylan Herman

    Great article and thanks for sharing! My question: if data is only loaded into Redshift on an hourly basis, are (near) real-time analytics not an option?

    • Alicia Shiu

      Hi Dylan,

      Good question — while the raw data in Redshift is loaded on an hourly basis, the data that’s available through our analytics platform is real-time — you’re able to see data within a few minutes of something happening. We only use Redshift as a data warehouse for customers to access raw data. The back-end that powers our actual analytics platform is called Nova, which you can read more about in this post: https://amplitude.com/blog/2016/05/25/nova-architecture-understanding-user-behavior/

      • Dylan Herman

        Thanks!

  • Vadim Yaroslavskiy

    This is definitely a great way to speed things up (a.k.a. manual partitioning).
    However, you could potentially avoid this by sorting on the “Event” column. When querying on a sorted column, Redshift can skip large sections of the table, greatly speeding up the query. It would not be as quick as querying a separate partition, but it’s very simple and doesn’t require any changes to the code. Did you try it and it didn’t work well for some reason?

  • Saurabh Shetty

    Hi Nirmal,

    We are also trying to look at speeding the data ingestion using an automated approach but the challenge is choosing the correct distribution key and sort key for the analytical needs.

    Currently we are doing the traditional ELT approach, choosing the correct distribution and sort key for the staging table (1st entry point) and then performing upserts (update +insert) in the target table with the same structure.
    Also the staging table keys have to be decided to colocate the joins while merging data into target table.

    So do you think we can benefit from automating the staging data load part and then just creating the upsert scripts for the final data load/merge?

    Thanks
    Saurabh