Files
gate_scout_order/onchain_scout_gate/app/web.py
T
2026-05-16 22:25:48 +08:00

627 lines
26 KiB
Python

from __future__ import annotations
import hashlib
import json
import logging
from pathlib import Path
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import Depends, FastAPI, Form, HTTPException, Request, status
from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.middleware.gzip import GZipMiddleware
from starlette.middleware.sessions import SessionMiddleware
from .config import Settings
from .daily_report import DailyReportService
from .gemma_client import OllamaGemmaClient
from .monitor import MonitorService
from .notifier import WeComNotifier
from .gate import GateClient
from .order_executors_store import (
add_executor,
delete_executor,
ensure_store_initialized,
read_snapshot,
update_executor,
write_global_settings,
)
from .storage import Storage
LOGGER = logging.getLogger("onchain_scout.web")
FIXED_BAR = "5m"
DAILY_REPORT_JOB_ID = "daily_report_job"
def _hash_password(plain: str) -> str:
    """Return the hexadecimal SHA-256 digest of ``plain`` encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(plain.encode("utf-8"))
    return hasher.hexdigest()
def _asset_version(root: Path) -> str:
    """Build the ``?v=`` cache-busting token for the static assets.

    The token is the newest modification time (in whole seconds) among
    ``static/app.js`` and ``static/style.css`` below ``root``, so browsers do
    not keep serving a strongly cached, outdated ``app.js``. Files that cannot
    be stat'ed are ignored; ``"1"`` is returned when no positive timestamp was
    found.
    """
    static_dir = root / "static"
    stamps = []
    for filename in ("app.js", "style.css"):
        try:
            stamps.append(int((static_dir / filename).stat().st_mtime))
        except OSError:
            pass  # missing or unreadable asset: leave it out of the version
    newest = max(stamps, default=0)
    return str(newest) if newest > 0 else "1"
def _dedupe_funnel_alerts_by_symbol(alerts: list[dict]) -> list[dict]:
    """Keep a single funnel alert per symbol, preferring the newest one.

    Alerts are ordered by the string form of ``created_at`` (newest first) and
    the first alert seen for each upper-cased, stripped symbol is kept, so
    records from earlier rounds do not pile up. Alerts with an empty or missing
    symbol are dropped. The result stays in newest-first order.
    """
    newest_first = sorted(
        alerts,
        key=lambda alert: str(alert.get("created_at") or ""),
        reverse=True,
    )
    kept: dict[str, dict] = {}
    for alert in newest_first:
        symbol = (alert.get("symbol") or "").strip().upper()
        if symbol and symbol not in kept:
            kept[symbol] = alert
    # dicts preserve insertion order, so this is still newest-first.
    return list(kept.values())
def _slim_monitor_state(state) -> dict:
    """Return a copy of the state's attributes with the monitoring pool trimmed.

    ``monitoring_pool`` is removed from the copy and replaced by
    ``monitoring_pool_count`` (its length) and ``monitoring_pool_preview`` (its
    first 50 entries). This avoids shipping the whole pool, which can reach
    over a thousand entries and makes the LAN dashboard very slow. ``state``
    itself is not modified.
    """
    slim = {**state.__dict__}
    entries = list(slim.pop("monitoring_pool", None) or [])
    slim["monitoring_pool_count"] = len(entries)
    slim["monitoring_pool_preview"] = entries[:50]
    return slim
def _parse_hhmm(raw: str) -> tuple[int, int]:
    """Parse an ``HH:MM`` string into an ``(hour, minute)`` pair.

    The hour is clamped to 0-23 and the minute to 0-59. ``(8, 30)`` is returned
    when the text has no colon or either part is not an integer.
    """
    fallback = (8, 30)
    text = (raw or "").strip()
    hour_part, colon, minute_part = text.partition(":")
    if not colon:
        return fallback
    try:
        hour, minute = int(hour_part), int(minute_part)
    except ValueError:
        return fallback
    return min(max(hour, 0), 23), min(max(minute, 0), 59)
def _to_bool(raw: str | None, default: bool) -> bool:
    """Interpret ``raw`` as a boolean flag.

    ``None`` yields ``default``. Any other value is converted to text, stripped
    and lower-cased, and is true only when it is one of ``1``, ``true``,
    ``yes``, ``y`` or ``on``.
    """
    if raw is not None:
        truthy_tokens = ("1", "true", "yes", "y", "on")
        return str(raw).strip().lower() in truthy_tokens
    return default
def _normalize_manual_symbols(raw: object) -> list[str]:
    """Turn user-entered symbols into a de-duplicated list of base tickers.

    ``raw`` may be a list (each item is stringified) or any other value (used
    as text). Commas, semicolons and line breaks separate entries. Each entry
    is stripped and upper-cased, cut at the first ``_USDT``, ``-USDT-SWAP`` or
    ``-USDT`` marker (checked in that order), and reduced to its alphanumeric
    characters. Empty results and repeats are skipped; first-seen order is kept
    and at most 200 symbols are returned.
    """
    if isinstance(raw, list):
        text = "\n".join(str(item) for item in raw)
    else:
        text = str(raw or "")
    for separator in (",", ";"):
        text = text.replace(separator, "\n")

    symbols: list[str] = []
    for line in text.splitlines():
        token = line.strip().upper()
        if not token:
            continue
        for marker in ("_USDT", "-USDT-SWAP", "-USDT"):
            if marker in token:
                token = token.split(marker, 1)[0]
                break
        token = "".join(filter(str.isalnum, token))
        if token and token not in symbols:
            symbols.append(token)
    return symbols[:200]
def _normalize_symbol_token(raw: object) -> str:
    """Reduce one symbol-like value to its upper-case alphanumeric base ticker.

    The value is stringified, stripped and upper-cased, cut at the first
    ``_USDT``, ``-USDT-SWAP`` or ``-USDT`` marker (checked in that order), and
    stripped of every non-alphanumeric character. Falsy input gives ``""``.
    """
    token = str(raw or "").strip().upper()
    if not token:
        return ""
    for marker in ("_USDT", "-USDT-SWAP", "-USDT"):
        if marker in token:
            token, _, _ = token.partition(marker)
            break
    return "".join(filter(str.isalnum, token))
def create_app(settings: Settings) -> FastAPI:
    """Create the FastAPI application and wire its services from ``settings``.

    This part of the factory defines the login dependency, installs the gzip
    and session middleware, mounts the static files, constructs the storage,
    exchange client, notifier, optional Gemma client, monitor, daily-report
    service and scheduler, and publishes a few of them on ``app.state``.
    """

    def require_login(request: Request) -> None:
        """Reject the request with HTTP 401 unless the session is logged in.

        Does nothing when ``settings.auth.enabled`` is false.
        """
        if settings.auth.enabled and request.session.get("logged_in") is not True:
            raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="unauthorized")

    app = FastAPI(title="MATRIX FUNNEL", version="2.1.0")
    app.add_middleware(GZipMiddleware, minimum_size=800)
    one_week_seconds = 60 * 60 * 24 * 7
    app.add_middleware(
        SessionMiddleware,
        secret_key=settings.app.session_secret,
        max_age=one_week_seconds,
        same_site="lax",
        https_only=False,
    )

    # Templates and static assets live next to the package directory.
    root_dir = Path(__file__).resolve().parents[1]
    templates = Jinja2Templates(directory=str(root_dir / "templates"))
    app.mount("/static", StaticFiles(directory=str(root_dir / "static")), name="static")

    storage = Storage(settings.app.database_url)
    if settings.proxy.enabled:
        proxy_url = settings.proxy.url
    else:
        proxy_url = None
    gate_client = GateClient(settings.gate, proxy_url=proxy_url)
    # The notifier is always created without a proxy.
    notifier = WeComNotifier(settings.wecom, proxy_url=None)
    if settings.gemma.enabled:
        gemma_client = OllamaGemmaClient(settings.gemma)
    else:
        gemma_client = None

    # Both services take exactly the same collaborators.
    service_kwargs = {
        "settings": settings,
        "storage": storage,
        "gate_client": gate_client,
        "notifier": notifier,
        "gemma_client": gemma_client,
    }
    monitor = MonitorService(**service_kwargs)
    daily_report = DailyReportService(**service_kwargs)
    scheduler = AsyncIOScheduler(timezone="UTC")

    app.state.settings = settings
    app.state.storage = storage
    app.state.monitor = monitor
    app.state.scheduler = scheduler
    app.state.auth_user = settings.auth.username
    app.state.auth_password_hash = _hash_password(settings.auth.password)
@app.on_event("startup")
async def on_startup() -> None:
    """Initialise storage, schedule the jobs and run the first cycle.

    Steps, in order: create the log file's directory, initialise the database
    and the order-executor store, seed missing runtime defaults, pin the chart
    bar to ``FIXED_BAR``, schedule the monitor cycle at the configured poll
    interval, schedule the daily report (Asia/Shanghai cron) when it is
    enabled, start the scheduler, run one monitor cycle immediately, optionally
    run the daily report once, and finally write a start-up log entry.
    """
    runtime_dir = Path(settings.app.log_file).resolve().parent
    runtime_dir.mkdir(parents=True, exist_ok=True)
    await storage.init_db()
    ensure_store_initialized(settings)
    await _ensure_runtime_defaults(storage)
    monitor.state.chart_bar = FIXED_BAR
    scheduler.add_job(monitor.run_cycle, "interval", seconds=settings.app.poll_interval_seconds, max_instances=1)

    dr = await _get_daily_report_settings(storage, settings)
    # Effective switch (stored value, falling back to the settings file); this
    # is the value that decides whether the job is scheduled below.
    daily_enabled = bool(dr["enabled"])
    if daily_enabled:
        hh, mm = _parse_hhmm(str(dr["run_time_cn"]))
        scheduler.add_job(
            daily_report.run_once,
            "cron",
            hour=hh,
            minute=mm,
            max_instances=1,
            timezone="Asia/Shanghai",
            id=DAILY_REPORT_JOB_ID,
            replace_existing=True,
        )
    scheduler.start()
    await monitor.run_cycle()
    if daily_enabled and dr["run_on_startup"]:
        await daily_report.run_once()
    await storage.add_log(
        "INFO",
        (
            f"service_started_gate_usdt gemma={'on' if settings.gemma.enabled else 'off'} "
            f"proxy={'on ' + settings.proxy.url if settings.proxy.enabled else 'off'} "
            f"web_login={'on' if settings.auth.enabled else 'off'} "
            # Log the effective flag rather than settings.daily_report.enabled,
            # so the entry matches what was actually scheduled.
            f"daily_report={'on' if daily_enabled else 'off'}"
        ),
    )
    LOGGER.info("Service started")
@app.on_event("shutdown")
async def on_shutdown() -> None:
    """Stop the scheduler, record the stop in the log and close the storage."""
    # shutdown() raises when the scheduler is not running (for example when it
    # was never started), which would skip the storage cleanup below.
    if scheduler.running:
        scheduler.shutdown(wait=False)
    try:
        await storage.add_log("INFO", "service_stopped")
    finally:
        # Close the storage even if writing the final log entry fails.
        await storage.close()
@app.get("/", response_class=HTMLResponse)
async def root(request: Request) -> HTMLResponse:
    """Redirect ``/`` to the dashboard, or to the login page when login is needed.

    The dashboard is the target when authentication is disabled or the session
    is already logged in.
    """
    may_enter = (not settings.auth.enabled) or request.session.get("logged_in") is True
    target = "/dashboard" if may_enter else "/login"
    return RedirectResponse(target, status_code=302)
@app.get("/login", response_class=HTMLResponse)
async def login_page(request: Request) -> HTMLResponse:
    """Render the login form, or redirect to the dashboard when auth is disabled."""
    if settings.auth.enabled:
        context = {"request": request, "error": ""}
        return templates.TemplateResponse("login.html", context)
    return RedirectResponse("/dashboard", status_code=302)
@app.post("/login", response_class=HTMLResponse)
async def login_submit(request: Request, username: str = Form(...), password: str = Form(...)) -> HTMLResponse:
    """Check the submitted credentials and open a session on success.

    With authentication disabled the request is redirected to the dashboard.
    On a match the session gets ``logged_in`` and ``username`` and the client
    is redirected to the dashboard; otherwise the login page is rendered again
    with an error message.
    """
    import hmac  # local import: used only for the constant-time comparisons

    if not settings.auth.enabled:
        return RedirectResponse("/dashboard", status_code=302)

    expected_user = app.state.auth_user
    # compare_digest avoids leaking, through response timing, how much of the
    # submitted value matched. Bytes are used for the user name because
    # compare_digest only accepts ASCII-only str arguments.
    ok_user = isinstance(expected_user, str) and hmac.compare_digest(
        username.encode("utf-8"), expected_user.encode("utf-8")
    )
    # Both operands are hexadecimal digests, hence ASCII-only.
    ok_pass = hmac.compare_digest(_hash_password(password), app.state.auth_password_hash)
    if ok_user and ok_pass:
        request.session["logged_in"] = True
        request.session["username"] = username
        return RedirectResponse("/dashboard", status_code=302)
    return templates.TemplateResponse("login.html", {"request": request, "error": "用户名或密码错误"})
@app.get("/logout")
async def logout(request: Request) -> RedirectResponse:
    """Clear the session and redirect to the login page.

    The redirect goes to the dashboard instead when authentication is disabled.
    """
    request.session.clear()
    destination = "/login" if settings.auth.enabled else "/dashboard"
    return RedirectResponse(destination, status_code=302)
@app.get("/dashboard", response_class=HTMLResponse)
async def dashboard(request: Request) -> HTMLResponse:
    """Render the dashboard page, redirecting to the login page if needed.

    The template receives the display name (session user name, else the
    configured user name, else ``"admin"``) and the static asset version.
    """
    needs_login = settings.auth.enabled and request.session.get("logged_in") is not True
    if needs_login:
        return RedirectResponse("/login", status_code=302)

    display_name = request.session.get("username") or settings.auth.username or "admin"
    context = {
        "request": request,
        "username": display_name,
        "asset_version": _asset_version(root_dir),
    }
    return templates.TemplateResponse("dashboard.html", context)
@app.get("/api/status")
async def api_status(_: None = Depends(require_login)) -> JSONResponse:
    """Return the trimmed monitor state together with key runtime parameters."""
    intraday = await _get_intraday_settings(storage)
    payload = {
        "running": True,
        "state": _slim_monitor_state(monitor.state),
        "poll_interval_seconds": settings.app.poll_interval_seconds,
        "chart_bar": FIXED_BAR,
        "mode": "GATE_USDT_PERP",
        "universe": settings.monitor.universe,
        "intraday_settings": intraday,
        "gemma_enabled": settings.gemma.enabled,
        "gemma_model": settings.gemma.model,
    }
    return JSONResponse(payload)
@app.get("/api/settings")
async def api_settings_get(_: None = Depends(require_login)) -> JSONResponse:
    """Return the intraday, daily-report, blocklist and order-executor settings."""
    intraday = await _get_intraday_settings(storage)
    daily = await _get_daily_report_settings(storage, settings)
    blocklist = await _get_symbol_blocklist_settings(storage)
    payload = {
        "chart_bar": FIXED_BAR,
        "intraday_settings": intraday,
        "daily_report_settings": daily,
        "symbol_blocklist_settings": blocklist,
        "order_executors": read_snapshot(settings),
    }
    return JSONResponse(payload)
@app.get("/api/order-executors")
async def api_order_executors_get(_: None = Depends(require_login)) -> JSONResponse:
    """Return the current order-executor snapshot."""
    snapshot = read_snapshot(settings)
    return JSONResponse(snapshot)
@app.put("/api/order-executors/settings")
async def api_order_executors_settings(request: Request, _: None = Depends(require_login)) -> JSONResponse:
    """Update the global order-executor settings from the JSON body.

    ``enabled``, ``webhook_secret`` and ``timeout_seconds`` are forwarded to
    ``write_global_settings``; a key absent from the body is passed as
    ``None``. A ``ValueError`` from the store becomes a 400 response. The log
    entry records only whether a secret is set, never the secret itself.
    """
    payload = await request.json()

    def provided(key: str):
        """Return the body's value for ``key``, or ``None`` when it is absent."""
        return payload.get(key) if key in payload else None

    try:
        snap = write_global_settings(
            settings,
            enabled=provided("enabled"),
            webhook_secret=provided("webhook_secret"),
            timeout_seconds=provided("timeout_seconds"),
        )
    except ValueError as err:
        return JSONResponse({"ok": False, "detail": str(err)}, status_code=400)

    secret_set = bool((snap.get("webhook_secret") or "").strip())
    message = (
        "order_executors_settings_updated "
        f"enabled={snap.get('enabled')} timeout={snap.get('timeout_seconds')} "
        f"secret_set={secret_set}"
    )
    await storage.add_log("INFO", message)
    return JSONResponse({"ok": True, "order_executors": snap})
@app.post("/api/order-executors")
async def api_order_executors_add(request: Request, _: None = Depends(require_login)) -> JSONResponse:
    """Register a new order executor from the JSON body.

    ``name`` and ``base_url`` are stringified (missing values become ``""``)
    and ``enabled`` defaults to true. A ``ValueError`` from the store becomes a
    400 response; on success the new row and a fresh snapshot are returned.
    """
    payload = await request.json()
    try:
        row = add_executor(
            settings,
            name=str(payload.get("name") or ""),
            base_url=str(payload.get("base_url") or ""),
            enabled=bool(payload.get("enabled", True)),
        )
    except ValueError as err:
        failure = {"ok": False, "detail": str(err)}
        return JSONResponse(failure, status_code=400)

    message = f"order_executor_added name={row.get('name')} url={row.get('base_url')}"
    await storage.add_log("INFO", message)
    return JSONResponse({"ok": True, "executor": row, "order_executors": read_snapshot(settings)})
@app.patch("/api/order-executors/{executor_id}")
async def api_order_executors_patch(
    executor_id: str, request: Request, _: None = Depends(require_login)
) -> JSONResponse:
    """Update selected fields of one order executor.

    ``name``, ``base_url`` and ``enabled`` are forwarded to ``update_executor``;
    a key absent from the body is passed as ``None``. A ``ValueError`` whose
    message is ``executor_not_found`` becomes a 404 response, any other
    ``ValueError`` a 400.
    """
    payload = await request.json()

    def provided(key: str):
        """Return the body's value for ``key``, or ``None`` when it is absent."""
        return payload.get(key) if key in payload else None

    try:
        row = update_executor(
            settings,
            executor_id,
            name=provided("name"),
            base_url=provided("base_url"),
            enabled=provided("enabled"),
        )
    except ValueError as err:
        detail = str(err)
        status_code = 404 if detail == "executor_not_found" else 400
        return JSONResponse({"ok": False, "detail": detail}, status_code=status_code)

    await storage.add_log("INFO", f"order_executor_updated id={executor_id} name={row.get('name')}")
    return JSONResponse({"ok": True, "executor": row, "order_executors": read_snapshot(settings)})
