From d2d445eff16ff780dad7634afc1aa174e29bedf9 Mon Sep 17 00:00:00 2001 From: Joshi <3040996759@qq.com> Date: Fri, 24 Apr 2026 18:04:33 +0800 Subject: [PATCH] =?UTF-8?q?fix(opc=5Fservice):=20=E4=BF=AE=E5=A4=8DS7?= =?UTF-8?q?=E5=9C=B0=E5=9D=80=E8=A7=A3=E6=9E=90=E5=92=8C=E4=BF=AE=E8=AE=A2?= =?UTF-8?q?=E5=8F=B7=E5=BE=AA=E7=8E=AF=E8=AE=A1=E6=95=B0=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复S7地址解析中未处理的类型别名问题,并添加偏移量有效性检查 实现修订号的循环计数功能(0..999),并添加操作成功日志 --- backend/opc_service.py | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/backend/opc_service.py b/backend/opc_service.py index bb97f9e..de79a02 100644 --- a/backend/opc_service.py +++ b/backend/opc_service.py @@ -406,12 +406,14 @@ class OpcService: revision_address = target_cfg.get("setup_data_revision") if revision_address: try: - next_rev = self._read_s7_int32( + curr_rev = self._read_s7_int32( endpoint=self.write_s7_endpoint, address=revision_address, rack=self.write_s7_rack, slot=self.write_s7_slot, - ) + 1 + ) + # 循环计数:0..999,达到999后回到0 + next_rev = 0 if curr_rev >= 999 else curr_rev + 1 await self.write_s7_value( endpoint=self.write_s7_endpoint, address=revision_address, @@ -421,6 +423,10 @@ class OpcService: rack=self.write_s7_rack, slot=self.write_s7_slot, ) + self._log( + f"Write setup_data_revision success (U{target_uncoiler}): " + f"{curr_rev} -> {next_rev}, address={revision_address}" + ) except Exception as exc: self._log(f"Write setup_data_revision failed (U{target_uncoiler}): {exc}") @@ -1114,8 +1120,29 @@ class OpcService: if area_part.startswith("DB"): area_part = area_part[2:] - area_code = area_part[:1] - offset_part = area_part[1:] + # Normalize aliases: + # X0.0 / B2 / W4 / D8 / DINT0 / INT4 / WORD6 / REAL8 / BYTE10 + if area_part.startswith("DINT"): + area_code = "D" + offset_part = area_part[4:] + elif area_part.startswith("DWORD"): + area_code = "D" + offset_part = area_part[5:] + elif area_part.startswith("REAL"): + area_code = "D" + offset_part = area_part[4:] + elif area_part.startswith("INT"): + area_code = "W" + offset_part = area_part[3:] + elif area_part.startswith("WORD"): + area_code = "W" + offset_part = area_part[4:] + elif area_part.startswith("BYTE"): + area_code = "B" + offset_part = area_part[4:] + else: + area_code = area_part[:1] + offset_part = area_part[1:] bit_index = None if area_code == "X": @@ -1129,6 +1156,8 @@ class OpcService: else: if "." in offset_part: raise ValueError(f"Only X area may include bit index: {address}") + if not offset_part or not offset_part.lstrip("-").isdigit(): + raise ValueError(f"Invalid S7 byte offset in address: {address}") byte_offset = int(offset_part) if byte_offset < 0: