import logging
from typing import Any

import httpx

from .config import settings
from .http_client import httpx_client_kwargs
from .stats import compute_three_day_stats

logger = logging.getLogger(__name__)

# WeCom caps a single markdown message at 4096 UTF-8 bytes; split earlier
# than that to leave headroom for the footer appended to single-part messages.
WECOM_MD_MAX = 3800

# Hard limit enforced right before sending (UTF-8 bytes, not characters).
_WECOM_MD_HARD_LIMIT = 4096

# Title shared by the regular header and the "cannot push" notice.
_TITLE = "## 币安 U本位 · 三日Top30交集"


def _utf8_len(text: str) -> int:
    """Return the size of *text* in UTF-8 bytes.

    The WeCom limit is expressed in bytes, and every CJK character takes
    three bytes, so ``len(text)`` would under-count.
    """
    return len(text.encode("utf-8"))


def _format_period_label(period_start: str, period_end: str) -> str:
    """Return ``"<start> ~ <end>"`` built from two timestamp strings.

    Each input is truncated to its first 16 characters and any ``"T"`` is
    replaced by a space (for an ISO-8601 string this yields minute precision).
    """
    start = period_start[:16].replace("T", " ")
    end = period_end[:16].replace("T", " ")
    return f"{start} ~ {end}"


def _day_seg(label: str, row: dict | None) -> str:
    """Render one day's segment as ``<label>#<rank><pct>``.

    Returns ``<label>—`` when *row* is empty or carries no rank. The
    pre-formatted ``price_change_pct_fmt`` is preferred; otherwise the
    numeric ``price_change_pct`` is formatted with a sign and one decimal.
    """
    if not row or row.get("rank") is None:
        return f"{label}—"
    pct = row.get("price_change_pct_fmt")
    if not pct:
        # The key may be present with a None value; treat that as 0 rather
        # than letting the float format spec raise TypeError.
        change = row.get("price_change_pct") or 0
        pct = f"{change:+.1f}%"
    return f"{label}#{row['rank']}{pct}"


def _coin_line(rank: int, row: dict) -> str:
    """Render the single markdown line for one symbol.

    The line holds the overall rank, the symbol, and the yesterday / today /
    day-before segments, in that order.
    """
    sym = row["symbol"]
    y, t, b = row.get("yesterday"), row.get("today"), row.get("daybefore")
    return (
        f"**{rank}. {sym}** "
        f"{_day_seg('昨', y)} {_day_seg('今', t)} {_day_seg('前', b)}"
    )


def _build_header(
    period_label: str, count: int, part: int | None = None
) -> list[str]:
    """Return the five header lines of a message.

    The part number is appended to the title only for the second and later
    parts of a split push.
    """
    title = _TITLE
    if part and part > 1:
        title += f"({part})"
    return [
        title,
        "",
        f"> 昨日周期 {period_label}",
        f"> 共 **{count}** 个 · 昨/今/前 = 排名+涨跌幅",
        "",
    ]


def _split_messages(
    period_label: str, count: int, items: list[dict]
) -> list[str]:
    """Split the item list into markdown messages that fit the WeCom limit.

    Every message starts with its own header. Sizes are measured in UTF-8
    bytes against ``WECOM_MD_MAX``. The explanatory footer is added only
    when everything fits into a single message.
    """
    if not items:
        body = "\n".join(_build_header(period_label, 0) + ["**暂无交集币种**"])
        return [body]

    messages: list[str] = []
    part = 1
    lines = _build_header(period_label, count, part)
    header_line_count = len(lines)
    # Running byte size of "\n".join(lines), kept up to date incrementally.
    size = _utf8_len("\n".join(lines))

    for i, row in enumerate(items, 1):
        coin = _coin_line(i, row)
        extra = _utf8_len(coin) + 1  # +1 for the joining newline
        # Only flush when the current part already holds at least one coin,
        # so an oversized line can never produce a header-only message.
        if size + extra > WECOM_MD_MAX and len(lines) > header_line_count:
            messages.append("\n".join(lines).rstrip())
            part += 1
            lines = _build_header(period_label, count, part)
            size = _utf8_len("\n".join(lines))
        lines.append(coin)
        size += extra

    if part == 1:
        lines.append("")
        lines.append("> 仅三日均为 Top30 交集,涨跌不限")
    messages.append("\n".join(lines).rstrip())
    return messages


