(2019-Mar-10) A conversation between two or more people involves a continuous effort to listen and reflect on what the other people have to say, i.e. you need to stay connected all the time, otherwise it's no longer a mutual communication. Email gave us the flexibility to respond in a timely or reasonably delayed manner, and I have already blogged about using Azure Data Factory with emails. Azure Service Bus is one of the earliest components introduced in Azure. Can we use Data Factory to send messages to Azure Service Bus?
Image by Kranich17 on Pixabay
Spoiler alert: Data Factory does not currently support Service Bus queues as targets, so as a workaround, a Logic App with a Send message task can be used. This Logic App is then called from my Data Factory pipeline using a Web activity task.
Use Case Description
Let's say I have a blob storage container with sourcing files that I need to copy to a staging blob container. At the same time, while copying those files, I need to send a JSON message to Service Bus containing the name and size of each file being copied; this message will later be read by another application or service.
Use Case steps
1) Service Bus Component
Service Bus is a cloud messaging service that allows sending messages between applications and services. It lets you create messaging namespaces (workspaces) with multiple queues within, which your applications can use as communication channels for information exchange. There is well-constructed documentation at this Microsoft web resource - https://docs.microsoft.com/en-us/azure/service-bus-messaging/ - where you can get more information about Service Bus in Azure.
For my use case, I created a service bus queue with the default 14-day time-to-live limit for messages and a 30-second lock duration for the peek-lock message receive scenario.
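For reference, those two queue settings could be captured in an ARM template fragment like the minimal sketch below; the namespace parameter and the queue name are placeholders of my own, not the actual resource names, and both durations use the ISO 8601 format (P14D is 14 days, PT30S is 30 seconds):

{
  "type": "Microsoft.ServiceBus/namespaces/queues",
  "apiVersion": "2017-04-01",
  "name": "[concat(parameters('serviceBusNamespaceName'), '/adf-file-messages')]",
  "properties": {
    "defaultMessageTimeToLive": "P14D",
    "lockDuration": "PT30S"
  }
}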
2) Logic App with Service Bus task development
My Azure Logic App is very simple and has two elements: a trigger for HTTP requests and a task to send a message to the service bus queue that I have just created.
I define my HTTP request body JSON schema the following way; it's up to you to enrich this schema with additional elements:
{ "properties": { "Message": { "type": "string" }, "SessionID": { "type": "string" } }, "type": "object" }
Then I reuse elements of this body in my Send message task for the Session Id and Content parameters, which concludes my Logic App development. Don't forget to copy and save the HTTP POST URL from the HTTP trigger; you will need it later in your Data Factory pipeline.
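For illustration only, here is a rough sketch of how that mapping might look in the Logic App code view; the queue name and the connection reference are assumptions on my part, and the exact schema of the Service Bus connector action may differ slightly:

"Send_message": {
  "type": "ApiConnection",
  "inputs": {
    "host": {
      "connection": { "name": "@parameters('$connections')['servicebus']['connectionId']" }
    },
    "method": "post",
    "path": "/@{encodeURIComponent(encodeURIComponent('adf-file-messages'))}/messages",
    "body": {
      "ContentData": "@{base64(triggerBody()?['Message'])}",
      "SessionId": "@triggerBody()?['SessionID']"
    }
  }
}

The essential part is simply that the Content parameter is set to triggerBody()?['Message'] and the Session Id parameter to triggerBody()?['SessionID'].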
3) Data Factory Pipeline Development
My sourcing blob storage container has these 3 files:
- storeinventory_01_20181027.csv
- storeinventory_02_20181027.csv
- storeinventory_03_20181027.csv
The list of those files is pulled by the [Get Metadata: List of files] activity, where the dataset folder property is set to @pipeline().parameters.SourceFilePath and the Field list to Child items. The file copying is then done with a set of tasks in my ForEachContainer.
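As a rough sketch of how these two activities are wired together (the dataset name and its FolderPath parameter are my own placeholders; the authoritative pipeline JSON is in the GitHub link at the end of this post), the Get Metadata output feeds the ForEach items like this:

{
  "name": "List of files",
  "type": "GetMetadata",
  "typeProperties": {
    "dataset": {
      "referenceName": "SourceFolderDataset",
      "type": "DatasetReference",
      "parameters": {
        "FolderPath": { "value": "@pipeline().parameters.SourceFilePath", "type": "Expression" }
      }
    },
    "fieldList": [ "childItems" ]
  }
},
{
  "name": "ForEachContainer",
  "type": "ForEach",
  "dependsOn": [ { "activity": "List of files", "dependencyConditions": [ "Succeeded" ] } ],
  "typeProperties": {
    "items": { "value": "@activity('List of files').output.childItems", "type": "Expression" },
    "activities": [ "<the Copy Data, Set Variable and Web activities described below>" ]
  }
}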
This container has 4 activity tasks within:
[Copy Data: CopyOneContainer] task's Source folder is set to @pipeline().parameters.SourceFilePath and its filename to @item().name. Likewise, its Sink folder is set to @pipeline().parameters.DestinationFilePath and its filename to @item().name.
[Set Variable: Message ID] defines the MessageID variable as @concat(utcnow(),'_',pipeline().RunId), just to have a unique identifier for each generated message.
[Set Variable: Queue Message] defines the Message variable with the content that will be submitted to my service bus queue, using @concat('{"MessageID":"', variables('MessageID'), '","FileName":"', item().name, '","FileSize":"', activity('CopyOneContainer').output.dataWritten,'"}') to produce the JSON message format that I need:
{ "MessageID": { "type": "string" }, "FileName": { "type": "string" }, "FileSize": { "type": "string" } }
To complete my ADF pipeline development, I add a [Web: Send Queue Message] activity task with the following settings:
URL: the HTTP POST URL copied from my newly created Logic App.
Method: POST
Headers: Name: Content-Type with Value: application/json
Body: {"Message":"@variables('Message')","SessionID":"@pipeline().RunId"}
4) Testing ADF pipeline and validating Service Bus message
As a result of my data factory pipeline run, all 3 testing files get copied successfully to my staging blob storage container. And most importantly, for each of the copied files a message was submitted to my service bus queue, which I then validated using Service Bus Explorer.
Summary:
1) Azure Data Factory and Service Bus can find common ground to communicate with each other, and Azure Logic Apps can serve as a good mediator to establish this type of messaging communication.
2) As soon as messages land in a service bus queue, it becomes the responsibility of the recipient side to obtain and process those messages, which may be part of another blog post.
I have saved the code of my ADF pipeline from this blog post in this GitHub location:
https://github.com/NrgFly/Azure-DataFactory/blob/master/Samples/pipeline/CopyFilesWithServiceBus.json
Comments:
- Great post. It's worth calling out the cost of Data Factory; especially with a ForEach, it can start getting out of hand really quickly if you don't monitor it and put some controls in place.
  Reply: Yes, I agree with you Nick, controlling your cost in Azure is important, and it helps to monitor this on a regular basis.
- Great post. Can you tell me the point at which you added the service bus address?
  Reply: Sorry for the late response. The service bus reference was added within my Logic App.
- Great post. Can we send data from a data lake to Azure Service Bus?
  Reply: Yes, you can create a Logic App to communicate with your Service Bus, then from your ADF pipeline you call this Logic App using a Web activity and pass along all your information.
- Great post. Can we get data from the service bus into ADF in order to copy it into Snowflake, for instance?
  Reply: What data do you plan to get from a service bus to ADF?