From 2095839fc3799eab6797b57849071dcfa97dee98 Mon Sep 17 00:00:00 2001 From: dekun Date: Mon, 8 Jun 2026 11:27:00 +0800 Subject: [PATCH] Fix hub chart skipping remote fetch when DB bars are discontinuous. Trim gaps before deciding fetch need, always backfill short contiguous tails, and relax gap detection so tail polls do not block full history loads. Co-authored-by: Cursor --- hub_kline_store.py | 114 ++++++++++++++++++++------- manual_trading_hub/static/index.html | 2 +- tests/test_hub_kline_store.py | 64 ++++++++++++++- 3 files changed, 151 insertions(+), 29 deletions(-) diff --git a/hub_kline_store.py b/hub_kline_store.py index 7717345..f0c7a26 100644 --- a/hub_kline_store.py +++ b/hub_kline_store.py @@ -349,7 +349,7 @@ def trim_contiguous_tail( bars: list[dict[str, Any]], period_ms: int, *, - max_gap_factor: float = 1.5, + max_gap_factor: float = 3.0, ) -> tuple[list[dict[str, Any]], int]: """只保留最近一段连续 K 线,丢弃左侧与主段断开的孤立数据。""" if len(bars) <= 1: @@ -368,6 +368,31 @@ def trim_contiguous_tail( return bars[split:], split +def normalize_contiguous_db_rows( + bars: list[dict[str, Any]], + *, + period_ms: int, + exchange_key: str, + symbol: str, + timeframe: str, + db_path: Path | None = None, + purge_orphans: bool = True, +) -> list[dict[str, Any]]: + """去掉与主段断开的孤立前缀;可选同步清理库内孤立数据。""" + if len(bars) <= 1: + return list(bars) + trimmed, split_at = trim_contiguous_tail(bars, period_ms) + if split_at > 0 and purge_orphans: + purge_bars_open_before( + exchange_key, + symbol, + timeframe, + int(trimmed[0]["open_time_ms"]), + db_path, + ) + return trimmed + + def purge_bars_open_before( exchange_key: str, symbol: str, @@ -494,6 +519,15 @@ def resolve_chart_bars( db_rows: list[dict[str, Any]] = [] if not force_refresh: db_rows = load_display_rows() + if not is_history and db_rows: + db_rows = normalize_contiguous_db_rows( + db_rows, + period_ms=period_display, + exchange_key=ex_k, + symbol=sym, + timeframe=storage_tf, + db_path=db_path, + ) last_closed = last_closed_bar_open_ms(display_tf, now_ms) newest_db = db_rows[-1]["open_time_ms"] if db_rows else None @@ -502,7 +536,9 @@ def resolve_chart_bars( else: newest_ok = newest_db is not None and int(newest_db) >= int(last_closed) - period_display - need_fetch = force_refresh or (not is_history and (len(db_rows) < need or not newest_ok)) + need_fetch = force_refresh or ( + not is_history and (len(db_rows) < need or not newest_ok) + ) if is_history and len(db_rows) < need: need_fetch = True @@ -544,6 +580,15 @@ def resolve_chart_bars( if price_tick is not None: save_symbol_price_tick(ex_k, sym, price_tick, db_path) db_rows = load_display_rows() + if not is_history and db_rows: + db_rows = normalize_contiguous_db_rows( + db_rows, + period_ms=period_display, + exchange_key=ex_k, + symbol=sym, + timeframe=storage_tf, + db_path=db_path, + ) else: remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败" if not db_rows: @@ -580,42 +625,57 @@ def resolve_chart_bars( except Exception: pass - if not is_history and db_rows and len(db_rows) > 1: - trimmed, split_at = trim_contiguous_tail(db_rows, period_display) - if split_at > 0: - purge_bars_open_before( - ex_k, sym, storage_tf, int(trimmed[0]["open_time_ms"]), db_path - ) - db_rows = trimmed + if not is_history and db_rows: + db_rows = normalize_contiguous_db_rows( + db_rows, + period_ms=period_display, + exchange_key=ex_k, + symbol=sym, + timeframe=storage_tf, + db_path=db_path, + ) - if ( - not is_history - and db_rows - and len(db_rows) < need - and not force_refresh - ): - oldest = int(db_rows[0]["open_time_ms"]) + if not is_history and len(db_rows) < need: missing = need - len(db_rows) - backfill_since = max(cutoff, oldest - period_storage * (missing + 40)) + if db_rows: + oldest = int(db_rows[0]["open_time_ms"]) + backfill_since = max(cutoff, oldest - period_storage * (missing + 40)) + backfill_limit = min(missing + 60, 1500) + else: + backfill_since = max( + cutoff, now_ms - period_storage * min(need, seed_bar_target(storage_tf)) + ) + backfill_limit = min(need + 20, 1500) try: remote_back = remote_fetch( symbol=sym, timeframe=storage_tf, since_ms=backfill_since, - limit=min(missing + 60, 1500), + limit=backfill_limit, ) if remote_back.get("ok") and remote_back.get("bars"): fetched += upsert_bars(ex_k, sym, storage_tf, remote_back["bars"], db_path) + if remote_back.get("price_tick") is not None: + price_tick = remote_back.get("price_tick") + save_symbol_price_tick(ex_k, sym, price_tick, db_path) db_rows = load_display_rows() - if len(db_rows) > 1: - trimmed, split_at = trim_contiguous_tail(db_rows, period_display) - if split_at > 0: - purge_bars_open_before( - ex_k, sym, storage_tf, int(trimmed[0]["open_time_ms"]), db_path - ) - db_rows = trimmed - except Exception: - pass + db_rows = normalize_contiguous_db_rows( + db_rows, + period_ms=period_display, + exchange_key=ex_k, + symbol=sym, + timeframe=storage_tf, + db_path=db_path, + ) + elif not remote_err: + remote_err = ( + remote_back.get("msg") + or remote_back.get("error") + or "实例补拉 K 线失败" + ) + except Exception as e: + if not remote_err: + remote_err = str(e) price_tick = normalize_price_tick(price_tick) if db_rows and price_tick is not None: diff --git a/manual_trading_hub/static/index.html b/manual_trading_hub/static/index.html index 462b58a..cb8342c 100644 --- a/manual_trading_hub/static/index.html +++ b/manual_trading_hub/static/index.html @@ -349,7 +349,7 @@
- + diff --git a/tests/test_hub_kline_store.py b/tests/test_hub_kline_store.py index c18ed95..d7749e0 100644 --- a/tests/test_hub_kline_store.py +++ b/tests/test_hub_kline_store.py @@ -275,7 +275,69 @@ class TestHubKlineStore(unittest.TestCase): if len(candles) >= 2: for i in range(1, len(candles)): gap = candles[i]["time"] - candles[i - 1]["time"] - self.assertLessEqual(gap, int(period / 1000 * 1.5)) + self.assertLessEqual(gap, int(period / 1000 * 3.0)) + + def test_resolve_refetches_when_db_has_discontinuous_full_count(self): + init_db(self.db) + period = TIMEFRAME_MS["15m"] + now = int(time.time() * 1000) + old_start = now - period * 3000 + recent_start = now - period * 25 + old_bars = [ + { + "open_time_ms": old_start + i * period, + "open": 62000, + "high": 62100, + "low": 61900, + "close": 62050, + "volume": 10, + } + for i in range(500) + ] + recent = [ + { + "open_time_ms": recent_start + i * period, + "open": 104000, + "high": 104100, + "low": 103900, + "close": 104050, + "volume": 20, + } + for i in range(30) + ] + upsert_bars("binance", "BTC/USDT", "15m", old_bars, self.db) + upsert_bars("binance", "BTC/USDT", "15m", recent, self.db) + fetch_calls = [] + + def remote_fetch(**kwargs): + fetch_calls.append(dict(kwargs)) + full = [] + start = now - period * 120 + for i in range(120): + full.append( + { + "open_time_ms": start + i * period, + "open": 104000 + i, + "high": 104100 + i, + "low": 103900 + i, + "close": 104050 + i, + "volume": 30, + } + ) + return {"ok": True, "bars": full, "price_tick": 0.01} + + out = resolve_chart_bars( + "binance", + "BTC/USDT", + "15m", + remote_fetch, + db_path=self.db, + limit=2000, + ) + self.assertTrue(out.get("ok")) + self.assertGreater(len(fetch_calls), 0) + self.assertGreaterEqual(len(out.get("candles") or []), 100) + self.assertGreater(int(out.get("fetched") or 0), 0) def test_resolve_before_ms_exhausted(self): init_db(self.db)