diff --git a/.env.example b/.env.example index 5933367..d5c58a9 100644 --- a/.env.example +++ b/.env.example @@ -12,3 +12,9 @@ PROXY_ENABLED=false PROXY_URL=socks5h://192.168.8.4:1081 # 代理范围:binance=仅币安 | wecom=仅企微 | all=全部外网请求 PROXY_FOR=binance + +# 币安 API 限速(418=IP 临时封禁,请勿把 MAX_CONCURRENCY 调太大) +MAX_CONCURRENCY=3 +REQUEST_INTERVAL_SEC=0.15 +BAN_COOLDOWN_SEC=90 +CANDIDATE_POOL=150 diff --git a/DEPLOY.md b/DEPLOY.md index 1e1a842..c3b6564 100644 --- a/DEPLOY.md +++ b/DEPLOY.md @@ -286,6 +286,7 @@ git pull | `cannot pull with rebase: unstaged changes` | 执行 `git stash` 后重试;或 `DEPLOY_SKIP_GIT_PULL=1 ./deploy/pm2-deploy.sh` 跳过拉取 | | `No module named pip` | 执行 `sudo apt install -y python3-venv` 后重新 `./deploy/pm2-deploy.sh`(脚本会用 .venv) | | Web 无数据 | 检查能否访问币安;国内服务器尝试 `PROXY_ENABLED=true` | +| 大量 `418 I'm a teapot` | 币安 IP 限流封禁;保持 `MAX_CONCURRENCY=3`,等待 2 分钟后 `pm2 restart`;或开代理 | | 企微收不到 | 检查 `WECOM_WEBHOOK_URL`;`curl -X POST .../api/push/test` | | 08:10 未推送 | 确认容器/PM2 在 08:10 前已运行;查日志 | | 端口占用 | `ss -tlnp \| grep 21450` 或改 `.env` 中 `PORT` | diff --git a/README.md b/README.md index 8c35a08..c9622e7 100644 --- a/README.md +++ b/README.md @@ -50,6 +50,8 @@ python run.py | `PROXY_ENABLED` | 是否启用 SOCKS5 代理 | false | | `PROXY_URL` | 代理地址 | socks5h://192.168.8.4:1081 | | `PROXY_FOR` | 代理范围 binance/wecom/all | binance | +| `MAX_CONCURRENCY` | 币安 K 线并发数(过大易 418 封禁) | 3 | +| `CANDIDATE_POOL` | 预筛候选合约数(按 24h 成交额) | 150 | ## API | 方法 | 路径 | 说明 | diff --git a/backend/app/aggregator.py b/backend/app/aggregator.py index d6d4aa8..4e6c520 100644 --- a/backend/app/aggregator.py +++ b/backend/app/aggregator.py @@ -85,12 +85,36 @@ async def _fetch_symbol_stats( return None +async def _pick_candidate_symbols(symbols: list[str]) -> list[str]: + """用 24h ticker 成交额预筛,避免对全市场并发拉 K 线触发 418 封禁。""" + try: + tickers = await binance_client.get_24hr_tickers() + vol_map = { + t["symbol"]: float(t.get("quoteVolume", 0) or 0) + for t in tickers + if t.get("symbol") in symbols + } + ranked = sorted(symbols, key=lambda s: vol_map.get(s, 0.0), reverse=True) + pool = min(settings.candidate_pool, len(ranked)) + picked = ranked[:pool] + logger.info( + "Candidate pool: %d / %d symbols (by 24h quoteVolume)", + len(picked), + len(symbols), + ) + return picked + except Exception as e: + logger.warning("24hr ticker prescreen failed, using full list: %s", e) + return symbols + + async def aggregate_period( start: datetime, end: datetime, use_live_prices: bool = False, ) -> list[dict]: symbols = await binance_client.get_usdt_perpetual_symbols() + candidates = await _pick_candidate_symbols(symbols) start_ms = to_ms(start) end_ms = to_ms(end) @@ -103,8 +127,15 @@ async def aggregate_period( sem = asyncio.Semaphore(settings.max_concurrency) tasks = [ - _fetch_symbol_stats(s, start_ms, end_ms, prices, sem) for s in symbols + _fetch_symbol_stats(s, start_ms, end_ms, prices, sem) for s in candidates ] + logger.info( + "Aggregating period %s ~ %s (%d symbols, concurrency=%d)", + start.isoformat(), + end.isoformat(), + len(candidates), + settings.max_concurrency, + ) results = await asyncio.gather(*tasks) stats = [r for r in results if r is not None and r.quote_volume > 0] stats.sort(key=lambda x: x.quote_volume, reverse=True) diff --git a/backend/app/binance.py b/backend/app/binance.py index c0d2c80..20416c2 100644 --- a/backend/app/binance.py +++ b/backend/app/binance.py @@ -1,5 +1,6 @@ import asyncio import logging +import time from typing import Any import httpx @@ -9,20 +10,92 @@ from .http_client import httpx_client_kwargs logger = logging.getLogger(__name__) +# 418 = IP 被币安临时封禁(请求过快);429 = 触发频率限制 +_RATE_LIMIT_CODES = {418, 429} + class BinanceFuturesClient: def __init__(self) -> None: self.base = settings.binance_fapi_base.rstrip("/") self._symbols_cache: list[str] | None = None + self._client: httpx.AsyncClient | None = None + self._throttle_lock = asyncio.Lock() + self._last_request_at: float = 0.0 + self._ban_until: float = 0.0 + + async def _ensure_client(self) -> httpx.AsyncClient: + if self._client is None or self._client.is_closed: + limits = httpx.Limits( + max_connections=max(2, settings.max_concurrency), + max_keepalive_connections=max(2, settings.max_concurrency), + ) + self._client = httpx.AsyncClient( + timeout=30.0, + limits=limits, + **httpx_client_kwargs("binance"), + ) + return self._client + + async def close(self) -> None: + if self._client and not self._client.is_closed: + await self._client.aclose() + self._client = None + + async def _throttle(self) -> None: + async with self._throttle_lock: + now = time.monotonic() + if now < self._ban_until: + wait = self._ban_until - now + logger.warning("Binance IP cooldown, sleeping %.0fs", wait) + await asyncio.sleep(wait) + gap = settings.request_interval_sec - (now - self._last_request_at) + if gap > 0: + await asyncio.sleep(gap) + self._last_request_at = time.monotonic() async def _get(self, path: str, params: dict | None = None) -> Any: url = f"{self.base}{path}" - async with httpx.AsyncClient( - timeout=30.0, **httpx_client_kwargs("binance") - ) as client: - resp = await client.get(url, params=params or {}) - resp.raise_for_status() - return resp.json() + last_err: Exception | None = None + + for attempt in range(1, settings.max_retries + 1): + await self._throttle() + try: + client = await self._ensure_client() + resp = await client.get(url, params=params or {}) + if resp.status_code in _RATE_LIMIT_CODES: + retry_after = int( + resp.headers.get("Retry-After", settings.ban_cooldown_sec) + ) + retry_after = max(retry_after, settings.ban_cooldown_sec) + self._ban_until = time.monotonic() + retry_after + logger.warning( + "Binance HTTP %s on %s, cooldown %ss (attempt %d/%d)", + resp.status_code, + path, + retry_after, + attempt, + settings.max_retries, + ) + last_err = httpx.HTTPStatusError( + f"{resp.status_code}", + request=resp.request, + response=resp, + ) + await asyncio.sleep(retry_after) + continue + resp.raise_for_status() + return resp.json() + except httpx.HTTPStatusError as e: + last_err = e + if e.response.status_code in _RATE_LIMIT_CODES: + continue + raise + except (httpx.ConnectError, httpx.ReadTimeout) as e: + last_err = e + logger.warning("Binance request error %s (attempt %d)", path, attempt) + await asyncio.sleep(min(2 * attempt, 10)) + + raise last_err or RuntimeError(f"Binance request failed: {path}") async def get_usdt_perpetual_symbols(self) -> list[str]: if self._symbols_cache: @@ -43,6 +116,11 @@ class BinanceFuturesClient: def clear_symbol_cache(self) -> None: self._symbols_cache = None + async def get_24hr_tickers(self) -> list[dict]: + """单次请求获取全市场 24h 行情(用于缩小 K 线拉取范围)。""" + data = await self._get("/fapi/v1/ticker/24hr") + return data if isinstance(data, list) else [] + async def get_klines( self, symbol: str, diff --git a/backend/app/config.py b/backend/app/config.py index aee4299..8970375 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -21,7 +21,11 @@ class Settings(BaseSettings): host: str = "127.0.0.1" port: int = 21450 db_path: str = str(ROOT_DIR / "data" / "monitor.db") - max_concurrency: int = 20 + max_concurrency: int = 3 + request_interval_sec: float = 0.15 + ban_cooldown_sec: int = 90 + max_retries: int = 5 + candidate_pool: int = 150 # 代理默认关闭;仅当 PROXY_ENABLED=true 时生效 proxy_enabled: bool = False proxy_url: str = "socks5h://192.168.8.4:1081" diff --git a/backend/app/main.py b/backend/app/main.py index 361b612..ed78da4 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -18,6 +18,8 @@ logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", ) +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) logger = logging.getLogger(__name__) WEB_DIR = ROOT_DIR / "web"