From 6e954da4e1bf0aaf9d5a4ae551d0951219d97f48 Mon Sep 17 00:00:00 2001 From: dekun Date: Tue, 30 Jun 2026 10:36:06 +0800 Subject: [PATCH] Lock CTP entry price from position PnL snapshot; match SimNow avg and float PnL. Co-authored-by: Cursor --- ctp_trading_state.py | 103 ++++++++++++++++++++++++++++++++++++++++--- install_trading.py | 20 ++++++--- 2 files changed, 113 insertions(+), 10 deletions(-) diff --git a/ctp_trading_state.py b/ctp_trading_state.py index 36eb857..cb1703f 100644 --- a/ctp_trading_state.py +++ b/ctp_trading_state.py @@ -12,9 +12,6 @@ from typing import Any, Callable, Optional logger = logging.getLogger(__name__) -CALIBRATE_INTERVAL_SEC = 30.0 - - def position_key(exchange: str, symbol: str, direction: str) -> str: """统一持仓键:exchange|symbol|direction""" ex = (exchange or "").strip().upper() @@ -34,6 +31,75 @@ def parse_position_key(key: str) -> tuple[str, str, str]: return "", (key or "").lower(), "long" +def avg_price_from_ctp_pnl( + row: dict[str, Any], + tick: Optional[float], +) -> Optional[float]: + """用柜台持仓盈亏 + 现价快照反推开仓均价(与 SimNow 浮动盈亏一致)。""" + if not tick or tick <= 0: + return None + lots = int(row.get("lots") or 0) + if lots <= 0: + return None + pnl = float(row.get("pnl") or 0) + if not pnl: + return None + sym = (row.get("symbol") or "").strip() + if not sym: + return None + try: + from contract_specs import get_contract_spec + from symbols import ths_to_codes + + codes = ths_to_codes(sym) or {} + ths = codes.get("ths_code") or sym + mult = float(get_contract_spec(ths).get("mult") or 10) + except Exception: + mult = 10.0 + if mult <= 0: + return None + direction = (row.get("direction") or "long").strip().lower() + if direction == "long": + derived = tick - pnl / (mult * lots) + else: + derived = tick + pnl / (mult * lots) + if derived <= 0: + return None + return round(derived, 2) + + +def reconcile_position_avg( + old: Optional[dict[str, Any]], + new: dict[str, Any], + tick: Optional[float], +) -> dict[str, Any]: + """手数不变时锁定均价;新开/加仓时用柜台盈亏快照校正一次。""" + row = dict(new) + lots = int(row.get("lots") or 0) + if lots <= 0: + return row + old_lots = int(old.get("lots") or 0) if old else 0 + if ( + old + and old_lots == lots + and old.get("avg_price_locked") + and float(old.get("avg_price") or 0) > 0 + ): + row["avg_price"] = float(old["avg_price"]) + row["avg_price_locked"] = True + return row + + refined = avg_price_from_ctp_pnl(row, tick) + pos_avg = float(row.get("avg_price") or 0) + if refined and refined > 0: + row["avg_price"] = refined + row["avg_price_locked"] = True + elif pos_avg > 0: + row["avg_price"] = pos_avg + row["avg_price_locked"] = bool(tick and refined) + return row + + class CtpTradingState: """进程内 CTP 快照:柜台回报为准,SQLite 仅挂 SL/TP 元数据。""" @@ -114,17 +180,42 @@ class CtpTradingState: if removed and notify: self._notify() + def get_position(self, pk: str) -> Optional[dict[str, Any]]: + with self._lock: + row = self._positions.get(pk) + return dict(row) if row else None + + def try_lock_entry_prices(self) -> bool: + """有 tick 后一次性校正未锁定的持仓均价。""" + changed = False + with self._lock: + for pk, row in list(self._positions.items()): + if row.get("avg_price_locked"): + continue + tick = self.get_tick_price(row.get("exchange") or "", row.get("symbol") or "") + refined = avg_price_from_ctp_pnl(row, tick) + if not refined or refined <= 0: + continue + updated = dict(row) + updated["avg_price"] = refined + updated["avg_price_locked"] = True + self._positions[pk] = updated + changed = True + return changed + def upsert_position(self, row: dict[str, Any], *, notify: bool = True) -> None: lots = int(row.get("lots") or 0) ex = row.get("exchange") or "" sym = row.get("symbol") or "" direction = row.get("direction") or "long" pk = position_key(ex, sym, direction) + tick = self.get_tick_price(ex, sym) with self._lock: if lots <= 0: self._positions.pop(pk, None) else: - row = dict(row) + old = self._positions.get(pk) + row = reconcile_position_avg(old, dict(row), tick) row["position_key"] = pk self._positions[pk] = row if notify: @@ -190,7 +281,9 @@ class CtpTradingState: pk = position_key(ex, sym, direction) row = dict(p) row["position_key"] = pk - new_positions[pk] = row + old = self._positions.get(pk) + tick = self.get_tick_price(ex, sym) + new_positions[pk] = reconcile_position_avg(old, row, tick) with self._lock: self._orders = new_orders self._positions = new_positions diff --git a/install_trading.py b/install_trading.py index 2579ec8..b4f1982 100644 --- a/install_trading.py +++ b/install_trading.py @@ -591,20 +591,21 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se direction: str, ctp: Optional[dict], ) -> tuple[float, str]: - """持仓均价:柜台持仓价 > 成交加权(均不随 tick 变化)。""" + """持仓均价:成交加权 > 柜台持仓价(锁定后不随 tick 变化)。""" if not ctp: return 0.0, "none" direction = (direction or "long").strip().lower() lots = int(ctp.get("lots") or 0) - pos_avg = float(ctp.get("avg_price") or 0) - if pos_avg > 0: - return pos_avg, "ctp" trade_avg = _ctp_avg_entry_from_trades( mode, sym, direction, expect_lots=lots, ) if trade_avg and trade_avg > 0: return float(trade_avg), "trades" + + pos_avg = float(ctp.get("avg_price") or 0) + if pos_avg > 0: + return pos_avg, "ctp" return 0.0, "none" def _open_commission_from_ctp_trades( @@ -1289,6 +1290,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se direction, entry, sl or entry, tp or entry, lots, mark, capital, sym, ) float_pnl = pos_tmp.get("float_pnl") + if ctp and ctp_status(mode).get("connected"): + ctp_pnl = float(ctp.get("pnl") or 0) + if ctp_pnl != 0: + float_pnl = round(ctp_pnl, 2) fee_info = calc_fee_breakdown( sym, entry, close_est, lots, open_time or now_iso, now_iso, trading_mode=mode, @@ -1866,7 +1871,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if entry <= 0: continue mult = float(get_contract_spec(ths).get("mult") or 10) - if direction == "long": + ctp_pnl = float(p.get("pnl") or 0) + if ctp_pnl != 0: + float_pnl = round(ctp_pnl, 2) + elif direction == "long": float_pnl = round((mark - entry) * mult * lots, 2) else: float_pnl = round((entry - mark) * mult * lots, 2) @@ -1888,6 +1896,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if not is_trading_session(): return mode = get_trading_mode(get_setting) + if trading_state.try_lock_entry_prices(): + _push_position_snapshot_async(fast=True) payload = _build_position_quotes_payload(mode) if payload.get("quotes"): position_hub.push_event("position_quotes", payload)