From 84abf7e7f76dd8325b08dc79228e1219021dea40 Mon Sep 17 00:00:00 2001 From: dekun Date: Sun, 7 Jun 2026 17:09:22 +0800 Subject: [PATCH] fix(trend): correct DCA triggers and partial-position PnL across exchanges Co-authored-by: Cursor --- crypto_monitor_gate_bot/app.py | 247 +------------------ scripts/backfill_trend_strategy_snapshots.py | 6 +- strategy_trend_lib.py | 34 +++ strategy_trend_register.py | 63 +++-- tests/test_trend_dca_pnl.py | 37 +++ 5 files changed, 122 insertions(+), 265 deletions(-) create mode 100644 tests/test_trend_dca_pnl.py diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index 0759c7c..a4172dc 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -4700,242 +4700,14 @@ def _trend_weighted_avg(old_avg, old_amt, fill_px, add_amt): return float(fill_px or 0) -def _trend_plan_stop_status(result_label): - if result_label == "止盈": - return "stopped_tp" - if result_label == "止损": - return "stopped_sl" - return "stopped_manual" - - -def _trend_plan_trade_exists(conn, plan_id): - try: - return conn.execute( - "SELECT id FROM trade_records WHERE trend_plan_id=? LIMIT 1", - (int(plan_id),), - ).fetchone() is not None - except Exception: - return False - - -def _trend_finalize_plan(conn, row, result_label, exit_price, closed_at=None): - """平仓后记账、撤单、结束计划。""" - plan_id = int(row["id"]) - active = conn.execute( - "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", - (plan_id,), - ).fetchone() - if not active: - return - row = active - sym = row["symbol"] - direction = row["direction"] or "long" - ex_sym = row["exchange_symbol"] or normalize_exchange_symbol(sym) - closed_at = closed_at or app_now_str() - opened_at = row["opened_at"] or app_now_str() - hold_seconds = calc_hold_seconds(opened_at, parse_dt_for_trading_day(closed_at) or app_now()) - margin_cap = float(row["plan_margin_capital"] or 0) - lev = int(row["leverage"] or 1) - avg_e = float(row["avg_entry_price"] or 0) - pnl_amount = calc_pnl(direction, avg_e, float(exit_price), margin_cap, lev) - res = normalize_result_with_pnl(result_label, pnl_amount) - risk_amt = calc_risk_amount_from_plan(direction, float(row["add_upper"]), float(row["stop_loss"]), margin_cap, lev) - planned_rr = calc_rr_ratio(direction, avg_e, float(row["stop_loss"]), float(row["take_profit"])) - try: - cancel_all_open_orders_for_symbol(ex_sym) - except Exception: - try: - cancel_gate_swap_trigger_orders(ex_sym) - except Exception: - pass - st = _trend_plan_stop_status(result_label) - cur = conn.execute( - "UPDATE trend_pullback_plans SET status=?, message=? WHERE id=? AND status='active'", - (st, res, plan_id), - ) - if not getattr(cur, "rowcount", 0): - return - conn.commit() - try: - from strategy_trend_register import build_trend_config - from strategy_wechat_notify import notify_trend_plan_ended - - _tcfg = build_trend_config(sys.modules[__name__]) - notify_trend_plan_ended( - _tcfg, - plan_id=plan_id, - symbol=sym, - direction=direction, - end_type=result_label, - result_label=res, - exit_price=float(exit_price) if exit_price is not None else None, - pnl_amount=float(pnl_amount) if pnl_amount is not None else None, - ) - except Exception: - pass - try: - from strategy_trend_register import build_trend_config - - cfg = app.extensions.get("strategy_trend_cfg") or build_trend_config( - sys.modules[__name__] - ) - closed = conn.execute( - "SELECT * FROM trend_pullback_plans WHERE id=?", (plan_id,) - ).fetchone() - if closed: - from strategy_snapshot_lib import save_trend_plan_snapshot - - save_trend_plan_snapshot( - cfg, - conn, - closed, - result_label=result_label, - exit_price=float(exit_price), - pnl_amount=float(pnl_amount) if pnl_amount is not None else None, - ) - conn.commit() - except Exception: - pass - if _trend_plan_trade_exists(conn, plan_id): - return - session_date = row["session_date"] or get_trading_day() - session_capital = update_session_capital(conn, session_date, pnl_amount) - insert_trade_record( - conn, - symbol=sym, - monitor_type=MONITOR_TYPE_TREND, - direction=direction, - trigger_price=avg_e, - stop_loss=float(row["stop_loss"]), - initial_stop_loss=float(row["stop_loss"]), - take_profit=float(row["take_profit"]), - margin_capital=margin_cap, - leverage=lev, - pnl_amount=pnl_amount, - hold_seconds=hold_seconds, - trade_style="trend_pullback", - risk_amount=risk_amt, - planned_rr=planned_rr, - actual_rr=calc_actual_rr(pnl_amount, risk_amt), - result=res, - opened_at=opened_at, - closed_at=closed_at, - trend_plan_id=plan_id, - ) - send_wechat_msg( - build_wechat_close_message( - symbol=sym, - direction=direction, - result=f"{res}({MONITOR_TYPE_TREND})", - pnl_amount=pnl_amount, - hold_seconds=hold_seconds, - trigger_price=avg_e, - current_price=float(exit_price), - stop_loss=float(row["stop_loss"]), - take_profit=float(row["take_profit"]), - close_order_id="-", - extra_note="计划本金口径:启动时合约可用余额快照;止盈由程序监控", - session_capital_fallback=session_capital, - ) - ) - conn.commit() - - def check_trend_pullback_plans(): - ok_live, _ = ensure_exchange_live_ready() - if not ok_live: - return - conn = get_db() - rows = conn.execute("SELECT * FROM trend_pullback_plans WHERE status='active'").fetchall() - for row in rows: - try: - sym = row["symbol"] - direction = (row["direction"] or "long").lower() - ex_sym = row["exchange_symbol"] or normalize_exchange_symbol(sym) - sl = float(row["stop_loss"]) - upper = float(row["add_upper"]) - tp = float(row["take_profit"]) - lev = int(row["leverage"] or 1) - p = get_price(sym) - if not p: - continue - pf = float(p) - last_p = row["last_mark_price"] - last_pf = float(last_p) if last_p is not None else pf + """轮询趋势回调:共用 strategy_trend_register(补仓触达 + 空仓连续确认)。""" + from strategy_trend_register import build_trend_config, check_trend_pullback_plans as _check - pos = get_live_position_contracts(ex_sym, direction) - if pos is None: - continue - - legs_done = int(row["legs_done"] or 0) - dca_legs = int(row["dca_legs"] or 0) - leg_amounts = [] - try: - leg_amounts = [float(x) for x in json.loads(row["leg_amounts_json"] or "[]")] - except Exception: - leg_amounts = [] - grid = [] - try: - grid = json.loads(row["grid_prices_json"] or "[]") - except Exception: - grid = [] - - hit_tp = (direction == "long" and pf >= tp) or (direction == "short" and pf <= tp) - if hit_tp and pos > 0: - try: - exchange.set_leverage(lev, ex_sym) - side = "sell" if direction == "long" else "buy" - params = build_gate_order_params(direction, reduce_only=True) - close_resp = exchange.create_order(ex_sym, "market", side, float(pos), None, params) - exit_p = extract_trade_price_from_order(close_resp) or pf - except Exception as e: - if not is_no_position_error(str(e)): - continue - exit_p = pf - _trend_finalize_plan(conn, row, "止盈", exit_p) - continue - - if pos <= 0 and int(row["first_order_done"] or 0): - exit_p = pf - _trend_finalize_plan(conn, row, "止损", exit_p) - continue - - if int(row["first_order_done"] or 0) and legs_done < len(grid) and legs_done < len(leg_amounts): - level = float(grid[legs_done]) - fired = False - if direction == "long": - if last_pf > level and pf <= level: - fired = True - else: - if last_pf < level and pf >= level: - fired = True - if fired: - amt = float(exchange.amount_to_precision(ex_sym, leg_amounts[legs_done])) - if amt > 0: - add_resp = _trend_market_add_contracts(ex_sym, direction, amt, lev) - fill_px = extract_trade_price_from_order(add_resp) or pf - old_avg = float(row["avg_entry_price"] or fill_px) - old_open = float(row["order_amount_open"] or 0) - new_open = old_open + amt - new_avg = _trend_weighted_avg(old_avg, old_open, fill_px, amt) - conn.execute( - "UPDATE trend_pullback_plans SET legs_done=?, avg_entry_price=?, order_amount_open=?, last_mark_price=? WHERE id=?", - (legs_done + 1, new_avg, new_open, pf, row["id"]), - ) - row = conn.execute("SELECT * FROM trend_pullback_plans WHERE id=?", (row["id"],)).fetchone() - try: - _trend_refresh_stop_only(ex_sym, direction, sl) - except Exception: - pass - - conn.execute( - "UPDATE trend_pullback_plans SET last_mark_price=? WHERE id=?", - (pf, row["id"]), - ) - except Exception: - continue - conn.commit() - conn.close() + cfg = app.extensions.get("strategy_trend_cfg") or build_trend_config( + sys.modules[__name__] + ) + _check(cfg) # 关键位监控(前端已下线时仍保留函数体,后台默认不再调用) @@ -7192,7 +6964,12 @@ def stop_trend_pullback(pid): except Exception: pass try: - _trend_finalize_plan(conn, row, "手动平仓", exit_p) + from strategy_trend_register import _finalize_plan, build_trend_config + + cfg = app.extensions.get("strategy_trend_cfg") or build_trend_config( + sys.modules[__name__] + ) + _finalize_plan(cfg, conn, row, "手动平仓", exit_p) except Exception as e: conn.execute( "UPDATE trend_pullback_plans SET status='stopped_manual', message=? " diff --git a/scripts/backfill_trend_strategy_snapshots.py b/scripts/backfill_trend_strategy_snapshots.py index 151273f..f55edcf 100644 --- a/scripts/backfill_trend_strategy_snapshots.py +++ b/scripts/backfill_trend_strategy_snapshots.py @@ -4,10 +4,10 @@ 适用:gate_bot 等在计划结束(止盈/止损/手动)时因 strategy_trend_cfg 未注册而漏写快照的历史数据。 保本移交路径通常已有快照,本脚本默认跳过「已有任意快照」的计划。 -用法(在仓库根目录): - python scripts/backfill_trend_strategy_snapshots.py \\ +用法(在仓库根目录,Linux 请用 python3): + python3 scripts/backfill_trend_strategy_snapshots.py \\ --db crypto_monitor_gate_bot/crypto.db --dry-run - python scripts/backfill_trend_strategy_snapshots.py \\ + python3 scripts/backfill_trend_strategy_snapshots.py \\ --db crypto_monitor_gate_bot/crypto.db --apply """ from __future__ import annotations diff --git a/strategy_trend_lib.py b/strategy_trend_lib.py index aaf8d88..fc72675 100644 --- a/strategy_trend_lib.py +++ b/strategy_trend_lib.py @@ -24,6 +24,40 @@ def calc_risk_fraction(direction: str, entry_price: float, stop_loss: float) -> return None +def trend_effective_margin_capital(plan: dict) -> float: + """按已开仓张数占计划总张数比例折算保证金(首仓/部分补仓时的盈亏估算)。""" + try: + plan_margin = float(plan.get("plan_margin_capital") or 0) + target = float(plan.get("target_order_amount") or 0) + open_amt = float(plan.get("order_amount_open") or 0) + except (TypeError, ValueError): + return float((plan or {}).get("plan_margin_capital") or 0) + if plan_margin <= 0: + return 0.0 + if target > 0 and open_amt > 0: + return round(plan_margin * min(1.0, open_amt / target), 8) + try: + first = float(plan.get("first_order_amount") or 0) + except (TypeError, ValueError): + first = 0.0 + if target > 0 and first > 0: + return round(plan_margin * min(1.0, first / target), 8) + return plan_margin + + +def trend_dca_level_reached(direction: str, mark_price: float, level: float) -> bool: + """做空:价升触达/越过档位即应补仓;做多:价跌触达/越过档位。""" + d = (direction or "long").strip().lower() + try: + pf = float(mark_price) + lv = float(level) + except (TypeError, ValueError): + return False + if d == "long": + return pf <= lv + return pf >= lv + + def validate_trend_bounds(direction: str, stop_loss: float, add_upper: float) -> Optional[str]: direction = (direction or "long").strip().lower() if direction == "long": diff --git a/strategy_trend_register.py b/strategy_trend_register.py index a6605c1..ec6c7dd 100644 --- a/strategy_trend_register.py +++ b/strategy_trend_register.py @@ -24,6 +24,8 @@ from strategy_trend_lib import ( build_grid_prices, build_leg_amounts_json, calc_risk_fraction, + trend_dca_level_reached, + trend_effective_margin_capital, validate_trend_bounds, ) from strategy_trade_labels import ( @@ -548,14 +550,22 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) - closed_at = m.app_now_str() opened_at = row["opened_at"] or closed_at hold_seconds = m.calc_hold_seconds(opened_at, m.parse_dt_for_trading_day(closed_at) or m.app_now()) - margin_cap = float(row["plan_margin_capital"] or 0) + plan_margin = float(row["plan_margin_capital"] or 0) + margin_cap = trend_effective_margin_capital(_row(cfg, row)) lev = int(row["leverage"] or 1) avg_e = float(row["avg_entry_price"] or 0) pnl_amount = m.calc_pnl(direction, avg_e, float(exit_price), margin_cap, lev) res = m.normalize_result_with_pnl(result_label, pnl_amount) risk_amt = m.calc_risk_amount_from_plan( - direction, float(row["add_upper"]), float(row["stop_loss"]), margin_cap, lev + direction, float(row["add_upper"]), float(row["stop_loss"]), plan_margin, lev ) + try: + target = float(row["target_order_amount"] or 0) + open_amt = float(row["order_amount_open"] or 0) + if risk_amt is not None and target > 0 and open_amt > 0: + risk_amt = round(float(risk_amt) * min(1.0, open_amt / target), 6) + except (TypeError, ValueError): + pass planned_rr = m.calc_rr_ratio(direction, avg_e, float(row["stop_loss"]), float(row["take_profit"])) try: cancel_symbol_orders(cfg, ex_sym) @@ -785,32 +795,31 @@ def check_trend_pullback_plans(cfg: dict) -> None: _TREND_FLAT_STREAK.pop(plan_id, None) continue if int(row["first_order_done"] or 0) and legs_done < len(grid) and legs_done < len(leg_amounts): - level = float(grid[legs_done]) - fired = False - if direction == "long": - fired = last_pf > level and pf <= level - else: - fired = last_pf < level and pf >= level - if fired: + while legs_done < len(grid) and legs_done < len(leg_amounts): + level = float(grid[legs_done]) + if not trend_dca_level_reached(direction, pf, level): + break amt = float(m.exchange.amount_to_precision(ex_sym, leg_amounts[legs_done])) - if amt > 0: - add_resp = trend_market_add(cfg, ex_sym, direction, amt, lev) - fill_px = m.extract_trade_price_from_order(add_resp) or pf - old_avg = float(row["avg_entry_price"] or fill_px) - old_open = float(row["order_amount_open"] or 0) - new_avg = _weighted_avg(old_avg, old_open, fill_px, amt) - conn.execute( - "UPDATE trend_pullback_plans SET legs_done=?, avg_entry_price=?, " - "order_amount_open=?, last_mark_price=? WHERE id=?", - (legs_done + 1, new_avg, old_open + amt, pf, row["id"]), - ) - row = conn.execute( - "SELECT * FROM trend_pullback_plans WHERE id=?", (row["id"],) - ).fetchone() - try: - trend_refresh_stop_only(cfg, ex_sym, direction, sl) - except Exception: - pass + if amt <= 0: + break + add_resp = trend_market_add(cfg, ex_sym, direction, amt, lev) + fill_px = m.extract_trade_price_from_order(add_resp) or pf + old_avg = float(row["avg_entry_price"] or fill_px) + old_open = float(row["order_amount_open"] or 0) + new_avg = _weighted_avg(old_avg, old_open, fill_px, amt) + legs_done += 1 + conn.execute( + "UPDATE trend_pullback_plans SET legs_done=?, avg_entry_price=?, " + "order_amount_open=?, last_mark_price=? WHERE id=?", + (legs_done, new_avg, old_open + amt, pf, row["id"]), + ) + row = conn.execute( + "SELECT * FROM trend_pullback_plans WHERE id=?", (row["id"],) + ).fetchone() + try: + trend_refresh_stop_only(cfg, ex_sym, direction, sl) + except Exception: + pass conn.execute( "UPDATE trend_pullback_plans SET last_mark_price=? WHERE id=?", (pf, row["id"]), diff --git a/tests/test_trend_dca_pnl.py b/tests/test_trend_dca_pnl.py new file mode 100644 index 0000000..81ee689 --- /dev/null +++ b/tests/test_trend_dca_pnl.py @@ -0,0 +1,37 @@ +"""趋势回调:补仓触达与有效保证金估算。""" +from strategy_trend_lib import trend_dca_level_reached, trend_effective_margin_capital + + +def test_trend_dca_short_monotonic_up_fills_missed_legs(): + """做空价升:旧逻辑需 last