Real-Time Asset Tracking with MQTT 5.0 and InfluxDB 3.0: A Working Tutorial
Asset tracking is one of the most common industrial IoT use cases — inventory on a factory floor, mobile equipment in a warehouse, fleet logistics across a region. But traditional polling-based systems hit latency walls and don’t scale beyond a few hundred devices. This tutorial walks you through building a production-grade real-time tracking system that handles thousands of devices, geofences them at millisecond latency, and visualizes them live in Grafana.
Architecture at a glance

tracker devices --MQTT 5.0--> EMQX broker --> influx-writer --> InfluxDB 3.0 --> Grafana geomap
                                   |
                                   +--> geofence worker --> tracking/{device_id}/alerts

We’ll use MQTT 5.0 (the 2019 OASIS standard that brought topic aliases, user properties, and shared subscriptions), InfluxDB 3.0 (Apache Arrow/DataFusion query engine over Parquet storage), and a lightweight Python geofence worker. By the end, you’ll have working code, Docker Compose configuration, and the architecture to add your own tracking payloads.
What We’re Building and Why MQTT 5.0 + InfluxDB 3.0
The challenge: Real-time asset tracking at scale demands three things:
1. Sub-second position updates from hundreds or thousands of mobile devices
2. Efficient wire protocol — MQTT 3.1.1 resends the full topic string with every publish and has no shared subscriptions
3. Spatial queries — “Show me assets in zone B” or “Alert when equipment enters restricted area”
MQTT 5.0 solves (1) and (2). Its user properties let you pack metadata (device_id, tracker_type, accuracy) into the MQTT packet header without bloating the payload. Shared subscriptions distribute ingestion load across multiple writer workers. Topic aliases reduce topic string size from 100+ bytes to 2 bytes per update.
InfluxDB 3.0 helps with (3). Its DataFusion/Apache Arrow query engine evaluates SQL range and bounding-box predicates over lat/lon columns fast enough that simple zone queries don’t need a separate geospatial database. You ingest positions into a single measurement with lat/lon fields, and filter zones directly in SQL.
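As a sketch of the zone-query pattern (the measurement and column names follow the schema defined later in this tutorial; the syntax is standard DataFusion-style SQL):

```sql
-- Assets seen in "zone B" in the last 10 minutes, approximated by its bounding box
SELECT device_id, latitude, longitude, time
FROM device_positions
WHERE time >= now() - INTERVAL '10 minutes'
  AND latitude  BETWEEN 40.7126 AND 40.7130
  AND longitude BETWEEN -74.0060 AND -74.0055
ORDER BY time DESC;
```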
What makes this different from a basic IoT stack:
– We’ll use EMQX (a production MQTT broker) with clustering support — not a toy mosquitto install
– InfluxDB 3.0 schema design for cardinality management (device_id as tag, not field)
– A dedicated geofence worker that subscribes to position updates and publishes alert payloads
– Grafana geomap visualization with live device trails
– Load-test harness to validate scaling past 500 concurrent devices
System Architecture
The tracking system has five moving parts: tracker devices, MQTT broker, ingestion pipeline, storage, and visualization.

Tracker devices publish MQTT messages every 2–5 seconds. Each message carries lat/lon, device_id, accuracy estimate, and timestamp. Devices connect over TLS (MQTT port 8883) or plaintext (1883 for local testing).
EMQX broker sits at the center. It routes position messages and geofence alerts. We’ll configure it with:
– User properties validation (check that device_id matches client certificate CN)
– Shared subscriptions (multiple workers read from same queue without duplication)
– Persistent sessions (if a device drops and reconnects, queued messages are delivered)
Influx writer service subscribes to tracking/+/position and batches writes to InfluxDB 3.0. We use shared subscriptions so that if one writer crashes, another picks up the traffic automatically.
Geofence worker subscribes to the same position topic, evaluates against a set of geofences (polygons, circles), and publishes alerts to tracking/+/alerts when an asset enters or exits a zone. This is where Shapely (a geometry library) shines.
InfluxDB 3.0 stores all positions in a single measurement (device_positions) with tags for device_id, zone, and tracker_type. Queries over recent data return in milliseconds because InfluxDB pushes time and tag predicates down to the Parquet layer.
Grafana geomap panel queries InfluxDB directly and plots devices on a map. Each device shows an icon (truck, forklift, etc.) and a trail of recent positions.

The MQTT topic hierarchy routes messages by device and message type:

tracking/
└── {device_id}/
    ├── position   ← JSON position updates from the device (QoS 1)
    └── alerts     ← geofence entry/exit events from the worker

Each device publishes to tracking/{device_id}/position with a JSON payload. The broker fans that out: writer instances consume it through the shared subscription group position_writers (topic filter $share/position_writers/tracking/+/position), while the geofence worker subscribes through its own group ($share/geofence_workers/tracking/+/position) so it receives its own copy of every position without competing with the ingestion pipeline.
InfluxDB schema for positions:

A single measurement (device_positions) with:
– Tags (indexed): device_id, zone, tracker_type
– Fields (not indexed, stored as columns): latitude, longitude, accuracy_m, speed_mps
– Timestamp: nanosecond precision, query with time >= now() - 10m
This design keeps cardinality low (device_id is a tag, not a field) and makes spatial queries fast.
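Concretely, one position sample serialized in InfluxDB line protocol looks like this (values illustrative; tags before the space, fields after, trailing nanosecond timestamp):

```
device_positions,device_id=ASSET-001,zone=warehouse-a,tracker_type=mobile-gps latitude=40.712845,longitude=-74.005912,accuracy_m=8,speed_mps=1.4 1700000000000000000
```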
Hands-On Tutorial: Build the System
Step 1: Docker Compose the Stack
Create a docker-compose.yml:
version: '3.8'

services:
  emqx:
    image: emqx/emqx:5.4.1
    container_name: emqx-tracker
    ports:
      - "1883:1883"    # MQTT plaintext
      - "8883:8883"    # MQTT over TLS
      - "18083:18083"  # Dashboard
    # EMQX 5.x allows anonymous clients by default — fine for local testing,
    # lock this down (authn/authz) before production
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
      interval: 10s
      timeout: 5s
      retries: 5

  influxdb:
    # Note: the official influxdb repo doesn't publish a "3.0-alpine" tag, and the
    # 3.x images use a different setup flow. 2.7-alpine exposes the same v2 HTTP
    # write API the writer service below uses, so it works for local development.
    image: influxdb:2.7-alpine
    container_name: influx-tracker
    ports:
      - "8086:8086"
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: changeme
      DOCKER_INFLUXDB_INIT_ORG: myorg
      DOCKER_INFLUXDB_INIT_BUCKET: tracking
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: my-token
    volumes:
      - influx_data:/var/lib/influxdb2
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8086/health"]
      interval: 10s
      timeout: 5s
      retries: 5

  grafana:
    image: grafana/grafana:11.0.0
    container_name: grafana-tracker
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards:ro
    depends_on:
      - influxdb

  influx-writer:
    # slim (glibc) rather than alpine so the shapely wheel from requirements.txt installs cleanly
    image: python:3.11-slim
    container_name: influx-writer
    working_dir: /app
    volumes:
      - ./influx_writer.py:/app/influx_writer.py:ro
      - ./requirements.txt:/app/requirements.txt:ro
    command: sh -c "pip install -q -r requirements.txt && python influx_writer.py"
    environment:
      EMQX_HOST: emqx
      EMQX_PORT: 1883
      INFLUX_URL: http://influxdb:8086
      INFLUX_ORG: myorg
      INFLUX_BUCKET: tracking
      INFLUX_TOKEN: my-token
    depends_on:
      emqx:
        condition: service_healthy
      influxdb:
        condition: service_healthy

  geofence-worker:
    image: python:3.11-slim
    container_name: geofence-worker
    working_dir: /app
    volumes:
      - ./geofence_worker.py:/app/geofence_worker.py:ro
      - ./requirements.txt:/app/requirements.txt:ro
      - ./geofences.json:/app/geofences.json:ro
    command: sh -c "pip install -q -r requirements.txt && python geofence_worker.py"
    environment:
      EMQX_HOST: emqx
      EMQX_PORT: 1883
      GEOFENCES_FILE: /app/geofences.json
    depends_on:
      emqx:
        condition: service_healthy

volumes:
  influx_data:
  grafana_data:
Create requirements.txt:
paho-mqtt==2.1.0
influxdb-client==1.18.0
shapely==2.0.2
requests==2.31.0
Start the stack:
docker-compose up -d
Wait for health checks to pass. EMQX dashboard will be at http://localhost:18083, Grafana at http://localhost:3000.
Step 2: Tracker Publisher (Python, MQTT 5.0)
Create tracker_publisher.py:
import json
import math
import random
import time

import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties

# MQTT 5.0 client (paho defaults to 3.1.1, so request v5 explicitly)
client = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id="tracker-sim-1",
    protocol=mqtt.MQTTv5,
)

# Device configuration
DEVICE_ID = "ASSET-001"
BASE_LAT, BASE_LON = 40.7128, -74.0060  # NYC origin
SPEED_MPS = 2.0                         # ~7 km/h walking speed
UPDATE_INTERVAL_S = 2

# Track position with a random walk (simulates realistic drift)
lat, lon = BASE_LAT, BASE_LON

def on_connect(client, userdata, connect_flags, reason_code, properties):
    if reason_code == 0:
        print(f"[{DEVICE_ID}] Connected to broker")
    else:
        print(f"[{DEVICE_ID}] Connection failed with code {reason_code}")

def on_publish(client, userdata, mid, reason_code, properties):
    if reason_code == mqtt.MQTT_ERR_SUCCESS:
        print(f"[{DEVICE_ID}] Published position update")

client.on_connect = on_connect
client.on_publish = on_publish

# MQTT 5.0: user properties ride in the PUBLISH packet header,
# keeping device metadata out of the JSON payload
pub_props = Properties(PacketTypes.PUBLISH)
pub_props.UserProperty = [
    ("device_id", DEVICE_ID),
    ("tracker_type", "mobile-gps"),
    ("fw_version", "1.2.3"),
]

client.connect("localhost", 1883, keepalive=60)
client.loop_start()
print(f"Starting publisher for {DEVICE_ID}...")
time.sleep(1)

# Publish positions every 2 seconds with realistic movement
for i in range(100):
    # Random walk step
    angle = random.uniform(0, 2 * math.pi)
    step_size = SPEED_MPS * UPDATE_INTERVAL_S / 111_000  # metres to degrees
    lat += math.cos(angle) * step_size * 0.5
    lon += math.sin(angle) * step_size * 0.5
    # Clip to the New York test area
    lat = max(40.70, min(40.72, lat))
    lon = max(-74.02, min(-73.98, lon))

    payload = {
        "device_id": DEVICE_ID,
        "lat": round(lat, 6),
        "lon": round(lon, 6),
        "accuracy_m": random.randint(5, 15),
        "speed_mps": random.uniform(0, SPEED_MPS),
        "ts": int(time.time() * 1000),
    }
    client.publish(
        f"tracking/{DEVICE_ID}/position",
        json.dumps(payload),
        qos=1,
        retain=False,
        properties=pub_props,
    )
    time.sleep(UPDATE_INTERVAL_S)

client.loop_stop()
client.disconnect()
print(f"[{DEVICE_ID}] Done")
Run this in a separate terminal (after pip install -r requirements.txt locally):
python tracker_publisher.py
You should see “Published position update” messages. Check the EMQX dashboard — the subscription count on tracking/ASSET-001/position should show active subscribers.
Step 3: InfluxDB 3.0 Schema and Writer Service
Create influx_writer.py:
import json
import os
import time

import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB setup
INFLUX_URL = os.getenv("INFLUX_URL", "http://localhost:8086")
INFLUX_ORG = os.getenv("INFLUX_ORG", "myorg")
INFLUX_BUCKET = os.getenv("INFLUX_BUCKET", "tracking")
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN", "my-token")

client_influx = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client_influx.write_api(write_options=SYNCHRONOUS)

# MQTT setup
EMQX_HOST = os.getenv("EMQX_HOST", "localhost")
EMQX_PORT = int(os.getenv("EMQX_PORT", "1883"))

client_mqtt = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id="influx-writer-1",
    protocol=mqtt.MQTTv5,
)

def on_connect(client, userdata, connect_flags, reason_code, properties):
    if reason_code == 0:
        print("[Influx Writer] Connected to MQTT broker")
        # MQTT 5.0 shared subscription: the $share/<group>/ prefix lets
        # multiple writers consume this topic without duplication
        client.subscribe("$share/position_writers/tracking/+/position", qos=1)
    else:
        print(f"[Influx Writer] Connection failed: {reason_code}")

def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
        device_id = payload.get("device_id", "unknown")
        lat = payload.get("lat")
        lon = payload.get("lon")
        accuracy = payload.get("accuracy_m", 0)
        speed = payload.get("speed_mps", 0)
        ts_ms = payload.get("ts", int(time.time() * 1000))

        # Build InfluxDB line protocol
        #   measurement: device_positions
        #   tags:        device_id, tracker_type
        #   fields:      latitude, longitude, accuracy_m, speed_mps
        line = (
            f"device_positions,"
            f"device_id={device_id},"
            f"tracker_type=mobile-gps "
            f"latitude={lat},"
            f"longitude={lon},"
            f"accuracy_m={accuracy},"
            f"speed_mps={speed} "
            f"{int(ts_ms * 1_000_000)}"  # milliseconds to nanoseconds
        )
        write_api.write(INFLUX_BUCKET, INFLUX_ORG, line)
        print(f"[Influx Writer] Wrote position for {device_id}: ({lat}, {lon})")
    except Exception as e:
        print(f"[Influx Writer] Error: {e}")

client_mqtt.on_connect = on_connect
client_mqtt.on_message = on_message

print(f"[Influx Writer] Connecting to {EMQX_HOST}:{EMQX_PORT}...")
client_mqtt.connect(EMQX_HOST, EMQX_PORT, keepalive=60)
client_mqtt.loop_forever()
This writer uses an MQTT 5.0 shared subscription (the $share/position_writers/ prefix). Multiple instances can run, and each message is delivered to exactly one worker in the group.
Step 4: Geofence Worker
Create geofences.json:
{
  "zones": [
    {
      "name": "Warehouse A",
      "type": "polygon",
      "coords": [
        [40.7126, -74.0060],
        [40.7130, -74.0060],
        [40.7130, -74.0055],
        [40.7126, -74.0055]
      ]
    },
    {
      "name": "Loading Dock",
      "type": "circle",
      "center": [40.7128, -74.0050],
      "radius_m": 50
    }
  ]
}
Create geofence_worker.py:
import json
import os
import time

import paho.mqtt.client as mqtt
from shapely.geometry import Point, Polygon

# Load geofences
GEOFENCES_FILE = os.getenv("GEOFENCES_FILE", "geofences.json")
with open(GEOFENCES_FILE) as f:
    geofences_config = json.load(f)

# Parse geofences into Shapely objects.
# Note: (lat, lon) is used as (x, y) consistently throughout.
geofences = {}
for zone in geofences_config["zones"]:
    name = zone["name"]
    if zone["type"] == "polygon":
        geofences[name] = Polygon(zone["coords"])
    elif zone["type"] == "circle":
        center = Point(*zone["center"])
        # Approximate metres-to-degrees conversion (1 degree ≈ 111 km)
        radius_deg = zone["radius_m"] / 111_000
        geofences[name] = center.buffer(radius_deg)

# Current zone membership per device
device_zones = {}

EMQX_HOST = os.getenv("EMQX_HOST", "localhost")
EMQX_PORT = int(os.getenv("EMQX_PORT", "1883"))

client_mqtt = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id="geofence-worker-1",
    protocol=mqtt.MQTTv5,
)

def on_connect(client, userdata, connect_flags, reason_code, properties):
    if reason_code == 0:
        print("[Geofence Worker] Connected to MQTT broker")
        # Separate shared-subscription group, so geofencing never
        # competes with the ingestion pipeline for messages
        client.subscribe("$share/geofence_workers/tracking/+/position", qos=1)
    else:
        print(f"[Geofence Worker] Connection failed: {reason_code}")

def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
        device_id = payload.get("device_id")
        lat = payload.get("lat")
        lon = payload.get("lon")
        position = Point(lat, lon)

        # Which zones contain this position?
        current_zones = {
            name for name, geom in geofences.items() if geom.contains(position)
        }

        previous_zones = device_zones.get(device_id, set())

        # Entries: zones in current but not in previous
        for zone in current_zones - previous_zones:
            alert = {
                "device_id": device_id,
                "zone": zone,
                "event": "entry",
                "ts": int(time.time() * 1000),
            }
            client.publish(f"tracking/{device_id}/alerts", json.dumps(alert), qos=1)
            print(f"[Geofence Worker] {device_id} ENTERED {zone}")

        # Exits: zones in previous but not in current
        for zone in previous_zones - current_zones:
            alert = {
                "device_id": device_id,
                "zone": zone,
                "event": "exit",
                "ts": int(time.time() * 1000),
            }
            client.publish(f"tracking/{device_id}/alerts", json.dumps(alert), qos=1)
            print(f"[Geofence Worker] {device_id} EXITED {zone}")

        # Update state
        device_zones[device_id] = current_zones
    except Exception as e:
        print(f"[Geofence Worker] Error: {e}")

client_mqtt.on_connect = on_connect
client_mqtt.on_message = on_message

print(f"[Geofence Worker] Connecting to {EMQX_HOST}:{EMQX_PORT}...")
client_mqtt.connect(EMQX_HOST, EMQX_PORT, keepalive=60)
client_mqtt.loop_forever()
This worker evaluates positions against geofences and publishes entry/exit events without blocking the ingestion pipeline (separate shared subscription group).
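The circle zones above rely on a rough degrees-per-metre conversion, which skews with latitude (a degree of longitude shrinks toward the poles). A dependency-free haversine check in metres is more accurate for circular zones; this sketch is a possible drop-in alternative to the buffer() approach (function names are my own, not part of any library):

```python
import math

EARTH_RADIUS_M = 6_371_000  # mean Earth radius

def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance between two lat/lon points, in metres."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))

def in_circle(lat, lon, center_lat, center_lon, radius_m):
    """Circular-geofence test in metres — no degree approximation needed."""
    return haversine_m(lat, lon, center_lat, center_lon) <= radius_m

# Loading Dock zone from geofences.json: centre (40.7128, -74.0050), 50 m radius
print(in_circle(40.7128, -74.0050, 40.7128, -74.0050, 50))  # centre itself → True
print(in_circle(40.7138, -74.0050, 40.7128, -74.0050, 50))  # ~111 m north → False
```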

Step 5: Grafana Geomap Dashboard
In Grafana (http://localhost:3000, admin/admin), add InfluxDB as a data source:
- Configuration → Data sources → Add data source → InfluxDB
- Query language: Flux
- URL: http://influxdb:8086
- Organization: myorg, Token: my-token, Default bucket: tracking
Create a new dashboard and add a Geomap panel:
- Data source: InfluxDB
- Query (the pivot turns the latitude/longitude fields into columns the geomap can read):

from(bucket: "tracking")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "device_positions")
  |> filter(fn: (r) => r._field == "latitude" or r._field == "longitude")
  |> last()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

- Map center: 40.7128, -74.0060
- Zoom: 12
The geomap will plot all devices as markers, color-coded by device_id.
Step 6: Load Test with emqtt_bench
For broker-side load we use emqtt_bench (EMQX’s own load generator — grab a release binary from its GitHub page) rather than mqttloader, because it supports per-client topic templates. Simulate 500 devices each publishing a 100-byte payload every 2 seconds:
emqtt_bench pub -h localhost -p 1883 -c 500 -I 2000 -t 'tracking/ASSET-%i/position' -s 100 -q 1
Here -c is the client count, -I the publish interval in milliseconds, %i expands to each client’s sequence number, and -s the payload size. The generated payloads are random bytes rather than JSON, so this exercises the broker and network path; to load the full write pipeline end to end, run multiple copies of tracker_publisher.py instead.
Watch Grafana and InfluxDB’s Prometheus metrics endpoint:
curl -s http://localhost:8086/metrics | grep http_api_requests_total
You should see roughly 250 writes/sec (500 devices ÷ 2 s interval) with sub-100 ms write latency.
Trade-offs and Gotchas
QoS 1 vs QoS 2 at scale: We use QoS 1 (at-least-once delivery). QoS 2 (exactly-once) requires a 4-message handshake per publish — at 250 msgs/sec, that’s 1000 MQTT control messages/sec. Stick with QoS 1 unless you absolutely need exactly-once (e.g., billing events).
MQTT 5 user properties size: User properties are handy for metadata, but each property adds ~30 bytes. If you’re tracking thousands of devices, keep user properties minimal. Move rich metadata into the JSON payload instead.
InfluxDB cardinality explosion: If you store device_id as a field instead of a tag, cardinality explodes and query performance tanks. Always use tags for high-cardinality dimensions (device_id, zone, tracker_type).
Position accuracy budgets: GPS alone drifts 5–10 meters in urban canyons. Indoor tracking needs WiFi RSSI fingerprinting or BLE trilateration. Document your accuracy assumption (±10m, ±50m) and adjust alert thresholds.
Geofence evaluation latency: Shapely’s contains() is fast (~1 microsecond per check), but if you have 1000 geofences, that’s 1ms per position. Pre-filter with bounding boxes or use R-tree spatial indexing if you scale beyond 100 zones.
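To make the pre-filtering idea concrete, here is a dependency-free sketch: each zone gets a precomputed bounding box, and the exact (more expensive) point-in-polygon test runs only when the cheap box check passes. The helper names are illustrative; at larger zone counts an R-tree (e.g. Shapely’s STRtree) would replace the linear box scan:

```python
def bbox(coords):
    """Precompute a zone's bounding box: (min_lat, min_lon, max_lat, max_lon)."""
    lats = [p[0] for p in coords]
    lons = [p[1] for p in coords]
    return (min(lats), min(lons), max(lats), max(lons))

def in_bbox(lat, lon, box):
    """Cheap rejection test: four comparisons."""
    min_lat, min_lon, max_lat, max_lon = box
    return min_lat <= lat <= max_lat and min_lon <= lon <= max_lon

def in_polygon(lat, lon, coords):
    """Exact ray-casting point-in-polygon test (runs only after the box passes)."""
    inside = False
    j = len(coords) - 1
    for i in range(len(coords)):
        yi, xi = coords[i]  # (lat, lon) vertices
        yj, xj = coords[j]
        if (xi > lon) != (xj > lon) and lat < (yj - yi) * (lon - xi) / (xj - xi) + yi:
            inside = not inside
        j = i
    return inside

# Warehouse A polygon from geofences.json
zone = [(40.7126, -74.0060), (40.7130, -74.0060), (40.7130, -74.0055), (40.7126, -74.0055)]
zone_box = bbox(zone)

def contains(lat, lon):
    # Fast box check first; the polygon test only runs for nearby points
    return in_bbox(lat, lon, zone_box) and in_polygon(lat, lon, zone)

print(contains(40.7128, -74.0058))  # inside the warehouse → True
print(contains(40.7200, -74.0058))  # far away, rejected by the box → False
```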
Practical Recommendations
– Cluster EMQX for high availability: run at least 3 EMQX nodes (e.g. in Kubernetes). Devices reconnect to another broker on failover; configure session persistence so in-flight QoS 1 messages survive the move.
– Batch writes to InfluxDB: instead of single-message writes, buffer positions and flush on a size threshold or timer. Batching 100 points per request cuts network round-trips by 100x.
– Use InfluxDB retention policies: keep raw positions for 7 days and hourly aggregates for a year. With the v2 API: influx bucket update -i <bucket-id> -r 7d.
– Monitor geofence worker lag: if the worker falls behind, devices miss entry/exit events. Export a Prometheus metric for message age (now minus the payload ts).
– Encrypt MQTT traffic: always use TLS (port 8883) in production. Self-signed certs are fine for internal clusters, but require client certificate verification.
– Test failover scenarios: kill the influx-writer container and verify another instance picks up the traffic. This is where shared subscriptions shine.
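The batching recommendation can be sketched as a small accumulator that wraps the writer’s write path — flush at a count threshold or a timeout, whichever comes first. The class and names here are illustrative, not part of influxdb-client (which also ships its own batching write options):

```python
import time

class LineBatcher:
    """Buffer line-protocol strings; flush at batch_size lines or flush_s seconds."""

    def __init__(self, flush_fn, batch_size=100, flush_s=10.0):
        self.flush_fn = flush_fn      # e.g. lambda lines: write_api.write(bucket, org, lines)
        self.batch_size = batch_size
        self.flush_s = flush_s
        self.buffer = []
        self.last_flush = time.monotonic()

    def add(self, line):
        self.buffer.append(line)
        if (len(self.buffer) >= self.batch_size
                or time.monotonic() - self.last_flush >= self.flush_s):
            self.flush()

    def flush(self):
        if self.buffer:
            self.flush_fn(self.buffer)  # one HTTP request for the whole batch
            self.buffer = []
        self.last_flush = time.monotonic()

# Usage sketch: count how many network calls 250 points turn into
calls = []
batcher = LineBatcher(flush_fn=calls.append, batch_size=100, flush_s=10.0)
for i in range(250):
    batcher.add(f"device_positions,device_id=ASSET-{i % 50} latitude=40.7,longitude=-74.0")
batcher.flush()  # drain the remainder on shutdown
print(len(calls))  # → 3 (two full batches of 100, one final batch of 50)
```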
Frequently Asked Questions
What’s new in MQTT 5.0 vs 3.1.1?
MQTT 5.0 added user properties (attach arbitrary metadata to packets), topic aliases (reduce topic string size from 100+ bytes to 2), shared subscriptions (distribute work among multiple subscribers), and reason codes (fine-grained error reporting). For asset tracking, user properties and shared subscriptions are game-changers.
Can InfluxDB 3.0 handle geospatial queries?
Partially — not with PostGIS-style functions out of the box. ST_Contains and ST_Distance are not part of InfluxDB 3.0’s built-in SQL surface; in practice you express zone queries as plain predicates over the latitude/longitude columns, for example a bounding box: SELECT * FROM device_positions WHERE latitude BETWEEN 40.70 AND 40.72 AND longitude BETWEEN -74.02 AND -73.98. For exact distance or polygon tests, post-filter the results (e.g. with Shapely) or pair InfluxDB with a dedicated geospatial engine.
How many devices can one MQTT broker support?
A single EMQX 5.x node scales to millions of concurrent connections on large hardware — EMQX has published multi-million-connection benchmarks. In practice, message throughput rather than connection count is the limit for tracking workloads: budget a few hundred actively publishing devices per Python worker, and a modest 3-node cluster comfortably handles 10,000+ active devices.
How do you do geofencing on MQTT streams?
Subscribe to the position topic, evaluate each position against a set of geofences (polygons or circles) using a library like Shapely, and publish alerts when entry/exit events occur. A dedicated geofence worker decouples this from the ingestion pipeline so alerting doesn’t slow down writes.
Is InfluxDB 3.0 backward compatible with InfluxDB 2.x?
Partially. InfluxDB 3.0 is a rewrite around the DataFusion query engine and Parquet storage. The v1 and v2 HTTP write APIs are still supported for ingest, but Flux is not — queries use SQL or InfluxQL. For new projects, use SQL. For migrations, plan to rework Flux dashboards and alert rules.
Further Reading
- MQTT 5.0 Specification
- InfluxDB 3.0 Documentation
- Grafana Geomap Visualizations
- Related: Complete Technical Guide to MQTT Protocol
- Related: EMQX MQTT Cluster on Kubernetes
- Related: Unified Namespace Architecture for Industrial IoT
- Related: Time-Series Database Internals: InfluxDB, TimescaleDB, QuestDB
- Related: IoT Protocol Latency Benchmarks
