From 049aaffdcf3779db0607e570c55d7f6dd9f36155 Mon Sep 17 00:00:00 2001 From: dekun Date: Wed, 24 Jun 2026 13:49:25 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20CTP=E8=BF=9E=E6=8E=A5=E6=94=B9=E5=90=8E?= =?UTF-8?q?=E5=8F=B0=E5=BC=82=E6=AD=A5=EF=BC=8C=E9=81=BF=E5=85=8D=E5=A4=9A?= =?UTF-8?q?=E8=B7=AF=E9=87=8D=E8=BF=9E=E4=BA=92=E7=9B=B8=E9=98=BB=E5=A1=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Cursor --- app.py | 2 +- install_trading.py | 17 +++++- static/js/trade.js | 133 +++++++++++++++++++++++++++++++-------------- vnpy_bridge.py | 130 ++++++++++++++++++++++++++++++-------------- 4 files changed, 198 insertions(+), 84 deletions(-) diff --git a/app.py b/app.py index 0c57700..74cc604 100644 --- a/app.py +++ b/app.py @@ -1682,4 +1682,4 @@ start_background_threads() # —————————————— 启动 —————————————— if __name__ == "__main__": - app.run(host=HOST, port=PORT, debug=DEBUG) + app.run(host=HOST, port=PORT, debug=DEBUG, threaded=True) diff --git a/install_trading.py b/install_trading.py index 4d5be0a..218869e 100644 --- a/install_trading.py +++ b/install_trading.py @@ -586,8 +586,23 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se @app.route("/api/ctp/connect", methods=["POST"]) @login_required def api_ctp_connect(): + from vnpy_bridge import ctp_start_connect + mode = get_trading_mode(get_setting) - force = bool((request.get_json(silent=True) or {}).get("force")) + body = request.get_json(silent=True) or {} + force = bool(body.get("force")) + info = ctp_start_connect(mode, force=force) + st = info.get("status") or ctp_status(mode) + acc = _ctp_account(mode) if st.get("connected") else {} + if st.get("connected"): + return jsonify({"ok": True, "status": st, "account": acc}) + if info.get("connecting") or info.get("started"): + return jsonify({ + "ok": True, + "connecting": True, + "status": st, + "account": acc, + }) try: st = ctp_connect(mode, force=force) acc = _ctp_account(mode) diff --git a/static/js/trade.js b/static/js/trade.js index d718d9f..baac372 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -69,18 +69,95 @@ if (marketHint) marketHint.hidden = priceType !== 'market'; } - function updateCtpBadge(connected) { + function updateCtpBadge(connected, connecting) { var ctpBadge = document.getElementById('ctp-badge'); var btnConnect = document.getElementById('btn-ctp-connect'); if (ctpBadge) { - ctpBadge.textContent = connected ? 'CTP 已连接' : 'CTP 未连接'; - ctpBadge.className = 'badge ' + (connected ? 'profit' : 'planned'); + if (connecting) { + ctpBadge.textContent = 'CTP 连接中'; + ctpBadge.className = 'badge planned'; + } else { + ctpBadge.textContent = connected ? 'CTP 已连接' : 'CTP 未连接'; + ctpBadge.className = 'badge ' + (connected ? 'profit' : 'planned'); + } } - if (btnConnect && connected) { - btnConnect.textContent = '重连 CTP'; + if (btnConnect) { + if (connecting) { + btnConnect.textContent = '连接中…'; + btnConnect.disabled = true; + } else { + btnConnect.disabled = false; + btnConnect.textContent = connected ? '重连 CTP' : '连接 CTP'; + } } } + function waitForCtpConnected(maxMs) { + var deadline = Date.now() + (maxMs || 35000); + function tick() { + return fetch('/api/ctp/status') + .then(function (r) { return r.json(); }) + .then(function (d) { + var st = d.status || {}; + if (st.connected) { + updateCtpBadge(true, false); + if (d.account && d.account.available != null) { + var avail = document.getElementById('avail-display'); + if (avail) avail.textContent = Number(d.account.available).toFixed(2); + } + pollPositions(); + return true; + } + if (st.connecting && Date.now() < deadline) { + updateCtpBadge(false, true); + return new Promise(function (resolve) { + setTimeout(function () { resolve(tick()); }, 2000); + }); + } + updateCtpBadge(false, false); + if (st.last_error) { + var hint = document.querySelector('.ctp-install-hint'); + if (hint) hint.textContent = st.last_error; + } + return false; + }) + .catch(function () { updateCtpBadge(false, false); return false; }); + } + return tick(); + } + + function requestCtpConnect(force) { + updateCtpBadge(false, true); + return fetch('/api/ctp/connect', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ force: !!force, auto: !force }) + }) + .then(function (r) { return r.json(); }) + .then(function (d) { + if (d.status && d.status.connected) { + updateCtpBadge(true, false); + pollPositions(); + return d; + } + if (d.connecting || (d.status && d.status.connecting)) { + return waitForCtpConnected(35000).then(function (ok) { + if (!ok && d.error) alert(d.error); + else if (!ok && d.status && d.status.last_error) alert(d.status.last_error); + return d; + }); + } + if (!d.ok) { + updateCtpBadge(false, false); + alert(d.error || (d.status && d.status.last_error) || '连接失败'); + } + return d; + }) + .catch(function () { + updateCtpBadge(false, false); + }); + } + function refreshQuote() { var sym = selectedSymbol(); var lots = isRiskMode() ? (effectiveLots() || 1) : (lotsInput ? lotsInput.value : '1'); @@ -155,29 +232,12 @@ function tryAutoCtpReconnect() { if (ctpReconnecting) return; var now = Date.now(); - if (now - lastCtpReconnectAt < 30000) return; + if (now - lastCtpReconnectAt < 60000) return; lastCtpReconnectAt = now; ctpReconnecting = true; - fetch('/api/ctp/connect', { - method: 'POST', - headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ auto: true }) - }) - .then(function (r) { return r.json(); }) - .then(function (d) { - if (d.ok && d.status && d.status.connected) { - updateCtpBadge(true); - var avail = document.getElementById('avail-display'); - if (avail && d.account && d.account.available != null) { - avail.textContent = Number(d.account.available).toFixed(2); - } - pollPositions(); - } - }) - .catch(function () { /* ignore */ }) - .finally(function () { - ctpReconnecting = false; - }); + requestCtpConnect(false).finally(function () { + ctpReconnecting = false; + }); } function showOrderMsg(text, ok) { @@ -342,9 +402,14 @@ var cap = document.getElementById('cap-display'); if (cap && data.capital != null) cap.textContent = Number(data.capital).toFixed(2); var connected = data.ctp_status && data.ctp_status.connected; - updateCtpBadge(!!connected); + var connecting = data.ctp_status && data.ctp_status.connecting; + updateCtpBadge(!!connected, !!connecting); var rows = data.rows || []; if (!connected) { + if (connecting) { + list.innerHTML = '
CTP 连接中,请稍候…
'; + return; + } list.innerHTML = '
CTP 未连接,正在尝试自动重连…
'; tryAutoCtpReconnect(); return; @@ -446,19 +511,7 @@ var btnConnect = document.getElementById('btn-ctp-connect'); if (btnConnect) { btnConnect.addEventListener('click', function () { - btnConnect.disabled = true; - btnConnect.textContent = '连接中…'; - fetch('/api/ctp/connect', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: '{}' }) - .then(function (r) { return r.json(); }) - .then(function (d) { - if (!d.ok) { alert(d.error || '连接失败'); return; } - updateCtpBadge(true); - pollPositions(); - }) - .finally(function () { - btnConnect.disabled = false; - btnConnect.textContent = '重连 CTP'; - }); + requestCtpConnect(true); }); } diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 627acd6..af9b694 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -87,6 +87,7 @@ class CtpBridge: self._connected_mode: Optional[str] = None self._last_error: str = "" self._connect_lock = threading.Lock() + self._connect_in_progress = False self._commission_waiters: dict[int, threading.Event] = {} self._commission_lists: dict[int, list] = {} self._commission_hooked = False @@ -122,6 +123,9 @@ class CtpBridge: def connected_mode(self) -> Optional[str]: return self._connected_mode + def connect_in_progress(self) -> bool: + return self._connect_in_progress + def status(self, mode: str) -> dict[str, Any]: if self._connected_mode == mode: self.ping() @@ -130,6 +134,7 @@ class CtpBridge: return { "vnpy_installed": self.available(), "connected": self._connected_mode == mode, + "connecting": self._connect_in_progress, "connected_mode": self._connected_mode, "mode_label": _mode_label(mode), "missing_config": missing, @@ -139,6 +144,8 @@ class CtpBridge: } def connect(self, mode: str, *, force: bool = False) -> None: + if self._connect_in_progress: + raise RuntimeError("CTP 正在连接中,请稍候") if not self._engine: raise RuntimeError(self._last_error or "vnpy 引擎未初始化") if self._connected_mode == mode and not force: @@ -154,53 +161,81 @@ class CtpBridge: if not setting.get("交易服务器"): raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址") - with self._connect_lock: - if self._connected_mode and self._connected_mode != mode: + self._connect_in_progress = True + try: + with self._connect_lock: + if force and self._connected_mode: + try: + gw = self._engine.get_gateway(GATEWAY_NAME) + if gw: + gw.close() + except Exception: + pass + self._connected_mode = None + time.sleep(0.8) + elif self._connected_mode and self._connected_mode != mode: + try: + self._engine.close() + except Exception: + pass + self._connected_mode = None + time.sleep(1) + + ctp_logs: list[str] = [] + from vnpy.trader.event import EVENT_LOG + + def _on_log(event) -> None: + msg = getattr(event.data, "msg", "") or str(event.data) + if msg: + ctp_logs.append(str(msg)) + if len(ctp_logs) > 20: + ctp_logs.pop(0) + logger.info("CTP | %s", msg) + + self._ee.register(EVENT_LOG, _on_log) try: - self._engine.close() - except Exception: - pass - self._connected_mode = None - time.sleep(1) + ensure_process_locale() + logger.info( + "CTP 连接 [%s] user=%s td=%s env=%s", + mode, + setting.get("用户名"), + setting.get("交易服务器"), + setting.get("柜台环境", "实盘"), + ) + self._engine.connect(setting, GATEWAY_NAME) + for _ in range(60): + accounts = self._engine.get_all_accounts() + if accounts: + self._connected_mode = mode + self._last_error = "" + logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts)) + self._schedule_fee_sync(mode) + return + time.sleep(0.5) + finally: + self._ee.unregister(EVENT_LOG, _on_log) - ctp_logs: list[str] = [] - from vnpy.trader.event import EVENT_LOG + hint = _format_ctp_failure(ctp_logs) + self._last_error = hint + raise RuntimeError(hint) + finally: + self._connect_in_progress = False - def _on_log(event) -> None: - msg = getattr(event.data, "msg", "") or str(event.data) - if msg: - ctp_logs.append(str(msg)) - if len(ctp_logs) > 20: - ctp_logs.pop(0) - logger.info("CTP | %s", msg) + def start_connect_async(self, mode: str, *, force: bool = False) -> dict[str, Any]: + """后台连接,不阻塞 HTTP 请求。""" + if self._connected_mode == mode and self.ping() and not force: + return {"started": False, "connecting": False, "connected": True} + if self._connect_in_progress: + return {"started": False, "connecting": True, "connected": False} - self._ee.register(EVENT_LOG, _on_log) + def _run() -> None: try: - ensure_process_locale() - logger.info( - "CTP 连接 [%s] user=%s td=%s env=%s", - mode, - setting.get("用户名"), - setting.get("交易服务器"), - setting.get("柜台环境", "实盘"), - ) - self._engine.connect(setting, GATEWAY_NAME) - # 等待登录与结算信息(最多约 30 秒) - for _ in range(60): - accounts = self._engine.get_all_accounts() - if accounts: - self._connected_mode = mode - self._last_error = "" - logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts)) - self._schedule_fee_sync(mode) - return - time.sleep(0.5) - finally: - self._ee.unregister(EVENT_LOG, _on_log) + self.connect(mode, force=force) + except Exception as exc: + logger.warning("CTP 后台连接失败: %s", exc) - hint = _format_ctp_failure(ctp_logs) - self._last_error = hint - raise RuntimeError(hint) + threading.Thread(target=_run, daemon=True, name="ctp-connect-async").start() + return {"started": True, "connecting": True, "connected": False} def ensure_connected(self, mode: str) -> None: if self._connected_mode == mode and self.ping(): @@ -226,6 +261,7 @@ class CtpBridge: """连接成功后触发每日同步检查(非每次全量)。""" def _run() -> None: + time.sleep(45) try: from ctp_fee_worker import try_daily_ctp_fee_sync @@ -708,18 +744,28 @@ def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]: return b.status(mode) +def ctp_start_connect(mode: str, *, force: bool = False) -> dict[str, Any]: + """非阻塞发起连接,供 Web API 使用。""" + b = get_bridge() + info = b.start_connect_async(mode, force=force) + st = b.status(mode) + return {**info, "status": st} + + def ctp_try_auto_reconnect(mode: str) -> bool: """断线时静默重连;已连接且 ping 正常则直接返回 True。""" b = get_bridge() if not b.available(): return False + if b.connect_in_progress(): + return False st = _setting_for_mode(mode) if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"): return False if b.connected_mode == mode and b.ping(): return True try: - b.connect(mode, force=True) + b.connect(mode, force=False) return b.connected_mode == mode except Exception as exc: logger.info("CTP 自动重连失败: %s", exc)