# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """止盈止损守护:程序本地监控价位,触发后向 CTP 发平仓单(不向交易所挂 SL/TP 限价单)。""" from __future__ import annotations import logging import threading import time from datetime import datetime from typing import Any, Callable, Optional from zoneinfo import ZoneInfo from contract_specs import calc_position_metrics from ctp_symbol import ths_to_vnpy_symbol from fee_specs import calc_round_trip_fee from trade_log_lib import calc_equity_after, refresh_trade_log_equity_chain from market_sessions import is_trading_session from symbols import ths_to_codes from vnpy_bridge import ( ctp_cancel_order, ctp_get_tick_price, ctp_list_active_orders, ctp_list_positions, ctp_status, ctp_account_margin_used, execute_order, get_bridge, ) logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") CHECK_INTERVAL_SEC = 1 CLOSED_MARKET_SLEEP_SEC = 30 DISCONNECTED_SLEEP_SEC = 5 PLACE_COOLDOWN_SEC = 3 _last_close_attempt: dict[int, float] = {} _closing_monitors: set[int] = set() _closing_symbol_keys: set[str] = set() _closing_lock = threading.Lock() MONITOR_ORDER_COLUMNS = ( "ALTER TABLE trade_order_monitors ADD COLUMN sl_vt_order_id TEXT", "ALTER TABLE trade_order_monitors ADD COLUMN tp_vt_order_id TEXT", "ALTER TABLE trade_order_monitors ADD COLUMN trailing_be INTEGER DEFAULT 0", "ALTER TABLE trade_order_monitors ADD COLUMN initial_stop_loss REAL", "ALTER TABLE trade_order_monitors ADD COLUMN trailing_r_locked INTEGER DEFAULT 0", "ALTER TABLE trade_order_monitors ADD COLUMN margin REAL", "ALTER TABLE trade_order_monitors ADD COLUMN position_pct REAL", "ALTER TABLE trade_order_monitors ADD COLUMN mark_price REAL", "ALTER TABLE trade_order_monitors ADD COLUMN float_pnl REAL", "ALTER TABLE trade_order_monitors ADD COLUMN vt_order_id TEXT", "ALTER TABLE trade_order_monitors ADD COLUMN order_price REAL", "ALTER TABLE trade_order_monitors ADD COLUMN open_fee REAL", ) TRADE_RESULTS = ("止损", "止盈", "移动止盈", "保本止盈", "手动平仓") def ensure_monitor_order_columns(conn) -> None: for sql in MONITOR_ORDER_COLUMNS: try: conn.execute(sql) except Exception: pass def _tick_size(ths_code: str) -> float: from contract_specs import get_contract_spec return float(get_contract_spec(ths_code).get("tick_size") or 1.0) def _match_symbol(ctp_sym: str, ths: str) -> bool: a = (ctp_sym or "").lower() b = (ths or "").lower() if a == b: return True if a and b and a.split(".")[0] == b.split(".")[0]: return True try: vnpy_sym, _ = ths_to_vnpy_symbol(ths) if a == vnpy_sym.lower(): return True except Exception: pass try: vnpy_sym, _ = ths_to_vnpy_symbol(ctp_sym) if vnpy_sym.lower() == b.split(".")[0]: return True except Exception: pass return False def _close_order_direction(hold_direction: str) -> str: return "short" if hold_direction == "long" else "long" def _price_near(a: float, b: float, tick: float) -> bool: return abs(float(a) - float(b)) <= max(tick * 0.501, 1e-9) def _find_close_order( active_orders: list[dict], *, ths_code: str, hold_direction: str, price: float, tick: float, ) -> Optional[dict]: close_dir = _close_order_direction(hold_direction) for o in active_orders: sym = o.get("symbol") or "" if not _match_symbol(sym, ths_code): continue offset_s = (o.get("offset") or "").upper() if "CLOSE" not in offset_s: continue if (o.get("direction") or "") != close_dir: continue if not _price_near(o.get("price") or 0, price, tick): continue return o return None def _find_position(positions: list[dict], ths_code: str, direction: str) -> Optional[dict]: for p in positions: if int(p.get("lots") or 0) <= 0: continue if (p.get("direction") or "long") != direction: continue if _match_symbol(p.get("symbol") or "", ths_code): return p return None def _position_key(sym: str, direction: str) -> str: return f"{(sym or '').strip().lower()}|{(direction or 'long').strip().lower()}" def _try_acquire_close_symbol(sym: str, direction: str) -> bool: key = _position_key(sym, direction) with _closing_lock: if key in _closing_symbol_keys: return False _closing_symbol_keys.add(key) return True def _release_close_symbol(sym: str, direction: str) -> None: key = _position_key(sym, direction) with _closing_lock: _closing_symbol_keys.discard(key) def _close_all_monitors_for_symbol(conn, sym: str, direction: str) -> None: direction = (direction or "long").strip().lower() for r in conn.execute( "SELECT id, symbol, direction FROM trade_order_monitors WHERE status='active'" ).fetchall(): if (r["direction"] or "long") != direction: continue if _match_symbol(sym, r["symbol"] or ""): conn.execute( "UPDATE trade_order_monitors SET status='closed' WHERE id=?", (r["id"],), ) def _dedupe_active_monitors(conn) -> None: """同一品种方向只保留一条 active 监控,避免重复触发平仓。""" groups: dict[str, list[dict]] = {} for r in conn.execute( "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id ASC" ).fetchall(): row = dict(r) key = _position_key(row.get("symbol") or "", row.get("direction") or "long") groups.setdefault(key, []).append(row) for items in groups.values(): if len(items) <= 1: continue def _keep_score(m: dict) -> tuple: mt = (m.get("monitor_type") or "").lower() score = 0 if mt != "ctp_sync": score += 10 if m.get("stop_loss") is not None: score += 5 return (score, int(m.get("id") or 0)) items.sort(key=_keep_score, reverse=True) for dup in items[1:]: conn.execute( "UPDATE trade_order_monitors SET status='closed' WHERE id=?", (dup["id"],), ) def _can_close_now(monitor_id: int, *, cooldown: int = PLACE_COOLDOWN_SEC) -> bool: last = _last_close_attempt.get(monitor_id, 0.0) return (time.time() - last) >= cooldown def _mark_close_attempt(monitor_id: int) -> None: _last_close_attempt[monitor_id] = time.time() def _try_acquire_close(monitor_id: int) -> bool: with _closing_lock: if monitor_id in _closing_monitors: return False _closing_monitors.add(monitor_id) return True def _release_close(monitor_id: int) -> None: with _closing_lock: _closing_monitors.discard(monitor_id) def monitor_source_label(raw: str) -> str: """持仓展示用来源文案。""" mapping = { "manual": "期货下单", "trend": "趋势回调", "roll": "顺势加仓", "ctp_sync": "CTP 柜台", "箱体突破": "箱体突破", "收敛突破": "收敛突破", } key = (raw or "manual").strip().lower() return mapping.get(key, raw or "期货下单") def _result_for_close(mon: dict, reason: str) -> str: """平仓结果:止损 / 止盈 / 移动止盈 / 保本止盈 / 手动平仓。""" if reason == "manual": return "手动平仓" if reason == "take_profit": return "止盈" if not mon.get("trailing_be"): return "止损" locked = int(mon.get("trailing_r_locked") or 0) if locked >= 2: return "移动止盈" if locked >= 1: return "保本止盈" return "止损" def write_trade_log( conn, *, symbol: str, direction: str, entry_price: float, close_price: float, lots: float, result: str, trading_mode: str, stop_loss: Optional[float] = None, take_profit: Optional[float] = None, open_time: str = "", symbol_name: str = "", market_code: str = "", sina_code: str = "", monitor_type: str = "期货下单", capital: float = 0.0, ) -> None: """写入 trade_logs(程序平仓 / 手动平仓)。""" sym = (symbol or "").strip() direction = (direction or "long").strip().lower() entry = float(entry_price or close_price) sl = float(stop_loss) if stop_loss is not None else entry tp = float(take_profit) if take_profit is not None else entry close_time = datetime.now(TZ).strftime("%Y-%m-%dT%H:%M") if not sina_code or not market_code: codes = ths_to_codes(sym) or {} sina_code = sina_code or codes.get("sina_code") or "" market_code = market_code or codes.get("market_code") or "" if not symbol_name: symbol_name = sym metrics = calc_position_metrics( direction, entry, sl, tp, lots, close_price, capital, sym, ) pnl = metrics.get("float_pnl") or 0.0 fee = calc_round_trip_fee( sym, entry, close_price, lots, open_time, close_time, trading_mode=trading_mode, ) pnl_net = round(pnl - fee, 2) margin_pct = metrics.get("position_pct") equity_after = calc_equity_after(capital, pnl_net) try: from app import holding_to_minutes minutes = holding_to_minutes(open_time, close_time) except Exception: minutes = 0 conn.execute( """INSERT INTO trade_logs (symbol, symbol_name, market_code, sina_code, monitor_type, direction, entry_price, stop_loss, take_profit, close_price, lots, margin, margin_pct, holding_minutes, open_time, close_time, pnl, fee, pnl_net, equity_after, result) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", ( sym, symbol_name, market_code, sina_code, monitor_type, direction, entry, stop_loss if stop_loss is not None else sl, take_profit if take_profit is not None else tp, close_price, lots, metrics.get("margin"), margin_pct, minutes, open_time, close_time, pnl, fee, pnl_net, equity_after, result if result in TRADE_RESULTS else "手动平仓", ), ) try: refresh_trade_log_equity_chain(conn, capital if capital > 0 else None) except Exception as exc: logger.debug("equity chain refresh after trade log: %s", exc) try: from stats_engine import refresh_stats_cache refresh_stats_cache(conn, capital) except Exception as exc: logger.debug("stats refresh after close: %s", exc) try: from trade_notify import notify_trade_log_close from trading_context import trading_mode_label from app import get_setting, send_wechat_msg from ai_worker import schedule_ai_event_analysis from db_conn import DB_PATH notify_trade_log_close( send_wechat=send_wechat_msg, get_setting=get_setting, mode_label=trading_mode_label(get_setting), capital=capital, sym=sym, symbol_name=symbol_name, direction=direction, entry=entry, close_price=close_price, sl=stop_loss if stop_loss is not None else None, tp=take_profit if take_profit is not None else None, lots=lots, pnl_net=pnl_net, equity_after=equity_after, holding_minutes=minutes, result=result, monitor_type=monitor_type, schedule_ai_fn=schedule_ai_event_analysis, db_path=DB_PATH, ) except Exception as exc: logger.debug("close notify: %s", exc) def _write_trade_log( conn, mon: dict, *, close_price: float, reason: str, trading_mode: str, capital: float = 0.0, ) -> None: sym = (mon.get("symbol") or "").strip() sl_raw = mon.get("stop_loss") tp_raw = mon.get("take_profit") initial_sl = mon.get("initial_stop_loss") write_trade_log( conn, symbol=sym, direction=mon.get("direction") or "long", entry_price=float(mon.get("entry_price") or close_price), close_price=close_price, lots=float(mon.get("lots") or 1), result=_result_for_close(mon, reason), trading_mode=trading_mode, stop_loss=float(initial_sl) if initial_sl is not None else ( float(sl_raw) if sl_raw is not None else None ), take_profit=float(tp_raw) if tp_raw is not None else None, open_time=(mon.get("open_time") or "").strip(), symbol_name=mon.get("symbol_name") or sym, market_code=mon.get("market_code") or "", monitor_type=monitor_source_label(mon.get("monitor_type") or ""), capital=capital, ) def write_manual_close_trade_log( conn, mon: Optional[dict], *, symbol: str, direction: str, lots: float, close_price: float, entry_price: float, trading_mode: str, capital: float = 0.0, stop_loss: Optional[float] = None, take_profit: Optional[float] = None, open_time: str = "", symbol_name: str = "", market_code: str = "", ) -> None: """程序内点击平仓按钮 → 手动平仓。""" if mon: write_trade_log( conn, symbol=(mon.get("symbol") or symbol).strip(), direction=mon.get("direction") or direction, entry_price=float(mon.get("entry_price") or entry_price), close_price=close_price, lots=float(mon.get("lots") or lots), result="手动平仓", trading_mode=trading_mode, stop_loss=float(mon["initial_stop_loss"]) if mon.get("initial_stop_loss") is not None else ( float(mon["stop_loss"]) if mon.get("stop_loss") is not None else stop_loss ), take_profit=float(mon["take_profit"]) if mon.get("take_profit") is not None else take_profit, open_time=(mon.get("open_time") or open_time).strip(), symbol_name=mon.get("symbol_name") or symbol_name, market_code=mon.get("market_code") or market_code, monitor_type=monitor_source_label(mon.get("monitor_type") or ""), capital=capital, ) return write_trade_log( conn, symbol=symbol, direction=direction, entry_price=entry_price, close_price=close_price, lots=lots, result="手动平仓", trading_mode=trading_mode, stop_loss=stop_loss, take_profit=take_profit, open_time=open_time, symbol_name=symbol_name, market_code=market_code, capital=capital, ) def _update_trailing_stop_loss( conn, mon: dict, mark: float, *, be_tick_mult: int, ) -> dict: """达 1R 移保本(开仓±N跳),达 2R 移 1R,依次类推。""" if not mon.get("trailing_be"): return mon entry = float(mon.get("entry_price") or 0) initial_sl = mon.get("initial_stop_loss") if initial_sl is None: initial_sl = mon.get("stop_loss") try: initial_sl_f = float(initial_sl) if initial_sl is not None else None except (TypeError, ValueError): return mon if not entry or initial_sl_f is None: return mon direction = (mon.get("direction") or "long").strip().lower() sym = (mon.get("symbol") or "").strip() tick = _tick_size(sym) r = abs(entry - initial_sl_f) if r < tick * 0.5: return mon profit_r = (mark - entry) / r if direction == "long" else (entry - mark) / r if profit_r < 1.0: return mon level = int(profit_r) locked = int(mon.get("trailing_r_locked") or 0) if level <= locked: return mon if level == 1: new_sl = entry + be_tick_mult * tick if direction == "long" else entry - be_tick_mult * tick else: new_sl = entry + (level - 1) * r if direction == "long" else entry - (level - 1) * r new_sl = round(new_sl, 4) try: current_sl = float(mon.get("stop_loss") or 0) except (TypeError, ValueError): current_sl = 0.0 if direction == "long" and new_sl <= current_sl + tick * 0.01: return mon if direction == "short" and new_sl >= current_sl - tick * 0.01: return mon mid = mon.get("id") conn.execute( "UPDATE trade_order_monitors SET stop_loss=?, trailing_r_locked=? WHERE id=?", (new_sl, level, mid), ) conn.commit() mon["stop_loss"] = new_sl mon["trailing_r_locked"] = level logger.info("移动保本 monitor=%s %dR 止损→%s", mid, level, new_sl) return mon def _sl_triggered(direction: str, sl: float, mark: float, tick: float) -> bool: buf = max(tick * 0.01, 1e-9) if direction == "long": return mark <= sl + buf return mark >= sl - buf def _tp_triggered(direction: str, tp: float, mark: float, tick: float) -> bool: buf = max(tick * 0.01, 1e-9) if direction == "long": return mark >= tp - buf return mark <= tp + buf def cancel_monitor_exit_orders( conn, mon: dict, *, mode: str, ) -> int: """撤销该监控在交易所残留的旧版止盈止损平仓挂单。""" ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return 0 sym = (mon.get("symbol") or "").strip() direction = (mon.get("direction") or "long").strip().lower() tick = _tick_size(sym) active = ctp_list_active_orders(mode) cancelled = 0 seen: set[str] = set() def _try_cancel(vt_id: str) -> None: nonlocal cancelled oid = str(vt_id or "").strip() if not oid or oid in seen: return seen.add(oid) if ctp_cancel_order(mode, oid): cancelled += 1 for kind, price_key in (("sl", "stop_loss"), ("tp", "take_profit")): raw = mon.get(price_key) try: px = float(raw) if raw is not None else None except (TypeError, ValueError): px = None stored = str(mon.get(f"{kind}_vt_order_id") or "") if stored: _try_cancel(stored) if px is not None: found = _find_close_order( active, ths_code=sym, hold_direction=direction, price=px, tick=tick, ) if found: _try_cancel(str(found.get("order_id") or "")) if cancelled: conn.execute( "UPDATE trade_order_monitors SET sl_vt_order_id=NULL, tp_vt_order_id=NULL WHERE id=?", (mon["id"],), ) conn.commit() return cancelled def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120) -> int: """持仓已平时:关闭监控并撤销残留止盈止损挂单(新开仓 grace_sec 内不清理)。""" if not ctp_status(mode).get("connected"): return 0 try: bridge = get_bridge() since_connect = time.time() - float(getattr(bridge, "_last_connect_ok_ts", 0) or 0) if since_connect < 90: return 0 except Exception: pass positions = ctp_list_positions(mode, refresh_if_empty=False, refresh_margin=False) position_keys: set[tuple[str, str]] = set() for p in positions: if int(p.get("lots") or 0) <= 0: continue sym = (p.get("symbol") or "").lower() direction = p.get("direction") or "long" position_keys.add((sym, direction)) try: from ctp_trading_state import trading_state for p in trading_state.get_positions() or []: lots = int(p.get("lots") or 0) if lots <= 0: continue sym = (p.get("symbol") or "").lower() direction = p.get("direction") or "long" position_keys.add((sym, direction)) except Exception: pass margin_raw = ctp_account_margin_used(mode) if margin_raw is None: return 0 margin_used = float(margin_raw or 0.0) if not position_keys: if margin_used > 0: return 0 try: bridge = get_bridge() since_connect = time.time() - float(getattr(bridge, "_last_connect_ok_ts", 0) or 0) if since_connect < 180: return 0 except Exception: return 0 now_ts = time.time() def _monitor_within_grace(mon: dict) -> bool: raw = (mon.get("open_time") or mon.get("created_at") or "").strip() if not raw: return True for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"): try: dt = datetime.strptime(raw[:19], fmt) if (now_ts - dt.timestamp()) <= grace_sec: return True except ValueError: continue return False closed = 0 for r in conn.execute("SELECT * FROM trade_order_monitors WHERE status='active'").fetchall(): mon = dict(r) if _monitor_within_grace(mon): continue ms = mon.get("symbol") or "" md = mon.get("direction") or "long" matched = False for ps, pd in position_keys: if pd != md: continue if _match_symbol(ps, ms): matched = True break if matched: continue try: cancel_monitor_exit_orders(conn, mon, mode=mode) except Exception as exc: logger.warning("cancel exit orders monitor=%s: %s", mon.get("id"), exc) conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mon["id"],)) closed += 1 if closed: conn.commit() return closed def _execute_local_close( conn, mon: dict, *, mode: str, mark: float, reason: str, capital: float = 0.0, notify_fn: Callable[[str], None] | None = None, ) -> None: sym = (mon.get("symbol") or "").strip() direction = (mon.get("direction") or "long").strip().lower() positions = ctp_list_positions(mode) pos = _find_position(positions, sym, direction) if not pos: margin_raw = ctp_account_margin_used(mode) if margin_raw is not None and float(margin_raw) > 0: logger.debug( "skip close monitor=%s: vnpy empty but margin=%.2f", mon.get("id"), float(margin_raw), ) return _close_all_monitors_for_symbol(conn, sym, direction) reconcile_monitors_without_position(conn, mode) return lots = int(pos.get("lots") or mon.get("lots") or 1) offset = "close_long" if direction == "long" else "close_short" cancel_monitor_exit_orders(conn, mon, mode=mode) execute_order( conn, mode=mode, offset=offset, symbol=sym, direction=direction, lots=lots, price=mark, order_type="market", ) _close_all_monitors_for_symbol(conn, sym, direction) conn.commit() result_label = _result_for_close(mon, reason) logger.info( "止盈止损本地触发 monitor=%s result=%s %s %s %d手 @%s(待 CTP 成交同步写入交易记录)", mon.get("id"), result_label, sym, direction, lots, mark, ) if notify_fn: try: notify_fn(f"{result_label} {sym} {direction} {lots}手 @{mark},平仓委托已提交") except Exception as exc: logger.debug("SL/TP notify failed: %s", exc) def check_sl_tp_on_tick( conn, mode: str, exchange: str, symbol: str, mark: float, *, capital: float = 0.0, notify_fn: Callable[[str], None] | None = None, be_tick_mult: int = 2, ) -> int: """EVENT_TICK 触发:仅检查与 tick 品种匹配的 active 监控。""" ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected") or not is_trading_session(): return 0 if mark <= 0: return 0 sym_l = (symbol or "").lower() ex_u = (exchange or "").upper() closed = 0 rows = [dict(r) for r in conn.execute( "SELECT * FROM trade_order_monitors WHERE status='active'" ).fetchall()] for mon in rows: mid = int(mon.get("id") or 0) ms = (mon.get("symbol") or "").strip() direction = (mon.get("direction") or "long").strip().lower() try: vnpy_sym, ex2 = ths_to_vnpy_symbol(ms) if sym_l != vnpy_sym.lower(): continue if ex_u and ex2 and ex_u != ex2.upper(): continue except Exception: if sym_l != ms.lower(): continue sl = mon.get("stop_loss") tp = mon.get("take_profit") try: sl_f = float(sl) if sl is not None else None tp_f = float(tp) if tp is not None else None except (TypeError, ValueError): sl_f, tp_f = None, None if sl_f is None and tp_f is None: continue positions = ctp_list_positions(mode) if not _find_position(positions, ms, direction): continue tick = _tick_size(ms) if mon.get("trailing_be"): mon = _update_trailing_stop_loss(conn, mon, mark, be_tick_mult=be_tick_mult) try: sl_f = float(mon["stop_loss"]) if mon.get("stop_loss") is not None else sl_f except (TypeError, ValueError): pass reason = None if tp_f is not None and _tp_triggered(direction, tp_f, mark, tick): reason = "take_profit" elif sl_f is not None and _sl_triggered(direction, sl_f, mark, tick): reason = "stop_loss" if not reason: continue if mid > 0 and not _can_close_now(mid): continue if not _try_acquire_close_symbol(ms, direction): continue try: _execute_local_close( conn, mon, mode=mode, mark=mark, reason=reason, capital=capital, notify_fn=notify_fn, ) if mid > 0: _mark_close_attempt(mid) closed += 1 except Exception as exc: logger.warning("SL/TP tick close failed monitor=%s: %s", mid, exc) finally: _release_close_symbol(ms, direction) return closed def check_monitors_locally( conn, mode: str, *, capital: float = 0.0, notify_fn: Callable[[str], None] | None = None, be_tick_mult: int = 2, ) -> int: """扫描 active 监控,本地比对行情;触发止盈/止损(含跳空穿透)后立刻市价平仓并记交易记录。""" ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return 0 if not is_trading_session(): return 0 reconcile_monitors_without_position(conn, mode) _dedupe_active_monitors(conn) conn.commit() closed = 0 rows = [dict(r) for r in conn.execute( "SELECT * FROM trade_order_monitors WHERE status='active'" ).fetchall()] for mon in rows: mid = int(mon.get("id") or 0) sym = (mon.get("symbol") or "").strip() direction = (mon.get("direction") or "long").strip().lower() if mon.get("sl_vt_order_id") or mon.get("tp_vt_order_id"): cancel_monitor_exit_orders(conn, mon, mode=mode) sl = mon.get("stop_loss") tp = mon.get("take_profit") try: sl_f = float(sl) if sl is not None else None tp_f = float(tp) if tp is not None else None except (TypeError, ValueError): sl_f, tp_f = None, None if sl_f is None and tp_f is None: continue positions = ctp_list_positions(mode) if not _find_position(positions, sym, direction): continue mark = ctp_get_tick_price(mode, sym) if mark is None or mark <= 0: continue tick = _tick_size(sym) if mon.get("trailing_be"): mon = _update_trailing_stop_loss(conn, mon, mark, be_tick_mult=be_tick_mult) try: sl_f = float(mon["stop_loss"]) if mon.get("stop_loss") is not None else sl_f except (TypeError, ValueError): pass reason = None if tp_f is not None and _tp_triggered(direction, tp_f, mark, tick): reason = "take_profit" elif sl_f is not None and _sl_triggered(direction, sl_f, mark, tick): reason = "stop_loss" if not reason: continue if mid > 0 and not _can_close_now(mid): continue if not _try_acquire_close_symbol(sym, direction): continue try: _execute_local_close( conn, mon, mode=mode, mark=mark, reason=reason, capital=capital, notify_fn=notify_fn, ) if mid > 0: _mark_close_attempt(mid) closed += 1 except Exception as exc: logger.warning("SL/TP local close failed monitor=%s: %s", mid, exc) finally: _release_close_symbol(sym, direction) return closed def place_monitor_exit_orders( conn, mon: dict, *, mode: str, force: bool = False, ) -> dict[str, Any]: """兼容旧 API:本地监控模式不再向交易所挂 SL/TP 单,仅清理旧挂单。""" del force ensure_monitor_order_columns(conn) if not ctp_status(mode).get("connected"): return {"ok": False, "error": "CTP 未连接", "placed": []} cancelled = cancel_monitor_exit_orders(conn, mon, mode=mode) msg = "程序本地监控中,不向交易所挂止盈止损单" if cancelled: msg += f";已撤销旧版柜台挂单 {cancelled} 笔" return {"ok": True, "message": msg, "placed": [], "local_monitor": True} def monitor_order_status( mon: dict, *, mode: str, ths_code: str, direction: str, ) -> dict[str, bool]: """返回本地监控状态(非交易所挂单状态)。""" del mode, ths_code, direction sl = mon.get("stop_loss") if mon else None tp = mon.get("take_profit") if mon else None try: sl_f = float(sl) if sl is not None else None tp_f = float(tp) if tp is not None else None except (TypeError, ValueError): sl_f, tp_f = None, None return { "sl_order_active": sl_f is not None, "tp_order_active": tp_f is not None, "sl_monitoring": sl_f is not None, "tp_monitoring": tp_f is not None, "needs_sl_order": False, "needs_tp_order": False, } def sync_all_sl_tp_orders(conn, mode: str) -> int: """兼容旧 worker 入口:执行本地监控检查。""" del mode return 0 def start_sl_tp_guard_worker( *, db_path: str, get_mode_fn: Callable[[], str], init_tables_fn: Callable | None = None, get_capital_fn: Callable | None = None, get_be_tick_buffer_fn: Callable[[], int] | None = None, notify_fn: Callable[[str], None] | None = None, interval: int = CHECK_INTERVAL_SEC, ) -> None: from db_conn import connect_db def _loop() -> None: time.sleep(20) while True: sleep_sec = max(1, interval) try: if not is_trading_session(): time.sleep(CLOSED_MARKET_SLEEP_SEC) continue mode = get_mode_fn() if not ctp_status(mode).get("connected"): time.sleep(DISCONNECTED_SLEEP_SEC) continue conn = connect_db(db_path) try: if init_tables_fn: init_tables_fn(conn) has_monitors = conn.execute( """SELECT COUNT(*) AS n FROM trade_order_monitors WHERE status='active' AND (stop_loss IS NOT NULL OR take_profit IS NOT NULL)""" ).fetchone()["n"] if not has_monitors: sleep_sec = max(sleep_sec, 5) else: capital = 0.0 if get_capital_fn: try: capital = float(get_capital_fn(conn) or 0) except Exception: capital = 0.0 n = check_monitors_locally( conn, mode, capital=capital, notify_fn=notify_fn, be_tick_mult=( get_be_tick_buffer_fn() if get_be_tick_buffer_fn else 2 ), ) if n: logger.info("止盈止损本地监控: 触发平仓 %d 笔", n) finally: conn.close() except Exception as exc: logger.warning("sl_tp_guard worker: %s", exc) time.sleep(sleep_sec) threading.Thread(target=_loop, daemon=True, name="sl-tp-guard").start()