diff --git a/modules/ctp/ctp_premarket_connect.py b/modules/ctp/ctp_premarket_connect.py index 94cb381..f3fec14 100644 --- a/modules/ctp/ctp_premarket_connect.py +++ b/modules/ctp/ctp_premarket_connect.py @@ -18,7 +18,7 @@ from modules.market.market_sessions import ( is_trading_session, should_keep_ctp_connected, ) -from modules.ctp.vnpy_bridge import ctp_start_connect, ctp_status +from modules.ctp.vnpy_bridge import ctp_start_connect, ctp_status, ctp_session_needs_reconnect logger = logging.getLogger(__name__) @@ -105,6 +105,19 @@ def start_ctp_premarket_connect_worker( mode, mins_b, ) + elif ( + st.get("connected") + and not st.get("connecting") + and int(st.get("login_cooldown_sec") or 0) <= 0 + and ( + in_premarket_connect_window(minutes_before=mins_b) + or is_trading_session() + ) + and ctp_session_needs_reconnect(mode) + ): + info = ctp_start_connect(mode, force=True, scheduled=True) + if info.get("started") or info.get("connecting"): + logger.info("CTP 会话刷新(盘前/交易时段)[%s]", mode) if is_trading_session(): sleep_sec = TRADING_CHECK_INTERVAL_SEC elif in_premarket_connect_window(minutes_before=mins_b): diff --git a/modules/ctp/vnpy_bridge.py b/modules/ctp/vnpy_bridge.py index 3c6426b..0f30f14 100644 --- a/modules/ctp/vnpy_bridge.py +++ b/modules/ctp/vnpy_bridge.py @@ -173,7 +173,7 @@ def _fire_position_refresh_callback_debounced(*, min_interval: float = 0.35) -> def _fire_position_refresh_burst() -> None: """连接后持仓回报可能分批到达,分多次触发快照刷新。""" _fire_position_refresh_callback() - for delay in (0.4, 0.9, 1.5, 3.0, 6.0, 12.0, 20.0): + for delay in (0.3, 0.7, 1.2, 2.5, 5.0, 10.0): threading.Timer(delay, _fire_position_refresh_callback).start() @@ -228,7 +228,7 @@ _bridge: Optional["CtpBridge"] = None _bridge_lock = threading.Lock() _ctp_td_lock = threading.RLock() POSITION_QUERY_MIN_INTERVAL_SEC = 5.0 -POSITION_QUERY_RETRY_DELAYS_SEC = (1.5, 4.0, 9.0, 18.0, 35.0) +POSITION_QUERY_RETRY_DELAYS_SEC = (0.8, 2.0, 4.0, 8.0, 15.0) TRADE_QUERY_MIN_INTERVAL_SEC = 10.0 @@ -783,6 +783,7 @@ class CtpBridge: "connected": self._connected_mode == mode, "connecting": connecting, "connected_mode": self._connected_mode, + "td_logged_in": self._td_logged_in() if self._connected_mode == mode else False, "mode_label": _mode_label(mode), "missing_config": missing, "last_error": last_error, @@ -1375,11 +1376,13 @@ class CtpBridge: return False if self._td_logged_in(): return True - try: - if self._engine.get_all_accounts(): - return True - except Exception as exc: - logger.debug("CTP ping failed: %s", exc) + age = time.time() - float(getattr(self, "_last_connect_ok_ts", 0) or 0) + if age < 90: + try: + if self._engine.get_all_accounts(): + return True + except Exception as exc: + logger.debug("CTP ping failed: %s", exc) self._connected_mode = None return False @@ -2900,10 +2903,39 @@ def ctp_start_connect(mode: str, *, force: bool = False, scheduled: bool = False return {**info, "status": st} +def ctp_session_needs_reconnect(mode: str) -> bool: + """连接看似在线但不可交易、无资金回报或会话过旧时需强制重连。""" + if _use_ctp_worker_client(): + st = ctp_ipc_client.status(mode) + if not st.get("connected"): + return True + if not st.get("td_logged_in"): + return True + return False + b = get_bridge() + if b.connected_mode != mode: + return True + if not b._td_logged_in(): + return True + age = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0) + if age > 3 * 3600: + return True + try: + acc = b.get_account() + if float(acc.get("balance") or 0) <= 0: + return True + except Exception: + return True + return False + + def ctp_try_auto_reconnect(mode: str) -> bool: """断线时静默异步重连;已连接且交易通道正常则不再重复 connect。""" if _use_ctp_worker_client(): - info = ctp_ipc_client.start_connect(mode, force=False, scheduled=True) + if ctp_session_needs_reconnect(mode): + info = ctp_ipc_client.start_connect(mode, force=True, scheduled=True) + else: + info = ctp_ipc_client.start_connect(mode, force=False, scheduled=True) return bool( info.get("connected") or info.get("connecting") @@ -2921,13 +2953,15 @@ def ctp_try_auto_reconnect(mode: str) -> bool: st = _setting_for_mode(mode) if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"): return False + force = False if b.connected_mode == mode: - if b._td_logged_in() or b.ping(): + if not ctp_session_needs_reconnect(mode): return True recent = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0) - if recent < 120: - logger.debug("CTP 跳过自动重连:刚连接 %.0fs", recent) + if recent < 45 and b._td_logged_in(): return True + force = True + logger.info("CTP 会话失效或过旧,强制重连 [%s]", mode) td = st.get("交易服务器", "") ok, err = probe_tcp_address(td, timeout=4.0) if not ok: @@ -2936,7 +2970,7 @@ def ctp_try_auto_reconnect(mode: str) -> bool: "请更新 SIMNOW_TD_ADDRESS 并确认服务器出网。" ) return False - info = b.start_connect_async(mode, force=False, scheduled=True) + info = b.start_connect_async(mode, force=force, scheduled=True) return bool( info.get("connected") or info.get("connecting") diff --git a/modules/trading/install.py b/modules/trading/install.py index a17e6a3..2dab018 100644 --- a/modules/trading/install.py +++ b/modules/trading/install.py @@ -2583,7 +2583,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se since_connect = time.time() - float( getattr(get_bridge(), "_last_connect_ok_ts", 0) or 0, ) - if since_connect < 45: + if since_connect < 90: payload = dict(payload) payload["sync_state"] = "syncing" payload["sync_label"] = "持仓同步中…" @@ -2771,7 +2771,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se position_hub.broadcast("positions", payload) def _slow_sync() -> None: - time.sleep(20) + time.sleep(8) try: pl = _refresh_trading_live_snapshot(fast=False) position_hub.set_snapshot(pl) @@ -2828,13 +2828,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se conn.close() def _slow_sync() -> None: - time.sleep(8) + time.sleep(3) try: _push_position_snapshot_async(fast=False) except Exception as exc: logger.debug("ctp connected slow sync: %s", exc) threading.Thread(target=_slow_sync, daemon=True, name="ctp-slow-sync").start() + threading.Timer( + 1.2, + lambda: _push_position_snapshot_async(fast=False), + ).start() except Exception as exc: logger.debug("ctp connected monitor restore: %s", exc) @@ -2954,9 +2958,11 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se @app.route("/api/trading/live") @login_required def api_trading_live(): + mode = get_trading_mode(get_setting) snap = position_hub.get_snapshot() if snap: - return jsonify(_normalize_live_payload(snap)) + payload = _apply_live_account(dict(snap), mode) + return jsonify(_normalize_live_payload(payload)) payload = _refresh_trading_live_snapshot(fast=True) payload = _normalize_live_payload(payload) position_hub.set_snapshot(payload) @@ -2984,7 +2990,11 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se yield sse_format("positions", payload) _push_position_snapshot_async(fast=True) else: - yield sse_format("positions", snap) + mode = get_trading_mode(get_setting) + yield sse_format( + "positions", + _apply_live_account(dict(snap), mode), + ) while True: try: msg = q.get(timeout=25) @@ -5045,7 +5055,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se since_connect = now - float( getattr(get_bridge(), "_last_connect_ok_ts", 0) or 0, ) - if connected and since_connect < 45: + if connected and since_connect < 90: return _refresh_trading_live_snapshot(fast=True) need_full = ( connected diff --git a/modules/web/static/js/trade.js b/modules/web/static/js/trade.js index 908e25c..15dba86 100644 --- a/modules/web/static/js/trade.js +++ b/modules/web/static/js/trade.js @@ -37,6 +37,7 @@ var positionsRendered = false; var posFastPollTimer = null; var posFastPollCount = 0; + var posFastPollMax = 120; var lastPosRowCount = 0; var selectedMaxLots = null; var recommendMaxByProduct = {}; @@ -259,16 +260,40 @@ posFastPollCount = 0; } - function startPosFastPoll() { + function startPosFastPoll(maxTicks) { + if (typeof maxTicks === 'number' && maxTicks > 0) { + posFastPollMax = maxTicks; + } else { + posFastPollMax = 120; + } if (posFastPollTimer) return; posFastPollCount = 0; posFastPollTimer = setInterval(function () { pollPositions(); + refreshCtpAccountQuick(); posFastPollCount += 1; - if (posFastPollCount >= 120) stopPosFastPoll(); + if (posFastPollCount >= posFastPollMax) stopPosFastPoll(); }, 1000); } + function refreshCtpAccountQuick() { + if (!ctpConnected && !ctpConnecting) return; + fetch('/api/ctp/status') + .then(function (r) { return r.json(); }) + .then(function (d) { + if (!d || !d.account) return; + var cap = document.getElementById('cap-display'); + if (cap && d.account.balance != null) { + cap.textContent = Number(d.account.balance).toFixed(2); + } + var avail = document.getElementById('avail-display'); + if (avail && d.account.available != null) { + avail.textContent = Number(d.account.available).toFixed(2); + } + }) + .catch(function () {}); + } + function applyPositionsData(data) { if (!data) return; var cap = document.getElementById('cap-display'); @@ -302,6 +327,10 @@ } if (connected) { showCtpError(''); + if (data.sync_state === 'syncing') { + posFastPollMax = Math.max(posFastPollMax, 240); + if (!posFastPollTimer) startPosFastPoll(240); + } } else if (!connecting && data.ctp_status && data.ctp_status.last_error) { showCtpError(data.ctp_status.last_error); if (isCtpLoginBanError(data.ctp_status.last_error)) { @@ -683,7 +712,9 @@ if (st.connected) { syncCtpBadgeFromStatus(st); showCtpError(''); + refreshCtpAccountQuick(); pollPositions(); + startPosFastPoll(180); return d; } if ((st.login_cooldown_sec || 0) > 0 || d.cooldown) { @@ -693,6 +724,7 @@ } if (d.connecting || st.connecting) { updateCtpBadge(false, true); + startPosFastPoll(180); return waitForCtpConnected(70000).then(function (ok) { if (!ok && d.error) showCtpError(d.error); else if (!ok && st.last_error) showCtpError(st.last_error);