import asyncio import logging import os from contextlib import asynccontextmanager from typing import Optional from dotenv import load_dotenv from fastapi import FastAPI, HTTPException, Query from fastapi.middleware.cors import CORSMiddleware load_dotenv() from database import get_connection from models import PDIPLTMCreate, PDIPLTMUpdate, OpcConfig from opc_service import opc_service from sqlite_sync import ( init_db, sync_all_from_oracle, sqlite_upsert_pdi, sqlite_delete_pdi ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): # Init SQLite schema init_db() # Sync Oracle -> SQLite on startup (in thread pool to avoid blocking) loop = asyncio.get_event_loop() try: result = await loop.run_in_executor(None, sync_all_from_oracle) logger.info("Startup sync: %s", result) except Exception as e: logger.warning("Startup sync failed (Oracle may be unreachable): %s", e) # Start OPC polling asyncio.create_task(opc_service.start_polling()) logger.info("OPC polling task started") yield await opc_service.stop_polling() logger.info("OPC polling task stopped") app = FastAPI(title="HEFA-L2 PDI管理系统", version="1.0.0", lifespan=lifespan) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ───────────────────────────────────────────── # Sync endpoint # ───────────────────────────────────────────── @app.post("/api/sync") async def trigger_sync(): """Manually trigger a full Oracle -> SQLite sync.""" loop = asyncio.get_event_loop() try: result = await loop.run_in_executor(None, sync_all_from_oracle) return {"message": "同步成功", "rows": result} except Exception as e: raise HTTPException(status_code=500, detail=f"同步失败: {e}") # ───────────────────────────────────────────── # PDI_PLTM CRUD # ───────────────────────────────────────────── @app.get("/api/pdi") def list_pdi( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=200), coilid: Optional[str] = None, status: Optional[int] = None, steel_grade: Optional[str] = None, ): conn = get_connection() cursor = conn.cursor() try: conditions = [] params = {} if coilid: conditions.append("COILID LIKE :coilid") params["coilid"] = f"%{coilid}%" if status is not None: conditions.append("STATUS = :status") params["status"] = status if steel_grade: conditions.append("STEEL_GRADE LIKE :steel_grade") params["steel_grade"] = f"%{steel_grade}%" where = ("WHERE " + " AND ".join(conditions)) if conditions else "" count_sql = f"SELECT COUNT(*) FROM PDI_PLTM {where}" cursor.execute(count_sql, params) total = cursor.fetchone()[0] offset = (page - 1) * page_size sql = f""" SELECT * FROM ( SELECT a.*, ROWNUM rn FROM ( SELECT SID, ROLLPROGRAMNB, SEQUENCENB, STATUS, SCHEDULE_CODE, COILID, ENTRY_COIL_THICKNESS, ENTRY_COIL_WIDTH, ENTRY_COIL_WEIGHT, ENTRY_OF_COIL_LENGTH, EXIT_COIL_NO, EXIT_COIL_THICKNESS, EXIT_COIL_WIDTH, EXIT_COIL_WEIGHT, WORK_ORDER_NO, ORDER_QUALITY, STEEL_GRADE, SG_SIGN, ORDER_THICKNESS, ORDER_WIDTH, COILER_DIAMETER, L2_GRADE, WEIGHT_MODE, CREATED_DT, UPDATED_DT, SEND_FLAG, ENTRY_COIL_THICKNESS_MAX, ENTRY_COIL_THICKNESS_MIN, ENTRY_COIL_WIDTH_MAX, ENTRY_COIL_WIDTH_MIN, EXIT_COIL_THICKNESS_MAX, EXIT_COIL_THICKNESS_MIN, EXIT_COIL_WIDTH_MAX, EXIT_COIL_WIDTH_MIN, CROSS_SECTION_AREA, UNCOILER_TENSION, LOOPER_TENSION_1, PL_TENSION, LOOPER_TENSION_2, LOOPER_TENSION_3, DUMMY_COIL_MRK, CUT_MODE, TRIMMING, TRIMMING_WIDTH FROM PDI_PLTM {where} ORDER BY COILID DESC ) a WHERE ROWNUM <= :end_row ) WHERE rn > :start_row """ params["end_row"] = offset + page_size params["start_row"] = offset cursor.execute(sql, params) columns = [col[0].lower() for col in cursor.description] rows = [dict(zip(columns, row)) for row in cursor.fetchall()] for row in rows: for k, v in row.items(): if hasattr(v, 'isoformat'): row[k] = v.isoformat() return {"total": total, "page": page, "page_size": page_size, "data": rows} finally: cursor.close() conn.close() @app.get("/api/pdi/{coilid}") def get_pdi(coilid: str): conn = get_connection() cursor = conn.cursor() try: cursor.execute("SELECT * FROM PDI_PLTM WHERE COILID = :coilid", {"coilid": coilid}) columns = [col[0].lower() for col in cursor.description] row = cursor.fetchone() if not row: raise HTTPException(status_code=404, detail="记录不存在") result = dict(zip(columns, row)) for k, v in result.items(): if hasattr(v, 'isoformat'): result[k] = v.isoformat() return result finally: cursor.close() conn.close() @app.post("/api/pdi", status_code=201) def create_pdi(data: PDIPLTMCreate): conn = get_connection() cursor = conn.cursor() try: fields = {k: v for k, v in data.model_dump(exclude_none=True).items()} cols = ", ".join(f.upper() for f in fields.keys()) vals = ", ".join([f":{k}" for k in fields.keys()]) sql = f"INSERT INTO PDI_PLTM ({cols}) VALUES ({vals})" cursor.execute(sql, fields) conn.commit() # Mirror to SQLite try: sqlite_upsert_pdi(fields) except Exception as e: logger.warning("SQLite mirror failed on create: %s", e) return {"message": "创建成功", "coilid": data.coilid} except HTTPException: raise except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.put("/api/pdi/{coilid}") def update_pdi(coilid: str, data: PDIPLTMUpdate): conn = get_connection() cursor = conn.cursor() try: fields = {k: v for k, v in data.model_dump(exclude_none=True).items()} if not fields: raise HTTPException(status_code=400, detail="无更新字段") set_clause = ", ".join([f"{k.upper()} = :{k}" for k in fields.keys()]) fields["coilid_"] = coilid sql = f"UPDATE PDI_PLTM SET {set_clause} WHERE COILID = :coilid_" cursor.execute(sql, fields) if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="记录不存在") conn.commit() # Mirror to SQLite: re-fetch the updated row try: cursor2 = conn.cursor() cursor2.execute("SELECT * FROM PDI_PLTM WHERE COILID = :c", {"c": coilid}) cols = [d[0].lower() for d in cursor2.description] row = cursor2.fetchone() cursor2.close() if row: row_dict = dict(zip(cols, row)) for k, v in row_dict.items(): if hasattr(v, 'isoformat'): row_dict[k] = v.isoformat() sqlite_upsert_pdi(row_dict) except Exception as e: logger.warning("SQLite mirror failed on update: %s", e) return {"message": "更新成功"} except HTTPException: raise except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.delete("/api/pdi/{coilid}") def delete_pdi(coilid: str): conn = get_connection() cursor = conn.cursor() try: cursor.execute("DELETE FROM PDI_PLTM WHERE COILID = :coilid", {"coilid": coilid}) if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="记录不存在") conn.commit() # Mirror to SQLite try: sqlite_delete_pdi(coilid) except Exception as e: logger.warning("SQLite mirror failed on delete: %s", e) return {"message": "删除成功"} except HTTPException: raise except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() # ───────────────────────────────────────────── # CMPT_PL_TRACKMAP # ───────────────────────────────────────────── @app.get("/api/trackmap") def list_trackmap(): conn = get_connection() cursor = conn.cursor() try: cursor.execute( "SELECT POSITION, COILID, BEF_ES, ES, ENT_LOO, PL, INT_LOO, " "ST, EXI_LOO, RUN_SPEED_MIN, RUN_SPEED_MAX, " "WELD_SPEED_MIN, WELD_SPEED_MAX, TOC, TOM, MOP " "FROM PLTM.CMPT_PL_TRACKMAP ORDER BY POSITION" ) columns = [col[0].lower() for col in cursor.description] rows = [dict(zip(columns, row)) for row in cursor.fetchall()] for row in rows: for k, v in row.items(): if hasattr(v, 'isoformat'): row[k] = v.isoformat() return rows finally: cursor.close() conn.close() # ───────────────────────────────────────────── # OPC Configuration & Status # ───────────────────────────────────────────── @app.get("/api/opc/config") def get_opc_config(): return { "opc_url": opc_service.opc_url, "counter_node": opc_service.counter_node, "trackmap_nodes": opc_service.trackmap_nodes, "poll_interval": opc_service.poll_interval, "running": opc_service.running, "last_counter": opc_service.last_counter, "last_update": opc_service.last_update, } @app.post("/api/opc/config") async def save_opc_config(config: OpcConfig): await opc_service.stop_polling() opc_service.opc_url = config.opc_url opc_service.counter_node = config.counter_node opc_service.trackmap_nodes = config.trackmap_nodes opc_service.poll_interval = config.poll_interval try: opc_service.save_config() except Exception as e: logger.warning("Persist OPC config failed: %s", e) raise HTTPException(status_code=500, detail=f"配置保存失败: {e}") asyncio.create_task(opc_service.start_polling()) return {"message": "OPC配置已保存并重启轮询"} @app.get("/api/opc/status") def opc_status(): return { "running": opc_service.running, "last_counter": opc_service.last_counter, "last_update": opc_service.last_update, "log": opc_service.event_log[-50:], } @app.post("/api/opc/restart") async def restart_opc(): await opc_service.stop_polling() asyncio.create_task(opc_service.start_polling()) return {"message": "OPC服务已重启"}