Build a Digital Twin for a CNC Machine: Step-by-Step 2026 Tutorial

Build a Digital Twin for a CNC Machine: Step-by-Step 2026 Tutorial

Build a Digital Twin for a CNC Machine: Step-by-Step 2026 Tutorial

If you have landed here, you want a CNC machine digital twin tutorial 2026 that does not stop at slideware. This guide walks through building a working twin of a 3-axis vertical machining centre (VMC) end to end: MTConnect ingestion from a Fanuc/Heidenhain/Siemens control, normalisation into InfluxDB, a physics-informed spindle thermal and cutting-force estimator backed by scikit-learn, a FastAPI twin runtime, a provisioned Grafana dashboard, a hybrid rule + ML anomaly detector, and a validation loop against thermocouple and dynamometer ground truth. Everything ships as a reproducible Docker stack. By the end, you will have a twin a process engineer can actually use on the shop floor — not a CAD render with telemetry pinned to it.

This is written for the engineer who has to deliver something this quarter. Senior manufacturing engineer + Python developer voice throughout. Indian/UK spelling. Real code, real libraries, honest trade-offs.

CNC machine digital twin tutorial 2026 — system architecture from Fanuc/Heidenhain/Siemens controls through MTConnect ingestion to twin runtime, Grafana and alerting

What we are building (and what we are not)

In scope:

  • A live data pipeline from the control to a time-series store at 1–10 Hz sampling.
  • A reduced-order physics model for spindle bearing temperature (lumped RC thermal network) and a Kienzle-style cutting-force estimator.
  • A machine-learning residual (RandomForest regressor) that corrects the physics estimate against measured data.
  • A FastAPI twin runtime that exposes /state, /predict, /twin/replay, and /health.
  • A Grafana dashboard with provisioning YAML so you do not click around in the UI for an hour.
  • A hybrid anomaly detector combining deterministic gates and IsolationForest.
  • A validation harness that compares predicted spindle temperature, cutting force, and vibration RMS against benchtop ground truth.

Out of scope:

  • High-fidelity FEM / CFD coupling. You will not solve Navier-Stokes on every cycle on a Raspberry Pi.
  • Tool-path simulation at G-code block level — that needs a kinematics kernel like VERICUT or NC Verify. We touch tool path only as context.
  • Full PLM integration with Teamcenter or 3DEXPERIENCE. We stop at a clean REST contract you can hand to your PLM team.
  • Closed-loop control. We are observing and predicting; we are not adjusting feed override automatically. That is a separate safety case.

If you are still mapping this onto the bigger picture, our IoT, digital twin and PLM overview explains where this twin sits in the wider stack.

Prerequisites (hardware, software, skills)

Hardware (minimum viable):

  • A CNC with one of: Fanuc 31i/32i, Heidenhain TNC640, or Siemens 840D sl. Most controls from 2018 onward expose either MTConnect (via the Fanuc FOCAS adapter or vendor-native agent) or OPC UA.
  • An edge gateway — anything from a Raspberry Pi 5 (for a single machine, fine) to a fanless x86 industrial PC (Advantech UNO-2484G or similar). 8 GB RAM, 128 GB SSD is plenty.
  • Optional but recommended: one tri-axial accelerometer (PCB 356A45 or IMI 603C01) on the spindle housing, one K-type thermocouple bonded near the front bearing, and a Hall-effect current clamp on the spindle drive line. These give you ground truth for validation.

Software stack:

  • Docker 24.x and Docker Compose v2 on the edge gateway.
  • Python 3.11. We use httpx, lxml, pandas, scikit-learn, fastapi, uvicorn, pydantic v2, and influxdb-client.
  • InfluxDB 2.7 (or TimescaleDB 2.x if you prefer SQL; we will note the swap points).
  • Grafana 10.x.
  • An MTConnect agent — the reference implementation is at mtconnect/cppagent. For Fanuc, the FOCAS-MTConnect adapter is the standard route.

Skills:

  • Comfortable reading XML and JSON.
  • Decent Python — async, typing, the basics of pandas and sklearn.
  • A working understanding of a machining cycle: spindle, feed, depth of cut, what overload looks like.
  • Enough Linux to keep Docker upright.

Architecture overview

The architecture is intentionally boring. Boring scales; clever fails on Friday at 18:00.

Data flow from MTConnect XML through normalisation, persistence and twin solver to dashboard and anomaly detector

Four logical layers:

  1. OT layer — the control and any auxiliary sensors. We do not touch the PLC. Read-only.
  2. Edge layer — the MTConnect agent (or OPC UA server) plus a Python ingest process on the gateway. This is where retries, buffering and time-sync live.
  3. Twin core (IT) — InfluxDB, the FastAPI twin runtime, and the model artefacts. Runs on a small server in the IT DMZ or in your cloud account.
  4. Consumption — Grafana, Alertmanager, and an outbound webhook to your MES/PLM if you want it.

