Files
Binance_Altcoin_Monitor/backend/app/period_api.py
T
2026-05-22 14:05:18 +08:00

60 lines
2.0 KiB
Python

"""各周期 Top30 API 共用逻辑。"""
import logging
from collections.abc import Callable
from datetime import datetime
from .aggregator import aggregate_period, enrich_snapshot_meta
from .config import settings
from .db import get_latest_snapshot, save_snapshot
from .funding_store import enrich_items_with_funding
logger = logging.getLogger(__name__)
async def get_period_top30(
period_type: str,
period_getter: Callable[[], tuple[datetime, datetime]],
*,
use_live_prices: bool = False,
data_mode: str | None = None,
auto_save: bool = True,
) -> dict:
start, end = period_getter()
expected_end = end.isoformat()
snap = get_latest_snapshot(period_type)
if snap and snap.get("period_end") == expected_end:
items = await enrich_items_with_funding(snap["items"])
return {
"period_type": period_type,
"period_start": snap["period_start"],
"period_end": snap["period_end"],
"updated_at": snap["created_at"],
"top_n": settings.top_n,
"volume_threshold": settings.volume_threshold,
"change_threshold": settings.change_threshold,
"data_mode": data_mode or settings.yesterday_data_mode,
"items": items,
}
mode = data_mode or (
settings.today_data_mode if use_live_prices else settings.yesterday_data_mode
)
try:
items = await aggregate_period(
start, end, use_live_prices=use_live_prices, mode=mode
)
if auto_save and items:
save_snapshot(period_type, start, end, items)
items = await enrich_items_with_funding(items)
meta = enrich_snapshot_meta(items, start, end, data_mode=mode)
meta["period_type"] = period_type
return meta
except Exception as e:
logger.error("period %s failed: %s", period_type, e)
meta = enrich_snapshot_meta([], start, end)
meta["period_type"] = period_type
meta["error"] = "数据暂不可用,请检查网络或稍后重试"
return meta