33533d7ebc
Co-authored-by: Cursor <cursoragent@cursor.com>
326 lines
10 KiB
Python
326 lines
10 KiB
Python
"""从 sing-box Clash API 采集节点连接与流量(官方预编译包不含 v2ray_api)。"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
from db import connect, list_nodes
|
|
|
|
# Project root: the JIEDIAN_ROOT environment variable when set, otherwise the
# directory one level above the directory that contains this file.
ROOT = Path(os.environ.get("JIEDIAN_ROOT", Path(__file__).resolve().parents[1]))
# KEY=VALUE settings file parsed by _load_env().
ENV_FILE = ROOT / ".env"

# host:port used to build the Clash API URL in fetch_clash_connections().
CLASH_ADDR = "127.0.0.1:9090"

# node_id -> (timestamp, upload, download) recorded by the previous _calc_speed() call.
_speed_cache: dict[int, tuple[float, int, int]] = {}
# connection id -> {"uuid", "upload", "download"} as last seen by _sync_connections().
_conn_cache: dict[str, dict[str, int | str]] = {}
# (timestamp, upload, download) recorded by the previous _global_conn_speed() call.
_global_bytes_cache: tuple[float, int, int] | None = None
|
|
|
|
|
|
def _load_env() -> dict[str, str]:
    """Parse ``ENV_FILE`` into a dict of ``KEY=VALUE`` settings.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys and values are whitespace-stripped. One pair of matching
    surrounding quotes (``"..."`` or ``'...'``) is removed from a value, so
    ``SECRET="abc"`` yields ``abc`` instead of a value that still carries
    the quote characters.

    Returns:
        The parsed settings; an empty dict when the file does not exist.
    """
    # Read directly instead of exists()+read: the file may vanish in between.
    try:
        text = ENV_FILE.read_text(encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        return {}

    env: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        key, _, value = line.partition("=")
        value = value.strip()
        if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
            value = value[1:-1]
        env[key.strip()] = value
    return env
|
|
|
|
|
|
def format_bytes(num: int | float) -> str:
    """Format a byte count using 1024-based units from ``B`` up to ``TB``.

    Values below 1024 in magnitude are shown as whole bytes (``"512 B"``);
    larger values get one decimal place (``"1.5 KB"``). ``TB`` is the largest
    unit, so anything beyond it stays in ``TB`` (``"1024.0 TB"``). Negative
    values are scaled by magnitude, e.g. ``-2048`` becomes ``"-2.0 KB"``.

    Args:
        num: Number of bytes.

    Returns:
        The human-readable string.
    """
    n = float(num)
    if abs(n) < 1024:
        return f"{int(n)} B"
    for unit in ("KB", "MB", "GB"):
        n /= 1024
        if abs(n) < 1024:
            return f"{n:.1f} {unit}"
    # TB is the cap: no further scaling regardless of size.
    return f"{n / 1024:.1f} TB"
|
|
|
|
|
|
def format_speed(num: float) -> str:
    """Format a bytes-per-second rate: ``format_bytes(num)`` with a ``/s`` suffix."""
    human = format_bytes(num)
    return human + "/s"
|
|
|
|
|
|
def fetch_clash_connections() -> tuple[list[dict], bool]:
    """Fetch the connection list from the Clash API ``/connections`` endpoint.

    A ``Bearer`` authorization header is sent when ``CLASH_API_SECRET`` is
    present in the env file. The request uses a 3-second timeout.

    Returns:
        ``(connections, ok)``. ``ok`` is ``False`` (with an empty list) when
        the request fails or times out, or when the body is not valid UTF-8
        JSON or not a JSON object. Otherwise ``connections`` holds the dict
        entries of the payload's ``connections`` field, or an empty list when
        that field is missing, null or not a list.
    """
    secret = _load_env().get("CLASH_API_SECRET", "")
    req = urllib.request.Request(f"http://{CLASH_ADDR}/connections")
    if secret:
        req.add_header("Authorization", f"Bearer {secret}")

    try:
        with urllib.request.urlopen(req, timeout=3) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except (urllib.error.URLError, TimeoutError, OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return [], False

    if not isinstance(payload, dict):
        # Valid JSON of an unexpected shape: treat it like a failed poll.
        return [], False

    connections = payload.get("connections")
    if not isinstance(connections, list):
        return [], True
    # Callers use .get() on every entry, so drop anything that is not an object.
    return [entry for entry in connections if isinstance(entry, dict)], True
|
|
|
|
|
|
def _global_conn_speed(connections: list[dict]) -> tuple[float, float]:
    """Estimate overall upload/download rates from successive ``/connections`` snapshots.

    ``/traffic`` is a WebSocket stream and cannot be read with a synchronous
    HTTP request, so the rate is derived from the change in the summed
    per-connection byte counters since the previous call.

    Returns:
        ``(upload_rate, download_rate)`` per second, never negative;
        ``(0.0, 0.0)`` on the first call or when no time has elapsed.
    """
    global _global_bytes_cache

    up_values = [int(entry.get("upload") or 0) for entry in connections]
    down_values = [int(entry.get("download") or 0) for entry in connections]
    up_sum = sum(up_values)
    down_sum = sum(down_values)

    stamp = time.time()
    last = _global_bytes_cache
    # Record the new sample before any early return so the next call has a baseline.
    _global_bytes_cache = (stamp, up_sum, down_sum)
    if not last:
        return 0.0, 0.0

    last_stamp, last_up, last_down = last
    elapsed = stamp - last_stamp
    if elapsed <= 0:
        return 0.0, 0.0

    up_rate = (up_sum - last_up) / elapsed
    down_rate = (down_sum - last_down) / elapsed
    # Sums shrink when connections close; clamp instead of reporting a negative rate.
    return max(0.0, up_rate), max(0.0, down_rate)
|
|
|
|
|
|
def _connection_user(conn: dict) -> str:
    """Return the user identifier carried by a connection, or ``""`` if none.

    The ``metadata`` mapping is checked first (``user``, ``uid``,
    ``auth_user``, ``auth``, ``username``), then the connection itself
    (``user``, ``uid``). The first truthy value wins and is returned as a string.
    """
    meta = conn.get("metadata") or {}
    lookups = (
        (meta, ("user", "uid", "auth_user", "auth", "username")),
        (conn, ("user", "uid")),
    )
    for source, names in lookups:
        for name in names:
            found = source.get(name)
            if found:
                return str(found)
    return ""
|
|
|
|
|
|
def _connection_meta(conn: dict) -> str:
    """Build a lowercase, space-joined summary of a connection's inbound metadata.

    The four fields are, in order: ``type``, ``network``, ``inbound``
    (falling back to ``inboundTag``) and ``inboundType``. Missing fields
    contribute an empty string, so the separators are always present.
    """
    meta = conn.get("metadata") or {}
    inbound = meta.get("inbound") or meta.get("inboundTag")
    fields = (
        meta.get("type"),
        meta.get("network"),
        inbound,
        meta.get("inboundType"),
    )
    return " ".join(str(field or "") for field in fields).lower()
|
|
|
|
|
|
def _node_auth_keys(node: dict) -> set[str]:
    """Return the credentials that identify a node: its ``uuid`` plus ``hy2_password`` when set."""
    hy2_password = node.get("hy2_password")
    if hy2_password:
        return {node["uuid"], hy2_password}
    return {node["uuid"]}
|
|
|
|
|
|
def _match_connection(conn: dict, node: dict) -> bool:
    """Return whether a connection reports one of the node's credentials as its user.

    A connection matches when ``_connection_user(conn)`` is non-empty and
    equals the node's ``uuid`` or, if set, its ``hy2_password``.
    """
    # NOTE(review): an extra hysteria-only check (`user == node["hy2_password"]`)
    # used to follow this test. It could never succeed where this test failed,
    # because hy2_password is already part of the auth-key set, so it was dead
    # code. Its comment said legacy configs without an hy2 name may send no
    # user at all; such connections still do not match here. TODO confirm
    # whether they should.
    user = _connection_user(conn)
    return bool(user) and user in _node_auth_keys(node)
|
|
|
|
|
|
def _connection_id(conn: dict) -> str:
    """Return the connection's ``id`` as a string, or ``""`` when it is missing or falsy."""
    raw_id = conn.get("id")
    return str(raw_id) if raw_id else ""
