import asyncio import logging import os from contextlib import asynccontextmanager from typing import Optional from dotenv import load_dotenv from fastapi import FastAPI, HTTPException, Query, APIRouter import datetime from fastapi.middleware.cors import CORSMiddleware load_dotenv() from database import get_connection from models import PDIPLTMCreate, PDIPLTMUpdate, OpcConfig from opc_service import opc_service router = APIRouter() from sqlite_sync import ( init_db, sync_all_from_oracle, sqlite_upsert_pdi, sqlite_delete_pdi, sqlite_get_coil_track, sqlite_update_coil_track_item, sqlite_add_coil_track_item, sqlite_delete_coil_track_item, sqlite_clear_coil_track, sqlite_get_coils_by_sequencenb_range ) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): # Init SQLite schema init_db() # Sync Oracle -> SQLite on startup (in thread pool to avoid blocking) loop = asyncio.get_event_loop() try: result = await loop.run_in_executor(None, sync_all_from_oracle) logger.info("Startup sync: %s", result) except Exception as e: logger.warning("Startup sync failed (Oracle may be unreachable): %s", e) # Start OPC polling asyncio.create_task(opc_service.start_polling()) logger.info("OPC polling task started") yield await opc_service.stop_polling() logger.info("OPC polling task stopped") app = FastAPI(title="HEFA-L2 PDI管理系统", version="1.0.0", lifespan=lifespan) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # ───────────────────────────────────────────── # Sync endpoint # ───────────────────────────────────────────── @app.post("/api/sync") async def trigger_sync(): """Manually trigger a full Oracle -> SQLite sync.""" loop = asyncio.get_event_loop() try: result = await loop.run_in_executor(None, sync_all_from_oracle) return {"message": "同步成功", "rows": result} except Exception as e: raise HTTPException(status_code=500, detail=f"同步失败: {e}") # ───────────────────────────────────────────── # PDI_PLTM CRUD # ───────────────────────────────────────────── @app.get("/api/pdi") def list_pdi( page: int = Query(1, ge=1), page_size: int = Query(20, ge=1, le=200), coilid: Optional[str] = None, status: Optional[int] = None, steel_grade: Optional[str] = None, ): conn = get_connection() cursor = conn.cursor() try: conditions = [] params = {} if coilid: conditions.append("COILID LIKE :coilid") params["coilid"] = f"%{coilid}%" if status is not None: conditions.append("STATUS = :status") params["status"] = status if steel_grade: conditions.append("STEEL_GRADE LIKE :steel_grade") params["steel_grade"] = f"%{steel_grade}%" where = ("WHERE " + " AND ".join(conditions)) if conditions else "" count_sql = f"SELECT COUNT(*) FROM PLTM.PDI_PLTM {where}" cursor.execute(count_sql, params) total = cursor.fetchone()[0] offset = (page - 1) * page_size sql = f""" SELECT * FROM ( SELECT a.*, ROWNUM rn FROM ( SELECT SID, ROLLPROGRAMNB, SEQUENCENB, STATUS, SCHEDULE_CODE, COILID, ENTRY_COIL_THICKNESS, ENTRY_COIL_WIDTH, ENTRY_COIL_WEIGHT, ENTRY_OF_COIL_LENGTH, EXIT_COIL_NO, EXIT_COIL_THICKNESS, EXIT_COIL_WIDTH, EXIT_COIL_WEIGHT, WORK_ORDER_NO, ORDER_QUALITY, STEEL_GRADE, SG_SIGN, ORDER_THICKNESS, ORDER_WIDTH, COILER_DIAMETER, L2_GRADE, WEIGHT_MODE, CREATED_DT, UPDATED_DT, SEND_FLAG, ENTRY_COIL_THICKNESS_MAX, ENTRY_COIL_THICKNESS_MIN, ENTRY_COIL_WIDTH_MAX, ENTRY_COIL_WIDTH_MIN, EXIT_COIL_THICKNESS_MAX, EXIT_COIL_THICKNESS_MIN, EXIT_COIL_WIDTH_MAX, EXIT_COIL_WIDTH_MIN, CROSS_SECTION_AREA, UNCOILER_TENSION, LOOPER_TENSION_1, PL_TENSION, LOOPER_TENSION_2, LOOPER_TENSION_3, DUMMY_COIL_MRK, CUT_MODE, TRIMMING, TRIMMING_WIDTH FROM PLTM.PDI_PLTM {where} ORDER BY COILID DESC ) a WHERE ROWNUM <= :end_row ) WHERE rn > :start_row """ params["end_row"] = offset + page_size params["start_row"] = offset cursor.execute(sql, params) columns = [col[0].lower() for col in cursor.description] rows = [dict(zip(columns, row)) for row in cursor.fetchall()] for row in rows: for k, v in row.items(): if hasattr(v, 'isoformat'): row[k] = v.isoformat() return {"total": total, "page": page, "page_size": page_size, "data": rows} finally: cursor.close() conn.close() @app.get("/api/pdi/{coilid}") def get_pdi(coilid: str): conn = get_connection() cursor = conn.cursor() try: cursor.execute("SELECT * FROM PLTM.PDI_PLTM WHERE COILID = :coilid", {"coilid": coilid}) columns = [col[0].lower() for col in cursor.description] row = cursor.fetchone() if not row: raise HTTPException(status_code=404, detail="记录不存在") result = dict(zip(columns, row)) for k, v in result.items(): if hasattr(v, 'isoformat'): result[k] = v.isoformat() return result finally: cursor.close() conn.close() @app.post("/api/pdi", status_code=201) def create_pdi(data: PDIPLTMCreate): conn = get_connection() cursor = conn.cursor() try: # 1. 验证钢卷号长度 if len(data.coilid) != 12: raise HTTPException(status_code=400, detail="钢卷号必须为12位") # 2. 验证批次编号 if data.rollprogramnb: # 获取数据库中最大的批次编号 cursor.execute("SELECT MAX(ROLLPROGRAMNB) FROM PLTM.PDI_PLTM") max_batch = cursor.fetchone()[0] if max_batch and data.rollprogramnb < max_batch: raise HTTPException(status_code=400, detail="批次编号不能小于已有的最大批次编号") # 3. 验证顺序号 if data.rollprogramnb and data.sequencenb: # 检查本批次内是否已存在相同的顺序号 cursor.execute(""" SELECT COUNT(*) FROM PLTM.PDI_PLTM WHERE ROLLPROGRAMNB = :batch AND SEQUENCENB = :seq """, {"batch": data.rollprogramnb, "seq": data.sequencenb}) if cursor.fetchone()[0] > 0: raise HTTPException(status_code=400, detail="本批次内顺序号不能重复") # 检查本批次内的最大顺序号 cursor.execute(""" SELECT MAX(SEQUENCENB) FROM PLTM.PDI_PLTM WHERE ROLLPROGRAMNB = :batch """, {"batch": data.rollprogramnb}) max_seq = cursor.fetchone()[0] if max_seq and data.sequencenb <= max_seq: raise HTTPException(status_code=400, detail="本批次内顺序号必须递增") fields = {k: v for k, v in data.model_dump(exclude_none=True).items()} cols = ", ".join(f.upper() for f in fields.keys()) vals = ", ".join([f":{k}" for k in fields.keys()]) sql = f"INSERT INTO PLTM.PDI_PLTM ({cols}) VALUES ({vals})" cursor.execute(sql, fields) conn.commit() # Mirror to SQLite try: sqlite_upsert_pdi(fields) except Exception as e: logger.warning("SQLite mirror failed on create: %s", e) return {"message": "创建成功", "coilid": data.coilid} except HTTPException: raise except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.put("/api/pdi/{coilid}") def update_pdi(coilid: str, data: PDIPLTMUpdate): conn = get_connection() cursor = conn.cursor() try: fields = {k: v for k, v in data.model_dump(exclude_none=True).items()} if not fields: raise HTTPException(status_code=400, detail="无更新字段") # 验证批次编号 if "rollprogramnb" in fields: # 获取数据库中最大的批次编号 cursor.execute("SELECT MAX(ROLLPROGRAMNB) FROM PLTM.PDI_PLTM") max_batch = cursor.fetchone()[0] if max_batch and fields["rollprogramnb"] < max_batch: raise HTTPException(status_code=400, detail="批次编号不能小于已有的最大批次编号") # 验证顺序号 if "rollprogramnb" in fields or "sequencenb" in fields: # 获取当前记录的批次编号和顺序号 cursor.execute(""" SELECT ROLLPROGRAMNB, SEQUENCENB FROM PLTM.PDI_PLTM WHERE COILID = :coilid """, {"coilid": coilid}) current = cursor.fetchone() if not current: raise HTTPException(status_code=404, detail="记录不存在") new_batch = fields.get("rollprogramnb", current[0]) new_seq = fields.get("sequencenb", current[1]) # 检查本批次内是否已存在相同的顺序号(排除当前记录) cursor.execute(""" SELECT COUNT(*) FROM PLTM.PDI_PLTM WHERE ROLLPROGRAMNB = :batch AND SEQUENCENB = :seq AND COILID != :coilid """, {"batch": new_batch, "seq": new_seq, "coilid": coilid}) if cursor.fetchone()[0] > 0: raise HTTPException(status_code=400, detail="本批次内顺序号不能重复") # 检查本批次内的最大顺序号 cursor.execute(""" SELECT MAX(SEQUENCENB) FROM PLTM.PDI_PLTM WHERE ROLLPROGRAMNB = :batch AND COILID != :coilid """, {"batch": new_batch, "coilid": coilid}) max_seq = cursor.fetchone()[0] if max_seq and new_seq <= max_seq: raise HTTPException(status_code=400, detail="本批次内顺序号必须递增") set_clause = ", ".join([f"{k.upper()} = :{k}" for k in fields.keys()]) fields["coilid_"] = coilid sql = f"UPDATE PLTM.PDI_PLTM SET {set_clause} WHERE COILID = :coilid_" cursor.execute(sql, fields) if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="记录不存在") conn.commit() # Mirror to SQLite: re-fetch the updated row try: cursor2 = conn.cursor() cursor2.execute("SELECT * FROM PDI_PLTM WHERE COILID = :c", {"c": coilid}) cols = [d[0].lower() for d in cursor2.description] row = cursor2.fetchone() cursor2.close() if row: row_dict = dict(zip(cols, row)) for k, v in row_dict.items(): if hasattr(v, 'isoformat'): row_dict[k] = v.isoformat() sqlite_upsert_pdi(row_dict) except Exception as e: logger.warning("SQLite mirror failed on update: %s", e) return {"message": "更新成功"} except HTTPException: raise except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() @app.delete("/api/pdi/{coilid}") def delete_pdi(coilid: str): conn = get_connection() cursor = conn.cursor() try: cursor.execute("DELETE FROM PLTM.PDI_PLTM WHERE COILID = :coilid", {"coilid": coilid}) if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="记录不存在") conn.commit() # Mirror to SQLite try: sqlite_delete_pdi(coilid) except Exception as e: logger.warning("SQLite mirror failed on delete: %s", e) return {"message": "删除成功"} except HTTPException: raise except Exception as e: conn.rollback() raise HTTPException(status_code=400, detail=str(e)) finally: cursor.close() conn.close() # ───────────────────────────────────────────── # CMPT_PL_TRACKMAP # ───────────────────────────────────────────── @app.get("/api/trackmap") def list_trackmap(): conn = get_connection() cursor = conn.cursor() try: cursor.execute( "SELECT POSITION, COILID, BEF_ES, ES, ENT_LOO, PL, INT_LOO, " "ST, EXI_LOO, RUN_SPEED_MIN, RUN_SPEED_MAX, " "WELD_SPEED_MIN, WELD_SPEED_MAX, TOC, TOM, MOP " "FROM PLTM.CMPT_PL_TRACKMAP ORDER BY POSITION" ) columns = [col[0].lower() for col in cursor.description] rows = [dict(zip(columns, row)) for row in cursor.fetchall()] for row in rows: for k, v in row.items(): if hasattr(v, 'isoformat'): row[k] = v.isoformat() return rows finally: cursor.close() conn.close() # ───────────────────────────────────────────── # OPC Configuration & Status # ───────────────────────────────────────────── @app.get("/api/opc/config") def get_opc_config(): return { "opc_url": opc_service.opc_url, "counter_node": opc_service.counter_node, "trackmap_nodes": opc_service.trackmap_nodes, "poll_interval": opc_service.poll_interval, "signal1_node": opc_service.signal1_node, "signal2_node": opc_service.signal2_node, "running": opc_service.running, "last_counter": opc_service.last_counter, "last_update": opc_service.last_update, } @app.post("/api/opc/config") async def save_opc_config(config: OpcConfig): await opc_service.stop_polling() opc_service.opc_url = config.opc_url opc_service.counter_node = config.counter_node opc_service.trackmap_nodes = config.trackmap_nodes opc_service.poll_interval = config.poll_interval opc_service.signal1_node = config.signal1_node opc_service.signal2_node = config.signal2_node try: opc_service.save_config() except Exception as e: logger.warning("Persist OPC config failed: %s", e) raise HTTPException(status_code=500, detail=f"配置保存失败: {e}") asyncio.create_task(opc_service.start_polling()) return {"message": "OPC配置已保存并重启轮询"} @app.get("/api/opc/status") def opc_status(): return { "running": opc_service.running, "last_counter": opc_service.last_counter, "last_update": opc_service.last_update, "log": opc_service.event_log[-50:], "track_state": opc_service.track_state, } @app.post("/api/opc/restart") async def restart_opc(): await opc_service.stop_polling() asyncio.create_task(opc_service.start_polling()) return {"message": "OPC服务已重启"} # ───────────────────────────────────────────── # 钢种查询接口 (为酸洗提供选择) # ───────────────────────────────────────────── @app.get("/api/grades/entry") def get_entry_grades(): """获取来料钢种列表 - 来自 PL_BM_WELD_MACHINE.ALLOY_ID""" conn = get_connection() cursor = conn.cursor() try: cursor.execute(""" SELECT DISTINCT ALLOY_ID FROM PL_BM_WELD_MACHINE WHERE ALLOY_ID IS NOT NULL AND TRIM(ALLOY_ID) <> '' ORDER BY ALLOY_ID """) grades = [row[0] for row in cursor.fetchall()] return grades except Exception as e: logger.warning("获取来料钢种失败: %s", e) return [] finally: cursor.close() conn.close() @app.get("/api/grades/product") def get_product_grades(): """获取成品钢种列表 - 来自 L3_GRADE.L3_GRADE""" conn = get_connection() cursor = conn.cursor() try: cursor.execute(""" SELECT DISTINCT L3_GRADE FROM L3_GRADE WHERE L3_GRADE IS NOT NULL AND TRIM(L3_GRADE) <> '' ORDER BY L3_GRADE """) grades = [row[0] for row in cursor.fetchall()] return grades except Exception as e: logger.warning("获取成品钢种失败: %s", e) return [] finally: cursor.close() conn.close() @app.get("/api/grades/l2model") def get_l2_model_grades(): """获取二级模型钢种列表 - 来自 L3_GRADE.L2_GRADE""" conn = get_connection() cursor = conn.cursor() try: cursor.execute(""" SELECT DISTINCT L2_GRADE FROM L3_GRADE WHERE L2_GRADE IS NOT NULL AND TRIM(L2_GRADE) <> '' ORDER BY L2_GRADE """) grades = [row[0] for row in cursor.fetchall()] return grades except Exception as e: logger.warning("获取二级模型钢种失败: %s", e) return [] finally: cursor.close() conn.close() @app.get("/api/pdi/next-numbers") def get_next_numbers(): """ 获取下一个可用的批次编号和顺序号 - 批次编号格式: 年月日(6位) + 批次顺序(3位),如 260406001 - 顺序号: 当前批次内的最大顺序号+1 """ try: today = datetime.datetime.now().strftime("%y%m%d") today_prefix = int(today) * 1000 # 如 260406000 # 获取连接 conn = get_connection() # 开启事务 conn.autocommit = False cursor = conn.cursor() # ============================================== # 1. 查询今天已有的最大批次号 # ============================================== cursor.execute(""" SELECT MAX(ROLLPROGRAMNB) FROM PLTM.PDI_PLTM WHERE ROLLPROGRAMNB BETWEEN :min_val AND :max_val """, {"min_val": today_prefix, "max_val": today_prefix + 999}) row = cursor.fetchone() max_today_batch = row[0] if row[0] is not None else today_prefix # ============================================== # 3. 查询【当前最大批次】的顺序号 # ============================================== cursor.execute(""" SELECT NVL(MAX(SEQUENCENB), 0) FROM PLTM.PDI_PLTM WHERE ROLLPROGRAMNB = :batch """, {"batch": max_today_batch}) max_sequence = cursor.fetchone()[0] next_sequence = max_sequence + 1 # 提交事务 conn.commit() return { "rollprogramnb": max_today_batch, "sequencenb": next_sequence, "batch_date": today, "max_batch_today": max_today_batch if max_today_batch > today_prefix else None } except Exception as e: # 回滚 if 'conn' in locals(): conn.rollback() logger.warning("获取下一编号失败: %s", str(e)) return {"rollprogramnb": None, "sequencenb": None} finally: # 安全关闭 if 'cursor' in locals(): cursor.close() if 'conn' in locals(): conn.close() # ───────────────────────────────────────────── # COIL_TRACK_TEMP 临时跟踪表 CRUD # ───────────────────────────────────────────── @app.get("/api/track/coils") def get_track_coils(): """获取临时跟踪表中的钢卷列表""" coils = sqlite_get_coil_track() return {"data": coils} @app.post("/api/track/coils") def add_track_coil(data: dict): """新增一个钢卷到临时跟踪表""" coilid = data.get("coilid") sequencenb = data.get("sequencenb") rollprogramnb = data.get("rollprogramnb") if not coilid: raise HTTPException(status_code=400, detail="coilid不能为空") sqlite_add_coil_track_item(coilid, sequencenb or 0, rollprogramnb or 0) return {"message": "添加成功"} @app.put("/api/track/coils/{id}") def update_track_coil(id: int, data: dict): """更新临时跟踪表中的钢卷""" coilid = data.get("coilid") sequencenb = data.get("sequencenb", 0) rollprogramnb = data.get("rollprogramnb", 0) position = data.get("position", 1) if not coilid: raise HTTPException(status_code=400, detail="coilid不能为空") sqlite_update_coil_track_item(id, coilid, sequencenb, rollprogramnb, position) return {"message": "更新成功"} @app.delete("/api/track/coils/{id}") def delete_track_coil(id: int): """删除临时跟踪表中的钢卷""" sqlite_delete_coil_track_item(id) return {"message": "删除成功"} @app.delete("/api/track/coils") def clear_track_coils(): """清空临时跟踪表""" sqlite_clear_coil_track() return {"message": "清空成功"} @app.get("/api/track/coils/range") def get_coils_by_range(start: int = Query(1), end: int = Query(5)): """根据顺序号范围查询钢卷""" coils = sqlite_get_coils_by_sequencenb_range(start, end) return {"data": coils} # ───────────────────────────────────────────── # OPC Signal Configuration # ───────────────────────────────────────────── @app.get("/api/opc/signals") def get_signal_config(): """获取信号节点配置""" return { "signal1_node": opc_service.signal1_node, "signal2_node": opc_service.signal2_node, } @app.post("/api/opc/signals") async def save_signal_config(data: dict): """保存信号节点配置""" opc_service.signal1_node = data.get("signal1_node", opc_service.signal1_node) opc_service.signal2_node = data.get("signal2_node", opc_service.signal2_node) try: opc_service.save_config() except Exception as e: logger.warning("Save signal config failed: %s", e) raise HTTPException(status_code=500, detail=f"配置保存失败: {e}") return {"message": "信号节点配置已保存"} # ───────────────────────────────────────────── # 模拟信号接口 (用于测试) # ───────────────────────────────────────────── @app.post("/api/track/simulate/signal1") async def simulate_signal1(): """模拟信号1触发 - 获取下5个钢卷""" await opc_service._handle_signal1() return {"message": "信号1已触发"} @app.post("/api/track/simulate/signal2") async def simulate_signal2(): """模拟信号2触发 - 更新追踪表""" await opc_service._handle_signal2() return {"message": "信号2已触发"}