fix: CTP连接改后台异步,避免多路重连互相阻塞
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+88
-42
@@ -87,6 +87,7 @@ class CtpBridge:
|
||||
self._connected_mode: Optional[str] = None
|
||||
self._last_error: str = ""
|
||||
self._connect_lock = threading.Lock()
|
||||
self._connect_in_progress = False
|
||||
self._commission_waiters: dict[int, threading.Event] = {}
|
||||
self._commission_lists: dict[int, list] = {}
|
||||
self._commission_hooked = False
|
||||
@@ -122,6 +123,9 @@ class CtpBridge:
|
||||
def connected_mode(self) -> Optional[str]:
|
||||
return self._connected_mode
|
||||
|
||||
def connect_in_progress(self) -> bool:
|
||||
return self._connect_in_progress
|
||||
|
||||
def status(self, mode: str) -> dict[str, Any]:
|
||||
if self._connected_mode == mode:
|
||||
self.ping()
|
||||
@@ -130,6 +134,7 @@ class CtpBridge:
|
||||
return {
|
||||
"vnpy_installed": self.available(),
|
||||
"connected": self._connected_mode == mode,
|
||||
"connecting": self._connect_in_progress,
|
||||
"connected_mode": self._connected_mode,
|
||||
"mode_label": _mode_label(mode),
|
||||
"missing_config": missing,
|
||||
@@ -139,6 +144,8 @@ class CtpBridge:
|
||||
}
|
||||
|
||||
def connect(self, mode: str, *, force: bool = False) -> None:
|
||||
if self._connect_in_progress:
|
||||
raise RuntimeError("CTP 正在连接中,请稍候")
|
||||
if not self._engine:
|
||||
raise RuntimeError(self._last_error or "vnpy 引擎未初始化")
|
||||
if self._connected_mode == mode and not force:
|
||||
@@ -154,53 +161,81 @@ class CtpBridge:
|
||||
if not setting.get("交易服务器"):
|
||||
raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址")
|
||||
|
||||
with self._connect_lock:
|
||||
if self._connected_mode and self._connected_mode != mode:
|
||||
self._connect_in_progress = True
|
||||
try:
|
||||
with self._connect_lock:
|
||||
if force and self._connected_mode:
|
||||
try:
|
||||
gw = self._engine.get_gateway(GATEWAY_NAME)
|
||||
if gw:
|
||||
gw.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._connected_mode = None
|
||||
time.sleep(0.8)
|
||||
elif self._connected_mode and self._connected_mode != mode:
|
||||
try:
|
||||
self._engine.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._connected_mode = None
|
||||
time.sleep(1)
|
||||
|
||||
ctp_logs: list[str] = []
|
||||
from vnpy.trader.event import EVENT_LOG
|
||||
|
||||
def _on_log(event) -> None:
|
||||
msg = getattr(event.data, "msg", "") or str(event.data)
|
||||
if msg:
|
||||
ctp_logs.append(str(msg))
|
||||
if len(ctp_logs) > 20:
|
||||
ctp_logs.pop(0)
|
||||
logger.info("CTP | %s", msg)
|
||||
|
||||
self._ee.register(EVENT_LOG, _on_log)
|
||||
try:
|
||||
self._engine.close()
|
||||
except Exception:
|
||||
pass
|
||||
self._connected_mode = None
|
||||
time.sleep(1)
|
||||
ensure_process_locale()
|
||||
logger.info(
|
||||
"CTP 连接 [%s] user=%s td=%s env=%s",
|
||||
mode,
|
||||
setting.get("用户名"),
|
||||
setting.get("交易服务器"),
|
||||
setting.get("柜台环境", "实盘"),
|
||||
)
|
||||
self._engine.connect(setting, GATEWAY_NAME)
|
||||
for _ in range(60):
|
||||
accounts = self._engine.get_all_accounts()
|
||||
if accounts:
|
||||
self._connected_mode = mode
|
||||
self._last_error = ""
|
||||
logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts))
|
||||
self._schedule_fee_sync(mode)
|
||||
return
|
||||
time.sleep(0.5)
|
||||
finally:
|
||||
self._ee.unregister(EVENT_LOG, _on_log)
|
||||
|
||||
ctp_logs: list[str] = []
|
||||
from vnpy.trader.event import EVENT_LOG
|
||||
hint = _format_ctp_failure(ctp_logs)
|
||||
self._last_error = hint
|
||||
raise RuntimeError(hint)
|
||||
finally:
|
||||
self._connect_in_progress = False
|
||||
|
||||
def _on_log(event) -> None:
|
||||
msg = getattr(event.data, "msg", "") or str(event.data)
|
||||
if msg:
|
||||
ctp_logs.append(str(msg))
|
||||
if len(ctp_logs) > 20:
|
||||
ctp_logs.pop(0)
|
||||
logger.info("CTP | %s", msg)
|
||||
def start_connect_async(self, mode: str, *, force: bool = False) -> dict[str, Any]:
|
||||
"""后台连接,不阻塞 HTTP 请求。"""
|
||||
if self._connected_mode == mode and self.ping() and not force:
|
||||
return {"started": False, "connecting": False, "connected": True}
|
||||
if self._connect_in_progress:
|
||||
return {"started": False, "connecting": True, "connected": False}
|
||||
|
||||
self._ee.register(EVENT_LOG, _on_log)
|
||||
def _run() -> None:
|
||||
try:
|
||||
ensure_process_locale()
|
||||
logger.info(
|
||||
"CTP 连接 [%s] user=%s td=%s env=%s",
|
||||
mode,
|
||||
setting.get("用户名"),
|
||||
setting.get("交易服务器"),
|
||||
setting.get("柜台环境", "实盘"),
|
||||
)
|
||||
self._engine.connect(setting, GATEWAY_NAME)
|
||||
# 等待登录与结算信息(最多约 30 秒)
|
||||
for _ in range(60):
|
||||
accounts = self._engine.get_all_accounts()
|
||||
if accounts:
|
||||
self._connected_mode = mode
|
||||
self._last_error = ""
|
||||
logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts))
|
||||
self._schedule_fee_sync(mode)
|
||||
return
|
||||
time.sleep(0.5)
|
||||
finally:
|
||||
self._ee.unregister(EVENT_LOG, _on_log)
|
||||
self.connect(mode, force=force)
|
||||
except Exception as exc:
|
||||
logger.warning("CTP 后台连接失败: %s", exc)
|
||||
|
||||
hint = _format_ctp_failure(ctp_logs)
|
||||
self._last_error = hint
|
||||
raise RuntimeError(hint)
|
||||
threading.Thread(target=_run, daemon=True, name="ctp-connect-async").start()
|
||||
return {"started": True, "connecting": True, "connected": False}
|
||||
|
||||
def ensure_connected(self, mode: str) -> None:
|
||||
if self._connected_mode == mode and self.ping():
|
||||
@@ -226,6 +261,7 @@ class CtpBridge:
|
||||
"""连接成功后触发每日同步检查(非每次全量)。"""
|
||||
|
||||
def _run() -> None:
|
||||
time.sleep(45)
|
||||
try:
|
||||
from ctp_fee_worker import try_daily_ctp_fee_sync
|
||||
|
||||
@@ -708,18 +744,28 @@ def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
|
||||
return b.status(mode)
|
||||
|
||||
|
||||
def ctp_start_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
|
||||
"""非阻塞发起连接,供 Web API 使用。"""
|
||||
b = get_bridge()
|
||||
info = b.start_connect_async(mode, force=force)
|
||||
st = b.status(mode)
|
||||
return {**info, "status": st}
|
||||
|
||||
|
||||
def ctp_try_auto_reconnect(mode: str) -> bool:
|
||||
"""断线时静默重连;已连接且 ping 正常则直接返回 True。"""
|
||||
b = get_bridge()
|
||||
if not b.available():
|
||||
return False
|
||||
if b.connect_in_progress():
|
||||
return False
|
||||
st = _setting_for_mode(mode)
|
||||
if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"):
|
||||
return False
|
||||
if b.connected_mode == mode and b.ping():
|
||||
return True
|
||||
try:
|
||||
b.connect(mode, force=True)
|
||||
b.connect(mode, force=False)
|
||||
return b.connected_mode == mode
|
||||
except Exception as exc:
|
||||
logger.info("CTP 自动重连失败: %s", exc)
|
||||
|
||||
Reference in New Issue
Block a user