a5e1e94fb2
Co-authored-by: Cursor <cursoragent@cursor.com>
336 lines
11 KiB
Python
336 lines
11 KiB
Python
import os
|
|
import secrets
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
from flask import Flask, flash, redirect, render_template, request, url_for
|
|
from flask_login import LoginManager, current_user, login_required, login_user, logout_user
|
|
from flask_wtf.csrf import CSRFProtect
|
|
from werkzeug.middleware.proxy_fix import ProxyFix
|
|
|
|
from forms import GroupForm, LoginForm, ServiceForm
|
|
from models import Service, ServiceGroup, User, db
|
|
|
|
# Absolute directory that contains this module; the ".env" file is looked up here.
_ROOT = Path(__file__).resolve().parent
|
|
|
|
|
|
def _load_env_file(path: Path) -> None:
    """Load ``KEY=VALUE`` pairs from a ``.env`` file into ``os.environ``.

    Minimal fallback used when python-dotenv is not installed. Variables that
    already exist in the environment are never overwritten.

    Parsing rules:
      * blank lines and lines starting with ``#`` are skipped;
      * an optional leading ``export `` prefix (any letter case) is dropped;
      * lines without ``=`` or with an empty key are skipped;
      * one pair of matching single or double quotes around the value is
        removed.

    The loader is best-effort: a missing, unreadable or undecodable file is
    silently ignored instead of aborting application start-up.

    Args:
        path: Location of the ``.env`` file.
    """
    if not path.is_file():
        return
    try:
        # "utf-8-sig" transparently drops a leading BOM if one is present.
        raw = path.read_text(encoding="utf-8-sig")
    except (OSError, UnicodeDecodeError):
        # UnicodeDecodeError is a ValueError, not an OSError, so it has to be
        # listed explicitly or a mis-encoded file would crash the import.
        return
    for line in raw.splitlines():
        s = line.strip()
        if not s or s.startswith("#"):
            continue
        if s.lower().startswith("export "):
            s = s[7:].lstrip()  # 7 == len("export ")
        key, sep, rest = s.partition("=")
        key = key.strip()
        if not sep or not key:
            continue
        val = rest.strip()
        if len(val) >= 2 and val[0] == val[-1] and val[0] in "\"'":
            val = val[1:-1]
        try:
            # setdefault keeps any value that is already in the environment.
            os.environ.setdefault(key, val)
        except ValueError:
            # e.g. an embedded NUL byte; skip the bad entry, keep the rest.
            continue
|
|
|
|
|
|
# Use python-dotenv when it is installed; otherwise fall back to the minimal
# loader defined in this module. Either way the ".env" next to this file is read.
try:
    from dotenv import load_dotenv as _dotenv_load
except ImportError:
    _dotenv_load = None

if _dotenv_load is None:
    _load_env_file(_ROOT / ".env")
else:
    _dotenv_load(_ROOT / ".env")

