Files
pickling-mes/backend/app/api/cost.py

113 lines
4.0 KiB
Python
Raw Normal View History

from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, asc
from typing import Optional
from datetime import datetime
from app.database import get_db
from app.models.cost import CostRecord
from app.schemas.cost import CostItem, CostCreate, CostUpdate, CostOut
from app.schemas.common import Response
from app.services.auth_service import get_current_user
router = APIRouter()
# 预定义成本/消耗项
COST_ITEMS = [
{"item": "emulsion", "item_name": "乳化液", "unit": "L", "unit_cost_label": "L/t"},
{"item": "acid", "item_name": "盐酸", "unit": "L", "unit_cost_label": "L/t"},
{"item": "alkali", "item_name": "脱脂碱液", "unit": "kg", "unit_cost_label": "kg/t"},
{"item": "power", "item_name": "电耗", "unit": "kWh", "unit_cost_label": "kWh/t"},
{"item": "water", "item_name": "新水", "unit": "", "unit_cost_label": "m³/t"},
{"item": "steam", "item_name": "蒸汽", "unit": "t", "unit_cost_label": "kg/t"},
{"item": "antirust_oil","item_name": "防锈油", "unit": "L", "unit_cost_label": "L/t"},
{"item": "inhibitor", "item_name": "缓蚀剂", "unit": "kg", "unit_cost_label": "g/t"},
{"item": "roll", "item_name": "辊耗", "unit": "", "unit_cost_label": "支/kt"},
]
_ITEM_MAP = {x["item"]: x for x in COST_ITEMS}
def _parse_dt(s):
if not s:
return None
try:
return datetime.fromisoformat(s.replace('Z', ''))
except Exception:
return None
@router.get("/items", response_model=Response[list[CostItem]])
async def list_items(_ = Depends(get_current_user)):
return Response.ok([CostItem(**x) for x in COST_ITEMS])
@router.get("/", response_model=Response[list[CostOut]])
async def list_records(
item: Optional[str] = None,
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
db: AsyncSession = Depends(get_db),
_ = Depends(get_current_user),
):
query = select(CostRecord).order_by(asc(CostRecord.record_date))
if item:
query = query.where(CostRecord.item == item)
_sd = _parse_dt(start_date)
if _sd:
query = query.where(CostRecord.record_date >= _sd)
_ed = _parse_dt(end_date)
if _ed:
query = query.where(CostRecord.record_date <= _ed)
result = await db.execute(query)
return Response.ok([CostOut.model_validate(r) for r in result.scalars()])
@router.post("/", response_model=Response[CostOut])
async def create_record(
body: CostCreate,
db: AsyncSession = Depends(get_db),
current_user = Depends(get_current_user),
):
meta = _ITEM_MAP.get(body.item)
if not meta:
raise HTTPException(status_code=400, detail="未知成本项")
rec = CostRecord(
item=body.item,
item_name=meta["item_name"],
unit=meta["unit"],
record_date=body.record_date,
shift_a=body.shift_a or 0,
shift_b=body.shift_b or 0,
unit_cost=body.unit_cost or 0,
remark=body.remark,
created_by=current_user.username,
)
db.add(rec)
await db.flush()
return Response.ok(CostOut.model_validate(rec))
@router.put("/{rec_id}", response_model=Response[CostOut])
async def update_record(
rec_id: int,
body: CostUpdate,
db: AsyncSession = Depends(get_db),
_ = Depends(get_current_user),
):
rec = await db.get(CostRecord, rec_id)
if not rec:
raise HTTPException(status_code=404, detail="记录不存在")
for k, v in body.model_dump(exclude_none=True).items():
setattr(rec, k, v)
await db.flush()
return Response.ok(CostOut.model_validate(rec))
@router.delete("/{rec_id}", response_model=Response[dict])
async def delete_record(rec_id: int, db: AsyncSession = Depends(get_db), _ = Depends(get_current_user)):
rec = await db.get(CostRecord, rec_id)
if not rec:
raise HTTPException(status_code=404, detail="记录不存在")
await db.delete(rec)
return Response.ok({"deleted": rec_id})