Add frontend backup upload and list-based restore with validation.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-02 16:03:18 +08:00
parent 481086eddc
commit 9379bc4f4f
7 changed files with 726 additions and 68 deletions
+435 -10
View File
@@ -13,13 +13,14 @@ import re
import shutil
import sqlite3
import subprocess
import sys
import tarfile
import tempfile
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Callable, Optional
from typing import Any, Callable, IO, Optional
from zoneinfo import ZoneInfo
from modules.core.db_conn import DB_PATH, db_backend
@@ -36,6 +37,8 @@ DEFAULT_KEEP_COUNT = 30
DEFAULT_AUTO_HOUR = 3
CHECK_INTERVAL_SEC = 3600
_backup_lock = threading.Lock()
RESTORE_STATUS_FILE = "restore_status.json"
RESTORE_CONFIRM_TOKEN = "RESTORE"
RESTORE_MD = """# qihuo 备份恢复说明
@@ -130,6 +133,14 @@ def default_restore_dir() -> str:
return "/root/qihuo"
def restore_target_dir() -> Path:
"""Web/API 恢复目标目录,默认当前应用根目录。"""
env = (os.getenv("QIHUO_RESTORE_DIR") or "").strip()
if env:
return Path(env)
return _app_root()
def backup_dir() -> Path:
path = Path(default_backup_dir())
path.mkdir(parents=True, exist_ok=True)
@@ -140,6 +151,402 @@ def backup_in_progress() -> bool:
return _backup_lock.locked()
def _restore_status_path() -> Path:
from modules.core.paths import DATA_DIR
DATA_DIR.mkdir(parents=True, exist_ok=True)
return DATA_DIR / RESTORE_STATUS_FILE
def _write_restore_status(state: str, message: str = "", **extra: Any) -> None:
payload = {
"state": state,
"message": message,
"updated_at": datetime.now(TZ).isoformat(timespec="seconds"),
}
payload.update(extra)
_restore_status_path().write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def get_restore_status() -> dict:
path = _restore_status_path()
if not path.is_file():
return {"state": "idle", "message": "", "updated_at": ""}
try:
data = json.loads(path.read_text(encoding="utf-8"))
if isinstance(data, dict):
data.setdefault("state", "idle")
data.setdefault("message", "")
return data
except Exception:
pass
return {"state": "idle", "message": "", "updated_at": ""}
def restore_in_progress() -> bool:
return get_restore_status().get("state") in ("pending", "running")
def _manifest_root_prefix(tar: tarfile.TarFile) -> str:
for member in tar.getmembers():
name = member.name.rstrip("/")
if name.endswith("/manifest.json") or name == "manifest.json":
if name == "manifest.json":
return ""
return name[: -len("/manifest.json")]
raise ValueError("备份包缺少 manifest.json")
def _read_manifest_from_tar(tar: tarfile.TarFile) -> dict:
root = _manifest_root_prefix(tar)
manifest_name = f"{root}/manifest.json" if root else "manifest.json"
try:
member = tar.getmember(manifest_name)
except KeyError as exc:
raise ValueError("备份包缺少 manifest.json") from exc
extracted = tar.extractfile(member)
if not extracted:
raise ValueError("无法读取 manifest.json")
data = json.loads(extracted.read().decode("utf-8"))
if not isinstance(data, dict):
raise ValueError("manifest.json 格式无效")
return data
def _validate_manifest(manifest: dict, *, current_backend: str | None = None) -> str:
if manifest.get("app") != "qihuo":
raise ValueError("不是有效的 qihuo 备份包")
backend = (manifest.get("backend") or "").strip()
if backend not in ("sqlite", "postgres"):
raise ValueError("manifest 缺少或无效的数据库类型")
if current_backend and backend != current_backend:
label = "PostgreSQL" if backend == "postgres" else "SQLite"
cur = "PostgreSQL" if current_backend == "postgres" else "SQLite"
raise ValueError(f"备份为 {label},当前服务为 {cur},无法恢复")
return backend
def _member_exists(tar: tarfile.TarFile, root: str, name: str) -> bool:
candidates = [name]
if root:
candidates.append(f"{root}/{name}")
return any(_tar_has_member(tar, path) for path in candidates)
def _tar_has_member(tar: tarfile.TarFile, path: str) -> bool:
try:
tar.getmember(path)
return True
except KeyError:
return False
def _validate_archive_contents(tar: tarfile.TarFile, manifest: dict, root: str) -> None:
backend = manifest["backend"]
if backend == "sqlite":
if not _member_exists(tar, root, "futures.db"):
raise ValueError("SQLite 备份缺少 futures.db")
elif not _member_exists(tar, root, "postgres_dump.sql"):
raise ValueError("PostgreSQL 备份缺少 postgres_dump.sql")
def _manifest_preview(manifest: dict, path: Path) -> dict:
backend = manifest.get("backend", "")
stat = path.stat()
created_at = (manifest.get("created_at") or "").strip()
return {
"name": path.name,
"backend": backend,
"backend_label": "PostgreSQL" if backend == "postgres" else "SQLite",
"created_at": created_at,
"includes_uploads": bool(manifest.get("includes_uploads")),
"includes_env": bool(manifest.get("includes_env")),
"env_restore_path": (manifest.get("env_restore_path") or "").strip(),
"size": stat.st_size,
"size_mb": round(stat.st_size / (1024 * 1024), 2),
"mtime": datetime.fromtimestamp(stat.st_mtime, TZ).isoformat(timespec="seconds"),
}
def peek_manifest(path: Path) -> dict:
with tarfile.open(path, "r:gz") as tar:
return _read_manifest_from_tar(tar)
def inspect_backup_archive(path: Path, *, check_backend: bool = True) -> dict:
with tarfile.open(path, "r:gz") as tar:
manifest = _read_manifest_from_tar(tar)
current = db_backend() if check_backend else None
_validate_manifest(manifest, current_backend=current)
root = _manifest_root_prefix(tar)
_validate_archive_contents(tar, manifest, root)
return _manifest_preview(manifest, path)
def _allocate_backup_filename(manifest: dict, preferred: str = "") -> str:
preferred = (preferred or "").strip()
if preferred and BACKUP_FILENAME_RE.match(preferred):
candidate = backup_dir() / preferred
if not candidate.exists():
return preferred
created = (manifest.get("created_at") or "").strip()
stamp = ""
if created:
try:
stamp = datetime.fromisoformat(created).strftime("%Y%m%d_%H%M%S")
except ValueError:
stamp = ""
if not stamp:
stamp = datetime.now(TZ).strftime("%Y%m%d_%H%M%S")
name = f"qihuo_backup_{stamp}.tar.gz"
if not (backup_dir() / name).exists():
return name
stamp = datetime.now(TZ).strftime("%Y%m%d_%H%M%S")
return f"qihuo_backup_{stamp}.tar.gz"
def save_uploaded_backup(stream: IO[bytes], original_filename: str = "") -> tuple[str, dict]:
with tempfile.NamedTemporaryFile(delete=False, suffix=".tar.gz") as tmp:
shutil.copyfileobj(stream, tmp)
tmp_path = Path(tmp.name)
try:
info = inspect_backup_archive(tmp_path, check_backend=True)
manifest = peek_manifest(tmp_path)
filename = _allocate_backup_filename(manifest, original_filename)
dest = backup_dir() / filename
shutil.move(str(tmp_path), str(dest))
info["name"] = filename
return filename, info
except Exception:
tmp_path.unlink(missing_ok=True)
raise
def _pm2_available() -> bool:
return shutil.which("pm2") is not None
def _pm2_stop() -> None:
if not _pm2_available():
logger.warning("pm2 not found, skip stop")
return
proc = subprocess.run(
["pm2", "stop", "qihuo"],
capture_output=True,
text=True,
check=False,
)
if proc.returncode != 0:
logger.warning("pm2 stop qihuo: %s", proc.stderr.strip() or proc.stdout.strip())
def _pm2_restart() -> None:
if not _pm2_available():
logger.warning("pm2 not found, skip restart")
return
proc = subprocess.run(
["pm2", "restart", "qihuo"],
capture_output=True,
text=True,
check=False,
)
if proc.returncode != 0:
proc = subprocess.run(
["pm2", "start", "qihuo"],
capture_output=True,
text=True,
check=False,
)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "pm2 restart 失败")
def _extract_member_to_path(tar: tarfile.TarFile, member_name: str, dest: Path) -> None:
try:
member = tar.getmember(member_name)
except KeyError:
return
extracted = tar.extractfile(member)
if not extracted:
return
dest.parent.mkdir(parents=True, exist_ok=True)
with open(dest, "wb") as out:
shutil.copyfileobj(extracted, out)
def _restore_uploads_dir(tar: tarfile.TarFile, root: str, restore_dir: Path) -> None:
prefix = f"{root}/uploads" if root else "uploads"
uploads_dest = restore_dir / "uploads"
uploads_dest.mkdir(parents=True, exist_ok=True)
found = False
for member in tar.getmembers():
if member.name == prefix or member.name.startswith(prefix + "/"):
found = True
rel = member.name[len(prefix) :].lstrip("/")
if not rel:
continue
target = uploads_dest / rel
if member.isdir():
target.mkdir(parents=True, exist_ok=True)
else:
target.parent.mkdir(parents=True, exist_ok=True)
extracted = tar.extractfile(member)
if extracted:
with open(target, "wb") as out:
shutil.copyfileobj(extracted, out)
if not found:
logger.info("backup has no uploads/")
def _reload_env_file(env_path: Path) -> None:
if not env_path.is_file():
return
try:
from dotenv import load_dotenv
load_dotenv(str(env_path), override=True)
except Exception as exc:
logger.warning("reload .env failed: %s", exc)
def _restore_postgres_dump(dump_path: Path) -> None:
url = (os.getenv("DATABASE_URL") or "").strip()
if not url:
raise RuntimeError("PostgreSQL 恢复需要 DATABASE_URL(请先恢复 .env 或检查环境变量)")
if not shutil.which("psql"):
raise RuntimeError("未找到 psql,请先安装 PostgreSQL 客户端")
proc = subprocess.run(
["psql", url, "-f", str(dump_path)],
capture_output=True,
text=True,
check=False,
)
if proc.returncode != 0:
raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or "psql 导入失败")
def _perform_restore(archive_path: Path, restore_dir: Path) -> dict:
restore_dir.mkdir(parents=True, exist_ok=True)
with tempfile.TemporaryDirectory(prefix="qihuo_restore_") as tmp:
work = Path(tmp)
with tarfile.open(archive_path, "r:gz") as tar:
manifest = _read_manifest_from_tar(tar)
backend = _validate_manifest(manifest, current_backend=db_backend())
root = _manifest_root_prefix(tar)
_validate_archive_contents(tar, manifest, root)
env_member = f"{root}/.env" if root else ".env"
env_restore_path = (manifest.get("env_restore_path") or "config/.env").strip()
if manifest.get("includes_env") and _tar_has_member(tar, env_member):
env_dest = restore_dir / env_restore_path
_extract_member_to_path(tar, env_member, env_dest)
_reload_env_file(env_dest)
if backend == "sqlite":
db_member = f"{root}/futures.db" if root else "futures.db"
db_dest = Path(DB_PATH)
if not db_dest.is_absolute():
db_dest = restore_dir / db_dest.name
_extract_member_to_path(tar, db_member, db_dest)
else:
dump_member = f"{root}/postgres_dump.sql" if root else "postgres_dump.sql"
dump_path = work / "postgres_dump.sql"
_extract_member_to_path(tar, dump_member, dump_path)
_restore_postgres_dump(dump_path)
_restore_uploads_dir(tar, root, restore_dir)
return {
"backend": backend,
"restore_dir": str(restore_dir),
"includes_env": bool(manifest.get("includes_env")),
"includes_uploads": bool(manifest.get("includes_uploads")),
}
def run_restore_job(archive_path: Path) -> None:
filename = archive_path.name
restore_dir = restore_target_dir()
try:
_write_restore_status(
"running",
"正在停止服务…",
filename=filename,
step="stop",
restore_dir=str(restore_dir),
)
_pm2_stop()
_write_restore_status(
"running",
"正在恢复数据…",
filename=filename,
step="restore",
restore_dir=str(restore_dir),
)
summary = _perform_restore(archive_path.resolve(), restore_dir)
_write_restore_status(
"running",
"正在重启服务…",
filename=filename,
step="restart",
restore_dir=str(restore_dir),
)
_pm2_restart()
_write_restore_status(
"done",
"恢复完成,服务已重启",
filename=filename,
restore_dir=str(restore_dir),
summary=summary,
)
except Exception as exc:
logger.exception("restore failed: %s", exc)
_write_restore_status(
"error",
str(exc),
filename=filename,
restore_dir=str(restore_dir),
)
try:
_pm2_restart()
except Exception as restart_exc:
logger.warning("restart after restore error: %s", restart_exc)
def schedule_restore(filename: str) -> tuple[bool, str]:
if _backup_lock.locked():
return False, "备份进行中,请稍后再试"
if restore_in_progress():
return False, "恢复进行中,请稍后再试"
try:
path = resolve_backup_file(filename)
inspect_backup_archive(path, check_backend=True)
except (ValueError, FileNotFoundError) as exc:
return False, str(exc)
_write_restore_status(
"pending",
"恢复任务已提交…",
filename=filename,
restore_dir=str(restore_target_dir()),
)
script = Path(__file__).resolve().parent / "restore_job.py"
subprocess.Popen(
[sys.executable, str(script), str(path.resolve())],
start_new_session=True,
cwd=str(_app_root()),
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return True, "恢复已开始,服务将短暂中断后自动重启"
def get_backup_last_at(get_setting: Callable[[str, str], str]) -> str:
return (get_setting(BACKUP_LAST_KEY, "") or "").strip()
@@ -316,20 +723,36 @@ def create_backup(*, include_uploads: bool = True) -> tuple[str, str]:
return filename, f"备份已生成 {filename}{label}{size_mb:.2f} MB"
def list_backups() -> list[dict]:
def list_backups(*, with_manifest: bool = True) -> list[dict]:
items: list[dict] = []
for path in sorted(backup_dir().glob("qihuo_backup_*.tar.gz"), reverse=True):
if not BACKUP_FILENAME_RE.match(path.name):
continue
stat = path.stat()
items.append(
{
"name": path.name,
"size": stat.st_size,
"size_mb": round(stat.st_size / (1024 * 1024), 2),
"mtime": datetime.fromtimestamp(stat.st_mtime, TZ).isoformat(timespec="seconds"),
}
)
item = {
"name": path.name,
"size": stat.st_size,
"size_mb": round(stat.st_size / (1024 * 1024), 2),
"mtime": datetime.fromtimestamp(stat.st_mtime, TZ).isoformat(timespec="seconds"),
"backend": "",
"backend_label": "",
"created_at": "",
"includes_env": False,
"includes_uploads": False,
}
if with_manifest:
try:
manifest = peek_manifest(path)
item["backend"] = manifest.get("backend", "")
item["backend_label"] = (
"PostgreSQL" if manifest.get("backend") == "postgres" else "SQLite"
)
item["created_at"] = (manifest.get("created_at") or "").strip()
item["includes_env"] = bool(manifest.get("includes_env"))
item["includes_uploads"] = bool(manifest.get("includes_uploads"))
except Exception as exc:
logger.debug("read manifest %s: %s", path.name, exc)
items.append(item)
return items
@@ -386,6 +809,8 @@ def schedule_backup(
) -> tuple[bool, str]:
if _backup_lock.locked():
return False, "备份进行中,请稍后再试"
if restore_in_progress():
return False, "恢复进行中,请稍后再试"
def _run() -> None:
try: