"""触价开仓关键位监控:程序盯价、触达计划入场后市价成交(四所共用逻辑)。""" from __future__ import annotations from datetime import datetime, timedelta from typing import Any, Callable, Optional from false_breakout_key_monitor_lib import ( _parse_created_at, expires_at_text, is_false_breakout_expired, ) from strategy_trend_lib import trend_dca_level_reached TRIGGER_ENTRY_MONITOR_TYPE = "触价开仓" TRIGGER_ENTRY_VALIDITY_HOURS = 24 TRIGGER_ENTRY_CLOSE_FILLED = "trigger_entry_filled" TRIGGER_ENTRY_CLOSE_TP_INVALIDATE = "trigger_tp_invalidate" TRIGGER_ENTRY_CLOSE_EXPIRED = "trigger_entry_expired" TRIGGER_ENTRY_CLOSE_EXCHANGE_FAILED = "trigger_exchange_failed" KEY_ENTRY_REASON_TRIGGER = "关键位触价开仓" def is_trigger_entry_key_monitor_type(monitor_type: Optional[str]) -> bool: return (monitor_type or "").strip() == TRIGGER_ENTRY_MONITOR_TYPE def trigger_entry_reached(direction: str, mark_price: float, entry: float) -> bool: return trend_dca_level_reached(direction, mark_price, entry) def trigger_entry_invalidate_by_tp(direction: str, mark_price: float, take_profit: float) -> bool: """未开仓前标记价先触达止盈侧则失效。""" try: m = float(mark_price) tp = float(take_profit) except (TypeError, ValueError): return False d = (direction or "long").strip().lower() if d == "short": return m <= tp return m >= tp def validate_trigger_entry_geometry( direction: str, entry: float, stop_loss: float, take_profit: float, mark_at_add: Optional[float] = None, ) -> Optional[str]: """返回错误文案;合法则 None。""" try: e = float(entry) sl = float(stop_loss) tp = float(take_profit) except (TypeError, ValueError): return "入场价、止损、止盈须为有效数字" if e <= 0 or sl <= 0 or tp <= 0: return "入场价、止损、止盈须大于 0" d = (direction or "long").strip().lower() if d == "long": if not (sl < e < tp): return "做多:须满足 止损 < 入场价 < 止盈" if mark_at_add is not None and float(mark_at_add) >= tp: return "做多:当前价已不低于止盈,无法添加触价开仓" elif d == "short": if not (tp < e < sl): return "做空:须满足 止盈 < 入场价 < 止损" if mark_at_add is not None and float(mark_at_add) <= tp: return "做空:当前价已不高于止盈,无法添加触价开仓" else: return "方向须为 long 或 short" return None def validate_trigger_entry_rr( direction: str, entry: float, stop_loss: float, take_profit: float, min_rr: float, calc_rr_ratio: Callable[..., Optional[float]], ) -> Optional[str]: rr = calc_rr_ratio(direction, entry, stop_loss, take_profit) if rr is None or rr <= float(min_rr): fmt = f"{rr:.4f}" if rr is not None else "无法计算" return f"计划盈亏比 {fmt}:1 未达要求(>{float(min_rr)}:1)" return None def is_trigger_entry_expired( created_at: Any, now: datetime, *, hours: int = TRIGGER_ENTRY_VALIDITY_HOURS, ) -> bool: return is_false_breakout_expired(created_at, now, hours=hours) def trigger_entry_expires_at_text( created_at: Any, *, hours: int = TRIGGER_ENTRY_VALIDITY_HOURS, ) -> str: return expires_at_text(created_at, hours=hours) def count_pending_trigger_entries(conn: Any, trading_day: str) -> int: td = (trading_day or "").strip() if not td: return 0 row = conn.execute( "SELECT COUNT(*) FROM key_monitors WHERE monitor_type=? AND session_date=?", (TRIGGER_ENTRY_MONITOR_TYPE, td), ).fetchone() return int(row[0] if row else 0) def check_trigger_entry_intent_limit( conn: Any, trading_day: str, opens_today: int, hard_limit: int, ) -> tuple[bool, str]: """当日开仓意图:已成交次数 + 待触发触价条数。""" if int(hard_limit) <= 0: return True, "" pending = count_pending_trigger_entries(conn, trading_day) total = int(opens_today) + pending if total >= int(hard_limit): return ( False, f"本交易日开仓意图已达上限(已开 {int(opens_today)} + 待触发 {pending} / 硬上限 {int(hard_limit)})", ) return True, "" def trigger_entry_gate_preview( *, entry_display: str, take_profit_display: str, created_at: Any = None, now: Optional[datetime] = None, expired: bool = False, tp_invalidated: bool = False, hours: int = TRIGGER_ENTRY_VALIDITY_HOURS, ) -> dict[str, Any]: now_dt = now or datetime.now() is_exp = expired or is_trigger_entry_expired(created_at, now_dt, hours=hours) exp_txt = trigger_entry_expires_at_text(created_at, hours=hours) if tp_invalidated: status = "止盈侧失效" elif is_exp: status = "已过期" else: status = "触价待触发" metrics_parts: list[str] = [f"TP:{take_profit_display}"] if exp_txt != "—": metrics_parts.append(f"截至:{exp_txt}") return { "summary": f"触价 E={entry_display} {status}", "metrics": " ".join(metrics_parts), "gate_ok": not is_exp and not tp_invalidated, }