@app.delete("/api/order-executors/{executor_id}")
async def api_order_executors_delete(executor_id: str, _: None = Depends(require_login)) -> JSONResponse:
    """Delete one order executor; a ``ValueError`` from the store becomes a 404."""
    try:
        delete_executor(settings, executor_id)
    except ValueError as err:
        failure = {"ok": False, "detail": str(err)}
        return JSONResponse(failure, status_code=404)

    await storage.add_log("INFO", f"order_executor_deleted id={executor_id}")
    remaining = read_snapshot(settings)
    return JSONResponse({"ok": True, "order_executors": remaining})
@app.post("/api/settings/intraday")
async def api_settings_intraday(request: Request, _: None = Depends(require_login)) -> JSONResponse:
    """Validate and persist the intraday strategy parameters.

    Every numeric field must parse as a number and satisfy its range check,
    otherwise a 400 is raised and nothing is stored. Accepted values are
    written to the key/value store, a log entry is added, and the settings as
    read back from storage are returned.
    """
    payload = await request.json()

    # Parse in a fixed order so the first malformed field is the one reported.
    range_hours = _must_float(payload.get("range_hours"), "range_hours")
    range_max_pct = _must_float(payload.get("range_max_pct"), "range_max_pct")
    volume_spike_mult = _must_float(payload.get("volume_spike_mult"), "volume_spike_mult")
    volume_lookback_bars = int(_must_float(payload.get("volume_lookback_bars"), "volume_lookback_bars"))
    breakout_buffer_pct = _must_float(payload.get("breakout_buffer_pct"), "breakout_buffer_pct")
    stop_buffer_pct = _must_float(payload.get("stop_buffer_pct"), "stop_buffer_pct")
    push_time_window_enabled = _to_bool(payload.get("push_time_window_enabled"), True)

    # (violated?, error detail) pairs; the first violated rule wins.
    rules = (
        (range_hours < 1, "range_hours must be >= 1"),
        (range_max_pct <= 0, "range_max_pct must be > 0"),
        (volume_spike_mult < 1, "volume_spike_mult must be >= 1"),
        (volume_lookback_bars < 5, "volume_lookback_bars must be >= 5"),
        (breakout_buffer_pct < 0, "breakout_buffer_pct must be >= 0"),
        (stop_buffer_pct < 0 or stop_buffer_pct > 10, "stop_buffer_pct must be between 0 and 10"),
    )
    for violated, detail in rules:
        if violated:
            raise HTTPException(status_code=400, detail=detail)

    persisted = (
        ("intraday_range_hours", str(range_hours)),
        ("intraday_range_max_pct", str(range_max_pct)),
        ("intraday_volume_spike_mult", str(volume_spike_mult)),
        ("intraday_volume_lookback_bars", str(volume_lookback_bars)),
        ("intraday_breakout_buffer_pct", str(breakout_buffer_pct)),
        ("intraday_stop_buffer_pct", str(stop_buffer_pct)),
        ("intraday_push_time_window_enabled", "1" if push_time_window_enabled else "0"),
    )
    for key, value in persisted:
        await storage.set_kv(key, value)

    message = (
        "intraday_settings_updated "
        f"range_hours={range_hours} range_max_pct={range_max_pct} "
        f"volume_spike_mult={volume_spike_mult} volume_lookback_bars={volume_lookback_bars} "
        f"breakout_buffer_pct={breakout_buffer_pct} stop_buffer_pct={stop_buffer_pct} "
        f"push_time_window_enabled={push_time_window_enabled}"
    )
    await storage.add_log("INFO", message)
    return JSONResponse({"ok": True, "intraday_settings": await _get_intraday_settings(storage)})
@app.post("/api/settings/daily-report")
async def api_settings_daily_report(request: Request, _: None = Depends(require_login)) -> JSONResponse:
    """Persist the daily-report settings and reschedule the report job.

    The run time is normalised to ``HH:MM`` through ``_parse_hhmm``. The four
    values are stored, any existing report job is removed, and a new cron job
    (Asia/Shanghai) is added when the report is enabled. The settings as read
    back from storage are logged and returned.
    """
    payload = await request.json()
    enabled = bool(payload.get("enabled", True))
    requested_time = str(payload.get("run_time_cn") or "08:30").strip()
    push_wecom = bool(payload.get("push_wecom", True))
    run_on_startup = bool(payload.get("run_on_startup", False))
    hh, mm = _parse_hhmm(requested_time)
    run_time_cn = f"{hh:02d}:{mm:02d}"

    def as_flag(value: bool) -> str:
        """Encode a boolean the way the key/value store keeps it."""
        return "1" if value else "0"

    persisted = (
        ("daily_report_enabled", as_flag(enabled)),
        ("daily_report_run_time_cn", run_time_cn),
        ("daily_report_push_wecom", as_flag(push_wecom)),
        ("daily_report_run_on_startup", as_flag(run_on_startup)),
    )
    for key, value in persisted:
        await storage.set_kv(key, value)

    # Drop the old job first so a disabled report stops firing.
    if scheduler.get_job(DAILY_REPORT_JOB_ID):
        scheduler.remove_job(DAILY_REPORT_JOB_ID)
    if enabled:
        scheduler.add_job(
            daily_report.run_once,
            "cron",
            hour=hh,
            minute=mm,
            max_instances=1,
            timezone="Asia/Shanghai",
            id=DAILY_REPORT_JOB_ID,
            replace_existing=True,
        )

    daily = await _get_daily_report_settings(storage, settings)
    message = (
        "daily_report_settings_updated "
        f"enabled={daily['enabled']} run_time_cn={daily['run_time_cn']} "
        f"push_wecom={daily['push_wecom']} run_on_startup={daily['run_on_startup']}"
    )
    await storage.add_log("INFO", message)
    return JSONResponse({"ok": True, "daily_report_settings": daily})
@app.post("/api/settings/symbol-blocklist")
async def api_settings_symbol_blocklist(request: Request, _: None = Depends(require_login)) -> JSONResponse:
    """Replace the symbol blocklist with the symbols in ``symbols_text``.

    The text is normalised by ``_normalize_manual_symbols`` and stored as a
    JSON list. The log entry lists at most the first 30 symbols and ends with
    ``...`` when the list was cut short. The blocklist as read back from
    storage is returned.
    """
    payload = await request.json()
    symbols = _normalize_manual_symbols(payload.get("symbols_text", ""))
    await storage.set_kv("monitor_symbol_blocklist", json.dumps(symbols, ensure_ascii=False))

    preview = ",".join(symbols[:30])
    # The original expression yielded '' in both branches, so a truncated list
    # was indistinguishable from a complete one in the log.
    truncation_marker = "..." if len(symbols) > 30 else ""
    await storage.add_log(
        "INFO",
        f"symbol_blocklist_updated count={len(symbols)} symbols={preview}{truncation_marker}",
    )
    return JSONResponse({"ok": True, "symbol_blocklist_settings": await _get_symbol_blocklist_settings(storage)})
@app.get("/api/alerts")
async def api_alerts(_: None = Depends(require_login)) -> JSONResponse:
    """Return up to 120 recent alerts under the ``items`` key."""
    recent_alerts = await storage.get_recent_alerts(limit=120)
    payload = {"items": recent_alerts}
    return JSONResponse(payload)
