Files
qihuo/key_monitor_lib.py
T
dekun bfb1b95471 Improve key monitor form with bar period, box direction, and labeled fields.
Match order-monitor layout; persist bar_period and enforce upper-direction filter for box breakouts.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-29 07:24:36 +08:00

407 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""关键位监控:5 分钟收盘触发、支阻区微信提醒、箱体/收敛自动单。"""
from __future__ import annotations
import logging
from datetime import datetime, timedelta
from typing import Any, Callable, Optional
from zoneinfo import ZoneInfo
from contract_specs import get_contract_spec
from kline_chart import fetch_market_klines
# Module-level logger, plus the timezone attached to parsed bar times and used
# whenever this module asks for "now".
logger = logging.getLogger(__name__)
TZ = ZoneInfo("Asia/Shanghai")
# Monitor type names. These are runtime values compared against each row's
# monitor_type field, so they must stay exactly as written.
TYPE_BOX = "箱体突破"  # box breakout
TYPE_CONV = "收敛突破"  # convergence breakout
TYPE_ZONE = "关键支阻区"  # key support/resistance zone
# Types for which is_auto_trade_type() returns True.
AUTO_TYPES = (TYPE_BOX, TYPE_CONV)
# The zone type plus the two names normalize_monitor_type() folds into it
# ("key resistance level", "key support level").
ZONE_TYPES = (TYPE_ZONE, "关键阻力位", "关键支撑位")
# A zone alert is pushed at most this many times before the monitor is archived.
ALERT_MAX_PUSH = 3
# Minimum number of seconds between two follow-up pushes of the same zone alert.
ALERT_INTERVAL_SEC = 300
# Number of ticks the stop loss is placed beyond the breakout bar's low/high.
SL_TICK_BUFFER = 2
# Period key used when a monitor has no (or an unsupported) bar period.
DEFAULT_BAR_PERIOD = "5m"
# Period key -> bar length in minutes. The map is wider than the key set that
# key_monitor_periods() allows ("1m", "2m", "3m" and "1d" are not in that set).
PERIOD_MINUTES_MAP = {
    "1m": 1, "2m": 2, "3m": 3, "5m": 5, "15m": 15, "30m": 30,
    "1h": 60, "2h": 120, "4h": 240, "d": 1440, "1d": 1440,
}
def key_monitor_periods() -> list[dict[str, str]]:
    """Return the K-line periods selectable as a key-monitor trigger period.

    The result is the subset of ``kline_chart.MARKET_PERIODS`` whose ``"key"``
    is one of the supported period keys, in the order MARKET_PERIODS lists them.
    """
    # Function-scope import, kept exactly where the original had it.
    from kline_chart import MARKET_PERIODS

    supported_keys = frozenset({"5m", "15m", "30m", "1h", "2h", "4h", "d"})
    selected = []
    for period in MARKET_PERIODS:
        if period["key"] in supported_keys:
            selected.append(period)
    return selected
def normalize_bar_period(raw: str) -> str:
    """Coerce *raw* to a supported period key.

    An empty/None value, or a key that key_monitor_periods() does not offer,
    yields DEFAULT_BAR_PERIOD. Surrounding whitespace is stripped before the
    comparison.
    """
    known_keys = {entry["key"] for entry in key_monitor_periods()}
    candidate = (raw or DEFAULT_BAR_PERIOD).strip()
    if candidate in known_keys:
        return candidate
    return DEFAULT_BAR_PERIOD
def bar_period_label(key: str) -> str:
    """Return the display label of period *key* after normalizing it.

    Falls back to the normalized key itself when no period entry matches.
    """
    wanted = normalize_bar_period(key)
    matching_labels = (
        entry["label"] for entry in key_monitor_periods() if entry["key"] == wanted
    )
    # First match wins; the normalized key is the fallback.
    return next(matching_labels, wanted)
def bar_period_minutes(period: str) -> int:
    """Return the length in minutes of *period* (normalized first); 5 if unmapped."""
    normalized = normalize_bar_period(period)
    if normalized in PERIOD_MINUTES_MAP:
        return PERIOD_MINUTES_MAP[normalized]
    return 5
def normalize_monitor_type(raw: str) -> str:
    """Strip *raw* and fold the two single-sided zone names into TYPE_ZONE.

    Any other value (including the empty string for None/blank input) is
    returned stripped but otherwise unchanged.
    """
    cleaned = (raw or "").strip()
    zone_aliases = ("关键阻力位", "关键支撑位")
    return TYPE_ZONE if cleaned in zone_aliases else cleaned
def is_auto_trade_type(typ: str) -> bool:
    """Return True when *typ*, once normalized, is one of AUTO_TYPES."""
    canonical = normalize_monitor_type(typ)
    return canonical in AUTO_TYPES
def is_zone_type(typ: str) -> bool:
    """Return True when *typ*, once normalized, equals TYPE_ZONE."""
    canonical = normalize_monitor_type(typ)
    return canonical == TYPE_ZONE
def resolve_order_direction(break_side: str, trade_mode: str) -> str:
    """Map a breakout side plus trade mode to an order direction.

    Args:
        break_side: ``"upper"`` (case/whitespace-insensitive) for an upward
            break; every other value is treated as a downward break.
        trade_mode: ``"反转"`` (reversal) trades against the break; every other
            value, including empty/None, is treated as trend-following.

    Returns:
        ``"long"`` or ``"short"``.
    """
    broke_upward = (break_side or "").strip().lower() == "upper"
    is_reversal = (trade_mode or "顺势").strip() == "反转"
    # Trend-following goes long on an upward break; a reversal flips that choice.
    go_long = broke_upward != is_reversal
    return "long" if go_long else "short"
def break_direction_label(break_side: str) -> tuple[str, str]:
    """Return ``(label, direction)`` for a breakout side.

    Only the exact string ``"upper"`` selects the upward label and ``"long"``;
    any other value yields the downward label and ``"short"``.
    """
    broke_upward = break_side == "upper"
    label = "向上突破上沿" if broke_upward else "向下突破下沿"
    direction = "long" if broke_upward else "short"
    return label, direction
def calc_breakout_sl_tp(
    *,
    sym: str,
    direction: str,
    entry: float,
    bar: dict,
    risk_reward: float,
) -> tuple[float, float]:
    """Compute stop-loss and take-profit prices for a breakout entry.

    The stop sits SL_TICK_BUFFER ticks beyond the breakout bar's low (long) or
    high (short). The risk is the entry-to-stop distance, floored at one tick,
    and the target is ``risk * risk_reward`` away from the entry.

    Args:
        sym: Symbol passed to get_contract_spec() to look up ``tick_size``
            (1.0 is used when the spec has no truthy tick size).
        direction: ``"long"``; any other value is handled as short.
        entry: Entry price; also stands in for a missing/zero bar high or low.
        bar: Breakout bar; its ``"high"`` and ``"low"`` values are read.
        risk_reward: Reward-to-risk multiple applied to the risk distance.

    Returns:
        ``(stop_loss, take_profit)``.
    """
    tick_size = float(get_contract_spec(sym).get("tick_size") or 1.0)
    high = float(bar.get("high") or entry)
    low = float(bar.get("low") or entry)
    offset = SL_TICK_BUFFER * tick_size
    if direction == "long":
        stop = low - offset
        distance = max(entry - stop, tick_size)
        return stop, entry + distance * risk_reward
    stop = high + offset
    distance = max(stop - entry, tick_size)
    return stop, entry - distance * risk_reward
def _parse_bar_time(raw: str) -> Optional[datetime]:
    """Parse a bar timestamp string into a datetime carrying TZ.

    A ``T`` separator is treated as a space and only the first 19 characters
    are considered. Accepted layouts are ``YYYY-MM-DD HH:MM:SS`` and
    ``YYYY-MM-DD HH:MM``; anything else (including blank input) gives None.
    """
    text = (raw or "").strip().replace("T", " ")
    if not text:
        return None
    head = text[:19]
    for layout in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"):
        try:
            parsed = datetime.strptime(head, layout)
        except ValueError:
            continue
        # The parsed value is naive; TZ is attached without shifting the clock time.
        return parsed.replace(tzinfo=TZ)
    return None
def last_closed_bar(
    bars: list[dict],
    period_minutes: int = 5,
    now: Optional[datetime] = None,
) -> Optional[dict]:
    """Return the most recent bar whose period has fully elapsed.

    Bars are scanned from the end of the list. A bar counts as closed when its
    ``"time"`` plus the period length is not later than *now*. Bars whose time
    cannot be parsed are skipped.

    Args:
        bars: Bar dicts; each one's ``"time"`` value is read.
        period_minutes: Bar length in minutes (falsy -> 5, minimum 1).
        now: Reference time; defaults to the current time in TZ.

    Returns:
        The matching bar dict, or None when no bar qualifies.
    """
    reference = now or datetime.now(TZ)
    minutes = max(1, int(period_minutes or 5))
    for candidate in reversed(bars or []):
        opened = _parse_bar_time(str(candidate.get("time") or ""))
        if opened and reference >= opened + timedelta(minutes=minutes):
            return candidate
    return None
def detect_break_side(close: float, upper: float, lower: float) -> Optional[str]:
    """Classify a close price against a price band.

    Returns ``"upper"`` when *close* is strictly above *upper*, ``"lower"`` when
    it is strictly below *lower*, and None otherwise. The upper test is made
    first, so it takes precedence if both would hold.
    """
    side = None
    if close > upper:
        side = "upper"
    elif close < lower:
        side = "lower"
    return side
def fetch_closed_bar(
    sym: str,
    period: str,
    *,
    db_path: str,
    trading_mode: str,
) -> Optional[dict]:
    """Fetch K-lines for *sym* and return the latest closed bar, if any.

    The period is normalized first. Any exception raised while fetching or
    selecting the bar is logged at debug level and reported as None, so a
    failure for one symbol does not propagate to the caller.
    """
    period_key = normalize_bar_period(period)
    try:
        payload = fetch_market_klines(
            sym,
            period_key,
            db_path=db_path,
            trading_mode=trading_mode,
            prefer_ctp=True,
        )
        candidates = payload.get("bars") or []
        return last_closed_bar(candidates, bar_period_minutes(period_key))
    except Exception as exc:
        # Best-effort by design: report "no bar" instead of raising.
        logger.debug("key monitor kline %s %s: %s", sym, period_key, exc)
    return None
def _now_iso() -> str:
    """Return the current time in TZ formatted as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now(TZ)
    stamp = current.strftime("%Y-%m-%d %H:%M:%S")
    return stamp
