fix(opc_service): 修复S7地址解析和修订号循环计数问题

修复S7地址解析中未处理的类型别名问题,并添加偏移量有效性检查
实现修订号的循环计数功能(0..999),并添加操作成功日志
This commit is contained in:
2026-04-24 18:04:33 +08:00
parent 8b15f78e78
commit d2d445eff1

View File

@@ -406,12 +406,14 @@ class OpcService:
revision_address = target_cfg.get("setup_data_revision") revision_address = target_cfg.get("setup_data_revision")
if revision_address: if revision_address:
try: try:
next_rev = self._read_s7_int32( curr_rev = self._read_s7_int32(
endpoint=self.write_s7_endpoint, endpoint=self.write_s7_endpoint,
address=revision_address, address=revision_address,
rack=self.write_s7_rack, rack=self.write_s7_rack,
slot=self.write_s7_slot, slot=self.write_s7_slot,
) + 1 )
# 循环计数0..999达到999后回到0
next_rev = 0 if curr_rev >= 999 else curr_rev + 1
await self.write_s7_value( await self.write_s7_value(
endpoint=self.write_s7_endpoint, endpoint=self.write_s7_endpoint,
address=revision_address, address=revision_address,
@@ -421,6 +423,10 @@ class OpcService:
rack=self.write_s7_rack, rack=self.write_s7_rack,
slot=self.write_s7_slot, slot=self.write_s7_slot,
) )
self._log(
f"Write setup_data_revision success (U{target_uncoiler}): "
f"{curr_rev} -> {next_rev}, address={revision_address}"
)
except Exception as exc: except Exception as exc:
self._log(f"Write setup_data_revision failed (U{target_uncoiler}): {exc}") self._log(f"Write setup_data_revision failed (U{target_uncoiler}): {exc}")
@@ -1114,8 +1120,29 @@ class OpcService:
if area_part.startswith("DB"): if area_part.startswith("DB"):
area_part = area_part[2:] area_part = area_part[2:]
area_code = area_part[:1] # Normalize aliases:
offset_part = area_part[1:] # X0.0 / B2 / W4 / D8 / DINT0 / INT4 / WORD6 / REAL8 / BYTE10
if area_part.startswith("DINT"):
area_code = "D"
offset_part = area_part[4:]
elif area_part.startswith("DWORD"):
area_code = "D"
offset_part = area_part[5:]
elif area_part.startswith("REAL"):
area_code = "D"
offset_part = area_part[4:]
elif area_part.startswith("INT"):
area_code = "W"
offset_part = area_part[3:]
elif area_part.startswith("WORD"):
area_code = "W"
offset_part = area_part[4:]
elif area_part.startswith("BYTE"):
area_code = "B"
offset_part = area_part[4:]
else:
area_code = area_part[:1]
offset_part = area_part[1:]
bit_index = None bit_index = None
if area_code == "X": if area_code == "X":
@@ -1129,6 +1156,8 @@ class OpcService:
else: else:
if "." in offset_part: if "." in offset_part:
raise ValueError(f"Only X area may include bit index: {address}") raise ValueError(f"Only X area may include bit index: {address}")
if not offset_part or not offset_part.lstrip("-").isdigit():
raise ValueError(f"Invalid S7 byte offset in address: {address}")
byte_offset = int(offset_part) byte_offset = int(offset_part)
if byte_offset < 0: if byte_offset < 0: