import json import logging import os import re import shutil import subprocess import tarfile import tempfile from datetime import datetime, timezone from pathlib import Path from urllib.parse import urlparse from app.core.config import settings logger = logging.getLogger(__name__) BACKUP_NAME_PATTERN = re.compile(r"^grade-archive_\d{8}_\d{6}\.tar\.gz$") APP_VERSION = "1.0.0" def backup_dir() -> Path: path = Path(settings.BACKUP_DIR) path.mkdir(parents=True, exist_ok=True) return path def _pg_params() -> dict[str, str]: parsed = urlparse(settings.DATABASE_URL) return { "host": parsed.hostname or "127.0.0.1", "port": str(parsed.port or 5432), "user": parsed.username or "postgres", "password": parsed.password or "", "dbname": parsed.path.lstrip("/") or "student_archive", } def _dump_database(dest: Path) -> None: pg = _pg_params() env = {**os.environ, "PGPASSWORD": pg["password"]} cmd = [ "pg_dump", "-h", pg["host"], "-p", pg["port"], "-U", pg["user"], "-d", pg["dbname"], "--no-owner", "--no-privileges", "--clean", "--if-exists", ] with dest.open("w", encoding="utf-8") as out: subprocess.run(cmd, check=True, env=env, stdout=out) def create_backup() -> Path: timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") archive_path = backup_dir() / f"grade-archive_{timestamp}.tar.gz" work = Path(tempfile.mkdtemp(prefix="grade-archive-backup-")) try: db_file = work / "database.sql" _dump_database(db_file) manifest = { "app": "secondary-school-grade-archive", "version": APP_VERSION, "created_at": datetime.now(timezone.utc).isoformat(), "database": pg["dbname"] if (pg := _pg_params()) else "student_archive", } manifest_file = work / "manifest.json" manifest_file.write_text(json.dumps(manifest, ensure_ascii=False, indent=2), encoding="utf-8") uploads_dir = Path(settings.UPLOAD_DIR) with tarfile.open(archive_path, "w:gz") as tar: tar.add(db_file, arcname="database.sql") tar.add(manifest_file, arcname="manifest.json") if uploads_dir.is_dir(): tar.add(uploads_dir, arcname="uploads") cleanup_old_backups() logger.info("Backup created: %s", archive_path) return archive_path finally: shutil.rmtree(work, ignore_errors=True) def cleanup_old_backups() -> None: if settings.BACKUP_RETENTION_DAYS <= 0: return cutoff = datetime.now().timestamp() - settings.BACKUP_RETENTION_DAYS * 86400 for item in backup_dir().glob("grade-archive_*.tar.gz"): if item.stat().st_mtime < cutoff: item.unlink(missing_ok=True) def list_backups() -> list[dict]: items: list[dict] = [] for path in sorted(backup_dir().glob("grade-archive_*.tar.gz"), reverse=True): stat = path.stat() items.append( { "filename": path.name, "size_bytes": stat.st_size, "created_at": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc), } ) return items def resolve_backup_file(filename: str) -> Path: if not BACKUP_NAME_PATTERN.match(filename): raise ValueError("无效的备份文件名") path = backup_dir() / filename if not path.is_file(): raise FileNotFoundError(filename) return path def restore_backup(archive_path: Path) -> None: work = Path(tempfile.mkdtemp(prefix="grade-archive-restore-")) try: with tarfile.open(archive_path, "r:gz") as tar: tar.extractall(work) db_file = work / "database.sql" if not db_file.is_file(): raise ValueError("备份包缺少 database.sql") pg = _pg_params() env = {**os.environ, "PGPASSWORD": pg["password"]} subprocess.run( [ "psql", "-h", pg["host"], "-p", pg["port"], "-U", pg["user"], "-d", pg["dbname"], "-v", "ON_ERROR_STOP=1", "-f", str(db_file), ], check=True, env=env, ) uploads_src = work / "uploads" uploads_dest = Path(settings.UPLOAD_DIR) if uploads_src.is_dir(): if uploads_dest.exists(): shutil.rmtree(uploads_dest) shutil.copytree(uploads_src, uploads_dest) else: uploads_dest.mkdir(parents=True, exist_ok=True) logger.info("Backup restored from %s", archive_path) finally: shutil.rmtree(work, ignore_errors=True)