Fetch native exchange OHLCV per timeframe instead of local aggregation.
Store and serve 15m/2h/4h directly from the exchange so market charts match venue candles. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+18
-51
@@ -1,4 +1,4 @@
|
|||||||
"""中控 K 线 SQLite:分周期保留、本地聚合、分页读取。"""
|
"""中控 K 线 SQLite:分周期保留、交易所直拉、分页读取。"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
@@ -12,9 +12,7 @@ from hub_ohlcv_lib import (
|
|||||||
HUB_KLINE_1M_MAX_BARS,
|
HUB_KLINE_1M_MAX_BARS,
|
||||||
HUB_KLINE_5M_1H_RETENTION_DAYS,
|
HUB_KLINE_5M_1H_RETENTION_DAYS,
|
||||||
TIMEFRAME_MS,
|
TIMEFRAME_MS,
|
||||||
aggregate_ohlcv_bars,
|
YEAR_ROLLING_STORED,
|
||||||
aggregate_ratio,
|
|
||||||
aggregation_source_for_display,
|
|
||||||
chart_chunk_limit,
|
chart_chunk_limit,
|
||||||
chart_initial_limit,
|
chart_initial_limit,
|
||||||
chart_memory_cap,
|
chart_memory_cap,
|
||||||
@@ -26,7 +24,6 @@ from hub_ohlcv_lib import (
|
|||||||
retention_policy_meta,
|
retention_policy_meta,
|
||||||
round_ohlcv_bars_to_tick,
|
round_ohlcv_bars_to_tick,
|
||||||
seed_bar_target,
|
seed_bar_target,
|
||||||
sync_timeframe_for_display,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
_DEFAULT_RETENTION_DAYS = 15
|
_DEFAULT_RETENTION_DAYS = 15
|
||||||
@@ -200,10 +197,10 @@ def purge_1m_bar_cap(db_path: Path | None = None, *, max_bars: int | None = None
|
|||||||
|
|
||||||
|
|
||||||
def purge_retention(db_path: Path | None = None) -> int:
|
def purge_retention(db_path: Path | None = None) -> int:
|
||||||
"""按周期策略清理:5m/1h 一年;1m 保留最近 N 根;1d/1w 不删。"""
|
"""按周期策略清理:5m/15m/1h/2h/4h 一年;1m 保留最近 N 根;1d/1w 不删。"""
|
||||||
n = 0
|
n = 0
|
||||||
n += purge_timeframe_by_days("5m", HUB_KLINE_5M_1H_RETENTION_DAYS, db_path)
|
for tf in sorted(YEAR_ROLLING_STORED):
|
||||||
n += purge_timeframe_by_days("1h", HUB_KLINE_5M_1H_RETENTION_DAYS, db_path)
|
n += purge_timeframe_by_days(tf, HUB_KLINE_5M_1H_RETENTION_DAYS, db_path)
|
||||||
n += purge_1m_bar_cap(db_path)
|
n += purge_1m_bar_cap(db_path)
|
||||||
return n
|
return n
|
||||||
|
|
||||||
@@ -400,19 +397,6 @@ def _trim_display_bars(
|
|||||||
return bars
|
return bars
|
||||||
|
|
||||||
|
|
||||||
def _aggregate_display_bars(
|
|
||||||
src_bars: list[dict[str, Any]],
|
|
||||||
display_tf: str,
|
|
||||||
*,
|
|
||||||
need: int,
|
|
||||||
before_ms: int | None,
|
|
||||||
) -> list[dict[str, Any]]:
|
|
||||||
if not src_bars:
|
|
||||||
return []
|
|
||||||
agg = aggregate_ohlcv_bars(src_bars, display_tf)
|
|
||||||
return _trim_display_bars(agg, need=need, before_ms=before_ms)
|
|
||||||
|
|
||||||
|
|
||||||
def resolve_chart_bars(
|
def resolve_chart_bars(
|
||||||
exchange_key: str,
|
exchange_key: str,
|
||||||
symbol: str,
|
symbol: str,
|
||||||
@@ -427,7 +411,7 @@ def resolve_chart_bars(
|
|||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""
|
"""
|
||||||
分页读库:首屏 / 左拖 before_ms / 尾部 tail_refresh。
|
分页读库:首屏 / 左拖 before_ms / 尾部 tail_refresh。
|
||||||
15m←5m,2h/4h←1h 现场聚合;其余直读入库周期。
|
各展示周期均直读交易所同步入库的同名 K 线。
|
||||||
"""
|
"""
|
||||||
init_db(db_path)
|
init_db(db_path)
|
||||||
purged = purge_retention(db_path)
|
purged = purge_retention(db_path)
|
||||||
@@ -438,8 +422,7 @@ def resolve_chart_bars(
|
|||||||
if not sym or not ex_k:
|
if not sym or not ex_k:
|
||||||
return {"ok": False, "msg": "缺少 exchange 或 symbol"}
|
return {"ok": False, "msg": "缺少 exchange 或 symbol"}
|
||||||
|
|
||||||
agg_src = aggregation_source_for_display(display_tf)
|
storage_tf = display_tf
|
||||||
storage_tf = agg_src or sync_timeframe_for_display(display_tf)
|
|
||||||
is_history = before_ms is not None and int(before_ms) > 0
|
is_history = before_ms is not None and int(before_ms) > 0
|
||||||
need = int(
|
need = int(
|
||||||
limit
|
limit
|
||||||
@@ -450,24 +433,14 @@ def resolve_chart_bars(
|
|||||||
now_ms = int(time.time() * 1000)
|
now_ms = int(time.time() * 1000)
|
||||||
period_display = TIMEFRAME_MS[display_tf]
|
period_display = TIMEFRAME_MS[display_tf]
|
||||||
period_storage = TIMEFRAME_MS[storage_tf]
|
period_storage = TIMEFRAME_MS[storage_tf]
|
||||||
ratio = aggregate_ratio(display_tf, storage_tf) if agg_src else 1
|
|
||||||
if tail_refresh and not is_history:
|
if tail_refresh and not is_history:
|
||||||
need = min(need, max(30, ratio * 6 if agg_src else 20))
|
need = min(need, 30)
|
||||||
src_need = need * ratio + ratio * 4
|
|
||||||
cutoff = history_cutoff_ms_for_storage(storage_tf, now_ms)
|
cutoff = history_cutoff_ms_for_storage(storage_tf, now_ms)
|
||||||
source_kind = "aggregate" if agg_src else "db"
|
|
||||||
|
|
||||||
def load_display_rows() -> list[dict[str, Any]]:
|
def load_display_rows() -> list[dict[str, Any]]:
|
||||||
if agg_src:
|
|
||||||
if is_history:
|
|
||||||
src = load_bars_before(ex_k, sym, storage_tf, int(before_ms), src_need, db_path)
|
|
||||||
else:
|
|
||||||
src = load_bars_latest(ex_k, sym, storage_tf, src_need, db_path)
|
|
||||||
return _aggregate_display_bars(
|
|
||||||
src, display_tf, need=need, before_ms=before_ms if is_history else None
|
|
||||||
)
|
|
||||||
if is_history:
|
if is_history:
|
||||||
return load_bars_before(ex_k, sym, storage_tf, int(before_ms), need, db_path)
|
rows = load_bars_before(ex_k, sym, storage_tf, int(before_ms), need, db_path)
|
||||||
|
return _trim_display_bars(rows, need=need, before_ms=int(before_ms))
|
||||||
return load_bars_latest(ex_k, sym, storage_tf, need, db_path)
|
return load_bars_latest(ex_k, sym, storage_tf, need, db_path)
|
||||||
|
|
||||||
db_rows: list[dict[str, Any]] = []
|
db_rows: list[dict[str, Any]] = []
|
||||||
@@ -498,20 +471,16 @@ def resolve_chart_bars(
|
|||||||
if is_history:
|
if is_history:
|
||||||
bms = int(before_ms)
|
bms = int(before_ms)
|
||||||
anchor = bms - period_display
|
anchor = bms - period_display
|
||||||
since = max(cutoff, anchor - period_storage * src_need)
|
since = max(cutoff, anchor - period_storage * need)
|
||||||
fetch_limit = min(src_need + 20, 1500)
|
fetch_limit = min(need + 20, 1500)
|
||||||
elif tail_only:
|
elif tail_only:
|
||||||
if agg_src:
|
anchor_ms = int(newest_db) if newest_db is not None else now_ms
|
||||||
src_tail = load_bars_latest(ex_k, sym, storage_tf, 5, db_path)
|
since = max(cutoff, anchor_ms - period_storage * 5)
|
||||||
anchor_ms = int(src_tail[-1]["open_time_ms"]) if src_tail else now_ms
|
fetch_limit = min(need + 20, 300)
|
||||||
else:
|
|
||||||
anchor_ms = int(newest_db) if newest_db is not None else now_ms
|
|
||||||
since = max(cutoff, anchor_ms - period_storage * max(5, ratio * 3))
|
|
||||||
fetch_limit = min(max(20, ratio * 8), 300)
|
|
||||||
else:
|
else:
|
||||||
since = max(cutoff, now_ms - period_storage * min(src_need, seed_bar_target(storage_tf)))
|
since = max(cutoff, now_ms - period_storage * min(need, seed_bar_target(storage_tf)))
|
||||||
fetch_limit = min(
|
fetch_limit = min(
|
||||||
seed_bar_target(storage_tf) if force_refresh else src_need + 20,
|
seed_bar_target(storage_tf) if force_refresh else need + 20,
|
||||||
1500,
|
1500,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -527,8 +496,6 @@ def resolve_chart_bars(
|
|||||||
if price_tick is not None:
|
if price_tick is not None:
|
||||||
save_symbol_price_tick(ex_k, sym, price_tick, db_path)
|
save_symbol_price_tick(ex_k, sym, price_tick, db_path)
|
||||||
db_rows = load_display_rows()
|
db_rows = load_display_rows()
|
||||||
if fetched:
|
|
||||||
source_kind = "remote" if source_kind == "db" else source_kind
|
|
||||||
else:
|
else:
|
||||||
remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败"
|
remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败"
|
||||||
if not db_rows:
|
if not db_rows:
|
||||||
@@ -589,7 +556,7 @@ def resolve_chart_bars(
|
|||||||
"oldest_ms": oldest_ms,
|
"oldest_ms": oldest_ms,
|
||||||
"newest_ms": newest_ms,
|
"newest_ms": newest_ms,
|
||||||
"exhausted": exhausted,
|
"exhausted": exhausted,
|
||||||
"source": "remote" if fetched else source_kind,
|
"source": "remote" if fetched else "db",
|
||||||
"retention_policy": retention_policy_meta(),
|
"retention_policy": retention_policy_meta(),
|
||||||
"candles": candles,
|
"candles": candles,
|
||||||
"from_cache": from_cache,
|
"from_cache": from_cache,
|
||||||
|
|||||||
+74
-12
@@ -31,17 +31,13 @@ CHART_TIMEFRAME_ORDER = (
|
|||||||
)
|
)
|
||||||
DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"})
|
DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"})
|
||||||
|
|
||||||
# 入库 / 同步真源(交易所拉取)
|
# 入库 / 同步真源(各周期直拉交易所,不做本地聚合)
|
||||||
STORED_TIMEFRAMES = frozenset({"1m", "5m", "1h", "1d", "1w"})
|
STORED_TIMEFRAMES = frozenset(CHART_TIMEFRAMES)
|
||||||
PERMANENT_STORED_TIMEFRAMES = frozenset({"1d", "1w"})
|
PERMANENT_STORED_TIMEFRAMES = frozenset({"1d", "1w"})
|
||||||
YEAR_ROLLING_STORED = frozenset({"5m", "1h"})
|
YEAR_ROLLING_STORED = frozenset({"5m", "15m", "1h", "2h", "4h"})
|
||||||
|
|
||||||
# 展示周期 → 本地聚合源(不落库)
|
# 行情区不做展示周期聚合;保留空映射供兼容读取
|
||||||
CHART_DISPLAY_AGGREGATE_FROM: dict[str, str] = {
|
CHART_DISPLAY_AGGREGATE_FROM: dict[str, str] = {}
|
||||||
"15m": "5m",
|
|
||||||
"2h": "1h",
|
|
||||||
"4h": "1h",
|
|
||||||
}
|
|
||||||
|
|
||||||
SMALL_DISPLAY_TFS = frozenset({"1m", "5m", "15m"})
|
SMALL_DISPLAY_TFS = frozenset({"1m", "5m", "15m"})
|
||||||
MID_DISPLAY_TFS = frozenset({"1h", "2h", "4h"})
|
MID_DISPLAY_TFS = frozenset({"1h", "2h", "4h"})
|
||||||
@@ -151,13 +147,17 @@ def seed_bar_target(storage_tf: str) -> int:
|
|||||||
|
|
||||||
|
|
||||||
def retention_policy_meta() -> dict[str, Any]:
|
def retention_policy_meta() -> dict[str, Any]:
|
||||||
|
year = {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS}
|
||||||
return {
|
return {
|
||||||
"1m": {"mode": "bars", "max_bars": HUB_KLINE_1M_MAX_BARS},
|
"1m": {"mode": "bars", "max_bars": HUB_KLINE_1M_MAX_BARS},
|
||||||
"5m": {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS},
|
"5m": dict(year),
|
||||||
"1h": {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS},
|
"15m": dict(year),
|
||||||
|
"1h": dict(year),
|
||||||
|
"2h": dict(year),
|
||||||
|
"4h": dict(year),
|
||||||
"1d": {"mode": "permanent"},
|
"1d": {"mode": "permanent"},
|
||||||
"1w": {"mode": "permanent"},
|
"1w": {"mode": "permanent"},
|
||||||
"aggregate_from": dict(CHART_DISPLAY_AGGREGATE_FROM),
|
"aggregate_from": {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -399,6 +399,68 @@ def align_bar_open_ms(open_time_ms: int, period_ms: int) -> int:
|
|||||||
return (int(open_time_ms) // period_ms) * period_ms
|
return (int(open_time_ms) // period_ms) * period_ms
|
||||||
|
|
||||||
|
|
||||||
|
def snap_to_bar_grid(ts_ms: int, origin_ms: int, step_ms: int) -> int:
|
||||||
|
step = max(1, int(step_ms))
|
||||||
|
origin = int(origin_ms)
|
||||||
|
if ts_ms <= origin:
|
||||||
|
return origin
|
||||||
|
idx = (int(ts_ms) - origin + step - 1) // step
|
||||||
|
return origin + idx * step
|
||||||
|
|
||||||
|
|
||||||
|
def fill_missing_ohlcv_bars(
|
||||||
|
bars: list[dict[str, Any]],
|
||||||
|
period_ms: int,
|
||||||
|
start_ms: int | None = None,
|
||||||
|
end_ms: int | None = None,
|
||||||
|
) -> list[dict[str, Any]]:
|
||||||
|
"""细周期缺口用上一根收盘价填平,保证聚合后 K 线时间轴连续。"""
|
||||||
|
by_ts: dict[int, dict[str, Any]] = {}
|
||||||
|
for b in bars or []:
|
||||||
|
try:
|
||||||
|
by_ts[int(b["open_time_ms"])] = b
|
||||||
|
except (KeyError, TypeError, ValueError):
|
||||||
|
continue
|
||||||
|
if not by_ts:
|
||||||
|
return []
|
||||||
|
keys = sorted(by_ts.keys())
|
||||||
|
step_ms = max(1, int(period_ms))
|
||||||
|
origin = keys[0]
|
||||||
|
aligned_start = snap_to_bar_grid(
|
||||||
|
int(start_ms if start_ms is not None else keys[0]), origin, step_ms
|
||||||
|
)
|
||||||
|
aligned_end = max(
|
||||||
|
int(end_ms if end_ms is not None else keys[-1]),
|
||||||
|
keys[-1],
|
||||||
|
)
|
||||||
|
out: list[dict[str, Any]] = []
|
||||||
|
last: dict[str, Any] | None = None
|
||||||
|
for ts_key in keys:
|
||||||
|
if ts_key <= aligned_start:
|
||||||
|
last = by_ts[ts_key]
|
||||||
|
ts = aligned_start
|
||||||
|
while ts <= aligned_end:
|
||||||
|
cur = by_ts.get(ts)
|
||||||
|
if cur is not None:
|
||||||
|
last = cur
|
||||||
|
out.append(cur)
|
||||||
|
elif last is not None:
|
||||||
|
c = float(last["close"])
|
||||||
|
out.append(
|
||||||
|
{
|
||||||
|
"open_time_ms": ts,
|
||||||
|
"open": c,
|
||||||
|
"high": c,
|
||||||
|
"low": c,
|
||||||
|
"close": c,
|
||||||
|
"volume": 0.0,
|
||||||
|
"filled": True,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
ts += step_ms
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
def aggregate_ohlcv_bars(
|
def aggregate_ohlcv_bars(
|
||||||
bars: list[dict[str, Any]], target_timeframe: str
|
bars: list[dict[str, Any]], target_timeframe: str
|
||||||
) -> list[dict[str, Any]]:
|
) -> list[dict[str, Any]]:
|
||||||
|
|||||||
@@ -130,14 +130,14 @@ class TestHubKlineStore(unittest.TestCase):
|
|||||||
self.assertTrue(out.get("ok"))
|
self.assertTrue(out.get("ok"))
|
||||||
self.assertEqual(len(out.get("candles") or []), 300)
|
self.assertEqual(len(out.get("candles") or []), 300)
|
||||||
|
|
||||||
def test_resolve_15m_from_5m_aggregate(self):
|
def test_resolve_15m_reads_native_bars(self):
|
||||||
init_db(self.db)
|
init_db(self.db)
|
||||||
now = int(time.time() * 1000)
|
now = int(time.time() * 1000)
|
||||||
period = TIMEFRAME_MS["5m"]
|
period = TIMEFRAME_MS["15m"]
|
||||||
last_closed = last_closed_bar_open_ms("5m", now)
|
last_closed = last_closed_bar_open_ms("15m", now)
|
||||||
bars = []
|
bars = []
|
||||||
for i in range(30):
|
for i in range(12):
|
||||||
oms = last_closed - (29 - i) * period
|
oms = last_closed - (11 - i) * period
|
||||||
bars.append(
|
bars.append(
|
||||||
{
|
{
|
||||||
"open_time_ms": oms,
|
"open_time_ms": oms,
|
||||||
@@ -148,7 +148,7 @@ class TestHubKlineStore(unittest.TestCase):
|
|||||||
"volume": 10.0,
|
"volume": 10.0,
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
upsert_bars("okx", "ETH/USDT", "5m", bars, self.db)
|
upsert_bars("okx", "ETH/USDT", "15m", bars, self.db)
|
||||||
|
|
||||||
def remote_fetch(**kwargs):
|
def remote_fetch(**kwargs):
|
||||||
self.fail("不应请求交易所")
|
self.fail("不应请求交易所")
|
||||||
@@ -159,11 +159,12 @@ class TestHubKlineStore(unittest.TestCase):
|
|||||||
"15m",
|
"15m",
|
||||||
remote_fetch,
|
remote_fetch,
|
||||||
db_path=self.db,
|
db_path=self.db,
|
||||||
limit=5,
|
limit=10,
|
||||||
)
|
)
|
||||||
self.assertTrue(out.get("ok"))
|
self.assertTrue(out.get("ok"))
|
||||||
self.assertEqual(out.get("source"), "aggregate")
|
self.assertEqual(out.get("source"), "db")
|
||||||
self.assertGreaterEqual(len(out.get("candles") or []), 5)
|
self.assertEqual(out.get("storage_timeframe"), "15m")
|
||||||
|
self.assertGreaterEqual(len(out.get("candles") or []), 10)
|
||||||
|
|
||||||
def test_load_bars_before(self):
|
def test_load_bars_before(self):
|
||||||
init_db(self.db)
|
init_db(self.db)
|
||||||
|
|||||||
Reference in New Issue
Block a user