Files
qihuo/modules/stats/server_status_lib.py
T

219 lines
7.1 KiB
Python

# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 详见 LICENSE.zh-CN.txt
"""数据看板:本机服务器状态(IP、CPU、内存、硬盘)。"""
from __future__ import annotations
import os
import shutil
import socket
import time
from datetime import datetime
from typing import Any, Optional
from zoneinfo import ZoneInfo
# Time zone used when formatting the "updated_at" timestamp of the status payload.
_TZ = ZoneInfo("Asia/Shanghai")
# Most recent successfully looked-up public IP ("ip") and the time.time() value
# at which it was stored ("ts").
_PUBLIC_IP_CACHE: dict[str, Any] = {"ip": None, "ts": 0.0}
# Seconds for which a cached public IP is reused before it is looked up again.
_PUBLIC_IP_TTL = 300.0
# Previous cumulative receive/transmit byte counters and the time they were
# sampled; network rates are derived from the difference to the next sample.
_NET_SAMPLE: dict[str, Any] = {"rx": 0, "tx": 0, "ts": 0.0}
# Previous cumulative total/idle CPU tick counters and the time they were
# sampled; CPU usage is derived from the difference to the next sample.
_CPU_SAMPLE: dict[str, Any] = {"total": 0, "idle": 0, "ts": 0.0}
def _env_ip(name: str) -> Optional[str]:
val = (os.getenv(name) or "").strip()
return val or None
def get_private_ip() -> Optional[str]:
    """Return this host's private IPv4 address, or ``None`` if it is unknown.

    Resolution order:
      1. The ``SERVER_PRIVATE_IP`` environment variable, when set.
      2. The local address of a UDP socket connected to ``8.8.8.8:80``.
      3. The address that the local hostname resolves to.
    """
    configured = _env_ip("SERVER_PRIVATE_IP")
    if configured:
        return configured

    # Ask the OS which local address it picks for a socket aimed at a public host.
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(("8.8.8.8", 80))
            local_addr = probe.getsockname()
            return local_addr[0]
    except Exception:
        # Best effort: fall through to hostname resolution.
        pass

    try:
        return socket.gethostbyname(socket.gethostname())
    except Exception:
        return None
def get_public_ip() -> Optional[str]:
    """Return this host's public IP address, or ``None`` if it cannot be found.

    Resolution order:
      1. The ``SERVER_PUBLIC_IP`` environment variable, when set.
      2. The value in ``_PUBLIC_IP_CACHE`` while it is younger than
         ``_PUBLIC_IP_TTL`` seconds.
      3. The IP echo services below, tried in order with a 4 second timeout
         each.

    A response is accepted only if its body parses as an IP address, so an
    error page or captive-portal document served with a success status is
    never returned or cached. Successful lookups are cached; failures are not,
    so the next call retries.
    """
    override = _env_ip("SERVER_PUBLIC_IP")
    if override:
        return override

    now = time.time()
    cached = _PUBLIC_IP_CACHE.get("ip")
    if cached and (now - float(_PUBLIC_IP_CACHE.get("ts") or 0)) < _PUBLIC_IP_TTL:
        return str(cached)

    # Imported locally so this block does not depend on the file's import header.
    import ipaddress

    try:
        # Lazy import: the HTTP client is optional for this best-effort lookup.
        import requests
    except Exception:
        return None

    for url in ("https://api.ipify.org?format=text", "https://ifconfig.me/ip"):
        try:
            resp = requests.get(url, timeout=4)
            if not resp.ok:
                continue
            candidate = (resp.text or "").strip()
            # Raises ValueError for anything that is not an IPv4/IPv6 literal
            # (including an empty body).
            ipaddress.ip_address(candidate)
        except Exception:
            # Best effort: any failure just moves on to the next provider.
            continue
        _PUBLIC_IP_CACHE["ip"] = candidate
        _PUBLIC_IP_CACHE["ts"] = now
        return candidate

    return None
