From 52aca456e9d29d131a4fca3f3213bae0ab458587 Mon Sep 17 00:00:00 2001 From: dekun Date: Wed, 1 Jul 2026 08:11:42 +0800 Subject: [PATCH] Add PostgreSQL production backend to eliminate SQLite lock contention. Support DATABASE_URL with connection pooling, pg_dump backups, SQLite migration script, and deploy_postgres.sh with docs. Co-authored-by: Cursor --- .env.example | 6 + ai_messages.py | 16 +- app.py | 20 +- ctp_premarket_connect.py | 33 +-- ctp_reconnect.py | 19 +- ctp_settings.py | 2 +- db_backup.py | 115 ++++++++-- db_conn.py | 310 ++++++++++++++++++++++++-- docs/BACKUP.md | 62 ++++-- docs/DEPLOY.md | 19 +- docs/INDEX.md | 1 + docs/POSTGRES.md | 271 ++++++++++++++++++++++ fee_specs.py | 8 +- install_trading.py | 176 ++++++++++++--- requirements.txt | 4 + risk/account_risk_lib.py | 10 +- scripts/deploy_postgres.sh | 101 +++++++++ scripts/migrate_sqlite_to_postgres.py | 136 +++++++++++ sl_tp_guard.py | 27 ++- static/js/dashboard.js | 9 +- static/js/trade.js | 8 +- templates/settings.html | 3 +- templates/trade.html | 2 +- 23 files changed, 1208 insertions(+), 150 deletions(-) create mode 100644 docs/POSTGRES.md create mode 100644 scripts/deploy_postgres.sh create mode 100644 scripts/migrate_sqlite_to_postgres.py diff --git a/.env.example b/.env.example index d20c6a0..cc498b4 100644 --- a/.env.example +++ b/.env.example @@ -53,3 +53,9 @@ MAX_ACTIVE_POSITIONS=1 RISK_DAILY_POSITION_LIMIT=5 RISK_DAILY_TRADING_RISK_PCT=2 TRADING_DAY_RESET_HOUR=8 + +# —— 数据库(生产推荐 PostgreSQL,见 docs/POSTGRES.md)—— +# 未配置 DATABASE_URL 时使用本地 SQLite futures.db +# DATABASE_URL=postgresql://qihuo:your_password@127.0.0.1:5432/qihuo +# PG_POOL_MIN=2 +# PG_POOL_MAX=20 diff --git a/ai_messages.py b/ai_messages.py index 623f663..091f73a 100644 --- a/ai_messages.py +++ b/ai_messages.py @@ -7,9 +7,8 @@ from __future__ import annotations import json -import sqlite3 from datetime import datetime -from typing import Any, Callable, Optional +from typing import Any, Optional from zoneinfo import ZoneInfo TZ = ZoneInfo("Asia/Shanghai") @@ -26,7 +25,7 @@ CREATE TABLE IF NOT EXISTS ai_messages ( """ -def ensure_ai_messages_table(conn: sqlite3.Connection) -> None: +def ensure_ai_messages_table(conn) -> None: conn.execute(CREATE_SQL) conn.execute( "CREATE INDEX IF NOT EXISTS idx_ai_messages_created ON ai_messages(created_at DESC)" @@ -34,7 +33,7 @@ def ensure_ai_messages_table(conn: sqlite3.Connection) -> None: def insert_ai_message( - conn: sqlite3.Connection, + conn, *, kind: str, title: str, @@ -45,13 +44,16 @@ def insert_ai_message( now = datetime.now(TZ).strftime("%Y-%m-%d %H:%M:%S") cur = conn.execute( """INSERT INTO ai_messages (kind, title, content, meta_json, created_at) - VALUES (?,?,?,?,?)""", + VALUES (?,?,?,?,?) RETURNING id""", (kind, title, content, json.dumps(meta or {}, ensure_ascii=False), now), ) - return int(cur.lastrowid) + row = cur.fetchone() + if row is not None: + return int(row["id"] if isinstance(row, dict) else row[0]) + return int(cur.lastrowid or 0) -def list_ai_messages(conn: sqlite3.Connection, *, limit: int = 100) -> list[dict]: +def list_ai_messages(conn, *, limit: int = 100) -> list[dict]: ensure_ai_messages_table(conn) rows = conn.execute( "SELECT * FROM ai_messages ORDER BY id DESC LIMIT ?", diff --git a/app.py b/app.py index 899a9d3..b7b3278 100644 --- a/app.py +++ b/app.py @@ -9,7 +9,6 @@ from locale_fix import ensure_process_locale ensure_process_locale() -import sqlite3 import time import threading import requests @@ -57,7 +56,7 @@ from kline_store import ensure_kline_tables from kline_stream import kline_hub, sse_format from kline_chart import generate_review_kline_chart, fetch_market_klines, MARKET_PERIODS from market import get_price as market_get_price, set_ths_refresh_token, get_quote_source_label -from db_conn import connect_db +from db_conn import OperationalError, connect_db, database_label, is_benign_migration_error, is_db_contention_error from admin_settings import save_admin_credentials from db_backup import ( backup_dir, @@ -292,10 +291,10 @@ def get_stats_data() -> dict: return data try: return refresh_stats_cache(conn, capital) - except sqlite3.OperationalError as exc: - if "locked" not in str(exc).lower(): + except OperationalError as exc: + if not is_db_contention_error(exc): raise - app.logger.warning("stats cache refresh locked, compute without save: %s", exc) + app.logger.warning("stats cache refresh contention, compute without save: %s", exc) return build_all_stats(conn, capital) finally: conn.close() @@ -373,8 +372,9 @@ def init_db(): for sql in migrations: try: c.execute(sql) - except sqlite3.OperationalError: - pass + except Exception as exc: + if not is_benign_migration_error(exc): + raise c.execute('''CREATE TABLE IF NOT EXISTS review_records (id INTEGER PRIMARY KEY AUTOINCREMENT, open_time TEXT, close_time TEXT, @@ -426,8 +426,9 @@ def init_db(): ): try: c.execute(sql) - except sqlite3.OperationalError: - pass + except Exception as exc: + if not is_benign_migration_error(exc): + raise ensure_kline_tables(conn) init_strategy_tables(conn) from risk.account_risk_lib import ensure_account_risk_schema @@ -532,6 +533,7 @@ def sync_admin_from_env(): init_db() +app.logger.info("数据库: %s", database_label()) def sync_ths_token(): diff --git a/ctp_premarket_connect.py b/ctp_premarket_connect.py index 9f8ad94..10543e4 100644 --- a/ctp_premarket_connect.py +++ b/ctp_premarket_connect.py @@ -3,7 +3,7 @@ # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md -"""CTP 按计划自动连接/断开:盘前 30 分钟连,日盘/夜盘收盘后 30 分钟断。""" +"""CTP 按计划自动连接:盘前 30 分钟检查;交易时段断线后台重连;不自动强制断开。""" from __future__ import annotations import logging @@ -18,11 +18,13 @@ from market_sessions import ( is_trading_session, should_keep_ctp_connected, ) -from vnpy_bridge import ctp_disconnect, ctp_start_connect, ctp_status +from vnpy_bridge import ctp_start_connect, ctp_status logger = logging.getLogger(__name__) CHECK_INTERVAL_SEC = 60 +TRADING_CHECK_INTERVAL_SEC = 15 +PREMARKET_CHECK_INTERVAL_SEC = 30 DEFAULT_MINUTES_BEFORE = 30 DEFAULT_MINUTES_AFTER = 30 @@ -49,14 +51,6 @@ def _scheduled_connect_enabled() -> bool: ) -def _scheduled_disconnect_enabled() -> bool: - return (os.getenv("CTP_POSTMARKET_DISCONNECT", "true") or "true").strip().lower() in ( - "1", - "true", - "yes", - ) - - def should_auto_connect_now(*, minutes_before: int | None = None) -> bool: """是否应保持/发起 CTP 连接(供重连、权限判断复用)。""" mins_b = premarket_minutes_before() if minutes_before is None else minutes_before @@ -76,7 +70,7 @@ def start_ctp_premarket_connect_worker( get_setting_fn: Callable[[str, str], str] | None = None, interval: int = CHECK_INTERVAL_SEC, ) -> None: - """盘前自动连接;日盘/夜盘收盘宽限结束后自动断开。""" + """盘前 30 分钟:未连接则自动连;已连接则不重复发起。不自动强制断开。""" def _loop() -> None: time.sleep(10) @@ -101,7 +95,7 @@ def start_ctp_premarket_connect_worker( logger.info("交易时段内自动连接 CTP [%s]", mode) elif in_postmarket_grace_window(minutes_after=mins_a): logger.info( - "盘后宽限期内保持/恢复 CTP 连接 [%s](收盘后 %d 分钟内)", + "盘后宽限期内恢复 CTP 连接 [%s](收盘后 %d 分钟内)", mode, mins_a, ) @@ -111,17 +105,10 @@ def start_ctp_premarket_connect_worker( mode, mins_b, ) - if not is_trading_session() and in_premarket_connect_window( - minutes_before=mins_b, - ): - sleep_sec = 30 - elif _scheduled_disconnect_enabled() and st.get("connected"): - ctp_disconnect() - logger.info( - "盘后自动断开 CTP [%s](日盘/夜盘结束 %d 分钟后)", - mode, - mins_a, - ) + if is_trading_session(): + sleep_sec = TRADING_CHECK_INTERVAL_SEC + elif in_premarket_connect_window(minutes_before=mins_b): + sleep_sec = PREMARKET_CHECK_INTERVAL_SEC except Exception as exc: logger.warning("CTP scheduled connect worker: %s", exc) time.sleep(sleep_sec) diff --git a/ctp_reconnect.py b/ctp_reconnect.py index 47581f5..8a674ee 100644 --- a/ctp_reconnect.py +++ b/ctp_reconnect.py @@ -12,12 +12,15 @@ import threading import time from typing import Callable -from ctp_premarket_connect import should_auto_connect_now +from ctp_premarket_connect import premarket_minutes_before, should_auto_connect_now +from market_sessions import in_premarket_connect_window, is_trading_session from vnpy_bridge import ctp_try_auto_reconnect logger = logging.getLogger(__name__) RECONNECT_INTERVAL_SEC = 60 +TRADING_RECONNECT_INTERVAL_SEC = 15 +PREMARKET_RECONNECT_INTERVAL_SEC = 30 def _auto_reconnect_enabled() -> bool: @@ -34,17 +37,23 @@ def start_ctp_reconnect_worker( get_setting_fn: Callable[[str, str], str] | None = None, interval: int = RECONNECT_INTERVAL_SEC, ) -> None: - """定时检测 CTP 连接;仅在交易时段或盘前窗口内尝试重连,避免非交易时段反复登录。""" + """交易时段 / 盘前窗口内检测 CTP;断线则后台自动重连。""" def _loop() -> None: while True: + sleep_sec = max(5, interval) try: if _auto_reconnect_enabled() and should_auto_connect_now(): mode = get_mode_fn() - if ctp_try_auto_reconnect(mode): - logger.debug("CTP 连接正常 [%s]", mode) + ctp_try_auto_reconnect(mode) + if is_trading_session(): + sleep_sec = TRADING_RECONNECT_INTERVAL_SEC + elif in_premarket_connect_window( + minutes_before=premarket_minutes_before(), + ): + sleep_sec = PREMARKET_RECONNECT_INTERVAL_SEC except Exception as exc: logger.warning("CTP reconnect worker: %s", exc) - time.sleep(max(5, interval)) + time.sleep(sleep_sec) threading.Thread(target=_loop, daemon=True, name="ctp-reconnect-worker").start() diff --git a/ctp_settings.py b/ctp_settings.py index cdb236b..68094c9 100644 --- a/ctp_settings.py +++ b/ctp_settings.py @@ -35,7 +35,7 @@ LIVE_FIELDS: tuple[tuple[str, str, str, str], ...] = ( PASSWORD_DB_KEYS = frozenset({"simnow_password", "ctp_live_password"}) CTP_AUTO_CONNECT_KEY = "ctp_auto_connect" -CTP_DISABLED_HINT = "CTP 自动连接已关闭(非交易时段不重连;开盘前 30 分钟仍会按计划连接)" +CTP_DISABLED_HINT = "CTP 自动连接已关闭(非交易时段不重连;开盘前 30 分钟及交易时段仍会按计划连接;断开请手动操作)" def is_ctp_auto_connect_enabled(get_setting=None) -> bool: diff --git a/db_backup.py b/db_backup.py index 9afb9de..960995f 100644 --- a/db_backup.py +++ b/db_backup.py @@ -3,7 +3,7 @@ # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md -"""SQLite 数据库自动备份:打包 futures.db 与 uploads,可在其他服务器恢复。""" +"""数据库备份:SQLite futures.db 或 PostgreSQL pg_dump,含 uploads 与一键恢复脚本。""" from __future__ import annotations import json @@ -12,6 +12,7 @@ import os import re import shutil import sqlite3 +import subprocess import tarfile import tempfile import threading @@ -21,7 +22,7 @@ from pathlib import Path from typing import Callable, Optional from zoneinfo import ZoneInfo -from db_conn import DB_PATH +from db_conn import DB_PATH, db_backend logger = logging.getLogger(__name__) @@ -44,9 +45,10 @@ RESTORE_MD = """# qihuo 备份恢复说明 | 文件/目录 | 说明 | |-----------|------| -| `futures.db` | SQLite 主库(账号、交易记录、设置等) | +| `futures.db` | SQLite 主库(仅 SQLite 模式备份) | +| `postgres_dump.sql` | PostgreSQL 逻辑备份(仅 PostgreSQL 模式) | | `uploads/` | 复盘截图与 K 线图(若备份时存在) | -| `manifest.json` | 备份元数据 | +| `manifest.json` | 备份元数据(含 `backend` 字段) | | `restore.sh` | 一键恢复脚本 | ## 快速恢复(推荐) @@ -62,30 +64,45 @@ chmod +x restore.sh ./restore.sh ``` -默认恢复到 **`/root/qihuo`**。指定目录: +默认恢复到 **`/root/qihuo`**(SQLite)或导入到 `.env` 中的 PostgreSQL(见 manifest)。 + +指定应用目录: ```bash RESTORE_DIR=/opt/qihuo ./restore.sh ``` -3. 在新服务器部署 qihuo 代码与 Python 环境(见 `docs/DEPLOY.md`) -4. 若恢复到 `/opt/qihuo`,将生成的 `futures.db`、`uploads/` 放入该目录 -5. 配置 `.env`(CTP 账号、SECRET_KEY 等),**不要**直接复制旧 `.env` 到公网环境 -6. 重启服务:`pm2 restart qihuo` +3. 在新服务器部署 qihuo 代码与 Python 环境(见 `docs/POSTGRES.md` / `docs/DEPLOY.md`) +4. 配置 `.env`(`DATABASE_URL` 或 SQLite、`SECRET_KEY`、CTP 账号等) +5. 重启服务:`pm2 restart qihuo` -## 手工恢复 +## PostgreSQL 恢复 + +若 `manifest.json` 中 `"backend": "postgres"`: + +1. 确保目标机已安装 PostgreSQL,且 `.env` 中 `DATABASE_URL` 指向空库或待覆盖库 +2. 执行 `./restore.sh`(会调用 `psql` 导入 `postgres_dump.sql`) + +手工导入: ```bash -mkdir -p /root/qihuo/uploads -cp futures.db /root/qihuo/futures.db -cp -a uploads/. /root/qihuo/uploads/ # 若有 uploads 目录 +export DATABASE_URL=postgresql://qihuo:密码@127.0.0.1:5432/qihuo +psql "$DATABASE_URL" -f postgres_dump.sql +``` + +## SQLite 手工恢复 + +```bash +mkdir -p /opt/qihuo/uploads +cp futures.db /opt/qihuo/futures.db +cp -a uploads/. /opt/qihuo/uploads/ ``` ## 注意 -- 恢复前请停止 qihuo 进程,避免覆盖正在使用的数据库 -- 恢复后首次启动会自动执行数据库迁移,一般无需手工改表 -- `.env` 含敏感信息,请单独安全传输,不要放入公开网盘 +- 恢复前请停止 qihuo 进程 +- `.env` 含敏感信息,请单独安全传输 +- 详见 `docs/POSTGRES.md` 与 `docs/BACKUP.md` """ @@ -142,12 +159,54 @@ def _backup_sqlite(src_path: str, dst_path: str) -> None: src.close() -def _write_restore_script(dest: Path, folder_name: str) -> None: +def _backup_postgres(dst_path: str) -> None: + url = (os.getenv("DATABASE_URL") or "").strip() + if not url: + raise RuntimeError("PostgreSQL 备份需要 DATABASE_URL") + env = os.environ.copy() + proc = subprocess.run( + ["pg_dump", "--no-owner", "--no-acl", "-f", dst_path, url], + capture_output=True, + text=True, + env=env, + check=False, + ) + if proc.returncode != 0: + raise RuntimeError(f"pg_dump 失败: {proc.stderr.strip() or proc.stdout.strip()}") + + +def _write_restore_script(dest: Path, *, backend: str) -> None: + pg_block = "" + if backend == "postgres": + pg_block = """ +if [ -f "$SCRIPT_DIR/postgres_dump.sql" ]; then + if [ -z "${DATABASE_URL:-}" ]; then + if [ -f "$RESTORE_DIR/.env" ]; then + set -a + # shellcheck disable=SC1090 + source "$RESTORE_DIR/.env" + set +a + fi + fi + if [ -z "${DATABASE_URL:-}" ]; then + echo "错误: PostgreSQL 恢复需要 DATABASE_URL(环境变量或 $RESTORE_DIR/.env)" + exit 1 + fi + if ! command -v psql >/dev/null; then + echo "错误: 未找到 psql,请先安装 PostgreSQL 客户端" + exit 1 + fi + echo "导入 PostgreSQL: postgres_dump.sql" + psql "$DATABASE_URL" -f "$SCRIPT_DIR/postgres_dump.sql" + echo "PostgreSQL 导入完成" +fi +""" script = f"""#!/bin/bash set -euo pipefail RESTORE_DIR="${{RESTORE_DIR:-{default_restore_dir()}}}" SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)" mkdir -p "$RESTORE_DIR/uploads" +{pg_block} if [ -f "$SCRIPT_DIR/futures.db" ]; then cp -f "$SCRIPT_DIR/futures.db" "$RESTORE_DIR/futures.db" echo "已复制 futures.db -> $RESTORE_DIR/futures.db" @@ -158,16 +217,19 @@ if [ -d "$SCRIPT_DIR/uploads" ]; then fi echo "" echo "恢复完成。目标目录: $RESTORE_DIR" -echo "下一步: 部署 qihuo 代码、配置 .env、pm2 restart qihuo" -echo "详见 RESTORE.md 与 docs/BACKUP.md" +echo "下一步: 确认 .env、pm2 restart qihuo" +echo "详见 RESTORE.md 与 docs/POSTGRES.md" """ dest.write_text(script, encoding="utf-8") def create_backup(*, include_uploads: bool = True) -> tuple[str, str]: """创建 tar.gz 备份,返回 (文件名, 说明)。""" - if not os.path.isfile(DB_PATH): + backend = db_backend() + if backend == "sqlite" and not os.path.isfile(DB_PATH): raise FileNotFoundError(f"数据库不存在: {DB_PATH}") + if backend == "postgres" and not (os.getenv("DATABASE_URL") or "").strip(): + raise RuntimeError("PostgreSQL 模式需要 DATABASE_URL") with _backup_lock: stamp = datetime.now(TZ).strftime("%Y%m%d_%H%M%S") @@ -180,15 +242,19 @@ def create_backup(*, include_uploads: bool = True) -> tuple[str, str]: with tempfile.TemporaryDirectory(prefix="qihuo_bak_") as tmp: work = Path(tmp) / folder_name work.mkdir() - _backup_sqlite(DB_PATH, str(work / "futures.db")) + if backend == "postgres": + _backup_postgres(str(work / "postgres_dump.sql")) + else: + _backup_sqlite(DB_PATH, str(work / "futures.db")) if include_uploads and upload_src.is_dir(): shutil.copytree(upload_src, work / "uploads", dirs_exist_ok=True) manifest = { "app": "qihuo", + "backend": backend, "created_at": datetime.now(TZ).isoformat(timespec="seconds"), - "db_path": DB_PATH, + "db_path": DB_PATH if backend == "sqlite" else (os.getenv("DATABASE_URL") or ""), "includes_uploads": include_uploads and upload_src.is_dir(), "default_restore_dir": default_restore_dir(), "files": sorted(p.name for p in work.iterdir()), @@ -198,13 +264,14 @@ def create_backup(*, include_uploads: bool = True) -> tuple[str, str]: encoding="utf-8", ) (work / "RESTORE.md").write_text(RESTORE_MD, encoding="utf-8") - _write_restore_script(work / "restore.sh", folder_name) + _write_restore_script(work / "restore.sh", backend=backend) with tarfile.open(out_path, "w:gz") as tar: tar.add(work, arcname=folder_name) size_mb = out_path.stat().st_size / (1024 * 1024) - return filename, f"备份已生成 {filename}({size_mb:.2f} MB)" + label = "PostgreSQL" if backend == "postgres" else "SQLite" + return filename, f"备份已生成 {filename}({label},{size_mb:.2f} MB)" def list_backups() -> list[dict]: diff --git a/db_conn.py b/db_conn.py index b6f607c..f438803 100644 --- a/db_conn.py +++ b/db_conn.py @@ -3,70 +3,344 @@ # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md -"""SQLite 连接统一配置(WAL + busy_timeout,降低并发锁冲突)。""" +"""数据库连接:开发默认 SQLite,生产推荐 PostgreSQL(DATABASE_URL)。""" from __future__ import annotations import os +import re import sqlite3 +import threading import time +from typing import Any, Iterable, Optional, Sequence DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db") +_backend_lock = threading.Lock() +_backend: Optional[str] = None +_pg_pool = None +_pg_pool_lock = threading.Lock() + +try: + import psycopg + from psycopg import OperationalError as PgOperationalError + from psycopg import IntegrityError as PgIntegrityError + from psycopg.rows import dict_row + from psycopg_pool import ConnectionPool + + _PSYCOPG_OK = True +except ImportError: + psycopg = None # type: ignore[assignment] + PgOperationalError = Exception # type: ignore[misc,assignment] + PgIntegrityError = Exception # type: ignore[misc,assignment] + dict_row = None # type: ignore[assignment] + ConnectionPool = None # type: ignore[misc,assignment] + _PSYCOPG_OK = False + +OperationalError = sqlite3.OperationalError +IntegrityError = sqlite3.IntegrityError + + +def db_backend() -> str: + """``sqlite`` 或 ``postgres``。""" + global _backend + if _backend is not None: + return _backend + with _backend_lock: + if _backend is not None: + return _backend + url = (os.getenv("DATABASE_URL") or "").strip() + if url.startswith(("postgresql://", "postgres://")): + if not _PSYCOPG_OK: + raise RuntimeError( + "已配置 DATABASE_URL 但未安装 psycopg,请执行: pip install 'psycopg[binary]' psycopg-pool" + ) + _backend = "postgres" + else: + _backend = "sqlite" + return _backend + + +def is_postgres() -> bool: + return db_backend() == "postgres" + + +def database_label() -> str: + if is_postgres(): + url = (os.getenv("DATABASE_URL") or "").strip() + host = url.split("@")[-1].split("/")[0] if "@" in url else "postgresql" + return f"PostgreSQL ({host})" + return f"SQLite ({DB_PATH})" + + +def adapt_sql(sql: str) -> str: + """将 SQLite 风格 SQL 适配为当前后端。""" + if not is_postgres(): + return sql + out = sql + out = re.sub( + r"\bINTEGER PRIMARY KEY AUTOINCREMENT\b", + "SERIAL PRIMARY KEY", + out, + flags=re.IGNORECASE, + ) + out = re.sub(r"\bAUTOINCREMENT\b", "", out, flags=re.IGNORECASE) + if "?" in out: + out = out.replace("?", "%s") + return out + + +def is_benign_migration_error(exc: BaseException) -> bool: + """ALTER TABLE 重复列等初始化迁移可忽略的错误。""" + msg = str(exc).lower() + if any( + x in msg + for x in ( + "duplicate column", + "already exists", + "duplicate key", + ) + ): + return True + if isinstance(exc, sqlite3.OperationalError) and "duplicate column" in msg: + return True + if _PSYCOPG_OK and isinstance(exc, PgOperationalError): + code = getattr(exc, "sqlstate", "") or "" + if code in ("42701", "42P07"): # duplicate_column, duplicate_table + return True + return False + + +class DbCursor: + """统一 cursor:兼容 sqlite3 的 execute / fetchone / lastrowid。""" + + def __init__(self, backend: str, raw_cursor: Any, raw_conn: Any) -> None: + self._backend = backend + self._cur = raw_cursor + self._conn = raw_conn + self.lastrowid: Optional[int] = None + self.rowcount: int = 0 + + def execute(self, sql: str, params: Sequence[Any] | None = None) -> "DbCursor": + sql = adapt_sql(sql) + params = params or () + self._cur.execute(sql, params) + self.rowcount = int(getattr(self._cur, "rowcount", 0) or 0) + self.lastrowid = getattr(self._cur, "lastrowid", None) + if self.lastrowid is None and is_postgres(): + if re.match(r"^\s*INSERT\b", sql, re.IGNORECASE): + try: + row = self._cur.fetchone() + if row is not None: + if isinstance(row, dict): + self.lastrowid = int(row.get("id") or row.get("Id") or 0) or None + else: + self.lastrowid = int(row[0]) + except Exception: + try: + self._cur.execute("SELECT lastval()") + lv = self._cur.fetchone() + if lv: + self.lastrowid = int(lv[0] if not isinstance(lv, dict) else lv["lastval"]) + except Exception: + pass + return self + + def fetchone(self) -> Any: + return self._cur.fetchone() + + def fetchall(self) -> list[Any]: + return self._cur.fetchall() + + def close(self) -> None: + try: + self._cur.close() + except Exception: + pass + + +class DbConnection: + """统一连接:execute / commit / close,接口对齐 sqlite3.Connection。""" + + def __init__( + self, + backend: str, + raw_conn: Any, + *, + from_pool: bool = False, + ) -> None: + self._backend = backend + self._conn = raw_conn + self._from_pool = from_pool + self.row_factory = None + + def execute(self, sql: str, params: Sequence[Any] | None = None) -> DbCursor: + cur = self.cursor() + return cur.execute(sql, params) + + def cursor(self) -> DbCursor: + if self._backend == "sqlite": + return DbCursor(self._backend, self._conn.cursor(), self._conn) + raw = self._conn.cursor(row_factory=dict_row) + return DbCursor(self._backend, raw, self._conn) + + def commit(self) -> None: + self._conn.commit() + + def rollback(self) -> None: + self._conn.rollback() + + def close(self) -> None: + if self._backend == "postgres" and self._from_pool: + try: + self._conn.rollback() + except Exception: + pass + try: + self._conn.close() + except Exception: + pass + return + try: + self._conn.close() + except Exception: + pass + + def __enter__(self) -> "DbConnection": + return self + + def __exit__(self, exc_type, exc, tb) -> None: + if exc: + try: + self.rollback() + except Exception: + pass + else: + try: + self.commit() + except Exception: + pass + self.close() + + +def _pg_pool_instance() -> ConnectionPool: + global _pg_pool + if _pg_pool is not None: + return _pg_pool + with _pg_pool_lock: + if _pg_pool is not None: + return _pg_pool + url = (os.getenv("DATABASE_URL") or "").strip() + min_size = max(1, int(os.getenv("PG_POOL_MIN", "2") or 2)) + max_size = max(min_size, int(os.getenv("PG_POOL_MAX", "20") or 20)) + _pg_pool = ConnectionPool( + conninfo=url, + min_size=min_size, + max_size=max_size, + kwargs={"row_factory": dict_row}, + open=True, + ) + return _pg_pool + + +def connect_db(path: str | None = None) -> DbConnection: + """获取数据库连接。PostgreSQL 使用连接池;SQLite 每次新建连接(WAL)。""" + if is_postgres(): + pool = _pg_pool_instance() + raw = pool.getconn() + try: + with raw.cursor() as cur: + cur.execute("SET TIME ZONE 'Asia/Shanghai'") + raw.commit() + except Exception: + pass + return DbConnection("postgres", raw, from_pool=True) -def connect_db(path: str | None = None) -> sqlite3.Connection: db_path = path or DB_PATH - conn = sqlite3.connect(db_path, timeout=30, check_same_thread=False) - conn.row_factory = sqlite3.Row - conn.execute("PRAGMA busy_timeout=30000") + raw = sqlite3.connect(db_path, timeout=30, check_same_thread=False) + raw.row_factory = sqlite3.Row + raw.execute("PRAGMA busy_timeout=30000") try: - conn.execute("PRAGMA journal_mode=WAL") + raw.execute("PRAGMA journal_mode=WAL") except sqlite3.OperationalError: pass - return conn + return DbConnection("sqlite", raw) + + +def close_pg_pool() -> None: + global _pg_pool + with _pg_pool_lock: + if _pg_pool is not None: + _pg_pool.close() + _pg_pool = None def execute_retry( - conn: sqlite3.Connection, + conn: DbConnection, sql: str, params: tuple = (), *, retries: int = 6, base_delay: float = 0.05, -) -> sqlite3.Cursor: - """遇 database is locked 时短暂退避重试。""" +) -> DbCursor: + """遇锁冲突时短暂退避重试(SQLite locked / PG serialization)。""" last_exc: Exception | None = None for attempt in range(retries): try: return conn.execute(sql, params) - except sqlite3.OperationalError as exc: - if "locked" not in str(exc).lower(): + except (OperationalError, PgOperationalError) as exc: + msg = str(exc).lower() + retryable = "locked" in msg or "serialize" in msg or "deadlock" in msg + if not retryable: raise last_exc = exc if attempt < retries - 1: time.sleep(base_delay * (attempt + 1)) if last_exc: raise last_exc - raise sqlite3.OperationalError("database is locked") + raise OperationalError("database is locked") def commit_retry( - conn: sqlite3.Connection, + conn: DbConnection, *, retries: int = 6, base_delay: float = 0.05, ) -> None: - """遇 database is locked 时短暂退避重试 commit。""" + """遇锁冲突时短暂退避重试 commit。""" last_exc: Exception | None = None for attempt in range(retries): try: conn.commit() return - except sqlite3.OperationalError as exc: - if "locked" not in str(exc).lower(): + except (OperationalError, PgOperationalError) as exc: + msg = str(exc).lower() + retryable = "locked" in msg or "serialize" in msg or "deadlock" in msg + if not retryable: raise last_exc = exc if attempt < retries - 1: time.sleep(base_delay * (attempt + 1)) if last_exc: raise last_exc - raise sqlite3.OperationalError("database is locked") + raise OperationalError("database is locked") + + +def is_db_contention_error(exc: BaseException) -> bool: + """SQLite locked / PostgreSQL serialization / deadlock。""" + msg = str(exc).lower() + if isinstance(exc, sqlite3.OperationalError): + return "locked" in msg + if _PSYCOPG_OK and isinstance(exc, PgOperationalError): + code = getattr(exc, "sqlstate", "") or "" + if code in ("40001", "40P01", "55P03"): + return True + return any(x in msg for x in ("deadlock", "serialize", "lock")) + return False + + +def reset_backend_for_tests(backend: str | None = None) -> None: + """测试用:重置后端检测与连接池。""" + global _backend, _pg_pool + close_pg_pool() + with _backend_lock: + _backend = backend diff --git a/docs/BACKUP.md b/docs/BACKUP.md index fa477a3..ebdb8ca 100644 --- a/docs/BACKUP.md +++ b/docs/BACKUP.md @@ -1,6 +1,15 @@ # 数据备份与恢复 -qihuo 支持自动备份 SQLite 数据库与复盘附件,生成可在其他 Linux 服务器恢复的压缩包。 +qihuo 支持自动备份数据库与复盘附件,生成可在其他 Linux 服务器恢复的压缩包。 + +存储后端由 `.env` 决定: + +| 后端 | 备份包内主文件 | 说明 | +|------|----------------|------| +| SQLite(默认) | `futures.db` | 本地单文件库 | +| PostgreSQL | `postgres_dump.sql` | `pg_dump` 逻辑备份 | + +PostgreSQL 部署与迁移见 **[POSTGRES.md](./POSTGRES.md)**。 --- @@ -8,13 +17,14 @@ qihuo 支持自动备份 SQLite 数据库与复盘附件,生成可在其他 Li | 内容 | 说明 | |------|------| -| `futures.db` | 主库:账号、交易记录、设置、统计缓存等 | +| `futures.db` | SQLite 主库(仅 SQLite 模式) | +| `postgres_dump.sql` | PostgreSQL 逻辑备份(仅 PostgreSQL 模式) | | `uploads/` | 复盘截图、自动 K 线图(若存在) | -| `manifest.json` | 备份时间与文件清单 | +| `manifest.json` | 备份时间、**backend** 字段、文件清单 | | `RESTORE.md` | 包内恢复说明 | | `restore.sh` | 一键恢复脚本 | -**不包含** `.env`(含 CTP 密码等敏感信息),请单独安全保管或在新服务器重新配置。 +**不包含** `.env`(含 CTP 密码、`DATABASE_URL` 等),请单独安全保管或在新服务器重新配置。 --- @@ -40,6 +50,8 @@ QIHUO_BACKUP_DIR=/data/qihuo_backup - **保留份数**:默认保留最近 **30** 份,超出自动删除最旧文件 - **下载**:列表中点击「下载」获取压缩包 +PostgreSQL 模式下需服务器已安装 `pg_dump`(`apt install postgresql-client` 或完整 `postgresql` 包)。 + --- ## 在新服务器恢复 @@ -55,6 +67,13 @@ cd /root tar -xzf qihuo_backup_20260626_030015.tar.gz cd qihuo_backup_20260626_030015 chmod +x restore.sh + +# SQLite:直接恢复 futures.db +RESTORE_DIR=/opt/qihuo ./restore.sh + +# PostgreSQL:先配置 /opt/qihuo/.env 的 DATABASE_URL,再执行 +export RESTORE_DIR=/opt/qihuo +# 若 .env 在 RESTORE_DIR 下且含 DATABASE_URL,restore.sh 会自动 source ./restore.sh ``` @@ -64,28 +83,32 @@ chmod +x restore.sh RESTORE_DIR=/opt/qihuo ./restore.sh ``` -也可通过环境变量固定默认恢复目录: - -```bash -QIHUO_RESTORE_DIR=/opt/qihuo -``` - -### 方式二:手工复制 +### 方式二:手工复制(SQLite) ```bash tar -xzf qihuo_backup_20260626_030015.tar.gz cd qihuo_backup_20260626_030015 -pm2 stop qihuo # 或停止当前进程 +pm2 stop qihuo cp futures.db /opt/qihuo/futures.db -cp -a uploads/. /opt/qihuo/uploads/ # 若有 uploads +cp -a uploads/. /opt/qihuo/uploads/ +pm2 restart qihuo +``` + +### 方式三:手工导入(PostgreSQL) + +```bash +pm2 stop qihuo +export DATABASE_URL=postgresql://qihuo:密码@127.0.0.1:5432/qihuo +psql "$DATABASE_URL" -f postgres_dump.sql +cp -a uploads/. /opt/qihuo/uploads/ pm2 restart qihuo ``` ### 恢复后检查清单 1. 已部署 qihuo 代码与 Python 虚拟环境(见 [DEPLOY.md](./DEPLOY.md)) -2. 已配置 `.env`(`SECRET_KEY`、CTP 账号等) -3. 数据库文件权限正确(运行用户可读写的 `futures.db`) +2. 已配置 `.env`(`DATABASE_URL` 或 SQLite、`SECRET_KEY`、CTP 账号等) +3. PostgreSQL:库已创建且 `DATABASE_URL` 可连接 4. 访问 Web 登录,检查交易记录、统计页是否正常 5. CTP 模式需在新环境重新连接柜台 @@ -94,8 +117,9 @@ pm2 restart qihuo ## 注意事项 - **恢复前务必停止 qihuo**,避免进程占用数据库导致覆盖不完整 -- 备份使用 SQLite `backup` API,并在 WAL 模式下尝试 checkpoint,降低锁冲突风险 -- 自动备份在应用后台线程执行,与 Web 服务同进程;PM2 重启不影响已生成的历史压缩包 +- SQLite 备份使用 SQLite `backup` API,并在 WAL 模式下尝试 checkpoint +- PostgreSQL 备份使用 `pg_dump`,恢复使用 `psql -f` +- 自动备份在应用后台线程执行,与 Web 服务同进程 - 大体积 `uploads/` 会使压缩包变大,可按需定期清理无用截图 - 不要将含 `.env`、数据库的压缩包上传到公开网盘 @@ -107,13 +131,15 @@ pm2 restart qihuo |------|------| | 设置页无备份列表 | 检查 `/root/qihuo_backup` 目录权限,进程需可写 | | 立即备份无反应 | 查看 PM2 日志;可能上一任务仍在进行 | +| PostgreSQL 备份失败 | 安装 `postgresql-client`;检查 `DATABASE_URL` | | 下载 404 | 文件名须为系统生成的 `qihuo_backup_*.tar.gz` | -| 恢复后无法登录 | 确认 `futures.db` 已覆盖到实际运行目录 | +| 恢复后无法登录 | 确认数据已导入实际使用的库(SQLite 文件或 PG) | | 恢复后 CTP 连不上 | 在新服务器配置正确的 `.env` CTP 参数 | --- ## 相关文档 +- [POSTGRES.md](./POSTGRES.md) — PostgreSQL 一键部署、迁移、备份恢复 - [DEPLOY.md](./DEPLOY.md) — 部署与目录结构 - [FEATURES.md](./FEATURES.md) — 功能与路由一览 diff --git a/docs/DEPLOY.md b/docs/DEPLOY.md index a527551..fa646e9 100644 --- a/docs/DEPLOY.md +++ b/docs/DEPLOY.md @@ -12,7 +12,7 @@ | 运行用户 | `root`(与 `deploy.sh` / PM2 配置一致) | | 服务端口 | `6600` | | 进程管理 | PM2,应用名 `qihuo` | -| 数据库 | SQLite `futures.db` | +| 数据库 | **生产推荐 PostgreSQL**(见 [POSTGRES.md](./POSTGRES.md));未配置 `DATABASE_URL` 时使用 SQLite `futures.db` | | 仓库 | https://git.bz121.com/dekun/qihuo.git | --- @@ -58,6 +58,21 @@ bash deploy.sh 部署完成后访问:`http://<服务器IP>:6600` +### PostgreSQL 生产库(推荐) + +消除 SQLite 并发 `database is locked`,一键安装 PostgreSQL 并迁移: + +```bash +cd /opt/qihuo +git pull +# 新装 PostgreSQL + 空库 +sudo bash scripts/deploy_postgres.sh +# 从现有 futures.db 迁移 +MIGRATE_SQLITE=1 sudo bash scripts/deploy_postgres.sh +``` + +完整说明、手动步骤、备份恢复见 **[POSTGRES.md](./POSTGRES.md)**。 + > 再次部署只需 `cd /opt/qihuo && bash deploy.sh`,无需手工装 locale 或改前置地址。 --- @@ -334,7 +349,7 @@ ufw allow 6600/tcp | **CTP 连接超时** | SimNow 地址/账号/非交易时段 | 核对 `.env` 与 SimNow 官网前置 | | **下单监控无持仓** | 未连接 CTP 或确实无仓 | 先点「连接 CTP」 | | **`Could not resolve host`** | 服务器 DNS 故障 | 配置 systemd-resolved 公共 DNS,见下方 | -| `database is locked` | SQLite 并发 | 更新代码后重启 | +| `database is locked` | SQLite 并发 | **推荐改 PostgreSQL**:`MIGRATE_SQLITE=1 bash scripts/deploy_postgres.sh`,见 [POSTGRES.md](./POSTGRES.md) | | `git pull` 冲突 | 本地有修改 / SCP 部署 | `git fetch && git reset --hard origin/main` | 查看应用是否在监听: diff --git a/docs/INDEX.md b/docs/INDEX.md index c06fadf..9bd5393 100644 --- a/docs/INDEX.md +++ b/docs/INDEX.md @@ -42,6 +42,7 @@ | [SIMNOW.md](./SIMNOW.md) | SimNow 仿真注册与接入 | | [CTP_LIVE.md](./CTP_LIVE.md) | **期货公司实盘 CTP** 与开平仓对比 | | [DEPLOY.md](./DEPLOY.md) | 部署说明 | +| [POSTGRES.md](./POSTGRES.md) | **PostgreSQL 生产库**(一键部署、迁移、备份恢复) | | [BACKUP.md](./BACKUP.md) | 数据备份与恢复 | --- diff --git a/docs/POSTGRES.md b/docs/POSTGRES.md new file mode 100644 index 0000000..d3624f3 --- /dev/null +++ b/docs/POSTGRES.md @@ -0,0 +1,271 @@ +# PostgreSQL 生产数据库 + +qihuo 支持两种存储后端: + +| 模式 | 配置 | 适用场景 | +|------|------|----------| +| **SQLite**(默认) | 不设置 `DATABASE_URL` | 本地开发、单机轻量试用 | +| **PostgreSQL**(推荐生产) | `.env` 中 `DATABASE_URL=postgresql://...` | 7×24 运行、多线程并发、消除 `database is locked` | + +配置 `DATABASE_URL` 后,应用自动使用 **连接池**(默认 2–20 连接),无需改业务代码。 + +--- + +## 为什么用 PostgreSQL + +SQLite 在同一文件上同一时刻只允许一个写者。qihuo 单进程内有多路后台线程(持仓刷新、止盈守护、挂单同步、统计缓存等)和 HTTP 请求同时写库,容易出现: + +``` +position worker failed: database is locked +bootstrap position snapshot: database is locked +``` + +PostgreSQL 面向并发读写设计,多连接、行级锁、连接池,与专业交易软件「服务端数据库 + 内存快照」的思路一致。 + +--- + +## 一键部署(新服务器 / 已有 qihuo) + +在已执行过 `deploy.sh` 的服务器上,以 **root** 运行: + +```bash +cd /opt/qihuo +git pull # 获取最新代码 +sudo bash scripts/deploy_postgres.sh +``` + +脚本会自动: + +1. 安装 `postgresql` / `postgresql-contrib` +2. 创建数据库 `qihuo`、用户 `qihuo`(随机密码,终端会打印) +3. 写入 `/opt/qihuo/.env` 的 `DATABASE_URL`、`PG_POOL_MIN`、`PG_POOL_MAX` +4. `pip install psycopg psycopg-pool` +5. 执行 `init_db()` 建表 +6. `pm2 restart qihuo --update-env` + +### 从现有 SQLite 迁移 + +若 `/opt/qihuo/futures.db` 已有数据: + +```bash +cd /opt/qihuo +MIGRATE_SQLITE=1 sudo bash scripts/deploy_postgres.sh +``` + +会: + +- 初始化 PostgreSQL 表结构 +- 运行 `scripts/migrate_sqlite_to_postgres.py` 导入全部表 +- 将旧库备份为 `futures.db.pre_pg.YYYYMMDD_HHMMSS`(可用 `BACKUP_SQLITE=0` 跳过) + +迁移前建议先做一次 Web 设置页 **立即备份** 或: + +```bash +cp /opt/qihuo/futures.db /root/futures.db.bak.$(date +%Y%m%d) +pm2 stop qihuo +MIGRATE_SQLITE=1 sudo bash scripts/deploy_postgres.sh +``` + +### 环境变量(可选) + +| 变量 | 默认 | 说明 | +|------|------|------| +| `APP_DIR` | `/opt/qihuo` | 应用目录 | +| `PG_DB` | `qihuo` | 数据库名 | +| `PG_USER` | `qihuo` | 数据库用户 | +| `PG_PASSWORD` | 随机 | 不设则脚本生成 | +| `PG_HOST` | `127.0.0.1` | 主机 | +| `PG_PORT` | `5432` | 端口 | +| `MIGRATE_SQLITE` | `0` | `1` 时从 `futures.db` 迁移 | +| `BACKUP_SQLITE` | `1` | 迁移后是否备份旧 SQLite 文件 | + +--- + +## 手动部署 + +### 1. 安装 PostgreSQL(Ubuntu) + +```bash +apt update +apt install -y postgresql postgresql-contrib +systemctl enable postgresql +systemctl start postgresql +``` + +### 2. 创建库与用户 + +```bash +sudo -u postgres psql <<'SQL' +CREATE USER qihuo WITH PASSWORD '请改为强密码'; +CREATE DATABASE qihuo OWNER qihuo; +GRANT ALL PRIVILEGES ON DATABASE qihuo TO qihuo; +SQL +``` + +### 3. 配置 `.env` + +```bash +cd /opt/qihuo +cat >> .env <<'EOF' + +DATABASE_URL=postgresql://qihuo:请改为强密码@127.0.0.1:5432/qihuo +PG_POOL_MIN=2 +PG_POOL_MAX=20 +EOF +``` + +### 4. 安装 Python 驱动并初始化 + +```bash +source venv/bin/activate +pip install -r requirements.txt +export $(grep -v '^#' .env | xargs) # 或手动 export DATABASE_URL +python3 -c "from app import init_db; init_db()" +``` + +### 5. 迁移 SQLite(可选) + +```bash +python3 scripts/migrate_sqlite_to_postgres.py --sqlite /opt/qihuo/futures.db +# 仅预览行数: +python3 scripts/migrate_sqlite_to_postgres.py --dry-run +``` + +### 6. 重启应用 + +```bash +pm2 restart qihuo --update-env +pm2 logs qihuo --lines 30 +``` + +启动后日志中不应再频繁出现 `database is locked`(SQLite 特有)。 + +--- + +## 连接池 + +| 变量 | 默认 | 说明 | +|------|------|------| +| `PG_POOL_MIN` | `2` | 池内最少连接 | +| `PG_POOL_MAX` | `20` | 池内最多连接 | + +每个 HTTP 请求 / 后台 worker 从池中借连接,用毕归还。PM2 请保持 **`instances: 1`**(见 `ecosystem.config.cjs`);若要多实例,共用同一 `DATABASE_URL` 即可,PostgreSQL 可承受。 + +--- + +## 备份 + +### 方式一:系统设置页(推荐) + +**系统设置 → 数据备份与恢复 → 立即备份** + +PostgreSQL 模式下包内为 `postgres_dump.sql`(`pg_dump` 逻辑备份),而非 `futures.db`。 + +### 方式二:命令行 + +```bash +# 需与 .env 中 DATABASE_URL 一致 +source /opt/qihuo/venv/bin/activate +set -a && source /opt/qihuo/.env && set +a +pg_dump --no-owner --no-acl -f /root/qihuo_backup/manual_$(date +%Y%m%d_%H%M%S).sql "$DATABASE_URL" +``` + +### 方式三:每日自动备份 + +设置页开启 **每日自动备份**(默认 03:00),保留份数默认 30。备份目录默认 `/root/qihuo_backup`。 + +详见 [BACKUP.md](./BACKUP.md)。 + +--- + +## 恢复 + +### 从 qihuo 备份包恢复(含 restore.sh) + +```bash +pm2 stop qihuo +cd /root +tar -xzf qihuo_backup_YYYYMMDD_HHMMSS.tar.gz +cd qihuo_backup_YYYYMMDD_HHMMSS +# 确保 /opt/qihuo/.env 已配置 DATABASE_URL +export RESTORE_DIR=/opt/qihuo +chmod +x restore.sh +./restore.sh +pm2 restart qihuo +``` + +`manifest.json` 中 `"backend": "postgres"` 表示包内为 `postgres_dump.sql`。 + +### 手工 psql 恢复 + +```bash +pm2 stop qihuo +export DATABASE_URL=postgresql://qihuo:密码@127.0.0.1:5432/qihuo +# 空库或需覆盖的库 +psql "$DATABASE_URL" -f /path/to/postgres_dump.sql +cp -a uploads_backup/. /opt/qihuo/uploads/ # 若有附件 +pm2 restart qihuo +``` + +### 恢复后检查 + +1. Web 登录正常 +2. **交易记录**、**统计** 页数据完整 +3. **系统设置** 中 CTP、资金等配置仍在 +4. 连接 CTP,持仓页刷新正常 +5. `pm2 logs qihuo` 无持续数据库报错 + +--- + +## 回退到 SQLite + +1. `pm2 stop qihuo` +2. 注释或删除 `.env` 中 `DATABASE_URL` +3. 确保 `/opt/qihuo/futures.db` 存在(可用迁移前备份 `futures.db.pre_pg.*`) +4. `pm2 restart qihuo` + +--- + +## 故障排查 + +| 现象 | 可能原因 | 处理 | +|------|----------|------| +| `未安装 psycopg` | 未 pip install | `pip install -r requirements.txt` | +| `pg_dump 失败` | 未装客户端 / URL 错误 | `apt install postgresql-client`;检查 `DATABASE_URL` | +| 迁移后缺表 | 未 init_db | `python3 -c "from app import init_db; init_db()"` 后重跑迁移 | +| 登录失败 | 只恢复了 SQL 未恢复 settings | 检查 `settings` 表是否有 `admin_password_hash` | +| 连接拒绝 | PostgreSQL 未启动 | `systemctl status postgresql` | +| 仍见 locked | 未切到 PG,仍用 SQLite | `grep DATABASE_URL /opt/qihuo/.env`;`pm2 restart --update-env` | + +### 验证当前后端 + +```bash +cd /opt/qihuo && source venv/bin/activate +set -a && source .env && set +a +python3 -c "from db_conn import database_label, db_backend; print(db_backend(), database_label())" +``` + +应输出 `postgres PostgreSQL (...)`。 + +### 查看 PostgreSQL 连接 + +```bash +sudo -u postgres psql -d qihuo -c "SELECT count(*) FROM pg_stat_activity WHERE datname='qihuo';" +``` + +--- + +## 安全建议 + +- `DATABASE_URL` 含密码,勿提交到 git;`.env` 权限建议 `chmod 600` +- 备份包、`postgres_dump.sql` 含交易与账号数据,勿上传公开网盘 +- 生产库仅监听 `127.0.0.1`,不暴露 5432 到公网 +- 定期测试 **备份 → 解压 → restore.sh → 登录** 全流程 + +--- + +## 相关文档 + +- [DEPLOY.md](./DEPLOY.md) — 应用一键部署 +- [BACKUP.md](./BACKUP.md) — 备份策略与设置页说明 +- [FEATURES.md](./FEATURES.md) — 功能与数据表概览 diff --git a/fee_specs.py b/fee_specs.py index 16c76f3..ea0cf42 100644 --- a/fee_specs.py +++ b/fee_specs.py @@ -7,13 +7,12 @@ import json import os import re -import sqlite3 from datetime import datetime from typing import Optional from contract_specs import get_contract_spec -from db_conn import connect_db +from db_conn import connect_db, is_benign_migration_error DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db") DATA_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data") @@ -54,8 +53,9 @@ def ensure_fee_rates_schema(conn=None) -> None: ): try: conn.execute(sql) - except sqlite3.OperationalError: - pass + except Exception as exc: + if not is_benign_migration_error(exc): + raise conn.commit() finally: if close: diff --git a/install_trading.py b/install_trading.py index 7538acb..e70ee58 100644 --- a/install_trading.py +++ b/install_trading.py @@ -488,6 +488,71 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ) if ctp_positions: return + _ensure_monitors_from_sticky_state(conn, mode) + + def _ensure_monitors_from_sticky_state(conn, mode: str) -> None: + """vnpy 持仓空窗但账户仍有保证金时,恢复本地 active 监控。""" + if not ctp_status(mode).get("connected"): + return + margin_raw = ctp_account_margin_used(mode) + if margin_raw is None or float(margin_raw or 0) <= 0: + return + if count_active_trade_monitors(conn) > 0: + return + capital = _capital(conn) + for p in trading_state.get_positions() or []: + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + direction = p.get("direction") or "long" + ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "") + if not ths: + continue + existing = _find_or_revive_monitor(conn, ths, direction) + if existing: + _sync_monitor_from_ctp( + conn, int(existing["id"]), ths, direction, mode, ctp=p, + capital=capital, + ) + continue + sl, tp, trailing_be, initial_sl = _restore_sl_tp_from_closed(conn, ths, direction) + mid = _upsert_open_monitor( + conn, + sym=ths, + direction=direction, + lots=lots, + price=float(p.get("avg_price") or 0), + sl=sl, + tp=tp, + trailing_be=trailing_be, + ctp_open_time=(p.get("open_time") or "").strip() or None, + monitor_type="ctp_sync", + ) + if initial_sl is not None and sl is not None: + conn.execute( + "UPDATE trade_order_monitors SET initial_stop_loss=? WHERE id=?", + (initial_sl, mid), + ) + if count_active_trade_monitors(conn) > 0: + return + today = datetime.now().strftime("%Y-%m-%d") + for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='closed' " + "AND open_time LIKE ? ORDER BY id DESC LIMIT 5", + (f"{today}%",), + ).fetchall(): + mon = dict(r) + if int(mon.get("lots") or 0) <= 0: + continue + revived = _revive_closed_monitor( + conn, mon.get("symbol") or "", mon.get("direction") or "long", + ) + if revived: + logger.info( + "保证金占用下恢复监控 id=%s sym=%s", + revived.get("id"), revived.get("symbol"), + ) + break def _restore_recent_pending_monitors(conn, mode: str) -> None: """重启或 vnpy 委托缓存丢失时,恢复当日最近一笔可能仍有效的开仓挂单。""" @@ -1729,8 +1794,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se seen.add(rk) deduped.append(row) - if not deduped and ctp_status(mode).get("connected") and monitor_by_pk: - margin_used = float(ctp_account_margin_used(mode) or 0) + if not deduped and ctp_status(mode).get("connected"): + margin_raw = ctp_account_margin_used(mode) + margin_used = float(margin_raw or 0) if margin_raw is not None else 0.0 + has_margin_hint = margin_raw is not None and margin_used > 0 has_active_mon = any( int(m.get("lots") or 0) > 0 for m in monitor_by_pk.values() ) @@ -1741,7 +1808,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se ) except Exception: pass - if margin_used > 0 or has_active_mon or since_connect < 300: + if has_margin_hint or has_active_mon or since_connect < 300: + if not monitor_by_pk and has_margin_hint: + _ensure_monitors_from_sticky_state(conn, mode) + monitor_by_pk = _monitors_by_position_key(conn) for mon in monitor_by_pk.values(): lots = int(mon.get("lots") or 0) if lots <= 0: @@ -1776,6 +1846,41 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se except Exception as exc: logger.warning("compose monitor fallback row failed: %s", exc) + if not deduped and ctp_status(mode).get("connected"): + for r in conn.execute( + "SELECT * FROM trade_order_monitors WHERE status='active' ORDER BY id DESC" + ).fetchall(): + mon = dict(r) + lots = int(mon.get("lots") or 0) + if lots <= 0: + continue + sym = (mon.get("symbol") or "").strip() + direction = (mon.get("direction") or "long").strip().lower() + rk = _monitor_position_key(mon) + if rk in seen: + continue + if fast: + mon = _overlay_sl_tp_readonly(conn, mon, sym, direction) or mon + try: + row = _compose_position_row( + conn, + mon=mon, + ctp=None, + mode=mode, + capital=capital, + now_iso=now_iso, + fast=fast, + ) + if not row: + continue + row_key = row.get("key") or row.get("position_key") or rk + if row_key in seen: + continue + seen.add(row_key) + deduped.append(row) + except Exception as exc: + logger.warning("compose active monitor row failed: %s", exc) + return deduped def _build_trading_live_payload(conn, *, fast: bool = False) -> dict: @@ -1787,9 +1892,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se capital = _capital(conn) if ctp_st.get("connected") and (not fast or _has_pending_monitors(conn)): _reconcile_pending(conn, mode, capital=capital) - if ctp_st.get("connected") and not fast: - _ensure_monitors_from_ctp(conn, mode) - _sync_trade_monitors_with_ctp(conn, mode) + if ctp_st.get("connected"): + if not fast: + _ensure_monitors_from_ctp(conn, mode) + _sync_trade_monitors_with_ctp(conn, mode) + elif count_active_trade_monitors(conn) == 0: + margin_raw = ctp_account_margin_used(mode) + if margin_raw is not None and float(margin_raw) > 0: + _ensure_monitors_from_sticky_state(conn, mode) rows = _build_trading_live_rows(conn, fast=fast) active_orders = _build_active_orders( conn, mode=mode, capital=capital, now_iso=now_iso, @@ -1803,12 +1913,16 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se active_count=_effective_active_position_count(conn, mode), equity=capital, ) + margin_used = ( + ctp_account_margin_used(mode) if ctp_st.get("connected") else None + ) return { "ok": True, "rows": rows, "active_orders": active_orders, "pending_orders": pending_orders, "capital": capital, + "margin_used": margin_used, "ctp_status": ctp_st, "trading_mode_label": trading_mode_label(get_setting), "risk_status": risk, @@ -1841,18 +1955,34 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se payload = _build_trading_live_payload(conn, fast=fast) commit_retry(conn) prev = position_hub.get_snapshot() + active_n = int((payload.get("risk_status") or {}).get("active_count") or 0) if ( prev and ctp_status(mode).get("connected") and not (payload.get("rows") or []) and (prev.get("rows") or []) ): - margin_used = float(ctp_account_margin_used(mode) or 0) - if margin_used > 0 or trading_state.sync_state == "syncing": + margin_raw = payload.get("margin_used") + if margin_raw is None: + margin_raw = ctp_account_margin_used(mode) + margin_used = float(margin_raw or 0) if margin_raw is not None else 0.0 + if ( + (margin_raw is not None and margin_used > 0) + or trading_state.sync_state == "syncing" + or active_n > 0 + ): payload = dict(payload) payload["rows"] = prev["rows"] - payload["sync_state"] = "syncing" - payload["sync_label"] = "同步中…" + if trading_state.sync_state == "syncing": + payload["sync_state"] = "syncing" + payload["sync_label"] = "同步中…" + elif ( + ctp_status(mode).get("connected") + and not (payload.get("rows") or []) + and active_n > 0 + ): + payload = dict(payload) + payload["rows"] = _build_trading_live_rows(conn, fast=fast) return payload finally: conn.close() @@ -2143,15 +2273,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se @app.route("/api/trading/live") @login_required def api_trading_live(): - conn = get_db() - try: - init_strategy_tables(conn) - payload = _build_trading_live_payload(conn, fast=True) - commit_retry(conn) - position_hub.set_snapshot(payload) - return jsonify(payload) - finally: - conn.close() + payload = _refresh_trading_live_snapshot(fast=True) + return jsonify(payload) @app.route("/api/trading/stream") @login_required @@ -3183,7 +3306,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se risk_percent, capital_snapshot, plan_margin, target_lots, first_lots, remainder_lots, dca_legs, leg_amounts_json, grid_prices_json, first_order_done, avg_entry_price, lots_open, opened_at, period - ) VALUES ('active',?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1,?,?,?,?)""", + ) VALUES ('active',?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,1,?,?,?,?) RETURNING id""", ( sym, sym_name or (codes.get("name", sym) if codes else sym), plan["direction"], plan["stop_loss"], plan["add_upper"], plan["take_profit"], @@ -3193,7 +3316,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se price, plan["first_lots"], now, plan["period"], ), ) - plan_id = cur.lastrowid + row = cur.fetchone() + plan_id = int(row["id"] if isinstance(row, dict) else row[0]) conn.commit() conn.close() send_wechat_msg(f"趋势回调首仓 {sym} {plan['first_lots']}手") @@ -3325,13 +3449,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se """INSERT INTO roll_groups ( order_monitor_id, symbol, direction, initial_take_profit, initial_stop_loss, current_stop_loss, risk_percent, leg_count, status, created_at, updated_at - ) VALUES (?,?,?,?,?,?,?,1,'active',?,?)""", + ) VALUES (?,?,?,?,?,?,?,1,'active',?,?) RETURNING id""", ( mon_id, sym, mon["direction"], mon["take_profit"], mon["stop_loss"], new_sl, risk_budget, now, now, ), ) - gid = int(cur.lastrowid) + row = cur.fetchone() + gid = int(row["id"] if isinstance(row, dict) else row[0]) leg_n = 1 if pending_leg_id: conn.execute( @@ -3417,13 +3542,14 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se """INSERT INTO roll_groups ( order_monitor_id, symbol, direction, initial_take_profit, initial_stop_loss, current_stop_loss, risk_percent, leg_count, status, created_at, updated_at - ) VALUES (?,?,?,?,?,?,?,0,'active',?,?)""", + ) VALUES (?,?,?,?,?,?,?,0,'active',?,?) RETURNING id""", ( mon_id, mon["symbol"], mon["direction"], mon["take_profit"], mon["stop_loss"], preview["new_stop_loss"], risk_budget, now, now, ), ) - gid = int(cur.lastrowid) + row = cur.fetchone() + gid = int(row["id"] if isinstance(row, dict) else row[0]) leg_n = int(conn.execute( "SELECT COUNT(*) AS n FROM roll_legs WHERE roll_group_id=? AND status=?", (gid, LEG_STATUS_FILLED), diff --git a/requirements.txt b/requirements.txt index 2ef565b..77f2740 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,7 @@ akshare==1.18.64 # CTP 下单:SimNow 模拟盘 / 期货公司实盘(见 docs/DEPLOY.md) vnpy>=3.9.0 vnpy_ctp>=6.7.11.4 + +# PostgreSQL 生产库(配置 DATABASE_URL 时启用;未配置则仍用 SQLite) +psycopg[binary]>=3.2.0 +psycopg-pool>=3.2.0 diff --git a/risk/account_risk_lib.py b/risk/account_risk_lib.py index 02d31f7..74d3a62 100644 --- a/risk/account_risk_lib.py +++ b/risk/account_risk_lib.py @@ -7,12 +7,13 @@ from __future__ import annotations import os -import sqlite3 import time from datetime import datetime from typing import Any, Callable, Optional, TypeVar from zoneinfo import ZoneInfo +from db_conn import OperationalError + T = TypeVar("T") STATUS_NORMAL = "normal" @@ -100,12 +101,13 @@ _SCHEMA_READY = False def _db_retry(action: Callable[[], T], *, retries: int = 8, base_delay: float = 0.03) -> T: - last: sqlite3.OperationalError | None = None + last: OperationalError | None = None for i in range(retries): try: return action() - except sqlite3.OperationalError as exc: - if "locked" not in str(exc).lower(): + except OperationalError as exc: + msg = str(exc).lower() + if "locked" not in msg and "serialize" not in msg and "deadlock" not in msg: raise last = exc time.sleep(base_delay * (2 ** i)) diff --git a/scripts/deploy_postgres.sh b/scripts/deploy_postgres.sh new file mode 100644 index 0000000..15540d1 --- /dev/null +++ b/scripts/deploy_postgres.sh @@ -0,0 +1,101 @@ +#!/usr/bin/env bash +# qihuo · PostgreSQL 一键部署 / 从 SQLite 迁移 +# 用法: sudo bash scripts/deploy_postgres.sh +# 可选: MIGRATE_SQLITE=1 自动从 /opt/qihuo/futures.db 迁移 + +set -euo pipefail + +APP_DIR="${APP_DIR:-/opt/qihuo}" +PG_DB="${PG_DB:-qihuo}" +PG_USER="${PG_USER:-qihuo}" +PG_HOST="${PG_HOST:-127.0.0.1}" +PG_PORT="${PG_PORT:-5432}" +MIGRATE_SQLITE="${MIGRATE_SQLITE:-0}" + +if [ "$(id -u)" -ne 0 ]; then + echo "请使用 root: sudo bash scripts/deploy_postgres.sh" + exit 1 +fi + +if [ ! -d "$APP_DIR" ]; then + echo "错误: 应用目录不存在 $APP_DIR,请先 bash deploy.sh" + exit 1 +fi + +echo "==> 安装 PostgreSQL..." +export DEBIAN_FRONTEND=noninteractive +apt-get update -qq +apt-get install -y postgresql postgresql-contrib + +echo "==> 创建数据库与用户..." +if [ -z "${PG_PASSWORD:-}" ]; then + PG_PASSWORD="$(python3 -c 'import secrets; print(secrets.token_urlsafe(16))')" +fi + +sudo -u postgres psql -v ON_ERROR_STOP=1 < 写入 .env DATABASE_URL..." +ENV_FILE="$APP_DIR/.env" +if grep -q "^DATABASE_URL=" "$ENV_FILE" 2>/dev/null; then + sed -i "s|^DATABASE_URL=.*|DATABASE_URL=${DATABASE_URL}|" "$ENV_FILE" +else + echo "" >>"$ENV_FILE" + echo "# PostgreSQL(生产推荐,消除 SQLite 并发锁)" >>"$ENV_FILE" + echo "DATABASE_URL=${DATABASE_URL}" >>"$ENV_FILE" + echo "PG_POOL_MIN=2" >>"$ENV_FILE" + echo "PG_POOL_MAX=20" >>"$ENV_FILE" +fi + +echo "==> Python 依赖..." +# shellcheck disable=SC1091 +source "$APP_DIR/venv/bin/activate" +pip install -q -r "$APP_DIR/requirements.txt" + +echo "==> 初始化 PostgreSQL 表结构..." +cd "$APP_DIR" +export DATABASE_URL +python3 -c "from app import init_db; init_db(); from db_conn import database_label; print('OK:', database_label())" + +if [ "$MIGRATE_SQLITE" = "1" ] && [ -f "$APP_DIR/futures.db" ]; then + echo "==> 从 SQLite 迁移数据..." + python3 "$APP_DIR/scripts/migrate_sqlite_to_postgres.py" --sqlite "$APP_DIR/futures.db" + if [ "${BACKUP_SQLITE:-1}" = "1" ]; then + BAK="$APP_DIR/futures.db.pre_pg.$(date +%Y%m%d_%H%M%S)" + cp -a "$APP_DIR/futures.db" "$BAK" + echo " 已备份旧库: $BAK" + fi +elif [ -f "$APP_DIR/futures.db" ]; then + echo "提示: 检测到 futures.db,如需迁移请: MIGRATE_SQLITE=1 bash scripts/deploy_postgres.sh" +fi + +echo "==> 重启 PM2..." +if pm2 describe qihuo &>/dev/null; then + pm2 restart ecosystem.config.cjs --update-env +else + pm2 start "$APP_DIR/ecosystem.config.cjs" +fi +pm2 save + +echo "" +echo "==========================================" +echo " PostgreSQL 部署完成" +echo " DATABASE_URL=${DATABASE_URL}" +echo " 请妥善保存数据库密码: ${PG_PASSWORD}" +echo " 文档: docs/POSTGRES.md" +echo " 备份: 系统设置页 或 pg_dump / 自动备份" +echo "==========================================" diff --git a/scripts/migrate_sqlite_to_postgres.py b/scripts/migrate_sqlite_to_postgres.py new file mode 100644 index 0000000..720af8a --- /dev/null +++ b/scripts/migrate_sqlite_to_postgres.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +# Copyright (c) 2025-2026 马建军. All rights reserved. +"""将 SQLite futures.db 迁移到 PostgreSQL(需已配置 DATABASE_URL 并 init 空库)。""" +from __future__ import annotations + +import argparse +import os +import sqlite3 +import sys +from pathlib import Path + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + +from dotenv import load_dotenv + +load_dotenv(ROOT / ".env") + +from db_conn import DB_PATH, connect_db, db_backend, is_postgres # noqa: E402 + + +def _sqlite_tables(conn: sqlite3.Connection) -> list[str]: + rows = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name" + ).fetchall() + return [r[0] for r in rows] + + +def _pg_columns(pg_conn, table: str) -> list[str]: + rows = pg_conn.execute( + """SELECT column_name FROM information_schema.columns + WHERE table_schema='public' AND table_name=%s + ORDER BY ordinal_position""", + (table,), + ).fetchall() + return [r["column_name"] for r in rows] + + +def _reset_sequences(pg_conn, table: str, pk: str = "id") -> None: + try: + pg_conn.execute( + f"""SELECT setval( + pg_get_serial_sequence('{table}', '{pk}'), + COALESCE((SELECT MAX({pk}) FROM {table}), 1), + true + )""" + ) + except Exception: + pass + + +def migrate(*, sqlite_path: str | None = None, dry_run: bool = False) -> dict: + if not is_postgres(): + raise RuntimeError("请先配置 DATABASE_URL=postgresql://... 后再运行迁移") + + src_path = sqlite_path or DB_PATH + if not os.path.isfile(src_path): + raise FileNotFoundError(f"SQLite 源库不存在: {src_path}") + + print(f"==> 源库: {src_path}") + print(f"==> 目标: PostgreSQL ({os.getenv('DATABASE_URL', '').split('@')[-1]})") + + if not dry_run: + print("==> 初始化 PostgreSQL 表结构...") + from app import init_db + + init_db() + + src = sqlite3.connect(src_path) + src.row_factory = sqlite3.Row + dst = connect_db() + + stats: dict[str, int] = {} + tables = _sqlite_tables(src) + print(f"==> 共 {len(tables)} 张表: {', '.join(tables)}") + + try: + for table in tables: + pg_cols = _pg_columns(dst, table) + if not pg_cols: + print(f" 跳过 {table}(PostgreSQL 无此表,请先 init_db)") + continue + src_cols = [c[1] for c in src.execute(f"PRAGMA table_info({table})").fetchall()] + cols = [c for c in src_cols if c in pg_cols] + if not cols: + print(f" 跳过 {table}(无共同列)") + continue + rows = src.execute(f"SELECT {', '.join(cols)} FROM {table}").fetchall() + if dry_run: + stats[table] = len(rows) + print(f" [dry-run] {table}: {len(rows)} 行") + continue + if not rows: + stats[table] = 0 + continue + dst.execute(f"DELETE FROM {table}") + placeholders = ", ".join(["?"] * len(cols)) + col_sql = ", ".join(cols) + insert_sql = f"INSERT INTO {table} ({col_sql}) VALUES ({placeholders})" + for row in rows: + dst.execute(insert_sql, tuple(row[c] for c in cols)) + stats[table] = len(rows) + if "id" in cols: + _reset_sequences(dst, table, "id") + print(f" {table}: {len(rows)} 行") + if not dry_run: + dst.commit() + finally: + src.close() + dst.close() + + total = sum(stats.values()) + print(f"==> 完成,共迁移 {total} 行") + return stats + + +def main() -> int: + parser = argparse.ArgumentParser(description="SQLite -> PostgreSQL 数据迁移") + parser.add_argument("--sqlite", default=DB_PATH, help=f"SQLite 路径,默认 {DB_PATH}") + parser.add_argument("--dry-run", action="store_true", help="仅统计行数,不写入") + args = parser.parse_args() + + if db_backend() != "postgres": + print("错误: 未检测到 DATABASE_URL(postgresql://...)", file=sys.stderr) + return 1 + try: + migrate(sqlite_path=args.sqlite, dry_run=args.dry_run) + except Exception as exc: + print(f"迁移失败: {exc}", file=sys.stderr) + return 1 + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/sl_tp_guard.py b/sl_tp_guard.py index 1291d13..52e2c6c 100644 --- a/sl_tp_guard.py +++ b/sl_tp_guard.py @@ -616,10 +616,25 @@ def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120 sym = (p.get("symbol") or "").lower() direction = p.get("direction") or "long" position_keys.add((sym, direction)) + try: + from ctp_trading_state import trading_state - margin_used = ctp_account_margin_used(mode) or 0.0 + for p in trading_state.get_positions() or []: + lots = int(p.get("lots") or 0) + if lots <= 0: + continue + sym = (p.get("symbol") or "").lower() + direction = p.get("direction") or "long" + position_keys.add((sym, direction)) + except Exception: + pass + + margin_raw = ctp_account_margin_used(mode) + if margin_raw is None: + return 0 + margin_used = float(margin_raw or 0.0) if not position_keys: - if margin_used > 100: + if margin_used > 0: return 0 try: bridge = get_bridge() @@ -686,6 +701,14 @@ def _execute_local_close( positions = ctp_list_positions(mode) pos = _find_position(positions, sym, direction) if not pos: + margin_raw = ctp_account_margin_used(mode) + if margin_raw is not None and float(margin_raw) > 0: + logger.debug( + "skip close monitor=%s: vnpy empty but margin=%.2f", + mon.get("id"), + float(margin_raw), + ) + return _close_all_monitors_for_symbol(conn, sym, direction) reconcile_monitors_without_position(conn, mode) return diff --git a/static/js/dashboard.js b/static/js/dashboard.js index 65e8538..34d4fa4 100644 --- a/static/js/dashboard.js +++ b/static/js/dashboard.js @@ -859,8 +859,13 @@ equityEl.textContent = fmtMoney(data.capital); } var rows = positionRows(data); - if (!rows.length && data.sync_state === 'syncing' && lastPosRows.length) { - rows = lastPosRows; + if (!rows.length && lastPosRows.length) { + var keepSticky = data.sync_state === 'syncing' + || Number(data.margin_used) > 0 + || (data.risk_status && Number(data.risk_status.active_count) > 0); + if (keepSticky) { + rows = lastPosRows; + } } var sig = rows.map(function (r) { var key = r.key || r.position_key || ((r.symbol_code || '') + ':' + (r.direction || '')); diff --git a/static/js/trade.js b/static/js/trade.js index 1fa521e..ccd7184 100644 --- a/static/js/trade.js +++ b/static/js/trade.js @@ -331,7 +331,9 @@ } var syncing = data.sync_state === 'syncing'; var hadPos = lastPosRowCount > 0 || !!list.querySelector('.pos-card'); - if (syncing || hadPos) { + var marginOpen = Number(data.margin_used) > 0; + var riskActive = data.risk_status && Number(data.risk_status.active_count) > 0; + if (syncing || hadPos || marginOpen || riskActive) { if (syncBadge) { syncBadge.hidden = false; syncBadge.textContent = data.sync_label || '持仓同步中…'; @@ -543,8 +545,8 @@ var hint = document.getElementById('ctp-auto-hint'); if (hint) { hint.textContent = ctpAutoConnectEnabled - ? '断线自动重连 · 开盘前 30 分钟自动连接' - : '自动连接已关闭 · 开盘前 30 分钟仍会按计划连接'; + ? '交易时段断线自动重连 · 开盘前 30 分钟检查连接 · 不自动强制断开' + : '自动连接已关闭 · 盘前 30 分钟及交易时段仍会按计划连接 · 断开请手动操作'; } if (btnConnect && !ctpAutoConnectEnabled) { btnConnect.disabled = true; diff --git a/templates/settings.html b/templates/settings.html index 51d6d78..abeca82 100644 --- a/templates/settings.html +++ b/templates/settings.html @@ -270,8 +270,7 @@ CTP 自动连接 - 开启:盘前自动连接、断线重连、持仓页可连 CTP。关闭:立即断开,非交易时段不再重连;开盘前 30 分钟及交易时段仍会自动连接。 - SimNow 非交易时段前置常不可用(与快期相同),建议收盘后关闭。 + 开启:盘前 30 分钟检查连接(已连则不重复发起)、交易时段断线自动重连、持仓页可连 CTP。关闭:立即断开,非交易时段不再重连;开盘前 30 分钟及交易时段仍会自动连接。系统不会自动强制断线,休盘后请按需手动断开。 diff --git a/templates/trade.html b/templates/trade.html index 08877f4..e56c6e5 100644 --- a/templates/trade.html +++ b/templates/trade.html @@ -40,7 +40,7 @@ {% if not ctp_auto_connect %}disabled title="请先在系统设置 → CTP 连接 中开启自动连接"{% endif %}> {% if ctp_status.connected %}重连 CTP{% else %}连接 CTP{% endif %} - {% if ctp_auto_connect %}断线自动重连 · 开盘前 30 分钟连接 · 日盘/夜盘收盘后 30 分钟断开{% else %}手动连接已关闭 · 仍按交易时段计划自动连/断(盘前 30 分连、收盘 30 分后断){% endif %} + {% if ctp_auto_connect %}交易时段断线自动重连 · 开盘前 30 分钟检查连接 · 不自动强制断开{% else %}自动连接已关闭 · 盘前 30 分钟及交易时段仍会按计划连接 · 断开请手动操作{% endif %}