(2018-Nov-20) After working and testing the functionality of variables within Azure Data Factory pipelines, I realized it's worth exploring the existing system variables. They could basically become my toolbox for collecting and storing control flow metrics of my pipelines.
Looking at the official Microsoft resource, System variables supported by Azure Data Factory, you're given a modest selection of system variables that you can analyze and use both at the pipeline and pipeline trigger level. Currently, you have three ways to monitor Azure Data Factory: visually, with the help of Azure Monitor, or by using code to retrieve those metrics.
But here is how I want to monitor the control flow of my pipeline in Azure Data Factory:
This is the same data ingestion pipeline from my previous blog post - Story of combining things together - that builds a list of files from Blob storage and then copies data from those files to a SQL database in Azure. My intention is to collect and store event information for all the completed tasks, such as Get Metadata and Copy Data.
Here is the current list of pipeline system variables at my disposal:
@pipeline().DataFactory - Name of the data factory the pipeline run is running within
@pipeline().Pipeline - Name of the pipeline
@pipeline().RunId - ID of the specific pipeline run
@pipeline().TriggerType - Type of the trigger that invoked the pipeline (Manual, Scheduler)
@pipeline().TriggerId - ID of the trigger that invoked the pipeline
@pipeline().TriggerName - Name of the trigger that invoked the pipeline
@pipeline().TriggerTime - Time of the trigger run that invoked the pipeline. The trigger time is the actual fired time, not the scheduled time.
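To see how these can be used together, here is a minimal sketch of an expression that strings the pipeline-scoped values into one pipe-delimited line (the format simply mirrors the logging approach used further below; it is not required):

@concat(pipeline().DataFactory,'|'
    ,pipeline().Pipeline,'|'
    ,pipeline().RunId,'|'
    ,pipeline().TriggerType,'|'
    ,pipeline().TriggerId,'|'
    ,pipeline().TriggerName,'|'
    ,pipeline().TriggerTime)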
And after digging a bit more and testing pipeline activities, I've discovered additional metrics that I can retrieve at the level of each individual task:
PipelineName,
JobId,
ActivityRunId,
Status,
StatusCode,
Output,
Error,
ExecutionStartTime,
ExecutionEndTime,
ExecutionDetails,
Duration
Here is my final pipeline in ADF that can populate all these metrics into my custom logging database table:
And this is how I made it work:
1) First, I created the dbo.adf_pipeline_log table in my Azure SQL database:
2) Then I used an [Append Variable] activity as the "On Completion" outcome of the "Get Metadata" activity, with the following expression to populate a new array-type var_logging variable:

var_logging = @concat('Metadata Store 01|Copy|'
    ,pipeline().DataFactory,'|'
    ,activity('Metadata Store 01').Duration,'|'
    ,activity('Metadata Store 01').Error,'|'
    ,activity('Metadata Store 01').ExecutionDetails,'|'
    ,activity('Metadata Store 01').ExecutionEndTime,'|'
    ,activity('Metadata Store 01').ExecutionStartTime,'|'
    ,activity('Metadata Store 01').JobId,'|'
    ,activity('Metadata Store 01').Output,'|'
    ,pipeline().Pipeline,'|'
    ,activity('Metadata Store 01').ActivityRunId,'|'
    ,activity('Metadata Store 01').Status,'|'
    ,activity('Metadata Store 01').StatusCode)

where each of the system variables is concatenated and separated with the pipe character "|".
I did a similar thing to populate the very same var_logging variable in the ForEach container where the actual data copy operation occurs:
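Here is a sketch of what that second Append Variable expression could look like; the Copy Data activity name 'Copy Data Blob CSV' is only an assumption, so substitute your own activity name. The field order mirrors the first expression so the same positional split works later:

@concat('Copy Data Blob CSV|Copy|'
    ,pipeline().DataFactory,'|'
    ,activity('Copy Data Blob CSV').Duration,'|'
    ,activity('Copy Data Blob CSV').Error,'|'
    ,activity('Copy Data Blob CSV').ExecutionDetails,'|'
    ,activity('Copy Data Blob CSV').ExecutionEndTime,'|'
    ,activity('Copy Data Blob CSV').ExecutionStartTime,'|'
    ,activity('Copy Data Blob CSV').JobId,'|'
    ,activity('Copy Data Blob CSV').Output,'|'
    ,pipeline().Pipeline,'|'
    ,activity('Copy Data Blob CSV').ActivityRunId,'|'
    ,activity('Copy Data Blob CSV').Status,'|'
    ,activity('Copy Data Blob CSV').StatusCode)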
3) And then I used a final task to populate my dbo.adf_pipeline_log table with data from the var_logging variable by calling a stored procedure:
The whole trick is to split each text line of the var_logging variable into another array of values, split by the "|" character. Then, knowing the position of each individual system variable value, I can map them to their appropriate stored procedure parameters / columns in my logging table (e.g. @split(item(),'|')[0] for the ActivityTask).
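For illustration, a trimmed sketch of that Stored Procedure activity is shown below. The activity and linked service names are assumptions, and only the first few parameters are listed; the remaining ones follow the same positional pattern, where the index matches the order of the fields in the @concat expression from step 2:

{
    "name": "Log Activity Run",
    "type": "SqlServerStoredProcedure",
    "linkedServiceName": {
        "referenceName": "AzureSqlDatabase_LS",
        "type": "LinkedServiceReference"
    },
    "typeProperties": {
        "storedProcedureName": "[dbo].[sp_adf_pipeline_log_update]",
        "storedProcedureParameters": {
            "ActivityTask": {
                "value": { "value": "@split(item(),'|')[0]", "type": "Expression" },
                "type": "String"
            },
            "ActivityType": {
                "value": { "value": "@split(item(),'|')[1]", "type": "Expression" },
                "type": "String"
            },
            "DataFactory": {
                "value": { "value": "@split(item(),'|')[2]", "type": "Expression" },
                "type": "String"
            },
            "Status": {
                "value": { "value": "@split(item(),'|')[12]", "type": "Expression" },
                "type": "String"
            }
        }
    }
}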
This provided me the flexibility to see both Completed and Failed activity runs (to test a failed activity, I had to temporarily rename the target table of my Copy Data task). I can now read this data and get additional insights from the SQL Server table.
Let me know what you think about this, and have a happy data adventure!
Thank you! Very helpful blog with a more complete list of data factory pipeline system variables than I've seen anywhere else. Am working on building a similar logging approach for our data pipelines. This is going to save me time!
This is awesome! Just what I am needing to do; can you please upload your table script and stored proc? It would save me a ton of time... thanks! Mike
CREATE TABLE [dbo].[adf_pipeline_log](
[id] [int] IDENTITY(1,1) NOT NULL,
[DataFactory] [varchar](100) NULL,
[Pipeline] [varchar](100) NULL,
[JobId] [varchar](100) NULL,
[RunId] [varchar](100) NULL,
[ActivityTask] [varchar](100) NULL,
[ActivityType] [varchar](100) NULL,
[Status] [varchar](100) NULL,
[StatusCode] [varchar](100) NULL,
[Output] [varchar](1000) NULL,
[Error] [varchar](1000) NULL,
[ExecutionStartTime] [datetime] NULL,
[ExecutionEndTime] [datetime] NULL,
[ExecutionDetails] [varchar](1000) NULL,
[Duration] [time](7) NULL,
[ActivityTaskLogTime] [datetime] NULL,
CONSTRAINT [PK_adf_pipeline_log] PRIMARY KEY CLUSTERED
(
[id] ASC
)WITH (STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF) ON [PRIMARY]
) ON [PRIMARY]
GO
ALTER TABLE [dbo].[adf_pipeline_log] ADD CONSTRAINT [DF_adf_pipeline_log_ActivityTaskEventTime] DEFAULT (getdate()) FOR [ActivityTaskLogTime]
GO
CREATE PROCEDURE [dbo].[sp_adf_pipeline_log_update]
(
-- Input parameters
@DataFactory [varchar](100),
@Pipeline [varchar](100),
@JobId [varchar](100),
@RunId [varchar](100),
@ActivityTask [varchar](100),
@ActivityType [varchar](100),
@Status [varchar](100),
@StatusCode [varchar](100),
@Output [varchar](1000),
@Error [varchar](1000),
@ExecutionStartTime [datetime],
@ExecutionEndTime [datetime],
@ExecutionDetails [varchar](1000),
@Duration [time](7)
)
AS
BEGIN
SET NOCOUNT ON
BEGIN TRY
-- Adding a logging event
INSERT INTO [dbo].[adf_pipeline_log]
(
DataFactory,
Pipeline,
JobId,
RunId,
ActivityTask,
ActivityType,
[Status],
StatusCode,
[Output],
Error,
ExecutionStartTime,
ExecutionEndTime,
ExecutionDetails,
Duration
)
VALUES
(
@DataFactory,
@Pipeline,
@JobId,
@RunId,
@ActivityTask,
@ActivityType,
@Status,
@StatusCode,
@Output,
@Error,
@ExecutionStartTime,
@ExecutionEndTime,
@ExecutionDetails,
@Duration
);
END TRY
BEGIN CATCH
-- Output few key error statistics in the case of exception or termination
SELECT
ERROR_NUMBER() AS ERR_NUMBER,
ERROR_STATE() AS ERR_STATE,
ERROR_LINE() AS ERR_LINE,
ERROR_PROCEDURE() AS ERR_PROCEDURE,
ERROR_MESSAGE() AS ERR_MESSAGE;
END CATCH
END
GO
What data are you taking from the metadata activity?
My "Get Metadata" activity task in this particular case was to read a list of files from my blob storage container, which I then process in further tasks.
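For reference, a trimmed sketch of such a Get Metadata activity is below (the dataset name here is an assumption); the childItems field is what returns the list of files in the container:

{
    "name": "Metadata Store 01",
    "type": "GetMetadata",
    "typeProperties": {
        "dataset": {
            "referenceName": "BlobStore01_DS",
            "type": "DatasetReference"
        },
        "fieldList": [ "childItems" ]
    }
}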
In the stored proc parameters @split(item(),'|')[0], where does item() come from?
It comes from the ForEach loop container.
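In other words, item() resolves to the current element of the ForEach "items" collection. A trimmed sketch (the ForEach name is an assumption, inner activities omitted) could be:

{
    "name": "ForEach Logging Record",
    "type": "ForEach",
    "typeProperties": {
        "items": {
            "value": "@variables('var_logging')",
            "type": "Expression"
        },
        "activities": []
    }
}

So each item() is one of the pipe-delimited strings appended to var_logging earlier in the pipeline.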
This is awesome. One doubt which is not related to the above-mentioned process: how can we avoid certain values in a JSON output from the Get Metadata task before sending it to the ForEach loop?
Save the output of the Get Metadata task into a variable, then replace or remove some of the values that you don't need. You can also use the Filter activity for this.
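A trimmed sketch of such a Filter activity, assuming you only want to keep CSV files from the Get Metadata childItems output (the activity name and file extension are just examples):

{
    "name": "Filter CSV Files",
    "type": "Filter",
    "typeProperties": {
        "items": {
            "value": "@activity('Metadata Store 01').output.childItems",
            "type": "Expression"
        },
        "condition": {
            "value": "@endswith(item().name, '.csv')",
            "type": "Expression"
        }
    }
}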
Hello, I am trying to create an email alert using a Web activity and below is the code. I would like to make the activity name, i.e. Copy_cg4, dynamic in the code below. How do I get the activity name dynamically?
ReplyDelete{"DataFactoryName":"@{pipeline().DataFactory}","PipelineName":"@{pipeline().Pipeline}","Subject":"@{activity('Copy_cg4').Status}","ErrorMessage":"The ADF pipeline has @{activity('Copy_cg4').Status}","EmailTo":"supriya.d@te.com"}
I don't know if this is possible; it would require some testing. But if the error message contains some activity metadata information (which it should), then I would try to parse that text and extract the activity name from it.
Hi Rayis,
When I try to use @pipeline().Pipeline as a parameter to a notebook, I see random numbers as a suffix. Is there a way to pass the pipeline name alone?
Check your code; @pipeline().Pipeline should not add any additional information to the actual name of your pipeline. Otherwise, if the length of that number is constant, you can also remove that portion before it goes to your ADB notebook as a parameter value.
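For example, if the unwanted suffix really is a fixed-length string appended to the name (say, 9 characters; adjust to your case), a hedged expression sketch would be:

@substring(pipeline().Pipeline, 0, sub(length(pipeline().Pipeline), 9))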
Hi Rayis, do you have some idea of how to get the status from Databricks executions?
Hi Fran, the only status I think you can get in ADF is whether your notebook execution was successful or not; if it's not successful, it will provide you with a Databricks URL for the failed job. More detailed logging could be done with the help of Log Analytics, where additional details could be collected about your Databricks runs if you direct their logs to a Log Analytics workspace.
Hi Rayis, how can I pass a table name?
Like we pass the pipeline name: select '@{pipeline().Pipeline}' as pipeline_name
Statically or dynamically?
Your table name is a custom parameter/variable, and you have to define how it will be sourced. Once this is defined, you can easily update the schema of the logging table and add your table name as a new column.
How do I read the activity name of the previous activity in the next activity?
Check the JSON code of your pipeline; if an activity depends on another activity, then you will be able to see the predecessors' names ("dependsOn").
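For example, a trimmed fragment of an activity definition showing just the dependency metadata could look like this (the activity names are assumptions):

{
    "name": "Copy Data Blob CSV",
    "type": "Copy",
    "dependsOn": [
        {
            "activity": "Metadata Store 01",
            "dependencyConditions": [ "Succeeded" ]
        }
    ]
}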
Hello Rayis,
Thanks a lot for your articles, they are very helpful.
Could you please re-upload the global pipeline screenshot? It seems the picture is broken.
I would like to understand your global logic, and more specifically how many times you call the "[dbo].[sp_adf_pipeline_log_update]" stored procedure.
Thanks!
Hi Jerome, you can find the whole pipeline code in my public GitHub repository: https://github.com/NrgFly/Azure-DataFactory/blob/master/Samples/pipeline/bdata_adf_logging_pl.json. I think this will be better than a screenshot.
Hi, I would like to extract the pipeline duration and store it in SQL. Can I follow the same procedure?
There is a Duration column in the table already. Additionally, you can do this by creating a computed column in your SQL table with an expression for this new column: ExecutionEndTime - ExecutionStartTime.
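A hedged T-SQL sketch of such a computed column (the column name DurationSeconds is mine) could be:

-- Hypothetical computed column capturing the activity run duration in seconds
ALTER TABLE [dbo].[adf_pipeline_log]
    ADD [DurationSeconds] AS DATEDIFF(SECOND, [ExecutionStartTime], [ExecutionEndTime]);
GO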
But here you capture the duration of only the metadata activity, right?
I would like to get the end-to-end pipeline duration.
You can also connect your ADF instance to a log analytics workspace in azure to collect all the logs. This will provide with all the timings that you're looking for, both for activities and pipelines.
I could not get the correct time when I did end time - start time. I'm planning to do it with the REST API; any ideas?
You can check the "billableDuration" output attribute from your Web activity, if that is how you call your REST API.