""" 工艺预测模型 — 灰箱物理模型 + ONNX 神经网络双栈 三层校准体系: 1. K_cal — 按钢种乘法偏置(立即生效) 2. PhysParams — EA_R / K0 / N_CONC 按钢种网格拟合(≥10 样本后自动触发) 3. 数据飞轮 — 积累实绩后触发 ONNX 重训(POST /retrain 离线触发) cal_coeffs.json 新结构: { "kcal": { "acid_speed": {"_default": 1.0, "Q235": 1.02}, ... }, "phys": { "acid_speed": {"_default": {EA_R, K0, N_CONC}, "Q235": {...}}, ... }, "history": [...] } production_samples.jsonl:每条一个 JSON,按 model + grade 索引。 """ import math import json from datetime import datetime from pathlib import Path from typing import List, Dict, Any, Optional from loguru import logger # ── 路径常量 ────────────────────────────────────────────────────────────────── _SVC_DIR = Path(__file__).parent _CAL_FILE = _SVC_DIR / "cal_coeffs.json" _SAMPLE_FILE = _SVC_DIR / "production_samples.jsonl" _PT_DIR = _SVC_DIR / "pt_models" _DEFAULT_PHYS: Dict[str, float] = {"EA_R": 5413.0, "K0": 0.075, "N_CONC": 1.2} _K0_REF = 0.075 # quality 模型 K0 归一化基准 _FIT_MIN_SAMPLES = 10 # 触发物理参数拟合的最少样本数 # ── Cal I/O ─────────────────────────────────────────────────────────────────── def _load_cal() -> Dict: try: with open(_CAL_FILE) as f: d = json.load(f) if "kcal" not in d: _migrate_cal(d) with open(_CAL_FILE) as f: d = json.load(f) return d except Exception: return {} def _migrate_cal(old: Dict): """旧平铺格式 → 新嵌套格式(一次性迁移)""" _ZONES = ["inlet","s1_roller","acid_entry","acid1","acid2","acid3", "rinse","leveler","s2_roller","outlet"] new: Dict = {"kcal": {}, "phys": {}, "history": old.get("history", [])} for m in ("acid_speed", "quality"): new["kcal"][m] = {"_default": old.get(f"{m}_kcal", 1.0)} new["phys"][m] = {"_default": _DEFAULT_PHYS.copy()} for z in _ZONES: new["kcal"][f"tension_{z}"] = {"_default": old.get(f"tension_zone_{z}", 1.0)} with open(_CAL_FILE, "w") as f: json.dump(new, f, indent=2, ensure_ascii=False) logger.info("cal_coeffs.json: 已从旧格式迁移到新嵌套格式") def _save_cal(d: Dict): with open(_CAL_FILE, "w") as f: json.dump(d, f, indent=2, ensure_ascii=False) def _get_kcal(model_key: str, grade: str = "_default") -> float: d = _load_cal().get("kcal", {}).get(model_key, {}) return d.get(grade, d.get("_default", 1.0)) def _set_kcal(model_key: str, grade: str, value: float): cal = _load_cal() cal.setdefault("kcal", {}).setdefault(model_key, {"_default": 1.0}) cal["kcal"][model_key][grade] = round(value, 4) _save_cal(cal) def _get_phys(model_key: str, grade: str = "_default") -> Dict: d = _load_cal().get("phys", {}).get(model_key, {}) return {**_DEFAULT_PHYS, **d.get(grade, d.get("_default", {}))} def _set_phys(model_key: str, grade: str, params: Dict): cal = _load_cal() cal.setdefault("phys", {}).setdefault(model_key, {"_default": _DEFAULT_PHYS.copy()}) cal["phys"][model_key][grade] = {k: round(v, 6) for k, v in params.items()} _save_cal(cal) # ── 生产样本 I/O ────────────────────────────────────────────────────────────── def append_sample(record: Dict): """追加一条生产实绩样本(含时间戳)到 JSONL 文件。""" record = {"ts": datetime.now().isoformat(timespec="seconds"), **record} with open(_SAMPLE_FILE, "a") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") def get_samples(model: str, grade: str) -> List[Dict]: """读取指定模型 + 钢种的样本,最多返回 200 条。""" if not _SAMPLE_FILE.exists(): return [] out = [] with open(_SAMPLE_FILE) as f: for line in f: try: r = json.loads(line) if r.get("model") == model and r.get("grade") == grade: out.append(r) except Exception: pass return out[-200:] def get_sample_stats() -> Dict: """返回各模型 + 钢种的样本数量汇总。""" if not _SAMPLE_FILE.exists(): return {} stats: Dict[str, Dict[str, int]] = {} with open(_SAMPLE_FILE) as f: for line in f: try: r = json.loads(line) m = r.get("model", "?") g = r.get("grade", "_default") stats.setdefault(m, {}).setdefault(g, 0) stats[m][g] += 1 except Exception: pass return stats # ── 模块级物理计算(供网格搜索和模型类共享)────────────────────────────────── _TANK_LENGTH = 18.0 _NUM_TANKS = 5 _T_REF = 348.15 _C_REF = 180.0 _SCALE_RATE_FACTOR = 0.70 * 1.0 + 0.20 * 0.25 + 0.10 * 0.15 def _acid_k_i(conc: float, temp_c: float, scale_weight: float, K0: float, EA_R: float, N_CONC: float, K_cal: float = 1.0) -> float: T_k = temp_c + 273.15 arr = math.exp(-EA_R * (1.0/T_k - 1.0/_T_REF)) c_f = max(conc / _C_REF, 0.01) ** N_CONC sc = (8.5 / max(scale_weight, 1.0)) ** 0.3 return K0 * arr * c_f * _SCALE_RATE_FACTOR * sc * K_cal def _acid_compute_pi(v_mpm: float, conc_list, temp_list, scale_weight, K0, EA_R, N_CONC, K_cal=1.0): v_mps = v_mpm / 60.0 pi, pp, rt = 0.0, [], [] for i in range(_NUM_TANKS): t_i = _TANK_LENGTH / v_mps k_i = _acid_k_i(conc_list[i], temp_list[i], scale_weight, K0, EA_R, N_CONC, K_cal) pi = 100.0 - (100.0 - pi) * math.exp(-k_i * t_i) pp.append(round(pi, 2)) rt.append(round(t_i, 1)) return pi, pp, rt def _acid_max_speed(conc_list, temp_list, scale_weight, target_pi, K0, EA_R, N_CONC, K_cal=1.0) -> float: V_MIN, V_MAX = 20.0, 180.0 if _acid_compute_pi(V_MIN, conc_list, temp_list, scale_weight, K0, EA_R, N_CONC, K_cal)[0] < target_pi: return V_MIN lo, hi, best = V_MIN, V_MAX, V_MIN while hi - lo >= 0.5: mid = (lo + hi) / 2.0 if _acid_compute_pi(mid, conc_list, temp_list, scale_weight, K0, EA_R, N_CONC, K_cal)[0] >= target_pi: best = mid; lo = mid + 0.5 else: hi = mid - 0.5 return math.floor(best) def _quality_pi_raw(avg_speed: float, acid_conc_avg: float, acid_temp_avg: float, scale_weight: float, fe_conc_avg: float, K0: float, EA_R: float, N_CONC: float, K_cal: float = 1.0) -> float: T_k = acid_temp_avg + 273.15 arr = math.exp(-EA_R * (1.0/T_k - 1.0/_T_REF)) c_f = max(acid_conc_avg / _C_REF, 0.01) ** N_CONC fe_ih = 1.0 - max(0.0, (fe_conc_avg - 80.0) / 200.0) * 0.35 sc = (8.5 / max(scale_weight, 1.0)) ** 0.3 k0_r = K0 / _K0_REF exp_ = k0_r * 1.2 * arr * c_f * fe_ih * sc * _TANK_LENGTH * _NUM_TANKS / (avg_speed / 60.0) return min(max(100.0 * (1.0 - math.exp(-exp_ / 10.0)) * K_cal, 0.0), 100.0) # ── 物理参数网格拟合 ────────────────────────────────────────────────────────── def fit_acid_phys_params(grade: str) -> Optional[Dict]: """ 从 production_samples.jsonl 中读取指定钢种的酸洗速度样本, 网格搜索最优 (K0, EA_R, N_CONC),≥10 条样本才触发。 成功则写入 cal_coeffs.json 并返回新参数,否则返回 None。 """ samples = get_samples("acid_speed", grade) if len(samples) < _FIT_MIN_SAMPLES: return None cur = _get_phys("acid_speed", grade) K0_g = [cur["K0"] * f for f in (0.85, 0.90, 0.95, 1.00, 1.05, 1.10, 1.15)] EA_R_g = [cur["EA_R"] * f for f in (0.94, 0.97, 1.00, 1.03, 1.06)] NC_g = [cur["N_CONC"]* f for f in (0.90, 1.00, 1.10)] best_mse, best = float("inf"), cur.copy() for K0 in K0_g: for EA_R in EA_R_g: for N_CONC in NC_g: mse = 0.0 for s in samples: inp = s["inputs"] # [t, sw, c0..c5, t0..t5] pred = _acid_max_speed( inp[2:8], inp[8:14], inp[1], s.get("target_pi", 95.0), K0, EA_R, N_CONC ) mse += (pred - s["actual_speed"]) ** 2 mse /= len(samples) if mse < best_mse: best_mse = mse best = {"K0": K0, "EA_R": EA_R, "N_CONC": N_CONC} _set_phys("acid_speed", grade, best) logger.info(f"acid_speed phys fit [{grade}]: RMSE={best_mse**0.5:.2f} m/min {best}") return best def fit_quality_phys_params(grade: str) -> Optional[Dict]: """同上,针对质量预测模型。""" samples = get_samples("quality", grade) if len(samples) < _FIT_MIN_SAMPLES: return None cur = _get_phys("quality", grade) grade_target = {"A1": 95.0, "A2": 85.0, "B1": 75.0, "B2": 65.0, "C": 50.0} K0_g = [cur["K0"] * f for f in (0.85, 0.90, 0.95, 1.00, 1.05, 1.10, 1.15)] EA_R_g = [cur["EA_R"] * f for f in (0.94, 0.97, 1.00, 1.03, 1.06)] NC_g = [cur["N_CONC"]* f for f in (0.90, 1.00, 1.10)] best_mse, best = float("inf"), cur.copy() for K0 in K0_g: for EA_R in EA_R_g: for N_CONC in NC_g: mse = 0.0 for s in samples: inp = s["inputs"] # [t, spd, conc, temp, sw, fe] t_pi = grade_target.get(s.get("actual_grade", "B1"), 75.0) pi = _quality_pi_raw(inp[1], inp[2], inp[3], inp[4], inp[5], K0, EA_R, N_CONC) mse += (pi - t_pi) ** 2 mse /= len(samples) if mse < best_mse: best_mse = mse best = {"K0": K0, "EA_R": EA_R, "N_CONC": N_CONC} _set_phys("quality", grade, best) logger.info(f"quality phys fit [{grade}]: RMSE={best_mse**0.5:.2f} {best}") return best # ── ONNX 推理层 ─────────────────────────────────────────────────────────────── _scalers: Optional[Dict] = None _sess: Dict[str, Any] = {} try: import onnxruntime as ort import numpy as _np _sp = _PT_DIR / "scalers.json" if _sp.exists(): with open(_sp) as f: _scalers = json.load(f) for _name in ("acid_speed", "tension", "quality"): _p = _PT_DIR / f"{_name}.onnx" if _p.exists(): _sess[_name] = ort.InferenceSession(str(_p), providers=["CPUExecutionProvider"]) if _sess: logger.info(f"PT models loaded: {list(_sess.keys())}") except ImportError: logger.warning("onnxruntime not installed — using physics fallback") def _pt_infer(name: str, x_raw: List[float]) -> Optional[List[float]]: """标准化 → ONNX 推理 → 反标准化,失败返回 None。""" if name not in _sess or _scalers is None: return None try: sc = _scalers[name] xm = _np.array(sc["X_mean"], dtype=_np.float32) xs = _np.array(sc["X_std"], dtype=_np.float32) ym = _np.array(sc["y_mean"], dtype=_np.float32) ys = _np.array(sc["y_std"], dtype=_np.float32) x = (_np.array(x_raw, dtype=_np.float32) - xm) / xs raw = _sess[name].run(None, {"input": x.reshape(1, -1)})[0][0] return (raw * ys + ym).tolist() except Exception as e: logger.warning(f"PT infer {name} failed: {e}") return None def reload_onnx(): """重训后调用,热重载 ONNX 模型文件。""" global _scalers, _sess try: sp = _PT_DIR / "scalers.json" if sp.exists(): with open(sp) as f: _scalers = json.load(f) for name in ("acid_speed", "tension", "quality"): p = _PT_DIR / f"{name}.onnx" if p.exists(): _sess[name] = ort.InferenceSession(str(p), providers=["CPUExecutionProvider"]) logger.info(f"ONNX 热重载完成: {list(_sess.keys())}") except Exception as e: logger.error(f"ONNX 热重载失败: {e}") # ───────────────────────────────────────────────────────────────────────────── # 1. 酸洗速度模型 # ───────────────────────────────────────────────────────────────────────────── class AcidSpeedModel: """ 灰箱: Arrhenius 动力学 + 二分搜索 PT栈: 14 维输入 → 最大速度 (m/min) 校准: K_cal 按钢种 + 物理参数 (EA_R/K0/N_CONC) 按钢种网格拟合 输入: [thickness, scale_weight, conc×6, temp×6] """ CAL_KEY = "acid_speed" V_MIN = 20.0 V_MAX = 180.0 def __init__(self, thickness, width, steel_grade, acid_conc_list, acid_temp_list, scale_weight=8.5, target_pi=95.0): if len(acid_conc_list) != _NUM_TANKS: raise ValueError(f"acid_conc_list 需要 {_NUM_TANKS} 个元素") if len(acid_temp_list) != _NUM_TANKS: raise ValueError(f"acid_temp_list 需要 {_NUM_TANKS} 个元素") self.thickness = thickness self.width = width self.steel_grade = steel_grade self.acid_conc_list = acid_conc_list self.acid_temp_list = acid_temp_list self.scale_weight = scale_weight self.target_pi = target_pi self.K_cal = _get_kcal(self.CAL_KEY, steel_grade) phys = _get_phys(self.CAL_KEY, steel_grade) self.K0 = phys["K0"] self.EA_R = phys["EA_R"] self.N_CONC= phys["N_CONC"] def _compute_pi(self, v_mpm): return _acid_compute_pi(v_mpm, self.acid_conc_list, self.acid_temp_list, self.scale_weight, self.K0, self.EA_R, self.N_CONC, self.K_cal) def _risk_level(self, speed, pi): avg_conc = sum(self.acid_conc_list) / len(self.acid_conc_list) avg_temp = sum(self.acid_temp_list) / len(self.acid_temp_list) s = 0 if pi < 85: s += 3 elif pi < 92: s += 1 if speed > 140: s += 2 if avg_conc < 120: s += 2 if avg_temp < 68: s += 2 if self.thickness > 4.0: s += 1 return "HIGH" if s >= 5 else "MEDIUM" if s >= 2 else "LOW" def _physics_result(self): pi_min, _, _ = self._compute_pi(self.V_MIN) if pi_min < self.target_pi: pi, pp, rt = self._compute_pi(self.V_MIN) return { "max_speed": self.V_MIN, "pi_per_tank": pp, "residence_time_per_tank": rt, "total_pi": round(pi, 2), "under_pickling_risk": self._risk_level(self.V_MIN, pi), "warning": "酸液条件不足,建议检查酸浓度和温度", "K_cal": self.K_cal, "phys_params": self._phys_dict(), "source": "physics", } best = _acid_max_speed(self.acid_conc_list, self.acid_temp_list, self.scale_weight, self.target_pi, self.K0, self.EA_R, self.N_CONC, self.K_cal) pi, pp, rt = self._compute_pi(best) return { "max_speed": best, "pi_per_tank": pp, "residence_time_per_tank": rt, "total_pi": round(pi, 2), "under_pickling_risk": self._risk_level(best, pi), "warning": None, "K_cal": self.K_cal, "phys_params": self._phys_dict(), "source": "physics", } def _phys_dict(self): return {"K0": self.K0, "EA_R": self.EA_R, "N_CONC": self.N_CONC} def calculate(self) -> Dict[str, Any]: x = [self.thickness, self.scale_weight] + self.acid_conc_list + self.acid_temp_list pt = _pt_infer("acid_speed", x) if pt is not None: raw = pt[0] * self.K_cal best = int(max(self.V_MIN, min(self.V_MAX, round(raw)))) pi, pp, rt = self._compute_pi(best) return { "max_speed": best, "pi_per_tank": pp, "residence_time_per_tank": rt, "total_pi": round(pi, 2), "under_pickling_risk": self._risk_level(best, pi), "warning": None, "K_cal": self.K_cal, "phys_params": self._phys_dict(), "source": "pt", } return self._physics_result() def calibrate(self, actual_max_speed: float, actual_quality_ok: bool) -> float: """ 更新当前钢种的 K_cal,保存样本,样本 ≥10 时自动触发物理参数拟合。 返回新 K_cal。 """ predicted = self.calculate()["max_speed"] if not actual_quality_ok: adj = 0.95 else: ratio = actual_max_speed / max(predicted, 1.0) adj = max(0.7, min(1.3, 1.0 + 0.3 * (ratio - 1.0))) self.K_cal = round(self.K_cal * adj, 4) _set_kcal(self.CAL_KEY, self.steel_grade, self.K_cal) # 保存样本 append_sample({ "model": "acid_speed", "grade": self.steel_grade, "inputs": [self.thickness, self.scale_weight] + self.acid_conc_list + self.acid_temp_list, "target_pi": self.target_pi, "predicted_speed": predicted, "actual_speed": actual_max_speed, "quality_ok": actual_quality_ok, }) # 样本够了就触发物理参数拟合 n = len(get_samples("acid_speed", self.steel_grade)) if n >= _FIT_MIN_SAMPLES and n % 5 == 0: fit_acid_phys_params(self.steel_grade) return self.K_cal # ───────────────────────────────────────────────────────────────────────────── # 2. 张力设定模型 # ───────────────────────────────────────────────────────────────────────────── class TensionModel: """ 灰箱: T_base = coef × σ_yield × A,各区段比例系数 PT栈: 4 维输入 → 10 区段张力 kN 校准: 每区段 K_cal 按钢种分组 输入: [thickness, width, yield_strength, tension_coef] """ ZONE_RATIOS = { "inlet": 1.00, "s1_roller": 0.85, "acid_entry": 0.78, "acid1": 0.72, "acid2": 0.68, "acid3": 0.68, "rinse": 0.70, "leveler": 0.76, "s2_roller": 0.88, "outlet": 1.00, } ZONE_NAMES_CN = { "inlet": "入口张力辊", "s1_roller": "S1夹送辊", "acid_entry": "酸洗入口辊", "acid1": "1#酸槽", "acid2": "2#酸槽", "acid3": "3#酸槽", "rinse": "漂洗段辊", "leveler": "拉矫机", "s2_roller": "S2夹送辊", "outlet": "出口张力辊", } @staticmethod def _zone_key(zone): return f"tension_{zone}" def __init__(self, thickness, width, yield_strength, tension_coef=0.25, steel_grade="_default"): self.thickness = thickness self.width = width self.yield_strength = yield_strength self.tension_coef = tension_coef self.steel_grade = steel_grade self.zone_kcal = {z: _get_kcal(self._zone_key(z), steel_grade) for z in self.ZONE_RATIOS} def _physics_zones(self, t_base_kn): zones = {} for zone, ratio in self.ZONE_RATIOS.items(): k = self.zone_kcal[zone] zones[zone] = { "tension_kN": round(t_base_kn * ratio * k, 2), "ratio": ratio, "k_cal": k, "name_cn": self.ZONE_NAMES_CN[zone], } return zones def calculate(self) -> Dict[str, Any]: cross = self.thickness * self.width t_base = self.tension_coef * self.yield_strength * cross / 1000.0 pt = _pt_infer("tension", [self.thickness, self.width, self.yield_strength, self.tension_coef]) if pt is not None and _scalers and "tension" in _scalers: zone_names = _scalers["tension"].get("zone_names", list(self.ZONE_RATIOS.keys())) zones = {} for i, zone in enumerate(zone_names): k = self.zone_kcal.get(zone, 1.0) kn = round(max(0.1, pt[i]) * k, 2) zones[zone] = { "tension_kN": kn, "ratio": self.ZONE_RATIOS.get(zone, 1.0), "k_cal": k, "name_cn": self.ZONE_NAMES_CN.get(zone, zone), } source = "pt" else: zones = self._physics_zones(t_base) source = "physics" density = 7850.0 mass_per_m = density * (self.thickness/1000.0) * (self.width/1000.0) accel_kn = round(mass_per_m * (30.0/60.0) / 1000.0, 3) t_max = round(t_base * self.zone_kcal.get("inlet", 1.0), 2) return { "T_max": t_max, "T_base": round(t_base, 2), "cross_section_mm2": round(cross, 1), "zones": zones, "weld_speed_limit": 60.0, "weld_tension_kN": round(t_max * 0.60, 2), "accel_tension": accel_kn, "zone_kcal": self.zone_kcal, "source": source, } def calibrate(self, zone: str, measured_kn: float) -> Dict: """更新指定区段的 K_cal(按当前钢种)。""" if zone not in self.ZONE_RATIOS: raise ValueError(f"未知区段: {zone}") t_base = self.tension_coef * self.yield_strength * self.thickness * self.width / 1000.0 pred = t_base * self.ZONE_RATIOS[zone] * self.zone_kcal[zone] adj = max(0.5, min(2.0, 1.0 + 0.4 * (measured_kn / max(pred, 0.1) - 1.0))) self.zone_kcal[zone] = round(self.zone_kcal[zone] * adj, 4) _set_kcal(self._zone_key(zone), self.steel_grade, self.zone_kcal[zone]) append_sample({ "model": "tension", "grade": self.steel_grade, "zone": zone, "inputs": [self.thickness, self.width, self.yield_strength, self.tension_coef], "predicted_kn": pred, "actual_kn": measured_kn, }) return self.zone_kcal # ───────────────────────────────────────────────────────────────────────────── # 3. 质量预测模型 # ───────────────────────────────────────────────────────────────────────────── class QualityPredictionModel: """ 灰箱: Arrhenius PI 计算 + 速度惩罚 PT栈: 6 维输入 → [pi_score, surface_score] 校准: K_cal 按钢种 + 物理参数按钢种网格拟合 输入: [thickness, avg_speed, acid_conc_avg, acid_temp_avg, scale_weight, fe_conc_avg] """ CAL_KEY = "quality" def __init__(self, thickness, avg_speed, acid_conc_avg, acid_temp_avg, scale_weight=8.5, fe_conc_avg=60.0, steel_grade="_default"): self.thickness = thickness self.avg_speed = avg_speed self.acid_conc_avg = acid_conc_avg self.acid_temp_avg = acid_temp_avg self.scale_weight = scale_weight self.fe_conc_avg = fe_conc_avg self.steel_grade = steel_grade self.K_cal = _get_kcal(self.CAL_KEY, steel_grade) phys = _get_phys(self.CAL_KEY, steel_grade) self.K0 = phys["K0"] self.EA_R = phys["EA_R"] self.N_CONC= phys["N_CONC"] def _pi(self) -> float: return _quality_pi_raw(self.avg_speed, self.acid_conc_avg, self.acid_temp_avg, self.scale_weight, self.fe_conc_avg, self.K0, self.EA_R, self.N_CONC, self.K_cal) def _surface(self, pi: float) -> float: if self.avg_speed < 60: ss = 80.0 elif self.avg_speed <= 140: ss = 80.0 + 15.0 * (self.avg_speed - 60) / 80.0 else: ss = 95.0 - 30.0 * ((self.avg_speed - 140) / 40.0) return min(max(pi * 0.65 + ss * 0.35, 0), 100) def _grade(self, pi: float, suf: float) -> str: c = (pi + suf) / 2.0 if c >= 90: return "A1" if c >= 80: return "A2" if c >= 70: return "B1" if c >= 60: return "B2" return "C" def _recommendations(self, pi: float, suf: float) -> List[str]: recs = [] if self.fe_conc_avg > 80: recs.append(f"铁离子浓度偏高({self.fe_conc_avg:.0f} g/L),建议加速换酸") if pi < 80: recs.append("酸洗指数偏低,建议提高酸液浓度至 180 g/L 以上,或升温至 80°C") if pi < 65: recs.append(f"欠酸洗风险高,建议将线速降至 {max(self.avg_speed*0.75, 20):.0f} m/min") if self.acid_temp_avg < 70: recs.append(f"酸液温度偏低({self.acid_temp_avg:.1f}°C),建议升温至 75~85°C") if self.acid_conc_avg < 120: recs.append(f"游离酸浓度偏低({self.acid_conc_avg:.0f} g/L),建议补充新酸") if self.avg_speed > 150: recs.append(f"线速过高({self.avg_speed:.0f} m/min),欠酸洗风险") if self.scale_weight > 12.0: recs.append(f"氧化铁皮偏重({self.scale_weight:.1f} g/m²),建议检查加热炉气氛") if not recs: recs.append("工艺参数在正常范围内,当前设定可继续保持") return recs def _phys_dict(self): return {"K0": self.K0, "EA_R": self.EA_R, "N_CONC": self.N_CONC} def calculate(self) -> Dict[str, Any]: x = [self.thickness, self.avg_speed, self.acid_conc_avg, self.acid_temp_avg, self.scale_weight, self.fe_conc_avg] pt = _pt_infer("quality", x) if pt is not None: pi = round(float(min(max(pt[0] * self.K_cal, 0), 100)), 1) suf = round(float(min(max(pt[1] * self.K_cal, 0), 100)), 1) src = "pt" else: pi = round(self._pi(), 1) suf = round(self._surface(pi), 1) src = "physics" return { "pi_score": pi, "surface_score": suf, "overall_grade": self._grade(pi, suf), "recommendations": self._recommendations(pi, suf), "K_cal": self.K_cal, "phys_params": self._phys_dict(), "source": src, } def calibrate(self, actual_grade: str) -> float: """ 更新当前钢种 K_cal,保存样本,样本 ≥10 时自动触发物理参数拟合。 返回新 K_cal。 """ grade_map = {"A1": 95, "A2": 85, "B1": 75, "B2": 65, "C": 50} target = grade_map.get(actual_grade, 75) res = self.calculate() pred = (res["pi_score"] + res["surface_score"]) / 2.0 adj = max(0.7, min(1.3, 1.0 + 0.3 * (target / max(pred, 1.0) - 1.0))) self.K_cal = round(self.K_cal * adj, 4) _set_kcal(self.CAL_KEY, self.steel_grade, self.K_cal) append_sample({ "model": "quality", "grade": self.steel_grade, "inputs": [self.thickness, self.avg_speed, self.acid_conc_avg, self.acid_temp_avg, self.scale_weight, self.fe_conc_avg], "predicted_grade": res["overall_grade"], "actual_grade": actual_grade, }) n = len(get_samples("quality", self.steel_grade)) if n >= _FIT_MIN_SAMPLES and n % 5 == 0: fit_quality_phys_params(self.steel_grade) return self.K_cal # ───────────────────────────────────────────────────────────────────────────── # 4. 消耗预测模型(无 PT 版本,定额+修正公式足够) # ───────────────────────────────────────────────────────────────────────────── class AcidConsumptionModel: ACID_WITH_REGEN = 2.0 ACID_WITHOUT_REGEN = 35.0 STEAM_UNIT = 39.8 POWER_UNIT = 14.0 COOLING_UNIT = 1.21 def __init__(self, thickness, width, coil_weight_kg, has_regen_station=True, fe_conc_avg=60.0): self.thickness = thickness self.width = width self.coil_weight_kg = coil_weight_kg self.has_regen_station = has_regen_station self.fe_conc_avg = fe_conc_avg def calculate(self) -> Dict[str, Any]: wt = self.coil_weight_kg / 1000.0 acid_base = self.ACID_WITH_REGEN if self.has_regen_station else self.ACID_WITHOUT_REGEN fe_factor = 1.0 + max(0.0, (self.fe_conc_avg - 100.0) / 100.0) * 0.4 acid_unit = round(acid_base * fe_factor, 3) return { "coil_weight_t": round(wt, 3), "acid_consumption_kg": round(acid_unit * wt, 2), "acid_unit_kg_per_t": acid_unit, "steam_consumption_kg": round(self.STEAM_UNIT * wt, 2), "steam_unit_kg_per_t": self.STEAM_UNIT, "power_consumption_kwh": round(self.POWER_UNIT * wt, 2), "power_unit_kwh_per_t": self.POWER_UNIT, "cooling_water_m3": round(self.COOLING_UNIT * wt, 3), "cooling_water_unit_m3_per_t": self.COOLING_UNIT, "fe_conc_factor": round(fe_factor, 3), }