184 lines
6.0 KiB
Python
184 lines
6.0 KiB
Python
import logging
|
||
from typing import Any
|
||
|
||
import httpx
|
||
|
||
from .config import settings
|
||
from .http_client import httpx_client_kwargs
|
||
from .stats import compute_three_day_stats
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
||
|
||
# WeCom caps a single markdown message at 4096; leave some headroom.
|
||
WECOM_MD_MAX = 3800
|
||
|
||
|
||
def _format_period_label(period_start: str, period_end: str) -> str:
|
||
start = period_start[:16].replace("T", " ")
|
||
end = period_end[:16].replace("T", " ")
|
||
return f"{start} ~ {end}"
|
||
|
||
|
||
def _day_seg(label: str, row: dict | None) -> str:
|
||
if not row or row.get("rank") is None:
|
||
return f"{label}—"
|
||
pct = row.get("price_change_pct_fmt") or f"{row.get('price_change_pct', 0):+.1f}%"
|
||
return f"{label}#{row['rank']}{pct}"
|
||
|
||
|
||
def _coin_line(rank: int, row: dict) -> str:
    """Format one coin as a single compact markdown line.

    The line is the bold ``rank. symbol`` followed by three space-separated
    segments built by ``_day_seg`` from the ``yesterday``, ``today`` and
    ``daybefore`` entries of ``row`` (in that order).

    Raises:
        KeyError: if ``row`` has no ``"symbol"`` entry.
    """
    symbol = row["symbol"]
    days = (
        ("昨", row.get("yesterday")),
        ("今", row.get("today")),
        ("前", row.get("daybefore")),
    )
    segments = " ".join(_day_seg(tag, day_row) for tag, day_row in days)
    return f"**{rank}. {symbol}** {segments}"
|
||
|
||
|
||
def _build_header(period_label: str, count: int, part: int | None = None) -> list[str]:
|
||
title = "## 币安 U本位 · 三日Top30交集"
|
||
if part and part > 1:
|
||
title += f"({part})"
|
||
lines = [
|
||
title,
|
||
"",
|
||
f"> 昨日周期 {period_label}",
|
||
f"> 共 **{count}** 个 · 昨/今/前 = 排名+涨跌幅",
|
||
"",
|
||
]
|
||
return lines
|
||
|
||
|
||
def _split_messages(
    period_label: str, count: int, items: list[dict]
) -> list[str]:
    """Split the coin list into markdown messages that fit the WeCom limit.

    Sizes are measured in UTF-8 bytes, because that is the unit of WeCom's
    markdown limit; counting characters under-estimates CJK text (3 bytes
    per character) and could let an oversized chunk through.

    Args:
        period_label: Text for the header's period line.
        count: Total number of coins, shown in every header.
        items: Coin rows, rendered one per line by ``_coin_line``.

    Returns:
        One or more markdown strings. With no items, a single message with
        the header and a "no coins" notice. The explanatory footer is only
        appended when everything fits into a single message.
    """
    if not items:
        body = "\n".join(_build_header(period_label, 0) + ["**暂无交集币种**"])
        return [body]

    messages: list[str] = []
    part = 1
    lines = _build_header(period_label, count, part)
    header_len = len(lines)
    # Running byte size of "\n".join(lines); avoids re-joining per item.
    size = len("\n".join(lines).encode("utf-8"))
    for i, row in enumerate(items, 1):
        coin = _coin_line(i, row)
        extra = len(coin.encode("utf-8")) + 1  # +1 for the joining newline
        # Never flush a header-only chunk: an oversized single line still
        # has to go somewhere.
        if size + extra > WECOM_MD_MAX and len(lines) > header_len:
            messages.append("\n".join(lines).rstrip())
            part += 1
            lines = _build_header(period_label, count, part)
            size = len("\n".join(lines).encode("utf-8"))
        lines.append(coin)
        size += extra
    if part == 1:
        lines.append("")
        lines.append("> 仅三日均为 Top30 交集,涨跌不限")
    messages.append("\n".join(lines).rstrip())
    return messages
