Add PyTorch/ONNX prediction models with physics fallback

- Train 3 MLP networks (acid speed 14→1, tension 4→10, quality 6→2)
  on 12,000 synthetic samples generated from physics models + noise
- Export pre-trained ONNX models to pt_models/ directory
- Rewrite prediction.py: ONNX inference first, physics fallback if unavailable
- Add onnxruntime + numpy to requirements.txt (Aliyun mirror for Docker)
- Use Tsinghua mirror in Dockerfile for pip installs

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-27 17:31:25 +08:00
parent 62599b9c40
commit 6ae24cb14d
8 changed files with 642 additions and 320 deletions

View File

@@ -1,6 +1,6 @@
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
RUN pip install --no-cache-dir -i https://pypi.tuna.tsinghua.edu.cn/simple -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

View File

@@ -1,33 +1,22 @@
"""
工艺预测模型 — 灰箱Gray-box架构
工艺预测模型 — 灰箱物理模型 + ONNX 神经网络双栈
设计思路
物理结构来自 Arrhenius 酸洗动力学,参数取自公开文献实验值,
而非理论推导。每个模型内置校准系数 K_cal初始=1.0
投产后可通过 calibrate() 方法用实测结果回归更新,
使模型随数据积累逐步收敛到真实工况。
推理优先级
1. ONNX 模型onnxruntime若 pt_models/ 目录存在则加载)
2. 物理灰箱模型Arrhenius 解析解,始终可用)
关键文献依据:
[1] 碳钢 HCl 酸洗活化能Ea ≈ 40~50 kJ/mol实验测定均值取 45 kJ/mol
来源Hydrochloric Acid Pickling Process Optimization in Metal Wire,
IJSSST Vol-16 No-5; IspatGuru Pickling of Hot Rolled Strip
[2] H⁺ 浓度动力学阶次1.0~2.0阶(取保守值 1.2
来源Optimizing pickling process for 30Cr13 steel,
ScienceDirect 2025; neural network MPC studies
[3] 温度效应校验:速率每升温 6~8°C 翻倍Ea≈45 kJ/mol 时对应约 7°C
[4] 欠酸洗风险判别特征strip thickness, speed, conc, temp
来源Prediction of under pickling defects on steel strip surface,
arXiv:1207.0911
[5] 速度优化Nelder-Mead simplex 已在实际 1450mm 酸洗线验证
来源Zhu et al., Advances in Mechanical Engineering, 2016
训练:运行 backend/train_models.py 重新生成 pt_models/*.onnx
校准K_cal 系数持久化在 cal_coeffs.json两个栈都使用同一套 K_cal
"""
import math
import json
import os
from pathlib import Path
from typing import List, Dict, Any, Optional, Tuple
from loguru import logger
# ── 校准系数持久化路径 ────────────────────────────────────────────────────────
_CAL_FILE = os.path.join(os.path.dirname(__file__), "cal_coeffs.json")
# ── 校准系数持久化 ────────────────────────────────────────────────────────────
_CAL_FILE = Path(__file__).parent / "cal_coeffs.json"
def _load_cal() -> Dict[str, float]:
try:
@@ -41,56 +30,78 @@ def _save_cal(d: Dict[str, float]):
json.dump(d, f, indent=2)
# ── ONNX 推理层 ───────────────────────────────────────────────────────────────
_PT_DIR = Path(__file__).parent / "pt_models"
_scalers: Optional[Dict] = None
_sess: Dict[str, Any] = {}
try:
import onnxruntime as ort
import numpy as _np
_sp = _PT_DIR / "scalers.json"
if _sp.exists():
with open(_sp) as f:
_scalers = json.load(f)
for _name in ("acid_speed", "tension", "quality"):
_p = _PT_DIR / f"{_name}.onnx"
if _p.exists():
_sess[_name] = ort.InferenceSession(
str(_p), providers=["CPUExecutionProvider"]
)
if _sess:
logger.info(f"PT models loaded: {list(_sess.keys())}")
except ImportError:
logger.warning("onnxruntime not installed — using physics fallback")
def _pt_infer(name: str, x_raw: List[float]) -> Optional[List[float]]:
"""标准化 → ONNX 推理 → 反标准化,返回输出向量;失败返回 None。"""
if name not in _sess or _scalers is None:
return None
try:
sc = _scalers[name]
xm = _np.array(sc["X_mean"], dtype=_np.float32)
xs = _np.array(sc["X_std"], dtype=_np.float32)
ym = _np.array(sc["y_mean"], dtype=_np.float32)
ys = _np.array(sc["y_std"], dtype=_np.float32)
x = (_np.array(x_raw, dtype=_np.float32) - xm) / xs
raw = _sess[name].run(None, {"input": x.reshape(1, -1)})[0][0]
return (raw * ys + ym).tolist()
except Exception as e:
logger.warning(f"PT infer {name} failed: {e}")
return None
# ─────────────────────────────────────────────────────────────────────────────
# 1. 酸洗速度模型Gray-box
# 1. 酸洗速度模型
# ─────────────────────────────────────────────────────────────────────────────
class AcidSpeedModel:
"""
基于文献实测参数的 Arrhenius 灰箱模型。
与上一版本的关键差异:
- Ea/R: 3000 K → 5413 K45 kJ/mol 实验值,文献[1]
- 浓度指数: 0.6 → 1.2H⁺ 二阶动力学,文献[2]
- 增加氧化铁皮结构修正FeO/Fe₃O₄双层模型文献[4]
- 内置 K_cal 校准系数,支持投产后在线标定
灰箱: Arrhenius 动力学 + 二分搜索
PT栈: 14 维输入 → 最大速度 (m/min)
输入: [thickness, scale_weight, conc×6, temp×6]
"""
TANK_LENGTH = 18.0
NUM_TANKS = 6
K0 = 0.075
EA_R = 5413.0
T_REF = 348.15
C_REF = 180.0
N_CONC = 1.2
V_MIN = 20.0
V_MAX = 180.0
SCALE_RATE_FACTOR = 0.70 * 1.0 + 0.20 * 0.25 + 0.10 * 0.15
CAL_KEY = "acid_speed_kcal"
# 文献实验值(碳钢 HCl 连续酸洗)
TANK_LENGTH = 18.0 # m单槽有效长度按设备规格书
NUM_TANKS = 6
# K0 由实际工况反推:
# 目标75°C、180 g/L 正常酸液条件下,最大速度约 120~130 m/minPI≥95%
# 推导t_total = 6×18/(125/60)=51.8sk=ln(20)/51.8=0.058 s⁻¹
# k=K0×SCALE_RATE_FACTOR → K0=0.058/0.765≈0.075 s⁻¹
K0 = 0.075 # 指前因子 s⁻¹由设备规格反推标定
EA_R = 5413.0 # Ea/R (K)Ea=45 kJ/mol / R=8.314(文献实验值[1]
T_REF = 348.15 # 参考温度 75°C (K)
C_REF = 180.0 # 参考游离酸浓度 g/L
N_CONC = 1.2 # 浓度动力学阶次(文献[2] 取保守值)
V_MIN = 20.0
V_MAX = 180.0
CAL_KEY = "acid_speed_kcal"
# 氧化铁皮结构系数FeO 快速溶解 + Fe₃O₄ 慢速溶解,文献[4]
# 热轧碳钢铁皮组成约FeO 70%Fe₃O₄ 20%Fe₂O₃ 10%
# FeO 溶速约为 Fe₃O₄ 的 4 倍;有效速率取加权平均
SCALE_RATE_FACTOR = 0.70 * 1.0 + 0.20 * 0.25 + 0.10 * 0.15 # ≈ 0.765
def __init__(
self,
thickness: float, # mm
width: float, # mm
steel_grade: str,
acid_conc_list: List[float], # 各槽游离酸 g/L
acid_temp_list: List[float], # 各槽温度 °C
scale_weight: float = 8.5, # g/m²氧化铁皮重量
target_pi: float = 95.0,
):
def __init__(self, thickness, width, steel_grade,
acid_conc_list, acid_temp_list,
scale_weight=8.5, target_pi=95.0):
if len(acid_conc_list) != self.NUM_TANKS:
raise ValueError(f"acid_conc_list 需要 {self.NUM_TANKS} 个元素")
if len(acid_temp_list) != self.NUM_TANKS:
raise ValueError(f"acid_temp_list 需要 {self.NUM_TANKS} 个元素")
self.thickness = thickness
self.width = width
self.steel_grade = steel_grade
@@ -100,110 +111,87 @@ class AcidSpeedModel:
self.target_pi = target_pi
self.K_cal = _load_cal().get(self.CAL_KEY, 1.0)
def _k_i(self, conc: float, temp_c: float) -> float:
"""单槽有效酸洗速率常数(含文献参数 + 铁皮结构修正)"""
T_k = temp_c + 273.15
arrhenius = math.exp(-self.EA_R * (1.0 / T_k - 1.0 / self.T_REF))
conc_factor = max(conc / self.C_REF, 0.01) ** self.N_CONC
# 铁皮越厚,有效接触面积越低(正比于 1/scale_weight^0.3 经验修正)
scale_corr = (8.5 / max(self.scale_weight, 1.0)) ** 0.3
return self.K0 * arrhenius * conc_factor * self.SCALE_RATE_FACTOR * scale_corr * self.K_cal
def _k_i(self, conc, temp_c):
T_k = temp_c + 273.15
arrhenius = math.exp(-self.EA_R * (1.0/T_k - 1.0/self.T_REF))
c_factor = max(conc/self.C_REF, 0.01) ** self.N_CONC
scale_corr = (8.5 / max(self.scale_weight, 1.0)) ** 0.3
return self.K0 * arrhenius * c_factor * self.SCALE_RATE_FACTOR * scale_corr * self.K_cal
def _compute_pi(self, v_mpm: float) -> Tuple[float, List[float], List[float]]:
v_mps = v_mpm / 60.0
pi_prev = 0.0
pi_per_tank, rt_per_tank = [], []
def _compute_pi(self, v_mpm):
v_mps = v_mpm / 60.0
pi = 0.0
pp, rt = [], []
for i in range(self.NUM_TANKS):
t_i = self.TANK_LENGTH / v_mps
k_i = self._k_i(self.acid_conc_list[i], self.acid_temp_list[i])
# 精确解析解dPI/dt = k*(1-PI/100) → PI_new = 100-(100-PI_old)*exp(-k*t)
# 避免 Euler 一阶近似在 k*t 较大时的严重失真
pi_prev = 100.0 - (100.0 - pi_prev) * math.exp(-k_i * t_i)
pi_per_tank.append(round(pi_prev, 2))
rt_per_tank.append(round(t_i, 1))
return pi_prev, pi_per_tank, rt_per_tank
t_i = self.TANK_LENGTH / v_mps
k_i = self._k_i(self.acid_conc_list[i], self.acid_temp_list[i])
pi = 100.0 - (100.0 - pi) * math.exp(-k_i * t_i)
pp.append(round(pi, 2)); rt.append(round(t_i, 1))
return pi, pp, rt
def calculate(self) -> Dict[str, Any]:
# Nelder-Mead 单维退化为二分搜索(文献[5]验证有效)
pi_at_min, _, _ = self._compute_pi(self.V_MIN)
if pi_at_min < self.target_pi:
def _risk_level(self, speed, pi):
avg_conc = sum(self.acid_conc_list)/len(self.acid_conc_list)
avg_temp = sum(self.acid_temp_list)/len(self.acid_temp_list)
s = 0
if pi < 85: s += 3
elif pi < 92: s += 1
if speed > 140: s += 2
if avg_conc < 120: s += 2
if avg_temp < 68: s += 2
if self.thickness > 4.0: s += 1
return "HIGH" if s >= 5 else "MEDIUM" if s >= 2 else "LOW"
def _physics_result(self):
pi_min, _, _ = self._compute_pi(self.V_MIN)
if pi_min < self.target_pi:
pi, pp, rt = self._compute_pi(self.V_MIN)
return {
"max_speed": self.V_MIN,
"pi_per_tank": pp,
"residence_time_per_tank": rt,
"total_pi": round(pi, 2),
"max_speed": self.V_MIN, "pi_per_tank": pp,
"residence_time_per_tank": rt, "total_pi": round(pi, 2),
"under_pickling_risk": self._risk_level(self.V_MIN, pi),
"warning": "酸液条件不足,即使最低速下酸洗指数仍低于目标,请检查酸浓度和温度",
"K_cal": self.K_cal,
"warning": "酸液条件不足,建议检查酸浓度和温度",
"K_cal": self.K_cal, "source": "physics",
}
lo, hi, best_v = self.V_MIN, self.V_MAX, self.V_MIN
lo, hi, best = self.V_MIN, self.V_MAX, self.V_MIN
while hi - lo >= 0.5:
mid = (lo + hi) / 2.0
pi_mid, _, _ = self._compute_pi(mid)
if pi_mid >= self.target_pi:
best_v = mid; lo = mid + 0.5
if self._compute_pi(mid)[0] >= self.target_pi:
best = mid; lo = mid + 0.5
else:
hi = mid - 0.5
best_v = math.floor(best_v)
total_pi, pi_per_tank, rt_per_tank = self._compute_pi(best_v)
best = math.floor(best)
pi, pp, rt = self._compute_pi(best)
return {
"max_speed": best_v,
"pi_per_tank": pi_per_tank,
"residence_time_per_tank": rt_per_tank,
"total_pi": round(total_pi, 2),
"under_pickling_risk": self._risk_level(best_v, total_pi),
"warning": None,
"K_cal": self.K_cal,
"max_speed": best, "pi_per_tank": pp,
"residence_time_per_tank": rt, "total_pi": round(pi, 2),
"under_pickling_risk": self._risk_level(best, pi),
"warning": None, "K_cal": self.K_cal, "source": "physics",
}
def _risk_level(self, speed: float, pi: float) -> str:
"""
欠酸洗风险评估(文献[4] decision-tree 特征阈值)
输入speed(m/min), pi(%),结合厚度、浓度综合判断
"""
avg_conc = sum(self.acid_conc_list) / len(self.acid_conc_list)
avg_temp = sum(self.acid_temp_list) / len(self.acid_temp_list)
# 文献给出的欠酸洗高风险条件组合
risk_score = 0
if pi < 85: risk_score += 3
elif pi < 92: risk_score += 1
if speed > 140: risk_score += 2
if avg_conc < 120: risk_score += 2
if avg_temp < 68: risk_score += 2
if self.thickness > 4.0: risk_score += 1
def calculate(self) -> Dict[str, Any]:
x = [self.thickness, self.scale_weight] + self.acid_conc_list + self.acid_temp_list
pt = _pt_infer("acid_speed", x)
if pt is not None:
raw_speed = pt[0] * self.K_cal
best = int(max(self.V_MIN, min(self.V_MAX, round(raw_speed))))
pi, pp, rt = self._compute_pi(best)
return {
"max_speed": best, "pi_per_tank": pp,
"residence_time_per_tank": rt, "total_pi": round(pi, 2),
"under_pickling_risk": self._risk_level(best, pi),
"warning": None, "K_cal": self.K_cal, "source": "pt",
}
return self._physics_result()
if risk_score >= 5: return "HIGH"
elif risk_score >= 2: return "MEDIUM"
else: return "LOW"
def calibrate(self, actual_max_speed: float,
actual_quality_ok: bool) -> float:
"""
投产后标定接口:
传入某卷的实际最大可用速度(操作员确认质量合格时的速度),
用简单比例更新 K_cal使模型逐步向真实工况收敛。
actual_max_speed: 实际测得质量合格的最高速度 (m/min)
actual_quality_ok: True=该速度下质量合格False=出现欠酸洗
"""
def calibrate(self, actual_max_speed, actual_quality_ok):
predicted = self.calculate()["max_speed"]
if not actual_quality_ok:
# 预测速度偏高,缩减 K_cal
adjustment = 0.95
adj = 0.95
else:
ratio = actual_max_speed / max(predicted, 1.0)
# 平滑更新,避免单次样本过拟合
adjustment = 1.0 + 0.3 * (ratio - 1.0)
adjustment = max(0.7, min(1.3, adjustment))
self.K_cal = round(self.K_cal * adjustment, 4)
cal = _load_cal()
cal[self.CAL_KEY] = self.K_cal
_save_cal(cal)
adj = max(0.7, min(1.3, 1.0 + 0.3*(ratio - 1.0)))
self.K_cal = round(self.K_cal * adj, 4)
cal = _load_cal(); cal[self.CAL_KEY] = self.K_cal; _save_cal(cal)
return self.K_cal
@@ -212,106 +200,91 @@ class AcidSpeedModel:
# ─────────────────────────────────────────────────────────────────────────────
class TensionModel:
"""
张力模型:基于截面积×屈服强度,区间比例系数参考酸洗线工程手册。
每个区段独立校准系数 zone_kcal[zone],互不干扰。
灰箱: T_base = coef × σ_yield × A各区段比例系数
PT栈: 4 维输入 → 10 区段张力 kN
输入: [thickness, width, yield_strength, tension_coef]
"""
# 各区基准比例系数(酸洗线工程实践均值)
ZONE_RATIOS = {
"inlet": 1.00,
"s1_roller": 0.85,
"acid_entry": 0.78,
"acid1": 0.72,
"acid2": 0.68,
"acid3": 0.68,
"rinse": 0.70,
"leveler": 0.76,
"s2_roller": 0.88,
"outlet": 1.00,
"inlet": 1.00, "s1_roller": 0.85, "acid_entry": 0.78,
"acid1": 0.72, "acid2": 0.68, "acid3": 0.68,
"rinse": 0.70, "leveler": 0.76, "s2_roller": 0.88, "outlet": 1.00,
}
ZONE_NAMES_CN = {
"inlet": "入口张力辊",
"s1_roller": "S1夹送辊",
"acid_entry": "酸洗入口辊",
"acid1": "1#酸槽",
"acid2": "2#酸槽",
"acid3": "3#酸槽",
"rinse": "漂洗段辊",
"leveler": "拉矫机",
"s2_roller": "S2夹送辊",
"outlet": "出口张力辊",
"inlet": "入口张力辊", "s1_roller": "S1夹送辊",
"acid_entry": "酸洗入口辊", "acid1": "1#酸槽",
"acid2": "2#酸槽", "acid3": "3#酸槽",
"rinse": "漂洗段辊", "leveler": "拉矫机",
"s2_roller": "S2夹送辊", "outlet": "出口张力辊",
}
@staticmethod
def _zone_cal_key(zone: str) -> str:
return f"tension_zone_{zone}"
def _zone_cal_key(zone): return f"tension_zone_{zone}"
def __init__(
self,
thickness: float,
width: float,
yield_strength: float,
tension_coef: float = 0.25,
):
def __init__(self, thickness, width, yield_strength, tension_coef=0.25):
self.thickness = thickness
self.width = width
self.yield_strength = yield_strength
self.tension_coef = tension_coef
cal = _load_cal()
# 每个区段独立加载自己的校准系数,默认 1.0
self.zone_kcal: Dict[str, float] = {
z: cal.get(self._zone_cal_key(z), 1.0)
for z in self.ZONE_RATIOS
}
def calculate(self) -> Dict[str, Any]:
cross_section = self.thickness * self.width # mm²
# T_max 是理论基准值(不含区段校准,区段校准在各 zone 内单独乘)
t_base_kn = (self.tension_coef * self.yield_strength
* cross_section / 1000.0) # kN
self.zone_kcal = {z: cal.get(self._zone_cal_key(z), 1.0) for z in self.ZONE_RATIOS}
def _physics_zones(self, t_base_kn):
zones = {}
for zone, ratio in self.ZONE_RATIOS.items():
k = self.zone_kcal.get(zone, 1.0)
k = self.zone_kcal[zone]
zones[zone] = {
"tension_kN": round(t_base_kn * ratio * k, 2),
"ratio": ratio,
"k_cal": k,
"name_cn": self.ZONE_NAMES_CN[zone],
"ratio": ratio, "k_cal": k,
"name_cn": self.ZONE_NAMES_CN[zone],
}
return zones
def calculate(self) -> Dict[str, Any]:
cross = self.thickness * self.width
t_base = self.tension_coef * self.yield_strength * cross / 1000.0
pt = _pt_infer("tension", [self.thickness, self.width, self.yield_strength, self.tension_coef])
if pt is not None and _scalers and "tension" in _scalers:
zone_names = _scalers["tension"].get("zone_names", list(self.ZONE_RATIOS.keys()))
zones = {}
for i, zone in enumerate(zone_names):
k = self.zone_kcal.get(zone, 1.0)
kn = round(max(0.1, pt[i]) * k, 2)
zones[zone] = {
"tension_kN": kn,
"ratio": self.ZONE_RATIOS.get(zone, 1.0),
"k_cal": k,
"name_cn": self.ZONE_NAMES_CN.get(zone, zone),
}
source = "pt"
else:
zones = self._physics_zones(t_base)
source = "physics"
density = 7850.0
mass_per_m = density * (self.thickness / 1000.0) * (self.width / 1000.0)
accel_kn = round(mass_per_m * (30.0 / 60.0) / 1000.0, 3)
t_max_kn = round(t_base_kn * self.zone_kcal.get("inlet", 1.0), 2)
mass_per_m = density * (self.thickness/1000.0) * (self.width/1000.0)
accel_kn = round(mass_per_m * (30.0/60.0) / 1000.0, 3)
t_max = round(t_base * self.zone_kcal.get("inlet", 1.0), 2)
return {
"T_max": t_max_kn,
"T_base": round(t_base_kn, 2),
"cross_section_mm2": round(cross_section, 1),
"zones": zones,
"T_max": t_max, "T_base": round(t_base, 2),
"cross_section_mm2": round(cross, 1),
"zones": zones,
"weld_speed_limit": 60.0,
"weld_tension_kN": round(t_max_kn * 0.60, 2),
"accel_tension": accel_kn,
"zone_kcal": self.zone_kcal,
"weld_tension_kN": round(t_max * 0.60, 2),
"accel_tension": accel_kn,
"zone_kcal": self.zone_kcal,
"source": source,
}
def calibrate(self, zone: str, measured_kn: float) -> Dict[str, float]:
"""仅更新指定区段的校准系数,其他区段不变"""
def calibrate(self, zone, measured_kn):
if zone not in self.ZONE_RATIOS:
raise ValueError(f"未知区段: {zone}")
t_base = (self.tension_coef * self.yield_strength
* self.thickness * self.width / 1000.0)
predicted = t_base * self.ZONE_RATIOS[zone] * self.zone_kcal[zone]
ratio = measured_kn / max(predicted, 0.1)
# 平滑更新,步长 40%,范围限制在 [0.5, 2.0]
adjustment = 1.0 + 0.4 * (ratio - 1.0)
adjustment = max(0.5, min(2.0, adjustment))
new_k = round(self.zone_kcal[zone] * adjustment, 4)
self.zone_kcal[zone] = new_k
cal = _load_cal()
cal[self._zone_cal_key(zone)] = new_k
_save_cal(cal)
t_base = self.tension_coef * self.yield_strength * self.thickness * self.width / 1000.0
pred = t_base * self.ZONE_RATIOS[zone] * self.zone_kcal[zone]
adj = max(0.5, min(2.0, 1.0 + 0.4*(measured_kn/max(pred,0.1) - 1.0)))
self.zone_kcal[zone] = round(self.zone_kcal[zone] * adj, 4)
cal = _load_cal(); cal[self._zone_cal_key(zone)] = self.zone_kcal[zone]; _save_cal(cal)
return self.zone_kcal
@@ -320,29 +293,18 @@ class TensionModel:
# ─────────────────────────────────────────────────────────────────────────────
class QualityPredictionModel:
"""
欠酸洗风险 + 质量等级预测。
v2 变化:
- 使用与 AcidSpeedModel 一致的文献参数Ea/R=5413, n=1.2
- 欠酸洗风险特征阈值参考 arXiv:1207.0911 的 decision-tree 结论
- 增加铁离子浓度FeCl₂对酸洗能力的抑制修正
- 支持投产后用实际质量等级校准评分阈值
灰箱: Arrhenius PI 计算 + 速度惩罚
PT栈: 6 维输入 → [pi_score, surface_score]
输入: [thickness, avg_speed, acid_conc_avg, acid_temp_avg, scale_weight, fe_conc_avg]
"""
EA_R = 5413.0
T_REF = 348.15
C_REF = 180.0
N_CONC = 1.2
EA_R = 5413.0
T_REF = 348.15
C_REF = 180.0
N_CONC = 1.2
CAL_KEY = "quality_kcal"
def __init__(
self,
thickness: float,
avg_speed: float,
acid_conc_avg: float, # 游离酸均值 g/L
acid_temp_avg: float, # 温度均值 °C
scale_weight: float = 8.5,
fe_conc_avg: float = 60.0, # FeCl₂ 浓度 g/L铁离子抑制效应
):
def __init__(self, thickness, avg_speed, acid_conc_avg, acid_temp_avg,
scale_weight=8.5, fe_conc_avg=60.0):
self.thickness = thickness
self.avg_speed = avg_speed
self.acid_conc_avg = acid_conc_avg
@@ -351,108 +313,96 @@ class QualityPredictionModel:
self.fe_conc_avg = fe_conc_avg
self.K_cal = _load_cal().get(self.CAL_KEY, 1.0)
def _pickling_index_score(self) -> float:
T_k = self.acid_temp_avg + 273.15
arrhenius = math.exp(-self.EA_R * (1.0 / T_k - 1.0 / self.T_REF))
conc_factor = max(self.acid_conc_avg / self.C_REF, 0.01) ** self.N_CONC
# 铁离子抑制FeCl₂ > 80 g/L 时显著降低酸洗速率(文献经验)
fe_inhibition = 1.0 - max(0.0, (self.fe_conc_avg - 80.0) / 200.0) * 0.35
scale_corr = (8.5 / max(self.scale_weight, 1.0)) ** 0.3
exposure = (1.20 * arrhenius * conc_factor * fe_inhibition
* scale_corr * 18.0 * 6) / (self.avg_speed / 60.0)
pi_score = 100.0 * (1.0 - math.exp(-exposure / 10.0))
return min(max(pi_score * self.K_cal, 0.0), 100.0)
def _pi(self):
T_k = self.acid_temp_avg + 273.15
arr = math.exp(-self.EA_R*(1.0/T_k - 1.0/self.T_REF))
c_factor = max(self.acid_conc_avg/self.C_REF, 0.01)**self.N_CONC
fe_inh = 1.0 - max(0.0, (self.fe_conc_avg-80.0)/200.0)*0.35
scale_corr = (8.5/max(self.scale_weight,1.0))**0.3
exposure = (1.2*arr*c_factor*fe_inh*scale_corr*18.0*6) / (self.avg_speed/60.0)
return min(max(100.0*(1.0-math.exp(-exposure/10.0))*self.K_cal, 0), 100)
def _surface_score(self, pi_score: float) -> float:
# 最优速度区间 80-140 m/min文献[4] 欠酸洗风险判别边界)
def _surface(self, pi):
if self.avg_speed < 60:
speed_score = 80.0
ss = 80.0
elif self.avg_speed <= 140:
speed_score = 80.0 + 15.0 * (self.avg_speed - 60) / 80.0
ss = 80.0 + 15.0*(self.avg_speed-60)/80.0
else:
over = (self.avg_speed - 140) / 40.0
speed_score = 95.0 - 30.0 * over
return min(max(pi_score * 0.65 + speed_score * 0.35, 0.0), 100.0)
ss = 95.0 - 30.0*((self.avg_speed-140)/40.0)
return min(max(pi*0.65 + ss*0.35, 0), 100)
def _grade(self, pi: float, surface: float) -> str:
c = (pi + surface) / 2.0
def _grade(self, pi, suf):
c = (pi+suf)/2.0
if c >= 90: return "A1"
if c >= 80: return "A2"
if c >= 70: return "B1"
if c >= 60: return "B2"
return "C"
def _recommendations(self, pi: float, surface: float) -> List[str]:
def _recommendations(self, pi, suf):
recs = []
if self.fe_conc_avg > 80:
recs.append(f"铁离子浓度偏高({self.fe_conc_avg:.0f} g/L酸洗能力受抑制,建议加速换酸或补充新")
recs.append(f"铁离子浓度偏高({self.fe_conc_avg:.0f} g/L建议加速换")
if pi < 80:
recs.append("酸洗指数偏低,建议提高酸液浓度至 180 g/L 以上,或将温度升至 80°C")
recs.append("酸洗指数偏低,建议提高酸液浓度至 180 g/L 以上,或升至 80°C")
if pi < 65:
recs.append(f"欠酸洗风险高,建议将线速降至 {max(self.avg_speed*0.75, 20):.0f} m/min 以下")
recs.append(f"欠酸洗风险高,建议将线速降至 {max(self.avg_speed*0.75, 20):.0f} m/min")
if self.acid_temp_avg < 70:
recs.append(f"酸液温度偏低({self.acid_temp_avg:.1f}°C建议升温至 75~85°C")
if self.acid_conc_avg < 120:
recs.append(f"游离酸浓度偏低({self.acid_conc_avg:.0f} g/L建议补充新酸至 150 g/L")
recs.append(f"游离酸浓度偏低({self.acid_conc_avg:.0f} g/L建议补充新酸")
if self.avg_speed > 150:
recs.append(f"线速过高({self.avg_speed:.0f} m/min欠酸洗风险,建议不超过 140 m/min")
recs.append(f"线速过高({self.avg_speed:.0f} m/min欠酸洗风险")
if self.scale_weight > 12.0:
recs.append(f"氧化铁皮偏重({self.scale_weight:.1f} g/m²建议检查加热炉气氛控制")
recs.append(f"氧化铁皮偏重({self.scale_weight:.1f} g/m²建议检查加热炉气氛")
if not recs:
recs.append("工艺参数在正常范围内,当前设定可继续保持")
return recs
def calculate(self) -> Dict[str, Any]:
pi = round(self._pickling_index_score(), 1)
surface = round(self._surface_score(pi), 1)
x = [self.thickness, self.avg_speed, self.acid_conc_avg,
self.acid_temp_avg, self.scale_weight, self.fe_conc_avg]
pt = _pt_infer("quality", x)
if pt is not None:
pi = round(float(min(max(pt[0]*self.K_cal, 0), 100)), 1)
suf = round(float(min(max(pt[1]*self.K_cal, 0), 100)), 1)
src = "pt"
else:
pi = round(self._pi(), 1)
suf = round(self._surface(pi), 1)
src = "physics"
return {
"pi_score": pi,
"surface_score": surface,
"overall_grade": self._grade(pi, surface),
"recommendations": self._recommendations(pi, surface),
"K_cal": self.K_cal,
"pi_score": pi, "surface_score": suf,
"overall_grade": self._grade(pi, suf),
"recommendations": self._recommendations(pi, suf),
"K_cal": self.K_cal, "source": src,
}
def calibrate(self, actual_grade: str) -> float:
"""传入实际质检等级,更新评分校准系数"""
def calibrate(self, actual_grade):
grade_map = {"A1": 95, "A2": 85, "B1": 75, "B2": 65, "C": 50}
actual_score = grade_map.get(actual_grade, 75)
result = self.calculate()
predicted_score = (result["pi_score"] + result["surface_score"]) / 2.0
ratio = actual_score / max(predicted_score, 1.0)
adjustment = 1.0 + 0.3 * (ratio - 1.0)
adjustment = max(0.7, min(1.3, adjustment))
self.K_cal = round(self.K_cal * adjustment, 4)
cal = _load_cal()
cal[self.CAL_KEY] = self.K_cal
_save_cal(cal)
target = grade_map.get(actual_grade, 75)
res = self.calculate()
pred = (res["pi_score"] + res["surface_score"]) / 2.0
adj = max(0.7, min(1.3, 1.0 + 0.3*(target/max(pred,1.0) - 1.0)))
self.K_cal = round(self.K_cal * adj, 4)
cal = _load_cal(); cal[self.CAL_KEY] = self.K_cal; _save_cal(cal)
return self.K_cal
# ─────────────────────────────────────────────────────────────────────────────
# 4. 消耗预测模型
# 4. 消耗预测模型(无 PT 版本,定额+修正公式足够)
# ─────────────────────────────────────────────────────────────────────────────
class AcidConsumptionModel:
"""
单卷资源消耗预测。
单位消耗定额取自浙江企鹅1250mm规格书
酸耗额外引入铁离子浓度修正FeCl₂ 越高酸液越快失效,换酸频率越高)。
"""
ACID_WITH_REGEN = 2.0
ACID_WITHOUT_REGEN = 35.0
STEAM_UNIT = 39.8
POWER_UNIT = 14.0
COOLING_UNIT = 1.21
ACID_WITH_REGEN = 2.0 # kg/t
ACID_WITHOUT_REGEN = 35.0 # kg/t
STEAM_UNIT = 39.8 # kg/t
POWER_UNIT = 14.0 # kWh/t
COOLING_UNIT = 1.21 # m³/t
def __init__(
self,
thickness: float,
width: float,
coil_weight_kg: float,
has_regen_station: bool = True,
fe_conc_avg: float = 60.0, # FeCl₂ g/L影响换酸频率
):
def __init__(self, thickness, width, coil_weight_kg,
has_regen_station=True, fe_conc_avg=60.0):
self.thickness = thickness
self.width = width
self.coil_weight_kg = coil_weight_kg
@@ -460,23 +410,19 @@ class AcidConsumptionModel:
self.fe_conc_avg = fe_conc_avg
def calculate(self) -> Dict[str, Any]:
weight_t = self.coil_weight_kg / 1000.0
wt = self.coil_weight_kg / 1000.0
acid_base = self.ACID_WITH_REGEN if self.has_regen_station else self.ACID_WITHOUT_REGEN
# 铁离子修正FeCl₂ > 100 g/L 时酸液利用率下降,有效酸耗上升
fe_factor = 1.0 + max(0.0, (self.fe_conc_avg - 100.0) / 100.0) * 0.4
fe_factor = 1.0 + max(0.0, (self.fe_conc_avg-100.0)/100.0)*0.4
acid_unit = round(acid_base * fe_factor, 3)
return {
"coil_weight_t": round(weight_t, 3),
"has_regen_station": self.has_regen_station,
"acid_consumption_kg": round(acid_unit * weight_t, 2),
"acid_unit_kg_per_t": acid_unit,
"steam_consumption_kg": round(self.STEAM_UNIT * weight_t, 2),
"steam_unit_kg_per_t": self.STEAM_UNIT,
"power_consumption_kwh": round(self.POWER_UNIT * weight_t, 2),
"power_unit_kwh_per_t": self.POWER_UNIT,
"cooling_water_m3": round(self.COOLING_UNIT * weight_t, 3),
"coil_weight_t": round(wt, 3),
"acid_consumption_kg": round(acid_unit * wt, 2),
"acid_unit_kg_per_t": acid_unit,
"steam_consumption_kg": round(self.STEAM_UNIT * wt, 2),
"steam_unit_kg_per_t": self.STEAM_UNIT,
"power_consumption_kwh": round(self.POWER_UNIT * wt, 2),
"power_unit_kwh_per_t": self.POWER_UNIT,
"cooling_water_m3": round(self.COOLING_UNIT * wt, 3),
"cooling_water_unit_m3_per_t": self.COOLING_UNIT,
"fe_conc_factor": round(fe_factor, 3),
"fe_conc_factor": round(fe_factor, 3),
}

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,118 @@
{
"acid_speed": {
"X_mean": [
4.279637813568115,
11.041364669799805,
150.61114501953125,
149.8550262451172,
150.01771545410156,
150.00747680664062,
150.5503692626953,
149.680419921875,
69.46580505371094,
69.35639953613281,
69.35671997070312,
69.52980041503906,
69.55128479003906,
69.26476287841797
],
"X_std": [
2.1389975547790527,
4.052000999450684,
51.74678421020508,
52.34047317504883,
51.52565383911133,
52.03013610839844,
51.94683074951172,
51.77798080444336,
10.097525596618652,
10.071165084838867,
10.012657165527344,
10.097661018371582,
10.077392578125,
10.07375717163086
],
"y_mean": [
92.42212677001953
],
"y_std": [
30.15455436706543
]
},
"tension": {
"X_mean": [
4.211259365081787,
1104.45068359375,
376.4231262207031,
0.25053057074546814
],
"X_std": [
2.1753084659576416,
290.6602783203125,
129.755859375,
0.057579852640628815
],
"y_mean": [
439.6892395019531,
373.8921203613281,
342.94708251953125,
316.7646179199219,
298.9944763183594,
298.7807312011719,
307.93359375,
334.0721740722656,
387.0394592285156,
439.63824462890625
],
"y_std": [
341.6143798828125,
290.6787109375,
266.5653991699219,
246.12326049804688,
232.24676513671875,
231.9054412841797,
239.40611267089844,
259.3957214355469,
300.75,
341.000244140625
],
"zone_names": [
"inlet",
"s1_roller",
"acid_entry",
"acid1",
"acid2",
"acid3",
"rinse",
"leveler",
"s2_roller",
"outlet"
]
},
"quality": {
"X_mean": [
4.245825290679932,
99.6146011352539,
150.18072509765625,
70.02484893798828,
10.967876434326172,
74.14215850830078
],
"X_std": [
2.18487548828125,
46.58076858520508,
51.831016540527344,
11.553886413574219,
4.02312707901001,
31.69878578186035
],
"y_mean": [
92.10986328125,
89.79142761230469
],
"y_std": [
11.346452713012695,
8.92744255065918
]
}
}

