232 lines
8.6 KiB
Python
232 lines
8.6 KiB
Python
import logging
|
|
from datetime import datetime
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
|
|
from .aggregator import aggregate_period, enrich_snapshot_meta
|
|
from .binance import binance_client
|
|
from .config import settings
|
|
from .db import get_latest_snapshot, init_db, log_push, save_snapshot, was_pushed_today
|
|
from .exceptions import BinanceRateLimitedError
|
|
from .periods import get_daybefore_period, get_today_period, get_yesterday_period, now_shanghai
|
|
from .state import get_today_cache, set_today_cache
|
|
from .funding_store import prefetch_funding
|
|
from .kline_store import prefetch_symbols
|
|
from .wecom import build_push_payload, send_push_payload
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Single asyncio scheduler shared by every job in this module; it is created
# with the Asia/Shanghai timezone.
scheduler = AsyncIOScheduler(timezone="Asia/Shanghai")
|
|
|
|
|
|
def _restore_today_from_db() -> bool:
    """Load the newest stored "today" snapshot into the in-memory today cache.

    Returns:
        True when a snapshot with a non-empty ``items`` entry was found and
        written to the cache; False otherwise, in which case the cache is
        left untouched.
    """
    latest = get_latest_snapshot("today")
    if not latest or not latest.get("items"):
        return False

    cached_meta = {
        "period_start": latest["period_start"],
        "period_end": latest["period_end"],
        # The snapshot's creation time is reported as the cache update time.
        "updated_at": latest["created_at"],
        # Thresholds come from the current settings, not from the snapshot.
        "top_n": settings.top_n,
        "volume_threshold": settings.volume_threshold,
        "change_threshold": settings.change_threshold,
        # Marks the payload as served from storage rather than freshly computed.
        "data_mode": "cached",
        "items": latest["items"],
    }
    set_today_cache(cached_meta)
    return True
|
|
|
|
|
|
async def _finalize_closed_period(period_type: str, start, end) -> list[dict] | None:
    """Aggregate a closed period, store it as a snapshot and return its items.

    Args:
        period_type: Snapshot label handed to ``save_snapshot`` (callers in
            this module pass ``"yesterday"`` or ``"daybefore"``).
        start: Period start, forwarded unchanged to the aggregator and the DB.
        end: Period end, forwarded unchanged likewise.

    Returns:
        The items produced by ``aggregate_period`` for the period.
    """
    # Closed periods never use live prices; the "yesterday" data mode setting
    # is applied to every period type finalized through this function.
    aggregate_options = {
        "use_live_prices": False,
        "mode": settings.yesterday_data_mode,
    }
    rows = await aggregate_period(start, end, **aggregate_options)
    save_snapshot(period_type, start, end, rows)
    logger.info(
        "%s snapshot saved: %s ~ %s, %d items",
        period_type,
        start,
        end,
        len(rows),
    )
    return rows
|
|
|
|
|
|
async def _prefetch_for_items(items: list[dict] | None) -> None:
    """Prefetch kline and funding data for every symbol named in *items*."""
    symbols = [row["symbol"] for row in (items or []) if row.get("symbol")]
    if symbols:
        await prefetch_symbols(symbols)
        await prefetch_funding(symbols)


async def job_finalize_yesterday() -> None:
    """Scheduled job: store the "daybefore" and "yesterday" snapshots.

    Steps:
      1. Skip entirely while the Binance client reports a rate limit.
      2. Clear the client's symbol cache.
      3. Recompute the "daybefore" snapshot only when the latest stored one
         does not end at the current daybefore period end.
      4. Always recompute the "yesterday" snapshot.
      5. After each computed snapshot, prefetch kline and funding data for
         its symbols.

    The job never raises: rate-limit errors and any other exception are
    logged, the latter with a full traceback.
    """
    logger.info("Job: finalize yesterday & daybefore")
    if binance_client.is_rate_limited():
        logger.warning(
            "Skip finalize — rate limited %ss",
            binance_client.rate_limit_remaining_sec(),
        )
        return

    try:
        binance_client.clear_symbol_cache()

        start_db, end_db = get_daybefore_period()
        snap_db = get_latest_snapshot("daybefore")
        # Stored period_end values are compared against the ISO form of the
        # expected period end to decide whether the snapshot is current.
        if not snap_db or snap_db.get("period_end") != end_db.isoformat():
            items_db = await _finalize_closed_period("daybefore", start_db, end_db)
            await _prefetch_for_items(items_db)

        start_y, end_y = get_yesterday_period()
        items_y = await _finalize_closed_period("yesterday", start_y, end_y)
        await _prefetch_for_items(items_y)
    except BinanceRateLimitedError as e:
        logger.error("Finalize rate limited %ss", e.retry_after_sec)
    except Exception as e:
        # logger.exception keeps the traceback; the message text is unchanged.
        logger.exception("Finalize failed: %s", e)
