"""各周期 Top30 API 共用逻辑。""" import logging from collections.abc import Callable from datetime import datetime from .aggregator import aggregate_period, enrich_snapshot_meta from .config import settings from .db import get_latest_snapshot, save_snapshot from .funding_store import enrich_items_with_funding logger = logging.getLogger(__name__) async def get_period_top30( period_type: str, period_getter: Callable[[], tuple[datetime, datetime]], *, use_live_prices: bool = False, data_mode: str | None = None, auto_save: bool = True, ) -> dict: start, end = period_getter() expected_end = end.isoformat() snap = get_latest_snapshot(period_type) if snap and snap.get("period_end") == expected_end: items = await enrich_items_with_funding(snap["items"]) return { "period_type": period_type, "period_start": snap["period_start"], "period_end": snap["period_end"], "updated_at": snap["created_at"], "top_n": settings.top_n, "volume_threshold": settings.volume_threshold, "change_threshold": settings.change_threshold, "data_mode": data_mode or settings.yesterday_data_mode, "items": items, } mode = data_mode or ( settings.today_data_mode if use_live_prices else settings.yesterday_data_mode ) try: items = await aggregate_period( start, end, use_live_prices=use_live_prices, mode=mode ) if auto_save and items: save_snapshot(period_type, start, end, items) items = await enrich_items_with_funding(items) meta = enrich_snapshot_meta(items, start, end, data_mode=mode) meta["period_type"] = period_type return meta except Exception as e: logger.error("period %s failed: %s", period_type, e) meta = enrich_snapshot_meta([], start, end) meta["period_type"] = period_type meta["error"] = "数据暂不可用,请检查网络或稍后重试" return meta