Data Vault on Snowflake: Streams & Tasks on Views

Patrick Cuba
15 min read · Sep 26, 2022


Snowflake continues to set the standard for Data in the Cloud by taking away the need to perform maintenance tasks on your data platform and giving you the freedom to choose your data model methodology for the cloud. You will expect the same relational capabilities for your data model as any other platform and Snowflake certainly delivers. Data volume and velocity of the data can become challenging when attempting to keep the cost of data processing data low. To remedy this, Snowflake introduced streams as a highly scalable data object to track change data capture (CDC) data activity to essentially process only new data into one or multiple data pipelines and from a single data object. Let’s see how this technology can be used to bring Data Vault closer to a Kappa (streaming first) architecture.

Episode catalogue,

  1. Immutable Store, Virtual End-Dates
  2. Snowsight dashboards for Data Vault
  3. Point-in-Time constructs & Join Trees
  4. Querying really BIG satellite tables
  5. Streams & Tasks on Views
  6. Conditional Multi-Table INSERT, and where to use it
  7. Row Access Policies + Multi-Tenancy
  8. Hub locking on Snowflake
  9. Virtual Warehouses & Charge Back
  10. Out-of-sequence data
  11. Handling Semi-Structured Data
  12. Feature Engineering and Business Vault

A reminder of the data vault table types,

Episode 5: Streams & Tasks on Views

When enabled, change tracking on a data object in Snowflake records INSERTs, UPDATEs and DELETEs at a record level by adding two metadata columns to denote this activity. When a stream object is defined on a table with change tracking enabled, it acts like a metadata bookmark on the data object. Imagine the following:

If the table object contains 100 records, a stream created on this object places its bookmark at the end of those 100 records. Now, when we read from these objects, we will see the following:

  • a read from the table will show 100 records and a read from the stream will show 0 records. If 50 records are added to the table object, a read from the table will show 150 records and a read from the stream will show 50 records.
  • If we add another 50 records to the table, a read from the table will show 200 records and a read from the stream will show 100 records.
  • If the stream is used as the source of a data manipulation language (DML) statement, effectively ingesting the stream into a target table, the bookmark advances to the end of the table. A read from the table will still show 200 records but a read from the stream will now show 0 records; the stream has been consumed. You can place as many streams on a table object as you want, as the sketch below illustrates.
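A minimal sketch of this bookmark behaviour, using hypothetical table names:

```sql
-- hypothetical landing and target tables
create or replace table src_accounts (account_id string, account_balance number);
create or replace table tgt_accounts like src_accounts;

-- a stream acts as a bookmark on the landing table
create or replace stream str_src_accounts on table src_accounts;

insert into src_accounts values ('A1', 100), ('A2', 200);

select count(*) from src_accounts;      -- 2 records
select count(*) from str_src_accounts;  -- 2 records, not yet consumed

-- consuming the stream in a DML statement advances the bookmark
insert into tgt_accounts
select account_id, account_balance
from str_src_accounts;

select count(*) from str_src_accounts;  -- 0 records, the offset has advanced
```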

The stream object can be used in conjunction with Time-Travel to set the offset to a particular point in the past state of the table. Further, a stream can be placed on

  • a standard Snowflake table (of course) — append and standard stream types,
  • an external table — insert-only stream type,
  • a data share — append and standard stream types,
  • a directory table — append and standard stream types,
  • a Snowflake view — the object we will be referring to for our purposes — append and standard stream types are available, but we will use append-only (see the sketch after this list)
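As a minimal sketch, assuming a hypothetical landing table lnd_card_account with a staging view stg_card_account defined over it (the view itself is sketched further below):

```sql
-- change tracking must be enabled on the table(s) underlying the view
alter table lnd_card_account set change_tracking = true;

-- an append-only stream on the staging view: only INSERTs into the
-- underlying table are tracked, which suits insert-only landed content
create or replace stream str_card_account_sat
  on view stg_card_account
  append_only = true;
```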

Stream types:

  • Standard — tracks SQL INSERT, UPDATE and DELETE operations
  • Append-only / insert-only — tracks SQL INSERT operations only

Multiple streams on a table object

Each stream can be used as the source of a DML statement and as the base for SQL transformations. Streams are independent of one another and can be processed at any velocity. A brief explanation of the above animation:

  1. Data A content is landed; a DML statement is executed on Stream 2 to consume Data A into Target 2, which advances Stream 2 to the end of the source data table
  2. Data B content is added; a DML statement is executed on Stream 1 to consume Data A and B into Target 1, which advances Stream 1 to the end of the source data table
  3. Data C content is added; a DML statement is executed on Stream 2 to consume Data B and C into Target 2, which advances Stream 2 to the end of the source data table; at the same time a DML statement is executed on Stream 1 to consume Data C into Target 1, which also advances Stream 1 to the end of the source data table

We have described Snowflake streams; Snowflake tasks are the other Snowflake object often used in conjunction with streams, to periodically process stream content and orchestrate a daisy-chained sequence of SQL statements. What's more, a task can execute a special function, SYSTEM$STREAM_HAS_DATA, to check whether there is any data in the stream before it kicks off its scheduled SQL statement, and a task can either be associated with a virtual warehouse or deployed as a serverless task.
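A minimal sketch of such a task, reusing the hypothetical stream from above and a hypothetical target satellite table. The WHEN clause uses SYSTEM$STREAM_HAS_DATA so scheduled runs are skipped when there is nothing to consume, and omitting a warehouse makes the task serverless. The task body here is a simplified insert; the full condensing satellite loader is sketched later in the article.

```sql
create or replace task tsk_card_account_sat
  user_task_managed_initial_warehouse_size = 'XSMALL'  -- serverless sizing hint
  schedule = '5 MINUTE'
when
  system$stream_has_data('STR_CARD_ACCOUNT_SAT')
as
  insert into sat_card_account (dv_hashkey_hub_account, dv_loaddate, dv_applieddate, dv_recsource, dv_hashdiff, account_balance)
  select dv_hashkey_hub_account, dv_loaddate, dv_applieddate, dv_recsource, dv_hashdiff, account_balance
  from str_card_account_sat;

-- tasks are created suspended; resume once the whole chain is wired up
-- (see the Set and Forget section below)
```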

Now let’s use these objects for data vault!

Significance of SQL Views in Data Vault

In terms of data vault automation, SQL views come into use in three main areas.

  • Views for Virtualising End Dates — discussed in episode 1; after a satellite table is created, two default views are created over the underlying satellite table.

o Current view returns only the current record per parent key

o History view adds an End-Date to a satellite table and an optional current-flag (1, 0)

  • Views for Information Marts (IM) — as discussed in episodes 3, 4 and 6. The default position for IMs is to deploy them as views over the data vault tables and, where suited, base them on disposable query-assistance tables such as PITs & Bridges.
  • Views for Staging — to virtualise the data vault metadata columns over landed content; let's break down what some of these columns are,

o Surrogate hash keys — a durable key based on the business key for a target hub table, or on the unit of work for a target link table, always defined as a binary data type

o Load date — the timestamp of the record entering the data platform

o Applied date — the timestamp of the record as it was captured from the source system, an extract timestamp

o Record source — denoting where the record came from

o Record digest, or HashDiff — used for simplifying delta detection

o Business Key Collision Code — used sparingly to ensure passive integration.

o et al.

Having a view over the landed content removes the need to copy data just to physically add these metadata columns. Instead, as data is landed it already carries the data vault metadata columns, and having a single place where all of these columns are defined benefits the possible multitude of hub, link, and satellite table loaders that take the staged content and load it into the modelled target tables. Data movement is the enemy of analytic value; any opportunity to reduce this latency improves the time-to-analytical-value (TTAV).
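A minimal sketch of such a staging view, assuming a hypothetical landing table lnd_card_account and hypothetical column and tag names:

```sql
create or replace view stg_card_account as
select
    -- surrogate hash key for the target hub table (binary)
    sha1_binary(upper(trim(account_id)))                        as dv_hashkey_hub_account,
    -- load date: when the record entered the data platform
    current_timestamp()                                          as dv_loaddate,
    -- applied date: the extract timestamp from the source system
    extract_timestamp                                            as dv_applieddate,
    -- record source: where the record came from
    'CRM.CARD_ACCOUNT'                                           as dv_recsource,
    -- record digest (hashdiff): simplifies delta detection
    sha1_binary(concat_ws('||',
        coalesce(to_varchar(account_name), ''),
        coalesce(to_varchar(account_balance), '')))              as dv_hashdiff,
    account_id,
    account_name,
    account_balance,
    extract_timestamp
from lnd_card_account;
```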

Load data at any velocity

We have streams, tasks, and views for staging; let's put them together to illustrate how they can be combined to simplify data vault automation and apply a set-and-forget architecture. The problem we're looking to solve is how to elegantly deal with multiple staged records per business key, consolidated into a single load to a target satellite table. That is, for a satellite table modelled to allow only one current active state per parent key, if we have multiple states of the business key in staging, how do we solve this without writing a loop into our orchestration that processes and loads each staged record one at a time against the target satellite table before processing the next, newer staged record?

Incorrect load, the record with the highest timestamp must represent the current state of the business object

Without a way to pre-condense the records before loading to the target satellite, the model will suffer data integrity issues. In the above example, the staged record with the 9:07 am timestamp is discarded because it matches the current active record in the target satellite table. However, because of the multiple states in staging, both staged records are compared against the target satellite table and only one survives the delta check when in fact, in this example, both should.

To show that we can process the above scenario in one load, let's imagine five scenarios by loading five business objects to track. For the sake of simplicity we will process two loads, but with three records for each business key: an initial load to the target satellite table, and the delta load.

At 9 am all five business objects have the same object state (for simplicity); because this is the first time the business keys have been seen, all five business keys and their associated descriptive attributes will be inserted as records into the target satellite table.

Now, for each business key, we will process a different scenario to illustrate how this implementation deals with change deltas.

Scenario 1: No change to staged content

Scenario 1

Let’s use the first scenario to acclimatise to the way we are depicting this technique.

On the left is the staged content as a view with a stream offset based on the stream on view; on the right is the target satellite table. For each scenario, staging is simplified to process one business object (in reality there would be many more!) and a limited set of columns. Landed content continues to grow, and a satellite-load operation advances the stream offset its loader is based on.

Each record has a HashDiff column and a previous-HashDiff column derived for that record with the SQL LEAD() function partitioned by the hashed key. The former column exists on the stage view; the latter exists in the common table expression (CTE) used in the satellite loader.

Two delta detection operations will occur in one load:

  1. discarding duplicate records in staging by comparing each record’s hashdiff to its previous hashdiff, and
  2. by comparing the oldest record in the staged stream on view to the current active record in the target satellite table for that hashed business key.

Streams ensure we reduce the volume of records being processed, so the above two operations do not process all the landed content, only the newest data since the last stream ingestion event. For this and all the scenarios below, the first record is always inserted because that business object did not exist in the target satellite at the time of the load at 9:00:05 am. A sketch of such a loader follows below.
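A minimal sketch of such a satellite loader, using the hypothetical objects from earlier; LAG() over an ascending applied timestamp is used here, which is equivalent to LEAD() over a descending order:

```sql
insert into sat_card_account (
    dv_hashkey_hub_account, dv_loaddate, dv_applieddate, dv_recsource, dv_hashdiff, account_balance
)
with staged as (
    -- only the new records since the last consumption of the stream on the view
    select s.*,
           lag(dv_hashdiff) over (
               partition by dv_hashkey_hub_account
               order by dv_applieddate
           ) as dv_prev_hashdiff
    from str_card_account_sat s
),
current_sat as (
    -- current active record per parent key in the target satellite
    select dv_hashkey_hub_account, dv_hashdiff
    from sat_card_account
    qualify row_number() over (
        partition by dv_hashkey_hub_account
        order by dv_applieddate desc
    ) = 1
)
select st.dv_hashkey_hub_account, st.dv_loaddate, st.dv_applieddate,
       st.dv_recsource, st.dv_hashdiff, st.account_balance
from staged st
left join current_sat cs
    on cs.dv_hashkey_hub_account = st.dv_hashkey_hub_account
where
      -- 1. within staging: keep only true changes against the previous staged record
      (st.dv_prev_hashdiff is not null and st.dv_hashdiff <> st.dv_prev_hashdiff)
      -- 2. oldest staged record per key: compare against the current satellite record
   or (st.dv_prev_hashdiff is null
       and (cs.dv_hashkey_hub_account is null or st.dv_hashdiff <> cs.dv_hashdiff));
```

Running this as the body of the task shown earlier consumes the stream and advances its offset, so the next run only sees newly landed records.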

Let’s process the rest of scenario 1:

  • @9:00:05 am: new data is ingested into the target satellite table, the stream offset advances, and the satellite table is populated
  • @9:08:07 am: new data is landed; note that there are two active states of the business object

o The staged hashdiff for the newest record is the same as the previous staged hashdiff for that key; we discard this record because it is not a true change

o the oldest staged record has the same hashdiff as the current record in the target satellite; this record will not be inserted into the target satellite table

No new staged records are inserted into the target satellite table.

Scenario 2: Every record is different

Scenario 2

As above but for a different business object, scenario 2 depicts,

  • @9:00:05 am: new data is ingested into the target satellite table, the stream offset advances, and the satellite table is populated
  • @9:08:07 am: new data is landed; note that there are two active states of the business object

o The staged hashdiff for the newest record is different from the previous staged hashdiff for that key; we keep this record because it is a true change

o the oldest staged record has a different hashdiff from the current record in the target satellite; this record will be inserted into the target satellite table

All new staged records are inserted into the satellite table.

Scenario 3: Newest staged content is a true change

Scenario 3

For scenario 3’s business object,

  • @9:00:05 am: new data is ingested into the target satellite table, the stream offset advances, and the satellite table is populated
  • @9:08:07 am: new data is landed; note that there are two active states of the business object

o The staged hashdiff for the newest record is different from the previous staged hashdiff for that key; we keep this record because it is a true change

o the oldest staged record has the same hashdiff as the current record in the target satellite; this record will not be inserted into the target satellite table

Only the newest staged record is inserted into the satellite table.

Scenario 4: Newest record is the same as the satellite record

Scenario 4

For scenario 4’s business object,

  • @9:00:05 am: new data is ingested into the target satellite table, the stream offset advances, and the satellite table is populated
  • @9:08:07 am: new data is landed; note that there are two active states of the business object

o The staged hashdiff for the newest record is different from the previous staged hashdiff for that key; we keep this record because it is a true change

o the oldest staged record has a different hashdiff from the current record in the target satellite; this record will be inserted into the target satellite table

NOTE that the newest staged record has the same hashdiff as the current record in the target satellite, and this is important: the newest staged record must appear as the current state for that business object. Because we have already validated that this is a true change in staging, there is no need to compare this record to the target satellite; it must be inserted.

All new staged records are inserted into the satellite table.

Scenario 5: Discarding non-true changes

Scenario 5

For scenario 5’s business object,

  • @9:00:05 am: new data is ingested into the target satellite table, the stream offset advances, and the satellite table is populated
  • @9:08:07 am: new data is landed; note that there are two active states of the business object

o The staged hashdiff for the newest record is the same as the previous staged hashdiff for that key; we discard this record because it is not a true change

o the oldest staged record has a different hashdiff from the current record in the target satellite table; this record will be inserted into the target satellite table

Only the oldest staged record is inserted into the satellite table.

To summarise,

Summary of all inserts

Stream offsets move as new data is consumed by downstream DML operations and, as stated earlier, you can have as many stream offsets on a supported data object as you want. This allows hub, link, and satellite loaders to each have their own stream based on the single staged view, as sketched below. Notice that by using streams over the virtualised staged content we never have to truncate the landed data; streams simplify orchestration of data movement in landing/staging by minimising the volume of data needed for consumption, no matter the velocity of the data coming in.
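For example, extending the hypothetical objects from earlier, the hub and link loaders would each get their own independent offset on the same staged view:

```sql
-- one staged view, several independent bookmarks: one per loader
create or replace stream str_card_account_hub on view stg_card_account append_only = true;
create or replace stream str_card_account_lnk on view stg_card_account append_only = true;
-- str_card_account_sat, created earlier, feeds the satellite loader
```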

Late arriving data

If we imagined that, for a scenario 6 account, all 15 of its records were loaded at 10 am, then this load pattern would be able to deal with the 15 states of the record in one load, discard all duplicates during that load, and advance the stream offset!

Streams on Test Framework

Not only can streams be used in ingestion data pipelines, they can also be used on the target tables to efficiently process data vault metrics into a test framework. We introduced this framework in episode 2. Where can we use it?

  • Count for new hash keys in the hub-table
  • Count for new records in the satellite table
  • Reconcile new hash keys with a satellite table’s parent entity, either a parent hub table or a parent link table
  • Count for new hash keys in the link-table
  • Reconcile new hash keys with a link table’s parent entities, for each parent hub table.

Using this "streams on views" framework also means you need test streams based on that same staged view to measure the record counts that were just inserted into your data vault tables, as sketched below.
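A minimal sketch of two such tests, assuming hypothetical streams on the target tables and a hypothetical dv_test_metrics table to record the results; consuming the test streams in the INSERTs advances their offsets:

```sql
-- streams on the target tables record what the loaders just inserted
create or replace stream str_test_hub_account on table hub_account append_only = true;
create or replace stream str_test_sat_card_account on table sat_card_account append_only = true;

-- count of new hash keys in the hub table
insert into dv_test_metrics (test_name, metric_value, measured_at)
select 'new hub_account hash keys', count(distinct dv_hashkey_hub_account), current_timestamp()
from str_test_hub_account;

-- reconcile: every new satellite record must have a parent key in the hub table
insert into dv_test_metrics (test_name, metric_value, measured_at)
select 'sat_card_account records without a hub parent', count(*), current_timestamp()
from str_test_sat_card_account s
where not exists (
    select 1 from hub_account h
    where h.dv_hashkey_hub_account = s.dv_hashkey_hub_account
);
```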

Standard tests to answer standard questions

Set and Forget

Finally, the object that automates all of the above: Snowflake tasks! For each stream, a task is used to execute the load to the target hub, link, or satellite table. One task, one loader, one stream on view. Let's summarise the Snowflake objects needed:

  • Staged view — defined once with the necessary data vault metadata columns to map to the target hub, link, and satellite tables. One landed file, one view
  • Streams on the view — one for each target data vault table being populated by this staged content. We will also need streams on the view to gather new statistics to record in our data vault test framework.
  • Task on streams — one to one with the number of streams; the streams needed for the test framework will be consumed by tasks daisy-chained after the hub, link, or satellite loader tasks respectively, to measure what has just been loaded

Continuous flow, shedding non-true changes

Oh my! It seems complex, but in reality it really isn't. Data Vault has only three table types and therefore only three types of table loaders, and three test patterns per loaded table. The above is a repeatable pattern; it only needs to be configured via parameters once, and the tasks can be scheduled to run every minute, every five minutes, once a day: you choose! You set the above up once, and you do not need external orchestration and automation tools when everything you need is already in the Snowflake platform!

Some suggested naming patterns for the above Snowflake objects, instantiated in the sketch after this list:

  • View: stg_${src-badge}_${src-file}
  • Stream: str_${src-badge}_${src-file}_to_sat_${sat-name}_${hub-key-name}
  • Task: tsk_${src-badge}_${src-file}_to_sat_${sat-name}_${hub-key-name}
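Following these patterns, and reusing the hypothetical loader task and test stream from earlier, a test task can be daisy-chained directly after the loader task so it measures exactly what that loader just inserted (all names are illustrative):

```sql
-- the test task runs only after the satellite loader task completes
create or replace task tsk_test_sat_card_account
  after tsk_card_account_sat
as
  insert into dv_test_metrics (test_name, metric_value, measured_at)
  select 'new sat_card_account records', count(*), current_timestamp()
  from str_test_sat_card_account;

-- child tasks are resumed first, then the root task starts the chain
alter task tsk_test_sat_card_account resume;
alter task tsk_card_account_sat resume;
```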

Why Streams on Views? Final thoughts…

Fundamentally, the answer may surprise you: it has to do with role-based access control (RBAC) and the scalability offered by applying RBAC to views and tables. Imagine defining a table where, for certain roles, you do not want to grant explicit access to the table itself. Those roles will not be granted privileges on the table directly. However, you are prepared to give certain roles access under the conditions specified by certain transformations and obfuscations (for example) of the data. You can then define a SQL VIEW over that table and give the role access to the VIEW instead of the table. Optionally, you could protect the transformation itself by deploying the VIEW as a secure VIEW, so that the role cannot expose the transformation by issuing a GET_DDL function call. That role does not need explicitly assigned privileges on the table, only on the view; the owner of the view needs privileges on the table in order to define the SQL view.

Snowflake allows you to create a VIEW over a table stream, but because the ownership of the table object and the view may differ as described above, placing the stream on a view instead avoids the complexity of keeping the privileges on the table object (where a stream offset would otherwise be placed) and the SQL view in sync. Deploying a view over a table-stream is therefore an anti-pattern.

The final callout with this pattern is to always design for failure; that is, should an issue occur, be prepared with the operating procedures to reset stream offsets (based on Snowflake Time Travel if you need to), as sketched below.
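A minimal sketch of such a reset, reusing the hypothetical stream from earlier; it assumes the Time Travel (AT | BEFORE) clause is available for the object the stream is placed on, and recreating the stream repositions its offset to the chosen point in the past:

```sql
-- recreate the stream with its offset repositioned to a point in the past
create or replace stream str_card_account_sat
  on view stg_card_account
  at (timestamp => '2022-09-26 09:00:00'::timestamp_ltz)
  append_only = true;
```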

And finally! Data Vault satellite table grain is modelled to the understanding of business object state. If a business object can only have one state at any point in time, then that is what this pattern addresses. Do not change the satellite to a multi-active satellite to deal with what could ultimately be a gap in understanding of the data that was modelled, or an error in the process to ingest the data into Snowflake. Also keep in mind that if the target table that was modelled is meant to be a multi-active satellite, then this same proposed framework can be utilised to load multi-active satellites at any velocity as well!

Until next time!

The Dream


The views expressed in this article are my own; you should test performance before committing to this implementation. The author provides no guarantees in this regard.


Written by Patrick Cuba

A Data Vault 2.0 Expert, Snowflake Solution Architect
