From 1dcf62bb08d8682fdd81355b21b3a0c7fd0292d8 Mon Sep 17 00:00:00 2001 From: dekun Date: Mon, 8 Jun 2026 11:37:00 +0800 Subject: [PATCH] fix: align OHLCV fetch window with limit so chart seeds past 30 bars Co-authored-by: Cursor --- hub_kline_store.py | 134 ++++++++++++++++++++++++++++++---- tests/test_hub_kline_store.py | 51 +++++++++++++ 2 files changed, 172 insertions(+), 13 deletions(-) diff --git a/hub_kline_store.py b/hub_kline_store.py index 34742f5..054a970 100644 --- a/hub_kline_store.py +++ b/hub_kline_store.py @@ -26,6 +26,9 @@ from hub_ohlcv_lib import ( seed_bar_target, ) +HUB_KLINE_MIN_BARS_BEFORE_TAIL = 200 +HUB_KLINE_REMOTE_FETCH_CAP = 1500 + _DEFAULT_RETENTION_DAYS = 15 @@ -327,6 +330,56 @@ def load_bars_range( conn.close() +def count_series_bars( + exchange_key: str, + symbol: str, + timeframe: str, + db_path: Path | None = None, +) -> int: + ex_k = (exchange_key or "").strip().lower() + sym = (symbol or "").strip().upper() + tf = normalize_chart_timeframe(timeframe) + conn = _connect(db_path) + try: + row = conn.execute( + """ + SELECT COUNT(*) AS c FROM ohlcv_bars + WHERE exchange_key=? AND symbol=? AND timeframe=? + """, + (ex_k, sym, tf), + ).fetchone() + return int(row["c"] or 0) if row else 0 + finally: + conn.close() + + +def _remote_fetch_limit( + *, + need: int, + force_refresh: bool, + storage_tf: str, + tail_only: bool, +) -> int: + if tail_only: + return min(need + 20, 300) + cap = HUB_KLINE_REMOTE_FETCH_CAP + if force_refresh: + return min(seed_bar_target(storage_tf), cap) + return min(max(need + 20, 1), cap) + + +def _since_ms_for_span( + *, + now_ms: int, + period_ms: int, + span_bars: int, + cutoff_ms: int, +) -> int: + """拉取窗口起点:跨度必须与 fetch_limit 一致,保证数据能铺到最近。""" + span = max(1, int(span_bars)) + return max(int(cutoff_ms), int(now_ms) - int(period_ms) * span) + + def load_bars_latest( exchange_key: str, symbol: str, @@ -549,8 +602,15 @@ def resolve_chart_bars( now_ms = int(time.time() * 1000) period_display = TIMEFRAME_MS[display_tf] period_storage = TIMEFRAME_MS[storage_tf] + series_bar_count = ( + count_series_bars(ex_k, sym, storage_tf, db_path) if not is_history else 0 + ) if tail_refresh and not is_history: - need = min(need, 30) + min_seed = min(chart_initial_limit(display_tf) // 5, HUB_KLINE_MIN_BARS_BEFORE_TAIL) + if series_bar_count < max(1, min_seed): + tail_refresh = False + else: + need = min(need, 30) cutoff = history_cutoff_ms_for_storage(storage_tf, now_ms) if clear_db and not is_history and not tail_refresh: @@ -605,13 +665,27 @@ def resolve_chart_bars( fetch_limit = min(need + 20, 1500) elif tail_only: anchor_ms = int(newest_db) if newest_db is not None else now_ms - since = max(cutoff, anchor_ms - period_storage * 5) - fetch_limit = min(need + 20, 300) + fetch_limit = _remote_fetch_limit( + need=need, force_refresh=False, storage_tf=storage_tf, tail_only=True + ) + since = _since_ms_for_span( + now_ms=anchor_ms, + period_ms=period_storage, + span_bars=5, + cutoff_ms=cutoff, + ) else: - since = max(cutoff, now_ms - period_storage * min(need, seed_bar_target(storage_tf))) - fetch_limit = min( - seed_bar_target(storage_tf) if force_refresh else need + 20, - 1500, + fetch_limit = _remote_fetch_limit( + need=need, + force_refresh=force_refresh, + storage_tf=storage_tf, + tail_only=False, + ) + since = _since_ms_for_span( + now_ms=now_ms, + period_ms=period_storage, + span_bars=fetch_limit, + cutoff_ms=cutoff, ) remote = remote_fetch( @@ -635,6 +709,33 @@ def resolve_chart_bars( timeframe=storage_tf, db_path=db_path, ) + if not is_history and not tail_only and db_rows: + newest_ms = int(db_rows[-1]["open_time_ms"]) + if newest_ms < int(last_closed) - period_display: + gap_limit = min( + 500, + int((now_ms - newest_ms) // period_storage) + 10, + ) + if gap_limit > 1: + gap_remote = remote_fetch( + symbol=sym, + timeframe=storage_tf, + since_ms=newest_ms, + limit=gap_limit, + ) + if gap_remote.get("ok") and gap_remote.get("bars"): + fetched += upsert_bars( + ex_k, sym, storage_tf, gap_remote["bars"], db_path + ) + db_rows = load_display_rows() + db_rows = normalize_contiguous_db_rows( + db_rows, + period_ms=period_display, + exchange_key=ex_k, + symbol=sym, + timeframe=storage_tf, + db_path=db_path, + ) else: remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败" if not db_rows: @@ -683,15 +784,22 @@ def resolve_chart_bars( if not is_history and len(db_rows) < need: missing = need - len(db_rows) + backfill_limit = min(missing + 60, HUB_KLINE_REMOTE_FETCH_CAP) if db_rows: oldest = int(db_rows[0]["open_time_ms"]) - backfill_since = max(cutoff, oldest - period_storage * (missing + 40)) - backfill_limit = min(missing + 60, 1500) - else: - backfill_since = max( - cutoff, now_ms - period_storage * min(need, seed_bar_target(storage_tf)) + backfill_since = _since_ms_for_span( + now_ms=oldest, + period_ms=period_storage, + span_bars=backfill_limit, + cutoff_ms=cutoff, + ) + else: + backfill_since = _since_ms_for_span( + now_ms=now_ms, + period_ms=period_storage, + span_bars=backfill_limit, + cutoff_ms=cutoff, ) - backfill_limit = min(need + 20, 1500) try: remote_back = remote_fetch( symbol=sym, diff --git a/tests/test_hub_kline_store.py b/tests/test_hub_kline_store.py index b8d835e..6e97707 100644 --- a/tests/test_hub_kline_store.py +++ b/tests/test_hub_kline_store.py @@ -7,6 +7,8 @@ import unittest from pathlib import Path from hub_kline_store import ( + HUB_KLINE_REMOTE_FETCH_CAP, + _since_ms_for_span, clear_series_bars, init_db, load_bars_before, @@ -391,6 +393,55 @@ class TestHubKlineStore(unittest.TestCase): self.assertGreater(int(out.get("fetched") or 0), 0) self.assertGreaterEqual(len(out.get("candles") or []), 19) + def test_since_span_matches_fetch_limit_not_need(self): + period = TIMEFRAME_MS["15m"] + now_ms = 1_800_000_000_000 + fetch_limit = HUB_KLINE_REMOTE_FETCH_CAP + since = _since_ms_for_span( + now_ms=now_ms, + period_ms=period, + span_bars=fetch_limit, + cutoff_ms=0, + ) + self.assertEqual(since, now_ms - period * fetch_limit) + wrong_since = now_ms - period * chart_initial_limit("15m") + self.assertGreater(since, wrong_since) + + def test_thin_series_tail_refresh_fetches_full_window(self): + init_db(self.db) + period = TIMEFRAME_MS["15m"] + now = int(time.time() * 1000) + last_closed = last_closed_bar_open_ms("15m", now) + bars = [ + { + "open_time_ms": last_closed - period * (150 - i), + "open": 100000, + "high": 100100, + "low": 99900, + "close": 100050, + "volume": 1, + } + for i in range(150) + ] + fetch_calls: list[dict] = [] + + def remote_fetch(**kwargs): + fetch_calls.append(dict(kwargs)) + return {"ok": True, "bars": bars, "price_tick": 0.01} + + out = resolve_chart_bars( + "binance", + "BTC/USDT", + "15m", + remote_fetch, + db_path=self.db, + tail_refresh=True, + ) + self.assertTrue(out.get("ok")) + self.assertGreaterEqual(len(out.get("candles") or []), 100) + self.assertGreater(int(out.get("fetched") or 0), 0) + self.assertTrue(any(int(c.get("limit") or 0) > 30 for c in fetch_calls)) + def test_resolve_before_ms_exhausted(self): init_db(self.db)