Add daily loss force-flatten at configurable equity limit

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-03 12:42:13 +08:00
parent b6c3266a9e
commit 2081bf2da9
17 changed files with 850 additions and 97 deletions
+120 -73
View File
@@ -20,6 +20,7 @@ STATUS_NORMAL = "normal"
STATUS_FREEZE_1H = "freeze_1h"
STATUS_FREEZE_4H = "freeze_4h"
STATUS_DAILY = "freeze_daily"
STATUS_DAILY_LOSS = "freeze_daily_loss"
STATUS_FREEZE_POSITION = "freeze_position"
STATUS_LABELS = {
@@ -27,6 +28,7 @@ STATUS_LABELS = {
STATUS_FREEZE_1H: "1h冻结",
STATUS_FREEZE_4H: "4h冻结",
STATUS_DAILY: "日冻结",
STATUS_DAILY_LOSS: "风控",
STATUS_FREEZE_POSITION: "仓位上限冻结",
}
@@ -82,12 +84,49 @@ def daily_position_limit() -> int:
return 5
def daily_trading_risk_pct_limit() -> float:
"""当日累计止损风险占权益上限(%)。"""
def daily_trading_risk_pct_limit(
get_setting: Optional[Callable[[str, str], str]] = None,
) -> float:
"""当日亏损占权益强平线(%),默认 2。"""
return daily_loss_force_close_pct(get_setting)
def _default_get_setting(key: str, default: str = "") -> str:
try:
return max(0.1, float(os.getenv("RISK_DAILY_TRADING_RISK_PCT", "2")))
from modules.fees.fee_specs import get_setting
return get_setting(key, default)
except Exception:
return default
def daily_loss_force_close_pct(
get_setting: Optional[Callable[[str, str], str]] = None,
) -> float:
gs = get_setting or _default_get_setting
try:
return max(0.1, min(50.0, float(gs("daily_loss_force_close_pct", "2") or 2)))
except (TypeError, ValueError):
return 2.0
try:
return max(0.1, float(os.getenv("RISK_DAILY_TRADING_RISK_PCT", "2")))
except (TypeError, ValueError):
return 2.0
def daily_loss_slippage_buffer_pct(
get_setting: Optional[Callable[[str, str], str]] = None,
) -> float:
gs = get_setting or _default_get_setting
try:
return max(0.0, min(20.0, float(gs("daily_loss_slippage_buffer_pct", "1") or 1)))
except (TypeError, ValueError):
return 1.0
def daily_loss_total_cap_pct(
get_setting: Optional[Callable[[str, str], str]] = None,
) -> float:
return daily_loss_force_close_pct(get_setting) + daily_loss_slippage_buffer_pct(get_setting)
def trading_day_reset_hour() -> int:
@@ -260,68 +299,23 @@ def _risk_amount_for_monitor_row(r, equity: float) -> float:
def daily_trading_risk_used_pct(
conn, equity: float, now: Optional[datetime] = None,
conn, equity: float, now: Optional[datetime] = None, *, mode: Optional[str] = None,
) -> Optional[float]:
"""当日交易风险占权益(%):每品种槽位只计一次。
"""当日亏损占权益(%):已实现亏损 + 持仓浮亏。"""
from modules.risk.daily_loss_guard import daily_loss_used_pct
- 仍持仓:按止损距离算风险金额(以损定仓口径)
- 已平仓:按当日已实现亏损计(pnl_net<0),不再重复累加历史监控行
"""
if equity <= 0:
return None
slots = _daily_open_slots(conn, now)
if not slots:
return 0.0
trade_mode = mode
if not trade_mode:
try:
from modules.core.trading_context import get_trading_mode
from modules.fees.fee_specs import get_setting
active_risk: dict[tuple[str, str], float] = {}
for r in conn.execute(
"""SELECT symbol, direction, lots, entry_price, stop_loss, take_profit, open_time
FROM trade_order_monitors
WHERE status='active' AND open_time IS NOT NULL AND trim(open_time) <> ''"""
).fetchall():
if not _opened_in_trading_day(r["open_time"], now):
continue
key = _position_slot_key(r["symbol"], r["direction"])
if key not in slots:
continue
amt = _risk_amount_for_monitor_row(r, equity)
if amt > 0:
active_risk[key] = amt
closed_risk: dict[tuple[str, str], float] = {}
for r in conn.execute(
"""SELECT symbol, direction, pnl_net, open_time
FROM trade_logs
WHERE open_time IS NOT NULL AND trim(open_time) <> ''"""
).fetchall():
if not _opened_in_trading_day(r["open_time"], now):
continue
key = _position_slot_key(r["symbol"], r["direction"])
if key not in slots or key in active_risk:
continue
loss = max(0.0, -float(r["pnl_net"] or 0))
if loss > 0:
closed_risk[key] = max(closed_risk.get(key, 0.0), loss)
for r in conn.execute(
"""SELECT symbol, direction, lots, entry_price, stop_loss, take_profit, open_time
FROM trade_order_monitors
WHERE status='closed' AND open_time IS NOT NULL AND trim(open_time) <> ''
ORDER BY id DESC"""
).fetchall():
if not _opened_in_trading_day(r["open_time"], now):
continue
key = _position_slot_key(r["symbol"], r["direction"])
if key not in slots or key in active_risk or key in closed_risk:
continue
amt = _risk_amount_for_monitor_row(r, equity)
if amt > 0:
closed_risk[key] = amt
total = sum(active_risk.values()) + sum(closed_risk.values())
if total <= 0:
return 0.0
return round(total / equity * 100, 2)
trade_mode = get_trading_mode(get_setting)
except Exception:
trade_mode = "simulation"
return daily_loss_used_pct(conn, equity, trade_mode, now=now)
def count_active_trade_monitors(conn) -> int:
@@ -483,6 +477,8 @@ def get_risk_status(
now: Optional[datetime] = None,
active_count: Optional[int] = None,
equity: Optional[float] = None,
mode: Optional[str] = None,
get_setting: Optional[Callable[[str, str], str]] = None,
) -> dict:
def _load() -> dict:
ensure_account_risk_schema(conn)
@@ -526,12 +522,28 @@ def get_risk_status(
daily_pos_lim = daily_position_limit()
daily_open_limit = daily_opens >= daily_pos_lim
daily_risk_used: Optional[float] = None
daily_risk_lim = daily_trading_risk_pct_limit()
daily_risk_lim = daily_trading_risk_pct_limit(get_setting)
slip_buf = daily_loss_slippage_buffer_pct(get_setting)
daily_risk_cap = daily_loss_total_cap_pct(get_setting)
daily_risk_limit_hit = False
if equity and float(equity) > 0:
daily_risk_used = daily_trading_risk_used_pct(conn, float(equity), now)
trade_mode = mode
if not trade_mode and get_setting:
try:
from modules.core.trading_context import get_trading_mode
trade_mode = get_trading_mode(get_setting)
except Exception:
trade_mode = None
if equity and float(equity) > 0 and trade_mode:
daily_risk_used = daily_trading_risk_used_pct(
conn, float(equity), now, mode=trade_mode,
)
if daily_risk_used is not None and daily_risk_used >= daily_risk_lim:
daily_risk_limit_hit = True
elif equity and float(equity) > 0:
daily_risk_used = 0.0
loss_locked = is_daily_loss_locked(conn, now=now)
base = {
"active_count": active,
@@ -540,25 +552,37 @@ def get_risk_status(
"daily_position_limit": daily_pos_lim,
"daily_risk_used_pct": daily_risk_used,
"daily_trading_risk_pct_limit": daily_risk_lim,
"daily_loss_slippage_buffer_pct": slip_buf,
"daily_loss_total_cap_pct": daily_risk_cap,
}
if daily:
if daily or loss_locked:
reason = "当日日冻结,禁止新开仓"
if loss_locked and daily_risk_used is not None:
reason = (
f"当日亏损已达 {daily_risk_used:.2f}%(上限 {daily_risk_lim:.2f}% 权益),"
"禁止开仓"
)
return {
**base,
"status": STATUS_DAILY,
"status_label": STATUS_LABELS[STATUS_DAILY],
"status": STATUS_DAILY_LOSS if loss_locked else STATUS_DAILY,
"status_label": STATUS_LABELS[STATUS_DAILY_LOSS] if loss_locked else STATUS_LABELS[STATUS_DAILY],
"can_trade": False,
"can_roll": False,
"reason": "当日日冻结,禁止新开仓",
"reason": reason,
}
if daily_risk_limit_hit:
return {
**base,
"status": STATUS_DAILY,
"status_label": STATUS_LABELS[STATUS_DAILY],
"status": STATUS_DAILY_LOSS,
"status_label": STATUS_LABELS[STATUS_DAILY_LOSS],
"can_trade": False,
"can_roll": pos_limit,
"reason": f"已达日交易风险上限 {daily_risk_used:.2f}%/{daily_risk_lim:.2f}%",
"can_roll": False,
"reason": (
f"当日亏损已达 {daily_risk_used:.2f}%(上限 {daily_risk_lim:.2f}% 权益),"
"正在强制平仓,禁止开仓"
),
"force_flatten_required": True,
}
if daily_open_limit:
return {
@@ -590,13 +614,36 @@ def get_risk_status(
return _db_retry(_load)
def is_daily_loss_locked(conn, *, now=None) -> bool:
ensure_account_risk_schema(conn)
td = trading_day_label(now)
row = conn.execute("SELECT trading_day, daily_frozen FROM account_risk_state WHERE id=1").fetchone()
if not row:
return False
stored = str(row["trading_day"] if isinstance(row, dict) else row[0] or "")
frozen = int((row["daily_frozen"] if isinstance(row, dict) else row[1]) or 0)
return stored == td and frozen == 1
def should_skip_sl_tp_for_daily_loss(conn) -> bool:
return is_daily_loss_locked(conn)
def assert_can_open(
conn,
*,
active_count: Optional[int] = None,
equity: Optional[float] = None,
mode: Optional[str] = None,
get_setting: Optional[Callable[[str, str], str]] = None,
) -> Optional[str]:
rs = get_risk_status(conn, active_count=active_count, equity=equity)
rs = get_risk_status(
conn,
active_count=active_count,
equity=equity,
mode=mode,
get_setting=get_setting,
)
if not rs.get("can_trade"):
return rs.get("reason") or "当前不可开仓"
return None
+347
View File
@@ -0,0 +1,347 @@
# Copyright (c) 2025-2026 马建军. All rights reserved.
"""日亏损风控:达权益比例上限后强制清仓并当日禁开。"""
from __future__ import annotations
import logging
import threading
import time
from typing import Any, Callable, Optional
from modules.core.contract_specs import calc_position_metrics
from modules.market.market_sessions import is_trading_session
from modules.risk.account_risk_lib import (
_default_get_setting,
daily_loss_force_close_pct,
daily_loss_slippage_buffer_pct,
daily_loss_total_cap_pct,
ensure_account_risk_schema,
risk_control_enabled,
trading_day_label,
trading_day_start,
_parse_open_time_ms,
)
from modules.ctp.vnpy_bridge import (
ctp_cancel_order,
ctp_get_tick_price,
ctp_list_active_orders,
ctp_list_positions,
ctp_status,
execute_order,
)
logger = logging.getLogger(__name__)
CHECK_INTERVAL_SEC = 2
DISCONNECTED_SLEEP_SEC = 5
CLOSED_MARKET_SLEEP_SEC = 30
_flatten_lock = threading.Lock()
_flatten_in_progress = False
_last_flatten_attempt: float = 0.0
FLATTEN_COOLDOWN_SEC = 15
def _closed_in_trading_day(close_time: str, now=None) -> bool:
oms = _parse_open_time_ms((close_time or "").replace("T", " "))
if oms is None:
return False
return oms >= int(trading_day_start(now).timestamp() * 1000)
def daily_realized_loss_amount(conn, *, now=None) -> float:
"""当日已平仓实现的亏损金额(正数)。"""
total = 0.0
try:
rows = conn.execute(
"SELECT pnl_net, close_time FROM trade_logs WHERE close_time IS NOT NULL"
).fetchall()
except Exception:
return 0.0
for r in rows:
if isinstance(r, dict):
ct = r.get("close_time") or ""
pnl = float(r.get("pnl_net") or 0)
else:
ct = r[1] if len(r) > 1 else ""
pnl = float(r[0] or 0)
if not _closed_in_trading_day(ct, now):
continue
if pnl < 0:
total += -pnl
return round(total, 2)
def daily_floating_loss_amount(mode: str) -> float:
"""当前持仓浮亏金额(正数),含隔夜仓跳空。"""
if not mode:
return 0.0
loss = 0.0
try:
positions = ctp_list_positions(mode, refresh_if_empty=False, refresh_margin=False)
except Exception:
return 0.0
for p in positions or []:
lots = int(p.get("lots") or 0)
if lots <= 0:
continue
sym = (p.get("symbol") or "").strip()
direction = (p.get("direction") or "long").strip().lower()
entry = float(p.get("avg_price") or p.get("entry_price") or 0)
if entry <= 0 or not sym:
continue
mark = float(p.get("mark_price") or p.get("current_price") or 0)
if mark <= 0:
try:
mark = float(ctp_get_tick_price(mode, sym) or 0)
except Exception:
mark = 0.0
if mark <= 0:
continue
m = calc_position_metrics(
direction, entry, entry, entry, lots, mark, 1.0, sym,
)
fp = float(m.get("float_pnl") or 0)
if fp < 0:
loss += -fp
return round(loss, 2)
def daily_loss_amount(
conn,
equity: float,
mode: str,
*,
now=None,
) -> tuple[float, float]:
"""返回 (亏损金额, 占权益%)。"""
if equity <= 0:
return 0.0, 0.0
realized = daily_realized_loss_amount(conn, now=now)
floating = daily_floating_loss_amount(mode)
total = realized + floating
pct = round(total / float(equity) * 100, 2)
return round(total, 2), pct
def daily_loss_used_pct(
conn,
equity: float,
mode: str,
*,
now=None,
) -> Optional[float]:
if equity <= 0:
return None
_, pct = daily_loss_amount(conn, equity, mode, now=now)
return pct
def mark_daily_loss_lock(conn, *, now=None) -> None:
ensure_account_risk_schema(conn)
td = trading_day_label(now)
conn.execute(
"""UPDATE account_risk_state SET trading_day=?, daily_frozen=1,
cooloff_until_ms=NULL, cooloff_hours=NULL, updated_at=? WHERE id=1""",
(td, time.strftime("%Y-%m-%d %H:%M:%S")),
)
conn.commit()
def _cancel_all_close_orders(mode: str) -> int:
cancelled = 0
try:
active = ctp_list_active_orders(mode)
except Exception:
return 0
for o in active or []:
offset_s = (o.get("offset") or "").upper()
if "CLOSE" not in offset_s:
continue
oid = str(o.get("order_id") or o.get("vt_order_id") or "")
if oid and ctp_cancel_order(mode, oid):
cancelled += 1
return cancelled
def force_flatten_all_positions(
conn,
mode: str,
*,
equity: float,
reason: str = "",
notify_fn: Callable[[str], None] | None = None,
get_setting: Callable[[str, str], str] | None = None,
) -> int:
"""无条件市价平掉全部持仓;返回提交平仓笔数。"""
global _flatten_in_progress, _last_flatten_attempt
if not ctp_status(mode).get("connected"):
return 0
with _flatten_lock:
if _flatten_in_progress:
return 0
if time.time() - _last_flatten_attempt < FLATTEN_COOLDOWN_SEC:
return 0
_flatten_in_progress = True
_last_flatten_attempt = time.time()
submitted = 0
try:
mark_daily_loss_lock(conn)
cancelled = _cancel_all_close_orders(mode)
if cancelled:
logger.info("日亏损强平:已撤平仓挂单 %d", cancelled)
positions = [
p for p in (ctp_list_positions(mode) or [])
if int(p.get("lots") or 0) > 0
]
if not positions:
return 0
slip_buf = daily_loss_slippage_buffer_pct(get_setting)
for p in positions:
sym = (p.get("symbol") or "").strip()
direction = (p.get("direction") or "long").strip().lower()
lots = int(p.get("lots") or 0)
if not sym or lots <= 0:
continue
mark = float(p.get("mark_price") or p.get("current_price") or 0)
if mark <= 0:
mark = float(ctp_get_tick_price(mode, sym) or p.get("avg_price") or 0)
if mark <= 0:
logger.warning("日亏损强平跳过 %s:无有效价格", sym)
continue
offset = "close_long" if direction == "long" else "close_short"
try:
execute_order(
conn,
mode=mode,
offset=offset,
symbol=sym,
direction=direction,
lots=lots,
price=mark,
order_type="market",
urgency="risk_flatten",
equity=equity,
slippage_buffer_pct=slip_buf,
)
submitted += 1
logger.info(
"日亏损强平已报单 %s %s %d手 @%s",
sym, direction, lots, mark,
)
except Exception as exc:
logger.warning("日亏损强平失败 %s: %s", sym, exc)
if submitted and notify_fn:
lim = daily_loss_force_close_pct(get_setting)
cap = daily_loss_total_cap_pct(get_setting)
msg = (
f"日亏损风控:已达权益 {lim:g}% 上限,已强制平仓 {submitted}"
f"(含滑点预留至 {cap:g}%)。当日禁止开仓。"
)
if reason:
msg = f"{reason} {msg}"
try:
notify_fn(msg)
except Exception as exc:
logger.debug("daily loss notify: %s", exc)
if submitted:
try:
conn.execute(
"UPDATE trade_order_monitors SET status='closed' WHERE status='active'"
)
conn.commit()
except Exception as exc:
logger.debug("close monitors after flatten: %s", exc)
return submitted
finally:
with _flatten_lock:
_flatten_in_progress = False
def check_daily_loss_and_flatten(
conn,
mode: str,
*,
equity: float,
notify_fn: Callable[[str], None] | None = None,
get_setting: Callable[[str, str], str] | None = None,
) -> int:
"""达日亏损上限则锁日并强平。返回强平报单笔数。"""
if not risk_control_enabled():
return 0
gs = get_setting or _default_get_setting
lim = daily_loss_force_close_pct(gs)
if equity <= 0:
return 0
used = daily_loss_used_pct(conn, equity, mode)
if used is None or used < lim:
return 0
amt, pct = daily_loss_amount(conn, equity, mode)
reason = f"当日亏损 {amt:.0f}元({pct:.2f}%/权益)"
return force_flatten_all_positions(
conn,
mode,
equity=equity,
reason=reason,
notify_fn=notify_fn,
get_setting=gs,
)
def start_daily_loss_guard_worker(
*,
db_path: str,
get_mode_fn: Callable[[], str],
get_capital_fn: Callable,
get_setting_fn: Callable[[str, str], str] | None = None,
init_tables_fn: Callable | None = None,
notify_fn: Callable[[str], None] | None = None,
interval: int = CHECK_INTERVAL_SEC,
) -> None:
from modules.core.db_conn import connect_db
def _loop() -> None:
time.sleep(25)
while True:
sleep_sec = max(1, interval)
try:
mode = get_mode_fn()
if not ctp_status(mode).get("connected"):
time.sleep(DISCONNECTED_SLEEP_SEC)
continue
if not is_trading_session():
sleep_sec = max(sleep_sec, CLOSED_MARKET_SLEEP_SEC)
conn = connect_db(db_path)
try:
if init_tables_fn:
init_tables_fn(conn)
equity = 0.0
try:
equity = float(get_capital_fn(conn) or 0)
except Exception:
equity = 0.0
if equity <= 0:
try:
from modules.ctp.vnpy_bridge import ctp_get_account
acc = ctp_get_account(mode) or {}
equity = float(acc.get("balance") or 0)
except Exception:
equity = 0.0
if equity > 0:
n = check_daily_loss_and_flatten(
conn,
mode,
equity=equity,
notify_fn=notify_fn,
get_setting=get_setting_fn,
)
if n:
logger.info("日亏损守护: 强制平仓 %d", n)
finally:
conn.close()
except Exception as exc:
logger.warning("daily_loss_guard worker: %s", exc)
time.sleep(sleep_sec)
threading.Thread(target=_loop, daemon=True, name="daily-loss-guard").start()