With Amplitude's Snowflake integration, you can ingest Snowflake data directly into your Amplitude project. This article walks you through the steps needed to make that happen.
Depending on your company's network policy, you may need to add these IP addresses to your allowlist so that Amplitude's servers can access your Snowflake instance:
Amplitude's Data Warehouse Import sometimes processes events in parallel, so time-ordered syncing of user and group properties on events isn't guaranteed in the same way as submitting events directly to the Identify and Group Identify APIs.
Amplitude's Snowflake Data Import supports two methods for importing data from Snowflake: Change Data Capture and Custom SQL Query.
 | Change Data Capture | Custom SQL Query |
---|---|---|
Import data types | Event, User property, Group property | Event, User property, Group property |
Import strategy | Change-based | Time-based; Full Sync (group and user properties only) |
When to use | Recommended for most use cases. User-friendly and requires minimal SQL knowledge. Data source selection is limited; consider creating a Snowflake VIEW (see Prerequisites for details). | Use when data selection requires customization, even though it may lead to data discrepancies and higher costs if misconfigured. |
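If the limited data source selection of Change Data Capture is a concern, one option the table above mentions is to create a Snowflake VIEW that wraps the selection logic and point the source at that view. The following is a minimal sketch, assuming hypothetical database, schema, table, and column names:

```sql
-- Hypothetical database, schema, table, and column names throughout.
CREATE OR REPLACE VIEW ANALYTICS.AMPLITUDE.EVENTS_FOR_AMPLITUDE AS
SELECT
  e.USER_ID,
  e.EVENT_NAME,
  e.EVENT_TIMESTAMP,
  a.PLAN_TYPE
FROM ANALYTICS.RAW.EVENTS e
JOIN ANALYTICS.RAW.ACCOUNTS a
  ON a.ACCOUNT_ID = e.ACCOUNT_ID
WHERE e.EVENT_NAME IS NOT NULL;
```

If you use a view like this with Change Data Capture, make sure change tracking can be enabled for the view's underlying tables as well (see the notes below).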
Change Data Capture identifies and captures changes made to data in a database and delivers those changes in real time to a downstream process or system.
For the Snowflake source in Amplitude, Change Data Capture uses Snowflake's Time Travel and CHANGES clause to identify changes made in the data source since the last successfully completed import job.
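For reference, here is a minimal sketch of the Snowflake features involved, assuming a hypothetical ANALYTICS.RAW.EVENTS table; Amplitude performs the equivalent reads for you as part of the import job:

```sql
-- Hypothetical table name. Keep at least 7 days of Time Travel history
-- and enable change tracking on the source table or view.
ALTER TABLE ANALYTICS.RAW.EVENTS SET DATA_RETENTION_TIME_IN_DAYS = 7;
ALTER TABLE ANALYTICS.RAW.EVENTS SET CHANGE_TRACKING = TRUE;

-- Rows inserted or updated since a prior point in time, via the CHANGES clause.
SELECT *
FROM ANALYTICS.RAW.EVENTS
  CHANGES (INFORMATION => DEFAULT)
  AT (TIMESTAMP => DATEADD('day', -1, CURRENT_TIMESTAMP()));
```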
Keep the following in mind when you use Change Data Capture:

- `DATA_RETENTION_TIME_IN_DAYS` must be greater than or equal to 1, but Amplitude recommends at least 7 days. Otherwise, the change-based import fails. For more details, see Time Travel in Snowflake's documentation. Setting `DATA_RETENTION_TIME_IN_DAYS` to 0 disables change tracking and causes the connection to become unrecoverable. If this happens, recreate the source.
- Set an `insert_id` for each row to prevent data duplication if there are any unexpected issues. For more about Amplitude deduplication and `insert_id`, see Event Deduplication.
- If changes fall outside the `DATA_RETENTION_TIME_IN_DAYS` window, Amplitude loses the ability to track historical changes. In this case, recreate the connection. To avoid duplicate events, ensure all events have an `insert_id` set, and recreate the connection within seven days.
- Snowflake's CHANGES clause limitations apply.
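Several of these notes depend on each row having an `insert_id`. One way to derive a stable `insert_id` directly in SQL is sketched below; the column names are assumptions, and any expression that is unique and stable per event works:

```sql
-- Hypothetical column names; any expression that is stable and unique per event works.
SELECT
  USER_ID_COLUMN       AS "user_id",
  EVENT_TYPE_COLUMN    AS "event_type",
  TIME_EPOCH_MS_COLUMN AS "time",
  MD5(CONCAT_WS('|',
      USER_ID_COLUMN,
      EVENT_TYPE_COLUMN,
      TO_VARCHAR(TIME_EPOCH_MS_COLUMN))) AS "insert_id"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
```

Because Amplitude deduplicates on `insert_id`, rows re-imported with the same `insert_id` don't create duplicate events.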
The Custom SQL Query method supports time-based import of events, user properties, and group properties, and full syncs of user properties and group properties.
For time-based import, Amplitude requires a monotonically increasing timestamp value that indicates when each record was loaded into the source table the SQL configuration queries. The warehouse import tool brings data into Amplitude incrementally: with each import, it updates the maximum value it has seen in the column referenced by the Timestamp Column Name input in the Import Config UI.
Upon first import, Amplitude imports all the data returned from the query configured in the Import Config and saves a reference to the maximum value of the column referenced in Timestamp Column Name: `timestamp_1`. On subsequent imports, Amplitude imports all data from the previously saved timestamp (`timestamp_1`) to the new maximum timestamp (`timestamp_2`). After that import, Amplitude saves `timestamp_2` as the new maximum timestamp.
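Conceptually, each incremental run behaves like the query below. The `:previous_max_timestamp` and `:current_max_timestamp` placeholders are illustrative only; Amplitude tracks and substitutes these values for you, and the column names are assumptions:

```sql
-- Illustrative only: Amplitude records the maximum value of the column named in
-- Timestamp Column Name after each run and imports the rows that arrived since.
SELECT
  EVENT_TYPE_COLUMN    AS "event_type",
  TIME_EPOCH_MS_COLUMN AS "time",
  USER_ID_COLUMN       AS "user_id",
  UPDATE_TIME_COLUMN   AS "update_time_column"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
WHERE UPDATE_TIME_COLUMN >  :previous_max_timestamp   -- placeholder for timestamp_1
  AND UPDATE_TIME_COLUMN <= :current_max_timestamp    -- placeholder for timestamp_2
```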
To add Snowflake as a data source in your Amplitude project, follow these steps:
1. In Amplitude Data, click Catalog and select the Sources tab.
2. In the Warehouse Sources section, click Snowflake.
3. Enter the required credentials for the Snowflake instance you want to connect:

   - Account: the Snowflake account name that appears in your Snowflake URL before `snowflakecomputing.com`. Don't include ".snowflakecomputing.com" in your account name.

   Amplitude offers password-based and key pair authentication for Snowflake. If you want to use password authentication, select the Password option and then enter your password in the Password field. If you want to use key pair authentication, select the Key pair option, click Generate Key, and provide the organization and account names in the format `ORGNAME-ACCOUNTNAME`.
4. Copy the autogenerated SQL query and run it in Snowflake to give Amplitude the proper permissions.
5. After running the query, click Next to test the connection.
6. After the test is successful, click Next again to move on to the data selection stage.
7. Choose the modeling method: Change Data Capture or Custom SQL Query.
8. Configure the modeling method:
   - Data source: Choose a table or view from the left panel.
   - Data type: Select whether the table maps to event, user property, or group property data.
   - Frequency: Select the interval at which Amplitude should check for changes in the Snowflake table.
   - Map the required and custom fields: Set up name mapping between columns in the Snowflake data source and the data field names that Amplitude requires. For more information, see Data fields below.
9. When complete, click Test Mapping to verify the correct data appears under the right property in Amplitude.
10. Choose your configuration options.
11. Finish the configuration.

Amplitude displays a notification indicating that the new Snowflake source is enabled and redirects you to the Sources listing page.
If you have any issues or questions while following this flow, contact the Amplitude team.
To change the modeling method of your Snowflake source from Custom SQL Query to Change Data Capture:

1. Ensure that each row has an `insert_id` set to prevent data duplication. For more information, see Data deduplication.
2. If your SQL query uses `JOIN` and `WHERE` clauses, create a `VIEW` in your Snowflake account that wraps the data source.
3. Record the Data synced as of time shown on the source detail page.
4. Verify that Data synced as of (presented on the source detail page) is on or after October 1, 2023, 12:00 AM UTC. If it isn't, either re-enable the connection and wait for Data synced as of to advance, or consider creating a new import connection. Otherwise, Amplitude imports all data from the current source, which may cause data duplication.
5. After switching the modeling method, confirm that Data synced as of is greater than the time recorded in step 3 to prevent potential data discrepancy and failure to identify data drift after the latest completed import job.

To revert to a Custom SQL connection from an already migrated source, open the source configuration and click Revert to SQL Query Import.
When you roll back from a Change Data Capture connection to a Custom SQL connection, use the same data source (table or view) in Snowflake to avoid inconsistencies.
Include the mandatory fields for the data type when you create the SQL query. These tables outline the mandatory and optional fields for each data type. Find a list of other supported fields for events in the HTTP V2 API documentation and for user properties in the Identify API documentation. Add any columns not in those lists to either `event_properties` or `user_properties`; otherwise, they're ignored.
Column name (must be lowercase) | Mandatory | Column data type | Example |
---|---|---|---|
`user_id` | Yes, unless `device_id` is used | VARCHAR | datamonster@gmail.com |
`device_id` | Yes, unless `user_id` is used | VARCHAR | C8F9E604-F01A-4BD9 |
`event_type` | Yes | VARCHAR | watch_tutorial |
`time` | Yes | Milliseconds since epoch (Timestamp) | 1396381378123 |
`event_properties` | Yes | VARIANT (JSON Object) | {"source":"notification", "server":"host-us"} |
`user_properties` | No | VARIANT (JSON Object) | {"city":"chicago", "gender":"female"} |
`update_time_column` | No (Yes if using time-based import) | TIMESTAMP_NTZ | 2013-04-05 01:02:03.000 |
Find other supported fields in the HTTP V2 API documentation.
Column name (must be lowercase) | Mandatory | Column data type | Example |
---|---|---|---|
`user_id` | Yes | VARCHAR | datamonster@gmail.com |
`user_properties` | Yes | VARIANT (JSON Object) | {"city":"chicago", "gender":"female"} |
`update_time_column` | No (Yes if using time-based import) | TIMESTAMP_NTZ | 2013-04-05 01:02:03.000 |
Find other supported fields in the Identify API documentation.
Column name (must be lowercase) | Mandatory | Column data type | Example |
---|---|---|---|
`groups` | Yes | VARIANT (JSON Object) | {"company":"amplitude", "team":["marketing", "sales"]} |
`group_properties` | Yes | VARIANT (JSON Object) | {"location":"seattle", "active":"true"} |
`update_time_column` | No (Yes if using time-based import) | TIMESTAMP_NTZ | 2013-04-05 01:02:03.000 |
Each group property in `group_properties` applies to every group in `groups`.
To use a group property:
1. Set group properties. The following is an example of how you can do it in the Snowflake Group Property Import:
   ```sql
   SELECT OBJECT_CONSTRUCT('customerId', account_id) AS "groups", -- must be JSON
          OBJECT_CONSTRUCT('companyName', name, 'customerType', type) AS "group_properties" -- must be JSON
   FROM "AMPLITUDE"."DWH"."ACCOUNTS"
   ```
2. Send events with the group properties associated. These can be dummy events, as long as the user ID and groups are present. For example, specify the following in your Snowflake Event Import:
1"groups": {"customerId": <account_id>}
Column name (must be lowercase) | Mandatory | Column data type | Example |
---|---|---|---|
`user_id` | Yes | VARCHAR | "user123" |
`property_name_1` | Yes | (key value) VARCHAR: VARCHAR | "Title": "Data Engineer" |
`property_name_2` | Yes | (key value) VARCHAR: VARCHAR | "City": "San Francisco" |
Amplitude supports profile properties for known users. Each profile property must be joined to a `user_id` value.
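A data source shaped for profile property import could look like the following sketch; the source table and the property columns (title, city) are assumptions:

```sql
-- Hypothetical table and columns; each property column becomes a profile property
-- keyed by its lowercase alias and joined to the user through "user_id".
SELECT
  USER_ID_COLUMN AS "user_id",
  TITLE_COLUMN   AS "title",
  CITY_COLUMN    AS "city"
FROM DATABASE_NAME.SCHEMA_NAME.USER_PROFILE_TABLE
```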
To make the data selection step a bit easier, here are a few example SQL snippets to get you started.
Events:

```sql
SELECT
  EVENT_TYPE_COLUMN AS "event_type",
  EVENT_PROPERTIES_VARIANT_COLUMN AS "event_properties",
  TIME_EPOCH_MS_COLUMN AS "time",
  USER_ID_COLUMN AS "user_id",
  USER_PROPERTIES_VARIANT_COLUMN AS "user_properties"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
```
User properties:

```sql
SELECT
  USER_ID_COLUMN AS "user_id",
  USER_PROPERTIES_VARIANT_COLUMN AS "user_properties"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
```
Group properties:

```sql
SELECT
  GROUPS_OBJ AS "groups",
  GROUP_PROPS_OBJ AS "group_properties"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
```
Creating a JSON object:

```sql
OBJECT_CONSTRUCT('city', CITY, 'state', STATE) AS "user_properties"
```

Converting a timestamp column to milliseconds:

```sql
DATE_PART('EPOCH_MILLISECOND', TIMESTAMP_COLUMN) AS "time"
```

Converting milliseconds to the TIMESTAMP_NTZ format needed for time-based import. This example sets the `scale` argument to `3` to indicate that the input is in milliseconds. See the Snowflake documentation for more details.

```sql
TO_TIMESTAMP_NTZ(TIME_COLUMN_IN_MILLIS, 3) AS "update_time_column"
```

Converting a timestamp column with a timezone to the TIMESTAMP_NTZ format needed for time-based import:

```sql
TO_TIMESTAMP_NTZ(CONVERT_TIMEZONE('UTC', TIMESTAMP_TZ_COLUMN)) AS "update_time_column"
```