Files
crypto_monitor/strategy_wechat_notify.py
T
dekun 52d97482f2 feat(strategy): WeChat notify on trend and roll plan start/end
Add shared strategy_wechat_notify helpers; hook trend execute/finalize and roll group open/close across four exchanges and Gate bot.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-04 13:35:21 +08:00

193 lines
5.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""策略计划(趋势回调 / 滚仓)开始与结束 — 企业微信推送(四所共用)。"""
from __future__ import annotations
from typing import Any, Optional
from wechat_notify_lib import wechat_direction_label
def _send(cfg: dict[str, Any], content: str) -> None:
fn = cfg.get("send_wechat")
if callable(fn):
try:
fn(content)
return
except Exception:
pass
m = cfg.get("app_module")
if m is not None:
sw = getattr(m, "send_wechat_msg", None)
if callable(sw):
try:
sw(content)
except Exception:
pass
def _account(cfg: dict[str, Any]) -> str:
fn = cfg.get("wechat_account_label")
if callable(fn):
try:
return str(fn()).strip() or _exchange(cfg)
except Exception:
pass
return _exchange(cfg)
def _exchange(cfg: dict[str, Any]) -> str:
return str(cfg.get("exchange_display") or "").strip() or "交易账户"
def _dir_text(cfg: dict[str, Any], direction: str) -> str:
fn = cfg.get("wechat_direction_text")
if callable(fn):
try:
return str(fn(direction))
except Exception:
pass
return wechat_direction_label(direction)
def _fmt_price(cfg: dict[str, Any], symbol: str, price: Any) -> str:
if price is None or price == "":
return ""
fn = cfg.get("format_price") or cfg.get("price_fmt")
if callable(fn):
try:
return str(fn(symbol, price))
except Exception:
pass
m = cfg.get("app_module")
pf = getattr(m, "format_price_for_symbol", None) if m else None
if callable(pf):
try:
return str(pf(symbol, price))
except Exception:
pass
try:
return str(round(float(price), 8))
except (TypeError, ValueError):
return str(price)
def _fmt_pnl(pnl: Any) -> str:
if pnl is None:
return ""
try:
v = float(pnl)
return f"{'+' if v > 0 else ''}{round(v, 2)} U"
except (TypeError, ValueError):
return str(pnl)
def notify_trend_plan_started(
    cfg: dict[str, Any],
    *,
    plan_id: int,
    symbol: str,
    direction: str,
    leverage: int,
    stop_loss: float,
    take_profit: float,
    add_upper: float,
    risk_percent: float,
    dca_legs: int,
    first_order_amount: float,
    avg_entry: Optional[float] = None,
    snapshot_usdt: Optional[float] = None,
) -> None:
    """Send the "trend pullback plan started" notification.

    Assembles a multi-line message (header, account, plan id, direction and
    leverage, stop-loss / take-profit, add zone and leg count, risk and first
    order amount, plus a fixed closing note) and passes it to ``_send``.

    Args:
        cfg: Configuration mapping forwarded to the formatting helpers and
            to ``_send``.
        plan_id: Plan identifier shown in the message.
        symbol: Symbol shown in the header; a falsy value renders as "".
        direction: Direction value rendered through ``_dir_text``.
        leverage: Leverage; a falsy value renders as 1.
        stop_loss: Stop-loss price, rendered through ``_fmt_price``.
        take_profit: Take-profit price, rendered through ``_fmt_price``.
        add_upper: Add-zone price, rendered through ``_fmt_price``.
        risk_percent: Risk value, rendered as given followed by ``%``.
        dca_legs: Number of add legs; a falsy value renders as 0.
        first_order_amount: First order amount, rendered as given.
        avg_entry: Optional first fill price; its line is added only when
            not ``None``.
        snapshot_usdt: Optional balance figure; its line is added only when
            not ``None`` and convertible to float (rounded to 2 decimals).
    """
    ticker = symbol or ""

    # Resolve the helper-derived values in the order they appear in the text.
    account = _account(cfg)
    side = _dir_text(cfg, direction)
    lev = int(leverage or 1)
    stop_text = _fmt_price(cfg, ticker, stop_loss)
    target_text = _fmt_price(cfg, ticker, take_profit)
    zone_text = _fmt_price(cfg, ticker, add_upper)
    legs = int(dca_legs or 0)

    report = [
        f"# 🚀 {ticker} 趋势回调计划已开始",
        f"**账户:{account}**",
        f"- 计划 ID**{plan_id}**",
        f"- 方向:{side}|杠杆 **{lev}x**",
        f"- 止损:{stop_text}|止盈:{target_text}",
        f"- 补仓区:{zone_text}|补仓档 **{legs}** 档",
        f"- 风险:**{risk_percent}%**|首仓张数:**{first_order_amount}**",
    ]

    if avg_entry is not None:
        report += [f"- 首仓成交价:{_fmt_price(cfg, ticker, avg_entry)}"]

    if snapshot_usdt is not None:
        try:
            available = round(float(snapshot_usdt), 2)
        except (TypeError, ValueError):
            # A non-numeric snapshot simply omits the balance line.
            pass
        else:
            report += [f"- 启动时合约可用:**{available} U**"]

    report += ["- 说明:交易所已挂止损;止盈由程序监控;结束/保本将另行推送"]
    _send(cfg, "\n".join(report))
def notify_trend_plan_ended(
    cfg: dict[str, Any],
    *,
    plan_id: int,
    symbol: str,
    direction: str,
    end_type: str,
    result_label: Optional[str] = None,
    exit_price: Optional[float] = None,
    pnl_amount: Optional[float] = None,
    extra: Optional[str] = None,
) -> None:
    """Send the "trend pullback plan ended" notification.

    Assembles a multi-line message (header, account, plan id, direction, end
    type and result, plus optional exit price, PnL and free-form line) and
    passes it to ``_send``.

    Args:
        cfg: Configuration mapping forwarded to the formatting helpers and
            to ``_send``.
        plan_id: Plan identifier shown in the message.
        symbol: Symbol shown in the header; a falsy value renders as "".
        direction: Direction value rendered through ``_dir_text``.
        end_type: How the plan ended; shown as given.
        result_label: Result text; when falsy, ``end_type`` is shown as the
            result instead. The chosen text is stripped.
        exit_price: Optional exit price; its line is added only when not
            ``None``.
        pnl_amount: Optional PnL; its line is added only when not ``None``.
        extra: Optional extra text appended as its own bullet when truthy.
    """
    ticker = symbol or ""
    outcome = (result_label or end_type or "").strip()
    account = _account(cfg)
    side = _dir_text(cfg, direction)

    report = [
        f"# 🏁 {ticker} 趋势回调计划已结束",
        f"**账户:{account}**",
        f"- 计划 ID**{plan_id}**",
        f"- 方向:{side}",
        f"- 结束方式:**{end_type}**",
        f"- 结果:**{outcome}**",
    ]

    # Optional rows are emitted only when the caller supplied a value.
    if exit_price is not None:
        report += [f"- 离场参考价:{_fmt_price(cfg, ticker, exit_price)}"]
    if pnl_amount is not None:
        report += [f"- 本单盈亏:**{_fmt_pnl(pnl_amount)}**"]
    if extra:
        report += [f"- {extra}"]

    _send(cfg, "\n".join(report))
def notify_roll_group_started(
    cfg: dict[str, Any],
    *,
    group_id: int,
    symbol: str,
    direction: str,
    order_monitor_id: int,
    initial_take_profit: Optional[float] = None,
    initial_stop_loss: Optional[float] = None,
) -> None:
    """Send the "roll plan started" notification.

    Assembles a multi-line message (header, account, group id with the bound
    order-monitor id, direction, initial take-profit, current stop-loss and
    a fixed closing note) and passes it to ``_send``.

    Args:
        cfg: Configuration mapping forwarded to the formatting helpers and
            to ``_send``.
        group_id: Roll group identifier shown in the message.
        symbol: Symbol shown in the header; a falsy value renders as "".
        direction: Direction value rendered through ``_dir_text``.
        order_monitor_id: Order-monitor identifier shown next to the group id.
        initial_take_profit: Optional take-profit price.
        initial_stop_loss: Optional stop-loss price.

    Both prices default to ``None``, for which ``_fmt_price`` returns an empty
    string. Instead of emitting a label with nothing after it, a "未设置"
    ("not set") placeholder is shown.
    """
    ticker = symbol or ""
    account = _account(cfg)
    side = _dir_text(cfg, direction)
    # An empty formatter result means "no price"; show a placeholder so the
    # line does not end in a dangling label.
    target_text = _fmt_price(cfg, ticker, initial_take_profit) or "未设置"
    stop_text = _fmt_price(cfg, ticker, initial_stop_loss) or "未设置"

    report = [
        f"# 🚀 {ticker} 滚仓计划已开始",
        f"**账户:{account}**",
        f"- 滚仓组 ID**{group_id}**|绑定下单监控 **#{order_monitor_id}**",
        f"- 方向:{side}",
        f"- 首仓止盈(锁定):{target_text}",
        f"- 当前止损:{stop_text}",
        "- 说明:顺势加仓为人工触发;组结束(无持仓/监控结案)将另行推送",
    ]
    _send(cfg, "\n".join(report))
def notify_roll_group_ended(
    cfg: dict[str, Any],
    *,
    group_id: int,
    symbol: str,
    direction: str,
    reason: str,
    leg_count: int = 0,
) -> None:
    """Send the "roll plan ended" notification.

    Assembles a multi-line message (header, account, group id, direction,
    end reason and completed leg count) and passes it to ``_send``.

    Args:
        cfg: Configuration mapping forwarded to the formatting helpers and
            to ``_send``.
        group_id: Roll group identifier shown in the message.
        symbol: Symbol shown in the header; a falsy value renders as "".
        direction: Direction value rendered through ``_dir_text``.
        reason: End reason, shown as given.
        leg_count: Number of completed legs; a falsy value renders as 0.
    """
    ticker = symbol or ""
    account = _account(cfg)
    side = _dir_text(cfg, direction)
    legs_done = int(leg_count or 0)

    text = "\n".join(
        (
            f"# 🏁 {ticker} 滚仓计划已结束",
            f"**账户:{account}**",
            f"- 滚仓组 ID**{group_id}**",
            f"- 方向:{side}",
            f"- 结束原因:**{reason}**",
            f"- 已完成滚仓腿数:**{legs_done}**",
        )
    )
    _send(cfg, text)