# Extension objects are created unbound here and attached to the Flask app
# inside create_app() via init_app().
csrf = CSRFProtect()
login_manager = LoginManager()
|
|
|
|
|
|
def create_app() -> Flask:
    """Build, configure and return the Flask application.

    Configuration is read from environment variables:

      * ``NAV_SECRET_KEY`` - session signing key; a random key is generated
        when it is unset (a warning is printed in that case).
      * ``NAV_DATABASE_URL`` - SQLAlchemy URI, default ``sqlite:///nav_local.db``.
      * ``NAV_SESSION_COOKIE_SECURE`` - ``"1"`` marks the session and
        remember-me cookies as Secure.
      * ``NAV_CSRF_TRUSTED_ORIGINS`` - comma-separated list stored in
        ``WTF_CSRF_TRUSTED_ORIGINS``.
      * ``NAV_TRUST_PROXY`` - ``"1"`` wraps the WSGI app in ``ProxyFix``.

    The function also creates the database tables, seeds the default user
    through ``_ensure_default_user()`` and registers every route.

    Returns:
        The configured ``Flask`` instance.
    """
    app = Flask(__name__)
    app.config["SECRET_KEY"] = os.environ.get("NAV_SECRET_KEY") or secrets.token_hex(32)
    app.config["SQLALCHEMY_DATABASE_URI"] = os.environ.get(
        "NAV_DATABASE_URL", "sqlite:///nav_local.db"
    )
    app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
    app.config["WTF_CSRF_TIME_LIMIT"] = None

    if os.environ.get("NAV_SESSION_COOKIE_SECURE") == "1":
        app.config["SESSION_COOKIE_SECURE"] = True
        app.config["REMEMBER_COOKIE_SECURE"] = True

    trusted = os.environ.get("NAV_CSRF_TRUSTED_ORIGINS", "").strip()
    if trusted:
        app.config["WTF_CSRF_TRUSTED_ORIGINS"] = [
            o.strip() for o in trusted.split(",") if o.strip()
        ]

    db.init_app(app)
    login_manager.init_app(app)
    login_manager.login_view = "login"
    login_manager.login_message = "请先登录"
    csrf.init_app(app)

    @login_manager.user_loader
    def load_user(user_id: str):
        """Return the ``User`` for a session user id, or ``None`` if invalid."""
        try:
            uid = int(user_id)
        except (TypeError, ValueError):
            # A user loader should report an unusable id as "no user"
            # rather than raise and turn the request into a server error.
            return None
        return db.session.get(User, uid)

    with app.app_context():
        db.create_all()
        _ensure_default_user()

    @app.route("/login", methods=["GET", "POST"])
    def login():
        """Show the login form and authenticate the submitted credentials."""
        if current_user.is_authenticated:
            return redirect(url_for("index"))
        form = LoginForm()
        if form.validate_on_submit():
            user = User.query.filter_by(username=form.username.data.strip()).first()
            if user and user.check_password(form.password.data):
                login_user(user, remember=True)
                next_url = request.args.get("next")
                # Only follow same-site paths. "//host" and "/\host" also start
                # with "/" but browsers resolve them to another host, which
                # would make this an open redirect.
                if (
                    next_url
                    and next_url.startswith("/")
                    and not next_url.startswith(("//", "/\\"))
                ):
                    return redirect(next_url)
                return redirect(url_for("index"))
            flash("用户名或密码错误", "error")
        return render_template("login.html", form=form)

    # NOTE(review): logout is reachable via GET, so a third-party page could
    # trigger it; consider switching to POST together with the templates.
    @app.route("/logout")
    @login_required
    def logout():
        """Log the current user out and return to the login page."""
        logout_user()
        flash("已退出登录", "info")
        return redirect(url_for("login"))

    @app.route("/")
    @login_required
    def index():
        """Render the home page: every group together with its services."""
        groups = (
            ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id).all()
        )
        # One ordered query for all services instead of one query per group;
        # bucketing keeps the (sort_order, id) order inside each group.
        services_by_group = {}
        for svc in Service.query.order_by(Service.sort_order, Service.id).all():
            services_by_group.setdefault(svc.group_id, []).append(svc)
        grouped = [(g, services_by_group.get(g.id, [])) for g in groups]
        return render_template("index.html", grouped=grouped)

    # ---------- Group management ----------
    @app.route("/admin/groups")
    @login_required
    def admin_groups():
        """List all groups in display order."""
        rows = ServiceGroup.query.order_by(
            ServiceGroup.sort_order, ServiceGroup.id
        ).all()
        return render_template("admin_groups.html", groups=rows)

    @app.route("/admin/groups/new", methods=["GET", "POST"])
    @login_required
    def admin_group_new():
        """Create a new group from the submitted form."""
        form = GroupForm()
        if form.validate_on_submit():
            g = ServiceGroup(
                name=form.name.data.strip(),
                sort_order=form.sort_order.data or 0,
            )
            db.session.add(g)
            db.session.commit()
            flash("分组已创建", "success")
            return redirect(url_for("admin_groups"))
        return render_template("admin_group_form.html", form=form, title="新建分组")

    @app.route("/admin/groups/<int:gid>/edit", methods=["GET", "POST"])
    @login_required
    def admin_group_edit(gid: int):
        """Edit the group with id ``gid``; redirect to the list if it is missing."""
        g = db.session.get(ServiceGroup, gid)
        if not g:
            flash("分组不存在", "error")
            return redirect(url_for("admin_groups"))
        form = GroupForm(obj=g)
        if form.validate_on_submit():
            g.name = form.name.data.strip()
            g.sort_order = form.sort_order.data or 0
            db.session.commit()
            flash("分组已更新", "success")
            return redirect(url_for("admin_groups"))
        return render_template("admin_group_form.html", form=form, title="编辑分组")

    @app.route("/admin/groups/<int:gid>/delete", methods=["POST"])
    @login_required
    def admin_group_delete(gid: int):
        """Delete the group with id ``gid``."""
        g = db.session.get(ServiceGroup, gid)
        if not g:
            flash("分组不存在", "error")
            return redirect(url_for("admin_groups"))
        # NOTE(review): the message below says the group's services are removed
        # too; presumably the model declares a delete cascade - confirm.
        db.session.delete(g)
        db.session.commit()
        flash("分组及其下属服务已删除", "success")
        return redirect(url_for("admin_groups"))

    # ---------- Service management ----------
    @app.route("/admin/services")
    @login_required
    def admin_services():
        """List services, optionally filtered by the ``group_id`` query argument."""
        groups = ServiceGroup.query.order_by(
            ServiceGroup.sort_order, ServiceGroup.id
        ).all()
        gid = request.args.get("group_id", type=int)
        q = Service.query
        if gid:
            q = q.filter_by(group_id=gid)
        services = q.order_by(Service.group_id, Service.sort_order, Service.id).all()
        return render_template(
            "admin_services.html",
            services=services,
            groups=groups,
            filter_group_id=gid,
        )

    @app.route("/admin/services/new", methods=["GET", "POST"])
    @login_required
    def admin_service_new():
        """Create a new service; requires at least one existing group."""
        form = ServiceForm()
        form.group_id.choices = _group_choices()
        if not form.group_id.choices:
            flash("请先创建至少一个分组", "error")
            return redirect(url_for("admin_groups"))
        groups = ServiceGroup.query.order_by(
            ServiceGroup.sort_order, ServiceGroup.id
        ).all()
        if request.method == "GET":
            # Preselect the group named in the query string when it is a valid
            # choice, otherwise the first group in display order.
            gid = request.args.get("group_id", type=int)
            if gid and any(gid == c[0] for c in form.group_id.choices):
                form.group_id.data = gid
            else:
                fid = _first_group_id()
                if fid is not None:
                    form.group_id.data = fid
        if form.validate_on_submit():
            # An empty path falls back to "/".
            path = (form.path.data or "").strip() or "/"
            s = Service(
                name=form.name.data.strip(),
                host=form.host.data.strip(),
                port=form.port.data,
                path=path,
                sort_order=form.sort_order.data or 0,
                group_id=form.group_id.data,
            )
            db.session.add(s)
            db.session.commit()
            flash("服务已添加", "success")
            return redirect(url_for("admin_services"))
        return render_template(
            "admin_service_form.html",
            form=form,
            groups=groups,
            title="新建服务",
        )

    @app.route("/admin/services/<int:sid>/edit", methods=["GET", "POST"])
    @login_required
    def admin_service_edit(sid: int):
        """Edit the service with id ``sid``; redirect to the list if it is missing."""
        s = db.session.get(Service, sid)
        if not s:
            flash("服务不存在", "error")
            return redirect(url_for("admin_services"))
        form = ServiceForm(obj=s)
        form.group_id.choices = _group_choices()
        if not form.group_id.choices:
            flash("请先创建至少一个分组", "error")
            return redirect(url_for("admin_groups"))
        groups = ServiceGroup.query.order_by(
            ServiceGroup.sort_order, ServiceGroup.id
        ).all()
        if form.validate_on_submit():
            s.name = form.name.data.strip()
            s.host = form.host.data.strip()
            s.port = form.port.data
            s.path = (form.path.data or "").strip() or "/"
            s.sort_order = form.sort_order.data or 0
            s.group_id = form.group_id.data
            db.session.commit()
            flash("服务已更新", "success")
            return redirect(url_for("admin_services"))
        return render_template(
            "admin_service_form.html",
            form=form,
            groups=groups,
            title="编辑服务",
        )

    @app.route("/admin/services/<int:sid>/delete", methods=["POST"])
    @login_required
    def admin_service_delete(sid: int):
        """Delete the service with id ``sid``."""
        s = db.session.get(Service, sid)
        if not s:
            flash("服务不存在", "error")
            return redirect(url_for("admin_services"))
        db.session.delete(s)
        db.session.commit()
        flash("服务已删除", "success")
        return redirect(url_for("admin_services"))

    if os.environ.get("NAV_TRUST_PROXY") == "1":
        # Trust exactly one proxy hop for each forwarded header.
        app.wsgi_app = ProxyFix(
            app.wsgi_app, x_for=1, x_proto=1, x_host=1, x_port=1, x_prefix=1
        )

    if not os.environ.get("NAV_SECRET_KEY"):
        # Without a fixed key each process generates its own random key above,
        # so sessions do not survive restarts or span multiple workers.
        print(
            "[nav] 警告: 未设置 NAV_SECRET_KEY。"
            "若使用 gunicorn/uwsgi 等多 worker,或未固定密钥,登录后会话会失效;"
            "请在环境变量中配置随机 NAV_SECRET_KEY。",
            flush=True,
        )

    return app
