MQTT to Kafka Bridge: Production IoT Tutorial (2026)
IoT systems face a fundamental architectural tension. Field devices demand lightweight, low-bandwidth protocols—MQTT delivers exactly that, with pub/sub semantics, QoS levels 0/1/2, and footprints measured in kilobytes. But enterprise consumers—analytics platforms, stream processors, compliance auditors—want Kafka: durable append-only logs, infinite replay, consumer-group coordination, and built-in exactly-once guarantees. Neither protocol is wrong. They solve different problems.
An MQTT to Kafka bridge solves the impedance mismatch. It lets you keep devices on MQTT while giving downstream systems the durability and scale that Kafka provides. This tutorial walks through building a production MQTT to Kafka bridge using EMQX 5.4, Kafka 3.7, and Strimzi 0.42—including payload decoding, transactional semantics, and the operational patterns that survive at scale. If you’re building an MQTT-to-Kafka bridge for the first time, this guide covers the three main implementation strategies and step-by-step code to deploy one in Kubernetes today.
What this tutorial covers:
- Why MQTT and Kafka coexist in modern IoT architectures
- Three MQTT-to-Kafka bridge implementations compared (EMQX native, Kafka Connect, custom service)
- Step-by-step setup with real YAML and CLI examples for your MQTT to Kafka bridge
- Sparkplug B payload decoding in Kafka
- Exactly-once semantics: what’s realistic, what’s not
- Failure modes, trade-offs, and production recommendations for running an MQTT-to-Kafka bridge at scale
- Sizing, monitoring, and disaster recovery patterns
Why You Need an MQTT-to-Kafka Bridge
An MQTT-to-Kafka bridge addresses a critical challenge in modern IoT architectures. MQTT scales devices beautifully. A single EMQX cluster can sustain on the order of 100M concurrent connections (millions per node), uses roughly 2 MB of memory per 1000 connected devices, and lets you run brokers on edge gateways. Devices publish factory/line3/temp: 42.5 once per second, and the broker routes it instantly to whoever is listening; QoS 2 provides exactly-once delivery between client and broker.
But MQTT has blind spots at enterprise scale, and these gaps are exactly what a bridge fills:
- No replay. Messages disappear the moment they’re delivered. If your analytics pipeline was down for 30 minutes, those data points are gone.
- Bounded fan-out. Many subscribers slow down the broker. A thousand downstream consumers listening to the same MQTT topic create a thousand separate subscriptions, each consuming broker resources.
- No long-term retention. Persistent message stores exist, but they’re not designed for years of historical data or time-windowed queries.
- Limited transformation. MQTT brokers route by topic. Schema evolution, payload decoding, and cross-topic joins live in consumer code.
Kafka, by contrast, is built for infinite fan-out and long-term retention. Append-only logs store every message forever (or until a retention policy evicts them). Consumers join independently. A thousand consumers reading the same topic pay near-zero cost. But Kafka has its own taxes:
- Heavier footprint. A Kafka broker is a JVM process with 1–2 GB heap. It won’t run on a gateway or edge device.
- Auth and TLS complexity. MQTT brokers support username/password; Kafka demands SASL, mTLS, or token-based auth.
- Operational overhead. Zookeeper (or KRaft), broker coordination, replica management, topic sharding—the barrier to entry is higher.
The MQTT to Kafka bridge pattern lets you have both. Devices stay on MQTT (lightweight, always reachable). Your MQTT-to-Kafka bridge service consumes MQTT messages and republishes them to Kafka, where they live forever, are replayed on demand, and feed into ClickHouse, Flink, S3, Snowflake, and anything else that speaks Kafka. Building an effective MQTT to Kafka bridge requires understanding both protocols and the trade-offs at each layer.
Reference Architecture

The canonical topology has five layers:
1. Sensors and edge devices (temperature, pressure, motion sensors; industrial gateways). They publish MQTT messages to topic trees like factory/line3/machine1/temp with QoS 1 or 2.
2. MQTT broker cluster (EMQX, Mosquitto, HiveMQ). Typically 3–5 nodes behind a load balancer, handling 50k–1M concurrent connections per node, peak throughput 100k+ messages/sec.
3. MQTT-to-Kafka bridge (the subject of this tutorial). Subscribes to MQTT topics, decodes payloads, enriches with metadata, and produces to Kafka with transactional guarantees.
4. Kafka cluster (Strimzi on Kubernetes, MSK on AWS, or self-managed). Brokers replicate messages across 3 nodes, retain for 7–30 days, and expose data to downstream processors via consumer groups.
5. Downstream consumers (Flink for real-time, ClickHouse for analytics, Spark batch jobs, S3 sink for data lake, webhooks for alerting).
This architecture separates concerns: MQTT handles edge scale, Kafka handles enterprise durability. Your MQTT-to-Kafka bridge becomes the critical connector between them.
Three MQTT-to-Kafka Bridge Implementations Compared
When building an MQTT to Kafka bridge, you have three main options. Each implementation trades deployment complexity, latency, and operational cost differently, so choosing the right MQTT-to-Kafka bridge for your workload is critical.

Option A: EMQX Native Kafka Bridge
Since EMQX 5.0, the broker includes a built-in Kafka bridge. You define a Kafka sink in EMQX’s rule engine (via YAML or dashboard), and matching MQTT messages are forwarded to Kafka.
Pros:
- Zero separate services. The bridge runs inside EMQX.
- Lowest latency. MQTT message → rule trigger → Kafka produce, all in one process.
- Built-in deduplication by EMQX's message ID.
- Works with EMQX's SQL rule engine for filtering and transforms.
Cons:
- Couples bridge logic to the MQTT broker. Configuration changes happen inside EMQX, and some require a broker restart.
- Limited transformation. Complex payload decoding is awkward to express in the rule engine.
- Less visibility. Bridge metrics are buried in EMQX telemetry.
- Commercial licensing. In EMQX 5.x the Kafka data integration is an Enterprise feature.
Latency: <10 ms end-to-end (MQTT QoS 2 → Kafka).
Option B: Kafka Connect MQTT Source
Kafka Connect is a distributed framework for syncing data between systems. A Kafka Connect cluster runs worker tasks; each task pulls from one or more MQTT topics and writes to Kafka topics.
Pros:
- Separation of concerns. The MQTT broker doesn't know about Kafka; Connect handles the bridging.
- Rich ecosystem. Schema Registry integration, Single Message Transforms (SMTs) for payload decoding.
- Kafka-native observability. Connect metrics integrate with Prometheus and Grafana.
- Easy to scale. Spin up more Connect workers if throughput grows.
Cons:
- Extra service. A Kafka Connect cluster adds operational overhead (resource reservation, log shipping, monitoring).
- Slightly higher latency. MQTT publish → Connect poll, with batches typically on the order of 100–500 ms.
- Community connectors vary in quality. Confluent's premium MQTT connector is proprietary.
Latency: roughly 100–500 ms, depending on connector poll and batch settings.
Option C: Custom Consumer-Producer Service
Write a Python or Go service that subscribes to MQTT topics, decodes messages, and produces to Kafka. You control the entire MQTT to Kafka bridge pipeline.
Pros:
- Maximum control. Implement custom deduplication, context enrichment, and failover logic.
- Simple to understand. Your code is the source of truth.
- Easy to debug. Logs, metrics, and traces are under your control.
Cons:
- You own the reliability. Error handling, retries, and exactly-once semantics are yours to implement.
- Operational burden. Deploy, monitor, scale, and update a new service.
- May be overkill. For most use cases, the first two options are sufficient.
Latency: 10–100 ms, depending on buffering strategy.
Tutorial: Building an MQTT-to-Kafka Bridge with EMQX + Kafka Connect
In this practical tutorial, we’ll build a production MQTT to Kafka bridge using a hybrid approach: EMQX 5.4 for MQTT brokering and Kafka Connect for bridging. This MQTT-to-Kafka bridge design separates concerns and leverages each tool’s strengths.
Prerequisites
- Kubernetes 1.24+ cluster (minikube works for learning)
- Helm 3.x
- kubectl configured to your cluster
- 4 CPU, 8 GB RAM minimum (3 MQTT, 3 Kafka, 1 Connect worker)
Step 1: Deploy EMQX Cluster
Install the EMQX operator via Helm:
helm repo add emqx https://repos.emqx.io/charts
helm repo update
kubectl create namespace emqx
helm install emqx emqx/emqx -n emqx \
  --set replicaCount=3 \
  --set resources.limits.memory=1Gi \
  --set resources.limits.cpu=1000m
Wait for three EMQX pods to reach Ready:
kubectl rollout status statefulset/emqx -n emqx
Expose the broker via a LoadBalancer or port-forward (for testing):
kubectl port-forward -n emqx svc/emqx 1883:1883 &
Step 2: Deploy Kafka with Strimzi
Strimzi is a Kubernetes operator for Kafka. Install it:
kubectl create namespace kafka
kubectl apply -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
# Wait for Strimzi operator to be ready
kubectl rollout status deployment/strimzi-cluster-operator -n kafka --timeout=300s
Deploy a 3-node Kafka cluster:
cat <<'EOF' | kubectl apply -n kafka -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-iot
spec:
  kafka:
    replicas: 3
    resources:
      requests:
        memory: 512Mi
        cpu: 250m
      limits:
        memory: 1Gi
        cpu: 500m
    storage:
      type: ephemeral
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
  zookeeper:
    replicas: 3
    resources:
      requests:
        memory: 256Mi
        cpu: 100m
      limits:
        memory: 512Mi
        cpu: 250m
    storage:
      type: ephemeral
  entityOperator:
    topicOperator:
      reconciliationIntervalSeconds: 60
    userOperator:
      reconciliationIntervalSeconds: 60
EOF
Wait for the Kafka cluster (5–10 minutes):
kubectl wait kafka/kafka-iot --for=condition=Ready --timeout=600s -n kafka
Create the target Kafka topic:
cat <<'EOF' | kubectl apply -n kafka -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: iot-raw
  labels:
    strimzi.io/cluster: kafka-iot
spec:
  partitions: 3
  replicationFactor: 3
  config:
    retention.ms: 604800000  # 7 days
    min.insync.replicas: 2
EOF
Step 3: Deploy Kafka Connect
Deploy Kafka Connect as a Strimzi KafkaConnect cluster:
cat <<'EOF' | kubectl apply -n kafka -f -
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect-iot
  annotations:
    strimzi.io/use-connector-resources: "true"  # let KafkaConnector CRs manage connectors
spec:
  replicas: 1
  bootstrapServers: kafka-iot-kafka-bootstrap:9092
  resources:
    requests:
      memory: 512Mi
      cpu: 250m
    limits:
      memory: 1Gi
      cpu: 500m
  build:
    output:
      type: docker
      image: <your-registry>/kafka-connect-iot:latest  # Strimzi builds and pushes this image
    plugins:
      - name: mqtt-connector
        artifacts:
          - type: jar
            url: https://repo1.maven.org/maven2/io/confluent/kafka-connect-mqtt/11.1.0/kafka-connect-mqtt-11.1.0-jar-with-dependencies.jar
            sha512sum: <get from Confluent docs or use community variant>
EOF
(Note: For production, use Confluent’s Premium MQTT connector or the Lenses community connector. We show the structure here.)
Step 4: Create and Deploy the Connector
Define the MQTT source connector for your MQTT to Kafka bridge:
cat <<'EOF' | kubectl apply -n kafka -f -
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: mqtt-to-kafka-connector
  labels:
    strimzi.io/cluster: kafka-connect-iot
spec:
  class: io.confluent.connect.mqtt.MqttSourceConnector
  tasksMax: 2
  config:
    mqtt.server.uri: tcp://emqx.emqx.svc.cluster.local:1883
    mqtt.topics: "factory/#"
    kafka.topic: "iot-raw"
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.converters.ByteArrayConverter
    confluent.topic.bootstrap.servers: kafka-iot-kafka-bootstrap:9092
    confluent.topic.replication.factor: 3
    confluent.topic.min.insync.replicas: 2
EOF
Monitor the connector:
kubectl logs -n kafka -l strimzi.io/name=kafka-connect-iot-connect -f
Step 5: Validate Your MQTT-to-Kafka Bridge End-to-End
Publish a test MQTT message from a client pod inside the cluster:
kubectl run mqtt-pub -n emqx --rm -it --restart=Never \
  --image=eclipse-mosquitto:2 -- \
  mosquitto_pub -h emqx.emqx.svc.cluster.local -p 1883 \
  -t "factory/line3/temp" \
  -m '{"reading": 42.5, "unit": "C", "timestamp": 1713964800}'
Consume from the Kafka topic to verify the MQTT to Kafka bridge:
kubectl -n kafka run kafka-consumer \
  --image=confluentinc/cp-kafka:7.7.0 \
  --rm -it --restart=Never -- \
  kafka-console-consumer \
  --bootstrap-server kafka-iot-kafka-bootstrap:9092 \
  --topic iot-raw \
  --from-beginning \
  --max-messages 1
You should see the JSON message appear in the Kafka consumer output, confirming your MQTT to Kafka bridge is operational.
Sparkplug B Payload Decoding in Kafka
Sparkplug B is the emerging standard for IIoT edge nodes and gateways. Instead of arbitrary JSON, devices publish Protocol Buffer (Protobuf) payloads to topics like spBv1.0/factory/NDATA/gateway1. Kafka receives the raw binary.
Downstream applications need typed payloads: temperature: double, pressure: double, timestamp: int64. You can decode Sparkplug B in a small stream-processing stage that sits between the raw topic and typed per-metric topics.

Here's a minimal decoder in Python using confluent-kafka (a plain consume-decode-produce loop; Kafka Streams itself is JVM-only):
from confluent_kafka import Consumer, Producer
import sparkplug_b_pb2  # From Eclipse Sparkplug B specification
import json

def decode_sparkplug(payload: bytes) -> dict:
    """Decode Sparkplug B Protobuf payload."""
    message = sparkplug_b_pb2.Payload()
    message.ParseFromString(payload)
    metrics = {}
    for metric in message.metrics:
        if metric.HasField('int_value'):
            metrics[metric.name] = metric.int_value
        elif metric.HasField('double_value'):
            metrics[metric.name] = metric.double_value
        elif metric.HasField('string_value'):
            metrics[metric.name] = metric.string_value
    return {
        'timestamp': message.timestamp,
        'seq': message.seq,
        'metrics': metrics,
    }

def main():
    consumer = Consumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'sparkplug-decoder',
        'auto.offset.reset': 'earliest',
        'enable.auto.commit': False,  # commit manually after a successful decode
    })
    producer = Producer({'bootstrap.servers': 'localhost:9092'})
    consumer.subscribe(['iot-raw'])
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        try:
            decoded = decode_sparkplug(msg.value())
            # Emit typed metrics to separate topics
            for metric_name, value in decoded['metrics'].items():
                topic = f'iot.{metric_name.lower()}'
                producer.produce(
                    topic,
                    key=msg.key(),
                    value=json.dumps({'value': value, 'timestamp': decoded['timestamp']}),
                )
            producer.poll(0)  # serve delivery callbacks
            consumer.commit()
        except Exception as e:
            print(f"Decode error: {e}")
            continue

if __name__ == '__main__':
    main()
This pipeline separates concerns: raw binary handling stays in the decoder service, typed metrics land in downstream topics, and schema evolution can be managed by Schema Registry.
Exactly-Once Semantics in Your MQTT-to-Kafka Bridge: Realistic Expectations
“Exactly once” is a promise many systems make. Few deliver end-to-end. Let’s be honest about what your MQTT to Kafka bridge can realistically achieve.
MQTT QoS 2 guarantees exactly-once delivery between a client and its broker (QoS 1 is at-least-once). Kafka idempotent producers (enable.idempotence=true) deduplicate retries by producer ID and sequence number. Kafka transactional writes (transactional.id set) make commits atomic. Combine them, and you get effectively once from MQTT → Kafka in your bridge:

- MQTT QoS 2: Device sends message with a packet ID. Broker acknowledges only after persisting. Retries include the same packet ID, broker deduplicates.
- Bridge deduplication: Store (deviceId, seq) tuples in a dedupe cache (Redis, local RocksDB, Kafka compacted topic). On incoming MQTT message, check if (deviceId, seq) exists. If yes, skip; if no, proceed and cache the tuple.
- Transactional Kafka producer: Set transactional.id and enable.idempotence. The producer buffers messages, commits them atomically to Kafka, and retries are idempotent.
- Consumer isolation: Set isolation.level=read_committed in the consumer group. Only committed messages are visible, eliminating partially-written transactions.
Reality check: True end-to-end exactly-once requires device-level deduplication keys (timestamps, sequence numbers, device IDs). MQTT QoS 1 is insufficient; you need QoS 2 and retries must include metadata. In practice, this overhead (caching, transactional writes, committed reads) adds 50–200 ms latency. For real-time IoT, approximate exactly-once (with <1% data duplication and monitoring for outliers) is often acceptable.
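The bridge-side deduplication step above can be sketched as a small bounded cache. This is an illustrative sketch only: the `DedupCache` class and its parameters are hypothetical, and a production bridge would back the cache with Redis, RocksDB, or a compacted Kafka topic rather than process memory.

```python
from collections import OrderedDict

class DedupCache:
    """Bounded (deviceId, seq) cache; oldest entries are evicted first."""
    def __init__(self, max_entries: int = 100_000):
        self.max_entries = max_entries
        self._seen = OrderedDict()

    def seen_before(self, device_id: str, seq: int) -> bool:
        """Return True if this (device, seq) was already processed; record it otherwise."""
        key = (device_id, seq)
        if key in self._seen:
            return True
        self._seen[key] = True
        if len(self._seen) > self.max_entries:
            self._seen.popitem(last=False)  # evict the oldest entry
        return False

cache = DedupCache(max_entries=100_000)
print(cache.seen_before("dev1", 1))  # False: first time, process and produce
print(cache.seen_before("dev1", 1))  # True: duplicate retry, skip it
print(cache.seen_before("dev1", 2))  # False: new sequence number
```

Note the trade-off baked into the bound: once an entry is evicted, a very late duplicate slips through, which is one reason the "Reality check" above favors approximate exactly-once with monitoring.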
Trade-offs and Failure Modes
Backpressure
What happens if Kafka is slow (brokers overloaded, network jitter, clock skew in cluster)? EMQX or Kafka Connect buffers MQTT messages in memory. Buffers fill, and incoming publishes are dropped. Devices see DISCONNECT.
Mitigation: Monitor bridge lag (kafka_connect_source_record_lag_max, emqx_bridge_queue_len). Set Kafka retention high enough for devices to reconnect and replay. Use MQTT QoS 2 for critical messages.
Schema Evolution
You add a new sensor, and MQTT payloads gain a humidity field. Downstream Kafka consumers crash parsing the old schema. Integrate with Confluent Schema Registry: maintain a schema version in each message, or use Subject Name Strategy to isolate schema versions by topic.
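One lightweight version of "maintain a schema version in each message" is to envelope every payload before producing, so consumers can dispatch by version. The envelope layout and the v1/v2 handling below are illustrative assumptions, not part of any Schema Registry API:

```python
import json

SCHEMA_VERSION = 2  # bump whenever the payload shape changes

def wrap(payload: dict) -> bytes:
    """Envelope the payload with its schema version before producing to Kafka."""
    return json.dumps({"schema_version": SCHEMA_VERSION, "data": payload}).encode()

def unwrap(raw: bytes) -> dict:
    """Dispatch by version: in this sketch, v1 lacked 'humidity', so default it."""
    envelope = json.loads(raw)
    data = envelope["data"]
    if envelope["schema_version"] < 2:
        data.setdefault("humidity", None)  # field added in v2
    return data

print(unwrap(wrap({"temp": 42.5, "humidity": 55})))  # {'temp': 42.5, 'humidity': 55}
old = json.dumps({"schema_version": 1, "data": {"temp": 41.0}}).encode()
print(unwrap(old))  # {'temp': 41.0, 'humidity': None}
```

Schema Registry with Avro or Protobuf gives you the same guarantee with enforced compatibility checks; the envelope approach is the minimal stand-in when you control both ends.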
Message Ordering Across Topics
MQTT messages from different sensors to different topics (factory/line1/temp, factory/line2/temp) arrive at Kafka in non-deterministic order. If you need strict ordering, put everything in a single topic and shard by device ID in the key.
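Keying by device ID works because Kafka's default partitioner maps equal keys to the same partition, so one device's readings stay in publish order. A sketch of that mapping, using crc32 as a stand-in for Kafka's actual murmur2 hash:

```python
import zlib

NUM_PARTITIONS = 3

def partition_for(device_id: str, num_partitions: int = NUM_PARTITIONS) -> int:
    """Same key -> same partition, so a single device's readings stay ordered.
    Kafka's default partitioner uses murmur2; crc32 here just illustrates the idea."""
    return zlib.crc32(device_id.encode()) % num_partitions

# Every reading from machine1 lands on the same partition, in publish order:
p1 = partition_for("factory/line1/machine1")
p2 = partition_for("factory/line1/machine1")
print(p1 == p2)  # True
```

The corollary: ordering holds only within a partition, so increasing the partition count later reshuffles key-to-partition assignments and briefly interleaves old and new ordering.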
Multi-Region Replication
Replicate Kafka to a second region (cross-cluster Mirror Maker 2, or cloud-native mirroring). MQTT broker failover is harder: devices must reconnect, rediscover brokers, and may lose in-flight messages.
Secret Management
EMQX and Kafka Connect need credentials for each other. Store SASL passwords, client certs, and API keys in Kubernetes Secrets (encrypted at rest) or HashiCorp Vault. Rotate quarterly.
Observability Gap
If the bridge crashes, you don’t automatically know. Implement end-to-end monitoring: publish a test MQTT message every 10 seconds, verify it appears in Kafka within 30 seconds. Alert if the gap exceeds 60 seconds.
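The canary check described above reduces to comparing publish and arrival timestamps against a threshold. Here is a minimal sketch of just the alert decision; the `CanaryResult` type and `should_alert` name are hypothetical, and the actual publish/consume plumbing would use your MQTT and Kafka clients:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class CanaryResult:
    published_at: float            # epoch seconds when the probe was published to MQTT
    seen_at: Optional[float]       # epoch seconds when it appeared in Kafka; None if missing

def should_alert(result: CanaryResult, max_gap_s: float = 60.0) -> bool:
    """Alert if the probe never arrived or took longer than the allowed gap."""
    if result.seen_at is None:
        return True
    return (result.seen_at - result.published_at) > max_gap_s

print(should_alert(CanaryResult(published_at=100.0, seen_at=125.0)))  # False: 25 s gap is fine
print(should_alert(CanaryResult(published_at=100.0, seen_at=170.0)))  # True: 70 s gap
print(should_alert(CanaryResult(published_at=100.0, seen_at=None)))   # True: probe lost
```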
Production Recommendations for Your MQTT-to-Kafka Bridge
Deploying an MQTT-to-Kafka bridge at scale requires careful capacity planning. A well-designed MQTT to Kafka bridge balances throughput, latency, and resource consumption.
Sizing Rules of Thumb for MQTT-to-Kafka Bridge Deployments
- EMQX node: per-node throughput varies widely with message size, QoS, and persistence; a 3-node cluster with bridging sustaining 10k–50k msg/sec (the figure used elsewhere in this guide) is a reasonable planning baseline. Benchmark with your own payloads before committing to a node count.
- Kafka broker: 10 MB/sec replication throughput per broker (3x if you have 3 replicas). For 50 MB/sec inbound, size for 150 MB/sec replication.
- Kafka Connect task: 10 MB/sec per task (MQTT polling + Kafka produce). For 50 MB/sec, use 5 tasks across 3 workers.
- Memory footprint: EMQX (1 GB per 1M connections), Kafka broker (2 GB per broker), Kafka Connect (512 MB per task). Scale conservatively.
Key Monitoring Metrics for MQTT-to-Kafka Bridges
Monitor these via Prometheus + Grafana or your observability platform:
- emqx_connections_count: active MQTT connections.
- emqx_messages_sent_total: messages published to MQTT topics.
- emqx_bridge_queue_len: pending messages in the Kafka bridge queue.
- kafka_connect_source_record_poll_total: records polled from MQTT.
- kafka_connect_source_record_production_total: records written to Kafka.
- kafka_connect_source_record_lag_max: lag between MQTT publish and Kafka produce.
- kafka_brokers_online: Kafka broker count (alert if below the replication factor).
- kafka_topic_partition_under_replicated_partitions: under-replicated partitions (early warning of broker failure).
Disaster Recovery Design
Mirrored MQTT brokers in two regions, cross-region Kafka replication via MirrorMaker 2, and bridge tasks configured for failover:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: mqtt-to-kafka-connector-dr
  labels:
    strimzi.io/cluster: kafka-connect-iot  # point this at your DR Connect cluster
spec:
  class: io.confluent.connect.mqtt.MqttSourceConnector
  tasksMax: 2
  config:
    mqtt.server.uri: tcp://emqx-dr.emqx-dr.svc.cluster.local:1883
    mqtt.topics: "factory/#"
    kafka.topic: "iot-raw"
    producer.override.enable.idempotence: "true"
    producer.override.transactional.id: "mqtt-bridge-dr"
Test failover quarterly: kill the primary MQTT broker and confirm devices reconnect to the secondary within 30 seconds.
FAQ
Q: Can I use a Kafka MQTT proxy instead of a separate bridge?
A: Apache Kafka itself ships no MQTT proxy. Confluent Platform includes an MQTT Proxy component that terminates MQTT connections and writes directly to Kafka topics. Use it if you're already running Confluent Platform and want one fewer service; the trade-off is losing MQTT-native capabilities such as broker clustering, retained messages, and edge deployment.
Q: What’s the end-to-end latency of the MQTT to Kafka bridge?
A: EMQX native MQTT-to-Kafka bridge: <10 ms. Kafka Connect: 100–500 ms (polling-based). Custom service: 10–100 ms. For most IoT use cases (machine monitoring, environmental sensors), 100–500 ms is acceptable. For ultra-low-latency control loops, native bridge or custom service is preferred.
Q: How do I handle MQTT 5 properties (user properties, response topic, correlation data) in Kafka?
A: Kafka messages don’t have first-class support for MQTT properties. Pack them in the Kafka value (JSON or Protobuf) or use Kafka headers to map MQTT metadata. Kafka Connect’s SMTs help here.
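That header mapping can be sketched in a few lines. The assumptions here: user properties arrive as a list of key/value string pairs (as paho-mqtt exposes MQTT 5 properties), Kafka headers are (str, bytes) tuples (as confluent-kafka's `produce(..., headers=...)` accepts), and the `mqtt.*` header names are an illustrative convention:

```python
from typing import List, Optional, Tuple

def mqtt_props_to_kafka_headers(
    user_properties: List[Tuple[str, str]],
    response_topic: Optional[str] = None,
    correlation_data: Optional[bytes] = None,
) -> List[Tuple[str, bytes]]:
    """Map MQTT 5 metadata onto Kafka record headers."""
    headers = [(k, v.encode()) for k, v in user_properties]
    if response_topic is not None:
        headers.append(("mqtt.response_topic", response_topic.encode()))
    if correlation_data is not None:
        headers.append(("mqtt.correlation_data", correlation_data))
    return headers

headers = mqtt_props_to_kafka_headers(
    [("device_fw", "1.4.2")],
    response_topic="factory/line3/ack",
    correlation_data=b"\x01\x02",
)
# The resulting list is passed as: producer.produce(topic, value=v, headers=headers)
```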
Q: Is the EMQX Kafka bridge open source or commercial?
A: Partially commercial. In EMQX 5.x the Kafka data integration (sink and source) is an Enterprise feature; the open-source edition covers the core MQTT broker, the SQL rule engine, and integrations such as webhooks and MQTT bridging. Check the current edition comparison before planning around the built-in Kafka bridge.
Q: How many devices can a single MQTT to Kafka bridge handle?
A: A 3-node EMQX cluster with bridging can push 10k–50k msg/sec to Kafka, depending on payload size, QoS, and network. A single Kafka Connect task (on moderate hardware) can handle 5k–10k msg/sec. Scale by adding Connect workers rather than EMQX nodes; the bridge, not the MQTT broker, is usually the throughput bottleneck.
Q: What if the Kafka cluster goes down?
A: EMQX buffers in-flight bridge messages in bounded memory (and, where configured, disk) queues, and Kafka Connect tasks pause and retry. Once the buffers fill, messages are dropped. Size the buffers for your expected outage window, enable disk-backed buffering where available, or add a sidecar cache (for example Redis) to absorb longer Kafka outages.
Further Reading
Internal:
– EMQX MQTT Cluster on Kubernetes: Production Tutorial — Deep dive into EMQX deployment, clustering, and HA.
– Sparkplug B 3.0 Protocol & Unified Namespace Guide — Sparkplug schema design, topic conventions, and best practices.
– Industrial IoT — More tutorials on MQTT, Kafka, and edge integration.
External:
– EMQX Kafka Bridge Documentation
– Kafka Connect Architecture and Connectors
– Sparkplug B Specification — Official spec from the Eclipse Foundation.
– Confluent Schema Registry
Building Your MQTT-to-Kafka Bridge
Start small. Deploy one EMQX node, one Kafka broker (single-node), and one test MQTT device. Publish 1 msg/sec for a week. Monitor lag, error rates, and memory. Once you’re confident, scale to 3–5 nodes, increase device count, and integrate downstream processors.
An MQTT to Kafka bridge isn’t magic—it’s a durable queue. But a well-built MQTT-to-Kafka bridge lets IoT devices speak MQTT while enterprise systems speak Kafka—and they meet in the middle, with guarantees.
Ready to build your MQTT to Kafka bridge? Grab the code samples above, run them in Kubernetes, and start publishing.
Author: [Your Name] | Last updated: 2026-04-24 | Edit on GitHub
Advanced Patterns: Building Enterprise-Grade MQTT-to-Kafka Bridges
Beyond the basic setup, production deployments of your MQTT-to-Kafka bridge often require advanced patterns to handle edge cases and operational challenges at scale.
Context Enrichment
Raw MQTT payloads often lack metadata needed for downstream analytics. Your MQTT to Kafka bridge can enrich messages with:
- Device metadata: device location, model, firmware version, maintenance window
- Timing context: message age, network latency estimation, QoS confirmation
- Business context: facility ID, product line, shift assignments, SLA tiers
Implement enrichment as a separate Kafka Streams app between your bridge and downstream consumers. Query a reference dataset (PostgreSQL, Redis, or a compacted Kafka topic) to decorate each MQTT message with static or slowly-changing dimensions.
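A sketch of such an enrichment step, assuming the device metadata has already been loaded into a local dict (in production this would be a cache over PostgreSQL, Redis, or a compacted topic; the field names are illustrative):

```python
import json

# Slowly-changing device dimensions, refreshed periodically from a reference store
DEVICE_METADATA = {
    "machine1": {"facility": "plant-A", "line": "line3", "firmware": "2.1.0"},
}

def enrich(raw: bytes, device_id: str) -> bytes:
    """Decorate a raw reading with device dimensions; unknown devices pass through."""
    reading = json.loads(raw)
    reading["device"] = DEVICE_METADATA.get(device_id, {"facility": "unknown"})
    return json.dumps(reading).encode()

out = json.loads(enrich(b'{"reading": 42.5}', "machine1"))
print(out["device"]["facility"])  # plant-A
```

Keeping the lookup local (rather than querying the database per message) is what lets enrichment keep up with bridge throughput; refresh the dict on a timer or by tailing the compacted topic.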
Dead-Letter Queues and Error Handling
Not every message will parse successfully. Your MQTT-to-Kafka bridge should handle failures gracefully:
# Pseudo-code for bridge with DLQ
import base64
import json
import logging
import time

try:
    decoded = decode_sparkplug(msg.value())
    producer.produce(topic='iot-metrics', value=json.dumps(decoded))
except Exception as e:
    # Send to dead-letter topic for manual inspection
    producer.produce(
        topic='iot-dlq',
        value=json.dumps({
            'error': str(e),
            'raw_payload': base64.b64encode(msg.value()).decode('ascii'),
            'mqtt_topic': msg.topic(),
            'timestamp': time.time(),
        }),
    )
    logging.error("Failed to decode message from %s: %s", msg.topic(), e)
Monitor the DLQ. A spike in errors indicates schema drift, malformed payloads, or a compromised MQTT client.
Filtering and Topic Remapping
MQTT hierarchies often don’t align with Kafka topic conventions. Your MQTT-to-Kafka bridge can filter and remap:
- factory/building-A/line3/temp → iot.line3.temperature (flatter, more queryable)
- device-heartbeat/+/status → iot.heartbeats (aggregate by device class)
- errors/# → iot.errors (collect all alarms into one topic)
Define remapping rules in a configuration file and hot-reload them without restarting the bridge:
# remapping_rules.yaml
rules:
  - match: "factory/*/temp"
    target: "iot.temperature"
    key_from: "mqtt_topic"  # Use source topic as Kafka message key
  - match: "device-*/*/status"
    target: "iot.status"
    key_from: "mqtt_topic"
  - match: "errors/*"
    target: "iot.errors"
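A rule file like this can be applied with a few lines of glob matching. A sketch, with the rules inlined for brevity; note that fnmatch's `*` also crosses `/` boundaries (unlike MQTT's `+`), so these patterns are plain globs, and the `iot.raw` default is an assumption:

```python
from fnmatch import fnmatch

# In practice, loaded (and hot-reloaded) from remapping_rules.yaml
RULES = [
    {"match": "factory/*/temp", "target": "iot.temperature"},
    {"match": "device-*/*/status", "target": "iot.status"},
    {"match": "errors/*", "target": "iot.errors"},
]

def remap(mqtt_topic: str, default: str = "iot.raw") -> str:
    """Return the Kafka topic for an MQTT topic; the first matching rule wins."""
    for rule in RULES:
        if fnmatch(mqtt_topic, rule["match"]):
            return rule["target"]
    return default

print(remap("factory/line2/temp"))     # iot.temperature
print(remap("errors/line3/overheat"))  # iot.errors
print(remap("telemetry/misc"))         # iot.raw (no rule matched)
```

First-match-wins keeps rule files predictable: put the most specific patterns first.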
Observability, Alerting, and Incident Response
An MQTT-to-Kafka bridge moving critical data demands deep observability. Beyond metrics, implement:
Distributed Tracing
Add OpenTelemetry instrumentation to trace message flow from MQTT device through Kafka to downstream consumers:
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="localhost:4317")))
trace.set_tracer_provider(provider)
tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("mqtt_to_kafka_bridge") as span:
    span.set_attribute("mqtt.topic", msg.topic())
    span.set_attribute("mqtt.qos", msg.qos())
    # Process and produce...
    span.set_attribute("kafka.partition", partition)
    span.set_attribute("kafka.offset", offset)
This gives you end-to-end visibility: which devices are slow, where latency spikes occur, and which downstream processors lag.
Alerting Strategy
Define alerts for:
- Bridge unavailability: no messages flowing for > 60 seconds. Page the on-call engineer.
- Backlog growth: emqx_bridge_queue_len > 10,000 for > 5 minutes. Indicates Kafka slowness or network issues.
- Kafka lag: kafka_connect_source_record_lag_max > 5 seconds. Suggests a polling-rate mismatch or downstream bottleneck.
- Schema errors: DLQ message rate > 100/min. Indicates an upstream change or malformed data.
- Resource saturation: EMQX memory > 80%, Kafka disk > 85%. Prevents cascading failures.
Pair alerts with runbooks:
- “Bridge unavailable” → Check EMQX and Kafka cluster health, verify network connectivity, review recent deployments.
- “Backlog growth” → Scale Kafka brokers or Kafka Connect workers, reduce message retention, investigate downstream consumers.
Log Aggregation
Stream bridge logs to a centralized system (ELK, Loki, CloudWatch):
kubectl logs -n kafka -l strimzi.io/name=kafka-connect-iot-connect -f | \
jq '. | {timestamp: .timestamp, level: .level, component: .logger_name, message: .message}' | \
tee >(curl -X POST http://loki:3100/loki/api/v1/push ...)
Operational Runbooks
Scaling Your MQTT-to-Kafka Bridge
When: Lag exceeds 5 seconds, or messages are dropped during peak loads.
Steps:
1. Check the current task count:
   kubectl get kafkaconnector mqtt-to-kafka-connector -n kafka -o jsonpath='{.spec.tasksMax}'
2. Increase tasksMax in the connector spec:
   kubectl patch kafkaconnector mqtt-to-kafka-connector -n kafka --type merge -p '{"spec":{"tasksMax":4}}'
3. Watch the bridge lag metric (kafka_connect_source_record_lag_max) in Prometheus; source connectors do not appear in kafka-consumer-groups.sh output, since they produce rather than consume.
4. If lag still grows, add a Connect worker by scaling the KafkaConnect resource:
   kubectl patch kafkaconnect kafka-connect-iot -n kafka --type merge -p '{"spec":{"replicas":2}}'
Recovering from Bridge Failure
When: No messages appear in Kafka for > 2 minutes.
Steps:
1. Check connector status:
   kubectl get kafkaconnector mqtt-to-kafka-connector -n kafka -o jsonpath='{.status.connectorStatus.connector.state}'
2. View recent logs:
   kubectl logs -n kafka deployment/kafka-connect-iot-connect --tail=100
3. If a task is FAILED, restart it via the Connect REST API:
   curl -X POST http://kafka-connect:8083/connectors/mqtt-to-kafka-connector/tasks/0/restart
4. Verify the MQTT broker is reachable:
   telnet emqx.emqx.svc.cluster.local 1883
5. Verify Kafka is healthy:
   kafka-broker-api-versions.sh --bootstrap-server kafka-iot-kafka-bootstrap:9092
6. If still stuck, delete and re-apply the connector:
   kubectl delete kafkaconnector mqtt-to-kafka-connector -n kafka && kubectl apply -f connector.yaml
Handling MQTT Broker Failover
When: Primary MQTT broker is down and devices need to reconnect.
Steps:
1. Update devices' MQTT broker addresses (via firmware update, DHCP config, or DNS failover) to point to the secondary broker.
2. Verify devices reconnect: emqx_ctl clients list | grep "device-" | wc -l should recover to the baseline connection count within 30–60 seconds.
3. Monitor the secondary bridge instance for lag.
4. Once the primary is healthy, rebalance: point half the devices back to the primary, monitor lag, then move the rest.
Conclusion: Mastering the MQTT-to-Kafka Bridge
An MQTT to Kafka bridge is often overlooked as a simple pass-through. In reality, it’s a critical piece of industrial IoT infrastructure that requires careful design, tuning, and operational discipline.
The patterns in this tutorial—exactly-once semantics, schema management, observability, failover—are battle-tested at scale. Start with the basic Kafka Connect implementation, but plan for complexity: enrichment, remapping, DLQs, and multi-region replication become critical once you’re managing thousands of devices or petabytes of historical data.
An MQTT-to-Kafka bridge connects two worlds: MQTT's simplicity and scale on one side, Kafka's durability and analytics power on the other. Build it well, monitor it closely, and it will serve your IoT operations for years.