from __future__ import annotations

import asyncio
import json
import logging
import math
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from .btc_regime import evaluate_btc_daily_gate
from .chart_candles import daily_candles_png_base64
from .config import Settings, WatchSymbol
from .daily_features import (
    build_daily_programmatic,
    composite_score,
    daily_ohlc_text_block,
    programmatic_scores,
)
from .exchange_rules import IntradayRuleParams, evaluate_exchange
from .notifier import WeComNotifier
from .order_executor_forward import build_order_executor_payload, forward_signal_to_executors
from .order_executors_store import read_forward_config, record_last_forward
from .gate import GateClient
from .storage import Storage

if TYPE_CHECKING:
    from .gemma_client import OllamaGemmaClient

LOGGER = logging.getLogger("onchain_scout.monitor")

# Candle interval used for the intraday scan of every instrument.
FIXED_BAR = "5m"
# The most recent 8 wall-clock hours ≈ 32 candles of 15m.
BTC_15M_BARS_PER_8H = 32


@dataclass
class RuntimeState:
    """Mutable snapshot of what the monitor did in its most recent cycle.

    ``MonitorService`` overwrites these fields while scanning; nothing in this
    module reads them back except to update counters.
    """

    # ISO-8601 UTC timestamp taken when the state object is created.
    started_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    # ISO-8601 UTC timestamp written when a scan cycle finishes (success or failure).
    last_cycle_at: str = ""
    # "INIT" until the first cycle ends, then "OK" or "ERROR".
    last_cycle_status: str = "INIT"
    last_cycle_msg: str = ""
    chart_bar: str = FIXED_BAR
    universe: str = "all_swaps"
    # Effective intraday rule parameters of the last cycle (plus stop_buffer_pct).
    intraday_params: dict = field(default_factory=dict)
    # One entry per monitored instrument: symbol, instId and optionally 24h volume.
    monitoring_pool: list[dict] = field(default_factory=list)
    perpetual_symbols_count: int = 0
    monitored_inst_count: int = 0
    # Incremented after each successful WeCom push (breakout or funnel).
    pushed_alerts_count: int = 0
    btc_gate_allow: bool = True
    btc_gate_regime: str = ""
    btc_gate_reason: str = ""
    btc_gate_metrics: dict = field(default_factory=dict)
    # Result of MonitorService._btc_env_15m_last_8h for the last cycle.
    btc_env_8h_15m: str = ""
    symbol_blocklist_count: int = 0
    symbol_blocklist_removed: int = 0
    # Ranked output of the last Gemma funnel run (at most 40 entries).
    last_funnel: list[dict] = field(default_factory=list)
    last_funnel_at: str = ""
    gemma_cycle_msg: str = ""


