fix: 重启后立即读库展示持仓,CTP异步重连不再阻塞

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-25 14:57:39 +08:00
parent 01de8dfb69
commit 7daed9bd3a
5 changed files with 111 additions and 38 deletions
+2 -3
View File
@@ -11,7 +11,7 @@ from vnpy_bridge import ctp_try_auto_reconnect
logger = logging.getLogger(__name__)
RECONNECT_INTERVAL_SEC = 30
RECONNECT_INTERVAL_SEC = 10
def _auto_reconnect_enabled() -> bool:
@@ -26,7 +26,6 @@ def start_ctp_reconnect_worker(*, get_mode_fn: Callable[[], str], interval: int
"""定时检测 CTP 连接,断线后自动重连。"""
def _loop() -> None:
time.sleep(5)
while True:
try:
if _auto_reconnect_enabled():
@@ -35,6 +34,6 @@ def start_ctp_reconnect_worker(*, get_mode_fn: Callable[[], str], interval: int
logger.debug("CTP 连接正常 [%s]", mode)
except Exception as exc:
logger.warning("CTP reconnect worker: %s", exc)
time.sleep(max(15, interval))
time.sleep(max(5, interval))
threading.Thread(target=_loop, daemon=True, name="ctp-reconnect-worker").start()
+76 -24
View File
@@ -78,6 +78,7 @@ from vnpy_bridge import (
ctp_status,
execute_order,
get_bridge,
set_position_refresh_callback,
)
@@ -128,9 +129,18 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
except Exception:
return {}
def _ctp_positions(mode: str, *, refresh_if_empty: bool = True) -> list:
def _ctp_positions(
mode: str,
*,
refresh_if_empty: bool = True,
refresh_margin: bool = False,
) -> list:
try:
return ctp_list_positions(mode, refresh_if_empty=refresh_if_empty)
return ctp_list_positions(
mode,
refresh_if_empty=refresh_if_empty,
refresh_margin=refresh_margin,
)
except Exception:
return []
@@ -431,6 +441,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
mode: str,
capital: float,
now_iso: str,
fast: bool = False,
) -> Optional[dict]:
if not mon and not ctp:
return None
@@ -463,14 +474,16 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
holding = _holding_duration(open_time, now_iso) if open_time else ""
mark = None
if ctp_status(mode).get("connected"):
if not fast and ctp_status(mode).get("connected"):
mark = ctp_get_tick_price(mode, sym)
if (mark is None or mark <= 0) and codes:
if not fast and (mark is None or mark <= 0) and codes:
mark = fetch_price(
sym,
codes.get("market_code", ""),
codes.get("sina_code", ""),
)
if mark is None or mark <= 0:
mark = entry if entry else None
close_est = float(mark) if mark and mark > 0 else entry
if float_pnl is None and mark and entry:
pos_tmp = calc_position_metrics(
@@ -564,7 +577,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
"trailing_r_locked": int(mon.get("trailing_r_locked") or 0) if mon else 0,
}
def _build_trading_live_rows(conn) -> list[dict]:
def _build_trading_live_rows(conn, *, fast: bool = False) -> list[dict]:
from zoneinfo import ZoneInfo
tz = ZoneInfo("Asia/Shanghai")
now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M")
@@ -583,7 +596,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
if key not in monitor_by_key:
monitor_by_key[key] = mon
ctp_list: list[dict] = _ctp_positions(mode) if ctp_status(mode).get("connected") else []
ctp_list: list[dict] = (
_ctp_positions(mode, refresh_if_empty=not fast, refresh_margin=not fast)
if ctp_status(mode).get("connected") else []
)
ctp_by_key: dict[str, dict] = {}
for p in ctp_list:
if int(p.get("lots") or 0) <= 0:
@@ -617,6 +633,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
try:
row = _compose_position_row(
conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso,
fast=fast,
)
if row:
rows.append(row)
@@ -647,6 +664,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
try:
row = _compose_position_row(
conn, mon=mon, ctp=ctp, mode=mode, capital=capital, now_iso=now_iso,
fast=fast,
)
if row:
rows.append(row)
@@ -663,11 +681,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
deduped.append(row)
return deduped
def _build_trading_live_payload(conn) -> dict:
def _build_trading_live_payload(conn, *, fast: bool = False) -> dict:
mode = get_trading_mode(get_setting)
ctp_st = ctp_status(mode)
_ensure_monitors_from_ctp(conn, mode)
rows = _build_trading_live_rows(conn)
if not fast:
_ensure_monitors_from_ctp(conn, mode)
rows = _build_trading_live_rows(conn, fast=fast)
pending_orders = _build_pending_orders(conn, mode)
capital = _capital(conn)
risk = get_risk_status(conn, active_count=_effective_active_position_count(conn, mode))
@@ -682,26 +701,54 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
"trading_session": is_trading_session(),
}
def _refresh_trading_live_snapshot() -> dict:
def _refresh_trading_live_snapshot(*, fast: bool = False) -> dict:
mode = get_trading_mode(get_setting)
if not fast and ctp_status(mode).get("connected"):
try:
get_bridge().refresh_positions()
except Exception as exc:
logger.debug("refresh positions before snapshot: %s", exc)
conn = get_db()
try:
init_strategy_tables(conn)
payload = _build_trading_live_payload(conn)
payload = _build_trading_live_payload(conn, fast=fast)
conn.commit()
return payload
finally:
conn.close()
def _push_position_snapshot_async() -> None:
def _push_position_snapshot_async(*, fast: bool = False) -> None:
def _run() -> None:
try:
payload = _refresh_trading_live_snapshot()
payload = _refresh_trading_live_snapshot(fast=fast)
position_hub.broadcast("positions", payload)
except Exception as exc:
logger.debug("push position snapshot: %s", exc)
threading.Thread(target=_run, daemon=True).start()
def _bootstrap_trading_runtime() -> None:
"""进程启动:立刻读库展示持仓,并异步连 CTP。"""
set_position_refresh_callback(
lambda: _push_position_snapshot_async(fast=False)
)
def _warm() -> None:
try:
payload = _refresh_trading_live_snapshot(fast=True)
position_hub.set_snapshot(payload)
position_hub.broadcast("positions", payload)
except Exception as exc:
logger.warning("bootstrap position snapshot: %s", exc)
threading.Thread(target=_warm, daemon=True, name="position-bootstrap").start()
try:
from vnpy_bridge import ctp_start_connect
mode = get_trading_mode(get_setting)
ctp_start_connect(mode, force=False)
except Exception as exc:
logger.debug("bootstrap ctp connect: %s", exc)
@app.route("/trade")
@login_required
def trade_page():
@@ -788,7 +835,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
conn = get_db()
try:
init_strategy_tables(conn)
payload = _build_trading_live_payload(conn)
payload = _build_trading_live_payload(conn, fast=True)
conn.commit()
position_hub.set_snapshot(payload)
return jsonify(payload)
@@ -807,7 +854,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
if snap:
yield sse_format("positions", snap)
else:
payload = _refresh_trading_live_snapshot()
payload = _refresh_trading_live_snapshot(fast=True)
position_hub.set_snapshot(payload)
yield sse_format("positions", payload)
while True:
@@ -1322,20 +1369,24 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
"status": st,
"account": acc,
})
try:
st = ctp_connect(mode, force=force)
acc = _ctp_account(mode)
return jsonify({"ok": True, "status": st, "account": acc})
except Exception as exc:
st = ctp_status(mode)
return jsonify({"ok": False, "error": str(exc), "status": st}), 400
return jsonify({
"ok": False,
"error": st.get("last_error") or "CTP 连接未启动",
"status": st,
"account": acc,
}), 400
@app.route("/api/ctp/status")
@login_required
def api_ctp_status():
mode = get_trading_mode(get_setting)
st = ctp_status(mode)
acc = _ctp_account(mode) if st.get("connected") else {}
acc = {}
if st.get("connected"):
try:
acc = _ctp_account(mode)
except Exception:
acc = {}
return jsonify({"ok": True, "status": st, "account": acc})
@app.route("/api/account_snapshot")
@@ -1777,9 +1828,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
interval=1,
)
start_position_worker(
refresh_fn=_refresh_trading_live_snapshot,
refresh_fn=lambda: _refresh_trading_live_snapshot(fast=False),
interval=1,
)
_bootstrap_trading_runtime()
start_ctp_fee_worker(
get_mode_fn=lambda: get_trading_mode(get_setting),
get_setting_fn=get_setting,
-1
View File
@@ -76,7 +76,6 @@ def start_position_worker(
"""后台定时刷新持仓快照并 SSE 广播。"""
def _loop() -> None:
time.sleep(3)
while True:
sleep_sec = idle_interval
try:
+5
View File
@@ -111,6 +111,10 @@
function savePosCache(data) {
try {
if (!data || !data.rows || !data.rows.length) {
var prev = loadPosCache();
if (prev && prev.rows && prev.rows.length) return;
}
sessionStorage.setItem(POS_CACHE_KEY, JSON.stringify(data));
} catch (e) { /* quota */ }
}
@@ -947,6 +951,7 @@
}
pollPositions();
connectPositionStream();
requestCtpConnect(false);
connectRecommendStream();
fetch('/api/recommend/list')
.then(function (r) { return r.json(); })
+28 -10
View File
@@ -6,7 +6,7 @@ import os
import threading
import time
from collections import deque
from typing import Any, Optional
from typing import Any, Callable, Optional
from locale_fix import ensure_process_locale
@@ -19,6 +19,23 @@ logger = logging.getLogger(__name__)
GATEWAY_NAME = "CTP"
_position_refresh_callback: Optional[Callable[[], None]] = None
def set_position_refresh_callback(fn: Optional[Callable[[], None]]) -> None:
global _position_refresh_callback
_position_refresh_callback = fn
def _fire_position_refresh_callback() -> None:
fn = _position_refresh_callback
if not fn:
return
try:
threading.Thread(target=fn, daemon=True, name="ctp-position-refresh").start()
except Exception as exc:
logger.debug("position refresh callback: %s", exc)
_bridge: Optional["CtpBridge"] = None
_bridge_lock = threading.Lock()
@@ -231,6 +248,7 @@ class CtpBridge:
self.refresh_positions()
except Exception as exc:
logger.debug("initial position query: %s", exc)
_fire_position_refresh_callback()
return
time.sleep(0.5)
finally:
@@ -796,7 +814,7 @@ class CtpBridge:
except Exception as exc:
logger.debug("refresh_positions: %s", exc)
def list_positions(self, *, refresh_if_empty: bool = True, refresh_margin: bool = True) -> list[dict[str, Any]]:
def list_positions(self, *, refresh_if_empty: bool = True, refresh_margin: bool = False) -> list[dict[str, Any]]:
if self._engine and self._connected_mode and refresh_margin:
self.refresh_positions()
out = self._collect_positions()
@@ -958,23 +976,23 @@ def ctp_start_connect(mode: str, *, force: bool = False) -> dict[str, Any]:
def ctp_try_auto_reconnect(mode: str) -> bool:
"""断线时静默重连;已连接且 ping 正常则直接返回 True。"""
"""断线时静默异步重连;已连接且 ping 正常则直接返回 True。"""
b = get_bridge()
if not b.available():
return False
if b.connect_in_progress():
return False
return True
st = _setting_for_mode(mode)
if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"):
return False
if b.connected_mode == mode and b.ping():
return True
try:
b.connect(mode, force=False)
return b.connected_mode == mode
except Exception as exc:
logger.info("CTP 自动重连失败: %s", exc)
return False
info = b.start_connect_async(mode, force=False)
return bool(
info.get("connected")
or info.get("connecting")
or info.get("started")
)
def ctp_status(mode: str) -> dict[str, Any]: