diff --git a/backend/Dockerfile b/backend/Dockerfile index c57f001..cce6ab9 100644 --- a/backend/Dockerfile +++ b/backend/Dockerfile @@ -1,6 +1,6 @@ FROM python:3.12-slim WORKDIR /app COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt +RUN pip install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements.txt COPY . . CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/backend/app/services/prediction.py b/backend/app/services/prediction.py index 84432f9..600ef38 100644 --- a/backend/app/services/prediction.py +++ b/backend/app/services/prediction.py @@ -1,33 +1,22 @@ """ -工艺预测模型 — 灰箱(Gray-box)架构 +工艺预测模型 — 灰箱物理模型 + ONNX 神经网络双栈 -设计思路: - 物理结构来自 Arrhenius 酸洗动力学,参数取自公开文献实验值, - 而非理论推导。每个模型内置校准系数 K_cal(初始=1.0), - 投产后可通过 calibrate() 方法用实测结果回归更新, - 使模型随数据积累逐步收敛到真实工况。 +推理优先级: + 1. ONNX 模型(onnxruntime,若 pt_models/ 目录存在则加载) + 2. 物理灰箱模型(Arrhenius 解析解,始终可用) -关键文献依据: - [1] 碳钢 HCl 酸洗活化能:Ea ≈ 40~50 kJ/mol(实验测定均值取 45 kJ/mol) - 来源:Hydrochloric Acid Pickling Process Optimization in Metal Wire, - IJSSST Vol-16 No-5; IspatGuru Pickling of Hot Rolled Strip - [2] H⁺ 浓度动力学阶次:1.0~2.0阶(取保守值 1.2) - 来源:Optimizing pickling process for 30Cr13 steel, - ScienceDirect 2025; neural network MPC studies - [3] 温度效应校验:速率每升温 6~8°C 翻倍(Ea≈45 kJ/mol 时对应约 7°C) - [4] 欠酸洗风险判别特征:strip thickness, speed, conc, temp - 来源:Prediction of under pickling defects on steel strip surface, - arXiv:1207.0911 - [5] 速度优化:Nelder-Mead simplex 已在实际 1450mm 酸洗线验证 - 来源:Zhu et al., Advances in Mechanical Engineering, 2016 +训练:运行 backend/train_models.py 重新生成 pt_models/*.onnx +校准:K_cal 系数持久化在 cal_coeffs.json,两个栈都使用同一套 K_cal """ import math import json import os +from pathlib import Path from typing import List, Dict, Any, Optional, Tuple +from loguru import logger -# ── 校准系数持久化路径 ──────────────────────────────────────────────────────── -_CAL_FILE = os.path.join(os.path.dirname(__file__), "cal_coeffs.json") +# ── 校准系数持久化 ──────────────────────────────────────────────────────────── +_CAL_FILE = Path(__file__).parent / "cal_coeffs.json" def _load_cal() -> Dict[str, float]: try: @@ -41,56 +30,78 @@ def _save_cal(d: Dict[str, float]): json.dump(d, f, indent=2) +# ── ONNX 推理层 ─────────────────────────────────────────────────────────────── +_PT_DIR = Path(__file__).parent / "pt_models" +_scalers: Optional[Dict] = None +_sess: Dict[str, Any] = {} + +try: + import onnxruntime as ort + import numpy as _np + + _sp = _PT_DIR / "scalers.json" + if _sp.exists(): + with open(_sp) as f: + _scalers = json.load(f) + + for _name in ("acid_speed", "tension", "quality"): + _p = _PT_DIR / f"{_name}.onnx" + if _p.exists(): + _sess[_name] = ort.InferenceSession( + str(_p), providers=["CPUExecutionProvider"] + ) + if _sess: + logger.info(f"PT models loaded: {list(_sess.keys())}") +except ImportError: + logger.warning("onnxruntime not installed — using physics fallback") + + +def _pt_infer(name: str, x_raw: List[float]) -> Optional[List[float]]: + """标准化 → ONNX 推理 → 反标准化,返回输出向量;失败返回 None。""" + if name not in _sess or _scalers is None: + return None + try: + sc = _scalers[name] + xm = _np.array(sc["X_mean"], dtype=_np.float32) + xs = _np.array(sc["X_std"], dtype=_np.float32) + ym = _np.array(sc["y_mean"], dtype=_np.float32) + ys = _np.array(sc["y_std"], dtype=_np.float32) + x = (_np.array(x_raw, dtype=_np.float32) - xm) / xs + raw = _sess[name].run(None, {"input": x.reshape(1, -1)})[0][0] + return (raw * ys + ym).tolist() + except Exception as e: + logger.warning(f"PT infer {name} failed: {e}") + return None + + # ───────────────────────────────────────────────────────────────────────────── -# 1. 酸洗速度模型(Gray-box) +# 1. 酸洗速度模型 # ───────────────────────────────────────────────────────────────────────────── class AcidSpeedModel: """ - 基于文献实测参数的 Arrhenius 灰箱模型。 - - 与上一版本的关键差异: - - Ea/R: 3000 K → 5413 K(45 kJ/mol 实验值,文献[1]) - - 浓度指数: 0.6 → 1.2(H⁺ 二阶动力学,文献[2]) - - 增加氧化铁皮结构修正(FeO/Fe₃O₄双层模型,文献[4]) - - 内置 K_cal 校准系数,支持投产后在线标定 + 灰箱: Arrhenius 动力学 + 二分搜索 + PT栈: 14 维输入 → 最大速度 (m/min) + 输入: [thickness, scale_weight, conc×6, temp×6] """ + TANK_LENGTH = 18.0 + NUM_TANKS = 6 + K0 = 0.075 + EA_R = 5413.0 + T_REF = 348.15 + C_REF = 180.0 + N_CONC = 1.2 + V_MIN = 20.0 + V_MAX = 180.0 + SCALE_RATE_FACTOR = 0.70 * 1.0 + 0.20 * 0.25 + 0.10 * 0.15 + CAL_KEY = "acid_speed_kcal" - # 文献实验值(碳钢 HCl 连续酸洗) - TANK_LENGTH = 18.0 # m,单槽有效长度(按设备规格书) - NUM_TANKS = 6 - # K0 由实际工况反推: - # 目标:75°C、180 g/L 正常酸液条件下,最大速度约 120~130 m/min(PI≥95%) - # 推导:t_total = 6×18/(125/60)=51.8s,k=ln(20)/51.8=0.058 s⁻¹ - # k=K0×SCALE_RATE_FACTOR → K0=0.058/0.765≈0.075 s⁻¹ - K0 = 0.075 # 指前因子 s⁻¹,由设备规格反推标定 - EA_R = 5413.0 # Ea/R (K),Ea=45 kJ/mol / R=8.314(文献实验值[1]) - T_REF = 348.15 # 参考温度 75°C (K) - C_REF = 180.0 # 参考游离酸浓度 g/L - N_CONC = 1.2 # 浓度动力学阶次(文献[2] 取保守值) - V_MIN = 20.0 - V_MAX = 180.0 - CAL_KEY = "acid_speed_kcal" - - # 氧化铁皮结构系数(FeO 快速溶解 + Fe₃O₄ 慢速溶解,文献[4]) - # 热轧碳钢铁皮组成约:FeO 70%,Fe₃O₄ 20%,Fe₂O₃ 10% - # FeO 溶速约为 Fe₃O₄ 的 4 倍;有效速率取加权平均 - SCALE_RATE_FACTOR = 0.70 * 1.0 + 0.20 * 0.25 + 0.10 * 0.15 # ≈ 0.765 - - def __init__( - self, - thickness: float, # mm - width: float, # mm - steel_grade: str, - acid_conc_list: List[float], # 各槽游离酸 g/L - acid_temp_list: List[float], # 各槽温度 °C - scale_weight: float = 8.5, # g/m²,氧化铁皮重量 - target_pi: float = 95.0, - ): + def __init__(self, thickness, width, steel_grade, + acid_conc_list, acid_temp_list, + scale_weight=8.5, target_pi=95.0): if len(acid_conc_list) != self.NUM_TANKS: raise ValueError(f"acid_conc_list 需要 {self.NUM_TANKS} 个元素") if len(acid_temp_list) != self.NUM_TANKS: raise ValueError(f"acid_temp_list 需要 {self.NUM_TANKS} 个元素") - self.thickness = thickness self.width = width self.steel_grade = steel_grade @@ -100,110 +111,87 @@ class AcidSpeedModel: self.target_pi = target_pi self.K_cal = _load_cal().get(self.CAL_KEY, 1.0) - def _k_i(self, conc: float, temp_c: float) -> float: - """单槽有效酸洗速率常数(含文献参数 + 铁皮结构修正)""" - T_k = temp_c + 273.15 - arrhenius = math.exp(-self.EA_R * (1.0 / T_k - 1.0 / self.T_REF)) - conc_factor = max(conc / self.C_REF, 0.01) ** self.N_CONC - # 铁皮越厚,有效接触面积越低(正比于 1/scale_weight^0.3 经验修正) - scale_corr = (8.5 / max(self.scale_weight, 1.0)) ** 0.3 - return self.K0 * arrhenius * conc_factor * self.SCALE_RATE_FACTOR * scale_corr * self.K_cal + def _k_i(self, conc, temp_c): + T_k = temp_c + 273.15 + arrhenius = math.exp(-self.EA_R * (1.0/T_k - 1.0/self.T_REF)) + c_factor = max(conc/self.C_REF, 0.01) ** self.N_CONC + scale_corr = (8.5 / max(self.scale_weight, 1.0)) ** 0.3 + return self.K0 * arrhenius * c_factor * self.SCALE_RATE_FACTOR * scale_corr * self.K_cal - def _compute_pi(self, v_mpm: float) -> Tuple[float, List[float], List[float]]: - v_mps = v_mpm / 60.0 - pi_prev = 0.0 - pi_per_tank, rt_per_tank = [], [] + def _compute_pi(self, v_mpm): + v_mps = v_mpm / 60.0 + pi = 0.0 + pp, rt = [], [] for i in range(self.NUM_TANKS): - t_i = self.TANK_LENGTH / v_mps - k_i = self._k_i(self.acid_conc_list[i], self.acid_temp_list[i]) - # 精确解析解:dPI/dt = k*(1-PI/100) → PI_new = 100-(100-PI_old)*exp(-k*t) - # 避免 Euler 一阶近似在 k*t 较大时的严重失真 - pi_prev = 100.0 - (100.0 - pi_prev) * math.exp(-k_i * t_i) - pi_per_tank.append(round(pi_prev, 2)) - rt_per_tank.append(round(t_i, 1)) - return pi_prev, pi_per_tank, rt_per_tank + t_i = self.TANK_LENGTH / v_mps + k_i = self._k_i(self.acid_conc_list[i], self.acid_temp_list[i]) + pi = 100.0 - (100.0 - pi) * math.exp(-k_i * t_i) + pp.append(round(pi, 2)); rt.append(round(t_i, 1)) + return pi, pp, rt - def calculate(self) -> Dict[str, Any]: - # Nelder-Mead 单维退化为二分搜索(文献[5]验证有效) - pi_at_min, _, _ = self._compute_pi(self.V_MIN) - if pi_at_min < self.target_pi: + def _risk_level(self, speed, pi): + avg_conc = sum(self.acid_conc_list)/len(self.acid_conc_list) + avg_temp = sum(self.acid_temp_list)/len(self.acid_temp_list) + s = 0 + if pi < 85: s += 3 + elif pi < 92: s += 1 + if speed > 140: s += 2 + if avg_conc < 120: s += 2 + if avg_temp < 68: s += 2 + if self.thickness > 4.0: s += 1 + return "HIGH" if s >= 5 else "MEDIUM" if s >= 2 else "LOW" + + def _physics_result(self): + pi_min, _, _ = self._compute_pi(self.V_MIN) + if pi_min < self.target_pi: pi, pp, rt = self._compute_pi(self.V_MIN) return { - "max_speed": self.V_MIN, - "pi_per_tank": pp, - "residence_time_per_tank": rt, - "total_pi": round(pi, 2), + "max_speed": self.V_MIN, "pi_per_tank": pp, + "residence_time_per_tank": rt, "total_pi": round(pi, 2), "under_pickling_risk": self._risk_level(self.V_MIN, pi), - "warning": "酸液条件不足,即使最低速下酸洗指数仍低于目标,请检查酸浓度和温度", - "K_cal": self.K_cal, + "warning": "酸液条件不足,建议检查酸浓度和温度", + "K_cal": self.K_cal, "source": "physics", } - - lo, hi, best_v = self.V_MIN, self.V_MAX, self.V_MIN + lo, hi, best = self.V_MIN, self.V_MAX, self.V_MIN while hi - lo >= 0.5: mid = (lo + hi) / 2.0 - pi_mid, _, _ = self._compute_pi(mid) - if pi_mid >= self.target_pi: - best_v = mid; lo = mid + 0.5 + if self._compute_pi(mid)[0] >= self.target_pi: + best = mid; lo = mid + 0.5 else: hi = mid - 0.5 - - best_v = math.floor(best_v) - total_pi, pi_per_tank, rt_per_tank = self._compute_pi(best_v) - + best = math.floor(best) + pi, pp, rt = self._compute_pi(best) return { - "max_speed": best_v, - "pi_per_tank": pi_per_tank, - "residence_time_per_tank": rt_per_tank, - "total_pi": round(total_pi, 2), - "under_pickling_risk": self._risk_level(best_v, total_pi), - "warning": None, - "K_cal": self.K_cal, + "max_speed": best, "pi_per_tank": pp, + "residence_time_per_tank": rt, "total_pi": round(pi, 2), + "under_pickling_risk": self._risk_level(best, pi), + "warning": None, "K_cal": self.K_cal, "source": "physics", } - def _risk_level(self, speed: float, pi: float) -> str: - """ - 欠酸洗风险评估(文献[4] decision-tree 特征阈值) - 输入:speed(m/min), pi(%),结合厚度、浓度综合判断 - """ - avg_conc = sum(self.acid_conc_list) / len(self.acid_conc_list) - avg_temp = sum(self.acid_temp_list) / len(self.acid_temp_list) - # 文献给出的欠酸洗高风险条件组合 - risk_score = 0 - if pi < 85: risk_score += 3 - elif pi < 92: risk_score += 1 - if speed > 140: risk_score += 2 - if avg_conc < 120: risk_score += 2 - if avg_temp < 68: risk_score += 2 - if self.thickness > 4.0: risk_score += 1 + def calculate(self) -> Dict[str, Any]: + x = [self.thickness, self.scale_weight] + self.acid_conc_list + self.acid_temp_list + pt = _pt_infer("acid_speed", x) + if pt is not None: + raw_speed = pt[0] * self.K_cal + best = int(max(self.V_MIN, min(self.V_MAX, round(raw_speed)))) + pi, pp, rt = self._compute_pi(best) + return { + "max_speed": best, "pi_per_tank": pp, + "residence_time_per_tank": rt, "total_pi": round(pi, 2), + "under_pickling_risk": self._risk_level(best, pi), + "warning": None, "K_cal": self.K_cal, "source": "pt", + } + return self._physics_result() - if risk_score >= 5: return "HIGH" - elif risk_score >= 2: return "MEDIUM" - else: return "LOW" - - def calibrate(self, actual_max_speed: float, - actual_quality_ok: bool) -> float: - """ - 投产后标定接口: - 传入某卷的实际最大可用速度(操作员确认质量合格时的速度), - 用简单比例更新 K_cal,使模型逐步向真实工况收敛。 - - actual_max_speed: 实际测得质量合格的最高速度 (m/min) - actual_quality_ok: True=该速度下质量合格,False=出现欠酸洗 - """ + def calibrate(self, actual_max_speed, actual_quality_ok): predicted = self.calculate()["max_speed"] if not actual_quality_ok: - # 预测速度偏高,缩减 K_cal - adjustment = 0.95 + adj = 0.95 else: ratio = actual_max_speed / max(predicted, 1.0) - # 平滑更新,避免单次样本过拟合 - adjustment = 1.0 + 0.3 * (ratio - 1.0) - adjustment = max(0.7, min(1.3, adjustment)) - - self.K_cal = round(self.K_cal * adjustment, 4) - cal = _load_cal() - cal[self.CAL_KEY] = self.K_cal - _save_cal(cal) + adj = max(0.7, min(1.3, 1.0 + 0.3*(ratio - 1.0))) + self.K_cal = round(self.K_cal * adj, 4) + cal = _load_cal(); cal[self.CAL_KEY] = self.K_cal; _save_cal(cal) return self.K_cal @@ -212,106 +200,91 @@ class AcidSpeedModel: # ───────────────────────────────────────────────────────────────────────────── class TensionModel: """ - 张力模型:基于截面积×屈服强度,区间比例系数参考酸洗线工程手册。 - 每个区段独立校准系数 zone_kcal[zone],互不干扰。 + 灰箱: T_base = coef × σ_yield × A,各区段比例系数 + PT栈: 4 维输入 → 10 区段张力 kN + 输入: [thickness, width, yield_strength, tension_coef] """ - - # 各区基准比例系数(酸洗线工程实践均值) ZONE_RATIOS = { - "inlet": 1.00, - "s1_roller": 0.85, - "acid_entry": 0.78, - "acid1": 0.72, - "acid2": 0.68, - "acid3": 0.68, - "rinse": 0.70, - "leveler": 0.76, - "s2_roller": 0.88, - "outlet": 1.00, + "inlet": 1.00, "s1_roller": 0.85, "acid_entry": 0.78, + "acid1": 0.72, "acid2": 0.68, "acid3": 0.68, + "rinse": 0.70, "leveler": 0.76, "s2_roller": 0.88, "outlet": 1.00, } ZONE_NAMES_CN = { - "inlet": "入口张力辊", - "s1_roller": "S1夹送辊", - "acid_entry": "酸洗入口辊", - "acid1": "1#酸槽", - "acid2": "2#酸槽", - "acid3": "3#酸槽", - "rinse": "漂洗段辊", - "leveler": "拉矫机", - "s2_roller": "S2夹送辊", - "outlet": "出口张力辊", + "inlet": "入口张力辊", "s1_roller": "S1夹送辊", + "acid_entry": "酸洗入口辊", "acid1": "1#酸槽", + "acid2": "2#酸槽", "acid3": "3#酸槽", + "rinse": "漂洗段辊", "leveler": "拉矫机", + "s2_roller": "S2夹送辊", "outlet": "出口张力辊", } @staticmethod - def _zone_cal_key(zone: str) -> str: - return f"tension_zone_{zone}" + def _zone_cal_key(zone): return f"tension_zone_{zone}" - def __init__( - self, - thickness: float, - width: float, - yield_strength: float, - tension_coef: float = 0.25, - ): + def __init__(self, thickness, width, yield_strength, tension_coef=0.25): self.thickness = thickness self.width = width self.yield_strength = yield_strength self.tension_coef = tension_coef cal = _load_cal() - # 每个区段独立加载自己的校准系数,默认 1.0 - self.zone_kcal: Dict[str, float] = { - z: cal.get(self._zone_cal_key(z), 1.0) - for z in self.ZONE_RATIOS - } - - def calculate(self) -> Dict[str, Any]: - cross_section = self.thickness * self.width # mm² - # T_max 是理论基准值(不含区段校准,区段校准在各 zone 内单独乘) - t_base_kn = (self.tension_coef * self.yield_strength - * cross_section / 1000.0) # kN + self.zone_kcal = {z: cal.get(self._zone_cal_key(z), 1.0) for z in self.ZONE_RATIOS} + def _physics_zones(self, t_base_kn): zones = {} for zone, ratio in self.ZONE_RATIOS.items(): - k = self.zone_kcal.get(zone, 1.0) + k = self.zone_kcal[zone] zones[zone] = { "tension_kN": round(t_base_kn * ratio * k, 2), - "ratio": ratio, - "k_cal": k, - "name_cn": self.ZONE_NAMES_CN[zone], + "ratio": ratio, "k_cal": k, + "name_cn": self.ZONE_NAMES_CN[zone], } + return zones + + def calculate(self) -> Dict[str, Any]: + cross = self.thickness * self.width + t_base = self.tension_coef * self.yield_strength * cross / 1000.0 + + pt = _pt_infer("tension", [self.thickness, self.width, self.yield_strength, self.tension_coef]) + if pt is not None and _scalers and "tension" in _scalers: + zone_names = _scalers["tension"].get("zone_names", list(self.ZONE_RATIOS.keys())) + zones = {} + for i, zone in enumerate(zone_names): + k = self.zone_kcal.get(zone, 1.0) + kn = round(max(0.1, pt[i]) * k, 2) + zones[zone] = { + "tension_kN": kn, + "ratio": self.ZONE_RATIOS.get(zone, 1.0), + "k_cal": k, + "name_cn": self.ZONE_NAMES_CN.get(zone, zone), + } + source = "pt" + else: + zones = self._physics_zones(t_base) + source = "physics" density = 7850.0 - mass_per_m = density * (self.thickness / 1000.0) * (self.width / 1000.0) - accel_kn = round(mass_per_m * (30.0 / 60.0) / 1000.0, 3) - t_max_kn = round(t_base_kn * self.zone_kcal.get("inlet", 1.0), 2) + mass_per_m = density * (self.thickness/1000.0) * (self.width/1000.0) + accel_kn = round(mass_per_m * (30.0/60.0) / 1000.0, 3) + t_max = round(t_base * self.zone_kcal.get("inlet", 1.0), 2) return { - "T_max": t_max_kn, - "T_base": round(t_base_kn, 2), - "cross_section_mm2": round(cross_section, 1), - "zones": zones, + "T_max": t_max, "T_base": round(t_base, 2), + "cross_section_mm2": round(cross, 1), + "zones": zones, "weld_speed_limit": 60.0, - "weld_tension_kN": round(t_max_kn * 0.60, 2), - "accel_tension": accel_kn, - "zone_kcal": self.zone_kcal, + "weld_tension_kN": round(t_max * 0.60, 2), + "accel_tension": accel_kn, + "zone_kcal": self.zone_kcal, + "source": source, } - def calibrate(self, zone: str, measured_kn: float) -> Dict[str, float]: - """仅更新指定区段的校准系数,其他区段不变""" + def calibrate(self, zone, measured_kn): if zone not in self.ZONE_RATIOS: raise ValueError(f"未知区段: {zone}") - t_base = (self.tension_coef * self.yield_strength - * self.thickness * self.width / 1000.0) - predicted = t_base * self.ZONE_RATIOS[zone] * self.zone_kcal[zone] - ratio = measured_kn / max(predicted, 0.1) - # 平滑更新,步长 40%,范围限制在 [0.5, 2.0] - adjustment = 1.0 + 0.4 * (ratio - 1.0) - adjustment = max(0.5, min(2.0, adjustment)) - new_k = round(self.zone_kcal[zone] * adjustment, 4) - self.zone_kcal[zone] = new_k - cal = _load_cal() - cal[self._zone_cal_key(zone)] = new_k - _save_cal(cal) + t_base = self.tension_coef * self.yield_strength * self.thickness * self.width / 1000.0 + pred = t_base * self.ZONE_RATIOS[zone] * self.zone_kcal[zone] + adj = max(0.5, min(2.0, 1.0 + 0.4*(measured_kn/max(pred,0.1) - 1.0))) + self.zone_kcal[zone] = round(self.zone_kcal[zone] * adj, 4) + cal = _load_cal(); cal[self._zone_cal_key(zone)] = self.zone_kcal[zone]; _save_cal(cal) return self.zone_kcal @@ -320,29 +293,18 @@ class TensionModel: # ───────────────────────────────────────────────────────────────────────────── class QualityPredictionModel: """ - 欠酸洗风险 + 质量等级预测。 - - v2 变化: - - 使用与 AcidSpeedModel 一致的文献参数(Ea/R=5413, n=1.2) - - 欠酸洗风险特征阈值参考 arXiv:1207.0911 的 decision-tree 结论 - - 增加铁离子浓度(FeCl₂)对酸洗能力的抑制修正 - - 支持投产后用实际质量等级校准评分阈值 + 灰箱: Arrhenius PI 计算 + 速度惩罚 + PT栈: 6 维输入 → [pi_score, surface_score] + 输入: [thickness, avg_speed, acid_conc_avg, acid_temp_avg, scale_weight, fe_conc_avg] """ - EA_R = 5413.0 - T_REF = 348.15 - C_REF = 180.0 - N_CONC = 1.2 + EA_R = 5413.0 + T_REF = 348.15 + C_REF = 180.0 + N_CONC = 1.2 CAL_KEY = "quality_kcal" - def __init__( - self, - thickness: float, - avg_speed: float, - acid_conc_avg: float, # 游离酸均值 g/L - acid_temp_avg: float, # 温度均值 °C - scale_weight: float = 8.5, - fe_conc_avg: float = 60.0, # FeCl₂ 浓度 g/L(铁离子抑制效应) - ): + def __init__(self, thickness, avg_speed, acid_conc_avg, acid_temp_avg, + scale_weight=8.5, fe_conc_avg=60.0): self.thickness = thickness self.avg_speed = avg_speed self.acid_conc_avg = acid_conc_avg @@ -351,108 +313,96 @@ class QualityPredictionModel: self.fe_conc_avg = fe_conc_avg self.K_cal = _load_cal().get(self.CAL_KEY, 1.0) - def _pickling_index_score(self) -> float: - T_k = self.acid_temp_avg + 273.15 - arrhenius = math.exp(-self.EA_R * (1.0 / T_k - 1.0 / self.T_REF)) - conc_factor = max(self.acid_conc_avg / self.C_REF, 0.01) ** self.N_CONC - # 铁离子抑制:FeCl₂ > 80 g/L 时显著降低酸洗速率(文献经验) - fe_inhibition = 1.0 - max(0.0, (self.fe_conc_avg - 80.0) / 200.0) * 0.35 - scale_corr = (8.5 / max(self.scale_weight, 1.0)) ** 0.3 - exposure = (1.20 * arrhenius * conc_factor * fe_inhibition - * scale_corr * 18.0 * 6) / (self.avg_speed / 60.0) - pi_score = 100.0 * (1.0 - math.exp(-exposure / 10.0)) - return min(max(pi_score * self.K_cal, 0.0), 100.0) + def _pi(self): + T_k = self.acid_temp_avg + 273.15 + arr = math.exp(-self.EA_R*(1.0/T_k - 1.0/self.T_REF)) + c_factor = max(self.acid_conc_avg/self.C_REF, 0.01)**self.N_CONC + fe_inh = 1.0 - max(0.0, (self.fe_conc_avg-80.0)/200.0)*0.35 + scale_corr = (8.5/max(self.scale_weight,1.0))**0.3 + exposure = (1.2*arr*c_factor*fe_inh*scale_corr*18.0*6) / (self.avg_speed/60.0) + return min(max(100.0*(1.0-math.exp(-exposure/10.0))*self.K_cal, 0), 100) - def _surface_score(self, pi_score: float) -> float: - # 最优速度区间 80-140 m/min(文献[4] 欠酸洗风险判别边界) + def _surface(self, pi): if self.avg_speed < 60: - speed_score = 80.0 + ss = 80.0 elif self.avg_speed <= 140: - speed_score = 80.0 + 15.0 * (self.avg_speed - 60) / 80.0 + ss = 80.0 + 15.0*(self.avg_speed-60)/80.0 else: - over = (self.avg_speed - 140) / 40.0 - speed_score = 95.0 - 30.0 * over - return min(max(pi_score * 0.65 + speed_score * 0.35, 0.0), 100.0) + ss = 95.0 - 30.0*((self.avg_speed-140)/40.0) + return min(max(pi*0.65 + ss*0.35, 0), 100) - def _grade(self, pi: float, surface: float) -> str: - c = (pi + surface) / 2.0 + def _grade(self, pi, suf): + c = (pi+suf)/2.0 if c >= 90: return "A1" if c >= 80: return "A2" if c >= 70: return "B1" if c >= 60: return "B2" return "C" - def _recommendations(self, pi: float, surface: float) -> List[str]: + def _recommendations(self, pi, suf): recs = [] if self.fe_conc_avg > 80: - recs.append(f"铁离子浓度偏高({self.fe_conc_avg:.0f} g/L),酸洗能力受抑制,建议加速换酸或补充新酸") + recs.append(f"铁离子浓度偏高({self.fe_conc_avg:.0f} g/L),建议加速换酸") if pi < 80: - recs.append("酸洗指数偏低,建议提高酸液浓度至 180 g/L 以上,或将温度升至 80°C") + recs.append("酸洗指数偏低,建议提高酸液浓度至 180 g/L 以上,或升温至 80°C") if pi < 65: - recs.append(f"欠酸洗风险高,建议将线速降至 {max(self.avg_speed*0.75, 20):.0f} m/min 以下") + recs.append(f"欠酸洗风险高,建议将线速降至 {max(self.avg_speed*0.75, 20):.0f} m/min") if self.acid_temp_avg < 70: recs.append(f"酸液温度偏低({self.acid_temp_avg:.1f}°C),建议升温至 75~85°C") if self.acid_conc_avg < 120: - recs.append(f"游离酸浓度偏低({self.acid_conc_avg:.0f} g/L),建议补充新酸至 150 g/L") + recs.append(f"游离酸浓度偏低({self.acid_conc_avg:.0f} g/L),建议补充新酸") if self.avg_speed > 150: - recs.append(f"线速过高({self.avg_speed:.0f} m/min),欠酸洗风险,建议不超过 140 m/min") + recs.append(f"线速过高({self.avg_speed:.0f} m/min),欠酸洗风险") if self.scale_weight > 12.0: - recs.append(f"氧化铁皮偏重({self.scale_weight:.1f} g/m²),建议检查加热炉气氛控制") + recs.append(f"氧化铁皮偏重({self.scale_weight:.1f} g/m²),建议检查加热炉气氛") if not recs: recs.append("工艺参数在正常范围内,当前设定可继续保持") return recs def calculate(self) -> Dict[str, Any]: - pi = round(self._pickling_index_score(), 1) - surface = round(self._surface_score(pi), 1) + x = [self.thickness, self.avg_speed, self.acid_conc_avg, + self.acid_temp_avg, self.scale_weight, self.fe_conc_avg] + pt = _pt_infer("quality", x) + + if pt is not None: + pi = round(float(min(max(pt[0]*self.K_cal, 0), 100)), 1) + suf = round(float(min(max(pt[1]*self.K_cal, 0), 100)), 1) + src = "pt" + else: + pi = round(self._pi(), 1) + suf = round(self._surface(pi), 1) + src = "physics" + return { - "pi_score": pi, - "surface_score": surface, - "overall_grade": self._grade(pi, surface), - "recommendations": self._recommendations(pi, surface), - "K_cal": self.K_cal, + "pi_score": pi, "surface_score": suf, + "overall_grade": self._grade(pi, suf), + "recommendations": self._recommendations(pi, suf), + "K_cal": self.K_cal, "source": src, } - def calibrate(self, actual_grade: str) -> float: - """传入实际质检等级,更新评分校准系数""" + def calibrate(self, actual_grade): grade_map = {"A1": 95, "A2": 85, "B1": 75, "B2": 65, "C": 50} - actual_score = grade_map.get(actual_grade, 75) - result = self.calculate() - predicted_score = (result["pi_score"] + result["surface_score"]) / 2.0 - ratio = actual_score / max(predicted_score, 1.0) - adjustment = 1.0 + 0.3 * (ratio - 1.0) - adjustment = max(0.7, min(1.3, adjustment)) - self.K_cal = round(self.K_cal * adjustment, 4) - cal = _load_cal() - cal[self.CAL_KEY] = self.K_cal - _save_cal(cal) + target = grade_map.get(actual_grade, 75) + res = self.calculate() + pred = (res["pi_score"] + res["surface_score"]) / 2.0 + adj = max(0.7, min(1.3, 1.0 + 0.3*(target/max(pred,1.0) - 1.0))) + self.K_cal = round(self.K_cal * adj, 4) + cal = _load_cal(); cal[self.CAL_KEY] = self.K_cal; _save_cal(cal) return self.K_cal # ───────────────────────────────────────────────────────────────────────────── -# 4. 消耗预测模型 +# 4. 消耗预测模型(无 PT 版本,定额+修正公式足够) # ───────────────────────────────────────────────────────────────────────────── class AcidConsumptionModel: - """ - 单卷资源消耗预测。 - 单位消耗定额取自浙江企鹅1250mm规格书; - 酸耗额外引入铁离子浓度修正(FeCl₂ 越高酸液越快失效,换酸频率越高)。 - """ + ACID_WITH_REGEN = 2.0 + ACID_WITHOUT_REGEN = 35.0 + STEAM_UNIT = 39.8 + POWER_UNIT = 14.0 + COOLING_UNIT = 1.21 - ACID_WITH_REGEN = 2.0 # kg/t - ACID_WITHOUT_REGEN = 35.0 # kg/t - STEAM_UNIT = 39.8 # kg/t - POWER_UNIT = 14.0 # kWh/t - COOLING_UNIT = 1.21 # m³/t - - def __init__( - self, - thickness: float, - width: float, - coil_weight_kg: float, - has_regen_station: bool = True, - fe_conc_avg: float = 60.0, # FeCl₂ g/L,影响换酸频率 - ): + def __init__(self, thickness, width, coil_weight_kg, + has_regen_station=True, fe_conc_avg=60.0): self.thickness = thickness self.width = width self.coil_weight_kg = coil_weight_kg @@ -460,23 +410,19 @@ class AcidConsumptionModel: self.fe_conc_avg = fe_conc_avg def calculate(self) -> Dict[str, Any]: - weight_t = self.coil_weight_kg / 1000.0 + wt = self.coil_weight_kg / 1000.0 acid_base = self.ACID_WITH_REGEN if self.has_regen_station else self.ACID_WITHOUT_REGEN - - # 铁离子修正:FeCl₂ > 100 g/L 时酸液利用率下降,有效酸耗上升 - fe_factor = 1.0 + max(0.0, (self.fe_conc_avg - 100.0) / 100.0) * 0.4 + fe_factor = 1.0 + max(0.0, (self.fe_conc_avg-100.0)/100.0)*0.4 acid_unit = round(acid_base * fe_factor, 3) - return { - "coil_weight_t": round(weight_t, 3), - "has_regen_station": self.has_regen_station, - "acid_consumption_kg": round(acid_unit * weight_t, 2), - "acid_unit_kg_per_t": acid_unit, - "steam_consumption_kg": round(self.STEAM_UNIT * weight_t, 2), - "steam_unit_kg_per_t": self.STEAM_UNIT, - "power_consumption_kwh": round(self.POWER_UNIT * weight_t, 2), - "power_unit_kwh_per_t": self.POWER_UNIT, - "cooling_water_m3": round(self.COOLING_UNIT * weight_t, 3), + "coil_weight_t": round(wt, 3), + "acid_consumption_kg": round(acid_unit * wt, 2), + "acid_unit_kg_per_t": acid_unit, + "steam_consumption_kg": round(self.STEAM_UNIT * wt, 2), + "steam_unit_kg_per_t": self.STEAM_UNIT, + "power_consumption_kwh": round(self.POWER_UNIT * wt, 2), + "power_unit_kwh_per_t": self.POWER_UNIT, + "cooling_water_m3": round(self.COOLING_UNIT * wt, 3), "cooling_water_unit_m3_per_t": self.COOLING_UNIT, - "fe_conc_factor": round(fe_factor, 3), + "fe_conc_factor": round(fe_factor, 3), } diff --git a/backend/app/services/pt_models/acid_speed.onnx b/backend/app/services/pt_models/acid_speed.onnx new file mode 100644 index 0000000..d1fbafd Binary files /dev/null and b/backend/app/services/pt_models/acid_speed.onnx differ diff --git a/backend/app/services/pt_models/quality.onnx b/backend/app/services/pt_models/quality.onnx new file mode 100644 index 0000000..8d6c535 Binary files /dev/null and b/backend/app/services/pt_models/quality.onnx differ diff --git a/backend/app/services/pt_models/scalers.json b/backend/app/services/pt_models/scalers.json new file mode 100644 index 0000000..cbdb8e0 --- /dev/null +++ b/backend/app/services/pt_models/scalers.json @@ -0,0 +1,118 @@ +{ + "acid_speed": { + "X_mean": [ + 4.279637813568115, + 11.041364669799805, + 150.61114501953125, + 149.8550262451172, + 150.01771545410156, + 150.00747680664062, + 150.5503692626953, + 149.680419921875, + 69.46580505371094, + 69.35639953613281, + 69.35671997070312, + 69.52980041503906, + 69.55128479003906, + 69.26476287841797 + ], + "X_std": [ + 2.1389975547790527, + 4.052000999450684, + 51.74678421020508, + 52.34047317504883, + 51.52565383911133, + 52.03013610839844, + 51.94683074951172, + 51.77798080444336, + 10.097525596618652, + 10.071165084838867, + 10.012657165527344, + 10.097661018371582, + 10.077392578125, + 10.07375717163086 + ], + "y_mean": [ + 92.42212677001953 + ], + "y_std": [ + 30.15455436706543 + ] + }, + "tension": { + "X_mean": [ + 4.211259365081787, + 1104.45068359375, + 376.4231262207031, + 0.25053057074546814 + ], + "X_std": [ + 2.1753084659576416, + 290.6602783203125, + 129.755859375, + 0.057579852640628815 + ], + "y_mean": [ + 439.6892395019531, + 373.8921203613281, + 342.94708251953125, + 316.7646179199219, + 298.9944763183594, + 298.7807312011719, + 307.93359375, + 334.0721740722656, + 387.0394592285156, + 439.63824462890625 + ], + "y_std": [ + 341.6143798828125, + 290.6787109375, + 266.5653991699219, + 246.12326049804688, + 232.24676513671875, + 231.9054412841797, + 239.40611267089844, + 259.3957214355469, + 300.75, + 341.000244140625 + ], + "zone_names": [ + "inlet", + "s1_roller", + "acid_entry", + "acid1", + "acid2", + "acid3", + "rinse", + "leveler", + "s2_roller", + "outlet" + ] + }, + "quality": { + "X_mean": [ + 4.245825290679932, + 99.6146011352539, + 150.18072509765625, + 70.02484893798828, + 10.967876434326172, + 74.14215850830078 + ], + "X_std": [ + 2.18487548828125, + 46.58076858520508, + 51.831016540527344, + 11.553886413574219, + 4.02312707901001, + 31.69878578186035 + ], + "y_mean": [ + 92.10986328125, + 89.79142761230469 + ], + "y_std": [ + 11.346452713012695, + 8.92744255065918 + ] + } +} \ No newline at end of file diff --git a/backend/app/services/pt_models/tension.onnx b/backend/app/services/pt_models/tension.onnx new file mode 100644 index 0000000..5542d05 Binary files /dev/null and b/backend/app/services/pt_models/tension.onnx differ diff --git a/backend/requirements.txt b/backend/requirements.txt index 2b68318..0f828d4 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -18,3 +18,5 @@ redis==5.0.4 aioredis==2.0.1 httpx==0.27.0 loguru==0.7.2 +onnxruntime==1.18.0 +numpy==1.26.4 diff --git a/backend/train_models.py b/backend/train_models.py new file mode 100644 index 0000000..d7c4f83 --- /dev/null +++ b/backend/train_models.py @@ -0,0 +1,256 @@ +""" +本地训练脚本 — 生成合成数据、训练 MLP、导出 ONNX +运行方式(在 backend/ 目录下): + python train_models.py + +依赖(仅本地训练用,不进 Docker): + pip install torch onnx onnxruntime scikit-learn numpy +""" +import sys, json, time +from pathlib import Path + +import numpy as np +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import DataLoader, TensorDataset + +sys.path.insert(0, str(Path(__file__).parent)) +from app.services.prediction import AcidSpeedModel, TensionModel, QualityPredictionModel + +PT_DIR = Path(__file__).parent / "app" / "services" / "pt_models" +PT_DIR.mkdir(parents=True, exist_ok=True) + +SEED = 2024 +N = 12000 +np.random.seed(SEED) +torch.manual_seed(SEED) + +TENSION_ZONES = [ + "inlet", "s1_roller", "acid_entry", + "acid1", "acid2", "acid3", + "rinse", "leveler", "s2_roller", "outlet", +] + + +# ─── 网络结构 ─────────────────────────────────────────────────────────────── + +class MLP(nn.Module): + def __init__(self, in_dim: int, out_dim: int, hidden=(128, 64, 32)): + super().__init__() + layers: list = [] + prev = in_dim + for h in hidden: + layers += [nn.Linear(prev, h), nn.ReLU()] + prev = h + layers.append(nn.Linear(prev, out_dim)) + self.net = nn.Sequential(*layers) + + def forward(self, x: torch.Tensor) -> torch.Tensor: + return self.net(x) + + +# ─── 训练通用函数 ─────────────────────────────────────────────────────────── + +def fit(model: nn.Module, X: np.ndarray, y: np.ndarray, + epochs=300, lr=1e-3, batch_size=512) -> nn.Module: + Xt = torch.from_numpy(X) + yt = torch.from_numpy(y) + dl = DataLoader(TensorDataset(Xt, yt), batch_size=batch_size, shuffle=True) + opt = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4) + sched = optim.lr_scheduler.CosineAnnealingLR(opt, epochs) + loss_fn = nn.MSELoss() + + model.train() + for ep in range(1, epochs + 1): + tot = 0.0 + for xb, yb in dl: + opt.zero_grad() + loss = loss_fn(model(xb), yb) + loss.backward() + opt.step() + tot += loss.item() * len(xb) + sched.step() + if ep % 100 == 0: + print(f" ep {ep:3d}/{epochs} RMSE={((tot/len(Xt))**0.5):.5f}") + return model + + +def z_scale(arr: np.ndarray, mean=None, std=None): + if mean is None: + mean = arr.mean(axis=0) + std = arr.std(axis=0) + 1e-8 + return ((arr - mean) / std).astype(np.float32), mean, std + + +def export_onnx(model: nn.Module, in_dim: int, path: Path): + model.eval() + dummy = torch.zeros(1, in_dim) + torch.onnx.export( + model, dummy, str(path), + input_names=["input"], output_names=["output"], + dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}}, + opset_version=17, + ) + print(f" → {path.name} ({path.stat().st_size//1024} KB)") + + +# ─── 1. 酸洗速度模型 ──────────────────────────────────────────────────────── +# 输入(14): thickness, scale_weight, conc×6, temp×6 +# 输出(1): max_speed + +def gen_acid_speed(n: int): + rng = np.random.default_rng(SEED) + Xs, ys = [], [] + skip = 0 + while len(Xs) < n: + t = rng.uniform(0.5, 8.0) + sw = rng.uniform(4.0, 18.0) + conc = rng.uniform(60, 240, 6).tolist() + temp = rng.uniform(52, 87, 6).tolist() + tpi = rng.uniform(88, 97) + try: + m = AcidSpeedModel( + thickness=t, width=1000.0, steel_grade="Q235", + acid_conc_list=conc, acid_temp_list=temp, + scale_weight=sw, target_pi=tpi, + ) + spd = float(m.calculate()["max_speed"]) + except Exception: + skip += 1 + continue + + # 模拟真实工况偏差:±6% 相对噪声 + 钢种系数扰动 + steel_factor = rng.choice([0.92, 0.96, 1.00, 1.03, 1.06]) + noise = rng.normal(1.0, 0.06) + spd_n = float(np.clip(spd * noise * steel_factor, 20, 180)) + + Xs.append([t, sw] + conc + temp) + ys.append([spd_n]) + + print(f" acid_speed: {len(Xs)} samples (skipped {skip})") + return np.array(Xs, np.float32), np.array(ys, np.float32) + + +# ─── 2. 张力模型 ──────────────────────────────────────────────────────────── +# 输入(4): thickness, width, yield_strength, tension_coef +# 输出(10): 10 区段张力 kN + +def gen_tension(n: int): + rng = np.random.default_rng(SEED + 1) + Xs, ys = [], [] + while len(Xs) < n: + t = rng.uniform(0.5, 8.0) + w = rng.uniform(600, 1600) + ys_ = rng.uniform(150, 600) + tc = rng.uniform(0.15, 0.35) + + m = TensionModel(thickness=t, width=w, yield_strength=ys_, tension_coef=tc) + res = m.calculate() + tensions = [res["zones"][z]["tension_kN"] for z in TENSION_ZONES] + + # 各区段独立噪声(实测张力传感器精度约 ±4%) + noise = rng.normal(1.0, 0.04, 10) + tensions_n = [float(np.clip(v * noise[i], 0.1, 9999)) for i, v in enumerate(tensions)] + + Xs.append([t, w, ys_, tc]) + ys.append(tensions_n) + + print(f" tension: {len(Xs)} samples") + return np.array(Xs, np.float32), np.array(ys, np.float32) + + +# ─── 3. 质量预测模型 ───────────────────────────────────────────────────────── +# 输入(6): thickness, avg_speed, acid_conc_avg, acid_temp_avg, scale_weight, fe_conc_avg +# 输出(2): pi_score, surface_score + +def gen_quality(n: int): + rng = np.random.default_rng(SEED + 2) + Xs, ys = [], [] + while len(Xs) < n: + t = rng.uniform(0.5, 8.0) + spd = rng.uniform(20, 180) + conc = rng.uniform(60, 240) + temp = rng.uniform(50, 90) + sw = rng.uniform(4, 18) + fe = rng.uniform(20, 130) + + m = QualityPredictionModel( + thickness=t, avg_speed=spd, + acid_conc_avg=conc, acid_temp_avg=temp, + scale_weight=sw, fe_conc_avg=fe, + ) + res = m.calculate() + pi = res["pi_score"] + suf = res["surface_score"] + + # ±6% 噪声模拟质检测量不确定度 + pi_n = float(np.clip(pi * rng.normal(1.0, 0.06), 0, 100)) + suf_n = float(np.clip(suf * rng.normal(1.0, 0.06), 0, 100)) + + Xs.append([t, spd, conc, temp, sw, fe]) + ys.append([pi_n, suf_n]) + + print(f" quality: {len(Xs)} samples") + return np.array(Xs, np.float32), np.array(ys, np.float32) + + +# ─── 主流程 ───────────────────────────────────────────────────────────────── + +def main(): + scalers: dict = {} + t0 = time.time() + + # ── 酸洗速度 ── + print("\n[1/3] 酸洗速度模型") + X, y = gen_acid_speed(N) + Xn, Xm, Xs = z_scale(X) + yn, ym, ys_ = z_scale(y) + model = MLP(14, 1, hidden=(128, 64, 32)) + print(" 训练中...") + fit(model, Xn, yn, epochs=300) + export_onnx(model, 14, PT_DIR / "acid_speed.onnx") + scalers["acid_speed"] = { + "X_mean": Xm.tolist(), "X_std": Xs.tolist(), + "y_mean": ym.tolist(), "y_std": ys_.tolist(), + } + + # ── 张力 ── + print("\n[2/3] 张力模型") + X, y = gen_tension(N) + Xn, Xm, Xs = z_scale(X) + yn, ym, ys_ = z_scale(y) + model = MLP(4, 10, hidden=(64, 64, 32)) + print(" 训练中...") + fit(model, Xn, yn, epochs=300) + export_onnx(model, 4, PT_DIR / "tension.onnx") + scalers["tension"] = { + "X_mean": Xm.tolist(), "X_std": Xs.tolist(), + "y_mean": ym.tolist(), "y_std": ys_.tolist(), + "zone_names": TENSION_ZONES, + } + + # ── 质量 ── + print("\n[3/3] 质量预测模型") + X, y = gen_quality(N) + Xn, Xm, Xs = z_scale(X) + yn, ym, ys_ = z_scale(y) + model = MLP(6, 2, hidden=(64, 32)) + print(" 训练中...") + fit(model, Xn, yn, epochs=300) + export_onnx(model, 6, PT_DIR / "quality.onnx") + scalers["quality"] = { + "X_mean": Xm.tolist(), "X_std": Xs.tolist(), + "y_mean": ym.tolist(), "y_std": ys_.tolist(), + } + + # ── 保存 scaler 参数 ── + scaler_path = PT_DIR / "scalers.json" + with open(scaler_path, "w") as f: + json.dump(scalers, f, indent=2) + print(f"\n scalers → {scaler_path.name}") + print(f"\n完成 ({time.time()-t0:.1f}s)\n") + + +if __name__ == "__main__": + main()