Build a Digital Twin for a CNC Machine: Step-by-Step 2026 Tutorial
If you have landed here, you want a CNC machine digital twin tutorial 2026 that does not stop at slideware. This guide walks through building a working twin of a 3-axis vertical machining centre (VMC) end to end: MTConnect ingestion from a Fanuc/Heidenhain/Siemens control, normalisation into InfluxDB, a physics-informed spindle thermal and cutting-force estimator backed by scikit-learn, a FastAPI twin runtime, a provisioned Grafana dashboard, a hybrid rule + ML anomaly detector, and a validation loop against thermocouple and dynamometer ground truth. Everything ships as a reproducible Docker stack. By the end, you will have a twin a process engineer can actually use on the shop floor — not a CAD render with telemetry pinned to it.
This is written for the engineer who has to deliver something this quarter. Senior manufacturing engineer + Python developer voice throughout. Indian/UK spelling. Real code, real libraries, honest trade-offs.

What we are building (and what we are not)
In scope:
- A live data pipeline from the control to a time-series store at 1–10 Hz sampling.
- A reduced-order physics model for spindle bearing temperature (lumped RC thermal network) and a Kienzle-style cutting-force estimator.
- A machine-learning residual (RandomForest regressor) that corrects the physics estimate against measured data.
- A FastAPI twin runtime that exposes
/state,/predict,/twin/replay, and/health. - A Grafana dashboard with provisioning YAML so you do not click around in the UI for an hour.
- A hybrid anomaly detector combining deterministic gates and IsolationForest.
- A validation harness that compares predicted spindle temperature, cutting force, and vibration RMS against benchtop ground truth.
Out of scope:
- High-fidelity FEM / CFD coupling. You will not solve Navier-Stokes on every cycle on a Raspberry Pi.
- Tool-path simulation at G-code block level — that needs a kinematics kernel like VERICUT or NC Verify. We touch tool path only as context.
- Full PLM integration with Teamcenter or 3DEXPERIENCE. We stop at a clean REST contract you can hand to your PLM team.
- Closed-loop control. We are observing and predicting; we are not adjusting feed override automatically. That is a separate safety case.
If you are still mapping this onto the bigger picture, our IoT, digital twin and PLM overview explains where this twin sits in the wider stack.
Prerequisites (hardware, software, skills)
Hardware (minimum viable):
- A CNC with one of: Fanuc 31i/32i, Heidenhain TNC640, or Siemens 840D sl. Most controls from 2018 onward expose either MTConnect (via the Fanuc FOCAS adapter or vendor-native agent) or OPC UA.
- An edge gateway — anything from a Raspberry Pi 5 (for a single machine, fine) to a fanless x86 industrial PC (Advantech UNO-2484G or similar). 8 GB RAM, 128 GB SSD is plenty.
- Optional but recommended: one tri-axial accelerometer (PCB 356A45 or IMI 603C01) on the spindle housing, one K-type thermocouple bonded near the front bearing, and a Hall-effect current clamp on the spindle drive line. These give you ground truth for validation.
Software stack:
- Docker 24.x and Docker Compose v2 on the edge gateway.
- Python 3.11. We use
httpx,lxml,pandas,scikit-learn,fastapi,uvicorn,pydanticv2, andinfluxdb-client. - InfluxDB 2.7 (or TimescaleDB 2.x if you prefer SQL; we will note the swap points).
- Grafana 10.x.
- An MTConnect agent — the reference implementation is at mtconnect/cppagent. For Fanuc, the FOCAS-MTConnect adapter is the standard route.
Skills:
- Comfortable reading XML and JSON.
- Decent Python — async, typing, the basics of
pandasandsklearn. - A working understanding of a machining cycle: spindle, feed, depth of cut, what overload looks like.
- Enough Linux to keep Docker upright.
Architecture overview
The architecture is intentionally boring. Boring scales; clever fails on Friday at 18:00.

