""" 关键位监控:阻力/支撑双向提醒与箱体/收敛自动门控的共享逻辑。 """ from __future__ import annotations from datetime import datetime from typing import Any, Optional KEY_MONITOR_AUTO_TYPES = frozenset({"箱体突破", "收敛突破"}) KEY_MONITOR_RS_TYPES = frozenset({"关键阻力位", "关键支撑位"}) KEY_MONITOR_ALERT_ONLY_TYPES = KEY_MONITOR_RS_TYPES KEY_DIRECTION_WATCH = "watch" def calc_breakout_breach_pct(direction: str, close: float, upper: float, lower: float) -> float: """突破 K 收盘相对关键位的越过幅度(%)。未越过对应边界时返回 0。""" direction = (direction or "long").strip().lower() c = float(close) if direction == "long": u = float(upper) if u <= 0 or c <= u: return 0.0 return (c - u) / u * 100.0 lo = float(lower) if lo <= 0 or c >= lo: return 0.0 return (lo - c) / lo * 100.0 def auto_amp_ok( direction: str, close_b: float, upper: float, lower: float, min_pct: float, ) -> tuple[bool, float]: breach = calc_breakout_breach_pct(direction, close_b, upper, lower) return breach > float(min_pct), breach def auto_confirm_ok(direction: str, cfm_close: float, upper: float, lower: float) -> bool: """确认 K 收盘须在箱体外(不得回到 [lower, upper] 内)。""" direction = (direction or "long").strip().lower() c = float(cfm_close) if direction == "long": return c > float(upper) return c < float(lower) def detect_rs_box_break(close: float, upper: float, lower: float) -> Optional[dict[str, Any]]: """ 阻力/支撑人工盯盘:最近 5m 收盘突破上沿或下沿(严格 > / <)。 上沿优先:同一根 K 不可能同时满足两者。 """ u, lo, c = float(upper), float(lower), float(close) if c > u: return { "break_side": "upper", "direction": "long", "edge_price": u, "key_price": u, "break_label": "向上突破上沿", } if c < lo: return { "break_side": "lower", "direction": "short", "edge_price": lo, "key_price": lo, "break_label": "向下突破下沿", } return None def rs_break_from_direction(direction: str, upper: float, lower: float) -> Optional[dict[str, Any]]: """已触发后根据入库方向还原突破边(long=上沿,short=下沿)。""" d = (direction or "").strip().lower() if d == "long": return { "break_side": "upper", "direction": "long", "edge_price": float(upper), "key_price": float(upper), "break_label": "向上突破上沿", } if d == "short": return { "break_side": "lower", "direction": "short", "edge_price": float(lower), "key_price": float(lower), "break_label": "向下突破下沿", } return None def rs_break_infer_from_close(close: float, upper: float, lower: float) -> dict[str, Any]: """ 续发提醒时价格已回到箱体内:按收盘价相对箱体中线推断首次突破边, 保证第 2/3 次企业微信提醒仍能发出。 """ mid = (float(upper) + float(lower)) / 2.0 if float(close) >= mid: br = rs_break_from_direction("long", upper, lower) else: br = rs_break_from_direction("short", upper, lower) if br: return br return { "break_side": "upper", "direction": "long", "edge_price": float(upper), "key_price": float(upper), "break_label": "向上突破上沿", } def _parse_notify_datetime(raw: Optional[str]) -> Optional[datetime]: s = str(raw or "").strip() if not s: return None try: dt = datetime.fromisoformat(s.replace("Z", "+00:00")) if dt.tzinfo is not None: dt = dt.replace(tzinfo=None) return dt except Exception: pass for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): try: return datetime.strptime(s[:19], fmt) except Exception: continue return None def claim_rs_level_notify( conn: Any, monitor_id: int, notify_index: int, direction: str, notified_at: str, bar_ts: Optional[int], *, prior_count: Optional[int] = None, ) -> bool: """ 原子占位:仅在 notification_count 仍为 prior_count 时推进到 notify_index。 须在发送企业微信之前调用并 commit,避免 (2/3) 重复刷屏。 """ prior = int(prior_count if prior_count is not None else notify_index - 1) if prior < 0 or notify_index != prior + 1: return False bar_val: Optional[int] = None if bar_ts is not None: try: bar_val = int(bar_ts) except (TypeError, ValueError): bar_val = None cur = conn.execute( "UPDATE key_monitors SET notification_count=?, direction=?, last_notified_at=?, last_rs_bar_ts=? " "WHERE id=? AND COALESCE(notification_count,0)=?", (notify_index, direction, notified_at, bar_val, int(monitor_id), prior), ) return int(cur.rowcount or 0) > 0 def parse_last_rs_bar_ts(row: Any) -> Optional[int]: if row is None: return None try: keys = row.keys() if hasattr(row, "keys") else [] except Exception: keys = [] raw = row["last_rs_bar_ts"] if "last_rs_bar_ts" in keys else None if raw is None: return None try: return int(raw) except (TypeError, ValueError): return None def run_rs_level_alert_tick( row: Any, close: float, bar_ts: Optional[int], now_dt: datetime, *, default_max_notify: int, default_interval_min: int, ) -> Optional[dict[str, Any]]: """ 判定本轮回合是否应推送阻力/支撑提醒。 首条:仅在新闭合 K 越线时触发;发送前须 claim_rs_level_notify 占位防轮询/多进程重复。 """ up, lo = float(row["upper"]), float(row["lower"]) if up <= lo: return None count = int(row["notification_count"] or 0) max_n = max(1, int(row["max_notify"] or default_max_notify)) interval = max(1, int(row["notify_interval_min"] or default_interval_min)) if count >= max_n: return None bar_ts_i: Optional[int] = None if bar_ts is not None: try: bar_ts_i = int(bar_ts) except (TypeError, ValueError): bar_ts_i = None last_bar_i = parse_last_rs_bar_ts(row) if count == 0: br = detect_rs_box_break(close, up, lo) if not br: return None if bar_ts_i is not None and last_bar_i is not None and bar_ts_i == last_bar_i: return None return { "break_info": br, "notify_index": 1, "prior_count": 0, "notify_max": max_n, "interval_min": interval, "bar_ts": bar_ts_i, } if not notify_interval_elapsed(row["last_notified_at"], interval, now_dt): return None br = resolve_rs_break_for_alert(count, row["direction"], close, up, lo) if not br: return None return { "break_info": br, "notify_index": count + 1, "prior_count": count, "notify_max": max_n, "interval_min": interval, "bar_ts": bar_ts_i, } def resolve_rs_break_for_alert( notification_count: int, direction: Optional[str], close: float, upper: float, lower: float, ) -> Optional[dict[str, Any]]: """ 阻力/支撑提醒:首次用 5m 收盘越线判定;后续用已存方向,兼容 direction=watch。 """ count = int(notification_count or 0) up, lo, c = float(upper), float(lower), float(close) if count <= 0: return detect_rs_box_break(c, up, lo) br = rs_break_from_direction(direction, up, lo) if br: return br d = (direction or "").strip().lower() if d not in ("", KEY_DIRECTION_WATCH): return None br = detect_rs_box_break(c, up, lo) if br: return br return rs_break_infer_from_close(c, up, lo) def notify_interval_elapsed( last_notified_at: Optional[str], interval_min: int, now_dt: datetime, ) -> bool: if not last_notified_at: return False last_dt = _parse_notify_datetime(last_notified_at) if last_dt is None: return False return (now_dt - last_dt).total_seconds() >= max(1, int(interval_min)) * 60 def format_auto_amp_line(amp_ok: bool, amp_pct: float, min_pct: float) -> str: return ( f"突破越过幅度:{'通过' if amp_ok else '不通过'}" f"({round(float(amp_pct), 4)}%,要求 > {min_pct}%)" ) def format_auto_confirm_line(confirm_ok: bool, cfm_close, edge_price, direction: str) -> str: side = "箱外上方" if (direction or "").lower() == "long" else "箱外下方" return ( f"第二根确认:{'通过' if confirm_ok else '不通过'}" f"(确认收盘 {cfm_close},须收于{side},关键位 {edge_price})" ) def key_monitor_rule_template_context( *, kline_timeframe: str, key_breakout_amp_min_pct: float, key_volume_ma_bars: int, key_volume_ratio_min: float, key_auto_min_planned_rr: float, key_daily_volume_rank_max: int, key_confirm_breakout_bar: int, key_confirm_bar: int, key_alert_max_times: int, key_alert_interval_minutes: int, key_stop_outside_breakout_pct: float, key_trend_stop_outside_pct: float, false_breakout_validity_hours: int, trigger_entry_validity_hours: int | None = None, ) -> dict[str, Any]: """关键位监控页规则说明表格(Jinja key_rule_ctx)。""" from false_breakout_key_monitor_lib import ( FALSE_BREAKOUT_OFFSET_PCT, FALSE_BREAKOUT_RR, FALSE_BREAKOUT_SL_PCT, ) from trigger_entry_key_monitor_lib import TRIGGER_ENTRY_VALIDITY_HOURS te_hours = ( int(trigger_entry_validity_hours) if trigger_entry_validity_hours is not None else TRIGGER_ENTRY_VALIDITY_HOURS ) return { "tf": (kline_timeframe or "5m").strip(), "amp_min_pct": key_breakout_amp_min_pct, "vol_ma_bars": key_volume_ma_bars, "vol_ratio_min": key_volume_ratio_min, "min_rr": key_auto_min_planned_rr, "vol_rank_max": key_daily_volume_rank_max, "breakout_bar": key_confirm_breakout_bar, "confirm_bar": key_confirm_bar, "alert_max": key_alert_max_times, "alert_interval_min": key_alert_interval_minutes, "stop_outside_pct": key_stop_outside_breakout_pct, "trend_stop_outside_pct": key_trend_stop_outside_pct, "fb_offset_pct": FALSE_BREAKOUT_OFFSET_PCT, "fb_sl_pct": FALSE_BREAKOUT_SL_PCT, "fb_rr": FALSE_BREAKOUT_RR, "fb_valid_hours": false_breakout_validity_hours, "trigger_entry_validity_hours": te_hours, }