Files
crypto_monitor/hub_volume_rank_lib.py
T
2026-06-21 09:24:27 +08:00

591 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""行情区:各交易所 USDT 永续昨日成交额 Top N(每日 8:00 快照)。"""
from __future__ import annotations
import json
import os
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any, Callable
from zoneinfo import ZoneInfo
from hub_trades_lib import trading_day_from_dt
TOP_N_DEFAULT = 20
CACHE_VERSION = 3
LIQUIDITY_RANK_CACHE_VERSION = 1
def volume_rank_reset_hour() -> int:
try:
return max(0, min(23, int(os.getenv("HUB_VOLUME_RANK_RESET_HOUR", "8"))))
except ValueError:
return 8
def volume_rank_timezone() -> ZoneInfo:
name = (os.getenv("HUB_VOLUME_RANK_TZ") or "Asia/Shanghai").strip() or "Asia/Shanghai"
try:
return ZoneInfo(name)
except Exception:
return ZoneInfo("Asia/Shanghai")
def rank_date_label(*, now: datetime | None = None, reset_hour: int | None = None) -> str:
"""8 点更新后展示的「昨日」交易日(与 TRADING_DAY_RESET_HOUR 口径一致)。"""
rh = volume_rank_reset_hour() if reset_hour is None else reset_hour
tz = volume_rank_timezone()
dt = now.astimezone(tz) if now else datetime.now(tz)
cur_td = trading_day_from_dt(dt.replace(tzinfo=None), rh)
cur = datetime.strptime(cur_td, "%Y-%m-%d").date()
return (cur - timedelta(days=1)).isoformat()
def seconds_until_next_reset(
*,
now: datetime | None = None,
reset_hour: int | None = None,
) -> float:
rh = volume_rank_reset_hour() if reset_hour is None else reset_hour
tz = volume_rank_timezone()
dt = now.astimezone(tz) if now else datetime.now(tz)
nxt = dt.replace(hour=rh, minute=0, second=0, microsecond=0)
if dt >= nxt:
nxt += timedelta(days=1)
return max(1.0, (nxt - dt).total_seconds())
def default_cache_path() -> Path:
raw = (os.getenv("HUB_VOLUME_RANK_CACHE_PATH") or "").strip()
if raw:
return Path(raw)
hub_dir = Path(__file__).resolve().parent / "manual_trading_hub" / "data"
hub_dir.mkdir(parents=True, exist_ok=True)
return hub_dir / "hub_volume_rank.json"
def _safe_float(v: Any) -> float | None:
try:
n = float(v)
return n if n == n else None
except (TypeError, ValueError):
return None
def _ticker_base(sym_text: str) -> str:
s = str(sym_text or "").upper().strip()
if ":" in s:
s = s.split(":", 1)[0]
if "/" in s:
return s.split("/", 1)[0].strip()
if "-" in s:
return s.split("-", 1)[0].strip()
if s.endswith("USDT"):
return s[:-4].strip()
return s
def _hub_symbol_from_base(base: str, quote: str = "USDT") -> str:
b = str(base or "").strip().upper()
q = str(quote or "USDT").strip().upper()
return f"{b}/{q}" if b else ""
def _hub_symbol_from_market(market: dict | None, fallback_symbol: str) -> str:
if market:
base = str(market.get("base") or "").strip().upper()
quote = str(market.get("quote") or "USDT").strip().upper()
if base:
return f"{base}/{quote}"
fb = str(fallback_symbol or "").upper().strip()
if ":" in fb:
fb = fb.split(":", 1)[0]
if "/" in fb:
return fb
base = _ticker_base(fb)
return f"{base}/USDT" if base else fb
def _okx_turnover_usdt(row: dict | None) -> float | None:
"""OKX SWAP:成交额(USDT) ≈ volCcy24h(基础币) × last。"""
if not isinstance(row, dict):
return None
base_vol = _safe_float(row.get("volCcy24h"))
if base_vol is None or base_vol <= 0:
return None
last = _safe_float(row.get("last") or row.get("lastPx"))
if last is None or last <= 0:
return None
return float(base_vol * last)
def _quote_volume_from_ticker(
ticker: dict | None,
market: dict | None,
*,
exchange_id: str = "",
) -> float | None:
ex_id = str(exchange_id or "").lower()
t = ticker or {}
info = t.get("info") if isinstance(t.get("info"), dict) else {}
if ex_id == "okx":
row = dict(info)
if row.get("last") is None:
row["last"] = t.get("last")
qv = _okx_turnover_usdt(row)
if qv is not None and qv > 0:
return qv
qv = _safe_float(t.get("quoteVolume"))
if qv is not None and qv > 0:
return qv
if ex_id in ("gateio", "gate"):
for key in (
"volume_24h_quote",
"volume_24h_settle",
"quote_volume",
"vol_24h",
"turnover",
):
qv = _safe_float(info.get(key))
if qv is not None and qv > 0:
return qv
for key in ("quoteVolume", "volCcy24h", "vol24h", "turnover24h", "amount24", "turnover"):
qv = _safe_float(info.get(key))
if qv is not None and qv > 0:
if key == "volCcy24h" and ex_id == "okx":
last = _safe_float(info.get("last") or info.get("lastPx") or t.get("last"))
if last:
return qv * last
return qv
bv = _safe_float(t.get("baseVolume"))
lp = _safe_float(t.get("last")) or _safe_float(t.get("close"))
if bv is not None and lp is not None and bv > 0 and lp > 0:
return bv * lp
if info:
bv = _safe_float(info.get("volCcy24h") or info.get("vol24h") or info.get("volume"))
lp = _safe_float(info.get("last") or info.get("lastPx") or info.get("markPrice"))
if bv is not None and lp is not None and bv > 0 and lp > 0:
return bv * lp
return None
def _is_usdt_linear_swap(market: dict | None, symbol: str) -> bool:
if not market:
su = str(symbol or "").upper()
return "USDT" in su and (":USDT" in su or "/USDT" in su or su.endswith("USDT"))
if not market.get("swap") and market.get("type") not in ("swap", "future"):
return False
if str(market.get("quote") or "").upper() != "USDT":
return False
if market.get("linear") is False:
return False
if market.get("active") is False:
return False
settle = str(market.get("settle") or "").upper()
if settle and settle != "USDT":
return False
return True
def _lookup_ticker(tickers: dict, sym: str, market: dict | None) -> dict | None:
if not tickers:
return None
t = tickers.get(sym)
if t:
return t
if not market:
return None
base = market.get("base")
quote = market.get("quote") or "USDT"
settle = market.get("settle") or quote
candidates = [
sym,
f"{base}/{quote}:{settle}",
f"{base}/{quote}",
f"{base}{quote}",
market.get("id"),
]
for key in candidates:
if not key:
continue
t = tickers.get(key)
if t:
return t
return None
def _merge_scores(scored: dict[str, tuple[str, float]]) -> list[tuple[str, str, float]]:
rows = [(sym, base, vol) for base, (sym, vol) in scored.items() if sym and base and vol > 0]
rows.sort(key=lambda x: x[2], reverse=True)
return rows
def _scores_from_okx(exchange) -> list[tuple[str, str, float]]:
by_base: dict[str, tuple[str, float]] = {}
if hasattr(exchange, "publicGetMarketTickers"):
try:
resp = exchange.publicGetMarketTickers({"instType": "SWAP"})
for row in (resp or {}).get("data") or []:
if not isinstance(row, dict):
continue
inst = str(row.get("instId") or "").upper()
parts = inst.split("-")
if len(parts) < 3 or parts[-1] != "SWAP" or parts[1] != "USDT":
continue
base = parts[0].strip()
if not base:
continue
qv = _okx_turnover_usdt(row)
if qv is None or qv <= 0:
continue
sym = _hub_symbol_from_base(base)
prev = by_base.get(base)
if prev is None or qv > prev[1]:
by_base[base] = (sym, float(qv))
if by_base:
return _merge_scores(by_base)
except Exception:
pass
try:
tickers = exchange.fetch_tickers(params={"instType": "SWAP"})
except Exception:
tickers = exchange.fetch_tickers()
return _scores_from_markets(exchange, tickers or {}, "okx")
def _scores_from_binance(exchange) -> list[tuple[str, str, float]]:
by_base: dict[str, tuple[str, float]] = {}
if hasattr(exchange, "fapiPublicGetTicker24hr"):
try:
rows = exchange.fapiPublicGetTicker24hr()
if isinstance(rows, list):
for row in rows:
if not isinstance(row, dict):
continue
raw = str(row.get("symbol") or "").upper()
if not raw.endswith("USDT"):
continue
base = raw[:-4]
if not base:
continue
qv = _safe_float(row.get("quoteVolume"))
if qv is None or qv <= 0:
bv = _safe_float(row.get("volume"))
lp = _safe_float(row.get("lastPrice") or row.get("weightedAvgPrice"))
if bv and lp:
qv = bv * lp
if qv is None or qv <= 0:
continue
sym = _hub_symbol_from_base(base)
prev = by_base.get(base)
if prev is None or qv > prev[1]:
by_base[base] = (sym, float(qv))
if by_base:
return _merge_scores(by_base)
except Exception:
pass
tickers = exchange.fetch_tickers()
return _scores_from_markets(exchange, tickers or {}, "binance")
def _scores_from_gate(exchange) -> list[tuple[str, str, float]]:
by_base: dict[str, tuple[str, float]] = {}
for method_name in ("publicFuturesGetSettleTickers", "publicFuturesGetUsdtTickers"):
fn = getattr(exchange, method_name, None)
if not callable(fn):
continue
try:
rows = fn({"settle": "usdt"})
if isinstance(rows, list):
for row in rows:
if not isinstance(row, dict):
continue
contract = str(row.get("contract") or row.get("name") or "").upper()
if not contract:
continue
base = contract.replace("_USDT", "").replace("USDT", "").strip("_")
if not base:
continue
qv = _safe_float(row.get("volume_24h_quote") or row.get("volume_24h_settle"))
if qv is None or qv <= 0:
bv = _safe_float(row.get("volume_24h_base"))
lp = _safe_float(row.get("last") or row.get("mark_price"))
if bv and lp:
qv = bv * lp
if qv is None or qv <= 0:
continue
sym = _hub_symbol_from_base(base)
prev = by_base.get(base)
if prev is None or qv > prev[1]:
by_base[base] = (sym, float(qv))
if by_base:
return _merge_scores(by_base)
except Exception:
continue
tickers = exchange.fetch_tickers()
return _scores_from_markets(exchange, tickers or {}, "gateio")
def _scores_from_markets(
exchange,
tickers: dict,
exchange_id: str,
) -> list[tuple[str, str, float]]:
by_base: dict[str, tuple[str, float]] = {}
markets = getattr(exchange, "markets", None) or {}
for sym, mk in markets.items():
try:
if not _is_usdt_linear_swap(mk, sym):
continue
ticker = _lookup_ticker(tickers, sym, mk)
qv = _quote_volume_from_ticker(ticker, mk, exchange_id=exchange_id)
if qv is None or qv <= 0:
continue
hub_sym = _hub_symbol_from_market(mk, sym)
base = _ticker_base(hub_sym)
if not base:
continue
prev = by_base.get(base)
if prev is None or qv > prev[1]:
by_base[base] = (hub_sym, float(qv))
except Exception:
continue
return _merge_scores(by_base)
def _collect_scores(exchange, exchange_id: str) -> list[tuple[str, str, float]]:
ex_id = str(exchange_id or "").lower()
if ex_id == "okx":
return _scores_from_okx(exchange)
if ex_id == "binance":
return _scores_from_binance(exchange)
if ex_id in ("gateio", "gate", "gate_bot"):
return _scores_from_gate(exchange)
tickers = exchange.fetch_tickers()
return _scores_from_markets(exchange, tickers or {}, ex_id)
def build_usdt_swap_volume_ranks(
exchange,
ensure_markets_loaded: Callable[[], None],
*,
exchange_id: str | None = None,
) -> tuple[dict[str, int], int]:
"""
全市场 USDT 永续 24h 成交额排名(base -> rank)。
优先各所轻量 ticker API,避免 fetch_tickers() 拉全市场(Gate/Binance 内存优化)。
"""
ensure_markets_loaded()
ex_id = str(exchange_id or getattr(exchange, "id", "") or "").lower()
scored = _collect_scores(exchange, ex_id)
ranks: dict[str, int] = {}
for idx, (_sym, base, _qv) in enumerate(scored, 1):
if base and base not in ranks:
ranks[base] = idx
return ranks, len(scored)
def resolve_daily_volume_rank(
target_base: str,
cache: dict[str, Any],
*,
now_ts: float,
ttl_sec: float,
exchange,
ensure_markets_loaded: Callable[[], None],
exchange_id: str | None = None,
cache_version: int = LIQUIDITY_RANK_CACHE_VERSION,
) -> tuple[int | None, int]:
"""关键位门控:按 base 查 24h 成交额全市场排名;cache 带 TTL。"""
cached_ok = (
cache.get("version") == cache_version
and cache.get("updated_at")
and now_ts - float(cache["updated_at"]) < ttl_sec
)
if not cached_ok:
try:
ranks, total = build_usdt_swap_volume_ranks(
exchange,
ensure_markets_loaded,
exchange_id=exchange_id,
)
cache["ranks"] = ranks
cache["total"] = total
cache["version"] = cache_version
cache["updated_at"] = now_ts
except Exception:
pass
ranks = cache.get("ranks") or {}
total = int(cache.get("total") or 0)
base = str(target_base or "").strip().upper()
return ranks.get(base), total
def fetch_usdt_swap_volume_rank(
exchange,
ensure_markets_loaded: Callable[[], None],
*,
top_n: int = TOP_N_DEFAULT,
rank_date: str | None = None,
exchange_id: str | None = None,
) -> dict[str, Any]:
"""从 ccxt 拉全市场 USDT 永续 ticker,按 24h 成交额(USDT) 取 Top N。"""
top_n = max(1, min(int(top_n or TOP_N_DEFAULT), 100))
ensure_markets_loaded()
ex_id = str(exchange_id or getattr(exchange, "id", "") or "").lower()
try:
scored = _collect_scores(exchange, ex_id)
except Exception as e:
return {"ok": False, "msg": str(e)}
items = []
for idx, (hub_sym, base, qv) in enumerate(scored[:top_n], 1):
items.append(
{
"rank": idx,
"symbol": hub_sym,
"base": base,
"volume_quote": round(qv, 4),
}
)
return {
"ok": True,
"rank_date": rank_date or rank_date_label(),
"items": items,
"total_symbols": len(scored),
"exchange_id": ex_id,
"fetched_at": datetime.now(volume_rank_timezone()).isoformat(timespec="seconds"),
}
def format_volume_quote(value: float | None) -> str:
n = _safe_float(value)
if n is None or n <= 0:
return ""
if n >= 1e9:
return f"{n / 1e9:.2f}B"
if n >= 1e6:
return f"{n / 1e6:.2f}M"
if n >= 1e3:
return f"{n / 1e3:.2f}K"
return f"{n:.0f}"
def load_volume_rank_cache(path: Path | None = None) -> dict[str, Any]:
p = path or default_cache_path()
if not p.is_file():
return {"version": CACHE_VERSION, "exchanges": {}}
try:
data = json.loads(p.read_text(encoding="utf-8"))
if not isinstance(data, dict):
return {"version": CACHE_VERSION, "exchanges": {}}
if int(data.get("version") or 0) < CACHE_VERSION:
return {"version": CACHE_VERSION, "exchanges": {}}
data.setdefault("version", CACHE_VERSION)
data.setdefault("exchanges", {})
return data
except Exception:
return {"version": CACHE_VERSION, "exchanges": {}}
def save_volume_rank_cache(data: dict[str, Any], path: Path | None = None) -> None:
p = path or default_cache_path()
p.parent.mkdir(parents=True, exist_ok=True)
payload = dict(data)
payload["version"] = CACHE_VERSION
payload["updated_at"] = datetime.now(volume_rank_timezone()).isoformat(timespec="seconds")
p.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
def merge_exchange_rank(
cache: dict[str, Any],
exchange_key: str,
payload: dict[str, Any],
) -> dict[str, Any]:
ex_k = str(exchange_key or "").strip().lower()
if not ex_k or not payload.get("ok"):
return cache
exchanges = dict(cache.get("exchanges") or {})
exchanges[ex_k] = {
"rank_date": payload.get("rank_date"),
"items": payload.get("items") or [],
"total_symbols": int(payload.get("total_symbols") or 0),
"fetched_at": payload.get("fetched_at"),
"error": None,
}
out = dict(cache)
out["exchanges"] = exchanges
out["rank_date"] = payload.get("rank_date") or cache.get("rank_date")
return out
def _exchange_rank_row_stale(row: dict[str, Any] | None) -> bool:
if not row:
return True
items = row.get("items") or []
if len(items) < TOP_N_DEFAULT:
return True
total = int(row.get("total_symbols") or 0)
if total > 0 and total < TOP_N_DEFAULT:
return True
return False
def cache_needs_refresh(
cache: dict[str, Any],
*,
expected_rank_date: str | None = None,
required_keys: list[str] | None = None,
) -> bool:
expected = expected_rank_date or rank_date_label()
if int(cache.get("version") or 0) < CACHE_VERSION:
return True
exchanges = cache.get("exchanges") or {}
if not exchanges:
return True
if str(cache.get("rank_date") or "") != expected:
return True
keys = required_keys or list(exchanges.keys())
if not keys:
return True
for key in keys:
ex_k = str(key or "").strip().lower()
if not ex_k:
continue
if _exchange_rank_row_stale(exchanges.get(ex_k)):
return True
return False
def get_cached_rank(
cache: dict[str, Any],
exchange_key: str,
*,
top_n: int = TOP_N_DEFAULT,
) -> dict[str, Any]:
ex_k = str(exchange_key or "").strip().lower()
ex_data = (cache.get("exchanges") or {}).get(ex_k) or {}
items = list(ex_data.get("items") or [])[: max(1, int(top_n))]
stale = _exchange_rank_row_stale(ex_data)
return {
"ok": True,
"exchange_key": ex_k,
"rank_date": ex_data.get("rank_date") or cache.get("rank_date"),
"updated_at": cache.get("updated_at"),
"items": items,
"item_count": len(items),
"expected_count": int(top_n),
"total_symbols": int(ex_data.get("total_symbols") or 0),
"stale": stale,
"error": ex_data.get("error"),
}