9d4aea60f0
Co-authored-by: Cursor <cursoragent@cursor.com>
80 lines
2.3 KiB
Python
80 lines
2.3 KiB
Python
"""品种推荐 SSE 推送与后台刷新。"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import queue
|
|
import threading
|
|
import time
|
|
from typing import Callable, Optional
|
|
|
|
from db_conn import connect_db
|
|
from kline_stream import sse_format
|
|
from recommend_store import load_recommend_cache, refresh_recommend_cache
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
# Default number of seconds between refresh cycles; used as the default
# ``interval`` argument of ``start_recommend_worker``.
REFRESH_INTERVAL_SEC = 60
|
|
|
|
|
|
class RecommendStreamHub:
    """Fan-out hub that hands broadcast messages to per-subscriber queues.

    Every subscriber owns a bounded ``queue.Queue`` (capacity 8). Broadcasting
    never blocks: a subscriber whose queue is already full simply misses that
    message.
    """

    def __init__(self) -> None:
        # The lock guards every read and write of the subscriber list.
        self._lock = threading.Lock()
        self._subs: list[queue.Queue] = []

    def subscribe(self) -> queue.Queue:
        """Register a new bounded queue and return it to the caller."""
        inbox: queue.Queue = queue.Queue(maxsize=8)
        with self._lock:
            self._subs.append(inbox)
        return inbox

    def unsubscribe(self, q: queue.Queue) -> None:
        """Remove ``q`` from the subscriber list; unknown queues are ignored."""
        with self._lock:
            if q in self._subs:
                self._subs.remove(q)

    def broadcast(self, event: str, data: dict) -> None:
        """Offer ``{"event": event, "data": data}`` to every current subscriber."""
        envelope = {"event": event, "data": data}
        # Snapshot under the lock so delivery happens without holding it.
        with self._lock:
            recipients = tuple(self._subs)
        for inbox in recipients:
            try:
                inbox.put(envelope, block=False)
            except queue.Full:
                # Best effort: a full subscriber queue drops this message.
                continue
|
|
|
|
|
|
# Shared module-level hub instance; the background worker started by
# ``start_recommend_worker`` broadcasts through it.
recommend_hub = RecommendStreamHub()
|
|
|
|
|
|
def start_recommend_worker(
    *,
    db_path: str,
    get_capital_fn: Callable,
    quote_fn: Callable[[str], Optional[dict]],
    init_tables_fn: Callable | None = None,
    interval: int = REFRESH_INTERVAL_SEC,
) -> None:
    """Start a daemon thread that periodically refreshes and pushes recommendations.

    Each cycle opens a connection with ``connect_db(db_path)``, optionally calls
    ``init_tables_fn(conn)``, refreshes the recommendation cache, reloads it and
    broadcasts it on ``recommend_hub`` as a ``"recommend"`` event. A failing
    cycle is logged (with traceback) and the loop carries on.

    Args:
        db_path: Passed to ``connect_db`` on every cycle.
        get_capital_fn: Called with the connection; its result is coerced to
            ``float`` (falsy results become ``0.0``).
        quote_fn: Forwarded unchanged to ``refresh_recommend_cache``.
        init_tables_fn: Optional callable invoked with the connection at the
            start of every cycle.
        interval: Seconds to sleep between cycles; values below 15 are raised
            to 15.
    """
    # Loop-invariant: the pause between cycles is floored at 15 seconds.
    pause = max(15, interval)

    def _loop() -> None:
        while True:
            try:
                conn = connect_db(db_path)
                try:
                    if init_tables_fn:
                        init_tables_fn(conn)
                    capital = float(get_capital_fn(conn) or 0)
                    refresh_recommend_cache(conn, capital, quote_fn)
                    payload = load_recommend_cache(conn)
                finally:
                    # Release the connection before broadcasting.
                    conn.close()
                recommend_hub.broadcast("recommend", {"ok": True, **payload})
            except Exception as exc:
                # Top-level boundary of the worker: keep the loop alive, but
                # record the traceback so the failure can be diagnosed.
                logger.warning("recommend worker failed: %s", exc, exc_info=True)
            time.sleep(pause)

    threading.Thread(target=_loop, daemon=True, name="recommend-worker").start()
|