Files
Binance_Altcoin_Monitor/backend/app/scheduler.py
T
2026-05-22 13:06:42 +08:00

133 lines
4.7 KiB
Python

import asyncio
import logging
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from .aggregator import aggregate_period, enrich_snapshot_meta
from .binance import binance_client
from .config import settings
from .db import get_latest_snapshot, init_db, log_push, save_snapshot, was_pushed_today
from .periods import get_today_period, get_yesterday_period, now_shanghai
from .state import set_today_cache
from .wecom import build_markdown, send_wecom_markdown
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Scheduler instance shared by start_scheduler()/stop_scheduler() below;
# its default timezone is Asia/Shanghai.
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
async def job_finalize_yesterday() -> None:
    """08:00 — compute and persist the closed yesterday period.

    Clears the Binance client's symbol cache, aggregates the period returned
    by ``get_yesterday_period()`` with ``use_live_prices=False`` and stores the
    result as the ``"yesterday"`` snapshot.

    Any ``Exception`` is logged together with its traceback and swallowed, so
    a failed run does not propagate to the caller.
    """
    logger.info("Job: finalize yesterday period")
    try:
        binance_client.clear_symbol_cache()
        start, end = get_yesterday_period()
        items = await aggregate_period(start, end, use_live_prices=False)
        save_snapshot("yesterday", start, end, items)
        logger.info("Yesterday snapshot saved: %s ~ %s, %d items", start, end, len(items))
    except Exception as e:
        # logger.exception logs at ERROR level with the same message but also
        # records the traceback, which a plain logger.error("%s", e) drops.
        logger.exception("Finalize yesterday failed: %s", e)
async def job_push_wecom() -> None:
    """08:10 — push yesterday Top30 to WeCom.

    Loads the latest ``"yesterday"`` snapshot. If there is none, or its
    ``period_end`` does not match the current yesterday period, the period is
    aggregated and saved first, so an older snapshot left behind by a failed
    finalize run is not pushed. The push is skipped when
    ``was_pushed_today`` reports the period as already pushed; otherwise the
    markdown is sent and the outcome is recorded with ``log_push``.

    Exceptions are not caught here; they propagate to the caller.
    """
    logger.info("Job: WeCom push")
    start, end = get_yesterday_period()
    snapshot = get_latest_snapshot("yesterday")

    # Same freshness rule as startup_tasks(): the stored period_end must equal
    # the ISO form of the current yesterday period's end.
    is_stale = bool(snapshot) and snapshot.get("period_end") != end.isoformat()
    if not snapshot or is_stale:
        if is_stale:
            logger.info("Yesterday snapshot is stale, computing now")
        else:
            logger.info("No yesterday snapshot, computing now")
        items = await aggregate_period(start, end, use_live_prices=False)
        save_snapshot("yesterday", start, end, items)
        snapshot = get_latest_snapshot("yesterday")

    if not snapshot:
        logger.error("Failed to get yesterday snapshot for push")
        return

    ps, pe = snapshot["period_start"], snapshot["period_end"]
    if was_pushed_today(ps, pe):
        logger.info("Already pushed for period %s ~ %s", ps, pe)
        return

    content = build_markdown(snapshot)
    ok, msg = await send_wecom_markdown(content)
    # Record the attempt whether or not it succeeded.
    log_push(ps, pe, ok, msg)
    if ok:
        logger.info("WeCom push succeeded")
    else:
        logger.error("WeCom push failed: %s", msg)
async def job_refresh_today() -> None:
    """Refresh today period cache.

    Aggregates the period returned by ``get_today_period()`` with
    ``use_live_prices=True``, builds the snapshot metadata, saves the items as
    the ``"today"`` snapshot and stores the metadata via ``set_today_cache``.

    Any ``Exception`` is logged together with its traceback and swallowed, so
    a failed refresh does not propagate to the caller.
    """
    logger.info("Job: refresh today")
    try:
        start, end = get_today_period()
        items = await aggregate_period(start, end, use_live_prices=True)
        meta = enrich_snapshot_meta(items, start, end)
        save_snapshot("today", start, end, items)
        set_today_cache(meta)
        logger.info("Today cache refreshed: %d items", len(items))
    except Exception as e:
        # logger.exception logs at ERROR level with the same message but also
        # records the traceback, which a plain logger.error("%s", e) drops.
        logger.exception("Refresh today failed: %s", e)
async def startup_tasks() -> None:
    """Initialise the database and bring snapshots up to date at startup.

    Steps, in order:

    1. ``init_db()``.
    2. Recompute the ``"yesterday"`` snapshot when none exists or its
       ``period_end`` differs from the current yesterday period.
    3. Refresh the today cache.
    4. If the Shanghai time is 08:10 or later, the period has not been pushed
       today and a WeCom webhook URL is configured, run a catch-up push.

    Failures in steps 2-4 are logged with their traceback and swallowed;
    errors from ``init_db()`` and the snapshot/push lookups propagate.
    """
    init_db()
    now = now_shanghai()
    start_y, end_y = get_yesterday_period(now)

    snap = get_latest_snapshot("yesterday")
    if not snap or snap.get("period_end") != end_y.isoformat():
        try:
            logger.info("Startup: computing yesterday snapshot")
            items = await aggregate_period(start_y, end_y, use_live_prices=False)
            save_snapshot("yesterday", start_y, end_y, items)
        except Exception as e:
            logger.exception("Startup yesterday snapshot failed (will retry on schedule): %s", e)

    try:
        await job_refresh_today()
    except Exception as e:
        logger.exception("Startup today refresh failed (will retry on schedule): %s", e)

    # Catch-up push only once the 08:10 push time has passed.
    if (now.hour, now.minute) >= (8, 10):
        ps, pe = start_y.isoformat(), end_y.isoformat()
        # `or ""` keeps an unset (None) webhook URL from raising AttributeError
        # outside any try block and aborting startup.
        if not was_pushed_today(ps, pe) and (settings.wecom_webhook_url or "").strip():
            try:
                logger.info("Startup: catch-up WeCom push")
                await job_push_wecom()
            except Exception as e:
                logger.exception("Startup catch-up push failed: %s", e)
def start_scheduler() -> None:
    """Register the three cron jobs and start the scheduler if it is idle.

    Jobs (all triggers use the Asia/Shanghai timezone):

    * ``finalize_yesterday`` — daily at 08:00
    * ``push_wecom`` — daily at 08:10
    * ``refresh_today`` — cron minute field ``*/<settings.refresh_minutes>``

    Each job is added with ``replace_existing=True``.
    """
    tz_name = "Asia/Shanghai"
    # (job id, coroutine, CronTrigger field kwargs), in registration order.
    job_table = (
        ("finalize_yesterday", job_finalize_yesterday, {"hour": 8, "minute": 0}),
        ("push_wecom", job_push_wecom, {"hour": 8, "minute": 10}),
        ("refresh_today", job_refresh_today, {"minute": f"*/{settings.refresh_minutes}"}),
    )
    for job_id, job_func, cron_fields in job_table:
        trigger = CronTrigger(timezone=tz_name, **cron_fields)
        scheduler.add_job(job_func, trigger, id=job_id, replace_existing=True)

    if not scheduler.running:
        scheduler.start()
    logger.info("Scheduler started (refresh every %d min)", settings.refresh_minutes)
def stop_scheduler() -> None:
    """Shut the scheduler down with ``wait=False``; no-op when it is not running."""
    if not scheduler.running:
        return
    scheduler.shutdown(wait=False)