"""CTP tick 聚合 K 线(1 分钟为基础,再合成各周期)。""" from __future__ import annotations import logging from typing import Optional from kline_chart import ( PERIOD_MINUTES, _aggregate_bars, _bar_datetime, _merge_bars, _timeshare_session, _weekly_from_daily, ) logger = logging.getLogger(__name__) PERIOD_AGG = { "2m": 2, "3m": 3, "5m": 5, "15m": 15, "30m": 30, "1h": 60, "2h": 120, "4h": 240, } def _daily_from_1m(bars_1m: list) -> list: if not bars_1m: return [] buckets: dict[str, list] = {} for bar in bars_1m: dt = _bar_datetime(bar) if not dt: continue key = dt.strftime("%Y-%m-%d") buckets.setdefault(key, []).append(bar) out = [] for day in sorted(buckets.keys()): chunk = buckets[day] merged = _merge_bars(chunk) merged["d"] = day + " 15:00:00" out.append(merged) return out def compose_period_bars(bars_1m: list, period: str) -> list: p = (period or "15m").lower() if p == "timeshare": return _timeshare_session(bars_1m) if p in ("1d", "d"): return _daily_from_1m(bars_1m) if p == "w": return _weekly_from_daily(_daily_from_1m(bars_1m)) if p == "1m": return list(bars_1m) n = PERIOD_AGG.get(p) if n: return _aggregate_bars(bars_1m, n) if p in PERIOD_MINUTES: try: n = int(PERIOD_MINUTES[p]) return _aggregate_bars(bars_1m, n) except (TypeError, ValueError): pass return list(bars_1m) def fetch_ctp_klines(symbol: str, period: str, mode: str) -> Optional[list]: """CTP 已连接时由 tick 聚合 K 线;失败返回 None。""" try: from vnpy_bridge import ctp_status, get_bridge if not ctp_status(mode).get("connected"): return None bars_1m = get_bridge().get_kline_bars_1m(symbol, mode=mode) if not bars_1m: return None return compose_period_bars(bars_1m, period) except Exception as exc: logger.debug("fetch_ctp_klines %s %s: %s", symbol, period, exc) return None