fix: trend preview uses USDT profit, snapshot risk budget, and money RR across four exchanges

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-05 16:20:59 +08:00
parent 31756e838d
commit 32f4eec1d3
6 changed files with 223 additions and 70 deletions
+145 -38
View File
@@ -241,6 +241,48 @@ def calc_take_profit_for_rr(
return None
def calc_risk_budget_usdt(snapshot_usdt: float, risk_percent: float) -> Optional[float]:
"""计划止损金额 U = 可用快照 × 风险比例。"""
try:
snap = float(snapshot_usdt)
rp = float(risk_percent)
if snap <= 0 or rp <= 0:
return None
return round(snap * rp / 100.0, 4)
except (TypeError, ValueError):
return None
def calc_money_reward_risk_ratio(profit_u: float, risk_u: float) -> Optional[float]:
"""金额盈亏比 = 止盈盈利 U / 止损金额 U。"""
try:
r = float(risk_u)
p = float(profit_u)
if r <= 0:
return None
return round(p / r, 4)
except (TypeError, ValueError):
return None
def calc_tp_profit_usdt(
direction: str,
avg_entry: float,
take_profit_price: float,
contracts: float,
contract_size: float = 1.0,
) -> Optional[float]:
"""到达止盈价时,按累计张数与加仓后均价的盈利 U。"""
try:
from hub_position_metrics import estimate_linear_swap_upnl_usdt
return estimate_linear_swap_upnl_usdt(
direction, float(avg_entry), float(take_profit_price), float(contracts), float(contract_size)
)
except (TypeError, ValueError):
return None
def weighted_avg_entry(legs: list[tuple[float, float]]) -> Optional[float]:
"""按 (成交价, 张数) 加权均价。"""
total = 0.0
@@ -262,7 +304,7 @@ def weighted_avg_entry(legs: list[tuple[float, float]]) -> Optional[float]:
def build_trend_preview_level_rows(preview: dict) -> tuple[dict, list[dict]]:
"""
预览:参考价首仓止盈 + 每档补仓后止盈;止损统一为计划止损(加仓后最大止损)
预览:表单止盈价下每档累计持仓的盈利 U;止损金额 = 快照×风险;盈亏比按金额对比
返回 (增强后的 preview 字段, 表格行列表,含首仓行)。
"""
p = dict(preview or {})
@@ -272,16 +314,24 @@ def build_trend_preview_level_rows(preview: dict) -> tuple[dict, list[dict]]:
sl = float(p.get("stop_loss"))
user_tp = float(p.get("take_profit"))
first_amt = float(p.get("first_order_amount"))
snapshot = float(p.get("snapshot_available_usdt"))
risk_percent = float(p.get("risk_percent"))
except (TypeError, ValueError):
return p, []
rr = calc_planned_reward_risk_ratio(direction, ref, sl, user_tp)
if rr is None:
risk_u = calc_risk_budget_usdt(snapshot, risk_percent)
if risk_u is None or risk_u <= 0:
return p, []
first_tp = calc_take_profit_for_rr(direction, ref, sl, rr)
p["preview_target_rr"] = rr
p["preview_first_take_profit"] = first_tp
try:
contract_size = float(p.get("contract_size") or 1.0)
if contract_size <= 0:
contract_size = 1.0
except (TypeError, ValueError):
contract_size = 1.0
p["preview_risk_amount_u"] = risk_u
p["preview_take_profit_price"] = user_tp
p["preview_unified_stop_loss"] = sl
try:
@@ -297,45 +347,81 @@ def build_trend_preview_level_rows(preview: dict) -> tuple[dict, list[dict]]:
except Exception:
leg_amounts = []
rows: list[dict] = [
{
"i": 0,
"label": "首仓",
"price": ref,
"contracts": first_amt,
"avg_entry": ref,
"take_profit": first_tp,
"stop_loss": sl,
"is_first": True,
def _row_dict(
*,
i: int,
label: str,
price: float,
leg_contracts: float,
cum_contracts: float,
avg: float,
is_first: bool,
) -> dict:
profit_u = calc_tp_profit_usdt(direction, avg, user_tp, cum_contracts, contract_size)
rr_money = calc_money_reward_risk_ratio(profit_u, risk_u) if profit_u is not None else None
return {
"i": i,
"label": label,
"price": price,
"contracts": leg_contracts,
"cum_contracts": cum_contracts,
"avg_entry": avg,
"take_profit_price": user_tp,
"profit_u": profit_u,
"risk_u": risk_u,
"rr": rr_money,
"stop_loss_price": sl,
"take_profit": profit_u,
"stop_loss": risk_u,
"is_first": is_first,
}
cum_contracts = first_amt
first_profit = calc_tp_profit_usdt(direction, ref, user_tp, cum_contracts, contract_size)
first_rr = calc_money_reward_risk_ratio(first_profit, risk_u) if first_profit is not None else None
p["preview_first_profit_u"] = first_profit
p["preview_target_rr"] = first_rr
p["preview_first_take_profit"] = user_tp
rows: list[dict] = [
_row_dict(
i=0,
label="首仓",
price=ref,
leg_contracts=first_amt,
cum_contracts=cum_contracts,
avg=ref,
is_first=True,
)
]
accumulated: list[tuple[float, float]] = [(ref, first_amt)]
for i, pair in enumerate(zip(grid, leg_amounts), 1):
try:
price = float(pair[0])
contracts = float(pair[1])
leg_contracts = float(pair[1])
except (TypeError, ValueError):
continue
accumulated.append((price, contracts))
accumulated.append((price, leg_contracts))
avg = weighted_avg_entry(accumulated)
tp_after = calc_take_profit_for_rr(direction, avg, sl, rr) if avg is not None else None
if avg is None:
continue
cum_contracts += leg_contracts
rows.append(
{
"i": i,
"label": f"补仓{i}",
"price": price,
"contracts": contracts,
"avg_entry": avg,
"take_profit": tp_after,
"stop_loss": sl,
"is_first": False,
}
_row_dict(
i=i,
label=f"补仓{i}",
price=price,
leg_contracts=leg_contracts,
cum_contracts=cum_contracts,
avg=avg,
is_first=False,
)
)
return p, rows
def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict]:
"""运行中计划:为 dca_levels 补充加仓后均价、止盈、统一止损"""
"""运行中计划:为 dca_levels 补充加仓后均价、止盈盈利 U、止损金额 U、金额盈亏比"""
if not levels:
return levels
p = plan or {}
@@ -344,9 +430,15 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict
sl = float(p.get("stop_loss"))
user_tp = float(p.get("take_profit"))
first_amt = float(p.get("first_order_amount"))
snapshot = float(p.get("snapshot_available_usdt"))
risk_percent = float(p.get("risk_percent"))
except (TypeError, ValueError):
return levels
risk_u = calc_risk_budget_usdt(snapshot, risk_percent)
if risk_u is None or risk_u <= 0:
return levels
ref_raw = p.get("live_price_ref")
if ref_raw in (None, ""):
ref_raw = p.get("avg_entry_price")
@@ -355,12 +447,16 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict
except (TypeError, ValueError):
return levels
rr = calc_planned_reward_risk_ratio(direction, ref, sl, user_tp)
if rr is None:
return levels
try:
contract_size = float(p.get("contract_size") or 1.0)
if contract_size <= 0:
contract_size = 1.0
except (TypeError, ValueError):
contract_size = 1.0
out: list[dict] = []
accumulated: list[tuple[float, float]] = []
cum_contracts = 0.0
for lv in levels:
row = dict(lv)
is_first = row.get("leg_key") == "first" or row.get("label") == "首仓" or row.get("i") == 0
@@ -371,21 +467,32 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict
except (TypeError, ValueError):
amt_f = first_amt
accumulated = [(ref, amt_f)]
cum_contracts = amt_f
row["avg_entry"] = ref
row["take_profit"] = calc_take_profit_for_rr(direction, ref, sl, rr)
row["stop_loss"] = sl
else:
price = row.get("price")
contracts = row.get("contracts")
if price is not None and contracts is not None:
try:
accumulated.append((float(price), float(contracts)))
leg_contracts = float(contracts)
accumulated.append((float(price), leg_contracts))
avg = weighted_avg_entry(accumulated)
if avg is not None:
row["avg_entry"] = avg
row["take_profit"] = calc_take_profit_for_rr(direction, avg, sl, rr)
row["stop_loss"] = sl
cum_contracts += leg_contracts
except (TypeError, ValueError):
pass
avg_entry = row.get("avg_entry")
if avg_entry is not None:
profit_u = calc_tp_profit_usdt(
direction, float(avg_entry), user_tp, cum_contracts, contract_size
)
row["take_profit_price"] = user_tp
row["profit_u"] = profit_u
row["risk_u"] = risk_u
row["rr"] = calc_money_reward_risk_ratio(profit_u, risk_u) if profit_u is not None else None
row["take_profit"] = profit_u
row["stop_loss"] = risk_u
row["stop_loss_price"] = sl
out.append(row)
return out