Files
crypto_monitor/hub_position_metrics.py
T

248 lines
7.4 KiB
Python

"""ccxt 持仓标记价解析(实例 price_snapshot 与中控子代理共用)。"""
from __future__ import annotations
import math
from typing import Any, Callable
def _finite_or_none(x: Any) -> float | None:
try:
f = float(x)
return f if math.isfinite(f) else None
except (TypeError, ValueError):
return None
def _coerce_float(*values: Any) -> float | None:
for v in values:
if v is None or v == "":
continue
px = _finite_or_none(v)
if px is not None and px > 0:
return px
return None
def position_contracts(p: dict[str, Any]) -> float:
    """Return the position size (contract count) of a ccxt position dict.

    The unified ``contracts`` field is preferred. When it is missing or
    unusable, the raw ``info`` payload is searched for the first non-zero
    value among ``positionAmt``, ``positionamt``, ``pos`` and ``size``.

    Args:
        p: A ccxt position dict. Any non-dict value is treated as "no position".

    Returns:
        The parsed size as a float (values taken from ``info`` are returned
        as-is and may be negative), or 0.0 when no usable size is found.
        The result is always finite.
    """
    if not isinstance(p, dict):
        return 0.0

    raw = p.get("contracts")
    if raw is not None:
        try:
            unified = float(raw)
        except (TypeError, ValueError, OverflowError):
            unified = None
        # NaN/inf would slip past the callers' ``abs(c) < 1e-12`` checks, so
        # a non-finite value is treated as unparseable and we fall through.
        if unified is not None and math.isfinite(unified):
            return unified

    info = p.get("info")
    if not isinstance(info, dict):
        info = {}
    for key in ("positionAmt", "positionamt", "pos", "size"):
        if key not in info:
            continue
        try:
            amount = float(info[key])
        except (TypeError, ValueError, OverflowError):
            continue
        if math.isfinite(amount) and amount != 0:
            return amount
    return 0.0
def position_side_from_ccxt(p: dict[str, Any], contracts: float | None = None) -> str:
    """Return ``"long"`` or ``"short"`` for a ccxt position dict.

    An explicit ``side`` of long/short (case-insensitive) wins. Otherwise the
    sign of *contracts* decides; when *contracts* is None it is looked up via
    ``position_contracts(p)``. Anything that is not negative maps to
    ``"long"``.
    """
    declared = (p.get("side") or "").lower()
    if declared in ("long", "short"):
        return declared
    signed_size = position_contracts(p) if contracts is None else contracts
    # Only a strictly negative size means short; zero falls back to long.
    return "short" if signed_size < 0 else "long"
def parse_position_entry_price(p: dict[str, Any]) -> float | None:
    """Return the average entry price of a ccxt position, or None.

    Top-level fields are tried first, then fields of the raw ``info``
    payload; the first finite, strictly positive value wins. A non-dict *p*
    yields None.
    """
    if not isinstance(p, dict):
        return None
    raw_info = p.get("info")
    info = raw_info if isinstance(raw_info, dict) else {}

    top_level_keys = ("entryPrice", "entry_price", "average")
    info_keys = (
        "entryPrice",
        "entry_price",
        "avgPx",
        "avgEntryPrice",
        "avg_entry_price",
        "avgPrice",
        "openAvgPx",
    )
    candidates = [p.get(key) for key in top_level_keys]
    candidates += [info.get(key) for key in info_keys]
    return _coerce_float(*candidates)
