e5a586f903
Move business code under modules/, env template to config/, PM2 single qihuo process, and _legacy shims for old imports. Co-authored-by: Cursor <cursoragent@cursor.com>
140 lines
4.6 KiB
Python
140 lines
4.6 KiB
Python
# Copyright (c) 2025-2026 马建军. All rights reserved.
|
|
# 专有软件 — 未经授权禁止复制、传播、转售。
|
|
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
|
|
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
|
|
|
|
"""K 线 SSE 推送与后台刷新。"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
import logging
|
|
import queue
|
|
import threading
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Callable, Optional
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from modules.market.kline_chart import fetch_market_klines, ths_to_sina_chart_symbol
|
|
from modules.market.kline_store import is_cache_fresh, load_meta, ensure_kline_tables
|
|
from modules.market.market_sessions import is_trading_session
|
|
|
|
logger = logging.getLogger(__name__)
|
|
TZ = ZoneInfo("Asia/Shanghai")
|
|
|
|
FAST_PERIODS = frozenset({
|
|
"timeshare", "1m", "2m", "5m", "15m", "1h", "2h", "4h",
|
|
})
|
|
|
|
|
|
def sse_format(event: str, data: dict) -> str:
|
|
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False, default=str)}\n\n"
|
|
|
|
|
|
@dataclass
|
|
class KlineSubscription:
|
|
symbol: str
|
|
period: str
|
|
market_code: str = ""
|
|
sina_code: str = ""
|
|
queue: queue.Queue = field(default_factory=queue.Queue)
|
|
|
|
|
|
class KlineStreamHub:
|
|
def __init__(self):
|
|
self._lock = threading.Lock()
|
|
self._subs: list[KlineSubscription] = []
|
|
|
|
def subscribe(
|
|
self,
|
|
symbol: str,
|
|
period: str,
|
|
market_code: str = "",
|
|
sina_code: str = "",
|
|
) -> KlineSubscription:
|
|
sub = KlineSubscription(
|
|
symbol=symbol.strip(),
|
|
period=(period or "15m").strip().lower(),
|
|
market_code=market_code.strip(),
|
|
sina_code=sina_code.strip(),
|
|
)
|
|
with self._lock:
|
|
self._subs.append(sub)
|
|
return sub
|
|
|
|
def unsubscribe(self, sub: KlineSubscription) -> None:
|
|
with self._lock:
|
|
try:
|
|
self._subs.remove(sub)
|
|
except ValueError:
|
|
pass
|
|
|
|
def _snapshot_subs(self) -> list[KlineSubscription]:
|
|
with self._lock:
|
|
return list(self._subs)
|
|
|
|
def publish(self, sub: KlineSubscription, event: str, data: dict) -> None:
|
|
try:
|
|
sub.queue.put_nowait({"event": event, "data": data})
|
|
except queue.Full:
|
|
pass
|
|
|
|
def _should_refresh(self, sub: KlineSubscription, db_path: str) -> bool:
|
|
chart_sym = ths_to_sina_chart_symbol(sub.symbol)
|
|
if not chart_sym:
|
|
return False
|
|
if is_trading_session() and sub.period in FAST_PERIODS:
|
|
return True
|
|
try:
|
|
from modules.core.db_conn import connect_db
|
|
conn = connect_db(db_path)
|
|
ensure_kline_tables(conn)
|
|
meta = load_meta(conn, chart_sym, sub.period)
|
|
conn.close()
|
|
if not meta:
|
|
return True
|
|
return not is_cache_fresh(sub.period, meta.get("updated_at", ""))
|
|
except Exception as exc:
|
|
logger.warning("kline refresh check failed: %s", exc)
|
|
return True
|
|
|
|
def worker_loop(
|
|
self,
|
|
db_path: str,
|
|
quote_fn: Callable[..., dict],
|
|
get_mode_fn: Optional[Callable[[], str]] = None,
|
|
) -> None:
|
|
while True:
|
|
try:
|
|
subs = self._snapshot_subs()
|
|
for sub in subs:
|
|
if not self._should_refresh(sub, db_path):
|
|
continue
|
|
try:
|
|
kline_data = fetch_market_klines(
|
|
sub.symbol,
|
|
sub.period,
|
|
db_path,
|
|
force_remote=True,
|
|
prefer_ctp=False,
|
|
)
|
|
if kline_data.get("bars"):
|
|
self.publish(sub, "kline", kline_data)
|
|
quote_data = quote_fn(
|
|
sub.symbol, sub.market_code, sub.sina_code,
|
|
)
|
|
if quote_data:
|
|
self.publish(sub, "quote", quote_data)
|
|
except Exception as exc:
|
|
logger.warning(
|
|
"kline stream refresh %s %s: %s",
|
|
sub.symbol, sub.period, exc,
|
|
)
|
|
except Exception as exc:
|
|
logger.warning("kline stream worker: %s", exc)
|
|
time.sleep(1)
|
|
|
|
|
|
kline_hub = KlineStreamHub()
|