Fix server hang: stop CTP reconnect storm and throttle live account polling.
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -18,7 +18,7 @@ from modules.market.market_sessions import (
|
|||||||
is_trading_session,
|
is_trading_session,
|
||||||
should_keep_ctp_connected,
|
should_keep_ctp_connected,
|
||||||
)
|
)
|
||||||
from modules.ctp.vnpy_bridge import ctp_start_connect, ctp_status, ctp_session_needs_reconnect
|
from modules.ctp.vnpy_bridge import ctp_start_connect, ctp_status, ctp_session_needs_reconnect, get_bridge
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -114,6 +114,7 @@ def start_ctp_premarket_connect_worker(
|
|||||||
or is_trading_session()
|
or is_trading_session()
|
||||||
)
|
)
|
||||||
and ctp_session_needs_reconnect(mode)
|
and ctp_session_needs_reconnect(mode)
|
||||||
|
and not get_bridge().connect_in_progress()
|
||||||
):
|
):
|
||||||
info = ctp_start_connect(mode, force=True, scheduled=True)
|
info = ctp_start_connect(mode, force=True, scheduled=True)
|
||||||
if info.get("started") or info.get("connecting"):
|
if info.get("started") or info.get("connecting"):
|
||||||
|
|||||||
@@ -2904,27 +2904,27 @@ def ctp_start_connect(mode: str, *, force: bool = False, scheduled: bool = False
|
|||||||
|
|
||||||
|
|
||||||
def ctp_session_needs_reconnect(mode: str) -> bool:
|
def ctp_session_needs_reconnect(mode: str) -> bool:
|
||||||
"""连接看似在线但不可交易、无资金回报或会话过旧时需强制重连。"""
|
"""连接看似在线但不可交易或会话过旧时需强制重连(连接进行中不重复发起)。"""
|
||||||
if _use_ctp_worker_client():
|
if _use_ctp_worker_client():
|
||||||
st = ctp_ipc_client.status(mode)
|
st = ctp_ipc_client.status(mode)
|
||||||
|
if st.get("connecting"):
|
||||||
|
return False
|
||||||
if not st.get("connected"):
|
if not st.get("connected"):
|
||||||
return True
|
return True
|
||||||
if not st.get("td_logged_in"):
|
if not st.get("td_logged_in"):
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
b = get_bridge()
|
b = get_bridge()
|
||||||
|
if b.connect_in_progress():
|
||||||
|
return False
|
||||||
if b.connected_mode != mode:
|
if b.connected_mode != mode:
|
||||||
return True
|
return True
|
||||||
|
since_connect = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0)
|
||||||
|
if since_connect < 120:
|
||||||
|
return False
|
||||||
if not b._td_logged_in():
|
if not b._td_logged_in():
|
||||||
return True
|
return True
|
||||||
age = time.time() - float(getattr(b, "_last_connect_ok_ts", 0) or 0)
|
if since_connect > 3 * 3600:
|
||||||
if age > 3 * 3600:
|
|
||||||
return True
|
|
||||||
try:
|
|
||||||
acc = b.get_account()
|
|
||||||
if float(acc.get("balance") or 0) <= 0:
|
|
||||||
return True
|
|
||||||
except Exception:
|
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|||||||
@@ -2955,13 +2955,26 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
def recommend_page():
|
def recommend_page():
|
||||||
return redirect(url_for("positions") + "#recommend")
|
return redirect(url_for("positions") + "#recommend")
|
||||||
|
|
||||||
|
def _maybe_refresh_live_account(payload: dict, mode: str) -> dict:
|
||||||
|
"""轻量刷新权益:快照较新时跳过,避免阻塞 HTTP 线程。"""
|
||||||
|
if not ctp_status(mode).get("connected"):
|
||||||
|
return payload
|
||||||
|
if payload.get("capital") and position_hub.snapshot_age_sec() < 3.0:
|
||||||
|
return payload
|
||||||
|
if not _live_refresh_lock.acquire(blocking=False):
|
||||||
|
return payload
|
||||||
|
try:
|
||||||
|
return _apply_live_account(dict(payload), mode)
|
||||||
|
finally:
|
||||||
|
_live_refresh_lock.release()
|
||||||
|
|
||||||
@app.route("/api/trading/live")
|
@app.route("/api/trading/live")
|
||||||
@login_required
|
@login_required
|
||||||
def api_trading_live():
|
def api_trading_live():
|
||||||
mode = get_trading_mode(get_setting)
|
mode = get_trading_mode(get_setting)
|
||||||
snap = position_hub.get_snapshot()
|
snap = position_hub.get_snapshot()
|
||||||
if snap:
|
if snap:
|
||||||
payload = _apply_live_account(dict(snap), mode)
|
payload = _maybe_refresh_live_account(dict(snap), mode)
|
||||||
return jsonify(_normalize_live_payload(payload))
|
return jsonify(_normalize_live_payload(payload))
|
||||||
payload = _refresh_trading_live_snapshot(fast=True)
|
payload = _refresh_trading_live_snapshot(fast=True)
|
||||||
payload = _normalize_live_payload(payload)
|
payload = _normalize_live_payload(payload)
|
||||||
@@ -2993,7 +3006,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
mode = get_trading_mode(get_setting)
|
mode = get_trading_mode(get_setting)
|
||||||
yield sse_format(
|
yield sse_format(
|
||||||
"positions",
|
"positions",
|
||||||
_apply_live_account(dict(snap), mode),
|
_maybe_refresh_live_account(dict(snap), mode),
|
||||||
)
|
)
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -45,6 +45,12 @@ class PositionStreamHub:
|
|||||||
with self._lock:
|
with self._lock:
|
||||||
return dict(self._snapshot) if self._snapshot else None
|
return dict(self._snapshot) if self._snapshot else None
|
||||||
|
|
||||||
|
def snapshot_age_sec(self) -> float:
|
||||||
|
with self._lock:
|
||||||
|
if not self._snapshot_ts:
|
||||||
|
return 9999.0
|
||||||
|
return max(0.0, time.time() - self._snapshot_ts)
|
||||||
|
|
||||||
def set_snapshot(self, data: dict) -> None:
|
def set_snapshot(self, data: dict) -> None:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
self._snapshot = dict(data)
|
self._snapshot = dict(data)
|
||||||
|
|||||||
@@ -270,10 +270,10 @@
|
|||||||
posFastPollCount = 0;
|
posFastPollCount = 0;
|
||||||
posFastPollTimer = setInterval(function () {
|
posFastPollTimer = setInterval(function () {
|
||||||
pollPositions();
|
pollPositions();
|
||||||
refreshCtpAccountQuick();
|
if (posFastPollCount % 3 === 0) refreshCtpAccountQuick();
|
||||||
posFastPollCount += 1;
|
posFastPollCount += 1;
|
||||||
if (posFastPollCount >= posFastPollMax) stopPosFastPoll();
|
if (posFastPollCount >= posFastPollMax) stopPosFastPoll();
|
||||||
}, 1000);
|
}, 2000);
|
||||||
}
|
}
|
||||||
|
|
||||||
function refreshCtpAccountQuick() {
|
function refreshCtpAccountQuick() {
|
||||||
@@ -328,8 +328,8 @@
|
|||||||
if (connected) {
|
if (connected) {
|
||||||
showCtpError('');
|
showCtpError('');
|
||||||
if (data.sync_state === 'syncing') {
|
if (data.sync_state === 'syncing') {
|
||||||
posFastPollMax = Math.max(posFastPollMax, 240);
|
posFastPollMax = Math.max(posFastPollMax, 90);
|
||||||
if (!posFastPollTimer) startPosFastPoll(240);
|
if (!posFastPollTimer) startPosFastPoll(90);
|
||||||
}
|
}
|
||||||
} else if (!connecting && data.ctp_status && data.ctp_status.last_error) {
|
} else if (!connecting && data.ctp_status && data.ctp_status.last_error) {
|
||||||
showCtpError(data.ctp_status.last_error);
|
showCtpError(data.ctp_status.last_error);
|
||||||
@@ -714,7 +714,7 @@
|
|||||||
showCtpError('');
|
showCtpError('');
|
||||||
refreshCtpAccountQuick();
|
refreshCtpAccountQuick();
|
||||||
pollPositions();
|
pollPositions();
|
||||||
startPosFastPoll(180);
|
startPosFastPoll(60);
|
||||||
return d;
|
return d;
|
||||||
}
|
}
|
||||||
if ((st.login_cooldown_sec || 0) > 0 || d.cooldown) {
|
if ((st.login_cooldown_sec || 0) > 0 || d.cooldown) {
|
||||||
@@ -724,7 +724,7 @@
|
|||||||
}
|
}
|
||||||
if (d.connecting || st.connecting) {
|
if (d.connecting || st.connecting) {
|
||||||
updateCtpBadge(false, true);
|
updateCtpBadge(false, true);
|
||||||
startPosFastPoll(180);
|
startPosFastPoll(60);
|
||||||
return waitForCtpConnected(70000).then(function (ok) {
|
return waitForCtpConnected(70000).then(function (ok) {
|
||||||
if (!ok && d.error) showCtpError(d.error);
|
if (!ok && d.error) showCtpError(d.error);
|
||||||
else if (!ok && st.last_error) showCtpError(st.last_error);
|
else if (!ok && st.last_error) showCtpError(st.last_error);
|
||||||
|
|||||||
Reference in New Issue
Block a user