Files
crypto_monitor/manual_trading_hub/hub_web_auth.py
T
2026-05-22 11:49:41 +08:00

108 lines
3.1 KiB
Python

"""中控 Web 登录:HUB_USERNAME + HUB_PASSWORD 配置后启用会话 Cookie。"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import time
from secrets import compare_digest
# Name of the cookie that carries the signed session token.
SESSION_COOKIE = "hub_sess"
# Username expected at login when HUB_USERNAME is not configured.
DEFAULT_USERNAME = "admin"

# Session lifetime used when HUB_SESSION_DAYS is unset or unparsable.
_DEFAULT_SESSION_DAYS = 7
# Lower bound for the session lifetime (one hour).
_MIN_SESSION_AGE_SEC = 3600


def _session_max_age_sec(raw_days: str | None) -> int:
    """Convert a HUB_SESSION_DAYS value into a session lifetime in seconds.

    Args:
        raw_days: Raw environment value (may be None, empty or non-numeric).

    Returns:
        ``days * 86400`` clamped to at least one hour. Values that cannot be
        parsed as an integer fall back to the 7-day default instead of
        raising, so a typo in the environment does not break module import.
    """
    try:
        days = int(raw_days)
    except (TypeError, ValueError):
        days = _DEFAULT_SESSION_DAYS
    return max(_MIN_SESSION_AGE_SEC, days * 86400)


# Lifetime of a session token, in seconds, resolved once at import time.
SESSION_MAX_AGE_SEC = _session_max_age_sec(os.getenv("HUB_SESSION_DAYS"))
def _env_username() -> str:
    """Return the HUB_USERNAME environment value, stripped; "" when unset."""
    configured = os.environ.get("HUB_USERNAME")
    if not configured:
        return ""
    return configured.strip()
def _env_password() -> str:
    """Return the HUB_PASSWORD environment value, stripped; "" when unset."""
    configured = os.environ.get("HUB_PASSWORD")
    if not configured:
        return ""
    return configured.strip()
def password_required() -> bool:
    """Return True when a password is configured, i.e. login is required.

    Only the password decides this; an unset username falls back to the
    default ``admin``.
    """
    configured = _env_password()
    return len(configured) > 0
def expected_username() -> str:
    """Return the configured username, or DEFAULT_USERNAME when none is set."""
    configured = _env_username()
    if configured:
        return configured
    return DEFAULT_USERNAME
def _ct_equal(left: str, right: str) -> bool:
    """Compare two strings in constant time, accepting any Unicode text.

    ``compare_digest`` only accepts ASCII when given ``str`` arguments, so
    both sides are compared as UTF-8 bytes. ``surrogatepass`` keeps lone
    surrogates (possible in JSON-decoded input) from raising during encoding.
    """
    return compare_digest(
        left.encode("utf-8", "surrogatepass"),
        right.encode("utf-8", "surrogatepass"),
    )


def verify_credentials(username: str, password: str) -> bool:
    """Check a submitted username/password pair against the configuration.

    Args:
        username: Submitted username; surrounding whitespace is ignored.
        password: Submitted password; surrounding whitespace is ignored.

    Returns:
        True when no password is configured (login is disabled), or when both
        the username and the password match. Non-string or missing inputs are
        treated as empty strings and therefore never match.
    """
    configured_password = _env_password()
    if not configured_password:
        return True
    given_user = username.strip() if isinstance(username, str) else ""
    given_pass = password.strip() if isinstance(password, str) else ""
    # Evaluate both comparisons before combining them so the username check
    # does not short-circuit the password check.
    u_ok = _ct_equal(expected_username(), given_user)
    p_ok = _ct_equal(configured_password, given_pass)
    return u_ok and p_ok
def verify_password(password: str) -> bool:
    """Legacy entry point: check only the password.

    The username slot is filled with the expected username itself, so only
    the password can cause a mismatch.
    """
    assumed_user = expected_username()
    return verify_credentials(assumed_user, password)
def _secret() -> bytes:
    """Return the key used to sign session tokens, as UTF-8 bytes.

    HUB_SESSION_SECRET is used when set. Otherwise the key is derived from
    the non-empty configured username/password joined with "|", and finally
    falls back to a fixed development-only value.
    """
    configured = os.getenv("HUB_SESSION_SECRET")
    key = configured.strip() if configured else ""
    if key == "":
        parts = [part for part in (_env_username(), _env_password()) if part]
        key = "|".join(parts)
        if not key:
            key = "hub-dev-insecure"
    return key.encode("utf-8")
def _b64url_encode(data: bytes) -> str:
    """Encode bytes as URL-safe base64 text with the "=" padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    return encoded.rstrip(b"=").decode("ascii")
