Use CTP contract margin rates for position margin and ratio display.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-26 11:14:01 +08:00
parent 7a4a3f08e5
commit 038eb9a403
2 changed files with 136 additions and 22 deletions
+64 -15
View File
@@ -88,6 +88,7 @@ from vnpy_bridge import (
_ctp_td_lock,
ctp_cancel_order,
ctp_connect,
ctp_estimate_margin_one_lot,
ctp_get_account,
ctp_get_tick_price,
ctp_list_active_orders,
@@ -211,6 +212,41 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
return codes.get("ths_code") or sym
return sym
def _resolve_position_margin(
*,
sym: str,
direction: str,
lots: int,
entry: float,
mode: str,
ctp: Optional[dict] = None,
mon_margin: Optional[float] = None,
est_margin: Optional[float] = None,
) -> tuple[Optional[float], str]:
"""占用保证金:柜台持仓 > CTP 合约率估算 > 本地规格估算 > 库内缓存。"""
ctp_margin = float(ctp.get("margin") or 0) if ctp else 0.0
if ctp_margin > 0:
return round(ctp_margin, 2), "ctp"
connected = bool(ctp_status(mode).get("connected"))
ths_sym = sym
if ctp:
ths_sym = _ctp_pos_to_ths_code(ctp) or sym
else:
codes = ths_to_codes(sym)
if codes and codes.get("ths_code"):
ths_sym = codes["ths_code"]
if connected and ths_sym and entry > 0 and lots > 0:
per_lot = ctp_estimate_margin_one_lot(
mode, ths_sym, entry, direction=direction,
)
if per_lot and per_lot > 0:
return round(per_lot * lots, 2), "ctp"
if est_margin and float(est_margin) > 0:
return round(float(est_margin), 2), "estimate"
if not connected and mon_margin and float(mon_margin) > 0:
return round(float(mon_margin), 2), "db"
return None, "estimate"
def _ensure_monitors_from_ctp(conn, mode: str) -> None:
"""CTP 有持仓但本地无监控时,自动补写一条 active 记录供展示。"""
if not ctp_status(mode).get("connected"):
@@ -611,7 +647,18 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
mark = ctp_get_tick_price(mode, sym)
if mark is None or mark <= 0:
mark = entry if entry else None
margin = ctp_margin if ctp_margin > 0 else None
est = calc_position_metrics(
direction, entry, entry, entry, lots, mark or entry, capital, sym,
).get("margin")
margin, _src = _resolve_position_margin(
sym=sym,
direction=direction,
lots=lots,
entry=entry,
mode=mode,
ctp=p,
est_margin=est,
)
position_pct = None
if margin and capital > 0:
position_pct = round(float(margin) / float(capital) * 100, 2)
@@ -733,20 +780,22 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
direction, entry, sl if sl is not None else entry,
tp if tp is not None else entry, lots, mark, capital, sym,
)
if margin is None or float(margin or 0) <= 0:
ctp_margin = float(ctp.get("margin") or 0) if ctp else 0.0
est_margin = pos_metrics.get("margin")
margin = ctp_margin if ctp_margin > 0 else est_margin
margin_source = "ctp" if ctp_margin > 0 else "estimate"
else:
margin_source = "ctp"
if position_pct is None or float(position_pct or 0) <= 0:
position_pct = (
round(float(margin) / capital * 100, 2)
if capital > 0 and margin
else pos_metrics.get("position_pct")
)
else:
mon_margin = margin
margin, margin_source = _resolve_position_margin(
sym=sym,
direction=direction,
lots=lots,
entry=entry,
mode=mode,
ctp=ctp,
mon_margin=mon_margin if isinstance(mon_margin, (int, float)) else None,
est_margin=pos_metrics.get("margin"),
)
if margin and capital > 0:
position_pct = round(float(margin) / float(capital) * 100, 2)
elif position_pct is None or float(position_pct or 0) <= 0:
position_pct = pos_metrics.get("position_pct")
elif position_pct is not None:
position_pct = float(position_pct)
order_st = monitor_order_status(
mon or {}, mode=mode, ths_code=sym, direction=direction,
+72 -7
View File
@@ -976,7 +976,29 @@ class CtpBridge:
def _lookup_position_margin(self, sym: str, direction: str) -> float:
return float(self._position_margins.get(self._position_margin_key(sym, direction), 0) or 0)
def estimate_margin_one_lot(self, ths_code: str, price: float) -> Optional[float]:
@staticmethod
def _vnpy_sym_to_ths(sym: str, ex_name: str) -> str:
import re
s = (sym or "").strip()
if not s:
return ""
ex = (ex_name or "").upper()
m = re.match(r"^([A-Za-z]+)(\d+)$", s)
if not m:
return s
letters, digits = m.group(1), m.group(2)
if ex == "CZCE":
return letters.upper() + (digits[-3:] if len(digits) >= 4 else digits)
return letters.lower() + digits
def estimate_margin_one_lot(
self,
ths_code: str,
price: float,
*,
direction: str = "long",
) -> Optional[float]:
"""用 CTP 合约信息估算 1 手保证金(需已连接并完成合约查询)。"""
if not self._engine or not price or price <= 0:
return None
@@ -990,7 +1012,13 @@ class CtpBridge:
mult = float(getattr(contract, "size", 0) or 0)
long_r = float(getattr(contract, "long_margin_ratio", 0) or 0)
short_r = float(getattr(contract, "short_margin_ratio", 0) or 0)
ratio = max(long_r, short_r)
d = (direction or "long").strip().lower()
if d == "short" and short_r > 0:
ratio = short_r
elif d != "short" and long_r > 0:
ratio = long_r
else:
ratio = max(long_r, short_r)
if mult <= 0 or ratio <= 0:
return None
return round(float(price) * mult * ratio, 2)
@@ -998,6 +1026,34 @@ class CtpBridge:
logger.debug("estimate_margin_one_lot %s: %s", ths_code, exc)
return None
def estimate_position_margin(
self,
sym: str,
ex_name: str,
direction: str,
lots: int,
price: float,
*,
pos: Any = None,
) -> Optional[float]:
"""持仓占用保证金:优先 vnpy 字段,其次 CTP 合约保证金率估算。"""
if lots <= 0 or price <= 0:
return None
if pos is not None:
raw = float(getattr(pos, "margin", 0) or getattr(pos, "use_margin", 0) or 0)
if raw > 0:
return round(raw, 2)
cached = self._lookup_position_margin(sym, direction)
if cached > 0:
return round(cached, 2)
ths = self._vnpy_sym_to_ths(sym, ex_name)
if not ths:
return None
per_lot = self.estimate_margin_one_lot(ths, price, direction=direction)
if per_lot and per_lot > 0:
return round(per_lot * lots, 2)
return None
def lookup_contract_spec(self, ths_code: str) -> Optional[dict]:
"""从 CTP 合约信息读取乘数与最小变动价位。"""
if not self._engine:
@@ -1037,17 +1093,20 @@ class CtpBridge:
sym = getattr(pos, "symbol", "") or ""
exchange = getattr(pos, "exchange", None)
ex_name = str(exchange.value if hasattr(exchange, "value") else exchange or "")
margin = self._lookup_position_margin(sym, d)
price = float(getattr(pos, "price", 0) or 0)
margin = self.estimate_position_margin(
sym, ex_name, d, vol, price, pos=pos,
)
open_time = self._lookup_position_open_time(sym, d) or None
out.append({
"symbol": sym,
"exchange": ex_name,
"direction": d,
"lots": vol,
"avg_price": float(getattr(pos, "price", 0) or 0),
"avg_price": price,
"pnl": float(getattr(pos, "pnl", 0) or 0),
"frozen": int(getattr(pos, "frozen", 0) or 0),
"margin": round(margin, 2) if margin > 0 else None,
"margin": margin,
"open_time": open_time,
})
return out
@@ -1471,12 +1530,18 @@ def ctp_get_tick_detail(mode: str, ths_code: str) -> dict[str, Any]:
return {}
def ctp_estimate_margin_one_lot(mode: str, ths_code: str, price: float) -> Optional[float]:
def ctp_estimate_margin_one_lot(
mode: str,
ths_code: str,
price: float,
*,
direction: str = "long",
) -> Optional[float]:
b = get_bridge()
if b.connected_mode != mode or not b.ping():
return None
try:
return b.estimate_margin_one_lot(ths_code, price)
return b.estimate_margin_one_lot(ths_code, price, direction=direction)
except Exception as exc:
logger.debug("ctp_estimate_margin_one_lot: %s", exc)
return None