"""关键位 5m 硬门控(逻辑自 crypto_monitor_gate _key_hard_checks,使用 Gate K 线)。""" from __future__ import annotations from statistics import mean from .candle_rows import field_float def _row_vol(item: list[str]) -> float: v = field_float(item, 5) return float(v) if v is not None else 0.0 def key_hard_checks_from_rows( rows: list[list[str]], *, direction: str, upper: float, lower: float, volume_ma_bars: int = 20, volume_ratio_min: float = 1.3, breakout_amp_min_pct: float = 0.03, breakout_amp_max_pct: float = 0.5, volume_rank: int | None = None, volume_rank_total: int = 0, volume_rank_max: int = 30, ) -> dict: """ rows:Gate K 线行(时间正序),最后一根应为最近一根已闭合 5m。 突破 K / 确认 K 固定为 rows[-2] / rows[-1](与 crypto_monitor_gate 一致)。 """ out: dict = {"ok": False} vol_lb = max(5, int(volume_ma_bars)) min_need = vol_lb + 3 if len(rows) < min_need: out["reason"] = f"insufficient_candles have={len(rows)} need>={min_need}" return out br_row = rows[-2] cf_row = rows[-1] open_b = field_float(br_row, 1) breakout_high = field_float(br_row, 2) breakout_low = field_float(br_row, 3) breakout_close = field_float(br_row, 4) confirm_close = field_float(cf_row, 4) if None in (open_b, breakout_high, breakout_low, breakout_close, confirm_close): out["reason"] = "invalid_breakout_or_confirm_ohlc" return out open_b = float(open_b) breakout_high = float(breakout_high) breakout_low = float(breakout_low) breakout_close = float(breakout_close) confirm_close = float(confirm_close) hist = rows[-vol_lb - 2 : -2] prev_vol = [_row_vol(x) for x in hist if _row_vol(x) > 0] avg20 = mean(prev_vol) if prev_vol else 0.0 vol_break = _row_vol(br_row) vol_ok = vol_break > avg20 * volume_ratio_min if avg20 > 0 else False direction = (direction or "long").strip().lower() edge = float(upper) if direction == "long" else float(lower) edge_ref = edge if edge > 0 else 1.0 if direction == "long": breach_pct = (breakout_close - edge) / edge_ref * 100.0 if breakout_close > edge else 0.0 breakout_ok = breakout_close > float(upper) confirm_ok = confirm_close > edge else: breach_pct = (edge - breakout_close) / edge_ref * 100.0 if breakout_close < edge else 0.0 breakout_ok = breakout_close < float(lower) confirm_ok = confirm_close < edge body_pct = abs(breakout_close - open_b) / open_b * 100.0 if open_b > 0 else 0.0 amp_ok = ( breakout_ok and breach_pct > breakout_amp_min_pct and breach_pct < breakout_amp_max_pct ) confirm_ok = confirm_ok and breakout_ok rank_ok = (volume_rank is not None) and (int(volume_rank) <= volume_rank_max) try: seg = rows[-48:] if len(rows) >= 48 else rows hh = max(float(x[2]) for x in seg if field_float(x, 2) is not None) ll = min(float(x[3]) for x in seg if field_float(x, 3) is not None) swing4h_pct = ((hh - ll) / ll * 100.0) if ll > 0 else 0.0 except Exception: swing4h_pct = 0.0 out.update( { "ok": all([vol_ok, amp_ok, breakout_ok, confirm_ok, rank_ok]), "vol_ok": vol_ok, "avg20": avg20, "vol_break": vol_break, "amp_ok": amp_ok, "amp_pct": breach_pct, "breach_pct": breach_pct, "body_pct": body_pct, "breakout_ok": breakout_ok, "breakout_close": breakout_close, "confirm_ok": confirm_ok, "confirm_close": confirm_close, "edge_price": edge, "rank": volume_rank, "rank_total": volume_rank_total, "rank_ok": rank_ok, "breakout_high": breakout_high, "breakout_low": breakout_low, "breakout_open": open_b, "direction": direction, "swing4h_pct": swing4h_pct, "amp_min_pct": breakout_amp_min_pct, "amp_max_pct": breakout_amp_max_pct, } ) return out def key_hard_lines_from_checks(checks: dict, *, volume_ratio_min: float) -> list[str]: breach = float(checks.get("breach_pct") if checks.get("breach_pct") is not None else checks.get("amp_pct") or 0) body = float(checks.get("body_pct") or 0) amp_min = float(checks.get("amp_min_pct") or 0.03) amp_max = float(checks.get("amp_max_pct") or 0.5) br_hi = checks.get("breakout_high") br_lo = checks.get("breakout_low") return [ f"量能:{'通过' if checks.get('vol_ok') else '不通过'}(突破K量 {round(float(checks.get('vol_break') or 0), 4)} / 前20均量 {round(float(checks.get('avg20') or 0), 4)},阈值{volume_ratio_min:g}x)", f"突破价位:{'通过' if checks.get('breakout_ok') else '不通过'}(突破K收盘 {round(float(checks.get('breakout_close') or 0), 8)},关键位 {checks.get('edge_price')})", ( f"突破越过关键位:{'通过' if checks.get('amp_ok') else '不通过'}" f"(越过 {round(breach, 4)}%,K线实体 {round(body, 4)}%,要求越过 {amp_min:g}%~{amp_max:g}%)" ), f"第二根确认:{'通过' if checks.get('confirm_ok') else '不通过'}(确认收盘 {checks.get('confirm_close')},关键位 {checks.get('edge_price')})", f"日成交量排名:{'通过' if checks.get('rank_ok') else '不通过'}({checks.get('rank')}/{checks.get('rank_total')},要求前30)", f"突破K极值:高 {br_hi}|低 {br_lo}(空→高点+外扩%|多→低点−外扩%)", ]