From 2f581593720a42f39ec17363c7c1b6217f7d9941 Mon Sep 17 00:00:00 2001 From: dekun Date: Tue, 30 Jun 2026 09:29:42 +0800 Subject: [PATCH] Fix slow positions and DB lock on fast live refresh by skipping writes. Co-authored-by: Cursor --- install_trading.py | 90 +++++++++++++++++++++++++++++++++++----------- 1 file changed, 70 insertions(+), 20 deletions(-) diff --git a/install_trading.py b/install_trading.py index 45fd75a..5de4c0b 100644 --- a/install_trading.py +++ b/install_trading.py @@ -53,7 +53,7 @@ from order_pending import ( pending_monitor_has_live_order, reconcile_pending_orders, ) -from db_conn import execute_retry +from db_conn import commit_retry, execute_retry from sl_tp_guard import ( cancel_monitor_exit_orders, ensure_monitor_order_columns, @@ -620,7 +620,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se rsl, rtp, rtrail, rinitial = _restore_sl_tp_from_closed(conn, sym, direction) if rsl is None and rtp is None and not rtrail: return mon - conn.execute( + execute_retry( + conn, """UPDATE trade_order_monitors SET stop_loss=?, take_profit=?, trailing_be=?, initial_stop_loss=? WHERE id=? AND status='active'""", @@ -833,6 +834,40 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ).fetchone() ) + def _overlay_sl_tp_readonly( + conn, + mon: Optional[dict], + sym: str, + direction: str, + ) -> Optional[dict]: + """只读:从已关闭监控补全止盈止损,不写库。""" + if not mon: + rsl, rtp, rtrail, rinitial = _restore_sl_tp_from_closed(conn, sym, direction) + if rsl is None and rtp is None and not rtrail: + return None + return { + "symbol": sym, + "direction": direction, + "stop_loss": rsl, + "take_profit": rtp, + "trailing_be": rtrail, + "initial_stop_loss": rinitial, + } + sl = mon.get("stop_loss") + tp = mon.get("take_profit") + trailing = int(mon.get("trailing_be") or 0) + if sl is not None or tp is not None or trailing: + return mon + rsl, rtp, rtrail, rinitial = _restore_sl_tp_from_closed(conn, sym, direction) + if rsl is None and rtp is None and not rtrail: + return mon + merged = dict(mon) + merged["stop_loss"] = rsl + merged["take_profit"] = rtp + merged["trailing_be"] = rtrail + merged["initial_stop_loss"] = rinitial + return merged + def _revive_closed_monitor(conn, symbol: str, direction: str) -> Optional[dict]: """柜台仍有持仓但本地监控被误关时,恢复最近一条同品种记录。""" direction = (direction or "long").strip().lower() @@ -846,7 +881,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se continue if int(row.get("lots") or 0) <= 0: continue - conn.execute( + execute_retry( + conn, "UPDATE trade_order_monitors SET status='active' WHERE id=?", (row["id"],), ) @@ -1583,7 +1619,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ctp_list = trading_state.get_positions() if not ctp_list: ctp_list = _ctp_positions( - mode, refresh_if_empty=not fast, refresh_margin=not fast, + mode, refresh_if_empty=True, refresh_margin=not fast, ) rows: list[dict] = [] @@ -1605,18 +1641,26 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se if not mon: mon = _find_pending_monitor(conn, ths, direction) if not mon: - mon = _find_or_revive_monitor(conn, ths, direction) + if fast: + mon = _find_active_monitor(conn, ths, direction) + else: + mon = _find_or_revive_monitor(conn, ths, direction) if mon: - mon = _restore_monitor_sl_tp_if_missing(conn, mon, ths, direction) or mon - _sync_monitor_from_ctp( - conn, int(mon["id"]), mon.get("symbol") or ths, - mon.get("direction") or direction, - mode, ctp=p, capital=capital, - ) - mon = _find_active_monitor( - conn, mon.get("symbol") or ths, mon.get("direction") or direction, - ) or mon - mon = _restore_monitor_sl_tp_if_missing(conn, mon, ths, direction) or mon + if fast: + mon = _overlay_sl_tp_readonly(conn, mon, ths, direction) or mon + else: + mon = _restore_monitor_sl_tp_if_missing(conn, mon, ths, direction) or mon + _sync_monitor_from_ctp( + conn, int(mon["id"]), mon.get("symbol") or ths, + mon.get("direction") or direction, + mode, ctp=p, capital=capital, + ) + mon = _find_active_monitor( + conn, mon.get("symbol") or ths, mon.get("direction") or direction, + ) or mon + mon = _restore_monitor_sl_tp_if_missing(conn, mon, ths, direction) or mon + elif fast: + mon = _overlay_sl_tp_readonly(conn, None, ths, direction) try: row = _compose_position_row( conn, mon=mon, ctp=p, mode=mode, capital=capital, @@ -1654,7 +1698,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se conn, mode=mode, capital=capital, now_iso=now_iso, ) rows = _apply_account_margin_to_rows(rows, mode, capital) - _persist_ctp_snapshot_to_monitors(conn, rows, mode) + if not fast: + _persist_ctp_snapshot_to_monitors(conn, rows, mode) pending_orders = _build_pending_orders(conn, mode) risk = get_risk_status( conn, @@ -1681,7 +1726,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _refresh_trading_live_snapshot(*, fast: bool = False) -> dict: mode = get_trading_mode(get_setting) if ctp_status(mode).get("connected"): - if not fast: + if not fast or trading_state.sync_state == "idle": try: with _ctp_td_lock: get_bridge().calibrate_trading_state() @@ -1698,7 +1743,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se try: init_strategy_tables(conn) payload = _build_trading_live_payload(conn, fast=fast) - conn.commit() + commit_retry(conn) return payload finally: conn.close() @@ -1837,11 +1882,16 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _after_connect() -> None: try: + try: + with _ctp_td_lock: + get_bridge().calibrate_trading_state() + except Exception as exc: + logger.debug("ctp connected calibrate: %s", exc) conn = get_db() try: init_strategy_tables(conn) _ensure_monitors_from_ctp(conn, mode) - conn.commit() + commit_retry(conn) finally: conn.close() _push_position_snapshot_async(fast=False) @@ -1937,7 +1987,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se try: init_strategy_tables(conn) payload = _build_trading_live_payload(conn, fast=True) - conn.commit() + commit_retry(conn) position_hub.set_snapshot(payload) return jsonify(payload) finally: