feat(opc): 添加S7直写支持并更新字段映射格式
- 在OpcConfig模型中新增S7终端、机架和槽位配置 - 实现S7直写功能,替代原有的OPC写入方式 - 更新前端字段映射占位符为S7地址格式 - 修改焊接完成信号触发条件说明 - 添加S7 INT32读取功能用于数据版本控制
This commit is contained in:
@@ -67,6 +67,9 @@ class OpcService:
|
||||
self.write_counter_node: str = os.getenv("OPC_WRITE_COUNTER_NODE", "")
|
||||
self.write_source_node: str = os.getenv("OPC_WRITE_SOURCE_NODE", "")
|
||||
self.write_target_node: str = os.getenv("OPC_WRITE_TARGET_NODE", "")
|
||||
self.write_s7_endpoint: str = os.getenv("OPC_WRITE_S7_ENDPOINT", "")
|
||||
self.write_s7_rack: int = int(os.getenv("OPC_WRITE_S7_RACK", "0"))
|
||||
self.write_s7_slot: int = int(os.getenv("OPC_WRITE_S7_SLOT", "1"))
|
||||
self.write_counter_last: Optional[Any] = None
|
||||
# 写入字段映射(按目标开卷机 1/2 分组)
|
||||
self.write_nodes: Dict[str, Dict[str, str]] = {}
|
||||
@@ -116,6 +119,9 @@ class OpcService:
|
||||
self.write_counter_node = cfg.get("write_counter_node", self.write_counter_node)
|
||||
self.write_source_node = cfg.get("write_source_node", self.write_source_node)
|
||||
self.write_target_node = cfg.get("write_target_node", self.write_target_node)
|
||||
self.write_s7_endpoint = cfg.get("write_s7_endpoint", self.write_s7_endpoint)
|
||||
self.write_s7_rack = int(cfg.get("write_s7_rack", self.write_s7_rack))
|
||||
self.write_s7_slot = int(cfg.get("write_s7_slot", self.write_s7_slot))
|
||||
self.write_nodes = cfg.get("write_nodes", self.write_nodes) or {}
|
||||
self._log(f"Loaded OPC config from {self.config_path}")
|
||||
except Exception as exc:
|
||||
@@ -133,6 +139,9 @@ class OpcService:
|
||||
"write_counter_node": self.write_counter_node,
|
||||
"write_source_node": self.write_source_node,
|
||||
"write_target_node": self.write_target_node,
|
||||
"write_s7_endpoint": self.write_s7_endpoint,
|
||||
"write_s7_rack": self.write_s7_rack,
|
||||
"write_s7_slot": self.write_s7_slot,
|
||||
"write_nodes": self.write_nodes,
|
||||
}
|
||||
os.makedirs(os.path.dirname(self.config_path), exist_ok=True)
|
||||
@@ -323,10 +332,10 @@ class OpcService:
|
||||
if source_value != 100 or target_value not in (1, 2):
|
||||
return
|
||||
|
||||
await self._write_entry_coil_to_uncoiler(client, target_value)
|
||||
await self._write_entry_coil_to_uncoiler(target_value)
|
||||
|
||||
async def _write_entry_coil_to_uncoiler(self, client, target_uncoiler: int):
|
||||
"""Write the smallest COILID plan to target uncoiler OPC nodes."""
|
||||
async def _write_entry_coil_to_uncoiler(self, target_uncoiler: int):
|
||||
"""Write the smallest COILID plan to target uncoiler via S7."""
|
||||
|
||||
def _load_next_plan():
|
||||
from database import get_connection
|
||||
@@ -377,43 +386,66 @@ class OpcService:
|
||||
if not target_cfg:
|
||||
self._log(f"Write nodes for uncoiler {target_uncoiler} not configured")
|
||||
return
|
||||
if not self.write_s7_endpoint:
|
||||
self._log("Write S7 endpoint is empty, skip write")
|
||||
return
|
||||
|
||||
# setup_data_revision: 每次写入 +1
|
||||
revision_node = target_cfg.get("setup_data_revision")
|
||||
if revision_node:
|
||||
field_meta = {
|
||||
"setup_data_revision": {"data_type": "INT32", "data_length": 4},
|
||||
"coilid": {"data_type": "S7STRING", "data_length": 20},
|
||||
"entry_coil_weight": {"data_type": "FLOAT", "data_length": 4},
|
||||
"entry_of_coil_length": {"data_type": "FLOAT", "data_length": 4},
|
||||
"entry_coil_width": {"data_type": "FLOAT", "data_length": 4},
|
||||
"entry_coil_thickness": {"data_type": "FLOAT", "data_length": 4},
|
||||
"entry_of_coil_inner_diameter": {"data_type": "FLOAT", "data_length": 4},
|
||||
"entry_of_coil_outer_diameter": {"data_type": "FLOAT", "data_length": 4},
|
||||
"alloy_code": {"data_type": "S7STRING", "data_length": 4},
|
||||
"material": {"data_type": "S7STRING", "data_length": 20},
|
||||
}
|
||||
|
||||
revision_address = target_cfg.get("setup_data_revision")
|
||||
if revision_address:
|
||||
try:
|
||||
rev_value = client.get_node(revision_node).get_value()
|
||||
next_rev = int(rev_value or 0) + 1
|
||||
await self._write_node_value_by_client(client, revision_node, next_rev, "Int32")
|
||||
next_rev = self._read_s7_int32(
|
||||
endpoint=self.write_s7_endpoint,
|
||||
address=revision_address,
|
||||
rack=self.write_s7_rack,
|
||||
slot=self.write_s7_slot,
|
||||
) + 1
|
||||
await self.write_s7_value(
|
||||
endpoint=self.write_s7_endpoint,
|
||||
address=revision_address,
|
||||
data_length=4,
|
||||
data_type="INT32",
|
||||
value=next_rev,
|
||||
rack=self.write_s7_rack,
|
||||
slot=self.write_s7_slot,
|
||||
)
|
||||
except Exception as exc:
|
||||
self._log(f"Write setup_data_revision failed (U{target_uncoiler}): {exc}")
|
||||
|
||||
field_variant = {
|
||||
"coilid": "String",
|
||||
"entry_coil_weight": "Float",
|
||||
"entry_of_coil_length": "Float",
|
||||
"entry_coil_width": "Float",
|
||||
"entry_coil_thickness": "Float",
|
||||
"entry_of_coil_inner_diameter": "Float",
|
||||
"entry_of_coil_outer_diameter": "Float",
|
||||
"alloy_code": "String",
|
||||
"material": "String",
|
||||
}
|
||||
|
||||
for field, value in plan.items():
|
||||
node_id = target_cfg.get(field)
|
||||
if not node_id or value is None:
|
||||
for field, value in {"coilid": plan["coilid"], **plan}.items():
|
||||
if field == "setup_data_revision":
|
||||
continue
|
||||
address = target_cfg.get(field)
|
||||
if not address or value is None:
|
||||
continue
|
||||
meta = field_meta.get(field)
|
||||
if not meta:
|
||||
continue
|
||||
try:
|
||||
await self._write_node_value_by_client(
|
||||
client,
|
||||
node_id,
|
||||
value,
|
||||
field_variant.get(field),
|
||||
await self.write_s7_value(
|
||||
endpoint=self.write_s7_endpoint,
|
||||
address=address,
|
||||
data_length=meta["data_length"],
|
||||
data_type=meta["data_type"],
|
||||
value=value,
|
||||
rack=self.write_s7_rack,
|
||||
slot=self.write_s7_slot,
|
||||
)
|
||||
except Exception as exc:
|
||||
self._log(
|
||||
f"Write field failed (U{target_uncoiler}, {field}, node={node_id}): {exc}"
|
||||
f"Write field failed (U{target_uncoiler}, {field}, address={address}): {exc}"
|
||||
)
|
||||
|
||||
self._log(
|
||||
@@ -431,11 +463,105 @@ class OpcService:
|
||||
from opcua import ua # type: ignore
|
||||
|
||||
node = client.get_node(node_id)
|
||||
# 1) Try direct write first (some OPC servers do their own coercion better)
|
||||
if variant_type is None:
|
||||
try:
|
||||
node.set_value(value)
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
vt = self._normalize_variant_type(variant_type)
|
||||
if vt is None:
|
||||
try:
|
||||
vt = node.get_data_type_as_variant_type()
|
||||
except Exception:
|
||||
vt = None
|
||||
|
||||
if vt is None:
|
||||
node.set_value(value)
|
||||
else:
|
||||
node.set_value(ua.DataValue(ua.Variant(value, vt)))
|
||||
return
|
||||
|
||||
# 2) Coerce by OPC variant type
|
||||
try:
|
||||
write_value = self._coerce_value_for_variant(value, vt)
|
||||
node.set_value(ua.DataValue(ua.Variant(write_value, vt)))
|
||||
return
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# 3) Coerce by current value runtime shape (list/bytes/str/int...)
|
||||
current_val = node.get_value()
|
||||
write_value = self._coerce_value_by_current_value(value, current_val)
|
||||
node.set_value(write_value)
|
||||
|
||||
def _coerce_value_for_variant(self, value: Any, vt):
|
||||
"""Coerce python value to match OPC ua.VariantType."""
|
||||
vt_name = getattr(vt, "name", str(vt))
|
||||
|
||||
if vt_name in ("Boolean",):
|
||||
return bool(value)
|
||||
if vt_name in ("SByte", "Byte", "Int16", "UInt16", "Int32", "UInt32", "Int64", "UInt64"):
|
||||
if value is None or value == "":
|
||||
return 0
|
||||
if isinstance(value, str):
|
||||
raw = value.strip()
|
||||
if raw == "":
|
||||
return 0
|
||||
if raw.lstrip("-").isdigit():
|
||||
return int(raw)
|
||||
# Non-numeric text to numeric scalar is invalid; let caller try another strategy
|
||||
raise ValueError(f"Cannot coerce non-numeric text '{value}' to {vt_name}")
|
||||
return int(value)
|
||||
if vt_name in ("Float", "Double"):
|
||||
if value is None or value == "":
|
||||
return 0.0
|
||||
return float(value)
|
||||
if vt_name in ("String",):
|
||||
return "" if value is None else str(value)
|
||||
if vt_name in ("ByteString",):
|
||||
if value is None:
|
||||
return b""
|
||||
if isinstance(value, bytes):
|
||||
return value
|
||||
return str(value).encode("utf-8")
|
||||
|
||||
return value
|
||||
|
||||
def _coerce_value_by_current_value(self, value: Any, current_val: Any):
|
||||
"""Coerce value according to current node value runtime structure."""
|
||||
if isinstance(current_val, str):
|
||||
return "" if value is None else str(value)
|
||||
|
||||
if isinstance(current_val, (bytes, bytearray)):
|
||||
raw = b"" if value is None else str(value).encode("utf-8")
|
||||
length = len(current_val)
|
||||
if length > 0:
|
||||
raw = raw[:length].ljust(length, b"\x00")
|
||||
return raw if isinstance(current_val, bytes) else bytearray(raw)
|
||||
|
||||
if isinstance(current_val, (list, tuple)) and current_val:
|
||||
# Common PLC string representation: array of bytes/ints
|
||||
if all(isinstance(x, int) for x in current_val):
|
||||
arr_len = len(current_val)
|
||||
raw = b"" if value is None else str(value).encode("ascii", errors="ignore")
|
||||
raw = raw[:arr_len].ljust(arr_len, b"\x00")
|
||||
return [int(b) for b in raw]
|
||||
|
||||
if isinstance(current_val, bool):
|
||||
return bool(value)
|
||||
if isinstance(current_val, int):
|
||||
if value is None or value == "":
|
||||
return 0
|
||||
if isinstance(value, str) and not value.strip().lstrip("-").isdigit():
|
||||
raise ValueError(f"Cannot write non-numeric '{value}' to int node")
|
||||
return int(value)
|
||||
if isinstance(current_val, float):
|
||||
if value is None or value == "":
|
||||
return 0.0
|
||||
return float(value)
|
||||
|
||||
return value
|
||||
|
||||
async def _handle_signal1(self):
|
||||
"""Handle signal1: fetch next 4 coils from Oracle PDI table and save to SQLite temp table."""
|
||||
@@ -839,6 +965,14 @@ class OpcService:
|
||||
raise ValueError(f"{dt} data_length must be 8")
|
||||
return struct.pack(">d", float(value))
|
||||
|
||||
if dt == "S7STRING":
|
||||
if data_length < 1 or data_length > 254:
|
||||
raise ValueError("S7STRING data_length must be 1..254")
|
||||
text = "" if value is None else str(value)
|
||||
raw = text.encode("ascii", errors="ignore")[:data_length]
|
||||
# Siemens STRING layout: [max_len][cur_len][chars...]
|
||||
return bytes([data_length, len(raw)]) + raw.ljust(data_length, b"\x00")
|
||||
|
||||
if dt in ("STRING", "BYTES"):
|
||||
if data_length < 1:
|
||||
raise ValueError(f"{dt} data_length must be >= 1")
|
||||
@@ -851,7 +985,7 @@ class OpcService:
|
||||
|
||||
raise ValueError(
|
||||
"Unsupported S7 data_type. "
|
||||
"Example: BOOL/BYTE/INT16/UINT16/INT32/UINT32/REAL/LREAL/STRING"
|
||||
"Example: BOOL/BYTE/INT16/UINT16/INT32/UINT32/REAL/LREAL/STRING/S7STRING"
|
||||
)
|
||||
|
||||
async def write_s7_value(
|
||||
@@ -1053,6 +1187,13 @@ class OpcService:
|
||||
raise ValueError(f"{dt} data_length must be 8")
|
||||
return struct.pack(">d", float(value))
|
||||
|
||||
if dt == "S7STRING":
|
||||
if data_length < 1 or data_length > 254:
|
||||
raise ValueError("S7STRING data_length must be 1..254")
|
||||
text = "" if value is None else str(value)
|
||||
raw = text.encode("ascii", errors="ignore")[:data_length]
|
||||
return bytes([data_length, len(raw)]) + raw.ljust(data_length, b"\x00")
|
||||
|
||||
if dt in ("STRING", "BYTES"):
|
||||
if isinstance(value, bytes):
|
||||
raw = value
|
||||
@@ -1064,9 +1205,35 @@ class OpcService:
|
||||
|
||||
raise ValueError(
|
||||
f"Unsupported data_type: {data_type}. "
|
||||
"Supported: BOOL/BYTE/INT8/UINT16/WORD/INT16/UINT32/DWORD/INT32/DINT/REAL/FLOAT/LREAL/DOUBLE/STRING/BYTES"
|
||||
"Supported: BOOL/BYTE/INT8/UINT16/WORD/INT16/UINT32/DWORD/INT32/DINT/REAL/FLOAT/LREAL/DOUBLE/STRING/S7STRING/BYTES"
|
||||
)
|
||||
|
||||
def _read_s7_int32(self, endpoint: str, address: str, rack: int = 0, slot: int = 1) -> int:
|
||||
"""Read a 4-byte signed int from S7 DB address."""
|
||||
try:
|
||||
import snap7 # type: ignore
|
||||
except ImportError as exc:
|
||||
raise RuntimeError("python-snap7 package not installed") from exc
|
||||
|
||||
ip, tcp_port = self._parse_s7_endpoint(endpoint)
|
||||
db_number, area_code, byte_offset, bit_index = self._parse_s7_address(address)
|
||||
if area_code not in ("D",):
|
||||
raise ValueError(f"INT32 read expects D/DBD address, got: {address}")
|
||||
|
||||
client = snap7.client.Client()
|
||||
try:
|
||||
client.set_connection_type(3)
|
||||
client.connect(ip, int(rack), int(slot), int(tcp_port))
|
||||
if not client.get_connected():
|
||||
raise RuntimeError(f"S7 connect failed: {endpoint}")
|
||||
raw = client.db_read(db_number, byte_offset, 4)
|
||||
return int.from_bytes(raw, byteorder="big", signed=True)
|
||||
finally:
|
||||
try:
|
||||
client.disconnect()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
async def write_s7_value(
|
||||
self,
|
||||
endpoint: str,
|
||||
|
||||
Reference in New Issue
Block a user