6edba863b5
Official sing-box binaries lack v2ray_api, so drop that config and grpcio and track per-node traffic from Clash connection stats instead. Co-authored-by: Cursor <cursoragent@cursor.com>
237 lines
7.4 KiB
Python
237 lines
7.4 KiB
Python
"""从 sing-box Clash API 采集节点连接与流量(官方预编译包不含 v2ray_api)。"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
import urllib.error
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
from db import connect, list_nodes
|
|
|
|
# Project root: the JIEDIAN_ROOT environment variable when it is set, otherwise
# the parent of the directory that contains this file.
ROOT = Path(os.environ.get("JIEDIAN_ROOT", Path(__file__).resolve().parents[1]))
# The .env file under ROOT that _load_env() parses.
ENV_FILE = ROOT / ".env"

# host:port that fetch_clash_connections() uses to build the Clash API URL.
CLASH_ADDR = "127.0.0.1:9090"

# node id -> (time.time() of the sample, upload total, download total), written
# by _calc_speed() on every call so the next call can compute a rate.
_speed_cache: dict[int, tuple[float, int, int]] = {}
# connection id -> {"uuid": ..., "upload": ..., "download": ...} holding the
# last counters that _sync_connections() observed for that connection.
_conn_cache: dict[str, dict[str, int | str]] = {}
|
|
|
|
|
|
def _load_env() -> dict[str, str]:
    """Parse ``ENV_FILE`` into a dict of ``KEY=VALUE`` pairs.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    skipped. Keys and values are stripped of surrounding whitespace, and a
    value wrapped in one matching pair of single or double quotes has that
    pair removed (``SECRET="abc"`` yields ``abc``).

    Returns:
        The parsed mapping; empty when the file does not exist.
    """
    env: dict[str, str] = {}
    # Read directly instead of checking exists() first, so a file removed
    # between the check and the read cannot raise.
    try:
        text = ENV_FILE.read_text(encoding="utf-8")
    except FileNotFoundError:
        return env

    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        value = value.strip()
        # Quoted values are common in .env files; leaving the quotes in
        # place would make them part of the value callers receive.
        if len(value) >= 2 and value[0] == value[-1] and value[0] in ("'", '"'):
            value = value[1:-1]
        env[key.strip()] = value
    return env
|
|
|
|
|
|
def format_bytes(num: int | float) -> str:
    """Render a byte count as text using 1024-based units from B up to TB.

    Values below 1024 bytes are shown as a truncated integer (``"512 B"``);
    larger values get one decimal place (``"1.5 KB"``). TB is the largest
    unit, so anything beyond it is still expressed in TB.

    The unit is chosen from the magnitude of the value, so a negative input
    is scaled like its positive counterpart (``-2048`` gives ``"-2.0 KB"``).

    Args:
        num: Number of bytes.

    Returns:
        The formatted string.
    """
    n = float(num)
    unit = "B"
    for next_unit in ("KB", "MB", "GB", "TB"):
        if abs(n) < 1024:
            break
        n /= 1024
        unit = next_unit
    if unit == "B":
        return f"{int(n)} B"
    return f"{n:.1f} {unit}"
|
|
|
|
|
|
def format_speed(num: float) -> str:
    """Format a bytes-per-second rate: the ``format_bytes`` text followed by ``/s``."""
    return format_bytes(num) + "/s"
|
|
|
|
|
|
def fetch_clash_connections() -> tuple[list[dict], bool]:
    """Fetch the current connection list from the Clash API.

    Sends ``GET http://{CLASH_ADDR}/connections`` with a 3 second timeout.
    When ``CLASH_API_SECRET`` is present in the .env file it is sent as a
    Bearer token.

    Returns:
        ``(connections, ok)``. ``ok`` is False when the request failed or
        the response was not a JSON object with a list (or empty/missing)
        ``connections`` field; in that case ``connections`` is empty.
        Entries that are not JSON objects are dropped so callers can treat
        every element as a dict.
    """
    # Function-scope import: only needed for the except clause below.
    import http.client

    secret = _load_env().get("CLASH_API_SECRET", "")
    req = urllib.request.Request(f"http://{CLASH_ADDR}/connections")
    if secret:
        req.add_header("Authorization", f"Bearer {secret}")

    try:
        with urllib.request.urlopen(req, timeout=3) as resp:
            payload = json.loads(resp.read().decode("utf-8"))
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers URLError and TimeoutError; ValueError covers both
        # JSONDecodeError and UnicodeDecodeError; HTTPException covers a
        # response body cut short while reading (e.g. IncompleteRead).
        return [], False

    if not isinstance(payload, dict):
        return [], False
    connections = payload.get("connections") or []
    if not isinstance(connections, list):
        return [], False
    return [item for item in connections if isinstance(item, dict)], True
|
|
|
|
|
|
def _connection_user(conn: dict) -> str:
    """Return the user identifier found in a connection's ``metadata``.

    The ``user`` field is preferred and ``uid`` is the fallback; the first
    truthy one is returned as a string. An empty string is returned when
    the metadata is missing or empty, or when neither field holds a truthy
    value.
    """
    metadata = conn.get("metadata")
    if not metadata:
        return ""
    for field in ("user", "uid"):
        candidate = metadata.get(field)
        if candidate:
            return str(candidate)
    return ""
|
|
|
|
|
|
def _connection_id(conn: dict) -> str:
    """Return the connection's ``id`` as a string, or ``""`` when missing or falsy."""
    raw_id = conn.get("id")
    return str(raw_id) if raw_id else ""
