Files
tiandihe/backend/opc_service.py
Joshi 95ec77afae feat(opc): 添加计划写入触发功能及相关配置
后续的配置是追踪的点位配置和写入的点位配置已经做好持久化在页面上配置完保存重启OPC即可实现持久化
后续的代码修改:从哪里开始而不是从最小的钢卷号开始,因为对方数据库里面的计划有几百条,写入的时候写入哪个计划的钢卷信息给一级都是需要修改代码的,现在默认的都是第一个钢卷

添加写入计数器、来源和目标节点的配置,支持从计划表读取数据并写入到指定开卷机的OPC节点。包括:
1. 在models.py中添加相关字段
2. 在opc_service.py中实现写入触发逻辑
3. 在OpcConfig.vue中添加配置界面
4. 更新相关API接口以支持新配置
2026-04-13 16:09:48 +08:00

1156 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
OPC-UA polling service.
# 调用示例OPC-UA 写入)
# await opc_service.write_node_value("ns=2;s=PL.TEST.COUNT", 123, "Int32")
#
# 调用示例S7 写入)
# await opc_service.write_s7_value(
# endpoint="192.168.0.10:102",
# address="DB1.DBW4",
# data_length=2,
# data_type="INT16",
# value=123,
# )
Logic:
1. Connect to the OPC-UA server at `opc_url`.
2. Read the counter node (`counter_node`) every `poll_interval` seconds.
3. When the counter value changes, read the trackmap nodes listed in
`trackmap_nodes` (a dict: {oracle_column -> node_id}).
4. For each row returned by the trackmap nodes build an UPDATE statement
and apply it to PLTM.CMPT_PL_TRACKMAP.
5. Monitor two signal nodes:
- Signal1 (Entry): When 0->1, fetch next 5 coils from PDI and save to SQLite temp table
- Signal2 (WeldDone): When 0->1 and held for 2s, update CMPT_PL_TRACKMAP
`trackmap_nodes` example (stored in .env or configured via UI):
{
"COILID": "ns=2;s=PL.TRACKMAP.P01.COILID",
"BEF_ES": "ns=2;s=PL.TRACKMAP.P01.BEF_ES",
...
}
For multi-position setups, define one entry per position column and handle
positional logic accordingly. The current implementation reads a flat set of
nodes and stores them against the POSITION value also read from OPC.
"""
import asyncio
import json
import logging
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from dotenv import load_dotenv
load_dotenv()
logger = logging.getLogger(__name__)
class OpcService:
def __init__(self):
self.config_path = os.path.join(os.path.dirname(__file__), "opc_config.json")
self.opc_url: str = os.getenv("OPC_URL", "opc.tcp://192.168.1.100:4840")
self.counter_node: str = os.getenv(
"OPC_COUNTER_NODE", "ns=2;s=PL.TRACKMAP.COUNTER"
)
self.poll_interval: int = int(os.getenv("OPC_POLL_INTERVAL", "2"))
self.signal1_node: str = os.getenv("OPC_SIGNAL1_NODE", "ns=2;s=PL.Signal.EntryCoil")
self.signal2_node: str = os.getenv("OPC_SIGNAL2_NODE", "ns=2;s=PL.Signal.WeldDone")
self.signal1_last: Optional[int] = None
self.signal2_last: Optional[int] = None
self.signal2_rise_time: Optional[datetime] = None
# 写入触发相关(三点位独立于追踪信号)
self.write_counter_node: str = os.getenv("OPC_WRITE_COUNTER_NODE", "")
self.write_source_node: str = os.getenv("OPC_WRITE_SOURCE_NODE", "")
self.write_target_node: str = os.getenv("OPC_WRITE_TARGET_NODE", "")
self.write_counter_last: Optional[Any] = None
# 写入字段映射(按目标开卷机 1/2 分组)
self.write_nodes: Dict[str, Dict[str, str]] = {}
self.signal1_coils: List[Dict[str, Any]] = []
self.first_coilid: str = ""
self.last_tracked_coilid: str = ""
self.end_coilid: str = ""
self.tracking_ended: bool = False
# 状态机: WAIT_S1=等待信号1, WAIT_S2=等待信号2
self.track_state: str = "WAIT_S1"
self.last_counter_at_state_change: Optional[Any] = None
# Mapping: oracle_column_name -> OPC node id
# Populated from .env or via API
self.trackmap_nodes: Dict[str, str] = self._load_trackmap_nodes()
# Load persisted config if present
self._load_persisted_config()
self.running: bool = False
self.last_counter: Optional[Any] = None
self.last_update: Optional[str] = None
self.event_log: List[str] = []
self._stop_event = asyncio.Event()
self._task: Optional[asyncio.Task] = None
# ------------------------------------------------------------------
def _load_trackmap_nodes(self) -> Dict[str, str]:
"""Load trackmap node mapping from env vars prefixed OPC_NODE_."""
nodes = {}
for key, val in os.environ.items():
if key.startswith("OPC_NODE_"):
col = key[len("OPC_NODE_"):].lower()
nodes[col] = val
return nodes
def _load_persisted_config(self):
"""Load OPC config from local json file if exists."""
if not os.path.exists(self.config_path):
return
try:
with open(self.config_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
self.opc_url = cfg.get("opc_url", self.opc_url)
self.counter_node = cfg.get("counter_node", self.counter_node)
self.poll_interval = int(cfg.get("poll_interval", self.poll_interval))
self.trackmap_nodes = cfg.get("trackmap_nodes", self.trackmap_nodes) or {}
self.signal1_node = cfg.get("signal1_node", self.signal1_node)
self.signal2_node = cfg.get("signal2_node", self.signal2_node)
self.write_counter_node = cfg.get("write_counter_node", self.write_counter_node)
self.write_source_node = cfg.get("write_source_node", self.write_source_node)
self.write_target_node = cfg.get("write_target_node", self.write_target_node)
self.write_nodes = cfg.get("write_nodes", self.write_nodes) or {}
self._log(f"Loaded OPC config from {self.config_path}")
except Exception as exc:
logger.warning("Failed to load OPC config %s: %s", self.config_path, exc)
def save_config(self):
"""Persist current OPC config to local json file."""
cfg = {
"opc_url": self.opc_url,
"counter_node": self.counter_node,
"poll_interval": self.poll_interval,
"trackmap_nodes": self.trackmap_nodes,
"signal1_node": self.signal1_node,
"signal2_node": self.signal2_node,
"write_counter_node": self.write_counter_node,
"write_source_node": self.write_source_node,
"write_target_node": self.write_target_node,
"write_nodes": self.write_nodes,
}
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(cfg, f, ensure_ascii=False, indent=2)
def _log(self, msg: str):
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{ts}] {msg}"
logger.info(entry)
self.event_log.append(entry)
if len(self.event_log) > 500:
self.event_log = self.event_log[-500:]
# ------------------------------------------------------------------
async def start_polling(self):
if self.running:
return
self._stop_event.clear()
self._task = asyncio.create_task(self._poll_loop())
async def stop_polling(self):
self._stop_event.set()
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self.running = False
# ------------------------------------------------------------------
async def _poll_loop(self):
self.running = True
self._log(f"OPC polling started: {self.opc_url}")
try:
from opcua import Client # type: ignore
except ImportError:
self._log("opcua package not installed running in SIMULATION mode")
await self._simulate_loop()
return
while not self._stop_event.is_set():
try:
client = Client(self.opc_url)
client.connect()
self._log(f"Connected to OPC server: {self.opc_url}")
try:
while not self._stop_event.is_set():
current_counter = await self._tick_with_counter(client)
await self._check_signals(client, current_counter)
await self._check_write_trigger(client)
await asyncio.sleep(self.poll_interval)
finally:
client.disconnect()
self._log("Disconnected from OPC server")
except Exception as exc:
self._log(f"OPC connection error: {exc}. Retrying in 5s...")
await asyncio.sleep(5)
self.running = False
self._log("OPC polling stopped")
# ------------------------------------------------------------------
async def _tick_with_counter(self, client):
"""Read counter and update trackmap; return current_counter."""
try:
counter_node = client.get_node(self.counter_node)
current_counter = counter_node.get_value()
except Exception as exc:
self._log(f"Failed to read counter node: {exc}")
return None
if current_counter == self.last_counter:
return current_counter
self._log(
f"Counter changed: {self.last_counter} -> {current_counter}. "
"Fetching trackmap data..."
)
self.last_counter = current_counter
self.last_update = datetime.now().isoformat()
if not self.trackmap_nodes:
self._log("No trackmap nodes configured skipping DB update")
return current_counter
# Read all configured nodes
data: Dict[str, Any] = {}
for col, node_id in self.trackmap_nodes.items():
try:
node = client.get_node(node_id)
data[col] = node.get_value()
except Exception as exc:
self._log(f"Failed to read node {node_id}: {exc}")
if not data:
return current_counter
# Determine POSITION from data (must be one of the mapped columns)
position = data.get("position")
if position is None:
self._log("'position' not in trackmap_nodes data cannot update row")
return current_counter
await self._update_oracle(position, data)
return current_counter
# ------------------------------------------------------------------
async def _check_signals(self, client, current_counter):
"""Check signal1 and signal2 with state machine and counter validation."""
try:
signal1_node = client.get_node(self.signal1_node)
signal1_value = signal1_node.get_value()
except Exception as exc:
self._log(f"Failed to read signal1 node: {exc}")
signal1_value = None
try:
signal2_node = client.get_node(self.signal2_node)
signal2_value = signal2_node.get_value()
except Exception as exc:
self._log(f"Failed to read signal2 node: {exc}")
signal2_value = None
counter_changed = current_counter != self.last_counter_at_state_change
# State machine for signal processing
if self.track_state == "WAIT_S1":
if signal1_value is not None and self.signal1_last is not None:
if self.signal1_last == 0 and signal1_value == 1 and counter_changed:
self._log(f"Signal1: Entry coil triggered (0->1) with counter change, state={self.track_state}")
await self._handle_signal1()
self.track_state = "WAIT_S2"
self.last_counter_at_state_change = current_counter
elif self.track_state == "WAIT_S2":
if signal2_value is not None and self.signal2_last is not None:
if self.signal2_last == 0 and signal2_value == 1 and counter_changed:
self._log("Signal2: Weld done rising edge (0->1) with counter change, triggering update")
await self._handle_signal2()
self.track_state = "WAIT_S1"
self.signal2_rise_time = None
self.last_counter_at_state_change = current_counter
elif self.signal2_last == 1 and signal2_value == 0:
self.signal2_rise_time = None
if signal1_value is not None:
self.signal1_last = signal1_value
if signal2_value is not None:
self.signal2_last = signal2_value
async def _check_write_trigger(self, client):
"""Check write trigger counter/source/target and write PDI head coil to OPC."""
if not (self.write_counter_node and self.write_source_node and self.write_target_node):
return
try:
write_counter = client.get_node(self.write_counter_node).get_value()
write_source = client.get_node(self.write_source_node).get_value()
write_target = client.get_node(self.write_target_node).get_value()
except Exception as exc:
self._log(f"Write trigger read failed: {exc}")
return
if self.write_counter_last is None:
self.write_counter_last = write_counter
return
if write_counter == self.write_counter_last:
return
old_counter = self.write_counter_last
self.write_counter_last = write_counter
self._log(
f"Write counter changed: {old_counter} -> {write_counter}, "
f"source={write_source}, target={write_target}"
)
try:
source_value = int(write_source)
target_value = int(write_target)
except (TypeError, ValueError):
self._log(f"Write trigger invalid source/target values: source={write_source}, target={write_target}")
return
if source_value != 100 or target_value not in (1, 2):
return
await self._write_entry_coil_to_uncoiler(client, target_value)
async def _write_entry_coil_to_uncoiler(self, client, target_uncoiler: int):
"""Write the smallest COILID plan to target uncoiler OPC nodes."""
def _load_next_plan():
from database import get_connection
conn = get_connection()
cursor = conn.cursor()
try:
cursor.execute(
"""
SELECT
COILID,
ENTRY_COIL_WEIGHT,
ENTRY_OF_COIL_LENGTH,
ENTRY_COIL_WIDTH,
ENTRY_COIL_THICKNESS,
ENTRY_OF_COIL_INNER_DIAMETER,
ENTRY_OF_COIL_OUTER_DIAMETER,
STEEL_GRADE
FROM PLTM.PDI_PLTM
ORDER BY COILID ASC
"""
)
row = cursor.fetchone()
if not row:
return None
return {
"coilid": row[0],
"entry_coil_weight": row[1],
"entry_of_coil_length": row[2],
"entry_coil_width": row[3],
"entry_coil_thickness": row[4],
"entry_of_coil_inner_diameter": row[5],
"entry_of_coil_outer_diameter": row[6],
"alloy_code": row[7],
"material": row[7],
}
finally:
cursor.close()
conn.close()
loop = asyncio.get_event_loop()
plan = await loop.run_in_executor(None, _load_next_plan)
if not plan:
self._log("Write trigger matched but PDI has no plan data")
return
target_cfg = self.write_nodes.get(str(target_uncoiler), {}) or {}
if not target_cfg:
self._log(f"Write nodes for uncoiler {target_uncoiler} not configured")
return
# setup_data_revision: 每次写入 +1
revision_node = target_cfg.get("setup_data_revision")
if revision_node:
try:
rev_value = client.get_node(revision_node).get_value()
next_rev = int(rev_value or 0) + 1
await self._write_node_value_by_client(client, revision_node, next_rev, "Int32")
except Exception as exc:
self._log(f"Write setup_data_revision failed (U{target_uncoiler}): {exc}")
field_variant = {
"coilid": "String",
"entry_coil_weight": "Float",
"entry_of_coil_length": "Float",
"entry_coil_width": "Float",
"entry_coil_thickness": "Float",
"entry_of_coil_inner_diameter": "Float",
"entry_of_coil_outer_diameter": "Float",
"alloy_code": "String",
"material": "String",
}
for field, value in plan.items():
node_id = target_cfg.get(field)
if not node_id or value is None:
continue
try:
await self._write_node_value_by_client(
client,
node_id,
value,
field_variant.get(field),
)
except Exception as exc:
self._log(
f"Write field failed (U{target_uncoiler}, {field}, node={node_id}): {exc}"
)
self._log(
f"Wrote plan COILID={plan['coilid']} to uncoiler {target_uncoiler}"
)
async def _write_node_value_by_client(
self,
client,
node_id: str,
value: Any,
variant_type: Optional[str] = None,
):
"""Write OPC node value using an existing client connection."""
from opcua import ua # type: ignore
node = client.get_node(node_id)
vt = self._normalize_variant_type(variant_type)
if vt is None:
node.set_value(value)
else:
node.set_value(ua.DataValue(ua.Variant(value, vt)))
async def _handle_signal1(self):
"""Handle signal1: fetch next 4 coils from Oracle PDI table and save to SQLite temp table."""
from sqlite_sync import sqlite_save_coils_to_track, sqlite_clear_coil_track
def _do_fetch():
# 如果追踪已结束,检查是否有新钢卷需要恢复追踪
self._log(f"Signal1: Called, first_coilid={repr(self.first_coilid)}, tracking_ended={self.tracking_ended}")
if self.tracking_ended:
try:
from database import get_connection
conn = get_connection()
cursor = conn.cursor()
try:
cursor.execute("SELECT COILID FROM PLTM.PDI_PLTM ORDER BY COILID ASC")
all_coil_ids = [r[0] for r in cursor.fetchall()]
# 使用 end_coilid 判断是否有新钢卷
check_coilid = self.end_coilid or self.last_tracked_coilid
if all_coil_ids and check_coilid:
has_new = any(cid > check_coilid for cid in all_coil_ids)
if has_new:
self._log("Signal1: New coils detected, resuming tracking")
self.tracking_ended = False
# 从最后一个追踪的钢卷之后继续
self.first_coilid = check_coilid
else:
self._log("Signal1: Tracking ended, no new coils, skip")
return
elif all_coil_ids and not check_coilid:
# 没有结束点记录但有计划,按首卷重新启动追踪
self._log("Signal1: Tracking ended without checkpoint, restarting from first plan")
self.tracking_ended = False
self.first_coilid = ""
else:
# 没有计划,保持结束状态
self._log("Signal1: Tracking ended, PDI empty, skip")
return
finally:
cursor.close()
conn.close()
except Exception as exc:
self._log(f"Signal1: Check new coils failed: {exc}")
return
try:
from database import get_connection
conn = get_connection()
cursor = conn.cursor()
try:
if self.first_coilid:
cursor.execute("""
SELECT COILID, SEQUENCENB, ROLLPROGRAMNB
FROM PLTM.PDI_PLTM
WHERE COILID >= :start_coilid
ORDER BY COILID ASC
""", {"start_coilid": self.first_coilid})
else:
cursor.execute("""
SELECT COILID, SEQUENCENB, ROLLPROGRAMNB
FROM PLTM.PDI_PLTM
ORDER BY COILID ASC
""")
rows = cursor.fetchmany(4)
coils = [{"coilid": r[0], "sequencenb": r[1], "rollprogramnb": r[2]} for r in rows]
finally:
cursor.close()
conn.close()
except Exception as exc:
self._log(f"Signal1: Failed to fetch from Oracle: {exc}")
return
if len(coils) == 0:
self._log("Signal1: No more coils in PDI, ending tracking")
sqlite_clear_coil_track()
self.tracking_ended = True
return
if len(coils) == 1 and coils[0]["coilid"] == self.first_coilid:
self._log("Signal1: Only the same coil exists, saving it and setting for next query")
self.signal1_coils = coils
sqlite_save_coils_to_track(coils)
# 保持 first_coilid 不变,下次查询会从同一个位置继续
# Signal2 触发后,检查是否还有更多钢卷
return
self.signal1_coils = coils
sqlite_save_coils_to_track(coils)
if len(coils) >= 2:
self.first_coilid = coils[1]["coilid"]
else:
self.first_coilid = coils[0]["coilid"]
self.last_tracked_coilid = coils[0]["coilid"]
self._log(f"Signal1: Saved {len(coils)} coils, next_start: {self.first_coilid}, last_tracked: {self.last_tracked_coilid}")
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _do_fetch)
async def _handle_signal2(self):
"""Handle signal2: update CMPT_PL_TRACKMAP with temp table data."""
from sqlite_sync import sqlite_clear_coil_track, sqlite_get_coil_track
from database import get_connection
def _do_update():
coils = sqlite_get_coil_track()
try:
conn = get_connection()
cursor = conn.cursor()
try:
if not coils:
self._log("Signal2: No coils in temp track table, clearing all positions")
for pos in range(1, 11):
cursor.execute("SELECT COUNT(*) FROM PLTM.CMPT_PL_TRACKMAP WHERE POSITION = :pos", {"pos": pos})
if cursor.fetchone()[0] > 0:
cursor.execute("UPDATE PLTM.CMPT_PL_TRACKMAP SET COILID = NULL, TOM = SYSDATE WHERE POSITION = :pos", {"pos": pos})
conn.commit()
# 标记追踪结束
if self.signal1_coils:
self.end_coilid = self.signal1_coils[0]["coilid"]
self.tracking_ended = True
# 清空 first_coilid下次 Signal1 触发时从头查询
self.first_coilid = ""
self._log(f"Signal2: Tracking ended (no coils), end_coilid={self.end_coilid}")
return
cursor.execute("SELECT COILID FROM PLTM.CMPT_PL_TRACKMAP WHERE POSITION = 1")
prev_pos1_coil = cursor.fetchone()
prev_pos1_coilid = prev_pos1_coil[0] if prev_pos1_coil else None
coil_count = len(coils)
for i in range(4):
target_position = i + 1
coil_index = target_position - 1
if coil_index >= 0 and coil_index < coil_count:
coil = coils[coil_index]
cursor.execute("SELECT COUNT(*) FROM PLTM.CMPT_PL_TRACKMAP WHERE POSITION = :pos", {"pos": target_position})
exists = cursor.fetchone()[0] > 0
if exists:
cursor.execute("""
UPDATE PLTM.CMPT_PL_TRACKMAP
SET COILID = :coilid, TOM = SYSDATE
WHERE POSITION = :position
""", {"coilid": coil["coilid"], "position": target_position})
else:
cursor.execute("""
INSERT INTO PLTM.CMPT_PL_TRACKMAP (POSITION, COILID, TOM)
VALUES (:position, :coilid, SYSDATE)
""", {"position": target_position, "coilid": coil["coilid"]})
else:
cursor.execute("SELECT COUNT(*) FROM PLTM.CMPT_PL_TRACKMAP WHERE POSITION = :pos", {"pos": target_position})
exists = cursor.fetchone()[0] > 0
if exists:
cursor.execute("""
UPDATE PLTM.CMPT_PL_TRACKMAP
SET COILID = NULL, TOM = SYSDATE
WHERE POSITION = :position
""", {"position": target_position})
else:
cursor.execute("""
INSERT INTO PLTM.CMPT_PL_TRACKMAP (POSITION, COILID, TOM)
VALUES (:position, NULL, SYSDATE)
""", {"position": target_position})
for pos in range(5, 11):
cursor.execute("SELECT COUNT(*) FROM PLTM.CMPT_PL_TRACKMAP WHERE POSITION = :pos", {"pos": pos})
if cursor.fetchone()[0] > 0:
cursor.execute("UPDATE PLTM.CMPT_PL_TRACKMAP SET COILID = NULL, TOM = SYSDATE WHERE POSITION = :pos", {"pos": pos})
new_pos1_coil = coils[0]["coilid"] if coils else None
pos1_unchanged = prev_pos1_coilid and new_pos1_coil and prev_pos1_coilid == new_pos1_coil
if pos1_unchanged:
# 只在“末卷(单卷)重复信号”时清空,避免 6-7 阶段提前进入清空/重写循环
if coil_count == 1:
for pos in range(1, 11):
cursor.execute("SELECT COUNT(*) FROM PLTM.CMPT_PL_TRACKMAP WHERE POSITION = :pos", {"pos": pos})
if cursor.fetchone()[0] > 0:
cursor.execute("UPDATE PLTM.CMPT_PL_TRACKMAP SET COILID = NULL, TOM = SYSDATE WHERE POSITION = :pos", {"pos": pos})
sqlite_clear_coil_track()
self.end_coilid = coils[0]["coilid"]
self.last_tracked_coilid = self.end_coilid
self.tracking_ended = True
self.first_coilid = ""
self._log("Signal2: Final single coil repeated, cleared all positions and ended tracking")
else:
self._log("Signal2: Coil at position 1 unchanged but not final single-coil stage, keep tracking")
conn.commit()
self._log(f"Signal2: Updated 4 positions (coils: {coil_count})")
# 检查是否还有更多钢卷可以追踪
self._log(f"Signal2: Checking remaining coils, first_coilid={repr(self.first_coilid)}")
if self.first_coilid:
try:
cursor.execute("""
SELECT COILID FROM PLTM.PDI_PLTM
WHERE COILID >= :start_coilid
ORDER BY COILID ASC
""", {"start_coilid": self.first_coilid})
all_remaining = cursor.fetchall()
remaining_count = len(all_remaining)
self._log(f"Signal2: Check remaining after {self.first_coilid}: count={remaining_count}, coils={[r[0] for r in all_remaining]}")
if remaining_count == 0:
# 没有更多钢卷了,标记追踪结束
self.end_coilid = self.first_coilid
self.last_tracked_coilid = self.first_coilid
self.tracking_ended = True
self._log(f"Signal2: No more coils, tracking ended, end_coilid={self.end_coilid}")
elif remaining_count >= 2:
# 有2个以上钢卷保持 first_coilid 不变,让 Signal1 处理
self._log(f"Signal2: >=2 coils remain, first_coilid unchanged, waiting for Signal1")
else:
# 只有一个钢卷时先展示为“7(1个)”,下一次重复信号再清空到 NULL
self._log(
f"Signal2: One coil remains ({all_remaining[0][0]}), "
"waiting one more cycle before final clear"
)
except Exception as exc:
self._log(f"Signal2: Check next coils failed: {exc}")
finally:
cursor.close()
conn.close()
except Exception as exc:
self._log(f"Signal2: Oracle update failed: {exc}")
import traceback
self._log(f"Signal2 traceback: {traceback.format_exc()}")
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _do_update)
# ------------------------------------------------------------------
async def _update_oracle(self, position: Any, data: Dict[str, Any]):
"""Write fetched OPC values into PLTM.CMPT_PL_TRACKMAP."""
import threading
def _do_update():
try:
from database import get_connection
conn = get_connection()
cursor = conn.cursor()
try:
updatable = {k: v for k, v in data.items() if k != "position"}
if not updatable:
return
set_clause = ", ".join(
[f"{k.upper()} = :{k}" for k in updatable.keys()]
)
updatable["position_"] = position
sql = (
f"UPDATE PLTM.CMPT_PL_TRACKMAP "
f"SET {set_clause} WHERE POSITION = :position_"
)
cursor.execute(sql, updatable)
conn.commit()
self._log(
f"Updated CMPT_PL_TRACKMAP POSITION={position}: "
+ ", ".join(f"{k}={v}" for k, v in updatable.items() if k != "position_")
)
finally:
cursor.close()
conn.close()
except Exception as exc:
self._log(f"Oracle update failed: {exc}")
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _do_update)
# ------------------------------------------------------------------
def _normalize_variant_type(self, variant_type: Optional[str]):
"""Convert a variant type string to opcua.ua.VariantType."""
if not variant_type:
return None
from opcua import ua # type: ignore
vt_name = str(variant_type).strip()
if not vt_name:
return None
try:
return getattr(ua.VariantType, vt_name)
except AttributeError:
raise ValueError(
f"Unsupported OPC VariantType: {variant_type}. "
"Example: Boolean/Int16/Int32/Float/Double/String"
)
async def write_node_value(self, node_id: str, value: Any, variant_type: Optional[str] = None):
"""Connect to OPC server and write value to the specified node."""
if not node_id:
raise ValueError("node_id is required")
try:
from opcua import Client, ua # type: ignore
except ImportError as exc:
raise RuntimeError("opcua package not installed") from exc
def _do_write():
client = Client(self.opc_url)
try:
client.connect()
node = client.get_node(node_id)
vt = self._normalize_variant_type(variant_type)
if vt is None:
node.set_value(value)
else:
node.set_value(ua.DataValue(ua.Variant(value, vt)))
self._log(
f"Wrote OPC value success: node={node_id}, value={value}, "
f"variant_type={variant_type or 'auto'}"
)
return {
"ok": True,
"node_id": node_id,
"value": value,
"variant_type": variant_type or "auto",
"opc_url": self.opc_url,
"timestamp": datetime.now().isoformat(),
}
except Exception as exc:
self._log(f"Write OPC value failed: node={node_id}, error={exc}")
raise
finally:
try:
client.disconnect()
except Exception:
pass
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _do_write)
def _parse_s7_address(self, address: str) -> Tuple[int, int]:
"""Parse S7 DB address: DB{db}.{type}{byte}[.{bit}] -> (db_number, start_byte)."""
if not address:
raise ValueError("address is required")
# Supported examples:
# DB1.DBX0.0 / DB1.DBB2 / DB1.DBW4 / DB1.DBD8
# DB1.X0.0 / DB1.B2 / DB1.W4 / DB1.D8
pattern = r"^DB(\d+)\.(?:DB)?(?:X|B|W|D)(\d+)(?:\.\d+)?$"
m = re.match(pattern, address.strip(), re.IGNORECASE)
if not m:
raise ValueError(
"Unsupported S7 address format. "
"Examples: DB1.DBX0.0, DB1.DBB2, DB1.DBW4, DB1.DBD8"
)
db_number = int(m.group(1))
start_byte = int(m.group(2))
return db_number, start_byte
def _encode_s7_value(self, value: Any, data_type: str, data_length: int) -> bytes:
"""Encode python value to bytes for snap7 DB write."""
import struct
dt = (data_type or "").strip().upper()
if not dt:
raise ValueError("data_type is required")
if dt in ("BOOL", "BOOLEAN"):
if data_length != 1:
raise ValueError("BOOL data_length must be 1")
return bytes([1 if bool(value) else 0])
if dt in ("BYTE", "UINT8", "INT8"):
if data_length != 1:
raise ValueError(f"{dt} data_length must be 1")
iv = int(value)
if dt == "INT8":
return struct.pack(">b", iv)
return struct.pack(">B", iv)
if dt in ("INT16", "WORD", "UINT16"):
if data_length != 2:
raise ValueError(f"{dt} data_length must be 2")
iv = int(value)
if dt == "INT16":
return struct.pack(">h", iv)
return struct.pack(">H", iv)
if dt in ("INT32", "DINT", "UINT32"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
iv = int(value)
if dt == "INT32" or dt == "DINT":
return struct.pack(">i", iv)
return struct.pack(">I", iv)
if dt in ("REAL", "FLOAT"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
return struct.pack(">f", float(value))
if dt in ("LREAL", "DOUBLE"):
if data_length != 8:
raise ValueError(f"{dt} data_length must be 8")
return struct.pack(">d", float(value))
if dt in ("STRING", "BYTES"):
if data_length < 1:
raise ValueError(f"{dt} data_length must be >= 1")
raw = value.encode("utf-8") if isinstance(value, str) else bytes(value)
if len(raw) > data_length:
raise ValueError(
f"value bytes length {len(raw)} exceeds data_length {data_length}"
)
return raw.ljust(data_length, b"\x00")
raise ValueError(
"Unsupported S7 data_type. "
"Example: BOOL/BYTE/INT16/UINT16/INT32/UINT32/REAL/LREAL/STRING"
)
async def write_s7_value(
self,
ip: str,
address: str,
data_length: int,
data_type: str,
value: Any,
rack: int = 0,
slot: int = 1,
tcp_port: int = 102,
):
"""Connect via S7 protocol and write value to a DB address."""
if not ip:
raise ValueError("ip is required")
if not address:
raise ValueError("address is required")
if not data_type:
raise ValueError("data_type is required")
if not isinstance(data_length, int) or data_length <= 0:
raise ValueError("data_length must be a positive integer")
try:
import snap7 # type: ignore
except ImportError as exc:
raise RuntimeError("python-snap7 package not installed") from exc
db_number, start_byte = self._parse_s7_address(address)
payload = self._encode_s7_value(value, data_type, data_length)
def _do_write_s7():
client = snap7.client.Client()
try:
client.set_connection_params(ip, 0x0100, 0x0100)
client.set_connection_type(3)
client.connect(ip, rack, slot, tcp_port)
if not client.get_connected():
raise RuntimeError("S7 connect failed")
client.db_write(db_number, start_byte, payload)
self._log(
"Wrote S7 value success: "
f"ip={ip}, address={address}, data_type={data_type}, "
f"data_length={data_length}, value={value}"
)
return {
"ok": True,
"ip": ip,
"address": address,
"data_type": data_type,
"data_length": data_length,
"value": value,
"rack": rack,
"slot": slot,
"tcp_port": tcp_port,
"timestamp": datetime.now().isoformat(),
}
except Exception as exc:
self._log(f"Write S7 value failed: ip={ip}, address={address}, error={exc}")
raise
finally:
try:
client.disconnect()
except Exception:
pass
try:
client.destroy()
except Exception:
pass
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _do_write_s7)
# ------------------------------------------------------------------
def _parse_s7_endpoint(self, endpoint: str):
"""Parse S7 endpoint in format 'ip:port'."""
if not endpoint:
raise ValueError("endpoint is required, format: ip:port")
raw = str(endpoint).strip()
if ":" not in raw:
raise ValueError("endpoint format invalid, expected ip:port")
ip, port_str = raw.rsplit(":", 1)
ip = ip.strip()
port_str = port_str.strip()
if not ip:
raise ValueError("endpoint ip is empty")
if not port_str.isdigit():
raise ValueError("endpoint port must be numeric")
port = int(port_str)
if port < 1 or port > 65535:
raise ValueError("endpoint port out of range: 1-65535")
return ip, port
def _parse_s7_address(self, address: str):
"""
Parse S7 DB address.
Supported examples:
- DB1.DBX0.0
- DB1.DBB2
- DB1.DBW4
- DB1.DBD8
- DB1.X0.0 / DB1.B2 / DB1.W4 / DB1.D8
"""
if not address:
raise ValueError("address is required")
raw = str(address).strip().upper()
if "." not in raw or not raw.startswith("DB"):
raise ValueError(f"Unsupported S7 address format: {address}")
db_part, area_part = raw.split(".", 1)
try:
db_number = int(db_part[2:])
except ValueError as exc:
raise ValueError(f"Invalid DB number in address: {address}") from exc
if area_part.startswith("DB"):
area_part = area_part[2:]
area_code = area_part[:1]
offset_part = area_part[1:]
bit_index = None
if area_code == "X":
if "." not in offset_part:
raise ValueError(f"BOOL address requires bit index, e.g. DB1.DBX0.0: {address}")
byte_str, bit_str = offset_part.split(".", 1)
byte_offset = int(byte_str)
bit_index = int(bit_str)
if bit_index < 0 or bit_index > 7:
raise ValueError("bit index out of range: 0-7")
else:
if "." in offset_part:
raise ValueError(f"Only X area may include bit index: {address}")
byte_offset = int(offset_part)
if byte_offset < 0:
raise ValueError("byte offset must be >= 0")
return db_number, area_code, byte_offset, bit_index
def _encode_s7_value(self, value: Any, data_type: str, data_length: int) -> bytes:
"""Encode python value into S7 byte payload."""
import struct
if data_length <= 0:
raise ValueError("data_length must be > 0")
dt = str(data_type or "").strip().upper()
if not dt:
raise ValueError("data_type is required")
if dt == "BOOL":
if data_length != 1:
raise ValueError("BOOL data_length must be 1")
return b"\x01" if bool(value) else b"\x00"
if dt in ("BYTE", "INT8"):
if data_length != 1:
raise ValueError(f"{dt} data_length must be 1")
return int(value).to_bytes(1, byteorder="big", signed=(dt == "INT8"))
if dt in ("UINT16", "WORD"):
if data_length != 2:
raise ValueError(f"{dt} data_length must be 2")
return int(value).to_bytes(2, byteorder="big", signed=False)
if dt == "INT16":
if data_length != 2:
raise ValueError("INT16 data_length must be 2")
return int(value).to_bytes(2, byteorder="big", signed=True)
if dt in ("UINT32", "DWORD"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
return int(value).to_bytes(4, byteorder="big", signed=False)
if dt in ("INT32", "DINT"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
return int(value).to_bytes(4, byteorder="big", signed=True)
if dt in ("REAL", "FLOAT"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
return struct.pack(">f", float(value))
if dt in ("LREAL", "DOUBLE"):
if data_length != 8:
raise ValueError(f"{dt} data_length must be 8")
return struct.pack(">d", float(value))
if dt in ("STRING", "BYTES"):
if isinstance(value, bytes):
raw = value
else:
raw = str(value).encode("utf-8")
if len(raw) > data_length:
raise ValueError(f"value bytes length {len(raw)} exceeds data_length {data_length}")
return raw.ljust(data_length, b"\x00")
raise ValueError(
f"Unsupported data_type: {data_type}. "
"Supported: BOOL/BYTE/INT8/UINT16/WORD/INT16/UINT32/DWORD/INT32/DINT/REAL/FLOAT/LREAL/DOUBLE/STRING/BYTES"
)
async def write_s7_value(
self,
endpoint: str,
address: str,
data_length: int,
data_type: str,
value: Any,
rack: int = 0,
slot: int = 1,
):
"""Connect via S7 and write value using endpoint(ip:port), address, length, and type."""
try:
import snap7 # type: ignore
from snap7.type import Areas # type: ignore
except ImportError as exc:
raise RuntimeError("python-snap7 package not installed") from exc
ip, tcp_port = self._parse_s7_endpoint(endpoint)
db_number, area_code, byte_offset, bit_index = self._parse_s7_address(address)
payload = self._encode_s7_value(value, data_type, int(data_length))
def _do_write():
client = snap7.client.Client()
try:
client.set_connection_type(3)
client.connect(ip, int(rack), int(slot), int(tcp_port))
if not client.get_connected():
raise RuntimeError(f"S7 connect failed: {endpoint}")
if area_code == "X":
current = client.db_read(db_number, byte_offset, 1)
mask = 1 << int(bit_index)
if bool(value):
current[0] = current[0] | mask
else:
current[0] = current[0] & (0xFF ^ mask)
client.db_write(db_number, byte_offset, current)
else:
client.db_write(db_number, byte_offset, payload)
self._log(
f"Wrote S7 value success: endpoint={endpoint}, address={address}, "
f"value={value}, data_type={data_type}, data_length={data_length}"
)
return {
"ok": True,
"endpoint": endpoint,
"ip": ip,
"port": tcp_port,
"address": address,
"data_type": data_type,
"data_length": data_length,
"value": value,
"timestamp": datetime.now().isoformat(),
}
except Exception as exc:
self._log(
f"Write S7 value failed: endpoint={endpoint}, address={address}, error={exc}"
)
raise
finally:
try:
client.disconnect()
except Exception:
pass
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _do_write)
# ------------------------------------------------------------------
async def _simulate_loop(self):
"""Simulation mode when opcua is not installed."""
counter = 0
while not self._stop_event.is_set():
await asyncio.sleep(self.poll_interval)
counter += 1
self.last_counter = counter
self.last_update = datetime.now().isoformat()
self._log(f"[SIM] Counter tick: {counter}")
self.running = False
opc_service = OpcService()