"""假突破关键位监控:BTC/ETH 限价挂单(共享计算与校验)。""" from __future__ import annotations from datetime import datetime, timedelta from typing import Any, Optional FALSE_BREAKOUT_MONITOR_TYPE = "假突破" FALSE_BREAKOUT_SYMBOLS = frozenset({"BTC/USDT", "ETH/USDT"}) FALSE_BREAKOUT_OFFSET_PCT = 0.1 FALSE_BREAKOUT_SL_PCT = 0.5 FALSE_BREAKOUT_RR = 1.5 FALSE_BREAKOUT_VALIDITY_HOURS = 24 def is_false_breakout_key_monitor_type(monitor_type: Optional[str]) -> bool: return (monitor_type or "").strip() == FALSE_BREAKOUT_MONITOR_TYPE def is_limit_key_monitor_type(monitor_type: Optional[str]) -> bool: from fib_key_monitor_lib import is_fib_key_monitor_type return is_fib_key_monitor_type(monitor_type) or is_false_breakout_key_monitor_type(monitor_type) def normalize_false_breakout_symbol(symbol: Optional[str]) -> Optional[str]: s = (symbol or "").strip().upper() if not s: return None if "/" not in s: s = f"{s}/USDT" return s if s in FALSE_BREAKOUT_SYMBOLS else None def storage_bounds_from_key_price(direction: str, key_price: float) -> tuple[float, float]: k = float(key_price) if k <= 0: raise ValueError("关键价位须为正数") d = (direction or "long").strip().lower() if d == "short": return k, k * 0.9999 if d == "long": return k * 1.0001, k raise ValueError("方向须为 long 或 short") def key_price_from_row(direction: str, upper: Any, lower: Any) -> Optional[float]: d = (direction or "long").strip().lower() try: if d == "short": v = float(upper) else: v = float(lower) except (TypeError, ValueError): return None return v if v > 0 else None def calc_false_breakout_plan(direction: str, key_price: float) -> Optional[tuple[float, float, float]]: try: k = float(key_price) except (TypeError, ValueError): return None if k <= 0: return None d = (direction or "long").strip().lower() off = FALSE_BREAKOUT_OFFSET_PCT / 100.0 sl_pct = FALSE_BREAKOUT_SL_PCT / 100.0 rr = float(FALSE_BREAKOUT_RR) if d == "short": entry = k * (1 + off) sl = entry * (1 + sl_pct) risk = sl - entry if risk <= 0: return None tp = entry - risk * rr return entry, sl, tp if d == "long": entry = k * (1 - off) sl = entry * (1 - sl_pct) risk = entry - sl if risk <= 0: return None tp = entry + risk * rr return entry, sl, tp return None def _parse_created_at(raw: Any) -> Optional[datetime]: s = str(raw or "").strip() if not s: return None for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"): try: return datetime.strptime(s[:26], fmt) except ValueError: continue try: return datetime.fromisoformat(s.replace("Z", "+00:00")[:32]) except ValueError: return None def is_false_breakout_expired( created_at: Any, now: datetime, *, hours: int = FALSE_BREAKOUT_VALIDITY_HOURS, ) -> bool: dt = _parse_created_at(created_at) if dt is None: return False return now >= dt + timedelta(hours=hours) def expires_at_text(created_at: Any, *, hours: int = FALSE_BREAKOUT_VALIDITY_HOURS) -> str: dt = _parse_created_at(created_at) if dt is None: return "—" return (dt + timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")