"""Build and send the WeCom markdown push for the three-day top-volume intersection."""

import logging
from typing import Any

import httpx

from .config import settings
from .http_client import httpx_client_kwargs
from .stats import compute_three_day_stats

logger = logging.getLogger(__name__)

# Rendered wherever a value is unavailable.
_PLACEHOLDER = "—"
# Total number of webhook delivery attempts before giving up.
_SEND_ATTEMPTS = 3


def _format_period_label(period_start: str, period_end: str) -> str:
    """Return ``"<start> ~ <end>"`` for two timestamp strings.

    Each input is cut to its first 16 characters and any ``T`` is replaced
    by a space (presumably ISO-8601 input trimmed to minute precision;
    verify against the stats producer).
    """
    start = period_start[:16].replace("T", " ")
    end = period_end[:16].replace("T", " ")
    return f"{start} ~ {end}"


def _format_pct(value: Any) -> str:
    """Format a raw percentage as a signed two-decimal string, e.g. ``+1.23%``.

    ``None`` and other falsy values are treated as 0 (matching the default
    used when the key is missing). Values that cannot be converted to a
    float yield the placeholder instead of raising.
    """
    try:
        return f"{float(value or 0):+.2f}%"
    except (TypeError, ValueError):
        return _PLACEHOLDER


def _day_line(label: str, row: dict | None) -> str:
    """Render one quoted markdown line describing a single day's row.

    Args:
        label: Day label placed at the start of the line.
        row: The per-day mapping, or ``None`` when that day has no data.

    Returns:
        ``"> <label>:—"`` when ``row`` is empty or has no ``rank``; otherwise
        a line with rank, quote volume, price change and funding rate.
        Pre-formatted ``*_fmt`` values are preferred over raw ones.
    """
    if not row or row.get("rank") is None:
        return f"> {label}:{_PLACEHOLDER}"

    pct = row.get("price_change_pct_fmt") or _format_pct(row.get("price_change_pct"))

    vol = row.get("quote_volume_fmt")
    if not vol:
        raw_volume = row.get("quote_volume")
        # Avoid rendering the literal text "None" (or nothing) in the push.
        vol = _PLACEHOLDER if raw_volume is None else str(raw_volume)

    fr = row.get("funding_rate_fmt") or _PLACEHOLDER
    return f"> {label}:#{row['rank']} · 额 {vol} · 涨跌 {pct} · 费率 {fr}"


def build_push_payload() -> dict[str, Any]:
    """Build the WeCom push content for the three-day top-N intersection.

    The markdown is laid out as a list (one heading plus three quoted lines
    per symbol), not as a table.

    Returns:
        A dict with the keys ``ok``, ``message``, ``count``,
        ``period_label``, ``markdown`` and ``items``. When the stats report
        ``ok`` as falsy, ``ok`` is ``False``, ``items`` is empty and the
        markdown carries the stats message.
    """
    stats = compute_three_day_stats()
    yesterday_meta = (stats.get("periods") or {}).get("yesterday") or {}

    period_label = ""
    if yesterday_meta.get("ready"):
        # `or ""` guards against keys that are present but None.
        period_label = _format_period_label(
            yesterday_meta.get("period_start") or "",
            yesterday_meta.get("period_end") or "",
        )

    # Title and footer follow the configured N so they cannot drift from the
    # summary line below.
    top_n = settings.top_n
    title = f"## 币安 U本位 · 三日Top{top_n}交集"

    if not stats.get("ok"):
        markdown = "\n".join(
            [
                title,
                "",
                f"> 昨日周期 {period_label or _PLACEHOLDER}",
                "",
                f"**暂无法推送**:{stats.get('message', '数据未就绪')}",
            ]
        )
        return {
            "ok": False,
            "message": stats.get("message", ""),
            "count": 0,
            "period_label": period_label,
            "markdown": markdown,
            "items": [],
        }

    items = stats.get("items") or []
    lines = [
        title,
        "",
        "> **昨日周期**(北京时间 8:00 切日)",
        f"> {period_label or _PLACEHOLDER}",
        f"> 连续三日成交额均为 Top{top_n},共 **{len(items)}** 个",
        "",
    ]

    preview_items: list[dict] = []
    for index, row in enumerate(items, 1):
        symbol = row["symbol"]
        today = row.get("today")
        yesterday = row.get("yesterday")
        day_before = row.get("daybefore")

        lines.append(f"### {index}. {symbol}")
        # Yesterday is listed first: it is the period the header refers to.
        lines.append(_day_line("昨日", yesterday))
        lines.append(_day_line("今日", today))
        lines.append(_day_line("前日", day_before))
        lines.append("")

        preview_items.append(
            {
                "rank": index,
                "symbol": symbol,
                "today": today,
                "yesterday": yesterday,
                "daybefore": day_before,
                "total_quote_volume": row.get("total_quote_volume"),
            }
        )

    if not items:
        lines.append("**暂无交集币种**(请确认今日/昨日/前日快照均已生成)")

    lines.append("---")
    lines.append(f"> 说明:仅推送三日均为成交额 Top{top_n} 的合约;涨跌不限")

    return {
        "ok": True,
        "message": stats.get("criteria", ""),
        "count": len(items),
        "period_label": period_label,
        "markdown": "\n".join(lines),
        "items": preview_items,
    }


def build_markdown(snapshot: dict | None = None) -> str:
    """Return the WeCom markdown text (kept for backward compatibility).

    ``snapshot`` is accepted for old call sites but ignored; the content
    always comes from :func:`build_push_payload`.
    """
    _ = snapshot
    return build_push_payload()["markdown"]


async def send_wecom_markdown(content: str) -> tuple[bool, str]:
    """Post ``content`` as a markdown message to the configured WeCom webhook.

    Makes up to ``_SEND_ATTEMPTS`` attempts, each with a fresh HTTP client.

    Args:
        content: Markdown text to send.

    Returns:
        ``(True, "ok")`` once a response reports ``errcode == 0``; otherwise
        ``(False, <last error text>)``. Returns immediately with an error
        when no webhook URL is configured.
    """
    # NOTE(review): WeCom documents a size limit for markdown content; this
    # function sends `content` untruncated. Confirm callers stay under it.
    url = (settings.wecom_webhook_url or "").strip()
    if not url:
        return False, "WECOM_WEBHOOK_URL 未配置"

    payload = {"msgtype": "markdown", "markdown": {"content": content}}
    last_err = ""
    for attempt in range(1, _SEND_ATTEMPTS + 1):
        try:
            async with httpx.AsyncClient(
                timeout=15.0, **httpx_client_kwargs("wecom")
            ) as client:
                resp = await client.post(url, json=payload)
                data = resp.json()
        except Exception as exc:
            # Best-effort delivery: any transport or decoding failure counts
            # as a failed attempt. Some exceptions stringify to "", so fall
            # back to the class name to keep the message useful.
            last_err = str(exc) or type(exc).__name__
            logger.warning("WeCom push attempt %d failed: %s", attempt, last_err)
            continue

        if isinstance(data, dict) and data.get("errcode") == 0:
            return True, "ok"

        last_err = str(data)
        logger.warning("WeCom push attempt %d rejected: %s", attempt, last_err)

    return False, last_err