Files
pickling-mes/backend/tests/test_udp_sender.py

86 lines
2.4 KiB
Python
Raw Normal View History

"""
模拟L1发送UDP报文的测试脚本
用法: python tests/test_udp_sender.py
"""
import socket
import struct
import time
HOST = "127.0.0.1"
PORT = 9000
MAGIC = b'\xAA\xBB'
def checksum(body: bytes) -> int:
return sum(body) & 0xFFFF
def build_frame(msg_type: str, body_str: str, seq: int) -> bytes:
body = body_str.encode("gbk")
hdr = MAGIC
hdr += msg_type.encode("ascii").ljust(4)[:4]
hdr += struct.pack(">H", seq)
hdr += struct.pack(">H", len(body))
hdr += struct.pack(">H", checksum(body))
return hdr + body
def send(sock: socket.socket, frame: bytes):
sock.sendto(frame, (HOST, PORT))
try:
sock.settimeout(2)
ack, addr = sock.recvfrom(64)
print(f" ACK收到: {ack[:12].hex()}")
except socket.timeout:
print(" 未收到ACK超时")
if __name__ == "__main__":
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# PC20 心跳
print("发送 PC20 心跳...")
send(sock, build_frame("PC20", "20240101120000", seq=1))
time.sleep(0.2)
# PC01 卷材入口
print("发送 PC01 卷材入口...")
body = (
"HA123456789012345678" # 卷号 20B
"SPHC " # 钢种 10B
" 2.500" # 厚度 6B
"1250.0" # 宽度 6B
" 18500.0" # 重量 8B
"" # 班次 2B
)
send(sock, build_frame("PC01", body, seq=2))
time.sleep(0.2)
# PC03 过程数据
print("发送 PC03 过程数据...")
body = (
"HA123456789012345678" # 卷号 20B
"酸洗槽1 " # 位置 10B
" 80.5" # 速度 6B
" 5200.0" # 入口张力 8B
" 4800.0" # 出口张力 8B
" 62.3" # 酸液温度 6B
)
send(sock, build_frame("PC03", body, seq=3))
time.sleep(0.2)
# PC02 卷材出口
print("发送 PC02 卷材出口...")
body = (
"HA123456789012345678" # 卷号 20B
" 2.498" # 实测厚度 6B
"1249.5" # 实测宽度 6B
" 1850.00" # 处理长度 8B
" 78.3" # 平均速度 6B
"A1" # 质量等级 2B
)
send(sock, build_frame("PC02", body, seq=4))
sock.close()
print("测试完成")