From d07fc4b70d40dd2f88c3d33cfe262a9aac713d8a Mon Sep 17 00:00:00 2001 From: dekun Date: Tue, 30 Jun 2026 10:51:16 +0800 Subject: [PATCH] Prefer CTP PnL-consistent entry when vnpy avg differs from SimNow. Co-authored-by: Cursor --- ctp_entry_price.py | 66 +++++++++++++++++++++++++++++-- ctp_trading_state.py | 94 +++++++++++++++++++------------------------- install_trading.py | 11 ++++-- vnpy_bridge.py | 19 ++++++++- 4 files changed, 128 insertions(+), 62 deletions(-) diff --git a/ctp_entry_price.py b/ctp_entry_price.py index e66fd53..d8efde2 100644 --- a/ctp_entry_price.py +++ b/ctp_entry_price.py @@ -1,12 +1,14 @@ # Copyright (c) 2025-2026 马建军. All rights reserved. # 详见 LICENSE.zh-CN.txt -"""CTP 持仓均价:成交加权 / 柜台持仓价(滚仓加仓后以柜台为准)。""" +"""CTP 持仓均价:成交加权 / 柜台持仓价 / 盈亏一致校正。""" from __future__ import annotations from typing import Any, Optional +from contract_specs import get_contract_spec from ctp_symbol import ths_to_vnpy_symbol +from symbols import ths_to_codes def symbols_match(ctp_sym: str, ths: str) -> bool: @@ -31,6 +33,47 @@ def symbols_match(ctp_sym: str, ths: str) -> bool: return False +def _ths_code(sym: str) -> str: + codes = ths_to_codes(sym) or {} + return codes.get("ths_code") or sym + + +def round_to_tick(price: float, sym: str) -> float: + tick = float(get_contract_spec(_ths_code(sym)).get("tick_size") or 1.0) + if tick <= 0: + return round(price, 2) + return round(round(price / tick) * tick, 4) + + +def entry_from_ctp_pnl( + ctp: dict[str, Any], + tick: Optional[float], + *, + ths_sym: str = "", +) -> Optional[float]: + """用柜台持仓盈亏 + 现价反推均价(与 SimNow 浮动盈亏一致)。""" + if not tick or tick <= 0: + return None + lots = int(ctp.get("lots") or 0) + if lots <= 0: + return None + pnl = float(ctp.get("pnl") or 0) + if not pnl: + return None + sym = ths_sym or (ctp.get("symbol") or "") + mult = float(get_contract_spec(_ths_code(sym)).get("mult") or 10) + if mult <= 0: + return None + direction = (ctp.get("direction") or "long").strip().lower() + if direction == "long": + derived = tick - pnl / (mult * lots) + else: + derived = tick + pnl / (mult * lots) + if derived <= 0: + return None + return round_to_tick(derived, sym) + + def avg_from_trades( trades: list[dict[str, Any]], sym: str, @@ -67,7 +110,7 @@ def avg_from_trades( return None if expect_lots > 0 and vol != expect_lots: return None - return round(cost / vol, 4) + return round_to_tick(cost / vol, sym) def resolve_ctp_entry( @@ -75,17 +118,34 @@ def resolve_ctp_entry( direction: str, ctp: Optional[dict[str, Any]], trades: Optional[list[dict[str, Any]]] = None, + *, + tick: Optional[float] = None, ) -> tuple[float, str]: - """均价:成交加权 > 柜台 PositionCost 持仓价。""" + """均价:成交加权 > 盈亏一致校正 > 柜台持仓价。""" if not ctp: return 0.0, "none" direction = (direction or "long").strip().lower() lots = int(ctp.get("lots") or 0) + if trades: trade_avg = avg_from_trades(trades, sym, direction, expect_lots=lots) if trade_avg and trade_avg > 0: return float(trade_avg), "trades" + pos_avg = float(ctp.get("avg_price") or 0) if pos_avg > 0: + pos_avg = round_to_tick(pos_avg, sym) + + pnl_avg = entry_from_ctp_pnl(ctp, tick, ths_sym=sym) + tick_sz = float(get_contract_spec(_ths_code(sym)).get("tick_size") or 1.0) + + if pnl_avg and pos_avg > 0: + if abs(pnl_avg - pos_avg) >= max(tick_sz * 0.5, 0.01): + return float(pnl_avg), "pnl" return pos_avg, "ctp" + + if pos_avg > 0: + return pos_avg, "ctp" + if pnl_avg and pnl_avg > 0: + return float(pnl_avg), "pnl" return 0.0, "none" diff --git a/ctp_trading_state.py b/ctp_trading_state.py index bf3251c..3f54388 100644 --- a/ctp_trading_state.py +++ b/ctp_trading_state.py @@ -34,43 +34,6 @@ def parse_position_key(key: str) -> tuple[str, str, str]: return "", (key or "").lower(), "long" -def avg_price_from_ctp_pnl( - row: dict[str, Any], - tick: Optional[float], -) -> Optional[float]: - """用柜台持仓盈亏 + 现价快照反推开仓均价(与 SimNow 浮动盈亏一致)。""" - if not tick or tick <= 0: - return None - lots = int(row.get("lots") or 0) - if lots <= 0: - return None - pnl = float(row.get("pnl") or 0) - if not pnl: - return None - sym = (row.get("symbol") or "").strip() - if not sym: - return None - try: - from contract_specs import get_contract_spec - from symbols import ths_to_codes - - codes = ths_to_codes(sym) or {} - ths = codes.get("ths_code") or sym - mult = float(get_contract_spec(ths).get("mult") or 10) - except Exception: - mult = 10.0 - if mult <= 0: - return None - direction = (row.get("direction") or "long").strip().lower() - if direction == "long": - derived = tick - pnl / (mult * lots) - else: - derived = tick + pnl / (mult * lots) - if derived <= 0: - return None - return round(derived, 2) - - def reconcile_position_avg( old: Optional[dict[str, Any]], new: dict[str, Any], @@ -80,7 +43,7 @@ def reconcile_position_avg( ths_sym: str = "", ) -> dict[str, Any]: """手数不变时锁定均价;滚仓/加仓(手数变化)时以柜台加权均价为准。""" - from ctp_entry_price import resolve_ctp_entry + from ctp_entry_price import entry_from_ctp_pnl, resolve_ctp_entry row = dict(new) lots = int(row.get("lots") or 0) @@ -89,6 +52,7 @@ def reconcile_position_avg( direction = (row.get("direction") or "long").strip().lower() old_lots = int(old.get("lots") or 0) if old else 0 lots_changed = not old or old_lots != lots + sym = ths_sym or (row.get("symbol") or "") if ( not lots_changed @@ -96,12 +60,22 @@ def reconcile_position_avg( and old.get("avg_price_locked") and float(old.get("avg_price") or 0) > 0 ): - row["avg_price"] = float(old["avg_price"]) + locked = float(old["avg_price"]) + corrected, _ = resolve_ctp_entry(sym, direction, row, trades, tick=tick) + pnl_entry = entry_from_ctp_pnl(row, tick, ths_sym=sym) + if corrected > 0 and abs(corrected - locked) >= 0.5: + row["avg_price"] = corrected + row["avg_price_locked"] = True + return row + if pnl_entry and abs(pnl_entry - locked) >= 0.5: + row["avg_price"] = pnl_entry + row["avg_price_locked"] = True + return row + row["avg_price"] = locked row["avg_price_locked"] = True return row - sym = ths_sym or (row.get("symbol") or "") - entry, _src = resolve_ctp_entry(sym, direction, row, trades) + entry, _src = resolve_ctp_entry(sym, direction, row, trades, tick=tick) if entry > 0: row["avg_price"] = entry row["avg_price_locked"] = True @@ -111,13 +85,6 @@ def reconcile_position_avg( if pos_avg > 0: row["avg_price"] = pos_avg row["avg_price_locked"] = lots_changed or bool(tick) - return row - - if not lots_changed: - refined = avg_price_from_ctp_pnl(row, tick) - if refined and refined > 0: - row["avg_price"] = refined - row["avg_price_locked"] = True return row @@ -207,18 +174,37 @@ class CtpTradingState: return dict(row) if row else None def try_lock_entry_prices(self) -> bool: - """有 tick 后一次性校正未锁定的持仓均价。""" + """有 tick 后校正持仓均价(含已锁定但与柜台盈亏不一致的)。""" + from ctp_entry_price import resolve_ctp_entry + changed = False with self._lock: for pk, row in list(self._positions.items()): - if row.get("avg_price_locked"): + ex = row.get("exchange") or "" + sym = row.get("symbol") or "" + tick = self.get_tick_price(ex, sym) + if not tick or tick <= 0: continue - tick = self.get_tick_price(row.get("exchange") or "", row.get("symbol") or "") - refined = avg_price_from_ctp_pnl(row, tick) - if not refined or refined <= 0: + ths = sym + try: + from vnpy_bridge import CtpBridge + ths = CtpBridge._vnpy_sym_to_ths(sym, ex) or sym + except Exception: + pass + entry, _ = resolve_ctp_entry( + ths, + row.get("direction") or "long", + row, + tick=tick, + ) + if not entry or entry <= 0: continue + current = float(row.get("avg_price") or 0) + if row.get("avg_price_locked") and current > 0: + if abs(entry - current) < 0.5: + continue updated = dict(row) - updated["avg_price"] = refined + updated["avg_price"] = entry updated["avg_price_locked"] = True self._positions[pk] = updated changed = True diff --git a/install_trading.py b/install_trading.py index 337b22a..3dec526 100644 --- a/install_trading.py +++ b/install_trading.py @@ -564,7 +564,9 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se continue if not _match_ctp_symbol(p.get("symbol") or "", sym): continue - entry, _ = resolve_ctp_entry(sym, direction, p, trades) + entry, _ = resolve_ctp_entry( + sym, direction, p, trades, tick=ctp_get_tick_price(mode, sym), + ) if entry > 0: return float(entry) return fallback @@ -578,12 +580,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if not ctp: return 0.0, "none" trades: list = [] + tick = None if ctp_status(mode).get("connected"): try: trades = ctp_list_trades(mode) except Exception: pass - return resolve_ctp_entry(sym, direction, ctp, trades) + tick = ctp_get_tick_price(mode, sym) + return resolve_ctp_entry(sym, direction, ctp, trades, tick=tick) def _open_commission_from_ctp_trades( mode: str, sym: str, direction: str, @@ -1874,7 +1878,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return mode = get_trading_mode(get_setting) if trading_state.try_lock_entry_prices(): - _push_position_snapshot_async(fast=True) + _push_position_snapshot_async(fast=False) + return payload = _build_position_quotes_payload(mode) if payload.get("quotes"): position_hub.push_event("position_quotes", payload) diff --git a/vnpy_bridge.py b/vnpy_bridge.py index 65ef6fd..33d9f85 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -464,19 +464,34 @@ class CtpBridge: td = max(0, vol - yd) margin = self.estimate_position_margin(sym, ex_name, d, vol, price, pos=pos) open_time = self._lookup_position_open_time(sym, d) or None - return { + pnl = float(getattr(pos, "pnl", 0) or 0) + row = { "symbol": sym, "exchange": ex_name, "direction": d, "lots": vol, "avg_price": price, - "pnl": float(getattr(pos, "pnl", 0) or 0), + "pnl": pnl, "frozen": int(getattr(pos, "frozen", 0) or 0), "margin": margin, "open_time": open_time, "yd_volume": yd, "td_volume": td, } + try: + from ctp_entry_price import entry_from_ctp_pnl, round_to_tick + from ctp_trading_state import trading_state + + ths = CtpBridge._vnpy_sym_to_ths(sym, ex_name) or sym + tick = trading_state.get_tick_price(ex_name, sym) + pnl_entry = entry_from_ctp_pnl(row, tick, ths_sym=ths) + if pnl_entry and price > 0 and abs(pnl_entry - price) >= 0.5: + row["avg_price"] = pnl_entry + elif price > 0: + row["avg_price"] = round_to_tick(price, ths) + except Exception as exc: + logger.debug("position avg refine: %s", exc) + return row except Exception as exc: logger.debug("position_row_from_vnpy: %s", exc) return None