diff --git a/hub_kline_store.py b/hub_kline_store.py index 7717345..f0c7a26 100644 --- a/hub_kline_store.py +++ b/hub_kline_store.py @@ -349,7 +349,7 @@ def trim_contiguous_tail( bars: list[dict[str, Any]], period_ms: int, *, - max_gap_factor: float = 1.5, + max_gap_factor: float = 3.0, ) -> tuple[list[dict[str, Any]], int]: """只保留最近一段连续 K 线,丢弃左侧与主段断开的孤立数据。""" if len(bars) <= 1: @@ -368,6 +368,31 @@ def trim_contiguous_tail( return bars[split:], split +def normalize_contiguous_db_rows( + bars: list[dict[str, Any]], + *, + period_ms: int, + exchange_key: str, + symbol: str, + timeframe: str, + db_path: Path | None = None, + purge_orphans: bool = True, +) -> list[dict[str, Any]]: + """去掉与主段断开的孤立前缀;可选同步清理库内孤立数据。""" + if len(bars) <= 1: + return list(bars) + trimmed, split_at = trim_contiguous_tail(bars, period_ms) + if split_at > 0 and purge_orphans: + purge_bars_open_before( + exchange_key, + symbol, + timeframe, + int(trimmed[0]["open_time_ms"]), + db_path, + ) + return trimmed + + def purge_bars_open_before( exchange_key: str, symbol: str, @@ -494,6 +519,15 @@ def resolve_chart_bars( db_rows: list[dict[str, Any]] = [] if not force_refresh: db_rows = load_display_rows() + if not is_history and db_rows: + db_rows = normalize_contiguous_db_rows( + db_rows, + period_ms=period_display, + exchange_key=ex_k, + symbol=sym, + timeframe=storage_tf, + db_path=db_path, + ) last_closed = last_closed_bar_open_ms(display_tf, now_ms) newest_db = db_rows[-1]["open_time_ms"] if db_rows else None @@ -502,7 +536,9 @@ def resolve_chart_bars( else: newest_ok = newest_db is not None and int(newest_db) >= int(last_closed) - period_display - need_fetch = force_refresh or (not is_history and (len(db_rows) < need or not newest_ok)) + need_fetch = force_refresh or ( + not is_history and (len(db_rows) < need or not newest_ok) + ) if is_history and len(db_rows) < need: need_fetch = True @@ -544,6 +580,15 @@ def resolve_chart_bars( if price_tick is not None: save_symbol_price_tick(ex_k, sym, price_tick, db_path) db_rows = load_display_rows() + if not is_history and db_rows: + db_rows = normalize_contiguous_db_rows( + db_rows, + period_ms=period_display, + exchange_key=ex_k, + symbol=sym, + timeframe=storage_tf, + db_path=db_path, + ) else: remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败" if not db_rows: @@ -580,42 +625,57 @@ def resolve_chart_bars( except Exception: pass - if not is_history and db_rows and len(db_rows) > 1: - trimmed, split_at = trim_contiguous_tail(db_rows, period_display) - if split_at > 0: - purge_bars_open_before( - ex_k, sym, storage_tf, int(trimmed[0]["open_time_ms"]), db_path - ) - db_rows = trimmed + if not is_history and db_rows: + db_rows = normalize_contiguous_db_rows( + db_rows, + period_ms=period_display, + exchange_key=ex_k, + symbol=sym, + timeframe=storage_tf, + db_path=db_path, + ) - if ( - not is_history - and db_rows - and len(db_rows) < need - and not force_refresh - ): - oldest = int(db_rows[0]["open_time_ms"]) + if not is_history and len(db_rows) < need: missing = need - len(db_rows) - backfill_since = max(cutoff, oldest - period_storage * (missing + 40)) + if db_rows: + oldest = int(db_rows[0]["open_time_ms"]) + backfill_since = max(cutoff, oldest - period_storage * (missing + 40)) + backfill_limit = min(missing + 60, 1500) + else: + backfill_since = max( + cutoff, now_ms - period_storage * min(need, seed_bar_target(storage_tf)) + ) + backfill_limit = min(need + 20, 1500) try: remote_back = remote_fetch( symbol=sym, timeframe=storage_tf, since_ms=backfill_since, - limit=min(missing + 60, 1500), + limit=backfill_limit, ) if remote_back.get("ok") and remote_back.get("bars"): fetched += upsert_bars(ex_k, sym, storage_tf, remote_back["bars"], db_path) + if remote_back.get("price_tick") is not None: + price_tick = remote_back.get("price_tick") + save_symbol_price_tick(ex_k, sym, price_tick, db_path) db_rows = load_display_rows() - if len(db_rows) > 1: - trimmed, split_at = trim_contiguous_tail(db_rows, period_display) - if split_at > 0: - purge_bars_open_before( - ex_k, sym, storage_tf, int(trimmed[0]["open_time_ms"]), db_path - ) - db_rows = trimmed - except Exception: - pass + db_rows = normalize_contiguous_db_rows( + db_rows, + period_ms=period_display, + exchange_key=ex_k, + symbol=sym, + timeframe=storage_tf, + db_path=db_path, + ) + elif not remote_err: + remote_err = ( + remote_back.get("msg") + or remote_back.get("error") + or "实例补拉 K 线失败" + ) + except Exception as e: + if not remote_err: + remote_err = str(e) price_tick = normalize_price_tick(price_tick) if db_rows and price_tick is not None: diff --git a/manual_trading_hub/static/index.html b/manual_trading_hub/static/index.html index 462b58a..cb8342c 100644 --- a/manual_trading_hub/static/index.html +++ b/manual_trading_hub/static/index.html @@ -349,7 +349,7 @@
- + diff --git a/tests/test_hub_kline_store.py b/tests/test_hub_kline_store.py index c18ed95..d7749e0 100644 --- a/tests/test_hub_kline_store.py +++ b/tests/test_hub_kline_store.py @@ -275,7 +275,69 @@ class TestHubKlineStore(unittest.TestCase): if len(candles) >= 2: for i in range(1, len(candles)): gap = candles[i]["time"] - candles[i - 1]["time"] - self.assertLessEqual(gap, int(period / 1000 * 1.5)) + self.assertLessEqual(gap, int(period / 1000 * 3.0)) + + def test_resolve_refetches_when_db_has_discontinuous_full_count(self): + init_db(self.db) + period = TIMEFRAME_MS["15m"] + now = int(time.time() * 1000) + old_start = now - period * 3000 + recent_start = now - period * 25 + old_bars = [ + { + "open_time_ms": old_start + i * period, + "open": 62000, + "high": 62100, + "low": 61900, + "close": 62050, + "volume": 10, + } + for i in range(500) + ] + recent = [ + { + "open_time_ms": recent_start + i * period, + "open": 104000, + "high": 104100, + "low": 103900, + "close": 104050, + "volume": 20, + } + for i in range(30) + ] + upsert_bars("binance", "BTC/USDT", "15m", old_bars, self.db) + upsert_bars("binance", "BTC/USDT", "15m", recent, self.db) + fetch_calls = [] + + def remote_fetch(**kwargs): + fetch_calls.append(dict(kwargs)) + full = [] + start = now - period * 120 + for i in range(120): + full.append( + { + "open_time_ms": start + i * period, + "open": 104000 + i, + "high": 104100 + i, + "low": 103900 + i, + "close": 104050 + i, + "volume": 30, + } + ) + return {"ok": True, "bars": full, "price_tick": 0.01} + + out = resolve_chart_bars( + "binance", + "BTC/USDT", + "15m", + remote_fetch, + db_path=self.db, + limit=2000, + ) + self.assertTrue(out.get("ok")) + self.assertGreater(len(fetch_calls), 0) + self.assertGreaterEqual(len(out.get("candles") or []), 100) + self.assertGreater(int(out.get("fetched") or 0), 0) def test_resolve_before_ms_exhausted(self): init_db(self.db)