"""CTP execution layer: paper trading goes to SimNow, live trading to the futures broker (via vnpy_ctp)."""
from __future__ import annotations

import logging
import os
import threading
import time
from typing import Any, Optional

from locale_fix import ensure_process_locale

# Must run before the remaining project imports, exactly as in the original module order.
ensure_process_locale()

from ctp_symbol import ths_to_vnpy_symbol, to_vnpy_exchange  # noqa: E402

logger = logging.getLogger(__name__)

GATEWAY_NAME = "CTP"

# Process-wide bridge singleton; created lazily by get_bridge() under _bridge_lock.
_bridge: Optional["CtpBridge"] = None
_bridge_lock = threading.Lock()


def _env(key: str, default: str = "") -> str:
    """Return environment variable *key* stripped of whitespace.

    An unset or empty variable falls back to *default* (also stripped).
    """
    return (os.getenv(key) or default).strip()


def _simnow_setting() -> dict[str, str]:
    """Build the SimNow connection settings; every field can be overridden via the environment.

    The keys are the Chinese field names that are handed unchanged to
    ``MainEngine.connect``.  Original author's note: the see-through
    supervision front requires the counter environment ("柜台环境") to be "实盘".
    """
    return {
        "用户名": _env("SIMNOW_USER"),
        "密码": _env("SIMNOW_PASSWORD"),
        "经纪商代码": _env("SIMNOW_BROKER_ID", "9999"),
        "交易服务器": _env("SIMNOW_TD_ADDRESS", "tcp://180.168.146.187:10201"),
        "行情服务器": _env("SIMNOW_MD_ADDRESS", "tcp://180.168.146.187:10211"),
        "产品名称": _env("SIMNOW_APP_ID", "simnow_client_test"),
        "授权编码": _env("SIMNOW_AUTH_CODE", "0000000000000000"),
        "柜台环境": _env("SIMNOW_ENV", "实盘"),
    }


def _live_setting() -> dict[str, str]:
    """Build the live-broker connection settings from the ``CTP_LIVE_*`` environment variables.

    Only the counter environment has a default ("实盘"); every other field is
    empty when its variable is unset.
    """
    return {
        "用户名": _env("CTP_LIVE_USER"),
        "密码": _env("CTP_LIVE_PASSWORD"),
        "经纪商代码": _env("CTP_LIVE_BROKER_ID"),
        "交易服务器": _env("CTP_LIVE_TD_ADDRESS"),
        "行情服务器": _env("CTP_LIVE_MD_ADDRESS"),
        "产品名称": _env("CTP_LIVE_APP_ID"),
        "授权编码": _env("CTP_LIVE_AUTH_CODE"),
        "柜台环境": _env("CTP_LIVE_ENV", "实盘"),
    }


def _setting_for_mode(mode: str) -> dict[str, str]:
    """Return the SimNow settings for ``"simulation"`` and the live settings for any other mode."""
    return _simnow_setting() if mode == "simulation" else _live_setting()


def _mode_label(mode: str) -> str:
    """Return the user-facing label for *mode* (anything but ``"simulation"`` is labelled live)."""
    return "SimNow 模拟" if mode == "simulation" else "期货公司实盘"


def _format_ctp_failure(ctp_logs: list[str]) -> str:
    """Turn the captured CTP gateway log lines into one readable error message.

    The checks run in priority order: handshake failure, rejected login,
    dropped connection, generic failure (last log line), and finally a
    timeout message when nothing was logged at all.
    """
    text = "\n".join(ctp_logs)
    lowered = text.lower()
    tail = ctp_logs[-1] if ctp_logs else ""

    if "4097" in text or "Decrypt handshake" in text or "shake hand" in lowered:
        return (
            "CTP 握手失败(4097):vnpy_ctp 与 SimNow 前置加密不匹配。"
            "请执行 pip install -U vnpy vnpy_ctp 后重启,并确认 .env 中 SIMNOW_ENV=实盘"
        )
    if "不合法的登录" in text or "密码" in text or "账号" in text:
        return f"CTP 登录被拒:{tail or '请检查投资者代码与密码(快期能否登录)'}"
    if "连接断开" in text or "disconnect" in lowered:
        return f"CTP 连接断开:{tail or '请检查前置地址与网络'}"
    if ctp_logs:
        return f"CTP 连接失败:{tail}"
    return "CTP 连接超时:未收到柜台回报。请检查 SimNow 账号、前置地址、网络(nc 测端口),并用快期验证账号"


