"""Hub market-data helpers: per-instance ccxt OHLCV fetching.

Shared by the hub_bridge ``/api/hub/ohlcv`` endpoint.
"""

from __future__ import annotations

import math
import time
from typing import Any, Callable, Optional

# Supported chart timeframes, shortest to longest.
CHART_TIMEFRAME_ORDER = (
    "1m",
    "3m",
    "5m",
    "10m",
    "15m",
    "20m",
    "30m",
    "1h",
    "2h",
    "4h",
    "6h",
    "8h",
    "12h",
    "1d",
    "1w",
)

# Membership set derived from the ordered tuple so the two cannot drift apart.
CHART_TIMEFRAMES = frozenset(CHART_TIMEFRAME_ORDER)

# Timeframes that get the smaller default bar count (see bar_limit_for_timeframe).
DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"})

_MINUTE_MS = 60_000
_HOUR_MS = 60 * _MINUTE_MS
_DAY_MS = 24 * _HOUR_MS

# Bar duration in milliseconds for every supported timeframe.
TIMEFRAME_MS: dict[str, int] = {
    "1m": _MINUTE_MS,
    "3m": 3 * _MINUTE_MS,
    "5m": 5 * _MINUTE_MS,
    "10m": 10 * _MINUTE_MS,
    "15m": 15 * _MINUTE_MS,
    "20m": 20 * _MINUTE_MS,
    "30m": 30 * _MINUTE_MS,
    "1h": _HOUR_MS,
    "2h": 2 * _HOUR_MS,
    "4h": 4 * _HOUR_MS,
    "6h": 6 * _HOUR_MS,
    "8h": 8 * _HOUR_MS,
    "12h": 12 * _HOUR_MS,
    "1d": _DAY_MS,
    "1w": 7 * _DAY_MS,
}

# Pagination settings used by fetch_ohlcv_for_hub.
_MAX_BARS_PER_CALL = 1500  # hard cap on the number of bars returned
_FETCH_CHUNK_MAX = 300  # bars requested from the exchange per round
_MAX_FETCH_ROUNDS = 80  # safety bound on the number of exchange requests


def normalize_chart_timeframe(raw: str | None, default: str = "5m") -> str:
    """Return *raw* as a supported timeframe key, or *default* otherwise.

    The input is stripped and lower-cased before the lookup; ``None`` or an
    empty value selects *default*. Non-string input is coerced with ``str``
    so it falls back to *default* instead of raising.

    NOTE(review): lower-casing maps an upper-case ``"1M"`` to the one-minute
    key ``"1m"``; confirm callers never send ccxt's monthly ``"1M"``.
    """
    tf = str(raw or default).strip().lower()
    return tf if tf in CHART_TIMEFRAMES else default


def bar_limit_for_timeframe(timeframe: str) -> int:
    """Return the default bar count: 500 for daily/weekly, 1000 otherwise."""
    tf = normalize_chart_timeframe(timeframe)
    return 500 if tf in DAILY_PLUS_TIMEFRAMES else 1000


