修复 Gate K 线分页:单次不足 300 根时继续拉取至最新

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-02 13:15:35 +08:00
parent 1f13638732
commit 73a51fd996
3 changed files with 67 additions and 7 deletions
+3 -3
View File
@@ -255,9 +255,9 @@ def resolve_chart_bars(
remote_err: Optional[str] = None remote_err: Optional[str] = None
if need_fetch: if need_fetch:
since = fetch_start_ms since = None
if db_rows and not force_refresh: if db_rows and not force_refresh and newest_ok and len(db_rows) >= need:
since = min(since, int(db_rows[0]["open_time_ms"])) since = max(0, int(newest_db) - period_ms * 2)
remote = remote_fetch( remote = remote_fetch(
symbol=sym, symbol=sym,
timeframe=tf, timeframe=tf,
+8 -4
View File
@@ -157,17 +157,21 @@ def fetch_ohlcv_for_hub(
if since_ms is not None and int(since_ms) > 0: if since_ms is not None and int(since_ms) > 0:
since = int(since_ms) since = int(since_ms)
guard = 0 guard = 0
while len(collected) < want and guard < 20: prev_since = None
while len(collected) < want and guard < 60:
guard += 1 guard += 1
req_limit = min(chunk_max, want - len(collected))
batch = exchange.fetch_ohlcv( batch = exchange.fetch_ohlcv(
ex_sym, timeframe=tf, since=since, limit=min(chunk_max, want - len(collected)) ex_sym, timeframe=tf, since=since, limit=req_limit
) )
if not batch: if not batch:
break break
collected.extend(batch) collected.extend(batch)
since = int(batch[-1][0]) + 1 next_since = int(batch[-1][0]) + 1
if len(batch) < min(chunk_max, want - len(collected)): if prev_since is not None and next_since <= prev_since:
break break
prev_since = since
since = next_since
else: else:
batch = exchange.fetch_ohlcv(ex_sym, timeframe=tf, limit=want) batch = exchange.fetch_ohlcv(ex_sym, timeframe=tf, limit=want)
collected = list(batch or []) collected = list(batch or [])
+56
View File
@@ -0,0 +1,56 @@
"""hub_ohlcv_lib:分页拉取(Gate 等单次不足 chunk 时仍继续)。"""
from __future__ import annotations
import unittest
from hub_ohlcv_lib import fetch_ohlcv_for_hub
class _FakeExchange:
def __init__(self, pages):
self.pages = list(pages)
self.calls = []
self.markets = {}
def fetch_ohlcv(self, symbol, timeframe=None, since=None, limit=None):
self.calls.append({"symbol": symbol, "since": since, "limit": limit})
if not self.pages:
return []
page = self.pages.pop(0)
return [b for b in page if b[0] >= since] if since else page
class TestHubOhlcvLib(unittest.TestCase):
def test_pagination_continues_when_page_smaller_than_chunk(self):
"""Gate 等常返回 299 根/次,不应误判为已到末尾。"""
base = 1_700_000_000_000
step = 4 * 60 * 60 * 1000
page1 = [
[base + i * step, 1.0, 1.1, 0.9, 1.05, 100.0] for i in range(299)
]
page2 = [
[base + (299 + i) * step, 2.0, 2.1, 1.9, 2.05, 200.0] for i in range(299)
]
page3 = [
[base + (598 + i) * step, 3.0, 3.1, 2.9, 3.05, 300.0] for i in range(50)
]
ex = _FakeExchange([page1, page2, page3])
out = fetch_ohlcv_for_hub(
symbol="INJ/USDT",
timeframe="4h",
since_ms=base,
limit=600,
normalize_symbol_input=lambda s: str(s).strip().upper(),
normalize_exchange_symbol=lambda s: f"{s}:USDT" if ":" not in s else s,
ensure_markets_loaded=lambda: None,
exchange=ex,
)
self.assertTrue(out.get("ok"))
self.assertEqual(len(out.get("bars") or []), 600)
self.assertGreaterEqual(len(ex.calls), 3)
self.assertAlmostEqual(out["bars"][-1]["close"], 3.05)
if __name__ == "__main__":
unittest.main()