feat(linkage): 计划-鞍座-实绩-停机联动 + 成本管理页

后端:
- 计划录入即「准备好」,队首(最早)自动「在线」(唯一)
- 新增上卷鞍座联动引擎 line_service:移动→鞍座→(有速度/投入生产)→生产中
  →带头达2000m→生产完成并自动产生实绩、持久化运行数据
- 停机自动检测:线速度为0持续>10min 自动新增待补充停机记录,恢复后自动结束
- /plan/start=移动到鞍座, 新增 /plan/{id}/commit 投入生产, /plan/saddle/current,
  /plan/seed 批量插入(轧制力模式);后台引擎循环自动推进
- 新增成本管理:CostRecord 模型 + /cost CRUD + 9 类成本项(乳化液/盐酸/碱/电/水/蒸汽…)

前端:
- 入口跟踪重构为单个上卷鞍座工位(实时速度/带头长度进度/投入生产)+待上卷卡片+队列,
  计划列表/卡片/队列均可「移动」
- 新增成本管理页(成本项切换 + 柱+线图 + 明细表 + 时间筛选 + 新增),布局参考乳化液耗量统计

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-29 13:57:59 +08:00
parent 2144f13b88
commit 9fb3dcb785
18 changed files with 969 additions and 149 deletions

View File

