"""SQLite 数据库:管理员账号与节点。""" from __future__ import annotations import hashlib import hmac import os import secrets import sqlite3 import subprocess from pathlib import Path ROOT = Path(os.environ.get("JIEDIAN_ROOT", Path(__file__).resolve().parents[1])) DB_FILE = ROOT / "data" / "nodes.db" _PBKDF2_ITERATIONS = 600000 def _hash_password(password: str) -> str: salt = secrets.token_hex(16) digest = hashlib.pbkdf2_hmac( "sha256", password.encode(), salt.encode(), _PBKDF2_ITERATIONS ) return f"pbkdf2:sha256:{_PBKDF2_ITERATIONS}${salt}${digest.hex()}" def _verify_password(stored: str, password: str) -> bool: if not stored or stored.count("$") < 2: return False method, salt, expected = stored.split("$", 2) if not method.startswith("pbkdf2:sha256:"): return False iterations = int(method.rsplit(":", 1)[1]) digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt.encode(), iterations) return hmac.compare_digest(digest.hex(), expected) def connect() -> sqlite3.Connection: DB_FILE.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(DB_FILE) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") return conn def init_db(env: dict[str, str]) -> None: conn = connect() conn.executescript( """ CREATE TABLE IF NOT EXISTS admin ( id INTEGER PRIMARY KEY, username TEXT UNIQUE NOT NULL, password_hash TEXT NOT NULL ); CREATE TABLE IF NOT EXISTS nodes ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, uuid TEXT NOT NULL UNIQUE, hy2_password TEXT NOT NULL, enabled INTEGER NOT NULL DEFAULT 1, created_at TEXT NOT NULL DEFAULT (datetime('now')) ); CREATE TABLE IF NOT EXISTS traffic_counters ( node_id INTEGER PRIMARY KEY, upload_total INTEGER NOT NULL DEFAULT 0, download_total INTEGER NOT NULL DEFAULT 0, snapshot_upload INTEGER NOT NULL DEFAULT 0, snapshot_download INTEGER NOT NULL DEFAULT 0, updated_at TEXT, FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE ); """ ) username = env.get("PANEL_USERNAME", "admin") password = env.get("PANEL_PASSWORD") if not password: raise SystemExit("请在 .env 中设置 PANEL_PASSWORD") conn.execute("DELETE FROM admin") conn.execute( "INSERT INTO admin (username, password_hash) VALUES (?, ?)", (username, _hash_password(password)), ) count = conn.execute("SELECT COUNT(*) AS c FROM nodes").fetchone()["c"] if count == 0: uuid, hy2 = _generate_credentials() conn.execute( "INSERT INTO nodes (name, uuid, hy2_password) VALUES (?, ?, ?)", ("默认节点", uuid, hy2), ) for row in conn.execute("SELECT id FROM nodes").fetchall(): conn.execute( "INSERT OR IGNORE INTO traffic_counters (node_id) VALUES (?)", (row["id"],), ) conn.commit() conn.close() def verify_admin(username: str, password: str) -> bool: conn = connect() row = conn.execute( "SELECT password_hash FROM admin WHERE username = ?", (username,) ).fetchone() conn.close() if row is None: return False return _verify_password(row["password_hash"], password) def list_nodes() -> list[dict]: conn = connect() rows = conn.execute( "SELECT id, name, uuid, hy2_password, enabled, created_at " "FROM nodes ORDER BY id DESC" ).fetchall() conn.close() return [dict(row) for row in rows] def add_node(name: str) -> dict: name = name.strip() or "未命名节点" uuid, hy2 = _generate_credentials() conn = connect() cur = conn.execute( "INSERT INTO nodes (name, uuid, hy2_password) VALUES (?, ?, ?)", (name, uuid, hy2), ) node_id = cur.lastrowid row = conn.execute("SELECT * FROM nodes WHERE id = ?", (node_id,)).fetchone() conn.execute( "INSERT OR IGNORE INTO traffic_counters (node_id) VALUES (?)", (node_id,), ) conn.commit() conn.close() return dict(row) def delete_node(node_id: int) -> bool: conn = connect() cur = conn.execute("DELETE FROM nodes WHERE id = ?", (node_id,)) conn.commit() deleted = cur.rowcount > 0 conn.close() return deleted def node_count() -> int: conn = connect() count = conn.execute("SELECT COUNT(*) AS c FROM nodes").fetchone()["c"] conn.close() return count def _generate_credentials() -> tuple[str, str]: uuid = subprocess.check_output(["sing-box", "generate", "uuid"], text=True).strip() hy2 = secrets.token_urlsafe(18)[:24] return uuid, hy2