From ee621976dbd5020eaae5b0ba1f4fd86686b6b4e3 Mon Sep 17 00:00:00 2001 From: dekun Date: Fri, 22 May 2026 13:30:50 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=8E=92=E5=BA=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env.example | 2 + DEPLOY.md | 2 +- backend/app/aggregator.py | 81 ++++++++++++++++++++------ backend/app/binance.py | 118 +++++++++++++++++++++++++++----------- backend/app/config.py | 3 + backend/app/exceptions.py | 7 +++ backend/app/main.py | 10 ++-- backend/app/scheduler.py | 105 ++++++++++++++++++++++++++------- 8 files changed, 251 insertions(+), 77 deletions(-) create mode 100644 backend/app/exceptions.py diff --git a/.env.example b/.env.example index d5c58a9..b7e367e 100644 --- a/.env.example +++ b/.env.example @@ -18,3 +18,5 @@ MAX_CONCURRENCY=3 REQUEST_INTERVAL_SEC=0.15 BAN_COOLDOWN_SEC=90 CANDIDATE_POOL=150 +TODAY_DATA_MODE=ticker24h +YESTERDAY_DATA_MODE=klines diff --git a/DEPLOY.md b/DEPLOY.md index c3b6564..1ca4ce1 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -286,7 +286,7 @@ git pull | `cannot pull with rebase: unstaged changes` | 执行 `git stash` 后重试;或 `DEPLOY_SKIP_GIT_PULL=1 ./deploy/pm2-deploy.sh` 跳过拉取 | | `No module named pip` | 执行 `sudo apt install -y python3-venv` 后重新 `./deploy/pm2-deploy.sh`(脚本会用 .venv) | | Web 无数据 | 检查能否访问币安;国内服务器尝试 `PROXY_ENABLED=true` | -| 大量 `418 I'm a teapot` | 币安 IP 限流封禁;保持 `MAX_CONCURRENCY=3`,等待 2 分钟后 `pm2 restart`;或开代理 | +| 大量 `418 I'm a teapot` | IP 被封禁;**不要反复 restart**(会加长封禁)。等待日志中的 cooldown 秒数(如 734s)后再启动;今日刷新已改为 `TODAY_DATA_MODE=ticker24h`(仅 1 次 API) | | 企微收不到 | 检查 `WECOM_WEBHOOK_URL`;`curl -X POST .../api/push/test` | | 08:10 未推送 | 确认容器/PM2 在 08:10 前已运行;查日志 | | 端口占用 | `ss -tlnp \| grep 21450` 或改 `.env` 中 `PORT` | diff --git a/backend/app/aggregator.py b/backend/app/aggregator.py index 4e6c520..3a90fec 100644 --- a/backend/app/aggregator.py +++ b/backend/app/aggregator.py @@ -5,6 +5,7 @@ from datetime import datetime from .binance import binance_client from .config import settings +from .exceptions import BinanceRateLimitedError from .periods import to_ms logger = logging.getLogger(__name__) @@ -20,6 +21,7 @@ class SymbolStats: rank: int = 0 is_high_volume: bool = False is_high_change: bool = False + data_source: str = "klines" def to_dict(self) -> dict: d = asdict(self) @@ -36,6 +38,16 @@ def format_volume(vol: float) -> str: return f"{vol:.0f}" +def _finalize_top(stats: list[SymbolStats]) -> list[dict]: + stats.sort(key=lambda x: x.quote_volume, reverse=True) + top = stats[: settings.top_n] + for i, s in enumerate(top, 1): + s.rank = i + s.is_high_volume = s.quote_volume >= settings.volume_threshold + s.is_high_change = abs(s.price_change_pct) >= settings.change_threshold + return [s.to_dict() for s in top] + + def _aggregate_klines(klines: list, start_ms: int, end_ms: int) -> tuple[float, float, float]: quote_vol = 0.0 open_price = 0.0 @@ -53,6 +65,31 @@ def _aggregate_klines(klines: list, start_ms: int, end_ms: int) -> tuple[float, return quote_vol, open_price, last_price +async def aggregate_from_ticker24hr() -> list[dict]: + """仅 1 次 API 请求,使用滚动 24h 数据(今日刷新推荐)。""" + tickers = await binance_client.get_24hr_tickers() + stats: list[SymbolStats] = [] + for t in tickers: + sym = t.get("symbol", "") + if not sym.endswith("USDT"): + continue + vol = float(t.get("quoteVolume", 0) or 0) + if vol <= 0: + continue + stats.append( + SymbolStats( + symbol=sym, + quote_volume=vol, + price_change_pct=float(t.get("priceChangePercent", 0) or 0), + open_price=float(t.get("openPrice", 0) or 0), + last_price=float(t.get("lastPrice", 0) or 0), + data_source="ticker24h", + ) + ) + logger.info("ticker24h mode: %d symbols, 1 API call", len(stats)) + return _finalize_top(stats) + + async def _fetch_symbol_stats( symbol: str, start_ms: int, @@ -79,14 +116,16 @@ async def _fetch_symbol_stats( price_change_pct=pct, open_price=open_price, last_price=last_price, + data_source="klines", ) + except BinanceRateLimitedError: + raise except Exception as e: logger.warning("Failed %s: %s", symbol, e) return None async def _pick_candidate_symbols(symbols: list[str]) -> list[str]: - """用 24h ticker 成交额预筛,避免对全市场并发拉 K 线触发 418 封禁。""" try: tickers = await binance_client.get_24hr_tickers() vol_map = { @@ -97,18 +136,16 @@ async def _pick_candidate_symbols(symbols: list[str]) -> list[str]: ranked = sorted(symbols, key=lambda s: vol_map.get(s, 0.0), reverse=True) pool = min(settings.candidate_pool, len(ranked)) picked = ranked[:pool] - logger.info( - "Candidate pool: %d / %d symbols (by 24h quoteVolume)", - len(picked), - len(symbols), - ) + logger.info("Candidate pool: %d / %d symbols", len(picked), len(symbols)) return picked + except BinanceRateLimitedError: + raise except Exception as e: - logger.warning("24hr ticker prescreen failed, using full list: %s", e) - return symbols + logger.warning("24hr prescreen failed: %s", e) + return symbols[: settings.candidate_pool] -async def aggregate_period( +async def aggregate_period_klines( start: datetime, end: datetime, use_live_prices: bool = False, @@ -122,6 +159,8 @@ async def aggregate_period( if use_live_prices: try: prices = await binance_client.get_prices_batch(symbols) + except BinanceRateLimitedError: + raise except Exception as e: logger.warning("Batch prices failed: %s", e) @@ -130,7 +169,7 @@ async def aggregate_period( _fetch_symbol_stats(s, start_ms, end_ms, prices, sem) for s in candidates ] logger.info( - "Aggregating period %s ~ %s (%d symbols, concurrency=%d)", + "klines mode: %s ~ %s, %d symbols, concurrency=%d", start.isoformat(), end.isoformat(), len(candidates), @@ -138,21 +177,28 @@ async def aggregate_period( ) results = await asyncio.gather(*tasks) stats = [r for r in results if r is not None and r.quote_volume > 0] - stats.sort(key=lambda x: x.quote_volume, reverse=True) - top = stats[: settings.top_n] + return _finalize_top(stats) - for i, s in enumerate(top, 1): - s.rank = i - s.is_high_volume = s.quote_volume >= settings.volume_threshold - s.is_high_change = abs(s.price_change_pct) >= settings.change_threshold - return [s.to_dict() for s in top] +async def aggregate_period( + start: datetime, + end: datetime, + use_live_prices: bool = False, + mode: str | None = None, +) -> list[dict]: + mode = mode or ( + settings.today_data_mode if use_live_prices else settings.yesterday_data_mode + ) + if mode == "ticker24h": + return await aggregate_from_ticker24hr() + return await aggregate_period_klines(start, end, use_live_prices) def enrich_snapshot_meta( items: list[dict], period_start: datetime, period_end: datetime, + data_mode: str = "", ) -> dict: return { "period_start": period_start.isoformat(), @@ -161,5 +207,6 @@ def enrich_snapshot_meta( "top_n": settings.top_n, "volume_threshold": settings.volume_threshold, "change_threshold": settings.change_threshold, + "data_mode": data_mode, "items": items, } diff --git a/backend/app/binance.py b/backend/app/binance.py index 20416c2..4071929 100644 --- a/backend/app/binance.py +++ b/backend/app/binance.py @@ -1,17 +1,20 @@ import asyncio +import json import logging import time +from pathlib import Path from typing import Any import httpx -from .config import settings +from .config import ROOT_DIR, settings +from .exceptions import BinanceRateLimitedError from .http_client import httpx_client_kwargs logger = logging.getLogger(__name__) -# 418 = IP 被币安临时封禁(请求过快);429 = 触发频率限制 _RATE_LIMIT_CODES = {418, 429} +_SYMBOLS_CACHE_FILE = ROOT_DIR / "data" / "symbols_cache.json" class BinanceFuturesClient: @@ -23,6 +26,16 @@ class BinanceFuturesClient: self._last_request_at: float = 0.0 self._ban_until: float = 0.0 + def is_rate_limited(self) -> bool: + return time.monotonic() < self._ban_until + + def rate_limit_remaining_sec(self) -> int: + return max(0, int(self._ban_until - time.monotonic())) + + def _set_ban(self, retry_after: int) -> None: + retry_after = max(retry_after, settings.ban_cooldown_sec) + self._ban_until = max(self._ban_until, time.monotonic() + retry_after) + async def _ensure_client(self) -> httpx.AsyncClient: if self._client is None or self._client.is_closed: limits = httpx.Limits( @@ -42,12 +55,15 @@ class BinanceFuturesClient: self._client = None async def _throttle(self) -> None: + if self.is_rate_limited(): + remaining = self.rate_limit_remaining_sec() + raise BinanceRateLimitedError(remaining, "throttle") async with self._throttle_lock: now = time.monotonic() if now < self._ban_until: - wait = self._ban_until - now - logger.warning("Binance IP cooldown, sleeping %.0fs", wait) - await asyncio.sleep(wait) + raise BinanceRateLimitedError( + int(self._ban_until - now), "throttle" + ) gap = settings.request_interval_sec - (now - self._last_request_at) if gap > 0: await asyncio.sleep(gap) @@ -58,7 +74,11 @@ class BinanceFuturesClient: last_err: Exception | None = None for attempt in range(1, settings.max_retries + 1): - await self._throttle() + try: + await self._throttle() + except BinanceRateLimitedError: + raise + try: client = await self._ensure_client() resp = await client.get(url, params=params or {}) @@ -66,58 +86,88 @@ class BinanceFuturesClient: retry_after = int( resp.headers.get("Retry-After", settings.ban_cooldown_sec) ) - retry_after = max(retry_after, settings.ban_cooldown_sec) - self._ban_until = time.monotonic() + retry_after - logger.warning( - "Binance HTTP %s on %s, cooldown %ss (attempt %d/%d)", + self._set_ban(retry_after) + logger.error( + "Binance HTTP %s on %s — IP 封禁约 %ss,停止重试", resp.status_code, path, retry_after, - attempt, - settings.max_retries, ) - last_err = httpx.HTTPStatusError( - f"{resp.status_code}", - request=resp.request, - response=resp, - ) - await asyncio.sleep(retry_after) - continue + raise BinanceRateLimitedError(retry_after, path) resp.raise_for_status() return resp.json() + except BinanceRateLimitedError: + raise except httpx.HTTPStatusError as e: last_err = e if e.response.status_code in _RATE_LIMIT_CODES: - continue + raise raise except (httpx.ConnectError, httpx.ReadTimeout) as e: last_err = e - logger.warning("Binance request error %s (attempt %d)", path, attempt) + logger.warning("Binance network error %s (attempt %d)", path, attempt) await asyncio.sleep(min(2 * attempt, 10)) raise last_err or RuntimeError(f"Binance request failed: {path}") + def _load_symbols_file(self) -> list[str] | None: + try: + if _SYMBOLS_CACHE_FILE.exists(): + data = json.loads(_SYMBOLS_CACHE_FILE.read_text(encoding="utf-8")) + if isinstance(data, list) and data: + return sorted(data) + except Exception as e: + logger.warning("Load symbols cache file failed: %s", e) + return None + + def _save_symbols_file(self, symbols: list[str]) -> None: + try: + _SYMBOLS_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True) + _SYMBOLS_CACHE_FILE.write_text( + json.dumps(symbols, ensure_ascii=False), + encoding="utf-8", + ) + except Exception as e: + logger.warning("Save symbols cache file failed: %s", e) + async def get_usdt_perpetual_symbols(self) -> list[str]: if self._symbols_cache: return self._symbols_cache - info = await self._get("/fapi/v1/exchangeInfo") - symbols = [] - for s in info.get("symbols", []): - if ( - s.get("contractType") == "PERPETUAL" - and s.get("quoteAsset") == "USDT" - and s.get("status") == "TRADING" - ): - symbols.append(s["symbol"]) - self._symbols_cache = sorted(symbols) - logger.info("Loaded %d USDT perpetual symbols", len(self._symbols_cache)) - return self._symbols_cache + + if self.is_rate_limited(): + cached = self._load_symbols_file() + if cached: + self._symbols_cache = cached + logger.info("Using cached symbols file (%d)", len(cached)) + return cached + raise BinanceRateLimitedError(self.rate_limit_remaining_sec(), "symbols") + + try: + info = await self._get("/fapi/v1/exchangeInfo") + symbols = [] + for s in info.get("symbols", []): + if ( + s.get("contractType") == "PERPETUAL" + and s.get("quoteAsset") == "USDT" + and s.get("status") == "TRADING" + ): + symbols.append(s["symbol"]) + self._symbols_cache = sorted(symbols) + self._save_symbols_file(self._symbols_cache) + logger.info("Loaded %d USDT perpetual symbols", len(self._symbols_cache)) + return self._symbols_cache + except BinanceRateLimitedError: + cached = self._load_symbols_file() + if cached: + self._symbols_cache = cached + logger.info("Rate limited, using symbols file (%d)", len(cached)) + return cached + raise def clear_symbol_cache(self) -> None: self._symbols_cache = None async def get_24hr_tickers(self) -> list[dict]: - """单次请求获取全市场 24h 行情(用于缩小 K 线拉取范围)。""" data = await self._get("/fapi/v1/ticker/24hr") return data if isinstance(data, list) else [] diff --git a/backend/app/config.py b/backend/app/config.py index 8970375..9282736 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -26,6 +26,9 @@ class Settings(BaseSettings): ban_cooldown_sec: int = 90 max_retries: int = 5 candidate_pool: int = 150 + # today: ticker24h=仅1次API(滚动24h); yesterday: klines=按8:00切日精确统计 + today_data_mode: str = "ticker24h" + yesterday_data_mode: str = "klines" # 代理默认关闭;仅当 PROXY_ENABLED=true 时生效 proxy_enabled: bool = False proxy_url: str = "socks5h://192.168.8.4:1081" diff --git a/backend/app/exceptions.py b/backend/app/exceptions.py new file mode 100644 index 0000000..bde6bba --- /dev/null +++ b/backend/app/exceptions.py @@ -0,0 +1,7 @@ +class BinanceRateLimitedError(Exception): + """币安 418/429,IP 临时封禁。""" + + def __init__(self, retry_after_sec: int, path: str = ""): + self.retry_after_sec = retry_after_sec + self.path = path + super().__init__(f"rate limited {retry_after_sec}s on {path}") diff --git a/backend/app/main.py b/backend/app/main.py index ed78da4..d3b86fc 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -71,8 +71,9 @@ async def api_yesterday_top30(): } start, end = get_yesterday_period() try: - items = await aggregate_period(start, end) - return enrich_snapshot_meta(items, start, end) + mode = settings.yesterday_data_mode + items = await aggregate_period(start, end, mode=mode) + return enrich_snapshot_meta(items, start, end, data_mode=mode) except Exception as e: logger.error("api yesterday failed: %s", e) meta = enrich_snapshot_meta([], start, end) @@ -87,8 +88,9 @@ async def api_today_top30(): return cached start, end = get_today_period() try: - items = await aggregate_period(start, end, use_live_prices=True) - return enrich_snapshot_meta(items, start, end) + mode = settings.today_data_mode + items = await aggregate_period(start, end, use_live_prices=True, mode=mode) + return enrich_snapshot_meta(items, start, end, data_mode=mode) except Exception as e: logger.error("api today failed: %s", e) meta = enrich_snapshot_meta([], start, end) diff --git a/backend/app/scheduler.py b/backend/app/scheduler.py index 08063a7..6316b05 100644 --- a/backend/app/scheduler.py +++ b/backend/app/scheduler.py @@ -1,4 +1,3 @@ -import asyncio import logging from datetime import datetime @@ -9,8 +8,9 @@ from .aggregator import aggregate_period, enrich_snapshot_meta from .binance import binance_client from .config import settings from .db import get_latest_snapshot, init_db, log_push, save_snapshot, was_pushed_today +from .exceptions import BinanceRateLimitedError from .periods import get_today_period, get_yesterday_period, now_shanghai -from .state import set_today_cache +from .state import get_today_cache, set_today_cache from .wecom import build_markdown, send_wecom_markdown logger = logging.getLogger(__name__) @@ -18,32 +18,63 @@ logger = logging.getLogger(__name__) scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") +def _restore_today_from_db() -> bool: + snap = get_latest_snapshot("today") + if snap and snap.get("items"): + set_today_cache( + { + "period_start": snap["period_start"], + "period_end": snap["period_end"], + "updated_at": snap["created_at"], + "top_n": settings.top_n, + "volume_threshold": settings.volume_threshold, + "change_threshold": settings.change_threshold, + "data_mode": "cached", + "items": snap["items"], + } + ) + return True + return False + + async def job_finalize_yesterday() -> None: - """08:00 — compute and persist the closed yesterday period.""" logger.info("Job: finalize yesterday period") + if binance_client.is_rate_limited(): + logger.warning( + "Skip yesterday job — rate limited %ss", + binance_client.rate_limit_remaining_sec(), + ) + return try: binance_client.clear_symbol_cache() start, end = get_yesterday_period() - items = await aggregate_period(start, end, use_live_prices=False) + items = await aggregate_period( + start, end, use_live_prices=False, mode=settings.yesterday_data_mode + ) save_snapshot("yesterday", start, end, items) - logger.info("Yesterday snapshot saved: %s ~ %s, %d items", start, end, len(items)) + logger.info("Yesterday snapshot saved: %d items", len(items)) + except BinanceRateLimitedError as e: + logger.error("Finalize yesterday rate limited %ss", e.retry_after_sec) except Exception as e: logger.error("Finalize yesterday failed: %s", e) async def job_push_wecom() -> None: - """08:10 — push yesterday Top30 to WeCom.""" logger.info("Job: WeCom push") start, end = get_yesterday_period() snapshot = get_latest_snapshot("yesterday") - if not snapshot: - logger.info("No yesterday snapshot, computing now") - items = await aggregate_period(start, end, use_live_prices=False) - save_snapshot("yesterday", start, end, items) - snapshot = get_latest_snapshot("yesterday") + if not snapshot and not binance_client.is_rate_limited(): + try: + items = await aggregate_period( + start, end, use_live_prices=False, mode=settings.yesterday_data_mode + ) + save_snapshot("yesterday", start, end, items) + snapshot = get_latest_snapshot("yesterday") + except BinanceRateLimitedError as e: + logger.error("Push prep rate limited %ss", e.retry_after_sec) if not snapshot: - logger.error("Failed to get yesterday snapshot for push") + logger.error("No yesterday snapshot for push") return ps, pe = snapshot["period_start"], snapshot["period_end"] @@ -61,17 +92,33 @@ async def job_push_wecom() -> None: async def job_refresh_today() -> None: - """Refresh today period cache.""" - logger.info("Job: refresh today") + logger.info("Job: refresh today (mode=%s)", settings.today_data_mode) + if binance_client.is_rate_limited(): + sec = binance_client.rate_limit_remaining_sec() + logger.warning("Rate limited %ss — using DB/cache", sec) + if _restore_today_from_db(): + logger.info("Today restored from DB cache") + return try: start, end = get_today_period() - items = await aggregate_period(start, end, use_live_prices=True) - meta = enrich_snapshot_meta(items, start, end) + items = await aggregate_period( + start, + end, + use_live_prices=True, + mode=settings.today_data_mode, + ) + meta = enrich_snapshot_meta( + items, start, end, data_mode=settings.today_data_mode + ) save_snapshot("today", start, end, items) set_today_cache(meta) logger.info("Today cache refreshed: %d items", len(items)) + except BinanceRateLimitedError as e: + logger.error("Refresh today rate limited %ss — use cache", e.retry_after_sec) + _restore_today_from_db() except Exception as e: logger.error("Refresh today failed: %s", e) + _restore_today_from_db() async def startup_tasks() -> None: @@ -79,25 +126,36 @@ async def startup_tasks() -> None: now = now_shanghai() start_y, end_y = get_yesterday_period(now) + if binance_client.is_rate_limited(): + logger.warning( + "Startup: Binance rate limited ~%ss, skip API; use DB cache", + binance_client.rate_limit_remaining_sec(), + ) + _restore_today_from_db() + return + snap = get_latest_snapshot("yesterday") if not snap or snap.get("period_end") != end_y.isoformat(): try: logger.info("Startup: computing yesterday snapshot") - items = await aggregate_period(start_y, end_y, use_live_prices=False) + items = await aggregate_period( + start_y, end_y, use_live_prices=False, mode=settings.yesterday_data_mode + ) save_snapshot("yesterday", start_y, end_y, items) + except BinanceRateLimitedError as e: + logger.error("Startup yesterday rate limited %ss", e.retry_after_sec) except Exception as e: - logger.error("Startup yesterday snapshot failed (will retry on schedule): %s", e) + logger.error("Startup yesterday failed: %s", e) try: await job_refresh_today() except Exception as e: - logger.error("Startup today refresh failed (will retry on schedule): %s", e) + logger.error("Startup today refresh failed: %s", e) if now.hour > 8 or (now.hour == 8 and now.minute >= 10): ps, pe = start_y.isoformat(), end_y.isoformat() if not was_pushed_today(ps, pe) and settings.wecom_webhook_url.strip(): try: - logger.info("Startup: catch-up WeCom push") await job_push_wecom() except Exception as e: logger.error("Startup catch-up push failed: %s", e) @@ -124,7 +182,12 @@ def start_scheduler() -> None: ) if not scheduler.running: scheduler.start() - logger.info("Scheduler started (refresh every %d min)", settings.refresh_minutes) + logger.info( + "Scheduler started (today=%s, yesterday=%s, every %d min)", + settings.today_data_mode, + settings.yesterday_data_mode, + settings.refresh_minutes, + ) def stop_scheduler() -> None: