Files
2026-05-26 10:35:52 +08:00

184 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import logging
from typing import Any
import httpx
from .config import settings
from .http_client import httpx_client_kwargs
from .stats import compute_three_day_stats
logger = logging.getLogger(__name__)
# 企业微信 markdown 单条上限 4096,预留余量
WECOM_MD_MAX = 3800
def _format_period_label(period_start: str, period_end: str) -> str:
start = period_start[:16].replace("T", " ")
end = period_end[:16].replace("T", " ")
return f"{start} ~ {end}"
def _day_seg(label: str, row: dict | None) -> str:
if not row or row.get("rank") is None:
return f"{label}"
pct = row.get("price_change_pct_fmt") or f"{row.get('price_change_pct', 0):+.1f}%"
return f"{label}#{row['rank']}{pct}"
def _coin_line(rank: int, row: dict) -> str:
sym = row["symbol"]
y, t, b = row.get("yesterday"), row.get("today"), row.get("daybefore")
return (
f"**{rank}. {sym}** "
f"{_day_seg('', y)} {_day_seg('', t)} {_day_seg('', b)}"
)
def _build_header(period_label: str, count: int, part: int | None = None) -> list[str]:
title = "## 币安 U本位 · 三日Top30交集"
if part and part > 1:
title += f"{part}"
lines = [
title,
"",
f"> 昨日周期 {period_label}",
f"> 共 **{count}** 个 · 昨/今/前 = 排名+涨跌幅",
"",
]
return lines
def _split_messages(
period_label: str, count: int, items: list[dict]
) -> list[str]:
"""按企微长度限制拆成多条 markdown。"""
if not items:
body = "\n".join(_build_header(period_label, 0) + ["**暂无交集币种**"])
return [body]
messages: list[str] = []
part = 1
lines = _build_header(period_label, count, part)
for i, row in enumerate(items, 1):
coin = _coin_line(i, row)
extra = len(coin) + 1
if len("\n".join(lines)) + extra > WECOM_MD_MAX and len(lines) > 5:
messages.append("\n".join(lines).rstrip())
part += 1
lines = _build_header(period_label, count, part)
lines.append(coin)
if part == 1:
lines.append("")
lines.append("> 仅三日均为 Top30 交集,涨跌不限")
messages.append("\n".join(lines).rstrip())
return messages
def build_push_payload() -> dict[str, Any]:
"""构建企微推送:仅三日 Top30 交集,紧凑单行/币,超长自动分批。"""
stats = compute_three_day_stats()
periods = stats.get("periods") or {}
y_meta = periods.get("yesterday") or {}
period_label = ""
if y_meta.get("ready"):
period_label = _format_period_label(
y_meta.get("period_start", ""),
y_meta.get("period_end", ""),
)
if not stats.get("ok"):
md = "\n".join(
[
"## 币安 U本位 · 三日Top30交集",
"",
f"> 昨日周期 {period_label or ''}",
"",
f"**暂无法推送**{stats.get('message', '数据未就绪')}",
]
)
return {
"ok": False,
"message": stats.get("message", ""),
"count": 0,
"period_label": period_label,
"markdown": md,
"messages": [md],
"parts": 1,
"items": [],
}
items = stats.get("items") or []
messages = _split_messages(period_label, len(items), items)
preview_items: list[dict] = []
for i, row in enumerate(items, 1):
preview_items.append(
{
"rank": i,
"symbol": row["symbol"],
"today": row.get("today"),
"yesterday": row.get("yesterday"),
"daybefore": row.get("daybefore"),
"total_quote_volume": row.get("total_quote_volume"),
}
)
return {
"ok": True,
"message": stats.get("criteria", ""),
"count": len(items),
"period_label": period_label,
"markdown": messages[0] if messages else "",
"messages": messages,
"parts": len(messages),
"items": preview_items,
}
def build_markdown(snapshot: dict | None = None) -> str:
_ = snapshot
payload = build_push_payload()
return payload.get("markdown") or ""
async def send_wecom_markdown(content: str) -> tuple[bool, str]:
url = settings.wecom_webhook_url.strip()
if not url:
return False, "WECOM_WEBHOOK_URL 未配置"
if len(content) > 4096:
return False, f"内容过长({len(content)}字),请使用分批推送"
payload = {"msgtype": "markdown", "markdown": {"content": content}}
last_err = ""
for attempt in range(3):
try:
async with httpx.AsyncClient(
timeout=15.0, **httpx_client_kwargs("wecom")
) as client:
resp = await client.post(url, json=payload)
data = resp.json()
if data.get("errcode") == 0:
return True, "ok"
last_err = str(data)
except Exception as e:
last_err = str(e)
logger.warning("WeCom push attempt %d failed: %s", attempt + 1, e)
return False, last_err
async def send_push_payload(payload: dict) -> tuple[bool, str]:
"""发送推送,超长时按 messages 列表逐条发送。"""
parts = payload.get("messages") or [payload.get("markdown", "")]
if not parts or not parts[0]:
return False, "无推送内容"
for i, content in enumerate(parts, 1):
ok, msg = await send_wecom_markdown(content)
if not ok:
suffix = f"(第 {i}/{len(parts)} 条)" if len(parts) > 1 else ""
return False, f"{msg}{suffix}"
if i < len(parts):
logger.info("WeCom push part %d/%d sent", i, len(parts))
if len(parts) > 1:
return True, f"已分 {len(parts)} 条发送"
return True, "ok"