def _b64url_decode(text: str) -> bytes:
    """Decode URL-safe base64 text whose "=" padding may have been removed."""
    # Restore the padding needed to reach a multiple of four characters.
    missing = (4 - len(text) % 4) % 4
    padded = text + "=" * missing
    return base64.urlsafe_b64decode(padded)
def create_session_token(username: str | None = None) -> str:
    """Build a signed session token of the form ``<body>.<signature>``.

    Args:
        username: User to embed in the token; the expected username is used
            when this is empty or None.

    Returns:
        The base64url-encoded JSON payload (expiry, version, user) followed
        by a dot and the hex HMAC-SHA256 of that encoded payload.
    """
    expires_at = int(time.time()) + SESSION_MAX_AGE_SEC
    user = username if username else expected_username()
    claims = {"exp": expires_at, "v": 2, "u": user.strip()}
    raw_json = json.dumps(claims, separators=(",", ":"))
    body = _b64url_encode(raw_json.encode("utf-8"))
    mac = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256)
    return body + "." + mac.hexdigest()
def validate_session_token(token: str | None) -> bool:
    """Check the signature, expiry and user of a session token.

    Args:
        token: Cookie value of the form ``<body>.<signature>``; may be None.

    Returns:
        True only when the signature matches, the token has not expired and
        the embedded user (if any) equals the expected username. Every
        malformed input yields False rather than an exception, because the
        cookie value is attacker-controlled.
    """
    if not token or "." not in token:
        return False
    body, sig = token.rsplit(".", 1)
    # Both parts of a genuine token are pure ASCII; reject anything else
    # before it can make encode() or compare_digest() raise.
    try:
        body_bytes = body.encode("ascii")
        sig_bytes = sig.encode("ascii")
    except UnicodeEncodeError:
        return False
    expected = hmac.new(_secret(), body_bytes, hashlib.sha256).hexdigest()
    if not compare_digest(expected.encode("ascii"), sig_bytes):
        return False
    try:
        payload = json.loads(_b64url_decode(body))
    except (ValueError, RecursionError):
        # Covers bad base64, bad UTF-8 and bad JSON (all ValueError subclasses).
        return False
    if not isinstance(payload, dict):
        return False
    try:
        exp = int(payload.get("exp") or 0)
    except (TypeError, ValueError, OverflowError):
        return False
    if exp <= int(time.time()):
        return False
    sess_user = payload.get("u") or ""
    if not isinstance(sess_user, str):
        return False
    sess_user = sess_user.strip()
    # Tokens without a user are accepted; otherwise the user must still match
    # the currently expected username. Compared as bytes so non-ASCII
    # usernames work with compare_digest.
    if sess_user and not compare_digest(
        sess_user.encode("utf-8", "surrogatepass"),
        expected_username().encode("utf-8", "surrogatepass"),
    ):
        return False
    return True
def cookie_secure() -> bool:
    """Return True when HUB_COOKIE_SECURE holds a truthy flag.

    Accepted values (case-insensitive, surrounding whitespace ignored) are
    "1", "true", "yes" and "on"; anything else, or unset, is False.
    """
    flag = os.getenv("HUB_COOKIE_SECURE")
    if not flag:
        return False
    return flag.strip().lower() in {"1", "true", "yes", "on"}
def is_public_path(path: str, method: str) -> bool:
    """Tell whether a request may be served without a valid session.

    Args:
        path: Request path; a query string and trailing slashes are ignored.
            None or empty is treated as "/".
        method: HTTP method, case-insensitive. None is treated as no method.

    Returns:
        True for the ``/assets`` tree, the login page, the auth login/status
        endpoints, and POST requests to the logout endpoint.
    """
    p = (path or "").split("?")[0].rstrip("/") or "/"
    # Match the /assets directory itself or anything below it. A bare prefix
    # test would also expose unrelated paths such as "/assets-private".
    if p == "/assets" or p.startswith("/assets/"):
        return True
    if p in ("/login", "/api/auth/login", "/api/auth/status"):
        return True
    return p == "/api/auth/logout" and (method or "").upper() == "POST"