修复滚仓

This commit is contained in:
dekun
2026-05-28 14:43:15 +08:00
parent e8f77ebd57
commit 33042890b5
11 changed files with 387 additions and 6 deletions
+286
View File
@@ -0,0 +1,286 @@
"""滚仓挂单监控:斐波限价止盈侧突破撤单、成交同步、活跃组结案(各所共用)。"""
from __future__ import annotations
from typing import Any, Optional
from fib_key_monitor_lib import fib_invalidate_by_mark
from strategy_db import init_strategy_tables
ROLL_LEG_STATUS_LABELS = {
"pending": "挂单中",
"filled": "已成交",
"cancelled": "已撤销",
"invalidated": "止盈侧突破失效",
}
def roll_leg_status_label(status: Optional[str]) -> str:
s = (status or "").strip().lower()
return ROLL_LEG_STATUS_LABELS.get(s, status or "")
def check_roll_monitors(cfg: dict[str, Any]) -> None:
get_db = cfg["get_db"]
conn = get_db()
try:
init_strategy_tables(conn)
_reconcile_roll_groups(conn, cfg)
_check_pending_roll_legs(conn, cfg)
conn.commit()
except Exception:
try:
conn.rollback()
except Exception:
pass
finally:
try:
conn.close()
except Exception:
pass
def _row_dict(row) -> dict:
if row is None:
return {}
try:
return dict(row)
except Exception:
return {}
def _now(cfg: dict) -> str:
fn = cfg.get("app_now_str")
return fn() if callable(fn) else ""
def _close_roll_group(
conn,
cfg: dict,
group: dict,
*,
cancel_pending: bool = True,
) -> None:
gid = int(group["id"])
if cancel_pending:
for leg in conn.execute(
"SELECT * FROM roll_legs WHERE roll_group_id=? AND status='pending'",
(gid,),
).fetchall():
ld = _row_dict(leg)
_cancel_roll_leg_order(cfg, group, ld)
conn.execute(
"UPDATE roll_legs SET status='cancelled' WHERE id=? AND status='pending'",
(ld["id"],),
)
conn.execute(
"UPDATE roll_groups SET status='closed', updated_at=? WHERE id=? AND status='active'",
(_now(cfg), gid),
)
def _reconcile_roll_groups(conn, cfg: dict) -> None:
rows = conn.execute(
"""SELECT g.*, m.status AS monitor_status
FROM roll_groups g
LEFT JOIN order_monitors m ON m.id = g.order_monitor_id
WHERE g.status='active'"""
).fetchall()
for row in rows:
g = _row_dict(row)
symbol = g.get("symbol") or ""
direction = (g.get("direction") or "long").strip().lower()
ex_sym = g.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
mon_ok = (row["monitor_status"] or "").strip().lower() == "active"
pos = cfg["get_position"](ex_sym, direction)
qty = float(pos.get("contracts") or 0)
if not mon_ok or qty <= 0:
_close_roll_group(conn, cfg, g, cancel_pending=True)
def _cancel_roll_leg_order(cfg: dict, group: dict, leg: dict) -> None:
oid = (leg.get("exchange_order_id") or "").strip()
if not oid:
return
symbol = group.get("symbol") or ""
ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
cancel = cfg.get("cancel_limit_order")
if callable(cancel):
try:
cancel(ex_sym, oid)
except Exception:
pass
def _check_pending_roll_legs(conn, cfg: dict) -> None:
rows = conn.execute(
"""SELECT l.*, g.symbol, g.exchange_symbol, g.direction, g.initial_take_profit,
g.order_monitor_id
FROM roll_legs l
INNER JOIN roll_groups g ON g.id = l.roll_group_id AND g.status='active'
WHERE l.status='pending'"""
).fetchall()
for row in rows:
leg = _row_dict(row)
group = {
"id": leg["roll_group_id"],
"symbol": leg["symbol"],
"exchange_symbol": leg["exchange_symbol"],
"direction": leg["direction"],
"initial_take_profit": leg["initial_take_profit"],
"order_monitor_id": leg["order_monitor_id"],
}
_process_pending_roll_leg(conn, cfg, group, leg)
def _process_pending_roll_leg(conn, cfg: dict, group: dict, leg: dict) -> None:
symbol = group.get("symbol") or ""
direction = (group.get("direction") or "long").strip().lower()
ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
oid = (leg.get("exchange_order_id") or "").strip()
mark_fn = cfg.get("get_mark_price") or cfg.get("get_price")
mark = mark_fn(symbol) if callable(mark_fn) else None
if mark is None:
return
order_status_fn = cfg.get("limit_order_status")
order_st = order_status_fn(ex_sym, oid) if callable(order_status_fn) and oid else "missing"
fib_u, fib_l = leg.get("fib_upper"), leg.get("fib_lower")
has_fib = fib_u is not None and fib_l is not None
if order_st == "filled":
_finalize_roll_leg_fill(conn, cfg, group, leg, ex_sym, direction, float(mark))
return
if has_fib and fib_invalidate_by_mark(direction, mark, fib_u, fib_l):
if order_st == "open":
_cancel_roll_leg_order(cfg, group, leg)
_invalidate_roll_leg(conn, cfg, group, leg, float(mark))
return
if order_st in ("canceled", "missing", "unknown") and has_fib:
if fib_invalidate_by_mark(direction, mark, fib_u, fib_l):
_invalidate_roll_leg(conn, cfg, group, leg, float(mark))
def _invalidate_roll_leg(
conn, cfg: dict, group: dict, leg: dict, mark: float
) -> None:
leg_id = int(leg["id"])
gid = int(group["id"])
cur = conn.execute(
"SELECT status FROM roll_legs WHERE id=?", (leg_id,)
).fetchone()
if not cur or (cur[0] or "").strip().lower() == "invalidated":
return
conn.execute(
"UPDATE roll_legs SET status='invalidated' WHERE id=? AND status='pending'",
(leg_id,),
)
conn.execute(
"""UPDATE roll_groups SET leg_count = CASE WHEN leg_count > 0 THEN leg_count - 1 ELSE 0 END,
updated_at=? WHERE id=?""",
(_now(cfg), gid),
)
_send_roll_invalidate_wechat(cfg, group, leg, mark)
def _finalize_roll_leg_fill(
conn,
cfg: dict,
group: dict,
leg: dict,
ex_sym: str,
direction: str,
mark: float,
) -> None:
leg_id = int(leg["id"])
gid = int(group["id"])
new_sl = float(leg.get("new_stop_loss") or 0)
tp0 = float(group.get("initial_take_profit") or 0)
fill_px = float(leg.get("limit_price") or mark)
conn.execute(
"UPDATE roll_legs SET status='filled', fill_price=? WHERE id=? AND status='pending'",
(fill_px, leg_id),
)
if new_sl > 0:
conn.execute(
"UPDATE roll_groups SET current_stop_loss=?, updated_at=? WHERE id=?",
(new_sl, _now(cfg), gid),
)
mon_id = group.get("order_monitor_id")
if mon_id and new_sl > 0:
conn.execute(
"UPDATE order_monitors SET stop_loss=? WHERE id=? AND status='active'",
(new_sl, mon_id),
)
replace = cfg.get("replace_tpsl")
if callable(replace) and new_sl > 0 and tp0 > 0:
mon = None
if mon_id:
row = conn.execute(
"SELECT * FROM order_monitors WHERE id=?", (mon_id,)
).fetchone()
mon = _row_dict(row) if row else None
try:
replace(ex_sym, direction, new_sl, tp0, mon)
except Exception:
pass
notify = cfg.get("send_wechat")
if callable(notify):
sym = group.get("symbol") or ""
mode = leg.get("add_mode") or "限价"
fmt = cfg.get("format_price")
px_txt = fmt(sym, fill_px) if callable(fmt) else str(fill_px)
sl_txt = fmt(sym, new_sl) if callable(fmt) else str(new_sl)
acct = _wechat_account(cfg)
dir_txt = _wechat_dir(cfg, direction)
notify(
f"# ✅ {sym} 滚仓限价已成交\n"
f"**账户:{acct}**\n"
f"- 方式:{mode}{dir_txt}\n"
f"- 成交价:{px_txt}|新止损:{sl_txt}\n"
f"- 交易所止损已尝试同步(止盈仍为首仓)\n"
)
def _send_roll_invalidate_wechat(
cfg: dict, group: dict, leg: dict, mark: float
) -> None:
notify = cfg.get("send_wechat")
if not callable(notify):
return
sym = group.get("symbol") or ""
direction = (group.get("direction") or "long").strip().lower()
mode = leg.get("add_mode") or "斐波限价"
fmt = cfg.get("format_price")
mark_txt = fmt(sym, mark) if callable(fmt) else str(mark)
acct = _wechat_account(cfg)
dir_txt = _wechat_dir(cfg, direction)
notify(
f"# ⚠️ {sym} 滚仓斐波挂单失效\n"
f"**账户:{acct}**\n"
f"- 方式:{mode}{dir_txt}\n"
f"- 标记价 {mark_txt} 已触达止盈侧(未成交),已撤限价加仓单\n"
f"- 本条滚仓腿已结案,可继续下一档或重新挂单\n"
)
def _wechat_account(cfg: dict) -> str:
fn = cfg.get("wechat_account_label")
if callable(fn):
try:
return str(fn())
except Exception:
pass
return str(cfg.get("exchange_display") or "")
def _wechat_dir(cfg: dict, direction: str) -> str:
fn = cfg.get("wechat_direction_text")
if callable(fn):
try:
return str(fn(direction))
except Exception:
pass
return "做多" if (direction or "long").strip().lower() == "long" else "做空"