Speed up equity/position display after CTP reconnect and auto-refresh stale sessions before market open.
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -18,7 +18,7 @@ from modules.market.market_sessions import (
|
|||||||
is_trading_session,
|
is_trading_session,
|
||||||
should_keep_ctp_connected,
|
should_keep_ctp_connected,
|
||||||
)
|
)
|
||||||
from modules.ctp.vnpy_bridge import ctp_start_connect, ctp_status
|
from modules.ctp.vnpy_bridge import ctp_start_connect, ctp_status, ctp_session_needs_reconnect
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -105,6 +105,19 @@ def start_ctp_premarket_connect_worker(
|
|||||||
mode,
|
mode,
|
||||||
mins_b,
|
mins_b,
|
||||||
)
|
)
|
||||||
|
elif (
|
||||||
|
st.get("connected")
|
||||||
|
and not st.get("connecting")
|
||||||
|
and int(st.get("login_cooldown_sec") or 0) <= 0
|
||||||
|
and (
|
||||||
|
in_premarket_connect_window(minutes_before=mins_b)
|
||||||
|
or is_trading_session()
|
||||||
|
)
|
||||||
|
and ctp_session_needs_reconnect(mode)
|
||||||
|
):
|
||||||
|
info = ctp_start_connect(mode, force=True, scheduled=True)
|
||||||
|
if info.get("started") or info.get("connecting"):
|
||||||
|
logger.info("CTP 会话刷新(盘前/交易时段)[%s]", mode)
|
||||||
if is_trading_session():
|
if is_trading_session():
|
||||||
sleep_sec = TRADING_CHECK_INTERVAL_SEC
|
sleep_sec = TRADING_CHECK_INTERVAL_SEC
|
||||||
elif in_premarket_connect_window(minutes_before=mins_b):
|
elif in_premarket_connect_window(minutes_before=mins_b):
|
||||||
|
|||||||
+46
-12
@@ -173,7 +173,7 @@ def _fire_position_refresh_callback_debounced(*, min_interval: float = 0.35) ->
|
|||||||
def _fire_position_refresh_burst() -> None:
|
def _fire_position_refresh_burst() -> None:
|
||||||
"""连接后持仓回报可能分批到达,分多次触发快照刷新。"""
|
"""连接后持仓回报可能分批到达,分多次触发快照刷新。"""
|
||||||
_fire_position_refresh_callback()
|
_fire_position_refresh_callback()
|
||||||
for delay in (0.4, 0.9, 1.5, 3.0, 6.0, 12.0, 20.0):
|
for delay in (0.3, 0.7, 1.2, 2.5, 5.0, 10.0):
|
||||||
threading.Timer(delay, _fire_position_refresh_callback).start()
|
threading.Timer(delay, _fire_position_refresh_callback).start()
|
||||||
|
|
||||||
|
|
||||||
@@ -228,7 +228,7 @@ _bridge: Optional["CtpBridge"] = None
|
|||||||
_bridge_lock = threading.Lock()
|
_bridge_lock = threading.Lock()
|
||||||
_ctp_td_lock = threading.RLock()
|
_ctp_td_lock = threading.RLock()
|
||||||
POSITION_QUERY_MIN_INTERVAL_SEC = 5.0
|
POSITION_QUERY_MIN_INTERVAL_SEC = 5.0
|
||||||
POSITION_QUERY_RETRY_DELAYS_SEC = (1.5, 4.0, 9.0, 18.0, 35.0)
|
POSITION_QUERY_RETRY_DELAYS_SEC = (0.8, 2.0, 4.0, 8.0, 15.0)
|
||||||
TRADE_QUERY_MIN_INTERVAL_SEC = 10.0
|
TRADE_QUERY_MIN_INTERVAL_SEC = 10.0
|
||||||
|
|
||||||
|
|
||||||
@@ -783,6 +783,7 @@ class CtpBridge:
|
|||||||
"connected": self._connected_mode == mode,
|
"connected": self._connected_mode == mode,
|
||||||
"connecting": connecting,
|
"connecting": connecting,
|
||||||
"connected_mode": self._connected_mode,
|
"connected_mode": self._connected_mode,
|
||||||
|
"td_logged_in": self._td_logged_in() if self._connected_mode == mode else False,
|
||||||
"mode_label": _mode_label(mode),
|
"mode_label": _mode_label(mode),
|
||||||
"missing_config": missing,
|
"missing_config": missing,
|
||||||
"last_error": last_error,
|
"last_error": last_error,
|
||||||
@@ -1375,11 +1376,13 @@ class CtpBridge:
|
|||||||
return False
|
return False
|
||||||
if self._td_logged_in():
|
if self._td_logged_in():
|
||||||
return True
|
return True
|
||||||
try:
|
age = time.time() - float(getattr(self, "_last_connect_ok_ts", 0) or 0)
|
||||||
if self._engine.get_all_accounts():
|
if age < 90:
|
||||||
return True
|
try:
|
||||||
except Exception as exc:
|
if self._engine.get_all_accounts():
|
||||||
logger.debug("CTP ping failed: %s", exc)
|
return True
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("CTP ping failed: %s", exc)
|
||||||
self._connected_mode = None
|
self._connected_mode = None
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -2900,10 +2903,39 @@ def ctp_start_connect(mode: str, *, force: bool = False, scheduled: bool = False
|
|||||||
return {**info, "status": st}
|
return {**info, "status": st}
|
||||||
|
|
||||||
|
|
||||||
|
def ctp_session_needs_reconnect(mode: str) -> bool:
|
||||||
|
"""连接看似在线但不可交易、无资金回报或会话过旧时需强制重连。"""
|
||||||
|
if _use_ctp_worker_client():
|
||||||
|
st = ctp_ipc_client.status(mode)
|
||||||
|
if not st.get("connected"):
|
||||||
|
return True
|
||||||
|
if not st.get("td_logged_in"):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
b = get_bridge()
|
||||||
|
if b.connected_mode != mode:
|
||||||
|
return True
|
||||||
|
if not b._td_logged_in():
|
||||||
|
return True
|
||||||
|
age = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0)
|
||||||
|
if age > 3 * 3600:
|
||||||
|
return True
|
||||||
|
try:
|
||||||
|
acc = b.get_account()
|
||||||
|
if float(acc.get("balance") or 0) <= 0:
|
||||||
|
return True
|
||||||
|
except Exception:
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def ctp_try_auto_reconnect(mode: str) -> bool:
|
def ctp_try_auto_reconnect(mode: str) -> bool:
|
||||||
"""断线时静默异步重连;已连接且交易通道正常则不再重复 connect。"""
|
"""断线时静默异步重连;已连接且交易通道正常则不再重复 connect。"""
|
||||||
if _use_ctp_worker_client():
|
if _use_ctp_worker_client():
|
||||||
info = ctp_ipc_client.start_connect(mode, force=False, scheduled=True)
|
if ctp_session_needs_reconnect(mode):
|
||||||
|
info = ctp_ipc_client.start_connect(mode, force=True, scheduled=True)
|
||||||
|
else:
|
||||||
|
info = ctp_ipc_client.start_connect(mode, force=False, scheduled=True)
|
||||||
return bool(
|
return bool(
|
||||||
info.get("connected")
|
info.get("connected")
|
||||||
or info.get("connecting")
|
or info.get("connecting")
|
||||||
@@ -2921,13 +2953,15 @@ def ctp_try_auto_reconnect(mode: str) -> bool:
|
|||||||
st = _setting_for_mode(mode)
|
st = _setting_for_mode(mode)
|
||||||
if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"):
|
if not st.get("用户名") or not st.get("密码") or not st.get("交易服务器"):
|
||||||
return False
|
return False
|
||||||
|
force = False
|
||||||
if b.connected_mode == mode:
|
if b.connected_mode == mode:
|
||||||
if b._td_logged_in() or b.ping():
|
if not ctp_session_needs_reconnect(mode):
|
||||||
return True
|
return True
|
||||||
recent = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0)
|
recent = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0)
|
||||||
if recent < 120:
|
if recent < 45 and b._td_logged_in():
|
||||||
logger.debug("CTP 跳过自动重连:刚连接 %.0fs", recent)
|
|
||||||
return True
|
return True
|
||||||
|
force = True
|
||||||
|
logger.info("CTP 会话失效或过旧,强制重连 [%s]", mode)
|
||||||
td = st.get("交易服务器", "")
|
td = st.get("交易服务器", "")
|
||||||
ok, err = probe_tcp_address(td, timeout=4.0)
|
ok, err = probe_tcp_address(td, timeout=4.0)
|
||||||
if not ok:
|
if not ok:
|
||||||
@@ -2936,7 +2970,7 @@ def ctp_try_auto_reconnect(mode: str) -> bool:
|
|||||||
"请更新 SIMNOW_TD_ADDRESS 并确认服务器出网。"
|
"请更新 SIMNOW_TD_ADDRESS 并确认服务器出网。"
|
||||||
)
|
)
|
||||||
return False
|
return False
|
||||||
info = b.start_connect_async(mode, force=False, scheduled=True)
|
info = b.start_connect_async(mode, force=force, scheduled=True)
|
||||||
return bool(
|
return bool(
|
||||||
info.get("connected")
|
info.get("connected")
|
||||||
or info.get("connecting")
|
or info.get("connecting")
|
||||||
|
|||||||
@@ -2583,7 +2583,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
since_connect = time.time() - float(
|
since_connect = time.time() - float(
|
||||||
getattr(get_bridge(), "_last_connect_ok_ts", 0) or 0,
|
getattr(get_bridge(), "_last_connect_ok_ts", 0) or 0,
|
||||||
)
|
)
|
||||||
if since_connect < 45:
|
if since_connect < 90:
|
||||||
payload = dict(payload)
|
payload = dict(payload)
|
||||||
payload["sync_state"] = "syncing"
|
payload["sync_state"] = "syncing"
|
||||||
payload["sync_label"] = "持仓同步中…"
|
payload["sync_label"] = "持仓同步中…"
|
||||||
@@ -2771,7 +2771,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
position_hub.broadcast("positions", payload)
|
position_hub.broadcast("positions", payload)
|
||||||
|
|
||||||
def _slow_sync() -> None:
|
def _slow_sync() -> None:
|
||||||
time.sleep(20)
|
time.sleep(8)
|
||||||
try:
|
try:
|
||||||
pl = _refresh_trading_live_snapshot(fast=False)
|
pl = _refresh_trading_live_snapshot(fast=False)
|
||||||
position_hub.set_snapshot(pl)
|
position_hub.set_snapshot(pl)
|
||||||
@@ -2828,13 +2828,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
def _slow_sync() -> None:
|
def _slow_sync() -> None:
|
||||||
time.sleep(8)
|
time.sleep(3)
|
||||||
try:
|
try:
|
||||||
_push_position_snapshot_async(fast=False)
|
_push_position_snapshot_async(fast=False)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.debug("ctp connected slow sync: %s", exc)
|
logger.debug("ctp connected slow sync: %s", exc)
|
||||||
|
|
||||||
threading.Thread(target=_slow_sync, daemon=True, name="ctp-slow-sync").start()
|
threading.Thread(target=_slow_sync, daemon=True, name="ctp-slow-sync").start()
|
||||||
|
threading.Timer(
|
||||||
|
1.2,
|
||||||
|
lambda: _push_position_snapshot_async(fast=False),
|
||||||
|
).start()
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
logger.debug("ctp connected monitor restore: %s", exc)
|
logger.debug("ctp connected monitor restore: %s", exc)
|
||||||
|
|
||||||
@@ -2954,9 +2958,11 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
@app.route("/api/trading/live")
|
@app.route("/api/trading/live")
|
||||||
@login_required
|
@login_required
|
||||||
def api_trading_live():
|
def api_trading_live():
|
||||||
|
mode = get_trading_mode(get_setting)
|
||||||
snap = position_hub.get_snapshot()
|
snap = position_hub.get_snapshot()
|
||||||
if snap:
|
if snap:
|
||||||
return jsonify(_normalize_live_payload(snap))
|
payload = _apply_live_account(dict(snap), mode)
|
||||||
|
return jsonify(_normalize_live_payload(payload))
|
||||||
payload = _refresh_trading_live_snapshot(fast=True)
|
payload = _refresh_trading_live_snapshot(fast=True)
|
||||||
payload = _normalize_live_payload(payload)
|
payload = _normalize_live_payload(payload)
|
||||||
position_hub.set_snapshot(payload)
|
position_hub.set_snapshot(payload)
|
||||||
@@ -2984,7 +2990,11 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
yield sse_format("positions", payload)
|
yield sse_format("positions", payload)
|
||||||
_push_position_snapshot_async(fast=True)
|
_push_position_snapshot_async(fast=True)
|
||||||
else:
|
else:
|
||||||
yield sse_format("positions", snap)
|
mode = get_trading_mode(get_setting)
|
||||||
|
yield sse_format(
|
||||||
|
"positions",
|
||||||
|
_apply_live_account(dict(snap), mode),
|
||||||
|
)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
msg = q.get(timeout=25)
|
msg = q.get(timeout=25)
|
||||||
@@ -5045,7 +5055,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
since_connect = now - float(
|
since_connect = now - float(
|
||||||
getattr(get_bridge(), "_last_connect_ok_ts", 0) or 0,
|
getattr(get_bridge(), "_last_connect_ok_ts", 0) or 0,
|
||||||
)
|
)
|
||||||
if connected and since_connect < 45:
|
if connected and since_connect < 90:
|
||||||
return _refresh_trading_live_snapshot(fast=True)
|
return _refresh_trading_live_snapshot(fast=True)
|
||||||
need_full = (
|
need_full = (
|
||||||
connected
|
connected
|
||||||
|
|||||||
@@ -37,6 +37,7 @@
|
|||||||
var positionsRendered = false;
|
var positionsRendered = false;
|
||||||
var posFastPollTimer = null;
|
var posFastPollTimer = null;
|
||||||
var posFastPollCount = 0;
|
var posFastPollCount = 0;
|
||||||
|
var posFastPollMax = 120;
|
||||||
var lastPosRowCount = 0;
|
var lastPosRowCount = 0;
|
||||||
var selectedMaxLots = null;
|
var selectedMaxLots = null;
|
||||||
var recommendMaxByProduct = {};
|
var recommendMaxByProduct = {};
|
||||||
@@ -259,16 +260,40 @@
|
|||||||
posFastPollCount = 0;
|
posFastPollCount = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
function startPosFastPoll() {
|
function startPosFastPoll(maxTicks) {
|
||||||
|
if (typeof maxTicks === 'number' && maxTicks > 0) {
|
||||||
|
posFastPollMax = maxTicks;
|
||||||
|
} else {
|
||||||
|
posFastPollMax = 120;
|
||||||
|
}
|
||||||
if (posFastPollTimer) return;
|
if (posFastPollTimer) return;
|
||||||
posFastPollCount = 0;
|
posFastPollCount = 0;
|
||||||
posFastPollTimer = setInterval(function () {
|
posFastPollTimer = setInterval(function () {
|
||||||
pollPositions();
|
pollPositions();
|
||||||
|
refreshCtpAccountQuick();
|
||||||
posFastPollCount += 1;
|
posFastPollCount += 1;
|
||||||
if (posFastPollCount >= 120) stopPosFastPoll();
|
if (posFastPollCount >= posFastPollMax) stopPosFastPoll();
|
||||||
}, 1000);
|
}, 1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
function refreshCtpAccountQuick() {
|
||||||
|
if (!ctpConnected && !ctpConnecting) return;
|
||||||
|
fetch('/api/ctp/status')
|
||||||
|
.then(function (r) { return r.json(); })
|
||||||
|
.then(function (d) {
|
||||||
|
if (!d || !d.account) return;
|
||||||
|
var cap = document.getElementById('cap-display');
|
||||||
|
if (cap && d.account.balance != null) {
|
||||||
|
cap.textContent = Number(d.account.balance).toFixed(2);
|
||||||
|
}
|
||||||
|
var avail = document.getElementById('avail-display');
|
||||||
|
if (avail && d.account.available != null) {
|
||||||
|
avail.textContent = Number(d.account.available).toFixed(2);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.catch(function () {});
|
||||||
|
}
|
||||||
|
|
||||||
function applyPositionsData(data) {
|
function applyPositionsData(data) {
|
||||||
if (!data) return;
|
if (!data) return;
|
||||||
var cap = document.getElementById('cap-display');
|
var cap = document.getElementById('cap-display');
|
||||||
@@ -302,6 +327,10 @@
|
|||||||
}
|
}
|
||||||
if (connected) {
|
if (connected) {
|
||||||
showCtpError('');
|
showCtpError('');
|
||||||
|
if (data.sync_state === 'syncing') {
|
||||||
|
posFastPollMax = Math.max(posFastPollMax, 240);
|
||||||
|
if (!posFastPollTimer) startPosFastPoll(240);
|
||||||
|
}
|
||||||
} else if (!connecting && data.ctp_status && data.ctp_status.last_error) {
|
} else if (!connecting && data.ctp_status && data.ctp_status.last_error) {
|
||||||
showCtpError(data.ctp_status.last_error);
|
showCtpError(data.ctp_status.last_error);
|
||||||
if (isCtpLoginBanError(data.ctp_status.last_error)) {
|
if (isCtpLoginBanError(data.ctp_status.last_error)) {
|
||||||
@@ -683,7 +712,9 @@
|
|||||||
if (st.connected) {
|
if (st.connected) {
|
||||||
syncCtpBadgeFromStatus(st);
|
syncCtpBadgeFromStatus(st);
|
||||||
showCtpError('');
|
showCtpError('');
|
||||||
|
refreshCtpAccountQuick();
|
||||||
pollPositions();
|
pollPositions();
|
||||||
|
startPosFastPoll(180);
|
||||||
return d;
|
return d;
|
||||||
}
|
}
|
||||||
if ((st.login_cooldown_sec || 0) > 0 || d.cooldown) {
|
if ((st.login_cooldown_sec || 0) > 0 || d.cooldown) {
|
||||||
@@ -693,6 +724,7 @@
|
|||||||
}
|
}
|
||||||
if (d.connecting || st.connecting) {
|
if (d.connecting || st.connecting) {
|
||||||
updateCtpBadge(false, true);
|
updateCtpBadge(false, true);
|
||||||
|
startPosFastPoll(180);
|
||||||
return waitForCtpConnected(70000).then(function (ok) {
|
return waitForCtpConnected(70000).then(function (ok) {
|
||||||
if (!ok && d.error) showCtpError(d.error);
|
if (!ok && d.error) showCtpError(d.error);
|
||||||
else if (!ok && st.last_error) showCtpError(st.last_error);
|
else if (!ok && st.last_error) showCtpError(st.last_error);
|
||||||
|
|||||||
Reference in New Issue
Block a user