@app.get("/api/logs")
async def api_logs(_: None = Depends(require_login)) -> JSONResponse:
    """Return up to 120 recent log entries under the ``items`` key."""
    recent_logs = await storage.get_recent_logs(limit=120)
    payload = {"items": recent_logs}
    return JSONResponse(payload)
@app.get("/api/config")
async def api_config(_: None = Depends(require_login)) -> JSONResponse:
    """Return a read-only view of the effective configuration.

    Covers the app, monitor, Gemma, daily-report and proxy settings, the
    order-executor snapshot and the upper-cased watch symbols. The proxy URL is
    reported as ``""`` when the proxy is disabled.
    """
    watch_symbols = [{"symbol": entry.symbol.upper()} for entry in settings.watch_symbols]
    gemma_cfg = settings.gemma
    monitor_cfg = settings.monitor
    daily = await _get_daily_report_settings(storage, settings)

    gemma_view = {
        "enabled": gemma_cfg.enabled,
        "ollama_base_url": gemma_cfg.ollama_base_url,
        "model": gemma_cfg.model,
        "max_funnel_per_cycle": gemma_cfg.max_funnel_per_cycle,
        "vision_top_n": gemma_cfg.vision_top_n,
        "gemma_push_priority_min": gemma_cfg.gemma_push_priority_min,
        "composite_push_min": gemma_cfg.composite_push_min,
    }
    proxy_view = {
        "enabled": settings.proxy.enabled,
        "url": settings.proxy.url if settings.proxy.enabled else "",
    }
    payload = {
        "auth_enabled": settings.auth.enabled,
        "host": settings.app.host,
        "port": settings.app.port,
        "poll_interval_seconds": settings.app.poll_interval_seconds,
        "universe": monitor_cfg.universe,
        "min_24h_quote_volume_usdt": monitor_cfg.min_24h_quote_volume_usdt,
        "btc_daily_gate_enabled": monitor_cfg.btc_daily_gate_enabled,
        "btc_sideways_lookback_days": monitor_cfg.btc_sideways_lookback_days,
        "btc_sideways_max_range_pct": monitor_cfg.btc_sideways_max_range_pct,
        "symbol_signal_dedupe_hours": monitor_cfg.symbol_signal_dedupe_hours,
        "wecom_push_max_volume_rank": monitor_cfg.wecom_push_max_volume_rank,
        "gemma": gemma_view,
        "daily_report": daily,
        "proxy": proxy_view,
        "order_executor": read_snapshot(settings),
        "watch_symbols": watch_symbols,
    }
    return JSONResponse(payload)
