"""中控 Web 登录:HUB_USERNAME + HUB_PASSWORD 配置后启用会话 Cookie。""" from __future__ import annotations import base64 import hashlib import hmac import json import os import time from secrets import compare_digest SESSION_COOKIE = "hub_sess" SESSION_MAX_AGE_SEC = max(3600, int(os.getenv("HUB_SESSION_DAYS", "7")) * 86400) DEFAULT_USERNAME = "admin" def _env_username() -> str: return (os.getenv("HUB_USERNAME") or "").strip() def _env_password() -> str: return (os.getenv("HUB_PASSWORD") or "").strip() def password_required() -> bool: """已配置密码即要求登录(用户名未设时默认 admin)。""" return bool(_env_password()) def expected_username() -> str: return _env_username() or DEFAULT_USERNAME def verify_credentials(username: str, password: str) -> bool: if not _env_password(): return True u_ok = compare_digest(expected_username(), (username or "").strip()) p_ok = compare_digest(_env_password(), (password or "").strip()) return u_ok and p_ok def verify_password(password: str) -> bool: """兼容旧调用:仅校验密码、用户名用默认值。""" return verify_credentials(expected_username(), password) def _secret() -> bytes: raw = (os.getenv("HUB_SESSION_SECRET") or "").strip() if not raw: raw = "|".join(p for p in [_env_username(), _env_password()] if p) or "hub-dev-insecure" return raw.encode("utf-8") def _b64url_encode(data: bytes) -> str: return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=") def _b64url_decode(text: str) -> bytes: pad = "=" * (-len(text) % 4) return base64.urlsafe_b64decode(text + pad) def create_session_token(username: str | None = None) -> str: payload = { "exp": int(time.time()) + SESSION_MAX_AGE_SEC, "v": 2, "u": (username or expected_username()).strip(), } body = _b64url_encode(json.dumps(payload, separators=(",", ":")).encode("utf-8")) sig = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256).hexdigest() return f"{body}.{sig}" def validate_session_token(token: str | None) -> bool: if not token or "." not in token: return False body, sig = token.rsplit(".", 1) expected = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256).hexdigest() if not compare_digest(expected, sig): return False try: payload = json.loads(_b64url_decode(body)) except Exception: return False exp = int(payload.get("exp") or 0) if exp <= int(time.time()): return False sess_user = (payload.get("u") or "").strip() if sess_user and not compare_digest(sess_user, expected_username()): return False return True def cookie_secure_env_enabled() -> bool: """是否在 .env 中启用「HTTPS 时带 Secure Cookie」策略。""" return (os.getenv("HUB_COOKIE_SECURE") or "").strip().lower() in ( "1", "true", "yes", "on", ) def cookie_secure_for_request(request) -> bool: """ 仅在实际 HTTPS 访问时设置 Secure Cookie。 这样可同时支持:域名 HTTPS 反代 + 内网 http://IP:5100 登录。 """ if not cookie_secure_env_enabled(): return False proto = ( (request.headers.get("x-forwarded-proto") or request.url.scheme or "http") .split(",")[0] .strip() .lower() ) return proto == "https" def embed_allowed() -> bool: """允许被本地导航等页面 iframe 嵌入(默认开启,内网场景)。""" return (os.getenv("HUB_ALLOW_EMBED") or "true").strip().lower() in ( "1", "true", "yes", "on", ) def embed_frame_ancestors() -> str: """CSP frame-ancestors;默认 *,可设 HUB_EMBED_ORIGINS=http://192.168.8.6:5070""" raw = (os.getenv("HUB_EMBED_ORIGINS") or "*").strip() if raw == "*": return "*" origins = [o.strip() for o in raw.split(",") if o.strip()] return " ".join(origins) if origins else "*" def set_session_cookie(response, request, token: str, *, embed: bool = False) -> None: """ embed=True:LocalNav 等跨站 iframe 嵌入时须 SameSite=None + Secure(仅 HTTPS 有效)。 """ secure = cookie_secure_for_request(request) samesite = "lax" if embed: secure = True samesite = "none" response.set_cookie( SESSION_COOKIE, token, httponly=True, samesite=samesite, path="/", max_age=SESSION_MAX_AGE_SEC, secure=secure, ) def clear_session_cookie(response, request, *, embed: bool = False) -> None: secure = cookie_secure_for_request(request) samesite = "lax" if embed: secure = True samesite = "none" response.delete_cookie( SESSION_COOKIE, path="/", secure=secure, samesite=samesite, ) def is_public_path(path: str, method: str) -> bool: p = (path or "").split("?")[0].rstrip("/") or "/" if p.startswith("/assets"): return True if p in ("/login", "/embed-auth", "/api/auth/login", "/api/auth/status", "/api/ping"): return True if p == "/api/auth/logout" and method.upper() == "POST": return True return False