Files

160 lines
4.7 KiB
Python

"""SQLite 数据库:管理员账号与节点。"""
from __future__ import annotations
import hashlib
import hmac
import os
import secrets
import sqlite3
import uuid
from pathlib import Path
# Project root: the JIEDIAN_ROOT environment variable when it is set,
# otherwise the parent of the directory that contains this file.
ROOT = Path(os.environ.get("JIEDIAN_ROOT", Path(__file__).resolve().parents[1]))
# Path of the SQLite database file, kept under <ROOT>/data.
DB_FILE = ROOT / "data" / "nodes.db"
# PBKDF2 iteration count applied when hashing a password.
_PBKDF2_ITERATIONS = 600000
def _hash_password(password: str) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 hash string for *password*.

    The result has the form ``pbkdf2:sha256:<iterations>$<salt>$<hex digest>``,
    where the salt is a fresh random 32-character hex string and the
    iteration count is ``_PBKDF2_ITERATIONS``.
    """
    salt = secrets.token_hex(16)
    derived_key = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode(),
        salt.encode(),
        _PBKDF2_ITERATIONS,
    )
    scheme = f"pbkdf2:sha256:{_PBKDF2_ITERATIONS}"
    return "$".join((scheme, salt, derived_key.hex()))
def _verify_password(stored: str, password: str) -> bool:
    """Check *password* against a stored PBKDF2 hash string.

    *stored* is expected to look like
    ``pbkdf2:sha256:<iterations>$<salt>$<hex digest>``. Anything that does
    not parse as such (empty value, missing fields, other scheme, iteration
    count that is not a positive integer) is treated as a failed
    verification rather than an error, so a corrupted row can never crash
    the login path.

    Returns:
        True only when the digest derived from *password* equals the stored
        digest; False otherwise.
    """
    if not stored or stored.count("$") < 2:
        return False
    method, salt, expected = stored.split("$", 2)
    if not method.startswith("pbkdf2:sha256:"):
        return False
    try:
        iterations = int(method.rsplit(":", 1)[1])
        digest = hashlib.pbkdf2_hmac(
            "sha256", password.encode(), salt.encode(), iterations
        )
    except (ValueError, OverflowError):
        # Non-numeric, zero/negative or absurdly large iteration count.
        return False
    # Compare as bytes: compare_digest() rejects non-ASCII str operands with
    # TypeError, which a damaged stored value could otherwise trigger.
    return hmac.compare_digest(
        digest.hex().encode("ascii"),
        expected.encode("utf-8", "replace"),
    )
def connect() -> sqlite3.Connection:
    """Open a new connection to the database at ``DB_FILE``.

    The parent directory of the database file is created when missing. Rows
    returned by the connection are ``sqlite3.Row`` objects, and foreign-key
    enforcement is switched on for this connection. Closing the connection
    is the caller's responsibility.
    """
    data_dir = DB_FILE.parent
    data_dir.mkdir(parents=True, exist_ok=True)

    connection = sqlite3.connect(DB_FILE)
    connection.row_factory = sqlite3.Row
    connection.execute("PRAGMA foreign_keys = ON")
    return connection
def init_db(env: dict[str, str]) -> None:
    """Create the schema and seed the admin account and a default node.

    Steps, in order:

    1. Create the ``admin``, ``nodes`` and ``traffic_counters`` tables when
       they do not exist yet.
    2. Replace the contents of ``admin`` with a single account built from
       ``PANEL_USERNAME`` (default ``"admin"``) and ``PANEL_PASSWORD``.
    3. Insert one node with freshly generated credentials when ``nodes`` is
       empty.
    4. Make sure every node has a ``traffic_counters`` row.

    Args:
        env: Mapping of configuration values; ``PANEL_USERNAME`` and
            ``PANEL_PASSWORD`` are read from it.

    Raises:
        SystemExit: If ``PANEL_PASSWORD`` is missing or empty. The tables
            have already been created at that point, but no rows are
            written.
    """
    conn = connect()
    try:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS admin (
                id INTEGER PRIMARY KEY,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL
            );
            CREATE TABLE IF NOT EXISTS nodes (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                uuid TEXT NOT NULL UNIQUE,
                hy2_password TEXT NOT NULL,
                enabled INTEGER NOT NULL DEFAULT 1,
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS traffic_counters (
                node_id INTEGER PRIMARY KEY,
                upload_total INTEGER NOT NULL DEFAULT 0,
                download_total INTEGER NOT NULL DEFAULT 0,
                snapshot_upload INTEGER NOT NULL DEFAULT 0,
                snapshot_download INTEGER NOT NULL DEFAULT 0,
                updated_at TEXT,
                FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
            );
            """
        )

        username = env.get("PANEL_USERNAME", "admin")
        password = env.get("PANEL_PASSWORD")
        if not password:
            raise SystemExit("请在 .env 中设置 PANEL_PASSWORD")

        # The admin table is rebuilt on every call so it always mirrors the
        # credentials currently present in env.
        conn.execute("DELETE FROM admin")
        conn.execute(
            "INSERT INTO admin (username, password_hash) VALUES (?, ?)",
            (username, _hash_password(password)),
        )

        count = conn.execute("SELECT COUNT(*) AS c FROM nodes").fetchone()["c"]
        if count == 0:
            # Local names deliberately differ from the imported uuid module.
            node_uuid, hy2_password = _generate_credentials()
            conn.execute(
                "INSERT INTO nodes (name, uuid, hy2_password) VALUES (?, ?, ?)",
                ("默认节点", node_uuid, hy2_password),
            )

        # Backfill a counter row for every node in one statement; existing
        # rows are left alone by OR IGNORE.
        conn.execute(
            "INSERT OR IGNORE INTO traffic_counters (node_id) "
            "SELECT id FROM nodes"
        )
        conn.commit()
    finally:
        # Close on every path, including SystemExit and SQL errors;
        # uncommitted changes are discarded by close().
        conn.close()
