增加用户名
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
"""中控 Web 登录:HUB_PASSWORD 非空时启用会话 Cookie。"""
|
||||
"""中控 Web 登录:HUB_USERNAME + HUB_PASSWORD 配置后启用会话 Cookie。"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
@@ -12,23 +12,43 @@ from secrets import compare_digest
|
||||
|
||||
SESSION_COOKIE = "hub_sess"
|
||||
SESSION_MAX_AGE_SEC = max(3600, int(os.getenv("HUB_SESSION_DAYS", "7")) * 86400)
|
||||
DEFAULT_USERNAME = "admin"
|
||||
|
||||
|
||||
def _env_username() -> str:
|
||||
return (os.getenv("HUB_USERNAME") or "").strip()
|
||||
|
||||
|
||||
def _env_password() -> str:
|
||||
return (os.getenv("HUB_PASSWORD") or "").strip()
|
||||
|
||||
|
||||
def password_required() -> bool:
|
||||
return bool((os.getenv("HUB_PASSWORD") or "").strip())
|
||||
"""已配置密码即要求登录(用户名未设时默认 admin)。"""
|
||||
return bool(_env_password())
|
||||
|
||||
|
||||
def expected_username() -> str:
|
||||
return _env_username() or DEFAULT_USERNAME
|
||||
|
||||
|
||||
def verify_credentials(username: str, password: str) -> bool:
|
||||
if not _env_password():
|
||||
return True
|
||||
u_ok = compare_digest(expected_username(), (username or "").strip())
|
||||
p_ok = compare_digest(_env_password(), (password or "").strip())
|
||||
return u_ok and p_ok
|
||||
|
||||
|
||||
def verify_password(password: str) -> bool:
|
||||
expected = (os.getenv("HUB_PASSWORD") or "").strip()
|
||||
if not expected:
|
||||
return True
|
||||
return compare_digest(expected, (password or "").strip())
|
||||
"""兼容旧调用:仅校验密码、用户名用默认值。"""
|
||||
return verify_credentials(expected_username(), password)
|
||||
|
||||
|
||||
def _secret() -> bytes:
|
||||
raw = (os.getenv("HUB_SESSION_SECRET") or os.getenv("HUB_PASSWORD") or "").strip()
|
||||
raw = (os.getenv("HUB_SESSION_SECRET") or "").strip()
|
||||
if not raw:
|
||||
return b"hub-dev-insecure"
|
||||
raw = "|".join(p for p in [_env_username(), _env_password()] if p) or "hub-dev-insecure"
|
||||
return raw.encode("utf-8")
|
||||
|
||||
|
||||
@@ -41,8 +61,12 @@ def _b64url_decode(text: str) -> bytes:
|
||||
return base64.urlsafe_b64decode(text + pad)
|
||||
|
||||
|
||||
def create_session_token() -> str:
|
||||
payload = {"exp": int(time.time()) + SESSION_MAX_AGE_SEC, "v": 1}
|
||||
def create_session_token(username: str | None = None) -> str:
|
||||
payload = {
|
||||
"exp": int(time.time()) + SESSION_MAX_AGE_SEC,
|
||||
"v": 2,
|
||||
"u": (username or expected_username()).strip(),
|
||||
}
|
||||
body = _b64url_encode(json.dumps(payload, separators=(",", ":")).encode("utf-8"))
|
||||
sig = hmac.new(_secret(), body.encode("ascii"), hashlib.sha256).hexdigest()
|
||||
return f"{body}.{sig}"
|
||||
@@ -60,7 +84,12 @@ def validate_session_token(token: str | None) -> bool:
|
||||
except Exception:
|
||||
return False
|
||||
exp = int(payload.get("exp") or 0)
|
||||
return exp > int(time.time())
|
||||
if exp <= int(time.time()):
|
||||
return False
|
||||
sess_user = (payload.get("u") or "").strip()
|
||||
if sess_user and not compare_digest(sess_user, expected_username()):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def cookie_secure() -> bool:
|
||||
|
||||
Reference in New Issue
Block a user