diff --git a/modules/risk/account_risk_lib.py b/modules/risk/account_risk_lib.py index 150323b..e104756 100644 --- a/modules/risk/account_risk_lib.py +++ b/modules/risk/account_risk_lib.py @@ -274,6 +274,30 @@ def count_active_trade_monitors(conn) -> int: return 0 +def _position_slot_key(sym: str, direction: str) -> tuple[str, str]: + base = (sym or "").strip().lower().split(".")[0] + return (base, (direction or "long").strip().lower()) + + +def active_position_slots_from_monitors(conn) -> set[tuple[str, str]]: + """按品种+方向去重;滚仓/加仓共用同一监控,不额外占仓位槽。""" + keys: set[tuple[str, str]] = set() + try: + for r in conn.execute( + "SELECT symbol, direction FROM trade_order_monitors WHERE status='active'" + ): + sym = (r["symbol"] or "").strip() + if sym: + keys.add(_position_slot_key(sym, r["direction"] or "long")) + except Exception: + pass + return keys + + +def count_active_position_slots(conn) -> int: + return len(active_position_slots_from_monitors(conn)) + + def _position_keys_from_rows(rows: list) -> set[tuple[str, str]]: keys: set[tuple[str, str]] = set() for p in rows or []: @@ -285,10 +309,10 @@ def _position_keys_from_rows(rows: list) -> set[tuple[str, str]]: or p.get("symbol_code") or p.get("ths_code") or "" - ).strip().lower() + ).strip() direction = (p.get("direction") or "long").strip().lower() if sym: - keys.add((sym, direction)) + keys.add(_position_slot_key(sym, direction)) return keys @@ -298,8 +322,8 @@ def effective_active_position_count( *, ctp_connected: Optional[bool] = None, ) -> int: - """风控持仓数:柜台/快照实际持仓优先,本地监控作兜底。""" - monitor_count = count_active_trade_monitors(conn) + """风控持仓数:品种+方向槽位去重;滚仓不计入新仓位。""" + keys = active_position_slots_from_monitors(conn) if ctp_connected is None: try: from modules.ctp.vnpy_bridge import ctp_status @@ -307,23 +331,21 @@ def effective_active_position_count( ctp_connected = bool(ctp_status(mode).get("connected")) except Exception: ctp_connected = False - if not ctp_connected: - return monitor_count - keys: set[tuple[str, str]] = set() - try: - from modules.ctp.ctp_trading_state import trading_state + if ctp_connected: + try: + from modules.ctp.ctp_trading_state import trading_state - keys |= _position_keys_from_rows(trading_state.get_positions()) - except Exception: - pass - try: - from modules.trading.position_stream import position_hub + keys |= _position_keys_from_rows(trading_state.get_positions()) + except Exception: + pass + try: + from modules.trading.position_stream import position_hub - snap = position_hub.get_snapshot() or {} - keys |= _position_keys_from_rows(snap.get("rows")) - except Exception: - pass - return max(monitor_count, len(keys)) + snap = position_hub.get_snapshot() or {} + keys |= _position_keys_from_rows(snap.get("rows")) + except Exception: + pass + return len(keys) def parse_mood_issues(raw: Any) -> list[str]: @@ -419,7 +441,11 @@ def get_risk_status( "UPDATE account_risk_state SET cooloff_until_ms=NULL, cooloff_hours=NULL WHERE id=1" ) conn.commit() - active = count_active_trade_monitors(conn) if active_count is None else int(active_count) + active = ( + count_active_position_slots(conn) + if active_count is None + else int(active_count) + ) mx = max_active_positions() pos_limit = active >= mx daily_opens = count_daily_opens(conn, now) diff --git a/modules/trading/install.py b/modules/trading/install.py index 74302e3..2cb77ac 100644 --- a/modules/trading/install.py +++ b/modules/trading/install.py @@ -3502,25 +3502,32 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return None def _ensure_strategy_monitors(conn, mode: str) -> int: - """策略页加载前:恢复误关监控并同步柜台,与持仓页逻辑一致。""" + """策略页:仅当柜台持仓缺少 active 监控时才恢复/同步,避免每次打开都全量写库。""" if not _cached_ctp_status(mode).get("connected"): return 0 - capital = _capital(conn) - synced = 0 seen: set[tuple[str, str]] = set() positions = list(trading_state.get_positions() or []) if not positions: positions = list(_ctp_positions(mode, refresh_if_empty=False) or []) + missing: list[tuple[dict, str, str]] = [] for p in positions: lots = int(p.get("lots") or 0) if lots <= 0: continue ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "") direction = (p.get("direction") or "long").strip().lower() - key = (ths.lower(), direction) + key = (ths.lower().split(".")[0], direction) if key in seen: continue seen.add(key) + if _find_active_monitor(conn, ths, direction): + continue + missing.append((p, ths, direction)) + if not missing: + return 0 + capital = _capital(conn) + synced = 0 + for p, ths, direction in missing: mon = _find_or_revive_monitor(conn, ths, direction) if not mon: continue @@ -3550,27 +3557,33 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se capital = _capital(conn) mode = get_trading_mode(get_setting) _ensure_strategy_monitors(conn, mode) - for grow in conn.execute( - "SELECT id, order_monitor_id, initial_lots FROM roll_groups WHERE status='active'" - ).fetchall(): - gfix = dict(grow) - if int(gfix.get("initial_lots") or 0) > 0: - continue - mon_row = conn.execute( - "SELECT lots FROM trade_order_monitors WHERE id=?", - (int(gfix["order_monitor_id"]),), - ).fetchone() - if not mon_row: - continue - gid = int(gfix["id"]) - filled = _roll_filled_lots_map(conn, [gid]).get(gid, 0) - lots = int(mon_row["lots"] or 0) - initial = lots if lots > 0 and filled > 0 and lots <= filled else max(1, lots - filled) - conn.execute( - "UPDATE roll_groups SET initial_lots=? WHERE id=?", - (initial, gid), - ) - commit_retry(conn) + need_initial = conn.execute( + "SELECT id FROM roll_groups WHERE status='active' AND COALESCE(initial_lots, 0)=0 LIMIT 1" + ).fetchone() + if need_initial: + for grow in conn.execute( + "SELECT id, order_monitor_id, initial_lots FROM roll_groups WHERE status='active' AND COALESCE(initial_lots, 0)=0" + ).fetchall(): + gfix = dict(grow) + mon_row = conn.execute( + "SELECT lots FROM trade_order_monitors WHERE id=?", + (int(gfix["order_monitor_id"]),), + ).fetchone() + if not mon_row: + continue + gid = int(gfix["id"]) + filled = _roll_filled_lots_map(conn, [gid]).get(gid, 0) + lots = int(mon_row["lots"] or 0) + initial = ( + lots + if lots > 0 and filled > 0 and lots <= filled + else max(1, lots - filled) + ) + conn.execute( + "UPDATE roll_groups SET initial_lots=? WHERE id=?", + (initial, gid), + ) + commit_retry(conn) active_trend = conn.execute( "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1" ).fetchone()