Four logical layers:
- OT layer — the control and any auxiliary sensors. We do not touch the PLC. Read-only.
- Edge layer — the MTConnect agent (or OPC UA server) plus a Python ingest process on the gateway. This is where retries, buffering and time-sync live.
- Twin core (IT) — InfluxDB, the FastAPI twin runtime, and the model artefacts. Runs on a small server in the IT DMZ or in your cloud account.
- Consumption — Grafana, Alertmanager, and an outbound webhook to your MES/PLM if you want it.
The separation matters: when IT pushes a Grafana upgrade and breaks something at 02:00, the edge keeps ingesting and buffering. When the network drops, the edge holds a 24-hour ring buffer. This is the single most important architectural decision in the whole tutorial.
For a worked example of how a Tier-1 OEM operationalises this kind of split at factory scale, see our Stellantis virtual factory analysis.
Step 1 — Ingest data with MTConnect
MTConnect is an open XML standard maintained by the MTConnect Institute. The agent exposes three REST endpoints we care about:
GET /probe— what data items exist on this device.GET /current— latest value of every data item.GET /sample?from=<seq>&count=<n>— a chronological slice from a starting sequence number. This is the one you will live in.
Spin up the reference agent against a Fanuc adapter with the published cppagent Docker image. Then write the poller. The goal is: never miss a sequence number, never block on the network, and always know what time it is.
# ingest/mtconnect_poller.py
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import AsyncIterator
import httpx
from lxml import etree
log = logging.getLogger("mtconnect")
NS = {"m": "urn:mtconnect.org:MTConnectStreams:2.0"}
@dataclass(frozen=True)
class Sample:
timestamp: datetime
device: str
data_item_id: str
name: str
value: str
sequence: int
class MTConnectPoller:
def __init__(self, base_url: str, device: str, poll_hz: float = 10.0):
self.base_url = base_url.rstrip("/")
self.device = device
self.interval = 1.0 / poll_hz
self._next_seq: int | None = None
self._client = httpx.AsyncClient(timeout=5.0)
async def _fetch(self, path: str, **params) -> bytes:
r = await self._client.get(f"{self.base_url}{path}", params=params)
r.raise_for_status()
return r.content
async def _bootstrap(self) -> None:
xml = await self._fetch(f"/{self.device}/current")
root = etree.fromstring(xml)
header = root.find("m:Header", NS)
# nextSequence on /current is the seq AFTER the latest sample
self._next_seq = int(header.get("nextSequence"))
log.info("bootstrapped at seq=%s", self._next_seq)
@staticmethod
def _parse_samples(xml: bytes) -> list[Sample]:
root = etree.fromstring(xml)
out: list[Sample] = []
# MTConnect groups DataItems under Streams/DeviceStream/ComponentStream
for el in root.iter():
tag = etree.QName(el).localname
if tag in {"Samples", "Events", "Condition", "Header"}:
continue
seq = el.get("sequence")
ts = el.get("timestamp")
di = el.get("dataItemId")
name = el.get("name") or tag
if not (seq and ts and di and el.text is not None):
continue
out.append(Sample(
timestamp=datetime.fromisoformat(ts.replace("Z", "+00:00")),
device=el.get("device", ""),
data_item_id=di,
name=name,
value=el.text.strip(),
sequence=int(seq),
))
return out
async def stream(self) -> AsyncIterator[Sample]:
await self._bootstrap()
while True:
try:
xml = await self._fetch(
f"/{self.device}/sample",
**{"from": self._next_seq, "count": 1000},
)
samples = self._parse_samples(xml)
if samples:
self._next_seq = samples[-1].sequence + 1
for s in samples:
yield s
except httpx.HTTPError as e:
log.warning("mtconnect fetch failed: %s", e)
await asyncio.sleep(2.0)
continue
await asyncio.sleep(self.interval)
async def main() -> None:
logging.basicConfig(level=logging.INFO)
poller = MTConnectPoller("http://edge-gateway:5000", device="VMC01")
async for sample in poller.stream():
print(sample.timestamp.isoformat(), sample.name, sample.value)
if __name__ == "__main__":
asyncio.run(main())
Two things worth flagging:
- Sequence tracking, not wall clock. MTConnect agents guarantee monotonic sequences. If you poll on time, you will eventually miss a window. If you poll on sequence, the agent tells you exactly what you missed.
- Time zone hygiene. The agent emits ISO-8601 UTC. Parse to a timezone-aware
datetime. Resist the urge to usedatetime.now()anywhere in the ingest path; on a shop with a misconfigured NTP server you will spend a Saturday hunting a phantom drift.
For the data-item inventory, hit /probe once at startup and cache the mapping dataItemId → (subType, units, nativeUnits). You will need it in the next step.
Step 2 — Normalize and persist (InfluxDB/Timescale)
Raw MTConnect names vary by control. A Fanuc adapter emits Sspeed, Spindle_load, Xact, Yact, Zact, block, path_feedrate. A Heidenhain agent uses different IDs. Normalise once at the edge into a canonical schema so the twin and dashboards do not need to know about brand-specific spellings.
# ingest/normaliser.py
from dataclasses import dataclass
from typing import Mapping
CANONICAL = {
# Fanuc-ish
"Sspeed": ("spindle_rpm", "rev/min", float),
"Spindle_load": ("spindle_load_pct", "percent", float),
"Xact": ("axis_x_pos_mm", "mm", float),
"Yact": ("axis_y_pos_mm", "mm", float),
"Zact": ("axis_z_pos_mm", "mm", float),
"path_feedrate": ("feed_mm_min", "mm/min", float),
"block": ("nc_block", "text", str),
"execution": ("exec_state", "text", str),
# Heidenhain
"S1Actual": ("spindle_rpm", "rev/min", float),
"S1Load": ("spindle_load_pct", "percent", float),
# Siemens 840D sl
"actSpeed1": ("spindle_rpm", "rev/min", float),
"actLoad1": ("spindle_load_pct", "percent", float),
}
@dataclass(frozen=True)
class Normalised:
timestamp_ns: int
machine: str
tag: str
value: float | str
units: str
def normalise(sample, machine: str) -> Normalised | None:
rule = CANONICAL.get(sample.name)
if rule is None:
return None
tag, units, cast = rule
try:
v = cast(sample.value)
except (ValueError, TypeError):
return None
if isinstance(v, float):
# sanity bounds — drop obvious garbage early
if tag == "spindle_load_pct" and not (0.0 <= v <= 200.0):
return None
if tag == "spindle_rpm" and not (0.0 <= v <= 60_000.0):
return None
return Normalised(
timestamp_ns=int(sample.timestamp.timestamp() * 1e9),
machine=machine,
tag=tag,
value=v,
units=units,
)
Now persist. With InfluxDB 2.x:
# ingest/sink_influx.py
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import ASYNCHRONOUS
class InfluxSink:
def __init__(self, url: str, token: str, org: str, bucket: str):
self._client = InfluxDBClient(url=url, token=token, org=org)
self._write = self._client.write_api(write_options=ASYNCHRONOUS)
self._bucket = bucket
self._org = org
def write(self, n) -> None:
p = (
Point("cnc")
.tag("machine", n.machine)
.tag("tag", n.tag)
.tag("units", n.units)
.time(n.timestamp_ns, WritePrecision.NS)
)
if isinstance(n.value, (int, float)):
p = p.field("value", float(n.value))
else:
p = p.field("text", str(n.value))
self._write.write(bucket=self._bucket, org=self._org, record=p)
def close(self) -> None:
self._write.close()
self._client.close()
If you prefer TimescaleDB, the schema is equally simple:
CREATE TABLE cnc_samples (
ts TIMESTAMPTZ NOT NULL,
machine TEXT NOT NULL,
tag TEXT NOT NULL,
value_num DOUBLE PRECISION,
value_txt TEXT,
units TEXT
);
SELECT create_hypertable('cnc_samples', 'ts', chunk_time_interval => INTERVAL '1 day');
CREATE INDEX ON cnc_samples (machine, tag, ts DESC);
Use whichever your team will actually maintain. InfluxDB Flux is friendlier for downsampling; Timescale is friendlier if you already have a Postgres operator on staff.
Wire the three together:
# ingest/run.py
import asyncio
import os
from .mtconnect_poller import MTConnectPoller
from .normaliser import normalise
from .sink_influx import InfluxSink
async def main():
poller = MTConnectPoller(
base_url=os.environ["MTC_URL"],
device=os.environ["MTC_DEVICE"],
poll_hz=float(os.environ.get("MTC_HZ", "10")),
)
sink = InfluxSink(
url=os.environ["INFLUX_URL"],
token=os.environ["INFLUX_TOKEN"],
org=os.environ["INFLUX_ORG"],
bucket=os.environ["INFLUX_BUCKET"],
)
machine = os.environ["MTC_DEVICE"]
try:
async for s in poller.stream():
n = normalise(s, machine)
if n is not None:
sink.write(n)
finally:
sink.close()
if __name__ == "__main__":
asyncio.run(main())
That is your ingest service. Put it in a Docker container behind a restart policy. Move on.
Step 3 — Physics-informed model (spindle thermal + cutting force estimate)
This is where most “digital twin” projects collapse into dashboards. Resist. A twin needs at least one model that says what should be happening so you can compare against what is happening.
We will keep it pragmatic. Two physics models, both reducible to a handful of parameters.
3.1 Spindle bearing thermal — lumped RC network
Treat the spindle housing as a single thermal capacitance with conduction to ambient and convection from cooling. Heat is injected proportional to spindle load and the square of spindle speed (rough proxy for bearing friction losses).
C · dT/dt = α · load · rpm² + β · rpm − (T − T_ambient) / R
In discrete time, this is a one-line update. Fit α, β, R, C from a warm-up cycle.
# twin/physics.py
import numpy as np
from dataclasses import dataclass
@dataclass
class ThermalParams:
alpha: float # W per (load_pct * rpm^2)
beta: float # W per rpm
R: float # K / W
C: float # J / K
def step_spindle_temp(
T_prev: float, T_amb: float, load_pct: float, rpm: float,
dt: float, p: ThermalParams,
) -> float:
P_gen = p.alpha * load_pct * rpm * rpm + p.beta * rpm
P_loss = (T_prev - T_amb) / p.R
dT = (P_gen - P_loss) * dt / p.C
return T_prev + dT
def simulate(T0: float, T_amb: np.ndarray, load: np.ndarray,
rpm: np.ndarray, dt: float, p: ThermalParams) -> np.ndarray:
T = np.empty_like(T_amb)
T[0] = T0
for k in range(1, len(T_amb)):
T[k] = step_spindle_temp(T[k-1], T_amb[k-1], load[k-1], rpm[k-1], dt, p)
return T
Fit it once against a 30–60 minute warm-up log with a thermocouple bonded near the front bearing. scipy.optimize.minimize on RMSE is plenty; you do not need anything more sophisticated. Expect a steady-state temperature error of 1–3 K after fitting, which is enough to flag a cooling system fault long before it scraps a part.
3.2 Kienzle cutting force estimate
For end-milling, the tangential cutting force per tooth follows the Kienzle equation:
F_t = k_c1.1 · a_p · h^(1 - m_c)
where a_p is axial depth of cut, h is the average chip thickness, k_c1.1 is the specific cutting force at 1 mm² chip section, and m_c is a material exponent. For common workpiece materials, k_c1.1 and m_c are tabulated (Kistler, Sandvik handbooks). Average chip thickness for slotting is h ≈ f_z · sin(φ), simplified to h ≈ f_z · (a_e / D) for partial radial engagement.
You cannot read a_p, a_e or f_z directly from MTConnect. You can read the spindle load and back-calculate using:
P_cut = F_t · v_c → F_t = P_cut / v_c, with v_c = π · D · rpm / 60.
# twin/cutting_force.py
import math
def estimate_cutting_force(spindle_load_pct: float, spindle_rpm: float,
rated_power_kw: float, tool_diameter_mm: float,
efficiency: float = 0.85) -> float:
"""Return estimated tangential cutting force [N]."""
if spindle_rpm < 1.0 or tool_diameter_mm < 0.1:
return 0.0
P_mech = (spindle_load_pct / 100.0) * rated_power_kw * 1000.0 * efficiency # W
v_c = math.pi * tool_diameter_mm * 1e-3 * spindle_rpm / 60.0 # m/s
if v_c < 1e-3:
return 0.0
return P_mech / v_c
This is approximate. It conflates friction in the drivetrain with cutting load. But run it against a Kistler 9257B dynamometer for a few representative cuts and you will find errors of 10–20 % — perfectly adequate for tool-wear trending and feed-override sanity checks.
3.3 ML residual — let scikit-learn clean up
Physics is biased and ML is variance-prone. The standard trick is to learn the residual: model what the physics gets wrong as a function of context.
# twin/ml_residual.py
import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
FEATURES = [
"spindle_rpm", "spindle_load_pct", "feed_mm_min",
"axis_x_pos_mm", "axis_y_pos_mm", "axis_z_pos_mm",
"T_amb", "T_physics",
]
TARGET = "T_measured"
def build_pipeline() -> Pipeline:
return Pipeline([
("scale", StandardScaler()),
("rf", RandomForestRegressor(
n_estimators=300, max_depth=14, min_samples_leaf=5,
n_jobs=-1, random_state=42,
)),
])
def fit(df: pd.DataFrame, path: str) -> dict:
df = df.dropna(subset=FEATURES + [TARGET])
df["residual"] = df[TARGET] - df["T_physics"]
X = df[FEATURES].to_numpy()
y = df["residual"].to_numpy()
pipe = build_pipeline()
pipe.fit(X, y)
joblib.dump({"pipeline": pipe, "features": FEATURES}, path)
pred = pipe.predict(X) + df["T_physics"].to_numpy()
mae = float(np.mean(np.abs(pred - df[TARGET].to_numpy())))
rmse = float(np.sqrt(np.mean((pred - df[TARGET].to_numpy()) ** 2)))
return {"mae_K": mae, "rmse_K": rmse, "n": int(len(df))}
def predict(model_path: str, df: pd.DataFrame) -> np.ndarray:
obj = joblib.load(model_path)
X = df[obj["features"]].to_numpy()
return obj["pipeline"].predict(X) + df["T_physics"].to_numpy()
A RandomForest with 300 trees fits in seconds on a year of 1 Hz data and is robust to the messy categorical context (different tools, different programs). If you reach for transformers or LSTMs first, ask yourself honestly whether you have the labelled data to justify them. The answer in a job shop is almost always no.
Step 4 — Twin runtime as a service
The twin runtime is a small FastAPI service that:
- Holds the latest state in a Redis cache (or in-memory if you are single-instance — fine for a single machine).
- Exposes a
/stateendpoint for the dashboard and downstream MES. - Exposes a
/predictendpoint that runs the physics + ML estimate on a supplied window. - Exposes a
/twin/replayendpoint that re-runs the twin over a historical window — invaluable for incident reviews.

