Use VeighNa OffsetConverter for SHFE close today/yesterday split.

Feed CTP PositionDate-corrected positions to converter like CTA engine sell/cover.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-03 09:35:20 +08:00
parent 2ec534c547
commit d46cd7c3e1
2 changed files with 142 additions and 83 deletions
+3 -3
View File
@@ -113,13 +113,13 @@ CTP_LIVE_ENV=实盘
| `offset=close_long` + 持多 | Direction.**SHORT**(卖平) | 见下表 |
| `offset=close_short` + 持空 | Direction.**LONG**(买平) | 见下表 |
**开平标志**`_resolve_close_offset()`**交易所规则** 自动选择(与 SimNow/实盘 CTP 规范一致):
**开平标志**VeighNa **`OffsetConverter`**`vnpy/trader/converter.py`)自动转换:策略层发 `Offset.CLOSE`,框架按今/昨仓拆单为 `CLOSETODAY` / `CLOSEYESTERDAY`(与 CTA 引擎 `sell()`/`cover()` 一致)。持仓今昨数据来自 CTP `OnRspQryInvestorPosition`**PositionDate** 字段,修正 vnpy 网关合并误差后喂给 OffsetConverter。
| 交易所 | 规则 |
|--------|------|
| **大商所(DCE** 等 | 使用通用 **CLOSE** |
| **上期所(SHFE)、能源(INE、郑商所(CZCE)、中金所(CFFEX)** | 区分 **平今 CLOSETODAY** / **平昨 CLOSEYESTERDAY** |
| 选择依据 | CTP `OnRspQryInvestorPosition`**PositionDate**(1=今仓、2=昨仓);上期所今昨为**两条独立回报**,与同花顺/快期一致 |
| **上期所(SHFE)、能源(INE** | OffsetConverter 按今/昨可平量自动拆单 |
| 持仓来源 | CTP **PositionDate**1=今仓、2=昨仓)缓存,修正 `PositionData.yd_volume` |
这与国内期货公司 CTP 客户端、快期等 **标准投机平仓逻辑** 一致。
+122 -63
View File
@@ -315,6 +315,16 @@ def _is_long_direction(direction_obj: Any) -> bool:
return "LONG" in s.upper() or "" in s
class _OmsContractAdapter:
"""VeighNa OffsetConverter 仅需 get_contract,用 MainEngine 适配。"""
def __init__(self, main_engine: Any) -> None:
self._engine = main_engine
def get_contract(self, vt_symbol: str) -> Any:
return self._engine.get_contract(vt_symbol)
class CtpBridge:
def __init__(self) -> None:
self._engine = None
@@ -347,6 +357,7 @@ class CtpBridge:
self._last_open_cost_query_ts: float = 0.0
self._ctp_pos_legs: dict[str, dict[str, Any]] = {}
self._ctp_pos_leg_query_active = False
self._offset_converter: Any = None
self._margin_hooked = False
self._trade_hooked = False
self._trade_query_results: list[dict[str, Any]] = []
@@ -394,6 +405,7 @@ class CtpBridge:
from modules.ctp.ctp_trading_state import trading_state
pos = event.data
self._sync_offset_converter_position(pos)
row = self._position_row_from_vnpy(pos)
if row:
sym = row.get("symbol") or ""
@@ -445,6 +457,11 @@ class CtpBridge:
from modules.ctp.ctp_trading_state import trading_state
order = event.data
if self._offset_converter:
try:
self._offset_converter.update_order(order)
except Exception as exc:
logger.debug("offset converter order: %s", exc)
row = self._order_row_from_vnpy(order)
if not row:
return
@@ -476,6 +493,11 @@ class CtpBridge:
def _on_trade(event) -> None:
try:
trade = event.data
if self._offset_converter:
try:
self._offset_converter.update_trade(trade)
except Exception as exc:
logger.debug("offset converter trade: %s", exc)
row = self._trade_row_from_vnpy(trade)
if row and row.get("offset") == "open":
sym = row.get("symbol") or ""
@@ -680,6 +702,7 @@ class CtpBridge:
self._margin_rate_hooked = False
self._ctp_pos_legs.clear()
self._ctp_pos_leg_query_active = False
self._offset_converter = None
self._last_position_query_ts = 0.0
self._last_instruments_ready_ts = 0.0
try:
@@ -1091,6 +1114,50 @@ class CtpBridge:
frozen = int(getattr(pos, "frozen", 0) or 0)
return td, yd, frozen, vol
def _ensure_offset_converter(self) -> None:
if self._offset_converter or not self._engine:
return
try:
from vnpy.trader.converter import OffsetConverter
except ImportError:
logger.debug("vnpy OffsetConverter unavailable")
return
self._offset_converter = OffsetConverter(_OmsContractAdapter(self._engine))
try:
for pos in self._engine_collection_items(self._engine.get_all_positions()):
self._sync_offset_converter_position(pos)
except Exception as exc:
logger.debug("offset converter seed: %s", exc)
def _patch_position_yd_from_ctp(self, pos: Any) -> None:
"""用 CTP PositionDate 回报修正 vnpy PositionData.yd_volume,供 OffsetConverter 使用。"""
sym = str(getattr(pos, "symbol", "") or "")
hold = "long" if _is_long_direction(getattr(pos, "direction", None)) else "short"
ctp = self._position_td_yd_from_ctp_legs(sym, hold)
if ctp is None:
return
td, yd, _, vol = ctp
try:
pos.yd_volume = yd
cur_vol = int(getattr(pos, "volume", 0) or 0)
if vol > 0 and cur_vol != vol:
pos.volume = vol
except Exception as exc:
logger.debug("patch position yd: %s", exc)
def _sync_offset_converter_position(self, pos: Any) -> None:
self._ensure_offset_converter()
if not self._offset_converter or not pos:
return
try:
vol = int(getattr(pos, "volume", 0) or 0)
if vol <= 0:
return
self._patch_position_yd_from_ctp(pos)
self._offset_converter.update_position(pos)
except Exception as exc:
logger.debug("offset converter position: %s", exc)
def _refresh_ctp_position_legs_for_close(
self, sym: str, hold_direction: str, *, timeout: float = 3.0,
) -> bool:
@@ -1108,54 +1175,7 @@ class CtpBridge:
time.sleep(0.1)
return False
def _resolve_close_legs(
self, sym: str, ex_name: str, hold_direction: str, lots: int,
) -> list[tuple[Any, int]]:
from vnpy.trader.constant import Offset
ex_u = (ex_name or "").upper()
lots = max(1, int(lots or 1))
if ex_u not in ("CZCE", "CFFEX", "SHFE", "INE"):
return [(Offset.CLOSE, lots)]
hold = (hold_direction or "long").strip().lower()
if ex_u in ("SHFE", "INE"):
self._refresh_ctp_position_legs_for_close(sym, hold)
ctp_legs = self._position_td_yd_from_ctp_legs(sym, hold)
td, yd, frozen, vol = self._position_td_yd(sym, ex_name, hold_direction)
td_close = max(0, td)
yd_close = max(0, yd)
if frozen > 0:
cut = min(frozen, td_close)
td_close -= cut
frozen -= cut
yd_close = max(0, yd_close - frozen)
logger.info(
"close legs %s %s hold=%s want=%s td=%s yd=%s frozen=%s source=%s",
sym, ex_u, hold, lots, td_close, yd_close, frozen,
"ctp_position_date" if ctp_legs is not None else "vnpy_fallback",
)
legs: list[tuple[Any, int]] = []
remain = lots
if td_close > 0 and remain > 0:
take = min(remain, td_close)
legs.append((Offset.CLOSETODAY, take))
remain -= take
if yd_close > 0 and remain > 0:
take = min(remain, yd_close)
legs.append((Offset.CLOSEYESTERDAY, take))
remain -= take
if remain > 0:
raise ValueError(
f"可平仓位不足:今仓可平{td_close}手、昨仓可平{yd_close}手,"
f"需平{lots}手(请检查未成交平仓挂单是否占用仓位)"
)
return legs if legs else [(Offset.CLOSETODAY, lots)]
def _resolve_close_offset(self, sym: str, ex_name: str, hold_direction: str, lots: int) -> Any:
legs = self._resolve_close_legs(sym, ex_name, hold_direction, lots)
return legs[0][0]
def _send_close_leg(
def _submit_close_orders(
self,
*,
ths_code: str,
@@ -1163,25 +1183,39 @@ class CtpBridge:
ex_name: str,
exchange: Any,
direction: Any,
hold: str,
lots: int,
primary_off: Any,
order_type: Any,
price: float,
tick: float,
use_market: bool,
) -> str:
"""平仓:VeighNa OffsetConverter 自动拆分平今/平昨(与 CTA 引擎一致)。"""
from vnpy.trader.constant import Offset
from vnpy.trader.object import OrderRequest
lots = max(1, int(lots))
ex_u = (ex_name or "").upper()
if ex_u in ("SHFE", "INE"):
self._refresh_ctp_position_legs_for_close(sym, hold)
self._ensure_offset_converter()
if self._offset_converter:
for pos in self._engine_collection_items(self._engine.get_all_positions()):
ps = (getattr(pos, "symbol", "") or "").lower()
if ps != sym.lower():
continue
pd = "long" if _is_long_direction(getattr(pos, "direction", None)) else "short"
if pd == hold:
self._sync_offset_converter_position(pos)
lp = float(price)
if use_market:
lp = self._aggressive_limit_price(
ths_code, sym, ex_name, direction, tick, lp,
)
lp = self._aggressive_limit_price(ths_code, sym, ex_name, direction, tick, lp)
else:
lp = round_to_tick(lp, tick)
if lp <= 0:
raise ValueError("委托价格无效,请检查行情或手动填写价格")
req = OrderRequest(
symbol=sym,
exchange=exchange,
@@ -1189,19 +1223,48 @@ class CtpBridge:
type=order_type,
volume=lots,
price=lp,
offset=primary_off,
offset=Offset.CLOSE,
)
if not self._offset_converter:
raise RuntimeError("VeighNa OffsetConverter 未初始化,无法平仓")
req_list = self._offset_converter.convert_order_request(req, lock=False, net=False)
if not req_list:
raise ValueError(
"可平仓位不足(OffsetConverter 返回空列表,请检查挂单冻结或持仓同步)"
)
logger.info(
"OffsetConverter %s %s close %s手 -> %s",
sym,
hold,
lots,
[f"{getattr(r.offset, 'value', r.offset)}:{int(r.volume)}" for r in req_list],
)
last_vt = ""
for sub in req_list:
sub_price = lp
if use_market:
sub_price = self._aggressive_limit_price(
ths_code, sym, ex_name, sub.direction, tick, float(price),
)
else:
sub_price = round_to_tick(float(sub.price or lp), tick)
sub.price = sub_price
logger.info(
"CTP 报单 %s %s %s %s手 @%s offset=%s type=%s",
sym, ex_name, direction, lots, lp, primary_off, order_type,
sym, ex_name, sub.direction, sub.volume, sub_price, sub.offset, order_type,
)
with _ctp_td_lock:
vt_orderid = self._engine.send_order(req, GATEWAY_NAME)
vt_orderid = self._engine.send_order(sub, GATEWAY_NAME)
if not vt_orderid:
raise RuntimeError(
"CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)"
)
return str(vt_orderid)
last_vt = str(vt_orderid)
try:
self._offset_converter.update_order_request(sub, last_vt)
except Exception as exc:
logger.debug("offset converter order req: %s", exc)
return last_vt
def _aggressive_limit_price(
self,
@@ -2481,23 +2544,19 @@ class CtpBridge:
price = self._aggressive_limit_price(ths_code, sym, ex_name, d, tick, price)
if price <= 0:
raise ValueError("委托价格无效,请检查行情或手动填写价格")
close_legs = self._resolve_close_legs(sym, ex_name, hold, lots)
last_vt = ""
for off, leg_lots in close_legs:
last_vt = self._send_close_leg(
return self._submit_close_orders(
ths_code=ths_code,
sym=sym,
ex_name=ex_name,
exchange=exchange,
direction=d,
lots=leg_lots,
primary_off=off,
hold=hold,
lots=lots,
order_type=ot,
price=price,
tick=tick,
use_market=use_market,
)
return last_vt
raise ValueError(f"未知开平: {offset}")
def cancel_order(self, vt_orderid: str) -> bool: