diff --git a/modules/ctp/ctp_premarket_connect.py b/modules/ctp/ctp_premarket_connect.py index f3fec14..c6a05b3 100644 --- a/modules/ctp/ctp_premarket_connect.py +++ b/modules/ctp/ctp_premarket_connect.py @@ -18,7 +18,7 @@ from modules.market.market_sessions import ( is_trading_session, should_keep_ctp_connected, ) -from modules.ctp.vnpy_bridge import ctp_start_connect, ctp_status, ctp_session_needs_reconnect +from modules.ctp.vnpy_bridge import ctp_start_connect, ctp_status, ctp_session_needs_reconnect, get_bridge logger = logging.getLogger(__name__) @@ -114,6 +114,7 @@ def start_ctp_premarket_connect_worker( or is_trading_session() ) and ctp_session_needs_reconnect(mode) + and not get_bridge().connect_in_progress() ): info = ctp_start_connect(mode, force=True, scheduled=True) if info.get("started") or info.get("connecting"): diff --git a/modules/ctp/vnpy_bridge.py b/modules/ctp/vnpy_bridge.py index 0f30f14..080af5a 100644 --- a/modules/ctp/vnpy_bridge.py +++ b/modules/ctp/vnpy_bridge.py @@ -2904,27 +2904,27 @@ def ctp_start_connect(mode: str, *, force: bool = False, scheduled: bool = False def ctp_session_needs_reconnect(mode: str) -> bool: - """连接看似在线但不可交易、无资金回报或会话过旧时需强制重连。""" + """连接看似在线但不可交易或会话过旧时需强制重连(连接进行中不重复发起)。""" if _use_ctp_worker_client(): st = ctp_ipc_client.status(mode) + if st.get("connecting"): + return False if not st.get("connected"): return True if not st.get("td_logged_in"): return True return False b = get_bridge() + if b.connect_in_progress(): + return False if b.connected_mode != mode: return True + since_connect = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0) + if since_connect < 120: + return False if not b._td_logged_in(): return True - age = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0) - if age > 3 * 3600: - return True - try: - acc = b.get_account() - if float(acc.get("balance") or 0) <= 0: - return True - except Exception: + if since_connect > 3 * 3600: return True return False diff --git a/modules/trading/install.py b/modules/trading/install.py index 2dab018..c9a4e73 100644 --- a/modules/trading/install.py +++ b/modules/trading/install.py @@ -2955,13 +2955,26 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def recommend_page(): return redirect(url_for("positions") + "#recommend") + def _maybe_refresh_live_account(payload: dict, mode: str) -> dict: + """轻量刷新权益:快照较新时跳过,避免阻塞 HTTP 线程。""" + if not ctp_status(mode).get("connected"): + return payload + if payload.get("capital") and position_hub.snapshot_age_sec() < 3.0: + return payload + if not _live_refresh_lock.acquire(blocking=False): + return payload + try: + return _apply_live_account(dict(payload), mode) + finally: + _live_refresh_lock.release() + @app.route("/api/trading/live") @login_required def api_trading_live(): mode = get_trading_mode(get_setting) snap = position_hub.get_snapshot() if snap: - payload = _apply_live_account(dict(snap), mode) + payload = _maybe_refresh_live_account(dict(snap), mode) return jsonify(_normalize_live_payload(payload)) payload = _refresh_trading_live_snapshot(fast=True) payload = _normalize_live_payload(payload) @@ -2993,7 +3006,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mode = get_trading_mode(get_setting) yield sse_format( "positions", - _apply_live_account(dict(snap), mode), + _maybe_refresh_live_account(dict(snap), mode), ) while True: try: diff --git a/modules/trading/position_stream.py b/modules/trading/position_stream.py index 192ef0a..ae9f6d2 100644 --- a/modules/trading/position_stream.py +++ b/modules/trading/position_stream.py @@ -45,6 +45,12 @@ class PositionStreamHub: with self._lock: return dict(self._snapshot) if self._snapshot else None + def snapshot_age_sec(self) -> float: + with self._lock: + if not self._snapshot_ts: + return 9999.0 + return max(0.0, time.time() - self._snapshot_ts) + def set_snapshot(self, data: dict) -> None: with self._lock: self._snapshot = dict(data) diff --git a/modules/web/static/js/trade.js b/modules/web/static/js/trade.js index 15dba86..82bf369 100644 --- a/modules/web/static/js/trade.js +++ b/modules/web/static/js/trade.js @@ -270,10 +270,10 @@ posFastPollCount = 0; posFastPollTimer = setInterval(function () { pollPositions(); - refreshCtpAccountQuick(); + if (posFastPollCount % 3 === 0) refreshCtpAccountQuick(); posFastPollCount += 1; if (posFastPollCount >= posFastPollMax) stopPosFastPoll(); - }, 1000); + }, 2000); } function refreshCtpAccountQuick() { @@ -328,8 +328,8 @@ if (connected) { showCtpError(''); if (data.sync_state === 'syncing') { - posFastPollMax = Math.max(posFastPollMax, 240); - if (!posFastPollTimer) startPosFastPoll(240); + posFastPollMax = Math.max(posFastPollMax, 90); + if (!posFastPollTimer) startPosFastPoll(90); } } else if (!connecting && data.ctp_status && data.ctp_status.last_error) { showCtpError(data.ctp_status.last_error); @@ -714,7 +714,7 @@ showCtpError(''); refreshCtpAccountQuick(); pollPositions(); - startPosFastPoll(180); + startPosFastPoll(60); return d; } if ((st.login_cooldown_sec || 0) > 0 || d.cooldown) { @@ -724,7 +724,7 @@ } if (d.connecting || st.connecting) { updateCtpBadge(false, true); - startPosFastPoll(180); + startPosFastPoll(60); return waitForCtpConnected(70000).then(function (ok) { if (!ok && d.error) showCtpError(d.error); else if (!ok && st.last_error) showCtpError(st.last_error);