Fix SQLite lock errors on /api/stats under concurrent writes.
Retry stats cache commits, serialize refresh, and fall back to read-only compute so the stats API does not return 500 when the database is briefly locked. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -45,7 +45,7 @@ from fee_specs import (
|
|||||||
purge_non_ctp_fee_rates,
|
purge_non_ctp_fee_rates,
|
||||||
)
|
)
|
||||||
from nav_settings import NAV_TOGGLES, get_nav_items, nav_enabled, save_nav_items
|
from nav_settings import NAV_TOGGLES, get_nav_items, nav_enabled, save_nav_items
|
||||||
from stats_engine import STATS_VIEWS, load_stats_cache, refresh_stats_cache
|
from stats_engine import STATS_VIEWS, build_all_stats, load_stats_cache, refresh_stats_cache
|
||||||
from kline_store import ensure_kline_tables
|
from kline_store import ensure_kline_tables
|
||||||
from kline_stream import kline_hub, sse_format
|
from kline_stream import kline_hub, sse_format
|
||||||
from kline_chart import generate_review_kline_chart, fetch_market_klines, MARKET_PERIODS
|
from kline_chart import generate_review_kline_chart, fetch_market_klines, MARKET_PERIODS
|
||||||
@@ -230,12 +230,20 @@ def touch_stats_cache():
|
|||||||
|
|
||||||
def get_stats_data() -> dict:
|
def get_stats_data() -> dict:
|
||||||
conn = get_db()
|
conn = get_db()
|
||||||
capital = float(get_setting("live_capital", "0") or 0)
|
try:
|
||||||
data = load_stats_cache(conn)
|
capital = float(get_setting("live_capital", "0") or 0)
|
||||||
if not data:
|
data = load_stats_cache(conn)
|
||||||
data = refresh_stats_cache(conn, capital)
|
if data:
|
||||||
conn.close()
|
return data
|
||||||
return data
|
try:
|
||||||
|
return refresh_stats_cache(conn, capital)
|
||||||
|
except sqlite3.OperationalError as exc:
|
||||||
|
if "locked" not in str(exc).lower():
|
||||||
|
raise
|
||||||
|
app.logger.warning("stats cache refresh locked, compute without save: %s", exc)
|
||||||
|
return build_all_stats(conn, capital)
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
|
||||||
def init_db():
|
def init_db():
|
||||||
|
|||||||
+23
@@ -47,3 +47,26 @@ def execute_retry(
|
|||||||
if last_exc:
|
if last_exc:
|
||||||
raise last_exc
|
raise last_exc
|
||||||
raise sqlite3.OperationalError("database is locked")
|
raise sqlite3.OperationalError("database is locked")
|
||||||
|
|
||||||
|
|
||||||
|
def commit_retry(
|
||||||
|
conn: sqlite3.Connection,
|
||||||
|
*,
|
||||||
|
retries: int = 6,
|
||||||
|
base_delay: float = 0.05,
|
||||||
|
) -> None:
|
||||||
|
"""遇 database is locked 时短暂退避重试 commit。"""
|
||||||
|
last_exc: Exception | None = None
|
||||||
|
for attempt in range(retries):
|
||||||
|
try:
|
||||||
|
conn.commit()
|
||||||
|
return
|
||||||
|
except sqlite3.OperationalError as exc:
|
||||||
|
if "locked" not in str(exc).lower():
|
||||||
|
raise
|
||||||
|
last_exc = exc
|
||||||
|
if attempt < retries - 1:
|
||||||
|
time.sleep(base_delay * (attempt + 1))
|
||||||
|
if last_exc:
|
||||||
|
raise last_exc
|
||||||
|
raise sqlite3.OperationalError("database is locked")
|
||||||
|
|||||||
+12
-5
@@ -7,11 +7,16 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import json
|
import json
|
||||||
|
import threading
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Any, Optional
|
from typing import Any, Optional
|
||||||
|
|
||||||
from zoneinfo import ZoneInfo
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
from db_conn import commit_retry, execute_retry
|
||||||
|
|
||||||
|
_stats_refresh_lock = threading.Lock()
|
||||||
|
|
||||||
TZ = ZoneInfo("Asia/Shanghai")
|
TZ = ZoneInfo("Asia/Shanghai")
|
||||||
|
|
||||||
STATS_VIEWS = [
|
STATS_VIEWS = [
|
||||||
@@ -288,13 +293,14 @@ def build_all_stats(conn, live_capital: float = 0.0) -> dict:
|
|||||||
|
|
||||||
|
|
||||||
def save_stats_cache(conn, data: dict) -> None:
|
def save_stats_cache(conn, data: dict) -> None:
|
||||||
conn.execute(
|
execute_retry(
|
||||||
|
conn,
|
||||||
"""INSERT INTO stats_cache (key, data_json, updated_at)
|
"""INSERT INTO stats_cache (key, data_json, updated_at)
|
||||||
VALUES ('all', ?, ?)
|
VALUES ('all', ?, ?)
|
||||||
ON CONFLICT(key) DO UPDATE SET data_json=excluded.data_json, updated_at=excluded.updated_at""",
|
ON CONFLICT(key) DO UPDATE SET data_json=excluded.data_json, updated_at=excluded.updated_at""",
|
||||||
(json.dumps(data, ensure_ascii=False), data["updated_at"]),
|
(json.dumps(data, ensure_ascii=False), data["updated_at"]),
|
||||||
)
|
)
|
||||||
conn.commit()
|
commit_retry(conn)
|
||||||
|
|
||||||
|
|
||||||
def load_stats_cache(conn) -> Optional[dict]:
|
def load_stats_cache(conn) -> Optional[dict]:
|
||||||
@@ -310,6 +316,7 @@ def load_stats_cache(conn) -> Optional[dict]:
|
|||||||
|
|
||||||
|
|
||||||
def refresh_stats_cache(conn, live_capital: float = 0.0) -> dict:
|
def refresh_stats_cache(conn, live_capital: float = 0.0) -> dict:
|
||||||
data = build_all_stats(conn, live_capital)
|
with _stats_refresh_lock:
|
||||||
save_stats_cache(conn, data)
|
data = build_all_stats(conn, live_capital)
|
||||||
return data
|
save_stats_cache(conn, data)
|
||||||
|
return data
|
||||||
|
|||||||
Reference in New Issue
Block a user