The separation matters: when IT pushes a Grafana upgrade and breaks something at 02:00, the edge keeps ingesting and buffering. When the network drops, the edge holds a 24-hour ring buffer. This is the single most important architectural decision in the whole tutorial.

For a worked example of how a Tier-1 OEM operationalises this kind of split at factory scale, see our Stellantis virtual factory analysis.

Step 1 — Ingest data with MTConnect

MTConnect is an open XML standard maintained by the MTConnect Institute. The agent exposes three REST endpoints we care about:

  • GET /probe — what data items exist on this device.
  • GET /current — latest value of every data item.
  • GET /sample?from=<seq>&count=<n> — a chronological slice from a starting sequence number. This is the one you will live in.

Spin up the reference agent against a Fanuc adapter with the published cppagent Docker image. Then write the poller. The goal is: never miss a sequence number, never block on the network, and always know what time it is.

# ingest/mtconnect_poller.py
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import AsyncIterator

import httpx
from lxml import etree

log = logging.getLogger("mtconnect")

NS = {"m": "urn:mtconnect.org:MTConnectStreams:2.0"}


@dataclass(frozen=True)
class Sample:
    timestamp: datetime
    device: str
    data_item_id: str
    name: str
    value: str
    sequence: int


class MTConnectPoller:
    def __init__(self, base_url: str, device: str, poll_hz: float = 10.0):
        self.base_url = base_url.rstrip("/")
        self.device = device
        self.interval = 1.0 / poll_hz
        self._next_seq: int | None = None
        self._client = httpx.AsyncClient(timeout=5.0)

    async def _fetch(self, path: str, **params) -> bytes:
        r = await self._client.get(f"{self.base_url}{path}", params=params)
        r.raise_for_status()
        return r.content

    async def _bootstrap(self) -> None:
        xml = await self._fetch(f"/{self.device}/current")
        root = etree.fromstring(xml)
        header = root.find("m:Header", NS)
        # nextSequence on /current is the seq AFTER the latest sample
        self._next_seq = int(header.get("nextSequence"))
        log.info("bootstrapped at seq=%s", self._next_seq)

    @staticmethod
    def _parse_samples(xml: bytes) -> list[Sample]:
        root = etree.fromstring(xml)
        out: list[Sample] = []
        # MTConnect groups DataItems under Streams/DeviceStream/ComponentStream
        for el in root.iter():
            tag = etree.QName(el).localname
            if tag in {"Samples", "Events", "Condition", "Header"}:
                continue
            seq = el.get("sequence")
            ts = el.get("timestamp")
            di = el.get("dataItemId")
            name = el.get("name") or tag
            if not (seq and ts and di and el.text is not None):
                continue
            out.append(Sample(
                timestamp=datetime.fromisoformat(ts.replace("Z", "+00:00")),
                device=el.get("device", ""),
                data_item_id=di,
                name=name,
                value=el.text.strip(),
                sequence=int(seq),
            ))
        return out

    async def stream(self) -> AsyncIterator[Sample]:
        await self._bootstrap()
        while True:
            try:
                xml = await self._fetch(
                    f"/{self.device}/sample",
                    **{"from": self._next_seq, "count": 1000},
                )
                samples = self._parse_samples(xml)
                if samples:
                    self._next_seq = samples[-1].sequence + 1
                    for s in samples:
                        yield s
            except httpx.HTTPError as e:
                log.warning("mtconnect fetch failed: %s", e)
                await asyncio.sleep(2.0)
                continue
            await asyncio.sleep(self.interval)


async def main() -> None:
    logging.basicConfig(level=logging.INFO)
    poller = MTConnectPoller("http://edge-gateway:5000", device="VMC01")
    async for sample in poller.stream():
            print(sample.timestamp.isoformat(), sample.name, sample.value)


if __name__ == "__main__":
    asyncio.run(main())

Two things worth flagging:

  • Sequence tracking, not wall clock. MTConnect agents guarantee monotonic sequences. If you poll on time, you will eventually miss a window. If you poll on sequence, the agent tells you exactly what you missed.
  • Time zone hygiene. The agent emits ISO-8601 UTC. Parse to a timezone-aware datetime. Resist the urge to use datetime.now() anywhere in the ingest path; on a shop with a misconfigured NTP server you will spend a Saturday hunting a phantom drift.

For the data-item inventory, hit /probe once at startup and cache the mapping dataItemId → (subType, units, nativeUnits). You will need it in the next step.

Step 2 — Normalize and persist (InfluxDB/Timescale)

Raw MTConnect names vary by control. A Fanuc adapter emits Sspeed, Spindle_load, Xact, Yact, Zact, block, path_feedrate. A Heidenhain agent uses different IDs. Normalise once at the edge into a canonical schema so the twin and dashboards do not need to know about brand-specific spellings.

# ingest/normaliser.py
from dataclasses import dataclass
from typing import Mapping

CANONICAL = {
    # Fanuc-ish
    "Sspeed":         ("spindle_rpm",        "rev/min", float),
    "Spindle_load":   ("spindle_load_pct",   "percent", float),
    "Xact":           ("axis_x_pos_mm",      "mm",      float),
    "Yact":           ("axis_y_pos_mm",      "mm",      float),
    "Zact":           ("axis_z_pos_mm",      "mm",      float),
    "path_feedrate":  ("feed_mm_min",        "mm/min",  float),
    "block":          ("nc_block",           "text",    str),
    "execution":      ("exec_state",         "text",    str),
    # Heidenhain
    "S1Actual":       ("spindle_rpm",        "rev/min", float),
    "S1Load":         ("spindle_load_pct",   "percent", float),
    # Siemens 840D sl
    "actSpeed1":      ("spindle_rpm",        "rev/min", float),
    "actLoad1":       ("spindle_load_pct",   "percent", float),
}


@dataclass(frozen=True)
class Normalised:
    timestamp_ns: int
    machine: str
    tag: str
    value: float | str
    units: str


def normalise(sample, machine: str) -> Normalised | None:
    rule = CANONICAL.get(sample.name)
    if rule is None:
        return None
    tag, units, cast = rule
    try:
        v = cast(sample.value)
    except (ValueError, TypeError):
        return None
    if isinstance(v, float):
        # sanity bounds — drop obvious garbage early
        if tag == "spindle_load_pct" and not (0.0 <= v <= 200.0):
            return None
        if tag == "spindle_rpm" and not (0.0 <= v <= 60_000.0):
            return None
    return Normalised(
        timestamp_ns=int(sample.timestamp.timestamp() * 1e9),
        machine=machine,
        tag=tag,
        value=v,
        units=units,
    )

Now persist. With InfluxDB 2.x:

# ingest/sink_influx.py
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import ASYNCHRONOUS


class InfluxSink:
    def __init__(self, url: str, token: str, org: str, bucket: str):
        self._client = InfluxDBClient(url=url, token=token, org=org)
        self._write = self._client.write_api(write_options=ASYNCHRONOUS)
        self._bucket = bucket
        self._org = org

    def write(self, n) -> None:
        p = (
            Point("cnc")
            .tag("machine", n.machine)
            .tag("tag", n.tag)
            .tag("units", n.units)
            .time(n.timestamp_ns, WritePrecision.NS)
        )
        if isinstance(n.value, (int, float)):
            p = p.field("value", float(n.value))
        else:
            p = p.field("text", str(n.value))
        self._write.write(bucket=self._bucket, org=self._org, record=p)

    def close(self) -> None:
        self._write.close()
        self._client.close()

If you prefer TimescaleDB, the schema is equally simple:

CREATE TABLE cnc_samples (
    ts          TIMESTAMPTZ      NOT NULL,
    machine     TEXT             NOT NULL,
    tag         TEXT             NOT NULL,
    value_num   DOUBLE PRECISION,
    value_txt   TEXT,
    units       TEXT
);
SELECT create_hypertable('cnc_samples', 'ts', chunk_time_interval => INTERVAL '1 day');
CREATE INDEX ON cnc_samples (machine, tag, ts DESC);

Use whichever your team will actually maintain. InfluxDB Flux is friendlier for downsampling; Timescale is friendlier if you already have a Postgres operator on staff.

Wire the three together:

# ingest/run.py
import asyncio
import os

from .mtconnect_poller import MTConnectPoller
from .normaliser import normalise
from .sink_influx import InfluxSink


async def main():
    poller = MTConnectPoller(
        base_url=os.environ["MTC_URL"],
        device=os.environ["MTC_DEVICE"],
        poll_hz=float(os.environ.get("MTC_HZ", "10")),
    )
    sink = InfluxSink(
        url=os.environ["INFLUX_URL"],
        token=os.environ["INFLUX_TOKEN"],
        org=os.environ["INFLUX_ORG"],
        bucket=os.environ["INFLUX_BUCKET"],
    )
    machine = os.environ["MTC_DEVICE"]
    try:
        async for s in poller.stream():
            n = normalise(s, machine)
            if n is not None:
                sink.write(n)
    finally:
        sink.close()


if __name__ == "__main__":
    asyncio.run(main())

That is your ingest service. Put it in a Docker container behind a restart policy. Move on.

Step 3 — Physics-informed model (spindle thermal + cutting force estimate)

This is where most “digital twin” projects collapse into dashboards. Resist. A twin needs at least one model that says what should be happening so you can compare against what is happening.

We will keep it pragmatic. Two physics models, both reducible to a handful of parameters.

3.1 Spindle bearing thermal — lumped RC network

Treat the spindle housing as a single thermal capacitance with conduction to ambient and convection from cooling. Heat is injected proportional to spindle load and the square of spindle speed (rough proxy for bearing friction losses).

C · dT/dt = α · load · rpm² + β · rpm − (T − T_ambient) / R

