import os import secrets from pathlib import Path from typing import Optional from flask import Flask, flash, redirect, render_template, request, url_for from flask_login import LoginManager, current_user, login_required, login_user, logout_user from flask_wtf.csrf import CSRFProtect from werkzeug.middleware.proxy_fix import ProxyFix from forms import GroupForm, LoginForm, ServiceForm from models import Service, ServiceGroup, User, db _ROOT = Path(__file__).resolve().parent def _load_env_file(path: Path) -> None: """若未安装 python-dotenv,用最小实现加载 .env(不覆盖已有环境变量)。""" if not path.is_file(): return try: raw = path.read_text(encoding="utf-8-sig") except OSError: return for line in raw.splitlines(): s = line.strip() if not s or s.startswith("#"): continue if s.lower().startswith("export "): s = s[7:].lstrip() if "=" not in s: continue key, _, rest = s.partition("=") key = key.strip() if not key: continue val = rest.strip() if len(val) >= 2 and val[0] == val[-1] and val[0] in "\"'": val = val[1:-1] if key not in os.environ: os.environ[key] = val try: from dotenv import load_dotenv as _dotenv_load except ImportError: _dotenv_load = None if _dotenv_load: _dotenv_load(_ROOT / ".env") else: _load_env_file(_ROOT / ".env") login_manager = LoginManager() csrf = CSRFProtect() def _apply_session_cookie_settings(app: Flask) -> None: """Secure Cookie 仅在浏览器用 https:// 访问时才有意义;用 http://IP:端口 时若仍设 Secure,浏览器会丢弃会话,表现为登录后立即回到登录页。""" if os.environ.get("NAV_COOKIES_INSECURE_HTTP") == "1": app.config["SESSION_COOKIE_SECURE"] = False app.config["REMEMBER_COOKIE_SECURE"] = False if (os.environ.get("NAV_SESSION_COOKIE_SECURE") or "").strip().lower() in ( "1", "true", "yes", ): print( "[nav] NAV_COOKIES_INSECURE_HTTP=1:已关闭 Secure Cookie,便于 http:// 内网访问。", flush=True, ) return mode = (os.environ.get("NAV_SESSION_COOKIE_SECURE") or "0").strip().lower() if mode in ("1", "true", "yes"): app.config["SESSION_COOKIE_SECURE"] = True app.config["REMEMBER_COOKIE_SECURE"] = True elif mode == "auto": app.config["SESSION_COOKIE_SECURE"] = False app.config["REMEMBER_COOKIE_SECURE"] = False @app.before_request def _nav_session_cookie_secure_auto(): sec = request.is_secure app.config["SESSION_COOKIE_SECURE"] = sec app.config["REMEMBER_COOKIE_SECURE"] = sec else: app.config["SESSION_COOKIE_SECURE"] = False app.config["REMEMBER_COOKIE_SECURE"] = False def create_app() -> Flask: app = Flask(__name__) app.config["SECRET_KEY"] = os.environ.get("NAV_SECRET_KEY") or secrets.token_hex(32) app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get( "NAV_DATABASE_URL", "sqlite:///nav_local.db" ) app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False app.config["WTF_CSRF_TIME_LIMIT"] = None _apply_session_cookie_settings(app) trusted = os.environ.get("NAV_CSRF_TRUSTED_ORIGINS", "").strip() if trusted: app.config["WTF_CSRF_TRUSTED_ORIGINS"] = [ o.strip() for o in trusted.split(",") if o.strip() ] db.init_app(app) login_manager.init_app(app) login_manager.login_view = "login" login_manager.login_message = "请先登录" csrf.init_app(app) @login_manager.user_loader def load_user(user_id: str): return db.session.get(User, int(user_id)) with app.app_context(): db.create_all() _ensure_default_user() _ensure_admin_from_env() @app.route("/login", methods=["GET", "POST"]) def login(): if current_user.is_authenticated: return redirect(url_for("index")) form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data.strip()).first() if user and user.check_password(form.password.data): login_user(user, remember=True) next_url = request.args.get("next") if next_url and next_url.startswith("/"): return redirect(next_url) return redirect(url_for("index")) flash("用户名或密码错误", "error") return render_template("login.html", form=form) @app.route("/logout") @login_required def logout(): logout_user() flash("已退出登录", "info") return redirect(url_for("login")) @app.route("/") @login_required def index(): groups = ( ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id).all() ) grouped = [] for g in groups: svcs = ( Service.query.filter_by(group_id=g.id) .order_by(Service.sort_order, Service.id) .all() ) grouped.append((g, svcs)) return render_template("index.html", grouped=grouped) # ---------- 分组管理 ---------- @app.route("/admin/groups") @login_required def admin_groups(): rows = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() return render_template("admin_groups.html", groups=rows) @app.route("/admin/groups/new", methods=["GET", "POST"]) @login_required def admin_group_new(): form = GroupForm() if form.validate_on_submit(): g = ServiceGroup( name=form.name.data.strip(), sort_order=form.sort_order.data or 0, ) db.session.add(g) db.session.commit() flash("分组已创建", "success") return redirect(url_for("admin_groups")) return render_template("admin_group_form.html", form=form, title="新建分组") @app.route("/admin/groups//edit", methods=["GET", "POST"]) @login_required def admin_group_edit(gid: int): g = db.session.get(ServiceGroup, gid) if not g: flash("分组不存在", "error") return redirect(url_for("admin_groups")) form = GroupForm(obj=g) if form.validate_on_submit(): g.name = form.name.data.strip() g.sort_order = form.sort_order.data or 0 db.session.commit() flash("分组已更新", "success") return redirect(url_for("admin_groups")) return render_template("admin_group_form.html", form=form, title="编辑分组") @app.route("/admin/groups//delete", methods=["POST"]) @login_required def admin_group_delete(gid: int): g = db.session.get(ServiceGroup, gid) if not g: flash("分组不存在", "error") return redirect(url_for("admin_groups")) db.session.delete(g) db.session.commit() flash("分组及其下属服务已删除", "success") return redirect(url_for("admin_groups")) # ---------- 服务管理 ---------- @app.route("/admin/services") @login_required def admin_services(): groups = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() gid = request.args.get("group_id", type=int) q = Service.query if gid: q = q.filter_by(group_id=gid) services = q.order_by(Service.group_id, Service.sort_order, Service.id).all() return render_template( "admin_services.html", services=services, groups=groups, filter_group_id=gid, ) @app.route("/admin/services/new", methods=["GET", "POST"]) @login_required def admin_service_new(): form = ServiceForm() form.group_id.choices = _group_choices() if not form.group_id.choices: flash("请先创建至少一个分组", "error") return redirect(url_for("admin_groups")) groups = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() if request.method == "GET": gid = request.args.get("group_id", type=int) if gid and any(gid == c[0] for c in form.group_id.choices): form.group_id.data = gid else: fid = _first_group_id() if fid is not None: form.group_id.data = fid if form.validate_on_submit(): path = (form.path.data or "").strip() or "/" s = Service( name=form.name.data.strip(), host=form.host.data.strip(), port=form.port.data, path=path, sort_order=form.sort_order.data or 0, group_id=form.group_id.data, ) db.session.add(s) db.session.commit() flash("服务已添加", "success") return redirect(url_for("admin_services")) return render_template( "admin_service_form.html", form=form, groups=groups, title="新建服务", ) @app.route("/admin/services//edit", methods=["GET", "POST"]) @login_required def admin_service_edit(sid: int): s = db.session.get(Service, sid) if not s: flash("服务不存在", "error") return redirect(url_for("admin_services")) form = ServiceForm(obj=s) form.group_id.choices = _group_choices() if not form.group_id.choices: flash("请先创建至少一个分组", "error") return redirect(url_for("admin_groups")) groups = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() if form.validate_on_submit(): s.name = form.name.data.strip() s.host = form.host.data.strip() s.port = form.port.data s.path = (form.path.data or "").strip() or "/" s.sort_order = form.sort_order.data or 0 s.group_id = form.group_id.data db.session.commit() flash("服务已更新", "success") return redirect(url_for("admin_services")) return render_template( "admin_service_form.html", form=form, groups=groups, title="编辑服务", ) @app.route("/admin/services//delete", methods=["POST"]) @login_required def admin_service_delete(sid: int): s = db.session.get(Service, sid) if not s: flash("服务不存在", "error") return redirect(url_for("admin_services")) db.session.delete(s) db.session.commit() flash("服务已删除", "success") return redirect(url_for("admin_services")) if os.environ.get("NAV_TRUST_PROXY") == "1": app.wsgi_app = ProxyFix( app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1 ) if not os.environ.get("NAV_SECRET_KEY"): print( "[nav] 警告: 未设置 NAV_SECRET_KEY。" "若使用 gunicorn/uwsgi 等多 worker,或未固定密钥,登录后会话会失效;" "请在环境变量中配置随机 NAV_SECRET_KEY。", flush=True, ) return app def _first_group_id() -> Optional[int]: g = ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id).first() return g.id if g else None def _group_choices(): groups = ServiceGroup.query.order_by( ServiceGroup.sort_order, ServiceGroup.id ).all() return [(g.id, g.name) for g in groups] def _ensure_default_user() -> None: if User.query.count() > 0: return username = (os.environ.get("NAV_ADMIN_USERNAME") or "admin").strip() or "admin" password = os.environ.get("NAV_ADMIN_PASSWORD") or "admin123" u = User(username=username) u.set_password(password) db.session.add(u) db.session.add(ServiceGroup(name="默认分组", sort_order=0)) db.session.commit() print( f"[nav] 首次运行:已创建管理员「{username}」" "(来自 NAV_ADMIN_USERNAME / NAV_ADMIN_PASSWORD,未设置则为 admin / admin123)。" "生产环境请尽快改为强密码。" ) def _ensure_admin_from_env() -> None: """库中已有用户后,.env 里的管理员账号不会自动覆盖旧库;若配置了用户名且尚不存在则补建。""" username = (os.environ.get("NAV_ADMIN_USERNAME") or "").strip() password = os.environ.get("NAV_ADMIN_PASSWORD") if not username or password is None or password == "": return existing = User.query.filter_by(username=username).first() if existing: if os.environ.get("NAV_ADMIN_UPDATE_PASSWORD") == "1": existing.set_password(password) db.session.commit() print( f"[nav] 已按 NAV_ADMIN_UPDATE_PASSWORD=1 更新用户「{username}」的登录密码。", flush=True, ) return u = User(username=username) u.set_password(password) db.session.add(u) db.session.commit() print( f"[nav] 已从环境变量创建用户「{username}」(此前库中无此用户名)。", flush=True, ) app = create_app() if __name__ == "__main__": host = os.environ.get("NAV_HOST", "0.0.0.0") port = int(os.environ.get("NAV_PORT", "5000")) app.run(host=host, port=port, debug=os.environ.get("NAV_DEBUG") == "1")