Files
crypto_monitor/false_breakout_key_monitor_lib.py
dekun fa59fc1273 fix: show limit-order gate for false breakout monitors
False breakout used box/convergence volume/break gates in the UI; now shows pending limit order status like fib monitors.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-11 08:22:41 +08:00

146 lines
4.4 KiB
Python

"""False-breakout key-level monitor: BTC/ETH limit orders (shared calculation and validation helpers)."""
from __future__ import annotations

import math
from datetime import datetime, timedelta
from typing import Any, Optional
# Monitor-type label that marks a monitor as a false-breakout monitor.
FALSE_BREAKOUT_MONITOR_TYPE = "假突破"
# Trading pairs accepted by normalize_false_breakout_symbol; any other symbol normalizes to None.
FALSE_BREAKOUT_SYMBOLS = frozenset({"BTC/USDT", "ETH/USDT"})
# Entry offset from the key price, in percent (divided by 100 before use).
FALSE_BREAKOUT_OFFSET_PCT = 0.1
# Stop-loss distance from the entry price, in percent (divided by 100 before use).
FALSE_BREAKOUT_SL_PCT = 0.5
# Reward-to-risk multiple: take-profit distance = entry-to-stop distance * this value.
FALSE_BREAKOUT_RR = 1.5
# Default validity window, in hours, counted from the parsed created-at timestamp.
FALSE_BREAKOUT_VALIDITY_HOURS = 24
def is_false_breakout_key_monitor_type(monitor_type: Optional[str]) -> bool:
    """Return True when *monitor_type* is the false-breakout monitor label.

    Surrounding whitespace is ignored; ``None`` and the empty string never
    match.
    """
    cleaned = monitor_type.strip() if monitor_type else ""
    return cleaned == FALSE_BREAKOUT_MONITOR_TYPE
def is_limit_key_monitor_type(monitor_type: Optional[str]) -> bool:
    """Return a truthy value when *monitor_type* is a fib or a false-breakout monitor type.

    The fib check is delegated to ``fib_key_monitor_lib``, which is imported
    at call time rather than at module import time.
    """
    from fib_key_monitor_lib import is_fib_key_monitor_type

    fib_match = is_fib_key_monitor_type(monitor_type)
    if fib_match:
        return fib_match
    return is_false_breakout_key_monitor_type(monitor_type)
def normalize_false_breakout_symbol(symbol: Optional[str]) -> Optional[str]:
    """Normalize *symbol* to a supported ``BASE/USDT`` pair, or return None.

    The input is stripped and upper-cased; a value without a ``/`` gets
    ``/USDT`` appended. The result is returned only when it is a member of
    FALSE_BREAKOUT_SYMBOLS; empty, ``None`` and unsupported symbols give None.
    """
    cleaned = (symbol or "").strip().upper()
    if not cleaned:
        return None
    pair = cleaned if "/" in cleaned else f"{cleaned}/USDT"
    if pair in FALSE_BREAKOUT_SYMBOLS:
        return pair
    return None
def storage_bounds_from_key_price(direction: str, key_price: float) -> tuple[float, float]:
    """Expand a single key price into an ``(upper, lower)`` pair of bounds.

    For ``short`` the key price is the upper bound and the lower bound sits
    0.01% below it; for ``long`` the key price is the lower bound and the
    upper bound sits 0.01% above it. This mirrors ``key_price_from_row``,
    which reads the key price back from ``upper`` for short rows and from
    ``lower`` for long rows.

    Args:
        direction: ``"long"`` or ``"short"`` (case/whitespace-insensitive);
            an empty or ``None`` value is treated as ``"long"``.
        key_price: Key price level; anything ``float()`` accepts.

    Returns:
        The ``(upper, lower)`` tuple.

    Raises:
        ValueError: If the key price is not a positive finite number (this
            includes ``float()`` rejecting a malformed string), or if the
            direction is neither ``long`` nor ``short``.
        TypeError: If ``float()`` cannot convert *key_price* (e.g. ``None``).
    """
    k = float(key_price)
    # float() happily parses "nan"/"inf", and NaN slips past a plain
    # ``k <= 0`` comparison, so non-finite values are rejected explicitly.
    if not math.isfinite(k) or k <= 0:
        raise ValueError("关键价位须为正数")
    d = (direction or "long").strip().lower()
    if d == "short":
        return k, k * 0.9999
    if d == "long":
        return k * 1.0001, k
    raise ValueError("方向须为 long 或 short")
