"""账户冷静期 / 日冻结风控(四所实例共用)。""" from __future__ import annotations import os from datetime import datetime, timezone from typing import Any, Callable, Optional STATUS_NORMAL = "normal" STATUS_FREEZE_1H = "freeze_1h" STATUS_FREEZE_4H = "freeze_4h" STATUS_DAILY = "freeze_daily" STATUS_FREEZE_POSITION = "freeze_position" STATUS_LABELS = { STATUS_NORMAL: "正常", STATUS_FREEZE_1H: "1h冻结", STATUS_FREEZE_4H: "4h冻结", STATUS_DAILY: "日冻结", STATUS_FREEZE_POSITION: "仓位上限冻结", } MOOD_ISSUE_OPTIONS = ( "怕踏空", "报复开仓", "盈利飘了", "拿不住单", "扛单", "重仓违规", ) # 仅以下来源计入「手动平仓」风控(用户主动点平仓/结束计划) CLOSE_SOURCE_USER_INSTANCE = "user_instance" CLOSE_SOURCE_USER_HUB = "user_hub" CLOSE_SOURCE_USER_TREND_STOP = "user_trend_stop" USER_INITIATED_CLOSE_SOURCES = frozenset( { CLOSE_SOURCE_USER_INSTANCE, CLOSE_SOURCE_USER_HUB, CLOSE_SOURCE_USER_TREND_STOP, } ) def _env_bool(key: str, default: bool = True) -> bool: raw = (os.getenv(key) or "").strip().lower() if not raw: return default return raw in ("1", "true", "yes", "on") def _env_hours(key: str, default: float) -> float: try: v = float(os.getenv(key, str(default))) except (TypeError, ValueError): v = default return max(0.0, v) def _app_tz(): from zoneinfo import ZoneInfo name = (os.getenv("APP_TIMEZONE") or os.getenv("TZ") or "Asia/Shanghai").strip() try: return ZoneInfo(name) except Exception: return ZoneInfo("Asia/Shanghai") def risk_control_enabled() -> bool: return _env_bool("RISK_CONTROL_ENABLED", True) def cooling_hours_manual() -> float: return _env_hours("RISK_COOLING_HOURS_MANUAL", 4.0) def cooling_hours_manual_journal() -> float: return _env_hours("RISK_COOLING_HOURS_MANUAL_JOURNAL", 1.0) def manual_close_daily_limit() -> int: try: return max(1, int(os.getenv("RISK_MANUAL_CLOSE_DAILY_LIMIT", "2"))) except (TypeError, ValueError): return 2 def max_active_positions_from_env(default: int = 1) -> int: try: return max(1, int(os.getenv("MAX_ACTIVE_POSITIONS", str(default)))) except (TypeError, ValueError): return max(1, default) def position_limit_reached( conn, *, max_active_positions: Optional[int] = None, ) -> tuple[bool, int, int]: """(已达上限, 计入上限的活跃数, 上限值)。""" from strategy_trade_labels import count_position_limit_active_monitors mx = max(1, int(max_active_positions if max_active_positions is not None else max_active_positions_from_env())) ac = count_position_limit_active_monitors(conn) return ac >= mx, ac, mx def mood_issues_daily_freeze_enabled() -> bool: return _env_bool("RISK_MOOD_ISSUES_DAILY_FREEZE", True) def ensure_account_risk_schema(conn) -> None: conn.execute( """CREATE TABLE IF NOT EXISTS account_risk_state ( id INTEGER PRIMARY KEY CHECK (id = 1), trading_day TEXT, manual_close_count INTEGER DEFAULT 0, cooloff_until_ms INTEGER, cooloff_hours INTEGER, daily_frozen INTEGER DEFAULT 0, pending_journal_trade_id INTEGER, last_close_at_ms INTEGER, updated_at TEXT )""" ) row = conn.execute("SELECT id FROM account_risk_state WHERE id=1").fetchone() if not row: conn.execute( "INSERT INTO account_risk_state (id, trading_day, manual_close_count, daily_frozen) VALUES (1, '', 0, 0)" ) def _row_get(row, key, default=None): if row is None: return default try: return row[key] except (KeyError, IndexError, TypeError): return default def _now_ms(now: Optional[datetime] = None) -> int: dt = now or datetime.now() if dt.tzinfo is None: dt = dt.replace(tzinfo=_app_tz()) return int(dt.timestamp() * 1000) def _normalize_epoch_ms(ms: int, ref_now_ms: Optional[int] = None) -> int: """修正旧版把北京时间 naive 当作 UTC 写入的 epoch 毫秒。""" tz = _app_tz() off = datetime.now(tz).utcoffset() if not off: return int(ms) offset_ms = int(off.total_seconds() * 1000) if offset_ms == 0: return int(ms) ref = int(ref_now_ms) if ref_now_ms is not None else _now_ms(datetime.now(tz)) corrected = int(ms) - offset_ms if abs(int(ms) - ref) <= abs(corrected - ref): return int(ms) return corrected def _sanitize_last_close_ms(last_ms: int, now_ms: int) -> Optional[int]: """平仓时刻须不晚于当前(允许 1 分钟时钟偏差);显著未来视为无效锚点。""" slack_ms = 60 * 1000 if last_ms > now_ms + slack_ms: return None return last_ms def _cooloff_duration_ms(hours: float) -> int: return int(max(0.0, float(hours)) * 3600 * 1000) def _cooloff_hours_value(row) -> float: return float(_row_get(row, "cooloff_hours") or cooling_hours_manual()) def _resolved_cooloff_until_ms(row, now_ms: int) -> Optional[int]: """冷静期结束 = last_close + cooloff_hours;无效/已过期锚点不再重启计时。""" hours = _cooloff_hours_value(row) journal_h = cooling_hours_manual_journal() duration_ms = _cooloff_duration_ms(hours) last_raw = _row_get(row, "last_close_at_ms") stored_raw = _cooloff_until_ms(row) if last_raw is not None: try: last_ms = _sanitize_last_close_ms( _normalize_epoch_ms(int(last_raw), now_ms), now_ms ) except (TypeError, ValueError): last_ms = None if last_ms is not None: end_ms = last_ms + duration_ms if end_ms > now_ms: return end_ms if hours <= journal_h + 1e-6: return None if stored_raw is None: return None stored_ms = _normalize_epoch_ms(int(stored_raw), now_ms) return stored_ms if stored_ms > now_ms else None def _clear_inactive_cooloff( conn, *, now: Optional[datetime] = None, ) -> None: """冷静期已结束或锚点无效时清库,避免重启后误读旧冻结。""" conn.execute( """UPDATE account_risk_state SET cooloff_until_ms=NULL, cooloff_hours=NULL, last_close_at_ms=NULL, updated_at=? WHERE id=1""", ((now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"),), ) def _freeze_tier_from_remaining_ms(remaining_ms: int, hours: float) -> str: journal_h = cooling_hours_manual_journal() rh = remaining_ms / 3600000.0 if rh <= journal_h + (5 / 60): return STATUS_FREEZE_1H return STATUS_FREEZE_4H def _freeze_status_label(hours: float, status: str) -> str: if status == STATUS_FREEZE_1H: return STATUS_LABELS[STATUS_FREEZE_1H] if status == STATUS_FREEZE_4H: h = int(hours) if float(hours) == int(hours) else round(float(hours), 1) if abs(float(hours) - 4.0) < 1e-6: return STATUS_LABELS[STATUS_FREEZE_4H] return f"{h}h冻结" return STATUS_LABELS.get(status, STATUS_LABELS[STATUS_NORMAL]) def _ms_to_local_str(ms: Optional[int], fmt_local: Callable[[int], str]) -> Optional[str]: if ms is None: return None try: return fmt_local(int(ms)) except Exception: return None def _load_state(conn): ensure_account_risk_schema(conn) return conn.execute("SELECT * FROM account_risk_state WHERE id=1").fetchone() def _sync_trading_day(conn, trading_day: str, now: Optional[datetime] = None) -> Any: row = _load_state(conn) td = (trading_day or "").strip() stored = str(_row_get(row, "trading_day") or "").strip() if stored != td: now_ms = _now_ms(now) cooloff_active = _resolved_cooloff_until_ms(row, now_ms) conn.execute( """UPDATE account_risk_state SET trading_day=?, manual_close_count=0, daily_frozen=0, cooloff_until_ms=?, cooloff_hours=?, last_close_at_ms=?, pending_journal_trade_id=NULL, updated_at=? WHERE id=1""", ( td, cooloff_active, _row_get(row, "cooloff_hours") if cooloff_active else None, _row_get(row, "last_close_at_ms") if cooloff_active else None, (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"), ), ) row = _load_state(conn) return row def _set_cooloff( conn, *, trading_day: str, close_at_ms: int, hours: float, now: Optional[datetime] = None, ) -> None: _sync_trading_day(conn, trading_day, now=now) h = max(0.0, float(hours)) until_ms = int(close_at_ms + h * 3600 * 1000) conn.execute( """UPDATE account_risk_state SET cooloff_until_ms=?, cooloff_hours=?, last_close_at_ms=?, updated_at=? WHERE id=1""", ( until_ms, int(h) if h == int(h) else int(round(h)), int(close_at_ms), (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"), ), ) def _set_cooloff_until( conn, *, trading_day: str, until_ms: int, hours: float, now: Optional[datetime] = None, ) -> None: _sync_trading_day(conn, trading_day, now=now) h = max(0.0, float(hours)) conn.execute( """UPDATE account_risk_state SET cooloff_until_ms=?, cooloff_hours=?, updated_at=? WHERE id=1""", ( int(until_ms), int(h) if h == int(h) else int(round(h)), (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"), ), ) def _ms_trading_day_label(ms: int) -> str: dt = datetime.fromtimestamp(ms / 1000, tz=_app_tz()) return dt.strftime("%Y-%m-%d") def _parse_journal_close_ms(raw: Any) -> Optional[int]: if raw is None: return None s = str(raw).strip() if not s: return None for fmt in ("%Y-%m-%d %H:%M:%S", "%Y/%m/%d %H:%M:%S", "%Y-%m-%d %H:%M"): try: dt = datetime.strptime(s[:19] if len(s) > 16 else s, fmt) return _now_ms(dt) except ValueError: continue return None def _latest_journaled_manual_close_ms(conn, trading_day: str) -> Optional[int]: """当日最近一条已复盘的手动平仓时刻(journal 有说明)。""" try: rows = conn.execute( """SELECT close_datetime FROM journal_entries WHERE early_exit_trigger='手动平仓' AND early_exit_note IS NOT NULL AND TRIM(early_exit_note) <> '' ORDER BY close_datetime DESC""" ).fetchall() except Exception: return None td = (trading_day or "").strip() best: Optional[int] = None for row in rows: ms = _parse_journal_close_ms(_row_get(row, "close_datetime")) if ms is None: continue if td and _ms_trading_day_label(ms) != td: continue if best is None or ms > best: best = ms return best def _journaled_manual_cooloff_expired( conn, *, trading_day: str, now_ms: int, pending: Any ) -> bool: """当日手动平仓已复盘且 1h 冷静期结束,且无待复盘的新平仓。""" if pending is not None: try: if int(pending) != 0: return False except (TypeError, ValueError): return False close_ms = _latest_journaled_manual_close_ms(conn, trading_day) if close_ms is None: return False journal_ms = _cooloff_duration_ms(cooling_hours_manual_journal()) return close_ms + journal_ms <= now_ms def _cooloff_until_ms(row) -> Optional[int]: raw = _row_get(row, "cooloff_until_ms") try: return int(raw) if raw is not None else None except (TypeError, ValueError): return None def _repair_stale_cooloff_row( conn, row, *, now_ms: int, resolved_until_ms: Optional[int], now: Optional[datetime] = None, ) -> None: """脏数据读时写回:过期/无效则清库,否则对齐 until / last_close。""" last_raw = _row_get(row, "last_close_at_ms") stored_raw = _cooloff_until_ms(row) if last_raw is None and stored_raw is None: return if resolved_until_ms is None: if last_raw is not None or stored_raw is not None: _clear_inactive_cooloff(conn, now=now) return dirty = False new_last: Optional[int] = None if last_raw is not None: try: norm = _normalize_epoch_ms(int(last_raw), now_ms) sanitized = _sanitize_last_close_ms(norm, now_ms) if sanitized is None: dirty = True else: new_last = sanitized if sanitized != int(last_raw): dirty = True except (TypeError, ValueError): dirty = True if stored_raw is not None: stored_norm = _normalize_epoch_ms(int(stored_raw), now_ms) if abs(stored_norm - int(resolved_until_ms)) > 60 * 1000: dirty = True if not dirty: return conn.execute( """UPDATE account_risk_state SET cooloff_until_ms=?, cooloff_hours=?, last_close_at_ms=?, updated_at=? WHERE id=1""", ( resolved_until_ms, _row_get(row, "cooloff_hours"), new_last, (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"), ), ) def _journal_can_reduce_cooloff(row, pending, now_ms: int) -> bool: if int(_row_get(row, "daily_frozen") or 0) == 1: return False if _resolved_cooloff_until_ms(row, now_ms) is None: return False journal_h = cooling_hours_manual_journal() cooloff_h = float(_row_get(row, "cooloff_hours") or cooling_hours_manual()) if cooloff_h <= journal_h + 1e-6: return False if pending is not None: try: if int(pending) != 0: return True except (TypeError, ValueError): return True return True def _journal_cooloff_until_ms(row, now_ms: int, journal_hours: float) -> int: journal_ms = int(max(0.0, float(journal_hours)) * 3600 * 1000) last_close_ms = _row_get(row, "last_close_at_ms") if last_close_ms: try: base_ms = _sanitize_last_close_ms( _normalize_epoch_ms(int(last_close_ms), now_ms), now_ms ) except (TypeError, ValueError): base_ms = None if base_ms is None: base_ms = now_ms else: base_ms = now_ms until_from_close = base_ms + journal_ms if until_from_close > now_ms: return until_from_close return now_ms + journal_ms def _set_daily_frozen(conn, *, trading_day: str, now: Optional[datetime] = None) -> None: _sync_trading_day(conn, trading_day, now=now) conn.execute( """UPDATE account_risk_state SET daily_frozen=1, updated_at=? WHERE id=1""", ((now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"),), ) def parse_mood_issues(raw: Any) -> list[str]: if raw is None: return [] if isinstance(raw, (list, tuple)): parts = [str(x).strip() for x in raw if str(x).strip()] else: parts = [x.strip() for x in str(raw).split(",") if x.strip()] return [p for p in parts if p in MOOD_ISSUE_OPTIONS] def _record_one_user_initiated_close( conn, *, source: str, trade_record_id: Optional[int], closed_at_ms: Optional[int], trading_day: str, now: Optional[datetime] = None, ) -> None: row = _sync_trading_day(conn, trading_day, now=now) count = int(_row_get(row, "manual_close_count") or 0) + 1 close_ms = int(closed_at_ms) if closed_at_ms else _now_ms(now) pending = int(trade_record_id) if trade_record_id else None conn.execute( """UPDATE account_risk_state SET manual_close_count=?, pending_journal_trade_id=?, updated_at=? WHERE id=1""", (count, pending, (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")), ) if count >= manual_close_daily_limit(): _set_daily_frozen(conn, trading_day=trading_day, now=now) return _set_cooloff( conn, trading_day=trading_day, close_at_ms=close_ms, hours=cooling_hours_manual(), now=now, ) def on_user_initiated_close( conn, *, source: str, trade_record_id: Optional[int] = None, closed_at_ms: Optional[int] = None, trading_day: str, now: Optional[datetime] = None, count: int = 1, ) -> None: """用户主动平仓/结束趋势计划:计入手动平仓次数与冷静期。""" if not risk_control_enabled(): return src = (source or "").strip() if src not in USER_INITIATED_CLOSE_SOURCES: return n = max(1, int(count or 1)) for i in range(n): _record_one_user_initiated_close( conn, source=src, trade_record_id=trade_record_id if i == 0 else None, closed_at_ms=closed_at_ms, trading_day=trading_day, now=now, ) row = _load_state(conn) if int(_row_get(row, "daily_frozen") or 0) == 1: break def on_manual_close( conn, *, trade_record_id: int, closed_at_ms: Optional[int], trading_day: str, now: Optional[datetime] = None, ) -> None: """兼容旧调用:等同实例页用户平仓。""" on_user_initiated_close( conn, source=CLOSE_SOURCE_USER_INSTANCE, trade_record_id=trade_record_id, closed_at_ms=closed_at_ms, trading_day=trading_day, now=now, count=1, ) def on_journal_saved( conn, *, early_exit_trigger: str, early_exit_note: str, mood_issues_raw: Any, trading_day: str, now: Optional[datetime] = None, ) -> None: if not risk_control_enabled(): return row = _sync_trading_day(conn, trading_day, now=now) mood_list = parse_mood_issues(mood_issues_raw) if mood_issues_daily_freeze_enabled() and mood_list: _set_daily_frozen(conn, trading_day=trading_day, now=now) conn.execute( "UPDATE account_risk_state SET pending_journal_trade_id=NULL, updated_at=? WHERE id=1", ((now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S"),), ) return pending = _row_get(row, "pending_journal_trade_id") trigger = (early_exit_trigger or "").strip() note = (early_exit_note or "").strip() now_ms = _now_ms(now) if ( trigger == "手动平仓" and note and int(_row_get(row, "daily_frozen") or 0) != 1 and _journal_can_reduce_cooloff(row, pending, now_ms) ): journal_h = cooling_hours_manual_journal() until_ms = _journal_cooloff_until_ms(row, now_ms, journal_h) _set_cooloff_until( conn, trading_day=trading_day, until_ms=until_ms, hours=journal_h, now=now, ) anchor_ms = until_ms - int(journal_h * 3600 * 1000) conn.execute( """UPDATE account_risk_state SET pending_journal_trade_id=NULL, last_close_at_ms=?, updated_at=? WHERE id=1""", (int(anchor_ms), (now or datetime.now()).strftime("%Y-%m-%d %H:%M:%S")), ) return def apply_manual_close_journal_cooloff( conn, *, early_exit_note: str, trading_day: str, now: Optional[datetime] = None, ) -> None: """核对修改或复盘:手动平仓 + 说明后尝试将 4h 冷静期降为 1h。""" note = (early_exit_note or "").strip() if not note: return on_journal_saved( conn, early_exit_trigger="手动平仓", early_exit_note=note, mood_issues_raw="", trading_day=trading_day, now=now, ) def _next_trading_day_reset_ms(now: datetime, reset_hour: int) -> int: from datetime import timedelta h = max(0, min(23, int(reset_hour))) candidate = now.replace(hour=h, minute=0, second=0, microsecond=0) if now >= candidate: candidate = candidate + timedelta(days=1) return _now_ms(candidate) def enrich_risk_status_countdown( st: dict[str, Any], *, now: Optional[datetime] = None, daily_reset_hour: int = 8, ) -> dict[str, Any]: """补充 freeze_until_ms / freeze_remaining_sec,供前端倒计时展示。""" if not st.get("enabled", True): return st dt = now or datetime.now() now_ms = _now_ms(dt) until_ms: Optional[int] = None if st.get("daily_frozen"): until_ms = _next_trading_day_reset_ms(dt, daily_reset_hour) elif st.get("cooloff_until_ms"): try: until_ms = int(st["cooloff_until_ms"]) except (TypeError, ValueError): until_ms = None if until_ms is not None and until_ms > now_ms: st["freeze_until_ms"] = until_ms st["freeze_remaining_sec"] = max(0, (until_ms - now_ms) // 1000) else: st["freeze_until_ms"] = None st["freeze_remaining_sec"] = 0 return st def apply_position_limit_risk( st: dict[str, Any], active_count: int, *, max_active_positions: Optional[int] = None, ) -> dict[str, Any]: """持仓达 env MAX_ACTIVE_POSITIONS 时叠加「仓位上限冻结」(时间冻结优先展示)。""" out = dict(st or {}) try: mx = max(1, int(max_active_positions if max_active_positions is not None else max_active_positions_from_env())) except (TypeError, ValueError): mx = max_active_positions_from_env() try: ac = max(0, int(active_count)) except (TypeError, ValueError): ac = 0 out["max_active_positions"] = mx out["active_count"] = ac if out.get("status") != STATUS_NORMAL: return out if ac >= mx: out["status"] = STATUS_FREEZE_POSITION out["status_label"] = STATUS_LABELS[STATUS_FREEZE_POSITION] out["can_trade"] = False out["can_roll"] = True out["reason"] = f"已达最大持仓数({ac}/{mx}),新开仓已冻结,顺势加仓仍可用" out["position_limit_frozen"] = True out["freeze_until_ms"] = None out["freeze_remaining_sec"] = 0 else: out["position_limit_frozen"] = False out["can_roll"] = True return out def compute_account_risk_status( conn, *, trading_day: str, now: Optional[datetime] = None, fmt_local_ms: Optional[Callable[[int], str]] = None, ) -> dict[str, Any]: if not risk_control_enabled(): return { "enabled": False, "status": STATUS_NORMAL, "status_label": STATUS_LABELS[STATUS_NORMAL], "can_trade": True, "reason": "", "cooloff_until_ms": None, "cooloff_until": None, "manual_close_count": 0, "daily_frozen": False, } row = _sync_trading_day(conn, trading_day, now=now) now_ms = _now_ms(now) daily_frozen = int(_row_get(row, "daily_frozen") or 0) == 1 pending = _row_get(row, "pending_journal_trade_id") cooloff_until_ms = _resolved_cooloff_until_ms(row, now_ms) if ( not daily_frozen and cooloff_until_ms is not None and _journaled_manual_cooloff_expired( conn, trading_day=trading_day, now_ms=now_ms, pending=pending ) ): cooloff_until_ms = None if not daily_frozen: _repair_stale_cooloff_row( conn, row, now_ms=now_ms, resolved_until_ms=cooloff_until_ms, now=now ) row = _load_state(conn) cooloff_until_ms = _resolved_cooloff_until_ms(row, now_ms) manual_close_count = int(_row_get(row, "manual_close_count") or 0) status = STATUS_NORMAL reason = "" if daily_frozen: status = STATUS_DAILY reason = f"账户今日已冻结(手动平仓 {manual_close_count} 次或复盘情绪标签)" elif cooloff_until_ms is not None: remaining_ms = cooloff_until_ms - now_ms hours = _cooloff_hours_value(row) status = _freeze_tier_from_remaining_ms(remaining_ms, hours) status_label = _freeze_status_label(hours, status) until_str = _ms_to_local_str(cooloff_until_ms, fmt_local_ms) if fmt_local_ms else None label = status_label reason = f"账户{label}中" if until_str: reason += f",至 {until_str}" can_trade = status == STATUS_NORMAL freeze_remaining_sec = ( max(0, (cooloff_until_ms - now_ms) // 1000) if cooloff_until_ms is not None else 0 ) return { "enabled": True, "status": status, "status_label": _freeze_status_label(_cooloff_hours_value(row), status) if status in (STATUS_FREEZE_1H, STATUS_FREEZE_4H) else STATUS_LABELS[status], "can_trade": can_trade, "reason": reason, "cooloff_until_ms": cooloff_until_ms, "cooloff_until": _ms_to_local_str(cooloff_until_ms, fmt_local_ms) if fmt_local_ms and cooloff_until_ms else None, "manual_close_count": manual_close_count, "daily_frozen": daily_frozen, "pending_journal_trade_id": pending, "freeze_remaining_sec": freeze_remaining_sec if not can_trade else 0, } def account_risk_blocks_trading( conn, *, trading_day: str, now: Optional[datetime] = None, fmt_local_ms: Optional[Callable[[int], str]] = None, ) -> tuple[bool, str]: """返回 (允许交易, 拒绝原因)。""" st = compute_account_risk_status( conn, trading_day=trading_day, now=now, fmt_local_ms=fmt_local_ms ) if st.get("can_trade"): return True, "" return False, str(st.get("reason") or STATUS_LABELS.get(st.get("status"), "账户冻结")) def insert_trade_record_id(conn) -> int: row = conn.execute("SELECT last_insert_rowid()").fetchone() return int(row[0] if row else 0)