Files
LocalNav/app.py
T
dekun f7ce6f1058 feat: 侧边栏分组图标与导航样式优化
- 分组支持 icon 字段,可按名称自动匹配或手动选择
- 左侧导航与总览卡片显示彩色 SVG 图标
- 优化侧栏链接圆角与选中态样式

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-30 18:06:42 +08:00

838 lines
31 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import secrets
import urllib.error
import urllib.request
from pathlib import Path
from typing import Optional
from urllib.parse import urlencode
from flask import Flask, flash, jsonify, redirect, render_template, request, url_for
from flask_login import LoginManager, current_user, login_required, login_user, logout_user
from flask_wtf.csrf import CSRFProtect
from werkzeug.middleware.proxy_fix import ProxyFix
from forms import GroupForm, LoginForm, ServiceForm
from models import Service, ServiceGroup, User, db
# Directory that contains this file; the project's .env file is looked up here.
_ROOT = Path(__file__).resolve().parent
def _load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a .env file into ``os.environ``.

    Minimal fallback used when python-dotenv is not installed. Variables that
    are already present in the environment are never overwritten.

    Supported syntax: blank lines and ``#`` comment lines are skipped, an
    optional leading ``export `` is dropped, and a value wrapped in one pair
    of matching single or double quotes is unquoted.

    The loader is best-effort: a missing, unreadable or mis-encoded file, and
    any individual entry the OS refuses to accept, are skipped silently so
    that start-up is never blocked by a broken .env file.

    Args:
        path: Location of the .env file.
    """
    if not path.is_file():
        return
    try:
        raw = path.read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is a ValueError, not an OSError, so it has to be
        # listed explicitly or a non-UTF-8 file would crash the import.
        return
    for line in raw.splitlines():
        entry = line.strip()
        if not entry or entry.startswith("#"):
            continue
        if entry.lower().startswith("export "):
            entry = entry[7:].lstrip()
        key, sep, rest = entry.partition("=")
        key = key.strip()
        if not sep or not key:
            continue
        val = rest.strip()
        if len(val) >= 2 and val[0] == val[-1] and val[0] in "\"'":
            val = val[1:-1]
        try:
            os.environ.setdefault(key, val)
        except ValueError:
            # e.g. an embedded NUL byte or an illegal variable name.
            continue
# Prefer python-dotenv when it is installed; otherwise fall back to the
# minimal loader defined in this file. Either way the project's .env file
# next to this module is loaded at import time.
try:
    from dotenv import load_dotenv as _dotenv_load
except ImportError:
    _dotenv_load = None
if _dotenv_load:
    _dotenv_load(_ROOT / ".env")
else:
    _load_env_file(_ROOT / ".env")
# Extension instances created at import time; create_app() binds them to the
# Flask application through init_app().
login_manager = LoginManager()
csrf = CSRFProtect()
def _apply_session_cookie_settings(app: Flask) -> None:
    """Configure the Secure flag of the session and remember-me cookies.

    A Secure cookie only makes sense when the browser reaches the site over
    https://. If the site is opened as http://IP:port while Secure is still
    set, the browser discards the session, which shows up as being sent back
    to the login page immediately after logging in.

    Environment variables read:
        NAV_COOKIES_INSECURE_HTTP: ``"1"`` forces both flags off and takes
            precedence over NAV_SESSION_COOKIE_SECURE (a notice is printed
            when the latter asked for Secure cookies).
        NAV_SESSION_COOKIE_SECURE: ``1`` / ``true`` / ``yes`` turns both
            flags on; ``auto`` starts with them off and then copies
            ``request.is_secure`` into the config before every request; any
            other value leaves them off.

    Args:
        app: The Flask application whose config is updated in place.
    """
    force_on_values = ("1", "true", "yes")
    cookie_keys = ("SESSION_COOKIE_SECURE", "REMEMBER_COOKIE_SECURE")

    def set_secure(enabled: bool) -> None:
        for key in cookie_keys:
            app.config[key] = enabled

    requested = (os.environ.get("NAV_SESSION_COOKIE_SECURE") or "").strip().lower()

    if os.environ.get("NAV_COOKIES_INSECURE_HTTP") == "1":
        set_secure(False)
        if requested in force_on_values:
            print(
                "[nav] NAV_COOKIES_INSECURE_HTTP=1:已关闭 Secure Cookie,便于 http:// 内网访问。",
                flush=True,
            )
        return

    # Only the explicit "on" values enable Secure up front; "auto" and every
    # other value start with the flags off.
    set_secure(requested in force_on_values)

    if requested == "auto":

        @app.before_request
        def _nav_session_cookie_secure_auto():
            set_secure(request.is_secure)
# Cookie name under which the hub session token is sent on server-side
# requests to the hub.
_HUB_SESSION_COOKIE = "hub_sess"
def _hub_api_login(base: str, username: str, password: str) -> tuple[str | None, str | None]:
"""服务端登录中控,返回 (session_token, error_detail)。"""
payload = json.dumps({"username": username, "password": password}).encode("utf-8")
req = urllib.request.Request(
f"{base.rstrip('/')}/api/auth/login",
data=payload,
headers={"Content-Type": "application/json", "Accept": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as e:
try:
err_body = e.read().decode("utf-8", errors="replace")
detail = json.loads(err_body).get("detail", err_body)
except Exception:
detail = str(e)
return None, detail or "中控登录失败"
except Exception as e:
return None, f"无法连接中控: {e}"
try:
data = json.loads(raw)
except json.JSONDecodeError:
return None, "中控返回非 JSON"
if not data.get("ok"):
return None, data.get("detail") or "登录失败"
token = (data.get("session_token") or "").strip()
if not token:
return None, "中控未返回 session_token,请升级云端 hub 后重试"
return token, None
def _hub_instance_open_url(
base: str, session_token: str, exchange_id: str, next_path: str, *, embed: bool = True
) -> tuple[str | None, str | None]:
"""已登录中控会话下签发实例 SSO 打开链接。"""
params = {"exchange_id": str(exchange_id), "next": next_path or "/"}
if embed:
params["embed"] = "1"
q = urlencode(params)
req = urllib.request.Request(
f"{base.rstrip('/')}/api/instance/open-url?{q}",
headers={
"Accept": "application/json",
"Cookie": f"{_HUB_SESSION_COOKIE}={session_token}",
},
)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as e:
try:
err_body = e.read().decode("utf-8", errors="replace")
detail = json.loads(err_body).get("detail", err_body)
except Exception:
detail = str(e)
return None, detail or "无法签发实例链接"
except Exception as e:
return None, f"无法连接中控: {e}"
try:
data = json.loads(raw)
except json.JSONDecodeError:
return None, "中控返回非 JSON"
url = (data.get("url") or "").strip()
if not data.get("ok") or not url:
return None, data.get("detail") or "无法签发实例链接"
return url, None
def _resolve_hub_embed_service(body: dict) -> tuple[str | None, str | None, str | None]:
"""返回 (base_origin, default_next_path, error_detail)。"""
sid = body.get("service_id")
base = (body.get("base_url") or "").strip().rstrip("/")
next_path = (body.get("next") or "/monitor").strip() or "/monitor"
if not next_path.startswith("/"):
next_path = "/" + next_path
if sid:
svc = db.session.get(Service, int(sid))
if not svc or not svc.is_hub_embed():
return None, None, "服务不存在或未标记为中控"
base = svc.build_origin()
if not body.get("next") or next_path == "/monitor":
p = (svc.path or "/monitor").strip() or "/monitor"
next_path = p if p.startswith("/") else "/" + p
if not base:
return None, None, "缺少 base_url"
return base, next_path, None
def _resolve_gate_scout_embed_service(
body: dict,
) -> tuple[str | None, str | None, str | None, str | None]:
"""返回 (base_origin, default_next_path, embed_kind, error_detail)。"""
sid = body.get("service_id")
base = (body.get("base_url") or "").strip().rstrip("/")
next_path = (body.get("next") or "/dashboard").strip() or "/dashboard"
embed_kind = (body.get("embed_kind") or "").strip().lower()
if not next_path.startswith("/"):
next_path = "/" + next_path
if sid:
svc = db.session.get(Service, int(sid))
if not svc or not svc.is_gate_scout_embed():
return None, None, None, "服务不存在或未标记为 Gate 扫单嵌入"
base = svc.build_origin()
embed_kind = (svc.embed_kind or "").strip().lower()
if not body.get("next") or next_path == "/dashboard":
p = (svc.path or "/dashboard").strip() or "/dashboard"
next_path = p if p.startswith("/") else "/" + p
if not base:
return None, None, None, "缺少 base_url"
if embed_kind in ("gate_exec", "exec"):
login_path = "/login"
else:
login_path = "/api/auth/login"
return base, next_path, login_path, None
def _gate_scout_api_login(
base: str, username: str, password: str, *, login_path: str, next_path: str
) -> tuple[str | None, str | None]:
"""服务端登录 Gate 扫单/执行器,返回 (完整 embed_auth_url, error_detail)。"""
payload = json.dumps(
{"username": username, "password": password, "embed": "1", "next": next_path}
).encode("utf-8")
req = urllib.request.Request(
f"{base.rstrip('/')}{login_path}",
data=payload,
headers={
"Content-Type": "application/json",
"Accept": "application/json",
"X-Nav-Embed": "1",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=15) as resp:
raw = resp.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as e:
try:
err_body = e.read().decode("utf-8", errors="replace")
detail = json.loads(err_body).get("detail", err_body)
except Exception:
detail = str(e)
return None, detail or "Gate 扫单登录失败"
except Exception as e:
return None, f"无法连接 Gate 服务: {e}"
try:
data = json.loads(raw)
except json.JSONDecodeError:
return None, "Gate 服务返回非 JSON"
if not data.get("ok"):
return None, data.get("detail") or "登录失败"
embed_url = (data.get("embed_auth_url") or "").strip()
if embed_url.startswith("/"):
embed_url = base.rstrip("/") + embed_url
if not embed_url:
token = (data.get("session_token") or "").strip()
if token:
q = urlencode({"token": token, "next": next_path, "embed": "1"})
embed_url = f"{base.rstrip('/')}/embed-auth?{q}"
if not embed_url:
return None, "未返回 embed_auth_url,请确认云端已设置 NAV_EMBED_SESSION=1 并重启 PM2"
return embed_url, None
def create_app() -> Flask:
    """Create and configure the LocalNav Flask application.

    Reads configuration from ``NAV_*`` environment variables, initialises the
    database, login and CSRF extensions, makes sure schema and seed data
    exist, and registers every route (login/logout, the index page, the
    embed login API and the group/service admin pages).

    Returns:
        The configured ``Flask`` application.
    """
    app = Flask(__name__)
    # Without NAV_SECRET_KEY a random key is generated for this process only;
    # a warning about that is printed at the end of this function.
    app.config["SECRET_KEY"] = os.environ.get("NAV_SECRET_KEY") or secrets.token_hex(32)
    app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get(
        "NAV_DATABASE_URL", "sqlite:///nav_local.db"
    )
    app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
    app.config["WTF_CSRF_TIME_LIMIT"] = None
    _apply_session_cookie_settings(app)
    trusted = os.environ.get("NAV_CSRF_TRUSTED_ORIGINS", "").strip()
    if trusted:
        app.config["WTF_CSRF_TRUSTED_ORIGINS"] = [
            o.strip() for o in trusted.split(",") if o.strip()
        ]
    db.init_app(app)
    login_manager.init_app(app)
    login_manager.login_view = "login"
    login_manager.login_message = "请先登录"
    csrf.init_app(app)

    def _json_body() -> dict:
        """Return the request's JSON object, or {} when absent or not an object."""
        payload = request.get_json(silent=True)
        return payload if isinstance(payload, dict) else {}

    def _text(value) -> str:
        """Coerce a JSON scalar to stripped text (None becomes "")."""
        return "" if value is None else str(value).strip()

    def _is_safe_next(target) -> bool:
        """Tell whether *target* is a same-site path that is safe to redirect to."""
        if not target or not target.startswith("/"):
            return False
        # "//host" and "/\host" are treated by browsers as another origin.
        if target[1:2] in ("/", "\\"):
            return False
        # Control characters (tab, CR, LF, ...) can be stripped by browsers
        # and turn an innocent-looking path into "//host".
        return not any(ord(ch) < 0x20 for ch in target)

    @login_manager.user_loader
    def load_user(user_id: str):
        try:
            return db.session.get(User, int(user_id))
        except (TypeError, ValueError):
            # A malformed id means "no such user", not a server error.
            return None

    with app.app_context():
        db.create_all()
        _migrate_schema()
        _ensure_default_user()
        _ensure_admin_from_env()
        _ensure_gate_scout_services()

    @app.route("/login", methods=["GET", "POST"])
    def login():
        if current_user.is_authenticated:
            return redirect(url_for("index"))
        form = LoginForm()
        if form.validate_on_submit():
            user = User.query.filter_by(username=form.username.data.strip()).first()
            if user and user.check_password(form.password.data):
                login_user(user, remember=True)
                next_url = request.args.get("next")
                if _is_safe_next(next_url):
                    return redirect(next_url)
                return redirect(url_for("index"))
            flash("用户名或密码错误", "error")
        return render_template("login.html", form=form)

    @app.route("/logout")
    @login_required
    def logout():
        logout_user()
        flash("已退出登录", "info")
        return redirect(url_for("login"))

    @app.route("/")
    @login_required
    def index():
        groups = (
            ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id).all()
        )
        grouped = []
        for g in groups:
            svcs = (
                Service.query.filter_by(group_id=g.id)
                .order_by(Service.sort_order, Service.id)
                .all()
            )
            grouped.append((g, svcs))
        return render_template(
            "index.html",
            grouped=grouped,
            hub_auto_login=os.environ.get("NAV_HUB_AUTO_LOGIN", "").strip() == "1",
            gate_scout_auto_login=os.environ.get("NAV_GATE_SCOUT_AUTO_LOGIN", "").strip()
            == "1",
        )

    @app.route("/api/embed/hub-login", methods=["POST"])
    @csrf.exempt
    @login_required
    def api_embed_hub_login():
        """
        Log in to the cloud hub on behalf of the local navigator: the server
        calls the hub's /api/auth/login and returns an embed-auth URL. This
        avoids the browser dropping Set-Cookie inside a cross-site iframe.
        """
        # NOTE(review): the hub origin may come from the request body
        # (base_url), in which case the NAV_HUB_* credentials from the
        # environment are sent to that origin - consider restricting this
        # endpoint to registered services.
        body = _json_body()
        username = _text(body.get("username") or os.environ.get("NAV_HUB_USERNAME"))
        password = body.get("password")
        if password is None:
            password = os.environ.get("NAV_HUB_PASSWORD") or ""
        password = str(password)
        base, next_path, err = _resolve_hub_embed_service(body)
        if err:
            return jsonify({"ok": False, "detail": err}), 400
        if not username or not password:
            return jsonify({"ok": False, "detail": "缺少中控用户名或密码(可配置 NAV_HUB_USERNAME / NAV_HUB_PASSWORD)"}), 400
        token, err = _hub_api_login(base, username, password)
        if err:
            # str(): the upstream "detail" is not guaranteed to be a string.
            status = 401 if "登录" in str(err) or "401" in str(err) else 502
            return jsonify({"ok": False, "detail": err}), status
        q = urlencode({"token": token, "next": next_path})
        embed_url = f"{base}/embed-auth?{q}"
        return jsonify({"ok": True, "embed_auth_url": embed_url, "next": next_path})

    @app.route("/api/embed/gate-scout-login", methods=["POST"])
    @csrf.exempt
    @login_required
    def api_embed_gate_scout_login():
        """
        Log in to the Gate scanner / executor on behalf of the local
        navigator: the server calls /api/auth/login or /login, then the
        iframe opens /embed-auth to have the SameSite=None cookie written.
        """
        body = _json_body()
        username = _text(
            body.get("username") or os.environ.get("NAV_GATE_SCOUT_USERNAME")
        )
        password = body.get("password")
        if password is None:
            password = os.environ.get("NAV_GATE_SCOUT_PASSWORD") or ""
        password = str(password)
        base, next_path, login_path, err = _resolve_gate_scout_embed_service(body)
        if err:
            return jsonify({"ok": False, "detail": err}), 400
        if not username or not password:
            return jsonify(
                {
                    "ok": False,
                    "detail": "缺少 Gate 扫单用户名或密码(可配置 NAV_GATE_SCOUT_USERNAME / NAV_GATE_SCOUT_PASSWORD)",
                }
            ), 400
        embed_url, err = _gate_scout_api_login(
            base,
            username,
            password,
            login_path=login_path,
            next_path=next_path,
        )
        if err:
            # str(): the upstream "detail" is not guaranteed to be a string.
            err_text = str(err)
            status = 401 if "登录" in err_text or "401" in err_text or "密码" in err_text else 502
            return jsonify({"ok": False, "detail": err}), status
        return jsonify({"ok": True, "embed_auth_url": embed_url, "next": next_path})

    @app.route("/api/embed/hub-instance-url", methods=["POST"])
    @csrf.exempt
    @login_required
    def api_embed_hub_instance_url():
        """
        Issue an instance SSO link on behalf of the local navigator: the
        server logs in to the hub and then calls /api/instance/open-url.
        Lets the LocalNav iframe open the instance directly (no second
        iframe nested inside the hub).
        """
        body = _json_body()
        exchange_id = str(body.get("exchange_id") or "").strip()
        next_path = _text(body.get("next") or "/") or "/"
        if not next_path.startswith("/"):
            next_path = "/" + next_path
        if not exchange_id:
            return jsonify({"ok": False, "detail": "缺少 exchange_id"}), 400
        username = _text(body.get("username") or os.environ.get("NAV_HUB_USERNAME"))
        password = body.get("password")
        if password is None:
            password = os.environ.get("NAV_HUB_PASSWORD") or ""
        password = str(password)
        base, _, err = _resolve_hub_embed_service(body)
        if err:
            return jsonify({"ok": False, "detail": err}), 400
        if not username or not password:
            return jsonify({"ok": False, "detail": "缺少中控用户名或密码(可配置 NAV_HUB_USERNAME / NAV_HUB_PASSWORD)"}), 400
        token, err = _hub_api_login(base, username, password)
        if err:
            # str(): the upstream "detail" is not guaranteed to be a string.
            status = 401 if "登录" in str(err) or "401" in str(err) else 502
            return jsonify({"ok": False, "detail": err}), status
        # Embedding defaults to on. JSON booleans and numbers are accepted as
        # well as strings, so an explicit false / 0 really switches it off.
        raw_embed = body.get("embed")
        embed = _text("1" if raw_embed is None else raw_embed).lower() not in (
            "0",
            "false",
            "no",
        )
        url, err = _hub_instance_open_url(
            base,
            token,
            exchange_id,
            next_path,
            embed=embed,
        )
        if err:
            return jsonify({"ok": False, "detail": err}), 502
        return jsonify({"ok": True, "url": url, "exchange_id": exchange_id, "next": next_path})

    # ---------- Group management ----------
    @app.route("/admin/groups")
    @login_required
    def admin_groups():
        rows = ServiceGroup.query.order_by(
            ServiceGroup.sort_order, ServiceGroup.id
        ).all()
        return render_template("admin_groups.html", groups=rows)

    @app.route("/admin/groups/new", methods=["GET", "POST"])
    @login_required
    def admin_group_new():
        form = GroupForm()
        if form.validate_on_submit():
            g = ServiceGroup(
                name=form.name.data.strip(),
                icon=(form.icon.data or "").strip(),
                sort_order=form.sort_order.data or 0,
            )
            db.session.add(g)
            db.session.commit()
            flash("分组已创建", "success")
            return redirect(url_for("admin_groups"))
        return render_template("admin_group_form.html", form=form, title="新建分组")

    @app.route("/admin/groups/<int:gid>/edit", methods=["GET", "POST"])
    @login_required
    def admin_group_edit(gid: int):
        g = db.session.get(ServiceGroup, gid)
        if not g:
            flash("分组不存在", "error")
            return redirect(url_for("admin_groups"))
        form = GroupForm(obj=g)
        if form.validate_on_submit():
            g.name = form.name.data.strip()
            g.icon = (form.icon.data or "").strip()
            g.sort_order = form.sort_order.data or 0
            db.session.commit()
            flash("分组已更新", "success")
            return redirect(url_for("admin_groups"))
        return render_template("admin_group_form.html", form=form, title="编辑分组")

    @app.route("/admin/groups/<int:gid>/delete", methods=["POST"])
    @login_required
    def admin_group_delete(gid: int):
        g = db.session.get(ServiceGroup, gid)
        if not g:
            flash("分组不存在", "error")
            return redirect(url_for("admin_groups"))
        db.session.delete(g)
        db.session.commit()
        flash("分组及其下属服务已删除", "success")
        return redirect(url_for("admin_groups"))

    # ---------- Service management ----------
    @app.route("/admin/services")
    @login_required
    def admin_services():
        groups = ServiceGroup.query.order_by(
            ServiceGroup.sort_order, ServiceGroup.id
        ).all()
        gid = request.args.get("group_id", type=int)
        q = Service.query
        if gid:
            q = q.filter_by(group_id=gid)
        services = q.order_by(Service.group_id, Service.sort_order, Service.id).all()
        return render_template(
            "admin_services.html",
            services=services,
            groups=groups,
            filter_group_id=gid,
        )

    @app.route("/admin/services/new", methods=["GET", "POST"])
    @login_required
    def admin_service_new():
        form = ServiceForm()
        form.group_id.choices = _group_choices()
        if not form.group_id.choices:
            flash("请先创建至少一个分组", "error")
            return redirect(url_for("admin_groups"))
        groups = ServiceGroup.query.order_by(
            ServiceGroup.sort_order, ServiceGroup.id
        ).all()
        if request.method == "GET":
            # Preselect the group from ?group_id= when it is a valid choice,
            # otherwise the first group.
            gid = request.args.get("group_id", type=int)
            if gid and any(gid == c[0] for c in form.group_id.choices):
                form.group_id.data = gid
            else:
                fid = _first_group_id()
                if fid is not None:
                    form.group_id.data = fid
        if form.validate_on_submit():
            path = (form.path.data or "").strip() or "/"
            s = Service(
                name=form.name.data.strip(),
                scheme=form.scheme.data,
                host=form.host.data.strip(),
                port=form.port.data,
                path=path,
                sort_order=form.sort_order.data or 0,
                group_id=form.group_id.data,
                embed_kind=(form.embed_kind.data or "").strip(),
            )
            db.session.add(s)
            db.session.commit()
            flash("服务已添加", "success")
            return redirect(url_for("admin_services"))
        return render_template(
            "admin_service_form.html",
            form=form,
            groups=groups,
            title="新建服务",
        )

    @app.route("/admin/services/<int:sid>/edit", methods=["GET", "POST"])
    @login_required
    def admin_service_edit(sid: int):
        s = db.session.get(Service, sid)
        if not s:
            flash("服务不存在", "error")
            return redirect(url_for("admin_services"))
        form = ServiceForm(obj=s)
        form.group_id.choices = _group_choices()
        if not form.group_id.choices:
            flash("请先创建至少一个分组", "error")
            return redirect(url_for("admin_groups"))
        groups = ServiceGroup.query.order_by(
            ServiceGroup.sort_order, ServiceGroup.id
        ).all()
        if form.validate_on_submit():
            s.name = form.name.data.strip()
            s.scheme = form.scheme.data
            s.host = form.host.data.strip()
            s.port = form.port.data
            s.path = (form.path.data or "").strip() or "/"
            s.sort_order = form.sort_order.data or 0
            s.group_id = form.group_id.data
            s.embed_kind = (form.embed_kind.data or "").strip()
            db.session.commit()
            flash("服务已更新", "success")
            return redirect(url_for("admin_services"))
        return render_template(
            "admin_service_form.html",
            form=form,
            groups=groups,
            title="编辑服务",
        )

    @app.route("/admin/services/<int:sid>/delete", methods=["POST"])
    @login_required
    def admin_service_delete(sid: int):
        s = db.session.get(Service, sid)
        if not s:
            flash("服务不存在", "error")
            return redirect(url_for("admin_services"))
        db.session.delete(s)
        db.session.commit()
        flash("服务已删除", "success")
        return redirect(url_for("admin_services"))

    if os.environ.get("NAV_TRUST_PROXY") == "1":
        # Trust one hop of X-Forwarded-* headers from a reverse proxy.
        app.wsgi_app = ProxyFix(
            app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1
        )
    if not os.environ.get("NAV_SECRET_KEY"):
        print(
            "[nav] 警告: 未设置 NAV_SECRET_KEY。"
            "若使用 gunicorn/uwsgi 等多 worker,或未固定密钥,登录后会话会失效;"
            "请在环境变量中配置随机 NAV_SECRET_KEY。",
            flush=True,
        )
    return app