def archive_monitor(conn, pid: int) -> None:
    """Mark monitor *pid* as archived and stamp ``archived_at`` with the current time.

    Only the UPDATE is executed on *conn*; no commit is issued here.
    """
    statement = "UPDATE key_monitors SET status='archived', archived_at=? WHERE id=?"
    params = (_now_iso(), pid)
    conn.execute(statement, params)
def format_zone_alert(
    row: dict,
    *,
    break_side: str,
    close_price: float,
    bar_time: str,
    push_index: int,
    max_push: int = ALERT_MAX_PUSH,
) -> str:
    """Build the WeChat text for one push of a support/resistance-zone alert.

    Args:
        row: Monitor row; ``symbol_name``/``symbol``, ``upper`` and ``lower``
            are read from it.
        break_side: ``"upper"`` for a break of the upper edge; any other value
            is rendered as a break of the lower edge.
        close_price: Close price of the bar that triggered the alert.
        bar_time: Timestamp text of the triggering bar.
        push_index: Ordinal of this push (shown as ``push_index/max_push``).
        max_push: Total number of pushes announced in the message.

    Returns:
        The message as a single newline-joined string.
    """
    name = row.get("symbol_name") or row.get("symbol") or ""
    upper = float(row.get("upper") or 0)
    lower = float(row.get("lower") or 0)
    break_label, alert_dir = break_direction_label(break_side)
    # Fix: the full-width parenthesis opened in these labels was never closed.
    dir_cn = "多头(long)" if alert_dir == "long" else "空头(short)"
    boundary = upper if break_side == "upper" else lower
    lines = [
        # Fix: close the parenthesis around the push counter.
        f"📌 {name} 关键位突破提醒({push_index}/{max_push})",
        "",
        "🧾 突破概要",
        "📌 类型:关键支阻区",
        f"⏱ 触发时间:{bar_time}",
        f"📊 上沿:{upper:g}|下沿:{lower:g}",
        f"💹 触发收盘:{close_price:g}",
        # Fix: separate the break label from the direction text, using the same
        # " · " separator as the auto-order message.
        f"🎯 {break_label} · {dir_cn}",
        f"📍 突破价位:{boundary:g}",
        "",
        "📎 说明",
        f"· 人工盯盘,共推送 {max_push} 次(间隔约 {ALERT_INTERVAL_SEC // 60} 分钟)",
        "· 推送完毕后本条监控自动结案",
        "· 不参与自动开仓",
    ]
    return "\n".join(lines)
