# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """开仓委托:pending 状态跟踪、成交转正、超时撤单。""" from __future__ import annotations import logging import time from datetime import datetime from typing import Any, Callable, Optional from zoneinfo import ZoneInfo from market_sessions import is_trading_session from vnpy_bridge import ctp_cancel_order, ctp_list_active_orders, ctp_status logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") DEFAULT_PENDING_ORDER_TIMEOUT_SEC = 300 # 报单刚提交后短暂等待 CTP 回报,避免误判为拒单 PENDING_ORDER_SETTLE_GRACE_SEC = 8 def pending_monitor_has_live_order( mon: dict, *, active_orders: dict[str, dict], active_order_list: list[dict], match_fn: Callable[[str, str], bool] | None = None, ) -> bool: """本地 pending 是否仍对应 CTP 柜台上的有效开仓委托。""" match = match_fn or _match_symbol sym = mon.get("symbol") or "" direction = mon.get("direction") or "long" vt_oid = (mon.get("vt_order_id") or "").strip() age = pending_age_sec(mon) if vt_oid and _vt_order_in_active(vt_oid, active_orders): return True if _symbol_open_order_active(active_order_list, sym, direction, match): return True if not vt_oid and age < PENDING_ORDER_SETTLE_GRACE_SEC: return True if vt_oid and age < PENDING_ORDER_SETTLE_GRACE_SEC: return True return False def parse_monitor_ts(raw: str) -> Optional[float]: s = (raw or "").strip() if not s: return None for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%dT%H:%M"): try: return datetime.strptime(s[:19], fmt).replace(tzinfo=TZ).timestamp() except ValueError: continue return None def pending_age_sec(mon: dict) -> float: ts = parse_monitor_ts(mon.get("open_time") or "") or parse_monitor_ts( str(mon.get("created_at") or "") ) if ts is None: return 0.0 return max(0.0, time.time() - ts) def pending_auto_cancel_remaining( mon: dict, *, timeout_sec: int = DEFAULT_PENDING_ORDER_TIMEOUT_SEC, ) -> int: limit = max(60, int(timeout_sec or DEFAULT_PENDING_ORDER_TIMEOUT_SEC)) return max(0, int(limit - pending_age_sec(mon))) def _match_symbol(ctp_sym: str, ths: str) -> bool: a = (ctp_sym or "").lower() b = (ths or "").lower() if a == b: return True if a and b and a.split(".")[0] == b.split(".")[0]: return True try: from ctp_symbol import ths_to_vnpy_symbol vnpy_sym, _ = ths_to_vnpy_symbol(ths) if a == vnpy_sym.lower(): return True except Exception: pass return False def _find_ctp_position(positions: list[dict], sym: str, direction: str) -> Optional[dict]: direction = (direction or "long").strip().lower() for p in positions or []: if int(p.get("lots") or 0) <= 0: continue if (p.get("direction") or "long") != direction: continue if _match_symbol(p.get("symbol") or "", sym): return p return None def _vt_order_in_active(vt_oid: str, active_orders: dict[str, dict]) -> bool: oid = (vt_oid or "").strip() if not oid: return False if oid in active_orders: return True tail = oid.rsplit("_", 1)[-1] for key in active_orders: if key == oid or key.endswith(tail) or oid.endswith(key): return True return False def _symbol_open_order_active( orders: list[dict], sym: str, direction: str, match_fn: Callable[[str, str], bool], ) -> Optional[dict]: direction = (direction or "long").strip().lower() for o in orders or []: offset_u = (o.get("offset") or "").upper() if offset_u and "OPEN" not in offset_u: continue if (o.get("direction") or "long") != direction: continue if match_fn(o.get("symbol") or "", sym): return o return None def reconcile_pending_orders( conn, mode: str, *, match_symbol_fn: Callable[[str, str], bool] | None = None, sync_monitor_fn: Callable[..., None] | None = None, capital: float = 0.0, list_positions_fn: Callable[..., list] | None = None, timeout_sec: int = DEFAULT_PENDING_ORDER_TIMEOUT_SEC, ) -> dict[str, int]: """同步 pending 委托:成交→active;超时/已撤→closed。""" limit_sec = max(60, int(timeout_sec or DEFAULT_PENDING_ORDER_TIMEOUT_SEC)) stats = {"promoted": 0, "cancelled": 0, "closed": 0} if not ctp_status(mode).get("connected"): return stats match = match_symbol_fn or _match_symbol positions = ( list_positions_fn(mode, refresh_if_empty=False, refresh_margin=False) if list_positions_fn else [] ) try: active_order_list = ctp_list_active_orders(mode) active_orders = {} for o in active_order_list: for key in (o.get("order_id"), o.get("vt_order_id")): if key: active_orders[str(key)] = o except Exception as exc: logger.debug("list active orders: %s", exc) active_order_list = [] active_orders = {} rows = conn.execute( "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id ASC" ).fetchall() for r in rows: mon = dict(r) mid = int(mon["id"]) sym = mon.get("symbol") or "" direction = mon.get("direction") or "long" vt_oid = (mon.get("vt_order_id") or "").strip() age = pending_age_sec(mon) pos = _find_ctp_position(positions, sym, direction) if pos: conn.execute( "UPDATE trade_order_monitors SET status='active' WHERE id=?", (mid,), ) if sync_monitor_fn: sync_monitor_fn( conn, mid, sym, direction, mode, ctp=pos, capital=capital, ) stats["promoted"] += 1 continue if vt_oid and _vt_order_in_active(vt_oid, active_orders): if age >= limit_sec and is_trading_session(): if ctp_cancel_order(mode, vt_oid): conn.execute( "UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mid,), ) stats["cancelled"] += 1 else: logger.warning("pending auto-cancel failed monitor=%s order=%s", mid, vt_oid) continue live_open = _symbol_open_order_active(active_order_list, sym, direction, match) if live_open: if age >= limit_sec and is_trading_session(): cancel_oid = ( vt_oid or live_open.get("vt_order_id") or live_open.get("order_id") or "" ) if cancel_oid and ctp_cancel_order(mode, cancel_oid): conn.execute( "UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mid,), ) stats["cancelled"] += 1 continue # 有委托号但已不在 CTP 活跃列表且无持仓 → 拒单/已撤/终态 if vt_oid: if age < PENDING_ORDER_SETTLE_GRACE_SEC: continue conn.execute( "UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mid,), ) stats["closed"] += 1 logger.info( "pending monitor=%s order=%s closed (no longer active on CTP)", mid, vt_oid, ) continue if age >= limit_sec: if vt_oid and is_trading_session(): if ctp_cancel_order(mode, vt_oid): conn.execute( "UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mid,), ) stats["cancelled"] += 1 else: logger.info( "pending monitor=%s order=%s kept (cancel not confirmed)", mid, vt_oid, ) elif not vt_oid: conn.execute( "UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mid,), ) stats["closed"] += 1 if any(stats.values()): conn.commit() return stats def cancel_pending_monitor( conn, mon: dict, mode: str, ) -> tuple[bool, str]: """手动撤销 pending 开仓委托。""" mid = int(mon.get("id") or 0) vt_oid = (mon.get("vt_order_id") or "").strip() if vt_oid and ctp_status(mode).get("connected"): try: ctp_cancel_order(mode, vt_oid) except Exception as exc: logger.warning("cancel pending order monitor=%s: %s", mid, exc) conn.execute("UPDATE trade_order_monitors SET status='closed' WHERE id=?", (mid,)) conn.commit() return True, "开仓委托已撤销"