"""持仓监控:后台拉取 CTP 并 SSE 推送给前端(避免每次刷新阻塞读柜台)。""" from __future__ import annotations import logging import queue import threading import time from typing import Callable, Optional from kline_stream import sse_format from market_sessions import is_trading_session logger = logging.getLogger(__name__) PUSH_INTERVAL_SEC = 1 IDLE_INTERVAL_SEC = 5 class PositionStreamHub: def __init__(self) -> None: self._lock = threading.Lock() self._subs: list[queue.Queue] = [] self._snapshot: Optional[dict] = None self._snapshot_ts: float = 0.0 def subscribe(self) -> queue.Queue: q: queue.Queue = queue.Queue(maxsize=16) with self._lock: self._subs.append(q) return q def unsubscribe(self, q: queue.Queue) -> None: with self._lock: try: self._subs.remove(q) except ValueError: pass def get_snapshot(self) -> Optional[dict]: with self._lock: return dict(self._snapshot) if self._snapshot else None def set_snapshot(self, data: dict) -> None: with self._lock: self._snapshot = dict(data) self._snapshot_ts = time.time() def broadcast(self, event: str, data: dict) -> None: self.set_snapshot(data) msg = {"event": event, "data": data} with self._lock: subs = list(self._subs) for q in subs: try: q.put_nowait(msg) except queue.Full: try: q.get_nowait() except queue.Empty: pass try: q.put_nowait(msg) except queue.Full: pass position_hub = PositionStreamHub() def start_position_worker( *, refresh_fn: Callable[[], dict], interval: int = PUSH_INTERVAL_SEC, idle_interval: int = IDLE_INTERVAL_SEC, ) -> None: """后台定时刷新持仓快照并 SSE 广播。""" def _loop() -> None: while True: sleep_sec = idle_interval try: payload = refresh_fn() if payload: position_hub.broadcast("positions", payload) ctp_st = (payload or {}).get("ctp_status") or {} connected = bool(ctp_st.get("connected")) in_session = bool((payload or {}).get("trading_session")) rows = (payload or {}).get("rows") or [] has_sl_tp = any( r.get("stop_loss") is not None or r.get("take_profit") is not None for r in rows ) if connected and in_session: sleep_sec = max(1, interval) elif connected: sleep_sec = max(2, min(idle_interval, 3)) except Exception as exc: logger.warning("position worker failed: %s", exc) time.sleep(sleep_sec) threading.Thread(target=_loop, daemon=True, name="position-stream").start()