feat: daily volume top20 rank per exchange in market page

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-08 15:46:36 +08:00
parent 09eb9dc475
commit 4bf0c2363f
11 changed files with 763 additions and 6 deletions
+179 -1
View File
@@ -24,6 +24,18 @@ from hub_ohlcv_lib import (
chart_memory_cap,
retention_policy_meta,
)
from hub_volume_rank_lib import (
TOP_N_DEFAULT,
cache_needs_refresh,
format_volume_quote,
get_cached_rank,
load_volume_rank_cache,
merge_exchange_rank,
rank_date_label,
save_volume_rank_cache,
seconds_until_next_reset,
volume_rank_reset_hour,
)
from hub_symbol_archive_lib import (
ARCHIVE_DEFAULT_TIMEFRAME,
ARCHIVE_SEED_LOOKBACK_DAYS,
@@ -104,6 +116,9 @@ HUB_BUILD = "20260607-hub-archive"
_archive_sync_stop: asyncio.Event | None = None
_archive_sync_task: asyncio.Task | None = None
_last_archive_sync: dict | None = None
_volume_rank_stop: asyncio.Event | None = None
_volume_rank_task: asyncio.Task | None = None
_volume_rank_cache: dict | None = None
HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8"))
HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10"))
HUB_BOARD_TIMEOUT = float(os.getenv("HUB_BOARD_TIMEOUT", "45"))
@@ -321,6 +336,99 @@ async def _run_archive_sync_once() -> dict:
return out
def _fetch_instance_volume_rank_sync(ex: dict, *, top_n: int = TOP_N_DEFAULT) -> dict:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return {"ok": False, "msg": "未配置 flask_url"}
params = {"top": str(int(top_n))}
url = f"{base}/api/hub/volume-rank?{urlencode(params)}"
try:
with httpx.Client(timeout=max(HUB_FLASK_TIMEOUT, 60.0)) as client:
r = client.get(url, headers=_hub_headers())
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
parsed.setdefault("status", r.status_code)
return parsed
data = r.json() if r.content else {}
return data if isinstance(data, dict) else {"ok": False, "msg": "无效 JSON"}
except Exception as e:
return {"ok": False, "msg": str(e)}
def _get_volume_rank_cache() -> dict:
global _volume_rank_cache
if _volume_rank_cache is None:
_volume_rank_cache = load_volume_rank_cache()
return _volume_rank_cache
def _refresh_volume_ranks(*, force: bool = False) -> dict:
global _volume_rank_cache
expected = rank_date_label()
cache = _get_volume_rank_cache()
if not force and not cache_needs_refresh(cache, expected_rank_date=expected):
return {
"ok": True,
"skipped": True,
"rank_date": cache.get("rank_date"),
"updated_at": cache.get("updated_at"),
}
targets = enabled_exchanges(load_settings())
errors: list[str] = []
for ex in targets:
ex_key = str(ex.get("key") or "").strip().lower()
if not ex_key or not ex.get("enabled"):
continue
resp = _fetch_instance_volume_rank_sync(ex, top_n=TOP_N_DEFAULT)
if resp.get("ok"):
cache = merge_exchange_rank(cache, ex_key, resp)
else:
msg = str(resp.get("msg") or resp.get("error") or "拉取失败")
errors.append(f"{ex_key}:{msg}")
exchanges = dict(cache.get("exchanges") or {})
prev = dict(exchanges.get(ex_key) or {})
prev["error"] = msg
exchanges[ex_key] = prev
cache["exchanges"] = exchanges
cache["rank_date"] = expected
save_volume_rank_cache(cache)
_volume_rank_cache = cache
out: dict = {
"ok": True,
"rank_date": expected,
"exchanges": len(targets),
"updated_at": cache.get("updated_at"),
}
if errors:
out["errors"] = errors[:8]
return out
async def _volume_rank_loop() -> None:
global _volume_rank_stop
stop = _volume_rank_stop
if stop is None:
return
try:
await asyncio.to_thread(_refresh_volume_ranks, force=False)
except Exception:
pass
while not stop.is_set():
try:
wait_sec = seconds_until_next_reset()
await asyncio.wait_for(stop.wait(), timeout=wait_sec)
break
except asyncio.TimeoutError:
pass
if stop.is_set():
break
try:
await asyncio.to_thread(_refresh_volume_ranks, force=True)
except Exception:
pass
async def _archive_sync_loop() -> None:
global _archive_sync_stop
stop = _archive_sync_stop
@@ -340,11 +448,13 @@ async def _archive_sync_loop() -> None:
@asynccontextmanager
async def _hub_lifespan(_app: FastAPI):
global _archive_sync_stop, _archive_sync_task
global _archive_sync_stop, _archive_sync_task, _volume_rank_stop, _volume_rank_task
await board_store.start(_run_board_aggregate)
await chart_poll_store.start(_run_chart_poll)
_archive_sync_stop = asyncio.Event()
_archive_sync_task = asyncio.create_task(_archive_sync_loop(), name="hub-archive-sync")
_volume_rank_stop = asyncio.Event()
_volume_rank_task = asyncio.create_task(_volume_rank_loop(), name="hub-volume-rank")
try:
yield
finally:
@@ -358,6 +468,16 @@ async def _hub_lifespan(_app: FastAPI):
pass
_archive_sync_task = None
_archive_sync_stop = None
if _volume_rank_stop:
_volume_rank_stop.set()
if _volume_rank_task:
_volume_rank_task.cancel()
try:
await _volume_rank_task
except asyncio.CancelledError:
pass
_volume_rank_task = None
_volume_rank_stop = None
await chart_poll_store.stop()
await board_store.stop()
@@ -645,9 +765,67 @@ def api_chart_meta():
"chunk_limits": {tf: chart_chunk_limit(tf) for tf in tfs if tf in CHART_TIMEFRAMES},
"memory_caps": {tf: chart_memory_cap(tf) for tf in tfs if tf in CHART_TIMEFRAMES},
"exchanges": exchanges,
"volume_rank_top_n": TOP_N_DEFAULT,
"volume_rank_reset_hour": volume_rank_reset_hour(),
}
@app.get("/api/chart/volume-rank")
def api_chart_volume_rank(exchange_key: str = "", refresh: str = ""):
force = (refresh or "").strip().lower() in ("1", "true", "yes", "on")
if force:
result = _refresh_volume_ranks(force=True)
if not result.get("ok"):
raise HTTPException(status_code=502, detail=result.get("msg") or "刷新失败")
cache = _get_volume_rank_cache()
if cache_needs_refresh(cache):
_refresh_volume_ranks(force=False)
cache = _get_volume_rank_cache()
ex_k = (exchange_key or "").strip().lower()
if ex_k:
ex = _find_exchange_by_key(ex_k)
if not ex:
raise HTTPException(status_code=400, detail="交易所不存在")
payload = get_cached_rank(cache, ex_k, top_n=TOP_N_DEFAULT)
payload["items"] = [
{**row, "volume_label": format_volume_quote(row.get("volume_quote"))}
for row in payload.get("items") or []
]
payload["reset_hour"] = volume_rank_reset_hour()
err = ((cache.get("exchanges") or {}).get(ex_k) or {}).get("error")
if err and not payload.get("items"):
payload["ok"] = False
payload["msg"] = err
return payload
exchanges_out = {}
for ex in enabled_exchanges(load_settings()):
key = str(ex.get("key") or "").strip().lower()
if not key:
continue
row = get_cached_rank(cache, key, top_n=TOP_N_DEFAULT)
row["name"] = ex.get("name")
row["items"] = [
{**item, "volume_label": format_volume_quote(item.get("volume_quote"))}
for item in row.get("items") or []
]
exchanges_out[key] = row
return {
"ok": True,
"rank_date": cache.get("rank_date"),
"updated_at": cache.get("updated_at"),
"reset_hour": volume_rank_reset_hour(),
"exchanges": exchanges_out,
}
@app.post("/api/chart/volume-rank/refresh")
async def api_chart_volume_rank_refresh():
result = await asyncio.to_thread(_refresh_volume_ranks, force=True)
if not result.get("ok"):
raise HTTPException(status_code=502, detail=result.get("msg") or "刷新失败")
return result
@app.get("/api/chart/ohlcv")
def api_chart_ohlcv(
exchange_key: str = "",