1dcf62bb08
Co-authored-by: Cursor <cursoragent@cursor.com>
467 lines
14 KiB
Python
467 lines
14 KiB
Python
"""中控 K 线库:分周期保留、聚合与分页读取。"""
|
|
from __future__ import annotations
|
|
|
|
import tempfile
|
|
import time
|
|
import unittest
|
|
from pathlib import Path
|
|
|
|
from hub_kline_store import (
|
|
HUB_KLINE_REMOTE_FETCH_CAP,
|
|
_since_ms_for_span,
|
|
clear_series_bars,
|
|
init_db,
|
|
load_bars_before,
|
|
load_bars_latest,
|
|
purge_retention,
|
|
purge_timeframe_by_days,
|
|
resolve_chart_bars,
|
|
retention_days,
|
|
trim_contiguous_tail,
|
|
upsert_bars,
|
|
)
|
|
from hub_ohlcv_lib import (
|
|
TIMEFRAME_MS,
|
|
bar_limit_for_timeframe,
|
|
chart_fetch_start_ms,
|
|
chart_initial_limit,
|
|
last_closed_bar_open_ms,
|
|
window_start_ms,
|
|
)
|
|
|
|
|
|
class TestHubKlineStore(unittest.TestCase):
|
|
def setUp(self):
|
|
self.tmp = tempfile.TemporaryDirectory()
|
|
self.db = Path(self.tmp.name) / "test_hub_kline.db"
|
|
|
|
def tearDown(self):
|
|
self.tmp.cleanup()
|
|
|
|
def test_bar_limits(self):
|
|
self.assertEqual(bar_limit_for_timeframe("5m"), 5000)
|
|
self.assertEqual(bar_limit_for_timeframe("1h"), 1000)
|
|
self.assertEqual(bar_limit_for_timeframe("1d"), 1000)
|
|
self.assertEqual(bar_limit_for_timeframe("1w"), 500)
|
|
self.assertEqual(chart_initial_limit("5m"), 2000)
|
|
self.assertEqual(chart_initial_limit("1h"), 1000)
|
|
self.assertEqual(chart_initial_limit("1d"), 500)
|
|
|
|
def test_chart_fetch_window_exceeds_retention(self):
|
|
now = int(time.time() * 1000)
|
|
need = bar_limit_for_timeframe("1d")
|
|
fetch_start = chart_fetch_start_ms("1d", need, now)
|
|
db_start = window_start_ms("1d", need, retention_days(), now)
|
|
self.assertLess(fetch_start, db_start)
|
|
|
|
def test_purge_retention_5m_one_year(self):
|
|
init_db(self.db)
|
|
old_ms = int(time.time() * 1000) - 400 * 86400000
|
|
upsert_bars(
|
|
"okx",
|
|
"BTC/USDT",
|
|
"5m",
|
|
[
|
|
{
|
|
"open_time_ms": old_ms,
|
|
"open": 1,
|
|
"high": 2,
|
|
"low": 0.5,
|
|
"close": 1.5,
|
|
"volume": 10,
|
|
}
|
|
],
|
|
self.db,
|
|
)
|
|
n = purge_timeframe_by_days("5m", 365, self.db)
|
|
self.assertGreaterEqual(n, 1)
|
|
rows = load_bars_latest("okx", "BTC/USDT", "5m", 10, self.db)
|
|
self.assertEqual(len(rows), 0)
|
|
|
|
def test_purge_retention_keeps_1d(self):
|
|
init_db(self.db)
|
|
old_ms = int(time.time() * 1000) - 400 * 86400000
|
|
upsert_bars(
|
|
"okx",
|
|
"BTC/USDT",
|
|
"1d",
|
|
[
|
|
{
|
|
"open_time_ms": old_ms,
|
|
"open": 1,
|
|
"high": 2,
|
|
"low": 0.5,
|
|
"close": 1.5,
|
|
"volume": 10,
|
|
}
|
|
],
|
|
self.db,
|
|
)
|
|
purge_retention(self.db)
|
|
rows = load_bars_latest("okx", "BTC/USDT", "1d", 10, self.db)
|
|
self.assertEqual(len(rows), 1)
|
|
|
|
def test_resolve_uses_cache_without_remote(self):
|
|
init_db(self.db)
|
|
now = int(time.time() * 1000)
|
|
tf = "5m"
|
|
period = TIMEFRAME_MS[tf]
|
|
last_closed = last_closed_bar_open_ms(tf, now)
|
|
bars = []
|
|
for i in range(400):
|
|
oms = last_closed - (399 - i) * period
|
|
bars.append(
|
|
{
|
|
"open_time_ms": oms,
|
|
"open": 100 + i,
|
|
"high": 101 + i,
|
|
"low": 99 + i,
|
|
"close": 100.5 + i,
|
|
"volume": 1000 + i,
|
|
}
|
|
)
|
|
upsert_bars("okx", "ETH/USDT", tf, bars, self.db)
|
|
|
|
def remote_fetch(**kwargs):
|
|
self.fail("不应请求交易所")
|
|
|
|
out = resolve_chart_bars(
|
|
"okx",
|
|
"ETH/USDT",
|
|
tf,
|
|
remote_fetch,
|
|
db_path=self.db,
|
|
limit=300,
|
|
)
|
|
self.assertTrue(out.get("ok"))
|
|
self.assertEqual(len(out.get("candles") or []), 300)
|
|
|
|
def test_resolve_15m_reads_native_bars(self):
|
|
init_db(self.db)
|
|
now = int(time.time() * 1000)
|
|
period = TIMEFRAME_MS["15m"]
|
|
last_closed = last_closed_bar_open_ms("15m", now)
|
|
bars = []
|
|
for i in range(12):
|
|
oms = last_closed - (11 - i) * period
|
|
bars.append(
|
|
{
|
|
"open_time_ms": oms,
|
|
"open": 1.0 + i,
|
|
"high": 2.0 + i,
|
|
"low": 0.5 + i,
|
|
"close": 1.5 + i,
|
|
"volume": 10.0,
|
|
}
|
|
)
|
|
upsert_bars("okx", "ETH/USDT", "15m", bars, self.db)
|
|
|
|
def remote_fetch(**kwargs):
|
|
self.fail("不应请求交易所")
|
|
|
|
out = resolve_chart_bars(
|
|
"okx",
|
|
"ETH/USDT",
|
|
"15m",
|
|
remote_fetch,
|
|
db_path=self.db,
|
|
limit=10,
|
|
)
|
|
self.assertTrue(out.get("ok"))
|
|
self.assertEqual(out.get("source"), "db")
|
|
self.assertEqual(out.get("storage_timeframe"), "15m")
|
|
self.assertGreaterEqual(len(out.get("candles") or []), 10)
|
|
|
|
def test_load_bars_before(self):
|
|
init_db(self.db)
|
|
period = TIMEFRAME_MS["1h"]
|
|
base = 1_700_000_000_000
|
|
bars = []
|
|
for i in range(5):
|
|
bars.append(
|
|
{
|
|
"open_time_ms": base + i * period,
|
|
"open": 1,
|
|
"high": 2,
|
|
"low": 0.5,
|
|
"close": 1.5,
|
|
"volume": 1,
|
|
}
|
|
)
|
|
upsert_bars("okx", "BTC/USDT", "1h", bars, self.db)
|
|
before = base + 3 * period
|
|
got = load_bars_before("okx", "BTC/USDT", "1h", before, 2, self.db)
|
|
self.assertEqual(len(got), 2)
|
|
self.assertEqual(got[-1]["open_time_ms"], base + 2 * period)
|
|
|
|
def test_trim_contiguous_tail_drops_orphan_prefix(self):
|
|
period = TIMEFRAME_MS["15m"]
|
|
base_old = 1_700_000_000_000
|
|
base_new = base_old + period * 500
|
|
bars = []
|
|
for i in range(3):
|
|
bars.append(
|
|
{
|
|
"open_time_ms": base_old + i * period,
|
|
"open": 1,
|
|
"high": 2,
|
|
"low": 0.5,
|
|
"close": 1.5,
|
|
"volume": 1,
|
|
}
|
|
)
|
|
for i in range(5):
|
|
bars.append(
|
|
{
|
|
"open_time_ms": base_new + i * period,
|
|
"open": 2,
|
|
"high": 3,
|
|
"low": 1.5,
|
|
"close": 2.5,
|
|
"volume": 2,
|
|
}
|
|
)
|
|
trimmed, split = trim_contiguous_tail(bars, period)
|
|
self.assertEqual(split, 3)
|
|
self.assertEqual(len(trimmed), 5)
|
|
self.assertEqual(trimmed[0]["open_time_ms"], base_new)
|
|
|
|
def test_resolve_drops_discontinuous_orphans(self):
|
|
init_db(self.db)
|
|
period = TIMEFRAME_MS["15m"]
|
|
now = int(time.time() * 1000)
|
|
old_ms = now - period * 800
|
|
upsert_bars(
|
|
"okx",
|
|
"ONDO/USDT",
|
|
"15m",
|
|
[
|
|
{
|
|
"open_time_ms": old_ms,
|
|
"open": 0.33,
|
|
"high": 0.34,
|
|
"low": 0.32,
|
|
"close": 0.335,
|
|
"volume": 100,
|
|
}
|
|
],
|
|
self.db,
|
|
)
|
|
recent = []
|
|
start = now - period * 20
|
|
for i in range(20):
|
|
recent.append(
|
|
{
|
|
"open_time_ms": start + i * period,
|
|
"open": 0.35,
|
|
"high": 0.36,
|
|
"low": 0.34,
|
|
"close": 0.355,
|
|
"volume": 50,
|
|
}
|
|
)
|
|
|
|
def remote_fetch(**kwargs):
|
|
return {"ok": True, "bars": recent, "price_tick": 0.0001}
|
|
|
|
out = resolve_chart_bars(
|
|
"okx",
|
|
"ONDO/USDT",
|
|
"15m",
|
|
remote_fetch,
|
|
db_path=self.db,
|
|
limit=50,
|
|
)
|
|
self.assertTrue(out.get("ok"))
|
|
candles = out.get("candles") or []
|
|
self.assertGreaterEqual(len(candles), 19)
|
|
if len(candles) >= 2:
|
|
for i in range(1, len(candles)):
|
|
gap = candles[i]["time"] - candles[i - 1]["time"]
|
|
self.assertLessEqual(gap, int(period / 1000 * 3.0))
|
|
|
|
def test_resolve_refetches_when_db_has_discontinuous_full_count(self):
|
|
init_db(self.db)
|
|
period = TIMEFRAME_MS["15m"]
|
|
now = int(time.time() * 1000)
|
|
old_start = now - period * 3000
|
|
recent_start = now - period * 25
|
|
old_bars = [
|
|
{
|
|
"open_time_ms": old_start + i * period,
|
|
"open": 62000,
|
|
"high": 62100,
|
|
"low": 61900,
|
|
"close": 62050,
|
|
"volume": 10,
|
|
}
|
|
for i in range(500)
|
|
]
|
|
recent = [
|
|
{
|
|
"open_time_ms": recent_start + i * period,
|
|
"open": 104000,
|
|
"high": 104100,
|
|
"low": 103900,
|
|
"close": 104050,
|
|
"volume": 20,
|
|
}
|
|
for i in range(30)
|
|
]
|
|
upsert_bars("binance", "BTC/USDT", "15m", old_bars, self.db)
|
|
upsert_bars("binance", "BTC/USDT", "15m", recent, self.db)
|
|
fetch_calls = []
|
|
|
|
def remote_fetch(**kwargs):
|
|
fetch_calls.append(dict(kwargs))
|
|
full = []
|
|
start = now - period * 120
|
|
for i in range(120):
|
|
full.append(
|
|
{
|
|
"open_time_ms": start + i * period,
|
|
"open": 104000 + i,
|
|
"high": 104100 + i,
|
|
"low": 103900 + i,
|
|
"close": 104050 + i,
|
|
"volume": 30,
|
|
}
|
|
)
|
|
return {"ok": True, "bars": full, "price_tick": 0.01}
|
|
|
|
out = resolve_chart_bars(
|
|
"binance",
|
|
"BTC/USDT",
|
|
"15m",
|
|
remote_fetch,
|
|
db_path=self.db,
|
|
limit=2000,
|
|
)
|
|
self.assertTrue(out.get("ok"))
|
|
self.assertGreater(len(fetch_calls), 0)
|
|
self.assertGreaterEqual(len(out.get("candles") or []), 100)
|
|
self.assertGreater(int(out.get("fetched") or 0), 0)
|
|
|
|
def test_clear_series_and_force_refetch(self):
|
|
init_db(self.db)
|
|
period = TIMEFRAME_MS["5m"]
|
|
now = int(time.time() * 1000)
|
|
stale = [
|
|
{
|
|
"open_time_ms": now - period * (i + 100),
|
|
"open": 1,
|
|
"high": 2,
|
|
"low": 0.5,
|
|
"close": 1.5,
|
|
"volume": 1,
|
|
}
|
|
for i in range(40)
|
|
]
|
|
upsert_bars("binance", "BTC/USDT", "5m", stale, self.db)
|
|
self.assertEqual(len(load_bars_latest("binance", "BTC/USDT", "5m", 100, self.db)), 40)
|
|
removed = clear_series_bars("binance", "BTC/USDT", "5m", self.db)
|
|
self.assertEqual(removed, 40)
|
|
self.assertEqual(len(load_bars_latest("binance", "BTC/USDT", "5m", 100, self.db)), 0)
|
|
|
|
fresh = [
|
|
{
|
|
"open_time_ms": now - period * (20 - i),
|
|
"open": 10,
|
|
"high": 11,
|
|
"low": 9,
|
|
"close": 10.5,
|
|
"volume": 2,
|
|
}
|
|
for i in range(20)
|
|
]
|
|
|
|
def remote_fetch(**kwargs):
|
|
return {"ok": True, "bars": fresh, "price_tick": 0.01}
|
|
|
|
out = resolve_chart_bars(
|
|
"binance",
|
|
"BTC/USDT",
|
|
"5m",
|
|
remote_fetch,
|
|
db_path=self.db,
|
|
force_refresh=True,
|
|
clear_db=True,
|
|
limit=50,
|
|
)
|
|
self.assertTrue(out.get("ok"))
|
|
self.assertGreaterEqual(int(out.get("cleared") or 0), 0)
|
|
self.assertGreater(int(out.get("fetched") or 0), 0)
|
|
self.assertGreaterEqual(len(out.get("candles") or []), 19)
|
|
|
|
def test_since_span_matches_fetch_limit_not_need(self):
|
|
period = TIMEFRAME_MS["15m"]
|
|
now_ms = 1_800_000_000_000
|
|
fetch_limit = HUB_KLINE_REMOTE_FETCH_CAP
|
|
since = _since_ms_for_span(
|
|
now_ms=now_ms,
|
|
period_ms=period,
|
|
span_bars=fetch_limit,
|
|
cutoff_ms=0,
|
|
)
|
|
self.assertEqual(since, now_ms - period * fetch_limit)
|
|
wrong_since = now_ms - period * chart_initial_limit("15m")
|
|
self.assertGreater(since, wrong_since)
|
|
|
|
def test_thin_series_tail_refresh_fetches_full_window(self):
|
|
init_db(self.db)
|
|
period = TIMEFRAME_MS["15m"]
|
|
now = int(time.time() * 1000)
|
|
last_closed = last_closed_bar_open_ms("15m", now)
|
|
bars = [
|
|
{
|
|
"open_time_ms": last_closed - period * (150 - i),
|
|
"open": 100000,
|
|
"high": 100100,
|
|
"low": 99900,
|
|
"close": 100050,
|
|
"volume": 1,
|
|
}
|
|
for i in range(150)
|
|
]
|
|
fetch_calls: list[dict] = []
|
|
|
|
def remote_fetch(**kwargs):
|
|
fetch_calls.append(dict(kwargs))
|
|
return {"ok": True, "bars": bars, "price_tick": 0.01}
|
|
|
|
out = resolve_chart_bars(
|
|
"binance",
|
|
"BTC/USDT",
|
|
"15m",
|
|
remote_fetch,
|
|
db_path=self.db,
|
|
tail_refresh=True,
|
|
)
|
|
self.assertTrue(out.get("ok"))
|
|
self.assertGreaterEqual(len(out.get("candles") or []), 100)
|
|
self.assertGreater(int(out.get("fetched") or 0), 0)
|
|
self.assertTrue(any(int(c.get("limit") or 0) > 30 for c in fetch_calls))
|
|
|
|
def test_resolve_before_ms_exhausted(self):
|
|
init_db(self.db)
|
|
|
|
def remote_fetch(**kwargs):
|
|
return {"ok": False, "msg": "no remote"}
|
|
|
|
out = resolve_chart_bars(
|
|
"okx",
|
|
"BTC/USDT",
|
|
"5m",
|
|
remote_fetch,
|
|
db_path=self.db,
|
|
limit=100,
|
|
before_ms=int(time.time() * 1000),
|
|
)
|
|
self.assertTrue(out.get("ok"))
|
|
self.assertEqual(out.get("candles"), [])
|
|
self.assertTrue(out.get("exhausted"))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|