"""Opening-order monitors: pending-state tracking, promotion on fill, timeout cancel."""

from __future__ import annotations

import logging
import time
from datetime import datetime
from typing import Any, Callable, Optional
from zoneinfo import ZoneInfo

from market_sessions import is_trading_session
from vnpy_bridge import ctp_cancel_order, ctp_list_active_orders, ctp_status

logger = logging.getLogger(__name__)

TZ = ZoneInfo("Asia/Shanghai")
DEFAULT_PENDING_ORDER_TIMEOUT_SEC = 300

# Lower bound applied to every caller-supplied timeout.
_MIN_PENDING_ORDER_TIMEOUT_SEC = 60
# Minimum age before a pending monitor that has neither an active order nor a
# position is written off as rejected / cancelled / expired.
_ORPHAN_ORDER_GRACE_SEC = 8
# Accepted timestamp layouts; input is truncated to 19 characters before parsing.
_MONITOR_TS_FORMATS = ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M")


def parse_monitor_ts(raw: str) -> Optional[float]:
    """Parse a monitor timestamp string into a POSIX timestamp.

    The string is interpreted as wall-clock time in ``TZ`` (Asia/Shanghai).
    Anything past the first 19 characters is ignored.

    Returns:
        The timestamp in seconds, or ``None`` when the input is empty or does
        not match any of the supported layouts.
    """
    s = (raw or "").strip()
    if not s:
        return None
    head = s[:19]
    for fmt in _MONITOR_TS_FORMATS:
        try:
            return datetime.strptime(head, fmt).replace(tzinfo=TZ).timestamp()
        except ValueError:
            continue
    return None


def pending_age_sec(mon: dict[str, Any]) -> float:
    """Return how many seconds ago the monitor was opened.

    ``open_time`` is preferred; ``created_at`` is the fallback. Both values are
    passed through ``str()`` so non-string column values are parsed as well.

    Returns:
        The non-negative age in seconds, or ``0.0`` when neither field yields a
        parseable timestamp.
    """
    ts = parse_monitor_ts(str(mon.get("open_time") or ""))
    if ts is None:
        ts = parse_monitor_ts(str(mon.get("created_at") or ""))
    if ts is None:
        return 0.0
    return max(0.0, time.time() - ts)


def _effective_timeout_sec(timeout_sec: int) -> int:
    """Clamp a timeout to the minimum; a falsy value selects the default."""
    return max(
        _MIN_PENDING_ORDER_TIMEOUT_SEC,
        int(timeout_sec or DEFAULT_PENDING_ORDER_TIMEOUT_SEC),
    )


def pending_auto_cancel_remaining(
    mon: dict[str, Any],
    *,
    timeout_sec: int = DEFAULT_PENDING_ORDER_TIMEOUT_SEC,
) -> int:
    """Return the whole seconds left before the monitor reaches its timeout.

    Returns:
        Remaining seconds, never negative.
    """
    limit = _effective_timeout_sec(timeout_sec)
    return max(0, int(limit - pending_age_sec(mon)))


def _match_symbol(ctp_sym: str, ths: str) -> bool:
    """Tell whether a CTP-side symbol and a monitor symbol denote the same contract.

    Matching is case-insensitive and succeeds when the symbols are equal, when
    the parts before the first ``.`` are equal, or when ``ctp_sym`` equals the
    symbol that ``ctp_symbol.ths_to_vnpy_symbol`` produces for ``ths``.
    """
    a = (ctp_sym or "").lower()
    b = (ths or "").lower()
    # Two missing symbols must not count as "the same contract".
    if not a or not b:
        return False
    if a == b or a.split(".")[0] == b.split(".")[0]:
        return True
    try:
        # Imported lazily, as in the original; any failure means "no match".
        from ctp_symbol import ths_to_vnpy_symbol

        vnpy_sym, _ = ths_to_vnpy_symbol(ths)
        return a == vnpy_sym.lower()
    except Exception as exc:
        logger.debug("ths_to_vnpy_symbol(%r) failed: %s", ths, exc)
        return False


def _find_ctp_position(
    positions: list[dict],
    sym: str,
    direction: str,
    *,
    match_fn: Callable[[str, str], bool] | None = None,
) -> Optional[dict]:
    """Return the first position with lots > 0 matching the symbol and direction.

    Args:
        positions: Position dicts; the keys read are ``lots``, ``direction``
            and ``symbol``. ``None`` is treated as empty.
        sym: Monitor symbol to look for.
        direction: Wanted direction; empty means ``"long"``.
        match_fn: Symbol matcher called as ``match_fn(position_symbol, sym)``;
            defaults to ``_match_symbol``.

    Returns:
        The matching position dict, or ``None``.
    """
    match = match_fn or _match_symbol
    wanted = (direction or "long").strip().lower()
    for p in positions or []:
        if int(p.get("lots") or 0) <= 0:
            continue
        # Normalise the position side the same way as the requested side.
        if str(p.get("direction") or "long").strip().lower() != wanted:
            continue
        if match(p.get("symbol") or "", sym):
            return p
    return None


def _set_monitor_status(conn, monitor_id: int, status: str) -> None:
    """Write a new status for one ``trade_order_monitors`` row (no commit)."""
    conn.execute(
        "UPDATE trade_order_monitors SET status=? WHERE id=?",
        (status, monitor_id),
    )


