""" SQLite mirror for PDI_PLTM and CMPT_PL_TRACKMAP. On startup (and on demand) the service pulls all rows from Oracle and upserts them into a local SQLite file (hefa_l2.db). Whenever the FastAPI endpoints write to Oracle they also call the corresponding sqlite_* helper here so the two databases stay in sync. """ import sqlite3 import logging import os from typing import Any, Dict, List, Optional logger = logging.getLogger(__name__) DB_PATH = os.path.join(os.path.dirname(__file__), "hefa_l2.db") # ───────────────────────────────────────────────────────────── # Connection helper # ───────────────────────────────────────────────────────────── def get_sqlite() -> sqlite3.Connection: conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn # ───────────────────────────────────────────────────────────── # Schema bootstrap # ───────────────────────────────────────────────────────────── PDI_DDL = """ CREATE TABLE IF NOT EXISTS PDI_PLTM ( SID INTEGER, ROLLPROGRAMNB INTEGER, SEQUENCENB INTEGER, STATUS INTEGER DEFAULT 0, SCHEDULE_CODE TEXT, COILID TEXT NOT NULL PRIMARY KEY, ENTRY_COIL_THICKNESS REAL, ENTRY_COIL_THICKNESS_MAX REAL, ENTRY_COIL_THICKNESS_MIN REAL, ENTRY_COIL_WIDTH REAL, ENTRY_COIL_WIDTH_MAX REAL, ENTRY_COIL_WIDTH_MIN REAL, ENTRY_COIL_WEIGHT REAL, ENTRY_OF_COIL_LENGTH REAL, ENTRY_OF_COIL_INNER_DIAMETER REAL, ENTRY_OF_COIL_OUTER_DIAMETER REAL, TRIMMING INTEGER, TRIMMING_WIDTH REAL, SMP_LENGTH REAL, SMP_NUM REAL, SMP_FRQ TEXT, SMP_NUM_HEAD REAL, SMP_NUM_MID REAL, SMP_NUM_TAIL REAL, PRECEDING_PROCESS_CODE TEXT, NEXT_PROCESS_CODE TEXT, HOT_MILL_DELIVERY_TEMP REAL, FINISHED_COIL_TEMP REAL, CROWN_AVERAGE REAL, COIL_FLATNESS_AVERAGE REAL, COIL_FLATNESS_MAX_VALUE REAL, COIL_FLATNESS_MIN_VALUE REAL, MATERIAL_YIELD_POINT REAL, MATERIAL_TENSILE REAL, HOTACTFMWEDGEAVG REAL, WEIGHT_MODE TEXT, DUMMY_COIL_MRK TEXT, CUT_MODE TEXT, OFF_GAUGE_HEAD_LENGTH REAL, OFF_GAUGE_TAIL_LENGTH REAL, EXIT_COIL_NO TEXT, EXIT_COIL_WEIGHT REAL, EXIT_COIL_WEIGHT_MAX REAL, EXIT_COIL_WEIGHT_MIN REAL, EXIT_COIL_THICKNESS REAL, EXIT_COIL_THICKNESS_MAX REAL, EXIT_COIL_THICKNESS_MIN REAL, EXIT_COIL_WIDTH REAL, EXIT_COIL_WIDTH_MAX REAL, EXIT_COIL_WIDTH_MIN REAL, WORK_ORDER_NO TEXT, ORDER_QUALITY TEXT, STEEL_GRADE TEXT, SG_SIGN TEXT, ORDER_THICKNESS REAL, ORDER_THICKNESS_MAX REAL, ORDER_THICKNESS_MIN REAL, ORDER_WIDTH REAL, ORDER_WIDTH_MAX REAL, ORDER_WIDTH_MIN REAL, SLEEVE_CODE_OF_COLD_COIL TEXT, PACKING_TYPE_CODE TEXT, THK_DS TEXT, EXT_NUM_01 TEXT, C REAL, SI REAL, MN REAL, P REAL, S REAL, CU REAL, NI REAL, CR REAL, MO REAL, V REAL, TI REAL, SOL_AL REAL, FE REAL, NB REAL, N REAL, B REAL, SEND_FLAG TEXT, SEND_DATE TEXT, TRANSACTION_ID TEXT, VERSION INTEGER, TEXT1 TEXT, TEXT2 TEXT, TEXT3 TEXT, TEXT4 TEXT, TEXT5 TEXT, TOC TEXT, TOM TEXT, MOP TEXT, POSITION INTEGER DEFAULT 0, CROSS_SECTION_AREA REAL, UNCOILER_TENSION REAL, LOOPER_TENSION_1 REAL, PL_TENSION REAL, LOOPER_TENSION_2 REAL, LOOPER_TENSION_3 REAL, METERWEIGHT REAL, METER_D_OUTSIDE REAL, METER_WIDTH REAL, SCRAP_CUT_HEAD_LEN REAL, SCRAP_CUT_TAIL_LEN REAL, COILER_DIAMETER INTEGER, L2_GRADE TEXT, CREATED_BY TEXT, CREATED_DT TEXT, CREATED_BY_NAME TEXT, UPDATED_BY TEXT, UPDATED_DT TEXT, UPDATED_BY_NAME TEXT ) """ TRACKMAP_DDL = """ CREATE TABLE IF NOT EXISTS CMPT_PL_TRACKMAP ( POSITION INTEGER PRIMARY KEY, COILID TEXT, BEF_ES INTEGER, ES INTEGER, ENT_LOO INTEGER, PL INTEGER, INT_LOO INTEGER, ST INTEGER, EXI_LOO INTEGER, RUN_SPEED_MIN REAL, RUN_SPEED_MAX REAL, WELD_SPEED_MIN REAL, WELD_SPEED_MAX REAL, TOC TEXT, TOM TEXT, MOP TEXT ) """ def init_db(): """Create tables if they don't exist.""" conn = get_sqlite() try: conn.execute(PDI_DDL) conn.execute(TRACKMAP_DDL) conn.commit() logger.info("SQLite schema ready: %s", DB_PATH) finally: conn.close() # ───────────────────────────────────────────────────────────── # Full sync from Oracle → SQLite # ───────────────────────────────────────────────────────────── def _oracle_rows_to_dicts(cursor) -> List[Dict[str, Any]]: columns = [col[0].upper() for col in cursor.description] rows = [] for raw in cursor.fetchall(): row = {} for col, val in zip(columns, raw): if hasattr(val, 'isoformat'): val = val.isoformat() row[col] = val rows.append(row) return rows def sync_pdi_from_oracle() -> int: """Pull all PDI_PLTM rows from Oracle and UPSERT into SQLite.""" from database import get_connection oc = get_connection() oc_cur = oc.cursor() try: oc_cur.execute("SELECT * FROM PDI_PLTM") rows = _oracle_rows_to_dicts(oc_cur) finally: oc_cur.close() oc.close() if not rows: return 0 sc = get_sqlite() try: cols = list(rows[0].keys()) placeholders = ", ".join([f":{c}" for c in cols]) col_list = ", ".join(cols) sql = ( f"INSERT OR REPLACE INTO PDI_PLTM ({col_list}) " f"VALUES ({placeholders})" ) sc.executemany(sql, rows) sc.commit() logger.info("Synced %d PDI_PLTM rows to SQLite", len(rows)) return len(rows) finally: sc.close() def sync_trackmap_from_oracle() -> int: """Pull all CMPT_PL_TRACKMAP rows from Oracle and UPSERT into SQLite.""" from database import get_connection oc = get_connection() oc_cur = oc.cursor() try: oc_cur.execute( "SELECT POSITION, COILID, BEF_ES, ES, ENT_LOO, PL, INT_LOO, " "ST, EXI_LOO, RUN_SPEED_MIN, RUN_SPEED_MAX, " "WELD_SPEED_MIN, WELD_SPEED_MAX, TOC, TOM, MOP " "FROM PLTM.CMPT_PL_TRACKMAP ORDER BY POSITION" ) rows = _oracle_rows_to_dicts(oc_cur) finally: oc_cur.close() oc.close() if not rows: return 0 sc = get_sqlite() try: cols = list(rows[0].keys()) placeholders = ", ".join([f":{c}" for c in cols]) col_list = ", ".join(cols) sql = ( f"INSERT OR REPLACE INTO CMPT_PL_TRACKMAP ({col_list}) " f"VALUES ({placeholders})" ) sc.executemany(sql, rows) sc.commit() logger.info("Synced %d CMPT_PL_TRACKMAP rows to SQLite", len(rows)) return len(rows) finally: sc.close() def sync_all_from_oracle() -> Dict[str, int]: pdi = sync_pdi_from_oracle() tm = sync_trackmap_from_oracle() return {"pdi_pltm": pdi, "cmpt_pl_trackmap": tm} # ───────────────────────────────────────────────────────────── # Incremental write-through helpers (called after Oracle commits) # ───────────────────────────────────────────────────────────── def sqlite_upsert_pdi(row: Dict[str, Any]): """Insert or replace one PDI_PLTM row in SQLite.""" sc = get_sqlite() try: upper = {k.upper(): v for k, v in row.items()} cols = list(upper.keys()) placeholders = ", ".join([f":{c}" for c in cols]) col_list = ", ".join(cols) sc.execute( f"INSERT OR REPLACE INTO PDI_PLTM ({col_list}) VALUES ({placeholders})", upper ) sc.commit() finally: sc.close() def sqlite_delete_pdi(coilid: str): sc = get_sqlite() try: sc.execute("DELETE FROM PDI_PLTM WHERE COILID = ?", (coilid,)) sc.commit() finally: sc.close() def sqlite_upsert_trackmap(row: Dict[str, Any]): sc = get_sqlite() try: upper = {k.upper(): v for k, v in row.items()} cols = list(upper.keys()) placeholders = ", ".join([f":{c}" for c in cols]) col_list = ", ".join(cols) sc.execute( f"INSERT OR REPLACE INTO CMPT_PL_TRACKMAP ({col_list}) VALUES ({placeholders})", upper ) sc.commit() finally: sc.close()