# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 详见 LICENSE.zh-CN.txt
"""Dashboard data: status of the local server (IP, CPU, memory, disk).

All readers are best-effort: a metric that cannot be determined is reported
as ``None`` instead of raising.  CPU and network figures are deltas between
two consecutive calls, so the module keeps the previous sample in
module-level state.
"""
from __future__ import annotations

import ipaddress
import os
import shutil
import socket
import time
from datetime import datetime, timedelta, timezone, tzinfo
from typing import Any, Iterable, Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError

try:
    _TZ: tzinfo = ZoneInfo("Asia/Shanghai")
except ZoneInfoNotFoundError:
    # Hosts without a tz database would otherwise fail at import time.
    # A fixed UTC+8 offset yields the same wall-clock time for current dates.
    _TZ = timezone(timedelta(hours=8))

# "ip"/"ts": last good public address and when it was fetched.
# "fail_ts": when the last lookup failed (0.0 = no failure on record).
_PUBLIC_IP_CACHE: dict[str, Any] = {"ip": None, "ts": 0.0, "fail_ts": 0.0}
_PUBLIC_IP_TTL = 300.0
# After a failed lookup, wait this long before hitting the network again so
# an offline host does not stall every status request on HTTP timeouts.
_PUBLIC_IP_RETRY_AFTER = 30.0
_PUBLIC_IP_ENDPOINTS = (
    "https://api.ipify.org?format=text",
    "https://ifconfig.me/ip",
)
# Previous counter samples; "ts" holds a time.monotonic() reading.
_NET_SAMPLE: dict[str, Any] = {"rx": 0, "tx": 0, "ts": 0.0}
_CPU_SAMPLE: dict[str, Any] = {"total": 0, "idle": 0, "ts": 0.0}


def _env_ip(name: str) -> Optional[str]:
    """Return the stripped value of environment variable *name*, or None if unset/blank."""
    val = (os.getenv(name) or "").strip()
    return val or None


def get_private_ip() -> Optional[str]:
    """Return the address of the interface used for outbound traffic.

    ``SERVER_PRIVATE_IP`` overrides detection.  Otherwise a UDP socket is
    connected to 8.8.8.8 and its local address is read back; if that fails
    the hostname is resolved instead.  Returns None when both fail.
    """
    override = _env_ip("SERVER_PRIVATE_IP")
    if override:
        return override
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
    except OSError:
        pass
    try:
        return socket.gethostbyname(socket.gethostname())
    except (OSError, ValueError):
        # ValueError covers UnicodeError raised for hostnames that cannot be encoded.
        return None


def _as_ip(text: str) -> Optional[str]:
    """Return *text* stripped if it is a valid IPv4/IPv6 literal, else None."""
    candidate = (text or "").strip()
    try:
        ipaddress.ip_address(candidate)
    except ValueError:
        return None
    return candidate


def _fetch_public_ip() -> Optional[str]:
    """Ask each endpoint in turn for the public address; None if none answers with an IP."""
    try:
        # Imported here so that a missing ``requests`` only disables this lookup.
        import requests
    except ImportError:
        return None
    for url in _PUBLIC_IP_ENDPOINTS:
        try:
            resp = requests.get(url, timeout=4)
            if resp.ok:
                # Reject anything that is not an address (e.g. an HTML error page).
                ip = _as_ip(resp.text or "")
                if ip:
                    return ip
        except Exception:
            # Best effort: any transport/decoding failure moves on to the next endpoint.
            continue
    return None


def get_public_ip() -> Optional[str]:
    """Return the public IP address of this host.

    ``SERVER_PUBLIC_IP`` overrides detection.  A successful lookup is cached
    for ``_PUBLIC_IP_TTL`` seconds.  After a failed lookup the network is not
    retried for ``_PUBLIC_IP_RETRY_AFTER`` seconds; during that window, and
    whenever a refresh fails, the last known address (possibly None) is
    returned.
    """
    override = _env_ip("SERVER_PUBLIC_IP")
    if override:
        return override

    now = time.time()
    cached = _PUBLIC_IP_CACHE.get("ip")
    last_known = str(cached) if cached else None
    if cached and (now - float(_PUBLIC_IP_CACHE.get("ts") or 0)) < _PUBLIC_IP_TTL:
        return last_known

    failed_at = float(_PUBLIC_IP_CACHE.get("fail_ts") or 0)
    if failed_at > 0 and 0 <= now - failed_at < _PUBLIC_IP_RETRY_AFTER:
        return last_known

    ip = _fetch_public_ip()
    if ip:
        _PUBLIC_IP_CACHE.update({"ip": ip, "ts": now, "fail_ts": 0.0})
        return ip
    # Stamp the failure after the attempt so the back-off starts once the timeouts end.
    _PUBLIC_IP_CACHE["fail_ts"] = time.time()
    return last_known


