Files
crypto_monitor/key_monitor_lib.py
T
dekun 073a382d41 Unify key support/resistance monitor type and fix form parity.
Merge 关键阻力位/关键支撑位 into 关键支撑阻力, share key_monitor_form.js across hub and new-tab views, and add hub shortcut to /key_monitor.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-19 08:31:14 +08:00

367 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
关键位监控:阻力/支撑双向提醒与箱体/收敛自动门控的共享逻辑。
"""
from __future__ import annotations
from datetime import datetime
from typing import Any, Optional
KEY_MONITOR_AUTO_TYPES = frozenset({"箱体突破", "收敛突破"})
KEY_MONITOR_RS_TYPE = "关键支撑阻力"
KEY_MONITOR_RS_LEGACY_TYPES = frozenset({"关键阻力位", "关键支撑位"})
KEY_MONITOR_RS_TYPES = frozenset({KEY_MONITOR_RS_TYPE}) | KEY_MONITOR_RS_LEGACY_TYPES
KEY_MONITOR_ALERT_ONLY_TYPES = frozenset({KEY_MONITOR_RS_TYPE}) | KEY_MONITOR_RS_LEGACY_TYPES
KEY_DIRECTION_WATCH = "watch"
def is_rs_key_monitor_type(monitor_type: str) -> bool:
return (monitor_type or "").strip() in KEY_MONITOR_RS_TYPES
def rs_monitor_type_label(monitor_type: str) -> str:
"""展示用:旧库里的阻力/支撑合并为「关键支撑阻力」。"""
if is_rs_key_monitor_type(monitor_type):
return KEY_MONITOR_RS_TYPE
return (monitor_type or "").strip()
def rs_monitor_type_for_storage(monitor_type: str) -> str:
if is_rs_key_monitor_type(monitor_type):
return KEY_MONITOR_RS_TYPE
return (monitor_type or "").strip()
def calc_breakout_breach_pct(direction: str, close: float, upper: float, lower: float) -> float:
"""突破 K 收盘相对关键位的越过幅度(%)。未越过对应边界时返回 0。"""
direction = (direction or "long").strip().lower()
c = float(close)
if direction == "long":
u = float(upper)
if u <= 0 or c <= u:
return 0.0
return (c - u) / u * 100.0
lo = float(lower)
if lo <= 0 or c >= lo:
return 0.0
return (lo - c) / lo * 100.0
def auto_amp_ok(
direction: str,
close_b: float,
upper: float,
lower: float,
min_pct: float,
) -> tuple[bool, float]:
breach = calc_breakout_breach_pct(direction, close_b, upper, lower)
return breach > float(min_pct), breach
def auto_confirm_ok(direction: str, cfm_close: float, upper: float, lower: float) -> bool:
"""确认 K 收盘须在箱体外(不得回到 [lower, upper] 内)。"""
direction = (direction or "long").strip().lower()
c = float(cfm_close)
if direction == "long":
return c > float(upper)
return c < float(lower)
def detect_rs_box_break(close: float, upper: float, lower: float) -> Optional[dict[str, Any]]:
"""
阻力/支撑人工盯盘:最近 5m 收盘突破上沿或下沿(严格 > / <)。
上沿优先:同一根 K 不可能同时满足两者。
"""
u, lo, c = float(upper), float(lower), float(close)
if c > u:
return {
"break_side": "upper",
"direction": "long",
"edge_price": u,
"key_price": u,
"break_label": "向上突破上沿",
}
if c < lo:
return {
"break_side": "lower",
"direction": "short",
"edge_price": lo,
"key_price": lo,
"break_label": "向下突破下沿",
}
return None
def rs_break_from_direction(direction: str, upper: float, lower: float) -> Optional[dict[str, Any]]:
"""已触发后根据入库方向还原突破边(long=上沿,short=下沿)。"""
d = (direction or "").strip().lower()
if d == "long":
return {
"break_side": "upper",
"direction": "long",
"edge_price": float(upper),
"key_price": float(upper),
"break_label": "向上突破上沿",
}
if d == "short":
return {
"break_side": "lower",
"direction": "short",
"edge_price": float(lower),
"key_price": float(lower),
"break_label": "向下突破下沿",
}
return None
def rs_break_infer_from_close(close: float, upper: float, lower: float) -> dict[str, Any]:
"""
续发提醒时价格已回到箱体内:按收盘价相对箱体中线推断首次突破边,
保证第 2/3 次企业微信提醒仍能发出。
"""
mid = (float(upper) + float(lower)) / 2.0
if float(close) >= mid:
br = rs_break_from_direction("long", upper, lower)
else:
br = rs_break_from_direction("short", upper, lower)
if br:
return br
return {
"break_side": "upper",
"direction": "long",
"edge_price": float(upper),
"key_price": float(upper),
"break_label": "向上突破上沿",
}
def _parse_notify_datetime(raw: Optional[str]) -> Optional[datetime]:
s = str(raw or "").strip()
if not s:
return None
try:
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
if dt.tzinfo is not None:
dt = dt.replace(tzinfo=None)
return dt
except Exception:
pass
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(s[:19], fmt)
except Exception:
continue
return None
def claim_rs_level_notify(
conn: Any,
monitor_id: int,
notify_index: int,
direction: str,
notified_at: str,
bar_ts: Optional[int],
*,
prior_count: Optional[int] = None,
) -> bool:
"""
原子占位:仅在 notification_count 仍为 prior_count 时推进到 notify_index。
须在发送企业微信之前调用并 commit,避免 (2/3) 重复刷屏。
"""
prior = int(prior_count if prior_count is not None else notify_index - 1)
if prior < 0 or notify_index != prior + 1:
return False
bar_val: Optional[int] = None
if bar_ts is not None:
try:
bar_val = int(bar_ts)
except (TypeError, ValueError):
bar_val = None
cur = conn.execute(
"UPDATE key_monitors SET notification_count=?, direction=?, last_notified_at=?, last_rs_bar_ts=? "
"WHERE id=? AND COALESCE(notification_count,0)=?",
(notify_index, direction, notified_at, bar_val, int(monitor_id), prior),
)
return int(cur.rowcount or 0) > 0
def parse_last_rs_bar_ts(row: Any) -> Optional[int]:
if row is None:
return None
try:
keys = row.keys() if hasattr(row, "keys") else []
except Exception:
keys = []
raw = row["last_rs_bar_ts"] if "last_rs_bar_ts" in keys else None
if raw is None:
return None
try:
return int(raw)
except (TypeError, ValueError):
return None
def run_rs_level_alert_tick(
row: Any,
close: float,
bar_ts: Optional[int],
now_dt: datetime,
*,
default_max_notify: int,
default_interval_min: int,
) -> Optional[dict[str, Any]]:
"""
判定本轮回合是否应推送阻力/支撑提醒。
首条:仅在新闭合 K 越线时触发;发送前须 claim_rs_level_notify 占位防轮询/多进程重复。
"""
up, lo = float(row["upper"]), float(row["lower"])
if up <= lo:
return None
count = int(row["notification_count"] or 0)
max_n = max(1, int(row["max_notify"] or default_max_notify))
interval = max(1, int(row["notify_interval_min"] or default_interval_min))
if count >= max_n:
return None
bar_ts_i: Optional[int] = None
if bar_ts is not None:
try:
bar_ts_i = int(bar_ts)
except (TypeError, ValueError):
bar_ts_i = None
last_bar_i = parse_last_rs_bar_ts(row)
if count == 0:
br = detect_rs_box_break(close, up, lo)
if not br:
return None
if bar_ts_i is not None and last_bar_i is not None and bar_ts_i == last_bar_i:
return None
return {
"break_info": br,
"notify_index": 1,
"prior_count": 0,
"notify_max": max_n,
"interval_min": interval,
"bar_ts": bar_ts_i,
}
if not notify_interval_elapsed(row["last_notified_at"], interval, now_dt):
return None
br = resolve_rs_break_for_alert(count, row["direction"], close, up, lo)
if not br:
return None
return {
"break_info": br,
"notify_index": count + 1,
"prior_count": count,
"notify_max": max_n,
"interval_min": interval,
"bar_ts": bar_ts_i,
}
def resolve_rs_break_for_alert(
notification_count: int,
direction: Optional[str],
close: float,
upper: float,
lower: float,
) -> Optional[dict[str, Any]]:
"""
阻力/支撑提醒:首次用 5m 收盘越线判定;后续用已存方向,兼容 direction=watch。
"""
count = int(notification_count or 0)
up, lo, c = float(upper), float(lower), float(close)
if count <= 0:
return detect_rs_box_break(c, up, lo)
br = rs_break_from_direction(direction, up, lo)
if br:
return br
d = (direction or "").strip().lower()
if d not in ("", KEY_DIRECTION_WATCH):
return None
br = detect_rs_box_break(c, up, lo)
if br:
return br
return rs_break_infer_from_close(c, up, lo)
def notify_interval_elapsed(
last_notified_at: Optional[str],
interval_min: int,
now_dt: datetime,
) -> bool:
if not last_notified_at:
return False
last_dt = _parse_notify_datetime(last_notified_at)
if last_dt is None:
return False
return (now_dt - last_dt).total_seconds() >= max(1, int(interval_min)) * 60
def format_auto_amp_line(amp_ok: bool, amp_pct: float, min_pct: float) -> str:
return (
f"突破越过幅度:{'通过' if amp_ok else '不通过'}"
f"{round(float(amp_pct), 4)}%,要求 > {min_pct}%"
)
def format_auto_confirm_line(confirm_ok: bool, cfm_close, edge_price, direction: str) -> str:
side = "箱外上方" if (direction or "").lower() == "long" else "箱外下方"
return (
f"第二根确认:{'通过' if confirm_ok else '不通过'}"
f"(确认收盘 {cfm_close},须收于{side},关键位 {edge_price}"
)
def key_monitor_rule_template_context(
*,
kline_timeframe: str,
key_breakout_amp_min_pct: float,
key_volume_ma_bars: int,
key_volume_ratio_min: float,
key_auto_min_planned_rr: float,
key_daily_volume_rank_max: int,
key_confirm_breakout_bar: int,
key_confirm_bar: int,
key_alert_max_times: int,
key_alert_interval_minutes: int,
key_stop_outside_breakout_pct: float,
key_trend_stop_outside_pct: float,
false_breakout_validity_hours: int,
trigger_entry_validity_hours: int | None = None,
) -> dict[str, Any]:
"""关键位监控页规则说明表格(Jinja key_rule_ctx)。"""
from false_breakout_key_monitor_lib import (
FALSE_BREAKOUT_OFFSET_PCT,
FALSE_BREAKOUT_RR,
FALSE_BREAKOUT_SL_PCT,
)
from trigger_entry_key_monitor_lib import TRIGGER_ENTRY_VALIDITY_HOURS
te_hours = (
int(trigger_entry_validity_hours)
if trigger_entry_validity_hours is not None
else TRIGGER_ENTRY_VALIDITY_HOURS
)
return {
"tf": (kline_timeframe or "5m").strip(),
"amp_min_pct": key_breakout_amp_min_pct,
"vol_ma_bars": key_volume_ma_bars,
"vol_ratio_min": key_volume_ratio_min,
"min_rr": key_auto_min_planned_rr,
"vol_rank_max": key_daily_volume_rank_max,
"breakout_bar": key_confirm_breakout_bar,
"confirm_bar": key_confirm_bar,
"alert_max": key_alert_max_times,
"alert_interval_min": key_alert_interval_minutes,
"stop_outside_pct": key_stop_outside_breakout_pct,
"trend_stop_outside_pct": key_trend_stop_outside_pct,
"fb_offset_pct": FALSE_BREAKOUT_OFFSET_PCT,
"fb_sl_pct": FALSE_BREAKOUT_SL_PCT,
"fb_rr": FALSE_BREAKOUT_RR,
"fb_valid_hours": false_breakout_validity_hours,
"trigger_entry_validity_hours": te_hours,
}