@app.get("/api/funnel")
async def api_funnel(_: None = Depends(require_login)) -> JSONResponse:
    """Return the funnel alerts, one per symbol, best composite score first.

    Takes the 500 most recent alerts, keeps those whose ``details.source`` is
    ``gemma_funnel``, de-duplicates them by symbol, sorts them by
    ``details.composite_score`` in descending order (missing scores count as
    0) and returns at most 100 of them.
    """
    recent = await storage.get_recent_alerts(limit=500)

    def details_of(alert: dict) -> dict:
        """Return the alert's ``details`` mapping, or an empty one."""
        return alert.get("details") or {}

    def composite_score(alert: dict) -> float:
        """Return the alert's composite score as a float (0.0 when absent)."""
        return float(details_of(alert).get("composite_score") or 0.0)

    funnel_alerts = [alert for alert in recent if details_of(alert).get("source") == "gemma_funnel"]
    ranked = sorted(
        _dedupe_funnel_alerts_by_symbol(funnel_alerts),
        key=composite_score,
        reverse=True,
    )
    return JSONResponse({"items": ranked[:100]})
@app.get("/api/daily-report")
async def api_daily_report(_: None = Depends(require_login)) -> JSONResponse:
    """Return the most recently stored daily report.

    Responds with ``ready: False`` and a hint when no report is stored yet, and
    with status 500 when the stored value is not valid JSON.
    """
    stored = await storage.get_kv("daily_report_latest")
    if not stored:
        not_ready = {
            "ready": False,
            "message": "晨报尚未生成。请等待定时任务,或开启 daily_report.run_on_startup。",
        }
        return JSONResponse(not_ready)

    try:
        report = json.loads(stored)
    except json.JSONDecodeError:
        return JSONResponse({"ready": False, "message": "晨报解析失败"}, status_code=500)
    return JSONResponse({"ready": True, "report": report})
@app.post("/api/daily-report/run")
async def api_daily_report_run(_: None = Depends(require_login)) -> JSONResponse:
    """Run the daily report immediately and return its result.

    Responds with status 400 when the daily report is disabled.
    """
    daily = await _get_daily_report_settings(storage, settings)
    if daily["enabled"]:
        report = await daily_report.run_once()
        return JSONResponse({"ok": True, "report": report})
    return JSONResponse({"ok": False, "message": "daily_report.enabled=false"}, status_code=400)
return app
async def _ensure_runtime_defaults(storage: Storage) -> None:
    """Seed the key/value store with defaults for every key that is missing.

    A key counts as missing when ``storage.get_kv`` returns ``None``; existing
    values are never overwritten.
    """
    # NOTE(review): once the daily_report_* keys are seeded here, the
    # settings-file fallbacks in _get_daily_report_settings only apply if a key
    # is missing again - confirm that this precedence is intended.
    seed_values = (
        ("intraday_range_hours", "12"),
        ("intraday_range_max_pct", "2.0"),
        ("intraday_volume_spike_mult", "1.6"),
        ("intraday_volume_lookback_bars", "18"),
        ("intraday_breakout_buffer_pct", "0.03"),
        ("intraday_push_time_window_enabled", "1"),
        ("intraday_stop_buffer_pct", "0.2"),
        ("daily_report_enabled", "1"),
        ("daily_report_run_time_cn", "08:30"),
        ("daily_report_push_wecom", "1"),
        ("daily_report_run_on_startup", "0"),
    )
    for key, default_value in seed_values:
        current = await storage.get_kv(key)
        if current is None:
            await storage.set_kv(key, default_value)
