From 658982d2b93ab3f061c1a59841494e866486b58f Mon Sep 17 00:00:00 2001 From: dekun Date: Wed, 1 Jul 2026 15:02:37 +0800 Subject: [PATCH] Speed up CTP account display: refresh equity on fast snapshot without waiting for full position sync. Co-authored-by: Cursor --- modules/core/trading_context.py | 10 +++- modules/trading/install.py | 99 +++++++++++++++++++++++++++++---- modules/web/static/js/trade.js | 6 +- 3 files changed, 101 insertions(+), 14 deletions(-) diff --git a/modules/core/trading_context.py b/modules/core/trading_context.py index 5e5e39e..80be077 100644 --- a/modules/core/trading_context.py +++ b/modules/core/trading_context.py @@ -88,8 +88,14 @@ def _cached_ctp_account(mode: str) -> dict[str, float]: snap = position_hub.get_snapshot() or {} cap = float(snap.get("capital") or 0) - if cap > 0: - return {"balance": cap} + avail = snap.get("account_available") + if cap > 0 or avail is not None: + out: dict[str, float] = {} + if cap > 0: + out["balance"] = cap + if avail is not None: + out["available"] = float(avail) + return out except Exception: pass try: diff --git a/modules/trading/install.py b/modules/trading/install.py index 82a46fe..ba072a0 100644 --- a/modules/trading/install.py +++ b/modules/trading/install.py @@ -154,6 +154,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se """注册交易相关路由。""" _nav = require_nav _live_refresh_lock = threading.Lock() + _live_slow_lock = threading.Lock() _ctp_status_cache: dict = {"mode": "", "status": {}, "ts": 0.0} _ctp_status_cache_lock = threading.Lock() _ctp_status_refresh_flag = {"busy": False} @@ -331,6 +332,52 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se except Exception: return {} + def _live_account_fields(mode: str) -> dict: + """轻量读柜台资金(不跑 SQLite 对账)。""" + if not _cached_ctp_status(mode).get("connected"): + return {} + try: + acc = _ctp_account(mode) + except Exception: + return {} + balance = float(acc.get("balance") or 0) + available = acc.get("available") + out: dict = {} + if balance > 0: + out["capital"] = balance + if available is not None: + out["account_available"] = round(float(available), 2) + try: + mu = ctp_account_margin_used(mode) + if mu is not None: + out["margin_used"] = mu + except Exception: + pass + return out + + def _apply_live_account(payload: dict, mode: str, fields: dict | None = None) -> dict: + merged = dict(payload) + acct = fields if fields is not None else _live_account_fields(mode) + if not acct: + return merged + if acct.get("capital"): + merged["capital"] = acct["capital"] + if acct.get("account_available") is not None: + merged["account_available"] = acct["account_available"] + if acct.get("margin_used") is not None: + merged["margin_used"] = acct["margin_used"] + return merged + + def _broadcast_account_fast(mode: str) -> None: + """CTP 连上后立刻推送权益/可用,持仓可仍在后台同步。""" + try: + snap = position_hub.get_snapshot() or {"ok": True, "rows": []} + merged = _apply_live_account(dict(snap), mode) + merged["ctp_status"] = dict(ctp_status(mode) or _cached_ctp_status(mode)) + position_hub.broadcast("positions", merged) + except Exception as exc: + logger.debug("broadcast account fast: %s", exc) + def _ctp_positions( mode: str, *, @@ -2132,7 +2179,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se equity=capital, ) syncing = bool(ctp_st.get("connected") or ctp_st.get("connecting")) - return { + payload = { "ok": True, "rows": [], "active_orders": [], @@ -2148,6 +2195,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "sync_state": "syncing" if syncing else trading_state.sync_state, "sync_label": "加载中…" if syncing else trading_state.sync_label(), } + return _apply_live_account(payload, mode) def _normalize_live_payload(payload: dict) -> dict: if payload.get("rows"): @@ -2212,37 +2260,54 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se since_connect = time.time() - float( getattr(get_bridge(), "_last_connect_ok_ts", 0) or 0, ) - if since_connect < 180: + if since_connect < 45: payload = dict(payload) payload["sync_state"] = "syncing" payload["sync_label"] = "持仓同步中…" - return _normalize_live_payload(payload) + return _apply_live_account(_normalize_live_payload(payload), mode) finally: conn.close() if fast: + mode = get_trading_mode(get_setting) snap = position_hub.get_snapshot() if snap: - return snap + return _apply_live_account(dict(snap), mode) if _live_refresh_lock.acquire(blocking=False): try: - return _build() + return _apply_live_account(_build(), mode) finally: _live_refresh_lock.release() conn = get_db() try: init_strategy_tables(conn) - return _minimal_live_payload(conn) + return _apply_live_account(_minimal_live_payload(conn), mode) finally: conn.close() - with _live_refresh_lock: - return _build() + if not _live_slow_lock.acquire(blocking=False): + mode = get_trading_mode(get_setting) + snap = position_hub.get_snapshot() + if snap: + return _apply_live_account(dict(snap), mode) + conn = get_db() + try: + init_strategy_tables(conn) + return _apply_live_account(_minimal_live_payload(conn), mode) + finally: + conn.close() + try: + with _live_refresh_lock: + return _apply_live_account(_build(), get_trading_mode(get_setting)) + finally: + _live_slow_lock.release() def _push_position_snapshot_async(*, fast: bool = True) -> None: def _run() -> None: try: payload = _refresh_trading_live_snapshot(fast=fast) position_hub.broadcast("positions", payload) + if fast: + return conn = get_db() try: rec = _recommend_payload(conn) @@ -2415,11 +2480,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _on_ctp_connected(mode: str) -> None: if mode != get_trading_mode(get_setting): return + _broadcast_account_fast(mode) _schedule_recommend_refresh() _push_position_snapshot_async(fast=True) def _after_connect() -> None: try: + _broadcast_account_fast(mode) + _push_position_snapshot_async(fast=True) try: with _ctp_td_lock: get_bridge().request_position_snapshot(force=True) @@ -2434,7 +2502,15 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se commit_retry(conn) finally: conn.close() - _push_position_snapshot_async(fast=False) + + def _slow_sync() -> None: + time.sleep(8) + try: + _push_position_snapshot_async(fast=False) + except Exception as exc: + logger.debug("ctp connected slow sync: %s", exc) + + threading.Thread(target=_slow_sync, daemon=True, name="ctp-slow-sync").start() except Exception as exc: logger.debug("ctp connected monitor restore: %s", exc) @@ -2463,11 +2539,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ), equity=capital, ) - ctp_acc = {} + ctp_acc = _ctp_account(mode) if connected else {} bootstrap_live = position_hub.get_snapshot() if connected and bootstrap_live and bootstrap_live.get("capital") is not None: cap = float(bootstrap_live.get("capital") or 0) - ctp_acc = {"balance": cap, "available": cap} + if cap > 0 and not ctp_acc.get("balance"): + ctp_acc = {"balance": cap, "available": bootstrap_live.get("account_available", cap)} active_trend = conn.execute( "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1" ).fetchone() diff --git a/modules/web/static/js/trade.js b/modules/web/static/js/trade.js index cb79583..3135afb 100644 --- a/modules/web/static/js/trade.js +++ b/modules/web/static/js/trade.js @@ -264,7 +264,7 @@ posFastPollTimer = setInterval(function () { pollPositions(); posFastPollCount += 1; - if (posFastPollCount >= 90) stopPosFastPoll(); + if (posFastPollCount >= 120) stopPosFastPoll(); }, 1000); } @@ -272,6 +272,10 @@ if (!data) return; var cap = document.getElementById('cap-display'); if (cap && data.capital != null) cap.textContent = Number(data.capital).toFixed(2); + var avail = document.getElementById('avail-display'); + if (avail && data.account_available != null) { + avail.textContent = Number(data.account_available).toFixed(2); + } var connected = data.ctp_status && data.ctp_status.connected; var connecting = data.ctp_status && data.ctp_status.connecting; var cooldownSec = (data.ctp_status && data.ctp_status.login_cooldown_sec) || 0;