Files
crypto_monitor/hub_auth.py
T
2026-05-25 11:49:53 +08:00

125 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""中控调用实例 API 时的鉴权;实例浏览器 SSO(复用 HUB_BRIDGE_TOKEN)。"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import secrets
import threading
import time
from typing import Any
from flask import request
# Lifetime, in seconds, of a link the hub issues to open an instance; defaults to 7200 (2 hours).
# Read once from the HUB_SSO_TTL_SEC environment variable when the module is imported.
HUB_SSO_TTL_SEC = int(os.getenv("HUB_SSO_TTL_SEC", "7200"))
# Nonces of SSO tokens that were already accepted, mapped to the timestamp after which
# _prune_used_nonces() is allowed to forget them.
_used_nonces: dict[str, float] = {}
# Held while this module reads or modifies _used_nonces.
_nonce_lock = threading.Lock()
def hub_bridge_token() -> str:
    """Return the HUB_BRIDGE_TOKEN environment value without surrounding whitespace.

    An unset or empty variable yields an empty string.
    """
    configured = os.getenv("HUB_BRIDGE_TOKEN")
    if not configured:
        return ""
    return configured.strip()
def request_allowed(session_logged_in: bool, auth_disabled: bool) -> bool:
    """Decide whether the current Flask request may call the instance API.

    Access is granted when authentication is disabled, when the browser
    session is already logged in, or when the request carries an
    ``X-Hub-Token`` header equal to the configured hub bridge token.

    Args:
        session_logged_in: True when the caller's session is authenticated.
        auth_disabled: True when authentication is switched off entirely.

    Returns:
        True if the request is allowed, False otherwise.
    """
    if auth_disabled or session_logged_in:
        return True
    tok = hub_bridge_token()
    supplied = request.headers.get("X-Hub-Token")
    # An unconfigured token must never match, not even an empty header.
    if not tok or supplied is None:
        return False
    # Constant-time comparison so response timing does not reveal how much of
    # the secret matched. Compare bytes because compare_digest rejects
    # non-ASCII str; surrogateescape keeps undecodable environment bytes from
    # raising during the encode.
    return hmac.compare_digest(
        supplied.encode("utf-8", "surrogateescape"),
        tok.encode("utf-8", "surrogateescape"),
    )
def safe_next_path(raw: str | None) -> str:
    """Return *raw* when it is a same-site relative path, otherwise ``"/"``.

    Used to prevent open redirects. A value is accepted only if, after
    stripping surrounding whitespace, it starts with a single ``/``, contains
    no ``://``, no backslash and no ASCII control character.

    Args:
        raw: Requested redirect target; ``None`` or empty means ``"/"``.

    Returns:
        The stripped path if it is considered safe, else ``"/"``.
    """
    p = (raw or "/").strip()
    if not p.startswith("/") or p.startswith("//"):
        return "/"
    if "://" in p:
        return "/"
    # Browsers treat "\" like "/" and silently drop tab/CR/LF while parsing a
    # URL, so "/\host" or "/<TAB>/host" would be followed as "//host" — a
    # protocol-relative URL pointing at another site.
    if "\\" in p or any(ch < " " or ch == "\x7f" for ch in p):
        return "/"
    return p
def _sso_secret() -> str:
    """Return the key used to sign SSO tokens, i.e. the hub bridge token."""
    signing_key = hub_bridge_token()
    return signing_key
