183 lines
5.2 KiB
Python
183 lines
5.2 KiB
Python
"""中控 Web 登录:HUB_USERNAME + HUB_PASSWORD 配置后启用会话 Cookie。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import base64
|
||
import hashlib
|
||
import hmac
|
||
import json
|
||
import os
|
||
import time
|
||
from secrets import compare_digest
|
||
|
||
SESSION_COOKIE = "hub_sess"
|
||
SESSION_MAX_AGE_SEC = max(3600, int(os.getenv("HUB_SESSION_DAYS", "7")) * 86400)
|
||
DEFAULT_USERNAME = "admin"
|
||
|
||
|
||
def _env_username() -> str:
|
||
return (os.getenv("HUB_USERNAME") or "").strip()
|
||
|
||
|
||
def _env_password() -> str:
|
||
return (os.getenv("HUB_PASSWORD") or "").strip()
|
||
|
||
|
||
def password_required() -> bool:
|
||
"""已配置密码即要求登录(用户名未设时默认 admin)。"""
|
||
return bool(_env_password())
|
||
|
||
|
||
def expected_username() -> str:
|
||
return _env_username() or DEFAULT_USERNAME
|
||
|
||
|
||
def verify_credentials(username: str, password: str) -> bool:
|
||
if not _env_password():
|
||
return True
|
||
u_ok = compare_digest(expected_username(), (username or "").strip())
|
||
p_ok = compare_digest(_env_password(), (password or "").strip())
|
||
return u_ok and p_ok
|
||
|
||
|
||
def verify_password(password: str) -> bool:
|
||
"""兼容旧调用:仅校验密码、用户名用默认值。"""
|
||
return verify_credentials(expected_username(), password)
|
||
|
||
|
||
def _secret() -> bytes:
|
||
raw = (os.getenv("HUB_SESSION_SECRET") or "").strip()
|
||
if not raw:
|
||
raw = "|".join(p for p in [_env_username(), _env_password()] if p) or "hub-dev-insecure"
|
||
return raw.encode("utf-8")
|
||
|
||
|
||
def _b64url_encode(data: bytes) -> str:
|
||
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
|
||
|
||
|
||
def _b64url_decode(text: str) -> bytes:
|
||
pad = "=" * (-len(text) % 4)
|
||
return base64.urlsafe_b64decode(text + pad)
|
||
|
||
|
||
def create_session_token(username: str | None = None) -> str:
|
||
payload = {
|
||
"exp": int(time.time()) + SESSION_MAX_AGE_SEC,
|
||
"v": 2,
|
||
"u": (username or expected_username()).strip(),
|
||
}
|
||
body = _b64url_encode(json.dumps(payload, separators=(",", ":")).encode("utf-8"))
|
||
sig = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256).hexdigest()
|
||
return f"{body}.{sig}"
|
||
|
||
|
||
def validate_session_token(token: str | None) -> bool:
|
||
if not token or "." not in token:
|
||
return False
|
||
body, sig = token.rsplit(".", 1)
|
||
expected = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256).hexdigest()
|
||
if not compare_digest(expected, sig):
|
||
return False
|
||
try:
|
||
payload = json.loads(_b64url_decode(body))
|
||
except Exception:
|
||
return False
|
||
exp = int(payload.get("exp") or 0)
|
||
if exp <= int(time.time()):
|
||
return False
|
||
sess_user = (payload.get("u") or "").strip()
|
||
if sess_user and not compare_digest(sess_user, expected_username()):
|
||
return False
|
||
return True
|
||
|
||
|
||
def cookie_secure_env_enabled() -> bool:
|
||
"""是否在 .env 中启用「HTTPS 时带 Secure Cookie」策略。"""
|
||
return (os.getenv("HUB_COOKIE_SECURE") or "").strip().lower() in (
|
||
"1",
|
||
"true",
|
||
"yes",
|
||
"on",
|
||
)
|
||
|
||
|
||
def cookie_secure_for_request(request) -> bool:
|
||
"""
|
||
仅在实际 HTTPS 访问时设置 Secure Cookie。
|
||
这样可同时支持:域名 HTTPS 反代 + 内网 http://IP:5100 登录。
|
||
"""
|
||
if not cookie_secure_env_enabled():
|
||
return False
|
||
proto = (
|
||
(request.headers.get("x-forwarded-proto") or request.url.scheme or "http")
|
||
.split(",")[0]
|
||
.strip()
|
||
.lower()
|
||
)
|
||
return proto == "https"
|
||
|
||
|
||
def embed_allowed() -> bool:
|
||
"""允许被本地导航等页面 iframe 嵌入(默认开启,内网场景)。"""
|
||
return (os.getenv("HUB_ALLOW_EMBED") or "true").strip().lower() in (
|
||
"1",
|
||
"true",
|
||
"yes",
|
||
"on",
|
||
)
|
||
|
||
|
||
def embed_frame_ancestors() -> str:
|
||
"""CSP frame-ancestors;默认 *,可设 HUB_EMBED_ORIGINS=http://192.168.8.6:5070"""
|
||
raw = (os.getenv("HUB_EMBED_ORIGINS") or "*").strip()
|
||
if raw == "*":
|
||
return "*"
|
||
origins = [o.strip() for o in raw.split(",") if o.strip()]
|
||
return " ".join(origins) if origins else "*"
|
||
|
||
|
||
def set_session_cookie(response, request, token: str, *, embed: bool = False) -> None:
|
||
"""
|
||
embed=True:LocalNav 等跨站 iframe 嵌入时须 SameSite=None + Secure(仅 HTTPS 有效)。
|
||
"""
|
||
secure = cookie_secure_for_request(request)
|
||
samesite = "lax"
|
||
if embed:
|
||
secure = True
|
||
samesite = "none"
|
||
response.set_cookie(
|
||
SESSION_COOKIE,
|
||
token,
|
||
httponly=True,
|
||
samesite=samesite,
|
||
path="/",
|
||
max_age=SESSION_MAX_AGE_SEC,
|
||
secure=secure,
|
||
)
|
||
|
||
|
||
def clear_session_cookie(response, request, *, embed: bool = False) -> None:
|
||
secure = cookie_secure_for_request(request)
|
||
samesite = "lax"
|
||
if embed:
|
||
secure = True
|
||
samesite = "none"
|
||
response.delete_cookie(
|
||
SESSION_COOKIE,
|
||
path="/",
|
||
secure=secure,
|
||
samesite=samesite,
|
||
)
|
||
|
||
|
||
def is_public_path(path: str, method: str) -> bool:
|
||
p = (path or "").split("?")[0].rstrip("/") or "/"
|
||
if p.startswith("/assets"):
|
||
return True
|
||
if p in ("/login", "/embed-auth", "/api/auth/login", "/api/auth/status", "/api/ping"):
|
||
return True
|
||
if p == "/api/auth/logout" and method.upper() == "POST":
|
||
return True
|
||
return False
|