feat: add false breakout key monitor for BTC/ETH on three exchanges

Place limit orders outside key levels with fixed SL and 1.5 RR, 24h expiry, separate stats, and full-margin mode guard.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-09 10:13:53 +08:00
parent 2786acf884
commit 7cb55f6557
10 changed files with 903 additions and 88 deletions
+119
View File
@@ -0,0 +1,119 @@
"""假突破关键位监控:BTC/ETH 限价挂单(共享计算与校验)。"""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Any, Optional
FALSE_BREAKOUT_MONITOR_TYPE = "假突破"
FALSE_BREAKOUT_SYMBOLS = frozenset({"BTC/USDT", "ETH/USDT"})
FALSE_BREAKOUT_OFFSET_PCT = 0.1
FALSE_BREAKOUT_SL_PCT = 0.5
FALSE_BREAKOUT_RR = 1.5
FALSE_BREAKOUT_VALIDITY_HOURS = 24
def is_false_breakout_key_monitor_type(monitor_type: Optional[str]) -> bool:
return (monitor_type or "").strip() == FALSE_BREAKOUT_MONITOR_TYPE
def is_limit_key_monitor_type(monitor_type: Optional[str]) -> bool:
from fib_key_monitor_lib import is_fib_key_monitor_type
return is_fib_key_monitor_type(monitor_type) or is_false_breakout_key_monitor_type(monitor_type)
def normalize_false_breakout_symbol(symbol: Optional[str]) -> Optional[str]:
s = (symbol or "").strip().upper()
if not s:
return None
if "/" not in s:
s = f"{s}/USDT"
return s if s in FALSE_BREAKOUT_SYMBOLS else None
def storage_bounds_from_key_price(direction: str, key_price: float) -> tuple[float, float]:
k = float(key_price)
if k <= 0:
raise ValueError("关键价位须为正数")
d = (direction or "long").strip().lower()
if d == "short":
return k, k * 0.9999
if d == "long":
return k * 1.0001, k
raise ValueError("方向须为 long 或 short")
def key_price_from_row(direction: str, upper: Any, lower: Any) -> Optional[float]:
d = (direction or "long").strip().lower()
try:
if d == "short":
v = float(upper)
else:
v = float(lower)
except (TypeError, ValueError):
return None
return v if v > 0 else None
def calc_false_breakout_plan(direction: str, key_price: float) -> Optional[tuple[float, float, float]]:
try:
k = float(key_price)
except (TypeError, ValueError):
return None
if k <= 0:
return None
d = (direction or "long").strip().lower()
off = FALSE_BREAKOUT_OFFSET_PCT / 100.0
sl_pct = FALSE_BREAKOUT_SL_PCT / 100.0
rr = float(FALSE_BREAKOUT_RR)
if d == "short":
entry = k * (1 + off)
sl = entry * (1 + sl_pct)
risk = sl - entry
if risk <= 0:
return None
tp = entry - risk * rr
return entry, sl, tp
if d == "long":
entry = k * (1 - off)
sl = entry * (1 - sl_pct)
risk = entry - sl
if risk <= 0:
return None
tp = entry + risk * rr
return entry, sl, tp
return None
def _parse_created_at(raw: Any) -> Optional[datetime]:
s = str(raw or "").strip()
if not s:
return None
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(s[:26], fmt)
except ValueError:
continue
try:
return datetime.fromisoformat(s.replace("Z", "+00:00")[:32])
except ValueError:
return None
def is_false_breakout_expired(
created_at: Any,
now: datetime,
*,
hours: int = FALSE_BREAKOUT_VALIDITY_HOURS,
) -> bool:
dt = _parse_created_at(created_at)
if dt is None:
return False
return now >= dt + timedelta(hours=hours)
def expires_at_text(created_at: Any, *, hours: int = FALSE_BREAKOUT_VALIDITY_HOURS) -> str:
dt = _parse_created_at(created_at)
if dt is None:
return ""
return (dt + timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")