|
|
|
|
|
|
def _ensure_traffic_schema(conn: sqlite3.Connection) -> None:
    """Create ``traffic_counters`` if needed and seed a zero row per node.

    The table is created with ``CREATE TABLE IF NOT EXISTS``; afterwards
    every id in ``nodes`` that has no counter row yet gets one with all
    counters at their default of 0. Existing rows are left untouched.

    The seeding INSERT is not committed here; the caller decides whether to
    commit.

    Args:
        conn: Open SQLite connection to a database that has a ``nodes``
            table with an ``id`` column.
    """
    conn.executescript(
        """
        CREATE TABLE IF NOT EXISTS traffic_counters (
            node_id INTEGER PRIMARY KEY,
            upload_total INTEGER NOT NULL DEFAULT 0,
            download_total INTEGER NOT NULL DEFAULT 0,
            snapshot_upload INTEGER NOT NULL DEFAULT 0,
            snapshot_download INTEGER NOT NULL DEFAULT 0,
            updated_at TEXT,
            FOREIGN KEY (node_id) REFERENCES nodes(id) ON DELETE CASCADE
        );
        """
    )
    # One set-based statement instead of one INSERT per node; this also
    # works regardless of the connection's row_factory.
    conn.execute(
        "INSERT OR IGNORE INTO traffic_counters (node_id) SELECT id FROM nodes"
    )
|
|
|
|
|
|
def _add_closed_traffic(node_id: int, upload: int, download: int) -> None:
    """Add a finished connection's byte counts to a node's stored totals.

    Does nothing when neither count is positive. Otherwise opens a database
    connection, makes sure the counters table exists, increments
    ``upload_total`` / ``download_total`` for ``node_id``, stamps
    ``updated_at`` and commits.

    Args:
        node_id: Id of the node whose counters are incremented.
        upload: Uploaded bytes to add.
        download: Downloaded bytes to add.
    """
    if upload <= 0 and download <= 0:
        return
    conn = connect()
    # try/finally so the connection is released even if a statement raises.
    try:
        _ensure_traffic_schema(conn)
        conn.execute(
            """
            UPDATE traffic_counters
            SET upload_total = upload_total + ?,
                download_total = download_total + ?,
                updated_at = datetime('now')
            WHERE node_id = ?
            """,
            (upload, download, node_id),
        )
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
def _get_stored_totals(node_id: int) -> tuple[int, int]:
    """Read the persisted upload/download totals for one node.

    Args:
        node_id: Id of the node to look up.

    Returns:
        ``(upload_total, download_total)`` from ``traffic_counters``, or
        ``(0, 0)`` when the node has no counter row.
    """
    conn = connect()
    # try/finally so the connection is released even if a statement raises.
    try:
        _ensure_traffic_schema(conn)
        row = conn.execute(
            "SELECT upload_total, download_total FROM traffic_counters WHERE node_id = ?",
            (node_id,),
        ).fetchone()
    finally:
        conn.close()
    if row is None:
        return 0, 0
    return int(row["upload_total"]), int(row["download_total"])
|
|
|
|
|
|
def _sync_connections(
    connections: list[dict], uuid_to_node: dict[str, int]
) -> dict[str, tuple[int, int]]:
    """Reconcile the connection cache with the latest Clash snapshot.

    For every connection whose user maps to a known node, the cached
    counters are replaced with the current ones. Traffic is written to the
    stored totals (via ``_add_closed_traffic``) when:

    * a cached connection id is absent from ``connections`` (treated as
      closed),
    * a connection's counters went backwards, or
    * a connection id reappears under a different user; the cached bytes
      are then credited to the previous user's node.

    Args:
        connections: Connection dicts as returned by the Clash API.
        uuid_to_node: Maps a user identifier to its node id.

    Returns:
        For each user with live connections, the summed
        ``(upload, download)`` bytes of those connections.
    """
    seen: set[str] = set()
    active: dict[str, tuple[int, int]] = {}

    for conn in connections:
        user = _connection_user(conn)
        if user not in uuid_to_node:
            continue
        cid = _connection_id(conn)
        if not cid:
            continue

        upload = int(conn.get("upload") or 0)
        download = int(conn.get("download") or 0)
        seen.add(cid)

        prev = _conn_cache.get(cid)
        if prev is not None:
            prev_user = str(prev["uuid"])
            prev_up = int(prev["upload"])
            prev_down = int(prev["download"])
            # The cached entry describes an earlier session when the owner
            # changed or a counter decreased; bank its bytes before the
            # cache entry is overwritten.
            if prev_user != user or upload < prev_up or download < prev_down:
                prev_node = uuid_to_node.get(prev_user)
                if prev_node is not None:
                    _add_closed_traffic(prev_node, prev_up, prev_down)
        _conn_cache[cid] = {"uuid": user, "upload": upload, "download": download}

        cur_up, cur_down = active.get(user, (0, 0))
        active[user] = (cur_up + upload, cur_down + download)

    # Anything still cached but missing from this snapshot has closed.
    for cid in [cached for cached in _conn_cache if cached not in seen]:
        info = _conn_cache.pop(cid)
        node_id = uuid_to_node.get(str(info["uuid"]))
        # Compare against None so a node id of 0 is not skipped.
        if node_id is not None:
            _add_closed_traffic(node_id, int(info["upload"]), int(info["download"]))

    return active
