Recently I've been looking at a scenario where we will be pushing a lot of messages to Azure Service Bus and then processing them into the backend application. The architecture looks a little like the picture below:

I'm a big fan of sending messages to topics and then processing messages from queues, as you can see above.

Anyway, the problem I had is that the sending application will send the state of every entity, and for many of these entities the data will not have changed. Down the line I want to pub/sub these changes to multiple systems, but for now let's keep things simple and use one destination. On a normal day the sending application will send out 50k+ messages, but 90% of them will not have changed since yesterday. I want to be able to get rid of the noise and focus on processing just the messages that need to be processed.

I explored a couple of different ways to do this but in the end this is what I came up with:

  1. The message sent by the sending application would include 2 new custom properties
    1. Process Name would indicate the name of the message process, e.g. Student Update
    2. Primary Key would indicate the unique ID for the message data, correlating to the source system, e.g. Student ID
  2. I would create a new queue called the dedupe-queue
  3. I would create an Azure function to read this queue and perform the dedupe task
  4. The function would use an Azure storage table. The table name would match the Process Name property on the Service Bus message, and the function would check this table to see if the message had been processed before
  5. The lookup would use the Primary Key property from the message, which would be the row key in the table
  6. If the row was not found then this is the first time the message has been processed
  7. If the row was found then I would retrieve the message body from the row
  8. The function would compare the new body with the body from the table, and if they were different then the message would be considered to have changed
  9. I would then update the row in the table with the updated message body
  10. The function would create a new message to send back to Service Bus, which would be a copy of the previous message but would include a new property called IsDuplicate
  11. The function would then forward the message to a new topic called approved-updates, which would have a subscription where IsDuplicate = false to send messages to the worker queue (see the sketch after this list)
  12. Any messages where IsDuplicate was true would match no subscription and would simply disappear
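
A minimal sketch of how the routing in steps 11 and 12 might be provisioned is below. The subscription name (changed-messages) and worker queue name (worker-queue) are assumptions for illustration; only the approved-updates topic and the IsDuplicate property come from the design above.

using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

public static class RoutingSetup
{
    public static void CreateApprovedUpdatesRouting(string serviceBusConnectionString)
    {
        var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);

        // Hypothetical subscription on the approved-updates topic that only matches changed messages
        var subscription = new SubscriptionDescription("approved-updates", "changed-messages")
        {
            // Auto-forward matching messages straight into the worker queue
            ForwardTo = "worker-queue"
        };

        // Only messages the function stamped with IsDuplicate = 'FALSE' match this filter;
        // duplicates match no subscription and are discarded by the topic
        var filter = new SqlFilter("IsDuplicate = 'FALSE'");

        if (!namespaceManager.SubscriptionExists("approved-updates", "changed-messages"))
        {
            namespaceManager.CreateSubscription(subscription, filter);
        }
    }
}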

The updated architecture would look like the diagram below:

Code for Function

In the csx file we put the code for two helper classes. One is a data entity, which is the entity saved in the Azure table. The other class, called Helpers, contains some simple helper methods to check if the object has changed and to update the object in storage.

The function below should be generic and can work with different types of messages. You simply need to do the following to get it working:

  1. Set up your Azure Function Service Bus trigger
  2. Set up your Azure Function Service Bus output
  3. Add a connection string for Azure Storage below where advised
#r "Microsoft.ServiceBus"
#r "Microsoft.WindowsAzure.Storage"
#r "System.Runtime.Serialization"

using System;
using System.IO;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Table;

public static void Run(BrokeredMessage message, TraceWriter logger, out BrokeredMessage outputSbMsg)
{
    var storageConnectionString = "<Add connection string>";

    logger.Verbose("Message received");

    // Get info from the inbound Service Bus message
    var messageId = message.MessageId;
    var primaryKey = (string)message.Properties["PrimaryKey"];
    var processName = (string)message.Properties["ProcessName"];

    var bodyStream = message.GetBody<Stream>();
    var sr = new StreamReader(bodyStream);
    var jsonText = sr.ReadToEnd();

    // Set up the outbound Service Bus message as a copy of the inbound one
    var newStream = new MemoryStream();
    var newStreamWriter = new StreamWriter(newStream);
    newStreamWriter.Write(jsonText);
    newStreamWriter.Flush();
    newStream.Flush();
    newStream.Seek(0, SeekOrigin.Begin);

    outputSbMsg = new BrokeredMessage(newStream, false);
    outputSbMsg.Label = message.Label;
    outputSbMsg.Properties.Add("PrimaryKey", primaryKey);
    outputSbMsg.Properties.Add("ProcessName", processName);

    // Check table storage to see if it's a duplicate
    var storageAccount = CloudStorageAccount.Parse(storageConnectionString);
    var tableClient = storageAccount.CreateCloudTableClient();
    var table = tableClient.GetTableReference(processName);
    table.CreateIfNotExists(); // One table per process name, created on demand

    if (Helpers.HasChanged(processName, primaryKey, jsonText, table))
    {
        // Store the latest body so the next message for this entity is compared against it
        Helpers.Update(processName, primaryKey, jsonText, table);

        outputSbMsg.Properties.Add("IsDuplicate", "FALSE");
        logger.Info("Message changed so forwarded to Service Bus");
    }
    else
    {
        outputSbMsg.Properties.Add("IsDuplicate", "TRUE");
        logger.Info("Message is set as a duplicate");
    }

    logger.Info("Message Processed");
}

public class Helpers
{
    public static bool HasChanged(string processName, string primaryKey, string jsonText, CloudTable table)
    {
        var retrieveOperation = TableOperation.Retrieve<DataEntity>(processName, primaryKey);
        var retrievedResult = table.Execute(retrieveOperation);

        if (retrievedResult.Result == null)
            return true; // Object not in table, so this is the first time it has been seen

        var entity = (DataEntity)retrievedResult.Result;
        if (entity.JsonText == jsonText)
            return false; // Objects match so no change

        return true; // Body has changed since the last message
    }

    public static void Update(string processName, string primaryKey, string jsonText, CloudTable table)
    {
        var entity = new DataEntity(processName, primaryKey);
        entity.JsonText = jsonText;

        var insertOperation = TableOperation.InsertOrReplace(entity);
        table.Execute(insertOperation);
    }
}

public class DataEntity : TableEntity
{
    public DataEntity(string processName, string primaryKey)
    {
        this.PartitionKey = processName;
        this.RowKey = primaryKey;
        PrimaryKey = primaryKey;
        ProcessName = processName;
    }

    public DataEntity() { }

    public string PrimaryKey { get; set; }
    public string ProcessName { get; set; }
    public string JsonText { get; set; }
}
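
For completeness, here is a rough sketch of how a sending application might stamp the two custom properties from step 1 when publishing an update. The topic name (inbound-topic) and the use of Newtonsoft.Json are assumptions for illustration; only the ProcessName and PrimaryKey property names and the JSON body come from the approach above. Note that because the process name doubles as the Azure table name, it needs to be a valid table name (letters and numbers only).

using System.IO;
using System.Text;
using Microsoft.ServiceBus.Messaging;
using Newtonsoft.Json;

public static class ExampleSender
{
    public static void SendStudentUpdate(string serviceBusConnectionString, object student, string studentId)
    {
        // "inbound-topic" is a hypothetical name for the topic that feeds the dedupe-queue
        var topicClient = TopicClient.CreateFromConnectionString(serviceBusConnectionString, "inbound-topic");

        var json = JsonConvert.SerializeObject(student);
        var stream = new MemoryStream(Encoding.UTF8.GetBytes(json));

        var message = new BrokeredMessage(stream, true)
        {
            Label = "Student Update"
        };

        // The two custom properties the dedupe function relies on;
        // the ProcessName value is also used as the Azure table name, so no spaces
        message.Properties.Add("ProcessName", "StudentUpdate");
        message.Properties.Add("PrimaryKey", studentId);

        topicClient.Send(message);
    }
}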

When I did some testing of this I used the Azure Functions dynamic (consumption) service plan, which should allow the function to burst scale up and down based on usage. In practice I have had this function processing in the region of 100 messages per second. I need to spend some more time understanding how to monitor and optimize the function, but I thought this was a really interesting approach to deduplication. There are some out-of-the-box duplicate detection options for Service Bus, but according to the documentation they only compare the message ID, whereas this approach compares the entire message body.
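
For comparison, a minimal sketch of the out-of-the-box option is below: enabling duplicate detection when a queue is created, so that Service Bus drops any message whose MessageId was already seen within the configured window. This is standard Service Bus functionality rather than part of the approach above, and the queue name is just an example.

using System;
using Microsoft.ServiceBus;
using Microsoft.ServiceBus.Messaging;

public static class BuiltInDuplicateDetection
{
    public static void CreateQueueWithDuplicateDetection(string serviceBusConnectionString)
    {
        var namespaceManager = NamespaceManager.CreateFromConnectionString(serviceBusConnectionString);

        var queueDescription = new QueueDescription("example-queue")
        {
            // Service Bus compares only the MessageId, not the message body,
            // which is why the function above compares the full body instead
            RequiresDuplicateDetection = true,
            DuplicateDetectionHistoryTimeWindow = TimeSpan.FromHours(1)
        };

        if (!namespaceManager.QueueExists("example-queue"))
        {
            namespaceManager.CreateQueue(queueDescription);
        }
    }
}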

 
