Build an OPC UA Server in Python with asyncua (Tutorial)
OPC UA is the lingua franca of industrial data. Every PLC, every distributed control system, every edge gateway speaks it—or should. Yet most tutorials stop at toy examples: a single server, a handful of variables, security politely ignored. In production, a Python OPC UA server built on asyncua needs to handle real address spaces, enforce encryption and authentication, manage client subscriptions, and survive restarts gracefully.
This tutorial builds a production-ready OPC UA server in Python from the ground up. We’ll walk through address space design, security policy configuration, subscriptions and monitored items, Docker packaging, and systemd deployment. By the end, you’ll have a working foundation that scales from a single machine to a fleet of edge gateways feeding a digital twin. We’ll also point out the gotchas that catch most first-time OPC UA implementers.
What this post covers: project setup, pure-Python asyncua library, building and securing an address space, registering methods, subscribing clients, containerization, and deployment strategies.
Why asyncua and Why Python
Python isn’t the first language that springs to mind for industrial protocols. C# has mature OPC UA libraries (OPC Foundation’s official stack), and C++ dominates embedded systems. But the Python-plus-asyncua approach wins in three critical areas: speed to market, maintainability, and ecosystem fit.
The Case for asyncua
asyncua (github.com/FreeOpcUa/opcua-asyncio) is the maintained successor to python-opcua, the longest-running open-source OPC UA implementation for Python under the FreeOpcUa umbrella. It’s pure Python—no C bindings, no platform-specific compilation—and it covers roughly 80% of the OPC UA specification, including:
- Node classes and address spaces (Objects, Variables, Methods, Types)
- Data types (built-in, custom, structured)
- Security (signing and encryption policies, certificate handling, username/password authentication)
- Client subscriptions (monitored items, data change notifications, events)
- Reference types (hierarchical and non-hierarchical)
What it doesn’t do: Publish/Subscribe (Pub/Sub) over MQTT transport is only partially implemented, and the stack is not OPC Foundation certified. But for bridging PLCs to edge analytics, digital twins, and MES systems, asyncua handles 95% of real-world workloads.
The FreeOpcUa projects have been actively maintained since 2014, with a healthy community contributing fixes and features. asyncua is used in production by integrators and small industrial software vendors across Europe and North America. The codebase is transparent—if you hit a bug, you can fix it or propose a patch. Compare that to proprietary stacks where you’re waiting on vendor support cycles.
Async/Await Native
asyncua is built on Python’s asyncio, not callbacks or thread pools. This means:
# Non-blocking I/O, composable with other async libraries
async def handle_subscriptions():
    # Illustrative pseudocode: iterate over any async event source
    async for event in server.subscriptions:
        process_client_event(event)
You can run your PLC read loop, your web API server, and your analytics aggregator in a single Python process, sharing event loops and avoiding thread-safety headaches. This model aligns perfectly with modern Python web frameworks (FastAPI, aiohttp) and data streaming libraries (asyncpg, aioredis). No context switches, no mutex locks—just cooperative multitasking.
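As an illustration of that single-process model, the sketch below runs three hypothetical coroutines on one event loop with asyncio.gather; update_io_loop, run_web_api, and aggregate_metrics are placeholders, not asyncua APIs:
import asyncio
async def main():
    # One process, one event loop, several cooperating concerns
    await asyncio.gather(
        update_io_loop(),      # refresh OPC UA variables from the PLC
        run_web_api(),         # e.g. an aiohttp or FastAPI app
        aggregate_metrics(),   # periodic analytics rollups
    )
asyncio.run(main())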
Ecosystem Fit
If you’re already using Python for data pipeline orchestration (Prefect, Dagster, Airflow), time-series databases (InfluxDB, TimescaleDB), or analytics (pandas, scikit-learn), an asyncua-based OPC UA server keeps your stack cohesive. No polyglot infrastructure, no context-switching between languages. Your DevOps team maintains one runtime; your monitoring covers one instrumentation surface.
Project Setup and Address Space Design
Dependencies and Docker Base
Create a pyproject.toml:
[project]
name = "opc-ua-server"
version = "1.0.0"
dependencies = [
    "asyncua>=1.1.0",
    "cryptography>=41.0.0",
    "python-dotenv>=1.0.0",
]
[build-system]
requires = ["flit_core >=3.2,<4"]
build-backend = "flit_core.buildapi"
# flit derives the module name from the project name by default;
# point it at the single-module layout (server.py) instead
[tool.flit.module]
name = "server"
Install locally:
pip install -e .
For Docker, we’ll use Python 3.12 slim:
FROM python:3.12-slim
WORKDIR /app
# Copy project metadata and source before installing
COPY pyproject.toml server.py ./
COPY certs/ ./certs/
RUN pip install --no-cache-dir .
EXPOSE 4840
HEALTHCHECK --interval=30s --timeout=5s \
  CMD python -c "import asyncio, asyncua; asyncio.run(asyncua.Client('opc.tcp://localhost:4840').connect())" || exit 1
CMD ["python", "server.py"]
Address Space Philosophy
OPC UA’s address space is a tree of nodes. Each node is identified by a NodeId (namespace + numeric or string identifier), has a class (Object, Variable, Method, Type), and exposes attributes (DisplayName, DataType, AccessLevel, etc.) and references (HasChild, Organizes, Implements, etc.).
The key design decision: use OPC UA companion specifications (e.g., OPC 40001 for industrial machinery, OPC 40011 for robotics) rather than inventing custom hierarchies. Companion specs define reusable types, interfaces, and naming conventions. A third-party client looking at your server can immediately understand a “MachineryItem” from the machinery spec—far better than custom types like “MyCustomMachine_v2”.
For this tutorial, we’ll build a simple machine automation hierarchy:
Root
├── Objects
│ ├── Server (standard)
│ └── MyMachine (custom ns:2)
│ ├── Status
│ │ ├── Running (Boolean)
│ │ ├── Speed (Double, [0..3600] RPM)
│ │ └── Mode (Int32, enum: Manual/Auto/Service)
│ ├── Configuration
│ │ ├── SetMode() [Method]
│ │ └── ResetCounter() [Method]
│ └── Diagnostics
│ ├── ErrorCount (Int32)
│ └── LastErrorCode (String)
└── Types
└── MachineType (ObjectType)
Building the Server: Step-by-Step
Skeleton Server
Here’s a minimal working server:
import asyncio
import logging
from asyncua import Server, ua
logging.basicConfig(level=logging.INFO)
async def main():
    # Create server instance and initialize the standard nodes
    server = Server()
    await server.init()
    # Configure endpoint and server name (plain sync setters in asyncua)
    server.set_endpoint("opc.tcp://0.0.0.0:4840/freeopcua/server/")
    server.set_server_name("MyMachine OPC UA Server")
    # Create custom namespace
    namespace_uri = "http://example.com/mymachine/1.0"
    idx = await server.register_namespace(namespace_uri)
    # Get the Objects folder and create the machine root object
    objects = server.nodes.objects
    mymachine = await objects.add_object(ua.NodeId("MyMachine", idx), "MyMachine")
    # Create sub-folders
    status_folder = await mymachine.add_folder(ua.NodeId("Status", idx), "Status")
    config_folder = await mymachine.add_folder(ua.NodeId("Configuration", idx), "Configuration")
    # Add variables to the Status folder
    running_var = await status_folder.add_variable(
        ua.NodeId("Running", idx), "Running", False,
        varianttype=ua.VariantType.Boolean
    )
    speed_var = await status_folder.add_variable(
        ua.NodeId("Speed", idx), "Speed", 0.0,
        varianttype=ua.VariantType.Double
    )
    # Make the variables writable by clients (variables are read-only by default)
    await running_var.set_writable(True)
    await speed_var.set_writable(True)
    # Start server
    async with server:
        logging.info(f"Server started at {server.endpoint}")
        # Simulate I/O updates
        while True:
            new_speed = 1500.0  # Simulate sensor reading
            await speed_var.write_value(new_speed)
            await asyncio.sleep(1)
if __name__ == "__main__":
    asyncio.run(main())
Full Code Walkthrough
The server lifecycle follows this pattern:
1. Instantiate Server()
2. Initialize the standard nodes with await server.init() (Server, NamespaceArray, etc.)
3. Configure the endpoint and security policies
4. Register custom namespaces
5. Build the address space (objects, variables, types, methods)
6. Start the server with its async context manager
7. Update variables in a loop (reading from I/O, databases, etc.)
8. Stop gracefully on signal
Let’s expand the skeleton with methods and data type definitions:
from asyncua import ua, uamethod
@uamethod
async def set_mode(parent, mode_value):
    """Method: change machine mode. `parent` is the NodeId of the owning object."""
    if mode_value not in (0, 1, 2):  # 0=Manual, 1=Auto, 2=Service
        return ua.StatusCode(ua.StatusCodes.BadOutOfRange)
    # Resolve `parent` to its node (e.g. via a closure over the server instance)
    # and write the new mode and a LastModeChange timestamp to its variables here.
    return ua.StatusCode(ua.StatusCodes.Good)
@uamethod
async def reset_counter(parent):
    """Method: reset the error counter (same pattern: look up the node via `parent`)."""
    return ua.StatusCode(ua.StatusCodes.Good)
Register methods on the config folder:
await config_folder.add_method(
    ua.NodeId("SetMode", idx),
    "SetMode",
    set_mode,
    [ua.VariantType.Int32],       # Input args
    [ua.VariantType.StatusCode]   # Output args
)
Security: Anonymous → Username/Password → Certificate
Out of the box, asyncua defaults to Anonymous mode—fine for development, forbidden in production. Moving to real security requires three steps: policy selection, credential management, and certificate provisioning.
Security Policies Explained
OPC UA defines a “security policy” as a tuple of (signing algorithm, encryption algorithm, key size). asyncua supports:
| Policy | Use Case | Signing | Encryption | Strength |
|---|---|---|---|---|
| None | Dev/test only | No | No | None |
| Basic256Sha256 | Standard production | RSA-SHA256 | AES-256-CBC | ⭐⭐⭐ |
| Aes128_Sha256_RsaOaep | Higher assurance | RSA-SHA256 (RSA-OAEP key transport) | AES-128-CBC | ⭐⭐⭐⭐ |
| Aes256_Sha256_RsaPss | High-security sites | RSA-PSS-SHA256 | AES-256-CBC | ⭐⭐⭐⭐⭐ |
Setting Up Username/Password (Easiest)
# Enable username/password authentication.
# Note: the user-manager classes moved between asyncua releases; check your version.
from asyncua.server.users import User, UserRole
# Advertise only the username/password user token policy (no anonymous logins)
server.set_security_IDs(["Username"])
# Define users: asyncua asks a user manager to map credentials to a User object.
# In real deployments, pull credentials from a secrets store instead of hardcoding.
class SimpleUserManager:
    def get_user(self, iserver, username=None, password=None, certificate=None):
        valid_users = {
            "operator": "password123",
            "admin": "secretadmin"
        }
        if valid_users.get(username) == password:
            role = UserRole.Admin if username == "admin" else UserRole.User
            return User(role=role)
        return None  # reject the session
# The user manager is passed when the server is constructed
server = Server(user_manager=SimpleUserManager())
Client connects:
from asyncua import Client
client = Client("opc.tcp://localhost:4840/freeopcua/server/")
client.set_user("operator")
client.set_password("password123")
await client.connect()
Certificate-Based Security (Production Standard)
Generate a self-signed certificate:
openssl req -x509 -newkey rsa:2048 -keyout server_key.pem -out server_cert.pem -days 365 -nodes
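OPC UA clients typically also inspect the certificate’s Subject Alternative Name: application instance certificates are expected to carry a URI SAN matching the server’s ApplicationUri, plus the DNS name clients connect to. With OpenSSL 1.1.1 or newer you can add both at generation time; the URN below is only an example value:
openssl req -x509 -newkey rsa:2048 -keyout server_key.pem -out server_cert.pem -days 365 -nodes \
  -subj "/CN=mymachine" \
  -addext "subjectAltName=URI:urn:example:mymachine,DNS:localhost"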
Load in server:
# asyncua loads the certificate and private key directly from file paths
# (both PEM and DER files are accepted); no manual conversion is needed
await server.load_certificate("server_cert.pem")
await server.load_private_key("server_key.pem")
Enable policies:
server.set_security_policy([
    ua.SecurityPolicyType.Basic256Sha256_Sign,
    ua.SecurityPolicyType.Basic256Sha256_SignAndEncrypt
])
Clients must trust the cert (or disable validation for testing):
from asyncua import Client
from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256
client = Client("opc.tcp://localhost:4840/freeopcua/server/")
await client.set_security(
    SecurityPolicyBasic256Sha256,
    "client_cert.pem",
    "client_key.pem",
    server_certificate="server_cert.pem",
    mode=ua.MessageSecurityMode.SignAndEncrypt
)
await client.connect()
Working with Namespaces and Custom Types
asyncua supports custom data types and object types—essential for modeling domain-specific structures. Here is how to define a custom machine type:
# Define a custom ObjectType that all machines conform to.
# ObjectTypes belong under the Types hierarchy (BaseObjectType), not under Objects.
machine_type = await server.nodes.base_object_type.add_object_type(
    ua.NodeId("MachineType", idx), "MachineType"
)
# Add type variables (defaults for instances); mark them Mandatory so they are
# created automatically on every instance
manufacturer = await machine_type.add_variable(
    ua.NodeId("Manufacturer", idx), "Manufacturer", "Unknown",
    varianttype=ua.VariantType.String
)
await manufacturer.set_modelling_rule(True)
serial = await machine_type.add_variable(
    ua.NodeId("SerialNumber", idx), "SerialNumber", "",
    varianttype=ua.VariantType.String
)
await serial.set_modelling_rule(True)
# Instantiate the type
pump_instance = await objects.add_object(
    ua.NodeId("Pump_A1", idx), "Pump_A1", objecttype=machine_type
)
# Override type defaults on the instance
manufacturer_node = await pump_instance.get_child(f"{idx}:Manufacturer")
await manufacturer_node.write_value("Grundfos")
serial_node = await pump_instance.get_child(f"{idx}:SerialNumber")
await serial_node.write_value("GR-2024-001")
Custom types promote consistency and allow clients to discover machine capabilities programmatically. When you model your factory using companion specs, third-party tools can auto-configure dashboards and analytics pipelines.
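For example, a client can walk an instance without knowing anything about MachineType in advance. A minimal sketch, assuming the custom namespace landed at index 2 as in the tree above:
import asyncio
from asyncua import Client
async def inspect_pump():
    async with Client("opc.tcp://localhost:4840/freeopcua/server/") as client:
        pump = client.get_node("ns=2;s=Pump_A1")
        # Browse the instance and read every child variable
        for child in await pump.get_children():
            name = await child.read_browse_name()
            value = await child.read_value()
            print(f"{name.Name} = {value}")
asyncio.run(inspect_pump())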
Subscriptions and Monitored Items
The real power of OPC UA’s client/server model is its subscription mechanism. Clients don’t poll; the server pushes data changes to subscribers. This is far more efficient than REST polling and scales to thousands of clients.
Server Side (Automatic)
The server side needs no extra code: when a client creates a monitored item on a variable, asyncua samples it and queues notifications automatically. When you update a variable:
await speed_var.write_value(1750.0)
All connected subscribers with monitored items on speed_var receive notifications. The server queues changes internally and publishes them on the subscription’s publishing interval.
Client Side (Subscription Handling)
A client creates a subscription and registers monitored items:
from asyncua import Client
class SubHandler:
    """Receives data change notifications for monitored items"""
    def datachange_notification(self, node, val, data):
        print(f"Data changed on {node}: {val}")
async with Client("opc.tcp://localhost:4840/freeopcua/server/") as client:
    # Resolve the nodes to monitor (string NodeIds in our custom namespace, ns=2)
    speed_node = client.get_node("ns=2;s=Speed")
    mode_node = client.get_node("ns=2;s=Mode")
    # Create a subscription with a 1-second publish interval and attach the handler
    subscription = await client.create_subscription(1000, SubHandler())
    await subscription.subscribe_data_change([speed_node, mode_node])
    # Keep the subscription alive
    await asyncio.sleep(3600)
    await subscription.delete()
Detailed Lifecycle and Error Handling
In production, graceful shutdown is critical. The async context manager handles cleanup, but you should also catch signals:
import signal
async def main():
    server = Server()
    # ... setup ...
    stop_event = asyncio.Event()
    # Handle SIGTERM and SIGINT by asking the main loop to exit
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, stop_event.set)
    async with server:
        logging.info(f"Server started at {server.endpoint}")
        while not stop_event.is_set():
            await asyncio.sleep(1)
    # Leaving the async context manager stops the server and closes sessions
    logging.info("OPC UA server stopped")
This ensures all client sessions are properly closed and subscriptions are cleaned up before the process exits. Without this, clients may hang trying to reconnect, wasting resources.
Tuning Parameters
– PublishingInterval: how often the server sends notifications (in milliseconds). Lower = more frequent updates and lower latency, at the cost of more traffic. The default of 1000 ms is often reasonable. Too low (50 ms) wastes bandwidth; too high (10000 ms) causes stale data.
– SamplingInterval: how often the server reads a monitored variable internally. Can be lower than publishing interval for aggregated reporting or dead-band filtering.
– QueueSize: how many notifications the server buffers before dropping old ones. Prevents packet loss under load. Typical: 10–100.
– DeadBand: minimum change threshold before notifying. Reduces noise (e.g., only notify if speed changes by >5%).
For industrial control, 100–500ms publish intervals are typical. For dashboards and analytics, 1000–5000ms is sufficient. Real-time systems may need 50ms or lower, but that requires tuning and testing.
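On the client side, these knobs are requested when creating the subscription and its monitored items. A minimal sketch reusing the client, handler, and speed_node names from the subscription example above; asyncua exposes a deadband_monitor() helper, but parameter details vary slightly between releases, so check the version you run:
# Request a 500 ms publishing interval
subscription = await client.create_subscription(500, SubHandler())
# Only notify when Speed changes by more than 50 (absolute deadband)
handle = await subscription.deadband_monitor(speed_node, 50)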
Docker Packaging + systemd Deployment + Health Checks
A Python server on a developer’s laptop isn’t production-ready. We need containerization, process supervision, and observability.
Multi-Stage Dockerfile
# Build stage: install the project and its dependencies into the user site
FROM python:3.12-slim AS builder
WORKDIR /build
COPY pyproject.toml server.py ./
RUN pip install --user --no-cache-dir .
# Runtime stage
FROM python:3.12-slim
WORKDIR /app
# Copy installed packages from builder
COPY --from=builder /root/.local /root/.local
# Copy application and certificates
COPY server.py ./
COPY certs/ ./certs/
# Ensure pip-installed packages are in PATH
ENV PATH=/root/.local/bin:$PATH
# OPC UA standard port
EXPOSE 4840
# Health check: try to connect to server
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
    CMD python -c "import asyncio, asyncua; asyncio.run(asyncua.Client('opc.tcp://localhost:4840').connect())" || exit 1
# Run server
CMD ["python", "server.py"]
Systemd Unit File
For bare-metal or VM deployments:
[Unit]
Description=OPC UA Machine Server
After=network.target
[Service]
Type=simple
User=opcua
WorkingDirectory=/opt/opc-ua-server
ExecStart=/usr/bin/python3 server.py
Restart=on-failure
RestartSec=10s
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
Enable and start:
sudo systemctl enable opc-ua-server
sudo systemctl start opc-ua-server
sudo systemctl status opc-ua-server
journalctl -u opc-ua-server -f
Container Architecture
For orchestration with Prometheus metrics and PLC integration:
# docker-compose.yml
version: '3.8'
services:
  opc-ua-server:
    build: .
    ports:
      - "4840:4840"
    environment:
      - OPCUA_ENDPOINT=opc.tcp://0.0.0.0:4840
      - LOG_LEVEL=INFO
    healthcheck:
      test: ["CMD", "python", "-c", "import asyncio, asyncua; asyncio.run(asyncua.Client('opc.tcp://localhost:4840').connect())"]
      interval: 30s
      timeout: 5s
      retries: 3
    volumes:
      - ./certs:/app/certs:ro
    networks:
      - iiot
  prometheus:
    image: prom/prometheus
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
    networks:
      - iiot
networks:
  iiot:
    driver: bridge
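The OPCUA_ENDPOINT and LOG_LEVEL variables above imply that server.py reads its configuration from the environment. A minimal sketch using the python-dotenv dependency already declared in pyproject.toml; the variable names are the ones from the compose file:
import logging
import os
from dotenv import load_dotenv
load_dotenv()  # picks up a local .env during development; harmless in containers
ENDPOINT = os.getenv("OPCUA_ENDPOINT", "opc.tcp://0.0.0.0:4840/freeopcua/server/")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
logging.basicConfig(level=getattr(logging, LOG_LEVEL.upper(), logging.INFO))
# later, during server setup:
# server.set_endpoint(ENDPOINT)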
Trade-offs and Gotchas
Performance Ceiling
Pure Python + asyncio = single-threaded event loop. Under heavy load, a single asyncua server updates ~5,000–10,000 variables per second before saturating. If you need higher throughput, either:
- Shard the address space across multiple server instances
- Use compiled OPC UA stacks (C#, C++)
- Pre-aggregate data upstream (combine 100 I/O signals into 10 computed metrics)
This ceiling arises because the asyncio event loop cannot truly parallelize: even with multiple coroutines, they take turns on a single CPU core. The GIL (Global Interpreter Lock) further constrains CPU-bound work. For a factory with 10,000+ real-time sensors, distribute the load horizontally—run one asyncua server per PLC cabinet or cell, and have a parent aggregator collect their data. This scales elastically and isolates faults.
No OPC Foundation Certification
asyncua is excellent but not OPC Foundation certified. Major industrial vendors (Siemens, Rockwell, GE) may not officially support it. Solution: treat asyncua as a gateway, not a primary control system. Bridge it to a certified C# or Java server for mission-critical systems.
Certification matters in regulated industries (automotive, pharmaceuticals, nuclear). If you’re subject to audits or need vendor SLAs, start with an official stack. But for internal data plumbing, brownfield integrations, and early-stage IoT prototypes, asyncua’s openness and simplicity often outweigh formal blessing.
Security Certificate Validation Pitfalls
Clients often skip certificate validation (mode=MessageSecurityMode.None) to “test quickly,” then deploy that way. Always enforce validation in production. Common mistakes:
- Using self-signed certs without loading them in the client trust store
- Mismatched DNS names (cert says “server.example.com”, client connects to “localhost”)
- Expired certificates (easy to miss, set calendar reminders)
- Weak RSA key sizes (use 2048 bits minimum, 4096 for high-security sites)
A compromised certificate means a man-in-the-middle attacker can eavesdrop on all variable reads/writes and inject false data. The damage compounds if your OPC UA server controls critical setpoints (heater temps, pump speeds, etc.). Invest time in certificate lifecycle management—automate renewal with Let’s Encrypt or your internal PKI, and test rotation on staging before production.
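A small expiry check like the sketch below (the file name and 30-day threshold are just examples) can run at startup or on a timer and feed your alerting:
from datetime import datetime, timedelta
from cryptography import x509
def check_cert_expiry(path="server_cert.pem", warn_days=30):
    """Warn when the server certificate is close to expiry"""
    with open(path, "rb") as f:
        cert = x509.load_pem_x509_certificate(f.read())
    remaining = cert.not_valid_after - datetime.utcnow()
    if remaining < timedelta(days=warn_days):
        print(f"WARNING: certificate expires in {remaining.days} days")
    return remaining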
Namespace and Type Explosion
As your address space grows (100s of machine types, 1000s of variables), without strict naming conventions it becomes a tangled mess. Best practice: adopt a companion spec or write an internal standard early.
Define rules like:
– All machine nodes use ns:2; all company metadata uses ns:3
– Folder structure mirrors factory floor topology (Plant > Line > Cell > Equipment > Component)
– Variable names follow a consistent pattern: {equipment}_{signal}_{unit} (e.g., Pump_A1_Discharge_Pressure_Bar)
– Version your namespace URIs; retiring old namespaces avoids legacy cruft
Violating these early costs little. Fixing them later, when 500 clients depend on the old hierarchy, is expensive.
Complexity of Full Pub/Sub
asyncua supports client subscriptions (above), but OPC UA Pub/Sub (sending data over MQTT, AMQP, etc.) is partially implemented. If you need event-driven, multi-tenant pub/sub, consider Sparkplug B over MQTT (simpler, fewer dependencies) or OPC UA Pub/Sub with a mature broker (Mosquitto, RabbitMQ).
Practical Recommendations
1. Ship Companion-Spec Compliant Servers
Don’t invent custom types. Use OPC 40001 (machinery), OPC 40011 (robotics), or OPC 40101 (condition monitoring). Clients expect standard shapes; deviating wastes integration effort.
2. Use Async Correctly
asyncua is async-first, but blocking code (slow database queries, file I/O) will starve your event loop. Always use asyncio.to_thread() for blocking operations:
# BAD: blocks the event loop
result = my_slow_database_query()
# GOOD: runs in thread pool, doesn't block
result = await asyncio.to_thread(my_slow_database_query)
Certificate Management and Rotation
In production, certificates expire and need rotation. Here is an automated renewal pattern using cryptography:
import logging
from datetime import datetime, timedelta
from cryptography import x509
from cryptography.x509.oid import NameOID
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import rsa
def renew_certificate(hostname="localhost", days_valid=365):
    """Generate a fresh self-signed certificate and private key"""
    # Key generation lives in the rsa module, not in x509
    key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
    subject = issuer = x509.Name([
        x509.NameAttribute(NameOID.COUNTRY_NAME, "US"),
        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, "CA"),
        x509.NameAttribute(NameOID.ORGANIZATION_NAME, "MyFactory"),
        x509.NameAttribute(NameOID.COMMON_NAME, hostname),
    ])
    cert = (
        x509.CertificateBuilder()
        .subject_name(subject)
        .issuer_name(issuer)
        .public_key(key.public_key())
        .serial_number(x509.random_serial_number())
        .not_valid_before(datetime.utcnow())
        .not_valid_after(datetime.utcnow() + timedelta(days=days_valid))
        .add_extension(
            x509.SubjectAlternativeName([
                x509.DNSName(hostname),
                x509.DNSName("*.example.com"),
                # OPC UA application certificates usually also need a URI SAN
                # matching the server's ApplicationUri
            ]),
            critical=False,
        )
        .sign(key, hashes.SHA256())
    )
    # Write to files
    with open("server_cert.pem", "wb") as f:
        f.write(cert.public_bytes(serialization.Encoding.PEM))
    with open("server_key.pem", "wb") as f:
        f.write(key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption()
        ))
    logging.info(f"Certificate renewed, valid until {cert.not_valid_after}")
Schedule this to run monthly (cron job, Kubernetes CronJob, or systemd timer) to keep certs fresh. On renewal, reload the cert in your running server without restart if possible.
3. Deploy as Sidecar, Not Primary Control
Treat the Python OPC UA server as a gateway, bridging industrial I/O to the data plane. Don't make it the source of truth for control logic. If it crashes, your PLC should keep running.
4. Monitor Aggressively
Log every connection, disconnection, and method invocation. Wire Prometheus metrics:
from prometheus_client import Counter, Histogram, Gauge
subscriptions_active = Gauge('opc_ua_subscriptions_active', 'Active subscriptions')
variable_updates = Counter('opc_ua_variable_updates_total', 'Total variable updates')
update_latency = Histogram('opc_ua_update_latency_seconds', 'Update processing latency')
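To actually expose these, start the exporter and update the metrics from your I/O loop. A minimal sketch (port 8000 is an arbitrary choice; your prometheus.yml must scrape it, and speed_var/new_speed are the names from the skeleton server):
import time
from prometheus_client import start_http_server
start_http_server(8000)  # serves /metrics from a background thread
# inside the server update loop
start = time.perf_counter()
await speed_var.write_value(new_speed)
variable_updates.inc()
update_latency.observe(time.perf_counter() - start)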
FAQ
Q: Can asyncua talk to Siemens S7 or Allen-Bradley CompactLogix?
A: Not directly. asyncua is an OPC UA server, not a PLC client. To bridge a PLC to OPC UA, use a separate library (snap7 for Siemens, CIP/EtherNet/IP for AB) to read the PLC, then push data into asyncua variables.
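A minimal sketch of that bridging pattern with python-snap7 (the IP address, DB number, and offsets are placeholders; snap7 calls are blocking, so they run in a thread):
import asyncio
import snap7
from snap7.util import get_real
plc = snap7.client.Client()
plc.connect("192.168.0.10", 0, 1)  # IP, rack, slot
async def bridge_plc(speed_var):
    while True:
        raw = await asyncio.to_thread(plc.db_read, 1, 0, 4)  # DB1, offset 0, 4 bytes
        await speed_var.write_value(get_real(raw, 0))        # REAL at byte 0
        await asyncio.sleep(0.5)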
Q: How many clients can one server handle?
A: Depends on hardware and message rate. Each subscription is a lightweight async task. Typical limits: 100–1000 concurrent clients before network or CPU saturation. Horizontal scaling (multiple servers) is easier than tuning a single instance to 10k clients.
Q: What’s the difference between OPC UA and OPC (Classic)?
A: OPC Classic is the legacy, Windows-only, DCOM-based generation, superseded for new projects. OPC UA is platform-agnostic, uses TCP or HTTPS, and supports subscriptions and publish-subscribe. Always use OPC UA unless you’re stuck maintaining legacy systems.
Q: Can I use asyncua for real-time control?
A: No. asyncua is soft real-time (best-effort, no guarantees). If you need deterministic <100ms latency, use Modbus RTU or CANopen directly on the PLC. Use asyncua for monitoring and non-critical setpoints only.
Q: How do I handle 10,000 variables?
A: Split into multiple servers (one per cabinet or area), or use lazy loading: create variables on-demand as clients request them, rather than pre-allocating the entire address space.
Advanced Features: Events and Complex Types
Beyond simple variables, OPC UA supports events—asynchronous notifications triggered by conditions (e.g., alarms). asyncua can emit events:
from asyncua import ua
# Define a custom event type derived from BaseEventType, with one extra property.
# (Severity and Message already exist on BaseEventType, so we only add ErrorCode.)
alarm_type = await server.create_custom_event_type(
    idx, "MachineAlarmType", ua.ObjectIds.BaseEventType,
    [("ErrorCode", ua.VariantType.String)]
)
# Create an event generator with MyMachine as the event source
event_gen = await server.get_event_generator(alarm_type, mymachine)
# Later, trigger an event instance
event_gen.event.Severity = 800  # 1-1000, higher = more severe
event_gen.event.Message = ua.LocalizedText("Temperature sensor failure")
event_gen.event.ErrorCode = "E-1042"
await event_gen.trigger()
Clients receive events through subscriptions, enabling real-time alerting and condition monitoring. Events are essential for predictive maintenance workflows.
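On the client side, a sketch of receiving those events looks like this; the event_notification handler method name is the asyncua convention, and alarm_type stands in for the event type node created on the server above:
import asyncio
from asyncua import Client
class AlarmHandler:
    def event_notification(self, event):
        print(f"Alarm: severity={event.Severity}, message={event.Message}")
async with Client("opc.tcp://localhost:4840/freeopcua/server/") as client:
    machine = client.get_node("ns=2;s=MyMachine")
    sub = await client.create_subscription(500, AlarmHandler())
    await sub.subscribe_events(machine, alarm_type)
    await asyncio.sleep(3600)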
5. Build Resilience with Health Checks and Fallbacks
A production OPC UA server must survive transient failures gracefully. Implement retry logic and circuit breakers:
import logging
from tenacity import retry, stop_after_attempt, wait_exponential  # third-party: pip install tenacity
class ResilientVariableWriter:
    def __init__(self, variable_node, max_retries=3):
        self.variable_node = variable_node
        self.max_retries = max_retries
        self.last_value = None
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    async def write_with_retry(self, value):
        """Write value with automatic retry on transient failures"""
        try:
            await self.variable_node.write_value(value)
            self.last_value = value
            return True
        except Exception as e:
            logging.warning(f"Write failed, retrying: {e}")
            raise
    async def write_or_fallback(self, value, fallback_value=None):
        """Try to write; if all retries fail, use fallback"""
        try:
            await self.write_with_retry(value)
        except Exception as e:
            logging.error(f"All retries exhausted: {e}")
            if fallback_value is not None:
                await self.variable_node.write_value(fallback_value)
This ensures failed writes don’t cascade into system failures. Pair with alerting (Prometheus, PagerDuty) so ops teams know when fallbacks activate.
6. Versioning and API Stability
As your address space grows, clients depend on the node structure. Break changes carefully:
# Version your namespace URIs
NS_V1 = "http://example.com/mymachine/1.0"  # Legacy
NS_V2 = "http://example.com/mymachine/2.0"  # Current
async def main():
    server = Server()
    await server.init()
    # Support both old and new clients
    idx_v1 = await server.register_namespace(NS_V1)
    idx_v2 = await server.register_namespace(NS_V2)
    objects = server.nodes.objects
    # V1 nodes for backward compatibility
    machine_v1 = await objects.add_folder(idx_v1, "Machine")
    # V2 nodes with the new structure
    machine_v2 = await objects.add_folder(idx_v2, "MachineV2")
When you deprecate V1, give clients at least 2–3 release cycles to migrate. Document the migration path clearly in release notes. Breaking the address space without warning is one of the fastest ways to lose trust with integrators.
7. Monitoring and Observability
Wire up comprehensive observability so you catch issues before customers do. Beyond Prometheus metrics, log important events:
import json
import logging
from datetime import datetime
from prometheus_client import Counter, Gauge
class OpcUaMetrics:
    def __init__(self):
        self.clients_connected = Gauge('opc_ua_clients_connected', 'Current client connections')
        self.variable_updates = Counter('opc_ua_variable_updates_total', 'Total variable updates')
        self.write_errors = Counter('opc_ua_write_errors_total', 'Total write errors')
        self.subscription_creates = Counter('opc_ua_subscriptions_created_total', 'Subscriptions created')
    async def log_event(self, event_type, details):
        """Structured logging for audit trails"""
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': event_type,
            'details': details
        }
        logging.info(json.dumps(log_entry))
metrics = OpcUaMetrics()
# In your server event handlers (illustrative hooks; call them wherever you
# accept sessions or process writes)
async def on_client_connect(session):
    metrics.clients_connected.inc()
    await metrics.log_event('client_connect', {'session_id': str(session.session_id)})
async def on_variable_write(node_id, value):
    metrics.variable_updates.inc()
    await metrics.log_event('variable_write', {'node_id': str(node_id), 'value': value})
Structured logs integrate with centralized logging (ELK, Loki, Splunk) and enable post-mortem analysis when issues occur.
Deploying Across Multiple Data Centers
For global or federated factories, replicate your OPC UA server across regions. asyncua doesn’t natively support clustering, but you can implement a hub-and-spoke pattern:
from asyncua import Client
# Regional server in Europe
EU_ENDPOINT = "opc.tcp://eu.factory.local:4840/"
# Regional server in Asia
ASIA_ENDPOINT = "opc.tcp://asia.factory.local:4840/"
# Central aggregator
class GlobalAggregator:
    def __init__(self):  # __init__ cannot be a coroutine
        self.eu_client = Client(EU_ENDPOINT)
        self.asia_client = Client(ASIA_ENDPOINT)
    async def start(self):
        await self.eu_client.connect()
        await self.asia_client.connect()
        # Subscribe to both regions
        eu_sub = await self.eu_client.create_subscription(1000, None)
        asia_sub = await self.asia_client.create_subscription(1000, None)
        # Aggregate metrics (helper methods omitted here)
        eu_metrics = await self._get_metrics(eu_sub)
        asia_metrics = await self._get_metrics(asia_sub)
        return await self._publish_global_view(eu_metrics, asia_metrics)
This decentralizes control (each region operates independently) while maintaining global visibility. Use Kafka or event streams to publish regional events to a central analytics platform.
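A sketch of that last step with aiokafka; the broker address and topic name are assumptions:
import json
from aiokafka import AIOKafkaProducer
async def publish_global_view(metrics: dict):
    producer = AIOKafkaProducer(bootstrap_servers="kafka.central:9092")
    await producer.start()
    try:
        # Publish the aggregated snapshot for downstream analytics
        await producer.send_and_wait("factory.global-metrics", json.dumps(metrics).encode())
    finally:
        await producer.stop()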
Common Pitfalls and Lessons from the Field
Over the past five years, integrators report these recurring issues:
- Forgetting to set access levels on variables. All variables default to read-only. If clients can’t write setpoints, your server is just a glorified dashboard.
- Flooding the address space. One integrator created 100,000 variables without pagination or lazy-loading. Clients choked trying to browse. Keep the address space lean; use methods to access large datasets on-demand.
- No versioning strategy. Three years in, someone wants to change the name of a critical variable. If you don’t support both old and new names, you break 50 clients. Plan for evolution from day one.
- Hardcoding endpoints and ports. Use environment variables for configuration. When your container moves to a new host, it should not require recompilation.
- Ignoring clock skew. If your OPC UA server and PLC clocks drift by 10 seconds, timestamps become meaningless. Sync clocks with NTP before going live.
- Not testing failover. Test what happens if the server crashes mid-subscription. Do clients reconnect cleanly? Does the address space recover? Do you lose queued updates? Chaos engineering pays dividends here.
These lessons—collected from production deployments—can save you weeks of debugging.
Further Reading
- OPC UA Pub/Sub vs MQTT Architecture: UADP Comparison — when to use OPC UA Pub/Sub over MQTT, and how Unified Architecture Data Protocol (UADP) fits in.
- DDS Data Distribution Service Protocol: Complete Guide — alternative to OPC UA for real-time, distributed systems.
- Sparkplug B 3.0: Unified Namespace Guide — simpler, lighter-weight alternative for edge-to-cloud data flows.
- asyncua GitHub Repository — source code, issues, examples.
- OPC Foundation Specifications — official OPC UA Part 1–14 specs (dense, but authoritative).
Conclusion
Building a production-ready OPC UA server in Python with asyncua is within reach for any Python developer. Start small (single machine, anonymous mode), add security and subscriptions incrementally, containerize, and scale horizontally. asyncua gives you 80% of the OPC UA spec without C bindings or vendor lock-in—and the remaining 20% can usually be worked around with careful design.
The next time you’re bridging a PLC farm to a digital twin or feeding a lakehouse, reach for asyncua. Your integration will ship faster, run cheaper, and integrate smoothly with Python’s data ecosystem.
Author: Riju
Published: 2026-04-27
Topics: Industrial IoT, OPC UA, Python, asyncua, Edge Computing, IIoT Tutorials
