Files
2026-05-23 17:18:38 +08:00

779 lines
34 KiB
Python

from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from .btc_regime import evaluate_btc_daily_gate
from .chart_candles import daily_candles_png_base64
from .config import Settings, WatchSymbol
from .daily_features import (
build_daily_programmatic,
composite_score,
daily_ohlc_text_block,
programmatic_scores,
)
from .exchange_rules import IntradayRuleParams, evaluate_exchange
from .notifier import WeComNotifier
from .order_executor_forward import build_order_executor_payload, forward_signal_to_executors
from .order_executors_store import read_forward_config, record_last_forward
from .gate import GateClient
from .storage import Storage
if TYPE_CHECKING:
from .gemma_client import OllamaGemmaClient
# Module-level logger used by the monitor service.
LOGGER = logging.getLogger("onchain_scout.monitor")
# Candle interval used for the intraday scan; every cycle reads this value as its bar.
FIXED_BAR = "5m"
# The most recent 8 wall-clock hours ≈ 32 fifteen-minute candles.
BTC_15M_BARS_PER_8H = 32
# Mutable snapshot of the monitor's runtime status; MonitorService updates it
# during each scan cycle and funnel run.
@dataclass
class RuntimeState:
# ISO-8601 UTC timestamp taken when this state object is created.
started_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
# ISO-8601 UTC timestamp written when a scan cycle finishes.
last_cycle_at: str = ""
# "INIT" until the first cycle; run_cycle then sets "OK" or "ERROR".
last_cycle_status: str = "INIT"
# "cycle_completed" or a "cycle_failed: ..." message from the last cycle.
last_cycle_msg: str = ""
# Candle interval used by the last cycle.
chart_bar: str = FIXED_BAR
# Universe mode copied from settings.monitor.universe on each cycle.
universe: str = "all_swaps"
# Effective intraday rule parameters (plus stop_buffer_pct) of the last cycle.
intraday_params: dict = field(default_factory=dict)
# One entry per monitored instrument: symbol, instId and, when a volume map
# was fetched, est_quote_vol_24h_usdt.
monitoring_pool: list[dict] = field(default_factory=list)
# Number of perpetual base symbols reported by the gate client.
perpetual_symbols_count: int = 0
# Number of instruments left in the pool after volume and blocklist filtering.
monitored_inst_count: int = 0
# Running count of notifications sent successfully (breakout and funnel pushes).
pushed_alerts_count: int = 0
# Result of the BTC daily gate evaluation (placeholders when the gate is disabled).
btc_gate_allow: bool = True
btc_gate_regime: str = ""
btc_gate_reason: str = ""
btc_gate_metrics: dict = field(default_factory=dict)
# BTC 15m environment label computed for the last cycle.
btc_env_8h_15m: str = ""
# Size of the parsed symbol blocklist and how many instruments it removed.
symbol_blocklist_count: int = 0
symbol_blocklist_removed: int = 0
# Up to 40 funnel results from the last funnel run, sorted by composite score.
last_funnel: list[dict] = field(default_factory=list)
# ISO-8601 UTC timestamp of the last completed funnel run.
last_funnel_at: str = ""
# Short status text describing the funnel state of the last cycle.
gemma_cycle_msg: str = ""
class MonitorService:
def __init__(
    self,
    settings: Settings,
    storage: Storage,
    gate_client: GateClient,
    notifier: WeComNotifier,
    gemma_client: OllamaGemmaClient | None = None,
) -> None:
    """Wire up the service with its collaborators and fresh runtime state.

    Args:
        settings: Application settings read throughout the scan cycle.
        storage: Persistence layer used for logs, alerts and KV parameters.
        gate_client: Exchange client, stored as ``self.gate``.
        notifier: Notification client used for pushes.
        gemma_client: Optional model client for the funnel stage.
    """
    # Mutable runtime state and concurrency primitives.
    self.state = RuntimeState()
    self._lock = asyncio.Lock()
    # Handle of the background funnel task, if one has been scheduled.
    self._funnel_bg_task: asyncio.Task[None] | None = None

    # Injected collaborators.
    self.settings = settings
    self.storage = storage
    self.gate = gate_client
    self.notifier = notifier
    self.gemma_client = gemma_client
