feat(strategy): WeChat notify on trend and roll plan start/end

Add shared strategy_wechat_notify helpers; hook trend execute/finalize and roll group open/close across four exchanges and Gate bot.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-04 13:35:21 +08:00
parent 3b4120a36e
commit 52d97482f2
5 changed files with 373 additions and 6 deletions
+39
View File
@@ -4604,6 +4604,23 @@ def _trend_finalize_plan(conn, row, result_label, exit_price, closed_at=None):
if not getattr(cur, "rowcount", 0):
return
conn.commit()
try:
from strategy_trend_register import build_trend_config
from strategy_wechat_notify import notify_trend_plan_ended
_tcfg = build_trend_config(sys.modules[__name__])
notify_trend_plan_ended(
_tcfg,
plan_id=plan_id,
symbol=sym,
direction=direction,
end_type=result_label,
result_label=res,
exit_price=float(exit_price) if exit_price is not None else None,
pnl_amount=float(pnl_amount) if pnl_amount is not None else None,
)
except Exception:
pass
try:
cfg = app.extensions.get("strategy_trend_cfg") or {}
closed = conn.execute(
@@ -6624,6 +6641,28 @@ def execute_trend_pullback():
)
conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,))
conn.commit()
try:
from strategy_trend_register import build_trend_config
from strategy_wechat_notify import notify_trend_plan_started
_tcfg = build_trend_config(sys.modules[__name__])
notify_trend_plan_started(
_tcfg,
plan_id=new_plan_id,
symbol=symbol,
direction=direction,
leverage=leverage,
stop_loss=stop_loss,
take_profit=take_profit,
add_upper=add_upper,
risk_percent=risk_percent,
dca_legs=n_legs,
first_order_amount=first_amt,
avg_entry=fill1,
snapshot_usdt=snap,
)
except Exception:
pass
conn.close()
flash(
f"趋势回调已执行:可用余额(执行时){round(snap, 2)}U;计划保证金约 {round(margin_plan, 2)}U"
+48 -5
View File
@@ -132,7 +132,7 @@ def _roll_preview_response(cfg: dict, data: dict, json_mode: bool = False) -> di
if not mon:
conn.close()
return {"ok": False, "msg": "未找到该币种同向的下单监控持仓,请先在「实盘下单」开仓"}
rg, legs_done = _get_or_create_roll_group_meta(conn, mon)
rg, legs_done, _is_new = _get_or_create_roll_group_meta(conn, mon)
conn.close()
pos = cfg["get_position"](ex_sym, direction)
qty = float(pos.get("contracts") or 0)
@@ -217,7 +217,7 @@ def _roll_execute(cfg: dict, data: dict) -> tuple[bool, str]:
mon = _get_active_monitor(conn, cfg, symbol, direction)
if not mon:
return False, "监控单已不存在"
rg, legs_done = _get_or_create_roll_group_meta(conn, mon)
rg, legs_done, roll_is_new = _get_or_create_roll_group_meta(conn, mon)
new_sl = float(preview["new_stop_loss"])
tp0 = float(preview["initial_take_profit"])
if add_mode == "market":
@@ -253,6 +253,21 @@ def _roll_execute(cfg: dict, data: dict) -> tuple[bool, str]:
(legs_done + 1, cfg["app_now_str"](), rg["id"]),
)
conn.commit()
if roll_is_new:
try:
from strategy_wechat_notify import notify_roll_group_started
notify_roll_group_started(
cfg,
group_id=int(rg["id"]),
symbol=symbol,
direction=direction,
order_monitor_id=int(mon["id"]),
initial_take_profit=tp0,
initial_stop_loss=float(mon.get("stop_loss") or new_sl),
)
except Exception:
pass
return True, f"已挂限价加仓单 #{oid},成交后请在页面点「同步持仓并更新止损」"
cfg["replace_tpsl"](ex_sym, direction, new_sl, tp0, mon)
conn.execute(
@@ -284,6 +299,23 @@ def _roll_execute(cfg: dict, data: dict) -> tuple[bool, str]:
(new_sl, mon["id"]),
)
conn.commit()
try:
from strategy_wechat_notify import (
notify_roll_group_started,
)
if roll_is_new:
notify_roll_group_started(
cfg,
group_id=int(rg["id"]),
symbol=symbol,
direction=direction,
order_monitor_id=int(mon["id"]),
initial_take_profit=tp0,
initial_stop_loss=new_sl,
)
except Exception:
pass
return True, f"滚仓第 {legs_done + 1} 腿已市价成交,交易所止损已更新,止盈仍为首仓 {tp0}"
except Exception as e:
fe = cfg.get("friendly_error")
@@ -304,14 +336,14 @@ def _get_active_monitor(conn, cfg: dict, symbol: str, direction: str) -> Optiona
return _row_to_dict(row) if row else None
def _get_or_create_roll_group_meta(conn, mon: dict) -> tuple[dict, int]:
def _get_or_create_roll_group_meta(conn, mon: dict) -> tuple[dict, int, bool]:
row = conn.execute(
"SELECT * FROM roll_groups WHERE order_monitor_id=? AND status='active' ORDER BY id DESC LIMIT 1",
(mon["id"],),
).fetchone()
if row:
d = _row_to_dict(row)
return d, int(d.get("leg_count") or 0)
return d, int(d.get("leg_count") or 0), False
now = mon.get("created_at") or ""
cur = conn.execute(
"""INSERT INTO roll_groups (
@@ -335,6 +367,17 @@ def _get_or_create_roll_group_meta(conn, mon: dict) -> tuple[dict, int]:
),
)
gid = int(cur.lastrowid)
return {"id": gid, "leg_count": 0, "initial_take_profit": mon.get("take_profit")}, 0
return (
{
"id": gid,
"leg_count": 0,
"initial_take_profit": mon.get("take_profit"),
"initial_stop_loss": mon.get("stop_loss"),
"symbol": mon.get("symbol"),
"direction": mon.get("direction"),
},
0,
True,
)
+16 -1
View File
@@ -72,10 +72,25 @@ def _close_roll_group(
"UPDATE roll_legs SET status='cancelled' WHERE id=? AND status='pending'",
(ld["id"],),
)
conn.execute(
cur = conn.execute(
"UPDATE roll_groups SET status='closed', updated_at=? WHERE id=? AND status='active'",
(_now(cfg), gid),
)
if getattr(cur, "rowcount", 0):
try:
from strategy_wechat_notify import notify_roll_group_ended
reason = "下单监控已结案或交易所无同向持仓"
notify_roll_group_ended(
cfg,
group_id=gid,
symbol=group.get("symbol") or "",
direction=group.get("direction") or "long",
reason=reason,
leg_count=int(group.get("leg_count") or 0),
)
except Exception:
pass
try:
from strategy_snapshot_lib import save_roll_group_snapshot
+78
View File
@@ -79,8 +79,33 @@ def build_trend_config(app_module: Any = None, **kw) -> dict[str, Any]:
except Exception:
return None
def send_wechat(content):
fn = getattr(m, "send_wechat_msg", None)
if callable(fn):
fn(content)
def wechat_account_label():
fn = getattr(m, "_wechat_account_label", None)
if callable(fn):
try:
return fn()
except Exception:
pass
return getattr(m, "EXCHANGE_DISPLAY_NAME", "") or ""
def wechat_direction_text(direction):
fn = getattr(m, "_wechat_direction_text", None)
if callable(fn):
try:
return fn(direction)
except Exception:
pass
d = (direction or "long").strip().lower()
return "做多" if d == "long" else "做空"
return {
"app_module": m,
"exchange_display": getattr(m, "EXCHANGE_DISPLAY_NAME", ""),
"login_required": m.login_required,
"get_db": m.get_db,
"row_to_dict": m.row_to_dict,
@@ -93,6 +118,10 @@ def build_trend_config(app_module: Any = None, **kw) -> dict[str, Any]:
"max_active_positions": int(getattr(m, "MAX_ACTIVE_POSITIONS", 1)),
"reset_hour": int(getattr(m, "TRADING_DAY_RESET_HOUR", 8)),
"monitor_type_trend": MONITOR_TYPE_TREND,
"send_wechat": send_wechat,
"format_price": getattr(m, "format_price_for_symbol", None),
"wechat_account_label": wechat_account_label,
"wechat_direction_text": wechat_direction_text,
}
@@ -504,6 +533,21 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) -
if not getattr(cur, "rowcount", 0):
return
conn.commit()
try:
from strategy_wechat_notify import notify_trend_plan_ended
notify_trend_plan_ended(
cfg,
plan_id=plan_id,
symbol=sym,
direction=direction,
end_type=result_label,
result_label=res,
exit_price=float(exit_price) if exit_price is not None else None,
pnl_amount=float(pnl_amount) if pnl_amount is not None else None,
)
except Exception:
pass
try:
closed = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=?", (plan_id,)
@@ -945,6 +989,20 @@ def apply_manual_breakeven(cfg: dict, conn, row, offset_pct=None) -> tuple[bool,
send = getattr(m, "send_wechat_msg", None)
pf = getattr(m, "format_price_for_symbol", None)
fmt = (lambda s, p: pf(s, p)) if callable(pf) else (lambda _s, p: str(p))
try:
from strategy_wechat_notify import notify_trend_plan_ended
notify_trend_plan_ended(
cfg,
plan_id=plan_id,
symbol=sym,
direction=direction,
end_type="保本移交",
result_label=TREND_HANDOFF_TRADE_NOTE,
extra=f"已移交下单监控 #{mon_id};止损 {fmt(sym, new_sl)} 止盈 {fmt(sym, tp)}",
)
except Exception:
pass
if callable(send):
lines = [
f"# ✅ {sym} 趋势回调保本移交",
@@ -1213,6 +1271,26 @@ def register_trend_routes(app: Flask, cfg: dict) -> None:
)
conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,))
conn.commit()
try:
from strategy_wechat_notify import notify_trend_plan_started
notify_trend_plan_started(
cfg,
plan_id=new_id,
symbol=symbol,
direction=direction,
leverage=leverage,
stop_loss=stop_loss,
take_profit=float(pr["take_profit"]),
add_upper=float(pr["add_upper"]),
risk_percent=float(pr["risk_percent"] or 5),
dca_legs=int(pr["dca_legs"] or 0),
first_order_amount=first_amt,
avg_entry=fill1,
snapshot_usdt=float(snap_now),
)
except Exception:
pass
conn.close()
flash("趋势回调已执行:首仓已成交并挂交易所止损,止盈由程序监控。")
return _redirect_trend()
+192
View File
@@ -0,0 +1,192 @@
"""策略计划(趋势回调 / 滚仓)开始与结束 — 企业微信推送(四所共用)。"""
from __future__ import annotations
from typing import Any, Optional
from wechat_notify_lib import wechat_direction_label
def _send(cfg: dict[str, Any], content: str) -> None:
fn = cfg.get("send_wechat")
if callable(fn):
try:
fn(content)
return
except Exception:
pass
m = cfg.get("app_module")
if m is not None:
sw = getattr(m, "send_wechat_msg", None)
if callable(sw):
try:
sw(content)
except Exception:
pass
def _account(cfg: dict[str, Any]) -> str:
fn = cfg.get("wechat_account_label")
if callable(fn):
try:
return str(fn()).strip() or _exchange(cfg)
except Exception:
pass
return _exchange(cfg)
def _exchange(cfg: dict[str, Any]) -> str:
return str(cfg.get("exchange_display") or "").strip() or "交易账户"
def _dir_text(cfg: dict[str, Any], direction: str) -> str:
fn = cfg.get("wechat_direction_text")
if callable(fn):
try:
return str(fn(direction))
except Exception:
pass
return wechat_direction_label(direction)
def _fmt_price(cfg: dict[str, Any], symbol: str, price: Any) -> str:
if price is None or price == "":
return ""
fn = cfg.get("format_price") or cfg.get("price_fmt")
if callable(fn):
try:
return str(fn(symbol, price))
except Exception:
pass
m = cfg.get("app_module")
pf = getattr(m, "format_price_for_symbol", None) if m else None
if callable(pf):
try:
return str(pf(symbol, price))
except Exception:
pass
try:
return str(round(float(price), 8))
except (TypeError, ValueError):
return str(price)
def _fmt_pnl(pnl: Any) -> str:
if pnl is None:
return ""
try:
v = float(pnl)
return f"{'+' if v > 0 else ''}{round(v, 2)} U"
except (TypeError, ValueError):
return str(pnl)
def notify_trend_plan_started(
cfg: dict[str, Any],
*,
plan_id: int,
symbol: str,
direction: str,
leverage: int,
stop_loss: float,
take_profit: float,
add_upper: float,
risk_percent: float,
dca_legs: int,
first_order_amount: float,
avg_entry: Optional[float] = None,
snapshot_usdt: Optional[float] = None,
) -> None:
sym = symbol or ""
lines = [
f"# 🚀 {sym} 趋势回调计划已开始",
f"**账户:{_account(cfg)}**",
f"- 计划 ID**{plan_id}**",
f"- 方向:{_dir_text(cfg, direction)}|杠杆 **{int(leverage or 1)}x**",
f"- 止损:{_fmt_price(cfg, sym, stop_loss)}|止盈:{_fmt_price(cfg, sym, take_profit)}",
f"- 补仓区:{_fmt_price(cfg, sym, add_upper)}|补仓档 **{int(dca_legs or 0)}** 档",
f"- 风险:**{risk_percent}%**|首仓张数:**{first_order_amount}**",
]
if avg_entry is not None:
lines.append(f"- 首仓成交价:{_fmt_price(cfg, sym, avg_entry)}")
if snapshot_usdt is not None:
try:
lines.append(f"- 启动时合约可用:**{round(float(snapshot_usdt), 2)} U**")
except (TypeError, ValueError):
pass
lines.append("- 说明:交易所已挂止损;止盈由程序监控;结束/保本将另行推送")
_send(cfg, "\n".join(lines))
def notify_trend_plan_ended(
cfg: dict[str, Any],
*,
plan_id: int,
symbol: str,
direction: str,
end_type: str,
result_label: Optional[str] = None,
exit_price: Optional[float] = None,
pnl_amount: Optional[float] = None,
extra: Optional[str] = None,
) -> None:
sym = symbol or ""
res = (result_label or end_type or "").strip()
lines = [
f"# 🏁 {sym} 趋势回调计划已结束",
f"**账户:{_account(cfg)}**",
f"- 计划 ID**{plan_id}**",
f"- 方向:{_dir_text(cfg, direction)}",
f"- 结束方式:**{end_type}**",
f"- 结果:**{res}**",
]
if exit_price is not None:
lines.append(f"- 离场参考价:{_fmt_price(cfg, sym, exit_price)}")
if pnl_amount is not None:
lines.append(f"- 本单盈亏:**{_fmt_pnl(pnl_amount)}**")
if extra:
lines.append(f"- {extra}")
_send(cfg, "\n".join(lines))
def notify_roll_group_started(
cfg: dict[str, Any],
*,
group_id: int,
symbol: str,
direction: str,
order_monitor_id: int,
initial_take_profit: Optional[float] = None,
initial_stop_loss: Optional[float] = None,
) -> None:
sym = symbol or ""
lines = [
f"# 🚀 {sym} 滚仓计划已开始",
f"**账户:{_account(cfg)}**",
f"- 滚仓组 ID**{group_id}**|绑定下单监控 **#{order_monitor_id}**",
f"- 方向:{_dir_text(cfg, direction)}",
f"- 首仓止盈(锁定):{_fmt_price(cfg, sym, initial_take_profit)}",
f"- 当前止损:{_fmt_price(cfg, sym, initial_stop_loss)}",
"- 说明:顺势加仓为人工触发;组结束(无持仓/监控结案)将另行推送",
]
_send(cfg, "\n".join(lines))
def notify_roll_group_ended(
cfg: dict[str, Any],
*,
group_id: int,
symbol: str,
direction: str,
reason: str,
leg_count: int = 0,
) -> None:
sym = symbol or ""
lines = [
f"# 🏁 {sym} 滚仓计划已结束",
f"**账户:{_account(cfg)}**",
f"- 滚仓组 ID**{group_id}**",
f"- 方向:{_dir_text(cfg, direction)}",
f"- 结束原因:**{reason}**",
f"- 已完成滚仓腿数:**{int(leg_count or 0)}**",
]
_send(cfg, "\n".join(lines))