From 89a58c73232592a751982a6a2ad32ea34e58462e Mon Sep 17 00:00:00 2001 From: dekun Date: Mon, 8 Jun 2026 15:53:41 +0800 Subject: [PATCH] fix: exchange-specific volume rank APIs for OKX and full top20 Co-authored-by: Cursor --- crypto_monitor_binance/app.py | 1 + crypto_monitor_gate/app.py | 1 + crypto_monitor_gate_bot/app.py | 1 + crypto_monitor_okx/app.py | 1 + hub_volume_rank_lib.py | 288 +++++++++++++++++++++++++---- manual_trading_hub/hub.py | 6 +- manual_trading_hub/static/chart.js | 13 +- tests/test_hub_volume_rank_lib.py | 12 ++ 8 files changed, 286 insertions(+), 37 deletions(-) diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 32d454d..f2a77c0 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -8181,6 +8181,7 @@ def _hub_fetch_volume_rank(top_n=20): exchange=exchange, ensure_markets_loaded=ensure_markets_loaded, top_n=top_n, + exchange_id="binance", ) diff --git a/crypto_monitor_gate/app.py b/crypto_monitor_gate/app.py index c0ef891..874feea 100644 --- a/crypto_monitor_gate/app.py +++ b/crypto_monitor_gate/app.py @@ -8241,6 +8241,7 @@ def _hub_fetch_volume_rank(top_n=20): exchange=exchange, ensure_markets_loaded=ensure_markets_loaded, top_n=top_n, + exchange_id="gateio", ) diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index 323a2c8..5580a27 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -7937,6 +7937,7 @@ def _hub_fetch_volume_rank(top_n=20): exchange=exchange, ensure_markets_loaded=ensure_markets_loaded, top_n=top_n, + exchange_id="gateio", ) diff --git a/crypto_monitor_okx/app.py b/crypto_monitor_okx/app.py index 9aba218..e792e4e 100644 --- a/crypto_monitor_okx/app.py +++ b/crypto_monitor_okx/app.py @@ -7864,6 +7864,7 @@ def _hub_fetch_volume_rank(top_n=20): exchange=exchange, ensure_markets_loaded=ensure_markets_loaded, top_n=top_n, + exchange_id="okx", ) diff --git a/hub_volume_rank_lib.py b/hub_volume_rank_lib.py index a05a1d1..19500ea 100644 --- a/hub_volume_rank_lib.py +++ b/hub_volume_rank_lib.py @@ -4,7 +4,6 @@ from __future__ import annotations import json import os -import time from datetime import datetime, timedelta from pathlib import Path from typing import Any, Callable @@ -13,7 +12,7 @@ from zoneinfo import ZoneInfo from hub_trades_lib import trading_day_from_dt TOP_N_DEFAULT = 20 -CACHE_VERSION = 1 +CACHE_VERSION = 2 def volume_rank_reset_hour() -> int: @@ -85,6 +84,12 @@ def _ticker_base(sym_text: str) -> str: return s +def _hub_symbol_from_base(base: str, quote: str = "USDT") -> str: + b = str(base or "").strip().upper() + q = str(quote or "USDT").strip().upper() + return f"{b}/{q}" if b else "" + + def _hub_symbol_from_market(market: dict | None, fallback_symbol: str) -> str: if market: base = str(market.get("base") or "").strip().upper() @@ -100,29 +105,73 @@ def _hub_symbol_from_market(market: dict | None, fallback_symbol: str) -> str: return f"{base}/USDT" if base else fb -def _quote_volume_from_ticker(ticker: dict | None, market: dict | None) -> float | None: +def _okx_turnover_usdt(row: dict | None) -> float | None: + """OKX SWAP:成交额(USDT) ≈ volCcy24h(基础币) × last。""" + if not isinstance(row, dict): + return None + base_vol = _safe_float(row.get("volCcy24h")) + if base_vol is None or base_vol <= 0: + return None + last = _safe_float(row.get("last") or row.get("lastPx")) + if last is None or last <= 0: + return None + return float(base_vol * last) + + +def _quote_volume_from_ticker( + ticker: dict | None, + market: dict | None, + *, + exchange_id: str = "", +) -> float | None: + ex_id = str(exchange_id or "").lower() t = ticker or {} + info = t.get("info") if isinstance(t.get("info"), dict) else {} + + if ex_id == "okx": + row = dict(info) + if row.get("last") is None: + row["last"] = t.get("last") + qv = _okx_turnover_usdt(row) + if qv is not None and qv > 0: + return qv + qv = _safe_float(t.get("quoteVolume")) if qv is not None and qv > 0: return qv - info = t.get("info") if isinstance(t.get("info"), dict) else {} - for key in ("quoteVolume", "volCcy24h", "vol24h", "turnover24h", "amount24"): + + if ex_id in ("gateio", "gate"): + for key in ( + "volume_24h_quote", + "volume_24h_settle", + "quote_volume", + "vol_24h", + "turnover", + ): + qv = _safe_float(info.get(key)) + if qv is not None and qv > 0: + return qv + + for key in ("quoteVolume", "volCcy24h", "vol24h", "turnover24h", "amount24", "turnover"): qv = _safe_float(info.get(key)) if qv is not None and qv > 0: + if key == "volCcy24h" and ex_id == "okx": + last = _safe_float(info.get("last") or info.get("lastPx") or t.get("last")) + if last: + return qv * last return qv + bv = _safe_float(t.get("baseVolume")) lp = _safe_float(t.get("last")) or _safe_float(t.get("close")) if bv is not None and lp is not None and bv > 0 and lp > 0: return bv * lp + if info: - bv = _safe_float(info.get("volCcy24h") or info.get("vol24h")) - lp = _safe_float(info.get("last") or info.get("lastPx")) + bv = _safe_float(info.get("volCcy24h") or info.get("vol24h") or info.get("volume")) + lp = _safe_float(info.get("last") or info.get("lastPx") or info.get("markPrice")) if bv is not None and lp is not None and bv > 0 and lp > 0: return bv * lp - if market and lp: - bv = _safe_float(t.get("baseVolume")) - if bv is not None and bv > 0: - return bv * lp + return None @@ -130,7 +179,7 @@ def _is_usdt_linear_swap(market: dict | None, symbol: str) -> bool: if not market: su = str(symbol or "").upper() return "USDT" in su and (":USDT" in su or "/USDT" in su or su.endswith("USDT")) - if not market.get("swap"): + if not market.get("swap") and market.get("type") not in ("swap", "future"): return False if str(market.get("quote") or "").upper() != "USDT": return False @@ -138,46 +187,209 @@ def _is_usdt_linear_swap(market: dict | None, symbol: str) -> bool: return False if market.get("active") is False: return False + settle = str(market.get("settle") or "").upper() + if settle and settle != "USDT": + return False return True +def _lookup_ticker(tickers: dict, sym: str, market: dict | None) -> dict | None: + if not tickers: + return None + t = tickers.get(sym) + if t: + return t + if not market: + return None + base = market.get("base") + quote = market.get("quote") or "USDT" + settle = market.get("settle") or quote + candidates = [ + sym, + f"{base}/{quote}:{settle}", + f"{base}/{quote}", + f"{base}{quote}", + market.get("id"), + ] + for key in candidates: + if not key: + continue + t = tickers.get(key) + if t: + return t + return None + + +def _merge_scores(scored: dict[str, tuple[str, float]]) -> list[tuple[str, str, float]]: + rows = [(sym, base, vol) for base, (sym, vol) in scored.items() if sym and base and vol > 0] + rows.sort(key=lambda x: x[2], reverse=True) + return rows + + +def _scores_from_okx(exchange) -> list[tuple[str, str, float]]: + by_base: dict[str, tuple[str, float]] = {} + if hasattr(exchange, "publicGetMarketTickers"): + try: + resp = exchange.publicGetMarketTickers({"instType": "SWAP"}) + for row in (resp or {}).get("data") or []: + if not isinstance(row, dict): + continue + inst = str(row.get("instId") or "").upper() + parts = inst.split("-") + if len(parts) < 3 or parts[-1] != "SWAP" or parts[1] != "USDT": + continue + base = parts[0].strip() + if not base: + continue + qv = _okx_turnover_usdt(row) + if qv is None or qv <= 0: + continue + sym = _hub_symbol_from_base(base) + prev = by_base.get(base) + if prev is None or qv > prev[1]: + by_base[base] = (sym, float(qv)) + if by_base: + return _merge_scores(by_base) + except Exception: + pass + + try: + tickers = exchange.fetch_tickers(params={"instType": "SWAP"}) + except Exception: + tickers = exchange.fetch_tickers() + return _scores_from_markets(exchange, tickers or {}, "okx") + + +def _scores_from_binance(exchange) -> list[tuple[str, str, float]]: + by_base: dict[str, tuple[str, float]] = {} + if hasattr(exchange, "fapiPublicGetTicker24hr"): + try: + rows = exchange.fapiPublicGetTicker24hr() + if isinstance(rows, list): + for row in rows: + if not isinstance(row, dict): + continue + raw = str(row.get("symbol") or "").upper() + if not raw.endswith("USDT"): + continue + base = raw[:-4] + if not base: + continue + qv = _safe_float(row.get("quoteVolume")) + if qv is None or qv <= 0: + bv = _safe_float(row.get("volume")) + lp = _safe_float(row.get("lastPrice") or row.get("weightedAvgPrice")) + if bv and lp: + qv = bv * lp + if qv is None or qv <= 0: + continue + sym = _hub_symbol_from_base(base) + prev = by_base.get(base) + if prev is None or qv > prev[1]: + by_base[base] = (sym, float(qv)) + if by_base: + return _merge_scores(by_base) + except Exception: + pass + tickers = exchange.fetch_tickers() + return _scores_from_markets(exchange, tickers or {}, "binance") + + +def _scores_from_gate(exchange) -> list[tuple[str, str, float]]: + by_base: dict[str, tuple[str, float]] = {} + for method_name in ("publicFuturesGetSettleTickers", "publicFuturesGetUsdtTickers"): + fn = getattr(exchange, method_name, None) + if not callable(fn): + continue + try: + rows = fn({"settle": "usdt"}) + if isinstance(rows, list): + for row in rows: + if not isinstance(row, dict): + continue + contract = str(row.get("contract") or row.get("name") or "").upper() + if not contract: + continue + base = contract.replace("_USDT", "").replace("USDT", "").strip("_") + if not base: + continue + qv = _safe_float(row.get("volume_24h_quote") or row.get("volume_24h_settle")) + if qv is None or qv <= 0: + bv = _safe_float(row.get("volume_24h_base")) + lp = _safe_float(row.get("last") or row.get("mark_price")) + if bv and lp: + qv = bv * lp + if qv is None or qv <= 0: + continue + sym = _hub_symbol_from_base(base) + prev = by_base.get(base) + if prev is None or qv > prev[1]: + by_base[base] = (sym, float(qv)) + if by_base: + return _merge_scores(by_base) + except Exception: + continue + tickers = exchange.fetch_tickers() + return _scores_from_markets(exchange, tickers or {}, "gateio") + + +def _scores_from_markets( + exchange, + tickers: dict, + exchange_id: str, +) -> list[tuple[str, str, float]]: + by_base: dict[str, tuple[str, float]] = {} + markets = getattr(exchange, "markets", None) or {} + for sym, mk in markets.items(): + try: + if not _is_usdt_linear_swap(mk, sym): + continue + ticker = _lookup_ticker(tickers, sym, mk) + qv = _quote_volume_from_ticker(ticker, mk, exchange_id=exchange_id) + if qv is None or qv <= 0: + continue + hub_sym = _hub_symbol_from_market(mk, sym) + base = _ticker_base(hub_sym) + if not base: + continue + prev = by_base.get(base) + if prev is None or qv > prev[1]: + by_base[base] = (hub_sym, float(qv)) + except Exception: + continue + return _merge_scores(by_base) + + +def _collect_scores(exchange, exchange_id: str) -> list[tuple[str, str, float]]: + ex_id = str(exchange_id or "").lower() + if ex_id == "okx": + return _scores_from_okx(exchange) + if ex_id == "binance": + return _scores_from_binance(exchange) + if ex_id in ("gateio", "gate", "gate_bot"): + return _scores_from_gate(exchange) + tickers = exchange.fetch_tickers() + return _scores_from_markets(exchange, tickers or {}, ex_id) + + def fetch_usdt_swap_volume_rank( exchange, ensure_markets_loaded: Callable[[], None], *, top_n: int = TOP_N_DEFAULT, rank_date: str | None = None, + exchange_id: str | None = None, ) -> dict[str, Any]: """从 ccxt 拉全市场 USDT 永续 ticker,按 24h 成交额(USDT) 取 Top N。""" top_n = max(1, min(int(top_n or TOP_N_DEFAULT), 100)) ensure_markets_loaded() - scored: list[tuple[str, str, float]] = [] - seen_bases: set[str] = set() + ex_id = str(exchange_id or getattr(exchange, "id", "") or "").lower() try: - tickers = exchange.fetch_tickers() + scored = _collect_scores(exchange, ex_id) except Exception as e: return {"ok": False, "msg": str(e)} - markets = getattr(exchange, "markets", None) or {} - for sym, ticker in (tickers or {}).items(): - try: - mk = markets.get(sym) if markets else None - if not _is_usdt_linear_swap(mk, sym): - continue - qv = _quote_volume_from_ticker(ticker, mk) - if qv is None or qv <= 0: - continue - hub_sym = _hub_symbol_from_market(mk, sym) - base = _ticker_base(hub_sym) - if not base or base in seen_bases: - continue - seen_bases.add(base) - scored.append((hub_sym, base, float(qv))) - except Exception: - continue - - scored.sort(key=lambda x: x[2], reverse=True) items = [] for idx, (hub_sym, base, qv) in enumerate(scored[:top_n], 1): items.append( @@ -193,6 +405,7 @@ def fetch_usdt_swap_volume_rank( "rank_date": rank_date or rank_date_label(), "items": items, "total_symbols": len(scored), + "exchange_id": ex_id, "fetched_at": datetime.now(volume_rank_timezone()).isoformat(timespec="seconds"), } @@ -218,6 +431,8 @@ def load_volume_rank_cache(path: Path | None = None) -> dict[str, Any]: data = json.loads(p.read_text(encoding="utf-8")) if not isinstance(data, dict): return {"version": CACHE_VERSION, "exchanges": {}} + if int(data.get("version") or 0) < CACHE_VERSION: + return {"version": CACHE_VERSION, "exchanges": {}} data.setdefault("version", CACHE_VERSION) data.setdefault("exchanges", {}) return data @@ -258,10 +473,15 @@ def merge_exchange_rank( def cache_needs_refresh(cache: dict[str, Any], *, expected_rank_date: str | None = None) -> bool: expected = expected_rank_date or rank_date_label() + if int(cache.get("version") or 0) < CACHE_VERSION: + return True if not cache.get("exchanges"): return True if str(cache.get("rank_date") or "") != expected: return True + for _key, row in (cache.get("exchanges") or {}).items(): + if not row or not row.get("items"): + return True return False diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 32d34d2..3566c26 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -381,14 +381,18 @@ def _refresh_volume_ranks(*, force: bool = False) -> dict: if not ex_key or not ex.get("enabled"): continue resp = _fetch_instance_volume_rank_sync(ex, top_n=TOP_N_DEFAULT) - if resp.get("ok"): + if resp.get("ok") and resp.get("items"): cache = merge_exchange_rank(cache, ex_key, resp) else: msg = str(resp.get("msg") or resp.get("error") or "拉取失败") + if resp.get("ok") and not resp.get("items"): + msg = msg if msg != "拉取失败" else "无有效成交额数据" errors.append(f"{ex_key}:{msg}") exchanges = dict(cache.get("exchanges") or {}) prev = dict(exchanges.get(ex_key) or {}) prev["error"] = msg + if not prev.get("items"): + prev["items"] = [] exchanges[ex_key] = prev cache["exchanges"] = exchanges cache["rank_date"] = expected diff --git a/manual_trading_hub/static/chart.js b/manual_trading_hub/static/chart.js index 98185ff..8131cb5 100644 --- a/manual_trading_hub/static/chart.js +++ b/manual_trading_hub/static/chart.js @@ -2621,14 +2621,23 @@ elVolRankList.innerHTML = ""; if (!data || !data.ok || !data.items || !data.items.length) { elVolRankMeta.textContent = - (data && data.msg) || "暂无排名数据(请稍后或检查实例 /api/hub/volume-rank)"; + (data && data.msg) || + "暂无排名数据(请 pm2 restart 四实例与 manual-trading-hub 后重试)"; return; } const resetHour = data.reset_hour != null ? data.reset_hour : 8; const rankDate = data.rank_date || "—"; const updated = data.updated_at || "—"; + const total = data.total_symbols != null ? data.total_symbols : ""; elVolRankMeta.textContent = - "昨日成交 Top20 · 交易日 " + rankDate + " · 每早 " + resetHour + ":00 更新 · " + updated; + "昨日成交 Top20 · 交易日 " + + rankDate + + " · 每早 " + + resetHour + + ":00 更新" + + (total ? " · 全市场 " + total + " 个" : "") + + " · " + + updated; const curSym = (elSymbol && elSymbol.value.trim().toUpperCase()) || ""; data.items.forEach(function (row) { const li = document.createElement("li"); diff --git a/tests/test_hub_volume_rank_lib.py b/tests/test_hub_volume_rank_lib.py index e11e622..8272352 100644 --- a/tests/test_hub_volume_rank_lib.py +++ b/tests/test_hub_volume_rank_lib.py @@ -1,6 +1,8 @@ from datetime import datetime from hub_volume_rank_lib import ( + CACHE_VERSION, + _okx_turnover_usdt, cache_needs_refresh, format_volume_quote, merge_exchange_rank, @@ -26,6 +28,11 @@ def test_format_volume_quote(): assert format_volume_quote(4500) == "4.50K" +def test_okx_turnover_usdt(): + qv = _okx_turnover_usdt({"volCcy24h": "100", "last": "50"}) + assert qv == 5000.0 + + def test_cache_needs_refresh_and_merge(): cache = {"rank_date": "2026-06-05", "exchanges": {}} assert cache_needs_refresh(cache, expected_rank_date="2026-06-07") is True @@ -41,3 +48,8 @@ def test_cache_needs_refresh_and_merge(): ) assert merged["exchanges"]["binance"]["items"][0]["symbol"] == "BTC/USDT" assert merged["rank_date"] == "2026-06-07" + + +def test_stale_cache_version_forces_refresh(): + cache = {"version": CACHE_VERSION - 1, "rank_date": "2026-06-07", "exchanges": {"okx": {"items": [{}]}}} + assert cache_needs_refresh(cache) is True