修改限速

This commit is contained in:
dekun
2026-05-22 13:24:03 +08:00
parent 83d6b19b5e
commit ee4ee87e37
7 changed files with 132 additions and 8 deletions
+32 -1
View File
@@ -85,12 +85,36 @@ async def _fetch_symbol_stats(
return None
async def _pick_candidate_symbols(symbols: list[str]) -> list[str]:
"""用 24h ticker 成交额预筛,避免对全市场并发拉 K 线触发 418 封禁。"""
try:
tickers = await binance_client.get_24hr_tickers()
vol_map = {
t["symbol"]: float(t.get("quoteVolume", 0) or 0)
for t in tickers
if t.get("symbol") in symbols
}
ranked = sorted(symbols, key=lambda s: vol_map.get(s, 0.0), reverse=True)
pool = min(settings.candidate_pool, len(ranked))
picked = ranked[:pool]
logger.info(
"Candidate pool: %d / %d symbols (by 24h quoteVolume)",
len(picked),
len(symbols),
)
return picked
except Exception as e:
logger.warning("24hr ticker prescreen failed, using full list: %s", e)
return symbols
async def aggregate_period(
start: datetime,
end: datetime,
use_live_prices: bool = False,
) -> list[dict]:
symbols = await binance_client.get_usdt_perpetual_symbols()
candidates = await _pick_candidate_symbols(symbols)
start_ms = to_ms(start)
end_ms = to_ms(end)
@@ -103,8 +127,15 @@ async def aggregate_period(
sem = asyncio.Semaphore(settings.max_concurrency)
tasks = [
_fetch_symbol_stats(s, start_ms, end_ms, prices, sem) for s in symbols
_fetch_symbol_stats(s, start_ms, end_ms, prices, sem) for s in candidates
]
logger.info(
"Aggregating period %s ~ %s (%d symbols, concurrency=%d)",
start.isoformat(),
end.isoformat(),
len(candidates),
settings.max_concurrency,
)
results = await asyncio.gather(*tasks)
stats = [r for r in results if r is not None and r.quote_volume > 0]
stats.sort(key=lambda x: x.quote_volume, reverse=True)
+84 -6
View File
@@ -1,5 +1,6 @@
import asyncio
import logging
import time
from typing import Any
import httpx
@@ -9,20 +10,92 @@ from .http_client import httpx_client_kwargs
logger = logging.getLogger(__name__)
# 418 = IP 被币安临时封禁(请求过快);429 = 触发频率限制
_RATE_LIMIT_CODES = {418, 429}
class BinanceFuturesClient:
def __init__(self) -> None:
self.base = settings.binance_fapi_base.rstrip("/")
self._symbols_cache: list[str] | None = None
self._client: httpx.AsyncClient | None = None
self._throttle_lock = asyncio.Lock()
self._last_request_at: float = 0.0
self._ban_until: float = 0.0
async def _ensure_client(self) -> httpx.AsyncClient:
if self._client is None or self._client.is_closed:
limits = httpx.Limits(
max_connections=max(2, settings.max_concurrency),
max_keepalive_connections=max(2, settings.max_concurrency),
)
self._client = httpx.AsyncClient(
timeout=30.0,
limits=limits,
**httpx_client_kwargs("binance"),
)
return self._client
async def close(self) -> None:
if self._client and not self._client.is_closed:
await self._client.aclose()
self._client = None
async def _throttle(self) -> None:
async with self._throttle_lock:
now = time.monotonic()
if now < self._ban_until:
wait = self._ban_until - now
logger.warning("Binance IP cooldown, sleeping %.0fs", wait)
await asyncio.sleep(wait)
gap = settings.request_interval_sec - (now - self._last_request_at)
if gap > 0:
await asyncio.sleep(gap)
self._last_request_at = time.monotonic()
async def _get(self, path: str, params: dict | None = None) -> Any:
url = f"{self.base}{path}"
async with httpx.AsyncClient(
timeout=30.0, **httpx_client_kwargs("binance")
) as client:
resp = await client.get(url, params=params or {})
resp.raise_for_status()
return resp.json()
last_err: Exception | None = None
for attempt in range(1, settings.max_retries + 1):
await self._throttle()
try:
client = await self._ensure_client()
resp = await client.get(url, params=params or {})
if resp.status_code in _RATE_LIMIT_CODES:
retry_after = int(
resp.headers.get("Retry-After", settings.ban_cooldown_sec)
)
retry_after = max(retry_after, settings.ban_cooldown_sec)
self._ban_until = time.monotonic() + retry_after
logger.warning(
"Binance HTTP %s on %s, cooldown %ss (attempt %d/%d)",
resp.status_code,
path,
retry_after,
attempt,
settings.max_retries,
)
last_err = httpx.HTTPStatusError(
f"{resp.status_code}",
request=resp.request,
response=resp,
)
await asyncio.sleep(retry_after)
continue
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as e:
last_err = e
if e.response.status_code in _RATE_LIMIT_CODES:
continue
raise
except (httpx.ConnectError, httpx.ReadTimeout) as e:
last_err = e
logger.warning("Binance request error %s (attempt %d)", path, attempt)
await asyncio.sleep(min(2 * attempt, 10))
raise last_err or RuntimeError(f"Binance request failed: {path}")
async def get_usdt_perpetual_symbols(self) -> list[str]:
if self._symbols_cache:
@@ -43,6 +116,11 @@ class BinanceFuturesClient:
def clear_symbol_cache(self) -> None:
self._symbols_cache = None
async def get_24hr_tickers(self) -> list[dict]:
"""单次请求获取全市场 24h 行情(用于缩小 K 线拉取范围)。"""
data = await self._get("/fapi/v1/ticker/24hr")
return data if isinstance(data, list) else []
async def get_klines(
self,
symbol: str,
+5 -1
View File
@@ -21,7 +21,11 @@ class Settings(BaseSettings):
host: str = "127.0.0.1"
port: int = 21450
db_path: str = str(ROOT_DIR / "data" / "monitor.db")
max_concurrency: int = 20
max_concurrency: int = 3
request_interval_sec: float = 0.15
ban_cooldown_sec: int = 90
max_retries: int = 5
candidate_pool: int = 150
# 代理默认关闭;仅当 PROXY_ENABLED=true 时生效
proxy_enabled: bool = False
proxy_url: str = "socks5h://192.168.8.4:1081"
+2
View File
@@ -18,6 +18,8 @@ logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
WEB_DIR = ROOT_DIR / "web"