
Sunday, March 10, 2019

Communication in Azure: using Data Factory to send messages to Azure Service Bus

(2019-Mar-10) A conversation between two or more people involves a continuous effort to listen and reflect on what others have to say, i.e. you need to stay connected all the time; otherwise it's no longer mutual communication. Email gave us the flexibility to respond either promptly or with a reasonable delay, and I have already blogged about using Azure Data Factory with emails. Azure Service Bus is one of the earliest components introduced in Azure. Can we use Data Factory to send messages to Azure Service Bus?

Image by Kranich17 on Pixabay 

Spoiler alert: currently Data Factory does not support Service Bus queues as targets, so as a workaround, a Logic App with a Send message task can be used. This Logic App is then called from my Data Factory pipeline using a Web activity.


Use Case Description
Let's say I have a blob storage container with source files that I need to copy to a staging blob container. While copying those files, I also need to send a JSON message to Service Bus containing the name and size of each file being sent; that message will later be read by another application or service.


Use Case steps

1) Service Bus Component 
Service Bus is a cloud messaging service that allows sending messages between applications and services. It lets you create messaging namespaces (workspaces) with multiple queues within them, which your applications can use as communication channels for information exchange. There is well-constructed documentation at this Microsoft web resource - https://docs.microsoft.com/en-us/azure/service-bus-messaging/ where you can get more information about Service Bus in Azure.

For my use case, I created a Service Bus queue with a default message time to live of 14 days and a lock duration of 30 seconds for the peek-lock message receive scenario.


2) Logic App with Service Bus task development
My Azure Logic App is very simple and has two elements: a trigger for HTTP requests and a task to send a message to the Service Bus queue that I have just created.

I define my HTTP request body JSON schema the following way, and it's up to you to enrich and add additional elements to this schema:
{
    "properties": {
        "Message": {
            "type": "string"
        },
        "SessionID": {
            "type": "string"
        }
    },
    "type": "object"
}


Then I reuse elements of this body in my Send message task for the Session ID and Content parameters, which concludes my Logic App development. Don't forget to copy and save the HTTP POST URL from the HTTP trigger; you will need it later in your Data Factory pipeline.
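To make the request body schema above a bit more concrete, here is a minimal Python sketch of what it accepts. It is only an illustration and not part of the Logic App itself: the function name matches_trigger_schema is hypothetical, and the check assumes plain JSON Schema semantics, where the schema lists no required properties, so Message and SessionID are optional but must be strings when present.

```python
import json

def matches_trigger_schema(raw_body: str) -> bool:
    """Check a JSON body against the schema above: an object whose
    Message and SessionID properties, when present, are strings."""
    try:
        body = json.loads(raw_body)
    except json.JSONDecodeError:
        return False
    if not isinstance(body, dict):
        return False
    # Only the two declared properties are type-checked; extra keys are allowed.
    return all(
        isinstance(body[key], str)
        for key in ("Message", "SessionID")
        if key in body
    )

print(matches_trigger_schema('{"Message": "hello", "SessionID": "run-1"}'))  # True
print(matches_trigger_schema('{"Message": 42}'))  # False
```

If you enrich the schema with more elements, the tuple of checked keys would need to grow accordingly.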




3) Data Factory Pipeline Development
My sourcing blob storage container has these 3 files:
- storeinventory_01_20181027.csv
- storeinventory_02_20181027.csv
- storeinventory_03_20181027.csv

The list of those files is pulled by the [Get Metadata: List of files] activity, where the dataset folder property is set to @pipeline().parameters.SourceFilePath and the field list to Child items. The file copying is then done with a set of tasks in my ForEach container.
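As a rough illustration of what the ForEach container iterates over, the Python sketch below mimics a Get Metadata output with a childItems array of name/type entries and pulls out the value that @item().name would resolve to on each iteration. The sample dictionary and the helper name child_item_names are assumptions made for this example, not something taken from the pipeline.

```python
# Hypothetical sample shaped like a Get Metadata output with Child items requested.
get_metadata_output = {
    "childItems": [
        {"name": "storeinventory_01_20181027.csv", "type": "File"},
        {"name": "storeinventory_02_20181027.csv", "type": "File"},
        {"name": "storeinventory_03_20181027.csv", "type": "File"},
    ]
}

def child_item_names(output: dict) -> list:
    """Return the name of every child item, in the order listed."""
    return [item["name"] for item in output["childItems"]]

# Each name printed here corresponds to one ForEach iteration.
for name in child_item_names(get_metadata_output):
    print(name)
```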


This container has 4 activity tasks within:


[Copy Data: CopyOneContainer] task's Source folder is set to @pipeline().parameters.SourceFilePath and filename is set to @item().name. Likewise, Sink folder is set to @pipeline().parameters.DestinationFilePath and filename is set to @item().name

[Set Variable: Message ID] defines MessageID variable as @concat(utcnow(),'_',pipeline().RunId) just to have unique identifiers for my generated messages. 

[Set Variable: Queue Message] defines the Message variable with the content that will be submitted to my service bus queue as @concat('{"MessageID":"', variables('MessageID'), '","FileName":"', item().name, '","FileSize":"', activity('CopyOneContainer').output.dataWritten,'"}') to support the JSON messaging format that I need:
{
  "MessageID": {
    "type": "string"
  },
  "FileName": {
    "type": "string"
  },
  "FileSize": {
    "type": "string"
  }
}
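The two Set Variable expressions above can be sketched in Python roughly as follows. This is an approximation under stated assumptions: the function names build_message_id and build_queue_message are hypothetical, the run ID and byte count are made-up sample values, and the timestamp produced by Python's isoformat() is not formatted the same way as the one returned by utcnow() in Data Factory.

```python
import json
from datetime import datetime, timezone

def build_message_id(run_id: str) -> str:
    """Mirror @concat(utcnow(),'_',pipeline().RunId): UTC timestamp, underscore, run ID."""
    return f"{datetime.now(timezone.utc).isoformat()}_{run_id}"

def build_queue_message(message_id: str, file_name: str, bytes_written: int) -> str:
    """Build the MessageID/FileName/FileSize payload with every value as a string."""
    return json.dumps({
        "MessageID": message_id,
        "FileName": file_name,
        "FileSize": str(bytes_written),  # the format above declares FileSize as a string
    })

# Hypothetical run ID and byte count, for illustration only.
message_id = build_message_id("00000000-0000-0000-0000-000000000000")
print(build_queue_message(message_id, "storeinventory_01_20181027.csv", 1024))
```

Building the payload with json.dumps rather than string concatenation means that any quotes or backslashes in a file name get escaped automatically.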

To complete my ADF pipeline development, I add [Web: Send Queue Message] activity task with the following settings:
URL: copy and paste URL from my newly created Logic App.
Method: POST
Headers: Name: Content-Type with Value: application/json

Body: {"Message":"@variables('Message')","SessionID":"@pipeline().RunId"}
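For testing the Logic App outside of Data Factory, the same request can be sketched in Python with the standard library. This is only an illustration of the Web activity settings above: LOGIC_APP_URL is a placeholder for the URL copied from the HTTP trigger, build_send_request is a hypothetical helper name, and the sample message and run ID are invented. The request is only built here; the call that would actually send it is left as a comment.

```python
import json
import urllib.request

# Placeholder: replace with the HTTP POST URL copied from the Logic App trigger.
LOGIC_APP_URL = "https://example.invalid/logic-app-trigger"

def build_send_request(url: str, message: str, run_id: str) -> urllib.request.Request:
    """Build a POST request with a JSON body holding Message and SessionID."""
    body = json.dumps({"Message": message, "SessionID": run_id}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Hypothetical message and run ID, for illustration only.
sample_message = json.dumps({"MessageID": "id-1", "FileName": "a.csv", "FileSize": "10"})
request = build_send_request(LOGIC_APP_URL, sample_message, "run-1")
print(request.get_method(), request.full_url)
print(request.data.decode("utf-8"))

# To actually send the request (requires a real trigger URL):
# with urllib.request.urlopen(request) as response:
#     print(response.status)
```

Note that json.dumps escapes the quotes inside the nested message, so the outer body stays valid JSON even though Message itself carries a JSON document as a string.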

4) Testing ADF pipeline and validating Service Bus message
As a result of my data factory pipeline run, all 3 test files were copied successfully to my staging blob storage container. Most importantly, for each of the copied files a message was submitted to my service bus queue, which I then validated using Service Bus Explorer:



Summary:
1) Azure Data Factory and Service Bus can find common ground to communicate with each other, and Azure Logic Apps can serve as a good mediator to establish this type of messaging communication.
2) As soon as messages land in a service bus queue, it becomes the responsibility of the recipient side to obtain and process those messages, which may be part of another blog post.

I have saved the code of my ADF pipeline from this blog post in this GitHub location: 
https://github.com/NrgFly/Azure-DataFactory/blob/master/Samples/pipeline/CopyFilesWithServiceBus.json