def _direction_label(direction: Any) -> str:
    """Map a position/order direction to ``"long"`` or ``"short"``.

    The text form of *direction* counts as short when it ends with ``SHORT``
    or contains "空"; ``None`` and everything else map to ``"long"``.
    """
    if direction is None:
        return "long"
    text = str(direction)
    return "short" if text.endswith("SHORT") or "空" in text else "long"


def _exchange_name(exchange: Any) -> str:
    """Return ``exchange.value`` as text when present, else the text of *exchange* ("" if falsy)."""
    return str(exchange.value if hasattr(exchange, "value") else exchange or "")


class CtpBridge:
    """Wrapper around a vnpy ``MainEngine`` with a ``CtpGateway`` attached.

    It tracks which mode (``"simulation"`` or live) is currently regarded as
    connected and exposes account, position, order and commission queries as
    plain dictionaries.
    """

    def __init__(self) -> None:
        """Initialise the bookkeeping state and try to create the vnpy engine."""
        self._engine = None
        self._ee = None
        self._connected_mode: Optional[str] = None
        self._last_error: str = ""
        self._connect_lock = threading.Lock()
        # Commission queries: request id -> event to signal / response payload.
        self._commission_waiters: dict[int, threading.Event] = {}
        self._commission_results: dict[int, dict] = {}
        self._commission_hooked = False
        self._init_engine()

    def _init_engine(self) -> None:
        """Create the event engine and main engine and register the CTP gateway.

        Failures never propagate: the message is stored in ``_last_error`` and
        ``_engine`` stays ``None`` (or whatever was assigned before the error).
        """
        ensure_process_locale()
        try:
            from vnpy.event import EventEngine
            from vnpy.trader.engine import MainEngine
            from vnpy_ctp import CtpGateway

            self._ee = EventEngine()
            self._engine = MainEngine(self._ee)
            self._engine.add_gateway(CtpGateway)
        except ImportError:
            self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp"
        except Exception as exc:
            self._last_error = str(exc)

    def available(self) -> bool:
        """Return True when the vnpy engine object was created."""
        return self._engine is not None

    @property
    def last_error(self) -> str:
        """Most recent initialisation/connection error message ("" when none)."""
        return self._last_error

    @property
    def connected_mode(self) -> Optional[str]:
        """Mode currently regarded as connected, or ``None``."""
        return self._connected_mode

    def status(self, mode: str) -> dict[str, Any]:
        """Return a status snapshot for *mode*.

        When *mode* is the connected mode the connection is pinged first, so a
        stale connection is reported as disconnected.  ``missing_config`` lists
        the required setting keys (user, password, trade server) that are empty.
        """
        if self._connected_mode == mode:
            self.ping()
        st = _setting_for_mode(mode)
        missing = [k for k in ("用户名", "密码", "交易服务器") if not st.get(k)]
        return {
            "vnpy_installed": self.available(),
            "connected": self._connected_mode == mode,
            "connected_mode": self._connected_mode,
            "mode_label": _mode_label(mode),
            "missing_config": missing,
            "last_error": self._last_error,
            "broker_id": st.get("经纪商代码", ""),
            "td_address": st.get("交易服务器", ""),
        }

    def connect(self, mode: str, *, force: bool = False) -> None:
        """Connect the CTP gateway for *mode* and wait until account data is available.

        Args:
            mode: ``"simulation"`` for SimNow; any other value uses the live settings.
            force: Reconnect even if *mode* is already connected and answers a ping.

        Raises:
            RuntimeError: The engine is not initialised, or no account data
                arrived within roughly 30 seconds (the message is also stored
                in ``last_error``).
            ValueError: User name, password or trade server address is not configured.
        """
        if not self._engine:
            raise RuntimeError(self._last_error or "vnpy 引擎未初始化")
        if self._connected_mode == mode and not force:
            if self.ping():
                return
            self._connected_mode = None

        setting = _setting_for_mode(mode)
        if not setting.get("用户名") or not setting.get("密码"):
            env_names = (
                "SIMNOW_USER / SIMNOW_PASSWORD"
                if mode == "simulation"
                else "CTP_LIVE_USER / CTP_LIVE_PASSWORD"
            )
            raise ValueError(f"{_mode_label(mode)}:请在 .env 配置 {env_names}")
        if not setting.get("交易服务器"):
            raise ValueError(f"{_mode_label(mode)}:未配置交易服务器地址")

        with self._connect_lock:
            # Another caller may have finished connecting while this one was
            # waiting for the lock; do not reconnect in that case.
            if not force and self._connected_mode == mode and self.ping():
                return

            if self._connected_mode and self._connected_mode != mode:
                # Switching between simulation and live: close the old session first.
                # NOTE(review): vnpy's MainEngine.close() may also stop the event
                # engine — confirm that reconnecting on the same engine still works.
                try:
                    self._engine.close()
                except Exception as exc:
                    # Best effort: keep going, but do not hide the failure.
                    logger.warning("CTP close before mode switch failed: %s", exc)
                self._connected_mode = None
                time.sleep(1)

            ctp_logs: list[str] = []
            from vnpy.trader.event import EVENT_LOG

            def _on_log(event) -> None:
                # Keep only the 20 most recent gateway messages for error reporting.
                msg = getattr(event.data, "msg", "") or str(event.data)
                if msg:
                    ctp_logs.append(str(msg))
                    if len(ctp_logs) > 20:
                        ctp_logs.pop(0)
                    logger.info("CTP | %s", msg)

            self._ee.register(EVENT_LOG, _on_log)
            try:
                ensure_process_locale()
                logger.info(
                    "CTP 连接 [%s] user=%s td=%s env=%s",
                    mode,
                    setting.get("用户名"),
                    setting.get("交易服务器"),
                    setting.get("柜台环境", "实盘"),
                )
                self._engine.connect(setting, GATEWAY_NAME)
                # Wait for login and settlement info: 60 polls x 0.5 s, about 30 seconds.
                for _ in range(60):
                    accounts = self._engine.get_all_accounts()
                    if accounts:
                        self._connected_mode = mode
                        self._last_error = ""
                        logger.info("CTP 已连接 [%s] account=%s", mode, len(accounts))
                        self._schedule_fee_sync(mode)
                        return
                    time.sleep(0.5)
            finally:
                # Always detach the temporary log listener, on success or failure.
                self._ee.unregister(EVENT_LOG, _on_log)

            hint = _format_ctp_failure(ctp_logs)
            self._last_error = hint
            raise RuntimeError(hint)

    def ensure_connected(self, mode: str) -> None:
        """Connect to *mode* unless it is already connected and answers a ping."""
        if self._connected_mode == mode and self.ping():
            return
        self.connect(mode)

    def ping(self) -> bool:
        """Check whether the connection still looks alive; clear the connected state if not.

        The check is "the engine returns at least one account".
        NOTE(review): if the engine serves cached account data this cannot
        detect a dropped connection — confirm against vnpy.
        """
        if not self._engine or not self._connected_mode:
            return False
        try:
            if self._engine.get_all_accounts():
                return True
        except Exception as exc:
            logger.debug("CTP ping failed: %s", exc)
        self._connected_mode = None
        return False

    def mark_disconnected(self) -> None:
        """Forget the connected mode without touching the engine."""
        self._connected_mode = None

    def _schedule_fee_sync(self, mode: str) -> None:
        """Start a daemon thread that syncs commission rates via ``ctp_fee_sync``.

        Errors inside the thread are logged at debug level and otherwise ignored.
        """

        def _run() -> None:
            try:
                from ctp_fee_sync import sync_fees_from_ctp

                # The count is not needed here; the message is logged either way.
                _count, msg = sync_fees_from_ctp(mode, max_symbols=60)
                logger.info("CTP 手续费同步: %s", msg)
            except Exception as exc:
                logger.debug("CTP 手续费后台同步: %s", exc)

        threading.Thread(target=_run, daemon=True, name="ctp-fee-sync").start()

    def _ensure_commission_callback(self) -> None:
        """Install the commission-rate response handler on the gateway's trade API (once).

        Does nothing when already installed, when the engine is missing, or
        when the gateway / ``td_api`` cannot be obtained.
        """
        if self._commission_hooked or not self._engine:
            return
        try:
            gw = self._engine.get_gateway(GATEWAY_NAME)
            td = gw.td_api
        except Exception:
            return
        bridge = self

        def on_rsp(data: dict, error: dict, reqid: int, last: bool) -> None:
            waiter = bridge._commission_waiters.get(reqid)
            if waiter is None:
                # Nobody is waiting any more (timed out) or the request was not
                # ours: drop the payload instead of storing it forever.
                return
            if data and data.get("InstrumentID"):
                bridge._commission_results[reqid] = dict(data)
            if last:
                waiter.set()

        td.onRspQryInstrumentCommissionRate = on_rsp  # type: ignore[method-assign]
        self._commission_hooked = True

    def query_instrument_commission(self, ths_code: str, *, mode: str) -> dict:
        """Query the CTP commission rate of one contract (requires an active connection).

        Returns:
            The response payload as a dict, or ``{}`` when *mode* is not the
            connected mode, the trade API is unavailable or not logged in, the
            request is rejected (non-zero return code), or no final response
            arrives within 8 seconds.
        """
        if self._connected_mode != mode or not self._engine:
            return {}
        try:
            sym, _ = ths_to_vnpy_symbol(ths_code)
            gw = self._engine.get_gateway(GATEWAY_NAME)
            td = gw.td_api
        except Exception as exc:
            logger.debug("commission query init: %s", exc)
            return {}
        if not getattr(td, "login_status", False):
            return {}
        if not hasattr(td, "reqQryInstrumentCommissionRate"):
            return {}

        self._ensure_commission_callback()
        # Take the next request id from the trade API's own counter.
        reqid = int(getattr(td, "reqid", 0)) + 1
        td.reqid = reqid
        ev = threading.Event()
        self._commission_waiters[reqid] = ev
        try:
            req = {
                "BrokerID": td.brokerid,
                "InvestorID": td.userid,
                "InstrumentID": sym,
            }
            if td.reqQryInstrumentCommissionRate(req, reqid) == 0:
                ev.wait(timeout=8)
        finally:
            # Unregister even if the request raised, so the waiter cannot leak.
            self._commission_waiters.pop(reqid, None)
        return self._commission_results.pop(reqid, {})

    def get_account(self) -> dict[str, Any]:
        """Return balance/available/frozen/accountid of the first account, or ``{}`` if none."""
        if not self._engine:
            return {}
        accounts = self._engine.get_all_accounts()
        if not accounts:
            return {}
        acc = accounts[0]
        return {
            "balance": float(getattr(acc, "balance", 0) or 0),
            "available": float(getattr(acc, "available", 0) or 0),
            "frozen": float(getattr(acc, "frozen", 0) or 0),
            "accountid": getattr(acc, "accountid", ""),
        }

    def list_positions(self) -> list[dict[str, Any]]:
        """Return all positions with a positive volume as plain dictionaries."""
        if not self._engine:
            return []
        out: list[dict[str, Any]] = []
        for pos in self._engine.get_all_positions():
            vol = int(getattr(pos, "volume", 0) or 0)
            if vol <= 0:
                continue
            out.append({
                "symbol": getattr(pos, "symbol", "") or "",
                "exchange": _exchange_name(getattr(pos, "exchange", None)),
                "direction": _direction_label(getattr(pos, "direction", None)),
                "lots": vol,
                "avg_price": float(getattr(pos, "price", 0) or 0),
                "pnl": float(getattr(pos, "pnl", 0) or 0),
                "frozen": int(getattr(pos, "frozen", 0) or 0),
            })
        return out

    def list_active_orders(self) -> list[dict[str, Any]]:
        """Return active orders that still have unfilled volume.

        An order is kept when the text of its status contains ``NOTTRADED``,
        ``PARTTRADED`` or ``SUBMITTING`` and ``volume - traded`` is positive;
        ``lots`` in the result is that remaining volume.  Any error while
        fetching the orders yields an empty list.
        """
        if not self._engine:
            return []
        try:
            orders = self._engine.get_all_active_orders()
        except Exception:
            return []

        working_markers = ("NOTTRADED", "PARTTRADED", "SUBMITTING")
        out: list[dict[str, Any]] = []
        for order in orders or []:
            status_s = str(getattr(order, "status", None))
            if status_s and not any(marker in status_s for marker in working_markers):
                continue
            vol = int(getattr(order, "volume", 0) or 0)
            traded = int(getattr(order, "traded", 0) or 0)
            remain = max(0, vol - traded)
            if remain <= 0:
                continue
            out.append({
                "symbol": getattr(order, "symbol", "") or "",
                "exchange": _exchange_name(getattr(order, "exchange", None)),
                "direction": _direction_label(getattr(order, "direction", None)),
                "lots": remain,
                "price": float(getattr(order, "price", 0) or 0),
                "offset": str(getattr(order, "offset", None) or ""),
                "order_id": str(getattr(order, "orderid", "") or ""),
                "status": status_s,
            })
        return out

    def send_order(
        self,
        *,
        ths_code: str,
        offset: str,
        direction: str,
        lots: int,
        price: float,
        order_type: str = "limit",
    ) -> str:
        """Send an order through the CTP gateway and return its vt order id.

        Args:
            ths_code: Contract code, converted with ``ths_to_vnpy_symbol``.
            offset: ``open``/``open_long``/``open_short`` or
                ``close``/``close_long``/``close_short`` (case-insensitive;
                empty means ``open``).
            direction: Position side, ``long`` or ``short``.  When empty it is
                derived from a ``_short`` offset suffix, otherwise ``long``.
            lots: Order volume; values below 1 are raised to 1.
            price: Order price.
            order_type: ``market`` for a market order, anything else is a limit order.

        Raises:
            RuntimeError: The engine is not initialised, or no order id came back.
            ValueError: *offset* is not one of the recognised values.
        """
        from vnpy.trader.constant import Direction, Offset, OrderType
        from vnpy.trader.object import OrderRequest

        if not self._engine:
            raise RuntimeError("CTP 未初始化")

        sym, ex_name = ths_to_vnpy_symbol(ths_code)
        exchange = to_vnpy_exchange(ex_name)
        lots = max(1, int(lots))
        price = float(price)
        offset = (offset or "open").lower()
        direction = (direction or "").lower()
        if not direction:
            # A missing direction must not override an explicit *_short offset;
            # previously it defaulted to "long" and flipped the order side.
            direction = "short" if offset.endswith("_short") else "long"

        if offset in ("open", "open_long", "open_short"):
            d = Direction.LONG if direction == "long" or offset == "open_long" else Direction.SHORT
            off = Offset.OPEN
        elif offset in ("close", "close_long", "close_short"):
            # Closing a long position means selling; closing a short means buying.
            if direction == "long" or offset == "close_long":
                d = Direction.SHORT
            else:
                d = Direction.LONG
            off = Offset.CLOSE
        else:
            raise ValueError(f"未知开平: {offset}")

        ot = OrderType.MARKET if (order_type or "limit").lower() == "market" else OrderType.LIMIT
        req = OrderRequest(
            symbol=sym,
            exchange=exchange,
            direction=d,
            type=ot,
            volume=lots,
            price=price,
            offset=off,
        )
        vt_orderid = self._engine.send_order(req, GATEWAY_NAME)
        if not vt_orderid:
            raise RuntimeError("CTP 拒单或未返回委托号")
        return str(vt_orderid)


