From 440d1ecbc9d8587247f3652f049928c3504aa08f Mon Sep 17 00:00:00 2001 From: dekun Date: Mon, 8 Jun 2026 11:21:01 +0800 Subject: [PATCH] Fix discontinuous hub chart candles from orphaned DB bars. Keep only the latest contiguous K-line segment, purge isolated stale rows, and backfill when the tail is still shorter than the initial limit. Co-authored-by: Cursor --- hub_kline_store.py | 85 +++++++++++++++++++++++++++ manual_trading_hub/static/index.html | 2 +- tests/test_hub_kline_store.py | 87 ++++++++++++++++++++++++++++ 3 files changed, 173 insertions(+), 1 deletion(-) diff --git a/hub_kline_store.py b/hub_kline_store.py index 425910c..7717345 100644 --- a/hub_kline_store.py +++ b/hub_kline_store.py @@ -345,6 +345,54 @@ def load_bars_before( conn.close() +def trim_contiguous_tail( + bars: list[dict[str, Any]], + period_ms: int, + *, + max_gap_factor: float = 1.5, +) -> tuple[list[dict[str, Any]], int]: + """只保留最近一段连续 K 线,丢弃左侧与主段断开的孤立数据。""" + if len(bars) <= 1: + return list(bars), 0 + try: + period = max(1, int(period_ms)) + except (TypeError, ValueError): + period = 60_000 + max_gap = int(period * max_gap_factor) + split = 0 + for i in range(len(bars) - 1, 0, -1): + gap = int(bars[i]["open_time_ms"]) - int(bars[i - 1]["open_time_ms"]) + if gap > max_gap: + split = i + break + return bars[split:], split + + +def purge_bars_open_before( + exchange_key: str, + symbol: str, + timeframe: str, + open_time_ms: int, + db_path: Path | None = None, +) -> int: + """删除某品种周期下早于 open_time_ms 的 K 线(清理与主段断开的孤立历史)。""" + ex_k = (exchange_key or "").strip().lower() + sym = (symbol or "").strip().upper() + tf = normalize_chart_timeframe(timeframe) + conn = _connect(db_path) + try: + cur = conn.execute( + """ + DELETE FROM ohlcv_bars + WHERE exchange_key=? AND symbol=? AND timeframe=? AND open_time_ms < ? + """, + (ex_k, sym, tf, int(open_time_ms)), + ) + return int(cur.rowcount or 0) + finally: + conn.close() + + def _rows_to_bars(rows) -> list[dict[str, Any]]: return [ { @@ -532,6 +580,43 @@ def resolve_chart_bars( except Exception: pass + if not is_history and db_rows and len(db_rows) > 1: + trimmed, split_at = trim_contiguous_tail(db_rows, period_display) + if split_at > 0: + purge_bars_open_before( + ex_k, sym, storage_tf, int(trimmed[0]["open_time_ms"]), db_path + ) + db_rows = trimmed + + if ( + not is_history + and db_rows + and len(db_rows) < need + and not force_refresh + ): + oldest = int(db_rows[0]["open_time_ms"]) + missing = need - len(db_rows) + backfill_since = max(cutoff, oldest - period_storage * (missing + 40)) + try: + remote_back = remote_fetch( + symbol=sym, + timeframe=storage_tf, + since_ms=backfill_since, + limit=min(missing + 60, 1500), + ) + if remote_back.get("ok") and remote_back.get("bars"): + fetched += upsert_bars(ex_k, sym, storage_tf, remote_back["bars"], db_path) + db_rows = load_display_rows() + if len(db_rows) > 1: + trimmed, split_at = trim_contiguous_tail(db_rows, period_display) + if split_at > 0: + purge_bars_open_before( + ex_k, sym, storage_tf, int(trimmed[0]["open_time_ms"]), db_path + ) + db_rows = trimmed + except Exception: + pass + price_tick = normalize_price_tick(price_tick) if db_rows and price_tick is not None: round_ohlcv_bars_to_tick(db_rows, price_tick) diff --git a/manual_trading_hub/static/index.html b/manual_trading_hub/static/index.html index e64e46b..462b58a 100644 --- a/manual_trading_hub/static/index.html +++ b/manual_trading_hub/static/index.html @@ -349,7 +349,7 @@
- + diff --git a/tests/test_hub_kline_store.py b/tests/test_hub_kline_store.py index 2ec7b3f..c18ed95 100644 --- a/tests/test_hub_kline_store.py +++ b/tests/test_hub_kline_store.py @@ -14,6 +14,7 @@ from hub_kline_store import ( purge_timeframe_by_days, resolve_chart_bars, retention_days, + trim_contiguous_tail, upsert_bars, ) from hub_ohlcv_lib import ( @@ -190,6 +191,92 @@ class TestHubKlineStore(unittest.TestCase): self.assertEqual(len(got), 2) self.assertEqual(got[-1]["open_time_ms"], base + 2 * period) + def test_trim_contiguous_tail_drops_orphan_prefix(self): + period = TIMEFRAME_MS["15m"] + base_old = 1_700_000_000_000 + base_new = base_old + period * 500 + bars = [] + for i in range(3): + bars.append( + { + "open_time_ms": base_old + i * period, + "open": 1, + "high": 2, + "low": 0.5, + "close": 1.5, + "volume": 1, + } + ) + for i in range(5): + bars.append( + { + "open_time_ms": base_new + i * period, + "open": 2, + "high": 3, + "low": 1.5, + "close": 2.5, + "volume": 2, + } + ) + trimmed, split = trim_contiguous_tail(bars, period) + self.assertEqual(split, 3) + self.assertEqual(len(trimmed), 5) + self.assertEqual(trimmed[0]["open_time_ms"], base_new) + + def test_resolve_drops_discontinuous_orphans(self): + init_db(self.db) + period = TIMEFRAME_MS["15m"] + now = int(time.time() * 1000) + old_ms = now - period * 800 + upsert_bars( + "okx", + "ONDO/USDT", + "15m", + [ + { + "open_time_ms": old_ms, + "open": 0.33, + "high": 0.34, + "low": 0.32, + "close": 0.335, + "volume": 100, + } + ], + self.db, + ) + recent = [] + start = now - period * 20 + for i in range(20): + recent.append( + { + "open_time_ms": start + i * period, + "open": 0.35, + "high": 0.36, + "low": 0.34, + "close": 0.355, + "volume": 50, + } + ) + + def remote_fetch(**kwargs): + return {"ok": True, "bars": recent, "price_tick": 0.0001} + + out = resolve_chart_bars( + "okx", + "ONDO/USDT", + "15m", + remote_fetch, + db_path=self.db, + limit=50, + ) + self.assertTrue(out.get("ok")) + candles = out.get("candles") or [] + self.assertGreaterEqual(len(candles), 19) + if len(candles) >= 2: + for i in range(1, len(candles)): + gap = candles[i]["time"] - candles[i - 1]["time"] + self.assertLessEqual(gap, int(period / 1000 * 1.5)) + def test_resolve_before_ms_exhausted(self): init_db(self.db)