With Amplitude's Snowflake integration, you can ingest Snowflake data directly into your Amplitude project. The integration supports four strategies to import your Snowflake data, depending on the data types you select.
Depending on your company's network policy, you may need to add these IP addresses to your allowlist in order for Amplitude's servers to access your Snowflake instance:
Region | IP Addresses |
---|---|
US | 52.33.3.219, 35.162.216.242, 52.27.10.221 |
EU | 3.124.22.25, 18.157.59.125, 18.192.47.195 |
Amplitude's Data Warehouse Import sometimes processes events in parallel, so time-ordered syncing of user and group properties on events isn't guaranteed in the same way as submitting events directly to the Identify and Group Identify APIs.
To add Snowflake as a data source in your Amplitude project, complete the following steps:
1. In Amplitude Data, navigate to Catalog → Sources.
2. In the Warehouse Sources section, click Snowflake.
3. Enter the required credentials for the Snowflake instance you want to connect. Use the account identifier that appears before snowflakecomputing.com in your Snowflake URL, in the format ORGNAME-ACCOUNTNAME; don't include ".snowflakecomputing.com" in your account name. Amplitude offers password-based and key pair authentication for Snowflake.
4. Copy the autogenerated SQL query and run it in Snowflake to give Amplitude the proper permissions. (An illustrative sketch of the kind of access such a query grants appears after these steps.)
5. After running the query, click Next to test the connection.
6. After the test succeeds, click Next again to move on to the data selection stage.
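The exact query is generated for you in the Amplitude UI and depends on your configuration. As a rough, illustrative sketch only (AMPLITUDE_ROLE, AMPLITUDE_USER, and the object names below are placeholders, not the generated values), it grants Amplitude read access along these lines:

```sql
-- Illustrative only: copy the query Amplitude generates rather than this sketch.
CREATE ROLE IF NOT EXISTS AMPLITUDE_ROLE;
CREATE USER IF NOT EXISTS AMPLITUDE_USER PASSWORD = '<password>' DEFAULT_ROLE = AMPLITUDE_ROLE;
GRANT ROLE AMPLITUDE_ROLE TO USER AMPLITUDE_USER;

-- Read access to the warehouse, database, and schema that hold the data to import.
GRANT USAGE ON WAREHOUSE YOUR_WAREHOUSE TO ROLE AMPLITUDE_ROLE;
GRANT USAGE ON DATABASE YOUR_DATABASE TO ROLE AMPLITUDE_ROLE;
GRANT USAGE ON SCHEMA YOUR_DATABASE.YOUR_SCHEMA TO ROLE AMPLITUDE_ROLE;
GRANT SELECT ON ALL TABLES IN SCHEMA YOUR_DATABASE.YOUR_SCHEMA TO ROLE AMPLITUDE_ROLE;
```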
The data type you select defines the strategies and settings available to you for configuration.
Data Type | Description |
---|---|
Event | Includes user actions associated with either a user ID or a device ID and may also include event properties. |
User Properties | Includes dictionaries of user attributes you can use to segment users. Each property is associated with a user ID. |
Group Properties | Includes dictionaries of group attributes that apply to a group of users. Each property is associated with a group name. |
Profiles | Includes dictionaries of properties that relate to a user profile. Profiles display the most current data synced from your warehouse, and are associated with a user ID. |
Select from the following strategies, depending on your data type selection.
Strategy | Description |
---|---|
Full Sync | Ingests the entire dataset on a defined schedule. This option is useful for datasets that change over time, but can't show which rows have changed. |
Timestamp | Ingests the most recent rows on a schedule, as determined by the Timestamp column. |
Change data capture (CDC) | Ingests the most recent rows of data on a schedule, as determined by Snowflake's Change Data Capture feature. CDC supports customization of the feed type (for event data) and data mutability settings. |
See the following table to understand which data types are compatible with which import strategies.
Data type | Supported import strategies |
---|---|
Event | CDC, Timestamp |
User Properties | Full Sync, Timestamp |
Group Properties | Full Sync, Timestamp |
Profiles | CDC |
For the Event data type, the CDC strategy supports configuration of the CDC feed type.
Select Ingestion Only to ingest from your warehouse and include Amplitude's enrichment services like ID Resolution, property and attribution syncing, and location resolution.
Select Continuous Sync to mirror your Snowflake data with support for insert, update, and delete operations. This option deactivates Amplitude's enrichment services to ensure you remain in sync with your source of truth.
Continuous Sync also supports Data Mutability settings. Select which operations to enable: update or delete. Insert operations are always on.
Depending on the import strategy you choose, either map your data with a SQL statement to transform the data (Timestamp, Full Sync) or use the data selection tool to map column names directly to Amplitude properties.
Provide a name for the source, and set the frequency with which Amplitude imports your data.
When choosing an integration strategy, consider the following:
- Full Sync: Choose this option if you need to periodically ingest the entire dataset and can't track which rows have changed. This method is best for smaller datasets where incremental tracking isn't possible. It isn't suitable for large datasets due to the overhead required to ingest all data each time.
- Timestamp Import: Choose this option if you can incrementally import data using a monotonically increasing timestamp column that indicates when Snowflake loaded each record. This is efficient and works well when you append new data with timestamps.
- Change Data Capture (CDC) Ingestion Only: Choose this option to import data based on changes detected by Snowflake's CDC feature while still using Amplitude's enrichment services. This method only supports reading INSERT operations from the CDC feed.
- Change Data Capture (CDC) Continuous Sync: Choose this option to directly mirror the data in Snowflake with INSERT, UPDATE, and DELETE operations based on changes detected by Snowflake's CDC feature. This method disables Amplitude's enrichment services to remain in sync with your source of truth, and is ideal when you need to keep Amplitude data fully synchronized with your Snowflake data. UPDATE and DELETE operations mutate data in Amplitude.
Import Strategy | Data Types Supported | Data Mutability | Amplitude Enrichment Services | Column Mapping Method | When to Use | Considerations |
---|---|---|---|---|---|---|
Full Sync | User Properties, Group Properties | N/A | Enrichment services applied | Custom SQL SELECT Query | Use when you need to periodically ingest the entire dataset and cannot track changes incrementally. | Not suitable for large datasets due to the need to ingest the entire dataset each time. |
Timestamp | Events, User Properties, Group Properties | N/A | Enrichment services applied | Custom SQL SELECT Query | Use when you can track new data using a monotonically increasing timestamp column. | Requires a timestamp column that indicates when the record was loaded into Snowflake. |
CDC: Ingest only | Events | Insert operations only | Enrichment services applied | UI-based table and column selection | Use when you want to import data based on changes detected by Snowflake's CDC feature, with Amplitude enrichment services. | Requires change tracking to be enabled in Snowflake. |
CDC: Continuous Sync | Events, Profiles | Supports insert, update, delete operations | Enrichment services not applied | UI-based table and column selection | Use when you want to directly mirror data in Snowflake, including updates and deletions, and keep Amplitude in sync with source data. | Disables Amplitude's enrichment services to remain in sync with the source of truth. Requires careful consideration of limitations, such as data retention settings in Snowflake and that deletions/renames of columns may not be captured. See limitations section for more details. |
When using CDC Continuous Sync, keep the following things in mind:
- Enable Change Tracking: Enable change tracking for the source table or view. See Enabling Change Tracking on Views and Underlying Tables in Snowflake's documentation. (Example statements appear after this list.)
- Data Retention Settings: DATA_RETENTION_TIME_IN_DAYS must be greater than or equal to one, but Amplitude recommends at least seven days; otherwise, the change-based import fails. For more details, see Time Travel in Snowflake's documentation. Setting DATA_RETENTION_TIME_IN_DAYS to 0 disables change tracking and renders the connection unrecoverable. If this happens, recreate the source.
- Disable Change Tracking: If you disable change tracking in Snowflake, or disconnect the Amplitude source for a period longer than the value of DATA_RETENTION_TIME_IN_DAYS, Amplitude loses the ability to track historical changes. In this case, recreate the connection. To avoid duplicate events, ensure all events have an insert_id set, and recreate the connection within seven days.
- Unique and Immutable insert_id: Ensure the data to be imported has a unique and immutable insert_id for each row to prevent data duplication if there are any unexpected issues. More about Amplitude deduplication and insert_id is available in Event Deduplication.
- Complex SQL Statements: If a data source is represented as a complex SQL SELECT statement (for instance, with a JOIN clause), create a VIEW in your Snowflake account that wraps the data source to use it with a change-based import strategy. See Streams on Views for considerations when using CDC with views in Snowflake.
- Views with JOINs: While Snowflake CDC is efficient, using views that contain JOINs can have performance implications. Consider syncing joined data as user profiles instead.
- Avoid table deletion and re-creation: Don't delete and recreate tables with the same name, as Snowflake CDC doesn't capture changes in this scenario. Use incremental models with tools like dbt to prevent table replacement.
- Handling schema changes: CDC supports adding new columns with default NULL values to CDC-tracked tables or views. Amplitude recommends against other kinds of schema changes. Snowflake CDC only reflects changes from DML statements. DDL statements that logically modify data (such as adding new columns with default values, dropping existing columns, or renaming columns) affect future data sent to Amplitude, but Snowflake doesn't update historical data with changes caused by DDL statements. As a result, Amplitude doesn't reflect these updates for historical data.
- Amplitude enrichment services disabled: When using CDC Continuous Sync, Amplitude disables enrichment services like ID resolution, property and attribution syncing, and resolving location info to remain in sync with your source of truth.
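As a sketch of the Snowflake side of this setup (the database, schema, and object names below are placeholders), the statements involved look like this:

```sql
-- Placeholders throughout; adjust names to your environment.
-- Enable change tracking on the source table (and on the view plus its underlying tables, if you ingest from a view).
ALTER TABLE ANALYTICS.EVENTS.RAW_EVENTS SET CHANGE_TRACKING = TRUE;

-- Keep enough Time Travel history for change-based imports (at least 1 day; Amplitude recommends 7 or more).
ALTER TABLE ANALYTICS.EVENTS.RAW_EVENTS SET DATA_RETENTION_TIME_IN_DAYS = 7;

-- Wrap a complex SELECT (for example, one with a JOIN) in a view so CDC can read from it.
CREATE OR REPLACE VIEW ANALYTICS.EVENTS.ENRICHED_EVENTS AS
SELECT e.*, a.plan_type
FROM ANALYTICS.EVENTS.RAW_EVENTS e
JOIN ANALYTICS.EVENTS.ACCOUNTS a ON e.account_id = a.account_id;

ALTER VIEW ANALYTICS.EVENTS.ENRICHED_EVENTS SET CHANGE_TRACKING = TRUE;
```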
To change the modeling method of your Snowflake source:
1. Ensure there is an insert_id in each row to prevent data duplication. For more information, see Data deduplication.
2. If the existing SQL query contains JOIN and WHERE clauses, create a VIEW in your Snowflake account that wraps the data source.
3. Record the Data synced as of time shown on the source detail page, and confirm it's on or after October 1, 2023, 12:00 AM UTC. If it isn't, either re-enable the connection and wait for Data synced as of to advance, or consider creating a new import connection. Otherwise, Amplitude imports all data from the current source, which may cause data duplication.
4. After changing the modeling method, confirm that Data synced as of is greater than the time recorded in step 3 to prevent potential data discrepancy and failure to identify data drift after the latest completed import job.

To revert to a custom SQL connection from an already migrated source, open the source configuration and click Revert to SQL Query Import. When you roll back from a Change Data Capture connection to a Custom SQL connection, use the same data source (table or view) in Snowflake to avoid inconsistencies.
Include the mandatory fields for the data type when you create the SQL query. These tables outline the mandatory and optional fields for each data type. Find a list of other supported fields for events in the HTTP V2 API documentation and for user properties in the Identify API documentation. Add any columns not in those lists to either event_properties or user_properties; otherwise, Amplitude ignores them.
Column name (must be lowercase) | Mandatory | Column data type | Example |
---|---|---|---|
user_id | Yes, unless device_id is used | VARCHAR | datamonster@gmail.com |
device_id | Yes, unless user_id is used | VARCHAR | C8F9E604-F01A-4BD9 |
event_type | Yes | VARCHAR | watch_tutorial |
time | Yes | Milliseconds since epoch (Timestamp) | 1396381378123 |
event_properties | Yes | VARIANT (JSON Object) | {"source":"notification", "server":"host-us"} |
user_properties | No | VARIANT (JSON Object) | {"city":"chicago", "gender":"female"} |
update_time_column | No (Yes if using time based import) | TIMESTAMP_NTZ | 2013-04-05 01:02:03.000 |
Find other supported fields in the HTTP V2 API documentation.
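For example, a query that maps the mandatory event columns and folds non-standard columns into event_properties might look like the following sketch (EVENT_NAME, EVENT_TS, PLAN_TYPE, REGION, and CITY are hypothetical column names):

```sql
-- Sketch only: hypothetical column names; adapt to your schema.
SELECT
    EVENT_NAME AS "event_type",
    USER_ID AS "user_id",
    DATE_PART('EPOCH_MILLISECOND', EVENT_TS) AS "time",
    -- Columns that aren't standard Amplitude fields go into event_properties (or user_properties).
    OBJECT_CONSTRUCT('plan_type', PLAN_TYPE, 'region', REGION) AS "event_properties",
    OBJECT_CONSTRUCT('city', CITY) AS "user_properties"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
```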
Column name (must be lowercase) | Mandatory | Column data type | Example |
---|---|---|---|
user_id | Yes | VARCHAR | datamonster@gmail.com |
user_properties | Yes | VARIANT (JSON Object) | {"city":"chicago", "gender":"female"} |
update_time_column | No (Yes if using time based import) | TIMESTAMP_NTZ | 2013-04-05 01:02:03.000 |
Find other supported fields in the Identify API documentation.
Column name (must be lowercase) | Mandatory | Column data type | Example |
---|---|---|---|
groups | Yes | VARIANT (JSON Object) | {"company":"amplitude", "team":["marketing", "sales"]} |
group_properties | Yes | VARIANT (JSON Object) | {"location":"seattle", "active":"true"} |
update_time_column | No (Yes if using time based import) | TIMESTAMP_NTZ | 2013-04-05 01:02:03.000 |
Each group property in group_properties applies to every group in groups.
To use a group property:
Set group properties. The following is an example of how you can do it in Snowflake Group Property Import:
```sql
SELECT OBJECT_CONSTRUCT('customerId', account_id) AS "groups", -- must be JSON
       OBJECT_CONSTRUCT('companyName', name, 'customerType', type) AS "group_properties" -- must be JSON
FROM "AMPLITUDE"."DWH"."ACCOUNTS"
```
Send events with group properties associated. These can be dummy events, so long as the user ID and groups are there. Specify the following in your Snowflake Event Import:
1"groups": {"customerId": <account_id>}
To make the data selection step a bit easier, here are a few example SQL snippets to get you started.
Events:

```sql
SELECT
    EVENT_TYPE_COLUMN AS "event_type",
    EVENT_PROPERTIES_VARIANT_COLUMN AS "event_properties",
    TIME_EPOCH_MS_COLUMN AS "time",
    USER_ID_COLUMN AS "user_id",
    USER_PROPERTIES_VARIANT_COLUMN AS "user_properties"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
```
User properties:

```sql
SELECT
    USER_ID_COLUMN AS "user_id",
    USER_PROPERTIES_VARIANT_COLUMN AS "user_properties"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
```
Group properties:

```sql
SELECT
    GROUPS_OBJ AS "groups",
    GROUP_PROPS_OBJ AS "group_properties"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
```
Creating a JSON Object:
```sql
OBJECT_CONSTRUCT('city', CITY, 'state', STATE) as "user_properties"
```
Converting timestamp column to milliseconds:
```sql
DATE_PART('EPOCH_MILLISECOND', TIMESTAMP_COLUMN) as "time"
```
Converting milliseconds to the TIMESTAMP_NTZ format needed for time-based import. This example uses the scale argument set to 3 to convert to milliseconds. See the Snowflake documentation for more details.
```sql
TO_TIMESTAMP_NTZ(TIME_COLUMN_IN_MILLIS, 3) as "update_time_column"
```
Converting a timestamp column with a timezone to TIMESTAMP_NTZ format needed for time-based import.
```sql
TO_TIMESTAMP_NTZ(CONVERT_TIMEZONE('UTC', TIMESTAMP_TZ_COLUMN)) as "update_time_column"
```
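Putting these helpers together, a complete event SELECT for a time-based import might look like the following sketch (column names are placeholders; insert_id is one of the additional supported fields documented in the HTTP V2 API):

```sql
-- Sketch only: placeholder column names; adapt to your schema.
SELECT
    EVENT_NAME AS "event_type",
    USER_ID AS "user_id",
    DATE_PART('EPOCH_MILLISECOND', EVENT_TS) AS "time",
    OBJECT_CONSTRUCT('source', SOURCE) AS "event_properties",
    OBJECT_CONSTRUCT('city', CITY, 'state', STATE) AS "user_properties",
    EVENT_UUID AS "insert_id", -- unique and immutable per row, for deduplication
    TO_TIMESTAMP_NTZ(CONVERT_TIMEZONE('UTC', LOADED_AT_TZ)) AS "update_time_column"
FROM DATABASE_NAME.SCHEMA_NAME.TABLE_OR_VIEW_NAME
```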