"""Account cool-off / daily-freeze risk control (shared by the four exchange instances).

State lives in a single-row table ``account_risk_state`` (``id = 1``).  All
epoch values are milliseconds.  Naive ``datetime`` values passed in as ``now``
are interpreted as wall-clock time in the application timezone (see
``_app_tz``).
"""

from __future__ import annotations

import math
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Callable, Optional

STATUS_NORMAL = "normal"
STATUS_FREEZE_1H = "freeze_1h"
STATUS_FREEZE_4H = "freeze_4h"
STATUS_DAILY = "freeze_daily"

STATUS_LABELS = {
    STATUS_NORMAL: "正常",
    STATUS_FREEZE_1H: "1h冻结",
    STATUS_FREEZE_4H: "4h冻结",
    STATUS_DAILY: "日冻结",
}

MOOD_ISSUE_OPTIONS = (
    "怕踏空",
    "报复开仓",
    "盈利飘了",
    "拿不住单",
    "扛单",
    "重仓违规",
)

# Only the sources below count toward the "manual close" risk control
# (the user actively clicked close / ended a plan).
CLOSE_SOURCE_USER_INSTANCE = "user_instance"
CLOSE_SOURCE_USER_HUB = "user_hub"
CLOSE_SOURCE_USER_TREND_STOP = "user_trend_stop"

USER_INITIATED_CLOSE_SOURCES = frozenset(
    {
        CLOSE_SOURCE_USER_INSTANCE,
        CLOSE_SOURCE_USER_HUB,
        CLOSE_SOURCE_USER_TREND_STOP,
    }
)

# Format of the human-readable ``updated_at`` column.
_UPDATED_AT_FORMAT = "%Y-%m-%d %H:%M:%S"


def _env_bool(key: str, default: bool = True) -> bool:
    """Read a boolean flag from the environment; unset/blank yields *default*."""
    raw = (os.getenv(key) or "").strip().lower()
    if not raw:
        return default
    return raw in ("1", "true", "yes", "on")


def _env_hours(key: str, default: float) -> float:
    """Read a non-negative, finite number of hours from the environment.

    Unparsable or non-finite values (``inf``/``nan``) fall back to *default*;
    non-finite hours would otherwise overflow the millisecond arithmetic.
    """
    try:
        value = float(os.getenv(key, str(default)))
    except (TypeError, ValueError):
        value = default
    if not math.isfinite(value):
        value = default
    return max(0.0, value)


def _app_tz():
    """Return the application timezone.

    Uses ``APP_TIMEZONE``, then ``TZ``, then ``Asia/Shanghai``.  If no zone can
    be loaded at all (e.g. no tz database installed) a fixed UTC+08:00 offset
    is returned so callers never fail on timezone lookup.
    """
    from zoneinfo import ZoneInfo

    name = (os.getenv("APP_TIMEZONE") or os.getenv("TZ") or "Asia/Shanghai").strip()
    for candidate in (name, "Asia/Shanghai"):
        try:
            return ZoneInfo(candidate)
        except Exception:
            # Deliberate best-effort: any lookup failure moves on to the fallback.
            continue
    return timezone(timedelta(hours=8), "Asia/Shanghai")


def risk_control_enabled() -> bool:
    """Return whether risk control is switched on (``RISK_CONTROL_ENABLED``)."""
    return _env_bool("RISK_CONTROL_ENABLED", True)


def cooling_hours_manual() -> float:
    """Cool-off length in hours after a user-initiated close (default 4)."""
    return _env_hours("RISK_COOLING_HOURS_MANUAL", 4.0)


def cooling_hours_manual_journal() -> float:
    """Reduced cool-off length in hours once the close is journaled (default 1)."""
    return _env_hours("RISK_COOLING_HOURS_MANUAL_JOURNAL", 1.0)


def manual_close_daily_limit() -> int:
    """Number of user-initiated closes per trading day that triggers the daily freeze."""
    try:
        return max(1, int(os.getenv("RISK_MANUAL_CLOSE_DAILY_LIMIT", "2")))
    except (TypeError, ValueError):
        return 2


def mood_issues_daily_freeze_enabled() -> bool:
    """Return whether journal mood tags trigger the daily freeze."""
    return _env_bool("RISK_MOOD_ISSUES_DAILY_FREEZE", True)