def estimate_linear_swap_upnl_usdt(
    side: str,
    entry: float | None,
    mark: float | None,
    contracts: float | None,
    contract_size: float | None = None,
) -> float | None:
    """Estimate the unrealized PnL of a USDT-margined linear perpetual position.

    PnL = (mark - entry) * contracts * contract_size for a long position; the
    price difference is negated for any other side.

    Args:
        side: Position side. ``"long"`` (case-insensitive, surrounding
            whitespace ignored) or an empty/None value means long; any other
            value is treated as short.
        entry: Average entry price.
        mark: Current mark price.
        contracts: Absolute number of contracts; must be strictly positive.
        contract_size: Contract multiplier. Missing, non-finite or
            non-positive values fall back to 1.0.

    Returns:
        The estimate rounded to 2 decimals, or None when entry, mark or
        contracts is missing, non-finite or not strictly positive.
    """
    entry_px = _finite_or_none(entry)
    mark_px = _finite_or_none(mark)
    size = _finite_or_none(contracts)
    if entry_px is None or mark_px is None or size is None:
        return None
    # A price of zero (or below) is a "no data" placeholder, not a real quote;
    # computing with it would yield a PnL of roughly +/- price * size.
    if entry_px <= 0 or mark_px <= 0 or size <= 0:
        return None

    multiplier = _finite_or_none(contract_size)
    if multiplier is None or multiplier <= 0:
        multiplier = 1.0

    is_long = (side or "long").strip().lower() == "long"
    price_diff = (mark_px - entry_px) if is_long else (entry_px - mark_px)
    return round(price_diff * size * multiplier, 2)
def resolve_position_display_upnl(
    side: str,
    entry: float | None,
    mark: float | None,
    contracts: float | None,
    contract_size: float | None,
    exchange_upnl: float | None,
) -> float | None:
    """Choose the unrealized PnL to display for a position.

    An estimate is computed from entry/mark/contracts/contract_size via
    ``estimate_linear_swap_upnl_usdt``. If only one of the estimate and
    *exchange_upnl* is available, that one is returned (None if neither is).
    When both exist, the exchange value is kept unless it differs from the
    estimate by more than 20% of ``max(|estimate|, 1.0)``, in which case the
    estimate is returned.
    """
    estimate = estimate_linear_swap_upnl_usdt(
        side, entry, mark, contracts, contract_size
    )
    if estimate is None or exchange_upnl is None:
        return exchange_upnl if estimate is None else estimate

    # Floor the reference at 1.0 so tiny estimates do not inflate the ratio.
    scale = max(abs(estimate), 1.0)
    relative_gap = abs(exchange_upnl - estimate) / scale
    return estimate if relative_gap > 0.2 else exchange_upnl
def _coerce_signed(*values: Any) -> float | None:
"""解析可正可负的数值(未实现盈亏等)。"""
for v in values:
if v is None or v == "":
continue
f = _finite_or_none(v)
if f is not None:
return f
return None
def parse_position_unrealized_pnl(p: dict[str, Any]) -> float | None:
    """Return the exchange-reported unrealized PnL of a ccxt position, or None.

    Field names differ between exchanges (Gate/OKX/Binance), so several
    spellings are tried: top-level fields first, then the raw ``info``
    payload. The first finite value (positive, negative or zero) wins. A
    non-dict *p* yields None.
    """
    if not isinstance(p, dict):
        return None
    raw_info = p.get("info")
    info = raw_info if isinstance(raw_info, dict) else {}

    top_level_keys = (
        "unrealizedPnl",
        "unrealisedPnl",
        "unrealized_pnl",
        "unrealised_pnl",
    )
    info_keys = (
        "unrealised_pnl",
        "unrealized_pnl",
        "unrealisedPnl",
        "unrealizedPnl",
        "upl",
        "uplLast",
    )
    candidates = [p.get(key) for key in top_level_keys]
    candidates += [info.get(key) for key in info_keys]
    return _coerce_signed(*candidates)