@staticmethod
def _symbol_blocklist_from_kv(raw: str | None) -> frozenset[str]:
if not raw or not str(raw).strip():
return frozenset()
try:
data = json.loads(raw)
except json.JSONDecodeError:
return frozenset()
if not isinstance(data, list):
return frozenset()
out: set[str] = set()
for x in data:
s = str(x).strip().upper()
if s:
out.add(s)
return frozenset(out)
async def _maybe_forward_order_executor(self, sym: str, inst: str, push_metrics: dict) -> None:
if not self.settings.key_monitor.auto_scan_forward_executor:
return
fwd = read_forward_config(self.settings)
if not fwd.get("enabled"):
return
secret = str(fwd.get("webhook_secret") or "").strip()
if not secret:
await self.storage.add_log(
"WARN",
"order_executor.enabled=true but webhook_secret is empty; skip POST /v1/signal",
)
return
targets = list(fwd.get("executors") or [])
if not targets:
await self.storage.add_log(
"WARN",
"order_executor.enabled=true but no enabled executor in panel list; skip POST /v1/signal",
)
return
payload = build_order_executor_payload(inst_id=inst, metrics=push_metrics)
if not payload:
await self.storage.add_log(
"WARN",
f"order_executor_build_payload_failed sym={sym} inst={inst}",
)
return
try:
results = await forward_signal_to_executors(
self.settings,
executors=targets,
webhook_secret=secret,
timeout_seconds=float(fwd.get("timeout_seconds") or 15.0),
payload=payload,
)
except Exception as exc: # noqa: BLE001
await self.storage.add_log(
"ERROR",
f"order_executor_forward_exception sym={sym} inst={inst}: {exc}",
)
return
for out in results:
name = out.get("name") or "?"
eid = str(out.get("executor_id") or "")
status = out.get("http_status")
ok = out.get("ok")
body = out.get("body") or {}
st = body.get("status") if isinstance(body, dict) else None
detail = out.get("error")
if not detail and isinstance(body, dict):
detail = body.get("reason") or body.get("detail")
try:
record_last_forward(
self.settings,
eid,
http_status=int(status or 0),
ok=bool(ok),
exec_status=str(st) if st is not None else None,
detail=str(detail) if detail is not None else None,
)
except Exception: # noqa: BLE001
pass
if ok:
await self.storage.add_log(
"INFO",
f"order_executor_ok name={name} sym={sym} inst={inst} http={status} exec_status={st}",
)
else:
await self.storage.add_log(
"ERROR",
f"order_executor_failed name={name} sym={sym} inst={inst} http={status} body={body!r} err={detail}",
)
@staticmethod
def _btc_intraday_bias(btc_rows: list[list[str]]) -> str:
closes: list[float] = []
for row in btc_rows:
if len(row) < 5:
continue
try:
closes.append(float(row[4]))
except (TypeError, ValueError):
continue
if len(closes) < 2:
return "NEUTRAL"
if closes[-1] > closes[-2]:
return "BULL"
if closes[-1] < closes[-2]:
return "BEAR"
return "NEUTRAL"
@staticmethod
def _ema(values: list[float], period: int) -> float:
if not values:
return 0.0
p = max(1, period)
alpha = 2.0 / (p + 1.0)
ema = values[0]
for v in values[1:]:
ema = alpha * v + (1 - alpha) * ema
return ema
@staticmethod
def _status_by_ema55(rows: list[list[str]]) -> str:
closes: list[float] = []
highs: list[float] = []
lows: list[float] = []
for row in rows:
if len(row) < 5:
continue
try:
highs.append(float(row[2]))
lows.append(float(row[3]))
closes.append(float(row[4]))
except (TypeError, ValueError):
continue
if len(closes) < 55:
return "横盘"
ema55 = MonitorService._ema(closes[-120:], 55)
last = closes[-1]
lb = min(21, len(closes))
h = max(highs[-lb:])
l = min(lows[-lb:])
mid = (h + l) / 2 if h > l else 0.0
range_pct = ((h - l) / mid * 100.0) if mid > 0 else 999.0
if range_pct <= 2.0:
return "横盘"
if last >= ema55:
return "多头"
return "空头"
@staticmethod
def _btc_env_15m_last_8h(rows_15m: list[list[str]]) -> str:
    """Label the BTC 15m trend over the last 8 hours.

    Returns "多头" (bull), "空头" (bear) or "横盘" (sideways). Each row is
    read as high = index 2, low = index 3, close = index 4. The result is
    "横盘" when fewer than ``BTC_15M_BARS_PER_8H`` rows are usable, when the
    high/low range of that window is at most 1.8% of its midpoint, or when
    fewer than 21 closes are available for the EMA. Otherwise the last close
    is compared with the EMA(21) of up to the last 96 closes.

    A row is used only when all three fields parse. Parsing them before any
    append keeps highs, lows and closes aligned, so a half-valid row cannot
    leak its high or low into the range window.
    """
    closes: list[float] = []
    highs: list[float] = []
    lows: list[float] = []
    for row in rows_15m:
        if len(row) < 5:
            continue
        try:
            high, low, close = float(row[2]), float(row[3]), float(row[4])
        except (TypeError, ValueError):
            continue
        highs.append(high)
        lows.append(low)
        closes.append(close)
    need = BTC_15M_BARS_PER_8H
    if len(closes) < need:
        return "横盘"
    h_win = max(highs[-need:])
    l_win = min(lows[-need:])
    # A degenerate window (high <= low, or non-positive midpoint) gets the
    # sentinel 999.0, so it is never classified as narrow-range sideways.
    mid = (h_win + l_win) / 2 if h_win > l_win else 0.0
    range_pct = ((h_win - l_win) / mid * 100.0) if mid > 0 else 999.0
    if range_pct <= 1.8:
        return "横盘"
    # Use up to 96 closes so the EMA has history beyond the 8h window.
    warmup = min(len(closes), 96)
    seq = closes[-warmup:]
    if len(seq) < 21:
        return "横盘"
    ema21 = MonitorService._ema(seq, 21)
    return "多头" if closes[-1] >= ema21 else "空头"
@staticmethod
def _push_matches_btc_env(btc_env: str, signal_side: str) -> bool:
if signal_side not in {"LONG", "SHORT"}:
return False
if btc_env == "横盘":
return True
if btc_env == "多头":
return signal_side == "LONG"
if btc_env == "空头":
return signal_side == "SHORT"
return False
@staticmethod
def _within_push_window_utc8(enabled: bool) -> bool:
if not enabled:
return True
now_utc = datetime.now(timezone.utc)
bj_hour = (now_utc.hour + 8) % 24
return 9 <= bj_hour < 23
async def run_cycle(self) -> None:
# Run one scan cycle under self._lock, record its outcome on self.state, and
# schedule the Gemma funnel as a background task when there are candidates.
# Failures of the inner cycle are logged and stored; they are not re-raised.
# NOTE(review): indentation was lost in this copy of the file, so it cannot be
# confirmed from here whether the funnel scheduling below runs while the lock
# is still held.
funnel_candidates: list[dict] = []
async with self._lock:
try:
funnel_candidates = await self._run_cycle_inner()
self.state.last_cycle_status = "OK"
self.state.last_cycle_msg = "cycle_completed"
except Exception as exc: # noqa: BLE001
msg = f"cycle_failed: {exc}"
self.state.last_cycle_status = "ERROR"
self.state.last_cycle_msg = msg
LOGGER.exception(msg)
await self.storage.add_log("ERROR", msg)
# A failed cycle never feeds the funnel.
funnel_candidates = []
finally:
# Refresh after this scan round finishes so the HUD "LAST" value does not
# drift from wall-clock time (it used to be written at the start of the cycle).
self.state.last_cycle_at = datetime.now(timezone.utc).isoformat()
# The funnel only runs with a client, with gemma enabled, and with candidates.
if self.gemma_client and self.settings.gemma.enabled and funnel_candidates:
# At most one funnel task at a time: skip this round if the previous one is still running.
if self._funnel_bg_task is not None and not self._funnel_bg_task.done():
await self.storage.add_log(
"WARN",
f"funnel_skipped_previous_still_running candidates={len(funnel_candidates)}",
)
else:
# Keep a reference to the task on self so its completion can be checked next cycle.
self._funnel_bg_task = asyncio.create_task(
self._run_gemma_funnel_safe(funnel_candidates),
name="gemma_funnel",
)
async def _run_gemma_funnel_safe(self, candidates: list[dict]) -> None:
    """Run the Gemma funnel without letting any exception escape.

    On failure the error is logged with its traceback, written to storage as
    an ERROR log entry, and stored (truncated to 500 characters) in
    ``state.gemma_cycle_msg`` while holding the service lock.
    """
    try:
        await self._run_gemma_funnel(candidates)
    except Exception as exc:  # noqa: BLE001
        failure_text = f"funnel_failed: {exc}"
        LOGGER.exception(failure_text)
        await self.storage.add_log("ERROR", failure_text)
        truncated = f"funnel_failed: {exc!s}"[:500]
        async with self._lock:
            self.state.gemma_cycle_msg = truncated
