diff --git a/manual_trading_hub/hub_supervisor_lib.py b/manual_trading_hub/hub_supervisor_lib.py index 52c4919..660c6f7 100644 --- a/manual_trading_hub/hub_supervisor_lib.py +++ b/manual_trading_hub/hub_supervisor_lib.py @@ -146,6 +146,22 @@ def is_program_event(event_type: str) -> bool: return event_type in (EVENT_PROGRAM_TP, EVENT_PROGRAM_SL) +def _normalize_position_symbol(sym: str) -> str: + """统一合约名,避免 ZEC/USDT 与 ZEC/USDT:USDT 被当成两笔持仓。""" + s = (sym or "").strip().upper() + if not s: + return "" + if s.endswith(":USDT") and "/" in s: + return s.rsplit(":", 1)[0] + return s + + +def _position_key(exchange_id: str, symbol: str, side: str) -> str: + sym = _normalize_position_symbol(symbol) + sd = (side or "long").strip().lower() or "long" + return f"{exchange_id}|{sym}|{sd}" + + def _position_contracts(pos: dict) -> float: for key in ("contracts", "contracts_signed", "size"): try: @@ -173,7 +189,7 @@ def collect_position_keys(board_payload: dict | None) -> dict[str, dict]: continue sym = str(p.get("symbol") or "") side = str(p.get("side") or "").lower() or "long" - key = f"{ex_id}|{sym}|{side}" + key = _position_key(ex_id, sym, side) out[key] = { "exchange_id": ex_id, "exchange_name": ex_name, @@ -184,13 +200,47 @@ def collect_position_keys(board_payload: dict | None) -> dict[str, dict]: return out +def _board_agent_snapshot_ready(board_payload: dict | None) -> bool: + """监控板各启用账户 agent 快照已就绪(避免空板先入库导致后续持仓误判为新开)。""" + if not isinstance(board_payload, dict) or board_payload.get("ok") is False: + return False + rows = board_payload.get("rows") or [] + if not rows: + return False + seen = 0 + for row in rows: + if not isinstance(row, dict): + continue + if row.get("enabled") is False: + continue + ag = row.get("agent") + if not isinstance(ag, dict): + return False + seen += 1 + return seen > 0 + + +def _entry_contracts(entry: dict | None) -> float: + if not isinstance(entry, dict): + return 0.0 + try: + return float(entry.get("contracts") or 0) + except (TypeError, ValueError): + return 0.0 + + def detect_new_opens( prev_positions: dict[str, dict], curr_positions: dict[str, dict], ) -> list[dict]: + """仅当某合约从空仓变为有仓时视为新开(已有持仓不加仓不算)。""" events = [] for key, info in curr_positions.items(): - if key in prev_positions: + curr_c = _entry_contracts(info) + if curr_c < 1e-12: + continue + prev_c = _entry_contracts(prev_positions.get(key)) + if prev_c >= 1e-12: continue events.append({"event_type": EVENT_OPEN, "event_id": f"open:{key}:{_now_str()[:16]}", **info}) return events @@ -566,6 +616,7 @@ def process_supervisor_tick( "processed": [], "positions": {}, "stats": {trading_day: state.get("stats", {}).get(trading_day, {})}, + "positions_baseline_ready": False, } processed = set(str(x) for x in (state.get("processed") or [])) @@ -573,17 +624,30 @@ def process_supervisor_tick( prev_positions = dict(state.get("positions") or {}) curr_positions = collect_position_keys(board_payload) closed_trades = dash.get("closed_trades") or [] + board_ready = _board_agent_snapshot_ready(board_payload) - if not state.get("initialized"): + if not state.get("positions_baseline_ready"): for trade in closed_trades: if isinstance(trade, dict): processed.add(f"close:{_trade_event_id(trade)}") + if not board_ready: + state["trading_day"] = trading_day + state["processed"] = list(processed) + save_supervisor_state(state) + return {"ok": True, "events": 0, "waiting_board": True, "trading_day": trading_day} state["trading_day"] = trading_day state["processed"] = list(processed) state["positions"] = curr_positions + state["positions_baseline_ready"] = True state["initialized"] = True save_supervisor_state(state) - return {"ok": True, "events": 0, "seeded": True, "trading_day": trading_day} + return { + "ok": True, + "events": 0, + "seeded": True, + "trading_day": trading_day, + "positions": len(curr_positions), + } raw_events = detect_new_opens(prev_positions, curr_positions) + detect_new_closes( processed, closed_trades diff --git a/tests/test_hub_supervisor_lib.py b/tests/test_hub_supervisor_lib.py index 4caefae..06385d5 100644 --- a/tests/test_hub_supervisor_lib.py +++ b/tests/test_hub_supervisor_lib.py @@ -30,10 +30,10 @@ def test_classify_close_result(): def test_detect_new_opens(): - prev = {"0|ETH/USDT|long": {"symbol": "ETH/USDT"}} + prev = {"0|ETH/USDT|long": {"symbol": "ETH/USDT", "contracts": 1.0}} curr = { - "0|ETH/USDT|long": {"symbol": "ETH/USDT"}, - "1|BTC/USDT|short": {"symbol": "BTC/USDT", "exchange_name": "OKX"}, + "0|ETH/USDT|long": {"symbol": "ETH/USDT", "contracts": 1.0}, + "1|BTC/USDT|short": {"symbol": "BTC/USDT", "contracts": 2.0, "exchange_name": "OKX"}, } events = sup.detect_new_opens(prev, curr) assert len(events) == 1 @@ -41,6 +41,31 @@ def test_detect_new_opens(): assert events[0]["symbol"] == "BTC/USDT" +def test_detect_new_opens_skips_existing_holdings(): + prev = { + "2|ZEC/USDT|short": {"symbol": "ZEC/USDT:USDT", "contracts": 5.0}, + "2|HYPE/USDT|short": {"symbol": "HYPE/USDT:USDT", "contracts": 3.0}, + } + curr = { + "2|ZEC/USDT|short": {"symbol": "ZEC/USDT:USDT", "contracts": 5.0}, + "2|HYPE/USDT|short": {"symbol": "HYPE/USDT:USDT", "contracts": 3.0}, + } + assert sup.detect_new_opens(prev, curr) == [] + + +def test_detect_new_opens_only_from_flat(): + prev = {"2|ZEC/USDT|short": {"symbol": "ZEC/USDT", "contracts": 0.0}} + curr = {"2|ZEC/USDT|short": {"symbol": "ZEC/USDT:USDT", "contracts": 2.0}} + events = sup.detect_new_opens(prev, curr) + assert len(events) == 1 + assert events[0]["symbol"] == "ZEC/USDT:USDT" + + +def test_normalize_position_symbol(): + assert sup._normalize_position_symbol("ZEC/USDT:USDT") == "ZEC/USDT" + assert sup._position_key("2", "ZEC/USDT:USDT", "short") == "2|ZEC/USDT|short" + + def test_detect_new_closes_dedup(): trades = [ { @@ -100,7 +125,10 @@ def test_process_supervisor_tick_seeds_without_events(state_path, monkeypatch, t } ], } - board = {"ok": True, "rows": []} + board = { + "ok": True, + "rows": [{"id": "0", "enabled": True, "agent": {"ok": True, "positions": []}}], + } settings = {"supervisor": sup.normalize_supervisor_settings({"enabled": True, "wechat_webhook": ""})} r1 = sup.process_supervisor_tick(dash, board, settings, ai_reply_fn=None) @@ -110,6 +138,26 @@ def test_process_supervisor_tick_seeds_without_events(state_path, monkeypatch, t r2 = sup.process_supervisor_tick(dash, board, settings, ai_reply_fn=None) assert r2.get("events") == 0 + board2 = { + "ok": True, + "rows": [ + { + "id": "2", + "name": "Gate", + "enabled": True, + "agent": { + "ok": True, + "positions": [ + {"symbol": "ZEC/USDT:USDT", "side": "short", "contracts": 1.0}, + {"symbol": "HYPE/USDT:USDT", "side": "short", "contracts": 1.0}, + ], + }, + } + ], + } + r3 = sup.process_supervisor_tick(dash, board2, settings, ai_reply_fn=None) + assert r3.get("events") == 0 + dash2 = dict(dash) dash2["closed_trades"] = dash["closed_trades"] + [ { @@ -120,14 +168,8 @@ def test_process_supervisor_tick_seeds_without_events(state_path, monkeypatch, t "closed_at": "2026-06-14 11:00:00", } ] - r3 = sup.process_supervisor_tick(dash2, board, settings, ai_reply_fn=None) - assert r3.get("events") == 1 - assert chat_path.is_file() - data = json.loads(chat_path.read_text(encoding="utf-8")) - sessions = [s for s in data.get("sessions") or [] if s.get("bot_mode") == "supervisor"] - assert sessions - msgs = sessions[0].get("messages") or [] - assert any(m.get("role") == "system" for m in msgs) + r4 = sup.process_supervisor_tick(dash2, board2, settings, ai_reply_fn=None) + assert r4.get("events") == 1 def test_normalize_supervisor_settings_env(monkeypatch):