Real-Time Asset Tracking with MQTT 5.0 and InfluxDB 3.0: A Working Tutorial

Asset tracking is one of the most common industrial IoT use cases — inventory on a factory floor, mobile equipment in a warehouse, fleet logistics across a region. But traditional polling-based systems hit latency walls and don’t scale beyond a few hundred devices. This tutorial walks you through building a production-grade real-time tracking system that handles thousands of devices, geofences them at millisecond latency, and visualizes them live in Grafana.

Architecture at a glance

Architecture diagram
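In place of the original image, the data flow can be sketched as (my rendering of the components described below):

```
trackers ──MQTT 5.0──▶ EMQX broker ──┬──▶ influx-writer ──▶ InfluxDB 3.0 ──▶ Grafana geomap
                                     └──▶ geofence-worker ──alerts──▶ back to EMQX
```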

We’ll use MQTT 5.0 (the 2019 OASIS standard that brought topic aliases, user properties, and shared subscriptions), InfluxDB 3.0 (Apache Arrow engine with native geospatial support), and a lightweight Python geofence worker. By the end, you’ll have working code, a Docker Compose configuration, and an architecture you can extend with your own tracking payloads.

What We’re Building and Why MQTT 5.0 + InfluxDB 3.0

The challenge: Real-time asset tracking at scale demands three things:
1. Sub-second position updates from hundreds or thousands of mobile devices
2. Efficient wire protocol — MQTT 3.1.1 forces full topic names and no compression
3. Spatial queries — “Show me assets in zone B” or “Alert when equipment enters restricted area”

MQTT 5.0 solves (1) and (2). Its user properties let you pack metadata (device_id, tracker_type, accuracy) into the MQTT packet header without bloating the payload. Shared subscriptions distribute ingestion load across multiple writer workers. Topic aliases reduce topic string size from 100+ bytes to 2 bytes per update.

InfluxDB 3.0 solves (3). The new DataFusion/Apache Arrow query engine supports ST_Contains, ST_Distance, and geohash aggregations natively — no separate geospatial database needed. You ingest positions into a single measurement with lat/lon tags, and query zones directly.

What makes this different from a basic IoT stack:
– We’ll use EMQX (a production MQTT broker) with clustering support — not a toy mosquitto install
– InfluxDB 3.0 schema design for cardinality management (device_id as tag, not field)
– A dedicated geofence worker that subscribes to position updates and publishes alert payloads
– Grafana geomap visualization with live device trails
– Load-test harness to validate scaling past 500 concurrent devices

System Architecture

The tracking system has six moving parts: tracker devices, the MQTT broker, the ingestion writer, the geofence worker, storage, and visualization.

System Architecture Diagram

Tracker devices publish MQTT messages every 2–5 seconds. Each message carries lat/lon, device_id, accuracy estimate, and timestamp. Devices connect over TLS (MQTT port 8883) or plaintext (1883 for local testing).
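A representative position message looks like this (field names match the publisher code later in the tutorial; the values are illustrative):

```json
{
  "device_id": "ASSET-001",
  "lat": 40.712801,
  "lon": -74.005974,
  "accuracy_m": 8,
  "speed_mps": 1.4,
  "ts": 1718000000000
}
```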

EMQX broker sits at the center. It routes position messages and geofence alerts. We’ll configure it with:
– User properties validation (check that device_id matches client certificate CN)
– Shared subscriptions (multiple workers read from same queue without duplication)
– Persistent sessions (if a device drops and reconnects, queued messages are delivered)

Influx writer service subscribes to tracking/+/position and batches writes to InfluxDB 3.0. We use shared subscriptions so that if one writer crashes, another picks up the traffic automatically.

Geofence worker subscribes to the same position topic, evaluates against a set of geofences (polygons, circles), and publishes alerts to tracking/+/alerts when an asset enters or exits a zone. This is where Shapely (a geometry library) shines.

InfluxDB 3.0 stores all positions in a single measurement (device_positions) with tags for device_id, zone, and tracker_type. Queries execute in microseconds because InfluxDB pushes predicates down to the Parquet layer.

Grafana geomap panel queries InfluxDB directly and plots devices on a map. Each device shows an icon (truck, forklift, etc.) and a trail of recent positions.

Docker Compose Service Graph

The MQTT topic hierarchy routes messages by device and message type:

MQTT Topic Hierarchy

Each device publishes to tracking/{device_id}/position with a JSON payload. The broker relays that to subscribers via the shared subscription group position_writers. The geofence worker also subscribes (in its own group geofence_workers) so it gets a copy of every position without blocking the ingestion pipeline.

InfluxDB schema for positions:

InfluxDB 3.0 Schema

A single measurement (device_positions) with:
Tags (indexed): device_id, zone, tracker_type
Fields (not indexed, stored as columns): latitude, longitude, accuracy_m, speed_mps
Timestamp: nanosecond precision, query with time >= now() - 10m

This design keeps lookups by device fast (device_id is an indexed tag, not an unindexed field) and makes spatial queries cheap. Note that total series cardinality grows with the number of unique tag combinations, so keep tag values bounded.
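With that schema, one position update serializes to a single line-protocol entry (the `zone` tag is shown with a placeholder value; the trailing timestamp is in nanoseconds):

```
device_positions,device_id=ASSET-001,zone=warehouse-a,tracker_type=mobile-gps latitude=40.7128,longitude=-74.006,accuracy_m=8,speed_mps=1.4 1718000000000000000
```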

Hands-On Tutorial: Build the System

Step 1: Docker Compose the Stack

Create a docker-compose.yml:

version: '3.8'

services:
  emqx:
    image: emqx/emqx:5.4.1
    container_name: emqx-tracker
    ports:
      - "1883:1883"    # MQTT plaintext
      - "8883:8883"    # MQTT over TLS
      - "18083:18083"  # Dashboard
    # EMQX 5.x allows anonymous clients by default; the 4.x-style env vars
    # EMQX_ALLOW_ANONYMOUS and EMQX_LOG_LEVEL are no longer recognized.
    # To customize the broker, create an emqx.conf and mount it back in:
    #   volumes:
    #     - ./emqx.conf:/etc/emqx/emqx.conf:ro
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx_ctl", "status"]
      interval: 10s
      timeout: 5s
      retries: 5

  influxdb:
    image: influxdb:3-core   # official 3.x image; check Docker Hub for current tags
    container_name: influx-tracker
    ports:
      - "8086:8086"
    environment:
      INFLUXDB_DB: tracking
      INFLUXDB_ADMIN_USER: admin
      INFLUXDB_ADMIN_PASSWORD: changeme
    volumes:
      - influx_data:/var/lib/influxdb2
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8086/health"]
      interval: 10s
      timeout: 5s
      retries: 5

  grafana:
    image: grafana/grafana:11.0.0
    container_name: grafana-tracker
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/dashboards:/etc/grafana/provisioning/dashboards:ro
    depends_on:
      - influxdb

  influx-writer:
    image: python:3.11-alpine
    container_name: influx-writer
    working_dir: /app
    volumes:
      - ./influx_writer.py:/app/influx_writer.py:ro
      - ./requirements.txt:/app/requirements.txt:ro
    command: sh -c "pip install -q -r requirements.txt && python influx_writer.py"
    environment:
      EMQX_HOST: emqx
      EMQX_PORT: 1883
      INFLUX_URL: http://influxdb:8086
      INFLUX_ORG: myorg
      INFLUX_BUCKET: tracking
      INFLUX_TOKEN: my-token
    depends_on:
      emqx:
        condition: service_healthy
      influxdb:
        condition: service_healthy

  geofence-worker:
    image: python:3.11-alpine
    container_name: geofence-worker
    working_dir: /app
    volumes:
      - ./geofence_worker.py:/app/geofence_worker.py:ro
      - ./requirements.txt:/app/requirements.txt:ro
      - ./geofences.json:/app/geofences.json:ro
    command: sh -c "pip install -q -r requirements.txt && python geofence_worker.py"
    environment:
      EMQX_HOST: emqx
      EMQX_PORT: 1883
      GEOFENCES_FILE: /app/geofences.json
    depends_on:
      emqx:
        condition: service_healthy

volumes:
  influx_data:
  grafana_data:

Create requirements.txt:

paho-mqtt==2.1.0
influxdb-client==1.18.0
shapely==2.0.2
requests==2.31.0

Start the stack:

docker-compose up -d

Wait for health checks to pass. EMQX dashboard will be at http://localhost:18083, Grafana at http://localhost:3000.

Step 2: Tracker Publisher (Python, MQTT 5.0)

Create tracker_publisher.py:

import paho.mqtt.client as mqtt
import json
import time
import random
import math

# MQTT 5.0 client (paho-mqtt 2.x)
client = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id="tracker-sim-1",
    protocol=mqtt.MQTTv5,  # paho defaults to 3.1.1; v5 is required for user properties
)

# Device configuration
DEVICE_ID = "ASSET-001"
BASE_LAT, BASE_LON = 40.7128, -74.0060  # NYC origin
SPEED_MPS = 2.0  # ~7 km/h walk speed
UPDATE_INTERVAL_S = 2

# Track position with Brownian motion (simulates realistic drift)
lat, lon = BASE_LAT, BASE_LON

def on_connect(client, userdata, connect_flags, reason_code, properties):
    if reason_code == 0:
        print(f"[{DEVICE_ID}] Connected to broker")
    else:
        print(f"[{DEVICE_ID}] Connection failed with code {reason_code}")

def on_publish(client, userdata, mid, reason_code, properties):
    if reason_code == mqtt.MQTT_ERR_SUCCESS:
        print(f"[{DEVICE_ID}] Published position update")

client.on_connect = on_connect
client.on_publish = on_publish

# MQTT 5.0: attach device metadata as user properties on each PUBLISH.
# paho-mqtt has no client-level user-property list; build a Properties
# object once and pass it on every publish (each assignment appends a pair).
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

publish_props = Properties(PacketTypes.PUBLISH)
publish_props.UserProperty = ("device_id", DEVICE_ID)
publish_props.UserProperty = ("tracker_type", "mobile-gps")
publish_props.UserProperty = ("fw_version", "1.2.3")

# Connect
client.connect("localhost", 1883, keepalive=60)
client.loop_start()

print(f"Starting publisher for {DEVICE_ID}...")
time.sleep(1)

# Publish positions every 2 seconds with realistic movement
for i in range(100):
    # Random walk (Brownian motion)
    angle = random.uniform(0, 2 * math.pi)
    step_size = SPEED_MPS * UPDATE_INTERVAL_S / 111000  # meters to degrees (~111 km/degree)
    lat += math.cos(angle) * step_size * 0.5
    lon += math.sin(angle) * step_size * 0.5

    # Clip to New York area
    lat = max(40.70, min(40.72, lat))
    lon = max(-74.02, min(-73.98, lon))

    payload = {
        "device_id": DEVICE_ID,
        "lat": round(lat, 6),
        "lon": round(lon, 6),
        "accuracy_m": random.randint(5, 15),
        "speed_mps": random.uniform(0, SPEED_MPS),
        "ts": int(time.time() * 1000),
    }

    topic = f"tracking/{DEVICE_ID}/position"
    client.publish(
        topic,
        json.dumps(payload),
        qos=1,
        retain=False,
        properties=publish_props,
    )

    time.sleep(UPDATE_INTERVAL_S)

client.loop_stop()
client.disconnect()
print(f"[{DEVICE_ID}] Done")

Run this in a separate terminal:

python tracker_publisher.py

You should see “Published position update” messages. Check the EMQX dashboard — the subscription count on tracking/ASSET-001/position should show active subscribers.

Step 3: InfluxDB 3.0 Schema and Writer Service

Create influx_writer.py:

import os
import json
import time
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

# InfluxDB setup
INFLUX_URL = os.getenv("INFLUX_URL", "http://localhost:8086")
INFLUX_ORG = os.getenv("INFLUX_ORG", "myorg")
INFLUX_BUCKET = os.getenv("INFLUX_BUCKET", "tracking")
INFLUX_TOKEN = os.getenv("INFLUX_TOKEN", "my-token")

client_influx = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
write_api = client_influx.write_api(write_options=SYNCHRONOUS)

# MQTT setup
EMQX_HOST = os.getenv("EMQX_HOST", "localhost")
EMQX_PORT = int(os.getenv("EMQX_PORT", "1883"))

client_mqtt = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id="influx-writer-1",
    protocol=mqtt.MQTTv5,
)

def on_connect(client, userdata, connect_flags, reason_code, properties):
    if reason_code == 0:
        print("[Influx Writer] Connected to MQTT broker")
        # Shared subscription via the $share/<group>/<topic> syntax: the broker
        # delivers each message to exactly one member of the group, so extra
        # writer instances scale out without duplicating writes
        client.subscribe("$share/position_writers/tracking/+/position", qos=1)
    else:
        print(f"[Influx Writer] Connection failed: {reason_code}")

def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode('utf-8'))
        device_id = payload.get("device_id", "unknown")
        lat = payload.get("lat")
        lon = payload.get("lon")
        accuracy = payload.get("accuracy_m", 0)
        speed = payload.get("speed_mps", 0)
        ts_ms = payload.get("ts", int(time.time() * 1000))

        # Build InfluxDB line protocol
        # Measurement: device_positions
        # Tags: device_id, tracker_type
        # Fields: latitude, longitude, accuracy_m, speed_mps
        line = (
            f'device_positions,'
            f'device_id={device_id},'
            f'tracker_type=mobile-gps '
            f'latitude={lat},'
            f'longitude={lon},'
            f'accuracy_m={accuracy},'
            f'speed_mps={speed} '
            f'{int(ts_ms * 1_000_000)}'  # Convert ms to nanoseconds
        )

        write_api.write(INFLUX_BUCKET, INFLUX_ORG, line)
        print(f"[Influx Writer] Wrote position for {device_id}: ({lat}, {lon})")
    except Exception as e:
        print(f"[Influx Writer] Error: {e}")

client_mqtt.on_connect = on_connect
client_mqtt.on_message = on_message

print(f"[Influx Writer] Connecting to {EMQX_HOST}:{EMQX_PORT}...")
client_mqtt.connect(EMQX_HOST, EMQX_PORT, keepalive=60)
client_mqtt.loop_forever()

This writer uses MQTT 5.0 shared subscriptions. Multiple instances can run, and each message is delivered to exactly one worker.

Step 4: Geofence Worker

Create geofences.json:

{
  "zones": [
    {
      "name": "Warehouse A",
      "type": "polygon",
      "coords": [
        [40.7126, -74.0060],
        [40.7130, -74.0060],
        [40.7130, -74.0055],
        [40.7126, -74.0055]
      ]
    },
    {
      "name": "Loading Dock",
      "type": "circle",
      "center": [40.7128, -74.0050],
      "radius_m": 50
    }
  ]
}

Create geofence_worker.py:

import os
import json
import time
import paho.mqtt.client as mqtt
from shapely.geometry import Point, Polygon

# Load geofences
GEOFENCES_FILE = os.getenv("GEOFENCES_FILE", "geofences.json")
with open(GEOFENCES_FILE) as f:
    geofences_config = json.load(f)

# Parse geofences into Shapely objects
geofences = {}
for zone in geofences_config["zones"]:
    name = zone["name"]
    if zone["type"] == "polygon":
        geofences[name] = Polygon(zone["coords"])
    elif zone["type"] == "circle":
        center = Point(zone["center"])
        # Convert radius in meters to degrees (approx 1 degree = 111 km)
        radius_deg = zone["radius_m"] / 111000
        geofences[name] = center.buffer(radius_deg)

# Track device state (in which zones currently)
device_zones = {}

EMQX_HOST = os.getenv("EMQX_HOST", "localhost")
EMQX_PORT = int(os.getenv("EMQX_PORT", "1883"))

client_mqtt = mqtt.Client(
    mqtt.CallbackAPIVersion.VERSION2,
    client_id="geofence-worker-1",
    protocol=mqtt.MQTTv5,
)

def on_connect(client, userdata, connect_flags, reason_code, properties):
    if reason_code == 0:
        print("[Geofence Worker] Connected to MQTT broker")
        # Separate shared-subscription group, so geofencing gets its own copy
        # of every position and never competes with the ingestion writers
        client.subscribe("$share/geofence_workers/tracking/+/position", qos=1)
    else:
        print(f"[Geofence Worker] Connection failed: {reason_code}")

def on_message(client, userdata, msg):
    try:
        payload = json.loads(msg.payload.decode('utf-8'))
        device_id = payload.get("device_id")
        lat = payload.get("lat")
        lon = payload.get("lon")

        position = Point(lat, lon)

        # Check which zones contain this position
        current_zones = set()
        for zone_name, zone_geom in geofences.items():
            if zone_geom.contains(position):
                current_zones.add(zone_name)

        # Track state changes (entry/exit)
        previous_zones = device_zones.get(device_id, set())

        # Entries: zones in current but not in previous
        entries = current_zones - previous_zones
        for zone in entries:
            alert = {
                "device_id": device_id,
                "zone": zone,
                "event": "entry",
                "ts": int(time.time() * 1000)
            }
            client.publish(
                f"tracking/{device_id}/alerts",
                json.dumps(alert),
                qos=1
            )
            print(f"[Geofence Worker] {device_id} ENTERED {zone}")

        # Exits: zones in previous but not in current
        exits = previous_zones - current_zones
        for zone in exits:
            alert = {
                "device_id": device_id,
                "zone": zone,
                "event": "exit",
                "ts": int(time.time() * 1000)
            }
            client.publish(
                f"tracking/{device_id}/alerts",
                json.dumps(alert),
                qos=1
            )
            print(f"[Geofence Worker] {device_id} EXITED {zone}")

        # Update state
        device_zones[device_id] = current_zones

    except Exception as e:
        print(f"[Geofence Worker] Error: {e}")

client_mqtt.on_connect = on_connect
client_mqtt.on_message = on_message

print(f"[Geofence Worker] Connecting to {EMQX_HOST}:{EMQX_PORT}...")
client_mqtt.connect(EMQX_HOST, EMQX_PORT, keepalive=60)
client_mqtt.loop_forever()

This worker evaluates positions against geofences and publishes entry/exit events without blocking the ingestion pipeline (separate shared subscription group).
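The worker’s entry/exit logic boils down to two set differences, which you can verify in isolation (a minimal extraction of the state logic above; the function name is mine):

```python
def zone_transitions(previous: set, current: set) -> tuple:
    """Return (entries, exits) between two zone-membership snapshots."""
    return current - previous, previous - current

# Device moved from Warehouse A into the Loading Dock:
entries, exits = zone_transitions({"Warehouse A"}, {"Loading Dock"})
print(entries, exits)  # → {'Loading Dock'} {'Warehouse A'}
```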

Geofence Worker Sequence

Step 5: Grafana Geomap Dashboard

In Grafana (http://localhost:3000, admin/admin), add InfluxDB as a data source:

  1. Configuration → Data Sources → Add InfluxDB
  2. Query language: Flux (the dashboard query below is Flux)
  3. URL: http://influxdb:8086
  4. Organization: myorg, Token: my-token, Default bucket: tracking

Create a new dashboard and add a Geomap panel:

  • Data source: InfluxDB
  • Query (pivot the lat/lon fields into columns so the geomap can read them):
    from(bucket: "tracking")
    |> range(start: -10m)
    |> filter(fn: (r) => r._measurement == "device_positions")
    |> filter(fn: (r) => r._field == "latitude" or r._field == "longitude")
    |> last()
    |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  • Map center: 40.7128, -74.0060
  • Zoom: 12

The geomap will plot all devices as points with markers, color-coded by device_id.

Step 6: Load Test the Broker

mqttloader is a Java-based MQTT benchmark tool (it is not pip-installable); download its release archive and consult its README for the flags your version supports. It is good for measuring raw broker throughput, but it publishes fixed payloads to fixed topics, so for tracking-shaped traffic a small harness that simulates one publisher per device is often more representative.
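A minimal Python harness that spawns one simulated device per thread can generate that tracking-shaped load (a sketch; broker host, device count, and interval are assumptions to tune):

```python
import json
import random
import threading
import time

BROKER, PORT = "localhost", 1883
NUM_DEVICES = 500
INTERVAL_S = 2.0
UPDATES_PER_DEVICE = 30

def make_payload(device_id: str) -> str:
    """Build one position update as a JSON string (NYC-ish coordinates)."""
    return json.dumps({
        "device_id": device_id,
        "lat": 40.7128 + random.uniform(-0.01, 0.01),
        "lon": -74.0060 + random.uniform(-0.01, 0.01),
        "accuracy_m": random.randint(5, 15),
        "speed_mps": round(random.uniform(0, 2.0), 2),
        "ts": int(time.time() * 1000),
    })

def run_device(n: int) -> None:
    # paho import deferred so make_payload stays usable without a broker
    import paho.mqtt.client as mqtt
    device_id = f"ASSET-{n:03d}"
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2,
                         client_id=f"load-{device_id}", protocol=mqtt.MQTTv5)
    client.connect(BROKER, PORT, keepalive=60)
    client.loop_start()
    for _ in range(UPDATES_PER_DEVICE):
        client.publish(f"tracking/{device_id}/position",
                       make_payload(device_id), qos=1)
        time.sleep(INTERVAL_S)
    client.loop_stop()
    client.disconnect()

if __name__ == "__main__":
    threads = [threading.Thread(target=run_device, args=(i,))
               for i in range(NUM_DEVICES)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```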

This simulates 500 devices publishing every 2 seconds. Watch Grafana and the InfluxDB metrics endpoint while the load runs (the endpoint serves Prometheus text format, so grep rather than jq; exact metric names vary by version):

curl -s http://localhost:8086/metrics | grep -i http

500 devices on a 2-second interval works out to ~250 writes/sec; on modest hardware that should complete with sub-100ms write latency.

Trade-offs and Gotchas

QoS 1 vs QoS 2 at scale: We use QoS 1 (at-least-once delivery). QoS 2 (exactly-once) requires a 4-message handshake per publish — at 250 msgs/sec, that’s 1000 MQTT control messages/sec. Stick with QoS 1 unless you absolutely need exactly-once (e.g., billing events).

MQTT 5 user properties size: User properties are handy for metadata, but each property adds ~30 bytes. If you’re tracking thousands of devices, keep user properties minimal. Move rich metadata into the JSON payload instead.

InfluxDB cardinality vs. query speed: Tags are indexed, fields are not — store device_id as a tag so per-device filters stay fast. But every unique tag combination (device_id × zone × tracker_type) is a separate series, so watch total series cardinality as your fleet grows and keep tag values bounded.

Position accuracy budgets: GPS alone drifts 5–10 meters in urban canyons. Indoor tracking needs WiFi RSSI fingerprinting or BLE trilateration. Document your accuracy assumption (±10m, ±50m) and adjust alert thresholds.

Geofence evaluation latency: Shapely’s contains() is fast (~1 microsecond per check), but if you have 1000 geofences, that’s 1ms per position. Pre-filter with bounding boxes or use R-tree spatial indexing if you scale beyond 100 zones.
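The bounding-box pre-filter can be sketched in a few lines (stdlib-only for clarity; with Shapely you would take each zone’s `.bounds` and index them in a `shapely.STRtree` instead):

```python
def bbox(coords):
    """Axis-aligned bounding box (min_lat, min_lon, max_lat, max_lon)."""
    lats = [c[0] for c in coords]
    lons = [c[1] for c in coords]
    return min(lats), min(lons), max(lats), max(lons)

def in_bbox(lat, lon, box):
    min_lat, min_lon, max_lat, max_lon = box
    return min_lat <= lat <= max_lat and min_lon <= lon <= max_lon

# Precompute boxes once at startup; per position, only zones whose box
# matches go on to the exact (and more expensive) point-in-polygon test.
warehouse_a = [(40.7126, -74.0060), (40.7130, -74.0060),
               (40.7130, -74.0055), (40.7126, -74.0055)]
boxes = {"Warehouse A": bbox(warehouse_a)}

candidates = [name for name, box in boxes.items()
              if in_bbox(40.7128, -74.0057, box)]
```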

Practical Recommendations

  1. Cluster EMQX for high availability: Run at least 3 EMQX nodes (e.g., in Kubernetes). Devices reconnect to another broker on failover. Enable persistent sessions and size session limits to your fleet; consult the EMQX docs for the exact config keys in your version.

  2. Batch writes to InfluxDB: Instead of single-message writes, buffer 100 positions and flush every 10 seconds. This cuts network round-trips by 100x.

  3. Set retention on the bucket: Keep raw positions for 7 days and downsampled hourly aggregates for a year. With the 2.x-style CLI this is e.g. influx bucket update -i <bucket-id> -r 168h (the 1.x ALTER RETENTION POLICY syntax no longer applies).

  4. Monitor geofence worker lag: If the geofence worker falls behind, devices will miss entry/exit events. Add Prometheus metrics to track message age in the queue.

  5. Encrypt MQTT traffic: Always use TLS (port 8883) in production. Self-signed certs are fine for internal clusters, but require client certificate verification.

  6. Test failover scenarios: Kill the influx-writer pod and verify another instance picks up the traffic. This is where shared subscriptions shine.
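Recommendation 2 (batched writes) can be sketched with a small size-or-age buffer (a sketch; `flush_fn` stands in for your actual write call, e.g. a single multi-line-protocol write to InfluxDB):

```python
import time

class LineBuffer:
    """Accumulate line-protocol strings; flush on batch size or age."""

    def __init__(self, flush_fn, max_lines=100, max_age_s=10.0):
        self.flush_fn = flush_fn
        self.max_lines = max_lines
        self.max_age_s = max_age_s
        self.lines = []
        self.first_ts = None

    def add(self, line: str) -> None:
        if self.first_ts is None:
            self.first_ts = time.monotonic()
        self.lines.append(line)
        if (len(self.lines) >= self.max_lines
                or time.monotonic() - self.first_ts >= self.max_age_s):
            self.flush()

    def flush(self) -> None:
        if self.lines:
            self.flush_fn("\n".join(self.lines))  # one request per batch
            self.lines = []
            self.first_ts = None

# Demo with a list standing in for the write API:
batches = []
buf = LineBuffer(batches.append, max_lines=3)
for i in range(7):
    buf.add(f"device_positions,device_id=ASSET-{i:03d} latitude=40.71 {i}")
buf.flush()  # drain the remainder on shutdown
```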

Frequently Asked Questions

What’s new in MQTT 5.0 vs 3.1.1?

MQTT 5.0 added user properties (attach arbitrary metadata to packets), topic aliases (reduce topic string size from 100+ bytes to 2), shared subscriptions (distribute work among multiple subscribers), and reason codes (fine-grained error reporting). For asset tracking, user properties and shared subscriptions are game-changers.

Can InfluxDB 3.0 handle geospatial queries?

Yes. InfluxDB 3.0’s DataFusion engine supports ST_Contains, ST_Distance, and ST_GeomFromText for PostGIS-compatible queries. You can query “all devices within 500m of point (lat, lon)” directly in SQL: SELECT * FROM device_positions WHERE ST_Distance(ST_Point(longitude, latitude), ST_Point(-74.0060, 40.7128)) < 500.

How many devices can one MQTT broker support?

A single EMQX 5.x node handles 5–10 million concurrent connections on modern hardware (64 GB RAM, NVMe SSD). For persistent messages and geofence processing, budget 100–500 devices per worker. A 3-node cluster can handle 10,000+ active devices easily.

How do you do geofencing on MQTT streams?

Subscribe to the position topic, evaluate each position against a set of geofences (polygons or circles) using a library like Shapely, and publish alerts when entry/exit events occur. A dedicated geofence worker decouples this from the ingestion pipeline so alerting doesn’t slow down writes.

Is InfluxDB 3.0 backward compatible with InfluxDB 2.x?

Partially. InfluxDB 3.0 is a rewrite with a new query engine (DataFusion, not Flux). It speaks the HTTP API and supports Flux queries, but performance and behavior differ. For new projects, use native SQL. For migrations, plan for reworking dashboards and alert rules.
