feat: trend pullback preview TP per DCA leg with unified stop loss across exchanges

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-05 16:10:27 +08:00
parent 674d721072
commit 31756e838d
7 changed files with 292 additions and 23 deletions
+3 -7
View File
@@ -5607,15 +5607,11 @@ def render_main_page(page="trade"):
).fetchone()
now_ms = int(time.time() * 1000)
if pr and int(pr["expires_at_ms"] or 0) >= now_ms:
from strategy_trend_lib import build_trend_preview_level_rows
trend_preview = row_to_dict(pr)
preview_expires_ms = int(pr["expires_at_ms"])
try:
grid = json.loads(trend_preview.get("grid_prices_json") or "[]")
legs = json.loads(trend_preview.get("leg_amounts_json") or "[]")
except Exception:
grid, legs = [], []
for i, pair in enumerate(zip(grid, legs), 1):
trend_preview_levels.append({"i": i, "price": pair[0], "contracts": pair[1]})
trend_preview, trend_preview_levels = build_trend_preview_level_rows(trend_preview)
elif pr:
trend_preview_expired = True
strategy_extra = {}
+18 -4
View File
@@ -594,12 +594,26 @@ def _fetch_preview(pid):
now_ms = int(time.time() * 1000)
d["expires_in_sec"] = max(0, int((int(d.get("expires_at_ms") or 0) - now_ms) / 1000))
try:
grid = json.loads(d.get("grid_prices_json") or "[]")
legs = json.loads(d.get("leg_amounts_json") or "[]")
from strategy_trend_lib import build_trend_preview_level_rows
enriched, level_rows = build_trend_preview_level_rows(d)
for key in ("preview_target_rr", "preview_first_take_profit", "preview_unified_stop_loss"):
if key in enriched:
d[key] = enriched[key]
d["preview_level_rows"] = level_rows
d["grid_levels"] = [
{"i": i + 1, "price": grid[i], "contracts": legs[i] if i < len(legs) else None}
for i in range(len(grid))
{
"i": row.get("i"),
"label": row.get("label"),
"price": row.get("price"),
"contracts": row.get("contracts"),
"avg_entry": row.get("avg_entry"),
"take_profit": row.get("take_profit"),
"stop_loss": row.get("stop_loss"),
}
for row in level_rows
]
except Exception:
d["grid_levels"] = []
d["preview_level_rows"] = []
return d
+4 -1
View File
@@ -111,8 +111,11 @@ def build_trend_dca_levels(plan: dict) -> list[dict]:
def attach_trend_dca_levels(plan: dict) -> dict:
from strategy_trend_lib import enrich_trend_dca_levels_with_tp
d = dict(plan or {})
d["dca_levels"] = build_trend_dca_levels(d)
levels = build_trend_dca_levels(d)
d["dca_levels"] = enrich_trend_dca_levels_with_tp(d, levels)
return d
+14 -4
View File
@@ -47,13 +47,20 @@
{{ trend_preview.symbol }} {{ '做多' if trend_preview.direction == 'long' else '做空' }} {{ trend_preview.leverage }}x
预览可用快照 <strong>{{ mf(trend_preview.snapshot_available_usdt) }}</strong> U 参考价 {{ price_fmt(trend_preview.symbol, trend_preview.live_price_ref) }}
计划保证金≈{{ mf(trend_preview.plan_margin_capital) }} U 总张≈{{ amt_disp(trend_preview.symbol, trend_preview.target_order_amount) }}(首仓 {{ amt_disp(trend_preview.symbol, trend_preview.first_order_amount) }} + 补仓 {{ amt_disp(trend_preview.symbol, trend_preview.remainder_total) }}<br>
止损 {{ price_fmt(trend_preview.symbol, trend_preview.stop_loss) }} {{ trend_add_zone_label(trend_preview.direction) }} {{ price_fmt(trend_preview.symbol, trend_preview.add_upper) }} 止盈 {{ price_fmt(trend_preview.symbol, trend_preview.take_profit) }} 风险比例 {{ trend_preview.risk_percent }}%
统一止损 {{ price_fmt(trend_preview.symbol, trend_preview.preview_unified_stop_loss or trend_preview.stop_loss) }} {{ trend_add_zone_label(trend_preview.direction) }} {{ price_fmt(trend_preview.symbol, trend_preview.add_upper) }} 表单止盈 {{ price_fmt(trend_preview.symbol, trend_preview.take_profit) }} 目标RR {% if trend_preview.preview_target_rr is not none %}{{ '%.2f'|format(trend_preview.preview_target_rr) }}{% else %}—{% endif %} 风险比例 {{ trend_preview.risk_percent }}%
</div>
<div class="table-wrap" style="margin-bottom:10px">
<table>
<tr><th>#</th><th>补仓触发价</th><th>该档张数</th></tr>
<tr><th>档位</th><th>触发/参考</th><th>张数</th><th>加仓后均价</th><th>止盈</th><th>止损</th></tr>
{% for row in trend_preview_levels %}
<tr><td>{{ row.i }}</td><td>{{ price_fmt(trend_preview.symbol, row.price) }}</td><td>{{ amt_disp(trend_preview.symbol, row.contracts) }}</td></tr>
<tr>
<td>{{ row.label or row.i }}</td>
<td>{{ price_fmt(trend_preview.symbol, row.price) }}</td>
<td>{{ amt_disp(trend_preview.symbol, row.contracts) }}</td>
<td>{% if row.avg_entry is not none %}{{ price_fmt(trend_preview.symbol, row.avg_entry) }}{% else %}—{% endif %}</td>
<td>{% if row.take_profit is not none %}{{ price_fmt(trend_preview.symbol, row.take_profit) }}{% else %}—{% endif %}</td>
<td>{% if row.stop_loss is not none %}{{ price_fmt(trend_preview.symbol, row.stop_loss) }}{% else %}—{% endif %}</td>
</tr>
{% endfor %}
</table>
</div>
@@ -159,12 +166,15 @@
<div class="plan-dca-block">
<div class="plan-dca-title">补仓计划明细</div>
<table class="plan-dca-table">
<tr><th>档位</th><th>触发价</th><th>张数</th><th>状态</th></tr>
<tr><th>档位</th><th>触发价</th><th>张数</th><th>加仓后均价</th><th>止盈</th><th>止损</th><th>状态</th></tr>
{% for lv in t.dca_levels %}
<tr>
<td>{{ lv.label }}</td>
<td>{% if lv.price is not none %}{{ price_fmt(sym, lv.price) }}{% else %}—{% endif %}</td>
<td>{% if lv.contracts is not none %}{{ amt_disp(sym, lv.contracts) }}{% else %}—{% endif %}</td>
<td>{% if lv.avg_entry is not none %}{{ price_fmt(sym, lv.avg_entry) }}{% else %}—{% endif %}</td>
<td>{% if lv.take_profit is not none %}{{ price_fmt(sym, lv.take_profit) }}{% else %}—{% endif %}</td>
<td>{% if lv.stop_loss is not none %}{{ price_fmt(sym, lv.stop_loss) }}{% else %}—{% endif %}</td>
<td class="{% if lv.status == 'done' %}st-done{% else %}st-pending{% endif %}">{{ lv.status_label }}</td>
</tr>
{% endfor %}
+198
View File
@@ -191,3 +191,201 @@ def compute_trend_plan_core(
"leg_amounts": leg_list,
}
return payload, None
def calc_planned_reward_risk_ratio(
direction: str, entry_price: float, stop_loss: float, take_profit: float
) -> Optional[float]:
"""盈亏比(reward/risk),与四所 calc_rr_ratio 口径一致。"""
try:
entry = float(entry_price)
sl = float(stop_loss)
tp = float(take_profit)
if entry <= 0 or sl <= 0 or tp <= 0:
return None
direction = (direction or "long").strip().lower()
if direction == "short":
risk = sl - entry
reward = entry - tp
else:
risk = entry - sl
reward = tp - entry
if risk <= 0 or reward <= 0:
return None
return round(reward / risk, 4)
except (TypeError, ValueError):
return None
def calc_take_profit_for_rr(
direction: str, entry_price: float, stop_loss: float, reward_risk_ratio: float
) -> Optional[float]:
"""按统一止损与目标 RR 反推止盈价。"""
try:
entry = float(entry_price)
sl = float(stop_loss)
rr = float(reward_risk_ratio)
if entry <= 0 or sl <= 0 or rr <= 0:
return None
direction = (direction or "long").strip().lower()
if direction == "short":
risk = sl - entry
if risk <= 0:
return None
return round(entry - rr * risk, 10)
risk = entry - sl
if risk <= 0:
return None
return round(entry + rr * risk, 10)
except (TypeError, ValueError):
return None
def weighted_avg_entry(legs: list[tuple[float, float]]) -> Optional[float]:
"""按 (成交价, 张数) 加权均价。"""
total = 0.0
cost = 0.0
for price, amount in legs or []:
try:
p = float(price)
a = float(amount)
except (TypeError, ValueError):
continue
if a <= 0:
continue
total += a
cost += p * a
if total <= 0:
return None
return cost / total
def build_trend_preview_level_rows(preview: dict) -> tuple[dict, list[dict]]:
"""
预览:参考价首仓止盈 + 每档补仓后止盈;止损统一为计划止损(加仓后最大止损)。
返回 (增强后的 preview 字段, 表格行列表,含首仓行)。
"""
p = dict(preview or {})
direction = (p.get("direction") or "long").strip().lower()
try:
ref = float(p.get("live_price_ref"))
sl = float(p.get("stop_loss"))
user_tp = float(p.get("take_profit"))
first_amt = float(p.get("first_order_amount"))
except (TypeError, ValueError):
return p, []
rr = calc_planned_reward_risk_ratio(direction, ref, sl, user_tp)
if rr is None:
return p, []
first_tp = calc_take_profit_for_rr(direction, ref, sl, rr)
p["preview_target_rr"] = rr
p["preview_first_take_profit"] = first_tp
p["preview_unified_stop_loss"] = sl
try:
grid = json.loads(p.get("grid_prices_json") or "[]")
if not isinstance(grid, list):
grid = []
except Exception:
grid = []
try:
leg_amounts = json.loads(p.get("leg_amounts_json") or "[]")
if not isinstance(leg_amounts, list):
leg_amounts = []
except Exception:
leg_amounts = []
rows: list[dict] = [
{
"i": 0,
"label": "首仓",
"price": ref,
"contracts": first_amt,
"avg_entry": ref,
"take_profit": first_tp,
"stop_loss": sl,
"is_first": True,
}
]
accumulated: list[tuple[float, float]] = [(ref, first_amt)]
for i, pair in enumerate(zip(grid, leg_amounts), 1):
try:
price = float(pair[0])
contracts = float(pair[1])
except (TypeError, ValueError):
continue
accumulated.append((price, contracts))
avg = weighted_avg_entry(accumulated)
tp_after = calc_take_profit_for_rr(direction, avg, sl, rr) if avg is not None else None
rows.append(
{
"i": i,
"label": f"补仓{i}",
"price": price,
"contracts": contracts,
"avg_entry": avg,
"take_profit": tp_after,
"stop_loss": sl,
"is_first": False,
}
)
return p, rows
def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict]:
"""运行中计划:为 dca_levels 补充加仓后均价、止盈、统一止损。"""
if not levels:
return levels
p = plan or {}
direction = (p.get("direction") or "long").strip().lower()
try:
sl = float(p.get("stop_loss"))
user_tp = float(p.get("take_profit"))
first_amt = float(p.get("first_order_amount"))
except (TypeError, ValueError):
return levels
ref_raw = p.get("live_price_ref")
if ref_raw in (None, ""):
ref_raw = p.get("avg_entry_price")
try:
ref = float(ref_raw)
except (TypeError, ValueError):
return levels
rr = calc_planned_reward_risk_ratio(direction, ref, sl, user_tp)
if rr is None:
return levels
out: list[dict] = []
accumulated: list[tuple[float, float]] = []
for lv in levels:
row = dict(lv)
is_first = row.get("leg_key") == "first" or row.get("label") == "首仓" or row.get("i") == 0
if is_first:
amt = row.get("contracts")
try:
amt_f = float(amt if amt is not None else first_amt)
except (TypeError, ValueError):
amt_f = first_amt
accumulated = [(ref, amt_f)]
row["avg_entry"] = ref
row["take_profit"] = calc_take_profit_for_rr(direction, ref, sl, rr)
row["stop_loss"] = sl
else:
price = row.get("price")
contracts = row.get("contracts")
if price is not None and contracts is not None:
try:
accumulated.append((float(price), float(contracts)))
avg = weighted_avg_entry(accumulated)
if avg is not None:
row["avg_entry"] = avg
row["take_profit"] = calc_take_profit_for_rr(direction, avg, sl, rr)
row["stop_loss"] = sl
except (TypeError, ValueError):
pass
out.append(row)
return out
+3 -7
View File
@@ -1093,15 +1093,11 @@ def load_trend_page_context(conn, request_obj, cfg: dict) -> dict[str, Any]:
).fetchone()
now_ms = int(time.time() * 1000)
if pr and int(pr["expires_at_ms"] or 0) >= now_ms:
from strategy_trend_lib import build_trend_preview_level_rows
trend_preview = _row(cfg, pr)
preview_expires_ms = int(pr["expires_at_ms"])
try:
grid = json.loads(trend_preview.get("grid_prices_json") or "[]")
legs = json.loads(trend_preview.get("leg_amounts_json") or "[]")
except Exception:
grid, legs = [], []
for i, pair in enumerate(zip(grid, legs), 1):
trend_preview_levels.append({"i": i, "price": pair[0], "contracts": pair[1]})
trend_preview, trend_preview_levels = build_trend_preview_level_rows(trend_preview)
elif pr:
trend_preview_expired = True
return {
+52
View File
@@ -0,0 +1,52 @@
"""趋势回调预览:参考价首仓止盈与补仓后止盈。"""
from __future__ import annotations
import json
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from strategy_trend_lib import ( # noqa: E402
build_trend_preview_level_rows,
calc_planned_reward_risk_ratio,
calc_take_profit_for_rr,
)
class TestTrendPreviewTp(unittest.TestCase):
def test_short_ref_price_first_tp_matches_form_tp(self):
ref, sl, tp = 72.6, 75.5, 65.0
rr = calc_planned_reward_risk_ratio("short", ref, sl, tp)
self.assertIsNotNone(rr)
first_tp = calc_take_profit_for_rr("short", ref, sl, rr)
self.assertAlmostEqual(first_tp, tp, places=2)
def test_preview_levels_include_first_and_dca_tp(self):
preview = {
"direction": "short",
"live_price_ref": 72.6,
"stop_loss": 75.5,
"take_profit": 65.0,
"first_order_amount": 1113,
"grid_prices_json": json.dumps([73.42, 73.83]),
"leg_amounts_json": json.dumps([222, 222]),
}
enriched, rows = build_trend_preview_level_rows(preview)
self.assertEqual(enriched["preview_unified_stop_loss"], 75.5)
self.assertAlmostEqual(enriched["preview_first_take_profit"], 65.0, places=1)
self.assertEqual(len(rows), 3)
self.assertEqual(rows[0]["label"], "首仓")
self.assertAlmostEqual(rows[0]["take_profit"], 65.0, places=1)
self.assertEqual(rows[0]["stop_loss"], 75.5)
self.assertIsNotNone(rows[1]["avg_entry"])
self.assertIsNotNone(rows[1]["take_profit"])
self.assertEqual(rows[1]["stop_loss"], 75.5)
# 做空:补仓价上移 → 均价上移 → 同等 RR 下止盈价上移
self.assertGreater(rows[2]["take_profit"], rows[1]["take_profit"])
if __name__ == "__main__":
unittest.main()