"""从 sing-box Clash API 采集节点连接与流量(官方预编译包不含 v2ray_api)。""" from __future__ import annotations import json import os import re import sqlite3 import subprocess import time import urllib.error import urllib.request from pathlib import Path from db import connect, list_nodes from nodes_util import hy2_inbound_tag, ordered_nodes ROOT = Path(os.environ.get("JIEDIAN_ROOT", Path(__file__).resolve().parents[1])) ENV_FILE = ROOT / ".env" CLASH_ADDR = "127.0.0.1:9090" _VLESS_INBOUND = "vless-reality-in" _LOG_USER_RE = re.compile( r"\[([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\]\s+inbound connection" ) _LOG_INDEX_RE = re.compile(r"\[(\d+)\] inbound connection") _speed_cache: dict[int, tuple[float, int, int]] = {} _conn_cache: dict[str, dict[str, int | str]] = {} _global_bytes_cache: tuple[float, int, int] | None = None def _load_env() -> dict[str, str]: env: dict[str, str] = {} if not ENV_FILE.exists(): return env for line in ENV_FILE.read_text(encoding="utf-8").splitlines(): line = line.strip() if not line or line.startswith("#") or "=" not in line: continue key, _, value = line.partition("=") env[key.strip()] = value.strip() return env def format_bytes(num: int | float) -> str: n = float(num) for unit in ("B", "KB", "MB", "GB", "TB"): if n < 1024 or unit == "TB": if unit == "B": return f"{int(n)} B" return f"{n:.1f} {unit}" n /= 1024 return f"{n:.1f} PB" def format_speed(num: float) -> str: return f"{format_bytes(num)}/s" def fetch_clash_connections() -> tuple[list[dict], bool]: env = _load_env() secret = env.get("CLASH_API_SECRET", "") url = f"http://{CLASH_ADDR}/connections" req = urllib.request.Request(url) if secret: req.add_header("Authorization", f"Bearer {secret}") try: with urllib.request.urlopen(req, timeout=3) as resp: payload = json.loads(resp.read().decode("utf-8")) except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError): return [], False return payload.get("connections") or [], True def fetch_recent_log_uuids(nodes: list[dict]) -> set[str]: """sing-box Clash API 不导出 user 字段,VLESS 多用户需从近期日志补全在线 UUID。""" try: proc = subprocess.run( [ "journalctl", "-u", "sing-box", "--since", "3 min ago", "--no-pager", "-o", "cat", ], capture_output=True, text=True, timeout=2, ) except (OSError, subprocess.TimeoutExpired): return set() if proc.returncode != 0: return set() known = {node["uuid"] for node in nodes} index_to_uuid = {i: node["uuid"] for i, node in enumerate(ordered_nodes(nodes))} active: set[str] = set() for match in _LOG_USER_RE.finditer(proc.stdout): uid = match.group(1) if uid in known: active.add(uid) for match in _LOG_INDEX_RE.finditer(proc.stdout): uid = index_to_uuid.get(int(match.group(1))) if uid: active.add(uid) return active def _global_conn_speed(connections: list[dict]) -> tuple[float, float]: """从 /connections 汇总字节增量估算全局速率(/traffic 为 WebSocket 流,不能同步 HTTP 读)。""" total_up = sum(int(c.get("upload") or 0) for c in connections) total_down = sum(int(c.get("download") or 0) for c in connections) now = time.time() global _global_bytes_cache prev = _global_bytes_cache _global_bytes_cache = (now, total_up, total_down) if not prev: return 0.0, 0.0 t0, u0, d0 = prev dt = now - t0 if dt <= 0: return 0.0, 0.0 return max(0.0, (total_up - u0) / dt), max(0.0, (total_down - d0) / dt) def _connection_inbound_tag(conn: dict) -> str: meta = conn.get("metadata") or {} inbound_type = str(meta.get("type") or "") if "/" in inbound_type: return inbound_type.split("/", 1)[1] return inbound_type def _connection_user(conn: dict) -> str: meta = conn.get("metadata") or {} for key in ("user", "uid", "auth_user", "auth", "username"): val = meta.get(key) if val: return str(val) for key in ("user", "uid"): val = conn.get(key) if val: return str(val) return "" def _node_auth_keys(node: dict) -> set[str]: keys = {node["uuid"]} if node.get("hy2_password"): keys.add(node["hy2_password"]) return keys def _match_connection(conn: dict, node: dict, *, single_node: bool = False) -> bool: user = _connection_user(conn) if user and user in _node_auth_keys(node): return True tag = _connection_inbound_tag(conn) node_id = int(node["id"]) expected_hy2 = hy2_inbound_tag(node_id) if tag == expected_hy2: return True if tag == "hysteria2-in" and single_node: return True return False def _match_vless_connection(conn: dict, node: dict, log_active: set[str]) -> bool: tag = _connection_inbound_tag(conn) if tag != _VLESS_INBOUND: return False user = _connection_user(conn) if user == node["uuid"]: return True # 共享 VLESS inbound 无法从 Clash API 区分用户;仅唯一活跃用户时归因 return node["uuid"] in log_active and len(log_active) == 1 def _connection_id(conn: dict) -> str: return str(conn.get("id") or "") def _ensure_traffic_schema(conn: sqlite3.Connection) -> None: conn.executescript( """ CREATE TABLE IF NOT EXISTS traffic_counters ( node_id INTEGER PRIMARY KEY, upload_total INTEGER NOT NULL DEFAULT 0, download_total INTEGER NOT NULL DEFAULT 0, snapshot_upload INTEGER NOT NULL DEFAULT 0, snapshot_download INTEGER NOT NULL DEFAULT 0, updated_at TEXT, FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE ); """ ) for row in conn.execute("SELECT id FROM nodes").fetchall(): conn.execute( "INSERT OR IGNORE INTO traffic_counters (node_id) VALUES (?)", (row["id"],), ) def _add_closed_traffic(node_id: int, upload: int, download: int) -> None: if upload <= 0 and download <= 0: return conn = connect() _ensure_traffic_schema(conn) conn.execute( """ UPDATE traffic_counters SET upload_total = upload_total + ?, download_total = download_total + ?, updated_at = datetime('now') WHERE node_id = ? """, (upload, download, node_id), ) conn.commit() conn.close() def _get_stored_totals(node_id: int) -> tuple[int, int]: conn = connect() _ensure_traffic_schema(conn) row = conn.execute( "SELECT upload_total, download_total FROM traffic_counters WHERE node_id = ?", (node_id,), ).fetchone() conn.close() if row is None: return 0, 0 return int(row["upload_total"]), int(row["download_total"]) def _sync_connections( connections: list[dict], nodes: list[dict], uuid_to_node: dict[str, int], log_active: set[str], ) -> dict[str, tuple[int, int]]: """同步连接缓存,断开连接时写入累计流量,返回各用户当前活跃会话流量。""" seen: set[str] = set() active: dict[str, tuple[int, int]] = {} single_node = len(nodes) == 1 only_uuid = nodes[0]["uuid"] if single_node else "" for conn in connections: matched_uuid = "" for node in nodes: if _match_connection(conn, node, single_node=single_node) or _match_vless_connection( conn, node, log_active ): matched_uuid = node["uuid"] break if not matched_uuid and single_node and connections: matched_uuid = only_uuid if matched_uuid not in uuid_to_node: continue cid = _connection_id(conn) if not cid: continue upload = int(conn.get("upload") or 0) download = int(conn.get("download") or 0) seen.add(cid) prev = _conn_cache.get(cid) if prev and prev["uuid"] == matched_uuid: prev_up = int(prev["upload"]) prev_down = int(prev["download"]) if upload < prev_up or download < prev_down: _add_closed_traffic(uuid_to_node[matched_uuid], prev_up, prev_down) _conn_cache[cid] = { "uuid": matched_uuid, "upload": upload, "download": download, } cur_up, cur_down = active.get(matched_uuid, (0, 0)) active[matched_uuid] = (cur_up + upload, cur_down + download) for cid in list(_conn_cache.keys()): if cid in seen: continue info = _conn_cache.pop(cid) user = str(info["uuid"]) node_id = uuid_to_node.get(user) if node_id: _add_closed_traffic(node_id, int(info["upload"]), int(info["download"])) return active def _calc_speed(node_id: int, up: int, down: int) -> tuple[float, float]: now = time.time() prev = _speed_cache.get(node_id) _speed_cache[node_id] = (now, up, down) if not prev: return 0.0, 0.0 t0, u0, d0 = prev dt = now - t0 if dt <= 0: return 0.0, 0.0 return max(0.0, (up - u0) / dt), max(0.0, (down - d0) / dt) def _connections_for_node( connections: list[dict], node: dict, nodes: list[dict], log_active: set[str], ) -> list[dict | None]: single_node = len(nodes) == 1 matched = [ c for c in connections if _match_connection(c, node, single_node=single_node) or _match_vless_connection(c, node, log_active) ] if matched: return matched if single_node and connections: return connections if node["uuid"] in log_active: vless_hits = [c for c in connections if _connection_inbound_tag(c) == _VLESS_INBOUND] if vless_hits: return vless_hits return [None] return [] def collect_node_stats() -> dict: nodes = list_nodes() uuid_to_node = {node["uuid"]: int(node["id"]) for node in nodes} connections, clash_ok = fetch_clash_connections() log_active = fetch_recent_log_uuids(nodes) if len(nodes) > 1 else set() active_by_uuid = _sync_connections(connections, nodes, uuid_to_node, log_active) single_node = len(nodes) == 1 has_connections = len(connections) > 0 global_up_speed, global_down_speed = _global_conn_speed(connections) if clash_ok else (0.0, 0.0) global_active = (global_up_speed + global_down_speed) > 512 result_nodes: dict[str, dict] = {} summary_online = 0 summary_up_speed = 0.0 summary_down_speed = 0.0 for node in nodes: uid = node["uuid"] node_id = int(node["id"]) stored_up, stored_down = _get_stored_totals(node_id) session_up, session_down = active_by_uuid.get(uid, (0, 0)) display_up = stored_up + session_up display_down = stored_down + session_down up_speed, down_speed = _calc_speed(node_id, display_up, display_down) matched = _connections_for_node(connections, node, nodes, log_active) if not matched and single_node and global_active: up_speed = global_up_speed down_speed = global_down_speed online = ( len(matched) > 0 or (session_up + session_down) > 0 or uid in log_active or (up_speed + down_speed) > 512 or (single_node and (global_active or has_connections)) ) if online: summary_online += 1 summary_up_speed += up_speed summary_down_speed += down_speed result_nodes[str(node_id)] = { "online": online, "connections": len(matched), "upload_speed": round(up_speed), "download_speed": round(down_speed), "upload_total": display_up, "download_total": display_down, "upload_speed_human": format_speed(up_speed), "download_speed_human": format_speed(down_speed), "upload_total_human": format_bytes(display_up), "download_total_human": format_bytes(display_down), } return { "ok": True, "singbox": clash_ok, "nodes": result_nodes, "summary": { "online": summary_online, "total_nodes": len(nodes), "upload_speed": round(summary_up_speed), "download_speed": round(summary_down_speed), "upload_speed_human": format_speed(summary_up_speed), "download_speed_human": format_speed(summary_down_speed), }, }