"""中控行情区:各实例 ccxt OHLCV 拉取(hub_bridge /api/hub/ohlcv 共用)。""" from __future__ import annotations import math import os import time from typing import Any, Callable, Optional CHART_TIMEFRAMES = frozenset( { "1m", "5m", "15m", "1h", "2h", "4h", "1d", "1w", } ) CHART_TIMEFRAME_ORDER = ( "1m", "5m", "15m", "1h", "2h", "4h", "1d", "1w", ) DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"}) # 入库 / 同步真源(各周期直拉交易所,不做本地聚合) STORED_TIMEFRAMES = frozenset(CHART_TIMEFRAMES) PERMANENT_STORED_TIMEFRAMES = frozenset({"1d", "1w"}) YEAR_ROLLING_STORED = frozenset({"5m", "15m", "1h", "2h", "4h"}) # 行情区不做展示周期聚合;保留空映射供兼容读取 CHART_DISPLAY_AGGREGATE_FROM: dict[str, str] = {} SMALL_DISPLAY_TFS = frozenset({"1m", "5m", "15m"}) MID_DISPLAY_TFS = frozenset({"1h", "2h", "4h"}) HUB_KLINE_1M_MAX_BARS = max(1000, int(os.getenv("HUB_KLINE_1M_MAX_BARS", "10000"))) HUB_KLINE_5M_1H_RETENTION_DAYS = max(30, int(os.getenv("HUB_KLINE_5M_1H_RETENTION_DAYS", "365"))) HUB_KLINE_SEED_BARS = max(100, int(os.getenv("HUB_KLINE_SEED_BARS", "500"))) # 交易所无原生周期时的远程拉取 fallback(行情区当前无映射) OHLCV_AGGREGATE_FROM: dict[str, str] = {} TIMEFRAME_MS: dict[str, int] = { "1m": 60_000, "5m": 5 * 60_000, "15m": 15 * 60_000, "1h": 60 * 60_000, "2h": 2 * 60 * 60_000, "4h": 4 * 60 * 60_000, "12h": 12 * 60 * 60_000, "1d": 24 * 60 * 60_000, "1w": 7 * 24 * 60 * 60_000, } def normalize_chart_timeframe(raw: str | None, default: str = "5m") -> str: tf = (raw or default).strip().lower() return tf if tf in CHART_TIMEFRAMES else default def normalize_perpetual_symbol(symbol: str) -> str: """BTC/USDT → BTC/USDT:USDT(与四所 ccxt swap 行情一致)。""" sym = (symbol or "").strip().upper() if not sym: return "" if ":" in sym: return sym if "/" in sym: base, quote = sym.split("/", 1) quote_clean = quote.split(":")[0] return f"{base}/{quote_clean}:{quote_clean}" return sym def sync_timeframe_for_display(timeframe: str) -> str: """展示周期对应的入库 / 同步周期。""" tf = normalize_chart_timeframe(timeframe) return CHART_DISPLAY_AGGREGATE_FROM.get(tf, tf) def aggregation_source_for_display(timeframe: str) -> str | None: tf = normalize_chart_timeframe(timeframe) return CHART_DISPLAY_AGGREGATE_FROM.get(tf) def aggregate_ratio(display_tf: str, source_tf: str) -> int: d = normalize_chart_timeframe(display_tf) s = normalize_chart_timeframe(source_tf) return max(1, int(TIMEFRAME_MS[d] // TIMEFRAME_MS[s])) def chart_initial_limit(timeframe: str) -> int: tf = normalize_chart_timeframe(timeframe) if tf in SMALL_DISPLAY_TFS: return 2000 if tf in MID_DISPLAY_TFS: return 1000 if tf in DAILY_PLUS_TIMEFRAMES: return 500 return 500 def chart_chunk_limit(timeframe: str) -> int: tf = normalize_chart_timeframe(timeframe) if tf in SMALL_DISPLAY_TFS: return 500 if tf == "1w": return 150 if tf in MID_DISPLAY_TFS: return 300 return 200 def chart_memory_cap(timeframe: str) -> int: tf = normalize_chart_timeframe(timeframe) if tf in SMALL_DISPLAY_TFS: return 5000 if tf == "1w": return 500 return 1000 def bar_limit_for_timeframe(timeframe: str) -> int: return chart_memory_cap(timeframe) def storage_retention_days(storage_tf: str) -> int | None: """None 表示不按天截断(1m 按根数;1d/1w 永久)。""" tf = normalize_chart_timeframe(storage_tf) if tf in YEAR_ROLLING_STORED: return HUB_KLINE_5M_1H_RETENTION_DAYS return None def history_cutoff_ms_for_storage(storage_tf: str, now_ms: int | None = None) -> int: days = storage_retention_days(storage_tf) if days is None: return 0 now = int(now_ms if now_ms is not None else time.time() * 1000) return max(0, now - int(days) * 86400000) def seed_bar_target(storage_tf: str) -> int: tf = normalize_chart_timeframe(storage_tf) if tf == "1m": return HUB_KLINE_1M_MAX_BARS if tf in YEAR_ROLLING_STORED: period = TIMEFRAME_MS[tf] return min( int(86400000 * HUB_KLINE_5M_1H_RETENTION_DAYS / period) + 20, 150000, ) return HUB_KLINE_SEED_BARS def retention_policy_meta() -> dict[str, Any]: year = {"mode": "days", "days": HUB_KLINE_5M_1H_RETENTION_DAYS} return { "1m": {"mode": "bars", "max_bars": HUB_KLINE_1M_MAX_BARS}, "5m": dict(year), "15m": dict(year), "1h": dict(year), "2h": dict(year), "4h": dict(year), "1d": {"mode": "permanent"}, "1w": {"mode": "permanent"}, "aggregate_from": {}, } def last_closed_bar_open_ms(timeframe: str, now_ms: int | None = None) -> int: """上一根已收盘 K 的 open_time(毫秒 UTC)。""" tf = normalize_chart_timeframe(timeframe) period = TIMEFRAME_MS[tf] now = int(now_ms if now_ms is not None else time.time() * 1000) current_open = (now // period) * period return int(current_open - period) def window_start_ms(timeframe: str, need: int, retention_days: int, now_ms: int | None = None) -> int: """本地库清理/读库窗口:不超过 retention_days。""" now = int(now_ms if now_ms is not None else time.time() * 1000) period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)] retention_cutoff = now - max(1, int(retention_days)) * 86400000 want = now - max(1, int(need)) * period return max(retention_cutoff, want) def chart_fetch_start_ms(timeframe: str, need: int, now_ms: int | None = None) -> int: """行情展示拉取起点:按 need 根回看(日线 500 / 日内 1000),不受 DB 保留天数限制。""" now = int(now_ms if now_ms is not None else time.time() * 1000) period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)] return max(0, now - max(1, int(need)) * period) def _positive_float(value: Any) -> Optional[float]: if value in (None, ""): return None try: v = float(value) except (TypeError, ValueError): return None return v if v > 0 else None def _price_tick_from_market_info(info: dict) -> Optional[float]: """从 market.info 解析 tick(含币安 PRICE_FILTER.filters)。""" for key in ("tickSize", "tickSz", "price_increment", "order_price_round", "quote_increment"): v = _positive_float(info.get(key)) if v is not None: return v for key in ("pricePrecision", "price_precision"): raw = info.get(key) if raw in (None, ""): continue try: p = float(raw) except (TypeError, ValueError): continue if p >= 1 and abs(p - round(p)) < 1e-9 and p <= 12: return 10 ** (-int(p)) if 0 < p < 1: return p filters = info.get("filters") if isinstance(filters, list): for f in filters: if not isinstance(f, dict): continue if str(f.get("filterType") or "").upper() != "PRICE_FILTER": continue v = _positive_float(f.get("tickSize")) if v is not None: return v return None def round_price_to_tick(value: Any, tick: Optional[float]) -> Optional[float]: """按交易所 tick 对齐价格(K 线/标记线与坐标轴一致)。""" t = normalize_price_tick(tick) if t is None: return None try: v = float(value) except (TypeError, ValueError): return None n = round(v / t) * t d = _decimals_from_tick(t) return float(f"{n:.{d}f}") def round_ohlcv_bars_to_tick(bars: list[dict[str, Any]], tick: Optional[float]) -> None: t = normalize_price_tick(tick) if t is None: return for b in bars: for key in ("open", "high", "low", "close"): if key in b: rounded = round_price_to_tick(b.get(key), t) if rounded is not None: b[key] = rounded def price_tick_from_market(exchange, exchange_symbol: str) -> Optional[float]: """最小价格变动单位(与交易所 tick / price_to_precision 一致)。""" try: if not getattr(exchange, "markets", None): exchange.load_markets() market = exchange.market(exchange_symbol) except Exception: return None info = market.get("info") or {} if isinstance(info, dict): tick = _price_tick_from_market_info(info) if tick is not None: return tick limits = market.get("limits") or {} price_limits = limits.get("price") or {} if price_limits.get("min") not in (None, ""): try: v = float(price_limits["min"]) if v > 0: return v except (TypeError, ValueError): pass try: sample = exchange.price_to_precision(exchange_symbol, 12345.678901234) s = str(sample).strip() if "." in s: frac = s.split(".", 1)[1] if frac: return 10 ** (-len(frac)) return 1.0 except Exception: pass prec = (market.get("precision") or {}).get("price") if prec is not None: try: p = float(prec) if p >= 1 and abs(p - round(p)) < 1e-9 and p <= 12: return 10 ** (-int(p)) if 0 < p < 1: return p except (TypeError, ValueError): pass return None def normalize_price_tick(tick: Optional[float]) -> Optional[float]: """将 tick 对齐为 10^-n,避免浮点噪声导致前端 lightweight-charts unexpected base。""" if tick is None: return None try: t = float(tick) except (TypeError, ValueError): return None if t <= 0: return None if t >= 1: return t try: exp = int(round(-math.log10(t))) except (ValueError, OverflowError): return None exp = max(0, min(12, exp)) return 10 ** (-exp) def _decimals_from_tick(tick: float) -> int: if tick >= 1: return 0 s = f"{tick:.12f}".rstrip("0") if "." in s: frac = s.split(".", 1)[1] if frac: return min(12, len(frac)) return max(0, min(12, int(round(-math.log10(tick))))) def format_price_by_tick(value: Any, tick: Optional[float]) -> str: if value in (None, ""): return "-" try: v = float(value) except (TypeError, ValueError): return str(value) if v == 0: return "0" if tick and tick > 0: return f"{v:.{_decimals_from_tick(float(tick))}f}" av = abs(v) if av >= 10000: d = 2 elif av >= 100: d = 3 elif av >= 1: d = 4 elif av >= 0.01: d = 6 else: d = 8 text = f"{v:.{d}f}" return text.rstrip("0").rstrip(".") if "." in text else text def exchange_supports_timeframe(exchange, timeframe: str) -> bool: tf = normalize_chart_timeframe(timeframe) tfs = getattr(exchange, "timeframes", None) or {} if not tfs: return True return tf in tfs def _median_bar_step_ms(bars: list[dict[str, Any]]) -> Optional[int]: if len(bars) < 2: return None steps: list[int] = [] for i in range(1, min(len(bars), 64)): step = int(bars[i]["open_time_ms"]) - int(bars[i - 1]["open_time_ms"]) if step > 0: steps.append(step) if not steps: return None steps.sort() return steps[len(steps) // 2] def bars_spacing_matches_timeframe( bars: list[dict[str, Any]], timeframe: str, *, tolerance: float = 0.08 ) -> bool: if len(bars) < 2: return True period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)] step = _median_bar_step_ms(bars) if step is None: return False return abs(step - period) <= period * tolerance def align_bar_open_ms(open_time_ms: int, period_ms: int) -> int: return (int(open_time_ms) // period_ms) * period_ms def snap_to_bar_grid(ts_ms: int, origin_ms: int, step_ms: int) -> int: step = max(1, int(step_ms)) origin = int(origin_ms) if ts_ms <= origin: return origin idx = (int(ts_ms) - origin + step - 1) // step return origin + idx * step def fill_missing_ohlcv_bars( bars: list[dict[str, Any]], period_ms: int, start_ms: int | None = None, end_ms: int | None = None, ) -> list[dict[str, Any]]: """细周期缺口用上一根收盘价填平,保证聚合后 K 线时间轴连续。""" by_ts: dict[int, dict[str, Any]] = {} for b in bars or []: try: by_ts[int(b["open_time_ms"])] = b except (KeyError, TypeError, ValueError): continue if not by_ts: return [] keys = sorted(by_ts.keys()) step_ms = max(1, int(period_ms)) origin = keys[0] aligned_start = snap_to_bar_grid( int(start_ms if start_ms is not None else keys[0]), origin, step_ms ) aligned_end = max( int(end_ms if end_ms is not None else keys[-1]), keys[-1], ) out: list[dict[str, Any]] = [] last: dict[str, Any] | None = None for ts_key in keys: if ts_key <= aligned_start: last = by_ts[ts_key] ts = aligned_start while ts <= aligned_end: cur = by_ts.get(ts) if cur is not None: last = cur out.append(cur) elif last is not None: c = float(last["close"]) out.append( { "open_time_ms": ts, "open": c, "high": c, "low": c, "close": c, "volume": 0.0, "filled": True, } ) ts += step_ms return out def aggregate_ohlcv_bars( bars: list[dict[str, Any]], target_timeframe: str ) -> list[dict[str, Any]]: """将细周期 OHLCV 聚合为目标周期(UTC 对齐 bucket)。""" tf = normalize_chart_timeframe(target_timeframe) period = TIMEFRAME_MS[tf] buckets: dict[int, dict[str, Any]] = {} for b in bars or []: try: key = align_bar_open_ms(int(b["open_time_ms"]), period) o = float(b["open"]) h = float(b["high"]) l = float(b["low"]) c = float(b["close"]) v = float(b.get("volume") or 0) except (KeyError, TypeError, ValueError): continue cur = buckets.get(key) if cur is None: buckets[key] = { "open_time_ms": key, "open": o, "high": h, "low": l, "close": c, "volume": v, } continue cur["high"] = max(float(cur["high"]), h) cur["low"] = min(float(cur["low"]), l) cur["close"] = c cur["volume"] = float(cur.get("volume") or 0) + v return [buckets[k] for k in sorted(buckets.keys())] def _next_since_from_batch(batch: list, period_ms: int) -> int: last_ts = int(batch[-1][0]) if len(batch) >= 2: step = int(batch[-1][0]) - int(batch[-2][0]) if step > 0: return last_ts + step return last_ts + period_ms def _paginate_fetch_ohlcv( exchange, ex_sym: str, timeframe: str, *, want: int, since_ms: int | None, period_ms: int, chunk_max: int = 300, ) -> list[dict[str, Any]]: tf = normalize_chart_timeframe(timeframe) collected: list = [] if since_ms is not None and int(since_ms) > 0: since = int(since_ms) else: since = max(0, int(time.time() * 1000) - want * period_ms) now_ms = int(time.time() * 1000) guard = 0 prev_since = None while len(collected) < want and guard < 80: guard += 1 if since >= now_ms: break req_limit = min(chunk_max, want - len(collected)) try: batch = exchange.fetch_ohlcv( ex_sym, timeframe=tf, since=since, limit=req_limit ) except Exception as e: err = str(e).lower() if collected and ( "from" in err and "to" in err or "invalid request parameter" in err ): break raise if not batch: break collected.extend(batch) next_since = _next_since_from_batch(batch, period_ms) if next_since >= now_ms: break if prev_since is not None and next_since <= prev_since: break prev_since = since since = next_since bars = _bars_to_dicts(collected) uniq: dict[int, dict[str, Any]] = {} for b in bars: uniq[int(b["open_time_ms"])] = b merged = [uniq[k] for k in sorted(uniq.keys())] if len(merged) > want: merged = merged[-want:] return merged def _bars_to_dicts(ohlcv: list) -> list[dict[str, Any]]: out: list[dict[str, Any]] = [] for bar in ohlcv or []: if not bar or len(bar) < 6: continue try: out.append( { "open_time_ms": int(bar[0]), "open": float(bar[1]), "high": float(bar[2]), "low": float(bar[3]), "close": float(bar[4]), "volume": float(bar[5]), } ) except (TypeError, ValueError): continue return out def fetch_ohlcv_for_hub( *, symbol: str, timeframe: str, since_ms: int | None = None, limit: int = 500, normalize_symbol_input: Callable[[Any], str], normalize_exchange_symbol: Callable[[str], str], ensure_markets_loaded: Callable[[], None], exchange, friendly_error: Callable[[Exception], str] | None = None, ) -> dict[str, Any]: """从 ccxt 拉 OHLCV,供 hub_bridge /api/hub/ohlcv 返回。""" tf = normalize_chart_timeframe(timeframe) sym = normalize_symbol_input(symbol) if not sym: return {"ok": False, "msg": "symbol 不能为空"} try: ensure_markets_loaded() ex_sym = normalize_exchange_symbol(sym) want = max(1, min(int(limit or bar_limit_for_timeframe(tf)), 1500)) period = TIMEFRAME_MS[tf] merged: list[dict[str, Any]] = [] src_tf = OHLCV_AGGREGATE_FROM.get(tf) if exchange_supports_timeframe(exchange, tf): candidate = _paginate_fetch_ohlcv( exchange, ex_sym, tf, want=want, since_ms=since_ms, period_ms=period, ) if candidate and bars_spacing_matches_timeframe(candidate, tf): merged = candidate if ( not merged and src_tf and exchange_supports_timeframe(exchange, src_tf) ): src_period = TIMEFRAME_MS[normalize_chart_timeframe(src_tf)] ratio = max(1, int(math.ceil(period / src_period))) src_want = min(1500, want * ratio + ratio * 4) src_bars = _paginate_fetch_ohlcv( exchange, ex_sym, src_tf, want=src_want, since_ms=since_ms, period_ms=src_period, ) if not src_bars or not bars_spacing_matches_timeframe(src_bars, src_tf): return { "ok": False, "msg": f"无法获取 {tf} K 线(细周期 {src_tf} 数据异常)", } merged = aggregate_ohlcv_bars(src_bars, tf) if len(merged) > want: merged = merged[-want:] if not merged: try: tail = exchange.fetch_ohlcv( ex_sym, timeframe=tf, limit=min(want, 300) ) merged = _bars_to_dicts(tail or []) if len(merged) > want: merged = merged[-want:] except Exception: pass if not merged: return {"ok": False, "msg": "交易所未返回 K 线"} tick = normalize_price_tick(price_tick_from_market(exchange, ex_sym)) round_ohlcv_bars_to_tick(merged, tick) return { "ok": True, "symbol": sym, "exchange_symbol": ex_sym, "timeframe": tf, "price_tick": tick, "bars": merged, } except Exception as e: msg = friendly_error(e) if friendly_error else str(e) return {"ok": False, "msg": f"K线加载失败:{msg}"}