From 7daed9bd3a99e4ad9cdad4926d2f90f7e3720747 Mon Sep 17 00:00:00 2001 From: dekun Date: Thu, 25 Jun 2026 14:57:39 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E9=87=8D=E5=90=AF=E5=90=8E=E7=AB=8B?= =?UTF-8?q?=E5=8D=B3=E8=AF=BB=E5=BA=93=E5=B1=95=E7=A4=BA=E6=8C=81=E4=BB=93?= =?UTF-8?q?=EF=BC=8CCTP=E5=BC=82=E6=AD=A5=E9=87=8D=E8=BF=9E=E4=B8=8D?= =?UTF-8?q?=E5=86=8D=E9=98=BB=E5=A1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Cursor --- ctp_reconnect.py | 5 +-- install_trading.py | 100 ++++++++++++++++++++++++++++++++++----------- position_stream.py | 1 - static/js/trade.js | 5 +++ vnpy_bridge.py | 38 ++++++++++++----- 5 files changed, 111 insertions(+), 38 deletions(-) diff --git a/ctp_reconnect.py b/ctp_reconnect.py index 494588b..ddb7d33 100644 --- a/ctp_reconnect.py +++ b/ctp_reconnect.py @@ -11,7 +11,7 @@ from vnpy_bridge import ctp_try_auto_reconnect logger = logging.getLogger(__name__) -RECONNECT_INTERVAL_SEC = 30 +RECONNECT_INTERVAL_SEC = 10 def _auto_reconnect_enabled() -> bool: @@ -26,7 +26,6 @@ def start_ctp_reconnect_worker(*, get_mode_fn: Callable[[], str], interval: int """定时检测 CTP 连接,断线后自动重连。""" def _loop() -> None: - time.sleep(5) while True: try: if _auto_reconnect_enabled(): @@ -35,6 +34,6 @@ def start_ctp_reconnect_worker(*, get_mode_fn: Callable[[], str], interval: int logger.debug("CTP 连接正常 [%s]", mode) except Exception as exc: logger.warning("CTP reconnect worker: %s", exc) - time.sleep(max(15, interval)) + time.sleep(max(5, interval)) threading.Thread(target=_loop, daemon=True, name="ctp-reconnect-worker").start() diff --git a/install_trading.py b/install_trading.py index 132d65a..b2e95d3 100644 --- a/install_trading.py +++ b/install_trading.py @@ -78,6 +78,7 @@ from vnpy_bridge import ( ctp_status, execute_order, get_bridge, + set_position_refresh_callback, ) @@ -128,9 +129,18 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se except Exception: return {} - def _ctp_positions(mode: str, *, refresh_if_empty: bool = True) -> list: + def _ctp_positions( + mode: str, + *, + refresh_if_empty: bool = True, + refresh_margin: bool = False, + ) -> list: try: - return ctp_list_positions(mode, refresh_if_empty=refresh_if_empty) + return ctp_list_positions( + mode, + refresh_if_empty=refresh_if_empty, + refresh_margin=refresh_margin, + ) except Exception: return [] @@ -431,6 +441,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mode: str, capital: float, now_iso: str, + fast: bool = False, ) -> Optional[dict]: if not mon and not ctp: return None @@ -463,14 +474,16 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se holding = _holding_duration(open_time, now_iso) if open_time else "" mark = None - if ctp_status(mode).get("connected"): + if not fast and ctp_status(mode).get("connected"): mark = ctp_get_tick_price(mode, sym) - if (mark is None or mark <= 0) and codes: + if not fast and (mark is None or mark <= 0) and codes: mark = fetch_price( sym, codes.get("market_code", ""), codes.get("sina_code", ""), ) + if mark is None or mark <= 0: + mark = entry if entry else None close_est = float(mark) if mark and mark > 0 else entry if float_pnl is None and mark and entry: pos_tmp = calc_position_metrics( @@ -564,7 +577,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "trailing_r_locked": int(mon.get("trailing_r_locked") or 0) if mon else 0, } - def _build_trading_live_rows(conn) -> list[dict]: + def _build_trading_live_rows(conn, *, fast: bool = False) -> list[dict]: from zoneinfo import ZoneInfo tz = ZoneInfo("Asia/Shanghai") now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M") @@ -583,7 +596,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if key not in monitor_by_key: monitor_by_key[key] = mon - ctp_list: list[dict] = _ctp_positions(mode) if ctp_status(mode).get("connected") else [] + ctp_list: list[dict] = ( + _ctp_positions(mode, refresh_if_empty=not fast, refresh_margin=not fast) + if ctp_status(mode).get("connected") else [] + ) ctp_by_key: dict[str, dict] = {} for p in ctp_list: if int(p.get("lots") or 0) <= 0: @@ -617,6 +633,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se try: row = _compose_position_row( conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, + fast=fast, ) if row: rows.append(row) @@ -647,6 +664,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se try: row = _compose_position_row( conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso, + fast=fast, ) if row: rows.append(row) @@ -663,11 +681,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se deduped.append(row) return deduped - def _build_trading_live_payload(conn) -> dict: + def _build_trading_live_payload(conn, *, fast: bool = False) -> dict: mode = get_trading_mode(get_setting) ctp_st = ctp_status(mode) - _ensure_monitors_from_ctp(conn, mode) - rows = _build_trading_live_rows(conn) + if not fast: + _ensure_monitors_from_ctp(conn, mode) + rows = _build_trading_live_rows(conn, fast=fast) pending_orders = _build_pending_orders(conn, mode) capital = _capital(conn) risk = get_risk_status(conn, active_count=_effective_active_position_count(conn, mode)) @@ -682,26 +701,54 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "trading_session": is_trading_session(), } - def _refresh_trading_live_snapshot() -> dict: + def _refresh_trading_live_snapshot(*, fast: bool = False) -> dict: + mode = get_trading_mode(get_setting) + if not fast and ctp_status(mode).get("connected"): + try: + get_bridge().refresh_positions() + except Exception as exc: + logger.debug("refresh positions before snapshot: %s", exc) conn = get_db() try: init_strategy_tables(conn) - payload = _build_trading_live_payload(conn) + payload = _build_trading_live_payload(conn, fast=fast) conn.commit() return payload finally: conn.close() - def _push_position_snapshot_async() -> None: + def _push_position_snapshot_async(*, fast: bool = False) -> None: def _run() -> None: try: - payload = _refresh_trading_live_snapshot() + payload = _refresh_trading_live_snapshot(fast=fast) position_hub.broadcast("positions", payload) except Exception as exc: logger.debug("push position snapshot: %s", exc) threading.Thread(target=_run, daemon=True).start() + def _bootstrap_trading_runtime() -> None: + """进程启动:立刻读库展示持仓,并异步连 CTP。""" + set_position_refresh_callback( + lambda: _push_position_snapshot_async(fast=False) + ) + + def _warm() -> None: + try: + payload = _refresh_trading_live_snapshot(fast=True) + position_hub.set_snapshot(payload) + position_hub.broadcast("positions", payload) + except Exception as exc: + logger.warning("bootstrap position snapshot: %s", exc) + + threading.Thread(target=_warm, daemon=True, name="position-bootstrap").start() + try: + from vnpy_bridge import ctp_start_connect + mode = get_trading_mode(get_setting) + ctp_start_connect(mode, force=False) + except Exception as exc: + logger.debug("bootstrap ctp connect: %s", exc) + @app.route("/trade") @login_required def trade_page(): @@ -788,7 +835,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se conn = get_db() try: init_strategy_tables(conn) - payload = _build_trading_live_payload(conn) + payload = _build_trading_live_payload(conn, fast=True) conn.commit() position_hub.set_snapshot(payload) return jsonify(payload) @@ -807,7 +854,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if snap: yield sse_format("positions", snap) else: - payload = _refresh_trading_live_snapshot() + payload = _refresh_trading_live_snapshot(fast=True) position_hub.set_snapshot(payload) yield sse_format("positions", payload) while True: @@ -1322,20 +1369,24 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "status": st, "account": acc, }) - try: - st = ctp_connect(mode, force=force) - acc = _ctp_account(mode) - return jsonify({"ok": True, "status": st, "account": acc}) - except Exception as exc: - st = ctp_status(mode) - return jsonify({"ok": False, "error": str(exc), "status": st}), 400 + return jsonify({ + "ok": False, + "error": st.get("last_error") or "CTP 连接未启动", + "status": st, + "account": acc, + }), 400 @app.route("/api/ctp/status") @login_required def api_ctp_status(): mode = get_trading_mode(get_setting) st = ctp_status(mode) - acc = _ctp_account(mode) if st.get("connected") else {} + acc = {} + if st.get("connected"): + try: + acc = _ctp_account(mode) + except Exception: + acc = {} return jsonify({"ok": True, "status": st, "account": acc}) @app.route("/api/account_snapshot") @@ -1777,9 +1828,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se interval=1, ) start_position_worker( - refresh_fn=_refresh_trading_live_snapshot, + refresh_fn=lambda: _refresh_trading_live_snapshot(fast=False), interval=1, ) + _bootstrap_trading_runtime() start_ctp_fee_worker( get_mode_fn=lambda: get_trading_mode(get_setting), get_setting_fn=get_setting, diff --git a/position_stream.py b/position_stream.py index a90f82d..e8561a1 100644 --- a/position_stream.py +++ b/position_stream.py @@ -76,7 +76,6 @@ def start_position_worker( """后台定时刷新持仓快照并 SSE 广播。""" def _loop() -> None: - time.sleep(3) while True: sleep_sec = idle_interval try: diff --git a/static/js/trade.js b/static/js/trade.js index 9f92559..7c6ac74 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -111,6 +111,10 @@ function savePosCache(data) { try { + if (!data || !data.rows || !data.rows.length) { + var prev = loadPosCache(); + if (prev && prev.rows && prev.rows.length) return; + } sessionStorage.setItem(POS_CACHE_KEY, JSON.stringify(data)); } catch (e) { /* quota */ } } @@ -947,6 +951,7 @@ } pollPositions(); connectPositionStream(); + requestCtpConnect(false); connectRecommendStream(); fetch('/api/recommend/list') .then(function (r) { return r.json(); }) diff --git a/vnpy_bridge.py b/vnpy_bridge.py index fa86b88..004f143 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -6,7 +6,7 @@ import os import threading import time from collections import deque -from typing import Any, Optional +from typing import Any, Callable, Optional from locale_fix import ensure_process_locale @@ -19,6 +19,23 @@ logger = logging.getLogger(__name__) GATEWAY_NAME = "CTP" +_position_refresh_callback: Optional[Callable[[], None]] = None + + +def set_position_refresh_callback(fn: Optional[Callable[[], None]]) -> None: + global _position_refresh_callback + _position_refresh_callback = fn + + +def _fire_position_refresh_callback() -> None: + fn = _position_refresh_callback + if not fn: + return + try: + threading.Thread(target=fn, daemon=True, name="ctp-position-refresh").start() + except Exception as exc: + logger.debug("position refresh callback: %s", exc) + _bridge: Optional["CtpBridge"] = None _bridge_lock = threading.Lock() @@ -231,6 +248,7 @@ class CtpBridge: self.refresh_positions() except Exception as exc: logger.debug("initial position query: %s", exc) + _fire_position_refresh_callback() return time.sleep(0.5) finally: @@ -796,7 +814,7 @@ class CtpBridge: except Exception as exc: logger.debug("refresh_positions: %s", exc) - def list_positions(self, *, refresh_if_empty: bool = True, refresh_margin: bool = True) -> list[dict[str, Any]]: + def list_positions(self, *, refresh_if_empty: bool = True, refresh_margin: bool = False) -> list[dict[str, Any]]: if self._engine and self._connected_mode and refresh_margin: self.refresh_positions() out = self._collect_positions() @@ -958,23 +976,23 @@ def ctp_start_connect(mode: str, *, force: bool = False) -> dict[str, Any]: def ctp_try_auto_reconnect(mode: str) -> bool: - """断线时静默重连;已连接且 ping 正常则直接返回 True。""" + """断线时静默异步重连;已连接且 ping 正常则直接返回 True。""" b = get_bridge() if not b.available(): return False if b.connect_in_progress(): - return False + return True st = _setting_for_mode(mode) if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"): return False if b.connected_mode == mode and b.ping(): return True - try: - b.connect(mode, force=False) - return b.connected_mode == mode - except Exception as exc: - logger.info("CTP 自动重连失败: %s", exc) - return False + info = b.start_connect_async(mode, force=False) + return bool( + info.get("connected") + or info.get("connecting") + or info.get("started") + ) def ctp_status(mode: str) -> dict[str, Any]: