from __future__ import annotations import hashlib import json import logging from datetime import datetime, timedelta, timezone from pathlib import Path from apscheduler.schedulers.asyncio import AsyncIOScheduler from fastapi import Depends, FastAPI, Form, HTTPException, Request, status from fastapi.responses import HTMLResponse, JSONResponse, RedirectResponse, StreamingResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates from starlette.middleware.gzip import GZipMiddleware from starlette.middleware.sessions import SessionMiddleware from .config import Settings from .daily_report import DailyReportService from .gemma_client import OllamaGemmaClient from .key_monitor_service import KeyMonitorService from .key_sl_tp import ( KEY_MONITOR_TYPES, normalize_monitor_type, normalize_sl_tp_mode, sl_tp_mode_label, stop_outside_pct_for_mode, ) from .monitor import MonitorService from .notifier import WeComNotifier from .gate import GateClient from .order_executors_store import ( add_executor, delete_executor, ensure_store_initialized, read_snapshot, update_executor, write_global_settings, ) from .storage import Storage LOGGER = logging.getLogger("onchain_scout.web") FIXED_BAR = "5m" DAILY_REPORT_JOB_ID = "daily_report_job" KEY_MONITOR_JOB_ID = "key_monitor_poll" FUNNEL_DISPLAY_HOURS_DEFAULT = 24.0 FUNNEL_DISPLAY_HOURS_MIN = 1.0 FUNNEL_DISPLAY_HOURS_MAX = 168.0 def _hash_password(plain: str) -> str: return hashlib.sha256(plain.encode("utf-8")).hexdigest() def _asset_version(root: Path) -> str: """静态资源 ?v= 避免浏览器强缓存旧 app.js。""" mt = 0 for name in ("app.js", "style.css"): try: mt = max(mt, int((root / "static" / name).stat().st_mtime)) except OSError: continue return str(mt or 1) def _parse_alert_created_at_utc(raw: object) -> datetime | None: if raw is None: return None try: s = str(raw).strip() if not s: return None if s.endswith("Z"): s = s[:-1] + "+00:00" dt = datetime.fromisoformat(s) if dt.tzinfo is not None: return dt.astimezone(timezone.utc).replace(tzinfo=None) return dt except (TypeError, ValueError): return None def _filter_alerts_within_hours(alerts: list[dict], *, within_hours: float) -> list[dict]: if within_hours <= 0: return list(alerts) cutoff = datetime.utcnow() - timedelta(hours=within_hours) out: list[dict] = [] for a in alerts: created = _parse_alert_created_at_utc(a.get("created_at")) if created is not None and created >= cutoff: out.append(a) return out def _dedupe_funnel_alerts_by_symbol(alerts: list[dict]) -> list[dict]: """同一币种只保留一条漏斗记录:优先保留 created_at 最新的(避免历史轮次堆叠)。""" by_time = sorted(alerts, key=lambda x: str(x.get("created_at") or ""), reverse=True) seen: set[str] = set() out: list[dict] = [] for a in by_time: sym = (a.get("symbol") or "").strip().upper() if not sym or sym in seen: continue seen.add(sym) out.append(a) return out def _slim_monitor_state(state) -> dict: """避免 monitoring_pool 全量下发(可达上千条),局域网面板极慢。""" raw = dict(state.__dict__) pool = list(raw.pop("monitoring_pool", []) or []) raw["monitoring_pool_count"] = len(pool) raw["monitoring_pool_preview"] = pool[:50] return raw def _parse_hhmm(raw: str) -> tuple[int, int]: s = (raw or "").strip() if ":" not in s: return 8, 30 hh, mm = s.split(":", 1) try: h = max(0, min(23, int(hh))) m = max(0, min(59, int(mm))) return h, m except ValueError: return 8, 30 def _to_bool(raw: str | None, default: bool) -> bool: if raw is None: return default return str(raw).strip().lower() in {"1", "true", "yes", "y", "on"} def _normalize_manual_symbols(raw: object) -> list[str]: if isinstance(raw, list): text = "\n".join([str(x) for x in raw]) else: text = str(raw or "") out: list[str] = [] for token in text.replace(",", "\n").replace(";", "\n").splitlines(): s = token.strip().upper() if not s: continue if "_USDT" in s: s = s.split("_USDT", 1)[0] elif "-USDT-SWAP" in s: s = s.split("-USDT-SWAP", 1)[0] elif "-USDT" in s: s = s.split("-USDT", 1)[0] s = "".join(ch for ch in s if ch.isalnum()) if not s: continue if s not in out: out.append(s) return out[:200] def _normalize_symbol_token(raw: object) -> str: s = str(raw or "").strip().upper() if not s: return "" if "_USDT" in s: s = s.split("_USDT", 1)[0] elif "-USDT-SWAP" in s: s = s.split("-USDT-SWAP", 1)[0] elif "-USDT" in s: s = s.split("-USDT", 1)[0] s = "".join(ch for ch in s if ch.isalnum()) return s def create_app(settings: Settings) -> FastAPI: def require_login(request: Request) -> None: if not settings.auth.enabled: return if request.session.get("logged_in") is not True: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="unauthorized") app = FastAPI(title="MATRIX FUNNEL", version="2.1.0") app.add_middleware(GZipMiddleware, minimum_size=800) app.add_middleware( SessionMiddleware, secret_key=settings.app.session_secret, max_age=60 * 60 * 24 * 7, same_site="lax", https_only=False, ) root_dir = Path(__file__).resolve().parent.parent templates = Jinja2Templates(directory=str(root_dir / "templates")) app.mount("/static", StaticFiles(directory=str(root_dir / "static")), name="static") storage = Storage(settings.app.database_url) proxy_url = settings.proxy.url if settings.proxy.enabled else None gate_client = GateClient(settings.gate, proxy_url=proxy_url) notifier = WeComNotifier(settings.wecom, proxy_url=None) gemma_client = OllamaGemmaClient(settings.gemma) if settings.gemma.enabled else None monitor = MonitorService( settings=settings, storage=storage, gate_client=gate_client, notifier=notifier, gemma_client=gemma_client, ) key_monitor = KeyMonitorService( settings=settings, storage=storage, gate=gate_client, notifier=notifier, ) daily_report = DailyReportService( settings=settings, storage=storage, gate_client=gate_client, notifier=notifier, gemma_client=gemma_client, ) scheduler = AsyncIOScheduler(timezone="UTC") app.state.settings = settings app.state.storage = storage app.state.monitor = monitor app.state.key_monitor = key_monitor app.state.gate_client = gate_client app.state.scheduler = scheduler app.state.auth_user = settings.auth.username app.state.auth_password_hash = _hash_password(settings.auth.password) @app.on_event("startup") async def on_startup() -> None: runtime_dir = Path(settings.app.log_file).resolve().parent runtime_dir.mkdir(parents=True, exist_ok=True) await storage.init_db() ensure_store_initialized(settings) await _ensure_runtime_defaults(storage) monitor.state.chart_bar = FIXED_BAR scheduler.add_job(monitor.run_cycle, "interval", seconds=settings.app.poll_interval_seconds, max_instances=1) if settings.key_monitor.enabled: scheduler.add_job( key_monitor.run_poll, "interval", seconds=settings.key_monitor.poll_interval_seconds, max_instances=1, id=KEY_MONITOR_JOB_ID, replace_existing=True, ) dr = await _get_daily_report_settings(storage, settings) if dr["enabled"]: hh, mm = _parse_hhmm(str(dr["run_time_cn"])) scheduler.add_job( daily_report.run_once, "cron", hour=hh, minute=mm, max_instances=1, timezone="Asia/Shanghai", id=DAILY_REPORT_JOB_ID, replace_existing=True, ) scheduler.start() await monitor.run_cycle() if dr["enabled"] and dr["run_on_startup"]: await daily_report.run_once() await storage.add_log( "INFO", ( f"service_started_gate_usdt gemma={'on' if settings.gemma.enabled else 'off'} " f"proxy={'on ' + settings.proxy.url if settings.proxy.enabled else 'off'} " f"web_login={'on' if settings.auth.enabled else 'off'} " f"daily_report={'on' if settings.daily_report.enabled else 'off'} " f"key_monitor={'on' if settings.key_monitor.enabled else 'off'}" ), ) LOGGER.info("Service started") @app.on_event("shutdown") async def on_shutdown() -> None: scheduler.shutdown(wait=False) await storage.add_log("INFO", "service_stopped") await storage.close() @app.get("/", response_class=HTMLResponse) async def root(request: Request) -> HTMLResponse: if not settings.auth.enabled: return RedirectResponse("/dashboard", status_code=302) if request.session.get("logged_in") is True: return RedirectResponse("/dashboard", status_code=302) return RedirectResponse("/login", status_code=302) @app.get("/login", response_class=HTMLResponse) async def login_page(request: Request) -> HTMLResponse: if not settings.auth.enabled: return RedirectResponse("/dashboard", status_code=302) return templates.TemplateResponse("login.html", {"request": request, "error": ""}) @app.post("/login", response_class=HTMLResponse) async def login_submit(request: Request, username: str = Form(...), password: str = Form(...)) -> HTMLResponse: if not settings.auth.enabled: return RedirectResponse("/dashboard", status_code=302) ok_user = username == app.state.auth_user ok_pass = _hash_password(password) == app.state.auth_password_hash if ok_user and ok_pass: request.session["logged_in"] = True request.session["username"] = username return RedirectResponse("/dashboard", status_code=302) return templates.TemplateResponse("login.html", {"request": request, "error": "用户名或密码错误"}) @app.get("/logout") async def logout(request: Request) -> RedirectResponse: request.session.clear() if not settings.auth.enabled: return RedirectResponse("/dashboard", status_code=302) return RedirectResponse("/login", status_code=302) @app.get("/dashboard", response_class=HTMLResponse) async def dashboard(request: Request) -> HTMLResponse: if settings.auth.enabled and request.session.get("logged_in") is not True: return RedirectResponse("/login", status_code=302) display_name = request.session.get("username") or settings.auth.username or "admin" return templates.TemplateResponse( "dashboard.html", { "request": request, "username": display_name, "asset_version": _asset_version(root_dir), }, ) @app.get("/api/status") async def api_status(_: None = Depends(require_login)) -> JSONResponse: intraday = await _get_intraday_settings(storage) return JSONResponse( { "running": True, "state": _slim_monitor_state(monitor.state), "poll_interval_seconds": settings.app.poll_interval_seconds, "chart_bar": FIXED_BAR, "mode": "GATE_USDT_PERP", "universe": settings.monitor.universe, "intraday_settings": intraday, "gemma_enabled": settings.gemma.enabled, "gemma_model": settings.gemma.model, "key_monitor": settings.key_monitor.model_dump(), } ) @app.get("/api/settings") async def api_settings_get(_: None = Depends(require_login)) -> JSONResponse: intraday = await _get_intraday_settings(storage) daily = await _get_daily_report_settings(storage, settings) blocklist = await _get_symbol_blocklist_settings(storage) return JSONResponse( { "chart_bar": FIXED_BAR, "intraday_settings": intraday, "daily_report_settings": daily, "symbol_blocklist_settings": blocklist, "order_executors": read_snapshot(settings), } ) @app.get("/api/order-executors") async def api_order_executors_get(_: None = Depends(require_login)) -> JSONResponse: return JSONResponse(read_snapshot(settings)) @app.put("/api/order-executors/settings") async def api_order_executors_settings(request: Request, _: None = Depends(require_login)) -> JSONResponse: body = await request.json() try: snap = write_global_settings( settings, enabled=body.get("enabled") if "enabled" in body else None, webhook_secret=body.get("webhook_secret") if "webhook_secret" in body else None, timeout_seconds=body.get("timeout_seconds") if "timeout_seconds" in body else None, ) except ValueError as exc: return JSONResponse({"ok": False, "detail": str(exc)}, status_code=400) await storage.add_log( "INFO", ( "order_executors_settings_updated " f"enabled={snap.get('enabled')} timeout={snap.get('timeout_seconds')} " f"secret_set={bool((snap.get('webhook_secret') or '').strip())}" ), ) return JSONResponse({"ok": True, "order_executors": snap}) @app.post("/api/order-executors") async def api_order_executors_add(request: Request, _: None = Depends(require_login)) -> JSONResponse: body = await request.json() try: row = add_executor( settings, name=str(body.get("name") or ""), base_url=str(body.get("base_url") or ""), enabled=bool(body.get("enabled", True)), ) except ValueError as exc: return JSONResponse({"ok": False, "detail": str(exc)}, status_code=400) await storage.add_log( "INFO", f"order_executor_added name={row.get('name')} url={row.get('base_url')}", ) return JSONResponse({"ok": True, "executor": row, "order_executors": read_snapshot(settings)}) @app.patch("/api/order-executors/{executor_id}") async def api_order_executors_patch( executor_id: str, request: Request, _: None = Depends(require_login) ) -> JSONResponse: body = await request.json() try: row = update_executor( settings, executor_id, name=body.get("name") if "name" in body else None, base_url=body.get("base_url") if "base_url" in body else None, enabled=body.get("enabled") if "enabled" in body else None, ) except ValueError as exc: code = 404 if str(exc) == "executor_not_found" else 400 return JSONResponse({"ok": False, "detail": str(exc)}, status_code=code) await storage.add_log("INFO", f"order_executor_updated id={executor_id} name={row.get('name')}") return JSONResponse({"ok": True, "executor": row, "order_executors": read_snapshot(settings)}) @app.delete("/api/order-executors/{executor_id}") async def api_order_executors_delete(executor_id: str, _: None = Depends(require_login)) -> JSONResponse: try: delete_executor(settings, executor_id) except ValueError as exc: return JSONResponse({"ok": False, "detail": str(exc)}, status_code=404) await storage.add_log("INFO", f"order_executor_deleted id={executor_id}") return JSONResponse({"ok": True, "order_executors": read_snapshot(settings)}) @app.post("/api/settings/intraday") async def api_settings_intraday(request: Request, _: None = Depends(require_login)) -> JSONResponse: body = await request.json() range_hours = _must_float(body.get("range_hours"), "range_hours") range_max_pct = _must_float(body.get("range_max_pct"), "range_max_pct") volume_spike_mult = _must_float(body.get("volume_spike_mult"), "volume_spike_mult") volume_lookback_bars = int(_must_float(body.get("volume_lookback_bars"), "volume_lookback_bars")) breakout_buffer_pct = _must_float(body.get("breakout_buffer_pct"), "breakout_buffer_pct") stop_buffer_pct = _must_float(body.get("stop_buffer_pct"), "stop_buffer_pct") push_time_window_enabled = _to_bool(body.get("push_time_window_enabled"), True) if range_hours < 1: raise HTTPException(status_code=400, detail="range_hours must be >= 1") if range_max_pct <= 0: raise HTTPException(status_code=400, detail="range_max_pct must be > 0") if volume_spike_mult < 1: raise HTTPException(status_code=400, detail="volume_spike_mult must be >= 1") if volume_lookback_bars < 5: raise HTTPException(status_code=400, detail="volume_lookback_bars must be >= 5") if breakout_buffer_pct < 0: raise HTTPException(status_code=400, detail="breakout_buffer_pct must be >= 0") if stop_buffer_pct < 0 or stop_buffer_pct > 10: raise HTTPException(status_code=400, detail="stop_buffer_pct must be between 0 and 10") await storage.set_kv("intraday_range_hours", str(range_hours)) await storage.set_kv("intraday_range_max_pct", str(range_max_pct)) await storage.set_kv("intraday_volume_spike_mult", str(volume_spike_mult)) await storage.set_kv("intraday_volume_lookback_bars", str(volume_lookback_bars)) await storage.set_kv("intraday_breakout_buffer_pct", str(breakout_buffer_pct)) await storage.set_kv("intraday_stop_buffer_pct", str(stop_buffer_pct)) await storage.set_kv("intraday_push_time_window_enabled", "1" if push_time_window_enabled else "0") await storage.add_log( "INFO", ( "intraday_settings_updated " f"range_hours={range_hours} range_max_pct={range_max_pct} " f"volume_spike_mult={volume_spike_mult} volume_lookback_bars={volume_lookback_bars} " f"breakout_buffer_pct={breakout_buffer_pct} stop_buffer_pct={stop_buffer_pct} " f"push_time_window_enabled={push_time_window_enabled}" ), ) return JSONResponse({"ok": True, "intraday_settings": await _get_intraday_settings(storage)}) @app.post("/api/settings/daily-report") async def api_settings_daily_report(request: Request, _: None = Depends(require_login)) -> JSONResponse: body = await request.json() enabled = bool(body.get("enabled", True)) run_time_cn = str(body.get("run_time_cn") or "08:30").strip() push_wecom = bool(body.get("push_wecom", True)) run_on_startup = bool(body.get("run_on_startup", False)) hh, mm = _parse_hhmm(run_time_cn) run_time_cn = f"{hh:02d}:{mm:02d}" await storage.set_kv("daily_report_enabled", "1" if enabled else "0") await storage.set_kv("daily_report_run_time_cn", run_time_cn) await storage.set_kv("daily_report_push_wecom", "1" if push_wecom else "0") await storage.set_kv("daily_report_run_on_startup", "1" if run_on_startup else "0") if scheduler.get_job(DAILY_REPORT_JOB_ID): scheduler.remove_job(DAILY_REPORT_JOB_ID) if enabled: scheduler.add_job( daily_report.run_once, "cron", hour=hh, minute=mm, max_instances=1, timezone="Asia/Shanghai", id=DAILY_REPORT_JOB_ID, replace_existing=True, ) daily = await _get_daily_report_settings(storage, settings) await storage.add_log( "INFO", ( "daily_report_settings_updated " f"enabled={daily['enabled']} run_time_cn={daily['run_time_cn']} " f"push_wecom={daily['push_wecom']} run_on_startup={daily['run_on_startup']}" ), ) return JSONResponse({"ok": True, "daily_report_settings": daily}) @app.post("/api/settings/symbol-blocklist") async def api_settings_symbol_blocklist(request: Request, _: None = Depends(require_login)) -> JSONResponse: body = await request.json() symbols = _normalize_manual_symbols(body.get("symbols_text", "")) await storage.set_kv("monitor_symbol_blocklist", json.dumps(symbols, ensure_ascii=False)) await storage.add_log( "INFO", f"symbol_blocklist_updated count={len(symbols)} symbols={','.join(symbols[:30])}{'…' if len(symbols) > 30 else ''}", ) return JSONResponse({"ok": True, "symbol_blocklist_settings": await _get_symbol_blocklist_settings(storage)}) def _key_rule_text() -> str: km = settings.key_monitor return ( f"周期 5m|突破K/确认K:倒数第2/第1根闭合K|量能:突破K量 > 前{km.volume_ma_bars}均量×{km.volume_ratio_min}|" f"计划RR须 > {km.min_planned_rr:g}|日成交额排名前{km.daily_volume_rank_max}|" f"箱体/收敛方案:标准突破(止损突破K外{km.standard_stop_outside_pct:g}%|止盈1×H)或 " f"趋势突破(止损突破K外{km.trend_stop_outside_pct:g}%|止盈手填)|" f"触发后企微+{'转发执行器' if km.forward_executor else '不转发执行器'}" ) @app.get("/api/key-monitors") async def api_key_monitors_list(_: None = Depends(require_login)) -> JSONResponse: rows = await storage.list_key_monitors() enriched = [] for row in rows: preview = await key_monitor.preview_row(row) enriched.append({**row, "preview": preview}) history = await storage.list_key_monitor_history(limit=200) return JSONResponse( { "active": enriched, "history": history, "rule_text": _key_rule_text(), "config": settings.key_monitor.model_dump(), } ) @app.post("/api/key-monitors") async def api_key_monitors_add(request: Request, _: None = Depends(require_login)) -> JSONResponse: body = await request.json() sym = _normalize_symbol_token(body.get("symbol")) if not sym: return JSONResponse({"ok": False, "detail": "invalid symbol"}, status_code=400) inst = gate_client.symbol_to_swap_inst_id(sym) monitor_type = normalize_monitor_type(body.get("monitor_type")) if monitor_type not in KEY_MONITOR_TYPES: return JSONResponse({"ok": False, "detail": "invalid monitor_type"}, status_code=400) direction = str(body.get("direction") or "").strip().lower() if direction not in ("long", "short"): return JSONResponse({"ok": False, "detail": "direction must be long or short"}, status_code=400) try: upper = float(body.get("upper")) lower = float(body.get("lower")) except (TypeError, ValueError): return JSONResponse({"ok": False, "detail": "upper/lower required"}, status_code=400) if upper <= lower: return JSONResponse({"ok": False, "detail": "upper must be > lower"}, status_code=400) sl_tp_mode = normalize_sl_tp_mode(body.get("sl_tp_mode")) manual_tp = body.get("manual_take_profit") manual_tp_f: float | None = None if sl_tp_mode == "trend_manual": try: manual_tp_f = float(manual_tp) except (TypeError, ValueError): return JSONResponse({"ok": False, "detail": "manual_take_profit required for trend"}, status_code=400) stop_pct = stop_outside_pct_for_mode(sl_tp_mode) if sl_tp_mode == "standard": stop_pct = float(settings.key_monitor.standard_stop_outside_pct) else: stop_pct = float(settings.key_monitor.trend_stop_outside_pct) be = 1 if str(body.get("breakeven_enabled") or "").lower() in ("1", "true", "on", "yes") else 0 kid = await storage.add_key_monitor( symbol=sym, inst_id=inst, monitor_type=monitor_type, direction=direction, upper=upper, lower=lower, sl_tp_mode=sl_tp_mode, manual_take_profit=manual_tp_f, stop_outside_pct=stop_pct, breakeven_enabled=be, note=str(body.get("note") or "")[:500] or None, ) await storage.add_log( "INFO", f"key_monitor_added id={kid} sym={sym} type={monitor_type} mode={sl_tp_mode} dir={direction}", ) return JSONResponse({"ok": True, "id": kid}) @app.delete("/api/key-monitors/{kid}") async def api_key_monitors_delete(kid: int, _: None = Depends(require_login)) -> JSONResponse: row = await storage.get_key_monitor(kid) if not row: return JSONResponse({"ok": False, "detail": "not_found"}, status_code=404) await storage.finalize_key_monitor( row, close_reason="manual", last_alert_message=None, confirm_close=None, planned_sl=None, planned_tp=None, planned_rr=None, executor_signal_id=None, executor_status=None, checks=None, ) await storage.add_log("INFO", f"key_monitor_manual_close id={kid} sym={row.get('symbol')}") return JSONResponse({"ok": True}) @app.delete("/api/key-monitors/history/{hid}") async def api_key_history_delete(hid: int, _: None = Depends(require_login)) -> JSONResponse: ok = await storage.delete_key_monitor_history(hid) if not ok: return JSONResponse({"ok": False, "detail": "not_found"}, status_code=404) return JSONResponse({"ok": True}) @app.get("/export/key_monitor_history.csv") async def export_key_monitor_history( days: int = 30, _: None = Depends(require_login), ) -> StreamingResponse: days = max(1, min(365, int(days))) end_utc = datetime.utcnow() start_utc = end_utc - timedelta(days=days) rows = await storage.export_key_monitor_history_rows(start_utc=start_utc, end_utc=end_utc) import csv import io buf = io.StringIO() head = [ "id", "symbol", "monitor_type", "direction", "sl_tp_mode", "upper", "lower", "confirm_close", "planned_sl", "planned_tp", "planned_rr", "executor_signal_id", "executor_status", "close_reason", "closed_at", ] w = csv.writer(buf) w.writerow(head) for r in rows: w.writerow( [ r.get("id"), r.get("symbol"), r.get("monitor_type"), r.get("direction"), r.get("sl_tp_mode"), r.get("upper"), r.get("lower"), r.get("confirm_close"), r.get("planned_sl"), r.get("planned_tp"), r.get("planned_rr"), r.get("executor_signal_id"), r.get("executor_status"), r.get("close_reason"), r.get("closed_at"), ] ) day = datetime.utcnow().strftime("%Y%m%d") content = buf.getvalue().encode("utf-8-sig") return StreamingResponse( iter([content]), media_type="text/csv; charset=utf-8", headers={"Content-Disposition": f'attachment; filename="key_monitor_history_{day}.csv"'}, ) @app.get("/api/alerts") async def api_alerts(_: None = Depends(require_login)) -> JSONResponse: alerts = await storage.get_recent_alerts(limit=120) return JSONResponse({"items": alerts}) @app.get("/api/logs") async def api_logs(_: None = Depends(require_login)) -> JSONResponse: logs = await storage.get_recent_logs(limit=120) return JSONResponse({"items": logs}) @app.get("/api/config") async def api_config(_: None = Depends(require_login)) -> JSONResponse: symbols = [{"symbol": w.symbol.upper()} for w in settings.watch_symbols] g = settings.gemma dr = await _get_daily_report_settings(storage, settings) return JSONResponse( { "auth_enabled": settings.auth.enabled, "host": settings.app.host, "port": settings.app.port, "poll_interval_seconds": settings.app.poll_interval_seconds, "universe": settings.monitor.universe, "min_24h_quote_volume_usdt": settings.monitor.min_24h_quote_volume_usdt, "btc_daily_gate_enabled": settings.monitor.btc_daily_gate_enabled, "btc_sideways_lookback_days": settings.monitor.btc_sideways_lookback_days, "btc_sideways_max_range_pct": settings.monitor.btc_sideways_max_range_pct, "symbol_signal_dedupe_hours": settings.monitor.symbol_signal_dedupe_hours, "wecom_push_max_volume_rank": settings.monitor.wecom_push_max_volume_rank, "gemma": { "enabled": g.enabled, "ollama_base_url": g.ollama_base_url, "api_style": g.api_style, "api_key_set": bool((g.api_key or "").strip()), "model": g.model, "max_funnel_per_cycle": g.max_funnel_per_cycle, "vision_top_n": g.vision_top_n, "gemma_push_priority_min": g.gemma_push_priority_min, "composite_push_min": g.composite_push_min, }, "daily_report": dr, "proxy": { "enabled": settings.proxy.enabled, "url": settings.proxy.url if settings.proxy.enabled else "", }, "order_executor": read_snapshot(settings), "key_monitor": settings.key_monitor.model_dump(), "watch_symbols": symbols, } ) @app.get("/api/funnel") async def api_funnel( window_hours: float = FUNNEL_DISPLAY_HOURS_DEFAULT, _: None = Depends(require_login), ) -> JSONResponse: wh = max( FUNNEL_DISPLAY_HOURS_MIN, min(FUNNEL_DISPLAY_HOURS_MAX, float(window_hours)), ) alerts = await storage.get_recent_alerts(limit=500) items = [a for a in alerts if (a.get("details") or {}).get("source") == "gemma_funnel"] items = _filter_alerts_within_hours(items, within_hours=wh) items = _dedupe_funnel_alerts_by_symbol(items) items.sort( key=lambda x: float((x.get("details") or {}).get("composite_score") or 0.0), reverse=True, ) return JSONResponse({"items": items[:100], "window_hours": wh}) @app.get("/api/daily-report") async def api_daily_report(_: None = Depends(require_login)) -> JSONResponse: raw = await storage.get_kv("daily_report_latest") if not raw: return JSONResponse( { "ready": False, "message": "晨报尚未生成。请等待定时任务,或开启 daily_report.run_on_startup。", } ) try: obj = json.loads(raw) except json.JSONDecodeError: return JSONResponse({"ready": False, "message": "晨报解析失败"}, status_code=500) return JSONResponse({"ready": True, "report": obj}) @app.post("/api/daily-report/run") async def api_daily_report_run(_: None = Depends(require_login)) -> JSONResponse: dr = await _get_daily_report_settings(storage, settings) if not dr["enabled"]: return JSONResponse({"ok": False, "message": "daily_report.enabled=false"}, status_code=400) report = await daily_report.run_once() return JSONResponse({"ok": True, "report": report}) return app async def _ensure_runtime_defaults(storage: Storage) -> None: defaults = { "intraday_range_hours": "12", "intraday_range_max_pct": "2.0", "intraday_volume_spike_mult": "1.6", "intraday_volume_lookback_bars": "18", "intraday_breakout_buffer_pct": "0.03", "intraday_push_time_window_enabled": "1", "intraday_stop_buffer_pct": "0.2", "daily_report_enabled": "1", "daily_report_run_time_cn": "08:30", "daily_report_push_wecom": "1", "daily_report_run_on_startup": "0", } for key, value in defaults.items(): if await storage.get_kv(key) is None: await storage.set_kv(key, value) async def _get_intraday_settings(storage: Storage) -> dict: return { "range_hours": _to_float(await storage.get_kv("intraday_range_hours"), 24.0), "range_max_pct": _to_float(await storage.get_kv("intraday_range_max_pct"), 1.5), "volume_spike_mult": _to_float(await storage.get_kv("intraday_volume_spike_mult"), 1.6), "volume_lookback_bars": int(_to_float(await storage.get_kv("intraday_volume_lookback_bars"), 20.0)), "breakout_buffer_pct": _to_float(await storage.get_kv("intraday_breakout_buffer_pct"), 0.05), "push_time_window_enabled": _to_bool(await storage.get_kv("intraday_push_time_window_enabled"), True), "stop_buffer_pct": _to_float(await storage.get_kv("intraday_stop_buffer_pct"), 0.2), } async def _get_daily_report_settings(storage: Storage, settings: Settings) -> dict: return { "enabled": _to_bool(await storage.get_kv("daily_report_enabled"), settings.daily_report.enabled), "run_time_cn": str(await storage.get_kv("daily_report_run_time_cn") or settings.daily_report.run_time_cn), "push_wecom": _to_bool(await storage.get_kv("daily_report_push_wecom"), settings.daily_report.push_wecom), "run_on_startup": _to_bool(await storage.get_kv("daily_report_run_on_startup"), settings.daily_report.run_on_startup), } async def _get_symbol_blocklist_settings(storage: Storage) -> dict: raw = await storage.get_kv("monitor_symbol_blocklist") symbols: list[str] = [] if raw and str(raw).strip(): try: data = json.loads(raw) if isinstance(data, list): seen: set[str] = set() for x in data: s = str(x).strip().upper() if s and s not in seen: seen.add(s) symbols.append(s) except json.JSONDecodeError: symbols = [] return { "symbols": symbols, "symbols_text": "\n".join(symbols), "count": len(symbols), } def _to_float(raw: str | None, default: float) -> float: try: return float(raw) if raw is not None else default except (TypeError, ValueError): return default def _must_float(raw: object, name: str) -> float: try: return float(raw) except (TypeError, ValueError): raise HTTPException(status_code=400, detail=f"{name} must be a number")