def _load_active_order_ids(mode: str) -> Optional[set[str]]:
    """Return the ids of currently active orders, or ``None`` if the query failed."""
    try:
        return {
            str(o.get("order_id"))
            for o in (ctp_list_active_orders(mode) or [])
            if o.get("order_id")
        }
    except Exception as exc:
        logger.warning("list active orders failed: %s", exc)
        return None


def reconcile_pending_orders(
    conn,
    mode: str,
    *,
    match_symbol_fn: Callable[[str, str], bool] | None = None,
    sync_monitor_fn: Callable[..., None] | None = None,
    capital: float = 0.0,
    list_positions_fn: Callable[..., list] | None = None,
    timeout_sec: int = DEFAULT_PENDING_ORDER_TIMEOUT_SEC,
) -> dict[str, int]:
    """Reconcile pending monitors: filled -> active; timed out or gone -> closed.

    For every row of ``trade_order_monitors`` with ``status='pending'``:

    * a matching position exists -> status becomes ``active`` and
      ``sync_monitor_fn`` (if given) is called;
    * the order is still active, older than the timeout, and a trading session
      is open -> the order is cancelled and the row closed on success;
    * the order is no longer active, there is no position, and the row is at
      least ``_ORPHAN_ORDER_GRACE_SEC`` old -> the row is closed.

    If the active-order list cannot be fetched, only the promotion step runs.
    An order merely missing from an unavailable list must not be treated as
    gone, otherwise live orders would lose their monitors.

    Args:
        conn: DB connection used via ``execute()``/``commit()`` with ``?``
            placeholders; rows must be convertible with ``dict(row)``.
        mode: Passed through to the CTP bridge functions.
        match_symbol_fn: Optional symbol matcher; defaults to ``_match_symbol``.
        sync_monitor_fn: Optional callback run after a promotion, called as
            ``(conn, monitor_id, symbol, direction, mode, ctp=position, capital=capital)``.
        capital: Forwarded to ``sync_monitor_fn``.
        list_positions_fn: Optional provider of current positions; without it
            no promotion can happen.
        timeout_sec: Auto-cancel timeout; clamped to at least 60 seconds.

    Returns:
        Counters with keys ``promoted``, ``cancelled`` and ``closed``. All zero
        when CTP is not connected.
    """
    limit_sec = _effective_timeout_sec(timeout_sec)
    stats = {"promoted": 0, "cancelled": 0, "closed": 0}
    if not ctp_status(mode).get("connected"):
        return stats

    match = match_symbol_fn or _match_symbol
    positions = (
        list_positions_fn(mode, refresh_if_empty=False, refresh_margin=False)
        if list_positions_fn
        else []
    )
    active_order_ids = _load_active_order_ids(mode)

    rows = conn.execute(
        "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id ASC"
    ).fetchall()
    for row in rows:
        mon = dict(row)
        mid = int(mon["id"])
        sym = mon.get("symbol") or ""
        direction = mon.get("direction") or "long"
        vt_oid = (mon.get("vt_order_id") or "").strip()

        pos = _find_ctp_position(positions, sym, direction, match_fn=match)
        if pos:
            _set_monitor_status(conn, mid, "active")
            if sync_monitor_fn:
                sync_monitor_fn(
                    conn,
                    mid,
                    sym,
                    direction,
                    mode,
                    ctp=pos,
                    capital=capital,
                )
            stats["promoted"] += 1
            continue

        if active_order_ids is None:
            # Order state unknown for this pass; decide on the next one.
            continue

        age = pending_age_sec(mon)
        if vt_oid and vt_oid in active_order_ids:
            if age >= limit_sec and is_trading_session():
                try:
                    cancelled = bool(ctp_cancel_order(mode, vt_oid))
                except Exception as exc:
                    # Keep going so earlier updates in this pass still get committed.
                    logger.warning(
                        "pending auto-cancel failed monitor=%s order=%s: %s",
                        mid,
                        vt_oid,
                        exc,
                    )
                    cancelled = False
                else:
                    if not cancelled:
                        logger.warning(
                            "pending auto-cancel failed monitor=%s order=%s",
                            mid,
                            vt_oid,
                        )
                if cancelled:
                    _set_monitor_status(conn, mid, "closed")
                    stats["cancelled"] += 1
            continue

        # Order is no longer active and there is no position:
        # rejected, cancelled or expired.
        if age >= _ORPHAN_ORDER_GRACE_SEC:
            _set_monitor_status(conn, mid, "closed")
            stats["closed"] += 1

    if any(stats.values()):
        conn.commit()
    return stats


def cancel_pending_monitor(
    conn,
    mon: dict[str, Any],
    mode: str,
) -> tuple[bool, str]:
    """Manually cancel a pending opening order and close its monitor.

    The cancel request is best-effort: it is only sent when the monitor has an
    order id and CTP is connected, and a failed or rejected cancel is logged
    but does not prevent the monitor row from being closed and committed.

    Returns:
        ``(True, message)`` in all cases that do not raise.
    """
    mid = int(mon.get("id") or 0)
    vt_oid = (mon.get("vt_order_id") or "").strip()
    if vt_oid and ctp_status(mode).get("connected"):
        try:
            if not ctp_cancel_order(mode, vt_oid):
                logger.warning(
                    "cancel pending order rejected monitor=%s order=%s", mid, vt_oid
                )
        except Exception as exc:
            logger.warning("cancel pending order monitor=%s: %s", mid, exc)
    conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mid,))
    conn.commit()
    return True, "开仓委托已撤销"