def last_closed_bar_open_ms(timeframe: str, now_ms: int | None = None) -> int:
    """Return the open time (epoch ms) of the most recently closed bar.

    *now_ms* defaults to the current wall-clock time. Bars are bucketed by
    flooring epoch milliseconds to a multiple of the timeframe period.

    NOTE(review): for "1w" this floors to Thursday 00:00 UTC (the weekday of
    the Unix epoch); confirm that matches the exchange's weekly boundary.
    """
    period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    now = int(now_ms if now_ms is not None else time.time() * 1000)
    current_open = (now // period) * period
    return int(current_open - period)


def window_start_ms(
    timeframe: str, need: int, retention_days: int, now_ms: int | None = None
) -> int:
    """Return the start (epoch ms) of the local-DB cleanup/read window.

    The window looks back *need* bars but never further than
    *retention_days* days; both values are clamped to at least 1.
    """
    now = int(now_ms if now_ms is not None else time.time() * 1000)
    period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    retention_cutoff = now - max(1, int(retention_days)) * _DAY_MS
    want = now - max(1, int(need)) * period
    return max(retention_cutoff, want)


def chart_fetch_start_ms(timeframe: str, need: int, now_ms: int | None = None) -> int:
    """Return the fetch start (epoch ms) for chart display.

    Looks back *need* bars (at least 1) from *now_ms*, clamped at 0. Unlike
    ``window_start_ms`` it is not limited by a DB retention period.
    """
    now = int(now_ms if now_ms is not None else time.time() * 1000)
    period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    return max(0, now - max(1, int(need)) * period)


def _positive_float(value: Any) -> Optional[float]:
    """Return *value* as a float when it parses and is > 0, else ``None``."""
    if value in (None, ""):
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return number if number > 0 else None


def price_tick_from_market(exchange: Any, exchange_symbol: str) -> Optional[float]:
    """Return the minimum price increment for *exchange_symbol*, if known.

    Sources are tried in order, first positive value wins:

    1. raw exchange fields in ``market["info"]`` (``tickSize``, ``tickSz``,
       ``price_increment``, ``order_price_round``, ``quote_increment``);
    2. ``market["limits"]["price"]["min"]``;
    3. the number of decimals ``exchange.price_to_precision`` keeps on a
       sample price (``1.0`` when it keeps none);
    4. ``market["precision"]["price"]``, read as a decimal-place count when
       it is an integer in 1..12, or as a tick size when it is in (0, 1).

    Returns ``None`` when the market cannot be loaded or no source applies.
    """
    try:
        if not getattr(exchange, "markets", None):
            exchange.load_markets()
        market = exchange.market(exchange_symbol)
    except Exception:
        # Best effort: an unknown symbol or a failed market load means no tick.
        return None

    info = market.get("info") or {}
    if isinstance(info, dict):
        for key in (
            "tickSize",
            "tickSz",
            "price_increment",
            "order_price_round",
            "quote_increment",
        ):
            tick = _positive_float(info.get(key))
            if tick is not None:
                return tick

    limits = market.get("limits") or {}
    price_limits = limits.get("price") or {}
    tick = _positive_float(price_limits.get("min"))
    if tick is not None:
        return tick

    try:
        sample = exchange.price_to_precision(exchange_symbol, 12345.678901234)
        text = str(sample).strip()
        if "." in text:
            frac = text.split(".", 1)[1]
            if frac:
                return 10 ** (-len(frac))
        return 1.0
    except Exception:
        # Fall through to the declared precision below.
        pass

    prec = (market.get("precision") or {}).get("price")
    if prec is not None:
        try:
            p = float(prec)
        except (TypeError, ValueError):
            return None
        # Range test first so round() is never called on inf/NaN.
        if 1 <= p <= 12 and abs(p - round(p)) < 1e-9:
            return 10 ** (-int(p))
        if 0 < p < 1:
            return p
    return None


def _decimals_from_tick(tick: float) -> int:
    """Return how many decimals (0..12) are needed to display *tick*."""
    if tick >= 1:
        return 0
    text = f"{tick:.12f}".rstrip("0")
    if "." in text:
        frac = text.split(".", 1)[1]
        if frac:
            return min(12, len(frac))
    # Tick is below 1e-12 after formatting; derive the count from its magnitude.
    return max(0, min(12, int(round(-math.log10(tick)))))


def format_price_by_tick(value: Any, tick: Optional[float]) -> str:
    """Format a price for display.

    ``None`` or ``""`` gives ``"-"``; a value that cannot be parsed as a
    float is returned via ``str``; zero gives ``"0"``. With a positive *tick*
    the price is fixed to the tick's decimal count. Without one, the decimal
    count is chosen from the price magnitude and trailing zeros are trimmed.
    """
    if value in (None, ""):
        return "-"
    try:
        v = float(value)
    except (TypeError, ValueError):
        return str(value)
    if v == 0:
        return "0"
    if tick and tick > 0:
        return f"{v:.{_decimals_from_tick(float(tick))}f}"

    av = abs(v)
    if av >= 10000:
        d = 2
    elif av >= 100:
        d = 3
    elif av >= 1:
        d = 4
    elif av >= 0.01:
        d = 6
    else:
        d = 8
    text = f"{v:.{d}f}"
    return text.rstrip("0").rstrip(".") if "." in text else text


def _bars_to_dicts(ohlcv: list) -> list[dict[str, Any]]:
    """Convert raw ``[ts, open, high, low, close, volume]`` rows to dicts.

    Rows that are empty, shorter than six fields, or hold values that cannot
    be converted to numbers are skipped.
    """
    out: list[dict[str, Any]] = []
    for bar in ohlcv or []:
        if not bar or len(bar) < 6:
            continue
        try:
            out.append(
                {
                    "open_time_ms": int(bar[0]),
                    "open": float(bar[1]),
                    "high": float(bar[2]),
                    "low": float(bar[3]),
                    "close": float(bar[4]),
                    "volume": float(bar[5]),
                }
            )
        except (TypeError, ValueError):
            continue
    return out


def fetch_ohlcv_for_hub(
    *,
    symbol: str,
    timeframe: str,
    since_ms: int | None = None,
    limit: int = 500,
    normalize_symbol_input: Callable[[Any], str],
    normalize_exchange_symbol: Callable[[str], str],
    ensure_markets_loaded: Callable[[], None],
    exchange: Any,
    friendly_error: Callable[[Exception], str] | None = None,
) -> dict[str, Any]:
    """Fetch OHLCV bars through ccxt for the hub_bridge ``/api/hub/ohlcv`` API.

    Args:
        symbol: User-supplied symbol, passed through *normalize_symbol_input*.
        timeframe: Chart timeframe; unsupported values fall back to "5m".
        since_ms: Start time in epoch ms. When missing or not positive, the
            start is ``now - limit * period``.
        limit: Number of bars wanted, clamped to 1..1500. A falsy value
            selects ``bar_limit_for_timeframe``.
        normalize_symbol_input: Cleans the raw symbol; a falsy result aborts.
        normalize_exchange_symbol: Maps the clean symbol to the exchange's.
        ensure_markets_loaded: Called once before any exchange access.
        exchange: Object exposing ``fetch_ohlcv`` (ccxt-style).
        friendly_error: Optional formatter for the failure message.

    Returns:
        On success ``{"ok": True, "symbol", "exchange_symbol", "timeframe",
        "price_tick", "bars"}`` with bars de-duplicated by open time, sorted
        ascending and trimmed to the newest *limit* entries. On failure
        ``{"ok": False, "msg": ...}``; exceptions raised after symbol
        normalisation are caught and reported this way.
    """
    tf = normalize_chart_timeframe(timeframe)
    sym = normalize_symbol_input(symbol)
    if not sym:
        return {"ok": False, "msg": "symbol 不能为空"}
    try:
        ensure_markets_loaded()
        ex_sym = normalize_exchange_symbol(sym)
        want = max(1, min(int(limit or bar_limit_for_timeframe(tf)), _MAX_BARS_PER_CALL))
        period = TIMEFRAME_MS[tf]
        collected: list = []
        if since_ms is not None and int(since_ms) > 0:
            since = int(since_ms)
        else:
            # Without `since`, OKX/Gate and similar exchanges often cap a single
            # request at ~300 bars, so paginate forward from the target start.
            since = max(0, int(time.time() * 1000) - want * period)

        rounds = 0
        while len(collected) < want and rounds < _MAX_FETCH_ROUNDS:
            rounds += 1
            req_limit = min(_FETCH_CHUNK_MAX, want - len(collected))
            batch = exchange.fetch_ohlcv(
                ex_sym, timeframe=tf, since=since, limit=req_limit
            )
            if not batch:
                break
            collected.extend(batch)
            next_since = int(batch[-1][0]) + period
            # Stop as soon as the cursor fails to advance past the start we
            # just requested (exchange ignored `since` or repeated a batch);
            # another round would only re-fetch the same bars.
            if next_since <= since:
                break
            since = next_since

        bars = _bars_to_dicts(collected)
        if not bars:
            return {"ok": False, "msg": "交易所未返回 K 线"}
        tick = price_tick_from_market(exchange, ex_sym)

        # De-duplicate by open time (later rows win), then sort ascending.
        uniq: dict[int, dict] = {}
        for b in bars:
            uniq[int(b["open_time_ms"])] = b
        merged = [uniq[k] for k in sorted(uniq)]
        if len(merged) > want:
            merged = merged[-want:]
        return {
            "ok": True,
            "symbol": sym,
            "exchange_symbol": ex_sym,
            "timeframe": tf,
            "price_tick": tick,
            "bars": merged,
        }
    except Exception as e:
        msg = friendly_error(e) if friendly_error else str(e)
        return {"ok": False, "msg": f"K线加载失败:{msg}"}