In this part 2 post I wanted to explore how to trigger the Spark job I created in part 1, which was able to process my capture files.

I wanted to look into using a storage event trigger for the capture files being written, and then have that trigger a pipeline which processes the file.

If we go back to my diagram from part 1 we are really looking at the section in red.

I have events coming into the event hub from my Logic App, which simulates data being written to the event hub regularly, and I end up with the data in Avro files written to the landing area as shown below.

If you look at the path, you can see the data goes to a folder matching the capture path format:

{Namespace}/{EventHub}/{PartitionId}/Year={Year}/Month={Month}/Day={Day}/{Hour}-{Minute}-{Second}
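For example, the capture file used later in this post ends up at the path below, where the leading eventhub segment is the container, followed by the namespace, event hub name, partition id and date folders:

eventhub/blog-ms-demo/to-synapse/0/Year=2021/Month=08/Day=08/15-22-46.avro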

In the previous post I developed a Spark job which explored processing the data: splitting it based on the message type property on the event hub message and moving it to a different folder in the data lake, so that it was grouped by message type and stored in Parquet format.

What I wanted to do now is work out how to trigger the processing each time a file is written to storage by capture.

Investigating the documentation, it seems the best way to do this is to create a pipeline in Synapse which subscribes to storage events on the data lake; when the pipeline runs, it triggers the notebook to process the file.

In the rest of this post I will walk through how I did this.

Step 1 – Setup RBAC for Event Subscription

I was stumped by this for a while later in my proof of concept. The documentation does cover it, but it's not that clear or obvious, so I want to get this up front so you don't have problems with it later.

When we set up the trigger for the job and publish it, unless you already have things configured you will probably get a failure registering the Event Grid subscription. What happens here is that publishing your pipeline will create an Event Grid topic in your resource group and subscribe to the storage events for you. The publish can fail if the appropriate permissions are not already in place.

The Microsoft documentation for this is here:

https://docs.microsoft.com/en-us/azure/data-factory/how-to-create-event-trigger?tabs=data-factory

I kept getting the following message:

The client '[guid]' with object id '[object id]' 
does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/write' over scope '
/subscriptions/[sub]/resourceGroups/SynapsePOC/providers/Microsoft.Storage/storageAccounts/blogmsdemodl/providers
/Microsoft.EventGrid/eventSubscriptions/[guid]' or the scope is invalid. If access was recently granted, please refresh your credentials.

I tried setting up permissions for my user and the Synapse workspace managed identity, but none of that worked. When I investigated I found that the client id GUID actually related to an Azure AD enterprise application for Azure Data Factory, as below:

I'm assuming this is set up behind the scenes. I gave this app the Contributor role on the storage account, as per the documentation where it says “the account running the service”.

There is also an option to scope this more tightly to just the Event Grid permission if you want lower privilege.

Step 2 – Modify the Spark Notebook

The Spark notebook I made in the previous post needed some changes to work with this scenario. The main difference is that rather than processing a set of folders and the files within them, this time I want to process the specific file that got created. I also need the notebook to be callable from the pipeline and to be passed some information from the event. Let's walk through the notebook.

First off I have a parameters section which lets me set up parameters that will be passed in by the pipeline. I've also set some defaults to help me develop the notebook, so it points to a specific file I want to process.

Next up I wanted to visualise my parameters for troubleshooting, and I also needed to parse the input folder path which will come from the storage event. When we read the file later, the path for the Spark read is different to the path coming from the event, so I needed to extract the container name from the path and get the rest of the relative path.

You can see the above section produces this

Next I needed to process the relative path to extract the date info so I can use it for partitioning later in the notebook. There are probably better ways to do this, but I ended up creating a data frame and using pyspark.sql to extract the date info with regular expressions.

This section produces this output

You can see on the right of the picture the columns for my dates which have been extracted from the path.

Next up I am going to do the read. The change from the original notebook is that instead of reading all the files in a folder structure, I now want to read a specific file. I pass in the path for that file and print the data frame schema.

Next up I want to copy across the date information I extracted from the relative path earlier, so that it is in columns on the data frame I read from the file. This means that later, when I write the file to the new location, I can partition based on these columns and the files will be created in the right place.

The next section is for troubleshooting again, but it lets me see the message types within the data I'm processing.

The final section writes the data out to the new location(s). By using .partitionBy and specifying the MessageType, Year, Month, Day and File columns, the one input file ends up split across multiple files in folders for each message type, reusing the date partitioning structure that capture originally created. The last bit of partitioning, by file, gives me a folder for each file that was processed. There may be a better way to do this, but the output Parquet file gets a generated filename, so with this separation I am assuming there will be no conflicts between multiple storage events arriving at the same time.
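To make that concrete, for the example file you will see in the notebook parameters below, I would expect output paths roughly like this (the final filename is whatever Spark generates):

Raw/EventHub/MessageType=ELE/Year=2021/Month=08/Day=08/File=15-22-46/part-<generated>.snappy.parquet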

One point to note here is the write mode. I am using append now. In the previous post I used overwrite, which replaces the file structure; if I use append, the existing structure is maintained and any newly processed files are merged into it. If you use overwrite, then each time a file is run it will replace the previously processed files and their folders in those locations.

The final code for the notebook looks like this below.

Parameters section, which has some values in it for when I run it to try it out; these will be passed in from the pipeline later.

input_file = "15-22-46.avro"
input_folder_Path = "eventhub/blog-ms-demo/to-synapse/0/Year=2021/Month=08/Day=08"
input_storageaccount_name = "blogmsdemodl"

The rest of the code looks like this:

# Section which just displays some info for troubleshooting
from pyspark.sql.functions import *
from pyspark.sql.functions import split


displayHTML("Input Parameters:</br>")
displayHTML("input_file: %s</br>" % input_file)
displayHTML("input_folder_Path: %s</br>" % input_folder_Path)
displayHTML("input_storageaccount_name: %s</br>" % input_storageaccount_name)
displayHTML("</br>")


#Get the container from the input parameter
folderNames = input_folder_Path.split("/")
source_container_name = folderNames[0]
displayHTML("source_container_name: ")
displayHTML(source_container_name)
displayHTML("</br>")

#Get the relative path from the input parameter
source_relative_path = input_folder_Path[len(source_container_name):None]
displayHTML("source_relative_path: ")
displayHTML(source_relative_path)
displayHTML("</br>")

#Output File
output_file_names = input_file.split(".")
output_file_name = output_file_names[0]
displayHTML("output_file_name: ")
displayHTML(output_file_name)
displayHTML("</br>")



#Load the input to a data frame so we can process it and extract the day month and year from the path with a regex

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
input_data = [(input_file, input_folder_Path, input_storageaccount_name, source_container_name, source_relative_path)]

input_schema = StructType([ \
    StructField("input_file", StringType(), True), \
    StructField("input_folder_Path", StringType(), True), \
    StructField("input_storageaccount_name", StringType(), True), \
    StructField("source_container_name", StringType(), True), \
    StructField("source_relative_path", StringType(), True) \
  ])
 
inputDf = spark.createDataFrame(data=input_data, schema=input_schema)
inputDf = inputDf.withColumn("Year", regexp_extract(inputDf.source_relative_path, "Year=(.+?)/", 1))
inputDf = inputDf.withColumn("Month", regexp_extract(inputDf.source_relative_path, "Month=(.+?)/", 1))
inputDf = inputDf.withColumn("Day", regexp_extract(inputDf.source_relative_path, "Day=(.*)", 1))

inputDf.printSchema()
inputDf.show(truncate=False)


# Load the input file to a data frame and check the schema

source_file_name = input_file
source_storageaccount_name = input_storageaccount_name
inputPath = 'abfss://%s@%s.dfs.core.windows.net/%s/%s' % (source_container_name, source_storageaccount_name, source_relative_path, source_file_name)
displayHTML("inputPath: %s</br>" % inputPath)
inputStreamDf = spark.read.format('avro').load(inputPath)

inputStreamDf.printSchema()

# Convert the body field to a string and select fields I want and rename the message type column to something friendlier

inputStreamDf = inputStreamDf.withColumn("Body", expr("CAST(Body as String)"))
inputStreamDf = inputStreamDf.select(inputStreamDf.Body, inputStreamDf.EnqueuedTimeUtc, inputStreamDf.Properties.MESSAGE_TYPE.member2)
inputStreamDf = inputStreamDf.withColumnRenamed("Properties[MESSAGE_TYPE].member2", "MessageType")

# Add the year month and day that originally came from the path to the data frame for partitioning later


year = inputDf.first().Year
month = inputDf.first().Month
day = inputDf.first().Day
inputStreamDf = inputStreamDf.withColumn("Year", lit(year))
inputStreamDf = inputStreamDf.withColumn("Month", lit(month))
inputStreamDf = inputStreamDf.withColumn("Day", lit(day))
inputStreamDf = inputStreamDf.withColumn("File", lit(output_file_name))

#display(inputStreamDf)

# Display the message types within the data frame for troubleshooting

messageTypes = inputStreamDf.select(inputStreamDf.MessageType)
msgTypes = messageTypes.distinct()

#display(msgTypes)

# Write out the data frame to the new location, using the partitioning will split out the data into folders for different message types and also we will include the original file name as a partition folder too


target_storageaccount_name = "blogmsdemodl"
target_container_name = "blogmsdemofs"
target_relative_path = "Raw/EventHub"

target_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (target_container_name, target_storageaccount_name, target_relative_path)

inputStreamDf.write.partitionBy("MessageType", "Year", "Month", "Day", "File").format("parquet").mode("append").save(target_path)

When I run the notebook I end up with the following files created. Note in the two images below you can see the different folder paths, so the one input file created files in two output folders.

Step 3 – The Pipeline

Now that I have a parameterised notebook, I need to create a pipeline that will subscribe to the storage event and call the notebook.

The pipeline is pretty simple. I have two Set Variable activities just for troubleshooting, to make it easy to see the parameters from the trigger, and then I call the notebook.

On my pipeline I have created 2 parameters as shown below:

You can see on the properties for my notebook activity that I am passing the pipeline parameters in, using the names of the parameters we created earlier on the notebook.
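As a rough sketch of that mapping (Source_Folder and Source_File are placeholder names for the pipeline parameters here, not necessarily what I called mine), the notebook activity's base parameters end up as something like:

input_folder_Path: @pipeline().parameters.Source_Folder
input_file: @pipeline().parameters.Source_File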

Step 4 – The Trigger

I now created a trigger for my pipeline using the trigger menu on the pipeline designer. It will subscribe to blob created events for .avro files in the container which capture writes to from the event hub.
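For reference, my filter was along these lines (the container name comes from the capture path we saw earlier; adjust it to your own setup):

Container: eventhub
Event: Blob created
Blob path ends with: .avro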

The next page will show you some of the paths which would match your filter, but the one after that allows you to map the trigger to your pipeline parameters.

The two expressions to use are slightly different for Synapse than for Data Factory, so just be aware of that; the ones to use are:

@trigger().outputs.body.folderPath
@trigger().outputs.body.fileName
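Using the same placeholder pipeline parameter names as in the previous step, the mapping on the trigger's parameters page would look something like this:

Source_Folder: @trigger().outputs.body.folderPath
Source_File: @trigger().outputs.body.fileName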

I also needed to start my trigger. To do this, go to the Manage section, then Triggers; there will be a play icon next to your trigger. Press that and then publish, and it will turn your trigger on.

Note that this is the point where the Event Grid topic gets created, I believe, and if the RBAC setup I talked about earlier isn't right then you may get that error here while the trigger is waiting to subscribe to events.

Step 5 – Triggering the Job

Going back to my previous demo, I have a Logic App which is regularly pushing data into the event hub, and capture is creating files regularly too. The next time it runs after my pipeline and trigger are published, we should expect a pipeline run to happen. If we monitor this through we should see the following.

In my capture folder you can see the new file is created; it might take a short while for the file to appear depending on your capture settings.

We can now go to the monitoring for Synapse and check the trigger runs, and we will see the trigger ran for the file above.

We can now go to the pipeline runs and see the pipeline that ran from the trigger.

If we click on the pipeline run we can see how it ran.

I can also go to the Spark applications and look at how my job ran.

I can now go to my output directory and check where the files were produced for each of the different message types.

The one above is for the ELE message type, but there will also be files in the MessageType=GAS and MessageType=Water folders and subfolders.

Step 6 – Query the Data

Next I want to query the data which has been processed, and I can do the same as in the previous post to query the data and create serverless views. The one thing to note, however, is that the output path is slightly different to last time. Note the extra level in the path below (the per-file partition folder); this is because when processing files individually we needed that extra folder so we could partition by file.

https://blogmsdemodl.dfs.core.windows.net/blogmsdemofs/Raw/EventHub/MessageType=ELE/Year=*/Month=*/Day=*/*/*.snappy.parquet

SELECT
    *
FROM OPENROWSET(
    BULK 'https://blogmsdemodl.dfs.core.windows.net/blogmsdemofs/Raw/EventHub/MessageType=ELE/Year=*/Month=*/Day=*/*/*.snappy.parquet',
    FORMAT = 'parquet')
AS [r];
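If you want to wrap that up as a serverless view like in the previous post, a minimal sketch would be along these lines (the view name is just a placeholder, and it needs to be created in a serverless SQL database rather than master):

CREATE VIEW dbo.EleMessages AS
SELECT
    *
FROM OPENROWSET(
    BULK 'https://blogmsdemodl.dfs.core.windows.net/blogmsdemofs/Raw/EventHub/MessageType=ELE/Year=*/Month=*/Day=*/*/*.snappy.parquet',
    FORMAT = 'parquet')
AS [r];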

Summary

In part 2 I have extended what we did in the part 1 post so we can process capture files on an event basis.

On reflection, I think there are pros and cons of doing it this way versus doing it on a scheduled basis and going over the entire file structure for a given period, maybe processing data hourly or something.

I think that would likely boil down to your requirements and how you want to manage your resources.

Hopefully these notes will help others looking at a similar approach, and it would be interesting to compare notes with others doing similar things.

 
