The MQTT Collector Plugin

This plugin provides access to MQTT topics serving vanilla payloads, including simple primitives (numbers, booleans, or strings) and JSON objects.

Type 1 - Simple Message Example

Type 2 - Simple JSON Example

Type 3 - Structured JSON Example

Settings

Types


Introduction

The Collector configuration file, appconfig.json for Windows or appconfig.Docker.json for Docker, contains the Settings definition used to connect to an MQTT broker, subscribe to topics, and interpret the messages received on those topics.

By default this file can be found within C:\Program Files\Flow Software\Timebase\Collector\ for Windows, or /config for Docker.

There are three Types of MQTT payloads that can be configured to extract tag data:

  • Type 1 - Simple Message
  • Type 2 - Simple JSON
  • Type 3 - Structured JSON

 

Type 1 - Simple Message Example

When subscribing to an MQTT topic, you may receive messages that consist of only a simple value, such as a number or a string (e.g. 42 or 'Arthur Dent').

The following example plugin configuration file can be used to parse the message. Notice the Subscription Type is set to 1:

{
    "Historians": [
        {
            "Host": "<yourHistorianHostOrIPAddress>",
            "Port": 4511,
            "UseTls": false
        }
    ],
    "Active": true,
    "Type": "MQTT",
    "Dataset": "MQTT Data",
    "Settings": {
        "Host": "<yourMQTTHostOrIPAddress>",
        "Port": 8883, //either 1883 or 8883
        "Username": "<UsernameGoesHere>", //may not be needed
        "Password": "<PasswordGoesHere>", //may not be needed
        "UseTls": false,
        "Subscriptions": [
            {
                "Type": 1,
                "QoS": 2,
                "Topics": [
                    "ACME Filter/#"
                ]
            }
        ]
    }
}

The plugin type will be MQTT.

The dataset created in the Historian will be MQTT Data.

The Settings section has defined the broker connection and one Subscription.

The Subscription expects a Type 1 message on a topic that matches ACME Filter/#.

When the plugin receives a simple message, it will package the value and send it to the Historian with the current timestamp and the topic as the tagname.
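The Type 1 handling described above can be sketched in Python. This is an illustration only, not the plugin's implementation: the function name simple_message_to_tag is hypothetical, and the rule used to tell numbers and booleans apart from plain strings (try JSON decoding, fall back to text) is an assumption.

```python
import json
from datetime import datetime, timezone


def simple_message_to_tag(topic, payload):
    """Turn a simple (Type 1) message into a (tagname, value, timestamp) tuple.

    Hypothetical helper: the topic becomes the tagname and the current
    time becomes the timestamp, as described in the text above.
    """
    text = payload.decode("utf-8")
    try:
        # Assumption: numbers and true/false are recognised via JSON decoding.
        value = json.loads(text)
    except json.JSONDecodeError:
        value = text  # anything else is kept as a plain string
    if value is None or isinstance(value, (dict, list)):
        value = text  # only primitives are treated as simple values here
    return topic, value, datetime.now(timezone.utc)


tagname, value, timestamp = simple_message_to_tag("ACME Filter/Line 1/RPM", b"42")
```

With this sketch, a payload of 42 yields a numeric value, while a payload of Arthur Dent is kept as a string.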

 

Type 2 - Simple JSON Example

When subscribing to an MQTT topic, you may receive messages that consist of a simple JSON payload like the following examples. Type 2, or Simple JSON, payloads contain "path-based" tagnames and data, e.g. motor1.rpm is a JSON path that represents the value 23.6.

{
    "payload": {
        "timestamp": "2024-07-12T12:15"
    },
    "motor1": {
        "rpm": 23.6,
        "location": {
            "long": 12.3,
            "lat": 34.7
        }
    },
    "motor2": {
        "rpm": 28.6,
        "location": {
            "long": 22.4,
            "lat": 36.8
        }
    }
}

Or ...

{
    "payload": {
        "timestamp": "2024-07-12T12:15"
    },
    "data": [
        {
            "motor1": {
                "rpm": 23.6,
                "location": {
                    "long": 12.3,
                    "lat": 34.7
                }
            }
        },
        {
            "motor2": {
                "rpm": 28.6,
                "location": {
                    "long": 22.4,
                    "lat": 36.8
                }
            }
        }
    ]
}

The following example plugin configuration file can be used to parse either of these messages. Notice the Subscription Type is set to 2:

{
    "Historians": [
        {
            "Host": "<yourHistorianHostOrIPAddress>",
            "Port": 4511,
            "UseTls": false
        }
    ],
    "Active": true,
    "Type": "MQTT",
    "Dataset": "MQTT Data",
    "Settings": {
        "Host": "<yourMQTTHostOrIPAddress>",
        "Port": 1883,
        "UseTls": false,
        "Subscriptions": [
            {
                "Type": 2,
                "TagnameIncludesTopic": false,
                "TagnameDelimiter": ".",
                "TimestampFields": [ "payload.timestamp" ],
                "TimestampType": 3,
                "QoS": 2,
                "Topics": [
                    "ACME Filter/#"
                ]
            }
        ]
    }
}

The TagnameDelimiter between JSON path properties will be a period (.).

The TimestampFields, in this case only one, will reference the payload.timestamp JSON path. See Settings.

The TimestampType expects a date and time string. See Timestamp Types.

When the plugin receives either example message, it will extract the following tags and their values, and send them to the Historian with the parsed timestamp:

motor1.rpm 23.6

motor1.location.long 12.3

motor1.location.lat 34.7

motor2.rpm 28.6

motor2.location.long 22.4

motor2.location.lat 36.8

Notice how, in the second payload example, the name of the array property (data) is not used in the resulting tagnames.
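The path-based extraction described above can be sketched as a small recursive flattening routine. This is a minimal Python illustration under stated assumptions, not the plugin's code: the names flatten and extract_tags are hypothetical, arrays of primitive values are simply ignored, and the timestamp is parsed with Python's own ISO 8601 parser.

```python
from datetime import datetime


def flatten(node, path=()):
    """Yield (path, value) pairs; array property names are not added to the path."""
    for key, child in node.items():
        if isinstance(child, list):
            # Assumption: only objects inside arrays carry tag data.
            for element in child:
                if isinstance(element, dict):
                    yield from flatten(element, path)
        elif isinstance(child, dict):
            yield from flatten(child, path + (key,))
        else:
            yield path + (key,), child


def extract_tags(message, timestamp_fields, delimiter="."):
    """Split a Simple JSON message into {tagname: value} and a timestamp."""
    timestamp, tags = None, {}
    for path, value in flatten(message):
        if ".".join(path) in timestamp_fields:
            timestamp = datetime.fromisoformat(value)
        else:
            tags[delimiter.join(path)] = value
    return tags, timestamp


# The second example payload from the text above.
message = {
    "payload": {"timestamp": "2024-07-12T12:15"},
    "data": [
        {"motor1": {"rpm": 23.6, "location": {"long": 12.3, "lat": 34.7}}},
        {"motor2": {"rpm": 28.6, "location": {"long": 22.4, "lat": 36.8}}},
    ],
}
tags, timestamp = extract_tags(message, ["payload.timestamp"])
for tagname, value in tags.items():
    print(tagname, value)
```

Because the array name is skipped while recursing, both example payloads produce the same six tagnames.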

 

Type 3 - Structured JSON Example

When subscribing to an MQTT topic, you may receive messages that consist of a Structured JSON payload like the following example. Type 3, or Structured JSON, payloads contain "data-based" tagnames, values, and metadata, e.g. the name property contains the tagname, the value property contains the tag's value, and the timestamp property contains the timestamp.

{
    "topic": {
        "namespace": "spBv1.0",
        "edgeNodeDescriptor": "G1/E1",
        "groupId": "G1",
        "edgeNodeId": "E1",
        "deviceId": "D1",
        "type": "DBIRTH"
    },
    "payload": {
        "timestamp": 1638223073192,
        "metrics": [
            {
                "name": "Tag 1",
                "timestamp": 1638223073192,
                "dataType": "Int32",
                "metaData": {},
                "properties": {
                    "Quality": {
                        "type": "Int32",
                        "value": 192
                    }
                },
                "value": 1
            },
            {
                "name": "Tag 2",
                "timestamp": 1638223073011,
                "dataType": "Boolean",
                "metaData": {},
                "properties": {
                    "Quality": {
                        "type": "Int32",
                        "value": 192
                    }
                },
                "value": false
            },
            {
                "name": "Tag 3",
                "timestamp": 1638223073041,
                "dataType": "Float",
                "metaData": {},
                "properties": {
                    "Quality": {
                        "type": "Int32",
                        "value": 192
                    }
                },
                "value": 1.23
            }
        ],
        "seq": 1
    }
}

Notice that this example of a Structured JSON payload is actually Sparkplug B in a JSON format.

The following example plugin configuration file can be used to parse the message. Notice the Subscription Type is set to 3:

{
    "Historians": [
        {
            "Host": "<yourHistorianHostOrIPAddress>",
            "Port": 4511,
            "UseTls": false
        }
    ],
    "Active": true,
    "Type": "MQTT",
    "Dataset": "MQTT Data",
    "Settings": {
        "Host": "<yourMQTTHostOrIPAddress>",
        "Port": 1883,
        "UseTls": false,
        "Subscriptions": [
            {
                "Type": 3,
                "TagnameIncludesTopic": false,
                "TagnameDelimiter": ".",
                "TagnameFields": [ "topic.groupId", "topic.edgeNodeId", "topic.deviceId", "name" ],
                "TimestampFields": [ "payload.timestamp", "timestamp" ],
                "TimestampType": 2,
                "ValueField": "value",
                "QualityField": "properties.Quality.value",
                "QoS": 2,
                "Topics": [
                    "ACME Filter/#"
                ]
            }
        ]
    }
}

The TagnameFields reference an array of JSON paths whose values will be concatenated, in order, to create a tagname. In this example, the resultant tagname for the first metric will be G1.E1.D1.Tag 1.

The TagnameDelimiter between the concatenated tagname fields will be a period (.).

The TimestampFields reference an array of JSON paths. Where multiple JSON paths are specified, the deepest path found will be used. In this example, the timestamp path within the elements of the metrics array will be used.

The TimestampType expects a UNIX millisecond timestamp. See Timestamp Types.

The ValueField references the value JSON path within the elements of the metrics array.

The QualityField references the properties.Quality.value JSON path within the elements of the metrics array.

When the plugin receives this example message, it will extract the following tags and their values, and send them to the Historian:

Tag G1.E1.D1.Tag 1 value 1 quality 192

Tag G1.E1.D1.Tag 2 value false quality 192

Tag G1.E1.D1.Tag 3 value 1.23 quality 192
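The field resolution described above can be sketched in Python. This is an illustration under stated assumptions rather than the plugin's implementation: the helper names (get_path, find_records, resolve, extract_tags) are hypothetical, the array of records is located as the first list of objects found in the message, and each configured path is looked up in the array element first and then in the whole message.

```python
def get_path(node, path):
    """Follow a dotted JSON path; return None when any step is missing."""
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def find_records(node):
    """Return the first list of objects found depth-first (here: payload.metrics)."""
    if isinstance(node, list) and node and all(isinstance(i, dict) for i in node):
        return node
    if isinstance(node, dict):
        for child in node.values():
            found = find_records(child)
            if found is not None:
                return found
    return None


def resolve(path, record, root):
    """Look a path up in the array element first, then in the whole message."""
    value = get_path(record, path)
    return value if value is not None else get_path(root, path)


def extract_tags(message, sub):
    """Return (tagname, value, quality, timestamp) tuples for one message."""
    tags = []
    for record in find_records(message) or []:
        parts = [resolve(p, record, message) for p in sub["TagnameFields"]]
        tagname = sub["TagnameDelimiter"].join(str(p) for p in parts if p is not None)
        # Deepest match wins: element-level timestamps are tried before message-level ones.
        candidates = [get_path(record, p) for p in sub["TimestampFields"]]
        candidates += [get_path(message, p) for p in sub["TimestampFields"]]
        timestamp = next((t for t in candidates if t is not None), None)
        value = get_path(record, sub["ValueField"])
        quality = get_path(record, sub["QualityField"])
        tags.append((tagname, value, quality, timestamp))
    return tags


subscription = {
    "TagnameDelimiter": ".",
    "TagnameFields": ["topic.groupId", "topic.edgeNodeId", "topic.deviceId", "name"],
    "TimestampFields": ["payload.timestamp", "timestamp"],
    "ValueField": "value",
    "QualityField": "properties.Quality.value",
}
# A shortened version of the example message above (two metrics only).
message = {
    "topic": {"groupId": "G1", "edgeNodeId": "E1", "deviceId": "D1"},
    "payload": {
        "timestamp": 1638223073192,
        "metrics": [
            {"name": "Tag 1", "timestamp": 1638223073192,
             "properties": {"Quality": {"type": "Int32", "value": 192}}, "value": 1},
            {"name": "Tag 2", "timestamp": 1638223073011,
             "properties": {"Quality": {"type": "Int32", "value": 192}}, "value": False},
        ],
    },
}
tags = extract_tags(message, subscription)
```

In this sketch each metric keeps its own timestamp, and the message-level payload.timestamp would only be used for a metric that has none.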

Notice that the Subscriptions section of the configuration is an array. This means you can add more than one Subscription to your configuration. Additionally, this means you can combine Subscriptions of different Types in the same configuration.

Notice that the Topics property of a Subscription is an array. This means that for each Subscription definition, you can specify multiple topics to subscribe to.
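Each entry in Topics is an MQTT topic filter and may contain the "+" (single-level) and "#" (multi-level) wildcards, as in ACME Filter/#. Matching is performed by the broker, but the standard rules can be sketched in Python to check which topics a filter would cover. The function name topic_matches is hypothetical, and the special handling of topics beginning with "$" is left out.

```python
def topic_matches(topic_filter, topic):
    """Return True when an MQTT topic filter covers a concrete topic name."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # "#" must be the last level; it matches the parent and everything below it.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False  # comparison is case-sensitive
    return len(filter_levels) == len(topic_levels)


for candidate in ["ACME Filter/Line 1/RPM", "ACME Filter", "acme filter/Line 1"]:
    print(candidate, topic_matches("ACME Filter/#", candidate))
```

The first two candidates are covered by ACME Filter/#; the third is not, because topics are case-sensitive.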

 

Settings

The following settings can be set for the MQTT plugin:

"Settings": {
    "Host": "broker.hivemq.com",
    "Port": 8883,
    "UseTls": true,
    "Subscriptions": [
        {
            "Type": 3,
            "TagnameIncludesTopic": false,
            "TagnameDelimiter": ".",
            "TagnameFields": [ "topic.groupId", "topic.edgeNodeId", "topic.deviceId", "name" ],
            "TimestampFields": [ "payload.timestamp", "timestamp" ],
            "TimestampType": 2,
            "ValueField": "value",
            "QualityField": "properties.Quality.value",
            "QoS": 2,
            "Topics": [
                "tbtest/#"
            ]
        }
    ]
}

Connection

  • Host string required The hostname or IP address of the MQTT broker to connect to
  • Port int The port number of the MQTT broker. Default value is 4511.
  • UseTls bool Specifies whether to use TLS/SSL for secure communication with the broker. Default value is false
  • ClientId string Specifies a unique identifier that identifies the client connecting to the broker. Default value is a random identifier.

Authentication

  • Username string The username required to authenticate with the MQTT broker
  • Password string The password required to authenticate with the MQTT broker. If a Username is provided, then a Password must also be provided.

Subscriptions

  • Subscriptions array of Subscription required
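The requirements listed so far (Host is required, a Password must accompany a Username, and Subscriptions is a required array) can be checked before deploying a configuration. The following Python sketch is only an illustration: validate_settings is a hypothetical helper, not part of the plugin, and it operates on the Settings object after it has been loaded into a dictionary.

```python
def validate_settings(settings):
    """Return a list of problems found in an MQTT plugin Settings dictionary."""
    errors = []
    if not settings.get("Host"):
        errors.append("Host is required")
    if settings.get("Username") and not settings.get("Password"):
        errors.append("Password is required when Username is provided")
    if not isinstance(settings.get("Subscriptions"), list):
        errors.append("Subscriptions is required and must be an array")
    return errors


settings = {
    "Host": "broker.hivemq.com",
    "Port": 8883,
    "UseTls": True,
    "Username": "collector",  # illustrative value only
    "Subscriptions": [{"Type": 1, "Topics": ["tbtest/#"]}],
}
problems = validate_settings(settings)
print(problems)
```

Here the only reported problem is the missing Password, since a Username was supplied without one.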

Subscription Definition

Payload

  • Type PayloadTypes required The data format of the payload expected from topics on this subscription
  • TagnameIncludesTopic bool Specifies whether the topic should prefix tagnames
  • TagnameDelimiter string The delimiter used when combining fields to concatenate a tagname
  • TagnameFields array of strings The tagname field. If multiple fields are specified, their values will be concatenated, separated by the TagnameDelimiter
  • TimestampFields array of string The timestamp field. If multiple fields are specified, the deepest one found will be used
  • TimestampType TimestampTypes The expected timestamp format. Default value is Date and time string
  • ValueField string The value field. Required for PayloadType Json (Structured)
  • QualityField string The quality field. Used for PayloadType Json (Structured)

Delivery

  • QoS QualityOfServiceTypes The MQTT Quality of Service required for topics on this subscription. Default value is Exactly once.

Topics

  • Topics array of string Topics are case-sensitive and composed of one or more levels, separated by a forward slash ("/"). "+" represents a single-level wildcard (e.g. "rooms/+/temperature"). "#" represents a multi-level wildcard that can only be used as the last character in the topic, preceded by a forward slash (e.g. "rooms/#"). Avoid leading or trailing slashes. Avoid empty topic levels (e.g. "rooms//temperature").
  • Filter string A tag filter is a case-sensitive regex string applied to the resultant tagnames (e.g. "Line 1|.*RPM$" will exclude any tagname that contains "Line 1" or ends with "RPM"). The regex string can be used to specify multiple filters where required.

The following example configuration adds a Filter to a Type 1 Subscription:

{
    "Historians": [
        {
            "Host": "<yourHistorianHostOrIPAddress>",
            "Port": 4511,
            "UseTls": false
        }
    ],
    "Active": true,
    "Type": "MQTT",
    "Dataset": "MQTT Data",
    "Settings": {
        "Host": "<yourMQTTHostOrIPAddress>",
        "Port": 8883, //either 1883 or 8883
        "Username": "<UsernameGoesHere>", //may not be needed
        "Password": "<PasswordGoesHere>", //may not be needed
        "UseTls": false,
        "Subscriptions": [
            {
                "Type": 1,
                "QoS": 2,
                "Topics": [
                    "ACME Filter/#"
                ],
                "Filter": "ACME Filter/Personnel"
            }
        ]
    }
}
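The effect of a Filter can be previewed with any regular expression tool. The Python sketch below applies the "Line 1|.*RPM$" example from the Filter description to a few made-up tagnames. It assumes the pattern is searched anywhere within the tagname and that matching tagnames are excluded, as the description states; the function name apply_filter is hypothetical, and the plugin's own regex engine may differ from Python's in edge cases.

```python
import re


def apply_filter(tagnames, pattern):
    """Return the tagnames that survive an exclusion filter (case-sensitive)."""
    regex = re.compile(pattern)
    return [name for name in tagnames if not regex.search(name)]


# Illustrative tagnames only.
tagnames = [
    "Line 1.Motor.Temp",   # contains "Line 1"
    "Line 2.Motor.RPM",    # ends with "RPM"
    "Line 2.Motor.Temp",
    "line 1.Motor.Temp",   # lower-case "line" does not match
]
kept = apply_filter(tagnames, "Line 1|.*RPM$")
print(kept)
```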


Types

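Payload Types

  • 1 Simple Message A simple value such as a number, boolean, or string
  • 2 Simple JSON A JSON payload containing path-based tagnames and data
  • 3 Structured JSON A JSON payload containing data-based tagnames, values, and metadata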

 

Timestamp Types

  • 1 UNIX (seconds) Seconds since 1970-01-01 00:00:00 UTC (e.g. 1689526200.234456)
  • 2 UNIX (milliseconds) Milliseconds since 1970-01-01 00:00:00 UTC (e.g. 1689526200234.456)
  • 3 Date and time string Date and time string (e.g. 2024-07-28T14:30:03Z or 2024-07-28T12:30:03+02:00)

 

Quality of Service Types

  • 0 At most once The message arrives at the receiver either once or not at all, "fire and forget"
  • 1 At least once Ensures that the message arrives at the receiver at least once
  • 2 Exactly once For use when neither loss nor duplication of messages is acceptable