def _b64url_encode(data: bytes) -> str:
    """Encode *data* as URL-safe base64 text with trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    text = encoded.decode()
    return text.rstrip("=")
def _b64url_decode(data: str) -> bytes:
    """Decode URL-safe base64 text whose ``=`` padding may have been stripped."""
    # Number of "=" characters needed to reach a multiple of four.
    missing = -len(data) % 4
    padded = data + "=" * missing
    return base64.urlsafe_b64decode(padded)
def _prune_used_nonces() -> None:
    """Forget every recorded nonce whose stored timestamp is not in the future."""
    cutoff = time.time()
    with _nonce_lock:
        # Iterate over a snapshot so entries can be removed while scanning.
        for nonce, expires_at in list(_used_nonces.items()):
            if expires_at <= cutoff:
                del _used_nonces[nonce]
def mint_hub_sso_token(exchange_key: str, next_path: str = "/") -> str | None:
    """Issue a signed browser-SSO token for an instance.

    The token is ``<base64url(JSON claims)>.<hex HMAC-SHA256 of that text>``,
    keyed with the SSO secret. The claims hold the normalised exchange key
    (``ex``), an expiry timestamp (``exp``), a random ``nonce`` and the
    sanitised redirect path (``next``).

    Args:
        exchange_key: Identifier of the target instance; stripped and
            lower-cased before use. Per the original note it is expected to
            match the exchange used by hub_bridge ``install_on_app``.
        next_path: Path to redirect to after login; unsafe values become "/".

    Returns:
        The token string, or None when no secret is configured or the
        exchange key is empty.
    """
    signing_key = _sso_secret()
    exchange = (exchange_key or "").strip().lower()
    if not (signing_key and exchange):
        return None
    # The configured lifetime is never allowed to drop below one minute.
    lifetime = max(60, HUB_SSO_TTL_SEC)
    claims = {
        "ex": exchange,
        "exp": int(time.time()) + lifetime,
        "nonce": secrets.token_urlsafe(16),
        "next": safe_next_path(next_path),
    }
    claims_json = json.dumps(claims, separators=(",", ":"))
    body = _b64url_encode(claims_json.encode())
    digest = hmac.new(signing_key.encode(), body.encode(), hashlib.sha256)
    return body + "." + digest.hexdigest()
def verify_hub_sso_token(
    token: str | None, expected_exchange: str
) -> tuple[bool, str, str | None]:
    """Validate a single-use SSO token for the given instance.

    Checks, in order: secret and exchange are configured, the token has the
    ``body.signature`` shape, the HMAC-SHA256 signature matches, the payload
    is a JSON object, its ``ex`` equals *expected_exchange*, ``exp`` has not
    passed, and the ``nonce`` has not been accepted before.

    Args:
        token: Token text as received from the browser.
        expected_exchange: Exchange key of this instance; stripped and
            lower-cased before comparison.

    Returns:
        ``(True, next_path, None)`` on success, or ``(False, "/", reason)``
        on failure, where *reason* is a human-readable message.
    """
    secret = _sso_secret()
    expected = (expected_exchange or "").strip().lower()
    if not secret or not expected:
        return False, "/", "未配置 HUB_BRIDGE_TOKEN"
    raw = (token or "").strip()
    if "." not in raw:
        return False, "/", "token 无效"
    body, sig = raw.rsplit(".", 1)
    try:
        expect_sig = hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()
        # Compare as bytes: compare_digest raises on non-ASCII str, which would
        # otherwise be misreported as a parse failure instead of a bad signature.
        if not hmac.compare_digest(expect_sig.encode(), sig.encode()):
            return False, "/", "签名校验失败"
        # The body is only decoded after its signature has been verified.
        payload = json.loads(_b64url_decode(body).decode())
    except Exception:
        # Input is untrusted; any encoding/base64/JSON error means a bad token.
        return False, "/", "token 解析失败"
    if not isinstance(payload, dict):
        return False, "/", "payload 无效"
    if str(payload.get("ex") or "").lower() != expected:
        return False, "/", "实例不匹配"
    try:
        exp = int(payload.get("exp") or 0)
        # The token stays valid through the whole second `exp` (see the check
        # below), so the nonce must be remembered until the next second starts.
        # Storing plain `exp` let the pruner drop it up to a second early,
        # which allowed the same token to be replayed within that second.
        forget_after = float(exp) + 1.0
    except (TypeError, ValueError, OverflowError):
        return False, "/", "exp 无效"
    if exp < int(time.time()):
        return False, "/", "链接已过期"
    nonce = str(payload.get("nonce") or "")
    if not nonce:
        return False, "/", "nonce 缺失"
    _prune_used_nonces()
    # Membership test and insert happen under one lock so two concurrent
    # requests cannot both accept the same nonce.
    with _nonce_lock:
        if nonce in _used_nonces:
            return False, "/", "链接已使用"
        _used_nonces[nonce] = forget_after
    return True, safe_next_path(str(payload.get("next") or "/")), None