diff --git a/app.py b/app.py index 6e2f25c..f6dc65c 100644 --- a/app.py +++ b/app.py @@ -13,7 +13,7 @@ from werkzeug.utils import secure_filename from dotenv import load_dotenv from flask import ( Flask, render_template, request, redirect, url_for, - flash, session, jsonify, + flash, session, jsonify, Response, stream_with_context, ) from werkzeug.security import check_password_hash, generate_password_hash @@ -31,6 +31,7 @@ from fee_sync import sync_fees_from_akshare from contract_profile import get_contract_profile from stats_engine import STATS_VIEWS, load_stats_cache, refresh_stats_cache from kline_store import ensure_kline_tables +from kline_stream import kline_hub, sse_format from kline_chart import generate_review_kline_chart, fetch_market_klines, MARKET_PERIODS from market import get_price as market_get_price, set_ths_refresh_token, get_quote_source_label @@ -364,6 +365,29 @@ def sync_ths_token(): sync_ths_token() + +def build_market_quote_payload( + symbol: str, + market_code: str = "", + sina_code: str = "", +) -> dict: + if not market_code or not sina_code: + codes = ths_to_codes(symbol) + if codes: + market_code = codes.get("market_code", "") or market_code + sina_code = codes.get("sina_code", "") or sina_code + price = market_get_price(market_code, sina_code) + name = symbol + codes = ths_to_codes(symbol) + if codes: + name = codes.get("name", symbol) + return { + "symbol": symbol, + "name": name, + "price": price, + } + + # —————————————— 推送 —————————————— def send_wechat_msg(content: str): @@ -542,6 +566,17 @@ def background_task(): pass time.sleep(3) + +def start_background_threads(): + threading.Thread(target=background_task, daemon=True).start() + threading.Thread( + target=lambda: kline_hub.worker_loop(DB_PATH, build_market_quote_payload), + daemon=True, + ).start() + + +start_background_threads() + # —————————————— 登录 —————————————— def login_required(f): @@ -1334,6 +1369,48 @@ def api_kline(): return jsonify(data) +@app.route("/api/kline/stream") +@login_required +def api_kline_stream(): + from queue import Empty + + symbol = request.args.get("symbol", "").strip() + period = request.args.get("period", "15m").strip() + market_code = request.args.get("market_code", "").strip() + sina_code = request.args.get("sina_code", "").strip() + if not symbol: + return jsonify({"error": "请提供合约代码"}), 400 + + def generate(): + sub = kline_hub.subscribe(symbol, period, market_code, sina_code) + try: + kline_data = fetch_market_klines(symbol, period, DB_PATH) + if kline_data.get("bars"): + yield sse_format("kline", kline_data) + yield sse_format( + "quote", + build_market_quote_payload(symbol, market_code, sina_code), + ) + while True: + try: + msg = sub.queue.get(timeout=20) + yield sse_format(msg["event"], msg["data"]) + except Empty: + yield ": heartbeat\n\n" + finally: + kline_hub.unsubscribe(sub) + + return Response( + stream_with_context(generate()), + mimetype="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + @app.route("/api/market_quote") @login_required def api_market_quote(): @@ -1342,21 +1419,7 @@ def api_market_quote(): sina_code = request.args.get("sina_code", "").strip() if not symbol and not market_code: return jsonify({"error": "请提供合约"}), 400 - if not market_code or not sina_code: - codes = ths_to_codes(symbol) - if codes: - market_code = codes.get("market_code", "") or market_code - sina_code = codes.get("sina_code", "") or sina_code - price = market_get_price(market_code, sina_code) - name = symbol - codes = ths_to_codes(symbol) - if codes: - name = codes.get("name", symbol) - return jsonify({ - "symbol": symbol, - "name": name, - "price": price, - }) + return jsonify(build_market_quote_payload(symbol, market_code, sina_code)) @app.route("/contract") @@ -1491,5 +1554,4 @@ def settings(): # —————————————— 启动 —————————————— if __name__ == "__main__": - threading.Thread(target=background_task, daemon=True).start() app.run(host=HOST, port=PORT, debug=DEBUG) diff --git a/kline_chart.py b/kline_chart.py index 3fbfb2c..245f080 100644 --- a/kline_chart.py +++ b/kline_chart.py @@ -215,7 +215,12 @@ def bars_to_api(bars: list) -> list[dict]: return result -def fetch_market_klines(symbol: str, period: str, db_path: Optional[str] = None) -> dict: +def fetch_market_klines( + symbol: str, + period: str, + db_path: Optional[str] = None, + force_remote: bool = False, +) -> dict: chart_sym = ths_to_sina_chart_symbol(symbol) p = (period or "15m").lower() if p == "timeshare": @@ -227,7 +232,7 @@ def fetch_market_klines(symbol: str, period: str, db_path: Optional[str] = None) source = "remote" cached_at = None - if db_path and chart_sym: + if db_path and chart_sym and not force_remote: try: conn = sqlite3.connect(db_path) cached = get_cached_entry(conn, chart_sym, p) @@ -239,7 +244,7 @@ def fetch_market_klines(symbol: str, period: str, db_path: Optional[str] = None) except Exception as exc: logger.warning("kline cache read failed %s %s: %s", chart_sym, p, exc) - if not bars: + if force_remote or not bars: remote_bars = fetch_sina_klines(symbol, p) if remote_bars: bars = remote_bars @@ -257,7 +262,7 @@ def fetch_market_klines(symbol: str, period: str, db_path: Optional[str] = None) cached_at = meta[0] if meta else None except Exception as exc: logger.warning("kline cache write failed %s %s: %s", chart_sym, p, exc) - elif db_path and chart_sym: + elif not bars and db_path and chart_sym: try: conn = sqlite3.connect(db_path) cached = get_cached_entry(conn, chart_sym, p) diff --git a/kline_stream.py b/kline_stream.py new file mode 100644 index 0000000..28030d5 --- /dev/null +++ b/kline_stream.py @@ -0,0 +1,145 @@ +"""K 线 SSE 推送与后台刷新。""" +from __future__ import annotations + +import json +import logging +import queue +import threading +import time +from dataclasses import dataclass, field +from datetime import datetime +from typing import Callable, Optional +from zoneinfo import ZoneInfo + +from kline_chart import fetch_market_klines, ths_to_sina_chart_symbol +from kline_store import is_cache_fresh, load_meta, ensure_kline_tables + +logger = logging.getLogger(__name__) +TZ = ZoneInfo("Asia/Shanghai") + +FAST_PERIODS = frozenset({ + "timeshare", "1m", "2m", "5m", "15m", "1h", "2h", "4h", +}) + + +def is_trading_session() -> bool: + d = datetime.now(TZ) + wd = d.weekday() + if wd == 6: + return False + if wd == 5 and d.hour < 21: + return False + t = d.hour * 60 + d.minute() + def in_range(sh: int, sm: int, eh: int, em: int) -> bool: + return t >= sh * 60 + sm and t < eh * 60 + em + if in_range(9, 0, 11, 30): + return True + if in_range(13, 30, 15, 0): + return True + if in_range(21, 0, 24, 0): + return True + if in_range(0, 0, 2, 30): + return True + return False + + +def sse_format(event: str, data: dict) -> str: + return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n" + + +@dataclass +class KlineSubscription: + symbol: str + period: str + market_code: str = "" + sina_code: str = "" + queue: queue.Queue = field(default_factory=queue.Queue) + + +class KlineStreamHub: + def __init__(self): + self._lock = threading.Lock() + self._subs: list[KlineSubscription] = [] + + def subscribe( + self, + symbol: str, + period: str, + market_code: str = "", + sina_code: str = "", + ) -> KlineSubscription: + sub = KlineSubscription( + symbol=symbol.strip(), + period=(period or "15m").strip().lower(), + market_code=market_code.strip(), + sina_code=sina_code.strip(), + ) + with self._lock: + self._subs.append(sub) + return sub + + def unsubscribe(self, sub: KlineSubscription) -> None: + with self._lock: + try: + self._subs.remove(sub) + except ValueError: + pass + + def _snapshot_subs(self) -> list[KlineSubscription]: + with self._lock: + return list(self._subs) + + def publish(self, sub: KlineSubscription, event: str, data: dict) -> None: + try: + sub.queue.put_nowait({"event": event, "data": data}) + except queue.Full: + pass + + def _should_refresh(self, sub: KlineSubscription, db_path: str) -> bool: + chart_sym = ths_to_sina_chart_symbol(sub.symbol) + if not chart_sym: + return False + if is_trading_session() and sub.period in FAST_PERIODS: + return True + try: + import sqlite3 + conn = sqlite3.connect(db_path) + ensure_kline_tables(conn) + meta = load_meta(conn, chart_sym, sub.period) + conn.close() + if not meta: + return True + return not is_cache_fresh(sub.period, meta.get("updated_at", "")) + except Exception as exc: + logger.warning("kline refresh check failed: %s", exc) + return True + + def worker_loop(self, db_path: str, quote_fn: Callable[..., dict]) -> None: + while True: + try: + subs = self._snapshot_subs() + for sub in subs: + if not self._should_refresh(sub, db_path): + continue + try: + kline_data = fetch_market_klines( + sub.symbol, sub.period, db_path, force_remote=True, + ) + if kline_data.get("bars"): + self.publish(sub, "kline", kline_data) + quote_data = quote_fn( + sub.symbol, sub.market_code, sub.sina_code, + ) + if quote_data: + self.publish(sub, "quote", quote_data) + except Exception as exc: + logger.warning( + "kline stream refresh %s %s: %s", + sub.symbol, sub.period, exc, + ) + except Exception as exc: + logger.warning("kline stream worker: %s", exc) + time.sleep(1) + + +kline_hub = KlineStreamHub() diff --git a/static/js/market.js b/static/js/market.js index ccc2bf1..95e1fe2 100644 --- a/static/js/market.js +++ b/static/js/market.js @@ -4,12 +4,10 @@ var wrapEl = chartEl && chartEl.parentElement; var chart = null; var currentPeriod = '15m'; - var quoteTimer = null; - var klineTimer = null; + var klineSource = null; + var streamActive = false; + var reconnectTimer = null; var lastData = null; - var klineLoading = false; - - var FAST_PERIODS = ['timeshare', '1m', '2m', '5m', '15m', '1h', '2h', '4h']; function getSymbol() { var hidden = document.getElementById('market-symbol-hidden'); @@ -59,19 +57,6 @@ return false; } - function klinePollMs() { - if (!isTradingSession()) return 0; - if (currentPeriod === 'timeshare' || FAST_PERIODS.indexOf(currentPeriod) >= 0) { - return 1000; - } - if (currentPeriod === 'd' || currentPeriod === 'w') return 30000; - return 5000; - } - - function quotePollMs() { - return isTradingSession() ? 1000 : 10000; - } - function initChart() { if (!chartEl || !window.echarts) return; chart = echarts.init(chartEl); @@ -254,9 +239,7 @@ } function hideEmptyOverlay() { - if (emptyEl) { - emptyEl.style.display = ''; - } + if (emptyEl) emptyEl.style.display = ''; if (wrapEl) wrapEl.classList.add('has-data'); } @@ -272,66 +255,106 @@ var btn = document.getElementById('market-load-btn'); if (btn) { btn.disabled = on; - btn.textContent = on ? '加载中…' : '查看'; - } - if (on) { - showEmptyOverlay('加载中…'); - } else if (lastData) { - hideEmptyOverlay(); + btn.textContent = on ? '连接中…' : '查看'; } + if (on) showEmptyOverlay('连接中…'); + else if (lastData) hideEmptyOverlay(); } - function updateRefreshHint() { + function updateRefreshHint(disconnected) { var el = document.getElementById('market-refresh-hint'); if (!el) return; if (!getSymbol()) { el.textContent = ''; return; } + if (disconnected) { + el.textContent = 'SSE 连接中断,正在重连…'; + return; + } + if (!streamActive) { + el.textContent = ''; + return; + } + var src = lastData && lastData.source === 'local' ? ' · 本地缓存' : ''; if (isTradingSession()) { - var ms = klinePollMs(); - var src = lastData && lastData.source === 'local' ? ' · 本地缓存' : ''; - el.textContent = ms === 1000 - ? '交易中 · 1秒刷新' + src - : '交易中 · 自动刷新' + src; + el.textContent = '交易中 · 后台刷新 · SSE 推送(约1秒)' + src; } else { - el.textContent = '非交易时段 · 暂停高频刷新'; + el.textContent = 'SSE 推送 · 非交易时段低频刷新' + src; } } - function loadKline(silent) { + function applyQuote(data) { + var priceEl = document.getElementById('market-quote-price'); + var nameEl = document.getElementById('market-quote-name'); + if (nameEl && data.name) nameEl.textContent = data.name + ' ' + (data.symbol || ''); + if (priceEl) { + priceEl.textContent = data.price != null ? Number(data.price).toFixed(2) : '—'; + } + } + + function stopKlineStream() { + streamActive = false; + if (reconnectTimer) { + clearTimeout(reconnectTimer); + reconnectTimer = null; + } + if (klineSource) { + klineSource.close(); + klineSource = null; + } + } + + function scheduleReconnect() { + if (reconnectTimer) return; + updateRefreshHint(true); + reconnectTimer = setTimeout(function () { + reconnectTimer = null; + if (getSymbol()) startKlineStream(false); + }, 3000); + } + + function startKlineStream(showLoading) { + stopKlineStream(); var symbol = getSymbol(); if (!symbol) { - if (!silent) alert('请先选择或输入合约代码'); + alert('请先选择或输入合约代码'); return; } - if (klineLoading) return; - klineLoading = true; - if (!silent) setLoading(true); + if (showLoading) setLoading(true); - var url = '/api/kline?symbol=' + encodeURIComponent(symbol) + '&period=' + encodeURIComponent(currentPeriod); - fetch(url) - .then(function (r) { - return r.json().then(function (j) { return { ok: r.ok, data: j }; }); - }) - .then(function (res) { - if (!res.ok) throw new Error(res.data.error || '加载失败'); + var codes = getMarketCodes(); + var q = 'symbol=' + encodeURIComponent(symbol) + + '&period=' + encodeURIComponent(currentPeriod); + if (codes.market_code) q += '&market_code=' + encodeURIComponent(codes.market_code); + if (codes.sina_code) q += '&sina_code=' + encodeURIComponent(codes.sina_code); + + klineSource = new EventSource('/api/kline/stream?' + q); + streamActive = true; + updateRefreshHint(false); + + klineSource.addEventListener('kline', function (e) { + try { + var data = JSON.parse(e.data); + if (!data.bars || !data.bars.length) return; hideEmptyOverlay(); - renderChart(res.data, silent); - updateQuoteMeta(res.data); - updateRefreshHint(); - if (!quoteTimer) startQuotePoll(); - if (!klineTimer) startKlinePoll(); - }) - .catch(function (err) { - if (!silent) { - showEmptyOverlay(err.message || '加载失败'); - } - }) - .finally(function () { - klineLoading = false; - if (!silent) setLoading(false); - }); + renderChart(data, lastData !== null); + updateQuoteMeta(data); + updateRefreshHint(false); + setLoading(false); + } catch (err) { /* ignore */ } + }); + + klineSource.addEventListener('quote', function (e) { + try { + applyQuote(JSON.parse(e.data)); + } catch (err) { /* ignore */ } + }); + + klineSource.onerror = function () { + stopKlineStream(); + scheduleReconnect(); + }; } function updateQuoteMeta(data) { @@ -341,52 +364,13 @@ } var nameEl = document.getElementById('market-quote-name'); var hiddenName = document.getElementById('market-symbol-name'); - if (nameEl) { + if (nameEl && !(nameEl.textContent && nameEl.textContent.trim())) { nameEl.textContent = (hiddenName && hiddenName.value) || data.symbol || '—'; } } - function loadQuote() { - var codes = getMarketCodes(); - if (!codes.symbol) return; - var q = 'symbol=' + encodeURIComponent(codes.symbol); - if (codes.market_code) q += '&market_code=' + encodeURIComponent(codes.market_code); - if (codes.sina_code) q += '&sina_code=' + encodeURIComponent(codes.sina_code); - fetch('/api/market_quote?' + q) - .then(function (r) { return r.json(); }) - .then(function (data) { - var priceEl = document.getElementById('market-quote-price'); - var nameEl = document.getElementById('market-quote-name'); - if (nameEl && data.name) nameEl.textContent = data.name + ' ' + (data.symbol || ''); - if (priceEl) { - priceEl.textContent = data.price != null ? Number(data.price).toFixed(2) : '—'; - } - }) - .catch(function () { /* ignore */ }); - } - - function startQuotePoll() { - if (quoteTimer) clearInterval(quoteTimer); - loadQuote(); - var ms = quotePollMs(); - if (ms > 0) quoteTimer = setInterval(loadQuote, ms); - } - - function startKlinePoll() { - if (klineTimer) clearInterval(klineTimer); - var ms = klinePollMs(); - if (ms > 0 && getSymbol()) { - klineTimer = setInterval(function () { - loadKline(true); - updateRefreshHint(); - }, ms); - } - } - - function restartPollers() { - startQuotePoll(); - startKlinePoll(); - updateRefreshHint(); + function loadKline(showLoading) { + startKlineStream(showLoading); } function shiftDataZoom(delta) { @@ -421,8 +405,7 @@ tabs.querySelectorAll('.period-tab').forEach(function (el) { el.classList.remove('active'); }); btn.classList.add('active'); currentPeriod = btn.getAttribute('data-period') || '15m'; - restartPollers(); - if (getSymbol()) loadKline(false); + if (getSymbol()) loadKline(true); }); } @@ -444,19 +427,17 @@ if (active) currentPeriod = active.getAttribute('data-period') || '15m'; var loadBtn = document.getElementById('market-load-btn'); - if (loadBtn) loadBtn.addEventListener('click', function () { - restartPollers(); - loadKline(false); - }); + if (loadBtn) loadBtn.addEventListener('click', function () { loadKline(true); }); var hidden = document.getElementById('market-symbol-hidden'); var input = document.getElementById('market-symbol-input'); if (hidden && hidden.value) { if (input && !input.value) input.value = hidden.value; - restartPollers(); - loadKline(false); + loadKline(true); } else { - updateRefreshHint(); + updateRefreshHint(false); } + + window.addEventListener('beforeunload', stopKlineStream); }); })(); diff --git a/templates/market.html b/templates/market.html index 4ede7ef..cfefc56 100644 --- a/templates/market.html +++ b/templates/market.html @@ -38,7 +38,7 @@
请选择合约并点击「查看」
-

数据来源:新浪财经。支持滚轮/拖拽缩放 K 线;交易时段内行情与 K 线约 1 秒刷新。

+

数据来源:新浪财经。K 线由后台自动刷新并经 SSE 推送到前端;支持滚轮/拖拽缩放。