"""Hub market-data area: ccxt OHLCV fetching for each instance.

Shared by the hub_bridge ``/api/hub/ohlcv`` endpoint.
"""

from __future__ import annotations

import math
import time
from typing import Any, Callable, Optional

CHART_TIMEFRAMES = frozenset(
    {
        "1m",
        "5m",
        "15m",
        "30m",
        "1h",
        "2h",
        "4h",
        "12h",
        "1d",
        "1w",
    }
)

CHART_TIMEFRAME_ORDER = (
    "1m",
    "5m",
    "15m",
    "30m",
    "1h",
    "2h",
    "4h",
    "12h",
    "1d",
    "1w",
)

DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"})

# Some exchanges have no native 12h timeframe in ccxt, or return native bars
# with abnormal spacing; in those cases 12h is aggregated from 1h bars.
OHLCV_AGGREGATE_FROM: dict[str, str] = {
    "12h": "1h",
}

TIMEFRAME_MS: dict[str, int] = {
    "1m": 60_000,
    "5m": 5 * 60_000,
    "15m": 15 * 60_000,
    "30m": 30 * 60_000,
    "1h": 60 * 60_000,
    "2h": 2 * 60 * 60_000,
    "4h": 4 * 60 * 60_000,
    "12h": 12 * 60 * 60_000,
    "1d": 24 * 60 * 60_000,
    "1w": 7 * 24 * 60 * 60_000,
}

# Hard cap on bars fetched per hub request (native or source timeframe).
_MAX_BARS_PER_REQUEST = 1500
# Upper bound on paginated fetch_ohlcv calls for one request.
_MAX_FETCH_PAGES = 80


def _resolve_now_ms(now_ms: int | None) -> int:
    """Return ``now_ms`` as an int, or the current wall clock in ms."""
    return int(now_ms if now_ms is not None else time.time() * 1000)


def normalize_chart_timeframe(raw: str | None, default: str = "5m") -> str:
    """Return a supported timeframe key, falling back to ``default``.

    The input is stripped and lower-cased. Anything not in
    ``CHART_TIMEFRAMES`` (including non-string input) yields ``default``.
    """
    # str() keeps a non-string truthy value from raising on .strip().
    tf = str(raw or default).strip().lower()
    return tf if tf in CHART_TIMEFRAMES else default


def bar_limit_for_timeframe(timeframe: str) -> int:
    """Default bar count: 500 for daily/weekly charts, 1000 otherwise."""
    tf = normalize_chart_timeframe(timeframe)
    return 500 if tf in DAILY_PLUS_TIMEFRAMES else 1000


