diff --git a/manual_trading_hub/.env.example b/manual_trading_hub/.env.example index 8b4095f..3eb9076 100644 --- a/manual_trading_hub/.env.example +++ b/manual_trading_hub/.env.example @@ -56,7 +56,9 @@ HUB_TRUST_LAN=true # 四实例网页登录(直链反代/IP:端口 访问时输入;中控点「打开实例」免输) # 各 crypto_monitor_*/.env 统一:APP_USERNAME=... APP_PASSWORD=... -# 监控区 /api/monitor/board 聚合超时(秒,默认 agent 8 / flask 10 / board 45) +# 监控区:hub 后台每 N 秒聚合一次,浏览器经 SSE 收版本号再拉快照(默认 5 秒) +# HUB_BOARD_POLL_INTERVAL=5 +# 单次聚合超时(秒,默认 agent 8 / flask 10 / board 45) # HUB_AGENT_TIMEOUT=8 # HUB_FLASK_TIMEOUT=10 # HUB_BOARD_TIMEOUT=45 diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 26559ef..63f7df1 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -7,6 +7,7 @@ from __future__ import annotations import asyncio import os import sys +from contextlib import asynccontextmanager from pathlib import Path _REPO_ROOT = Path(__file__).resolve().parent.parent @@ -51,6 +52,8 @@ from hub_sso import HUB_SSO_TTL_SEC, mint_hub_sso_token, safe_next_path from url_public import browser_url, default_review_url, public_origin from urllib.parse import urlencode +from hub_board_cache import HUB_BOARD_POLL_INTERVAL, board_store + try: from exchange_orders import symbols_match as _symbols_match except ImportError: @@ -69,7 +72,7 @@ _allow_pub_raw = (os.getenv("HUB_ALLOW_PUBLIC") or "").strip().lower() # 云服务器 + 域名反代时设为 true:不做 IP 限制,仅靠 HUB_PASSWORD / 登录页保护 HUB_ALLOW_PUBLIC = _allow_pub_raw in ("1", "true", "yes", "on") DIR = Path(__file__).resolve().parent -HUB_BUILD = "20260528-hub-market" +HUB_BUILD = "20260603-hub-board-sse" HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8")) HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10")) HUB_BOARD_TIMEOUT = float(os.getenv("HUB_BOARD_TIMEOUT", "45")) @@ -133,7 +136,37 @@ def _find_exchange(ex_id: str) -> dict | None: return None -app = FastAPI(title="复盘系统中控", docs_url=None, redoc_url=None) +async def _run_board_aggregate() -> dict: + try: + body = await asyncio.wait_for(_build_monitor_board_payload(), timeout=HUB_BOARD_TIMEOUT) + return {"ok": True, **body} + except asyncio.TimeoutError: + return { + "ok": False, + "rows": [], + "error": "board_timeout", + "msg": ( + f"监控聚合超过 {int(HUB_BOARD_TIMEOUT)} 秒。" + "请检查子代理/Flask,或设 HUB_BOARD_KEY_PRICES=false、缩短 HUB_FLASK_TIMEOUT" + ), + "updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"), + } + + +def _schedule_board_refresh() -> None: + board_store.request_refresh() + + +@asynccontextmanager +async def _hub_lifespan(_app: FastAPI): + await board_store.start(_run_board_aggregate) + try: + yield + finally: + await board_store.stop() + + +app = FastAPI(title="复盘系统中控", docs_url=None, redoc_url=None, lifespan=_hub_lifespan) STATIC_DIR = DIR / "static" if STATIC_DIR.is_dir(): app.mount("/assets", StaticFiles(directory=str(STATIC_DIR)), name="assets") @@ -768,25 +801,31 @@ async def _build_monitor_board_payload() -> dict: @app.get("/api/monitor/board") -async def api_monitor_board(): - try: - return await asyncio.wait_for(_build_monitor_board_payload(), timeout=HUB_BOARD_TIMEOUT) - except asyncio.TimeoutError: - return JSONResponse( - { - "ok": False, - "rows": [], - "error": "board_timeout", - "msg": ( - f"监控聚合超过 {int(HUB_BOARD_TIMEOUT)} 秒。" - "请检查子代理/Flask,或设 HUB_BOARD_KEY_PRICES=false、缩短 HUB_FLASK_TIMEOUT" - ), - "updated_at": __import__("datetime").datetime.now().isoformat( - timespec="seconds" - ), - }, - status_code=504, - ) +@app.get("/api/monitor/board/snapshot") +async def api_monitor_board_snapshot(): + """读后台缓存快照;完整聚合由 hub 每 HUB_BOARD_POLL_INTERVAL 秒执行。""" + return board_store.snapshot_dict() + + +@app.get("/api/monitor/board/stream") +async def api_monitor_board_stream(): + from fastapi.responses import StreamingResponse + + return StreamingResponse( + board_store.iter_sse(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +@app.post("/api/monitor/board/refresh") +async def api_monitor_board_refresh(): + _schedule_board_refresh() + return {"ok": True, "board_version": board_store.version} def _require_hub_logged_in(request: Request) -> None: @@ -877,12 +916,14 @@ async def api_cancel_order(exchange_id: str, body: CancelOrderBody): payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} - return { + out = { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } + _schedule_board_refresh() + return out @app.post("/api/orders/{exchange_id}/cancel-symbol") @@ -902,12 +943,14 @@ async def api_cancel_symbol_orders(exchange_id: str, body: CancelSymbolOrdersBod payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} - return { + out = { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } + _schedule_board_refresh() + return out @app.post("/api/close/{exchange_id}/position") @@ -933,12 +976,14 @@ async def api_close_position(exchange_id: str, body: ClosePositionBody): payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} - return { + out = { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } + _schedule_board_refresh() + return out @app.post("/api/orders/{exchange_id}/place-tpsl") @@ -964,12 +1009,14 @@ async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody): payload = r.json() except Exception: payload = {"raw": (r.text or "")[:2000]} - return { + out = { "exchange": ex, "status_code": r.status_code, "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } + _schedule_board_refresh() + return out @app.post("/api/close/{exchange_id}") @@ -984,7 +1031,9 @@ async def api_close_exchange(exchange_id: str): body = r.json() except Exception: body = {"raw": (r.text or "")[:2000]} - return {"exchange": ex, "status_code": r.status_code, "payload": body} + out = {"exchange": ex, "status_code": r.status_code, "payload": body} + _schedule_board_refresh() + return out @app.post("/api/close-all") @@ -1007,6 +1056,7 @@ async def api_close_all(body: CloseAllBody | None = Body(default=None)): return {"id": ex["id"], "name": ex["name"], "status_code": None, "error": str(e)} results = await asyncio.gather(*[one(ex) for ex in targets]) + _schedule_board_refresh() return {"results": list(results)} @@ -1035,7 +1085,9 @@ def api_ping(): "service": "manual-trading-hub", "build": HUB_BUILD, "trade_ui": False, - "features": ["monitor", "settings", "auth"], + "features": ["monitor", "settings", "auth", "board_sse"], + "board_poll_interval_sec": HUB_BOARD_POLL_INTERVAL, + "board_version": board_store.version, "password_required": password_required(), "env_disabled_ids": sorted(env_force_disabled_ids()), "hub_disabled_ids_raw": (os.getenv("HUB_DISABLED_IDS") or ""), diff --git a/manual_trading_hub/hub_board_cache.py b/manual_trading_hub/hub_board_cache.py new file mode 100644 index 0000000..65cdfb6 --- /dev/null +++ b/manual_trading_hub/hub_board_cache.py @@ -0,0 +1,164 @@ +"""监控区 board:后台定时聚合、内存快照、SSE 版本通知。""" + +from __future__ import annotations + +import asyncio +import json +import os +from collections.abc import AsyncIterator, Awaitable, Callable +from typing import Any + +HUB_BOARD_POLL_INTERVAL = float(os.getenv("HUB_BOARD_POLL_INTERVAL", "5")) +HUB_BOARD_SSE_HEARTBEAT_SEC = float(os.getenv("HUB_BOARD_SSE_HEARTBEAT_SEC", "25")) + +BuildFn = Callable[[], Awaitable[dict[str, Any]]] + + +class MonitorBoardStore: + def __init__(self) -> None: + self._lock = asyncio.Lock() + self.version = 0 + self.payload: dict[str, Any] | None = None + self.aggregating = False + self.last_error: str | None = None + self._subscribers: list[asyncio.Queue[str | None]] = [] + self._task: asyncio.Task | None = None + self._stop = asyncio.Event() + self._refresh = asyncio.Event() + self._build_fn: BuildFn | None = None + + async def start(self, build_fn: BuildFn) -> None: + if self._task and not self._task.done(): + return + self._build_fn = build_fn + self._stop.clear() + self._task = asyncio.create_task(self._loop(), name="hub-board-poll") + + async def stop(self) -> None: + self._stop.set() + self._refresh.set() + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + self._broadcast(close=True) + + def request_refresh(self) -> None: + self._refresh.set() + + def snapshot_dict(self) -> dict[str, Any]: + p = self.payload or {} + rows = p.get("rows") + if not isinstance(rows, list): + rows = [] + return { + "ok": p.get("ok", True) if self.payload else False, + "board_version": self.version, + "rows": rows, + "updated_at": p.get("updated_at"), + "aggregating": self.aggregating, + "error": self.last_error or p.get("error"), + "msg": p.get("msg"), + "poll_interval_sec": HUB_BOARD_POLL_INTERVAL, + } + + def event_dict(self) -> dict[str, Any]: + p = self.payload or {} + return { + "board_version": self.version, + "updated_at": p.get("updated_at"), + "aggregating": self.aggregating, + "ok": p.get("ok", True) if self.payload else False, + "error": self.last_error or p.get("error"), + } + + async def _loop(self) -> None: + assert self._build_fn is not None + while not self._stop.is_set(): + await self._aggregate_once(self._build_fn) + if self._stop.is_set(): + break + self._refresh.clear() + sleep_task = asyncio.create_task(asyncio.sleep(HUB_BOARD_POLL_INTERVAL)) + refresh_task = asyncio.create_task(self._refresh.wait()) + done, pending = await asyncio.wait( + {sleep_task, refresh_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + for t in pending: + t.cancel() + + async def _aggregate_once(self, build_fn: BuildFn) -> None: + async with self._lock: + self.aggregating = True + self._broadcast() + try: + result = await build_fn() + if not isinstance(result, dict): + result = {"ok": False, "msg": "聚合返回无效", "rows": []} + except Exception as e: + result = {"ok": False, "msg": str(e), "rows": [], "error": "aggregate_failed"} + async with self._lock: + self.version += 1 + prev_rows = (self.payload or {}).get("rows") if isinstance(self.payload, dict) else None + if result.get("ok") is False and isinstance(prev_rows, list) and prev_rows: + result = {**result, "rows": prev_rows} + self.payload = result + self.last_error = None if result.get("ok") is not False else ( + str(result.get("msg") or result.get("error") or "aggregate_failed") + ) + self.aggregating = False + self._broadcast() + + def _broadcast(self, *, close: bool = False) -> None: + dead: list[asyncio.Queue[str | None]] = [] + for q in self._subscribers: + try: + q.put_nowait(None if close else json.dumps(self.event_dict(), ensure_ascii=False)) + except asyncio.QueueFull: + try: + q.get_nowait() + except asyncio.QueueEmpty: + pass + try: + q.put_nowait(json.dumps(self.event_dict(), ensure_ascii=False)) + except asyncio.QueueFull: + dead.append(q) + except Exception: + dead.append(q) + for q in dead: + if q in self._subscribers: + self._subscribers.remove(q) + + async def iter_sse(self) -> AsyncIterator[str]: + q: asyncio.Queue[str | None] = asyncio.Queue(maxsize=32) + self._subscribers.append(q) + try: + yield _sse_frame(self.event_dict()) + while True: + try: + raw = await asyncio.wait_for(q.get(), timeout=HUB_BOARD_SSE_HEARTBEAT_SEC) + except asyncio.TimeoutError: + yield ": heartbeat\n\n" + continue + if raw is None: + break + try: + data = json.loads(raw) + except Exception: + data = self.event_dict() + yield _sse_frame(data) + finally: + if q in self._subscribers: + self._subscribers.remove(q) + + +def _sse_frame(data: dict[str, Any]) -> str: + body = json.dumps(data, ensure_ascii=False) + return f"event: board\ndata: {body}\n\n" + + +board_store = MonitorBoardStore() diff --git a/manual_trading_hub/static/app.js b/manual_trading_hub/static/app.js index b053e35..e7f425a 100644 --- a/manual_trading_hub/static/app.js +++ b/manual_trading_hub/static/app.js @@ -1,17 +1,20 @@ (function () { const toast = document.getElementById("toast"); let settingsCache = null; - let monitorTimer = null; let authState = { required: false, logged_in: true }; let tpslPending = null; let lastMonitorRows = []; let expandedExchangeId = sessionStorage.getItem("hub_expanded_ex") || ""; const HUB_MONITOR_BOARD_CACHE_KEY = "hub_monitor_board_v1"; const HUB_MONITOR_CACHE_MAX_AGE_MS = 6 * 60 * 60 * 1000; - const HUB_MONITOR_FETCH_TIMEOUT_MS = 55000; + const MONITOR_BOARD_SNAPSHOT_URL = "/api/monitor/board/snapshot"; + const HUB_MONITOR_SNAPSHOT_TIMEOUT_MS = 15000; let lastMonitorBoardUpdatedAt = ""; + let localBoardVersion = 0; let monitorBoardInFlight = false; let monitorBoardSlowHintTimer = null; + let boardEventSource = null; + let sseReconnectTimer = null; async function apiFetch(url, opts) { const r = await fetch(url, opts); @@ -346,9 +349,52 @@ } function stopMonitorPoll() { - clearTimeout(monitorTimer); - clearInterval(monitorTimer); - monitorTimer = null; + closeMonitorBoardStream(); + if (sseReconnectTimer) { + clearTimeout(sseReconnectTimer); + sseReconnectTimer = null; + } + } + + function closeMonitorBoardStream() { + if (boardEventSource) { + boardEventSource.close(); + boardEventSource = null; + } + } + + function connectMonitorBoardStream() { + closeMonitorBoardStream(); + if (!document.getElementById("auto-monitor")?.checked) return; + if (currentPage() !== "monitor") return; + boardEventSource = new EventSource("/api/monitor/board/stream"); + boardEventSource.addEventListener("board", (ev) => { + try { + const st = JSON.parse(ev.data || "{}"); + const ver = Number(st.board_version) || 0; + if (ver > localBoardVersion) { + void fetchMonitorBoardSnapshot({ background: true }); + } else if (st.aggregating && lastMonitorRows.length) { + applyMonitorBoardUi(lastMonitorRows, st.updated_at || lastMonitorBoardUpdatedAt, { + stale: true, + }); + } + } catch (_) {} + }); + boardEventSource.onerror = () => { + closeMonitorBoardStream(); + if (sseReconnectTimer) clearTimeout(sseReconnectTimer); + sseReconnectTimer = setTimeout(() => { + if (currentPage() === "monitor" && document.getElementById("auto-monitor")?.checked) { + connectMonitorBoardStream(); + void fetchMonitorBoardSnapshot({ background: true }); + } + }, 8000); + }; + } + + async function requestMonitorBoardRefresh() { + await apiFetch("/api/monitor/board/refresh", { method: "POST" }); } function clearMonitorBoardSlowHint() { @@ -368,17 +414,18 @@ const sub = el.querySelector(".board-loading-sub"); if (sub) { sub.textContent = - "聚合较慢(四所子代理 + Flask)。可检查 PM2、或设 HUB_BOARD_KEY_PRICES=false 加速;下方超时后会提示错误。"; + "后台首次聚合较慢(四所子代理 + Flask)。可检查 PM2、或设 HUB_BOARD_KEY_PRICES=false 加速。"; } }, 12000); } - function saveMonitorBoardCache(rows, updatedAt) { + function saveMonitorBoardCache(rows, updatedAt, boardVersion) { try { sessionStorage.setItem( HUB_MONITOR_BOARD_CACHE_KEY, JSON.stringify({ version: 1, + board_version: boardVersion != null ? boardVersion : localBoardVersion, updated_at: updatedAt || "", rows: rows || [], saved_at: Date.now(), @@ -409,6 +456,7 @@ if (!cached) return false; lastMonitorRows = cached.rows; lastMonitorBoardUpdatedAt = cached.updated_at || ""; + localBoardVersion = Number(cached.board_version) || 0; applyMonitorBoardUi(cached.rows, lastMonitorBoardUpdatedAt, { stale: true }); return true; } @@ -430,8 +478,8 @@ const ts = tsRaw.replace("T", " "); upd.textContent = options.stale ? ts - ? `缓存 ${ts} · 刷新中…` - : "刷新中…" + ? `缓存 ${ts} · 后台聚合中…` + : "后台聚合中…" : ts ? `UPD ${ts}` : ""; @@ -439,22 +487,10 @@ renderMonitorGrid(rows || []); } - function scheduleNextMonitorPoll() { - stopMonitorPoll(); - if (!document.getElementById("auto-monitor")?.checked) return; - if (currentPage() !== "monitor") return; - monitorTimer = setTimeout(async () => { - await loadMonitorBoard({ background: true }); - scheduleNextMonitorPoll(); - }, 5000); - } - function startMonitorPoll() { - stopMonitorPoll(); const hadCache = restoreMonitorBoardFromCache(); - void loadMonitorBoard({ background: hadCache }).finally(() => { - scheduleNextMonitorPoll(); - }); + void fetchMonitorBoardSnapshot({ showLoading: !hadCache }); + connectMonitorBoardStream(); } async function loadSettings() { @@ -607,39 +643,58 @@ return `已保本`; } - async function loadMonitorBoard(opts) { + async function fetchMonitorBoardSnapshot(opts) { const options = opts || {}; const background = !!options.background; - const force = !!options.force; - if (monitorBoardInFlight && background && !force) return; + const showLoading = !!options.showLoading && !lastMonitorRows.length; const box = document.getElementById("monitor-grid"); - const showLoading = !background && !lastMonitorRows.length; + if (monitorBoardInFlight && background) return; if (showLoading && box) { box.innerHTML = - '