def _parse_meminfo(
    lines: Iterable[str],
) -> tuple[Optional[float], Optional[float], Optional[float]]:
    """Compute ``(used_pct, used_gb, total_gb)`` from ``/proc/meminfo`` lines.

    Values in the file are taken as kB.  ``MemAvailable`` is preferred and
    ``MemFree`` is used only when ``MemAvailable`` is absent.  Returns three
    Nones when the total or both availability figures are missing.
    """
    info: dict[str, int] = {}
    for line in lines:
        parts = line.split()
        if len(parts) >= 2 and parts[1].isdigit():
            info[parts[0].rstrip(":")] = int(parts[1])

    total_kb = info.get("MemTotal")
    avail_kb = info.get("MemAvailable")
    if avail_kb is None:
        # Explicit None check: a reading of 0 is valid and must not be
        # replaced by MemFree.
        avail_kb = info.get("MemFree")
    if not total_kb or avail_kb is None:
        return None, None, None

    used_kb = max(0, total_kb - avail_kb)
    pct = round(used_kb / total_kb * 100, 1)
    return pct, round(used_kb / 1024 / 1024, 2), round(total_kb / 1024 / 1024, 2)


def _read_mem_linux() -> tuple[Optional[float], Optional[float], Optional[float]]:
    """Return ``(used_pct, used_gb, total_gb)`` from ``/proc/meminfo``, or three Nones."""
    try:
        with open("/proc/meminfo", encoding="utf-8") as f:
            return _parse_meminfo(f)
    except (OSError, ValueError):
        return None, None, None


def _parse_cpu_times(line: str) -> Optional[tuple[int, int]]:
    """Parse the aggregate ``cpu`` line of ``/proc/stat`` into ``(total, idle)``.

    ``idle`` is idle + iowait.  ``total`` sums only the first eight counters
    (user .. steal): the trailing guest/guest_nice counters are already
    accounted inside user/nice by the kernel, so adding them again would
    inflate the total.  Returns None if the line is not the aggregate line
    or has fewer than four counters; raises ValueError on non-numeric fields.
    """
    if not line.startswith("cpu "):
        return None
    fields = [int(x) for x in line.split()[1:]]
    if len(fields) < 4:
        return None
    idle = fields[3] + (fields[4] if len(fields) > 4 else 0)
    return sum(fields[:8]), idle


def _cpu_from_loadavg() -> Optional[float]:
    """Approximate CPU usage as 1-minute load per core, capped at 100; None if unavailable."""
    try:
        load1, _, _ = os.getloadavg()
    except (OSError, AttributeError):
        # AttributeError: platforms where os.getloadavg does not exist.
        return None
    cores = os.cpu_count() or 1
    return round(min(100.0, load1 / cores * 100), 1)


def _read_cpu_percent() -> Optional[float]:
    """Return CPU utilisation in percent since the previous call.

    The first call (no earlier sample) and any failure to read
    ``/proc/stat`` fall back to the load-average approximation.  Returns
    None when the stat line is unusable or no ticks elapsed between samples.
    """
    try:
        with open("/proc/stat", encoding="utf-8") as f:
            line = f.readline()
        sample = _parse_cpu_times(line)
    except (OSError, ValueError):
        return _cpu_from_loadavg()
    if sample is None:
        return None

    total, idle = sample
    # Monotonic clock: unaffected by wall-clock adjustments.
    now = time.monotonic()
    prev_total = int(_CPU_SAMPLE.get("total") or 0)
    prev_idle = int(_CPU_SAMPLE.get("idle") or 0)
    prev_ts = float(_CPU_SAMPLE.get("ts") or 0)
    _CPU_SAMPLE.update({"total": total, "idle": idle, "ts": now})

    if prev_total <= 0 or now <= prev_ts:
        return _cpu_from_loadavg()
    dt_total = total - prev_total
    dt_idle = idle - prev_idle
    if dt_total <= 0:
        return None
    busy = (1.0 - dt_idle / dt_total) * 100
    # Clamp: idle+iowait can move backwards, which would push the ratio out of range.
    return round(min(100.0, max(0.0, busy)), 1)


def _read_disk(path: str) -> tuple[Optional[float], Optional[float], Optional[float]]:
    """Return ``(used_pct, used_gb, total_gb)`` for the filesystem holding *path*, or three Nones."""
    try:
        usage = shutil.disk_usage(path)
    except (OSError, ValueError):
        return None, None, None
    total = usage.total
    used = usage.used
    if total <= 0:
        return None, None, None
    pct = round(used / total * 100, 1)
    return pct, round(used / 1024 ** 3, 2), round(total / 1024 ** 3, 2)