def ensure_account_risk_schema(conn) -> None:
    """Create the single-row ``account_risk_state`` table and seed row ``id=1``."""
    conn.execute(
        """CREATE TABLE IF NOT EXISTS account_risk_state (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            trading_day TEXT,
            manual_close_count INTEGER DEFAULT 0,
            cooloff_until_ms INTEGER,
            cooloff_hours INTEGER,
            daily_frozen INTEGER DEFAULT 0,
            pending_journal_trade_id INTEGER,
            last_close_at_ms INTEGER,
            updated_at TEXT
        )"""
    )
    row = conn.execute("SELECT id FROM account_risk_state WHERE id=1").fetchone()
    if not row:
        conn.execute(
            "INSERT INTO account_risk_state (id, trading_day, manual_close_count, daily_frozen) VALUES (1, '', 0, 0)"
        )


def _row_get(row, key, default=None):
    """Return ``row[key]``, or *default* when the row is None or lacks the key."""
    if row is None:
        return default
    try:
        return row[key]
    except (KeyError, IndexError, TypeError):
        return default


def _updated_at(now: Optional[datetime] = None) -> str:
    """Format the ``updated_at`` text value from *now* (or the current local time)."""
    return (now or datetime.now()).strftime(_UPDATED_AT_FORMAT)


def _now_ms(now: Optional[datetime] = None) -> int:
    """Return *now* as epoch milliseconds.

    With no argument the current instant is taken as an aware datetime in the
    application timezone, so the result is the true epoch even when the system
    timezone differs from ``APP_TIMEZONE``.  A naive *now* supplied by the
    caller is interpreted as application-timezone wall-clock time.
    """
    dt = datetime.now(_app_tz()) if now is None else now
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=_app_tz())
    return int(dt.timestamp() * 1000)


def _normalize_epoch_ms(ms: int, ref_now_ms: Optional[int] = None) -> int:
    """Correct epoch ms written by old versions that treated naive Beijing time as UTC.

    The value shifted back by the application timezone's current UTC offset is
    used only when it lies strictly closer to the reference "now" than the
    stored value does; ties keep the stored value.
    """
    tz = _app_tz()
    off = datetime.now(tz).utcoffset()
    if not off:
        return int(ms)
    offset_ms = int(off.total_seconds() * 1000)
    if offset_ms == 0:
        return int(ms)
    ref = int(ref_now_ms) if ref_now_ms is not None else _now_ms(datetime.now(tz))
    corrected = int(ms) - offset_ms
    if abs(int(ms) - ref) <= abs(corrected - ref):
        return int(ms)
    return corrected


def _cooloff_hours_value(row) -> float:
    """Return the stored cool-off length in hours.

    Only a missing (NULL) or unparsable value falls back to the manual default;
    a stored ``0`` is a legitimate zero-length cool-off and is kept as ``0.0``.
    """
    raw = _row_get(row, "cooloff_hours")
    if raw is None:
        return cooling_hours_manual()
    try:
        return max(0.0, float(raw))
    except (TypeError, ValueError):
        return cooling_hours_manual()


def _cooloff_hours_db_value(hours: float):
    """Return the value to store in ``cooloff_hours`` without losing precision.

    Whole hours are stored as ``int``; fractional hours are stored as-is.
    Rounding them (1.5 -> 2) would make ``last_close_at_ms + hours`` outlast
    the intended ``cooloff_until_ms``.
    """
    h = float(hours)
    return int(h) if h.is_integer() else h


def _resolved_cooloff_until_ms(row, now_ms: int) -> Optional[int]:
    """Return the still-active cool-off end time, or None when none is active.

    When several sources exist (the stored ``cooloff_until_ms`` and
    ``last_close_at_ms + cooloff_hours``) the earliest unexpired one wins, so a
    stale 4h window does not override the 1h window set after journaling.
    """
    raw_until = _cooloff_until_ms(row)
    last = _row_get(row, "last_close_at_ms")
    hours = _cooloff_hours_value(row)
    candidates: list[int] = []
    if raw_until is not None:
        try:
            candidates.append(_normalize_epoch_ms(int(raw_until), now_ms))
        except (TypeError, ValueError):
            pass
    if last is not None:
        try:
            last_i = _normalize_epoch_ms(int(last), now_ms)
            candidates.append(last_i + int(hours * 3600 * 1000))
        except (TypeError, ValueError):
            pass
    active = [c for c in candidates if c > now_ms]
    if not active:
        return None
    return min(active)