|
||
|
||
|
||
def build_push_payload() -> dict[str, Any]:
    """Build the WeCom push payload from ``compute_three_day_stats()``.

    Returns a dict with the keys ``ok``, ``message``, ``count``,
    ``period_label``, ``markdown`` (first message), ``messages`` (all
    messages), ``parts`` and ``items``. When the stats are not ``ok`` the
    payload carries a single "cannot push" notice and no items; otherwise
    the items are rendered one line per coin and split into as many
    messages as ``_split_messages`` produces.
    """
    stats = compute_three_day_stats()
    yesterday = (stats.get("periods") or {}).get("yesterday") or {}
    if yesterday.get("ready"):
        period_label = _format_period_label(
            yesterday.get("period_start", ""),
            yesterday.get("period_end", ""),
        )
    else:
        period_label = ""

    if not stats.get("ok"):
        # Stats unavailable: emit one explanatory message instead of a list.
        notice_lines = [
            "## 币安 U本位 · 三日Top30交集",
            "",
            f"> 昨日周期 {period_label or '—'}",
            "",
            f"**暂无法推送**:{stats.get('message', '数据未就绪')}",
        ]
        md = "\n".join(notice_lines)
        return {
            "ok": False,
            "message": stats.get("message", ""),
            "count": 0,
            "period_label": period_label,
            "markdown": md,
            "messages": [md],
            "parts": 1,
            "items": [],
        }

    items = stats.get("items") or []
    messages = _split_messages(period_label, len(items), items)

    # Trimmed copy of each row; "rank" is the 1-based position in `items`.
    preview_items: list[dict] = [
        {
            "rank": position,
            "symbol": entry["symbol"],
            "today": entry.get("today"),
            "yesterday": entry.get("yesterday"),
            "daybefore": entry.get("daybefore"),
            "total_quote_volume": entry.get("total_quote_volume"),
        }
        for position, entry in enumerate(items, 1)
    ]

    return {
        "ok": True,
        "message": stats.get("criteria", ""),
        "count": len(items),
        "period_label": period_label,
        "markdown": messages[0] if messages else "",
        "messages": messages,
        "parts": len(messages),
        "items": preview_items,
    }
|
||
|
||
|
||
def build_markdown(snapshot: dict | None = None) -> str:
    """Return the first markdown message of a freshly built push payload.

    ``snapshot`` is accepted but ignored; the payload always comes from
    ``build_push_payload()``. An absent or empty ``markdown`` entry yields
    an empty string.
    """
    del snapshot  # intentionally unused
    markdown = build_push_payload().get("markdown")
    return markdown if markdown else ""
|
||
|
||
|
||
async def send_wecom_markdown(content: str) -> tuple[bool, str]:
    """Post one markdown message to the configured WeCom webhook.

    Makes up to three attempts, each with a fresh ``httpx.AsyncClient``.
    An attempt succeeds when the JSON response has ``errcode == 0``.

    Args:
        content: Markdown text to send.

    Returns:
        ``(True, "ok")`` on success; otherwise ``(False, reason)`` where the
        reason is a configuration/size error or the last failure seen.
        Exceptions raised while sending are caught, logged and reported
        through the return value rather than propagated.
    """
    # Tolerate an unset (None) setting as well as a blank one.
    url = (settings.wecom_webhook_url or "").strip()
    if not url:
        return False, "WECOM_WEBHOOK_URL 未配置"
    # WeCom documents the markdown limit in UTF-8 bytes, not characters, so
    # measure the encoded size (CJK characters take 3 bytes each).
    size = len(content.encode("utf-8"))
    if size > 4096:
        return False, f"内容过长({size}字节),请使用分批推送"
    payload = {"msgtype": "markdown", "markdown": {"content": content}}
    last_err = ""
    for attempt in range(3):
        try:
            async with httpx.AsyncClient(
                timeout=15.0, **httpx_client_kwargs("wecom")
            ) as client:
                resp = await client.post(url, json=payload)
                data = resp.json()
                if data.get("errcode") == 0:
                    return True, "ok"
                last_err = str(data)
                # Log API-level rejections too, not only transport errors.
                logger.warning(
                    "WeCom push attempt %d rejected: %s", attempt + 1, last_err
                )
        except Exception as e:  # best-effort boundary: report, don't raise
            last_err = str(e)
            logger.warning("WeCom push attempt %d failed: %s", attempt + 1, e)
    return False, last_err
|
||
|
||
|
||
async def send_push_payload(payload: dict) -> tuple[bool, str]:
    """Send a push payload, one WeCom message per entry in ``messages``.

    Falls back to the single ``markdown`` entry when ``messages`` is missing
    or empty. Sending stops at the first failure; when there are several
    parts, the returned reason is suffixed with the failing part's position.

    Returns:
        ``(False, reason)`` when there is nothing to send or a part fails;
        ``(True, "ok")`` for a single message; ``(True, summary)`` when
        several parts were sent.
    """
    parts = payload.get("messages") or [payload.get("markdown", "")]
    if not (parts and parts[0]):
        return False, "无推送内容"

    total = len(parts)
    multi = total > 1
    for index, content in enumerate(parts, 1):
        ok, msg = await send_wecom_markdown(content)
        if not ok:
            suffix = f"(第 {index}/{total} 条)" if multi else ""
            return False, f"{msg}{suffix}"
        # Progress is logged for every part except the last one.
        if index < total:
            logger.info("WeCom push part %d/%d sent", index, total)

    if multi:
        return True, f"已分 {total} 条发送"
    return True, "ok"
|