fix: align OHLCV fetch window with limit so chart seeds past 30 bars

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-08 11:37:00 +08:00
parent ca6ef59a14
commit 1dcf62bb08
2 changed files with 172 additions and 13 deletions
+121 -13
View File
@@ -26,6 +26,9 @@ from hub_ohlcv_lib import (
seed_bar_target, seed_bar_target,
) )
HUB_KLINE_MIN_BARS_BEFORE_TAIL = 200
HUB_KLINE_REMOTE_FETCH_CAP = 1500
_DEFAULT_RETENTION_DAYS = 15 _DEFAULT_RETENTION_DAYS = 15
@@ -327,6 +330,56 @@ def load_bars_range(
conn.close() conn.close()
def count_series_bars(
exchange_key: str,
symbol: str,
timeframe: str,
db_path: Path | None = None,
) -> int:
ex_k = (exchange_key or "").strip().lower()
sym = (symbol or "").strip().upper()
tf = normalize_chart_timeframe(timeframe)
conn = _connect(db_path)
try:
row = conn.execute(
"""
SELECT COUNT(*) AS c FROM ohlcv_bars
WHERE exchange_key=? AND symbol=? AND timeframe=?
""",
(ex_k, sym, tf),
).fetchone()
return int(row["c"] or 0) if row else 0
finally:
conn.close()
def _remote_fetch_limit(
*,
need: int,
force_refresh: bool,
storage_tf: str,
tail_only: bool,
) -> int:
if tail_only:
return min(need + 20, 300)
cap = HUB_KLINE_REMOTE_FETCH_CAP
if force_refresh:
return min(seed_bar_target(storage_tf), cap)
return min(max(need + 20, 1), cap)
def _since_ms_for_span(
*,
now_ms: int,
period_ms: int,
span_bars: int,
cutoff_ms: int,
) -> int:
"""拉取窗口起点:跨度必须与 fetch_limit 一致,保证数据能铺到最近。"""
span = max(1, int(span_bars))
return max(int(cutoff_ms), int(now_ms) - int(period_ms) * span)
def load_bars_latest( def load_bars_latest(
exchange_key: str, exchange_key: str,
symbol: str, symbol: str,
@@ -549,8 +602,15 @@ def resolve_chart_bars(
now_ms = int(time.time() * 1000) now_ms = int(time.time() * 1000)
period_display = TIMEFRAME_MS[display_tf] period_display = TIMEFRAME_MS[display_tf]
period_storage = TIMEFRAME_MS[storage_tf] period_storage = TIMEFRAME_MS[storage_tf]
series_bar_count = (
count_series_bars(ex_k, sym, storage_tf, db_path) if not is_history else 0
)
if tail_refresh and not is_history: if tail_refresh and not is_history:
need = min(need, 30) min_seed = min(chart_initial_limit(display_tf) // 5, HUB_KLINE_MIN_BARS_BEFORE_TAIL)
if series_bar_count < max(1, min_seed):
tail_refresh = False
else:
need = min(need, 30)
cutoff = history_cutoff_ms_for_storage(storage_tf, now_ms) cutoff = history_cutoff_ms_for_storage(storage_tf, now_ms)
if clear_db and not is_history and not tail_refresh: if clear_db and not is_history and not tail_refresh:
@@ -605,13 +665,27 @@ def resolve_chart_bars(
fetch_limit = min(need + 20, 1500) fetch_limit = min(need + 20, 1500)
elif tail_only: elif tail_only:
anchor_ms = int(newest_db) if newest_db is not None else now_ms anchor_ms = int(newest_db) if newest_db is not None else now_ms
since = max(cutoff, anchor_ms - period_storage * 5) fetch_limit = _remote_fetch_limit(
fetch_limit = min(need + 20, 300) need=need, force_refresh=False, storage_tf=storage_tf, tail_only=True
)
since = _since_ms_for_span(
now_ms=anchor_ms,
period_ms=period_storage,
span_bars=5,
cutoff_ms=cutoff,
)
else: else:
since = max(cutoff, now_ms - period_storage * min(need, seed_bar_target(storage_tf))) fetch_limit = _remote_fetch_limit(
fetch_limit = min( need=need,
seed_bar_target(storage_tf) if force_refresh else need + 20, force_refresh=force_refresh,
1500, storage_tf=storage_tf,
tail_only=False,
)
since = _since_ms_for_span(
now_ms=now_ms,
period_ms=period_storage,
span_bars=fetch_limit,
cutoff_ms=cutoff,
) )
remote = remote_fetch( remote = remote_fetch(
@@ -635,6 +709,33 @@ def resolve_chart_bars(
timeframe=storage_tf, timeframe=storage_tf,
db_path=db_path, db_path=db_path,
) )
if not is_history and not tail_only and db_rows:
newest_ms = int(db_rows[-1]["open_time_ms"])
if newest_ms < int(last_closed) - period_display:
gap_limit = min(
500,
int((now_ms - newest_ms) // period_storage) + 10,
)
if gap_limit > 1:
gap_remote = remote_fetch(
symbol=sym,
timeframe=storage_tf,
since_ms=newest_ms,
limit=gap_limit,
)
if gap_remote.get("ok") and gap_remote.get("bars"):
fetched += upsert_bars(
ex_k, sym, storage_tf, gap_remote["bars"], db_path
)
db_rows = load_display_rows()
db_rows = normalize_contiguous_db_rows(
db_rows,
period_ms=period_display,
exchange_key=ex_k,
symbol=sym,
timeframe=storage_tf,
db_path=db_path,
)
else: else:
remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败" remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败"
if not db_rows: if not db_rows:
@@ -683,15 +784,22 @@ def resolve_chart_bars(
if not is_history and len(db_rows) < need: if not is_history and len(db_rows) < need:
missing = need - len(db_rows) missing = need - len(db_rows)
backfill_limit = min(missing + 60, HUB_KLINE_REMOTE_FETCH_CAP)
if db_rows: if db_rows:
oldest = int(db_rows[0]["open_time_ms"]) oldest = int(db_rows[0]["open_time_ms"])
backfill_since = max(cutoff, oldest - period_storage * (missing + 40)) backfill_since = _since_ms_for_span(
backfill_limit = min(missing + 60, 1500) now_ms=oldest,
else: period_ms=period_storage,
backfill_since = max( span_bars=backfill_limit,
cutoff, now_ms - period_storage * min(need, seed_bar_target(storage_tf)) cutoff_ms=cutoff,
)
else:
backfill_since = _since_ms_for_span(
now_ms=now_ms,
period_ms=period_storage,
span_bars=backfill_limit,
cutoff_ms=cutoff,
) )
backfill_limit = min(need + 20, 1500)
try: try:
remote_back = remote_fetch( remote_back = remote_fetch(
symbol=sym, symbol=sym,
+51
View File
@@ -7,6 +7,8 @@ import unittest
from pathlib import Path from pathlib import Path
from hub_kline_store import ( from hub_kline_store import (
HUB_KLINE_REMOTE_FETCH_CAP,
_since_ms_for_span,
clear_series_bars, clear_series_bars,
init_db, init_db,
load_bars_before, load_bars_before,
@@ -391,6 +393,55 @@ class TestHubKlineStore(unittest.TestCase):
self.assertGreater(int(out.get("fetched") or 0), 0) self.assertGreater(int(out.get("fetched") or 0), 0)
self.assertGreaterEqual(len(out.get("candles") or []), 19) self.assertGreaterEqual(len(out.get("candles") or []), 19)
def test_since_span_matches_fetch_limit_not_need(self):
period = TIMEFRAME_MS["15m"]
now_ms = 1_800_000_000_000
fetch_limit = HUB_KLINE_REMOTE_FETCH_CAP
since = _since_ms_for_span(
now_ms=now_ms,
period_ms=period,
span_bars=fetch_limit,
cutoff_ms=0,
)
self.assertEqual(since, now_ms - period * fetch_limit)
wrong_since = now_ms - period * chart_initial_limit("15m")
self.assertGreater(since, wrong_since)
def test_thin_series_tail_refresh_fetches_full_window(self):
init_db(self.db)
period = TIMEFRAME_MS["15m"]
now = int(time.time() * 1000)
last_closed = last_closed_bar_open_ms("15m", now)
bars = [
{
"open_time_ms": last_closed - period * (150 - i),
"open": 100000,
"high": 100100,
"low": 99900,
"close": 100050,
"volume": 1,
}
for i in range(150)
]
fetch_calls: list[dict] = []
def remote_fetch(**kwargs):
fetch_calls.append(dict(kwargs))
return {"ok": True, "bars": bars, "price_tick": 0.01}
out = resolve_chart_bars(
"binance",
"BTC/USDT",
"15m",
remote_fetch,
db_path=self.db,
tail_refresh=True,
)
self.assertTrue(out.get("ok"))
self.assertGreaterEqual(len(out.get("candles") or []), 100)
self.assertGreater(int(out.get("fetched") or 0), 0)
self.assertTrue(any(int(c.get("limit") or 0) > 30 for c in fetch_calls))
def test_resolve_before_ms_exhausted(self): def test_resolve_before_ms_exhausted(self):
init_db(self.db) init_db(self.db)