def _freeze_tier_from_remaining_ms(remaining_ms: int) -> str:
    """Map remaining cool-off time to the 1h or 4h freeze status.

    Anything within the journal cool-off length plus a 5-minute tolerance is
    reported as the 1h tier; longer remainders are the 4h tier.
    """
    journal_h = cooling_hours_manual_journal()
    rh = remaining_ms / 3600000.0
    if rh <= journal_h + (5 / 60):
        return STATUS_FREEZE_1H
    return STATUS_FREEZE_4H


def _ms_to_local_str(ms: Optional[int], fmt_local: Callable[[int], str]) -> Optional[str]:
    """Format *ms* with the caller-supplied formatter; None on missing input or failure."""
    if ms is None:
        return None
    try:
        return fmt_local(int(ms))
    except Exception:
        # The formatter is an arbitrary callback; display must never break status.
        return None


def _load_state(conn):
    """Ensure the schema exists and return the single state row."""
    ensure_account_risk_schema(conn)
    return conn.execute("SELECT * FROM account_risk_state WHERE id=1").fetchone()


def _sync_trading_day(conn, trading_day: str, now: Optional[datetime] = None) -> Any:
    """Roll the state over to *trading_day* if it changed and return the row.

    A new trading day resets ``manual_close_count`` and ``daily_frozen``; the
    cool-off columns are left untouched.
    """
    row = _load_state(conn)
    td = (trading_day or "").strip()
    stored = str(_row_get(row, "trading_day") or "").strip()
    if stored != td:
        conn.execute(
            """UPDATE account_risk_state
               SET trading_day=?, manual_close_count=0, daily_frozen=0, updated_at=?
               WHERE id=1""",
            (td, _updated_at(now)),
        )
        row = _load_state(conn)
    return row


def _set_cooloff(
    conn,
    *,
    trading_day: str,
    close_at_ms: int,
    hours: float,
    now: Optional[datetime] = None,
) -> None:
    """Start a cool-off of *hours* counted from *close_at_ms* and record the close time."""
    _sync_trading_day(conn, trading_day, now=now)
    h = max(0.0, float(hours))
    until_ms = int(close_at_ms + h * 3600 * 1000)
    conn.execute(
        """UPDATE account_risk_state
           SET cooloff_until_ms=?, cooloff_hours=?, last_close_at_ms=?, updated_at=?
           WHERE id=1""",
        (
            until_ms,
            _cooloff_hours_db_value(h),
            int(close_at_ms),
            _updated_at(now),
        ),
    )


def _set_cooloff_until(
    conn,
    *,
    trading_day: str,
    until_ms: int,
    hours: float,
    now: Optional[datetime] = None,
) -> None:
    """Set an explicit cool-off end time and length; ``last_close_at_ms`` is unchanged."""
    _sync_trading_day(conn, trading_day, now=now)
    h = max(0.0, float(hours))
    conn.execute(
        """UPDATE account_risk_state
           SET cooloff_until_ms=?, cooloff_hours=?, updated_at=?
           WHERE id=1""",
        (
            int(until_ms),
            _cooloff_hours_db_value(h),
            _updated_at(now),
        ),
    )


def _cooloff_until_ms(row) -> Optional[int]:
    """Return the raw stored ``cooloff_until_ms`` as int, or None if missing/invalid."""
    raw = _row_get(row, "cooloff_until_ms")
    try:
        return int(raw) if raw is not None else None
    except (TypeError, ValueError):
        return None


def _journal_can_reduce_cooloff(row, pending, now_ms: int) -> bool:
    """Return whether saving a journal may shorten the current cool-off.

    Requires: no daily freeze, an unexpired stored cool-off, and a stored
    cool-off length longer than the journal length.  *pending* is accepted for
    interface compatibility but does not affect the result (the original
    returned True on every path once the checks above passed).
    """
    if int(_row_get(row, "daily_frozen") or 0) == 1:
        return False
    until_ms = _cooloff_until_ms(row)
    if until_ms is None or until_ms <= now_ms:
        return False
    journal_h = cooling_hours_manual_journal()
    cooloff_h = _cooloff_hours_value(row)
    # Already at (or below) the journal length: nothing to reduce.
    return cooloff_h > journal_h + 1e-6


