Files
crypto_monitor/manual_trading_hub/hub_web_auth.py
T
dekun bfa3352122 feat: 系统设置增加备份恢复与默认登录 admin
支持手动/每日自动备份四所数据库、K线库与 env,上传 zip 一键恢复;中控默认账号 admin/admin123。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-07-02 16:39:46 +08:00

183 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""中控 Web 登录:HUB_USERNAME + HUB_PASSWORD 配置后启用会话 Cookie。"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import time
from secrets import compare_digest
SESSION_COOKIE = "hub_sess"
SESSION_MAX_AGE_SEC = max(3600, int(os.getenv("HUB_SESSION_DAYS", "7")) * 86400)
DEFAULT_USERNAME = "admin"
DEFAULT_PASSWORD = "admin123"
def _env_username() -> str:
return (os.getenv("HUB_USERNAME") or "").strip()
def _env_password() -> str:
raw = (os.getenv("HUB_PASSWORD") or "").strip()
return raw or DEFAULT_PASSWORD
def password_required() -> bool:
"""默认启用登录(admin / admin123,可通过 .env 覆盖)。"""
return True
def expected_username() -> str:
return _env_username() or DEFAULT_USERNAME
def verify_credentials(username: str, password: str) -> bool:
u_ok = compare_digest(expected_username(), (username or "").strip())
p_ok = compare_digest(_env_password(), (password or "").strip())
return u_ok and p_ok
def verify_password(password: str) -> bool:
"""兼容旧调用:仅校验密码、用户名用默认值。"""
return verify_credentials(expected_username(), password)
def _secret() -> bytes:
raw = (os.getenv("HUB_SESSION_SECRET") or "").strip()
if not raw:
raw = "|".join(p for p in [_env_username(), _env_password()] if p) or "hub-dev-insecure"
return raw.encode("utf-8")
def _b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
def _b64url_decode(text: str) -> bytes:
pad = "=" * (-len(text) % 4)
return base64.urlsafe_b64decode(text + pad)
def create_session_token(username: str | None = None) -> str:
payload = {
"exp": int(time.time()) + SESSION_MAX_AGE_SEC,
"v": 2,
"u": (username or expected_username()).strip(),
}
body = _b64url_encode(json.dumps(payload, separators=(",", ":")).encode("utf-8"))
sig = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256).hexdigest()
return f"{body}.{sig}"
def validate_session_token(token: str | None) -> bool:
if not token or "." not in token:
return False
body, sig = token.rsplit(".", 1)
expected = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256).hexdigest()
if not compare_digest(expected, sig):
return False
try:
payload = json.loads(_b64url_decode(body))
except Exception:
return False
exp = int(payload.get("exp") or 0)
if exp <= int(time.time()):
return False
sess_user = (payload.get("u") or "").strip()
if sess_user and not compare_digest(sess_user, expected_username()):
return False
return True
def cookie_secure_env_enabled() -> bool:
"""是否在 .env 中启用「HTTPS 时带 Secure Cookie」策略。"""
return (os.getenv("HUB_COOKIE_SECURE") or "").strip().lower() in (
"1",
"true",
"yes",
"on",
)
def cookie_secure_for_request(request) -> bool:
"""
仅在实际 HTTPS 访问时设置 Secure Cookie。
这样可同时支持:域名 HTTPS 反代 + 内网 http://IP:5100 登录。
"""
if not cookie_secure_env_enabled():
return False
proto = (
(request.headers.get("x-forwarded-proto") or request.url.scheme or "http")
.split(",")[0]
.strip()
.lower()
)
return proto == "https"
def embed_allowed() -> bool:
"""允许被本地导航等页面 iframe 嵌入(默认开启,内网场景)。"""
return (os.getenv("HUB_ALLOW_EMBED") or "true").strip().lower() in (
"1",
"true",
"yes",
"on",
)
def embed_frame_ancestors() -> str:
"""CSP frame-ancestors;默认 *,可设 HUB_EMBED_ORIGINS=http://192.168.8.6:5070"""
raw = (os.getenv("HUB_EMBED_ORIGINS") or "*").strip()
if raw == "*":
return "*"
origins = [o.strip() for o in raw.split(",") if o.strip()]
return " ".join(origins) if origins else "*"
def set_session_cookie(response, request, token: str, *, embed: bool = False) -> None:
"""
embed=TrueLocalNav 等跨站 iframe 嵌入时须 SameSite=None + Secure(仅 HTTPS 有效)。
"""
secure = cookie_secure_for_request(request)
samesite = "lax"
if embed:
secure = True
samesite = "none"
response.set_cookie(
SESSION_COOKIE,
token,
httponly=True,
samesite=samesite,
path="/",
max_age=SESSION_MAX_AGE_SEC,
secure=secure,
)
def clear_session_cookie(response, request, *, embed: bool = False) -> None:
secure = cookie_secure_for_request(request)
samesite = "lax"
if embed:
secure = True
samesite = "none"
response.delete_cookie(
SESSION_COOKIE,
path="/",
secure=secure,
samesite=samesite,
)
def is_public_path(path: str, method: str) -> bool:
p = (path or "").split("?")[0].rstrip("/") or "/"
if p.startswith("/assets"):
return True
if p in ("/login", "/embed-auth", "/api/auth/login", "/api/auth/status", "/api/ping"):
return True
if p == "/api/auth/logout" and method.upper() == "POST":
return True
return False