Fix CTP vnctptd segfault restart loop by serializing reconnect.

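Skip the duplicate auto-connect when the TD channel is already logged in, disable the monkey-patched query_position/query_trade hooks and the counter queries they served, and throttle the position worker to one full CTP reconciliation every fifth one-second tick.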

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-26 00:49:17 +08:00
parent 9f48f22d16
commit 7133a0e448
4 changed files with 114 additions and 210 deletions
+1 -1
View File
@@ -11,7 +11,7 @@ from vnpy_bridge import ctp_try_auto_reconnect
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
RECONNECT_INTERVAL_SEC = 10 RECONNECT_INTERVAL_SEC = 60
def _auto_reconnect_enabled() -> bool: def _auto_reconnect_enabled() -> bool:
+11 -2
View File
@@ -80,6 +80,7 @@ from trading_context import (
) )
from ctp_symbol import ths_to_vnpy_symbol from ctp_symbol import ths_to_vnpy_symbol
from vnpy_bridge import ( from vnpy_bridge import (
_ctp_td_lock,
ctp_cancel_order, ctp_cancel_order,
ctp_connect, ctp_connect,
ctp_get_account, ctp_get_account,
@@ -1029,7 +1030,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
mode = get_trading_mode(get_setting) mode = get_trading_mode(get_setting)
if not fast and ctp_status(mode).get("connected"): if not fast and ctp_status(mode).get("connected"):
try: try:
get_bridge().refresh_positions() with _ctp_td_lock:
get_bridge().refresh_positions()
except Exception as exc: except Exception as exc:
logger.debug("refresh positions before snapshot: %s", exc) logger.debug("refresh positions before snapshot: %s", exc)
conn = get_db() conn = get_db()
@@ -2240,8 +2242,15 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
notify_fn=send_wechat_msg, notify_fn=send_wechat_msg,
interval=1, interval=1,
) )
_pos_refresh_tick = {"n": 0}
def _position_worker_refresh() -> dict:
_pos_refresh_tick["n"] += 1
# 每秒轻量刷新;每 5 秒做一次 CTP 持仓/挂单对账,避免频繁 query 导致 vnctptd 崩溃
return _refresh_trading_live_snapshot(fast=(_pos_refresh_tick["n"] % 5 != 0))
start_position_worker( start_position_worker(
refresh_fn=lambda: _refresh_trading_live_snapshot(fast=False), refresh_fn=_position_worker_refresh,
interval=1, interval=1,
) )
_bootstrap_trading_runtime() _bootstrap_trading_runtime()
+1 -1
View File
@@ -758,7 +758,7 @@ def start_sl_tp_guard_worker(
from db_conn import connect_db from db_conn import connect_db
def _loop() -> None: def _loop() -> None:
time.sleep(8) time.sleep(20)
while True: while True:
sleep_sec = max(1, interval) sleep_sec = max(1, interval)
try: try:
+101 -206
View File
@@ -86,6 +86,9 @@ def _fire_position_refresh_callback() -> None:
_bridge: Optional["CtpBridge"] = None _bridge: Optional["CtpBridge"] = None
_bridge_lock = threading.Lock() _bridge_lock = threading.Lock()
_ctp_td_lock = threading.RLock()
POSITION_QUERY_MIN_INTERVAL_SEC = 5.0
TRADE_QUERY_MIN_INTERVAL_SEC = 10.0
def _simnow_setting() -> dict[str, str]: def _simnow_setting() -> dict[str, str]:
@@ -193,6 +196,7 @@ class CtpBridge:
self._trade_query_results: list[dict[str, Any]] = [] self._trade_query_results: list[dict[str, Any]] = []
self._trade_query_event = threading.Event() self._trade_query_event = threading.Event()
self._last_trade_query_ts: float = 0.0 self._last_trade_query_ts: float = 0.0
self._last_connect_ok_ts: float = 0.0
self._tick_hooked = False self._tick_hooked = False
self._bar_generators: dict[str, Any] = {} self._bar_generators: dict[str, Any] = {}
self._bars_1m: dict[str, deque] = {} self._bars_1m: dict[str, deque] = {}
@@ -351,75 +355,72 @@ class CtpBridge:
self._connect_in_progress = True self._connect_in_progress = True
try: try:
with self._connect_lock: with _ctp_td_lock:
if force and self._connected_mode: with self._connect_lock:
self._close_gateway() if force and self._connected_mode:
elif self._connected_mode and self._connected_mode != mode: self._close_gateway()
try: elif self._connected_mode and self._connected_mode != mode:
self._engine.close()
except Exception:
pass
self._connected_mode = None
time.sleep(1)
elif not (self._connected_mode == mode and self.ping()):
self._close_gateway()
ctp_logs: list[str] = []
from vnpy.trader.event import EVENT_LOG
def _on_log(event) -> None:
msg = getattr(event.data, "msg", "") or str(event.data)
if msg:
ctp_logs.append(str(msg))
if len(ctp_logs) > 40:
ctp_logs.pop(0)
logger.info("CTP | %s", msg)
self._ee.register(EVENT_LOG, _on_log)
try:
ensure_process_locale()
logger.info(
"CTP 连接 [%s] user=%s td=%s env=%s",
mode,
setting.get("用户名"),
setting.get("交易服务器"),
setting.get("柜台环境", "实盘"),
)
td_addr = setting.get("交易服务器", "")
ok, err = probe_tcp_address(td_addr, timeout=5.0)
if not ok:
raise RuntimeError(
f"SimNow 交易前置不可达:{td_addr}({err})。"
"请更新 .env 中 SIMNOW_TD_ADDRESS 为官网最新地址,"
"并在服务器执行 nc -zv 验证出网。"
)
self._engine.connect(setting, GATEWAY_NAME)
if self._wait_connected(mode, ctp_logs):
self._connected_mode = mode
self._last_error = ""
_persist_last_error("")
self._clear_login_cooldown()
logger.info("CTP 已连接 [%s] td_login=%s accounts=%s",
mode, self._td_logged_in(),
len(self._engine.get_all_accounts() or []))
self._install_position_margin_hook()
self._schedule_fee_sync(mode)
try: try:
self.refresh_positions() self._engine.close()
except Exception as exc: except Exception:
logger.debug("initial position query: %s", exc) pass
_fire_position_refresh_callback() self._connected_mode = None
return time.sleep(1)
finally: elif not (self._connected_mode == mode and self.ping()):
self._ee.unregister(EVENT_LOG, _on_log) self._close_gateway()
self._close_gateway() ctp_logs: list[str] = []
self._apply_login_failure_cooldown(ctp_logs) from vnpy.trader.event import EVENT_LOG
hint = _format_ctp_failure(ctp_logs, td_address=setting.get("交易服务器", ""))
self._last_error = hint def _on_log(event) -> None:
_persist_last_error(hint) msg = getattr(event.data, "msg", "") or str(event.data)
logger.warning("CTP 连接失败 [%s]: %s | logs=%s", mode, hint, ctp_logs[-5:]) if msg:
raise RuntimeError(hint) ctp_logs.append(str(msg))
if len(ctp_logs) > 40:
ctp_logs.pop(0)
logger.info("CTP | %s", msg)
self._ee.register(EVENT_LOG, _on_log)
try:
ensure_process_locale()
logger.info(
"CTP 连接 [%s] user=%s td=%s env=%s",
mode,
setting.get("用户名"),
setting.get("交易服务器"),
setting.get("柜台环境", "实盘"),
)
td_addr = setting.get("交易服务器", "")
ok, err = probe_tcp_address(td_addr, timeout=5.0)
if not ok:
raise RuntimeError(
f"SimNow 交易前置不可达:{td_addr}({err})。"
"请更新 .env 中 SIMNOW_TD_ADDRESS 为官网最新地址,"
"并在服务器执行 nc -zv 验证出网。"
)
self._engine.connect(setting, GATEWAY_NAME)
if self._wait_connected(mode, ctp_logs):
self._connected_mode = mode
self._last_connect_ok_ts = time.time()
self._last_error = ""
_persist_last_error("")
self._clear_login_cooldown()
logger.info("CTP 已连接 [%s] td_login=%s accounts=%s",
mode, self._td_logged_in(),
len(self._engine.get_all_accounts() or []))
self._schedule_fee_sync(mode)
_fire_position_refresh_callback()
return
finally:
self._ee.unregister(EVENT_LOG, _on_log)
self._close_gateway()
self._apply_login_failure_cooldown(ctp_logs)
hint = _format_ctp_failure(ctp_logs, td_address=setting.get("交易服务器", ""))
self._last_error = hint
_persist_last_error(hint)
logger.warning("CTP 连接失败 [%s]: %s | logs=%s", mode, hint, ctp_logs[-5:])
raise RuntimeError(hint)
finally: finally:
self._connect_in_progress = False self._connect_in_progress = False
@@ -539,6 +540,8 @@ class CtpBridge:
"""检测连接是否仍有效;无效则清除 connected 状态。""" """检测连接是否仍有效;无效则清除 connected 状态。"""
if not self._engine or not self._connected_mode: if not self._engine or not self._connected_mode:
return False return False
if self._td_logged_in():
return True
try: try:
if self._engine.get_all_accounts(): if self._engine.get_all_accounts():
return True return True
@@ -927,56 +930,8 @@ class CtpBridge:
return CtpBridge._parse_ctp_open_datetime(raw, "") return CtpBridge._parse_ctp_open_datetime(raw, "")
def _install_position_margin_hook(self) -> None: def _install_position_margin_hook(self) -> None:
"""拦截 CTP 持仓回报,缓存柜台 UseMargin""" """已禁用:monkey-patch CTP 持仓回调在并发下会触发 vnctptd 段错误"""
if self._margin_hooked or not self._engine: return
return
try:
gw = self._engine.get_gateway(GATEWAY_NAME)
td = getattr(gw, "td_api", None)
if not td or not hasattr(td, "onRspQryInvestorPosition"):
return
bridge = self
original = td.onRspQryInvestorPosition
def _wrapped(data, error, reqid, last):
try:
if data and isinstance(data, dict):
sym = (data.get("InstrumentID") or "").strip()
pos_dir = str(data.get("PosiDirection") or "")
if pos_dir == "2":
d = "long"
elif pos_dir == "3":
d = "short"
else:
d = "long" if "LONG" in pos_dir.upper() else "short"
margin = float(
data.get("UseMargin") or data.get("ExchangeMargin") or 0
)
if sym and margin > 0:
k = bridge._position_margin_key(sym, d)
bridge._position_margins[k] = (
bridge._position_margins.get(k, 0.0) + margin
)
open_date = bridge._parse_ctp_open_datetime(
str(data.get("OpenDate") or data.get("open_date") or ""),
str(
data.get("OpenTime") or data.get("open_time")
or data.get("TradeTime") or ""
),
)
if sym and open_date:
k = bridge._position_margin_key(sym, d)
prev = bridge._position_open_times.get(k, "")
if not prev or open_date < prev:
bridge._position_open_times[k] = open_date
except Exception as exc:
logger.debug("margin hook row: %s", exc)
return original(data, error, reqid, last)
td.onRspQryInvestorPosition = _wrapped
self._margin_hooked = True
except Exception as exc:
logger.debug("install margin hook: %s", exc)
def _lookup_position_margin(self, sym: str, direction: str) -> float: def _lookup_position_margin(self, sym: str, direction: str) -> float:
return float(self._position_margins.get(self._position_margin_key(sym, direction), 0) or 0) return float(self._position_margins.get(self._position_margin_key(sym, direction), 0) or 0)
@@ -1031,33 +986,13 @@ class CtpBridge:
return out return out
def refresh_positions(self) -> None: def refresh_positions(self) -> None:
"""向柜台查询持仓(内存为空时补拉)。""" """vnpy 内存缓存持仓;禁止 query_position(vnctptd 并发查询会段错误)。"""
if not self._engine: return
return
now = time.time()
if now - self._last_position_query_ts < 1.0:
return
self._last_position_query_ts = now
try:
self._install_position_margin_hook()
gw = self._engine.get_gateway(GATEWAY_NAME)
td = getattr(gw, "td_api", None)
if td and hasattr(td, "query_position"):
self._position_margins.clear()
self._position_open_times.clear()
td.query_position()
time.sleep(0.4)
except Exception as exc:
logger.debug("refresh_positions: %s", exc)
def list_positions(self, *, refresh_if_empty: bool = True, refresh_margin: bool = False) -> list[dict[str, Any]]: def list_positions(self, *, refresh_if_empty: bool = True, refresh_margin: bool = False) -> list[dict[str, Any]]:
if self._engine and self._connected_mode and refresh_margin: del refresh_if_empty, refresh_margin
self.refresh_positions() with _ctp_td_lock:
out = self._collect_positions() return self._collect_positions()
if not out and refresh_if_empty:
self.refresh_positions()
out = self._collect_positions()
return out
@staticmethod @staticmethod
def _parse_trade_offset(offset_obj: Any) -> str: def _parse_trade_offset(offset_obj: Any) -> str:
@@ -1161,33 +1096,8 @@ class CtpBridge:
return None return None
def _install_trade_query_hook(self) -> None: def _install_trade_query_hook(self) -> None:
if self._trade_hooked or not self._engine: """不再 monkey-patch CTP 成交回调(易与并发查询冲突导致 vnctptd 段错误)。"""
return return
try:
gw = self._engine.get_gateway(GATEWAY_NAME)
td = getattr(gw, "td_api", None)
if not td or not hasattr(td, "onRspQryTrade"):
return
bridge = self
original = td.onRspQryTrade
def _wrapped(data, error, reqid, last):
try:
if data and isinstance(data, dict):
row = bridge._trade_row_from_ctp_dict(data)
if row:
bridge._trade_query_results.append(row)
except Exception as exc:
logger.debug("trade hook row: %s", exc)
result = original(data, error, reqid, last)
if last:
bridge._trade_query_event.set()
return result
td.onRspQryTrade = _wrapped
self._trade_hooked = True
except Exception as exc:
logger.debug("install trade hook: %s", exc)
def _collect_engine_trades(self) -> list[dict[str, Any]]: def _collect_engine_trades(self) -> list[dict[str, Any]]:
if not self._engine: if not self._engine:
@@ -1210,36 +1120,14 @@ class CtpBridge:
return out return out
def refresh_trades(self) -> None: def refresh_trades(self) -> None:
"""向柜台查询当日成交(并合并内存成交回报)。""" """成交仅读 vnpy 内存回报;不调用 query_trade(避免 CTP 段错误)。"""
if not self._engine: return
return
now = time.time()
if now - self._last_trade_query_ts < 1.0:
return
self._last_trade_query_ts = now
self._trade_query_results = []
self._trade_query_event.clear()
try:
self._install_trade_query_hook()
gw = self._engine.get_gateway(GATEWAY_NAME)
td = getattr(gw, "td_api", None)
if td and hasattr(td, "query_trade"):
td.query_trade()
self._trade_query_event.wait(timeout=2.0)
except Exception as exc:
logger.debug("refresh_trades: %s", exc)
def list_trades(self, *, refresh: bool = False) -> list[dict[str, Any]]: def list_trades(self, *, refresh: bool = False) -> list[dict[str, Any]]:
if refresh: with _ctp_td_lock:
self.refresh_trades() out = self._collect_engine_trades()
merged: dict[str, dict[str, Any]] = {} out.sort(key=lambda r: (r.get("datetime") or "", r.get("trade_id") or ""))
for row in self._collect_engine_trades(): return out
merged[row["trade_id"]] = row
for row in self._trade_query_results:
merged[row["trade_id"]] = row
out = list(merged.values())
out.sort(key=lambda r: (r.get("datetime") or "", r.get("trade_id") or ""))
return out
def list_active_orders(self) -> list[dict[str, Any]]: def list_active_orders(self) -> list[dict[str, Any]]:
if not self._engine: if not self._engine:
@@ -1342,7 +1230,8 @@ class CtpBridge:
"CTP 报单 %s %s %s %s手 @%s offset=%s type=%s", "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s",
sym, ex_name, d, lots, price, off, ot, sym, ex_name, d, lots, price, off, ot,
) )
vt_orderid = self._engine.send_order(req, GATEWAY_NAME) with _ctp_td_lock:
vt_orderid = self._engine.send_order(req, GATEWAY_NAME)
if not vt_orderid: if not vt_orderid:
raise RuntimeError("CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)") raise RuntimeError("CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)")
return str(vt_orderid) return str(vt_orderid)
@@ -1351,11 +1240,12 @@ class CtpBridge:
if not self._engine or not vt_orderid: if not self._engine or not vt_orderid:
return False return False
try: try:
order = self._engine.get_order(vt_orderid) with _ctp_td_lock:
if order is None: order = self._engine.get_order(vt_orderid)
return False if order is None:
req = order.create_cancel_request() return False
self._engine.cancel_order(req, GATEWAY_NAME) req = order.create_cancel_request()
self._engine.cancel_order(req, GATEWAY_NAME)
logger.info("CTP 撤单 %s", vt_orderid) logger.info("CTP 撤单 %s", vt_orderid)
return True return True
except Exception as exc: except Exception as exc:
@@ -1394,7 +1284,7 @@ def ctp_start_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
def ctp_try_auto_reconnect(mode: str) -> bool: def ctp_try_auto_reconnect(mode: str) -> bool:
"""断线时静默异步重连;已连接且 ping 正常则直接返回 True""" """断线时静默异步重连;已连接且交易通道正常则不再重复 connect"""
b = get_bridge() b = get_bridge()
if not b.available(): if not b.available():
return False return False
@@ -1405,8 +1295,13 @@ def ctp_try_auto_reconnect(mode: str) -> bool:
st = _setting_for_mode(mode) st = _setting_for_mode(mode)
if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"): if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"):
return False return False
if b.connected_mode == mode and b.ping(): if b.connected_mode == mode:
return True if b._td_logged_in() or b.ping():
return True
recent = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0)
if recent < 120:
logger.debug("CTP 跳过自动重连:刚连接 %.0fs", recent)
return True
td = st.get("交易服务器", "") td = st.get("交易服务器", "")
ok, err = probe_tcp_address(td, timeout=4.0) ok, err = probe_tcp_address(td, timeout=4.0)
if not ok: if not ok: