Files
qihuo/ctp_kline.py
T

85 lines
2.2 KiB
Python

"""CTP tick 聚合 K 线(1 分钟为基础,再合成各周期)。"""
from __future__ import annotations
import logging
from typing import Optional
from kline_chart import (
PERIOD_MINUTES,
_aggregate_bars,
_bar_datetime,
_merge_bars,
_timeshare_session,
_weekly_from_daily,
)
logger = logging.getLogger(__name__)
PERIOD_AGG = {
"2m": 2,
"3m": 3,
"5m": 5,
"15m": 15,
"30m": 30,
"1h": 60,
"2h": 120,
"4h": 240,
}
def _daily_from_1m(bars_1m: list) -> list:
if not bars_1m:
return []
buckets: dict[str, list] = {}
for bar in bars_1m:
dt = _bar_datetime(bar)
if not dt:
continue
key = dt.strftime("%Y-%m-%d")
buckets.setdefault(key, []).append(bar)
out = []
for day in sorted(buckets.keys()):
chunk = buckets[day]
merged = _merge_bars(chunk)
merged["d"] = day + " 15:00:00"
out.append(merged)
return out
def compose_period_bars(bars_1m: list, period: str) -> list:
p = (period or "15m").lower()
if p == "timeshare":
return _timeshare_session(bars_1m)
if p in ("1d", "d"):
return _daily_from_1m(bars_1m)
if p == "w":
return _weekly_from_daily(_daily_from_1m(bars_1m))
if p == "1m":
return list(bars_1m)
n = PERIOD_AGG.get(p)
if n:
return _aggregate_bars(bars_1m, n)
if p in PERIOD_MINUTES:
try:
n = int(PERIOD_MINUTES[p])
return _aggregate_bars(bars_1m, n)
except (TypeError, ValueError):
pass
return list(bars_1m)
def fetch_ctp_klines(symbol: str, period: str, mode: str) -> Optional[list]:
"""CTP 已连接时由 tick 聚合 K 线;失败返回 None。"""
try:
from vnpy_bridge import ctp_status, get_bridge
if not ctp_status(mode).get("connected"):
return None
bars_1m = get_bridge().get_kline_bars_1m(symbol, mode=mode)
if not bars_1m:
return None
return compose_period_bars(bars_1m, period)
except Exception as exc:
logger.debug("fetch_ctp_klines %s %s: %s", symbol, period, exc)
return None