Files
crypto_monitor/hub_ohlcv_lib.py
T
dekun c8ffc764e1 Increase default chart initial bar counts per timeframe.
Load 2000 bars for 1m/5m/15m, 1000 for 1h/2h/4h, and 500 for 1d/1w on first screen instead of 300-bar chunked defaults.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-08 08:23:43 +08:00

679 lines
20 KiB
Python

"""中控行情区:各实例 ccxt OHLCV 拉取(hub_bridge /api/hub/ohlcv 共用)。"""
from __future__ import annotations
import math
import os
import time
from typing import Any, Callable, Optional
CHART_TIMEFRAMES = frozenset(
{
"1m",
"5m",
"15m",
"1h",
"2h",
"4h",
"1d",
"1w",
}
)
CHART_TIMEFRAME_ORDER = (
"1m",
"5m",
"15m",
"1h",
"2h",
"4h",
"1d",
"1w",
)
DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"})
# 入库 / 同步真源(各周期直拉交易所,不做本地聚合)
STORED_TIMEFRAMES = frozenset(CHART_TIMEFRAMES)
PERMANENT_STORED_TIMEFRAMES = frozenset({"1d", "1w"})
YEAR_ROLLING_STORED = frozenset({"5m", "15m", "1h", "2h", "4h"})
# 行情区不做展示周期聚合;保留空映射供兼容读取
CHART_DISPLAY_AGGREGATE_FROM: dict[str, str] = {}
SMALL_DISPLAY_TFS = frozenset({"1m", "5m", "15m"})
MID_DISPLAY_TFS = frozenset({"1h", "2h", "4h"})
HUB_KLINE_1M_MAX_BARS = max(1000, int(os.getenv("HUB_KLINE_1M_MAX_BARS", "10000")))
HUB_KLINE_5M_1H_RETENTION_DAYS = max(30, int(os.getenv("HUB_KLINE_5M_1H_RETENTION_DAYS", "365")))
HUB_KLINE_SEED_BARS = max(100, int(os.getenv("HUB_KLINE_SEED_BARS", "500")))
# 交易所无原生周期时的远程拉取 fallback(行情区当前无映射)
OHLCV_AGGREGATE_FROM: dict[str, str] = {}
TIMEFRAME_MS: dict[str, int] = {
"1m": 60_000,
"5m": 5 * 60_000,
"15m": 15 * 60_000,
"1h": 60 * 60_000,
"2h": 2 * 60 * 60_000,
"4h": 4 * 60 * 60_000,
"12h": 12 * 60 * 60_000,
"1d": 24 * 60 * 60_000,
"1w": 7 * 24 * 60 * 60_000,
}
def normalize_chart_timeframe(raw: str | None, default: str = "5m") -> str:
tf = (raw or default).strip().lower()
return tf if tf in CHART_TIMEFRAMES else default
def sync_timeframe_for_display(timeframe: str) -> str:
"""展示周期对应的入库 / 同步周期。"""
tf = normalize_chart_timeframe(timeframe)
return CHART_DISPLAY_AGGREGATE_FROM.get(tf, tf)
def aggregation_source_for_display(timeframe: str) -> str | None:
tf = normalize_chart_timeframe(timeframe)
return CHART_DISPLAY_AGGREGATE_FROM.get(tf)
def aggregate_ratio(display_tf: str, source_tf: str) -> int:
d = normalize_chart_timeframe(display_tf)
s = normalize_chart_timeframe(source_tf)
return max(1, int(TIMEFRAME_MS[d] // TIMEFRAME_MS[s]))
def chart_initial_limit(timeframe: str) -> int:
tf = normalize_chart_timeframe(timeframe)
if tf in SMALL_DISPLAY_TFS:
return 2000
if tf in MID_DISPLAY_TFS:
return 1000
if tf in DAILY_PLUS_TIMEFRAMES:
return 500
return 500
def chart_chunk_limit(timeframe: str) -> int:
tf = normalize_chart_timeframe(timeframe)
if tf in SMALL_DISPLAY_TFS:
return 500
if tf == "1w":
return 150
if tf in MID_DISPLAY_TFS:
return 300
return 200
def chart_memory_cap(timeframe: str) -> int:
tf = normalize_chart_timeframe(timeframe)
if tf in SMALL_DISPLAY_TFS:
return 5000
if tf == "1w":
return 500
return 1000
def bar_limit_for_timeframe(timeframe: str) -> int:
return chart_memory_cap(timeframe)
def storage_retention_days(storage_tf: str) -> int | None:
"""None 表示不按天截断(1m 按根数;1d/1w 永久)。"""
tf = normalize_chart_timeframe(storage_tf)
if tf in YEAR_ROLLING_STORED:
return HUB_KLINE_5M_1H_RETENTION_DAYS
return None
def history_cutoff_ms_for_storage(storage_tf: str, now_ms: int | None = None) -> int:
days = storage_retention_days(storage_tf)
if days is None:
return 0
now = int(now_ms if now_ms is not None else time.time() * 1000)
return max(0, now - int(days) * 86400000)
def seed_bar_target(storage_tf: str) -> int:
tf = normalize_chart_timeframe(storage_tf)
if tf == "1m":
return HUB_KLINE_1M_MAX_BARS
if tf in YEAR_ROLLING_STORED:
period = TIMEFRAME_MS[tf]
return min(
int(86400000 * HUB_KLINE_5M_1H_RETENTION_DAYS / period) + 20,
150000,
)
return HUB_KLINE_SEED_BARS
def retention_policy_meta() -> dict[str, Any]:
year = {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS}
return {
"1m": {"mode": "bars", "max_bars": HUB_KLINE_1M_MAX_BARS},
"5m": dict(year),
"15m": dict(year),
"1h": dict(year),
"2h": dict(year),
"4h": dict(year),
"1d": {"mode": "permanent"},
"1w": {"mode": "permanent"},
"aggregate_from": {},
}
def last_closed_bar_open_ms(timeframe: str, now_ms: int | None = None) -> int:
"""上一根已收盘 K 的 open_time(毫秒 UTC)。"""
tf = normalize_chart_timeframe(timeframe)
period = TIMEFRAME_MS[tf]
now = int(now_ms if now_ms is not None else time.time() * 1000)
current_open = (now // period) * period
return int(current_open - period)
def window_start_ms(timeframe: str, need: int, retention_days: int, now_ms: int | None = None) -> int:
"""本地库清理/读库窗口:不超过 retention_days。"""
now = int(now_ms if now_ms is not None else time.time() * 1000)
period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
retention_cutoff = now - max(1, int(retention_days)) * 86400000
want = now - max(1, int(need)) * period
return max(retention_cutoff, want)
def chart_fetch_start_ms(timeframe: str, need: int, now_ms: int | None = None) -> int:
"""行情展示拉取起点:按 need 根回看(日线 500 / 日内 1000),不受 DB 保留天数限制。"""
now = int(now_ms if now_ms is not None else time.time() * 1000)
period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
return max(0, now - max(1, int(need)) * period)
def _positive_float(value: Any) -> Optional[float]:
if value in (None, ""):
return None
try:
v = float(value)
except (TypeError, ValueError):
return None
return v if v > 0 else None
def _price_tick_from_market_info(info: dict) -> Optional[float]:
"""从 market.info 解析 tick(含币安 PRICE_FILTER.filters)。"""
for key in ("tickSize", "tickSz", "price_increment", "order_price_round", "quote_increment"):
v = _positive_float(info.get(key))
if v is not None:
return v
for key in ("pricePrecision", "price_precision"):
raw = info.get(key)
if raw in (None, ""):
continue
try:
p = float(raw)
except (TypeError, ValueError):
continue
if p >= 1 and abs(p - round(p)) < 1e-9 and p <= 12:
return 10 ** (-int(p))
if 0 < p < 1:
return p
filters = info.get("filters")
if isinstance(filters, list):
for f in filters:
if not isinstance(f, dict):
continue
if str(f.get("filterType") or "").upper() != "PRICE_FILTER":
continue
v = _positive_float(f.get("tickSize"))
if v is not None:
return v
return None
def round_price_to_tick(value: Any, tick: Optional[float]) -> Optional[float]:
"""按交易所 tick 对齐价格(K 线/标记线与坐标轴一致)。"""
t = normalize_price_tick(tick)
if t is None:
return None
try:
v = float(value)
except (TypeError, ValueError):
return None
n = round(v / t) * t
d = _decimals_from_tick(t)
return float(f"{n:.{d}f}")
def round_ohlcv_bars_to_tick(bars: list[dict[str, Any]], tick: Optional[float]) -> None:
t = normalize_price_tick(tick)
if t is None:
return
for b in bars:
for key in ("open", "high", "low", "close"):
if key in b:
rounded = round_price_to_tick(b.get(key), t)
if rounded is not None:
b[key] = rounded
def price_tick_from_market(exchange, exchange_symbol: str) -> Optional[float]:
"""最小价格变动单位(与交易所 tick / price_to_precision 一致)。"""
try:
if not getattr(exchange, "markets", None):
exchange.load_markets()
market = exchange.market(exchange_symbol)
except Exception:
return None
info = market.get("info") or {}
if isinstance(info, dict):
tick = _price_tick_from_market_info(info)
if tick is not None:
return tick
limits = market.get("limits") or {}
price_limits = limits.get("price") or {}
if price_limits.get("min") not in (None, ""):
try:
v = float(price_limits["min"])
if v > 0:
return v
except (TypeError, ValueError):
pass
try:
sample = exchange.price_to_precision(exchange_symbol, 12345.678901234)
s = str(sample).strip()
if "." in s:
frac = s.split(".", 1)[1]
if frac:
return 10 ** (-len(frac))
return 1.0
except Exception:
pass
prec = (market.get("precision") or {}).get("price")
if prec is not None:
try:
p = float(prec)
if p >= 1 and abs(p - round(p)) < 1e-9 and p <= 12:
return 10 ** (-int(p))
if 0 < p < 1:
return p
except (TypeError, ValueError):
pass
return None
def normalize_price_tick(tick: Optional[float]) -> Optional[float]:
"""将 tick 对齐为 10^-n,避免浮点噪声导致前端 lightweight-charts unexpected base。"""
if tick is None:
return None
try:
t = float(tick)
except (TypeError, ValueError):
return None
if t <= 0:
return None
if t >= 1:
return t
try:
exp = int(round(-math.log10(t)))
except (ValueError, OverflowError):
return None
exp = max(0, min(12, exp))
return 10 ** (-exp)
def _decimals_from_tick(tick: float) -> int:
if tick >= 1:
return 0
s = f"{tick:.12f}".rstrip("0")
if "." in s:
frac = s.split(".", 1)[1]
if frac:
return min(12, len(frac))
return max(0, min(12, int(round(-math.log10(tick)))))
def format_price_by_tick(value: Any, tick: Optional[float]) -> str:
if value in (None, ""):
return "-"
try:
v = float(value)
except (TypeError, ValueError):
return str(value)
if v == 0:
return "0"
if tick and tick > 0:
return f"{v:.{_decimals_from_tick(float(tick))}f}"
av = abs(v)
if av >= 10000:
d = 2
elif av >= 100:
d = 3
elif av >= 1:
d = 4
elif av >= 0.01:
d = 6
else:
d = 8
text = f"{v:.{d}f}"
return text.rstrip("0").rstrip(".") if "." in text else text
def exchange_supports_timeframe(exchange, timeframe: str) -> bool:
tf = normalize_chart_timeframe(timeframe)
tfs = getattr(exchange, "timeframes", None) or {}
if not tfs:
return True
return tf in tfs
def _median_bar_step_ms(bars: list[dict[str, Any]]) -> Optional[int]:
if len(bars) < 2:
return None
steps: list[int] = []
for i in range(1, min(len(bars), 64)):
step = int(bars[i]["open_time_ms"]) - int(bars[i - 1]["open_time_ms"])
if step > 0:
steps.append(step)
if not steps:
return None
steps.sort()
return steps[len(steps) // 2]
def bars_spacing_matches_timeframe(
bars: list[dict[str, Any]], timeframe: str, *, tolerance: float = 0.08
) -> bool:
if len(bars) < 2:
return True
period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
step = _median_bar_step_ms(bars)
if step is None:
return False
return abs(step - period) <= period * tolerance
def align_bar_open_ms(open_time_ms: int, period_ms: int) -> int:
return (int(open_time_ms) // period_ms) * period_ms
def snap_to_bar_grid(ts_ms: int, origin_ms: int, step_ms: int) -> int:
step = max(1, int(step_ms))
origin = int(origin_ms)
if ts_ms <= origin:
return origin
idx = (int(ts_ms) - origin + step - 1) // step
return origin + idx * step
def fill_missing_ohlcv_bars(
bars: list[dict[str, Any]],
period_ms: int,
start_ms: int | None = None,
end_ms: int | None = None,
) -> list[dict[str, Any]]:
"""细周期缺口用上一根收盘价填平,保证聚合后 K 线时间轴连续。"""
by_ts: dict[int, dict[str, Any]] = {}
for b in bars or []:
try:
by_ts[int(b["open_time_ms"])] = b
except (KeyError, TypeError, ValueError):
continue
if not by_ts:
return []
keys = sorted(by_ts.keys())
step_ms = max(1, int(period_ms))
origin = keys[0]
aligned_start = snap_to_bar_grid(
int(start_ms if start_ms is not None else keys[0]), origin, step_ms
)
aligned_end = max(
int(end_ms if end_ms is not None else keys[-1]),
keys[-1],
)
out: list[dict[str, Any]] = []
last: dict[str, Any] | None = None
for ts_key in keys:
if ts_key <= aligned_start:
last = by_ts[ts_key]
ts = aligned_start
while ts <= aligned_end:
cur = by_ts.get(ts)
if cur is not None:
last = cur
out.append(cur)
elif last is not None:
c = float(last["close"])
out.append(
{
"open_time_ms": ts,
"open": c,
"high": c,
"low": c,
"close": c,
"volume": 0.0,
"filled": True,
}
)
ts += step_ms
return out
def aggregate_ohlcv_bars(
bars: list[dict[str, Any]], target_timeframe: str
) -> list[dict[str, Any]]:
"""将细周期 OHLCV 聚合为目标周期(UTC 对齐 bucket)。"""
tf = normalize_chart_timeframe(target_timeframe)
period = TIMEFRAME_MS[tf]
buckets: dict[int, dict[str, Any]] = {}
for b in bars or []:
try:
key = align_bar_open_ms(int(b["open_time_ms"]), period)
o = float(b["open"])
h = float(b["high"])
l = float(b["low"])
c = float(b["close"])
v = float(b.get("volume") or 0)
except (KeyError, TypeError, ValueError):
continue
cur = buckets.get(key)
if cur is None:
buckets[key] = {
"open_time_ms": key,
"open": o,
"high": h,
"low": l,
"close": c,
"volume": v,
}
continue
cur["high"] = max(float(cur["high"]), h)
cur["low"] = min(float(cur["low"]), l)
cur["close"] = c
cur["volume"] = float(cur.get("volume") or 0) + v
return [buckets[k] for k in sorted(buckets.keys())]
def _next_since_from_batch(batch: list, period_ms: int) -> int:
last_ts = int(batch[-1][0])
if len(batch) >= 2:
step = int(batch[-1][0]) - int(batch[-2][0])
if step > 0:
return last_ts + step
return last_ts + period_ms
def _paginate_fetch_ohlcv(
exchange,
ex_sym: str,
timeframe: str,
*,
want: int,
since_ms: int | None,
period_ms: int,
chunk_max: int = 300,
) -> list[dict[str, Any]]:
tf = normalize_chart_timeframe(timeframe)
collected: list = []
if since_ms is not None and int(since_ms) > 0:
since = int(since_ms)
else:
since = max(0, int(time.time() * 1000) - want * period_ms)
now_ms = int(time.time() * 1000)
guard = 0
prev_since = None
while len(collected) < want and guard < 80:
guard += 1
if since >= now_ms:
break
req_limit = min(chunk_max, want - len(collected))
try:
batch = exchange.fetch_ohlcv(
ex_sym, timeframe=tf, since=since, limit=req_limit
)
except Exception as e:
err = str(e).lower()
if collected and (
"from" in err
and "to" in err
or "invalid request parameter" in err
):
break
raise
if not batch:
break
collected.extend(batch)
next_since = _next_since_from_batch(batch, period_ms)
if next_since >= now_ms:
break
if prev_since is not None and next_since <= prev_since:
break
prev_since = since
since = next_since
bars = _bars_to_dicts(collected)
uniq: dict[int, dict[str, Any]] = {}
for b in bars:
uniq[int(b["open_time_ms"])] = b
merged = [uniq[k] for k in sorted(uniq.keys())]
if len(merged) > want:
merged = merged[-want:]
return merged
def _bars_to_dicts(ohlcv: list) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for bar in ohlcv or []:
if not bar or len(bar) < 6:
continue
try:
out.append(
{
"open_time_ms": int(bar[0]),
"open": float(bar[1]),
"high": float(bar[2]),
"low": float(bar[3]),
"close": float(bar[4]),
"volume": float(bar[5]),
}
)
except (TypeError, ValueError):
continue
return out
def fetch_ohlcv_for_hub(
*,
symbol: str,
timeframe: str,
since_ms: int | None = None,
limit: int = 500,
normalize_symbol_input: Callable[[Any], str],
normalize_exchange_symbol: Callable[[str], str],
ensure_markets_loaded: Callable[[], None],
exchange,
friendly_error: Callable[[Exception], str] | None = None,
) -> dict[str, Any]:
"""从 ccxt 拉 OHLCV,供 hub_bridge /api/hub/ohlcv 返回。"""
tf = normalize_chart_timeframe(timeframe)
sym = normalize_symbol_input(symbol)
if not sym:
return {"ok": False, "msg": "symbol 不能为空"}
try:
ensure_markets_loaded()
ex_sym = normalize_exchange_symbol(sym)
want = max(1, min(int(limit or bar_limit_for_timeframe(tf)), 1500))
period = TIMEFRAME_MS[tf]
merged: list[dict[str, Any]] = []
src_tf = OHLCV_AGGREGATE_FROM.get(tf)
if exchange_supports_timeframe(exchange, tf):
candidate = _paginate_fetch_ohlcv(
exchange,
ex_sym,
tf,
want=want,
since_ms=since_ms,
period_ms=period,
)
if candidate and bars_spacing_matches_timeframe(candidate, tf):
merged = candidate
if (
not merged
and src_tf
and exchange_supports_timeframe(exchange, src_tf)
):
src_period = TIMEFRAME_MS[normalize_chart_timeframe(src_tf)]
ratio = max(1, int(math.ceil(period / src_period)))
src_want = min(1500, want * ratio + ratio * 4)
src_bars = _paginate_fetch_ohlcv(
exchange,
ex_sym,
src_tf,
want=src_want,
since_ms=since_ms,
period_ms=src_period,
)
if not src_bars or not bars_spacing_matches_timeframe(src_bars, src_tf):
return {
"ok": False,
"msg": f"无法获取 {tf} K 线(细周期 {src_tf} 数据异常)",
}
merged = aggregate_ohlcv_bars(src_bars, tf)
if len(merged) > want:
merged = merged[-want:]
if not merged:
try:
tail = exchange.fetch_ohlcv(
ex_sym, timeframe=tf, limit=min(want, 300)
)
merged = _bars_to_dicts(tail or [])
if len(merged) > want:
merged = merged[-want:]
except Exception:
pass
if not merged:
return {"ok": False, "msg": "交易所未返回 K 线"}
tick = normalize_price_tick(price_tick_from_market(exchange, ex_sym))
round_ohlcv_bars_to_tick(merged, tick)
return {
"ok": True,
"symbol": sym,
"exchange_symbol": ex_sym,
"timeframe": tf,
"price_tick": tick,
"bars": merged,
}
except Exception as e:
msg = friendly_error(e) if friendly_error else str(e)
return {"ok": False, "msg": f"K线加载失败:{msg}"}