What is Apache Avro compared to Protobuf

Introduction – Apache Avro and Protobuf

Having covered Protobuf in the second part of the IoT series, we now give an introduction to Apache Avro, work out how it compares to Protobuf and examine whether it is suitable for use in IoT devices.

Apache Avro Basics

Avro is a data serialization framework that was developed as part of Apache Hadoop.
It is mainly used in big data processing and is also supported by some of the data processing solutions in Microsoft Azure.

Schemas

Avro uses schemas to structure the data. Schemas are usually defined in JSON, but there is also support for an IDL. This post will concentrate on the JSON format.

As an example, we will now recreate the environment sensor messages from the Protobuf post as a JSON schema. Some changes are necessary due to differences between Protobuf and Avro:

  • Avro does not support unsigned types. The timestamp becomes a 64 bit signed integer.
  • Contrary to Protobuf, where all fields are optional, Avro has no concept of optional fields; a value that may be absent has to be modelled as a union with null. Every field should get a default value to allow removing it when changing the schema.
  • Avro does not have Protobuf’s oneof feature. As a replacement, TelemetryMessage gets a field named payload whose type is a union of the two types EnvironmentData and Event.
{
  "namespace": "iotexample",
  "name": "DeviceMessages",
  "type": "record",
  "fields": [
    {
      "name": "telemetry_messages",
      "type": {
        "namespace": "iotexample",
        "name": "TelemetryMessagesArray",
        "type": "array",
        "items": {
          "namespace": "iotexample",
          "name": "TelemetryMessage",
          "type": "record",
          "fields": [
            {
              "name": "timestamp",
              "type": "long",
              "default": 0
            },
            {
              "name": "payload",
              "type": [
                {
                  "namespace": "iotexample",
                  "name": "EnvironmentData",
                  "type": "record",
                  "fields": [
                    {
                      "name": "temperature",
                      "type": "double",
                      "default": 0
                    },
                    {
                      "name": "pressure",
                      "type": "double",
                      "default": 0
                    },
                    {
                      "name": "humidity",
                      "type": "double",
                      "default": 0
                    },
                    {
                      "name": "co2_level",
                      "type": "double",
                      "default": 0
                    }
                  ]
                },
                {
                  "namespace": "iotexample",
                  "type": "record",
                  "name": "Event",
                  "fields": [
                    {
                      "type": "string",
                      "name": "message",
                      "default": ""
                    },
                    {
                      "type": "int",
                      "name": "event_number",
                      "default": 0
                    },
                    {
                      "name": "error_level",
                      "type": {
                        "namespace": "iotexample",
                        "name": "ErrorLevel",
                        "type": "enum",
                        "symbols": [
                          "UNSPECIFIED",
                          "ERROR",
                          "WARNING",
                          "INFO"
                        ]
                      },
                      "default": "UNSPECIFIED"
                    }
                  ]
                }
              ]
            }
          ]
        }
      }
    }
  ]
}
 

Schema Evolution

Changing an existing Avro schema (removing fields, adding new fields, etc.) shows the main difference to Protobuf.

We have learned that Protobuf messages can be evolved in a way that enables a consumer which only knows about the new version to consume messages created with the old version and vice versa.

In Avro, this is not possible as the consumer must always know the schema that was used to serialize the message. There are different levels of compatibility, each of which permits different kinds of changes.

Systems using Avro usually employ a schema registry where all versions of a schema are stored. Messages must then be prefixed with the identifier of the schema used by the producer to allow the consumer to decode the message.
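How such a prefix might look is sketched below in plain Node.js. The framing (a 4-byte big-endian schema identifier in front of the Avro payload), the function names frameMessage and unframeMessage and the in-memory registry are assumptions made for illustration; real schema registries define their own wire formats.

```javascript
// Hypothetical in-memory registry: schema id -> schema version label
const registry = new Map([
    [1, 'iotexample.DeviceMessages v1'],
    [2, 'iotexample.DeviceMessages v2']
]);

// Prepend a 4-byte big-endian schema id (assumed framing) to the Avro payload
function frameMessage(schemaId, payload) {
    const header = Buffer.alloc(4);
    header.writeUInt32BE(schemaId, 0);
    return Buffer.concat([header, payload]);
}

// Split a framed message into schema id and payload
function unframeMessage(message) {
    if (message.length < 4) {
        throw new Error('message too short to contain a schema id');
    }
    return {
        schemaId: message.readUInt32BE(0),
        payload: message.subarray(4)
    };
}

// Placeholder bytes standing in for an Avro encoded message
const avroPayload = Buffer.from([0x04, 0x00]);

const framed = frameMessage(2, avroPayload);
const { schemaId, payload: received } = unframeMessage(framed);

// The consumer looks up the writer schema before decoding
if (!registry.has(schemaId)) {
    throw new Error(`unknown schema id ${schemaId}`);
}
console.log(`Decode ${received.length} payload bytes with ${registry.get(schemaId)}`);
```

The consumer would then pass the schema found in the registry to its decoder instead of assuming its own version.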

Programming with Avro in C++

To demonstrate the C++ API, we again create a Qt-based application. It uses the Avro C++ library, which must be built and installed first. The library also ships a code generator named avrogencpp, which we use to create the message structs along with the serialization and deserialization code.
avrogencpp -i iotmessages.json -o iotmessages.h -n iotexample 

Walkthrough

First, we need some includes for Avro and the Qt features we are going to use:
#include "iotmessages.h"

#include <avro/ValidSchema.hh>
#include <avro/Compiler.hh>

#include <QCoreApplication>
#include <QDateTime>
#include <QDebug>
#include <QFile>

#include <sstream> // std::istringstream used for the schema input

Then we use the generated structs from iotmessages.h and populate a DeviceMessages message.

int main()
{
    // Create and populate a TelemetryMessage with EnvironmentData payload
    iotexample::TelemetryMessage sensorDataMessage;
    sensorDataMessage.timestamp = QDateTime::currentMSecsSinceEpoch();

    iotexample::EnvironmentData payload;
    payload.temperature = 23;
    payload.pressure = 1080;
    payload.humidity = 75;
    payload.co2_level = 415;

    sensorDataMessage.payload.set_EnvironmentData(payload);

    // Create and populate a TelemetryMessage with Event payload
    iotexample::TelemetryMessage eventMessage;
    eventMessage.timestamp = QDateTime::currentMSecsSinceEpoch();

    iotexample::Event event;
    event.message = std::string("My event message");
    event.error_level = iotexample::ErrorLevel::ERROR;
    event.event_number = 123;

    eventMessage.payload.set_Event(event);

    // Create a DeviceMessages message and add the two messages
    iotexample::DeviceMessages messages;
    messages.telemetry_messages.push_back(sensorDataMessage);
    messages.telemetry_messages.push_back(eventMessage); 

In the next step, we will serialize the message and print the serialized data size and the base64 encoded serialized data.

    // Create output stream and encoder
    auto out = avro::memoryOutputStream();
    auto encoder = avro::binaryEncoder();
    encoder->init(*out);

    // Encode the message
    avro::encode(*encoder, messages);
    encoder->flush();

    qDebug() << "Serialized size:" << out->byteCount() << "bytes";

    // Create input stream and reader
    auto in = avro::memoryInputStream(*out);
    auto reader = avro::StreamReader(*in);

    QByteArray data(out->byteCount(), Qt::Uninitialized);

    reader.readBytes(reinterpret_cast<quint8 *>(data.data()), out->byteCount());

    qDebug() << "Serialized data:" << data.toBase64(); 

To demonstrate the decoder, we will now use a validatingDecoder to deserialize the message. This decoder is initialized with the schema and will throw an exception if the data doesn’t match.

After decoding the message, the content is printed. The printing code also shows how to handle the union we used as oneof replacement.

    // Load schema file from the resource
    QFile schemaFile(":iotmessages.json");
    if (!schemaFile.open(QFile::ReadOnly)) {
        qWarning() << "Failed to open schema resource:" << schemaFile.errorString();
        return 1;
    }
    std::istringstream schemaInput(schemaFile.readAll().toStdString());

    avro::ValidSchema schema;

    try {
        avro::compileJsonSchema(schemaInput, schema);
    } catch(std::exception &ex) {
        qWarning() << "Failed to compile schema:" << ex.what();
        return 1;
    }

    // Create input stream and a validating binary decoder
    in = avro::memoryInputStream(*out);
    auto decoder = avro::binaryDecoder();

    decoder->init(*in);
    auto validatingDecoder = avro::validatingDecoder(schema, decoder);

    iotexample::DeviceMessages decoded;

    try {
        avro::decode(*validatingDecoder, decoded);
    } catch(std::exception &ex) {
        qWarning() << "Decode failed with:" << ex.what();
        return 1;
    }

    qDebug() << "Decoded message with" << decoded.telemetry_messages.size() << "values";

    for (uint i = 0; i < decoded.telemetry_messages.size(); ++i) {
        const auto &current = decoded.telemetry_messages.at(i);
        qDebug() << "Message" << i + 1;
        qDebug() << "  Timestamp:" << QDateTime::fromMSecsSinceEpoch(current.timestamp).toString(Qt::ISODate);
        if (current.payload.idx() == 1) {
            qDebug() << "  Event number:" << current.payload.get_Event().event_number;
            qDebug() << "  Event error level:" << (int) current.payload.get_Event().error_level;
            qDebug() << "  Event message:" << QString::fromStdString(current.payload.get_Event().message);
        } else if (current.payload.idx() == 0) {
            qDebug() << "  Temperature:" << current.payload.get_EnvironmentData().temperature;
            qDebug() << "  Pressure:" << current.payload.get_EnvironmentData().pressure;
            qDebug() << "  Humidity:" << current.payload.get_EnvironmentData().humidity;
            qDebug() << "  CO2:" << current.payload.get_EnvironmentData().co2_level;
        } else {
            qDebug() << "  Empty TelemetryMessages";
        }
    }

    return 0;
} 

Building the example

We use qmake to build the example application. The JSON schema is included as a resource and is also used to generate the C++ code at build time.

QT -= gui

CONFIG += c++11 console
CONFIG -= app_bundle

INCLUDEPATH += /opt/avro/include

LIBS += -L/opt/avro/lib -lavrocpp

HEADERS += \
    iotmessages.h

SOURCES += \
        main.cpp

AVRO_COMPILER = /opt/avro/bin/avrogencpp
command = $$AVRO_COMPILER -i iotmessages.json -o iotmessages.h -n iotexample
!system($$command) {
    error("avrogencpp is required to build this application")
}

RESOURCES += \
    schema.qrc 

Result

The application is executed and prints the result.

We see that the serialized data is 68 bytes in size, which is less than the 80 bytes needed by Protobuf. The main cause is that Avro writes the field values back to back in schema order, so the encoded message contains neither field numbers nor markers for optional fields.

Serialized size: 68 bytes
Serialized data: "BLCsqYnDXAAAAAAAAAA3QAAAAAAA4JBAAAAAAADAUkAAAAAAAPB5QLCsqYnDXAIgTXkgZXZlbnQgbWVzc2FnZfYBAgA="
Decoded message with 2 values
Message 1
  Timestamp: "2020-05-15T15:45:28"
  Temperature: 23
  Pressure: 1080
  Humidity: 75
  CO2: 415
Message 2
  Timestamp: "2020-05-15T15:45:28"
  Event number: 123
  Event error level: 1
  Event message: "My event message" 
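To see where the 68 bytes come from, it helps to decode the start of the output by hand. The following sketch uses plain Node.js without any Avro library; readLong is a hypothetical helper written for this illustration, and the input is the first twelve Base64 characters of the serialized data printed above. Avro stores int and long values as zigzag-coded variable-length integers, which is what the helper undoes.

```javascript
// Read one Avro int/long: base-128 varint (least significant group first),
// followed by zigzag decoding. BigInt is used because timestamps exceed 32 bits.
function readLong(buf, offset) {
    let raw = 0n;
    let shift = 0n;
    let pos = offset;
    let byte;
    do {
        byte = buf[pos++];
        raw |= BigInt(byte & 0x7f) << shift;
        shift += 7n;
    } while (byte & 0x80);
    // Zigzag: even raw values are non-negative, odd raw values are negative
    const value = (raw >> 1n) ^ -(raw & 1n);
    return { value, next: pos };
}

// First nine bytes of the serialized data from the C++ example
const head = Buffer.from('BLCsqYnDXAAA', 'base64');

const blockCount = readLong(head, 0);               // number of array items
const timestamp = readLong(head, blockCount.next);  // TelemetryMessage.timestamp
const unionIndex = readLong(head, timestamp.next);  // branch of the payload union

console.log(`Array items: ${blockCount.value}`);
console.log(`Timestamp: ${timestamp.value} (${new Date(Number(timestamp.value)).toISOString()})`);
console.log(`Union branch: ${unionIndex.value}`);
```

The header therefore takes 1 + 6 + 1 bytes. Adding the four doubles (32 bytes), the second message (6 bytes timestamp, 1 byte union index, 1 byte string length, 16 bytes text, 2 bytes event number, 1 byte enum index) and the terminating zero of the array gives 68 bytes.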

Programming with Avro in JavaScript / Node.js

We will now do the same encode/decode routine in Node.js. This example requires the avro-js module.
const avro = require('avro-js');

// Parse the schema and create the corresponding type
const deviceRecordMessage = avro.parse('./iotmessages.json');

// Create a sample message
const messageToEncode = {
    telemetry_messages: [
        {
            timestamp: Date.now(),
            payload: {
                'iotexample.EnvironmentData': {
                    temperature: 23,
                    pressure: 1080,
                    humidity: 75,
                    co2_level: 415
                }
            }
        },
        {
            timestamp: Date.now(),
            payload: {
                'iotexample.Event': {
                    message: 'My event message',
                    event_number: 123,
                    error_level: 'ERROR'
                }
            }
        }
    ]
}

// Encode the message
const data = deviceRecordMessage.toBuffer(messageToEncode);

// Print encoded data as base64
console.log(`Encoded data: "${data.toString('base64')}", length: ${data.length}`);

// Decode the message
const decoded = deviceRecordMessage.fromBuffer(data);

// Print the content of the decoded message
console.log(JSON.stringify(decoded, null, 2)); 

Running the example shows the same encoded data as the C++ example, apart from the bytes of the timestamp. The decoded data is a JavaScript object, printed here as JSON, that follows the structure defined by the schema.

Encoded data: "BOKUk+vEXAAAAAAAAAA3QAAAAAAA4JBAAAAAAADAUkAAAAAAAPB5QOKUk+vEXAIgTXkgZXZlbnQgbWVzc2FnZfYBAgA=", length: 68

{
  "telemetry_messages": [
    {
      "timestamp": 1589787125041,
      "payload": {
        "iotexample.EnvironmentData": {
          "temperature": 23,
          "pressure": 1080,
          "humidity": 75,
          "co2_level": 415
        }
      }
    },
    {
      "timestamp": 1589787125041,
      "payload": {
        "iotexample.Event": {
          "message": "My event message",
          "event_number": 123,
          "error_level": "ERROR"
        }
      }
    }
  ]
} 

Schema evolution in JavaScript

We will now have a look at what happens if the consumer in the cloud uses an updated schema and receives a message encoded with an older schema.
const avro = require('avro-js');
const fs = require('fs')

const schema = JSON.parse(fs.readFileSync('./iotmessages.json'))

// Parse the schema and create the corresponding type
const deviceRecordMessage = avro.parse(schema);

// Create a sample message
const messageToEncode = {
    telemetry_messages: [
        {
            timestamp: Date.now(),
            payload: {
                'iotexample.EnvironmentData': {
                    temperature: 23,
                    pressure: 1080,
                    humidity: 75,
                    co2_level: 415
                }
            }
        },
        {
            timestamp: Date.now(),
            payload: {
                'iotexample.Event': {
                    message: 'My event message',
                    event_number: 123,
                    error_level: 'ERROR'
                }
            }
        }
    ]
}

// Encode the message
const data = deviceRecordMessage.toBuffer(messageToEncode);

// Print encoded data as base64
console.log(`Encoded data: "${data.toString('base64')}", length: ${data.length}`); 

The first change renames the co2_level field to ambient_light in the schema. We then attempt to decode the encoded data with the modified schema.

let decoded = null

// Modify the schema, rename co2_level to ambient_light
let modifiedSchema = JSON.parse(JSON.stringify(schema));
modifiedSchema.fields[0].type.items.fields[1].type[0].fields[3].name = 'ambient_light';
let deviceRecordMessageEvolved = avro.parse(modifiedSchema);

decoded = deviceRecordMessageEvolved.fromBuffer(data);
console.log(`${decoded.telemetry_messages[0].payload['iotexample.EnvironmentData'].ambient_light} was the value of co2_level`) 

Running the code shows that the value that was set in the co2_level field is now decoded as ambient_light, because the change exists only in the schema and the encoded data has not changed.
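The underlying reason is that the binary encoding identifies fields by position only. A minimal sketch with plain Node.js buffers illustrates this for a record of four doubles like EnvironmentData; decodeDoubles is a hypothetical helper and not part of avro-js.

```javascript
// Decode consecutive 8-byte little-endian doubles and label them with the
// field names of whatever schema the reader happens to use.
function decodeDoubles(buf, fieldNames) {
    const record = {};
    fieldNames.forEach((name, i) => {
        record[name] = buf.readDoubleLE(i * 8);
    });
    return record;
}

// Encoded EnvironmentData: only the values are written, no names
const encoded = Buffer.alloc(32);
[23, 1080, 75, 415].forEach((value, i) => encoded.writeDoubleLE(value, i * 8));

const originalView = decodeDoubles(encoded, ['temperature', 'pressure', 'humidity', 'co2_level']);
const renamedView = decodeDoubles(encoded, ['temperature', 'pressure', 'humidity', 'ambient_light']);

console.log(originalView.co2_level, renamedView.ambient_light);
```

Both views read the same eight bytes for the fourth field; only the label attached to them differs.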

The second modified schema adds the ambient_light field in addition to the co2_level field.

// Modify the schema, add field ambient_light in addition to co2_level
modifiedSchema = JSON.parse(JSON.stringify(schema));
modifiedSchema.fields[0].type.items.fields[1].type[0].fields.push(
    {
        name: 'ambient_light',
        type: 'double',
        default: 0
    }
);
deviceRecordMessageEvolved = avro.parse(modifiedSchema);

try {
    decoded = deviceRecordMessageEvolved.fromBuffer(data);
} catch(ex) {
    console.error(`Failed to decode the message encoded with the old schema: ${ex}`);
} 

The decoder now fails with Error: invalid union index: -61 because the offsets expected by the decoder assume an additional double value which is not present in the encoded data.
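The value -61 can plausibly be traced by hand. With the additional double in the schema, the decoder consumes eight bytes that actually belong to the second message: its six timestamp bytes, the union index and the string length. It then interprets the first characters of "My event message" as the next timestamp and union index. The sketch below reproduces that misreading with a hypothetical readZigZag helper instead of avro-js itself.

```javascript
// Read a zigzag-coded varint as used by Avro for int/long values.
// This simplified version is only valid for values that fit into 32 bits.
function readZigZag(buf, offset) {
    let raw = 0;
    let shift = 0;
    let pos = offset;
    let byte;
    do {
        byte = buf[pos++];
        raw |= (byte & 0x7f) << shift;
        shift += 7;
    } while (byte & 0x80);
    return { value: (raw >>> 1) ^ -(raw & 1), next: pos };
}

// After the misplaced double, the decoder is positioned on the string content
const text = Buffer.from('My event message', 'utf8');

const bogusTimestamp = readZigZag(text, 0);                     // reads 'M'
const bogusUnionIndex = readZigZag(text, bogusTimestamp.next);  // reads 'y'

console.log(`Timestamp read from 'M': ${bogusTimestamp.value}`);
console.log(`Union index read from 'y': ${bogusUnionIndex.value}`);
```

The second value is the invalid union index reported in the error message.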

We now decode the message using a resolver which maps the encoded data to the new schema with the additional field.

// Create a resolving decoder and decode the message
let resolver = deviceRecordMessageEvolved.createResolver(deviceRecordMessage);

decoded = deviceRecordMessageEvolved.fromBuffer(data, resolver);

console.log(JSON.stringify(decoded, null, 2)); 

The data is now decoded as expected. The ambient_light field, which is not present in the encoded data, is set to the default value that was specified in the schema.

{
  "telemetry_messages": [
    {
      "timestamp": 1589784854837,
      "payload": {
        "iotexample.EnvironmentData": {
          "temperature": 23,
          "pressure": 1080,
          "humidity": 75,
          "co2_level": 415,
          "ambient_light": 0
        }
      }
    },
    {
      "timestamp": 1589784854837,
      "payload": {
        "iotexample.Event": {
          "message": "My event message",
          "event_number": 123,
          "error_level": "ERROR"
        }
      }
    }
  ]
} 
To show the importance of default values in the schema, we will now add the ambient_light field without specifying a default value and repeat the previous test:
modifiedSchema = JSON.parse(JSON.stringify(schema));
modifiedSchema.fields[0].type.items.fields[1].type[0].fields.push(
    {
        name: 'ambient_light',
        type: 'double'
    }
);
deviceRecordMessageEvolved = avro.parse(modifiedSchema);

try {
    resolver = deviceRecordMessageEvolved.createResolver(deviceRecordMessage);
} catch (ex) {
    console.error(`Decode failed without default value: ${ex}`);
} 

The attempt to create a resolver fails due to the missing default value:

Decode failed without default value: Error: cannot read "iotexample.EnvironmentData" as ["iotexample.EnvironmentData","iotexample.Event"] 
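Both outcomes follow from the rules Avro applies when resolving a writer schema against a reader schema: record fields are matched by name, and a field missing on the writer side must have a default on the reader side. The following is a strongly simplified model of that rule, not the implementation used by avro-js. For brevity it works on already decoded objects, whereas avro-js performs the check when the resolver is created; resolveRecord is a hypothetical name.

```javascript
// Simplified record resolution: take each reader field from the written
// record if present, otherwise fall back to the reader's default value.
function resolveRecord(writtenRecord, readerFields) {
    const result = {};
    for (const field of readerFields) {
        if (field.name in writtenRecord) {
            result[field.name] = writtenRecord[field.name];
        } else if ('default' in field) {
            result[field.name] = field.default;
        } else {
            throw new Error(`no default value for missing field "${field.name}"`);
        }
    }
    return result;
}

const written = { temperature: 23, pressure: 1080, humidity: 75, co2_level: 415 };
const baseFields = ['temperature', 'pressure', 'humidity', 'co2_level']
    .map((name) => ({ name, type: 'double', default: 0 }));

// Reader schema with a default for the new field: resolution succeeds
const withDefault = resolveRecord(written, [
    ...baseFields,
    { name: 'ambient_light', type: 'double', default: 0 }
]);
console.log(withDefault);

// Reader schema without a default for the new field: resolution fails
let failure = null;
try {
    resolveRecord(written, [...baseFields, { name: 'ambient_light', type: 'double' }]);
} catch (ex) {
    failure = ex.message;
}
console.log(`Resolution failed: ${failure}`);
```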

Conclusion

As the examples have shown, the consumer of an Avro message must always know which schema was used to encode it, for example through a schema identifier prefixed to the message. Otherwise, the decoder will either fail or produce invalid data. Default values in the schema are equally important because they allow fields to be added or removed later.

Despite the slightly smaller encoded data size for Avro, the ability to update Protobuf message definitions in a compatible way without having to prefix the encoded data with a schema identifier makes it a better choice for the data transmission of IoT devices.

2 Responses

  1. In AVRO, you can define a field type as a union of [required_type,null]. and the default will be null. I do not think the point of having to define a default to enable removing later is correct.

Jannis Völker

Jannis Völker is a software engineer at basysKom GmbH in Darmstadt. After joining basysKom in 2017, he has been working in connectivity projects for embedded devices, Azure based cloud projects and has made contributions to Qt OPC UA and open62541. He has a background in embedded Linux, Qt and OPC UA and holds a master's degree in computer science from the University of Applied Sciences in Darmstadt.