65f5caf4d9
Co-authored-by: Cursor <cursoragent@cursor.com>
110 lines
3.8 KiB
Python
110 lines
3.8 KiB
Python
import json
|
|
import os
|
|
import secrets
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import bcrypt
|
|
from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer
|
|
|
|
# Credentials written to auth.json by AuthManager when no auth file exists yet.
DEFAULT_USERNAME = "admin"
|
|
DEFAULT_PASSWORD = "admin"
|
|
# Maximum token age in seconds; passed as max_age when a token is verified.
TOKEN_MAX_AGE = 60 * 60 * 24 * 7 # 7 days
|
|
|
|
|
|
class AuthManager:
    """Single-user credential store and signed-token issuer backed by files.

    State lives in two files inside ``data_dir``:

    * ``auth.json`` -- the username and the bcrypt hash of the password.
    * ``secret.key`` -- the key used to sign login tokens.

    Constructing an instance creates the directory, the signing secret and
    the default account whenever any of them is missing.
    """

    def __init__(self, data_dir: Path) -> None:
        """Prepare the on-disk state under *data_dir* and build the serializer.

        Args:
            data_dir: Directory holding ``auth.json`` and ``secret.key``;
                created (with parents) if it does not exist.
        """
        self.data_dir = data_dir
        self.auth_file = data_dir / "auth.json"
        self.secret_file = data_dir / "secret.key"
        self._serializer: Optional[URLSafeTimedSerializer] = None
        self._ensure_data_dir()
        self._ensure_secret()
        self._ensure_default_user()

    @staticmethod
    def _write_text_atomic(target: Path, text: str) -> None:
        """Write *text* to *target* via a sibling temp file and a rename.

        Writing in place would leave a truncated file behind if the process
        died mid-write; replacing the file in one step means readers see
        either the old content or the new content.
        """
        scratch = target.with_name(target.name + ".tmp")
        scratch.write_text(text, encoding="utf-8")
        os.replace(scratch, target)

    def _ensure_data_dir(self) -> None:
        """Create the data directory (and any parents) if it is missing."""
        self.data_dir.mkdir(parents=True, exist_ok=True)

    def _ensure_secret(self) -> None:
        """Load the signing secret, generating one if missing or empty.

        An existing but empty (or whitespace-only) secret file is treated as
        missing: signing tokens with an empty key would make them forgeable.
        """
        secret = ""
        if self.secret_file.exists():
            secret = self.secret_file.read_text(encoding="utf-8").strip()
        if not secret:
            secret = secrets.token_hex(32)
            self._write_text_atomic(self.secret_file, secret)
        self._serializer = URLSafeTimedSerializer(secret, salt="cloud-browser-auth")

    def _ensure_default_user(self) -> None:
        """Seed the auth file with the default credentials if it is absent."""
        if not self.auth_file.exists():
            self._save_credentials(DEFAULT_USERNAME, DEFAULT_PASSWORD)

    def _hash_password(self, password: str) -> str:
        """Return the bcrypt hash of *password* (fresh salt) as text."""
        return bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")

    def _verify_password(self, password: str, password_hash: str) -> bool:
        """Return True if *password* matches *password_hash*.

        A ``ValueError`` raised by bcrypt while checking is reported as a
        mismatch instead of propagating.
        """
        try:
            return bcrypt.checkpw(password.encode("utf-8"), password_hash.encode("utf-8"))
        except ValueError:
            return False

    def _load_credentials(self) -> dict:
        """Read and parse the JSON document stored in the auth file."""
        return json.loads(self.auth_file.read_text(encoding="utf-8"))

    def _save_credentials(self, username: str, password: str) -> None:
        """Hash *password* and persist it together with *username*.

        The file is replaced atomically so an interrupted write cannot leave
        a half-written auth file behind.
        """
        payload = {
            "username": username,
            "password_hash": self._hash_password(password),
        }
        self._write_text_atomic(
            self.auth_file,
            json.dumps(payload, ensure_ascii=False, indent=2),
        )

    def get_username(self) -> str:
        """Return the username currently stored in the auth file."""
        return self._load_credentials()["username"]

    def authenticate(self, username: str, password: str) -> bool:
        """Return True if *username* and *password* match the stored account."""
        creds = self._load_credentials()
        if username != creds["username"]:
            return False
        return self._verify_password(password, creds["password_hash"])

    def create_token(self, username: str) -> str:
        """Return a signed token carrying *username* and the current time.

        Raises:
            RuntimeError: If the serializer has not been initialised.
        """
        # Explicit check instead of ``assert``: asserts vanish under ``-O``.
        if self._serializer is None:
            raise RuntimeError("token serializer is not initialised")
        return self._serializer.dumps({"username": username, "ts": time.time()})

    def verify_token(self, token: str) -> Optional[str]:
        """Return the username carried by *token*, or None if it is rejected.

        A token is rejected when it is empty, fails signature validation,
        is older than ``TOKEN_MAX_AGE`` seconds, or names a user other than
        the one currently stored (e.g. after the username was changed).
        """
        if not token or self._serializer is None:
            return None
        try:
            data = self._serializer.loads(token, max_age=TOKEN_MAX_AGE)
        except (BadSignature, SignatureExpired):
            return None
        username = data.get("username")
        if username != self.get_username():
            return None
        return username

    def change_credentials(
        self,
        current_username: str,
        current_password: str,
        new_username: str,
        new_password: str,
    ) -> None:
        """Replace the stored credentials after checking the current ones.

        The new username is stored with surrounding whitespace stripped.

        Raises:
            ValueError: If the current username or password is wrong, the
                stripped new username has fewer than 2 characters, or the
                new password has fewer than 4 characters.
        """
        creds = self._load_credentials()
        if current_username != creds["username"]:
            raise ValueError("当前用户名不正确")
        if not self._verify_password(current_password, creds["password_hash"]):
            raise ValueError("当前密码不正确")
        if len(new_username.strip()) < 2:
            raise ValueError("新用户名至少 2 个字符")
        if len(new_password) < 4:
            raise ValueError("新密码至少 4 个字符")
        self._save_credentials(new_username.strip(), new_password)
|
|
|
|
|
|
def get_data_dir() -> Path:
    """Return the data directory from ``DATA_DIR``, defaulting to ``/app/data``."""
    configured = os.environ.get("DATA_DIR", "/app/data")
    return Path(configured)
|
|
|
|
|
|
# Module-level AuthManager instance, built at import time from get_data_dir();
# constructing it creates the data directory, secret and default user if missing.
auth_manager = AuthManager(get_data_dir())
|