def _sum_net_counters(lines: Iterable[str]) -> tuple[int, int]:
    """Sum received/transmitted byte counters over ``/proc/net/dev`` data lines.

    Lines without a colon or with fewer than ten columns are skipped, as is
    the loopback interface ``lo``.  Every other interface is included.
    NOTE(review): virtual interfaces (bridges, veth) are summed too, which
    may count the same traffic more than once — confirm this is intended.
    Raises ValueError on non-numeric counters.
    """
    rx = tx = 0
    for line in lines:
        iface, sep, data = line.partition(":")
        if not sep:
            continue
        cols = data.split()
        if len(cols) < 10 or iface.strip() == "lo":
            continue
        rx += int(cols[0])  # received bytes
        tx += int(cols[8])  # transmitted bytes
    return rx, tx


def _read_net_rates() -> tuple[Optional[float], Optional[float]]:
    """Return ``(up, down)`` throughput since the previous call.

    Values are byte deltas divided by elapsed seconds and by 1024.  The
    first call only records a sample and returns ``(None, None)``, as does
    any read or parse failure.  A counter that went backwards yields 0.0.
    """
    try:
        with open("/proc/net/dev", encoding="utf-8") as f:
            # The first two lines are column headers.
            rx, tx = _sum_net_counters(f.readlines()[2:])
    except (OSError, ValueError):
        return None, None

    # Monotonic clock: the interval must not be distorted by wall-clock steps.
    now = time.monotonic()
    prev_rx = int(_NET_SAMPLE.get("rx") or 0)
    prev_tx = int(_NET_SAMPLE.get("tx") or 0)
    prev_ts = float(_NET_SAMPLE.get("ts") or 0)
    _NET_SAMPLE.update({"rx": rx, "tx": tx, "ts": now})

    if prev_ts <= 0 or now <= prev_ts:
        return None, None
    dt = now - prev_ts
    down = max(0.0, (rx - prev_rx) / dt / 1024)
    up = max(0.0, (tx - prev_tx) / dt / 1024)
    return round(up, 1), round(down, 1)


def _format_uptime(seconds: float) -> str:
    """Render an uptime as days+hours, hours+minutes, or minutes (negative input counts as 0)."""
    sec = max(0, int(seconds))
    days, rem = divmod(sec, 86400)
    hours, rem = divmod(rem, 3600)
    minutes = rem // 60
    if days > 0:
        return f"{days}天{hours}时"
    if hours > 0:
        return f"{hours}时{minutes}分"
    return f"{minutes}分"


def _read_uptime_seconds() -> Optional[float]:
    """Return the first field of ``/proc/uptime`` as a float, or None if unreadable."""
    try:
        with open("/proc/uptime", encoding="utf-8") as f:
            return float(f.read().split()[0])
    except (OSError, ValueError, IndexError):
        return None


def build_server_status(*, disk_path: Optional[str] = None) -> dict[str, Any]:
    """Collect a snapshot of the local server's status.

    Args:
        disk_path: Path whose filesystem is measured.  Defaults to the
            ``SERVER_DISK_PATH`` environment variable, then ``"/"``.

    Returns:
        A dict with hostname, public/private IP, CPU, memory, disk, network
        and uptime fields plus an ``updated_at`` timestamp formatted in the
        module's timezone.  Metrics that could not be read are None.
    """
    root = disk_path or os.getenv("SERVER_DISK_PATH") or "/"
    mem_pct, mem_used_gb, mem_total_gb = _read_mem_linux()
    disk_pct, disk_used_gb, disk_total_gb = _read_disk(root)
    net_up, net_down = _read_net_rates()
    uptime_sec = _read_uptime_seconds()
    hostname = (os.getenv("SERVER_HOSTNAME") or "").strip() or socket.gethostname()
    return {
        "hostname": hostname,
        "public_ip": get_public_ip(),
        "private_ip": get_private_ip(),
        "cpu_pct": _read_cpu_percent(),
        "cpu_cores": os.cpu_count(),
        "memory_pct": mem_pct,
        "memory_used_gb": mem_used_gb,
        "memory_total_gb": mem_total_gb,
        "disk_pct": disk_pct,
        "disk_used_gb": disk_used_gb,
        "disk_total_gb": disk_total_gb,
        "net_up_kbps": net_up,
        "net_down_kbps": net_down,
        "uptime_label": _format_uptime(uptime_sec) if uptime_sec is not None else None,
        "updated_at": datetime.now(_TZ).strftime("%Y-%m-%d %H:%M:%S"),
    }