def _journal_cooloff_until_ms(row, now_ms: int, journal_hours: float) -> int:
    """Compute the reduced cool-off end time after a journal is saved.

    The window is counted from the last close; if that point is already past,
    it is counted from *now_ms* instead.  The result never extends beyond the
    currently active cool-off end.
    """
    journal_ms = int(max(0.0, float(journal_hours)) * 3600 * 1000)
    last_close_ms = _row_get(row, "last_close_at_ms")
    if last_close_ms:
        try:
            base_ms = _normalize_epoch_ms(int(last_close_ms), now_ms)
        except (TypeError, ValueError):
            base_ms = now_ms
    else:
        base_ms = now_ms
    until_from_close = base_ms + journal_ms
    until_ms = until_from_close if until_from_close > now_ms else now_ms + journal_ms
    current_until = _resolved_cooloff_until_ms(row, now_ms)
    if current_until is not None and until_ms > current_until:
        until_ms = current_until
    return until_ms


def _set_daily_frozen(conn, *, trading_day: str, now: Optional[datetime] = None) -> None:
    """Mark the account as frozen for the rest of *trading_day*."""
    _sync_trading_day(conn, trading_day, now=now)
    conn.execute(
        """UPDATE account_risk_state
           SET daily_frozen=1, updated_at=?
           WHERE id=1""",
        (_updated_at(now),),
    )


def parse_mood_issues(raw: Any) -> list[str]:
    """Parse mood tags from a list/tuple or comma-separated string.

    Only values present in ``MOOD_ISSUE_OPTIONS`` are kept, in input order.
    """
    if raw is None:
        return []
    if isinstance(raw, (list, tuple)):
        parts = [str(x).strip() for x in raw if str(x).strip()]
    else:
        parts = [x.strip() for x in str(raw).split(",") if x.strip()]
    return [p for p in parts if p in MOOD_ISSUE_OPTIONS]


def _record_one_user_initiated_close(
    conn,
    *,
    source: str,
    trade_record_id: Optional[int],
    closed_at_ms: Optional[int],
    trading_day: str,
    now: Optional[datetime] = None,
) -> None:
    """Record a single user-initiated close.

    Increments the daily counter and sets ``pending_journal_trade_id`` to
    *trade_record_id* (NULL when falsy).  Reaching the daily limit triggers the
    daily freeze; otherwise a manual cool-off starts from the close time.
    *source* is accepted for symmetry with the public entry point and is not
    used here.
    """
    row = _sync_trading_day(conn, trading_day, now=now)
    count = int(_row_get(row, "manual_close_count") or 0) + 1
    close_ms = int(closed_at_ms) if closed_at_ms else _now_ms(now)
    pending = int(trade_record_id) if trade_record_id else None
    conn.execute(
        """UPDATE account_risk_state
           SET manual_close_count=?, pending_journal_trade_id=?, updated_at=?
           WHERE id=1""",
        (count, pending, _updated_at(now)),
    )
    if count >= manual_close_daily_limit():
        _set_daily_frozen(conn, trading_day=trading_day, now=now)
        return
    _set_cooloff(
        conn,
        trading_day=trading_day,
        close_at_ms=close_ms,
        hours=cooling_hours_manual(),
        now=now,
    )


def on_user_initiated_close(
    conn,
    *,
    source: str,
    trade_record_id: Optional[int] = None,
    closed_at_ms: Optional[int] = None,
    trading_day: str,
    now: Optional[datetime] = None,
    count: int = 1,
) -> None:
    """User actively closed a position / ended a trend plan.

    Counts *count* manual closes (at least one) and applies the cool-off,
    stopping early once the daily freeze is reached.  Calls with risk control
    disabled or with a *source* outside ``USER_INITIATED_CLOSE_SOURCES`` are
    ignored.
    """
    if not risk_control_enabled():
        return
    src = (source or "").strip()
    if src not in USER_INITIATED_CLOSE_SOURCES:
        return
    n = max(1, int(count or 1))
    for _ in range(n):
        # Pass the trade id on every iteration: passing None after the first
        # one would overwrite pending_journal_trade_id with NULL.
        _record_one_user_initiated_close(
            conn,
            source=src,
            trade_record_id=trade_record_id,
            closed_at_ms=closed_at_ms,
            trading_day=trading_day,
            now=now,
        )
        row = _load_state(conn)
        if int(_row_get(row, "daily_frozen") or 0) == 1:
            break


def on_manual_close(
    conn,
    *,
    trade_record_id: int,
    closed_at_ms: Optional[int],
    trading_day: str,
    now: Optional[datetime] = None,
) -> None:
    """Legacy entry point: equivalent to a user close from the instance page."""
    on_user_initiated_close(
        conn,
        source=CLOSE_SOURCE_USER_INSTANCE,
        trade_record_id=trade_record_id,
        closed_at_ms=closed_at_ms,
        trading_day=trading_day,
        now=now,
        count=1,
    )


def on_journal_saved(
    conn,
    *,
    early_exit_trigger: str,
    early_exit_note: str,
    mood_issues_raw: Any,
    trading_day: str,
    now: Optional[datetime] = None,
) -> None:
    """Apply risk-control effects of saving a trade journal.

    Any recognised mood tag (when enabled) freezes the account for the day.
    Otherwise a journal whose trigger is "手动平仓" with a non-empty note may
    shorten the active cool-off to the journal length.  In both cases
    ``pending_journal_trade_id`` is cleared.
    """
    if not risk_control_enabled():
        return
    row = _sync_trading_day(conn, trading_day, now=now)
    mood_list = parse_mood_issues(mood_issues_raw)
    if mood_issues_daily_freeze_enabled() and mood_list:
        _set_daily_frozen(conn, trading_day=trading_day, now=now)
        conn.execute(
            "UPDATE account_risk_state SET pending_journal_trade_id=NULL, updated_at=? WHERE id=1",
            (_updated_at(now),),
        )
        return
    pending = _row_get(row, "pending_journal_trade_id")
    trigger = (early_exit_trigger or "").strip()
    note = (early_exit_note or "").strip()
    now_ms = _now_ms(now)
    if (
        trigger == "手动平仓"
        and note
        and int(_row_get(row, "daily_frozen") or 0) != 1
        and _journal_can_reduce_cooloff(row, pending, now_ms)
    ):
        journal_h = cooling_hours_manual_journal()
        until_ms = _journal_cooloff_until_ms(row, now_ms, journal_h)
        _set_cooloff_until(
            conn,
            trading_day=trading_day,
            until_ms=until_ms,
            hours=journal_h,
            now=now,
        )
    conn.execute(
        "UPDATE account_risk_state SET pending_journal_trade_id=NULL, updated_at=? WHERE id=1",
        (_updated_at(now),),
    )


def apply_manual_close_journal_cooloff(
    conn,
    *,
    early_exit_note: str,
    trading_day: str,
    now: Optional[datetime] = None,
) -> None:
    """On review edit or journaling: manual close + note tries to cut the 4h cool-off to 1h.

    Does nothing when the note is blank.
    """
    note = (early_exit_note or "").strip()
    if not note:
        return
    on_journal_saved(
        conn,
        early_exit_trigger="手动平仓",
        early_exit_note=note,
        mood_issues_raw="",
        trading_day=trading_day,
        now=now,
    )


def _next_trading_day_reset_ms(now: datetime, reset_hour: int) -> int:
    """Return epoch ms of the next occurrence of *reset_hour*:00 strictly after *now*."""
    h = max(0, min(23, int(reset_hour)))
    candidate = now.replace(hour=h, minute=0, second=0, microsecond=0)
    if now >= candidate:
        candidate = candidate + timedelta(days=1)
    return _now_ms(candidate)


