362 lines
13 KiB
Python
362 lines
13 KiB
Python
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from .config import ROOT_DIR, settings
|
|
from .exceptions import BinanceRateLimitedError
|
|
from .http_client import httpx_client_kwargs
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_RATE_LIMIT_CODES = {418, 429}
|
|
_SYMBOLS_CACHE_FILE = ROOT_DIR / "data" / "symbols_cache.json"
|
|
_SYMBOL_META_CACHE_FILE = ROOT_DIR / "data" / "symbol_meta_cache.json"
|
|
|
|
|
|
def _precision_from_tick_size(tick_size: str) -> int:
|
|
tick = tick_size.strip()
|
|
if "." not in tick:
|
|
return 0
|
|
dec = tick.split(".", 1)[1]
|
|
trimmed = dec.rstrip("0")
|
|
return len(trimmed) if trimmed else len(dec)
|
|
|
|
|
|
def _parse_symbol_price_meta(symbol_info: dict[str, Any]) -> dict[str, Any]:
|
|
tick_size = "0.01"
|
|
for f in symbol_info.get("filters", []):
|
|
if f.get("filterType") == "PRICE_FILTER":
|
|
tick_size = str(f.get("tickSize", tick_size))
|
|
break
|
|
precision = _precision_from_tick_size(tick_size)
|
|
api_precision = symbol_info.get("pricePrecision")
|
|
if isinstance(api_precision, int) and api_precision > precision:
|
|
precision = api_precision
|
|
return {"tick_size": tick_size, "price_precision": precision}
|
|
|
|
|
|
class BinanceFuturesClient:
|
|
def __init__(self) -> None:
|
|
self.base = settings.binance_fapi_base.rstrip("/")
|
|
self._symbols_cache: list[str] | None = None
|
|
self._symbol_meta_cache: dict[str, dict[str, Any]] | None = None
|
|
self._client: httpx.AsyncClient | None = None
|
|
self._throttle_lock = asyncio.Lock()
|
|
self._last_request_at: float = 0.0
|
|
self._ban_until: float = 0.0
|
|
|
|
def is_rate_limited(self) -> bool:
|
|
return time.monotonic() < self._ban_until
|
|
|
|
def rate_limit_remaining_sec(self) -> int:
|
|
return max(0, int(self._ban_until - time.monotonic()))
|
|
|
|
def _set_ban(self, retry_after: int) -> None:
|
|
retry_after = max(retry_after, settings.ban_cooldown_sec)
|
|
self._ban_until = max(self._ban_until, time.monotonic() + retry_after)
|
|
|
|
async def _ensure_client(self) -> httpx.AsyncClient:
|
|
if self._client is None or self._client.is_closed:
|
|
limits = httpx.Limits(
|
|
max_connections=max(2, settings.max_concurrency),
|
|
max_keepalive_connections=max(2, settings.max_concurrency),
|
|
)
|
|
self._client = httpx.AsyncClient(
|
|
timeout=30.0,
|
|
limits=limits,
|
|
**httpx_client_kwargs("binance"),
|
|
)
|
|
return self._client
|
|
|
|
async def close(self) -> None:
|
|
if self._client and not self._client.is_closed:
|
|
await self._client.aclose()
|
|
self._client = None
|
|
|
|
async def _throttle(self) -> None:
|
|
if self.is_rate_limited():
|
|
remaining = self.rate_limit_remaining_sec()
|
|
raise BinanceRateLimitedError(remaining, "throttle")
|
|
async with self._throttle_lock:
|
|
now = time.monotonic()
|
|
if now < self._ban_until:
|
|
raise BinanceRateLimitedError(
|
|
int(self._ban_until - now), "throttle"
|
|
)
|
|
gap = settings.request_interval_sec - (now - self._last_request_at)
|
|
if gap > 0:
|
|
await asyncio.sleep(gap)
|
|
self._last_request_at = time.monotonic()
|
|
|
|
async def _get(self, path: str, params: dict | None = None) -> Any:
|
|
url = f"{self.base}{path}"
|
|
last_err: Exception | None = None
|
|
|
|
for attempt in range(1, settings.max_retries + 1):
|
|
try:
|
|
await self._throttle()
|
|
except BinanceRateLimitedError:
|
|
raise
|
|
|
|
try:
|
|
client = await self._ensure_client()
|
|
resp = await client.get(url, params=params or {})
|
|
if resp.status_code in _RATE_LIMIT_CODES:
|
|
retry_after = int(
|
|
resp.headers.get("Retry-After", settings.ban_cooldown_sec)
|
|
)
|
|
self._set_ban(retry_after)
|
|
logger.error(
|
|
"Binance HTTP %s on %s — IP 封禁约 %ss,停止重试",
|
|
resp.status_code,
|
|
path,
|
|
retry_after,
|
|
)
|
|
raise BinanceRateLimitedError(retry_after, path)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
except BinanceRateLimitedError:
|
|
raise
|
|
except httpx.HTTPStatusError as e:
|
|
last_err = e
|
|
if e.response.status_code in _RATE_LIMIT_CODES:
|
|
raise
|
|
raise
|
|
except (httpx.ConnectError, httpx.ReadTimeout) as e:
|
|
last_err = e
|
|
logger.warning("Binance network error %s (attempt %d)", path, attempt)
|
|
await asyncio.sleep(min(2 * attempt, 10))
|
|
|
|
raise last_err or RuntimeError(f"Binance request failed: {path}")
|
|
|
|
def _load_symbols_file(self) -> list[str] | None:
|
|
try:
|
|
if _SYMBOLS_CACHE_FILE.exists():
|
|
data = json.loads(_SYMBOLS_CACHE_FILE.read_text(encoding="utf-8"))
|
|
if isinstance(data, list) and data:
|
|
return sorted(data)
|
|
except Exception as e:
|
|
logger.warning("Load symbols cache file failed: %s", e)
|
|
return None
|
|
|
|
def _save_symbols_file(self, symbols: list[str]) -> None:
|
|
try:
|
|
_SYMBOLS_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
_SYMBOLS_CACHE_FILE.write_text(
|
|
json.dumps(symbols, ensure_ascii=False),
|
|
encoding="utf-8",
|
|
)
|
|
except Exception as e:
|
|
logger.warning("Save symbols cache file failed: %s", e)
|
|
|
|
def _load_symbol_meta_file(self) -> dict[str, dict[str, Any]] | None:
|
|
try:
|
|
if _SYMBOL_META_CACHE_FILE.is_file():
|
|
data = json.loads(_SYMBOL_META_CACHE_FILE.read_text(encoding="utf-8"))
|
|
if isinstance(data, dict):
|
|
return data
|
|
except Exception as e:
|
|
logger.warning("Load symbol meta cache file failed: %s", e)
|
|
return None
|
|
|
|
def _save_symbol_meta_file(self, meta: dict[str, dict[str, Any]]) -> None:
|
|
try:
|
|
_SYMBOL_META_CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
_SYMBOL_META_CACHE_FILE.write_text(
|
|
json.dumps(meta, ensure_ascii=False),
|
|
encoding="utf-8",
|
|
)
|
|
except Exception as e:
|
|
logger.warning("Save symbol meta cache file failed: %s", e)
|
|
|
|
async def _ensure_symbol_meta(self) -> dict[str, dict[str, Any]]:
|
|
if self._symbol_meta_cache:
|
|
return self._symbol_meta_cache
|
|
|
|
if self.is_rate_limited():
|
|
cached = self._load_symbol_meta_file()
|
|
if cached:
|
|
self._symbol_meta_cache = cached
|
|
return cached
|
|
raise BinanceRateLimitedError(self.rate_limit_remaining_sec(), "symbol_meta")
|
|
|
|
try:
|
|
info = await self._get("/fapi/v1/exchangeInfo")
|
|
meta: dict[str, dict[str, Any]] = {}
|
|
for s in info.get("symbols", []):
|
|
if (
|
|
s.get("contractType") == "PERPETUAL"
|
|
and s.get("quoteAsset") == "USDT"
|
|
and s.get("status") == "TRADING"
|
|
):
|
|
sym = s["symbol"]
|
|
meta[sym] = _parse_symbol_price_meta(s)
|
|
self._symbol_meta_cache = meta
|
|
self._save_symbol_meta_file(meta)
|
|
return meta
|
|
except BinanceRateLimitedError:
|
|
cached = self._load_symbol_meta_file()
|
|
if cached:
|
|
self._symbol_meta_cache = cached
|
|
return cached
|
|
raise
|
|
|
|
async def get_symbol_price_meta(self, symbol: str) -> dict[str, Any]:
|
|
sym = symbol.upper().strip()
|
|
meta_map = await self._ensure_symbol_meta()
|
|
if sym in meta_map:
|
|
return meta_map[sym]
|
|
return {"tick_size": "0.01", "price_precision": 2}
|
|
|
|
def clear_symbol_cache(self) -> None:
|
|
self._symbols_cache = None
|
|
self._symbol_meta_cache = None
|
|
|
|
async def get_usdt_perpetual_symbols(self) -> list[str]:
|
|
if self._symbols_cache:
|
|
return self._symbols_cache
|
|
|
|
if self.is_rate_limited():
|
|
cached = self._load_symbols_file()
|
|
if cached:
|
|
self._symbols_cache = cached
|
|
logger.info("Using cached symbols file (%d)", len(cached))
|
|
return cached
|
|
raise BinanceRateLimitedError(self.rate_limit_remaining_sec(), "symbols")
|
|
|
|
try:
|
|
info = await self._get("/fapi/v1/exchangeInfo")
|
|
symbols = []
|
|
meta: dict[str, dict[str, Any]] = {}
|
|
for s in info.get("symbols", []):
|
|
if (
|
|
s.get("contractType") == "PERPETUAL"
|
|
and s.get("quoteAsset") == "USDT"
|
|
and s.get("status") == "TRADING"
|
|
):
|
|
sym = s["symbol"]
|
|
symbols.append(sym)
|
|
meta[sym] = _parse_symbol_price_meta(s)
|
|
self._symbols_cache = sorted(symbols)
|
|
self._symbol_meta_cache = meta
|
|
self._save_symbols_file(self._symbols_cache)
|
|
self._save_symbol_meta_file(meta)
|
|
logger.info("Loaded %d USDT perpetual symbols", len(self._symbols_cache))
|
|
return self._symbols_cache
|
|
except BinanceRateLimitedError:
|
|
cached = self._load_symbols_file()
|
|
if cached:
|
|
self._symbols_cache = cached
|
|
logger.info("Rate limited, using symbols file (%d)", len(cached))
|
|
return cached
|
|
raise
|
|
|
|
async def get_24hr_tickers(self) -> list[dict]:
|
|
data = await self._get("/fapi/v1/ticker/24hr")
|
|
return data if isinstance(data, list) else []
|
|
|
|
async def get_klines(
|
|
self,
|
|
symbol: str,
|
|
start_ms: int,
|
|
end_ms: int,
|
|
interval: str = "1h",
|
|
) -> list[list]:
|
|
all_klines: list[list] = []
|
|
cursor = start_ms
|
|
while cursor < end_ms:
|
|
batch = await self._get(
|
|
"/fapi/v1/klines",
|
|
{
|
|
"symbol": symbol,
|
|
"interval": interval,
|
|
"startTime": cursor,
|
|
"endTime": end_ms,
|
|
"limit": 1500,
|
|
},
|
|
)
|
|
if not batch:
|
|
break
|
|
all_klines.extend(batch)
|
|
last_open = int(batch[-1][0])
|
|
next_cursor = last_open + 3600_000
|
|
if next_cursor <= cursor:
|
|
break
|
|
cursor = next_cursor
|
|
if len(batch) < 1500:
|
|
break
|
|
return all_klines
|
|
|
|
async def get_price(self, symbol: str) -> float:
|
|
data = await self._get("/fapi/v1/ticker/price", {"symbol": symbol})
|
|
return float(data["price"])
|
|
|
|
async def get_prices_batch(self, symbols: list[str]) -> dict[str, float]:
|
|
tickers = await self._get("/fapi/v1/ticker/price")
|
|
sym_set = set(symbols)
|
|
return {t["symbol"]: float(t["price"]) for t in tickers if t["symbol"] in sym_set}
|
|
|
|
async def get_premium_index_all(self) -> dict[str, dict]:
|
|
data = await self._get("/fapi/v1/premiumIndex")
|
|
if isinstance(data, dict):
|
|
items = [data]
|
|
else:
|
|
items = data or []
|
|
return {str(x["symbol"]).upper(): x for x in items if x.get("symbol")}
|
|
|
|
async def get_funding_rate_history(self, symbol: str, limit: int = 90) -> list[dict]:
|
|
raw = await self._get(
|
|
"/fapi/v1/fundingRate",
|
|
{"symbol": symbol.upper(), "limit": min(limit, 1000)},
|
|
)
|
|
rows = []
|
|
for r in raw or []:
|
|
rows.append(
|
|
{
|
|
"time": int(r["fundingTime"]),
|
|
"rate": float(r["fundingRate"]),
|
|
"mark_price": float(r.get("markPrice", 0) or 0),
|
|
}
|
|
)
|
|
return rows
|
|
|
|
def _parse_kline_rows(self, raw: list | None) -> list[dict]:
|
|
candles = []
|
|
for k in raw or []:
|
|
candles.append(
|
|
{
|
|
"time": int(k[0]),
|
|
"open": float(k[1]),
|
|
"high": float(k[2]),
|
|
"low": float(k[3]),
|
|
"close": float(k[4]),
|
|
"volume": float(k[5]),
|
|
"quote_volume": float(k[7]),
|
|
}
|
|
)
|
|
return candles
|
|
|
|
async def get_klines_limit(
|
|
self, symbol: str, interval: str, limit: int = 500
|
|
) -> list[dict]:
|
|
raw = await self._get(
|
|
"/fapi/v1/klines",
|
|
{
|
|
"symbol": symbol.upper(),
|
|
"interval": interval,
|
|
"limit": min(limit, 1500),
|
|
},
|
|
)
|
|
return self._parse_kline_rows(raw)
|
|
|
|
async def get_daily_klines(self, symbol: str, limit: int = 300) -> list[dict]:
|
|
return await self.get_klines_limit(symbol, "1d", limit)
|
|
|
|
|
|
binance_client = BinanceFuturesClient()
|