feat: 移除PDI和订单号字段,新增设备巡检模块

- 从物料跟踪页面移除订单号列和表单字段
- 从导航菜单移除PDI管理,添加设备巡检
- 新增InspectionLocation和InspectionRecord后端模型和API
- 新增设备巡检前端页面(左侧点位列表,右侧设备和历史记录)
This commit is contained in:
2026-05-27 16:38:40 +08:00
commit 193da0018f
86 changed files with 11379 additions and 0 deletions

View File

@@ -0,0 +1,70 @@
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from app.config import settings
from app.models.user import User
from app.database import get_db
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login")
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode["exp"] = expire
return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
async def authenticate_user(db: AsyncSession, username: str, password: str) -> Optional[User]:
result = await db.execute(select(User).where(User.username == username))
user = result.scalar_one_or_none()
if not user or not verify_password(password, user.hashed_password):
return None
return user
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
) -> User:
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无效的认证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
username: str = payload.get("sub")
if not username:
raise credentials_exception
except JWTError:
raise credentials_exception
result = await db.execute(select(User).where(User.username == username))
user = result.scalar_one_or_none()
if not user or not user.is_active:
raise credentials_exception
return user
def require_roles(*roles: str):
async def checker(current_user: User = Depends(get_current_user)):
if current_user.role not in roles:
raise HTTPException(status_code=403, detail="权限不足")
return current_user
return checker

View File

@@ -0,0 +1,70 @@
from datetime import datetime
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from loguru import logger
from app.models.material import Coil, MaterialTracking, CoilStatus
from app.services.message_parser import dispatcher
class MaterialService:
@staticmethod
async def get_coil(db: AsyncSession, coil_no: str) -> Optional[Coil]:
result = await db.execute(select(Coil).where(Coil.coil_no == coil_no))
return result.scalar_one_or_none()
@staticmethod
async def create_tracking(db: AsyncSession, coil: Coil, event_type: str,
position: str = None, **kwargs) -> MaterialTracking:
tracking = MaterialTracking(
coil_id=coil.id,
coil_no=coil.coil_no,
position=position,
event_type=event_type,
event_time=kwargs.get("event_time", datetime.now()),
**{k: v for k, v in kwargs.items() if k != "event_time"},
)
db.add(tracking)
await db.flush()
return tracking
@staticmethod
async def update_coil_status(db: AsyncSession, coil: Coil, status: CoilStatus):
coil.status = status
await db.flush()
material_service = MaterialService()
# 注册L1报文处理器
@dispatcher.register("PC01")
async def handle_coil_entry(data: dict):
"""处理卷材入口报文"""
from app.database import AsyncSessionLocal
async with AsyncSessionLocal() as db:
coil = await material_service.get_coil(db, data["coil_no"])
if coil:
await material_service.update_coil_status(db, coil, CoilStatus.ON_LINE)
await material_service.create_tracking(
db, coil, "entry", position="入口", **data
)
await db.commit()
logger.info(f"卷材入线: {data['coil_no']}")
@dispatcher.register("PC02")
async def handle_coil_exit(data: dict):
"""处理卷材出口报文"""
from app.database import AsyncSessionLocal
async with AsyncSessionLocal() as db:
coil = await material_service.get_coil(db, data["coil_no"])
if coil:
await material_service.update_coil_status(db, coil, CoilStatus.FINISHED)
await material_service.create_tracking(
db, coil, "exit", position="出口", **data
)
await db.commit()
logger.info(f"卷材出线: {data['coil_no']}")

View File