class MonitorService:
    """Scan swap instruments for intraday signals, store alerts and push notifications.

    A cycle (``run_cycle``) evaluates every instrument in the monitoring pool on
    ``FIXED_BAR`` candles, records WATCH/TRIGGER alerts, pushes qualifying
    TRIGGER signals through the notifier and, when a Gemma client is configured,
    schedules a background "funnel" that ranks the candidates on daily data.
    """

    def __init__(
        self,
        settings: Settings,
        storage: Storage,
        gate_client: GateClient,
        notifier: WeComNotifier,
        gemma_client: OllamaGemmaClient | None = None,
    ) -> None:
        """Store collaborators and create the runtime state.

        Args:
            settings: Application settings read throughout the cycle.
            storage: Persistence used for logs, alerts and key/value parameters.
            gate_client: Exchange client used for symbols, volumes and candles.
            notifier: Notifier used for breakout and funnel pushes.
            gemma_client: Optional model client; the funnel is skipped when ``None``.
        """
        self.settings = settings
        self.storage = storage
        self.gate = gate_client
        self.notifier = notifier
        self.gemma_client = gemma_client
        self.state = RuntimeState()
        # Serialises scan cycles and guards funnel writes to ``self.state``.
        self._lock = asyncio.Lock()
        # Handle of the background funnel task, kept to detect a still-running funnel.
        self._funnel_bg_task: asyncio.Task[None] | None = None

    @staticmethod
    def _symbol_blocklist_from_kv(raw: str | None) -> frozenset[str]:
        """Parse a JSON array of symbols into an upper-cased, de-duplicated set.

        Args:
            raw: Stored value; expected to be a JSON list.

        Returns:
            The non-empty, stripped, upper-cased entries. An empty set is
            returned when the value is missing, blank, not valid JSON or not a
            JSON list.
        """
        if not raw or not str(raw).strip():
            return frozenset()
        try:
            data = json.loads(raw)
        except (TypeError, ValueError):
            # ValueError covers json.JSONDecodeError; TypeError covers a stored
            # value that is not text at all. Either way there is no usable list.
            return frozenset()
        if not isinstance(data, list):
            return frozenset()
        cleaned = (str(item).strip().upper() for item in data)
        return frozenset(symbol for symbol in cleaned if symbol)

    async def _maybe_forward_order_executor(self, sym: str, inst: str, push_metrics: dict) -> None:
        """Forward a pushed signal to the configured order executors, if enabled.

        Does nothing unless ``settings.key_monitor.auto_scan_forward_executor``
        is truthy and the forward config is enabled. Every skip reason and every
        per-executor outcome is written to the storage log; this method does not
        raise for forwarding failures.

        Args:
            sym: Base symbol, used in log messages.
            inst: Instrument id passed to the payload builder.
            push_metrics: Metrics of the pushed signal passed to the payload builder.
        """
        if not self.settings.key_monitor.auto_scan_forward_executor:
            return
        fwd = read_forward_config(self.settings)
        if not fwd.get("enabled"):
            return
        secret = str(fwd.get("webhook_secret") or "").strip()
        if not secret:
            await self.storage.add_log(
                "WARN",
                "order_executor.enabled=true but webhook_secret is empty; skip POST /v1/signal",
            )
            return
        targets = list(fwd.get("executors") or [])
        if not targets:
            await self.storage.add_log(
                "WARN",
                "order_executor.enabled=true but no enabled executor in panel list; skip POST /v1/signal",
            )
            return
        payload = build_order_executor_payload(inst_id=inst, metrics=push_metrics)
        if not payload:
            await self.storage.add_log(
                "WARN",
                f"order_executor_build_payload_failed sym={sym} inst={inst}",
            )
            return
        try:
            results = await forward_signal_to_executors(
                self.settings,
                executors=targets,
                webhook_secret=secret,
                timeout_seconds=float(fwd.get("timeout_seconds") or 15.0),
                payload=payload,
            )
        except Exception as exc:  # noqa: BLE001
            await self.storage.add_log(
                "ERROR",
                f"order_executor_forward_exception sym={sym} inst={inst}: {exc}",
            )
            return
        for out in results:
            name = out.get("name") or "?"
            eid = str(out.get("executor_id") or "")
            status = out.get("http_status")
            ok = out.get("ok")
            body = out.get("body") or {}
            st = body.get("status") if isinstance(body, dict) else None
            # Prefer the transport error; otherwise fall back to the executor's own reason.
            detail = out.get("error")
            if not detail and isinstance(body, dict):
                detail = body.get("reason") or body.get("detail")
            try:
                record_last_forward(
                    self.settings,
                    eid,
                    http_status=int(status or 0),
                    ok=bool(ok),
                    exec_status=str(st) if st is not None else None,
                    detail=str(detail) if detail is not None else None,
                )
            except Exception:  # noqa: BLE001
                # Recording the outcome is best-effort and must not block the
                # result log below, but the failure should still be visible.
                LOGGER.warning("record_last_forward_failed executor_id=%s", eid, exc_info=True)
            if ok:
                await self.storage.add_log(
                    "INFO",
                    f"order_executor_ok name={name} sym={sym} inst={inst} http={status} exec_status={st}",
                )
            else:
                await self.storage.add_log(
                    "ERROR",
                    f"order_executor_failed name={name} sym={sym} inst={inst} http={status} body={body!r} err={detail}",
                )

    @staticmethod
    def _btc_intraday_bias(btc_rows: list[list[str]]) -> str:
        """Compare the last two parseable closes and return BULL / BEAR / NEUTRAL.

        Rows shorter than five fields or with a non-numeric field 4 are skipped.
        NOTE(review): assumes rows are ordered oldest to newest — confirm
        against the exchange client.

        Returns:
            "BULL" when the last close is above the previous one, "BEAR" when it
            is below, "NEUTRAL" when equal or when fewer than two closes parse.
        """
        closes: list[float] = []
        for row in btc_rows:
            if len(row) < 5:
                continue
            try:
                closes.append(float(row[4]))
            except (TypeError, ValueError):
                continue
        if len(closes) < 2:
            return "NEUTRAL"
        if closes[-1] > closes[-2]:
            return "BULL"
        if closes[-1] < closes[-2]:
            return "BEAR"
        return "NEUTRAL"

    @staticmethod
    def _ema(values: list[float], period: int) -> float:
        """Return the exponential moving average of ``values``, seeded with the first value.

        Uses ``alpha = 2 / (period + 1)`` with ``period`` clamped to at least 1.
        Returns 0.0 for an empty list.
        """
        if not values:
            return 0.0
        p = max(1, period)
        alpha = 2.0 / (p + 1.0)
        ema = values[0]
        for v in values[1:]:
            ema = alpha * v + (1 - alpha) * ema
        return ema

    @staticmethod
    def _status_by_ema55(rows: list[list[str]]) -> str:
        """Classify a candle series as "多头", "空头" or "横盘" using EMA55 and a range test.

        Fields 2, 3 and 4 of each row are read as high, low and close; rows that
        are too short or non-numeric are skipped.

        Returns:
            "横盘" when fewer than 55 closes parse or when the high/low range of
            the last 21 bars is at most 2% of its midpoint; otherwise "多头" if
            the last close is at or above the EMA55 of the last 120 closes, else
            "空头".
        """
        closes: list[float] = []
        highs: list[float] = []
        lows: list[float] = []
        for row in rows:
            if len(row) < 5:
                continue
            try:
                highs.append(float(row[2]))
                lows.append(float(row[3]))
                closes.append(float(row[4]))
            except (TypeError, ValueError):
                continue
        if len(closes) < 55:
            return "横盘"
        ema55 = MonitorService._ema(closes[-120:], 55)
        last = closes[-1]
        lookback = min(21, len(closes))
        high = max(highs[-lookback:])
        low = min(lows[-lookback:])
        # A degenerate window (high <= low) produces the 999.0 sentinel, so it is
        # never classified as sideways by the range test.
        mid = (high + low) / 2 if high > low else 0.0
        range_pct = ((high - low) / mid * 100.0) if mid > 0 else 999.0
        if range_pct <= 2.0:
            return "横盘"
        if last >= ema55:
            return "多头"
        return "空头"

    @staticmethod
    def _btc_env_15m_last_8h(rows_15m: list[list[str]]) -> str:
        """Classify BTC's 15m trend over the last 8 hours.

        Returns "多头" (bullish), "空头" (bearish) or "横盘" (sideways); a narrow
        range counts as sideways.

        The window is the last ``BTC_15M_BARS_PER_8H`` bars. The result is
        "横盘" when fewer bars parse, when the window's high/low range is at most
        1.8% of its midpoint, or when fewer than 21 closes are available for the
        EMA. Otherwise the last close is compared with the EMA21 computed over up
        to the last 96 closes.
        """
        closes: list[float] = []
        highs: list[float] = []
        lows: list[float] = []
        for row in rows_15m:
            if len(row) < 5:
                continue
            try:
                highs.append(float(row[2]))
                lows.append(float(row[3]))
                closes.append(float(row[4]))
            except (TypeError, ValueError):
                continue
        need = BTC_15M_BARS_PER_8H
        if len(closes) < need:
            return "横盘"
        h_win = max(highs[-need:])
        l_win = min(lows[-need:])
        mid = (h_win + l_win) / 2 if h_win > l_win else 0.0
        range_pct = ((h_win - l_win) / mid * 100.0) if mid > 0 else 999.0
        if range_pct <= 1.8:
            return "横盘"
        # Use more history than the 8h window so the EMA has time to settle.
        warmup = min(len(closes), 96)
        seq = closes[-warmup:]
        if len(seq) < 21:
            return "横盘"
        ema21 = MonitorService._ema(seq, 21)
        last = closes[-1]
        if last >= ema21:
            return "多头"
        return "空头"

    @staticmethod
    def _push_matches_btc_env(btc_env: str, signal_side: str) -> bool:
        """Return whether a signal side is allowed under the given BTC environment.

        A sideways environment ("横盘") allows both LONG and SHORT; "多头" allows
        only LONG and "空头" only SHORT. Any other side or environment is rejected.
        """
        if signal_side not in {"LONG", "SHORT"}:
            return False
        if btc_env == "横盘":
            return True
        if btc_env == "多头":
            return signal_side == "LONG"
        if btc_env == "空头":
            return signal_side == "SHORT"
        return False

    @staticmethod
    def _within_push_window_utc8(enabled: bool) -> bool:
        """Return whether pushes are currently allowed by the time window.

        Always ``True`` when the window is disabled. Otherwise ``True`` while the
        current hour at UTC+8 is in ``[9, 23)``.
        """
        if not enabled:
            return True
        now_utc = datetime.now(timezone.utc)
        bj_hour = (now_utc.hour + 8) % 24
        return 9 <= bj_hour < 23

    async def run_cycle(self) -> None:
        """Run one scan cycle under the service lock and schedule the Gemma funnel.

        Failures of the scan are caught, logged and reflected in
        ``state.last_cycle_status``; they are not re-raised. When a Gemma client
        is configured and enabled and the scan produced candidates, a background
        funnel task is started unless the previous one is still running.
        """
        funnel_candidates: list[dict] = []
        async with self._lock:
            try:
                funnel_candidates = await self._run_cycle_inner()
                self.state.last_cycle_status = "OK"
                self.state.last_cycle_msg = "cycle_completed"
            except Exception as exc:  # noqa: BLE001
                msg = f"cycle_failed: {exc}"
                self.state.last_cycle_status = "ERROR"
                self.state.last_cycle_msg = msg
                LOGGER.exception(msg)
                await self.storage.add_log("ERROR", msg)
                funnel_candidates = []
            finally:
                # Refresh after this scan finishes so the HUD "LAST" value does not
                # drift from wall-clock time (it used to be written at cycle start).
                self.state.last_cycle_at = datetime.now(timezone.utc).isoformat()
        if self.gemma_client and self.settings.gemma.enabled and funnel_candidates:
            if self._funnel_bg_task is not None and not self._funnel_bg_task.done():
                await self.storage.add_log(
                    "WARN",
                    f"funnel_skipped_previous_still_running candidates={len(funnel_candidates)}",
                )
            else:
                # The task reference is stored so the next cycle can see whether
                # this funnel is still in flight.
                self._funnel_bg_task = asyncio.create_task(
                    self._run_gemma_funnel_safe(funnel_candidates),
                    name="gemma_funnel",
                )

    async def _run_gemma_funnel_safe(self, candidates: list[dict]) -> None:
        """Run the Gemma funnel and convert any failure into a log entry and state message."""
        try:
            await self._run_gemma_funnel(candidates)
        except Exception as exc:  # noqa: BLE001
            msg = f"funnel_failed: {exc}"
            LOGGER.exception(msg)
            await self.storage.add_log("ERROR", msg)
            async with self._lock:
                self.state.gemma_cycle_msg = f"funnel_failed: {exc!s}"[:500]

    async def _run_cycle_inner(self) -> list[dict]:
        """Scan the monitoring pool once and return the funnel candidates.

        Steps: load rule parameters, build the instrument pool (volume filter and
        blocklist), evaluate the BTC daily gate and intraday environment, then
        evaluate every instrument. WATCH/TRIGGER results are stored as alerts
        (subject to de-duplication); TRIGGER results that pass every push filter
        are sent to the notifier and optionally forwarded to order executors.

        Returns:
            One dict per WATCH/TRIGGER instrument, in scan order. Empty when the
            cycle is skipped because ``all_swaps`` has no positive volume floor.
        """
        bar = FIXED_BAR
        self.state.chart_bar = bar
        universe = self.settings.monitor.universe
        self.state.universe = universe

        rule_params = await self._load_intraday_params()
        stop_buffer_pct = _as_float(await self.storage.get_kv("intraday_stop_buffer_pct"), 0.2)
        stop_buffer_pct = max(0.0, min(stop_buffer_pct, 10.0))
        self.state.intraday_params = {
            "range_hours": rule_params.range_hours,
            "range_max_pct": rule_params.range_max_pct,
            "volume_spike_mult": rule_params.volume_spike_mult,
            "volume_lookback_bars": rule_params.volume_lookback_bars,
            "breakout_buffer_pct": rule_params.breakout_buffer_pct,
            "stop_buffer_pct": stop_buffer_pct,
        }

        blocklist = self._symbol_blocklist_from_kv(await self.storage.get_kv("monitor_symbol_blocklist"))
        self.state.symbol_blocklist_count = len(blocklist)
        self.state.symbol_blocklist_removed = 0

        listed_bases = await self.gate.get_perpetual_symbols()
        self.state.perpetual_symbols_count = len(listed_bases)

        # --- Build the instrument pool -------------------------------------
        min_vol = float(self.settings.monitor.min_24h_quote_volume_usdt)
        vol_map: dict[str, float] = {}
        watch_insts: list[str] = []
        if universe == "all_swaps":
            if min_vol <= 0:
                await self.storage.add_log(
                    "ERROR",
                    "all_swaps requires monitor.min_24h_quote_volume_usdt > 0; skipping cycle",
                )
                self.state.monitoring_pool = []
                self.state.monitored_inst_count = 0
                return []
            vol_map = await self.gate.get_usdt_swap_est_quote_volume_map()
            all_ids = await self.gate.list_live_usdt_swap_inst_ids()
            watch_insts = [i for i in all_ids if vol_map.get(i, 0.0) >= min_vol]
            await self.storage.add_log(
                "INFO",
                f"universe=all_swaps bar={bar} min_usdt={min_vol:.0f} pool={len(watch_insts)}/{len(all_ids)}",
            )
        else:
            watchlist = [w for w in self.settings.watch_symbols if w.symbol.upper() in listed_bases]
            if min_vol > 0:
                vol_map = await self.gate.get_usdt_swap_est_quote_volume_map()
                before = len(watchlist)
                kept: list[WatchSymbol] = []
                for w in watchlist:
                    inst = self.gate.symbol_to_swap_inst_id(w.symbol)
                    est = vol_map.get(inst, 0.0)
                    if est >= min_vol:
                        kept.append(w)
                    else:
                        await self.storage.add_log(
                            "INFO",
                            f"{w.symbol.upper()} skipped_24h_vol est_usdt={est:.0f} < min={min_vol:.0f}",
                        )
                watchlist = kept
                await self.storage.add_log(
                    "INFO",
                    f"universe=watchlist volume_filter min_usdt={min_vol:.0f} kept={len(watchlist)}/{before}",
                )
            watch_insts = [self.gate.symbol_to_swap_inst_id(w.symbol) for w in watchlist]
            await self.storage.add_log(
                "INFO",
                f"universe=watchlist bar={bar} pool={len(watch_insts)} gate_bases={len(listed_bases)}",
            )

        if blocklist:
            before_bl = len(watch_insts)
            watch_insts = [
                i for i in watch_insts if self.gate.inst_id_to_base_symbol(i) not in blocklist
            ]
            removed = before_bl - len(watch_insts)
            self.state.symbol_blocklist_removed = removed
            if removed:
                await self.storage.add_log(
                    "INFO",
                    f"symbol_blocklist removed={removed} pool_now={len(watch_insts)} rules={len(blocklist)}",
                )
        self.state.monitored_inst_count = len(watch_insts)

        push_window_enabled = _as_bool(await self.storage.get_kv("intraday_push_time_window_enabled"), True)

        # Rank 1 is the instrument with the largest estimated 24h quote volume.
        vol_rank_map: dict[str, int] = {}
        vol_rank_total = len(watch_insts)
        if vol_map and watch_insts:
            sorted_insts = sorted(watch_insts, key=lambda x: float(vol_map.get(x, 0.0)), reverse=True)
            vol_rank_map = {inst_id: idx + 1 for idx, inst_id in enumerate(sorted_insts)}

        self.state.monitoring_pool = []
        for inst in watch_insts:
            sym = self.gate.inst_id_to_base_symbol(inst)
            entry: dict = {"symbol": sym, "instId": inst}
            if vol_map:
                entry["est_quote_vol_24h_usdt"] = round(vol_map.get(inst, 0.0), 2)
            self.state.monitoring_pool.append(entry)

        # --- BTC context ----------------------------------------------------
        btc_inst = self.gate.symbol_to_swap_inst_id("BTC")
        if self.settings.monitor.btc_daily_gate_enabled:
            btc_1d = await self.gate.get_candles(btc_inst, "1D", limit=60)
            gate = evaluate_btc_daily_gate(
                btc_1d,
                sideways_lookback_days=self.settings.monitor.btc_sideways_lookback_days,
                sideways_max_range_pct=self.settings.monitor.btc_sideways_max_range_pct,
            )
            self.state.btc_gate_allow = gate.allow_alt_scan
            self.state.btc_gate_regime = gate.regime
            self.state.btc_gate_reason = gate.reason
            self.state.btc_gate_metrics = dict(gate.metrics)
            if not gate.allow_alt_scan:
                # The gate result is recorded but does not stop the scan.
                await self.storage.add_log(
                    "INFO",
                    (
                        f"btc_daily_gate regime={gate.regime} reason={gate.reason} "
                        f"(informational only; scan continues) metrics={gate.metrics}"
                    ),
                )
        else:
            self.state.btc_gate_allow = True
            self.state.btc_gate_regime = "disabled"
            self.state.btc_gate_reason = "btc_daily_gate_enabled=false"
            self.state.btc_gate_metrics = {}

        btc_rows = await self.gate.get_candles(btc_inst, bar, limit=120)
        btc_bias_5m = self._btc_intraday_bias(btc_rows)
        btc_15m_rows = await self.gate.get_candles(btc_inst, "15m", limit=120)
        btc_env_8h_15m = self._btc_env_15m_last_8h(btc_15m_rows)
        self.state.btc_env_8h_15m = btc_env_8h_15m
        await self.storage.add_log(
            "INFO",
            f"btc_intraday_bias_5m={btc_bias_5m} btc_env_8h_15m={btc_env_8h_15m}",
        )

        # --- Per-instrument scan ---------------------------------------------
        funnel_candidates: list[dict] = []
        for inst in watch_insts:
            sym = self.gate.inst_id_to_base_symbol(inst)
            try:
                alt_rows = await self.gate.get_candles(inst, bar, limit=120)
            except Exception as exc:  # noqa: BLE001
                await self.storage.add_log("WARN", f"{sym} candles_failed: {exc}")
                continue
            try:
                result = evaluate_exchange(sym, alt_rows, btc_rows, rule_params)
            except Exception as exc:  # noqa: BLE001
                await self.storage.add_log("WARN", f"{sym} evaluate_failed: {exc}")
                continue

            if result.signal_level in {"WATCH", "TRIGGER"}:
                est_vol = float(vol_map.get(inst, 0.0)) if vol_map else 0.0
                signal_side = str((result.metrics or {}).get("signal_side") or result.signal_side or "NONE")
                # Only TRIGGER signals are eligible for a push; WATCH is stored only.
                push_allowed = result.signal_level == "TRIGGER"
                funnel_candidates.append(
                    {
                        "symbol": sym,
                        "inst": inst,
                        "est_vol": est_vol,
                        "est_vol_rank": int(vol_rank_map.get(inst, 0)) if vol_rank_map else 0,
                        "est_vol_rank_total": int(vol_rank_total),
                        "signal_level": result.signal_level,
                        "signal_side": signal_side,
                        "btc_bias_5m": btc_bias_5m,
                        "push_allowed": push_allowed,
                        "btc_env_8h_15m": btc_env_8h_15m,
                        "intraday_metrics": dict(result.metrics),
                    }
                )

                dedupe_h = float(self.settings.monitor.symbol_signal_dedupe_hours)
                chain_suffix = signal_side if signal_side in {"LONG", "SHORT"} else "NONE"
                surface_chain = f"GATE-USDT {bar} {result.signal_level} {chain_suffix}"
                skip_surface_alert = dedupe_h > 0 and await self.storage.has_recent_alert(
                    sym, chain=surface_chain, within_hours=dedupe_h
                )
                if not skip_surface_alert:
                    # Defaults apply to WATCH signals, which skip the push filters.
                    symbol_4h_status = "横盘"
                    symbol_side_ok = False
                    push_time_ok = True
                    vol_rank_ok = True
                    rank_max = int(getattr(self.settings.monitor, "wecom_push_max_volume_rank", 0) or 0)
                    if result.signal_level == "TRIGGER":
                        try:
                            sym_4h_rows = await self.gate.get_candles(inst, "4H", limit=120)
                            symbol_4h_status = self._status_by_ema55(sym_4h_rows)
                        except Exception as exc:  # noqa: BLE001
                            # Keeps the "横盘" default, which fails the alignment check below.
                            await self.storage.add_log("WARN", f"{sym} 4h_status_failed: {exc}")
                        symbol_side_ok = (signal_side == "LONG" and symbol_4h_status == "多头") or (
                            signal_side == "SHORT" and symbol_4h_status == "空头"
                        )
                        push_time_ok = self._within_push_window_utc8(push_window_enabled)
                        if rank_max > 0:
                            if vol_rank_map:
                                rnk = int(vol_rank_map.get(inst, 999))
                                vol_rank_ok = 1 <= rnk <= rank_max
                            else:
                                # A rank limit is set but no volume data exists to rank by.
                                vol_rank_ok = False

                    btc_env_ok = self._push_matches_btc_env(btc_env_8h_15m, signal_side)
                    strict_push_ok = bool(
                        push_allowed and symbol_side_ok and push_time_ok and vol_rank_ok and btc_env_ok
                    )
                    push_reason = "trigger_pushed"
                    if result.signal_level == "TRIGGER" and not strict_push_ok:
                        reasons: list[str] = []
                        if not btc_env_ok:
                            reasons.append("btc_env_8h_15m_direction_mismatch")
                        if not symbol_side_ok:
                            reasons.append("symbol_4h_not_aligned")
                        if not push_time_ok:
                            reasons.append("outside_push_time_window")
                        if not vol_rank_ok:
                            reasons.append("volume_rank_outside_top_n")
                        push_reason = ",".join(reasons) if reasons else "filtered_by_rules"

                    await self.storage.add_alert(
                        symbol=sym,
                        venue=surface_chain,
                        trigger_types=result.trigger_types,
                        score=result.score,
                        details={
                            "metrics": result.metrics,
                            "instId": inst,
                            "signal_level": result.signal_level,
                            "signal_side": signal_side,
                            "btc_bias_5m": btc_bias_5m,
                            "push_allowed": push_allowed,
                            "btc_env_8h_15m": btc_env_8h_15m,
                            "btc_env_ok": btc_env_ok,
                            "symbol_4h_status": symbol_4h_status,
                            "push_time_ok": push_time_ok,
                            "vol_rank_ok": vol_rank_ok,
                            "strict_push_ok": strict_push_ok,
                            "push_block_reason": push_reason,
                        },
                    )

                    if result.signal_level == "TRIGGER":
                        if strict_push_ok:
                            push_metrics = dict(result.metrics)
                            push_metrics["signal_side"] = signal_side
                            push_metrics["btc_bias"] = btc_bias_5m
                            push_metrics["btc_env_8h_15m"] = btc_env_8h_15m
                            push_metrics["symbol_4h_status"] = symbol_4h_status
                            push_metrics["est_quote_vol_24h_usdt"] = est_vol
                            push_metrics["est_quote_vol_rank"] = int(vol_rank_map.get(inst, 0))
                            push_metrics["est_quote_vol_rank_total"] = int(vol_rank_total)
                            push_metrics["stop_buffer_pct"] = stop_buffer_pct
                            try:
                                await self.notifier.send_breakout_alert(
                                    symbol=sym,
                                    bar=bar,
                                    inst_id=inst,
                                    trigger_types=result.trigger_types,
                                    metrics=push_metrics,
                                )
                            except Exception as exc:  # noqa: BLE001
                                await self.storage.add_log("ERROR", f"wecom_push_failed {sym}: {exc}")
                            else:
                                # Count and forward only after the push succeeded; kept out
                                # of the try so forwarding errors are not reported as push
                                # failures.
                                self.state.pushed_alerts_count += 1
                                await self._maybe_forward_order_executor(sym, inst, push_metrics)
                        else:
                            await self.storage.add_log(
                                "INFO",
                                (
                                    f"signal_blocked sym={sym} side={signal_side} btc_bias={btc_bias_5m} "
                                    f"btc_env_8h_15m={btc_env_8h_15m} btc_env_ok={btc_env_ok} "
                                    f"sym_4h={symbol_4h_status} symbol_side_ok={symbol_side_ok} "
                                    f"push_time_ok={push_time_ok} vol_rank_ok={vol_rank_ok} "
                                    f"rank={vol_rank_map.get(inst, 0)}/{rank_max}"
                                ),
                            )
                    await self.storage.add_log(
                        "WARN",
                        (
                            f"signal={result.signal_level} side={signal_side} {sym} bar={bar} "
                            f"push_allowed={push_allowed} triggers={','.join(result.trigger_types)}"
                        ),
                    )
                else:
                    await self.storage.add_log(
                        "INFO",
                        f"signal_dedupe_skip sym={sym} chain={surface_chain} within_h={dedupe_h}",
                    )
            # Short pause between instruments to space out exchange requests.
            await asyncio.sleep(0.08)

        # --- Funnel status for this cycle ------------------------------------
        if not self.settings.gemma.enabled:
            self.state.last_funnel = []
            self.state.gemma_cycle_msg = "gemma_disabled"
        elif not self.gemma_client:
            self.state.last_funnel = []
            self.state.gemma_cycle_msg = "gemma_client_none"
        elif not funnel_candidates:
            self.state.last_funnel = []
            self.state.gemma_cycle_msg = "no_funnel_candidates"
        else:
            self.state.gemma_cycle_msg = "funnel_pending"

        await self.storage.add_log(
            "INFO",
            f"cycle_scan_done monitored={len(watch_insts)} funnel_candidates={len(funnel_candidates)}",
        )
        return funnel_candidates

    async def _run_gemma_funnel(self, candidates: list[dict]) -> None:
        """Rank scan candidates with the Gemma client on daily data and push the best.

        The candidates with the highest estimated volume (at most
        ``settings.gemma.max_funnel_per_cycle``, minimum one) are scored one at a
        time. Each result is stored as a "FUNNEL-GEMMA" alert; a push is sent when
        the BTC environment, the priority/composite threshold and the volume-rank
        limit all allow it. The ranked list is published to ``self.state``.

        Args:
            candidates: Candidate dicts as produced by ``_run_cycle_inner``. The
                list itself is not modified.

        Raises:
            RuntimeError: If no Gemma client is configured.
        """
        client = self.gemma_client
        if client is None:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            raise RuntimeError("gemma funnel requires a configured gemma client")
        cfg = self.settings.gemma
        # Sort a copy: the caller's list is left untouched.
        ranked = sorted(candidates, key=lambda x: float(x.get("est_vol") or 0.0), reverse=True)
        take = ranked[: max(1, cfg.max_funnel_per_cycle)]
        out: list[dict] = []
        dedupe_h = float(self.settings.monitor.symbol_signal_dedupe_hours)
        for i, c in enumerate(take):
            sym = str(c["symbol"])
            inst = str(c["inst"])
            est = float(c.get("est_vol") or 0.0)
            if dedupe_h > 0 and await self.storage.has_recent_alert(
                sym, chain="FUNNEL-GEMMA", within_hours=dedupe_h
            ):
                await self.storage.add_log("INFO", f"funnel_dedupe_skip sym={sym} within_h={dedupe_h}")
                continue
            try:
                rows_1d = await self.gate.get_candles(inst, "1D", limit=80)
            except Exception as exc:  # noqa: BLE001
                await self.storage.add_log("WARN", f"funnel {sym} 1d_failed: {exc}")
                continue

            prog = build_daily_programmatic(rows_1d, est)
            subs = programmatic_scores(prog)
            prog_text = json.dumps({**prog, **subs}, ensure_ascii=False)
            ohlc_block = daily_ohlc_text_block(rows_1d)
            # Only the first ``vision_top_n`` candidates get a chart image attached.
            img_b64: str | None = None
            if cfg.send_chart_image and i < max(0, cfg.vision_top_n):
                img_b64 = daily_candles_png_base64(rows_1d, sym)

            try:
                gemma_out = await client.rank_funnel(sym, prog_text, ohlc_block, img_b64)
            except Exception as exc:  # noqa: BLE001
                await self.storage.add_log("ERROR", f"gemma_ollama_failed {sym}: {exc}")
                # Lowest-priority placeholder so the candidate is still recorded.
                gemma_out = {
                    "daily_structure": "weak",
                    "volume_view": "low",
                    "upside_space": "low",
                    "mid_resistance": "high",
                    "priority": 1,
                    "one_liner": f"Ollama 调用失败: {exc}",
                    "error": str(exc),
                }

            # The priority comes from model output: a null, non-numeric or
            # non-finite value falls back to the lowest priority instead of
            # aborting the funnel for every remaining candidate.
            pri = _as_float(gemma_out.get("priority", 1), 1.0)
            comp = composite_score(pri, subs)
            signal_side = str(c.get("signal_side") or "NONE")
            btc_bias_5m = str(c.get("btc_bias_5m") or "NEUTRAL")
            btc_env_8h_15m = str(c.get("btc_env_8h_15m") or "横盘")
            btc_env_ok = self._push_matches_btc_env(btc_env_8h_15m, signal_side)
            threshold_ok = pri >= cfg.gemma_push_priority_min or comp >= cfg.composite_push_min
            rank_max_f = int(getattr(self.settings.monitor, "wecom_push_max_volume_rank", 0) or 0)
            vol_rank_ok_f = True
            if rank_max_f > 0:
                # A missing or zero rank is treated as 999, i.e. outside any limit below 999.
                rnk_f = int(c.get("est_vol_rank") or 999)
                vol_rank_ok_f = 1 <= rnk_f <= rank_max_f
            should_push = btc_env_ok and threshold_ok and vol_rank_ok_f

            gemma_clean = {k: v for k, v in gemma_out.items() if k not in {"raw", "error"}}
            details: dict = {
                "source": "gemma_funnel",
                "underlying_signal": c.get("signal_level"),
                "signal_side": signal_side,
                "btc_bias_5m": btc_bias_5m,
                "btc_env_8h_15m": btc_env_8h_15m,
                "gemma": gemma_clean,
                "programmatic": prog,
                "programmatic_subscores": subs,
                "composite_score": comp,
                "priority_push": should_push,
                "priority_threshold_ok": threshold_ok,
                "btc_env_ok": btc_env_ok,
                "volume_rank_ok": vol_rank_ok_f,
                "instId": inst,
                "image_sent": bool(img_b64),
                "intraday_signal_metrics": c.get("intraday_metrics"),
            }
            if gemma_out.get("error"):
                details["gemma_error"] = str(gemma_out.get("error"))[:500]
            raw_snip = gemma_out.get("raw")
            if isinstance(raw_snip, str) and raw_snip:
                details["gemma_raw_snip"] = raw_snip[:800]

            await self.storage.add_alert(
                symbol=sym,
                venue="FUNNEL-GEMMA",
                trigger_types=["漏斗", f"P{int(pri)}", str(gemma_out.get("daily_structure", "?"))],
                score=comp,
                details=details,
            )

            if should_push:
                try:
                    await self.notifier.send_funnel_priority(
                        symbol=sym,
                        inst_id=inst,
                        composite_score=comp,
                        gemma=gemma_clean,
                        programmatic=prog,
                    )
                    self.state.pushed_alerts_count += 1
                    await self.storage.add_log(
                        "WARN",
                        f"funnel_priority_push {sym} composite={comp} priority={pri}",
                    )
                except Exception as exc:  # noqa: BLE001
                    await self.storage.add_log("ERROR", f"funnel_wecom_push_failed {sym}: {exc}")

            out.append(
                {
                    "symbol": sym,
                    "composite_score": comp,
                    "gemma_priority": pri,
                    "signal_side": signal_side,
                    "btc_bias_5m": btc_bias_5m,
                    # Reflects the push decision, not whether the send succeeded.
                    "pushed": should_push,
                    "one_liner": gemma_clean.get("one_liner", ""),
                }
            )
            await asyncio.sleep(0.35)

        out.sort(key=lambda x: float(x.get("composite_score") or 0.0), reverse=True)
        msg = f"funnel_ranked={len(out)}"
        async with self._lock:
            self.state.last_funnel = out[:40]
            self.state.last_funnel_at = datetime.now(timezone.utc).isoformat()
            self.state.gemma_cycle_msg = msg
        await self.storage.add_log("INFO", msg)

    async def _load_intraday_params(self) -> IntradayRuleParams:
        """Read the intraday rule parameters from storage and clamp them to safe minimums.

        Missing or unparseable values fall back to the defaults given here
        (24.0 h, 1.5 %, 1.6x, 20 bars, 0.05 %).
        """
        range_hours = _as_float(await self.storage.get_kv("intraday_range_hours"), 24.0)
        range_max_pct = _as_float(await self.storage.get_kv("intraday_range_max_pct"), 1.5)
        volume_spike_mult = _as_float(await self.storage.get_kv("intraday_volume_spike_mult"), 1.6)
        volume_lookback_bars = int(_as_float(await self.storage.get_kv("intraday_volume_lookback_bars"), 20))
        breakout_buffer_pct = _as_float(await self.storage.get_kv("intraday_breakout_buffer_pct"), 0.05)
        return IntradayRuleParams(
            range_hours=max(1.0, range_hours),
            range_max_pct=max(0.1, range_max_pct),
            volume_spike_mult=max(1.0, volume_spike_mult),
            volume_lookback_bars=max(5, volume_lookback_bars),
            breakout_buffer_pct=max(0.0, breakout_buffer_pct),
        )


def _as_float(raw: str | None, default: float) -> float:
    """Convert a stored value to a finite float, falling back to ``default``.

    Args:
        raw: Value to convert; ``None`` means "not set".
        default: Returned when ``raw`` is ``None``, cannot be converted, or
            converts to NaN or an infinity.

    Returns:
        A finite float, or ``default`` unchanged.
    """
    if raw is None:
        return default
    try:
        value = float(raw)
    except (TypeError, ValueError):
        return default
    # "nan" and "inf" parse successfully but break int() conversion and the
    # min/max clamps applied by callers, so they count as invalid input.
    return value if math.isfinite(value) else default


def _as_bool(raw: str | None, default: bool) -> bool:
    """Interpret a stored flag; ``None`` yields ``default``.

    Truthy spellings (case-insensitive, surrounding whitespace ignored) are
    "1", "true", "yes", "y" and "on"; anything else is ``False``.
    """
    if raw is None:
        return default
    return str(raw).strip().lower() in {"1", "true", "yes", "y", "on"}