def format_auto_breakout_msg(
    row: dict,
    *,
    break_side: str,
    direction: str,
    entry: float,
    sl: float,
    tp: float,
    lots: int,
    bar_time: str,
    ok: bool,
    detail: str = "",
) -> str:
    """Build the WeChat text that reports an automatic breakout order.

    Args:
        row: Monitor row; ``symbol_name``/``symbol``, ``monitor_type``,
            ``trade_mode``, ``risk_reward``, ``bar_period`` and ``trailing_be``
            are read from it.
        break_side: ``"upper"`` or another value (rendered as a lower break).
        direction: ``"long"``; any other value is rendered as short.
        entry: Entry price.
        sl: Stop-loss price.
        tp: Take-profit price.
        lots: Order size in lots.
        bar_time: Timestamp text of the bar whose close triggered the order.
        ok: Whether the order succeeded; selects the leading status marker.
        detail: Optional extra line appended at the end.

    Returns:
        The message as a single newline-joined string.
    """
    name = row.get("symbol_name") or row.get("symbol") or ""
    typ = normalize_monitor_type(row.get("monitor_type") or "")
    trade_mode = row.get("trade_mode") or "顺势"
    break_label, _ = break_direction_label(break_side)
    dir_cn = "做多" if direction == "long" else "做空"
    rr = float(row.get("risk_reward") or 2)
    period_label = bar_period_label(row.get("bar_period") or DEFAULT_BAR_PERIOD)
    # Fix: both branches used to be the empty string, so success and failure
    # looked identical and the headline began with a stray space.
    status_icon = "✅" if ok else "❌"
    lines = [
        f"{status_icon} {name} {typ}自动单",
        f"{period_label} 收盘:{bar_time}",
        f"🎯 {break_label} · {trade_mode} · {dir_cn}",
        # Fix: close the parenthesis around the risk/reward ratio.
        f"💹 入场:{entry:g} 止损:{sl:g} 止盈:{tp:g}(盈亏比 {rr:g})",
        f"📦 手数:{lots}",
    ]
    if int(row.get("trailing_be") or 0):
        lines.append("🛡 已开启移动保本(达目标盈亏比自动止盈)")
    if detail:
        lines.append(detail)
    return "\n".join(lines)