def key_price_from_row(direction: str, upper: Any, lower: Any) -> Optional[float]:
    """Recover the key price from a stored ``(upper, lower)`` pair.

    ``short`` rows keep the key price in *upper*; every other direction
    (including empty/``None``, which defaults to ``long``) reads *lower*.

    Returns:
        The key price as a float, or ``None`` when the chosen bound cannot be
        converted to a float or is not greater than zero.
    """
    side = (direction or "long").strip().lower()
    chosen = upper if side == "short" else lower
    try:
        price = float(chosen)
    except (TypeError, ValueError):
        return None
    if price > 0:
        return price
    return None
def calc_false_breakout_plan(direction: str, key_price: float) -> Optional[tuple[float, float, float]]:
    """Compute the ``(entry, stop_loss, take_profit)`` plan for a key price.

    The entry is offset from the key price by FALSE_BREAKOUT_OFFSET_PCT
    percent (above it for ``short``, below it for ``long``). The stop-loss
    sits FALSE_BREAKOUT_SL_PCT percent beyond the entry in the same
    direction, and the take-profit is placed FALSE_BREAKOUT_RR times the
    entry-to-stop distance on the opposite side of the entry.

    Args:
        direction: ``"long"`` or ``"short"`` (case/whitespace-insensitive);
            an empty or ``None`` value is treated as ``"long"``.
        key_price: Key price level; anything ``float()`` accepts.

    Returns:
        ``(entry, stop_loss, take_profit)``, or ``None`` when the key price
        is not a positive finite number, the direction is unknown, or the
        computed risk is not positive.
    """
    try:
        k = float(key_price)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() of an int too large for a double.
        return None
    # float() parses "nan"/"inf", and NaN slips past a plain ``k <= 0``
    # comparison; without this guard the plan would be made of NaN/inf.
    if not math.isfinite(k) or k <= 0:
        return None

    d = (direction or "long").strip().lower()
    if d == "short":
        sign = 1.0  # entry and stop above the key price, target below
    elif d == "long":
        sign = -1.0  # entry and stop below the key price, target above
    else:
        return None

    off = FALSE_BREAKOUT_OFFSET_PCT / 100.0
    sl_pct = FALSE_BREAKOUT_SL_PCT / 100.0
    rr = float(FALSE_BREAKOUT_RR)

    entry = k * (1 + sign * off)
    sl = entry * (1 + sign * sl_pct)
    # Signed so that a misconfigured (zero/negative) stop percentage is
    # still detected instead of being hidden by an absolute value.
    risk = sign * (sl - entry)
    if risk <= 0:
        return None
    tp = entry - sign * risk * rr
    return entry, sl, tp
def _parse_created_at(raw: Any) -> Optional[datetime]:
    """Best-effort parse of a created-at value into a ``datetime``.

    *raw* is stringified and stripped. Three explicit layouts are tried first
    against the first 26 characters (the length of a timestamp with
    microseconds): ``YYYY-MM-DD HH:MM:SS``, the same with ``.ffffff``, and
    ``YYYY-MM-DDTHH:MM:SS``. Anything else falls back to
    ``datetime.fromisoformat`` with ``Z`` rewritten as ``+00:00``.

    Returns:
        The parsed datetime, or ``None`` for empty or unparseable input. The
        explicit layouts yield naive datetimes; the ISO fallback yields an
        aware datetime when the text carries an offset.
    """
    text = str(raw or "").strip()
    if not text:
        return None

    head = text[:26]
    layouts = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%dT%H:%M:%S")
    for layout in layouts:
        try:
            parsed = datetime.strptime(head, layout)
        except ValueError:
            pass
        else:
            return parsed

    iso_text = text.replace("Z", "+00:00")[:32]
    try:
        return datetime.fromisoformat(iso_text)
    except ValueError:
        return None