|
|
|
|
|
|
async def job_push_wecom() -> None:
    """Scheduled job: build and send the WeCom push for the yesterday period.

    Steps:
      1. Refresh the today data first (failures are only logged).
      2. Make sure a "yesterday" snapshot for the *current* yesterday period
         exists. If the latest stored snapshot is missing or ends at a
         different period end, and the client is not rate limited, the
         period is aggregated and saved here.
      3. Abort when no snapshot exists at all, or when a push for the
         snapshot's period was already logged today.
      4. Build the payload, send it and log the outcome.

    If the snapshot is still stale after step 2 (for example while rate
    limited), the job warns and continues with the latest stored snapshot,
    which matches the previous fallback behaviour.
    """
    logger.info("Job: WeCom push (three-day intersection)")
    try:
        await job_refresh_today()
    except Exception as e:
        logger.warning("Pre-push today refresh failed: %s", e)

    start, end = get_yesterday_period()
    expected_end = end.isoformat()
    snapshot = get_latest_snapshot("yesterday")
    # A snapshot left over from an earlier period must not count as present,
    # otherwise the fallback below never runs after a skipped finalize job.
    is_current = bool(snapshot) and snapshot.get("period_end") == expected_end
    if not is_current and not binance_client.is_rate_limited():
        try:
            items = await aggregate_period(
                start, end, use_live_prices=False, mode=settings.yesterday_data_mode
            )
            save_snapshot("yesterday", start, end, items)
            snapshot = get_latest_snapshot("yesterday")
        except BinanceRateLimitedError as e:
            logger.error("Push prep rate limited %ss", e.retry_after_sec)

    if not snapshot:
        logger.error("No yesterday snapshot for push")
        return

    ps, pe = snapshot["period_start"], snapshot["period_end"]
    if pe != expected_end:
        logger.warning(
            "Yesterday snapshot is stale (period_end=%s, expected %s); "
            "pushing with the latest stored snapshot",
            pe,
            expected_end,
        )
    if was_pushed_today(ps, pe):
        logger.info("Already pushed for period %s ~ %s", ps, pe)
        return

    payload = build_push_payload()
    if not payload.get("ok"):
        logger.warning("WeCom push skipped: %s", payload.get("message"))
        return

    ok, msg = await send_push_payload(payload)
    # Successes and failures are both recorded against the snapshot period.
    log_push(ps, pe, ok, msg)
    if ok:
        logger.info("WeCom push succeeded")
    else:
        logger.error("WeCom push failed: %s", msg)
|
|
|
|
|
|
async def job_refresh_today() -> None:
    """Scheduled job: recompute the today period and refresh its cache.

    While the Binance client reports a rate limit, no API call is made and
    the cache is restored from the latest stored "today" snapshot instead.

    Otherwise the today period is aggregated with live prices, saved as a
    snapshot and written to the in-memory cache. If that fails, the error is
    logged and the cache is restored from the DB.

    Prefetching kline and funding data for the resulting symbols is
    best-effort: a prefetch failure is logged but no longer replaces the
    freshly written cache with the reduced "cached" copy from the DB.

    The job never raises.
    """
    logger.info("Job: refresh today (mode=%s)", settings.today_data_mode)
    if binance_client.is_rate_limited():
        sec = binance_client.rate_limit_remaining_sec()
        logger.warning("Rate limited %ss — using DB/cache", sec)
        if _restore_today_from_db():
            logger.info("Today restored from DB cache")
        # NOTE(review): the reviewed copy had lost its indentation; this
        # return is taken to end the rate-limited branch unconditionally.
        return

    try:
        start, end = get_today_period()
        items = await aggregate_period(
            start,
            end,
            use_live_prices=True,
            mode=settings.today_data_mode,
        )
        meta = enrich_snapshot_meta(
            items, start, end, data_mode=settings.today_data_mode
        )
        save_snapshot("today", start, end, items)
        set_today_cache(meta)
        logger.info("Today cache refreshed: %d items", len(items))
    except BinanceRateLimitedError as e:
        logger.error("Refresh today rate limited %ss — use cache", e.retry_after_sec)
        _restore_today_from_db()
        return
    except Exception as e:
        # Keep the traceback so unexpected failures can be diagnosed.
        logger.exception("Refresh today failed: %s", e)
        _restore_today_from_db()
        return

    # The cache already holds fresh data here, so errors below must not
    # trigger the DB restore used for a failed refresh.
    try:
        syms = [x["symbol"] for x in items if x.get("symbol")]
        if syms:
            await prefetch_symbols(syms)
            await prefetch_funding(syms)
    except BinanceRateLimitedError as e:
        logger.warning("Today prefetch rate limited %ss", e.retry_after_sec)
    except Exception as e:
        logger.exception("Today prefetch failed: %s", e)