|
|
|
|
|
|
def _calc_speed(node_id: int, up: int, down: int) -> tuple[float, float]:
    """Estimate a node's transfer rate from consecutive total samples.

    The current sample is stored in ``_speed_cache`` on every call. The
    rate is the difference from the previously stored sample divided by the
    elapsed seconds, with negative rates clamped to zero.

    Returns:
        ``(upload_rate, download_rate)`` in bytes per second, or
        ``(0.0, 0.0)`` on the first sample for a node or when no time has
        elapsed.
    """
    sampled_at = time.time()
    previous = _speed_cache.get(node_id)
    _speed_cache[node_id] = (sampled_at, up, down)
    if not previous:
        return 0.0, 0.0

    last_time, last_up, last_down = previous
    elapsed = sampled_at - last_time
    if elapsed <= 0:
        return 0.0, 0.0

    up_rate = (up - last_up) / elapsed
    down_rate = (down - last_down) / elapsed
    return max(0.0, up_rate), max(0.0, down_rate)
|
|
|
|
|
|
def collect_node_stats() -> dict:
    """Build the per-node and summary traffic report.

    Fetches the Clash connection list, reconciles it with the connection
    cache, and for every node combines the stored totals with the bytes of
    its live connections. Speeds come from ``_calc_speed`` applied to those
    combined totals.

    When the Clash API cannot be reached, the connection cache is left
    untouched and the last cached counters are used as the live session
    figures. Flushing the cache on a failed fetch would bank the bytes of
    connections that may still be open, and they would then be counted a
    second time on the next successful fetch.

    Returns:
        A dict with ``ok`` (always True), ``singbox`` (whether the Clash
        API answered), ``nodes`` (node id as string -> stats dict) and
        ``summary`` (online count, node count and aggregate speeds).
    """
    nodes = list_nodes()
    uuid_to_node = {node["uuid"]: int(node["id"]) for node in nodes}
    connections, clash_ok = fetch_clash_connections()
    if clash_ok:
        active_by_uuid = _sync_connections(connections, uuid_to_node)
    else:
        active_by_uuid = _cached_active_traffic()

    # Count connections per user once rather than rescanning per node.
    connections_by_user: dict[str, int] = {}
    for item in connections:
        owner = _connection_user(item)
        connections_by_user[owner] = connections_by_user.get(owner, 0) + 1

    result_nodes: dict[str, dict] = {}
    summary_online = 0
    summary_up_speed = 0.0
    summary_down_speed = 0.0

    for node in nodes:
        uid = node["uuid"]
        node_id = int(node["id"])
        stored_up, stored_down = _get_stored_totals(node_id)
        session_up, session_down = active_by_uuid.get(uid, (0, 0))
        display_up = stored_up + session_up
        display_down = stored_down + session_down
        up_speed, down_speed = _calc_speed(node_id, display_up, display_down)

        connection_count = connections_by_user.get(uid, 0)
        # A node also counts as online when its combined rate exceeds
        # 512 bytes/s even though no connection is listed for it.
        online = connection_count > 0 or (up_speed + down_speed) > 512

        if online:
            summary_online += 1
        summary_up_speed += up_speed
        summary_down_speed += down_speed

        result_nodes[str(node_id)] = {
            "online": online,
            "connections": connection_count,
            "upload_speed": round(up_speed),
            "download_speed": round(down_speed),
            "upload_total": display_up,
            "download_total": display_down,
            "upload_speed_human": format_speed(up_speed),
            "download_speed_human": format_speed(down_speed),
            "upload_total_human": format_bytes(display_up),
            "download_total_human": format_bytes(display_down),
        }

    return {
        "ok": True,
        "singbox": clash_ok,
        "nodes": result_nodes,
        "summary": {
            "online": summary_online,
            "total_nodes": len(nodes),
            "upload_speed": round(summary_up_speed),
            "download_speed": round(summary_down_speed),
            "upload_speed_human": format_speed(summary_up_speed),
            "download_speed_human": format_speed(summary_down_speed),
        },
    }


def _cached_active_traffic() -> dict[str, tuple[int, int]]:
    """Sum the counters held in ``_conn_cache`` per user without modifying the cache."""
    totals: dict[str, tuple[int, int]] = {}
    for info in _conn_cache.values():
        user = str(info["uuid"])
        up, down = totals.get(user, (0, 0))
        totals[user] = (up + int(info["upload"]), down + int(info["download"]))
    return totals
|