# twin/service.py
from __future__ import annotations
from datetime import datetime
from typing import Annotated
import joblib
import numpy as np
import pandas as pd
from fastapi import Depends, FastAPI, HTTPException
from pydantic import BaseModel, Field, ConfigDict
from .physics import ThermalParams, simulate
from .cutting_force import estimate_cutting_force
class TwinConfig(BaseModel):
model_config = ConfigDict(frozen=True)
rated_power_kw: float = 15.0
default_tool_d_mm: float = 12.0
thermal: ThermalParams = ThermalParams(
alpha=1.8e-8, beta=1.2e-3, R=0.18, C=14_500.0,
)
class StateOut(BaseModel):
timestamp: datetime
machine: str
spindle_rpm: float
spindle_load_pct: float
spindle_temp_estimated_C: float
cutting_force_estimated_N: float
exec_state: str
class PredictIn(BaseModel):
machine: str
T0_C: float = Field(ge=-20.0, le=120.0)
T_amb_C: list[float]
spindle_load_pct: list[float]
spindle_rpm: list[float]
dt_s: float = Field(gt=0.0, le=60.0)
tool_diameter_mm: float | None = None
class PredictOut(BaseModel):
T_spindle_C: list[float]
F_cut_N: list[float]
app = FastAPI(title="CNC Digital Twin", version="1.0.0")
config = TwinConfig()
_state: dict[str, StateOut] = {}
_ml_model = None # lazy load
def get_ml():
global _ml_model
if _ml_model is None:
try:
_ml_model = joblib.load("/models/spindle_thermal_residual.joblib")
except FileNotFoundError:
_ml_model = None
return _ml_model
@app.get("/health")
def health() -> dict:
return {"status": "ok", "model_loaded": get_ml() is not None}
@app.get("/state/{machine}", response_model=StateOut)
def get_state(machine: str) -> StateOut:
s = _state.get(machine)
if s is None:
raise HTTPException(404, f"no state for {machine}")
return s
@app.post("/predict", response_model=PredictOut)
def predict(body: PredictIn, ml=Depends(get_ml)) -> PredictOut:
n = len(body.T_amb_C)
if not (len(body.spindle_load_pct) == len(body.spindle_rpm) == n):
raise HTTPException(400, "input arrays must be equal length")
T_amb = np.asarray(body.T_amb_C, dtype=float)
load = np.asarray(body.spindle_load_pct, dtype=float)
rpm = np.asarray(body.spindle_rpm, dtype=float)
T_phys = simulate(body.T0_C, T_amb, load, rpm, body.dt_s, config.thermal)
if ml is not None:
df = pd.DataFrame({
"spindle_rpm": rpm,
"spindle_load_pct": load,
"feed_mm_min": np.zeros(n),
"axis_x_pos_mm": np.zeros(n),
"axis_y_pos_mm": np.zeros(n),
"axis_z_pos_mm": np.zeros(n),
"T_amb": T_amb,
"T_physics": T_phys,
})
residual = ml["pipeline"].predict(df[ml["features"]].to_numpy())
T_out = T_phys + residual
else:
T_out = T_phys
d = body.tool_diameter_mm or config.default_tool_d_mm
F_cut = [
estimate_cutting_force(l, r, config.rated_power_kw, d)
for l, r in zip(load.tolist(), rpm.tolist())
]
return PredictOut(T_spindle_C=T_out.tolist(), F_cut_N=F_cut)
Notes on choices:
- Pydantic v2 for I/O contracts. This is your single source of truth for what
/predictexpects; FastAPI will produce an OpenAPI spec for free, which is what your MES/PLM integrator will ask for in week two. - Lazy model load. The service must come up green even if the model file is missing; you do not want the entire twin red-flagged because someone fat-fingered the model path.
- No background tasks in the runtime. Keep the runtime stateless apart from the cache. Anything that needs scheduling lives in a separate worker.
Dockerise:
# twin/Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY twin/ ./twin/
EXPOSE 8080
CMD ["uvicorn", "twin.service:app", "--host", "0.0.0.0", "--port", "8080"]
And a docker-compose.yml to stand the whole thing up:
services:
influx:
image: influxdb:2.7
ports: ["8086:8086"]
environment:
DOCKER_INFLUXDB_INIT_MODE: setup
DOCKER_INFLUXDB_INIT_USERNAME: admin
DOCKER_INFLUXDB_INIT_PASSWORD: change-me-please
DOCKER_INFLUXDB_INIT_ORG: shopfloor
DOCKER_INFLUXDB_INIT_BUCKET: cnc_raw
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: dev-token-only
volumes: ["influx-data:/var/lib/influxdb2"]
mtconnect:
image: ghcr.io/mtconnect/cppagent:latest
ports: ["5000:5000"]
volumes: ["./agent.cfg:/mtconnect/agent.cfg:ro"]
ingest:
build: ./ingest
environment:
MTC_URL: http://mtconnect:5000
MTC_DEVICE: VMC01
MTC_HZ: "10"
INFLUX_URL: http://influx:8086
INFLUX_TOKEN: dev-token-only
INFLUX_ORG: shopfloor
INFLUX_BUCKET: cnc_raw
depends_on: [mtconnect, influx]
restart: unless-stopped
twin:
build: ./twin
ports: ["8080:8080"]
volumes: ["./models:/models:ro"]
depends_on: [influx]
restart: unless-stopped
grafana:
image: grafana/grafana:10.4.2
ports: ["3000:3000"]
volumes:
- "./grafana/provisioning:/etc/grafana/provisioning:ro"
- "grafana-data:/var/lib/grafana"
depends_on: [influx]
volumes:
influx-data: {}
grafana-data: {}
docker compose up -d and you have a stack.
Step 5 — Dashboard (Grafana provisioning)
Click-and-drag Grafana setup belongs in 2018. Provision everything as code so the dashboard travels with the repo.
grafana/provisioning/datasources/influx.yml:
apiVersion: 1
datasources:
- name: InfluxDB
type: influxdb
access: proxy
url: http://influx:8086
jsonData:
version: Flux
organization: shopfloor
defaultBucket: cnc_raw
tlsSkipVerify: true
secureJsonData:
token: dev-token-only
isDefault: true
editable: true
grafana/provisioning/dashboards/dashboards.yml:
apiVersion: 1
providers:
- name: cnc-twin
orgId: 1
folder: CNC Twin
type: file
disableDeletion: false
updateIntervalSeconds: 30
options:
path: /etc/grafana/provisioning/dashboards/json
Then drop dashboard JSON files into grafana/provisioning/dashboards/json/. Build a starter dashboard with four panels:
- Spindle load + RPM time-series, 1 h window, 1 s resolution.
- Estimated vs measured spindle temperature — two series on one axis, with the residual on a second Y axis.
- Estimated cutting force time-series with annotations for tool change events (parse
blockforTwords). - Anomaly score as a state-timeline panel.
Use Flux for the temperature comparison panel:
from(bucket: "cnc_twin_state")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "twin")
|> filter(fn: (r) => r._field == "T_spindle_C" or r._field == "T_measured_C")
|> aggregateWindow(every: 1s, fn: mean, createEmpty: false)
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> map(fn: (r) => ({ r with residual_K: r.T_measured_C - r.T_spindle_C }))
That pivot is the trick that makes the dashboard readable. Tag every panel with the machine name so the same JSON works across a fleet — Grafana variables of type query make this trivial.
Step 6 — Anomaly alerting (rules + ML hybrid)
A pure-ML anomaly detector on machine tools alerts on everything in the first week and is muted by week three. A pure-rules detector misses subtle drift. Combine both, gate them with dwell time, and route the result.