def enrich_ccxt_position_metrics_out(
    position: dict[str, Any],
    out: dict[str, Any],
    *,
    contract_size: float = 1.0,
    funds_decimals: int = 2,
) -> dict[str, Any]:
    """Normalize mark price and unrealized PnL in an already-parsed metrics dict.

    Steps, applied in place to *out*:

    - If ``out["mark_price"]`` is missing, non-finite or not positive, it is
      replaced by ``parse_position_mark_price(position)`` (rounded to 8
      decimals) when that yields a positive value.
    - ``out["unrealized_pnl"]`` is set to the value chosen by
      ``resolve_position_display_upnl`` from the exchange-reported PnL and an
      estimate based on entry/mark/contracts/contract_size. This step is
      skipped when the position size is (near) zero.

    Args:
        position: Raw ccxt position dict.
        out: Metrics dict to enrich; mutated and returned.
        contract_size: Contract multiplier; falsy or non-positive values fall
            back to 1.0.
        funds_decimals: Number of decimals used to round the PnL.

    Returns:
        *out* itself. It is returned untouched when either argument is not a
        dict.
    """
    if not isinstance(position, dict) or not isinstance(out, dict):
        return out

    mark = _finite_or_none(out.get("mark_price"))
    if mark is None or mark <= 0:
        fallback_mark = parse_position_mark_price(position)
        if fallback_mark is not None and fallback_mark > 0:
            out["mark_price"] = round(fallback_mark, 8)
            mark = fallback_mark
        else:
            # No usable mark at all. Passing a zero/negative placeholder on
            # would produce a bogus estimate that overrides the exchange PnL.
            mark = None

    exchange_upnl = parse_position_unrealized_pnl(position)
    if exchange_upnl is None:
        exchange_upnl = _coerce_signed(out.get("unrealized_pnl"))

    size = position_contracts(position)
    if abs(size) < 1e-12:
        return out

    side = position_side_from_ccxt(position, size)
    entry = parse_position_entry_price(position)
    multiplier = contract_size if contract_size and contract_size > 0 else 1.0
    upnl = resolve_position_display_upnl(
        side, entry, mark, abs(size), multiplier, exchange_upnl
    )
    if upnl is not None:
        out["unrealized_pnl"] = round(upnl, funds_decimals)
    return out
def parse_position_mark_price(p: dict[str, Any]) -> float | None:
    """Return the mark price of a ccxt position, or None.

    Explicit mark-price fields are tried first (top level, then the raw
    ``info`` payload); the first finite, strictly positive value wins. When
    none is present, the price is derived from the position value:
    ``|notional| / (|contracts| * contractSize)``.

    Args:
        p: A ccxt position dict. A non-dict value yields None.

    Returns:
        The mark price, or None when it can be neither read nor derived.
    """
    if not isinstance(p, dict):
        return None
    info = p.get("info")
    if not isinstance(info, dict):
        info = {}

    mark = _coerce_float(
        p.get("markPrice"),
        p.get("mark_price"),
        p.get("mark"),
        info.get("markPx"),
        info.get("mark_price"),
        info.get("markPrice"),
    )
    if mark is not None:
        return mark

    contracts = position_contracts(p)
    if abs(contracts) >= 1e-12:
        notional = _finite_or_none(p.get("notional"))
        if notional is not None and abs(notional) > 0:
            # notional covers contracts * contractSize units of the base
            # asset, so the per-unit price must also divide by contractSize.
            # A missing or invalid contractSize falls back to 1.
            contract_size = _finite_or_none(p.get("contractSize"))
            if contract_size is None or contract_size <= 0:
                contract_size = 1.0
            return abs(notional) / (abs(contracts) * contract_size)
    return None
def build_position_marks_list(
    positions: list,
    *,
    format_mark_display: Callable[[str, float], str] | None = None,
) -> list[dict[str, Any]]:
    """Build the ``position_marks`` rows from a ``fetch_positions`` result.

    Entries that are not dicts, have a (near) zero size, or have no positive
    mark price are skipped. Each remaining position yields a dict with
    ``symbol``, ``side``, ``mark_price`` and ``mark_price_display``.

    Args:
        positions: Position dicts; None is treated as empty.
        format_mark_display: Optional ``(symbol, mark) -> str`` formatter. It
            is used only when the symbol is non-empty; if it raises, the
            generic ``:g`` formatting is used instead.

    Returns:
        The list of rows, in input order.
    """
    rows: list[dict[str, Any]] = []
    for pos in positions or []:
        if not isinstance(pos, dict):
            continue
        size = position_contracts(pos)
        if abs(size) < 1e-12:
            continue
        mark = parse_position_mark_price(pos)
        if mark is None or mark <= 0:
            continue

        symbol = (pos.get("symbol") or "").strip()
        side = position_side_from_ccxt(pos, size)

        display = f"{mark:g}"
        if format_mark_display and symbol:
            try:
                display = format_mark_display(symbol, mark)
            except Exception:
                # Best effort: a failing formatter must not drop the row.
                display = f"{mark:g}"

        rows.append(
            {
                "symbol": symbol,
                "side": side,
                "mark_price": mark,
                "mark_price_display": display,
            }
        )
    return rows