Using Azure Data Factory Mapping Data Flows to populate Data Vault

(2019-May-24) Data Flow, as a data transformation engine, was introduced to Microsoft Azure Data Factory (ADF) last year as a private feature preview. That restriction was lifted during the recent Microsoft Build conference, and the Data Flow feature has become a public preview component of ADF.

2020-Mar-15 update: a video recording of my webinar session on Using Azure Data Factory Mapping Data Flows to populate Data Vault at the recent PASS Data Architecture Virtual Group meeting is available here - http://datanrg.blogspot.com/2020/03/using-azure-data-factory-mapping-data.html 

There are many different use-case scenarios that Data Flows can cover, considering that Data Flows in SQL Server Integration Services (SSIS) projects still play a big role in fulfilling Extract-Transform-Load (ETL) patterns for your data.

In this blog post, I will share my experience of populating a Data Vault repository by using Data Flows in Azure Data Factory. 


First, I need to create my Data Vault model. 

Data Model

For this exercise I've taken a data warehouse sample from the AdventureWorksDW2017 SQL Server database, where I limited the set of entities to a small set of dimension tables (DimProduct, DimCustomer, DimGeography) and one fact table (FactInternetSales).




From the existing list of tables in my DW instance I came up with the list of entities for my new alternative Data Vault model:

To make a full transition from the existing DW model to an alternative Data Vault, I removed all surrogate keys and other attributes that exist only to support the Kimball data warehouse methodology. I also needed to add the necessary hash keys to all my Hub, Link and Satellite tables. The target environment for my Data Vault would be an Azure SQL database, and I decided to use the built-in crc32 function of Mapping Data Flows to calculate hash keys (HK) from my business sourcing keys and composite hash keys (HDIFF) from the satellite table attributes.
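For reference, below is a minimal sketch of what a Hub/Satellite pair could look like in the target Azure SQL database. This is my illustration only (the exact DDL isn't shown in this post): the data types are assumptions, the hash columns are bigint to match the crc32 output (later in this post I switch them to varchar for SHA2), and the column names follow the satellite views shown further below.

CREATE TABLE dbo.HUB_Product (
    HUB_Product_HK      BIGINT        NOT NULL PRIMARY KEY, -- crc32 hash of the business key
    ProductAlternateKey NVARCHAR(25)  NOT NULL,             -- business key from the source
    LOAD_TS             DATETIME2     NOT NULL
);

CREATE TABLE dbo.SAT_Product (
    HUB_Product_HK      BIGINT        NOT NULL,             -- parent Hub hash key
    SAT_Product_HDIFF   BIGINT        NOT NULL,             -- crc32 hash of the descriptive attributes
    EnglishProductName  NVARCHAR(50)  NULL,
    StandardCost        MONEY         NULL,
    Color               NVARCHAR(15)  NULL,
    -- ... remaining descriptive attributes ...
    LOAD_TS             DATETIME2     NOT NULL,
    CONSTRAINT PK_SAT_Product PRIMARY KEY (HUB_Product_HK, LOAD_TS)
);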



Sourcing Data
To test my Data Vault loading process in ADF Data Flows, I extracted data from the existing Adventure Works SQL Server database instance and excluded the DW-related columns (a sample extract query follows the column list). This is an example of my Product flat file in my Azure blob storage with the following list of columns:
       ProductAlternateKey
      ,WeightUnitMeasureCode
      ,SizeUnitMeasureCode
      ,EnglishProductName
      ,StandardCost
      ,FinishedGoodsFlag
      ,Color
      ,SafetyStockLevel
      ,ReorderPoint
      ,ListPrice
      ,Size
      ,SizeRange
      ,Weight
      ,ModelName
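A query along these lines would produce such a file; this is only my approximation of the extract (the post doesn't show it), assuming the standard AdventureWorksDW2017 column names and deliberately leaving out surrogate keys such as ProductKey:

SELECT ProductAlternateKey,
       WeightUnitMeasureCode,
       SizeUnitMeasureCode,
       EnglishProductName,
       StandardCost,
       FinishedGoodsFlag,
       Color,
       SafetyStockLevel,
       ReorderPoint,
       ListPrice,
       Size,
       SizeRange,
       Weight,
       ModelName
FROM   dbo.DimProduct;  -- then exported to CSV and uploaded to Azure Blob storage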



Also, I created a set of views to expose the latest version of each satellite record that I want to compare against the sourcing data files. Here are examples of those views for the SAT_InternetSales and SAT_Product tables:

CREATE VIEW [dbo].[vw_latest_sat_internetsales] 
AS 
  SELECT [link_internetsales_hk], 
         [sat_internetsales_hdiff] 
  FROM   (SELECT [link_internetsales_hk], 
                 [sat_internetsales_hdiff], 
                 Row_number() 
                   OVER( 
                     partition BY SAT.link_internetsales_hk 
                     ORDER BY SAT.load_ts DESC) AS row_id 
          FROM   [dbo].[sat_internetsales] SAT) data 
  WHERE  row_id = 1; 

CREATE VIEW [dbo].[vw_latest_sat_product] 
AS 
  SELECT [hub_product_hk], 
         [sat_product_hdiff] 
  FROM   (SELECT [hub_product_hk], 
                 [sat_product_hdiff], 
                 [load_ts], 
                 Max(load_ts) 
                   OVER ( 
                     partition BY sat.hub_product_hk) AS latest_load_ts 
          FROM   [dbo].[sat_product] sat) data 
  WHERE  load_ts = latest_load_ts; 

Data Integration
If you're new to building data integration projects in the Microsoft Data Platform (SSIS, Azure Data Factory, others), I would suggest this approach:
   1) Learn more about different components of the existing data integration tools and their capabilities.
   2) Go back to your real use case scenario and describe the business logic of your data integration/data transformation steps.
   3) Then merge those business rules with corresponding technical components in order to come up with a technical architecture of your business solution.

Mapping Data Flows
Now, after preparing all of this, I'm ready to create Mapping Data Flows in Azure Data Factory.

Example of the Product data flow:


1) Source (Source Product): connection to the Product CSV data file in my blob storage account
2) Derived Columns (Hash Columns): to calculate hash columns and load timestamps.


HUB_Product_HK = crc32(ProductAlternateKey)
SAT_Product_HDIFF = crc32(ProductAlternateKey, WeightUnitMeasureCode, SizeUnitMeasureCode, EnglishProductName, StandardCost, FinishedGoodsFlag, Color, SafetyStockLevel, ReorderPoint, ListPrice, Size, SizeRange, Weight, ModelName)

3) Exists transformation (CheckNewProduct): filters out rows that already exist in the target, so that only records from the sourcing data file that are new to the HUB_Product table move forward (a rough SQL equivalent of both Exists checks is shown after these steps):


4) Select transformation: to select only 3 columns (ProductAlternateKey, HUB_Product_HK, LOAD_TS).
5) Sink: these 3 columns are then sunk into my target HUB_Product data vault table.

Since the Product data file is used to load both HUB_Product and SAT_Product (Steps 1 & 2 are shared), a separate data stream is created to populate the SAT_Product table as well.
3) Exists transformation (CheckNewSatProduct): filters out rows that already exist in the target, so that only records from the sourcing data file whose hash key / hash diff combination is not present in the latest SAT_Product view move forward:

4) Select transformation: to select the columns for my next Sink step (HUB_Product_HK, WeightUnitMeasureCode, SizeUnitMeasureCode, EnglishProductName, StandardCost, FinishedGoodsFlag, Color, SafetyStockLevel, ReorderPoint, ListPrice, Size, SizeRange, Weight, ModelName, SAT_Product_HDIFF, LOAD_TS).
5) Sink: these columns are then sunk into my target SAT_Product data vault table.
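To reason about what those two Exists transformations do, here is a rough T-SQL equivalent of the same checks. This is a sketch only: the #staged_product table is a hypothetical stand-in for the source rows with their derived hash columns, and in ADF this logic runs inside the Mapping Data Flow rather than in SQL.

-- New Hub records: source business keys whose hash key is not yet in HUB_Product
SELECT src.ProductAlternateKey,
       src.HUB_Product_HK,
       src.LOAD_TS
FROM   #staged_product src
WHERE  NOT EXISTS (SELECT 1
                   FROM   dbo.HUB_Product hub
                   WHERE  hub.HUB_Product_HK = src.HUB_Product_HK);

-- New Satellite records: hash key / hash diff pairs not present in the latest Satellite view
SELECT src.*
FROM   #staged_product src
WHERE  NOT EXISTS (SELECT 1
                   FROM   dbo.vw_latest_sat_product sat
                   WHERE  sat.hub_product_hk = src.HUB_Product_HK
                   AND    sat.sat_product_hdiff = src.SAT_Product_HDIFF);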

Similarly, I created another Data Flow task to load Fact Internet Sales data (dvs_internetsales_big.csv) and populate LINK_InternetSales & SAT_InternetSales tables:

where the LINK_InternetSales_HK & SAT_InternetSales_HDIFF hash keys were used to identify new records for the data vault tables.
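For completeness, a link table in this model might be shaped like the sketch below. This is an assumption on my part (the post doesn't show the DDL), where the link hash key is calculated over the combined business keys of the related Hubs plus the degenerate sales order keys from FactInternetSales:

CREATE TABLE dbo.LINK_InternetSales (
    LINK_InternetSales_HK BIGINT       NOT NULL PRIMARY KEY, -- crc32 over the combined business keys
    HUB_Product_HK        BIGINT       NOT NULL,             -- reference to HUB_Product
    HUB_Customer_HK       BIGINT       NOT NULL,             -- reference to HUB_Customer
    SalesOrderNumber      NVARCHAR(20) NOT NULL,             -- degenerate key from FactInternetSales
    SalesOrderLineNumber  TINYINT      NOT NULL,
    LOAD_TS               DATETIME2    NOT NULL
);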

This case is solved, and now I can use Azure Data Factory Mapping Data Flows to load data into a data vault repository.

Please let me know if you have further questions, or take a closer look at the code of those pipelines and mapping data flows in my GitHub repository:
https://github.com/NrgFly/Azure-DataFactory/tree/master/Samples

Happy Data Adventures!

(2019-May-27) update:
After Mark Kromer included this blog post in his list of ADF data flow patterns (https://github.com/kromerm/adfdataflowdocs/blob/master/patterns/adf-data-flow-patterns.md), there was a comment about the different hash functions available in ADF Mapping Data Flows (CRC32, MD5, SHA1, SHA2). In my company's production environment, we use text fields and the MD5 function to populate them with hash values in an Azure Databricks workflow.

So for my mapping data flows exercise, I decided to change the hash columns from bigint to varchar and replace CRC32 with SHA2(512). It was interesting to see my hash key columns end up with 128-character values, but it still worked. ADF Mapping Data Flows proved to be a solid environment for data integration projects that load data into a Data Vault environment!
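As a side note, if you ever want to reproduce a similar SHA2(512) hash on the Azure SQL side, something along these lines works in T-SQL. It won't necessarily match the data flow output byte for byte (string formatting and encoding differ), and the '||' delimiter plus the empty-string substitution for NULLs are my additions to guard against the NULL-concatenation issue discussed in the comments below:

SELECT ProductAlternateKey,
       CONVERT(VARCHAR(128),
               HASHBYTES('SHA2_512',
                         CONCAT_WS('||',                               -- delimiter between attributes
                                   ProductAlternateKey,
                                   ISNULL(WeightUnitMeasureCode, ''),  -- ISNULL because CONCAT_WS skips NULLs,
                                   ISNULL(SizeUnitMeasureCode, ''),    -- which would shift the remaining values
                                   EnglishProductName,
                                   ISNULL(CONVERT(VARCHAR(30), StandardCost), ''),
                                   ISNULL(Color, '')
                                   -- ... remaining Satellite attributes ...
                                  )),
               2) AS SAT_Product_HDIFF
FROM   dbo.DimProduct;  -- produces 128-character hex values, as described above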

Comments

  1. Please note that using the sha2 hash without delimiters can result in the same hash when columns are nullable.
    Example: ABC+NULL+123 results in ABC123
    ABC+123+NULL results in ABC123
    Therefore, the same hash gets generated (because it's the same string) and a data source change won't be detected.

    Using a delimiter can solve this issue (I'm using a double pipe --> '||').
    Example: ABC||NULL||123 results in ABC||||123
    ABC||123||NULL results in ABC||123||
    Because the string is different, the hash will be different, so you'll get a different hash outcome, which indicates a source change and therefore you'll get a new record.

    Also, when dealing with NULL values, it's useful to use the concatWS function in order to catch NULLs with whatever whitespace value you prefer (I used an empty string --> ''). Otherwise all hash combinations that contain NULLs in one of the columns will result in the same hash (the hash of NULL).

    Replies
    1. Interesting case, let me test this and verify the behavior.

  2. Hey Rayis,
    How do you store the historic data in the SAT? In my Data Vault, built with Stored Procedures, I have an additional column called LOAD_END_DATE, which is NULL by default; when the HASHDIFF differs between the Raw layer and the Data Vault layer, LOAD_END_DATE is set to the LOAD_DTS, i.e. the current timestamp. That way I can always keep historic rows.

    The "update" inside the Stored Procedure looks like the following:

    UPDATE [DVault].[SAT_product]
    SET LOAD_END_DATE = @LOAD_DATE
    FROM [RawData].[product] raw
    INNER JOIN [DVault].[SAT_product] Sat
    ON Sat.HUB_PK_product = raw.HUB_PK_product
    WHERE Sat.[SAT_product_HASHDIFF] <> raw.[SAT_product_HASHDIFF]
    AND Sat.[Load_End_Date] IS NULL

    How would you implement this logic inside the Data Flow ?

    Replies
    1. Hi Rubrix, please watch my YouTube video on this: https://www.youtube.com/watch?v=PgrWRirKKyw
      In a data flow (or this could be done in any other development tool), historical records are created in a single step: new (not existing) records from the source are inserted when, for the same Hub hash key, the Sat hash diff column is different, and the LOAD_TS (timestamp) column is populated. This gives me multiple records per Hub hash key in the Satellite table with different LOAD_TS values (the latest one can be treated as the active record).

    2. Thank you for your answer. I will take a look at the video you linked. Thank you for sharing your work.

  3. Rayis,
    How do you implement CI/CD for a Data Vault implementation?

    Replies
    1. Hi Sudeep, Data Vault is a methodology, not a technology that can be incorporated into a CI/CD process by itself. Whatever tools you choose to implement your data vault will drive your decision on how to create a DevOps process around them. I've worked with Databricks + ADLS as the engine behind a DV model; this blog showed an example using Azure SQL and Blob storage.

  4. Could you go a little deeper into what you connect the link table to? Is it connected to the HUB or SAT table to load into the Link table?

    Replies
    1. LINK tables are necessary to identify relationships between HUB tables.

    2. That makes sense, but how does one create the connection in ADF? For instance, I create the Link hash key for the initial data source, but how do I connect that link to another hub? I am connecting the link to a hub via the HUB hash key and that is all. Is there anything else I should include in the Exists component?

    3. Or should I ask, how are they implemented through ADF?

    4. You can check my ADF Data Vault implementation in this data flow: https://github.com/NrgFly/Azure-DataFactory/blob/master/Samples/dataflow/df_PASS_ADF_DataVault.json

    5. LINK and SAT tables were populated in a single run based on the calculated hash columns from the very same source.