Binary file not shown.

View File

@@ -18,3 +18,5 @@ redis==5.0.4
aioredis==2.0.1
httpx==0.27.0
loguru==0.7.2
onnxruntime==1.18.0
numpy==1.26.4

256
backend/train_models.py Normal file
View File

@@ -0,0 +1,256 @@
"""
本地训练脚本 — 生成合成数据、训练 MLP、导出 ONNX
运行方式(在 backend/ 目录下):
python train_models.py
依赖(仅本地训练用,不进 Docker
pip install torch onnx onnxruntime scikit-learn numpy
"""
import sys, json, time
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
sys.path.insert(0, str(Path(__file__).parent))
from app.services.prediction import AcidSpeedModel, TensionModel, QualityPredictionModel
PT_DIR = Path(__file__).parent / "app" / "services" / "pt_models"
PT_DIR.mkdir(parents=True, exist_ok=True)
SEED = 2024
N = 12000
np.random.seed(SEED)
torch.manual_seed(SEED)
TENSION_ZONES = [
"inlet", "s1_roller", "acid_entry",
"acid1", "acid2", "acid3",
"rinse", "leveler", "s2_roller", "outlet",
]
# ─── 网络结构 ───────────────────────────────────────────────────────────────
class MLP(nn.Module):
def __init__(self, in_dim: int, out_dim: int, hidden=(128, 64, 32)):
super().__init__()
layers: list = []
prev = in_dim
for h in hidden:
layers += [nn.Linear(prev, h), nn.ReLU()]
prev = h
layers.append(nn.Linear(prev, out_dim))
self.net = nn.Sequential(*layers)
def forward(self, x: torch.Tensor) -> torch.Tensor:
return self.net(x)
# ─── 训练通用函数 ───────────────────────────────────────────────────────────
def fit(model: nn.Module, X: np.ndarray, y: np.ndarray,
epochs=300, lr=1e-3, batch_size=512) -> nn.Module:
Xt = torch.from_numpy(X)
yt = torch.from_numpy(y)
dl = DataLoader(TensorDataset(Xt, yt), batch_size=batch_size, shuffle=True)
opt = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
sched = optim.lr_scheduler.CosineAnnealingLR(opt, epochs)
loss_fn = nn.MSELoss()
model.train()
for ep in range(1, epochs + 1):
tot = 0.0
for xb, yb in dl:
opt.zero_grad()
loss = loss_fn(model(xb), yb)
loss.backward()
opt.step()
tot += loss.item() * len(xb)
sched.step()
if ep % 100 == 0:
print(f" ep {ep:3d}/{epochs} RMSE={((tot/len(Xt))**0.5):.5f}")
return model
def z_scale(arr: np.ndarray, mean=None, std=None):
if mean is None:
mean = arr.mean(axis=0)
std = arr.std(axis=0) + 1e-8
return ((arr - mean) / std).astype(np.float32), mean, std
def export_onnx(model: nn.Module, in_dim: int, path: Path):
model.eval()
dummy = torch.zeros(1, in_dim)
torch.onnx.export(
model, dummy, str(path),
input_names=["input"], output_names=["output"],
dynamic_axes={"input": {0: "batch"}, "output": {0: "batch"}},
opset_version=17,
)
print(f"{path.name} ({path.stat().st_size//1024} KB)")
# ─── 1. 酸洗速度模型 ────────────────────────────────────────────────────────
# 输入(14): thickness, scale_weight, conc×6, temp×6
# 输出(1): max_speed
def gen_acid_speed(n: int):
rng = np.random.default_rng(SEED)
Xs, ys = [], []
skip = 0
while len(Xs) < n:
t = rng.uniform(0.5, 8.0)
sw = rng.uniform(4.0, 18.0)
conc = rng.uniform(60, 240, 6).tolist()
temp = rng.uniform(52, 87, 6).tolist()
tpi = rng.uniform(88, 97)
try:
m = AcidSpeedModel(
thickness=t, width=1000.0, steel_grade="Q235",
acid_conc_list=conc, acid_temp_list=temp,
scale_weight=sw, target_pi=tpi,
)
spd = float(m.calculate()["max_speed"])
except Exception:
skip += 1
continue
# 模拟真实工况偏差±6% 相对噪声 + 钢种系数扰动
steel_factor = rng.choice([0.92, 0.96, 1.00, 1.03, 1.06])
noise = rng.normal(1.0, 0.06)
spd_n = float(np.clip(spd * noise * steel_factor, 20, 180))
Xs.append([t, sw] + conc + temp)
ys.append([spd_n])
print(f" acid_speed: {len(Xs)} samples (skipped {skip})")
return np.array(Xs, np.float32), np.array(ys, np.float32)
# ─── 2. 张力模型 ────────────────────────────────────────────────────────────
# 输入(4): thickness, width, yield_strength, tension_coef
# 输出(10): 10 区段张力 kN
def gen_tension(n: int):
rng = np.random.default_rng(SEED + 1)
Xs, ys = [], []
while len(Xs) < n:
t = rng.uniform(0.5, 8.0)
w = rng.uniform(600, 1600)
ys_ = rng.uniform(150, 600)
tc = rng.uniform(0.15, 0.35)
m = TensionModel(thickness=t, width=w, yield_strength=ys_, tension_coef=tc)
res = m.calculate()
tensions = [res["zones"][z]["tension_kN"] for z in TENSION_ZONES]
# 各区段独立噪声(实测张力传感器精度约 ±4%
noise = rng.normal(1.0, 0.04, 10)
tensions_n = [float(np.clip(v * noise[i], 0.1, 9999)) for i, v in enumerate(tensions)]
Xs.append([t, w, ys_, tc])
ys.append(tensions_n)
print(f" tension: {len(Xs)} samples")
return np.array(Xs, np.float32), np.array(ys, np.float32)
# ─── 3. 质量预测模型 ─────────────────────────────────────────────────────────
# 输入(6): thickness, avg_speed, acid_conc_avg, acid_temp_avg, scale_weight, fe_conc_avg
# 输出(2): pi_score, surface_score
def gen_quality(n: int):
rng = np.random.default_rng(SEED + 2)
Xs, ys = [], []
while len(Xs) < n:
t = rng.uniform(0.5, 8.0)
spd = rng.uniform(20, 180)
conc = rng.uniform(60, 240)
temp = rng.uniform(50, 90)
sw = rng.uniform(4, 18)
fe = rng.uniform(20, 130)
m = QualityPredictionModel(
thickness=t, avg_speed=spd,
acid_conc_avg=conc, acid_temp_avg=temp,
scale_weight=sw, fe_conc_avg=fe,
)
res = m.calculate()
pi = res["pi_score"]
suf = res["surface_score"]
# ±6% 噪声模拟质检测量不确定度
pi_n = float(np.clip(pi * rng.normal(1.0, 0.06), 0, 100))
suf_n = float(np.clip(suf * rng.normal(1.0, 0.06), 0, 100))
Xs.append([t, spd, conc, temp, sw, fe])
ys.append([pi_n, suf_n])
print(f" quality: {len(Xs)} samples")
return np.array(Xs, np.float32), np.array(ys, np.float32)
# ─── 主流程 ─────────────────────────────────────────────────────────────────
def main():
scalers: dict = {}
t0 = time.time()
# ── 酸洗速度 ──
print("\n[1/3] 酸洗速度模型")
X, y = gen_acid_speed(N)
Xn, Xm, Xs = z_scale(X)
yn, ym, ys_ = z_scale(y)
model = MLP(14, 1, hidden=(128, 64, 32))
print(" 训练中...")
fit(model, Xn, yn, epochs=300)
export_onnx(model, 14, PT_DIR / "acid_speed.onnx")
scalers["acid_speed"] = {
"X_mean": Xm.tolist(), "X_std": Xs.tolist(),
"y_mean": ym.tolist(), "y_std": ys_.tolist(),
}
# ── 张力 ──
print("\n[2/3] 张力模型")
X, y = gen_tension(N)
Xn, Xm, Xs = z_scale(X)
yn, ym, ys_ = z_scale(y)
model = MLP(4, 10, hidden=(64, 64, 32))
print(" 训练中...")
fit(model, Xn, yn, epochs=300)
export_onnx(model, 4, PT_DIR / "tension.onnx")
scalers["tension"] = {
"X_mean": Xm.tolist(), "X_std": Xs.tolist(),
"y_mean": ym.tolist(), "y_std": ys_.tolist(),
"zone_names": TENSION_ZONES,
}
# ── 质量 ──
print("\n[3/3] 质量预测模型")
X, y = gen_quality(N)
Xn, Xm, Xs = z_scale(X)
yn, ym, ys_ = z_scale(y)
model = MLP(6, 2, hidden=(64, 32))
print(" 训练中...")
fit(model, Xn, yn, epochs=300)
export_onnx(model, 6, PT_DIR / "quality.onnx")
scalers["quality"] = {
"X_mean": Xm.tolist(), "X_std": Xs.tolist(),
"y_mean": ym.tolist(), "y_std": ys_.tolist(),
}
# ── 保存 scaler 参数 ──
scaler_path = PT_DIR / "scalers.json"
with open(scaler_path, "w") as f:
json.dump(scalers, f, indent=2)
print(f"\n scalers → {scaler_path.name}")
print(f"\n完成 ({time.time()-t0:.1f}s)\n")
if __name__ == "__main__":
main()