import asyncio import logging from dataclasses import dataclass, asdict from datetime import datetime from .binance import binance_client from .config import settings from .exceptions import BinanceRateLimitedError from .periods import to_ms logger = logging.getLogger(__name__) @dataclass class SymbolStats: symbol: str quote_volume: float price_change_pct: float open_price: float last_price: float rank: int = 0 is_high_volume: bool = False is_high_change: bool = False data_source: str = "klines" def to_dict(self) -> dict: d = asdict(self) d["quote_volume_fmt"] = format_volume(self.quote_volume) d["price_change_pct_fmt"] = f"{self.price_change_pct:+.2f}%" return d def format_volume(vol: float) -> str: if vol >= 1e8: return f"{vol / 1e8:.2f}亿" if vol >= 1e4: return f"{vol / 1e4:.2f}万" return f"{vol:.0f}" def _finalize_top(stats: list[SymbolStats]) -> list[dict]: stats.sort(key=lambda x: x.quote_volume, reverse=True) top = stats[: settings.top_n] for i, s in enumerate(top, 1): s.rank = i s.is_high_volume = s.quote_volume >= settings.volume_threshold s.is_high_change = abs(s.price_change_pct) >= settings.change_threshold return [s.to_dict() for s in top] def _aggregate_klines(klines: list, start_ms: int, end_ms: int) -> tuple[float, float, float]: quote_vol = 0.0 open_price = 0.0 last_price = 0.0 first = True for k in klines: open_time = int(k[0]) if open_time < start_ms or open_time >= end_ms: continue if first: open_price = float(k[1]) first = False last_price = float(k[4]) quote_vol += float(k[7]) return quote_vol, open_price, last_price async def aggregate_from_ticker24hr() -> list[dict]: """仅 1 次 API 请求,使用滚动 24h 数据(今日刷新推荐)。""" tickers = await binance_client.get_24hr_tickers() stats: list[SymbolStats] = [] for t in tickers: sym = t.get("symbol", "") if not sym.endswith("USDT"): continue vol = float(t.get("quoteVolume", 0) or 0) if vol <= 0: continue stats.append( SymbolStats( symbol=sym, quote_volume=vol, price_change_pct=float(t.get("priceChangePercent", 0) or 0), open_price=float(t.get("openPrice", 0) or 0), last_price=float(t.get("lastPrice", 0) or 0), data_source="ticker24h", ) ) logger.info("ticker24h mode: %d symbols, 1 API call", len(stats)) return _finalize_top(stats) async def _fetch_symbol_stats( symbol: str, start_ms: int, end_ms: int, prices: dict[str, float], sem: asyncio.Semaphore, ) -> SymbolStats | None: async with sem: try: klines = await binance_client.get_klines(symbol, start_ms, end_ms) quote_vol, open_price, last_price = _aggregate_klines(klines, start_ms, end_ms) if open_price <= 0 and last_price <= 0: return None if open_price <= 0: open_price = last_price if last_price <= 0: last_price = prices.get(symbol, open_price) if last_price <= 0: return None pct = ((last_price - open_price) / open_price) * 100 if open_price > 0 else 0.0 return SymbolStats( symbol=symbol, quote_volume=quote_vol, price_change_pct=pct, open_price=open_price, last_price=last_price, data_source="klines", ) except BinanceRateLimitedError: raise except Exception as e: logger.warning("Failed %s: %s", symbol, e) return None async def _pick_candidate_symbols(symbols: list[str]) -> list[str]: try: tickers = await binance_client.get_24hr_tickers() vol_map = { t["symbol"]: float(t.get("quoteVolume", 0) or 0) for t in tickers if t.get("symbol") in symbols } ranked = sorted(symbols, key=lambda s: vol_map.get(s, 0.0), reverse=True) pool = min(settings.candidate_pool, len(ranked)) picked = ranked[:pool] logger.info("Candidate pool: %d / %d symbols", len(picked), len(symbols)) return picked except BinanceRateLimitedError: raise except Exception as e: logger.warning("24hr prescreen failed: %s", e) return symbols[: settings.candidate_pool] async def aggregate_period_klines( start: datetime, end: datetime, use_live_prices: bool = False, ) -> list[dict]: symbols = await binance_client.get_usdt_perpetual_symbols() candidates = await _pick_candidate_symbols(symbols) start_ms = to_ms(start) end_ms = to_ms(end) prices: dict[str, float] = {} if use_live_prices: try: prices = await binance_client.get_prices_batch(symbols) except BinanceRateLimitedError: raise except Exception as e: logger.warning("Batch prices failed: %s", e) sem = asyncio.Semaphore(settings.max_concurrency) tasks = [ _fetch_symbol_stats(s, start_ms, end_ms, prices, sem) for s in candidates ] logger.info( "klines mode: %s ~ %s, %d symbols, concurrency=%d", start.isoformat(), end.isoformat(), len(candidates), settings.max_concurrency, ) results = await asyncio.gather(*tasks) stats = [r for r in results if r is not None and r.quote_volume > 0] return _finalize_top(stats) async def aggregate_period( start: datetime, end: datetime, use_live_prices: bool = False, mode: str | None = None, ) -> list[dict]: mode = mode or ( settings.today_data_mode if use_live_prices else settings.yesterday_data_mode ) if mode == "ticker24h": return await aggregate_from_ticker24hr() return await aggregate_period_klines(start, end, use_live_prices) def enrich_snapshot_meta( items: list[dict], period_start: datetime, period_end: datetime, data_mode: str = "", ) -> dict: return { "period_start": period_start.isoformat(), "period_end": period_end.isoformat(), "updated_at": datetime.now().isoformat(), "top_n": settings.top_n, "volume_threshold": settings.volume_threshold, "change_threshold": settings.change_threshold, "data_mode": data_mode, "items": items, }