"""止盈止损守护:检测持仓快照,自动/手动向 CTP 挂平仓限价委托。""" from __future__ import annotations import logging import threading import time from typing import Any, Callable, Optional from contract_specs import get_contract_spec from symbols import ths_to_vnpy_symbol from vnpy_bridge import ( ctp_list_active_orders, ctp_list_positions, ctp_status, execute_order, ) logger = logging.getLogger(__name__) CHECK_INTERVAL_SEC = 20 MONITOR_ORDER_COLUMNS = ( "ALTER TABLE trade_order_monitors ADD COLUMN sl_vt_order_id TEXT", "ALTER TABLE trade_order_monitors ADD COLUMN tp_vt_order_id TEXT", ) def ensure_monitor_order_columns(conn) -> None: for sql in MONITOR_ORDER_COLUMNS: try: conn.execute(sql) except Exception: pass def _tick_size(ths_code: str) -> float: return float(get_contract_spec(ths_code).get("tick_size") or 1.0) def _match_symbol(ctp_sym: str, ths: str) -> bool: a = (ctp_sym or "").lower() b = (ths or "").lower() if a == b: return True try: vnpy_sym, _ = ths_to_vnpy_symbol(ths) return a == vnpy_sym.lower() except Exception: return False def _close_order_direction(hold_direction: str) -> str: return "short" if hold_direction == "long" else "long" def _price_near(a: float, b: float, tick: float) -> bool: return abs(float(a) - float(b)) <= max(tick * 0.501, 1e-9) def _find_close_order( active_orders: list[dict], *, ths_code: str, hold_direction: str, price: float, tick: float, ) -> Optional[dict]: close_dir = _close_order_direction(hold_direction) for o in active_orders: sym = o.get("symbol") or "" if not _match_symbol(sym, ths_code): continue offset_s = (o.get("offset") or "").upper() if "CLOSE" not in offset_s: continue if (o.get("direction") or "") != close_dir: continue if not _price_near(o.get("price") or 0, price, tick): continue return o return None def _find_position(positions: list[dict], ths_code: str, direction: str) -> Optional[dict]: for p in positions: if int(p.get("lots") or 0) <= 0: continue if (p.get("direction") or "long") != direction: continue if _match_symbol(p.get("symbol") or "", ths_code): return p return None def _order_still_active(active_orders: list[dict], vt_order_id: str) -> bool: if not vt_order_id: return False oid = str(vt_order_id).strip() for o in active_orders: if str(o.get("order_id") or "") == oid: return True return False def place_monitor_exit_orders( conn, mon: dict, *, mode: str, force: bool = False, ) -> dict[str, Any]: """按开仓快照中的止损/止盈价,向 CTP 挂平仓限价单(缺则补)。""" ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return {"ok": False, "error": "CTP 未连接", "placed": []} sym = (mon.get("symbol") or "").strip() direction = (mon.get("direction") or "long").strip().lower() sl = mon.get("stop_loss") tp = mon.get("take_profit") try: sl_f = float(sl) if sl is not None else None tp_f = float(tp) if tp is not None else None except (TypeError, ValueError): sl_f, tp_f = None, None if sl_f is None and tp_f is None: return {"ok": False, "error": "快照无止盈止损,无法委托", "placed": []} positions = ctp_list_positions(mode) pos = _find_position(positions, sym, direction) if not pos: return {"ok": False, "error": "柜台无对应持仓", "placed": []} lots = int(pos.get("lots") or mon.get("lots") or 1) active = ctp_list_active_orders(mode) tick = _tick_size(sym) offset = "close_long" if direction == "long" else "close_short" placed: list[str] = [] updates: dict[str, Optional[str]] = {} def _maybe_place(kind: str, price: Optional[float], stored_id: str) -> None: if price is None or price <= 0: return existing = _find_close_order( active, ths_code=sym, hold_direction=direction, price=price, tick=tick, ) if existing: updates[f"{kind}_vt_order_id"] = str(existing.get("order_id") or stored_id or "") return if stored_id and _order_still_active(active, stored_id) and not force: return result = execute_order( conn, mode=mode, offset=offset, symbol=sym, direction=direction, lots=lots, price=price, order_type="limit", ) oid = str(result.get("order_id") or "") updates[f"{kind}_vt_order_id"] = oid placed.append(f"{kind}@{price}") sl_id = str(mon.get("sl_vt_order_id") or "") tp_id = str(mon.get("tp_vt_order_id") or "") _maybe_place("sl", sl_f, sl_id) _maybe_place("tp", tp_f, tp_id) if updates: sl_new = updates.get("sl_vt_order_id", mon.get("sl_vt_order_id")) tp_new = updates.get("tp_vt_order_id", mon.get("tp_vt_order_id")) conn.execute( "UPDATE trade_order_monitors SET sl_vt_order_id=?, tp_vt_order_id=? WHERE id=?", (sl_new, tp_new, mon["id"]), ) conn.commit() if not placed and not updates: return {"ok": True, "message": "无需新委托", "placed": []} msg = "已提交: " + ", ".join(placed) if placed else "委托已在柜台" return {"ok": True, "message": msg, "placed": placed} def monitor_order_status( mon: dict, *, mode: str, ths_code: str, direction: str, ) -> dict[str, bool]: """检查快照价位是否已有对应平仓挂单。""" sl = mon.get("stop_loss") if mon else None tp = mon.get("take_profit") if mon else None try: sl_f = float(sl) if sl is not None else None tp_f = float(tp) if tp is not None else None except (TypeError, ValueError): sl_f, tp_f = None, None if not ctp_status(mode).get("connected"): return { "sl_order_active": False, "tp_order_active": False, "needs_sl_order": sl_f is not None, "needs_tp_order": tp_f is not None, } active = ctp_list_active_orders(mode) tick = _tick_size(ths_code) sl_active = False tp_active = False if sl_f is not None: sl_active = _find_close_order( active, ths_code=ths_code, hold_direction=direction, price=sl_f, tick=tick, ) is not None if tp_f is not None: tp_active = _find_close_order( active, ths_code=ths_code, hold_direction=direction, price=tp_f, tick=tick, ) is not None return { "sl_order_active": sl_active, "tp_order_active": tp_active, "needs_sl_order": sl_f is not None and not sl_active, "needs_tp_order": tp_f is not None and not tp_active, } def sync_all_sl_tp_orders(conn, mode: str) -> int: """扫描全部 active 监控,为缺失的止盈止损自动挂单。返回新挂单数。""" ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return 0 placed_n = 0 rows = conn.execute( "SELECT * FROM trade_order_monitors WHERE status='active'" ).fetchall() for r in rows: mon = dict(r) st = monitor_order_status( mon, mode=mode, ths_code=mon.get("symbol") or "", direction=mon.get("direction") or "long", ) if not st.get("needs_sl_order") and not st.get("needs_tp_order"): continue if mon.get("stop_loss") is None and mon.get("take_profit") is None: continue try: res = place_monitor_exit_orders(conn, mon, mode=mode, force=False) placed_n += len(res.get("placed") or []) except Exception as exc: logger.warning("SL/TP auto place failed monitor=%s: %s", mon.get("id"), exc) return placed_n def start_sl_tp_guard_worker( *, db_path: str, get_mode_fn: Callable[[], str], init_tables_fn: Callable | None = None, interval: int = CHECK_INTERVAL_SEC, ) -> None: from db_conn import connect_db def _loop() -> None: time.sleep(8) while True: try: mode = get_mode_fn() if ctp_status(mode).get("connected"): conn = connect_db(db_path) try: if init_tables_fn: init_tables_fn(conn) n = sync_all_sl_tp_orders(conn, mode) if n: logger.info("止盈止损守护: 新挂 %d 笔委托", n) finally: conn.close() except Exception as exc: logger.warning("sl_tp_guard worker: %s", exc) time.sleep(max(10, interval)) threading.Thread(target=_loop, daemon=True, name="sl-tp-guard").start()