import logging from datetime import datetime from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from .aggregator import aggregate_period, enrich_snapshot_meta from .binance import binance_client from .config import settings from .db import get_latest_snapshot, init_db, log_push, save_snapshot, was_pushed_today from .exceptions import BinanceRateLimitedError from .periods import get_today_period, get_yesterday_period, now_shanghai from .state import get_today_cache, set_today_cache from .kline_store import prefetch_symbols from .wecom import build_markdown, send_wecom_markdown logger = logging.getLogger(__name__) scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") def _restore_today_from_db() -> bool: snap = get_latest_snapshot("today") if snap and snap.get("items"): set_today_cache( { "period_start": snap["period_start"], "period_end": snap["period_end"], "updated_at": snap["created_at"], "top_n": settings.top_n, "volume_threshold": settings.volume_threshold, "change_threshold": settings.change_threshold, "data_mode": "cached", "items": snap["items"], } ) return True return False async def job_finalize_yesterday() -> None: logger.info("Job: finalize yesterday period") if binance_client.is_rate_limited(): logger.warning( "Skip yesterday job — rate limited %ss", binance_client.rate_limit_remaining_sec(), ) return try: binance_client.clear_symbol_cache() start, end = get_yesterday_period() items = await aggregate_period( start, end, use_live_prices=False, mode=settings.yesterday_data_mode ) save_snapshot("yesterday", start, end, items) logger.info("Yesterday snapshot saved: %d items", len(items)) syms = [x["symbol"] for x in items if x.get("symbol")] if syms: await prefetch_symbols(syms) except BinanceRateLimitedError as e: logger.error("Finalize yesterday rate limited %ss", e.retry_after_sec) except Exception as e: logger.error("Finalize yesterday failed: %s", e) async def job_push_wecom() -> None: logger.info("Job: WeCom push") start, end = get_yesterday_period() snapshot = get_latest_snapshot("yesterday") if not snapshot and not binance_client.is_rate_limited(): try: items = await aggregate_period( start, end, use_live_prices=False, mode=settings.yesterday_data_mode ) save_snapshot("yesterday", start, end, items) snapshot = get_latest_snapshot("yesterday") except BinanceRateLimitedError as e: logger.error("Push prep rate limited %ss", e.retry_after_sec) if not snapshot: logger.error("No yesterday snapshot for push") return ps, pe = snapshot["period_start"], snapshot["period_end"] if was_pushed_today(ps, pe): logger.info("Already pushed for period %s ~ %s", ps, pe) return content = build_markdown(snapshot) ok, msg = await send_wecom_markdown(content) log_push(ps, pe, ok, msg) if ok: logger.info("WeCom push succeeded") else: logger.error("WeCom push failed: %s", msg) async def job_refresh_today() -> None: logger.info("Job: refresh today (mode=%s)", settings.today_data_mode) if binance_client.is_rate_limited(): sec = binance_client.rate_limit_remaining_sec() logger.warning("Rate limited %ss — using DB/cache", sec) if _restore_today_from_db(): logger.info("Today restored from DB cache") return try: start, end = get_today_period() items = await aggregate_period( start, end, use_live_prices=True, mode=settings.today_data_mode, ) meta = enrich_snapshot_meta( items, start, end, data_mode=settings.today_data_mode ) save_snapshot("today", start, end, items) set_today_cache(meta) logger.info("Today cache refreshed: %d items", len(items)) syms = [x["symbol"] for x in items if x.get("symbol")] if syms: await prefetch_symbols(syms) except BinanceRateLimitedError as e: logger.error("Refresh today rate limited %ss — use cache", e.retry_after_sec) _restore_today_from_db() except Exception as e: logger.error("Refresh today failed: %s", e) _restore_today_from_db() async def startup_tasks() -> None: init_db() now = now_shanghai() start_y, end_y = get_yesterday_period(now) if binance_client.is_rate_limited(): logger.warning( "Startup: Binance rate limited ~%ss, skip API; use DB cache", binance_client.rate_limit_remaining_sec(), ) _restore_today_from_db() return snap = get_latest_snapshot("yesterday") if not snap or snap.get("period_end") != end_y.isoformat(): try: logger.info("Startup: computing yesterday snapshot") items = await aggregate_period( start_y, end_y, use_live_prices=False, mode=settings.yesterday_data_mode ) save_snapshot("yesterday", start_y, end_y, items) except BinanceRateLimitedError as e: logger.error("Startup yesterday rate limited %ss", e.retry_after_sec) except Exception as e: logger.error("Startup yesterday failed: %s", e) try: await job_refresh_today() except Exception as e: logger.error("Startup today refresh failed: %s", e) if now.hour > 8 or (now.hour == 8 and now.minute >= 10): ps, pe = start_y.isoformat(), end_y.isoformat() if not was_pushed_today(ps, pe) and settings.wecom_webhook_url.strip(): try: await job_push_wecom() except Exception as e: logger.error("Startup catch-up push failed: %s", e) def start_scheduler() -> None: scheduler.add_job( job_finalize_yesterday, CronTrigger(hour=8, minute=0, timezone="Asia/Shanghai"), id="finalize_yesterday", replace_existing=True, ) scheduler.add_job( job_push_wecom, CronTrigger(hour=8, minute=10, timezone="Asia/Shanghai"), id="push_wecom", replace_existing=True, ) scheduler.add_job( job_refresh_today, CronTrigger(minute=f"*/{settings.refresh_minutes}", timezone="Asia/Shanghai"), id="refresh_today", replace_existing=True, ) if not scheduler.running: scheduler.start() logger.info( "Scheduler started (today=%s, yesterday=%s, every %d min)", settings.today_data_mode, settings.yesterday_data_mode, settings.refresh_minutes, ) def stop_scheduler() -> None: if scheduler.running: scheduler.shutdown(wait=False)