fix: avoid SQLite lock on fast position poll by skipping DB writes
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+25
@@ -3,6 +3,7 @@ from __future__ import annotations
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
import sqlite3
|
import sqlite3
|
||||||
|
import time
|
||||||
|
|
||||||
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db")
|
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db")
|
||||||
|
|
||||||
@@ -17,3 +18,27 @@ def connect_db(path: str | None = None) -> sqlite3.Connection:
|
|||||||
except sqlite3.OperationalError:
|
except sqlite3.OperationalError:
|
||||||
pass
|
pass
|
||||||
return conn
|
return conn
|
||||||
|
|
||||||
|
|
||||||
|
def execute_retry(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
sql: str,
|
||||||
|
params: tuple = (),
|
||||||
|
*,
|
||||||
|
retries: int = 6,
|
||||||
|
base_delay: float = 0.05,
|
||||||
|
) -> sqlite3.Cursor:
|
||||||
|
"""遇 database is locked 时短暂退避重试。"""
|
||||||
|
last_exc: Exception | None = None
|
||||||
|
for attempt in range(retries):
|
||||||
|
try:
|
||||||
|
return conn.execute(sql, params)
|
||||||
|
except sqlite3.OperationalError as exc:
|
||||||
|
if "locked" not in str(exc).lower():
|
||||||
|
raise
|
||||||
|
last_exc = exc
|
||||||
|
if attempt < retries - 1:
|
||||||
|
time.sleep(base_delay * (attempt + 1))
|
||||||
|
if last_exc:
|
||||||
|
raise last_exc
|
||||||
|
raise sqlite3.OperationalError("database is locked")
|
||||||
|
|||||||
+4
-3
@@ -33,6 +33,7 @@ from position_stream import position_hub, start_position_worker
|
|||||||
from ctp_reconnect import start_ctp_reconnect_worker
|
from ctp_reconnect import start_ctp_reconnect_worker
|
||||||
from ctp_premarket_connect import start_ctp_premarket_connect_worker
|
from ctp_premarket_connect import start_ctp_premarket_connect_worker
|
||||||
from ctp_fee_worker import start_ctp_fee_worker
|
from ctp_fee_worker import start_ctp_fee_worker
|
||||||
|
from db_conn import execute_retry
|
||||||
from sl_tp_guard import (
|
from sl_tp_guard import (
|
||||||
cancel_monitor_exit_orders,
|
cancel_monitor_exit_orders,
|
||||||
ensure_monitor_order_columns,
|
ensure_monitor_order_columns,
|
||||||
@@ -480,7 +481,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
open_time_val = ctp_open
|
open_time_val = ctp_open
|
||||||
elif ctp_open:
|
elif ctp_open:
|
||||||
open_time_val = ctp_open
|
open_time_val = ctp_open
|
||||||
conn.execute(
|
execute_retry(
|
||||||
|
conn,
|
||||||
"""UPDATE trade_order_monitors SET lots=?, entry_price=?,
|
"""UPDATE trade_order_monitors SET lots=?, entry_price=?,
|
||||||
open_time=? WHERE id=?""",
|
open_time=? WHERE id=?""",
|
||||||
(
|
(
|
||||||
@@ -684,7 +686,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
break
|
break
|
||||||
elif key in ctp_by_key:
|
elif key in ctp_by_key:
|
||||||
used_ctp_keys.add(key)
|
used_ctp_keys.add(key)
|
||||||
if ctp and mon:
|
if ctp and mon and not fast:
|
||||||
_sync_monitor_lots_from_ctp(
|
_sync_monitor_lots_from_ctp(
|
||||||
conn, int(mon["id"]), mon.get("symbol") or "",
|
conn, int(mon["id"]), mon.get("symbol") or "",
|
||||||
mon.get("direction") or "long", mode, ctp=ctp,
|
mon.get("direction") or "long", mode, ctp=ctp,
|
||||||
@@ -896,7 +898,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
try:
|
try:
|
||||||
init_strategy_tables(conn)
|
init_strategy_tables(conn)
|
||||||
payload = _build_trading_live_payload(conn, fast=True)
|
payload = _build_trading_live_payload(conn, fast=True)
|
||||||
conn.commit()
|
|
||||||
position_hub.set_snapshot(payload)
|
position_hub.set_snapshot(payload)
|
||||||
return jsonify(payload)
|
return jsonify(payload)
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
+5
-4
@@ -606,12 +606,13 @@ def check_monitors_locally(
|
|||||||
if not is_trading_session():
|
if not is_trading_session():
|
||||||
return 0
|
return 0
|
||||||
reconcile_monitors_without_position(conn, mode)
|
reconcile_monitors_without_position(conn, mode)
|
||||||
|
conn.commit()
|
||||||
closed = 0
|
closed = 0
|
||||||
rows = conn.execute(
|
rows = [dict(r) for r in conn.execute(
|
||||||
"SELECT * FROM trade_order_monitors WHERE status='active'"
|
"SELECT * FROM trade_order_monitors WHERE status='active'"
|
||||||
).fetchall()
|
).fetchall()]
|
||||||
for r in rows:
|
conn.commit()
|
||||||
mon = dict(r)
|
for mon in rows:
|
||||||
mid = int(mon.get("id") or 0)
|
mid = int(mon.get("id") or 0)
|
||||||
sym = (mon.get("symbol") or "").strip()
|
sym = (mon.get("symbol") or "").strip()
|
||||||
direction = (mon.get("direction") or "long").strip().lower()
|
direction = (mon.get("direction") or "long").strip().lower()
|
||||||
|
|||||||
Reference in New Issue
Block a user