def _read_mem_linux() -> tuple[Optional[float], Optional[float], Optional[float]]:
try:
info: dict[str, int] = {}
with open("/proc/meminfo", encoding="utf-8") as f:
for line in f:
parts = line.split()
if len(parts) >= 2 and parts[1].isdigit():
info[parts[0].rstrip(":")] = int(parts[1])
total_kb = info.get("MemTotal")
avail_kb = info.get("MemAvailable") or info.get("MemFree")
if not total_kb or avail_kb is None:
return None, None, None
used_kb = max(0, total_kb - avail_kb)
total_gb = total_kb / 1024 / 1024
used_gb = used_kb / 1024 / 1024
pct = round(used_kb / total_kb * 100, 1) if total_kb > 0 else None
return pct, round(used_gb, 2), round(total_gb, 2)
except Exception:
return None, None, None
def _loadavg_percent() -> Optional[float]:
    """Approximate CPU usage from the 1-minute load average.

    Returns the load average divided by the core count as a percentage capped
    at 100, or ``None`` when the load average is unavailable.
    """
    try:
        load1, _, _ = os.getloadavg()
    except (OSError, AttributeError):
        # OSError: the load average could not be obtained.
        # AttributeError: the platform does not provide os.getloadavg.
        return None
    cores = os.cpu_count() or 1
    return round(min(100.0, load1 / cores * 100), 1)


def _read_cpu_percent() -> Optional[float]:
    """Return CPU usage in percent since the previous call.

    Reads the aggregate ``cpu`` line of ``/proc/stat`` and compares it with the
    counters stored in ``_CPU_SAMPLE`` by the previous call; the stored sample
    is replaced on every successful read.

    Returns:
        Busy percentage in ``[0, 100]`` rounded to one decimal. On the first
        call (no previous sample), or when ``/proc/stat`` cannot be read or
        parsed, a load-average based approximation is returned instead.
        ``None`` when the line is malformed, no ticks elapsed, or no fallback
        is available.
    """
    try:
        with open("/proc/stat", encoding="utf-8") as f:
            line = f.readline()
        if not line.startswith("cpu "):
            return None
        ticks = [int(x) for x in line.split()[1:]]
    except (OSError, ValueError):
        return _loadavg_percent()

    if len(ticks) < 4:
        return None

    # Idle time is the idle field plus iowait (when present).
    idle = ticks[3] + (ticks[4] if len(ticks) > 4 else 0)
    # Only the first eight fields (user .. steal) are summed: the kernel
    # already accounts guest/guest_nice time inside user/nice, so adding the
    # trailing fields would count virtual-machine time twice.
    total = sum(ticks[:8])

    now = time.time()
    prev_total = int(_CPU_SAMPLE.get("total") or 0)
    prev_idle = int(_CPU_SAMPLE.get("idle") or 0)
    prev_ts = float(_CPU_SAMPLE.get("ts") or 0)
    _CPU_SAMPLE.update({"total": total, "idle": idle, "ts": now})

    if prev_total <= 0 or now <= prev_ts:
        # No usable previous sample yet.
        return _loadavg_percent()

    dt_total = total - prev_total
    dt_idle = idle - prev_idle
    if dt_total <= 0:
        return None

    busy = (1.0 - dt_idle / dt_total) * 100
    # iowait may decrease between samples, which can push the raw value
    # outside 0..100; clamp so callers always get a valid percentage.
    return round(min(100.0, max(0.0, busy)), 1)
