Files
qihuo/ctp_entry_price.py
T
2026-06-30 10:51:16 +08:00

152 lines
4.4 KiB
Python

# Copyright (c) 2025-2026 马建军. All rights reserved.
# 详见 LICENSE.zh-CN.txt
"""CTP 持仓均价:成交加权 / 柜台持仓价 / 盈亏一致校正。"""
from __future__ import annotations
from typing import Any, Optional
from contract_specs import get_contract_spec
from ctp_symbol import ths_to_vnpy_symbol
from symbols import ths_to_codes
def symbols_match(ctp_sym: str, ths: str) -> bool:
    """Return True when ``ctp_sym`` and ``ths`` appear to name the same contract.

    The checks run in this order and stop at the first hit:

    1. case-insensitive equality (two empty/``None`` inputs compare equal);
    2. both non-empty and equal in the part before the first ``"."``;
    3. ``ths`` converted by ``ths_to_vnpy_symbol`` equals ``ctp_sym``;
    4. ``ctp_sym`` converted by ``ths_to_vnpy_symbol`` equals the part of
       ``ths`` before the first ``"."``.

    Any exception raised by a conversion attempt is swallowed and that
    attempt simply counts as "no match".
    """
    left = (ctp_sym or "").lower()
    right = (ths or "").lower()
    if left == right:
        return True

    right_root = right.split(".")[0]
    if left and right and left.split(".")[0] == right_root:
        return True

    # (raw input handed to the converter, lowercase value it must equal)
    attempts = ((ths, left), (ctp_sym, right_root))
    for raw, wanted in attempts:
        try:
            converted, _ = ths_to_vnpy_symbol(raw)
            if converted.lower() == wanted:
                return True
        except Exception:
            # Best-effort: an unconvertible symbol is not an error here.
            continue
    return False
def _ths_code(sym: str) -> str:
    """Return the ``"ths_code"`` entry that ``ths_to_codes`` gives for ``sym``.

    Falls back to ``sym`` itself when the lookup yields nothing or the
    entry is missing/empty.
    """
    mapping = ths_to_codes(sym)
    if not mapping:
        return sym
    return mapping.get("ths_code") or sym
def round_to_tick(price: float, sym: str) -> float:
    """Snap ``price`` to the nearest multiple of the contract's tick size.

    The tick size is the ``"tick_size"`` field of the contract spec for
    ``sym``; a missing or zero value is treated as ``1.0``. With a
    non-positive tick size the price is just rounded to two decimals.
    The snapped result is rounded to four decimals to trim float noise.
    """
    spec = get_contract_spec(_ths_code(sym))
    step = float(spec.get("tick_size") or 1.0)
    if step <= 0:
        return round(price, 2)
    whole_ticks = round(price / step)
    return round(whole_ticks * step, 4)
def entry_from_ctp_pnl(
    ctp: dict[str, Any],
    tick: Optional[float],
    *,
    ths_sym: str = "",
) -> Optional[float]:
    """Back out the average entry price from counter PnL and the current price.

    Intended to agree with the floating PnL shown by SimNow.

    Args:
        ctp: Position record; the keys read are ``"lots"``, ``"pnl"``,
            ``"direction"`` and (when ``ths_sym`` is empty) ``"symbol"``.
        tick: Current price used as the reference for the derivation.
        ths_sym: Symbol used for the contract-spec lookup; defaults to
            ``ctp["symbol"]``.

    Returns:
        The derived entry price snapped to the tick grid, or ``None`` when
        the price, lot count, PnL or multiplier is missing/non-positive
        (PnL: missing or zero) or the derived price is not positive.
    """
    last_price = tick
    if not last_price or last_price <= 0:
        return None

    volume = int(ctp.get("lots") or 0)
    if volume <= 0:
        return None

    profit = float(ctp.get("pnl") or 0)
    if not profit:
        return None

    symbol = ths_sym or (ctp.get("symbol") or "")
    spec = get_contract_spec(_ths_code(symbol))
    multiplier = float(spec.get("mult") or 10)
    if multiplier <= 0:
        return None

    side = (ctp.get("direction") or "long").strip().lower()
    # PnL per unit of price: long profit means entry below the current
    # price; every other direction is treated as short.
    per_unit = profit / (multiplier * volume)
    derived = last_price - per_unit if side == "long" else last_price + per_unit
    if derived <= 0:
        return None
    return round_to_tick(derived, symbol)