|
|
|
|
|
|
def _ensure_traffic_schema(conn: sqlite3.Connection) -> None:
    """Create ``traffic_counters`` if needed and give every node a counter row.

    Args:
        conn: Open SQLite connection to a database that has a ``nodes``
            table with an ``id`` column.

    The seeding insert is not committed here; committing is left to the caller.
    """
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS traffic_counters (
            node_id INTEGER PRIMARY KEY,
            upload_total INTEGER NOT NULL DEFAULT 0,
            download_total INTEGER NOT NULL DEFAULT 0,
            snapshot_upload INTEGER NOT NULL DEFAULT 0,
            snapshot_download INTEGER NOT NULL DEFAULT 0,
            updated_at TEXT,
            FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
        );
        """
    )
    # One set-based statement instead of a SELECT plus one INSERT per node.
    # It also works whatever row_factory the connection uses.
    conn.execute(
        "INSERT OR IGNORE INTO traffic_counters (node_id) SELECT id FROM nodes"
    )
|
|
|
|
|
|
def _add_closed_traffic(node_id: int, upload: int, download: int) -> None:
    """Add byte counts to a node's persisted totals in ``traffic_counters``.

    Does nothing when neither count is positive. Otherwise opens a database
    connection, ensures the schema exists, increments ``upload_total`` and
    ``download_total`` for ``node_id``, stamps ``updated_at`` and commits.

    Args:
        node_id: Primary key of the node whose counters are incremented.
        upload: Uploaded bytes to add.
        download: Downloaded bytes to add.
    """
    if upload <= 0 and download <= 0:
        return

    conn = connect()
    try:
        _ensure_traffic_schema(conn)
        conn.execute(
            """
            UPDATE traffic_counters
            SET upload_total = upload_total + ?,
                download_total = download_total + ?,
                updated_at = datetime('now')
            WHERE node_id = ?
            """,
            (upload, download, node_id),
        )
        conn.commit()
    finally:
        # Close even when schema setup or the update raises.
        conn.close()
|
|
|
|
|
|
def _get_stored_totals(node_id: int) -> tuple[int, int]:
    """Read a node's persisted ``(upload_total, download_total)`` from ``traffic_counters``.

    Args:
        node_id: Primary key of the node.

    Returns:
        The stored totals, or ``(0, 0)`` when the node has no counter row.
    """
    conn = connect()
    try:
        _ensure_traffic_schema(conn)
        row = conn.execute(
            "SELECT upload_total, download_total FROM traffic_counters WHERE node_id = ?",
            (node_id,),
        ).fetchone()
    finally:
        # Close even when the schema check or the query raises.
        conn.close()

    if row is None:
        return 0, 0
    # Positional access works for both plain tuples and sqlite3.Row.
    return int(row[0]), int(row[1])
|
|
|
|
|
|
def _sync_connections(
    connections: list[dict],
    nodes: list[dict],
    uuid_to_node: dict[str, int],
) -> dict[str, tuple[int, int]]:
    """Reconcile ``_conn_cache`` with the current connection list.

    Each connection is attributed to the first node it matches. When exactly
    one node exists, connections that match nothing are attributed to it.
    Connections without an id, or whose owner is not in ``uuid_to_node``,
    are ignored.

    Traffic is persisted through ``_add_closed_traffic`` in two cases: a
    cached connection no longer appears in ``connections``, or a connection's
    counters are lower than the cached values for the same owner.

    Args:
        connections: Connection dicts as returned by the Clash API.
        nodes: Node dicts; each needs a ``uuid`` key.
        uuid_to_node: Maps node uuid to its database id.

    Returns:
        Per node uuid, the summed ``(upload, download)`` of its connections
        present in ``connections``.
    """
    seen: set[str] = set()
    active: dict[str, tuple[int, int]] = {}
    # Single-node deployments: unmatched connections belong to the only node.
    fallback_uuid = nodes[0]["uuid"] if len(nodes) == 1 else ""

    for conn in connections:
        matched_uuid = next(
            (node["uuid"] for node in nodes if _match_connection(conn, node)),
            "",
        ) or fallback_uuid
        if matched_uuid not in uuid_to_node:
            continue

        cid = _connection_id(conn)
        if not cid:
            continue

        upload = int(conn.get("upload") or 0)
        download = int(conn.get("download") or 0)
        seen.add(cid)

        prev = _conn_cache.get(cid)
        if prev and prev["uuid"] == matched_uuid:
            prev_up = int(prev["upload"])
            prev_down = int(prev["download"])
            # Counters went backwards: bank the old values before they are overwritten.
            if upload < prev_up or download < prev_down:
                _add_closed_traffic(uuid_to_node[matched_uuid], prev_up, prev_down)

        _conn_cache[cid] = {
            "uuid": matched_uuid,
            "upload": upload,
            "download": download,
        }

        cur_up, cur_down = active.get(matched_uuid, (0, 0))
        active[matched_uuid] = (cur_up + upload, cur_down + download)

    # Anything cached but not seen this round has closed: persist its final counters.
    for cid in [cached_id for cached_id in _conn_cache if cached_id not in seen]:
        info = _conn_cache.pop(cid)
        node_id = uuid_to_node.get(str(info["uuid"]))
        # Compare against None so a node whose id is 0 is not silently skipped.
        if node_id is not None:
            _add_closed_traffic(node_id, int(info["upload"]), int(info["download"]))

    return active
|
|
|
|
|
|
def _calc_speed(node_id: int, up: int, down: int) -> tuple[float, float]:
    """Compute a node's upload/download rate from the change since its previous sample.

    The current ``(time, up, down)`` sample always replaces the cached one in
    ``_speed_cache``.

    Returns:
        ``(upload_rate, download_rate)`` per second, never negative;
        ``(0.0, 0.0)`` when there is no previous sample or no time has elapsed.
    """
    sample = (time.time(), up, down)
    previous = _speed_cache.get(node_id)
    _speed_cache[node_id] = sample

    if previous:
        prev_time, prev_up, prev_down = previous
        elapsed = sample[0] - prev_time
        if elapsed > 0:
            up_rate = (up - prev_up) / elapsed
            down_rate = (down - prev_down) / elapsed
            # Totals can drop; report 0 rather than a negative rate.
            return max(0.0, up_rate), max(0.0, down_rate)
    return 0.0, 0.0
|
|
|
|
|
|
def _cached_session_totals() -> dict[str, tuple[int, int]]:
    """Sum the last known per-connection counters in ``_conn_cache`` by node uuid."""
    totals: dict[str, tuple[int, int]] = {}
    for info in _conn_cache.values():
        owner = str(info["uuid"])
        up, down = totals.get(owner, (0, 0))
        totals[owner] = (up + int(info["upload"]), down + int(info["download"]))
    return totals


def collect_node_stats() -> dict:
    """Collect per-node and summary connection/traffic statistics.

    Polls the Clash API once and combines the live per-connection counters
    with the totals persisted in the database.

    Returns:
        A dict with:
        ``ok``: always ``True``;
        ``singbox``: whether the Clash API poll succeeded;
        ``nodes``: per-node stats keyed by ``str(node_id)`` (online flag,
        connection count, speeds and totals, each also in human-readable form);
        ``summary``: online node count, total node count and summed speeds of
        the online nodes.
    """
    nodes = list_nodes()
    uuid_to_node = {node["uuid"]: int(node["id"]) for node in nodes}
    connections, clash_ok = fetch_clash_connections()

    if clash_ok:
        active_by_uuid = _sync_connections(connections, nodes, uuid_to_node)
        global_up_speed, global_down_speed = _global_conn_speed(connections)
    else:
        # A failed poll yields an empty list. Syncing it would flush every
        # cached connection as closed, and connections that survive the outage
        # would be counted a second time once the API answers again. Leave the
        # cache alone and reuse its last known counters so totals stay stable.
        active_by_uuid = _cached_session_totals()
        global_up_speed, global_down_speed = 0.0, 0.0

    activity_threshold = 512  # combined bytes per second
    single_node = len(nodes) == 1
    has_connections = len(connections) > 0
    global_active = (global_up_speed + global_down_speed) > activity_threshold

    result_nodes: dict[str, dict] = {}
    summary_online = 0
    summary_up_speed = 0.0
    summary_down_speed = 0.0

    for node in nodes:
        uid = node["uuid"]
        node_id = int(node["id"])
        stored_up, stored_down = _get_stored_totals(node_id)
        session_up, session_down = active_by_uuid.get(uid, (0, 0))
        display_up = stored_up + session_up
        display_down = stored_down + session_down
        up_speed, down_speed = _calc_speed(node_id, display_up, display_down)

        # Cached counters from a failed poll are not evidence of live activity.
        live_session = clash_ok and (session_up + session_down) > 0

        connection_count = sum(1 for c in connections if _match_connection(c, node))
        if not connection_count and single_node and has_connections:
            # Single node: every connection is attributed to it.
            connection_count = len(connections)
        if not connection_count and live_session:
            # Session traffic is attributed to the node but no connection matched it.
            connection_count = 1
        if not connection_count and single_node and global_active:
            up_speed = global_up_speed
            down_speed = global_down_speed

        online = (
            connection_count > 0
            or live_session
            or (up_speed + down_speed) > activity_threshold
            or (single_node and (global_active or has_connections))
        )

        if online:
            summary_online += 1
            summary_up_speed += up_speed
            summary_down_speed += down_speed

        result_nodes[str(node_id)] = {
            "online": online,
            "connections": connection_count,
            "upload_speed": round(up_speed),
            "download_speed": round(down_speed),
            "upload_total": display_up,
            "download_total": display_down,
            "upload_speed_human": format_speed(up_speed),
            "download_speed_human": format_speed(down_speed),
            "upload_total_human": format_bytes(display_up),
            "download_total_human": format_bytes(display_down),
        }

    return {
        "ok": True,
        "singbox": clash_ok,
        "nodes": result_nodes,
        "summary": {
            "online": summary_online,
            "total_nodes": len(nodes),
            "upload_speed": round(summary_up_speed),
            "download_speed": round(summary_down_speed),
            "upload_speed_human": format_speed(summary_up_speed),
            "download_speed_human": format_speed(summary_down_speed),
        },
    }
|