Files
crypto_monitor/hub_ohlcv_lib.py
T
dekun 11cc482599 Refactor market K-line storage with tiered retention and chunked loading.
Store 1m/5m/1h/12h/1d/1w with per-timeframe policies, aggregate 15m and 2h/4h on read, and support left-pan history fetches via before_ms.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-08 07:27:16 +08:00

620 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""中控行情区:各实例 ccxt OHLCV 拉取(hub_bridge /api/hub/ohlcv 共用)。"""
from __future__ import annotations
import math
import os
import time
from typing import Any, Callable, Optional
# Every chart timeframe, in ascending bar duration.
CHART_TIMEFRAME_ORDER = (
    "1m",
    "5m",
    "15m",
    "1h",
    "2h",
    "4h",
    "12h",
    "1d",
    "1w",
)
# Same timeframes as a set, for O(1) membership checks.
CHART_TIMEFRAMES = frozenset(CHART_TIMEFRAME_ORDER)
DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"})
# Stored timeframes that are never trimmed.
PERMANENT_STORED_TIMEFRAMES = frozenset({"12h", "1d", "1w"})
# Stored timeframes trimmed to a rolling window of days.
YEAR_ROLLING_STORED = frozenset({"5m", "1h"})
# Timeframes persisted locally and synced from the exchange (the source of truth):
# 1m (capped by bar count) plus the rolling and permanent sets above.
STORED_TIMEFRAMES = frozenset({"1m"}) | YEAR_ROLLING_STORED | PERMANENT_STORED_TIMEFRAMES
# Display-only timeframe -> stored timeframe it is aggregated from on read (not persisted).
CHART_DISPLAY_AGGREGATE_FROM: dict[str, str] = {
    "15m": "5m",
    "2h": "1h",
    "4h": "1h",
}
SMALL_DISPLAY_TFS = frozenset({"1m", "5m", "15m"})
MID_DISPLAY_TFS = frozenset({"1h", "2h", "4h"})


def _env_int_at_least(name: str, default: str, floor: int) -> int:
    """Read env var *name* as an int (default *default*), never below *floor*.

    A value that is not an integer literal raises ValueError at import time.
    """
    return max(floor, int(os.getenv(name, default)))


HUB_KLINE_1M_MAX_BARS = _env_int_at_least("HUB_KLINE_1M_MAX_BARS", "10000", 1000)
HUB_KLINE_5M_1H_RETENTION_DAYS = _env_int_at_least("HUB_KLINE_5M_1H_RETENTION_DAYS", "365", 30)
HUB_KLINE_SEED_BARS = _env_int_at_least("HUB_KLINE_SEED_BARS", "500", 100)
# Remote-fetch fallback only: some exchanges have no native 12h candles in
# ccxt, or return them at the wrong spacing, so 12h is rebuilt from 1h bars.
OHLCV_AGGREGATE_FROM: dict[str, str] = {"12h": "1h"}

_MINUTE_MS = 60_000
_HOUR_MS = 60 * _MINUTE_MS
_DAY_MS = 24 * _HOUR_MS
# Bar duration in milliseconds for each chart timeframe.
TIMEFRAME_MS: dict[str, int] = {
    "1m": _MINUTE_MS,
    "5m": 5 * _MINUTE_MS,
    "15m": 15 * _MINUTE_MS,
    "1h": _HOUR_MS,
    "2h": 2 * _HOUR_MS,
    "4h": 4 * _HOUR_MS,
    "12h": 12 * _HOUR_MS,
    "1d": _DAY_MS,
    "1w": 7 * _DAY_MS,
}
def normalize_chart_timeframe(raw: str | None, default: str = "5m") -> str:
    """Canonicalize a timeframe string against ``CHART_TIMEFRAMES``.

    Empty/None *raw* falls back to *default*. The candidate is stripped and
    lower-cased. If it is not a known chart timeframe, *default* is returned
    as given (it is not itself validated).
    """
    candidate = raw if raw else default
    candidate = candidate.strip().lower()
    if candidate not in CHART_TIMEFRAMES:
        return default
    return candidate
def sync_timeframe_for_display(timeframe: str) -> str:
    """Return the stored/synced timeframe that backs display *timeframe*.

    Timeframes listed in ``CHART_DISPLAY_AGGREGATE_FROM`` map to their
    aggregation source; any other timeframe is its own storage timeframe.
    """
    normalized = normalize_chart_timeframe(timeframe)
    source = CHART_DISPLAY_AGGREGATE_FROM.get(normalized)
    return normalized if source is None else source
def aggregation_source_for_display(timeframe: str) -> str | None:
    """Return the stored timeframe *timeframe* is aggregated from, or None.

    None means the timeframe is stored directly and needs no aggregation.
    """
    return CHART_DISPLAY_AGGREGATE_FROM.get(normalize_chart_timeframe(timeframe))
def aggregate_ratio(display_tf: str, source_tf: str) -> int:
    """Number of *source_tf* bars per *display_tf* bar (floor division, at least 1)."""
    display_ms = TIMEFRAME_MS[normalize_chart_timeframe(display_tf)]
    source_ms = TIMEFRAME_MS[normalize_chart_timeframe(source_tf)]
    ratio = int(display_ms // source_ms)
    return ratio if ratio > 1 else 1
def chart_initial_limit(timeframe: str) -> int:
    """Initial bar count for *timeframe*: 300 for small TFs, 150 for 1w, else 200."""
    tf = normalize_chart_timeframe(timeframe)
    if tf == "1w":
        return 150
    return 300 if tf in SMALL_DISPLAY_TFS else 200
def chart_chunk_limit(timeframe: str) -> int:
    """Per-chunk bar count for *timeframe*.

    500 for small TFs, 150 for 1w, 300 for mid TFs, 200 otherwise.
    """
    tf = normalize_chart_timeframe(timeframe)
    if tf == "1w":
        return 150
    if tf in SMALL_DISPLAY_TFS:
        return 500
    return 300 if tf in MID_DISPLAY_TFS else 200
def chart_memory_cap(timeframe: str) -> int:
    """Bar cap for *timeframe*: 5000 for small TFs, 500 for 1w, else 1000."""
    tf = normalize_chart_timeframe(timeframe)
    if tf == "1w":
        return 500
    return 5000 if tf in SMALL_DISPLAY_TFS else 1000
def bar_limit_for_timeframe(timeframe: str) -> int:
    """Default bar limit for *timeframe*; the same value as ``chart_memory_cap``."""
    limit = chart_memory_cap(timeframe)
    return limit
def storage_retention_days(storage_tf: str) -> int | None:
    """Retention window in days for a stored timeframe.

    Only the rolling timeframes (``YEAR_ROLLING_STORED``) are trimmed by age.
    None means no day-based trimming: 1m is capped by bar count and
    12h/1d/1w are kept permanently.
    """
    if normalize_chart_timeframe(storage_tf) not in YEAR_ROLLING_STORED:
        return None
    return HUB_KLINE_5M_1H_RETENTION_DAYS
def history_cutoff_ms_for_storage(storage_tf: str, now_ms: int | None = None) -> int:
    """Oldest timestamp (ms) to keep for *storage_tf*; 0 means keep everything.

    *now_ms* defaults to the current wall-clock time.
    """
    retention = storage_retention_days(storage_tf)
    if retention is None:
        return 0
    now = int(time.time() * 1000 if now_ms is None else now_ms)
    cutoff = now - int(retention) * 86400000
    return cutoff if cutoff > 0 else 0
def seed_bar_target(storage_tf: str) -> int:
    """Number of bars to backfill when seeding storage for *storage_tf*.

    - 5m/1h: the whole retention window plus 20 spare bars, capped at 150000.
    - 1m: its bar cap.
    - Everything else: ``HUB_KLINE_SEED_BARS``.
    """
    tf = normalize_chart_timeframe(storage_tf)
    if tf in YEAR_ROLLING_STORED:
        window_ms = 86400000 * HUB_KLINE_5M_1H_RETENTION_DAYS
        return min(int(window_ms / TIMEFRAME_MS[tf]) + 20, 150000)
    if tf == "1m":
        return HUB_KLINE_1M_MAX_BARS
    return HUB_KLINE_SEED_BARS
def retention_policy_meta() -> dict[str, Any]:
    """Describe the per-timeframe storage retention policy as a plain dict.

    Each stored timeframe maps to its mode (bars / days / permanent). The
    ``aggregate_from`` entry maps display-only timeframes to their sources.
    """
    rolling = {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS}
    meta: dict[str, Any] = {"1m": {"mode": "bars", "max_bars": HUB_KLINE_1M_MAX_BARS}}
    meta["5m"] = dict(rolling)
    meta["1h"] = dict(rolling)
    for permanent_tf in ("12h", "1d", "1w"):
        meta[permanent_tf] = {"mode": "permanent"}
    meta["aggregate_from"] = dict(CHART_DISPLAY_AGGREGATE_FROM)
    return meta
def last_closed_bar_open_ms(timeframe: str, now_ms: int | None = None) -> int:
    """Open time (UTC ms) of the most recent fully closed bar of *timeframe*.

    Bars start at multiples of the period since the Unix epoch. The bar that
    contains *now* is still open, so the result is one period before its start.
    """
    period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    now = int(now_ms if now_ms is not None else time.time() * 1000)
    return int(now - now % period - period)
def window_start_ms(timeframe: str, need: int, retention_days: int, now_ms: int | None = None) -> int:
    """Start (UTC ms) of the local-DB read/cleanup window.

    The window covers the last *need* bars of *timeframe* but never reaches
    back further than *retention_days* days. Both counts are floored at 1.
    """
    now = int(time.time() * 1000 if now_ms is None else now_ms)
    bar_ms = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    oldest_allowed = now - max(1, int(retention_days)) * 86400000
    requested = now - max(1, int(need)) * bar_ms
    return oldest_allowed if oldest_allowed > requested else requested
def chart_fetch_start_ms(timeframe: str, need: int, now_ms: int | None = None) -> int:
    """Start (UTC ms) for fetching the last *need* bars of *timeframe* for display.

    Unlike ``window_start_ms`` this ignores DB retention days. The result is
    only floored at 0, and *need* at 1.
    """
    now = int(time.time() * 1000 if now_ms is None else now_ms)
    bar_ms = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    start = now - max(1, int(need)) * bar_ms
    return start if start > 0 else 0
def _positive_float(value: Any) -> Optional[float]:
    """Parse *value* as a float; return it only if strictly positive, else None."""
    if value in (None, ""):
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if number > 0 else None


def _price_tick_from_market_info(info: dict) -> Optional[float]:
    """Extract the price tick from a raw exchange ``market["info"]`` dict.

    Lookup order (the first usable value wins):

    1. explicit tick-size fields (``tickSize``, ``tickSz``, ``price_increment``,
       ``order_price_round``, ``quote_increment``);
    2. Binance-style ``filters`` entries with ``filterType == "PRICE_FILTER"``
       (case-insensitive) and a positive ``tickSize``;
    3. precision hints ``pricePrecision`` / ``price_precision``. An integer
       from 1 to 12 is read as a count of decimal places (tick 10**-p). A
       value in (0, 1) is taken as the tick itself.

    Explicit tick sizes (1 and 2) come before precision hints (3). Binance
    USDⓈ-M futures report both ``pricePrecision`` and a PRICE_FILTER
    ``tickSize``, and the two can disagree; Binance warns not to use
    pricePrecision as the tick size.

    Returns None when no usable value is present.
    """
    for key in ("tickSize", "tickSz", "price_increment", "order_price_round", "quote_increment"):
        tick = _positive_float(info.get(key))
        if tick is not None:
            return tick
    filters = info.get("filters")
    if isinstance(filters, list):
        for flt in filters:
            if not isinstance(flt, dict):
                continue
            if str(flt.get("filterType") or "").upper() != "PRICE_FILTER":
                continue
            tick = _positive_float(flt.get("tickSize"))
            if tick is not None:
                return tick
    for key in ("pricePrecision", "price_precision"):
        raw = info.get(key)
        if raw in (None, ""):
            continue
        try:
            p = float(raw)
        except (TypeError, ValueError):
            continue
        # Range check first, so round() never sees inf/NaN (it would raise).
        if 1 <= p <= 12 and abs(p - round(p)) < 1e-9:
            return 10 ** (-int(p))
        if 0 < p < 1:
            return p
    return None
def round_price_to_tick(value: Any, tick: Optional[float]) -> Optional[float]:
    """Snap *value* to the nearest multiple of the exchange price tick.

    This keeps candle and marker prices consistent with the price axis.
    *tick* is first passed through ``normalize_price_tick``. The snapped
    number is re-formatted with the tick's decimal count to remove float
    noise.

    Returns None when:

    - the tick is unusable;
    - *value* is not numeric;
    - ``value / tick`` is not finite (NaN/inf price, or overflow).

    In the last case ``round()`` would raise ValueError/OverflowError.
    """
    t = normalize_price_tick(tick)
    if t is None:
        return None
    try:
        v = float(value)
    except (TypeError, ValueError):
        return None
    steps = v / t
    if not math.isfinite(steps):
        return None
    snapped = round(steps) * t
    decimals = _decimals_from_tick(t)
    return float(f"{snapped:.{decimals}f}")
def round_ohlcv_bars_to_tick(bars: list[dict[str, Any]], tick: Optional[float]) -> None:
    """Snap open/high/low/close of every bar to *tick*, in place.

    Missing fields, and values ``round_price_to_tick`` cannot convert, are left
    unchanged. Does nothing when *tick* is unusable.
    """
    normalized = normalize_price_tick(tick)
    if normalized is None:
        return
    price_fields = ("open", "high", "low", "close")
    for bar in bars:
        for field in price_fields:
            if field not in bar:
                continue
            snapped = round_price_to_tick(bar.get(field), normalized)
            if snapped is not None:
                bar[field] = snapped
def price_tick_from_market(exchange, exchange_symbol: str) -> Optional[float]:
    """Minimum price increment of *exchange_symbol* on a ccxt *exchange*.

    Sources, in order (the first usable one wins):

    1. the raw ``market["info"]`` dict (via ``_price_tick_from_market_info``);
    2. a positive ``market["limits"]["price"]["min"]``;
    3. the decimals ``exchange.price_to_precision`` keeps on a probe price
       (no decimals gives 1.0); any error in this step is ignored;
    4. ``market["precision"]["price"]``. An integer from 1 to 12 means a
       count of decimal places; a value in (0, 1) is the tick itself.

    Markets are loaded first if ``exchange.markets`` is empty. Returns None if
    the market cannot be loaded or resolved, or no source applies.

    NOTE(review): in ccxt's unified market structure ``limits.price.min`` is a
    minimum allowed price, not necessarily the tick size. Confirm step 2 is
    intended.
    """
    try:
        if not getattr(exchange, "markets", None):
            exchange.load_markets()
        market = exchange.market(exchange_symbol)
    except Exception:
        return None

    raw_info = market.get("info") or {}
    if isinstance(raw_info, dict):
        from_info = _price_tick_from_market_info(raw_info)
        if from_info is not None:
            return from_info

    price_limits = (market.get("limits") or {}).get("price") or {}
    min_price = _positive_float(price_limits.get("min"))
    if min_price is not None:
        return min_price

    try:
        probe = str(exchange.price_to_precision(exchange_symbol, 12345.678901234)).strip()
        _, dot, decimals = probe.partition(".")
        return 10 ** (-len(decimals)) if dot and decimals else 1.0
    except Exception:
        pass

    precision = (market.get("precision") or {}).get("price")
    if precision is None:
        return None
    try:
        p = float(precision)
    except (TypeError, ValueError):
        return None
    if p >= 1 and abs(p - round(p)) < 1e-9 and p <= 12:
        return 10 ** (-int(p))
    return p if 0 < p < 1 else None
def normalize_price_tick(tick: Optional[float]) -> Optional[float]:
    """Snap a raw exchange tick to a clean power of ten, 10**-n with 0 <= n <= 12.

    Ticks carrying float noise (e.g. 0.010000000000000002) make the front-end
    lightweight-charts fail with "unexpected base". Sub-unit ticks are
    therefore replaced by 10**-n, where n is the number of decimal places the
    tick uses in its 12-decimal rendering; that rendering absorbs float noise.

    Using the decimal count, rather than rounding log10, keeps the result a
    divisor of the real tick: 0.5 -> 0.1, 0.05 -> 0.01, 0.025 -> 0.001.
    Prices snapped to the returned tick therefore never lose a legitimate
    price step.

    Ticks >= 1 are returned unchanged. None, non-numeric, non-positive and
    non-finite inputs return None; an infinite tick would otherwise turn
    every snapped price into NaN.
    """
    if tick is None:
        return None
    try:
        t = float(tick)
    except (TypeError, ValueError):
        return None
    if not math.isfinite(t) or t <= 0:
        return None
    if t >= 1:
        return t
    whole, _, frac = f"{t:.12f}".rstrip("0").partition(".")
    if frac:
        return 10 ** (-min(12, len(frac)))
    if whole != "0":
        # Float noise just below 1 (e.g. 0.9999999999999999) renders as "1.".
        return 1.0
    # Ticks below ~5e-13 render as all zeros: use the finest supported step.
    return 10 ** (-12)
def _decimals_from_tick(tick: float) -> int:
    """Decimal places needed to print prices on a *tick* grid, from 0 to 12."""
    if tick >= 1:
        return 0
    _, dot, fraction = f"{tick:.12f}".rstrip("0").partition(".")
    if dot and fraction:
        return min(12, len(fraction))
    # Too small to show at 12 decimals: estimate from the tick's magnitude.
    return max(0, min(12, int(round(-math.log10(tick)))))


def format_price_by_tick(value: Any, tick: Optional[float]) -> str:
    """Render *value* as a price string.

    With a positive *tick*, the value is printed with exactly the tick's
    decimal places. Otherwise the precision follows the magnitude (2 decimals
    at >= 10000, 3 at >= 100, 4 at >= 1, 6 at >= 0.01, else 8) and trailing
    zeros are trimmed.

    Special cases: empty input gives "-", zero gives "0", and non-numeric
    input is returned as ``str(value)``.
    """
    if value in (None, ""):
        return "-"
    try:
        number = float(value)
    except (TypeError, ValueError):
        return str(value)
    if number == 0:
        return "0"
    if tick and tick > 0:
        return f"{number:.{_decimals_from_tick(float(tick))}f}"
    magnitude = abs(number)
    for threshold, places in ((10000, 2), (100, 3), (1, 4), (0.01, 6)):
        if magnitude >= threshold:
            break
    else:
        places = 8
    text = f"{number:.{places}f}"
    if "." not in text:
        return text
    return text.rstrip("0").rstrip(".")
def exchange_supports_timeframe(exchange, timeframe: str) -> bool:
    """Whether *timeframe* appears in the ccxt ``exchange.timeframes`` map.

    An exchange that publishes no timeframe map is assumed to support
    every timeframe.
    """
    tf = normalize_chart_timeframe(timeframe)
    available = getattr(exchange, "timeframes", None) or {}
    return (not available) or (tf in available)
def _median_bar_step_ms(bars: list[dict[str, Any]]) -> Optional[int]:
    """Median positive gap (ms) between consecutive ``open_time_ms`` values.

    Only the first 64 bars are sampled. With an even number of gaps the upper
    median is returned. Returns None for fewer than two bars or when no gap
    is positive.
    """
    if len(bars) < 2:
        return None
    head = bars[:64]
    gaps = sorted(
        gap
        for gap in (
            int(cur["open_time_ms"]) - int(prev["open_time_ms"])
            for prev, cur in zip(head, head[1:])
        )
        if gap > 0
    )
    return gaps[len(gaps) // 2] if gaps else None
def bars_spacing_matches_timeframe(
    bars: list[dict[str, Any]], timeframe: str, *, tolerance: float = 0.08
) -> bool:
    """Check that *bars* are spaced at roughly the *timeframe* period.

    The median bar step must lie within ``tolerance * period`` of the nominal
    period. Fewer than two bars always match; bars with no positive step
    never match.
    """
    if len(bars) < 2:
        return True
    expected = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    observed = _median_bar_step_ms(bars)
    return observed is not None and abs(observed - expected) <= expected * tolerance
def align_bar_open_ms(open_time_ms: int, period_ms: int) -> int:
    """Floor *open_time_ms* to the start of its epoch-aligned *period_ms* bucket."""
    ts = int(open_time_ms)
    return ts - ts % period_ms
def aggregate_ohlcv_bars(
    bars: list[dict[str, Any]], target_timeframe: str
) -> list[dict[str, Any]]:
    """Aggregate finer OHLCV bars into *target_timeframe* bars.

    Buckets start at multiples of the target period since the Unix epoch (see
    ``align_bar_open_ms``). Each bucket takes:

    - open: the open of its earliest bar;
    - high: the max high;
    - low: the min low;
    - close: the close of its latest bar;
    - volume: the sum of volumes (a missing or falsy volume counts as 0).

    Rows that lack a required key or hold non-numeric values are skipped.
    Valid rows are stably sorted by ``open_time_ms`` before bucketing, so
    open/close stay chronologically correct even when *bars* is not
    ascending. For ascending input the result is the same as a single pass.

    Returns new dicts ordered by bucket open time; *bars* is not modified.
    """
    period = TIMEFRAME_MS[normalize_chart_timeframe(target_timeframe)]
    rows: list[tuple[int, float, float, float, float, float]] = []
    for b in bars or []:
        try:
            rows.append(
                (
                    int(b["open_time_ms"]),
                    float(b["open"]),
                    float(b["high"]),
                    float(b["low"]),
                    float(b["close"]),
                    float(b.get("volume") or 0),
                )
            )
        except (KeyError, TypeError, ValueError):
            continue
    # Stable sort: open = first bar, close = last bar of each bucket in time order.
    rows.sort(key=lambda row: row[0])
    buckets: dict[int, dict[str, Any]] = {}
    for ts, o, h, lo, c, v in rows:
        key = align_bar_open_ms(ts, period)
        cur = buckets.get(key)
        if cur is None:
            buckets[key] = {
                "open_time_ms": key,
                "open": o,
                "high": h,
                "low": lo,
                "close": c,
                "volume": v,
            }
            continue
        cur["high"] = max(cur["high"], h)
        cur["low"] = min(cur["low"], lo)
        cur["close"] = c
        cur["volume"] += v
    return [buckets[k] for k in sorted(buckets)]
def _next_since_from_batch(batch: list, period_ms: int) -> int:
    """Compute the ``since`` (ms) for the page that follows *batch*.

    *batch* is a non-empty list of raw ccxt OHLCV rows whose first element is
    the open timestamp. It is presumably in ascending time order — TODO
    confirm for every exchange.

    The next page starts one step after the last row. The step is the
    spacing of the last two rows when positive, but never more than
    *period_ms*. The cap matters when the batch ends right after a gap
    (missing bars): stepping by the whole gap would skip the first bar after
    it. A smaller observed spacing (an exchange returning finer bars than
    requested) is still followed. With no usable spacing the step is
    *period_ms*.
    """
    last_ts = int(batch[-1][0])
    step = period_ms
    if len(batch) >= 2:
        observed = last_ts - int(batch[-2][0])
        if observed > 0:
            step = min(observed, period_ms)
    return last_ts + step
def _paginate_fetch_ohlcv(
    exchange,
    ex_sym: str,
    timeframe: str,
    *,
    want: int,
    since_ms: int | None,
    period_ms: int,
    chunk_max: int = 300,
) -> list[dict[str, Any]]:
    """Fetch up to *want* OHLCV bars by paging ``exchange.fetch_ohlcv`` forward.

    Paging starts at *since_ms* when it is positive, otherwise ``want * period_ms``
    before now. Each call asks for at most *chunk_max* rows, and at most 80
    calls are made.

    Paging stops when any of these holds:

    - *want* distinct bars have been collected;
    - the exchange returns an empty page;
    - the next page would start at or after now;
    - a page makes no forward progress.

    If a later page fails with an error that looks like a range rejection
    (the message contains both "from" and "to", or "invalid request
    parameter"), the bars collected so far are returned. Any other error, or
    any error before a usable bar has been collected, is re-raised.

    Returns bar dicts (see ``_bars_to_dicts``) de-duplicated by open time,
    ascending, and truncated to the newest *want*.
    """
    tf = normalize_chart_timeframe(timeframe)
    now_ms = int(time.time() * 1000)
    if since_ms is not None and int(since_ms) > 0:
        since = int(since_ms)
    else:
        since = max(0, now_ms - want * period_ms)
    # Keyed by open time, so overlapping pages neither duplicate bars nor
    # count twice toward *want* (last write wins).
    by_open: dict[int, dict[str, Any]] = {}
    for _ in range(80):
        if len(by_open) >= want or since >= now_ms:
            break
        req_limit = min(chunk_max, want - len(by_open))
        try:
            batch = exchange.fetch_ohlcv(
                ex_sym, timeframe=tf, since=since, limit=req_limit
            )
        except Exception as e:
            err = str(e).lower()
            range_rejected = ("from" in err and "to" in err) or "invalid request parameter" in err
            if by_open and range_rejected:
                break
            raise
        if not batch:
            break
        for bar in _bars_to_dicts(batch):
            by_open[int(bar["open_time_ms"])] = bar
        next_since = _next_since_from_batch(batch, period_ms)
        # Stop at once when the page did not move forward (e.g. the exchange
        # ignored ``since``), instead of re-requesting the same page.
        if next_since >= now_ms or next_since <= since:
            break
        since = next_since
    merged = [by_open[k] for k in sorted(by_open)]
    return merged[-want:] if len(merged) > want else merged
def _bars_to_dicts(ohlcv: list) -> list[dict[str, Any]]:
    """Convert raw ccxt OHLCV rows ``[ts, open, high, low, close, volume]`` to dicts.

    Rows that are empty, have fewer than 6 fields, or have a non-numeric
    timestamp/price/volume are skipped.

    A None volume is stored as 0.0 rather than dropping the candle. ccxt
    parsers may leave an absent field as None, and the price data is still
    usable. ``aggregate_ohlcv_bars`` also treats a missing volume as 0.
    """
    out: list[dict[str, Any]] = []
    for bar in ohlcv or []:
        if not bar or len(bar) < 6:
            continue
        try:
            out.append(
                {
                    "open_time_ms": int(bar[0]),
                    "open": float(bar[1]),
                    "high": float(bar[2]),
                    "low": float(bar[3]),
                    "close": float(bar[4]),
                    "volume": 0.0 if bar[5] is None else float(bar[5]),
                }
            )
        except (TypeError, ValueError):
            continue
    return out
def fetch_ohlcv_for_hub(
    *,
    symbol: str,
    timeframe: str,
    since_ms: int | None = None,
    limit: int = 500,
    normalize_symbol_input: Callable[[Any], str],
    normalize_exchange_symbol: Callable[[str], str],
    ensure_markets_loaded: Callable[[], None],
    exchange,
    friendly_error: Callable[[Exception], str] | None = None,
) -> dict[str, Any]:
    """Fetch OHLCV bars via ccxt for the hub ``/api/hub/ohlcv`` endpoint.

    Strategy (the first non-empty result wins):

    1. Page the native *timeframe*; the result is accepted only if the bar
       spacing matches the timeframe.
    2. For timeframes in ``OHLCV_AGGREGATE_FROM`` (12h), fetch the finer
       source timeframe and aggregate locally. The source fetch starts on a
       target-bucket boundary. A leading bucket that begins before the first
       fetched source bar is dropped, because its open/high/low would cover
       only part of the period.
    3. As a best-effort last resort, one unpaginated ``fetch_ohlcv`` of the
       latest bars. Errors are ignored and spacing is not checked.

    *limit* is clamped to 1..1500; a falsy *limit* falls back to
    ``bar_limit_for_timeframe``. Prices are snapped to the market tick.

    On success returns ``{"ok": True, "symbol", "exchange_symbol",
    "timeframe", "price_tick", "bars"}``. On failure returns
    ``{"ok": False, "msg": ...}``. Exceptions are turned into messages via
    *friendly_error* when it is given.
    """
    tf = normalize_chart_timeframe(timeframe)
    sym = normalize_symbol_input(symbol)
    if not sym:
        return {"ok": False, "msg": "symbol 不能为空"}
    try:
        ensure_markets_loaded()
        ex_sym = normalize_exchange_symbol(sym)
        want = max(1, min(int(limit or bar_limit_for_timeframe(tf)), 1500))
        period = TIMEFRAME_MS[tf]
        merged: list[dict[str, Any]] = []
        src_tf = OHLCV_AGGREGATE_FROM.get(tf)

        # 1. Native timeframe, trusted only when the spacing looks right.
        if exchange_supports_timeframe(exchange, tf):
            candidate = _paginate_fetch_ohlcv(
                exchange,
                ex_sym,
                tf,
                want=want,
                since_ms=since_ms,
                period_ms=period,
            )
            if candidate and bars_spacing_matches_timeframe(candidate, tf):
                merged = candidate

        # 2. Rebuild from a finer source timeframe.
        if not merged and src_tf and exchange_supports_timeframe(exchange, src_tf):
            src_period = TIMEFRAME_MS[normalize_chart_timeframe(src_tf)]
            ratio = max(1, int(math.ceil(period / src_period)))
            src_want = min(1500, want * ratio + ratio * 4)
            # Start on a target-bucket boundary so the first bucket is complete.
            src_since = since_ms
            if since_ms is not None and int(since_ms) > 0:
                aligned = align_bar_open_ms(int(since_ms), period)
                if aligned > 0:
                    src_since = aligned
            src_bars = _paginate_fetch_ohlcv(
                exchange,
                ex_sym,
                src_tf,
                want=src_want,
                since_ms=src_since,
                period_ms=src_period,
            )
            if not src_bars or not bars_spacing_matches_timeframe(src_bars, src_tf):
                return {
                    "ok": False,
                    "msg": f"无法获取 {tf} K 线(细周期 {src_tf} 数据异常)",
                }
            # Bucket keys are floored to the period, so only the first bucket
            # can start before the first source bar; drop it when it does.
            first_src_open = int(src_bars[0]["open_time_ms"])
            merged = [
                b
                for b in aggregate_ohlcv_bars(src_bars, tf)
                if int(b["open_time_ms"]) >= first_src_open
            ]
            if len(merged) > want:
                merged = merged[-want:]

        # 3. Best-effort: the latest bars from a single request.
        if not merged:
            try:
                tail = exchange.fetch_ohlcv(
                    ex_sym, timeframe=tf, limit=min(want, 300)
                )
                merged = _bars_to_dicts(tail or [])
                if len(merged) > want:
                    merged = merged[-want:]
            except Exception:
                # Deliberately ignored; an empty result is reported just below.
                pass
        if not merged:
            return {"ok": False, "msg": "交易所未返回 K 线"}

        tick = normalize_price_tick(price_tick_from_market(exchange, ex_sym))
        round_ohlcv_bars_to_tick(merged, tick)
        return {
            "ok": True,
            "symbol": sym,
            "exchange_symbol": ex_sym,
            "timeframe": tf,
            "price_tick": tick,
            "bars": merged,
        }
    except Exception as e:
        msg = friendly_error(e) if friendly_error else str(e)
        return {"ok": False, "msg": f"K线加载失败:{msg}"}