""" 全仓杠杆模式下:撤销已添加的箱体/收敛/斐波关键位监控并微信说明。 """ from __future__ import annotations from typing import Any, Callable, Iterable, Optional from fib_key_monitor_lib import FIB_KEY_MONITOR_TYPES, is_fib_key_monitor_type from key_monitor_lib import KEY_MONITOR_AUTO_TYPES from position_sizing_lib import is_full_margin_mode, mode_label_zh def monitor_type_disallowed_in_full_margin(monitor_type: str) -> bool: mt = (monitor_type or "").strip() if mt in KEY_MONITOR_AUTO_TYPES: return True return is_fib_key_monitor_type(mt) def purge_disallowed_key_monitors( conn: Any, *, sizing_mode: str, select_rows: Callable[[Any], Iterable[Any]], cancel_fib_limit: Callable[[Any], None], delete_monitor: Callable[[Any, int], None], send_wechat: Callable[[str], None], row_symbol: Callable[[Any], str] = lambda r: str(r["symbol"] or ""), row_monitor_type: Callable[[Any], str] = lambda r: str(r["monitor_type"] or ""), row_id: Callable[[Any], int] = lambda r: int(r["id"]), ) -> int: if not is_full_margin_mode(sizing_mode): return 0 removed = [] for row in select_rows(conn): mt = row_monitor_type(row) if not monitor_type_disallowed_in_full_margin(mt): continue sym = row_symbol(row) kid = row_id(row) if is_fib_key_monitor_type(mt): try: cancel_fib_limit(row) except Exception: pass delete_monitor(conn, kid) removed.append((sym, mt, kid)) if removed: lines = [f"· {s} {t} (#{i})" for s, t, i in removed[:12]] if len(removed) > 12: lines.append(f"… 共 {len(removed)} 条") send_wechat( "# ⚠️ 全仓杠杆模式:已自动撤销关键位监控\n" f"计仓模式:{mode_label_zh(sizing_mode)}(仅 env 可切换,须无仓)\n" "已撤销:箱体突破 / 收敛突破 / 斐波回调监控(不可与全仓杠杆并存)\n" + "\n".join(lines) ) return len(removed)