From f2940d41e9a142f4e83af73f955725f26baf720b Mon Sep 17 00:00:00 2001 From: dekun Date: Fri, 26 Jun 2026 22:21:15 +0800 Subject: [PATCH] Fix stale pending orders after CTP rejection or cancel. When the exchange rejects or cancels an order, close local pending monitors once the order leaves CTP active list instead of waiting for the full timeout. Co-authored-by: Cursor --- install_trading.py | 35 ++++++++++++ order_pending.py | 46 +++++++++++++++- scripts/deploy_pending_fix.py | 95 ++++++++++++++++++++++++++++++++ scripts/verify_pending_server.py | 55 ++++++++++++++++++ 4 files changed, 229 insertions(+), 2 deletions(-) create mode 100644 scripts/deploy_pending_fix.py create mode 100644 scripts/verify_pending_server.py diff --git a/install_trading.py b/install_trading.py index 7abf243..3de840a 100644 --- a/install_trading.py +++ b/install_trading.py @@ -41,6 +41,7 @@ from ctp_fee_worker import start_ctp_fee_worker from order_pending import ( cancel_pending_monitor, pending_auto_cancel_remaining, + pending_monitor_has_live_order, reconcile_pending_orders, ) from db_conn import execute_retry @@ -1305,11 +1306,23 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se except Exception as exc: logger.warning("compose ctp order row failed: %s", exc) + ctp_active_map: dict[str, dict] = {} + for o in ctp_orders or []: + for key in (o.get("order_id"), o.get("vt_order_id")): + if key: + ctp_active_map[str(key)] = o + for r in conn.execute( "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC" ).fetchall(): mon = dict(r) try: + if not pending_monitor_has_live_order( + mon, + active_orders=ctp_active_map, + active_order_list=ctp_orders or [], + ): + continue prow = _compose_pending_row( mon, mode=mode, capital=capital, now_iso=now_iso, ) @@ -2207,6 +2220,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "SELECT status FROM trade_order_monitors WHERE id=?", (mid,), ).fetchone() filled = st_row and (st_row["status"] or "").strip().lower() == "active" + rejected = st_row and (st_row["status"] or "").strip().lower() == "closed" + if rejected: + conn.commit() + conn.close() + _push_position_snapshot_async(fast=False) + return jsonify({ + "ok": False, + "error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)", + "lots": lots, + "filled": False, + }), 400 if not filled: try: get_bridge().refresh_positions() @@ -2217,6 +2241,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "SELECT status FROM trade_order_monitors WHERE id=?", (mid,), ).fetchone() filled = st_row and (st_row["status"] or "").strip().lower() == "active" + rejected = st_row and (st_row["status"] or "").strip().lower() == "closed" + if rejected: + conn.commit() + conn.close() + _push_position_snapshot_async(fast=False) + return jsonify({ + "ok": False, + "error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)", + "lots": lots, + "filled": False, + }), 400 if filled: _sync_monitor_from_ctp( conn, mid, sym, direction, mode, capital=_capital(conn), diff --git a/order_pending.py b/order_pending.py index 949a9c5..4f703fc 100644 --- a/order_pending.py +++ b/order_pending.py @@ -19,6 +19,33 @@ logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") DEFAULT_PENDING_ORDER_TIMEOUT_SEC = 300 +# 报单刚提交后短暂等待 CTP 回报,避免误判为拒单 +PENDING_ORDER_SETTLE_GRACE_SEC = 8 + + +def pending_monitor_has_live_order( + mon: dict, + *, + active_orders: dict[str, dict], + active_order_list: list[dict], + match_fn: Callable[[str, str], bool] | None = None, +) -> bool: + """本地 pending 是否仍对应 CTP 柜台上的有效开仓委托。""" + match = match_fn or _match_symbol + sym = mon.get("symbol") or "" + direction = mon.get("direction") or "long" + vt_oid = (mon.get("vt_order_id") or "").strip() + age = pending_age_sec(mon) + + if vt_oid and _vt_order_in_active(vt_oid, active_orders): + return True + if _symbol_open_order_active(active_order_list, sym, direction, match): + return True + if not vt_oid and age < PENDING_ORDER_SETTLE_GRACE_SEC: + return True + if vt_oid and age < PENDING_ORDER_SETTLE_GRACE_SEC: + return True + return False def parse_monitor_ts(raw: str) -> Optional[float]: @@ -183,8 +210,8 @@ def reconcile_pending_orders( continue live_open = _symbol_open_order_active(active_order_list, sym, direction, match) - if live_open or (vt_oid and age < limit_sec): - if live_open and age >= limit_sec and is_trading_session(): + if live_open: + if age >= limit_sec and is_trading_session(): cancel_oid = ( vt_oid or live_open.get("vt_order_id") @@ -199,6 +226,21 @@ def reconcile_pending_orders( stats["cancelled"] += 1 continue + # 有委托号但已不在 CTP 活跃列表且无持仓 → 拒单/已撤/终态 + if vt_oid: + if age < PENDING_ORDER_SETTLE_GRACE_SEC: + continue + conn.execute( + "UPDATE trade_order_monitors SET status='closed' WHERE id=?", + (mid,), + ) + stats["closed"] += 1 + logger.info( + "pending monitor=%s order=%s closed (no longer active on CTP)", + mid, vt_oid, + ) + continue + if age >= limit_sec: if vt_oid and is_trading_session(): if ctp_cancel_order(mode, vt_oid): diff --git a/scripts/deploy_pending_fix.py b/scripts/deploy_pending_fix.py new file mode 100644 index 0000000..6f1896c --- /dev/null +++ b/scripts/deploy_pending_fix.py @@ -0,0 +1,95 @@ +"""Deploy pending order reconcile fix and verify stale monitors cleared.""" +import paramiko +import sys +import textwrap +from pathlib import Path + +sys.stdout.reconfigure(encoding="utf-8", errors="replace") +root = Path(__file__).resolve().parents[1] + +FILES = [ + "order_pending.py", + "install_trading.py", +] + +VERIFY = textwrap.dedent( + ''' + import sqlite3, urllib.request, urllib.parse, http.cookiejar, json + + conn = sqlite3.connect("/opt/qihuo/data.db") + conn.row_factory = sqlite3.Row + pending = conn.execute( + "SELECT id, symbol, direction, vt_order_id, status FROM trade_order_monitors WHERE status='pending'" + ).fetchall() + print("pending_monitors", len(pending)) + for r in pending: + print(dict(r)) + + jar = http.cookiejar.CookieJar() + op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar)) + user = pwd = "" + for line in open("/opt/qihuo/.env"): + line = line.strip() + if line.startswith("ADMIN_USERNAME="): + user = line.split("=", 1)[1].strip().strip('"').strip("'") + if line.startswith("ADMIN_PASSWORD="): + pwd = line.split("=", 1)[1].strip().strip('"').strip("'") + op.open(urllib.request.Request( + "http://127.0.0.1:6600/login", + urllib.parse.urlencode({"username": user, "password": pwd}).encode(), + )) + raw = op.open("http://127.0.0.1:6600/api/trading/live").read().decode() + data = json.loads(raw) + orders = data.get("active_orders") or [] + cj = [o for o in orders if "CJ" in (o.get("symbol_code") or "").upper()] + print("active_orders", len(orders), "cj_orders", len(cj)) + ok = True + if cj: + ok = False + print("FAIL still showing CJ pending in active_orders", cj) + if ok: + print("VERIFY PASS") + ''' +) + + +def main() -> None: + c = paramiko.SSHClient() + c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + c.connect("192.168.8.21", username="root", password="woaini88", timeout=15) + sftp = c.open_sftp() + for rel in FILES: + local = root / rel + remote = f"/opt/qihuo/{rel.replace(chr(92), '/')}" + sftp.put(str(local), remote) + print("uploaded", rel) + sftp.close() + + for cmd in ( + "cd /opt/qihuo && pm2 restart qihuo", + "sleep 4", + ): + print(">>>", cmd) + _, stdout, stderr = c.exec_command(cmd) + out = stdout.read().decode("utf-8", errors="replace") + err = stderr.read().decode("utf-8", errors="replace") + if out.strip(): + print(out.strip()) + if err.strip(): + print(err.strip()) + + sftp = c.open_sftp() + with sftp.open("/tmp/verify_pending_fix.py", "w") as f: + f.write(VERIFY) + sftp.close() + + _, stdout, stderr = c.exec_command("/opt/qihuo/venv/bin/python /tmp/verify_pending_fix.py") + print(stdout.read().decode("utf-8", errors="replace")) + err = stderr.read().decode("utf-8", errors="replace") + if err.strip(): + print(err.strip()) + c.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/verify_pending_server.py b/scripts/verify_pending_server.py new file mode 100644 index 0000000..f443441 --- /dev/null +++ b/scripts/verify_pending_server.py @@ -0,0 +1,55 @@ +import paramiko +import sys + +sys.stdout.reconfigure(encoding="utf-8", errors="replace") + +VERIFY = r""" +import sqlite3, urllib.request, urllib.parse, http.cookiejar, json, os + +db = "/opt/qihuo/futures.db" +print("db", db) + +conn = sqlite3.connect(db) +conn.row_factory = sqlite3.Row +pending = conn.execute( + "SELECT id, symbol, direction, vt_order_id, status FROM trade_order_monitors WHERE status='pending'" +).fetchall() +print("pending_monitors", len(pending)) +for r in pending: + print(dict(r)) + +jar = http.cookiejar.CookieJar() +op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar)) +user = pwd = "" +for line in open("/opt/qihuo/.env"): + line = line.strip() + if line.startswith("ADMIN_USERNAME="): + user = line.split("=", 1)[1].strip().strip('"').strip("'") + if line.startswith("ADMIN_PASSWORD="): + pwd = line.split("=", 1)[1].strip().strip('"').strip("'") +op.open(urllib.request.Request( + "http://127.0.0.1:6600/login", + urllib.parse.urlencode({"username": user, "password": pwd}).encode(), +)) +raw = op.open("http://127.0.0.1:6600/api/trading/live").read().decode() +data = json.loads(raw) +orders = data.get("active_orders") or [] +cj = [o for o in orders if "CJ" in (o.get("symbol_code") or "").upper()] +print("active_orders", len(orders), "cj_orders", len(cj)) +if cj: + print("cj detail", cj) +else: + print("VERIFY PASS") +""" + +c = paramiko.SSHClient() +c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) +c.connect("192.168.8.21", username="root", password="woaini88", timeout=15) +sftp = c.open_sftp() +with sftp.open("/tmp/verify_pending_fix.py", "w") as f: + f.write(VERIFY) +sftp.close() +_, stdout, stderr = c.exec_command("/opt/qihuo/venv/bin/python /tmp/verify_pending_fix.py") +print(stdout.read().decode("utf-8", errors="replace")) +print(stderr.read().decode("utf-8", errors="replace")) +c.close()