diff --git a/modules/ctp/vnpy_bridge.py b/modules/ctp/vnpy_bridge.py index ce8e722..327dcc3 100644 --- a/modules/ctp/vnpy_bridge.py +++ b/modules/ctp/vnpy_bridge.py @@ -986,48 +986,183 @@ class CtpBridge: logger.debug("find position: %s", exc) return None - def _resolve_close_offset(self, sym: str, ex_name: str, hold_direction: str, lots: int) -> Any: + def _position_td_yd( + self, sym: str, ex_name: str, hold_direction: str, + ) -> tuple[int, int, int, int]: + """返回 (今仓, 昨仓, 冻结, 总持仓)。""" + sym_l = (sym or "").lower() + hold = (hold_direction or "long").strip().lower() + for p in self._collect_positions(): + ps = (p.get("symbol") or "").lower() + if ps != sym_l: + continue + if (p.get("direction") or "long").strip().lower() != hold: + continue + vol = int(p.get("lots") or 0) + td = int(p.get("td_volume") or 0) + yd = int(p.get("yd_volume") or 0) + frozen = int(p.get("frozen") or 0) + if td + yd <= 0 and vol > 0: + td = max(0, vol - yd) + return td, yd, frozen, vol + pos = self._find_position(sym, ex_name, hold_direction) + if not pos: + return 0, 0, 0, 0 + vol = int(getattr(pos, "volume", 0) or 0) + yd = int(getattr(pos, "yd_volume", 0) or 0) + td_attr = getattr(pos, "today_volume", None) + if td_attr is not None: + td = int(td_attr or 0) + yd = max(0, vol - td) + else: + td = max(0, vol - yd) + frozen = int(getattr(pos, "frozen", 0) or 0) + return td, yd, frozen, vol + + def _resolve_close_legs( + self, sym: str, ex_name: str, hold_direction: str, lots: int, + ) -> list[tuple[Any, int]]: from vnpy.trader.constant import Offset ex_u = (ex_name or "").upper() - # 上期所/能源中心/郑商所/中金所须区分平今/平昨;大商所等可用通用 CLOSE + lots = max(1, int(lots or 1)) if ex_u not in ("CZCE", "CFFEX", "SHFE", "INE"): - return Offset.CLOSE - pos = self._find_position(sym, ex_u, hold_direction) - if not pos: - for p in self._collect_positions(): - ps = (p.get("symbol") or "").lower() - if ps != sym.lower(): + return [(Offset.CLOSE, lots)] + td, yd, frozen, vol = self._position_td_yd(sym, ex_name, hold_direction) + td_close = max(0, td) + yd_close = max(0, yd) + if frozen > 0: + cut = min(frozen, td_close) + td_close -= cut + frozen -= cut + yd_close = max(0, yd_close - frozen) + logger.debug( + "close legs %s %s hold=%s lots=%s td=%s yd=%s frozen=%s vol=%s", + sym, ex_u, hold_direction, lots, td_close, yd_close, frozen, vol, + ) + legs: list[tuple[Any, int]] = [] + remain = lots + if td_close > 0 and remain > 0: + take = min(remain, td_close) + legs.append((Offset.CLOSETODAY, take)) + remain -= take + if yd_close > 0 and remain > 0: + take = min(remain, yd_close) + legs.append((Offset.CLOSEYESTERDAY, take)) + remain -= take + if remain > 0: + # vnpy 今昨拆分不可靠时,先按今仓尝试,拒单后自动改平昨 + legs.append((Offset.CLOSETODAY, remain)) + return legs if legs else [(Offset.CLOSETODAY, lots)] + + def _resolve_close_offset(self, sym: str, ex_name: str, hold_direction: str, lots: int) -> Any: + legs = self._resolve_close_legs(sym, ex_name, hold_direction, lots) + return legs[0][0] + + def _wait_order_terminal(self, vt_orderid: str, *, timeout: float = 2.5) -> dict[str, Any]: + if not self._engine or not vt_orderid: + return {"pending": True} + deadline = time.time() + max(0.5, float(timeout)) + while time.time() < deadline: + try: + order = self._engine.get_order(vt_orderid) + except Exception: + order = None + if order is None: + time.sleep(0.08) + continue + status_s = str(getattr(order, "status", "") or "") + traded = int(getattr(order, "traded", 0) or 0) + vol = int(getattr(order, "volume", 0) or 0) + if traded > 0 and ( + "ALLTRADED" in status_s or "全部成交" in status_s or traded >= vol + ): + return {"filled": True, "traded": traded, "status": status_s} + if traded > 0: + return {"partial": True, "traded": traded, "status": status_s} + if any(x in status_s for x in ("REJECTED", "拒单")): + return {"rejected": True, "status": status_s} + if any(x in status_s for x in ("CANCELLED", "已撤销")): + return {"cancelled": True, "status": status_s} + time.sleep(0.1) + return {"pending": True} + + def _send_close_leg( + self, + *, + ths_code: str, + sym: str, + ex_name: str, + exchange: Any, + direction: Any, + lots: int, + primary_off: Any, + order_type: Any, + price: float, + tick: float, + use_market: bool, + ) -> str: + from vnpy.trader.constant import Offset, OrderType + from vnpy.trader.object import OrderRequest + + lots = max(1, int(lots)) + alt_off = ( + Offset.CLOSEYESTERDAY + if primary_off == Offset.CLOSETODAY + else Offset.CLOSETODAY + ) + ex_u = (ex_name or "").upper() + candidates = [primary_off] + if ex_u in ("SHFE", "INE", "CZCE", "CFFEX") and alt_off != primary_off: + candidates.append(alt_off) + last_vt = "" + for idx, off in enumerate(candidates): + lp = float(price) + if use_market: + lp = self._aggressive_limit_price( + ths_code, sym, ex_name, direction, tick, lp, + ) + else: + lp = round_to_tick(lp, tick) + if lp <= 0: + raise ValueError("委托价格无效,请检查行情或手动填写价格") + req = OrderRequest( + symbol=sym, + exchange=exchange, + direction=direction, + type=order_type, + volume=lots, + price=lp, + offset=off, + ) + logger.info( + "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s", + sym, ex_name, direction, lots, lp, off, order_type, + ) + with _ctp_td_lock: + vt_orderid = self._engine.send_order(req, GATEWAY_NAME) + if not vt_orderid: + if idx + 1 < len(candidates): + logger.warning("CTP close no order id offset=%s, try alternate", off) continue - if (p.get("direction") or "long") != hold_direction: - continue - td = int(p.get("td_volume") or 0) - yd = int(p.get("yd_volume") or 0) - if td >= lots: - return Offset.CLOSETODAY - if yd >= lots: - return Offset.CLOSEYESTERDAY - if td + yd >= lots: - return Offset.CLOSETODAY - break - if ex_u in ("SHFE", "INE", "CZCE"): - return Offset.CLOSETODAY - return Offset.CLOSE - vol = int(getattr(pos, "volume", 0) or 0) - yd = int(getattr(pos, "yd_volume", 0) or 0) - td = max(0, vol - yd) - if td >= lots: - return Offset.CLOSETODAY - if yd >= lots: - # 夜盘仓在 vnpy 常记在 yd,日盘平昨易报「平昨仓位不足」→ 上期所/能源优先平今 - if ex_u in ("SHFE", "INE"): - return Offset.CLOSETODAY - return Offset.CLOSEYESTERDAY - if td + yd >= lots: - return Offset.CLOSETODAY - if ex_u in ("SHFE", "INE", "CZCE"): - return Offset.CLOSETODAY - return Offset.CLOSE + raise RuntimeError( + "CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)" + ) + last_vt = str(vt_orderid) + state = self._wait_order_terminal(last_vt) + if state.get("filled") or state.get("partial"): + return last_vt + if state.get("rejected") and idx + 1 < len(candidates): + logger.warning( + "CTP close rejected offset=%s status=%s, retry alternate", + off, state.get("status"), + ) + continue + if state.get("pending"): + return last_vt + if state.get("rejected"): + raise RuntimeError(f"CTP 平仓被拒 offset={off} status={state.get('status')}") + return last_vt def _aggressive_limit_price( self, @@ -2256,44 +2391,69 @@ class CtpBridge: if offset in ("open", "open_long", "open_short"): d = Direction.LONG if direction == "long" or offset == "open_long" else Direction.SHORT off = Offset.OPEN + use_market = (order_type or "limit").lower() == "market" + if use_market: + ot = OrderType.FAK + price = self._aggressive_limit_price(ths_code, sym, ex_name, d, tick, price) + else: + ot = OrderType.LIMIT + price = round_to_tick(float(price), tick) + if price <= 0: + raise ValueError("委托价格无效,请检查行情或手动填写价格") + req = OrderRequest( + symbol=sym, + exchange=exchange, + direction=d, + type=ot, + volume=lots, + price=price, + offset=off, + ) + logger.info( + "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s", + sym, ex_name, d, lots, price, off, ot, + ) + with _ctp_td_lock: + vt_orderid = self._engine.send_order(req, GATEWAY_NAME) + if not vt_orderid: + raise RuntimeError( + "CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)" + ) + return str(vt_orderid) elif offset in ("close", "close_long", "close_short"): hold = "long" if direction == "long" or offset == "close_long" else "short" if hold == "long": d = Direction.SHORT else: d = Direction.LONG - off = self._resolve_close_offset(sym, ex_name, hold, lots) - else: - raise ValueError(f"未知开平: {offset}") - - use_market = (order_type or "limit").lower() == "market" - if use_market: - ot = OrderType.FAK - price = self._aggressive_limit_price(ths_code, sym, ex_name, d, tick, price) - else: - ot = OrderType.LIMIT - price = round_to_tick(float(price), tick) - if price <= 0: - raise ValueError("委托价格无效,请检查行情或手动填写价格") - - req = OrderRequest( - symbol=sym, - exchange=exchange, - direction=d, - type=ot, - volume=lots, - price=price, - offset=off, - ) - logger.info( - "CTP 报单 %s %s %s %s手 @%s offset=%s type=%s", - sym, ex_name, d, lots, price, off, ot, - ) - with _ctp_td_lock: - vt_orderid = self._engine.send_order(req, GATEWAY_NAME) - if not vt_orderid: - raise RuntimeError("CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)") - return str(vt_orderid) + use_market = (order_type or "limit").lower() == "market" + if use_market: + ot = OrderType.FAK + else: + ot = OrderType.LIMIT + price = round_to_tick(float(price), tick) + if use_market: + price = self._aggressive_limit_price(ths_code, sym, ex_name, d, tick, price) + if price <= 0: + raise ValueError("委托价格无效,请检查行情或手动填写价格") + close_legs = self._resolve_close_legs(sym, ex_name, hold, lots) + last_vt = "" + for off, leg_lots in close_legs: + last_vt = self._send_close_leg( + ths_code=ths_code, + sym=sym, + ex_name=ex_name, + exchange=exchange, + direction=d, + lots=leg_lots, + primary_off=off, + order_type=ot, + price=price, + tick=tick, + use_market=use_market, + ) + return last_vt + raise ValueError(f"未知开平: {offset}") def cancel_order(self, vt_orderid: str) -> bool: if not self._engine or not vt_orderid: