Files
pickling-mes/backend/app/services/line_service.py
wangyu 1a6deea4bb fix(linkage): 单卷在产时不再保留在线队首,避免生产中仍显示在线
- ensure_online:当有卷在上卷鞍座/生产中时,所有在线计划回退为准备好;
  当前卷生产完成后再把最早准备好的置为唯一在线

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-29 15:06:10 +08:00

264 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""产线联动引擎:计划上线 / 上卷鞍座 / 生产实绩 / 停机自动检测。
状态流转:
ready(准备好) --自动--> online(在线, 队首唯一) --移动--> 上卷鞍座(staged)
--有速度/投入生产--> producing(生产中) --带头2000m--> produced(生产完成,产生实绩)
停机:当产线无生产(速度=0)持续超过 10min自动新增一条未结束停机记录
恢复速度后自动结束该记录,原因由用户后续补充录入。
"""
from datetime import datetime
from sqlalchemy import select, asc
from sqlalchemy.ext.asyncio import AsyncSession
from loguru import logger
from app.models.plan import ProductionPlan
from app.models.production import ProductionRecord
from app.models.downtime import DowntimeRecord
from app.models.line_state import LineState
# ── 入口位置 ──
SADDLE_NAME = "上卷鞍座" # 唯一会触发生产联动的位置
ENTRY_POSITIONS = [
"1#上卷小车", "2#上卷小车",
"1#称重位", "2#称重位",
"1#地辊", "2#地辊",
"1#倒卷小车", "2#倒卷小车",
SADDLE_NAME,
]
# ── 仿真参数 ──
TARGET_LENGTH_M = 2000.0 # 带头目标长度(生产完成阈值)
SIM_SPEED_M_MIN = 600.0 # 仿真线速度 m/min2000m ≈ 200s
DOWNTIME_THRESHOLD_S = 600 # 速度为0持续超过该秒数判定停机10min
async def place_at_position(db: AsyncSession, plan: ProductionPlan, position: str):
"""把计划放到任意入口位置;位置唯一占用。上卷鞍座单独触发生产联动。"""
if position not in ENTRY_POSITIONS:
raise ValueError("未知入口位置")
if position == SADDLE_NAME:
await move_to_saddle(db, plan)
return
# 普通位置:清除同位置上的其它计划,移出鞍座(若在)
res = await db.execute(
select(ProductionPlan).where(ProductionPlan.position == position, ProductionPlan.id != plan.id)
)
for o in res.scalars():
o.position = None
plan.on_saddle = 0
plan.run_started_at = None
plan.run_speed = 0
plan.run_length_m = 0
plan.position = position
if plan.status == "producing":
plan.status = "online"
await ensure_online(db)
def _shift_of(dt: datetime) -> str:
return "" if 8 <= dt.hour < 20 else ""
async def _saddle_plan(db: AsyncSession):
res = await db.execute(select(ProductionPlan).where(ProductionPlan.on_saddle == 1))
return res.scalars().first()
async def ensure_online(db: AsyncSession):
"""单卷在产:有卷在上卷鞍座/生产中时不保留在线队首;否则把最早 ready 置为唯一在线。"""
res = await db.execute(
select(ProductionPlan).where(ProductionPlan.status == "online", ProductionPlan.on_saddle != 1)
)
online = list(res.scalars())
# 是否已有在产/在鞍座的卷
res2 = await db.execute(
select(ProductionPlan).where(
(ProductionPlan.status == "producing") | (ProductionPlan.on_saddle == 1)
)
)
active = res2.scalars().first() is not None
if active:
# 正在生产时,不再保留「在线」队首,全部回退 ready等当前卷完成
for p in online:
p.status = "ready"
return
if len(online) > 1:
online.sort(key=lambda p: (p.plan_date or datetime.max, p.id))
for p in online[1:]:
p.status = "ready"
online = online[:1]
if not online:
res = await db.execute(
select(ProductionPlan)
.where(ProductionPlan.status == "ready")
.order_by(asc(ProductionPlan.plan_date), asc(ProductionPlan.id))
.limit(1)
)
head = res.scalar_one_or_none()
if head:
head.status = "online"
async def move_to_saddle(db: AsyncSession, plan: ProductionPlan):
"""把计划移动到上卷鞍座:上卷即获得速度 → 生产中。"""
occupied = await _saddle_plan(db)
if occupied and occupied.id != plan.id:
raise ValueError("上卷鞍座已被占用,请等待当前钢卷生产完成")
now = datetime.now()
plan.on_saddle = 1
plan.position = SADDLE_NAME
plan.saddle_at = now
if plan.status != "produced":
plan.run_started_at = now
plan.run_speed = SIM_SPEED_M_MIN
plan.run_length_m = 0
plan.status = "producing"
await ensure_online(db)
async def commit_plan(db: AsyncSession, plan: ProductionPlan):
"""投入生产(兜底):鞍座计划置为生产中。"""
if plan.on_saddle != 1:
await move_to_saddle(db, plan)
return
if plan.run_started_at is None:
plan.run_started_at = datetime.now()
plan.run_speed = SIM_SPEED_M_MIN
plan.status = "producing"
await ensure_online(db)
async def _produce(db: AsyncSession, plan: ProductionPlan):
"""带头到达目标长度 → 生产完成,并生成实绩记录。"""
now = datetime.now()
plan.status = "produced"
plan.produced_at = now
plan.on_saddle = 0
plan.position = None
plan.run_speed = 0
plan.run_length_m = TARGET_LENGTH_M
weight = plan.incoming_weight or plan.plan_weight or 0
length = TARGET_LENGTH_M
rec = ProductionRecord(
coil_no=plan.cold_coil_no or plan.plan_no,
sub_coil_no=plan.cold_coil_no,
hot_coil_no=plan.hot_coil_no,
plan_id=plan.id,
shift=_shift_of(now),
steel_grade=plan.steel_grade,
incoming_thickness=plan.incoming_thickness,
outlet_thickness=plan.product_thickness,
deviation_upper=plan.deviation_upper,
deviation_lower=plan.deviation_lower,
incoming_width=plan.incoming_width,
outlet_width=plan.product_width,
incoming_weight=weight,
weighed_weight=round(weight * 0.985, 4) if weight else None,
packaging_req=plan.packaging_req,
trim_req=plan.trim_req,
surface_quality="合格",
product_quality=99.0,
product_length=length,
length_per_ton=round(length / weight, 2) if weight else None,
offline_time=now,
status="PRODUCT",
shift_date=now,
start_time=plan.run_started_at or now,
end_time=now,
process_length=length,
process_weight=weight,
avg_speed=SIM_SPEED_M_MIN,
max_speed=round(SIM_SPEED_M_MIN * 1.05, 1),
inlet_thickness=plan.incoming_thickness,
inlet_width=plan.incoming_width,
quality_grade="A",
operator="系统",
)
db.add(rec)
logger.info(f"生产完成并产生实绩: {plan.cold_coil_no or plan.plan_no}")
async def advance_saddle(db: AsyncSession):
"""推进鞍座计划:累计带头长度到 2000m → 生产完成。"""
plan = await _saddle_plan(db)
if not plan:
return
now = datetime.now()
if plan.status == "producing" and plan.run_started_at:
elapsed = (now - plan.run_started_at).total_seconds()
plan.run_length_m = min(TARGET_LENGTH_M, (plan.run_speed or 0) / 60.0 * elapsed)
if plan.run_length_m >= TARGET_LENGTH_M:
await _produce(db, plan)
async def _get_line_state(db: AsyncSession) -> LineState:
st = await db.get(LineState, 1)
if not st:
st = LineState(id=1, speed=0)
db.add(st)
await db.flush()
return st
async def detect_downtime(db: AsyncSession):
"""速度为0持续超过阈值 → 自动新增停机;恢复速度 → 自动结束。"""
res = await db.execute(select(ProductionPlan).where(ProductionPlan.status == "producing"))
running = res.scalars().first() is not None
now = datetime.now()
st = await _get_line_state(db)
if running:
st.speed = SIM_SPEED_M_MIN
st.zero_since = None
if st.open_downtime_id:
rec = await db.get(DowntimeRecord, st.open_downtime_id)
if rec and rec.end_time is None:
rec.end_time = now
rec.duration = round((now - rec.start_time).total_seconds() / 60.0, 1)
st.open_downtime_id = None
else:
st.speed = 0
if st.zero_since is None:
st.zero_since = now
elif st.open_downtime_id is None and (now - st.zero_since).total_seconds() >= DOWNTIME_THRESHOLD_S:
rec = DowntimeRecord(
category_code="AUTO",
category_name="待定",
shift=_shift_of(now),
shift_date=now,
start_time=st.zero_since,
fault_desc="线速度为0持续超过10分钟系统自动检测原因待补充",
reporter="系统",
is_planned=0,
)
db.add(rec)
await db.flush()
st.open_downtime_id = rec.id
logger.info("自动检测到停机,已新增待补充停机记录")
async def tick(db: AsyncSession):
"""引擎单步:上线 + 推进鞍座 + 停机检测。"""
await ensure_online(db)
await advance_saddle(db)
await detect_downtime(db)
async def run_engine_loop(interval_s: int = 15):
"""后台循环,使联动在无人查看时也能自动推进。"""
import asyncio
from app.database import AsyncSessionLocal
while True:
try:
async with AsyncSessionLocal() as db:
await tick(db)
await db.commit()
except Exception as e: # noqa
logger.warning(f"line engine tick 失败: {e}")
await asyncio.sleep(interval_s)