Files
pickling-mes/backend/tests/test_udp_sender.py
wangyu 193da0018f feat: 移除PDI和订单号字段,新增设备巡检模块
- 从物料跟踪页面移除订单号列和表单字段
- 从导航菜单移除PDI管理,添加设备巡检
- 新增InspectionLocation和InspectionRecord后端模型和API
- 新增设备巡检前端页面(左侧点位列表,右侧设备和历史记录)
2026-05-27 16:38:40 +08:00

86 lines
2.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
模拟L1发送UDP报文的测试脚本
用法: python tests/test_udp_sender.py
"""
import socket
import struct
import time
HOST = "127.0.0.1"
PORT = 9000
MAGIC = b'\xAA\xBB'
def checksum(body: bytes) -> int:
return sum(body) & 0xFFFF
def build_frame(msg_type: str, body_str: str, seq: int) -> bytes:
body = body_str.encode("gbk")
hdr = MAGIC
hdr += msg_type.encode("ascii").ljust(4)[:4]
hdr += struct.pack(">H", seq)
hdr += struct.pack(">H", len(body))
hdr += struct.pack(">H", checksum(body))
return hdr + body
def send(sock: socket.socket, frame: bytes):
sock.sendto(frame, (HOST, PORT))
try:
sock.settimeout(2)
ack, addr = sock.recvfrom(64)
print(f" ACK收到: {ack[:12].hex()}")
except socket.timeout:
print(" 未收到ACK超时")
if __name__ == "__main__":
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# PC20 心跳
print("发送 PC20 心跳...")
send(sock, build_frame("PC20", "20240101120000", seq=1))
time.sleep(0.2)
# PC01 卷材入口
print("发送 PC01 卷材入口...")
body = (
"HA123456789012345678" # 卷号 20B
"SPHC " # 钢种 10B
" 2.500" # 厚度 6B
"1250.0" # 宽度 6B
" 18500.0" # 重量 8B
"" # 班次 2B
)
send(sock, build_frame("PC01", body, seq=2))
time.sleep(0.2)
# PC03 过程数据
print("发送 PC03 过程数据...")
body = (
"HA123456789012345678" # 卷号 20B
"酸洗槽1 " # 位置 10B
" 80.5" # 速度 6B
" 5200.0" # 入口张力 8B
" 4800.0" # 出口张力 8B
" 62.3" # 酸液温度 6B
)
send(sock, build_frame("PC03", body, seq=3))
time.sleep(0.2)
# PC02 卷材出口
print("发送 PC02 卷材出口...")
body = (
"HA123456789012345678" # 卷号 20B
" 2.498" # 实测厚度 6B
"1249.5" # 实测宽度 6B
" 1850.00" # 处理长度 8B
" 78.3" # 平均速度 6B
"A1" # 质量等级 2B
)
send(sock, build_frame("PC02", body, seq=4))
sock.close()
print("测试完成")