diff --git a/hub_kline_store.py b/hub_kline_store.py index 5dde267..9413894 100644 --- a/hub_kline_store.py +++ b/hub_kline_store.py @@ -286,10 +286,11 @@ def resolve_chart_bars( *, db_path: Path | None = None, force_refresh: bool = False, + tail_refresh: bool = False, ) -> dict[str, Any]: """ 按需:先读库,不足则 remote_fetch(symbol, timeframe, since_ms, limit) 补齐并写库。 - 无后台定时任务;每次调用时顺带 purge 15 天前数据。 + tail_refresh=True 时即使库内「够新」也增量拉取尾部 K 线(未收盘 K 的 OHLC 更新)。 """ init_db(db_path) purged = purge_retention(db_path) @@ -317,6 +318,10 @@ def resolve_chart_bars( period_ms = TIMEFRAME_MS[tf] newest_ok = newest_db is not None and int(newest_db) >= int(last_closed) - period_ms need_fetch = force_refresh or len(db_rows) < need or not newest_ok + tail_only = False + if tail_refresh and db_rows and not force_refresh and not need_fetch: + need_fetch = True + tail_only = True fetched = 0 price_tick: Optional[float] = None @@ -324,8 +329,10 @@ def resolve_chart_bars( if need_fetch: since = fetch_start_ms + if tail_only and newest_db is not None: + since = max(0, int(newest_db) - period_ms * 3) # 仅当库内根数已够且缺口在尾部时做增量拉取;否则全量回看,避免 Gate from>to - if ( + elif ( db_rows and not force_refresh and newest_ok diff --git a/manual_trading_hub/.env.example b/manual_trading_hub/.env.example index 3eb9076..772c023 100644 --- a/manual_trading_hub/.env.example +++ b/manual_trading_hub/.env.example @@ -68,6 +68,11 @@ HUB_TRUST_LAN=true # ---------- 行情区 K 线库(data/hub_kline.db,默认保留 15 天)---------- # HUB_KLINE_RETENTION_DAYS=15 # HUB_KLINE_DB_PATH=/opt/crypto_monitor/manual_trading_hub/data/hub_kline.db +# 行情区后台轮询 + SSE(对齐监控区 board) +# HUB_CHART_POLL_INTERVAL=5 +# HUB_CHART_POSITION_TIMEFRAME=5m +# HUB_CHART_WATCH_TTL_SEC=45 +# HUB_CHART_MAX_SERIES_PER_TICK=24 # --- 子代理 agent.py(在 crypto_monitor_* 目录启动时另设 EXCHANGE / PORT)--- # 与 HUB_BRIDGE_TOKEN 一致时可只设其一;agent 校验请求头 X-Control-Token diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 4476d21..25a6681 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -52,6 +52,12 @@ from url_public import browser_url, default_review_url, public_origin from urllib.parse import urlencode from hub_board_cache import HUB_BOARD_POLL_INTERVAL, board_store +from hub_chart_cache import ( + HUB_CHART_POLL_INTERVAL, + HUB_CHART_WATCH_TTL_SEC, + chart_poll_store, + parse_series_key, +) try: from exchange_orders import symbols_match as _symbols_match @@ -135,6 +141,65 @@ def _find_exchange(ex_id: str) -> dict | None: return None +async def _run_chart_poll() -> dict: + keys = chart_poll_store.active_series_keys() + if not keys: + return {"ok": True, "series_count": 0, "polled": 0} + polled = 0 + errors: list[str] = [] + for key in keys: + parsed = parse_series_key(key) + if not parsed: + continue + ex_k, sym, tf = parsed + ex = _find_exchange_by_key(ex_k) + if not ex or not ex.get("enabled"): + continue + + ex_ref = ex + sym_ref = sym + tf_ref = tf + + def remote_fetch(**kwargs) -> dict: + tf_use = kwargs.get("timeframe") or tf_ref + return _fetch_instance_ohlcv_sync( + ex_ref, + symbol=kwargs.get("symbol") or sym_ref, + timeframe=tf_use, + since_ms=kwargs.get("since_ms"), + limit=int(kwargs.get("limit") or bar_limit_for_timeframe(tf_use)), + ) + + try: + result = await asyncio.to_thread( + resolve_chart_bars, + ex_k, + sym, + tf, + remote_fetch, + force_refresh=False, + tail_refresh=True, + ) + polled += 1 + chart_poll_store.note_series_result( + ex_k, + sym, + tf, + ok=bool(result.get("ok")), + fetched=int(result.get("fetched") or 0), + error=None if result.get("ok") else str(result.get("msg") or "poll_failed"), + ) + if not result.get("ok"): + errors.append(f"{key}:{result.get('msg')}") + except Exception as e: + chart_poll_store.note_series_result(ex_k, sym, tf, ok=False, error=str(e)) + errors.append(f"{key}:{e}") + out: dict = {"ok": True, "series_count": len(keys), "polled": polled} + if errors: + out["errors"] = errors[:8] + return out + + async def _run_board_aggregate() -> dict: try: body = await asyncio.wait_for(_build_monitor_board_payload(), timeout=HUB_BOARD_TIMEOUT) @@ -159,9 +224,11 @@ def _schedule_board_refresh() -> None: @asynccontextmanager async def _hub_lifespan(_app: FastAPI): await board_store.start(_run_board_aggregate) + await chart_poll_store.start(_run_chart_poll) try: yield finally: + await chart_poll_store.stop() await board_store.stop() @@ -442,9 +509,64 @@ def api_chart_ohlcv( else None, tick, ) + result["chart_version"] = chart_poll_store.version + result["series_version"] = chart_poll_store.series_version(ex_key, sym, timeframe) + result["chart_poll_interval_sec"] = HUB_CHART_POLL_INTERVAL return result +class ChartWatchBody(BaseModel): + exchange_key: str = "" + symbol: str = "" + timeframe: str = "5m" + + +@app.post("/api/chart/watch") +async def api_chart_watch(body: ChartWatchBody = Body(...)): + ex_k = (body.exchange_key or "").strip().lower() + sym = (body.symbol or "").strip().upper() + tf = (body.timeframe or "5m").strip() + if not ex_k or not sym: + raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol") + if tf not in CHART_TIMEFRAMES: + raise HTTPException(status_code=400, detail="不支持的周期") + key = chart_poll_store.touch_watch(ex_k, sym, tf) + chart_poll_store.request_refresh() + return { + "ok": True, + "series_key": key, + "series_version": chart_poll_store.series_version(ex_k, sym, tf), + "chart_version": chart_poll_store.version, + "watch_ttl_sec": HUB_CHART_WATCH_TTL_SEC, + } + + +@app.post("/api/chart/unwatch") +async def api_chart_unwatch(body: ChartWatchBody = Body(...)): + chart_poll_store.clear_watch(body.exchange_key, body.symbol, body.timeframe) + return {"ok": True} + + +@app.get("/api/chart/stream") +async def api_chart_stream(): + from fastapi.responses import StreamingResponse + + return StreamingResponse( + chart_poll_store.iter_sse(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + +@app.get("/api/chart/poll/meta") +async def api_chart_poll_meta(): + return chart_poll_store.event_dict() + + @app.get("/api/settings/meta") def api_settings_meta(): po = public_origin() diff --git a/manual_trading_hub/hub_chart_cache.py b/manual_trading_hub/hub_chart_cache.py new file mode 100644 index 0000000..0e642bc --- /dev/null +++ b/manual_trading_hub/hub_chart_cache.py @@ -0,0 +1,259 @@ +"""行情区 K 线:后台轮询订阅 + SSE 版本通知(对齐监控区 board)。""" + +from __future__ import annotations + +import asyncio +import json +import os +import time +from collections.abc import AsyncIterator, Awaitable, Callable +from dataclasses import dataclass +from typing import Any + +from hub_board_cache import board_store + +HUB_CHART_POLL_INTERVAL = float(os.getenv("HUB_CHART_POLL_INTERVAL", "5")) +HUB_CHART_SSE_HEARTBEAT_SEC = float(os.getenv("HUB_CHART_SSE_HEARTBEAT_SEC", "25")) +HUB_CHART_WATCH_TTL_SEC = float(os.getenv("HUB_CHART_WATCH_TTL_SEC", "45")) +HUB_CHART_POSITION_TIMEFRAME = (os.getenv("HUB_CHART_POSITION_TIMEFRAME", "5m") or "5m").strip() +HUB_CHART_MAX_SERIES_PER_TICK = max(1, int(os.getenv("HUB_CHART_MAX_SERIES_PER_TICK", "24"))) + +PollFn = Callable[[], Awaitable[dict[str, Any]]] + + +def series_key(exchange_key: str, symbol: str, timeframe: str) -> str: + ex_k = (exchange_key or "").strip().lower() + sym = (symbol or "").strip().upper() + tf = (timeframe or "").strip() + return f"{ex_k}|{sym}|{tf}" + + +def parse_series_key(key: str) -> tuple[str, str, str] | None: + parts = (key or "").split("|") + if len(parts) != 3: + return None + ex_k, sym, tf = parts[0].strip().lower(), parts[1].strip().upper(), parts[2].strip() + if not ex_k or not sym or not tf: + return None + return ex_k, sym, tf + + +@dataclass +class SeriesState: + version: int = 0 + updated_at: str | None = None + fetched: int = 0 + error: str | None = None + + +class ChartPollStore: + def __init__(self) -> None: + self._lock = asyncio.Lock() + self.version = 0 + self.updated_at: str | None = None + self.polling = False + self.last_error: str | None = None + self._watch_until: dict[str, float] = {} + self._position_keys: set[str] = set() + self._series: dict[str, SeriesState] = {} + self._subscribers: list[asyncio.Queue[str | None]] = [] + self._task: asyncio.Task | None = None + self._stop = asyncio.Event() + self._refresh = asyncio.Event() + self._poll_fn: PollFn | None = None + + async def start(self, poll_fn: PollFn) -> None: + if self._task and not self._task.done(): + return + self._poll_fn = poll_fn + self._stop.clear() + self._task = asyncio.create_task(self._loop(), name="hub-chart-poll") + + async def stop(self) -> None: + self._stop.set() + self._refresh.set() + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + self._task = None + self._broadcast(close=True) + + def request_refresh(self) -> None: + self._refresh.set() + + def touch_watch(self, exchange_key: str, symbol: str, timeframe: str) -> str: + key = series_key(exchange_key, symbol, timeframe) + self._watch_until[key] = time.monotonic() + HUB_CHART_WATCH_TTL_SEC + return key + + def clear_watch(self, exchange_key: str, symbol: str, timeframe: str) -> None: + key = series_key(exchange_key, symbol, timeframe) + self._watch_until.pop(key, None) + + def sync_positions_from_rows(self, rows: list[Any]) -> None: + keys: set[str] = set() + tf = HUB_CHART_POSITION_TIMEFRAME + for row in rows or []: + if not isinstance(row, dict): + continue + ex_key = str(row.get("key") or row.get("exchange_key") or "").strip().lower() + if not ex_key: + ex_id = str(row.get("id") or "").strip() + if ex_id: + ex_key = ex_id.lower() + if not ex_key: + continue + ag = row.get("agent") if isinstance(row.get("agent"), dict) else {} + if ag.get("ok") is False: + continue + for pos in ag.get("positions") or []: + if not isinstance(pos, dict): + continue + sym = str(pos.get("symbol") or "").strip().upper() + if sym: + keys.add(series_key(ex_key, sym, tf)) + self._position_keys = keys + + def active_series_keys(self) -> list[str]: + now = time.monotonic() + watch = {k for k, until in self._watch_until.items() if until > now} + merged = self._position_keys | watch + return sorted(merged)[:HUB_CHART_MAX_SERIES_PER_TICK] + + def series_event_dict(self) -> dict[str, Any]: + out: dict[str, Any] = {} + for key, st in self._series.items(): + out[key] = { + "series_version": st.version, + "updated_at": st.updated_at, + "fetched": st.fetched, + "error": st.error, + } + return out + + def event_dict(self) -> dict[str, Any]: + return { + "chart_version": self.version, + "updated_at": self.updated_at, + "polling": self.polling, + "ok": self.last_error is None, + "error": self.last_error, + "series": self.series_event_dict(), + "poll_interval_sec": HUB_CHART_POLL_INTERVAL, + "position_timeframe": HUB_CHART_POSITION_TIMEFRAME, + } + + def series_version(self, exchange_key: str, symbol: str, timeframe: str) -> int: + key = series_key(exchange_key, symbol, timeframe) + st = self._series.get(key) + return st.version if st else 0 + + async def _loop(self) -> None: + assert self._poll_fn is not None + while not self._stop.is_set(): + await self._poll_once(self._poll_fn) + if self._stop.is_set(): + break + self._refresh.clear() + sleep_task = asyncio.create_task(asyncio.sleep(HUB_CHART_POLL_INTERVAL)) + refresh_task = asyncio.create_task(self._refresh.wait()) + done, pending = await asyncio.wait( + {sleep_task, refresh_task}, + return_when=asyncio.FIRST_COMPLETED, + ) + for t in pending: + t.cancel() + + async def _poll_once(self, poll_fn: PollFn) -> None: + async with self._lock: + self.polling = True + self._broadcast() + try: + snap = board_store.snapshot_dict() + rows = snap.get("rows") if isinstance(snap, dict) else [] + if isinstance(rows, list): + self.sync_positions_from_rows(rows) + result = await poll_fn() + if not isinstance(result, dict): + result = {"ok": False, "msg": "chart poll 返回无效"} + except Exception as e: + result = {"ok": False, "msg": str(e), "error": "chart_poll_failed"} + async with self._lock: + self.version += 1 + self.updated_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + self.last_error = None if result.get("ok") is not False else ( + str(result.get("msg") or result.get("error") or "chart_poll_failed") + ) + self.polling = False + self._broadcast() + + def note_series_result( + self, + exchange_key: str, + symbol: str, + timeframe: str, + *, + ok: bool, + fetched: int = 0, + error: str | None = None, + ) -> None: + key = series_key(exchange_key, symbol, timeframe) + st = self._series.setdefault(key, SeriesState()) + st.version += 1 + st.updated_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + st.fetched = int(fetched or 0) + st.error = error if not ok else None + + def _broadcast(self, *, close: bool = False) -> None: + dead: list[asyncio.Queue[str | None]] = [] + payload = None if close else json.dumps(self.event_dict(), ensure_ascii=False) + for q in self._subscribers: + try: + q.put_nowait(payload) + except asyncio.QueueFull: + try: + q.get_nowait() + except asyncio.QueueEmpty: + pass + try: + q.put_nowait(payload) + except asyncio.QueueFull: + dead.append(q) + except Exception: + dead.append(q) + for q in dead: + if q in self._subscribers: + self._subscribers.remove(q) + + async def iter_sse(self) -> AsyncIterator[str]: + q: asyncio.Queue[str | None] = asyncio.Queue(maxsize=32) + self._subscribers.append(q) + try: + yield _sse_frame(self.event_dict()) + while True: + try: + raw = await asyncio.wait_for(q.get(), timeout=HUB_CHART_SSE_HEARTBEAT_SEC) + except asyncio.TimeoutError: + yield ": heartbeat\n\n" + continue + if raw is None: + break + try: + data = json.loads(raw) + except Exception: + data = self.event_dict() + yield _sse_frame(data) + finally: + if q in self._subscribers: + self._subscribers.remove(q) + + +def _sse_frame(data: dict[str, Any]) -> str: + body = json.dumps(data, ensure_ascii=False) + return f"event: chart\ndata: {body}\n\n" + + +chart_poll_store = ChartPollStore() diff --git a/manual_trading_hub/static/app.js b/manual_trading_hub/static/app.js index fcd31ea..5c809c6 100644 --- a/manual_trading_hub/static/app.js +++ b/manual_trading_hub/static/app.js @@ -533,7 +533,10 @@ if (page === "market" && window.hubMarketChart) { window.hubMarketChart.init(); } else if (window.hubMarketChart) { - if (window.hubMarketChart.stopAutoRefresh) window.hubMarketChart.stopAutoRefresh(); + if (window.hubMarketChart.stopChartLive) window.hubMarketChart.stopChartLive(); + else { + if (window.hubMarketChart.stopAutoRefresh) window.hubMarketChart.stopAutoRefresh(); + } if (window.hubMarketChart.stopPriceTagTimer) window.hubMarketChart.stopPriceTagTimer(); } } diff --git a/manual_trading_hub/static/chart.js b/manual_trading_hub/static/chart.js index 2c26e07..2b92861 100644 --- a/manual_trading_hub/static/chart.js +++ b/manual_trading_hub/static/chart.js @@ -1,8 +1,10 @@ /** - * 中控行情区:K 线 + 成交量;默认最新 OHLCV;5s 自动刷新;价格轴「自动」。 + * 中控行情区:K 线 + 成交量;Hub 后台轮询 + SSE 推送;价格轴「自动」。 */ (function () { const AUTO_REFRESH_MS = 5000; + const CHART_WATCH_HEARTBEAT_MS = 25000; + const CHART_SSE_FALLBACK_MS = 30000; const DEFAULT_VISIBLE_BARS = 200; const RIGHT_OFFSET_BARS = 10; const CANDLE_SCALE_BOTTOM = 0.26; @@ -128,6 +130,11 @@ let loadToken = 0; let marketInited = false; let refreshTimer = null; + let chartWatchTimer = null; + let chartEventSource = null; + let chartSseReconnectTimer = null; + let localChartVersion = 0; + let localSeriesVersion = 0; let lastViewKey = ""; let currentTf = "1d"; let priceTagTimer = null; @@ -1529,18 +1536,117 @@ if (elTf && !elTf.value) elTf.value = "1d"; } + function currentViewSeriesKey() { + const exKey = (elExchange && elExchange.value) || ""; + const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; + const tf = (elTf && elTf.value) || "1d"; + if (!exKey || !sym) return ""; + return exKey + "|" + sym + "|" + tf; + } + + function postChartWatch() { + const exKey = (elExchange && elExchange.value) || ""; + const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; + const tf = (elTf && elTf.value) || "1d"; + if (!exKey || !sym) return Promise.resolve(); + return fetch("/api/chart/watch", { + method: "POST", + credentials: "same-origin", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ exchange_key: exKey, symbol: sym, timeframe: tf }), + }).catch(function () {}); + } + + function postChartUnwatch() { + const exKey = (elExchange && elExchange.value) || ""; + const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; + const tf = (elTf && elTf.value) || "1d"; + if (!exKey || !sym) return Promise.resolve(); + return fetch("/api/chart/unwatch", { + method: "POST", + credentials: "same-origin", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ exchange_key: exKey, symbol: sym, timeframe: tf }), + }).catch(function () {}); + } + + function closeChartStream() { + if (chartEventSource) { + chartEventSource.close(); + chartEventSource = null; + } + } + + function connectChartStream() { + closeChartStream(); + const page = document.getElementById("page-market"); + if (!page || page.classList.contains("hidden")) return; + chartEventSource = new EventSource("/api/chart/stream"); + chartEventSource.addEventListener("chart", function (ev) { + try { + const st = JSON.parse(ev.data || "{}"); + const ver = Number(st.chart_version) || 0; + const series = st.series || {}; + const vKey = currentViewSeriesKey(); + const sVer = vKey && series[vKey] ? Number(series[vKey].series_version) || 0 : 0; + const seriesChanged = vKey && sVer > 0 && sVer !== localSeriesVersion; + if (seriesChanged) { + localSeriesVersion = sVer; + localChartVersion = ver; + loadChart(false, { autoTick: true }); + } else if (ver !== localChartVersion) { + localChartVersion = ver; + } + } catch (_) {} + }); + chartEventSource.onerror = function () { + closeChartStream(); + if (chartSseReconnectTimer) clearTimeout(chartSseReconnectTimer); + chartSseReconnectTimer = setTimeout(function () { + const p = document.getElementById("page-market"); + if (p && !p.classList.contains("hidden")) connectChartStream(); + }, 8000); + }; + } + + function startChartWatchHeartbeat() { + stopChartWatchHeartbeat(); + void postChartWatch(); + chartWatchTimer = setInterval(function () { + const page = document.getElementById("page-market"); + if (!page || page.classList.contains("hidden")) return; + void postChartWatch(); + }, CHART_WATCH_HEARTBEAT_MS); + } + + function stopChartWatchHeartbeat() { + if (chartWatchTimer) clearInterval(chartWatchTimer); + chartWatchTimer = null; + } + function startAutoRefresh() { stopAutoRefresh(); refreshTimer = setInterval(function () { const page = document.getElementById("page-market"); if (!page || page.classList.contains("hidden")) return; loadChart(false, { autoTick: true }); - }, AUTO_REFRESH_MS); + }, CHART_SSE_FALLBACK_MS); } function stopAutoRefresh() { if (refreshTimer) clearInterval(refreshTimer); refreshTimer = null; + if (chartSseReconnectTimer) { + clearTimeout(chartSseReconnectTimer); + chartSseReconnectTimer = null; + } + } + + function stopChartLive() { + stopAutoRefresh(); + stopChartWatchHeartbeat(); + closeChartStream(); + void postChartUnwatch(); } async function loadMeta() { @@ -1563,6 +1669,10 @@ async function loadChart(force, options) { options = options || {}; const autoTick = !!options.autoTick; + if (!autoTick) { + localSeriesVersion = 0; + void postChartWatch(); + } if (!ensureChart()) return; const exKey = (elExchange && elExchange.value) || ""; const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; @@ -1645,7 +1755,9 @@ (data.from_cache || 0) + " / 新拉 " + (data.fetched || 0) + - ")· 每 5s 刷新"; + ")· 后台 " + + (data.chart_poll_interval_sec || 5) + + "s 轮询 · SSE"; if (data.stale && data.stale_message) { hint += " · 缓存:" + data.stale_message; } @@ -1654,6 +1766,8 @@ elStatus.textContent = hint; } if (elUpdated) elUpdated.textContent = "数据 " + (data.updated_at || "--"); + if (data.series_version != null) localSeriesVersion = Number(data.series_version) || localSeriesVersion; + if (data.chart_version != null) localChartVersion = Number(data.chart_version) || localChartVersion; tickLiveClock(); } catch (e) { if (myToken !== loadToken) return; @@ -1778,6 +1892,8 @@ readQuery(); } focusMarketChartArea(); + connectChartStream(); + startChartWatchHeartbeat(); startAutoRefresh(); await loadChart(false); startPriceTagTimer(); @@ -1790,7 +1906,10 @@ if (elSymbol && sym) elSymbol.value = String(sym).trim().toUpperCase(); if (tf && elTf) elTf.value = tf; lastViewKey = ""; + localSeriesVersion = 0; updateExchangeDisplay(); + connectChartStream(); + startChartWatchHeartbeat(); startAutoRefresh(); await loadChart(false); startPriceTagTimer(); @@ -1800,6 +1919,7 @@ }, startAutoRefresh: startAutoRefresh, stopAutoRefresh: stopAutoRefresh, + stopChartLive: stopChartLive, stopPriceTagTimer: stopPriceTagTimer, }; diff --git a/manual_trading_hub/static/index.html b/manual_trading_hub/static/index.html index d8c3047..a8a3da6 100644 --- a/manual_trading_hub/static/index.html +++ b/manual_trading_hub/static/index.html @@ -248,7 +248,7 @@
- - + +