From e68e29629eb17db26c461dcba7d589de9dc56a46 Mon Sep 17 00:00:00 2001 From: dekun Date: Mon, 8 Jun 2026 12:24:25 +0800 Subject: [PATCH] feat: push chart tail candles over SSE for faster market refresh Co-authored-by: Cursor --- manual_trading_hub/hub.py | 2 + manual_trading_hub/hub_chart_cache.py | 29 +++++- manual_trading_hub/static/chart.js | 122 ++++++++++++++++---------- tests/test_hub_chart_cache.py | 37 ++++++++ 4 files changed, 141 insertions(+), 49 deletions(-) diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 3a7ffcd..ef783c4 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -214,6 +214,8 @@ async def _run_chart_poll() -> dict: ok=bool(result.get("ok")), fetched=int(result.get("fetched") or 0), error=None if result.get("ok") else str(result.get("msg") or "poll_failed"), + candles=result.get("candles") if result.get("ok") else None, + price_tick=result.get("price_tick"), ) if not result.get("ok"): errors.append(f"{key}:{result.get('msg')}") diff --git a/manual_trading_hub/hub_chart_cache.py b/manual_trading_hub/hub_chart_cache.py index 0e642bc..66ca896 100644 --- a/manual_trading_hub/hub_chart_cache.py +++ b/manual_trading_hub/hub_chart_cache.py @@ -1,4 +1,4 @@ -"""行情区 K 线:后台轮询订阅 + SSE 版本通知(对齐监控区 board)。""" +"""行情区 K 线:后台轮询订阅 + SSE 推送尾部 K 线(对齐监控区 board)。""" from __future__ import annotations @@ -17,6 +17,7 @@ HUB_CHART_SSE_HEARTBEAT_SEC = float(os.getenv("HUB_CHART_SSE_HEARTBEAT_SEC", "25 HUB_CHART_WATCH_TTL_SEC = float(os.getenv("HUB_CHART_WATCH_TTL_SEC", "45")) HUB_CHART_POSITION_TIMEFRAME = (os.getenv("HUB_CHART_POSITION_TIMEFRAME", "5m") or "5m").strip() HUB_CHART_MAX_SERIES_PER_TICK = max(1, int(os.getenv("HUB_CHART_MAX_SERIES_PER_TICK", "24"))) +HUB_CHART_SSE_TAIL_BARS = max(5, min(int(os.getenv("HUB_CHART_SSE_TAIL_BARS", "30")), 120)) PollFn = Callable[[], Awaitable[dict[str, Any]]] @@ -56,6 +57,7 @@ class ChartPollStore: self._watch_until: dict[str, float] = {} self._position_keys: set[str] = set() self._series: dict[str, SeriesState] = {} + self._push_tails: dict[str, dict[str, Any]] = {} self._subscribers: list[asyncio.Queue[str | None]] = [] self._task: asyncio.Task | None = None self._stop = asyncio.Event() @@ -134,8 +136,8 @@ class ChartPollStore: } return out - def event_dict(self) -> dict[str, Any]: - return { + def event_dict(self, *, tails: dict[str, dict[str, Any]] | None = None) -> dict[str, Any]: + out: dict[str, Any] = { "chart_version": self.version, "updated_at": self.updated_at, "polling": self.polling, @@ -144,7 +146,12 @@ class ChartPollStore: "series": self.series_event_dict(), "poll_interval_sec": HUB_CHART_POLL_INTERVAL, "position_timeframe": HUB_CHART_POSITION_TIMEFRAME, + "push_tails": True, } + tail_map = tails if tails is not None else self._push_tails + if tail_map: + out["tails"] = tail_map + return out def series_version(self, exchange_key: str, symbol: str, timeframe: str) -> int: key = series_key(exchange_key, symbol, timeframe) @@ -199,6 +206,8 @@ class ChartPollStore: ok: bool, fetched: int = 0, error: str | None = None, + candles: list[dict[str, Any]] | None = None, + price_tick: Any = None, ) -> None: key = series_key(exchange_key, symbol, timeframe) st = self._series.setdefault(key, SeriesState()) @@ -206,10 +215,22 @@ class ChartPollStore: st.updated_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) st.fetched = int(fetched or 0) st.error = error if not ok else None + if ok and candles: + tail = list(candles[-HUB_CHART_SSE_TAIL_BARS :]) + if tail: + self._push_tails[key] = { + "series_version": st.version, + "updated_at": st.updated_at, + "fetched": st.fetched, + "candles": tail, + "price_tick": price_tick, + } def _broadcast(self, *, close: bool = False) -> None: dead: list[asyncio.Queue[str | None]] = [] - payload = None if close else json.dumps(self.event_dict(), ensure_ascii=False) + tails_snap = dict(self._push_tails) + self._push_tails.clear() + payload = None if close else json.dumps(self.event_dict(tails=tails_snap), ensure_ascii=False) for q in self._subscribers: try: q.put_nowait(payload) diff --git a/manual_trading_hub/static/chart.js b/manual_trading_hub/static/chart.js index 640bcb6..acc949d 100644 --- a/manual_trading_hub/static/chart.js +++ b/manual_trading_hub/static/chart.js @@ -1,10 +1,9 @@ /** - * 中控行情区:K 线 + 成交量;Hub 后台轮询 + SSE 推送;「自动」控制价格轴与视口跟随。 + * 中控行情区:K 线 + 成交量;Hub 后台轮询 + SSE 直推尾部 K 线;「自动」控制价格轴与视口跟随。 */ (function () { - const AUTO_REFRESH_MS = 5000; const CHART_WATCH_HEARTBEAT_MS = 25000; - const CHART_SSE_FALLBACK_MS = 30000; + const CHART_SSE_FALLBACK_MS = 60000; const DEFAULT_VISIBLE_BARS = 200; const CHART_LOAD_LEFT_THRESHOLD = 25; const CHART_INITIAL_LIMITS = { @@ -2288,6 +2287,60 @@ } } + function applyIncomingTailCandles(incoming, meta) { + meta = meta || {}; + const vKey = currentViewSeriesKey(); + if (!vKey || !lastCandles.length || chartDataLoading) return false; + if (!lastViewKey || vKey !== lastViewKey) return false; + const epochAtStart = chartViewEpoch; + const autoFollow = priceAutoScale; + let savedRange = null; + if (chart) savedRange = chart.timeScale().getVisibleLogicalRange(); + if (!incoming || !incoming.length) return false; + if (meta.price_tick != null) { + priceTick = meta.price_tick; + try { + applyChartPriceFormat(); + } catch (fmtErr) { + priceTick = null; + applyChartPriceFormat(); + } + } + const aligned = alignCandlesToTick(incoming); + if (!autoFollow && applyTailCandlePatch(aligned)) { + /* 手动模式:增量 update,不触碰时间轴 */ + } else { + const merged = mergeCandles(lastCandles, aligned, { prepend: false }); + applyCandlesToChart(merged, 0, { + preserveRange: false, + skipAutoScale: !autoFollow, + skipRightGap: !autoFollow, + }); + if (epochAtStart !== chartViewEpoch) return false; + const n = lastCandles.length; + if (autoFollow) { + applyDefaultVisibleRange(); + } else if (savedRange) { + applyPreservedVisibleRange(savedRange, n); + } + } + if (epochAtStart !== chartViewEpoch) return false; + scheduleRangeUiUpdate(); + if (posContext) { + updateLivePosPnl(); + refreshPosPnlFromBoard(); + } + if (meta.series_version != null) { + localSeriesVersion = Number(meta.series_version) || localSeriesVersion; + } + if (meta.chart_version != null) { + localChartVersion = Number(meta.chart_version) || localChartVersion; + } + if (elUpdated) elUpdated.textContent = "数据 " + (meta.updated_at || "--"); + tickLiveClock(); + return true; + } + async function refreshChartTail() { const exKey = (elExchange && elExchange.value) || ""; const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; @@ -2297,9 +2350,6 @@ if (!lastViewKey || vKey !== lastViewKey) return; const myToken = loadToken; const epochAtStart = chartViewEpoch; - const autoFollow = priceAutoScale; - let savedRange = null; - if (chart) savedRange = chart.timeScale().getVisibleLogicalRange(); try { const data = await fetchChartChunk({ exchange_key: exKey, @@ -2312,43 +2362,12 @@ if (vKey !== lastViewKey) return; if (epochAtStart !== chartViewEpoch) return; if (!data.ok || !data.candles || !data.candles.length) return; - if (data.price_tick != null) { - priceTick = data.price_tick; - try { - applyChartPriceFormat(); - } catch (fmtErr) { - priceTick = null; - applyChartPriceFormat(); - } - } - const incoming = alignCandlesToTick(data.candles); - if (!autoFollow && applyTailCandlePatch(incoming)) { - /* 手动模式:增量更新,不触碰时间轴 */ - } else { - const merged = mergeCandles(lastCandles, incoming, { prepend: false }); - applyCandlesToChart(merged, 0, { - preserveRange: false, - skipAutoScale: !autoFollow, - skipRightGap: !autoFollow, - }); - if (epochAtStart !== chartViewEpoch) return; - const n = lastCandles.length; - if (autoFollow) { - applyDefaultVisibleRange(); - } else if (savedRange) { - applyPreservedVisibleRange(savedRange, n); - } - } - if (epochAtStart !== chartViewEpoch) return; - scheduleRangeUiUpdate(); - if (posContext) { - updateLivePosPnl(); - refreshPosPnlFromBoard(); - } - if (data.series_version != null) localSeriesVersion = Number(data.series_version) || localSeriesVersion; - if (data.chart_version != null) localChartVersion = Number(data.chart_version) || localChartVersion; - if (elUpdated) elUpdated.textContent = "数据 " + (data.updated_at || "--"); - tickLiveClock(); + applyIncomingTailCandles(data.candles, { + price_tick: data.price_tick, + series_version: data.series_version, + chart_version: data.chart_version, + updated_at: data.updated_at, + }); } catch (_) {} } @@ -2482,14 +2501,27 @@ chartEventSource.addEventListener("chart", function (ev) { try { const st = JSON.parse(ev.data || "{}"); + if (st.polling) return; const ver = Number(st.chart_version) || 0; const series = st.series || {}; const vKey = currentViewSeriesKey(); + const tails = st.tails || {}; + const tailPack = vKey && tails[vKey] ? tails[vKey] : null; + if (tailPack && tailPack.candles && tailPack.candles.length) { + if ( + applyIncomingTailCandles(tailPack.candles, { + price_tick: tailPack.price_tick, + series_version: tailPack.series_version, + chart_version: ver, + updated_at: tailPack.updated_at || st.updated_at, + }) + ) { + return; + } + } const sVer = vKey && series[vKey] ? Number(series[vKey].series_version) || 0 : 0; const seriesChanged = vKey && sVer > 0 && sVer !== localSeriesVersion; if (seriesChanged) { - localSeriesVersion = sVer; - localChartVersion = ver; refreshChartTail(); } else if (posContext) { updateLivePosPnl(); diff --git a/tests/test_hub_chart_cache.py b/tests/test_hub_chart_cache.py index 086bada..8085d1d 100644 --- a/tests/test_hub_chart_cache.py +++ b/tests/test_hub_chart_cache.py @@ -35,6 +35,43 @@ class TestHubChartCache(unittest.TestCase): self.assertIn(series_key("okx_auto", "BTC/USDT", "5m"), keys) self.assertIn(series_key("gate_trend", "HYPE/USDT", "5m"), keys) + def test_note_series_result_pushes_tail_candles(self) -> None: + store = ChartPollStore() + key = series_key("binance", "BTC/USDT", "15m") + candles = [ + {"time": 1_700_000_000 + i * 900, "open": 1, "high": 2, "low": 0.5, "close": 1.5, "volume": 10} + for i in range(40) + ] + store.note_series_result( + "binance", + "BTC/USDT", + "15m", + ok=True, + fetched=3, + candles=candles, + price_tick=0.01, + ) + ev = store.event_dict() + self.assertIn("tails", ev) + self.assertIn(key, ev["tails"]) + tail = ev["tails"][key] + self.assertEqual(len(tail["candles"]), 30) + self.assertEqual(tail["price_tick"], 0.01) + self.assertGreater(tail["series_version"], 0) + + def test_broadcast_clears_pending_tails(self) -> None: + store = ChartPollStore() + store.note_series_result( + "gate", + "ONDO/USDT", + "5m", + ok=True, + candles=[{"time": 100, "open": 1, "high": 1, "low": 1, "close": 1, "volume": 1}], + ) + store._broadcast() + ev = store.event_dict() + self.assertNotIn("tails", ev) + def test_poll_increments_version(self) -> None: async def run() -> None: store = ChartPollStore()