from fastapi import APIRouter, Depends, HTTPException, Query from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy import select, func, desc from typing import Optional from datetime import datetime from app.database import get_db from app.models.production import ProductionRecord from app.models.plan import ProductionPlan from app.schemas.production import ProductionRecordCreate, ProductionRecordUpdate, ProductionRecordOut from app.schemas.common import Response, PageResponse from app.services.auth_service import get_current_user async def _mark_plan_produced(db: AsyncSession, record: ProductionRecord): """根据实绩记录的卷号自动把对应计划标记为 produced。""" candidates = [] for v in (getattr(record, "hot_coil_no", None), record.coil_no, getattr(record, "sub_coil_no", None)): if v and v not in candidates: candidates.append(v) if not candidates: return q = select(ProductionPlan).where( (ProductionPlan.cold_coil_no.in_(candidates)) | (ProductionPlan.hot_coil_no.in_(candidates)) ) res = await db.execute(q) for plan in res.scalars(): if plan.status != "produced": plan.status = "produced" router = APIRouter() def _parse_dt(s): if not s: return None try: return datetime.fromisoformat(s.replace('Z', '')) except Exception: return None @router.get("/", response_model=Response[PageResponse[ProductionRecordOut]]) async def list_records( page: int = 1, page_size: int = 20, coil_no: Optional[str] = None, shift: Optional[str] = None, start_date: Optional[str] = Query(None), end_date: Optional[str] = Query(None), db: AsyncSession = Depends(get_db), _ = Depends(get_current_user), ): query = select(ProductionRecord).order_by(desc(ProductionRecord.start_time)) if coil_no: query = query.where(ProductionRecord.coil_no.ilike(f"%{coil_no}%")) if shift: query = query.where(ProductionRecord.shift == shift) _sd = _parse_dt(start_date) if _sd: query = query.where(ProductionRecord.start_time >= _sd) _ed = _parse_dt(end_date) if _ed: query = query.where(ProductionRecord.start_time <= _ed) total = (await db.execute(select(func.count()).select_from(query.subquery()))).scalar() result = await db.execute(query.offset((page - 1) * page_size).limit(page_size)) items = [ProductionRecordOut.model_validate(r) for r in result.scalars()] return Response.ok(PageResponse(total=total, page=page, page_size=page_size, items=items)) @router.post("/", response_model=Response[ProductionRecordOut]) async def create_record( body: ProductionRecordCreate, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user), ): record = ProductionRecord(**body.model_dump()) db.add(record) await db.flush() await _mark_plan_produced(db, record) await db.flush() return Response.ok(ProductionRecordOut.model_validate(record)) @router.get("/{record_id}", response_model=Response[ProductionRecordOut]) async def get_record(record_id: int, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)): result = await db.execute(select(ProductionRecord).where(ProductionRecord.id == record_id)) record = result.scalar_one_or_none() if not record: raise HTTPException(status_code=404, detail="实绩记录不存在") return Response.ok(ProductionRecordOut.model_validate(record)) @router.put("/{record_id}", response_model=Response[ProductionRecordOut]) async def update_record( record_id: int, body: ProductionRecordUpdate, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user), ): result = await db.execute(select(ProductionRecord).where(ProductionRecord.id == record_id)) record = result.scalar_one_or_none() if not record: raise HTTPException(status_code=404, detail="实绩记录不存在") for k, v in body.model_dump(exclude_none=True).items(): setattr(record, k, v) await db.flush() await _mark_plan_produced(db, record) await db.flush() return Response.ok(ProductionRecordOut.model_validate(record))