fix(trend): align hub and four-exchange trend plan display

Unify gate_bot with shared enrich_trend_plan for strategy pages and hub monitor, reconcile DCA avg with live entry price, and fix missing fill price display.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-07 18:05:30 +08:00
parent e5576eaaed
commit 6a4ec69dba
7 changed files with 287 additions and 127 deletions
+10 -106
View File
@@ -3959,40 +3959,6 @@ def calc_trend_manual_breakeven_stop(direction, entry_price, offset_pct=None):
return e * (1.0 + pct / 100.0)
def enrich_active_trend_plan_row(row):
d = row_to_dict(row)
try:
d["breakeven_applied"] = int(d.get("breakeven_applied") or 0) != 0
except Exception:
d["breakeven_applied"] = False
ex_sym = d.get("exchange_symbol") or normalize_exchange_symbol(d.get("symbol") or "")
direction = (d.get("direction") or "long").lower()
m = get_live_position_exchange_metrics(ex_sym, direction)
if m and m.get("unrealized_pnl") is not None:
d["floating_pnl"] = float(m["unrealized_pnl"])
else:
d["floating_pnl"] = None
if m and m.get("mark_price") is not None:
d["floating_mark"] = float(m["mark_price"])
else:
d["floating_mark"] = None
try:
d["contract_size"] = float(get_contract_size(ex_sym))
except (TypeError, ValueError):
pass
from strategy_snapshot_lib import attach_trend_dca_levels
from strategy_trend_lib import calc_trend_plan_money_metrics
d = attach_trend_dca_levels(d)
money = calc_trend_plan_money_metrics(d)
if money.get("money_rr") is not None:
d["money_rr"] = money["money_rr"]
d["planned_rr"] = money["money_rr"]
if money.get("risk_amount_u") is not None:
d["risk_amount_u"] = money["risk_amount_u"]
return d
def opened_at_str_to_ms(opened_at_str):
if not opened_at_str:
return None
@@ -5412,30 +5378,6 @@ def render_main_page(page="trade"):
trend_active = conn.execute(
"SELECT COUNT(*) FROM trend_pullback_plans WHERE status='active'"
).fetchone()[0]
trend_plans_raw = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC"
).fetchall()
trend_plans = []
trend_dca_probes = []
_trend_cfg = app.extensions.get("strategy_trend_cfg")
for r in trend_plans_raw:
try:
enriched = enrich_active_trend_plan_row(r)
trend_plans.append(enriched)
except Exception as e:
print(f"[render_main_page] enrich trend plan: {e}")
enriched = row_to_dict(r)
trend_plans.append(enriched)
if _trend_cfg and page in ("strategy", "strategy_trend", "strategy_roll"):
try:
from strategy_trend_register import summarize_trend_dca_probe
probe = summarize_trend_dca_probe(_trend_cfg, r)
trend_dca_probes.append(probe)
if isinstance(enriched, dict):
enriched["dca_probe"] = probe
except Exception as e:
print(f"[render_main_page] trend dca probe: {e}")
preview_snapshots = []
if page == "records":
try:
@@ -5456,49 +5398,17 @@ def render_main_page(page="trade"):
and active_count < MAX_ACTIVE_POSITIONS
and int(trend_active or 0) == 0
)
trend_preview = None
trend_preview_levels = []
preview_expires_ms = None
trend_preview_expired = False
trend_preview_id_arg = ""
if page in ("strategy", "strategy_trend", "strategy_roll"):
_trend_cleanup_stale_previews(conn)
if page in ("strategy", "strategy_trend"):
trend_preview_id_arg = (request.args.get("preview_id") or "").strip()
if trend_preview_id_arg:
pr = conn.execute(
"SELECT * FROM trend_pullback_previews WHERE id=?",
(trend_preview_id_arg,),
).fetchone()
now_ms = int(time.time() * 1000)
if pr and int(pr["expires_at_ms"] or 0) >= now_ms:
from strategy_trend_lib import build_trend_preview_level_rows
trend_preview = row_to_dict(pr)
preview_expires_ms = int(pr["expires_at_ms"])
if not trend_preview.get("contract_size"):
try:
ensure_markets_loaded()
ex_sym = trend_preview.get("exchange_symbol") or trend_preview.get("symbol")
mk = exchange.market(ex_sym)
trend_preview["contract_size"] = float(mk.get("contractSize") or 1)
except Exception:
pass
trend_preview, trend_preview_levels = build_trend_preview_level_rows(trend_preview)
elif pr:
trend_preview_expired = True
strategy_extra = {}
if page == "strategy_records":
if page in ("strategy", "strategy_trend", "strategy_roll", "strategy_records"):
from strategy_ui import strategy_render_extras
strategy_extra = strategy_render_extras(conn, page)
elif page in ("strategy", "strategy_trend", "strategy_roll"):
from strategy_ui import fetch_roll_page_data
strategy_extra = fetch_roll_page_data(
strategy_extra = strategy_render_extras(
conn,
page,
default_risk_percent=float(RISK_PERCENT),
count_active_trends=lambda c, ta=trend_active: int(ta or 0),
request_obj=request,
trend_cfg=app.extensions.get("strategy_trend_cfg"),
)
orphan_positions: list = []
if page == "trade":
@@ -5540,20 +5450,9 @@ def render_main_page(page="trade"):
max_active_positions=MAX_ACTIVE_POSITIONS,
manual_min_planned_rr=MANUAL_MIN_PLANNED_RR,
can_trade=can_trade,
trend_plans=trend_plans,
trend_dca_probes=trend_dca_probes,
live_trading_enabled=LIVE_TRADING_ENABLED,
preview_snapshots=preview_snapshots,
exchange_sync_from_label=(EXCHANGE_POSITION_SYNC_FROM_BJ or "最近90天"),
trend_pullback_dca_legs=TREND_PULLBACK_DCA_LEGS,
trend_pullback_preview_ttl=TREND_PULLBACK_PREVIEW_TTL_SECONDS,
trend_preview=trend_preview,
trend_preview_levels=trend_preview_levels,
preview_expires_ms=preview_expires_ms,
trend_preview_expired=trend_preview_expired,
trend_preview_id_arg=trend_preview_id_arg,
trend_preview_max_drift_pct=TREND_PREVIEW_MAX_BALANCE_DRIFT_PCT,
trend_manual_breakeven_offset_pct=TREND_PULLBACK_MANUAL_BREAKEVEN_OFFSET_PCT,
list_window=list_window,
list_window_presets={
"utc_today": PRESET_UTC_TODAY,
@@ -8032,6 +7931,11 @@ try:
},
ohlcv_fn=_hub_fetch_ohlcv,
)
from strategy_trend_register import build_trend_config, patch_trend_hub_enrich
_hub_trend_cfg = build_trend_config(sys.modules[__name__])
app.extensions["strategy_trend_cfg"] = _hub_trend_cfg
patch_trend_hub_enrich(app, _hub_trend_cfg)
except Exception as _hub_err:
print(f"[hub_bridge] gate_bot: {_hub_err}")
+2
View File
@@ -179,6 +179,8 @@ def enrich_ccxt_position_metrics_out(
return out
side = position_side_from_ccxt(position, c)
entry = parse_position_entry_price(position)
if entry is not None and entry > 0:
out["entry_price"] = round(entry, 8)
cs = contract_size if contract_size and contract_size > 0 else 1.0
upnl = resolve_position_display_upnl(
side, entry, mark, abs(c), cs, exchange_upnl
+7 -5
View File
@@ -391,11 +391,13 @@
: null;
if (exchange != null && !Number.isFinite(exchange)) exchange = null;
const entry =
p.entry_price != null && p.entry_price !== ""
? Number(p.entry_price)
: t.trigger_price != null
? Number(t.trigger_price)
: null;
t.avg_entry_price != null && t.avg_entry_price !== ""
? Number(t.avg_entry_price)
: p.entry_price != null && p.entry_price !== ""
? Number(p.entry_price)
: t.trigger_price != null
? Number(t.trigger_price)
: null;
let mark =
markOverride != null && Number.isFinite(Number(markOverride))
? Number(markOverride)
+117 -6
View File
@@ -359,6 +359,116 @@ def append_leg_fill_price_json(existing_json: str | None, fill_px: float) -> str
return json.dumps(fills, ensure_ascii=False, separators=(",", ":"))
def _trend_leg_contracts(
leg_idx: int, first_amt: float, leg_amounts: list[float]
) -> float:
if leg_idx == 0:
return float(first_amt)
li = leg_idx - 1
if 0 <= li < len(leg_amounts):
return float(leg_amounts[li])
return 0.0
def _infer_trend_fill_from_target_avg(
leg_idx: int,
prices: list[float],
*,
first_amt: float,
leg_amounts: list[float],
legs_done: int,
target_avg: float,
) -> float:
"""已知其余档位成交价时,反推单档成交价使加权均价等于 target_avg。"""
total_contracts = 0.0
known_cost = 0.0
unknown_amt = _trend_leg_contracts(leg_idx, first_amt, leg_amounts)
for i in range(legs_done + 1):
amt = _trend_leg_contracts(i, first_amt, leg_amounts)
if amt <= 0:
continue
total_contracts += amt
if i == leg_idx:
continue
known_cost += float(prices[i]) * amt
if unknown_amt <= 0 or total_contracts <= 0:
return float(prices[leg_idx])
return (float(target_avg) * total_contracts - known_cost) / unknown_amt
def reconcile_trend_leg_fill_prices(plan: dict) -> list[float]:
"""
首仓(0)+已补仓(1..legs_done) 成交价
优先 leg_fill_prices_json缺口用计划网格价再对齐 avg_entry_price
"""
p = plan or {}
if int(p.get("first_order_done") or 0) == 0:
return []
try:
legs_done = int(p.get("legs_done") or 0)
except (TypeError, ValueError):
legs_done = 0
try:
first_amt = float(p.get("first_order_amount"))
except (TypeError, ValueError):
first_amt = 0.0
try:
target_avg = float(p.get("avg_entry_price"))
except (TypeError, ValueError):
target_avg = None
fills = parse_leg_fill_prices(p)
try:
grid = [float(x) for x in json.loads(p.get("grid_prices_json") or "[]")]
except Exception:
grid = []
try:
leg_amounts = [float(x) for x in json.loads(p.get("leg_amounts_json") or "[]")]
except Exception:
leg_amounts = []
def _default_px(leg_idx: int) -> float:
if leg_idx == 0:
if target_avg is not None and legs_done == 0:
return target_avg
try:
return float(p.get("avg_entry_price"))
except (TypeError, ValueError):
pass
try:
ref = p.get("live_price_ref")
if ref not in (None, ""):
return float(ref)
except (TypeError, ValueError):
pass
return 0.0
gi = leg_idx - 1
if 0 <= gi < len(grid):
return float(grid[gi])
return _default_px(0)
result: list[float] = []
estimated: list[int] = []
for leg_idx in range(legs_done + 1):
if len(fills) > leg_idx:
result.append(float(fills[leg_idx]))
else:
result.append(_default_px(leg_idx))
estimated.append(leg_idx)
if target_avg is not None and estimated:
adjust_idx = estimated[0] if len(estimated) == 1 else estimated[-1]
result[adjust_idx] = _infer_trend_fill_from_target_avg(
adjust_idx,
result,
first_amt=first_amt,
leg_amounts=leg_amounts,
legs_done=legs_done,
target_avg=target_avg,
)
return result
def calc_trend_plan_money_metrics(plan: dict) -> dict:
"""运行中计划头部:按快照风险金额计算盈亏比(止盈盈利 U / 风险 U)。"""
out = {"money_rr": None, "risk_amount_u": None}
@@ -526,12 +636,12 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict
if risk_u is None or risk_u <= 0:
return levels
fills = parse_leg_fill_prices(p)
try:
legs_done = int(p.get("legs_done") or 0)
except (TypeError, ValueError):
legs_done = 0
first_done = int(p.get("first_order_done") or 0) != 0
reconciled_fills = reconcile_trend_leg_fill_prices(p)
ref_raw = p.get("live_price_ref")
if ref_raw in (None, ""):
@@ -561,8 +671,9 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict
except (TypeError, ValueError):
amt_f = first_amt
if first_done:
fill_px = fills[0] if fills else None
if fill_px is None:
if reconciled_fills:
fill_px = float(reconciled_fills[0])
else:
try:
fill_px = float(p.get("avg_entry_price") or ref)
except (TypeError, ValueError):
@@ -571,6 +682,7 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict
cum_contracts = amt_f
row_cum = cum_contracts
row["avg_entry"] = float(fill_px)
row["price"] = fill_px
else:
accumulated = [(ref, amt_f)]
cum_contracts = amt_f
@@ -592,9 +704,8 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict
leg_contracts = 0.0
done = row.get("status") == "done" or (leg_num > 0 and leg_num <= legs_done)
if done and leg_contracts > 0:
fill_idx = leg_num
if len(fills) > fill_idx:
fill_px = float(fills[fill_idx])
if leg_num < len(reconciled_fills):
fill_px = float(reconciled_fills[leg_num])
elif grid_trigger_f is not None:
fill_px = grid_trigger_f
else:
+45 -10
View File
@@ -456,11 +456,21 @@ def _trend_add_leg_fields(cfg: dict, d: dict) -> dict:
except Exception:
grid = []
add_prices: list[float] = []
for x in grid[:legs_done]:
try:
add_prices.append(float(x))
except (TypeError, ValueError):
pass
try:
from strategy_trend_lib import reconcile_trend_leg_fill_prices
fills = reconcile_trend_leg_fill_prices(out)
for i in range(1, legs_done + 1):
if i < len(fills):
add_prices.append(float(fills[i]))
except Exception:
fills = []
if not add_prices:
for x in grid[:legs_done]:
try:
add_prices.append(float(x))
except (TypeError, ValueError):
pass
sym = out.get("exchange_symbol") or out.get("symbol") or ""
out["add_count"] = legs_done
out["add_count_total"] = dca_legs
@@ -504,6 +514,11 @@ def _patch_hub_trend_views(app: Flask) -> None:
app.config["HUB_CTX"] = ctx
def patch_trend_hub_enrich(app: Flask, cfg: dict) -> None:
"""hub_bridge install 之后调用:四所 /api/hub/monitor 趋势字段与策略页一致。"""
_patch_hub_monitor_enrich(app, cfg)
def _patch_hub_monitor_enrich(app: Flask, cfg: dict) -> None:
ctx = dict(app.config.get("HUB_CTX") or {})
prev = ctx.get("enrich_monitor")
@@ -530,7 +545,6 @@ def _patch_hub_monitor_enrich(app: Flask, cfg: dict) -> None:
def enrich_trend_plan(cfg: dict, row) -> dict:
m = _m(cfg)
d = _row(cfg, row)
d = _trend_add_leg_fields(cfg, d)
try:
d["breakeven_applied"] = int(d.get("breakeven_applied") or 0) != 0
except Exception:
@@ -538,6 +552,7 @@ def enrich_trend_plan(cfg: dict, row) -> dict:
ex_sym = d.get("exchange_symbol") or m.normalize_exchange_symbol(d.get("symbol") or "")
direction = (d.get("direction") or "long").lower()
metrics_fn = getattr(m, "get_live_position_exchange_metrics", None)
met = None
if callable(metrics_fn):
try:
lev = int(d.get("leverage") or 0) or None
@@ -547,6 +562,13 @@ def enrich_trend_plan(cfg: dict, row) -> dict:
met = metrics_fn(ex_sym, direction, order_leverage=lev)
except TypeError:
met = metrics_fn(ex_sym, direction)
if met and met.get("entry_price") is not None:
try:
live_entry = float(met["entry_price"])
if live_entry > 0:
d["avg_entry_price"] = live_entry
except (TypeError, ValueError):
pass
if met and met.get("unrealized_pnl") is not None:
d["floating_pnl"] = float(met["unrealized_pnl"])
elif (
@@ -587,6 +609,7 @@ def enrich_trend_plan(cfg: dict, row) -> dict:
d["contract_size"] = float(get_cs(ex_sym))
except (TypeError, ValueError):
pass
d = _trend_add_leg_fields(cfg, d)
from strategy_snapshot_lib import attach_trend_dca_levels
from strategy_trend_lib import calc_trend_plan_money_metrics
@@ -1256,13 +1279,24 @@ def load_trend_page_context(conn, request_obj, cfg: dict) -> dict[str, Any]:
or 0
)
trend_plans = []
for r in conn.execute(
trend_dca_probes = []
raw_plans = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC"
).fetchall():
).fetchall()
for r in raw_plans:
try:
trend_plans.append(enrich_trend_plan(cfg, r))
enriched = enrich_trend_plan(cfg, r)
trend_plans.append(enriched)
except Exception:
trend_plans.append(_row(cfg, r))
enriched = _row(cfg, r)
trend_plans.append(enriched)
try:
probe = summarize_trend_dca_probe(cfg, r)
trend_dca_probes.append(probe)
if isinstance(enriched, dict):
enriched["dca_probe"] = probe
except Exception:
pass
now = m.app_now()
active_count = m.get_active_position_count(conn)
can_trade_trend = (
@@ -1298,6 +1332,7 @@ def load_trend_page_context(conn, request_obj, cfg: dict) -> dict[str, Any]:
trend_preview_expired = True
return {
"trend_plans": trend_plans,
"trend_dca_probes": trend_dca_probes,
"trend_active": trend_active,
"can_trade_trend": can_trade_trend,
"trend_preview": trend_preview,
+16
View File
@@ -62,6 +62,22 @@ class TestTrendDcaEnrichFills(unittest.TestCase):
self.assertEqual(dca2["status"], "pending")
self.assertAlmostEqual(dca2["price"], 0.343, places=4)
def test_missing_dca_fills_align_last_avg_with_header(self):
"""缺补仓成交价时,末档加仓后均价应对齐计划头部 avg_entry_price。"""
plan = self._base_plan(
legs_done=2,
avg_entry_price=0.3507,
order_amount_open=161,
leg_fill_prices_json=json.dumps([0.3436]),
grid_prices_json=json.dumps([0.343, 0.343, 0.3395, 0.336, 0.3325]),
)
enriched = attach_trend_dca_levels(plan)
levels = enriched["dca_levels"]
dca2 = levels[2]
self.assertEqual(dca2["status"], "done")
self.assertAlmostEqual(dca2["avg_entry"], 0.3507, places=4)
self.assertGreater(dca2["price"], 0.343)
if __name__ == "__main__":
unittest.main()
+90
View File
@@ -0,0 +1,90 @@
"""四所趋势 enrich:实例与中控 monitor 字段一致。"""
from __future__ import annotations
import json
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from strategy_trend_register import ( # noqa: E402
enrich_trend_plan,
enrich_trend_plan_for_hub,
)
class _FakeModule:
@staticmethod
def normalize_exchange_symbol(sym):
return sym
@staticmethod
def ensure_markets_loaded():
return None
@staticmethod
def get_live_position_exchange_metrics(ex_sym, direction, order_leverage=None):
return {
"entry_price": 0.3507,
"mark_price": 0.3431,
"unrealized_pnl": -1.23,
}
@staticmethod
def get_contract_size(_ex_sym):
return 1.0
class TestTrendHubEnrichUnified(unittest.TestCase):
def _cfg(self):
return {
"app_module": _FakeModule(),
"breakeven_offset_pct": 0.3,
"row_to_dict": lambda row: dict(row) if not isinstance(row, dict) else row,
}
def _plan_row(self):
return {
"id": 4,
"symbol": "ONDO/USDT:USDT",
"exchange_symbol": "ONDO/USDT:USDT",
"direction": "long",
"stop_loss": 0.329,
"take_profit": 0.476,
"add_upper": 0.35,
"first_order_amount": 115,
"snapshot_available_usdt": 97.98,
"risk_percent": 5,
"contract_size": 1.0,
"grid_prices_json": json.dumps([0.343, 0.343, 0.3395, 0.336, 0.3325]),
"leg_amounts_json": json.dumps([23, 23, 23, 23, 23]),
"dca_legs": 5,
"first_order_done": 1,
"legs_done": 2,
"avg_entry_price": 0.3434,
"order_amount_open": 161,
"leg_fill_prices_json": json.dumps([0.3436]),
"leverage": 10,
"plan_margin_capital": 8.17,
}
def test_hub_and_page_share_live_avg_and_dca_levels(self):
cfg = self._cfg()
row = self._plan_row()
page = enrich_trend_plan(cfg, row)
hub = enrich_trend_plan_for_hub(cfg, row)
self.assertAlmostEqual(page["avg_entry_price"], 0.3507, places=4)
self.assertAlmostEqual(hub["avg_entry_price"], 0.3507, places=4)
self.assertIn("dca_levels", page)
self.assertIn("dca_levels", hub)
last_done = hub["dca_levels"][2]
self.assertEqual(last_done["status"], "done")
self.assertAlmostEqual(last_done["avg_entry"], 0.3507, places=4)
self.assertEqual(hub.get("monitor_source"), "趋势回调计划")
self.assertEqual(hub.get("add_count"), 2)
if __name__ == "__main__":
unittest.main()