Files
2026-05-30 12:33:20 +08:00

167 lines
5.5 KiB
Python

"""
实例浏览器 SSO(复用 HUB_BRIDGE_TOKEN)。无 Flask 依赖,供中控 FastAPI 与各实例共用。
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import secrets
import threading
import time
# Requested lifetime, in seconds, of hub SSO tokens. Read once at import time
# from the HUB_SSO_TTL_SEC environment variable (default "7200").
HUB_SSO_TTL_SEC = int(os.environ.get("HUB_SSO_TTL_SEC", "7200"))

# Requested lifetime, in seconds, of iframe embed bootstrap tokens. Read once at
# import time from HUB_EMBED_BOOTSTRAP_TTL_SEC (default "120").
HUB_EMBED_BOOTSTRAP_TTL_SEC = int(os.environ.get("HUB_EMBED_BOOTSTRAP_TTL_SEC", "120"))

# Registry of already-consumed nonces: key -> expiry timestamp (seconds since
# the epoch) of the token that carried the nonce. Lives in this process only.
_used_nonces: dict[str, float] = {}

# Lock held around every access to _used_nonces.
_nonce_lock = threading.Lock()
def hub_bridge_token() -> str:
    """Return the HUB_BRIDGE_TOKEN environment value without surrounding whitespace.

    The variable is read on every call. An unset or empty variable yields ``""``.
    """
    configured = os.getenv("HUB_BRIDGE_TOKEN")
    if not configured:
        return ""
    return configured.strip()
def safe_next_path(raw: str | None) -> str:
    """Return *raw* when it is a safe same-site redirect path, otherwise ``"/"``.

    The value is stripped of surrounding whitespace first; ``None`` or an empty
    string is treated as ``"/"``. A path is accepted only when all of the
    following hold:

    * it starts with a single ``/`` (a leading ``//`` is protocol-relative);
    * it contains no ``://`` anywhere;
    * it contains no backslash;
    * it contains no ASCII control character (tab, CR, LF, DEL, ...).

    Args:
        raw: Caller-supplied redirect target, possibly untrusted.

    Returns:
        The stripped path if it passes every check, else ``"/"``.
    """
    candidate = (raw or "/").strip()
    if not candidate.startswith("/") or candidate.startswith("//"):
        return "/"
    if "://" in candidate:
        return "/"
    # Browsers treat "\" as "/" when parsing http(s) URLs, so "/\host" is
    # resolved like the protocol-relative "//host".
    if "\\" in candidate:
        return "/"
    # URL parsers drop tab/CR/LF before parsing, so "/<TAB>/host" also collapses
    # to "//host". No legitimate path needs raw control characters.
    if any(ch < " " or ch == "\x7f" for ch in candidate):
        return "/"
    return candidate
def _sso_secret() -> str:
    """Return the key used to sign hub tokens: the value of hub_bridge_token()."""
    signing_key = hub_bridge_token()
    return signing_key
def _b64url_encode(data: bytes) -> str:
    """Encode *data* as URL-safe base64 text with trailing ``=`` padding removed."""
    encoded = base64.urlsafe_b64encode(data)
    text = encoded.decode()
    return text.rstrip("=")
def _b64url_decode(data: str) -> bytes:
    """Decode URL-safe base64 text whose ``=`` padding may have been stripped."""
    # Number of "=" characters needed to reach a multiple of four.
    missing = (4 - len(data) % 4) % 4
    padded = data + "=" * missing
    return base64.urlsafe_b64decode(padded)
def _prune_used_nonces() -> None:
    """Remove every recorded nonce whose stored expiry is at or before now."""
    cutoff = time.time()
    with _nonce_lock:
        # Iterate over a snapshot so entries can be deleted while walking them.
        for nonce_key, expires_at in list(_used_nonces.items()):
            if expires_at <= cutoff:
                del _used_nonces[nonce_key]
def mint_hub_sso_token(exchange_key: str, next_path: str = "/") -> str | None:
    """Create a signed SSO token for the instance identified by *exchange_key*.

    The token has the form ``<body>.<sig>``: ``body`` is the unpadded URL-safe
    base64 of a compact JSON payload, and ``sig`` is the hex HMAC-SHA256 of
    that base64 text keyed with ``_sso_secret()``.

    Payload fields: ``ex`` (stripped, lower-cased exchange key), ``exp``
    (current time plus ``max(60, HUB_SSO_TTL_SEC)`` seconds), ``nonce``
    (random) and ``next`` (*next_path* passed through ``safe_next_path``).

    Returns:
        The token string, or ``None`` when the secret or the exchange key is
        empty.
    """
    signing_key = _sso_secret()
    exchange = (exchange_key or "").strip().lower()
    if not (signing_key and exchange):
        return None

    # Lifetime never drops below one minute, whatever the configured value.
    lifetime = max(60, HUB_SSO_TTL_SEC)
    claims = {
        "ex": exchange,
        "exp": int(time.time()) + lifetime,
        "nonce": secrets.token_urlsafe(16),
        "next": safe_next_path(next_path),
    }
    serialized = json.dumps(claims, separators=(",", ":"))
    body = _b64url_encode(serialized.encode())
    mac = hmac.new(signing_key.encode(), body.encode(), hashlib.sha256)
    return body + "." + mac.hexdigest()
def verify_hub_sso_token(
    token: str | None, expected_exchange: str
) -> tuple[bool, str, str | None]:
    """Validate and consume a token produced by ``mint_hub_sso_token``.

    Checks, in order: secret and expected exchange are configured; the token
    has a ``<body>.<sig>`` shape; the HMAC-SHA256 signature matches; the
    payload is a JSON object; the payload is not an embed bootstrap token;
    ``ex`` equals *expected_exchange* (case-insensitive); ``exp`` is not in the
    past; the nonce is present and has not been seen before. On success the
    nonce is recorded until ``exp`` so the same token cannot be accepted twice
    by this process.

    Args:
        token: The raw token string (surrounding whitespace is ignored).
        expected_exchange: Exchange key the token must have been minted for.

    Returns:
        ``(ok, next_path, error)``. On success ``ok`` is ``True``, ``next_path``
        is the sanitized redirect path from the payload and ``error`` is
        ``None``. On failure ``ok`` is ``False``, ``next_path`` is ``"/"`` and
        ``error`` is a human-readable message.
    """
    secret = _sso_secret()
    expected = (expected_exchange or "").strip().lower()
    if not secret or not expected:
        return False, "/", "未配置 HUB_BRIDGE_TOKEN"
    raw = (token or "").strip()
    if "." not in raw:
        return False, "/", "token 无效"
    body, sig = raw.rsplit(".", 1)
    try:
        expect_sig = hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()
        if not hmac.compare_digest(expect_sig, sig):
            return False, "/", "签名校验失败"
        payload = json.loads(_b64url_decode(body).decode())
    except Exception:
        # Deliberately broad: any encoding, comparison or decoding failure on
        # attacker-supplied text is reported as an unparsable token.
        return False, "/", "token 解析失败"
    if not isinstance(payload, dict):
        return False, "/", "payload 无效"
    # Embed bootstrap tokens share this signing key and format but are tracked
    # under a separate nonce key ("embed:<nonce>"). Accepting one here would let
    # the same token be redeemed once through each verifier.
    if payload.get("kind") == "embed":
        return False, "/", "token 类型无效"
    if str(payload.get("ex") or "").lower() != expected:
        return False, "/", "实例不匹配"
    try:
        exp = int(payload.get("exp") or 0)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: int() of a non-finite JSON number such as Infinity.
        return False, "/", "exp 无效"
    if exp < int(time.time()):
        return False, "/", "链接已过期"
    nonce = str(payload.get("nonce") or "")
    if not nonce:
        return False, "/", "nonce 缺失"
    _prune_used_nonces()
    # Check-and-record under one lock acquisition so two concurrent requests
    # carrying the same token cannot both pass.
    with _nonce_lock:
        if nonce in _used_nonces:
            return False, "/", "链接已使用"
        _used_nonces[nonce] = float(exp)
    return True, safe_next_path(str(payload.get("next") or "/")), None
def mint_hub_embed_bootstrap(exchange_key: str, next_path: str = "/") -> str | None:
    """Create an iframe embed login bootstrap token (short-lived, single use).

    Intended for /hub-embed-auth, which writes a SameSite=None cookie.

    The token has the form ``<body>.<sig>``: ``body`` is the unpadded URL-safe
    base64 of a compact JSON payload, and ``sig`` is the hex HMAC-SHA256 of
    that base64 text keyed with ``_sso_secret()``. The payload carries
    ``kind`` = ``"embed"``, ``ex`` (stripped, lower-cased exchange key),
    ``exp`` (current time plus ``max(30, HUB_EMBED_BOOTSTRAP_TTL_SEC)``
    seconds), a random ``nonce`` and ``next`` (*next_path* passed through
    ``safe_next_path``).

    Returns:
        The token string, or ``None`` when the secret or the exchange key is
        empty.
    """
    signing_key = _sso_secret()
    exchange = (exchange_key or "").strip().lower()
    if not (signing_key and exchange):
        return None

    # Lifetime never drops below thirty seconds, whatever the configured value.
    lifetime = max(30, HUB_EMBED_BOOTSTRAP_TTL_SEC)
    claims = {
        "kind": "embed",
        "ex": exchange,
        "exp": int(time.time()) + lifetime,
        "nonce": secrets.token_urlsafe(16),
        "next": safe_next_path(next_path),
    }
    serialized = json.dumps(claims, separators=(",", ":"))
    body = _b64url_encode(serialized.encode())
    mac = hmac.new(signing_key.encode(), body.encode(), hashlib.sha256)
    return body + "." + mac.hexdigest()
def verify_hub_embed_bootstrap(
    token: str | None, expected_exchange: str
) -> tuple[bool, str, str | None]:
    """Validate and consume a token produced by ``mint_hub_embed_bootstrap``.

    Checks, in order: secret and expected exchange are configured; the token
    has a ``<body>.<sig>`` shape; the HMAC-SHA256 signature matches; the
    payload is a JSON object with ``kind`` equal to ``"embed"``; ``ex`` equals
    *expected_exchange* (case-insensitive); ``exp`` is not in the past; the
    nonce is present and has not been seen before. On success the nonce is
    recorded under the key ``embed:<nonce>`` until ``exp``.

    Returns:
        ``(ok, next_path, error)``. On success ``ok`` is ``True``, ``next_path``
        is the sanitized redirect path from the payload and ``error`` is
        ``None``. On failure ``ok`` is ``False``, ``next_path`` is ``"/"`` and
        ``error`` is a human-readable message.
    """
    signing_key = _sso_secret()
    wanted_exchange = (expected_exchange or "").strip().lower()
    if not (signing_key and wanted_exchange):
        return False, "/", "未配置 HUB_BRIDGE_TOKEN"

    candidate = (token or "").strip()
    if "." not in candidate:
        return False, "/", "token 无效"
    # Split on the last dot: everything before it is the signed body.
    body, _, presented_sig = candidate.rpartition(".")

    try:
        mac = hmac.new(signing_key.encode(), body.encode(), hashlib.sha256)
        if not hmac.compare_digest(mac.hexdigest(), presented_sig):
            return False, "/", "签名校验失败"
        decoded = _b64url_decode(body).decode()
        claims = json.loads(decoded)
    except Exception:
        # Any encoding, comparison or decoding failure is reported as an
        # unparsable token.
        return False, "/", "token 解析失败"

    if not isinstance(claims, dict):
        return False, "/", "token 类型无效"
    if claims.get("kind") != "embed":
        return False, "/", "token 类型无效"

    token_exchange = str(claims.get("ex") or "").lower()
    if token_exchange != wanted_exchange:
        return False, "/", "实例不匹配"

    try:
        expires_at = int(claims.get("exp") or 0)
    except (TypeError, ValueError):
        return False, "/", "exp 无效"
    if expires_at < int(time.time()):
        return False, "/", "链接已过期"

    nonce = str(claims.get("nonce") or "")
    if not nonce:
        return False, "/", "nonce 缺失"

    # Embed nonces are stored under their own prefix in the shared registry.
    registry_key = f"embed:{nonce}"
    _prune_used_nonces()
    with _nonce_lock:
        if registry_key in _used_nonces:
            return False, "/", "链接已使用"
        _used_nonces[registry_key] = float(expires_at)

    redirect_to = safe_next_path(str(claims.get("next") or "/"))
    return True, redirect_to, None