diff --git a/backend/app/api/prediction.py b/backend/app/api/prediction.py index 47e6bb9..caf4142 100644 --- a/backend/app/api/prediction.py +++ b/backend/app/api/prediction.py @@ -1,7 +1,12 @@ -from fastapi import APIRouter, Depends, HTTPException -from pydantic import BaseModel, Field -from typing import List, Optional +import asyncio +import subprocess +import sys from datetime import datetime +from pathlib import Path +from typing import List, Optional + +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException +from pydantic import BaseModel, Field from app.schemas.common import Response from app.services.auth_service import get_current_user @@ -12,10 +17,20 @@ from app.services.prediction import ( AcidConsumptionModel, _load_cal, _save_cal, + _get_phys, + append_sample, + get_sample_stats, + fit_acid_phys_params, + fit_quality_phys_params, + reload_onnx, ) router = APIRouter() +TENSION_ZONES = ["inlet","s1_roller","acid_entry","acid1","acid2","acid3", + "rinse","leveler","s2_roller","outlet"] +_BACKEND_DIR = Path(__file__).parent.parent.parent + # ───────────────────────────────────────────────────────────────────────────── # Prediction request schemas @@ -36,6 +51,7 @@ class TensionRequest(BaseModel): width: float = Field(..., gt=0) yield_strength: float = Field(..., gt=0) tension_coef: Optional[float] = 0.25 + steel_grade: Optional[str] = "_default" class QualityRequest(BaseModel): @@ -44,13 +60,16 @@ class QualityRequest(BaseModel): acid_conc_avg: float = Field(..., gt=0) acid_temp_avg: float = Field(..., gt=0) scale_weight: Optional[float] = 8.5 + fe_conc_avg: Optional[float] = 60.0 + steel_grade: Optional[str] = "_default" class ConsumptionRequest(BaseModel): thickness: float = Field(..., gt=0) width: float = Field(..., gt=0) coil_weight_kg: float = Field(..., gt=0) - has_regen_station: Optional[bool] = True + has_regen_station: Optional[bool] = True + fe_conc_avg: Optional[float] = 60.0 # ───────────────────────────────────────────────────────────────────────────── @@ -58,17 +77,15 @@ class ConsumptionRequest(BaseModel): # ───────────────────────────────────────────────────────────────────────────── class AcidCalibRequest(BaseModel): - # 需要重建模型实例的上下文参数 thickness: float = Field(..., gt=0) width: float = Field(..., gt=0) steel_grade: str acid_conc_list: List[float] acid_temp_list: List[float] scale_weight: Optional[float] = 8.5 - # 校准输入 - actual_max_speed: float = Field(..., gt=0, description="实测质量合格时的最高速度 m/min") - actual_quality_ok: bool = Field(..., description="该速度下质量是否合格") - note: Optional[str] = None + actual_max_speed: float = Field(..., gt=0, description="实测质量合格时的最高速度 m/min") + actual_quality_ok: bool = Field(..., description="该速度下质量是否合格") + note: Optional[str] = None class TensionCalibRequest(BaseModel): @@ -76,6 +93,7 @@ class TensionCalibRequest(BaseModel): width: float = Field(..., gt=0) yield_strength: float = Field(..., gt=0) tension_coef: Optional[float] = 0.25 + steel_grade: Optional[str] = "_default" zone: str = Field(..., description="测量位置,如 s1_roller") measured_kn: float = Field(..., gt=0, description="实测张力 kN") note: Optional[str] = None @@ -87,6 +105,8 @@ class QualityCalibRequest(BaseModel): acid_conc_avg: float = Field(..., gt=0) acid_temp_avg: float = Field(..., gt=0) scale_weight: Optional[float] = 8.5 + fe_conc_avg: Optional[float] = 60.0 + steel_grade: Optional[str] = "_default" actual_grade: str = Field(..., description="实际质检等级 A1/A2/B1/B2/C") note: Optional[str] = None @@ -95,9 +115,9 @@ class QualityCalibRequest(BaseModel): # Helper: append calibration history # ───────────────────────────────────────────────────────────────────────────── -def _append_history(model_key: str, k_before: float, k_after: float, +def _append_history(model_key: str, k_before, k_after, input_data: dict, note: str = ""): - cal = _load_cal() + cal = _load_cal() history = cal.get("history", []) history.insert(0, { "ts": datetime.now().isoformat(timespec="seconds"), @@ -136,6 +156,7 @@ async def predict_tension(body: TensionRequest, _=Depends(get_current_user)): model = TensionModel( thickness=body.thickness, width=body.width, yield_strength=body.yield_strength, tension_coef=body.tension_coef, + steel_grade=body.steel_grade, ) return Response.ok(model.calculate()) @@ -145,7 +166,8 @@ async def predict_quality(body: QualityRequest, _=Depends(get_current_user)): model = QualityPredictionModel( thickness=body.thickness, avg_speed=body.avg_speed, acid_conc_avg=body.acid_conc_avg, acid_temp_avg=body.acid_temp_avg, - scale_weight=body.scale_weight, + scale_weight=body.scale_weight, fe_conc_avg=body.fe_conc_avg, + steel_grade=body.steel_grade, ) return Response.ok(model.calculate()) @@ -156,6 +178,7 @@ async def predict_consumption(body: ConsumptionRequest, _=Depends(get_current_us thickness=body.thickness, width=body.width, coil_weight_kg=body.coil_weight_kg, has_regen_station=body.has_regen_station, + fe_conc_avg=body.fe_conc_avg, ) return Response.ok(model.calculate()) @@ -164,27 +187,44 @@ async def predict_consumption(body: ConsumptionRequest, _=Depends(get_current_us # Calibration endpoints # ───────────────────────────────────────────────────────────────────────────── -TENSION_ZONES = ["inlet","s1_roller","acid_entry","acid1","acid2","acid3","rinse","leveler","s2_roller","outlet"] - - @router.get("/calibration", response_model=Response[dict]) async def get_calibration(_=Depends(get_current_user)): - """返回各模型当前校准系数和历史记录""" + """返回各模型当前校准系数(按钢种)和历史记录。""" cal = _load_cal() - tension_zone_kcal = { - z: cal.get(f"tension_zone_{z}", 1.0) for z in TENSION_ZONES - } return Response.ok({ - "acid_speed_kcal": cal.get("acid_speed_kcal", 1.0), - "tension_zone_kcal": tension_zone_kcal, - "quality_kcal": cal.get("quality_kcal", 1.0), - "history": cal.get("history", []), + "kcal": cal.get("kcal", {}), + "phys": cal.get("phys", {}), + "history": cal.get("history", []), }) +@router.get("/calibration/samples", response_model=Response[dict]) +async def get_calibration_samples(_=Depends(get_current_user)): + """返回各模型 + 钢种的生产样本数量统计,以及重训所需的样本阈值。""" + stats = get_sample_stats() + return Response.ok({ + "stats": stats, + "fit_threshold": 10, + "retrain_tip": "样本累积足够后,POST /api/prediction/retrain 可触发 ONNX 重训", + }) + + +@router.get("/calibration/phys-params", response_model=Response[dict]) +async def get_phys_params(steel_grade: Optional[str] = None, _=Depends(get_current_user)): + """查询物理参数(EA_R / K0 / N_CONC),可按钢种过滤。""" + cal = _load_cal() + phys = cal.get("phys", {}) + if steel_grade: + result = {} + for m in ("acid_speed", "quality"): + result[m] = _get_phys(m, steel_grade) + return Response.ok({"steel_grade": steel_grade, "phys_params": result}) + return Response.ok(phys) + + @router.post("/calibration/acid-speed", response_model=Response[dict]) async def calibrate_acid_speed(body: AcidCalibRequest, _=Depends(get_current_user)): - """录入实测数据,更新酸洗速度模型校准系数""" + """录入实测速度,更新对应钢种 K_cal;样本 ≥10 后自动触发物理参数拟合。""" try: model = AcidSpeedModel( thickness=body.thickness, width=body.width, @@ -196,76 +236,78 @@ async def calibrate_acid_speed(body: AcidCalibRequest, _=Depends(get_current_use except ValueError as e: raise HTTPException(status_code=422, detail=str(e)) - k_before = model.K_cal + k_before = model.K_cal predicted_speed = model.calculate()["max_speed"] - k_after = model.calibrate( + k_after = model.calibrate( actual_max_speed=body.actual_max_speed, actual_quality_ok=body.actual_quality_ok, ) _append_history( - "acid_speed", k_before, k_after, + f"acid_speed[{body.steel_grade}]", k_before, k_after, {"actual_speed": body.actual_max_speed, - "quality_ok": body.actual_quality_ok, + "quality_ok": body.actual_quality_ok, "predicted_speed": predicted_speed}, body.note or "", ) return Response.ok({ - "k_before": k_before, - "k_after": k_after, + "steel_grade": body.steel_grade, + "k_before": k_before, + "k_after": k_after, "predicted_speed": predicted_speed, - "adjustment": round((k_after / k_before - 1) * 100, 2), + "adjustment": round((k_after / k_before - 1) * 100, 2), }) @router.post("/calibration/tension", response_model=Response[dict]) async def calibrate_tension(body: TensionCalibRequest, _=Depends(get_current_user)): - """录入实测张力,仅更新指定区段的校准系数""" + """录入实测张力,更新对应钢种 + 区段的 K_cal。""" model = TensionModel( thickness=body.thickness, width=body.width, yield_strength=body.yield_strength, tension_coef=body.tension_coef, + steel_grade=body.steel_grade, ) - calc = model.calculate() - predicted_kn = calc["zones"].get(body.zone, {}).get("tension_kN", 0) - k_before = model.zone_kcal.get(body.zone, 1.0) + calc = model.calculate() + predicted_kn= calc["zones"].get(body.zone, {}).get("tension_kN", 0) + k_before = model.zone_kcal.get(body.zone, 1.0) new_zone_kcal = model.calibrate(zone=body.zone, measured_kn=body.measured_kn) - k_after = new_zone_kcal.get(body.zone, 1.0) + k_after = new_zone_kcal.get(body.zone, 1.0) _append_history( - "tension", k_before, k_after, - {"zone": body.zone, - "measured_kn": body.measured_kn, - "predicted_kn": predicted_kn}, + f"tension[{body.steel_grade}][{body.zone}]", k_before, k_after, + {"zone": body.zone, "measured_kn": body.measured_kn, "predicted_kn": predicted_kn}, body.note or "", ) return Response.ok({ - "zone": body.zone, - "k_before": k_before, - "k_after": k_after, - "predicted_kn": predicted_kn, - "measured_kn": body.measured_kn, - "adjustment": round((k_after / k_before - 1) * 100, 2), - "zone_kcal": new_zone_kcal, + "steel_grade": body.steel_grade, + "zone": body.zone, + "k_before": k_before, + "k_after": k_after, + "predicted_kn": predicted_kn, + "measured_kn": body.measured_kn, + "adjustment": round((k_after / k_before - 1) * 100, 2), + "zone_kcal": new_zone_kcal, }) @router.post("/calibration/quality", response_model=Response[dict]) async def calibrate_quality(body: QualityCalibRequest, _=Depends(get_current_user)): - """录入实际质检等级,更新质量模型校准系数""" + """录入实际质检等级,更新对应钢种 K_cal;样本 ≥10 后自动触发物理参数拟合。""" model = QualityPredictionModel( thickness=body.thickness, avg_speed=body.avg_speed, acid_conc_avg=body.acid_conc_avg, acid_temp_avg=body.acid_temp_avg, - scale_weight=body.scale_weight, + scale_weight=body.scale_weight, fe_conc_avg=body.fe_conc_avg, + steel_grade=body.steel_grade, ) - k_before = model.K_cal - calc = model.calculate() + k_before = model.K_cal + calc = model.calculate() predicted_grade = calc["overall_grade"] - k_after = model.calibrate(actual_grade=body.actual_grade) + k_after = model.calibrate(actual_grade=body.actual_grade) _append_history( - "quality", k_before, k_after, - {"actual_grade": body.actual_grade, - "predicted_grade": predicted_grade}, + f"quality[{body.steel_grade}]", k_before, k_after, + {"actual_grade": body.actual_grade, "predicted_grade": predicted_grade}, body.note or "", ) return Response.ok({ + "steel_grade": body.steel_grade, "k_before": k_before, "k_after": k_after, "predicted_grade": predicted_grade, @@ -274,21 +316,111 @@ async def calibrate_quality(body: QualityCalibRequest, _=Depends(get_current_use }) +@router.post("/calibration/fit-phys/{model_key}", response_model=Response[dict]) +async def fit_phys_params_api(model_key: str, steel_grade: str, _=Depends(get_current_user)): + """ + 手动触发指定模型 + 钢种的物理参数拟合(自动触发也会调用此逻辑)。 + model_key: acid_speed | quality + """ + if model_key == "acid_speed": + result = fit_acid_phys_params(steel_grade) + elif model_key == "quality": + result = fit_quality_phys_params(steel_grade) + else: + raise HTTPException(status_code=404, detail="model_key 仅支持 acid_speed / quality") + + if result is None: + from app.services.prediction import _FIT_MIN_SAMPLES + return Response.ok({ + "fitted": False, + "reason": f"样本不足,需 ≥{_FIT_MIN_SAMPLES} 条,请继续录入校准数据", + }) + return Response.ok({"fitted": True, "steel_grade": steel_grade, "phys_params": result}) + + @router.post("/calibration/reset/{model_key}", response_model=Response[dict]) -async def reset_calibration(model_key: str, _=Depends(get_current_user)): - """将指定模型的校准系数全部重置为 1.0""" +async def reset_calibration(model_key: str, steel_grade: Optional[str] = None, + _=Depends(get_current_user)): + """ + 重置校准系数为 1.0。 + steel_grade 为空时重置该模型所有钢种;否则只重置指定钢种。 + """ cal = _load_cal() if model_key == "tension": - # 重置所有区段 for z in TENSION_ZONES: - cal[f"tension_zone_{z}"] = 1.0 - _append_history("tension", None, 1.0, {"action": "reset_all_zones"}) + key = f"tension_{z}" + if steel_grade: + cal.setdefault("kcal", {}).setdefault(key, {}).pop(steel_grade, None) + else: + cal.setdefault("kcal", {})[key] = {"_default": 1.0} + _append_history("tension", None, 1.0, {"action": "reset", "steel_grade": steel_grade}) elif model_key in ("acid_speed", "quality"): - key = f"{model_key}_kcal" - k_before = cal.get(key, 1.0) - cal[key] = 1.0 - _append_history(model_key, k_before, 1.0, {"action": "reset"}) + if steel_grade: + cal.setdefault("kcal", {}).setdefault(model_key, {}).pop(steel_grade, None) + cal.setdefault("phys", {}).setdefault(model_key, {}).pop(steel_grade, None) + else: + cal.setdefault("kcal", {})[model_key] = {"_default": 1.0} + from app.services.prediction import _DEFAULT_PHYS + cal.setdefault("phys", {})[model_key] = {"_default": _DEFAULT_PHYS.copy()} + _append_history(model_key, None, 1.0, + {"action": "reset", "steel_grade": steel_grade or "all"}) else: raise HTTPException(status_code=404, detail="未知模型") _save_cal(cal) - return Response.ok({"model": model_key, "reset": True}) + return Response.ok({"model": model_key, "steel_grade": steel_grade or "all", "reset": True}) + + +# ───────────────────────────────────────────────────────────────────────────── +# 数据飞轮:ONNX 重训端点 +# ───────────────────────────────────────────────────────────────────────────── + +_retrain_lock = asyncio.Lock() +_retrain_status: dict = {"running": False, "last_ts": None, "last_result": None} + + +def _run_retrain(): + """在子进程中运行 train_models.py --use-real-data,完成后热重载 ONNX。""" + global _retrain_status + _retrain_status["running"] = True + _retrain_status["last_ts"] = datetime.now().isoformat(timespec="seconds") + try: + result = subprocess.run( + [sys.executable, str(_BACKEND_DIR / "train_models.py"), "--use-real-data"], + capture_output=True, text=True, timeout=600, + ) + ok = result.returncode == 0 + _retrain_status["last_result"] = { + "success": ok, + "stdout": result.stdout[-2000:] if result.stdout else "", + "stderr": result.stderr[-1000:] if result.stderr else "", + } + if ok: + reload_onnx() + except subprocess.TimeoutExpired: + _retrain_status["last_result"] = {"success": False, "stderr": "训练超时(>600s)"} + except Exception as e: + _retrain_status["last_result"] = {"success": False, "stderr": str(e)} + finally: + _retrain_status["running"] = False + + +@router.post("/retrain", response_model=Response[dict]) +async def trigger_retrain(background_tasks: BackgroundTasks, _=Depends(get_current_user)): + """ + 触发 ONNX 重训(使用 production_samples.jsonl 中的生产实绩)。 + 训练在后台子进程运行,完成后自动热重载模型。 + """ + if _retrain_status["running"]: + raise HTTPException(status_code=409, detail="重训任务正在进行中,请稍后再试") + stats = get_sample_stats() + background_tasks.add_task(_run_retrain) + return Response.ok({ + "message": "重训任务已启动,完成后 ONNX 将自动热重载", + "sample_stats": stats, + }) + + +@router.get("/retrain/status", response_model=Response[dict]) +async def retrain_status(_=Depends(get_current_user)): + """查询重训任务状态。""" + return Response.ok(_retrain_status) diff --git a/backend/app/services/prediction.py b/backend/app/services/prediction.py index 600ef38..5ba802c 100644 --- a/backend/app/services/prediction.py +++ b/backend/app/services/prediction.py @@ -1,37 +1,260 @@ """ 工艺预测模型 — 灰箱物理模型 + ONNX 神经网络双栈 -推理优先级: - 1. ONNX 模型(onnxruntime,若 pt_models/ 目录存在则加载) - 2. 物理灰箱模型(Arrhenius 解析解,始终可用) +三层校准体系: + 1. K_cal — 按钢种乘法偏置(立即生效) + 2. PhysParams — EA_R / K0 / N_CONC 按钢种网格拟合(≥10 样本后自动触发) + 3. 数据飞轮 — 积累实绩后触发 ONNX 重训(POST /retrain 离线触发) -训练:运行 backend/train_models.py 重新生成 pt_models/*.onnx -校准:K_cal 系数持久化在 cal_coeffs.json,两个栈都使用同一套 K_cal +cal_coeffs.json 新结构: + { + "kcal": { "acid_speed": {"_default": 1.0, "Q235": 1.02}, ... }, + "phys": { "acid_speed": {"_default": {EA_R, K0, N_CONC}, "Q235": {...}}, ... }, + "history": [...] + } +production_samples.jsonl:每条一个 JSON,按 model + grade 索引。 """ import math import json -import os +from datetime import datetime from pathlib import Path -from typing import List, Dict, Any, Optional, Tuple +from typing import List, Dict, Any, Optional from loguru import logger -# ── 校准系数持久化 ──────────────────────────────────────────────────────────── -_CAL_FILE = Path(__file__).parent / "cal_coeffs.json" +# ── 路径常量 ────────────────────────────────────────────────────────────────── +_SVC_DIR = Path(__file__).parent +_CAL_FILE = _SVC_DIR / "cal_coeffs.json" +_SAMPLE_FILE = _SVC_DIR / "production_samples.jsonl" +_PT_DIR = _SVC_DIR / "pt_models" -def _load_cal() -> Dict[str, float]: +_DEFAULT_PHYS: Dict[str, float] = {"EA_R": 5413.0, "K0": 0.075, "N_CONC": 1.2} +_K0_REF = 0.075 # quality 模型 K0 归一化基准 +_FIT_MIN_SAMPLES = 10 # 触发物理参数拟合的最少样本数 + +# ── Cal I/O ─────────────────────────────────────────────────────────────────── + +def _load_cal() -> Dict: try: with open(_CAL_FILE) as f: - return json.load(f) + d = json.load(f) + if "kcal" not in d: + _migrate_cal(d) + with open(_CAL_FILE) as f: + d = json.load(f) + return d except Exception: return {} -def _save_cal(d: Dict[str, float]): +def _migrate_cal(old: Dict): + """旧平铺格式 → 新嵌套格式(一次性迁移)""" + _ZONES = ["inlet","s1_roller","acid_entry","acid1","acid2","acid3", + "rinse","leveler","s2_roller","outlet"] + new: Dict = {"kcal": {}, "phys": {}, "history": old.get("history", [])} + for m in ("acid_speed", "quality"): + new["kcal"][m] = {"_default": old.get(f"{m}_kcal", 1.0)} + new["phys"][m] = {"_default": _DEFAULT_PHYS.copy()} + for z in _ZONES: + new["kcal"][f"tension_{z}"] = {"_default": old.get(f"tension_zone_{z}", 1.0)} with open(_CAL_FILE, "w") as f: - json.dump(d, f, indent=2) + json.dump(new, f, indent=2, ensure_ascii=False) + logger.info("cal_coeffs.json: 已从旧格式迁移到新嵌套格式") + +def _save_cal(d: Dict): + with open(_CAL_FILE, "w") as f: + json.dump(d, f, indent=2, ensure_ascii=False) + +def _get_kcal(model_key: str, grade: str = "_default") -> float: + d = _load_cal().get("kcal", {}).get(model_key, {}) + return d.get(grade, d.get("_default", 1.0)) + +def _set_kcal(model_key: str, grade: str, value: float): + cal = _load_cal() + cal.setdefault("kcal", {}).setdefault(model_key, {"_default": 1.0}) + cal["kcal"][model_key][grade] = round(value, 4) + _save_cal(cal) + +def _get_phys(model_key: str, grade: str = "_default") -> Dict: + d = _load_cal().get("phys", {}).get(model_key, {}) + return {**_DEFAULT_PHYS, **d.get(grade, d.get("_default", {}))} + +def _set_phys(model_key: str, grade: str, params: Dict): + cal = _load_cal() + cal.setdefault("phys", {}).setdefault(model_key, {"_default": _DEFAULT_PHYS.copy()}) + cal["phys"][model_key][grade] = {k: round(v, 6) for k, v in params.items()} + _save_cal(cal) + +# ── 生产样本 I/O ────────────────────────────────────────────────────────────── + +def append_sample(record: Dict): + """追加一条生产实绩样本(含时间戳)到 JSONL 文件。""" + record = {"ts": datetime.now().isoformat(timespec="seconds"), **record} + with open(_SAMPLE_FILE, "a") as f: + f.write(json.dumps(record, ensure_ascii=False) + "\n") + +def get_samples(model: str, grade: str) -> List[Dict]: + """读取指定模型 + 钢种的样本,最多返回 200 条。""" + if not _SAMPLE_FILE.exists(): + return [] + out = [] + with open(_SAMPLE_FILE) as f: + for line in f: + try: + r = json.loads(line) + if r.get("model") == model and r.get("grade") == grade: + out.append(r) + except Exception: + pass + return out[-200:] + +def get_sample_stats() -> Dict: + """返回各模型 + 钢种的样本数量汇总。""" + if not _SAMPLE_FILE.exists(): + return {} + stats: Dict[str, Dict[str, int]] = {} + with open(_SAMPLE_FILE) as f: + for line in f: + try: + r = json.loads(line) + m = r.get("model", "?") + g = r.get("grade", "_default") + stats.setdefault(m, {}).setdefault(g, 0) + stats[m][g] += 1 + except Exception: + pass + return stats + +# ── 模块级物理计算(供网格搜索和模型类共享)────────────────────────────────── + +_TANK_LENGTH = 18.0 +_NUM_TANKS = 5 +_T_REF = 348.15 +_C_REF = 180.0 +_SCALE_RATE_FACTOR = 0.70 * 1.0 + 0.20 * 0.25 + 0.10 * 0.15 + + +def _acid_k_i(conc: float, temp_c: float, scale_weight: float, + K0: float, EA_R: float, N_CONC: float, K_cal: float = 1.0) -> float: + T_k = temp_c + 273.15 + arr = math.exp(-EA_R * (1.0/T_k - 1.0/_T_REF)) + c_f = max(conc / _C_REF, 0.01) ** N_CONC + sc = (8.5 / max(scale_weight, 1.0)) ** 0.3 + return K0 * arr * c_f * _SCALE_RATE_FACTOR * sc * K_cal + + +def _acid_compute_pi(v_mpm: float, conc_list, temp_list, scale_weight, + K0, EA_R, N_CONC, K_cal=1.0): + v_mps = v_mpm / 60.0 + pi, pp, rt = 0.0, [], [] + for i in range(_NUM_TANKS): + t_i = _TANK_LENGTH / v_mps + k_i = _acid_k_i(conc_list[i], temp_list[i], scale_weight, K0, EA_R, N_CONC, K_cal) + pi = 100.0 - (100.0 - pi) * math.exp(-k_i * t_i) + pp.append(round(pi, 2)) + rt.append(round(t_i, 1)) + return pi, pp, rt + + +def _acid_max_speed(conc_list, temp_list, scale_weight, target_pi, + K0, EA_R, N_CONC, K_cal=1.0) -> float: + V_MIN, V_MAX = 20.0, 180.0 + if _acid_compute_pi(V_MIN, conc_list, temp_list, scale_weight, K0, EA_R, N_CONC, K_cal)[0] < target_pi: + return V_MIN + lo, hi, best = V_MIN, V_MAX, V_MIN + while hi - lo >= 0.5: + mid = (lo + hi) / 2.0 + if _acid_compute_pi(mid, conc_list, temp_list, scale_weight, K0, EA_R, N_CONC, K_cal)[0] >= target_pi: + best = mid; lo = mid + 0.5 + else: + hi = mid - 0.5 + return math.floor(best) + + +def _quality_pi_raw(avg_speed: float, acid_conc_avg: float, acid_temp_avg: float, + scale_weight: float, fe_conc_avg: float, + K0: float, EA_R: float, N_CONC: float, K_cal: float = 1.0) -> float: + T_k = acid_temp_avg + 273.15 + arr = math.exp(-EA_R * (1.0/T_k - 1.0/_T_REF)) + c_f = max(acid_conc_avg / _C_REF, 0.01) ** N_CONC + fe_ih = 1.0 - max(0.0, (fe_conc_avg - 80.0) / 200.0) * 0.35 + sc = (8.5 / max(scale_weight, 1.0)) ** 0.3 + k0_r = K0 / _K0_REF + exp_ = k0_r * 1.2 * arr * c_f * fe_ih * sc * _TANK_LENGTH * _NUM_TANKS / (avg_speed / 60.0) + return min(max(100.0 * (1.0 - math.exp(-exp_ / 10.0)) * K_cal, 0.0), 100.0) + + +# ── 物理参数网格拟合 ────────────────────────────────────────────────────────── + +def fit_acid_phys_params(grade: str) -> Optional[Dict]: + """ + 从 production_samples.jsonl 中读取指定钢种的酸洗速度样本, + 网格搜索最优 (K0, EA_R, N_CONC),≥10 条样本才触发。 + 成功则写入 cal_coeffs.json 并返回新参数,否则返回 None。 + """ + samples = get_samples("acid_speed", grade) + if len(samples) < _FIT_MIN_SAMPLES: + return None + + cur = _get_phys("acid_speed", grade) + K0_g = [cur["K0"] * f for f in (0.85, 0.90, 0.95, 1.00, 1.05, 1.10, 1.15)] + EA_R_g = [cur["EA_R"] * f for f in (0.94, 0.97, 1.00, 1.03, 1.06)] + NC_g = [cur["N_CONC"]* f for f in (0.90, 1.00, 1.10)] + + best_mse, best = float("inf"), cur.copy() + for K0 in K0_g: + for EA_R in EA_R_g: + for N_CONC in NC_g: + mse = 0.0 + for s in samples: + inp = s["inputs"] # [t, sw, c0..c5, t0..t5] + pred = _acid_max_speed( + inp[2:8], inp[8:14], inp[1], + s.get("target_pi", 95.0), K0, EA_R, N_CONC + ) + mse += (pred - s["actual_speed"]) ** 2 + mse /= len(samples) + if mse < best_mse: + best_mse = mse + best = {"K0": K0, "EA_R": EA_R, "N_CONC": N_CONC} + + _set_phys("acid_speed", grade, best) + logger.info(f"acid_speed phys fit [{grade}]: RMSE={best_mse**0.5:.2f} m/min {best}") + return best + + +def fit_quality_phys_params(grade: str) -> Optional[Dict]: + """同上,针对质量预测模型。""" + samples = get_samples("quality", grade) + if len(samples) < _FIT_MIN_SAMPLES: + return None + + cur = _get_phys("quality", grade) + grade_target = {"A1": 95.0, "A2": 85.0, "B1": 75.0, "B2": 65.0, "C": 50.0} + K0_g = [cur["K0"] * f for f in (0.85, 0.90, 0.95, 1.00, 1.05, 1.10, 1.15)] + EA_R_g = [cur["EA_R"] * f for f in (0.94, 0.97, 1.00, 1.03, 1.06)] + NC_g = [cur["N_CONC"]* f for f in (0.90, 1.00, 1.10)] + + best_mse, best = float("inf"), cur.copy() + for K0 in K0_g: + for EA_R in EA_R_g: + for N_CONC in NC_g: + mse = 0.0 + for s in samples: + inp = s["inputs"] # [t, spd, conc, temp, sw, fe] + t_pi = grade_target.get(s.get("actual_grade", "B1"), 75.0) + pi = _quality_pi_raw(inp[1], inp[2], inp[3], inp[4], inp[5], + K0, EA_R, N_CONC) + mse += (pi - t_pi) ** 2 + mse /= len(samples) + if mse < best_mse: + best_mse = mse + best = {"K0": K0, "EA_R": EA_R, "N_CONC": N_CONC} + + _set_phys("quality", grade, best) + logger.info(f"quality phys fit [{grade}]: RMSE={best_mse**0.5:.2f} {best}") + return best # ── ONNX 推理层 ─────────────────────────────────────────────────────────────── -_PT_DIR = Path(__file__).parent / "pt_models" _scalers: Optional[Dict] = None _sess: Dict[str, Any] = {} @@ -47,9 +270,7 @@ try: for _name in ("acid_speed", "tension", "quality"): _p = _PT_DIR / f"{_name}.onnx" if _p.exists(): - _sess[_name] = ort.InferenceSession( - str(_p), providers=["CPUExecutionProvider"] - ) + _sess[_name] = ort.InferenceSession(str(_p), providers=["CPUExecutionProvider"]) if _sess: logger.info(f"PT models loaded: {list(_sess.keys())}") except ImportError: @@ -57,23 +278,40 @@ except ImportError: def _pt_infer(name: str, x_raw: List[float]) -> Optional[List[float]]: - """标准化 → ONNX 推理 → 反标准化,返回输出向量;失败返回 None。""" + """标准化 → ONNX 推理 → 反标准化,失败返回 None。""" if name not in _sess or _scalers is None: return None try: - sc = _scalers[name] - xm = _np.array(sc["X_mean"], dtype=_np.float32) - xs = _np.array(sc["X_std"], dtype=_np.float32) - ym = _np.array(sc["y_mean"], dtype=_np.float32) - ys = _np.array(sc["y_std"], dtype=_np.float32) - x = (_np.array(x_raw, dtype=_np.float32) - xm) / xs - raw = _sess[name].run(None, {"input": x.reshape(1, -1)})[0][0] + sc = _scalers[name] + xm = _np.array(sc["X_mean"], dtype=_np.float32) + xs = _np.array(sc["X_std"], dtype=_np.float32) + ym = _np.array(sc["y_mean"], dtype=_np.float32) + ys = _np.array(sc["y_std"], dtype=_np.float32) + x = (_np.array(x_raw, dtype=_np.float32) - xm) / xs + raw = _sess[name].run(None, {"input": x.reshape(1, -1)})[0][0] return (raw * ys + ym).tolist() except Exception as e: logger.warning(f"PT infer {name} failed: {e}") return None +def reload_onnx(): + """重训后调用,热重载 ONNX 模型文件。""" + global _scalers, _sess + try: + sp = _PT_DIR / "scalers.json" + if sp.exists(): + with open(sp) as f: + _scalers = json.load(f) + for name in ("acid_speed", "tension", "quality"): + p = _PT_DIR / f"{name}.onnx" + if p.exists(): + _sess[name] = ort.InferenceSession(str(p), providers=["CPUExecutionProvider"]) + logger.info(f"ONNX 热重载完成: {list(_sess.keys())}") + except Exception as e: + logger.error(f"ONNX 热重载失败: {e}") + + # ───────────────────────────────────────────────────────────────────────────── # 1. 酸洗速度模型 # ───────────────────────────────────────────────────────────────────────────── @@ -81,27 +319,20 @@ class AcidSpeedModel: """ 灰箱: Arrhenius 动力学 + 二分搜索 PT栈: 14 维输入 → 最大速度 (m/min) + 校准: K_cal 按钢种 + 物理参数 (EA_R/K0/N_CONC) 按钢种网格拟合 输入: [thickness, scale_weight, conc×6, temp×6] """ - TANK_LENGTH = 18.0 - NUM_TANKS = 6 - K0 = 0.075 - EA_R = 5413.0 - T_REF = 348.15 - C_REF = 180.0 - N_CONC = 1.2 - V_MIN = 20.0 - V_MAX = 180.0 - SCALE_RATE_FACTOR = 0.70 * 1.0 + 0.20 * 0.25 + 0.10 * 0.15 - CAL_KEY = "acid_speed_kcal" + CAL_KEY = "acid_speed" + V_MIN = 20.0 + V_MAX = 180.0 def __init__(self, thickness, width, steel_grade, acid_conc_list, acid_temp_list, scale_weight=8.5, target_pi=95.0): - if len(acid_conc_list) != self.NUM_TANKS: - raise ValueError(f"acid_conc_list 需要 {self.NUM_TANKS} 个元素") - if len(acid_temp_list) != self.NUM_TANKS: - raise ValueError(f"acid_temp_list 需要 {self.NUM_TANKS} 个元素") + if len(acid_conc_list) != _NUM_TANKS: + raise ValueError(f"acid_conc_list 需要 {_NUM_TANKS} 个元素") + if len(acid_temp_list) != _NUM_TANKS: + raise ValueError(f"acid_temp_list 需要 {_NUM_TANKS} 个元素") self.thickness = thickness self.width = width self.steel_grade = steel_grade @@ -109,29 +340,19 @@ class AcidSpeedModel: self.acid_temp_list = acid_temp_list self.scale_weight = scale_weight self.target_pi = target_pi - self.K_cal = _load_cal().get(self.CAL_KEY, 1.0) - - def _k_i(self, conc, temp_c): - T_k = temp_c + 273.15 - arrhenius = math.exp(-self.EA_R * (1.0/T_k - 1.0/self.T_REF)) - c_factor = max(conc/self.C_REF, 0.01) ** self.N_CONC - scale_corr = (8.5 / max(self.scale_weight, 1.0)) ** 0.3 - return self.K0 * arrhenius * c_factor * self.SCALE_RATE_FACTOR * scale_corr * self.K_cal + self.K_cal = _get_kcal(self.CAL_KEY, steel_grade) + phys = _get_phys(self.CAL_KEY, steel_grade) + self.K0 = phys["K0"] + self.EA_R = phys["EA_R"] + self.N_CONC= phys["N_CONC"] def _compute_pi(self, v_mpm): - v_mps = v_mpm / 60.0 - pi = 0.0 - pp, rt = [], [] - for i in range(self.NUM_TANKS): - t_i = self.TANK_LENGTH / v_mps - k_i = self._k_i(self.acid_conc_list[i], self.acid_temp_list[i]) - pi = 100.0 - (100.0 - pi) * math.exp(-k_i * t_i) - pp.append(round(pi, 2)); rt.append(round(t_i, 1)) - return pi, pp, rt + return _acid_compute_pi(v_mpm, self.acid_conc_list, self.acid_temp_list, + self.scale_weight, self.K0, self.EA_R, self.N_CONC, self.K_cal) def _risk_level(self, speed, pi): - avg_conc = sum(self.acid_conc_list)/len(self.acid_conc_list) - avg_temp = sum(self.acid_temp_list)/len(self.acid_temp_list) + avg_conc = sum(self.acid_conc_list) / len(self.acid_conc_list) + avg_temp = sum(self.acid_temp_list) / len(self.acid_temp_list) s = 0 if pi < 85: s += 3 elif pi < 92: s += 1 @@ -150,48 +371,66 @@ class AcidSpeedModel: "residence_time_per_tank": rt, "total_pi": round(pi, 2), "under_pickling_risk": self._risk_level(self.V_MIN, pi), "warning": "酸液条件不足,建议检查酸浓度和温度", - "K_cal": self.K_cal, "source": "physics", + "K_cal": self.K_cal, "phys_params": self._phys_dict(), "source": "physics", } - lo, hi, best = self.V_MIN, self.V_MAX, self.V_MIN - while hi - lo >= 0.5: - mid = (lo + hi) / 2.0 - if self._compute_pi(mid)[0] >= self.target_pi: - best = mid; lo = mid + 0.5 - else: - hi = mid - 0.5 - best = math.floor(best) + best = _acid_max_speed(self.acid_conc_list, self.acid_temp_list, self.scale_weight, + self.target_pi, self.K0, self.EA_R, self.N_CONC, self.K_cal) pi, pp, rt = self._compute_pi(best) return { "max_speed": best, "pi_per_tank": pp, "residence_time_per_tank": rt, "total_pi": round(pi, 2), "under_pickling_risk": self._risk_level(best, pi), - "warning": None, "K_cal": self.K_cal, "source": "physics", + "warning": None, "K_cal": self.K_cal, "phys_params": self._phys_dict(), "source": "physics", } + def _phys_dict(self): + return {"K0": self.K0, "EA_R": self.EA_R, "N_CONC": self.N_CONC} + def calculate(self) -> Dict[str, Any]: - x = [self.thickness, self.scale_weight] + self.acid_conc_list + self.acid_temp_list + x = [self.thickness, self.scale_weight] + self.acid_conc_list + self.acid_temp_list pt = _pt_infer("acid_speed", x) if pt is not None: - raw_speed = pt[0] * self.K_cal - best = int(max(self.V_MIN, min(self.V_MAX, round(raw_speed)))) + raw = pt[0] * self.K_cal + best = int(max(self.V_MIN, min(self.V_MAX, round(raw)))) pi, pp, rt = self._compute_pi(best) return { "max_speed": best, "pi_per_tank": pp, "residence_time_per_tank": rt, "total_pi": round(pi, 2), "under_pickling_risk": self._risk_level(best, pi), - "warning": None, "K_cal": self.K_cal, "source": "pt", + "warning": None, "K_cal": self.K_cal, "phys_params": self._phys_dict(), "source": "pt", } return self._physics_result() - def calibrate(self, actual_max_speed, actual_quality_ok): + def calibrate(self, actual_max_speed: float, actual_quality_ok: bool) -> float: + """ + 更新当前钢种的 K_cal,保存样本,样本 ≥10 时自动触发物理参数拟合。 + 返回新 K_cal。 + """ predicted = self.calculate()["max_speed"] if not actual_quality_ok: adj = 0.95 else: ratio = actual_max_speed / max(predicted, 1.0) - adj = max(0.7, min(1.3, 1.0 + 0.3*(ratio - 1.0))) + adj = max(0.7, min(1.3, 1.0 + 0.3 * (ratio - 1.0))) self.K_cal = round(self.K_cal * adj, 4) - cal = _load_cal(); cal[self.CAL_KEY] = self.K_cal; _save_cal(cal) + _set_kcal(self.CAL_KEY, self.steel_grade, self.K_cal) + + # 保存样本 + append_sample({ + "model": "acid_speed", + "grade": self.steel_grade, + "inputs": [self.thickness, self.scale_weight] + self.acid_conc_list + self.acid_temp_list, + "target_pi": self.target_pi, + "predicted_speed": predicted, + "actual_speed": actual_max_speed, + "quality_ok": actual_quality_ok, + }) + + # 样本够了就触发物理参数拟合 + n = len(get_samples("acid_speed", self.steel_grade)) + if n >= _FIT_MIN_SAMPLES and n % 5 == 0: + fit_acid_phys_params(self.steel_grade) + return self.K_cal @@ -202,6 +441,7 @@ class TensionModel: """ 灰箱: T_base = coef × σ_yield × A,各区段比例系数 PT栈: 4 维输入 → 10 区段张力 kN + 校准: 每区段 K_cal 按钢种分组 输入: [thickness, width, yield_strength, tension_coef] """ ZONE_RATIOS = { @@ -218,15 +458,17 @@ class TensionModel: } @staticmethod - def _zone_cal_key(zone): return f"tension_zone_{zone}" + def _zone_key(zone): return f"tension_{zone}" - def __init__(self, thickness, width, yield_strength, tension_coef=0.25): + def __init__(self, thickness, width, yield_strength, + tension_coef=0.25, steel_grade="_default"): self.thickness = thickness self.width = width self.yield_strength = yield_strength self.tension_coef = tension_coef - cal = _load_cal() - self.zone_kcal = {z: cal.get(self._zone_cal_key(z), 1.0) for z in self.ZONE_RATIOS} + self.steel_grade = steel_grade + self.zone_kcal = {z: _get_kcal(self._zone_key(z), steel_grade) + for z in self.ZONE_RATIOS} def _physics_zones(self, t_base_kn): zones = {} @@ -240,7 +482,7 @@ class TensionModel: return zones def calculate(self) -> Dict[str, Any]: - cross = self.thickness * self.width + cross = self.thickness * self.width t_base = self.tension_coef * self.yield_strength * cross / 1000.0 pt = _pt_infer("tension", [self.thickness, self.width, self.yield_strength, self.tension_coef]) @@ -248,12 +490,12 @@ class TensionModel: zone_names = _scalers["tension"].get("zone_names", list(self.ZONE_RATIOS.keys())) zones = {} for i, zone in enumerate(zone_names): - k = self.zone_kcal.get(zone, 1.0) + k = self.zone_kcal.get(zone, 1.0) kn = round(max(0.1, pt[i]) * k, 2) zones[zone] = { "tension_kN": kn, - "ratio": self.ZONE_RATIOS.get(zone, 1.0), - "k_cal": k, + "ratio": self.ZONE_RATIOS.get(zone, 1.0), + "k_cal": k, "name_cn": self.ZONE_NAMES_CN.get(zone, zone), } source = "pt" @@ -271,20 +513,30 @@ class TensionModel: "cross_section_mm2": round(cross, 1), "zones": zones, "weld_speed_limit": 60.0, - "weld_tension_kN": round(t_max * 0.60, 2), - "accel_tension": accel_kn, - "zone_kcal": self.zone_kcal, + "weld_tension_kN": round(t_max * 0.60, 2), + "accel_tension": accel_kn, + "zone_kcal": self.zone_kcal, "source": source, } - def calibrate(self, zone, measured_kn): + def calibrate(self, zone: str, measured_kn: float) -> Dict: + """更新指定区段的 K_cal(按当前钢种)。""" if zone not in self.ZONE_RATIOS: raise ValueError(f"未知区段: {zone}") t_base = self.tension_coef * self.yield_strength * self.thickness * self.width / 1000.0 pred = t_base * self.ZONE_RATIOS[zone] * self.zone_kcal[zone] - adj = max(0.5, min(2.0, 1.0 + 0.4*(measured_kn/max(pred,0.1) - 1.0))) + adj = max(0.5, min(2.0, 1.0 + 0.4 * (measured_kn / max(pred, 0.1) - 1.0))) self.zone_kcal[zone] = round(self.zone_kcal[zone] * adj, 4) - cal = _load_cal(); cal[self._zone_cal_key(zone)] = self.zone_kcal[zone]; _save_cal(cal) + _set_kcal(self._zone_key(zone), self.steel_grade, self.zone_kcal[zone]) + + append_sample({ + "model": "tension", + "grade": self.steel_grade, + "zone": zone, + "inputs": [self.thickness, self.width, self.yield_strength, self.tension_coef], + "predicted_kn": pred, + "actual_kn": measured_kn, + }) return self.zone_kcal @@ -295,51 +547,49 @@ class QualityPredictionModel: """ 灰箱: Arrhenius PI 计算 + 速度惩罚 PT栈: 6 维输入 → [pi_score, surface_score] + 校准: K_cal 按钢种 + 物理参数按钢种网格拟合 输入: [thickness, avg_speed, acid_conc_avg, acid_temp_avg, scale_weight, fe_conc_avg] """ - EA_R = 5413.0 - T_REF = 348.15 - C_REF = 180.0 - N_CONC = 1.2 - CAL_KEY = "quality_kcal" + CAL_KEY = "quality" def __init__(self, thickness, avg_speed, acid_conc_avg, acid_temp_avg, - scale_weight=8.5, fe_conc_avg=60.0): + scale_weight=8.5, fe_conc_avg=60.0, steel_grade="_default"): self.thickness = thickness self.avg_speed = avg_speed self.acid_conc_avg = acid_conc_avg self.acid_temp_avg = acid_temp_avg self.scale_weight = scale_weight self.fe_conc_avg = fe_conc_avg - self.K_cal = _load_cal().get(self.CAL_KEY, 1.0) + self.steel_grade = steel_grade + self.K_cal = _get_kcal(self.CAL_KEY, steel_grade) + phys = _get_phys(self.CAL_KEY, steel_grade) + self.K0 = phys["K0"] + self.EA_R = phys["EA_R"] + self.N_CONC= phys["N_CONC"] - def _pi(self): - T_k = self.acid_temp_avg + 273.15 - arr = math.exp(-self.EA_R*(1.0/T_k - 1.0/self.T_REF)) - c_factor = max(self.acid_conc_avg/self.C_REF, 0.01)**self.N_CONC - fe_inh = 1.0 - max(0.0, (self.fe_conc_avg-80.0)/200.0)*0.35 - scale_corr = (8.5/max(self.scale_weight,1.0))**0.3 - exposure = (1.2*arr*c_factor*fe_inh*scale_corr*18.0*6) / (self.avg_speed/60.0) - return min(max(100.0*(1.0-math.exp(-exposure/10.0))*self.K_cal, 0), 100) + def _pi(self) -> float: + return _quality_pi_raw(self.avg_speed, self.acid_conc_avg, self.acid_temp_avg, + self.scale_weight, self.fe_conc_avg, + self.K0, self.EA_R, self.N_CONC, self.K_cal) - def _surface(self, pi): + def _surface(self, pi: float) -> float: if self.avg_speed < 60: ss = 80.0 elif self.avg_speed <= 140: - ss = 80.0 + 15.0*(self.avg_speed-60)/80.0 + ss = 80.0 + 15.0 * (self.avg_speed - 60) / 80.0 else: - ss = 95.0 - 30.0*((self.avg_speed-140)/40.0) - return min(max(pi*0.65 + ss*0.35, 0), 100) + ss = 95.0 - 30.0 * ((self.avg_speed - 140) / 40.0) + return min(max(pi * 0.65 + ss * 0.35, 0), 100) - def _grade(self, pi, suf): - c = (pi+suf)/2.0 + def _grade(self, pi: float, suf: float) -> str: + c = (pi + suf) / 2.0 if c >= 90: return "A1" if c >= 80: return "A2" if c >= 70: return "B1" if c >= 60: return "B2" return "C" - def _recommendations(self, pi, suf): + def _recommendations(self, pi: float, suf: float) -> List[str]: recs = [] if self.fe_conc_avg > 80: recs.append(f"铁离子浓度偏高({self.fe_conc_avg:.0f} g/L),建议加速换酸") @@ -359,14 +609,17 @@ class QualityPredictionModel: recs.append("工艺参数在正常范围内,当前设定可继续保持") return recs + def _phys_dict(self): + return {"K0": self.K0, "EA_R": self.EA_R, "N_CONC": self.N_CONC} + def calculate(self) -> Dict[str, Any]: x = [self.thickness, self.avg_speed, self.acid_conc_avg, self.acid_temp_avg, self.scale_weight, self.fe_conc_avg] pt = _pt_infer("quality", x) if pt is not None: - pi = round(float(min(max(pt[0]*self.K_cal, 0), 100)), 1) - suf = round(float(min(max(pt[1]*self.K_cal, 0), 100)), 1) + pi = round(float(min(max(pt[0] * self.K_cal, 0), 100)), 1) + suf = round(float(min(max(pt[1] * self.K_cal, 0), 100)), 1) src = "pt" else: pi = round(self._pi(), 1) @@ -377,17 +630,35 @@ class QualityPredictionModel: "pi_score": pi, "surface_score": suf, "overall_grade": self._grade(pi, suf), "recommendations": self._recommendations(pi, suf), - "K_cal": self.K_cal, "source": src, + "K_cal": self.K_cal, "phys_params": self._phys_dict(), "source": src, } - def calibrate(self, actual_grade): + def calibrate(self, actual_grade: str) -> float: + """ + 更新当前钢种 K_cal,保存样本,样本 ≥10 时自动触发物理参数拟合。 + 返回新 K_cal。 + """ grade_map = {"A1": 95, "A2": 85, "B1": 75, "B2": 65, "C": 50} target = grade_map.get(actual_grade, 75) res = self.calculate() pred = (res["pi_score"] + res["surface_score"]) / 2.0 - adj = max(0.7, min(1.3, 1.0 + 0.3*(target/max(pred,1.0) - 1.0))) + adj = max(0.7, min(1.3, 1.0 + 0.3 * (target / max(pred, 1.0) - 1.0))) self.K_cal = round(self.K_cal * adj, 4) - cal = _load_cal(); cal[self.CAL_KEY] = self.K_cal; _save_cal(cal) + _set_kcal(self.CAL_KEY, self.steel_grade, self.K_cal) + + append_sample({ + "model": "quality", + "grade": self.steel_grade, + "inputs": [self.thickness, self.avg_speed, self.acid_conc_avg, + self.acid_temp_avg, self.scale_weight, self.fe_conc_avg], + "predicted_grade": res["overall_grade"], + "actual_grade": actual_grade, + }) + + n = len(get_samples("quality", self.steel_grade)) + if n >= _FIT_MIN_SAMPLES and n % 5 == 0: + fit_quality_phys_params(self.steel_grade) + return self.K_cal @@ -412,17 +683,17 @@ class AcidConsumptionModel: def calculate(self) -> Dict[str, Any]: wt = self.coil_weight_kg / 1000.0 acid_base = self.ACID_WITH_REGEN if self.has_regen_station else self.ACID_WITHOUT_REGEN - fe_factor = 1.0 + max(0.0, (self.fe_conc_avg-100.0)/100.0)*0.4 + fe_factor = 1.0 + max(0.0, (self.fe_conc_avg - 100.0) / 100.0) * 0.4 acid_unit = round(acid_base * fe_factor, 3) return { - "coil_weight_t": round(wt, 3), - "acid_consumption_kg": round(acid_unit * wt, 2), - "acid_unit_kg_per_t": acid_unit, - "steam_consumption_kg": round(self.STEAM_UNIT * wt, 2), - "steam_unit_kg_per_t": self.STEAM_UNIT, - "power_consumption_kwh": round(self.POWER_UNIT * wt, 2), - "power_unit_kwh_per_t": self.POWER_UNIT, - "cooling_water_m3": round(self.COOLING_UNIT * wt, 3), + "coil_weight_t": round(wt, 3), + "acid_consumption_kg": round(acid_unit * wt, 2), + "acid_unit_kg_per_t": acid_unit, + "steam_consumption_kg": round(self.STEAM_UNIT * wt, 2), + "steam_unit_kg_per_t": self.STEAM_UNIT, + "power_consumption_kwh": round(self.POWER_UNIT * wt, 2), + "power_unit_kwh_per_t": self.POWER_UNIT, + "cooling_water_m3": round(self.COOLING_UNIT * wt, 3), "cooling_water_unit_m3_per_t": self.COOLING_UNIT, - "fe_conc_factor": round(fe_factor, 3), + "fe_conc_factor": round(fe_factor, 3), } diff --git a/backend/train_models.py b/backend/train_models.py index d7c4f83..a4d254f 100644 --- a/backend/train_models.py +++ b/backend/train_models.py @@ -1,12 +1,14 @@ """ 本地训练脚本 — 生成合成数据、训练 MLP、导出 ONNX + 运行方式(在 backend/ 目录下): - python train_models.py + python train_models.py # 纯合成数据 + python train_models.py --use-real-data # 混入生产实绩(10× 权重) 依赖(仅本地训练用,不进 Docker): pip install torch onnx onnxruntime scikit-learn numpy """ -import sys, json, time +import sys, json, time, argparse from pathlib import Path import numpy as np @@ -16,7 +18,10 @@ import torch.optim as optim from torch.utils.data import DataLoader, TensorDataset sys.path.insert(0, str(Path(__file__).parent)) -from app.services.prediction import AcidSpeedModel, TensionModel, QualityPredictionModel +from app.services.prediction import ( + AcidSpeedModel, TensionModel, QualityPredictionModel, + _SAMPLE_FILE, get_sample_stats, +) PT_DIR = Path(__file__).parent / "app" / "services" / "pt_models" PT_DIR.mkdir(parents=True, exist_ok=True) @@ -32,6 +37,8 @@ TENSION_ZONES = [ "rinse", "leveler", "s2_roller", "outlet", ] +REAL_SAMPLE_WEIGHT = 10 # 每条真实样本复制次数(等效权重) + # ─── 网络结构 ─────────────────────────────────────────────────────────────── @@ -95,6 +102,74 @@ def export_onnx(model: nn.Module, in_dim: int, path: Path): print(f" → {path.name} ({path.stat().st_size//1024} KB)") +# ─── 读取生产实绩样本 ──────────────────────────────────────────────────────── + +def load_real_samples(model_name: str): + """ + 从 production_samples.jsonl 读取指定模型的实绩样本, + 返回 (X_real, y_real) numpy 数组,或 (None, None)。 + """ + if not _SAMPLE_FILE.exists(): + return None, None + + Xs, ys = [], [] + with open(_SAMPLE_FILE) as f: + for line in f: + try: + r = json.loads(line) + if r.get("model") != model_name: + continue + inp = r.get("inputs") + if not inp: + continue + + if model_name == "acid_speed": + spd = r.get("actual_speed") + if spd is None: continue + Xs.append(inp[:14]) + ys.append([spd]) + + elif model_name == "tension": + kn = r.get("actual_kn") + if kn is None: continue + zone = r.get("zone") + if zone not in TENSION_ZONES: continue + # 单区段样本:只校准该区段,其他用模型预测填充 + m = TensionModel(inp[0], inp[1], inp[2], inp[3]) + res = m.calculate() + tensions = [res["zones"][z]["tension_kN"] for z in TENSION_ZONES] + tensions[TENSION_ZONES.index(zone)] = kn + Xs.append(inp[:4]) + ys.append(tensions) + + elif model_name == "quality": + ag = r.get("actual_grade") + if ag is None: continue + grade_map = {"A1": 95.0, "A2": 85.0, "B1": 75.0, "B2": 65.0, "C": 50.0} + target_pi = grade_map.get(ag, 75.0) + Xs.append(inp[:6]) + ys.append([target_pi, target_pi]) # pi ≈ surface as proxy + + except Exception: + continue + + if not Xs: + return None, None + + print(f" 实绩样本: {model_name} = {len(Xs)} 条 (将按 {REAL_SAMPLE_WEIGHT}× 权重混入)") + return np.array(Xs, np.float32), np.array(ys, np.float32) + + +def mix_with_real(X_syn: np.ndarray, y_syn: np.ndarray, + X_real, y_real) -> tuple: + """将真实样本重复 REAL_SAMPLE_WEIGHT 次后拼接到合成数据尾部。""" + if X_real is None or len(X_real) == 0: + return X_syn, y_syn + X_r = np.tile(X_real, (REAL_SAMPLE_WEIGHT, 1)) + y_r = np.tile(y_real, (REAL_SAMPLE_WEIGHT, 1)) + return np.concatenate([X_syn, X_r], axis=0), np.concatenate([y_syn, y_r], axis=0) + + # ─── 1. 酸洗速度模型 ──────────────────────────────────────────────────────── # 输入(14): thickness, scale_weight, conc×6, temp×6 # 输出(1): max_speed @@ -104,31 +179,27 @@ def gen_acid_speed(n: int): Xs, ys = [], [] skip = 0 while len(Xs) < n: - t = rng.uniform(0.5, 8.0) - sw = rng.uniform(4.0, 18.0) + t = rng.uniform(0.5, 8.0) + sw = rng.uniform(4.0, 18.0) conc = rng.uniform(60, 240, 6).tolist() temp = rng.uniform(52, 87, 6).tolist() tpi = rng.uniform(88, 97) try: - m = AcidSpeedModel( - thickness=t, width=1000.0, steel_grade="Q235", - acid_conc_list=conc, acid_temp_list=temp, - scale_weight=sw, target_pi=tpi, - ) + m = AcidSpeedModel(thickness=t, width=1000.0, steel_grade="Q235", + acid_conc_list=conc, acid_temp_list=temp, + scale_weight=sw, target_pi=tpi) spd = float(m.calculate()["max_speed"]) except Exception: skip += 1 continue - # 模拟真实工况偏差:±6% 相对噪声 + 钢种系数扰动 steel_factor = rng.choice([0.92, 0.96, 1.00, 1.03, 1.06]) noise = rng.normal(1.0, 0.06) spd_n = float(np.clip(spd * noise * steel_factor, 20, 180)) - Xs.append([t, sw] + conc + temp) ys.append([spd_n]) - print(f" acid_speed: {len(Xs)} samples (skipped {skip})") + print(f" 合成样本: acid_speed = {len(Xs)} 条 (skipped {skip})") return np.array(Xs, np.float32), np.array(ys, np.float32) @@ -142,21 +213,18 @@ def gen_tension(n: int): while len(Xs) < n: t = rng.uniform(0.5, 8.0) w = rng.uniform(600, 1600) - ys_ = rng.uniform(150, 600) - tc = rng.uniform(0.15, 0.35) + ys_= rng.uniform(150, 600) + tc = rng.uniform(0.15, 0.35) m = TensionModel(thickness=t, width=w, yield_strength=ys_, tension_coef=tc) res = m.calculate() tensions = [res["zones"][z]["tension_kN"] for z in TENSION_ZONES] - - # 各区段独立噪声(实测张力传感器精度约 ±4%) noise = rng.normal(1.0, 0.04, 10) tensions_n = [float(np.clip(v * noise[i], 0.1, 9999)) for i, v in enumerate(tensions)] - Xs.append([t, w, ys_, tc]) ys.append(tensions_n) - print(f" tension: {len(Xs)} samples") + print(f" 合成样本: tension = {len(Xs)} 条") return np.array(Xs, np.float32), np.array(ys, np.float32) @@ -175,35 +243,34 @@ def gen_quality(n: int): sw = rng.uniform(4, 18) fe = rng.uniform(20, 130) - m = QualityPredictionModel( - thickness=t, avg_speed=spd, - acid_conc_avg=conc, acid_temp_avg=temp, - scale_weight=sw, fe_conc_avg=fe, - ) + m = QualityPredictionModel(thickness=t, avg_speed=spd, + acid_conc_avg=conc, acid_temp_avg=temp, + scale_weight=sw, fe_conc_avg=fe) res = m.calculate() - pi = res["pi_score"] - suf = res["surface_score"] - - # ±6% 噪声模拟质检测量不确定度 - pi_n = float(np.clip(pi * rng.normal(1.0, 0.06), 0, 100)) - suf_n = float(np.clip(suf * rng.normal(1.0, 0.06), 0, 100)) - + pi_n = float(np.clip(res["pi_score"] * rng.normal(1.0, 0.06), 0, 100)) + suf_n = float(np.clip(res["surface_score"] * rng.normal(1.0, 0.06), 0, 100)) Xs.append([t, spd, conc, temp, sw, fe]) ys.append([pi_n, suf_n]) - print(f" quality: {len(Xs)} samples") + print(f" 合成样本: quality = {len(Xs)} 条") return np.array(Xs, np.float32), np.array(ys, np.float32) # ─── 主流程 ───────────────────────────────────────────────────────────────── -def main(): +def main(use_real_data: bool = False): scalers: dict = {} t0 = time.time() + if use_real_data: + stats = get_sample_stats() + print(f"\n生产实绩样本统计: {stats}") + # ── 酸洗速度 ── print("\n[1/3] 酸洗速度模型") X, y = gen_acid_speed(N) + if use_real_data: + X, y = mix_with_real(X, y, *load_real_samples("acid_speed")) Xn, Xm, Xs = z_scale(X) yn, ym, ys_ = z_scale(y) model = MLP(14, 1, hidden=(128, 64, 32)) @@ -218,6 +285,8 @@ def main(): # ── 张力 ── print("\n[2/3] 张力模型") X, y = gen_tension(N) + if use_real_data: + X, y = mix_with_real(X, y, *load_real_samples("tension")) Xn, Xm, Xs = z_scale(X) yn, ym, ys_ = z_scale(y) model = MLP(4, 10, hidden=(64, 64, 32)) @@ -233,6 +302,8 @@ def main(): # ── 质量 ── print("\n[3/3] 质量预测模型") X, y = gen_quality(N) + if use_real_data: + X, y = mix_with_real(X, y, *load_real_samples("quality")) Xn, Xm, Xs = z_scale(X) yn, ym, ys_ = z_scale(y) model = MLP(6, 2, hidden=(64, 32)) @@ -244,7 +315,6 @@ def main(): "y_mean": ym.tolist(), "y_std": ys_.tolist(), } - # ── 保存 scaler 参数 ── scaler_path = PT_DIR / "scalers.json" with open(scaler_path, "w") as f: json.dump(scalers, f, indent=2) @@ -253,4 +323,8 @@ def main(): if __name__ == "__main__": - main() + parser = argparse.ArgumentParser() + parser.add_argument("--use-real-data", action="store_true", + help="将 production_samples.jsonl 中的实绩混入训练集") + args = parser.parse_args() + main(use_real_data=args.use_real_data)