From 50bb04e2bb2fd26b3ec8fd2c4fa89efb95edaa93 Mon Sep 17 00:00:00 2001 From: dekun Date: Fri, 3 Jul 2026 09:12:27 +0800 Subject: [PATCH] Fix stop-loss close loop spamming WeChat and blocking manual close. Throttle close retries, skip monitor revive while pending, and dedupe notifications when CTP already has a close order. Co-authored-by: Cursor --- modules/trading/install.py | 14 +++++ modules/trading/sl_tp_guard.py | 101 +++++++++++++++++++++++++++++++-- 2 files changed, 109 insertions(+), 6 deletions(-) diff --git a/modules/trading/install.py b/modules/trading/install.py index 3487c92..525175d 100644 --- a/modules/trading/install.py +++ b/modules/trading/install.py @@ -58,11 +58,14 @@ from modules.trading.order_pending import ( from modules.core.db_conn import commit_retry, execute_retry from modules.trading.sl_tp_guard import ( cancel_monitor_exit_orders, + close_pending_active, ensure_monitor_order_columns, + mark_close_pending, monitor_order_status, monitor_source_label, place_monitor_exit_orders, reconcile_monitors_without_position, + should_skip_monitor_revive, start_sl_tp_guard_worker, write_manual_close_trade_log, ) @@ -1195,6 +1198,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _revive_closed_monitor(conn, symbol: str, direction: str) -> Optional[dict]: """柜台仍有持仓但本地监控被误关时,恢复最近一条同品种记录。""" + if should_skip_monitor_revive(symbol, direction): + return None direction = (direction or "long").strip().lower() for r in conn.execute( "SELECT * FROM trade_order_monitors WHERE status='closed' ORDER BY id DESC LIMIT 40" @@ -3215,6 +3220,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if not sym or price <= 0: conn.close() return jsonify({"ok": False, "error": "品种或价格无效"}), 400 + if close_pending_active(sym, direction): + conn.close() + return jsonify({"ok": False, "error": "平仓处理中,请稍候查看柜台委托"}), 400 + from modules.trading.sl_tp_guard import _has_pending_close_order + if _has_pending_close_order(mode, sym, direction): + mark_close_pending(sym, direction) + conn.close() + return jsonify({"ok": False, "error": "已有平仓委托在柜台排队,请勿重复提交"}), 400 offset = "close_long" if direction == "long" else "close_short" capital = _capital(conn) mon = None @@ -3253,6 +3266,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se lots=lots, price=price, settings=_settings_dict(), order_type="market", ) + mark_close_pending(sym, direction) # 始终写本地记录:CTP 同步依赖内存开平配对,重启后或成交回报延迟时会漏记 write_manual_close_trade_log( conn, diff --git a/modules/trading/sl_tp_guard.py b/modules/trading/sl_tp_guard.py index d475a4a..d3e1dba 100644 --- a/modules/trading/sl_tp_guard.py +++ b/modules/trading/sl_tp_guard.py @@ -36,11 +36,15 @@ TZ = ZoneInfo("Asia/Shanghai") CHECK_INTERVAL_SEC = 1 CLOSED_MARKET_SLEEP_SEC = 30 DISCONNECTED_SLEEP_SEC = 5 -PLACE_COOLDOWN_SEC = 3 +PLACE_COOLDOWN_SEC = 60 +CLOSE_PENDING_SEC = 180 +CLOSE_NOTIFY_COOLDOWN_SEC = 120 _last_close_attempt: dict[int, float] = {} _closing_monitors: set[int] = set() _closing_symbol_keys: set[str] = set() +_close_pending_until: dict[str, float] = {} +_last_close_notify: dict[str, float] = {} _closing_lock = threading.Lock() MONITOR_ORDER_COLUMNS = ( @@ -177,6 +181,76 @@ def _position_key(sym: str, direction: str) -> str: return f"{(sym or '').strip().lower()}|{(direction or 'long').strip().lower()}" +def mark_close_pending(sym: str, direction: str, *, secs: int = CLOSE_PENDING_SEC) -> None: + key = _position_key(sym, direction) + with _closing_lock: + _close_pending_until[key] = time.time() + max(30, int(secs)) + + +def clear_close_pending(sym: str, direction: str) -> None: + key = _position_key(sym, direction) + with _closing_lock: + _close_pending_until.pop(key, None) + + +def close_pending_active(sym: str, direction: str) -> bool: + key = _position_key(sym, direction) + with _closing_lock: + until = float(_close_pending_until.get(key) or 0) + if until > time.time(): + return True + if until: + _close_pending_until.pop(key, None) + return False + + +def should_skip_monitor_revive(sym: str, direction: str) -> bool: + return close_pending_active(sym, direction) + + +def _has_pending_close_order(mode: str, sym: str, hold_direction: str) -> bool: + close_dir = _close_order_direction(hold_direction) + try: + active = ctp_list_active_orders(mode) + except Exception: + return False + for o in active: + if not _match_symbol(o.get("symbol") or "", sym): + continue + offset_s = (o.get("offset") or "").upper() + if "CLOSE" not in offset_s: + continue + if (o.get("direction") or "").strip().lower() != close_dir: + continue + if int(o.get("lots") or 0) <= 0: + continue + return True + return False + + +def _notify_close_submitted( + notify_fn: Callable[[str], None] | None, + *, + sym: str, + direction: str, + lots: int, + mark: float, + result_label: str, +) -> None: + if not notify_fn: + return + key = _position_key(sym, direction) + now = time.time() + with _closing_lock: + if now - float(_last_close_notify.get(key) or 0) < CLOSE_NOTIFY_COOLDOWN_SEC: + return + _last_close_notify[key] = now + try: + notify_fn(f"{result_label} {sym} {direction} {lots}手 @{mark},平仓委托已提交") + except Exception as exc: + logger.debug("SL/TP notify failed: %s", exc) + + def _try_acquire_close_symbol(sym: str, direction: str) -> bool: key = _position_key(sym, direction) with _closing_lock: @@ -742,6 +816,8 @@ def _execute_local_close( ) -> None: sym = (mon.get("symbol") or "").strip() direction = (mon.get("direction") or "long").strip().lower() + if close_pending_active(sym, direction): + return positions = ctp_list_positions(mode) pos = _find_position(positions, sym, direction) if not pos: @@ -753,9 +829,14 @@ def _execute_local_close( float(margin_raw), ) return + clear_close_pending(sym, direction) _close_all_monitors_for_symbol(conn, sym, direction) reconcile_monitors_without_position(conn, mode) return + if _has_pending_close_order(mode, sym, direction): + mark_close_pending(sym, direction) + cancel_monitor_exit_orders(conn, mon, mode=mode) + return lots = int(pos.get("lots") or mon.get("lots") or 1) offset = "close_long" if direction == "long" else "close_short" cancel_monitor_exit_orders(conn, mon, mode=mode) @@ -769,6 +850,7 @@ def _execute_local_close( price=mark, order_type="market", ) + mark_close_pending(sym, direction) _close_all_monitors_for_symbol(conn, sym, direction) conn.commit() result_label = _result_for_close(mon, reason) @@ -776,11 +858,14 @@ def _execute_local_close( "止盈止损本地触发 monitor=%s result=%s %s %s %d手 @%s(待 CTP 成交同步写入交易记录)", mon.get("id"), result_label, sym, direction, lots, mark, ) - if notify_fn: - try: - notify_fn(f"{result_label} {sym} {direction} {lots}手 @{mark},平仓委托已提交") - except Exception as exc: - logger.debug("SL/TP notify failed: %s", exc) + _notify_close_submitted( + notify_fn, + sym=sym, + direction=direction, + lots=lots, + mark=mark, + result_label=result_label, + ) def check_sl_tp_on_tick( @@ -849,6 +934,8 @@ def check_sl_tp_on_tick( reason = "stop_loss" if not reason: continue + if close_pending_active(ms, direction): + continue if mid > 0 and not _can_close_now(mid): continue if not _try_acquire_close_symbol(ms, direction): @@ -931,6 +1018,8 @@ def check_monitors_locally( if not reason: continue + if close_pending_active(sym, direction): + continue if mid > 0 and not _can_close_now(mid): continue if not _try_acquire_close_symbol(sym, direction):