diff --git a/docs/CTP_LIVE.md b/docs/CTP_LIVE.md index e25b6bc..d5575fb 100644 --- a/docs/CTP_LIVE.md +++ b/docs/CTP_LIVE.md @@ -119,7 +119,7 @@ CTP_LIVE_ENV=实盘 |--------|------| | **大商所(DCE)** 等 | 使用通用 **CLOSE** | | **上期所(SHFE)、能源(INE)、郑商所(CZCE)、中金所(CFFEX)** | 区分 **平今 CLOSETODAY** / **平昨 CLOSEYESTERDAY** | -| 选择依据 | 查询持仓的今仓 `td_volume`、昨仓 `yd_volume`,优先平今再平昨 | +| 选择依据 | CTP `OnRspQryInvestorPosition` 的 **PositionDate**(1=今仓、2=昨仓);上期所今昨为**两条独立回报**,与同花顺/快期一致 | 这与国内期货公司 CTP 客户端、快期等 **标准投机平仓逻辑** 一致。 diff --git a/modules/ctp/vnpy_bridge.py b/modules/ctp/vnpy_bridge.py index 327dcc3..ee21e43 100644 --- a/modules/ctp/vnpy_bridge.py +++ b/modules/ctp/vnpy_bridge.py @@ -345,6 +345,8 @@ class CtpBridge: self._position_open_avg: dict[str, float] = {} self._position_open_cost_acc: dict[str, dict[str, float]] = {} self._last_open_cost_query_ts: float = 0.0 + self._ctp_pos_legs: dict[str, dict[str, Any]] = {} + self._ctp_pos_leg_query_active = False self._margin_hooked = False self._trade_hooked = False self._trade_query_results: list[dict[str, Any]] = [] @@ -676,6 +678,8 @@ class CtpBridge: self._hooks_td_api_id = None self._instrument_hooked = False self._margin_rate_hooked = False + self._ctp_pos_legs.clear() + self._ctp_pos_leg_query_active = False self._last_position_query_ts = 0.0 self._last_instruments_ready_ts = 0.0 try: @@ -986,12 +990,80 @@ class CtpBridge: logger.debug("find position: %s", exc) return None + @staticmethod + def _ctp_hold_from_posi_direction(posi_direction: str) -> str: + """CTP PosiDirection: 2=多头持仓 3=空头持仓。""" + return "short" if str(posi_direction or "").strip() == "3" else "long" + + @staticmethod + def _ctp_is_today_position_date(position_date: str) -> bool: + """CTP PositionDate: 1=今仓 2=昨仓(上期所/能源分两条回报)。""" + pd = str(position_date or "1").strip() + if pd in ("2", "History", "HISTORY", "history", "昨"): + return False + return True + + def _ingest_ctp_position_leg(self, data: dict) -> None: + """按 CTP 官方 PositionDate 缓存今/昨仓(与同花顺/快期一致)。""" + sym = str(data.get("InstrumentID") or "").strip() + if not sym: + return + hold = self._ctp_hold_from_posi_direction(data.get("PosiDirection")) + position_date = str(data.get("PositionDate") or "1") + lots = int(data.get("Position") or 0) + key = f"{sym.lower()}|{hold}|{position_date}" + if lots <= 0: + self._ctp_pos_legs.pop(key, None) + return + if hold == "short": + frozen = int(data.get("LongFrozen") or 0) + else: + frozen = int(data.get("ShortFrozen") or 0) + self._ctp_pos_legs[key] = { + "symbol": sym, + "direction": hold, + "position_date": position_date, + "lots": lots, + "today_position": int(data.get("TodayPosition") or 0), + "yd_position": int(data.get("YdPosition") or 0), + "frozen": frozen, + "exchange": str(data.get("ExchangeID") or ""), + } + + def _position_td_yd_from_ctp_legs( + self, sym: str, hold_direction: str, + ) -> Optional[tuple[int, int, int, int]]: + sym_l = (sym or "").lower() + hold = (hold_direction or "long").strip().lower() + td = yd = 0 + frozen = 0 + found = False + for leg in self._ctp_pos_legs.values(): + if (leg.get("symbol") or "").lower() != sym_l: + continue + if (leg.get("direction") or "long") != hold: + continue + found = True + lots = int(leg.get("lots") or 0) + fz = int(leg.get("frozen") or 0) + frozen += fz + if self._ctp_is_today_position_date(leg.get("position_date")): + td += lots + else: + yd += lots + if not found: + return None + return td, yd, frozen, td + yd + def _position_td_yd( self, sym: str, ex_name: str, hold_direction: str, ) -> tuple[int, int, int, int]: - """返回 (今仓, 昨仓, 冻结, 总持仓)。""" - sym_l = (sym or "").lower() + """返回 (今仓, 昨仓, 冻结, 总持仓)。优先 CTP PositionDate 回报。""" hold = (hold_direction or "long").strip().lower() + ctp = self._position_td_yd_from_ctp_legs(sym, hold) + if ctp is not None: + return ctp + sym_l = (sym or "").lower() for p in self._collect_positions(): ps = (p.get("symbol") or "").lower() if ps != sym_l: @@ -1019,6 +1091,23 @@ class CtpBridge: frozen = int(getattr(pos, "frozen", 0) or 0) return td, yd, frozen, vol + def _refresh_ctp_position_legs_for_close( + self, sym: str, hold_direction: str, *, timeout: float = 3.0, + ) -> bool: + """平仓前确保已收到 CTP PositionDate 持仓回报。""" + if self._position_td_yd_from_ctp_legs(sym, hold_direction) is not None: + return True + try: + self.request_position_snapshot(force=True) + except Exception as exc: + logger.debug("position snapshot for close: %s", exc) + deadline = time.time() + max(0.5, float(timeout)) + while time.time() < deadline: + if self._position_td_yd_from_ctp_legs(sym, hold_direction) is not None: + return True + time.sleep(0.1) + return False + def _resolve_close_legs( self, sym: str, ex_name: str, hold_direction: str, lots: int, ) -> list[tuple[Any, int]]: @@ -1028,6 +1117,10 @@ class CtpBridge: lots = max(1, int(lots or 1)) if ex_u not in ("CZCE", "CFFEX", "SHFE", "INE"): return [(Offset.CLOSE, lots)] + hold = (hold_direction or "long").strip().lower() + if ex_u in ("SHFE", "INE"): + self._refresh_ctp_position_legs_for_close(sym, hold) + ctp_legs = self._position_td_yd_from_ctp_legs(sym, hold) td, yd, frozen, vol = self._position_td_yd(sym, ex_name, hold_direction) td_close = max(0, td) yd_close = max(0, yd) @@ -1036,9 +1129,10 @@ class CtpBridge: td_close -= cut frozen -= cut yd_close = max(0, yd_close - frozen) - logger.debug( - "close legs %s %s hold=%s lots=%s td=%s yd=%s frozen=%s vol=%s", - sym, ex_u, hold_direction, lots, td_close, yd_close, frozen, vol, + logger.info( + "close legs %s %s hold=%s want=%s td=%s yd=%s frozen=%s source=%s", + sym, ex_u, hold, lots, td_close, yd_close, frozen, + "ctp_position_date" if ctp_legs is not None else "vnpy_fallback", ) legs: list[tuple[Any, int]] = [] remain = lots @@ -1051,42 +1145,16 @@ class CtpBridge: legs.append((Offset.CLOSEYESTERDAY, take)) remain -= take if remain > 0: - # vnpy 今昨拆分不可靠时,先按今仓尝试,拒单后自动改平昨 - legs.append((Offset.CLOSETODAY, remain)) + raise ValueError( + f"可平仓位不足:今仓可平{td_close}手、昨仓可平{yd_close}手," + f"需平{lots}手(请检查未成交平仓挂单是否占用仓位)" + ) return legs if legs else [(Offset.CLOSETODAY, lots)] def _resolve_close_offset(self, sym: str, ex_name: str, hold_direction: str, lots: int) -> Any: legs = self._resolve_close_legs(sym, ex_name, hold_direction, lots) return legs[0][0] - def _wait_order_terminal(self, vt_orderid: str, *, timeout: float = 2.5) -> dict[str, Any]: - if not self._engine or not vt_orderid: - return {"pending": True} - deadline = time.time() + max(0.5, float(timeout)) - while time.time() < deadline: - try: - order = self._engine.get_order(vt_orderid) - except Exception: - order = None - if order is None: - time.sleep(0.08) - continue - status_s = str(getattr(order, "status", "") or "") - traded = int(getattr(order, "traded", 0) or 0) - vol = int(getattr(order, "volume", 0) or 0) - if traded > 0 and ( - "ALLTRADED" in status_s or "全部成交" in status_s or traded >= vol - ): - return {"filled": True, "traded": traded, "status": status_s} - if traded > 0: - return {"partial": True, "traded": traded, "status": status_s} - if any(x in status_s for x in ("REJECTED", "拒单")): - return {"rejected": True, "status": status_s} - if any(x in status_s for x in ("CANCELLED", "已撤销")): - return {"cancelled": True, "status": status_s} - time.sleep(0.1) - return {"pending": True} - def _send_close_leg( self, *, @@ -1102,67 +1170,38 @@ class CtpBridge: tick: float, use_market: bool, ) -> str: - from vnpy.trader.constant import Offset, OrderType from vnpy.trader.object import OrderRequest lots = max(1, int(lots)) - alt_off = ( - Offset.CLOSEYESTERDAY - if primary_off == Offset.CLOSETODAY - else Offset.CLOSETODAY + lp = float(price) + if use_market: + lp = self._aggressive_limit_price( + ths_code, sym, ex_name, direction, tick, lp, + ) + else: + lp = round_to_tick(lp, tick) + if lp <= 0: + raise ValueError("委托价格无效,请检查行情或手动填写价格") + req = OrderRequest( + symbol=sym, + exchange=exchange, + direction=direction, + type=order_type, + volume=lots, + price=lp, + offset=primary_off, ) - ex_u = (ex_name or "").upper() - candidates = [primary_off] - if ex_u in ("SHFE", "INE", "CZCE", "CFFEX") and alt_off != primary_off: - candidates.append(alt_off) - last_vt = "" - for idx, off in enumerate(candidates): - lp = float(price) - if use_market: - lp = self._aggressive_limit_price( - ths_code, sym, ex_name, direction, tick, lp, - ) - else: - lp = round_to_tick(lp, tick) - if lp <= 0: - raise ValueError("委托价格无效,请检查行情或手动填写价格") - req = OrderRequest( - symbol=sym, - exchange=exchange, - direction=direction, - type=order_type, - volume=lots, - price=lp, - offset=off, + logger.info( + "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s", + sym, ex_name, direction, lots, lp, primary_off, order_type, + ) + with _ctp_td_lock: + vt_orderid = self._engine.send_order(req, GATEWAY_NAME) + if not vt_orderid: + raise RuntimeError( + "CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)" ) - logger.info( - "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s", - sym, ex_name, direction, lots, lp, off, order_type, - ) - with _ctp_td_lock: - vt_orderid = self._engine.send_order(req, GATEWAY_NAME) - if not vt_orderid: - if idx + 1 < len(candidates): - logger.warning("CTP close no order id offset=%s, try alternate", off) - continue - raise RuntimeError( - "CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)" - ) - last_vt = str(vt_orderid) - state = self._wait_order_terminal(last_vt) - if state.get("filled") or state.get("partial"): - return last_vt - if state.get("rejected") and idx + 1 < len(candidates): - logger.warning( - "CTP close rejected offset=%s status=%s, retry alternate", - off, state.get("status"), - ) - continue - if state.get("pending"): - return last_vt - if state.get("rejected"): - raise RuntimeError(f"CTP 平仓被拒 offset={off} status={state.get('status')}") - return last_vt + return str(vt_orderid) def _aggressive_limit_price( self, @@ -1384,11 +1423,17 @@ class CtpBridge: data: dict, error: dict, reqid: int, last: bool, ) -> None: try: + if data: + if not bridge._ctp_pos_leg_query_active: + bridge._ctp_pos_legs.clear() + bridge._ctp_pos_leg_query_active = True + bridge._ingest_ctp_position_leg(data) if data: if not bridge._position_open_cost_acc: bridge._position_open_avg.clear() bridge._ingest_position_open_cost(data) if last: + bridge._ctp_pos_leg_query_active = False bridge._finalize_position_open_cost_acc() except Exception as exc: logger.debug("position open avg cache: %s", exc)