Files
qihuo/kline_stream.py
T
dekun d127a53870 fix: 修正 d.minute() 调用导致盘前连接 worker 报错。
datetime.minute 是属性而非方法,修复后交易时段与盘前自动连 CTP 可正常工作。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-25 12:36:01 +08:00

155 lines
4.8 KiB
Python

"""K 线 SSE 推送与后台刷新。"""
from __future__ import annotations
import json
import logging
import queue
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional
from zoneinfo import ZoneInfo
from kline_chart import fetch_market_klines, ths_to_sina_chart_symbol
from kline_store import is_cache_fresh, load_meta, ensure_kline_tables
logger = logging.getLogger(__name__)
TZ = ZoneInfo("Asia/Shanghai")
FAST_PERIODS = frozenset({
"timeshare", "1m", "2m", "5m", "15m", "1h", "2h", "4h",
})
def is_trading_session() -> bool:
d = datetime.now(TZ)
wd = d.weekday()
if wd == 6:
return False
if wd == 5 and d.hour < 21:
return False
t = d.hour * 60 + d.minute
def in_range(sh: int, sm: int, eh: int, em: int) -> bool:
return t >= sh * 60 + sm and t < eh * 60 + em
if in_range(9, 0, 11, 30):
return True
if in_range(13, 30, 15, 0):
return True
if in_range(21, 0, 24, 0):
return True
if in_range(0, 0, 2, 30):
return True
return False
def sse_format(event: str, data: dict) -> str:
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
@dataclass
class KlineSubscription:
symbol: str
period: str
market_code: str = ""
sina_code: str = ""
queue: queue.Queue = field(default_factory=queue.Queue)
class KlineStreamHub:
def __init__(self):
self._lock = threading.Lock()
self._subs: list[KlineSubscription] = []
def subscribe(
self,
symbol: str,
period: str,
market_code: str = "",
sina_code: str = "",
) -> KlineSubscription:
sub = KlineSubscription(
symbol=symbol.strip(),
period=(period or "15m").strip().lower(),
market_code=market_code.strip(),
sina_code=sina_code.strip(),
)
with self._lock:
self._subs.append(sub)
return sub
def unsubscribe(self, sub: KlineSubscription) -> None:
with self._lock:
try:
self._subs.remove(sub)
except ValueError:
pass
def _snapshot_subs(self) -> list[KlineSubscription]:
with self._lock:
return list(self._subs)
def publish(self, sub: KlineSubscription, event: str, data: dict) -> None:
try:
sub.queue.put_nowait({"event": event, "data": data})
except queue.Full:
pass
def _should_refresh(self, sub: KlineSubscription, db_path: str) -> bool:
chart_sym = ths_to_sina_chart_symbol(sub.symbol)
if not chart_sym:
return False
if is_trading_session() and sub.period in FAST_PERIODS:
return True
try:
from db_conn import connect_db
conn = connect_db(db_path)
ensure_kline_tables(conn)
meta = load_meta(conn, chart_sym, sub.period)
conn.close()
if not meta:
return True
return not is_cache_fresh(sub.period, meta.get("updated_at", ""))
except Exception as exc:
logger.warning("kline refresh check failed: %s", exc)
return True
def worker_loop(
self,
db_path: str,
quote_fn: Callable[..., dict],
get_mode_fn: Optional[Callable[[], str]] = None,
) -> None:
while True:
try:
subs = self._snapshot_subs()
for sub in subs:
if not self._should_refresh(sub, db_path):
continue
try:
kline_data = fetch_market_klines(
sub.symbol,
sub.period,
db_path,
force_remote=True,
trading_mode=get_mode_fn() if get_mode_fn else None,
)
if kline_data.get("bars"):
self.publish(sub, "kline", kline_data)
quote_data = quote_fn(
sub.symbol, sub.market_code, sub.sina_code,
)
if quote_data:
self.publish(sub, "quote", quote_data)
except Exception as exc:
logger.warning(
"kline stream refresh %s %s: %s",
sub.symbol, sub.period, exc,
)
except Exception as exc:
logger.warning("kline stream worker: %s", exc)
time.sleep(1)
kline_hub = KlineStreamHub()