def get_bridge() -> CtpBridge:
    """Return the process-wide ``CtpBridge``, creating it on first use under ``_bridge_lock``."""
    global _bridge
    with _bridge_lock:
        if _bridge is None:
            _bridge = CtpBridge()
        return _bridge


def try_init_vnpy(_settings: dict | None = None) -> bool:
    """Create the bridge if necessary and report whether the vnpy engine exists.

    ``_settings`` is accepted but not used.
    """
    return get_bridge().available()


def vnpy_available() -> bool:
    """Report whether the vnpy engine exists (creates the bridge if necessary)."""
    return get_bridge().available()


def ctp_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
    """Connect to *mode* and return the resulting status dictionary.

    Exceptions raised by ``CtpBridge.connect`` propagate to the caller.
    """
    b = get_bridge()
    b.connect(mode, force=force)
    return b.status(mode)


def ctp_try_auto_reconnect(mode: str) -> bool:
    """Quietly reconnect after a disconnect.

    Returns True immediately when *mode* is connected and answers a ping.
    Returns False when the engine is unavailable, the configuration is
    incomplete, or the forced reconnect fails (the failure is only logged).
    """
    b = get_bridge()
    if not b.available():
        return False
    st = _setting_for_mode(mode)
    if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"):
        return False
    if b.connected_mode == mode and b.ping():
        return True
    try:
        b.connect(mode, force=True)
        return b.connected_mode == mode
    except Exception as exc:
        logger.info("CTP 自动重连失败: %s", exc)
        return False


