diff --git a/hub_kline_store.py b/hub_kline_store.py index ca540fb..dcfab01 100644 --- a/hub_kline_store.py +++ b/hub_kline_store.py @@ -255,9 +255,9 @@ def resolve_chart_bars( remote_err: Optional[str] = None if need_fetch: - since = fetch_start_ms - if db_rows and not force_refresh: - since = min(since, int(db_rows[0]["open_time_ms"])) + since = None + if db_rows and not force_refresh and newest_ok and len(db_rows) >= need: + since = max(0, int(newest_db) - period_ms * 2) remote = remote_fetch( symbol=sym, timeframe=tf, diff --git a/hub_ohlcv_lib.py b/hub_ohlcv_lib.py index bfa0b82..b8eccc7 100644 --- a/hub_ohlcv_lib.py +++ b/hub_ohlcv_lib.py @@ -157,17 +157,21 @@ def fetch_ohlcv_for_hub( if since_ms is not None and int(since_ms) > 0: since = int(since_ms) guard = 0 - while len(collected) < want and guard < 20: + prev_since = None + while len(collected) < want and guard < 60: guard += 1 + req_limit = min(chunk_max, want - len(collected)) batch = exchange.fetch_ohlcv( - ex_sym, timeframe=tf, since=since, limit=min(chunk_max, want - len(collected)) + ex_sym, timeframe=tf, since=since, limit=req_limit ) if not batch: break collected.extend(batch) - since = int(batch[-1][0]) + 1 - if len(batch) < min(chunk_max, want - len(collected)): + next_since = int(batch[-1][0]) + 1 + if prev_since is not None and next_since <= prev_since: break + prev_since = since + since = next_since else: batch = exchange.fetch_ohlcv(ex_sym, timeframe=tf, limit=want) collected = list(batch or []) diff --git a/tests/test_hub_ohlcv_lib.py b/tests/test_hub_ohlcv_lib.py new file mode 100644 index 0000000..e91ea52 --- /dev/null +++ b/tests/test_hub_ohlcv_lib.py @@ -0,0 +1,56 @@ +"""hub_ohlcv_lib:分页拉取(Gate 等单次不足 chunk 时仍继续)。""" +from __future__ import annotations + +import unittest + +from hub_ohlcv_lib import fetch_ohlcv_for_hub + + +class _FakeExchange: + def __init__(self, pages): + self.pages = list(pages) + self.calls = [] + self.markets = {} + + def fetch_ohlcv(self, symbol, timeframe=None, since=None, limit=None): + self.calls.append({"symbol": symbol, "since": since, "limit": limit}) + if not self.pages: + return [] + page = self.pages.pop(0) + return [b for b in page if b[0] >= since] if since else page + + +class TestHubOhlcvLib(unittest.TestCase): + def test_pagination_continues_when_page_smaller_than_chunk(self): + """Gate 等常返回 299 根/次,不应误判为已到末尾。""" + base = 1_700_000_000_000 + step = 4 * 60 * 60 * 1000 + page1 = [ + [base + i * step, 1.0, 1.1, 0.9, 1.05, 100.0] for i in range(299) + ] + page2 = [ + [base + (299 + i) * step, 2.0, 2.1, 1.9, 2.05, 200.0] for i in range(299) + ] + page3 = [ + [base + (598 + i) * step, 3.0, 3.1, 2.9, 3.05, 300.0] for i in range(50) + ] + ex = _FakeExchange([page1, page2, page3]) + + out = fetch_ohlcv_for_hub( + symbol="INJ/USDT", + timeframe="4h", + since_ms=base, + limit=600, + normalize_symbol_input=lambda s: str(s).strip().upper(), + normalize_exchange_symbol=lambda s: f"{s}:USDT" if ":" not in s else s, + ensure_markets_loaded=lambda: None, + exchange=ex, + ) + self.assertTrue(out.get("ok")) + self.assertEqual(len(out.get("bars") or []), 600) + self.assertGreaterEqual(len(ex.calls), 3) + self.assertAlmostEqual(out["bars"][-1]["close"], 3.05) + + +if __name__ == "__main__": + unittest.main()