Files
crypto_monitor/manual_trading_hub/hub_web_auth.py
T
2026-05-30 12:04:03 +08:00

183 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""中控 Web 登录:HUB_USERNAME + HUB_PASSWORD 配置后启用会话 Cookie。"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import time
from secrets import compare_digest
SESSION_COOKIE = "hub_sess"
SESSION_MAX_AGE_SEC = max(3600, int(os.getenv("HUB_SESSION_DAYS", "7")) * 86400)
DEFAULT_USERNAME = "admin"
def _env_username() -> str:
return (os.getenv("HUB_USERNAME") or "").strip()
def _env_password() -> str:
return (os.getenv("HUB_PASSWORD") or "").strip()
def password_required() -> bool:
"""已配置密码即要求登录(用户名未设时默认 admin)。"""
return bool(_env_password())
def expected_username() -> str:
return _env_username() or DEFAULT_USERNAME
def verify_credentials(username: str, password: str) -> bool:
if not _env_password():
return True
u_ok = compare_digest(expected_username(), (username or "").strip())
p_ok = compare_digest(_env_password(), (password or "").strip())
return u_ok and p_ok
def verify_password(password: str) -> bool:
"""兼容旧调用:仅校验密码、用户名用默认值。"""
return verify_credentials(expected_username(), password)
def _secret() -> bytes:
raw = (os.getenv("HUB_SESSION_SECRET") or "").strip()
if not raw:
raw = "|".join(p for p in [_env_username(), _env_password()] if p) or "hub-dev-insecure"
return raw.encode("utf-8")
def _b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
def _b64url_decode(text: str) -> bytes:
pad = "=" * (-len(text) % 4)
return base64.urlsafe_b64decode(text + pad)
def create_session_token(username: str | None = None) -> str:
payload = {
"exp": int(time.time()) + SESSION_MAX_AGE_SEC,
"v": 2,
"u": (username or expected_username()).strip(),
}
body = _b64url_encode(json.dumps(payload, separators=(",", ":")).encode("utf-8"))
sig = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256).hexdigest()
return f"{body}.{sig}"
def validate_session_token(token: str | None) -> bool:
if not token or "." not in token:
return False
body, sig = token.rsplit(".", 1)
expected = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256).hexdigest()
if not compare_digest(expected, sig):
return False
try:
payload = json.loads(_b64url_decode(body))
except Exception:
return False
exp = int(payload.get("exp") or 0)
if exp <= int(time.time()):
return False
sess_user = (payload.get("u") or "").strip()
if sess_user and not compare_digest(sess_user, expected_username()):
return False
return True
def cookie_secure_env_enabled() -> bool:
"""是否在 .env 中启用「HTTPS 时带 Secure Cookie」策略。"""
return (os.getenv("HUB_COOKIE_SECURE") or "").strip().lower() in (
"1",
"true",
"yes",
"on",
)
def cookie_secure_for_request(request) -> bool:
"""
仅在实际 HTTPS 访问时设置 Secure Cookie。
这样可同时支持:域名 HTTPS 反代 + 内网 http://IP:5100 登录。
"""
if not cookie_secure_env_enabled():
return False
proto = (
(request.headers.get("x-forwarded-proto") or request.url.scheme or "http")
.split(",")[0]
.strip()
.lower()
)
return proto == "https"
def embed_allowed() -> bool:
"""允许被本地导航等页面 iframe 嵌入(默认开启,内网场景)。"""
return (os.getenv("HUB_ALLOW_EMBED") or "true").strip().lower() in (
"1",
"true",
"yes",
"on",
)
def embed_frame_ancestors() -> str:
"""CSP frame-ancestors;默认 *,可设 HUB_EMBED_ORIGINS=http://192.168.8.6:5070"""
raw = (os.getenv("HUB_EMBED_ORIGINS") or "*").strip()
if raw == "*":
return "*"
origins = [o.strip() for o in raw.split(",") if o.strip()]
return " ".join(origins) if origins else "*"
def set_session_cookie(response, request, token: str, *, embed: bool = False) -> None:
"""
embed=TrueLocalNav 等跨站 iframe 嵌入时须 SameSite=None + Secure(仅 HTTPS 有效)。
"""
secure = cookie_secure_for_request(request)
samesite = "lax"
if embed:
secure = True
samesite = "none"
response.set_cookie(
SESSION_COOKIE,
token,
httponly=True,
samesite=samesite,
path="/",
max_age=SESSION_MAX_AGE_SEC,
secure=secure,
)
def clear_session_cookie(response, request, *, embed: bool = False) -> None:
secure = cookie_secure_for_request(request)
samesite = "lax"
if embed:
secure = True
samesite = "none"
response.delete_cookie(
SESSION_COOKIE,
path="/",
secure=secure,
samesite=samesite,
)
def is_public_path(path: str, method: str) -> bool:
p = (path or "").split("?")[0].rstrip("/") or "/"
if p.startswith("/assets"):
return True
if p in ("/login", "/embed-auth", "/api/auth/login", "/api/auth/status", "/api/ping"):
return True
if p == "/api/auth/logout" and method.upper() == "POST":
return True
return False