basysKom AnwendungsEntwicklung

How to consume Protobuf messages in Azure Functions (Part 3 of 4)
Essential Summary
In the third part of our series, we show how Protobuf messages from the IoT Hub can be processed in Azure Functions.

Introduction

Now that the Protobuf messages of our IoT device has arrived at the IoT Hub, we have to consume the incoming messages. This could mean forwarding them to a processing system or storing them for later use.

For JSON or AVRO encoded data, we could directly route the messages into Azure Storage. To consume Protobuf encoded data, we have to execute custom code to perform the deserialization.

Azure Functions

Azure Functions offer a way to execute custom functions in the Azure cloud.

Functions are invoked by a trigger or binding, for a example a cron style timer, an HTTP request or an event like incoming data on other Azure services, for example ServiceBus Queues and Topics, Event Hubs or Event Grids. 

The functions can be hosted on Windows or Linux. Both platforms support a wide range of programming languages like JavaScript, TypeScript, C#, F#, Java, Python and Powershell.

Depending on the selected service plan, resources can be scaled out dynamically to handle an increased load and will be scaled in automatically if the load drops.

Example

To demonstrate how Protobuf messages received by an IoT Hub can be consumed, we will implement a simple Azure Function in JavaScript. It will decode the Protobuf messages and write the decoded data into an Azure Table Storage.

The full example code is hosted on github.

External resources

  • An IoT Hub with at least one IoT device
  • An Azure Storage account with two storage tables named Sensordata and Events
  • The example IoT device application from the previous blog post

Node.js modules

The handling code for our Protobuf messages is generated using protoc. It requires the google-protobuf module.

Custom environment variables

  • IOTHUB_CONNECTION_STRING is the connection string for the EventHub compatible endpoint of your IoT Hub instance
  • STORAGE_CONNECTION_STRING is the connection string for the Azure Storage account where the two storage tables are located

Input and output bindings

We use an eventHubTrigger input binding to have our function executed whenever there is a new message on the EventHub compatible endpoint of our IoT Hub. The binding is configured to combine multiple messages (if available) in one function invocation.

Two table storage output bindings are configured for the tables mentioned in the requirements.

Bindings are defined in a file named function.json which resides in the same directory as the function code. The property values can be environment variables which are initialized from the App Settings of the Function App the function is running in. In our example, the connection properties are configured that way.

{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "IoTHubMessages",
      "direction": "in",
      "eventHubName": "samples-workitems",
      "connection": "IOTHUB_CONNECTION_STRING",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "binary"
    },
    {
      "tableName": "Sensordata",
      "connection": "STORAGE_CONNECTION_STRING",
      "name": "sensorDataTableBinding",
      "type": "table",
      "direction": "out"
    },
    {
      "tableName": "Events",
      "connection": "STORAGE_CONNECTION_STRING",
      "name": "eventsTableBinding",
      "type": "table",
      "direction": "out"
    }
  ]
}
 

Code walkthrough

First, we create two arrays for our output bindings. Their names must correspond to the names specified in the bindings and all values that have been pushed to these arrays will be processed after our custom code has finished executing.

The IoTHubMessages parameter corresponds to the name given in the input binding. It contains an array of raw messages that have been pushed to the IoT Hub by one or several different clients. In our case, this is Protobuf encoded data of the DeviceMessages type.

The serialized Protobuf data is now deserialized using the code generated by protoc.
Information on the IoT Hub device that has sent the message is embedded in the context object. We extract the sender’s device id to use it as identifying information in our Table Storage entries.

As the DeviceMessages message can contain multiple telemetry messages from the same sender, an inner loop is required.

For each message, we prepare a Table Storage entry with the sender’s device id as the partition key and the message’s unix timestamp as the row key. In case of the EnvironmentData message, the partition key is extended with '_environmentData'. This allows distinguishing and querying more message types later if it should become necessary.
The timestamp field is converted to a Date object and added to the entry as a property named sourceTimestamp.

After preparing the Table Storage entry, the properties of the Event or EnvironmentData message are merged into the object which is then pushed to the corresponding output binding arrays.

When we have finished processing the data, context.done() is called to notify the function host of the successful function execution.

const messages = require('../generated/environment_iot_messages_pb')

module.exports = function (context, IoTHubMessages) {

    context.bindings.sensorDataTableBinding = []
    context.bindings.eventsTableBinding = []

    for (let i = 0; i < IoTHubMessages.length; ++i) {
        const deserializedMessage = new proto.iotexample.DeviceMessages.deserializeBinary(IoTHubMessages[i]);

        const sender = context.bindingData.systemPropertiesArray[i]['iothub-connection-device-id'];

        for (const message of deserializedMessage.getTelemetryMessagesList()) {
            console.log(`Message from device ${sender}`);

            let data = message.toObject();

            const tableStorageEntry = {
                PartitionKey: sender,
                RowKey: data.timestamp,
                sourceTimestamp: new Date(data.timestamp)
            }

            if (data.environmentData) {
                tableStorageEntry.PartitionKey += '_environmentData';
                context.bindings.sensorDataTableBinding.push({...tableStorageEntry, ...data.environmentData});
            } else if (data.event) {
                context.bindings.eventsTableBinding.push({...tableStorageEntry, ...data.event});
            }
        }
    }

    context.done();
}; 

Testing the example

The example can be deployed to an Azure Function App, but for learning purposes, it is also very handy to run the application locally using Visual Studio Code. This requires a file named local.settings.json with the environment variables our Function App needs to run.
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "Put your storage account connection string here",
    "FUNCTIONS_WORKER_RUNTIME": "node",
    "FUNCTIONS_EXTENSION_VERSION": "~3",
    "IOTHUB_CONNECTION_STRING": "Put your IoT Hub connection string here",
    "STORAGE_CONNECTION_STRING": "Put your storage account connection string here"
  }
} 

Just press F5 and the application should be launched.

If you run the Qt based example application in parallel, you should see the function being invoked and messages will start showing up in the storage tables.

How to consume Protobuf messages in Azure Functions (Part 3 of 4) 1 basysKom, HMI Dienstleistung, Qt, Cloud, Azure
Instead of or in addition to writing the data into a storage table, the deserialized data could also be pushed to an Event Hub to feed Azure Stream Analytics or some other data processing system.

Conclusion

By combining the Azure IoT SDK, an IoT Hub and Azure Functions, it is possible to create an end to end prototype for getting sensor data into the cloud in just a few hours.
Jannis Völker

Jannis Völker

Jannis Völker is a software engineer at basysKom GmbH in Darmstadt. After joining basysKom in 2017, he has been working in connectivity projects for embedded devices, Azure based cloud projects and has made contributions to Qt OPC UA and open62541. He has a background in embedded Linux, Qt and OPC UA and holds a master's degree in computer science from the University of Applied Sciences in Darmstadt.

Schreibe einen Kommentar

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.

Weitere Blogartikel

basysKom Newsletter

We collect only the data you enter in this form (no IP address or information that can be derived from it). The collected data is only used in order to send you our regular newsletters, from which you can unsubscribe at any point using the link at the bottom of each newsletter. We will retain this information until you ask us to delete it permanently. For more information about our privacy policy, read Privacy Policy