diff --git a/install_trading.py b/install_trading.py index 7abf243..3de840a 100644 --- a/install_trading.py +++ b/install_trading.py @@ -41,6 +41,7 @@ from ctp_fee_worker import start_ctp_fee_worker from order_pending import ( cancel_pending_monitor, pending_auto_cancel_remaining, + pending_monitor_has_live_order, reconcile_pending_orders, ) from db_conn import execute_retry @@ -1305,11 +1306,23 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se except Exception as exc: logger.warning("compose ctp order row failed: %s", exc) + ctp_active_map: dict[str, dict] = {} + for o in ctp_orders or []: + for key in (o.get("order_id"), o.get("vt_order_id")): + if key: + ctp_active_map[str(key)] = o + for r in conn.execute( "SELECT * FROM trade_order_monitors WHERE status='pending' ORDER BY id DESC" ).fetchall(): mon = dict(r) try: + if not pending_monitor_has_live_order( + mon, + active_orders=ctp_active_map, + active_order_list=ctp_orders or [], + ): + continue prow = _compose_pending_row( mon, mode=mode, capital=capital, now_iso=now_iso, ) @@ -2207,6 +2220,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "SELECT status FROM trade_order_monitors WHERE id=?", (mid,), ).fetchone() filled = st_row and (st_row["status"] or "").strip().lower() == "active" + rejected = st_row and (st_row["status"] or "").strip().lower() == "closed" + if rejected: + conn.commit() + conn.close() + _push_position_snapshot_async(fast=False) + return jsonify({ + "ok": False, + "error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)", + "lots": lots, + "filled": False, + }), 400 if not filled: try: get_bridge().refresh_positions() @@ -2217,6 +2241,17 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "SELECT status FROM trade_order_monitors WHERE id=?", (mid,), ).fetchone() filled = st_row and (st_row["status"] or "").strip().lower() == "active" + rejected = st_row and (st_row["status"] or "").strip().lower() == "closed" + if rejected: + conn.commit() + conn.close() + _push_position_snapshot_async(fast=False) + return jsonify({ + "ok": False, + "error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)", + "lots": lots, + "filled": False, + }), 400 if filled: _sync_monitor_from_ctp( conn, mid, sym, direction, mode, capital=_capital(conn), diff --git a/order_pending.py b/order_pending.py index 949a9c5..4f703fc 100644 --- a/order_pending.py +++ b/order_pending.py @@ -19,6 +19,33 @@ logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") DEFAULT_PENDING_ORDER_TIMEOUT_SEC = 300 +# 报单刚提交后短暂等待 CTP 回报,避免误判为拒单 +PENDING_ORDER_SETTLE_GRACE_SEC = 8 + + +def pending_monitor_has_live_order( + mon: dict, + *, + active_orders: dict[str, dict], + active_order_list: list[dict], + match_fn: Callable[[str, str], bool] | None = None, +) -> bool: + """本地 pending 是否仍对应 CTP 柜台上的有效开仓委托。""" + match = match_fn or _match_symbol + sym = mon.get("symbol") or "" + direction = mon.get("direction") or "long" + vt_oid = (mon.get("vt_order_id") or "").strip() + age = pending_age_sec(mon) + + if vt_oid and _vt_order_in_active(vt_oid, active_orders): + return True + if _symbol_open_order_active(active_order_list, sym, direction, match): + return True + if not vt_oid and age < PENDING_ORDER_SETTLE_GRACE_SEC: + return True + if vt_oid and age < PENDING_ORDER_SETTLE_GRACE_SEC: + return True + return False def parse_monitor_ts(raw: str) -> Optional[float]: @@ -183,8 +210,8 @@ def reconcile_pending_orders( continue live_open = _symbol_open_order_active(active_order_list, sym, direction, match) - if live_open or (vt_oid and age < limit_sec): - if live_open and age >= limit_sec and is_trading_session(): + if live_open: + if age >= limit_sec and is_trading_session(): cancel_oid = ( vt_oid or live_open.get("vt_order_id") @@ -199,6 +226,21 @@ def reconcile_pending_orders( stats["cancelled"] += 1 continue + # 有委托号但已不在 CTP 活跃列表且无持仓 → 拒单/已撤/终态 + if vt_oid: + if age < PENDING_ORDER_SETTLE_GRACE_SEC: + continue + conn.execute( + "UPDATE trade_order_monitors SET status='closed' WHERE id=?", + (mid,), + ) + stats["closed"] += 1 + logger.info( + "pending monitor=%s order=%s closed (no longer active on CTP)", + mid, vt_oid, + ) + continue + if age >= limit_sec: if vt_oid and is_trading_session(): if ctp_cancel_order(mode, vt_oid): diff --git a/scripts/deploy_pending_fix.py b/scripts/deploy_pending_fix.py new file mode 100644 index 0000000..6f1896c --- /dev/null +++ b/scripts/deploy_pending_fix.py @@ -0,0 +1,95 @@ +"""Deploy pending order reconcile fix and verify stale monitors cleared.""" +import paramiko +import sys +import textwrap +from pathlib import Path + +sys.stdout.reconfigure(encoding="utf-8", errors="replace") +root = Path(__file__).resolve().parents[1] + +FILES = [ + "order_pending.py", + "install_trading.py", +] + +VERIFY = textwrap.dedent( + ''' + import sqlite3, urllib.request, urllib.parse, http.cookiejar, json + + conn = sqlite3.connect("/opt/qihuo/data.db") + conn.row_factory = sqlite3.Row + pending = conn.execute( + "SELECT id, symbol, direction, vt_order_id, status FROM trade_order_monitors WHERE status='pending'" + ).fetchall() + print("pending_monitors", len(pending)) + for r in pending: + print(dict(r)) + + jar = http.cookiejar.CookieJar() + op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar)) + user = pwd = "" + for line in open("/opt/qihuo/.env"): + line = line.strip() + if line.startswith("ADMIN_USERNAME="): + user = line.split("=", 1)[1].strip().strip('"').strip("'") + if line.startswith("ADMIN_PASSWORD="): + pwd = line.split("=", 1)[1].strip().strip('"').strip("'") + op.open(urllib.request.Request( + "http://127.0.0.1:6600/login", + urllib.parse.urlencode({"username": user, "password": pwd}).encode(), + )) + raw = op.open("http://127.0.0.1:6600/api/trading/live").read().decode() + data = json.loads(raw) + orders = data.get("active_orders") or [] + cj = [o for o in orders if "CJ" in (o.get("symbol_code") or "").upper()] + print("active_orders", len(orders), "cj_orders", len(cj)) + ok = True + if cj: + ok = False + print("FAIL still showing CJ pending in active_orders", cj) + if ok: + print("VERIFY PASS") + ''' +) + + +def main() -> None: + c = paramiko.SSHClient() + c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + c.connect("192.168.8.21", username="root", password="woaini88", timeout=15) + sftp = c.open_sftp() + for rel in FILES: + local = root / rel + remote = f"/opt/qihuo/{rel.replace(chr(92), '/')}" + sftp.put(str(local), remote) + print("uploaded", rel) + sftp.close() + + for cmd in ( + "cd /opt/qihuo && pm2 restart qihuo", + "sleep 4", + ): + print(">>>", cmd) + _, stdout, stderr = c.exec_command(cmd) + out = stdout.read().decode("utf-8", errors="replace") + err = stderr.read().decode("utf-8", errors="replace") + if out.strip(): + print(out.strip()) + if err.strip(): + print(err.strip()) + + sftp = c.open_sftp() + with sftp.open("/tmp/verify_pending_fix.py", "w") as f: + f.write(VERIFY) + sftp.close() + + _, stdout, stderr = c.exec_command("/opt/qihuo/venv/bin/python /tmp/verify_pending_fix.py") + print(stdout.read().decode("utf-8", errors="replace")) + err = stderr.read().decode("utf-8", errors="replace") + if err.strip(): + print(err.strip()) + c.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/verify_pending_server.py b/scripts/verify_pending_server.py new file mode 100644 index 0000000..f443441 --- /dev/null +++ b/scripts/verify_pending_server.py @@ -0,0 +1,55 @@ +import paramiko +import sys + +sys.stdout.reconfigure(encoding="utf-8", errors="replace") + +VERIFY = r""" +import sqlite3, urllib.request, urllib.parse, http.cookiejar, json, os + +db = "/opt/qihuo/futures.db" +print("db", db) + +conn = sqlite3.connect(db) +conn.row_factory = sqlite3.Row +pending = conn.execute( + "SELECT id, symbol, direction, vt_order_id, status FROM trade_order_monitors WHERE status='pending'" +).fetchall() +print("pending_monitors", len(pending)) +for r in pending: + print(dict(r)) + +jar = http.cookiejar.CookieJar() +op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar)) +user = pwd = "" +for line in open("/opt/qihuo/.env"): + line = line.strip() + if line.startswith("ADMIN_USERNAME="): + user = line.split("=", 1)[1].strip().strip('"').strip("'") + if line.startswith("ADMIN_PASSWORD="): + pwd = line.split("=", 1)[1].strip().strip('"').strip("'") +op.open(urllib.request.Request( + "http://127.0.0.1:6600/login", + urllib.parse.urlencode({"username": user, "password": pwd}).encode(), +)) +raw = op.open("http://127.0.0.1:6600/api/trading/live").read().decode() +data = json.loads(raw) +orders = data.get("active_orders") or [] +cj = [o for o in orders if "CJ" in (o.get("symbol_code") or "").upper()] +print("active_orders", len(orders), "cj_orders", len(cj)) +if cj: + print("cj detail", cj) +else: + print("VERIFY PASS") +""" + +c = paramiko.SSHClient() +c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) +c.connect("192.168.8.21", username="root", password="woaini88", timeout=15) +sftp = c.open_sftp() +with sftp.open("/tmp/verify_pending_fix.py", "w") as f: + f.write(VERIFY) +sftp.close() +_, stdout, stderr = c.exec_command("/opt/qihuo/venv/bin/python /tmp/verify_pending_fix.py") +print(stdout.read().decode("utf-8", errors="replace")) +print(stderr.read().decode("utf-8", errors="replace")) +c.close()