feat(hub): background board poll every 5s with SSE snapshot updates

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-03 22:17:23 +08:00
parent 6a76993ca8
commit 5b6babd699
6 changed files with 409 additions and 76 deletions
+79 -27
View File
@@ -7,6 +7,7 @@ from __future__ import annotations
import asyncio
import os
import sys
from contextlib import asynccontextmanager
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parent.parent
@@ -51,6 +52,8 @@ from hub_sso import HUB_SSO_TTL_SEC, mint_hub_sso_token, safe_next_path
from url_public import browser_url, default_review_url, public_origin
from urllib.parse import urlencode
from hub_board_cache import HUB_BOARD_POLL_INTERVAL, board_store
try:
from exchange_orders import symbols_match as _symbols_match
except ImportError:
@@ -69,7 +72,7 @@ _allow_pub_raw = (os.getenv("HUB_ALLOW_PUBLIC") or "").strip().lower()
# 云服务器 + 域名反代时设为 true:不做 IP 限制,仅靠 HUB_PASSWORD / 登录页保护
HUB_ALLOW_PUBLIC = _allow_pub_raw in ("1", "true", "yes", "on")
DIR = Path(__file__).resolve().parent
HUB_BUILD = "20260528-hub-market"
HUB_BUILD = "20260603-hub-board-sse"
HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8"))
HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10"))
HUB_BOARD_TIMEOUT = float(os.getenv("HUB_BOARD_TIMEOUT", "45"))
@@ -133,7 +136,37 @@ def _find_exchange(ex_id: str) -> dict | None:
return None
app = FastAPI(title="复盘系统中控", docs_url=None, redoc_url=None)
async def _run_board_aggregate() -> dict:
try:
body = await asyncio.wait_for(_build_monitor_board_payload(), timeout=HUB_BOARD_TIMEOUT)
return {"ok": True, **body}
except asyncio.TimeoutError:
return {
"ok": False,
"rows": [],
"error": "board_timeout",
"msg": (
f"监控聚合超过 {int(HUB_BOARD_TIMEOUT)} 秒。"
"请检查子代理/Flask,或设 HUB_BOARD_KEY_PRICES=false、缩短 HUB_FLASK_TIMEOUT"
),
"updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"),
}
def _schedule_board_refresh() -> None:
board_store.request_refresh()
@asynccontextmanager
async def _hub_lifespan(_app: FastAPI):
await board_store.start(_run_board_aggregate)
try:
yield
finally:
await board_store.stop()
app = FastAPI(title="复盘系统中控", docs_url=None, redoc_url=None, lifespan=_hub_lifespan)
STATIC_DIR = DIR / "static"
if STATIC_DIR.is_dir():
app.mount("/assets", StaticFiles(directory=str(STATIC_DIR)), name="assets")
@@ -768,25 +801,31 @@ async def _build_monitor_board_payload() -> dict:
@app.get("/api/monitor/board")
async def api_monitor_board():
try:
return await asyncio.wait_for(_build_monitor_board_payload(), timeout=HUB_BOARD_TIMEOUT)
except asyncio.TimeoutError:
return JSONResponse(
{
"ok": False,
"rows": [],
"error": "board_timeout",
"msg": (
f"监控聚合超过 {int(HUB_BOARD_TIMEOUT)} 秒。"
"请检查子代理/Flask,或设 HUB_BOARD_KEY_PRICES=false、缩短 HUB_FLASK_TIMEOUT"
),
"updated_at": __import__("datetime").datetime.now().isoformat(
timespec="seconds"
),
},
status_code=504,
)
@app.get("/api/monitor/board/snapshot")
async def api_monitor_board_snapshot():
"""读后台缓存快照;完整聚合由 hub 每 HUB_BOARD_POLL_INTERVAL 秒执行。"""
return board_store.snapshot_dict()
@app.get("/api/monitor/board/stream")
async def api_monitor_board_stream():
from fastapi.responses import StreamingResponse
return StreamingResponse(
board_store.iter_sse(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.post("/api/monitor/board/refresh")
async def api_monitor_board_refresh():
_schedule_board_refresh()
return {"ok": True, "board_version": board_store.version}
def _require_hub_logged_in(request: Request) -> None:
@@ -877,12 +916,14 @@ async def api_cancel_order(exchange_id: str, body: CancelOrderBody):
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/orders/{exchange_id}/cancel-symbol")
@@ -902,12 +943,14 @@ async def api_cancel_symbol_orders(exchange_id: str, body: CancelSymbolOrdersBod
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/close/{exchange_id}/position")
@@ -933,12 +976,14 @@ async def api_close_position(exchange_id: str, body: ClosePositionBody):
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/orders/{exchange_id}/place-tpsl")
@@ -964,12 +1009,14 @@ async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody):
payload = r.json()
except Exception:
payload = {"raw": (r.text or "")[:2000]}
return {
out = {
"exchange": ex,
"status_code": r.status_code,
"payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")),
}
_schedule_board_refresh()
return out
@app.post("/api/close/{exchange_id}")
@@ -984,7 +1031,9 @@ async def api_close_exchange(exchange_id: str):
body = r.json()
except Exception:
body = {"raw": (r.text or "")[:2000]}
return {"exchange": ex, "status_code": r.status_code, "payload": body}
out = {"exchange": ex, "status_code": r.status_code, "payload": body}
_schedule_board_refresh()
return out
@app.post("/api/close-all")
@@ -1007,6 +1056,7 @@ async def api_close_all(body: CloseAllBody | None = Body(default=None)):
return {"id": ex["id"], "name": ex["name"], "status_code": None, "error": str(e)}
results = await asyncio.gather(*[one(ex) for ex in targets])
_schedule_board_refresh()
return {"results": list(results)}
@@ -1035,7 +1085,9 @@ def api_ping():
"service": "manual-trading-hub",
"build": HUB_BUILD,
"trade_ui": False,
"features": ["monitor", "settings", "auth"],
"features": ["monitor", "settings", "auth", "board_sse"],
"board_poll_interval_sec": HUB_BOARD_POLL_INTERVAL,
"board_version": board_store.version,
"password_required": password_required(),
"env_disabled_ids": sorted(env_force_disabled_ids()),
"hub_disabled_ids_raw": (os.getenv("HUB_DISABLED_IDS") or ""),