Files
qihuo/kline_stream.py
T
dekun 55d95b4c39 进一步修复 SQLite 并发锁冲突,统一连接与重试机制。
新增 db_conn 模块、缓存 schema 初始化、positions 页 commit,风控读库自动重试。

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-24 10:30:26 +08:00

146 lines
4.6 KiB
Python

"""K 线 SSE 推送与后台刷新。"""
from __future__ import annotations
import json
import logging
import queue
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional
from zoneinfo import ZoneInfo
from kline_chart import fetch_market_klines, ths_to_sina_chart_symbol
from kline_store import is_cache_fresh, load_meta, ensure_kline_tables
logger = logging.getLogger(__name__)
TZ = ZoneInfo("Asia/Shanghai")
FAST_PERIODS = frozenset({
"timeshare", "1m", "2m", "5m", "15m", "1h", "2h", "4h",
})
def is_trading_session() -> bool:
d = datetime.now(TZ)
wd = d.weekday()
if wd == 6:
return False
if wd == 5 and d.hour < 21:
return False
t = d.hour * 60 + d.minute()
def in_range(sh: int, sm: int, eh: int, em: int) -> bool:
return t >= sh * 60 + sm and t < eh * 60 + em
if in_range(9, 0, 11, 30):
return True
if in_range(13, 30, 15, 0):
return True
if in_range(21, 0, 24, 0):
return True
if in_range(0, 0, 2, 30):
return True
return False
def sse_format(event: str, data: dict) -> str:
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
@dataclass
class KlineSubscription:
symbol: str
period: str
market_code: str = ""
sina_code: str = ""
queue: queue.Queue = field(default_factory=queue.Queue)
class KlineStreamHub:
def __init__(self):
self._lock = threading.Lock()
self._subs: list[KlineSubscription] = []
def subscribe(
self,
symbol: str,
period: str,
market_code: str = "",
sina_code: str = "",
) -> KlineSubscription:
sub = KlineSubscription(
symbol=symbol.strip(),
period=(period or "15m").strip().lower(),
market_code=market_code.strip(),
sina_code=sina_code.strip(),
)
with self._lock:
self._subs.append(sub)
return sub
def unsubscribe(self, sub: KlineSubscription) -> None:
with self._lock:
try:
self._subs.remove(sub)
except ValueError:
pass
def _snapshot_subs(self) -> list[KlineSubscription]:
with self._lock:
return list(self._subs)
def publish(self, sub: KlineSubscription, event: str, data: dict) -> None:
try:
sub.queue.put_nowait({"event": event, "data": data})
except queue.Full:
pass
def _should_refresh(self, sub: KlineSubscription, db_path: str) -> bool:
chart_sym = ths_to_sina_chart_symbol(sub.symbol)
if not chart_sym:
return False
if is_trading_session() and sub.period in FAST_PERIODS:
return True
try:
from db_conn import connect_db
conn = connect_db(db_path)
ensure_kline_tables(conn)
meta = load_meta(conn, chart_sym, sub.period)
conn.close()
if not meta:
return True
return not is_cache_fresh(sub.period, meta.get("updated_at", ""))
except Exception as exc:
logger.warning("kline refresh check failed: %s", exc)
return True
def worker_loop(self, db_path: str, quote_fn: Callable[..., dict]) -> None:
while True:
try:
subs = self._snapshot_subs()
for sub in subs:
if not self._should_refresh(sub, db_path):
continue
try:
kline_data = fetch_market_klines(
sub.symbol, sub.period, db_path, force_remote=True,
)
if kline_data.get("bars"):
self.publish(sub, "kline", kline_data)
quote_data = quote_fn(
sub.symbol, sub.market_code, sub.sina_code,
)
if quote_data:
self.publish(sub, "quote", quote_data)
except Exception as exc:
logger.warning(
"kline stream refresh %s %s: %s",
sub.symbol, sub.period, exc,
)
except Exception as exc:
logger.warning("kline stream worker: %s", exc)
time.sleep(1)
kline_hub = KlineStreamHub()