In discrete time, this is a one-line update. Fit α, β, R, C from a warm-up cycle.

# twin/physics.py
import numpy as np
from dataclasses import dataclass


@dataclass
class ThermalParams:
    alpha: float   # W per (load_pct * rpm^2)
    beta: float    # W per rpm
    R: float       # K / W
    C: float       # J / K


def step_spindle_temp(
    T_prev: float, T_amb: float, load_pct: float, rpm: float,
    dt: float, p: ThermalParams,
) -> float:
    P_gen = p.alpha * load_pct * rpm * rpm + p.beta * rpm
    P_loss = (T_prev - T_amb) / p.R
    dT = (P_gen - P_loss) * dt / p.C
    return T_prev + dT


def simulate(T0: float, T_amb: np.ndarray, load: np.ndarray,
             rpm: np.ndarray, dt: float, p: ThermalParams) -> np.ndarray:
    T = np.empty_like(T_amb)
    T[0] = T0
    for k in range(1, len(T_amb)):
        T[k] = step_spindle_temp(T[k-1], T_amb[k-1], load[k-1], rpm[k-1], dt, p)
    return T

Fit it once against a 30–60 minute warm-up log with a thermocouple bonded near the front bearing. scipy.optimize.minimize on RMSE is plenty; you do not need anything more sophisticated. Expect a steady-state temperature error of 1–3 K after fitting, which is enough to flag a cooling system fault long before it scraps a part.

3.2 Kienzle cutting force estimate

For end-milling, the tangential cutting force per tooth follows the Kienzle equation:

F_t = k_c1.1 · a_p · h^(1 - m_c)

where a_p is axial depth of cut, h is the average chip thickness, k_c1.1 is the specific cutting force at 1 mm² chip section, and m_c is a material exponent. For common workpiece materials, k_c1.1 and m_c are tabulated (Kistler, Sandvik handbooks). Average chip thickness for slotting is h ≈ f_z · sin(φ), simplified to h ≈ f_z · (a_e / D) for partial radial engagement.

You cannot read a_p, a_e or f_z directly from MTConnect. You can read the spindle load and back-calculate using:

P_cut = F_t · v_cF_t = P_cut / v_c, with v_c = π · D · rpm / 60.

# twin/cutting_force.py
import math


def estimate_cutting_force(spindle_load_pct: float, spindle_rpm: float,
                           rated_power_kw: float, tool_diameter_mm: float,
                           efficiency: float = 0.85) -> float:
    """Return estimated tangential cutting force [N]."""
    if spindle_rpm < 1.0 or tool_diameter_mm < 0.1:
        return 0.0
    P_mech = (spindle_load_pct / 100.0) * rated_power_kw * 1000.0 * efficiency  # W
    v_c = math.pi * tool_diameter_mm * 1e-3 * spindle_rpm / 60.0  # m/s
    if v_c < 1e-3:
        return 0.0
    return P_mech / v_c

This is approximate. It conflates friction in the drivetrain with cutting load. But run it against a Kistler 9257B dynamometer for a few representative cuts and you will find errors of 10–20 % — perfectly adequate for tool-wear trending and feed-override sanity checks.

3.3 ML residual — let scikit-learn clean up

Physics is biased and ML is variance-prone. The standard trick is to learn the residual: model what the physics gets wrong as a function of context.

# twin/ml_residual.py
import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


FEATURES = [
    "spindle_rpm", "spindle_load_pct", "feed_mm_min",
    "axis_x_pos_mm", "axis_y_pos_mm", "axis_z_pos_mm",
    "T_amb", "T_physics",
]
TARGET = "T_measured"


def build_pipeline() -> Pipeline:
    return Pipeline([
        ("scale", StandardScaler()),
        ("rf", RandomForestRegressor(
            n_estimators=300, max_depth=14, min_samples_leaf=5,
            n_jobs=-1, random_state=42,
        )),
    ])


def fit(df: pd.DataFrame, path: str) -> dict:
    df = df.dropna(subset=FEATURES + [TARGET])
    df["residual"] = df[TARGET] - df["T_physics"]
    X = df[FEATURES].to_numpy()
    y = df["residual"].to_numpy()
    pipe = build_pipeline()
    pipe.fit(X, y)
    joblib.dump({"pipeline": pipe, "features": FEATURES}, path)
    pred = pipe.predict(X) + df["T_physics"].to_numpy()
    mae = float(np.mean(np.abs(pred - df[TARGET].to_numpy())))
    rmse = float(np.sqrt(np.mean((pred - df[TARGET].to_numpy()) ** 2)))
    return {"mae_K": mae, "rmse_K": rmse, "n": int(len(df))}


def predict(model_path: str, df: pd.DataFrame) -> np.ndarray:
    obj = joblib.load(model_path)
    X = df[obj["features"]].to_numpy()
    return obj["pipeline"].predict(X) + df["T_physics"].to_numpy()

A RandomForest with 300 trees fits in seconds on a year of 1 Hz data and is robust to the messy categorical context (different tools, different programs). If you reach for transformers or LSTMs first, ask yourself honestly whether you have the labelled data to justify them. The answer in a job shop is almost always no.

Step 4 — Twin runtime as a service

The twin runtime is a small FastAPI service that:

  • Holds the latest state in a Redis cache (or in-memory if you are single-instance — fine for a single machine).
  • Exposes a /state endpoint for the dashboard and downstream MES.
  • Exposes a /predict endpoint that runs the physics + ML estimate on a supplied window.
  • Exposes a /twin/replay endpoint that re-runs the twin over a historical window — invaluable for incident reviews.

Twin service internals: state cache, physics module, ML module, fusion layer and REST API

# twin/service.py
from __future__ import annotations

from datetime import datetime
from typing import Annotated

import joblib
import numpy as np
import pandas as pd
from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel, Field, ConfigDict

from .physics import ThermalParams, simulate
from .cutting_force import estimate_cutting_force


class TwinConfig(BaseModel):
    model_config = ConfigDict(frozen=True)
    rated_power_kw: float = 15.0
    default_tool_d_mm: float = 12.0
    thermal: ThermalParams = ThermalParams(
        alpha=1.8e-8, beta=1.2e-3, R=0.18, C=14_500.0,
    )


class StateOut(BaseModel):
    timestamp: datetime
    machine: str
    spindle_rpm: float
    spindle_load_pct: float
    spindle_temp_estimated_C: float
    cutting_force_estimated_N: float
    exec_state: str


class PredictIn(BaseModel):
    machine: str
    T0_C: float = Field(ge=-20.0, le=120.0)
    T_amb_C: list[float]
    spindle_load_pct: list[float]
    spindle_rpm: list[float]
    dt_s: float = Field(gt=0.0, le=60.0)
    tool_diameter_mm: float | None = None


class PredictOut(BaseModel):
    T_spindle_C: list[float]
    F_cut_N: list[float]


app = FastAPI(title="CNC Digital Twin", version="1.0.0")
config = TwinConfig()
_state: dict[str, StateOut] = {}
_ml_model = None  # lazy load


def get_ml():
    global _ml_model
    if _ml_model is None:
        try:
            _ml_model = joblib.load("/models/spindle_thermal_residual.joblib")
        except FileNotFoundError:
            _ml_model = None
    return _ml_model


@app.get("/health")
def health() -> dict:
    return {"status": "ok", "model_loaded": get_ml() is not None}


@app.get("/state/{machine}", response_model=StateOut)
def get_state(machine: str) -> StateOut:
    s = _state.get(machine)
    if s is None:
        raise HTTPException(404, f"no state for {machine}")
    return s


@app.post("/predict", response_model=PredictOut)
def predict(body: PredictIn, ml=Depends(get_ml)) -> PredictOut:
    n = len(body.T_amb_C)
    if not (len(body.spindle_load_pct) == len(body.spindle_rpm) == n):
        raise HTTPException(400, "input arrays must be equal length")

    T_amb = np.asarray(body.T_amb_C, dtype=float)
    load = np.asarray(body.spindle_load_pct, dtype=float)
    rpm = np.asarray(body.spindle_rpm, dtype=float)

    T_phys = simulate(body.T0_C, T_amb, load, rpm, body.dt_s, config.thermal)

    if ml is not None:
        df = pd.DataFrame({
            "spindle_rpm": rpm,
            "spindle_load_pct": load,
            "feed_mm_min": np.zeros(n),
            "axis_x_pos_mm": np.zeros(n),
            "axis_y_pos_mm": np.zeros(n),
            "axis_z_pos_mm": np.zeros(n),
            "T_amb": T_amb,
            "T_physics": T_phys,
        })
        residual = ml["pipeline"].predict(df[ml["features"]].to_numpy())
        T_out = T_phys + residual
    else:
        T_out = T_phys

    d = body.tool_diameter_mm or config.default_tool_d_mm
    F_cut = [
        estimate_cutting_force(l, r, config.rated_power_kw, d)
        for l, r in zip(load.tolist(), rpm.tolist())
    ]
    return PredictOut(T_spindle_C=T_out.tolist(), F_cut_N=F_cut)

Notes on choices:

  • Pydantic v2 for I/O contracts. This is your single source of truth for what /predict expects; FastAPI will produce an OpenAPI spec for free, which is what your MES/PLM integrator will ask for in week two.
  • Lazy model load. The service must come up green even if the model file is missing; you do not want the entire twin red-flagged because someone fat-fingered the model path.
  • No background tasks in the runtime. Keep the runtime stateless apart from the cache. Anything that needs scheduling lives in a separate worker.

Dockerise:

# twin/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY twin/ ./twin/
EXPOSE 8080
CMD ["uvicorn", "twin.service:app", "--host", "0.0.0.0", "--port", "8080"]

And a docker-compose.yml to stand the whole thing up:

services:
  influx:
    image: influxdb:2.7
    ports: ["8086:8086"]
    environment:
      DOCKER_INFLUXDB_INIT_MODE: setup
      DOCKER_INFLUXDB_INIT_USERNAME: admin
      DOCKER_INFLUXDB_INIT_PASSWORD: change-me-please
      DOCKER_INFLUXDB_INIT_ORG: shopfloor
      DOCKER_INFLUXDB_INIT_BUCKET: cnc_raw
      DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: dev-token-only
    volumes: ["influx-data:/var/lib/influxdb2"]

  mtconnect:
    image: ghcr.io/mtconnect/cppagent:latest
    ports: ["5000:5000"]
    volumes: ["./agent.cfg:/mtconnect/agent.cfg:ro"]

  ingest:
    build: ./ingest
    environment:
      MTC_URL: http://mtconnect:5000
      MTC_DEVICE: VMC01
      MTC_HZ: "10"
      INFLUX_URL: http://influx:8086
      INFLUX_TOKEN: dev-token-only
      INFLUX_ORG: shopfloor
      INFLUX_BUCKET: cnc_raw
    depends_on: [mtconnect, influx]
    restart: unless-stopped

  twin:
    build: ./twin
    ports: ["8080:8080"]
    volumes: ["./models:/models:ro"]
    depends_on: [influx]
    restart: unless-stopped

  grafana:
    image: grafana/grafana:10.4.2
    ports: ["3000:3000"]
    volumes:
      - "./grafana/provisioning:/etc/grafana/provisioning:ro"
      - "grafana-data:/var/lib/grafana"
    depends_on: [influx]

volumes:
  influx-data: {}
  grafana-data: {}

docker compose up -d and you have a stack.

Step 5 — Dashboard (Grafana provisioning)

Click-and-drag Grafana setup belongs in 2018. Provision everything as code so the dashboard travels with the repo.

grafana/provisioning/datasources/influx.yml:

apiVersion: 1
datasources:
  - name: InfluxDB
    type: influxdb
    access: proxy
    url: http://influx:8086
    jsonData:
      version: Flux
      organization: shopfloor
      defaultBucket: cnc_raw
      tlsSkipVerify: true
    secureJsonData:
      token: dev-token-only
    isDefault: true
    editable: true

grafana/provisioning/dashboards/dashboards.yml:

apiVersion: 1
providers:
  - name: cnc-twin
    orgId: 1
    folder: CNC Twin
    type: file
    disableDeletion: false
    updateIntervalSeconds: 30
    options:
      path: /etc/grafana/provisioning/dashboards/json

Then drop dashboard JSON files into grafana/provisioning/dashboards/json/. Build a starter dashboard with four panels:

  1. Spindle load + RPM time-series, 1 h window, 1 s resolution.
  2. Estimated vs measured spindle temperature — two series on one axis, with the residual on a second Y axis.
  3. Estimated cutting force time-series with annotations for tool change events (parse block for T words).
  4. Anomaly score as a state-timeline panel.

Use Flux for the temperature comparison panel:

from(bucket: "cnc_twin_state")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "twin")
  |> filter(fn: (r) => r._field == "T_spindle_C" or r._field == "T_measured_C")
  |> aggregateWindow(every: 1s, fn: mean, createEmpty: false)
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
  |> map(fn: (r) => ({ r with residual_K: r.T_measured_C - r.T_spindle_C }))

That pivot is the trick that makes the dashboard readable. Tag every panel with the machine name so the same JSON works across a fleet — Grafana variables of type query make this trivial.

Step 6 — Anomaly alerting (rules + ML hybrid)

A pure-ML anomaly detector on machine tools alerts on everything in the first week and is muted by week three. A pure-rules detector misses subtle drift. Combine both, gate them with dwell time, and route the result.

Anomaly detector composition: rule-based gates plus ML score plus dwell timer plus alert routing

# twin/anomaly.py
from __future__ import annotations

from collections import deque
from dataclasses import dataclass, field
from typing import Iterable

import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


ANOMALY_FEATURES = [
    "spindle_load_pct", "spindle_rpm", "feed_mm_min",
    "T_spindle_C", "vib_rms_g",
]


def train_isoforest(df: pd.DataFrame, path: str) -> None:
    pipe = Pipeline([
        ("scale", StandardScaler()),
        ("if",    IsolationForest(
            n_estimators=200, contamination=0.01, random_state=42, n_jobs=-1,
        )),
    ])
    pipe.fit(df[ANOMALY_FEATURES].to_numpy())
    joblib.dump({"pipeline": pipe, "features": ANOMALY_FEATURES}, path)


