Files
jiedian/panel/stats.py
T
dekun 6d82c7eb07 fix: improve online detection when Clash API omits user metadata
Match connections by multiple auth keys, fall back to global traffic for
single-node setups, and document offline-status troubleshooting.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-16 10:22:13 +08:00

322 lines
10 KiB
Python

"""从 sing-box Clash API 采集节点连接与流量(官方预编译包不含 v2ray_api)。"""
from __future__ import annotations
import json
import os
import sqlite3
import time
import urllib.error
import urllib.request
from pathlib import Path
from db import connect, list_nodes
ROOT = Path(os.environ.get("JIEDIAN_ROOT", Path(__file__).resolve().parents[1]))
ENV_FILE = ROOT / ".env"
CLASH_ADDR = "127.0.0.1:9090"
_speed_cache: dict[int, tuple[float, int, int]] = {}
_conn_cache: dict[str, dict[str, int | str]] = {}
def _load_env() -> dict[str, str]:
env: dict[str, str] = {}
if not ENV_FILE.exists():
return env
for line in ENV_FILE.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
key, _, value = line.partition("=")
env[key.strip()] = value.strip()
return env
def format_bytes(num: int | float) -> str:
n = float(num)
for unit in ("B", "KB", "MB", "GB", "TB"):
if n < 1024 or unit == "TB":
if unit == "B":
return f"{int(n)} B"
return f"{n:.1f} {unit}"
n /= 1024
return f"{n:.1f} PB"
def format_speed(num: float) -> str:
return f"{format_bytes(num)}/s"
def fetch_clash_connections() -> tuple[list[dict], bool]:
env = _load_env()
secret = env.get("CLASH_API_SECRET", "")
url = f"http://{CLASH_ADDR}/connections"
req = urllib.request.Request(url)
if secret:
req.add_header("Authorization", f"Bearer {secret}")
try:
with urllib.request.urlopen(req, timeout=3) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError):
return [], False
return payload.get("connections") or [], True
def fetch_clash_traffic() -> tuple[int, int, bool]:
"""返回 (upload B/s, download B/s, ok)。"""
env = _load_env()
secret = env.get("CLASH_API_SECRET", "")
url = f"http://{CLASH_ADDR}/traffic"
req = urllib.request.Request(url)
if secret:
req.add_header("Authorization", f"Bearer {secret}")
try:
with urllib.request.urlopen(req, timeout=3) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError):
return 0, 0, False
return int(payload.get("up") or 0), int(payload.get("down") or 0), True
def _connection_user(conn: dict) -> str:
meta = conn.get("metadata") or {}
for key in ("user", "uid", "auth_user", "auth", "username"):
val = meta.get(key)
if val:
return str(val)
for key in ("user", "uid"):
val = conn.get(key)
if val:
return str(val)
return ""
def _connection_meta(conn: dict) -> str:
meta = conn.get("metadata") or {}
parts = [
str(meta.get("type") or ""),
str(meta.get("network") or ""),
str(meta.get("inbound") or meta.get("inboundTag") or ""),
str(meta.get("inboundType") or ""),
]
return " ".join(parts).lower()
def _node_auth_keys(node: dict) -> set[str]:
keys = {node["uuid"]}
if node.get("hy2_password"):
keys.add(node["hy2_password"])
return keys
def _match_connection(conn: dict, node: dict) -> bool:
user = _connection_user(conn)
if user and user in _node_auth_keys(node):
return True
meta = _connection_meta(conn)
if "hysteria" in meta and node.get("hy2_password"):
# 旧配置未设置 hy2 name 时,Clash API 可能不带 user
if user == node["hy2_password"]:
return True
return False
def _connection_id(conn: dict) -> str:
return str(conn.get("id") or "")
def _ensure_traffic_schema(conn: sqlite3.Connection) -> None:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS traffic_counters (
node_id INTEGER PRIMARY KEY,
upload_total INTEGER NOT NULL DEFAULT 0,
download_total INTEGER NOT NULL DEFAULT 0,
snapshot_upload INTEGER NOT NULL DEFAULT 0,
snapshot_download INTEGER NOT NULL DEFAULT 0,
updated_at TEXT,
FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
);
"""
)
for row in conn.execute("SELECT id FROM nodes").fetchall():
conn.execute(
"INSERT OR IGNORE INTO traffic_counters (node_id) VALUES (?)",
(row["id"],),
)
def _add_closed_traffic(node_id: int, upload: int, download: int) -> None:
if upload <= 0 and download <= 0:
return
conn = connect()
_ensure_traffic_schema(conn)
conn.execute(
"""
UPDATE traffic_counters
SET upload_total = upload_total + ?,
download_total = download_total + ?,
updated_at = datetime('now')
WHERE node_id = ?
""",
(upload, download, node_id),
)
conn.commit()
conn.close()
def _get_stored_totals(node_id: int) -> tuple[int, int]:
conn = connect()
_ensure_traffic_schema(conn)
row = conn.execute(
"SELECT upload_total, download_total FROM traffic_counters WHERE node_id = ?",
(node_id,),
).fetchone()
conn.close()
if row is None:
return 0, 0
return int(row["upload_total"]), int(row["download_total"])
def _sync_connections(
connections: list[dict],
nodes: list[dict],
uuid_to_node: dict[str, int],
) -> dict[str, tuple[int, int]]:
"""同步连接缓存,断开连接时写入累计流量,返回各用户当前活跃会话流量。"""
seen: set[str] = set()
active: dict[str, tuple[int, int]] = {}
single_node = len(nodes) == 1
only_uuid = nodes[0]["uuid"] if single_node else ""
for conn in connections:
matched_uuid = ""
for node in nodes:
if _match_connection(conn, node):
matched_uuid = node["uuid"]
break
if not matched_uuid and single_node and connections:
matched_uuid = only_uuid
if matched_uuid not in uuid_to_node:
continue
cid = _connection_id(conn)
if not cid:
continue
upload = int(conn.get("upload") or 0)
download = int(conn.get("download") or 0)
seen.add(cid)
prev = _conn_cache.get(cid)
if prev and prev["uuid"] == matched_uuid:
prev_up = int(prev["upload"])
prev_down = int(prev["download"])
if upload < prev_up or download < prev_down:
_add_closed_traffic(uuid_to_node[matched_uuid], prev_up, prev_down)
_conn_cache[cid] = {
"uuid": matched_uuid,
"upload": upload,
"download": download,
}
cur_up, cur_down = active.get(matched_uuid, (0, 0))
active[matched_uuid] = (cur_up + upload, cur_down + download)
for cid in list(_conn_cache.keys()):
if cid in seen:
continue
info = _conn_cache.pop(cid)
user = str(info["uuid"])
node_id = uuid_to_node.get(user)
if node_id:
_add_closed_traffic(node_id, int(info["upload"]), int(info["download"]))
return active
def _calc_speed(node_id: int, up: int, down: int) -> tuple[float, float]:
now = time.time()
prev = _speed_cache.get(node_id)
_speed_cache[node_id] = (now, up, down)
if not prev:
return 0.0, 0.0
t0, u0, d0 = prev
dt = now - t0
if dt <= 0:
return 0.0, 0.0
return max(0.0, (up - u0) / dt), max(0.0, (down - d0) / dt)
def collect_node_stats() -> dict:
nodes = list_nodes()
uuid_to_node = {node["uuid"]: int(node["id"]) for node in nodes}
connections, clash_ok = fetch_clash_connections()
traffic_up, traffic_down, traffic_ok = fetch_clash_traffic()
active_by_uuid = _sync_connections(connections, nodes, uuid_to_node)
singbox_ok = clash_ok or traffic_ok
single_node = len(nodes) == 1
has_connections = len(connections) > 0
global_active = (traffic_up + traffic_down) > 512
result_nodes: dict[str, dict] = {}
summary_online = 0
summary_up_speed = 0.0
summary_down_speed = 0.0
for node in nodes:
uid = node["uuid"]
node_id = int(node["id"])
stored_up, stored_down = _get_stored_totals(node_id)
session_up, session_down = active_by_uuid.get(uid, (0, 0))
display_up = stored_up + session_up
display_down = stored_down + session_down
up_speed, down_speed = _calc_speed(node_id, display_up, display_down)
matched = [c for c in connections if _match_connection(c, node)]
if not matched and single_node and has_connections:
matched = connections
if not matched and single_node and global_active:
up_speed = float(traffic_up)
down_speed = float(traffic_down)
online = (
len(matched) > 0
or (up_speed + down_speed) > 512
or (single_node and global_active)
)
if online:
summary_online += 1
summary_up_speed += up_speed
summary_down_speed += down_speed
result_nodes[str(node_id)] = {
"online": online,
"connections": len(matched),
"upload_speed": round(up_speed),
"download_speed": round(down_speed),
"upload_total": display_up,
"download_total": display_down,
"upload_speed_human": format_speed(up_speed),
"download_speed_human": format_speed(down_speed),
"upload_total_human": format_bytes(display_up),
"download_total_human": format_bytes(display_down),
}
return {
"ok": True,
"singbox": clash_ok,
"nodes": result_nodes,
"summary": {
"online": summary_online,
"total_nodes": len(nodes),
"upload_speed": round(summary_up_speed),
"download_speed": round(summary_down_speed),
"upload_speed_human": format_speed(summary_up_speed),
"download_speed_human": format_speed(summary_down_speed),
},
}