diff --git a/backend/opc_service.py b/backend/opc_service.py index 97a4526..0d2cd37 100644 --- a/backend/opc_service.py +++ b/backend/opc_service.py @@ -1,6 +1,18 @@ """ OPC-UA polling service. +# 调用示例(OPC-UA 写入) +# await opc_service.write_node_value("ns=2;s=PL.TEST.COUNT", 123, "Int32") +# +# 调用示例(S7 写入) +# await opc_service.write_s7_value( +# endpoint="192.168.0.10:102", +# address="DB1.DBW4", +# data_length=2, +# data_type="INT16", +# value=123, +# ) + Logic: 1. Connect to the OPC-UA server at `opc_url`. 2. Read the counter node (`counter_node`) every `poll_interval` seconds. @@ -27,8 +39,9 @@ import asyncio import json import logging import os +import re from datetime import datetime -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple from dotenv import load_dotenv @@ -395,6 +408,438 @@ class OpcService: loop = asyncio.get_event_loop() await loop.run_in_executor(None, _do_update) + # ------------------------------------------------------------------ + def _normalize_variant_type(self, variant_type: Optional[str]): + """Convert a variant type string to opcua.ua.VariantType.""" + if not variant_type: + return None + from opcua import ua # type: ignore + + vt_name = str(variant_type).strip() + if not vt_name: + return None + + try: + return getattr(ua.VariantType, vt_name) + except AttributeError: + raise ValueError( + f"Unsupported OPC VariantType: {variant_type}. " + "Example: Boolean/Int16/Int32/Float/Double/String" + ) + + async def write_node_value(self, node_id: str, value: Any, variant_type: Optional[str] = None): + """Connect to OPC server and write value to the specified node.""" + if not node_id: + raise ValueError("node_id is required") + + try: + from opcua import Client, ua # type: ignore + except ImportError as exc: + raise RuntimeError("opcua package not installed") from exc + + def _do_write(): + client = Client(self.opc_url) + try: + client.connect() + node = client.get_node(node_id) + + vt = self._normalize_variant_type(variant_type) + if vt is None: + node.set_value(value) + else: + node.set_value(ua.DataValue(ua.Variant(value, vt))) + + self._log( + f"Wrote OPC value success: node={node_id}, value={value}, " + f"variant_type={variant_type or 'auto'}" + ) + return { + "ok": True, + "node_id": node_id, + "value": value, + "variant_type": variant_type or "auto", + "opc_url": self.opc_url, + "timestamp": datetime.now().isoformat(), + } + except Exception as exc: + self._log(f"Write OPC value failed: node={node_id}, error={exc}") + raise + finally: + try: + client.disconnect() + except Exception: + pass + + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, _do_write) + + def _parse_s7_address(self, address: str) -> Tuple[int, int]: + """Parse S7 DB address: DB{db}.{type}{byte}[.{bit}] -> (db_number, start_byte).""" + if not address: + raise ValueError("address is required") + + # Supported examples: + # DB1.DBX0.0 / DB1.DBB2 / DB1.DBW4 / DB1.DBD8 + # DB1.X0.0 / DB1.B2 / DB1.W4 / DB1.D8 + pattern = r"^DB(\d+)\.(?:DB)?(?:X|B|W|D)(\d+)(?:\.\d+)?$" + m = re.match(pattern, address.strip(), re.IGNORECASE) + if not m: + raise ValueError( + "Unsupported S7 address format. " + "Examples: DB1.DBX0.0, DB1.DBB2, DB1.DBW4, DB1.DBD8" + ) + + db_number = int(m.group(1)) + start_byte = int(m.group(2)) + return db_number, start_byte + + def _encode_s7_value(self, value: Any, data_type: str, data_length: int) -> bytes: + """Encode python value to bytes for snap7 DB write.""" + import struct + + dt = (data_type or "").strip().upper() + if not dt: + raise ValueError("data_type is required") + + if dt in ("BOOL", "BOOLEAN"): + if data_length != 1: + raise ValueError("BOOL data_length must be 1") + return bytes([1 if bool(value) else 0]) + + if dt in ("BYTE", "UINT8", "INT8"): + if data_length != 1: + raise ValueError(f"{dt} data_length must be 1") + iv = int(value) + if dt == "INT8": + return struct.pack(">b", iv) + return struct.pack(">B", iv) + + if dt in ("INT16", "WORD", "UINT16"): + if data_length != 2: + raise ValueError(f"{dt} data_length must be 2") + iv = int(value) + if dt == "INT16": + return struct.pack(">h", iv) + return struct.pack(">H", iv) + + if dt in ("INT32", "DINT", "UINT32"): + if data_length != 4: + raise ValueError(f"{dt} data_length must be 4") + iv = int(value) + if dt == "INT32" or dt == "DINT": + return struct.pack(">i", iv) + return struct.pack(">I", iv) + + if dt in ("REAL", "FLOAT"): + if data_length != 4: + raise ValueError(f"{dt} data_length must be 4") + return struct.pack(">f", float(value)) + + if dt in ("LREAL", "DOUBLE"): + if data_length != 8: + raise ValueError(f"{dt} data_length must be 8") + return struct.pack(">d", float(value)) + + if dt in ("STRING", "BYTES"): + if data_length < 1: + raise ValueError(f"{dt} data_length must be >= 1") + raw = value.encode("utf-8") if isinstance(value, str) else bytes(value) + if len(raw) > data_length: + raise ValueError( + f"value bytes length {len(raw)} exceeds data_length {data_length}" + ) + return raw.ljust(data_length, b"\x00") + + raise ValueError( + "Unsupported S7 data_type. " + "Example: BOOL/BYTE/INT16/UINT16/INT32/UINT32/REAL/LREAL/STRING" + ) + + async def write_s7_value( + self, + ip: str, + address: str, + data_length: int, + data_type: str, + value: Any, + rack: int = 0, + slot: int = 1, + tcp_port: int = 102, + ): + """Connect via S7 protocol and write value to a DB address.""" + if not ip: + raise ValueError("ip is required") + if not address: + raise ValueError("address is required") + if not data_type: + raise ValueError("data_type is required") + if not isinstance(data_length, int) or data_length <= 0: + raise ValueError("data_length must be a positive integer") + + try: + import snap7 # type: ignore + except ImportError as exc: + raise RuntimeError("python-snap7 package not installed") from exc + + db_number, start_byte = self._parse_s7_address(address) + payload = self._encode_s7_value(value, data_type, data_length) + + def _do_write_s7(): + client = snap7.client.Client() + try: + client.set_connection_params(ip, 0x0100, 0x0100) + client.set_connection_type(3) + client.connect(ip, rack, slot, tcp_port) + + if not client.get_connected(): + raise RuntimeError("S7 connect failed") + + client.db_write(db_number, start_byte, payload) + + self._log( + "Wrote S7 value success: " + f"ip={ip}, address={address}, data_type={data_type}, " + f"data_length={data_length}, value={value}" + ) + return { + "ok": True, + "ip": ip, + "address": address, + "data_type": data_type, + "data_length": data_length, + "value": value, + "rack": rack, + "slot": slot, + "tcp_port": tcp_port, + "timestamp": datetime.now().isoformat(), + } + except Exception as exc: + self._log(f"Write S7 value failed: ip={ip}, address={address}, error={exc}") + raise + finally: + try: + client.disconnect() + except Exception: + pass + try: + client.destroy() + except Exception: + pass + + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, _do_write_s7) + + # ------------------------------------------------------------------ + def _parse_s7_endpoint(self, endpoint: str): + """Parse S7 endpoint in format 'ip:port'.""" + if not endpoint: + raise ValueError("endpoint is required, format: ip:port") + + raw = str(endpoint).strip() + if ":" not in raw: + raise ValueError("endpoint format invalid, expected ip:port") + + ip, port_str = raw.rsplit(":", 1) + ip = ip.strip() + port_str = port_str.strip() + + if not ip: + raise ValueError("endpoint ip is empty") + if not port_str.isdigit(): + raise ValueError("endpoint port must be numeric") + + port = int(port_str) + if port < 1 or port > 65535: + raise ValueError("endpoint port out of range: 1-65535") + + return ip, port + + def _parse_s7_address(self, address: str): + """ + Parse S7 DB address. + + Supported examples: + - DB1.DBX0.0 + - DB1.DBB2 + - DB1.DBW4 + - DB1.DBD8 + - DB1.X0.0 / DB1.B2 / DB1.W4 / DB1.D8 + """ + if not address: + raise ValueError("address is required") + + raw = str(address).strip().upper() + if "." not in raw or not raw.startswith("DB"): + raise ValueError(f"Unsupported S7 address format: {address}") + + db_part, area_part = raw.split(".", 1) + try: + db_number = int(db_part[2:]) + except ValueError as exc: + raise ValueError(f"Invalid DB number in address: {address}") from exc + + if area_part.startswith("DB"): + area_part = area_part[2:] + + area_code = area_part[:1] + offset_part = area_part[1:] + + bit_index = None + if area_code == "X": + if "." not in offset_part: + raise ValueError(f"BOOL address requires bit index, e.g. DB1.DBX0.0: {address}") + byte_str, bit_str = offset_part.split(".", 1) + byte_offset = int(byte_str) + bit_index = int(bit_str) + if bit_index < 0 or bit_index > 7: + raise ValueError("bit index out of range: 0-7") + else: + if "." in offset_part: + raise ValueError(f"Only X area may include bit index: {address}") + byte_offset = int(offset_part) + + if byte_offset < 0: + raise ValueError("byte offset must be >= 0") + + return db_number, area_code, byte_offset, bit_index + + def _encode_s7_value(self, value: Any, data_type: str, data_length: int) -> bytes: + """Encode python value into S7 byte payload.""" + import struct + + if data_length <= 0: + raise ValueError("data_length must be > 0") + + dt = str(data_type or "").strip().upper() + if not dt: + raise ValueError("data_type is required") + + if dt == "BOOL": + if data_length != 1: + raise ValueError("BOOL data_length must be 1") + return b"\x01" if bool(value) else b"\x00" + + if dt in ("BYTE", "INT8"): + if data_length != 1: + raise ValueError(f"{dt} data_length must be 1") + return int(value).to_bytes(1, byteorder="big", signed=(dt == "INT8")) + + if dt in ("UINT16", "WORD"): + if data_length != 2: + raise ValueError(f"{dt} data_length must be 2") + return int(value).to_bytes(2, byteorder="big", signed=False) + + if dt == "INT16": + if data_length != 2: + raise ValueError("INT16 data_length must be 2") + return int(value).to_bytes(2, byteorder="big", signed=True) + + if dt in ("UINT32", "DWORD"): + if data_length != 4: + raise ValueError(f"{dt} data_length must be 4") + return int(value).to_bytes(4, byteorder="big", signed=False) + + if dt in ("INT32", "DINT"): + if data_length != 4: + raise ValueError(f"{dt} data_length must be 4") + return int(value).to_bytes(4, byteorder="big", signed=True) + + if dt in ("REAL", "FLOAT"): + if data_length != 4: + raise ValueError(f"{dt} data_length must be 4") + return struct.pack(">f", float(value)) + + if dt in ("LREAL", "DOUBLE"): + if data_length != 8: + raise ValueError(f"{dt} data_length must be 8") + return struct.pack(">d", float(value)) + + if dt in ("STRING", "BYTES"): + if isinstance(value, bytes): + raw = value + else: + raw = str(value).encode("utf-8") + if len(raw) > data_length: + raise ValueError(f"value bytes length {len(raw)} exceeds data_length {data_length}") + return raw.ljust(data_length, b"\x00") + + raise ValueError( + f"Unsupported data_type: {data_type}. " + "Supported: BOOL/BYTE/INT8/UINT16/WORD/INT16/UINT32/DWORD/INT32/DINT/REAL/FLOAT/LREAL/DOUBLE/STRING/BYTES" + ) + + async def write_s7_value( + self, + endpoint: str, + address: str, + data_length: int, + data_type: str, + value: Any, + rack: int = 0, + slot: int = 1, + ): + """Connect via S7 and write value using endpoint(ip:port), address, length, and type.""" + try: + import snap7 # type: ignore + from snap7.type import Areas # type: ignore + except ImportError as exc: + raise RuntimeError("python-snap7 package not installed") from exc + + ip, tcp_port = self._parse_s7_endpoint(endpoint) + db_number, area_code, byte_offset, bit_index = self._parse_s7_address(address) + + payload = self._encode_s7_value(value, data_type, int(data_length)) + + def _do_write(): + client = snap7.client.Client() + try: + client.set_connection_type(3) + client.connect(ip, int(rack), int(slot), int(tcp_port)) + + if not client.get_connected(): + raise RuntimeError(f"S7 connect failed: {endpoint}") + + if area_code == "X": + current = client.db_read(db_number, byte_offset, 1) + mask = 1 << int(bit_index) + if bool(value): + current[0] = current[0] | mask + else: + current[0] = current[0] & (0xFF ^ mask) + client.db_write(db_number, byte_offset, current) + else: + client.db_write(db_number, byte_offset, payload) + + self._log( + f"Wrote S7 value success: endpoint={endpoint}, address={address}, " + f"value={value}, data_type={data_type}, data_length={data_length}" + ) + + return { + "ok": True, + "endpoint": endpoint, + "ip": ip, + "port": tcp_port, + "address": address, + "data_type": data_type, + "data_length": data_length, + "value": value, + "timestamp": datetime.now().isoformat(), + } + except Exception as exc: + self._log( + f"Write S7 value failed: endpoint={endpoint}, address={address}, error={exc}" + ) + raise + finally: + try: + client.disconnect() + except Exception: + pass + + loop = asyncio.get_event_loop() + return await loop.run_in_executor(None, _do_write) + # ------------------------------------------------------------------ async def _simulate_loop(self): """Simulation mode when opcua is not installed."""