@dataclass
class Detector:
    model_path: str
    load_max_pct: float = 110.0
    dT_max_per_min: float = 5.0
    cross_signal_load_pct: float = 60.0
    score_threshold: float = -0.05  # negative = more anomalous for IF
    dwell_windows: int = 3

    _hits: deque = field(default_factory=lambda: deque(maxlen=10))
    _model = None

    def __post_init__(self):
        self._model = joblib.load(self.model_path)

    def _rule_gate(self, row: pd.Series, prev: pd.Series | None) -> str | None:
        if row["spindle_load_pct"] > self.load_max_pct:
            return "spindle_load_over_limit"
        if prev is not None:
            dt_min = (row.name - prev.name).total_seconds() / 60.0
            if dt_min > 0:
                dTdt = (row["T_spindle_C"] - prev["T_spindle_C"]) / dt_min
                if dTdt > self.dT_max_per_min:
                    return "thermal_runaway"
        if (row["spindle_load_pct"] > self.cross_signal_load_pct
                and row["feed_mm_min"] < 1.0):
            return "load_without_feed"
        return None

    def _ml_score(self, row: pd.Series) -> float:
        X = np.asarray([row[f] for f in self._model["features"]]).reshape(1, -1)
        # IsolationForest.score_samples: higher = more normal
        return float(self._model["pipeline"].named_steps["if"].score_samples(
            self._model["pipeline"].named_steps["scale"].transform(X)
        )[0])

    def step(self, row: pd.Series, prev: pd.Series | None) -> dict | None:
        rule = self._rule_gate(row, prev)
        score = self._ml_score(row)
        ml_hit = score < self.score_threshold
        hit = rule is not None or ml_hit
        self._hits.append(hit)
        recent = list(self._hits)[-self.dwell_windows:]
        if len(recent) < self.dwell_windows or not all(recent):
            return None
        return {
            "ts": row.name.isoformat(),
            "severity": "high" if rule is not None else "medium",
            "rule": rule,
            "ml_score": score,
            "values": row.to_dict(),
        }


def run(df: pd.DataFrame, det: Detector) -> Iterable[dict]:
    prev = None
    for ts, row in df.iterrows():
        row.name = ts
        alert = det.step(row, prev)
        if alert:
            yield alert
        prev = row

Routing is a downstream concern; the detector emits structured events and a small worker pushes them to Slack, Teams or your MES bus. Keep the detector pure (no I/O) — it is the easiest way to unit-test the thing.

Tune the score_threshold against a labelled week of data. A useful starting point: pick the 1st percentile of score_samples on a known-good week and round to two decimals.

Step 7 — Validation against ground truth

A twin without validation is a dashboard. The validation loop has three signals worth comparing:

  • Spindle bearing temperature vs the bonded thermocouple. Target: MAE under 2 K, RMSE under 3 K over a one-hour window.
  • Estimated cutting force vs a Kistler dynamometer for a representative cut. Target: MAPE under 20 % for the cutting portion (ignore non-cutting blocks).
  • Vibration RMS vs the on-spindle accelerometer band-limited to 10–1000 Hz. Target: trend correlation r > 0.85.

Validation loop: predicted vs measured spindle temperature, cutting force, vibration RMS

A small offline harness, run nightly, is enough:

# validation/harness.py
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error


def score(pred: pd.Series, truth: pd.Series) -> dict:
    df = pd.concat([pred.rename("p"), truth.rename("t")], axis=1).dropna()
    if len(df) < 60:
        return {"n": int(len(df))}
    mae = mean_absolute_error(df.t, df.p)
    rmse = float(np.sqrt(mean_squared_error(df.t, df.p)))
    mape = float(np.mean(np.abs((df.t - df.p) / df.t.replace(0, np.nan)).dropna()) * 100.0)
    corr = float(df.t.corr(df.p))
    return {"n": int(len(df)), "mae": float(mae), "rmse": rmse,
            "mape_pct": mape, "corr": corr}

Push the metrics back into InfluxDB under a cnc_twin_validation measurement and put them on a dashboard the maintenance lead actually looks at. The moment MAE drifts above your budget for two consecutive nights, retrain the residual.

Track the model versions in MLflow (or, honestly, a Git tag and a JSON manifest if you do not want yet another service). When a challenger model beats the champion on a 24-hour holdout window, promote it — but keep the champion warm for 48 hours so you can roll back without redeploying containers.

Trade-offs, gotchas, and where this falls short of full digital twin