def ctp_status(mode: str) -> dict[str, Any]:
    """Return the bridge status dictionary for *mode*."""
    return get_bridge().status(mode)


def ctp_get_account(mode: str) -> dict[str, Any]:
    """Ensure *mode* is connected, then return the account summary."""
    b = get_bridge()
    b.ensure_connected(mode)
    return b.get_account()


def ctp_list_positions(mode: str) -> list[dict[str, Any]]:
    """Ensure *mode* is connected, then return the open positions."""
    b = get_bridge()
    b.ensure_connected(mode)
    return b.list_positions()


def ctp_list_active_orders(mode: str) -> list[dict[str, Any]]:
    """Ensure *mode* is connected, then return the active orders."""
    b = get_bridge()
    b.ensure_connected(mode)
    return b.list_active_orders()


def get_ctp_balance(mode: str) -> Optional[float]:
    """Return the account balance for *mode*, or ``None`` when unavailable.

    A missing or zero balance and any error while fetching the account all
    yield ``None``; errors are logged at debug level.
    """
    try:
        acc = ctp_get_account(mode)
        bal = acc.get("balance")
        return float(bal) if bal else None
    except Exception as exc:
        logger.debug("get_ctp_balance: %s", exc)
        return None


def execute_order(
    conn,
    *,
    mode: str,
    offset: str,
    symbol: str,
    direction: str,
    lots: int,
    price: float,
    settings: dict | None = None,
    order_type: str = "limit",
) -> dict[str, Any]:
    """Unified order entry: ``simulation`` goes to SimNow, ``live`` to the broker's CTP.

    ``conn`` and ``settings`` are accepted but not used.

    Returns:
        A dictionary with the order id, mode, mode label, symbol, lots and price.

    Raises:
        ValueError: *mode* is unknown, or vnpy / vnpy_ctp is not available.
    """
    del conn, settings
    if mode not in ("simulation", "live"):
        raise ValueError("未知交易模式")
    if not vnpy_available():
        raise ValueError(
            "请先安装 vnpy 与 vnpy_ctp:pip install vnpy vnpy_ctp\n"
            "模拟盘需配置 .env 中 SIMNOW_USER / SIMNOW_PASSWORD 等"
        )
    b = get_bridge()
    b.ensure_connected(mode)
    order_id = b.send_order(
        ths_code=symbol,
        offset=offset,
        direction=direction,
        lots=lots,
        price=price,
        order_type=order_type,
    )
    return {
        "order_id": order_id,
        "mode": mode,
        "mode_label": _mode_label(mode),
        "symbol": symbol,
        "lots": lots,
        "price": price,
    }