2081bf2da9
Co-authored-by: Cursor <cursoragent@cursor.com>
122 lines
3.5 KiB
Python
122 lines
3.5 KiB
Python
#!/usr/bin/env python3
|
|
"""日亏损风控单元自检(不加载 CTP 桥)。"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
from datetime import datetime
|
|
from zoneinfo import ZoneInfo
|
|
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
from modules.core.db_conn import connect_db
|
|
from modules.risk.account_risk_lib import (
|
|
STATUS_DAILY_LOSS,
|
|
_parse_open_time_ms,
|
|
daily_trading_risk_pct_limit,
|
|
ensure_account_risk_schema,
|
|
get_risk_status,
|
|
trading_day_start,
|
|
)
|
|
|
|
|
|
def _gs(key: str, default: str = "") -> str:
|
|
defaults = {
|
|
"daily_loss_force_close_pct": "2",
|
|
"daily_loss_slippage_buffer_pct": "1",
|
|
"trading_mode": "simulation",
|
|
}
|
|
return defaults.get(key, default)
|
|
|
|
|
|
def _closed_in_trading_day(close_time: str, now=None) -> bool:
|
|
from modules.risk.account_risk_lib import _parse_open_time_ms, trading_day_start
|
|
|
|
oms = _parse_open_time_ms((close_time or "").replace("T", " "))
|
|
if oms is None:
|
|
return False
|
|
return oms >= int(trading_day_start(now).timestamp() * 1000)
|
|
|
|
|
|
def daily_realized_loss_amount(conn, *, now=None) -> float:
|
|
total = 0.0
|
|
for r in conn.execute(
|
|
"SELECT pnl_net, close_time FROM trade_logs WHERE close_time IS NOT NULL"
|
|
).fetchall():
|
|
ct = r["close_time"] if isinstance(r, dict) else r[1]
|
|
pnl = float((r["pnl_net"] if isinstance(r, dict) else r[0]) or 0)
|
|
if not _closed_in_trading_day(ct, now):
|
|
continue
|
|
if pnl < 0:
|
|
total += -pnl
|
|
return round(total, 2)
|
|
|
|
|
|
def main() -> None:
|
|
now = datetime.now(ZoneInfo("Asia/Shanghai"))
|
|
close_ts = now.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
fd, path = tempfile.mkstemp(suffix=".db")
|
|
os.close(fd)
|
|
conn = connect_db(path)
|
|
try:
|
|
conn.execute(
|
|
"""CREATE TABLE trade_logs (
|
|
id INTEGER PRIMARY KEY,
|
|
pnl_net REAL,
|
|
close_time TEXT
|
|
)"""
|
|
)
|
|
conn.execute(
|
|
"""CREATE TABLE trade_order_monitors (
|
|
id INTEGER PRIMARY KEY,
|
|
symbol TEXT, direction TEXT, open_time TEXT, status TEXT
|
|
)"""
|
|
)
|
|
conn.execute(
|
|
"""CREATE TABLE roll_groups (
|
|
id INTEGER PRIMARY KEY,
|
|
order_monitor_id INTEGER,
|
|
status TEXT
|
|
)"""
|
|
)
|
|
conn.execute(
|
|
"INSERT INTO trade_logs (pnl_net, close_time) VALUES (?, ?)",
|
|
(-2500.0, close_ts),
|
|
)
|
|
conn.commit()
|
|
ensure_account_risk_schema(conn)
|
|
|
|
lim = daily_trading_risk_pct_limit(_gs)
|
|
assert lim == 2.0, lim
|
|
|
|
loss = daily_realized_loss_amount(conn, now=now)
|
|
assert loss == 2500.0, loss
|
|
|
|
pct = round(loss / 100000.0 * 100, 2)
|
|
assert pct == 2.5, pct
|
|
|
|
# 模拟 get_risk_status 路径(无 CTP 浮亏)
|
|
import unittest.mock as mock
|
|
|
|
with mock.patch(
|
|
"modules.risk.account_risk_lib.daily_trading_risk_used_pct",
|
|
return_value=2.5,
|
|
):
|
|
risk = get_risk_status(
|
|
conn, equity=100000.0, mode="simulation", get_setting=_gs,
|
|
)
|
|
assert risk["can_trade"] is False, risk
|
|
assert risk["status"] == STATUS_DAILY_LOSS, risk
|
|
assert risk["status_label"] == "风控", risk
|
|
|
|
print("OK daily loss risk tests passed")
|
|
finally:
|
|
conn.close()
|
|
os.unlink(path)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|