diff --git a/app.py b/app.py
index 0c57700..74cc604 100644
--- a/app.py
+++ b/app.py
@@ -1682,4 +1682,4 @@ start_background_threads()
# —————————————— 启动 ——————————————
if __name__ == "__main__":
- app.run(host=HOST, port=PORT, debug=DEBUG)
+ app.run(host=HOST, port=PORT, debug=DEBUG, threaded=True)
diff --git a/install_trading.py b/install_trading.py
index 4d5be0a..218869e 100644
--- a/install_trading.py
+++ b/install_trading.py
@@ -586,8 +586,23 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
@app.route("/api/ctp/connect", methods=["POST"])
@login_required
def api_ctp_connect():
+ from vnpy_bridge import ctp_start_connect
+
mode = get_trading_mode(get_setting)
- force = bool((request.get_json(silent=True) or {}).get("force"))
+ body = request.get_json(silent=True) or {}
+ force = bool(body.get("force"))
+ info = ctp_start_connect(mode, force=force)
+ st = info.get("status") or ctp_status(mode)
+ acc = _ctp_account(mode) if st.get("connected") else {}
+ if st.get("connected"):
+ return jsonify({"ok": True, "status": st, "account": acc})
+ if info.get("connecting") or info.get("started"):
+ return jsonify({
+ "ok": True,
+ "connecting": True,
+ "status": st,
+ "account": acc,
+ })
try:
st = ctp_connect(mode, force=force)
acc = _ctp_account(mode)
diff --git a/static/js/trade.js b/static/js/trade.js
index d718d9f..baac372 100644
--- a/static/js/trade.js
+++ b/static/js/trade.js
@@ -69,18 +69,95 @@
if (marketHint) marketHint.hidden = priceType !== 'market';
}
- function updateCtpBadge(connected) {
+ function updateCtpBadge(connected, connecting) {
var ctpBadge = document.getElementById('ctp-badge');
var btnConnect = document.getElementById('btn-ctp-connect');
if (ctpBadge) {
- ctpBadge.textContent = connected ? 'CTP 已连接' : 'CTP 未连接';
- ctpBadge.className = 'badge ' + (connected ? 'profit' : 'planned');
+ if (connecting) {
+ ctpBadge.textContent = 'CTP 连接中';
+ ctpBadge.className = 'badge planned';
+ } else {
+ ctpBadge.textContent = connected ? 'CTP 已连接' : 'CTP 未连接';
+ ctpBadge.className = 'badge ' + (connected ? 'profit' : 'planned');
+ }
}
- if (btnConnect && connected) {
- btnConnect.textContent = '重连 CTP';
+ if (btnConnect) {
+ if (connecting) {
+ btnConnect.textContent = '连接中…';
+ btnConnect.disabled = true;
+ } else {
+ btnConnect.disabled = false;
+ btnConnect.textContent = connected ? '重连 CTP' : '连接 CTP';
+ }
}
}
+ function waitForCtpConnected(maxMs) {
+ var deadline = Date.now() + (maxMs || 35000);
+ function tick() {
+ return fetch('/api/ctp/status')
+ .then(function (r) { return r.json(); })
+ .then(function (d) {
+ var st = d.status || {};
+ if (st.connected) {
+ updateCtpBadge(true, false);
+ if (d.account && d.account.available != null) {
+ var avail = document.getElementById('avail-display');
+ if (avail) avail.textContent = Number(d.account.available).toFixed(2);
+ }
+ pollPositions();
+ return true;
+ }
+ if (st.connecting && Date.now() < deadline) {
+ updateCtpBadge(false, true);
+ return new Promise(function (resolve) {
+ setTimeout(function () { resolve(tick()); }, 2000);
+ });
+ }
+ updateCtpBadge(false, false);
+ if (st.last_error) {
+ var hint = document.querySelector('.ctp-install-hint');
+ if (hint) hint.textContent = st.last_error;
+ }
+ return false;
+ })
+ .catch(function () { updateCtpBadge(false, false); return false; });
+ }
+ return tick();
+ }
+
+ function requestCtpConnect(force) {
+ updateCtpBadge(false, true);
+ return fetch('/api/ctp/connect', {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ force: !!force, auto: !force })
+ })
+ .then(function (r) { return r.json(); })
+ .then(function (d) {
+ if (d.status && d.status.connected) {
+ updateCtpBadge(true, false);
+ pollPositions();
+ return d;
+ }
+ if (d.connecting || (d.status && d.status.connecting)) {
+ return waitForCtpConnected(35000).then(function (ok) {
+ if (!ok && d.error) alert(d.error);
+ else if (!ok && d.status && d.status.last_error) alert(d.status.last_error);
+ return d;
+ });
+ }
+ if (!d.ok) {
+ updateCtpBadge(false, false);
+ alert(d.error || (d.status && d.status.last_error) || '连接失败');
+ }
+ return d;
+ })
+ .catch(function () {
+ updateCtpBadge(false, false);
+ });
+ }
+
function refreshQuote() {
var sym = selectedSymbol();
var lots = isRiskMode() ? (effectiveLots() || 1) : (lotsInput ? lotsInput.value : '1');
@@ -155,29 +232,12 @@
function tryAutoCtpReconnect() {
if (ctpReconnecting) return;
var now = Date.now();
- if (now - lastCtpReconnectAt < 30000) return;
+ if (now - lastCtpReconnectAt < 60000) return;
lastCtpReconnectAt = now;
ctpReconnecting = true;
- fetch('/api/ctp/connect', {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({ auto: true })
- })
- .then(function (r) { return r.json(); })
- .then(function (d) {
- if (d.ok && d.status && d.status.connected) {
- updateCtpBadge(true);
- var avail = document.getElementById('avail-display');
- if (avail && d.account && d.account.available != null) {
- avail.textContent = Number(d.account.available).toFixed(2);
- }
- pollPositions();
- }
- })
- .catch(function () { /* ignore */ })
- .finally(function () {
- ctpReconnecting = false;
- });
+ requestCtpConnect(false).finally(function () {
+ ctpReconnecting = false;
+ });
}
function showOrderMsg(text, ok) {
@@ -342,9 +402,14 @@
var cap = document.getElementById('cap-display');
if (cap && data.capital != null) cap.textContent = Number(data.capital).toFixed(2);
var connected = data.ctp_status && data.ctp_status.connected;
- updateCtpBadge(!!connected);
+ var connecting = data.ctp_status && data.ctp_status.connecting;
+ updateCtpBadge(!!connected, !!connecting);
var rows = data.rows || [];
if (!connected) {
+ if (connecting) {
+ list.innerHTML = '
CTP 连接中,请稍候…
';
+ return;
+ }
list.innerHTML = 'CTP 未连接,正在尝试自动重连…
';
tryAutoCtpReconnect();
return;
@@ -446,19 +511,7 @@
var btnConnect = document.getElementById('btn-ctp-connect');
if (btnConnect) {
btnConnect.addEventListener('click', function () {
- btnConnect.disabled = true;
- btnConnect.textContent = '连接中…';
- fetch('/api/ctp/connect', { method: 'POST', headers: { 'Content-Type': 'application/json' }, body: '{}' })
- .then(function (r) { return r.json(); })
- .then(function (d) {
- if (!d.ok) { alert(d.error || '连接失败'); return; }
- updateCtpBadge(true);
- pollPositions();
- })
- .finally(function () {
- btnConnect.disabled = false;
- btnConnect.textContent = '重连 CTP';
- });
+ requestCtpConnect(true);
});
}
diff --git a/vnpy_bridge.py b/vnpy_bridge.py
index 627acd6..af9b694 100644
--- a/vnpy_bridge.py
+++ b/vnpy_bridge.py
@@ -87,6 +87,7 @@ class CtpBridge:
self._connected_mode: Optional[str] = None
self._last_error: str = ""
self._connect_lock = threading.Lock()
+ self._connect_in_progress = False
self._commission_waiters: dict[int, threading.Event] = {}
self._commission_lists: dict[int, list] = {}
self._commission_hooked = False
@@ -122,6 +123,9 @@ class CtpBridge:
def connected_mode(self) -> Optional[str]:
return self._connected_mode
+ def connect_in_progress(self) -> bool:
+ return self._connect_in_progress
+
def status(self, mode: str) -> dict[str, Any]:
if self._connected_mode == mode:
self.ping()
@@ -130,6 +134,7 @@ class CtpBridge:
return {
"vnpy_installed": self.available(),
"connected": self._connected_mode == mode,
+ "connecting": self._connect_in_progress,
"connected_mode": self._connected_mode,
"mode_label": _mode_label(mode),
"missing_config": missing,
@@ -139,6 +144,8 @@ class CtpBridge:
}
def connect(self, mode: str, *, force: bool = False) -> None:
+ if self._connect_in_progress:
+ raise RuntimeError("CTP 正在连接中,请稍候")
if not self._engine:
raise RuntimeError(self._last_error or "vnpy 引擎未初始化")
if self._connected_mode == mode and not force:
@@ -154,53 +161,81 @@ class CtpBridge:
if not setting.get("交易服务器"):
raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址")
- with self._connect_lock:
- if self._connected_mode and self._connected_mode != mode:
+ self._connect_in_progress = True
+ try:
+ with self._connect_lock:
+ if force and self._connected_mode:
+ try:
+ gw = self._engine.get_gateway(GATEWAY_NAME)
+ if gw:
+ gw.close()
+ except Exception:
+ pass
+ self._connected_mode = None
+ time.sleep(0.8)
+ elif self._connected_mode and self._connected_mode != mode:
+ try:
+ self._engine.close()
+ except Exception:
+ pass
+ self._connected_mode = None
+ time.sleep(1)
+
+ ctp_logs: list[str] = []
+ from vnpy.trader.event import EVENT_LOG
+
+ def _on_log(event) -> None:
+ msg = getattr(event.data, "msg", "") or str(event.data)
+ if msg:
+ ctp_logs.append(str(msg))
+ if len(ctp_logs) > 20:
+ ctp_logs.pop(0)
+ logger.info("CTP | %s", msg)
+
+ self._ee.register(EVENT_LOG, _on_log)
try:
- self._engine.close()
- except Exception:
- pass
- self._connected_mode = None
- time.sleep(1)
+ ensure_process_locale()
+ logger.info(
+ "CTP 连接 [%s] user=%s td=%s env=%s",
+ mode,
+ setting.get("用户名"),
+ setting.get("交易服务器"),
+ setting.get("柜台环境", "实盘"),
+ )
+ self._engine.connect(setting, GATEWAY_NAME)
+ for _ in range(60):
+ accounts = self._engine.get_all_accounts()
+ if accounts:
+ self._connected_mode = mode
+ self._last_error = ""
+ logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts))
+ self._schedule_fee_sync(mode)
+ return
+ time.sleep(0.5)
+ finally:
+ self._ee.unregister(EVENT_LOG, _on_log)
- ctp_logs: list[str] = []
- from vnpy.trader.event import EVENT_LOG
+ hint = _format_ctp_failure(ctp_logs)
+ self._last_error = hint
+ raise RuntimeError(hint)
+ finally:
+ self._connect_in_progress = False
- def _on_log(event) -> None:
- msg = getattr(event.data, "msg", "") or str(event.data)
- if msg:
- ctp_logs.append(str(msg))
- if len(ctp_logs) > 20:
- ctp_logs.pop(0)
- logger.info("CTP | %s", msg)
+ def start_connect_async(self, mode: str, *, force: bool = False) -> dict[str, Any]:
+ """后台连接,不阻塞 HTTP 请求。"""
+ if self._connected_mode == mode and self.ping() and not force:
+ return {"started": False, "connecting": False, "connected": True}
+ if self._connect_in_progress:
+ return {"started": False, "connecting": True, "connected": False}
- self._ee.register(EVENT_LOG, _on_log)
+ def _run() -> None:
try:
- ensure_process_locale()
- logger.info(
- "CTP 连接 [%s] user=%s td=%s env=%s",
- mode,
- setting.get("用户名"),
- setting.get("交易服务器"),
- setting.get("柜台环境", "实盘"),
- )
- self._engine.connect(setting, GATEWAY_NAME)
- # 等待登录与结算信息(最多约 30 秒)
- for _ in range(60):
- accounts = self._engine.get_all_accounts()
- if accounts:
- self._connected_mode = mode
- self._last_error = ""
- logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts))
- self._schedule_fee_sync(mode)
- return
- time.sleep(0.5)
- finally:
- self._ee.unregister(EVENT_LOG, _on_log)
+ self.connect(mode, force=force)
+ except Exception as exc:
+ logger.warning("CTP 后台连接失败: %s", exc)
- hint = _format_ctp_failure(ctp_logs)
- self._last_error = hint
- raise RuntimeError(hint)
+ threading.Thread(target=_run, daemon=True, name="ctp-connect-async").start()
+ return {"started": True, "connecting": True, "connected": False}
def ensure_connected(self, mode: str) -> None:
if self._connected_mode == mode and self.ping():
@@ -226,6 +261,7 @@ class CtpBridge:
"""连接成功后触发每日同步检查(非每次全量)。"""
def _run() -> None:
+ time.sleep(45)
try:
from ctp_fee_worker import try_daily_ctp_fee_sync
@@ -708,18 +744,28 @@ def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
return b.status(mode)
+def ctp_start_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
+ """非阻塞发起连接,供 Web API 使用。"""
+ b = get_bridge()
+ info = b.start_connect_async(mode, force=force)
+ st = b.status(mode)
+ return {**info, "status": st}
+
+
def ctp_try_auto_reconnect(mode: str) -> bool:
"""断线时静默重连;已连接且 ping 正常则直接返回 True。"""
b = get_bridge()
if not b.available():
return False
+ if b.connect_in_progress():
+ return False
st = _setting_for_mode(mode)
if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"):
return False
if b.connected_mode == mode and b.ping():
return True
try:
- b.connect(mode, force=True)
+ b.connect(mode, force=False)
return b.connected_mode == mode
except Exception as exc:
logger.info("CTP 自动重连失败: %s", exc)