59a88a9afe
Use stdlib pbkdf2 for admin passwords so init_db works reliably. Remove .env from git to avoid pull conflicts on VPS. Verify flask install before database init. Co-authored-by: Cursor <cursoragent@cursor.com>
143 lines
4.0 KiB
Python
143 lines
4.0 KiB
Python
"""SQLite 数据库:管理员账号与节点。"""
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import hmac
|
|
import os
|
|
import secrets
|
|
import sqlite3
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
# Project root: taken from the JIEDIAN_ROOT environment variable when set,
# otherwise the directory one level above the one containing this file.
ROOT = Path(os.getenv("JIEDIAN_ROOT", Path(__file__).resolve().parents[1]))

# Location of the SQLite database file under the project root.
DB_FILE = ROOT.joinpath("data", "nodes.db")

# Number of PBKDF2 rounds applied when hashing a new admin password.
_PBKDF2_ITERATIONS = 600_000
|
|
|
|
|
|
def _hash_password(password: str) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 hash string for *password*.

    The result has the form
    ``pbkdf2:sha256:<iterations>$<salt>$<hex digest>``, where the salt is a
    freshly generated 32-character hex string and the iteration count is
    ``_PBKDF2_ITERATIONS``.
    """
    rounds = _PBKDF2_ITERATIONS
    salt_hex = secrets.token_hex(16)
    # The salt is fed to the KDF as the bytes of its hex text, not as the
    # raw random bytes; verification must do the same.
    derived = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt_hex.encode(), rounds
    )
    return "$".join((f"pbkdf2:sha256:{rounds}", salt_hex, derived.hex()))
|
|
|
|
|
|
def _verify_password(stored: str, password: str) -> bool:
|
|
if not stored or stored.count("$") < 2:
|
|
return False
|
|
method, salt, expected = stored.split("$", 2)
|
|
if not method.startswith("pbkdf2:sha256:"):
|
|
return False
|
|
iterations = int(method.rsplit(":", 1)[1])
|
|
digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), iterations)
|
|
return hmac.compare_digest(digest.hex(), expected)
|
|
|
|
|
|
def connect() -> sqlite3.Connection:
    """Open a connection to the SQLite database at ``DB_FILE``.

    The parent directory is created when missing. Rows are returned as
    ``sqlite3.Row`` objects and foreign-key enforcement is switched on.
    The caller is responsible for closing the returned connection.
    """
    data_dir = DB_FILE.parent
    data_dir.mkdir(parents=True, exist_ok=True)

    connection = sqlite3.connect(DB_FILE)
    connection.execute("PRAGMA foreign_keys = ON")
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def init_db(env: dict[str, str]) -> None:
    """Create the schema and seed the admin account and a default node.

    Steps, in order:
      1. Create the ``admin`` and ``nodes`` tables when they do not exist.
      2. Replace every row in ``admin`` with a single account built from
         ``PANEL_USERNAME`` (default ``"admin"``) and ``PANEL_PASSWORD``.
      3. Insert one default node when the ``nodes`` table is empty.

    Args:
        env: Mapping of configuration values; ``PANEL_PASSWORD`` is
            required, ``PANEL_USERNAME`` is optional.

    Raises:
        SystemExit: If ``PANEL_PASSWORD`` is missing or empty.
    """
    conn = connect()
    try:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS admin (
                id INTEGER PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS nodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                uuid TEXT NOT NULL UNIQUE,
                hy2_password TEXT NOT NULL,
                enabled INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            """
        )

        username = env.get("PANEL_USERNAME", "admin")
        password = env.get("PANEL_PASSWORD")
        if not password:
            raise SystemExit("请在 .env 中设置 PANEL_PASSWORD")

        # The delete and insert share one transaction, so a failure before
        # the commit below leaves the previous admin row untouched.
        conn.execute("DELETE FROM admin")
        conn.execute(
            "INSERT INTO admin (username, password_hash) VALUES (?, ?)",
            (username, _hash_password(password)),
        )

        count = conn.execute("SELECT COUNT(*) AS c FROM nodes").fetchone()["c"]
        if count == 0:
            uuid, hy2 = _generate_credentials()
            conn.execute(
                "INSERT INTO nodes (name, uuid, hy2_password) VALUES (?, ?, ?)",
                ("默认节点", uuid, hy2),
            )

        conn.commit()
    finally:
        # Always release the connection, including on SystemExit or when
        # credential generation fails; uncommitted changes are discarded.
        conn.close()
