feat: CTP 断线重连、下单卡片优化、手数自动计算

后台每 30s 检测并重连;以损定仓填止损后自动算手数;开仓/平仓按钮并排对齐。

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-24 11:56:20 +08:00
parent 36e26973fb
commit b4250171d5
7 changed files with 177 additions and 45 deletions
+41 -3
View File
@@ -115,6 +115,8 @@ class CtpBridge:
return self._connected_mode
def status(self, mode: str) -> dict[str, Any]:
if self._connected_mode == mode:
self.ping()
st = _setting_for_mode(mode)
missing = [k for k in ("用户名", "密码", "交易服务器") if not st.get(k)]
return {
@@ -132,7 +134,9 @@ class CtpBridge:
if not self._engine:
raise RuntimeError(self._last_error or "vnpy 引擎未初始化")
if self._connected_mode == mode and not force:
return
if self.ping():
return
self._connected_mode = None
setting = _setting_for_mode(mode)
if not setting.get("用户名") or not setting.get("密码"):
raise ValueError(
@@ -190,8 +194,24 @@ class CtpBridge:
raise RuntimeError(hint)
def ensure_connected(self, mode: str) -> None:
if self._connected_mode != mode:
self.connect(mode)
if self._connected_mode == mode and self.ping():
return
self.connect(mode)
def ping(self) -> bool:
"""检测连接是否仍有效;无效则清除 connected 状态。"""
if not self._engine or not self._connected_mode:
return False
try:
if self._engine.get_all_accounts():
return True
except Exception as exc:
logger.debug("CTP ping failed: %s", exc)
self._connected_mode = None
return False
def mark_disconnected(self) -> None:
self._connected_mode = None
def get_account(self) -> dict[str, Any]:
if not self._engine:
@@ -310,6 +330,24 @@ def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
return b.status(mode)
def ctp_try_auto_reconnect(mode: str) -> bool:
"""断线时静默重连;已连接且 ping 正常则直接返回 True。"""
b = get_bridge()
if not b.available():
return False
st = _setting_for_mode(mode)
if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"):
return False
if b.connected_mode == mode and b.ping():
return True
try:
b.connect(mode, force=True)
return b.connected_mode == mode
except Exception as exc:
logger.info("CTP 自动重连失败: %s", exc)
return False
def ctp_status(mode: str) -> dict[str, Any]:
return get_bridge().status(mode)