def _read_disk(path: str) -> tuple[Optional[float], Optional[float], Optional[float]]:
try:
usage = shutil.disk_usage(path)
total = usage.total
used = usage.used
if total <= 0:
return None, None, None
pct = round(used / total * 100, 1)
return pct, round(used / 1024 ** 3, 2), round(total / 1024 ** 3, 2)
except Exception:
return None, None, None
def _read_net_rates() -> tuple[Optional[float], Optional[float]]:
    """Return ``(upload, download)`` network rates since the previous call.

    Sums the receive and transmit byte counters of every interface in
    ``/proc/net/dev`` except ``lo`` and compares them with the sample stored in
    ``_NET_SAMPLE`` by the previous call; the stored sample is replaced on
    every successful read.

    Returns:
        Rates in units of 1024 bytes per second, rounded to one decimal and
        never negative. ``(None, None)`` on the first call, when the clock did
        not advance, or when the file cannot be read or parsed.
    """
    try:
        rx_total = 0
        tx_total = 0
        with open("/proc/net/dev", encoding="utf-8") as dev:
            # The first two lines are column headers.
            rows = dev.readlines()[2:]
            for row in rows:
                iface, sep, counters = row.partition(":")
                if not sep:
                    continue
                cols = counters.split()
                if len(cols) < 10 or iface.strip() == "lo":
                    continue
                # Column 0 is received bytes, column 8 is transmitted bytes.
                rx_total += int(cols[0])
                tx_total += int(cols[8])

        now = time.time()
        last_rx = int(_NET_SAMPLE.get("rx") or 0)
        last_tx = int(_NET_SAMPLE.get("tx") or 0)
        last_ts = float(_NET_SAMPLE.get("ts") or 0)
        _NET_SAMPLE.update({"rx": rx_total, "tx": tx_total, "ts": now})

        if last_ts <= 0 or now <= last_ts:
            return None, None

        elapsed = now - last_ts
        # max() guards against counters that went backwards (e.g. after a reset).
        download = max(0.0, (rx_total - last_rx) / elapsed / 1024)
        upload = max(0.0, (tx_total - last_tx) / elapsed / 1024)
        return round(upload, 1), round(download, 1)
    except Exception:
        return None, None
def _format_uptime(seconds: float) -> str:
sec = max(0, int(seconds))
days, rem = divmod(sec, 86400)
hours, _ = divmod(rem, 3600)
if days > 0:
return f"{days}{hours}"
if hours > 0:
return f"{hours}{rem % 3600 // 60}"
return f"{sec // 60}"
def build_server_status(*, disk_path: Optional[str] = None) -> dict[str, Any]:
    """Collect a snapshot of this server's status for the dashboard.

    Args:
        disk_path: Path whose filesystem usage is reported. Falls back to the
            ``SERVER_DISK_PATH`` environment variable, then to ``"/"``.

    Returns:
        A dict with hostname, public/private IP, CPU usage and core count,
        memory and disk usage, network rates, an uptime label and an
        ``updated_at`` timestamp formatted in ``_TZ``. Metrics that could not
        be read are ``None``.
    """
    root = disk_path or os.getenv("SERVER_DISK_PATH") or "/"

    memory_pct, memory_used_gb, memory_total_gb = _read_mem_linux()
    usage_pct, usage_used_gb, usage_total_gb = _read_disk(root)
    rate_up, rate_down = _read_net_rates()

    # The first field of /proc/uptime is the uptime in seconds.
    uptime_sec = None
    try:
        with open("/proc/uptime", encoding="utf-8") as uptime_file:
            first_field = uptime_file.read().split()[0]
            uptime_sec = float(first_field)
    except Exception:
        # Best effort: the label is simply omitted when uptime is unreadable.
        pass

    # An explicit SERVER_HOSTNAME wins over the OS-reported hostname.
    hostname = (os.getenv("SERVER_HOSTNAME") or "").strip()
    if not hostname:
        hostname = socket.gethostname()

    status: dict[str, Any] = {"hostname": hostname}
    status["public_ip"] = get_public_ip()
    status["private_ip"] = get_private_ip()
    status["cpu_pct"] = _read_cpu_percent()
    status["cpu_cores"] = os.cpu_count()
    status["memory_pct"] = memory_pct
    status["memory_used_gb"] = memory_used_gb
    status["memory_total_gb"] = memory_total_gb
    status["disk_pct"] = usage_pct
    status["disk_used_gb"] = usage_used_gb
    status["disk_total_gb"] = usage_total_gb
    status["net_up_kbps"] = rate_up
    status["net_down_kbps"] = rate_down
    if uptime_sec is None:
        status["uptime_label"] = None
    else:
        status["uptime_label"] = _format_uptime(uptime_sec)
    status["updated_at"] = datetime.now(_TZ).strftime("%Y-%m-%d %H:%M:%S")
    return status