"""K 线 SSE 推送与后台刷新。""" from __future__ import annotations import json import logging import queue import threading import time from dataclasses import dataclass, field from datetime import datetime from typing import Callable, Optional from zoneinfo import ZoneInfo from kline_chart import fetch_market_klines, ths_to_sina_chart_symbol from kline_store import is_cache_fresh, load_meta, ensure_kline_tables logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") FAST_PERIODS = frozenset({ "timeshare", "1m", "2m", "5m", "15m", "1h", "2h", "4h", }) def is_trading_session() -> bool: d = datetime.now(TZ) wd = d.weekday() if wd == 6: return False if wd == 5 and d.hour < 21: return False t = d.hour * 60 + d.minute def in_range(sh: int, sm: int, eh: int, em: int) -> bool: return t >= sh * 60 + sm and t < eh * 60 + em if in_range(9, 0, 11, 30): return True if in_range(13, 30, 15, 0): return True if in_range(21, 0, 24, 0): return True if in_range(0, 0, 2, 30): return True return False def sse_format(event: str, data: dict) -> str: return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" @dataclass class KlineSubscription: symbol: str period: str market_code: str = "" sina_code: str = "" queue: queue.Queue = field(default_factory=queue.Queue) class KlineStreamHub: def __init__(self): self._lock = threading.Lock() self._subs: list[KlineSubscription] = [] def subscribe( self, symbol: str, period: str, market_code: str = "", sina_code: str = "", ) -> KlineSubscription: sub = KlineSubscription( symbol=symbol.strip(), period=(period or "15m").strip().lower(), market_code=market_code.strip(), sina_code=sina_code.strip(), ) with self._lock: self._subs.append(sub) return sub def unsubscribe(self, sub: KlineSubscription) -> None: with self._lock: try: self._subs.remove(sub) except ValueError: pass def _snapshot_subs(self) -> list[KlineSubscription]: with self._lock: return list(self._subs) def publish(self, sub: KlineSubscription, event: str, data: dict) -> None: try: sub.queue.put_nowait({"event": event, "data": data}) except queue.Full: pass def _should_refresh(self, sub: KlineSubscription, db_path: str) -> bool: chart_sym = ths_to_sina_chart_symbol(sub.symbol) if not chart_sym: return False if is_trading_session() and sub.period in FAST_PERIODS: return True try: from db_conn import connect_db conn = connect_db(db_path) ensure_kline_tables(conn) meta = load_meta(conn, chart_sym, sub.period) conn.close() if not meta: return True return not is_cache_fresh(sub.period, meta.get("updated_at", "")) except Exception as exc: logger.warning("kline refresh check failed: %s", exc) return True def worker_loop( self, db_path: str, quote_fn: Callable[..., dict], get_mode_fn: Optional[Callable[[], str]] = None, ) -> None: while True: try: subs = self._snapshot_subs() for sub in subs: if not self._should_refresh(sub, db_path): continue try: kline_data = fetch_market_klines( sub.symbol, sub.period, db_path, force_remote=True, trading_mode=get_mode_fn() if get_mode_fn else None, ) if kline_data.get("bars"): self.publish(sub, "kline", kline_data) quote_data = quote_fn( sub.symbol, sub.market_code, sub.sina_code, ) if quote_data: self.publish(sub, "quote", quote_data) except Exception as exc: logger.warning( "kline stream refresh %s %s: %s", sub.symbol, sub.period, exc, ) except Exception as exc: logger.warning("kline stream worker: %s", exc) time.sleep(1) kline_hub = KlineStreamHub()