Harden roll pending: never auto-delete on CTP snapshot gaps, repair orphans, fast monitor revive.
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+89
-24
@@ -3657,6 +3657,60 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
(now_s, gid),
|
(now_s, gid),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _account_has_margin_in_use(mode: str) -> bool:
|
||||||
|
if not ctp_status(mode).get("connected"):
|
||||||
|
return False
|
||||||
|
margin_raw = ctp_account_margin_used(mode)
|
||||||
|
return margin_raw is not None and float(margin_raw) > 0
|
||||||
|
|
||||||
|
def _roll_group_has_pending(conn, gid: int) -> bool:
|
||||||
|
return bool(conn.execute(
|
||||||
|
"SELECT 1 FROM roll_legs WHERE roll_group_id=? AND status=? LIMIT 1",
|
||||||
|
(int(gid), LEG_STATUS_PENDING),
|
||||||
|
).fetchone())
|
||||||
|
|
||||||
|
def _repair_orphan_roll_pending(conn) -> int:
|
||||||
|
"""pending 腿所在滚仓组被误关时恢复(避免「提交后消失」)。"""
|
||||||
|
rows = conn.execute(
|
||||||
|
"""SELECT l.id AS leg_id, l.roll_group_id, g.order_monitor_id, g.symbol, g.direction
|
||||||
|
FROM roll_legs l
|
||||||
|
JOIN roll_groups g ON g.id = l.roll_group_id
|
||||||
|
WHERE l.status=? AND g.status != 'active'
|
||||||
|
ORDER BY l.id DESC""",
|
||||||
|
(LEG_STATUS_PENDING,),
|
||||||
|
).fetchall()
|
||||||
|
fixed = 0
|
||||||
|
seen_monitors: set[int] = set()
|
||||||
|
for r in rows:
|
||||||
|
gid = int(r["roll_group_id"])
|
||||||
|
mid = int(r["order_monitor_id"] or 0)
|
||||||
|
if mid and mid in seen_monitors:
|
||||||
|
continue
|
||||||
|
if mid:
|
||||||
|
seen_monitors.add(mid)
|
||||||
|
sym = (r["symbol"] or "").strip()
|
||||||
|
direction = (r["direction"] or "long").strip().lower()
|
||||||
|
conn.execute(
|
||||||
|
"UPDATE roll_groups SET status='active', updated_at=? WHERE id=?",
|
||||||
|
(datetime.now().strftime("%Y-%m-%d %H:%M:%S"), gid),
|
||||||
|
)
|
||||||
|
if mid:
|
||||||
|
mon_row = conn.execute(
|
||||||
|
"SELECT status FROM trade_order_monitors WHERE id=?",
|
||||||
|
(mid,),
|
||||||
|
).fetchone()
|
||||||
|
if mon_row and (mon_row["status"] or "").strip().lower() != "active":
|
||||||
|
execute_retry(
|
||||||
|
conn,
|
||||||
|
"UPDATE trade_order_monitors SET status='active' WHERE id=?",
|
||||||
|
(mid,),
|
||||||
|
)
|
||||||
|
clear_close_pending(sym, direction)
|
||||||
|
fixed += 1
|
||||||
|
if fixed:
|
||||||
|
commit_retry(conn)
|
||||||
|
return fixed
|
||||||
|
|
||||||
def _ctp_has_open_position(mode: str, sym: str, direction: str) -> bool:
|
def _ctp_has_open_position(mode: str, sym: str, direction: str) -> bool:
|
||||||
"""柜台是否仍有该品种方向持仓(滚仓组是否应保留)。"""
|
"""柜台是否仍有该品种方向持仓(滚仓组是否应保留)。"""
|
||||||
direction = (direction or "long").strip().lower()
|
direction = (direction or "long").strip().lower()
|
||||||
@@ -3676,6 +3730,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
"""仅当持仓监控已结束且柜台无对应持仓时,才归档滚仓组。"""
|
"""仅当持仓监控已结束且柜台无对应持仓时,才归档滚仓组。"""
|
||||||
mode = get_trading_mode(get_setting)
|
mode = get_trading_mode(get_setting)
|
||||||
_revive_monitors_for_open_ctp(conn, mode)
|
_revive_monitors_for_open_ctp(conn, mode)
|
||||||
|
_repair_orphan_roll_pending(conn)
|
||||||
rows = conn.execute(
|
rows = conn.execute(
|
||||||
"""SELECT g.*, m.status AS monitor_status
|
"""SELECT g.*, m.status AS monitor_status
|
||||||
FROM roll_groups g
|
FROM roll_groups g
|
||||||
@@ -3697,6 +3752,19 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
).fetchone()
|
).fetchone()
|
||||||
if fresh and (fresh["status"] or "").strip().lower() == "active":
|
if fresh and (fresh["status"] or "").strip().lower() == "active":
|
||||||
continue
|
continue
|
||||||
|
if _roll_group_has_pending(conn, gid):
|
||||||
|
if (
|
||||||
|
_ctp_has_open_position(mode, sym, direction)
|
||||||
|
or _account_has_margin_in_use(mode)
|
||||||
|
):
|
||||||
|
if mid:
|
||||||
|
_revive_closed_monitor(conn, sym, direction)
|
||||||
|
continue
|
||||||
|
logger.debug(
|
||||||
|
"keep pending roll group #%s despite inactive monitor",
|
||||||
|
gid,
|
||||||
|
)
|
||||||
|
continue
|
||||||
if _ctp_has_open_position(mode, sym, direction):
|
if _ctp_has_open_position(mode, sym, direction):
|
||||||
if mid:
|
if mid:
|
||||||
_revive_closed_monitor(conn, sym, direction)
|
_revive_closed_monitor(conn, sym, direction)
|
||||||
@@ -3705,6 +3773,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
gid, sym, direction,
|
gid, sym, direction,
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
if _account_has_margin_in_use(mode):
|
||||||
|
if mid:
|
||||||
|
_revive_closed_monitor(conn, sym, direction)
|
||||||
|
continue
|
||||||
_archive_roll_group(conn, grp, result_label="持仓已结束")
|
_archive_roll_group(conn, grp, result_label="持仓已结束")
|
||||||
closed += 1
|
closed += 1
|
||||||
if closed:
|
if closed:
|
||||||
@@ -3825,6 +3897,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
mode = get_trading_mode(get_setting)
|
mode = get_trading_mode(get_setting)
|
||||||
if ctp_status(mode).get("connected"):
|
if ctp_status(mode).get("connected"):
|
||||||
_revive_monitors_for_open_ctp(conn, mode)
|
_revive_monitors_for_open_ctp(conn, mode)
|
||||||
|
_repair_orphan_roll_pending(conn)
|
||||||
_revive_roll_monitors_light(conn)
|
_revive_roll_monitors_light(conn)
|
||||||
need_initial = conn.execute(
|
need_initial = conn.execute(
|
||||||
"SELECT id FROM roll_groups WHERE status='active' AND COALESCE(initial_lots, 0)=0 LIMIT 1"
|
"SELECT id FROM roll_groups WHERE status='active' AND COALESCE(initial_lots, 0)=0 LIMIT 1"
|
||||||
@@ -4540,39 +4613,30 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
mon = dict(row)
|
mon = dict(row)
|
||||||
if (mon.get("status") or "").strip().lower() == "active":
|
if (mon.get("status") or "").strip().lower() == "active":
|
||||||
return mon
|
return mon
|
||||||
mode = get_trading_mode(get_setting)
|
if int(mon.get("lots") or 0) <= 0:
|
||||||
if not _cached_ctp_status(mode).get("connected"):
|
|
||||||
return None
|
return None
|
||||||
|
mode = get_trading_mode(get_setting)
|
||||||
sym = (mon.get("symbol") or "").strip()
|
sym = (mon.get("symbol") or "").strip()
|
||||||
direction = (mon.get("direction") or "long").strip().lower()
|
direction = (mon.get("direction") or "long").strip().lower()
|
||||||
for p in _positions_for_monitor_restore(mode, allow_ctp=False):
|
still_holding = False
|
||||||
if int(p.get("lots") or 0) <= 0:
|
if ctp_status(mode).get("connected"):
|
||||||
continue
|
still_holding = (
|
||||||
if (p.get("direction") or "long").strip().lower() != direction:
|
_ctp_has_open_position(mode, sym, direction)
|
||||||
continue
|
or _account_has_margin_in_use(mode)
|
||||||
if not _match_ctp_symbol(p.get("symbol") or "", sym):
|
)
|
||||||
continue
|
if not still_holding:
|
||||||
|
revived = _revive_closed_monitor(conn, sym, direction)
|
||||||
|
if revived and int(revived.get("id") or 0) == int(mon_id):
|
||||||
|
return revived
|
||||||
|
return None
|
||||||
execute_retry(
|
execute_retry(
|
||||||
conn,
|
conn,
|
||||||
"UPDATE trade_order_monitors SET status='active' WHERE id=?",
|
"UPDATE trade_order_monitors SET status='active' WHERE id=?",
|
||||||
(int(mon_id),),
|
(int(mon_id),),
|
||||||
)
|
)
|
||||||
|
clear_close_pending(sym, direction)
|
||||||
mon["status"] = "active"
|
mon["status"] = "active"
|
||||||
_sync_monitor_from_ctp(
|
return mon
|
||||||
conn,
|
|
||||||
int(mon_id),
|
|
||||||
sym,
|
|
||||||
direction,
|
|
||||||
mode,
|
|
||||||
ctp=p,
|
|
||||||
capital=_capital(conn),
|
|
||||||
)
|
|
||||||
fresh = conn.execute(
|
|
||||||
"SELECT * FROM trade_order_monitors WHERE id=?",
|
|
||||||
(int(mon_id),),
|
|
||||||
).fetchone()
|
|
||||||
return dict(fresh) if fresh else mon
|
|
||||||
return None
|
|
||||||
|
|
||||||
def _roll_mark_price(
|
def _roll_mark_price(
|
||||||
sym: str,
|
sym: str,
|
||||||
@@ -4747,6 +4811,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se
|
|||||||
try:
|
try:
|
||||||
if ctp_status(mode).get("connected"):
|
if ctp_status(mode).get("connected"):
|
||||||
_revive_monitors_for_open_ctp(conn, mode)
|
_revive_monitors_for_open_ctp(conn, mode)
|
||||||
|
_repair_orphan_roll_pending(conn)
|
||||||
check_roll_monitors(
|
check_roll_monitors(
|
||||||
conn,
|
conn,
|
||||||
get_mark_price_fn=lambda sym: _roll_mark_price(sym, {}, mode, allow_ctp=True),
|
get_mark_price_fn=lambda sym: _roll_mark_price(sym, {}, mode, allow_ctp=True),
|
||||||
|
|||||||
@@ -793,6 +793,13 @@ def reconcile_monitors_without_position(conn, mode: str, *, grace_sec: int = 120
|
|||||||
(mid,),
|
(mid,),
|
||||||
).fetchone():
|
).fetchone():
|
||||||
continue
|
continue
|
||||||
|
if conn.execute(
|
||||||
|
"""SELECT 1 FROM roll_legs l
|
||||||
|
JOIN roll_groups g ON g.id = l.roll_group_id
|
||||||
|
WHERE g.order_monitor_id=? AND l.status='pending' LIMIT 1""",
|
||||||
|
(mid,),
|
||||||
|
).fetchone():
|
||||||
|
continue
|
||||||
try:
|
try:
|
||||||
cancel_monitor_exit_orders(conn, mon, mode=mode)
|
cancel_monitor_exit_orders(conn, mon, mode=mode)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
|
|||||||
Reference in New Issue
Block a user