# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """SQLite 数据库自动备份:打包 futures.db 与 uploads,可在其他服务器恢复。""" from __future__ import annotations import json import logging import os import re import shutil import sqlite3 import tarfile import tempfile import threading import time from datetime import datetime from pathlib import Path from typing import Callable, Optional from zoneinfo import ZoneInfo from db_conn import DB_PATH logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") BACKUP_FILENAME_RE = re.compile(r"^qihuo_backup_\d{8}_\d{6}\.tar\.gz$") BACKUP_LAST_KEY = "backup_last_at" BACKUP_KEEP_KEY = "backup_keep_count" BACKUP_AUTO_KEY = "backup_auto_enabled" BACKUP_HOUR_KEY = "backup_auto_hour" DEFAULT_KEEP_COUNT = 30 DEFAULT_AUTO_HOUR = 3 CHECK_INTERVAL_SEC = 3600 _backup_lock = threading.Lock() RESTORE_MD = """# qihuo 备份恢复说明 本压缩包由 qihuo 系统自动生成,可在另一台 Linux 服务器上恢复交易数据。 ## 包内文件 | 文件/目录 | 说明 | |-----------|------| | `futures.db` | SQLite 主库(账号、交易记录、设置等) | | `uploads/` | 复盘截图与 K 线图(若备份时存在) | | `manifest.json` | 备份元数据 | | `restore.sh` | 一键恢复脚本 | ## 快速恢复(推荐) 1. 将本压缩包上传到目标服务器(例如 `/root/`) 2. 解压并执行恢复脚本: ```bash cd /root tar -xzf qihuo_backup_YYYYMMDD_HHMMSS.tar.gz cd qihuo_backup_YYYYMMDD_HHMMSS chmod +x restore.sh ./restore.sh ``` 默认恢复到 **`/root/qihuo`**。指定目录: ```bash RESTORE_DIR=/opt/qihuo ./restore.sh ``` 3. 在新服务器部署 qihuo 代码与 Python 环境(见 `docs/DEPLOY.md`) 4. 若恢复到 `/opt/qihuo`,将生成的 `futures.db`、`uploads/` 放入该目录 5. 配置 `.env`(CTP 账号、SECRET_KEY 等),**不要**直接复制旧 `.env` 到公网环境 6. 重启服务:`pm2 restart qihuo` ## 手工恢复 ```bash mkdir -p /root/qihuo/uploads cp futures.db /root/qihuo/futures.db cp -a uploads/. /root/qihuo/uploads/ # 若有 uploads 目录 ``` ## 注意 - 恢复前请停止 qihuo 进程,避免覆盖正在使用的数据库 - 恢复后首次启动会自动执行数据库迁移,一般无需手工改表 - `.env` 含敏感信息,请单独安全传输,不要放入公开网盘 """ def _app_root() -> Path: return Path(os.path.dirname(os.path.abspath(__file__))) def default_backup_dir() -> str: env = (os.getenv("QIHUO_BACKUP_DIR") or "").strip() if env: return env if os.name == "nt": return str(_app_root() / "qihuo_backup") return "/root/qihuo_backup" def default_restore_dir() -> str: env = (os.getenv("QIHUO_RESTORE_DIR") or "").strip() if env: return env if os.name == "nt": return str(_app_root()) return "/root/qihuo" def backup_dir() -> Path: path = Path(default_backup_dir()) path.mkdir(parents=True, exist_ok=True) return path def backup_in_progress() -> bool: return _backup_lock.locked() def get_backup_last_at(get_setting: Callable[[str, str], str]) -> str: return (get_setting(BACKUP_LAST_KEY, "") or "").strip() def _backup_sqlite(src_path: str, dst_path: str) -> None: src = sqlite3.connect(src_path, timeout=30) try: try: src.execute("PRAGMA wal_checkpoint(TRUNCATE)") except sqlite3.OperationalError: pass dst = sqlite3.connect(dst_path) try: src.backup(dst) dst.commit() finally: dst.close() finally: src.close() def _write_restore_script(dest: Path, folder_name: str) -> None: script = f"""#!/bin/bash set -euo pipefail RESTORE_DIR="${{RESTORE_DIR:-{default_restore_dir()}}}" SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" mkdir -p "$RESTORE_DIR/uploads" if [ -f "$SCRIPT_DIR/futures.db" ]; then cp -f "$SCRIPT_DIR/futures.db" "$RESTORE_DIR/futures.db" echo "已复制 futures.db -> $RESTORE_DIR/futures.db" fi if [ -d "$SCRIPT_DIR/uploads" ]; then cp -a "$SCRIPT_DIR/uploads/." "$RESTORE_DIR/uploads/" echo "已复制 uploads -> $RESTORE_DIR/uploads/" fi echo "" echo "恢复完成。目标目录: $RESTORE_DIR" echo "下一步: 部署 qihuo 代码、配置 .env、pm2 restart qihuo" echo "详见 RESTORE.md 与 docs/BACKUP.md" """ dest.write_text(script, encoding="utf-8") def create_backup(*, include_uploads: bool = True) -> tuple[str, str]: """创建 tar.gz 备份,返回 (文件名, 说明)。""" if not os.path.isfile(DB_PATH): raise FileNotFoundError(f"数据库不存在: {DB_PATH}") with _backup_lock: stamp = datetime.now(TZ).strftime("%Y%m%d_%H%M%S") folder_name = f"qihuo_backup_{stamp}" filename = f"{folder_name}.tar.gz" out_path = backup_dir() / filename app_root = _app_root() upload_src = app_root / "uploads" with tempfile.TemporaryDirectory(prefix="qihuo_bak_") as tmp: work = Path(tmp) / folder_name work.mkdir() _backup_sqlite(DB_PATH, str(work / "futures.db")) if include_uploads and upload_src.is_dir(): shutil.copytree(upload_src, work / "uploads", dirs_exist_ok=True) manifest = { "app": "qihuo", "created_at": datetime.now(TZ).isoformat(timespec="seconds"), "db_path": DB_PATH, "includes_uploads": include_uploads and upload_src.is_dir(), "default_restore_dir": default_restore_dir(), "files": sorted(p.name for p in work.iterdir()), } (work / "manifest.json").write_text( json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8", ) (work / "RESTORE.md").write_text(RESTORE_MD, encoding="utf-8") _write_restore_script(work / "restore.sh", folder_name) with tarfile.open(out_path, "w:gz") as tar: tar.add(work, arcname=folder_name) size_mb = out_path.stat().st_size / (1024 * 1024) return filename, f"备份已生成 {filename}({size_mb:.2f} MB)" def list_backups() -> list[dict]: items: list[dict] = [] for path in sorted(backup_dir().glob("qihuo_backup_*.tar.gz"), reverse=True): if not BACKUP_FILENAME_RE.match(path.name): continue stat = path.stat() items.append( { "name": path.name, "size": stat.st_size, "size_mb": round(stat.st_size / (1024 * 1024), 2), "mtime": datetime.fromtimestamp(stat.st_mtime, TZ).isoformat(timespec="seconds"), } ) return items def resolve_backup_file(filename: str) -> Path: name = (filename or "").strip() if not BACKUP_FILENAME_RE.match(name): raise ValueError("无效的备份文件名") path = (backup_dir() / name).resolve() root = backup_dir().resolve() if not str(path).startswith(str(root) + os.sep) and path != root: raise ValueError("无效的备份路径") if not path.is_file(): raise FileNotFoundError("备份文件不存在") return path def prune_old_backups(keep: int) -> int: keep_n = max(1, int(keep or DEFAULT_KEEP_COUNT)) files = list_backups() removed = 0 for item in files[keep_n:]: try: resolve_backup_file(item["name"]).unlink() removed += 1 except Exception as exc: logger.warning("prune backup %s: %s", item["name"], exc) return removed def run_backup_job( *, get_setting: Callable[[str, str], str], set_setting: Callable[[str, str], None], include_uploads: bool = True, ) -> tuple[str, str]: keep = DEFAULT_KEEP_COUNT try: keep = max(5, min(200, int(get_setting(BACKUP_KEEP_KEY, str(DEFAULT_KEEP_COUNT)) or DEFAULT_KEEP_COUNT))) except ValueError: pass filename, msg = create_backup(include_uploads=include_uploads) set_setting(BACKUP_LAST_KEY, datetime.now(TZ).isoformat(timespec="seconds")) removed = prune_old_backups(keep) if removed: msg = f"{msg},已清理 {removed} 个旧备份" return filename, msg def schedule_backup( *, get_setting: Callable[[str, str], str], set_setting: Callable[[str, str], None], include_uploads: bool = True, ) -> tuple[bool, str]: if _backup_lock.locked(): return False, "备份进行中,请稍后再试" def _run() -> None: try: run_backup_job( get_setting=get_setting, set_setting=set_setting, include_uploads=include_uploads, ) except Exception as exc: logger.exception("backup failed: %s", exc) threading.Thread(target=_run, daemon=True, name="qihuo-backup-run").start() return True, "已在后台开始备份,请稍后刷新本页查看" def _should_auto_backup(get_setting: Callable[[str, str], str]) -> bool: if (get_setting(BACKUP_AUTO_KEY, "1") or "1").strip() not in ("1", "true", "yes"): return False try: hour = int(get_setting(BACKUP_HOUR_KEY, str(DEFAULT_AUTO_HOUR)) or DEFAULT_AUTO_HOUR) except ValueError: hour = DEFAULT_AUTO_HOUR hour = max(0, min(23, hour)) now = datetime.now(TZ) if now.hour != hour: return False last = get_backup_last_at(get_setting) if last and last[:10] == now.date().isoformat(): return False return True def start_backup_worker( *, get_setting_fn: Callable[[str, str], str], set_setting_fn: Callable[[str, str], None], interval: int = CHECK_INTERVAL_SEC, ) -> None: """后台线程:按设定小时每日自动备份。""" def _loop() -> None: time.sleep(30) while True: try: if _should_auto_backup(get_setting_fn): filename, msg = run_backup_job( get_setting=get_setting_fn, set_setting=set_setting_fn, include_uploads=True, ) logger.info("auto backup: %s — %s", filename, msg) except Exception as exc: logger.warning("backup worker: %s", exc) time.sleep(max(300, interval)) threading.Thread(target=_loop, daemon=True, name="qihuo-backup-worker").start()