Files
tiandihe/backend/main.py
Joshi 3ef502d737 feat(追踪系统): 添加手动设置起始钢卷功能
- 后端添加设置起始钢卷API和钢卷查询API
- 前端实现起始钢卷选择界面和状态管理
- 优化追踪逻辑自动加载后续钢卷
- 添加本地存储保持起始钢卷设置
2026-04-30 11:45:42 +08:00

791 lines
29 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Standard-library, framework and project imports, interleaved with the few
# module-level setup statements that must run at import time.
import asyncio
import logging
import os
from contextlib import asynccontextmanager
from typing import Optional
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Query, APIRouter
import datetime
from fastapi.middleware.cors import CORSMiddleware
# Load variables from a .env file before the project modules below are imported.
# NOTE(review): presumably `database` / `opc_service` read their settings from
# the environment at import time, which is why this call precedes them - confirm.
load_dotenv()
from database import get_connection
from models import PDIPLTMCreate, PDIPLTMUpdate, OpcConfig
from opc_service import opc_service
# NOTE(review): no endpoint in the visible part of this file is registered on
# `router`; every route below uses `app` directly - confirm it is still needed.
router = APIRouter()
from sqlite_sync import (
init_db, sync_all_from_oracle,
sqlite_upsert_pdi, sqlite_delete_pdi,
sqlite_get_coil_track, sqlite_update_coil_track_item,
sqlite_add_coil_track_item, sqlite_delete_coil_track_item,
sqlite_clear_coil_track, sqlite_get_coils_by_sequencenb_range
)
# Root logger at INFO level, plus a named logger used throughout this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: prepare storage and run OPC polling.

    Startup: creates the SQLite schema, attempts one Oracle -> SQLite sync
    (a failure is logged and does not prevent startup), then starts the OPC
    polling coroutine as a background task.

    Shutdown: asks the OPC service to stop polling. This runs in a ``finally``
    so it also happens when the application exits with an error.
    """
    # Init SQLite schema
    init_db()

    # Sync Oracle -> SQLite on startup (in thread pool to avoid blocking)
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, sync_all_from_oracle)
        logger.info("Startup sync: %s", result)
    except Exception as e:
        logger.warning("Startup sync failed (Oracle may be unreachable): %s", e)

    # Start OPC polling. The event loop only keeps weak references to tasks,
    # so the task is bound to a local that stays alive for the whole lifespan;
    # otherwise it could be garbage-collected while still running.
    polling_task = asyncio.create_task(opc_service.start_polling())  # noqa: F841
    logger.info("OPC polling task started")
    try:
        yield
    finally:
        await opc_service.stop_polling()
        logger.info("OPC polling task stopped")
# FastAPI application object; `lifespan` supplies the startup/shutdown logic.
app = FastAPI(title="HEFA-L2 PDI管理系统", version="1.0.0", lifespan=lifespan)
# CORS: every origin, method and header is accepted and credentials are allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is intended outside development.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ─────────────────────────────────────────────
# Sync endpoint
# ─────────────────────────────────────────────
@app.post("/api/sync")
async def trigger_sync():
    """Manually trigger a full Oracle -> SQLite sync.

    The sync function is executed in the default thread pool so the event
    loop is not blocked while it runs.

    Returns:
        A dict with a success message and, under ``rows``, whatever
        ``sync_all_from_oracle`` returned.

    Raises:
        HTTPException: 500 when the sync raises; the original exception is
            chained as the cause so it shows up in tracebacks.
    """
    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()
    try:
        result = await loop.run_in_executor(None, sync_all_from_oracle)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"同步失败: {e}") from e
    return {"message": "同步成功", "rows": result}
# ─────────────────────────────────────────────
# PDI_PLTM CRUD
# ─────────────────────────────────────────────
@app.get("/api/pdi")
def list_pdi(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=200),
    coilid: Optional[str] = None,
    status: Optional[int] = None,
    steel_grade: Optional[str] = None,
):
    """Return one page of PLTM.PDI_PLTM rows ordered by COILID descending.

    Args:
        page: 1-based page number.
        page_size: Rows per page (1-200).
        coilid: Optional substring filter on COILID.
        status: Optional exact filter on STATUS.
        steel_grade: Optional substring filter on STEEL_GRADE.

    Returns:
        ``{"total", "page", "page_size", "data"}`` where ``data`` is a list of
        dicts keyed by lower-cased column name (including the ``rn`` paging
        column). Values exposing ``isoformat`` are converted to ISO strings.
    """
    conn = get_connection()
    cursor = conn.cursor()
    try:
        # Collect the optional filters together with their bind values.
        clauses = []
        binds = {}
        if coilid:
            clauses.append("COILID LIKE :coilid")
            binds["coilid"] = f"%{coilid}%"
        if status is not None:
            clauses.append("STATUS = :status")
            binds["status"] = status
        if steel_grade:
            clauses.append("STEEL_GRADE LIKE :steel_grade")
            binds["steel_grade"] = f"%{steel_grade}%"
        where = ("WHERE " + " AND ".join(clauses)) if clauses else ""

        # Total number of matching rows (before paging binds are added).
        cursor.execute(f"SELECT COUNT(*) FROM PLTM.PDI_PLTM {where}", binds)
        total = cursor.fetchone()[0]

        # ROWNUM window: rows (first_row, first_row + page_size].
        first_row = (page - 1) * page_size
        page_sql = (
            "\n"
            "SELECT * FROM (\n"
            "SELECT a.*, ROWNUM rn FROM (\n"
            "SELECT SID, ROLLPROGRAMNB, SEQUENCENB, STATUS, SCHEDULE_CODE,\n"
            "COILID, ENTRY_COIL_THICKNESS, ENTRY_COIL_WIDTH,\n"
            "ENTRY_COIL_WEIGHT, ENTRY_OF_COIL_LENGTH,\n"
            "EXIT_COIL_NO, EXIT_COIL_THICKNESS, EXIT_COIL_WIDTH,\n"
            "EXIT_COIL_WEIGHT, WORK_ORDER_NO, ORDER_QUALITY,\n"
            "STEEL_GRADE, SG_SIGN, ORDER_THICKNESS, ORDER_WIDTH,\n"
            "COILER_DIAMETER, L2_GRADE, WEIGHT_MODE,\n"
            "CREATED_DT, UPDATED_DT, SEND_FLAG,\n"
            "ENTRY_COIL_THICKNESS_MAX, ENTRY_COIL_THICKNESS_MIN,\n"
            "ENTRY_COIL_WIDTH_MAX, ENTRY_COIL_WIDTH_MIN,\n"
            "EXIT_COIL_THICKNESS_MAX, EXIT_COIL_THICKNESS_MIN,\n"
            "EXIT_COIL_WIDTH_MAX, EXIT_COIL_WIDTH_MIN,\n"
            "CROSS_SECTION_AREA, UNCOILER_TENSION,\n"
            "LOOPER_TENSION_1, PL_TENSION,\n"
            "LOOPER_TENSION_2, LOOPER_TENSION_3,\n"
            "DUMMY_COIL_MRK, CUT_MODE, TRIMMING, TRIMMING_WIDTH\n"
            f"FROM PLTM.PDI_PLTM {where}\n"
            "ORDER BY COILID DESC\n"
            ") a WHERE ROWNUM <= :end_row\n"
            ") WHERE rn > :start_row\n"
        )
        binds["end_row"] = first_row + page_size
        binds["start_row"] = first_row
        cursor.execute(page_sql, binds)

        names = [desc[0].lower() for desc in cursor.description]
        records = [
            {
                name: value.isoformat() if hasattr(value, 'isoformat') else value
                for name, value in zip(names, raw)
            }
            for raw in cursor.fetchall()
        ]
        return {"total": total, "page": page, "page_size": page_size, "data": records}
    finally:
        cursor.close()
        conn.close()
@app.get("/api/pdi/{coilid}")
def get_pdi(coilid: str):
    """Return the PLTM.PDI_PLTM row whose COILID equals ``coilid``.

    Returns:
        A dict keyed by lower-cased column name; values exposing
        ``isoformat`` are converted to ISO strings.

    Raises:
        HTTPException: 404 when no row matches.
    """
    # "/api/pdi/next-numbers" is declared further down in this module, but
    # FastAPI matches routes in declaration order, so that URL is captured by
    # this parameterised route first and used to end in a 404. Delegate to the
    # intended handler here. (The cleaner long-term fix is to declare the
    # static route before this one.)
    if coilid == "next-numbers":
        return get_next_numbers()

    conn = get_connection()
    cursor = conn.cursor()
    try:
        cursor.execute("SELECT * FROM PLTM.PDI_PLTM WHERE COILID = :coilid", {"coilid": coilid})
        columns = [col[0].lower() for col in cursor.description]
        row = cursor.fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="记录不存在")
        return {
            name: value.isoformat() if hasattr(value, 'isoformat') else value
            for name, value in zip(columns, row)
        }
    finally:
        cursor.close()
        conn.close()
@app.post("/api/pdi", status_code=201)
def create_pdi(data: PDIPLTMCreate):
    """Insert a new row into PLTM.PDI_PLTM and mirror it to SQLite.

    Validation performed before the insert:
      1. ``coilid`` must be exactly 12 characters long.
      2. A non-empty ``rollprogramnb`` must not be lower than the highest
         batch number already stored.
      3. With both ``rollprogramnb`` and ``sequencenb`` set, the sequence
         number must be unused in that batch and greater than the batch's
         current maximum.

    A failure of the SQLite mirror is only logged; the Oracle insert stands.

    Raises:
        HTTPException: 400 for a validation failure or any other error
            (the transaction is rolled back for the latter).
    """
    conn = get_connection()
    cursor = conn.cursor()
    try:
        # 1. Validate the coil id length.
        if len(data.coilid) != 12:
            raise HTTPException(status_code=400, detail="钢卷号必须为12位")

        batch = data.rollprogramnb
        seq = data.sequencenb

        # 2. Validate the batch number against the highest stored one.
        if batch:
            cursor.execute("SELECT MAX(ROLLPROGRAMNB) FROM PLTM.PDI_PLTM")
            highest_batch = cursor.fetchone()[0]
            if highest_batch and batch < highest_batch:
                raise HTTPException(status_code=400, detail="批次编号不能小于已有的最大批次编号")

        # 3. Validate the sequence number inside the batch.
        if batch and seq:
            # The same sequence number must not already exist in this batch.
            cursor.execute(
                "\n"
                "SELECT COUNT(*) FROM PLTM.PDI_PLTM\n"
                "WHERE ROLLPROGRAMNB = :batch AND SEQUENCENB = :seq\n",
                {"batch": batch, "seq": seq},
            )
            if cursor.fetchone()[0] > 0:
                raise HTTPException(status_code=400, detail="本批次内顺序号不能重复")

            # The new sequence number must exceed the batch's current maximum.
            cursor.execute(
                "\n"
                "SELECT MAX(SEQUENCENB) FROM PLTM.PDI_PLTM\n"
                "WHERE ROLLPROGRAMNB = :batch\n",
                {"batch": batch},
            )
            highest_seq = cursor.fetchone()[0]
            if highest_seq and seq <= highest_seq:
                raise HTTPException(status_code=400, detail="本批次内顺序号必须递增")

        # Only fields that were actually supplied become columns of the INSERT.
        fields = dict(data.model_dump(exclude_none=True))
        column_list = ", ".join(name.upper() for name in fields)
        placeholder_list = ", ".join(f":{name}" for name in fields)
        insert_sql = f"INSERT INTO PLTM.PDI_PLTM ({column_list}) VALUES ({placeholder_list})"
        cursor.execute(insert_sql, fields)
        conn.commit()

        # Mirror to SQLite (best effort).
        try:
            sqlite_upsert_pdi(fields)
        except Exception as e:
            logger.warning("SQLite mirror failed on create: %s", e)
        return {"message": "创建成功", "coilid": data.coilid}
    except HTTPException:
        raise
    except Exception as e:
        conn.rollback()
        raise HTTPException(status_code=400, detail=str(e))
    finally:
        cursor.close()
        conn.close()
@app.put("/api/pdi/{coilid}")
def update_pdi(coilid: str, data: PDIPLTMUpdate):
    """Update the PLTM.PDI_PLTM row identified by ``coilid`` and mirror it to SQLite.

    Only fields that are not ``None`` in ``data`` are written. When the batch
    or sequence number is touched, the same rules as on creation are checked
    against all *other* rows: the batch number may not be lower than the
    stored maximum, and the sequence number must be unique in and greater
    than the maximum of the target batch.

    After the commit the row is re-read from Oracle and upserted into SQLite;
    a failure of that mirror step is only logged.

    Raises:
        HTTPException: 400 when there is nothing to update, a validation
            fails, or another error occurs (rolled back); 404 when the
            record does not exist.
    """
    conn = get_connection()
    cursor = conn.cursor()
    try:
        fields = dict(data.model_dump(exclude_none=True))
        if not fields:
            raise HTTPException(status_code=400, detail="无更新字段")

        # Validate the batch number against the highest stored one.
        if "rollprogramnb" in fields:
            cursor.execute("SELECT MAX(ROLLPROGRAMNB) FROM PLTM.PDI_PLTM")
            max_batch = cursor.fetchone()[0]
            if max_batch and fields["rollprogramnb"] < max_batch:
                raise HTTPException(status_code=400, detail="批次编号不能小于已有的最大批次编号")

        # Validate the sequence number.
        if "rollprogramnb" in fields or "sequencenb" in fields:
            # Current batch / sequence of the record fill in whichever of the
            # two values the request did not supply.
            cursor.execute(
                """
                SELECT ROLLPROGRAMNB, SEQUENCENB FROM PLTM.PDI_PLTM
                WHERE COILID = :coilid
                """,
                {"coilid": coilid},
            )
            current = cursor.fetchone()
            if not current:
                raise HTTPException(status_code=404, detail="记录不存在")
            new_batch = fields.get("rollprogramnb", current[0])
            new_seq = fields.get("sequencenb", current[1])

            # Same sequence number must not exist in the batch (excluding this record).
            cursor.execute(
                """
                SELECT COUNT(*) FROM PLTM.PDI_PLTM
                WHERE ROLLPROGRAMNB = :batch AND SEQUENCENB = :seq AND COILID != :coilid
                """,
                {"batch": new_batch, "seq": new_seq, "coilid": coilid},
            )
            if cursor.fetchone()[0] > 0:
                raise HTTPException(status_code=400, detail="本批次内顺序号不能重复")

            # Sequence number must exceed the batch maximum (excluding this record).
            cursor.execute(
                """
                SELECT MAX(SEQUENCENB) FROM PLTM.PDI_PLTM
                WHERE ROLLPROGRAMNB = :batch AND COILID != :coilid
                """,
                {"batch": new_batch, "coilid": coilid},
            )
            max_seq = cursor.fetchone()[0]
            if max_seq and new_seq <= max_seq:
                raise HTTPException(status_code=400, detail="本批次内顺序号必须递增")

        set_clause = ", ".join(f"{k.upper()} = :{k}" for k in fields)
        # Separate bind name so it cannot collide with a "coilid" field in SET.
        fields["coilid_"] = coilid
        sql = f"UPDATE PLTM.PDI_PLTM SET {set_clause} WHERE COILID = :coilid_"
        cursor.execute(sql, fields)
        if cursor.rowcount == 0:
            raise HTTPException(status_code=404, detail="记录不存在")
        conn.commit()

        # Mirror to SQLite: re-fetch the updated row (best effort).
        try:
            mirror_cursor = conn.cursor()
            try:
                # Schema-qualified like every other PDI_PLTM query in this
                # module; the unqualified name depended on the session's
                # default schema and could make the mirror fail silently.
                mirror_cursor.execute(
                    "SELECT * FROM PLTM.PDI_PLTM WHERE COILID = :c", {"c": coilid}
                )
                cols = [d[0].lower() for d in mirror_cursor.description]
                row = mirror_cursor.fetchone()
            finally:
                # Close even when execute/fetch raises.
                mirror_cursor.close()
            if row:
                row_dict = {
                    name: value.isoformat() if hasattr(value, 'isoformat') else value
                    for name, value in zip(cols, row)
                }
                sqlite_upsert_pdi(row_dict)
        except Exception as e:
            logger.warning("SQLite mirror failed on update: %s", e)
        return {"message": "更新成功"}
    except HTTPException:
        raise
    except Exception as e:
        conn.rollback()
        raise HTTPException(status_code=400, detail=str(e))
    finally:
        cursor.close()
        conn.close()
@app.delete("/api/pdi/{coilid}")
def delete_pdi(coilid: str):
    """Delete the PLTM.PDI_PLTM row with this COILID and mirror the delete to SQLite.

    A failure of the SQLite mirror is only logged.

    Raises:
        HTTPException: 404 when no row was deleted; 400 for any other error
            (the transaction is rolled back).
    """
    conn = get_connection()
    cursor = conn.cursor()
    try:
        delete_sql = "DELETE FROM PLTM.PDI_PLTM WHERE COILID = :coilid"
        cursor.execute(delete_sql, {"coilid": coilid})
        affected = cursor.rowcount
        if affected == 0:
            raise HTTPException(status_code=404, detail="记录不存在")
        conn.commit()

        # Mirror to SQLite (best effort).
        try:
            sqlite_delete_pdi(coilid)
        except Exception as mirror_error:
            logger.warning("SQLite mirror failed on delete: %s", mirror_error)

        return {"message": "删除成功"}
    except HTTPException:
        raise
    except Exception as error:
        conn.rollback()
        raise HTTPException(status_code=400, detail=str(error))
    finally:
        cursor.close()
        conn.close()
# ─────────────────────────────────────────────
# CMPT_PL_TRACKMAP
# ─────────────────────────────────────────────
@app.get("/api/trackmap")
def list_trackmap():
    """Return every row of PLTM.CMPT_PL_TRACKMAP ordered by POSITION.

    Returns:
        A list of dicts keyed by lower-cased column name; values exposing
        ``isoformat`` are converted to ISO strings.
    """
    selected_columns = (
        "POSITION", "COILID", "BEF_ES", "ES", "ENT_LOO", "PL", "INT_LOO",
        "ST", "EXI_LOO", "RUN_SPEED_MIN", "RUN_SPEED_MAX",
        "WELD_SPEED_MIN", "WELD_SPEED_MAX", "TOC", "TOM", "MOP",
    )
    query = (
        "SELECT " + ", ".join(selected_columns)
        + " FROM PLTM.CMPT_PL_TRACKMAP ORDER BY POSITION"
    )
    conn = get_connection()
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        names = [desc[0].lower() for desc in cursor.description]
        return [
            {
                name: value.isoformat() if hasattr(value, 'isoformat') else value
                for name, value in zip(names, raw)
            }
            for raw in cursor.fetchall()
        ]
    finally:
        cursor.close()
        conn.close()
# ─────────────────────────────────────────────
# OPC Configuration & Status
# ─────────────────────────────────────────────
@app.get("/api/opc/config")
def get_opc_config():
    """Return the OPC service's current settings plus its runtime state.

    The response maps each listed attribute name to the value currently held
    by ``opc_service``.
    """
    exposed_attributes = (
        "opc_url",
        "counter_node",
        "trackmap_nodes",
        "poll_interval",
        "signal1_node",
        "signal2_node",
        "write_counter_node",
        "write_source_node",
        "write_target_node",
        "write_s7_endpoint",
        "write_s7_rack",
        "write_s7_slot",
        "write_nodes",
        "running",
        "last_counter",
        "last_update",
    )
    return {name: getattr(opc_service, name) for name in exposed_attributes}
# Strong references to polling tasks started by save_opc_config. The event
# loop only keeps weak references, so an unreferenced task may be collected
# while it is still running; each task removes itself when it finishes.
_config_polling_tasks = set()


@app.post("/api/opc/config")
async def save_opc_config(config: OpcConfig):
    """Apply a new OPC configuration, persist it and restart polling.

    Polling is stopped, the settings from ``config`` are copied onto
    ``opc_service``, ``write_counter_last`` is reset, the configuration is
    persisted via ``opc_service.save_config()`` and polling is started again.

    Polling is restarted even when persisting fails: the in-memory settings
    have already been replaced and polling has already been stopped, so
    returning early would leave the service stopped.

    Raises:
        HTTPException: 500 when persisting the configuration fails.
    """
    await opc_service.stop_polling()

    for name in (
        "opc_url",
        "counter_node",
        "trackmap_nodes",
        "poll_interval",
        "signal1_node",
        "signal2_node",
        "write_counter_node",
        "write_source_node",
        "write_target_node",
        "write_s7_endpoint",
        "write_s7_rack",
        "write_s7_slot",
        "write_nodes",
    ):
        setattr(opc_service, name, getattr(config, name))
    opc_service.write_counter_last = None

    persist_error = None
    try:
        opc_service.save_config()
    except Exception as e:
        logger.warning("Persist OPC config failed: %s", e)
        persist_error = e

    task = asyncio.create_task(opc_service.start_polling())
    _config_polling_tasks.add(task)
    task.add_done_callback(_config_polling_tasks.discard)

    if persist_error is not None:
        raise HTTPException(
            status_code=500, detail=f"配置保存失败: {persist_error}"
        ) from persist_error
    return {"message": "OPC配置已保存并重启轮询"}
@app.get("/api/opc/status")
def opc_status():
    """Return the OPC service's runtime status and tracking information.

    ``tracking_info.current_tracking_coils`` holds at most the first four
    entries returned by ``sqlite_get_coil_track()``, reduced to ``coilid``,
    ``sequencenb`` and ``rollprogramnb``. If reading them fails the list is
    empty and a warning is logged. ``log`` contains the last 50 entries of
    the service's event log.
    """
    # First four coils of the tracking table. sqlite_get_coil_track is already
    # imported at module level, so no function-scope import is needed.
    current_tracking_coils = []
    try:
        current_tracking_coils = [
            {
                "coilid": coil.get("coilid", ""),
                "sequencenb": coil.get("sequencenb", 0),
                "rollprogramnb": coil.get("rollprogramnb", 0),
            }
            for coil in sqlite_get_coil_track()[:4]
        ]
    except Exception as e:
        # Lazy %-formatting, consistent with the other log calls in this module.
        logger.warning("Failed to get current tracking coils: %s", e)

    return {
        "running": opc_service.running,
        "last_counter": opc_service.last_counter,
        "last_update": opc_service.last_update,
        "log": opc_service.event_log[-50:],
        "track_state": opc_service.track_state,
        "write_counter_last": opc_service.write_counter_last,
        "tracking_info": {
            "first_coilid": opc_service.first_coilid,  # kept as-is
            "last_tracked_coilid": opc_service.last_tracked_coilid,
            "end_coilid": opc_service.end_coilid,
            "tracking_ended": opc_service.tracking_ended,
            "current_tracking_coils": current_tracking_coils,
        },
    }
# Strong references to polling tasks started by restart_opc. The event loop
# only keeps weak references, so an unreferenced task may be collected while
# it is still running; each task removes itself when it finishes.
_restart_polling_tasks = set()


@app.post("/api/opc/restart")
async def restart_opc():
    """Stop OPC polling and start it again as a background task."""
    await opc_service.stop_polling()
    task = asyncio.create_task(opc_service.start_polling())
    _restart_polling_tasks.add(task)
    task.add_done_callback(_restart_polling_tasks.discard)
    return {"message": "OPC服务已重启"}
# ─────────────────────────────────────────────
# 钢种查询接口 (为酸洗提供选择)
# ─────────────────────────────────────────────
@app.get("/api/grades/entry")
def get_entry_grades():
    """Return the distinct, non-blank ALLOY_ID values of PL_BM_WELD_MACHINE, sorted.

    Returns:
        A list of grade values; an empty list when the query fails (the
        failure is logged as a warning).
    """
    conn = get_connection()
    cursor = conn.cursor()
    try:
        # Oracle treats '' as NULL, so the former `TRIM(ALLOY_ID) <> ''` was
        # never true and filtered out every row. TRIM() of a NULL or
        # all-blank value is NULL, so one IS NOT NULL test covers both cases.
        cursor.execute(
            """
            SELECT DISTINCT ALLOY_ID
            FROM PL_BM_WELD_MACHINE
            WHERE TRIM(ALLOY_ID) IS NOT NULL
            ORDER BY ALLOY_ID
            """
        )
        return [row[0] for row in cursor.fetchall()]
    except Exception as e:
        logger.warning("获取来料钢种失败: %s", e)
        return []
    finally:
        cursor.close()
        conn.close()
@app.get("/api/grades/product")
def get_product_grades():
    """Return the distinct, non-blank L3_GRADE values of table L3_GRADE, sorted.

    Returns:
        A list of grade values; an empty list when the query fails (the
        failure is logged as a warning).
    """
    conn = get_connection()
    cursor = conn.cursor()
    try:
        # Oracle treats '' as NULL, so the former `TRIM(L3_GRADE) <> ''` was
        # never true and filtered out every row. TRIM() of a NULL or
        # all-blank value is NULL, so one IS NOT NULL test covers both cases.
        cursor.execute(
            """
            SELECT DISTINCT L3_GRADE
            FROM L3_GRADE
            WHERE TRIM(L3_GRADE) IS NOT NULL
            ORDER BY L3_GRADE
            """
        )
        return [row[0] for row in cursor.fetchall()]
    except Exception as e:
        logger.warning("获取成品钢种失败: %s", e)
        return []
    finally:
        cursor.close()
        conn.close()
@app.get("/api/grades/l2model")
def get_l2_model_grades():
    """Return the distinct, non-blank L2_GRADE values of table L3_GRADE, sorted.

    Returns:
        A list of grade values; an empty list when the query fails (the
        failure is logged as a warning).
    """
    conn = get_connection()
    cursor = conn.cursor()
    try:
        # Oracle treats '' as NULL, so the former `TRIM(L2_GRADE) <> ''` was
        # never true and filtered out every row. TRIM() of a NULL or
        # all-blank value is NULL, so one IS NOT NULL test covers both cases.
        cursor.execute(
            """
            SELECT DISTINCT L2_GRADE
            FROM L3_GRADE
            WHERE TRIM(L2_GRADE) IS NOT NULL
            ORDER BY L2_GRADE
            """
        )
        return [row[0] for row in cursor.fetchall()]
    except Exception as e:
        logger.warning("获取二级模型钢种失败: %s", e)
        return []
    finally:
        cursor.close()
        conn.close()
# NOTE(review): this route is declared after "/api/pdi/{coilid}" in this file,
# and FastAPI resolves routes in declaration order - verify that requests for
# this URL actually reach this handler.
@app.get("/api/pdi/next-numbers")
def get_next_numbers():
    """Suggest the batch number and sequence number for the next record.

    Batch numbers have the form yymmdd (6 digits) followed by a 3-digit batch
    index, e.g. 260406001.

    Returns:
        On success a dict with
          - ``rollprogramnb``: the highest batch number stored for today, or
            yymmdd000 when today has none yet;
          - ``sequencenb``: the highest sequence number in that batch plus one;
          - ``batch_date``: today's date as yymmdd;
          - ``max_batch_today``: the batch number if one exists for today,
            otherwise ``None``.
        On any error ``{"rollprogramnb": None, "sequencenb": None}`` (the
        error is logged as a warning).
    """
    conn = None
    cursor = None
    try:
        today = datetime.datetime.now().strftime("%y%m%d")
        today_prefix = int(today) * 1000  # e.g. 260406000

        conn = get_connection()
        # Run both reads inside one explicit transaction.
        conn.autocommit = False
        cursor = conn.cursor()

        # 1. Highest batch number already stored for today.
        cursor.execute(
            "\n"
            "SELECT MAX(ROLLPROGRAMNB)\n"
            "FROM PLTM.PDI_PLTM\n"
            "WHERE ROLLPROGRAMNB BETWEEN :min_val AND :max_val\n",
            {"min_val": today_prefix, "max_val": today_prefix + 999},
        )
        batch_row = cursor.fetchone()
        if batch_row[0] is not None:
            max_today_batch = batch_row[0]
        else:
            max_today_batch = today_prefix

        # 2. Highest sequence number inside that batch (0 when it is empty).
        cursor.execute(
            "\n"
            "SELECT NVL(MAX(SEQUENCENB), 0)\n"
            "FROM PLTM.PDI_PLTM\n"
            "WHERE ROLLPROGRAMNB = :batch\n",
            {"batch": max_today_batch},
        )
        next_sequence = cursor.fetchone()[0] + 1

        conn.commit()

        existing_batch = max_today_batch if max_today_batch > today_prefix else None
        return {
            "rollprogramnb": max_today_batch,
            "sequencenb": next_sequence,
            "batch_date": today,
            "max_batch_today": existing_batch,
        }
    except Exception as e:
        # Roll back only if a connection was obtained.
        if conn is not None:
            conn.rollback()
        logger.warning("获取下一编号失败: %s", str(e))
        return {"rollprogramnb": None, "sequencenb": None}
    finally:
        # Close whatever was opened.
        if cursor is not None:
            cursor.close()
        if conn is not None:
            conn.close()
# ─────────────────────────────────────────────
# COIL_TRACK_TEMP 临时跟踪表 CRUD
# ─────────────────────────────────────────────
@app.get("/api/track/coils")
def get_track_coils():
    """Return the coils held in the temporary tracking table under ``data``."""
    return {"data": sqlite_get_coil_track()}
@app.post("/api/track/coils")
def add_track_coil(data: dict):
    """Add one coil to the temporary tracking table.

    ``data`` must contain a non-empty ``coilid``; ``sequencenb`` and
    ``rollprogramnb`` fall back to 0 when missing or falsy.

    Raises:
        HTTPException: 400 when ``coilid`` is missing or empty.
    """
    coil_id = data.get("coilid")
    if not coil_id:
        raise HTTPException(status_code=400, detail="coilid不能为空")
    sequence = data.get("sequencenb") or 0
    batch = data.get("rollprogramnb") or 0
    sqlite_add_coil_track_item(coil_id, sequence, batch)
    return {"message": "添加成功"}
@app.put("/api/track/coils/{id}")
def update_track_coil(id: int, data: dict):
    """Update one entry of the temporary tracking table.

    ``data`` must contain a non-empty ``coilid``. Missing ``sequencenb`` and
    ``rollprogramnb`` default to 0 and a missing ``position`` defaults to 1.

    Raises:
        HTTPException: 400 when ``coilid`` is missing or empty.
    """
    coil_id = data.get("coilid")
    if not coil_id:
        raise HTTPException(status_code=400, detail="coilid不能为空")
    sqlite_update_coil_track_item(
        id,
        coil_id,
        data.get("sequencenb", 0),
        data.get("rollprogramnb", 0),
        data.get("position", 1),
    )
    return {"message": "更新成功"}
@app.delete("/api/track/coils/{id}")
def delete_track_coil(id: int):
    """Delete the entry with this id from the temporary tracking table."""
    sqlite_delete_coil_track_item(id)
    response = {"message": "删除成功"}
    return response
@app.delete("/api/track/coils")
def clear_track_coils():
    """Remove every entry from the temporary tracking table."""
    sqlite_clear_coil_track()
    response = {"message": "清空成功"}
    return response
@app.get("/api/track/coils/range")
def get_coils_by_range(start: int = Query(1), end: int = Query(5)):
    """Return, under ``data``, the coils looked up by the sequence-number range ``start``..``end``."""
    return {"data": sqlite_get_coils_by_sequencenb_range(start, end)}
# ─────────────────────────────────────────────
# OPC Signal Configuration
# ─────────────────────────────────────────────
@app.get("/api/opc/signals")
def get_signal_config():
    """Return the two signal node settings currently held by the OPC service."""
    signal_attributes = ("signal1_node", "signal2_node")
    return {name: getattr(opc_service, name) for name in signal_attributes}
@app.post("/api/opc/signals")
async def save_signal_config(data: dict):
    """Store the signal node settings and persist the OPC configuration.

    A key missing from ``data`` leaves the corresponding setting unchanged.

    Raises:
        HTTPException: 500 when persisting the configuration fails.
    """
    for key in ("signal1_node", "signal2_node"):
        setattr(opc_service, key, data.get(key, getattr(opc_service, key)))
    try:
        opc_service.save_config()
    except Exception as error:
        logger.warning("Save signal config failed: %s", error)
        raise HTTPException(status_code=500, detail=f"配置保存失败: {error}")
    return {"message": "信号节点配置已保存"}
# ─────────────────────────────────────────────
# 模拟信号接口 (用于测试)
# ─────────────────────────────────────────────
@app.post("/api/track/simulate/signal1")
async def simulate_signal1():
    """Test helper: invoke the OPC service's signal-1 handler directly.

    NOTE(review): per the original docstring the handler fetches the next
    five coils - the handler itself is not visible here, confirm.
    """
    await opc_service._handle_signal1()
    response = {"message": "信号1已触发"}
    return response
@app.post("/api/track/simulate/signal2")
async def simulate_signal2():
    """Test helper: invoke the OPC service's signal-2 handler directly.

    NOTE(review): per the original docstring the handler updates the tracking
    table - the handler itself is not visible here, confirm.
    """
    await opc_service._handle_signal2()
    response = {"message": "信号2已触发"}
    return response
# ─────────────────────────────────────────────
# Manual Starting Coil Configuration
# ─────────────────────────────────────────────
def _coil_exists_in_pdi(coilid: str) -> bool:
    """Return True when PLTM.PDI_PLTM holds a row with this COILID (blocking call)."""
    conn = get_connection()
    try:
        cursor = conn.cursor()
        try:
            cursor.execute(
                "SELECT COUNT(*) FROM PLTM.PDI_PLTM WHERE COILID = :coilid",
                {"coilid": coilid},
            )
            return cursor.fetchone()[0] > 0
        finally:
            cursor.close()
    finally:
        conn.close()


@app.post("/api/track/set-start-coil")
async def set_start_coil(data: dict):
    """Set the coil from which tracking should start.

    ``data`` must contain a non-empty ``coilid`` that exists in
    PLTM.PDI_PLTM. After the start coil is stored on the OPC service, the
    signal-1 handler is invoked once; a failure there is logged and does not
    affect the response.

    Raises:
        HTTPException: 400 when ``coilid`` is missing or empty; 404 when the
            coil does not exist.
    """
    coilid = data.get("coilid")
    if not coilid:
        raise HTTPException(status_code=400, detail="coilid不能为空")

    # The existence check is a blocking database call; run it in the thread
    # pool so this coroutine does not stall the event loop (and with it the
    # OPC polling task) while the database answers.
    loop = asyncio.get_running_loop()
    exists = await loop.run_in_executor(None, _coil_exists_in_pdi, coilid)
    if not exists:
        raise HTTPException(status_code=404, detail=f"钢卷 {coilid} 不存在")

    # Store the start coil.
    opc_service.set_manual_start_coil(coilid)

    # NOTE(review): per the original comment this loads the start coil and the
    # following coils into the temporary table - handler not visible, confirm.
    try:
        await opc_service._handle_signal1()
    except Exception as e:
        # Setting the start coil still counts as successful if loading fails.
        logger.warning("Failed to load initial coils after setting start coil: %s", e)
    return {"message": f"已设置起始钢卷: {coilid}"}
@app.get("/api/track/available-coils")
def get_available_coils(
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=200),
    coilid: Optional[str] = None,
    sequencenb_min: Optional[int] = None,
    sequencenb_max: Optional[int] = None
):
    """List coils that can be chosen as the tracking start point (searchable, paged).

    Args:
        page: 1-based page number.
        page_size: Rows per page (1-200).
        coilid: Optional substring filter on COILID.
        sequencenb_min: Optional lower bound (inclusive) on SEQUENCENB.
        sequencenb_max: Optional upper bound (inclusive) on SEQUENCENB.

    Returns:
        ``{"data", "total", "page", "page_size"}`` where ``data`` holds the
        requested page, ordered by COILID ascending, as dicts keyed by
        lower-cased column name.
    """
    conn = get_connection()
    cursor = conn.cursor()
    try:
        # Build the optional filters together with their bind values.
        conditions = []
        params = {}
        if coilid:
            conditions.append("COILID LIKE :coilid")
            params["coilid"] = f"%{coilid}%"
        if sequencenb_min is not None:
            conditions.append("SEQUENCENB >= :sequencenb_min")
            params["sequencenb_min"] = sequencenb_min
        if sequencenb_max is not None:
            conditions.append("SEQUENCENB <= :sequencenb_max")
            params["sequencenb_max"] = sequencenb_max
        where_clause = ("WHERE " + " AND ".join(conditions)) if conditions else ""

        # Total number of matching rows.
        cursor.execute(f"SELECT COUNT(*) FROM PLTM.PDI_PLTM {where_clause}", params)
        total = cursor.fetchone()[0]

        # Page inside the database with the same ROWNUM window list_pdi uses,
        # instead of fetching every matching row and slicing in Python. The
        # outer SELECT names the columns so the helper column `rn` stays out
        # of the response and the payload shape is unchanged.
        offset = (page - 1) * page_size
        data_sql = f"""
            SELECT COILID, SEQUENCENB, ROLLPROGRAMNB, STEEL_GRADE,
                   ENTRY_COIL_THICKNESS, ENTRY_COIL_WIDTH, ENTRY_COIL_WEIGHT
            FROM (
                SELECT a.*, ROWNUM rn FROM (
                    SELECT COILID, SEQUENCENB, ROLLPROGRAMNB, STEEL_GRADE,
                           ENTRY_COIL_THICKNESS, ENTRY_COIL_WIDTH, ENTRY_COIL_WEIGHT
                    FROM PLTM.PDI_PLTM
                    {where_clause}
                    ORDER BY COILID ASC
                ) a WHERE ROWNUM <= :end_row
            ) WHERE rn > :start_row
            ORDER BY rn
        """
        page_params = {**params, "end_row": offset + page_size, "start_row": offset}
        cursor.execute(data_sql, page_params)

        columns = [col[0].lower() for col in cursor.description]
        rows = [dict(zip(columns, row)) for row in cursor.fetchall()]
        return {
            "data": rows,
            "total": total,
            "page": page,
            "page_size": page_size
        }
    finally:
        cursor.close()
        conn.close()