Files
pickling-mes/backend/app/api/quality.py

217 lines
9.8 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, desc
from typing import Optional
from datetime import datetime
from app.database import get_db
from app.models.quality import QcTask, QcTaskItem, QcDefect
from app.schemas.quality import (
QcTaskCreate, QcTaskUpdate, QcTaskOut,
QcTaskItemCreate, QcTaskItemUpdate, QcTaskItemOut,
QcDefectCreate, QcDefectUpdate, QcDefectOut, QcDefectBulkSave,
)
from app.schemas.common import Response, PageResponse
from app.services.auth_service import get_current_user
# Router onto which every endpoint in this module is registered.
router = APIRouter()
# ─── 检验任务 ───────────────────────────────────────────────────────────────
@router.get("/tasks", response_model=Response[PageResponse[QcTaskOut]])
async def list_tasks(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1),
    task_code: Optional[str] = None,
    coil_no: Optional[str] = None,
    status: Optional[int] = None,
    result: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Return one page of QC tasks whose ``del_flag`` is 0, newest first.

    Args:
        page: 1-based page number; values below 1 are rejected by validation.
        page_size: Rows per page; values below 1 are rejected by validation.
        task_code: Case-insensitive substring filter on ``task_code``.
        coil_no: Case-insensitive substring filter on ``coil_no``.
        status: Exact match on ``status`` (0 is a valid filter value).
        result: Exact match on ``result``.
        start_date: ISO date/datetime; keeps rows created at or after it.
        end_date: ISO date; keeps rows created up to the end of that day.

    Raises:
        HTTPException: 422 when ``start_date`` or ``end_date`` cannot be parsed.
    """
    filters = [QcTask.del_flag == 0]
    if task_code:
        filters.append(QcTask.task_code.ilike(f"%{task_code}%"))
    if coil_no:
        filters.append(QcTask.coil_no.ilike(f"%{coil_no}%"))
    if status is not None:
        filters.append(QcTask.status == status)
    if result:
        filters.append(QcTask.result == result)
    if start_date:
        try:
            start_at = datetime.fromisoformat(start_date)
        except ValueError:
            # Malformed client input is a validation error, not a server fault.
            raise HTTPException(status_code=422, detail="start_date 格式无效") from None
        filters.append(QcTask.created_at >= start_at)
    if end_date:
        try:
            # Extend to the very last microsecond of the day so rows stamped
            # between 23:59:59 and midnight are not silently dropped.
            end_at = datetime.fromisoformat(end_date).replace(
                hour=23, minute=59, second=59, microsecond=999999
            )
        except ValueError:
            raise HTTPException(status_code=422, detail="end_date 格式无效") from None
        filters.append(QcTask.created_at <= end_at)

    filtered = select(QcTask).where(*filters)

    # Count on the unordered statement: sorting is irrelevant to COUNT.
    total = (await db.execute(select(func.count()).select_from(filtered.subquery()))).scalar()

    # The id tie-breaker keeps paging stable when created_at values collide.
    page_stmt = (
        filtered.order_by(desc(QcTask.created_at), desc(QcTask.id))
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    result_rows = await db.execute(page_stmt)
    items = [QcTaskOut.model_validate(r) for r in result_rows.scalars()]
    return Response.ok(PageResponse(total=total, page=page, page_size=page_size, items=items))
@router.post("/tasks", response_model=Response[QcTaskOut])
async def create_task(
    body: QcTaskCreate,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Create a QC task from the request body and return it.

    The new row is flushed to the session here; no commit is issued in this
    function (presumably handled by the ``get_db`` dependency - confirm).
    """
    payload = body.model_dump()
    new_task = QcTask(**payload)
    db.add(new_task)
    await db.flush()
    serialized = QcTaskOut.model_validate(new_task)
    return Response.ok(serialized)
