from __future__ import annotations

import json
import logging
import os
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from threading import Lock
from typing import Dict, List, Optional

import redis
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from minio import Minio
from pydantic import BaseModel, Field
from sqlalchemy import Boolean, Column, DateTime, Integer, String, Text, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker

# Logging configuration
logging.basicConfig(level=logging.INFO,
                    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")
logger = logging.getLogger("DataKeep")

DATA_DIR = Path(os.environ.get("DATAKEEP_DATA_DIR", "./datakeep_data")).resolve()
DATA_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = DATA_DIR / "datakeep.db"
RUN_DIR = DATA_DIR / "runs"
RUN_DIR.mkdir(parents=True, exist_ok=True)

# SQLAlchemy setup
engine = create_engine(f"sqlite:///{DB_PATH}", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


# --- Database models ---
class Instance(Base):
    __tablename__ = "instances"

    id = Column(String, primary_key=True, index=True)
    name = Column(String)
    enabled = Column(Boolean, default=True)
    sync_hour = Column(Integer, default=2)
    # JSON string holding the connection configuration
    config_json = Column(Text)


class RunRecord(Base):
    __tablename__ = "run_records"

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    instance_id = Column(String, index=True)
    started_at = Column(DateTime, default=datetime.utcnow)
    finished_at = Column(DateTime, nullable=True)
    ok = Column(Boolean, default=False)
    steps_json = Column(Text)


Base.metadata.create_all(bind=engine)


# --- Pydantic models ---
class MySQLConn(BaseModel):
    host: str
    port: int = 3306
    user: str
    password: str
    database: str


class RedisConn(BaseModel):
    host: str
    port: int = 6379
    password: Optional[str] = None


class MinIOConn(BaseModel):
    endpoint: str
    access_key: str
    secret_key: str
    secure: bool = False


class InstanceSchema(BaseModel):
    id: str
    name: str
    enabled: bool = True
    sync_hour: int = Field(2, ge=0, le=23)
    prod_mysql: Optional[MySQLConn] = None
    dr_mysql: Optional[MySQLConn] = None
    prod_redis: Optional[RedisConn] = None
    dr_redis: Optional[RedisConn] = None
    prod_minio: Optional[MinIOConn] = None
    dr_minio: Optional[MinIOConn] = None
    minio_buckets: List[str] = []

    class Config:
        from_attributes = True


# --- Dependencies ---
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# --- Sync logic (unchanged, adapted to the new models) ---
def mysql_sync(prod: MySQLConn, dr: MySQLConn):
    with tempfile.TemporaryDirectory() as td:
        dump_path = Path(td) / "dump.sql"
        # --add-drop-database ensures the target side drops and recreates the
        # database, so schema-level changes propagate as well.
        dump_args = [
            "mysqldump",
            f"-h{prod.host}", f"-P{prod.port}", f"-u{prod.user}",
            "--databases", prod.database,
            "--add-drop-database", "--single-transaction",
            "--set-gtid-purged=OFF", "--column-statistics=0",
            "--routines", "--triggers",
        ]
        # Pass passwords via MYSQL_PWD so they never appear in `ps` output.
        env = {**os.environ, "MYSQL_PWD": prod.password}
        with open(dump_path, "w", encoding="utf-8") as f:
            p1 = subprocess.run(dump_args, stdout=f, stderr=subprocess.PIPE, text=True, env=env)
        if p1.returncode != 0:
            raise RuntimeError(f"MySQL Dump Error: {p1.stderr}")

        load_args = ["mysql", f"-h{dr.host}", f"-P{dr.port}", f"-u{dr.user}"]
        env_dr = {**os.environ, "MYSQL_PWD": dr.password}
        with open(dump_path, "r", encoding="utf-8") as f:
            p2 = subprocess.run(load_args, stdin=f, stderr=subprocess.PIPE, text=True, env=env_dr)
        if p2.returncode != 0:
            raise RuntimeError(f"MySQL Load Error: {p2.stderr}")
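
# For reference, the dump/load above is the programmatic equivalent of this
# shell pipeline (host, user, and database names below are placeholders):
#
#   MYSQL_PWD=... mysqldump -hprod-host -uroot --databases app \
#       --add-drop-database --single-transaction --set-gtid-purged=OFF \
#       --column-statistics=0 --routines --triggers > dump.sql
#   MYSQL_PWD=... mysql -hdr-host -uroot < dump.sql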
{**os.environ, "MYSQL_PWD": dr.password} with open(dump_path, "r", encoding="utf-8") as f: p2 = subprocess.run(load_args, stdin=f, stderr=subprocess.PIPE, text=True, env=env_dr) if p2.returncode != 0: raise RuntimeError(f"MySQL Load Error: {p2.stderr}") def redis_sync(prod: RedisConn, dr: RedisConn): r_prod = redis.Redis(host=prod.host, port=prod.port, password=prod.password, decode_responses=False) r_dr = redis.Redis(host=dr.host, port=dr.port, password=dr.password, decode_responses=False) r_dr.flushall() cursor = 0 count = 0 while True: cursor, keys = r_prod.scan(cursor=cursor, count=1000) for key in keys: ttl = r_prod.ttl(key) if ttl == -2: continue px_ttl = ttl * 1000 if ttl > 0 else 0 val = r_prod.dump(key) if val: r_dr.restore(key, px_ttl, val, replace=True) count += 1 if cursor == 0: break return count def minio_sync(prod: MinIOConn, dr: MinIOConn, buckets: List[str]): client_p = Minio(prod.endpoint, access_key=prod.access_key, secret_key=prod.secret_key, secure=prod.secure) client_d = Minio(dr.endpoint, access_key=dr.access_key, secret_key=dr.secret_key, secure=dr.secure) synced = 0 for b in buckets: if not client_d.bucket_exists(b): client_d.make_bucket(b) for obj in client_p.list_objects(b, recursive=True): try: stat_d = client_d.stat_object(b, obj.object_name) if stat_d.size == obj.size and stat_d.etag == obj.etag: continue except: pass res = client_p.get_object(b, obj.object_name) try: client_d.put_object(b, obj.object_name, res, obj.size, content_type=obj.content_type or "application/octet-stream") synced += 1 finally: res.close() res.release_conn() return synced # --- 任务调度 --- scheduler = BackgroundScheduler(timezone="Asia/Shanghai") instance_locks: Dict[str, Lock] = {} def run_instance_task(instance_id: str): if instance_id not in instance_locks: instance_locks[instance_id] = Lock() if not instance_locks[instance_id].acquire(blocking=False): return db = SessionLocal() record = RunRecord(instance_id=instance_id, started_at=datetime.utcnow(), steps_json="[]") db.add(record) db.commit() steps = [] ok = True try: inst_db = db.query(Instance).filter(Instance.id == instance_id).first() if not inst_db: return cfg = InstanceSchema.model_validate(json.loads(inst_db.config_json)) if cfg.prod_mysql and cfg.dr_mysql: try: mysql_sync(cfg.prod_mysql, cfg.dr_mysql) steps.append({"name": "MySQL", "ok": True}) except Exception as e: steps.append({"name": "MySQL", "ok": False, "error": str(e)}) ok = False if ok and cfg.prod_redis and cfg.dr_redis: try: keys = redis_sync(cfg.prod_redis, cfg.dr_redis) steps.append({"name": "Redis", "ok": True, "count": keys}) except Exception as e: steps.append({"name": "Redis", "ok": False, "error": str(e)}) ok = False if ok and cfg.prod_minio and cfg.dr_minio and cfg.minio_buckets: try: files = minio_sync(cfg.prod_minio, cfg.dr_minio, cfg.minio_buckets) steps.append({"name": "MinIO", "ok": True, "count": files}) except Exception as e: steps.append({"name": "MinIO", "ok": False, "error": str(e)}) ok = False except Exception as e: ok = False steps.append({"name": "System", "ok": False, "error": str(e)}) finally: record.finished_at = datetime.utcnow() record.ok = ok record.steps_json = json.dumps(steps) db.commit() db.close() instance_locks[instance_id].release() def reschedule_jobs(): scheduler.remove_all_jobs() db = SessionLocal() instances = db.query(Instance).filter(Instance.enabled == True).all() for inst in instances: scheduler.add_job(run_instance_task, CronTrigger(hour=inst.sync_hour, minute=0), args=[inst.id], id=inst.id) db.close() # --- API --- app = 
FastAPI(title="DataKeep") app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"]) @app.on_event("startup") def startup(): reschedule_jobs() scheduler.start() @app.get("/api/instances", response_model=List[InstanceSchema]) def list_instances(db: Session = Depends(get_db)): insts = db.query(Instance).all() return [InstanceSchema.model_validate(json.loads(i.config_json)) for i in insts] @app.post("/api/instances") def create_instance(inst: InstanceSchema, db: Session = Depends(get_db)): db_inst = Instance(id=inst.id, name=inst.name, enabled=inst.enabled, sync_hour=inst.sync_hour, config_json=inst.model_dump_json()) db.add(db_inst) db.commit() reschedule_jobs() return {"status": "ok"} @app.put("/api/instances/{id}") def update_instance(id: str, inst: InstanceSchema, db: Session = Depends(get_db)): db_inst = db.query(Instance).filter(Instance.id == id).first() if not db_inst: raise HTTPException(404) db_inst.name = inst.name db_inst.enabled = inst.enabled db_inst.sync_hour = inst.sync_hour db_inst.config_json = inst.model_dump_json() db.commit() reschedule_jobs() return {"status": "ok"} @app.delete("/api/instances/{id}") def delete_instance(id: str, db: Session = Depends(get_db)): db.query(Instance).filter(Instance.id == id).delete() db.commit() reschedule_jobs() return {"status": "ok"} @app.get("/api/instances/{id}/runs") def get_runs(id: str, db: Session = Depends(get_db)): runs = db.query(RunRecord).filter(RunRecord.instance_id == id).order_by(RunRecord.started_at.desc()).limit(20).all() return [{"id": r.id, "started_at": r.started_at, "finished_at": r.finished_at, "ok": r.ok, "steps": json.loads(r.steps_json)} for r in runs] @app.post("/api/instances/{id}/run") def trigger_run(id: str, bg: BackgroundTasks): bg.add_task(run_instance_task, id) return {"status": "triggered"} @app.get("/api/health") def health(): return {"status": "ok"}