diff --git a/ctp_reconnect.py b/ctp_reconnect.py index ddb7d33..95b1b48 100644 --- a/ctp_reconnect.py +++ b/ctp_reconnect.py @@ -11,7 +11,7 @@ from vnpy_bridge import ctp_try_auto_reconnect logger = logging.getLogger(__name__) -RECONNECT_INTERVAL_SEC = 10 +RECONNECT_INTERVAL_SEC = 60 def _auto_reconnect_enabled() -> bool: diff --git a/install_trading.py b/install_trading.py index dbe50f8..ef6e091 100644 --- a/install_trading.py +++ b/install_trading.py @@ -80,6 +80,7 @@ from trading_context import ( ) from ctp_symbol import ths_to_vnpy_symbol from vnpy_bridge import ( + _ctp_td_lock, ctp_cancel_order, ctp_connect, ctp_get_account, @@ -1029,7 +1030,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se mode = get_trading_mode(get_setting) if not fast and ctp_status(mode).get("connected"): try: - get_bridge().refresh_positions() + with _ctp_td_lock: + get_bridge().refresh_positions() except Exception as exc: logger.debug("refresh positions before snapshot: %s", exc) conn = get_db() @@ -2240,8 +2242,15 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se notify_fn=send_wechat_msg, interval=1, ) + _pos_refresh_tick = {"n": 0} + + def _position_worker_refresh() -> dict: + _pos_refresh_tick["n"] += 1 + # 每秒轻量刷新;每 5 秒做一次 CTP 持仓/挂单对账,避免频繁 query 导致 vnctptd 崩溃 + return _refresh_trading_live_snapshot(fast=(_pos_refresh_tick["n"] % 5 != 0)) + start_position_worker( - refresh_fn=lambda: _refresh_trading_live_snapshot(fast=False), + refresh_fn=_position_worker_refresh, interval=1, ) _bootstrap_trading_runtime() diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 0d8528d..80ab22d 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -758,7 +758,7 @@ def start_sl_tp_guard_worker( from db_conn import connect_db def _loop() -> None: - time.sleep(8) + time.sleep(20) while True: sleep_sec = max(1, interval) try: diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 58a5a9c..dde7012 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -86,6 +86,9 @@ def _fire_position_refresh_callback() -> None: _bridge: Optional["CtpBridge"] = None _bridge_lock = threading.Lock() +_ctp_td_lock = threading.RLock() +POSITION_QUERY_MIN_INTERVAL_SEC = 5.0 +TRADE_QUERY_MIN_INTERVAL_SEC = 10.0 def _simnow_setting() -> dict[str, str]: @@ -193,6 +196,7 @@ class CtpBridge: self._trade_query_results: list[dict[str, Any]] = [] self._trade_query_event = threading.Event() self._last_trade_query_ts: float = 0.0 + self._last_connect_ok_ts: float = 0.0 self._tick_hooked = False self._bar_generators: dict[str, Any] = {} self._bars_1m: dict[str, deque] = {} @@ -351,75 +355,72 @@ class CtpBridge: self._connect_in_progress = True try: - with self._connect_lock: - if force and self._connected_mode: - self._close_gateway() - elif self._connected_mode and self._connected_mode != mode: - try: - self._engine.close() - except Exception: - pass - self._connected_mode = None - time.sleep(1) - elif not (self._connected_mode == mode and self.ping()): - self._close_gateway() - - ctp_logs: list[str] = [] - from vnpy.trader.event import EVENT_LOG - - def _on_log(event) -> None: - msg = getattr(event.data, "msg", "") or str(event.data) - if msg: - ctp_logs.append(str(msg)) - if len(ctp_logs) > 40: - ctp_logs.pop(0) - logger.info("CTP | %s", msg) - - self._ee.register(EVENT_LOG, _on_log) - try: - ensure_process_locale() - logger.info( - "CTP 连接 [%s] user=%s td=%s env=%s", - mode, - setting.get("用户名"), - setting.get("交易服务器"), - setting.get("柜台环境", "实盘"), - ) - td_addr = setting.get("交易服务器", "") - ok, err = probe_tcp_address(td_addr, timeout=5.0) - if not ok: - raise RuntimeError( - f"SimNow 交易前置不可达:{td_addr}({err})。" - "请更新 .env 中 SIMNOW_TD_ADDRESS 为官网最新地址," - "并在服务器执行 nc -zv 验证出网。" - ) - self._engine.connect(setting, GATEWAY_NAME) - if self._wait_connected(mode, ctp_logs): - self._connected_mode = mode - self._last_error = "" - _persist_last_error("") - self._clear_login_cooldown() - logger.info("CTP 已连接 [%s] td_login=%s accounts=%s", - mode, self._td_logged_in(), - len(self._engine.get_all_accounts() or [])) - self._install_position_margin_hook() - self._schedule_fee_sync(mode) + with _ctp_td_lock: + with self._connect_lock: + if force and self._connected_mode: + self._close_gateway() + elif self._connected_mode and self._connected_mode != mode: try: - self.refresh_positions() - except Exception as exc: - logger.debug("initial position query: %s", exc) - _fire_position_refresh_callback() - return - finally: - self._ee.unregister(EVENT_LOG, _on_log) + self._engine.close() + except Exception: + pass + self._connected_mode = None + time.sleep(1) + elif not (self._connected_mode == mode and self.ping()): + self._close_gateway() - self._close_gateway() - self._apply_login_failure_cooldown(ctp_logs) - hint = _format_ctp_failure(ctp_logs, td_address=setting.get("交易服务器", "")) - self._last_error = hint - _persist_last_error(hint) - logger.warning("CTP 连接失败 [%s]: %s | logs=%s", mode, hint, ctp_logs[-5:]) - raise RuntimeError(hint) + ctp_logs: list[str] = [] + from vnpy.trader.event import EVENT_LOG + + def _on_log(event) -> None: + msg = getattr(event.data, "msg", "") or str(event.data) + if msg: + ctp_logs.append(str(msg)) + if len(ctp_logs) > 40: + ctp_logs.pop(0) + logger.info("CTP | %s", msg) + + self._ee.register(EVENT_LOG, _on_log) + try: + ensure_process_locale() + logger.info( + "CTP 连接 [%s] user=%s td=%s env=%s", + mode, + setting.get("用户名"), + setting.get("交易服务器"), + setting.get("柜台环境", "实盘"), + ) + td_addr = setting.get("交易服务器", "") + ok, err = probe_tcp_address(td_addr, timeout=5.0) + if not ok: + raise RuntimeError( + f"SimNow 交易前置不可达:{td_addr}({err})。" + "请更新 .env 中 SIMNOW_TD_ADDRESS 为官网最新地址," + "并在服务器执行 nc -zv 验证出网。" + ) + self._engine.connect(setting, GATEWAY_NAME) + if self._wait_connected(mode, ctp_logs): + self._connected_mode = mode + self._last_connect_ok_ts = time.time() + self._last_error = "" + _persist_last_error("") + self._clear_login_cooldown() + logger.info("CTP 已连接 [%s] td_login=%s accounts=%s", + mode, self._td_logged_in(), + len(self._engine.get_all_accounts() or [])) + self._schedule_fee_sync(mode) + _fire_position_refresh_callback() + return + finally: + self._ee.unregister(EVENT_LOG, _on_log) + + self._close_gateway() + self._apply_login_failure_cooldown(ctp_logs) + hint = _format_ctp_failure(ctp_logs, td_address=setting.get("交易服务器", "")) + self._last_error = hint + _persist_last_error(hint) + logger.warning("CTP 连接失败 [%s]: %s | logs=%s", mode, hint, ctp_logs[-5:]) + raise RuntimeError(hint) finally: self._connect_in_progress = False @@ -539,6 +540,8 @@ class CtpBridge: """检测连接是否仍有效;无效则清除 connected 状态。""" if not self._engine or not self._connected_mode: return False + if self._td_logged_in(): + return True try: if self._engine.get_all_accounts(): return True @@ -927,56 +930,8 @@ class CtpBridge: return CtpBridge._parse_ctp_open_datetime(raw, "") def _install_position_margin_hook(self) -> None: - """拦截 CTP 持仓回报,缓存柜台 UseMargin。""" - if self._margin_hooked or not self._engine: - return - try: - gw = self._engine.get_gateway(GATEWAY_NAME) - td = getattr(gw, "td_api", None) - if not td or not hasattr(td, "onRspQryInvestorPosition"): - return - bridge = self - original = td.onRspQryInvestorPosition - - def _wrapped(data, error, reqid, last): - try: - if data and isinstance(data, dict): - sym = (data.get("InstrumentID") or "").strip() - pos_dir = str(data.get("PosiDirection") or "") - if pos_dir == "2": - d = "long" - elif pos_dir == "3": - d = "short" - else: - d = "long" if "LONG" in pos_dir.upper() else "short" - margin = float( - data.get("UseMargin") or data.get("ExchangeMargin") or 0 - ) - if sym and margin > 0: - k = bridge._position_margin_key(sym, d) - bridge._position_margins[k] = ( - bridge._position_margins.get(k, 0.0) + margin - ) - open_date = bridge._parse_ctp_open_datetime( - str(data.get("OpenDate") or data.get("open_date") or ""), - str( - data.get("OpenTime") or data.get("open_time") - or data.get("TradeTime") or "" - ), - ) - if sym and open_date: - k = bridge._position_margin_key(sym, d) - prev = bridge._position_open_times.get(k, "") - if not prev or open_date < prev: - bridge._position_open_times[k] = open_date - except Exception as exc: - logger.debug("margin hook row: %s", exc) - return original(data, error, reqid, last) - - td.onRspQryInvestorPosition = _wrapped - self._margin_hooked = True - except Exception as exc: - logger.debug("install margin hook: %s", exc) + """已禁用:monkey-patch CTP 持仓回调在并发下会触发 vnctptd 段错误。""" + return def _lookup_position_margin(self, sym: str, direction: str) -> float: return float(self._position_margins.get(self._position_margin_key(sym, direction), 0) or 0) @@ -1031,33 +986,13 @@ class CtpBridge: return out def refresh_positions(self) -> None: - """向柜台查询持仓(内存为空时补拉)。""" - if not self._engine: - return - now = time.time() - if now - self._last_position_query_ts < 1.0: - return - self._last_position_query_ts = now - try: - self._install_position_margin_hook() - gw = self._engine.get_gateway(GATEWAY_NAME) - td = getattr(gw, "td_api", None) - if td and hasattr(td, "query_position"): - self._position_margins.clear() - self._position_open_times.clear() - td.query_position() - time.sleep(0.4) - except Exception as exc: - logger.debug("refresh_positions: %s", exc) + """vnpy 内存缓存持仓;禁止 query_position(vnctptd 并发查询会段错误)。""" + return def list_positions(self, *, refresh_if_empty: bool = True, refresh_margin: bool = False) -> list[dict[str, Any]]: - if self._engine and self._connected_mode and refresh_margin: - self.refresh_positions() - out = self._collect_positions() - if not out and refresh_if_empty: - self.refresh_positions() - out = self._collect_positions() - return out + del refresh_if_empty, refresh_margin + with _ctp_td_lock: + return self._collect_positions() @staticmethod def _parse_trade_offset(offset_obj: Any) -> str: @@ -1161,33 +1096,8 @@ class CtpBridge: return None def _install_trade_query_hook(self) -> None: - if self._trade_hooked or not self._engine: - return - try: - gw = self._engine.get_gateway(GATEWAY_NAME) - td = getattr(gw, "td_api", None) - if not td or not hasattr(td, "onRspQryTrade"): - return - bridge = self - original = td.onRspQryTrade - - def _wrapped(data, error, reqid, last): - try: - if data and isinstance(data, dict): - row = bridge._trade_row_from_ctp_dict(data) - if row: - bridge._trade_query_results.append(row) - except Exception as exc: - logger.debug("trade hook row: %s", exc) - result = original(data, error, reqid, last) - if last: - bridge._trade_query_event.set() - return result - - td.onRspQryTrade = _wrapped - self._trade_hooked = True - except Exception as exc: - logger.debug("install trade hook: %s", exc) + """不再 monkey-patch CTP 成交回调(易与并发查询冲突导致 vnctptd 段错误)。""" + return def _collect_engine_trades(self) -> list[dict[str, Any]]: if not self._engine: @@ -1210,36 +1120,14 @@ class CtpBridge: return out def refresh_trades(self) -> None: - """向柜台查询当日成交(并合并内存成交回报)。""" - if not self._engine: - return - now = time.time() - if now - self._last_trade_query_ts < 1.0: - return - self._last_trade_query_ts = now - self._trade_query_results = [] - self._trade_query_event.clear() - try: - self._install_trade_query_hook() - gw = self._engine.get_gateway(GATEWAY_NAME) - td = getattr(gw, "td_api", None) - if td and hasattr(td, "query_trade"): - td.query_trade() - self._trade_query_event.wait(timeout=2.0) - except Exception as exc: - logger.debug("refresh_trades: %s", exc) + """成交仅读 vnpy 内存回报;不调用 query_trade(避免 CTP 段错误)。""" + return def list_trades(self, *, refresh: bool = False) -> list[dict[str, Any]]: - if refresh: - self.refresh_trades() - merged: dict[str, dict[str, Any]] = {} - for row in self._collect_engine_trades(): - merged[row["trade_id"]] = row - for row in self._trade_query_results: - merged[row["trade_id"]] = row - out = list(merged.values()) - out.sort(key=lambda r: (r.get("datetime") or "", r.get("trade_id") or "")) - return out + with _ctp_td_lock: + out = self._collect_engine_trades() + out.sort(key=lambda r: (r.get("datetime") or "", r.get("trade_id") or "")) + return out def list_active_orders(self) -> list[dict[str, Any]]: if not self._engine: @@ -1342,7 +1230,8 @@ class CtpBridge: "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s", sym, ex_name, d, lots, price, off, ot, ) - vt_orderid = self._engine.send_order(req, GATEWAY_NAME) + with _ctp_td_lock: + vt_orderid = self._engine.send_order(req, GATEWAY_NAME) if not vt_orderid: raise RuntimeError("CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)") return str(vt_orderid) @@ -1351,11 +1240,12 @@ class CtpBridge: if not self._engine or not vt_orderid: return False try: - order = self._engine.get_order(vt_orderid) - if order is None: - return False - req = order.create_cancel_request() - self._engine.cancel_order(req, GATEWAY_NAME) + with _ctp_td_lock: + order = self._engine.get_order(vt_orderid) + if order is None: + return False + req = order.create_cancel_request() + self._engine.cancel_order(req, GATEWAY_NAME) logger.info("CTP 撤单 %s", vt_orderid) return True except Exception as exc: @@ -1394,7 +1284,7 @@ def ctp_start_connect(mode: str, *, force: bool = False) -> dict[str, Any]: def ctp_try_auto_reconnect(mode: str) -> bool: - """断线时静默异步重连;已连接且 ping 正常则直接返回 True。""" + """断线时静默异步重连;已连接且交易通道正常则不再重复 connect。""" b = get_bridge() if not b.available(): return False @@ -1405,8 +1295,13 @@ def ctp_try_auto_reconnect(mode: str) -> bool: st = _setting_for_mode(mode) if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"): return False - if b.connected_mode == mode and b.ping(): - return True + if b.connected_mode == mode: + if b._td_logged_in() or b.ping(): + return True + recent = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0) + if recent < 120: + logger.debug("CTP 跳过自动重连:刚连接 %.0fs", recent) + return True td = st.get("交易服务器", "") ok, err = probe_tcp_address(td, timeout=4.0) if not ok: