import asyncio import logging from dataclasses import dataclass, asdict from datetime import datetime from .binance import binance_client from .config import settings from .periods import to_ms logger = logging.getLogger(__name__) @dataclass class SymbolStats: symbol: str quote_volume: float price_change_pct: float open_price: float last_price: float rank: int = 0 is_high_volume: bool = False is_high_change: bool = False def to_dict(self) -> dict: d = asdict(self) d["quote_volume_fmt"] = format_volume(self.quote_volume) d["price_change_pct_fmt"] = f"{self.price_change_pct:+.2f}%" return d def format_volume(vol: float) -> str: if vol >= 1e8: return f"{vol / 1e8:.2f}亿" if vol >= 1e4: return f"{vol / 1e4:.2f}万" return f"{vol:.0f}" def _aggregate_klines(klines: list, start_ms: int, end_ms: int) -> tuple[float, float, float]: quote_vol = 0.0 open_price = 0.0 last_price = 0.0 first = True for k in klines: open_time = int(k[0]) if open_time < start_ms or open_time >= end_ms: continue if first: open_price = float(k[1]) first = False last_price = float(k[4]) quote_vol += float(k[7]) return quote_vol, open_price, last_price async def _fetch_symbol_stats( symbol: str, start_ms: int, end_ms: int, prices: dict[str, float], sem: asyncio.Semaphore, ) -> SymbolStats | None: async with sem: try: klines = await binance_client.get_klines(symbol, start_ms, end_ms) quote_vol, open_price, last_price = _aggregate_klines(klines, start_ms, end_ms) if open_price <= 0 and last_price <= 0: return None if open_price <= 0: open_price = last_price if last_price <= 0: last_price = prices.get(symbol, open_price) if last_price <= 0: return None pct = ((last_price - open_price) / open_price) * 100 if open_price > 0 else 0.0 return SymbolStats( symbol=symbol, quote_volume=quote_vol, price_change_pct=pct, open_price=open_price, last_price=last_price, ) except Exception as e: logger.warning("Failed %s: %s", symbol, e) return None async def aggregate_period( start: datetime, end: datetime, use_live_prices: bool = False, ) -> list[dict]: symbols = await binance_client.get_usdt_perpetual_symbols() start_ms = to_ms(start) end_ms = to_ms(end) prices: dict[str, float] = {} if use_live_prices: try: prices = await binance_client.get_prices_batch(symbols) except Exception as e: logger.warning("Batch prices failed: %s", e) sem = asyncio.Semaphore(settings.max_concurrency) tasks = [ _fetch_symbol_stats(s, start_ms, end_ms, prices, sem) for s in symbols ] results = await asyncio.gather(*tasks) stats = [r for r in results if r is not None and r.quote_volume > 0] stats.sort(key=lambda x: x.quote_volume, reverse=True) top = stats[: settings.top_n] for i, s in enumerate(top, 1): s.rank = i s.is_high_volume = s.quote_volume >= settings.volume_threshold s.is_high_change = abs(s.price_change_pct) >= settings.change_threshold return [s.to_dict() for s in top] def enrich_snapshot_meta( items: list[dict], period_start: datetime, period_end: datetime, ) -> dict: return { "period_start": period_start.isoformat(), "period_end": period_end.isoformat(), "updated_at": datetime.now().isoformat(), "top_n": settings.top_n, "volume_threshold": settings.volume_threshold, "change_threshold": settings.change_threshold, "items": items, }