"""中控 K 线库:分周期保留、聚合与分页读取。""" from __future__ import annotations import tempfile import time import unittest from pathlib import Path from hub_kline_store import ( HUB_KLINE_REMOTE_FETCH_CAP, _since_ms_for_span, clear_series_bars, init_db, load_bars_before, load_bars_latest, purge_retention, purge_timeframe_by_days, resolve_chart_bars, retention_days, trim_contiguous_tail, upsert_bars, ) from hub_ohlcv_lib import ( TIMEFRAME_MS, bar_limit_for_timeframe, chart_fetch_start_ms, chart_initial_limit, last_closed_bar_open_ms, window_start_ms, ) class TestHubKlineStore(unittest.TestCase): def setUp(self): self.tmp = tempfile.TemporaryDirectory() self.db = Path(self.tmp.name) / "test_hub_kline.db" def tearDown(self): self.tmp.cleanup() def test_bar_limits(self): self.assertEqual(bar_limit_for_timeframe("5m"), 5000) self.assertEqual(bar_limit_for_timeframe("1h"), 1000) self.assertEqual(bar_limit_for_timeframe("1d"), 1000) self.assertEqual(bar_limit_for_timeframe("1w"), 500) self.assertEqual(chart_initial_limit("5m"), 2000) self.assertEqual(chart_initial_limit("1h"), 1000) self.assertEqual(chart_initial_limit("1d"), 500) def test_chart_fetch_window_exceeds_retention(self): now = int(time.time() * 1000) need = bar_limit_for_timeframe("1d") fetch_start = chart_fetch_start_ms("1d", need, now) db_start = window_start_ms("1d", need, retention_days(), now) self.assertLess(fetch_start, db_start) def test_purge_retention_5m_one_year(self): init_db(self.db) old_ms = int(time.time() * 1000) - 400 * 86400000 upsert_bars( "okx", "BTC/USDT", "5m", [ { "open_time_ms": old_ms, "open": 1, "high": 2, "low": 0.5, "close": 1.5, "volume": 10, } ], self.db, ) n = purge_timeframe_by_days("5m", 365, self.db) self.assertGreaterEqual(n, 1) rows = load_bars_latest("okx", "BTC/USDT", "5m", 10, self.db) self.assertEqual(len(rows), 0) def test_purge_retention_keeps_1d(self): init_db(self.db) old_ms = int(time.time() * 1000) - 400 * 86400000 upsert_bars( "okx", "BTC/USDT", "1d", [ { "open_time_ms": old_ms, "open": 1, "high": 2, "low": 0.5, "close": 1.5, "volume": 10, } ], self.db, ) purge_retention(self.db) rows = load_bars_latest("okx", "BTC/USDT", "1d", 10, self.db) self.assertEqual(len(rows), 1) def test_resolve_uses_cache_without_remote(self): init_db(self.db) now = int(time.time() * 1000) tf = "5m" period = TIMEFRAME_MS[tf] last_closed = last_closed_bar_open_ms(tf, now) bars = [] for i in range(400): oms = last_closed - (399 - i) * period bars.append( { "open_time_ms": oms, "open": 100 + i, "high": 101 + i, "low": 99 + i, "close": 100.5 + i, "volume": 1000 + i, } ) upsert_bars("okx", "ETH/USDT", tf, bars, self.db) def remote_fetch(**kwargs): self.fail("不应请求交易所") out = resolve_chart_bars( "okx", "ETH/USDT", tf, remote_fetch, db_path=self.db, limit=300, ) self.assertTrue(out.get("ok")) self.assertEqual(len(out.get("candles") or []), 300) def test_resolve_15m_reads_native_bars(self): init_db(self.db) now = int(time.time() * 1000) period = TIMEFRAME_MS["15m"] last_closed = last_closed_bar_open_ms("15m", now) bars = [] for i in range(12): oms = last_closed - (11 - i) * period bars.append( { "open_time_ms": oms, "open": 1.0 + i, "high": 2.0 + i, "low": 0.5 + i, "close": 1.5 + i, "volume": 10.0, } ) upsert_bars("okx", "ETH/USDT", "15m", bars, self.db) def remote_fetch(**kwargs): self.fail("不应请求交易所") out = resolve_chart_bars( "okx", "ETH/USDT", "15m", remote_fetch, db_path=self.db, limit=10, ) self.assertTrue(out.get("ok")) self.assertEqual(out.get("source"), "db") self.assertEqual(out.get("storage_timeframe"), "15m") self.assertGreaterEqual(len(out.get("candles") or []), 10) def test_load_bars_before(self): init_db(self.db) period = TIMEFRAME_MS["1h"] base = 1_700_000_000_000 bars = [] for i in range(5): bars.append( { "open_time_ms": base + i * period, "open": 1, "high": 2, "low": 0.5, "close": 1.5, "volume": 1, } ) upsert_bars("okx", "BTC/USDT", "1h", bars, self.db) before = base + 3 * period got = load_bars_before("okx", "BTC/USDT", "1h", before, 2, self.db) self.assertEqual(len(got), 2) self.assertEqual(got[-1]["open_time_ms"], base + 2 * period) def test_trim_contiguous_tail_drops_orphan_prefix(self): period = TIMEFRAME_MS["15m"] base_old = 1_700_000_000_000 base_new = base_old + period * 500 bars = [] for i in range(3): bars.append( { "open_time_ms": base_old + i * period, "open": 1, "high": 2, "low": 0.5, "close": 1.5, "volume": 1, } ) for i in range(5): bars.append( { "open_time_ms": base_new + i * period, "open": 2, "high": 3, "low": 1.5, "close": 2.5, "volume": 2, } ) trimmed, split = trim_contiguous_tail(bars, period) self.assertEqual(split, 3) self.assertEqual(len(trimmed), 5) self.assertEqual(trimmed[0]["open_time_ms"], base_new) def test_resolve_drops_discontinuous_orphans(self): init_db(self.db) period = TIMEFRAME_MS["15m"] now = int(time.time() * 1000) old_ms = now - period * 800 upsert_bars( "okx", "ONDO/USDT", "15m", [ { "open_time_ms": old_ms, "open": 0.33, "high": 0.34, "low": 0.32, "close": 0.335, "volume": 100, } ], self.db, ) recent = [] start = now - period * 20 for i in range(20): recent.append( { "open_time_ms": start + i * period, "open": 0.35, "high": 0.36, "low": 0.34, "close": 0.355, "volume": 50, } ) def remote_fetch(**kwargs): return {"ok": True, "bars": recent, "price_tick": 0.0001} out = resolve_chart_bars( "okx", "ONDO/USDT", "15m", remote_fetch, db_path=self.db, limit=50, ) self.assertTrue(out.get("ok")) candles = out.get("candles") or [] self.assertGreaterEqual(len(candles), 19) if len(candles) >= 2: for i in range(1, len(candles)): gap = candles[i]["time"] - candles[i - 1]["time"] self.assertLessEqual(gap, int(period / 1000 * 3.0)) def test_resolve_refetches_when_db_has_discontinuous_full_count(self): init_db(self.db) period = TIMEFRAME_MS["15m"] now = int(time.time() * 1000) old_start = now - period * 3000 recent_start = now - period * 25 old_bars = [ { "open_time_ms": old_start + i * period, "open": 62000, "high": 62100, "low": 61900, "close": 62050, "volume": 10, } for i in range(500) ] recent = [ { "open_time_ms": recent_start + i * period, "open": 104000, "high": 104100, "low": 103900, "close": 104050, "volume": 20, } for i in range(30) ] upsert_bars("binance", "BTC/USDT", "15m", old_bars, self.db) upsert_bars("binance", "BTC/USDT", "15m", recent, self.db) fetch_calls = [] def remote_fetch(**kwargs): fetch_calls.append(dict(kwargs)) full = [] start = now - period * 120 for i in range(120): full.append( { "open_time_ms": start + i * period, "open": 104000 + i, "high": 104100 + i, "low": 103900 + i, "close": 104050 + i, "volume": 30, } ) return {"ok": True, "bars": full, "price_tick": 0.01} out = resolve_chart_bars( "binance", "BTC/USDT", "15m", remote_fetch, db_path=self.db, limit=2000, ) self.assertTrue(out.get("ok")) self.assertGreater(len(fetch_calls), 0) self.assertGreaterEqual(len(out.get("candles") or []), 100) self.assertGreater(int(out.get("fetched") or 0), 0) def test_clear_series_and_force_refetch(self): init_db(self.db) period = TIMEFRAME_MS["5m"] now = int(time.time() * 1000) stale = [ { "open_time_ms": now - period * (i + 100), "open": 1, "high": 2, "low": 0.5, "close": 1.5, "volume": 1, } for i in range(40) ] upsert_bars("binance", "BTC/USDT", "5m", stale, self.db) self.assertEqual(len(load_bars_latest("binance", "BTC/USDT", "5m", 100, self.db)), 40) removed = clear_series_bars("binance", "BTC/USDT", "5m", self.db) self.assertEqual(removed, 40) self.assertEqual(len(load_bars_latest("binance", "BTC/USDT", "5m", 100, self.db)), 0) fresh = [ { "open_time_ms": now - period * (20 - i), "open": 10, "high": 11, "low": 9, "close": 10.5, "volume": 2, } for i in range(20) ] def remote_fetch(**kwargs): return {"ok": True, "bars": fresh, "price_tick": 0.01} out = resolve_chart_bars( "binance", "BTC/USDT", "5m", remote_fetch, db_path=self.db, force_refresh=True, clear_db=True, limit=50, ) self.assertTrue(out.get("ok")) self.assertGreaterEqual(int(out.get("cleared") or 0), 0) self.assertGreater(int(out.get("fetched") or 0), 0) self.assertGreaterEqual(len(out.get("candles") or []), 19) def test_since_span_matches_fetch_limit_not_need(self): period = TIMEFRAME_MS["15m"] now_ms = 1_800_000_000_000 fetch_limit = HUB_KLINE_REMOTE_FETCH_CAP since = _since_ms_for_span( now_ms=now_ms, period_ms=period, span_bars=fetch_limit, cutoff_ms=0, ) self.assertEqual(since, now_ms - period * fetch_limit) wrong_since = now_ms - period * chart_initial_limit("15m") self.assertGreater(since, wrong_since) def test_thin_series_tail_refresh_fetches_full_window(self): init_db(self.db) period = TIMEFRAME_MS["15m"] now = int(time.time() * 1000) last_closed = last_closed_bar_open_ms("15m", now) bars = [ { "open_time_ms": last_closed - period * (150 - i), "open": 100000, "high": 100100, "low": 99900, "close": 100050, "volume": 1, } for i in range(150) ] fetch_calls: list[dict] = [] def remote_fetch(**kwargs): fetch_calls.append(dict(kwargs)) return {"ok": True, "bars": bars, "price_tick": 0.01} out = resolve_chart_bars( "binance", "BTC/USDT", "15m", remote_fetch, db_path=self.db, tail_refresh=True, ) self.assertTrue(out.get("ok")) self.assertGreaterEqual(len(out.get("candles") or []), 100) self.assertGreater(int(out.get("fetched") or 0), 0) self.assertTrue(any(int(c.get("limit") or 0) > 30 for c in fetch_calls)) def test_resolve_before_ms_exhausted(self): init_db(self.db) def remote_fetch(**kwargs): return {"ok": False, "msg": "no remote"} out = resolve_chart_bars( "okx", "BTC/USDT", "5m", remote_fetch, db_path=self.db, limit=100, before_ms=int(time.time() * 1000), ) self.assertTrue(out.get("ok")) self.assertEqual(out.get("candles"), []) self.assertTrue(out.get("exhausted")) if __name__ == "__main__": unittest.main()