c9895133cb
sing-box Hy2 stays on 8443+; port 443 VLESS uses Xray which pairs reliably with v2rayN/Xray-core clients. Co-authored-by: Cursor <cursoragent@cursor.com>
435 lines
14 KiB
Python
435 lines
14 KiB
Python
"""从 sing-box Clash API 采集节点连接与流量(官方预编译包不含 v2ray_api)。"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import re
|
|
import sqlite3
|
|
import subprocess
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
from db import connect, list_nodes
|
|
from nodes_util import hy2_inbound_tag, ordered_nodes
|
|
|
|
ROOT = Path(os.environ.get("JIEDIAN_ROOT", Path(__file__).resolve().parents[1]))
|
|
ENV_FILE = ROOT / ".env"
|
|
|
|
CLASH_ADDR = "127.0.0.1:9090"
|
|
_VLESS_INBOUND = "vless-reality-in"
|
|
_XRAY_ACCESS_LOG = Path("/var/log/xray/access.log")
|
|
_LOG_USER_RE = re.compile(
|
|
r"\[([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})\]\s+inbound connection"
|
|
)
|
|
_LOG_INDEX_RE = re.compile(r"\[(\d+)\] inbound connection")
|
|
_LOG_XRAY_UUID_RE = re.compile(
|
|
r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
|
|
)
|
|
|
|
_speed_cache: dict[int, tuple[float, int, int]] = {}
|
|
_conn_cache: dict[str, dict[str, int | str]] = {}
|
|
_global_bytes_cache: tuple[float, int, int] | None = None
|
|
|
|
|
|
def _load_env() -> dict[str, str]:
|
|
env: dict[str, str] = {}
|
|
if not ENV_FILE.exists():
|
|
return env
|
|
for line in ENV_FILE.read_text(encoding="utf-8").splitlines():
|
|
line = line.strip()
|
|
if not line or line.startswith("#") or "=" not in line:
|
|
continue
|
|
key, _, value = line.partition("=")
|
|
env[key.strip()] = value.strip()
|
|
return env
|
|
|
|
|
|
def format_bytes(num: int | float) -> str:
|
|
n = float(num)
|
|
for unit in ("B", "KB", "MB", "GB", "TB"):
|
|
if n < 1024 or unit == "TB":
|
|
if unit == "B":
|
|
return f"{int(n)} B"
|
|
return f"{n:.1f} {unit}"
|
|
n /= 1024
|
|
return f"{n:.1f} PB"
|
|
|
|
|
|
def format_speed(num: float) -> str:
|
|
return f"{format_bytes(num)}/s"
|
|
|
|
|
|
def fetch_clash_connections() -> tuple[list[dict], bool]:
|
|
env = _load_env()
|
|
secret = env.get("CLASH_API_SECRET", "")
|
|
url = f"http://{CLASH_ADDR}/connections"
|
|
req = urllib.request.Request(url)
|
|
if secret:
|
|
req.add_header("Authorization", f"Bearer {secret}")
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=3) as resp:
|
|
payload = json.loads(resp.read().decode("utf-8"))
|
|
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError):
|
|
return [], False
|
|
return payload.get("connections") or [], True
|
|
|
|
|
|
def fetch_xray_access_uuids(nodes: list[dict]) -> set[str]:
|
|
"""VLESS Reality 由 Xray 承载,从 access.log 读取近期活跃 UUID。"""
|
|
if not _XRAY_ACCESS_LOG.exists():
|
|
return set()
|
|
try:
|
|
text = _XRAY_ACCESS_LOG.read_text(encoding="utf-8", errors="ignore")
|
|
except OSError:
|
|
return set()
|
|
known = {node["uuid"] for node in nodes}
|
|
active: set[str] = set()
|
|
for line in text.splitlines()[-400:]:
|
|
if "accepted" not in line:
|
|
continue
|
|
for match in _LOG_XRAY_UUID_RE.finditer(line):
|
|
uid = match.group(1)
|
|
if uid in known:
|
|
active.add(uid)
|
|
return active
|
|
|
|
|
|
def fetch_recent_log_uuids(nodes: list[dict]) -> set[str]:
|
|
"""sing-box Clash API 不导出 user 字段,VLESS 多用户需从近期日志补全在线 UUID。"""
|
|
try:
|
|
proc = subprocess.run(
|
|
[
|
|
"journalctl",
|
|
"-u",
|
|
"sing-box",
|
|
"--since",
|
|
"3 min ago",
|
|
"--no-pager",
|
|
"-o",
|
|
"cat",
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=2,
|
|
)
|
|
except (OSError, subprocess.TimeoutExpired):
|
|
return set()
|
|
if proc.returncode != 0:
|
|
return set()
|
|
|
|
known = {node["uuid"] for node in nodes}
|
|
index_to_uuid = {i: node["uuid"] for i, node in enumerate(ordered_nodes(nodes))}
|
|
active: set[str] = set()
|
|
for match in _LOG_USER_RE.finditer(proc.stdout):
|
|
uid = match.group(1)
|
|
if uid in known:
|
|
active.add(uid)
|
|
for match in _LOG_INDEX_RE.finditer(proc.stdout):
|
|
uid = index_to_uuid.get(int(match.group(1)))
|
|
if uid:
|
|
active.add(uid)
|
|
return active
|
|
|
|
|
|
def _global_conn_speed(connections: list[dict]) -> tuple[float, float]:
|
|
"""从 /connections 汇总字节增量估算全局速率(/traffic 为 WebSocket 流,不能同步 HTTP 读)。"""
|
|
total_up = sum(int(c.get("upload") or 0) for c in connections)
|
|
total_down = sum(int(c.get("download") or 0) for c in connections)
|
|
now = time.time()
|
|
global _global_bytes_cache
|
|
prev = _global_bytes_cache
|
|
_global_bytes_cache = (now, total_up, total_down)
|
|
if not prev:
|
|
return 0.0, 0.0
|
|
t0, u0, d0 = prev
|
|
dt = now - t0
|
|
if dt <= 0:
|
|
return 0.0, 0.0
|
|
return max(0.0, (total_up - u0) / dt), max(0.0, (total_down - d0) / dt)
|
|
|
|
|
|
def _connection_inbound_tag(conn: dict) -> str:
|
|
meta = conn.get("metadata") or {}
|
|
inbound_type = str(meta.get("type") or "")
|
|
if "/" in inbound_type:
|
|
return inbound_type.split("/", 1)[1]
|
|
return inbound_type
|
|
|
|
|
|
def _connection_user(conn: dict) -> str:
|
|
meta = conn.get("metadata") or {}
|
|
for key in ("user", "uid", "auth_user", "auth", "username"):
|
|
val = meta.get(key)
|
|
if val:
|
|
return str(val)
|
|
for key in ("user", "uid"):
|
|
val = conn.get(key)
|
|
if val:
|
|
return str(val)
|
|
return ""
|
|
|
|
|
|
def _node_auth_keys(node: dict) -> set[str]:
|
|
keys = {node["uuid"]}
|
|
if node.get("hy2_password"):
|
|
keys.add(node["hy2_password"])
|
|
return keys
|
|
|
|
|
|
def _match_connection(conn: dict, node: dict, *, single_node: bool = False) -> bool:
|
|
user = _connection_user(conn)
|
|
if user and user in _node_auth_keys(node):
|
|
return True
|
|
|
|
tag = _connection_inbound_tag(conn)
|
|
node_id = int(node["id"])
|
|
expected_hy2 = hy2_inbound_tag(node_id)
|
|
if tag == expected_hy2:
|
|
return True
|
|
if tag == "hysteria2-in" and single_node:
|
|
return True
|
|
return False
|
|
|
|
|
|
def _match_vless_connection(conn: dict, node: dict, log_active: set[str]) -> bool:
|
|
tag = _connection_inbound_tag(conn)
|
|
if tag != _VLESS_INBOUND:
|
|
return False
|
|
user = _connection_user(conn)
|
|
if user == node["uuid"]:
|
|
return True
|
|
# 共享 VLESS inbound 无法从 Clash API 区分用户;仅唯一活跃用户时归因
|
|
return node["uuid"] in log_active and len(log_active) == 1
|
|
|
|
|
|
def _connection_id(conn: dict) -> str:
|
|
return str(conn.get("id") or "")
|
|
|
|
|
|
def _ensure_traffic_schema(conn: sqlite3.Connection) -> None:
|
|
conn.executescript(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS traffic_counters (
|
|
node_id INTEGER PRIMARY KEY,
|
|
upload_total INTEGER NOT NULL DEFAULT 0,
|
|
download_total INTEGER NOT NULL DEFAULT 0,
|
|
snapshot_upload INTEGER NOT NULL DEFAULT 0,
|
|
snapshot_download INTEGER NOT NULL DEFAULT 0,
|
|
updated_at TEXT,
|
|
FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
|
|
);
|
|
"""
|
|
)
|
|
for row in conn.execute("SELECT id FROM nodes").fetchall():
|
|
conn.execute(
|
|
"INSERT OR IGNORE INTO traffic_counters (node_id) VALUES (?)",
|
|
(row["id"],),
|
|
)
|
|
|
|
|
|
def _add_closed_traffic(node_id: int, upload: int, download: int) -> None:
|
|
if upload <= 0 and download <= 0:
|
|
return
|
|
conn = connect()
|
|
_ensure_traffic_schema(conn)
|
|
conn.execute(
|
|
"""
|
|
UPDATE traffic_counters
|
|
SET upload_total = upload_total + ?,
|
|
download_total = download_total + ?,
|
|
updated_at = datetime('now')
|
|
WHERE node_id = ?
|
|
""",
|
|
(upload, download, node_id),
|
|
)
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
|
|
def _get_stored_totals(node_id: int) -> tuple[int, int]:
|
|
conn = connect()
|
|
_ensure_traffic_schema(conn)
|
|
row = conn.execute(
|
|
"SELECT upload_total, download_total FROM traffic_counters WHERE node_id = ?",
|
|
(node_id,),
|
|
).fetchone()
|
|
conn.close()
|
|
if row is None:
|
|
return 0, 0
|
|
return int(row["upload_total"]), int(row["download_total"])
|
|
|
|
|
|
def _sync_connections(
|
|
connections: list[dict],
|
|
nodes: list[dict],
|
|
uuid_to_node: dict[str, int],
|
|
log_active: set[str],
|
|
) -> dict[str, tuple[int, int]]:
|
|
"""同步连接缓存,断开连接时写入累计流量,返回各用户当前活跃会话流量。"""
|
|
seen: set[str] = set()
|
|
active: dict[str, tuple[int, int]] = {}
|
|
single_node = len(nodes) == 1
|
|
only_uuid = nodes[0]["uuid"] if single_node else ""
|
|
|
|
for conn in connections:
|
|
matched_uuid = ""
|
|
for node in nodes:
|
|
if _match_connection(conn, node, single_node=single_node) or _match_vless_connection(
|
|
conn, node, log_active
|
|
):
|
|
matched_uuid = node["uuid"]
|
|
break
|
|
if not matched_uuid and single_node and connections:
|
|
matched_uuid = only_uuid
|
|
|
|
if matched_uuid not in uuid_to_node:
|
|
continue
|
|
|
|
cid = _connection_id(conn)
|
|
if not cid:
|
|
continue
|
|
|
|
upload = int(conn.get("upload") or 0)
|
|
download = int(conn.get("download") or 0)
|
|
seen.add(cid)
|
|
|
|
prev = _conn_cache.get(cid)
|
|
if prev and prev["uuid"] == matched_uuid:
|
|
prev_up = int(prev["upload"])
|
|
prev_down = int(prev["download"])
|
|
if upload < prev_up or download < prev_down:
|
|
_add_closed_traffic(uuid_to_node[matched_uuid], prev_up, prev_down)
|
|
_conn_cache[cid] = {
|
|
"uuid": matched_uuid,
|
|
"upload": upload,
|
|
"download": download,
|
|
}
|
|
|
|
cur_up, cur_down = active.get(matched_uuid, (0, 0))
|
|
active[matched_uuid] = (cur_up + upload, cur_down + download)
|
|
|
|
for cid in list(_conn_cache.keys()):
|
|
if cid in seen:
|
|
continue
|
|
info = _conn_cache.pop(cid)
|
|
user = str(info["uuid"])
|
|
node_id = uuid_to_node.get(user)
|
|
if node_id:
|
|
_add_closed_traffic(node_id, int(info["upload"]), int(info["download"]))
|
|
|
|
return active
|
|
|
|
|
|
def _calc_speed(node_id: int, up: int, down: int) -> tuple[float, float]:
|
|
now = time.time()
|
|
prev = _speed_cache.get(node_id)
|
|
_speed_cache[node_id] = (now, up, down)
|
|
if not prev:
|
|
return 0.0, 0.0
|
|
t0, u0, d0 = prev
|
|
dt = now - t0
|
|
if dt <= 0:
|
|
return 0.0, 0.0
|
|
return max(0.0, (up - u0) / dt), max(0.0, (down - d0) / dt)
|
|
|
|
|
|
def _connections_for_node(
|
|
connections: list[dict],
|
|
node: dict,
|
|
nodes: list[dict],
|
|
log_active: set[str],
|
|
) -> list[dict | None]:
|
|
single_node = len(nodes) == 1
|
|
matched = [
|
|
c
|
|
for c in connections
|
|
if _match_connection(c, node, single_node=single_node)
|
|
or _match_vless_connection(c, node, log_active)
|
|
]
|
|
if matched:
|
|
return matched
|
|
|
|
if single_node and connections:
|
|
return connections
|
|
|
|
if node["uuid"] in log_active:
|
|
vless_hits = [c for c in connections if _connection_inbound_tag(c) == _VLESS_INBOUND]
|
|
if vless_hits:
|
|
return vless_hits
|
|
return [None]
|
|
return []
|
|
|
|
|
|
def collect_node_stats() -> dict:
|
|
nodes = list_nodes()
|
|
uuid_to_node = {node["uuid"]: int(node["id"]) for node in nodes}
|
|
connections, clash_ok = fetch_clash_connections()
|
|
log_active = fetch_recent_log_uuids(nodes) if len(nodes) > 1 else set()
|
|
log_active |= fetch_xray_access_uuids(nodes)
|
|
active_by_uuid = _sync_connections(connections, nodes, uuid_to_node, log_active)
|
|
single_node = len(nodes) == 1
|
|
has_connections = len(connections) > 0
|
|
global_up_speed, global_down_speed = _global_conn_speed(connections) if clash_ok else (0.0, 0.0)
|
|
global_active = (global_up_speed + global_down_speed) > 512
|
|
|
|
result_nodes: dict[str, dict] = {}
|
|
summary_online = 0
|
|
summary_up_speed = 0.0
|
|
summary_down_speed = 0.0
|
|
|
|
for node in nodes:
|
|
uid = node["uuid"]
|
|
node_id = int(node["id"])
|
|
stored_up, stored_down = _get_stored_totals(node_id)
|
|
session_up, session_down = active_by_uuid.get(uid, (0, 0))
|
|
display_up = stored_up + session_up
|
|
display_down = stored_down + session_down
|
|
up_speed, down_speed = _calc_speed(node_id, display_up, display_down)
|
|
|
|
matched = _connections_for_node(connections, node, nodes, log_active)
|
|
if not matched and single_node and global_active:
|
|
up_speed = global_up_speed
|
|
down_speed = global_down_speed
|
|
|
|
online = (
|
|
len(matched) > 0
|
|
or (session_up + session_down) > 0
|
|
or uid in log_active
|
|
or (up_speed + down_speed) > 512
|
|
or (single_node and (global_active or has_connections))
|
|
)
|
|
|
|
if online:
|
|
summary_online += 1
|
|
summary_up_speed += up_speed
|
|
summary_down_speed += down_speed
|
|
|
|
result_nodes[str(node_id)] = {
|
|
"online": online,
|
|
"connections": len(matched),
|
|
"upload_speed": round(up_speed),
|
|
"download_speed": round(down_speed),
|
|
"upload_total": display_up,
|
|
"download_total": display_down,
|
|
"upload_speed_human": format_speed(up_speed),
|
|
"download_speed_human": format_speed(down_speed),
|
|
"upload_total_human": format_bytes(display_up),
|
|
"download_total_human": format_bytes(display_down),
|
|
}
|
|
|
|
return {
|
|
"ok": True,
|
|
"singbox": clash_ok,
|
|
"nodes": result_nodes,
|
|
"summary": {
|
|
"online": summary_online,
|
|
"total_nodes": len(nodes),
|
|
"upload_speed": round(summary_up_speed),
|
|
"download_speed": round(summary_down_speed),
|
|
"upload_speed_human": format_speed(summary_up_speed),
|
|
"download_speed_human": format_speed(summary_down_speed),
|
|
},
|
|
}
|