Files
pickling-mes/backend/app/api/plan.py
wangyu 9cf422ef0d feat(plan): 计划详细面板 + 来料重量/外径/分卷重量 + 在线/生产中状态机 + 入口移动 + 上次模板回填
- backend: plan 增加 incoming_weight/incoming_od/split_weights(JSON) 字段及迁移
- backend: GET /plan/last-template 返回最近一条计划的工艺字段用于新增回填(多端共享)
- backend: PATCH /plan/{id}/start 设为 producing,强制单卷在产(其他 producing 回退 online)
- backend: 生成实绩时按卷号自动把对应计划状态置为 produced
- frontend: 新增计划默认状态 online;新增时调用 last-template 自动回填
- frontend: Plan 表格行点击展开 计划详细 面板(按截图布局)
- frontend: Plan 行操作增加「移动」(ready/online → producing)
- frontend: 物料跟踪页加 在线计划队列 + 入口移动按钮,显示当前生产中卷
- frontend: 计划弹窗新增 轧制模式/来料重量/来料外径/1-6#分卷重量
2026-06-21 23:42:22 +08:00

142 lines
5.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, desc
from typing import Optional
from datetime import datetime
from app.database import get_db
from app.models.plan import ProductionPlan
from app.schemas.plan import PlanCreate, PlanUpdate, PlanOut, PlanTemplate
from app.schemas.common import Response, PageResponse
from app.services.auth_service import get_current_user
# Router that collects the production-plan endpoints declared in this module.
router = APIRouter()
def _parse_dt(s: Optional[str]) -> Optional[datetime]:
    """Leniently parse an ISO-8601 string into a ``datetime``.

    Any ``Z`` designator is dropped before parsing, so a value such as
    ``2024-01-02T03:04:05Z`` yields a naive ``datetime``. Leading and trailing
    whitespace is ignored.

    Args:
        s: The raw text to parse; may be ``None`` or empty.

    Returns:
        The parsed ``datetime``, or ``None`` when *s* is empty, is not a
        string, or is not a valid ISO-8601 value. This helper never raises for
        bad input, so callers can treat ``None`` as "no filter supplied".
    """
    if not s:
        return None
    try:
        return datetime.fromisoformat(s.strip().replace('Z', ''))
    except (AttributeError, TypeError, ValueError):
        # AttributeError/TypeError: *s* is not a str; ValueError: malformed text.
        # Only these are swallowed so unrelated failures are not hidden.
        return None
# Plan attribute names that get_last_template copies into the PlanTemplate
# response. Identifying fields (plan number, coil number, timestamps) are
# deliberately absent from this list.
TEMPLATE_FIELDS = (
    "steel_grade",
    "incoming_thickness",
    "product_thickness",
    "deviation_upper",
    "deviation_lower",
    "incoming_width",
    "product_width",
    "packaging_req",
    "trim_req",
    "rolling_mode",
    "coil_diameter",
    "split_count",
    "next_process",
    "incoming_weight",
    "incoming_od",
    "split_weights",
)
@router.get("/", response_model=Response[PageResponse[PlanOut]])
async def list_plans(
    page: int = 1,
    page_size: int = 20,
    status: Optional[str] = None,
    start_date: Optional[str] = Query(None),
    end_date: Optional[str] = Query(None),
    db: AsyncSession = Depends(get_db),
    _ = Depends(get_current_user),
):
    """Return one page of production plans, newest ``plan_date`` first.

    Args:
        page: 1-based page number; values below 1 are treated as 1.
        page_size: Rows per page; negative values are treated as 0.
        status: When given, only plans with exactly this status are returned.
        start_date: Optional ISO-8601 lower bound (inclusive) on ``plan_date``.
        end_date: Optional ISO-8601 upper bound (inclusive) on ``plan_date``.
        db: Database session injected by FastAPI.
        _: Authenticated user; required only to enforce authentication.

    Returns:
        ``Response`` wrapping a ``PageResponse`` with the total number of
        matching plans and the items of the requested page.

    Unparseable dates are ignored rather than rejected (see ``_parse_dt``).
    """
    # A non-positive page or negative size would yield a negative OFFSET/LIMIT,
    # which most databases reject; clamp instead of failing with a server error.
    page = max(page, 1)
    page_size = max(page_size, 0)

    filtered = select(ProductionPlan)
    if status:
        filtered = filtered.where(ProductionPlan.status == status)
    range_start = _parse_dt(start_date)
    if range_start:
        filtered = filtered.where(ProductionPlan.plan_date >= range_start)
    # NOTE(review): a date-only end_date parses to midnight, so plans later on
    # that day are excluded if plan_date carries a time component — confirm.
    range_end = _parse_dt(end_date)
    if range_end:
        filtered = filtered.where(ProductionPlan.plan_date <= range_end)

    # Count over the unordered query: ORDER BY inside a counted subquery is
    # wasted work and is not accepted by every database.
    count_stmt = select(func.count()).select_from(filtered.subquery())
    total = (await db.execute(count_stmt)).scalar()

    # plan_date alone is not a total order; the id tie-breaker keeps rows from
    # repeating or disappearing between pages when dates are equal.
    ordered = filtered.order_by(desc(ProductionPlan.plan_date), desc(ProductionPlan.id))
    result = await db.execute(ordered.offset((page - 1) * page_size).limit(page_size))
    items = [PlanOut.model_validate(p) for p in result.scalars()]
    return Response.ok(PageResponse(total=total, page=page, page_size=page_size, items=items))
