"""hub_ohlcv_lib:分页拉取(Gate 等单次不足 chunk 时仍继续)。""" from __future__ import annotations import unittest from hub_ohlcv_lib import ( aggregate_ohlcv_bars, bars_spacing_matches_timeframe, fetch_ohlcv_for_hub, normalize_price_tick, ) class _FakeExchange: def __init__(self, pages, *, timeframes=None): self.pages = list(pages) self.calls = [] self.markets = {} self.timeframes = timeframes if timeframes is not None else {} def fetch_ohlcv(self, symbol, timeframe=None, since=None, limit=None): self.calls.append( {"symbol": symbol, "since": since, "limit": limit, "timeframe": timeframe} ) if not self.pages: return [] page = self.pages.pop(0) if since is None: return page return [b for b in page if b[0] >= since] class TestHubOhlcvLib(unittest.TestCase): def test_normalize_price_tick_snaps_powers_of_ten(self): self.assertAlmostEqual(normalize_price_tick(0.00001), 0.00001) self.assertAlmostEqual(normalize_price_tick(0.001), 0.001) self.assertIsNone(normalize_price_tick(0)) def test_price_tick_from_decimal_precision(self): class _Ex: markets = {"BTC/USDT:USDT": {"precision": {"price": 2}, "info": {}, "limits": {}}} def load_markets(self): return self.markets def market(self, sym): return self.markets[sym] def price_to_precision(self, sym, price): return "12345.67" tick = __import__("hub_ohlcv_lib", fromlist=["price_tick_from_market"]).price_tick_from_market( _Ex(), "BTC/USDT:USDT" ) self.assertAlmostEqual(tick, 0.01) def test_price_tick_from_binance_price_filter(self): class _Ex: markets = { "BTC/USDT:USDT": { "precision": {"price": 2}, "info": { "filters": [ {"filterType": "PRICE_FILTER", "tickSize": "0.10"}, {"filterType": "LOT_SIZE", "stepSize": "0.001"}, ] }, "limits": {}, } } def load_markets(self): return self.markets def market(self, sym): return self.markets[sym] def price_to_precision(self, sym, price): return "12345.6" from hub_ohlcv_lib import price_tick_from_market tick = price_tick_from_market(_Ex(), "BTC/USDT:USDT") self.assertAlmostEqual(tick, 0.10) def test_price_tick_from_info_tick_size(self): class _Ex: markets = { "INJ/USDT:USDT": { "precision": {"price": 4}, "info": {"tickSize": "0.001"}, "limits": {}, } } def load_markets(self): return self.markets def market(self, sym): return self.markets[sym] def price_to_precision(self, sym, price): return "7.123" from hub_ohlcv_lib import price_tick_from_market tick = price_tick_from_market(_Ex(), "INJ/USDT:USDT") self.assertAlmostEqual(tick, 0.001) def test_full_fetch_without_since_paginates_okx_style(self): """OKX 等无 since 单次约 300 根,须分页至 limit。""" from hub_ohlcv_lib import TIMEFRAME_MS step = TIMEFRAME_MS["1h"] want = 1000 base = max(0, int(__import__("time").time() * 1000) - want * step) pages = [ [[base + i * step, 1.0, 1.1, 0.9, 1.05, 100.0] for i in range(300)], [[base + (300 + i) * step, 2.0, 2.1, 1.9, 2.05, 200.0] for i in range(300)], [[base + (600 + i) * step, 3.0, 3.1, 2.9, 3.05, 300.0] for i in range(300)], [[base + (900 + i) * step, 4.0, 4.1, 3.9, 4.05, 400.0] for i in range(100)], ] ex = _FakeExchange(pages) out = fetch_ohlcv_for_hub( symbol="ONDO/USDT", timeframe="1h", since_ms=None, limit=want, normalize_symbol_input=lambda s: str(s).strip().upper(), normalize_exchange_symbol=lambda s: f"{s}:USDT" if ":" not in s else s, ensure_markets_loaded=lambda: None, exchange=ex, ) self.assertTrue(out.get("ok")) self.assertEqual(len(out.get("bars") or []), 1000) self.assertGreaterEqual(len(ex.calls), 4) self.assertAlmostEqual(out["bars"][-1]["close"], 4.05) def test_pagination_continues_when_page_smaller_than_chunk(self): """Gate 等常返回 299 根/次,不应误判为已到末尾。""" base = 1_700_000_000_000 step = 4 * 60 * 60 * 1000 page1 = [ [base + i * step, 1.0, 1.1, 0.9, 1.05, 100.0] for i in range(299) ] page2 = [ [base + (299 + i) * step, 2.0, 2.1, 1.9, 2.05, 200.0] for i in range(299) ] page3 = [ [base + (598 + i) * step, 3.0, 3.1, 2.9, 3.05, 300.0] for i in range(50) ] ex = _FakeExchange([page1, page2, page3]) out = fetch_ohlcv_for_hub( symbol="INJ/USDT", timeframe="4h", since_ms=base, limit=600, normalize_symbol_input=lambda s: str(s).strip().upper(), normalize_exchange_symbol=lambda s: f"{s}:USDT" if ":" not in s else s, ensure_markets_loaded=lambda: None, exchange=ex, ) self.assertTrue(out.get("ok")) self.assertEqual(len(out.get("bars") or []), 600) self.assertGreaterEqual(len(ex.calls), 3) self.assertAlmostEqual(out["bars"][-1]["close"], 3.05) def test_pagination_stops_when_next_since_reaches_now(self): """Gate 等:分页 since 不得越过当前时间,避免 from>to。""" from hub_ohlcv_lib import TIMEFRAME_MS step = TIMEFRAME_MS["1d"] now_ms = int(__import__("time").time() * 1000) # 最后一页最后一根 K 的 next_since 将 >= now_ms,应停止不再请求 last_open = ((now_ms // step) - 2) * step page = [ [last_open - step, 1.0, 1.1, 0.9, 1.0, 10.0], [last_open, 1.1, 1.2, 1.0, 1.1, 11.0], ] ex = _FakeExchange([page]) out = fetch_ohlcv_for_hub( symbol="ONDO/USDT", timeframe="1d", since_ms=last_open - step * 5, limit=10, normalize_symbol_input=lambda s: str(s).strip().upper(), normalize_exchange_symbol=lambda s: f"{s}:USDT" if ":" not in s else s, ensure_markets_loaded=lambda: None, exchange=ex, ) self.assertTrue(out.get("ok")) self.assertGreaterEqual(len(out.get("bars") or []), 2) self.assertLessEqual(len(ex.calls), 4) def test_aggregate_ohlcv_bars_buckets(self): from hub_ohlcv_lib import TIMEFRAME_MS h1 = TIMEFRAME_MS["1h"] h4 = TIMEFRAME_MS["4h"] base = (1_700_000_000_000 // h4) * h4 src = [ { "open_time_ms": base + i * h1, "open": 1.0, "high": 2.0, "low": 0.5, "close": 1.5, "volume": 1.0, } for i in range(4) ] out = aggregate_ohlcv_bars(src, "4h") self.assertEqual(len(out), 1) self.assertEqual(out[0]["volume"], 4.0) self.assertEqual(out[0]["high"], 2.0) self.assertEqual(out[0]["low"], 0.5) if __name__ == "__main__": unittest.main()