"""Stop-loss / take-profit guard.

Price levels are monitored locally by this program; once a level is hit a
closing order is sent to CTP. No SL/TP limit orders are rested at the
exchange.
"""

from __future__ import annotations

import logging
import threading
import time
from typing import Any, Callable, Optional

from contract_specs import get_contract_spec
from ctp_symbol import ths_to_vnpy_symbol
from vnpy_bridge import (
    ctp_cancel_order,
    ctp_get_tick_price,
    ctp_list_active_orders,
    ctp_list_positions,
    ctp_status,
    execute_order,
)

logger = logging.getLogger(__name__)

# Default polling interval of the guard worker, in seconds.
CHECK_INTERVAL_SEC = 5
# Minimum number of seconds between two close attempts for the same monitor.
PLACE_COOLDOWN_SEC = 30
# monitor id -> time.time() of the most recent close attempt.
_last_close_attempt: dict[int, float] = {}

# Statements run by ensure_monitor_order_columns() to add the order-id columns.
MONITOR_ORDER_COLUMNS = (
    "ALTER TABLE trade_order_monitors ADD COLUMN sl_vt_order_id TEXT",
    "ALTER TABLE trade_order_monitors ADD COLUMN tp_vt_order_id TEXT",
)


def ensure_monitor_order_columns(conn) -> None:
    """Run every statement in MONITOR_ORDER_COLUMNS on a best-effort basis.

    Each statement is attempted independently. A failure (presumably because
    the column already exists -- confirm against the DB driver) never
    propagates; it is only reported at DEBUG level.
    """
    for sql in MONITOR_ORDER_COLUMNS:
        try:
            conn.execute(sql)
        except Exception as exc:
            logger.debug("ensure_monitor_order_columns skipped %r: %s", sql, exc)


def _to_float(value: Any) -> Optional[float]:
    """Return *value* as a float, or None when it is None or unparseable."""
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def _tick_size(ths_code: str) -> float:
    """Return the contract's ``tick_size`` from its spec, defaulting to 1.0."""
    return float(get_contract_spec(ths_code).get("tick_size") or 1.0)


def _match_symbol(ctp_sym: str, ths: str) -> bool:
    """Tell whether a CTP symbol and a THS code name the same contract.

    The comparison is case-insensitive. When the raw strings differ, *ths* is
    converted with ``ths_to_vnpy_symbol`` and compared again; any conversion
    error counts as "no match".
    """
    a = (ctp_sym or "").lower()
    b = (ths or "").lower()
    if a == b:
        return True
    try:
        vnpy_sym, _ = ths_to_vnpy_symbol(ths)
        return a == vnpy_sym.lower()
    except Exception:
        return False


def _close_order_direction(hold_direction: str) -> str:
    """Return the order direction that closes a position held in *hold_direction*."""
    return "short" if hold_direction == "long" else "long"


def _price_near(a: float, b: float, tick: float) -> bool:
    """Return True when *a* and *b* differ by at most ``0.501 * tick``."""
    return abs(float(a) - float(b)) <= max(tick * 0.501, 1e-9)


def _find_close_order(
    active_orders: list[dict],
    *,
    ths_code: str,
    hold_direction: str,
    price: float,
    tick: float,
) -> Optional[dict]:
    """Find the first active closing order matching symbol, side and price.

    An order matches when its symbol matches *ths_code*, its ``offset``
    contains ``CLOSE`` (case-insensitive), its ``direction`` is the closing
    side for *hold_direction* and its ``price`` is near *price*.

    Returns:
        The matching order dict, or None when there is none.
    """
    close_dir = _close_order_direction(hold_direction)
    for o in active_orders:
        sym = o.get("symbol") or ""
        if not _match_symbol(sym, ths_code):
            continue
        offset_s = (o.get("offset") or "").upper()
        if "CLOSE" not in offset_s:
            continue
        if (o.get("direction") or "") != close_dir:
            continue
        if not _price_near(o.get("price") or 0, price, tick):
            continue
        return o
    return None


def _find_position(
    positions: list[dict], ths_code: str, direction: str
) -> Optional[dict]:
    """Return the first position with lots > 0 for *ths_code* and *direction*.

    A position without a ``direction`` value is treated as ``"long"``.
    """
    for p in positions:
        if int(p.get("lots") or 0) <= 0:
            continue
        if (p.get("direction") or "long") != direction:
            continue
        if _match_symbol(p.get("symbol") or "", ths_code):
            return p
    return None


