diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 6124380..da8c002 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -179,6 +179,7 @@ from order_monitor_display_lib import ( ) from wechat_notify_lib import build_wechat_rs_level_message, send_wechat_webhook from hub_auth import request_allowed as hub_request_allowed +from hub_volume_rank_lib import resolve_daily_volume_rank from history_window_lib import ( PRESET_CUSTOM, PRESET_UTC_LAST24H, @@ -416,6 +417,7 @@ ACCOUNT_BALANCE_CACHE = { } LIQUIDITY_RANK_CACHE = { "updated_at": 0.0, + "version": 0, "ranks": {}, "total": 0, } @@ -4552,67 +4554,19 @@ def _status_by_ema55(symbol, timeframe): def _daily_volume_rank(symbol): """ - 返回(symbol_rank, total_count),按 quoteVolume 降序,缺失时 fallback 到 baseVolume*last。 + 返回(symbol_rank, total_count),按 USDT 永续 24h 成交额降序。 + 走 hub_volume_rank_lib 轻量 ticker API,避免 fetch_tickers() 全市场拉取。 """ sym_norm = normalize_symbol_input(symbol) target_base = journal_coin_from_symbol(sym_norm) - - def _ticker_base(sym_text): - s = str(sym_text or "").upper().strip() - if ":" in s: - s = s.split(":", 1)[0] - if "/" in s: - return s.split("/", 1)[0].strip() - if "-" in s: - return s.split("-", 1)[0].strip() - if s.endswith("USDT"): - return s[:-4].strip() - return s - now_ts = time.time() - cached_ok = ( - LIQUIDITY_RANK_CACHE["updated_at"] - and now_ts - float(LIQUIDITY_RANK_CACHE["updated_at"]) < max(30, BALANCE_REFRESH_SECONDS) + return resolve_daily_volume_rank( + target_base, + LIQUIDITY_RANK_CACHE, + now_ts=time.time(), + ttl_sec=max(30, BALANCE_REFRESH_SECONDS), + exchange=exchange, + ensure_markets_loaded=ensure_markets_loaded, ) - if not cached_ok: - try: - ensure_markets_loaded() - tickers = exchange.fetch_tickers() - scored = [] - for s, t in (tickers or {}).items(): - try: - mk = exchange.markets.get(s) - if not mk or not mk.get("swap"): - continue - su = str(s).upper() - if "USDT" not in su: - continue - qv = _safe_float((t or {}).get("quoteVolume")) - if qv is None: - info = (t or {}).get("info") if isinstance((t or {}).get("info"), dict) else {} - qv = _safe_float(info.get("volCcy24h") or info.get("vol24h")) - if qv is None: - bv = _safe_float((t or {}).get("baseVolume")) - lp = _safe_float((t or {}).get("last")) - if bv is not None and lp is not None: - qv = bv * lp - if qv is None or qv <= 0: - continue - scored.append((_ticker_base(s), float(qv))) - except Exception: - continue - scored.sort(key=lambda x: x[1], reverse=True) - ranks = {} - for idx, (base, _) in enumerate(scored, 1): - if base and base not in ranks: - ranks[base] = idx - LIQUIDITY_RANK_CACHE["ranks"] = ranks - LIQUIDITY_RANK_CACHE["total"] = len(scored) - LIQUIDITY_RANK_CACHE["updated_at"] = now_ts - except Exception: - pass - ranks = LIQUIDITY_RANK_CACHE.get("ranks") or {} - total = int(LIQUIDITY_RANK_CACHE.get("total") or 0) - return ranks.get(target_base), total def _key_hard_checks(symbol, direction, upper, lower, monitor_type): diff --git a/crypto_monitor_gate/app.py b/crypto_monitor_gate/app.py index 21ec2d8..f4763e4 100644 --- a/crypto_monitor_gate/app.py +++ b/crypto_monitor_gate/app.py @@ -178,6 +178,7 @@ from order_monitor_display_lib import ( ) from wechat_notify_lib import build_wechat_rs_level_message, send_wechat_webhook from hub_auth import request_allowed as hub_request_allowed +from hub_volume_rank_lib import resolve_daily_volume_rank from history_window_lib import ( PRESET_CUSTOM, PRESET_UTC_LAST24H, @@ -402,6 +403,7 @@ ACCOUNT_BALANCE_CACHE = { } LIQUIDITY_RANK_CACHE = { "updated_at": 0.0, + "version": 0, "ranks": {}, "total": 0, } @@ -4284,67 +4286,19 @@ def _status_by_ema55(symbol, timeframe): def _daily_volume_rank(symbol): """ - 返回(symbol_rank, total_count),按 quoteVolume 降序,缺失时 fallback 到 baseVolume*last。 + 返回(symbol_rank, total_count),按 USDT 永续 24h 成交额降序。 + 走 hub_volume_rank_lib 轻量 ticker API,避免 fetch_tickers() 全市场拉取。 """ sym_norm = normalize_symbol_input(symbol) target_base = journal_coin_from_symbol(sym_norm) - - def _ticker_base(sym_text): - s = str(sym_text or "").upper().strip() - if ":" in s: - s = s.split(":", 1)[0] - if "/" in s: - return s.split("/", 1)[0].strip() - if "-" in s: - return s.split("-", 1)[0].strip() - if s.endswith("USDT"): - return s[:-4].strip() - return s - now_ts = time.time() - cached_ok = ( - LIQUIDITY_RANK_CACHE["updated_at"] - and now_ts - float(LIQUIDITY_RANK_CACHE["updated_at"]) < max(30, BALANCE_REFRESH_SECONDS) + return resolve_daily_volume_rank( + target_base, + LIQUIDITY_RANK_CACHE, + now_ts=time.time(), + ttl_sec=max(30, BALANCE_REFRESH_SECONDS), + exchange=exchange, + ensure_markets_loaded=ensure_markets_loaded, ) - if not cached_ok: - try: - ensure_markets_loaded() - tickers = exchange.fetch_tickers() - scored = [] - for s, t in (tickers or {}).items(): - try: - mk = exchange.markets.get(s) - if not mk or not mk.get("swap"): - continue - su = str(s).upper() - if "USDT" not in su: - continue - qv = _safe_float((t or {}).get("quoteVolume")) - if qv is None: - info = (t or {}).get("info") if isinstance((t or {}).get("info"), dict) else {} - qv = _safe_float(info.get("volCcy24h") or info.get("vol24h")) - if qv is None: - bv = _safe_float((t or {}).get("baseVolume")) - lp = _safe_float((t or {}).get("last")) - if bv is not None and lp is not None: - qv = bv * lp - if qv is None or qv <= 0: - continue - scored.append((_ticker_base(s), float(qv))) - except Exception: - continue - scored.sort(key=lambda x: x[1], reverse=True) - ranks = {} - for idx, (base, _) in enumerate(scored, 1): - if base and base not in ranks: - ranks[base] = idx - LIQUIDITY_RANK_CACHE["ranks"] = ranks - LIQUIDITY_RANK_CACHE["total"] = len(scored) - LIQUIDITY_RANK_CACHE["updated_at"] = now_ts - except Exception: - pass - ranks = LIQUIDITY_RANK_CACHE.get("ranks") or {} - total = int(LIQUIDITY_RANK_CACHE.get("total") or 0) - return ranks.get(target_base), total def _key_hard_checks(symbol, direction, upper, lower, monitor_type): diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index e479c64..6162fa5 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -178,6 +178,7 @@ from order_monitor_display_lib import ( ) from wechat_notify_lib import build_wechat_rs_level_message, send_wechat_webhook from hub_auth import request_allowed as hub_request_allowed +from hub_volume_rank_lib import resolve_daily_volume_rank from history_window_lib import ( PRESET_CUSTOM, PRESET_UTC_LAST24H, @@ -402,6 +403,7 @@ ACCOUNT_BALANCE_CACHE = { } LIQUIDITY_RANK_CACHE = { "updated_at": 0.0, + "version": 0, "ranks": {}, "total": 0, } @@ -4284,67 +4286,19 @@ def _status_by_ema55(symbol, timeframe): def _daily_volume_rank(symbol): """ - 返回(symbol_rank, total_count),按 quoteVolume 降序,缺失时 fallback 到 baseVolume*last。 + 返回(symbol_rank, total_count),按 USDT 永续 24h 成交额降序。 + 走 hub_volume_rank_lib 轻量 ticker API,避免 fetch_tickers() 全市场拉取。 """ sym_norm = normalize_symbol_input(symbol) target_base = journal_coin_from_symbol(sym_norm) - - def _ticker_base(sym_text): - s = str(sym_text or "").upper().strip() - if ":" in s: - s = s.split(":", 1)[0] - if "/" in s: - return s.split("/", 1)[0].strip() - if "-" in s: - return s.split("-", 1)[0].strip() - if s.endswith("USDT"): - return s[:-4].strip() - return s - now_ts = time.time() - cached_ok = ( - LIQUIDITY_RANK_CACHE["updated_at"] - and now_ts - float(LIQUIDITY_RANK_CACHE["updated_at"]) < max(30, BALANCE_REFRESH_SECONDS) + return resolve_daily_volume_rank( + target_base, + LIQUIDITY_RANK_CACHE, + now_ts=time.time(), + ttl_sec=max(30, BALANCE_REFRESH_SECONDS), + exchange=exchange, + ensure_markets_loaded=ensure_markets_loaded, ) - if not cached_ok: - try: - ensure_markets_loaded() - tickers = exchange.fetch_tickers() - scored = [] - for s, t in (tickers or {}).items(): - try: - mk = exchange.markets.get(s) - if not mk or not mk.get("swap"): - continue - su = str(s).upper() - if "USDT" not in su: - continue - qv = _safe_float((t or {}).get("quoteVolume")) - if qv is None: - info = (t or {}).get("info") if isinstance((t or {}).get("info"), dict) else {} - qv = _safe_float(info.get("volCcy24h") or info.get("vol24h")) - if qv is None: - bv = _safe_float((t or {}).get("baseVolume")) - lp = _safe_float((t or {}).get("last")) - if bv is not None and lp is not None: - qv = bv * lp - if qv is None or qv <= 0: - continue - scored.append((_ticker_base(s), float(qv))) - except Exception: - continue - scored.sort(key=lambda x: x[1], reverse=True) - ranks = {} - for idx, (base, _) in enumerate(scored, 1): - if base and base not in ranks: - ranks[base] = idx - LIQUIDITY_RANK_CACHE["ranks"] = ranks - LIQUIDITY_RANK_CACHE["total"] = len(scored) - LIQUIDITY_RANK_CACHE["updated_at"] = now_ts - except Exception: - pass - ranks = LIQUIDITY_RANK_CACHE.get("ranks") or {} - total = int(LIQUIDITY_RANK_CACHE.get("total") or 0) - return ranks.get(target_base), total def _key_hard_checks(symbol, direction, upper, lower, monitor_type): diff --git a/crypto_monitor_okx/app.py b/crypto_monitor_okx/app.py index c936b6c..39918a6 100644 --- a/crypto_monitor_okx/app.py +++ b/crypto_monitor_okx/app.py @@ -177,6 +177,7 @@ from order_monitor_display_lib import ( ) from wechat_notify_lib import build_wechat_rs_level_message, send_wechat_webhook from hub_auth import request_allowed as hub_request_allowed +from hub_volume_rank_lib import resolve_daily_volume_rank from history_window_lib import ( PRESET_CUSTOM, PRESET_UTC_LAST24H, @@ -404,7 +405,6 @@ LIQUIDITY_RANK_CACHE = { "updated_at": 0.0, "version": 0, "ranks": {}, - "volumes": {}, "total": 0, } @@ -3900,137 +3900,20 @@ def _status_by_ema55(symbol, timeframe): return "横盘", None, None -def _okx_swap_turnover_usdt(ticker_row: dict) -> float | None: - """ - OKX 永续 24h 成交额(USDT)。 - 官方文档:SWAP 的 volCcy24h 单位是「基础币数量」(如 ZEC),不是 USDT; - 网页「24小时成交额」≈ volCcy24h × last。 - """ - if not isinstance(ticker_row, dict): - return None - base_vol = _safe_float(ticker_row.get("volCcy24h")) - if base_vol is None or base_vol <= 0: - return None - last = _safe_float(ticker_row.get("last")) - if last is None or last <= 0: - return None - return float(base_vol * last) - - -def _okx_usdt_swap_volume_by_base() -> dict[str, float]: - """ - OKX USDT 永续全市场 24h 成交额(USDT),与 App「24小时成交额」一致。 - 走 /market/tickers?instType=SWAP,成交额 = volCcy24h(基础币) × last。 - """ - by_base: dict[str, float] = {} - try: - ensure_markets_loaded() - if hasattr(exchange, "publicGetMarketTickers"): - resp = exchange.publicGetMarketTickers({"instType": "SWAP"}) - rows = (resp or {}).get("data") or [] - for row in rows: - if not isinstance(row, dict): - continue - inst = str(row.get("instId") or "").upper() - # 仅 USDT 本位永续:BTC-USDT-SWAP - parts = inst.split("-") - if len(parts) < 3 or parts[-1] != "SWAP" or parts[1] != "USDT": - continue - base = parts[0].strip() - if not base: - continue - qv = _okx_swap_turnover_usdt(row) - if qv is None or qv <= 0: - continue - by_base[base] = max(by_base.get(base, 0), float(qv)) - if by_base: - return by_base - except Exception: - pass - - def _ticker_base(sym_text): - s = str(sym_text or "").upper().strip() - if ":" in s: - s = s.split(":", 1)[0] - if "/" in s: - return s.split("/", 1)[0].strip() - if "-" in s: - return s.split("-", 1)[0].strip() - if s.endswith("USDT"): - return s[:-4].strip() - return s - - try: - ensure_markets_loaded() - try: - tickers = exchange.fetch_tickers(params={"instType": "SWAP"}) - except Exception: - tickers = exchange.fetch_tickers() - for s, t in (tickers or {}).items(): - try: - mk = exchange.markets.get(s) if exchange.markets else None - if mk is not None: - if not mk.get("swap"): - continue - if str(mk.get("quote") or "").upper() != "USDT": - continue - if mk.get("linear") is False: - continue - if mk.get("active") is False: - continue - else: - su = str(s).upper() - if "USDT" not in su or ":USDT" not in su: - continue - base = _ticker_base(s) - if not base: - continue - info = (t or {}).get("info") if isinstance((t or {}).get("info"), dict) else {} - row = dict(info) - if row.get("last") is None: - row["last"] = (t or {}).get("last") - qv = _okx_swap_turnover_usdt(row) - if qv is None: - qv = _safe_float((t or {}).get("quoteVolume")) - if qv is None or qv <= 0: - continue - by_base[base] = max(by_base.get(base, 0), float(qv)) - except Exception: - continue - except Exception: - pass - return by_base - - def _daily_volume_rank(symbol): """ - 返回(symbol_rank, total_count):OKX USDT 永续 24h 成交额(USDT) 在全市场币种中的排名(非「本月」)。 - 成交额 = volCcy24h(基础币) × last,与 OKX App「24小时成交额」一致。 + 返回(symbol_rank, total_count):OKX USDT 永续 24h 成交额(USDT) 在全市场币种中的排名。 """ sym_norm = normalize_symbol_input(symbol) target_base = journal_coin_from_symbol(sym_norm) - now_ts = time.time() - cache_ver = 4 - cached_ok = ( - LIQUIDITY_RANK_CACHE.get("version") == cache_ver - and LIQUIDITY_RANK_CACHE["updated_at"] - and now_ts - float(LIQUIDITY_RANK_CACHE["updated_at"]) < max(30, BALANCE_REFRESH_SECONDS) + return resolve_daily_volume_rank( + target_base, + LIQUIDITY_RANK_CACHE, + now_ts=time.time(), + ttl_sec=max(30, BALANCE_REFRESH_SECONDS), + exchange=exchange, + ensure_markets_loaded=ensure_markets_loaded, ) - if not cached_ok: - try: - by_base = _okx_usdt_swap_volume_by_base() - scored = sorted(by_base.items(), key=lambda x: x[1], reverse=True) - ranks = {base: idx for idx, (base, _) in enumerate(scored, 1)} - LIQUIDITY_RANK_CACHE["ranks"] = ranks - LIQUIDITY_RANK_CACHE["volumes"] = dict(by_base) - LIQUIDITY_RANK_CACHE["total"] = len(scored) - LIQUIDITY_RANK_CACHE["version"] = cache_ver - LIQUIDITY_RANK_CACHE["updated_at"] = now_ts - except Exception: - pass - ranks = LIQUIDITY_RANK_CACHE.get("ranks") or {} - total = int(LIQUIDITY_RANK_CACHE.get("total") or 0) - return ranks.get(target_base), total def _key_hard_checks(symbol, direction, upper, lower, monitor_type): diff --git a/hub_volume_rank_lib.py b/hub_volume_rank_lib.py index 7df0ac1..e5b05d7 100644 --- a/hub_volume_rank_lib.py +++ b/hub_volume_rank_lib.py @@ -13,6 +13,7 @@ from hub_trades_lib import trading_day_from_dt TOP_N_DEFAULT = 20 CACHE_VERSION = 3 +LIQUIDITY_RANK_CACHE_VERSION = 1 def volume_rank_reset_hour() -> int: @@ -372,6 +373,62 @@ def _collect_scores(exchange, exchange_id: str) -> list[tuple[str, str, float]]: return _scores_from_markets(exchange, tickers or {}, ex_id) +def build_usdt_swap_volume_ranks( + exchange, + ensure_markets_loaded: Callable[[], None], + *, + exchange_id: str | None = None, +) -> tuple[dict[str, int], int]: + """ + 全市场 USDT 永续 24h 成交额排名(base -> rank)。 + 优先各所轻量 ticker API,避免 fetch_tickers() 拉全市场(Gate/Binance 内存优化)。 + """ + ensure_markets_loaded() + ex_id = str(exchange_id or getattr(exchange, "id", "") or "").lower() + scored = _collect_scores(exchange, ex_id) + ranks: dict[str, int] = {} + for idx, (_sym, base, _qv) in enumerate(scored, 1): + if base and base not in ranks: + ranks[base] = idx + return ranks, len(scored) + + +def resolve_daily_volume_rank( + target_base: str, + cache: dict[str, Any], + *, + now_ts: float, + ttl_sec: float, + exchange, + ensure_markets_loaded: Callable[[], None], + exchange_id: str | None = None, + cache_version: int = LIQUIDITY_RANK_CACHE_VERSION, +) -> tuple[int | None, int]: + """关键位门控:按 base 查 24h 成交额全市场排名;cache 带 TTL。""" + cached_ok = ( + cache.get("version") == cache_version + and cache.get("updated_at") + and now_ts - float(cache["updated_at"]) < ttl_sec + ) + if not cached_ok: + try: + ranks, total = build_usdt_swap_volume_ranks( + exchange, + ensure_markets_loaded, + exchange_id=exchange_id, + ) + cache["ranks"] = ranks + cache["total"] = total + cache["version"] = cache_version + cache["updated_at"] = now_ts + except Exception: + pass + ranks = cache.get("ranks") or {} + total = int(cache.get("total") or 0) + base = str(target_base or "").strip().upper() + return ranks.get(base), total + + def fetch_usdt_swap_volume_rank( exchange, ensure_markets_loaded: Callable[[], None], diff --git a/tests/test_hub_volume_rank_lib.py b/tests/test_hub_volume_rank_lib.py index cf03577..f363131 100644 --- a/tests/test_hub_volume_rank_lib.py +++ b/tests/test_hub_volume_rank_lib.py @@ -1,14 +1,20 @@ from datetime import datetime +from unittest.mock import MagicMock from hub_volume_rank_lib import ( CACHE_VERSION, + LIQUIDITY_RANK_CACHE_VERSION, TOP_N_DEFAULT, _exchange_rank_row_stale, _okx_turnover_usdt, + _scores_from_binance, + _scores_from_gate, + build_usdt_swap_volume_ranks, cache_needs_refresh, format_volume_quote, merge_exchange_rank, rank_date_label, + resolve_daily_volume_rank, ) @@ -63,3 +69,73 @@ def test_short_item_list_is_stale(): assert _exchange_rank_row_stale(row) is True full = {"items": items + [{"rank": i, "symbol": f"X{i}/USDT"} for i in range(13, TOP_N_DEFAULT + 1)], "total_symbols": 300} assert _exchange_rank_row_stale(full) is False + + +def test_scores_from_binance_uses_fapi_lightweight_api(): + ex = MagicMock() + ex.id = "binance" + ex.fapiPublicGetTicker24hr.return_value = [ + {"symbol": "BTCUSDT", "quoteVolume": "9000000"}, + {"symbol": "ETHUSDT", "quoteVolume": "5000000"}, + ] + scored = _scores_from_binance(ex) + assert scored[0][1] == "BTC" + assert scored[0][2] == 9000000.0 + ex.fetch_tickers.assert_not_called() + + +def test_scores_from_gate_uses_futures_tickers_api(): + ex = MagicMock() + ex.id = "gateio" + ex.publicFuturesGetSettleTickers.return_value = [ + {"contract": "BTC_USDT", "volume_24h_quote": "8000000"}, + {"contract": "ETH_USDT", "volume_24h_quote": "4000000"}, + ] + scored = _scores_from_gate(ex) + assert scored[0][1] == "BTC" + ex.fetch_tickers.assert_not_called() + + +def test_resolve_daily_volume_rank_caches_result(): + cache = {"version": 0, "updated_at": 0.0, "ranks": {}, "total": 0} + ex = MagicMock() + ex.id = "binance" + ex.fapiPublicGetTicker24hr.return_value = [ + {"symbol": "BTCUSDT", "quoteVolume": "100"}, + {"symbol": "ETHUSDT", "quoteVolume": "50"}, + ] + + rank, total = resolve_daily_volume_rank( + "BTC", + cache, + now_ts=1000.0, + ttl_sec=60.0, + exchange=ex, + ensure_markets_loaded=lambda: None, + ) + assert rank == 1 + assert total == 2 + assert cache["version"] == LIQUIDITY_RANK_CACHE_VERSION + calls = ex.fapiPublicGetTicker24hr.call_count + + rank2, _ = resolve_daily_volume_rank( + "BTC", + cache, + now_ts=1010.0, + ttl_sec=60.0, + exchange=ex, + ensure_markets_loaded=lambda: None, + ) + assert rank2 == 1 + assert ex.fapiPublicGetTicker24hr.call_count == calls + + +def test_build_usdt_swap_volume_ranks(): + ex = MagicMock() + ex.id = "binance" + ex.fapiPublicGetTicker24hr.return_value = [ + {"symbol": "SOLUSDT", "quoteVolume": "200"}, + ] + ranks, total = build_usdt_swap_volume_ranks(ex, lambda: None) + assert ranks["SOL"] == 1 + assert total == 1