def _migrate_schema() -> None:
    """Add columns that are missing from an existing database.

    ``db.create_all`` does not ALTER tables that already exist, so the
    columns ``services.scheme``, ``services.embed_kind`` and
    ``service_groups.icon`` are added here when absent. Nothing is done when
    the ``services`` table does not exist. Any exception is caught and
    reported on stdout instead of being raised.
    """
    from sqlalchemy import inspect, text

    def add_column(ddl: str, notice: str) -> None:
        with db.engine.begin() as conn:
            conn.execute(text(ddl))
        print(notice, flush=True)

    try:
        inspector = inspect(db.engine)

        def column_names(table: str) -> set:
            return {col["name"] for col in inspector.get_columns(table)}

        if "services" not in inspector.get_table_names():
            return

        if "scheme" not in column_names("services"):
            add_column(
                "ALTER TABLE services ADD COLUMN scheme VARCHAR(8) "
                "NOT NULL DEFAULT 'http'",
                "[nav] 已为 services 表添加 scheme 列(默认 http)。",
            )

        if "embed_kind" not in column_names("services"):
            add_column(
                "ALTER TABLE services ADD COLUMN embed_kind VARCHAR(16) "
                "NOT NULL DEFAULT ''",
                "[nav] 已为 services 表添加 embed_kind 列。",
            )

        has_groups_table = "service_groups" in inspector.get_table_names()
        if has_groups_table and "icon" not in column_names("service_groups"):
            add_column(
                "ALTER TABLE service_groups ADD COLUMN icon VARCHAR(16) "
                "NOT NULL DEFAULT ''",
                "[nav] 已为 service_groups 表添加 icon 列。",
            )
    except Exception as exc:
        print(f"[nav] 数据库结构迁移跳过或失败: {exc}", flush=True)