def _can_close_now(monitor_id: int, *, cooldown: int = PLACE_COOLDOWN_SEC) -> bool:
    """Return True when at least *cooldown* seconds passed since the last attempt."""
    last = _last_close_attempt.get(monitor_id, 0.0)
    return (time.time() - last) >= cooldown


def _mark_close_attempt(monitor_id: int) -> None:
    """Record the current time as the latest close attempt for *monitor_id*."""
    _last_close_attempt[monitor_id] = time.time()


def _sl_triggered(direction: str, sl: float, mark: float, tick: float) -> bool:
    """Tell whether *mark* has reached or passed the stop-loss level.

    A tolerance of ``max(tick * 0.01, 1e-9)`` is applied. Long positions
    trigger at or below the level, all other directions at or above it.
    """
    buf = max(tick * 0.01, 1e-9)
    if direction == "long":
        return mark <= sl + buf
    return mark >= sl - buf


def _tp_triggered(direction: str, tp: float, mark: float, tick: float) -> bool:
    """Tell whether *mark* has reached or passed the take-profit level.

    A tolerance of ``max(tick * 0.01, 1e-9)`` is applied. Long positions
    trigger at or above the level, all other directions at or below it.
    """
    buf = max(tick * 0.01, 1e-9)
    if direction == "long":
        return mark >= tp - buf
    return mark <= tp + buf


def cancel_monitor_exit_orders(
    conn,
    mon: dict,
    *,
    mode: str,
) -> int:
    """Cancel legacy SL/TP closing orders still resting for this monitor.

    For both the stop-loss and the take-profit, the stored order id (if any)
    is cancelled, and the active order list is searched for a closing order
    at the monitored price, which is cancelled as well. Each order id is
    attempted at most once.

    Args:
        conn: Database connection used to clear the stored order ids.
        mon: Monitor row as a dict.
        mode: Mode passed through to the CTP bridge functions.

    Returns:
        Number of orders for which ``ctp_cancel_order`` returned a truthy
        value; 0 when CTP is not connected.
    """
    ensure_monitor_order_columns(conn)
    if not ctp_status(mode).get("connected"):
        return 0

    sym = (mon.get("symbol") or "").strip()
    direction = (mon.get("direction") or "long").strip().lower()
    tick = _tick_size(sym)
    active = ctp_list_active_orders(mode)
    cancelled = 0
    seen: set[str] = set()

    def _try_cancel(vt_id: str) -> None:
        nonlocal cancelled
        oid = str(vt_id or "").strip()
        if not oid or oid in seen:
            return
        seen.add(oid)
        if ctp_cancel_order(mode, oid):
            cancelled += 1

    for kind, price_key in (("sl", "stop_loss"), ("tp", "take_profit")):
        px = _to_float(mon.get(price_key))
        stored = str(mon.get(f"{kind}_vt_order_id") or "")
        if stored:
            _try_cancel(stored)
        if px is not None:
            found = _find_close_order(
                active,
                ths_code=sym,
                hold_direction=direction,
                price=px,
                tick=tick,
            )
            if found:
                _try_cancel(str(found.get("order_id") or ""))

    if cancelled:
        conn.execute(
            "UPDATE trade_order_monitors SET sl_vt_order_id=NULL, tp_vt_order_id=NULL WHERE id=?",
            (mon["id"],),
        )
        conn.commit()
    return cancelled


def reconcile_monitors_without_position(conn, mode: str) -> int:
    """Close active monitors whose position is gone and cancel their orders.

    Every active monitor without a matching position (lots > 0, same
    direction, matching symbol) has its leftover exit orders cancelled and its
    status set to ``closed``. A failure while cancelling is logged and does
    not prevent the monitor from being closed.

    Returns:
        Number of monitors closed; 0 when CTP is not connected.
    """
    if not ctp_status(mode).get("connected"):
        return 0

    positions = ctp_list_positions(mode)
    position_keys: set[tuple[str, str]] = set()
    for p in positions:
        if int(p.get("lots") or 0) <= 0:
            continue
        sym = (p.get("symbol") or "").lower()
        direction = p.get("direction") or "long"
        position_keys.add((sym, direction))

    closed = 0
    for r in conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='active'"
    ).fetchall():
        mon = dict(r)
        ms = mon.get("symbol") or ""
        md = mon.get("direction") or "long"
        matched = False
        for ps, pd in position_keys:
            if pd != md:
                continue
            if _match_symbol(ps, ms):
                matched = True
                break
        if matched:
            continue
        try:
            cancel_monitor_exit_orders(conn, mon, mode=mode)
        except Exception as exc:
            logger.warning("cancel exit orders monitor=%s: %s", mon.get("id"), exc)
        conn.execute(
            "UPDATE trade_order_monitors SET status='closed' WHERE id=?",
            (mon["id"],),
        )
        closed += 1

    if closed:
        conn.commit()
    return closed


