Files
2026-06-27 10:59:59 +08:00

110 lines
3.7 KiB
Python

import json
import os
import secrets
import time
from pathlib import Path
from typing import Optional
import bcrypt
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin"
TOKEN_MAX_AGE = 60 * 60 * 24 * 7 # 7 days
class AuthManager:
def __init__(self, data_dir: Path) -> None:
self.data_dir = data_dir
self.auth_file = data_dir / "auth.json"
self.secret_file = data_dir / "secret.key"
self._serializer: Optional[URLSafeTimedSerializer] = None
self._ensure_data_dir()
self._ensure_secret()
self._ensure_default_user()
def _ensure_data_dir(self) -> None:
self.data_dir.mkdir(parents=True, exist_ok=True)
def _ensure_secret(self) -> None:
if not self.secret_file.exists():
self.secret_file.write_text(secrets.token_hex(32), encoding="utf-8")
secret = self.secret_file.read_text(encoding="utf-8").strip()
self._serializer = URLSafeTimedSerializer(secret, salt="cloud-browser-auth")
def _ensure_default_user(self) -> None:
if not self.auth_file.exists():
self._save_credentials(DEFAULT_USERNAME, DEFAULT_PASSWORD)
def _hash_password(self, password: str) -> str:
return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def _verify_password(self, password: str, password_hash: str) -> bool:
try:
return bcrypt.checkpw(password.encode("utf-8"), password_hash.encode("utf-8"))
except ValueError:
return False
def _load_credentials(self) -> dict:
return json.loads(self.auth_file.read_text(encoding="utf-8"))
def _save_credentials(self, username: str, password: str) -> None:
payload = {
"username": username,
"password_hash": self._hash_password(password),
}
self.auth_file.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
def get_username(self) -> str:
return self._load_credentials()["username"]
def authenticate(self, username: str, password: str) -> bool:
creds = self._load_credentials()
if username != creds["username"]:
return False
return self._verify_password(password, creds["password_hash"])
def create_token(self, username: str) -> str:
assert self._serializer is not None
return self._serializer.dumps({"username": username, "ts": time.time()})
def verify_token(self, token: str) -> Optional[str]:
if not token or self._serializer is None:
return None
try:
data = self._serializer.loads(token, max_age=TOKEN_MAX_AGE)
except (BadSignature, SignatureExpired):
return None
username = data.get("username")
if username != self.get_username():
return None
return username
def change_credentials(
self,
current_username: str,
current_password: str,
new_username: str,
new_password: str,
) -> None:
creds = self._load_credentials()
if current_username != creds["username"]:
raise ValueError("当前用户名不正确")
if not self._verify_password(current_password, creds["password_hash"]):
raise ValueError("当前密码不正确")
if len(new_username.strip()) < 2:
raise ValueError("新用户名至少 2 个字符")
if len(new_password) < 4:
raise ValueError("新密码至少 4 个字符")
self._save_credentials(new_username.strip(), new_password)
def get_data_dir() -> Path:
return Path(os.getenv("DATA_DIR", "/app/data"))
auth_manager = AuthManager(get_data_dir())