Files
qihuo/modules/ctp/ctp_entry_price.py
T
dekun dca773d6be Fix CTP open average price direction mapping and resolution order.
Correct PosiDirection 2=long/3=short so OpenCost caches under the right key, prefer open_cost over PositionCost for entry and float P/L, and refresh the cache when incomplete.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-07-02 21:13:28 +08:00

119 lines
3.8 KiB
Python

# Copyright (c) 2025-2026 马建军. All rights reserved.
# 详见 LICENSE.zh-CN.txt
"""CTP 持仓均价:优先 CTP OpenCost(柜台开仓均价),其次成交加权。"""
from __future__ import annotations
from typing import Any, Callable, Optional
from modules.core.contract_specs import get_contract_spec
from modules.ctp.ctp_symbol import ths_to_vnpy_symbol
from modules.core.symbols import ths_to_codes
def symbols_match(ctp_sym: str, ths: str) -> bool:
a = (ctp_sym or "").lower()
b = (ths or "").lower()
if a == b:
return True
if a and b and a.split(".")[0] == b.split(".")[0]:
return True
try:
vnpy_sym, _ = ths_to_vnpy_symbol(ths)
if a == vnpy_sym.lower():
return True
except Exception:
pass
try:
vnpy_sym, _ = ths_to_vnpy_symbol(ctp_sym)
if vnpy_sym.lower() == b.split(".")[0]:
return True
except Exception:
pass
return False
def _ths_code(sym: str) -> str:
codes = ths_to_codes(sym) or {}
return codes.get("ths_code") or sym
def round_to_tick(price: float, sym: str) -> float:
tick = float(get_contract_spec(_ths_code(sym)).get("tick_size") or 1.0)
if tick <= 0:
return round(price, 2)
return round(round(price / tick) * tick, 4)
def compute_open_avg_from_trades(
sym: str,
direction: str,
trades: Optional[list[dict[str, Any]]],
) -> float:
"""按开仓成交 FIFO 还原剩余持仓的开仓均价。"""
if not trades:
return 0.0
want = (direction or "long").strip().lower()
open_vol = 0.0
open_cost = 0.0
for t in sorted(trades, key=lambda x: x.get("datetime") or ""):
if (t.get("offset") or "").strip().lower() != "open":
continue
pos_dir = (t.get("position_direction") or t.get("direction") or "long").strip().lower()
if pos_dir != want:
continue
if not symbols_match(t.get("symbol") or "", sym):
continue
lots = float(int(t.get("lots") or 0))
px = float(t.get("price") or 0)
if lots <= 0 or px <= 0:
continue
open_vol += lots
open_cost += px * lots
if open_vol <= 0:
return 0.0
for t in sorted(trades, key=lambda x: x.get("datetime") or ""):
if (t.get("offset") or "").strip().lower() != "close":
continue
pos_dir = (t.get("position_direction") or t.get("direction") or "long").strip().lower()
if pos_dir != want:
continue
if not symbols_match(t.get("symbol") or "", sym):
continue
lots = float(int(t.get("lots") or 0))
if lots <= 0 or open_vol <= 0:
continue
avg = open_cost / open_vol
dec = min(lots, open_vol)
open_cost -= avg * dec
open_vol -= dec
if open_vol <= 0:
return 0.0
return round(open_cost / open_vol, 4)
def resolve_ctp_entry(
sym: str,
direction: str,
ctp: Optional[dict[str, Any]],
trades: Optional[list[dict[str, Any]]] = None,
*,
tick: Optional[float] = None,
open_avg_lookup: Optional[Callable[[str, str], float]] = None,
) -> tuple[float, str]:
"""均价:OpenCost 缓存 → 成交加权 → vnpy PositionCost。"""
del tick
want = (direction or "long").strip().lower()
if open_avg_lookup:
cached = float(open_avg_lookup(sym, want) or 0)
if cached > 0:
return round_to_tick(cached, sym), "open_cost"
trade_avg = compute_open_avg_from_trades(sym, want, trades)
if trade_avg > 0:
return round_to_tick(trade_avg, sym), "trades"
if ctp:
pos_avg = float(ctp.get("avg_price") or 0)
if pos_avg > 0:
return round_to_tick(pos_avg, sym), "position_cost"
return 0.0, "none"