Fix roll first-lots display and make market add use pending orders.

Store initial_lots on roll groups, submit market roll as CTP pending legs with cancel closing empty groups, and backfill first-lots for existing active groups.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-02 22:12:04 +08:00
parent 3c53b2063f
commit 530738ae93
5 changed files with 392 additions and 105 deletions
+355 -95
View File
@@ -93,6 +93,7 @@ from strategy.strategy_roll_lib import (
from strategy.strategy_roll_monitor_lib import (
cancel_roll_leg,
check_roll_monitors,
close_roll_group_if_idle,
roll_sync_after_external_close,
)
from strategy.strategy_snapshot_lib import list_snapshots, save_snapshot
@@ -1802,7 +1803,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
}
def _reconcile_pending(conn, mode: str, *, capital: float = 0.0) -> dict[str, int]:
return reconcile_pending_orders(
stats = reconcile_pending_orders(
conn,
mode,
match_symbol_fn=_match_ctp_symbol,
@@ -1811,6 +1812,310 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
list_positions_fn=_ctp_positions,
timeout_sec=get_pending_order_timeout_sec(get_setting),
)
roll_stats = _reconcile_roll_market_orders(conn, mode, capital=capital)
if roll_stats:
stats = {**stats, **{f"roll_{k}": v for k, v in roll_stats.items()}}
return stats
def _roll_insert_group(
conn,
*,
mon: dict,
preview: dict,
initial_lots: int,
leg_count: int = 0,
) -> int:
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
risk_budget = float(preview.get("risk_budget") or get_fixed_amount(get_setting))
cur = conn.execute(
"""INSERT INTO roll_groups (
order_monitor_id, symbol, direction, initial_take_profit, initial_stop_loss,
current_stop_loss, risk_percent, leg_count, initial_lots, status, created_at, updated_at
) VALUES (?,?,?,?,?,?,?,?,?,'active',?,?) RETURNING id""",
(
int(mon["id"]), mon["symbol"], mon["direction"], mon["take_profit"], mon["stop_loss"],
preview["new_stop_loss"], risk_budget, int(leg_count), int(initial_lots), now, now,
),
)
row = cur.fetchone()
return int(row["id"] if isinstance(row, dict) else row[0])
def _finalize_roll_fill(
conn,
*,
mon: dict,
preview: dict,
add_mode: str,
pending_leg_id: Optional[int] = None,
fill_price: Optional[float] = None,
) -> tuple[bool, str]:
sym = mon["symbol"]
mon_id = int(mon["id"])
mode = get_trading_mode(get_setting)
price = float(fill_price if fill_price is not None else preview["add_price"])
new_lots = int(mon["lots"]) + int(preview["add_lots"])
new_avg = preview["avg_entry_after"]
new_sl = preview["new_stop_loss"]
conn.execute(
"UPDATE trade_order_monitors SET lots=?, entry_price=?, stop_loss=? WHERE id=?",
(new_lots, new_avg, new_sl, mon_id),
)
grp = _roll_group_for_monitor(conn, mon_id)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if grp:
gid = int(grp["id"])
leg_n = int(grp["leg_count"] or 0) + 1
conn.execute(
"UPDATE roll_groups SET leg_count=?, current_stop_loss=?, updated_at=? WHERE id=?",
(leg_n, new_sl, now, gid),
)
else:
gid = _roll_insert_group(
conn,
mon=mon,
preview=preview,
initial_lots=max(1, int(mon["lots"]) - int(preview["add_lots"])),
leg_count=1,
)
leg_n = 1
if pending_leg_id:
conn.execute(
"""UPDATE roll_legs SET status=?, fill_price=?, lots=?, new_stop_loss=?, created_at=?
WHERE id=?""",
(
LEG_STATUS_FILLED, price, int(preview["add_lots"]), new_sl, now,
int(pending_leg_id),
),
)
else:
conn.execute(
"""INSERT INTO roll_legs (
roll_group_id, leg_index, add_mode, fill_price, lots, new_stop_loss,
status, created_at, limit_price, breakthrough_price, last_mark_price, capital_snapshot
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
(
gid, leg_n, add_mode, price, int(preview["add_lots"]), new_sl,
LEG_STATUS_FILLED, now,
preview.get("limit_price"), preview.get("breakthrough_price"),
preview.get("mark_price"), _capital(conn),
),
)
conn.commit()
send_wechat_msg(
f"滚仓成交 {sym} {add_mode_label(add_mode)} +{preview['add_lots']}"
f"新止损 {new_sl} 合计 {new_lots}"
)
_schedule_roll_entry_sync(mon_id, sym, mon["direction"], mode)
return True, "成交"
def _commit_roll_fill(
conn,
*,
mon: dict,
preview: dict,
add_mode: str,
mode: str,
pending_leg_id: Optional[int] = None,
) -> tuple[bool, str]:
sym = mon["symbol"]
price = float(preview["add_price"])
try:
execute_order(
conn, mode=mode, offset="open", symbol=sym,
direction=mon["direction"], lots=int(preview["add_lots"]), price=price,
settings=_settings_dict(),
order_type="market",
)
except ValueError as exc:
return False, str(exc)
return _finalize_roll_fill(
conn,
mon=mon,
preview=preview,
add_mode=add_mode,
pending_leg_id=pending_leg_id,
fill_price=price,
)
def _submit_roll_market_order(
conn,
*,
mon: dict,
preview: dict,
mode: str,
) -> tuple[bool, str]:
mon_id = int(mon["id"])
initial_lots = int(mon.get("lots") or 0)
if initial_lots <= 0:
return False, "持仓手数为 0"
grp = _roll_group_for_monitor(conn, mon_id)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
if grp:
gid = int(grp["id"])
else:
gid = _roll_insert_group(
conn, mon=mon, preview=preview, initial_lots=initial_lots, leg_count=0,
)
pending_n = conn.execute(
"SELECT COUNT(*) AS n FROM roll_legs WHERE roll_group_id=? AND status=?",
(gid, LEG_STATUS_PENDING),
).fetchone()["n"]
if int(pending_n or 0) > 0:
return False, "已有监控中的加仓腿"
try:
result = execute_order(
conn,
mode=mode,
offset="open",
symbol=mon["symbol"],
direction=mon["direction"],
lots=int(preview["add_lots"]),
price=float(preview["add_price"]),
settings=_settings_dict(),
order_type="market",
)
vt_order_id = str(result.get("order_id") or "")
except ValueError as exc:
close_roll_group_if_idle(conn, gid)
return False, str(exc)
leg_n = int(conn.execute(
"SELECT COUNT(*) AS n FROM roll_legs WHERE roll_group_id=? AND status=?",
(gid, LEG_STATUS_FILLED),
).fetchone()["n"]) + 1
conn.execute(
"""INSERT INTO roll_legs (
roll_group_id, leg_index, add_mode, lots, new_stop_loss, status, created_at,
limit_price, breakthrough_price, last_mark_price, capital_snapshot, vt_order_id
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
(
gid, leg_n, ADD_MODE_MARKET, int(preview["add_lots"]), preview["new_stop_loss"],
LEG_STATUS_PENDING, now,
preview.get("limit_price"), preview.get("breakthrough_price"),
preview.get("mark_price"), _capital(conn), vt_order_id or None,
),
)
conn.commit()
_reconcile_roll_market_orders(conn, mode, capital=_capital(conn))
return True, "市价加仓委托已提交,成交后自动更新;撤单后滚仓组将消失"
def _reconcile_roll_market_orders(
conn, mode: str, *, capital: float = 0.0,
) -> dict[str, int]:
"""同步市价滚仓 pending 腿:成交→入账;撤单/拒单→删除腿并关闭空组。"""
from modules.trading.order_pending import PENDING_ORDER_SETTLE_GRACE_SEC
stats = {"filled": 0, "cancelled": 0}
if not ctp_status(mode).get("connected"):
return stats
rows = conn.execute(
"""SELECT l.*, g.order_monitor_id, g.symbol, g.direction, g.initial_take_profit,
g.leg_count AS group_leg_count,
m.lots AS mon_lots, m.entry_price AS mon_entry, m.take_profit AS mon_tp,
m.status AS mon_status
FROM roll_legs l
JOIN roll_groups g ON g.id = l.roll_group_id
JOIN trade_order_monitors m ON m.id = g.order_monitor_id
WHERE l.status=? AND l.add_mode=? AND g.status='active'""",
(LEG_STATUS_PENDING, ADD_MODE_MARKET),
).fetchall()
if not rows:
return stats
positions = list(_ctp_positions(mode, refresh_if_empty=False) or [])
try:
active_orders = {
str(k): o
for o in (ctp_list_active_orders(mode) or [])
for k in (o.get("order_id"), o.get("vt_order_id"))
if k
}
except Exception:
active_orders = {}
now_ts = time.time()
for raw in rows:
leg = dict(raw)
if (leg.get("mon_status") or "").strip().lower() != "active":
continue
sym = (leg.get("symbol") or "").strip()
direction = (leg.get("direction") or "long").strip().lower()
mon = {
"id": leg["order_monitor_id"],
"symbol": sym,
"direction": direction,
"lots": leg["mon_lots"],
"entry_price": leg["mon_entry"],
"take_profit": leg["mon_tp"] or leg["initial_take_profit"],
"stop_loss": leg.get("new_stop_loss"),
}
leg_lots = int(leg.get("lots") or 0)
mon_lots = int(leg.get("mon_lots") or 0)
pos = None
for p in positions:
if int(p.get("lots") or 0) <= 0:
continue
if (p.get("direction") or "long").strip().lower() != direction:
continue
if _match_ctp_symbol(p.get("symbol") or "", sym):
pos = p
break
pos_lots = int(pos.get("lots") or 0) if pos else mon_lots
vt_oid = (leg.get("vt_order_id") or "").strip()
if pos_lots >= mon_lots + leg_lots and leg_lots > 0:
mark = _roll_mark_price(sym, mon, mode, allow_ctp=True)
preview, err = preview_roll(
direction=direction,
symbol=sym,
qty_existing=float(mon_lots),
entry_existing=float(mon.get("entry_price") or 0),
initial_take_profit=float(mon.get("take_profit") or 0),
add_mode=ADD_MODE_MARKET,
new_stop_loss=float(leg.get("new_stop_loss") or 0),
risk_budget=float(leg.get("capital_snapshot") or get_fixed_amount(get_setting)),
mult=int(get_contract_spec(sym).get("mult") or 1),
mark_price=mark,
add_price=mark,
legs_done=int(leg.get("group_leg_count") or 0),
)
if err or not preview:
continue
preview["add_lots"] = leg_lots
ok, _ = _finalize_roll_fill(
conn,
mon=mon,
preview=preview,
add_mode=ADD_MODE_MARKET,
pending_leg_id=int(leg["id"]),
fill_price=mark,
)
if ok:
stats["filled"] += 1
continue
if vt_oid and vt_oid in active_orders:
continue
created = (leg.get("created_at") or "").strip()
age = 9999.0
if created:
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
age = now_ts - datetime.strptime(created[:19], fmt).timestamp()
break
except ValueError:
continue
if vt_oid and age < PENDING_ORDER_SETTLE_GRACE_SEC:
continue
if vt_oid:
try:
ctp_cancel_order(mode, vt_oid)
except Exception:
pass
conn.execute(
"UPDATE roll_legs SET status=? WHERE id=?",
(LEG_STATUS_CANCELLED, int(leg["id"])),
)
close_roll_group_if_idle(conn, int(leg["roll_group_id"]))
stats["cancelled"] += 1
if stats["filled"] or stats["cancelled"]:
conn.commit()
return stats
def _build_active_orders(
conn,
@@ -3043,7 +3348,13 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
gid = int(out.get("id") or 0)
filled_add_lots = int(filled_map.get(gid) or 0)
out["add_lots_filled"] = filled_add_lots
out["first_lots"] = max(0, int(lots) - filled_add_lots)
stored_initial = int(out.get("initial_lots") or 0)
if stored_initial > 0:
out["first_lots"] = stored_initial
elif int(lots) > 0 and filled_add_lots > 0 and int(lots) <= filled_add_lots:
out["first_lots"] = int(lots)
else:
out["first_lots"] = max(0, int(lots) - filled_add_lots)
out["total_lots"] = int(lots)
out["avg_entry"] = round(entry, 4) if entry > 0 else None
if lots > 0 and entry > 0 and tp > 0:
@@ -3239,6 +3550,27 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
capital = _capital(conn)
mode = get_trading_mode(get_setting)
_ensure_strategy_monitors(conn, mode)
for grow in conn.execute(
"SELECT id, order_monitor_id, initial_lots FROM roll_groups WHERE status='active'"
).fetchall():
gfix = dict(grow)
if int(gfix.get("initial_lots") or 0) > 0:
continue
mon_row = conn.execute(
"SELECT lots FROM trade_order_monitors WHERE id=?",
(int(gfix["order_monitor_id"]),),
).fetchone()
if not mon_row:
continue
gid = int(gfix["id"])
filled = _roll_filled_lots_map(conn, [gid]).get(gid, 0)
lots = int(mon_row["lots"] or 0)
initial = lots if lots > 0 and filled > 0 and lots <= filled else max(1, lots - filled)
conn.execute(
"UPDATE roll_groups SET initial_lots=? WHERE id=?",
(initial, gid),
)
commit_retry(conn)
active_trend = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC LIMIT 1"
).fetchone()
@@ -4085,87 +4417,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
return None, merr
return preview, None
def _commit_roll_fill(
conn,
*,
mon: dict,
preview: dict,
add_mode: str,
mode: str,
pending_leg_id: Optional[int] = None,
) -> tuple[bool, str]:
sym = mon["symbol"]
mon_id = int(mon["id"])
price = float(preview["add_price"])
try:
execute_order(
conn, mode=mode, offset="open", symbol=sym,
direction=mon["direction"], lots=int(preview["add_lots"]), price=price,
settings=_settings_dict(),
)
except ValueError as exc:
return False, str(exc)
new_lots = int(mon["lots"]) + int(preview["add_lots"])
new_avg = preview["avg_entry_after"]
new_sl = preview["new_stop_loss"]
conn.execute(
"UPDATE trade_order_monitors SET lots=?, entry_price=?, stop_loss=? WHERE id=?",
(new_lots, new_avg, new_sl, mon_id),
)
grp = _roll_group_for_monitor(conn, mon_id)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
risk_budget = float(preview.get("risk_budget") or get_fixed_amount(get_setting))
if grp:
gid = int(grp["id"])
leg_n = int(grp["leg_count"] or 0) + 1
conn.execute(
"UPDATE roll_groups SET leg_count=?, current_stop_loss=?, updated_at=? WHERE id=?",
(leg_n, new_sl, now, gid),
)
else:
cur = conn.execute(
"""INSERT INTO roll_groups (
order_monitor_id, symbol, direction, initial_take_profit, initial_stop_loss,
current_stop_loss, risk_percent, leg_count, status, created_at, updated_at
) VALUES (?,?,?,?,?,?,?,1,'active',?,?) RETURNING id""",
(
mon_id, sym, mon["direction"], mon["take_profit"], mon["stop_loss"],
new_sl, risk_budget, now, now,
),
)
row = cur.fetchone()
gid = int(row["id"] if isinstance(row, dict) else row[0])
leg_n = 1
if pending_leg_id:
conn.execute(
"""UPDATE roll_legs SET status=?, fill_price=?, lots=?, new_stop_loss=?, created_at=?
WHERE id=?""",
(
LEG_STATUS_FILLED, price, int(preview["add_lots"]), new_sl, now,
int(pending_leg_id),
),
)
else:
conn.execute(
"""INSERT INTO roll_legs (
roll_group_id, leg_index, add_mode, fill_price, lots, new_stop_loss,
status, created_at, limit_price, breakthrough_price, last_mark_price, capital_snapshot
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
(
gid, leg_n, add_mode, price, int(preview["add_lots"]), new_sl,
LEG_STATUS_FILLED, now,
preview.get("limit_price"), preview.get("breakthrough_price"),
preview.get("mark_price"), _capital(conn),
),
)
conn.commit()
send_wechat_msg(
f"滚仓成交 {sym} {add_mode_label(add_mode)} +{preview['add_lots']}"
f"新止损 {new_sl} 合计 {new_lots}"
)
_schedule_roll_entry_sync(mon_id, sym, mon["direction"], mode)
return True, "成交"
def _schedule_roll_entry_sync(
mon_id: int, sym: str, direction: str, mode: str,
) -> None:
@@ -4209,25 +4460,16 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
add_mode: str,
) -> tuple[bool, str]:
mon_id = int(mon["id"])
initial_lots = int(mon.get("lots") or 0)
grp = _roll_group_for_monitor(conn, mon_id)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
capital = _capital(conn)
risk_budget = float(preview.get("risk_budget") or get_fixed_amount(get_setting))
if grp:
gid = int(grp["id"])
else:
cur = conn.execute(
"""INSERT INTO roll_groups (
order_monitor_id, symbol, direction, initial_take_profit, initial_stop_loss,
current_stop_loss, risk_percent, leg_count, status, created_at, updated_at
) VALUES (?,?,?,?,?,?,?,0,'active',?,?) RETURNING id""",
(
mon_id, mon["symbol"], mon["direction"], mon["take_profit"], mon["stop_loss"],
preview["new_stop_loss"], risk_budget, now, now,
),
gid = _roll_insert_group(
conn, mon=mon, preview=preview, initial_lots=initial_lots, leg_count=0,
)
row = cur.fetchone()
gid = int(row["id"] if isinstance(row, dict) else row[0])
leg_n = int(conn.execute(
"SELECT COUNT(*) AS n FROM roll_legs WHERE roll_group_id=? AND status=?",
(gid, LEG_STATUS_FILLED),
@@ -4403,6 +4645,12 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
if not _cached_ctp_status(mode).get("connected"):
conn.close()
return jsonify({"ok": False, "error": "请先连接 CTP"}), 400
if add_mode == ADD_MODE_MARKET:
ok, msg = _submit_roll_market_order(conn, mon=mon_d, preview=preview, mode=mode)
conn.close()
if not ok:
return jsonify({"ok": False, "error": msg}), 400
return jsonify({"ok": True, "message": msg, "pending": True})
ok, msg = _commit_roll_fill(
conn, mon=mon_d, preview=preview, add_mode=add_mode, mode=mode,
)
@@ -4416,6 +4664,18 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
def api_roll_cancel(leg_id: int):
conn = get_db()
init_strategy_tables(conn)
mode = get_trading_mode(get_setting)
leg = conn.execute(
"SELECT * FROM roll_legs WHERE id=? AND status=?",
(int(leg_id), LEG_STATUS_PENDING),
).fetchone()
if leg:
vt_oid = (dict(leg).get("vt_order_id") or "").strip()
if vt_oid and ctp_status(mode).get("connected"):
try:
ctp_cancel_order(mode, vt_oid)
except Exception as exc:
logger.debug("roll leg cancel order: %s", exc)
ok, msg = cancel_roll_leg(conn, leg_id)
if ok:
conn.commit()