增加排名

This commit is contained in:
dekun
2026-05-22 14:05:18 +08:00
parent 71ed38b32d
commit e0ec3f87a9
8 changed files with 663 additions and 273 deletions
+35 -41
View File
@@ -6,16 +6,18 @@ from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from .aggregator import aggregate_period, enrich_snapshot_meta
from .config import ROOT_DIR, settings
from .funding_store import enrich_items_with_funding, get_funding_bundle
from .funding_store import get_funding_bundle
from .kline_store import get_daily_candles, sync_daily_klines
from .db import get_latest_snapshot, init_db, log_push
from .db import get_latest_snapshot, init_db, log_push, save_snapshot
from .exceptions import BinanceRateLimitedError
from .periods import get_today_period, get_yesterday_period
from .period_api import get_period_top30
from .periods import get_daybefore_period, get_today_period, get_yesterday_period
from .scheduler import job_finalize_yesterday, job_push_wecom, job_refresh_today, start_scheduler, startup_tasks, stop_scheduler
from .state import get_today_cache
from .stats import compute_three_day_stats
from .aggregator import aggregate_period
from .wecom import build_markdown, send_wecom_markdown
from .state import get_today_cache
logging.basicConfig(
level=logging.INFO,
@@ -59,48 +61,34 @@ async def index():
return {"message": "Web UI not found. Place files in /web"}
@app.get("/api/yesterday/top30")
async def api_yesterday_top30():
snap = get_latest_snapshot("yesterday")
if snap:
items = await enrich_items_with_funding(snap["items"])
return {
"period_start": snap["period_start"],
"period_end": snap["period_end"],
"updated_at": snap["created_at"],
"top_n": settings.top_n,
"volume_threshold": settings.volume_threshold,
"change_threshold": settings.change_threshold,
"items": items,
}
start, end = get_yesterday_period()
try:
mode = settings.yesterday_data_mode
items = await aggregate_period(start, end, mode=mode)
return enrich_snapshot_meta(items, start, end, data_mode=mode)
except Exception as e:
logger.error("api yesterday failed: %s", e)
meta = enrich_snapshot_meta([], start, end)
meta["error"] = "数据暂不可用,请检查网络或稍后重试"
return meta
@app.get("/api/today/top30")
async def api_today_top30():
from .state import get_today_cache
cached = get_today_cache()
if cached:
from .funding_store import enrich_items_with_funding
cached["items"] = await enrich_items_with_funding(cached.get("items", []))
return cached
start, end = get_today_period()
try:
mode = settings.today_data_mode
items = await aggregate_period(start, end, use_live_prices=True, mode=mode)
return enrich_snapshot_meta(items, start, end, data_mode=mode)
except Exception as e:
logger.error("api today failed: %s", e)
meta = enrich_snapshot_meta([], start, end)
meta["error"] = "数据暂不可用,请检查网络或稍后重试"
return meta
return await get_period_top30(
"today", get_today_period, use_live_prices=True, data_mode=settings.today_data_mode
)
@app.get("/api/yesterday/top30")
async def api_yesterday_top30():
return await get_period_top30("yesterday", get_yesterday_period)
@app.get("/api/daybefore/top30")
async def api_daybefore_top30():
return await get_period_top30("daybefore", get_daybefore_period)
@app.get("/api/stats/three-day")
async def api_stats_three_day():
return compute_three_day_stats()
@app.post("/api/push/test")
@@ -135,6 +123,12 @@ async def api_refresh_today():
return get_today_cache() or {"message": "done"}
@app.post("/api/refresh/daybefore")
async def api_refresh_daybefore():
await job_finalize_yesterday()
return get_latest_snapshot("daybefore") or {"message": "done"}
@app.get("/api/chart/{symbol}/daily")
async def api_chart_daily(symbol: str, limit: int | None = None, refresh: bool = False):
"""合约日 K 线:优先读本地 SQLite,过期再拉币安入库。"""
+59
View File
@@ -0,0 +1,59 @@
"""各周期 Top30 API 共用逻辑。"""
import logging
from collections.abc import Callable
from datetime import datetime
from .aggregator import aggregate_period, enrich_snapshot_meta
from .config import settings
from .db import get_latest_snapshot, save_snapshot
from .funding_store import enrich_items_with_funding
logger = logging.getLogger(__name__)
async def get_period_top30(
period_type: str,
period_getter: Callable[[], tuple[datetime, datetime]],
*,
use_live_prices: bool = False,
data_mode: str | None = None,
auto_save: bool = True,
) -> dict:
start, end = period_getter()
expected_end = end.isoformat()
snap = get_latest_snapshot(period_type)
if snap and snap.get("period_end") == expected_end:
items = await enrich_items_with_funding(snap["items"])
return {
"period_type": period_type,
"period_start": snap["period_start"],
"period_end": snap["period_end"],
"updated_at": snap["created_at"],
"top_n": settings.top_n,
"volume_threshold": settings.volume_threshold,
"change_threshold": settings.change_threshold,
"data_mode": data_mode or settings.yesterday_data_mode,
"items": items,
}
mode = data_mode or (
settings.today_data_mode if use_live_prices else settings.yesterday_data_mode
)
try:
items = await aggregate_period(
start, end, use_live_prices=use_live_prices, mode=mode
)
if auto_save and items:
save_snapshot(period_type, start, end, items)
items = await enrich_items_with_funding(items)
meta = enrich_snapshot_meta(items, start, end, data_mode=mode)
meta["period_type"] = period_type
return meta
except Exception as e:
logger.error("period %s failed: %s", period_type, e)
meta = enrich_snapshot_meta([], start, end)
meta["period_type"] = period_type
meta["error"] = "数据暂不可用,请检查网络或稍后重试"
return meta
+8
View File
@@ -24,6 +24,14 @@ def get_yesterday_period(now: datetime | None = None) -> tuple[datetime, datetim
return start, end
def get_daybefore_period(now: datetime | None = None) -> tuple[datetime, datetime]:
"""前日周期 [D-2 08:00, D-1 08:00) in Shanghai time."""
now = now or now_shanghai()
end = _align_cutoff(now) - timedelta(days=1)
start = end - timedelta(days=1)
return start, end
def get_today_period(now: datetime | None = None) -> tuple[datetime, datetime]:
"""[D 08:00, now) in Shanghai time."""
now = now or now_shanghai()
+43 -19
View File
@@ -9,7 +9,7 @@ from .binance import binance_client
from .config import settings
from .db import get_latest_snapshot, init_db, log_push, save_snapshot, was_pushed_today
from .exceptions import BinanceRateLimitedError
from .periods import get_today_period, get_yesterday_period, now_shanghai
from .periods import get_daybefore_period, get_today_period, get_yesterday_period, now_shanghai
from .state import get_today_cache, set_today_cache
from .funding_store import prefetch_funding
from .kline_store import prefetch_symbols
@@ -39,30 +39,46 @@ def _restore_today_from_db() -> bool:
return False
async def _finalize_closed_period(period_type: str, start, end) -> list[dict] | None:
items = await aggregate_period(
start, end, use_live_prices=False, mode=settings.yesterday_data_mode
)
save_snapshot(period_type, start, end, items)
logger.info("%s snapshot saved: %s ~ %s, %d items", period_type, start, end, len(items))
return items
async def job_finalize_yesterday() -> None:
logger.info("Job: finalize yesterday period")
logger.info("Job: finalize yesterday & daybefore")
if binance_client.is_rate_limited():
logger.warning(
"Skip yesterday job — rate limited %ss",
"Skip finalize — rate limited %ss",
binance_client.rate_limit_remaining_sec(),
)
return
try:
binance_client.clear_symbol_cache()
start, end = get_yesterday_period()
items = await aggregate_period(
start, end, use_live_prices=False, mode=settings.yesterday_data_mode
)
save_snapshot("yesterday", start, end, items)
logger.info("Yesterday snapshot saved: %d items", len(items))
syms = [x["symbol"] for x in items if x.get("symbol")]
if syms:
await prefetch_symbols(syms)
await prefetch_funding(syms)
start_db, end_db = get_daybefore_period()
snap_db = get_latest_snapshot("daybefore")
if not snap_db or snap_db.get("period_end") != end_db.isoformat():
items_db = await _finalize_closed_period("daybefore", start_db, end_db)
if items_db:
syms = [x["symbol"] for x in items_db if x.get("symbol")]
if syms:
await prefetch_symbols(syms)
await prefetch_funding(syms)
start_y, end_y = get_yesterday_period()
items_y = await _finalize_closed_period("yesterday", start_y, end_y)
if items_y:
syms = [x["symbol"] for x in items_y if x.get("symbol")]
if syms:
await prefetch_symbols(syms)
await prefetch_funding(syms)
except BinanceRateLimitedError as e:
logger.error("Finalize yesterday rate limited %ss", e.retry_after_sec)
logger.error("Finalize rate limited %ss", e.retry_after_sec)
except Exception as e:
logger.error("Finalize yesterday failed: %s", e)
logger.error("Finalize failed: %s", e)
async def job_push_wecom() -> None:
@@ -144,14 +160,22 @@ async def startup_tasks() -> None:
_restore_today_from_db()
return
start_db, end_db = get_daybefore_period(now)
snap_db = get_latest_snapshot("daybefore")
if not snap_db or snap_db.get("period_end") != end_db.isoformat():
try:
logger.info("Startup: computing daybefore snapshot")
await _finalize_closed_period("daybefore", start_db, end_db)
except BinanceRateLimitedError as e:
logger.error("Startup daybefore rate limited %ss", e.retry_after_sec)
except Exception as e:
logger.error("Startup daybefore failed: %s", e)
snap = get_latest_snapshot("yesterday")
if not snap or snap.get("period_end") != end_y.isoformat():
try:
logger.info("Startup: computing yesterday snapshot")
items = await aggregate_period(
start_y, end_y, use_live_prices=False, mode=settings.yesterday_data_mode
)
save_snapshot("yesterday", start_y, end_y, items)
await _finalize_closed_period("yesterday", start_y, end_y)
except BinanceRateLimitedError as e:
logger.error("Startup yesterday rate limited %ss", e.retry_after_sec)
except Exception as e:
+131
View File
@@ -0,0 +1,131 @@
"""三日数据统计:连续三日 Top30 且 |涨跌|>=5%"""
from typing import Any
from .config import settings
from .db import get_latest_snapshot
def _items_by_symbol(items: list[dict]) -> dict[str, dict]:
return {x["symbol"]: x for x in items if x.get("symbol")}
def compute_three_day_stats() -> dict[str, Any]:
today_snap = get_latest_snapshot("today")
yesterday_snap = get_latest_snapshot("yesterday")
daybefore_snap = get_latest_snapshot("daybefore")
threshold = settings.change_threshold
top_n = settings.top_n
missing = []
if not today_snap:
missing.append("今日")
if not yesterday_snap:
missing.append("昨日")
if not daybefore_snap:
missing.append("前日")
if missing:
return {
"ok": False,
"missing_periods": missing,
"message": f"缺少快照:{', '.join(missing)},请等待刷新或手动触发",
"criteria": _criteria_text(threshold, top_n),
"count": 0,
"items": [],
"periods": _period_meta(today_snap, yesterday_snap, daybefore_snap),
}
today_map = _items_by_symbol(today_snap["items"])
yesterday_map = _items_by_symbol(yesterday_snap["items"])
daybefore_map = _items_by_symbol(daybefore_snap["items"])
symbols = set(today_map) & set(yesterday_map) & set(daybefore_map)
qualified: list[dict] = []
for sym in sorted(symbols):
t, y, b = today_map[sym], yesterday_map[sym], daybefore_map[sym]
if not (
abs(t.get("price_change_pct", 0)) >= threshold
and abs(y.get("price_change_pct", 0)) >= threshold
and abs(b.get("price_change_pct", 0)) >= threshold
):
continue
qualified.append(
{
"symbol": sym,
"today": _pick_fields(t),
"yesterday": _pick_fields(y),
"daybefore": _pick_fields(b),
"avg_change_pct": round(
(
t.get("price_change_pct", 0)
+ y.get("price_change_pct", 0)
+ b.get("price_change_pct", 0)
)
/ 3,
4,
),
"total_quote_volume": (
(t.get("quote_volume") or 0)
+ (y.get("quote_volume") or 0)
+ (b.get("quote_volume") or 0)
),
}
)
qualified.sort(key=lambda x: x["total_quote_volume"], reverse=True)
return {
"ok": True,
"criteria": _criteria_text(threshold, top_n),
"count": len(qualified),
"items": qualified,
"periods": _period_meta(today_snap, yesterday_snap, daybefore_snap),
"summary": {
"today_top30": len(today_map),
"yesterday_top30": len(yesterday_map),
"daybefore_top30": len(daybefore_map),
"intersection": len(symbols),
},
}
def _criteria_text(threshold: float, top_n: int) -> str:
return (
f"连续三日成交额 Top{top_n} 且每日 |涨跌幅| ≥ {threshold:g}%"
f"(今日/昨日/前日三个完整切日周期)"
)
def _pick_fields(row: dict) -> dict:
return {
"rank": row.get("rank"),
"quote_volume": row.get("quote_volume"),
"quote_volume_fmt": row.get("quote_volume_fmt"),
"price_change_pct": row.get("price_change_pct"),
"price_change_pct_fmt": row.get("price_change_pct_fmt"),
"funding_rate_fmt": row.get("funding_rate_fmt"),
"is_high_volume": row.get("is_high_volume"),
"is_high_change": row.get("is_high_change"),
}
def _period_meta(today, yesterday, daybefore) -> dict:
def one(snap, label):
if not snap:
return {"label": label, "ready": False}
return {
"label": label,
"ready": True,
"period_start": snap["period_start"],
"period_end": snap["period_end"],
"updated_at": snap.get("created_at"),
}
return {
"today": one(today, "今日"),
"yesterday": one(yesterday, "昨日"),
"daybefore": one(daybefore, "前日"),
}