Fix slow CTP position sync after restart and link positions to 15m K-line.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-26 11:05:13 +08:00
parent deb9501cbe
commit 7a4a3f08e5
4 changed files with 80 additions and 10 deletions
+21 -4
View File
@@ -771,11 +771,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
"monitor_id": mon["id"] if mon else None, "monitor_id": mon["id"] if mon else None,
}) })
row_key = _canonical_position_key(sym, direction) row_key = _canonical_position_key(sym, direction)
ctp_st = ctp_status(mode)
sync_pending = (
mon is not None
and ctp is None
and bool(ctp_st.get("connected"))
)
return { return {
"key": row_key, "key": row_key,
"source": "ctp" if ctp else "local", "source": "ctp" if ctp else "local",
"source_label": source_label, "source_label": source_label,
"sync_pending": ctp is None and mon is not None, "sync_pending": sync_pending,
"monitor_id": mon["id"] if mon else None, "monitor_id": mon["id"] if mon else None,
"symbol_code": sym, "symbol_code": sym,
**_symbol_display_fields(sym), **_symbol_display_fields(sym),
@@ -1024,7 +1030,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
capital = _capital(conn) capital = _capital(conn)
if not fast and ctp_st.get("connected"): if not fast and ctp_st.get("connected"):
_reconcile_pending(conn, mode, capital=capital) _reconcile_pending(conn, mode, capital=capital)
if not fast: if ctp_st.get("connected"):
_ensure_monitors_from_ctp(conn, mode) _ensure_monitors_from_ctp(conn, mode)
rows = _build_trading_live_rows(conn, fast=fast) rows = _build_trading_live_rows(conn, fast=fast)
pending_orders = _build_pending_orders(conn, mode) pending_orders = _build_pending_orders(conn, mode)
@@ -2262,8 +2268,19 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
def _position_worker_refresh() -> dict: def _position_worker_refresh() -> dict:
_pos_refresh_tick["n"] += 1 _pos_refresh_tick["n"] += 1
# 每秒轻量刷新;每 5 秒做一次 CTP 持仓/挂单对账,避免频繁 query 导致 vnctptd 崩溃 mode = get_trading_mode(get_setting)
return _refresh_trading_live_snapshot(fast=(_pos_refresh_tick["n"] % 5 != 0)) connected = bool(ctp_status(mode).get("connected"))
# 已连接时每 2 秒完整对账;未连接时每 5 秒轻量刷新(禁止 query_position
if connected:
use_fast = _pos_refresh_tick["n"] % 2 != 0
else:
use_fast = _pos_refresh_tick["n"] % 5 != 0
payload = _refresh_trading_live_snapshot(fast=use_fast)
if connected and use_fast and any(
r.get("sync_pending") for r in (payload.get("rows") or [])
):
payload = _refresh_trading_live_snapshot(fast=False)
return payload
start_position_worker( start_position_worker(
refresh_fn=_position_worker_refresh, refresh_fn=_position_worker_refresh,
+3
View File
@@ -65,6 +65,9 @@
.gap-badge{font-size:.72rem} .gap-badge{font-size:.72rem}
.rec-market-link{color:inherit;text-decoration:none;display:inline-flex;flex-wrap:wrap;align-items:baseline;gap:.2rem .35rem} .rec-market-link{color:inherit;text-decoration:none;display:inline-flex;flex-wrap:wrap;align-items:baseline;gap:.2rem .35rem}
.rec-market-link:hover strong,.rec-market-link:hover .text-accent{color:var(--accent);text-decoration:underline} .rec-market-link:hover strong,.rec-market-link:hover .text-accent{color:var(--accent);text-decoration:underline}
.pos-market-link{color:inherit;text-decoration:none}
.pos-market-link:hover{color:var(--accent)}
.pos-market-link:hover .text-accent{text-decoration:underline}
.pos-symbol-sub{font-size:.72rem;line-height:1.35} .pos-symbol-sub{font-size:.72rem;line-height:1.35}
.pos-main-badge{font-size:.68rem;vertical-align:middle} .pos-main-badge{font-size:.68rem;vertical-align:middle}
.pos-change-up{color:var(--profit)} .pos-change-up{color:var(--profit)}
+20 -5
View File
@@ -30,6 +30,7 @@
var isTradingSession = false; var isTradingSession = false;
var hasSlTpMonitoring = false; var hasSlTpMonitoring = false;
var ctpConnected = false; var ctpConnected = false;
var ctpConnecting = false;
var positionsRendered = false; var positionsRendered = false;
var selectedMaxLots = null; var selectedMaxLots = null;
var recommendMaxByProduct = {}; var recommendMaxByProduct = {};
@@ -164,6 +165,7 @@
var cooldownSec = (data.ctp_status && data.ctp_status.login_cooldown_sec) || 0; var cooldownSec = (data.ctp_status && data.ctp_status.login_cooldown_sec) || 0;
if (cooldownSec > 0) connecting = false; if (cooldownSec > 0) connecting = false;
ctpConnected = !!connected; ctpConnected = !!connected;
ctpConnecting = !!connecting;
isTradingSession = !!data.trading_session; isTradingSession = !!data.trading_session;
syncCtpBadgeFromStatus(data.ctp_status || { connected: connected, connecting: connecting }); syncCtpBadgeFromStatus(data.ctp_status || { connected: connected, connecting: connecting });
if (!connected && !connecting && data.ctp_status && data.ctp_status.last_error) { if (!connected && !connecting && data.ctp_status && data.ctp_status.last_error) {
@@ -811,13 +813,17 @@
var name = row.symbol_name || row.symbol || ''; var name = row.symbol_name || row.symbol || '';
var code = row.symbol_code || ''; var code = row.symbol_code || '';
var mainBadge = row.symbol_is_main ? ' <span class="badge planned pos-main-badge">主力</span>' : ''; var mainBadge = row.symbol_is_main ? ' <span class="badge planned pos-main-badge">主力</span>' : '';
var title = name + mainBadge; var inner = name + mainBadge;
if (code && String(name).toLowerCase() !== String(code).toLowerCase()) { if (code && String(name).toLowerCase() !== String(code).toLowerCase()) {
title += ' <span class="text-accent">' + code + '</span>'; inner += ' <span class="text-accent">' + code + '</span>';
} else if (!name && code) { } else if (!name && code) {
title = '<span class="text-accent">' + code + '</span>'; inner = '<span class="text-accent">' + code + '</span>';
} }
return title + extraBadges; if (marketNavEnabled && code) {
var href = '/market?symbol=' + encodeURIComponent(code) + '&period=15m';
inner = '<a href="' + href + '" class="pos-market-link" title="查看 15 分 K 线">' + inner + '</a>';
}
return inner + extraBadges;
} }
function posSymbolSubHtml(row) { function posSymbolSubHtml(row) {
@@ -910,7 +916,16 @@
' · ' + slTpStatusHtml(row) + ' · ' + slTpStatusHtml(row) +
' · 移动保本 ' + trailingStatusHtml(row) + ' · 移动保本 ' + trailingStatusHtml(row) +
(slTpBtn ? ' · ' + slTpBtn : '') + (slTpBtn ? ' · ' + slTpBtn : '') +
(row.sync_pending ? ' · <span class="text-muted">同步柜台中…</span>' : ''); (function () {
if (row.order_state === 'pending' || !row.monitor_id) return '';
if (ctpConnecting) {
return ' · <span class="text-muted">CTP 连接中…</span>';
}
if (row.sync_pending) {
return ' · <span class="text-muted">同步柜台中…</span>';
}
return '';
}());
var feeLabel = row.fee_source === 'ctp' ? '手续费(柜台)' : '手续费'; var feeLabel = row.fee_source === 'ctp' ? '手续费(柜台)' : '手续费';
var marginLabel = row.margin_source === 'ctp' ? '占用保证金(柜台)' : '占用保证金'; var marginLabel = row.margin_source === 'ctp' ? '占用保证金(柜台)' : '占用保证金';
var openLabel = '开仓'; var openLabel = '开仓';
+36 -1
View File
@@ -73,6 +73,8 @@ def _load_persisted_last_error() -> str:
return (get_setting(CTP_LAST_ERROR_KEY, "") or "").strip() return (get_setting(CTP_LAST_ERROR_KEY, "") or "").strip()
_position_refresh_callback: Optional[Callable[[], None]] = None _position_refresh_callback: Optional[Callable[[], None]] = None
_position_refresh_debounce_lock = threading.Lock()
_position_refresh_debounce_ts: float = 0.0
def set_position_refresh_callback(fn: Optional[Callable[[], None]]) -> None: def set_position_refresh_callback(fn: Optional[Callable[[], None]]) -> None:
@@ -89,6 +91,23 @@ def _fire_position_refresh_callback() -> None:
except Exception as exc: except Exception as exc:
logger.debug("position refresh callback: %s", exc) logger.debug("position refresh callback: %s", exc)
def _fire_position_refresh_callback_debounced(*, min_interval: float = 0.35) -> None:
global _position_refresh_debounce_ts
now = time.monotonic()
with _position_refresh_debounce_lock:
if now - _position_refresh_debounce_ts < min_interval:
return
_position_refresh_debounce_ts = now
_fire_position_refresh_callback()
def _fire_position_refresh_burst() -> None:
"""连接后持仓回报可能分批到达,分多次触发快照刷新。"""
_fire_position_refresh_callback()
for delay in (1.5, 4.0, 10.0, 25.0):
threading.Timer(delay, _fire_position_refresh_callback).start()
_bridge: Optional["CtpBridge"] = None _bridge: Optional["CtpBridge"] = None
_bridge_lock = threading.Lock() _bridge_lock = threading.Lock()
_ctp_td_lock = threading.RLock() _ctp_td_lock = threading.RLock()
@@ -203,6 +222,7 @@ class CtpBridge:
self._last_trade_query_ts: float = 0.0 self._last_trade_query_ts: float = 0.0
self._last_connect_ok_ts: float = 0.0 self._last_connect_ok_ts: float = 0.0
self._tick_hooked = False self._tick_hooked = False
self._position_hooked = False
self._bar_generators: dict[str, Any] = {} self._bar_generators: dict[str, Any] = {}
self._bars_1m: dict[str, deque] = {} self._bars_1m: dict[str, deque] = {}
self._init_engine() self._init_engine()
@@ -217,11 +237,26 @@ class CtpBridge:
self._ee = EventEngine() self._ee = EventEngine()
self._engine = MainEngine(self._ee) self._engine = MainEngine(self._ee)
self._engine.add_gateway(CtpGateway) self._engine.add_gateway(CtpGateway)
self._ensure_position_event_hook()
except ImportError: except ImportError:
self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp" self._last_error = "未安装 vnpy / vnpy_ctp,请 pip install vnpy vnpy_ctp"
except Exception as exc: except Exception as exc:
self._last_error = str(exc) self._last_error = str(exc)
def _ensure_position_event_hook(self) -> None:
if self._position_hooked or not self._ee:
return
try:
from vnpy.trader.event import EVENT_POSITION
except ImportError:
return
def _on_position(_event) -> None:
_fire_position_refresh_callback_debounced()
self._ee.register(EVENT_POSITION, _on_position)
self._position_hooked = True
def available(self) -> bool: def available(self) -> bool:
return self._engine is not None return self._engine is not None
@@ -414,7 +449,7 @@ class CtpBridge:
mode, self._td_logged_in(), mode, self._td_logged_in(),
len(self._engine.get_all_accounts() or [])) len(self._engine.get_all_accounts() or []))
self._schedule_fee_sync(mode) self._schedule_fee_sync(mode)
_fire_position_refresh_callback() _fire_position_refresh_burst()
return return
finally: finally:
self._ee.unregister(EVENT_LOG, _on_log) self._ee.unregister(EVENT_LOG, _on_log)