def _should_send_followup_push(row: dict, now: datetime) -> bool:
    """Decide whether the next follow-up push of a zone alert is due.

    False when no push has been made yet or the push limit is already reached.
    True when the last push time is missing or unparsable. Otherwise True once
    at least ALERT_INTERVAL_SEC seconds have passed since the last push.
    """
    pushed = int(row.get("alert_push_count") or 0)
    if not 0 < pushed < ALERT_MAX_PUSH:
        return False
    stamp = (row.get("alert_last_push_at") or "").strip()
    if not stamp:
        return True
    try:
        # Any "Z" is dropped and the value is then labelled with TZ (no conversion).
        previous = datetime.fromisoformat(stamp.replace("Z", "")).replace(tzinfo=TZ)
    except ValueError:
        return True
    elapsed = now - previous
    return elapsed.total_seconds() >= ALERT_INTERVAL_SEC
def _record_zone_push(conn, pid: int, *, break_side: str, bar_time: str, now_iso: str) -> int:
    """Increment the push counter of monitor *pid* and store the push details.

    Reads the current ``alert_push_count`` (NULL counts as 0), then writes the
    incremented count, the push time, the break side, the breakout bar time and
    the two ``*_triggered`` flags. No commit is issued here.

    Returns:
        The new push count.
    """
    current = conn.execute(
        "SELECT alert_push_count FROM key_monitors WHERE id=?", (pid,),
    ).fetchone()
    new_count = int(current["alert_push_count"] or 0) + 1
    # Exactly one flag is set for "upper"/"lower"; neither for any other value.
    upper_flag = 1 if break_side == "upper" else 0
    lower_flag = 1 if break_side == "lower" else 0
    update_sql = (
        "UPDATE key_monitors SET\n"
        "alert_push_count=?, alert_last_push_at=?, alert_break_side=?,\n"
        "breakout_bar_time=?, upper_triggered=?, lower_triggered=?\n"
        "WHERE id=?"
    )
    params = (new_count, now_iso, break_side, bar_time, upper_flag, lower_flag, pid)
    conn.execute(update_sql, params)
    return new_count
def _handle_zone_alert(
    conn,
    row: dict,
    *,
    break_side: str,
    bar: dict,
    send_wechat: Callable[[str], None],
) -> None:
    """Record and send the push for a freshly detected zone breakout.

    Does nothing when the row already triggered on this same bar and has at
    least one push recorded. Otherwise the push is recorded, the trigger bar
    and close price are stored, the alert is sent through *send_wechat*, and
    the monitor is archived once the push count reaches ALERT_MAX_PUSH.
    """
    monitor_id = int(row["id"])
    stamp = _now_iso()
    bar_time = str(bar.get("time") or "")[:19]
    close_price = float(bar.get("close") or 0)
    previous_bar = (row.get("last_trigger_bar") or "").strip()
    # Same bar and already pushed at least once: nothing more to do.
    if previous_bar == bar_time and int(row.get("alert_push_count") or 0) > 0:
        return
    push_number = _record_zone_push(
        conn, monitor_id, break_side=break_side, bar_time=bar_time, now_iso=stamp,
    )
    conn.execute(
        "UPDATE key_monitors SET last_trigger_bar=?, alert_close_price=? WHERE id=?",
        (bar_time, close_price, monitor_id),
    )
    message = format_zone_alert(
        row,
        break_side=break_side,
        close_price=close_price,
        bar_time=bar_time,
        push_index=push_number,
    )
    send_wechat(message)
    if push_number >= ALERT_MAX_PUSH:
        archive_monitor(conn, monitor_id)
