K线后台自动刷新并通过SSE推送到前端,移除轮询

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-15 17:33:31 +08:00
parent b804bd19a7
commit 65992eb35e
5 changed files with 327 additions and 134 deletions
+79 -17
View File
@@ -13,7 +13,7 @@ from werkzeug.utils import secure_filename
from dotenv import load_dotenv
from flask import (
Flask, render_template, request, redirect, url_for,
flash, session, jsonify,
flash, session, jsonify, Response, stream_with_context,
)
from werkzeug.security import check_password_hash, generate_password_hash
@@ -31,6 +31,7 @@ from fee_sync import sync_fees_from_akshare
from contract_profile import get_contract_profile
from stats_engine import STATS_VIEWS, load_stats_cache, refresh_stats_cache
from kline_store import ensure_kline_tables
from kline_stream import kline_hub, sse_format
from kline_chart import generate_review_kline_chart, fetch_market_klines, MARKET_PERIODS
from market import get_price as market_get_price, set_ths_refresh_token, get_quote_source_label
@@ -364,6 +365,29 @@ def sync_ths_token():
sync_ths_token()
def build_market_quote_payload(
symbol: str,
market_code: str = "",
sina_code: str = "",
) -> dict:
if not market_code or not sina_code:
codes = ths_to_codes(symbol)
if codes:
market_code = codes.get("market_code", "") or market_code
sina_code = codes.get("sina_code", "") or sina_code
price = market_get_price(market_code, sina_code)
name = symbol
codes = ths_to_codes(symbol)
if codes:
name = codes.get("name", symbol)
return {
"symbol": symbol,
"name": name,
"price": price,
}
# —————————————— 推送 ——————————————
def send_wechat_msg(content: str):
@@ -542,6 +566,17 @@ def background_task():
pass
time.sleep(3)
def start_background_threads():
threading.Thread(target=background_task, daemon=True).start()
threading.Thread(
target=lambda: kline_hub.worker_loop(DB_PATH, build_market_quote_payload),
daemon=True,
).start()
start_background_threads()
# —————————————— 登录 ——————————————
def login_required(f):
@@ -1334,6 +1369,48 @@ def api_kline():
return jsonify(data)
@app.route("/api/kline/stream")
@login_required
def api_kline_stream():
from queue import Empty
symbol = request.args.get("symbol", "").strip()
period = request.args.get("period", "15m").strip()
market_code = request.args.get("market_code", "").strip()
sina_code = request.args.get("sina_code", "").strip()
if not symbol:
return jsonify({"error": "请提供合约代码"}), 400
def generate():
sub = kline_hub.subscribe(symbol, period, market_code, sina_code)
try:
kline_data = fetch_market_klines(symbol, period, DB_PATH)
if kline_data.get("bars"):
yield sse_format("kline", kline_data)
yield sse_format(
"quote",
build_market_quote_payload(symbol, market_code, sina_code),
)
while True:
try:
msg = sub.queue.get(timeout=20)
yield sse_format(msg["event"], msg["data"])
except Empty:
yield ": heartbeat\n\n"
finally:
kline_hub.unsubscribe(sub)
return Response(
stream_with_context(generate()),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
@app.route("/api/market_quote")
@login_required
def api_market_quote():
@@ -1342,21 +1419,7 @@ def api_market_quote():
sina_code = request.args.get("sina_code", "").strip()
if not symbol and not market_code:
return jsonify({"error": "请提供合约"}), 400
if not market_code or not sina_code:
codes = ths_to_codes(symbol)
if codes:
market_code = codes.get("market_code", "") or market_code
sina_code = codes.get("sina_code", "") or sina_code
price = market_get_price(market_code, sina_code)
name = symbol
codes = ths_to_codes(symbol)
if codes:
name = codes.get("name", symbol)
return jsonify({
"symbol": symbol,
"name": name,
"price": price,
})
return jsonify(build_market_quote_payload(symbol, market_code, sina_code))
@app.route("/contract")
@@ -1491,5 +1554,4 @@ def settings():
# —————————————— 启动 ——————————————
if __name__ == "__main__":
threading.Thread(target=background_task, daemon=True).start()
app.run(host=HOST, port=PORT, debug=DEBUG)
+9 -4
View File
@@ -215,7 +215,12 @@ def bars_to_api(bars: list) -> list[dict]:
return result
def fetch_market_klines(symbol: str, period: str, db_path: Optional[str] = None) -> dict:
def fetch_market_klines(
symbol: str,
period: str,
db_path: Optional[str] = None,
force_remote: bool = False,
) -> dict:
chart_sym = ths_to_sina_chart_symbol(symbol)
p = (period or "15m").lower()
if p == "timeshare":
@@ -227,7 +232,7 @@ def fetch_market_klines(symbol: str, period: str, db_path: Optional[str] = None)
source = "remote"
cached_at = None
if db_path and chart_sym:
if db_path and chart_sym and not force_remote:
try:
conn = sqlite3.connect(db_path)
cached = get_cached_entry(conn, chart_sym, p)
@@ -239,7 +244,7 @@ def fetch_market_klines(symbol: str, period: str, db_path: Optional[str] = None)
except Exception as exc:
logger.warning("kline cache read failed %s %s: %s", chart_sym, p, exc)
if not bars:
if force_remote or not bars:
remote_bars = fetch_sina_klines(symbol, p)
if remote_bars:
bars = remote_bars
@@ -257,7 +262,7 @@ def fetch_market_klines(symbol: str, period: str, db_path: Optional[str] = None)
cached_at = meta[0] if meta else None
except Exception as exc:
logger.warning("kline cache write failed %s %s: %s", chart_sym, p, exc)
elif db_path and chart_sym:
elif not bars and db_path and chart_sym:
try:
conn = sqlite3.connect(db_path)
cached = get_cached_entry(conn, chart_sym, p)
+145
View File
@@ -0,0 +1,145 @@
"""K 线 SSE 推送与后台刷新。"""
from __future__ import annotations
import json
import logging
import queue
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from typing import Callable, Optional
from zoneinfo import ZoneInfo
from kline_chart import fetch_market_klines, ths_to_sina_chart_symbol
from kline_store import is_cache_fresh, load_meta, ensure_kline_tables
logger = logging.getLogger(__name__)
TZ = ZoneInfo("Asia/Shanghai")
FAST_PERIODS = frozenset({
"timeshare", "1m", "2m", "5m", "15m", "1h", "2h", "4h",
})
def is_trading_session() -> bool:
d = datetime.now(TZ)
wd = d.weekday()
if wd == 6:
return False
if wd == 5 and d.hour < 21:
return False
t = d.hour * 60 + d.minute()
def in_range(sh: int, sm: int, eh: int, em: int) -> bool:
return t >= sh * 60 + sm and t < eh * 60 + em
if in_range(9, 0, 11, 30):
return True
if in_range(13, 30, 15, 0):
return True
if in_range(21, 0, 24, 0):
return True
if in_range(0, 0, 2, 30):
return True
return False
def sse_format(event: str, data: dict) -> str:
return f"event: {event}\ndata: {json.dumps(data, ensure_ascii=False)}\n\n"
@dataclass
class KlineSubscription:
symbol: str
period: str
market_code: str = ""
sina_code: str = ""
queue: queue.Queue = field(default_factory=queue.Queue)
class KlineStreamHub:
def __init__(self):
self._lock = threading.Lock()
self._subs: list[KlineSubscription] = []
def subscribe(
self,
symbol: str,
period: str,
market_code: str = "",
sina_code: str = "",
) -> KlineSubscription:
sub = KlineSubscription(
symbol=symbol.strip(),
period=(period or "15m").strip().lower(),
market_code=market_code.strip(),
sina_code=sina_code.strip(),
)
with self._lock:
self._subs.append(sub)
return sub
def unsubscribe(self, sub: KlineSubscription) -> None:
with self._lock:
try:
self._subs.remove(sub)
except ValueError:
pass
def _snapshot_subs(self) -> list[KlineSubscription]:
with self._lock:
return list(self._subs)
def publish(self, sub: KlineSubscription, event: str, data: dict) -> None:
try:
sub.queue.put_nowait({"event": event, "data": data})
except queue.Full:
pass
def _should_refresh(self, sub: KlineSubscription, db_path: str) -> bool:
chart_sym = ths_to_sina_chart_symbol(sub.symbol)
if not chart_sym:
return False
if is_trading_session() and sub.period in FAST_PERIODS:
return True
try:
import sqlite3
conn = sqlite3.connect(db_path)
ensure_kline_tables(conn)
meta = load_meta(conn, chart_sym, sub.period)
conn.close()
if not meta:
return True
return not is_cache_fresh(sub.period, meta.get("updated_at", ""))
except Exception as exc:
logger.warning("kline refresh check failed: %s", exc)
return True
def worker_loop(self, db_path: str, quote_fn: Callable[..., dict]) -> None:
while True:
try:
subs = self._snapshot_subs()
for sub in subs:
if not self._should_refresh(sub, db_path):
continue
try:
kline_data = fetch_market_klines(
sub.symbol, sub.period, db_path, force_remote=True,
)
if kline_data.get("bars"):
self.publish(sub, "kline", kline_data)
quote_data = quote_fn(
sub.symbol, sub.market_code, sub.sina_code,
)
if quote_data:
self.publish(sub, "quote", quote_data)
except Exception as exc:
logger.warning(
"kline stream refresh %s %s: %s",
sub.symbol, sub.period, exc,
)
except Exception as exc:
logger.warning("kline stream worker: %s", exc)
time.sleep(1)
kline_hub = KlineStreamHub()
+93 -112
View File
@@ -4,12 +4,10 @@
var wrapEl = chartEl && chartEl.parentElement;
var chart = null;
var currentPeriod = '15m';
var quoteTimer = null;
var klineTimer = null;
var klineSource = null;
var streamActive = false;
var reconnectTimer = null;
var lastData = null;
var klineLoading = false;
var FAST_PERIODS = ['timeshare', '1m', '2m', '5m', '15m', '1h', '2h', '4h'];
function getSymbol() {
var hidden = document.getElementById('market-symbol-hidden');
@@ -59,19 +57,6 @@
return false;
}
function klinePollMs() {
if (!isTradingSession()) return 0;
if (currentPeriod === 'timeshare' || FAST_PERIODS.indexOf(currentPeriod) >= 0) {
return 1000;
}
if (currentPeriod === 'd' || currentPeriod === 'w') return 30000;
return 5000;
}
function quotePollMs() {
return isTradingSession() ? 1000 : 10000;
}
function initChart() {
if (!chartEl || !window.echarts) return;
chart = echarts.init(chartEl);
@@ -254,9 +239,7 @@
}
function hideEmptyOverlay() {
if (emptyEl) {
emptyEl.style.display = '';
}
if (emptyEl) emptyEl.style.display = '';
if (wrapEl) wrapEl.classList.add('has-data');
}
@@ -272,66 +255,106 @@
var btn = document.getElementById('market-load-btn');
if (btn) {
btn.disabled = on;
btn.textContent = on ? '加载中…' : '查看';
}
if (on) {
showEmptyOverlay('加载中…');
} else if (lastData) {
hideEmptyOverlay();
btn.textContent = on ? '连接中…' : '查看';
}
if (on) showEmptyOverlay('连接中…');
else if (lastData) hideEmptyOverlay();
}
function updateRefreshHint() {
function updateRefreshHint(disconnected) {
var el = document.getElementById('market-refresh-hint');
if (!el) return;
if (!getSymbol()) {
el.textContent = '';
return;
}
if (disconnected) {
el.textContent = 'SSE 连接中断,正在重连…';
return;
}
if (!streamActive) {
el.textContent = '';
return;
}
var src = lastData && lastData.source === 'local' ? ' · 本地缓存' : '';
if (isTradingSession()) {
var ms = klinePollMs();
var src = lastData && lastData.source === 'local' ? ' · 本地缓存' : '';
el.textContent = ms === 1000
? '交易中 · 1秒刷新' + src
: '交易中 · 自动刷新' + src;
el.textContent = '交易中 · 后台刷新 · SSE 推送(约1秒)' + src;
} else {
el.textContent = '非交易时段 · 暂停高频刷新';
el.textContent = 'SSE 推送 · 非交易时段低频刷新' + src;
}
}
function loadKline(silent) {
function applyQuote(data) {
var priceEl = document.getElementById('market-quote-price');
var nameEl = document.getElementById('market-quote-name');
if (nameEl && data.name) nameEl.textContent = data.name + ' ' + (data.symbol || '');
if (priceEl) {
priceEl.textContent = data.price != null ? Number(data.price).toFixed(2) : '—';
}
}
function stopKlineStream() {
streamActive = false;
if (reconnectTimer) {
clearTimeout(reconnectTimer);
reconnectTimer = null;
}
if (klineSource) {
klineSource.close();
klineSource = null;
}
}
function scheduleReconnect() {
if (reconnectTimer) return;
updateRefreshHint(true);
reconnectTimer = setTimeout(function () {
reconnectTimer = null;
if (getSymbol()) startKlineStream(false);
}, 3000);
}
function startKlineStream(showLoading) {
stopKlineStream();
var symbol = getSymbol();
if (!symbol) {
if (!silent) alert('请先选择或输入合约代码');
alert('请先选择或输入合约代码');
return;
}
if (klineLoading) return;
klineLoading = true;
if (!silent) setLoading(true);
if (showLoading) setLoading(true);
var url = '/api/kline?symbol=' + encodeURIComponent(symbol) + '&period=' + encodeURIComponent(currentPeriod);
fetch(url)
.then(function (r) {
return r.json().then(function (j) { return { ok: r.ok, data: j }; });
})
.then(function (res) {
if (!res.ok) throw new Error(res.data.error || '加载失败');
var codes = getMarketCodes();
var q = 'symbol=' + encodeURIComponent(symbol) +
'&period=' + encodeURIComponent(currentPeriod);
if (codes.market_code) q += '&market_code=' + encodeURIComponent(codes.market_code);
if (codes.sina_code) q += '&sina_code=' + encodeURIComponent(codes.sina_code);
klineSource = new EventSource('/api/kline/stream?' + q);
streamActive = true;
updateRefreshHint(false);
klineSource.addEventListener('kline', function (e) {
try {
var data = JSON.parse(e.data);
if (!data.bars || !data.bars.length) return;
hideEmptyOverlay();
renderChart(res.data, silent);
updateQuoteMeta(res.data);
updateRefreshHint();
if (!quoteTimer) startQuotePoll();
if (!klineTimer) startKlinePoll();
})
.catch(function (err) {
if (!silent) {
showEmptyOverlay(err.message || '加载失败');
}
})
.finally(function () {
klineLoading = false;
if (!silent) setLoading(false);
});
renderChart(data, lastData !== null);
updateQuoteMeta(data);
updateRefreshHint(false);
setLoading(false);
} catch (err) { /* ignore */ }
});
klineSource.addEventListener('quote', function (e) {
try {
applyQuote(JSON.parse(e.data));
} catch (err) { /* ignore */ }
});
klineSource.onerror = function () {
stopKlineStream();
scheduleReconnect();
};
}
function updateQuoteMeta(data) {
@@ -341,52 +364,13 @@
}
var nameEl = document.getElementById('market-quote-name');
var hiddenName = document.getElementById('market-symbol-name');
if (nameEl) {
if (nameEl && !(nameEl.textContent && nameEl.textContent.trim())) {
nameEl.textContent = (hiddenName && hiddenName.value) || data.symbol || '—';
}
}
function loadQuote() {
var codes = getMarketCodes();
if (!codes.symbol) return;
var q = 'symbol=' + encodeURIComponent(codes.symbol);
if (codes.market_code) q += '&market_code=' + encodeURIComponent(codes.market_code);
if (codes.sina_code) q += '&sina_code=' + encodeURIComponent(codes.sina_code);
fetch('/api/market_quote?' + q)
.then(function (r) { return r.json(); })
.then(function (data) {
var priceEl = document.getElementById('market-quote-price');
var nameEl = document.getElementById('market-quote-name');
if (nameEl && data.name) nameEl.textContent = data.name + ' ' + (data.symbol || '');
if (priceEl) {
priceEl.textContent = data.price != null ? Number(data.price).toFixed(2) : '—';
}
})
.catch(function () { /* ignore */ });
}
function startQuotePoll() {
if (quoteTimer) clearInterval(quoteTimer);
loadQuote();
var ms = quotePollMs();
if (ms > 0) quoteTimer = setInterval(loadQuote, ms);
}
function startKlinePoll() {
if (klineTimer) clearInterval(klineTimer);
var ms = klinePollMs();
if (ms > 0 && getSymbol()) {
klineTimer = setInterval(function () {
loadKline(true);
updateRefreshHint();
}, ms);
}
}
function restartPollers() {
startQuotePoll();
startKlinePoll();
updateRefreshHint();
function loadKline(showLoading) {
startKlineStream(showLoading);
}
function shiftDataZoom(delta) {
@@ -421,8 +405,7 @@
tabs.querySelectorAll('.period-tab').forEach(function (el) { el.classList.remove('active'); });
btn.classList.add('active');
currentPeriod = btn.getAttribute('data-period') || '15m';
restartPollers();
if (getSymbol()) loadKline(false);
if (getSymbol()) loadKline(true);
});
}
@@ -444,19 +427,17 @@
if (active) currentPeriod = active.getAttribute('data-period') || '15m';
var loadBtn = document.getElementById('market-load-btn');
if (loadBtn) loadBtn.addEventListener('click', function () {
restartPollers();
loadKline(false);
});
if (loadBtn) loadBtn.addEventListener('click', function () { loadKline(true); });
var hidden = document.getElementById('market-symbol-hidden');
var input = document.getElementById('market-symbol-input');
if (hidden && hidden.value) {
if (input && !input.value) input.value = hidden.value;
restartPollers();
loadKline(false);
loadKline(true);
} else {
updateRefreshHint();
updateRefreshHint(false);
}
window.addEventListener('beforeunload', stopKlineStream);
});
})();
+1 -1
View File
@@ -38,7 +38,7 @@
<div id="market-chart" class="market-chart" aria-label="K线图"></div>
<div class="market-chart-empty" id="market-chart-empty">请选择合约并点击「查看」</div>
</div>
<p class="hint">数据来源:新浪财经。支持滚轮/拖拽缩放 K 线;交易时段内行情与 K 线约 1 秒刷新</p>
<p class="hint">数据来源:新浪财经。K 线由后台自动刷新并经 SSE 推送到前端;支持滚轮/拖拽缩放。</p>
</div>
<style>