|
|
|
|
|
|
def verify_admin(username: str, password: str) -> bool:
    """Return True when *username* exists and *password* matches its hash.

    Args:
        username: Account name looked up in the ``admin`` table.
        password: Candidate plaintext password.

    Returns:
        False when no such account exists or the password does not match.
    """
    conn = connect()
    try:
        row = conn.execute(
            "SELECT password_hash FROM admin WHERE username = ?", (username,)
        ).fetchone()
    finally:
        # Close even if the query raises, so the connection is not leaked.
        conn.close()
    if row is None:
        return False
    return _verify_password(row["password_hash"], password)
|
|
|
|
|
|
def list_nodes() -> list[dict]:
    """Return every node as a plain dict, newest (highest id) first.

    Each dict carries the keys ``id``, ``name``, ``uuid``,
    ``hy2_password``, ``enabled`` and ``created_at``.
    """
    conn = connect()
    try:
        rows = conn.execute(
            "SELECT id, name, uuid, hy2_password, enabled, created_at "
            "FROM nodes ORDER BY id DESC"
        ).fetchall()
    finally:
        # Close even if the query raises, so the connection is not leaked.
        conn.close()
    return [dict(row) for row in rows]
|
|
|
|
|
|
def add_node(name: str) -> dict:
    """Insert a node with freshly generated credentials and return it.

    Args:
        name: Display name; surrounding whitespace is stripped and an
            empty result is replaced by a placeholder name.

    Returns:
        The newly inserted ``nodes`` row as a dict with all columns.
    """
    name = name.strip() or "未命名节点"
    uuid, hy2 = _generate_credentials()
    conn = connect()
    try:
        cur = conn.execute(
            "INSERT INTO nodes (name, uuid, hy2_password) VALUES (?, ?, ?)",
            (name, uuid, hy2),
        )
        # Read the row back so column defaults (enabled, created_at) are
        # included in the returned dict.
        row = conn.execute(
            "SELECT * FROM nodes WHERE id = ?", (cur.lastrowid,)
        ).fetchone()
        conn.commit()
    finally:
        # Close even if the insert fails (e.g. a UNIQUE violation), so the
        # connection is not leaked; uncommitted changes are discarded.
        conn.close()
    return dict(row)
|
|
|
|
|
|
def delete_node(node_id: int) -> bool:
    """Delete the node with id *node_id*.

    Returns:
        True when a row was removed, False when no node had that id.
    """
    conn = connect()
    try:
        cur = conn.execute("DELETE FROM nodes WHERE id = ?", (node_id,))
        conn.commit()
        return cur.rowcount > 0
    finally:
        # Close even if the statement raises, so the connection is not
        # leaked.
        conn.close()
|
|
|
|
|
|
def node_count() -> int:
    """Return the number of rows in the ``nodes`` table."""
    conn = connect()
    try:
        return conn.execute("SELECT COUNT(*) AS c FROM nodes").fetchone()["c"]
    finally:
        # Close even if the query raises, so the connection is not leaked.
        conn.close()
|
|
|
|
|
|
def _generate_credentials() -> tuple[str, str]:
|
|
uuid = subprocess.check_output(["sing-box", "generate", "uuid"], text=True).strip()
|
|
hy2 = secrets.token_urlsafe(18)[:24]
|
|
return uuid, hy2
|