diff --git a/gate_order_executor/app/main.py b/gate_order_executor/app/main.py index e6053cd..bf73b91 100644 --- a/gate_order_executor/app/main.py +++ b/gate_order_executor/app/main.py @@ -246,9 +246,41 @@ async def login_post(request: Request, body: LoginBody) -> JSONResponse | Redire if body.username.strip() != settings.auth.username.strip() or _hash_password(body.password) != _password_hash(): return JSONResponse({"ok": False, "detail": "账号或密码错误"}, status_code=401) request.session["logged_in"] = True + embed_hdr = (request.headers.get("x-nav-embed") or "").strip() == "1" + try: + from nav_session_auth import create_embed_bootstrap_token, nav_embed_session_active, safe_next_path + from urllib.parse import urlencode + + if embed_hdr or nav_embed_session_active(): + nxt = safe_next_path("/dashboard") + boot = create_embed_bootstrap_token(body.username.strip(), secret=settings.app.session_secret) + q = urlencode({"token": boot, "next": nxt, "embed": "1"}) + return JSONResponse( + { + "ok": True, + "redirect": nxt, + "session_token": boot, + "embed_auth_url": f"/embed-auth?{q}", + } + ) + except Exception: + pass return JSONResponse({"ok": True, "redirect": "/dashboard"}) +@app.get("/embed-auth", response_model=None) +async def embed_auth(request: Request, token: str = "", next: str = "/dashboard") -> RedirectResponse: + from nav_session_auth import safe_next_path, validate_embed_bootstrap_token + + if not settings.auth.enabled: + return RedirectResponse(safe_next_path(next), status_code=302) + ok, _user = validate_embed_bootstrap_token(token, secret=settings.app.session_secret) + if ok: + request.session["logged_in"] = True + return RedirectResponse(safe_next_path(next), status_code=302) + return RedirectResponse("/login", status_code=302) + + @app.get("/logout", response_model=None) async def logout(request: Request) -> RedirectResponse: request.session.clear() diff --git a/gate_order_executor/templates/login.html b/gate_order_executor/templates/login.html index 0ab09ce..1513d71 100644 --- a/gate_order_executor/templates/login.html +++ b/gate_order_executor/templates/login.html @@ -78,7 +78,10 @@ var password = fd.get("password"); fetch("/login", { method: "POST", - headers: { "Content-Type": "application/json" }, + headers: { + "Content-Type": "application/json", + "X-Nav-Embed": window.self !== window.top ? "1" : "0", + }, credentials: "same-origin", body: JSON.stringify({ username: username, password: password }), }) @@ -88,9 +91,15 @@ }); }) .then(function (x) { - if (x.ok && x.body && x.body.redirect) { - window.location.href = x.body.redirect; - return; + if (x.ok && x.body) { + var dest = + window.self !== window.top && x.body.embed_auth_url + ? x.body.embed_auth_url + : x.body.redirect || "/dashboard"; + if (dest) { + window.location.href = dest; + return; + } } errEl.textContent = (x.body && x.body.detail) || "登录失败"; }) diff --git a/nav_embed.py b/nav_embed.py index 05730b8..768c63c 100644 --- a/nav_embed.py +++ b/nav_embed.py @@ -3,6 +3,9 @@ from __future__ import annotations import os +import re + +from nav_session_auth import nav_embed_session_active, request_is_https def nav_embed_allowed() -> bool: @@ -19,26 +22,33 @@ def nav_embed_origins() -> str: def nav_session_middleware_kwargs() -> dict: - """ - LocalNav 等跨站 iframe 内登录须 SameSite=None + Secure(仅 HTTPS 站点有效)。 - NAV_EMBED_SESSION=1 强制开启;auto 时在配置了 NAV_EMBED_ORIGINS 时开启。 - """ - raw = (os.getenv("NAV_EMBED_SESSION") or "auto").strip().lower() - if raw in ("0", "false", "no", "off"): + """跨站 iframe:SameSite=None;https_only 保持 False,由响应 patch 补 Secure。""" + if not nav_embed_session_active(): return {"same_site": "lax", "https_only": False} - if raw in ("1", "true", "yes", "on"): - return {"same_site": "none", "https_only": True} - if raw == "auto": - origins = nav_embed_origins() - if origins and origins != "*": - return {"same_site": "none", "https_only": True} - return {"same_site": "lax", "https_only": False} + return {"same_site": "none", "https_only": False} + + +def install_proxy_headers(app) -> None: + if (os.getenv("NAV_TRUST_PROXY") or "").strip().lower() not in ( + "1", + "true", + "yes", + "on", + ): + return + try: + from uvicorn.middleware.proxy_headers import ProxyHeadersMiddleware + + app.add_middleware(ProxyHeadersMiddleware, trusted_hosts="*") + except Exception: + pass def install_nav_embed(app) -> None: if not nav_embed_allowed(): return origins = nav_embed_origins() + install_proxy_headers(app) @app.middleware("http") async def _nav_embed_frame_headers(request, call_next): @@ -48,4 +58,36 @@ def install_nav_embed(app) -> None: else: parts = " ".join(o.strip() for o in origins.split(",") if o.strip()) response.headers["Content-Security-Policy"] = f"frame-ancestors 'self' {parts}" + if nav_embed_session_active(): + _patch_set_cookie_for_embed(response, request) return response + + +def _patch_set_cookie_for_embed(response, request) -> None: + """HTTPS 访问时强制 session Cookie 为 SameSite=None; Secure。""" + force = (os.getenv("NAV_EMBED_FORCE_SECURE") or "1").strip().lower() in ( + "1", + "true", + "yes", + "on", + ) + if not request_is_https(request) and not force: + return + raw = response.headers.get("set-cookie") + if not raw: + return + + def _fix_one(part: str) -> str: + p = part.strip() + if not p or "session=" not in p.lower(): + return p + p = re.sub(r";\s*SameSite=[^;]*", "", p, flags=re.I) + p = re.sub(r";\s*Secure(?=;|$)", "", p, flags=re.I) + p += "; Secure; SameSite=none" + return p + + if "," in raw and "session=" in raw.lower(): + parts = re.split(r",(?=\s*[^;,]+=)", raw) + response.headers["set-cookie"] = ", ".join(_fix_one(x) for x in parts) + else: + response.headers["set-cookie"] = _fix_one(raw) diff --git a/nav_session_auth.py b/nav_session_auth.py new file mode 100644 index 0000000..4d82e6a --- /dev/null +++ b/nav_session_auth.py @@ -0,0 +1,87 @@ +"""iframe 嵌入登录(LocalNav 代签 + /embed-auth 写 SameSite=None Cookie)。""" + +from __future__ import annotations + +import base64 +import hashlib +import hmac +import json +import os +import time +from secrets import compare_digest + +EMBED_BOOTSTRAP_TTL_SEC = int(os.getenv("NAV_EMBED_BOOTSTRAP_TTL_SEC", "120")) + + +def _secret(explicit: str | None = None) -> bytes: + raw = (explicit or os.getenv("NAV_SESSION_SECRET") or "").strip() + if not raw: + raw = "gate-scout-nav-embed-insecure" + return raw.encode("utf-8") + + +def _b64url_encode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).decode("ascii").rstrip("=") + + +def _b64url_decode(text: str) -> bytes: + pad = "=" * (-len(text) % 4) + return base64.urlsafe_b64decode(text + pad) + + +def create_embed_bootstrap_token(username: str, *, secret: str | None = None) -> str: + """短效 token,供 /embed-auth 在 iframe 内写入 session。""" + payload = { + "kind": "embed", + "exp": int(time.time()) + max(30, EMBED_BOOTSTRAP_TTL_SEC), + "u": (username or "admin").strip(), + } + body = _b64url_encode(json.dumps(payload, separators=(",", ":")).encode("utf-8")) + sig = hmac.new(_secret(secret), body.encode("ascii"), hashlib.sha256).hexdigest() + return f"{body}.{sig}" + + +def validate_embed_bootstrap_token(token: str | None, *, secret: str | None = None) -> tuple[bool, str]: + raw = (token or "").strip() + if not raw or "." not in raw: + return False, "" + body, sig = raw.rsplit(".", 1) + try: + expect = hmac.new(_secret(secret), body.encode("ascii"), hashlib.sha256).hexdigest() + if not compare_digest(expect, sig): + return False, "" + payload = json.loads(_b64url_decode(body)) + except Exception: + return False, "" + if not isinstance(payload, dict) or payload.get("kind") != "embed": + return False, "" + if int(payload.get("exp") or 0) <= int(time.time()): + return False, "" + return True, str(payload.get("u") or "admin").strip() + + +def safe_next_path(raw: str | None) -> str: + p = (raw or "/dashboard").strip() or "/dashboard" + if not p.startswith("/") or p.startswith("//") or "://" in p: + return "/dashboard" + return p + + +def request_is_https(request) -> bool: + proto = ( + (request.headers.get("x-forwarded-proto") or getattr(request.url, "scheme", "http") or "http") + .split(",")[0] + .strip() + .lower() + ) + return proto == "https" + + +def nav_embed_session_active() -> bool: + raw = (os.getenv("NAV_EMBED_SESSION") or "auto").strip().lower() + if raw in ("1", "true", "yes", "on"): + return True + if raw in ("0", "false", "no", "off"): + return False + origins = (os.getenv("NAV_EMBED_ORIGINS") or "").strip() + return bool(origins and origins != "*") diff --git a/onchain_scout_gate/app/web.py b/onchain_scout_gate/app/web.py index a09396d..d0a1bb7 100644 --- a/onchain_scout_gate/app/web.py +++ b/onchain_scout_gate/app/web.py @@ -187,11 +187,19 @@ def create_app(settings: Settings) -> FastAPI: if str(_root) not in sys.path: sys.path.insert(0, str(_root)) from nav_embed import install_nav_embed, nav_session_middleware_kwargs + from nav_session_auth import ( + create_embed_bootstrap_token, + nav_embed_session_active, + safe_next_path, + ) install_nav_embed(app) _sess_kw = nav_session_middleware_kwargs() except Exception: _sess_kw = {"same_site": "lax", "https_only": False} + nav_embed_session_active = lambda: False # type: ignore + create_embed_bootstrap_token = None # type: ignore + safe_next_path = lambda raw=None: "/dashboard" # type: ignore app.add_middleware(GZipMiddleware, minimum_size=800) app.add_middleware( SessionMiddleware, @@ -329,14 +337,44 @@ def create_app(settings: Settings) -> FastAPI: return JSONResponse({"ok": False, "detail": "请求格式错误"}, status_code=400) username = str(body.get("username") or "").strip() password = str(body.get("password") or "") + embed = (request.headers.get("x-nav-embed") or "").strip() == "1" or str( + body.get("embed") or "" + ).strip().lower() in ("1", "true", "yes") ok_user = username == app.state.auth_user ok_pass = _hash_password(password) == app.state.auth_password_hash if ok_user and ok_pass: request.session["logged_in"] = True request.session["username"] = username + if embed or nav_embed_session_active(): + from urllib.parse import urlencode + + nxt = safe_next_path(str(body.get("next") or "/dashboard")) + boot = create_embed_bootstrap_token(username, secret=settings.app.session_secret) + q = urlencode({"token": boot, "next": nxt, "embed": "1"}) + return JSONResponse( + { + "ok": True, + "redirect": nxt, + "session_token": boot, + "embed_auth_url": f"/embed-auth?{q}", + } + ) return JSONResponse({"ok": True, "redirect": "/dashboard"}) return JSONResponse({"ok": False, "detail": "用户名或密码错误"}, status_code=401) + @app.get("/embed-auth") + async def embed_auth(request: Request, token: str = "", next: str = "/dashboard") -> RedirectResponse: + from nav_session_auth import safe_next_path, validate_embed_bootstrap_token + + if not settings.auth.enabled: + return RedirectResponse(safe_next_path(next), status_code=302) + ok, username = validate_embed_bootstrap_token(token, secret=settings.app.session_secret) + if ok: + request.session["logged_in"] = True + request.session["username"] = username or settings.auth.username + return RedirectResponse(safe_next_path(next), status_code=302) + return RedirectResponse("/login?embed=1", status_code=302) + @app.get("/logout") async def logout(request: Request) -> RedirectResponse: request.session.clear() diff --git a/onchain_scout_gate/templates/login.html b/onchain_scout_gate/templates/login.html index 9b0a97b..db1a4ca 100644 --- a/onchain_scout_gate/templates/login.html +++ b/onchain_scout_gate/templates/login.html @@ -51,6 +51,7 @@ body: JSON.stringify({ username: fd.get("username"), password: fd.get("password"), + embed: window.self !== window.top ? "1" : "0", }), }) .then(function (r) { @@ -59,9 +60,15 @@ }); }) .then(function (x) { - if (x.ok && x.body && x.body.redirect) { - window.location.href = x.body.redirect; - return; + if (x.ok && x.body) { + var dest = + window.self !== window.top && x.body.embed_auth_url + ? x.body.embed_auth_url + : x.body.redirect || "/dashboard"; + if (dest) { + window.location.href = dest; + return; + } } if (errEl) { errEl.textContent = (x.body && x.body.detail) || "登录失败";