fix: 交易安全审计修复 — 补偿平仓、中控同步、滚仓/趋势防护

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-04 22:44:16 +08:00
parent df28e6dfb8
commit eb975b0133
11 changed files with 675 additions and 162 deletions
+95
View File
@@ -0,0 +1,95 @@
"""Hub 中控市价全平后立即同步 order_monitors(三所共用)。"""
from __future__ import annotations
import time
from typing import Any, Callable
def reconcile_hub_external_close_impl(
conn,
symbol: str,
direction: str,
*,
exchange_configured: Callable[[], bool],
not_configured_msg: str,
symbols_match: Callable[[str, str], bool],
get_opened_at_value: Callable[[Any], str],
resolve_monitor_exchange_symbol: Callable[[Any], str],
get_live_position_contracts: Callable[[str, str], float | None],
cancel_conditional_orders: Callable[[str], None],
resolve_synced_flat_close: Callable[..., tuple],
finalize_stopped_monitor: Callable[..., None],
sync_trade_records: Callable[..., None] | None = None,
reconcile_flat_streak: dict | None = None,
to_ms_with_fallback: Callable[..., int | None] | None = None,
prefer_manual_resolve: bool = False,
order_row_monitor_type: Callable[[Any], str] | None = None,
) -> dict[str, Any]:
if not exchange_configured():
return {"ok": False, "msg": not_configured_msg, "synced": 0}
sym_req = (symbol or "").strip()
dir_l = (direction or "").strip().lower()
if dir_l not in ("long", "short"):
return {"ok": False, "msg": "side 须为 long 或 short", "synced": 0}
synced = 0
streak = reconcile_flat_streak if reconcile_flat_streak is not None else {}
rows = conn.execute(
"SELECT * FROM order_monitors WHERE status IN ('active', 'error')"
).fetchall()
for r in rows:
if not symbols_match(str(r["symbol"] or ""), sym_req):
continue
if (r["direction"] or "").strip().lower() != dir_l:
continue
oid = int(r["id"])
if r["status"] == "error":
opened_at_chk = get_opened_at_value(r)
mtype = order_row_monitor_type(r) if order_row_monitor_type else r["monitor_type"]
existing = conn.execute(
"SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1",
(r["symbol"], opened_at_chk, mtype),
).fetchone()
if existing:
conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,))
synced += 1
continue
exchange_symbol = resolve_monitor_exchange_symbol(r)
live_contracts = get_live_position_contracts(exchange_symbol, r["direction"])
if live_contracts is None:
continue
if live_contracts > 0:
time.sleep(0.6)
live_contracts = get_live_position_contracts(exchange_symbol, r["direction"])
if live_contracts is None or live_contracts > 0:
continue
streak.pop(oid, None)
cancel_conditional_orders(exchange_symbol)
opened_at = get_opened_at_value(r)
opened_at_ms = None
if to_ms_with_fallback is not None:
keys = r.keys() if hasattr(r, "keys") else ()
opened_at_ms = to_ms_with_fallback(
r["opened_at_ms"] if "opened_at_ms" in keys else None,
opened_at,
)
resolve_kw = {"opened_at_ms": opened_at_ms}
if prefer_manual_resolve:
resolve_kw["prefer_manual"] = True
result, pnl_amount, closed_at, miss_reason = resolve_synced_flat_close(
r, opened_at, **resolve_kw
)
finalize_stopped_monitor(
conn,
r,
result=result,
pnl_amount=pnl_amount,
closed_at=closed_at,
miss_reason=miss_reason,
)
synced += 1
if sync_trade_records is not None:
try:
sync_trade_records(conn, force=True)
except Exception:
pass
return {"ok": True, "synced": synced}
@@ -37,6 +37,34 @@ KEY_ENTRY_REASON_CALLBACK = "关键位回调触价开仓"
KEY_ENTRY_REASON_BREAKOUT = "关键位突破触价开仓"
KEY_ENTRY_REASON_TRIGGER_LEGACY = "关键位触价开仓"
TRIGGER_ENTRY_IN_FLIGHT_OID = "__trigger_entry_in_flight__"
def is_trigger_entry_in_flight_row(row: Any) -> bool:
if row is None:
return False
try:
v = row["fib_limit_order_id"]
except (KeyError, IndexError, TypeError):
v = getattr(row, "fib_limit_order_id", None)
return (v or "").strip() == TRIGGER_ENTRY_IN_FLIGHT_OID
def acquire_trigger_entry_exec_lock(conn: Any, monitor_id: int) -> bool:
cur = conn.execute(
"UPDATE key_monitors SET fib_limit_order_id=? WHERE id=? "
"AND (fib_limit_order_id IS NULL OR fib_limit_order_id='')",
(TRIGGER_ENTRY_IN_FLIGHT_OID, int(monitor_id)),
)
return int(cur.rowcount or 0) == 1
def release_trigger_entry_exec_lock(conn: Any, monitor_id: int) -> None:
conn.execute(
"UPDATE key_monitors SET fib_limit_order_id=NULL WHERE id=? AND fib_limit_order_id=?",
(int(monitor_id), TRIGGER_ENTRY_IN_FLIGHT_OID),
)
def normalize_trigger_entry_monitor_type(monitor_type: Optional[str]) -> str:
mt = (monitor_type or "").strip()
+15 -2
View File
@@ -40,7 +40,8 @@ def check_roll_monitors(cfg: dict[str, Any]) -> None:
_reconcile_roll_groups(conn, cfg)
_check_pending_roll_legs(conn, cfg)
conn.commit()
except Exception:
except Exception as e:
print(f"[roll_monitor] {e}", flush=True)
try:
conn.rollback()
except Exception:
@@ -408,7 +409,19 @@ def _execute_pending_roll_leg(
return
oid = str(order.get("id") or "") if isinstance(order, dict) else ""
cfg["replace_tpsl"](ex_sym, direction, sl, tp0, mon)
try:
cfg["replace_tpsl"](ex_sym, direction, sl, tp0, mon)
except Exception as tpsl_err:
fe = cfg.get("friendly_error")
msg = fe(tpsl_err) if callable(fe) else str(tpsl_err)
conn.execute(
"""UPDATE roll_legs SET status='error', exchange_order_id=?, fill_price=?, amount=?
WHERE id=? AND status='pending'""",
(oid, fill, float(amount), leg_id),
)
_notify_roll_fail(cfg, group, leg, mark, f"加仓成交但止盈止损更新失败: {msg}")
return
conn.execute(
"""UPDATE roll_legs SET status='filled', fill_price=?, amount=?, exchange_order_id=?,
new_stop_loss=? WHERE id=? AND status='pending'""",
+69 -16
View File
@@ -244,7 +244,7 @@ def _row(cfg, row) -> dict:
return cfg["row_to_dict"](row)
def precheck_trend_start(cfg: dict, conn) -> tuple[bool, str]:
def precheck_trend_start(cfg: dict, conn, *, symbol: str = "", direction: str = "long") -> tuple[bool, str]:
m = _m(cfg)
mode = getattr(m, "POSITION_SIZING_MODE", None) or "risk"
try:
@@ -255,9 +255,41 @@ def precheck_trend_start(cfg: dict, conn) -> tuple[bool, str]:
return False, src_msg
except Exception:
pass
now = m.app_now()
if not m.trading_day_reset_allows_new_open(now):
return False, f"北京时间 {cfg['reset_hour']}:00 前不允许持仓"
sym = (symbol or "").strip()
dir_l = (direction or "long").strip().lower()
if sym and dir_l in ("long", "short") and hasattr(m, "precheck_risk"):
ok_risk, risk_msg = m.precheck_risk(conn, sym, dir_l)
if not ok_risk:
return False, risk_msg
else:
now = m.app_now()
if not m.trading_day_reset_allows_new_open(now):
return False, f"北京时间 {cfg['reset_hour']}:00 前不允许持仓"
from lib.trade.account_risk_lib import account_risk_blocks_trading, position_limit_reached
ok_risk, risk_reason = account_risk_blocks_trading(
conn,
trading_day=m.get_trading_day(now),
now=now,
fmt_local_ms=getattr(m, "ms_to_app_local_str", lambda _x: ""),
)
if not ok_risk:
return False, risk_reason
reached, active_count, mx = position_limit_reached(
conn, max_active_positions=cfg["max_active_positions"]
)
if reached:
return False, f"已达最大持仓数({active_count}/{mx}"
from lib.trade.daily_open_limit_lib import check_daily_open_hard_limit
ok_daily, daily_reason, _opens = check_daily_open_hard_limit(
conn,
m.get_trading_day(now),
getattr(m, "DAILY_OPEN_HARD_LIMIT", 0),
cfg["reset_hour"],
)
if not ok_daily:
return False, daily_reason
active = m.get_active_position_count(conn)
if active >= cfg["max_active_positions"]:
return (
@@ -1604,22 +1636,27 @@ def register_trend_routes(app: Flask, cfg: dict) -> None:
def preview_trend_pullback():
conn = get_db()
init_strategy_tables(conn)
okp, msg = precheck_trend_start(cfg, conn)
if not okp:
conn.close()
flash(msg)
return _redirect_trend()
m = _m(cfg)
ok_live, reason = m.ensure_exchange_live_ready()
if not ok_live:
conn.close()
flash(reason)
return _redirect_trend()
payload, err = parse_trend_plan(cfg, request.form)
if err:
conn.close()
flash(err)
return _redirect_trend()
okp, msg = precheck_trend_start(
cfg,
conn,
symbol=str(payload.get("symbol") or ""),
direction=str(payload.get("direction") or "long"),
)
if not okp:
conn.close()
flash(msg)
return _redirect_trend()
ok_live, reason = m.ensure_exchange_live_ready()
if not ok_live:
conn.close()
flash(reason)
return _redirect_trend()
pid = str(uuid.uuid4())
exp_ms = int(time.time() * 1000) + cfg["preview_ttl"] * 1000
created = m.app_now_str()
@@ -1678,7 +1715,12 @@ def register_trend_routes(app: Flask, cfg: dict) -> None:
conn.close()
flash("预览已过期或不存在,请重新生成预览")
return _redirect_trend()
okp, msg = precheck_trend_start(cfg, conn)
okp, msg = precheck_trend_start(
cfg,
conn,
symbol=str(pr["symbol"] or ""),
direction=str(pr["direction"] or "long"),
)
if not okp:
conn.close()
flash(msg)
@@ -1718,7 +1760,18 @@ def register_trend_routes(app: Flask, cfg: dict) -> None:
exchange_symbol, direction, first_amt, leverage, stop_loss=None, take_profit=None
)
fill1 = m.resolve_order_entry_price(o1, exchange_symbol, live_price)
trend_refresh_stop_only(cfg, exchange_symbol, direction, stop_loss)
try:
trend_refresh_stop_only(cfg, exchange_symbol, direction, stop_loss)
except Exception as sl_err:
from lib.strategy.strategy_trend_exchange import cancel_symbol_orders, trend_market_close
try:
pos_qty = m.get_live_position_contracts(exchange_symbol, direction) or first_amt
trend_market_close(cfg, exchange_symbol, direction, float(pos_qty), leverage)
cancel_symbol_orders(cfg, exchange_symbol)
except Exception as close_err:
print(f"[trend_start] compensating close failed: {close_err}", flush=True)
raise sl_err
except Exception as e:
conn.close()
fe = getattr(m, "friendly_exchange_error", lambda x, **k: str(x))
+16
View File
@@ -0,0 +1,16 @@
"""开仓后挂 TP/SL 失败时的补偿平仓(避免裸仓)。"""
from __future__ import annotations
from typing import Callable
def log_compensating_close_error(prefix: str, exc: BaseException) -> None:
print(f"[{prefix}] {exc}", flush=True)
def run_compensating_close(close_fn: Callable[[], None], *, log_prefix: str = "compensating_close") -> None:
"""执行补偿平仓;二次失败只打日志,不掩盖原始异常。"""
try:
close_fn()
except Exception as e:
log_compensating_close_error(log_prefix, e)