diff --git a/install_trading.py b/install_trading.py index 830a009..49aca31 100644 --- a/install_trading.py +++ b/install_trading.py @@ -3,6 +3,7 @@ from __future__ import annotations import json import logging +import threading from datetime import datetime from typing import Any, Callable, Optional @@ -28,6 +29,7 @@ from recommend_store import ( refresh_recommend_cache, ) from recommend_stream import recommend_hub, start_recommend_worker +from position_stream import position_hub, start_position_worker from ctp_reconnect import start_ctp_reconnect_worker from ctp_premarket_connect import start_ctp_premarket_connect_worker from ctp_fee_worker import start_ctp_fee_worker @@ -70,6 +72,7 @@ from ctp_symbol import ths_to_vnpy_symbol from vnpy_bridge import ( ctp_connect, ctp_get_account, + ctp_get_tick_price, ctp_list_active_orders, ctp_list_positions, ctp_status, @@ -290,8 +293,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se tp = float(mon["take_profit"]) if mon and mon.get("take_profit") is not None else None open_time = (mon.get("open_time") or "") if mon else "" holding = _holding_duration(open_time, now_iso) if open_time else "" - mark = None - if codes: + mark = ctp_get_tick_price(mode, sym) + if (mark is None or mark <= 0) and codes: mark = fetch_price( sym, codes.get("market_code", ""), @@ -382,6 +385,45 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se }) return rows + def _build_trading_live_payload(conn) -> dict: + mode = get_trading_mode(get_setting) + ctp_st = ctp_status(mode) + _sync_trade_monitors_with_ctp(conn, mode) + rows = _build_trading_live_rows(conn) + pending_orders = _build_pending_orders(conn, mode) + capital = _capital(conn) + risk = get_risk_status(conn, active_count=_effective_active_position_count(conn, mode)) + return { + "ok": True, + "rows": rows, + "pending_orders": pending_orders, + "capital": capital, + "ctp_status": ctp_st, + "trading_mode_label": trading_mode_label(get_setting), + "risk_status": risk, + "trading_session": is_trading_session(), + } + + def _refresh_trading_live_snapshot() -> dict: + conn = get_db() + try: + init_strategy_tables(conn) + payload = _build_trading_live_payload(conn) + conn.commit() + return payload + finally: + conn.close() + + def _push_position_snapshot_async() -> None: + def _run() -> None: + try: + payload = _refresh_trading_live_snapshot() + position_hub.broadcast("positions", payload) + except Exception as exc: + logger.debug("push position snapshot: %s", exc) + + threading.Thread(target=_run, daemon=True).start() + @app.route("/trade") @login_required def trade_page(): @@ -466,29 +508,52 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se @app.route("/api/trading/live") @login_required def api_trading_live(): + cached = position_hub.get_snapshot() + if cached: + return jsonify(cached) conn = get_db() try: init_strategy_tables(conn) - mode = get_trading_mode(get_setting) - ctp_st = ctp_status(mode) - _sync_trade_monitors_with_ctp(conn, mode) - rows = _build_trading_live_rows(conn) - pending_orders = _build_pending_orders(conn, mode) - capital = _capital(conn) - risk = get_risk_status(conn, active_count=_effective_active_position_count(conn, mode)) + payload = _build_trading_live_payload(conn) conn.commit() - return jsonify({ - "rows": rows, - "pending_orders": pending_orders, - "capital": capital, - "ctp_status": ctp_st, - "trading_mode_label": trading_mode_label(get_setting), - "risk_status": risk, - "trading_session": is_trading_session(), - }) + position_hub.set_snapshot(payload) + return jsonify(payload) finally: conn.close() + @app.route("/api/trading/stream") + @login_required + def api_trading_stream(): + from queue import Empty + + def generate(): + q = position_hub.subscribe() + try: + snap = position_hub.get_snapshot() + if snap: + yield sse_format("positions", snap) + else: + payload = _refresh_trading_live_snapshot() + position_hub.set_snapshot(payload) + yield sse_format("positions", payload) + while True: + try: + msg = q.get(timeout=25) + yield sse_format(msg["event"], msg["data"]) + except Empty: + yield ": heartbeat\n\n" + finally: + position_hub.unsubscribe(q) + + return Response( + generate(), + mimetype="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + @app.route("/api/trading/monitor/upsert", methods=["POST"]) @login_required def api_trading_monitor_upsert(): @@ -749,6 +814,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ) conn.commit() conn.close() + _push_position_snapshot_async() return jsonify({"ok": True, "message": "已平仓并记入交易记录(手动平仓)"}) except ValueError as exc: conn.close() @@ -1018,6 +1084,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se conn.commit() send_wechat_msg(f"{trading_mode_label(get_setting)} {offset} {sym} {direction} {lots}手 @{price}") conn.close() + _push_position_snapshot_async() return jsonify({"ok": True, "result": result, "lots": lots, "message": "委托已提交柜台,限价单需成交后才会显示持仓"}) except (ValueError, RuntimeError) as exc: conn.close() @@ -1501,6 +1568,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se notify_fn=send_wechat_msg, interval=1, ) + start_position_worker( + refresh_fn=_refresh_trading_live_snapshot, + interval=1, + ) start_ctp_fee_worker( get_mode_fn=lambda: get_trading_mode(get_setting), get_setting_fn=get_setting, diff --git a/position_stream.py b/position_stream.py new file mode 100644 index 0000000..debb218 --- /dev/null +++ b/position_stream.py @@ -0,0 +1,101 @@ +"""持仓监控:后台拉取 CTP 并 SSE 推送给前端(避免每次刷新阻塞读柜台)。""" +from __future__ import annotations + +import logging +import queue +import threading +import time +from typing import Callable, Optional + +from kline_stream import sse_format +from market_sessions import is_trading_session + +logger = logging.getLogger(__name__) + +PUSH_INTERVAL_SEC = 1 +IDLE_INTERVAL_SEC = 5 + + +class PositionStreamHub: + def __init__(self) -> None: + self._lock = threading.Lock() + self._subs: list[queue.Queue] = [] + self._snapshot: Optional[dict] = None + self._snapshot_ts: float = 0.0 + + def subscribe(self) -> queue.Queue: + q: queue.Queue = queue.Queue(maxsize=16) + with self._lock: + self._subs.append(q) + return q + + def unsubscribe(self, q: queue.Queue) -> None: + with self._lock: + try: + self._subs.remove(q) + except ValueError: + pass + + def get_snapshot(self) -> Optional[dict]: + with self._lock: + return dict(self._snapshot) if self._snapshot else None + + def set_snapshot(self, data: dict) -> None: + with self._lock: + self._snapshot = dict(data) + self._snapshot_ts = time.time() + + def broadcast(self, event: str, data: dict) -> None: + self.set_snapshot(data) + msg = {"event": event, "data": data} + with self._lock: + subs = list(self._subs) + for q in subs: + try: + q.put_nowait(msg) + except queue.Full: + try: + q.get_nowait() + except queue.Empty: + pass + try: + q.put_nowait(msg) + except queue.Full: + pass + + +position_hub = PositionStreamHub() + + +def start_position_worker( + *, + refresh_fn: Callable[[], dict], + interval: int = PUSH_INTERVAL_SEC, + idle_interval: int = IDLE_INTERVAL_SEC, +) -> None: + """后台定时刷新持仓快照并 SSE 广播。""" + + def _loop() -> None: + time.sleep(3) + while True: + sleep_sec = idle_interval + try: + payload = refresh_fn() + if payload: + position_hub.broadcast("positions", payload) + connected = bool((payload or {}).get("ctp_status") or {}).get("connected") + in_session = bool((payload or {}).get("trading_session")) + rows = (payload or {}).get("rows") or [] + has_sl_tp = any( + r.get("stop_loss") is not None or r.get("take_profit") is not None + for r in rows + ) + if connected and in_session and (rows or has_sl_tp): + sleep_sec = max(1, interval) + elif connected: + sleep_sec = max(2, min(idle_interval, 3)) + except Exception as exc: + logger.warning("position worker failed: %s", exc) + time.sleep(sleep_sec) + + threading.Thread(target=_loop, daemon=True, name="position-stream").start() diff --git a/static/js/trade.js b/static/js/trade.js index b718126..a73fce9 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -11,8 +11,8 @@ var tpInput = document.getElementById('trade-tp'); var marketHint = document.getElementById('market-hint'); var metricsHint = document.getElementById('trade-metrics-hint'); - var pollTimer = null; var recommendSource = null; + var positionSource = null; var quoteTimer = null; var calcTimer = null; var lastQuotePrice = null; @@ -22,10 +22,11 @@ var isTradingSession = false; var hasSlTpMonitoring = false; var ctpConnected = false; - var pollIntervalMs = 0; + var positionsRendered = false; var selectedMaxLots = null; var recommendMaxByProduct = {}; var recommendMaxByCode = {}; + var POS_CACHE_KEY = 'qihuo_trading_live_v1'; function runWhenReady(fn) { if (document.readyState === 'loading') { @@ -98,22 +99,86 @@ } } + function loadPosCache() { + try { + var raw = sessionStorage.getItem(POS_CACHE_KEY); + if (!raw) return null; + return JSON.parse(raw); + } catch (e) { + return null; + } + } + + function savePosCache(data) { + try { + sessionStorage.setItem(POS_CACHE_KEY, JSON.stringify(data)); + } catch (e) { /* quota */ } + } + + function applyPositionsData(data) { + if (!list || !data) return; + var cap = document.getElementById('cap-display'); + if (cap && data.capital != null) cap.textContent = Number(data.capital).toFixed(2); + var connected = data.ctp_status && data.ctp_status.connected; + var connecting = data.ctp_status && data.ctp_status.connecting; + ctpConnected = !!connected; + isTradingSession = !!data.trading_session; + updateCtpBadge(!!connected, !!connecting); + var riskBadge = document.getElementById('risk-badge'); + if (riskBadge && data.risk_status) { + riskBadge.textContent = data.risk_status.status_label || ''; + riskBadge.className = 'badge ' + (data.risk_status.can_trade ? 'profit' : 'loss'); + } + var rows = data.rows || []; + hasSlTpMonitoring = rows.some(function (row) { + return row.stop_loss != null || row.take_profit != null; + }); + updateSessionUi(); + savePosCache(data); + positionsRendered = true; + if (!connected) { + if (connecting) { + list.innerHTML = '
CTP 连接中,请稍候…
'; + return; + } + list.innerHTML = '
CTP 未连接,正在尝试自动重连…
'; + tryAutoCtpReconnect(); + return; + } + if (!rows.length) { + var pendingOnly = data.pending_orders || []; + if (pendingOnly.length) { + list.innerHTML = '
柜台暂无持仓
' + + pendingOnly.map(function (p) { + var dismissBtn = p.monitor_id ? + '' : ''; + return ( + '
' + (p.label || '挂单') + ' · ' + (p.symbol || p.symbol_code) + '' + + '' + fmtNum(p.price) + ' · ' + + (p.lots || 1) + ' 手' + dismissBtn + '
' + ); + }).join(''); + bindPendingDismiss(list); + } else { + list.innerHTML = '
柜台暂无持仓。
'; + } + return; + } + list.innerHTML = rows.map(buildPosCard).join(''); + bindPendingDismiss(list); + bindSlTpButtons(list); + bindPlaceOrderButtons(list); + list.querySelectorAll('[data-close]').forEach(function (btn) { + btn.addEventListener('click', function () { + closePosition(JSON.parse(decodeURIComponent(btn.getAttribute('data-close'))), btn); + }); + }); + } + function schedulePositionPoll() { - var nextMs = 0; - if (hasSlTpMonitoring && isTradingSession) { - nextMs = 1000; - } else if (!ctpConnected) { - nextMs = 5000; - } - if (nextMs === pollIntervalMs && pollTimer) return; - pollIntervalMs = nextMs; - if (pollTimer) { - clearInterval(pollTimer); - pollTimer = null; - } - if (nextMs > 0) { - pollTimer = setInterval(pollPositions, nextMs); - } + /* 持仓改由后台 SSE 推送,保留空函数兼容旧调用 */ } function updateSessionUi() { @@ -654,71 +719,35 @@ return r.json(); }) .then(function (data) { - var cap = document.getElementById('cap-display'); - if (cap && data.capital != null) cap.textContent = Number(data.capital).toFixed(2); - var connected = data.ctp_status && data.ctp_status.connected; - var connecting = data.ctp_status && data.ctp_status.connecting; - ctpConnected = !!connected; - isTradingSession = !!data.trading_session; - updateCtpBadge(!!connected, !!connecting); - var riskBadge = document.getElementById('risk-badge'); - if (riskBadge && data.risk_status) { - riskBadge.textContent = data.risk_status.status_label || ''; - riskBadge.className = 'badge ' + (data.risk_status.can_trade ? 'profit' : 'loss'); - } - var rows = data.rows || []; - hasSlTpMonitoring = rows.some(function (row) { - return row.stop_loss != null || row.take_profit != null; - }); - schedulePositionPoll(); - updateSessionUi(); - if (!connected) { - if (connecting) { - list.innerHTML = '
CTP 连接中,请稍候…
'; - return; - } - list.innerHTML = '
CTP 未连接,正在尝试自动重连…
'; - tryAutoCtpReconnect(); - return; - } - if (!rows.length) { - var pendingOnly = data.pending_orders || []; - if (pendingOnly.length) { - list.innerHTML = '
柜台暂无持仓
' + - pendingOnly.map(function (p) { - var dismissBtn = p.monitor_id ? - '' : ''; - return ( - '
' + (p.label || '挂单') + ' · ' + (p.symbol || p.symbol_code) + '' + - '' + fmtNum(p.price) + ' · ' + - (p.lots || 1) + ' 手' + dismissBtn + '
' - ); - }).join(''); - bindPendingDismiss(list); - } else { - list.innerHTML = '
柜台暂无持仓。
'; - } - return; - } - list.innerHTML = rows.map(buildPosCard).join(''); - bindPendingDismiss(list); - bindSlTpButtons(list); - bindPlaceOrderButtons(list); - list.querySelectorAll('[data-close]').forEach(function (btn) { - btn.addEventListener('click', function () { - closePosition(JSON.parse(decodeURIComponent(btn.getAttribute('data-close'))), btn); - }); - }); + applyPositionsData(data); }) .catch(function () { - if (list.innerHTML.indexOf('pos-card') < 0) { + if (!positionsRendered && list.innerHTML.indexOf('pos-card') < 0) { list.innerHTML = '
持仓加载失败
'; } }); } + function connectPositionStream() { + if (positionSource) { + positionSource.close(); + positionSource = null; + } + positionSource = new EventSource('/api/trading/stream'); + positionSource.addEventListener('positions', function (ev) { + try { + applyPositionsData(JSON.parse(ev.data)); + } catch (e) { /* ignore */ } + }); + positionSource.onerror = function () { + if (positionSource) { + positionSource.close(); + positionSource = null; + } + setTimeout(connectPositionStream, 3000); + }; + } + function renderRecommendations(data) { if (!recommendList || !data) return; updateRecommendMaxMaps(data); @@ -811,14 +840,20 @@ runWhenReady(function () { setPriceType('limit'); - pollPositions(); + var cached = loadPosCache(); + if (cached) { + applyPositionsData(cached); + } + connectPositionStream(); connectRecommendStream(); fetch('/api/recommend/list') .then(function (r) { return r.json(); }) .then(function (data) { if (data.ok) renderRecommendations(data); }) .catch(function () {}); document.addEventListener('visibilitychange', function () { - if (document.visibilityState === 'visible') pollPositions(); + if (document.visibilityState === 'visible' && !positionSource) { + connectPositionStream(); + } }); updateSessionUi(); scheduleQuote(); diff --git a/templates/trade.html b/templates/trade.html index 2ac0e98..25e4481 100644 --- a/templates/trade.html +++ b/templates/trade.html @@ -103,9 +103,9 @@

持仓监控

-

数据来自 CTP 柜台;设止盈/止损后程序在开盘期间每秒监控,触发即市价平仓并记入交易记录。

+

后台每秒拉取 CTP 并推送;刷新页面会使用浏览器缓存,不再阻塞读柜台。

-
{% if ctp_status.connected %}加载中…{% else %}请先连接 CTP 查看柜台持仓{% endif %}
+
{% if ctp_status.connected %}等待持仓推送…{% else %}请先连接 CTP 查看柜台持仓{% endif %}