def avg_from_trades(
    trades: list[dict[str, Any]],
    sym: str,
    direction: str,
    *,
    expect_lots: int = 0,
) -> Optional[float]:
    """Compute the moving weighted average entry price from trade reports.

    After rolling a position with several opening fills the result is
    expected to agree with the counter's figure.

    Trades are replayed ordered by ``("datetime", "trade_id")``. Only
    trades whose symbol matches ``sym`` and whose ``"position_direction"``
    (falling back to ``"direction"``, then ``"long"``) equals ``direction``
    are used; records with non-positive lots or price are skipped. An
    ``"open"`` adds to cost and volume; a ``"close"`` removes lots at the
    running average, capped at the volume currently held.

    Returns:
        The average snapped to the tick grid, or ``None`` when no volume
        remains or, with ``expect_lots > 0``, the remaining volume differs
        from ``expect_lots``.
    """
    side = (direction or "long").strip().lower()
    held = 0
    total_cost = 0.0

    def _replay_order(rec: dict[str, Any]) -> tuple[Any, Any]:
        return (rec.get("datetime") or "", rec.get("trade_id") or "")

    for rec in sorted(trades, key=_replay_order):
        if not symbols_match(rec.get("symbol") or "", sym):
            continue
        offset = (rec.get("offset") or "").strip().lower()
        rec_side = rec.get("position_direction") or rec.get("direction") or "long"
        if rec_side.strip().lower() != side:
            continue
        qty = int(rec.get("lots") or 0)
        price = float(rec.get("price") or 0)
        if qty <= 0 or price <= 0:
            continue

        if offset == "open":
            total_cost += price * qty
            held += qty
        elif offset == "close" and held > 0:
            # Closing leaves the average unchanged: drop lots at that average.
            running_avg = total_cost / held
            closed = min(qty, held)
            total_cost -= running_avg * closed
            held -= closed

    if held <= 0 or (expect_lots > 0 and held != expect_lots):
        return None
    return round_to_tick(total_cost / held, sym)
def resolve_ctp_entry(
    sym: str,
    direction: str,
    ctp: Optional[dict[str, Any]],
    trades: Optional[list[dict[str, Any]]] = None,
    *,
    tick: Optional[float] = None,
) -> tuple[float, str]:
    """Pick the average entry price for a position and name its source.

    Priority: trade-weighted average > PnL-consistency correction >
    counter position price.

    Args:
        sym: Contract symbol used for matching trades and tick rounding.
        direction: Position direction; empty/``None`` is treated as
            ``"long"``.
        ctp: Counter position record (keys read: ``"lots"``,
            ``"avg_price"``, plus those used by ``entry_from_ctp_pnl``).
        trades: Optional trade reports for the trade-weighted average.
        tick: Current price, forwarded to ``entry_from_ctp_pnl``.

    Returns:
        ``(price, source)`` where ``source`` is ``"trades"``, ``"pnl"``,
        ``"ctp"`` or ``"none"`` (with price ``0.0``).
    """
    if not ctp:
        return 0.0, "none"

    direction = (direction or "long").strip().lower()
    lots = int(ctp.get("lots") or 0)

    if trades:
        trade_avg = avg_from_trades(trades, sym, direction, expect_lots=lots)
        if trade_avg and trade_avg > 0:
            return float(trade_avg), "trades"

    pos_avg = float(ctp.get("avg_price") or 0)
    if pos_avg > 0:
        pos_avg = round_to_tick(pos_avg, sym)

    # entry_from_ctp_pnl reads the direction from the record and assumes
    # "long" when it is absent. Fall back to the caller's direction so a
    # short position without a "direction" key is not derived with the
    # long-side sign.
    pnl_source = ctp if ctp.get("direction") else {**ctp, "direction": direction}
    pnl_avg = entry_from_ctp_pnl(pnl_source, tick, ths_sym=sym)

    if pnl_avg and pos_avg > 0:
        # Only this branch needs the tick size, so look it up here.
        tick_sz = float(get_contract_spec(_ths_code(sym)).get("tick_size") or 1.0)
        # Prefer the PnL-derived price only when it disagrees with the
        # counter price by at least half a tick (and at least 0.01).
        if abs(pnl_avg - pos_avg) >= max(tick_sz * 0.5, 0.01):
            return float(pnl_avg), "pnl"
        return pos_avg, "ctp"
    if pos_avg > 0:
        return pos_avg, "ctp"
    if pnl_avg and pnl_avg > 0:
        return float(pnl_avg), "pnl"
    return 0.0, "none"