feat: push chart tail candles over SSE for faster market refresh

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-08 12:24:25 +08:00
parent 4918699276
commit e68e29629e
4 changed files with 141 additions and 49 deletions
+2
View File
@@ -214,6 +214,8 @@ async def _run_chart_poll() -> dict:
ok=bool(result.get("ok")),
fetched=int(result.get("fetched") or 0),
error=None if result.get("ok") else str(result.get("msg") or "poll_failed"),
candles=result.get("candles") if result.get("ok") else None,
price_tick=result.get("price_tick"),
)
if not result.get("ok"):
errors.append(f"{key}:{result.get('msg')}")
+25 -4
View File
@@ -1,4 +1,4 @@
"""行情区 K 线:后台轮询订阅 + SSE 版本通知(对齐监控区 board)。"""
"""行情区 K 线:后台轮询订阅 + SSE 推送尾部 K 线(对齐监控区 board)。"""
from __future__ import annotations
@@ -17,6 +17,7 @@ HUB_CHART_SSE_HEARTBEAT_SEC = float(os.getenv("HUB_CHART_SSE_HEARTBEAT_SEC", "25
HUB_CHART_WATCH_TTL_SEC = float(os.getenv("HUB_CHART_WATCH_TTL_SEC", "45"))
HUB_CHART_POSITION_TIMEFRAME = (os.getenv("HUB_CHART_POSITION_TIMEFRAME", "5m") or "5m").strip()
HUB_CHART_MAX_SERIES_PER_TICK = max(1, int(os.getenv("HUB_CHART_MAX_SERIES_PER_TICK", "24")))
HUB_CHART_SSE_TAIL_BARS = max(5, min(int(os.getenv("HUB_CHART_SSE_TAIL_BARS", "30")), 120))
PollFn = Callable[[], Awaitable[dict[str, Any]]]
@@ -56,6 +57,7 @@ class ChartPollStore:
self._watch_until: dict[str, float] = {}
self._position_keys: set[str] = set()
self._series: dict[str, SeriesState] = {}
self._push_tails: dict[str, dict[str, Any]] = {}
self._subscribers: list[asyncio.Queue[str | None]] = []
self._task: asyncio.Task | None = None
self._stop = asyncio.Event()
@@ -134,8 +136,8 @@ class ChartPollStore:
}
return out
def event_dict(self) -> dict[str, Any]:
return {
def event_dict(self, *, tails: dict[str, dict[str, Any]] | None = None) -> dict[str, Any]:
out: dict[str, Any] = {
"chart_version": self.version,
"updated_at": self.updated_at,
"polling": self.polling,
@@ -144,7 +146,12 @@ class ChartPollStore:
"series": self.series_event_dict(),
"poll_interval_sec": HUB_CHART_POLL_INTERVAL,
"position_timeframe": HUB_CHART_POSITION_TIMEFRAME,
"push_tails": True,
}
tail_map = tails if tails is not None else self._push_tails
if tail_map:
out["tails"] = tail_map
return out
def series_version(self, exchange_key: str, symbol: str, timeframe: str) -> int:
key = series_key(exchange_key, symbol, timeframe)
@@ -199,6 +206,8 @@ class ChartPollStore:
ok: bool,
fetched: int = 0,
error: str | None = None,
candles: list[dict[str, Any]] | None = None,
price_tick: Any = None,
) -> None:
key = series_key(exchange_key, symbol, timeframe)
st = self._series.setdefault(key, SeriesState())
@@ -206,10 +215,22 @@ class ChartPollStore:
st.updated_at = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
st.fetched = int(fetched or 0)
st.error = error if not ok else None
if ok and candles:
tail = list(candles[-HUB_CHART_SSE_TAIL_BARS :])
if tail:
self._push_tails[key] = {
"series_version": st.version,
"updated_at": st.updated_at,
"fetched": st.fetched,
"candles": tail,
"price_tick": price_tick,
}
def _broadcast(self, *, close: bool = False) -> None:
dead: list[asyncio.Queue[str | None]] = []
payload = None if close else json.dumps(self.event_dict(), ensure_ascii=False)
tails_snap = dict(self._push_tails)
self._push_tails.clear()
payload = None if close else json.dumps(self.event_dict(tails=tails_snap), ensure_ascii=False)
for q in self._subscribers:
try:
q.put_nowait(payload)
+77 -45
View File
@@ -1,10 +1,9 @@
/**
* 中控行情区K 线 + 成交量Hub 后台轮询 + SSE 推送自动控制价格轴与视口跟随
* 中控行情区K 线 + 成交量Hub 后台轮询 + SSE 直推尾部 K 线自动控制价格轴与视口跟随
*/
(function () {
const AUTO_REFRESH_MS = 5000;
const CHART_WATCH_HEARTBEAT_MS = 25000;
const CHART_SSE_FALLBACK_MS = 30000;
const CHART_SSE_FALLBACK_MS = 60000;
const DEFAULT_VISIBLE_BARS = 200;
const CHART_LOAD_LEFT_THRESHOLD = 25;
const CHART_INITIAL_LIMITS = {
@@ -2288,6 +2287,60 @@
}
}
function applyIncomingTailCandles(incoming, meta) {
meta = meta || {};
const vKey = currentViewSeriesKey();
if (!vKey || !lastCandles.length || chartDataLoading) return false;
if (!lastViewKey || vKey !== lastViewKey) return false;
const epochAtStart = chartViewEpoch;
const autoFollow = priceAutoScale;
let savedRange = null;
if (chart) savedRange = chart.timeScale().getVisibleLogicalRange();
if (!incoming || !incoming.length) return false;
if (meta.price_tick != null) {
priceTick = meta.price_tick;
try {
applyChartPriceFormat();
} catch (fmtErr) {
priceTick = null;
applyChartPriceFormat();
}
}
const aligned = alignCandlesToTick(incoming);
if (!autoFollow && applyTailCandlePatch(aligned)) {
/* 手动模式:增量 update,不触碰时间轴 */
} else {
const merged = mergeCandles(lastCandles, aligned, { prepend: false });
applyCandlesToChart(merged, 0, {
preserveRange: false,
skipAutoScale: !autoFollow,
skipRightGap: !autoFollow,
});
if (epochAtStart !== chartViewEpoch) return false;
const n = lastCandles.length;
if (autoFollow) {
applyDefaultVisibleRange();
} else if (savedRange) {
applyPreservedVisibleRange(savedRange, n);
}
}
if (epochAtStart !== chartViewEpoch) return false;
scheduleRangeUiUpdate();
if (posContext) {
updateLivePosPnl();
refreshPosPnlFromBoard();
}
if (meta.series_version != null) {
localSeriesVersion = Number(meta.series_version) || localSeriesVersion;
}
if (meta.chart_version != null) {
localChartVersion = Number(meta.chart_version) || localChartVersion;
}
if (elUpdated) elUpdated.textContent = "数据 " + (meta.updated_at || "--");
tickLiveClock();
return true;
}
async function refreshChartTail() {
const exKey = (elExchange && elExchange.value) || "";
const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || "";
@@ -2297,9 +2350,6 @@
if (!lastViewKey || vKey !== lastViewKey) return;
const myToken = loadToken;
const epochAtStart = chartViewEpoch;
const autoFollow = priceAutoScale;
let savedRange = null;
if (chart) savedRange = chart.timeScale().getVisibleLogicalRange();
try {
const data = await fetchChartChunk({
exchange_key: exKey,
@@ -2312,43 +2362,12 @@
if (vKey !== lastViewKey) return;
if (epochAtStart !== chartViewEpoch) return;
if (!data.ok || !data.candles || !data.candles.length) return;
if (data.price_tick != null) {
priceTick = data.price_tick;
try {
applyChartPriceFormat();
} catch (fmtErr) {
priceTick = null;
applyChartPriceFormat();
}
}
const incoming = alignCandlesToTick(data.candles);
if (!autoFollow && applyTailCandlePatch(incoming)) {
/* 手动模式:增量更新,不触碰时间轴 */
} else {
const merged = mergeCandles(lastCandles, incoming, { prepend: false });
applyCandlesToChart(merged, 0, {
preserveRange: false,
skipAutoScale: !autoFollow,
skipRightGap: !autoFollow,
});
if (epochAtStart !== chartViewEpoch) return;
const n = lastCandles.length;
if (autoFollow) {
applyDefaultVisibleRange();
} else if (savedRange) {
applyPreservedVisibleRange(savedRange, n);
}
}
if (epochAtStart !== chartViewEpoch) return;
scheduleRangeUiUpdate();
if (posContext) {
updateLivePosPnl();
refreshPosPnlFromBoard();
}
if (data.series_version != null) localSeriesVersion = Number(data.series_version) || localSeriesVersion;
if (data.chart_version != null) localChartVersion = Number(data.chart_version) || localChartVersion;
if (elUpdated) elUpdated.textContent = "数据 " + (data.updated_at || "--");
tickLiveClock();
applyIncomingTailCandles(data.candles, {
price_tick: data.price_tick,
series_version: data.series_version,
chart_version: data.chart_version,
updated_at: data.updated_at,
});
} catch (_) {}
}
@@ -2482,14 +2501,27 @@
chartEventSource.addEventListener("chart", function (ev) {
try {
const st = JSON.parse(ev.data || "{}");
if (st.polling) return;
const ver = Number(st.chart_version) || 0;
const series = st.series || {};
const vKey = currentViewSeriesKey();
const tails = st.tails || {};
const tailPack = vKey && tails[vKey] ? tails[vKey] : null;
if (tailPack && tailPack.candles && tailPack.candles.length) {
if (
applyIncomingTailCandles(tailPack.candles, {
price_tick: tailPack.price_tick,
series_version: tailPack.series_version,
chart_version: ver,
updated_at: tailPack.updated_at || st.updated_at,
})
) {
return;
}
}
const sVer = vKey && series[vKey] ? Number(series[vKey].series_version) || 0 : 0;
const seriesChanged = vKey && sVer > 0 && sVer !== localSeriesVersion;
if (seriesChanged) {
localSeriesVersion = sVer;
localChartVersion = ver;
refreshChartTail();
} else if (posContext) {
updateLivePosPnl();
+37
View File
@@ -35,6 +35,43 @@ class TestHubChartCache(unittest.TestCase):
self.assertIn(series_key("okx_auto", "BTC/USDT", "5m"), keys)
self.assertIn(series_key("gate_trend", "HYPE/USDT", "5m"), keys)
def test_note_series_result_pushes_tail_candles(self) -> None:
store = ChartPollStore()
key = series_key("binance", "BTC/USDT", "15m")
candles = [
{"time": 1_700_000_000 + i * 900, "open": 1, "high": 2, "low": 0.5, "close": 1.5, "volume": 10}
for i in range(40)
]
store.note_series_result(
"binance",
"BTC/USDT",
"15m",
ok=True,
fetched=3,
candles=candles,
price_tick=0.01,
)
ev = store.event_dict()
self.assertIn("tails", ev)
self.assertIn(key, ev["tails"])
tail = ev["tails"][key]
self.assertEqual(len(tail["candles"]), 30)
self.assertEqual(tail["price_tick"], 0.01)
self.assertGreater(tail["series_version"], 0)
def test_broadcast_clears_pending_tails(self) -> None:
store = ChartPollStore()
store.note_series_result(
"gate",
"ONDO/USDT",
"5m",
ok=True,
candles=[{"time": 100, "open": 1, "high": 1, "low": 1, "close": 1, "volume": 1}],
)
store._broadcast()
ev = store.event_dict()
self.assertNotIn("tails", ev)
def test_poll_increments_version(self) -> None:
async def run() -> None:
store = ChartPollStore()