Files
crypto_monitor/wechat_notify_lib.py
T
2026-05-28 11:54:37 +08:00

118 lines
3.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""企业微信机器人 Webhook 推送(多实例共用)。"""
from __future__ import annotations
import re
from typing import Optional
import requests
def strip_markdown_for_text(content: str) -> str:
    """Return *content* with simple Markdown markup removed.

    Falsy input (``None``, ``""``) is treated as an empty string. The
    following markup is removed, in this order:

    * ``**bold**`` markers (the inner text is kept),
    * backtick code markers (the inner text is kept),
    * leading ``#`` heading markers at the start of a line,
    * lines that consist only of ``---``.

    The result has leading and trailing whitespace stripped.
    """
    # (pattern, replacement, flags) applied sequentially; order matters
    # because each substitution operates on the previous one's output.
    substitutions = (
        (r"\*\*([^*]+)\*\*", r"\1", 0),
        (r"`([^`]+)`", r"\1", 0),
        (r"^#+\s*", "", re.MULTILINE),
        (r"^---\s*$", "", re.MULTILINE),
    )
    text = str(content or "")
    for pattern, replacement, flags in substitutions:
        text = re.sub(pattern, replacement, text, flags=flags)
    return text.strip()
def looks_like_wechat_markdown(content: str) -> bool:
    """Report whether *content* appears to contain Markdown markup.

    Returns ``True`` when any line starts with one or more ``#`` followed
    by whitespace, or when the text contains ``**`` or a backtick.
    Empty or falsy input returns ``False``.
    """
    if not content:
        return False
    has_heading = re.search(r"^#+\s", content, re.MULTILINE) is not None
    return has_heading or "**" in content or "`" in content
def send_wechat_webhook(
    webhook_url: str,
    content: str,
    *,
    timeout: int = 10,
    prefix: str = "【加密货币】",
) -> bool:
    """POST a message to a webhook URL and report whether it was accepted.

    Args:
        webhook_url: Target URL. Blank URLs and URLs containing
            ``"replace-me"`` (presumably an unfilled config placeholder)
            are rejected without any network call.
        content: Message body; falsy values are treated as empty.
        timeout: Timeout passed to ``requests.post``.
        prefix: Text placed on its own line before the body. When the body
            is empty the prefix alone is sent; pass ``""`` to disable.

    Returns:
        ``True`` as soon as one attempt gets HTTP 200 and a JSON reply whose
        ``errcode`` is 0; ``False`` otherwise (including when nothing is
        sent because the URL or the message is empty).

    When the message looks like Markdown, a ``markdown`` payload is tried
    first and a ``text`` payload with the markup stripped is the fallback.
    Otherwise a single ``text`` payload is sent. Any exception raised while
    posting or decoding a reply is swallowed and the next payload is tried.
    """
    endpoint = (webhook_url or "").strip()
    if not endpoint or "replace-me" in endpoint:
        return False

    text = str(content or "").strip()
    if not prefix:
        message = text
    elif text:
        message = f"{prefix}\n{text}"
    else:
        message = prefix
    if not message.strip():
        return False

    # Build the ordered list of attempts: rich format first, plain fallback.
    if looks_like_wechat_markdown(message):
        attempts = [
            {"msgtype": "markdown", "markdown": {"content": message}},
            {
                "msgtype": "text",
                "text": {"content": strip_markdown_for_text(message)},
            },
        ]
    else:
        attempts = [{"msgtype": "text", "text": {"content": message}}]

    for payload in attempts:
        try:
            response = requests.post(endpoint, json=payload, timeout=timeout)
            if (
                response.status_code == 200
                and int(response.json().get("errcode", -1)) == 0
            ):
                return True
        except Exception:
            # Best-effort delivery: fall through to the next payload.
            continue
    return False
def wechat_direction_label(direction: str) -> str:
    """Map a direction keyword to its display label.

    The input is stripped and lower-cased before matching; ``None`` and
    other falsy values are treated as an empty string.

    Returns:
        ``"多头(long)"`` for ``"long"``, ``"空头(short)"`` for ``"short"``,
        and ``"双向(watch)"`` for anything else.
    """
    normalized = (direction or "").strip().lower()
    # The labels previously lacked the closing parenthesis, so the
    # unbalanced "(" leaked into the messages shown to users.
    if normalized == "long":
        return "多头(long)"
    if normalized == "short":
        return "空头(short)"
    return "双向(watch)"
def build_wechat_rs_level_message(
    *,
    symbol: str,
    monitor_type: str,
    account_label: str,
    trigger_time: str,
    upper_txt: str,
    lower_txt: str,
    close_txt: str,
    edge_txt: str,
    break_label: str,
    direction: str,
    notify_index: int,
    notify_max: int,
    interval_min: int,
    extra_note: Optional[str] = None,
) -> str:
    """Build the resistance/support breakout alert text.

    The message is emoji-decorated plain text (per the original note, the
    same style as the open/close position pushes) joined with newlines.

    Args:
        symbol: Instrument name shown in the title line.
        monitor_type: Value shown on the "类型" line.
        account_label: Value shown on the "账户" line.
        trigger_time: Value shown on the "触发时间" line.
        upper_txt: Pre-formatted upper edge of the range.
        lower_txt: Pre-formatted lower edge of the range.
        close_txt: Pre-formatted triggering close.
        edge_txt: Pre-formatted breakout price level.
        break_label: Text placed directly before the direction label.
        direction: Direction keyword; ``"long"`` selects the 📈 icon, any
            other value selects 📉. It is also passed to
            ``wechat_direction_label`` for the display label.
        notify_index: Ordinal of this notification, shown as
            ``notify_index/notify_max`` in the title.
        notify_max: Total number of notifications.
        interval_min: Interval between notifications, in minutes as
            displayed.
        extra_note: Optional extra bullet appended to the notes section.

    Returns:
        The assembled multi-line message.
    """
    is_long = (direction or "").strip().lower() == "long"
    icon = "📈" if is_long else "📉"
    direction_text = wechat_direction_label(direction)

    title = (
        # Closing ")" added: the counter used to open a parenthesis that
        # was never closed.
        f"{icon} {symbol} 关键位突破提醒({notify_index}/{notify_max})"
    )
    summary = [
        "🧾 突破概要",
        f"📌 类型:{monitor_type}",
        f"⏱ 触发时间:{trigger_time}",
        f"📊 上沿:{upper_txt}|下沿:{lower_txt}",
        f"💹 触发收盘:{close_txt}",
        f"🎯 {break_label}{direction_text}",
        f"📍 突破价位:{edge_txt}",
    ]
    notes = [
        "📎 说明",
        f"· 人工盯盘,共推送 {notify_max} 次(间隔约 {interval_min} 分钟)",
        "· 推送完毕后本条监控自动结案",
        "· 不参与自动开仓",
    ]
    if extra_note:
        notes.append(f"· {extra_note}")

    # Sections are separated by a blank line.
    lines = [title, f"💼 账户:{account_label}", "", *summary, "", *notes]
    return "\n".join(lines)