Files
crypto_monitor/tests/test_account_risk_lib.py
T
2026-06-24 01:58:24 +08:00

525 lines
20 KiB
Python

import os
import sqlite3
import unittest
from datetime import datetime
from unittest import mock
from zoneinfo import ZoneInfo
from account_risk_lib import (
CLOSE_SOURCE_USER_HUB,
CLOSE_SOURCE_USER_INSTANCE,
CLOSE_SOURCE_USER_TREND_STOP,
STATUS_DAILY,
STATUS_FREEZE_1H,
STATUS_FREEZE_4H,
STATUS_FREEZE_POSITION,
STATUS_NORMAL,
account_risk_blocks_trading,
apply_position_limit_risk,
compute_account_risk_status,
enrich_risk_status_countdown,
ensure_account_risk_schema,
max_active_positions_from_env,
on_journal_saved,
on_manual_close,
on_user_initiated_close,
parse_mood_issues,
)
APP_TZ = ZoneInfo("Asia/Shanghai")
def _mem_conn():
conn = sqlite3.connect(":memory:")
conn.row_factory = sqlite3.Row
ensure_account_risk_schema(conn)
return conn
def _mem_conn_with_journal():
conn = _mem_conn()
conn.execute(
"""CREATE TABLE IF NOT EXISTS journal_entries (
close_datetime TEXT, early_exit_trigger TEXT, early_exit_note TEXT
)"""
)
return conn
def _local_ms(dt_naive: datetime) -> int:
return int(dt_naive.replace(tzinfo=APP_TZ).timestamp() * 1000)
class AccountRiskLibTests(unittest.TestCase):
def setUp(self):
self.env_patch = mock.patch.dict(os.environ, {}, clear=False)
self.env_patch.start()
os.environ["RISK_CONTROL_ENABLED"] = "1"
os.environ["RISK_COOLING_HOURS_MANUAL"] = "4"
os.environ["RISK_COOLING_HOURS_MANUAL_JOURNAL"] = "1"
os.environ["RISK_MANUAL_CLOSE_DAILY_LIMIT"] = "2"
os.environ["RISK_MOOD_ISSUES_DAILY_FREEZE"] = "1"
os.environ["APP_TIMEZONE"] = "Asia/Shanghai"
def tearDown(self):
self.env_patch.stop()
def test_user_instance_sets_4h_cooloff(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
close_ms = _local_ms(now)
on_user_initiated_close(
conn,
source=CLOSE_SOURCE_USER_INSTANCE,
trade_record_id=101,
closed_at_ms=close_ms,
trading_day="2026-06-14",
now=now,
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertEqual(st["status"], STATUS_FREEZE_4H)
self.assertFalse(st["can_trade"])
self.assertAlmostEqual(st["freeze_remaining_sec"], 4 * 3600, delta=2)
def test_invalid_source_ignored(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
on_user_initiated_close(
conn,
source="exchange_tpsl",
trading_day="2026-06-14",
now=now,
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertEqual(st["status"], STATUS_NORMAL)
def test_second_user_close_daily_freeze(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
close_ms = _local_ms(now)
on_user_initiated_close(
conn, source=CLOSE_SOURCE_USER_HUB, closed_at_ms=close_ms, trading_day="2026-06-14", now=now
)
on_user_initiated_close(
conn, source=CLOSE_SOURCE_USER_HUB, closed_at_ms=close_ms + 1000, trading_day="2026-06-14", now=now
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertEqual(st["status"], STATUS_DAILY)
def test_hub_close_all_count(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
close_ms = _local_ms(now)
on_user_initiated_close(
conn,
source=CLOSE_SOURCE_USER_HUB,
closed_at_ms=close_ms,
trading_day="2026-06-14",
now=now,
count=2,
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertEqual(st["manual_close_count"], 2)
self.assertEqual(st["status"], STATUS_DAILY)
def test_trend_stop_counts_as_manual(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
on_user_initiated_close(
conn,
source=CLOSE_SOURCE_USER_TREND_STOP,
trading_day="2026-06-14",
now=now,
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertEqual(st["manual_close_count"], 1)
self.assertEqual(st["status"], STATUS_FREEZE_4H)
self.assertAlmostEqual(st["freeze_remaining_sec"], 4 * 3600, delta=2)
def test_journal_manual_with_note_reduces_to_1h(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
close_ms = _local_ms(now)
on_manual_close(conn, trade_record_id=9, closed_at_ms=close_ms, trading_day="2026-06-14", now=now)
on_journal_saved(
conn,
early_exit_trigger="手动平仓",
early_exit_note="违反计划提前离场",
mood_issues_raw="",
trading_day="2026-06-14",
now=now,
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertEqual(st["status"], STATUS_FREEZE_1H)
self.assertAlmostEqual(st["freeze_remaining_sec"], 3600, delta=2)
def test_journal_hub_close_without_pending_reduces_to_1h(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
close_ms = _local_ms(now)
on_user_initiated_close(
conn,
source=CLOSE_SOURCE_USER_HUB,
closed_at_ms=close_ms,
trading_day="2026-06-14",
now=now,
)
on_journal_saved(
conn,
early_exit_trigger="手动平仓",
early_exit_note="中控全平后复盘说明",
mood_issues_raw="",
trading_day="2026-06-14",
now=now,
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertEqual(st["status"], STATUS_FREEZE_1H)
def test_journal_reduces_when_manual_count_cleared_but_cooloff_active(self):
conn = _mem_conn()
now = datetime(2026, 6, 15, 10, 0, 0)
now_ms = _local_ms(now)
close_ms = now_ms - 3600 * 1000
until_ms = close_ms + 4 * 3600 * 1000
conn.execute(
"""UPDATE account_risk_state SET
trading_day='2026-06-15',
manual_close_count=0,
cooloff_until_ms=?,
cooloff_hours=4,
last_close_at_ms=?,
daily_frozen=0
WHERE id=1""",
(until_ms, close_ms),
)
on_journal_saved(
conn,
early_exit_trigger="手动平仓",
early_exit_note="切日后补复盘",
mood_issues_raw="",
trading_day="2026-06-15",
now=now,
)
st = compute_account_risk_status(conn, trading_day="2026-06-15", now=now)
self.assertEqual(st["status"], STATUS_FREEZE_1H)
def test_journal_late_save_still_gets_1h_from_now(self):
conn = _mem_conn()
close_at = datetime(2026, 6, 14, 12, 0, 0)
close_ms = _local_ms(close_at)
on_user_initiated_close(
conn,
source=CLOSE_SOURCE_USER_INSTANCE,
closed_at_ms=close_ms,
trading_day="2026-06-14",
now=close_at,
)
journal_at = datetime(2026, 6, 14, 14, 0, 0)
on_journal_saved(
conn,
early_exit_trigger="手动平仓",
early_exit_note="补写复盘说明",
mood_issues_raw="",
trading_day="2026-06-14",
now=journal_at,
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=journal_at)
self.assertEqual(st["status"], STATUS_FREEZE_1H)
self.assertEqual(st["cooloff_until_ms"], _local_ms(journal_at) + 3600 * 1000)
def test_stale_4h_until_with_1h_hours_uses_shorter_end(self):
"""库内 cooloff_hours=1 但 cooloff_until_ms 仍为旧 4h 时,应按 last_close+1h 倒计时。"""
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 6, 0)
now_ms = _local_ms(now)
close_ms = now_ms - 6 * 60 * 1000
stale_until_4h = close_ms + 4 * 3600 * 1000
conn.execute(
"""UPDATE account_risk_state SET
trading_day='2026-06-14',
manual_close_count=1,
cooloff_until_ms=?,
cooloff_hours=1,
last_close_at_ms=?,
daily_frozen=0
WHERE id=1""",
(stale_until_4h, close_ms),
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertEqual(st["status"], STATUS_FREEZE_1H)
self.assertAlmostEqual(st["freeze_remaining_sec"], 54 * 60, delta=3)
def test_stale_4h_ignored_after_1h_journal_expired(self):
"""复盘已降为 1h 且窗口结束后,不应再读库内旧 4h until。"""
conn = _mem_conn()
close_at = datetime(2026, 6, 18, 17, 56, 0)
now = datetime(2026, 6, 18, 21, 50, 0)
close_ms = _local_ms(close_at)
stale_4h_until = close_ms + 4 * 3600 * 1000
conn.execute(
"""UPDATE account_risk_state SET
trading_day='2026-06-18',
manual_close_count=1,
cooloff_until_ms=?,
cooloff_hours=1,
last_close_at_ms=?,
daily_frozen=0
WHERE id=1""",
(stale_4h_until, close_ms),
)
st = compute_account_risk_status(conn, trading_day="2026-06-18", now=now)
self.assertEqual(st["status"], STATUS_NORMAL)
self.assertTrue(st["can_trade"])
row = conn.execute(
"SELECT cooloff_until_ms, cooloff_hours, last_close_at_ms FROM account_risk_state WHERE id=1"
).fetchone()
self.assertIsNone(row["cooloff_until_ms"])
self.assertIsNone(row["last_close_at_ms"])
def test_corrupted_anchor_cleared_when_journaled_manual_expired(self):
"""上一版误把 last_close 写成近期时刻时,已复盘且 1h 已过的仍应显示正常。"""
conn = _mem_conn_with_journal()
now = datetime(2026, 6, 18, 22, 30, 0)
now_ms = _local_ms(now)
bad_last = now_ms - 60 * 1000
conn.execute(
"""UPDATE account_risk_state SET
trading_day='2026-06-18',
manual_close_count=1,
cooloff_until_ms=?,
cooloff_hours=1,
last_close_at_ms=?,
pending_journal_trade_id=NULL,
daily_frozen=0
WHERE id=1""",
(bad_last + 3600 * 1000, bad_last),
)
conn.execute(
"INSERT INTO journal_entries (close_datetime, early_exit_trigger, early_exit_note) VALUES (?,?,?)",
("2026-06-18 17:56:00", "手动平仓", "按计划离场"),
)
st = compute_account_risk_status(conn, trading_day="2026-06-18", now=now)
self.assertEqual(st["status"], STATUS_NORMAL)
self.assertTrue(st["can_trade"])
def test_future_last_close_does_not_restart_cooloff(self):
"""脏数据 last_close 在未来时,不应重启 1h/4h 冻结。"""
conn = _mem_conn()
now = datetime(2026, 6, 18, 22, 30, 0)
now_ms = _local_ms(now)
future_close = now_ms + 49 * 60 * 1000
conn.execute(
"""UPDATE account_risk_state SET
trading_day='2026-06-18',
manual_close_count=1,
cooloff_until_ms=?,
cooloff_hours=1,
last_close_at_ms=?,
daily_frozen=0
WHERE id=1""",
(future_close + 3600 * 1000, future_close),
)
st = compute_account_risk_status(conn, trading_day="2026-06-18", now=now)
self.assertEqual(st["status"], STATUS_NORMAL)
self.assertTrue(st["can_trade"])
def test_active_4h_countdown_matches_tier(self):
conn = _mem_conn()
close_at = datetime(2026, 6, 18, 21, 46, 0)
now = datetime(2026, 6, 18, 21, 52, 0)
close_ms = _local_ms(close_at)
on_user_initiated_close(
conn,
source=CLOSE_SOURCE_USER_INSTANCE,
closed_at_ms=close_ms,
trading_day="2026-06-18",
now=close_at,
)
st = compute_account_risk_status(conn, trading_day="2026-06-18", now=now)
self.assertEqual(st["status"], STATUS_FREEZE_4H)
self.assertAlmostEqual(st["freeze_remaining_sec"], 3 * 3600 + 54 * 60, delta=5)
def test_trading_day_reset_clears_expired_stale_cooloff(self):
conn = _mem_conn()
close_at = datetime(2026, 6, 18, 17, 56, 0)
close_ms = _local_ms(close_at)
stale_4h_until = close_ms + 4 * 3600 * 1000
conn.execute(
"""UPDATE account_risk_state SET
trading_day='2026-06-18',
manual_close_count=1,
cooloff_until_ms=?,
cooloff_hours=1,
last_close_at_ms=?,
daily_frozen=0
WHERE id=1""",
(stale_4h_until, close_ms),
)
next_day = datetime(2026, 6, 19, 9, 0, 0)
st = compute_account_risk_status(conn, trading_day="2026-06-19", now=next_day)
self.assertEqual(st["status"], STATUS_NORMAL)
row = conn.execute("SELECT cooloff_until_ms FROM account_risk_state WHERE id=1").fetchone()
self.assertIsNone(row["cooloff_until_ms"])
def test_remaining_never_exceeds_configured_hours(self):
conn = _mem_conn()
now = datetime(2026, 6, 18, 22, 0, 0)
now_ms = _local_ms(now)
future_close = now_ms + 49 * 60 * 1000
conn.execute(
"""UPDATE account_risk_state SET
trading_day='2026-06-18',
manual_close_count=1,
cooloff_until_ms=?,
cooloff_hours=4,
last_close_at_ms=?,
daily_frozen=0
WHERE id=1""",
(future_close + 4 * 3600 * 1000, future_close),
)
st = compute_account_risk_status(conn, trading_day="2026-06-18", now=now)
self.assertEqual(st["status"], STATUS_NORMAL)
self.assertTrue(st["can_trade"])
def test_legacy_naive_utc_ms_countdown_normalized(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
now_ms = _local_ms(now)
offset_ms = 8 * 3600 * 1000
legacy_close = now_ms + offset_ms
legacy_until = legacy_close + 4 * 3600 * 1000
conn.execute(
"""UPDATE account_risk_state SET
trading_day='2026-06-14',
manual_close_count=1,
cooloff_until_ms=?,
cooloff_hours=4,
last_close_at_ms=?,
daily_frozen=0
WHERE id=1""",
(legacy_until, legacy_close),
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
st = enrich_risk_status_countdown(st, now=now, daily_reset_hour=8)
self.assertEqual(st["status"], STATUS_FREEZE_4H)
self.assertAlmostEqual(st["freeze_remaining_sec"], 4 * 3600, delta=2)
def test_journal_mood_issues_daily_freeze(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
on_journal_saved(
conn,
early_exit_trigger="止损",
early_exit_note="",
mood_issues_raw=["报复开仓"],
trading_day="2026-06-14",
now=now,
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertEqual(st["status"], STATUS_DAILY)
def test_cooloff_expired_returns_normal(self):
conn = _mem_conn()
start = datetime(2026, 6, 14, 8, 0, 0)
close_ms = _local_ms(start)
on_user_initiated_close(
conn, source=CLOSE_SOURCE_USER_INSTANCE, closed_at_ms=close_ms, trading_day="2026-06-14", now=start
)
later = datetime(2026, 6, 14, 13, 0, 0)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=later)
self.assertEqual(st["status"], STATUS_NORMAL)
row = conn.execute("SELECT cooloff_until_ms FROM account_risk_state WHERE id=1").fetchone()
self.assertIsNone(row["cooloff_until_ms"])
def test_trading_day_reset_clears_daily_frozen(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
on_journal_saved(
conn,
early_exit_trigger="止损",
early_exit_note="",
mood_issues_raw="扛单",
trading_day="2026-06-14",
now=now,
)
next_day = datetime(2026, 6, 15, 8, 0, 0)
st = compute_account_risk_status(conn, trading_day="2026-06-15", now=next_day)
self.assertEqual(st["status"], STATUS_NORMAL)
def test_parse_mood_issues_filters_unknown(self):
self.assertEqual(parse_mood_issues("怕踏空,未知标签,扛单"), ["怕踏空", "扛单"])
def test_enrich_countdown_for_daily_and_cooloff(self):
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
close_ms = _local_ms(now)
on_user_initiated_close(
conn,
source=CLOSE_SOURCE_USER_INSTANCE,
closed_at_ms=close_ms,
trading_day="2026-06-14",
now=now,
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
st = enrich_risk_status_countdown(st, now=now, daily_reset_hour=8)
self.assertGreater(st["freeze_remaining_sec"], 0)
self.assertEqual(st["freeze_until_ms"], st["cooloff_until_ms"])
on_journal_saved(
conn,
early_exit_trigger="止损",
early_exit_note="",
mood_issues_raw="扛单",
trading_day="2026-06-14",
now=now,
)
st2 = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
st2 = enrich_risk_status_countdown(st2, now=now, daily_reset_hour=8)
self.assertTrue(st2["daily_frozen"])
self.assertGreater(st2["freeze_remaining_sec"], 0)
self.assertIsNotNone(st2["freeze_until_ms"])
def test_disabled_risk_control(self):
os.environ["RISK_CONTROL_ENABLED"] = "0"
conn = _mem_conn()
now = datetime(2026, 6, 14, 12, 0, 0)
on_user_initiated_close(
conn, source=CLOSE_SOURCE_USER_INSTANCE, trading_day="2026-06-14", now=now
)
st = compute_account_risk_status(conn, trading_day="2026-06-14", now=now)
self.assertFalse(st["enabled"])
self.assertTrue(st["can_trade"])
ok, _ = account_risk_blocks_trading(conn, trading_day="2026-06-14", now=now)
self.assertTrue(ok)
def test_position_limit_freeze_from_env(self):
os.environ["MAX_ACTIVE_POSITIONS"] = "2"
st = apply_position_limit_risk({"status": STATUS_NORMAL, "can_trade": True}, 2)
self.assertEqual(st["status"], STATUS_FREEZE_POSITION)
self.assertEqual(st["status_label"], "仓位上限冻结")
self.assertFalse(st["can_trade"])
self.assertIn("2/2", st["reason"])
self.assertEqual(st["max_active_positions"], 2)
def test_position_limit_normal_when_under_cap(self):
st = apply_position_limit_risk({"status": STATUS_NORMAL, "can_trade": True}, 0, max_active_positions=1)
self.assertEqual(st["status"], STATUS_NORMAL)
self.assertTrue(st["can_trade"])
def test_time_freeze_takes_priority_over_position_limit(self):
st = apply_position_limit_risk(
{"status": STATUS_FREEZE_4H, "status_label": "4h冻结", "can_trade": False},
5,
max_active_positions=1,
)
self.assertEqual(st["status"], STATUS_FREEZE_4H)
self.assertEqual(st["active_count"], 5)
def test_max_active_positions_from_env(self):
os.environ["MAX_ACTIVE_POSITIONS"] = "3"
self.assertEqual(max_active_positions_from_env(), 3)
if __name__ == "__main__":
unittest.main()