添加了两个写入方法

This commit is contained in:
2026-04-13 12:38:36 +08:00
parent 538401017a
commit c609934156

View File

@@ -1,6 +1,18 @@
"""
OPC-UA polling service.
# 调用示例OPC-UA 写入)
# await opc_service.write_node_value("ns=2;s=PL.TEST.COUNT", 123, "Int32")
#
# 调用示例S7 写入)
# await opc_service.write_s7_value(
# endpoint="192.168.0.10:102",
# address="DB1.DBW4",
# data_length=2,
# data_type="INT16",
# value=123,
# )
Logic:
1. Connect to the OPC-UA server at `opc_url`.
2. Read the counter node (`counter_node`) every `poll_interval` seconds.
@@ -27,8 +39,9 @@ import asyncio
import json
import logging
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, Tuple
from dotenv import load_dotenv
@@ -395,6 +408,438 @@ class OpcService:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, _do_update)
# ------------------------------------------------------------------
def _normalize_variant_type(self, variant_type: Optional[str]):
"""Convert a variant type string to opcua.ua.VariantType."""
if not variant_type:
return None
from opcua import ua # type: ignore
vt_name = str(variant_type).strip()
if not vt_name:
return None
try:
return getattr(ua.VariantType, vt_name)
except AttributeError:
raise ValueError(
f"Unsupported OPC VariantType: {variant_type}. "
"Example: Boolean/Int16/Int32/Float/Double/String"
)
async def write_node_value(self, node_id: str, value: Any, variant_type: Optional[str] = None):
"""Connect to OPC server and write value to the specified node."""
if not node_id:
raise ValueError("node_id is required")
try:
from opcua import Client, ua # type: ignore
except ImportError as exc:
raise RuntimeError("opcua package not installed") from exc
def _do_write():
client = Client(self.opc_url)
try:
client.connect()
node = client.get_node(node_id)
vt = self._normalize_variant_type(variant_type)
if vt is None:
node.set_value(value)
else:
node.set_value(ua.DataValue(ua.Variant(value, vt)))
self._log(
f"Wrote OPC value success: node={node_id}, value={value}, "
f"variant_type={variant_type or 'auto'}"
)
return {
"ok": True,
"node_id": node_id,
"value": value,
"variant_type": variant_type or "auto",
"opc_url": self.opc_url,
"timestamp": datetime.now().isoformat(),
}
except Exception as exc:
self._log(f"Write OPC value failed: node={node_id}, error={exc}")
raise
finally:
try:
client.disconnect()
except Exception:
pass
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _do_write)
def _parse_s7_address(self, address: str) -> Tuple[int, int]:
"""Parse S7 DB address: DB{db}.{type}{byte}[.{bit}] -> (db_number, start_byte)."""
if not address:
raise ValueError("address is required")
# Supported examples:
# DB1.DBX0.0 / DB1.DBB2 / DB1.DBW4 / DB1.DBD8
# DB1.X0.0 / DB1.B2 / DB1.W4 / DB1.D8
pattern = r"^DB(\d+)\.(?:DB)?(?:X|B|W|D)(\d+)(?:\.\d+)?$"
m = re.match(pattern, address.strip(), re.IGNORECASE)
if not m:
raise ValueError(
"Unsupported S7 address format. "
"Examples: DB1.DBX0.0, DB1.DBB2, DB1.DBW4, DB1.DBD8"
)
db_number = int(m.group(1))
start_byte = int(m.group(2))
return db_number, start_byte
def _encode_s7_value(self, value: Any, data_type: str, data_length: int) -> bytes:
"""Encode python value to bytes for snap7 DB write."""
import struct
dt = (data_type or "").strip().upper()
if not dt:
raise ValueError("data_type is required")
if dt in ("BOOL", "BOOLEAN"):
if data_length != 1:
raise ValueError("BOOL data_length must be 1")
return bytes([1 if bool(value) else 0])
if dt in ("BYTE", "UINT8", "INT8"):
if data_length != 1:
raise ValueError(f"{dt} data_length must be 1")
iv = int(value)
if dt == "INT8":
return struct.pack(">b", iv)
return struct.pack(">B", iv)
if dt in ("INT16", "WORD", "UINT16"):
if data_length != 2:
raise ValueError(f"{dt} data_length must be 2")
iv = int(value)
if dt == "INT16":
return struct.pack(">h", iv)
return struct.pack(">H", iv)
if dt in ("INT32", "DINT", "UINT32"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
iv = int(value)
if dt == "INT32" or dt == "DINT":
return struct.pack(">i", iv)
return struct.pack(">I", iv)
if dt in ("REAL", "FLOAT"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
return struct.pack(">f", float(value))
if dt in ("LREAL", "DOUBLE"):
if data_length != 8:
raise ValueError(f"{dt} data_length must be 8")
return struct.pack(">d", float(value))
if dt in ("STRING", "BYTES"):
if data_length < 1:
raise ValueError(f"{dt} data_length must be >= 1")
raw = value.encode("utf-8") if isinstance(value, str) else bytes(value)
if len(raw) > data_length:
raise ValueError(
f"value bytes length {len(raw)} exceeds data_length {data_length}"
)
return raw.ljust(data_length, b"\x00")
raise ValueError(
"Unsupported S7 data_type. "
"Example: BOOL/BYTE/INT16/UINT16/INT32/UINT32/REAL/LREAL/STRING"
)
async def write_s7_value(
self,
ip: str,
address: str,
data_length: int,
data_type: str,
value: Any,
rack: int = 0,
slot: int = 1,
tcp_port: int = 102,
):
"""Connect via S7 protocol and write value to a DB address."""
if not ip:
raise ValueError("ip is required")
if not address:
raise ValueError("address is required")
if not data_type:
raise ValueError("data_type is required")
if not isinstance(data_length, int) or data_length <= 0:
raise ValueError("data_length must be a positive integer")
try:
import snap7 # type: ignore
except ImportError as exc:
raise RuntimeError("python-snap7 package not installed") from exc
db_number, start_byte = self._parse_s7_address(address)
payload = self._encode_s7_value(value, data_type, data_length)
def _do_write_s7():
client = snap7.client.Client()
try:
client.set_connection_params(ip, 0x0100, 0x0100)
client.set_connection_type(3)
client.connect(ip, rack, slot, tcp_port)
if not client.get_connected():
raise RuntimeError("S7 connect failed")
client.db_write(db_number, start_byte, payload)
self._log(
"Wrote S7 value success: "
f"ip={ip}, address={address}, data_type={data_type}, "
f"data_length={data_length}, value={value}"
)
return {
"ok": True,
"ip": ip,
"address": address,
"data_type": data_type,
"data_length": data_length,
"value": value,
"rack": rack,
"slot": slot,
"tcp_port": tcp_port,
"timestamp": datetime.now().isoformat(),
}
except Exception as exc:
self._log(f"Write S7 value failed: ip={ip}, address={address}, error={exc}")
raise
finally:
try:
client.disconnect()
except Exception:
pass
try:
client.destroy()
except Exception:
pass
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _do_write_s7)
# ------------------------------------------------------------------
def _parse_s7_endpoint(self, endpoint: str):
"""Parse S7 endpoint in format 'ip:port'."""
if not endpoint:
raise ValueError("endpoint is required, format: ip:port")
raw = str(endpoint).strip()
if ":" not in raw:
raise ValueError("endpoint format invalid, expected ip:port")
ip, port_str = raw.rsplit(":", 1)
ip = ip.strip()
port_str = port_str.strip()
if not ip:
raise ValueError("endpoint ip is empty")
if not port_str.isdigit():
raise ValueError("endpoint port must be numeric")
port = int(port_str)
if port < 1 or port > 65535:
raise ValueError("endpoint port out of range: 1-65535")
return ip, port
def _parse_s7_address(self, address: str):
"""
Parse S7 DB address.
Supported examples:
- DB1.DBX0.0
- DB1.DBB2
- DB1.DBW4
- DB1.DBD8
- DB1.X0.0 / DB1.B2 / DB1.W4 / DB1.D8
"""
if not address:
raise ValueError("address is required")
raw = str(address).strip().upper()
if "." not in raw or not raw.startswith("DB"):
raise ValueError(f"Unsupported S7 address format: {address}")
db_part, area_part = raw.split(".", 1)
try:
db_number = int(db_part[2:])
except ValueError as exc:
raise ValueError(f"Invalid DB number in address: {address}") from exc
if area_part.startswith("DB"):
area_part = area_part[2:]
area_code = area_part[:1]
offset_part = area_part[1:]
bit_index = None
if area_code == "X":
if "." not in offset_part:
raise ValueError(f"BOOL address requires bit index, e.g. DB1.DBX0.0: {address}")
byte_str, bit_str = offset_part.split(".", 1)
byte_offset = int(byte_str)
bit_index = int(bit_str)
if bit_index < 0 or bit_index > 7:
raise ValueError("bit index out of range: 0-7")
else:
if "." in offset_part:
raise ValueError(f"Only X area may include bit index: {address}")
byte_offset = int(offset_part)
if byte_offset < 0:
raise ValueError("byte offset must be >= 0")
return db_number, area_code, byte_offset, bit_index
def _encode_s7_value(self, value: Any, data_type: str, data_length: int) -> bytes:
"""Encode python value into S7 byte payload."""
import struct
if data_length <= 0:
raise ValueError("data_length must be > 0")
dt = str(data_type or "").strip().upper()
if not dt:
raise ValueError("data_type is required")
if dt == "BOOL":
if data_length != 1:
raise ValueError("BOOL data_length must be 1")
return b"\x01" if bool(value) else b"\x00"
if dt in ("BYTE", "INT8"):
if data_length != 1:
raise ValueError(f"{dt} data_length must be 1")
return int(value).to_bytes(1, byteorder="big", signed=(dt == "INT8"))
if dt in ("UINT16", "WORD"):
if data_length != 2:
raise ValueError(f"{dt} data_length must be 2")
return int(value).to_bytes(2, byteorder="big", signed=False)
if dt == "INT16":
if data_length != 2:
raise ValueError("INT16 data_length must be 2")
return int(value).to_bytes(2, byteorder="big", signed=True)
if dt in ("UINT32", "DWORD"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
return int(value).to_bytes(4, byteorder="big", signed=False)
if dt in ("INT32", "DINT"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
return int(value).to_bytes(4, byteorder="big", signed=True)
if dt in ("REAL", "FLOAT"):
if data_length != 4:
raise ValueError(f"{dt} data_length must be 4")
return struct.pack(">f", float(value))
if dt in ("LREAL", "DOUBLE"):
if data_length != 8:
raise ValueError(f"{dt} data_length must be 8")
return struct.pack(">d", float(value))
if dt in ("STRING", "BYTES"):
if isinstance(value, bytes):
raw = value
else:
raw = str(value).encode("utf-8")
if len(raw) > data_length:
raise ValueError(f"value bytes length {len(raw)} exceeds data_length {data_length}")
return raw.ljust(data_length, b"\x00")
raise ValueError(
f"Unsupported data_type: {data_type}. "
"Supported: BOOL/BYTE/INT8/UINT16/WORD/INT16/UINT32/DWORD/INT32/DINT/REAL/FLOAT/LREAL/DOUBLE/STRING/BYTES"
)
async def write_s7_value(
self,
endpoint: str,
address: str,
data_length: int,
data_type: str,
value: Any,
rack: int = 0,
slot: int = 1,
):
"""Connect via S7 and write value using endpoint(ip:port), address, length, and type."""
try:
import snap7 # type: ignore
from snap7.type import Areas # type: ignore
except ImportError as exc:
raise RuntimeError("python-snap7 package not installed") from exc
ip, tcp_port = self._parse_s7_endpoint(endpoint)
db_number, area_code, byte_offset, bit_index = self._parse_s7_address(address)
payload = self._encode_s7_value(value, data_type, int(data_length))
def _do_write():
client = snap7.client.Client()
try:
client.set_connection_type(3)
client.connect(ip, int(rack), int(slot), int(tcp_port))
if not client.get_connected():
raise RuntimeError(f"S7 connect failed: {endpoint}")
if area_code == "X":
current = client.db_read(db_number, byte_offset, 1)
mask = 1 << int(bit_index)
if bool(value):
current[0] = current[0] | mask
else:
current[0] = current[0] & (0xFF ^ mask)
client.db_write(db_number, byte_offset, current)
else:
client.db_write(db_number, byte_offset, payload)
self._log(
f"Wrote S7 value success: endpoint={endpoint}, address={address}, "
f"value={value}, data_type={data_type}, data_length={data_length}"
)
return {
"ok": True,
"endpoint": endpoint,
"ip": ip,
"port": tcp_port,
"address": address,
"data_type": data_type,
"data_length": data_length,
"value": value,
"timestamp": datetime.now().isoformat(),
}
except Exception as exc:
self._log(
f"Write S7 value failed: endpoint={endpoint}, address={address}, error={exc}"
)
raise
finally:
try:
client.disconnect()
except Exception:
pass
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, _do_write)
# ------------------------------------------------------------------
async def _simulate_loop(self):
"""Simulation mode when opcua is not installed."""