"""品种推荐 SSE 推送与后台刷新。""" from __future__ import annotations import json import logging import queue import threading import time from typing import Callable, Optional from db_conn import connect_db from kline_stream import sse_format from recommend_store import load_recommend_cache, refresh_recommend_cache logger = logging.getLogger(__name__) REFRESH_INTERVAL_SEC = 60 class RecommendStreamHub: def __init__(self) -> None: self._lock = threading.Lock() self._subs: list[queue.Queue] = [] def subscribe(self) -> queue.Queue: q: queue.Queue = queue.Queue(maxsize=8) with self._lock: self._subs.append(q) return q def unsubscribe(self, q: queue.Queue) -> None: with self._lock: try: self._subs.remove(q) except ValueError: pass def broadcast(self, event: str, data: dict) -> None: msg = {"event": event, "data": data} with self._lock: subs = list(self._subs) for q in subs: try: q.put_nowait(msg) except queue.Full: pass recommend_hub = RecommendStreamHub() def start_recommend_worker( *, db_path: str, get_capital_fn: Callable, price_fn: Callable[[str], Optional[float]], init_tables_fn: Callable | None = None, interval: int = REFRESH_INTERVAL_SEC, ) -> None: """后台定时刷新推荐并推送给 SSE 订阅者。""" def _loop() -> None: while True: try: conn = connect_db(db_path) try: if init_tables_fn: init_tables_fn(conn) capital = float(get_capital_fn(conn) or 0) refresh_recommend_cache(conn, capital, price_fn) payload = load_recommend_cache(conn) finally: conn.close() recommend_hub.broadcast("recommend", {"ok": True, **payload}) except Exception as exc: logger.warning("recommend worker failed: %s", exc) time.sleep(max(15, interval)) threading.Thread(target=_loop, daemon=True, name="recommend-worker").start()