def run_key_monitor_check(
    conn,
    *,
    db_path: str,
    get_trading_mode_fn: Callable[[], str],
    send_wechat: Callable[[str], None],
    execute_breakout_fn: Callable[[Any, dict, dict, str], tuple[bool, str]] | None = None,
) -> None:
    """Scan active key-level monitors and act on bar-close breakouts.

    Each monitor is evaluated on its own ``bar_period`` (DEFAULT_BAR_PERIOD
    when unset or unsupported). For every row with a symbol and a valid band
    (``upper > lower``):

    * A zone monitor that has already pushed at least once only receives
      follow-up pushes (or is archived at the push limit); it is not
      re-evaluated against new bars.
    * Otherwise the latest closed bar is fetched. A close outside the band
      either starts a zone alert or, for auto-trade types, is handed to
      *execute_breakout_fn*.

    Args:
        conn: Database connection whose rows can be converted with ``dict()``.
            No commit is issued in this function.
        db_path: Forwarded to the K-line fetch.
        get_trading_mode_fn: Called once per scan to obtain the trading mode.
        send_wechat: Receives each formatted alert message.
        execute_breakout_fn: Called as ``(conn, row, bar, break_side)`` and
            expected to return ``(ok, detail)``. Auto-trade rows are skipped
            with a warning when it is not supplied.
    """
    rows = conn.execute(
        "SELECT * FROM key_monitors WHERE status='active' OR status IS NULL"
    ).fetchall()
    mode = get_trading_mode_fn()
    now = datetime.now(TZ)
    for r in rows:
        row = dict(r)
        pid = int(row["id"])
        sym = (row.get("symbol") or "").strip()
        typ = normalize_monitor_type(row.get("monitor_type") or "")
        if not sym:
            continue
        try:
            upper = float(row.get("upper") or 0)
            lower = float(row.get("lower") or 0)
        except (TypeError, ValueError):
            continue
        if upper <= lower:
            continue

        alert_count = int(row.get("alert_push_count") or 0)
        if is_zone_type(typ) and alert_count > 0:
            # The zone alert is already running: only follow-up pushes happen here.
            if alert_count >= ALERT_MAX_PUSH:
                archive_monitor(conn, pid)
                continue
            if _should_send_followup_push(row, now):
                break_side = (row.get("alert_break_side") or "upper").strip()
                bar_time = (row.get("breakout_bar_time") or row.get("last_trigger_bar") or "")[:19]
                close_price = float(row.get("alert_close_price") or 0)
                if close_price <= 0:
                    # Fix: the old expression parsed as
                    # ``upper if ... else (lower or 0)``, leaving the upper
                    # branch without the ``or 0`` guard. Reuse the floats
                    # already validated above instead.
                    close_price = upper if break_side == "upper" else lower
                push_n = _record_zone_push(
                    conn, pid, break_side=break_side, bar_time=bar_time, now_iso=_now_iso(),
                )
                send_wechat(format_zone_alert(
                    row,
                    break_side=break_side,
                    close_price=close_price,
                    bar_time=bar_time,
                    push_index=push_n,
                ))
                if push_n >= ALERT_MAX_PUSH:
                    archive_monitor(conn, pid)
            continue

        bar_period = normalize_bar_period(row.get("bar_period") or DEFAULT_BAR_PERIOD)
        bar = fetch_closed_bar(sym, bar_period, db_path=db_path, trading_mode=mode)
        if not bar:
            continue
        bar_time = str(bar.get("time") or "")[:19]
        if not bar_time:
            continue
        # Each closed bar is evaluated at most once per monitor.
        if (row.get("last_trigger_bar") or "").strip() == bar_time:
            continue
        try:
            close_price = float(bar.get("close") or 0)
        except (TypeError, ValueError):
            continue
        break_side = detect_break_side(close_price, upper, lower)
        if not break_side:
            continue

        if is_zone_type(typ):
            _handle_zone_alert(conn, row, break_side=break_side, bar=bar, send_wechat=send_wechat)
            continue
        if is_auto_trade_type(typ):
            if not execute_breakout_fn:
                logger.warning("key monitor auto trade skipped: no executor")
                continue
            ok, detail = execute_breakout_fn(conn, row, bar, break_side)
            # The bar is recorded even on failure, so the same bar is not retried.
            conn.execute(
                "UPDATE key_monitors SET last_trigger_bar=?, breakout_bar_time=?, alert_break_side=? WHERE id=?",
                (bar_time, bar_time, break_side, pid),
            )
            if ok:
                archive_monitor(conn, pid)
            else:
                # Previously the executor's detail text was silently dropped.
                logger.warning("key monitor auto trade failed id=%s %s: %s", pid, sym, detail)