"""hub_supervisor_lib 单元测试。""" from __future__ import annotations import json import sys from pathlib import Path import pytest ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT)) sys.path.insert(0, str(ROOT / "manual_trading_hub")) import hub_supervisor_lib as sup @pytest.fixture def state_path(tmp_path, monkeypatch): p = tmp_path / "hub_supervisor_state.json" monkeypatch.setattr(sup, "STATE_PATH", p) return p def test_classify_close_result(): assert sup.classify_close_result("手动平仓") == sup.EVENT_MANUAL_CLOSE assert sup.classify_close_result("强制清仓") == sup.EVENT_HUB_CLOSE assert sup.classify_close_result("止盈") == sup.EVENT_PROGRAM_TP assert sup.classify_close_result("止损") == sup.EVENT_PROGRAM_SL assert sup.classify_close_result("外部平仓") == sup.EVENT_EXTERNAL def test_detect_new_opens(): prev = {"0|ETH/USDT|long": {"symbol": "ETH/USDT", "contracts": 1.0}} curr = { "0|ETH/USDT|long": {"symbol": "ETH/USDT", "contracts": 1.0}, "1|BTC/USDT|short": {"symbol": "BTC/USDT", "contracts": 2.0, "exchange_name": "OKX"}, } events = sup.detect_new_opens(prev, curr) assert len(events) == 1 assert events[0]["event_type"] == sup.EVENT_OPEN assert events[0]["symbol"] == "BTC/USDT" def test_detect_new_opens_skips_existing_holdings(): prev = { "2|ZEC/USDT|short": {"symbol": "ZEC/USDT:USDT", "contracts": 5.0}, "2|HYPE/USDT|short": {"symbol": "HYPE/USDT:USDT", "contracts": 3.0}, } curr = { "2|ZEC/USDT|short": {"symbol": "ZEC/USDT:USDT", "contracts": 5.0}, "2|HYPE/USDT|short": {"symbol": "HYPE/USDT:USDT", "contracts": 3.0}, } assert sup.detect_new_opens(prev, curr) == [] def test_detect_new_opens_only_from_flat(): prev = {"2|ZEC/USDT|short": {"symbol": "ZEC/USDT", "contracts": 0.0}} curr = {"2|ZEC/USDT|short": {"symbol": "ZEC/USDT:USDT", "contracts": 2.0}} events = sup.detect_new_opens(prev, curr) assert len(events) == 1 assert events[0]["symbol"] == "ZEC/USDT:USDT" def test_normalize_position_symbol(): assert sup._normalize_position_symbol("ZEC/USDT:USDT") == "ZEC/USDT" assert sup._position_key("2", "ZEC/USDT:USDT", "short") == "2|ZEC/USDT|short" def test_detect_new_closes_dedup(): trades = [ { "account_name": "OKX", "symbol": "ETH/USDT", "result": "手动平仓", "pnl_amount": -5, "closed_at": "2026-06-14 10:00:00", } ] eid = f"close:{sup._trade_event_id(trades[0])}" events = sup.detect_new_closes(set(), trades) assert len(events) == 1 assert events[0]["event_type"] == sup.EVENT_MANUAL_CLOSE assert sup.detect_new_closes({eid}, trades) == [] def test_evaluate_frequency_warnings_interval(): stats = { "2026-06-14": { "supervised_closes": [ {"closed_at": "2026-06-14 09:50:00", "pnl_amount": -1}, ], "supervised_opens": [], } } event = { "event_type": sup.EVENT_MANUAL_CLOSE, "closed_at": "2026-06-14 10:00:00", "pnl_amount": -2, } settings = sup.normalize_supervisor_settings({}) warnings = sup.evaluate_frequency_warnings( trading_day="2026-06-14", event=event, stats=stats, settings=settings, ) rules = {w["rule"] for w in warnings} assert "INTERVAL_SHORT" in rules def test_process_supervisor_tick_seeds_without_events(state_path, monkeypatch, tmp_path): chat_path = tmp_path / "hub_ai_chat.json" monkeypatch.setattr("hub_ai.store.CHAT_PATH", chat_path) dash = { "ok": True, "trading_day": "2026-06-14", "closed_trades": [ { "account_name": "Binance", "symbol": "ETH/USDT", "result": "手动平仓", "pnl_amount": 1, "closed_at": "2026-06-14 08:30:00", } ], } board = { "ok": True, "rows": [{"id": "0", "enabled": True, "agent": {"ok": True, "positions": []}}], } settings = {"supervisor": sup.normalize_supervisor_settings({"enabled": True, "wechat_webhook": ""})} r1 = sup.process_supervisor_tick(dash, board, settings, ai_reply_fn=None) assert r1.get("seeded") is True assert r1.get("events") == 0 r2 = sup.process_supervisor_tick(dash, board, settings, ai_reply_fn=None) assert r2.get("events") == 0 board2 = { "ok": True, "rows": [ { "id": "2", "name": "Gate", "enabled": True, "agent": { "ok": True, "positions": [ {"symbol": "ZEC/USDT:USDT", "side": "short", "contracts": 1.0}, {"symbol": "HYPE/USDT:USDT", "side": "short", "contracts": 1.0}, ], }, } ], } r3 = sup.process_supervisor_tick(dash, board2, settings, ai_reply_fn=None) assert r3.get("events") == 0 dash2 = dict(dash) dash2["closed_trades"] = dash["closed_trades"] + [ { "account_name": "Binance", "symbol": "BTC/USDT", "result": "手动平仓", "pnl_amount": -3, "closed_at": "2026-06-14 11:00:00", } ] r4 = sup.process_supervisor_tick(dash2, board2, settings, ai_reply_fn=None) assert r4.get("events") == 1 def test_normalize_supervisor_settings_env(monkeypatch): monkeypatch.setenv("SUPERVISOR_WECHAT_WEBHOOK", "https://example.com/hook") monkeypatch.setenv("SUPERVISOR_WECHAT_LINK", "https://hub.example/ai?mode=supervisor") cfg = sup.normalize_supervisor_settings({}) assert cfg["wechat_webhook"] == "https://example.com/hook" assert cfg["wechat_link_base"] == "https://hub.example/ai?mode=supervisor" def test_supervisor_fallback_reply_program_sl(): text = sup.build_supervisor_fallback_reply( { "event_type": sup.EVENT_PROGRAM_SL, "symbol": "ZEC/USDT", "pnl_amount": -0.9557, } ) assert "程序止损" in text assert "AI 生成失败" not in text def test_supervisor_fallback_not_error_reply(): from hub_ai.text_util import is_ai_error_reply text = sup.build_supervisor_fallback_reply( {"event_type": sup.EVENT_MANUAL_CLOSE, "symbol": "ETH/USDT", "pnl_amount": -1} ) assert text assert not is_ai_error_reply(text)