"""产线联动引擎:计划上线 / 上卷鞍座 / 生产实绩 / 停机自动检测。 状态流转: ready(准备好) --自动--> online(在线, 队首唯一) --移动--> 上卷鞍座(staged) --有速度/投入生产--> producing(生产中) --带头2000m--> produced(生产完成,产生实绩) 停机:当产线无生产(速度=0)持续超过 10min,自动新增一条未结束停机记录; 恢复速度后自动结束该记录,原因由用户后续补充录入。 """ from datetime import datetime from sqlalchemy import select, asc from sqlalchemy.ext.asyncio import AsyncSession from loguru import logger from app.models.plan import ProductionPlan from app.models.production import ProductionRecord from app.models.downtime import DowntimeRecord from app.models.line_state import LineState # ── 入口位置 ── SADDLE_NAME = "上卷鞍座" # 唯一会触发生产联动的位置 ENTRY_POSITIONS = [ "1#上卷小车", "2#上卷小车", "1#称重位", "2#称重位", "1#地辊", "2#地辊", "1#倒卷小车", "2#倒卷小车", SADDLE_NAME, ] # ── 仿真参数 ── TARGET_LENGTH_M = 2000.0 # 带头目标长度(生产完成阈值) SIM_SPEED_M_MIN = 600.0 # 仿真线速度 m/min(2000m ≈ 200s) DOWNTIME_THRESHOLD_S = 600 # 速度为0持续超过该秒数判定停机(10min) async def place_at_position(db: AsyncSession, plan: ProductionPlan, position: str): """把计划放到任意入口位置;位置唯一占用。上卷鞍座单独触发生产联动。""" if position not in ENTRY_POSITIONS: raise ValueError("未知入口位置") if position == SADDLE_NAME: await move_to_saddle(db, plan) return # 普通位置:清除同位置上的其它计划,移出鞍座(若在) res = await db.execute( select(ProductionPlan).where(ProductionPlan.position == position, ProductionPlan.id != plan.id) ) for o in res.scalars(): o.position = None plan.on_saddle = 0 plan.run_started_at = None plan.run_speed = 0 plan.run_length_m = 0 plan.position = position # 移动到入口端即变「在线」(人工触发,非自动) if plan.status != "produced": plan.status = "online" def _shift_of(dt: datetime) -> str: return "甲" if 8 <= dt.hour < 20 else "乙" async def _saddle_plan(db: AsyncSession): res = await db.execute(select(ProductionPlan).where(ProductionPlan.on_saddle == 1)) return res.scalars().first() async def ensure_online(db: AsyncSession): """在线为人工触发:用户手动「移动到入口」才变在线,引擎不再自动上线。""" return async def move_to_saddle(db: AsyncSession, plan: ProductionPlan): """把计划移动到上卷鞍座(预备生产位,尚未生产)。""" occupied = await _saddle_plan(db) if occupied and occupied.id != plan.id: raise ValueError("上卷鞍座已有预备卷,请先投入生产或移走") plan.on_saddle = 1 plan.position = SADDLE_NAME plan.saddle_at = datetime.now() plan.run_started_at = None plan.run_speed = 0 plan.run_length_m = 0 if plan.status != "produced": plan.status = "online" # 在鞍座预备(待投入生产) await ensure_online(db) async def commit_plan(db: AsyncSession, plan: ProductionPlan): """投入生产:鞍座预备卷进入生产中,离开鞍座进入产线(物料跟踪)。""" res = await db.execute(select(ProductionPlan).where(ProductionPlan.status == "producing")) if res.scalars().first() is not None: raise ValueError("产线已有钢卷在生产,请等待当前卷完成") now = datetime.now() plan.on_saddle = 0 # 离开上卷鞍座 plan.position = None plan.saddle_at = None plan.run_started_at = now plan.run_speed = SIM_SPEED_M_MIN plan.run_length_m = 0 plan.status = "producing" await ensure_online(db) # 鞍座空出 → 队首自动上线 async def _produce(db: AsyncSession, plan: ProductionPlan): """带头到达目标长度 → 生产完成,并生成实绩记录。""" now = datetime.now() plan.status = "produced" plan.produced_at = now plan.on_saddle = 0 plan.position = None plan.run_speed = 0 plan.run_length_m = TARGET_LENGTH_M weight = plan.incoming_weight or plan.plan_weight or 0 length = TARGET_LENGTH_M rec = ProductionRecord( coil_no=plan.cold_coil_no or plan.plan_no, sub_coil_no=plan.cold_coil_no, hot_coil_no=plan.hot_coil_no, plan_id=plan.id, shift=_shift_of(now), steel_grade=plan.steel_grade, incoming_thickness=plan.incoming_thickness, outlet_thickness=plan.product_thickness, deviation_upper=plan.deviation_upper, deviation_lower=plan.deviation_lower, incoming_width=plan.incoming_width, outlet_width=plan.product_width, incoming_weight=weight, weighed_weight=round(weight * 0.985, 4) if weight else None, packaging_req=plan.packaging_req, trim_req=plan.trim_req, surface_quality="合格", product_quality=99.0, product_length=length, length_per_ton=round(length / weight, 2) if weight else None, offline_time=now, status="PRODUCT", shift_date=now, start_time=plan.run_started_at or now, end_time=now, process_length=length, process_weight=weight, avg_speed=SIM_SPEED_M_MIN, max_speed=round(SIM_SPEED_M_MIN * 1.05, 1), inlet_thickness=plan.incoming_thickness, inlet_width=plan.incoming_width, quality_grade="A", operator="系统", process_data=plan.run_data, ) db.add(rec) logger.info(f"生产完成并产生实绩: {plan.cold_coil_no or plan.plan_no}") async def advance_production(db: AsyncSession): """推进产线在产卷(已离开鞍座):累计带头长度到 2000m → 生产完成。""" res = await db.execute(select(ProductionPlan).where(ProductionPlan.status == "producing")) plan = res.scalars().first() if not plan or not plan.run_started_at: return now = datetime.now() elapsed = (now - plan.run_started_at).total_seconds() plan.run_length_m = min(TARGET_LENGTH_M, (plan.run_speed or 0) / 60.0 * elapsed) if plan.run_length_m >= TARGET_LENGTH_M: await _produce(db, plan) async def _get_line_state(db: AsyncSession) -> LineState: st = await db.get(LineState, 1) if not st: st = LineState(id=1, speed=0) db.add(st) await db.flush() return st async def detect_downtime(db: AsyncSession): """速度为0持续超过阈值 → 自动新增停机;恢复速度 → 自动结束。""" res = await db.execute(select(ProductionPlan).where(ProductionPlan.status == "producing")) running = res.scalars().first() is not None now = datetime.now() st = await _get_line_state(db) if running: st.speed = SIM_SPEED_M_MIN st.zero_since = None if st.open_downtime_id: rec = await db.get(DowntimeRecord, st.open_downtime_id) if rec and rec.end_time is None: rec.end_time = now rec.duration = round((now - rec.start_time).total_seconds() / 60.0, 1) st.open_downtime_id = None else: st.speed = 0 if st.zero_since is None: st.zero_since = now elif st.open_downtime_id is None and (now - st.zero_since).total_seconds() >= DOWNTIME_THRESHOLD_S: rec = DowntimeRecord( category_code="AUTO", category_name="待定", shift=_shift_of(now), shift_date=now, start_time=st.zero_since, fault_desc="线速度为0持续超过10分钟(系统自动检测,原因待补充)", reporter="系统", is_planned=0, ) db.add(rec) await db.flush() st.open_downtime_id = rec.id logger.info("自动检测到停机,已新增待补充停机记录") async def auto_commit_saddle(db: AsyncSession): """产线空闲(无在产卷)且上卷鞍座有预备卷 → 自动投入生产(无需人工点击)。""" res = await db.execute(select(ProductionPlan).where(ProductionPlan.status == "producing")) if res.scalars().first() is not None: return # 产线占用:鞍座预备卷等待上一卷生产完成 saddle = await _saddle_plan(db) if saddle is None: return await commit_plan(db, saddle) async def tick(db: AsyncSession): """引擎单步:推进在产卷 → 产线空闲则自动投入鞍座预备卷 → 停机检测。""" await advance_production(db) await auto_commit_saddle(db) await detect_downtime(db) async def run_engine_loop(interval_s: int = 5): """后台循环,使联动在无人查看时也能自动推进。""" import asyncio from app.database import AsyncSessionLocal while True: try: async with AsyncSessionLocal() as db: await tick(db) await db.commit() except Exception as e: # noqa logger.warning(f"line engine tick 失败: {e}") await asyncio.sleep(interval_s)