System Variables in Azure Data Factory: Your Everyday Toolbox

(2018-Nov-20) After working with and testing the functionality of variables within Azure Data Factory pipelines, I realized that it's worth exploring the existing system variables. They could basically become my toolbox to collect and store control flow metrics of my pipelines.



Looking at the official Microsoft resource System variables supported by Azure Data Factory, you're given a modest selection of system variables that you can analyze and use at both the pipeline and pipeline trigger level. Currently, you have three ways to monitor Azure Data Factory: visually, with the help of Azure Monitor, or by using code to retrieve those metrics.

But here is how I want to monitor the control flow of my pipeline in Azure Data Factory:



This is the same data ingestion pipeline from my previous blog post - Story of combining things together - that builds a list of files from Blob storage and then copies data from those files to a SQL database in Azure. My intention is to collect and store event information for all the completed tasks, such as Get Metadata and Copy Data.

Here is the current list of pipeline system variables at my disposal:
@pipeline().DataFactory - Name of the data factory the pipeline run is running within
@pipeline().Pipeline - Name of the pipeline
@pipeline().RunId - ID of the specific pipeline run
@pipeline().TriggerType - Type of the trigger that invoked the pipeline (Manual, Scheduler)
@pipeline().TriggerId - ID of the trigger that invokes the pipeline
@pipeline().TriggerName - Name of the trigger that invokes the pipeline
@pipeline().TriggerTime - Time when the trigger that invoked the pipeline fired. The trigger time is the actual fired time, not the scheduled time.
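
Any of these can be referenced directly in a pipeline expression. For instance, a quick sanity-check string could be built with a Set Variable activity along these lines (a small sketch; the var_pipeline_info variable name is my own choice and not part of the original pipeline):

var_pipeline_info = 
@concat(pipeline().DataFactory,'|'
,pipeline().Pipeline,'|'
,pipeline().RunId,'|'
,pipeline().TriggerType,'|'
,pipeline().TriggerTime)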

And after digging a bit more and testing pipeline activities, I've discovered additional metrics that I can retrieve at the level of each individual task:
PipelineName, 
JobId, 
ActivityRunId, 
Status, 
StatusCode, 
Output, 
Error, 
ExecutionStartTime, 
ExecutionEndTime, 
ExecutionDetails, 
Duration

Here is my final pipeline in ADF that can populate all these metrics into my custom logging database table:



And this is how I made it work:

1) First, I created the dbo.adf_pipeline_log table in my SQL database in Azure (the full table script is shared in the comments below):


2) Then I used the [Append Variable] activity as an "On Completion" outcome from the "Get Metadata" activity, with the following expression to populate a new array-type var_logging variable:




var_logging = 
@concat('Metadata Store 01|Copy|'
,pipeline().DataFactory,'|'
,activity('Metadata Store 01').Duration,'|'
,activity('Metadata Store 01').Error,'|'
,activity('Metadata Store 01').ExecutionDetails,'|'
,activity('Metadata Store 01').ExecutionEndTime,'|'
,activity('Metadata Store 01').ExecutionStartTime,'|'
,activity('Metadata Store 01').JobId,'|'
,activity('Metadata Store 01').Output,'|'
,pipeline().Pipeline,'|'
,activity('Metadata Store 01').ActivityRunId,'|'
,activity('Metadata Store 01').Status,'|'
,activity('Metadata Store 01').StatusCode)

where each of the system variable values is concatenated and separated with the pipe character "|". 

I did a similar thing to populate the very same var_logging variable in the ForEach container where the actual data copy operation occurs (a sketch of that expression is below):
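
That second Append Variable expression follows the same field order as the Get Metadata one above. The 'Copy Data 01' activity name here is only illustrative; the actual Copy Data activity inside the ForEach container may be named differently:

var_logging = 
@concat('Copy Data 01|Copy|'
,pipeline().DataFactory,'|'
,activity('Copy Data 01').Duration,'|'
,activity('Copy Data 01').Error,'|'
,activity('Copy Data 01').ExecutionDetails,'|'
,activity('Copy Data 01').ExecutionEndTime,'|'
,activity('Copy Data 01').ExecutionStartTime,'|'
,activity('Copy Data 01').JobId,'|'
,activity('Copy Data 01').Output,'|'
,pipeline().Pipeline,'|'
,activity('Copy Data 01').ActivityRunId,'|'
,activity('Copy Data 01').Status,'|'
,activity('Copy Data 01').StatusCode)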



3) And then I used a final set of tasks to populate my dbo.adf_pipeline_log table with data from the var_logging variable by calling a stored procedure:


The whole trick is to iterate over the var_logging array and split each of its text lines into another array of values using the "|" character. Then, knowing the position of each individual system variable value, I can map them to their appropriate stored procedure parameters / columns in my logging table (e.g. @split(item(),'|')[0] for the ActivityTask), as sketched below.
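
Based on the field order in the var_logging expression above, the parameters of the [dbo].[sp_adf_pipeline_log_update] stored procedure activity inside that final ForEach container would be mapped roughly like this (a reconstruction from the expression, not a screenshot of the actual activity; datetime and time values may need casting on the SQL side):

ActivityTask        : @split(item(),'|')[0]
ActivityType        : @split(item(),'|')[1]
DataFactory         : @split(item(),'|')[2]
Duration            : @split(item(),'|')[3]
Error               : @split(item(),'|')[4]
ExecutionDetails    : @split(item(),'|')[5]
ExecutionEndTime    : @split(item(),'|')[6]
ExecutionStartTime  : @split(item(),'|')[7]
JobId               : @split(item(),'|')[8]
Output              : @split(item(),'|')[9]
Pipeline            : @split(item(),'|')[10]
RunId               : @split(item(),'|')[11]
Status              : @split(item(),'|')[12]
StatusCode          : @split(item(),'|')[13]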




This provided me the flexibility to see both Completed and Failed activity runs (to test a failed activity I had to temporarily rename the target table of my Copy Data task). I can now read this data and get additional insights from the SQL Server table. 
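
For example, a simple query like this one (a minimal sketch) pulls the key timings for the latest logged activity runs:

SELECT TOP (50)
    DataFactory, Pipeline, ActivityTask, [Status],
    ExecutionStartTime, ExecutionEndTime, Duration
FROM dbo.adf_pipeline_log
ORDER BY ActivityTaskLogTime DESC;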



Let me know what you think about this, and have a happy data adventure!

Comments

  1. Thank you! Very helpful blog with a more complete list of data factory pipeline system variables than I've seen anywhere else. Am working on building a similar logging approach for our data pipelines. This is going to save me time!

  2. This is awesome! Just what I am needing to do; can you please upload your Table script and stored proc? It would save me a ton of time...thanks! Mike

    Replies
    1. CREATE TABLE [dbo].[adf_pipeline_log](
      [id] [int] IDENTITY(1,1) NOT NULL,
      [DataFactory] [varchar](100) NULL,
      [Pipeline] [varchar](100) NULL,
      [JobId] [varchar](100) NULL,
      [RunId] [varchar](100) NULL,
      [ActivityTask] [varchar](100) NULL,
      [ActivityType] [varchar](100) NULL,
      [Status] [varchar](100) NULL,
      [StatusCode] [varchar](100) NULL,
      [Output] [varchar](1000) NULL,
      [Error] [varchar](1000) NULL,
      [ExecutionStartTime] [datetime] NULL,
      [ExecutionEndTime] [datetime] NULL,
      [ExecutionDetails] [varchar](1000) NULL,
      [Duration] [time](7) NULL,
      [ActivityTaskLogTime] [datetime] NULL,
      CONSTRAINT [PK_adf_pipeline_log] PRIMARY KEY CLUSTERED
      (
      [id] ASC
      ) WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
      ) ON [PRIMARY]
      GO

      ALTER TABLE [dbo].[adf_pipeline_log] ADD CONSTRAINT [DF_adf_pipeline_log_ActivityTaskEventTime] DEFAULT (getdate()) FOR [ActivityTaskLogTime]
      GO


      CREATE PROCEDURE [dbo].[sp_adf_pipeline_log_update]
      (
      -- Input parameters
      @DataFactory [varchar](100),
      @Pipeline [varchar](100),
      @JobId [varchar](100),
      @RunId [varchar](100),
      @ActivityTask [varchar](100),
      @ActivityType [varchar](100),
      @Status [varchar](100),
      @StatusCode [varchar](100),
      @Output [varchar](1000),
      @Error [varchar](1000),
      @ExecutionStartTime [datetime],
      @ExecutionEndTime [datetime],
      @ExecutionDetails [varchar](1000),
      @Duration [time](7)
      )
      AS
      BEGIN

      SET NOCOUNT ON
      BEGIN TRY

      -- Adding a logging event
      INSERT INTO [dbo].[adf_pipeline_log]
      (
      DataFactory,
      Pipeline,
      JobId,
      RunId,
      ActivityTask,
      ActivityType,
      [Status],
      StatusCode,
      [Output],
      Error,
      ExecutionStartTime,
      ExecutionEndTime,
      ExecutionDetails,
      Duration
      )
      VALUES
      (
      @DataFactory,
      @Pipeline,
      @JobId,
      @RunId,
      @ActivityTask,
      @ActivityType,
      @Status,
      @StatusCode,
      @Output,
      @Error,
      @ExecutionStartTime,
      @ExecutionEndTime,
      @ExecutionDetails,
      @Duration
      );

      END TRY

      BEGIN CATCH
      -- Output few key error statistics in the case of exception or termination
      SELECT
      ERROR_NUMBER() AS ERR_NUMBER,
      ERROR_STATE() AS ERR_STATE,
      ERROR_LINE() AS ERR_LINE,
      ERROR_PROCEDURE() AS ERR_PROCEDURE,
      ERROR_MESSAGE() AS ERR_MESSAGE;
      END CATCH
      END
      GO

  3. What data are you taking from the metadata activity?

    Replies
    1. My "Get Metadata" activity tasks in this particular case was to read a list a files from my blob storage container, which then I process in further tasks.

  4. In the stored proc parameters @split(item(),'|')[0], where does item() come from?

  5. This is awesome. One doubt which is not related to the above-mentioned process: how can we avoid certain values in a JSON output from the Get Metadata task before sending it to the ForEach loop?

    Replies
    1. Save the output of the Get Metadata task into a variable, then replace or remove some of the values that you don't need. You can also use the Filter activity task for this, as sketched below.
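
      For example (a small sketch, where the activity name and the file extension being filtered out are only illustrative), a Filter activity could be configured with:

      Items:     @activity('Get Metadata 01').output.childItems
      Condition: @not(endswith(item().name, '.tmp'))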

  6. Hello, I am trying to create an email alert using a Web activity, and below is the code. I would like to make the activity name, i.e. Copy_cg4, dynamic in the code below. How do I get the activity name dynamically?

    {"DataFactoryName":"@{pipeline().DataFactory}","PipelineName":"@{pipeline().Pipeline}","Subject":"@{activity('Copy_cg4').Status}","ErrorMessage":"The ADF pipeline has @{activity('Copy_cg4').Status}","EmailTo":"supriya.d@te.com"}

    Replies
    1. I don't know if this is possible; it would require some testing. But if an error message contains some activity metadata information (which it should), then I would try to parse that text and extract the activity name from it.

  7. Hi Rayis,

    When I try to use @pipeline().Pipeline as a parameter to a notebook, I see random numbers as a suffix. Is there a way to pass the pipeline name alone?

    Replies
    1. Check your code; @pipeline().Pipeline should not add any additional information to the actual name of your pipeline. Otherwise, if the length of that number suffix is constant, you can also remove that portion before it goes to your ADB notebook as a parameter value.

  8. Hi Rayis, do you have some idea of how to get the status from Databricks executions?

    Replies
    1. Hi Fran, the only status I think you can get in ADF is whether your notebook execution was successful or not, and if it's not successful, it will provide you with a Databricks URL for the failed job. More detailed logging could be done with the help of Log Analytics, where additional details about your Databricks runs could be collected if you direct its logs to a Log Analytics workspace.

  9. Hi Rayis, how can I pass a table name, statically or dynamically,
    like we pass the pipeline name with select '@{pipeline().Pipeline}' as pipeline_name?

    Replies
    1. Your table name is a custom parameter/variable, and you have to define how it will be sourced. Once this is defined, you can easily update the schema of the logging table and add your table name as a new column.

  10. How do I read the activity name of the previous activity in the next activity?

    Replies
    1. Check the JSON code of your pipeline; if an activity depends on another activity, then you will be able to see its predecessors' names ("dependsOn").

  11. Hello Rayis,
    Thanks a lot for your articles, it is very helpful.

    Could you please re-upload the global pipeline screenshot? It seems the picture is broken.
    I would like to understand your global logic, and more specifically how many times you call the "[dbo].[sp_adf_pipeline_log_update]" stored procedure.

    Thanks !

    Replies
    1. Hi Jerome, you can find the whole pipeline code in my public GitHub repository: https://github.com/NrgFly/Azure-DataFactory/blob/master/Samples/pipeline/bdata_adf_logging_pl.json. I think this will be better than a screenshot.

  12. Hi, I would like to extract the pipeline duration and store it in SQL. Can I follow the same procedure?

    Replies
    1. There is a Duration column in the table already. Additionally, you can do this by creating a computed column in your SQL table with an expression for this new column: ExecutionEndTime - ExecutionStartTime. A small sketch is below.
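
      For example (a minimal sketch; the DurationSeconds column name is my own choice), such a computed column could be added as:

      ALTER TABLE [dbo].[adf_pipeline_log]
      ADD [DurationSeconds] AS DATEDIFF(SECOND, [ExecutionStartTime], [ExecutionEndTime]);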

  13. But here you capture the duration of only the metadata activity, right?
    I would like to get the end-to-end pipeline duration.

    Replies
    1. You can also connect your ADF instance to a Log Analytics workspace in Azure to collect all the logs. This will provide you with all the timings that you're looking for, both for activities and pipelines.

  14. I could not get the correct time when I did end time - start time. Planning to do it with the REST API; any idea?

    Replies
    1. You can check the "billableDuration" output attribute from your Web activity, if that is how you call your REST API.

