修改前端漏斗
This commit is contained in:
@@ -3,6 +3,7 @@ from __future__ import annotations
|
||||
import hashlib
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime, timedelta, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
@@ -32,6 +33,9 @@ from .storage import Storage
|
||||
LOGGER = logging.getLogger("onchain_scout.web")
|
||||
FIXED_BAR = "5m"
|
||||
DAILY_REPORT_JOB_ID = "daily_report_job"
|
||||
FUNNEL_DISPLAY_HOURS_DEFAULT = 24.0
|
||||
FUNNEL_DISPLAY_HOURS_MIN = 1.0
|
||||
FUNNEL_DISPLAY_HOURS_MAX = 168.0
|
||||
|
||||
|
||||
def _hash_password(plain: str) -> str:
|
||||
@@ -49,6 +53,35 @@ def _asset_version(root: Path) -> str:
|
||||
return str(mt or 1)
|
||||
|
||||
|
||||
def _parse_alert_created_at_utc(raw: object) -> datetime | None:
|
||||
if raw is None:
|
||||
return None
|
||||
try:
|
||||
s = str(raw).strip()
|
||||
if not s:
|
||||
return None
|
||||
if s.endswith("Z"):
|
||||
s = s[:-1] + "+00:00"
|
||||
dt = datetime.fromisoformat(s)
|
||||
if dt.tzinfo is not None:
|
||||
return dt.astimezone(timezone.utc).replace(tzinfo=None)
|
||||
return dt
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
|
||||
|
||||
def _filter_alerts_within_hours(alerts: list[dict], *, within_hours: float) -> list[dict]:
|
||||
if within_hours <= 0:
|
||||
return list(alerts)
|
||||
cutoff = datetime.utcnow() - timedelta(hours=within_hours)
|
||||
out: list[dict] = []
|
||||
for a in alerts:
|
||||
created = _parse_alert_created_at_utc(a.get("created_at"))
|
||||
if created is not None and created >= cutoff:
|
||||
out.append(a)
|
||||
return out
|
||||
|
||||
|
||||
def _dedupe_funnel_alerts_by_symbol(alerts: list[dict]) -> list[dict]:
|
||||
"""同一币种只保留一条漏斗记录:优先保留 created_at 最新的(避免历史轮次堆叠)。"""
|
||||
by_time = sorted(alerts, key=lambda x: str(x.get("created_at") or ""), reverse=True)
|
||||
@@ -513,15 +546,23 @@ def create_app(settings: Settings) -> FastAPI:
|
||||
)
|
||||
|
||||
@app.get("/api/funnel")
|
||||
async def api_funnel(_: None = Depends(require_login)) -> JSONResponse:
|
||||
async def api_funnel(
|
||||
window_hours: float = FUNNEL_DISPLAY_HOURS_DEFAULT,
|
||||
_: None = Depends(require_login),
|
||||
) -> JSONResponse:
|
||||
wh = max(
|
||||
FUNNEL_DISPLAY_HOURS_MIN,
|
||||
min(FUNNEL_DISPLAY_HOURS_MAX, float(window_hours)),
|
||||
)
|
||||
alerts = await storage.get_recent_alerts(limit=500)
|
||||
items = [a for a in alerts if (a.get("details") or {}).get("source") == "gemma_funnel"]
|
||||
items = _filter_alerts_within_hours(items, within_hours=wh)
|
||||
items = _dedupe_funnel_alerts_by_symbol(items)
|
||||
items.sort(
|
||||
key=lambda x: float((x.get("details") or {}).get("composite_score") or 0.0),
|
||||
reverse=True,
|
||||
)
|
||||
return JSONResponse({"items": items[:100]})
|
||||
return JSONResponse({"items": items[:100], "window_hours": wh})
|
||||
|
||||
@app.get("/api/daily-report")
|
||||
async def api_daily_report(_: None = Depends(require_login)) -> JSONResponse:
|
||||
|
||||
Reference in New Issue
Block a user