import json import os import secrets import time from pathlib import Path from typing import Optional import bcrypt from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer DEFAULT_USERNAME = "admin" DEFAULT_PASSWORD = "admin" TOKEN_MAX_AGE = 60 * 60 * 24 * 7 # 7 days class AuthManager: def __init__(self, data_dir: Path) -> None: self.data_dir = data_dir self.auth_file = data_dir / "auth.json" self.secret_file = data_dir / "secret.key" self._serializer: Optional[URLSafeTimedSerializer] = None self._ensure_data_dir() self._ensure_secret() self._ensure_default_user() def _ensure_data_dir(self) -> None: self.data_dir.mkdir(parents=True, exist_ok=True) def _ensure_secret(self) -> None: if not self.secret_file.exists(): self.secret_file.write_text(secrets.token_hex(32), encoding="utf-8") secret = self.secret_file.read_text(encoding="utf-8").strip() self._serializer = URLSafeTimedSerializer(secret, salt="cloud-browser-auth") def _ensure_default_user(self) -> None: if not self.auth_file.exists(): self._save_credentials(DEFAULT_USERNAME, DEFAULT_PASSWORD) def _hash_password(self, password: str) -> str: return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8") def _verify_password(self, password: str, password_hash: str) -> bool: try: return bcrypt.checkpw(password.encode("utf-8"), password_hash.encode("utf-8")) except ValueError: return False def _load_credentials(self) -> dict: return json.loads(self.auth_file.read_text(encoding="utf-8")) def _save_credentials(self, username: str, password: str) -> None: payload = { "username": username, "password_hash": self._hash_password(password), } self.auth_file.write_text( json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8", ) def get_username(self) -> str: return self._load_credentials()["username"] def authenticate(self, username: str, password: str) -> bool: creds = self._load_credentials() if username != creds["username"]: return False return self._verify_password(password, creds["password_hash"]) def create_token(self, username: str) -> str: assert self._serializer is not None return self._serializer.dumps({"username": username, "ts": time.time()}) def verify_token(self, token: str) -> Optional[str]: if not token or self._serializer is None: return None try: data = self._serializer.loads(token, max_age=TOKEN_MAX_AGE) except (BadSignature, SignatureExpired): return None username = data.get("username") if username != self.get_username(): return None return username def change_credentials( self, current_username: str, current_password: str, new_username: str, new_password: str, ) -> None: creds = self._load_credentials() if current_username != creds["username"]: raise ValueError("当前用户名不正确") if not self._verify_password(current_password, creds["password_hash"]): raise ValueError("当前密码不正确") if len(new_username.strip()) < 2: raise ValueError("新用户名至少 2 个字符") if len(new_password) < 4: raise ValueError("新密码至少 4 个字符") self._save_credentials(new_username.strip(), new_password) def get_data_dir() -> Path: return Path(os.getenv("DATA_DIR", "/app/data")) auth_manager = AuthManager(get_data_dir())