# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""Key-level monitoring.

Bar-close triggers (5-minute bars by default), WeChat alerts for
support/resistance zones, and automatic orders for box / convergence
breakouts.
"""
from __future__ import annotations

import logging
from datetime import datetime, timedelta
from typing import Any, Callable, Optional
from zoneinfo import ZoneInfo

from contract_specs import get_contract_spec
from kline_chart import fetch_market_klines

logger = logging.getLogger(__name__)
TZ = ZoneInfo("Asia/Shanghai")

# Monitor type labels as stored in the ``monitor_type`` column.
TYPE_BOX = "箱体突破"
TYPE_CONV = "收敛突破"
TYPE_ZONE = "关键支阻区"
AUTO_TYPES = (TYPE_BOX, TYPE_CONV)
ZONE_TYPES = (TYPE_ZONE, "关键阻力位", "关键支撑位")

# A zone alert is pushed at most ALERT_MAX_PUSH times, with at least
# ALERT_INTERVAL_SEC seconds between consecutive pushes.
ALERT_MAX_PUSH = 3
ALERT_INTERVAL_SEC = 300
# Number of ticks the stop loss is placed beyond the breakout bar's extreme.
SL_TICK_BUFFER = 2
DEFAULT_BAR_PERIOD = "5m"
PERIOD_MINUTES_MAP = {
    "1m": 1,
    "2m": 2,
    "3m": 3,
    "5m": 5,
    "15m": 15,
    "30m": 30,
    "1h": 60,
    "2h": 120,
    "4h": 240,
    "d": 1440,
    "1d": 1440,
}


def key_monitor_periods() -> list[dict[str, str]]:
    """Return the K-line periods selectable as a key-monitor trigger.

    The result is ``kline_chart.MARKET_PERIODS`` filtered down to the
    entries whose ``"key"`` is one of the allowed trigger periods.
    """
    from kline_chart import MARKET_PERIODS

    allowed = frozenset({"5m", "15m", "30m", "1h", "2h", "4h", "d"})
    return [p for p in MARKET_PERIODS if p["key"] in allowed]


def normalize_bar_period(raw: str) -> str:
    """Return ``raw`` stripped if it is an allowed period key, else the default."""
    valid = {p["key"] for p in key_monitor_periods()}
    k = (raw or DEFAULT_BAR_PERIOD).strip()
    return k if k in valid else DEFAULT_BAR_PERIOD


def bar_period_label(key: str) -> str:
    """Return the display label of a period key (the key itself if not found)."""
    k = normalize_bar_period(key)
    for p in key_monitor_periods():
        if p["key"] == k:
            return p["label"]
    return k


def bar_period_minutes(period: str) -> int:
    """Return the length of a period in minutes (5 when the key is unmapped)."""
    return PERIOD_MINUTES_MAP.get(normalize_bar_period(period), 5)


def normalize_monitor_type(raw: str) -> str:
    """Strip a monitor type and fold the two legacy zone labels into TYPE_ZONE."""
    t = (raw or "").strip()
    if t in ("关键阻力位", "关键支撑位"):
        return TYPE_ZONE
    return t


def is_auto_trade_type(typ: str) -> bool:
    """Return True for monitor types that place orders automatically."""
    return normalize_monitor_type(typ) in AUTO_TYPES


def is_zone_type(typ: str) -> bool:
    """Return True for support/resistance zone monitors (alert only)."""
    return normalize_monitor_type(typ) == TYPE_ZONE


def resolve_order_direction(break_side: str, trade_mode: str) -> str:
    """Map breakout side plus trade mode to an order direction.

    In the default trend-following mode an ``"upper"`` break yields
    ``"long"`` and anything else yields ``"short"``; the reversal mode
    (``"反转"``) inverts that mapping.
    """
    side = (break_side or "").strip().lower()
    mode = (trade_mode or "顺势").strip()
    if mode == "反转":
        return "short" if side == "upper" else "long"
    return "long" if side == "upper" else "short"


def break_direction_label(break_side: str) -> tuple[str, str]:
    """Return ``(display label, direction)`` for a breakout side.

    Any value other than ``"upper"`` is treated as a downward break.
    """
    if break_side == "upper":
        return "向上突破上沿", "long"
    return "向下突破下沿", "short"


def calc_breakout_sl_tp(
    *,
    sym: str,
    direction: str,
    entry: float,
    bar: dict,
    risk_reward: float,
) -> tuple[float, float]:
    """Compute ``(stop_loss, take_profit)`` for a breakout entry.

    The stop is placed ``SL_TICK_BUFFER`` ticks beyond the breakout bar's
    low (long) or high (short). The risk distance is floored at one tick
    and the target is ``risk * risk_reward`` away from ``entry``. The tick
    size comes from the contract spec and defaults to 1.0 when missing.
    """
    tick = float(get_contract_spec(sym).get("tick_size") or 1.0)
    bar_high = float(bar.get("high") or entry)
    bar_low = float(bar.get("low") or entry)
    if direction == "long":
        sl = bar_low - SL_TICK_BUFFER * tick
        risk = max(entry - sl, tick)
        tp = entry + risk * risk_reward
    else:
        sl = bar_high + SL_TICK_BUFFER * tick
        risk = max(sl - entry, tick)
        tp = entry - risk * risk_reward
    return sl, tp


def _parse_bar_time(raw: str) -> Optional[datetime]:
    """Parse a bar timestamp into an aware datetime in ``TZ``.

    Accepts ``YYYY-MM-DD HH:MM[:SS]`` with either a space or ``T``
    separator; returns None for empty or unparseable input.
    """
    s = (raw or "").strip().replace("T", " ")
    if not s:
        return None
    for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
        try:
            return datetime.strptime(s[:19], fmt).replace(tzinfo=TZ)
        except ValueError:
            continue
    return None


def last_closed_bar(
    bars: list[dict],
    period_minutes: int = 5,
    now: Optional[datetime] = None,
) -> Optional[dict]:
    """Return the most recent bar that has already closed, or None.

    Bars are scanned from the end of the list; a bar counts as closed
    once its ``"time"`` plus ``period_minutes`` is not later than ``now``
    (current time in ``TZ`` when omitted). Bars without a parseable time
    are skipped.
    NOTE(review): assumes ``bars`` is ordered oldest to newest and that
    ``"time"`` is the bar's start time; confirm against the K-line source.
    """
    dnow = now or datetime.now(TZ)
    mins = max(1, int(period_minutes or 5))
    for bar in reversed(bars or []):
        dt = _parse_bar_time(str(bar.get("time") or ""))
        if not dt:
            continue
        bar_end = dt + timedelta(minutes=mins)
        if dnow >= bar_end:
            return bar
    return None


def detect_break_side(close: float, upper: float, lower: float) -> Optional[str]:
    """Return ``"upper"`` / ``"lower"`` if ``close`` is strictly outside the range."""
    if close > upper:
        return "upper"
    if close < lower:
        return "lower"
    return None


def fetch_closed_bar(
    sym: str,
    period: str,
    *,
    db_path: str,
    trading_mode: str,
) -> Optional[dict]:
    """Fetch K-lines for ``sym`` and return the latest closed bar.

    Best effort: any exception raised while fetching or selecting the bar
    is logged at debug level and turned into a None result.
    """
    p = normalize_bar_period(period)
    try:
        data = fetch_market_klines(
            sym,
            p,
            db_path=db_path,
            trading_mode=trading_mode,
            prefer_ctp=True,
        )
        bars = data.get("bars") or []
        return last_closed_bar(bars, bar_period_minutes(p))
    except Exception as exc:
        logger.debug("key monitor kline %s %s: %s", sym, p, exc)
        return None


def _now_iso() -> str:
    """Return the current time in ``TZ`` as ``YYYY-MM-DD HH:MM:SS``."""
    return datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S")


def archive_monitor(conn, pid: int) -> None:
    """Mark monitor ``pid`` as archived and stamp ``archived_at``.

    Only issues the UPDATE; no commit is performed here.
    """
    conn.execute(
        "UPDATE key_monitors SET status='archived', archived_at=? WHERE id=?",
        (_now_iso(), pid),
    )


def format_zone_alert(
    row: dict,
    *,
    break_side: str,
    close_price: float,
    bar_time: str,
    push_index: int,
    max_push: int = ALERT_MAX_PUSH,
) -> str:
    """Build the WeChat text for a support/resistance zone breakout alert.

    ``push_index`` / ``max_push`` are rendered as the "n/total" counter in
    the title; the broken boundary is ``upper`` or ``lower`` from ``row``
    depending on ``break_side``.
    """
    name = row.get("symbol_name") or row.get("symbol") or ""
    upper = float(row.get("upper") or 0)
    lower = float(row.get("lower") or 0)
    break_label, alert_dir = break_direction_label(break_side)
    dir_cn = "多头(long)" if alert_dir == "long" else "空头(short)"
    boundary = upper if break_side == "upper" else lower
    lines = [
        f"📌 {name} 关键位突破提醒({push_index}/{max_push})",
        "",
        "🧾 突破概要",
        "📌 类型:关键支阻区",
        f"⏱ 触发时间:{bar_time}",
        f"📊 上沿:{upper:g}|下沿:{lower:g}",
        f"💹 触发收盘:{close_price:g}",
        f"🎯 {break_label}({dir_cn})",
        f"📍 突破价位:{boundary:g}",
        "",
        "📎 说明",
        f"· 人工盯盘,共推送 {max_push} 次(间隔约 {ALERT_INTERVAL_SEC // 60} 分钟)",
        "· 推送完毕后本条监控自动结案",
        "· 不参与自动开仓",
    ]
    return "\n".join(lines)


def format_auto_breakout_msg(
    row: dict,
    *,
    break_side: str,
    direction: str,
    entry: float,
    sl: float,
    tp: float,
    lots: int,
    bar_time: str,
    ok: bool,
    detail: str = "",
) -> str:
    """Build the WeChat text reporting an automatic breakout order.

    ``ok`` selects the success/failure icon; ``detail`` is appended as a
    final line when non-empty. A trailing-breakeven note is added when the
    row's ``trailing_be`` flag is set.
    """
    name = row.get("symbol_name") or row.get("symbol") or ""
    typ = normalize_monitor_type(row.get("monitor_type") or "")
    trade_mode = row.get("trade_mode") or "顺势"
    break_label, _ = break_direction_label(break_side)
    dir_cn = "做多" if direction == "long" else "做空"
    rr = float(row.get("risk_reward") or 2)
    period_label = bar_period_label(row.get("bar_period") or DEFAULT_BAR_PERIOD)
    lines = [
        f"{'✅' if ok else '❌'} {name} {typ}自动单",
        f"⏱ {period_label} 收盘:{bar_time}",
        f"🎯 {break_label} · {trade_mode} · {dir_cn}",
        f"💹 入场:{entry:g} 止损:{sl:g} 止盈:{tp:g}(盈亏比 {rr:g})",
        f"📦 手数:{lots}",
    ]
    if int(row.get("trailing_be") or 0):
        lines.append("🛡 已开启移动保本(达目标盈亏比自动止盈)")
    if detail:
        lines.append(detail)
    return "\n".join(lines)


def _should_send_followup_push(row: dict, now: datetime) -> bool:
    """Decide whether a zone monitor is due for its next repeat alert.

    Returns False before the first push and once ``ALERT_MAX_PUSH`` is
    reached. Otherwise returns True when the last push time is missing or
    unparseable, or when at least ``ALERT_INTERVAL_SEC`` seconds have
    elapsed since it.
    """
    count = int(row.get("alert_push_count") or 0)
    if count <= 0 or count >= ALERT_MAX_PUSH:
        return False
    last_raw = (row.get("alert_last_push_at") or "").strip()
    if not last_raw:
        return True
    try:
        # Stored timestamps are interpreted as wall-clock time in TZ.
        last = datetime.fromisoformat(last_raw.replace("Z", "")).replace(tzinfo=TZ)
    except ValueError:
        return True
    return (now - last).total_seconds() >= ALERT_INTERVAL_SEC


def _record_zone_push(conn, pid: int, *, break_side: str, bar_time: str, now_iso: str) -> int:
    """Increment the push counter of monitor ``pid`` and return the new count.

    Also stores the push time, the breakout side and bar time, and sets the
    ``upper_triggered`` / ``lower_triggered`` flags to match ``break_side``.
    NOTE(review): the fetched row is indexed by column name, so ``conn`` is
    presumably configured with a mapping-style row factory; confirm.
    """
    row = conn.execute(
        "SELECT alert_push_count FROM key_monitors WHERE id=?",
        (pid,),
    ).fetchone()
    count = int(row["alert_push_count"] or 0) + 1
    conn.execute(
        "UPDATE key_monitors SET alert_push_count=?, alert_last_push_at=?, "
        "alert_break_side=?, breakout_bar_time=?, upper_triggered=?, "
        "lower_triggered=? WHERE id=?",
        (
            count,
            now_iso,
            break_side,
            bar_time,
            1 if break_side == "upper" else 0,
            1 if break_side == "lower" else 0,
            pid,
        ),
    )
    return count


def _handle_zone_alert(
    conn,
    row: dict,
    *,
    break_side: str,
    bar: dict,
    send_wechat: Callable[[str], None],
) -> None:
    """Record and send a zone breakout alert triggered by ``bar``.

    Does nothing when this bar already triggered a push for the row.
    Otherwise records the push, stores the trigger bar and close price,
    sends the alert, and archives the monitor once ``ALERT_MAX_PUSH``
    pushes have been made.
    """
    pid = int(row["id"])
    now_iso = _now_iso()
    bar_time = str(bar.get("time") or "")[:19]
    close_price = float(bar.get("close") or 0)
    bar_key = bar_time
    last_bar = (row.get("last_trigger_bar") or "").strip()
    if last_bar == bar_key and int(row.get("alert_push_count") or 0) > 0:
        return
    push_n = _record_zone_push(conn, pid, break_side=break_side, bar_time=bar_time, now_iso=now_iso)
    conn.execute(
        "UPDATE key_monitors SET last_trigger_bar=?, alert_close_price=? WHERE id=?",
        (bar_key, close_price, pid),
    )
    send_wechat(format_zone_alert(
        row,
        break_side=break_side,
        close_price=close_price,
        bar_time=bar_time,
        push_index=push_n,
    ))
    if push_n >= ALERT_MAX_PUSH:
        archive_monitor(conn, pid)


def _push_zone_followup(
    conn,
    row: dict,
    *,
    upper: float,
    lower: float,
    send_wechat: Callable[[str], None],
) -> None:
    """Send one repeat alert for a zone monitor that has already triggered.

    The alert reuses the breakout side, bar time and close price stored on
    ``row``. ``upper`` / ``lower`` are the caller's validated boundaries,
    used as the displayed price when no usable close price was stored.
    """
    pid = int(row["id"])
    break_side = (row.get("alert_break_side") or "upper").strip()
    bar_time = (row.get("breakout_bar_time") or row.get("last_trigger_bar") or "")[:19]
    try:
        close_price = float(row.get("alert_close_price") or 0)
    except (TypeError, ValueError):
        close_price = 0.0
    if close_price <= 0:
        # Fall back to the broken boundary. The validated floats are used
        # rather than re-reading the row, which could yield None.
        close_price = upper if break_side == "upper" else lower
    push_n = _record_zone_push(
        conn,
        pid,
        break_side=break_side,
        bar_time=bar_time,
        now_iso=_now_iso(),
    )
    send_wechat(format_zone_alert(
        row,
        break_side=break_side,
        close_price=close_price,
        bar_time=bar_time,
        push_index=push_n,
    ))
    if push_n >= ALERT_MAX_PUSH:
        archive_monitor(conn, pid)


def run_key_monitor_check(
    conn,
    *,
    db_path: str,
    get_trading_mode_fn: Callable[[], str],
    send_wechat: Callable[[str], None],
    execute_breakout_fn: Callable[[Any, dict, dict, str], tuple[bool, str]] | None = None,
) -> None:
    """Scan active key-level monitors and act on bar-close breakouts.

    Each monitor is evaluated on the last closed bar of its own
    ``bar_period`` (``DEFAULT_BAR_PERIOD`` when unset).

    * Zone monitors that have already pushed at least once are driven only
      by the repeat-alert timer until ``ALERT_MAX_PUSH`` is reached, after
      which they are archived.
    * Otherwise a breakout is detected when the bar's close is outside
      ``[lower, upper]``; a bar that already triggered the row is skipped.
    * Zone monitors send a WeChat alert; auto-trade monitors call
      ``execute_breakout_fn(conn, row, bar, break_side)`` and are archived
      when it reports success.

    Rows with an empty symbol, unparseable bounds, or ``upper <= lower``
    are skipped. No commit is performed here.
    """
    rows = conn.execute(
        "SELECT * FROM key_monitors WHERE status='active' OR status IS NULL"
    ).fetchall()
    mode = get_trading_mode_fn()
    now = datetime.now(TZ)
    for r in rows:
        row = dict(r)
        pid = int(row["id"])
        sym = (row.get("symbol") or "").strip()
        typ = normalize_monitor_type(row.get("monitor_type") or "")
        if not sym:
            continue
        try:
            upper = float(row.get("upper") or 0)
            lower = float(row.get("lower") or 0)
        except (TypeError, ValueError):
            continue
        if upper <= lower:
            continue

        alert_count = int(row.get("alert_push_count") or 0)
        if is_zone_type(typ) and alert_count > 0:
            # Already triggered: only repeat pushes (or archival) from here on.
            if alert_count >= ALERT_MAX_PUSH:
                archive_monitor(conn, pid)
            elif _should_send_followup_push(row, now):
                _push_zone_followup(
                    conn,
                    row,
                    upper=upper,
                    lower=lower,
                    send_wechat=send_wechat,
                )
            continue

        bar_period = normalize_bar_period(row.get("bar_period") or DEFAULT_BAR_PERIOD)
        bar = fetch_closed_bar(sym, bar_period, db_path=db_path, trading_mode=mode)
        if not bar:
            continue
        bar_time = str(bar.get("time") or "")[:19]
        if not bar_time:
            continue
        if (row.get("last_trigger_bar") or "").strip() == bar_time:
            continue
        try:
            close_price = float(bar.get("close") or 0)
        except (TypeError, ValueError):
            continue
        break_side = detect_break_side(close_price, upper, lower)
        if not break_side:
            continue

        if is_zone_type(typ):
            _handle_zone_alert(conn, row, break_side=break_side, bar=bar, send_wechat=send_wechat)
            continue

        if is_auto_trade_type(typ):
            if not execute_breakout_fn:
                logger.warning("key monitor auto trade skipped: no executor")
                continue
            ok, _detail = execute_breakout_fn(conn, row, bar, break_side)
            # Remember the bar even on failure so it is not retried.
            conn.execute(
                "UPDATE key_monitors SET last_trigger_bar=?, breakout_bar_time=?, alert_break_side=? WHERE id=?",
                (bar_time, bar_time, break_side, pid),
            )
            if ok:
                archive_monitor(conn, pid)