Push Your Data Into Snowflake Tables With Streamsets Using Snowpipe Streaming
Snowflake released Snowpipe Streaming in 2023. It provides an API for low-latency loading of streaming data rows directly into Snowflake tables, bypassing the file-based stages used by the regular Snowflake ingestion patterns, bulk loads and Snowpipe. Lower latency and a promised cost reduction make it a very attractive option for real-time data processing.
Streamsets is already capable of bulk loading data into Snowflake. Since streaming data processing is Streamsets’ forte, I naturally got interested in making it work with Snowpipe Streaming. (I promise to use any word with “stream” in it sparingly from now on!) This isn’t going to be difficult, as Snowflake provides a Java SDK for this API, and I have used external Java libraries in Groovy processors before.
Let’s outline the building blocks of this solution: collect the Snowflake Ingest SDK, configure Streamsets and Snowflake, then develop a pipeline and load some data.
If you don’t use the Streamsets Platform, why not sign up for a 30-day free trial account – with this jumpstart guide you can get a trial instance up and running in 15 minutes.
Collect Snowflake Ingest SDK
Proceed to the MVN Repository page for the Ingest SDK and download the JAR file. At the time of writing, the latest version is 3.1.1. It also requires a couple of compile dependencies that are easy to trace on MVN Repository, so I ended up with three files: slf4j-api-1.7.36.jar, snowflake-jdbc-3.22.0.jar and snowflake-ingest-sdk-3.1.1.jar.
If your Deployment is already configured to use an external resources archive, add these files to that archive. Otherwise, prepare the archive as per the Streamsets documentation and publish it to a supported location.
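For the Groovy stage to pick up the JARs, they go into the stage library’s extras area inside the archive. As a rough sketch only – I am quoting this layout from memory, so verify it against the Streamsets documentation, and note that the Groovy stage library name depends on your engine version:

externalResources/
    resources/
    streamsets-libs-extras/
        streamsets-datacollector-groovy_2_4-lib/
            lib/
                slf4j-api-1.7.36.jar
                snowflake-jdbc-3.22.0.jar
                snowflake-ingest-sdk-3.1.1.jar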
Configure Streamsets and Snowflake
Generate a key pair as per the Snowflake instructions. The public key is assigned to a Snowflake user (and this concludes the Snowflake config!). The private key will be used by a Streamsets pipeline.
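For reference, the documented procedure boils down to two openssl commands and one SQL statement; the user name and key material below are placeholders:

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

Then, in Snowflake, assign the public key (the base64 content of rsa_key.pub without the header and footer lines) to the user:

ALTER USER streamsets_user SET RSA_PUBLIC_KEY='MIIBIjANBgkq...';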
The best option for securing the private key is to keep it in a credential store like Azure Key Vault or AWS Secrets Manager and retrieve it at runtime. Most out-of-the-box stages in Streamsets follow that approach, but Groovy stages don’t (and for a good reason). There are a few “next best” options to remove sensitive credentials from pipelines and protect them, and here I will use one of them:
1. On the engine host, define an environment variable holding the private key value:

export SFL_PK=MIIEv…..U6W8=
2. In the Streamsets Deployment configuration, define a runtime property that takes its value from this environment variable (the pipeline will read it back later, as shown after this list):
runtime.conf.location=embedded
runtime.conf_SnowflakePK=${env("SFL_PK")}
3. Also in the Deployment configuration, set the Deployment to use the external resources archive from the selected location. Save the Deployment and restart the Data Collector.
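In the pipeline, the key is then retrieved with the runtime:conf() function of the Streamsets expression language, so it never appears in the pipeline definition itself:

${runtime:conf('SnowflakePK')}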
Develop a Streamsets pipeline and load some data
This is my demo pipeline – I made it available for importing into SDC 6.1+. The Dev Data origin generates fictitious “book viewing” events of an online book store, each containing a user ID, a name, a book title and a time of viewing:
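A generated event looks roughly like this (the values are made up for illustration):

{
    "id": 8274619283746,
    "name": "Jane Reader",
    "title": "The Catcher in the Rye",
    "viewed_at": "2024-05-14T10:21:07Z"
}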
Snowpipe Streaming operates with the concept of an offset token for each inserted row to provide exactly-once delivery. Streamsets Jobs have their own offsets to keep track of already processed data, and with a bit of orchestration setup in Streamsets these two offsets can work together. In this pipeline I simply have the offset tokens computed by an Expression Evaluator processor outside of the Groovy code, to keep that code reusable between different pipelines.
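For instance, assuming each event carries a usable id field (a hypothetical setup, not necessarily the one in the demo pipeline), the Expression Evaluator can simply add a new field:

/offsetToken = ${record:value('/id')}

Any scheme works, as long as the latest committed token returned by Snowflake lets you work out which records have already been ingested.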
Next is the Groovy Evaluator processor. I developed its Groovy code from an example provided by Snowflake. It picks up the private key from the runtime configuration made in the previous steps, so no sensitive data is stored at design time:
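The full code is in the pipeline export; what follows is a minimal sketch of its shape, built around Snowflake’s published SDK example. It assumes the stage’s “Parameters in Script” configuration (the userParams binding) maps a snowflakePK parameter to ${runtime:conf('SnowflakePK')}; the account URL, user, database, schema, table and column names are all placeholders.

Init script, run once at pipeline start:

import net.snowflake.ingest.streaming.OpenChannelRequest
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory

// Connection properties; the private key arrives via the runtime configuration,
// so nothing sensitive is stored in the pipeline itself
props = new Properties()
props.put('url', 'https://myaccount.snowflakecomputing.com:443')
props.put('user', 'streamsets_user')
props.put('private_key', sdc.userParams['snowflakePK'])

// One client and one channel per pipeline run, kept in the stage state map
sdc.state['client'] = SnowflakeStreamingIngestClientFactory
        .builder('SDC_CLIENT')
        .setProperties(props)
        .build()
request = OpenChannelRequest.builder('SDC_CHANNEL')
        .setDBName('DEMO_DB')
        .setSchemaName('PUBLIC')
        .setTableName('BOOK_VIEWS')
        .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
        .build()
sdc.state['channel'] = sdc.state['client'].openChannel(request)

Main script, run for every batch:

channel = sdc.state['channel']
for (record in sdc.records) {
    try {
        // Column names must match the target table; offsetToken was added upstream
        row = [
            ID        : record.value['id'],
            NAME      : record.value['name'],
            TITLE     : record.value['title'],
            VIEWED_AT : record.value['viewed_at']
        ]
        response = channel.insertRow(row, record.value['offsetToken'].toString())
        if (response.hasErrors()) {
            sdc.error.write(record, response.insertErrors.get(0).exception.toString())
        } else {
            sdc.output.write(record)
        }
    } catch (e) {
        sdc.error.write(record, e.toString())
    }
}

Destroy script, to flush and release the connection:

sdc.state['channel']?.close()?.get()
sdc.state['client']?.close()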
This pipeline is configured to generate 1,000 records in 100-record batches and insert them into the Snowflake target table in real time. And here it goes (the IDs look funny because they are just generated long integers):
The example discussed here assumes that the data schema stays the same from record to record and from batch to batch. If the schema varies a lot, it may be better to write the “variable” data to Snowflake as the VARIANT type: use an Expression Evaluator stage to put all “variable” fields into a nested container field, and create the corresponding column in the target Snowflake table with type VARIANT.
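As an illustration (the table and column layout here is mine, not the demo pipeline’s), the target table could look like this:

CREATE TABLE book_views (
    user_id    NUMBER,
    viewed_at  TIMESTAMP_NTZ,
    payload    VARIANT
);

The Ingest SDK accepts a Java Map as the value for a VARIANT column, so the Groovy code can pass the container field’s value through unchanged.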
Lastly, it’s a good idea to check Snowflake’s recommendations on cost and performance optimization.
Conclusion
Yes, you can load streaming data into Snowflake in near real time, and use Streamsets to extract that data from a variety of supported sources – a fair number of them fit streaming use cases, including CDC, REST, and messaging/queuing systems, whether on-premises or offered as services by the major public clouds.
Just in case – the ideas in this blog may not reflect the official position of IBM Streamsets.