# crypto_monitor/tests/test_hub_kline_store.py

"""Hub K-line store: 15-day rolling retention and on-demand merging."""
from __future__ import annotations
import sqlite3
import tempfile
import unittest
from pathlib import Path
from hub_kline_store import (
bar_limit_for_timeframe,
load_bars_range,
purge_retention,
resolve_chart_bars,
retention_days,
upsert_bars,
window_start_ms,
)
from hub_ohlcv_lib import TIMEFRAME_MS, chart_fetch_start_ms, window_start_ms
class TestHubKlineStore(unittest.TestCase):
    """Tests for the hub K-line store helpers.

    Every test gets a fresh temporary directory and a database path inside
    it, so no state is shared between tests.
    """

    def setUp(self):
        """Create a temporary directory and build the test database path."""
        self.tmp = tempfile.TemporaryDirectory()
        self.db = Path(self.tmp.name) / "test_hub_kline.db"

    def tearDown(self):
        """Remove the temporary directory and everything written into it."""
        self.tmp.cleanup()

    def test_bar_limits(self):
        """Each timeframe maps to its expected bar limit."""
        expected_limits = {"5m": 1000, "1h": 1000, "1d": 500, "1w": 500}
        for timeframe, limit in expected_limits.items():
            # subTest reports every mismatching timeframe instead of
            # stopping at the first failed assertion.
            with self.subTest(timeframe=timeframe):
                self.assertEqual(bar_limit_for_timeframe(timeframe), limit)

    def test_chart_fetch_window_exceeds_retention(self):
        """For 1d bars the chart fetch start precedes the DB window start."""
        import time

        now = int(time.time() * 1000)
        need = bar_limit_for_timeframe("1d")
        fetch_start = chart_fetch_start_ms("1d", need, now)
        # NOTE(review): window_start_ms is imported from both hub_kline_store
        # and hub_ohlcv_lib at the top of this file; the later import is the
        # one bound here. Confirm that is the intended function.
        db_start = window_start_ms("1d", need, retention_days(), now)
        self.assertLess(fetch_start, db_start)

    def test_purge_retention(self):
        """A bar older than the retention period is removed by the purge."""
        import time

        from hub_kline_store import init_db

        ms_per_day = 86400000
        init_db(self.db)
        # 20 days old, i.e. outside the 15-day retention passed below.
        old_ms = int(time.time() * 1000) - 20 * ms_per_day
        stale_bar = {
            "open_time_ms": old_ms,
            "open": 1,
            "high": 2,
            "low": 0.5,
            "close": 1.5,
            "volume": 10,
        }
        upsert_bars("okx", "BTC/USDT", "5m", [stale_bar], self.db)

        n = purge_retention(self.db, days=15)
        self.assertGreaterEqual(n, 1)

        rows = load_bars_range(
            "okx", "BTC/USDT", "5m", old_ms - 1, old_ms + 1, self.db
        )
        self.assertEqual(len(rows), 0)

    def test_resolve_uses_cache_without_remote(self):
        """A fully cached window is served without calling the remote fetch."""
        import time

        from hub_kline_store import init_db
        from hub_ohlcv_lib import last_closed_bar_open_ms

        init_db(self.db)
        # NOTE(review): `now` is sampled here and presumably again inside
        # resolve_chart_bars; a run that straddles a 5m boundary could make
        # the cache look one bar short. Confirm whether that can happen.
        now = int(time.time() * 1000)
        tf = "5m"
        period = TIMEFRAME_MS[tf]
        last_closed = last_closed_bar_open_ms(tf, now)

        # 1000 consecutive bars ending at the last closed bar.
        bar_count = 1000
        bars = [
            {
                "open_time_ms": last_closed - (bar_count - 1 - i) * period,
                "open": 100 + i,
                "high": 101 + i,
                "low": 99 + i,
                "close": 100.5 + i,
                "volume": 1000 + i,
            }
            for i in range(bar_count)
        ]
        upsert_bars("okx", "ETH/USDT", tf, bars, self.db)

        def remote_fetch(*args, **kwargs):
            # Accept any call shape so a positional call still produces the
            # intended failure rather than a TypeError.
            # The message reads "should not request the exchange".
            self.fail("不应请求交易所")

        out = resolve_chart_bars(
            "okx", "ETH/USDT", tf, remote_fetch, db_path=self.db
        )
        self.assertTrue(out.get("ok"))
        self.assertGreaterEqual(len(out.get("candles") or []), bar_count)
if __name__ == "__main__":
    # Allow running this test module directly as a script.
    unittest.main()