Use direct PostgreSQL connections instead of pool to fix init deploy.
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+9
-46
@@ -17,15 +17,12 @@ DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), "futures.db")
|
|||||||
|
|
||||||
_backend_lock = threading.Lock()
|
_backend_lock = threading.Lock()
|
||||||
_backend: Optional[str] = None
|
_backend: Optional[str] = None
|
||||||
_pg_pool = None
|
|
||||||
_pg_pool_lock = threading.Lock()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import psycopg
|
import psycopg
|
||||||
from psycopg import OperationalError as PgOperationalError
|
from psycopg import OperationalError as PgOperationalError
|
||||||
from psycopg import IntegrityError as PgIntegrityError
|
from psycopg import IntegrityError as PgIntegrityError
|
||||||
from psycopg.rows import dict_row
|
from psycopg.rows import dict_row
|
||||||
from psycopg_pool import ConnectionPool
|
|
||||||
|
|
||||||
_PSYCOPG_OK = True
|
_PSYCOPG_OK = True
|
||||||
except ImportError:
|
except ImportError:
|
||||||
@@ -33,7 +30,6 @@ except ImportError:
|
|||||||
PgOperationalError = Exception # type: ignore[misc,assignment]
|
PgOperationalError = Exception # type: ignore[misc,assignment]
|
||||||
PgIntegrityError = Exception # type: ignore[misc,assignment]
|
PgIntegrityError = Exception # type: ignore[misc,assignment]
|
||||||
dict_row = None # type: ignore[assignment]
|
dict_row = None # type: ignore[assignment]
|
||||||
ConnectionPool = None # type: ignore[misc,assignment]
|
|
||||||
_PSYCOPG_OK = False
|
_PSYCOPG_OK = False
|
||||||
|
|
||||||
OperationalError = sqlite3.OperationalError
|
OperationalError = sqlite3.OperationalError
|
||||||
@@ -52,7 +48,7 @@ def db_backend() -> str:
|
|||||||
if url.startswith(("postgresql://", "postgres://")):
|
if url.startswith(("postgresql://", "postgres://")):
|
||||||
if not _PSYCOPG_OK:
|
if not _PSYCOPG_OK:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"已配置 DATABASE_URL 但未安装 psycopg,请执行: pip install 'psycopg[binary]' psycopg-pool"
|
"已配置 DATABASE_URL 但未安装 psycopg,请执行: pip install 'psycopg[binary]'"
|
||||||
)
|
)
|
||||||
_backend = "postgres"
|
_backend = "postgres"
|
||||||
else:
|
else:
|
||||||
@@ -216,16 +212,6 @@ class DbConnection:
|
|||||||
self._conn.rollback()
|
self._conn.rollback()
|
||||||
|
|
||||||
def close(self) -> None:
|
def close(self) -> None:
|
||||||
if self._backend == "postgres" and self._from_pool:
|
|
||||||
try:
|
|
||||||
self._conn.rollback()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
try:
|
|
||||||
self._conn.close()
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
return
|
|
||||||
try:
|
try:
|
||||||
self._conn.close()
|
self._conn.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
@@ -248,38 +234,18 @@ class DbConnection:
|
|||||||
self.close()
|
self.close()
|
||||||
|
|
||||||
|
|
||||||
def _pg_pool_instance() -> ConnectionPool:
|
|
||||||
global _pg_pool
|
|
||||||
if _pg_pool is not None:
|
|
||||||
return _pg_pool
|
|
||||||
with _pg_pool_lock:
|
|
||||||
if _pg_pool is not None:
|
|
||||||
return _pg_pool
|
|
||||||
url = (os.getenv("DATABASE_URL") or "").strip()
|
|
||||||
min_size = max(1, int(os.getenv("PG_POOL_MIN", "2") or 2))
|
|
||||||
max_size = max(min_size, int(os.getenv("PG_POOL_MAX", "20") or 20))
|
|
||||||
_pg_pool = ConnectionPool(
|
|
||||||
conninfo=url,
|
|
||||||
min_size=min_size,
|
|
||||||
max_size=max_size,
|
|
||||||
kwargs={"row_factory": dict_row},
|
|
||||||
open=True,
|
|
||||||
)
|
|
||||||
return _pg_pool
|
|
||||||
|
|
||||||
|
|
||||||
def connect_db(path: str | None = None) -> DbConnection:
|
def connect_db(path: str | None = None) -> DbConnection:
|
||||||
"""获取数据库连接。PostgreSQL 使用连接池;SQLite 每次新建连接(WAL)。"""
|
"""获取数据库连接。PostgreSQL / SQLite 均为每次新建连接(用毕 close)。"""
|
||||||
if is_postgres():
|
if is_postgres():
|
||||||
pool = _pg_pool_instance()
|
url = (os.getenv("DATABASE_URL") or "").strip()
|
||||||
raw = pool.getconn()
|
raw = psycopg.connect(url, row_factory=dict_row)
|
||||||
try:
|
try:
|
||||||
with raw.cursor() as cur:
|
with raw.cursor() as cur:
|
||||||
cur.execute("SET TIME ZONE 'Asia/Shanghai'")
|
cur.execute("SET TIME ZONE 'Asia/Shanghai'")
|
||||||
raw.commit()
|
raw.commit()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
return DbConnection("postgres", raw, from_pool=True)
|
return DbConnection("postgres", raw, from_pool=False)
|
||||||
|
|
||||||
db_path = path or DB_PATH
|
db_path = path or DB_PATH
|
||||||
raw = sqlite3.connect(db_path, timeout=30, check_same_thread=False)
|
raw = sqlite3.connect(db_path, timeout=30, check_same_thread=False)
|
||||||
@@ -293,11 +259,8 @@ def connect_db(path: str | None = None) -> DbConnection:
|
|||||||
|
|
||||||
|
|
||||||
def close_pg_pool() -> None:
|
def close_pg_pool() -> None:
|
||||||
global _pg_pool
|
"""兼容旧调用;当前 PostgreSQL 使用直连,无全局连接池。"""
|
||||||
with _pg_pool_lock:
|
return
|
||||||
if _pg_pool is not None:
|
|
||||||
_pg_pool.close()
|
|
||||||
_pg_pool = None
|
|
||||||
|
|
||||||
|
|
||||||
def execute_retry(
|
def execute_retry(
|
||||||
@@ -365,8 +328,8 @@ def is_db_contention_error(exc: BaseException) -> bool:
|
|||||||
|
|
||||||
|
|
||||||
def reset_backend_for_tests(backend: str | None = None) -> None:
|
def reset_backend_for_tests(backend: str | None = None) -> None:
|
||||||
"""测试用:重置后端检测与连接池。"""
|
"""测试用:重置后端检测。"""
|
||||||
global _backend, _pg_pool
|
global _backend
|
||||||
close_pg_pool()
|
close_pg_pool()
|
||||||
with _backend_lock:
|
with _backend_lock:
|
||||||
_backend = backend
|
_backend = backend
|
||||||
|
|||||||
@@ -11,4 +11,3 @@ vnpy_ctp>=6.7.11.4
|
|||||||
|
|
||||||
# PostgreSQL 生产库(配置 DATABASE_URL 时启用;未配置则仍用 SQLite)
|
# PostgreSQL 生产库(配置 DATABASE_URL 时启用;未配置则仍用 SQLite)
|
||||||
psycopg[binary]>=3.2.0
|
psycopg[binary]>=3.2.0
|
||||||
psycopg-pool>=3.2.0
|
|
||||||
|
|||||||
Reference in New Issue
Block a user