From 11cc482599869c3980c32d8e584973b4897607f9 Mon Sep 17 00:00:00 2001 From: dekun Date: Mon, 8 Jun 2026 07:27:16 +0800 Subject: [PATCH] Refactor market K-line storage with tiered retention and chunked loading. Store 1m/5m/1h/12h/1d/1w with per-timeframe policies, aggregate 15m and 2h/4h on read, and support left-pan history fetches via before_ms. Co-authored-by: Cursor --- hub_kline_store.py | 340 +++++++++++++++++++++++------ hub_ohlcv_lib.py | 112 +++++++++- manual_trading_hub/hub.py | 33 ++- manual_trading_hub/static/chart.js | 268 +++++++++++++++++++---- tests/test_hub_kline_store.py | 157 ++++++++++--- 5 files changed, 762 insertions(+), 148 deletions(-) diff --git a/hub_kline_store.py b/hub_kline_store.py index 9413894..cd43345 100644 --- a/hub_kline_store.py +++ b/hub_kline_store.py @@ -1,4 +1,4 @@ -"""中控 K 线 SQLite 缓存:按需拉取、15 天滚动保留。""" +"""中控 K 线 SQLite:分周期保留、本地聚合、分页读取。""" from __future__ import annotations @@ -9,21 +9,31 @@ from pathlib import Path from typing import Any, Callable, Optional from hub_ohlcv_lib import ( + HUB_KLINE_1M_MAX_BARS, + HUB_KLINE_5M_1H_RETENTION_DAYS, TIMEFRAME_MS, - bar_limit_for_timeframe, - chart_fetch_start_ms, - format_price_by_tick, - last_closed_bar_open_ms, + aggregate_ohlcv_bars, + aggregate_ratio, + aggregation_source_for_display, + chart_chunk_limit, + chart_initial_limit, + chart_memory_cap, + history_cutoff_ms_for_storage, normalize_chart_timeframe, normalize_price_tick, + format_price_by_tick, + last_closed_bar_open_ms, + retention_policy_meta, round_ohlcv_bars_to_tick, - window_start_ms, + seed_bar_target, + sync_timeframe_for_display, ) _DEFAULT_RETENTION_DAYS = 15 def retention_days() -> int: + """兼容旧配置;新策略见 retention_policy_meta。""" try: return max(1, int(os.getenv("HUB_KLINE_RETENTION_DAYS", str(_DEFAULT_RETENTION_DAYS)))) except ValueError: @@ -145,18 +155,59 @@ def load_symbol_price_tick( conn.close() -def purge_retention(db_path: Path | None = None, *, days: int | None = None) -> int: - """删除早于 retention 的 K 线;返回删除行数。""" - keep = days if days is not None else retention_days() - cutoff = int(time.time() * 1000) - keep * 86400000 +def purge_timeframe_by_days( + timeframe: str, + days: int, + db_path: Path | None = None, +) -> int: + cutoff = int(time.time() * 1000) - max(1, int(days)) * 86400000 + tf = normalize_chart_timeframe(timeframe) conn = _connect(db_path) try: - cur = conn.execute("DELETE FROM ohlcv_bars WHERE open_time_ms < ?", (cutoff,)) + cur = conn.execute( + "DELETE FROM ohlcv_bars WHERE timeframe=? AND open_time_ms < ?", + (tf, cutoff), + ) return int(cur.rowcount or 0) finally: conn.close() +def purge_1m_bar_cap(db_path: Path | None = None, *, max_bars: int | None = None) -> int: + cap = max(100, int(max_bars or HUB_KLINE_1M_MAX_BARS)) + conn = _connect(db_path) + try: + cur = conn.execute( + """ + DELETE FROM ohlcv_bars + WHERE timeframe='1m' AND rowid IN ( + SELECT rowid FROM ( + SELECT rowid, + ROW_NUMBER() OVER ( + PARTITION BY exchange_key, symbol + ORDER BY open_time_ms DESC + ) AS rn + FROM ohlcv_bars + WHERE timeframe='1m' + ) WHERE rn > ? + ) + """, + (cap,), + ) + return int(cur.rowcount or 0) + finally: + conn.close() + + +def purge_retention(db_path: Path | None = None) -> int: + """按周期策略清理:5m/1h 一年;1m 保留最近 N 根;12h/1d/1w 不删。""" + n = 0 + n += purge_timeframe_by_days("5m", HUB_KLINE_5M_1H_RETENTION_DAYS, db_path) + n += purge_timeframe_by_days("1h", HUB_KLINE_5M_1H_RETENTION_DAYS, db_path) + n += purge_1m_bar_cap(db_path) + return n + + def upsert_bars( exchange_key: str, symbol: str, @@ -233,21 +284,84 @@ def load_bars_range( """, (ex_k, sym, tf, int(start_ms), int(end_ms)), ).fetchall() - return [ - { - "open_time_ms": int(r["open_time_ms"]), - "open": float(r["open"]), - "high": float(r["high"]), - "low": float(r["low"]), - "close": float(r["close"]), - "volume": float(r["volume"] or 0), - } - for r in rows - ] + return _rows_to_bars(rows) finally: conn.close() +def load_bars_latest( + exchange_key: str, + symbol: str, + timeframe: str, + limit: int, + db_path: Path | None = None, +) -> list[dict[str, Any]]: + ex_k = (exchange_key or "").strip().lower() + sym = (symbol or "").strip().upper() + tf = normalize_chart_timeframe(timeframe) + lim = max(1, int(limit)) + conn = _connect(db_path) + try: + rows = conn.execute( + """ + SELECT open_time_ms, open, high, low, close, volume + FROM ohlcv_bars + WHERE exchange_key=? AND symbol=? AND timeframe=? + ORDER BY open_time_ms DESC + LIMIT ? + """, + (ex_k, sym, tf, lim), + ).fetchall() + return list(reversed(_rows_to_bars(rows))) + finally: + conn.close() + + +def load_bars_before( + exchange_key: str, + symbol: str, + timeframe: str, + before_ms: int, + limit: int, + db_path: Path | None = None, +) -> list[dict[str, Any]]: + ex_k = (exchange_key or "").strip().lower() + sym = (symbol or "").strip().upper() + tf = normalize_chart_timeframe(timeframe) + lim = max(1, int(limit)) + bms = int(before_ms) + conn = _connect(db_path) + try: + rows = conn.execute( + """ + SELECT open_time_ms, open, high, low, close, volume + FROM ohlcv_bars + WHERE exchange_key=? AND symbol=? AND timeframe=? + AND open_time_ms < ? + ORDER BY open_time_ms DESC + LIMIT ? + """, + (ex_k, sym, tf, bms, lim), + ).fetchall() + return list(reversed(_rows_to_bars(rows))) + finally: + conn.close() + + +def _rows_to_bars(rows) -> list[dict[str, Any]]: + return [ + { + "open_time_ms": int(r["open_time_ms"]), + "open": float(r["open"]), + "high": float(r["high"]), + "low": float(r["low"]), + "close": float(r["close"]), + "volume": float(r["volume"] or 0), + } + for r in rows + ] + + def _to_chart_candles(bars: list[dict[str, Any]]) -> list[dict[str, Any]]: out = [] for b in bars: @@ -267,15 +381,36 @@ def _to_chart_candles(bars: list[dict[str, Any]]) -> list[dict[str, Any]]: return out -def _merge_bars(*groups: list[dict[str, Any]]) -> list[dict[str, Any]]: - merged: dict[int, dict[str, Any]] = {} - for g in groups: - for b in g or []: - try: - merged[int(b["open_time_ms"])] = b - except (KeyError, TypeError, ValueError): - continue - return [merged[k] for k in sorted(merged.keys())] +def _trim_display_bars( + bars: list[dict[str, Any]], + *, + need: int, + before_ms: int | None, +) -> list[dict[str, Any]]: + if not bars: + return [] + if before_ms is not None and int(before_ms) > 0: + bms = int(before_ms) + bars = [b for b in bars if int(b["open_time_ms"]) < bms] + if len(bars) > need: + bars = bars[-need:] + return bars + if len(bars) > need: + bars = bars[-need:] + return bars + + +def _aggregate_display_bars( + src_bars: list[dict[str, Any]], + display_tf: str, + *, + need: int, + before_ms: int | None, +) -> list[dict[str, Any]]: + if not src_bars: + return [] + agg = aggregate_ohlcv_bars(src_bars, display_tf) + return _trim_display_bars(agg, need=need, before_ms=before_ms) def resolve_chart_bars( @@ -287,39 +422,71 @@ def resolve_chart_bars( db_path: Path | None = None, force_refresh: bool = False, tail_refresh: bool = False, + limit: int | None = None, + before_ms: int | None = None, ) -> dict[str, Any]: """ - 按需:先读库,不足则 remote_fetch(symbol, timeframe, since_ms, limit) 补齐并写库。 - tail_refresh=True 时即使库内「够新」也增量拉取尾部 K 线(未收盘 K 的 OHLC 更新)。 + 分页读库:首屏 / 左拖 before_ms / 尾部 tail_refresh。 + 15m←5m,2h/4h←1h 现场聚合;其余直读入库周期。 """ init_db(db_path) purged = purge_retention(db_path) sym = (symbol or "").strip().upper() ex_k = (exchange_key or "").strip().lower() - tf = normalize_chart_timeframe(timeframe) + display_tf = normalize_chart_timeframe(timeframe) if not sym or not ex_k: return {"ok": False, "msg": "缺少 exchange 或 symbol"} - need = bar_limit_for_timeframe(tf) + agg_src = aggregation_source_for_display(display_tf) + storage_tf = agg_src or sync_timeframe_for_display(display_tf) + is_history = before_ms is not None and int(before_ms) > 0 + need = int( + limit + or (chart_chunk_limit(display_tf) if is_history else chart_initial_limit(display_tf)) + ) + need = max(1, min(need, chart_memory_cap(display_tf))) + now_ms = int(time.time() * 1000) - fetch_start_ms = chart_fetch_start_ms(tf, need, now_ms) - db_read_start_ms = window_start_ms(tf, need, retention_days(), now_ms) - last_closed = last_closed_bar_open_ms(tf, now_ms) + period_display = TIMEFRAME_MS[display_tf] + period_storage = TIMEFRAME_MS[storage_tf] + ratio = aggregate_ratio(display_tf, storage_tf) if agg_src else 1 + if tail_refresh and not is_history: + need = min(need, max(30, ratio * 6 if agg_src else 20)) + src_need = need * ratio + ratio * 4 + cutoff = history_cutoff_ms_for_storage(storage_tf, now_ms) + source_kind = "aggregate" if agg_src else "db" + + def load_display_rows() -> list[dict[str, Any]]: + if agg_src: + if is_history: + src = load_bars_before(ex_k, sym, storage_tf, int(before_ms), src_need, db_path) + else: + src = load_bars_latest(ex_k, sym, storage_tf, src_need, db_path) + return _aggregate_display_bars( + src, display_tf, need=need, before_ms=before_ms if is_history else None + ) + if is_history: + return load_bars_before(ex_k, sym, storage_tf, int(before_ms), need, db_path) + return load_bars_latest(ex_k, sym, storage_tf, need, db_path) db_rows: list[dict[str, Any]] = [] if not force_refresh: - period_ms = TIMEFRAME_MS[tf] - db_rows = load_bars_range( - ex_k, sym, tf, max(0, db_read_start_ms - period_ms), now_ms + period_ms, db_path - ) + db_rows = load_display_rows() + last_closed = last_closed_bar_open_ms(display_tf, now_ms) newest_db = db_rows[-1]["open_time_ms"] if db_rows else None - period_ms = TIMEFRAME_MS[tf] - newest_ok = newest_db is not None and int(newest_db) >= int(last_closed) - period_ms - need_fetch = force_refresh or len(db_rows) < need or not newest_ok + if is_history: + newest_ok = True + else: + newest_ok = newest_db is not None and int(newest_db) >= int(last_closed) - period_display + + need_fetch = force_refresh or (not is_history and (len(db_rows) < need or not newest_ok)) + if is_history and len(db_rows) < need: + need_fetch = True + tail_only = False - if tail_refresh and db_rows and not force_refresh and not need_fetch: + if tail_refresh and not is_history and db_rows and not force_refresh and not need_fetch: need_fetch = True tail_only = True @@ -328,44 +495,66 @@ def resolve_chart_bars( remote_err: Optional[str] = None if need_fetch: - since = fetch_start_ms - if tail_only and newest_db is not None: - since = max(0, int(newest_db) - period_ms * 3) - # 仅当库内根数已够且缺口在尾部时做增量拉取;否则全量回看,避免 Gate from>to - elif ( - db_rows - and not force_refresh - and newest_ok - and len(db_rows) >= need - ): - since = max(0, int(newest_db) - period_ms * 2) + if is_history: + bms = int(before_ms) + anchor = bms - period_display + since = max(cutoff, anchor - period_storage * src_need) + fetch_limit = min(src_need + 20, 1500) + elif tail_only: + if agg_src: + src_tail = load_bars_latest(ex_k, sym, storage_tf, 5, db_path) + anchor_ms = int(src_tail[-1]["open_time_ms"]) if src_tail else now_ms + else: + anchor_ms = int(newest_db) if newest_db is not None else now_ms + since = max(cutoff, anchor_ms - period_storage * max(5, ratio * 3)) + fetch_limit = min(max(20, ratio * 8), 300) + else: + since = max(cutoff, now_ms - period_storage * min(src_need, seed_bar_target(storage_tf))) + fetch_limit = min( + seed_bar_target(storage_tf) if force_refresh else src_need + 20, + 1500, + ) + remote = remote_fetch( symbol=sym, - timeframe=tf, + timeframe=storage_tf, since_ms=since, - limit=need + 20, + limit=fetch_limit, ) if remote.get("ok") and remote.get("bars"): - fetched = upsert_bars(ex_k, sym, tf, remote["bars"], db_path) + fetched = upsert_bars(ex_k, sym, storage_tf, remote["bars"], db_path) price_tick = remote.get("price_tick") if price_tick is not None: save_symbol_price_tick(ex_k, sym, price_tick, db_path) - db_rows = load_bars_range(ex_k, sym, tf, fetch_start_ms, now_ms, db_path) + db_rows = load_display_rows() + if fetched: + source_kind = "remote" if source_kind == "db" else source_kind else: remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败" if not db_rows: - return {"ok": False, "msg": remote_err, "purged": purged} + if is_history: + exhausted = True + else: + return {"ok": False, "msg": remote_err, "purged": purged} - if len(db_rows) > need: - db_rows = db_rows[-need:] + exhausted = False + if is_history: + if not db_rows: + exhausted = True + elif len(db_rows) < need: + oldest = int(db_rows[0]["open_time_ms"]) + if cutoff > 0 and oldest <= cutoff + period_storage: + exhausted = True + elif fetched == 0: + exhausted = True if price_tick is None: price_tick = load_symbol_price_tick(ex_k, sym, db_path) - if price_tick is None: + if price_tick is None and not is_history: try: tick_probe = remote_fetch( symbol=sym, - timeframe=tf, + timeframe=storage_tf, since_ms=None, limit=3, ) @@ -381,20 +570,27 @@ def resolve_chart_bars( round_ohlcv_bars_to_tick(db_rows, price_tick) candles = _to_chart_candles(db_rows) - if not candles: + if not is_history and not candles and not exhausted: return {"ok": False, "msg": remote_err or "无 K 线数据", "purged": purged} - from_cache = max(0, len(candles) - (1 if fetched else 0)) - if fetched: - from_cache = max(0, len(candles) - min(fetched, len(candles))) + oldest_ms = int(db_rows[0]["open_time_ms"]) if db_rows else None + newest_ms = int(db_rows[-1]["open_time_ms"]) if db_rows else None + + from_cache = max(0, len(candles) - min(fetched, len(candles))) if fetched else len(candles) return { "ok": True, "symbol": sym, "exchange_key": ex_k, - "timeframe": tf, + "timeframe": display_tf, + "storage_timeframe": storage_tf, "limit": need, - "retention_days": retention_days(), + "before_ms": int(before_ms) if is_history else None, + "oldest_ms": oldest_ms, + "newest_ms": newest_ms, + "exhausted": exhausted, + "source": "remote" if fetched else source_kind, + "retention_policy": retention_policy_meta(), "candles": candles, "from_cache": from_cache, "fetched": fetched, diff --git a/hub_ohlcv_lib.py b/hub_ohlcv_lib.py index a282745..3420571 100644 --- a/hub_ohlcv_lib.py +++ b/hub_ohlcv_lib.py @@ -3,6 +3,7 @@ from __future__ import annotations import math +import os import time from typing import Any, Callable, Optional @@ -32,7 +33,26 @@ CHART_TIMEFRAME_ORDER = ( ) DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"}) -# 部分交易所 ccxt 无原生 12h,或原生 K 线间隔异常时从 1h 聚合 +# 入库 / 同步真源(交易所拉取) +STORED_TIMEFRAMES = frozenset({"1m", "5m", "1h", "12h", "1d", "1w"}) +PERMANENT_STORED_TIMEFRAMES = frozenset({"12h", "1d", "1w"}) +YEAR_ROLLING_STORED = frozenset({"5m", "1h"}) + +# 展示周期 → 本地聚合源(不落库) +CHART_DISPLAY_AGGREGATE_FROM: dict[str, str] = { + "15m": "5m", + "2h": "1h", + "4h": "1h", +} + +SMALL_DISPLAY_TFS = frozenset({"1m", "5m", "15m"}) +MID_DISPLAY_TFS = frozenset({"1h", "2h", "4h"}) + +HUB_KLINE_1M_MAX_BARS = max(1000, int(os.getenv("HUB_KLINE_1M_MAX_BARS", "10000"))) +HUB_KLINE_5M_1H_RETENTION_DAYS = max(30, int(os.getenv("HUB_KLINE_5M_1H_RETENTION_DAYS", "365"))) +HUB_KLINE_SEED_BARS = max(100, int(os.getenv("HUB_KLINE_SEED_BARS", "500"))) + +# 部分交易所 ccxt 无原生 12h,或原生 K 线间隔异常时从 1h 聚合(仅远程拉取 fallback) OHLCV_AGGREGATE_FROM: dict[str, str] = { "12h": "1h", } @@ -55,9 +75,95 @@ def normalize_chart_timeframe(raw: str | None, default: str = "5m") -> str: return tf if tf in CHART_TIMEFRAMES else default -def bar_limit_for_timeframe(timeframe: str) -> int: +def sync_timeframe_for_display(timeframe: str) -> str: + """展示周期对应的入库 / 同步周期。""" tf = normalize_chart_timeframe(timeframe) - return 500 if tf in DAILY_PLUS_TIMEFRAMES else 1000 + return CHART_DISPLAY_AGGREGATE_FROM.get(tf, tf) + + +def aggregation_source_for_display(timeframe: str) -> str | None: + tf = normalize_chart_timeframe(timeframe) + return CHART_DISPLAY_AGGREGATE_FROM.get(tf) + + +def aggregate_ratio(display_tf: str, source_tf: str) -> int: + d = normalize_chart_timeframe(display_tf) + s = normalize_chart_timeframe(source_tf) + return max(1, int(TIMEFRAME_MS[d] // TIMEFRAME_MS[s])) + + +def chart_initial_limit(timeframe: str) -> int: + tf = normalize_chart_timeframe(timeframe) + if tf in SMALL_DISPLAY_TFS: + return 300 + if tf == "1w": + return 150 + return 200 + + +def chart_chunk_limit(timeframe: str) -> int: + tf = normalize_chart_timeframe(timeframe) + if tf in SMALL_DISPLAY_TFS: + return 500 + if tf == "1w": + return 150 + if tf in MID_DISPLAY_TFS: + return 300 + return 200 + + +def chart_memory_cap(timeframe: str) -> int: + tf = normalize_chart_timeframe(timeframe) + if tf in SMALL_DISPLAY_TFS: + return 5000 + if tf == "1w": + return 500 + return 1000 + + +def bar_limit_for_timeframe(timeframe: str) -> int: + return chart_memory_cap(timeframe) + + +def storage_retention_days(storage_tf: str) -> int | None: + """None 表示不按天截断(1m 按根数;12h/1d/1w 永久)。""" + tf = normalize_chart_timeframe(storage_tf) + if tf in YEAR_ROLLING_STORED: + return HUB_KLINE_5M_1H_RETENTION_DAYS + return None + + +def history_cutoff_ms_for_storage(storage_tf: str, now_ms: int | None = None) -> int: + days = storage_retention_days(storage_tf) + if days is None: + return 0 + now = int(now_ms if now_ms is not None else time.time() * 1000) + return max(0, now - int(days) * 86400000) + + +def seed_bar_target(storage_tf: str) -> int: + tf = normalize_chart_timeframe(storage_tf) + if tf == "1m": + return HUB_KLINE_1M_MAX_BARS + if tf in YEAR_ROLLING_STORED: + period = TIMEFRAME_MS[tf] + return min( + int(86400000 * HUB_KLINE_5M_1H_RETENTION_DAYS / period) + 20, + 150000, + ) + return HUB_KLINE_SEED_BARS + + +def retention_policy_meta() -> dict[str, Any]: + return { + "1m": {"mode": "bars", "max_bars": HUB_KLINE_1M_MAX_BARS}, + "5m": {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS}, + "1h": {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS}, + "12h": {"mode": "permanent"}, + "1d": {"mode": "permanent"}, + "1w": {"mode": "permanent"}, + "aggregate_from": dict(CHART_DISPLAY_AGGREGATE_FROM), + } def last_closed_bar_open_ms(timeframe: str, now_ms: int | None = None) -> int: diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index ac33262..63fc8f0 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -15,7 +15,15 @@ if str(_REPO_ROOT) not in sys.path: sys.path.insert(0, str(_REPO_ROOT)) from hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days -from hub_ohlcv_lib import CHART_TIMEFRAME_ORDER, CHART_TIMEFRAMES, bar_limit_for_timeframe +from hub_ohlcv_lib import ( + CHART_TIMEFRAME_ORDER, + CHART_TIMEFRAMES, + bar_limit_for_timeframe, + chart_chunk_limit, + chart_initial_limit, + chart_memory_cap, + retention_policy_meta, +) from hub_symbol_archive_lib import ( ARCHIVE_DEFAULT_TIMEFRAME, ARCHIVE_SEED_LOOKBACK_DAYS, @@ -629,7 +637,11 @@ def api_chart_meta(): "ok": True, "timeframes": [tf for tf in tfs if tf in CHART_TIMEFRAMES], "retention_days": retention_days(), + "retention_policy": retention_policy_meta(), "limits": {tf: bar_limit_for_timeframe(tf) for tf in tfs if tf in CHART_TIMEFRAMES}, + "initial_limits": {tf: chart_initial_limit(tf) for tf in tfs if tf in CHART_TIMEFRAMES}, + "chunk_limits": {tf: chart_chunk_limit(tf) for tf in tfs if tf in CHART_TIMEFRAMES}, + "memory_caps": {tf: chart_memory_cap(tf) for tf in tfs if tf in CHART_TIMEFRAMES}, "exchanges": exchanges, } @@ -640,6 +652,8 @@ def api_chart_ohlcv( symbol: str = "", timeframe: str = "1d", refresh: str = "", + limit: int = 0, + before_ms: str = "", ): ex = _find_exchange_by_key(exchange_key) if not ex: @@ -651,14 +665,23 @@ def api_chart_ohlcv( raise HTTPException(status_code=400, detail="请输入币种") ex_key = str(ex.get("key") or "").strip().lower() force = (refresh or "").strip().lower() in ("1", "true", "yes", "on") + lim = int(limit) if int(limit or 0) > 0 else None + bms_raw = (before_ms or "").strip() + bms = None + if bms_raw: + try: + bms = int(bms_raw) + except ValueError: + raise HTTPException(status_code=400, detail="before_ms 无效") def remote_fetch(**kwargs): + tf_use = kwargs.get("timeframe") or timeframe return _fetch_instance_ohlcv_sync( ex, symbol=kwargs.get("symbol") or sym, - timeframe=kwargs.get("timeframe") or timeframe, + timeframe=tf_use, since_ms=kwargs.get("since_ms"), - limit=int(kwargs.get("limit") or bar_limit_for_timeframe(timeframe)), + limit=int(kwargs.get("limit") or bar_limit_for_timeframe(tf_use)), ) result = resolve_chart_bars( @@ -667,9 +690,13 @@ def api_chart_ohlcv( timeframe, remote_fetch, force_refresh=force, + limit=lim, + before_ms=bms, ) if not result.get("ok"): raise HTTPException(status_code=502, detail=result.get("msg") or "K线加载失败") + if not result.get("candles") and result.get("before_ms") is None: + raise HTTPException(status_code=502, detail=result.get("msg") or "无 K 线") tick = result.get("price_tick") last = result["candles"][-1] if result.get("candles") else None result["ohlcv"] = format_ohlcv_detail( diff --git a/manual_trading_hub/static/chart.js b/manual_trading_hub/static/chart.js index 2fe4541..bd1a1f3 100644 --- a/manual_trading_hub/static/chart.js +++ b/manual_trading_hub/static/chart.js @@ -6,6 +6,40 @@ const CHART_WATCH_HEARTBEAT_MS = 25000; const CHART_SSE_FALLBACK_MS = 30000; const DEFAULT_VISIBLE_BARS = 200; + const CHART_LOAD_LEFT_THRESHOLD = 25; + const CHART_INITIAL_LIMITS = { + "1m": 300, + "5m": 300, + "15m": 300, + "1h": 200, + "2h": 200, + "4h": 200, + "12h": 200, + "1d": 200, + "1w": 150, + }; + const CHART_CHUNK_LIMITS = { + "1m": 500, + "5m": 500, + "15m": 500, + "1h": 300, + "2h": 300, + "4h": 300, + "12h": 200, + "1d": 200, + "1w": 150, + }; + const CHART_MEMORY_CAPS = { + "1m": 5000, + "5m": 5000, + "15m": 5000, + "1h": 1000, + "2h": 1000, + "4h": 1000, + "12h": 1000, + "1d": 1000, + "1w": 500, + }; const RIGHT_OFFSET_BARS = 10; const CANDLE_SCALE_BOTTOM = 0.26; const VOLUME_SCALE_TOP = 0.73; @@ -141,6 +175,8 @@ let localSeriesVersion = 0; let lastViewKey = ""; let currentTf = "1d"; + let exhaustedLeft = false; + let loadingLeft = false; let priceTagTimer = null; let tfDigitBuf = ""; let tfDigitTimer = null; @@ -1914,9 +1950,13 @@ paintOhlcv(bar); }); - chart.timeScale().subscribeVisibleLogicalRangeChange(function () { + chart.timeScale().subscribeVisibleLogicalRangeChange(function (range) { updateVisibleRangeMarkers(); updatePriceTag(); + if (!range || loadingLeft || exhaustedLeft || !lastCandles.length) return; + if (range.from < CHART_LOAD_LEFT_THRESHOLD) { + void loadOlderCandles(); + } }); window.addEventListener("resize", function () { @@ -1939,6 +1979,169 @@ return (exKey || "") + "|" + (sym || "") + "|" + (tf || ""); } + function chartInitialLimit(tf) { + return CHART_INITIAL_LIMITS[tf] || 200; + } + + function chartChunkLimit(tf) { + return CHART_CHUNK_LIMITS[tf] || 200; + } + + function chartMemoryCap(tf) { + return CHART_MEMORY_CAPS[tf] || 1000; + } + + function resetChartHistoryState() { + exhaustedLeft = false; + loadingLeft = false; + } + + function mergeCandles(existing, incoming, opts) { + opts = opts || {}; + const prepend = !!opts.prepend; + const byTime = {}; + (existing || []).forEach(function (c) { + if (c && c.time != null) byTime[c.time] = c; + }); + (incoming || []).forEach(function (c) { + if (c && c.time != null) byTime[c.time] = c; + }); + let merged = Object.keys(byTime) + .map(function (t) { + return Number(t); + }) + .sort(function (a, b) { + return a - b; + }) + .map(function (t) { + return byTime[t]; + }); + const cap = chartMemoryCap(currentTf); + if (merged.length > cap) { + merged = prepend ? merged.slice(0, cap) : merged.slice(-cap); + } + return merged; + } + + function applyCandlesToChart(candles, rangeShift) { + lastCandles = alignCandlesToTick(candles); + indexCandles(lastCandles); + candleSeries.setData(lastCandles); + volumeSeries.setData(buildVolumeData(lastCandles)); + applyChartRightGap(); + if (rangeShift && chart) { + const range = chart.timeScale().getVisibleLogicalRange(); + if (range) { + chart.timeScale().setVisibleLogicalRange({ + from: range.from + rangeShift, + to: range.to + rangeShift, + }); + } + } + applyPriceAutoScale(); + updateVisibleRangeMarkers(); + try { + updateIndicators(); + } catch (indErr) {} + showLatestOhlcv(); + } + + async function fetchChartChunk(params) { + const qs = new URLSearchParams({ + exchange_key: params.exchange_key, + symbol: params.symbol, + timeframe: params.timeframe, + limit: String(params.limit), + }); + if (params.before_ms) qs.set("before_ms", String(params.before_ms)); + if (params.refresh) qs.set("refresh", "1"); + const r = await fetch("/api/chart/ohlcv?" + qs.toString(), { credentials: "same-origin" }); + const data = await r.json(); + if (!r.ok) { + throw new Error(data.detail || data.msg || "请求失败"); + } + return data; + } + + async function loadOlderCandles() { + if (loadingLeft || exhaustedLeft || !lastCandles.length) return; + const exKey = (elExchange && elExchange.value) || ""; + const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; + const tf = (elTf && elTf.value) || "1d"; + if (!exKey || !sym) return; + loadingLeft = true; + const beforeMs = Number(lastCandles[0].time) * 1000; + try { + const data = await fetchChartChunk({ + exchange_key: exKey, + symbol: sym, + timeframe: tf, + limit: chartChunkLimit(tf), + before_ms: beforeMs, + }); + if (data.exhausted) exhaustedLeft = true; + const incoming = alignCandlesToTick(data.candles || []); + if (!incoming.length) return; + const shift = incoming.length; + applyCandlesToChart(mergeCandles(lastCandles, incoming, { prepend: true }), shift); + if (elStatus && !elStatus.classList.contains("err")) { + elStatus.textContent = + "已加载 " + + lastCandles.length + + " 根(向左 +" + + incoming.length + + (exhaustedLeft ? " · 已到最早" : "") + + ")"; + } + } catch (e) { + if (elStatus) { + elStatus.className = "market-status warn"; + elStatus.textContent = "加载更早 K 线失败:" + String(e.message || e); + } + } finally { + loadingLeft = false; + } + } + + async function refreshChartTail() { + const exKey = (elExchange && elExchange.value) || ""; + const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; + const tf = (elTf && elTf.value) || "1d"; + if (!exKey || !sym || !lastCandles.length) return; + const myToken = loadToken; + let savedRange = null; + if (chart) savedRange = chart.timeScale().getVisibleLogicalRange(); + try { + const data = await fetchChartChunk({ + exchange_key: exKey, + symbol: sym, + timeframe: tf, + limit: chartChunkLimit(tf), + }); + if (myToken !== loadToken) return; + if (!data.ok || !data.candles || !data.candles.length) return; + if (data.price_tick != null) { + priceTick = data.price_tick; + try { + applyChartPriceFormat(); + } catch (fmtErr) { + priceTick = null; + applyChartPriceFormat(); + } + } + applyCandlesToChart(mergeCandles(lastCandles, alignCandlesToTick(data.candles), { prepend: false }), 0); + if (savedRange) chart.timeScale().setVisibleLogicalRange(savedRange); + if (posContext) { + updateLivePosPnl(); + refreshPosPnlFromBoard(); + } + if (data.series_version != null) localSeriesVersion = Number(data.series_version) || localSeriesVersion; + if (data.chart_version != null) localChartVersion = Number(data.chart_version) || localChartVersion; + if (elUpdated) elUpdated.textContent = "数据 " + (data.updated_at || "--"); + tickLiveClock(); + } catch (_) {} + } + function applyChartRightGap() { if (!chart) return; chart.timeScale().applyOptions({ @@ -2073,7 +2276,7 @@ if (seriesChanged) { localSeriesVersion = sVer; localChartVersion = ver; - loadChart(false, { autoTick: true }); + refreshChartTail(); } else if (posContext) { updateLivePosPnl(); } else if (ver !== localChartVersion) { @@ -2111,7 +2314,7 @@ refreshTimer = setInterval(function () { const page = document.getElementById("page-market"); if (!page || page.classList.contains("hidden")) return; - loadChart(false, { autoTick: true }); + refreshChartTail(); }, CHART_SSE_FALLBACK_MS); } @@ -2151,10 +2354,11 @@ async function loadChart(force, options) { options = options || {}; const autoTick = !!options.autoTick; - if (!autoTick) { - localSeriesVersion = 0; - void postChartWatch(); + if (autoTick) { + return refreshChartTail(); } + localSeriesVersion = 0; + void postChartWatch(); if (!ensureChart()) return; const exKey = (elExchange && elExchange.value) || ""; const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; @@ -2169,31 +2373,23 @@ } const myToken = ++loadToken; const vKey = viewKey(exKey, sym, tf); - const resetView = !!force || !autoTick || vKey !== lastViewKey; - let savedRange = null; - if (!resetView && chart) { - savedRange = chart.timeScale().getVisibleLogicalRange(); - } - if (!autoTick && elStatus) { + const resetView = !!force || vKey !== lastViewKey; + if (resetView) resetChartHistoryState(); + if (elStatus) { elStatus.className = "market-status"; elStatus.textContent = "加载中…"; } updateHeaderLabels(sym, tf); - const qs = new URLSearchParams({ - exchange_key: exKey, - symbol: sym, - timeframe: tf, - }); - if (force) qs.set("refresh", "1"); - try { - const r = await fetch("/api/chart/ohlcv?" + qs.toString(), { credentials: "same-origin" }); - const data = await r.json(); + const data = await fetchChartChunk({ + exchange_key: exKey, + symbol: sym, + timeframe: tf, + limit: chartInitialLimit(tf), + refresh: !!force, + }); if (myToken !== loadToken) return; - if (!r.ok) { - throw new Error(data.detail || data.msg || "请求失败"); - } if (!data.ok || !data.candles || !data.candles.length) { throw new Error(data.msg || "无 K 线"); } @@ -2205,45 +2401,31 @@ priceTick = null; applyChartPriceFormat(); } - lastCandles = alignCandlesToTick(data.candles); - indexCandles(lastCandles); - candleSeries.setData(lastCandles); - volumeSeries.setData(buildVolumeData(lastCandles)); - applyChartRightGap(); + applyCandlesToChart(alignCandlesToTick(data.candles), 0); if (resetView) { lastViewKey = vKey; applyDefaultVisibleRange(); - } else if (savedRange) { - chart.timeScale().setVisibleLogicalRange(savedRange); } - applyPriceAutoScale(); - updateVisibleRangeMarkers(); syncPosContextForView(exKey, sym); if (posContext) { updateLivePosPnl(); refreshPosPnlFromBoard(); } - showLatestOhlcv(); - try { - updateIndicators(); - } catch (indErr) { - /* 指标序列 priceFormat 异常时不阻断主图 */ - } scheduleChartResize(); const limit = data.limit || lastCandles.length; let hint = "已加载 " + - data.candles.length + - " 根(目标 " + + lastCandles.length + + " 根(首屏 " + limit + ")· 库 " + (data.from_cache || 0) + " / 新拉 " + (data.fetched || 0) + - ")· 后台 " + + " · 左拖加载更多 · 后台 " + (data.chart_poll_interval_sec || 5) + - "s 轮询 · SSE"; + "s"; if (data.stale && data.stale_message) { hint += " · 缓存:" + data.stale_message; } diff --git a/tests/test_hub_kline_store.py b/tests/test_hub_kline_store.py index 3a3e460..e4a6606 100644 --- a/tests/test_hub_kline_store.py +++ b/tests/test_hub_kline_store.py @@ -1,21 +1,29 @@ -"""中控 K 线库:15 天滚动与按需合并。""" +"""中控 K 线库:分周期保留、聚合与分页读取。""" from __future__ import annotations -import sqlite3 import tempfile +import time import unittest from pathlib import Path from hub_kline_store import ( - bar_limit_for_timeframe, - load_bars_range, + init_db, + load_bars_before, + load_bars_latest, purge_retention, + purge_timeframe_by_days, resolve_chart_bars, retention_days, upsert_bars, +) +from hub_ohlcv_lib import ( + TIMEFRAME_MS, + bar_limit_for_timeframe, + chart_fetch_start_ms, + chart_initial_limit, + last_closed_bar_open_ms, window_start_ms, ) -from hub_ohlcv_lib import TIMEFRAME_MS, chart_fetch_start_ms, window_start_ms class TestHubKlineStore(unittest.TestCase): @@ -27,27 +35,22 @@ class TestHubKlineStore(unittest.TestCase): self.tmp.cleanup() def test_bar_limits(self): - self.assertEqual(bar_limit_for_timeframe("5m"), 1000) + self.assertEqual(bar_limit_for_timeframe("5m"), 5000) self.assertEqual(bar_limit_for_timeframe("1h"), 1000) - self.assertEqual(bar_limit_for_timeframe("1d"), 500) + self.assertEqual(bar_limit_for_timeframe("1d"), 1000) self.assertEqual(bar_limit_for_timeframe("1w"), 500) + self.assertEqual(chart_initial_limit("5m"), 300) def test_chart_fetch_window_exceeds_retention(self): - import time - now = int(time.time() * 1000) need = bar_limit_for_timeframe("1d") fetch_start = chart_fetch_start_ms("1d", need, now) db_start = window_start_ms("1d", need, retention_days(), now) self.assertLess(fetch_start, db_start) - def test_purge_retention(self): - import time - - from hub_kline_store import init_db - + def test_purge_retention_5m_one_year(self): init_db(self.db) - old_ms = int(time.time() * 1000) - 20 * 86400000 + old_ms = int(time.time() * 1000) - 400 * 86400000 upsert_bars( "okx", "BTC/USDT", @@ -64,26 +67,43 @@ class TestHubKlineStore(unittest.TestCase): ], self.db, ) - n = purge_retention(self.db, days=15) + n = purge_timeframe_by_days("5m", 365, self.db) self.assertGreaterEqual(n, 1) - rows = load_bars_range("okx", "BTC/USDT", "5m", old_ms - 1, old_ms + 1, self.db) + rows = load_bars_latest("okx", "BTC/USDT", "5m", 10, self.db) self.assertEqual(len(rows), 0) + def test_purge_retention_keeps_1d(self): + init_db(self.db) + old_ms = int(time.time() * 1000) - 400 * 86400000 + upsert_bars( + "okx", + "BTC/USDT", + "1d", + [ + { + "open_time_ms": old_ms, + "open": 1, + "high": 2, + "low": 0.5, + "close": 1.5, + "volume": 10, + } + ], + self.db, + ) + purge_retention(self.db) + rows = load_bars_latest("okx", "BTC/USDT", "1d", 10, self.db) + self.assertEqual(len(rows), 1) + def test_resolve_uses_cache_without_remote(self): - import time - - from hub_kline_store import init_db - - from hub_ohlcv_lib import last_closed_bar_open_ms - init_db(self.db) now = int(time.time() * 1000) tf = "5m" period = TIMEFRAME_MS[tf] last_closed = last_closed_bar_open_ms(tf, now) bars = [] - for i in range(1000): - oms = last_closed - (999 - i) * period + for i in range(400): + oms = last_closed - (399 - i) * period bars.append( { "open_time_ms": oms, @@ -99,9 +119,92 @@ class TestHubKlineStore(unittest.TestCase): def remote_fetch(**kwargs): self.fail("不应请求交易所") - out = resolve_chart_bars("okx", "ETH/USDT", tf, remote_fetch, db_path=self.db) + out = resolve_chart_bars( + "okx", + "ETH/USDT", + tf, + remote_fetch, + db_path=self.db, + limit=300, + ) self.assertTrue(out.get("ok")) - self.assertGreaterEqual(len(out.get("candles") or []), 1000) + self.assertEqual(len(out.get("candles") or []), 300) + + def test_resolve_15m_from_5m_aggregate(self): + init_db(self.db) + now = int(time.time() * 1000) + period = TIMEFRAME_MS["5m"] + last_closed = last_closed_bar_open_ms("5m", now) + bars = [] + for i in range(30): + oms = last_closed - (29 - i) * period + bars.append( + { + "open_time_ms": oms, + "open": 1.0 + i, + "high": 2.0 + i, + "low": 0.5 + i, + "close": 1.5 + i, + "volume": 10.0, + } + ) + upsert_bars("okx", "ETH/USDT", "5m", bars, self.db) + + def remote_fetch(**kwargs): + self.fail("不应请求交易所") + + out = resolve_chart_bars( + "okx", + "ETH/USDT", + "15m", + remote_fetch, + db_path=self.db, + limit=5, + ) + self.assertTrue(out.get("ok")) + self.assertEqual(out.get("source"), "aggregate") + self.assertGreaterEqual(len(out.get("candles") or []), 5) + + def test_load_bars_before(self): + init_db(self.db) + period = TIMEFRAME_MS["1h"] + base = 1_700_000_000_000 + bars = [] + for i in range(5): + bars.append( + { + "open_time_ms": base + i * period, + "open": 1, + "high": 2, + "low": 0.5, + "close": 1.5, + "volume": 1, + } + ) + upsert_bars("okx", "BTC/USDT", "1h", bars, self.db) + before = base + 3 * period + got = load_bars_before("okx", "BTC/USDT", "1h", before, 2, self.db) + self.assertEqual(len(got), 2) + self.assertEqual(got[-1]["open_time_ms"], base + 2 * period) + + def test_resolve_before_ms_exhausted(self): + init_db(self.db) + + def remote_fetch(**kwargs): + return {"ok": False, "msg": "no remote"} + + out = resolve_chart_bars( + "okx", + "BTC/USDT", + "5m", + remote_fetch, + db_path=self.db, + limit=100, + before_ms=int(time.time() * 1000), + ) + self.assertTrue(out.get("ok")) + self.assertEqual(out.get("candles"), []) + self.assertTrue(out.get("exhausted")) if __name__ == "__main__":