def _first_group_id() -> Optional[int]:
    """Return the id of the first group ordered by (sort_order, id), or None when there is none."""
    ordered = ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id)
    first = ordered.first()
    if not first:
        return None
    return first.id
def _group_choices():
    """Return ``(id, name)`` pairs for every group, ordered by (sort_order, id)."""
    ordered = ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id)
    choices = []
    for group in ordered.all():
        choices.append((group.id, group.name))
    return choices
def _ensure_default_user() -> None:
    """Create the first administrator and a default group on an empty database.

    Does nothing when at least one user already exists. Username and password
    come from NAV_ADMIN_USERNAME / NAV_ADMIN_PASSWORD and fall back to
    ``admin`` / ``admin123`` when unset. A notice is printed after creation.
    """
    if User.query.count() > 0:
        return
    username = (os.environ.get("NAV_ADMIN_USERNAME") or "admin").strip() or "admin"
    password = os.environ.get("NAV_ADMIN_PASSWORD") or "admin123"
    u = User(username=username)
    u.set_password(password)
    db.session.add(u)
    db.session.add(ServiceGroup(name="默认分组", sort_order=0))
    db.session.commit()
    # flush=True like every other [nav] notice, so the message is not held
    # back in a buffer when stdout is not a terminal. The closing bracket
    # after the username was missing from the message.
    print(
        f"[nav] 首次运行:已创建管理员「{username}」"
        "(来自 NAV_ADMIN_USERNAME / NAV_ADMIN_PASSWORD,未设置则为 admin / admin123)。"
        "生产环境请尽快改为强密码。",
        flush=True,
    )
