Add roll leg avg/TP profit display and reduce instance nav flicker

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-23 23:42:02 +08:00
parent 54ba412d1d
commit e03863d780
12 changed files with 530 additions and 24 deletions
+402
View File
@@ -0,0 +1,402 @@
"""顺势加仓 UI:滚仓腿合并均价与止盈盈利展示(实例页 + 中控)。"""
from __future__ import annotations
from typing import Any, Callable, Optional
from flask import Flask
FILLED_LEG_STATUSES = frozenset({"filled", "done", "complete"})
def reward_at_tp_usdt(
direction: str,
avg_entry: float,
take_profit: float,
qty: float,
*,
contract_size: float = 1.0,
) -> Optional[float]:
"""与 strategy_roll_lib.preview_roll 一致:线性合约 U 本位盈利。"""
try:
avg = float(avg_entry)
tp = float(take_profit)
q = float(qty)
cs = float(contract_size or 1.0)
except (TypeError, ValueError):
return None
if avg <= 0 or tp <= 0 or q <= 0:
return None
direction = (direction or "long").strip().lower()
if direction == "short":
return (avg - tp) * q * cs
return (tp - avg) * q * cs
def leg_fill_price(leg: dict) -> Optional[float]:
if not isinstance(leg, dict):
return None
for key in ("fill_price", "limit_price"):
try:
v = float(leg.get(key) or 0)
if v > 0:
return v
except (TypeError, ValueError):
continue
return None
def leg_is_filled(leg: dict) -> bool:
st = str(leg.get("status") or "").strip().lower()
return st in FILLED_LEG_STATUSES
def infer_initial_position(
qty_live: float,
entry_live: float,
filled_legs: list[dict],
*,
monitor: dict | None = None,
) -> tuple[Optional[float], Optional[float]]:
"""由当前持仓与各腿成交价反推首仓张数/均价。"""
try:
qty_live = float(qty_live)
entry_live = float(entry_live)
except (TypeError, ValueError):
qty_live = entry_live = 0.0
legs = [
lg
for lg in filled_legs or []
if isinstance(lg, dict) and leg_is_filled(lg) and leg_fill_price(lg) and float(lg.get("amount") or 0) > 0
]
add_sum = sum(float(lg.get("amount") or 0) for lg in legs)
leg_notional = sum(float(lg.get("amount") or 0) * float(leg_fill_price(lg) or 0) for lg in legs)
q0 = qty_live - add_sum
if q0 > 1e-12 and entry_live > 0 and qty_live > 0:
e0 = (entry_live * qty_live - leg_notional) / q0
if e0 > 0:
return q0, e0
mon = monitor if isinstance(monitor, dict) else {}
try:
trig = float(mon.get("trigger_price") or 0)
except (TypeError, ValueError):
trig = 0.0
try:
mon_amt = float(mon.get("order_amount") or mon.get("amount") or 0)
except (TypeError, ValueError):
mon_amt = 0.0
if trig > 0:
q_base = q0 if q0 > 1e-12 else (mon_amt if mon_amt > 0 else max(qty_live - add_sum, 0))
if q_base > 0:
return q_base, trig
return None, None
def compute_roll_chain_metrics(
group: dict,
legs: list[dict],
*,
qty_live: Optional[float] = None,
entry_live: Optional[float] = None,
monitor: dict | None = None,
contract_size: float = 1.0,
) -> tuple[dict[Any, dict], dict]:
"""
返回 (leg_metrics_by_id, group_metrics)。
leg_metrics: leg id -> {avg_entry_after, reward_at_tp_usdt}
group_metrics: 最后一腿后的 {avg_entry, reward_at_tp_usdt}
"""
per_leg: dict[Any, dict] = {}
group_out: dict[str, Any] = {"avg_entry": None, "reward_at_tp_usdt": None}
if not isinstance(group, dict):
return per_leg, group_out
direction = (group.get("direction") or "long").strip().lower()
try:
tp = float(group.get("initial_take_profit") or 0)
except (TypeError, ValueError):
tp = 0.0
sorted_legs = sorted(
[lg for lg in legs or [] if isinstance(lg, dict)],
key=lambda x: int(x.get("leg_index") or 0),
)
filled = [lg for lg in sorted_legs if leg_is_filled(lg)]
q0 = e0 = None
if qty_live is not None and entry_live is not None:
q0, e0 = infer_initial_position(float(qty_live), float(entry_live), filled, monitor=monitor)
if q0 is None or e0 is None:
return per_leg, group_out
qty = float(q0)
avg = float(e0)
if tp > 0:
group_out["avg_entry"] = avg
group_out["reward_at_tp_usdt"] = reward_at_tp_usdt(
direction, avg, tp, qty, contract_size=contract_size
)
for leg in sorted_legs:
if not leg_is_filled(leg):
continue
try:
amt = float(leg.get("amount") or 0)
except (TypeError, ValueError):
continue
px = leg_fill_price(leg)
if not px or amt <= 0:
continue
prev_qty = qty
qty = prev_qty + amt
avg = (prev_qty * avg + amt * px) / qty
reward = reward_at_tp_usdt(direction, avg, tp, qty, contract_size=contract_size) if tp > 0 else None
lid = leg.get("id")
if lid is None:
lid = f"{group.get('id')}|{leg.get('leg_index')}"
per_leg[lid] = {
"avg_entry_after": round(avg, 10),
"reward_at_tp_usdt": round(reward, 4) if reward is not None else None,
}
group_out["avg_entry"] = round(avg, 10)
group_out["reward_at_tp_usdt"] = round(reward, 4) if reward is not None else None
return per_leg, group_out
def _row_to_dict(row) -> dict:
if row is None:
return {}
try:
return dict(row)
except Exception:
return {}
def _resolve_roll_live(cfg: dict, group: dict, monitor: dict | None) -> tuple[Optional[float], Optional[float], float]:
"""读取交易所持仓张数、均价、contract_size。"""
m = cfg.get("app_module")
ex_sym = group.get("exchange_symbol")
sym = group.get("symbol") or ""
direction = (group.get("direction") or "long").strip().lower()
if not ex_sym and m is not None:
norm = getattr(m, "normalize_exchange_symbol", None)
if callable(norm):
try:
ex_sym = norm(sym)
except Exception:
ex_sym = sym
cs = 1.0
get_cs = cfg.get("get_contract_size")
if not callable(get_cs) and m is not None:
get_cs = getattr(m, "get_contract_size", None)
if callable(get_cs):
try:
cs = float(get_cs(ex_sym or sym) or 1.0)
except Exception:
cs = 1.0
get_pos = cfg.get("get_position")
if not callable(get_pos):
return None, None, cs
try:
pos = get_pos(ex_sym or sym, direction) or {}
qty = float(pos.get("contracts") or 0)
entry = float(pos.get("entry_price") or 0)
if qty > 0 and entry > 0:
return qty, entry, cs
except Exception:
pass
metrics_fn = getattr(m, "get_live_position_exchange_metrics", None) if m else None
if callable(metrics_fn):
try:
met = metrics_fn(ex_sym or sym, direction)
if isinstance(met, dict):
qty = float(met.get("contracts") or met.get("size") or 0)
entry = float(met.get("entry_price") or 0)
if qty > 0 and entry > 0:
return qty, entry, cs
except Exception:
pass
if monitor:
try:
trig = float(monitor.get("trigger_price") or 0)
amt = float(monitor.get("order_amount") or monitor.get("amount") or 0)
if trig > 0 and amt > 0:
return amt, trig, cs
except (TypeError, ValueError):
pass
return None, None, cs
def enrich_roll_page_data(conn, page_data: dict, cfg: dict | None) -> dict:
"""为 roll_groups / roll_legs 附加 avg_entry、reward_at_tp 展示字段。"""
if not isinstance(page_data, dict) or not cfg:
return page_data
groups = list(page_data.get("roll_groups") or [])
legs = list(page_data.get("roll_legs") or [])
if not groups:
return page_data
monitors_by_id: dict[int, dict] = {}
try:
for row in conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall():
od = _row_to_dict(row)
mid = od.get("id")
if mid is not None:
monitors_by_id[int(mid)] = od
except Exception:
pass
legs_by_gid: dict[int, list] = {}
for leg in legs:
if not isinstance(leg, dict):
continue
try:
gid = int(leg.get("roll_group_id"))
except (TypeError, ValueError):
continue
legs_by_gid.setdefault(gid, []).append(leg)
price_fmt = cfg.get("price_fmt")
for g in groups:
if not isinstance(g, dict) or g.get("id") is None:
continue
gid = int(g["id"])
mon = monitors_by_id.get(int(g.get("order_monitor_id") or 0))
qty, entry, cs = _resolve_roll_live(cfg, g, mon)
per_leg, group_metrics = compute_roll_chain_metrics(
g,
legs_by_gid.get(gid, []),
qty_live=qty,
entry_live=entry,
monitor=mon,
contract_size=cs,
)
g["avg_entry"] = group_metrics.get("avg_entry")
g["reward_at_tp_usdt"] = group_metrics.get("reward_at_tp_usdt")
if callable(price_fmt) and g.get("avg_entry") is not None:
try:
g["avg_entry_display"] = price_fmt(g.get("symbol"), g["avg_entry"])
except Exception:
pass
for leg in legs_by_gid.get(gid, []):
lid = leg.get("id")
if lid is None:
lid = f"{gid}|{leg.get('leg_index')}"
metrics = per_leg.get(lid) or per_leg.get(leg.get("id"))
if not metrics:
continue
leg["avg_entry_after"] = metrics.get("avg_entry_after")
leg["reward_at_tp_usdt"] = metrics.get("reward_at_tp_usdt")
if callable(price_fmt) and leg.get("avg_entry_after") is not None:
try:
leg["avg_entry_display"] = price_fmt(g.get("symbol"), leg["avg_entry_after"])
except Exception:
pass
page_data["roll_groups"] = groups
page_data["roll_legs"] = legs
return page_data
def enrich_roll_groups_for_hub(rolls: list[dict], conn, cfg: dict | None) -> list[dict]:
"""中控 monitor API:每组附带当前均价、止盈盈利与最近滚仓腿。"""
if not rolls or not cfg:
return rolls
out = []
gid_list = []
for g in rolls:
if isinstance(g, dict) and g.get("id") is not None:
try:
gid_list.append(int(g["id"]))
except (TypeError, ValueError):
pass
legs_by_gid: dict[int, list] = {gid: [] for gid in gid_list}
if gid_list:
placeholders = ",".join("?" for _ in gid_list)
try:
rows = conn.execute(
f"SELECT * FROM roll_legs WHERE roll_group_id IN ({placeholders}) ORDER BY id DESC",
gid_list,
).fetchall()
for row in rows:
leg = _row_to_dict(row)
try:
legs_by_gid[int(leg.get("roll_group_id"))].append(leg)
except (TypeError, ValueError):
pass
except Exception:
pass
monitors_by_id: dict[int, dict] = {}
try:
for row in conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall():
od = _row_to_dict(row)
if od.get("id") is not None:
monitors_by_id[int(od["id"])] = od
except Exception:
pass
price_fmt = cfg.get("price_fmt")
for g in rolls:
if not isinstance(g, dict):
continue
gd = dict(g)
try:
gid = int(gd.get("id"))
except (TypeError, ValueError):
out.append(gd)
continue
mon = monitors_by_id.get(int(gd.get("order_monitor_id") or 0))
group_legs = legs_by_gid.get(gid, [])
qty, entry, cs = _resolve_roll_live(cfg, gd, mon)
per_leg, group_metrics = compute_roll_chain_metrics(
gd,
group_legs,
qty_live=qty,
entry_live=entry,
monitor=mon,
contract_size=cs,
)
gd.update(group_metrics)
if callable(price_fmt) and gd.get("avg_entry") is not None:
try:
gd["avg_entry_display"] = price_fmt(gd.get("symbol"), gd["avg_entry"])
except Exception:
pass
recent = []
for leg in sorted(group_legs, key=lambda x: int(x.get("leg_index") or 0), reverse=True)[:6]:
ld = dict(leg)
lid = ld.get("id")
if lid is None:
lid = f"{gid}|{ld.get('leg_index')}"
metrics = per_leg.get(lid) or per_leg.get(ld.get("id"))
if metrics:
ld.update(metrics)
if callable(price_fmt) and ld.get("avg_entry_after") is not None:
try:
ld["avg_entry_display"] = price_fmt(gd.get("symbol"), ld["avg_entry_after"])
except Exception:
pass
recent.append(ld)
gd["recent_legs"] = recent
out.append(gd)
return out
def patch_roll_hub_enrich(app: Flask, cfg: dict) -> None:
"""hub_bridge install 后:/api/hub/monitor 的 rolls 附带均价/止盈盈利。"""
ctx = dict(app.config.get("HUB_CTX") or {})
prev: Callable | None = ctx.get("enrich_monitor")
def enrich_monitor(keys=None, orders=None, trends=None, rolls=None):
payload: dict[str, Any] = {}
if callable(prev):
try:
prev_out = prev(keys=keys, orders=orders, trends=trends, rolls=rolls)
if isinstance(prev_out, dict):
payload.update(prev_out)
except Exception:
pass
if rolls:
get_db = cfg.get("get_db")
if callable(get_db):
conn = get_db()
try:
payload["rolls"] = enrich_roll_groups_for_hub(list(rolls), conn, cfg)
finally:
try:
conn.close()
except Exception:
pass
return payload
ctx["enrich_monitor"] = enrich_monitor
app.config["HUB_CTX"] = ctx