def last_closed_bar_open_ms(timeframe: str, now_ms: int | None = None) -> int:
    """Return the open time (ms, UTC) of the most recently closed bar.

    NOTE(review): bars are aligned to multiples of the period since the Unix
    epoch. For "1w" that boundary is Thursday 00:00 UTC, which may differ
    from an exchange's weekly candle open - confirm before relying on it.
    """
    period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    now = _resolve_now_ms(now_ms)
    current_open = (now // period) * period
    return int(current_open - period)


def window_start_ms(
    timeframe: str, need: int, retention_days: int, now_ms: int | None = None
) -> int:
    """Start of the local-DB cleanup/read window.

    Looks back ``need`` bars, but never further than ``retention_days``.
    Both values are clamped to at least 1.
    """
    now = _resolve_now_ms(now_ms)
    period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    retention_cutoff = now - max(1, int(retention_days)) * 86400000
    want = now - max(1, int(need)) * period
    return max(retention_cutoff, want)


def chart_fetch_start_ms(timeframe: str, need: int, now_ms: int | None = None) -> int:
    """Fetch start for chart display: look back ``need`` bars.

    Intended for 500 daily / 1000 intraday bars. Unlike ``window_start_ms``
    it is not limited by the DB retention days. Never returns a negative
    timestamp.
    """
    now = _resolve_now_ms(now_ms)
    period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    return max(0, now - max(1, int(need)) * period)


def _positive_float(value: Any) -> Optional[float]:
    """Return ``value`` as a float when it parses and is > 0, else None."""
    if value in (None, ""):
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if number > 0 else None


def price_tick_from_market(exchange, exchange_symbol: str) -> Optional[float]:
    """Return the minimum price increment for ``exchange_symbol``.

    Sources are tried in order: raw exchange ``info`` tick fields,
    ``limits.price.min``, the number of decimals produced by
    ``exchange.price_to_precision``, then ``precision.price``.
    Returns None when the market cannot be loaded or no source yields a
    positive value.
    """
    try:
        if not getattr(exchange, "markets", None):
            exchange.load_markets()
        market = exchange.market(exchange_symbol)
    except Exception:
        # Best effort: an unknown symbol or load failure means "no tick".
        return None

    info = market.get("info") or {}
    if isinstance(info, dict):
        for key in (
            "tickSize",
            "tickSz",
            "price_increment",
            "order_price_round",
            "quote_increment",
        ):
            tick = _positive_float(info.get(key))
            if tick is not None:
                return tick

    limits = market.get("limits") or {}
    price_limits = limits.get("price") or {}
    tick = _positive_float(price_limits.get("min"))
    if tick is not None:
        return tick

    try:
        # Count the decimals the exchange keeps when rounding a sample price.
        sample = exchange.price_to_precision(exchange_symbol, 12345.678901234)
        text = str(sample).strip()
        if "." in text:
            frac = text.split(".", 1)[1]
            if frac:
                return 10 ** (-len(frac))
        return 1.0
    except Exception:
        # Fall through to the declared precision below.
        pass

    prec = (market.get("precision") or {}).get("price")
    if prec is not None:
        try:
            p = float(prec)
        except (TypeError, ValueError):
            return None
        # Range check first so round() never sees inf.
        if 1 <= p <= 12 and abs(p - round(p)) < 1e-9:
            # Integer precision is treated as a number of decimal places.
            return 10 ** (-int(p))
        if 0 < p < 1:
            # Fractional precision is already a tick size.
            return p
    return None


def _decimals_from_tick(tick: float) -> int:
    """Return how many decimals (0-12) are needed to display ``tick``."""
    if tick >= 1:
        return 0
    text = f"{tick:.12f}".rstrip("0")
    if "." in text:
        frac = text.split(".", 1)[1]
        if frac:
            return min(12, len(frac))
    # Tick is below 1e-12 after formatting; derive decimals from magnitude.
    return max(0, min(12, int(round(-math.log10(tick)))))


def format_price_by_tick(value: Any, tick: Optional[float]) -> str:
    """Format a price for display.

    Returns "-" for None/empty input, ``str(value)`` when it is not numeric
    and "0" for zero. With a positive ``tick`` the price is rendered with the
    tick's decimal count. Otherwise the decimal count is chosen from the
    price magnitude and trailing zeros are stripped.
    """
    if value in (None, ""):
        return "-"
    try:
        v = float(value)
    except (TypeError, ValueError):
        return str(value)
    if v == 0:
        return "0"
    if tick and tick > 0:
        return f"{v:.{_decimals_from_tick(float(tick))}f}"

    magnitude = abs(v)
    if magnitude >= 10000:
        decimals = 2
    elif magnitude >= 100:
        decimals = 3
    elif magnitude >= 1:
        decimals = 4
    elif magnitude >= 0.01:
        decimals = 6
    else:
        decimals = 8
    text = f"{v:.{decimals}f}"
    return text.rstrip("0").rstrip(".") if "." in text else text


def exchange_supports_timeframe(exchange, timeframe: str) -> bool:
    """Return True if ``exchange.timeframes`` lists the timeframe.

    An exchange with no (or an empty) ``timeframes`` attribute is assumed to
    support everything.
    """
    tf = normalize_chart_timeframe(timeframe)
    tfs = getattr(exchange, "timeframes", None) or {}
    if not tfs:
        return True
    return tf in tfs


def _median_bar_step_ms(bars: list[dict[str, Any]]) -> Optional[int]:
    """Median positive spacing (ms) between the first 64 bars, or None."""
    if len(bars) < 2:
        return None
    steps: list[int] = []
    for i in range(1, min(len(bars), 64)):
        step = int(bars[i]["open_time_ms"]) - int(bars[i - 1]["open_time_ms"])
        if step > 0:
            steps.append(step)
    if not steps:
        return None
    steps.sort()
    return steps[len(steps) // 2]


def bars_spacing_matches_timeframe(
    bars: list[dict[str, Any]], timeframe: str, *, tolerance: float = 0.08
) -> bool:
    """Check that the median bar spacing is within ``tolerance`` of the period.

    Fewer than two bars cannot be judged and count as matching.
    """
    if len(bars) < 2:
        return True
    period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    step = _median_bar_step_ms(bars)
    if step is None:
        return False
    return abs(step - period) <= period * tolerance


def align_bar_open_ms(open_time_ms: int, period_ms: int) -> int:
    """Floor ``open_time_ms`` to a multiple of ``period_ms``."""
    return (int(open_time_ms) // period_ms) * period_ms


def aggregate_ohlcv_bars(
    bars: list[dict[str, Any]], target_timeframe: str
) -> list[dict[str, Any]]:
    """Aggregate finer OHLCV bars into ``target_timeframe`` buckets.

    Buckets are aligned to multiples of the target period since the epoch.
    Bars with missing or non-numeric fields are skipped. Input may be in any
    order; it is sorted by open time so each bucket's open comes from its
    earliest bar and its close from its latest. The result is ascending.
    """
    period = TIMEFRAME_MS[normalize_chart_timeframe(target_timeframe)]

    rows: list[tuple[int, float, float, float, float, float]] = []
    for bar in bars or []:
        try:
            rows.append(
                (
                    int(bar["open_time_ms"]),
                    float(bar["open"]),
                    float(bar["high"]),
                    float(bar["low"]),
                    float(bar["close"]),
                    float(bar.get("volume") or 0),
                )
            )
        except (KeyError, TypeError, ValueError):
            continue
    # Open/close depend on chronological order, so do not trust the caller's.
    rows.sort(key=lambda row: row[0])

    buckets: dict[int, dict[str, Any]] = {}
    for ts, open_, high, low, close, volume in rows:
        key = align_bar_open_ms(ts, period)
        cur = buckets.get(key)
        if cur is None:
            buckets[key] = {
                "open_time_ms": key,
                "open": open_,
                "high": high,
                "low": low,
                "close": close,
                "volume": volume,
            }
            continue
        cur["high"] = max(cur["high"], high)
        cur["low"] = min(cur["low"], low)
        cur["close"] = close
        cur["volume"] += volume
    return [buckets[key] for key in sorted(buckets)]


def _next_since_from_batch(batch: list, period_ms: int) -> int:
    """Return the ``since`` cursor for the page after ``batch``.

    The cursor advances by the spacing of the last two bars only when that
    spacing is shorter than the nominal period. It never advances by more
    than one period, so a gap at the end of a page cannot cause the bars
    right after the gap to be skipped.
    """
    last_ts = int(batch[-1][0])
    if len(batch) >= 2:
        step = last_ts - int(batch[-2][0])
        if 0 < step < period_ms:
            return last_ts + step
    return last_ts + period_ms


def _paginate_fetch_ohlcv(
    exchange,
    ex_sym: str,
    timeframe: str,
    *,
    want: int,
    since_ms: int | None,
    period_ms: int,
    chunk_max: int = 300,
) -> list[dict[str, Any]]:
    """Fetch up to ``want`` bars by walking forward from ``since_ms``.

    When ``since_ms`` is None or not positive, the walk starts ``want``
    periods before now. Pages are requested in chunks of at most
    ``chunk_max``. The walk stops on an empty page, when the cursor stops
    advancing, or after ``_MAX_FETCH_PAGES`` requests. Bars are de-duplicated
    by open time, sorted ascending and trimmed to the newest ``want``.
    """
    tf = normalize_chart_timeframe(timeframe)
    if since_ms is not None and int(since_ms) > 0:
        since = int(since_ms)
    else:
        since = max(0, int(time.time() * 1000) - want * period_ms)

    collected: list = []
    for _ in range(_MAX_FETCH_PAGES):
        remaining = want - len(collected)
        if remaining <= 0:
            break
        batch = exchange.fetch_ohlcv(
            ex_sym, timeframe=tf, since=since, limit=min(chunk_max, remaining)
        )
        if not batch:
            break
        collected.extend(batch)
        next_since = _next_since_from_batch(batch, period_ms)
        if next_since <= since:
            # The page did not move past the cursor; asking again would only
            # return the same bars.
            break
        since = next_since

    uniq: dict[int, dict[str, Any]] = {}
    for bar in _bars_to_dicts(collected):
        uniq[int(bar["open_time_ms"])] = bar
    merged = [uniq[key] for key in sorted(uniq)]
    if len(merged) > want:
        merged = merged[-want:]
    return merged


def _bars_to_dicts(ohlcv: list) -> list[dict[str, Any]]:
    """Convert ccxt ``[ts, o, h, l, c, v]`` rows to dicts, skipping bad rows."""
    out: list[dict[str, Any]] = []
    for bar in ohlcv or []:
        if not bar or len(bar) < 6:
            continue
        try:
            out.append(
                {
                    "open_time_ms": int(bar[0]),
                    "open": float(bar[1]),
                    "high": float(bar[2]),
                    "low": float(bar[3]),
                    "close": float(bar[4]),
                    "volume": float(bar[5]),
                }
            )
        except (TypeError, ValueError):
            continue
    return out


def fetch_ohlcv_for_hub(
    *,
    symbol: str,
    timeframe: str,
    since_ms: int | None = None,
    limit: int = 500,
    normalize_symbol_input: Callable[[Any], str],
    normalize_exchange_symbol: Callable[[str], str],
    ensure_markets_loaded: Callable[[], None],
    exchange,
    friendly_error: Callable[[Exception], str] | None = None,
) -> dict[str, Any]:
    """Fetch OHLCV through ccxt for the hub_bridge ``/api/hub/ohlcv`` reply.

    The native timeframe is tried first. If the exchange does not list it,
    or the returned bars have the wrong spacing, and the timeframe has an
    entry in ``OHLCV_AGGREGATE_FROM``, bars are aggregated from the finer
    source timeframe instead.

    Returns ``{"ok": True, "symbol", "exchange_symbol", "timeframe",
    "price_tick", "bars"}`` on success or ``{"ok": False, "msg"}`` on
    failure. Exceptions raised inside the fetch are caught and reported via
    ``msg``, passed through ``friendly_error`` when given.
    """
    tf = normalize_chart_timeframe(timeframe)
    sym = normalize_symbol_input(symbol)
    if not sym:
        return {"ok": False, "msg": "symbol 不能为空"}
    try:
        ensure_markets_loaded()
        ex_sym = normalize_exchange_symbol(sym)
        want = max(
            1, min(int(limit or bar_limit_for_timeframe(tf)), _MAX_BARS_PER_REQUEST)
        )
        period = TIMEFRAME_MS[tf]
        merged: list[dict[str, Any]] = []

        if exchange_supports_timeframe(exchange, tf):
            candidate = _paginate_fetch_ohlcv(
                exchange,
                ex_sym,
                tf,
                want=want,
                since_ms=since_ms,
                period_ms=period,
            )
            if candidate and bars_spacing_matches_timeframe(candidate, tf):
                merged = candidate

        src_tf = OHLCV_AGGREGATE_FROM.get(tf)
        if not merged and src_tf and exchange_supports_timeframe(exchange, src_tf):
            src_period = TIMEFRAME_MS[normalize_chart_timeframe(src_tf)]
            ratio = max(1, int(math.ceil(period / src_period)))
            # A few extra buckets of slack, subject to the per-request cap.
            src_want = min(_MAX_BARS_PER_REQUEST, want * ratio + ratio * 4)
            src_bars = _paginate_fetch_ohlcv(
                exchange,
                ex_sym,
                src_tf,
                want=src_want,
                since_ms=since_ms,
                period_ms=src_period,
            )
            if not src_bars or not bars_spacing_matches_timeframe(src_bars, src_tf):
                return {
                    "ok": False,
                    "msg": f"无法获取 {tf} K 线(细周期 {src_tf} 数据异常)",
                }
            merged = aggregate_ohlcv_bars(src_bars, tf)
            # The fetch start is not aligned to the target period, so the
            # first bucket may be built from only part of its source bars and
            # would carry a wrong open/high/low/volume. Drop it when other
            # buckets remain; a lone bucket is kept so the reply is not empty.
            first_src_open = int(src_bars[0]["open_time_ms"])
            head_gap = first_src_open - align_bar_open_ms(first_src_open, period)
            if len(merged) > 1 and head_gap >= src_period:
                merged = merged[1:]

        if len(merged) > want:
            merged = merged[-want:]
        if not merged:
            return {"ok": False, "msg": "交易所未返回 K 线"}

        tick = price_tick_from_market(exchange, ex_sym)
        return {
            "ok": True,
            "symbol": sym,
            "exchange_symbol": ex_sym,
            "timeframe": tf,
            "price_tick": tick,
            "bars": merged,
        }
    except Exception as e:
        # Endpoint boundary: report the failure instead of raising.
        msg = friendly_error(e) if friendly_error else str(e)
        return {"ok": False, "msg": f"K线加载失败:{msg}"}