Fix stale pending orders after CTP rejection or cancel.

When the exchange rejects or cancels an order, close local pending monitors once the order leaves CTP active list instead of waiting for the full timeout.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-26 22:21:15 +08:00
parent fd49b08c08
commit f2940d41e9
4 changed files with 229 additions and 2 deletions
+35
View File
@@ -41,6 +41,7 @@ from ctp_fee_worker import start_ctp_fee_worker
from order_pending import (
cancel_pending_monitor,
pending_auto_cancel_remaining,
pending_monitor_has_live_order,
reconcile_pending_orders,
)
from db_conn import execute_retry
@@ -1305,11 +1306,23 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
except Exception as exc:
logger.warning("compose ctp order row failed: %s", exc)
ctp_active_map: dict[str, dict] = {}
for o in ctp_orders or []:
for key in (o.get("order_id"), o.get("vt_order_id")):
if key:
ctp_active_map[str(key)] = o
for r in conn.execute(
"SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC"
).fetchall():
mon = dict(r)
try:
if not pending_monitor_has_live_order(
mon,
active_orders=ctp_active_map,
active_order_list=ctp_orders or [],
):
continue
prow = _compose_pending_row(
mon, mode=mode, capital=capital, now_iso=now_iso,
)
@@ -2207,6 +2220,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
"SELECT status FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
filled = st_row and (st_row["status"] or "").strip().lower() == "active"
rejected = st_row and (st_row["status"] or "").strip().lower() == "closed"
if rejected:
conn.commit()
conn.close()
_push_position_snapshot_async(fast=False)
return jsonify({
"ok": False,
"error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)",
"lots": lots,
"filled": False,
}), 400
if not filled:
try:
get_bridge().refresh_positions()
@@ -2217,6 +2241,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
"SELECT status FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
filled = st_row and (st_row["status"] or "").strip().lower() == "active"
rejected = st_row and (st_row["status"] or "").strip().lower() == "closed"
if rejected:
conn.commit()
conn.close()
_push_position_snapshot_async(fast=False)
return jsonify({
"ok": False,
"error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)",
"lots": lots,
"filled": False,
}), 400
if filled:
_sync_monitor_from_ctp(
conn, mid, sym, direction, mode, capital=_capital(conn),
+44 -2
View File
@@ -19,6 +19,33 @@ logger = logging.getLogger(__name__)
TZ = ZoneInfo("Asia/Shanghai")
DEFAULT_PENDING_ORDER_TIMEOUT_SEC = 300
# 报单刚提交后短暂等待 CTP 回报,避免误判为拒单
PENDING_ORDER_SETTLE_GRACE_SEC = 8
def pending_monitor_has_live_order(
mon: dict,
*,
active_orders: dict[str, dict],
active_order_list: list[dict],
match_fn: Callable[[str, str], bool] | None = None,
) -> bool:
"""本地 pending 是否仍对应 CTP 柜台上的有效开仓委托。"""
match = match_fn or _match_symbol
sym = mon.get("symbol") or ""
direction = mon.get("direction") or "long"
vt_oid = (mon.get("vt_order_id") or "").strip()
age = pending_age_sec(mon)
if vt_oid and _vt_order_in_active(vt_oid, active_orders):
return True
if _symbol_open_order_active(active_order_list, sym, direction, match):
return True
if not vt_oid and age < PENDING_ORDER_SETTLE_GRACE_SEC:
return True
if vt_oid and age < PENDING_ORDER_SETTLE_GRACE_SEC:
return True
return False
def parse_monitor_ts(raw: str) -> Optional[float]:
@@ -183,8 +210,8 @@ def reconcile_pending_orders(
continue
live_open = _symbol_open_order_active(active_order_list, sym, direction, match)
if live_open or (vt_oid and age < limit_sec):
if live_open and age >= limit_sec and is_trading_session():
if live_open:
if age >= limit_sec and is_trading_session():
cancel_oid = (
vt_oid
or live_open.get("vt_order_id")
@@ -199,6 +226,21 @@ def reconcile_pending_orders(
stats["cancelled"] += 1
continue
# 有委托号但已不在 CTP 活跃列表且无持仓 → 拒单/已撤/终态
if vt_oid:
if age < PENDING_ORDER_SETTLE_GRACE_SEC:
continue
conn.execute(
"UPDATE trade_order_monitors SET status='closed' WHERE id=?",
(mid,),
)
stats["closed"] += 1
logger.info(
"pending monitor=%s order=%s closed (no longer active on CTP)",
mid, vt_oid,
)
continue
if age >= limit_sec:
if vt_oid and is_trading_session():
if ctp_cancel_order(mode, vt_oid):
+95
View File
@@ -0,0 +1,95 @@
"""Deploy pending order reconcile fix and verify stale monitors cleared."""
import paramiko
import sys
import textwrap
from pathlib import Path
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
root = Path(__file__).resolve().parents[1]
FILES = [
"order_pending.py",
"install_trading.py",
]
VERIFY = textwrap.dedent(
'''
import sqlite3, urllib.request, urllib.parse, http.cookiejar, json
conn = sqlite3.connect("/opt/qihuo/data.db")
conn.row_factory = sqlite3.Row
pending = conn.execute(
"SELECT id, symbol, direction, vt_order_id, status FROM trade_order_monitors WHERE status='pending'"
).fetchall()
print("pending_monitors", len(pending))
for r in pending:
print(dict(r))
jar = http.cookiejar.CookieJar()
op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))
user = pwd = ""
for line in open("/opt/qihuo/.env"):
line = line.strip()
if line.startswith("ADMIN_USERNAME="):
user = line.split("=", 1)[1].strip().strip('"').strip("'")
if line.startswith("ADMIN_PASSWORD="):
pwd = line.split("=", 1)[1].strip().strip('"').strip("'")
op.open(urllib.request.Request(
"http://127.0.0.1:6600/login",
urllib.parse.urlencode({"username": user, "password": pwd}).encode(),
))
raw = op.open("http://127.0.0.1:6600/api/trading/live").read().decode()
data = json.loads(raw)
orders = data.get("active_orders") or []
cj = [o for o in orders if "CJ" in (o.get("symbol_code") or "").upper()]
print("active_orders", len(orders), "cj_orders", len(cj))
ok = True
if cj:
ok = False
print("FAIL still showing CJ pending in active_orders", cj)
if ok:
print("VERIFY PASS")
'''
)
def main() -> None:
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("192.168.8.21", username="root", password="woaini88", timeout=15)
sftp = c.open_sftp()
for rel in FILES:
local = root / rel
remote = f"/opt/qihuo/{rel.replace(chr(92), '/')}"
sftp.put(str(local), remote)
print("uploaded", rel)
sftp.close()
for cmd in (
"cd /opt/qihuo && pm2 restart qihuo",
"sleep 4",
):
print(">>>", cmd)
_, stdout, stderr = c.exec_command(cmd)
out = stdout.read().decode("utf-8", errors="replace")
err = stderr.read().decode("utf-8", errors="replace")
if out.strip():
print(out.strip())
if err.strip():
print(err.strip())
sftp = c.open_sftp()
with sftp.open("/tmp/verify_pending_fix.py", "w") as f:
f.write(VERIFY)
sftp.close()
_, stdout, stderr = c.exec_command("/opt/qihuo/venv/bin/python /tmp/verify_pending_fix.py")
print(stdout.read().decode("utf-8", errors="replace"))
err = stderr.read().decode("utf-8", errors="replace")
if err.strip():
print(err.strip())
c.close()
if __name__ == "__main__":
main()
+55
View File
@@ -0,0 +1,55 @@
import paramiko
import sys
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
VERIFY = r"""
import sqlite3, urllib.request, urllib.parse, http.cookiejar, json, os
db = "/opt/qihuo/futures.db"
print("db", db)
conn = sqlite3.connect(db)
conn.row_factory = sqlite3.Row
pending = conn.execute(
"SELECT id, symbol, direction, vt_order_id, status FROM trade_order_monitors WHERE status='pending'"
).fetchall()
print("pending_monitors", len(pending))
for r in pending:
print(dict(r))
jar = http.cookiejar.CookieJar()
op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar))
user = pwd = ""
for line in open("/opt/qihuo/.env"):
line = line.strip()
if line.startswith("ADMIN_USERNAME="):
user = line.split("=", 1)[1].strip().strip('"').strip("'")
if line.startswith("ADMIN_PASSWORD="):
pwd = line.split("=", 1)[1].strip().strip('"').strip("'")
op.open(urllib.request.Request(
"http://127.0.0.1:6600/login",
urllib.parse.urlencode({"username": user, "password": pwd}).encode(),
))
raw = op.open("http://127.0.0.1:6600/api/trading/live").read().decode()
data = json.loads(raw)
orders = data.get("active_orders") or []
cj = [o for o in orders if "CJ" in (o.get("symbol_code") or "").upper()]
print("active_orders", len(orders), "cj_orders", len(cj))
if cj:
print("cj detail", cj)
else:
print("VERIFY PASS")
"""
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("192.168.8.21", username="root", password="woaini88", timeout=15)
sftp = c.open_sftp()
with sftp.open("/tmp/verify_pending_fix.py", "w") as f:
f.write(VERIFY)
sftp.close()
_, stdout, stderr = c.exec_command("/opt/qihuo/venv/bin/python /tmp/verify_pending_fix.py")
print(stdout.read().decode("utf-8", errors="replace"))
print(stderr.read().decode("utf-8", errors="replace"))
c.close()