Files
crypto_monitor/manual_trading_hub/hub_board_cache.py
T
2026-06-03 22:17:23 +08:00

165 lines
5.8 KiB
Python

"""监控区 board:后台定时聚合、内存快照、SSE 版本通知。"""
from __future__ import annotations
import asyncio
import json
import os
from collections.abc import AsyncIterator, Awaitable, Callable
from typing import Any
HUB_BOARD_POLL_INTERVAL = float(os.getenv("HUB_BOARD_POLL_INTERVAL", "5"))
HUB_BOARD_SSE_HEARTBEAT_SEC = float(os.getenv("HUB_BOARD_SSE_HEARTBEAT_SEC", "25"))
BuildFn = Callable[[], Awaitable[dict[str, Any]]]
class MonitorBoardStore:
def __init__(self) -> None:
self._lock = asyncio.Lock()
self.version = 0
self.payload: dict[str, Any] | None = None
self.aggregating = False
self.last_error: str | None = None
self._subscribers: list[asyncio.Queue[str | None]] = []
self._task: asyncio.Task | None = None
self._stop = asyncio.Event()
self._refresh = asyncio.Event()
self._build_fn: BuildFn | None = None
async def start(self, build_fn: BuildFn) -> None:
if self._task and not self._task.done():
return
self._build_fn = build_fn
self._stop.clear()
self._task = asyncio.create_task(self._loop(), name="hub-board-poll")
async def stop(self) -> None:
self._stop.set()
self._refresh.set()
if self._task:
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
self._task = None
self._broadcast(close=True)
def request_refresh(self) -> None:
self._refresh.set()
def snapshot_dict(self) -> dict[str, Any]:
p = self.payload or {}
rows = p.get("rows")
if not isinstance(rows, list):
rows = []
return {
"ok": p.get("ok", True) if self.payload else False,
"board_version": self.version,
"rows": rows,
"updated_at": p.get("updated_at"),
"aggregating": self.aggregating,
"error": self.last_error or p.get("error"),
"msg": p.get("msg"),
"poll_interval_sec": HUB_BOARD_POLL_INTERVAL,
}
def event_dict(self) -> dict[str, Any]:
p = self.payload or {}
return {
"board_version": self.version,
"updated_at": p.get("updated_at"),
"aggregating": self.aggregating,
"ok": p.get("ok", True) if self.payload else False,
"error": self.last_error or p.get("error"),
}
async def _loop(self) -> None:
assert self._build_fn is not None
while not self._stop.is_set():
await self._aggregate_once(self._build_fn)
if self._stop.is_set():
break
self._refresh.clear()
sleep_task = asyncio.create_task(asyncio.sleep(HUB_BOARD_POLL_INTERVAL))
refresh_task = asyncio.create_task(self._refresh.wait())
done, pending = await asyncio.wait(
{sleep_task, refresh_task},
return_when=asyncio.FIRST_COMPLETED,
)
for t in pending:
t.cancel()
async def _aggregate_once(self, build_fn: BuildFn) -> None:
async with self._lock:
self.aggregating = True
self._broadcast()
try:
result = await build_fn()
if not isinstance(result, dict):
result = {"ok": False, "msg": "聚合返回无效", "rows": []}
except Exception as e:
result = {"ok": False, "msg": str(e), "rows": [], "error": "aggregate_failed"}
async with self._lock:
self.version += 1
prev_rows = (self.payload or {}).get("rows") if isinstance(self.payload, dict) else None
if result.get("ok") is False and isinstance(prev_rows, list) and prev_rows:
result = {**result, "rows": prev_rows}
self.payload = result
self.last_error = None if result.get("ok") is not False else (
str(result.get("msg") or result.get("error") or "aggregate_failed")
)
self.aggregating = False
self._broadcast()
def _broadcast(self, *, close: bool = False) -> None:
dead: list[asyncio.Queue[str | None]] = []
for q in self._subscribers:
try:
q.put_nowait(None if close else json.dumps(self.event_dict(), ensure_ascii=False))
except asyncio.QueueFull:
try:
q.get_nowait()
except asyncio.QueueEmpty:
pass
try:
q.put_nowait(json.dumps(self.event_dict(), ensure_ascii=False))
except asyncio.QueueFull:
dead.append(q)
except Exception:
dead.append(q)
for q in dead:
if q in self._subscribers:
self._subscribers.remove(q)
async def iter_sse(self) -> AsyncIterator[str]:
q: asyncio.Queue[str | None] = asyncio.Queue(maxsize=32)
self._subscribers.append(q)
try:
yield _sse_frame(self.event_dict())
while True:
try:
raw = await asyncio.wait_for(q.get(), timeout=HUB_BOARD_SSE_HEARTBEAT_SEC)
except asyncio.TimeoutError:
yield ": heartbeat\n\n"
continue
if raw is None:
break
try:
data = json.loads(raw)
except Exception:
data = self.event_dict()
yield _sse_frame(data)
finally:
if q in self._subscribers:
self._subscribers.remove(q)
def _sse_frame(data: dict[str, Any]) -> str:
body = json.dumps(data, ensure_ascii=False)
return f"event: board\ndata: {body}\n\n"
board_store = MonitorBoardStore()