feat(hub): dashboard SSE push, light-theme cards, simplify AI coach
Replace dashboard polling with backend SSE and snapshot refresh. Restyle for light/dark theme with soft card glow instead of neon. Remove Today's Summary from AI page; keep trading and general chat only. Update hub documentation. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,169 @@
|
||||
"""数据看板:后台定时聚合、内存快照、SSE 版本通知。"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from collections.abc import AsyncIterator, Awaitable, Callable
|
||||
from typing import Any
|
||||
|
||||
from hub_dashboard import DASHBOARD_POLL_INTERVAL_SEC
|
||||
|
||||
HUB_DASHBOARD_SSE_HEARTBEAT_SEC = float(os.getenv("HUB_DASHBOARD_SSE_HEARTBEAT_SEC", "25"))
|
||||
|
||||
BuildFn = Callable[[], Awaitable[dict[str, Any]]]
|
||||
|
||||
|
||||
class DashboardStore:
|
||||
def __init__(self) -> None:
|
||||
self._lock = asyncio.Lock()
|
||||
self.version = 0
|
||||
self.payload: dict[str, Any] | None = None
|
||||
self.aggregating = False
|
||||
self.last_error: str | None = None
|
||||
self._subscribers: list[asyncio.Queue[str | None]] = []
|
||||
self._task: asyncio.Task | None = None
|
||||
self._stop = asyncio.Event()
|
||||
self._refresh = asyncio.Event()
|
||||
self._build_fn: BuildFn | None = None
|
||||
|
||||
async def start(self, build_fn: BuildFn) -> None:
|
||||
if self._task and not self._task.done():
|
||||
return
|
||||
self._build_fn = build_fn
|
||||
self._stop.clear()
|
||||
self._task = asyncio.create_task(self._loop(), name="hub-dashboard-poll")
|
||||
|
||||
async def stop(self) -> None:
|
||||
self._stop.set()
|
||||
self._refresh.set()
|
||||
if self._task:
|
||||
self._task.cancel()
|
||||
try:
|
||||
await self._task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
self._task = None
|
||||
self._broadcast(close=True)
|
||||
|
||||
def request_refresh(self) -> None:
|
||||
self._refresh.set()
|
||||
|
||||
def snapshot_dict(self) -> dict[str, Any]:
|
||||
p = dict(self.payload or {})
|
||||
if not p:
|
||||
return {
|
||||
"ok": False,
|
||||
"dashboard_version": self.version,
|
||||
"aggregating": self.aggregating,
|
||||
"error": self.last_error,
|
||||
"poll_interval_sec": DASHBOARD_POLL_INTERVAL_SEC,
|
||||
}
|
||||
return {
|
||||
**p,
|
||||
"dashboard_version": self.version,
|
||||
"aggregating": self.aggregating,
|
||||
"error": self.last_error or p.get("error"),
|
||||
"poll_interval_sec": DASHBOARD_POLL_INTERVAL_SEC,
|
||||
}
|
||||
|
||||
def event_dict(self) -> dict[str, Any]:
|
||||
p = self.payload or {}
|
||||
return {
|
||||
"dashboard_version": self.version,
|
||||
"updated_at": p.get("updated_at"),
|
||||
"aggregating": self.aggregating,
|
||||
"ok": p.get("ok", True) if self.payload else False,
|
||||
"error": self.last_error or p.get("error"),
|
||||
}
|
||||
|
||||
async def _loop(self) -> None:
|
||||
assert self._build_fn is not None
|
||||
while not self._stop.is_set():
|
||||
await self._aggregate_once(self._build_fn)
|
||||
if self._stop.is_set():
|
||||
break
|
||||
self._refresh.clear()
|
||||
sleep_task = asyncio.create_task(asyncio.sleep(DASHBOARD_POLL_INTERVAL_SEC))
|
||||
refresh_task = asyncio.create_task(self._refresh.wait())
|
||||
done, pending = await asyncio.wait(
|
||||
{sleep_task, refresh_task},
|
||||
return_when=asyncio.FIRST_COMPLETED,
|
||||
)
|
||||
for t in pending:
|
||||
t.cancel()
|
||||
|
||||
async def _aggregate_once(self, build_fn: BuildFn) -> None:
|
||||
async with self._lock:
|
||||
self.aggregating = True
|
||||
self._broadcast()
|
||||
try:
|
||||
result = await build_fn()
|
||||
if not isinstance(result, dict):
|
||||
result = {"ok": False, "msg": "聚合返回无效"}
|
||||
except Exception as e:
|
||||
result = {"ok": False, "msg": str(e), "error": "aggregate_failed"}
|
||||
async with self._lock:
|
||||
self.version += 1
|
||||
prev = self.payload if isinstance(self.payload, dict) else None
|
||||
if result.get("ok") is False and prev and prev.get("ok"):
|
||||
self.payload = prev
|
||||
self.last_error = str(result.get("msg") or result.get("error") or "aggregate_failed")
|
||||
else:
|
||||
self.payload = result
|
||||
self.last_error = None if result.get("ok") is not False else (
|
||||
str(result.get("msg") or result.get("error") or "aggregate_failed")
|
||||
)
|
||||
self.aggregating = False
|
||||
self._broadcast()
|
||||
|
||||
def _broadcast(self, *, close: bool = False) -> None:
|
||||
dead: list[asyncio.Queue[str | None]] = []
|
||||
for q in self._subscribers:
|
||||
try:
|
||||
q.put_nowait(None if close else json.dumps(self.event_dict(), ensure_ascii=False))
|
||||
except asyncio.QueueFull:
|
||||
try:
|
||||
q.get_nowait()
|
||||
except asyncio.QueueEmpty:
|
||||
pass
|
||||
try:
|
||||
q.put_nowait(json.dumps(self.event_dict(), ensure_ascii=False))
|
||||
except asyncio.QueueFull:
|
||||
dead.append(q)
|
||||
except Exception:
|
||||
dead.append(q)
|
||||
for q in dead:
|
||||
if q in self._subscribers:
|
||||
self._subscribers.remove(q)
|
||||
|
||||
async def iter_sse(self) -> AsyncIterator[str]:
|
||||
q: asyncio.Queue[str | None] = asyncio.Queue(maxsize=32)
|
||||
self._subscribers.append(q)
|
||||
try:
|
||||
yield _sse_frame(self.event_dict())
|
||||
while True:
|
||||
try:
|
||||
raw = await asyncio.wait_for(q.get(), timeout=HUB_DASHBOARD_SSE_HEARTBEAT_SEC)
|
||||
except asyncio.TimeoutError:
|
||||
yield ": heartbeat\n\n"
|
||||
continue
|
||||
if raw is None:
|
||||
break
|
||||
try:
|
||||
data = json.loads(raw)
|
||||
except Exception:
|
||||
data = self.event_dict()
|
||||
yield _sse_frame(data)
|
||||
finally:
|
||||
if q in self._subscribers:
|
||||
self._subscribers.remove(q)
|
||||
|
||||
|
||||
def _sse_frame(data: dict[str, Any]) -> str:
|
||||
body = json.dumps(data, ensure_ascii=False)
|
||||
return f"event: dashboard\ndata: {body}\n\n"
|
||||
|
||||
|
||||
dashboard_store = DashboardStore()
|
||||
Reference in New Issue
Block a user