# twin/anomaly.py
from __future__ import annotations
from collections import deque
from dataclasses import dataclass, field
from typing import Iterable
import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
ANOMALY_FEATURES = [
"spindle_load_pct", "spindle_rpm", "feed_mm_min",
"T_spindle_C", "vib_rms_g",
]
def train_isoforest(df: pd.DataFrame, path: str) -> None:
pipe = Pipeline([
("scale", StandardScaler()),
("if", IsolationForest(
n_estimators=200, contamination=0.01, random_state=42, n_jobs=-1,
)),
])
pipe.fit(df[ANOMALY_FEATURES].to_numpy())
joblib.dump({"pipeline": pipe, "features": ANOMALY_FEATURES}, path)
@dataclass
class Detector:
model_path: str
load_max_pct: float = 110.0
dT_max_per_min: float = 5.0
cross_signal_load_pct: float = 60.0
score_threshold: float = -0.05 # negative = more anomalous for IF
dwell_windows: int = 3
_hits: deque = field(default_factory=lambda: deque(maxlen=10))
_model = None
def __post_init__(self):
self._model = joblib.load(self.model_path)
def _rule_gate(self, row: pd.Series, prev: pd.Series | None) -> str | None:
if row["spindle_load_pct"] > self.load_max_pct:
return "spindle_load_over_limit"
if prev is not None:
dt_min = (row.name - prev.name).total_seconds() / 60.0
if dt_min > 0:
dTdt = (row["T_spindle_C"] - prev["T_spindle_C"]) / dt_min
if dTdt > self.dT_max_per_min:
return "thermal_runaway"
if (row["spindle_load_pct"] > self.cross_signal_load_pct
and row["feed_mm_min"] < 1.0):
return "load_without_feed"
return None
def _ml_score(self, row: pd.Series) -> float:
X = np.asarray([row[f] for f in self._model["features"]]).reshape(1, -1)
# IsolationForest.score_samples: higher = more normal
return float(self._model["pipeline"].named_steps["if"].score_samples(
self._model["pipeline"].named_steps["scale"].transform(X)
)[0])
def step(self, row: pd.Series, prev: pd.Series | None) -> dict | None:
rule = self._rule_gate(row, prev)
score = self._ml_score(row)
ml_hit = score < self.score_threshold
hit = rule is not None or ml_hit
self._hits.append(hit)
recent = list(self._hits)[-self.dwell_windows:]
if len(recent) < self.dwell_windows or not all(recent):
return None
return {
"ts": row.name.isoformat(),
"severity": "high" if rule is not None else "medium",
"rule": rule,
"ml_score": score,
"values": row.to_dict(),
}
def run(df: pd.DataFrame, det: Detector) -> Iterable[dict]:
prev = None
for ts, row in df.iterrows():
row.name = ts
alert = det.step(row, prev)
if alert:
yield alert
prev = row
Routing is a downstream concern; the detector emits structured events and a small worker pushes them to Slack, Teams or your MES bus. Keep the detector pure (no I/O) — it is the easiest way to unit-test the thing.
Tune the score_threshold against a labelled week of data. A useful starting point: pick the 1st percentile of score_samples on a known-good week and round to two decimals.
Step 7 — Validation against ground truth
A twin without validation is a dashboard. The validation loop has three signals worth comparing:
- Spindle bearing temperature vs the bonded thermocouple. Target: MAE under 2 K, RMSE under 3 K over a one-hour window.
- Estimated cutting force vs a Kistler dynamometer for a representative cut. Target: MAPE under 20 % for the cutting portion (ignore non-cutting blocks).
- Vibration RMS vs the on-spindle accelerometer band-limited to 10–1000 Hz. Target: trend correlation
r > 0.85.

