From d46cd7c3e116d196aa91cd8a95717bede7c3e94e Mon Sep 17 00:00:00 2001 From: dekun Date: Fri, 3 Jul 2026 09:35:20 +0800 Subject: [PATCH] Use VeighNa OffsetConverter for SHFE close today/yesterday split. Feed CTP PositionDate-corrected positions to converter like CTA engine sell/cover. Co-authored-by: Cursor --- docs/CTP_LIVE.md | 6 +- modules/ctp/vnpy_bridge.py | 219 +++++++++++++++++++++++-------------- 2 files changed, 142 insertions(+), 83 deletions(-) diff --git a/docs/CTP_LIVE.md b/docs/CTP_LIVE.md index d5575fb..49d81a5 100644 --- a/docs/CTP_LIVE.md +++ b/docs/CTP_LIVE.md @@ -113,13 +113,13 @@ CTP_LIVE_ENV=实盘 | `offset=close_long` + 持多 | Direction.**SHORT**(卖平) | 见下表 | | `offset=close_short` + 持空 | Direction.**LONG**(买平) | 见下表 | -**开平标志** 由 `_resolve_close_offset()` 按 **交易所规则** 自动选择(与 SimNow/实盘 CTP 规范一致): +**开平标志** 由 VeighNa **`OffsetConverter`**(`vnpy/trader/converter.py`)自动转换:策略层发 `Offset.CLOSE`,框架按今/昨仓拆单为 `CLOSETODAY` / `CLOSEYESTERDAY`(与 CTA 引擎 `sell()`/`cover()` 一致)。持仓今昨数据来自 CTP `OnRspQryInvestorPosition` 的 **PositionDate** 字段,修正 vnpy 网关合并误差后喂给 OffsetConverter。 | 交易所 | 规则 | |--------|------| | **大商所(DCE)** 等 | 使用通用 **CLOSE** | -| **上期所(SHFE)、能源(INE)、郑商所(CZCE)、中金所(CFFEX)** | 区分 **平今 CLOSETODAY** / **平昨 CLOSEYESTERDAY** | -| 选择依据 | CTP `OnRspQryInvestorPosition` 的 **PositionDate**(1=今仓、2=昨仓);上期所今昨为**两条独立回报**,与同花顺/快期一致 | +| **上期所(SHFE)、能源(INE)** | OffsetConverter 按今/昨可平量自动拆单 | +| 持仓来源 | CTP **PositionDate**(1=今仓、2=昨仓)缓存,修正 `PositionData.yd_volume` | 这与国内期货公司 CTP 客户端、快期等 **标准投机平仓逻辑** 一致。 diff --git a/modules/ctp/vnpy_bridge.py b/modules/ctp/vnpy_bridge.py index ee21e43..5fe59d2 100644 --- a/modules/ctp/vnpy_bridge.py +++ b/modules/ctp/vnpy_bridge.py @@ -315,6 +315,16 @@ def _is_long_direction(direction_obj: Any) -> bool: return "LONG" in s.upper() or "多" in s +class _OmsContractAdapter: + """VeighNa OffsetConverter 仅需 get_contract,用 MainEngine 适配。""" + + def __init__(self, main_engine: Any) -> None: + self._engine = main_engine + + def get_contract(self, vt_symbol: str) -> Any: + return self._engine.get_contract(vt_symbol) + + class CtpBridge: def __init__(self) -> None: self._engine = None @@ -347,6 +357,7 @@ class CtpBridge: self._last_open_cost_query_ts: float = 0.0 self._ctp_pos_legs: dict[str, dict[str, Any]] = {} self._ctp_pos_leg_query_active = False + self._offset_converter: Any = None self._margin_hooked = False self._trade_hooked = False self._trade_query_results: list[dict[str, Any]] = [] @@ -394,6 +405,7 @@ class CtpBridge: from modules.ctp.ctp_trading_state import trading_state pos = event.data + self._sync_offset_converter_position(pos) row = self._position_row_from_vnpy(pos) if row: sym = row.get("symbol") or "" @@ -445,6 +457,11 @@ class CtpBridge: from modules.ctp.ctp_trading_state import trading_state order = event.data + if self._offset_converter: + try: + self._offset_converter.update_order(order) + except Exception as exc: + logger.debug("offset converter order: %s", exc) row = self._order_row_from_vnpy(order) if not row: return @@ -476,6 +493,11 @@ class CtpBridge: def _on_trade(event) -> None: try: trade = event.data + if self._offset_converter: + try: + self._offset_converter.update_trade(trade) + except Exception as exc: + logger.debug("offset converter trade: %s", exc) row = self._trade_row_from_vnpy(trade) if row and row.get("offset") == "open": sym = row.get("symbol") or "" @@ -680,6 +702,7 @@ class CtpBridge: self._margin_rate_hooked = False self._ctp_pos_legs.clear() self._ctp_pos_leg_query_active = False + self._offset_converter = None self._last_position_query_ts = 0.0 self._last_instruments_ready_ts = 0.0 try: @@ -1091,6 +1114,50 @@ class CtpBridge: frozen = int(getattr(pos, "frozen", 0) or 0) return td, yd, frozen, vol + def _ensure_offset_converter(self) -> None: + if self._offset_converter or not self._engine: + return + try: + from vnpy.trader.converter import OffsetConverter + except ImportError: + logger.debug("vnpy OffsetConverter unavailable") + return + self._offset_converter = OffsetConverter(_OmsContractAdapter(self._engine)) + try: + for pos in self._engine_collection_items(self._engine.get_all_positions()): + self._sync_offset_converter_position(pos) + except Exception as exc: + logger.debug("offset converter seed: %s", exc) + + def _patch_position_yd_from_ctp(self, pos: Any) -> None: + """用 CTP PositionDate 回报修正 vnpy PositionData.yd_volume,供 OffsetConverter 使用。""" + sym = str(getattr(pos, "symbol", "") or "") + hold = "long" if _is_long_direction(getattr(pos, "direction", None)) else "short" + ctp = self._position_td_yd_from_ctp_legs(sym, hold) + if ctp is None: + return + td, yd, _, vol = ctp + try: + pos.yd_volume = yd + cur_vol = int(getattr(pos, "volume", 0) or 0) + if vol > 0 and cur_vol != vol: + pos.volume = vol + except Exception as exc: + logger.debug("patch position yd: %s", exc) + + def _sync_offset_converter_position(self, pos: Any) -> None: + self._ensure_offset_converter() + if not self._offset_converter or not pos: + return + try: + vol = int(getattr(pos, "volume", 0) or 0) + if vol <= 0: + return + self._patch_position_yd_from_ctp(pos) + self._offset_converter.update_position(pos) + except Exception as exc: + logger.debug("offset converter position: %s", exc) + def _refresh_ctp_position_legs_for_close( self, sym: str, hold_direction: str, *, timeout: float = 3.0, ) -> bool: @@ -1108,54 +1175,7 @@ class CtpBridge: time.sleep(0.1) return False - def _resolve_close_legs( - self, sym: str, ex_name: str, hold_direction: str, lots: int, - ) -> list[tuple[Any, int]]: - from vnpy.trader.constant import Offset - - ex_u = (ex_name or "").upper() - lots = max(1, int(lots or 1)) - if ex_u not in ("CZCE", "CFFEX", "SHFE", "INE"): - return [(Offset.CLOSE, lots)] - hold = (hold_direction or "long").strip().lower() - if ex_u in ("SHFE", "INE"): - self._refresh_ctp_position_legs_for_close(sym, hold) - ctp_legs = self._position_td_yd_from_ctp_legs(sym, hold) - td, yd, frozen, vol = self._position_td_yd(sym, ex_name, hold_direction) - td_close = max(0, td) - yd_close = max(0, yd) - if frozen > 0: - cut = min(frozen, td_close) - td_close -= cut - frozen -= cut - yd_close = max(0, yd_close - frozen) - logger.info( - "close legs %s %s hold=%s want=%s td=%s yd=%s frozen=%s source=%s", - sym, ex_u, hold, lots, td_close, yd_close, frozen, - "ctp_position_date" if ctp_legs is not None else "vnpy_fallback", - ) - legs: list[tuple[Any, int]] = [] - remain = lots - if td_close > 0 and remain > 0: - take = min(remain, td_close) - legs.append((Offset.CLOSETODAY, take)) - remain -= take - if yd_close > 0 and remain > 0: - take = min(remain, yd_close) - legs.append((Offset.CLOSEYESTERDAY, take)) - remain -= take - if remain > 0: - raise ValueError( - f"可平仓位不足:今仓可平{td_close}手、昨仓可平{yd_close}手," - f"需平{lots}手(请检查未成交平仓挂单是否占用仓位)" - ) - return legs if legs else [(Offset.CLOSETODAY, lots)] - - def _resolve_close_offset(self, sym: str, ex_name: str, hold_direction: str, lots: int) -> Any: - legs = self._resolve_close_legs(sym, ex_name, hold_direction, lots) - return legs[0][0] - - def _send_close_leg( + def _submit_close_orders( self, *, ths_code: str, @@ -1163,25 +1183,39 @@ class CtpBridge: ex_name: str, exchange: Any, direction: Any, + hold: str, lots: int, - primary_off: Any, order_type: Any, price: float, tick: float, use_market: bool, ) -> str: + """平仓:VeighNa OffsetConverter 自动拆分平今/平昨(与 CTA 引擎一致)。""" + from vnpy.trader.constant import Offset from vnpy.trader.object import OrderRequest lots = max(1, int(lots)) + ex_u = (ex_name or "").upper() + if ex_u in ("SHFE", "INE"): + self._refresh_ctp_position_legs_for_close(sym, hold) + self._ensure_offset_converter() + if self._offset_converter: + for pos in self._engine_collection_items(self._engine.get_all_positions()): + ps = (getattr(pos, "symbol", "") or "").lower() + if ps != sym.lower(): + continue + pd = "long" if _is_long_direction(getattr(pos, "direction", None)) else "short" + if pd == hold: + self._sync_offset_converter_position(pos) + lp = float(price) if use_market: - lp = self._aggressive_limit_price( - ths_code, sym, ex_name, direction, tick, lp, - ) + lp = self._aggressive_limit_price(ths_code, sym, ex_name, direction, tick, lp) else: lp = round_to_tick(lp, tick) if lp <= 0: raise ValueError("委托价格无效,请检查行情或手动填写价格") + req = OrderRequest( symbol=sym, exchange=exchange, @@ -1189,19 +1223,48 @@ class CtpBridge: type=order_type, volume=lots, price=lp, - offset=primary_off, + offset=Offset.CLOSE, ) - logger.info( - "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s", - sym, ex_name, direction, lots, lp, primary_off, order_type, - ) - with _ctp_td_lock: - vt_orderid = self._engine.send_order(req, GATEWAY_NAME) - if not vt_orderid: - raise RuntimeError( - "CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)" + if not self._offset_converter: + raise RuntimeError("VeighNa OffsetConverter 未初始化,无法平仓") + req_list = self._offset_converter.convert_order_request(req, lock=False, net=False) + if not req_list: + raise ValueError( + "可平仓位不足(OffsetConverter 返回空列表,请检查挂单冻结或持仓同步)" ) - return str(vt_orderid) + logger.info( + "OffsetConverter %s %s close %s手 -> %s", + sym, + hold, + lots, + [f"{getattr(r.offset, 'value', r.offset)}:{int(r.volume)}" for r in req_list], + ) + last_vt = "" + for sub in req_list: + sub_price = lp + if use_market: + sub_price = self._aggressive_limit_price( + ths_code, sym, ex_name, sub.direction, tick, float(price), + ) + else: + sub_price = round_to_tick(float(sub.price or lp), tick) + sub.price = sub_price + logger.info( + "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s", + sym, ex_name, sub.direction, sub.volume, sub_price, sub.offset, order_type, + ) + with _ctp_td_lock: + vt_orderid = self._engine.send_order(sub, GATEWAY_NAME) + if not vt_orderid: + raise RuntimeError( + "CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)" + ) + last_vt = str(vt_orderid) + try: + self._offset_converter.update_order_request(sub, last_vt) + except Exception as exc: + logger.debug("offset converter order req: %s", exc) + return last_vt def _aggressive_limit_price( self, @@ -2481,23 +2544,19 @@ class CtpBridge: price = self._aggressive_limit_price(ths_code, sym, ex_name, d, tick, price) if price <= 0: raise ValueError("委托价格无效,请检查行情或手动填写价格") - close_legs = self._resolve_close_legs(sym, ex_name, hold, lots) - last_vt = "" - for off, leg_lots in close_legs: - last_vt = self._send_close_leg( - ths_code=ths_code, - sym=sym, - ex_name=ex_name, - exchange=exchange, - direction=d, - lots=leg_lots, - primary_off=off, - order_type=ot, - price=price, - tick=tick, - use_market=use_market, - ) - return last_vt + return self._submit_close_orders( + ths_code=ths_code, + sym=sym, + ex_name=ex_name, + exchange=exchange, + direction=d, + hold=hold, + lots=lots, + order_type=ot, + price=price, + tick=tick, + use_market=use_market, + ) raise ValueError(f"未知开平: {offset}") def cancel_order(self, vt_orderid: str) -> bool: