Files
tiandihe/backend/opc_service.py
Joshi 538401017a feat(钢卷跟踪): 从哪开始获取计划?一次性获取多少个计划?不同批次的顺序号相同怎么处理?
已实现的功能:
Systemcount+信号变化才算有效
状态机逻辑:信号1必须配合计数器变化才触发,然后等待信号2
信号2必须配合计数器变化+保持2秒才触发
第一批1-5,第二批2-6,第三批3-7
每次取5个钢卷,顺序号滑动+1
信号2触发时更新Oracle追踪表
OPC页面配置点位
信号1(入口钢卷)节点配置
信号2(焊接完成)节点配置
计数器节点配置
保存后自动重启OPC服务
前端操作中间表 
TrackCoil页面可增删改查临时表
可手动调整顺序
模拟信号1/信号2按钮可测试

- 后端新增钢卷跟踪相关API和数据库表
- 前端添加钢卷跟踪管理页面
- OPC服务增加信号节点监控和状态机处理
- 实现钢卷跟踪的自动更新逻辑
2026-04-11 14:52:47 +08:00

412 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
OPC-UA polling service.
Logic:
1. Connect to the OPC-UA server at `opc_url`.
2. Read the counter node (`counter_node`) every `poll_interval` seconds.
3. When the counter value changes, read the trackmap nodes listed in
`trackmap_nodes` (a dict: {oracle_column -> node_id}).
4. For each row returned by the trackmap nodes build an UPDATE statement
and apply it to PLTM.CMPT_PL_TRACKMAP.
5. Monitor two signal nodes:
- Signal1 (Entry): When 0->1, fetch next 5 coils from PDI and save to SQLite temp table
- Signal2 (WeldDone): When 0->1 and held for 2s, update CMPT_PL_TRACKMAP
`trackmap_nodes` example (stored in .env or configured via UI):
{
"COILID": "ns=2;s=PL.TRACKMAP.P01.COILID",
"BEF_ES": "ns=2;s=PL.TRACKMAP.P01.BEF_ES",
...
}
For multi-position setups, define one entry per position column and handle
positional logic accordingly. The current implementation reads a flat set of
nodes and stores them against the POSITION value also read from OPC.
"""
import asyncio
import json
import logging
import os
from datetime import datetime
from typing import Any, Dict, List, Optional
from dotenv import load_dotenv
load_dotenv()
logger = logging.getLogger(__name__)
class OpcService:
def __init__(self):
self.config_path = os.path.join(os.path.dirname(__file__), "opc_config.json")
self.opc_url: str = os.getenv("OPC_URL", "opc.tcp://192.168.1.100:4840")
self.counter_node: str = os.getenv(
"OPC_COUNTER_NODE", "ns=2;s=PL.TRACKMAP.COUNTER"
)
self.poll_interval: int = int(os.getenv("OPC_POLL_INTERVAL", "2"))
self.signal1_node: str = os.getenv("OPC_SIGNAL1_NODE", "ns=2;s=PL.Signal.EntryCoil")
self.signal2_node: str = os.getenv("OPC_SIGNAL2_NODE", "ns=2;s=PL.Signal.WeldDone")
self.signal1_last: Optional[int] = None
self.signal2_last: Optional[int] = None
self.signal2_rise_time: Optional[datetime] = None
self.signal1_coils: List[Dict[str, Any]] = []
self.current_seq_start: int = 1
# 状态机: WAIT_S1=等待信号1, WAIT_S2=等待信号2
self.track_state: str = "WAIT_S1"
self.last_counter_at_state_change: Optional[Any] = None
# Mapping: oracle_column_name -> OPC node id
# Populated from .env or via API
self.trackmap_nodes: Dict[str, str] = self._load_trackmap_nodes()
# Load persisted config if present
self._load_persisted_config()
self.running: bool = False
self.last_counter: Optional[Any] = None
self.last_update: Optional[str] = None
self.event_log: List[str] = []
self._stop_event = asyncio.Event()
self._task: Optional[asyncio.Task] = None
# ------------------------------------------------------------------
def _load_trackmap_nodes(self) -> Dict[str, str]:
"""Load trackmap node mapping from env vars prefixed OPC_NODE_."""
nodes = {}
for key, val in os.environ.items():
if key.startswith("OPC_NODE_"):
col = key[len("OPC_NODE_"):].lower()
nodes[col] = val
return nodes
def _load_persisted_config(self):
"""Load OPC config from local json file if exists."""
if not os.path.exists(self.config_path):
return
try:
with open(self.config_path, "r", encoding="utf-8") as f:
cfg = json.load(f)
self.opc_url = cfg.get("opc_url", self.opc_url)
self.counter_node = cfg.get("counter_node", self.counter_node)
self.poll_interval = int(cfg.get("poll_interval", self.poll_interval))
self.trackmap_nodes = cfg.get("trackmap_nodes", self.trackmap_nodes) or {}
self.signal1_node = cfg.get("signal1_node", self.signal1_node)
self.signal2_node = cfg.get("signal2_node", self.signal2_node)
self._log(f"Loaded OPC config from {self.config_path}")
except Exception as exc:
logger.warning("Failed to load OPC config %s: %s", self.config_path, exc)
def save_config(self):
"""Persist current OPC config to local json file."""
cfg = {
"opc_url": self.opc_url,
"counter_node": self.counter_node,
"poll_interval": self.poll_interval,
"trackmap_nodes": self.trackmap_nodes,
"signal1_node": self.signal1_node,
"signal2_node": self.signal2_node,
}
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
with open(self.config_path, "w", encoding="utf-8") as f:
json.dump(cfg, f, ensure_ascii=False, indent=2)
def _log(self, msg: str):
ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{ts}] {msg}"
logger.info(entry)
self.event_log.append(entry)
if len(self.event_log) > 500:
self.event_log = self.event_log[-500:]
# ------------------------------------------------------------------
async def start_polling(self):
if self.running:
return
self._stop_event.clear()
self._task = asyncio.create_task(self._poll_loop())
async def stop_polling(self):
self._stop_event.set()
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self.running = False
# ------------------------------------------------------------------
async def _poll_loop(self):
self.running = True
self._log(f"OPC polling started: {self.opc_url}")
try:
from opcua import Client # type: ignore
except ImportError:
self._log("opcua package not installed running in SIMULATION mode")
await self._simulate_loop()
return
while not self._stop_event.is_set():
try:
client = Client(self.opc_url)
client.connect()
self._log(f"Connected to OPC server: {self.opc_url}")
try:
while not self._stop_event.is_set():
current_counter = await self._tick_with_counter(client)
await self._check_signals(client, current_counter)
await asyncio.sleep(self.poll_interval)
finally:
client.disconnect()
self._log("Disconnected from OPC server")
except Exception as exc:
self._log(f"OPC connection error: {exc}. Retrying in 5s...")
await asyncio.sleep(5)
self.running = False
self._log("OPC polling stopped")
# ------------------------------------------------------------------
async def _tick_with_counter(self, client):
"""Read counter and update trackmap; return current_counter."""
try:
counter_node = client.get_node(self.counter_node)
current_counter = counter_node.get_value()
except Exception as exc:
self._log(f"Failed to read counter node: {exc}")
return None
if current_counter == self.last_counter:
return current_counter
self._log(
f"Counter changed: {self.last_counter} -> {current_counter}. "
"Fetching trackmap data..."
)
self.last_counter = current_counter
self.last_update = datetime.now().isoformat()
if not self.trackmap_nodes:
self._log("No trackmap nodes configured skipping DB update")
return current_counter
# Read all configured nodes
data: Dict[str, Any] = {}
for col, node_id in self.trackmap_nodes.items():
try:
node = client.get_node(node_id)
data[col] = node.get_value()
except Exception as exc:
self._log(f"Failed to read node {node_id}: {exc}")
if not data:
return current_counter
# Determine POSITION from data (must be one of the mapped columns)
position = data.get("position")
if position is None:
self._log("'position' not in trackmap_nodes data cannot update row")
return current_counter
await self._update_oracle(position, data)
return current_counter
# ------------------------------------------------------------------
async def _check_signals(self, client, current_counter):
"""Check signal1 and signal2 with state machine and counter validation."""
try:
signal1_node = client.get_node(self.signal1_node)
signal1_value = signal1_node.get_value()
except Exception as exc:
self._log(f"Failed to read signal1 node: {exc}")
signal1_value = None
try:
signal2_node = client.get_node(self.signal2_node)
signal2_value = signal2_node.get_value()
except Exception as exc:
self._log(f"Failed to read signal2 node: {exc}")
signal2_value = None
counter_changed = current_counter != self.last_counter_at_state_change
# State machine for signal processing
if self.track_state == "WAIT_S1":
if signal1_value is not None and self.signal1_last is not None:
if self.signal1_last == 0 and signal1_value == 1 and counter_changed:
self._log(f"Signal1: Entry coil triggered (0->1) with counter change, state={self.track_state}")
await self._handle_signal1()
self.track_state = "WAIT_S2"
self.last_counter_at_state_change = current_counter
elif self.track_state == "WAIT_S2":
if signal2_value is not None and self.signal2_last is not None:
if self.signal2_last == 0 and signal2_value == 1 and counter_changed:
self.signal2_rise_time = datetime.now()
self._log("Signal2: Weld done rising edge (0->1) with counter change")
elif self.signal2_last == 1 and signal2_value == 0:
self.signal2_rise_time = None
if signal2_value == 1 and self.signal2_rise_time and counter_changed:
elapsed = (datetime.now() - self.signal2_rise_time).total_seconds()
if elapsed >= 2.0:
self._log(f"Signal2: Held {elapsed:.1f}s, triggering trackmap update")
await self._handle_signal2()
self.track_state = "WAIT_S1"
self.signal2_rise_time = None
self.last_counter_at_state_change = current_counter
if signal1_value is not None:
self.signal1_last = signal1_value
if signal2_value is not None:
self.signal2_last = signal2_value
async def _handle_signal1(self):
"""Handle signal1: fetch next 5 coils from PDI and save to SQLite temp table."""
from sqlite_sync import (
sqlite_get_max_sequencenb,
sqlite_get_coils_by_sequencenb_range,
sqlite_save_coils_to_track
)
def _do_fetch():
max_seq = sqlite_get_max_sequencenb()
if max_seq is None or max_seq < 1:
self._log("Signal1: No PDI data available")
return
start_seq = self.current_seq_start
end_seq = min(start_seq + 4, max_seq)
if end_seq < start_seq:
self._log(f"Signal1: Insufficient PDI data (max_seq={max_seq}, start={start_seq})")
return
coils = sqlite_get_coils_by_sequencenb_range(start_seq, end_seq)
if len(coils) == 0:
self._log(f"Signal1: No coils found")
return
self.signal1_coils = coils
sqlite_save_coils_to_track(coils)
self._log(f"Signal1: Saved {len(coils)} coils (seq {start_seq}-{end_seq}) to temp table")
self.current_seq_start = start_seq + 1
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _do_fetch)
async def _handle_signal2(self):
"""Handle signal2: update CMPT_PL_TRACKMAP with temp table data."""
from sqlite_sync import sqlite_get_coil_track
from database import get_connection
def _do_update():
coils = sqlite_get_coil_track()
if not coils:
self._log("Signal2: No coils in temp track table")
return
try:
conn = get_connection()
cursor = conn.cursor()
try:
coil_count = len(coils)
for i in range(5):
target_position = i + 1
coil_index = target_position - 1
if coil_index >= 0 and coil_index < coil_count:
coil = coils[coil_index]
cursor.execute("SELECT COUNT(*) FROM PLTM.CMPT_PL_TRACKMAP WHERE POSITION = :pos", {"pos": target_position})
exists = cursor.fetchone()[0] > 0
if exists:
cursor.execute("""
UPDATE PLTM.CMPT_PL_TRACKMAP
SET COILID = :coilid, TOM = SYSDATE
WHERE POSITION = :position
""", {"coilid": coil["coilid"], "position": target_position})
else:
cursor.execute("""
INSERT INTO PLTM.CMPT_PL_TRACKMAP (POSITION, COILID, TOM)
VALUES (:position, :coilid, SYSDATE)
""", {"position": target_position, "coilid": coil["coilid"]})
else:
cursor.execute("SELECT COUNT(*) FROM PLTM.CMPT_PL_TRACKMAP WHERE POSITION = :pos", {"pos": target_position})
exists = cursor.fetchone()[0] > 0
if exists:
cursor.execute("""
UPDATE PLTM.CMPT_PL_TRACKMAP
SET COILID = NULL, TOM = SYSDATE
WHERE POSITION = :position
""", {"position": target_position})
else:
cursor.execute("""
INSERT INTO PLTM.CMPT_PL_TRACKMAP (POSITION, COILID, TOM)
VALUES (:position, NULL, SYSDATE)
""", {"position": target_position})
conn.commit()
self._log(f"Signal2: Updated 5 positions (coils: {coil_count})")
finally:
cursor.close()
conn.close()
except Exception as exc:
self._log(f"Signal2: Oracle update failed: {exc}")
import traceback
self._log(f"Signal2 traceback: {traceback.format_exc()}")
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _do_update)
# ------------------------------------------------------------------
async def _update_oracle(self, position: Any, data: Dict[str, Any]):
"""Write fetched OPC values into PLTM.CMPT_PL_TRACKMAP."""
import threading
def _do_update():
try:
from database import get_connection
conn = get_connection()
cursor = conn.cursor()
try:
updatable = {k: v for k, v in data.items() if k != "position"}
if not updatable:
return
set_clause = ", ".join(
[f"{k.upper()} = :{k}" for k in updatable.keys()]
)
updatable["position_"] = position
sql = (
f"UPDATE PLTM.CMPT_PL_TRACKMAP "
f"SET {set_clause} WHERE POSITION = :position_"
)
cursor.execute(sql, updatable)
conn.commit()
self._log(
f"Updated CMPT_PL_TRACKMAP POSITION={position}: "
+ ", ".join(f"{k}={v}" for k, v in updatable.items() if k != "position_")
)
finally:
cursor.close()
conn.close()
except Exception as exc:
self._log(f"Oracle update failed: {exc}")
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _do_update)
# ------------------------------------------------------------------
async def _simulate_loop(self):
"""Simulation mode when opcua is not installed."""
counter = 0
while not self._stop_event.is_set():
await asyncio.sleep(self.poll_interval)
counter += 1
self.last_counter = counter
self.last_update = datetime.now().isoformat()
self._log(f"[SIM] Counter tick: {counter}")
self.running = False
opc_service = OpcService()