The next part of my experiments with Synapse alongside an integration platform involves using Event Hub and its Capture feature to ingest data into the data lake so that it can be queried and used in Synapse.
There are a couple of really good articles and videos worth checking out.
First off, check out this excellent video to get started quickly: https://www.youtube.com/watch?v=OIOEsd2Iiik
Then some of these articles provide good additional reading:
While those sources are excellent, I felt they didn't quite cover exactly what I was trying to achieve, so I wanted to explore this topic a little further. My requirement is that I have a number of different application and integration scenarios where we want to send messages to the data lake. One of the reference architectures for this uses Event Hub as the input for data streamed into the data lake, where you can then process the data and analyse it with Synapse.
Imagine the architecture below.
In the diagram we are particularly interested in the area inside the red dotted line. If we expand that area out to understand how Event Hub, Synapse and the data lake work together internally, we would see something like the diagram below.
This time the red dotted line focuses on the part we will be looking at. When messages are written to the event hub, we use the Capture feature, which writes files out to the landing area of the data lake storage. We then process them with Synapse, transform them into a more usable format and put them into a different part of the raw zone in the data lake.
One point to note here is that I may have my terminology crossed around the terms landing area and raw zone. I know they can mean the same thing, and as an integration person rather than a data person, please bear with me. In this case I'm using landing area to mean where the messages land exactly as they come from Event Hub. One of the challenges I want to deal with is that my event hub may carry multiple different types of event, and I want to handle that in my transformation by splitting the messages out and structuring them better. In my case I felt this was a better approach than having a different event hub for each type of event, and I'm guessing Capture might be charged per event hub, although the documentation isn't very clear on that.
In this article I want to look into one of the bits I felt the resources above didn't cover: how to take the Avro files that Event Hub Capture writes to the data lake, split the messages out based on the different event types we received, and then process them in a Synapse query. That is what this article focuses on.
One of the design decisions you might make in this type of solution is whether to use Event Hub Capture or to manage reading the event hub stream yourself. I'm assuming you have already decided to use Capture, have it set up and pushing your events to the data lake, and are getting the Avro files in your landing area within the raw zone of your data lake. The bit we want to look into is what to do next.
Before getting into the walkthrough, I want to cheat a little and point you to the links at the start of the article, in particular the YouTube video, if you want some background on setting up Event Hub, Synapse, etc. It covers:
- Setting up a Synapse workspace
- Setting up a data lake
- Setting up an event hub with Capture enabled
- Setting up a Spark pool in Synapse
My starting point
In my data lake I have two containers. The EventHub container is where the data from Capture goes, as shown below. The other container is the one used by Synapse during its setup.
If I dig into the container, you can see some Avro files that Event Hub Capture has already written.
You can also see my Capture configuration below.
Note that my capture file name format is slightly modified from the default. I picked this up from the YouTube video, and it looks like a good idea because it makes the folders more readable.
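A small sketch of how the archive name format tokens expand into folder paths may help here. It is a minimal Python model, not anything Event Hub runs: DEFAULT_FORMAT is the documented default format, while NAME_VALUE_FORMAT is a hypothetical Name=value variant and not necessarily the one I configured. I'm also assuming, from the source path in the script further down, that blog-ms-demo is the namespace and to-synapse the event hub.

```python
from datetime import datetime, timezone

# Default Event Hubs Capture archive name format, as documented by Microsoft.
DEFAULT_FORMAT = "{Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}"

# Hypothetical variant using Name=value folder names; illustrative only,
# not necessarily the exact format configured in this post.
NAME_VALUE_FORMAT = (
    "{Namespace}/{EventHub}/{PartitionId}/"
    "Year={Year}/Month={Month}/Day={Day}/Hour={Hour}/{Minute}/{Second}"
)


def capture_blob_name(fmt, namespace, event_hub, partition_id, window_start):
    """Expand the Capture tokens for one capture window, zero-padding the date parts."""
    return fmt.format(
        Namespace=namespace,
        EventHub=event_hub,
        PartitionId=partition_id,
        Year=f"{window_start.year:04d}",
        Month=f"{window_start.month:02d}",
        Day=f"{window_start.day:02d}",
        Hour=f"{window_start.hour:02d}",
        Minute=f"{window_start.minute:02d}",
        Second=f"{window_start.second:02d}",
    ) + ".avro"


start = datetime(2021, 3, 5, 14, 30, 0, tzinfo=timezone.utc)
print(capture_blob_name(DEFAULT_FORMAT, "blog-ms-demo", "to-synapse", 0, start))
# blog-ms-demo/to-synapse/0/2021/03/05/14/30/00.avro
print(capture_blob_name(NAME_VALUE_FORMAT, "blog-ms-demo", "to-synapse", 0, start))
# blog-ms-demo/to-synapse/0/Year=2021/Month=03/Day=05/Hour=14/30/00.avro
```

Name=value folder names matter because Spark's partition discovery turns them into columns when it reads the files, which is one plausible way Year, Month and Day columns can appear in the data frame later on.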
To simulate adding data to the event hub, I have a couple of Logic Apps that send data to it. They simulate sensor-monitoring data from my factory. Each message sent to the event hub has a property called MESSAGE_TYPE, which tells me the type of data the message carries.
The body of each message might be in a different format. In my case they are all JSON with different schemas, but I guess they could be text, XML or anything else.
When Capture outputs the Avro files, the chances are I will have files containing a mixture of different messages. My aim is to split them up and output them to the data lake in partitions based on the message type, to make additional processing of the data easier.
Following what I learned in the video and from a few articles, I wanted to see if I could create a Spark job that processes the Avro files and does the following:
- Read the avro files
- Split messages by message type so they are in different folders
- Create parquet files in the data lake for the split out data
- Maintain some kind of folder structure based on when the files are created
Experimenting with a Spark Job
Next, I wanted to create a Spark notebook that runs over the files in the landing area containing the Avro files from Event Hub. The notebook runs with the Synapse workspace managed identity, which already has access to the data lake storage account, so we do not have to worry about configuring any permissions.
The script for the job breaks down into the following areas:
This code snippet formats the path to the folder where Capture writes its files. It then reads the files and shows me the schema.
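The path format is easy to get wrong, so here is a minimal sketch of the abfss URI shape the script builds with % formatting: the container goes before the @ and the storage account name before .dfs.core.windows.net. The helper name abfss_uri is hypothetical; the argument values are the ones from the script.

```python
def abfss_uri(container, account, relative_path):
    """Build an abfss:// URI for a path in an ADLS Gen2 account (dfs endpoint)."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{relative_path.lstrip('/')}"


# Same values as the script; the trailing * is a glob that Spark expands when it loads the files.
print(abfss_uri("eventhub", "blogmsdemodl", "blog-ms-demo/to-synapse/*"))
# abfss://eventhub@blogmsdemodl.dfs.core.windows.net/blog-ms-demo/to-synapse/*
```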
You can see below what the schema looks like for a typical event message. Note that I will comment out the schema print and display calls in the real script.
The key point to note about the schema is that Body holds the encoded event data, and there is also a property for the message type, which I have to access with the path inputStreamDf.Properties.MESSAGE_TYPE.member2.
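Why member2? As I understand the documented Capture Avro schema, the values of the Properties map are a union of long, double, string, bytes and null, and Spark's Avro reader turns a multi-type union into a struct whose fields are named member0, member1, ... in declaration order, with null only making the struct nullable. The sketch below is a plain-Python model of that naming rule, not Spark code, and it deliberately ignores the special cases Spark applies to unions such as int/long.

```python
# Value union for the Properties map in the Event Hubs Capture Avro schema
# (as I understand the documented schema; check it against your own files).
PROPERTIES_VALUE_UNION = ["long", "double", "string", "bytes", "null"]


def spark_union_members(avro_union):
    """Model how Spark names the branches of a multi-type Avro union.

    Null is dropped (it just makes the struct nullable) and the remaining
    branches become struct fields member0, member1, ... in declaration order.
    """
    non_null = [t for t in avro_union if t != "null"]
    return {t: f"member{i}" for i, t in enumerate(non_null)}


members = spark_union_members(PROPERTIES_VALUE_UNION)
print(members["string"])  # member2
```

A string property such as MESSAGE_TYPE therefore lands in member2, while a property sent as a long would presumably show up in member0 instead.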
Next up I wanted to check what different message types are in my data stream to help me see how its processing stuff. I can run the below and see the output for the summarised different message types.
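For readers without a Spark pool to hand, the same summary can be modelled in plain Python with a Counter. This is a hypothetical sketch: the records are made up, reduced to the Properties map, and TYPE_B is an invented second message type. In Spark itself, something like inputStreamDf.groupBy(inputStreamDf.Properties.MESSAGE_TYPE.member2).count() should give counts per type rather than just the distinct list.

```python
from collections import Counter


def message_type(record):
    """Mirror Properties.MESSAGE_TYPE.member2: None when the property is missing, like Spark's null."""
    return record.get("Properties", {}).get("MESSAGE_TYPE", {}).get("member2")


def count_message_types(records):
    """Count how many records carry each message type."""
    return Counter(message_type(r) for r in records)


# Hypothetical records reduced to the Properties map; TYPE_B is a made-up second type.
records = [
    {"Properties": {"MESSAGE_TYPE": {"member2": "ELE"}}},
    {"Properties": {"MESSAGE_TYPE": {"member2": "TYPE_B"}}},
    {"Properties": {"MESSAGE_TYPE": {"member2": "ELE"}}},
    {"Properties": {}},
]
print(count_message_types(records))
```

Messages that arrive without a MESSAGE_TYPE property are counted under None, which is worth watching for because they will not land in any named message type folder.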
I then want to do some basic transformations which include:
- Decode the body to a readable string
- Get rid of a few columns
- Rename the message type from Properties[MESSAGE_TYPE].member2 to MessageType so it's easier to work with.
I'd run the code below and see the updated results.
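To make the three transformations listed above concrete, here is a plain-Python model applied to a single captured record. It is a sketch, not Spark code: the record values are made up, and it assumes Year, Month and Day already exist on the record, as they do in the script. Spark's CAST of a binary column to string treats the bytes as UTF-8, which is what decode("utf-8") mimics.

```python
def transform(record):
    """Plain-Python model of the transformations listed above, applied to one captured record."""
    body = record.get("Body")
    return {
        # CAST(Body AS STRING) in Spark treats the bytes as UTF-8 text
        "Body": body.decode("utf-8") if body is not None else None,
        "EnqueuedTimeUtc": record["EnqueuedTimeUtc"],
        # Properties[MESSAGE_TYPE].member2 renamed to MessageType
        "MessageType": record["Properties"].get("MESSAGE_TYPE", {}).get("member2"),
        # Year/Month/Day pass through; the other capture columns are dropped
        "Year": record["Year"],
        "Month": record["Month"],
        "Day": record["Day"],
    }


# Hypothetical captured record (values made up for illustration)
captured = {
    "SequenceNumber": 42,
    "Offset": "4096",
    "EnqueuedTimeUtc": "3/5/2021 2:30:00 PM",
    "SystemProperties": {},
    "Properties": {"MESSAGE_TYPE": {"member2": "ELE"}},
    "Body": b'{"TAG_NAME": "TAG-001", "RUN_HOURS": 12}',
    "Year": 2021,
    "Month": 3,
    "Day": 5,
}
print(transform(captured))
```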
Finally, I format the output path and write the data out as Parquet files with the script below. Notice that in inputStreamDf.write I use the partitionBy option, which is pretty cool: it writes the data out to a folder structure using the data frame fields for message type, year, month and day. This means in the data lake I'll get a structure like:
- MessageType=<message type>
  - Year=<year>
    - Month=<month>
      - Day=<day>
        - All of my files for that combination
Note that I have left in the commented-out line with coalesce, which I use when troubleshooting and want all the data written to a single file per partition folder. In the real world you don't want to do that; just let Spark write as many files as it needs for good performance. Remember my earlier comment about being fairly new to Synapse, so check out other people's content if you want to explore that further, as I haven't reached that point on my journey yet.
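The folder layout partitionBy produces can be sketched in plain Python. This is a model of Hive-style partitioning, not Spark's implementation: each row goes to a Name=value folder per partition column, the partition columns are not stored inside the files themselves, and a null value goes to Spark's __HIVE_DEFAULT_PARTITION__ folder. The sketch assumes simple values; Spark escapes characters such as / and = in partition values, which is not modelled here. The rows are hypothetical.

```python
from collections import defaultdict

PARTITION_COLUMNS = ("MessageType", "Year", "Month", "Day")
# Folder name Spark uses for a null partition value
NULL_PARTITION = "__HIVE_DEFAULT_PARTITION__"


def partition_dir(row, columns=PARTITION_COLUMNS):
    """Hive-style folder for one row, e.g. MessageType=ELE/Year=2021/Month=3/Day=5."""
    parts = []
    for column in columns:
        value = row[column]
        parts.append(f"{column}={NULL_PARTITION if value is None else value}")
    return "/".join(parts)


def group_by_partition(rows, columns=PARTITION_COLUMNS):
    """Group rows by target folder, dropping the partition columns from each row."""
    groups = defaultdict(list)
    for row in rows:
        data = {k: v for k, v in row.items() if k not in columns}
        groups[partition_dir(row, columns)].append(data)
    return dict(groups)


# Hypothetical rows after the transformation step
rows = [
    {"Body": "a", "MessageType": "ELE", "Year": 2021, "Month": 3, "Day": 5},
    {"Body": "b", "MessageType": "ELE", "Year": 2021, "Month": 3, "Day": 5},
    {"Body": "c", "MessageType": None, "Year": 2021, "Month": 3, "Day": 5},
]
for folder, data in group_by_partition(rows).items():
    print(folder, len(data))
```

How many files end up inside each folder depends on how the data is spread across Spark tasks, which is why coalesce(1) yields one file per folder.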
The full script I used is below:
# In a Synapse notebook, spark and display() are provided by the runtime
from pyspark.sql.functions import col

# Source: the folder Event Hub Capture writes its Avro files to
source_storageaccount_name = "blogmsdemodl"
source_container_name = "eventhub"
source_relative_path = "blog-ms-demo/to-synapse/*"
inputPath = 'abfss://%s@%s.dfs.core.windows.net/%s' % (source_container_name, source_storageaccount_name, source_relative_path)

# Batch read of all capture files (despite the variable name, this is not a streaming read)
inputStreamDf = spark.read.format('avro').load(inputPath)
inputStreamDf.printSchema()
display(inputStreamDf)

# Display message types
messageTypes = inputStreamDf.select(inputStreamDf.Properties.MESSAGE_TYPE.member2)
msgTypes = messageTypes.distinct()
display(msgTypes)

# Transform data: decode Body, keep the useful columns and name the message type column.
# Year, Month and Day are not in the Capture Avro schema; they presumably come from
# Name=value folder names in the capture path, which Spark turns into partition columns.
inputStreamDf = inputStreamDf.withColumn("Body", col("Body").cast("string"))
inputStreamDf = inputStreamDf.select(
    inputStreamDf.Body,
    inputStreamDf.EnqueuedTimeUtc,
    inputStreamDf.Properties.MESSAGE_TYPE.member2.alias("MessageType"),
    inputStreamDf.Year,
    inputStreamDf.Month,
    inputStreamDf.Day)
display(inputStreamDf)

# Write out to data lake, one folder per MessageType/Year/Month/Day combination.
# With Spark's default (static) partition overwrite mode, "overwrite" replaces everything under target_path.
target_storageaccount_name = "blogmsdemodl"
target_container_name = "blogmsdemofs"
target_relative_path = "Raw/EventHub"
target_path = 'abfss://%s@%s.dfs.core.windows.net/%s' % (target_container_name, target_storageaccount_name, target_relative_path)
inputStreamDf.write.partitionBy("MessageType", "Year", "Month", "Day").format("parquet").mode("overwrite").save(target_path)

# Troubleshooting only: coalesce(1) writes a single file per partition folder
#inputStreamDf.coalesce(1).write.partitionBy("MessageType", "Year", "Month", "Day").format("parquet").mode("overwrite").save(target_path)
I can now run my full job and it will process files from Event Hub.
Checking Spark Output
If we now check the Raw/EventHub folder under the main container for my Synapse workspace in the data lake, you can see a folder for each message type:
If we drill down into the folder structure under the ELE message type, we can see a number of Parquet files that have been created for my data.
Next, we can run a query on our serverless SQL pool and query the files for the ELE message type, using wildcards for the date parts like below.
SELECT *
FROM OPENROWSET(
    BULK 'https://blogmsdemodl.dfs.core.windows.net/blogmsdemofs/Raw/EventHub/MessageType=ELE/Year=*/Month=*/Day=*/*.snappy.parquet',
    FORMAT = 'PARQUET'
) AS [r];
We can see the results show only our ELE messages.
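To see which files a path like this picks up, the wildcard matching can be approximated with Python's PurePosixPath.match. This is only a sketch under the assumption that each * matches within a single folder level, as the query reads; it is not how the serverless SQL engine is implemented. The file names are hypothetical, and _SUCCESS is the marker file Spark writes at the root of its output.

```python
from pathlib import PurePosixPath

# The part of the OPENROWSET BULK path after the container URL
QUERY_PATTERN = "Raw/EventHub/MessageType=ELE/Year=*/Month=*/Day=*/*.snappy.parquet"


def selected_files(paths, pattern=QUERY_PATTERN):
    """Keep the paths the wildcard pattern matches; each * stays within one folder level."""
    return [p for p in paths if PurePosixPath(p).match(pattern)]


# Hypothetical files under the container
files = [
    "Raw/EventHub/MessageType=ELE/Year=2021/Month=3/Day=5/part-00000.snappy.parquet",
    "Raw/EventHub/MessageType=TYPE_B/Year=2021/Month=3/Day=5/part-00001.snappy.parquet",
    "Raw/EventHub/_SUCCESS",
]
print(selected_files(files))
```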
This looks good, so next I will create a view to give people an easy way to query this data.
Create View in SQL Serverless Pool
I can parse the JSON in the Body column with JSON_VALUE and create a typical SQL-like view using the query below.
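As a rough mental model of what each JSON_VALUE(Body, '$.FIELD') call returns in its default lax mode, here is a plain-Python sketch: a scalar comes back as text, while a missing path, a JSON null or an object/array comes back as NULL (None here). It is an approximation; number formatting may differ from SQL's, the 4000-character limit is ignored, and the sample body is hypothetical.

```python
import json

ELE_FIELDS = ["AVG_CONFIDENCE", "DAYTIME", "GRS_MASS", "MESSAGE_TYPE", "RUN_HOURS", "TAG_NAME"]


def json_value(body, key):
    """Rough model of JSON_VALUE(body, '$.key') in lax mode."""
    doc = json.loads(body)  # invalid JSON raises, as JSON_VALUE does
    value = doc.get(key) if isinstance(doc, dict) else None
    if value is None or isinstance(value, (dict, list)):
        return None  # missing path, JSON null or non-scalar -> NULL in lax mode
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)  # scalars come back as text


def ele_row(body):
    """One row of the view: every column extracted from the same Body string."""
    return {field: json_value(body, field) for field in ELE_FIELDS}


print(ele_row('{"TAG_NAME": "TAG-001", "RUN_HOURS": 12, "MESSAGE_TYPE": "ELE"}'))
```

Because every column comes back as text, the view could wrap numeric fields in a CAST or TRY_CAST, or use OPENJSON with a WITH clause to declare column types, if people need to do arithmetic on them.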
CREATE VIEW Plant_ELE_Data AS
SELECT
    AVG_CONFIDENCE = JSON_VALUE(Body, '$.AVG_CONFIDENCE'),
    DAYTIME = JSON_VALUE(Body, '$.DAYTIME'),
    GRS_MASS = JSON_VALUE(Body, '$.GRS_MASS'),
    MESSAGE_TYPE = JSON_VALUE(Body, '$.MESSAGE_TYPE'),
    RUN_HOURS = JSON_VALUE(Body, '$.RUN_HOURS'),
    TAG_NAME = JSON_VALUE(Body, '$.TAG_NAME')
FROM OPENROWSET(
    BULK 'https://blogmsdemodl.dfs.core.windows.net/blogmsdemofs/Raw/EventHub/MessageType=ELE/Year=*/Month=*/Day=*/*.snappy.parquet',
    FORMAT = 'PARQUET'
) AS [r];
This means I can run a query like the one below and see my ELE data in an easy-to-use format.
SELECT TOP (100)
    [AVG_CONFIDENCE],
    [DAYTIME],
    [GRS_MASS],
    [MESSAGE_TYPE],
    [RUN_HOURS],
    [TAG_NAME]
FROM [dbo].[Plant_ELE_Data]
Hopefully this post is useful in sharing how I am thinking about handling multiple message types from the event hub and making them queryable.
I think my next step will be to look into options for triggering the Spark job when a capture file is created, or running it on a schedule.