|
|
|
|
|
|
async def _ensure_closed_snapshot(period_type: str, start, end) -> None:
    """Compute the *period_type* snapshot unless one ending at *end* is stored.

    Rate-limit errors and any other failure are logged and swallowed so that
    the remaining startup steps still run.
    """
    latest = get_latest_snapshot(period_type)
    if latest and latest.get("period_end") == end.isoformat():
        return
    try:
        logger.info("Startup: computing %s snapshot", period_type)
        await _finalize_closed_period(period_type, start, end)
    except BinanceRateLimitedError as e:
        logger.error("Startup %s rate limited %ss", period_type, e.retry_after_sec)
    except Exception as e:
        # Keep the traceback; the emitted message text is unchanged.
        logger.exception("Startup %s failed: %s", period_type, e)


async def startup_tasks() -> None:
    """Initialise storage and bring snapshots, cache and push state up to date.

    Steps:
      1. Initialise the DB.
      2. If the Binance client is rate limited, restore the today cache from
         the DB and stop.
      3. Compute the "daybefore" and "yesterday" snapshots when the stored
         ones are missing or end at a different period end.
      4. Refresh the today data.
      5. If the Shanghai time is 08:10 or later, no push was logged today for
         the yesterday period, and a WeCom webhook URL is configured, run the
         push job as a catch-up.
    """
    init_db()
    now = now_shanghai()
    start_y, end_y = get_yesterday_period(now)

    if binance_client.is_rate_limited():
        logger.warning(
            "Startup: Binance rate limited ~%ss, skip API; use DB cache",
            binance_client.rate_limit_remaining_sec(),
        )
        _restore_today_from_db()
        return

    start_db, end_db = get_daybefore_period(now)
    await _ensure_closed_snapshot("daybefore", start_db, end_db)
    await _ensure_closed_snapshot("yesterday", start_y, end_y)

    try:
        await job_refresh_today()
    except Exception as e:
        logger.exception("Startup today refresh failed: %s", e)

    # 08:10 is when the push job is scheduled; a later start has missed that
    # run and may need to catch up.
    if (now.hour, now.minute) >= (8, 10):
        ps, pe = start_y.isoformat(), end_y.isoformat()
        # Tolerate an unset (None) webhook URL instead of raising on .strip().
        webhook_url = (settings.wecom_webhook_url or "").strip()
        if not was_pushed_today(ps, pe) and webhook_url:
            try:
                await job_push_wecom()
            except Exception as e:
                logger.exception("Startup catch-up push failed: %s", e)
|
|
|
|
def start_scheduler() -> None:
    """Register the three cron jobs and start the scheduler if it is stopped.

    Jobs (all triggers use the Asia/Shanghai timezone):
      * ``finalize_yesterday`` at 08:00
      * ``push_wecom`` at 08:10
      * ``refresh_today`` at minute 0 of every N-th hour, where N is
        ``settings.refresh_minutes // 60`` with a minimum of 1

    The refresh interval has hour granularity only: values below 60 minutes
    run hourly, and values of 24 hours or more run once a day at 00:00. The
    latter avoids an hour step of 24 or more, which exceeds the 0-23 hour
    range and is rejected by the cron trigger.

    Jobs are added with ``replace_existing=True``, so calling this again
    replaces them instead of raising.
    """
    tz = "Asia/Shanghai"

    refresh_hours = max(1, settings.refresh_minutes // 60)
    if refresh_hours >= 24:
        # "*/24" (or larger) is not a valid hour step; fall back to daily.
        refresh_hours = 24
        hour_expr = "0"
    else:
        hour_expr = f"*/{refresh_hours}"

    job_table = (
        (
            job_finalize_yesterday,
            CronTrigger(hour=8, minute=0, timezone=tz),
            "finalize_yesterday",
        ),
        (
            job_push_wecom,
            CronTrigger(hour=8, minute=10, timezone=tz),
            "push_wecom",
        ),
        (
            job_refresh_today,
            CronTrigger(hour=hour_expr, minute=0, timezone=tz),
            "refresh_today",
        ),
    )
    for func, trigger, job_id in job_table:
        scheduler.add_job(func, trigger, id=job_id, replace_existing=True)

    if not scheduler.running:
        scheduler.start()
        # NOTE(review): the reviewed copy had lost its indentation; the
        # message is taken to belong to the branch that starts the scheduler.
        logger.info("Scheduler started (today every %dh)", refresh_hours)
|
|
|
|
|
|
def stop_scheduler() -> None:
    """Shut the module scheduler down if it is running.

    ``shutdown`` is called with ``wait=False``; nothing happens when the
    scheduler is not running.
    """
    if not scheduler.running:
        return
    scheduler.shutdown(wait=False)
|