diff --git a/modules/trading/install.py b/modules/trading/install.py index c9a4e73..0414944 100644 --- a/modules/trading/install.py +++ b/modules/trading/install.py @@ -1829,6 +1829,117 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se stats = {**stats, **{f"roll_{k}": v for k, v in roll_stats.items()}} return stats + def _quick_open_fill_hint(mode: str, sym: str, direction: str) -> bool: + """内存持仓快速判断是否已成交(不查柜台委托列表)。""" + want = (direction or "long").strip().lower() + try: + for p in _ctp_positions(mode, refresh_if_empty=False, refresh_margin=False): + if int(p.get("lots") or 0) <= 0: + continue + if (p.get("direction") or "long").strip().lower() != want: + continue + if _match_ctp_symbol(p.get("symbol") or "", sym): + return True + except Exception: + pass + return False + + def _schedule_open_order_finalize( + *, + mid: int, + mode: str, + sym: str, + direction: str, + lots: int, + price: float, + order_payload: dict, + result: dict, + ) -> None: + """报单后后台:对账、同步监控、微信通知(避免阻塞下单 HTTP)。""" + + def _run() -> None: + try: + conn = get_db() + try: + init_strategy_tables(conn) + _sync_trade_monitors_with_ctp(conn, mode) + cap = _capital(conn) + _reconcile_pending(conn, mode, capital=cap) + st_row = conn.execute( + "SELECT status FROM trade_order_monitors WHERE id=?", (mid,), + ).fetchone() + status = (st_row["status"] or "").strip().lower() if st_row else "" + filled = status == "active" + rejected = status == "closed" + if filled: + _sync_monitor_from_ctp( + conn, mid, sym, direction, mode, capital=cap, + ) + mon_row = conn.execute( + "SELECT * FROM trade_order_monitors WHERE id=?", (mid,), + ).fetchone() + if mon_row and (order_payload.get("stop_loss") or order_payload.get("take_profit")): + try: + ensure_monitor_order_columns(conn) + cancel_monitor_exit_orders(conn, dict(mon_row), mode=mode) + except Exception as exc: + logger.warning("清理旧版止盈止损挂单失败: %s", exc) + conn.commit() + from modules.core.db_conn import DB_PATH + from modules.notify.ai_worker import schedule_ai_event_analysis + from modules.trading.trade_notify import notify_manual_open_filled + + if rejected: + return + if filled: + open_sl = ( + float(order_payload.get("stop_loss") or 0) + if order_payload.get("stop_loss") + else None + ) + open_tp = None if order_payload.get("trailing_be") else order_payload.get("take_profit") + if open_tp is not None: + try: + open_tp = float(open_tp) + except (TypeError, ValueError): + open_tp = None + codes = ths_to_codes(sym) or {} + if open_sl and open_sl > 0: + notify_manual_open_filled( + send_wechat=send_wechat_msg, + get_setting=get_setting, + mode_label=trading_mode_label(get_setting), + sym=sym, + symbol_name=codes.get("name") or sym, + direction=direction, + entry=price, + sl=open_sl, + tp=open_tp, + lots=lots, + capital=cap, + order_id=str(result.get("order_id") or ""), + trailing_be=bool(order_payload.get("trailing_be")), + be_tick_buffer=get_trailing_be_tick_buffer(get_setting), + schedule_ai_fn=schedule_ai_event_analysis, + db_path=DB_PATH, + ) + else: + send_wechat_msg( + f"{trading_mode_label(get_setting)} 开仓 {sym} {direction} {lots}手 @{price}" + ) + else: + send_wechat_msg( + f"委托已提交 · {sym} {direction} {lots}手挂单中" + f"({get_pending_order_timeout_sec(get_setting) // 60} 分钟未成交自动撤单)" + ) + finally: + conn.close() + _push_position_snapshot_async(fast=True) + except Exception as exc: + logger.warning("open order finalize: %s", exc) + + threading.Thread(target=_run, daemon=True, name="open-order-finalize").start() + def _roll_insert_group( conn, *, @@ -3801,19 +3912,24 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return jsonify({"ok": False, "error": "手数或价格无效"}), 400 order_type = (d.get("order_type") or d.get("price_type") or "limit").strip().lower() if order_type == "market" and price <= 0: - codes = ths_to_codes(sym) - price = fetch_price( - sym, - codes.get("market_code", "") if codes else "", - codes.get("sina_code", "") if codes else "", - ) or 0 + mode_early = get_trading_mode(get_setting) + if ctp_status(mode_early).get("connected"): + mark = ctp_get_tick_price(mode_early, sym) + if mark and float(mark) > 0: + price = float(mark) + if price <= 0: + codes = ths_to_codes(sym) + price = fetch_price( + sym, + codes.get("market_code", "") if codes else "", + codes.get("sina_code", "") if codes else "", + ) or 0 if not sym or price <= 0: return jsonify({"ok": False, "error": "品种或价格无效"}), 400 conn = get_db() init_strategy_tables(conn) mode = get_trading_mode(get_setting) if offset.startswith("open"): - _sync_trade_monitors_with_ctp(conn, mode) if not is_trading_session(): conn.close() return jsonify({"ok": False, "error": "不在交易时间段"}), 403 @@ -3861,7 +3977,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se lots = get_fixed_lots(get_setting) margin_pct = get_max_margin_pct(get_setting) usage = calc_margin_usage_pct( - _ctp_positions(mode), + _ctp_positions(mode, refresh_if_empty=False, refresh_margin=False), _capital(conn), extra_symbol=sym if offset.startswith("open") else "", extra_lots=lots if offset.startswith("open") else 0, @@ -3919,126 +4035,54 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se order_price=price, ) conn.commit() - try: - with _ctp_td_lock: - get_bridge().refresh_positions() - except Exception: - pass - _reconcile_pending(conn, mode, capital=_capital(conn)) - st_row = conn.execute( - "SELECT status FROM trade_order_monitors WHERE id=?", (mid,), - ).fetchone() - filled = st_row and (st_row["status"] or "").strip().lower() == "active" - rejected = st_row and (st_row["status"] or "").strip().lower() == "closed" - if rejected: - conn.commit() - conn.close() - _push_position_snapshot_async(fast=False) - return jsonify({ - "ok": False, - "error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)", - "lots": lots, - "filled": False, - }), 400 - if not filled: - try: - get_bridge().refresh_positions() - except Exception: - pass - _reconcile_pending(conn, mode, capital=_capital(conn)) - st_row = conn.execute( - "SELECT status FROM trade_order_monitors WHERE id=?", (mid,), - ).fetchone() - filled = st_row and (st_row["status"] or "").strip().lower() == "active" - rejected = st_row and (st_row["status"] or "").strip().lower() == "closed" - if rejected: - conn.commit() - conn.close() - _push_position_snapshot_async(fast=False) - return jsonify({ - "ok": False, - "error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)", - "lots": lots, - "filled": False, - }), 400 - if filled: - _sync_monitor_from_ctp( - conn, mid, sym, direction, mode, capital=_capital(conn), - ) - mon_row = conn.execute( - "SELECT * FROM trade_order_monitors WHERE id=?", (mid,), - ).fetchone() - if mon_row and (sl or tp): - try: - ensure_monitor_order_columns(conn) - cancel_monitor_exit_orders(conn, dict(mon_row), mode=mode) - except Exception as exc: - logger.warning("清理旧版止盈止损挂单失败: %s", exc) - conn.commit() - _push_position_snapshot_async(fast=False) + filled = _quick_open_fill_hint(mode, sym, direction) + timeout_min = max(1, get_pending_order_timeout_sec(get_setting) // 60) msg = ( f"开仓成功 · {lots} 手" if filled - else ( - f"委托已提交 · {lots} 手挂单中" - f"({get_pending_order_timeout_sec(get_setting) // 60} 分钟未成交自动撤单)" - ) + else f"委托已提交 · {lots} 手挂单中({timeout_min} 分钟未成交自动撤单)" ) + conn.close() + _schedule_open_order_finalize( + mid=mid, + mode=mode, + sym=sym, + direction=direction, + lots=lots, + price=price, + order_payload=dict(d), + result=dict(result), + ) + _push_position_snapshot_async(fast=True) + return jsonify({ + "ok": True, + "result": result, + "lots": lots, + "message": msg, + "filled": filled, + }) conn.commit() - if offset.startswith("open"): - from modules.core.db_conn import DB_PATH - from modules.notify.ai_worker import schedule_ai_event_analysis - from modules.trading.trade_notify import notify_manual_open_filled + msg = "委托已提交柜台" + if not offset.startswith("open"): + mode_label = trading_mode_label(get_setting) + close_msg = f"{mode_label} {offset} {sym} {direction} {lots}手 @{price}" - if filled: - open_sl = float(d.get("stop_loss") or 0) if d.get("stop_loss") else None - open_tp = None if d.get("trailing_be") else d.get("take_profit") - if open_tp is not None: - try: - open_tp = float(open_tp) - except (TypeError, ValueError): - open_tp = None - codes = ths_to_codes(sym) or {} - if open_sl and open_sl > 0: - notify_manual_open_filled( - send_wechat=send_wechat_msg, - get_setting=get_setting, - mode_label=trading_mode_label(get_setting), - sym=sym, - symbol_name=codes.get("name") or sym, - direction=direction, - entry=price, - sl=open_sl, - tp=open_tp, - lots=lots, - capital=_capital(conn), - order_id=str(result.get("order_id") or ""), - trailing_be=bool(d.get("trailing_be")), - be_tick_buffer=get_trailing_be_tick_buffer(get_setting), - schedule_ai_fn=schedule_ai_event_analysis, - db_path=DB_PATH, - ) - else: - send_wechat_msg( - f"{trading_mode_label(get_setting)} 开仓 {sym} {direction} {lots}手 @{price}" - ) - elif not filled: - send_wechat_msg( - f"委托已提交 · {sym} {direction} {lots}手挂单中" - f"({get_pending_order_timeout_sec(get_setting) // 60} 分钟未成交自动撤单)" - ) - elif not offset.startswith("open"): - send_wechat_msg( - f"{trading_mode_label(get_setting)} {offset} {sym} {direction} {lots}手 @{price}" - ) + def _notify_close() -> None: + try: + send_wechat_msg(close_msg) + except Exception as exc: + logger.debug("close wechat notify: %s", exc) + _push_position_snapshot_async(fast=True) + + threading.Thread(target=_notify_close, daemon=True, name="close-notify").start() conn.close() - _push_position_snapshot_async(fast=False) + _push_position_snapshot_async(fast=True) return jsonify({ "ok": True, "result": result, "lots": lots, - "message": msg if offset.startswith("open") else "委托已提交柜台", - "filled": filled if offset.startswith("open") else None, + "message": msg, + "filled": None, }) except (ValueError, RuntimeError) as exc: conn.close()