Files
tiandihe/backend/opc_service.py

245 lines
9.0 KiB
Python
Raw Normal View History

"""
OPC-UA polling service.
Logic:
1. Connect to the OPC-UA server at `opc_url`.
2. Read the counter node (`counter_node`) every `poll_interval` seconds.
3. When the counter value changes, read the trackmap nodes listed in
`trackmap_nodes` (a dict: {oracle_column -> node_id}).
4. For each row returned by the trackmap nodes build an UPDATE statement
and apply it to PLTM.CMPT_PL_TRACKMAP.
`trackmap_nodes` example (stored in .env or configured via UI):
{
"COILID": "ns=2;s=PL.TRACKMAP.P01.COILID",
"BEF_ES": "ns=2;s=PL.TRACKMAP.P01.BEF_ES",
...
}
For multi-position setups, define one entry per position column and handle
positional logic accordingly. The current implementation reads a flat set of
nodes and stores them against the POSITION value also read from OPC.
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
from dotenv import load_dotenv
# Called at import time, before OpcService reads its OPC_* settings through
# os.getenv(); presumably this loads a local .env file into the process
# environment (python-dotenv) - confirm against the deployment setup.
load_dotenv()
logger = logging.getLogger(__name__)


class OpcService:
    """Poll an OPC-UA counter node and mirror trackmap values into Oracle.

    Settings are read from ``OPC_*`` environment variables and then overridden
    by the JSON file at ``config_path`` when that file exists.  While polling,
    every change of the counter node triggers a read of all nodes listed in
    ``trackmap_nodes`` and an ``UPDATE`` of the ``PLTM.CMPT_PL_TRACKMAP`` row
    whose ``POSITION`` equals the value read for the ``position`` entry.

    Public attributes: ``config_path``, ``opc_url``, ``counter_node``,
    ``poll_interval``, ``trackmap_nodes``, ``running``, ``last_counter``,
    ``last_update`` and ``event_log`` (newest entry last, capped at 500).
    """

    _DEFAULT_POLL_INTERVAL = 2  # seconds
    _MAX_LOG_ENTRIES = 500
    _RETRY_DELAY = 5  # seconds between reconnection attempts
    # Consecutive counter-read failures after which the session is rebuilt.
    _MAX_COUNTER_FAILURES = 3

    def __init__(self, config_path: Optional[str] = None):
        """Initialise settings from the environment and the persisted config.

        Args:
            config_path: Location of the JSON config file.  Defaults to
                ``opc_config.json`` next to this module.
        """
        self.config_path = config_path or os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "opc_config.json"
        )
        # Must exist before anything calls _log(); loading the persisted
        # config below does so.
        self.event_log: List[str] = []
        self.opc_url: str = os.getenv("OPC_URL", "opc.tcp://192.168.1.100:4840")
        self.counter_node: str = os.getenv(
            "OPC_COUNTER_NODE", "ns=2;s=PL.TRACKMAP.COUNTER"
        )
        self.poll_interval: int = self._parse_poll_interval(
            os.getenv("OPC_POLL_INTERVAL"), self._DEFAULT_POLL_INTERVAL
        )
        # Mapping: oracle_column_name -> OPC node id (from env or the API).
        self.trackmap_nodes: Dict[str, str] = self._load_trackmap_nodes()
        self.running: bool = False
        self.last_counter: Optional[Any] = None
        self.last_update: Optional[str] = None
        self._counter_failures: int = 0
        self._stop_event = asyncio.Event()
        self._task: Optional[asyncio.Task] = None
        # Persisted values take precedence over the environment.
        self._load_persisted_config()

    # ------------------------------------------------------------------
    @staticmethod
    def _parse_poll_interval(raw: Any, fallback: int) -> int:
        """Return ``raw`` as a positive int, or ``fallback`` if it is unusable."""
        if raw is None:
            return fallback
        try:
            value = int(raw)
        except (TypeError, ValueError):
            logger.warning("Invalid OPC poll interval %r; using %s", raw, fallback)
            return fallback
        if value <= 0:
            # A non-positive interval would make the poll loop spin.
            logger.warning("Non-positive OPC poll interval %r; using %s", raw, fallback)
            return fallback
        return value

    def _load_trackmap_nodes(self) -> Dict[str, str]:
        """Load trackmap node mapping from env vars prefixed OPC_NODE_.

        The part of the variable name after the prefix, lower-cased, becomes
        the column key.
        """
        prefix = "OPC_NODE_"
        return {
            key[len(prefix):].lower(): val
            for key, val in os.environ.items()
            if key.startswith(prefix) and len(key) > len(prefix)
        }

    def _load_persisted_config(self):
        """Load OPC config from the local json file if it exists.

        Missing keys keep their current value.  An unreadable or malformed
        file is reported with a warning and leaves every setting unchanged.
        """
        if not os.path.exists(self.config_path):
            return
        try:
            with open(self.config_path, "r", encoding="utf-8") as f:
                cfg = json.load(f)
        except (OSError, ValueError) as exc:  # JSONDecodeError is a ValueError
            logger.warning("Failed to load OPC config %s: %s", self.config_path, exc)
            return
        if not isinstance(cfg, dict):
            logger.warning(
                "Failed to load OPC config %s: %s",
                self.config_path,
                "top-level JSON value is not an object",
            )
            return

        # A present-but-empty value (null / {}) clears the mapping.
        nodes = cfg.get("trackmap_nodes", self.trackmap_nodes) or {}
        if not isinstance(nodes, dict):
            logger.warning(
                "Ignoring trackmap_nodes in %s: expected an object", self.config_path
            )
            nodes = self.trackmap_nodes

        self.opc_url = cfg.get("opc_url") or self.opc_url
        self.counter_node = cfg.get("counter_node") or self.counter_node
        self.poll_interval = self._parse_poll_interval(
            cfg.get("poll_interval"), self.poll_interval
        )
        self.trackmap_nodes = nodes
        self._log(f"Loaded OPC config from {self.config_path}")

    def save_config(self):
        """Persist current OPC config to the local json file.

        Raises:
            OSError: If the directory or file cannot be written.
        """
        cfg = {
            "opc_url": self.opc_url,
            "counter_node": self.counter_node,
            "poll_interval": self.poll_interval,
            "trackmap_nodes": self.trackmap_nodes,
        }
        directory = os.path.dirname(self.config_path)
        if directory:  # os.makedirs("") raises for a bare file name
            os.makedirs(directory, exist_ok=True)
        with open(self.config_path, "w", encoding="utf-8") as f:
            json.dump(cfg, f, ensure_ascii=False, indent=2)

    def _log(self, msg: str):
        """Record a timestamped message in the logger and in ``event_log``.

        Also called from executor worker threads, so the list is trimmed in
        place instead of being rebound.
        """
        ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        entry = f"[{ts}] {msg}"
        logger.info(entry)
        self.event_log.append(entry)
        overflow = len(self.event_log) - self._MAX_LOG_ENTRIES
        if overflow > 0:
            del self.event_log[:overflow]

    # ------------------------------------------------------------------
    async def start_polling(self):
        """Start the background poll task unless one is already alive."""
        # Guard on the task itself: `running` used to be set only once the
        # task first ran, so two back-to-back calls started two loops.
        if self._task is not None and not self._task.done():
            return
        self._stop_event.clear()
        self.running = True
        self._task = asyncio.create_task(self._poll_loop())

    async def stop_polling(self):
        """Cancel the poll task (if any) and wait for it to finish."""
        self._stop_event.set()
        task, self._task = self._task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception as exc:
                # A task that had already died must not break shutdown.
                self._log(f"OPC polling task ended with error: {exc}")
        self.running = False

    # ------------------------------------------------------------------
    async def _poll_loop(self):
        """Connect, poll until stopped, and reconnect after failures.

        Falls back to ``_simulate_loop`` when the ``opcua`` package cannot be
        imported.
        """
        self.running = True
        self._log(f"OPC polling started: {self.opc_url}")
        try:
            try:
                from opcua import Client  # type: ignore
            except ImportError:
                self._log("opcua package not installed running in SIMULATION mode")
                await self._simulate_loop()
                return
            while not self._stop_event.is_set():
                try:
                    await self._run_session(Client)
                except asyncio.CancelledError:
                    raise
                except Exception as exc:
                    self._log(
                        f"OPC connection error: {exc}. "
                        f"Retrying in {self._RETRY_DELAY}s..."
                    )
                    await asyncio.sleep(self._RETRY_DELAY)
        finally:
            # Runs on normal exit, error and cancellation alike.
            self.running = False
            self._log("OPC polling stopped")

    async def _run_session(self, client_cls):
        """Run one connect / poll / disconnect cycle with a fresh client.

        Raises:
            ConnectionError: After ``_MAX_COUNTER_FAILURES`` consecutive
                counter-read failures, so the caller reconnects.
            Exception: Whatever the client raises while connecting.
        """
        client = client_cls(self.opc_url)
        await self._connect(client)
        self._log(f"Connected to OPC server: {self.opc_url}")
        self._counter_failures = 0
        try:
            while not self._stop_event.is_set():
                await self._tick(client)
                if self._counter_failures >= self._MAX_COUNTER_FAILURES:
                    raise ConnectionError(
                        f"counter node unreadable {self._counter_failures} "
                        "times in a row"
                    )
                await asyncio.sleep(self.poll_interval)
        finally:
            # Synchronous on purpose: cleanup must complete even while this
            # task is being cancelled.
            self._disconnect(client)

    async def _connect(self, client):
        """Run the blocking ``client.connect()`` in the default executor."""
        loop = asyncio.get_running_loop()
        pending = loop.run_in_executor(None, client.connect)
        try:
            await asyncio.shield(pending)
        except asyncio.CancelledError:
            # connect() keeps running in its worker thread; close the session
            # once it completes so it is not leaked.
            pending.add_done_callback(
                lambda fut: self._close_after_late_connect(client, fut)
            )
            raise

    def _close_after_late_connect(self, client, fut):
        """Disconnect a client whose connect finished after cancellation."""
        if fut.cancelled() or fut.exception() is not None:
            return
        self._disconnect(client)

    def _disconnect(self, client):
        """Disconnect best-effort; a failure is logged, never raised."""
        try:
            client.disconnect()
        except Exception as exc:
            self._log(f"Error while disconnecting from OPC server: {exc}")
        else:
            self._log("Disconnected from OPC server")

    # ------------------------------------------------------------------
    @staticmethod
    def _read_value(client, node_id):
        """Blocking read of a single node value."""
        return client.get_node(node_id).get_value()

    def _read_trackmap(self, client, nodes: Dict[str, str]) -> Dict[str, Any]:
        """Blocking read of every node in ``nodes``; failed nodes are skipped."""
        data: Dict[str, Any] = {}
        for col, node_id in nodes.items():
            try:
                data[col] = self._read_value(client, node_id)
            except Exception as exc:
                self._log(f"Failed to read node {node_id}: {exc}")
        return data

    async def _tick(self, client):
        """Read counter; if changed, fetch trackmap nodes and update Oracle.

        Client calls are blocking, so they run in the default executor.  Read
        errors are logged, not raised; consecutive counter failures are
        counted in ``_counter_failures``.
        """
        loop = asyncio.get_running_loop()
        try:
            current_counter = await loop.run_in_executor(
                None, self._read_value, client, self.counter_node
            )
        except asyncio.CancelledError:
            raise
        except Exception as exc:
            self._counter_failures += 1
            self._log(f"Failed to read counter node: {exc}")
            return
        self._counter_failures = 0

        if current_counter == self.last_counter:
            return  # nothing changed

        self._log(
            f"Counter changed: {self.last_counter} -> {current_counter}. "
            "Fetching trackmap data..."
        )
        self.last_counter = current_counter
        self.last_update = datetime.now().isoformat()

        if not self.trackmap_nodes:
            self._log("No trackmap nodes configured skipping DB update")
            return

        # Snapshot the mapping: it may be replaced while the worker iterates.
        data = await loop.run_in_executor(
            None, self._read_trackmap, client, dict(self.trackmap_nodes)
        )
        if not data:
            return

        # POSITION must be one of the mapped columns.  Match it in any case:
        # env-derived keys are lower-case, file/API keys may be upper-case.
        position = next(
            (val for col, val in data.items() if str(col).lower() == "position"),
            None,
        )
        if position is None:
            self._log("'position' not in trackmap_nodes data cannot update row")
            return
        await self._update_oracle(position, data)

    # ------------------------------------------------------------------
    @staticmethod
    def _is_safe_column(name: Any) -> bool:
        """Return True if ``name`` is a plain, unquoted SQL identifier."""
        if not isinstance(name, str) or not name or not name.isascii():
            return False
        if not name[0].isalpha():
            return False
        return all(ch.isalnum() or ch in "_$#" for ch in name)

    @staticmethod
    def _build_update_statement(position: Any, data: Dict[str, Any]) -> Optional[tuple]:
        """Build the parameterised UPDATE for one trackmap row.

        Returns:
            ``(sql, binds)``, or ``None`` when ``data`` holds no column other
            than the position.

        Raises:
            ValueError: If a column name is not a plain identifier.  Names
                are interpolated into the SQL text, so they are validated.
        """
        columns = [
            (str(col), val)
            for col, val in data.items()
            if str(col).lower() != "position"
        ]
        if not columns:
            return None
        invalid = [name for name, _ in columns if not OpcService._is_safe_column(name)]
        if invalid:
            raise ValueError(
                "invalid column name(s): " + ", ".join(repr(n) for n in invalid)
            )
        # Generated bind names: a column called e.g. DATE or SIZE would not
        # be a legal Oracle bind-variable name.
        set_clause = ", ".join(
            f"{name.upper()} = :b{idx}" for idx, (name, _) in enumerate(columns)
        )
        binds = {f"b{idx}": val for idx, (_, val) in enumerate(columns)}
        binds["pos"] = position
        sql = (
            f"UPDATE PLTM.CMPT_PL_TRACKMAP "
            f"SET {set_clause} WHERE POSITION = :pos"
        )
        return sql, binds

    async def _update_oracle(self, position: Any, data: Dict[str, Any]):
        """Write fetched OPC values into PLTM.CMPT_PL_TRACKMAP.

        The blocking database work runs in the default executor.  Failures
        are logged and never raised.
        """
        try:
            statement = self._build_update_statement(position, data)
        except ValueError as exc:
            self._log(f"Oracle update failed: {exc}")
            return
        if statement is None:
            return
        sql, binds = statement
        summary = ", ".join(
            f"{col}={val}"
            for col, val in data.items()
            if str(col).lower() != "position"
        )

        def _do_update():
            try:
                from database import get_connection

                conn = get_connection()
                try:
                    cursor = conn.cursor()
                    try:
                        cursor.execute(sql, binds)
                        conn.commit()
                    finally:
                        cursor.close()
                finally:
                    conn.close()
                self._log(f"Updated CMPT_PL_TRACKMAP POSITION={position}: " + summary)
            except Exception as exc:
                self._log(f"Oracle update failed: {exc}")

        await asyncio.get_running_loop().run_in_executor(None, _do_update)

    # ------------------------------------------------------------------
    async def _simulate_loop(self):
        """Simulation mode when opcua is not installed.

        Increments a local counter every ``poll_interval`` seconds until the
        stop event is set; nothing is read from OPC or written to Oracle.
        """
        counter = 0
        while not self._stop_event.is_set():
            await asyncio.sleep(self.poll_interval)
            counter += 1
            self.last_counter = counter
            self.last_update = datetime.now().isoformat()
            self._log(f"[SIM] Counter tick: {counter}")
        self.running = False
# Module-level instance, constructed when this module is imported.
opc_service = OpcService()