Files
Binance_Altcoin_Monitor/backend/app/aggregator.py
T
2026-05-22 14:00:19 +08:00

215 lines
6.9 KiB
Python

import asyncio
import logging
from dataclasses import dataclass, asdict
from datetime import datetime
from .binance import binance_client
from .config import settings
from .exceptions import BinanceRateLimitedError
from .funding_store import enrich_items_with_funding
from .periods import to_ms
logger = logging.getLogger(__name__)
@dataclass
class SymbolStats:
symbol: str
quote_volume: float
price_change_pct: float
open_price: float
last_price: float
rank: int = 0
is_high_volume: bool = False
is_high_change: bool = False
data_source: str = "klines"
def to_dict(self) -> dict:
d = asdict(self)
d["quote_volume_fmt"] = format_volume(self.quote_volume)
d["price_change_pct_fmt"] = f"{self.price_change_pct:+.2f}%"
return d
def format_volume(vol: float) -> str:
if vol >= 1e8:
return f"{vol / 1e8:.2f}亿"
if vol >= 1e4:
return f"{vol / 1e4:.2f}"
return f"{vol:.0f}"
async def _finalize_top(stats: list[SymbolStats]) -> list[dict]:
stats.sort(key=lambda x: x.quote_volume, reverse=True)
top = stats[: settings.top_n]
for i, s in enumerate(top, 1):
s.rank = i
s.is_high_volume = s.quote_volume >= settings.volume_threshold
s.is_high_change = abs(s.price_change_pct) >= settings.change_threshold
items = [s.to_dict() for s in top]
return await enrich_items_with_funding(items)
def _aggregate_klines(klines: list, start_ms: int, end_ms: int) -> tuple[float, float, float]:
quote_vol = 0.0
open_price = 0.0
last_price = 0.0
first = True
for k in klines:
open_time = int(k[0])
if open_time < start_ms or open_time >= end_ms:
continue
if first:
open_price = float(k[1])
first = False
last_price = float(k[4])
quote_vol += float(k[7])
return quote_vol, open_price, last_price
async def aggregate_from_ticker24hr() -> list[dict]:
"""仅 1 次 API 请求,使用滚动 24h 数据(今日刷新推荐)。"""
tickers = await binance_client.get_24hr_tickers()
stats: list[SymbolStats] = []
for t in tickers:
sym = t.get("symbol", "")
if not sym.endswith("USDT"):
continue
vol = float(t.get("quoteVolume", 0) or 0)
if vol <= 0:
continue
stats.append(
SymbolStats(
symbol=sym,
quote_volume=vol,
price_change_pct=float(t.get("priceChangePercent", 0) or 0),
open_price=float(t.get("openPrice", 0) or 0),
last_price=float(t.get("lastPrice", 0) or 0),
data_source="ticker24h",
)
)
logger.info("ticker24h mode: %d symbols, 1 API call", len(stats))
return await _finalize_top(stats)
async def _fetch_symbol_stats(
symbol: str,
start_ms: int,
end_ms: int,
prices: dict[str, float],
sem: asyncio.Semaphore,
) -> SymbolStats | None:
async with sem:
try:
klines = await binance_client.get_klines(symbol, start_ms, end_ms)
quote_vol, open_price, last_price = _aggregate_klines(klines, start_ms, end_ms)
if open_price <= 0 and last_price <= 0:
return None
if open_price <= 0:
open_price = last_price
if last_price <= 0:
last_price = prices.get(symbol, open_price)
if last_price <= 0:
return None
pct = ((last_price - open_price) / open_price) * 100 if open_price > 0 else 0.0
return SymbolStats(
symbol=symbol,
quote_volume=quote_vol,
price_change_pct=pct,
open_price=open_price,
last_price=last_price,
data_source="klines",
)
except BinanceRateLimitedError:
raise
except Exception as e:
logger.warning("Failed %s: %s", symbol, e)
return None
async def _pick_candidate_symbols(symbols: list[str]) -> list[str]:
try:
tickers = await binance_client.get_24hr_tickers()
vol_map = {
t["symbol"]: float(t.get("quoteVolume", 0) or 0)
for t in tickers
if t.get("symbol") in symbols
}
ranked = sorted(symbols, key=lambda s: vol_map.get(s, 0.0), reverse=True)
pool = min(settings.candidate_pool, len(ranked))
picked = ranked[:pool]
logger.info("Candidate pool: %d / %d symbols", len(picked), len(symbols))
return picked
except BinanceRateLimitedError:
raise
except Exception as e:
logger.warning("24hr prescreen failed: %s", e)
return symbols[: settings.candidate_pool]
async def aggregate_period_klines(
start: datetime,
end: datetime,
use_live_prices: bool = False,
) -> list[dict]:
symbols = await binance_client.get_usdt_perpetual_symbols()
candidates = await _pick_candidate_symbols(symbols)
start_ms = to_ms(start)
end_ms = to_ms(end)
prices: dict[str, float] = {}
if use_live_prices:
try:
prices = await binance_client.get_prices_batch(symbols)
except BinanceRateLimitedError:
raise
except Exception as e:
logger.warning("Batch prices failed: %s", e)
sem = asyncio.Semaphore(settings.max_concurrency)
tasks = [
_fetch_symbol_stats(s, start_ms, end_ms, prices, sem) for s in candidates
]
logger.info(
"klines mode: %s ~ %s, %d symbols, concurrency=%d",
start.isoformat(),
end.isoformat(),
len(candidates),
settings.max_concurrency,
)
results = await asyncio.gather(*tasks)
stats = [r for r in results if r is not None and r.quote_volume > 0]
return await _finalize_top(stats)
async def aggregate_period(
start: datetime,
end: datetime,
use_live_prices: bool = False,
mode: str | None = None,
) -> list[dict]:
mode = mode or (
settings.today_data_mode if use_live_prices else settings.yesterday_data_mode
)
if mode == "ticker24h":
return await aggregate_from_ticker24hr()
return await aggregate_period_klines(start, end, use_live_prices)
def enrich_snapshot_meta(
items: list[dict],
period_start: datetime,
period_end: datetime,
data_mode: str = "",
) -> dict:
return {
"period_start": period_start.isoformat(),
"period_end": period_end.isoformat(),
"updated_at": datetime.now().isoformat(),
"top_n": settings.top_n,
"volume_threshold": settings.volume_threshold,
"change_threshold": settings.change_threshold,
"data_mode": data_mode,
"items": items,
}