# datakeep/backend/app.py — DataKeep backend service.
# (File-listing metadata from the original source: 293 lines, 10 KiB, Python,
#  last modified 2026-02-09 18:07:46 +08:00.)
"""DataKeep backend: FastAPI service that replicates MySQL, Redis and MinIO
data from a production site to a disaster-recovery (DR) site on a schedule."""
from __future__ import annotations

import json
import logging
import os
import shutil
import subprocess
import tempfile
from datetime import datetime
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Optional

import redis
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from fastapi import BackgroundTasks, Depends, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from minio import Minio
from pydantic import BaseModel, Field
# Fix: the original also imported the nonexistent name `create_all_models`
# from sqlalchemy, which raises ImportError before the app can even start.
from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, Text, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import Session, sessionmaker

# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger("DataKeep")

# All persistent state lives under DATAKEEP_DATA_DIR (default ./datakeep_data).
DATA_DIR = Path(os.environ.get("DATAKEEP_DATA_DIR", "./datakeep_data")).resolve()
DATA_DIR.mkdir(parents=True, exist_ok=True)
DB_PATH = DATA_DIR / "datakeep.db"
RUN_DIR = DATA_DIR / "runs"
RUN_DIR.mkdir(parents=True, exist_ok=True)

# SQLAlchemy setup; check_same_thread=False because sessions are used from
# scheduler threads as well as request handlers.
engine = create_engine(f"sqlite:///{DB_PATH}", connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# --- Database models ---
class Instance(Base):
    """A replication instance: one prod→DR pairing plus its daily schedule."""
    __tablename__ = "instances"
    id = Column(String, primary_key=True, index=True)
    name = Column(String)
    enabled = Column(Boolean, default=True)  # disabled instances are not scheduled
    sync_hour = Column(Integer, default=2)   # daily run hour (0-23, scheduler timezone)
    # JSON string holding the full connection configuration (see InstanceSchema).
    config_json = Column(Text)
class RunRecord(Base):
    """One execution of a sync run for an instance."""
    __tablename__ = "run_records"
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    instance_id = Column(String, index=True)
    started_at = Column(DateTime, default=datetime.utcnow)
    finished_at = Column(DateTime, nullable=True)  # None while the run is still in progress
    ok = Column(Boolean, default=False)
    # JSON-encoded list of per-step results: {"name", "ok", optional "count"/"error"}.
    steps_json = Column(Text)


Base.metadata.create_all(bind=engine)
# --- Pydantic models ---
class MySQLConn(BaseModel):
    """Connection settings for one MySQL server."""
    host: str
    port: int = 3306
    user: str
    password: str
    database: str  # the single database replicated by mysql_sync
class RedisConn(BaseModel):
    """Connection settings for one Redis server."""
    host: str
    port: int = 6379
    password: Optional[str] = None  # None = no AUTH required
class MinIOConn(BaseModel):
    """Connection settings for one MinIO/S3 endpoint."""
    endpoint: str  # e.g. "host:9000" — the Minio client expects no scheme
    access_key: str
    secret_key: str
    secure: bool = False  # True = HTTPS
class InstanceSchema(BaseModel):
    """API schema for an instance, also persisted as Instance.config_json.

    Any prod/dr pair left as None disables that sync step; the MinIO step
    additionally requires a non-empty bucket list.
    """
    id: str
    name: str
    enabled: bool = True
    sync_hour: int = Field(2, ge=0, le=23)  # daily run hour in scheduler timezone
    prod_mysql: Optional[MySQLConn] = None
    dr_mysql: Optional[MySQLConn] = None
    prod_redis: Optional[RedisConn] = None
    dr_redis: Optional[RedisConn] = None
    prod_minio: Optional[MinIOConn] = None
    dr_minio: Optional[MinIOConn] = None
    minio_buckets: List[str] = Field(default_factory=list)
    # Pydantic v2 config style; the file already uses the v2 API
    # (model_validate / model_dump_json), and `class Config` is deprecated there.
    model_config = {"from_attributes": True}
# --- Dependencies ---
def get_db():
    """FastAPI dependency: yield a DB session, always closing it afterwards."""
    session = SessionLocal()
    try:
        yield session
    finally:
        session.close()
# --- Sync logic (unchanged, adapted to the new models) ---
def mysql_sync(prod: MySQLConn, dr: MySQLConn):
    """Replicate prod.database onto the DR MySQL via mysqldump + mysql load.

    Raises RuntimeError with the tool's stderr if either step fails.
    """
    with tempfile.TemporaryDirectory() as workdir:
        sql_file = Path(workdir) / "dump.sql"
        # --add-drop-database so the target database is dropped and rebuilt.
        dump_cmd = [
            "mysqldump",
            f"-h{prod.host}",
            f"-P{prod.port}",
            f"-u{prod.user}",
            "--databases",
            prod.database,
            "--add-drop-database",
            "--single-transaction",
            "--set-gtid-purged=OFF",
            "--column-statistics=0",
            "--routines",
            "--triggers",
        ]
        # Password goes through the environment, never the command line.
        dump_env = dict(os.environ, MYSQL_PWD=prod.password)
        with open(sql_file, "w", encoding="utf-8") as out:
            dump = subprocess.run(dump_cmd, stdout=out, stderr=subprocess.PIPE, text=True, env=dump_env)
        if dump.returncode != 0:
            raise RuntimeError(f"MySQL Dump Error: {dump.stderr}")
        load_cmd = ["mysql", f"-h{dr.host}", f"-P{dr.port}", f"-u{dr.user}"]
        load_env = dict(os.environ, MYSQL_PWD=dr.password)
        with open(sql_file, "r", encoding="utf-8") as src:
            load = subprocess.run(load_cmd, stdin=src, stderr=subprocess.PIPE, text=True, env=load_env)
        if load.returncode != 0:
            raise RuntimeError(f"MySQL Load Error: {load.stderr}")
def redis_sync(prod: RedisConn, dr: RedisConn):
    """Mirror all keys from the prod Redis to the DR Redis.

    DR is flushed first, then keys are copied via DUMP/RESTORE, preserving
    any remaining TTLs. Returns the number of keys copied.
    """
    r_prod = redis.Redis(host=prod.host, port=prod.port, password=prod.password, decode_responses=False)
    r_dr = redis.Redis(host=dr.host, port=dr.port, password=dr.password, decode_responses=False)
    try:
        r_dr.flushall()  # NOTE: wipes every logical DB on the DR node
        cursor = 0
        count = 0
        while True:
            cursor, keys = r_prod.scan(cursor=cursor, count=1000)
            for key in keys:
                ttl = r_prod.ttl(key)
                if ttl == -2:
                    continue  # key expired between SCAN and TTL
                px_ttl = ttl * 1000 if ttl > 0 else 0  # 0 = restore without expiry
                val = r_prod.dump(key)
                if val:  # key may have vanished since the TTL check
                    r_dr.restore(key, px_ttl, val, replace=True)
                    count += 1
            if cursor == 0:
                break
        return count
    finally:
        # Fix: the original leaked both client connections.
        r_prod.close()
        r_dr.close()
def minio_sync(prod: MinIOConn, dr: MinIOConn, buckets: List[str]):
    """Copy the objects of each bucket in `buckets` from prod MinIO to DR MinIO.

    Objects whose size and etag already match on DR are skipped; missing
    buckets are created. Returns the number of objects transferred.
    """
    client_p = Minio(prod.endpoint, access_key=prod.access_key, secret_key=prod.secret_key, secure=prod.secure)
    client_d = Minio(dr.endpoint, access_key=dr.access_key, secret_key=dr.secret_key, secure=dr.secure)
    synced = 0
    for b in buckets:
        if not client_d.bucket_exists(b):
            client_d.make_bucket(b)
        for obj in client_p.list_objects(b, recursive=True):
            try:
                stat_d = client_d.stat_object(b, obj.object_name)
                if stat_d.size == obj.size and stat_d.etag == obj.etag:
                    continue  # already up to date on DR
            except Exception:
                # Fix: was a bare `except:` (which also swallowed
                # KeyboardInterrupt/SystemExit). stat_object raising here is
                # treated as "object absent on DR" — fall through and copy it.
                pass
            res = client_p.get_object(b, obj.object_name)
            try:
                client_d.put_object(b, obj.object_name, res, obj.size, content_type=obj.content_type or "application/octet-stream")
                synced += 1
            finally:
                res.close()
                res.release_conn()
    return synced
# --- Task scheduling ---
scheduler = BackgroundScheduler(timezone="Asia/Shanghai")
# One lock per instance id so the same instance never runs concurrently.
instance_locks: Dict[str, Lock] = {}
def run_instance_task(instance_id: str):
    """Execute one full sync run for `instance_id` and persist a RunRecord.

    Returns silently if a run for the same instance is already in progress.
    Steps run in order MySQL -> Redis -> MinIO; a failed step aborts the
    remaining steps and marks the run as failed.
    """
    # Fix: setdefault avoids the check-then-set race when two threads trigger
    # the same instance for the first time simultaneously.
    lock = instance_locks.setdefault(instance_id, Lock())
    if not lock.acquire(blocking=False):
        return  # another run for this instance is already active
    try:
        db = SessionLocal()
        record = RunRecord(instance_id=instance_id, started_at=datetime.utcnow(), steps_json="[]")
        db.add(record)
        db.commit()
        steps = []
        ok = True
        try:
            inst_db = db.query(Instance).filter(Instance.id == instance_id).first()
            if not inst_db:
                # Fix: the original returned with ok still True, recording a
                # successful run for a nonexistent instance.
                ok = False
                steps.append({"name": "System", "ok": False, "error": "instance not found"})
                return
            cfg = InstanceSchema.model_validate(json.loads(inst_db.config_json))
            if cfg.prod_mysql and cfg.dr_mysql:
                try:
                    mysql_sync(cfg.prod_mysql, cfg.dr_mysql)
                    steps.append({"name": "MySQL", "ok": True})
                except Exception as e:
                    steps.append({"name": "MySQL", "ok": False, "error": str(e)})
                    ok = False
            if ok and cfg.prod_redis and cfg.dr_redis:
                try:
                    keys = redis_sync(cfg.prod_redis, cfg.dr_redis)
                    steps.append({"name": "Redis", "ok": True, "count": keys})
                except Exception as e:
                    steps.append({"name": "Redis", "ok": False, "error": str(e)})
                    ok = False
            if ok and cfg.prod_minio and cfg.dr_minio and cfg.minio_buckets:
                try:
                    files = minio_sync(cfg.prod_minio, cfg.dr_minio, cfg.minio_buckets)
                    steps.append({"name": "MinIO", "ok": True, "count": files})
                except Exception as e:
                    steps.append({"name": "MinIO", "ok": False, "error": str(e)})
                    ok = False
        except Exception as e:
            logger.exception("Sync run for %s failed", instance_id)
            ok = False
            steps.append({"name": "System", "ok": False, "error": str(e)})
        finally:
            record.finished_at = datetime.utcnow()
            record.ok = ok
            record.steps_json = json.dumps(steps)
            db.commit()
            db.close()
    finally:
        # Fix: the lock is now released even if the final commit/close raises.
        lock.release()
def reschedule_jobs():
    """Drop all cron jobs and rebuild them from the enabled instances in the DB."""
    scheduler.remove_all_jobs()
    db = SessionLocal()
    try:
        instances = db.query(Instance).filter(Instance.enabled == True).all()  # noqa: E712
        for inst in instances:
            # One job per instance, firing daily at sync_hour:00.
            scheduler.add_job(run_instance_task, CronTrigger(hour=inst.sync_hour, minute=0), args=[inst.id], id=inst.id)
    finally:
        # Fix: the original leaked the session if the query or add_job raised.
        db.close()
# --- API ---
app = FastAPI(title="DataKeep")
# NOTE(review): wide-open CORS — fine for an internal tool, revisit if exposed.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
@app.on_event("startup")
def startup():
    """Load cron jobs from the database, then start the background scheduler."""
    reschedule_jobs()
    scheduler.start()
@app.get("/api/instances", response_model=List[InstanceSchema])
def list_instances(db: Session = Depends(get_db)):
    """Return every configured instance, decoded from its stored JSON config."""
    result = []
    for row in db.query(Instance).all():
        result.append(InstanceSchema.model_validate(json.loads(row.config_json)))
    return result
@app.post("/api/instances")
def create_instance(inst: InstanceSchema, db: Session = Depends(get_db)):
    """Create a new instance and register its cron job."""
    # Fix: a duplicate id previously hit the primary-key constraint and
    # surfaced as an unhandled 500; reject it explicitly with 409 instead.
    if db.query(Instance).filter(Instance.id == inst.id).first():
        raise HTTPException(409, detail="instance id already exists")
    db_inst = Instance(id=inst.id, name=inst.name, enabled=inst.enabled, sync_hour=inst.sync_hour, config_json=inst.model_dump_json())
    db.add(db_inst)
    db.commit()
    reschedule_jobs()
    return {"status": "ok"}
@app.put("/api/instances/{id}")
def update_instance(id: str, inst: InstanceSchema, db: Session = Depends(get_db)):
    """Replace an existing instance's settings and refresh the schedule."""
    target = db.query(Instance).filter(Instance.id == id).first()
    if not target:
        raise HTTPException(404)
    target.name = inst.name
    target.enabled = inst.enabled
    target.sync_hour = inst.sync_hour
    target.config_json = inst.model_dump_json()
    db.commit()
    reschedule_jobs()
    return {"status": "ok"}
@app.delete("/api/instances/{id}")
def delete_instance(id: str, db: Session = Depends(get_db)):
    """Remove an instance (idempotent) and rebuild the schedule."""
    matching = db.query(Instance).filter(Instance.id == id)
    matching.delete()
    db.commit()
    reschedule_jobs()
    return {"status": "ok"}
@app.get("/api/instances/{id}/runs")
def get_runs(id: str, db: Session = Depends(get_db)):
    """Return the 20 most recent run records for an instance, newest first."""
    query = (
        db.query(RunRecord)
        .filter(RunRecord.instance_id == id)
        .order_by(RunRecord.started_at.desc())
        .limit(20)
    )
    history = []
    for rec in query.all():
        history.append({
            "id": rec.id,
            "started_at": rec.started_at,
            "finished_at": rec.finished_at,
            "ok": rec.ok,
            "steps": json.loads(rec.steps_json),
        })
    return history
@app.post("/api/instances/{id}/run")
def trigger_run(id: str, bg: BackgroundTasks):
    """Kick off a sync run for `id` immediately, in the background.

    Returns before the run completes; run_instance_task itself skips if a
    run for this instance is already in progress.
    """
    bg.add_task(run_instance_task, id)
    return {"status": "triggered"}
@app.get("/api/health")
def health():
    """Liveness probe."""
    return {"status": "ok"}