Fix position limit to count distinct slots and speed up strategy page.

Roll/add-on shares one monitor per symbol+direction so it no longer inflates the active position count; strategy page skips full CTP sync when monitors already exist.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-02 22:49:32 +08:00
parent 530738ae93
commit 6710692dfb
2 changed files with 84 additions and 45 deletions
+46 -20
View File
@@ -274,6 +274,30 @@ def count_active_trade_monitors(conn) -> int:
return 0 return 0
def _position_slot_key(sym: str, direction: str) -> tuple[str, str]:
base = (sym or "").strip().lower().split(".")[0]
return (base, (direction or "long").strip().lower())
def active_position_slots_from_monitors(conn) -> set[tuple[str, str]]:
"""按品种+方向去重;滚仓/加仓共用同一监控,不额外占仓位槽。"""
keys: set[tuple[str, str]] = set()
try:
for r in conn.execute(
"SELECT symbol, direction FROM trade_order_monitors WHERE status='active'"
):
sym = (r["symbol"] or "").strip()
if sym:
keys.add(_position_slot_key(sym, r["direction"] or "long"))
except Exception:
pass
return keys
def count_active_position_slots(conn) -> int:
return len(active_position_slots_from_monitors(conn))
def _position_keys_from_rows(rows: list) -> set[tuple[str, str]]: def _position_keys_from_rows(rows: list) -> set[tuple[str, str]]:
keys: set[tuple[str, str]] = set() keys: set[tuple[str, str]] = set()
for p in rows or []: for p in rows or []:
@@ -285,10 +309,10 @@ def _position_keys_from_rows(rows: list) -> set[tuple[str, str]]:
or p.get("symbol_code") or p.get("symbol_code")
or p.get("ths_code") or p.get("ths_code")
or "" or ""
).strip().lower() ).strip()
direction = (p.get("direction") or "long").strip().lower() direction = (p.get("direction") or "long").strip().lower()
if sym: if sym:
keys.add((sym, direction)) keys.add(_position_slot_key(sym, direction))
return keys return keys
@@ -298,8 +322,8 @@ def effective_active_position_count(
*, *,
ctp_connected: Optional[bool] = None, ctp_connected: Optional[bool] = None,
) -> int: ) -> int:
"""风控持仓数:柜台/快照实际持仓优先,本地监控作兜底""" """风控持仓数:品种+方向槽位去重;滚仓不计入新仓位"""
monitor_count = count_active_trade_monitors(conn) keys = active_position_slots_from_monitors(conn)
if ctp_connected is None: if ctp_connected is None:
try: try:
from modules.ctp.vnpy_bridge import ctp_status from modules.ctp.vnpy_bridge import ctp_status
@@ -307,23 +331,21 @@ def effective_active_position_count(
ctp_connected = bool(ctp_status(mode).get("connected")) ctp_connected = bool(ctp_status(mode).get("connected"))
except Exception: except Exception:
ctp_connected = False ctp_connected = False
if not ctp_connected: if ctp_connected:
return monitor_count try:
keys: set[tuple[str, str]] = set() from modules.ctp.ctp_trading_state import trading_state
try:
from modules.ctp.ctp_trading_state import trading_state
keys |= _position_keys_from_rows(trading_state.get_positions()) keys |= _position_keys_from_rows(trading_state.get_positions())
except Exception: except Exception:
pass pass
try: try:
from modules.trading.position_stream import position_hub from modules.trading.position_stream import position_hub
snap = position_hub.get_snapshot() or {} snap = position_hub.get_snapshot() or {}
keys |= _position_keys_from_rows(snap.get("rows")) keys |= _position_keys_from_rows(snap.get("rows"))
except Exception: except Exception:
pass pass
return max(monitor_count, len(keys)) return len(keys)
def parse_mood_issues(raw: Any) -> list[str]: def parse_mood_issues(raw: Any) -> list[str]:
@@ -419,7 +441,11 @@ def get_risk_status(
"UPDATE account_risk_state SET cooloff_until_ms=NULL, cooloff_hours=NULL WHERE id=1" "UPDATE account_risk_state SET cooloff_until_ms=NULL, cooloff_hours=NULL WHERE id=1"
) )
conn.commit() conn.commit()
active = count_active_trade_monitors(conn) if active_count is None else int(active_count) active = (
count_active_position_slots(conn)
if active_count is None
else int(active_count)
)
mx = max_active_positions() mx = max_active_positions()
pos_limit = active >= mx pos_limit = active >= mx
daily_opens = count_daily_opens(conn, now) daily_opens = count_daily_opens(conn, now)
+38 -25
View File
@@ -3502,25 +3502,32 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
return None return None
def _ensure_strategy_monitors(conn, mode: str) -> int: def _ensure_strategy_monitors(conn, mode: str) -> int:
"""策略页加载前:恢复误关监控并同步柜台,与持仓页逻辑一致""" """策略页:仅当柜台持仓缺少 active 监控时才恢复/同步,避免每次打开都全量写库"""
if not _cached_ctp_status(mode).get("connected"): if not _cached_ctp_status(mode).get("connected"):
return 0 return 0
capital = _capital(conn)
synced = 0
seen: set[tuple[str, str]] = set() seen: set[tuple[str, str]] = set()
positions = list(trading_state.get_positions() or []) positions = list(trading_state.get_positions() or [])
if not positions: if not positions:
positions = list(_ctp_positions(mode, refresh_if_empty=False) or []) positions = list(_ctp_positions(mode, refresh_if_empty=False) or [])
missing: list[tuple[dict, str, str]] = []
for p in positions: for p in positions:
lots = int(p.get("lots") or 0) lots = int(p.get("lots") or 0)
if lots <= 0: if lots <= 0:
continue continue
ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "") ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "")
direction = (p.get("direction") or "long").strip().lower() direction = (p.get("direction") or "long").strip().lower()
key = (ths.lower(), direction) key = (ths.lower().split(".")[0], direction)
if key in seen: if key in seen:
continue continue
seen.add(key) seen.add(key)
if _find_active_monitor(conn, ths, direction):
continue
missing.append((p, ths, direction))
if not missing:
return 0
capital = _capital(conn)
synced = 0
for p, ths, direction in missing:
mon = _find_or_revive_monitor(conn, ths, direction) mon = _find_or_revive_monitor(conn, ths, direction)
if not mon: if not mon:
continue continue
@@ -3550,27 +3557,33 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
capital = _capital(conn) capital = _capital(conn)
mode = get_trading_mode(get_setting) mode = get_trading_mode(get_setting)
_ensure_strategy_monitors(conn, mode) _ensure_strategy_monitors(conn, mode)
for grow in conn.execute( need_initial = conn.execute(
"SELECT id, order_monitor_id, initial_lots FROM roll_groups WHERE status='active'" "SELECT id FROM roll_groups WHERE status='active' AND COALESCE(initial_lots, 0)=0 LIMIT 1"
).fetchall(): ).fetchone()
gfix = dict(grow) if need_initial:
if int(gfix.get("initial_lots") or 0) > 0: for grow in conn.execute(
continue "SELECT id, order_monitor_id, initial_lots FROM roll_groups WHERE status='active' AND COALESCE(initial_lots, 0)=0"
mon_row = conn.execute( ).fetchall():
"SELECT lots FROM trade_order_monitors WHERE id=?", gfix = dict(grow)
(int(gfix["order_monitor_id"]),), mon_row = conn.execute(
).fetchone() "SELECT lots FROM trade_order_monitors WHERE id=?",
if not mon_row: (int(gfix["order_monitor_id"]),),
continue ).fetchone()
gid = int(gfix["id"]) if not mon_row:
filled = _roll_filled_lots_map(conn, [gid]).get(gid, 0) continue
lots = int(mon_row["lots"] or 0) gid = int(gfix["id"])
initial = lots if lots > 0 and filled > 0 and lots <= filled else max(1, lots - filled) filled = _roll_filled_lots_map(conn, [gid]).get(gid, 0)
conn.execute( lots = int(mon_row["lots"] or 0)
"UPDATE roll_groups SET initial_lots=? WHERE id=?", initial = (
(initial, gid), lots
) if lots > 0 and filled > 0 and lots <= filled
commit_retry(conn) else max(1, lots - filled)
)
conn.execute(
"UPDATE roll_groups SET initial_lots=? WHERE id=?",
(initial, gid),
)
commit_retry(conn)
active_trend = conn.execute( active_trend = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1" "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1"
).fetchone() ).fetchone()