From 6a4ec69dba2d8b916a7a515af29467f2c22b7432 Mon Sep 17 00:00:00 2001 From: dekun Date: Sun, 7 Jun 2026 18:05:30 +0800 Subject: [PATCH] fix(trend): align hub and four-exchange trend plan display Unify gate_bot with shared enrich_trend_plan for strategy pages and hub monitor, reconcile DCA avg with live entry price, and fix missing fill price display. Co-authored-by: Cursor --- crypto_monitor_gate_bot/app.py | 116 ++--------------------- hub_position_metrics.py | 2 + manual_trading_hub/static/app.js | 12 ++- strategy_trend_lib.py | 123 +++++++++++++++++++++++-- strategy_trend_register.py | 55 +++++++++-- tests/test_trend_dca_enrich_fills.py | 16 ++++ tests/test_trend_hub_enrich_unified.py | 90 ++++++++++++++++++ 7 files changed, 287 insertions(+), 127 deletions(-) create mode 100644 tests/test_trend_hub_enrich_unified.py diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index 7d34b73..966732c 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -3959,40 +3959,6 @@ def calc_trend_manual_breakeven_stop(direction, entry_price, offset_pct=None): return e * (1.0 + pct / 100.0) -def enrich_active_trend_plan_row(row): - d = row_to_dict(row) - try: - d["breakeven_applied"] = int(d.get("breakeven_applied") or 0) != 0 - except Exception: - d["breakeven_applied"] = False - ex_sym = d.get("exchange_symbol") or normalize_exchange_symbol(d.get("symbol") or "") - direction = (d.get("direction") or "long").lower() - m = get_live_position_exchange_metrics(ex_sym, direction) - if m and m.get("unrealized_pnl") is not None: - d["floating_pnl"] = float(m["unrealized_pnl"]) - else: - d["floating_pnl"] = None - if m and m.get("mark_price") is not None: - d["floating_mark"] = float(m["mark_price"]) - else: - d["floating_mark"] = None - try: - d["contract_size"] = float(get_contract_size(ex_sym)) - except (TypeError, ValueError): - pass - from strategy_snapshot_lib import attach_trend_dca_levels - from strategy_trend_lib import calc_trend_plan_money_metrics - - d = attach_trend_dca_levels(d) - money = calc_trend_plan_money_metrics(d) - if money.get("money_rr") is not None: - d["money_rr"] = money["money_rr"] - d["planned_rr"] = money["money_rr"] - if money.get("risk_amount_u") is not None: - d["risk_amount_u"] = money["risk_amount_u"] - return d - - def opened_at_str_to_ms(opened_at_str): if not opened_at_str: return None @@ -5412,30 +5378,6 @@ def render_main_page(page="trade"): trend_active = conn.execute( "SELECT COUNT(*) FROM trend_pullback_plans WHERE status='active'" ).fetchone()[0] - trend_plans_raw = conn.execute( - "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC" - ).fetchall() - trend_plans = [] - trend_dca_probes = [] - _trend_cfg = app.extensions.get("strategy_trend_cfg") - for r in trend_plans_raw: - try: - enriched = enrich_active_trend_plan_row(r) - trend_plans.append(enriched) - except Exception as e: - print(f"[render_main_page] enrich trend plan: {e}") - enriched = row_to_dict(r) - trend_plans.append(enriched) - if _trend_cfg and page in ("strategy", "strategy_trend", "strategy_roll"): - try: - from strategy_trend_register import summarize_trend_dca_probe - - probe = summarize_trend_dca_probe(_trend_cfg, r) - trend_dca_probes.append(probe) - if isinstance(enriched, dict): - enriched["dca_probe"] = probe - except Exception as e: - print(f"[render_main_page] trend dca probe: {e}") preview_snapshots = [] if page == "records": try: @@ -5456,49 +5398,17 @@ def render_main_page(page="trade"): and active_count < MAX_ACTIVE_POSITIONS and int(trend_active or 0) == 0 ) - trend_preview = None - trend_preview_levels = [] - preview_expires_ms = None - trend_preview_expired = False - trend_preview_id_arg = "" - if page in ("strategy", "strategy_trend", "strategy_roll"): - _trend_cleanup_stale_previews(conn) - if page in ("strategy", "strategy_trend"): - trend_preview_id_arg = (request.args.get("preview_id") or "").strip() - if trend_preview_id_arg: - pr = conn.execute( - "SELECT * FROM trend_pullback_previews WHERE id=?", - (trend_preview_id_arg,), - ).fetchone() - now_ms = int(time.time() * 1000) - if pr and int(pr["expires_at_ms"] or 0) >= now_ms: - from strategy_trend_lib import build_trend_preview_level_rows - - trend_preview = row_to_dict(pr) - preview_expires_ms = int(pr["expires_at_ms"]) - if not trend_preview.get("contract_size"): - try: - ensure_markets_loaded() - ex_sym = trend_preview.get("exchange_symbol") or trend_preview.get("symbol") - mk = exchange.market(ex_sym) - trend_preview["contract_size"] = float(mk.get("contractSize") or 1) - except Exception: - pass - trend_preview, trend_preview_levels = build_trend_preview_level_rows(trend_preview) - elif pr: - trend_preview_expired = True strategy_extra = {} - if page == "strategy_records": + if page in ("strategy", "strategy_trend", "strategy_roll", "strategy_records"): from strategy_ui import strategy_render_extras - strategy_extra = strategy_render_extras(conn, page) - elif page in ("strategy", "strategy_trend", "strategy_roll"): - from strategy_ui import fetch_roll_page_data - - strategy_extra = fetch_roll_page_data( + strategy_extra = strategy_render_extras( conn, + page, default_risk_percent=float(RISK_PERCENT), count_active_trends=lambda c, ta=trend_active: int(ta or 0), + request_obj=request, + trend_cfg=app.extensions.get("strategy_trend_cfg"), ) orphan_positions: list = [] if page == "trade": @@ -5540,20 +5450,9 @@ def render_main_page(page="trade"): max_active_positions=MAX_ACTIVE_POSITIONS, manual_min_planned_rr=MANUAL_MIN_PLANNED_RR, can_trade=can_trade, - trend_plans=trend_plans, - trend_dca_probes=trend_dca_probes, live_trading_enabled=LIVE_TRADING_ENABLED, preview_snapshots=preview_snapshots, exchange_sync_from_label=(EXCHANGE_POSITION_SYNC_FROM_BJ or "最近90天"), - trend_pullback_dca_legs=TREND_PULLBACK_DCA_LEGS, - trend_pullback_preview_ttl=TREND_PULLBACK_PREVIEW_TTL_SECONDS, - trend_preview=trend_preview, - trend_preview_levels=trend_preview_levels, - preview_expires_ms=preview_expires_ms, - trend_preview_expired=trend_preview_expired, - trend_preview_id_arg=trend_preview_id_arg, - trend_preview_max_drift_pct=TREND_PREVIEW_MAX_BALANCE_DRIFT_PCT, - trend_manual_breakeven_offset_pct=TREND_PULLBACK_MANUAL_BREAKEVEN_OFFSET_PCT, list_window=list_window, list_window_presets={ "utc_today": PRESET_UTC_TODAY, @@ -8032,6 +7931,11 @@ try: }, ohlcv_fn=_hub_fetch_ohlcv, ) + from strategy_trend_register import build_trend_config, patch_trend_hub_enrich + + _hub_trend_cfg = build_trend_config(sys.modules[__name__]) + app.extensions["strategy_trend_cfg"] = _hub_trend_cfg + patch_trend_hub_enrich(app, _hub_trend_cfg) except Exception as _hub_err: print(f"[hub_bridge] gate_bot: {_hub_err}") diff --git a/hub_position_metrics.py b/hub_position_metrics.py index 3fcc4b9..93fb8bb 100644 --- a/hub_position_metrics.py +++ b/hub_position_metrics.py @@ -179,6 +179,8 @@ def enrich_ccxt_position_metrics_out( return out side = position_side_from_ccxt(position, c) entry = parse_position_entry_price(position) + if entry is not None and entry > 0: + out["entry_price"] = round(entry, 8) cs = contract_size if contract_size and contract_size > 0 else 1.0 upnl = resolve_position_display_upnl( side, entry, mark, abs(c), cs, exchange_upnl diff --git a/manual_trading_hub/static/app.js b/manual_trading_hub/static/app.js index b1a120d..7df6633 100644 --- a/manual_trading_hub/static/app.js +++ b/manual_trading_hub/static/app.js @@ -391,11 +391,13 @@ : null; if (exchange != null && !Number.isFinite(exchange)) exchange = null; const entry = - p.entry_price != null && p.entry_price !== "" - ? Number(p.entry_price) - : t.trigger_price != null - ? Number(t.trigger_price) - : null; + t.avg_entry_price != null && t.avg_entry_price !== "" + ? Number(t.avg_entry_price) + : p.entry_price != null && p.entry_price !== "" + ? Number(p.entry_price) + : t.trigger_price != null + ? Number(t.trigger_price) + : null; let mark = markOverride != null && Number.isFinite(Number(markOverride)) ? Number(markOverride) diff --git a/strategy_trend_lib.py b/strategy_trend_lib.py index a26b4ce..1bca74d 100644 --- a/strategy_trend_lib.py +++ b/strategy_trend_lib.py @@ -359,6 +359,116 @@ def append_leg_fill_price_json(existing_json: str | None, fill_px: float) -> str return json.dumps(fills, ensure_ascii=False, separators=(",", ":")) +def _trend_leg_contracts( + leg_idx: int, first_amt: float, leg_amounts: list[float] +) -> float: + if leg_idx == 0: + return float(first_amt) + li = leg_idx - 1 + if 0 <= li < len(leg_amounts): + return float(leg_amounts[li]) + return 0.0 + + +def _infer_trend_fill_from_target_avg( + leg_idx: int, + prices: list[float], + *, + first_amt: float, + leg_amounts: list[float], + legs_done: int, + target_avg: float, +) -> float: + """已知其余档位成交价时,反推单档成交价使加权均价等于 target_avg。""" + total_contracts = 0.0 + known_cost = 0.0 + unknown_amt = _trend_leg_contracts(leg_idx, first_amt, leg_amounts) + for i in range(legs_done + 1): + amt = _trend_leg_contracts(i, first_amt, leg_amounts) + if amt <= 0: + continue + total_contracts += amt + if i == leg_idx: + continue + known_cost += float(prices[i]) * amt + if unknown_amt <= 0 or total_contracts <= 0: + return float(prices[leg_idx]) + return (float(target_avg) * total_contracts - known_cost) / unknown_amt + + +def reconcile_trend_leg_fill_prices(plan: dict) -> list[float]: + """ + 首仓(0)+已补仓(1..legs_done) 成交价。 + 优先 leg_fill_prices_json;缺口用计划网格价;再对齐 avg_entry_price。 + """ + p = plan or {} + if int(p.get("first_order_done") or 0) == 0: + return [] + try: + legs_done = int(p.get("legs_done") or 0) + except (TypeError, ValueError): + legs_done = 0 + try: + first_amt = float(p.get("first_order_amount")) + except (TypeError, ValueError): + first_amt = 0.0 + try: + target_avg = float(p.get("avg_entry_price")) + except (TypeError, ValueError): + target_avg = None + + fills = parse_leg_fill_prices(p) + try: + grid = [float(x) for x in json.loads(p.get("grid_prices_json") or "[]")] + except Exception: + grid = [] + try: + leg_amounts = [float(x) for x in json.loads(p.get("leg_amounts_json") or "[]")] + except Exception: + leg_amounts = [] + + def _default_px(leg_idx: int) -> float: + if leg_idx == 0: + if target_avg is not None and legs_done == 0: + return target_avg + try: + return float(p.get("avg_entry_price")) + except (TypeError, ValueError): + pass + try: + ref = p.get("live_price_ref") + if ref not in (None, ""): + return float(ref) + except (TypeError, ValueError): + pass + return 0.0 + gi = leg_idx - 1 + if 0 <= gi < len(grid): + return float(grid[gi]) + return _default_px(0) + + result: list[float] = [] + estimated: list[int] = [] + for leg_idx in range(legs_done + 1): + if len(fills) > leg_idx: + result.append(float(fills[leg_idx])) + else: + result.append(_default_px(leg_idx)) + estimated.append(leg_idx) + + if target_avg is not None and estimated: + adjust_idx = estimated[0] if len(estimated) == 1 else estimated[-1] + result[adjust_idx] = _infer_trend_fill_from_target_avg( + adjust_idx, + result, + first_amt=first_amt, + leg_amounts=leg_amounts, + legs_done=legs_done, + target_avg=target_avg, + ) + return result + + def calc_trend_plan_money_metrics(plan: dict) -> dict: """运行中计划头部:按快照风险金额计算盈亏比(止盈盈利 U / 风险 U)。""" out = {"money_rr": None, "risk_amount_u": None} @@ -526,12 +636,12 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict if risk_u is None or risk_u <= 0: return levels - fills = parse_leg_fill_prices(p) try: legs_done = int(p.get("legs_done") or 0) except (TypeError, ValueError): legs_done = 0 first_done = int(p.get("first_order_done") or 0) != 0 + reconciled_fills = reconcile_trend_leg_fill_prices(p) ref_raw = p.get("live_price_ref") if ref_raw in (None, ""): @@ -561,8 +671,9 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict except (TypeError, ValueError): amt_f = first_amt if first_done: - fill_px = fills[0] if fills else None - if fill_px is None: + if reconciled_fills: + fill_px = float(reconciled_fills[0]) + else: try: fill_px = float(p.get("avg_entry_price") or ref) except (TypeError, ValueError): @@ -571,6 +682,7 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict cum_contracts = amt_f row_cum = cum_contracts row["avg_entry"] = float(fill_px) + row["price"] = fill_px else: accumulated = [(ref, amt_f)] cum_contracts = amt_f @@ -592,9 +704,8 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict leg_contracts = 0.0 done = row.get("status") == "done" or (leg_num > 0 and leg_num <= legs_done) if done and leg_contracts > 0: - fill_idx = leg_num - if len(fills) > fill_idx: - fill_px = float(fills[fill_idx]) + if leg_num < len(reconciled_fills): + fill_px = float(reconciled_fills[leg_num]) elif grid_trigger_f is not None: fill_px = grid_trigger_f else: diff --git a/strategy_trend_register.py b/strategy_trend_register.py index dcee3a1..bf81660 100644 --- a/strategy_trend_register.py +++ b/strategy_trend_register.py @@ -456,11 +456,21 @@ def _trend_add_leg_fields(cfg: dict, d: dict) -> dict: except Exception: grid = [] add_prices: list[float] = [] - for x in grid[:legs_done]: - try: - add_prices.append(float(x)) - except (TypeError, ValueError): - pass + try: + from strategy_trend_lib import reconcile_trend_leg_fill_prices + + fills = reconcile_trend_leg_fill_prices(out) + for i in range(1, legs_done + 1): + if i < len(fills): + add_prices.append(float(fills[i])) + except Exception: + fills = [] + if not add_prices: + for x in grid[:legs_done]: + try: + add_prices.append(float(x)) + except (TypeError, ValueError): + pass sym = out.get("exchange_symbol") or out.get("symbol") or "" out["add_count"] = legs_done out["add_count_total"] = dca_legs @@ -504,6 +514,11 @@ def _patch_hub_trend_views(app: Flask) -> None: app.config["HUB_CTX"] = ctx +def patch_trend_hub_enrich(app: Flask, cfg: dict) -> None: + """hub_bridge install 之后调用:四所 /api/hub/monitor 趋势字段与策略页一致。""" + _patch_hub_monitor_enrich(app, cfg) + + def _patch_hub_monitor_enrich(app: Flask, cfg: dict) -> None: ctx = dict(app.config.get("HUB_CTX") or {}) prev = ctx.get("enrich_monitor") @@ -530,7 +545,6 @@ def _patch_hub_monitor_enrich(app: Flask, cfg: dict) -> None: def enrich_trend_plan(cfg: dict, row) -> dict: m = _m(cfg) d = _row(cfg, row) - d = _trend_add_leg_fields(cfg, d) try: d["breakeven_applied"] = int(d.get("breakeven_applied") or 0) != 0 except Exception: @@ -538,6 +552,7 @@ def enrich_trend_plan(cfg: dict, row) -> dict: ex_sym = d.get("exchange_symbol") or m.normalize_exchange_symbol(d.get("symbol") or "") direction = (d.get("direction") or "long").lower() metrics_fn = getattr(m, "get_live_position_exchange_metrics", None) + met = None if callable(metrics_fn): try: lev = int(d.get("leverage") or 0) or None @@ -547,6 +562,13 @@ def enrich_trend_plan(cfg: dict, row) -> dict: met = metrics_fn(ex_sym, direction, order_leverage=lev) except TypeError: met = metrics_fn(ex_sym, direction) + if met and met.get("entry_price") is not None: + try: + live_entry = float(met["entry_price"]) + if live_entry > 0: + d["avg_entry_price"] = live_entry + except (TypeError, ValueError): + pass if met and met.get("unrealized_pnl") is not None: d["floating_pnl"] = float(met["unrealized_pnl"]) elif ( @@ -587,6 +609,7 @@ def enrich_trend_plan(cfg: dict, row) -> dict: d["contract_size"] = float(get_cs(ex_sym)) except (TypeError, ValueError): pass + d = _trend_add_leg_fields(cfg, d) from strategy_snapshot_lib import attach_trend_dca_levels from strategy_trend_lib import calc_trend_plan_money_metrics @@ -1256,13 +1279,24 @@ def load_trend_page_context(conn, request_obj, cfg: dict) -> dict[str, Any]: or 0 ) trend_plans = [] - for r in conn.execute( + trend_dca_probes = [] + raw_plans = conn.execute( "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC" - ).fetchall(): + ).fetchall() + for r in raw_plans: try: - trend_plans.append(enrich_trend_plan(cfg, r)) + enriched = enrich_trend_plan(cfg, r) + trend_plans.append(enriched) except Exception: - trend_plans.append(_row(cfg, r)) + enriched = _row(cfg, r) + trend_plans.append(enriched) + try: + probe = summarize_trend_dca_probe(cfg, r) + trend_dca_probes.append(probe) + if isinstance(enriched, dict): + enriched["dca_probe"] = probe + except Exception: + pass now = m.app_now() active_count = m.get_active_position_count(conn) can_trade_trend = ( @@ -1298,6 +1332,7 @@ def load_trend_page_context(conn, request_obj, cfg: dict) -> dict[str, Any]: trend_preview_expired = True return { "trend_plans": trend_plans, + "trend_dca_probes": trend_dca_probes, "trend_active": trend_active, "can_trade_trend": can_trade_trend, "trend_preview": trend_preview, diff --git a/tests/test_trend_dca_enrich_fills.py b/tests/test_trend_dca_enrich_fills.py index 3fdc8a5..03e0e4c 100644 --- a/tests/test_trend_dca_enrich_fills.py +++ b/tests/test_trend_dca_enrich_fills.py @@ -62,6 +62,22 @@ class TestTrendDcaEnrichFills(unittest.TestCase): self.assertEqual(dca2["status"], "pending") self.assertAlmostEqual(dca2["price"], 0.343, places=4) + def test_missing_dca_fills_align_last_avg_with_header(self): + """缺补仓成交价时,末档加仓后均价应对齐计划头部 avg_entry_price。""" + plan = self._base_plan( + legs_done=2, + avg_entry_price=0.3507, + order_amount_open=161, + leg_fill_prices_json=json.dumps([0.3436]), + grid_prices_json=json.dumps([0.343, 0.343, 0.3395, 0.336, 0.3325]), + ) + enriched = attach_trend_dca_levels(plan) + levels = enriched["dca_levels"] + dca2 = levels[2] + self.assertEqual(dca2["status"], "done") + self.assertAlmostEqual(dca2["avg_entry"], 0.3507, places=4) + self.assertGreater(dca2["price"], 0.343) + if __name__ == "__main__": unittest.main() diff --git a/tests/test_trend_hub_enrich_unified.py b/tests/test_trend_hub_enrich_unified.py new file mode 100644 index 0000000..3445ea0 --- /dev/null +++ b/tests/test_trend_hub_enrich_unified.py @@ -0,0 +1,90 @@ +"""四所趋势 enrich:实例与中控 monitor 字段一致。""" +from __future__ import annotations + +import json +import sys +import unittest +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT)) + +from strategy_trend_register import ( # noqa: E402 + enrich_trend_plan, + enrich_trend_plan_for_hub, +) + + +class _FakeModule: + @staticmethod + def normalize_exchange_symbol(sym): + return sym + + @staticmethod + def ensure_markets_loaded(): + return None + + @staticmethod + def get_live_position_exchange_metrics(ex_sym, direction, order_leverage=None): + return { + "entry_price": 0.3507, + "mark_price": 0.3431, + "unrealized_pnl": -1.23, + } + + @staticmethod + def get_contract_size(_ex_sym): + return 1.0 + + +class TestTrendHubEnrichUnified(unittest.TestCase): + def _cfg(self): + return { + "app_module": _FakeModule(), + "breakeven_offset_pct": 0.3, + "row_to_dict": lambda row: dict(row) if not isinstance(row, dict) else row, + } + + def _plan_row(self): + return { + "id": 4, + "symbol": "ONDO/USDT:USDT", + "exchange_symbol": "ONDO/USDT:USDT", + "direction": "long", + "stop_loss": 0.329, + "take_profit": 0.476, + "add_upper": 0.35, + "first_order_amount": 115, + "snapshot_available_usdt": 97.98, + "risk_percent": 5, + "contract_size": 1.0, + "grid_prices_json": json.dumps([0.343, 0.343, 0.3395, 0.336, 0.3325]), + "leg_amounts_json": json.dumps([23, 23, 23, 23, 23]), + "dca_legs": 5, + "first_order_done": 1, + "legs_done": 2, + "avg_entry_price": 0.3434, + "order_amount_open": 161, + "leg_fill_prices_json": json.dumps([0.3436]), + "leverage": 10, + "plan_margin_capital": 8.17, + } + + def test_hub_and_page_share_live_avg_and_dca_levels(self): + cfg = self._cfg() + row = self._plan_row() + page = enrich_trend_plan(cfg, row) + hub = enrich_trend_plan_for_hub(cfg, row) + self.assertAlmostEqual(page["avg_entry_price"], 0.3507, places=4) + self.assertAlmostEqual(hub["avg_entry_price"], 0.3507, places=4) + self.assertIn("dca_levels", page) + self.assertIn("dca_levels", hub) + last_done = hub["dca_levels"][2] + self.assertEqual(last_done["status"], "done") + self.assertAlmostEqual(last_done["avg_entry"], 0.3507, places=4) + self.assertEqual(hub.get("monitor_source"), "趋势回调计划") + self.assertEqual(hub.get("add_count"), 2) + + +if __name__ == "__main__": + unittest.main()