fix(trend): use money RR, track DCA fills, snapshot before close

Align running-plan header and DCA table with risk-budget RR, record actual fill prices after each leg, and save pre-close snapshots on stop/TP/handoff across hub and exchanges.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-07 17:34:50 +08:00
parent 84abf7e7f7
commit d56d9050aa
7 changed files with 268 additions and 87 deletions
+19 -3
View File
@@ -3976,9 +3976,21 @@ def enrich_active_trend_plan_row(row):
d["floating_mark"] = float(m["mark_price"])
else:
d["floating_mark"] = None
try:
d["contract_size"] = float(get_contract_size(ex_sym))
except (TypeError, ValueError):
pass
from strategy_snapshot_lib import attach_trend_dca_levels
from strategy_trend_lib import calc_trend_plan_money_metrics
return attach_trend_dca_levels(d)
d = attach_trend_dca_levels(d)
money = calc_trend_plan_money_metrics(d)
if money.get("money_rr") is not None:
d["money_rr"] = money["money_rr"]
d["planned_rr"] = money["money_rr"]
if money.get("risk_amount_u") is not None:
d["risk_amount_u"] = money["risk_amount_u"]
return d
def opened_at_str_to_ms(opened_at_str):
@@ -6801,12 +6813,15 @@ def execute_trend_pullback():
trading_day = get_trading_day(now)
opened_at = app_now_str()
opened_ms = _to_ms_with_fallback(None, opened_at)
from strategy_trend_lib import append_leg_fill_price_json
fills_json = append_leg_fill_price_json(None, fill1)
cur = conn.execute(
"""INSERT INTO trend_pullback_plans (
status,symbol,exchange_symbol,direction,leverage,stop_loss,initial_stop_loss,add_upper,take_profit,risk_percent,
snapshot_available_usdt,snapshot_at,plan_margin_capital,target_order_amount,first_order_amount,remainder_total,
dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,legs_done,first_order_done,last_mark_price,avg_entry_price,order_amount_open,opened_at,opened_at_ms,session_date,message
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,legs_done,first_order_done,last_mark_price,avg_entry_price,order_amount_open,opened_at,opened_at_ms,session_date,message,leg_fill_prices_json
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
"active",
symbol,
@@ -6837,6 +6852,7 @@ def execute_trend_pullback():
opened_ms,
trading_day,
f"预览ID:{pid[:8]}",
fills_json,
),
)
new_plan_id = int(cur.lastrowid)
+17 -1
View File
@@ -1082,6 +1082,10 @@
function resolveTrendPlanRr(trendPlan, side, entry, sl, tp) {
const t = trendPlan || {};
if (t.money_rr != null && t.money_rr !== "") {
const n = Number(t.money_rr);
if (Number.isFinite(n) && n > 0) return n;
}
if (t.planned_rr != null && t.planned_rr !== "") {
const n = Number(t.planned_rr);
if (Number.isFinite(n) && n > 0) return n;
@@ -1839,12 +1843,24 @@
: "—";
const amt =
lv.contracts != null && lv.contracts !== "" ? esc(String(lv.contracts)) : "—";
const avg =
lv.avg_entry != null && lv.avg_entry !== ""
? fmtSymbolPrice(lv.avg_entry, sym, tickMap)
: "—";
const profitU =
lv.profit_u != null && lv.profit_u !== "" ? fmt(lv.profit_u, 2) : "—";
const riskU = lv.risk_u != null && lv.risk_u !== "" ? fmt(lv.risk_u, 2) : "—";
const rr = lv.rr != null && lv.rr !== "" ? `${fmt(lv.rr, 2)}:1` : "—";
const stCls = lv.status === "done" ? "st-done" : "st-pending";
const label = lv.status_label || (lv.status === "done" ? "已补仓" : "待补仓");
return `<tr>
<td>${esc(lv.label || lv.leg_key || "—")}</td>
<td>${esc(price)}</td>
<td>${amt}</td>
<td>${esc(avg)}</td>
<td>${esc(profitU)}</td>
<td>${esc(riskU)}</td>
<td>${esc(rr)}</td>
<td class="${stCls}">${esc(label)}</td>
</tr>`;
})
@@ -1852,7 +1868,7 @@
return `<div class="plan-dca-block plan-dca-block--side">
<div class="plan-dca-title">补仓计划明细</div>
<table class="plan-dca-table">
<tr><th>档位</th><th>触发价</th><th>张数</th><th>状态</th></tr>
<tr><th>档位</th><th>触发价</th><th>张数</th><th>加仓后均价</th><th>止盈盈利(U)</th><th>止损(U)</th><th>盈亏比</th><th>状态</th></tr>
${rows}
</table>
</div>`;
+1
View File
@@ -153,6 +153,7 @@ def init_strategy_tables(conn) -> None:
"ALTER TABLE order_monitors ADD COLUMN trend_plan_id INTEGER",
"ALTER TABLE order_monitors ADD COLUMN monitor_type TEXT",
"ALTER TABLE order_monitors ADD COLUMN key_signal_type TEXT",
"ALTER TABLE trend_pullback_plans ADD COLUMN leg_fill_prices_json TEXT",
):
try:
conn.execute(ddl)
+2 -17
View File
@@ -101,22 +101,7 @@
<div class="running-plans-stack">
{% for t in trend_plans %}
{% set sym = t.exchange_symbol or t.symbol %}
{% set calc = namespace(rr=None, pnlpct=None) %}
{% if t.avg_entry_price is not none and t.stop_loss is not none and t.take_profit is not none %}
{% set e = t.avg_entry_price|float %}
{% set sl = t.stop_loss|float %}
{% set tp = t.take_profit|float %}
{% if t.direction == 'long' %}
{% set risk = e - sl %}
{% set reward = tp - e %}
{% else %}
{% set risk = sl - e %}
{% set reward = e - tp %}
{% endif %}
{% if risk > 0 %}
{% set calc.rr = reward / risk %}
{% endif %}
{% endif %}
{% set calc = namespace(pnlpct=None) %}
{% if t.floating_pnl is not none and t.plan_margin_capital is not none and t.plan_margin_capital|float > 0 %}
{% set calc.pnlpct = (t.floating_pnl|float) / (t.plan_margin_capital|float) * 100 %}
{% endif %}
@@ -148,7 +133,7 @@
</div>
<div class="plan-cell">
<span class="lbl">盈亏比</span>
<span class="val">{% if calc.rr is not none %}{{ '%.2f'|format(calc.rr) }}:1{% else %}—{% endif %}</span>
<span class="val">{% if t.money_rr is not none %}{{ '%.2f'|format(t.money_rr) }}:1{% elif t.planned_rr is not none %}{{ '%.2f'|format(t.planned_rr) }}:1{% else %}—{% endif %}</span>
</div>
<div class="plan-cell">
<span class="lbl">标记价</span>
+120 -20
View File
@@ -336,6 +336,59 @@ def weighted_avg_entry(legs: list[tuple[float, float]]) -> Optional[float]:
return cost / total
def parse_leg_fill_prices(plan: dict) -> list[float]:
"""首仓 + 各档补仓实际成交价列表。"""
try:
raw = json.loads((plan or {}).get("leg_fill_prices_json") or "[]")
if not isinstance(raw, list):
return []
out: list[float] = []
for item in raw:
try:
out.append(float(item))
except (TypeError, ValueError):
continue
return out
except Exception:
return []
def append_leg_fill_price_json(existing_json: str | None, fill_px: float) -> str:
fills = parse_leg_fill_prices({"leg_fill_prices_json": existing_json})
fills.append(float(fill_px))
return json.dumps(fills, ensure_ascii=False, separators=(",", ":"))
def calc_trend_plan_money_metrics(plan: dict) -> dict:
"""运行中计划头部:按快照风险金额计算盈亏比(止盈盈利 U / 风险 U)。"""
out = {"money_rr": None, "risk_amount_u": None}
p = plan or {}
try:
direction = (p.get("direction") or "long").strip().lower()
user_tp = float(p.get("take_profit"))
avg = float(p.get("avg_entry_price"))
open_amt = float(p.get("order_amount_open") or p.get("first_order_amount") or 0)
snapshot = float(p.get("snapshot_available_usdt"))
risk_percent = float(p.get("risk_percent"))
except (TypeError, ValueError):
return out
if avg <= 0 or open_amt <= 0:
return out
risk_u = calc_risk_budget_usdt(snapshot, risk_percent)
if risk_u is None or risk_u <= 0:
return out
out["risk_amount_u"] = risk_u
try:
contract_size = float(p.get("contract_size") or 1.0)
if contract_size <= 0:
contract_size = 1.0
except (TypeError, ValueError):
contract_size = 1.0
profit_u = calc_tp_profit_usdt(direction, avg, user_tp, open_amt, contract_size)
out["money_rr"] = calc_money_reward_risk_ratio(profit_u, risk_u)
return out
def build_trend_preview_level_rows(preview: dict) -> tuple[dict, list[dict]]:
"""
预览:表单止盈价下每档累计持仓的盈利 U;止损金额 = 快照×风险;盈亏比按金额对比。
@@ -455,7 +508,7 @@ def build_trend_preview_level_rows(preview: dict) -> tuple[dict, list[dict]]:
def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict]:
"""运行中计划:为 dca_levels 补充加仓后均价、止盈盈利 U、止损金额 U、金额盈亏比"""
"""运行中计划:补仓表按实际成交价重算触发价/均价/金额盈亏比;未补档仍用计划触发价预估"""
if not levels:
return levels
p = plan or {}
@@ -473,6 +526,13 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict
if risk_u is None or risk_u <= 0:
return levels
fills = parse_leg_fill_prices(p)
try:
legs_done = int(p.get("legs_done") or 0)
except (TypeError, ValueError):
legs_done = 0
first_done = int(p.get("first_order_done") or 0) != 0
ref_raw = p.get("live_price_ref")
if ref_raw in (None, ""):
ref_raw = p.get("avg_entry_price")
@@ -494,32 +554,72 @@ def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict
for lv in levels:
row = dict(lv)
is_first = row.get("leg_key") == "first" or row.get("label") == "首仓" or row.get("i") == 0
row_cum = cum_contracts
if is_first:
amt = row.get("contracts")
try:
amt_f = float(amt if amt is not None else first_amt)
amt_f = float(row.get("contracts") if row.get("contracts") is not None else first_amt)
except (TypeError, ValueError):
amt_f = first_amt
accumulated = [(ref, amt_f)]
cum_contracts = amt_f
row["avg_entry"] = ref
if first_done:
fill_px = fills[0] if fills else None
if fill_px is None:
try:
fill_px = float(p.get("avg_entry_price") or ref)
except (TypeError, ValueError):
fill_px = ref
accumulated = [(float(fill_px), amt_f)]
cum_contracts = amt_f
row_cum = cum_contracts
row["avg_entry"] = float(fill_px)
else:
accumulated = [(ref, amt_f)]
cum_contracts = amt_f
row_cum = cum_contracts
row["avg_entry"] = ref
else:
price = row.get("price")
contracts = row.get("contracts")
if price is not None and contracts is not None:
try:
leg_contracts = float(contracts)
accumulated.append((float(price), leg_contracts))
avg = weighted_avg_entry(accumulated)
if avg is not None:
row["avg_entry"] = avg
cum_contracts += leg_contracts
except (TypeError, ValueError):
pass
try:
leg_num = int(row.get("i") or 0)
except (TypeError, ValueError):
leg_num = 0
grid_trigger = row.get("price")
try:
grid_trigger_f = float(grid_trigger) if grid_trigger is not None else None
except (TypeError, ValueError):
grid_trigger_f = None
try:
leg_contracts = float(row.get("contracts") or 0)
except (TypeError, ValueError):
leg_contracts = 0.0
done = row.get("status") == "done" or (leg_num > 0 and leg_num <= legs_done)
if done and leg_contracts > 0:
fill_idx = leg_num
if len(fills) > fill_idx:
fill_px = float(fills[fill_idx])
elif grid_trigger_f is not None:
fill_px = grid_trigger_f
else:
fill_px = ref
row["price"] = fill_px
accumulated.append((fill_px, leg_contracts))
cum_contracts += leg_contracts
row_cum = cum_contracts
avg = weighted_avg_entry(accumulated)
if avg is not None:
row["avg_entry"] = avg
elif grid_trigger_f is not None and leg_contracts > 0:
row["price"] = grid_trigger_f
projected = accumulated + [(grid_trigger_f, leg_contracts)]
avg = weighted_avg_entry(projected)
if avg is not None:
row["avg_entry"] = avg
row_cum = cum_contracts + leg_contracts
elif grid_trigger_f is not None:
row["price"] = grid_trigger_f
avg_entry = row.get("avg_entry")
if avg_entry is not None:
if avg_entry is not None and row_cum > 0:
profit_u = calc_tp_profit_usdt(
direction, float(avg_entry), user_tp, cum_contracts, contract_size
direction, float(avg_entry), user_tp, row_cum, contract_size
)
row["take_profit_price"] = user_tp
row["profit_u"] = profit_u
+42 -46
View File
@@ -374,18 +374,6 @@ def enrich_trend_plan_for_hub(cfg: dict, raw: dict) -> dict:
d = enrich_trend_plan(cfg, dict(raw or {}))
d["monitor_source"] = "趋势回调计划"
m = _m(cfg)
direction = (d.get("direction") or "long").lower()
try:
avg_e = float(d["avg_entry_price"])
sl = float(d["stop_loss"])
tp = float(d["take_profit"])
rr_fn = getattr(m, "calc_rr_ratio", None)
if callable(rr_fn):
rr = rr_fn(direction, avg_e, sl, tp)
if rr is not None:
d["planned_rr"] = float(rr)
except (TypeError, ValueError, KeyError):
pass
try:
snap = float(d.get("snapshot_available_usdt") or 0)
margin = float(d.get("plan_margin_capital") or 0)
@@ -497,8 +485,15 @@ def enrich_trend_plan(cfg: dict, row) -> dict:
except (TypeError, ValueError):
pass
from strategy_snapshot_lib import attach_trend_dca_levels
from strategy_trend_lib import calc_trend_plan_money_metrics
d = attach_trend_dca_levels(d)
money = calc_trend_plan_money_metrics(d)
if money.get("money_rr") is not None:
d["money_rr"] = money["money_rr"]
d["planned_rr"] = money["money_rr"]
if money.get("risk_amount_u") is not None:
d["risk_amount_u"] = money["risk_amount_u"]
try:
d["breakeven_default_offset_pct"] = float(cfg.get("breakeven_offset_pct", 0.3))
except (TypeError, ValueError):
@@ -567,6 +562,19 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) -
except (TypeError, ValueError):
pass
planned_rr = m.calc_rr_ratio(direction, avg_e, float(row["stop_loss"]), float(row["take_profit"]))
try:
from strategy_snapshot_lib import save_trend_plan_snapshot
save_trend_plan_snapshot(
cfg,
conn,
row,
result_label=result_label,
exit_price=float(exit_price) if exit_price is not None else None,
pnl_amount=float(pnl_amount) if pnl_amount is not None else None,
)
except Exception:
pass
try:
cancel_symbol_orders(cfg, ex_sym)
except Exception:
@@ -594,24 +602,6 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) -
)
except Exception:
pass
try:
closed = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=?", (plan_id,)
).fetchone()
if closed:
from strategy_snapshot_lib import save_trend_plan_snapshot
save_trend_plan_snapshot(
cfg,
conn,
closed,
result_label=result_label,
exit_price=float(exit_price),
pnl_amount=float(pnl_amount) if pnl_amount is not None else None,
)
conn.commit()
except Exception:
pass
if _trend_plan_trade_exists(conn, plan_id):
return
session_date = row["session_date"] or m.get_trading_day()
@@ -808,10 +798,16 @@ def check_trend_pullback_plans(cfg: dict) -> None:
old_open = float(row["order_amount_open"] or 0)
new_avg = _weighted_avg(old_avg, old_open, fill_px, amt)
legs_done += 1
from strategy_trend_lib import append_leg_fill_price_json
fills_json = append_leg_fill_price_json(
row["leg_fill_prices_json"] if "leg_fill_prices_json" in row.keys() else None,
fill_px,
)
conn.execute(
"UPDATE trend_pullback_plans SET legs_done=?, avg_entry_price=?, "
"order_amount_open=?, last_mark_price=? WHERE id=?",
(legs_done, new_avg, old_open + amt, pf, row["id"]),
"order_amount_open=?, last_mark_price=?, leg_fill_prices_json=? WHERE id=?",
(legs_done, new_avg, old_open + amt, pf, fills_json, row["id"]),
)
row = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=?", (row["id"],)
@@ -991,6 +987,14 @@ def apply_manual_breakeven(cfg: dict, conn, row, offset_pct=None) -> tuple[bool,
if not ok_live:
return False, live_reason or "实盘未就绪"
plan_id = int(row["id"])
try:
from strategy_snapshot_lib import save_trend_plan_snapshot
save_trend_plan_snapshot(
cfg, conn, row, result_label="保本移交", exit_price=None, pnl_amount=None
)
except Exception:
pass
handoff_row = {
"symbol": sym,
"exchange_symbol": ex_sym,
@@ -1060,18 +1064,6 @@ def apply_manual_breakeven(cfg: dict, conn, row, offset_pct=None) -> tuple[bool,
if callable(wl):
lines.insert(1, f"**账户:{wl()}**")
send("\n".join(lines))
try:
handoff = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=?", (plan_id,)
).fetchone()
if handoff:
from strategy_snapshot_lib import save_trend_plan_snapshot
save_trend_plan_snapshot(
cfg, conn, handoff, result_label="保本移交", exit_price=None, pnl_amount=None
)
except Exception:
pass
return True, None
@@ -1275,12 +1267,15 @@ def register_trend_routes(app: Flask, cfg: dict) -> None:
trading_day = m.get_trading_day(m.app_now())
opened_at = m.app_now_str()
opened_ms = getattr(m, "_to_ms_with_fallback", lambda a, b: None)(None, opened_at)
from strategy_trend_lib import append_leg_fill_price_json
fills_json = append_leg_fill_price_json(None, fill1)
cur = conn.execute(
"""INSERT INTO trend_pullback_plans (
status,symbol,exchange_symbol,direction,leverage,stop_loss,initial_stop_loss,add_upper,take_profit,risk_percent,
snapshot_available_usdt,snapshot_at,plan_margin_capital,target_order_amount,first_order_amount,remainder_total,
dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,legs_done,first_order_done,last_mark_price,avg_entry_price,order_amount_open,opened_at,opened_at_ms,session_date,message
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,legs_done,first_order_done,last_mark_price,avg_entry_price,order_amount_open,opened_at,opened_at_ms,session_date,message,leg_fill_prices_json
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
"active",
symbol,
@@ -1311,6 +1306,7 @@ def register_trend_routes(app: Flask, cfg: dict) -> None:
opened_ms,
trading_day,
f"预览ID:{pid[:8]}",
fills_json,
),
)
new_id = int(cur.lastrowid)
+67
View File
@@ -0,0 +1,67 @@
"""趋势回调运行中计划:实际成交价重算补仓表与金额盈亏比。"""
from __future__ import annotations
import json
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from strategy_snapshot_lib import attach_trend_dca_levels # noqa: E402
from strategy_trend_lib import calc_trend_plan_money_metrics # noqa: E402
class TestTrendDcaEnrichFills(unittest.TestCase):
def _base_plan(self, **overrides):
plan = {
"direction": "long",
"stop_loss": 0.329,
"take_profit": 0.476,
"first_order_amount": 115,
"snapshot_available_usdt": 97.98,
"risk_percent": 5,
"contract_size": 1.0,
"grid_prices_json": json.dumps([0.3465, 0.343, 0.3395, 0.336, 0.3325]),
"leg_amounts_json": json.dumps([23, 23, 23, 23, 23]),
"dca_legs": 5,
"first_order_done": 1,
"legs_done": 0,
"avg_entry_price": 0.3537,
"order_amount_open": 115,
"target_order_amount": 230,
"leg_fill_prices_json": json.dumps([0.3537]),
}
plan.update(overrides)
return plan
def test_header_money_rr_not_price_rr(self):
plan = self._base_plan()
metrics = calc_trend_plan_money_metrics(plan)
self.assertAlmostEqual(metrics["risk_amount_u"], 4.899, places=2)
self.assertIsNotNone(metrics["money_rr"])
self.assertLess(metrics["money_rr"], 4.0)
def test_done_dca_uses_actual_fill_price(self):
plan = self._base_plan(
legs_done=1,
avg_entry_price=0.3512,
order_amount_open=138,
leg_fill_prices_json=json.dumps([0.3537, 0.3458]),
)
enriched = attach_trend_dca_levels(plan)
levels = enriched["dca_levels"]
self.assertEqual(len(levels), 6)
dca1 = levels[1]
self.assertEqual(dca1["status"], "done")
self.assertAlmostEqual(dca1["price"], 0.3458, places=4)
self.assertIsNotNone(dca1["avg_entry"])
self.assertIsNotNone(dca1["rr"])
dca2 = levels[2]
self.assertEqual(dca2["status"], "pending")
self.assertAlmostEqual(dca2["price"], 0.343, places=4)
if __name__ == "__main__":
unittest.main()