Fix stop-loss close loop spamming WeChat and blocking manual close.

Throttle close retries, skip monitor revive while pending, and dedupe notifications when CTP already has a close order.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-03 09:12:27 +08:00
parent 6b2c7ade95
commit 50bb04e2bb
2 changed files with 109 additions and 6 deletions
+14
View File
@@ -58,11 +58,14 @@ from modules.trading.order_pending import (
from modules.core.db_conn import commit_retry, execute_retry from modules.core.db_conn import commit_retry, execute_retry
from modules.trading.sl_tp_guard import ( from modules.trading.sl_tp_guard import (
cancel_monitor_exit_orders, cancel_monitor_exit_orders,
close_pending_active,
ensure_monitor_order_columns, ensure_monitor_order_columns,
mark_close_pending,
monitor_order_status, monitor_order_status,
monitor_source_label, monitor_source_label,
place_monitor_exit_orders, place_monitor_exit_orders,
reconcile_monitors_without_position, reconcile_monitors_without_position,
should_skip_monitor_revive,
start_sl_tp_guard_worker, start_sl_tp_guard_worker,
write_manual_close_trade_log, write_manual_close_trade_log,
) )
@@ -1195,6 +1198,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
def _revive_closed_monitor(conn, symbol: str, direction: str) -> Optional[dict]: def _revive_closed_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
"""柜台仍有持仓但本地监控被误关时,恢复最近一条同品种记录。""" """柜台仍有持仓但本地监控被误关时,恢复最近一条同品种记录。"""
if should_skip_monitor_revive(symbol, direction):
return None
direction = (direction or "long").strip().lower() direction = (direction or "long").strip().lower()
for r in conn.execute( for r in conn.execute(
"SELECT * FROM trade_order_monitors WHERE status='closed' ORDER BY id DESC LIMIT 40" "SELECT * FROM trade_order_monitors WHERE status='closed' ORDER BY id DESC LIMIT 40"
@@ -3215,6 +3220,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
if not sym or price <= 0: if not sym or price <= 0:
conn.close() conn.close()
return jsonify({"ok": False, "error": "品种或价格无效"}), 400 return jsonify({"ok": False, "error": "品种或价格无效"}), 400
if close_pending_active(sym, direction):
conn.close()
return jsonify({"ok": False, "error": "平仓处理中,请稍候查看柜台委托"}), 400
from modules.trading.sl_tp_guard import _has_pending_close_order
if _has_pending_close_order(mode, sym, direction):
mark_close_pending(sym, direction)
conn.close()
return jsonify({"ok": False, "error": "已有平仓委托在柜台排队,请勿重复提交"}), 400
offset = "close_long" if direction == "long" else "close_short" offset = "close_long" if direction == "long" else "close_short"
capital = _capital(conn) capital = _capital(conn)
mon = None mon = None
@@ -3253,6 +3266,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
lots=lots, price=price, settings=_settings_dict(), lots=lots, price=price, settings=_settings_dict(),
order_type="market", order_type="market",
) )
mark_close_pending(sym, direction)
# 始终写本地记录:CTP 同步依赖内存开平配对,重启后或成交回报延迟时会漏记 # 始终写本地记录:CTP 同步依赖内存开平配对,重启后或成交回报延迟时会漏记
write_manual_close_trade_log( write_manual_close_trade_log(
conn, conn,
+95 -6
View File
@@ -36,11 +36,15 @@ TZ = ZoneInfo("Asia/Shanghai")
CHECK_INTERVAL_SEC = 1 CHECK_INTERVAL_SEC = 1
CLOSED_MARKET_SLEEP_SEC = 30 CLOSED_MARKET_SLEEP_SEC = 30
DISCONNECTED_SLEEP_SEC = 5 DISCONNECTED_SLEEP_SEC = 5
PLACE_COOLDOWN_SEC = 3 PLACE_COOLDOWN_SEC = 60
CLOSE_PENDING_SEC = 180
CLOSE_NOTIFY_COOLDOWN_SEC = 120
_last_close_attempt: dict[int, float] = {} _last_close_attempt: dict[int, float] = {}
_closing_monitors: set[int] = set() _closing_monitors: set[int] = set()
_closing_symbol_keys: set[str] = set() _closing_symbol_keys: set[str] = set()
_close_pending_until: dict[str, float] = {}
_last_close_notify: dict[str, float] = {}
_closing_lock = threading.Lock() _closing_lock = threading.Lock()
MONITOR_ORDER_COLUMNS = ( MONITOR_ORDER_COLUMNS = (
@@ -177,6 +181,76 @@ def _position_key(sym: str, direction: str) -> str:
return f"{(sym or '').strip().lower()}|{(direction or 'long').strip().lower()}" return f"{(sym or '').strip().lower()}|{(direction or 'long').strip().lower()}"
def mark_close_pending(sym: str, direction: str, *, secs: int = CLOSE_PENDING_SEC) -> None:
key = _position_key(sym, direction)
with _closing_lock:
_close_pending_until[key] = time.time() + max(30, int(secs))
def clear_close_pending(sym: str, direction: str) -> None:
key = _position_key(sym, direction)
with _closing_lock:
_close_pending_until.pop(key, None)
def close_pending_active(sym: str, direction: str) -> bool:
key = _position_key(sym, direction)
with _closing_lock:
until = float(_close_pending_until.get(key) or 0)
if until > time.time():
return True
if until:
_close_pending_until.pop(key, None)
return False
def should_skip_monitor_revive(sym: str, direction: str) -> bool:
return close_pending_active(sym, direction)
def _has_pending_close_order(mode: str, sym: str, hold_direction: str) -> bool:
close_dir = _close_order_direction(hold_direction)
try:
active = ctp_list_active_orders(mode)
except Exception:
return False
for o in active:
if not _match_symbol(o.get("symbol") or "", sym):
continue
offset_s = (o.get("offset") or "").upper()
if "CLOSE" not in offset_s:
continue
if (o.get("direction") or "").strip().lower() != close_dir:
continue
if int(o.get("lots") or 0) <= 0:
continue
return True
return False
def _notify_close_submitted(
notify_fn: Callable[[str], None] | None,
*,
sym: str,
direction: str,
lots: int,
mark: float,
result_label: str,
) -> None:
if not notify_fn:
return
key = _position_key(sym, direction)
now = time.time()
with _closing_lock:
if now - float(_last_close_notify.get(key) or 0) < CLOSE_NOTIFY_COOLDOWN_SEC:
return
_last_close_notify[key] = now
try:
notify_fn(f"{result_label} {sym} {direction} {lots}手 @{mark},平仓委托已提交")
except Exception as exc:
logger.debug("SL/TP notify failed: %s", exc)
def _try_acquire_close_symbol(sym: str, direction: str) -> bool: def _try_acquire_close_symbol(sym: str, direction: str) -> bool:
key = _position_key(sym, direction) key = _position_key(sym, direction)
with _closing_lock: with _closing_lock:
@@ -742,6 +816,8 @@ def _execute_local_close(
) -> None: ) -> None:
sym = (mon.get("symbol") or "").strip() sym = (mon.get("symbol") or "").strip()
direction = (mon.get("direction") or "long").strip().lower() direction = (mon.get("direction") or "long").strip().lower()
if close_pending_active(sym, direction):
return
positions = ctp_list_positions(mode) positions = ctp_list_positions(mode)
pos = _find_position(positions, sym, direction) pos = _find_position(positions, sym, direction)
if not pos: if not pos:
@@ -753,9 +829,14 @@ def _execute_local_close(
float(margin_raw), float(margin_raw),
) )
return return
clear_close_pending(sym, direction)
_close_all_monitors_for_symbol(conn, sym, direction) _close_all_monitors_for_symbol(conn, sym, direction)
reconcile_monitors_without_position(conn, mode) reconcile_monitors_without_position(conn, mode)
return return
if _has_pending_close_order(mode, sym, direction):
mark_close_pending(sym, direction)
cancel_monitor_exit_orders(conn, mon, mode=mode)
return
lots = int(pos.get("lots") or mon.get("lots") or 1) lots = int(pos.get("lots") or mon.get("lots") or 1)
offset = "close_long" if direction == "long" else "close_short" offset = "close_long" if direction == "long" else "close_short"
cancel_monitor_exit_orders(conn, mon, mode=mode) cancel_monitor_exit_orders(conn, mon, mode=mode)
@@ -769,6 +850,7 @@ def _execute_local_close(
price=mark, price=mark,
order_type="market", order_type="market",
) )
mark_close_pending(sym, direction)
_close_all_monitors_for_symbol(conn, sym, direction) _close_all_monitors_for_symbol(conn, sym, direction)
conn.commit() conn.commit()
result_label = _result_for_close(mon, reason) result_label = _result_for_close(mon, reason)
@@ -776,11 +858,14 @@ def _execute_local_close(
"止盈止损本地触发 monitor=%s result=%s %s %s %d手 @%s(待 CTP 成交同步写入交易记录)", "止盈止损本地触发 monitor=%s result=%s %s %s %d手 @%s(待 CTP 成交同步写入交易记录)",
mon.get("id"), result_label, sym, direction, lots, mark, mon.get("id"), result_label, sym, direction, lots, mark,
) )
if notify_fn: _notify_close_submitted(
try: notify_fn,
notify_fn(f"{result_label} {sym} {direction} {lots}手 @{mark},平仓委托已提交") sym=sym,
except Exception as exc: direction=direction,
logger.debug("SL/TP notify failed: %s", exc) lots=lots,
mark=mark,
result_label=result_label,
)
def check_sl_tp_on_tick( def check_sl_tp_on_tick(
@@ -849,6 +934,8 @@ def check_sl_tp_on_tick(
reason = "stop_loss" reason = "stop_loss"
if not reason: if not reason:
continue continue
if close_pending_active(ms, direction):
continue
if mid > 0 and not _can_close_now(mid): if mid > 0 and not _can_close_now(mid):
continue continue
if not _try_acquire_close_symbol(ms, direction): if not _try_acquire_close_symbol(ms, direction):
@@ -931,6 +1018,8 @@ def check_monitors_locally(
if not reason: if not reason:
continue continue
if close_pending_active(sym, direction):
continue
if mid > 0 and not _can_close_now(mid): if mid > 0 and not _can_close_now(mid):
continue continue
if not _try_acquire_close_symbol(sym, direction): if not _try_acquire_close_symbol(sym, direction):