@router.get("/last-template", response_model=Response[PlanTemplate])
async def get_last_template(db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
    """Return the process fields of the most recently created plan.

    The result is meant for pre-filling a new plan and therefore carries only
    the attributes named in ``TEMPLATE_FIELDS`` (no plan number, coil number
    or timestamps). When no plan exists, a default ``PlanTemplate`` is returned.
    """
    newest_first = select(ProductionPlan).order_by(desc(ProductionPlan.created_at)).limit(1)
    latest = (await db.execute(newest_first)).scalar_one_or_none()
    if latest:
        values = {}
        for field in TEMPLATE_FIELDS:
            # Attributes missing on the model fall back to None.
            values[field] = getattr(latest, field, None)
        return Response.ok(PlanTemplate(**values))
    return Response.ok(PlanTemplate())
@router.post("/", response_model=Response[PlanOut])
async def create_plan(
    body: PlanCreate,
    db: AsyncSession = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """Create a production plan from the request body.

    Args:
        body: Validated plan payload; ``plan_no`` must not already be in use.
        db: Database session injected by FastAPI.
        current_user: Authenticated user; the username is stored as ``created_by``.

    Returns:
        ``Response`` wrapping the newly created plan.

    Raises:
        HTTPException: 400 when a plan with the same ``plan_no`` already exists.
    """
    duplicate_lookup = select(ProductionPlan).where(ProductionPlan.plan_no == body.plan_no)
    clash = (await db.execute(duplicate_lookup)).scalar_one_or_none()
    if clash:
        raise HTTPException(status_code=400, detail="计划号已存在")

    payload = body.model_dump()
    new_plan = ProductionPlan(**payload, created_by=current_user.username)
    db.add(new_plan)
    # Flush so database-populated columns are available for serialisation.
    await db.flush()
    return Response.ok(PlanOut.model_validate(new_plan))
@router.get("/{plan_id}", response_model=Response[PlanOut])
async def get_plan(plan_id: int, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
    """Fetch a single production plan by primary key.

    Raises:
        HTTPException: 404 when no plan has the given id.
    """
    lookup = select(ProductionPlan).where(ProductionPlan.id == plan_id)
    found = (await db.execute(lookup)).scalar_one_or_none()
    if found:
        return Response.ok(PlanOut.model_validate(found))
    raise HTTPException(status_code=404, detail="计划不存在")
@router.put("/{plan_id}", response_model=Response[PlanOut])
async def update_plan(
    plan_id: int,
    body: PlanUpdate,
    db: AsyncSession = Depends(get_db),
    _ = Depends(get_current_user),
):
    """Apply the non-null fields of the request body to an existing plan.

    Fields whose value is ``None`` in the payload are skipped, so this
    endpoint cannot reset a column to null.

    Raises:
        HTTPException: 404 when no plan has the given id.
    """
    lookup = select(ProductionPlan).where(ProductionPlan.id == plan_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="计划不存在")

    changes = body.model_dump(exclude_none=True)
    for field, value in changes.items():
        setattr(target, field, value)
    await db.flush()
    return Response.ok(PlanOut.model_validate(target))
@router.patch("/{plan_id}/confirm", response_model=Response[PlanOut])
async def confirm_plan(plan_id: int, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
    """Mark a plan as ``online``, regardless of its current status.

    Raises:
        HTTPException: 404 when no plan has the given id.
    """
    lookup = select(ProductionPlan).where(ProductionPlan.id == plan_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="计划不存在")

    target.status = "online"
    await db.flush()
    return Response.ok(PlanOut.model_validate(target))
@router.patch("/{plan_id}/start", response_model=Response[PlanOut])
async def start_producing(plan_id: int, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
    """Move a plan to the entry and start producing it.

    The selected plan becomes ``producing``; every other plan currently in
    ``producing`` is set back to ``online`` so that only one coil is in
    production at a time.

    Raises:
        HTTPException: 404 when no plan has the given id.
    """
    lookup = select(ProductionPlan).where(ProductionPlan.id == plan_id)
    target = (await db.execute(lookup)).scalar_one_or_none()
    if not target:
        raise HTTPException(status_code=404, detail="计划不存在")

    # Demote every other in-production plan back to online (single coil in production).
    still_running = select(ProductionPlan).where(
        ProductionPlan.status == "producing",
        ProductionPlan.id != plan_id,
    )
    running_rows = await db.execute(still_running)
    for running in running_rows.scalars():
        running.status = "online"

    target.status = "producing"
    await db.flush()
    return Response.ok(PlanOut.model_validate(target))