OPC UA PubSub over MQTT 5: A Production Implementation Tutorial
Last Updated: 2026-05-16
Architecture at a glance
Architecture diagram — OPC UA PubSub over MQTT 5: A Production Implementation Tutorial
If you have ever tried to scale an OPC UA Client/Server deployment past a few dozen consumers — a SCADA HMI, a historian, an MES connector, two analytics services, a cloud bridge — you have already met the wall. Sessions multiply, the server CPU spikes on every reconnect storm, and your network team starts asking why a PLC is holding open thirty TCP sockets. OPC UA PubSub over MQTT 5 is the fix the OPC Foundation finally standardized in Part 14 of the specification, and as of 2026 it is production-ready in mainstream stacks. This tutorial walks you through a working implementation end-to-end: how the message model maps onto MQTT 5, the difference between UADP and JSON DataSetMessages, which MQTT 5 properties actually matter for industrial reliability, and how to layer signing, encryption, and mTLS without breaking interoperability. Every code block runs on asyncua v1.1+ and aiomqtt v2+ against a local EMQX 5, HiveMQ 4, or Mosquitto 2 broker.
Why OPC UA PubSub Over MQTT (and Not Client/Server)
OPC UA Client/Server is a session-oriented, request-response protocol. Every consumer opens a SecureChannel and a Session against the server, creates a Subscription, registers MonitoredItems, and the server pushes Publish responses back. That works beautifully for one HMI talking to one PLC. It scales poorly the moment you have N consumers wanting the same dataset, because the server now holds N sessions, N subscriptions, N keep-alive timers, and the certificate trust list grows with every new client. Most embedded OPC UA servers — those baked into PLCs and gateways — cap out somewhere between 5 and 50 concurrent sessions.
PubSub flips the model. The publisher does not know who is listening. It serializes a DataSet — a named collection of variable values — into a DataSetMessage, wraps it in a NetworkMessage, and pushes it to a message-oriented middleware. Consumers subscribe to topics. The server is now stateless with respect to consumers, and the broker handles fan-out. Part 14 of the OPC UA specification (Sections 5.4 and 6.2) defines two transport mappings: UDP (for deterministic LAN use, including TSN) and broker-based, currently MQTT and AMQP. MQTT 5 has become the default because brokers like EMQX, HiveMQ, and Mosquitto 2 are everywhere and MQTT 5 finally added the metadata primitives PubSub needs.
What you gain in practice:
Decoupled scale. Add a new consumer by subscribing to a topic. The PLC never knows. We routinely run 200+ subscribers against a single Siemens S7-1500 publisher.
Network friendliness. One outbound TCP connection to the broker, often on port 8883, traverses firewalls and NAT cleanly. Client/Server requires inbound TCP/4840, which network security teams reliably block.
Store-and-forward. MQTT 5 retained messages plus message-expiry-interval give you a last-known-value cache without a separate historian sidecar.
Polyglot consumers. A Python analytics service or a Node-RED flow can decode a JSON DataSetMessage without an OPC UA stack. Try doing that against a Client/Server endpoint.
The tradeoff is non-determinism. PubSub-over-MQTT with QoS 1 or 2 gives you reliable delivery, not bounded latency — it is not a real-time control plane. If you need sub-10 ms guarantees, you want UADP-over-UDP with TSN, not MQTT. For everything north of the PLC layer — historization, analytics, MES, cloud — MQTT 5 is the right plane.
End-to-End Architecture: From Server to Subscriber
The PubSub model is built around six configuration objects defined in Part 14, Section 6. Understanding their hierarchy is non-negotiable, because every SDK — asyncua, open62541, the UA-.NETStandard stack — exposes the same nouns. If you read them once, you read them in every codebase.
The Six Objects You Must Know
PublishedDataSet — a server-side definition of what data goes out. It enumerates which OPC UA Variables (by NodeId) belong to the dataset and assigns each a FieldName. This object is independent of any transport. It is the contract.
PubSubConnection — wraps a single transport endpoint. For MQTT, the connection carries the broker URL (e.g. mqtt://broker.local:1883), credentials, and protocol version. One connection can host many writer groups.
WriterGroup — groups DataSetWriters that share a publishing interval and a header layout. The WriterGroup owns the NetworkMessage that wraps DataSetMessages. Think of it as the “batch” container.
DataSetWriter — the binding between a PublishedDataSet and a WriterGroup. It assigns a DataSetWriterId (uint16), an MQTT QueueName (topic), and a content mask deciding which fields land in the wire message.
ReaderGroup / DataSetReader — the symmetric pair on the subscriber side. A DataSetReader matches incoming NetworkMessages by PublisherId and DataSetWriterId, decodes the payload, and writes values into target variables or invokes a callback.
SecurityGroup / SKS (Security Key Service) — optional but required for SignAndEncrypt at the PubSub layer. The SKS distributes symmetric keys to publishers and subscribers and rotates them on a configurable cadence.
How a Value Flows End-to-End
The publisher reads a Variable from its address space at the WriterGroup’s publishingInterval — say 1000 ms. The DataSetWriter serializes the configured FieldNames into a DataSetMessage. The WriterGroup concatenates one or more DataSetMessages into a NetworkMessage with a header containing PublisherId, WriterGroupId, and SequenceNumber. The PubSubConnection hands the NetworkMessage to the MQTT 5 client, which PUBLISHes to a topic — convention is <facility>/<line>/<dswId> or the Sparkplug B-style hierarchy. The broker fans out to subscribers. On the subscriber, the DataSetReader matches the message, decodes the payload, and updates target variables or fires on_data_changed.
A subtle gotcha: the topic alone does not identify a dataset. Part 14 Section 7.3.3 mandates that PublisherId plus DataSetWriterId is the canonical key. Two writers can publish to the same topic; the reader disambiguates from the header. We have debugged hours of “missing data” that traced back to teams assuming topic = writer.
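A minimal subscriber-side dispatch sketch of that rule — handler and payload shapes here are hypothetical, the point is that the routing key is the (PublisherId, DataSetWriterId) pair, never the topic:

```python
from typing import Callable, Dict, Tuple

Handler = Callable[[dict], None]

# Routing table keyed by the Part 14 canonical pair — not by topic.
readers: Dict[Tuple[str, int], Handler] = {}


def dispatch(network_message: dict) -> None:
    """Route each DataSetMessage by (PublisherId, DataSetWriterId)."""
    pub_id = network_message["PublisherId"]
    for dsm in network_message.get("Messages", []):
        handler = readers.get((pub_id, dsm["DataSetWriterId"]))
        if handler is not None:
            handler(dsm["Payload"])


# Two writers sharing one topic stay distinguishable through the header:
received = []
readers[("plc-01", 1)] = received.append
dispatch({
    "PublisherId": "plc-01",
    "Messages": [
        {"DataSetWriterId": 1, "Payload": {"Temperature": 72.5}},
        {"DataSetWriterId": 2, "Payload": {"Flow": 3.1}},  # no reader yet → dropped
    ],
})
print(received)  # [{'Temperature': 72.5}]
```

A reader that keys on topic alone would have merged both writers' payloads above — exactly the "missing data" failure mode described here.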
Step 1 – PubSub Configuration with asyncua
asyncua (the actively maintained successor to python-opcua) ships full PubSub configuration support from v1.1. The configuration is exposed as a PubSubConfig object you attach to the server. Here is a working publisher setup that you can drop into a publisher.py and run against any OPC UA address space:
# publisher.py — asyncua v1.1+, Python 3.11+
import asyncio

from asyncua import Server, ua
from asyncua.pubsub.publisher import Publisher
from asyncua.pubsub.information_model import (
    PubSubConnectionConfig,
    WriterGroupConfig,
    DataSetWriterConfig,
    PublishedDataSetConfig,
    PublishedVariableConfig,
)


async def main() -> None:
    # 1. Standard OPC UA server hosting the address space we want to publish.
    server = Server()
    await server.init()
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
    server.set_server_name("plc-01")

    # Create the variables we are going to publish.
    ns = await server.register_namespace("urn:example:line1")
    folder = await server.nodes.objects.add_folder(ns, "Line1")
    temp = await folder.add_variable(ns, "Temperature", 72.4)
    pressure = await folder.add_variable(ns, "Pressure", 1.013)
    rpm = await folder.add_variable(ns, "MotorRPM", 1450)
    for v in (temp, pressure, rpm):
        await v.set_writable()

    # 2. PublishedDataSet — the "what". Three fields, identified by NodeId.
    pds = PublishedDataSetConfig(
        name="Line1Telemetry",
        published_variables=[
            PublishedVariableConfig(
                published_variable=temp.nodeid,
                attribute_id=ua.AttributeIds.Value,
                field_name="Temperature",
            ),
            PublishedVariableConfig(
                published_variable=pressure.nodeid,
                attribute_id=ua.AttributeIds.Value,
                field_name="Pressure",
            ),
            PublishedVariableConfig(
                published_variable=rpm.nodeid,
                attribute_id=ua.AttributeIds.Value,
                field_name="MotorRPM",
            ),
        ],
    )

    # 3. PubSubConnection — bind to an MQTT 5 broker.
    conn_cfg = PubSubConnectionConfig(
        name="mqtt-broker-1",
        publisher_id=ua.Variant("plc-01", ua.VariantType.String),
        transport_profile_uri=(
            "http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-json"
            # use ...pubsub-mqtt-uadp for binary UADP
        ),
        address_url="mqtt://broker.local:1883",
    )

    # 4. WriterGroup — owns the publish cadence and NetworkMessage layout.
    wg_cfg = WriterGroupConfig(
        name="Line1Group",
        writer_group_id=1,
        publishing_interval=1000.0,  # ms
        keep_alive_time=5000.0,
        # JsonNetworkMessageContentMask — see Step 2 for the bit meaning.
        message_settings={
            "NetworkMessageContentMask": 0x0000003F,
            "DataSetMessageContentMask": 0x00000007,
        },
    )

    # 5. DataSetWriter — binds the PDS to the WriterGroup + topic.
    dsw_cfg = DataSetWriterConfig(
        name="Line1Writer",
        data_set_writer_id=1,
        data_set_name="Line1Telemetry",
        queue_name="plant/line1/dsw1",  # MQTT topic
        # Send all field values plus status and source timestamp.
        data_set_field_content_mask=ua.DataSetFieldContentMask.StatusCode
        | ua.DataSetFieldContentMask.SourceTimestamp,
    )

    # 6. Wire it together and start.
    pubsub = Publisher(server)
    await pubsub.add_published_data_set(pds)
    connection = await pubsub.add_connection(conn_cfg)
    writer_group = await connection.add_writer_group(wg_cfg)
    await writer_group.add_data_set_writer(dsw_cfg, pds_name="Line1Telemetry")

    async with server:
        await pubsub.start()
        print("Publishing on plant/line1/dsw1 every 1000 ms")
        # Drift the values so you can watch them on the subscriber.
        i = 0
        while True:
            await temp.write_value(72.4 + (i % 10) * 0.1)
            await pressure.write_value(1.013 + (i % 5) * 0.002)
            await rpm.write_value(1450 + (i % 20))
            i += 1
            await asyncio.sleep(1.0)


if __name__ == "__main__":
    asyncio.run(main())
What each block does and why it matters:
Step 2 (PublishedDataSet) is the contract. Once a consumer is decoding Line1Telemetry, you cannot rename a field without bumping a MajorVersion field on the dataset — Part 14 Section 6.2.3 calls this a breaking change. Treat it like a Protobuf schema.
Step 3 (Connection) — the transport_profile_uri is the switch between binary UADP and JSON. Mix this up and you will see subscribers parse garbage. The profile URIs are normative; they live in Part 14 Annex C.
Step 4 (WriterGroup) — the publishing_interval is the hot knob. Too tight and you melt your broker. We default to 1000 ms for telemetry and 100 ms only for state-machine variables.
Step 5 (DataSetWriter) — queue_name is the MQTT topic. Do not use wildcards here. The writer publishes to one topic; readers subscribe with filters.
Start the publisher, point an MQTT client at mqtt://broker.local:1883 and subscribe to plant/line1/dsw1, and you will see DataSetMessages arrive every second.
Step 2 – DataSetMessage Mapping: UADP vs JSON Payloads
The single most-asked question after “does this work?” is “UADP or JSON?” The answer is “depends on who is reading,” but you need to understand both wire formats to choose well. Both are defined in Part 14 Section 7.2.
UADP (OPC UA Datagram Protocol) — Binary
UADP is OPC UA’s compact binary encoding, the same wire format used in UDP-based PubSub. A NetworkMessage on the wire starts with a 1-byte header containing flags (PublisherId present, GroupHeader present, PayloadHeader present, ExtendedFlags1 present), then the PublisherId as a Variant, then a GroupHeader carrying WriterGroupId and NetworkMessageNumber, then a PayloadHeader enumerating the DataSetWriterIds present in this message, and finally one or more DataSetMessages.
A typical UADP message for our three-field dataset lands around 48 bytes on the wire; you can decode it with Wireshark's OPC UA dissector or asyncua.pubsub.network_message.NetworkMessage.from_binary().
Set transport_profile_uri="http://opcfoundation.org/UA-Profile/Transport/pubsub-mqtt-uadp" and NetworkMessageContentMask per Part 14 Section 7.2.2.2 (PublisherId=0x01, GroupHeader=0x02, WriterGroupId=0x04, PayloadHeader=0x10, etc.) and asyncua will emit UADP onto the MQTT topic. Use UADP for edge-to-edge links, bandwidth-constrained cellular, and anywhere you trust the consumer to have an OPC UA decoder.
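If you prefer named flags over magic constants when building these masks, a small sketch — the bit values below are the ones quoted above; confirm them against your SDK's UadpNetworkMessageContentMask enum before relying on them:

```python
from enum import IntFlag


class UadpNetworkMessageContentMask(IntFlag):
    """Header flag bits as quoted above from Part 14 Section 7.2.2.2.

    Only the subset used in this tutorial is listed — verify the values
    against your SDK's enum.
    """
    PublisherId = 0x01
    GroupHeader = 0x02
    WriterGroupId = 0x04
    PayloadHeader = 0x10


# Compose the mask from named flags instead of a magic number.
MASK = (
    UadpNetworkMessageContentMask.PublisherId
    | UadpNetworkMessageContentMask.GroupHeader
    | UadpNetworkMessageContentMask.WriterGroupId
    | UadpNetworkMessageContentMask.PayloadHeader
)
print(hex(MASK))  # 0x17
```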
JSON — Human Readable
Set the same connection to pubsub-mqtt-json and the wire payload becomes structured JSON per Part 14 Section 7.2.3. A JSON NetworkMessage for our three-field dataset runs about 420 bytes — 8–10× the UADP equivalent — but any consumer with a JSON parser can read it. Use JSON for cloud ingest (Kafka, Kinesis, Pub/Sub), Node-RED, Grafana data sources, and anything where the consumer is not a card-carrying OPC UA stack.
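For reference, an illustrative JSON NetworkMessage for the three-field dataset — the values, timestamps, and MessageId are invented, and the exact member set depends on your configured content masks:

```json
{
  "MessageId": "9f1b5a0e-7c2d-4e0a-9c3e-1a2b3c4d5e6f",
  "MessageType": "ua-data",
  "PublisherId": "plc-01",
  "Messages": [
    {
      "DataSetWriterId": 1,
      "SequenceNumber": 42,
      "MetaDataVersion": { "MajorVersion": 1, "MinorVersion": 0 },
      "Timestamp": "2026-05-16T12:00:00Z",
      "Payload": {
        "Temperature": { "Value": 72.5, "SourceTimestamp": "2026-05-16T12:00:00Z" },
        "Pressure": { "Value": 1.015, "SourceTimestamp": "2026-05-16T12:00:00Z" },
        "MotorRPM": { "Value": 1455, "SourceTimestamp": "2026-05-16T12:00:00Z" }
      }
    }
  ]
}
```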
Switching at Runtime
You cannot switch encoding per-message, but you can run two DataSetWriters in the same WriterGroup — one UADP for the historian, one JSON for the analytics bus — pointing to different topics. The PublishedDataSet is shared, so the contract stays single-sourced.
Step 3 – MQTT 5 Properties for PubSub Reliability
MQTT 3.1.1 was not enough for PubSub. MQTT 5 added user properties, content type, response topic, message expiry interval, and session expiry — every one of which is load-bearing in a production OPC UA PubSub deployment. Here is the subscriber, plus the property dictionary you pass to aiomqtt's publish() call:
# subscriber.py — aiomqtt v2+, decodes JSON DataSetMessage payloads.
import asyncio
import json

import aiomqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

BROKER = "broker.local"
# "+" must span a whole topic level — "dsw+" is not a valid MQTT wildcard.
TOPIC_FILTER = "plant/+/+"


async def consume() -> None:
    # session_expiry_interval keeps offline messages queued for 1 hour
    props = Properties(PacketTypes.CONNECT)
    props.SessionExpiryInterval = 3600
    async with aiomqtt.Client(
        hostname=BROKER,
        port=1883,
        identifier="historian-01",
        protocol=aiomqtt.ProtocolVersion.V5,
        clean_start=False,
        properties=props,
    ) as client:
        await client.subscribe(TOPIC_FILTER, qos=1)
        async for message in client.messages:
            handle(message)


def handle(message) -> None:
    # MQTT 5 properties are exposed via message.properties
    content_type = getattr(message.properties, "ContentType", None)
    user_props = dict(getattr(message.properties, "UserProperty", []) or [])
    writer_id = user_props.get("writerId")
    pub_id = user_props.get("pubId")
    if content_type == "application/opcua+json":
        payload = json.loads(message.payload)
        for dsm in payload["Messages"]:
            print(
                f"[{pub_id}/{writer_id}] seq={dsm['SequenceNumber']} "
                f"fields={dsm['Payload']}"
            )
    elif content_type == "application/opcua+uadp":
        # See asyncua.pubsub.network_message for the binary decoder.
        from asyncua.pubsub.network_message import NetworkMessage

        nm = NetworkMessage.from_binary(message.payload)
        for dsm in nm.data_set_messages:
            print(
                f"[{pub_id}/{writer_id}] uadp seq={dsm.sequence_number} "
                f"fields={[f.value for f in dsm.fields]}"
            )


asyncio.run(consume())
And here is the property dictionary the publisher attaches to each PUBLISH (asyncua’s MQTT transport wires these for you, but if you are using aiomqtt directly to relay messages, build it manually):
# Illustrative — when relaying or republishing DataSetMessages via aiomqtt
# directly. asyncua's built-in transport handles this internally.
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

props = Properties(PacketTypes.PUBLISH)
props.ContentType = "application/opcua+json"   # or +uadp
props.MessageExpiryInterval = 10               # seconds; drop if stale
props.ResponseTopic = "plant/line1/dsw1/ack"   # if you want round-trip acks
props.CorrelationData = b"\xCA\xFE"            # round-trip correlation
props.UserProperty = [
    ("writerId", "1"),
    ("pubId", "plc-01"),
    ("dataSetClassId", "Line1Telemetry"),
]

await client.publish(
    topic="plant/line1/dsw1",
    payload=json_payload,
    qos=1,
    retain=True,  # last-known-value cache
    properties=props,
)
Why each property matters:
retain=true — when a new subscriber connects, the broker immediately replays the last retained message. This is your last-known-value cache. Without it, a subscriber that comes up between publishing intervals waits up to a full interval to see a value.
MessageExpiryInterval — set this to ~10× your publishingInterval. If a subscriber is offline longer than this, the broker drops the message rather than firehosing stale telemetry on reconnect.
ContentType — application/opcua+json or application/opcua+uadp. Lets a polyglot subscriber dispatch the right decoder without parsing the payload first.
UserProperty — carry writerId and pubId as headers so subscribers can filter without decoding the payload. This is the header lookup Part 14 Section 7.3.3 standardizes.
SessionExpiryInterval + clean_start=false — combine on the subscriber to get persistent sessions. Disconnect, reconnect within the interval, and queued QoS 1 messages are redelivered.
ResponseTopic + CorrelationData — needed only if you implement a request/response side-channel (rare for telemetry, common for command-and-control).
Step 4 – Security: Signing, Encryption, mTLS
Security in PubSub-over-MQTT is layered. You have MQTT transport security (TLS 1.3, mTLS, broker ACLs) and OPC UA PubSub message-level security (signing and SignAndEncrypt of DataSetMessages with keys distributed by an SKS — Security Key Service). They are independent. You can run either, both, or — in a tightly isolated cell network — neither.
Transport layer. Always TLS 1.3, always mTLS for industrial deployments. Hand each publisher and each subscriber its own X.509 client certificate signed by a private CA. Configure broker ACLs so a given client identity can publish only to its own topic prefix — EMQX 5's acl.conf and HiveMQ 4's REST API both support this.
Message layer. If your broker is in a different trust domain — a cloud-hosted EMQX or HiveMQ Cloud — TLS alone is not enough, because the broker operator can read your payloads. Use PubSub SignAndEncrypt to keep the broker as an opaque relay. Part 14 Section 8.3 defines two security policies you will see:
Aes128_Sha256_RsaOaep — SignOnly. Integrity and authenticity, no confidentiality. It detects tampering, but the broker still sees the plaintext.
Aes256_Sha256_RsaPss — SignAndEncrypt. Full confidentiality, AES-256-GCM payload, RSA-PSS for the key wrapping.
The keys are not your X.509 certs; they are symmetric SecurityKey sets the SKS pushes to authorized publishers and subscribers in a SecurityGroup, with rotation. In asyncua you point the PubSubConnection at the SKS endpoint and set security_mode=SignAndEncrypt on the WriterGroup. The SKS itself is a regular OPC UA Client/Server endpoint exposing the GetSecurityKeys method (Part 14, Section 8.4) — you can run one bundled in asyncua, or use commercial offerings from Siemens, Beckhoff, or Unified Automation.
The decision tree in the diagram above is the one we apply in practice. Default to TLS 1.3 + mTLS unless you are inside an air-gapped cell network. Add SignAndEncrypt the moment the broker leaves the trust boundary. Rotate SKS keys every 24h; anything longer and you are leaning on AES-GCM nonce uniqueness for too long.
Trade-offs and Gotchas
Broker scalability is not free. A single EMQX 5 node will happily handle 100k QoS 1 publishes per second on a beefy VM, but only if your topic tree is shallow and your retained-message count stays in the low millions. We have seen Mosquitto 2 fall over at 50k retained messages because the persistence layer is single-threaded. Size the broker for your retained working set, not just your message rate.
Time sync is invisible until it fails. DataSetMessages carry SourceTimestamp and ServerTimestamp. If your publisher’s clock drifts 200 ms relative to a downstream Spark job’s clock, you will get out-of-order windows and ghost gaps in your dashboards. PTP (IEEE 1588) on the OT side, NTP with a local stratum-2 reference on the IT side, and never trust the broker’s receive time as a substitute for source time.
Security policy mismatch is the #1 interop fail. Publisher says Aes256_Sha256_RsaPss, subscriber library only ships Aes128_Sha256_RsaOaep, no error code surfaces because the subscriber never decodes a message it cannot parse — it just sits silent. Always log on the broker side and verify with mosquitto_sub -t '$SYS/broker/clients/connected' that both sides are exchanging traffic before you blame the wire.
MajorVersion migration is not optional. Renaming MotorRPM to RPM_Setpoint in the PublishedDataSet without bumping MetaDataVersion.MajorVersion will silently break every subscriber that caches metadata. Plan dataset evolution like API versioning.
MQTT 5 retain + SignAndEncrypt is dangerous. A retained encrypted message will outlive its key rotation window. Subscribers that connect after rotation cannot decode the retained payload. Either disable retain for SignAndEncrypt topics, or set MessageExpiryInterval shorter than the SKS rotation interval.
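One mitigation sketch for the retain-plus-rotation hazard: explicitly clear the retained message at each key rotation (the broker hostname is a placeholder):

```python
async def clear_retained(topic: str, host: str = "broker.local") -> None:
    """Delete the broker's retained copy of a topic.

    Per MQTT 5 (Section 3.3.1.3), a retained PUBLISH with a zero-length
    payload clears the retained message for that topic. Run this after
    every SKS key rotation on SignAndEncrypt topics so stale ciphertext
    never outlives its key.
    """
    import aiomqtt  # third-party: pip install aiomqtt

    async with aiomqtt.Client(hostname=host) as client:
        await client.publish(topic, payload=b"", qos=1, retain=True)

# asyncio.run(clear_retained("plant/line1/dsw1"))  # needs a reachable broker
```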
Practical Recommendations
Stand up a local stack first. A single docker-compose.yml gives you EMQX 5, HiveMQ 4, and Mosquitto 2 side by side for compatibility testing.
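A compose sketch under stated assumptions — the image tags are illustrative (pin exact versions for production), and Mosquitto 2 needs a config file enabling a listener and anonymous access for local testing:

```yaml
# docker-compose.yml — three brokers side by side for compatibility testing.
services:
  emqx:
    image: emqx/emqx:5.8
    ports:
      - "1883:1883"     # MQTT
      - "18083:18083"   # dashboard
  hivemq:
    image: hivemq/hivemq4
    ports:
      - "1884:1883"     # remapped to avoid the EMQX port
  mosquitto:
    image: eclipse-mosquitto:2
    ports:
      - "1885:1883"
    volumes:
      # mosquitto.conf must contain at least:
      #   listener 1883
      #   allow_anonymous true
      - ./mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
```

Point your publisher at each broker in turn by changing only the port in address_url.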
Then follow this rollout order: (1) get a JSON publisher running against Mosquitto with no TLS; (2) flip to UADP and verify byte-for-byte against Part 14 Section 7.2.2; (3) add TLS 1.3 + mTLS; (4) introduce a second consumer to validate fan-out; (5) add SignAndEncrypt with an SKS once the broker is no longer on your trusted network. Skipping steps 1–2 is the leading cause of multi-week “we cannot figure out why the data is wrong” investigations.
Pick JSON for cloud-bound traffic and UADP for edge-to-edge. Run two writers in parallel when you need both. Set publishingInterval to 1000 ms unless you have a measured reason to go faster. Keep your topic hierarchy flat — <site>/<area>/<asset>/<writer> — and avoid Sparkplug B-style topic encoding unless you are committed to the Sparkplug stack. And put prometheus-mqtt-exporter against your broker from day one — broker-side metrics are how you debug PubSub, not subscriber-side logs.
FAQ
Q: Is OPC UA PubSub over MQTT compatible with Sparkplug B?
A: No, they are different specifications. Sparkplug B is a topic and payload convention on top of MQTT; OPC UA PubSub is a separate payload format defined by the OPC Foundation. You can run both on the same broker on different topic trees, but a Sparkplug subscriber cannot decode OPC UA DataSetMessages, and vice versa. See our OPC UA vs MQTT Sparkplug B comparison for a deeper breakdown.
Q: Can I use MQTT 3.1.1 instead of MQTT 5 for OPC UA PubSub?
A: Technically yes — Part 14 allows it — but you lose ContentType, UserProperty, MessageExpiryInterval, and ResponseTopic. You will be encoding writer/publisher IDs into topic names or payloads, which breaks polyglot consumption. MQTT 5 is the production answer in 2026.
Q: What is the difference between UADP-over-UDP and UADP-over-MQTT?
A: UADP is the binary payload format. UDP transport is connectionless multicast, used for deterministic LAN traffic (typically TSN-enabled). MQTT transport is broker-mediated, used for north-bound integration. The payload bytes are identical; the framing and reliability differ. See our PubSub vs UADP architecture comparison.
Q: How many DataSetWriters can one WriterGroup hold?
A: The spec imposes no hard limit; in practice asyncua handles dozens. The real constraint is the resulting NetworkMessage size — keep it under your broker’s max_packet_size (EMQX 5 defaults to 1MB).
Q: Does asyncua support the SKS (Security Key Service)?
A: As of asyncua 1.1, basic SKS client behaviour is supported; a built-in SKS server is on the roadmap. For 2026 production deployments with SignAndEncrypt, most teams pair asyncua with a commercial SKS from Unified Automation or Siemens. Our asyncua production tutorial covers the integration in depth.