fix: exchange-specific volume rank APIs for OKX and full top20

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-08 15:53:41 +08:00
parent 4bf0c2363f
commit 89a58c7323
8 changed files with 286 additions and 37 deletions
+1
View File
@@ -8181,6 +8181,7 @@ def _hub_fetch_volume_rank(top_n=20):
exchange=exchange, exchange=exchange,
ensure_markets_loaded=ensure_markets_loaded, ensure_markets_loaded=ensure_markets_loaded,
top_n=top_n, top_n=top_n,
exchange_id="binance",
) )
+1
View File
@@ -8241,6 +8241,7 @@ def _hub_fetch_volume_rank(top_n=20):
exchange=exchange, exchange=exchange,
ensure_markets_loaded=ensure_markets_loaded, ensure_markets_loaded=ensure_markets_loaded,
top_n=top_n, top_n=top_n,
exchange_id="gateio",
) )
+1
View File
@@ -7937,6 +7937,7 @@ def _hub_fetch_volume_rank(top_n=20):
exchange=exchange, exchange=exchange,
ensure_markets_loaded=ensure_markets_loaded, ensure_markets_loaded=ensure_markets_loaded,
top_n=top_n, top_n=top_n,
exchange_id="gateio",
) )
+1
View File
@@ -7864,6 +7864,7 @@ def _hub_fetch_volume_rank(top_n=20):
exchange=exchange, exchange=exchange,
ensure_markets_loaded=ensure_markets_loaded, ensure_markets_loaded=ensure_markets_loaded,
top_n=top_n, top_n=top_n,
exchange_id="okx",
) )
+254 -34
View File
@@ -4,7 +4,6 @@ from __future__ import annotations
import json import json
import os import os
import time
from datetime import datetime, timedelta from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
@@ -13,7 +12,7 @@ from zoneinfo import ZoneInfo
from hub_trades_lib import trading_day_from_dt from hub_trades_lib import trading_day_from_dt
TOP_N_DEFAULT = 20 TOP_N_DEFAULT = 20
CACHE_VERSION = 1 CACHE_VERSION = 2
def volume_rank_reset_hour() -> int: def volume_rank_reset_hour() -> int:
@@ -85,6 +84,12 @@ def _ticker_base(sym_text: str) -> str:
return s return s
def _hub_symbol_from_base(base: str, quote: str = "USDT") -> str:
b = str(base or "").strip().upper()
q = str(quote or "USDT").strip().upper()
return f"{b}/{q}" if b else ""
def _hub_symbol_from_market(market: dict | None, fallback_symbol: str) -> str: def _hub_symbol_from_market(market: dict | None, fallback_symbol: str) -> str:
if market: if market:
base = str(market.get("base") or "").strip().upper() base = str(market.get("base") or "").strip().upper()
@@ -100,29 +105,73 @@ def _hub_symbol_from_market(market: dict | None, fallback_symbol: str) -> str:
return f"{base}/USDT" if base else fb return f"{base}/USDT" if base else fb
def _quote_volume_from_ticker(ticker: dict | None, market: dict | None) -> float | None: def _okx_turnover_usdt(row: dict | None) -> float | None:
"""OKX SWAP:成交额(USDT) ≈ volCcy24h(基础币) × last。"""
if not isinstance(row, dict):
return None
base_vol = _safe_float(row.get("volCcy24h"))
if base_vol is None or base_vol <= 0:
return None
last = _safe_float(row.get("last") or row.get("lastPx"))
if last is None or last <= 0:
return None
return float(base_vol * last)
def _quote_volume_from_ticker(
ticker: dict | None,
market: dict | None,
*,
exchange_id: str = "",
) -> float | None:
ex_id = str(exchange_id or "").lower()
t = ticker or {} t = ticker or {}
info = t.get("info") if isinstance(t.get("info"), dict) else {}
if ex_id == "okx":
row = dict(info)
if row.get("last") is None:
row["last"] = t.get("last")
qv = _okx_turnover_usdt(row)
if qv is not None and qv > 0:
return qv
qv = _safe_float(t.get("quoteVolume")) qv = _safe_float(t.get("quoteVolume"))
if qv is not None and qv > 0: if qv is not None and qv > 0:
return qv return qv
info = t.get("info") if isinstance(t.get("info"), dict) else {}
for key in ("quoteVolume", "volCcy24h", "vol24h", "turnover24h", "amount24"): if ex_id in ("gateio", "gate"):
for key in (
"volume_24h_quote",
"volume_24h_settle",
"quote_volume",
"vol_24h",
"turnover",
):
qv = _safe_float(info.get(key)) qv = _safe_float(info.get(key))
if qv is not None and qv > 0: if qv is not None and qv > 0:
return qv return qv
for key in ("quoteVolume", "volCcy24h", "vol24h", "turnover24h", "amount24", "turnover"):
qv = _safe_float(info.get(key))
if qv is not None and qv > 0:
if key == "volCcy24h" and ex_id == "okx":
last = _safe_float(info.get("last") or info.get("lastPx") or t.get("last"))
if last:
return qv * last
return qv
bv = _safe_float(t.get("baseVolume")) bv = _safe_float(t.get("baseVolume"))
lp = _safe_float(t.get("last")) or _safe_float(t.get("close")) lp = _safe_float(t.get("last")) or _safe_float(t.get("close"))
if bv is not None and lp is not None and bv > 0 and lp > 0: if bv is not None and lp is not None and bv > 0 and lp > 0:
return bv * lp return bv * lp
if info: if info:
bv = _safe_float(info.get("volCcy24h") or info.get("vol24h")) bv = _safe_float(info.get("volCcy24h") or info.get("vol24h") or info.get("volume"))
lp = _safe_float(info.get("last") or info.get("lastPx")) lp = _safe_float(info.get("last") or info.get("lastPx") or info.get("markPrice"))
if bv is not None and lp is not None and bv > 0 and lp > 0: if bv is not None and lp is not None and bv > 0 and lp > 0:
return bv * lp return bv * lp
if market and lp:
bv = _safe_float(t.get("baseVolume"))
if bv is not None and bv > 0:
return bv * lp
return None return None
@@ -130,7 +179,7 @@ def _is_usdt_linear_swap(market: dict | None, symbol: str) -> bool:
if not market: if not market:
su = str(symbol or "").upper() su = str(symbol or "").upper()
return "USDT" in su and (":USDT" in su or "/USDT" in su or su.endswith("USDT")) return "USDT" in su and (":USDT" in su or "/USDT" in su or su.endswith("USDT"))
if not market.get("swap"): if not market.get("swap") and market.get("type") not in ("swap", "future"):
return False return False
if str(market.get("quote") or "").upper() != "USDT": if str(market.get("quote") or "").upper() != "USDT":
return False return False
@@ -138,46 +187,209 @@ def _is_usdt_linear_swap(market: dict | None, symbol: str) -> bool:
return False return False
if market.get("active") is False: if market.get("active") is False:
return False return False
settle = str(market.get("settle") or "").upper()
if settle and settle != "USDT":
return False
return True return True
def _lookup_ticker(tickers: dict, sym: str, market: dict | None) -> dict | None:
if not tickers:
return None
t = tickers.get(sym)
if t:
return t
if not market:
return None
base = market.get("base")
quote = market.get("quote") or "USDT"
settle = market.get("settle") or quote
candidates = [
sym,
f"{base}/{quote}:{settle}",
f"{base}/{quote}",
f"{base}{quote}",
market.get("id"),
]
for key in candidates:
if not key:
continue
t = tickers.get(key)
if t:
return t
return None
def _merge_scores(scored: dict[str, tuple[str, float]]) -> list[tuple[str, str, float]]:
rows = [(sym, base, vol) for base, (sym, vol) in scored.items() if sym and base and vol > 0]
rows.sort(key=lambda x: x[2], reverse=True)
return rows
def _scores_from_okx(exchange) -> list[tuple[str, str, float]]:
by_base: dict[str, tuple[str, float]] = {}
if hasattr(exchange, "publicGetMarketTickers"):
try:
resp = exchange.publicGetMarketTickers({"instType": "SWAP"})
for row in (resp or {}).get("data") or []:
if not isinstance(row, dict):
continue
inst = str(row.get("instId") or "").upper()
parts = inst.split("-")
if len(parts) < 3 or parts[-1] != "SWAP" or parts[1] != "USDT":
continue
base = parts[0].strip()
if not base:
continue
qv = _okx_turnover_usdt(row)
if qv is None or qv <= 0:
continue
sym = _hub_symbol_from_base(base)
prev = by_base.get(base)
if prev is None or qv > prev[1]:
by_base[base] = (sym, float(qv))
if by_base:
return _merge_scores(by_base)
except Exception:
pass
try:
tickers = exchange.fetch_tickers(params={"instType": "SWAP"})
except Exception:
tickers = exchange.fetch_tickers()
return _scores_from_markets(exchange, tickers or {}, "okx")
def _scores_from_binance(exchange) -> list[tuple[str, str, float]]:
by_base: dict[str, tuple[str, float]] = {}
if hasattr(exchange, "fapiPublicGetTicker24hr"):
try:
rows = exchange.fapiPublicGetTicker24hr()
if isinstance(rows, list):
for row in rows:
if not isinstance(row, dict):
continue
raw = str(row.get("symbol") or "").upper()
if not raw.endswith("USDT"):
continue
base = raw[:-4]
if not base:
continue
qv = _safe_float(row.get("quoteVolume"))
if qv is None or qv <= 0:
bv = _safe_float(row.get("volume"))
lp = _safe_float(row.get("lastPrice") or row.get("weightedAvgPrice"))
if bv and lp:
qv = bv * lp
if qv is None or qv <= 0:
continue
sym = _hub_symbol_from_base(base)
prev = by_base.get(base)
if prev is None or qv > prev[1]:
by_base[base] = (sym, float(qv))
if by_base:
return _merge_scores(by_base)
except Exception:
pass
tickers = exchange.fetch_tickers()
return _scores_from_markets(exchange, tickers or {}, "binance")
def _scores_from_gate(exchange) -> list[tuple[str, str, float]]:
by_base: dict[str, tuple[str, float]] = {}
for method_name in ("publicFuturesGetSettleTickers", "publicFuturesGetUsdtTickers"):
fn = getattr(exchange, method_name, None)
if not callable(fn):
continue
try:
rows = fn({"settle": "usdt"})
if isinstance(rows, list):
for row in rows:
if not isinstance(row, dict):
continue
contract = str(row.get("contract") or row.get("name") or "").upper()
if not contract:
continue
base = contract.replace("_USDT", "").replace("USDT", "").strip("_")
if not base:
continue
qv = _safe_float(row.get("volume_24h_quote") or row.get("volume_24h_settle"))
if qv is None or qv <= 0:
bv = _safe_float(row.get("volume_24h_base"))
lp = _safe_float(row.get("last") or row.get("mark_price"))
if bv and lp:
qv = bv * lp
if qv is None or qv <= 0:
continue
sym = _hub_symbol_from_base(base)
prev = by_base.get(base)
if prev is None or qv > prev[1]:
by_base[base] = (sym, float(qv))
if by_base:
return _merge_scores(by_base)
except Exception:
continue
tickers = exchange.fetch_tickers()
return _scores_from_markets(exchange, tickers or {}, "gateio")
def _scores_from_markets(
exchange,
tickers: dict,
exchange_id: str,
) -> list[tuple[str, str, float]]:
by_base: dict[str, tuple[str, float]] = {}
markets = getattr(exchange, "markets", None) or {}
for sym, mk in markets.items():
try:
if not _is_usdt_linear_swap(mk, sym):
continue
ticker = _lookup_ticker(tickers, sym, mk)
qv = _quote_volume_from_ticker(ticker, mk, exchange_id=exchange_id)
if qv is None or qv <= 0:
continue
hub_sym = _hub_symbol_from_market(mk, sym)
base = _ticker_base(hub_sym)
if not base:
continue
prev = by_base.get(base)
if prev is None or qv > prev[1]:
by_base[base] = (hub_sym, float(qv))
except Exception:
continue
return _merge_scores(by_base)
def _collect_scores(exchange, exchange_id: str) -> list[tuple[str, str, float]]:
ex_id = str(exchange_id or "").lower()
if ex_id == "okx":
return _scores_from_okx(exchange)
if ex_id == "binance":
return _scores_from_binance(exchange)
if ex_id in ("gateio", "gate", "gate_bot"):
return _scores_from_gate(exchange)
tickers = exchange.fetch_tickers()
return _scores_from_markets(exchange, tickers or {}, ex_id)
def fetch_usdt_swap_volume_rank( def fetch_usdt_swap_volume_rank(
exchange, exchange,
ensure_markets_loaded: Callable[[], None], ensure_markets_loaded: Callable[[], None],
*, *,
top_n: int = TOP_N_DEFAULT, top_n: int = TOP_N_DEFAULT,
rank_date: str | None = None, rank_date: str | None = None,
exchange_id: str | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""从 ccxt 拉全市场 USDT 永续 ticker,按 24h 成交额(USDT) 取 Top N。""" """从 ccxt 拉全市场 USDT 永续 ticker,按 24h 成交额(USDT) 取 Top N。"""
top_n = max(1, min(int(top_n or TOP_N_DEFAULT), 100)) top_n = max(1, min(int(top_n or TOP_N_DEFAULT), 100))
ensure_markets_loaded() ensure_markets_loaded()
scored: list[tuple[str, str, float]] = [] ex_id = str(exchange_id or getattr(exchange, "id", "") or "").lower()
seen_bases: set[str] = set()
try: try:
tickers = exchange.fetch_tickers() scored = _collect_scores(exchange, ex_id)
except Exception as e: except Exception as e:
return {"ok": False, "msg": str(e)} return {"ok": False, "msg": str(e)}
markets = getattr(exchange, "markets", None) or {}
for sym, ticker in (tickers or {}).items():
try:
mk = markets.get(sym) if markets else None
if not _is_usdt_linear_swap(mk, sym):
continue
qv = _quote_volume_from_ticker(ticker, mk)
if qv is None or qv <= 0:
continue
hub_sym = _hub_symbol_from_market(mk, sym)
base = _ticker_base(hub_sym)
if not base or base in seen_bases:
continue
seen_bases.add(base)
scored.append((hub_sym, base, float(qv)))
except Exception:
continue
scored.sort(key=lambda x: x[2], reverse=True)
items = [] items = []
for idx, (hub_sym, base, qv) in enumerate(scored[:top_n], 1): for idx, (hub_sym, base, qv) in enumerate(scored[:top_n], 1):
items.append( items.append(
@@ -193,6 +405,7 @@ def fetch_usdt_swap_volume_rank(
"rank_date": rank_date or rank_date_label(), "rank_date": rank_date or rank_date_label(),
"items": items, "items": items,
"total_symbols": len(scored), "total_symbols": len(scored),
"exchange_id": ex_id,
"fetched_at": datetime.now(volume_rank_timezone()).isoformat(timespec="seconds"), "fetched_at": datetime.now(volume_rank_timezone()).isoformat(timespec="seconds"),
} }
@@ -218,6 +431,8 @@ def load_volume_rank_cache(path: Path | None = None) -> dict[str, Any]:
data = json.loads(p.read_text(encoding="utf-8")) data = json.loads(p.read_text(encoding="utf-8"))
if not isinstance(data, dict): if not isinstance(data, dict):
return {"version": CACHE_VERSION, "exchanges": {}} return {"version": CACHE_VERSION, "exchanges": {}}
if int(data.get("version") or 0) < CACHE_VERSION:
return {"version": CACHE_VERSION, "exchanges": {}}
data.setdefault("version", CACHE_VERSION) data.setdefault("version", CACHE_VERSION)
data.setdefault("exchanges", {}) data.setdefault("exchanges", {})
return data return data
@@ -258,10 +473,15 @@ def merge_exchange_rank(
def cache_needs_refresh(cache: dict[str, Any], *, expected_rank_date: str | None = None) -> bool: def cache_needs_refresh(cache: dict[str, Any], *, expected_rank_date: str | None = None) -> bool:
expected = expected_rank_date or rank_date_label() expected = expected_rank_date or rank_date_label()
if int(cache.get("version") or 0) < CACHE_VERSION:
return True
if not cache.get("exchanges"): if not cache.get("exchanges"):
return True return True
if str(cache.get("rank_date") or "") != expected: if str(cache.get("rank_date") or "") != expected:
return True return True
for _key, row in (cache.get("exchanges") or {}).items():
if not row or not row.get("items"):
return True
return False return False
+5 -1
View File
@@ -381,14 +381,18 @@ def _refresh_volume_ranks(*, force: bool = False) -> dict:
if not ex_key or not ex.get("enabled"): if not ex_key or not ex.get("enabled"):
continue continue
resp = _fetch_instance_volume_rank_sync(ex, top_n=TOP_N_DEFAULT) resp = _fetch_instance_volume_rank_sync(ex, top_n=TOP_N_DEFAULT)
if resp.get("ok"): if resp.get("ok") and resp.get("items"):
cache = merge_exchange_rank(cache, ex_key, resp) cache = merge_exchange_rank(cache, ex_key, resp)
else: else:
msg = str(resp.get("msg") or resp.get("error") or "拉取失败") msg = str(resp.get("msg") or resp.get("error") or "拉取失败")
if resp.get("ok") and not resp.get("items"):
msg = msg if msg != "拉取失败" else "无有效成交额数据"
errors.append(f"{ex_key}:{msg}") errors.append(f"{ex_key}:{msg}")
exchanges = dict(cache.get("exchanges") or {}) exchanges = dict(cache.get("exchanges") or {})
prev = dict(exchanges.get(ex_key) or {}) prev = dict(exchanges.get(ex_key) or {})
prev["error"] = msg prev["error"] = msg
if not prev.get("items"):
prev["items"] = []
exchanges[ex_key] = prev exchanges[ex_key] = prev
cache["exchanges"] = exchanges cache["exchanges"] = exchanges
cache["rank_date"] = expected cache["rank_date"] = expected
+11 -2
View File
@@ -2621,14 +2621,23 @@
elVolRankList.innerHTML = ""; elVolRankList.innerHTML = "";
if (!data || !data.ok || !data.items || !data.items.length) { if (!data || !data.ok || !data.items || !data.items.length) {
elVolRankMeta.textContent = elVolRankMeta.textContent =
(data && data.msg) || "暂无排名数据(请稍后或检查实例 /api/hub/volume-rank"; (data && data.msg) ||
"暂无排名数据(请 pm2 restart 四实例与 manual-trading-hub 后重试)";
return; return;
} }
const resetHour = data.reset_hour != null ? data.reset_hour : 8; const resetHour = data.reset_hour != null ? data.reset_hour : 8;
const rankDate = data.rank_date || "—"; const rankDate = data.rank_date || "—";
const updated = data.updated_at || "—"; const updated = data.updated_at || "—";
const total = data.total_symbols != null ? data.total_symbols : "";
elVolRankMeta.textContent = elVolRankMeta.textContent =
"昨日成交 Top20 · 交易日 " + rankDate + " · 每早 " + resetHour + ":00 更新 · " + updated; "昨日成交 Top20 · 交易日 " +
rankDate +
" · 每早 " +
resetHour +
":00 更新" +
(total ? " · 全市场 " + total + " 个" : "") +
" · " +
updated;
const curSym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; const curSym = (elSymbol && elSymbol.value.trim().toUpperCase()) || "";
data.items.forEach(function (row) { data.items.forEach(function (row) {
const li = document.createElement("li"); const li = document.createElement("li");
+12
View File
@@ -1,6 +1,8 @@
from datetime import datetime from datetime import datetime
from hub_volume_rank_lib import ( from hub_volume_rank_lib import (
CACHE_VERSION,
_okx_turnover_usdt,
cache_needs_refresh, cache_needs_refresh,
format_volume_quote, format_volume_quote,
merge_exchange_rank, merge_exchange_rank,
@@ -26,6 +28,11 @@ def test_format_volume_quote():
assert format_volume_quote(4500) == "4.50K" assert format_volume_quote(4500) == "4.50K"
def test_okx_turnover_usdt():
qv = _okx_turnover_usdt({"volCcy24h": "100", "last": "50"})
assert qv == 5000.0
def test_cache_needs_refresh_and_merge(): def test_cache_needs_refresh_and_merge():
cache = {"rank_date": "2026-06-05", "exchanges": {}} cache = {"rank_date": "2026-06-05", "exchanges": {}}
assert cache_needs_refresh(cache, expected_rank_date="2026-06-07") is True assert cache_needs_refresh(cache, expected_rank_date="2026-06-07") is True
@@ -41,3 +48,8 @@ def test_cache_needs_refresh_and_merge():
) )
assert merged["exchanges"]["binance"]["items"][0]["symbol"] == "BTC/USDT" assert merged["exchanges"]["binance"]["items"][0]["symbol"] == "BTC/USDT"
assert merged["rank_date"] == "2026-06-07" assert merged["rank_date"] == "2026-06-07"
def test_stale_cache_version_forces_refresh():
cache = {"version": CACHE_VERSION - 1, "rank_date": "2026-06-07", "exchanges": {"okx": {"items": [{}]}}}
assert cache_needs_refresh(cache) is True