@@ -0,0 +1,348 @@
"""
L1报文解析服务 — UDP协议
假设报文格式(固定帧结构,收到实际协议文档后对应调整):
┌─────────────────────────────────────────────────────┐
│ Offset Size 说明 │
│ 0 2B 魔数 0xAA 0xBB │
│ 2 4B 报文类型 ASCII"PC01"
│ 6 2B 序列号 uint16 大端 │
│ 8 2B Body长度 uint16 大端 │
│ 10 2B 校验和 所有Body字节累加低16位 │
│ 12 N B Body GBK编码固定列宽文本 │
└─────────────────────────────────────────────────────┘
UDP最大包65507字节单帧不分片。
回执帧Header相同结构Body = "ACK" + 原序列号(2B)
"""
import asyncio
import struct
import time
import uuid
import json
from datetime import datetime
from typing import Optional, Dict, Any, Tuple
from loguru import logger
from app.config import settings
# ─────────────────────────── 报文类型注册表 ───────────────────────────
MSG_TYPES: Dict[str, str] = {
"PC01": "卷材入口报文",
"PC02": "卷材出口报文",
"PC03": "过程数据报文",
"PC04": "质量数据报文",
"PC05": "设备状态报文",
"PC10": "计划下发报文",
"PC11": "计划确认报文",
"PC20": "心跳报文",
}
HEADER_SIZE = 12 # 报文头固定长度
MAGIC = b'\xAA\xBB'
# ─────────────────────────── 校验和 ───────────────────────────
def _checksum(body: bytes) -> int:
return sum(body) & 0xFFFF
# ─────────────────────────── 报文头解析 ───────────────────────────
def parse_header(raw: bytes) -> Optional[Dict[str, Any]]:
if len(raw) < HEADER_SIZE:
logger.warning(f"报文过短: {len(raw)}B丢弃")
return None
magic = raw[0:2]
if magic != MAGIC:
logger.warning(f"魔数错误: {magic.hex()},丢弃")
return None
msg_type = raw[2:6].decode("ascii", errors="replace").strip()
seq = struct.unpack(">H", raw[6:8])[0]
body_len = struct.unpack(">H", raw[8:10])[0]
checksum = struct.unpack(">H", raw[10:12])[0]
body = raw[HEADER_SIZE: HEADER_SIZE + body_len]
if len(body) < body_len:
logger.warning(f"Body不完整: 期望{body_len}B 实际{len(body)}B")
return None
if _checksum(body) != checksum:
logger.warning(f"校验和错误 [{msg_type}] seq={seq},丢弃")
return None
return {"msg_type": msg_type, "seq": seq, "body": body,
"body_str": body.decode("gbk", errors="replace")}
# ─────────────────────────── 构建回执帧 ───────────────────────────
def build_ack(seq: int) -> bytes:
body = b"ACK" + struct.pack(">H", seq)
hdr = MAGIC
hdr += b"ACK ".ljust(4)[:4]
hdr += struct.pack(">H", seq)
hdr += struct.pack(">H", len(body))
hdr += struct.pack(">H", _checksum(body))
return hdr + body
# ─────────────────────────── 构建发送帧 ───────────────────────────
def build_frame(msg_type: str, body: bytes, seq: int = 0) -> bytes:
hdr = MAGIC
hdr += msg_type.encode("ascii").ljust(4)[:4]
hdr += struct.pack(">H", seq)
hdr += struct.pack(">H", len(body))
hdr += struct.pack(">H", _checksum(body))
return hdr + body
# ─────────────────────────── Body解析器 ───────────────────────────
class PC01Parser:
"""卷材入口报文
Body固定列宽(GBK): 卷号(20) 钢种(10) 厚度(6) 宽度(6) 重量(8) 班次(2)
"""
def parse(self, body: str) -> Dict[str, Any]:
return {
"coil_no": body[0:20].strip(),
"steel_grade": body[20:30].strip(),
"thickness": _safe_float(body[30:36]),
"width": _safe_float(body[36:42]),
"weight": _safe_float(body[42:50]),
"shift": body[50:52].strip(),
"event_time": datetime.now(),
}
class PC02Parser:
"""卷材出口报文
Body: 卷号(20) 实测厚度(6) 实测宽度(6) 处理长度(8) 平均速度(6) 质量等级(2)
"""
def parse(self, body: str) -> Dict[str, Any]:
return {
"coil_no": body[0:20].strip(),
"actual_thickness": _safe_float(body[20:26]),
"actual_width": _safe_float(body[26:32]),
"process_length": _safe_float(body[32:40]),
"avg_speed": _safe_float(body[40:46]),
"quality_grade": body[46:48].strip(),
"event_time": datetime.now(),
}
class PC03Parser:
"""过程数据报文(周期推送)
Body: 卷号(20) 位置(10) 速度(6) 入口张力(8) 出口张力(8) 酸液温度(6)
"""
def parse(self, body: str) -> Dict[str, Any]:
return {
"coil_no": body[0:20].strip(),
"position": body[20:30].strip(),
"speed": _safe_float(body[30:36]),
"tension_inlet": _safe_float(body[36:44]),
"tension_outlet": _safe_float(body[44:52]),
"acid_temp": _safe_float(body[52:58]),
"event_time": datetime.now(),
}
class PC04Parser:
"""质量数据报文
Body: 卷号(20) 缺陷类型(10) 缺陷位置(8) 严重程度(2)
"""
def parse(self, body: str) -> Dict[str, Any]:
return {
"coil_no": body[0:20].strip(),
"defect_type": body[20:30].strip(),
"defect_pos": _safe_float(body[30:38]),
"severity": body[38:40].strip(),
"event_time": datetime.now(),
}
class PC05Parser:
"""设备状态报文
Body: 设备编号(10) 状态码(4) 故障码(6) 时间戳(14 yyyyMMddHHmmss)
"""
def parse(self, body: str) -> Dict[str, Any]:
ts_str = body[20:34].strip()
try:
ts = datetime.strptime(ts_str, "%Y%m%d%H%M%S")
except Exception:
ts = datetime.now()
return {
"equipment_code": body[0:10].strip(),
"status_code": body[10:14].strip(),
"fault_code": body[14:20].strip(),
"event_time": ts,
}
class PC20Parser:
"""心跳报文 Body: 时间戳(14)"""
def parse(self, body: str) -> Dict[str, Any]:
return {"event_time": datetime.now(), "raw_ts": body[0:14].strip()}
def _safe_float(s: str) -> float:
try:
return float(s.strip())
except (ValueError, AttributeError):
return 0.0
BODY_PARSERS: Dict[str, Any] = {
"PC01": PC01Parser(),
"PC02": PC02Parser(),
"PC03": PC03Parser(),
"PC04": PC04Parser(),
"PC05": PC05Parser(),
"PC20": PC20Parser(),
}
# ─────────────────────────── 分发器 ───────────────────────────
class MessageDispatcher:
def __init__(self):
self._handlers: Dict[str, list] = {}
def register(self, msg_type: str):
def decorator(func):
self._handlers.setdefault(msg_type, []).append(func)
return func
return decorator
async def dispatch(self, msg_type: str, data: Dict[str, Any]):
for handler in self._handlers.get(msg_type, []):
try:
await handler(data)
except Exception as e:
logger.error(f"报文处理器异常 [{msg_type}]: {e}")
dispatcher = MessageDispatcher()
# ─────────────────────────── UDP 服务端 ───────────────────────────
class L1UdpProtocol(asyncio.DatagramProtocol):
"""asyncio UDP DatagramProtocol 实现"""
def __init__(self, server: "L1UdpServer"):
self._server = server
self.transport: Optional[asyncio.DatagramTransport] = None
def connection_made(self, transport: asyncio.DatagramTransport):
self.transport = transport
host, port = transport.get_extra_info("sockname")
logger.info(f"UDP监听启动: {host}:{port}")
def datagram_received(self, data: bytes, addr: Tuple[str, int]):
asyncio.create_task(self._server.handle(data, addr, self.transport))
def error_received(self, exc: Exception):
logger.error(f"UDP错误: {exc}")
def connection_lost(self, exc):
logger.warning(f"UDP连接丢失: {exc}")
class L1UdpServer:
"""UDP报文接收服务"""
def __init__(self):
self._transport: Optional[asyncio.DatagramTransport] = None
self._running = False
# 统计
self.recv_count = 0
self.error_count = 0
async def start(self):
self._running = True
loop = asyncio.get_running_loop()
self._transport, _ = await loop.create_datagram_endpoint(
lambda: L1UdpProtocol(self),
local_addr=(settings.L1_HOST, settings.L1_PORT),
)
logger.info(f"L1 UDP服务已启动监听 {settings.L1_HOST}:{settings.L1_PORT}")
async def handle(self, raw: bytes, addr: Tuple[str, int],
transport: asyncio.DatagramTransport):
t0 = time.time()
self.recv_count += 1
logger.debug(f"收到UDP包 from {addr[0]}:{addr[1]} {len(raw)}B")
header = parse_header(raw)
if not header:
self.error_count += 1
await self._save_log(raw, addr, None, "error", "报文头解析失败", t0)
return
msg_type = header["msg_type"]
seq = header["seq"]
# 发送ACK回执
if msg_type != "ACK":
ack = build_ack(seq)
transport.sendto(ack, addr)
# 心跳不做业务处理
if msg_type == "PC20":
logger.debug(f"心跳 seq={seq}")
return
# Body解析
body_parser = BODY_PARSERS.get(msg_type)
data: Dict[str, Any] = {}
if body_parser:
try:
data = body_parser.parse(header["body_str"])
except Exception as e:
logger.error(f"Body解析异常 [{msg_type}]: {e}")
self.error_count += 1
await self._save_log(raw, addr, header, "error", str(e), t0)
return
else:
logger.warning(f"未知报文类型: {msg_type}")
elapsed_ms = (time.time() - t0) * 1000
logger.info(f"[{msg_type}] seq={seq} from {addr[0]} 耗时{elapsed_ms:.1f}ms")
await self._save_log(raw, addr, header, "success", None, t0, data)
await dispatcher.dispatch(msg_type, data)
async def _save_log(self, raw: bytes, addr: Tuple[str, int],
header: Optional[Dict], status: str,
error_msg: Optional[str], t0: float,
parsed_data: Optional[Dict] = None):
try:
from app.database import AsyncSessionLocal
from app.models.message import MessageLog
elapsed_ms = (time.time() - t0) * 1000
async with AsyncSessionLocal() as db:
log = MessageLog(
msg_id=str(uuid.uuid4())[:16],
msg_type=header["msg_type"] if header else "UNKNOWN",
direction="recv",
source=f"{addr[0]}:{addr[1]}",
raw_data=raw.hex(),
parsed_data=json.dumps(parsed_data, default=str) if parsed_data else None,
status=status,
error_msg=error_msg,
process_time=round(elapsed_ms, 2),
received_at=datetime.now(),
)
db.add(log)
await db.commit()
except Exception as e:
logger.error(f"保存报文日志失败: {e}")
def send(self, data: bytes, addr: Tuple[str, int]):
"""主动向L1发送报文"""
if self._transport:
self._transport.sendto(data, addr)
else:
raise RuntimeError("UDP服务未启动")
def stop(self):
self._running = False
if self._transport:
self._transport.close()
# 全局单例
l1_server = L1UdpServer()

