# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""Pending open-order timeout handling.

Reconciles pending open orders periodically from a background thread, so the
result does not depend on a full SSE refresh taking place.
"""
from __future__ import annotations

import logging
import threading
import time
from typing import Callable, Optional

from vnpy_bridge import ctp_status

logger = logging.getLogger(__name__)

# Default delay between passes while at least one monitor row is pending.
CHECK_INTERVAL_SEC = 10
# Default delay between passes while nothing is pending.
IDLE_INTERVAL_SEC = 45
# Delay before re-checking when ctp_status() does not report "connected".
DISCONNECTED_SLEEP_SEC = 30
# One-off delay before the worker's first pass.
STARTUP_DELAY_SEC = 15


def start_pending_order_worker(
    *,
    db_path: str,
    get_mode_fn: Callable[[], str],
    init_tables_fn: Callable | None = None,
    get_capital_fn: Callable | None = None,
    reconcile_fn: Callable[..., dict],
    on_changed_fn: Callable[[], None] | None = None,
    interval: int = CHECK_INTERVAL_SEC,
    idle_interval: int = IDLE_INTERVAL_SEC,
) -> None:
    """Start the daemon thread that reconciles pending open-order monitors.

    After an initial ``STARTUP_DELAY_SEC`` delay the thread loops forever.
    Each pass it:

    1. Calls ``get_mode_fn()`` and skips the pass when
       ``ctp_status(mode)`` does not report ``"connected"``.
    2. Opens a connection with ``connect_db(db_path)`` and counts the rows of
       ``trade_order_monitors`` whose status is ``'pending'``.
    3. If any are pending, calls ``reconcile_fn(conn, mode, capital=...)``
       and, when the returned counters show a change, logs them and calls
       ``on_changed_fn``.

    The connection is always closed before the thread sleeps. Any exception
    raised during a pass is logged as a warning and the loop carries on.

    Args:
        db_path: Passed unchanged to ``connect_db``.
        get_mode_fn: Called once per pass; its result is handed to
            ``ctp_status`` and to ``reconcile_fn``.
        init_tables_fn: Optional; called with the open connection on every
            pass before the pending count is read.
        get_capital_fn: Optional; called with the open connection. Its result
            is converted with ``float``; a falsy result or a raised exception
            yields ``0.0``.
        reconcile_fn: Called as ``reconcile_fn(conn, mode, capital=capital)``.
            A falsy return value is treated as an empty dict; the keys
            ``"promoted"``, ``"cancelled"`` and ``"closed"`` are read from it.
        on_changed_fn: Optional; called without arguments when any of the
            three counters is non-zero.
        interval: Seconds to wait after a pass that found pending monitors
            (values below 1 are raised to 1).
        idle_interval: Seconds to wait after a pass that found nothing
            pending, or that failed before the count was known (values below
            5 are raised to 5).

    Returns:
        None. The thread is started and not handed back to the caller.
    """
    # Imported at call time rather than at module import time.
    # NOTE(review): presumably to avoid an import cycle - confirm.
    from db_conn import connect_db

    # Both delays are fixed for the lifetime of the worker, so compute once.
    idle_sleep = max(5, idle_interval)
    active_sleep = max(1, interval)

    def _read_capital(conn) -> float:
        """Return the capital figure for reconcile_fn, or 0.0 on failure."""
        if not get_capital_fn:
            return 0.0
        try:
            return float(get_capital_fn(conn) or 0)
        except Exception as exc:
            # Best effort by design: reconcile still runs with zero capital,
            # but the failure is no longer invisible.
            logger.warning(
                "pending order worker: capital lookup failed, using 0: %s", exc
            )
            return 0.0

    def _reconcile(conn, mode: str) -> None:
        """Run reconcile_fn once and report any change it made."""
        stats = reconcile_fn(conn, mode, capital=_read_capital(conn)) or {}
        changed = any(
            int(stats.get(key) or 0)
            for key in ("promoted", "cancelled", "closed")
        )
        if not changed:
            return
        logger.info(
            "pending worker reconcile: promoted=%s cancelled=%s closed=%s",
            stats.get("promoted", 0),
            stats.get("cancelled", 0),
            stats.get("closed", 0),
        )
        if on_changed_fn:
            on_changed_fn()

    def _loop() -> None:
        """Worker body: one reconcile pass per iteration, then sleep."""
        time.sleep(STARTUP_DELAY_SEC)
        while True:
            # Pick the delay inside the try, sleep once at the bottom: the
            # connection is therefore closed before the thread goes idle.
            sleep_sec = idle_sleep
            try:
                mode = get_mode_fn()
                if not ctp_status(mode).get("connected"):
                    sleep_sec = DISCONNECTED_SLEEP_SEC
                else:
                    conn = connect_db(db_path)
                    try:
                        if init_tables_fn:
                            init_tables_fn(conn)
                        # Row access by column name: assumes connect_db
                        # configures a mapping-style row factory.
                        pending_n = conn.execute(
                            "SELECT COUNT(*) AS n FROM trade_order_monitors WHERE status='pending'"
                        ).fetchone()["n"]
                        if pending_n > 0:
                            # Set before reconciling so that a failing
                            # reconcile is retried on the short interval.
                            sleep_sec = active_sleep
                            _reconcile(conn, mode)
                    finally:
                        conn.close()
            except Exception as exc:
                # Top-level boundary of the worker: never let the thread die.
                logger.warning("pending order worker: %s", exc)
            time.sleep(sleep_sec)

    threading.Thread(target=_loop, daemon=True, name="pending-order-worker").start()