Fix slow positions and DB lock on fast live refresh by skipping writes.
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+70
-20
@@ -53,7 +53,7 @@ from order_pending import (
|
|||||||
pending_monitor_has_live_order,
|
pending_monitor_has_live_order,
|
||||||
reconcile_pending_orders,
|
reconcile_pending_orders,
|
||||||
)
|
)
|
||||||
from db_conn import execute_retry
|
from db_conn import commit_retry, execute_retry
|
||||||
from sl_tp_guard import (
|
from sl_tp_guard import (
|
||||||
cancel_monitor_exit_orders,
|
cancel_monitor_exit_orders,
|
||||||
ensure_monitor_order_columns,
|
ensure_monitor_order_columns,
|
||||||
@@ -620,7 +620,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
rsl, rtp, rtrail, rinitial = _restore_sl_tp_from_closed(conn, sym, direction)
|
rsl, rtp, rtrail, rinitial = _restore_sl_tp_from_closed(conn, sym, direction)
|
||||||
if rsl is None and rtp is None and not rtrail:
|
if rsl is None and rtp is None and not rtrail:
|
||||||
return mon
|
return mon
|
||||||
conn.execute(
|
execute_retry(
|
||||||
|
conn,
|
||||||
"""UPDATE trade_order_monitors SET
|
"""UPDATE trade_order_monitors SET
|
||||||
stop_loss=?, take_profit=?, trailing_be=?, initial_stop_loss=?
|
stop_loss=?, take_profit=?, trailing_be=?, initial_stop_loss=?
|
||||||
WHERE id=? AND status='active'""",
|
WHERE id=? AND status='active'""",
|
||||||
@@ -833,6 +834,40 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
).fetchone()
|
).fetchone()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _overlay_sl_tp_readonly(
|
||||||
|
conn,
|
||||||
|
mon: Optional[dict],
|
||||||
|
sym: str,
|
||||||
|
direction: str,
|
||||||
|
) -> Optional[dict]:
|
||||||
|
"""只读:从已关闭监控补全止盈止损,不写库。"""
|
||||||
|
if not mon:
|
||||||
|
rsl, rtp, rtrail, rinitial = _restore_sl_tp_from_closed(conn, sym, direction)
|
||||||
|
if rsl is None and rtp is None and not rtrail:
|
||||||
|
return None
|
||||||
|
return {
|
||||||
|
"symbol": sym,
|
||||||
|
"direction": direction,
|
||||||
|
"stop_loss": rsl,
|
||||||
|
"take_profit": rtp,
|
||||||
|
"trailing_be": rtrail,
|
||||||
|
"initial_stop_loss": rinitial,
|
||||||
|
}
|
||||||
|
sl = mon.get("stop_loss")
|
||||||
|
tp = mon.get("take_profit")
|
||||||
|
trailing = int(mon.get("trailing_be") or 0)
|
||||||
|
if sl is not None or tp is not None or trailing:
|
||||||
|
return mon
|
||||||
|
rsl, rtp, rtrail, rinitial = _restore_sl_tp_from_closed(conn, sym, direction)
|
||||||
|
if rsl is None and rtp is None and not rtrail:
|
||||||
|
return mon
|
||||||
|
merged = dict(mon)
|
||||||
|
merged["stop_loss"] = rsl
|
||||||
|
merged["take_profit"] = rtp
|
||||||
|
merged["trailing_be"] = rtrail
|
||||||
|
merged["initial_stop_loss"] = rinitial
|
||||||
|
return merged
|
||||||
|
|
||||||
def _revive_closed_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
|
def _revive_closed_monitor(conn, symbol: str, direction: str) -> Optional[dict]:
|
||||||
"""柜台仍有持仓但本地监控被误关时,恢复最近一条同品种记录。"""
|
"""柜台仍有持仓但本地监控被误关时,恢复最近一条同品种记录。"""
|
||||||
direction = (direction or "long").strip().lower()
|
direction = (direction or "long").strip().lower()
|
||||||
@@ -846,7 +881,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
continue
|
continue
|
||||||
if int(row.get("lots") or 0) <= 0:
|
if int(row.get("lots") or 0) <= 0:
|
||||||
continue
|
continue
|
||||||
conn.execute(
|
execute_retry(
|
||||||
|
conn,
|
||||||
"UPDATE trade_order_monitors SET status='active' WHERE id=?",
|
"UPDATE trade_order_monitors SET status='active' WHERE id=?",
|
||||||
(row["id"],),
|
(row["id"],),
|
||||||
)
|
)
|
||||||
@@ -1583,7 +1619,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
ctp_list = trading_state.get_positions()
|
ctp_list = trading_state.get_positions()
|
||||||
if not ctp_list:
|
if not ctp_list:
|
||||||
ctp_list = _ctp_positions(
|
ctp_list = _ctp_positions(
|
||||||
mode, refresh_if_empty=not fast, refresh_margin=not fast,
|
mode, refresh_if_empty=True, refresh_margin=not fast,
|
||||||
)
|
)
|
||||||
|
|
||||||
rows: list[dict] = []
|
rows: list[dict] = []
|
||||||
@@ -1605,18 +1641,26 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
if not mon:
|
if not mon:
|
||||||
mon = _find_pending_monitor(conn, ths, direction)
|
mon = _find_pending_monitor(conn, ths, direction)
|
||||||
if not mon:
|
if not mon:
|
||||||
mon = _find_or_revive_monitor(conn, ths, direction)
|
if fast:
|
||||||
|
mon = _find_active_monitor(conn, ths, direction)
|
||||||
|
else:
|
||||||
|
mon = _find_or_revive_monitor(conn, ths, direction)
|
||||||
if mon:
|
if mon:
|
||||||
mon = _restore_monitor_sl_tp_if_missing(conn, mon, ths, direction) or mon
|
if fast:
|
||||||
_sync_monitor_from_ctp(
|
mon = _overlay_sl_tp_readonly(conn, mon, ths, direction) or mon
|
||||||
conn, int(mon["id"]), mon.get("symbol") or ths,
|
else:
|
||||||
mon.get("direction") or direction,
|
mon = _restore_monitor_sl_tp_if_missing(conn, mon, ths, direction) or mon
|
||||||
mode, ctp=p, capital=capital,
|
_sync_monitor_from_ctp(
|
||||||
)
|
conn, int(mon["id"]), mon.get("symbol") or ths,
|
||||||
mon = _find_active_monitor(
|
mon.get("direction") or direction,
|
||||||
conn, mon.get("symbol") or ths, mon.get("direction") or direction,
|
mode, ctp=p, capital=capital,
|
||||||
) or mon
|
)
|
||||||
mon = _restore_monitor_sl_tp_if_missing(conn, mon, ths, direction) or mon
|
mon = _find_active_monitor(
|
||||||
|
conn, mon.get("symbol") or ths, mon.get("direction") or direction,
|
||||||
|
) or mon
|
||||||
|
mon = _restore_monitor_sl_tp_if_missing(conn, mon, ths, direction) or mon
|
||||||
|
elif fast:
|
||||||
|
mon = _overlay_sl_tp_readonly(conn, None, ths, direction)
|
||||||
try:
|
try:
|
||||||
row = _compose_position_row(
|
row = _compose_position_row(
|
||||||
conn, mon=mon, ctp=p, mode=mode, capital=capital,
|
conn, mon=mon, ctp=p, mode=mode, capital=capital,
|
||||||
@@ -1654,7 +1698,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
conn, mode=mode, capital=capital, now_iso=now_iso,
|
conn, mode=mode, capital=capital, now_iso=now_iso,
|
||||||
)
|
)
|
||||||
rows = _apply_account_margin_to_rows(rows, mode, capital)
|
rows = _apply_account_margin_to_rows(rows, mode, capital)
|
||||||
_persist_ctp_snapshot_to_monitors(conn, rows, mode)
|
if not fast:
|
||||||
|
_persist_ctp_snapshot_to_monitors(conn, rows, mode)
|
||||||
pending_orders = _build_pending_orders(conn, mode)
|
pending_orders = _build_pending_orders(conn, mode)
|
||||||
risk = get_risk_status(
|
risk = get_risk_status(
|
||||||
conn,
|
conn,
|
||||||
@@ -1681,7 +1726,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
def _refresh_trading_live_snapshot(*, fast: bool = False) -> dict:
|
def _refresh_trading_live_snapshot(*, fast: bool = False) -> dict:
|
||||||
mode = get_trading_mode(get_setting)
|
mode = get_trading_mode(get_setting)
|
||||||
if ctp_status(mode).get("connected"):
|
if ctp_status(mode).get("connected"):
|
||||||
if not fast:
|
if not fast or trading_state.sync_state == "idle":
|
||||||
try:
|
try:
|
||||||
with _ctp_td_lock:
|
with _ctp_td_lock:
|
||||||
get_bridge().calibrate_trading_state()
|
get_bridge().calibrate_trading_state()
|
||||||
@@ -1698,7 +1743,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
try:
|
try:
|
||||||
init_strategy_tables(conn)
|
init_strategy_tables(conn)
|
||||||
payload = _build_trading_live_payload(conn, fast=fast)
|
payload = _build_trading_live_payload(conn, fast=fast)
|
||||||
conn.commit()
|
commit_retry(conn)
|
||||||
return payload
|
return payload
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
@@ -1837,11 +1882,16 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
|
|
||||||
def _after_connect() -> None:
|
def _after_connect() -> None:
|
||||||
try:
|
try:
|
||||||
|
try:
|
||||||
|
with _ctp_td_lock:
|
||||||
|
get_bridge().calibrate_trading_state()
|
||||||
|
except Exception as exc:
|
||||||
|
logger.debug("ctp connected calibrate: %s", exc)
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
try:
|
try:
|
||||||
init_strategy_tables(conn)
|
init_strategy_tables(conn)
|
||||||
_ensure_monitors_from_ctp(conn, mode)
|
_ensure_monitors_from_ctp(conn, mode)
|
||||||
conn.commit()
|
commit_retry(conn)
|
||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
_push_position_snapshot_async(fast=False)
|
_push_position_snapshot_async(fast=False)
|
||||||
@@ -1937,7 +1987,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
try:
|
try:
|
||||||
init_strategy_tables(conn)
|
init_strategy_tables(conn)
|
||||||
payload = _build_trading_live_payload(conn, fast=True)
|
payload = _build_trading_live_payload(conn, fast=True)
|
||||||
conn.commit()
|
commit_retry(conn)
|
||||||
position_hub.set_snapshot(payload)
|
position_hub.set_snapshot(payload)
|
||||||
return jsonify(payload)
|
return jsonify(payload)
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
Reference in New Issue
Block a user