From 52d97482f2849cae705eaa4b41c7f39a3d822512 Mon Sep 17 00:00:00 2001 From: dekun Date: Thu, 4 Jun 2026 13:35:21 +0800 Subject: [PATCH] feat(strategy): WeChat notify on trend and roll plan start/end Add shared strategy_wechat_notify helpers; hook trend execute/finalize and roll group open/close across four exchanges and Gate bot. Co-authored-by: Cursor --- crypto_monitor_gate_bot/app.py | 39 +++++++ strategy_register.py | 53 ++++++++- strategy_roll_monitor_lib.py | 17 ++- strategy_trend_register.py | 78 ++++++++++++++ strategy_wechat_notify.py | 192 +++++++++++++++++++++++++++++++++ 5 files changed, 373 insertions(+), 6 deletions(-) create mode 100644 strategy_wechat_notify.py diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index 382b44f..c4d2c23 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -4604,6 +4604,23 @@ def _trend_finalize_plan(conn, row, result_label, exit_price, closed_at=None): if not getattr(cur, "rowcount", 0): return conn.commit() + try: + from strategy_trend_register import build_trend_config + from strategy_wechat_notify import notify_trend_plan_ended + + _tcfg = build_trend_config(sys.modules[__name__]) + notify_trend_plan_ended( + _tcfg, + plan_id=plan_id, + symbol=sym, + direction=direction, + end_type=result_label, + result_label=res, + exit_price=float(exit_price) if exit_price is not None else None, + pnl_amount=float(pnl_amount) if pnl_amount is not None else None, + ) + except Exception: + pass try: cfg = app.extensions.get("strategy_trend_cfg") or {} closed = conn.execute( @@ -6624,6 +6641,28 @@ def execute_trend_pullback(): ) conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,)) conn.commit() + try: + from strategy_trend_register import build_trend_config + from strategy_wechat_notify import notify_trend_plan_started + + _tcfg = build_trend_config(sys.modules[__name__]) + notify_trend_plan_started( + _tcfg, + plan_id=new_plan_id, + symbol=symbol, + direction=direction, + leverage=leverage, + stop_loss=stop_loss, + take_profit=take_profit, + add_upper=add_upper, + risk_percent=risk_percent, + dca_legs=n_legs, + first_order_amount=first_amt, + avg_entry=fill1, + snapshot_usdt=snap, + ) + except Exception: + pass conn.close() flash( f"趋势回调已执行:可用余额(执行时){round(snap, 2)}U;计划保证金约 {round(margin_plan, 2)}U;" diff --git a/strategy_register.py b/strategy_register.py index 08689a6..26eafef 100644 --- a/strategy_register.py +++ b/strategy_register.py @@ -132,7 +132,7 @@ def _roll_preview_response(cfg: dict, data: dict, json_mode: bool = False) -> di if not mon: conn.close() return {"ok": False, "msg": "未找到该币种同向的下单监控持仓,请先在「实盘下单」开仓"} - rg, legs_done = _get_or_create_roll_group_meta(conn, mon) + rg, legs_done, _is_new = _get_or_create_roll_group_meta(conn, mon) conn.close() pos = cfg["get_position"](ex_sym, direction) qty = float(pos.get("contracts") or 0) @@ -217,7 +217,7 @@ def _roll_execute(cfg: dict, data: dict) -> tuple[bool, str]: mon = _get_active_monitor(conn, cfg, symbol, direction) if not mon: return False, "监控单已不存在" - rg, legs_done = _get_or_create_roll_group_meta(conn, mon) + rg, legs_done, roll_is_new = _get_or_create_roll_group_meta(conn, mon) new_sl = float(preview["new_stop_loss"]) tp0 = float(preview["initial_take_profit"]) if add_mode == "market": @@ -253,6 +253,21 @@ def _roll_execute(cfg: dict, data: dict) -> tuple[bool, str]: (legs_done + 1, cfg["app_now_str"](), rg["id"]), ) conn.commit() + if roll_is_new: + try: + from strategy_wechat_notify import notify_roll_group_started + + notify_roll_group_started( + cfg, + group_id=int(rg["id"]), + symbol=symbol, + direction=direction, + order_monitor_id=int(mon["id"]), + initial_take_profit=tp0, + initial_stop_loss=float(mon.get("stop_loss") or new_sl), + ) + except Exception: + pass return True, f"已挂限价加仓单 #{oid},成交后请在页面点「同步持仓并更新止损」" cfg["replace_tpsl"](ex_sym, direction, new_sl, tp0, mon) conn.execute( @@ -284,6 +299,23 @@ def _roll_execute(cfg: dict, data: dict) -> tuple[bool, str]: (new_sl, mon["id"]), ) conn.commit() + try: + from strategy_wechat_notify import ( + notify_roll_group_started, + ) + + if roll_is_new: + notify_roll_group_started( + cfg, + group_id=int(rg["id"]), + symbol=symbol, + direction=direction, + order_monitor_id=int(mon["id"]), + initial_take_profit=tp0, + initial_stop_loss=new_sl, + ) + except Exception: + pass return True, f"滚仓第 {legs_done + 1} 腿已市价成交,交易所止损已更新,止盈仍为首仓 {tp0}" except Exception as e: fe = cfg.get("friendly_error") @@ -304,14 +336,14 @@ def _get_active_monitor(conn, cfg: dict, symbol: str, direction: str) -> Optiona return _row_to_dict(row) if row else None -def _get_or_create_roll_group_meta(conn, mon: dict) -> tuple[dict, int]: +def _get_or_create_roll_group_meta(conn, mon: dict) -> tuple[dict, int, bool]: row = conn.execute( "SELECT * FROM roll_groups WHERE order_monitor_id=? AND status='active' ORDER BY id DESC LIMIT 1", (mon["id"],), ).fetchone() if row: d = _row_to_dict(row) - return d, int(d.get("leg_count") or 0) + return d, int(d.get("leg_count") or 0), False now = mon.get("created_at") or "" cur = conn.execute( """INSERT INTO roll_groups ( @@ -335,6 +367,17 @@ def _get_or_create_roll_group_meta(conn, mon: dict) -> tuple[dict, int]: ), ) gid = int(cur.lastrowid) - return {"id": gid, "leg_count": 0, "initial_take_profit": mon.get("take_profit")}, 0 + return ( + { + "id": gid, + "leg_count": 0, + "initial_take_profit": mon.get("take_profit"), + "initial_stop_loss": mon.get("stop_loss"), + "symbol": mon.get("symbol"), + "direction": mon.get("direction"), + }, + 0, + True, + ) diff --git a/strategy_roll_monitor_lib.py b/strategy_roll_monitor_lib.py index 905b813..f748a10 100644 --- a/strategy_roll_monitor_lib.py +++ b/strategy_roll_monitor_lib.py @@ -72,10 +72,25 @@ def _close_roll_group( "UPDATE roll_legs SET status='cancelled' WHERE id=? AND status='pending'", (ld["id"],), ) - conn.execute( + cur = conn.execute( "UPDATE roll_groups SET status='closed', updated_at=? WHERE id=? AND status='active'", (_now(cfg), gid), ) + if getattr(cur, "rowcount", 0): + try: + from strategy_wechat_notify import notify_roll_group_ended + + reason = "下单监控已结案或交易所无同向持仓" + notify_roll_group_ended( + cfg, + group_id=gid, + symbol=group.get("symbol") or "", + direction=group.get("direction") or "long", + reason=reason, + leg_count=int(group.get("leg_count") or 0), + ) + except Exception: + pass try: from strategy_snapshot_lib import save_roll_group_snapshot diff --git a/strategy_trend_register.py b/strategy_trend_register.py index 46b6f04..b787a8e 100644 --- a/strategy_trend_register.py +++ b/strategy_trend_register.py @@ -79,8 +79,33 @@ def build_trend_config(app_module: Any = None, **kw) -> dict[str, Any]: except Exception: return None + def send_wechat(content): + fn = getattr(m, "send_wechat_msg", None) + if callable(fn): + fn(content) + + def wechat_account_label(): + fn = getattr(m, "_wechat_account_label", None) + if callable(fn): + try: + return fn() + except Exception: + pass + return getattr(m, "EXCHANGE_DISPLAY_NAME", "") or "" + + def wechat_direction_text(direction): + fn = getattr(m, "_wechat_direction_text", None) + if callable(fn): + try: + return fn(direction) + except Exception: + pass + d = (direction or "long").strip().lower() + return "做多" if d == "long" else "做空" + return { "app_module": m, + "exchange_display": getattr(m, "EXCHANGE_DISPLAY_NAME", ""), "login_required": m.login_required, "get_db": m.get_db, "row_to_dict": m.row_to_dict, @@ -93,6 +118,10 @@ def build_trend_config(app_module: Any = None, **kw) -> dict[str, Any]: "max_active_positions": int(getattr(m, "MAX_ACTIVE_POSITIONS", 1)), "reset_hour": int(getattr(m, "TRADING_DAY_RESET_HOUR", 8)), "monitor_type_trend": MONITOR_TYPE_TREND, + "send_wechat": send_wechat, + "format_price": getattr(m, "format_price_for_symbol", None), + "wechat_account_label": wechat_account_label, + "wechat_direction_text": wechat_direction_text, } @@ -504,6 +533,21 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) - if not getattr(cur, "rowcount", 0): return conn.commit() + try: + from strategy_wechat_notify import notify_trend_plan_ended + + notify_trend_plan_ended( + cfg, + plan_id=plan_id, + symbol=sym, + direction=direction, + end_type=result_label, + result_label=res, + exit_price=float(exit_price) if exit_price is not None else None, + pnl_amount=float(pnl_amount) if pnl_amount is not None else None, + ) + except Exception: + pass try: closed = conn.execute( "SELECT * FROM trend_pullback_plans WHERE id=?", (plan_id,) @@ -945,6 +989,20 @@ def apply_manual_breakeven(cfg: dict, conn, row, offset_pct=None) -> tuple[bool, send = getattr(m, "send_wechat_msg", None) pf = getattr(m, "format_price_for_symbol", None) fmt = (lambda s, p: pf(s, p)) if callable(pf) else (lambda _s, p: str(p)) + try: + from strategy_wechat_notify import notify_trend_plan_ended + + notify_trend_plan_ended( + cfg, + plan_id=plan_id, + symbol=sym, + direction=direction, + end_type="保本移交", + result_label=TREND_HANDOFF_TRADE_NOTE, + extra=f"已移交下单监控 #{mon_id};止损 {fmt(sym, new_sl)} | 止盈 {fmt(sym, tp)}", + ) + except Exception: + pass if callable(send): lines = [ f"# ✅ {sym} 趋势回调保本移交", @@ -1213,6 +1271,26 @@ def register_trend_routes(app: Flask, cfg: dict) -> None: ) conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,)) conn.commit() + try: + from strategy_wechat_notify import notify_trend_plan_started + + notify_trend_plan_started( + cfg, + plan_id=new_id, + symbol=symbol, + direction=direction, + leverage=leverage, + stop_loss=stop_loss, + take_profit=float(pr["take_profit"]), + add_upper=float(pr["add_upper"]), + risk_percent=float(pr["risk_percent"] or 5), + dca_legs=int(pr["dca_legs"] or 0), + first_order_amount=first_amt, + avg_entry=fill1, + snapshot_usdt=float(snap_now), + ) + except Exception: + pass conn.close() flash("趋势回调已执行:首仓已成交并挂交易所止损,止盈由程序监控。") return _redirect_trend() diff --git a/strategy_wechat_notify.py b/strategy_wechat_notify.py new file mode 100644 index 0000000..dabaabf --- /dev/null +++ b/strategy_wechat_notify.py @@ -0,0 +1,192 @@ +"""策略计划(趋势回调 / 滚仓)开始与结束 — 企业微信推送(四所共用)。""" +from __future__ import annotations + +from typing import Any, Optional + +from wechat_notify_lib import wechat_direction_label + + +def _send(cfg: dict[str, Any], content: str) -> None: + fn = cfg.get("send_wechat") + if callable(fn): + try: + fn(content) + return + except Exception: + pass + m = cfg.get("app_module") + if m is not None: + sw = getattr(m, "send_wechat_msg", None) + if callable(sw): + try: + sw(content) + except Exception: + pass + + +def _account(cfg: dict[str, Any]) -> str: + fn = cfg.get("wechat_account_label") + if callable(fn): + try: + return str(fn()).strip() or _exchange(cfg) + except Exception: + pass + return _exchange(cfg) + + +def _exchange(cfg: dict[str, Any]) -> str: + return str(cfg.get("exchange_display") or "").strip() or "交易账户" + + +def _dir_text(cfg: dict[str, Any], direction: str) -> str: + fn = cfg.get("wechat_direction_text") + if callable(fn): + try: + return str(fn(direction)) + except Exception: + pass + return wechat_direction_label(direction) + + +def _fmt_price(cfg: dict[str, Any], symbol: str, price: Any) -> str: + if price is None or price == "": + return "—" + fn = cfg.get("format_price") or cfg.get("price_fmt") + if callable(fn): + try: + return str(fn(symbol, price)) + except Exception: + pass + m = cfg.get("app_module") + pf = getattr(m, "format_price_for_symbol", None) if m else None + if callable(pf): + try: + return str(pf(symbol, price)) + except Exception: + pass + try: + return str(round(float(price), 8)) + except (TypeError, ValueError): + return str(price) + + +def _fmt_pnl(pnl: Any) -> str: + if pnl is None: + return "—" + try: + v = float(pnl) + return f"{'+' if v > 0 else ''}{round(v, 2)} U" + except (TypeError, ValueError): + return str(pnl) + + +def notify_trend_plan_started( + cfg: dict[str, Any], + *, + plan_id: int, + symbol: str, + direction: str, + leverage: int, + stop_loss: float, + take_profit: float, + add_upper: float, + risk_percent: float, + dca_legs: int, + first_order_amount: float, + avg_entry: Optional[float] = None, + snapshot_usdt: Optional[float] = None, +) -> None: + sym = symbol or "—" + lines = [ + f"# 🚀 {sym} 趋势回调计划已开始", + f"**账户:{_account(cfg)}**", + f"- 计划 ID:**{plan_id}**", + f"- 方向:{_dir_text(cfg, direction)}|杠杆 **{int(leverage or 1)}x**", + f"- 止损:{_fmt_price(cfg, sym, stop_loss)}|止盈:{_fmt_price(cfg, sym, take_profit)}", + f"- 补仓区:{_fmt_price(cfg, sym, add_upper)}|补仓档 **{int(dca_legs or 0)}** 档", + f"- 风险:**{risk_percent}%**|首仓张数:**{first_order_amount}**", + ] + if avg_entry is not None: + lines.append(f"- 首仓成交价:{_fmt_price(cfg, sym, avg_entry)}") + if snapshot_usdt is not None: + try: + lines.append(f"- 启动时合约可用:**{round(float(snapshot_usdt), 2)} U**") + except (TypeError, ValueError): + pass + lines.append("- 说明:交易所已挂止损;止盈由程序监控;结束/保本将另行推送") + _send(cfg, "\n".join(lines)) + + +def notify_trend_plan_ended( + cfg: dict[str, Any], + *, + plan_id: int, + symbol: str, + direction: str, + end_type: str, + result_label: Optional[str] = None, + exit_price: Optional[float] = None, + pnl_amount: Optional[float] = None, + extra: Optional[str] = None, +) -> None: + sym = symbol or "—" + res = (result_label or end_type or "—").strip() + lines = [ + f"# 🏁 {sym} 趋势回调计划已结束", + f"**账户:{_account(cfg)}**", + f"- 计划 ID:**{plan_id}**", + f"- 方向:{_dir_text(cfg, direction)}", + f"- 结束方式:**{end_type}**", + f"- 结果:**{res}**", + ] + if exit_price is not None: + lines.append(f"- 离场参考价:{_fmt_price(cfg, sym, exit_price)}") + if pnl_amount is not None: + lines.append(f"- 本单盈亏:**{_fmt_pnl(pnl_amount)}**") + if extra: + lines.append(f"- {extra}") + _send(cfg, "\n".join(lines)) + + +def notify_roll_group_started( + cfg: dict[str, Any], + *, + group_id: int, + symbol: str, + direction: str, + order_monitor_id: int, + initial_take_profit: Optional[float] = None, + initial_stop_loss: Optional[float] = None, +) -> None: + sym = symbol or "—" + lines = [ + f"# 🚀 {sym} 滚仓计划已开始", + f"**账户:{_account(cfg)}**", + f"- 滚仓组 ID:**{group_id}**|绑定下单监控 **#{order_monitor_id}**", + f"- 方向:{_dir_text(cfg, direction)}", + f"- 首仓止盈(锁定):{_fmt_price(cfg, sym, initial_take_profit)}", + f"- 当前止损:{_fmt_price(cfg, sym, initial_stop_loss)}", + "- 说明:顺势加仓为人工触发;组结束(无持仓/监控结案)将另行推送", + ] + _send(cfg, "\n".join(lines)) + + +def notify_roll_group_ended( + cfg: dict[str, Any], + *, + group_id: int, + symbol: str, + direction: str, + reason: str, + leg_count: int = 0, +) -> None: + sym = symbol or "—" + lines = [ + f"# 🏁 {sym} 滚仓计划已结束", + f"**账户:{_account(cfg)}**", + f"- 滚仓组 ID:**{group_id}**", + f"- 方向:{_dir_text(cfg, direction)}", + f"- 结束原因:**{reason}**", + f"- 已完成滚仓腿数:**{int(leg_count or 0)}**", + ] + _send(cfg, "\n".join(lines))