"""止盈止损守护:检测持仓快照,自动/手动向 CTP 挂平仓限价委托。""" from __future__ import annotations import logging import threading import time from typing import Any, Callable, Optional from contract_specs import get_contract_spec from ctp_symbol import ths_to_vnpy_symbol from vnpy_bridge import ( ctp_cancel_order, ctp_get_tick_price, ctp_list_active_orders, ctp_list_positions, ctp_status, execute_order, ) logger = logging.getLogger(__name__) CHECK_INTERVAL_SEC = 20 PLACE_COOLDOWN_SEC = 120 _last_place_attempt: dict[tuple[int, str], float] = {} MONITOR_ORDER_COLUMNS = ( "ALTER TABLE trade_order_monitors ADD COLUMN sl_vt_order_id TEXT", "ALTER TABLE trade_order_monitors ADD COLUMN tp_vt_order_id TEXT", ) def ensure_monitor_order_columns(conn) -> None: for sql in MONITOR_ORDER_COLUMNS: try: conn.execute(sql) except Exception: pass def _tick_size(ths_code: str) -> float: return float(get_contract_spec(ths_code).get("tick_size") or 1.0) def _match_symbol(ctp_sym: str, ths: str) -> bool: a = (ctp_sym or "").lower() b = (ths or "").lower() if a == b: return True try: vnpy_sym, _ = ths_to_vnpy_symbol(ths) return a == vnpy_sym.lower() except Exception: return False def _close_order_direction(hold_direction: str) -> str: return "short" if hold_direction == "long" else "long" def _price_near(a: float, b: float, tick: float) -> bool: return abs(float(a) - float(b)) <= max(tick * 0.501, 1e-9) def _is_resting_exit_price( hold_direction: str, kind: str, exit_price: float, mark: Optional[float], tick: float, ) -> bool: """限价平仓单是否会挂在盘口(而非立即成交)。""" if mark is None or mark <= 0: return True buf = max(tick * 0.5, 1e-9) if hold_direction == "long": if kind == "sl": return exit_price < mark - buf return exit_price > mark + buf if kind == "sl": return exit_price > mark + buf return exit_price < mark - buf def _find_close_order( active_orders: list[dict], *, ths_code: str, hold_direction: str, price: float, tick: float, ) -> Optional[dict]: close_dir = _close_order_direction(hold_direction) for o in active_orders: sym = o.get("symbol") or "" if not _match_symbol(sym, ths_code): continue offset_s = (o.get("offset") or "").upper() if "CLOSE" not in offset_s: continue if (o.get("direction") or "") != close_dir: continue if not _price_near(o.get("price") or 0, price, tick): continue return o return None def _find_position(positions: list[dict], ths_code: str, direction: str) -> Optional[dict]: for p in positions: if int(p.get("lots") or 0) <= 0: continue if (p.get("direction") or "long") != direction: continue if _match_symbol(p.get("symbol") or "", ths_code): return p return None def _can_place_now(monitor_id: int, kind: str, *, cooldown: int = PLACE_COOLDOWN_SEC) -> bool: last = _last_place_attempt.get((monitor_id, kind), 0.0) return (time.time() - last) >= cooldown def _mark_place_attempt(monitor_id: int, kind: str) -> None: _last_place_attempt[(monitor_id, kind)] = time.time() def _order_still_active(active_orders: list[dict], vt_order_id: str) -> bool: if not vt_order_id: return False oid = str(vt_order_id).strip() for o in active_orders: if str(o.get("order_id") or "") == oid: return True return False def cancel_monitor_exit_orders( conn, mon: dict, *, mode: str, ) -> int: """撤销该监控对应的止盈止损平仓挂单。""" ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return 0 sym = (mon.get("symbol") or "").strip() direction = (mon.get("direction") or "long").strip().lower() tick = _tick_size(sym) active = ctp_list_active_orders(mode) cancelled = 0 seen: set[str] = set() def _try_cancel(vt_id: str) -> None: nonlocal cancelled oid = str(vt_id or "").strip() if not oid or oid in seen: return seen.add(oid) if ctp_cancel_order(mode, oid): cancelled += 1 for kind, price_key in (("sl", "stop_loss"), ("tp", "take_profit")): raw = mon.get(price_key) try: px = float(raw) if raw is not None else None except (TypeError, ValueError): px = None stored = str(mon.get(f"{kind}_vt_order_id") or "") if stored: _try_cancel(stored) if px is not None: found = _find_close_order( active, ths_code=sym, hold_direction=direction, price=px, tick=tick, ) if found: _try_cancel(str(found.get("order_id") or "")) if cancelled: conn.execute( "UPDATE trade_order_monitors SET sl_vt_order_id=NULL, tp_vt_order_id=NULL WHERE id=?", (mon["id"],), ) conn.commit() return cancelled def reconcile_monitors_without_position(conn, mode: str) -> int: """持仓已平时:关闭监控并撤销残留止盈止损挂单。""" if not ctp_status(mode).get("connected"): return 0 positions = ctp_list_positions(mode) position_keys: set[tuple[str, str]] = set() for p in positions: if int(p.get("lots") or 0) <= 0: continue sym = (p.get("symbol") or "").lower() direction = p.get("direction") or "long" position_keys.add((sym, direction)) closed = 0 for r in conn.execute("SELECT * FROM trade_order_monitors WHERE status='active'").fetchall(): mon = dict(r) ms = mon.get("symbol") or "" md = mon.get("direction") or "long" matched = False for ps, pd in position_keys: if pd != md: continue if _match_symbol(ps, ms): matched = True break if matched: continue try: cancel_monitor_exit_orders(conn, mon, mode=mode) except Exception as exc: logger.warning("cancel exit orders monitor=%s: %s", mon.get("id"), exc) conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mon["id"],)) closed += 1 if closed: conn.commit() return closed def place_monitor_exit_orders( conn, mon: dict, *, mode: str, force: bool = False, ) -> dict[str, Any]: """按开仓快照中的止损/止盈价,向 CTP 挂平仓限价单(缺则补)。""" ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return {"ok": False, "error": "CTP 未连接", "placed": []} sym = (mon.get("symbol") or "").strip() direction = (mon.get("direction") or "long").strip().lower() sl = mon.get("stop_loss") tp = mon.get("take_profit") try: sl_f = float(sl) if sl is not None else None tp_f = float(tp) if tp is not None else None except (TypeError, ValueError): sl_f, tp_f = None, None if sl_f is None and tp_f is None: return {"ok": False, "error": "快照无止盈止损,无法委托", "placed": []} positions = ctp_list_positions(mode) pos = _find_position(positions, sym, direction) if not pos: reconcile_monitors_without_position(conn, mode) return {"ok": False, "error": "柜台无对应持仓(可能已被止盈/止损平掉)", "placed": []} lots = int(pos.get("lots") or 1) if lots != int(mon.get("lots") or 0): conn.execute("UPDATE trade_order_monitors SET lots=? WHERE id=?", (lots, mon["id"])) conn.commit() mark = ctp_get_tick_price(mode, sym) active = ctp_list_active_orders(mode) tick = _tick_size(sym) offset = "close_long" if direction == "long" else "close_short" placed: list[str] = [] skipped: list[str] = [] updates: dict[str, Optional[str]] = {} mid = int(mon.get("id") or 0) def _maybe_place(kind: str, price: Optional[float], stored_id: str) -> None: if price is None or price <= 0: return existing = _find_close_order( active, ths_code=sym, hold_direction=direction, price=price, tick=tick, ) if existing: updates[f"{kind}_vt_order_id"] = str(existing.get("order_id") or stored_id or "") return if stored_id and _order_still_active(active, stored_id) and not force: return if mid > 0 and not force and not _can_place_now(mid, kind): return if not _is_resting_exit_price(direction, kind, price, mark, tick): hint = f"{'止损' if kind == 'sl' else '止盈'} {price}" if mark: hint += f"(现价 {mark} 会立即成交)" skipped.append(hint) if not force: logger.info("SL/TP skip immediate fill monitor=%s %s mark=%s", mid, kind, mark) return try: _mark_place_attempt(mid, kind) result = execute_order( conn, mode=mode, offset=offset, symbol=sym, direction=direction, lots=lots, price=price, order_type="limit", ) except Exception as exc: logger.warning("SL/TP place %s monitor=%s failed: %s", kind, mid, exc) return oid = str(result.get("order_id") or "") if oid: updates[f"{kind}_vt_order_id"] = oid placed.append(f"{kind}@{price}") time.sleep(0.3) positions_after = ctp_list_positions(mode) if not _find_position(positions_after, sym, direction): cancel_monitor_exit_orders(conn, mon, mode=mode) reconcile_monitors_without_position(conn, mode) return sl_id = str(mon.get("sl_vt_order_id") or "") tp_id = str(mon.get("tp_vt_order_id") or "") _maybe_place("sl", sl_f, sl_id) if _find_position(ctp_list_positions(mode), sym, direction): _maybe_place("tp", tp_f, tp_id) if updates: sl_new = updates.get("sl_vt_order_id", mon.get("sl_vt_order_id")) tp_new = updates.get("tp_vt_order_id", mon.get("tp_vt_order_id")) conn.execute( "UPDATE trade_order_monitors SET sl_vt_order_id=?, tp_vt_order_id=? WHERE id=?", (sl_new, tp_new, mon["id"]), ) conn.commit() if not placed and not updates and not skipped: return {"ok": True, "message": "无需新委托", "placed": []} msg_parts = [] if placed: msg_parts.append("已提交: " + ", ".join(placed)) elif updates: msg_parts.append("委托已在柜台") if skipped: msg_parts.append("未挂单: " + "; ".join(skipped)) return {"ok": True, "message": ";".join(msg_parts), "placed": placed, "skipped": skipped} def monitor_order_status( mon: dict, *, mode: str, ths_code: str, direction: str, ) -> dict[str, bool]: """检查快照价位是否已有对应平仓挂单。""" sl = mon.get("stop_loss") if mon else None tp = mon.get("take_profit") if mon else None try: sl_f = float(sl) if sl is not None else None tp_f = float(tp) if tp is not None else None except (TypeError, ValueError): sl_f, tp_f = None, None if not ctp_status(mode).get("connected"): return { "sl_order_active": False, "tp_order_active": False, "needs_sl_order": sl_f is not None, "needs_tp_order": tp_f is not None, } active = ctp_list_active_orders(mode) tick = _tick_size(ths_code) sl_active = False tp_active = False if sl_f is not None: sl_active = _find_close_order( active, ths_code=ths_code, hold_direction=direction, price=sl_f, tick=tick, ) is not None if tp_f is not None: tp_active = _find_close_order( active, ths_code=ths_code, hold_direction=direction, price=tp_f, tick=tick, ) is not None return { "sl_order_active": sl_active, "tp_order_active": tp_active, "needs_sl_order": sl_f is not None and not sl_active, "needs_tp_order": tp_f is not None and not tp_active, } def sync_all_sl_tp_orders(conn, mode: str) -> int: """扫描全部 active 监控,为缺失的止盈止损自动挂单。返回新挂单数。""" ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return 0 reconcile_monitors_without_position(conn, mode) placed_n = 0 rows = conn.execute( "SELECT * FROM trade_order_monitors WHERE status='active'" ).fetchall() for r in rows: mon = dict(r) st = monitor_order_status( mon, mode=mode, ths_code=mon.get("symbol") or "", direction=mon.get("direction") or "long", ) if not st.get("needs_sl_order") and not st.get("needs_tp_order"): continue if mon.get("stop_loss") is None and mon.get("take_profit") is None: continue try: res = place_monitor_exit_orders(conn, mon, mode=mode, force=False) placed_n += len(res.get("placed") or []) except Exception as exc: logger.warning("SL/TP auto place failed monitor=%s: %s", mon.get("id"), exc) return placed_n def start_sl_tp_guard_worker( *, db_path: str, get_mode_fn: Callable[[], str], init_tables_fn: Callable | None = None, interval: int = CHECK_INTERVAL_SEC, ) -> None: from db_conn import connect_db def _loop() -> None: time.sleep(8) while True: try: mode = get_mode_fn() if ctp_status(mode).get("connected"): conn = connect_db(db_path) try: if init_tables_fn: init_tables_fn(conn) reconcile_monitors_without_position(conn, mode) n = sync_all_sl_tp_orders(conn, mode) if n: logger.info("止盈止损守护: 新挂 %d 笔委托", n) finally: conn.close() except Exception as exc: logger.warning("sl_tp_guard worker: %s", exc) time.sleep(max(10, interval)) threading.Thread(target=_loop, daemon=True, name="sl-tp-guard").start()