From ca6ef59a145c021df158b50ce63d155724f15157 Mon Sep 17 00:00:00 2001 From: dekun Date: Mon, 8 Jun 2026 11:31:16 +0800 Subject: [PATCH] Add clear-and-refetch for hub K-line cache. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Force refresh wipes the series in hub_kline.db before pulling from the exchange; add a Linux clear script and rename the UI button to 清库重拉. Co-authored-by: Cursor --- hub_kline_store.py | 47 ++++++++++++++ manual_trading_hub/hub.py | 2 + manual_trading_hub/static/chart.js | 1 + manual_trading_hub/static/index.html | 4 +- scripts/clear_hub_kline_db.py | 93 ++++++++++++++++++++++++++++ tests/test_hub_kline_store.py | 52 ++++++++++++++++ 6 files changed, 197 insertions(+), 2 deletions(-) create mode 100644 scripts/clear_hub_kline_db.py diff --git a/hub_kline_store.py b/hub_kline_store.py index f0c7a26..34742f5 100644 --- a/hub_kline_store.py +++ b/hub_kline_store.py @@ -196,6 +196,47 @@ def purge_1m_bar_cap(db_path: Path | None = None, *, max_bars: int | None = None conn.close() +def clear_series_bars( + exchange_key: str, + symbol: str, + timeframe: str | None = None, + db_path: Path | None = None, +) -> int: + """删除某交易所+币种 K 线(可指定周期);用于清库后全量重拉。""" + init_db(db_path) + ex_k = (exchange_key or "").strip().lower() + sym = (symbol or "").strip().upper() + if not ex_k or not sym: + return 0 + conn = _connect(db_path) + try: + if timeframe: + tf = normalize_chart_timeframe(timeframe) + cur = conn.execute( + "DELETE FROM ohlcv_bars WHERE exchange_key=? AND symbol=? AND timeframe=?", + (ex_k, sym, tf), + ) + else: + cur = conn.execute( + "DELETE FROM ohlcv_bars WHERE exchange_key=? AND symbol=?", + (ex_k, sym), + ) + return int(cur.rowcount or 0) + finally: + conn.close() + + +def clear_all_bars(db_path: Path | None = None) -> int: + """清空 hub K 线库全部 OHLCV 行。""" + init_db(db_path) + conn = _connect(db_path) + try: + cur = conn.execute("DELETE FROM ohlcv_bars") + return int(cur.rowcount or 0) + finally: + conn.close() + + def purge_retention(db_path: Path | None = None) -> int: """按周期策略清理:5m/15m/1h/2h/4h 一年;1m 保留最近 N 根;1d/1w 不删。""" n = 0 @@ -479,6 +520,7 @@ def resolve_chart_bars( db_path: Path | None = None, force_refresh: bool = False, tail_refresh: bool = False, + clear_db: bool = False, limit: int | None = None, before_ms: int | None = None, ) -> dict[str, Any]: @@ -488,6 +530,7 @@ def resolve_chart_bars( """ init_db(db_path) purged = purge_retention(db_path) + cleared = 0 sym = (symbol or "").strip().upper() ex_k = (exchange_key or "").strip().lower() @@ -510,6 +553,9 @@ def resolve_chart_bars( need = min(need, 30) cutoff = history_cutoff_ms_for_storage(storage_tf, now_ms) + if clear_db and not is_history and not tail_refresh: + cleared = clear_series_bars(ex_k, sym, storage_tf, db_path) + def load_display_rows() -> list[dict[str, Any]]: if is_history: rows = load_bars_before(ex_k, sym, storage_tf, int(before_ms), need, db_path) @@ -706,6 +752,7 @@ def resolve_chart_bars( "candles": candles, "from_cache": from_cache, "fetched": fetched, + "cleared": cleared, "purged": purged, "price_tick": price_tick, "stale": bool(remote_err), diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 83c0e12..3a7ffcd 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -675,6 +675,7 @@ def api_chart_ohlcv( bms = int(bms_raw) except ValueError: raise HTTPException(status_code=400, detail="before_ms 无效") + clear_db = force and not tail_refresh and bms is None def remote_fetch(**kwargs): tf_use = kwargs.get("timeframe") or timeframe @@ -693,6 +694,7 @@ def api_chart_ohlcv( remote_fetch, force_refresh=force, tail_refresh=tail_refresh, + clear_db=clear_db, limit=lim, before_ms=bms, ) diff --git a/manual_trading_hub/static/chart.js b/manual_trading_hub/static/chart.js index 8466517..640bcb6 100644 --- a/manual_trading_hub/static/chart.js +++ b/manual_trading_hub/static/chart.js @@ -2648,6 +2648,7 @@ (data.from_cache || 0) + " / 新拉 " + (data.fetched || 0) + + (data.cleared ? " · 清库 " + data.cleared : "") + " · 左拖加载更多 · 后台 " + (data.chart_poll_interval_sec || 5) + "s"; diff --git a/manual_trading_hub/static/index.html b/manual_trading_hub/static/index.html index cb8342c..ac403fe 100644 --- a/manual_trading_hub/static/index.html +++ b/manual_trading_hub/static/index.html @@ -98,7 +98,7 @@ - + @@ -349,7 +349,7 @@
- + diff --git a/scripts/clear_hub_kline_db.py b/scripts/clear_hub_kline_db.py new file mode 100644 index 0000000..9ae5187 --- /dev/null +++ b/scripts/clear_hub_kline_db.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python3 +"""清空中控 K 线 SQLite 缓存(hub_kline.db),便于清库后全量重拉。 + +用法(Linux 云服务器,在仓库根目录): + python3 scripts/clear_hub_kline_db.py --dry-run + python3 scripts/clear_hub_kline_db.py --apply + python3 scripts/clear_hub_kline_db.py --apply --exchange binance --symbol BTC/USDT --timeframe 15m + +默认库路径:环境变量 HUB_KLINE_DB_PATH,或 manual_trading_hub/data/hub_kline.db +""" +from __future__ import annotations + +import argparse +import os +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT)) + +from hub_kline_store import ( # noqa: E402 + clear_all_bars, + clear_series_bars, + default_db_path, + init_db, +) + + +def main() -> int: + parser = argparse.ArgumentParser(description="Clear manual-trading-hub K-line SQLite cache.") + parser.add_argument( + "--db", + default=os.getenv("HUB_KLINE_DB_PATH", "").strip() or str(default_db_path()), + help="hub_kline.db path", + ) + parser.add_argument("--exchange", default="", help="exchange_key, e.g. binance") + parser.add_argument("--symbol", default="", help="symbol, e.g. BTC/USDT") + parser.add_argument("--timeframe", default="", help="optional timeframe, e.g. 15m") + parser.add_argument("--dry-run", action="store_true", help="count only") + parser.add_argument("--apply", action="store_true", help="execute delete") + args = parser.parse_args() + + db_path = Path(args.db) + if not db_path.is_file(): + print(f"DB not found: {db_path}", file=sys.stderr) + return 1 + + init_db(db_path) + ex = (args.exchange or "").strip().lower() + sym = (args.symbol or "").strip().upper() + tf = (args.timeframe or "").strip().lower() or None + + if args.dry_run and not args.apply: + import sqlite3 + + conn = sqlite3.connect(str(db_path)) + try: + if ex and sym: + if tf: + n = conn.execute( + "SELECT COUNT(*) FROM ohlcv_bars WHERE exchange_key=? AND symbol=? AND timeframe=?", + (ex, sym, tf), + ).fetchone()[0] + print(f"would delete series rows: {n} ({ex} {sym} {tf})") + else: + n = conn.execute( + "SELECT COUNT(*) FROM ohlcv_bars WHERE exchange_key=? AND symbol=?", + (ex, sym), + ).fetchone()[0] + print(f"would delete symbol rows: {n} ({ex} {sym} all tf)") + else: + n = conn.execute("SELECT COUNT(*) FROM ohlcv_bars").fetchone()[0] + print(f"would delete all ohlcv_bars rows: {n}") + finally: + conn.close() + return 0 + + if not args.apply: + print("Specify --apply to delete (or --dry-run to preview).", file=sys.stderr) + return 1 + + if ex and sym: + removed = clear_series_bars(ex, sym, tf, db_path) + scope = f"{ex} {sym}" + (f" {tf}" if tf else " (all timeframes)") + print(f"cleared {removed} rows for {scope}") + else: + removed = clear_all_bars(db_path) + print(f"cleared all {removed} ohlcv_bars rows from {db_path}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_hub_kline_store.py b/tests/test_hub_kline_store.py index d7749e0..b8d835e 100644 --- a/tests/test_hub_kline_store.py +++ b/tests/test_hub_kline_store.py @@ -7,6 +7,7 @@ import unittest from pathlib import Path from hub_kline_store import ( + clear_series_bars, init_db, load_bars_before, load_bars_latest, @@ -339,6 +340,57 @@ class TestHubKlineStore(unittest.TestCase): self.assertGreaterEqual(len(out.get("candles") or []), 100) self.assertGreater(int(out.get("fetched") or 0), 0) + def test_clear_series_and_force_refetch(self): + init_db(self.db) + period = TIMEFRAME_MS["5m"] + now = int(time.time() * 1000) + stale = [ + { + "open_time_ms": now - period * (i + 100), + "open": 1, + "high": 2, + "low": 0.5, + "close": 1.5, + "volume": 1, + } + for i in range(40) + ] + upsert_bars("binance", "BTC/USDT", "5m", stale, self.db) + self.assertEqual(len(load_bars_latest("binance", "BTC/USDT", "5m", 100, self.db)), 40) + removed = clear_series_bars("binance", "BTC/USDT", "5m", self.db) + self.assertEqual(removed, 40) + self.assertEqual(len(load_bars_latest("binance", "BTC/USDT", "5m", 100, self.db)), 0) + + fresh = [ + { + "open_time_ms": now - period * (20 - i), + "open": 10, + "high": 11, + "low": 9, + "close": 10.5, + "volume": 2, + } + for i in range(20) + ] + + def remote_fetch(**kwargs): + return {"ok": True, "bars": fresh, "price_tick": 0.01} + + out = resolve_chart_bars( + "binance", + "BTC/USDT", + "5m", + remote_fetch, + db_path=self.db, + force_refresh=True, + clear_db=True, + limit=50, + ) + self.assertTrue(out.get("ok")) + self.assertGreaterEqual(int(out.get("cleared") or 0), 0) + self.assertGreater(int(out.get("fetched") or 0), 0) + self.assertGreaterEqual(len(out.get("candles") or []), 19) + def test_resolve_before_ms_exhausted(self): init_db(self.db)