@router.get("/tasks/{task_id}", response_model=Response[QcTaskOut])
async def get_task(
    task_id: int,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Fetch a single QC task by id, ignoring rows whose ``del_flag`` is not 0.

    Raises:
        HTTPException: 404 when no matching task exists.
    """
    stmt = select(QcTask).where(QcTask.id == task_id, QcTask.del_flag == 0)
    task = (await db.execute(stmt)).scalar_one_or_none()
    if not task:
        raise HTTPException(status_code=404, detail="任务不存在")
    serialized = QcTaskOut.model_validate(task)
    return Response.ok(serialized)
@router.put("/tasks/{task_id}", response_model=Response[QcTaskOut])
async def update_task(
    task_id: int,
    body: QcTaskUpdate,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Apply the non-null fields of the body to an existing QC task.

    Fields whose value is ``None`` in the body are skipped, so this endpoint
    cannot reset a column to null.

    Raises:
        HTTPException: 404 when no task with ``del_flag == 0`` has this id.
    """
    stmt = select(QcTask).where(QcTask.id == task_id, QcTask.del_flag == 0)
    task = (await db.execute(stmt)).scalar_one_or_none()
    if not task:
        raise HTTPException(status_code=404, detail="任务不存在")

    changes = body.model_dump(exclude_none=True)
    for field_name, new_value in changes.items():
        setattr(task, field_name, new_value)

    await db.flush()
    serialized = QcTaskOut.model_validate(task)
    return Response.ok(serialized)
@router.delete("/tasks/{task_id}", response_model=Response[dict])
async def delete_task(task_id: int, db: AsyncSession = Depends(get_db), _=Depends(get_current_user)):
    """Soft-delete a QC task by setting its ``del_flag`` to 1.

    Only tasks that are still active (``del_flag == 0``) can be deleted, which
    matches how ``get_task`` and ``update_task`` treat already-deleted rows.

    Raises:
        HTTPException: 404 when no active task has this id.
    """
    # Filter on del_flag so an already-deleted task is reported as missing
    # instead of being "deleted" a second time.
    r = await db.execute(select(QcTask).where(QcTask.id == task_id, QcTask.del_flag == 0))
    task = r.scalar_one_or_none()
    if not task:
        raise HTTPException(status_code=404, detail="任务不存在")
    task.del_flag = 1
    await db.flush()
    return Response.ok({})
# ─── 检验项目 ────────────────────────────────────────────────────────────────
@router.get("/tasks/{task_id}/items", response_model=Response[list[QcTaskItemOut]])
async def list_task_items(
    task_id: int,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """List every inspection item attached to ``task_id``, ordered by item id.

    An empty list is returned when the task has no items; the task row itself
    is not looked up.
    """
    stmt = (
        select(QcTaskItem)
        .where(QcTaskItem.task_id == task_id)
        .order_by(QcTaskItem.id)
    )
    found = await db.execute(stmt)
    payload = []
    for row in found.scalars():
        payload.append(QcTaskItemOut.model_validate(row))
    return Response.ok(payload)
@router.post("/task-items", response_model=Response[QcTaskItemOut])
async def create_task_item(
    body: QcTaskItemCreate,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Create an inspection item from the request body and return it.

    The row is flushed to the session; no commit is issued in this function.
    """
    payload = body.model_dump()
    new_item = QcTaskItem(**payload)
    db.add(new_item)
    await db.flush()
    serialized = QcTaskItemOut.model_validate(new_item)
    return Response.ok(serialized)
@router.put("/task-items/{item_id}", response_model=Response[QcTaskItemOut])
async def update_task_item(
    item_id: int,
    body: QcTaskItemUpdate,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Apply the non-null fields of the body to an existing inspection item.

    Fields that are ``None`` in the body are left unchanged.

    Raises:
        HTTPException: 404 when no item has this id.
    """
    stmt = select(QcTaskItem).where(QcTaskItem.id == item_id)
    item = (await db.execute(stmt)).scalar_one_or_none()
    if not item:
        raise HTTPException(status_code=404, detail="检验项不存在")

    changes = body.model_dump(exclude_none=True)
    for field_name, new_value in changes.items():
        setattr(item, field_name, new_value)

    await db.flush()
    serialized = QcTaskItemOut.model_validate(item)
    return Response.ok(serialized)
# ─── 缺陷记录 ────────────────────────────────────────────────────────────────
@router.get("/defects", response_model=Response[PageResponse[QcDefectOut]])
async def list_defects(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1),
    coil_no: Optional[str] = None,
    defect_type: Optional[str] = None,
    degree: Optional[str] = None,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Return one page of defect records whose ``del_flag`` is 0, newest first.

    Args:
        page: 1-based page number; values below 1 are rejected by validation.
        page_size: Rows per page; values below 1 are rejected by validation.
        coil_no: Case-insensitive substring filter on ``coil_no``.
        defect_type: Case-insensitive substring filter on ``defect_type``.
        degree: Exact match on ``degree``.
        start_date: ISO date/datetime; keeps rows created at or after it.
        end_date: ISO date; keeps rows created up to the end of that day.

    Raises:
        HTTPException: 422 when ``start_date`` or ``end_date`` cannot be parsed.
    """
    filters = [QcDefect.del_flag == 0]
    if coil_no:
        filters.append(QcDefect.coil_no.ilike(f"%{coil_no}%"))
    if defect_type:
        filters.append(QcDefect.defect_type.ilike(f"%{defect_type}%"))
    if degree:
        filters.append(QcDefect.degree == degree)
    if start_date:
        try:
            start_at = datetime.fromisoformat(start_date)
        except ValueError:
            # Malformed client input is a validation error, not a server fault.
            raise HTTPException(status_code=422, detail="start_date 格式无效") from None
        filters.append(QcDefect.created_at >= start_at)
    if end_date:
        try:
            # Extend to the very last microsecond of the day so rows stamped
            # between 23:59:59 and midnight are not silently dropped.
            end_at = datetime.fromisoformat(end_date).replace(
                hour=23, minute=59, second=59, microsecond=999999
            )
        except ValueError:
            raise HTTPException(status_code=422, detail="end_date 格式无效") from None
        filters.append(QcDefect.created_at <= end_at)

    filtered = select(QcDefect).where(*filters)

    # Count on the unordered statement: sorting is irrelevant to COUNT.
    total = (await db.execute(select(func.count()).select_from(filtered.subquery()))).scalar()

    # The id tie-breaker keeps paging stable when created_at values collide.
    page_stmt = (
        filtered.order_by(desc(QcDefect.created_at), desc(QcDefect.id))
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    rows = await db.execute(page_stmt)
    items = [QcDefectOut.model_validate(r) for r in rows.scalars()]
    return Response.ok(PageResponse(total=total, page=page, page_size=page_size, items=items))
@router.post("/defects", response_model=Response[QcDefectOut])
async def create_defect(
    body: QcDefectCreate,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Create a single defect record from the request body and return it.

    The row is flushed to the session; no commit is issued in this function.
    """
    payload = body.model_dump()
    new_defect = QcDefect(**payload)
    db.add(new_defect)
    await db.flush()
    serialized = QcDefectOut.model_validate(new_defect)
    return Response.ok(serialized)
@router.get("/defects/by-coil/{coil_no}", response_model=Response[list[QcDefectOut]])
async def list_defects_by_coil(
    coil_no: str,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """List the active defects of one coil (exact ``coil_no`` match).

    Rows are sorted by ``seq_no`` ascending with nulls last, then by ``id``.
    """
    belongs_to_coil = QcDefect.coil_no == coil_no
    is_active = QcDefect.del_flag == 0
    ordering = (QcDefect.seq_no.asc().nulls_last(), QcDefect.id.asc())

    stmt = select(QcDefect).where(belongs_to_coil, is_active).order_by(*ordering)
    found = await db.execute(stmt)

    payload = []
    for defect in found.scalars():
        payload.append(QcDefectOut.model_validate(defect))
    return Response.ok(payload)
@router.post("/defects/bulk-save", response_model=Response[list[QcDefectOut]])
async def bulk_save_defects(
    body: QcDefectBulkSave,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Replace all defect records of a coil: soft-delete the old rows, then bulk insert.

    Each incoming defect is stored under ``body.coil_no``. A missing or falsy
    ``seq_no`` is filled with the 1-based position in the submitted list, and a
    missing or falsy ``length_val`` is derived from ``end_position`` minus
    ``start_position`` (rounded to 3 decimals) when both positions are present.
    """
    coil_no = body.coil_no

    # Soft-delete the records currently active for this coil.
    active_stmt = select(QcDefect).where(QcDefect.coil_no == coil_no, QcDefect.del_flag == 0)
    previous = await db.execute(active_stmt)
    for stale in previous.scalars():
        stale.del_flag = 1

    # Insert the replacement records.
    saved: list[QcDefect] = []
    for position, entry in enumerate(body.defects, start=1):
        fields = entry.model_dump()
        fields["coil_no"] = coil_no

        if not fields.get("seq_no"):
            fields["seq_no"] = position

        # Derive the length when the client supplied both positions but no length.
        start_pos = fields.get("start_position")
        end_pos = fields.get("end_position")
        has_both_positions = start_pos is not None and end_pos is not None
        if has_both_positions and not fields.get("length_val"):
            fields["length_val"] = round(float(end_pos) - float(start_pos), 3)

        record = QcDefect(**fields)
        db.add(record)
        saved.append(record)

    await db.flush()
    return Response.ok([QcDefectOut.model_validate(record) for record in saved])
@router.put("/defects/{defect_id}", response_model=Response[QcDefectOut])
async def update_defect(
    defect_id: int,
    body: QcDefectUpdate,
    db: AsyncSession = Depends(get_db),
    _=Depends(get_current_user),
):
    """Apply the non-null fields of the body to an active defect record.

    Fields that are ``None`` in the body are left unchanged.

    Raises:
        HTTPException: 404 when no defect with ``del_flag == 0`` has this id.
    """
    stmt = select(QcDefect).where(QcDefect.id == defect_id, QcDefect.del_flag == 0)
    defect = (await db.execute(stmt)).scalar_one_or_none()
    if not defect:
        raise HTTPException(status_code=404, detail="缺陷记录不存在")

    changes = body.model_dump(exclude_none=True)
    for field_name, new_value in changes.items():
        setattr(defect, field_name, new_value)

    await db.flush()
    serialized = QcDefectOut.model_validate(defect)
    return Response.ok(serialized)
@router.delete("/defects/{defect_id}", response_model=Response[dict])
async def delete_defect(defect_id: int, db: AsyncSession = Depends(get_db), _=Depends(get_current_user)):
    """Soft-delete a defect record by setting its ``del_flag`` to 1.

    Only records that are still active (``del_flag == 0``) can be deleted,
    which matches how ``update_defect`` treats already-deleted rows.

    Raises:
        HTTPException: 404 when no active defect has this id.
    """
    # Filter on del_flag so an already-deleted record is reported as missing
    # instead of being "deleted" a second time.
    r = await db.execute(select(QcDefect).where(QcDefect.id == defect_id, QcDefect.del_flag == 0))
    defect = r.scalar_one_or_none()
    if not defect:
        raise HTTPException(status_code=404, detail="缺陷记录不存在")
    defect.del_flag = 1
    await db.flush()
    return Response.ok({})