diff --git a/hub_kline_store.py b/hub_kline_store.py index 3a15832..425910c 100644 --- a/hub_kline_store.py +++ b/hub_kline_store.py @@ -1,4 +1,4 @@ -"""中控 K 线 SQLite:分周期保留、本地聚合、分页读取。""" +"""中控 K 线 SQLite:分周期保留、交易所直拉、分页读取。""" from __future__ import annotations @@ -12,9 +12,7 @@ from hub_ohlcv_lib import ( HUB_KLINE_1M_MAX_BARS, HUB_KLINE_5M_1H_RETENTION_DAYS, TIMEFRAME_MS, - aggregate_ohlcv_bars, - aggregate_ratio, - aggregation_source_for_display, + YEAR_ROLLING_STORED, chart_chunk_limit, chart_initial_limit, chart_memory_cap, @@ -26,7 +24,6 @@ from hub_ohlcv_lib import ( retention_policy_meta, round_ohlcv_bars_to_tick, seed_bar_target, - sync_timeframe_for_display, ) _DEFAULT_RETENTION_DAYS = 15 @@ -200,10 +197,10 @@ def purge_1m_bar_cap(db_path: Path | None = None, *, max_bars: int | None = None def purge_retention(db_path: Path | None = None) -> int: - """按周期策略清理:5m/1h 一年;1m 保留最近 N 根;1d/1w 不删。""" + """按周期策略清理:5m/15m/1h/2h/4h 一年;1m 保留最近 N 根;1d/1w 不删。""" n = 0 - n += purge_timeframe_by_days("5m", HUB_KLINE_5M_1H_RETENTION_DAYS, db_path) - n += purge_timeframe_by_days("1h", HUB_KLINE_5M_1H_RETENTION_DAYS, db_path) + for tf in sorted(YEAR_ROLLING_STORED): + n += purge_timeframe_by_days(tf, HUB_KLINE_5M_1H_RETENTION_DAYS, db_path) n += purge_1m_bar_cap(db_path) return n @@ -400,19 +397,6 @@ def _trim_display_bars( return bars -def _aggregate_display_bars( - src_bars: list[dict[str, Any]], - display_tf: str, - *, - need: int, - before_ms: int | None, -) -> list[dict[str, Any]]: - if not src_bars: - return [] - agg = aggregate_ohlcv_bars(src_bars, display_tf) - return _trim_display_bars(agg, need=need, before_ms=before_ms) - - def resolve_chart_bars( exchange_key: str, symbol: str, @@ -427,7 +411,7 @@ def resolve_chart_bars( ) -> dict[str, Any]: """ 分页读库:首屏 / 左拖 before_ms / 尾部 tail_refresh。 - 15m←5m,2h/4h←1h 现场聚合;其余直读入库周期。 + 各展示周期均直读交易所同步入库的同名 K 线。 """ init_db(db_path) purged = purge_retention(db_path) @@ -438,8 +422,7 @@ def resolve_chart_bars( if not sym or not ex_k: return {"ok": False, "msg": "缺少 exchange 或 symbol"} - agg_src = aggregation_source_for_display(display_tf) - storage_tf = agg_src or sync_timeframe_for_display(display_tf) + storage_tf = display_tf is_history = before_ms is not None and int(before_ms) > 0 need = int( limit @@ -450,24 +433,14 @@ def resolve_chart_bars( now_ms = int(time.time() * 1000) period_display = TIMEFRAME_MS[display_tf] period_storage = TIMEFRAME_MS[storage_tf] - ratio = aggregate_ratio(display_tf, storage_tf) if agg_src else 1 if tail_refresh and not is_history: - need = min(need, max(30, ratio * 6 if agg_src else 20)) - src_need = need * ratio + ratio * 4 + need = min(need, 30) cutoff = history_cutoff_ms_for_storage(storage_tf, now_ms) - source_kind = "aggregate" if agg_src else "db" def load_display_rows() -> list[dict[str, Any]]: - if agg_src: - if is_history: - src = load_bars_before(ex_k, sym, storage_tf, int(before_ms), src_need, db_path) - else: - src = load_bars_latest(ex_k, sym, storage_tf, src_need, db_path) - return _aggregate_display_bars( - src, display_tf, need=need, before_ms=before_ms if is_history else None - ) if is_history: - return load_bars_before(ex_k, sym, storage_tf, int(before_ms), need, db_path) + rows = load_bars_before(ex_k, sym, storage_tf, int(before_ms), need, db_path) + return _trim_display_bars(rows, need=need, before_ms=int(before_ms)) return load_bars_latest(ex_k, sym, storage_tf, need, db_path) db_rows: list[dict[str, Any]] = [] @@ -498,20 +471,16 @@ def resolve_chart_bars( if is_history: bms = int(before_ms) anchor = bms - period_display - since = max(cutoff, anchor - period_storage * src_need) - fetch_limit = min(src_need + 20, 1500) + since = max(cutoff, anchor - period_storage * need) + fetch_limit = min(need + 20, 1500) elif tail_only: - if agg_src: - src_tail = load_bars_latest(ex_k, sym, storage_tf, 5, db_path) - anchor_ms = int(src_tail[-1]["open_time_ms"]) if src_tail else now_ms - else: - anchor_ms = int(newest_db) if newest_db is not None else now_ms - since = max(cutoff, anchor_ms - period_storage * max(5, ratio * 3)) - fetch_limit = min(max(20, ratio * 8), 300) + anchor_ms = int(newest_db) if newest_db is not None else now_ms + since = max(cutoff, anchor_ms - period_storage * 5) + fetch_limit = min(need + 20, 300) else: - since = max(cutoff, now_ms - period_storage * min(src_need, seed_bar_target(storage_tf))) + since = max(cutoff, now_ms - period_storage * min(need, seed_bar_target(storage_tf))) fetch_limit = min( - seed_bar_target(storage_tf) if force_refresh else src_need + 20, + seed_bar_target(storage_tf) if force_refresh else need + 20, 1500, ) @@ -527,8 +496,6 @@ def resolve_chart_bars( if price_tick is not None: save_symbol_price_tick(ex_k, sym, price_tick, db_path) db_rows = load_display_rows() - if fetched: - source_kind = "remote" if source_kind == "db" else source_kind else: remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败" if not db_rows: @@ -589,7 +556,7 @@ def resolve_chart_bars( "oldest_ms": oldest_ms, "newest_ms": newest_ms, "exhausted": exhausted, - "source": "remote" if fetched else source_kind, + "source": "remote" if fetched else "db", "retention_policy": retention_policy_meta(), "candles": candles, "from_cache": from_cache, diff --git a/hub_ohlcv_lib.py b/hub_ohlcv_lib.py index 49ca062..98ca998 100644 --- a/hub_ohlcv_lib.py +++ b/hub_ohlcv_lib.py @@ -31,17 +31,13 @@ CHART_TIMEFRAME_ORDER = ( ) DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"}) -# 入库 / 同步真源(交易所拉取) -STORED_TIMEFRAMES = frozenset({"1m", "5m", "1h", "1d", "1w"}) +# 入库 / 同步真源(各周期直拉交易所,不做本地聚合) +STORED_TIMEFRAMES = frozenset(CHART_TIMEFRAMES) PERMANENT_STORED_TIMEFRAMES = frozenset({"1d", "1w"}) -YEAR_ROLLING_STORED = frozenset({"5m", "1h"}) +YEAR_ROLLING_STORED = frozenset({"5m", "15m", "1h", "2h", "4h"}) -# 展示周期 → 本地聚合源(不落库) -CHART_DISPLAY_AGGREGATE_FROM: dict[str, str] = { - "15m": "5m", - "2h": "1h", - "4h": "1h", -} +# 行情区不做展示周期聚合;保留空映射供兼容读取 +CHART_DISPLAY_AGGREGATE_FROM: dict[str, str] = {} SMALL_DISPLAY_TFS = frozenset({"1m", "5m", "15m"}) MID_DISPLAY_TFS = frozenset({"1h", "2h", "4h"}) @@ -151,13 +147,17 @@ def seed_bar_target(storage_tf: str) -> int: def retention_policy_meta() -> dict[str, Any]: + year = {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS} return { "1m": {"mode": "bars", "max_bars": HUB_KLINE_1M_MAX_BARS}, - "5m": {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS}, - "1h": {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS}, + "5m": dict(year), + "15m": dict(year), + "1h": dict(year), + "2h": dict(year), + "4h": dict(year), "1d": {"mode": "permanent"}, "1w": {"mode": "permanent"}, - "aggregate_from": dict(CHART_DISPLAY_AGGREGATE_FROM), + "aggregate_from": {}, } @@ -399,6 +399,68 @@ def align_bar_open_ms(open_time_ms: int, period_ms: int) -> int: return (int(open_time_ms) // period_ms) * period_ms +def snap_to_bar_grid(ts_ms: int, origin_ms: int, step_ms: int) -> int: + step = max(1, int(step_ms)) + origin = int(origin_ms) + if ts_ms <= origin: + return origin + idx = (int(ts_ms) - origin + step - 1) // step + return origin + idx * step + + +def fill_missing_ohlcv_bars( + bars: list[dict[str, Any]], + period_ms: int, + start_ms: int | None = None, + end_ms: int | None = None, +) -> list[dict[str, Any]]: + """细周期缺口用上一根收盘价填平,保证聚合后 K 线时间轴连续。""" + by_ts: dict[int, dict[str, Any]] = {} + for b in bars or []: + try: + by_ts[int(b["open_time_ms"])] = b + except (KeyError, TypeError, ValueError): + continue + if not by_ts: + return [] + keys = sorted(by_ts.keys()) + step_ms = max(1, int(period_ms)) + origin = keys[0] + aligned_start = snap_to_bar_grid( + int(start_ms if start_ms is not None else keys[0]), origin, step_ms + ) + aligned_end = max( + int(end_ms if end_ms is not None else keys[-1]), + keys[-1], + ) + out: list[dict[str, Any]] = [] + last: dict[str, Any] | None = None + for ts_key in keys: + if ts_key <= aligned_start: + last = by_ts[ts_key] + ts = aligned_start + while ts <= aligned_end: + cur = by_ts.get(ts) + if cur is not None: + last = cur + out.append(cur) + elif last is not None: + c = float(last["close"]) + out.append( + { + "open_time_ms": ts, + "open": c, + "high": c, + "low": c, + "close": c, + "volume": 0.0, + "filled": True, + } + ) + ts += step_ms + return out + + def aggregate_ohlcv_bars( bars: list[dict[str, Any]], target_timeframe: str ) -> list[dict[str, Any]]: diff --git a/tests/test_hub_kline_store.py b/tests/test_hub_kline_store.py index e4a6606..d923ce8 100644 --- a/tests/test_hub_kline_store.py +++ b/tests/test_hub_kline_store.py @@ -130,14 +130,14 @@ class TestHubKlineStore(unittest.TestCase): self.assertTrue(out.get("ok")) self.assertEqual(len(out.get("candles") or []), 300) - def test_resolve_15m_from_5m_aggregate(self): + def test_resolve_15m_reads_native_bars(self): init_db(self.db) now = int(time.time() * 1000) - period = TIMEFRAME_MS["5m"] - last_closed = last_closed_bar_open_ms("5m", now) + period = TIMEFRAME_MS["15m"] + last_closed = last_closed_bar_open_ms("15m", now) bars = [] - for i in range(30): - oms = last_closed - (29 - i) * period + for i in range(12): + oms = last_closed - (11 - i) * period bars.append( { "open_time_ms": oms, @@ -148,7 +148,7 @@ class TestHubKlineStore(unittest.TestCase): "volume": 10.0, } ) - upsert_bars("okx", "ETH/USDT", "5m", bars, self.db) + upsert_bars("okx", "ETH/USDT", "15m", bars, self.db) def remote_fetch(**kwargs): self.fail("不应请求交易所") @@ -159,11 +159,12 @@ class TestHubKlineStore(unittest.TestCase): "15m", remote_fetch, db_path=self.db, - limit=5, + limit=10, ) self.assertTrue(out.get("ok")) - self.assertEqual(out.get("source"), "aggregate") - self.assertGreaterEqual(len(out.get("candles") or []), 5) + self.assertEqual(out.get("source"), "db") + self.assertEqual(out.get("storage_timeframe"), "15m") + self.assertGreaterEqual(len(out.get("candles") or []), 10) def test_load_bars_before(self): init_db(self.db)