|
|
|
|
|
|
def _first_group_id() -> Optional[int]:
    """Return the id of the group that sorts first, or ``None`` if there is none.

    Groups are ordered by ``sort_order`` and then by ``id``.
    """
    ordered = ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id)
    first = ordered.first()
    if not first:
        return None
    return first.id
|
|
|
|
|
|
def _group_choices():
    """Return ``(id, name)`` pairs for every group, in display order.

    The result is assigned to the ``choices`` of the service form's
    ``group_id`` field. Groups are ordered by ``sort_order`` and then ``id``.
    """
    ordered = ServiceGroup.query.order_by(ServiceGroup.sort_order, ServiceGroup.id)
    return [(row.id, row.name) for row in ordered.all()]
|
|
|
|
|
|
def _ensure_default_user() -> None:
    """Seed an administrator and a default group when no user exists yet.

    Credentials come from ``NAV_ADMIN_USERNAME`` / ``NAV_ADMIN_PASSWORD`` and
    fall back to ``admin`` / ``admin123``. Does nothing when the user table
    already contains at least one row. Prints a notice after seeding.
    """
    existing = User.query.count()
    if existing:
        return

    configured_name = os.environ.get("NAV_ADMIN_USERNAME") or "admin"
    # A whitespace-only name also falls back to the default.
    username = configured_name.strip() or "admin"
    password = os.environ.get("NAV_ADMIN_PASSWORD") or "admin123"

    admin = User(username=username)
    admin.set_password(password)
    default_group = ServiceGroup(name="默认分组", sort_order=0)

    db.session.add(admin)
    db.session.add(default_group)
    db.session.commit()

    print(
        f"[nav] 首次运行:已创建管理员「{username}」"
        "(来自 NAV_ADMIN_USERNAME / NAV_ADMIN_PASSWORD,未设置则为 admin / admin123)。"
        "生产环境请尽快改为强密码。"
    )
|
|
|
|
|
|
# Module-level application object built by create_app().
app = create_app()

if __name__ == "__main__":
    # Development entry point; bind address, port and debug mode come from
    # NAV_HOST, NAV_PORT and NAV_DEBUG.
    app.run(
        host=os.environ.get("NAV_HOST", "0.0.0.0"),
        port=int(os.environ.get("NAV_PORT", "5000")),
        debug=os.environ.get("NAV_DEBUG") == "1",
    )
|