@@ -1,6 +1,6 @@
from fastapi import APIRouter
from app.api import auth, material, production, plan, downtime, equipment, message, dashboard
from app.api import prediction, pdi, quality, inspection
from app.api import prediction, pdi, quality, inspection, cost
router = APIRouter()
router.include_router(auth.router, prefix="/auth", tags=["认证"])
@@ -15,3 +15,4 @@ router.include_router(prediction.router, prefix="/prediction", tags=["工艺预
router.include_router(pdi.router, prefix="/pdi", tags=["PDI管理"])
router.include_router(quality.router, prefix="/quality", tags=["质量管理"])
router.include_router(inspection.router, prefix="/inspection", tags=["设备巡检"])
router.include_router(cost.router, prefix="/cost", tags=["成本管理"])

112
backend/app/api/cost.py Normal file
View File

@@ -0,0 +1,112 @@
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, asc
from typing import Optional
from datetime import datetime
from app.database import get_db
from app.models.cost import CostRecord
from app.schemas.cost import CostItem, CostCreate, CostUpdate, CostOut
from app.schemas.common import Response
from app.services.auth_service import get_current_user
router = APIRouter()
# 预定义成本/消耗项
COST_ITEMS = [
{"item": "emulsion", "item_name": "乳化液", "unit": "L", "unit_cost_label": "L/t"},
{"item": "acid", "item_name": "盐酸", "unit": "L", "unit_cost_label": "L/t"},
{"item": "alkali", "item_name": "脱脂碱液", "unit": "kg", "unit_cost_label": "kg/t"},
{"item": "power", "item_name": "电耗", "unit": "kWh", "unit_cost_label": "kWh/t"},
{"item": "water", "item_name": "新水", "unit": "", "unit_cost_label": "m³/t"},
{"item": "steam", "item_name": "蒸汽", "unit": "t", "unit_cost_label": "kg/t"},
{"item": "antirust_oil","item_name": "防锈油", "unit": "L", "unit_cost_label": "L/t"},
{"item": "inhibitor", "item_name": "缓蚀剂", "unit": "kg", "unit_cost_label": "g/t"},
{"item": "roll", "item_name": "辊耗", "unit": "", "unit_cost_label": "支/kt"},
]
_ITEM_MAP = {x["item"]: x for x in COST_ITEMS}
def _parse_dt(s):
if not s:
return None
try:
return datetime.fromisoformat(s.replace('Z', ''))
except Exception:
return None
@router.get("/items", response_model=Response[list[CostItem]])
async def list_items(_ = Depends(get_current_user)):
return Response.ok([CostItem(**x) for x in COST_ITEMS])
@router.get("/", response_model=Response[list[CostOut]])
async def list_records(
item: Optional[str] = None,
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
db: AsyncSession = Depends(get_db),
_ = Depends(get_current_user),
):
query = select(CostRecord).order_by(asc(CostRecord.record_date))
if item:
query = query.where(CostRecord.item == item)
_sd = _parse_dt(start_date)
if _sd:
query = query.where(CostRecord.record_date >= _sd)
_ed = _parse_dt(end_date)
if _ed:
query = query.where(CostRecord.record_date <= _ed)
result = await db.execute(query)
return Response.ok([CostOut.model_validate(r) for r in result.scalars()])
@router.post("/", response_model=Response[CostOut])
async def create_record(
body: CostCreate,
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user),
):
meta = _ITEM_MAP.get(body.item)
if not meta:
raise HTTPException(status_code=400, detail="未知成本项")
rec = CostRecord(
item=body.item,
item_name=meta["item_name"],
unit=meta["unit"],
record_date=body.record_date,
shift_a=body.shift_a or 0,
shift_b=body.shift_b or 0,
unit_cost=body.unit_cost or 0,
remark=body.remark,
created_by=current_user.username,
)
db.add(rec)
await db.flush()
return Response.ok(CostOut.model_validate(rec))
@router.put("/{rec_id}", response_model=Response[CostOut])
async def update_record(
rec_id: int,
body: CostUpdate,
db: AsyncSession = Depends(get_db),
_ = Depends(get_current_user),
):
rec = await db.get(CostRecord, rec_id)
if not rec:
raise HTTPException(status_code=404, detail="记录不存在")
for k, v in body.model_dump(exclude_none=True).items():
setattr(rec, k, v)
await db.flush()
return Response.ok(CostOut.model_validate(rec))
@router.delete("/{rec_id}", response_model=Response[dict])
async def delete_record(rec_id: int, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
rec = await db.get(CostRecord, rec_id)
if not rec:
raise HTTPException(status_code=404, detail="记录不存在")
await db.delete(rec)
return Response.ok({"deleted": rec_id})

View File

@@ -9,6 +9,7 @@ from app.models.plan import ProductionPlan
from app.schemas.plan import PlanCreate, PlanUpdate, PlanOut, PlanTemplate
from app.schemas.common import Response, PageResponse
from app.services.auth_service import get_current_user
from app.services import line_service
router = APIRouter()
@@ -42,6 +43,8 @@ async def list_plans(
db: AsyncSession = Depends(get_db),
_ = Depends(get_current_user),
):
# 拉取前保证队首自动上线(鞍座推进/停机检测由 /saddle 与后台引擎负责)
await line_service.ensure_online(db)
query = select(ProductionPlan).order_by(desc(ProductionPlan.plan_date))
if status:
query = query.where(ProductionPlan.status == status)
@@ -80,6 +83,9 @@ async def create_plan(
plan = ProductionPlan(**body.model_dump(), created_by=current_user.username)
db.add(plan)
await db.flush()
# 录入即准备好;若当前无在线计划,则队首自动上线
await line_service.ensure_online(db)
await db.refresh(plan)
return Response.ok(PlanOut.model_validate(plan))
@@ -121,21 +127,85 @@ async def confirm_plan(plan_id: int, db: AsyncSession = Depends(get_db), _ = Dep
@router.patch("/{plan_id}/start", response_model=Response[PlanOut])
async def start_producing(plan_id: int, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
"""移动到入口开始生产本条→producing其它 producing→online单卷在产)。"""
async def move_to_saddle(plan_id: int, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
"""移动:把在线计划推到上卷鞍座(等待速度/投入生产)。"""
result = await db.execute(select(ProductionPlan).where(ProductionPlan.id == plan_id))
plan = result.scalar_one_or_none()
if not plan:
raise HTTPException(status_code=404, detail="计划不存在")
# 其它正在生产的全部回退为在线(强制单卷在产)
others = await db.execute(
select(ProductionPlan).where(
ProductionPlan.status == "producing",
ProductionPlan.id != plan_id,
)
)
for o in others.scalars():
o.status = "online"
plan.status = "producing"
try:
await line_service.move_to_saddle(db, plan)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
await db.flush()
await db.refresh(plan)
return Response.ok(PlanOut.model_validate(plan))
@router.patch("/{plan_id}/commit", response_model=Response[PlanOut])
async def commit_producing(plan_id: int, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
"""投入生产:把鞍座上的计划置为生产中(兜底未实时变化的数据)。"""
result = await db.execute(select(ProductionPlan).where(ProductionPlan.id == plan_id))
plan = result.scalar_one_or_none()
if not plan:
raise HTTPException(status_code=404, detail="计划不存在")
await line_service.commit_plan(db, plan)
await db.flush()
await db.refresh(plan)
return Response.ok(PlanOut.model_validate(plan))
@router.get("/saddle/current", response_model=Response[Optional[PlanOut]])
async def get_saddle(db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
"""上卷鞍座当前计划(含实时速度/已生产长度),并推进联动。"""
await line_service.tick(db)
res = await db.execute(select(ProductionPlan).where(ProductionPlan.on_saddle == 1))
plan = res.scalars().first()
return Response.ok(PlanOut.model_validate(plan) if plan else None)
@router.post("/seed", response_model=Response[dict])
async def seed_plans(count: int = 50, db: AsyncSession = Depends(get_db), current_user = Depends(get_current_user)):
"""批量插入准备好的计划(轧制力模式),用于演示联动。"""
import random
from datetime import timedelta
res = await db.execute(select(func.count()).select_from(ProductionPlan))
base = (res.scalar() or 0)
grades = ["QStE340TM", "SPHC", "SAPH440", "B510L", "QTGLG-2019"]
now = datetime.now()
created = 0
for i in range(count):
seq = base + i + 1
it = round(random.uniform(2.0, 5.0), 2)
pt = round(it - random.uniform(0.0, 0.1), 2)
iw = random.choice([1000, 1050, 1100, 1150, 1200, 1250])
wt = round(random.uniform(15.0, 26.0), 3)
plan = ProductionPlan(
plan_no=f"PL{now:%Y%m%d}{seq:04d}",
plan_date=now + timedelta(minutes=i),
status="ready",
cold_coil_no=f"C{now:%y%m%d}{seq:04d}",
hot_coil_no=f"H{now:%y%m%d}{seq:04d}",
steel_grade=random.choice(grades),
incoming_thickness=it,
product_thickness=pt,
deviation_upper=0.05,
deviation_lower=-0.05,
incoming_width=iw,
product_width=iw - random.choice([0, 4, 6]),
packaging_req=random.choice(["裸包", "筒包"]),
trim_req=random.choice(["切边", "不切边"]),
rolling_mode="轧制力模式",
coil_diameter=random.choice([1450, 1500, 1550]),
split_count=1,
next_process="冷轧",
incoming_weight=wt,
incoming_od=random.choice([1400, 1450, 1500]),
split_weights=[wt],
created_by=current_user.username,
)
db.add(plan)
created += 1
await db.flush()
await line_service.ensure_online(db)
return Response.ok({"created": created})

View File

@@ -59,6 +59,13 @@ async def _run_migrations(conn):
"ALTER TABLE production_plans ADD COLUMN IF NOT EXISTS incoming_weight DOUBLE PRECISION",
"ALTER TABLE production_plans ADD COLUMN IF NOT EXISTS incoming_od DOUBLE PRECISION",
"ALTER TABLE production_plans ADD COLUMN IF NOT EXISTS split_weights JSONB",
# 上卷鞍座 / 生产联动字段
"ALTER TABLE production_plans ADD COLUMN IF NOT EXISTS on_saddle INTEGER DEFAULT 0",
"ALTER TABLE production_plans ADD COLUMN IF NOT EXISTS saddle_at TIMESTAMP",
"ALTER TABLE production_plans ADD COLUMN IF NOT EXISTS run_started_at TIMESTAMP",
"ALTER TABLE production_plans ADD COLUMN IF NOT EXISTS run_speed DOUBLE PRECISION DEFAULT 0",
"ALTER TABLE production_plans ADD COLUMN IF NOT EXISTS run_length_m DOUBLE PRECISION DEFAULT 0",
"ALTER TABLE production_plans ADD COLUMN IF NOT EXISTS produced_at TIMESTAMP",
# 状态列改为 VARCHAR 以适配新值
"ALTER TABLE production_plans ALTER COLUMN status TYPE VARCHAR(20) USING status::text",
# production_records 新字段

View File

@@ -14,15 +14,21 @@ async def lifespan(app: FastAPI):
logger.info("启动推拉酸洗线二级系统...")
await init_db()
await _create_default_admin()
await _ensure_line_state()
# 启动L1报文接收服务UDP
from app.services.message_parser import l1_server
import app.services.material_service # noqa: 注册报文处理器
await l1_server.start()
# 启动产线联动引擎(计划上线/上卷鞍座/实绩/停机自动检测)
from app.services.line_service import run_engine_loop
engine_task = asyncio.create_task(run_engine_loop())
logger.info("系统启动完成")
yield
engine_task.cancel()
l1_server.stop()
logger.info("系统已停止")
@@ -48,6 +54,17 @@ async def _create_default_admin():
logger.info("默认管理员已创建: admin / admin123")
async def _ensure_line_state():
"""确保产线状态单例行存在(避免并发首建竞争)。"""
from app.database import AsyncSessionLocal
from app.models.line_state import LineState
async with AsyncSessionLocal() as db:
if not await db.get(LineState, 1):
db.add(LineState(id=1, speed=0))
await db.commit()
app = FastAPI(
title="推拉酸洗线二级系统",
description="Pickling Line Level-2 MES System",

View File

@@ -9,6 +9,8 @@ from app.models.pdi import PDIRecord
from app.models.quality import QcTask, QcTaskItem, QcDefect
from app.models.energy import EnergyRecord
from app.models.inspection import EqpChecklist, EqpChecklistItem, EqpInspectionRecord, EqpInspectionDetail
from app.models.line_state import LineState
from app.models.cost import CostRecord
__all__ = [
"User",
@@ -22,4 +24,5 @@ __all__ = [
"QcTask", "QcTaskItem", "QcDefect",
"EnergyRecord",
"EqpChecklist", "EqpChecklistItem", "EqpInspectionRecord", "EqpInspectionDetail",
"LineState", "CostRecord",
]

View File

@@ -0,0 +1,20 @@
from sqlalchemy import Column, Integer, String, Float, DateTime, Text, func
from app.database import Base
class CostRecord(Base):
"""成本/消耗记录(乳化液、盐酸、电、水、蒸汽等)"""
__tablename__ = "cost_records"
id = Column(Integer, primary_key=True, index=True)
item = Column(String(30), index=True, nullable=False, comment="成本项编码")
item_name = Column(String(50), comment="成本项名称")
unit = Column(String(20), comment="计量单位")
record_date = Column(DateTime, nullable=False, index=True, comment="记录时间")
shift_a = Column(Float, default=0, comment="A班量")
shift_b = Column(Float, default=0, comment="B班量")
unit_cost = Column(Float, default=0, comment="吨耗量")
remark = Column(Text, comment="备注")
created_by = Column(String(50))
created_at = Column(DateTime, server_default=func.now())
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

View File

@@ -0,0 +1,13 @@
from sqlalchemy import Column, Integer, Float, DateTime, func
from app.database import Base
class LineState(Base):
"""产线运行状态单例id=1用于停机自动检测。"""
__tablename__ = "line_state"
id = Column(Integer, primary_key=True, index=True)
speed = Column(Float, default=0, comment="当前线速度 m/min")
zero_since = Column(DateTime, comment="速度为0的起始时间")
open_downtime_id = Column(Integer, comment="当前未结束的自动停机记录id")
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now())

View File

@@ -35,6 +35,14 @@ class ProductionPlan(Base):
incoming_od = Column(Float, comment="来料外径 mm")
split_weights = Column(JSON, comment="分卷重量 [t,...]")
# 上卷鞍座 / 生产联动
on_saddle = Column(Integer, default=0, comment="是否在上卷鞍座 0/1")
saddle_at = Column(DateTime, comment="移动到鞍座时间")
run_started_at = Column(DateTime, comment="投入生产(有速度)时间")
run_speed = Column(Float, default=0, comment="当前线速度 m/min")
run_length_m = Column(Float, default=0, comment="带头已生产长度 m")
produced_at = Column(DateTime, comment="生产完成时间")
# 兼容历史字段
shift = Column(String(10), comment="班次")
plan_quantity = Column(Integer, default=0)

View File

@@ -0,0 +1,43 @@
from pydantic import BaseModel
from typing import Optional
from datetime import datetime
class CostItem(BaseModel):
item: str
item_name: str
unit: str
unit_cost_label: str # 吨耗单位
class CostCreate(BaseModel):
item: str
record_date: datetime
shift_a: Optional[float] = 0
shift_b: Optional[float] = 0
unit_cost: Optional[float] = 0
remark: Optional[str] = None
class CostUpdate(BaseModel):
record_date: Optional[datetime] = None
shift_a: Optional[float] = None
shift_b: Optional[float] = None
unit_cost: Optional[float] = None
remark: Optional[str] = None
class CostOut(BaseModel):
id: int
item: str
item_name: Optional[str] = None
unit: Optional[str] = None
record_date: datetime
shift_a: Optional[float] = 0
shift_b: Optional[float] = 0
unit_cost: Optional[float] = 0
remark: Optional[str] = None
created_at: datetime
class Config:
from_attributes = True

View File

@@ -24,7 +24,7 @@ class PlanCreate(BaseModel):
incoming_weight: Optional[float] = None
incoming_od: Optional[float] = None
split_weights: Optional[List[Optional[float]]] = None
status: Optional[str] = "online"
status: Optional[str] = "ready"
remark: Optional[str] = None
@@ -78,6 +78,13 @@ class PlanOut(BaseModel):
remark: Optional[str] = None
created_by: Optional[str] = None
created_at: datetime
# 上卷鞍座 / 生产联动
on_saddle: Optional[int] = 0
saddle_at: Optional[datetime] = None
run_started_at: Optional[datetime] = None
run_speed: Optional[float] = 0
run_length_m: Optional[float] = 0
produced_at: Optional[datetime] = None
class Config:
from_attributes = True

View File

@@ -0,0 +1,216 @@
"""产线联动引擎:计划上线 / 上卷鞍座 / 生产实绩 / 停机自动检测。
状态流转:
ready(准备好) --自动--> online(在线, 队首唯一) --移动--> 上卷鞍座(staged)
--有速度/投入生产--> producing(生产中) --带头2000m--> produced(生产完成,产生实绩)
停机:当产线无生产(速度=0)持续超过 10min自动新增一条未结束停机记录
恢复速度后自动结束该记录,原因由用户后续补充录入。
"""
from datetime import datetime
from sqlalchemy import select, asc
from sqlalchemy.ext.asyncio import AsyncSession
from loguru import logger
from app.models.plan import ProductionPlan
from app.models.production import ProductionRecord
from app.models.downtime import DowntimeRecord
from app.models.line_state import LineState
# ── 仿真参数 ──
TARGET_LENGTH_M = 2000.0 # 带头目标长度(生产完成阈值)
SIM_SPEED_M_MIN = 600.0 # 仿真线速度 m/min2000m ≈ 200s
DOWNTIME_THRESHOLD_S = 600 # 速度为0持续超过该秒数判定停机10min
def _shift_of(dt: datetime) -> str:
return "" if 8 <= dt.hour < 20 else ""
async def _saddle_plan(db: AsyncSession):
res = await db.execute(select(ProductionPlan).where(ProductionPlan.on_saddle == 1))
return res.scalars().first()
async def ensure_online(db: AsyncSession):
"""保证恰好一条 online队首最早录入的 ready"""
res = await db.execute(select(ProductionPlan).where(ProductionPlan.status == "online"))
online = list(res.scalars())
if len(online) > 1:
# 仅保留最早的一条为 online其余回退 ready
online.sort(key=lambda p: (p.plan_date or datetime.max, p.id))
for p in online[1:]:
p.status = "ready"
online = online[:1]
if not online:
res = await db.execute(
select(ProductionPlan)
.where(ProductionPlan.status == "ready")
.order_by(asc(ProductionPlan.plan_date), asc(ProductionPlan.id))
.limit(1)
)
head = res.scalar_one_or_none()
if head:
head.status = "online"
async def move_to_saddle(db: AsyncSession, plan: ProductionPlan):
"""把在线计划移动到上卷鞍座staged等待速度/投入生产)。"""
occupied = await _saddle_plan(db)
if occupied and occupied.id != plan.id:
raise ValueError("上卷鞍座已被占用,请等待当前钢卷生产完成")
plan.on_saddle = 1
plan.saddle_at = datetime.now()
plan.run_started_at = None
plan.run_speed = 0
plan.run_length_m = 0
if plan.status not in ("producing", "produced"):
plan.status = "online"
await ensure_online(db)
async def commit_plan(db: AsyncSession, plan: ProductionPlan):
"""投入生产:鞍座计划有速度 → 生产中。"""
if plan.on_saddle != 1:
await move_to_saddle(db, plan)
if plan.run_started_at is None:
plan.run_started_at = datetime.now()
plan.run_speed = SIM_SPEED_M_MIN
plan.status = "producing"
await ensure_online(db)
async def _produce(db: AsyncSession, plan: ProductionPlan):
"""带头到达目标长度 → 生产完成,并生成实绩记录。"""
now = datetime.now()
plan.status = "produced"
plan.produced_at = now
plan.on_saddle = 0
plan.run_speed = 0
plan.run_length_m = TARGET_LENGTH_M
weight = plan.incoming_weight or plan.plan_weight or 0
length = TARGET_LENGTH_M
rec = ProductionRecord(
coil_no=plan.cold_coil_no or plan.plan_no,
sub_coil_no=plan.cold_coil_no,
hot_coil_no=plan.hot_coil_no,
plan_id=plan.id,
shift=_shift_of(now),
steel_grade=plan.steel_grade,
incoming_thickness=plan.incoming_thickness,
outlet_thickness=plan.product_thickness,
deviation_upper=plan.deviation_upper,
deviation_lower=plan.deviation_lower,
incoming_width=plan.incoming_width,
outlet_width=plan.product_width,
incoming_weight=weight,
weighed_weight=round(weight * 0.985, 4) if weight else None,
packaging_req=plan.packaging_req,
trim_req=plan.trim_req,
surface_quality="合格",
product_quality=99.0,
product_length=length,
length_per_ton=round(length / weight, 2) if weight else None,
offline_time=now,
status="PRODUCT",
shift_date=now,
start_time=plan.run_started_at or now,
end_time=now,
process_length=length,
process_weight=weight,
avg_speed=SIM_SPEED_M_MIN,
max_speed=round(SIM_SPEED_M_MIN * 1.05, 1),
inlet_thickness=plan.incoming_thickness,
inlet_width=plan.incoming_width,
quality_grade="A",
operator="系统",
)
db.add(rec)
logger.info(f"生产完成并产生实绩: {plan.cold_coil_no or plan.plan_no}")
async def advance_saddle(db: AsyncSession):
"""推进鞍座计划staged 自动获得速度 → 生产中;累计长度到 2000m → 完成。"""
plan = await _saddle_plan(db)
if not plan:
return
now = datetime.now()
# staged在线且在鞍座自动获得速度模拟 PLC 速度信号
if plan.status == "online" and plan.run_started_at is None:
plan.run_started_at = now
plan.run_speed = SIM_SPEED_M_MIN
plan.status = "producing"
await ensure_online(db)
if plan.status == "producing" and plan.run_started_at:
elapsed = (now - plan.run_started_at).total_seconds()
plan.run_length_m = min(TARGET_LENGTH_M, (plan.run_speed or 0) / 60.0 * elapsed)
if plan.run_length_m >= TARGET_LENGTH_M:
await _produce(db, plan)
async def _get_line_state(db: AsyncSession) -> LineState:
st = await db.get(LineState, 1)
if not st:
st = LineState(id=1, speed=0)
db.add(st)
await db.flush()
return st
async def detect_downtime(db: AsyncSession):
"""速度为0持续超过阈值 → 自动新增停机;恢复速度 → 自动结束。"""
res = await db.execute(select(ProductionPlan).where(ProductionPlan.status == "producing"))
running = res.scalars().first() is not None
now = datetime.now()
st = await _get_line_state(db)
if running:
st.speed = SIM_SPEED_M_MIN
st.zero_since = None
if st.open_downtime_id:
rec = await db.get(DowntimeRecord, st.open_downtime_id)
if rec and rec.end_time is None:
rec.end_time = now
rec.duration = round((now - rec.start_time).total_seconds() / 60.0, 1)
st.open_downtime_id = None
else:
st.speed = 0
if st.zero_since is None:
st.zero_since = now
elif st.open_downtime_id is None and (now - st.zero_since).total_seconds() >= DOWNTIME_THRESHOLD_S:
rec = DowntimeRecord(
category_code="AUTO",
category_name="待定",
shift=_shift_of(now),
shift_date=now,
start_time=st.zero_since,
fault_desc="线速度为0持续超过10分钟系统自动检测原因待补充",
reporter="系统",
is_planned=0,
)
db.add(rec)
await db.flush()
st.open_downtime_id = rec.id
logger.info("自动检测到停机,已新增待补充停机记录")
async def tick(db: AsyncSession):
"""引擎单步:上线 + 推进鞍座 + 停机检测。"""
await ensure_online(db)
await advance_saddle(db)
await detect_downtime(db)
async def run_engine_loop(interval_s: int = 15):
"""后台循环,使联动在无人查看时也能自动推进。"""
import asyncio
from app.database import AsyncSessionLocal
while True:
try:
async with AsyncSessionLocal() as db:
await tick(db)
await db.commit()
except Exception as e: # noqa
logger.warning(f"line engine tick 失败: {e}")
await asyncio.sleep(interval_s)