Files
crypto_monitor/tests/test_hub_ohlcv_lib.py
T
dekun 3ac854d74c Remove 12h timeframe and stabilize chart wheel zoom.
Drop 12h from market chart options and storage, and avoid left-pan reload and tail refresh from resetting the viewport while zooming.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-08 07:54:37 +08:00

224 lines
7.6 KiB
Python

"""hub_ohlcv_lib:分页拉取(Gate 等单次不足 chunk 时仍继续)。"""
from __future__ import annotations
import unittest
from hub_ohlcv_lib import (
aggregate_ohlcv_bars,
bars_spacing_matches_timeframe,
fetch_ohlcv_for_hub,
normalize_price_tick,
)
class _FakeExchange:
def __init__(self, pages, *, timeframes=None):
self.pages = list(pages)
self.calls = []
self.markets = {}
self.timeframes = timeframes if timeframes is not None else {}
def fetch_ohlcv(self, symbol, timeframe=None, since=None, limit=None):
self.calls.append(
{"symbol": symbol, "since": since, "limit": limit, "timeframe": timeframe}
)
if not self.pages:
return []
page = self.pages.pop(0)
if since is None:
return page
return [b for b in page if b[0] >= since]
class TestHubOhlcvLib(unittest.TestCase):
def test_normalize_price_tick_snaps_powers_of_ten(self):
self.assertAlmostEqual(normalize_price_tick(0.00001), 0.00001)
self.assertAlmostEqual(normalize_price_tick(0.001), 0.001)
self.assertIsNone(normalize_price_tick(0))
def test_price_tick_from_decimal_precision(self):
class _Ex:
markets = {"BTC/USDT:USDT": {"precision": {"price": 2}, "info": {}, "limits": {}}}
def load_markets(self):
return self.markets
def market(self, sym):
return self.markets[sym]
def price_to_precision(self, sym, price):
return "12345.67"
tick = __import__("hub_ohlcv_lib", fromlist=["price_tick_from_market"]).price_tick_from_market(
_Ex(), "BTC/USDT:USDT"
)
self.assertAlmostEqual(tick, 0.01)
def test_price_tick_from_binance_price_filter(self):
class _Ex:
markets = {
"BTC/USDT:USDT": {
"precision": {"price": 2},
"info": {
"filters": [
{"filterType": "PRICE_FILTER", "tickSize": "0.10"},
{"filterType": "LOT_SIZE", "stepSize": "0.001"},
]
},
"limits": {},
}
}
def load_markets(self):
return self.markets
def market(self, sym):
return self.markets[sym]
def price_to_precision(self, sym, price):
return "12345.6"
from hub_ohlcv_lib import price_tick_from_market
tick = price_tick_from_market(_Ex(), "BTC/USDT:USDT")
self.assertAlmostEqual(tick, 0.10)
def test_price_tick_from_info_tick_size(self):
class _Ex:
markets = {
"INJ/USDT:USDT": {
"precision": {"price": 4},
"info": {"tickSize": "0.001"},
"limits": {},
}
}
def load_markets(self):
return self.markets
def market(self, sym):
return self.markets[sym]
def price_to_precision(self, sym, price):
return "7.123"
from hub_ohlcv_lib import price_tick_from_market
tick = price_tick_from_market(_Ex(), "INJ/USDT:USDT")
self.assertAlmostEqual(tick, 0.001)
def test_full_fetch_without_since_paginates_okx_style(self):
"""OKX 等无 since 单次约 300 根,须分页至 limit。"""
from hub_ohlcv_lib import TIMEFRAME_MS
step = TIMEFRAME_MS["1h"]
want = 1000
base = max(0, int(__import__("time").time() * 1000) - want * step)
pages = [
[[base + i * step, 1.0, 1.1, 0.9, 1.05, 100.0] for i in range(300)],
[[base + (300 + i) * step, 2.0, 2.1, 1.9, 2.05, 200.0] for i in range(300)],
[[base + (600 + i) * step, 3.0, 3.1, 2.9, 3.05, 300.0] for i in range(300)],
[[base + (900 + i) * step, 4.0, 4.1, 3.9, 4.05, 400.0] for i in range(100)],
]
ex = _FakeExchange(pages)
out = fetch_ohlcv_for_hub(
symbol="ONDO/USDT",
timeframe="1h",
since_ms=None,
limit=want,
normalize_symbol_input=lambda s: str(s).strip().upper(),
normalize_exchange_symbol=lambda s: f"{s}:USDT" if ":" not in s else s,
ensure_markets_loaded=lambda: None,
exchange=ex,
)
self.assertTrue(out.get("ok"))
self.assertEqual(len(out.get("bars") or []), 1000)
self.assertGreaterEqual(len(ex.calls), 4)
self.assertAlmostEqual(out["bars"][-1]["close"], 4.05)
def test_pagination_continues_when_page_smaller_than_chunk(self):
"""Gate 等常返回 299 根/次,不应误判为已到末尾。"""
base = 1_700_000_000_000
step = 4 * 60 * 60 * 1000
page1 = [
[base + i * step, 1.0, 1.1, 0.9, 1.05, 100.0] for i in range(299)
]
page2 = [
[base + (299 + i) * step, 2.0, 2.1, 1.9, 2.05, 200.0] for i in range(299)
]
page3 = [
[base + (598 + i) * step, 3.0, 3.1, 2.9, 3.05, 300.0] for i in range(50)
]
ex = _FakeExchange([page1, page2, page3])
out = fetch_ohlcv_for_hub(
symbol="INJ/USDT",
timeframe="4h",
since_ms=base,
limit=600,
normalize_symbol_input=lambda s: str(s).strip().upper(),
normalize_exchange_symbol=lambda s: f"{s}:USDT" if ":" not in s else s,
ensure_markets_loaded=lambda: None,
exchange=ex,
)
self.assertTrue(out.get("ok"))
self.assertEqual(len(out.get("bars") or []), 600)
self.assertGreaterEqual(len(ex.calls), 3)
self.assertAlmostEqual(out["bars"][-1]["close"], 3.05)
def test_pagination_stops_when_next_since_reaches_now(self):
"""Gate 等:分页 since 不得越过当前时间,避免 from>to。"""
from hub_ohlcv_lib import TIMEFRAME_MS
step = TIMEFRAME_MS["1d"]
now_ms = int(__import__("time").time() * 1000)
# 最后一页最后一根 K 的 next_since 将 >= now_ms,应停止不再请求
last_open = ((now_ms // step) - 2) * step
page = [
[last_open - step, 1.0, 1.1, 0.9, 1.0, 10.0],
[last_open, 1.1, 1.2, 1.0, 1.1, 11.0],
]
ex = _FakeExchange([page])
out = fetch_ohlcv_for_hub(
symbol="ONDO/USDT",
timeframe="1d",
since_ms=last_open - step * 5,
limit=10,
normalize_symbol_input=lambda s: str(s).strip().upper(),
normalize_exchange_symbol=lambda s: f"{s}:USDT" if ":" not in s else s,
ensure_markets_loaded=lambda: None,
exchange=ex,
)
self.assertTrue(out.get("ok"))
self.assertGreaterEqual(len(out.get("bars") or []), 2)
self.assertLessEqual(len(ex.calls), 4)
def test_aggregate_ohlcv_bars_buckets(self):
from hub_ohlcv_lib import TIMEFRAME_MS
h1 = TIMEFRAME_MS["1h"]
h4 = TIMEFRAME_MS["4h"]
base = (1_700_000_000_000 // h4) * h4
src = [
{
"open_time_ms": base + i * h1,
"open": 1.0,
"high": 2.0,
"low": 0.5,
"close": 1.5,
"volume": 1.0,
}
for i in range(4)
]
out = aggregate_ohlcv_bars(src, "4h")
self.assertEqual(len(out), 1)
self.assertEqual(out[0]["volume"], 4.0)
self.assertEqual(out[0]["high"], 2.0)
self.assertEqual(out[0]["low"], 0.5)
if __name__ == "__main__":
unittest.main()