Files
crypto_monitor/account_risk_lib.py
T
dekun f0a158686e fix(risk): allow journal to reduce 4h cooloff to 1h without pending trade id
Hub closes and late journal saves now shorten active manual cooloffs when exit trigger and note are filled in.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-18 16:27:21 +08:00

467 lines
14 KiB
Python

"""账户冷静期 / 日冻结风控(四所实例共用)。"""
from __future__ import annotations
import os
from datetime import datetime, timezone
from typing import Any, Callable, Optional
STATUS_NORMAL = "normal"
STATUS_FREEZE_1H = "freeze_1h"
STATUS_FREEZE_4H = "freeze_4h"
STATUS_DAILY = "freeze_daily"
STATUS_LABELS = {
STATUS_NORMAL: "正常",
STATUS_FREEZE_1H: "1h冻结",
STATUS_FREEZE_4H: "4h冻结",
STATUS_DAILY: "日冻结",
}
MOOD_ISSUE_OPTIONS = (
"怕踏空",
"报复开仓",
"盈利飘了",
"拿不住单",
"扛单",
"重仓违规",
)
# 仅以下来源计入「手动平仓」风控(用户主动点平仓/结束计划)
CLOSE_SOURCE_USER_INSTANCE = "user_instance"
CLOSE_SOURCE_USER_HUB = "user_hub"
CLOSE_SOURCE_USER_TREND_STOP = "user_trend_stop"
USER_INITIATED_CLOSE_SOURCES = frozenset(
{
CLOSE_SOURCE_USER_INSTANCE,
CLOSE_SOURCE_USER_HUB,
CLOSE_SOURCE_USER_TREND_STOP,
}
)
def _env_bool(key: str, default: bool = True) -> bool:
raw = (os.getenv(key) or "").strip().lower()
if not raw:
return default
return raw in ("1", "true", "yes", "on")
def _env_hours(key: str, default: float) -> float:
try:
v = float(os.getenv(key, str(default)))
except (TypeError, ValueError):
v = default
return max(0.0, v)
def risk_control_enabled() -> bool:
return _env_bool("RISK_CONTROL_ENABLED", True)
def cooling_hours_manual() -> float:
return _env_hours("RISK_COOLING_HOURS_MANUAL", 4.0)
def cooling_hours_manual_journal() -> float:
return _env_hours("RISK_COOLING_HOURS_MANUAL_JOURNAL", 1.0)
def manual_close_daily_limit() -> int:
try:
return max(1, int(os.getenv("RISK_MANUAL_CLOSE_DAILY_LIMIT", "2")))
except (TypeError, ValueError):
return 2
def mood_issues_daily_freeze_enabled() -> bool:
return _env_bool("RISK_MOOD_ISSUES_DAILY_FREEZE", True)
def ensure_account_risk_schema(conn) -> None:
conn.execute(
"""CREATE TABLE IF NOT EXISTS account_risk_state (
id INTEGER PRIMARY KEY CHECK (id = 1),
trading_day TEXT,
manual_close_count INTEGER DEFAULT 0,
cooloff_until_ms INTEGER,
cooloff_hours INTEGER,
daily_frozen INTEGER DEFAULT 0,
pending_journal_trade_id INTEGER,
last_close_at_ms INTEGER,
updated_at TEXT
)"""
)
row = conn.execute("SELECT id FROM account_risk_state WHERE id=1").fetchone()
if not row:
conn.execute(
"INSERT INTO account_risk_state (id, trading_day, manual_close_count, daily_frozen) VALUES (1, '', 0, 0)"
)
def _row_get(row, key, default=None):
if row is None:
return default
try:
return row[key]
except (KeyError, IndexError, TypeError):
return default
def _now_ms(now: Optional[datetime] = None) -> int:
dt = now or datetime.now()
if dt.tzinfo is None:
return int(dt.replace(tzinfo=timezone.utc).timestamp() * 1000)
return int(dt.timestamp() * 1000)
def _ms_to_local_str(ms: Optional[int], fmt_local: Callable[[int], str]) -> Optional[str]:
if ms is None:
return None
try:
return fmt_local(int(ms))
except Exception:
return None
def _load_state(conn):
ensure_account_risk_schema(conn)
return conn.execute("SELECT * FROM account_risk_state WHERE id=1").fetchone()
def _sync_trading_day(conn, trading_day: str, now: Optional[datetime] = None) -> Any:
row = _load_state(conn)
td = (trading_day or "").strip()
stored = str(_row_get(row, "trading_day") or "").strip()
if stored != td:
conn.execute(
"""UPDATE account_risk_state SET
trading_day=?,
manual_close_count=0,
daily_frozen=0,
updated_at=?
WHERE id=1""",
(td, (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")),
)
row = _load_state(conn)
return row
def _set_cooloff(
conn,
*,
trading_day: str,
close_at_ms: int,
hours: float,
now: Optional[datetime] = None,
) -> None:
_sync_trading_day(conn, trading_day, now=now)
h = max(0.0, float(hours))
until_ms = int(close_at_ms + h * 3600 * 1000)
conn.execute(
"""UPDATE account_risk_state SET
cooloff_until_ms=?,
cooloff_hours=?,
last_close_at_ms=?,
updated_at=?
WHERE id=1""",
(
until_ms,
int(h) if h == int(h) else int(round(h)),
int(close_at_ms),
(now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"),
),
)
def _set_cooloff_until(
conn,
*,
trading_day: str,
until_ms: int,
hours: float,
now: Optional[datetime] = None,
) -> None:
_sync_trading_day(conn, trading_day, now=now)
h = max(0.0, float(hours))
conn.execute(
"""UPDATE account_risk_state SET
cooloff_until_ms=?,
cooloff_hours=?,
updated_at=?
WHERE id=1""",
(
int(until_ms),
int(h) if h == int(h) else int(round(h)),
(now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"),
),
)
def _cooloff_until_ms(row) -> Optional[int]:
raw = _row_get(row, "cooloff_until_ms")
try:
return int(raw) if raw is not None else None
except (TypeError, ValueError):
return None
def _in_active_manual_cooloff(row, now_ms: int) -> bool:
if int(_row_get(row, "daily_frozen") or 0) == 1:
return False
if int(_row_get(row, "manual_close_count") or 0) < 1:
return False
until_ms = _cooloff_until_ms(row)
return until_ms is not None and until_ms > now_ms
def _journal_can_reduce_cooloff(row, pending, now_ms: int) -> bool:
if pending is not None:
try:
if int(pending) != 0:
return True
except (TypeError, ValueError):
return True
return _in_active_manual_cooloff(row, now_ms)
def _journal_cooloff_until_ms(row, now_ms: int, journal_hours: float) -> int:
journal_ms = int(max(0.0, float(journal_hours)) * 3600 * 1000)
last_close_ms = _row_get(row, "last_close_at_ms")
base_ms = int(last_close_ms) if last_close_ms else now_ms
until_from_close = base_ms + journal_ms
until_ms = until_from_close if until_from_close > now_ms else now_ms + journal_ms
current_until = _cooloff_until_ms(row)
if current_until is not None and current_until > now_ms and until_ms > current_until:
until_ms = current_until
return until_ms
def _set_daily_frozen(conn, *, trading_day: str, now: Optional[datetime] = None) -> None:
_sync_trading_day(conn, trading_day, now=now)
conn.execute(
"""UPDATE account_risk_state SET daily_frozen=1, updated_at=? WHERE id=1""",
((now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"),),
)
def parse_mood_issues(raw: Any) -> list[str]:
if raw is None:
return []
if isinstance(raw, (list, tuple)):
parts = [str(x).strip() for x in raw if str(x).strip()]
else:
parts = [x.strip() for x in str(raw).split(",") if x.strip()]
return [p for p in parts if p in MOOD_ISSUE_OPTIONS]
def _record_one_user_initiated_close(
conn,
*,
source: str,
trade_record_id: Optional[int],
closed_at_ms: Optional[int],
trading_day: str,
now: Optional[datetime] = None,
) -> None:
row = _sync_trading_day(conn, trading_day, now=now)
count = int(_row_get(row, "manual_close_count") or 0) + 1
close_ms = int(closed_at_ms) if closed_at_ms else _now_ms(now)
pending = int(trade_record_id) if trade_record_id else None
conn.execute(
"""UPDATE account_risk_state SET
manual_close_count=?,
pending_journal_trade_id=?,
updated_at=?
WHERE id=1""",
(count, pending, (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")),
)
if count >= manual_close_daily_limit():
_set_daily_frozen(conn, trading_day=trading_day, now=now)
return
_set_cooloff(
conn,
trading_day=trading_day,
close_at_ms=close_ms,
hours=cooling_hours_manual(),
now=now,
)
def on_user_initiated_close(
conn,
*,
source: str,
trade_record_id: Optional[int] = None,
closed_at_ms: Optional[int] = None,
trading_day: str,
now: Optional[datetime] = None,
count: int = 1,
) -> None:
"""用户主动平仓/结束趋势计划:计入手动平仓次数与冷静期。"""
if not risk_control_enabled():
return
src = (source or "").strip()
if src not in USER_INITIATED_CLOSE_SOURCES:
return
n = max(1, int(count or 1))
for i in range(n):
_record_one_user_initiated_close(
conn,
source=src,
trade_record_id=trade_record_id if i == 0 else None,
closed_at_ms=closed_at_ms,
trading_day=trading_day,
now=now,
)
row = _load_state(conn)
if int(_row_get(row, "daily_frozen") or 0) == 1:
break
def on_manual_close(
conn,
*,
trade_record_id: int,
closed_at_ms: Optional[int],
trading_day: str,
now: Optional[datetime] = None,
) -> None:
"""兼容旧调用:等同实例页用户平仓。"""
on_user_initiated_close(
conn,
source=CLOSE_SOURCE_USER_INSTANCE,
trade_record_id=trade_record_id,
closed_at_ms=closed_at_ms,
trading_day=trading_day,
now=now,
count=1,
)
def on_journal_saved(
conn,
*,
early_exit_trigger: str,
early_exit_note: str,
mood_issues_raw: Any,
trading_day: str,
now: Optional[datetime] = None,
) -> None:
if not risk_control_enabled():
return
row = _sync_trading_day(conn, trading_day, now=now)
mood_list = parse_mood_issues(mood_issues_raw)
if mood_issues_daily_freeze_enabled() and mood_list:
_set_daily_frozen(conn, trading_day=trading_day, now=now)
conn.execute(
"UPDATE account_risk_state SET pending_journal_trade_id=NULL, updated_at=? WHERE id=1",
((now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"),),
)
return
pending = _row_get(row, "pending_journal_trade_id")
trigger = (early_exit_trigger or "").strip()
note = (early_exit_note or "").strip()
now_ms = _now_ms(now)
if (
trigger == "手动平仓"
and note
and int(_row_get(row, "daily_frozen") or 0) != 1
and _journal_can_reduce_cooloff(row, pending, now_ms)
):
journal_h = cooling_hours_manual_journal()
until_ms = _journal_cooloff_until_ms(row, now_ms, journal_h)
_set_cooloff_until(
conn,
trading_day=trading_day,
until_ms=until_ms,
hours=journal_h,
now=now,
)
conn.execute(
"UPDATE account_risk_state SET pending_journal_trade_id=NULL, updated_at=? WHERE id=1",
((now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"),),
)
def compute_account_risk_status(
conn,
*,
trading_day: str,
now: Optional[datetime] = None,
fmt_local_ms: Optional[Callable[[int], str]] = None,
) -> dict[str, Any]:
if not risk_control_enabled():
return {
"enabled": False,
"status": STATUS_NORMAL,
"status_label": STATUS_LABELS[STATUS_NORMAL],
"can_trade": True,
"reason": "",
"cooloff_until_ms": None,
"cooloff_until": None,
"manual_close_count": 0,
"daily_frozen": False,
}
row = _sync_trading_day(conn, trading_day, now=now)
now_ms = _now_ms(now)
daily_frozen = int(_row_get(row, "daily_frozen") or 0) == 1
cooloff_until_ms = _row_get(row, "cooloff_until_ms")
try:
cooloff_until_ms = int(cooloff_until_ms) if cooloff_until_ms is not None else None
except (TypeError, ValueError):
cooloff_until_ms = None
cooloff_hours = _row_get(row, "cooloff_hours")
manual_close_count = int(_row_get(row, "manual_close_count") or 0)
status = STATUS_NORMAL
reason = ""
if daily_frozen:
status = STATUS_DAILY
reason = f"账户今日已冻结(手动平仓 {manual_close_count} 次或复盘情绪标签)"
elif cooloff_until_ms is not None and cooloff_until_ms > now_ms:
h = int(cooloff_hours or cooling_hours_manual())
status = STATUS_FREEZE_1H if h <= 1 else STATUS_FREEZE_4H
until_str = _ms_to_local_str(cooloff_until_ms, fmt_local_ms) if fmt_local_ms else None
label = STATUS_LABELS[status]
reason = f"账户{label}"
if until_str:
reason += f",至 {until_str}"
can_trade = status == STATUS_NORMAL
return {
"enabled": True,
"status": status,
"status_label": STATUS_LABELS[status],
"can_trade": can_trade,
"reason": reason,
"cooloff_until_ms": cooloff_until_ms if cooloff_until_ms and cooloff_until_ms > now_ms else None,
"cooloff_until": _ms_to_local_str(cooloff_until_ms, fmt_local_ms)
if fmt_local_ms and cooloff_until_ms and cooloff_until_ms > now_ms
else None,
"manual_close_count": manual_close_count,
"daily_frozen": daily_frozen,
"pending_journal_trade_id": _row_get(row, "pending_journal_trade_id"),
}
def account_risk_blocks_trading(
conn,
*,
trading_day: str,
now: Optional[datetime] = None,
fmt_local_ms: Optional[Callable[[int], str]] = None,
) -> tuple[bool, str]:
"""返回 (允许交易, 拒绝原因)。"""
st = compute_account_risk_status(
conn, trading_day=trading_day, now=now, fmt_local_ms=fmt_local_ms
)
if st.get("can_trade"):
return True, ""
return False, str(st.get("reason") or STATUS_LABELS.get(st.get("status"), "账户冻结"))
def insert_trade_record_id(conn) -> int:
row = conn.execute("SELECT last_insert_rowid()").fetchone()
return int(row[0] if row else 0)