def build_push_payload() -> dict[str, Any]:
    """Build the WeCom push payload from the three-day statistics.

    Returns a dict with the keys ``ok``, ``message``, ``count``,
    ``period_label``, ``markdown`` (the first message), ``messages`` (all
    parts), ``parts`` and ``items`` (a preview list). When the statistics
    are not ``ok`` the payload carries a single notice message instead.
    """
    stats = compute_three_day_stats()
    periods = stats.get("periods") or {}
    y_meta = periods.get("yesterday") or {}

    period_label = ""
    if y_meta.get("ready"):
        period_label = _format_period_label(
            y_meta.get("period_start", ""),
            y_meta.get("period_end", ""),
        )

    if not stats.get("ok"):
        md = "\n".join(
            [
                _TITLE,
                "",
                f"> 昨日周期 {period_label or '—'}",
                "",
                f"**暂无法推送**:{stats.get('message', '数据未就绪')}",
            ]
        )
        return {
            "ok": False,
            "message": stats.get("message", ""),
            "count": 0,
            "period_label": period_label,
            "markdown": md,
            "messages": [md],
            "parts": 1,
            "items": [],
        }

    items = stats.get("items") or []
    messages = _split_messages(period_label, len(items), items)
    preview_items: list[dict] = [
        {
            "rank": i,
            "symbol": row["symbol"],
            "today": row.get("today"),
            "yesterday": row.get("yesterday"),
            "daybefore": row.get("daybefore"),
            "total_quote_volume": row.get("total_quote_volume"),
        }
        for i, row in enumerate(items, 1)
    ]
    return {
        "ok": True,
        "message": stats.get("criteria", ""),
        "count": len(items),
        "period_label": period_label,
        "markdown": messages[0] if messages else "",
        "messages": messages,
        "parts": len(messages),
        "items": preview_items,
    }


def build_markdown(snapshot: dict | None = None) -> str:
    """Return the first markdown message of the current push payload.

    *snapshot* is accepted but not used; the payload is always rebuilt via
    ``build_push_payload``.
    """
    _ = snapshot
    payload = build_push_payload()
    return payload.get("markdown") or ""


async def send_wecom_markdown(content: str) -> tuple[bool, str]:
    """Post one markdown message to the configured WeCom webhook.

    Returns ``(True, "ok")`` once the webhook answers with ``errcode == 0``.
    Otherwise returns ``(False, reason)`` after up to three attempts, where
    *reason* is the last response body or exception text. Content larger
    than 4096 UTF-8 bytes is rejected without any request being made.
    """
    url = settings.wecom_webhook_url.strip()
    if not url:
        return False, "WECOM_WEBHOOK_URL 未配置"

    # The webhook limit is in UTF-8 bytes, so measure the encoded size.
    size = _utf8_len(content)
    if size > _WECOM_MD_HARD_LIMIT:
        return False, f"内容过长({size}字节),请使用分批推送"

    payload = {"msgtype": "markdown", "markdown": {"content": content}}
    last_err = ""
    for attempt in range(3):
        try:
            async with httpx.AsyncClient(
                timeout=15.0, **httpx_client_kwargs("wecom")
            ) as client:
                resp = await client.post(url, json=payload)
                data = resp.json()
                if data.get("errcode") == 0:
                    return True, "ok"
                last_err = str(data)
        except Exception as e:
            # Best-effort delivery: any transport or decoding failure is
            # recorded and the next attempt is tried.
            last_err = str(e)
            logger.warning("WeCom push attempt %d failed: %s", attempt + 1, e)
    return False, last_err


async def send_push_payload(payload: dict) -> tuple[bool, str]:
    """Send every message of *payload* in order, stopping at the first failure.

    Uses ``payload["messages"]`` when present, otherwise falls back to the
    single ``payload["markdown"]`` string. On failure the returned reason
    names the failing part when there is more than one.
    """
    parts = payload.get("messages") or [payload.get("markdown", "")]
    if not parts or not parts[0]:
        return False, "无推送内容"

    for i, content in enumerate(parts, 1):
        ok, msg = await send_wecom_markdown(content)
        if not ok:
            suffix = f"(第 {i}/{len(parts)} 条)" if len(parts) > 1 else ""
            return False, f"{msg}{suffix}"
        if i < len(parts):
            logger.info("WeCom push part %d/%d sent", i, len(parts))

    if len(parts) > 1:
        return True, f"已分 {len(parts)} 条发送"
    return True, "ok"