def _execute_local_close(
    conn,
    mon: dict,
    *,
    mode: str,
    mark: float,
    reason: str,
) -> None:
    """Close the monitored position with a market order and close the monitor.

    When the position can no longer be found, monitors are reconciled instead
    and no order is sent. Otherwise leftover exit orders are cancelled, a
    market closing order for the position's lots is submitted through
    ``execute_order`` and the monitor status is set to ``closed``.

    Args:
        conn: Database connection.
        mon: Monitor row as a dict.
        mode: Mode passed through to the CTP bridge functions.
        mark: Price that triggered the close; passed as the order price.
        reason: Trigger reason, used for logging only.
    """
    sym = (mon.get("symbol") or "").strip()
    direction = (mon.get("direction") or "long").strip().lower()
    positions = ctp_list_positions(mode)
    pos = _find_position(positions, sym, direction)
    if not pos:
        reconcile_monitors_without_position(conn, mode)
        return

    lots = int(pos.get("lots") or mon.get("lots") or 1)
    offset = "close_long" if direction == "long" else "close_short"
    cancel_monitor_exit_orders(conn, mon, mode=mode)
    execute_order(
        conn,
        mode=mode,
        offset=offset,
        symbol=sym,
        direction=direction,
        lots=lots,
        price=mark,
        order_type="market",
    )
    conn.execute(
        "UPDATE trade_order_monitors SET status='closed' WHERE id=?",
        (mon["id"],),
    )
    conn.commit()
    logger.info(
        "止盈止损本地触发 monitor=%s reason=%s %s %s %d手 @%s",
        mon.get("id"),
        reason,
        sym,
        direction,
        lots,
        mark,
    )


def check_monitors_locally(conn, mode: str) -> int:
    """Scan active monitors and close positions whose SL/TP level was hit.

    The latest tick price is compared against each monitor's take-profit and
    stop-loss levels (a price that gapped through a level also triggers).
    Take-profit is evaluated before stop-loss. The two levels are parsed
    independently, so a malformed value in one does not disable the other.

    Returns:
        Number of close attempts that completed without raising; 0 when CTP
        is not connected.
    """
    ensure_monitor_order_columns(conn)
    if not ctp_status(mode).get("connected"):
        return 0
    reconcile_monitors_without_position(conn, mode)

    closed = 0
    rows = conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='active'"
    ).fetchall()
    for r in rows:
        mon = dict(r)
        mid = int(mon.get("id") or 0)
        sym = (mon.get("symbol") or "").strip()
        direction = (mon.get("direction") or "long").strip().lower()

        # Clean up exchange-side exit orders recorded on the monitor row.
        if mon.get("sl_vt_order_id") or mon.get("tp_vt_order_id"):
            cancel_monitor_exit_orders(conn, mon, mode=mode)

        sl_f = _to_float(mon.get("stop_loss"))
        tp_f = _to_float(mon.get("take_profit"))
        if sl_f is None and tp_f is None:
            continue

        # Positions are re-queried for every monitor.
        positions = ctp_list_positions(mode)
        if not _find_position(positions, sym, direction):
            continue

        mark = ctp_get_tick_price(mode, sym)
        if mark is None or mark <= 0:
            continue

        tick = _tick_size(sym)
        reason = None
        if tp_f is not None and _tp_triggered(direction, tp_f, mark, tick):
            reason = "take_profit"
        elif sl_f is not None and _sl_triggered(direction, sl_f, mark, tick):
            reason = "stop_loss"
        if not reason:
            continue

        # Throttle repeated attempts for the same monitor.
        if mid > 0 and not _can_close_now(mid):
            continue

        try:
            # The attempt is recorded before the order is sent, so a failing
            # close is also subject to the cooldown.
            _mark_close_attempt(mid)
            _execute_local_close(conn, mon, mode=mode, mark=mark, reason=reason)
            closed += 1
        except Exception as exc:
            logger.warning("SL/TP local close failed monitor=%s: %s", mid, exc)
    return closed


