From 55d95b4c399d14dd09afce4bae007ffedf5ae6d5 Mon Sep 17 00:00:00 2001 From: dekun Date: Wed, 24 Jun 2026 10:30:26 +0800 Subject: [PATCH] =?UTF-8?q?=E8=BF=9B=E4=B8=80=E6=AD=A5=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=20SQLite=20=E5=B9=B6=E5=8F=91=E9=94=81=E5=86=B2=E7=AA=81?= =?UTF-8?q?=EF=BC=8C=E7=BB=9F=E4=B8=80=E8=BF=9E=E6=8E=A5=E4=B8=8E=E9=87=8D?= =?UTF-8?q?=E8=AF=95=E6=9C=BA=E5=88=B6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 db_conn 模块、缓存 schema 初始化、positions 页 commit,风控读库自动重试。 Co-authored-by: Cursor --- app.py | 13 ++-- db_conn.py | 19 ++++++ fee_specs.py | 6 +- install_trading.py | 71 +++++++++++---------- kline_chart.py | 7 ++- kline_stream.py | 4 +- risk/account_risk_lib.py | 131 +++++++++++++++++++++++---------------- static/js/trade.js | 2 +- strategy/strategy_db.py | 8 +++ 9 files changed, 155 insertions(+), 106 deletions(-) create mode 100644 db_conn.py diff --git a/app.py b/app.py index 435c928..612f24a 100644 --- a/app.py +++ b/app.py @@ -34,6 +34,7 @@ from kline_store import ensure_kline_tables from kline_stream import kline_hub, sse_format from kline_chart import generate_review_kline_chart, fetch_market_klines, MARKET_PERIODS from market import get_price as market_get_price, set_ths_refresh_token, get_quote_source_label +from db_conn import connect_db from strategy.strategy_db import init_strategy_tables from install_trading import install_trading from vnpy_bridge import try_init_vnpy @@ -155,17 +156,9 @@ def expire_old_plans(): conn.commit() conn.close() -# —————————————— 设置读写 —————————————— def get_db(): - conn = sqlite3.connect(DB_PATH, timeout=30) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA busy_timeout=30000") - try: - conn.execute("PRAGMA journal_mode=WAL") - except sqlite3.OperationalError: - pass - return conn + return connect_db() def get_setting(key: str, default: str = "") -> str: @@ -315,6 +308,8 @@ def init_db(): updated_at TEXT NOT NULL)''') ensure_kline_tables(conn) init_strategy_tables(conn) + from risk.account_risk_lib import ensure_account_risk_schema + ensure_account_risk_schema(conn) conn.commit() conn.close() diff --git a/db_conn.py b/db_conn.py new file mode 100644 index 0000000..feb2454 --- /dev/null +++ b/db_conn.py @@ -0,0 +1,19 @@ +"""SQLite 连接统一配置(WAL + busy_timeout,降低并发锁冲突)。""" +from __future__ import annotations + +import os +import sqlite3 + +DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db") + + +def connect_db(path: str | None = None) -> sqlite3.Connection: + db_path = path or DB_PATH + conn = sqlite3.connect(db_path, timeout=30, check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA busy_timeout=30000") + try: + conn.execute("PRAGMA journal_mode=WAL") + except sqlite3.OperationalError: + pass + return conn diff --git a/fee_specs.py b/fee_specs.py index d37aece..5559edc 100644 --- a/fee_specs.py +++ b/fee_specs.py @@ -8,6 +8,8 @@ from typing import Optional from contract_specs import get_contract_spec +from db_conn import connect_db + DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db") DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") DEFAULT_JSON = os.path.join(DATA_DIR, "fee_rates.json") @@ -32,9 +34,7 @@ def product_from_code(ths_code: str) -> str: def _get_db(): - conn = sqlite3.connect(DB_PATH) - conn.row_factory = sqlite3.Row - return conn + return connect_db() def get_fee_multiplier() -> float: diff --git a/install_trading.py b/install_trading.py index a426613..1e48da1 100644 --- a/install_trading.py +++ b/install_trading.py @@ -282,40 +282,43 @@ def install_trading(app, *, login_required, get_db, get_setting, set_setting, fe @login_required def positions(): conn = get_db() - init_strategy_tables(conn) - mode = get_trading_mode(get_setting) - ctp_st = ctp_status(mode) - capital = _capital(conn) - risk = get_risk_status(conn) - ctp_acc = _ctp_account(mode) if ctp_st.get("connected") else {} - recommend_rows = list_product_recommendations(capital, _main_price) - active_trend = conn.execute( - "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1" - ).fetchone() - monitor_count = conn.execute( - "SELECT COUNT(*) AS n FROM trade_order_monitors WHERE status='active'" - ).fetchone()["n"] - roll_count = conn.execute( - "SELECT COUNT(*) AS n FROM roll_groups WHERE status='active'" - ).fetchone()["n"] - conn.close() - sizing = get_sizing_mode(get_setting) - return render_template( - "trade.html", - trading_mode=mode, - trading_mode_label=trading_mode_label(get_setting), - capital=capital, - risk_status=risk, - ctp_status=ctp_st, - ctp_account=ctp_acc, - recommend_rows=recommend_rows, - active_trend=dict(active_trend) if active_trend else None, - monitor_count=monitor_count, - roll_count=roll_count, - sizing_mode=sizing, - sizing_mode_label="以损定仓" if sizing == MODE_RISK else "固定张数", - risk_percent=get_risk_percent(get_setting), - ) + try: + init_strategy_tables(conn) + mode = get_trading_mode(get_setting) + ctp_st = ctp_status(mode) + capital = _capital(conn) + risk = get_risk_status(conn) + ctp_acc = _ctp_account(mode) if ctp_st.get("connected") else {} + recommend_rows = list_product_recommendations(capital, _main_price) + active_trend = conn.execute( + "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1" + ).fetchone() + monitor_count = conn.execute( + "SELECT COUNT(*) AS n FROM trade_order_monitors WHERE status='active'" + ).fetchone()["n"] + roll_count = conn.execute( + "SELECT COUNT(*) AS n FROM roll_groups WHERE status='active'" + ).fetchone()["n"] + conn.commit() + sizing = get_sizing_mode(get_setting) + return render_template( + "trade.html", + trading_mode=mode, + trading_mode_label=trading_mode_label(get_setting), + capital=capital, + risk_status=risk, + ctp_status=ctp_st, + ctp_account=ctp_acc, + recommend_rows=recommend_rows, + active_trend=dict(active_trend) if active_trend else None, + monitor_count=monitor_count, + roll_count=roll_count, + sizing_mode=sizing, + sizing_mode_label="以损定仓" if sizing == MODE_RISK else "固定张数", + risk_percent=get_risk_percent(get_setting), + ) + finally: + conn.close() @app.route("/recommend") @login_required diff --git a/kline_chart.py b/kline_chart.py index 6d6f237..8c6ff1c 100644 --- a/kline_chart.py +++ b/kline_chart.py @@ -11,6 +11,7 @@ from zoneinfo import ZoneInfo import requests from symbols import ths_to_codes +from db_conn import connect_db from kline_store import ensure_kline_tables, get_cached_entry, save_bars logger = logging.getLogger(__name__) @@ -234,7 +235,7 @@ def fetch_market_klines( if db_path and chart_sym and not force_remote: try: - conn = sqlite3.connect(db_path) + conn = connect_db(db_path) cached = get_cached_entry(conn, chart_sym, p) conn.close() if cached and cached.get("fresh"): @@ -251,7 +252,7 @@ def fetch_market_klines( source = "remote" if db_path and chart_sym: try: - conn = sqlite3.connect(db_path) + conn = connect_db(db_path) ensure_kline_tables(conn) save_bars(conn, chart_sym, p, remote_bars) meta = conn.execute( @@ -264,7 +265,7 @@ def fetch_market_klines( logger.warning("kline cache write failed %s %s: %s", chart_sym, p, exc) elif not bars and db_path and chart_sym: try: - conn = sqlite3.connect(db_path) + conn = connect_db(db_path) cached = get_cached_entry(conn, chart_sym, p) conn.close() if cached and cached.get("bars"): diff --git a/kline_stream.py b/kline_stream.py index 28030d5..4ba4244 100644 --- a/kline_stream.py +++ b/kline_stream.py @@ -102,8 +102,8 @@ class KlineStreamHub: if is_trading_session() and sub.period in FAST_PERIODS: return True try: - import sqlite3 - conn = sqlite3.connect(db_path) + from db_conn import connect_db + conn = connect_db(db_path) ensure_kline_tables(conn) meta = load_meta(conn, chart_sym, sub.period) conn.close() diff --git a/risk/account_risk_lib.py b/risk/account_risk_lib.py index a9c1050..a3fc913 100644 --- a/risk/account_risk_lib.py +++ b/risk/account_risk_lib.py @@ -2,10 +2,14 @@ from __future__ import annotations import os +import sqlite3 +import time from datetime import datetime -from typing import Any, Optional +from typing import Any, Callable, Optional, TypeVar from zoneinfo import ZoneInfo +T = TypeVar("T") + STATUS_NORMAL = "normal" STATUS_FREEZE_1H = "freeze_1h" STATUS_FREEZE_4H = "freeze_4h" @@ -79,6 +83,21 @@ def trading_day_reset_hour() -> int: _SCHEMA_READY = False +def _db_retry(action: Callable[[], T], *, retries: int = 8, base_delay: float = 0.03) -> T: + last: sqlite3.OperationalError | None = None + for i in range(retries): + try: + return action() + except sqlite3.OperationalError as exc: + if "locked" not in str(exc).lower(): + raise + last = exc + time.sleep(base_delay * (2 ** i)) + if last is not None: + raise last + raise RuntimeError("db retry failed") + + def ensure_account_risk_schema(conn) -> None: global _SCHEMA_READY if _SCHEMA_READY: @@ -209,67 +228,71 @@ def reduce_cooloff_after_journal(conn, *, trading_day: str, now: Optional[dateti def get_risk_status(conn, *, now: Optional[datetime] = None) -> dict: - ensure_account_risk_schema(conn) - row = conn.execute("SELECT * FROM account_risk_state WHERE id=1").fetchone() - td = trading_day_label(now) - stored = str(_row_get(row, "trading_day") or "") - if stored != td: - conn.execute( - "UPDATE account_risk_state SET trading_day=?, manual_close_count=0, daily_frozen=0 WHERE id=1 AND trading_day<>?", - (td, td), - ) + def _load() -> dict: + ensure_account_risk_schema(conn) row = conn.execute("SELECT * FROM account_risk_state WHERE id=1").fetchone() + td = trading_day_label(now) + stored = str(_row_get(row, "trading_day") or "") + if stored != td: + conn.execute( + "UPDATE account_risk_state SET trading_day=?, manual_close_count=0, daily_frozen=0 WHERE id=1 AND trading_day<>?", + (td, td), + ) + conn.commit() + row = conn.execute("SELECT * FROM account_risk_state WHERE id=1").fetchone() - now_ms = _now_ms(now) - daily = int(_row_get(row, "daily_frozen") or 0) == 1 - until = _row_get(row, "cooloff_until_ms") - active = count_active_trade_monitors(conn) - mx = max_active_positions() - pos_limit = active >= mx + now_ms = _now_ms(now) + daily = int(_row_get(row, "daily_frozen") or 0) == 1 + until = _row_get(row, "cooloff_until_ms") + active = count_active_trade_monitors(conn) + mx = max_active_positions() + pos_limit = active >= mx - if daily: + if daily: + return { + "status": STATUS_DAILY, + "status_label": STATUS_LABELS[STATUS_DAILY], + "can_trade": False, + "can_roll": False, + "reason": "当日日冻结,禁止新开仓", + "active_count": active, + "max_active_positions": mx, + } + if until and int(until) > now_ms: + rem = int((int(until) - now_ms) / 1000) + hours = float(_row_get(row, "cooloff_hours") or cooling_hours_manual()) + st = STATUS_FREEZE_1H if hours <= cooling_hours_manual_journal() + 0.01 else STATUS_FREEZE_4H + return { + "status": st, + "status_label": STATUS_LABELS[st], + "can_trade": False, + "can_roll": pos_limit, + "reason": f"冷静期中,剩余约 {rem // 3600}h {(rem % 3600) // 60}m", + "freeze_remaining_sec": rem, + "active_count": active, + "max_active_positions": mx, + } + if pos_limit: + return { + "status": STATUS_FREEZE_POSITION, + "status_label": STATUS_LABELS[STATUS_FREEZE_POSITION], + "can_trade": False, + "can_roll": True, + "reason": f"已达仓位上限 {active}/{mx}", + "active_count": active, + "max_active_positions": mx, + } return { - "status": STATUS_DAILY, - "status_label": STATUS_LABELS[STATUS_DAILY], - "can_trade": False, - "can_roll": False, - "reason": "当日日冻结,禁止新开仓", - "active_count": active, - "max_active_positions": mx, - } - if until and int(until) > now_ms: - rem = int((int(until) - now_ms) / 1000) - hours = float(_row_get(row, "cooloff_hours") or cooling_hours_manual()) - st = STATUS_FREEZE_1H if hours <= cooling_hours_manual_journal() + 0.01 else STATUS_FREEZE_4H - return { - "status": st, - "status_label": STATUS_LABELS[st], - "can_trade": False, - "can_roll": pos_limit, - "reason": f"冷静期中,剩余约 {rem // 3600}h {(rem % 3600) // 60}m", - "freeze_remaining_sec": rem, - "active_count": active, - "max_active_positions": mx, - } - if pos_limit: - return { - "status": STATUS_FREEZE_POSITION, - "status_label": STATUS_LABELS[STATUS_FREEZE_POSITION], - "can_trade": False, + "status": STATUS_NORMAL, + "status_label": STATUS_LABELS[STATUS_NORMAL], + "can_trade": True, "can_roll": True, - "reason": f"已达仓位上限 {active}/{mx}", + "reason": "可新开仓", "active_count": active, "max_active_positions": mx, } - return { - "status": STATUS_NORMAL, - "status_label": STATUS_LABELS[STATUS_NORMAL], - "can_trade": True, - "can_roll": True, - "reason": "可新开仓", - "active_count": active, - "max_active_positions": mx, - } + + return _db_retry(_load) def assert_can_open(conn) -> Optional[str]: diff --git a/static/js/trade.js b/static/js/trade.js index 7c7d797..af6c584 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -165,6 +165,6 @@ document.addEventListener('DOMContentLoaded', function () { pollPositions(); - pollTimer = setInterval(pollPositions, 2000); + pollTimer = setInterval(pollPositions, 3000); }); })(); diff --git a/strategy/strategy_db.py b/strategy/strategy_db.py index 451e378..06cbd22 100644 --- a/strategy/strategy_db.py +++ b/strategy/strategy_db.py @@ -116,7 +116,13 @@ CREATE TABLE IF NOT EXISTS ctp_sim_positions ( """ +_TABLES_READY = False + + def init_strategy_tables(conn) -> None: + global _TABLES_READY + if _TABLES_READY: + return for sql in ( ROLL_GROUPS_SQL, ROLL_LEGS_SQL, @@ -129,3 +135,5 @@ def init_strategy_tables(conn) -> None: conn.execute(sql) if not conn.execute("SELECT id FROM ctp_sim_account WHERE id=1").fetchone(): conn.execute("INSERT INTO ctp_sim_account (id, balance, available) VALUES (1, 100000, 100000)") + conn.commit() + _TABLES_READY = True