174 lines
6.4 KiB
Python
174 lines
6.4 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
from .config import GateConfig
|
|
from .proxy_util import httpx_proxy_url
|
|
|
|
|
|
def _to_gate_interval(bar: str) -> str:
|
|
b = (bar or "").strip()
|
|
mapping = {
|
|
"1m": "1m",
|
|
"3m": "3m",
|
|
"5m": "5m",
|
|
"15m": "15m",
|
|
"30m": "30m",
|
|
"1H": "1h",
|
|
"2H": "2h",
|
|
"4H": "4h",
|
|
"6H": "6h",
|
|
"8H": "8h",
|
|
"12H": "12h",
|
|
"1D": "1d",
|
|
"1W": "7d",
|
|
"1M": "1M",
|
|
}
|
|
if b in mapping:
|
|
return mapping[b]
|
|
if len(b) >= 2 and b.endswith("H") and b[:-1].isdigit():
|
|
return f"{b[:-1]}h"
|
|
if len(b) >= 2 and b.endswith("D"):
|
|
return b[:-1] + "d"
|
|
return b.lower()
|
|
|
|
|
|
def _candle_row(obj: dict[str, Any]) -> list[str]:
|
|
ts_ms = str(int(float(obj["t"])) * 1000)
|
|
o = str(obj.get("o") or "")
|
|
h = str(obj.get("h") or "")
|
|
l = str(obj.get("l") or "")
|
|
c = str(obj.get("c") or "")
|
|
v = str(obj.get("v") or "")
|
|
sum_q = str(obj.get("sum") or "")
|
|
return [ts_ms, o, h, l, c, v, v, sum_q, "1"]
|
|
|
|
|
|
def _is_linear_usdt_perp_contract(item: dict[str, Any]) -> bool:
|
|
name = str(item.get("name") or "")
|
|
parts = name.split("_")
|
|
if len(parts) != 2 or parts[1].upper() != "USDT":
|
|
return False
|
|
if item.get("in_delisting") is True:
|
|
return False
|
|
return True
|
|
|
|
|
|
class GateClient:
|
|
"""Gate.io USDT 结算永续合约公共行情(REST v4)。"""
|
|
|
|
def __init__(self, conf: GateConfig, proxy_url: str | None = None) -> None:
|
|
self.conf = conf
|
|
self._proxy = httpx_proxy_url(proxy_url.strip() if proxy_url and str(proxy_url).strip() else None)
|
|
self.timeout = httpx.Timeout(10.0, read=16.0)
|
|
self._candle_sem = asyncio.Semaphore(3)
|
|
|
|
def _base_url(self) -> str:
|
|
return str(self.conf.api_base).rstrip("/")
|
|
|
|
def _futures_prefix(self) -> str:
|
|
return f"{self._base_url()}/futures/{self.conf.settle.strip().lower()}"
|
|
|
|
def _client_kwargs(self, timeout: httpx.Timeout) -> dict:
|
|
if self._proxy:
|
|
return {"timeout": timeout, "proxy": self._proxy, "trust_env": False}
|
|
return {"timeout": timeout, "trust_env": True}
|
|
|
|
def symbol_to_swap_inst_id(self, symbol: str) -> str:
|
|
base = symbol.strip().upper()
|
|
return f"{base}_{self.conf.quote_currency.upper()}"
|
|
|
|
def inst_id_to_base_symbol(self, inst_id: str) -> str:
|
|
inst = inst_id.strip().upper()
|
|
suf = f"_{self.conf.quote_currency.upper()}"
|
|
if inst.endswith(suf):
|
|
return inst[: -len(suf)]
|
|
return inst.split("_")[0].upper() if "_" in inst else inst
|
|
|
|
async def _fetch_contracts(self) -> list[dict[str, Any]]:
|
|
url = f"{self._futures_prefix()}/contracts"
|
|
async with httpx.AsyncClient(**self._client_kwargs(self.timeout)) as client:
|
|
resp = await client.get(url)
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
if not isinstance(data, list):
|
|
raise RuntimeError(f"Gate contracts unexpected payload: {type(data)}")
|
|
return data
|
|
|
|
async def list_live_usdt_swap_inst_ids(self) -> list[str]:
|
|
"""全部 USDT 本位线性永续合约名(如 BTC_USDT),剔除交割/下架中的条目。"""
|
|
data = await self._fetch_contracts()
|
|
out: list[str] = []
|
|
for item in data:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
if not _is_linear_usdt_perp_contract(item):
|
|
continue
|
|
name = str(item.get("name") or "").strip()
|
|
if name:
|
|
out.append(name)
|
|
return sorted(set(out))
|
|
|
|
async def get_perpetual_symbols(self) -> set[str]:
|
|
ids = await self.list_live_usdt_swap_inst_ids()
|
|
return {self.inst_id_to_base_symbol(i) for i in ids}
|
|
|
|
async def get_candles(self, inst_id: str, bar: str, limit: int = 120) -> list[list[str]]:
|
|
"""
|
|
返回按时间正序排列的 K 线列表(与旧 OKX 行格式对齐便于下游逻辑):
|
|
[ts_ms, o, h, l, c, vol, vol_dup, sum_quote, confirm]
|
|
"""
|
|
interval = _to_gate_interval(bar)
|
|
lim = max(1, min(int(limit), 2000))
|
|
url = f"{self._futures_prefix()}/candlesticks"
|
|
params = {"contract": inst_id, "interval": interval, "limit": str(lim)}
|
|
async with self._candle_sem:
|
|
await asyncio.sleep(0.12)
|
|
async with httpx.AsyncClient(**self._client_kwargs(self.timeout)) as client:
|
|
resp = await client.get(url, params=params)
|
|
resp.raise_for_status()
|
|
payload = resp.json()
|
|
if not isinstance(payload, list):
|
|
raise RuntimeError(f"Gate candlesticks error: {payload}")
|
|
rows: list[list[str]] = []
|
|
for item in payload:
|
|
if isinstance(item, dict) and "t" in item:
|
|
rows.append(_candle_row(item))
|
|
rows.sort(key=lambda r: int(r[0]) if r and r[0].isdigit() else 0)
|
|
return rows
|
|
|
|
async def get_usdt_swap_est_quote_volume_map(self) -> dict[str, float]:
|
|
"""
|
|
合约名 -> 近 24h 计价币种成交额(USDT)。
|
|
优先使用 ticker 的 volume_24h_quote;缺失时再尝试简单估算。
|
|
"""
|
|
url = f"{self._futures_prefix()}/tickers"
|
|
tick_timeout = httpx.Timeout(15.0, read=90.0)
|
|
async with httpx.AsyncClient(**self._client_kwargs(tick_timeout)) as client:
|
|
resp = await client.get(url)
|
|
resp.raise_for_status()
|
|
payload = resp.json()
|
|
if not isinstance(payload, list):
|
|
raise RuntimeError(f"Gate tickers error: {type(payload)}")
|
|
out: dict[str, float] = {}
|
|
for item in payload:
|
|
if not isinstance(item, dict):
|
|
continue
|
|
contract = str(item.get("contract") or "").strip()
|
|
if not contract.endswith("_USDT"):
|
|
continue
|
|
vol_quote = item.get("volume_24h_quote") or item.get("volume_24h_usd")
|
|
try:
|
|
if vol_quote is not None and str(vol_quote).strip():
|
|
out[contract] = max(0.0, float(vol_quote))
|
|
continue
|
|
last = float(item.get("last") or 0)
|
|
vol_base = float(item.get("volume_24h_base") or item.get("volume_24h") or 0)
|
|
out[contract] = max(0.0, vol_base * last)
|
|
except (TypeError, ValueError):
|
|
continue
|
|
return out
|