def is_false_breakout_expired(
    created_at: Any,
    now: datetime,
    *,
    hours: int = FALSE_BREAKOUT_VALIDITY_HOURS,
) -> bool:
    """Return whether the validity window starting at *created_at* has elapsed.

    Args:
        created_at: Creation timestamp in any form ``_parse_created_at``
            understands.
        now: Reference time to compare against.
        hours: Length of the validity window in hours.

    Returns:
        ``True`` when ``now`` is at or past ``created_at + hours``. ``False``
        when it is earlier, or when *created_at* cannot be parsed (an
        unparseable timestamp is treated as not expired).
    """
    dt = _parse_created_at(created_at)
    if dt is None:
        return False
    deadline = dt + timedelta(hours=hours)
    # _parse_created_at yields an aware datetime for inputs carrying "Z" or
    # a UTC offset and a naive one otherwise. Comparing naive with aware
    # raises TypeError, so when exactly one side is naive both are converted
    # with astimezone(), which interprets a naive value as local time.
    if (deadline.utcoffset() is None) != (now.utcoffset() is None):
        deadline = deadline.astimezone()
        now = now.astimezone()
    return now >= deadline
def expires_at_text(created_at: Any, *, hours: int = FALSE_BREAKOUT_VALIDITY_HOURS) -> str:
    """Format the end of the validity window as ``YYYY-MM-DD HH:MM:SS``.

    Returns an empty string when *created_at* cannot be parsed.
    """
    started = _parse_created_at(created_at)
    if started is None:
        return ""
    deadline = started + timedelta(hours=hours)
    return deadline.strftime("%Y-%m-%d %H:%M:%S")
def false_breakout_gate_preview(
    *,
    entry_display: str,
    limit_order_id: Any = None,
    created_at: Any = None,
    now: Optional[datetime] = None,
    hours: int = FALSE_BREAKOUT_VALIDITY_HOURS,
) -> dict[str, Any]:
    """Build the gate preview for a false-breakout monitor.

    The preview reports the pending limit-order status; per the original
    note, it does not use the volume/break/amplitude/second-confirmation
    gates of the box/convergence monitors.

    Args:
        entry_display: Pre-formatted entry price shown in the summary.
        limit_order_id: Order id; included in the metrics when non-empty.
        created_at: Creation timestamp used for the expiry check and text.
        now: Reference time; defaults to ``datetime.now()``.
        hours: Length of the validity window in hours.

    Returns:
        A dict with ``summary`` (status line), ``metrics`` (space-joined
        order id and deadline, either of which may be absent) and ``gate_ok``
        (``True`` while the monitor has not expired).
    """
    reference_time = now if now else datetime.now()
    is_expired = is_false_breakout_expired(created_at, reference_time, hours=hours)
    deadline_text = expires_at_text(created_at, hours=hours)
    order_id = str(limit_order_id or "").strip()

    if is_expired:
        status = "已过期"
    else:
        status = "等待成交"

    # Each entry pairs the value that decides inclusion with its rendering.
    candidates = (
        (order_id, f"限价单:{order_id}"),
        (deadline_text, f"截至:{deadline_text}"),
    )
    metrics = " ".join(rendered for value, rendered in candidates if value != "")

    preview: dict[str, Any] = {}
    preview["summary"] = f"假突破 挂E={entry_display} {status}"
    preview["metrics"] = metrics
    preview["gate_ok"] = not is_expired
    return preview