Files
2026-05-22 22:15:46 +08:00

211 lines
9.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import httpx
from .config import WeComConfig
from .proxy_util import httpx_proxy_url
from .time_cn import format_beijing_wall, utc_now
class WeComNotifier:
    """Render plain-text notifications and post them to the configured webhook.

    Each public ``send_*`` coroutine builds a message string and posts it as a
    ``{"msgtype": "text"}`` JSON payload to ``conf.webhook``, attaching
    ``conf.mentioned_mobile_list``.  Message rendering lives in pure static
    helpers (``_format_*``) so the text can be checked without any network
    access; the single HTTP call lives in ``_post_text``.
    """

    # (signal label, direction label, move label) keyed by signal side.
    # Any side other than LONG/SHORT uses _SIDE_UNDECIDED.
    _SIDE_LABELS = {
        "LONG": ("多头突破", "做多突破", "放量上破"),
        "SHORT": ("空头破位", "做空破位", "放量下破"),
    }
    _SIDE_UNDECIDED = ("方向未定", "方向未定", "等待确认")

    def __init__(self, conf: WeComConfig, proxy_url: str | None = None) -> None:
        """Store the config and resolve the optional proxy.

        Args:
            conf: Supplies ``webhook`` and ``mentioned_mobile_list``.
            proxy_url: Optional proxy URL.  ``None``, empty and
                whitespace-only values all mean "no explicit proxy".
        """
        self.conf = conf
        # Normalise once: anything falsy or blank becomes None before it is
        # handed to the project's proxy helper.
        cleaned = str(proxy_url).strip() if proxy_url else ""
        self._proxy = httpx_proxy_url(cleaned or None)
        # 8 s default for every phase, with the read phase raised to 10 s.
        self.timeout = httpx.Timeout(8.0, read=10.0)

    def _client_kwargs(self) -> dict:
        """Return keyword arguments for ``httpx.AsyncClient``.

        With an explicit proxy, ``trust_env`` is disabled so only that proxy
        is used; without one, ``trust_env`` is left on.
        """
        if self._proxy:
            return {"timeout": self.timeout, "proxy": self._proxy, "trust_env": False}
        return {"timeout": self.timeout, "trust_env": True}

    async def _post_text(self, content: str) -> None:
        """Post ``content`` as a text message to ``conf.webhook``.

        Raises:
            httpx.HTTPStatusError: If the webhook answers with a 4xx/5xx
                status (via ``raise_for_status``).  Transport errors raised
                by httpx propagate unchanged.
        """
        payload = {
            "msgtype": "text",
            "text": {
                "content": content,
                "mentioned_mobile_list": self.conf.mentioned_mobile_list,
            },
        }
        async with httpx.AsyncClient(**self._client_kwargs()) as client:
            resp = await client.post(self.conf.webhook, json=payload)
            # NOTE(review): this only detects HTTP-level failures.  If the
            # webhook reports errors inside a 200 response body, they are not
            # noticed here - confirm against the webhook's API documentation.
            resp.raise_for_status()

    @staticmethod
    def _fmt_price(value: float) -> str:
        """Format a price with up to 8 decimals, trimming trailing zeros."""
        text = f"{value:.8f}".rstrip("0").rstrip(".")
        return text or "0"

    @staticmethod
    def _fmt_field(value: object, default: str = "") -> str:
        """Format one value from the programmatic summary for display.

        ``None`` yields ``default``; booleans render as 是/否; numbers use up
        to 6 decimals with trailing zeros trimmed; anything else is ``str()``.
        """
        if value is None:
            return default
        # bool must be tested before int because bool is a subclass of int.
        if isinstance(value, bool):
            return "是" if value else "否"
        if isinstance(value, (int, float)):
            return f"{float(value):.6f}".rstrip("0").rstrip(".") or "0"
        return str(value)

    @staticmethod
    def _format_breakout_alert(symbol: str, bar: str, metrics: dict, t_cn: str) -> str:
        """Build the breakout-alert message text.

        Args:
            symbol: Base symbol; stripped and upper-cased for display.
            bar: Candle interval; ``"5m"`` is shown as 5分钟, anything else
                is shown verbatim.
            metrics: Values read with ``.get``; missing or falsy entries fall
                back to defaults (0, "横盘", "NONE", 8 hours, 0.2 % stop).
            t_cn: Pre-formatted trigger time inserted into the last line.

        Returns:
            The complete message text.
        """

        def num(key: str, fallback: float = 0.0) -> float:
            # Missing, None, 0 and "" all collapse to the fallback.
            return float(metrics.get(key) or fallback)

        def count(key: str) -> int:
            return int(metrics.get(key) or 0)

        px = WeComNotifier._fmt_price

        pair_line = f"{symbol.strip().upper()}-USDT 永续"
        bar_cn = "5分钟" if bar == "5m" else f"{bar}"
        range_h = num("range_hours", 8)
        range_pct = num("range_pct")
        vol_ratio = num("volume_ratio")
        range_high = num("range_high")
        range_low = num("range_low")
        confirm_close = float(metrics.get("confirm_close") or metrics.get("last_close") or 0.0)
        breakout_high = num("breakout_high")
        breakout_low = num("breakout_low")
        est_vol = num("est_quote_vol_24h_usdt")
        rank = count("est_quote_vol_rank")
        rank_total = count("est_quote_vol_rank_total")
        btc_env = str(metrics.get("btc_env_8h_15m") or metrics.get("btc_8h_status") or "横盘")
        symbol_4h = str(metrics.get("symbol_4h_status") or "横盘")

        signal_side = str(metrics.get("signal_side") or "NONE")
        is_long = signal_side == "LONG"
        signal_cn, dir_line, move_line = WeComNotifier._SIDE_LABELS.get(
            signal_side, WeComNotifier._SIDE_UNDECIDED
        )

        state_line = f"{range_h:g}小时横盘箱体 {move_line}"
        vol24_line = f"{est_vol:,.0f} USDT" if est_vol > 0 else "未知"
        rank_line = f"#{rank} / {rank_total}" if rank > 0 and rank_total > 0 else "未知"

        # Stop buffer is a percentage clamped to [0, 10].
        stop_pct = max(0.0, min(num("stop_buffer_pct", 0.2), 10.0))
        if abs(stop_pct - round(stop_pct)) < 1e-9:
            stop_pct_label = str(int(round(stop_pct)))
        else:
            stop_pct_label = f"{stop_pct:.4f}".rstrip("0").rstrip(".") or "0"

        box_size = range_high - range_low
        if is_long:
            edge_label = "箱体下沿"
            key_ref = px(range_low)
            stop_sl = px(breakout_low * (1.0 - stop_pct / 100.0))
            tp_sl = px(confirm_close + box_size)
        else:
            # SHORT and undecided signals share the upper-edge reference.
            edge_label = "箱体上沿"
            key_ref = px(range_high)
            stop_sl = px(breakout_high * (1.0 + stop_pct / 100.0))
            tp_sl = px(confirm_close - box_size)

        # Every label is separated from its value and every parenthesis is
        # closed, so adjacent fields cannot run together in the rendered text.
        return (
            "🚨 Gate 扫描突破参考(自动箱体)\n"
            "━━━━━━━━━━━━━━\n"
            f"🔹 交易对:{pair_line}\n"
            f"⏱️ K线周期:{bar_cn}\n"
            f"📊 行情状态:{state_line}\n"
            f"🧭 信号方向:{dir_line}\n"
            "✅ 确认条件:\n"
            f" 1. 震荡幅度:{range_pct:.2f}%\n"
            f" 2. 成交量放大:{vol_ratio:.2f}\n"
            f" 3. BTC 近8小时(15m):{btc_env}(横盘多空均可推送;涨→仅LONG;跌→仅SHORT)\n"
            f" 4. 日成交量:{vol24_line}\n"
            f" 5. 当日成交量排名:{rank_line}\n"
            f" 6. 本币种4h状态:{symbol_4h}(仅同向推送)\n"
            "📌 关键价位:\n"
            f" {edge_label}:{key_ref}\n"
            f" 确认K收盘价:{px(confirm_close)}\n"
            "💡 参考计划(非关键位录入):\n"
            f" · 止盈:{signal_cn} 箱体1.0倍({tp_sl})\n"
            f" · 止损:突破K极值外 {stop_pct_label}%({stop_sl})\n"
            " · 正式下单请以「关键位突破监控」录入上下沿后的方案为准。\n"
            f"⏰ 触发时间:{t_cn}(北京时间 UTC+8)"
        )

    @staticmethod
    def _format_funnel_priority(
        symbol: str,
        inst_id: str,
        composite_score: float,
        gemma: dict,
        programmatic: dict,
        t_cn: str,
    ) -> str:
        """Build the funnel-priority message text.

        Args:
            symbol: Base symbol; stripped and upper-cased for display.
            inst_id: Contract identifier shown verbatim.
            composite_score: Shown with two decimals.
            gemma: Read with ``.get``; absent entries show as ``?``.
            programmatic: Read with ``.get``; absent entries show as empty.
            t_cn: Pre-formatted trigger time inserted into the last line.

        Returns:
            The complete message text.
        """
        fmt = WeComNotifier._fmt_field
        pair_line = f"{symbol.strip().upper()}-USDT 永续"
        pri = gemma.get("priority", "?")
        one = str(gemma.get("one_liner", "") or "").strip()

        vol24 = programmatic.get("est_quote_vol_24h_usdt")
        vol24_s = f"{float(vol24):,.0f}" if isinstance(vol24, (int, float)) else str(vol24 or "")

        def field(key: str) -> str:
            return fmt(programmatic.get(key))

        prog_block = "\n".join(
            (
                f" · 现价:{field('last_close')}",
                f" · 24h 估算成交额 USDT:{vol24_s}",
                f" · 60日区间高 / 低:{field('range_60d_high')} / {field('range_60d_low')}",
                f" · 区间振幅%(回看):{field('range_pct_lookback')}",
                f" · 距区间上沿空间%:{field('upside_to_range_high_pct')}",
                f" · 结构提示:{field('structure_hint')}",
                f" · SMA20:{field('sma20')}",
            )
        )

        return (
            "🎯 MATRIX · 漏斗优先推送\n"
            "━━━━━━━━━━━━━━\n"
            f"🔹 交易对:{pair_line}\n"
            f"🔗 合约 ID:{inst_id}\n"
            f"📈 合成评分:{composite_score:.2f}\n"
            "🧩 Gemma 分项:\n"
            f" 优先级 P{pri}|结构 {gemma.get('daily_structure', '?')}|量 {gemma.get('volume_view', '?')}|"
            f"上方 {gemma.get('upside_space', '?')}|中间阻力 {gemma.get('mid_resistance', '?')}\n"
            "💬 一句话:\n"
            f" {one}\n"
            "📌 程序化摘录:\n"
            f"{prog_block}\n"
            "💡 操作提示:\n"
            "仅结构信号,严格执行交易纪律+仓位管理\n"
            f"⏰ 触发时间:{t_cn}(北京时间 UTC+8)"
        )

    @staticmethod
    def _format_daily_report(report: dict, generated_at: str) -> str:
        """Build the daily-report message text.

        Args:
            report: Read with ``.get``; the ``text``, ``btc`` and ``stats``
                sub-dicts default to empty.  At most the first three
                ``risk_points`` are listed.
            generated_at: Pre-formatted generation time for the last line.

        Returns:
            The complete message text.
        """
        text = report.get("text") or {}
        btc = report.get("btc") or {}
        stats = report.get("stats") or {}
        risk_lines = text.get("risk_points") or []
        if risk_lines:
            risk_block = "\n".join(f" - {item}" for item in risk_lines[:3])
        else:
            risk_block = " - 暂无"
        ai_line = "是" if report.get("ai_used") else "否(规则回退)"

        return (
            "🗞️ MATRIX 每日晨报\n"
            "━━━━━━━━━━━━━━\n"
            f"📅 复盘日期:{report.get('report_day_cn', '')}\n"
            f"🤖 AI 生成:{ai_line}\n"
            f"📈 BTC 方向:{btc.get('direction', '')} | 日涨跌 {btc.get('day_change_pct', '')}%\n"
            f"🧭 方向说明:{text.get('btc_explain', '')}\n"
            f"📊 昨日统计:WATCH {stats.get('watch_count', 0)} / TRIGGER {stats.get('trigger_count', 0)}"
            f" / 漏斗优先 {stats.get('funnel_push_count', 0)}\n"
            f"📝 总结:{text.get('summary', '')}\n"
            f"⚠️ 风险点:\n{risk_block}\n"
            f"🎯 执行提示:{text.get('action_hint', '')}\n"
            f"⏰ 生成时间:{generated_at}(北京时间 UTC+8)"
        )

    async def send_breakout_alert(
        self,
        symbol: str,
        bar: str,
        inst_id: str,
        trigger_types: list[str],
        metrics: dict,
    ) -> None:
        """Render and send a breakout alert.

        ``inst_id`` and ``trigger_types`` are accepted for interface
        compatibility; the message text does not use them.

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx webhook response.
        """
        content = self._format_breakout_alert(symbol, bar, metrics, format_beijing_wall(utc_now()))
        await self._post_text(content)

    async def send_text(self, content: str) -> None:
        """Send ``content`` verbatim as a text message.

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx webhook response.
        """
        await self._post_text(content)

    async def send_funnel_priority(
        self,
        symbol: str,
        inst_id: str,
        composite_score: float,
        gemma: dict,
        programmatic: dict,
    ) -> None:
        """Render and send a funnel-priority notification.

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx webhook response.
        """
        content = self._format_funnel_priority(
            symbol,
            inst_id,
            composite_score,
            gemma,
            programmatic,
            format_beijing_wall(utc_now()),
        )
        await self._post_text(content)

    async def send_daily_report(self, report: dict) -> None:
        """Render and send the daily report.

        The generation time comes from ``report["generated_at_cn"]``; when
        that entry is missing or empty the current time is formatted instead.

        Raises:
            httpx.HTTPStatusError: On a 4xx/5xx webhook response.
        """
        generated_at = report.get("generated_at_cn") or format_beijing_wall(utc_now())
        await self._post_text(self._format_daily_report(report, generated_at))