def place_monitor_exit_orders(
    conn,
    mon: dict,
    *,
    mode: str,
    force: bool = False,
) -> dict[str, Any]:
    """Legacy-API shim: place no exchange orders, only cancel leftover ones.

    Args:
        conn: Database connection.
        mon: Monitor row as a dict.
        mode: Mode passed through to the CTP bridge functions.
        force: Accepted for compatibility and ignored.

    Returns:
        ``{"ok": False, "error": ..., "placed": []}`` when CTP is not
        connected; otherwise a dict with ``ok=True``, a ``message``, an empty
        ``placed`` list and ``local_monitor=True``.
    """
    del force
    ensure_monitor_order_columns(conn)
    if not ctp_status(mode).get("connected"):
        return {"ok": False, "error": "CTP 未连接", "placed": []}
    cancelled = cancel_monitor_exit_orders(conn, mon, mode=mode)
    msg = "程序本地监控中,不向交易所挂止盈止损单"
    if cancelled:
        msg += f";已撤销旧版柜台挂单 {cancelled} 笔"
    return {"ok": True, "message": msg, "placed": [], "local_monitor": True}


def monitor_order_status(
    mon: dict,
    *,
    mode: str,
    ths_code: str,
    direction: str,
) -> dict[str, bool]:
    """Report local monitoring state (not exchange order state).

    ``mode``, ``ths_code`` and ``direction`` are accepted for compatibility
    and ignored. A level counts as monitored when its value parses as a
    float; the two levels are evaluated independently.

    Returns:
        Dict with ``sl_order_active`` / ``sl_monitoring`` and
        ``tp_order_active`` / ``tp_monitoring`` flags, plus ``needs_sl_order``
        and ``needs_tp_order``, which are always False.
    """
    del mode, ths_code, direction
    sl_f = _to_float(mon.get("stop_loss") if mon else None)
    tp_f = _to_float(mon.get("take_profit") if mon else None)
    return {
        "sl_order_active": sl_f is not None,
        "tp_order_active": tp_f is not None,
        "sl_monitoring": sl_f is not None,
        "tp_monitoring": tp_f is not None,
        "needs_sl_order": False,
        "needs_tp_order": False,
    }


def sync_all_sl_tp_orders(conn, mode: str) -> int:
    """Compatibility stub for the legacy worker entry point.

    Performs no work and always returns 0.

    NOTE(review): the previous docstring said this runs the local monitor
    check, but the body never did; confirm whether legacy callers expect it
    to delegate to ``check_monitors_locally``.
    """
    del mode
    return 0


def start_sl_tp_guard_worker(
    *,
    db_path: str,
    get_mode_fn: Callable[[], str],
    init_tables_fn: Callable | None = None,
    interval: int = CHECK_INTERVAL_SEC,
) -> None:
    """Start the daemon thread that periodically runs the local SL/TP check.

    Args:
        db_path: Path handed to ``connect_db`` on every cycle.
        get_mode_fn: Callable returning the current mode.
        init_tables_fn: Optional callable invoked with the fresh connection
            before each check.
        interval: Seconds between cycles; values below 3 are raised to 3.
    """
    from db_conn import connect_db

    def _loop() -> None:
        # Initial delay before the first cycle.
        time.sleep(8)
        while True:
            try:
                mode = get_mode_fn()
                if ctp_status(mode).get("connected"):
                    conn = connect_db(db_path)
                    try:
                        if init_tables_fn:
                            init_tables_fn(conn)
                        n = check_monitors_locally(conn, mode)
                        if n:
                            logger.info("止盈止损本地监控: 触发平仓 %d 笔", n)
                    finally:
                        conn.close()
            except Exception as exc:
                # Keep the worker alive regardless of what a cycle raises.
                logger.warning("sl_tp_guard worker: %s", exc)
            time.sleep(max(3, interval))

    threading.Thread(target=_loop, daemon=True, name="sl-tp-guard").start()