def verify_admin(username: str, password: str) -> bool:
    """Return True when *username* exists and *password* matches its hash.

    Looks up the stored hash for *username* in the ``admin`` table and checks
    *password* against it. An unknown username yields False.
    """
    conn = connect()
    try:
        row = conn.execute(
            "SELECT password_hash FROM admin WHERE username = ?", (username,)
        ).fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    if row is None:
        return False
    return _verify_password(row["password_hash"], password)
def list_nodes() -> list[dict]:
    """Return all nodes as plain dicts, highest ``id`` first.

    Each dict carries the keys ``id``, ``name``, ``uuid``, ``hy2_password``,
    ``enabled`` and ``created_at``.
    """
    conn = connect()
    try:
        rows = conn.execute(
            "SELECT id, name, uuid, hy2_password, enabled, created_at "
            "FROM nodes ORDER BY id DESC"
        ).fetchall()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return [dict(row) for row in rows]
def add_node(name: str) -> dict:
    """Insert a node with freshly generated credentials and return it.

    Args:
        name: Display name. Surrounding whitespace is stripped; a blank (or
            None) name falls back to the placeholder "未命名节点".

    Returns:
        The newly inserted ``nodes`` row as a dict with all of its columns.

    A matching ``traffic_counters`` row is created in the same transaction.
    """
    name = (name or "").strip() or "未命名节点"
    # Local names deliberately differ from the imported uuid module.
    node_uuid, hy2_password = _generate_credentials()
    conn = connect()
    try:
        cur = conn.execute(
            "INSERT INTO nodes (name, uuid, hy2_password) VALUES (?, ?, ?)",
            (name, node_uuid, hy2_password),
        )
        node_id = cur.lastrowid
        row = conn.execute(
            "SELECT * FROM nodes WHERE id = ?", (node_id,)
        ).fetchone()
        conn.execute(
            "INSERT OR IGNORE INTO traffic_counters (node_id) VALUES (?)",
            (node_id,),
        )
        conn.commit()
    finally:
        # Close on every path; an uncommitted insert is discarded by close().
        conn.close()
    return dict(row)
def delete_node(node_id: int) -> bool:
    """Delete the node with the given id.

    Returns:
        True when a row was removed, False when no node had that id.
    """
    conn = connect()
    try:
        cur = conn.execute("DELETE FROM nodes WHERE id = ?", (node_id,))
        conn.commit()
        return cur.rowcount > 0
    finally:
        # Release the connection even when the statement or commit raises.
        conn.close()
def node_count() -> int:
    """Return the number of rows in the ``nodes`` table."""
    conn = connect()
    try:
        return conn.execute("SELECT COUNT(*) AS c FROM nodes").fetchone()["c"]
    finally:
        # Release the connection even when the query raises.
        conn.close()
def _generate_credentials() -> tuple[str, str]:
    """Return a fresh ``(uuid, hy2_password)`` pair for a node.

    The first element is a random version-4 UUID in string form; the second
    is a URL-safe random token cut to at most 24 characters.
    """
    node_uuid = str(uuid.uuid4())
    token = secrets.token_urlsafe(18)
    return node_uuid, token[:24]