Return order API immediately after CTP submit; finalize reconcile and notify in background.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-03 21:34:34 +08:00
parent 9508d88938
commit b888a670b6
+157 -113
View File
@@ -1829,6 +1829,117 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
stats = {**stats, **{f"roll_{k}": v for k, v in roll_stats.items()}} stats = {**stats, **{f"roll_{k}": v for k, v in roll_stats.items()}}
return stats return stats
def _quick_open_fill_hint(mode: str, sym: str, direction: str) -> bool:
"""内存持仓快速判断是否已成交(不查柜台委托列表)。"""
want = (direction or "long").strip().lower()
try:
for p in _ctp_positions(mode, refresh_if_empty=False, refresh_margin=False):
if int(p.get("lots") or 0) <= 0:
continue
if (p.get("direction") or "long").strip().lower() != want:
continue
if _match_ctp_symbol(p.get("symbol") or "", sym):
return True
except Exception:
pass
return False
def _schedule_open_order_finalize(
*,
mid: int,
mode: str,
sym: str,
direction: str,
lots: int,
price: float,
order_payload: dict,
result: dict,
) -> None:
"""报单后后台:对账、同步监控、微信通知(避免阻塞下单 HTTP)。"""
def _run() -> None:
try:
conn = get_db()
try:
init_strategy_tables(conn)
_sync_trade_monitors_with_ctp(conn, mode)
cap = _capital(conn)
_reconcile_pending(conn, mode, capital=cap)
st_row = conn.execute(
"SELECT status FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
status = (st_row["status"] or "").strip().lower() if st_row else ""
filled = status == "active"
rejected = status == "closed"
if filled:
_sync_monitor_from_ctp(
conn, mid, sym, direction, mode, capital=cap,
)
mon_row = conn.execute(
"SELECT * FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
if mon_row and (order_payload.get("stop_loss") or order_payload.get("take_profit")):
try:
ensure_monitor_order_columns(conn)
cancel_monitor_exit_orders(conn, dict(mon_row), mode=mode)
except Exception as exc:
logger.warning("清理旧版止盈止损挂单失败: %s", exc)
conn.commit()
from modules.core.db_conn import DB_PATH
from modules.notify.ai_worker import schedule_ai_event_analysis
from modules.trading.trade_notify import notify_manual_open_filled
if rejected:
return
if filled:
open_sl = (
float(order_payload.get("stop_loss") or 0)
if order_payload.get("stop_loss")
else None
)
open_tp = None if order_payload.get("trailing_be") else order_payload.get("take_profit")
if open_tp is not None:
try:
open_tp = float(open_tp)
except (TypeError, ValueError):
open_tp = None
codes = ths_to_codes(sym) or {}
if open_sl and open_sl > 0:
notify_manual_open_filled(
send_wechat=send_wechat_msg,
get_setting=get_setting,
mode_label=trading_mode_label(get_setting),
sym=sym,
symbol_name=codes.get("name") or sym,
direction=direction,
entry=price,
sl=open_sl,
tp=open_tp,
lots=lots,
capital=cap,
order_id=str(result.get("order_id") or ""),
trailing_be=bool(order_payload.get("trailing_be")),
be_tick_buffer=get_trailing_be_tick_buffer(get_setting),
schedule_ai_fn=schedule_ai_event_analysis,
db_path=DB_PATH,
)
else:
send_wechat_msg(
f"{trading_mode_label(get_setting)} 开仓 {sym} {direction} {lots}手 @{price}"
)
else:
send_wechat_msg(
f"委托已提交 · {sym} {direction} {lots}手挂单中"
f"{get_pending_order_timeout_sec(get_setting) // 60} 分钟未成交自动撤单)"
)
finally:
conn.close()
_push_position_snapshot_async(fast=True)
except Exception as exc:
logger.warning("open order finalize: %s", exc)
threading.Thread(target=_run, daemon=True, name="open-order-finalize").start()
def _roll_insert_group( def _roll_insert_group(
conn, conn,
*, *,
@@ -3801,6 +3912,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
return jsonify({"ok": False, "error": "手数或价格无效"}), 400 return jsonify({"ok": False, "error": "手数或价格无效"}), 400
order_type = (d.get("order_type") or d.get("price_type") or "limit").strip().lower() order_type = (d.get("order_type") or d.get("price_type") or "limit").strip().lower()
if order_type == "market" and price <= 0: if order_type == "market" and price <= 0:
mode_early = get_trading_mode(get_setting)
if ctp_status(mode_early).get("connected"):
mark = ctp_get_tick_price(mode_early, sym)
if mark and float(mark) > 0:
price = float(mark)
if price <= 0:
codes = ths_to_codes(sym) codes = ths_to_codes(sym)
price = fetch_price( price = fetch_price(
sym, sym,
@@ -3813,7 +3930,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
init_strategy_tables(conn) init_strategy_tables(conn)
mode = get_trading_mode(get_setting) mode = get_trading_mode(get_setting)
if offset.startswith("open"): if offset.startswith("open"):
_sync_trade_monitors_with_ctp(conn, mode)
if not is_trading_session(): if not is_trading_session():
conn.close() conn.close()
return jsonify({"ok": False, "error": "不在交易时间段"}), 403 return jsonify({"ok": False, "error": "不在交易时间段"}), 403
@@ -3861,7 +3977,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
lots = get_fixed_lots(get_setting) lots = get_fixed_lots(get_setting)
margin_pct = get_max_margin_pct(get_setting) margin_pct = get_max_margin_pct(get_setting)
usage = calc_margin_usage_pct( usage = calc_margin_usage_pct(
_ctp_positions(mode), _ctp_positions(mode, refresh_if_empty=False, refresh_margin=False),
_capital(conn), _capital(conn),
extra_symbol=sym if offset.startswith("open") else "", extra_symbol=sym if offset.startswith("open") else "",
extra_lots=lots if offset.startswith("open") else 0, extra_lots=lots if offset.startswith("open") else 0,
@@ -3919,126 +4035,54 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
order_price=price, order_price=price,
) )
conn.commit() conn.commit()
try: filled = _quick_open_fill_hint(mode, sym, direction)
with _ctp_td_lock: timeout_min = max(1, get_pending_order_timeout_sec(get_setting) // 60)
get_bridge().refresh_positions()
except Exception:
pass
_reconcile_pending(conn, mode, capital=_capital(conn))
st_row = conn.execute(
"SELECT status FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
filled = st_row and (st_row["status"] or "").strip().lower() == "active"
rejected = st_row and (st_row["status"] or "").strip().lower() == "closed"
if rejected:
conn.commit()
conn.close()
_push_position_snapshot_async(fast=False)
return jsonify({
"ok": False,
"error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)",
"lots": lots,
"filled": False,
}), 400
if not filled:
try:
get_bridge().refresh_positions()
except Exception:
pass
_reconcile_pending(conn, mode, capital=_capital(conn))
st_row = conn.execute(
"SELECT status FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
filled = st_row and (st_row["status"] or "").strip().lower() == "active"
rejected = st_row and (st_row["status"] or "").strip().lower() == "closed"
if rejected:
conn.commit()
conn.close()
_push_position_snapshot_async(fast=False)
return jsonify({
"ok": False,
"error": "委托已被柜台拒绝或撤销(请确认合约状态与交易时段)",
"lots": lots,
"filled": False,
}), 400
if filled:
_sync_monitor_from_ctp(
conn, mid, sym, direction, mode, capital=_capital(conn),
)
mon_row = conn.execute(
"SELECT * FROM trade_order_monitors WHERE id=?", (mid,),
).fetchone()
if mon_row and (sl or tp):
try:
ensure_monitor_order_columns(conn)
cancel_monitor_exit_orders(conn, dict(mon_row), mode=mode)
except Exception as exc:
logger.warning("清理旧版止盈止损挂单失败: %s", exc)
conn.commit()
_push_position_snapshot_async(fast=False)
msg = ( msg = (
f"开仓成功 · {lots}" f"开仓成功 · {lots}"
if filled if filled
else ( else f"委托已提交 · {lots} 手挂单中({timeout_min} 分钟未成交自动撤单)"
f"委托已提交 · {lots} 手挂单中"
f"{get_pending_order_timeout_sec(get_setting) // 60} 分钟未成交自动撤单)"
)
)
conn.commit()
if offset.startswith("open"):
from modules.core.db_conn import DB_PATH
from modules.notify.ai_worker import schedule_ai_event_analysis
from modules.trading.trade_notify import notify_manual_open_filled
if filled:
open_sl = float(d.get("stop_loss") or 0) if d.get("stop_loss") else None
open_tp = None if d.get("trailing_be") else d.get("take_profit")
if open_tp is not None:
try:
open_tp = float(open_tp)
except (TypeError, ValueError):
open_tp = None
codes = ths_to_codes(sym) or {}
if open_sl and open_sl > 0:
notify_manual_open_filled(
send_wechat=send_wechat_msg,
get_setting=get_setting,
mode_label=trading_mode_label(get_setting),
sym=sym,
symbol_name=codes.get("name") or sym,
direction=direction,
entry=price,
sl=open_sl,
tp=open_tp,
lots=lots,
capital=_capital(conn),
order_id=str(result.get("order_id") or ""),
trailing_be=bool(d.get("trailing_be")),
be_tick_buffer=get_trailing_be_tick_buffer(get_setting),
schedule_ai_fn=schedule_ai_event_analysis,
db_path=DB_PATH,
)
else:
send_wechat_msg(
f"{trading_mode_label(get_setting)} 开仓 {sym} {direction} {lots}手 @{price}"
)
elif not filled:
send_wechat_msg(
f"委托已提交 · {sym} {direction} {lots}手挂单中"
f"{get_pending_order_timeout_sec(get_setting) // 60} 分钟未成交自动撤单)"
)
elif not offset.startswith("open"):
send_wechat_msg(
f"{trading_mode_label(get_setting)} {offset} {sym} {direction} {lots}手 @{price}"
) )
conn.close() conn.close()
_push_position_snapshot_async(fast=False) _schedule_open_order_finalize(
mid=mid,
mode=mode,
sym=sym,
direction=direction,
lots=lots,
price=price,
order_payload=dict(d),
result=dict(result),
)
_push_position_snapshot_async(fast=True)
return jsonify({ return jsonify({
"ok": True, "ok": True,
"result": result, "result": result,
"lots": lots, "lots": lots,
"message": msg if offset.startswith("open") else "委托已提交柜台", "message": msg,
"filled": filled if offset.startswith("open") else None, "filled": filled,
})
conn.commit()
msg = "委托已提交柜台"
if not offset.startswith("open"):
mode_label = trading_mode_label(get_setting)
close_msg = f"{mode_label} {offset} {sym} {direction} {lots}手 @{price}"
def _notify_close() -> None:
try:
send_wechat_msg(close_msg)
except Exception as exc:
logger.debug("close wechat notify: %s", exc)
_push_position_snapshot_async(fast=True)
threading.Thread(target=_notify_close, daemon=True, name="close-notify").start()
conn.close()
_push_position_snapshot_async(fast=True)
return jsonify({
"ok": True,
"result": result,
"lots": lots,
"message": msg,
"filled": None,
}) })
except (ValueError, RuntimeError) as exc: except (ValueError, RuntimeError) as exc:
conn.close() conn.close()