"""中控备份与恢复:四所 SQLite、K 线库、env、hub JSON。""" from __future__ import annotations import json import os import re import shutil import subprocess import tempfile import zipfile from datetime import datetime, timedelta from pathlib import Path from typing import Any, Callable, Optional from zoneinfo import ZoneInfo from lib.paths import REPO_ROOT, hub_data_dir, manual_trading_hub_dir HUB_DIR = manual_trading_hub_dir() TZ_NAME = (os.getenv("HUB_BACKUP_TZ") or "Asia/Shanghai").strip() or "Asia/Shanghai" EXCHANGE_DIRS: list[tuple[str, str]] = [ ("binance", "crypto_monitor_binance"), ("okx", "crypto_monitor_okx"), ("gate", "crypto_monitor_gate"), ("gate_bot", "crypto_monitor_gate_bot"), ] HUB_JSON_FILES = ( "hub_settings.json", "hub_fund_history.json", "hub_ai_summaries.json", "hub_ai_chat.json", "hub_supervisor_state.json", ) HUB_DATA_FILES = ( "hub_kline.db", "hub_symbol_archive.db", "hub_entry_plans.db", "hub_macro_calendar.db", "hub_volume_rank.json", ) DEFAULT_BACKUP_SETTINGS = { "auto_enabled": True, "auto_hour": 0, "retention_days": 30, "include_env": True, "include_exchange_images": False, "backup_root": "", } BACKUP_STATE_PATH = HUB_DIR / "hub_backup_state.json" def normalize_backup_settings(raw: dict | None) -> dict: out = dict(DEFAULT_BACKUP_SETTINGS) if isinstance(raw, dict): for key in DEFAULT_BACKUP_SETTINGS: if key in raw: out[key] = raw[key] try: out["auto_hour"] = max(0, min(23, int(out.get("auto_hour", 0)))) except (TypeError, ValueError): out["auto_hour"] = 0 try: out["retention_days"] = max(1, min(365, int(out.get("retention_days", 30)))) except (TypeError, ValueError): out["retention_days"] = 30 out["auto_enabled"] = bool(out.get("auto_enabled")) out["include_env"] = bool(out.get("include_env", True)) out["include_exchange_images"] = bool(out.get("include_exchange_images")) out["backup_root"] = str(out.get("backup_root") or "").strip() return out def backup_root(settings: dict | None = None) -> Path: cfg = normalize_backup_settings((settings or {}).get("backup") if settings else None) raw = cfg.get("backup_root") or (os.getenv("HUB_BACKUP_ROOT") or "").strip() if not raw: raw = (os.getenv("BACKUP_ROOT") or "/root/backups").strip() root = Path(raw).expanduser() if not root.is_absolute(): root = REPO_ROOT / root portal = root / "crypto_monitor_portal" portal.mkdir(parents=True, exist_ok=True) return portal def _now_local() -> datetime: try: return datetime.now(ZoneInfo(TZ_NAME)) except Exception: return datetime.now() def _read_env_var(env_path: Path, key: str, default: str = "") -> str: if not env_path.is_file(): return default try: for line in env_path.read_text(encoding="utf-8", errors="ignore").splitlines(): raw = line.strip() if not raw or raw.startswith("#") or "=" not in raw: continue k, v = raw.split("=", 1) if k.strip() == key: return v.strip().strip('"').strip("'") except Exception: pass return default def _resolve_project_path(project_dir: Path, rel: str) -> Path: p = Path(rel or "") if p.is_absolute(): return p return project_dir / p def _load_backup_state() -> dict: if not BACKUP_STATE_PATH.is_file(): return {} try: data = json.loads(BACKUP_STATE_PATH.read_text(encoding="utf-8")) return data if isinstance(data, dict) else {} except Exception: return {} def _save_backup_state(state: dict) -> None: BACKUP_STATE_PATH.write_text( json.dumps(state, ensure_ascii=False, indent=2), encoding="utf-8", ) def _safe_archive_name(name: str) -> bool: return bool(re.fullmatch(r"backup_[0-9]{4}-[0-9]{2}-[0-9]{2}_[0-9]{6}\.zip", name or "")) def _collect_targets( *, include_env: bool, include_exchange_images: bool, ) -> list[tuple[str, Path, str]]: """Return list of (archive_rel_path, source_path, kind).""" items: list[tuple[str, Path, str]] = [] if include_env: hub_env = HUB_DIR / ".env" if hub_env.is_file(): items.append(("hub/.env", hub_env, "env")) for name in HUB_JSON_FILES: src = HUB_DIR / name if src.is_file(): items.append((f"hub/{name}", src, "json")) data_dir = hub_data_dir() for name in HUB_DATA_FILES: src = data_dir / name if src.is_file(): items.append((f"hub/data/{name}", src, "sqlite" if name.endswith(".db") else "json")) for key, dirname in EXCHANGE_DIRS: proj = REPO_ROOT / dirname prefix = dirname env_path = proj / ".env" db_rel = "crypto.db" upload_rel = "static/images" if env_path.is_file(): db_rel = _read_env_var(env_path, "DB_PATH", "crypto.db") or "crypto.db" upload_rel = _read_env_var(env_path, "UPLOAD_DIR", "static/images") or "static/images" if include_env: items.append((f"{prefix}/.env", env_path, "env")) db_path = _resolve_project_path(proj, db_rel) if db_path.is_file(): items.append((f"{prefix}/{db_rel}", db_path, "sqlite")) if include_exchange_images: img_dir = _resolve_project_path(proj, upload_rel) if img_dir.is_dir(): for fp in sorted(img_dir.rglob("*")): if fp.is_file(): rel = fp.relative_to(proj).as_posix() items.append((f"{prefix}/{rel}", fp, "image")) return items def _write_manifest(staging: Path, trigger: str, files: list[dict]) -> None: manifest = { "version": 1, "created_at": _now_local().strftime("%Y-%m-%d %H:%M:%S"), "timezone": TZ_NAME, "trigger": trigger, "repo_root": str(REPO_ROOT), "files": files, } (staging / "manifest.json").write_text( json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8", ) def run_backup( *, trigger: str = "manual", settings: dict | None = None, log_fn: Callable[[str], None] | None = None, ) -> dict[str, Any]: cfg = normalize_backup_settings((settings or {}).get("backup") if settings else None) root = backup_root(settings) ts = _now_local().strftime("%Y-%m-%d_%H%M%S") archive_name = f"backup_{ts}.zip" archive_path = root / archive_name def log(msg: str) -> None: if log_fn: log_fn(msg) targets = _collect_targets( include_env=cfg["include_env"], include_exchange_images=cfg["include_exchange_images"], ) if not targets: return {"ok": False, "error": "没有可备份的文件"} file_meta: list[dict] = [] with tempfile.TemporaryDirectory(prefix="hub_backup_") as tmp: staging = Path(tmp) for arc_rel, src, kind in targets: dest = staging / arc_rel dest.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(src, dest) file_meta.append( { "path": arc_rel.replace("\\", "/"), "size": src.stat().st_size, "kind": kind, } ) _write_manifest(staging, trigger, file_meta) with zipfile.ZipFile(archive_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: for fp in sorted(staging.rglob("*")): if fp.is_file(): zf.write(fp, fp.relative_to(staging).as_posix()) size = archive_path.stat().st_size prune_old_backups(root, cfg["retention_days"]) state = _load_backup_state() if trigger == "auto": state["last_auto_day"] = _now_local().strftime("%Y-%m-%d") state["last_auto_at"] = _now_local().strftime("%Y-%m-%d %H:%M:%S") state["last_backup_at"] = _now_local().strftime("%Y-%m-%d %H:%M:%S") state["last_backup_file"] = archive_name state["last_trigger"] = trigger _save_backup_state(state) log(f"backup written: {archive_path}") return { "ok": True, "file": archive_name, "path": str(archive_path), "size": size, "file_count": len(file_meta), "trigger": trigger, } def prune_old_backups(root: Path, retention_days: int) -> int: if not root.is_dir(): return 0 cutoff = _now_local() - timedelta(days=max(1, retention_days)) removed = 0 for fp in root.glob("backup_*.zip"): try: mtime = datetime.fromtimestamp(fp.stat().st_mtime, tz=cutoff.tzinfo) except Exception: continue if mtime < cutoff: fp.unlink(missing_ok=True) removed += 1 return removed def list_backups(settings: dict | None = None) -> list[dict[str, Any]]: root = backup_root(settings) rows: list[dict[str, Any]] = [] if not root.is_dir(): return rows for fp in sorted(root.glob("backup_*.zip"), reverse=True): try: st = fp.stat() except OSError: continue rows.append( { "name": fp.name, "size": st.st_size, "modified_at": datetime.fromtimestamp(st.st_mtime).strftime("%Y-%m-%d %H:%M:%S"), } ) return rows def backup_status(settings: dict | None = None) -> dict[str, Any]: cfg = normalize_backup_settings((settings or {}).get("backup") if settings else None) state = _load_backup_state() root = backup_root(settings) return { "ok": True, "settings": cfg, "backup_root": str(root), "state": state, "backups": list_backups(settings)[:50], "timezone": TZ_NAME, } def _pm2_restart_all() -> dict[str, Any]: if os.name != "posix": return {"ok": False, "skipped": True, "reason": "non-posix"} try: proc = subprocess.run( ["pm2", "restart", "all"], capture_output=True, text=True, timeout=120, ) return { "ok": proc.returncode == 0, "returncode": proc.returncode, "stdout": (proc.stdout or "")[-2000:], "stderr": (proc.stderr or "")[-2000:], } except Exception as e: return {"ok": False, "error": str(e)} def restore_backup_archive( archive_path: Path, *, settings: dict | None = None, pre_backup: bool = True, restart_pm2: bool = True, ) -> dict[str, Any]: if not archive_path.is_file(): return {"ok": False, "error": "备份文件不存在"} pre = None if pre_backup: pre = run_backup(trigger="pre_restore", settings=settings) restored: list[str] = [] skipped: list[str] = [] with tempfile.TemporaryDirectory(prefix="hub_restore_") as tmp: extract_dir = Path(tmp) with zipfile.ZipFile(archive_path, "r") as zf: zf.extractall(extract_dir) manifest_path = extract_dir / "manifest.json" if not manifest_path.is_file(): return {"ok": False, "error": "无效的备份包:缺少 manifest.json"} for fp in extract_dir.rglob("*"): if not fp.is_file() or fp.name == "manifest.json": continue rel = fp.relative_to(extract_dir).as_posix() parts = Path(rel).parts if parts[0] == "hub": if len(parts) >= 3 and parts[1] == "data": dest = hub_data_dir() / parts[-1] else: dest = HUB_DIR.joinpath(*parts[1:]) else: matched = False for _key, dirname in EXCHANGE_DIRS: if rel.startswith(dirname + "/"): dest = REPO_ROOT / rel matched = True break if not matched: skipped.append(rel) continue dest.parent.mkdir(parents=True, exist_ok=True) shutil.copy2(fp, dest) restored.append(rel) pm2 = _pm2_restart_all() if restart_pm2 else {"ok": False, "skipped": True} state = _load_backup_state() state["last_restore_at"] = _now_local().strftime("%Y-%m-%d %H:%M:%S") state["last_restore_from"] = archive_path.name _save_backup_state(state) return { "ok": True, "restored": restored, "skipped": skipped, "pre_backup": pre, "pm2": pm2, } def restore_backup_upload( content: bytes, filename: str, *, settings: dict | None = None, ) -> dict[str, Any]: if not content: return {"ok": False, "error": "空文件"} suffix = Path(filename or "").suffix.lower() if suffix != ".zip": return {"ok": False, "error": "仅支持 .zip 备份包"} with tempfile.NamedTemporaryFile(prefix="hub_restore_upload_", suffix=".zip", delete=False) as tf: tf.write(content) temp_path = Path(tf.name) try: return restore_backup_archive(temp_path, settings=settings) finally: temp_path.unlink(missing_ok=True) def resolve_backup_download(settings: dict | None, name: str) -> Optional[Path]: if not _safe_archive_name(name): return None fp = backup_root(settings) / name if fp.is_file(): return fp return None def should_run_auto_backup(settings: dict) -> bool: cfg = normalize_backup_settings(settings.get("backup")) if not cfg.get("auto_enabled"): return False now = _now_local() today = now.strftime("%Y-%m-%d") state = _load_backup_state() if state.get("last_auto_day") == today: return False if now.hour < int(cfg.get("auto_hour", 0)): return False return True def mark_auto_backup_done() -> None: state = _load_backup_state() state["last_auto_day"] = _now_local().strftime("%Y-%m-%d") state["last_auto_at"] = _now_local().strftime("%Y-%m-%d %H:%M:%S") _save_backup_state(state)