import logging from datetime import datetime from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.cron import CronTrigger from .aggregator import aggregate_period, enrich_snapshot_meta from .binance import binance_client from .config import settings from .db import get_latest_snapshot, init_db, log_push, save_snapshot, was_pushed_today from .exceptions import BinanceRateLimitedError from .periods import get_daybefore_period, get_today_period, get_yesterday_period, now_shanghai from .state import get_today_cache, set_today_cache from .funding_store import prefetch_funding from .kline_store import prefetch_symbols from .llm_service import run_interpretation_batch, schedule_interpret_background from .stats import compute_three_day_stats from .wecom import build_push_payload, send_wecom_markdown logger = logging.getLogger(__name__) scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") def _restore_today_from_db() -> bool: snap = get_latest_snapshot("today") if snap and snap.get("items"): set_today_cache( { "period_start": snap["period_start"], "period_end": snap["period_end"], "updated_at": snap["created_at"], "top_n": settings.top_n, "volume_threshold": settings.volume_threshold, "change_threshold": settings.change_threshold, "data_mode": "cached", "items": snap["items"], } ) return True return False async def _finalize_closed_period(period_type: str, start, end) -> list[dict] | None: items = await aggregate_period( start, end, use_live_prices=False, mode=settings.yesterday_data_mode ) save_snapshot(period_type, start, end, items) logger.info("%s snapshot saved: %s ~ %s, %d items", period_type, start, end, len(items)) return items async def job_finalize_yesterday() -> None: logger.info("Job: finalize yesterday & daybefore") if binance_client.is_rate_limited(): logger.warning( "Skip finalize — rate limited %ss", binance_client.rate_limit_remaining_sec(), ) return try: binance_client.clear_symbol_cache() start_db, end_db = get_daybefore_period() snap_db = get_latest_snapshot("daybefore") if not snap_db or snap_db.get("period_end") != end_db.isoformat(): items_db = await _finalize_closed_period("daybefore", start_db, end_db) if items_db: syms = [x["symbol"] for x in items_db if x.get("symbol")] if syms: await prefetch_symbols(syms) await prefetch_funding(syms) start_y, end_y = get_yesterday_period() items_y = await _finalize_closed_period("yesterday", start_y, end_y) if items_y: syms = [x["symbol"] for x in items_y if x.get("symbol")] if syms: await prefetch_symbols(syms) await prefetch_funding(syms) except BinanceRateLimitedError as e: logger.error("Finalize rate limited %ss", e.retry_after_sec) except Exception as e: logger.error("Finalize failed: %s", e) async def job_push_wecom() -> None: logger.info("Job: WeCom push (three-day intersection)") try: await job_refresh_today() except Exception as e: logger.warning("Pre-push today refresh failed: %s", e) start, end = get_yesterday_period() snapshot = get_latest_snapshot("yesterday") if not snapshot and not binance_client.is_rate_limited(): try: items = await aggregate_period( start, end, use_live_prices=False, mode=settings.yesterday_data_mode ) save_snapshot("yesterday", start, end, items) snapshot = get_latest_snapshot("yesterday") except BinanceRateLimitedError as e: logger.error("Push prep rate limited %ss", e.retry_after_sec) if not snapshot: logger.error("No yesterday snapshot for push") return ps, pe = snapshot["period_start"], snapshot["period_end"] if was_pushed_today(ps, pe): logger.info("Already pushed for period %s ~ %s", ps, pe) return payload = build_push_payload() if not payload.get("ok"): logger.warning("WeCom push skipped: %s", payload.get("message")) return ok, msg = await send_wecom_markdown(payload["markdown"]) log_push(ps, pe, ok, msg) if ok: logger.info("WeCom push succeeded") else: logger.error("WeCom push failed: %s", msg) async def job_refresh_today() -> None: logger.info("Job: refresh today (mode=%s)", settings.today_data_mode) if binance_client.is_rate_limited(): sec = binance_client.rate_limit_remaining_sec() logger.warning("Rate limited %ss — using DB/cache", sec) if _restore_today_from_db(): logger.info("Today restored from DB cache") return try: start, end = get_today_period() items = await aggregate_period( start, end, use_live_prices=True, mode=settings.today_data_mode, ) meta = enrich_snapshot_meta( items, start, end, data_mode=settings.today_data_mode ) save_snapshot("today", start, end, items) set_today_cache(meta) logger.info("Today cache refreshed: %d items", len(items)) syms = [x["symbol"] for x in items if x.get("symbol")] if syms: await prefetch_symbols(syms) await prefetch_funding(syms) except BinanceRateLimitedError as e: logger.error("Refresh today rate limited %ss — use cache", e.retry_after_sec) _restore_today_from_db() except Exception as e: logger.error("Refresh today failed: %s", e) _restore_today_from_db() async def job_llm_interpret() -> None: """08:05 对三日交集币种逐个大模型解读(每币间隔 3 分钟)。""" logger.info("Job: LLM interpret three-day intersection") if not settings.llm_api_key.strip(): logger.info("LLM_API_KEY not set, skip") return try: await run_interpretation_batch() except Exception as e: logger.error("LLM job failed: %s", e) async def startup_tasks() -> None: init_db() now = now_shanghai() start_y, end_y = get_yesterday_period(now) if binance_client.is_rate_limited(): logger.warning( "Startup: Binance rate limited ~%ss, skip API; use DB cache", binance_client.rate_limit_remaining_sec(), ) _restore_today_from_db() return start_db, end_db = get_daybefore_period(now) snap_db = get_latest_snapshot("daybefore") if not snap_db or snap_db.get("period_end") != end_db.isoformat(): try: logger.info("Startup: computing daybefore snapshot") await _finalize_closed_period("daybefore", start_db, end_db) except BinanceRateLimitedError as e: logger.error("Startup daybefore rate limited %ss", e.retry_after_sec) except Exception as e: logger.error("Startup daybefore failed: %s", e) snap = get_latest_snapshot("yesterday") if not snap or snap.get("period_end") != end_y.isoformat(): try: logger.info("Startup: computing yesterday snapshot") await _finalize_closed_period("yesterday", start_y, end_y) except BinanceRateLimitedError as e: logger.error("Startup yesterday rate limited %ss", e.retry_after_sec) except Exception as e: logger.error("Startup yesterday failed: %s", e) try: await job_refresh_today() except Exception as e: logger.error("Startup today refresh failed: %s", e) if now.hour > 8 or (now.hour == 8 and now.minute >= 10): ps, pe = start_y.isoformat(), end_y.isoformat() if not was_pushed_today(ps, pe) and settings.wecom_webhook_url.strip(): try: await job_push_wecom() except Exception as e: logger.error("Startup catch-up push failed: %s", e) if settings.llm_api_key.strip() and settings.llm_auto_on_startup: stats = compute_three_day_stats() if stats.get("ok") and stats.get("symbols"): logger.info("Startup: schedule one LLM interpret batch") schedule_interpret_background() def start_scheduler() -> None: scheduler.add_job( job_finalize_yesterday, CronTrigger(hour=8, minute=0, timezone="Asia/Shanghai"), id="finalize_yesterday", replace_existing=True, ) scheduler.add_job( job_push_wecom, CronTrigger(hour=8, minute=10, timezone="Asia/Shanghai"), id="push_wecom", replace_existing=True, ) refresh_hours = max(1, settings.refresh_minutes // 60) scheduler.add_job( job_refresh_today, CronTrigger(hour=f"*/{refresh_hours}", minute=0, timezone="Asia/Shanghai"), id="refresh_today", replace_existing=True, ) scheduler.add_job( job_llm_interpret, CronTrigger(hour=8, minute=5, timezone="Asia/Shanghai"), id="llm_interpret", replace_existing=True, ) if not scheduler.running: scheduler.start() logger.info( "Scheduler started (today every %dh, LLM 08:05, interval %ds)", refresh_hours, settings.llm_symbol_interval_sec, ) def stop_scheduler() -> None: if scheduler.running: scheduler.shutdown(wait=False)