diff --git a/app.py b/app.py
index f73cb7e..22e51dd 100644
--- a/app.py
+++ b/app.py
@@ -92,6 +92,7 @@ app = Flask(
static_folder=os.path.join(ROOT, "modules", "web", "static"),
)
app.secret_key = os.getenv("SECRET_KEY", "futures_monitor_default_secret")
+app.config["MAX_CONTENT_LENGTH"] = int(os.getenv("MAX_BACKUP_UPLOAD_MB", "500")) * 1024 * 1024
HOST = os.getenv("HOST", "0.0.0.0")
PORT = int(os.getenv("PORT", "6600"))
diff --git a/modules/backup/db_backup.py b/modules/backup/db_backup.py
index f0d0f7b..f15373a 100644
--- a/modules/backup/db_backup.py
+++ b/modules/backup/db_backup.py
@@ -13,13 +13,14 @@ import re
import shutil
import sqlite3
import subprocess
+import sys
import tarfile
import tempfile
import threading
import time
from datetime import datetime
from pathlib import Path
-from typing import Callable, Optional
+from typing import Any, Callable, IO, Optional
from zoneinfo import ZoneInfo
from modules.core.db_conn import DB_PATH, db_backend
@@ -36,6 +37,8 @@ DEFAULT_KEEP_COUNT = 30
DEFAULT_AUTO_HOUR = 3
CHECK_INTERVAL_SEC = 3600
_backup_lock = threading.Lock()
+RESTORE_STATUS_FILE = "restore_status.json"
+RESTORE_CONFIRM_TOKEN = "RESTORE"
RESTORE_MD = """# qihuo 备份恢复说明
@@ -130,6 +133,14 @@ def default_restore_dir() -> str:
return "/root/qihuo"
+def restore_target_dir() -> Path:
+ """Web/API 恢复目标目录,默认当前应用根目录。"""
+ env = (os.getenv("QIHUO_RESTORE_DIR") or "").strip()
+ if env:
+ return Path(env)
+ return _app_root()
+
+
def backup_dir() -> Path:
path = Path(default_backup_dir())
path.mkdir(parents=True, exist_ok=True)
@@ -140,6 +151,402 @@ def backup_in_progress() -> bool:
return _backup_lock.locked()
+def _restore_status_path() -> Path:
+ from modules.core.paths import DATA_DIR
+
+ DATA_DIR.mkdir(parents=True, exist_ok=True)
+ return DATA_DIR / RESTORE_STATUS_FILE
+
+
+def _write_restore_status(state: str, message: str = "", **extra: Any) -> None:
+ payload = {
+ "state": state,
+ "message": message,
+ "updated_at": datetime.now(TZ).isoformat(timespec="seconds"),
+ }
+ payload.update(extra)
+ _restore_status_path().write_text(
+ json.dumps(payload, ensure_ascii=False, indent=2),
+ encoding="utf-8",
+ )
+
+
+def get_restore_status() -> dict:
+ path = _restore_status_path()
+ if not path.is_file():
+ return {"state": "idle", "message": "", "updated_at": ""}
+ try:
+ data = json.loads(path.read_text(encoding="utf-8"))
+ if isinstance(data, dict):
+ data.setdefault("state", "idle")
+ data.setdefault("message", "")
+ return data
+ except Exception:
+ pass
+ return {"state": "idle", "message": "", "updated_at": ""}
+
+
+def restore_in_progress() -> bool:
+ return get_restore_status().get("state") in ("pending", "running")
+
+
+def _manifest_root_prefix(tar: tarfile.TarFile) -> str:
+ for member in tar.getmembers():
+ name = member.name.rstrip("/")
+ if name.endswith("/manifest.json") or name == "manifest.json":
+ if name == "manifest.json":
+ return ""
+ return name[: -len("/manifest.json")]
+ raise ValueError("备份包缺少 manifest.json")
+
+
+def _read_manifest_from_tar(tar: tarfile.TarFile) -> dict:
+ root = _manifest_root_prefix(tar)
+ manifest_name = f"{root}/manifest.json" if root else "manifest.json"
+ try:
+ member = tar.getmember(manifest_name)
+ except KeyError as exc:
+ raise ValueError("备份包缺少 manifest.json") from exc
+ extracted = tar.extractfile(member)
+ if not extracted:
+ raise ValueError("无法读取 manifest.json")
+ data = json.loads(extracted.read().decode("utf-8"))
+ if not isinstance(data, dict):
+ raise ValueError("manifest.json 格式无效")
+ return data
+
+
+def _validate_manifest(manifest: dict, *, current_backend: str | None = None) -> str:
+ if manifest.get("app") != "qihuo":
+ raise ValueError("不是有效的 qihuo 备份包")
+ backend = (manifest.get("backend") or "").strip()
+ if backend not in ("sqlite", "postgres"):
+ raise ValueError("manifest 缺少或无效的数据库类型")
+ if current_backend and backend != current_backend:
+ label = "PostgreSQL" if backend == "postgres" else "SQLite"
+ cur = "PostgreSQL" if current_backend == "postgres" else "SQLite"
+ raise ValueError(f"备份为 {label},当前服务为 {cur},无法恢复")
+ return backend
+
+
+def _member_exists(tar: tarfile.TarFile, root: str, name: str) -> bool:
+ candidates = [name]
+ if root:
+ candidates.append(f"{root}/{name}")
+ return any(_tar_has_member(tar, path) for path in candidates)
+
+
+def _tar_has_member(tar: tarfile.TarFile, path: str) -> bool:
+ try:
+ tar.getmember(path)
+ return True
+ except KeyError:
+ return False
+
+
+def _validate_archive_contents(tar: tarfile.TarFile, manifest: dict, root: str) -> None:
+ backend = manifest["backend"]
+ if backend == "sqlite":
+ if not _member_exists(tar, root, "futures.db"):
+ raise ValueError("SQLite 备份缺少 futures.db")
+ elif not _member_exists(tar, root, "postgres_dump.sql"):
+ raise ValueError("PostgreSQL 备份缺少 postgres_dump.sql")
+
+
+def _manifest_preview(manifest: dict, path: Path) -> dict:
+ backend = manifest.get("backend", "")
+ stat = path.stat()
+ created_at = (manifest.get("created_at") or "").strip()
+ return {
+ "name": path.name,
+ "backend": backend,
+ "backend_label": "PostgreSQL" if backend == "postgres" else "SQLite",
+ "created_at": created_at,
+ "includes_uploads": bool(manifest.get("includes_uploads")),
+ "includes_env": bool(manifest.get("includes_env")),
+ "env_restore_path": (manifest.get("env_restore_path") or "").strip(),
+ "size": stat.st_size,
+ "size_mb": round(stat.st_size / (1024 * 1024), 2),
+ "mtime": datetime.fromtimestamp(stat.st_mtime, TZ).isoformat(timespec="seconds"),
+ }
+
+
+def peek_manifest(path: Path) -> dict:
+ with tarfile.open(path, "r:gz") as tar:
+ return _read_manifest_from_tar(tar)
+
+
+def inspect_backup_archive(path: Path, *, check_backend: bool = True) -> dict:
+ with tarfile.open(path, "r:gz") as tar:
+ manifest = _read_manifest_from_tar(tar)
+ current = db_backend() if check_backend else None
+ _validate_manifest(manifest, current_backend=current)
+ root = _manifest_root_prefix(tar)
+ _validate_archive_contents(tar, manifest, root)
+ return _manifest_preview(manifest, path)
+
+
+def _allocate_backup_filename(manifest: dict, preferred: str = "") -> str:
+ preferred = (preferred or "").strip()
+ if preferred and BACKUP_FILENAME_RE.match(preferred):
+ candidate = backup_dir() / preferred
+ if not candidate.exists():
+ return preferred
+ created = (manifest.get("created_at") or "").strip()
+ stamp = ""
+ if created:
+ try:
+ stamp = datetime.fromisoformat(created).strftime("%Y%m%d_%H%M%S")
+ except ValueError:
+ stamp = ""
+ if not stamp:
+ stamp = datetime.now(TZ).strftime("%Y%m%d_%H%M%S")
+ name = f"qihuo_backup_{stamp}.tar.gz"
+ if not (backup_dir() / name).exists():
+ return name
+ stamp = datetime.now(TZ).strftime("%Y%m%d_%H%M%S")
+ return f"qihuo_backup_{stamp}.tar.gz"
+
+
+def save_uploaded_backup(stream: IO[bytes], original_filename: str = "") -> tuple[str, dict]:
+ with tempfile.NamedTemporaryFile(delete=False, suffix=".tar.gz") as tmp:
+ shutil.copyfileobj(stream, tmp)
+ tmp_path = Path(tmp.name)
+ try:
+ info = inspect_backup_archive(tmp_path, check_backend=True)
+ manifest = peek_manifest(tmp_path)
+ filename = _allocate_backup_filename(manifest, original_filename)
+ dest = backup_dir() / filename
+ shutil.move(str(tmp_path), str(dest))
+ info["name"] = filename
+ return filename, info
+ except Exception:
+ tmp_path.unlink(missing_ok=True)
+ raise
+
+
+def _pm2_available() -> bool:
+ return shutil.which("pm2") is not None
+
+
+def _pm2_stop() -> None:
+ if not _pm2_available():
+ logger.warning("pm2 not found, skip stop")
+ return
+ proc = subprocess.run(
+ ["pm2", "stop", "qihuo"],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ if proc.returncode != 0:
+ logger.warning("pm2 stop qihuo: %s", proc.stderr.strip() or proc.stdout.strip())
+
+
+def _pm2_restart() -> None:
+ if not _pm2_available():
+ logger.warning("pm2 not found, skip restart")
+ return
+ proc = subprocess.run(
+ ["pm2", "restart", "qihuo"],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ if proc.returncode != 0:
+ proc = subprocess.run(
+ ["pm2", "start", "qihuo"],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ if proc.returncode != 0:
+ raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "pm2 restart 失败")
+
+
+def _extract_member_to_path(tar: tarfile.TarFile, member_name: str, dest: Path) -> None:
+ try:
+ member = tar.getmember(member_name)
+ except KeyError:
+ return
+ extracted = tar.extractfile(member)
+ if not extracted:
+ return
+ dest.parent.mkdir(parents=True, exist_ok=True)
+ with open(dest, "wb") as out:
+ shutil.copyfileobj(extracted, out)
+
+
+def _restore_uploads_dir(tar: tarfile.TarFile, root: str, restore_dir: Path) -> None:
+ prefix = f"{root}/uploads" if root else "uploads"
+ uploads_dest = restore_dir / "uploads"
+ uploads_dest.mkdir(parents=True, exist_ok=True)
+ found = False
+ for member in tar.getmembers():
+ if member.name == prefix or member.name.startswith(prefix + "/"):
+ found = True
+ rel = member.name[len(prefix) :].lstrip("/")
+ if not rel:
+ continue
+ target = uploads_dest / rel
+ if member.isdir():
+ target.mkdir(parents=True, exist_ok=True)
+ else:
+ target.parent.mkdir(parents=True, exist_ok=True)
+ extracted = tar.extractfile(member)
+ if extracted:
+ with open(target, "wb") as out:
+ shutil.copyfileobj(extracted, out)
+ if not found:
+ logger.info("backup has no uploads/")
+
+
+def _reload_env_file(env_path: Path) -> None:
+ if not env_path.is_file():
+ return
+ try:
+ from dotenv import load_dotenv
+
+ load_dotenv(str(env_path), override=True)
+ except Exception as exc:
+ logger.warning("reload .env failed: %s", exc)
+
+
+def _restore_postgres_dump(dump_path: Path) -> None:
+ url = (os.getenv("DATABASE_URL") or "").strip()
+ if not url:
+ raise RuntimeError("PostgreSQL 恢复需要 DATABASE_URL(请先恢复 .env 或检查环境变量)")
+ if not shutil.which("psql"):
+ raise RuntimeError("未找到 psql,请先安装 PostgreSQL 客户端")
+ proc = subprocess.run(
+ ["psql", url, "-f", str(dump_path)],
+ capture_output=True,
+ text=True,
+ check=False,
+ )
+ if proc.returncode != 0:
+ raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "psql 导入失败")
+
+
+def _perform_restore(archive_path: Path, restore_dir: Path) -> dict:
+ restore_dir.mkdir(parents=True, exist_ok=True)
+ with tempfile.TemporaryDirectory(prefix="qihuo_restore_") as tmp:
+ work = Path(tmp)
+ with tarfile.open(archive_path, "r:gz") as tar:
+ manifest = _read_manifest_from_tar(tar)
+ backend = _validate_manifest(manifest, current_backend=db_backend())
+ root = _manifest_root_prefix(tar)
+ _validate_archive_contents(tar, manifest, root)
+
+ env_member = f"{root}/.env" if root else ".env"
+ env_restore_path = (manifest.get("env_restore_path") or "config/.env").strip()
+ if manifest.get("includes_env") and _tar_has_member(tar, env_member):
+ env_dest = restore_dir / env_restore_path
+ _extract_member_to_path(tar, env_member, env_dest)
+ _reload_env_file(env_dest)
+
+ if backend == "sqlite":
+ db_member = f"{root}/futures.db" if root else "futures.db"
+ db_dest = Path(DB_PATH)
+ if not db_dest.is_absolute():
+ db_dest = restore_dir / db_dest.name
+ _extract_member_to_path(tar, db_member, db_dest)
+ else:
+ dump_member = f"{root}/postgres_dump.sql" if root else "postgres_dump.sql"
+ dump_path = work / "postgres_dump.sql"
+ _extract_member_to_path(tar, dump_member, dump_path)
+ _restore_postgres_dump(dump_path)
+
+ _restore_uploads_dir(tar, root, restore_dir)
+
+ return {
+ "backend": backend,
+ "restore_dir": str(restore_dir),
+ "includes_env": bool(manifest.get("includes_env")),
+ "includes_uploads": bool(manifest.get("includes_uploads")),
+ }
+
+
+def run_restore_job(archive_path: Path) -> None:
+ filename = archive_path.name
+ restore_dir = restore_target_dir()
+ try:
+ _write_restore_status(
+ "running",
+ "正在停止服务…",
+ filename=filename,
+ step="stop",
+ restore_dir=str(restore_dir),
+ )
+ _pm2_stop()
+
+ _write_restore_status(
+ "running",
+ "正在恢复数据…",
+ filename=filename,
+ step="restore",
+ restore_dir=str(restore_dir),
+ )
+ summary = _perform_restore(archive_path.resolve(), restore_dir)
+
+ _write_restore_status(
+ "running",
+ "正在重启服务…",
+ filename=filename,
+ step="restart",
+ restore_dir=str(restore_dir),
+ )
+ _pm2_restart()
+
+ _write_restore_status(
+ "done",
+ "恢复完成,服务已重启",
+ filename=filename,
+ restore_dir=str(restore_dir),
+ summary=summary,
+ )
+ except Exception as exc:
+ logger.exception("restore failed: %s", exc)
+ _write_restore_status(
+ "error",
+ str(exc),
+ filename=filename,
+ restore_dir=str(restore_dir),
+ )
+ try:
+ _pm2_restart()
+ except Exception as restart_exc:
+ logger.warning("restart after restore error: %s", restart_exc)
+
+
+def schedule_restore(filename: str) -> tuple[bool, str]:
+ if _backup_lock.locked():
+ return False, "备份进行中,请稍后再试"
+ if restore_in_progress():
+ return False, "恢复进行中,请稍后再试"
+ try:
+ path = resolve_backup_file(filename)
+ inspect_backup_archive(path, check_backend=True)
+ except (ValueError, FileNotFoundError) as exc:
+ return False, str(exc)
+
+ _write_restore_status(
+ "pending",
+ "恢复任务已提交…",
+ filename=filename,
+ restore_dir=str(restore_target_dir()),
+ )
+ script = Path(__file__).resolve().parent / "restore_job.py"
+ subprocess.Popen(
+ [sys.executable, str(script), str(path.resolve())],
+ start_new_session=True,
+ cwd=str(_app_root()),
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ return True, "恢复已开始,服务将短暂中断后自动重启"
+
+
def get_backup_last_at(get_setting: Callable[[str, str], str]) -> str:
return (get_setting(BACKUP_LAST_KEY, "") or "").strip()
@@ -316,20 +723,36 @@ def create_backup(*, include_uploads: bool = True) -> tuple[str, str]:
return filename, f"备份已生成 {filename}({label},{size_mb:.2f} MB)"
-def list_backups() -> list[dict]:
+def list_backups(*, with_manifest: bool = True) -> list[dict]:
items: list[dict] = []
for path in sorted(backup_dir().glob("qihuo_backup_*.tar.gz"), reverse=True):
if not BACKUP_FILENAME_RE.match(path.name):
continue
stat = path.stat()
- items.append(
- {
- "name": path.name,
- "size": stat.st_size,
- "size_mb": round(stat.st_size / (1024 * 1024), 2),
- "mtime": datetime.fromtimestamp(stat.st_mtime, TZ).isoformat(timespec="seconds"),
- }
- )
+ item = {
+ "name": path.name,
+ "size": stat.st_size,
+ "size_mb": round(stat.st_size / (1024 * 1024), 2),
+ "mtime": datetime.fromtimestamp(stat.st_mtime, TZ).isoformat(timespec="seconds"),
+ "backend": "",
+ "backend_label": "",
+ "created_at": "",
+ "includes_env": False,
+ "includes_uploads": False,
+ }
+ if with_manifest:
+ try:
+ manifest = peek_manifest(path)
+ item["backend"] = manifest.get("backend", "")
+ item["backend_label"] = (
+ "PostgreSQL" if manifest.get("backend") == "postgres" else "SQLite"
+ )
+ item["created_at"] = (manifest.get("created_at") or "").strip()
+ item["includes_env"] = bool(manifest.get("includes_env"))
+ item["includes_uploads"] = bool(manifest.get("includes_uploads"))
+ except Exception as exc:
+ logger.debug("read manifest %s: %s", path.name, exc)
+ items.append(item)
return items
@@ -386,6 +809,8 @@ def schedule_backup(
) -> tuple[bool, str]:
if _backup_lock.locked():
return False, "备份进行中,请稍后再试"
+ if restore_in_progress():
+ return False, "恢复进行中,请稍后再试"
def _run() -> None:
try:
diff --git a/modules/backup/restore_job.py b/modules/backup/restore_job.py
new file mode 100644
index 0000000..470b2b9
--- /dev/null
+++ b/modules/backup/restore_job.py
@@ -0,0 +1,26 @@
+# Copyright (c) 2025-2026 马建军. All rights reserved.
+"""Detached restore worker — survives pm2 stop of the parent web process."""
+
+from __future__ import annotations
+
+import sys
+from pathlib import Path
+
+
+def main(argv: list[str] | None = None) -> int:
+ args = argv if argv is not None else sys.argv[1:]
+ if len(args) != 1:
+ print("usage: python -m modules.backup.restore_job {{ backup_dir }}
{% if backup_last_at %} · 上次备份 {{ backup_last_at.replace('T', ' ') }}{% else %} · 尚未备份{% endif %}
{% if backup_running %} · 备份进行中…{% endif %}
+ {% if backup_restore_running %} · 恢复进行中…{% endif %}