feat: 持仓监控后台 SSE 推送与浏览器缓存,刷新不再阻塞读柜台。

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-25 13:49:44 +08:00
parent f31164076f
commit bbcc5607ad
4 changed files with 304 additions and 97 deletions
+89 -18
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
import json
import logging
import threading
from datetime import datetime
from typing import Any, Callable, Optional
@@ -28,6 +29,7 @@ from recommend_store import (
refresh_recommend_cache,
)
from recommend_stream import recommend_hub, start_recommend_worker
from position_stream import position_hub, start_position_worker
from ctp_reconnect import start_ctp_reconnect_worker
from ctp_premarket_connect import start_ctp_premarket_connect_worker
from ctp_fee_worker import start_ctp_fee_worker
@@ -70,6 +72,7 @@ from ctp_symbol import ths_to_vnpy_symbol
from vnpy_bridge import (
ctp_connect,
ctp_get_account,
ctp_get_tick_price,
ctp_list_active_orders,
ctp_list_positions,
ctp_status,
@@ -290,8 +293,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
tp = float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None
open_time = (mon.get("open_time") or "") if mon else ""
holding = _holding_duration(open_time, now_iso) if open_time else ""
mark = None
if codes:
mark = ctp_get_tick_price(mode, sym)
if (mark is None or mark <= 0) and codes:
mark = fetch_price(
sym,
codes.get("market_code", ""),
@@ -382,6 +385,45 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
})
return rows
def _build_trading_live_payload(conn) -> dict:
mode = get_trading_mode(get_setting)
ctp_st = ctp_status(mode)
_sync_trade_monitors_with_ctp(conn, mode)
rows = _build_trading_live_rows(conn)
pending_orders = _build_pending_orders(conn, mode)
capital = _capital(conn)
risk = get_risk_status(conn, active_count=_effective_active_position_count(conn, mode))
return {
"ok": True,
"rows": rows,
"pending_orders": pending_orders,
"capital": capital,
"ctp_status": ctp_st,
"trading_mode_label": trading_mode_label(get_setting),
"risk_status": risk,
"trading_session": is_trading_session(),
}
def _refresh_trading_live_snapshot() -> dict:
conn = get_db()
try:
init_strategy_tables(conn)
payload = _build_trading_live_payload(conn)
conn.commit()
return payload
finally:
conn.close()
def _push_position_snapshot_async() -> None:
def _run() -> None:
try:
payload = _refresh_trading_live_snapshot()
position_hub.broadcast("positions", payload)
except Exception as exc:
logger.debug("push position snapshot: %s", exc)
threading.Thread(target=_run, daemon=True).start()
@app.route("/trade")
@login_required
def trade_page():
@@ -466,29 +508,52 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
@app.route("/api/trading/live")
@login_required
def api_trading_live():
cached = position_hub.get_snapshot()
if cached:
return jsonify(cached)
conn = get_db()
try:
init_strategy_tables(conn)
mode = get_trading_mode(get_setting)
ctp_st = ctp_status(mode)
_sync_trade_monitors_with_ctp(conn, mode)
rows = _build_trading_live_rows(conn)
pending_orders = _build_pending_orders(conn, mode)
capital = _capital(conn)
risk = get_risk_status(conn, active_count=_effective_active_position_count(conn, mode))
payload = _build_trading_live_payload(conn)
conn.commit()
return jsonify({
"rows": rows,
"pending_orders": pending_orders,
"capital": capital,
"ctp_status": ctp_st,
"trading_mode_label": trading_mode_label(get_setting),
"risk_status": risk,
"trading_session": is_trading_session(),
})
position_hub.set_snapshot(payload)
return jsonify(payload)
finally:
conn.close()
@app.route("/api/trading/stream")
@login_required
def api_trading_stream():
from queue import Empty
def generate():
q = position_hub.subscribe()
try:
snap = position_hub.get_snapshot()
if snap:
yield sse_format("positions", snap)
else:
payload = _refresh_trading_live_snapshot()
position_hub.set_snapshot(payload)
yield sse_format("positions", payload)
while True:
try:
msg = q.get(timeout=25)
yield sse_format(msg["event"], msg["data"])
except Empty:
yield ": heartbeat\n\n"
finally:
position_hub.unsubscribe(q)
return Response(
generate(),
mimetype="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
@app.route("/api/trading/monitor/upsert", methods=["POST"])
@login_required
def api_trading_monitor_upsert():
@@ -749,6 +814,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
)
conn.commit()
conn.close()
_push_position_snapshot_async()
return jsonify({"ok": True, "message": "已平仓并记入交易记录(手动平仓)"})
except ValueError as exc:
conn.close()
@@ -1018,6 +1084,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
conn.commit()
send_wechat_msg(f"{trading_mode_label(get_setting)} {offset} {sym} {direction} {lots}手 @{price}")
conn.close()
_push_position_snapshot_async()
return jsonify({"ok": True, "result": result, "lots": lots, "message": "委托已提交柜台,限价单需成交后才会显示持仓"})
except (ValueError, RuntimeError) as exc:
conn.close()
@@ -1501,6 +1568,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
notify_fn=send_wechat_msg,
interval=1,
)
start_position_worker(
refresh_fn=_refresh_trading_live_snapshot,
interval=1,
)
start_ctp_fee_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
+101
View File
@@ -0,0 +1,101 @@
"""持仓监控:后台拉取 CTP 并 SSE 推送给前端(避免每次刷新阻塞读柜台)。"""
from __future__ import annotations
import logging
import queue
import threading
import time
from typing import Callable, Optional
from kline_stream import sse_format
from market_sessions import is_trading_session
logger = logging.getLogger(__name__)
PUSH_INTERVAL_SEC = 1
IDLE_INTERVAL_SEC = 5
class PositionStreamHub:
def __init__(self) -> None:
self._lock = threading.Lock()
self._subs: list[queue.Queue] = []
self._snapshot: Optional[dict] = None
self._snapshot_ts: float = 0.0
def subscribe(self) -> queue.Queue:
q: queue.Queue = queue.Queue(maxsize=16)
with self._lock:
self._subs.append(q)
return q
def unsubscribe(self, q: queue.Queue) -> None:
with self._lock:
try:
self._subs.remove(q)
except ValueError:
pass
def get_snapshot(self) -> Optional[dict]:
with self._lock:
return dict(self._snapshot) if self._snapshot else None
def set_snapshot(self, data: dict) -> None:
with self._lock:
self._snapshot = dict(data)
self._snapshot_ts = time.time()
def broadcast(self, event: str, data: dict) -> None:
self.set_snapshot(data)
msg = {"event": event, "data": data}
with self._lock:
subs = list(self._subs)
for q in subs:
try:
q.put_nowait(msg)
except queue.Full:
try:
q.get_nowait()
except queue.Empty:
pass
try:
q.put_nowait(msg)
except queue.Full:
pass
position_hub = PositionStreamHub()
def start_position_worker(
*,
refresh_fn: Callable[[], dict],
interval: int = PUSH_INTERVAL_SEC,
idle_interval: int = IDLE_INTERVAL_SEC,
) -> None:
"""后台定时刷新持仓快照并 SSE 广播。"""
def _loop() -> None:
time.sleep(3)
while True:
sleep_sec = idle_interval
try:
payload = refresh_fn()
if payload:
position_hub.broadcast("positions", payload)
connected = bool((payload or {}).get("ctp_status") or {}).get("connected")
in_session = bool((payload or {}).get("trading_session"))
rows = (payload or {}).get("rows") or []
has_sl_tp = any(
r.get("stop_loss") is not None or r.get("take_profit") is not None
for r in rows
)
if connected and in_session and (rows or has_sl_tp):
sleep_sec = max(1, interval)
elif connected:
sleep_sec = max(2, min(idle_interval, 3))
except Exception as exc:
logger.warning("position worker failed: %s", exc)
time.sleep(sleep_sec)
threading.Thread(target=_loop, daemon=True, name="position-stream").start()
+112 -77
View File
@@ -11,8 +11,8 @@
var tpInput = document.getElementById('trade-tp');
var marketHint = document.getElementById('market-hint');
var metricsHint = document.getElementById('trade-metrics-hint');
var pollTimer = null;
var recommendSource = null;
var positionSource = null;
var quoteTimer = null;
var calcTimer = null;
var lastQuotePrice = null;
@@ -22,10 +22,11 @@
var isTradingSession = false;
var hasSlTpMonitoring = false;
var ctpConnected = false;
var pollIntervalMs = 0;
var positionsRendered = false;
var selectedMaxLots = null;
var recommendMaxByProduct = {};
var recommendMaxByCode = {};
var POS_CACHE_KEY = 'qihuo_trading_live_v1';
function runWhenReady(fn) {
if (document.readyState === 'loading') {
@@ -98,22 +99,86 @@
}
}
function loadPosCache() {
try {
var raw = sessionStorage.getItem(POS_CACHE_KEY);
if (!raw) return null;
return JSON.parse(raw);
} catch (e) {
return null;
}
}
function savePosCache(data) {
try {
sessionStorage.setItem(POS_CACHE_KEY, JSON.stringify(data));
} catch (e) { /* quota */ }
}
function applyPositionsData(data) {
if (!list || !data) return;
var cap = document.getElementById('cap-display');
if (cap && data.capital != null) cap.textContent = Number(data.capital).toFixed(2);
var connected = data.ctp_status && data.ctp_status.connected;
var connecting = data.ctp_status && data.ctp_status.connecting;
ctpConnected = !!connected;
isTradingSession = !!data.trading_session;
updateCtpBadge(!!connected, !!connecting);
var riskBadge = document.getElementById('risk-badge');
if (riskBadge && data.risk_status) {
riskBadge.textContent = data.risk_status.status_label || '';
riskBadge.className = 'badge ' + (data.risk_status.can_trade ? 'profit' : 'loss');
}
var rows = data.rows || [];
hasSlTpMonitoring = rows.some(function (row) {
return row.stop_loss != null || row.take_profit != null;
});
updateSessionUi();
savePosCache(data);
positionsRendered = true;
if (!connected) {
if (connecting) {
list.innerHTML = '<div class="empty-hint">CTP 连接中,请稍候…</div>';
return;
}
list.innerHTML = '<div class="empty-hint">CTP 未连接,正在尝试自动重连…</div>';
tryAutoCtpReconnect();
return;
}
if (!rows.length) {
var pendingOnly = data.pending_orders || [];
if (pendingOnly.length) {
list.innerHTML = '<div class="empty-hint" style="margin-bottom:.75rem">柜台暂无持仓</div>' +
pendingOnly.map(function (p) {
var dismissBtn = p.monitor_id ?
'<button type="button" class="pos-dismiss-btn" data-monitor-id="' + p.monitor_id + '">取消</button>' : '';
return (
'<div class="pos-pending-item ' +
(p.order_kind === 'stop_loss' ? 'sl' : (p.order_kind === 'take_profit' ? 'tp' : 'ctp')) +
'"><span>' + (p.label || '挂单') + ' · ' + (p.symbol || p.symbol_code) + '</span>' +
'<span class="pos-pending-right"><strong>' + fmtNum(p.price) + '</strong> · ' +
(p.lots || 1) + ' 手' + dismissBtn + '</span></div>'
);
}).join('');
bindPendingDismiss(list);
} else {
list.innerHTML = '<div class="empty-hint">柜台暂无持仓。</div>';
}
return;
}
list.innerHTML = rows.map(buildPosCard).join('');
bindPendingDismiss(list);
bindSlTpButtons(list);
bindPlaceOrderButtons(list);
list.querySelectorAll('[data-close]').forEach(function (btn) {
btn.addEventListener('click', function () {
closePosition(JSON.parse(decodeURIComponent(btn.getAttribute('data-close'))), btn);
});
});
}
function schedulePositionPoll() {
var nextMs = 0;
if (hasSlTpMonitoring && isTradingSession) {
nextMs = 1000;
} else if (!ctpConnected) {
nextMs = 5000;
}
if (nextMs === pollIntervalMs && pollTimer) return;
pollIntervalMs = nextMs;
if (pollTimer) {
clearInterval(pollTimer);
pollTimer = null;
}
if (nextMs > 0) {
pollTimer = setInterval(pollPositions, nextMs);
}
/* 持仓改由后台 SSE 推送,保留空函数兼容旧调用 */
}
function updateSessionUi() {
@@ -654,71 +719,35 @@
return r.json();
})
.then(function (data) {
var cap = document.getElementById('cap-display');
if (cap && data.capital != null) cap.textContent = Number(data.capital).toFixed(2);
var connected = data.ctp_status && data.ctp_status.connected;
var connecting = data.ctp_status && data.ctp_status.connecting;
ctpConnected = !!connected;
isTradingSession = !!data.trading_session;
updateCtpBadge(!!connected, !!connecting);
var riskBadge = document.getElementById('risk-badge');
if (riskBadge && data.risk_status) {
riskBadge.textContent = data.risk_status.status_label || '';
riskBadge.className = 'badge ' + (data.risk_status.can_trade ? 'profit' : 'loss');
}
var rows = data.rows || [];
hasSlTpMonitoring = rows.some(function (row) {
return row.stop_loss != null || row.take_profit != null;
});
schedulePositionPoll();
updateSessionUi();
if (!connected) {
if (connecting) {
list.innerHTML = '<div class="empty-hint">CTP 连接中,请稍候…</div>';
return;
}
list.innerHTML = '<div class="empty-hint">CTP 未连接,正在尝试自动重连…</div>';
tryAutoCtpReconnect();
return;
}
if (!rows.length) {
var pendingOnly = data.pending_orders || [];
if (pendingOnly.length) {
list.innerHTML = '<div class="empty-hint" style="margin-bottom:.75rem">柜台暂无持仓</div>' +
pendingOnly.map(function (p) {
var dismissBtn = p.monitor_id ?
'<button type="button" class="pos-dismiss-btn" data-monitor-id="' + p.monitor_id + '">取消</button>' : '';
return (
'<div class="pos-pending-item ' +
(p.order_kind === 'stop_loss' ? 'sl' : (p.order_kind === 'take_profit' ? 'tp' : 'ctp')) +
'"><span>' + (p.label || '挂单') + ' · ' + (p.symbol || p.symbol_code) + '</span>' +
'<span class="pos-pending-right"><strong>' + fmtNum(p.price) + '</strong> · ' +
(p.lots || 1) + ' 手' + dismissBtn + '</span></div>'
);
}).join('');
bindPendingDismiss(list);
} else {
list.innerHTML = '<div class="empty-hint">柜台暂无持仓。</div>';
}
return;
}
list.innerHTML = rows.map(buildPosCard).join('');
bindPendingDismiss(list);
bindSlTpButtons(list);
bindPlaceOrderButtons(list);
list.querySelectorAll('[data-close]').forEach(function (btn) {
btn.addEventListener('click', function () {
closePosition(JSON.parse(decodeURIComponent(btn.getAttribute('data-close'))), btn);
});
});
applyPositionsData(data);
})
.catch(function () {
if (list.innerHTML.indexOf('pos-card') < 0) {
if (!positionsRendered && list.innerHTML.indexOf('pos-card') < 0) {
list.innerHTML = '<div class="empty-hint text-loss">持仓加载失败</div>';
}
});
}
function connectPositionStream() {
if (positionSource) {
positionSource.close();
positionSource = null;
}
positionSource = new EventSource('/api/trading/stream');
positionSource.addEventListener('positions', function (ev) {
try {
applyPositionsData(JSON.parse(ev.data));
} catch (e) { /* ignore */ }
});
positionSource.onerror = function () {
if (positionSource) {
positionSource.close();
positionSource = null;
}
setTimeout(connectPositionStream, 3000);
};
}
function renderRecommendations(data) {
if (!recommendList || !data) return;
updateRecommendMaxMaps(data);
@@ -811,14 +840,20 @@
runWhenReady(function () {
setPriceType('limit');
pollPositions();
var cached = loadPosCache();
if (cached) {
applyPositionsData(cached);
}
connectPositionStream();
connectRecommendStream();
fetch('/api/recommend/list')
.then(function (r) { return r.json(); })
.then(function (data) { if (data.ok) renderRecommendations(data); })
.catch(function () {});
document.addEventListener('visibilitychange', function () {
if (document.visibilityState === 'visible') pollPositions();
if (document.visibilityState === 'visible' && !positionSource) {
connectPositionStream();
}
});
updateSessionUi();
scheduleQuote();
+2 -2
View File
@@ -103,9 +103,9 @@
<div class="card trade-card" id="positions">
<h2>持仓监控</h2>
<p class="hint pos-hint">数据来自 CTP 柜台;设止盈/止损后程序在开盘期间每秒监控,触发即市价平仓并记入交易记录</p>
<p class="hint pos-hint">后台每秒拉取 CTP 并推送;刷新页面会使用浏览器缓存,不再阻塞读柜台</p>
<div class="card-body card-scroll" id="position-live-list">
<div class="empty-hint">{% if ctp_status.connected %}加载中…{% else %}请先连接 CTP 查看柜台持仓{% endif %}</div>
<div class="empty-hint" id="position-placeholder">{% if ctp_status.connected %}等待持仓推送…{% else %}请先连接 CTP 查看柜台持仓{% endif %}</div>
</div>
</div>
</div>