View File

@@ -0,0 +1,482 @@
"""
工艺预测模型 — 灰箱Gray-box架构
设计思路:
物理结构来自 Arrhenius 酸洗动力学,参数取自公开文献实验值,
而非理论推导。每个模型内置校准系数 K_cal初始=1.0
投产后可通过 calibrate() 方法用实测结果回归更新,
使模型随数据积累逐步收敛到真实工况。
关键文献依据:
[1] 碳钢 HCl 酸洗活化能Ea ≈ 40~50 kJ/mol实验测定均值取 45 kJ/mol
来源Hydrochloric Acid Pickling Process Optimization in Metal Wire,
IJSSST Vol-16 No-5; IspatGuru Pickling of Hot Rolled Strip
[2] H⁺ 浓度动力学阶次1.0~2.0阶(取保守值 1.2
来源Optimizing pickling process for 30Cr13 steel,
ScienceDirect 2025; neural network MPC studies
[3] 温度效应校验:速率每升温 6~8°C 翻倍Ea≈45 kJ/mol 时对应约 7°C
[4] 欠酸洗风险判别特征strip thickness, speed, conc, temp
来源Prediction of under pickling defects on steel strip surface,
arXiv:1207.0911
[5] 速度优化Nelder-Mead simplex 已在实际 1450mm 酸洗线验证
来源Zhu et al., Advances in Mechanical Engineering, 2016
"""
import math
import json
import os
from typing import List, Dict, Any, Optional, Tuple
# ── 校准系数持久化路径 ────────────────────────────────────────────────────────
_CAL_FILE = os.path.join(os.path.dirname(__file__), "cal_coeffs.json")
def _load_cal() -> Dict[str, float]:
try:
with open(_CAL_FILE) as f:
return json.load(f)
except Exception:
return {}
def _save_cal(d: Dict[str, float]):
with open(_CAL_FILE, "w") as f:
json.dump(d, f, indent=2)
# ─────────────────────────────────────────────────────────────────────────────
# 1. 酸洗速度模型Gray-box
# ─────────────────────────────────────────────────────────────────────────────
class AcidSpeedModel:
"""
基于文献实测参数的 Arrhenius 灰箱模型。
与上一版本的关键差异:
- Ea/R: 3000 K → 5413 K45 kJ/mol 实验值,文献[1]
- 浓度指数: 0.6 → 1.2H⁺ 二阶动力学,文献[2]
- 增加氧化铁皮结构修正FeO/Fe₃O₄双层模型文献[4]
- 内置 K_cal 校准系数,支持投产后在线标定
"""
# 文献实验值(碳钢 HCl 连续酸洗)
TANK_LENGTH = 18.0 # m单槽有效长度按设备规格书
NUM_TANKS = 6
# K0 由实际工况反推:
# 目标75°C、180 g/L 正常酸液条件下,最大速度约 120~130 m/minPI≥95%
# 推导t_total = 6×18/(125/60)=51.8sk=ln(20)/51.8=0.058 s⁻¹
# k=K0×SCALE_RATE_FACTOR → K0=0.058/0.765≈0.075 s⁻¹
K0 = 0.075 # 指前因子 s⁻¹由设备规格反推标定
EA_R = 5413.0 # Ea/R (K)Ea=45 kJ/mol / R=8.314(文献实验值[1]
T_REF = 348.15 # 参考温度 75°C (K)
C_REF = 180.0 # 参考游离酸浓度 g/L
N_CONC = 1.2 # 浓度动力学阶次(文献[2] 取保守值)
V_MIN = 20.0
V_MAX = 180.0
CAL_KEY = "acid_speed_kcal"
# 氧化铁皮结构系数FeO 快速溶解 + Fe₃O₄ 慢速溶解,文献[4]
# 热轧碳钢铁皮组成约FeO 70%Fe₃O₄ 20%Fe₂O₃ 10%
# FeO 溶速约为 Fe₃O₄ 的 4 倍;有效速率取加权平均
SCALE_RATE_FACTOR = 0.70 * 1.0 + 0.20 * 0.25 + 0.10 * 0.15 # ≈ 0.765
def __init__(
self,
thickness: float, # mm
width: float, # mm
steel_grade: str,
acid_conc_list: List[float], # 各槽游离酸 g/L
acid_temp_list: List[float], # 各槽温度 °C
scale_weight: float = 8.5, # g/m²氧化铁皮重量
target_pi: float = 95.0,
):
if len(acid_conc_list) != self.NUM_TANKS:
raise ValueError(f"acid_conc_list 需要 {self.NUM_TANKS} 个元素")
if len(acid_temp_list) != self.NUM_TANKS:
raise ValueError(f"acid_temp_list 需要 {self.NUM_TANKS} 个元素")
self.thickness = thickness
self.width = width
self.steel_grade = steel_grade
self.acid_conc_list = acid_conc_list
self.acid_temp_list = acid_temp_list
self.scale_weight = scale_weight
self.target_pi = target_pi
self.K_cal = _load_cal().get(self.CAL_KEY, 1.0)
def _k_i(self, conc: float, temp_c: float) -> float:
"""单槽有效酸洗速率常数(含文献参数 + 铁皮结构修正)"""
T_k = temp_c + 273.15
arrhenius = math.exp(-self.EA_R * (1.0 / T_k - 1.0 / self.T_REF))
conc_factor = max(conc / self.C_REF, 0.01) ** self.N_CONC
# 铁皮越厚,有效接触面积越低(正比于 1/scale_weight^0.3 经验修正)
scale_corr = (8.5 / max(self.scale_weight, 1.0)) ** 0.3
return self.K0 * arrhenius * conc_factor * self.SCALE_RATE_FACTOR * scale_corr * self.K_cal
def _compute_pi(self, v_mpm: float) -> Tuple[float, List[float], List[float]]:
v_mps = v_mpm / 60.0
pi_prev = 0.0
pi_per_tank, rt_per_tank = [], []
for i in range(self.NUM_TANKS):
t_i = self.TANK_LENGTH / v_mps
k_i = self._k_i(self.acid_conc_list[i], self.acid_temp_list[i])
# 精确解析解dPI/dt = k*(1-PI/100) → PI_new = 100-(100-PI_old)*exp(-k*t)
# 避免 Euler 一阶近似在 k*t 较大时的严重失真
pi_prev = 100.0 - (100.0 - pi_prev) * math.exp(-k_i * t_i)
pi_per_tank.append(round(pi_prev, 2))
rt_per_tank.append(round(t_i, 1))
return pi_prev, pi_per_tank, rt_per_tank
def calculate(self) -> Dict[str, Any]:
# Nelder-Mead 单维退化为二分搜索(文献[5]验证有效)
pi_at_min, _, _ = self._compute_pi(self.V_MIN)
if pi_at_min < self.target_pi:
pi, pp, rt = self._compute_pi(self.V_MIN)
return {
"max_speed": self.V_MIN,
"pi_per_tank": pp,
"residence_time_per_tank": rt,
"total_pi": round(pi, 2),
"under_pickling_risk": self._risk_level(self.V_MIN, pi),
"warning": "酸液条件不足,即使最低速下酸洗指数仍低于目标,请检查酸浓度和温度",
"K_cal": self.K_cal,
}
lo, hi, best_v = self.V_MIN, self.V_MAX, self.V_MIN
while hi - lo >= 0.5:
mid = (lo + hi) / 2.0
pi_mid, _, _ = self._compute_pi(mid)
if pi_mid >= self.target_pi:
best_v = mid; lo = mid + 0.5
else:
hi = mid - 0.5
best_v = math.floor(best_v)
total_pi, pi_per_tank, rt_per_tank = self._compute_pi(best_v)
return {
"max_speed": best_v,
"pi_per_tank": pi_per_tank,
"residence_time_per_tank": rt_per_tank,
"total_pi": round(total_pi, 2),
"under_pickling_risk": self._risk_level(best_v, total_pi),
"warning": None,
"K_cal": self.K_cal,
}
def _risk_level(self, speed: float, pi: float) -> str:
"""
欠酸洗风险评估(文献[4] decision-tree 特征阈值)
输入speed(m/min), pi(%),结合厚度、浓度综合判断
"""
avg_conc = sum(self.acid_conc_list) / len(self.acid_conc_list)
avg_temp = sum(self.acid_temp_list) / len(self.acid_temp_list)
# 文献给出的欠酸洗高风险条件组合
risk_score = 0
if pi < 85: risk_score += 3
elif pi < 92: risk_score += 1
if speed > 140: risk_score += 2
if avg_conc < 120: risk_score += 2
if avg_temp < 68: risk_score += 2
if self.thickness > 4.0: risk_score += 1
if risk_score >= 5: return "HIGH"
elif risk_score >= 2: return "MEDIUM"
else: return "LOW"
def calibrate(self, actual_max_speed: float,
actual_quality_ok: bool) -> float:
"""
投产后标定接口:
传入某卷的实际最大可用速度(操作员确认质量合格时的速度),
用简单比例更新 K_cal使模型逐步向真实工况收敛。
actual_max_speed: 实际测得质量合格的最高速度 (m/min)
actual_quality_ok: True=该速度下质量合格False=出现欠酸洗
"""
predicted = self.calculate()["max_speed"]
if not actual_quality_ok:
# 预测速度偏高,缩减 K_cal
adjustment = 0.95
else:
ratio = actual_max_speed / max(predicted, 1.0)
# 平滑更新,避免单次样本过拟合
adjustment = 1.0 + 0.3 * (ratio - 1.0)
adjustment = max(0.7, min(1.3, adjustment))
self.K_cal = round(self.K_cal * adjustment, 4)
cal = _load_cal()
cal[self.CAL_KEY] = self.K_cal
_save_cal(cal)
return self.K_cal
# ─────────────────────────────────────────────────────────────────────────────
# 2. 张力设定模型
# ─────────────────────────────────────────────────────────────────────────────
class TensionModel:
"""
张力模型:基于截面积×屈服强度,区间比例系数参考酸洗线工程手册。
每个区段独立校准系数 zone_kcal[zone],互不干扰。
"""
# 各区基准比例系数(酸洗线工程实践均值)
ZONE_RATIOS = {
"inlet": 1.00,
"s1_roller": 0.85,
"acid_entry": 0.78,
"acid1": 0.72,
"acid2": 0.68,
"acid3": 0.68,
"rinse": 0.70,
"leveler": 0.76,
"s2_roller": 0.88,
"outlet": 1.00,
}
ZONE_NAMES_CN = {
"inlet": "入口张力辊",
"s1_roller": "S1夹送辊",
"acid_entry": "酸洗入口辊",
"acid1": "1#酸槽",
"acid2": "2#酸槽",
"acid3": "3#酸槽",
"rinse": "漂洗段辊",
"leveler": "拉矫机",
"s2_roller": "S2夹送辊",
"outlet": "出口张力辊",
}
@staticmethod
def _zone_cal_key(zone: str) -> str:
return f"tension_zone_{zone}"
def __init__(
self,
thickness: float,
width: float,
yield_strength: float,
tension_coef: float = 0.25,
):
self.thickness = thickness
self.width = width
self.yield_strength = yield_strength
self.tension_coef = tension_coef
cal = _load_cal()
# 每个区段独立加载自己的校准系数,默认 1.0
self.zone_kcal: Dict[str, float] = {
z: cal.get(self._zone_cal_key(z), 1.0)
for z in self.ZONE_RATIOS
}
def calculate(self) -> Dict[str, Any]:
cross_section = self.thickness * self.width # mm²
# T_max 是理论基准值(不含区段校准,区段校准在各 zone 内单独乘)
t_base_kn = (self.tension_coef * self.yield_strength
* cross_section / 1000.0) # kN
zones = {}
for zone, ratio in self.ZONE_RATIOS.items():
k = self.zone_kcal.get(zone, 1.0)
zones[zone] = {
"tension_kN": round(t_base_kn * ratio * k, 2),
"ratio": ratio,
"k_cal": k,
"name_cn": self.ZONE_NAMES_CN[zone],
}
density = 7850.0
mass_per_m = density * (self.thickness / 1000.0) * (self.width / 1000.0)
accel_kn = round(mass_per_m * (30.0 / 60.0) / 1000.0, 3)
t_max_kn = round(t_base_kn * self.zone_kcal.get("inlet", 1.0), 2)
return {
"T_max": t_max_kn,
"T_base": round(t_base_kn, 2),
"cross_section_mm2": round(cross_section, 1),
"zones": zones,
"weld_speed_limit": 60.0,
"weld_tension_kN": round(t_max_kn * 0.60, 2),
"accel_tension": accel_kn,
"zone_kcal": self.zone_kcal,
}
def calibrate(self, zone: str, measured_kn: float) -> Dict[str, float]:
"""仅更新指定区段的校准系数,其他区段不变"""
if zone not in self.ZONE_RATIOS:
raise ValueError(f"未知区段: {zone}")
t_base = (self.tension_coef * self.yield_strength
* self.thickness * self.width / 1000.0)
predicted = t_base * self.ZONE_RATIOS[zone] * self.zone_kcal[zone]
ratio = measured_kn / max(predicted, 0.1)
# 平滑更新,步长 40%,范围限制在 [0.5, 2.0]
adjustment = 1.0 + 0.4 * (ratio - 1.0)
adjustment = max(0.5, min(2.0, adjustment))
new_k = round(self.zone_kcal[zone] * adjustment, 4)
self.zone_kcal[zone] = new_k
cal = _load_cal()
cal[self._zone_cal_key(zone)] = new_k
_save_cal(cal)
return self.zone_kcal
# ─────────────────────────────────────────────────────────────────────────────
# 3. 质量预测模型
# ─────────────────────────────────────────────────────────────────────────────
class QualityPredictionModel:
"""
欠酸洗风险 + 质量等级预测。
v2 变化:
- 使用与 AcidSpeedModel 一致的文献参数Ea/R=5413, n=1.2
- 欠酸洗风险特征阈值参考 arXiv:1207.0911 的 decision-tree 结论
- 增加铁离子浓度FeCl₂对酸洗能力的抑制修正
- 支持投产后用实际质量等级校准评分阈值
"""
EA_R = 5413.0
T_REF = 348.15
C_REF = 180.0
N_CONC = 1.2
CAL_KEY = "quality_kcal"
def __init__(
self,
thickness: float,
avg_speed: float,
acid_conc_avg: float, # 游离酸均值 g/L
acid_temp_avg: float, # 温度均值 °C
scale_weight: float = 8.5,
fe_conc_avg: float = 60.0, # FeCl₂ 浓度 g/L铁离子抑制效应
):
self.thickness = thickness
self.avg_speed = avg_speed
self.acid_conc_avg = acid_conc_avg
self.acid_temp_avg = acid_temp_avg
self.scale_weight = scale_weight
self.fe_conc_avg = fe_conc_avg
self.K_cal = _load_cal().get(self.CAL_KEY, 1.0)
def _pickling_index_score(self) -> float:
T_k = self.acid_temp_avg + 273.15
arrhenius = math.exp(-self.EA_R * (1.0 / T_k - 1.0 / self.T_REF))
conc_factor = max(self.acid_conc_avg / self.C_REF, 0.01) ** self.N_CONC
# 铁离子抑制FeCl₂ > 80 g/L 时显著降低酸洗速率(文献经验)
fe_inhibition = 1.0 - max(0.0, (self.fe_conc_avg - 80.0) / 200.0) * 0.35
scale_corr = (8.5 / max(self.scale_weight, 1.0)) ** 0.3
exposure = (1.20 * arrhenius * conc_factor * fe_inhibition
* scale_corr * 18.0 * 6) / (self.avg_speed / 60.0)
pi_score = 100.0 * (1.0 - math.exp(-exposure / 10.0))
return min(max(pi_score * self.K_cal, 0.0), 100.0)
def _surface_score(self, pi_score: float) -> float:
# 最优速度区间 80-140 m/min文献[4] 欠酸洗风险判别边界)
if self.avg_speed < 60:
speed_score = 80.0
elif self.avg_speed <= 140:
speed_score = 80.0 + 15.0 * (self.avg_speed - 60) / 80.0
else:
over = (self.avg_speed - 140) / 40.0
speed_score = 95.0 - 30.0 * over
return min(max(pi_score * 0.65 + speed_score * 0.35, 0.0), 100.0)
def _grade(self, pi: float, surface: float) -> str:
c = (pi + surface) / 2.0
if c >= 90: return "A1"
if c >= 80: return "A2"
if c >= 70: return "B1"
if c >= 60: return "B2"
return "C"
def _recommendations(self, pi: float, surface: float) -> List[str]:
recs = []
if self.fe_conc_avg > 80:
recs.append(f"铁离子浓度偏高({self.fe_conc_avg:.0f} g/L酸洗能力受抑制建议加速换酸或补充新酸")
if pi < 80:
recs.append("酸洗指数偏低,建议提高酸液浓度至 180 g/L 以上,或将温度升至 80°C")
if pi < 65:
recs.append(f"欠酸洗风险高,建议将线速降至 {max(self.avg_speed*0.75, 20):.0f} m/min 以下")
if self.acid_temp_avg < 70:
recs.append(f"酸液温度偏低({self.acid_temp_avg:.1f}°C建议升温至 75~85°C")
if self.acid_conc_avg < 120:
recs.append(f"游离酸浓度偏低({self.acid_conc_avg:.0f} g/L建议补充新酸至 150 g/L")
if self.avg_speed > 150:
recs.append(f"线速过高({self.avg_speed:.0f} m/min欠酸洗风险建议不超过 140 m/min")
if self.scale_weight > 12.0:
recs.append(f"氧化铁皮偏重({self.scale_weight:.1f} g/m²建议检查加热炉气氛控制")
if not recs:
recs.append("工艺参数在正常范围内,当前设定可继续保持")
return recs
def calculate(self) -> Dict[str, Any]:
pi = round(self._pickling_index_score(), 1)
surface = round(self._surface_score(pi), 1)
return {
"pi_score": pi,
"surface_score": surface,
"overall_grade": self._grade(pi, surface),
"recommendations": self._recommendations(pi, surface),
"K_cal": self.K_cal,
}
def calibrate(self, actual_grade: str) -> float:
"""传入实际质检等级,更新评分校准系数"""
grade_map = {"A1": 95, "A2": 85, "B1": 75, "B2": 65, "C": 50}
actual_score = grade_map.get(actual_grade, 75)
result = self.calculate()
predicted_score = (result["pi_score"] + result["surface_score"]) / 2.0
ratio = actual_score / max(predicted_score, 1.0)
adjustment = 1.0 + 0.3 * (ratio - 1.0)
adjustment = max(0.7, min(1.3, adjustment))
self.K_cal = round(self.K_cal * adjustment, 4)
cal = _load_cal()
cal[self.CAL_KEY] = self.K_cal
_save_cal(cal)
return self.K_cal
# ─────────────────────────────────────────────────────────────────────────────
# 4. 消耗预测模型
# ─────────────────────────────────────────────────────────────────────────────
class AcidConsumptionModel:
"""
单卷资源消耗预测。
单位消耗定额取自浙江企鹅1250mm规格书
酸耗额外引入铁离子浓度修正FeCl₂ 越高酸液越快失效,换酸频率越高)。
"""
ACID_WITH_REGEN = 2.0 # kg/t
ACID_WITHOUT_REGEN = 35.0 # kg/t
STEAM_UNIT = 39.8 # kg/t
POWER_UNIT = 14.0 # kWh/t
COOLING_UNIT = 1.21 # m³/t
def __init__(
self,
thickness: float,
width: float,
coil_weight_kg: float,
has_regen_station: bool = True,
fe_conc_avg: float = 60.0, # FeCl₂ g/L影响换酸频率
):
self.thickness = thickness
self.width = width
self.coil_weight_kg = coil_weight_kg
self.has_regen_station = has_regen_station
self.fe_conc_avg = fe_conc_avg
def calculate(self) -> Dict[str, Any]:
weight_t = self.coil_weight_kg / 1000.0
acid_base = self.ACID_WITH_REGEN if self.has_regen_station else self.ACID_WITHOUT_REGEN
# 铁离子修正FeCl₂ > 100 g/L 时酸液利用率下降,有效酸耗上升
fe_factor = 1.0 + max(0.0, (self.fe_conc_avg - 100.0) / 100.0) * 0.4
acid_unit = round(acid_base * fe_factor, 3)
return {
"coil_weight_t": round(weight_t, 3),
"has_regen_station": self.has_regen_station,
"acid_consumption_kg": round(acid_unit * weight_t, 2),
"acid_unit_kg_per_t": acid_unit,
"steam_consumption_kg": round(self.STEAM_UNIT * weight_t, 2),
"steam_unit_kg_per_t": self.STEAM_UNIT,
"power_consumption_kwh": round(self.POWER_UNIT * weight_t, 2),
"power_unit_kwh_per_t": self.POWER_UNIT,
"cooling_water_m3": round(self.COOLING_UNIT * weight_t, 3),
"cooling_water_unit_m3_per_t": self.COOLING_UNIT,
"fe_conc_factor": round(fe_factor, 3),
}