def enrich_risk_status_countdown(
    st: dict[str, Any],
    *,
    now: Optional[datetime] = None,
    daily_reset_hour: int = 8,
) -> dict[str, Any]:
    """Add ``freeze_until_ms`` / ``freeze_remaining_sec`` for the front-end countdown.

    A daily freeze counts down to the next *daily_reset_hour*; otherwise the
    countdown targets ``cooloff_until_ms``.  *st* is updated in place and
    returned; a status with ``enabled`` false is returned untouched.
    """
    if not st.get("enabled", True):
        return st
    # Use an aware "now" in the app timezone so both the reset hour and the
    # epoch conversion are correct regardless of the system timezone.
    dt = now if now is not None else datetime.now(_app_tz())
    now_ms = _now_ms(dt)
    until_ms: Optional[int] = None
    if st.get("daily_frozen"):
        until_ms = _next_trading_day_reset_ms(dt, daily_reset_hour)
    elif st.get("cooloff_until_ms"):
        try:
            until_ms = int(st["cooloff_until_ms"])
        except (TypeError, ValueError):
            until_ms = None
    if until_ms is not None and until_ms > now_ms:
        st["freeze_until_ms"] = until_ms
        st["freeze_remaining_sec"] = max(0, (until_ms - now_ms) // 1000)
    else:
        st["freeze_until_ms"] = None
        st["freeze_remaining_sec"] = 0
    return st


def compute_account_risk_status(
    conn,
    *,
    trading_day: str,
    now: Optional[datetime] = None,
    fmt_local_ms: Optional[Callable[[int], str]] = None,
) -> dict[str, Any]:
    """Return the current account risk status as a dict.

    Keys: ``enabled``, ``status``, ``status_label``, ``can_trade``, ``reason``,
    ``cooloff_until_ms``, ``cooloff_until``, ``manual_close_count``,
    ``daily_frozen``, ``pending_journal_trade_id``, ``freeze_remaining_sec``.
    A daily freeze takes precedence over a cool-off.  *fmt_local_ms*, when
    given, formats epoch ms for the ``cooloff_until`` and ``reason`` text.
    """
    if not risk_control_enabled():
        return {
            "enabled": False,
            "status": STATUS_NORMAL,
            "status_label": STATUS_LABELS[STATUS_NORMAL],
            "can_trade": True,
            "reason": "",
            "cooloff_until_ms": None,
            "cooloff_until": None,
            "manual_close_count": 0,
            "daily_frozen": False,
            # Same key set as the enabled branch so consumers need no special case.
            "pending_journal_trade_id": None,
            "freeze_remaining_sec": 0,
        }
    row = _sync_trading_day(conn, trading_day, now=now)
    now_ms = _now_ms(now)
    daily_frozen = int(_row_get(row, "daily_frozen") or 0) == 1
    cooloff_until_ms = _resolved_cooloff_until_ms(row, now_ms)
    manual_close_count = int(_row_get(row, "manual_close_count") or 0)
    status = STATUS_NORMAL
    reason = ""
    if daily_frozen:
        status = STATUS_DAILY
        reason = f"账户今日已冻结(手动平仓 {manual_close_count} 次或复盘情绪标签)"
    elif cooloff_until_ms is not None:
        remaining_ms = cooloff_until_ms - now_ms
        status = _freeze_tier_from_remaining_ms(remaining_ms)
        until_str = _ms_to_local_str(cooloff_until_ms, fmt_local_ms) if fmt_local_ms else None
        label = STATUS_LABELS[status]
        reason = f"账户{label}中"
        if until_str:
            reason += f",至 {until_str}"
    can_trade = status == STATUS_NORMAL
    freeze_remaining_sec = (
        max(0, (cooloff_until_ms - now_ms) // 1000) if cooloff_until_ms is not None else 0
    )
    return {
        "enabled": True,
        "status": status,
        "status_label": STATUS_LABELS[status],
        "can_trade": can_trade,
        "reason": reason,
        "cooloff_until_ms": cooloff_until_ms,
        "cooloff_until": _ms_to_local_str(cooloff_until_ms, fmt_local_ms)
        if fmt_local_ms and cooloff_until_ms
        else None,
        "manual_close_count": manual_close_count,
        "daily_frozen": daily_frozen,
        "pending_journal_trade_id": _row_get(row, "pending_journal_trade_id"),
        "freeze_remaining_sec": freeze_remaining_sec if not can_trade else 0,
    }


def account_risk_blocks_trading(
    conn,
    *,
    trading_day: str,
    now: Optional[datetime] = None,
    fmt_local_ms: Optional[Callable[[int], str]] = None,
) -> tuple[bool, str]:
    """Return ``(trading_allowed, rejection_reason)``; the reason is empty when allowed."""
    st = compute_account_risk_status(
        conn, trading_day=trading_day, now=now, fmt_local_ms=fmt_local_ms
    )
    if st.get("can_trade"):
        return True, ""
    return False, str(st.get("reason") or STATUS_LABELS.get(st.get("status"), "账户冻结"))


def insert_trade_record_id(conn) -> int:
    """Return ``last_insert_rowid()`` for *conn*, or 0 when no row comes back."""
    row = conn.execute("SELECT last_insert_rowid()").fetchone()
    return int(row[0] if row else 0)