From df79017b30149fa0f72fa2c438e9c3cc8efde8e7 Mon Sep 17 00:00:00 2001 From: dekun Date: Mon, 29 Jun 2026 21:14:41 +0800 Subject: [PATCH] Stream real-time position quotes via tick-driven SSE with incremental UI updates. Co-authored-by: Cursor --- install_trading.py | 57 ++++++++++++++++++++++++++++++++++++++++++++++ position_stream.py | 11 +++++++-- static/js/trade.js | 50 +++++++++++++++++++++++++++++++++++++--- vnpy_bridge.py | 38 ++++++++++++++++++++++++++++--- 4 files changed, 148 insertions(+), 8 deletions(-) diff --git a/install_trading.py b/install_trading.py index ba68a8c..9a12dfe 100644 --- a/install_trading.py +++ b/install_trading.py @@ -138,6 +138,7 @@ from vnpy_bridge import ( get_bridge, set_position_refresh_callback, set_tick_sl_tp_callback, + set_tick_quote_callback, set_ctp_connected_callback, ) @@ -1622,6 +1623,61 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se threading.Thread(target=_run, daemon=True).start() + def _build_position_quotes_payload(mode: str) -> dict: + """轻量现价/浮盈(仅读 tick 缓存,不走 SQLite)。""" + if not ctp_status(mode).get("connected"): + return {"ok": True, "quotes": []} + from contract_specs import get_contract_spec + + positions = trading_state.get_positions() + if not positions: + positions = _ctp_positions(mode, refresh_if_empty=False) + quotes: list[dict] = [] + for p in positions: + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "") + if not ths: + continue + entry = float(p.get("avg_price") or 0) + if entry <= 0: + continue + direction = (p.get("direction") or "long").strip().lower() + mark = ctp_get_tick_price(mode, ths) + if not mark or mark <= 0: + continue + mult = float(get_contract_spec(ths).get("mult") or 10) + if direction == "long": + float_pnl = round((mark - entry) * mult * lots, 2) + else: + float_pnl = round((entry - mark) * mult * lots, 2) + row_key = _canonical_position_key( + ths, direction, (p.get("exchange") or ""), + ) + quotes.append({ + "key": row_key, + "position_key": row_key, + "mark_price": mark, + "current_price": mark, + "float_pnl": float_pnl, + }) + return {"ok": True, "quotes": quotes} + + def _push_position_quotes_async() -> None: + def _run() -> None: + try: + if not is_trading_session(): + return + mode = get_trading_mode(get_setting) + payload = _build_position_quotes_payload(mode) + if payload.get("quotes"): + position_hub.push_event("position_quotes", payload) + except Exception as exc: + logger.debug("push position quotes: %s", exc) + + threading.Thread(target=_run, daemon=True, name="position-quotes").start() + def _on_tick_sl_tp(exchange: str, symbol: str, price: float) -> None: from sl_tp_guard import check_sl_tp_on_tick from db_conn import DB_PATH, connect_db @@ -1651,6 +1707,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se set_position_refresh_callback( lambda: _push_position_snapshot_async(fast=True) ) + set_tick_quote_callback(_push_position_quotes_async) set_tick_sl_tp_callback(_on_tick_sl_tp) set_ctp_connected_callback(_on_ctp_connected) diff --git a/position_stream.py b/position_stream.py index 0653e96..dadc36e 100644 --- a/position_stream.py +++ b/position_stream.py @@ -50,8 +50,7 @@ class PositionStreamHub: self._snapshot = dict(data) self._snapshot_ts = time.time() - def broadcast(self, event: str, data: dict) -> None: - self.set_snapshot(data) + def _fanout(self, event: str, data: dict) -> None: msg = {"event": event, "data": data} with self._lock: subs = list(self._subs) @@ -68,6 +67,14 @@ class PositionStreamHub: except queue.Full: pass + def broadcast(self, event: str, data: dict) -> None: + self.set_snapshot(data) + self._fanout(event, data) + + def push_event(self, event: str, data: dict) -> None: + """SSE 推送,不覆盖 positions 全量快照。""" + self._fanout(event, data) + position_hub = PositionStreamHub() diff --git a/static/js/trade.js b/static/js/trade.js index a7f856f..1cba767 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -202,6 +202,44 @@ bindCancelOrderButtons(orderList); } + function findPosCardByKey(key) { + if (!list || !key) return null; + var cards = list.querySelectorAll('.pos-card[data-pos-key]'); + for (var i = 0; i < cards.length; i++) { + if (cards[i].getAttribute('data-pos-key') === key) return cards[i]; + } + return null; + } + + function applyPositionQuotes(data) { + if (!data || !data.quotes || !list) return; + data.quotes.forEach(function (q) { + var card = findPosCardByKey(q.key || q.position_key); + if (!card) return; + var markEl = card.querySelector('.pos-q-mark'); + var pnlEl = card.querySelector('.pos-q-pnl'); + var pnlWrap = card.querySelector('.pos-q-pnl-wrap'); + if (markEl && q.mark_price != null) markEl.textContent = fmtNum(q.mark_price); + if (pnlEl && q.float_pnl != null) { + var pnl = q.float_pnl; + pnlEl.textContent = (pnl >= 0 ? '+' : '') + fmtNum(pnl) + ' 元'; + if (pnlWrap) { + pnlWrap.classList.remove('pnl-pos', 'pnl-neg'); + if (pnl > 0) pnlWrap.classList.add('pnl-pos'); + else if (pnl < 0) pnlWrap.classList.add('pnl-neg'); + } + } + var closeBtn = card.querySelector('[data-close]'); + if (closeBtn && q.mark_price != null) { + try { + var payload = JSON.parse(decodeURIComponent(closeBtn.getAttribute('data-close'))); + payload.mark_price = q.mark_price; + closeBtn.setAttribute('data-close', encodeURIComponent(JSON.stringify(payload))); + } catch (e) { /* ignore */ } + } + }); + } + function applyPositionsData(data) { if (!data) return; var cap = document.getElementById('cap-display'); @@ -1115,8 +1153,9 @@ var feeLabel = row.fee_source === 'ctp' ? '已扣手续费(柜台)' : '已扣手续费'; var marginLabel = row.margin_source === 'ctp' ? '占用保证金(柜台)' : '占用保证金'; var openLabel = '开仓'; + var rowKey = row.key || row.position_key || ''; return ( - '
' + + '
' + '
' + posSymbolTitleHtml(row, ' ' + dirBadge + '') + '
' + '
' + posSymbolSubHtml(row) + '
' + @@ -1125,10 +1164,10 @@ '
' + '
' + row.lots + ' 手
' + '
' + fmtNum(row.entry_price) + '
' + - '
' + (row.current_price != null ? fmtNum(row.current_price) : '--') + '
' + + '
' + (row.current_price != null ? fmtNum(row.current_price) : '--') + '
' + '
' + (row.margin != null ? fmtNum(row.margin) + ' 元' : '--') + '
' + '
' + (row.position_pct != null ? fmtNum(row.position_pct) + '%' : '--') + '
' + - '
' + pnlText + '
' + + '
' + pnlText + '
' + '
' + (row.est_fee != null ? fmtNum(row.est_fee) + ' 元' : '--') + '
' + '
' + (openT || '--') + '
' + '
' + (row.holding_duration || '--') + '
' + @@ -1395,6 +1434,11 @@ applyPositionsData(JSON.parse(ev.data)); } catch (e) { /* ignore */ } }); + positionSource.addEventListener('position_quotes', function (ev) { + try { + applyPositionQuotes(JSON.parse(ev.data)); + } catch (e) { /* ignore */ } + }); positionSource.onerror = function () { if (positionSource) { positionSource.close(); diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 5c8173e..5a855f9 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -75,9 +75,13 @@ def _load_persisted_last_error() -> str: _position_refresh_callback: Optional[Callable[[], None]] = None _tick_sl_tp_callback: Optional[Callable[[str, str, float], None]] = None +_tick_quote_callback: Optional[Callable[[], None]] = None _ctp_connected_callback: Optional[Callable[[str], None]] = None _position_refresh_debounce_lock = threading.Lock() _position_refresh_debounce_ts: float = 0.0 +_tick_quote_timer: Optional[threading.Timer] = None +_tick_quote_timer_lock = threading.Lock() +TICK_QUOTE_DEBOUNCE_SEC = 0.12 def set_position_refresh_callback(fn: Optional[Callable[[], None]]) -> None: @@ -91,6 +95,33 @@ def set_tick_sl_tp_callback(fn: Optional[Callable[[str, str, float], None]]) -> _tick_sl_tp_callback = fn +def set_tick_quote_callback(fn: Optional[Callable[[], None]]) -> None: + """注册 tick 回调:推送持仓现价/浮盈(由 bridge 侧防抖)。""" + global _tick_quote_callback + _tick_quote_callback = fn + + +def _fire_tick_quote_callback_debounced() -> None: + """持仓品种 tick 后 trailing 防抖,批量推送现价/浮盈。""" + global _tick_quote_timer + + def _run() -> None: + fn = _tick_quote_callback + if not fn: + return + try: + fn() + except Exception as exc: + logger.debug("tick quote callback: %s", exc) + + with _tick_quote_timer_lock: + if _tick_quote_timer is not None: + _tick_quote_timer.cancel() + _tick_quote_timer = threading.Timer(TICK_QUOTE_DEBOUNCE_SEC, _run) + _tick_quote_timer.daemon = True + _tick_quote_timer.start() + + def set_ctp_connected_callback(fn: Optional[Callable[[str], None]]) -> None: """CTP 交易通道登录成功后回调(mode=simulation|live)。""" global _ctp_connected_callback @@ -319,7 +350,7 @@ class CtpBridge: break except Exception as exc: logger.debug("position margin cache: %s", exc) - _fire_position_refresh_callback_debounced() + _fire_position_refresh_callback() self._ee.register(EVENT_POSITION, _on_position) self._position_hooked = True @@ -352,7 +383,7 @@ class CtpBridge: trading_state.upsert_order(row, notify=False) except Exception as exc: logger.debug("order event: %s", exc) - _fire_position_refresh_callback_debounced(min_interval=0.2) + _fire_position_refresh_callback() self._ee.register(EVENT_ORDER, _on_order) self._order_hooked = True @@ -377,7 +408,7 @@ class CtpBridge: self._position_open_times[self._position_margin_key(sym, pd)] = dt except Exception as exc: logger.debug("trade event: %s", exc) - _fire_position_refresh_callback_debounced(min_interval=0.2) + _fire_position_refresh_callback() self._ee.register(EVENT_TRADE, _on_trade) self._trade_hooked = True @@ -1256,6 +1287,7 @@ class CtpBridge: fn(ex_s, sym, float(price)) except Exception as exc: logger.debug("tick sl/tp callback: %s", exc) + _fire_tick_quote_callback_debounced() key = self._tick_key(sym, ex_s) bg = self._bar_generators.get(key) if not bg: