"""行情区:各交易所 USDT 永续昨日成交额 Top N(每日 8:00 快照)。""" from __future__ import annotations import json import os from datetime import datetime, timedelta from pathlib import Path from typing import Any, Callable from zoneinfo import ZoneInfo from hub_trades_lib import trading_day_from_dt TOP_N_DEFAULT = 20 CACHE_VERSION = 3 LIQUIDITY_RANK_CACHE_VERSION = 1 def volume_rank_reset_hour() -> int: try: return max(0, min(23, int(os.getenv("HUB_VOLUME_RANK_RESET_HOUR", "8")))) except ValueError: return 8 def volume_rank_timezone() -> ZoneInfo: name = (os.getenv("HUB_VOLUME_RANK_TZ") or "Asia/Shanghai").strip() or "Asia/Shanghai" try: return ZoneInfo(name) except Exception: return ZoneInfo("Asia/Shanghai") def rank_date_label(*, now: datetime | None = None, reset_hour: int | None = None) -> str: """8 点更新后展示的「昨日」交易日(与 TRADING_DAY_RESET_HOUR 口径一致)。""" rh = volume_rank_reset_hour() if reset_hour is None else reset_hour tz = volume_rank_timezone() dt = now.astimezone(tz) if now else datetime.now(tz) cur_td = trading_day_from_dt(dt.replace(tzinfo=None), rh) cur = datetime.strptime(cur_td, "%Y-%m-%d").date() return (cur - timedelta(days=1)).isoformat() def seconds_until_next_reset( *, now: datetime | None = None, reset_hour: int | None = None, ) -> float: rh = volume_rank_reset_hour() if reset_hour is None else reset_hour tz = volume_rank_timezone() dt = now.astimezone(tz) if now else datetime.now(tz) nxt = dt.replace(hour=rh, minute=0, second=0, microsecond=0) if dt >= nxt: nxt += timedelta(days=1) return max(1.0, (nxt - dt).total_seconds()) def default_cache_path() -> Path: raw = (os.getenv("HUB_VOLUME_RANK_CACHE_PATH") or "").strip() if raw: return Path(raw) hub_dir = Path(__file__).resolve().parent / "manual_trading_hub" / "data" hub_dir.mkdir(parents=True, exist_ok=True) return hub_dir / "hub_volume_rank.json" def _safe_float(v: Any) -> float | None: try: n = float(v) return n if n == n else None except (TypeError, ValueError): return None def _ticker_base(sym_text: str) -> str: s = str(sym_text or "").upper().strip() if ":" in s: s = s.split(":", 1)[0] if "/" in s: return s.split("/", 1)[0].strip() if "-" in s: return s.split("-", 1)[0].strip() if s.endswith("USDT"): return s[:-4].strip() return s def _hub_symbol_from_base(base: str, quote: str = "USDT") -> str: b = str(base or "").strip().upper() q = str(quote or "USDT").strip().upper() return f"{b}/{q}" if b else "" def _hub_symbol_from_market(market: dict | None, fallback_symbol: str) -> str: if market: base = str(market.get("base") or "").strip().upper() quote = str(market.get("quote") or "USDT").strip().upper() if base: return f"{base}/{quote}" fb = str(fallback_symbol or "").upper().strip() if ":" in fb: fb = fb.split(":", 1)[0] if "/" in fb: return fb base = _ticker_base(fb) return f"{base}/USDT" if base else fb def _okx_turnover_usdt(row: dict | None) -> float | None: """OKX SWAP:成交额(USDT) ≈ volCcy24h(基础币) × last。""" if not isinstance(row, dict): return None base_vol = _safe_float(row.get("volCcy24h")) if base_vol is None or base_vol <= 0: return None last = _safe_float(row.get("last") or row.get("lastPx")) if last is None or last <= 0: return None return float(base_vol * last) def _quote_volume_from_ticker( ticker: dict | None, market: dict | None, *, exchange_id: str = "", ) -> float | None: ex_id = str(exchange_id or "").lower() t = ticker or {} info = t.get("info") if isinstance(t.get("info"), dict) else {} if ex_id == "okx": row = dict(info) if row.get("last") is None: row["last"] = t.get("last") qv = _okx_turnover_usdt(row) if qv is not None and qv > 0: return qv qv = _safe_float(t.get("quoteVolume")) if qv is not None and qv > 0: return qv if ex_id in ("gateio", "gate"): for key in ( "volume_24h_quote", "volume_24h_settle", "quote_volume", "vol_24h", "turnover", ): qv = _safe_float(info.get(key)) if qv is not None and qv > 0: return qv for key in ("quoteVolume", "volCcy24h", "vol24h", "turnover24h", "amount24", "turnover"): qv = _safe_float(info.get(key)) if qv is not None and qv > 0: if key == "volCcy24h" and ex_id == "okx": last = _safe_float(info.get("last") or info.get("lastPx") or t.get("last")) if last: return qv * last return qv bv = _safe_float(t.get("baseVolume")) lp = _safe_float(t.get("last")) or _safe_float(t.get("close")) if bv is not None and lp is not None and bv > 0 and lp > 0: return bv * lp if info: bv = _safe_float(info.get("volCcy24h") or info.get("vol24h") or info.get("volume")) lp = _safe_float(info.get("last") or info.get("lastPx") or info.get("markPrice")) if bv is not None and lp is not None and bv > 0 and lp > 0: return bv * lp return None def _is_usdt_linear_swap(market: dict | None, symbol: str) -> bool: if not market: su = str(symbol or "").upper() return "USDT" in su and (":USDT" in su or "/USDT" in su or su.endswith("USDT")) if not market.get("swap") and market.get("type") not in ("swap", "future"): return False if str(market.get("quote") or "").upper() != "USDT": return False if market.get("linear") is False: return False if market.get("active") is False: return False settle = str(market.get("settle") or "").upper() if settle and settle != "USDT": return False return True def _lookup_ticker(tickers: dict, sym: str, market: dict | None) -> dict | None: if not tickers: return None t = tickers.get(sym) if t: return t if not market: return None base = market.get("base") quote = market.get("quote") or "USDT" settle = market.get("settle") or quote candidates = [ sym, f"{base}/{quote}:{settle}", f"{base}/{quote}", f"{base}{quote}", market.get("id"), ] for key in candidates: if not key: continue t = tickers.get(key) if t: return t return None def _merge_scores(scored: dict[str, tuple[str, float]]) -> list[tuple[str, str, float]]: rows = [(sym, base, vol) for base, (sym, vol) in scored.items() if sym and base and vol > 0] rows.sort(key=lambda x: x[2], reverse=True) return rows def _scores_from_okx(exchange) -> list[tuple[str, str, float]]: by_base: dict[str, tuple[str, float]] = {} if hasattr(exchange, "publicGetMarketTickers"): try: resp = exchange.publicGetMarketTickers({"instType": "SWAP"}) for row in (resp or {}).get("data") or []: if not isinstance(row, dict): continue inst = str(row.get("instId") or "").upper() parts = inst.split("-") if len(parts) < 3 or parts[-1] != "SWAP" or parts[1] != "USDT": continue base = parts[0].strip() if not base: continue qv = _okx_turnover_usdt(row) if qv is None or qv <= 0: continue sym = _hub_symbol_from_base(base) prev = by_base.get(base) if prev is None or qv > prev[1]: by_base[base] = (sym, float(qv)) if by_base: return _merge_scores(by_base) except Exception: pass try: tickers = exchange.fetch_tickers(params={"instType": "SWAP"}) except Exception: tickers = exchange.fetch_tickers() return _scores_from_markets(exchange, tickers or {}, "okx") def _scores_from_binance(exchange) -> list[tuple[str, str, float]]: by_base: dict[str, tuple[str, float]] = {} if hasattr(exchange, "fapiPublicGetTicker24hr"): try: rows = exchange.fapiPublicGetTicker24hr() if isinstance(rows, list): for row in rows: if not isinstance(row, dict): continue raw = str(row.get("symbol") or "").upper() if not raw.endswith("USDT"): continue base = raw[:-4] if not base: continue qv = _safe_float(row.get("quoteVolume")) if qv is None or qv <= 0: bv = _safe_float(row.get("volume")) lp = _safe_float(row.get("lastPrice") or row.get("weightedAvgPrice")) if bv and lp: qv = bv * lp if qv is None or qv <= 0: continue sym = _hub_symbol_from_base(base) prev = by_base.get(base) if prev is None or qv > prev[1]: by_base[base] = (sym, float(qv)) if by_base: return _merge_scores(by_base) except Exception: pass return [] def _scores_from_gate(exchange) -> list[tuple[str, str, float]]: by_base: dict[str, tuple[str, float]] = {} for method_name in ("publicFuturesGetSettleTickers", "publicFuturesGetUsdtTickers"): fn = getattr(exchange, method_name, None) if not callable(fn): continue try: rows = fn({"settle": "usdt"}) if isinstance(rows, list): for row in rows: if not isinstance(row, dict): continue contract = str(row.get("contract") or row.get("name") or "").upper() if not contract: continue base = contract.replace("_USDT", "").replace("USDT", "").strip("_") if not base: continue qv = _safe_float(row.get("volume_24h_quote") or row.get("volume_24h_settle")) if qv is None or qv <= 0: bv = _safe_float(row.get("volume_24h_base")) lp = _safe_float(row.get("last") or row.get("mark_price")) if bv and lp: qv = bv * lp if qv is None or qv <= 0: continue sym = _hub_symbol_from_base(base) prev = by_base.get(base) if prev is None or qv > prev[1]: by_base[base] = (sym, float(qv)) if by_base: return _merge_scores(by_base) except Exception: continue return [] def _scores_from_markets( exchange, tickers: dict, exchange_id: str, ) -> list[tuple[str, str, float]]: by_base: dict[str, tuple[str, float]] = {} markets = getattr(exchange, "markets", None) or {} for sym, mk in markets.items(): try: if not _is_usdt_linear_swap(mk, sym): continue ticker = _lookup_ticker(tickers, sym, mk) qv = _quote_volume_from_ticker(ticker, mk, exchange_id=exchange_id) if qv is None or qv <= 0: continue hub_sym = _hub_symbol_from_market(mk, sym) base = _ticker_base(hub_sym) if not base: continue prev = by_base.get(base) if prev is None or qv > prev[1]: by_base[base] = (hub_sym, float(qv)) except Exception: continue return _merge_scores(by_base) def _collect_scores(exchange, exchange_id: str) -> list[tuple[str, str, float]]: ex_id = str(exchange_id or "").lower() if ex_id == "okx": return _scores_from_okx(exchange) if ex_id == "binance": return _scores_from_binance(exchange) if ex_id in ("gateio", "gate", "gate_bot"): return _scores_from_gate(exchange) tickers = exchange.fetch_tickers() return _scores_from_markets(exchange, tickers or {}, ex_id) def _uses_lightweight_volume_scores(exchange_id: str) -> bool: ex_id = str(exchange_id or "").lower() return ex_id in ("okx", "binance", "gateio", "gate", "gate_bot") def build_usdt_swap_volume_ranks( exchange, ensure_markets_loaded: Callable[[], None], *, exchange_id: str | None = None, ) -> tuple[dict[str, int], int]: """ 全市场 USDT 永续 24h 成交额排名(base -> rank)。 优先各所轻量 ticker API,避免 fetch_tickers() 拉全市场(Gate/Binance 内存优化)。 """ ex_id = str(exchange_id or getattr(exchange, "id", "") or "").lower() if not _uses_lightweight_volume_scores(ex_id): ensure_markets_loaded() scored = _collect_scores(exchange, ex_id) ranks: dict[str, int] = {} for idx, (_sym, base, _qv) in enumerate(scored, 1): if base and base not in ranks: ranks[base] = idx return ranks, len(scored) def resolve_daily_volume_rank( target_base: str, cache: dict[str, Any], *, now_ts: float, ttl_sec: float, exchange, ensure_markets_loaded: Callable[[], None], exchange_id: str | None = None, cache_version: int = LIQUIDITY_RANK_CACHE_VERSION, ) -> tuple[int | None, int]: """关键位门控:按 base 查 24h 成交额全市场排名;cache 带 TTL。""" cached_ok = ( cache.get("version") == cache_version and cache.get("updated_at") and now_ts - float(cache["updated_at"]) < ttl_sec ) if not cached_ok: try: ranks, total = build_usdt_swap_volume_ranks( exchange, ensure_markets_loaded, exchange_id=exchange_id, ) if total > 0 and ranks: cache["ranks"] = ranks cache["total"] = total cache["version"] = cache_version cache["updated_at"] = now_ts except Exception: pass ranks = cache.get("ranks") or {} total = int(cache.get("total") or 0) base = str(target_base or "").strip().upper() return ranks.get(base), total def fetch_usdt_swap_volume_rank( exchange, ensure_markets_loaded: Callable[[], None], *, top_n: int = TOP_N_DEFAULT, rank_date: str | None = None, exchange_id: str | None = None, ) -> dict[str, Any]: """从 ccxt 拉全市场 USDT 永续 ticker,按 24h 成交额(USDT) 取 Top N。""" top_n = max(1, min(int(top_n or TOP_N_DEFAULT), 100)) ensure_markets_loaded() ex_id = str(exchange_id or getattr(exchange, "id", "") or "").lower() try: scored = _collect_scores(exchange, ex_id) except Exception as e: return {"ok": False, "msg": str(e)} items = [] for idx, (hub_sym, base, qv) in enumerate(scored[:top_n], 1): items.append( { "rank": idx, "symbol": hub_sym, "base": base, "volume_quote": round(qv, 4), } ) return { "ok": True, "rank_date": rank_date or rank_date_label(), "items": items, "total_symbols": len(scored), "exchange_id": ex_id, "fetched_at": datetime.now(volume_rank_timezone()).isoformat(timespec="seconds"), } def format_volume_quote(value: float | None) -> str: n = _safe_float(value) if n is None or n <= 0: return "—" if n >= 1e9: return f"{n / 1e9:.2f}B" if n >= 1e6: return f"{n / 1e6:.2f}M" if n >= 1e3: return f"{n / 1e3:.2f}K" return f"{n:.0f}" def load_volume_rank_cache(path: Path | None = None) -> dict[str, Any]: p = path or default_cache_path() if not p.is_file(): return {"version": CACHE_VERSION, "exchanges": {}} try: data = json.loads(p.read_text(encoding="utf-8")) if not isinstance(data, dict): return {"version": CACHE_VERSION, "exchanges": {}} if int(data.get("version") or 0) < CACHE_VERSION: return {"version": CACHE_VERSION, "exchanges": {}} data.setdefault("version", CACHE_VERSION) data.setdefault("exchanges", {}) return data except Exception: return {"version": CACHE_VERSION, "exchanges": {}} def save_volume_rank_cache(data: dict[str, Any], path: Path | None = None) -> None: p = path or default_cache_path() p.parent.mkdir(parents=True, exist_ok=True) payload = dict(data) payload["version"] = CACHE_VERSION payload["updated_at"] = datetime.now(volume_rank_timezone()).isoformat(timespec="seconds") p.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8") def merge_exchange_rank( cache: dict[str, Any], exchange_key: str, payload: dict[str, Any], ) -> dict[str, Any]: ex_k = str(exchange_key or "").strip().lower() if not ex_k or not payload.get("ok"): return cache exchanges = dict(cache.get("exchanges") or {}) exchanges[ex_k] = { "rank_date": payload.get("rank_date"), "items": payload.get("items") or [], "total_symbols": int(payload.get("total_symbols") or 0), "fetched_at": payload.get("fetched_at"), "error": None, } out = dict(cache) out["exchanges"] = exchanges out["rank_date"] = payload.get("rank_date") or cache.get("rank_date") return out def _exchange_rank_row_stale(row: dict[str, Any] | None) -> bool: if not row: return True items = row.get("items") or [] if len(items) < TOP_N_DEFAULT: return True total = int(row.get("total_symbols") or 0) if total > 0 and total < TOP_N_DEFAULT: return True return False def cache_needs_refresh( cache: dict[str, Any], *, expected_rank_date: str | None = None, required_keys: list[str] | None = None, ) -> bool: expected = expected_rank_date or rank_date_label() if int(cache.get("version") or 0) < CACHE_VERSION: return True exchanges = cache.get("exchanges") or {} if not exchanges: return True if str(cache.get("rank_date") or "") != expected: return True keys = required_keys or list(exchanges.keys()) if not keys: return True for key in keys: ex_k = str(key or "").strip().lower() if not ex_k: continue if _exchange_rank_row_stale(exchanges.get(ex_k)): return True return False def get_cached_rank( cache: dict[str, Any], exchange_key: str, *, top_n: int = TOP_N_DEFAULT, ) -> dict[str, Any]: ex_k = str(exchange_key or "").strip().lower() ex_data = (cache.get("exchanges") or {}).get(ex_k) or {} items = list(ex_data.get("items") or [])[: max(1, int(top_n))] stale = _exchange_rank_row_stale(ex_data) return { "ok": True, "exchange_key": ex_k, "rank_date": ex_data.get("rank_date") or cache.get("rank_date"), "updated_at": cache.get("updated_at"), "items": items, "item_count": len(items), "expected_count": int(top_n), "total_symbols": int(ex_data.get("total_symbols") or 0), "stale": stale, "error": ex_data.get("error"), }