diff --git a/install_trading.py b/install_trading.py index 5c9b73e..6469ff3 100644 --- a/install_trading.py +++ b/install_trading.py @@ -27,6 +27,7 @@ from sl_tp_guard import ( ensure_monitor_order_columns, monitor_order_status, place_monitor_exit_orders, + reconcile_monitors_without_position, start_sl_tp_guard_worker, sync_all_sl_tp_orders, ) @@ -158,18 +159,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return False def _sync_trade_monitors_with_ctp(conn, mode: str) -> int: - """关闭无对应 CTP 持仓的 active 监控(委托被拒或未成交的幽灵记录)。""" - if not ctp_status(mode).get("connected"): - return 0 - position_keys = _ctp_position_keys(mode) - closed = 0 - for r in conn.execute("SELECT * FROM trade_order_monitors WHERE status='active'").fetchall(): - mon = dict(r) - if _monitor_matches_ctp_position(mon, position_keys): - continue - conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mon["id"],)) - closed += 1 - return closed + """关闭无对应 CTP 持仓的监控,并撤销残留止盈止损挂单。""" + return reconcile_monitors_without_position(conn, mode) def _effective_active_position_count(conn, mode: str) -> int: if ctp_status(mode).get("connected"): diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 9b2a869..12362da 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -9,6 +9,8 @@ from typing import Any, Callable, Optional from contract_specs import get_contract_spec from ctp_symbol import ths_to_vnpy_symbol from vnpy_bridge import ( + ctp_cancel_order, + ctp_get_tick_price, ctp_list_active_orders, ctp_list_positions, ctp_status, @@ -60,6 +62,26 @@ def _price_near(a: float, b: float, tick: float) -> bool: return abs(float(a) - float(b)) <= max(tick * 0.501, 1e-9) +def _is_resting_exit_price( + hold_direction: str, + kind: str, + exit_price: float, + mark: Optional[float], + tick: float, +) -> bool: + """限价平仓单是否会挂在盘口(而非立即成交)。""" + if mark is None or mark <= 0: + return True + buf = max(tick * 0.5, 1e-9) + if hold_direction == "long": + if kind == "sl": + return exit_price < mark - buf + return exit_price > mark + buf + if kind == "sl": + return exit_price > mark + buf + return exit_price < mark - buf + + def _find_close_order( active_orders: list[dict], *, @@ -114,6 +136,95 @@ def _order_still_active(active_orders: list[dict], vt_order_id: str) -> bool: return False +def cancel_monitor_exit_orders( + conn, + mon: dict, + *, + mode: str, +) -> int: + """撤销该监控对应的止盈止损平仓挂单。""" + ensure_monitor_order_columns(conn) + if not ctp_status(mode).get("connected"): + return 0 + sym = (mon.get("symbol") or "").strip() + direction = (mon.get("direction") or "long").strip().lower() + tick = _tick_size(sym) + active = ctp_list_active_orders(mode) + cancelled = 0 + seen: set[str] = set() + + def _try_cancel(vt_id: str) -> None: + nonlocal cancelled + oid = str(vt_id or "").strip() + if not oid or oid in seen: + return + seen.add(oid) + if ctp_cancel_order(mode, oid): + cancelled += 1 + + for kind, price_key in (("sl", "stop_loss"), ("tp", "take_profit")): + raw = mon.get(price_key) + try: + px = float(raw) if raw is not None else None + except (TypeError, ValueError): + px = None + stored = str(mon.get(f"{kind}_vt_order_id") or "") + if stored: + _try_cancel(stored) + if px is not None: + found = _find_close_order( + active, ths_code=sym, hold_direction=direction, price=px, tick=tick, + ) + if found: + _try_cancel(str(found.get("order_id") or "")) + + if cancelled: + conn.execute( + "UPDATE trade_order_monitors SET sl_vt_order_id=NULL, tp_vt_order_id=NULL WHERE id=?", + (mon["id"],), + ) + conn.commit() + return cancelled + + +def reconcile_monitors_without_position(conn, mode: str) -> int: + """持仓已平时:关闭监控并撤销残留止盈止损挂单。""" + if not ctp_status(mode).get("connected"): + return 0 + positions = ctp_list_positions(mode) + position_keys: set[tuple[str, str]] = set() + for p in positions: + if int(p.get("lots") or 0) <= 0: + continue + sym = (p.get("symbol") or "").lower() + direction = p.get("direction") or "long" + position_keys.add((sym, direction)) + + closed = 0 + for r in conn.execute("SELECT * FROM trade_order_monitors WHERE status='active'").fetchall(): + mon = dict(r) + ms = mon.get("symbol") or "" + md = mon.get("direction") or "long" + matched = False + for ps, pd in position_keys: + if pd != md: + continue + if _match_symbol(ps, ms): + matched = True + break + if matched: + continue + try: + cancel_monitor_exit_orders(conn, mon, mode=mode) + except Exception as exc: + logger.warning("cancel exit orders monitor=%s: %s", mon.get("id"), exc) + conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mon["id"],)) + closed += 1 + if closed: + conn.commit() + return closed + + def place_monitor_exit_orders( conn, mon: dict, @@ -142,13 +253,20 @@ def place_monitor_exit_orders( positions = ctp_list_positions(mode) pos = _find_position(positions, sym, direction) if not pos: - return {"ok": False, "error": "柜台无对应持仓", "placed": []} + reconcile_monitors_without_position(conn, mode) + return {"ok": False, "error": "柜台无对应持仓(可能已被止盈/止损平掉)", "placed": []} - lots = int(pos.get("lots") or mon.get("lots") or 1) + lots = int(pos.get("lots") or 1) + if lots != int(mon.get("lots") or 0): + conn.execute("UPDATE trade_order_monitors SET lots=? WHERE id=?", (lots, mon["id"])) + conn.commit() + + mark = ctp_get_tick_price(mode, sym) active = ctp_list_active_orders(mode) tick = _tick_size(sym) offset = "close_long" if direction == "long" else "close_short" placed: list[str] = [] + skipped: list[str] = [] updates: dict[str, Optional[str]] = {} mid = int(mon.get("id") or 0) @@ -166,6 +284,14 @@ def place_monitor_exit_orders( return if mid > 0 and not force and not _can_place_now(mid, kind): return + if not _is_resting_exit_price(direction, kind, price, mark, tick): + hint = f"{'止损' if kind == 'sl' else '止盈'} {price}" + if mark: + hint += f"(现价 {mark} 会立即成交)" + skipped.append(hint) + if not force: + logger.info("SL/TP skip immediate fill monitor=%s %s mark=%s", mid, kind, mark) + return try: _mark_place_attempt(mid, kind) result = execute_order( @@ -185,11 +311,18 @@ def place_monitor_exit_orders( if oid: updates[f"{kind}_vt_order_id"] = oid placed.append(f"{kind}@{price}") + time.sleep(0.3) + positions_after = ctp_list_positions(mode) + if not _find_position(positions_after, sym, direction): + cancel_monitor_exit_orders(conn, mon, mode=mode) + reconcile_monitors_without_position(conn, mode) + return sl_id = str(mon.get("sl_vt_order_id") or "") tp_id = str(mon.get("tp_vt_order_id") or "") _maybe_place("sl", sl_f, sl_id) - _maybe_place("tp", tp_f, tp_id) + if _find_position(ctp_list_positions(mode), sym, direction): + _maybe_place("tp", tp_f, tp_id) if updates: sl_new = updates.get("sl_vt_order_id", mon.get("sl_vt_order_id")) @@ -200,10 +333,16 @@ def place_monitor_exit_orders( ) conn.commit() - if not placed and not updates: + if not placed and not updates and not skipped: return {"ok": True, "message": "无需新委托", "placed": []} - msg = "已提交: " + ", ".join(placed) if placed else "委托已在柜台" - return {"ok": True, "message": msg, "placed": placed} + msg_parts = [] + if placed: + msg_parts.append("已提交: " + ", ".join(placed)) + elif updates: + msg_parts.append("委托已在柜台") + if skipped: + msg_parts.append("未挂单: " + "; ".join(skipped)) + return {"ok": True, "message": ";".join(msg_parts), "placed": placed, "skipped": skipped} def monitor_order_status( @@ -256,6 +395,7 @@ def sync_all_sl_tp_orders(conn, mode: str) -> int: ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return 0 + reconcile_monitors_without_position(conn, mode) placed_n = 0 rows = conn.execute( "SELECT * FROM trade_order_monitors WHERE status='active'" @@ -296,6 +436,7 @@ def start_sl_tp_guard_worker( try: if init_tables_fn: init_tables_fn(conn) + reconcile_monitors_without_position(conn, mode) n = sync_all_sl_tp_orders(conn, mode) if n: logger.info("止盈止损守护: 新挂 %d 笔委托", n) diff --git a/static/js/trade.js b/static/js/trade.js index 0757d72..bd5c3bd 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -442,7 +442,9 @@ .then(function (r) { return r.json(); }) .then(function (d) { if (!d.ok) throw new Error(d.error || d.message || '委托失败'); - alert(d.message || '委托已提交'); + var msg = d.message || '委托已提交'; + if (d.skipped && d.skipped.length) msg += '\n' + d.skipped.join('\n'); + alert(msg); pollPositions(); }) .catch(function (e) { diff --git a/vnpy_bridge.py b/vnpy_bridge.py index b0d97f0..2c51f6a 100644 --- a/vnpy_bridge.py +++ b/vnpy_bridge.py @@ -831,6 +831,21 @@ class CtpBridge: raise RuntimeError("CTP 拒单或未返回委托号(请检查合约代码、价格是否为最小变动价位整数倍)") return str(vt_orderid) + def cancel_order(self, vt_orderid: str) -> bool: + if not self._engine or not vt_orderid: + return False + try: + order = self._engine.get_order(vt_orderid) + if order is None: + return False + req = order.create_cancel_request() + self._engine.cancel_order(req, GATEWAY_NAME) + logger.info("CTP 撤单 %s", vt_orderid) + return True + except Exception as exc: + logger.warning("CTP 撤单失败 %s: %s", vt_orderid, exc) + return False + def get_bridge() -> CtpBridge: global _bridge @@ -904,6 +919,12 @@ def ctp_list_active_orders(mode: str) -> list[dict[str, Any]]: return b.list_active_orders() +def ctp_cancel_order(mode: str, vt_orderid: str) -> bool: + b = get_bridge() + b.ensure_connected(mode) + return b.cancel_order(vt_orderid) + + def ctp_get_tick_price(mode: str, ths_code: str) -> Optional[float]: """CTP 柜台最新价(需已连接并订阅)。""" b = get_bridge()