def _ensure_gate_scout_services() -> None:
    """Seed the Gate scanner and executor services when enabled.

    Runs only when NAV_SEED_GATE_SCOUT is ``1`` / ``true`` / ``yes`` /
    ``on``. Scheme, hosts, ports, paths and group name are read from
    ``NAV_GATE_SCOUT_*`` / ``NAV_GATE_EXECUTOR_*`` environment variables.
    Default ports are 443 for https, otherwise 8088 (scanner) and 8090
    (executor).

    The group is created when missing and each of the two services is added
    when missing. An existing service is overwritten only when
    NAV_GATE_SCOUT_UPDATE is enabled and a field differs; otherwise only an
    empty ``embed_kind`` is filled in. A summary is printed when anything
    was added or updated.
    """
    flag = (os.environ.get("NAV_SEED_GATE_SCOUT") or "").strip().lower()
    if flag not in ("1", "true", "yes", "on"):
        return
    scheme = (os.environ.get("NAV_GATE_SCOUT_SCHEME") or "http").strip().lower()
    if scheme not in ("http", "https"):
        scheme = "http"
    default_host = (os.environ.get("NAV_GATE_SCOUT_HOST") or "127.0.0.1").strip() or "127.0.0.1"
    scout_host = (os.environ.get("NAV_GATE_SCOUT_SCOUT_HOST") or default_host).strip() or default_host
    exec_host = (os.environ.get("NAV_GATE_EXECUTOR_HOST") or default_host).strip() or default_host
    # Each port is parsed on its own so that one invalid value does not also
    # throw away the other, valid one.
    scout_port = _gate_port_from_env(
        "NAV_GATE_SCOUT_PORT", 443 if scheme == "https" else 8088
    )
    exec_port = _gate_port_from_env(
        "NAV_GATE_EXECUTOR_PORT", 443 if scheme == "https" else 8090
    )
    scout_path = (os.environ.get("NAV_GATE_SCOUT_PATH") or "/dashboard").strip() or "/dashboard"
    exec_path = (os.environ.get("NAV_GATE_EXECUTOR_PATH") or "/dashboard").strip() or "/dashboard"
    if not scout_path.startswith("/"):
        scout_path = "/" + scout_path
    if not exec_path.startswith("/"):
        exec_path = "/" + exec_path
    update_existing = (os.environ.get("NAV_GATE_SCOUT_UPDATE") or "").strip().lower() in (
        "1",
        "true",
        "yes",
        "on",
    )
    group_name = (os.environ.get("NAV_GATE_SCOUT_GROUP") or "Gate 扫单").strip() or "Gate 扫单"
    g = ServiceGroup.query.filter_by(name=group_name).first()
    if not g:
        g = ServiceGroup(name=group_name, sort_order=50)
        db.session.add(g)
        # flush so that g.id is available for the services below.
        db.session.flush()
    defs = (
        ("Gate 扫描端", scout_host, scout_port, scout_path, 0, "gate_scout"),
        ("Gate 下单执行器", exec_host, exec_port, exec_path, 10, "gate_exec"),
    )
    added = 0
    updated = 0
    for name, h, port, path, order, embed_k in defs:
        existing = Service.query.filter_by(group_id=g.id, name=name).first()
        if existing:
            if update_existing and (
                existing.scheme != scheme
                or existing.host != h
                or existing.port != port
                or existing.path != path
                or (existing.embed_kind or "") != embed_k
            ):
                existing.scheme = scheme
                existing.host = h
                existing.port = port
                existing.path = path
                existing.embed_kind = embed_k
                updated += 1
            elif not (existing.embed_kind or "").strip():
                # Rows that have no embed kind yet are tagged even when
                # updating is off.
                existing.embed_kind = embed_k
                updated += 1
            continue
        db.session.add(
            Service(
                name=name,
                scheme=scheme,
                host=h,
                port=port,
                path=path,
                sort_order=order,
                group_id=g.id,
                embed_kind=embed_k,
            )
        )
        added += 1
    if added or updated:
        db.session.commit()
        # A separator between the two URLs keeps the first one readable.
        print(
            f"[nav] Gate 扫单:新增 {added}、更新 {updated}(分组「{group_name}」)。"
            f" 扫描 {scheme}://{scout_host}:{scout_port}{scout_path};"
            f"执行器 {scheme}://{exec_host}:{exec_port}{exec_path}",
            flush=True,
        )


