本地监控止盈止损、盘前自动连CTP,并完善保证金与推荐手数。

- 止盈止损改为程序本地监控,触发后市价平仓(含跳空)
- 交易前30分钟后台自动连接 CTP
- 保证金占用上限默认30%,可在系统设置修改
- K线标准蜡烛图红跌绿涨,费率表全宽固定表头
- 品种推荐按保证金比例×总资金计算推荐手数

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-25 12:18:18 +08:00
parent fe1b651900
commit 9875ee6d44
15 changed files with 467 additions and 256 deletions
+135 -200
View File
@@ -1,4 +1,4 @@
"""止盈止损守护:检测持仓快照,自动/手动向 CTP 平仓限价委托"""
"""止盈止损守护:程序本地监控价位,触发后向 CTP 平仓单(不向交易所挂 SL/TP 限价单)"""
from __future__ import annotations
import logging
@@ -19,10 +19,10 @@ from vnpy_bridge import (
logger = logging.getLogger(__name__)
CHECK_INTERVAL_SEC = 20
PLACE_COOLDOWN_SEC = 120
CHECK_INTERVAL_SEC = 5
PLACE_COOLDOWN_SEC = 30
_last_place_attempt: dict[tuple[int, str], float] = {}
_last_close_attempt: dict[int, float] = {}
MONITOR_ORDER_COLUMNS = (
"ALTER TABLE trade_order_monitors ADD COLUMN sl_vt_order_id TEXT",
@@ -62,26 +62,6 @@ def _price_near(a: float, b: float, tick: float) -> bool:
return abs(float(a) - float(b)) <= max(tick * 0.501, 1e-9)
def _is_resting_exit_price(
hold_direction: str,
kind: str,
exit_price: float,
mark: Optional[float],
tick: float,
) -> bool:
"""限价平仓单是否会挂在盘口(而非立即成交)。"""
if mark is None or mark <= 0:
return True
buf = max(tick * 0.5, 1e-9)
if hold_direction == "long":
if kind == "sl":
return exit_price < mark - buf
return exit_price > mark + buf
if kind == "sl":
return exit_price > mark + buf
return exit_price < mark - buf
def _find_close_order(
active_orders: list[dict],
*,
@@ -117,23 +97,27 @@ def _find_position(positions: list[dict], ths_code: str, direction: str) -> Opti
return None
def _can_place_now(monitor_id: int, kind: str, *, cooldown: int = PLACE_COOLDOWN_SEC) -> bool:
last = _last_place_attempt.get((monitor_id, kind), 0.0)
def _can_close_now(monitor_id: int, *, cooldown: int = PLACE_COOLDOWN_SEC) -> bool:
last = _last_close_attempt.get(monitor_id, 0.0)
return (time.time() - last) >= cooldown
def _mark_place_attempt(monitor_id: int, kind: str) -> None:
_last_place_attempt[(monitor_id, kind)] = time.time()
def _mark_close_attempt(monitor_id: int) -> None:
_last_close_attempt[monitor_id] = time.time()
def _order_still_active(active_orders: list[dict], vt_order_id: str) -> bool:
if not vt_order_id:
return False
oid = str(vt_order_id).strip()
for o in active_orders:
if str(o.get("order_id") or "") == oid:
return True
return False
def _sl_triggered(direction: str, sl: float, mark: float, tick: float) -> bool:
buf = max(tick * 0.01, 1e-9)
if direction == "long":
return mark <= sl + buf
return mark >= sl - buf
def _tp_triggered(direction: str, tp: float, mark: float, tick: float) -> bool:
buf = max(tick * 0.01, 1e-9)
if direction == "long":
return mark >= tp - buf
return mark <= tp + buf
def cancel_monitor_exit_orders(
@@ -142,7 +126,7 @@ def cancel_monitor_exit_orders(
*,
mode: str,
) -> int:
"""撤销该监控对应的止盈止损平仓挂单。"""
"""撤销该监控在交易所残留的旧版止盈止损平仓挂单。"""
ensure_monitor_order_columns(conn)
if not ctp_status(mode).get("connected"):
return 0
@@ -225,6 +209,99 @@ def reconcile_monitors_without_position(conn, mode: str) -> int:
return closed
def _execute_local_close(
conn,
mon: dict,
*,
mode: str,
mark: float,
reason: str,
) -> None:
sym = (mon.get("symbol") or "").strip()
direction = (mon.get("direction") or "long").strip().lower()
positions = ctp_list_positions(mode)
pos = _find_position(positions, sym, direction)
if not pos:
reconcile_monitors_without_position(conn, mode)
return
lots = int(pos.get("lots") or mon.get("lots") or 1)
offset = "close_long" if direction == "long" else "close_short"
cancel_monitor_exit_orders(conn, mon, mode=mode)
execute_order(
conn,
mode=mode,
offset=offset,
symbol=sym,
direction=direction,
lots=lots,
price=mark,
order_type="market",
)
conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mon["id"],))
conn.commit()
logger.info(
"止盈止损本地触发 monitor=%s reason=%s %s %s %d手 @%s",
mon.get("id"), reason, sym, direction, lots, mark,
)
def check_monitors_locally(conn, mode: str) -> int:
"""扫描 active 监控,本地比对行情;触发止盈/止损(含跳空穿透)后立刻平仓。"""
ensure_monitor_order_columns(conn)
if not ctp_status(mode).get("connected"):
return 0
reconcile_monitors_without_position(conn, mode)
closed = 0
rows = conn.execute(
"SELECT * FROM trade_order_monitors WHERE status='active'"
).fetchall()
for r in rows:
mon = dict(r)
mid = int(mon.get("id") or 0)
sym = (mon.get("symbol") or "").strip()
direction = (mon.get("direction") or "long").strip().lower()
if mon.get("sl_vt_order_id") or mon.get("tp_vt_order_id"):
cancel_monitor_exit_orders(conn, mon, mode=mode)
sl = mon.get("stop_loss")
tp = mon.get("take_profit")
try:
sl_f = float(sl) if sl is not None else None
tp_f = float(tp) if tp is not None else None
except (TypeError, ValueError):
sl_f, tp_f = None, None
if sl_f is None and tp_f is None:
continue
positions = ctp_list_positions(mode)
if not _find_position(positions, sym, direction):
continue
mark = ctp_get_tick_price(mode, sym)
if mark is None or mark <= 0:
continue
tick = _tick_size(sym)
reason = None
if tp_f is not None and _tp_triggered(direction, tp_f, mark, tick):
reason = "take_profit"
elif sl_f is not None and _sl_triggered(direction, sl_f, mark, tick):
reason = "stop_loss"
if not reason:
continue
if mid > 0 and not _can_close_now(mid):
continue
try:
_mark_close_attempt(mid)
_execute_local_close(conn, mon, mode=mode, mark=mark, reason=reason)
closed += 1
except Exception as exc:
logger.warning("SL/TP local close failed monitor=%s: %s", mid, exc)
return closed
def place_monitor_exit_orders(
conn,
mon: dict,
@@ -232,117 +309,16 @@ def place_monitor_exit_orders(
mode: str,
force: bool = False,
) -> dict[str, Any]:
"""按开仓快照中的止损/止盈价,向 CTP 挂平仓限价单(缺则补)"""
"""兼容旧 API:本地监控模式不再向交易所挂 SL/TP 单,仅清理旧挂单"""
del force
ensure_monitor_order_columns(conn)
if not ctp_status(mode).get("connected"):
return {"ok": False, "error": "CTP 未连接", "placed": []}
sym = (mon.get("symbol") or "").strip()
direction = (mon.get("direction") or "long").strip().lower()
sl = mon.get("stop_loss")
tp = mon.get("take_profit")
try:
sl_f = float(sl) if sl is not None else None
tp_f = float(tp) if tp is not None else None
except (TypeError, ValueError):
sl_f, tp_f = None, None
if sl_f is None and tp_f is None:
return {"ok": False, "error": "快照无止盈止损,无法委托", "placed": []}
positions = ctp_list_positions(mode)
pos = _find_position(positions, sym, direction)
if not pos:
reconcile_monitors_without_position(conn, mode)
return {"ok": False, "error": "柜台无对应持仓(可能已被止盈/止损平掉)", "placed": []}
lots = int(pos.get("lots") or 1)
if lots != int(mon.get("lots") or 0):
conn.execute("UPDATE trade_order_monitors SET lots=? WHERE id=?", (lots, mon["id"]))
conn.commit()
mark = ctp_get_tick_price(mode, sym)
active = ctp_list_active_orders(mode)
tick = _tick_size(sym)
offset = "close_long" if direction == "long" else "close_short"
placed: list[str] = []
skipped: list[str] = []
updates: dict[str, Optional[str]] = {}
mid = int(mon.get("id") or 0)
def _maybe_place(kind: str, price: Optional[float], stored_id: str) -> None:
if price is None or price <= 0:
return
existing = _find_close_order(
active, ths_code=sym, hold_direction=direction, price=price, tick=tick,
)
if existing:
updates[f"{kind}_vt_order_id"] = str(existing.get("order_id") or stored_id or "")
return
if stored_id and _order_still_active(active, stored_id) and not force:
return
if mid > 0 and not force and not _can_place_now(mid, kind):
return
if not _is_resting_exit_price(direction, kind, price, mark, tick):
hint = f"{'止损' if kind == 'sl' else '止盈'} {price}"
if mark:
hint += f"(现价 {mark} 会立即成交)"
skipped.append(hint)
if not force:
logger.info("SL/TP skip immediate fill monitor=%s %s mark=%s", mid, kind, mark)
return
try:
_mark_place_attempt(mid, kind)
result = execute_order(
conn,
mode=mode,
offset=offset,
symbol=sym,
direction=direction,
lots=lots,
price=price,
order_type="limit",
)
except Exception as exc:
logger.warning("SL/TP place %s monitor=%s failed: %s", kind, mid, exc)
return
oid = str(result.get("order_id") or "")
if oid:
updates[f"{kind}_vt_order_id"] = oid
placed.append(f"{kind}@{price}")
time.sleep(0.3)
positions_after = ctp_list_positions(mode)
if not _find_position(positions_after, sym, direction):
cancel_monitor_exit_orders(conn, mon, mode=mode)
reconcile_monitors_without_position(conn, mode)
return
sl_id = str(mon.get("sl_vt_order_id") or "")
tp_id = str(mon.get("tp_vt_order_id") or "")
_maybe_place("sl", sl_f, sl_id)
if _find_position(ctp_list_positions(mode), sym, direction):
_maybe_place("tp", tp_f, tp_id)
if updates:
sl_new = updates.get("sl_vt_order_id", mon.get("sl_vt_order_id"))
tp_new = updates.get("tp_vt_order_id", mon.get("tp_vt_order_id"))
conn.execute(
"UPDATE trade_order_monitors SET sl_vt_order_id=?, tp_vt_order_id=? WHERE id=?",
(sl_new, tp_new, mon["id"]),
)
conn.commit()
if not placed and not updates and not skipped:
return {"ok": True, "message": "无需新委托", "placed": []}
msg_parts = []
if placed:
msg_parts.append("已提交: " + ", ".join(placed))
elif updates:
msg_parts.append("委托已在柜台")
if skipped:
msg_parts.append("未挂单: " + "; ".join(skipped))
return {"ok": True, "message": "".join(msg_parts), "placed": placed, "skipped": skipped}
cancelled = cancel_monitor_exit_orders(conn, mon, mode=mode)
msg = "程序本地监控中,不向交易所挂止盈止损单"
if cancelled:
msg += f";已撤销旧版柜台挂单 {cancelled}"
return {"ok": True, "message": msg, "placed": [], "local_monitor": True}
def monitor_order_status(
@@ -352,7 +328,8 @@ def monitor_order_status(
ths_code: str,
direction: str,
) -> dict[str, bool]:
"""检查快照价位是否已有对应平仓挂单"""
"""返回本地监控状态(非交易所挂单状态)"""
del mode, ths_code, direction
sl = mon.get("stop_loss") if mon else None
tp = mon.get("take_profit") if mon else None
try:
@@ -360,61 +337,20 @@ def monitor_order_status(
tp_f = float(tp) if tp is not None else None
except (TypeError, ValueError):
sl_f, tp_f = None, None
if not ctp_status(mode).get("connected"):
return {
"sl_order_active": False,
"tp_order_active": False,
"needs_sl_order": sl_f is not None,
"needs_tp_order": tp_f is not None,
}
active = ctp_list_active_orders(mode)
tick = _tick_size(ths_code)
sl_active = False
tp_active = False
if sl_f is not None:
sl_active = _find_close_order(
active, ths_code=ths_code, hold_direction=direction, price=sl_f, tick=tick,
) is not None
if tp_f is not None:
tp_active = _find_close_order(
active, ths_code=ths_code, hold_direction=direction, price=tp_f, tick=tick,
) is not None
return {
"sl_order_active": sl_active,
"tp_order_active": tp_active,
"needs_sl_order": sl_f is not None and not sl_active,
"needs_tp_order": tp_f is not None and not tp_active,
"sl_order_active": sl_f is not None,
"tp_order_active": tp_f is not None,
"sl_monitoring": sl_f is not None,
"tp_monitoring": tp_f is not None,
"needs_sl_order": False,
"needs_tp_order": False,
}
def sync_all_sl_tp_orders(conn, mode: str) -> int:
"""扫描全部 active 监控,为缺失的止盈止损自动挂单。返回新挂单数"""
ensure_monitor_order_columns(conn)
if not ctp_status(mode).get("connected"):
return 0
reconcile_monitors_without_position(conn, mode)
placed_n = 0
rows = conn.execute(
"SELECT * FROM trade_order_monitors WHERE status='active'"
).fetchall()
for r in rows:
mon = dict(r)
st = monitor_order_status(
mon, mode=mode, ths_code=mon.get("symbol") or "", direction=mon.get("direction") or "long",
)
if not st.get("needs_sl_order") and not st.get("needs_tp_order"):
continue
if mon.get("stop_loss") is None and mon.get("take_profit") is None:
continue
try:
res = place_monitor_exit_orders(conn, mon, mode=mode, force=False)
placed_n += len(res.get("placed") or [])
except Exception as exc:
logger.warning("SL/TP auto place failed monitor=%s: %s", mon.get("id"), exc)
return placed_n
"""兼容旧 worker 入口:执行本地监控检查"""
del mode
return 0
def start_sl_tp_guard_worker(
@@ -436,14 +372,13 @@ def start_sl_tp_guard_worker(
try:
if init_tables_fn:
init_tables_fn(conn)
reconcile_monitors_without_position(conn, mode)
n = sync_all_sl_tp_orders(conn, mode)
n = check_monitors_locally(conn, mode)
if n:
logger.info("止盈止损守护: 新挂 %d委托", n)
logger.info("止盈止损本地监控: 触发平仓 %d", n)
finally:
conn.close()
except Exception as exc:
logger.warning("sl_tp_guard worker: %s", exc)
time.sleep(max(10, interval))
time.sleep(max(3, interval))
threading.Thread(target=_loop, daemon=True, name="sl-tp-guard").start()