import json import os import secrets import urllib.error import urllib.request from pathlib import Path from typing import Optional from urllib.parse import urlencode from flask import Flask, flash, jsonify, redirect, render_template, request, url_for from flask_login import LoginManager, current_user, login_required, login_user, logout_user from flask_wtf.csrf import CSRFProtect from werkzeug.middleware.proxy_fix import ProxyFix from forms import GroupForm, LoginForm, ServiceForm from models import Service, ServiceGroup, User, db _ROOT = Path(__file__).resolve().parent def _load_env_file(path: Path) -> None: """若未安装 python-dotenv,用最小实现加载 .env(不覆盖已有环境变量)。""" if not path.is_file(): return try: raw = path.read_text(encoding="utf-8-sig") except OSError: return for line in raw.splitlines(): s = line.strip() if not s or s.startswith("#"): continue if s.lower().startswith("export "): s = s[7:].lstrip() if "=" not in s: continue key, _, rest = s.partition("=") key = key.strip() if not key: continue val = rest.strip() if len(val) >= 2 and val[0] == val[-1] and val[0] in "\"'": val = val[1:-1] if key not in os.environ: os.environ[key] = val try: from dotenv import load_dotenv as _dotenv_load except ImportError: _dotenv_load = None if _dotenv_load: _dotenv_load(_ROOT / ".env") else: _load_env_file(_ROOT / ".env") login_manager = LoginManager() csrf = CSRFProtect() def _apply_session_cookie_settings(app: Flask) -> None: """Secure Cookie 仅在浏览器用 https:// 访问时才有意义;用 http://IP:端口 时若仍设 Secure,浏览器会丢弃会话,表现为登录后立即回到登录页。""" if os.environ.get("NAV_COOKIES_INSECURE_HTTP") == "1": app.config["SESSION_COOKIE_SECURE"] = False app.config["REMEMBER_COOKIE_SECURE"] = False if (os.environ.get("NAV_SESSION_COOKIE_SECURE") or "").strip().lower() in ( "1", "true", "yes", ): print( "[nav] NAV_COOKIES_INSECURE_HTTP=1:已关闭 Secure Cookie,便于 http:// 内网访问。", flush=True, ) return mode = (os.environ.get("NAV_SESSION_COOKIE_SECURE") or "0").strip().lower() if mode in ("1", "true", "yes"): app.config["SESSION_COOKIE_SECURE"] = True app.config["REMEMBER_COOKIE_SECURE"] = True elif mode == "auto": app.config["SESSION_COOKIE_SECURE"] = False app.config["REMEMBER_COOKIE_SECURE"] = False @app.before_request def _nav_session_cookie_secure_auto(): sec = request.is_secure app.config["SESSION_COOKIE_SECURE"] = sec app.config["REMEMBER_COOKIE_SECURE"] = sec else: app.config["SESSION_COOKIE_SECURE"] = False app.config["REMEMBER_COOKIE_SECURE"] = False _HUB_SESSION_COOKIE = "hub_sess" def _hub_api_login(base: str, username: str, password: str) -> tuple[str | None, str | None]: """服务端登录中控,返回 (session_token, error_detail)。""" payload = json.dumps({"username": username, "password": password}).encode("utf-8") req = urllib.request.Request( f"{base.rstrip('/')}/api/auth/login", data=payload, headers={"Content-Type": "application/json", "Accept": "application/json"}, method="POST", ) try: with urllib.request.urlopen(req, timeout=15) as resp: raw = resp.read().decode("utf-8", errors="replace") except urllib.error.HTTPError as e: try: err_body = e.read().decode("utf-8", errors="replace") detail = json.loads(err_body).get("detail", err_body) except Exception: detail = str(e) return None, detail or "中控登录失败" except Exception as e: return None, f"无法连接中控: {e}" try: data = json.loads(raw) except json.JSONDecodeError: return None, "中控返回非 JSON" if not data.get("ok"): return None, data.get("detail") or "登录失败" token = (data.get("session_token") or "").strip() if not token: return None, "中控未返回 session_token,请升级云端 hub 后重试" return token, None def _hub_instance_open_url( base: str, session_token: str, exchange_id: str, next_path: str, *, embed: bool = True ) -> tuple[str | None, str | None]: """已登录中控会话下签发实例 SSO 打开链接。""" params = {"exchange_id": str(exchange_id), "next": next_path or "/"} if embed: params["embed"] = "1" q = urlencode(params) req = urllib.request.Request( f"{base.rstrip('/')}/api/instance/open-url?{q}", headers={ "Accept": "application/json", "Cookie": f"{_HUB_SESSION_COOKIE}={session_token}", }, ) try: with urllib.request.urlopen(req, timeout=15) as resp: raw = resp.read().decode("utf-8", errors="replace") except urllib.error.HTTPError as e: try: err_body = e.read().decode("utf-8", errors="replace") detail = json.loads(err_body).get("detail", err_body) except Exception: detail = str(e) return None, detail or "无法签发实例链接" except Exception as e: return None, f"无法连接中控: {e}" try: data = json.loads(raw) except json.JSONDecodeError: return None, "中控返回非 JSON" url = (data.get("url") or "").strip() if not data.get("ok") or not url: return None, data.get("detail") or "无法签发实例链接" return url, None def _resolve_hub_embed_service(body: dict) -> tuple[str | None, str | None, str | None]: """返回 (base_origin, default_next_path, error_detail)。""" sid = body.get("service_id") base = (body.get("base_url") or "").strip().rstrip("/") next_path = (body.get("next") or "/monitor").strip() or "/monitor" if not next_path.startswith("/"): next_path = "/" + next_path if sid: svc = db.session.get(Service, int(sid)) if not svc or not svc.is_hub_embed(): return None, None, "服务不存在或未标记为中控" base = svc.build_origin() if not body.get("next") or next_path == "/monitor": p = (svc.path or "/monitor").strip() or "/monitor" next_path = p if p.startswith("/") else "/" + p if not base: return None, None, "缺少 base_url" return base, next_path, None def _resolve_gate_scout_embed_service( body: dict, ) -> tuple[str | None, str | None, str | None, str | None]: """返回 (base_origin, default_next_path, embed_kind, error_detail)。""" sid = body.get("service_id") base = (body.get("base_url") or "").strip().rstrip("/") next_path = (body.get("next") or "/dashboard").strip() or "/dashboard" embed_kind = (body.get("embed_kind") or "").strip().lower() if not next_path.startswith("/"): next_path = "/" + next_path if sid: svc = db.session.get(Service, int(sid)) if not svc or not svc.is_gate_scout_embed(): return None, None, None, "服务不存在或未标记为 Gate 扫单嵌入" base = svc.build_origin() embed_kind = (svc.embed_kind or "").strip().lower() if not body.get("next") or next_path == "/dashboard": p = (svc.path or "/dashboard").strip() or "/dashboard" next_path = p if p.startswith("/") else "/" + p if not base: return None, None, None, "缺少 base_url" if embed_kind in ("gate_exec", "exec"): login_path = "/login" else: login_path = "/api/auth/login" return base, next_path, login_path, None def _gate_scout_api_login( base: str, username: str, password: str, *, login_path: str, next_path: str ) -> tuple[str | None, str | None]: """服务端登录 Gate 扫单/执行器,返回 (完整 embed_auth_url, error_detail)。""" payload = json.dumps( {"username": username, "password": password, "embed": "1", "next": next_path} ).encode("utf-8") req = urllib.request.Request( f"{base.rstrip('/')}{login_path}", data=payload, headers={ "Content-Type": "application/json", "Accept": "application/json", "X-Nav-Embed": "1", }, method="POST", ) try: with urllib.request.urlopen(req, timeout=15) as resp: raw = resp.read().decode("utf-8", errors="replace") except urllib.error.HTTPError as e: try: err_body = e.read().decode("utf-8", errors="replace") detail = json.loads(err_body).get("detail", err_body) except Exception: detail = str(e) return None, detail or "Gate 扫单登录失败" except Exception as e: return None, f"无法连接 Gate 服务: {e}" try: data = json.loads(raw) except json.JSONDecodeError: return None, "Gate 服务返回非 JSON" if not data.get("ok"): return None, data.get("detail") or "登录失败" embed_url = (data.get("embed_auth_url") or "").strip() if embed_url.startswith("/"): embed_url = base.rstrip("/") + embed_url if not embed_url: token = (data.get("session_token") or "").strip() if token: q = urlencode({"token": token, "next": next_path, "embed": "1"}) embed_url = f"{base.rstrip('/')}/embed-auth?{q}" if not embed_url: return None, "未返回 embed_auth_url,请确认云端已设置 NAV_EMBED_SESSION=1 并重启 PM2" return embed_url, None def create_app() -> Flask: app = Flask(__name__) app.config["SECRET_KEY"] = os.environ.get("NAV_SECRET_KEY") or secrets.token_hex(32) app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get( "NAV_DATABASE_URL", "sqlite:///nav_local.db" ) app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False app.config["WTF_CSRF_TIME_LIMIT"] = None _apply_session_cookie_settings(app) trusted = os.environ.get("NAV_CSRF_TRUSTED_ORIGINS", "").strip() if trusted: app.config["WTF_CSRF_TRUSTED_ORIGINS"] = [ o.strip() for o in trusted.split(",") if o.strip() ] db.init_app(app) login_manager.init_app(app) login_manager.login_view = "login" login_manager.login_message = "请先登录" csrf.init_app(app) @login_manager.user_loader def load_user(user_id: str): return db.session.get(User, int(user_id)) with app.app_context(): db.create_all() _migrate_schema() _ensure_default_user() _ensure_admin_from_env() _ensure_gate_scout_services() @app.route("/login", methods=["GET", "POST"]) def login(): if current_user.is_authenticated: return redirect(url_for("index")) form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data.strip()).first() if user and user.check_password(form.password.data): login_user(user, remember=True) next_url = request.args.get("next") if next_url and next_url.startswith("/"): return redirect(next_url) return redirect(url_for("index")) flash("用户名或密码错误", "error") return render_template("login.html", form=form) @app.route("/logout") @login_required def logout(): logout_user() flash("已退出登录", "info") return redirect(url_for("login")) @app.route("/") @login_required def index(): groups = ( ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id).all() ) grouped = [] for g in groups: svcs = ( Service.query.filter_by(group_id=g.id) .order_by(Service.sort_order, Service.id) .all() ) grouped.append((g, svcs)) return render_template( "index.html", grouped=grouped, hub_auto_login=os.environ.get("NAV_HUB_AUTO_LOGIN", "").strip() == "1", gate_scout_auto_login=os.environ.get("NAV_GATE_SCOUT_AUTO_LOGIN", "").strip() == "1", ) @app.route("/api/embed/hub-login", methods=["POST"]) @csrf.exempt @login_required def api_embed_hub_login(): """ 本地导航代登录云端中控:服务端请求 hub /api/auth/login,返回 embed-auth URL。 避免浏览器在跨站 iframe 里丢弃 Set-Cookie。 """ body = request.get_json(silent=True) or {} username = (body.get("username") or os.environ.get("NAV_HUB_USERNAME") or "").strip() password = body.get("password") if password is None: password = os.environ.get("NAV_HUB_PASSWORD") or "" password = str(password) base, next_path, err = _resolve_hub_embed_service(body) if err: return jsonify({"ok": False, "detail": err}), 400 if not username or not password: return jsonify({"ok": False, "detail": "缺少中控用户名或密码(可配置 NAV_HUB_USERNAME / NAV_HUB_PASSWORD)"}), 400 token, err = _hub_api_login(base, username, password) if err: status = 401 if "登录" in err or "401" in err else 502 return jsonify({"ok": False, "detail": err}), status q = urlencode({"token": token, "next": next_path}) embed_url = f"{base}/embed-auth?{q}" return jsonify({"ok": True, "embed_auth_url": embed_url, "next": next_path}) @app.route("/api/embed/gate-scout-login", methods=["POST"]) @csrf.exempt @login_required def api_embed_gate_scout_login(): """ 本地导航代登录 Gate 扫描端/执行器:服务端请求 /api/auth/login 或 /login, 再让 iframe 打开 /embed-auth 写入 SameSite=None Cookie。 """ body = request.get_json(silent=True) or {} username = ( body.get("username") or os.environ.get("NAV_GATE_SCOUT_USERNAME") or "" ).strip() password = body.get("password") if password is None: password = os.environ.get("NAV_GATE_SCOUT_PASSWORD") or "" password = str(password) base, next_path, login_path, err = _resolve_gate_scout_embed_service(body) if err: return jsonify({"ok": False, "detail": err}), 400 if not username or not password: return jsonify( { "ok": False, "detail": "缺少 Gate 扫单用户名或密码(可配置 NAV_GATE_SCOUT_USERNAME / NAV_GATE_SCOUT_PASSWORD)", } ), 400 embed_url, err = _gate_scout_api_login( base, username, password, login_path=login_path, next_path=next_path, ) if err: status = 401 if "登录" in err or "401" in err or "密码" in err else 502 return jsonify({"ok": False, "detail": err}), status return jsonify({"ok": True, "embed_auth_url": embed_url, "next": next_path}) @app.route("/api/embed/hub-instance-url", methods=["POST"]) @csrf.exempt @login_required def api_embed_hub_instance_url(): """ 本地导航代签实例 SSO:服务端登录中控后调用 /api/instance/open-url。 用于 LocalNav iframe 直接打开实例(避免中控内再嵌一层 iframe)。 """ body = request.get_json(silent=True) or {} exchange_id = str(body.get("exchange_id") or "").strip() next_path = (body.get("next") or "/").strip() or "/" if not next_path.startswith("/"): next_path = "/" + next_path if not exchange_id: return jsonify({"ok": False, "detail": "缺少 exchange_id"}), 400 username = (body.get("username") or os.environ.get("NAV_HUB_USERNAME") or "").strip() password = body.get("password") if password is None: password = os.environ.get("NAV_HUB_PASSWORD") or "" password = str(password) base, _, err = _resolve_hub_embed_service(body) if err: return jsonify({"ok": False, "detail": err}), 400 if not username or not password: return jsonify({"ok": False, "detail": "缺少中控用户名或密码(可配置 NAV_HUB_USERNAME / NAV_HUB_PASSWORD)"}), 400 token, err = _hub_api_login(base, username, password) if err: status = 401 if "登录" in err or "401" in err else 502 return jsonify({"ok": False, "detail": err}), status url, err = _hub_instance_open_url( base, token, exchange_id, next_path, embed=(body.get("embed") or "1").strip().lower() not in ("0", "false", "no"), ) if err: return jsonify({"ok": False, "detail": err}), 502 return jsonify({"ok": True, "url": url, "exchange_id": exchange_id, "next": next_path}) # ---------- 分组管理 ---------- @app.route("/admin/groups") @login_required def admin_groups(): rows = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() return render_template("admin_groups.html", groups=rows) @app.route("/admin/groups/new", methods=["GET", "POST"]) @login_required def admin_group_new(): form = GroupForm() if form.validate_on_submit(): g = ServiceGroup( name=form.name.data.strip(), sort_order=form.sort_order.data or 0, ) db.session.add(g) db.session.commit() flash("分组已创建", "success") return redirect(url_for("admin_groups")) return render_template("admin_group_form.html", form=form, title="新建分组") @app.route("/admin/groups//edit", methods=["GET", "POST"]) @login_required def admin_group_edit(gid: int): g = db.session.get(ServiceGroup, gid) if not g: flash("分组不存在", "error") return redirect(url_for("admin_groups")) form = GroupForm(obj=g) if form.validate_on_submit(): g.name = form.name.data.strip() g.sort_order = form.sort_order.data or 0 db.session.commit() flash("分组已更新", "success") return redirect(url_for("admin_groups")) return render_template("admin_group_form.html", form=form, title="编辑分组") @app.route("/admin/groups//delete", methods=["POST"]) @login_required def admin_group_delete(gid: int): g = db.session.get(ServiceGroup, gid) if not g: flash("分组不存在", "error") return redirect(url_for("admin_groups")) db.session.delete(g) db.session.commit() flash("分组及其下属服务已删除", "success") return redirect(url_for("admin_groups")) # ---------- 服务管理 ---------- @app.route("/admin/services") @login_required def admin_services(): groups = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() gid = request.args.get("group_id", type=int) q = Service.query if gid: q = q.filter_by(group_id=gid) services = q.order_by(Service.group_id, Service.sort_order, Service.id).all() return render_template( "admin_services.html", services=services, groups=groups, filter_group_id=gid, ) @app.route("/admin/services/new", methods=["GET", "POST"]) @login_required def admin_service_new(): form = ServiceForm() form.group_id.choices = _group_choices() if not form.group_id.choices: flash("请先创建至少一个分组", "error") return redirect(url_for("admin_groups")) groups = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() if request.method == "GET": gid = request.args.get("group_id", type=int) if gid and any(gid == c[0] for c in form.group_id.choices): form.group_id.data = gid else: fid = _first_group_id() if fid is not None: form.group_id.data = fid if form.validate_on_submit(): path = (form.path.data or "").strip() or "/" s = Service( name=form.name.data.strip(), scheme=form.scheme.data, host=form.host.data.strip(), port=form.port.data, path=path, sort_order=form.sort_order.data or 0, group_id=form.group_id.data, embed_kind=(form.embed_kind.data or "").strip(), ) db.session.add(s) db.session.commit() flash("服务已添加", "success") return redirect(url_for("admin_services")) return render_template( "admin_service_form.html", form=form, groups=groups, title="新建服务", ) @app.route("/admin/services//edit", methods=["GET", "POST"]) @login_required def admin_service_edit(sid: int): s = db.session.get(Service, sid) if not s: flash("服务不存在", "error") return redirect(url_for("admin_services")) form = ServiceForm(obj=s) form.group_id.choices = _group_choices() if not form.group_id.choices: flash("请先创建至少一个分组", "error") return redirect(url_for("admin_groups")) groups = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() if form.validate_on_submit(): s.name = form.name.data.strip() s.scheme = form.scheme.data s.host = form.host.data.strip() s.port = form.port.data s.path = (form.path.data or "").strip() or "/" s.sort_order = form.sort_order.data or 0 s.group_id = form.group_id.data s.embed_kind = (form.embed_kind.data or "").strip() db.session.commit() flash("服务已更新", "success") return redirect(url_for("admin_services")) return render_template( "admin_service_form.html", form=form, groups=groups, title="编辑服务", ) @app.route("/admin/services//delete", methods=["POST"]) @login_required def admin_service_delete(sid: int): s = db.session.get(Service, sid) if not s: flash("服务不存在", "error") return redirect(url_for("admin_services")) db.session.delete(s) db.session.commit() flash("服务已删除", "success") return redirect(url_for("admin_services")) if os.environ.get("NAV_TRUST_PROXY") == "1": app.wsgi_app = ProxyFix( app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1 ) if not os.environ.get("NAV_SECRET_KEY"): print( "[nav] 警告: 未设置 NAV_SECRET_KEY。" "若使用 gunicorn/uwsgi 等多 worker,或未固定密钥,登录后会话会失效;" "请在环境变量中配置随机 NAV_SECRET_KEY。", flush=True, ) return app def _migrate_schema() -> None: """SQLite 已有库补列(db.create_all 不会 ALTER 已有表)。""" from sqlalchemy import inspect, text try: insp = inspect(db.engine) if "services" not in insp.get_table_names(): return cols = {c["name"] for c in insp.get_columns("services")} if "scheme" not in cols: with db.engine.begin() as conn: conn.execute( text( "ALTER TABLE services ADD COLUMN scheme VARCHAR(8) " "NOT NULL DEFAULT 'http'" ) ) print("[nav] 已为 services 表添加 scheme 列(默认 http)。", flush=True) cols = {c["name"] for c in insp.get_columns("services")} if "embed_kind" not in cols: with db.engine.begin() as conn: conn.execute( text( "ALTER TABLE services ADD COLUMN embed_kind VARCHAR(16) " "NOT NULL DEFAULT ''" ) ) print("[nav] 已为 services 表添加 embed_kind 列。", flush=True) except Exception as exc: print(f"[nav] 数据库结构迁移跳过或失败: {exc}", flush=True) def _first_group_id() -> Optional[int]: g = ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id).first() return g.id if g else None def _group_choices(): groups = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() return [(g.id, g.name) for g in groups] def _ensure_default_user() -> None: if User.query.count() > 0: return username = (os.environ.get("NAV_ADMIN_USERNAME") or "admin").strip() or "admin" password = os.environ.get("NAV_ADMIN_PASSWORD") or "admin123" u = User(username=username) u.set_password(password) db.session.add(u) db.session.add(ServiceGroup(name="默认分组", sort_order=0)) db.session.commit() print( f"[nav] 首次运行:已创建管理员「{username}」" "(来自 NAV_ADMIN_USERNAME / NAV_ADMIN_PASSWORD,未设置则为 admin / admin123)。" "生产环境请尽快改为强密码。" ) def _ensure_gate_scout_services() -> None: """gate_scout_order:扫描端 + 执行器(NAV_SEED_GATE_SCOUT=1;云服务器用域名 + https)。""" flag = (os.environ.get("NAV_SEED_GATE_SCOUT") or "").strip().lower() if flag not in ("1", "true", "yes", "on"): return scheme = (os.environ.get("NAV_GATE_SCOUT_SCHEME") or "http").strip().lower() if scheme not in ("http", "https"): scheme = "http" default_host = (os.environ.get("NAV_GATE_SCOUT_HOST") or "127.0.0.1").strip() or "127.0.0.1" scout_host = (os.environ.get("NAV_GATE_SCOUT_SCOUT_HOST") or default_host).strip() or default_host exec_host = (os.environ.get("NAV_GATE_EXECUTOR_HOST") or default_host).strip() or default_host default_port = "443" if scheme == "https" else "8088" default_exec_port = "443" if scheme == "https" else "8090" try: scout_port = int(os.environ.get("NAV_GATE_SCOUT_PORT") or default_port) exec_port = int(os.environ.get("NAV_GATE_EXECUTOR_PORT") or default_exec_port) except ValueError: scout_port = 443 if scheme == "https" else 8088 exec_port = 443 if scheme == "https" else 8090 scout_path = (os.environ.get("NAV_GATE_SCOUT_PATH") or "/dashboard").strip() or "/dashboard" exec_path = (os.environ.get("NAV_GATE_EXECUTOR_PATH") or "/dashboard").strip() or "/dashboard" if not scout_path.startswith("/"): scout_path = "/" + scout_path if not exec_path.startswith("/"): exec_path = "/" + exec_path update_existing = (os.environ.get("NAV_GATE_SCOUT_UPDATE") or "").strip().lower() in ( "1", "true", "yes", "on", ) group_name = (os.environ.get("NAV_GATE_SCOUT_GROUP") or "Gate 扫单").strip() or "Gate 扫单" g = ServiceGroup.query.filter_by(name=group_name).first() if not g: g = ServiceGroup(name=group_name, sort_order=50) db.session.add(g) db.session.flush() defs = ( ("Gate 扫描端", scout_host, scout_port, scout_path, 0, "gate_scout"), ("Gate 下单执行器", exec_host, exec_port, exec_path, 10, "gate_exec"), ) added = 0 updated = 0 for name, h, port, path, order, embed_k in defs: existing = Service.query.filter_by(group_id=g.id, name=name).first() if existing: if update_existing and ( existing.scheme != scheme or existing.host != h or existing.port != port or existing.path != path or (existing.embed_kind or "") != embed_k ): existing.scheme = scheme existing.host = h existing.port = port existing.path = path existing.embed_kind = embed_k updated += 1 elif not (existing.embed_kind or "").strip(): existing.embed_kind = embed_k updated += 1 continue db.session.add( Service( name=name, scheme=scheme, host=h, port=port, path=path, sort_order=order, group_id=g.id, embed_kind=embed_k, ) ) added += 1 if added or updated: db.session.commit() print( f"[nav] Gate 扫单:新增 {added}、更新 {updated}(分组「{group_name}」)。" f" 扫描 {scheme}://{scout_host}:{scout_port}{scout_path};" f"执行器 {scheme}://{exec_host}:{exec_port}{exec_path}", flush=True, ) def _ensure_admin_from_env() -> None: """库中已有用户后,.env 里的管理员账号不会自动覆盖旧库;若配置了用户名且尚不存在则补建。""" username = (os.environ.get("NAV_ADMIN_USERNAME") or "").strip() password = os.environ.get("NAV_ADMIN_PASSWORD") if not username or password is None or password == "": return existing = User.query.filter_by(username=username).first() if existing: if os.environ.get("NAV_ADMIN_UPDATE_PASSWORD") == "1": existing.set_password(password) db.session.commit() print( f"[nav] 已按 NAV_ADMIN_UPDATE_PASSWORD=1 更新用户「{username}」的登录密码。", flush=True, ) return u = User(username=username) u.set_password(password) db.session.add(u) db.session.commit() print( f"[nav] 已从环境变量创建用户「{username}」(此前库中无此用户名)。", flush=True, ) app = create_app() if __name__ == "__main__": host = os.environ.get("NAV_HOST", "0.0.0.0") port = int(os.environ.get("NAV_PORT", "5070")) app.run(host=host, port=port, debug=os.environ.get("NAV_DEBUG") == "1")