def _gate_port_from_env(name: str, default: int) -> int:
    """Read a TCP port (1-65535) from env var *name*; return *default* when unset or invalid."""
    raw = (os.environ.get(name) or "").strip()
    if not raw:
        return default
    try:
        port = int(raw)
    except ValueError:
        return default
    return port if 0 < port < 65536 else default
def _ensure_admin_from_env() -> None:
    """Create, or optionally re-password, the administrator named in the environment.

    Once the database already has users, the admin account from .env does not
    replace anything automatically. When NAV_ADMIN_USERNAME and
    NAV_ADMIN_PASSWORD are both set and no user with that name exists, the
    user is created. When the user exists, the password is changed only if
    NAV_ADMIN_UPDATE_PASSWORD is ``"1"``.
    """
    username = (os.environ.get("NAV_ADMIN_USERNAME") or "").strip()
    password = os.environ.get("NAV_ADMIN_PASSWORD")
    if not (username and password):
        return

    account = User.query.filter_by(username=username).first()
    if not account:
        account = User(username=username)
        account.set_password(password)
        db.session.add(account)
        db.session.commit()
        print(
            f"[nav] 已从环境变量创建用户「{username}」(此前库中无此用户名)。",
            flush=True,
        )
        return

    if os.environ.get("NAV_ADMIN_UPDATE_PASSWORD") != "1":
        return
    account.set_password(password)
    db.session.commit()
    print(
        f"[nav] 已按 NAV_ADMIN_UPDATE_PASSWORD=1 更新用户「{username}」的登录密码。",
        flush=True,
    )
# Module-level application instance, built at import time.
app = create_app()
if __name__ == "__main__":
    # Direct execution: serve with Flask's built-in server. Host and port come
    # from NAV_HOST (default 0.0.0.0) and NAV_PORT (default 5070); debug mode
    # is on only when NAV_DEBUG is "1".
    host = os.environ.get("NAV_HOST", "0.0.0.0")
    port = int(os.environ.get("NAV_PORT", "5070"))
    app.run(host=host, port=port, debug=os.environ.get("NAV_DEBUG") == "1")