"""数据看板:后台定时聚合、内存快照、SSE 版本通知。""" from __future__ import annotations import asyncio import json import os from collections.abc import AsyncIterator, Awaitable, Callable from typing import Any from hub_dashboard import DASHBOARD_POLL_INTERVAL_SEC HUB_DASHBOARD_SSE_HEARTBEAT_SEC = float(os.getenv("HUB_DASHBOARD_SSE_HEARTBEAT_SEC", "25")) BuildFn = Callable[[], Awaitable[dict[str, Any]]] class DashboardStore: def __init__(self) -> None: self._lock = asyncio.Lock() self.version = 0 self.payload: dict[str, Any] | None = None self.aggregating = False self.last_error: str | None = None self._subscribers: list[asyncio.Queue[str | None]] = [] self._task: asyncio.Task | None = None self._stop = asyncio.Event() self._refresh = asyncio.Event() self._build_fn: BuildFn | None = None async def start(self, build_fn: BuildFn) -> None: if self._task and not self._task.done(): return self._build_fn = build_fn self._stop.clear() self._task = asyncio.create_task(self._loop(), name="hub-dashboard-poll") async def stop(self) -> None: self._stop.set() self._refresh.set() if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass self._task = None self._broadcast(close=True) def request_refresh(self) -> None: self._refresh.set() def snapshot_dict(self) -> dict[str, Any]: p = dict(self.payload or {}) if not p: return { "ok": False, "dashboard_version": self.version, "aggregating": self.aggregating, "error": self.last_error, "poll_interval_sec": DASHBOARD_POLL_INTERVAL_SEC, } return { **p, "dashboard_version": self.version, "aggregating": self.aggregating, "error": self.last_error or p.get("error"), "poll_interval_sec": DASHBOARD_POLL_INTERVAL_SEC, } def event_dict(self) -> dict[str, Any]: p = self.payload or {} return { "dashboard_version": self.version, "updated_at": p.get("updated_at"), "aggregating": self.aggregating, "ok": p.get("ok", True) if self.payload else False, "error": self.last_error or p.get("error"), } async def _loop(self) -> None: assert self._build_fn is not None while not self._stop.is_set(): await self._aggregate_once(self._build_fn) if self._stop.is_set(): break self._refresh.clear() sleep_task = asyncio.create_task(asyncio.sleep(DASHBOARD_POLL_INTERVAL_SEC)) refresh_task = asyncio.create_task(self._refresh.wait()) done, pending = await asyncio.wait( {sleep_task, refresh_task}, return_when=asyncio.FIRST_COMPLETED, ) for t in pending: t.cancel() async def _aggregate_once(self, build_fn: BuildFn) -> None: async with self._lock: self.aggregating = True self._broadcast() try: result = await build_fn() if not isinstance(result, dict): result = {"ok": False, "msg": "聚合返回无效"} except Exception as e: result = {"ok": False, "msg": str(e), "error": "aggregate_failed"} async with self._lock: self.version += 1 prev = self.payload if isinstance(self.payload, dict) else None if result.get("ok") is False and prev and prev.get("ok"): self.payload = prev self.last_error = str(result.get("msg") or result.get("error") or "aggregate_failed") else: self.payload = result self.last_error = None if result.get("ok") is not False else ( str(result.get("msg") or result.get("error") or "aggregate_failed") ) self.aggregating = False self._broadcast() def _broadcast(self, *, close: bool = False) -> None: dead: list[asyncio.Queue[str | None]] = [] for q in self._subscribers: try: q.put_nowait(None if close else json.dumps(self.event_dict(), ensure_ascii=False)) except asyncio.QueueFull: try: q.get_nowait() except asyncio.QueueEmpty: pass try: q.put_nowait(json.dumps(self.event_dict(), ensure_ascii=False)) except asyncio.QueueFull: dead.append(q) except Exception: dead.append(q) for q in dead: if q in self._subscribers: self._subscribers.remove(q) async def iter_sse(self) -> AsyncIterator[str]: q: asyncio.Queue[str | None] = asyncio.Queue(maxsize=32) self._subscribers.append(q) try: yield _sse_frame(self.event_dict()) while True: try: raw = await asyncio.wait_for(q.get(), timeout=HUB_DASHBOARD_SSE_HEARTBEAT_SEC) except asyncio.TimeoutError: yield ": heartbeat\n\n" continue if raw is None: break try: data = json.loads(raw) except Exception: data = self.event_dict() yield _sse_frame(data) finally: if q in self._subscribers: self._subscribers.remove(q) def _sse_frame(data: dict[str, Any]) -> str: body = json.dumps(data, ensure_ascii=False) return f"event: dashboard\ndata: {body}\n\n" dashboard_store = DashboardStore()