async def _get_intraday_settings(storage: Storage) -> dict:
    """Read the intraday parameters from the key/value store.

    Each value falls back to a built-in default when it is missing or cannot be
    parsed.
    """
    # NOTE(review): several fallbacks here (24.0, 1.5, 20.0, 0.05) differ from
    # the values seeded by _ensure_runtime_defaults ("12", "2.0", "18", "0.03")
    # - confirm which set is intended.
    range_hours = _to_float(await storage.get_kv("intraday_range_hours"), 24.0)
    range_max_pct = _to_float(await storage.get_kv("intraday_range_max_pct"), 1.5)
    volume_spike_mult = _to_float(await storage.get_kv("intraday_volume_spike_mult"), 1.6)
    volume_lookback_bars = int(_to_float(await storage.get_kv("intraday_volume_lookback_bars"), 20.0))
    breakout_buffer_pct = _to_float(await storage.get_kv("intraday_breakout_buffer_pct"), 0.05)
    push_time_window_enabled = _to_bool(await storage.get_kv("intraday_push_time_window_enabled"), True)
    stop_buffer_pct = _to_float(await storage.get_kv("intraday_stop_buffer_pct"), 0.2)
    return {
        "range_hours": range_hours,
        "range_max_pct": range_max_pct,
        "volume_spike_mult": volume_spike_mult,
        "volume_lookback_bars": volume_lookback_bars,
        "breakout_buffer_pct": breakout_buffer_pct,
        "push_time_window_enabled": push_time_window_enabled,
        "stop_buffer_pct": stop_buffer_pct,
    }
async def _get_daily_report_settings(storage: Storage, settings: Settings) -> dict:
    """Read the daily-report settings, preferring stored values over ``settings``.

    The boolean flags fall back to ``settings.daily_report`` when the stored
    value is ``None``; the run time falls back when the stored value is falsy.
    """
    fallback = settings.daily_report
    enabled = _to_bool(await storage.get_kv("daily_report_enabled"), fallback.enabled)
    run_time_cn = str(await storage.get_kv("daily_report_run_time_cn") or fallback.run_time_cn)
    push_wecom = _to_bool(await storage.get_kv("daily_report_push_wecom"), fallback.push_wecom)
    run_on_startup = _to_bool(await storage.get_kv("daily_report_run_on_startup"), fallback.run_on_startup)
    return {
        "enabled": enabled,
        "run_time_cn": run_time_cn,
        "push_wecom": push_wecom,
        "run_on_startup": run_on_startup,
    }
async def _get_symbol_blocklist_settings(storage: Storage) -> dict:
    """Read the stored symbol blocklist as a list, as text and as a count.

    The stored value is expected to be a JSON list. Its items are stringified,
    stripped, upper-cased and de-duplicated in first-seen order. A missing or
    blank value, invalid JSON, or JSON that is not a list yields an empty
    blocklist.
    """
    stored = await storage.get_kv("monitor_symbol_blocklist")
    unique: dict[str, None] = {}  # used as an insertion-ordered set
    if stored and str(stored).strip():
        try:
            decoded = json.loads(stored)
        except json.JSONDecodeError:
            decoded = None
        if isinstance(decoded, list):
            for entry in decoded:
                token = str(entry).strip().upper()
                if token:
                    unique.setdefault(token, None)
    symbols = list(unique)
    return {
        "symbols": symbols,
        "symbols_text": "\n".join(symbols),
        "count": len(symbols),
    }
def _to_float(raw: str | None, default: float) -> float:
    """Convert ``raw`` to a float, returning ``default`` when that is not possible.

    ``default`` is used when ``raw`` is ``None`` or ``float(raw)`` raises
    ``TypeError`` or ``ValueError``.
    """
    if raw is None:
        return default
    try:
        return float(raw)
    except (TypeError, ValueError):
        return default
def _must_float(raw: object, name: str) -> float:
    """Convert a request value to a finite float or fail with HTTP 400.

    Args:
        raw: Value taken from the request body.
        name: Field name used in the error detail.

    Returns:
        ``raw`` converted with ``float()``.

    Raises:
        HTTPException: Status 400 when ``raw`` cannot be converted, or when the
            result is NaN or infinite.
    """
    import math  # local import: used only for the finiteness check

    try:
        value = float(raw)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: integers too large to be represented as a float.
        value = None
    # NaN makes every range comparison false and int() fails on NaN and
    # infinity, so non-finite input must not reach the callers' validation.
    if value is None or not math.isfinite(value):
        raise HTTPException(status_code=400, detail=f"{name} must be a number")
    return value