A small offline harness, run nightly, is enough:
# validation/harness.py
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error
def score(pred: pd.Series, truth: pd.Series) -> dict:
df = pd.concat([pred.rename("p"), truth.rename("t")], axis=1).dropna()
if len(df) < 60:
return {"n": int(len(df))}
mae = mean_absolute_error(df.t, df.p)
rmse = float(np.sqrt(mean_squared_error(df.t, df.p)))
mape = float(np.mean(np.abs((df.t - df.p) / df.t.replace(0, np.nan)).dropna()) * 100.0)
corr = float(df.t.corr(df.p))
return {"n": int(len(df)), "mae": float(mae), "rmse": rmse,
"mape_pct": mape, "corr": corr}
Push the metrics back into InfluxDB under a cnc_twin_validation measurement and put them on a dashboard the maintenance lead actually looks at. The moment MAE drifts above your budget for two consecutive nights, retrain the residual.
Track the model versions in MLflow (or, honestly, a Git tag and a JSON manifest if you do not want yet another service). When a challenger model beats the champion on a 24-hour holdout window, promote it — but keep the champion warm for 48 hours so you can roll back without redeploying containers.
Trade-offs, gotchas, and where this falls short of full digital twin
Honest list, ordered by how often they bite:
- Latency vs fidelity. At 10 Hz polling, you will see spindle load steps but you will alias high-frequency vibration. If you need real chatter detection, you need a dedicated DAQ at 10 kHz+ and a separate edge stream — MTConnect alone will not get you there. The MTConnect Institute is honest about this in its Sample Interpretation documentation.
- Sensor coverage gaps. Most production CNCs do not have a spindle thermocouple. You can run the physics-only branch without it, but the validation loop becomes weak. Budget for one thermocouple + one accelerometer per “golden machine” and propagate the fitted parameters to siblings.
- Sim-to-real gap on cutting force. The Kienzle approach assumes you know the material and the tool engagement. In a job shop with mixed materials and one-off parts, expect 30–50 % errors on absolute force. Trend-based use (force is rising for the same program) is still valuable; absolute thresholds are not.
- Time sync. PTP (IEEE 1588) on the OT network is ideal. NTP from the gateway will get you within 50 ms on a quiet network and 500 ms when someone is downloading a CAD model. For 10 Hz data this is fine; for cross-signal correlations at 1 kHz it is not.
- IT/OT politics. The hardest battle is not technical. Asking IT to open a port from the shop VLAN to a cloud InfluxDB will generate a six-week security review. Plan for it. The mitigations: keep the InfluxDB on-prem in the IT DMZ, push only aggregated metrics outward, and put the OpenAPI spec of the twin runtime in front of the security team early.
- Vendor lock at the edge. A Fanuc FOCAS adapter is closed-source. If the machine is upgraded, the adapter may need re-licensing. Build the rest of the stack on the canonical schema so an adapter swap is a one-day job.
- “Digital twin” hype. What we have built is a reduced-order, data-driven twin with a validation loop. It is not a high-fidelity FEM that lets you simulate fixture stiffness or thermal distortion of the bed casting in real time. If your CFO has been sold the latter, manage expectations early. NIST’s framing in NISTIR 8356 on digital-twin trustworthiness is a useful reference to ground that conversation.
Practical recommendations
A short list of things that will save you a fortnight, in rough order of return on time:
- Pin every dependency.
requirements.txtwith==exact versions. Random Forest reproducibility depends on the scikit-learn version. You will be thankful when the model needs to be retrained two years from now. - Persist raw and derived separately. Two InfluxDB buckets —
cnc_rawandcnc_twin_state. You will want to reprocess raw with new models without throwing away history. - Write a CLI, not just a service.
python -m twin.cli replay --machine VMC01 --from 2026-05-01 --to 2026-05-02is worth its weight when an incident happens at 03:00. - Capture exec_state transitions as events. A simple state machine on top of the MTConnect
executionfield tells you cycle time more reliably than counting button presses. - Take a “golden week.” Pick a representative week of production output and freeze it as your validation set. Every model change is scored against it before merging. This is the closest thing to CI you will get in industrial ML.
- Train your operators on the dashboard before training your ML model. A dashboard the operator trusts will surface 80 % of the value. Sequence accordingly.
- Buy, do not build, the MTConnect agent. Use mtconnect/cppagent for native protocols and a vendor adapter (Fanuc, MEMEX, Memex MERLIN, etc.) for the control. Building your own FOCAS bridge in 2026 is not a good use of senior engineering time.
FAQ
Q1. Why InfluxDB rather than just dumping to Parquet on S3?
Twin runtimes want second-level reads of the last few minutes for state, and minute-level reads of the last day for predict. A time-series store handles both with one query language and downsampling tasks. Parquet on object storage is excellent for archival and offline retraining; it is poor for the dashboard. Use both.
Q2. Can I do this with OPC UA instead of MTConnect?
Yes. OPC UA via the asyncua Python library follows exactly the same shape — subscribe to nodes, normalise, persist. MTConnect’s advantage is a richer machine-tool domain model and a public schema; OPC UA’s advantage is broader vendor coverage outside machine tools. For a CNC, MTConnect is usually the faster path.
Q3. How long does the warm-up data need to be?
For the thermal model: 30–60 minutes from cold start to thermal steady state, sampled at 1 Hz. For the residual model: at least 40 hours of cutting time across the part programs you actually run. For the IsolationForest detector: one week of “normal” operation including idle, warm-up, tool change and cutting.
Q4. The Kienzle estimate is bad on my machine. Why?
Three usual suspects: (a) the rated spindle power in TwinConfig does not match the drive’s actual rating, (b) the drive efficiency is assumed at 0.85 and yours is closer to 0.7 at low load, (c) the spindle load percentage on Fanuc controls is a peak-hold value, not an instantaneous one. Replace the load source with the spindle motor current (Hall clamp or drive parameter) for a step-change improvement.
Q5. How do I extend this to a fleet of machines?
Run one ingest container per machine, one InfluxDB cluster, one twin runtime that takes machine as a path parameter, and one Grafana with templated variables. Fit thermal parameters per machine; share the ML pipeline as a class but train per-machine residuals — bearing aging is per-machine.
Q6. Where does PLM come in?
At two points. First, the as-designed tool list and material data come from PLM (Teamcenter, 3DEXPERIENCE, Aras). Second, the validated as-built record — what was actually cut, with what spindle temperature profile, against what twin prediction — goes back to PLM as part of the digital thread for that serial number. Keep the twin runtime PLM-agnostic; let an integration worker do the talking.
Further reading
- MTConnect Institute — standards and reference materials: https://www.mtconnect.org/
- MTConnect Reference Agent (
cppagent) on GitHub: https://github.com/mtconnect/cppagent - NIST IR 8356, Considerations for Digital Twin Technology and Emerging Standards: https://csrc.nist.gov/pubs/ir/8356/final
- InfluxDB Python client documentation: https://github.com/influxdata/influxdb-client-python
- TimescaleDB hypertables: https://docs.timescale.com/use-timescale/latest/hypertables/
- FastAPI documentation: https://fastapi.tiangolo.com/
- scikit-learn
IsolationForestreference: https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.IsolationForest.html - Grafana provisioning guide: https://grafana.com/docs/grafana/latest/administration/provisioning/
- Sandvik Coromant — cutting-force theory and Kienzle coefficients: https://www.sandvik.coromant.com/en-gb/knowledge
- Fanuc FOCAS library reference: https://www.fanucamerica.com/products/cnc/cnc-software/focas-developer-tools
- Our companion piece on where this twin sits in the broader stack: IoT, digital twin and PLM: complete overview
- For a factory-scale view of similar architecture choices: Stellantis virtual factory digital twin analysis 2026
