7cb55f6557
Place limit orders outside key levels with fixed SL and 1.5 RR, 24h expiry, separate stats, and full-margin mode guard. Co-authored-by: Cursor <cursoragent@cursor.com>
120 lines
3.5 KiB
Python
120 lines
3.5 KiB
Python
"""假突破关键位监控:BTC/ETH 限价挂单(共享计算与校验)。"""
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Optional
|
|
|
|
FALSE_BREAKOUT_MONITOR_TYPE = "假突破"
|
|
FALSE_BREAKOUT_SYMBOLS = frozenset({"BTC/USDT", "ETH/USDT"})
|
|
FALSE_BREAKOUT_OFFSET_PCT = 0.1
|
|
FALSE_BREAKOUT_SL_PCT = 0.5
|
|
FALSE_BREAKOUT_RR = 1.5
|
|
FALSE_BREAKOUT_VALIDITY_HOURS = 24
|
|
|
|
|
|
def is_false_breakout_key_monitor_type(monitor_type: Optional[str]) -> bool:
|
|
return (monitor_type or "").strip() == FALSE_BREAKOUT_MONITOR_TYPE
|
|
|
|
|
|
def is_limit_key_monitor_type(monitor_type: Optional[str]) -> bool:
|
|
from fib_key_monitor_lib import is_fib_key_monitor_type
|
|
|
|
return is_fib_key_monitor_type(monitor_type) or is_false_breakout_key_monitor_type(monitor_type)
|
|
|
|
|
|
def normalize_false_breakout_symbol(symbol: Optional[str]) -> Optional[str]:
|
|
s = (symbol or "").strip().upper()
|
|
if not s:
|
|
return None
|
|
if "/" not in s:
|
|
s = f"{s}/USDT"
|
|
return s if s in FALSE_BREAKOUT_SYMBOLS else None
|
|
|
|
|
|
def storage_bounds_from_key_price(direction: str, key_price: float) -> tuple[float, float]:
|
|
k = float(key_price)
|
|
if k <= 0:
|
|
raise ValueError("关键价位须为正数")
|
|
d = (direction or "long").strip().lower()
|
|
if d == "short":
|
|
return k, k * 0.9999
|
|
if d == "long":
|
|
return k * 1.0001, k
|
|
raise ValueError("方向须为 long 或 short")
|
|
|
|
|
|
def key_price_from_row(direction: str, upper: Any, lower: Any) -> Optional[float]:
|
|
d = (direction or "long").strip().lower()
|
|
try:
|
|
if d == "short":
|
|
v = float(upper)
|
|
else:
|
|
v = float(lower)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
return v if v > 0 else None
|
|
|
|
|
|
def calc_false_breakout_plan(direction: str, key_price: float) -> Optional[tuple[float, float, float]]:
|
|
try:
|
|
k = float(key_price)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
if k <= 0:
|
|
return None
|
|
d = (direction or "long").strip().lower()
|
|
off = FALSE_BREAKOUT_OFFSET_PCT / 100.0
|
|
sl_pct = FALSE_BREAKOUT_SL_PCT / 100.0
|
|
rr = float(FALSE_BREAKOUT_RR)
|
|
if d == "short":
|
|
entry = k * (1 + off)
|
|
sl = entry * (1 + sl_pct)
|
|
risk = sl - entry
|
|
if risk <= 0:
|
|
return None
|
|
tp = entry - risk * rr
|
|
return entry, sl, tp
|
|
if d == "long":
|
|
entry = k * (1 - off)
|
|
sl = entry * (1 - sl_pct)
|
|
risk = entry - sl
|
|
if risk <= 0:
|
|
return None
|
|
tp = entry + risk * rr
|
|
return entry, sl, tp
|
|
return None
|
|
|
|
|
|
def _parse_created_at(raw: Any) -> Optional[datetime]:
|
|
s = str(raw or "").strip()
|
|
if not s:
|
|
return None
|
|
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S"):
|
|
try:
|
|
return datetime.strptime(s[:26], fmt)
|
|
except ValueError:
|
|
continue
|
|
try:
|
|
return datetime.fromisoformat(s.replace("Z", "+00:00")[:32])
|
|
except ValueError:
|
|
return None
|
|
|
|
|
|
def is_false_breakout_expired(
|
|
created_at: Any,
|
|
now: datetime,
|
|
*,
|
|
hours: int = FALSE_BREAKOUT_VALIDITY_HOURS,
|
|
) -> bool:
|
|
dt = _parse_created_at(created_at)
|
|
if dt is None:
|
|
return False
|
|
return now >= dt + timedelta(hours=hours)
|
|
|
|
|
|
def expires_at_text(created_at: Any, *, hours: int = FALSE_BREAKOUT_VALIDITY_HOURS) -> str:
|
|
dt = _parse_created_at(created_at)
|
|
if dt is None:
|
|
return "—"
|
|
return (dt + timedelta(hours=hours)).strftime("%Y-%m-%d %H:%M:%S")
|