Honest list, ordered by how often they bite:

  • Latency vs fidelity. At 10 Hz polling, you will see spindle load steps but you will alias high-frequency vibration. If you need real chatter detection, you need a dedicated DAQ at 10 kHz+ and a separate edge stream — MTConnect alone will not get you there. The MTConnect Institute is honest about this in its Sample Interpretation documentation.
  • Sensor coverage gaps. Most production CNCs do not have a spindle thermocouple. You can run the physics-only branch without it, but the validation loop becomes weak. Budget for one thermocouple + one accelerometer per “golden machine” and propagate the fitted parameters to siblings.
  • Sim-to-real gap on cutting force. The Kienzle approach assumes you know the material and the tool engagement. In a job shop with mixed materials and one-off parts, expect 30–50 % errors on absolute force. Trend-based use (force is rising for the same program) is still valuable; absolute thresholds are not.
  • Time sync. PTP (IEEE 1588) on the OT network is ideal. NTP from the gateway will get you within 50 ms on a quiet network and 500 ms when someone is downloading a CAD model. For 10 Hz data this is fine; for cross-signal correlations at 1 kHz it is not.
  • IT/OT politics. The hardest battle is not technical. Asking IT to open a port from the shop VLAN to a cloud InfluxDB will generate a six-week security review. Plan for it. The mitigations: keep the InfluxDB on-prem in the IT DMZ, push only aggregated metrics outward, and put the OpenAPI spec of the twin runtime in front of the security team early.
  • Vendor lock at the edge. A Fanuc FOCAS adapter is closed-source. If the machine is upgraded, the adapter may need re-licensing. Build the rest of the stack on the canonical schema so an adapter swap is a one-day job.
  • “Digital twin” hype. What we have built is a reduced-order, data-driven twin with a validation loop. It is not a high-fidelity FEM that lets you simulate fixture stiffness or thermal distortion of the bed casting in real time. If your CFO has been sold the latter, manage expectations early. NIST’s framing in NISTIR 8356 on digital-twin trustworthiness is a useful reference to ground that conversation.

Practical recommendations

A short list of things that will save you a fortnight, in rough order of return on time:

  1. Pin every dependency. requirements.txt with == exact versions. Random Forest reproducibility depends on the scikit-learn version. You will be thankful when the model needs to be retrained two years from now.
  2. Persist raw and derived separately. Two InfluxDB buckets — cnc_raw and cnc_twin_state. You will want to reprocess raw with new models without throwing away history.
  3. Write a CLI, not just a service. python -m twin.cli replay --machine VMC01 --from 2026-05-01 --to 2026-05-02 is worth its weight when an incident happens at 03:00.
  4. Capture exec_state transitions as events. A simple state machine on top of the MTConnect execution field tells you cycle time more reliably than counting button presses.
  5. Take a “golden week.” Pick a representative week of production output and freeze it as your validation set. Every model change is scored against it before merging. This is the closest thing to CI you will get in industrial ML.
  6. Train your operators on the dashboard before training your ML model. A dashboard the operator trusts will surface 80 % of the value. Sequence accordingly.
  7. Buy, do not build, the MTConnect agent. Use mtconnect/cppagent for native protocols and a vendor adapter (Fanuc, MEMEX, Memex MERLIN, etc.) for the control. Building your own FOCAS bridge in 2026 is not a good use of senior engineering time.

FAQ

Q1. Why InfluxDB rather than just dumping to Parquet on S3?
Twin runtimes want second-level reads of the last few minutes for state, and minute-level reads of the last day for predict. A time-series store handles both with one query language and downsampling tasks. Parquet on object storage is excellent for archival and offline retraining; it is poor for the dashboard. Use both.

Q2. Can I do this with OPC UA instead of MTConnect?
Yes. OPC UA via the asyncua Python library follows exactly the same shape — subscribe to nodes, normalise, persist. MTConnect’s advantage is a richer machine-tool domain model and a public schema; OPC UA’s advantage is broader vendor coverage outside machine tools. For a CNC, MTConnect is usually the faster path.

Q3. How long does the warm-up data need to be?
For the thermal model: 30–60 minutes from cold start to thermal steady state, sampled at 1 Hz. For the residual model: at least 40 hours of cutting time across the part programs you actually run. For the IsolationForest detector: one week of “normal” operation including idle, warm-up, tool change and cutting.

Q4. The Kienzle estimate is bad on my machine. Why?
Three usual suspects: (a) the rated spindle power in TwinConfig does not match the drive’s actual rating, (b) the drive efficiency is assumed at 0.85 and yours is closer to 0.7 at low load, (c) the spindle load percentage on Fanuc controls is a peak-hold value, not an instantaneous one. Replace the load source with the spindle motor current (Hall clamp or drive parameter) for a step-change improvement.

Q5. How do I extend this to a fleet of machines?
Run one ingest container per machine, one InfluxDB cluster, one twin runtime that takes machine as a path parameter, and one Grafana with templated variables. Fit thermal parameters per machine; share the ML pipeline as a class but train per-machine residuals — bearing aging is per-machine.

Q6. Where does PLM come in?
At two points. First, the as-designed tool list and material data come from PLM (Teamcenter, 3DEXPERIENCE, Aras). Second, the validated as-built record — what was actually cut, with what spindle temperature profile, against what twin prediction — goes back to PLM as part of the digital thread for that serial number. Keep the twin runtime PLM-agnostic; let an integration worker do the talking.

Further reading

Comments

No comments yet. Why don’t you start the discussion?

Leave a Reply

Your email address will not be published. Required fields are marked *