async def _run_cycle_inner(self) -> list[dict]:
# Scan the instrument pool once on the fixed bar, store alerts, push the
# TRIGGER signals that pass every filter, and return the WATCH/TRIGGER
# candidates for the Gemma funnel.
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# of some statements (for example the executor forward call, the per-signal
# WARN log and the trailing asyncio.sleep) cannot be confirmed from here.
bar = FIXED_BAR
self.state.chart_bar = bar
universe = self.settings.monitor.universe
self.state.universe = universe
# Load rule parameters and the stop buffer from the KV store; the stop buffer is clamped to [0, 10].
rule_params = await self._load_intraday_params()
stop_buffer_pct = _as_float(await self.storage.get_kv("intraday_stop_buffer_pct"), 0.2)
stop_buffer_pct = max(0.0, min(stop_buffer_pct, 10.0))
# Publish the effective parameters on the runtime state.
self.state.intraday_params = {
"range_hours": rule_params.range_hours,
"range_max_pct": rule_params.range_max_pct,
"volume_spike_mult": rule_params.volume_spike_mult,
"volume_lookback_bars": rule_params.volume_lookback_bars,
"breakout_buffer_pct": rule_params.breakout_buffer_pct,
"stop_buffer_pct": stop_buffer_pct,
}
# Parse the symbol blocklist stored under "monitor_symbol_blocklist".
blocklist = self._symbol_blocklist_from_kv(await self.storage.get_kv("monitor_symbol_blocklist"))
self.state.symbol_blocklist_count = len(blocklist)
self.state.symbol_blocklist_removed = 0
listed_bases = await self.gate.get_perpetual_symbols()
self.state.perpetual_symbols_count = len(listed_bases)
min_vol = float(self.settings.monitor.min_24h_quote_volume_usdt)
vol_map: dict[str, float] = {}
watch_insts: list[str] = []
# Build the instrument pool: every live USDT swap above the volume floor, or the configured watchlist.
if universe == "all_swaps":
# all_swaps requires a positive volume floor; otherwise the cycle ends here with an empty pool.
if min_vol <= 0:
await self.storage.add_log(
"ERROR",
"all_swaps requires monitor.min_24h_quote_volume_usdt > 0; skipping cycle",
)
self.state.monitoring_pool = []
self.state.monitored_inst_count = 0
return []
vol_map = await self.gate.get_usdt_swap_est_quote_volume_map()
all_ids = await self.gate.list_live_usdt_swap_inst_ids()
watch_insts = [i for i in all_ids if vol_map.get(i, 0.0) >= min_vol]
await self.storage.add_log(
"INFO",
f"universe=all_swaps bar={bar} min_usdt={min_vol:.0f} pool={len(watch_insts)}/{len(all_ids)}",
)
else:
# Watchlist mode: keep configured symbols present in the listed perpetual bases.
watchlist = [w for w in self.settings.watch_symbols if w.symbol.upper() in listed_bases]
# The volume filter is optional here and only applied when the floor is positive.
if min_vol > 0:
vol_map = await self.gate.get_usdt_swap_est_quote_volume_map()
before = len(watchlist)
kept: list[WatchSymbol] = []
for w in watchlist:
inst = self.gate.symbol_to_swap_inst_id(w.symbol)
est = vol_map.get(inst, 0.0)
if est >= min_vol:
kept.append(w)
else:
await self.storage.add_log(
"INFO",
f"{w.symbol.upper()} skipped_24h_vol est_usdt={est:.0f} < min={min_vol:.0f}",
)
watchlist = kept
await self.storage.add_log(
"INFO",
f"universe=watchlist volume_filter min_usdt={min_vol:.0f} kept={len(watchlist)}/{before}",
)
watch_insts = [self.gate.symbol_to_swap_inst_id(w.symbol) for w in watchlist]
await self.storage.add_log(
"INFO",
f"universe=watchlist bar={bar} pool={len(watch_insts)} gate_bases={len(listed_bases)}",
)
# Drop instruments whose base symbol is in the blocklist and record how many were removed.
if blocklist:
before_bl = len(watch_insts)
watch_insts = [
i for i in watch_insts if self.gate.inst_id_to_base_symbol(i) not in blocklist
]
removed = before_bl - len(watch_insts)
self.state.symbol_blocklist_removed = removed
if removed:
await self.storage.add_log(
"INFO",
f"symbol_blocklist removed={removed} pool_now={len(watch_insts)} rules={len(blocklist)}",
)
self.state.monitored_inst_count = len(watch_insts)
# The push time window is on by default when the KV key is absent.
push_window_enabled = _as_bool(await self.storage.get_kv("intraday_push_time_window_enabled"), True)
# Rank the pool by estimated 24h quote volume (rank 1 = largest); only built when a volume map was fetched.
vol_rank_map: dict[str, int] = {}
vol_rank_total = len(watch_insts)
if vol_map and watch_insts:
sorted_insts = sorted(watch_insts, key=lambda x: float(vol_map.get(x, 0.0)), reverse=True)
vol_rank_map = {inst_id: idx + 1 for idx, inst_id in enumerate(sorted_insts)}
# Rebuild the monitoring pool snapshot exposed on the state.
self.state.monitoring_pool = []
for inst in watch_insts:
sym = self.gate.inst_id_to_base_symbol(inst)
entry: dict = {"symbol": sym, "instId": inst}
if vol_map:
entry["est_quote_vol_24h_usdt"] = round(vol_map.get(inst, 0.0), 2)
self.state.monitoring_pool.append(entry)
btc_inst = self.gate.symbol_to_swap_inst_id("BTC")
# BTC daily gate: the result is stored on the state; per the log message below it is
# informational only and the scan continues either way.
if self.settings.monitor.btc_daily_gate_enabled:
btc_1d = await self.gate.get_candles(btc_inst, "1D", limit=60)
gate = evaluate_btc_daily_gate(
btc_1d,
sideways_lookback_days=self.settings.monitor.btc_sideways_lookback_days,
sideways_max_range_pct=self.settings.monitor.btc_sideways_max_range_pct,
)
self.state.btc_gate_allow = gate.allow_alt_scan
self.state.btc_gate_regime = gate.regime
self.state.btc_gate_reason = gate.reason
self.state.btc_gate_metrics = dict(gate.metrics)
if not gate.allow_alt_scan:
await self.storage.add_log(
"INFO",
(
f"btc_daily_gate regime={gate.regime} reason={gate.reason} "
f"(informational only; scan continues) metrics={gate.metrics}"
),
)
else:
# Gate disabled: write placeholder values so the state stays well-formed.
self.state.btc_gate_allow = True
self.state.btc_gate_regime = "disabled"
self.state.btc_gate_reason = "btc_daily_gate_enabled=false"
self.state.btc_gate_metrics = {}
# BTC context for this cycle: bias from the last two closes on the scan bar, and the 15m environment label.
btc_rows = await self.gate.get_candles(btc_inst, bar, limit=120)
btc_bias_5m = self._btc_intraday_bias(btc_rows)
btc_15m_rows = await self.gate.get_candles(btc_inst, "15m", limit=120)
btc_env_8h_15m = self._btc_env_15m_last_8h(btc_15m_rows)
self.state.btc_env_8h_15m = btc_env_8h_15m
await self.storage.add_log(
"INFO",
f"btc_intraday_bias_5m={btc_bias_5m} btc_env_8h_15m={btc_env_8h_15m}",
)
funnel_candidates: list[dict] = []
# Evaluate every instrument in the pool; a candle or rule failure skips only that instrument.
for inst in watch_insts:
sym = self.gate.inst_id_to_base_symbol(inst)
try:
alt_rows = await self.gate.get_candles(inst, bar, limit=120)
except Exception as exc: # noqa: BLE001
await self.storage.add_log("WARN", f"{sym} candles_failed: {exc}")
continue
try:
result = evaluate_exchange(sym, alt_rows, btc_rows, rule_params)
except Exception as exc: # noqa: BLE001
await self.storage.add_log("WARN", f"{sym} evaluate_failed: {exc}")
continue
# Only WATCH and TRIGGER results become funnel candidates and alerts.
if result.signal_level in {"WATCH", "TRIGGER"}:
est_vol = float(vol_map.get(inst, 0.0)) if vol_map else 0.0
# Prefer the side stored in the metrics, then the result attribute, then "NONE".
signal_side = str((result.metrics or {}).get("signal_side") or result.signal_side or "NONE")
# Only TRIGGER-level signals are eligible for a push.
push_allowed = result.signal_level == "TRIGGER"
funnel_candidates.append(
{
"symbol": sym,
"inst": inst,
"est_vol": est_vol,
"est_vol_rank": int(vol_rank_map.get(inst, 0)) if vol_rank_map else 0,
"est_vol_rank_total": int(vol_rank_total),
"signal_level": result.signal_level,
"signal_side": signal_side,
"btc_bias_5m": btc_bias_5m,
"push_allowed": push_allowed,
"btc_env_8h_15m": btc_env_8h_15m,
"intraday_metrics": dict(result.metrics),
}
)
# Dedupe key: symbol plus a chain string made of bar, signal level and side.
dedupe_h = float(self.settings.monitor.symbol_signal_dedupe_hours)
chain_suffix = signal_side if signal_side in {"LONG", "SHORT"} else "NONE"
surface_chain = f"GATE-USDT {bar} {result.signal_level} {chain_suffix}"
skip_surface_alert = dedupe_h > 0 and await self.storage.has_recent_alert(
sym, chain=surface_chain, within_hours=dedupe_h
)
if not skip_surface_alert:
# Defaults used for WATCH signals, where the TRIGGER-only filters below are not evaluated.
symbol_4h_status = "横盘"
symbol_side_ok = False
push_time_ok = True
vol_rank_ok = True
# A rank limit of 0 (or a missing setting) disables the volume-rank filter.
rank_max = int(getattr(self.settings.monitor, "wecom_push_max_volume_rank", 0) or 0)
if result.signal_level == "TRIGGER":
# 4H trend alignment; on failure the status keeps its "横盘" default, which fails the alignment check.
try:
sym_4h_rows = await self.gate.get_candles(inst, "4H", limit=120)
symbol_4h_status = self._status_by_ema55(sym_4h_rows)
except Exception as exc: # noqa: BLE001
await self.storage.add_log("WARN", f"{sym} 4h_status_failed: {exc}")
symbol_side_ok = (signal_side == "LONG" and symbol_4h_status == "多头") or (
signal_side == "SHORT" and symbol_4h_status == "空头"
)
push_time_ok = self._within_push_window_utc8(push_window_enabled)
if rank_max > 0:
if vol_rank_map:
rnk = int(vol_rank_map.get(inst, 999))
vol_rank_ok = 1 <= rnk <= rank_max
else:
# A rank limit is configured but no ranking exists, so the filter fails.
vol_rank_ok = False
btc_env_ok = self._push_matches_btc_env(btc_env_8h_15m, signal_side)
# A push requires every filter to pass.
strict_push_ok = bool(
push_allowed and symbol_side_ok and push_time_ok and vol_rank_ok and btc_env_ok
)
# NOTE(review): push_reason keeps the default "trigger_pushed" for WATCH signals and for
# TRIGGERs that pass the filters even when no push is sent — confirm that consumers of
# "push_block_reason" expect this.
push_reason = "trigger_pushed"
if result.signal_level == "TRIGGER" and not strict_push_ok:
reasons: list[str] = []
if not btc_env_ok:
reasons.append("btc_env_8h_15m_direction_mismatch")
if not symbol_side_ok:
reasons.append("symbol_4h_not_aligned")
if not push_time_ok:
reasons.append("outside_push_time_window")
if not vol_rank_ok:
reasons.append("volume_rank_outside_top_n")
push_reason = ",".join(reasons) if reasons else "filtered_by_rules"
# Persist the alert together with the full filter breakdown.
await self.storage.add_alert(
symbol=sym,
venue=surface_chain,
trigger_types=result.trigger_types,
score=result.score,
details={
"metrics": result.metrics,
"instId": inst,
"signal_level": result.signal_level,
"signal_side": signal_side,
"btc_bias_5m": btc_bias_5m,
"push_allowed": push_allowed,
"btc_env_8h_15m": btc_env_8h_15m,
"btc_env_ok": btc_env_ok,
"symbol_4h_status": symbol_4h_status,
"push_time_ok": push_time_ok,
"vol_rank_ok": vol_rank_ok,
"strict_push_ok": strict_push_ok,
"push_block_reason": push_reason,
},
)
if result.signal_level == "TRIGGER":
if strict_push_ok and self.settings.monitor.push_watch_trigger_wecom:
# Copy the rule metrics and add the context fields used by the notification.
push_metrics = dict(result.metrics)
push_metrics["signal_side"] = signal_side
push_metrics["btc_bias"] = btc_bias_5m
push_metrics["btc_env_8h_15m"] = btc_env_8h_15m
push_metrics["symbol_4h_status"] = symbol_4h_status
push_metrics["est_quote_vol_24h_usdt"] = est_vol
push_metrics["est_quote_vol_rank"] = int(vol_rank_map.get(inst, 0))
push_metrics["est_quote_vol_rank_total"] = int(vol_rank_total)
push_metrics["stop_buffer_pct"] = stop_buffer_pct
try:
await self.notifier.send_breakout_alert(
symbol=sym,
bar=bar,
inst_id=inst,
trigger_types=result.trigger_types,
metrics=push_metrics,
)
except Exception as exc: # noqa: BLE001
await self.storage.add_log("ERROR", f"wecom_push_failed {sym}: {exc}")
else:
# The push counter only advances when the notification was sent without error.
self.state.pushed_alerts_count += 1
await self._maybe_forward_order_executor(sym, inst, push_metrics)
else:
await self.storage.add_log(
"INFO",
(
f"signal_blocked sym={sym} side={signal_side} btc_bias={btc_bias_5m} "
f"btc_env_8h_15m={btc_env_8h_15m} btc_env_ok={btc_env_ok} "
f"sym_4h={symbol_4h_status} symbol_side_ok={symbol_side_ok} "
f"push_time_ok={push_time_ok} vol_rank_ok={vol_rank_ok} "
f"rank={vol_rank_map.get(inst, 0)}/{rank_max}"
),
)
await self.storage.add_log(
"WARN",
(
f"signal={result.signal_level} side={signal_side} {sym} bar={bar} "
f"push_allowed={push_allowed} triggers={','.join(result.trigger_types)}"
),
)
else:
await self.storage.add_log(
"INFO",
f"signal_dedupe_skip sym={sym} chain={surface_chain} within_h={dedupe_h}",
)
# Short pause between steps; presumably throttles exchange requests — TODO confirm.
await asyncio.sleep(0.08)
# Describe the funnel state for this cycle; run_cycle decides whether a funnel task is started.
if not self.settings.gemma.enabled:
self.state.last_funnel = []
self.state.gemma_cycle_msg = "gemma_disabled"
elif not self.gemma_client:
self.state.last_funnel = []
self.state.gemma_cycle_msg = "gemma_client_none"
elif not funnel_candidates:
self.state.last_funnel = []
self.state.gemma_cycle_msg = "no_funnel_candidates"
else:
self.state.gemma_cycle_msg = "funnel_pending"
await self.storage.add_log(
"INFO",
f"cycle_scan_done monitored={len(watch_insts)} funnel_candidates={len(funnel_candidates)}",
)
return funnel_candidates
async def _run_gemma_funnel(self, candidates: list[dict]) -> None:
# Rank funnel candidates with the Gemma client on daily candles, store a
# FUNNEL-GEMMA alert for each processed candidate, push those that pass the
# thresholds, and publish the ranked results on self.state.
# `candidates` is sorted in place by est_vol (descending); at most
# max(1, cfg.max_funnel_per_cycle) of them are processed.
# NOTE(review): indentation was lost in this copy of the file, so the nesting
# of some statements (for example the raw-snippet block and the final log
# call) cannot be confirmed from here.
assert self.gemma_client is not None
cfg = self.settings.gemma
candidates.sort(key=lambda x: float(x.get("est_vol") or 0.0), reverse=True)
take = candidates[: max(1, cfg.max_funnel_per_cycle)]
out: list[dict] = []
dedupe_h = float(self.settings.monitor.symbol_signal_dedupe_hours)
for i, c in enumerate(take):
sym = str(c["symbol"])
inst = str(c["inst"])
est = float(c.get("est_vol") or 0.0)
# Skip symbols that already have a FUNNEL-GEMMA alert within the dedupe window.
if dedupe_h > 0 and await self.storage.has_recent_alert(
sym, chain="FUNNEL-GEMMA", within_hours=dedupe_h
):
await self.storage.add_log("INFO", f"funnel_dedupe_skip sym={sym} within_h={dedupe_h}")
continue
try:
rows_1d = await self.gate.get_candles(inst, "1D", limit=80)
except Exception as exc: # noqa: BLE001
await self.storage.add_log("WARN", f"funnel {sym} 1d_failed: {exc}")
continue
# Programmatic daily features and sub-scores are merged and serialized as JSON for rank_funnel.
prog = build_daily_programmatic(rows_1d, est)
subs = programmatic_scores(prog)
prog_text = json.dumps({**prog, **subs}, ensure_ascii=False)
ohlc_block = daily_ohlc_text_block(rows_1d)
# A chart image is attached only for the first vision_top_n candidates when send_chart_image is set.
img_b64: str | None = None
if cfg.send_chart_image and i < max(0, cfg.vision_top_n):
img_b64 = daily_candles_png_base64(rows_1d, sym)
try:
gemma_out = await self.gemma_client.rank_funnel(sym, prog_text, ohlc_block, img_b64)
except Exception as exc: # noqa: BLE001
await self.storage.add_log("ERROR", f"gemma_ollama_failed {sym}: {exc}")
# Fallback result (priority 1) so the candidate is still scored and stored after a model failure.
gemma_out = {
"daily_structure": "weak",
"volume_view": "low",
"upside_space": "low",
"mid_resistance": "high",
"priority": 1,
"one_liner": f"Ollama 调用失败: {exc}",
"error": str(exc),
}
# NOTE(review): this float() is not guarded — a non-numeric "priority" in the model output
# would raise here and end the funnel run for the remaining candidates; confirm that
# rank_funnel always returns a number.
pri = float(gemma_out.get("priority", 1))
comp = composite_score(pri, subs)
signal_side = str(c.get("signal_side") or "NONE")
btc_bias_5m = str(c.get("btc_bias_5m") or "NEUTRAL")
btc_env_8h_15m = str(c.get("btc_env_8h_15m") or "横盘")
btc_env_ok = self._push_matches_btc_env(btc_env_8h_15m, signal_side)
# The threshold passes when either the model priority or the composite score reaches its minimum.
threshold_ok = pri >= cfg.gemma_push_priority_min or comp >= cfg.composite_push_min
# A rank limit of 0 (or a missing setting) disables the volume-rank filter.
rank_max_f = int(getattr(self.settings.monitor, "wecom_push_max_volume_rank", 0) or 0)
vol_rank_ok_f = True
if rank_max_f > 0:
rnk_f = int(c.get("est_vol_rank") or 999)
vol_rank_ok_f = 1 <= rnk_f <= rank_max_f
should_push = btc_env_ok and threshold_ok and vol_rank_ok_f
# Strip "raw" and "error" from the model output before it is stored or sent.
gemma_clean = {k: v for k, v in gemma_out.items() if k not in {"raw", "error"}}
details: dict = {
"source": "gemma_funnel",
"underlying_signal": c.get("signal_level"),
"signal_side": signal_side,
"btc_bias_5m": btc_bias_5m,
"btc_env_8h_15m": btc_env_8h_15m,
"gemma": gemma_clean,
"programmatic": prog,
"programmatic_subscores": subs,
"composite_score": comp,
"priority_push": should_push,
"priority_threshold_ok": threshold_ok,
"btc_env_ok": btc_env_ok,
"volume_rank_ok": vol_rank_ok_f,
"instId": inst,
"image_sent": bool(img_b64),
"intraday_signal_metrics": c.get("intraday_metrics"),
}
# Error text and raw model output are stored truncated (500 and 800 characters).
if gemma_out.get("error"):
details["gemma_error"] = str(gemma_out.get("error"))[:500]
raw_snip = gemma_out.get("raw")
if isinstance(raw_snip, str) and raw_snip:
details["gemma_raw_snip"] = raw_snip[:800]
await self.storage.add_alert(
symbol=sym,
venue="FUNNEL-GEMMA",
trigger_types=["漏斗", f"P{int(pri)}", str(gemma_out.get("daily_structure", "?"))],
score=comp,
details=details,
)
if should_push:
try:
await self.notifier.send_funnel_priority(
symbol=sym,
inst_id=inst,
composite_score=comp,
gemma=gemma_clean,
programmatic=prog,
)
self.state.pushed_alerts_count += 1
await self.storage.add_log(
"WARN",
f"funnel_priority_push {sym} composite={comp} priority={pri}",
)
except Exception as exc: # noqa: BLE001
await self.storage.add_log("ERROR", f"funnel_wecom_push_failed {sym}: {exc}")
out.append(
{
"symbol": sym,
"composite_score": comp,
"gemma_priority": pri,
"signal_side": signal_side,
"btc_bias_5m": btc_bias_5m,
"pushed": should_push,
"one_liner": gemma_clean.get("one_liner", ""),
}
)
await asyncio.sleep(0.35)
# Publish the results on the runtime state: sorted by composite score, capped at 40 entries.
out.sort(key=lambda x: float(x.get("composite_score") or 0.0), reverse=True)
msg = f"funnel_ranked={len(out)}"
async with self._lock:
self.state.last_funnel = out[:40]
self.state.last_funnel_at = datetime.now(timezone.utc).isoformat()
self.state.gemma_cycle_msg = msg
await self.storage.add_log("INFO", msg)
async def _load_intraday_params(self) -> IntradayRuleParams:
    """Read the intraday rule parameters from the KV store.

    Missing or unparsable values fall back to their defaults, and every value
    is clamped to its lower bound before the parameter object is built.
    """

    async def _read(key: str, fallback: float) -> float:
        # One KV read per parameter, in the order the parameters are listed below.
        return _as_float(await self.storage.get_kv(key), fallback)

    hours = await _read("intraday_range_hours", 24.0)
    max_pct = await _read("intraday_range_max_pct", 1.5)
    spike_mult = await _read("intraday_volume_spike_mult", 1.6)
    lookback = int(await _read("intraday_volume_lookback_bars", 20))
    breakout_buffer = await _read("intraday_breakout_buffer_pct", 0.05)

    return IntradayRuleParams(
        range_hours=max(1.0, hours),
        range_max_pct=max(0.1, max_pct),
        volume_spike_mult=max(1.0, spike_mult),
        volume_lookback_bars=max(5, lookback),
        breakout_buffer_pct=max(0.0, breakout_buffer),
    )
def _as_float(raw: str | None, default: float) -> float:
try:
return float(raw) if raw is not None else default
except (TypeError, ValueError):
return default
def _as_bool(raw: str | None, default: bool) -> bool:
if raw is None:
return default
return str(raw).strip().lower() in {"1", "true", "yes", "y", "on"}