Files
qihuo/position_stream.py
T

103 lines
3.2 KiB
Python

"""持仓监控:后台拉取 CTP 并 SSE 推送给前端(避免每次刷新阻塞读柜台)。"""
from __future__ import annotations
import logging
import queue
import threading
import time
from typing import Callable, Optional
from kline_stream import sse_format
from market_sessions import is_trading_session
logger = logging.getLogger(__name__)
PUSH_INTERVAL_SEC = 1
IDLE_INTERVAL_SEC = 5
class PositionStreamHub:
def __init__(self) -> None:
self._lock = threading.Lock()
self._subs: list[queue.Queue] = []
self._snapshot: Optional[dict] = None
self._snapshot_ts: float = 0.0
def subscribe(self) -> queue.Queue:
q: queue.Queue = queue.Queue(maxsize=16)
with self._lock:
self._subs.append(q)
return q
def unsubscribe(self, q: queue.Queue) -> None:
with self._lock:
try:
self._subs.remove(q)
except ValueError:
pass
def get_snapshot(self) -> Optional[dict]:
with self._lock:
return dict(self._snapshot) if self._snapshot else None
def set_snapshot(self, data: dict) -> None:
with self._lock:
self._snapshot = dict(data)
self._snapshot_ts = time.time()
def broadcast(self, event: str, data: dict) -> None:
self.set_snapshot(data)
msg = {"event": event, "data": data}
with self._lock:
subs = list(self._subs)
for q in subs:
try:
q.put_nowait(msg)
except queue.Full:
try:
q.get_nowait()
except queue.Empty:
pass
try:
q.put_nowait(msg)
except queue.Full:
pass
position_hub = PositionStreamHub()
def start_position_worker(
*,
refresh_fn: Callable[[], dict],
interval: int = PUSH_INTERVAL_SEC,
idle_interval: int = IDLE_INTERVAL_SEC,
) -> None:
"""后台定时刷新持仓快照并 SSE 广播。"""
def _loop() -> None:
time.sleep(3)
while True:
sleep_sec = idle_interval
try:
payload = refresh_fn()
if payload:
position_hub.broadcast("positions", payload)
ctp_st = (payload or {}).get("ctp_status") or {}
connected = bool(ctp_st.get("connected"))
in_session = bool((payload or {}).get("trading_session"))
rows = (payload or {}).get("rows") or []
has_sl_tp = any(
r.get("stop_loss") is not None or r.get("take_profit") is not None
for r in rows
)
if connected and in_session:
sleep_sec = max(1, interval)
elif connected:
sleep_sec = max(2, min(idle_interval, 3))
except Exception as exc:
logger.warning("position worker failed: %s", exc)
time.sleep(sleep_sec)
threading.Thread(target=_loop, daemon=True, name="position-stream").start()