This commit is contained in:
dekun
2026-05-30 16:01:35 +08:00
parent 979054546c
commit cdbe087202
6 changed files with 235 additions and 20 deletions
+32
View File
@@ -246,9 +246,41 @@ async def login_post(request: Request, body: LoginBody) -> JSONResponse | Redire
if body.username.strip() != settings.auth.username.strip() or _hash_password(body.password) != _password_hash(): if body.username.strip() != settings.auth.username.strip() or _hash_password(body.password) != _password_hash():
return JSONResponse({"ok": False, "detail": "账号或密码错误"}, status_code=401) return JSONResponse({"ok": False, "detail": "账号或密码错误"}, status_code=401)
request.session["logged_in"] = True request.session["logged_in"] = True
embed_hdr = (request.headers.get("x-nav-embed") or "").strip() == "1"
try:
from nav_session_auth import create_embed_bootstrap_token, nav_embed_session_active, safe_next_path
from urllib.parse import urlencode
if embed_hdr or nav_embed_session_active():
nxt = safe_next_path("/dashboard")
boot = create_embed_bootstrap_token(body.username.strip(), secret=settings.app.session_secret)
q = urlencode({"token": boot, "next": nxt, "embed": "1"})
return JSONResponse(
{
"ok": True,
"redirect": nxt,
"session_token": boot,
"embed_auth_url": f"/embed-auth?{q}",
}
)
except Exception:
pass
return JSONResponse({"ok": True, "redirect": "/dashboard"}) return JSONResponse({"ok": True, "redirect": "/dashboard"})
@app.get("/embed-auth", response_model=None)
async def embed_auth(request: Request, token: str = "", next: str = "/dashboard") -> RedirectResponse:
from nav_session_auth import safe_next_path, validate_embed_bootstrap_token
if not settings.auth.enabled:
return RedirectResponse(safe_next_path(next), status_code=302)
ok, _user = validate_embed_bootstrap_token(token, secret=settings.app.session_secret)
if ok:
request.session["logged_in"] = True
return RedirectResponse(safe_next_path(next), status_code=302)
return RedirectResponse("/login", status_code=302)
@app.get("/logout", response_model=None) @app.get("/logout", response_model=None)
async def logout(request: Request) -> RedirectResponse: async def logout(request: Request) -> RedirectResponse:
request.session.clear() request.session.clear()
+12 -3
View File
@@ -78,7 +78,10 @@
var password = fd.get("password"); var password = fd.get("password");
fetch("/login", { fetch("/login", {
method: "POST", method: "POST",
headers: { "Content-Type": "application/json" }, headers: {
"Content-Type": "application/json",
"X-Nav-Embed": window.self !== window.top ? "1" : "0",
},
credentials: "same-origin", credentials: "same-origin",
body: JSON.stringify({ username: username, password: password }), body: JSON.stringify({ username: username, password: password }),
}) })
@@ -88,10 +91,16 @@
}); });
}) })
.then(function (x) { .then(function (x) {
if (x.ok && x.body && x.body.redirect) { if (x.ok && x.body) {
window.location.href = x.body.redirect; var dest =
window.self !== window.top && x.body.embed_auth_url
? x.body.embed_auth_url
: x.body.redirect || "/dashboard";
if (dest) {
window.location.href = dest;
return; return;
} }
}
errEl.textContent = (x.body && x.body.detail) || "登录失败"; errEl.textContent = (x.body && x.body.detail) || "登录失败";
}) })
.catch(function () { .catch(function () {
+55 -13
View File
@@ -3,6 +3,9 @@
from __future__ import annotations from __future__ import annotations
import os import os
import re
from nav_session_auth import nav_embed_session_active, request_is_https
def nav_embed_allowed() -> bool: def nav_embed_allowed() -> bool:
@@ -19,26 +22,33 @@ def nav_embed_origins() -> str:
def nav_session_middleware_kwargs() -> dict: def nav_session_middleware_kwargs() -> dict:
""" """跨站 iframeSameSite=Nonehttps_only 保持 False,由响应 patch 补 Secure。"""
LocalNav 等跨站 iframe 内登录须 SameSite=None + Secure(仅 HTTPS 站点有效)。 if not nav_embed_session_active():
NAV_EMBED_SESSION=1 强制开启;auto 时在配置了 NAV_EMBED_ORIGINS 时开启。
"""
raw = (os.getenv("NAV_EMBED_SESSION") or "auto").strip().lower()
if raw in ("0", "false", "no", "off"):
return {"same_site": "lax", "https_only": False}
if raw in ("1", "true", "yes", "on"):
return {"same_site": "none", "https_only": True}
if raw == "auto":
origins = nav_embed_origins()
if origins and origins != "*":
return {"same_site": "none", "https_only": True}
return {"same_site": "lax", "https_only": False} return {"same_site": "lax", "https_only": False}
return {"same_site": "none", "https_only": False}
def install_proxy_headers(app) -> None:
if (os.getenv("NAV_TRUST_PROXY") or "").strip().lower() not in (
"1",
"true",
"yes",
"on",
):
return
try:
from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware
app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*")
except Exception:
pass
def install_nav_embed(app) -> None: def install_nav_embed(app) -> None:
if not nav_embed_allowed(): if not nav_embed_allowed():
return return
origins = nav_embed_origins() origins = nav_embed_origins()
install_proxy_headers(app)
@app.middleware("http") @app.middleware("http")
async def _nav_embed_frame_headers(request, call_next): async def _nav_embed_frame_headers(request, call_next):
@@ -48,4 +58,36 @@ def install_nav_embed(app) -> None:
else: else:
parts = " ".join(o.strip() for o in origins.split(",") if o.strip()) parts = " ".join(o.strip() for o in origins.split(",") if o.strip())
response.headers["Content-Security-Policy"] = f"frame-ancestors 'self' {parts}" response.headers["Content-Security-Policy"] = f"frame-ancestors 'self' {parts}"
if nav_embed_session_active():
_patch_set_cookie_for_embed(response, request)
return response return response
def _patch_set_cookie_for_embed(response, request) -> None:
"""HTTPS 访问时强制 session Cookie 为 SameSite=None; Secure。"""
force = (os.getenv("NAV_EMBED_FORCE_SECURE") or "1").strip().lower() in (
"1",
"true",
"yes",
"on",
)
if not request_is_https(request) and not force:
return
raw = response.headers.get("set-cookie")
if not raw:
return
def _fix_one(part: str) -> str:
p = part.strip()
if not p or "session=" not in p.lower():
return p
p = re.sub(r";\s*SameSite=[^;]*", "", p, flags=re.I)
p = re.sub(r";\s*Secure(?=;|$)", "", p, flags=re.I)
p += "; Secure; SameSite=none"
return p
if "," in raw and "session=" in raw.lower():
parts = re.split(r",(?=\s*[^;,]+=)", raw)
response.headers["set-cookie"] = ", ".join(_fix_one(x) for x in parts)
else:
response.headers["set-cookie"] = _fix_one(raw)
+87
View File
@@ -0,0 +1,87 @@
"""iframe 嵌入登录(LocalNav 代签 + /embed-auth 写 SameSite=None Cookie)。"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import time
from secrets import compare_digest
EMBED_BOOTSTRAP_TTL_SEC = int(os.getenv("NAV_EMBED_BOOTSTRAP_TTL_SEC", "120"))
def _secret(explicit: str | None = None) -> bytes:
raw = (explicit or os.getenv("NAV_SESSION_SECRET") or "").strip()
if not raw:
raw = "gate-scout-nav-embed-insecure"
return raw.encode("utf-8")
def _b64url_encode(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=")
def _b64url_decode(text: str) -> bytes:
pad = "=" * (-len(text) % 4)
return base64.urlsafe_b64decode(text + pad)
def create_embed_bootstrap_token(username: str, *, secret: str | None = None) -> str:
"""短效 token,供 /embed-auth 在 iframe 内写入 session。"""
payload = {
"kind": "embed",
"exp": int(time.time()) + max(30, EMBED_BOOTSTRAP_TTL_SEC),
"u": (username or "admin").strip(),
}
body = _b64url_encode(json.dumps(payload, separators=(",", ":")).encode("utf-8"))
sig = hmac.new(_secret(secret), body.encode("ascii"), hashlib.sha256).hexdigest()
return f"{body}.{sig}"
def validate_embed_bootstrap_token(token: str | None, *, secret: str | None = None) -> tuple[bool, str]:
raw = (token or "").strip()
if not raw or "." not in raw:
return False, ""
body, sig = raw.rsplit(".", 1)
try:
expect = hmac.new(_secret(secret), body.encode("ascii"), hashlib.sha256).hexdigest()
if not compare_digest(expect, sig):
return False, ""
payload = json.loads(_b64url_decode(body))
except Exception:
return False, ""
if not isinstance(payload, dict) or payload.get("kind") != "embed":
return False, ""
if int(payload.get("exp") or 0) <= int(time.time()):
return False, ""
return True, str(payload.get("u") or "admin").strip()
def safe_next_path(raw: str | None) -> str:
p = (raw or "/dashboard").strip() or "/dashboard"
if not p.startswith("/") or p.startswith("//") or "://" in p:
return "/dashboard"
return p
def request_is_https(request) -> bool:
proto = (
(request.headers.get("x-forwarded-proto") or getattr(request.url, "scheme", "http") or "http")
.split(",")[0]
.strip()
.lower()
)
return proto == "https"
def nav_embed_session_active() -> bool:
raw = (os.getenv("NAV_EMBED_SESSION") or "auto").strip().lower()
if raw in ("1", "true", "yes", "on"):
return True
if raw in ("0", "false", "no", "off"):
return False
origins = (os.getenv("NAV_EMBED_ORIGINS") or "").strip()
return bool(origins and origins != "*")
+38
View File
@@ -187,11 +187,19 @@ def create_app(settings: Settings) -> FastAPI:
if str(_root) not in sys.path: if str(_root) not in sys.path:
sys.path.insert(0, str(_root)) sys.path.insert(0, str(_root))
from nav_embed import install_nav_embed, nav_session_middleware_kwargs from nav_embed import install_nav_embed, nav_session_middleware_kwargs
from nav_session_auth import (
create_embed_bootstrap_token,
nav_embed_session_active,
safe_next_path,
)
install_nav_embed(app) install_nav_embed(app)
_sess_kw = nav_session_middleware_kwargs() _sess_kw = nav_session_middleware_kwargs()
except Exception: except Exception:
_sess_kw = {"same_site": "lax", "https_only": False} _sess_kw = {"same_site": "lax", "https_only": False}
nav_embed_session_active = lambda: False # type: ignore
create_embed_bootstrap_token = None # type: ignore
safe_next_path = lambda raw=None: "/dashboard" # type: ignore
app.add_middleware(GZipMiddleware, minimum_size=800) app.add_middleware(GZipMiddleware, minimum_size=800)
app.add_middleware( app.add_middleware(
SessionMiddleware, SessionMiddleware,
@@ -329,14 +337,44 @@ def create_app(settings: Settings) -> FastAPI:
return JSONResponse({"ok": False, "detail": "请求格式错误"}, status_code=400) return JSONResponse({"ok": False, "detail": "请求格式错误"}, status_code=400)
username = str(body.get("username") or "").strip() username = str(body.get("username") or "").strip()
password = str(body.get("password") or "") password = str(body.get("password") or "")
embed = (request.headers.get("x-nav-embed") or "").strip() == "1" or str(
body.get("embed") or ""
).strip().lower() in ("1", "true", "yes")
ok_user = username == app.state.auth_user ok_user = username == app.state.auth_user
ok_pass = _hash_password(password) == app.state.auth_password_hash ok_pass = _hash_password(password) == app.state.auth_password_hash
if ok_user and ok_pass: if ok_user and ok_pass:
request.session["logged_in"] = True request.session["logged_in"] = True
request.session["username"] = username request.session["username"] = username
if embed or nav_embed_session_active():
from urllib.parse import urlencode
nxt = safe_next_path(str(body.get("next") or "/dashboard"))
boot = create_embed_bootstrap_token(username, secret=settings.app.session_secret)
q = urlencode({"token": boot, "next": nxt, "embed": "1"})
return JSONResponse(
{
"ok": True,
"redirect": nxt,
"session_token": boot,
"embed_auth_url": f"/embed-auth?{q}",
}
)
return JSONResponse({"ok": True, "redirect": "/dashboard"}) return JSONResponse({"ok": True, "redirect": "/dashboard"})
return JSONResponse({"ok": False, "detail": "用户名或密码错误"}, status_code=401) return JSONResponse({"ok": False, "detail": "用户名或密码错误"}, status_code=401)
@app.get("/embed-auth")
async def embed_auth(request: Request, token: str = "", next: str = "/dashboard") -> RedirectResponse:
from nav_session_auth import safe_next_path, validate_embed_bootstrap_token
if not settings.auth.enabled:
return RedirectResponse(safe_next_path(next), status_code=302)
ok, username = validate_embed_bootstrap_token(token, secret=settings.app.session_secret)
if ok:
request.session["logged_in"] = True
request.session["username"] = username or settings.auth.username
return RedirectResponse(safe_next_path(next), status_code=302)
return RedirectResponse("/login?embed=1", status_code=302)
@app.get("/logout") @app.get("/logout")
async def logout(request: Request) -> RedirectResponse: async def logout(request: Request) -> RedirectResponse:
request.session.clear() request.session.clear()
+9 -2
View File
@@ -51,6 +51,7 @@
body: JSON.stringify({ body: JSON.stringify({
username: fd.get("username"), username: fd.get("username"),
password: fd.get("password"), password: fd.get("password"),
embed: window.self !== window.top ? "1" : "0",
}), }),
}) })
.then(function (r) { .then(function (r) {
@@ -59,10 +60,16 @@
}); });
}) })
.then(function (x) { .then(function (x) {
if (x.ok && x.body && x.body.redirect) { if (x.ok && x.body) {
window.location.href = x.body.redirect; var dest =
window.self !== window.top && x.body.embed_auth_url
? x.body.embed_auth_url
: x.body.redirect || "/dashboard";
if (dest) {
window.location.href = dest;
return; return;
} }
}
if (errEl) { if (errEl) {
errEl.textContent = (x.body && x.body.detail) || "登录失败"; errEl.textContent = (x.body && x.body.detail) || "登录失败";
} }