Files
crypto_monitor/hub_ohlcv_lib.py
T

198 lines
6.4 KiB
Python

"""中控行情区:各实例 ccxt OHLCV 拉取(hub_bridge /api/hub/ohlcv 共用)。"""
from __future__ import annotations
import math
import time
from typing import Any, Callable, Optional
CHART_TIMEFRAMES = frozenset({"1m", "5m", "15m", "1h", "4h", "1d", "1w"})
DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"})
TIMEFRAME_MS: dict[str, int] = {
"1m": 60_000,
"5m": 5 * 60_000,
"15m": 15 * 60_000,
"1h": 60 * 60_000,
"4h": 4 * 60 * 60_000,
"1d": 24 * 60 * 60_000,
"1w": 7 * 24 * 60 * 60_000,
}
def normalize_chart_timeframe(raw: str | None, default: str = "5m") -> str:
tf = (raw or default).strip().lower()
return tf if tf in CHART_TIMEFRAMES else default
def bar_limit_for_timeframe(timeframe: str) -> int:
tf = normalize_chart_timeframe(timeframe)
return 500 if tf in DAILY_PLUS_TIMEFRAMES else 1000
def last_closed_bar_open_ms(timeframe: str, now_ms: int | None = None) -> int:
"""上一根已收盘 K 的 open_time(毫秒 UTC)。"""
tf = normalize_chart_timeframe(timeframe)
period = TIMEFRAME_MS[tf]
now = int(now_ms if now_ms is not None else time.time() * 1000)
current_open = (now // period) * period
return int(current_open - period)
def window_start_ms(timeframe: str, need: int, retention_days: int, now_ms: int | None = None) -> int:
"""本地库清理/读库窗口:不超过 retention_days。"""
now = int(now_ms if now_ms is not None else time.time() * 1000)
period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
retention_cutoff = now - max(1, int(retention_days)) * 86400000
want = now - max(1, int(need)) * period
return max(retention_cutoff, want)
def chart_fetch_start_ms(timeframe: str, need: int, now_ms: int | None = None) -> int:
"""行情展示拉取起点:按 need 根回看(日线 500 / 日内 1000),不受 DB 保留天数限制。"""
now = int(now_ms if now_ms is not None else time.time() * 1000)
period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
return max(0, now - max(1, int(need)) * period)
def price_tick_from_market(exchange, exchange_symbol: str) -> Optional[float]:
try:
markets = getattr(exchange, "markets", None) or {}
m = markets.get(exchange_symbol) or {}
prec = m.get("precision") or {}
p = prec.get("price")
if p is not None:
p = float(p)
if p > 0:
return p
info = m.get("info") or {}
for key in ("tickSize", "price_increment", "order_price_round"):
if info.get(key) not in (None, ""):
try:
v = float(info[key])
if v > 0:
return v
except (TypeError, ValueError):
pass
except Exception:
pass
return None
def format_price_by_tick(value: Any, tick: Optional[float]) -> str:
if value in (None, ""):
return "-"
try:
v = float(value)
except (TypeError, ValueError):
return str(value)
if v == 0:
return "0"
if tick and tick > 0:
decimals = max(0, min(12, int(round(-math.log10(tick))) if tick < 1 else 0))
if tick >= 1:
decimals = 0
text = f"{v:.{decimals}f}"
return text.rstrip("0").rstrip(".") if "." in text else text
av = abs(v)
if av >= 10000:
d = 2
elif av >= 100:
d = 3
elif av >= 1:
d = 4
elif av >= 0.01:
d = 6
else:
d = 8
text = f"{v:.{d}f}"
return text.rstrip("0").rstrip(".") if "." in text else text
def _bars_to_dicts(ohlcv: list) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for bar in ohlcv or []:
if not bar or len(bar) < 6:
continue
try:
out.append(
{
"open_time_ms": int(bar[0]),
"open": float(bar[1]),
"high": float(bar[2]),
"low": float(bar[3]),
"close": float(bar[4]),
"volume": float(bar[5]),
}
)
except (TypeError, ValueError):
continue
return out
def fetch_ohlcv_for_hub(
*,
symbol: str,
timeframe: str,
since_ms: int | None = None,
limit: int = 500,
normalize_symbol_input: Callable[[Any], str],
normalize_exchange_symbol: Callable[[str], str],
ensure_markets_loaded: Callable[[], None],
exchange,
friendly_error: Callable[[Exception], str] | None = None,
) -> dict[str, Any]:
"""从 ccxt 拉 OHLCV,供 hub_bridge /api/hub/ohlcv 返回。"""
tf = normalize_chart_timeframe(timeframe)
sym = normalize_symbol_input(symbol)
if not sym:
return {"ok": False, "msg": "symbol 不能为空"}
try:
ensure_markets_loaded()
ex_sym = normalize_exchange_symbol(sym)
want = max(1, min(int(limit or bar_limit_for_timeframe(tf)), 1500))
chunk_max = 300
collected: list = []
if since_ms is not None and int(since_ms) > 0:
since = int(since_ms)
guard = 0
while len(collected) < want and guard < 20:
guard += 1
batch = exchange.fetch_ohlcv(
ex_sym, timeframe=tf, since=since, limit=min(chunk_max, want - len(collected))
)
if not batch:
break
collected.extend(batch)
since = int(batch[-1][0]) + 1
if len(batch) < min(chunk_max, want - len(collected)):
break
else:
batch = exchange.fetch_ohlcv(ex_sym, timeframe=tf, limit=want)
collected = list(batch or [])
bars = _bars_to_dicts(collected)
if not bars:
return {"ok": False, "msg": "交易所未返回 K 线"}
tick = price_tick_from_market(exchange, ex_sym)
uniq: dict[int, dict] = {}
for b in bars:
uniq[int(b["open_time_ms"])] = b
merged = [uniq[k] for k in sorted(uniq.keys())]
if len(merged) > want:
merged = merged[-want:]
return {
"ok": True,
"symbol": sym,
"exchange_symbol": ex_sym,
"timeframe": tf,
"price_tick": tick,
"bars": merged,
}
except Exception as e:
msg = friendly_error(e) if friendly_error else str(e)
return {"ok": False, "msg": f"K线加载失败:{msg}"}