"""Roll-position monitoring shared by all exchanges.

Covers price-triggered market fills for fib / breakout roll legs, leg
invalidation, and synchronisation after a position is closed externally.

Every exchange- or app-specific operation is injected through the ``cfg``
dict of callables (``get_db``, ``get_position``, ``market_add``,
``replace_tpsl``, ``send_wechat`` ...).
"""

from __future__ import annotations

import logging
from typing import Any, Optional

from strategy_roll_lib import (
    BREAKOUT_MODE,
    FIB_MODES,
    MARKET_MODE,
    mode_label,
    roll_breakout_invalidate,
    roll_breakout_trigger_crossed,
    roll_fib_invalidate,
    roll_fib_trigger_crossed,
    calc_risk_budget_usdt,
    max_roll_legs,
    preview_roll,
    solve_add_amount_for_total_risk,
)
from strategy_db import init_strategy_tables

# Private name so it does not leak through ``from <module> import *``.
_logger = logging.getLogger(__name__)

# Display labels for the values stored in ``roll_legs.status``.
ROLL_LEG_STATUS_LABELS = {
    "pending": "监控中",
    "filled": "已成交",
    "cancelled": "已删除",
    "invalidated": "已失效",
}


def roll_leg_status_label(status: Optional[str]) -> str:
    """Return the display label for a roll-leg status.

    Unknown statuses are returned unchanged; an empty/None status yields "—".
    """
    s = (status or "").strip().lower()
    return ROLL_LEG_STATUS_LABELS.get(s, status or "—")


def check_roll_monitors(cfg: dict[str, Any]) -> None:
    """Run one monitoring tick: reconcile active groups, then pending legs.

    Opens its own connection through ``cfg["get_db"]``, commits on success
    and rolls back on any error. Errors never propagate to the caller (the
    tick is best-effort), but they are logged instead of silently dropped.
    """
    get_db = cfg["get_db"]
    conn = get_db()
    try:
        init_strategy_tables(conn)
        _reconcile_roll_groups(conn, cfg)
        _check_pending_roll_legs(conn, cfg)
        conn.commit()
    except Exception:
        _logger.exception("check_roll_monitors failed; rolling back this tick")
        try:
            conn.rollback()
        except Exception:
            _logger.debug("rollback after failed tick also failed", exc_info=True)
    finally:
        try:
            conn.close()
        except Exception:
            _logger.debug("closing monitor connection failed", exc_info=True)


def sync_roll_after_external_close(
    cfg: dict, conn, symbol: str, direction: str, *, reason: str = "持仓已平"
) -> dict[str, Any]:
    """Close roll state after a position was closed manually.

    Cancels the pending legs and closes every active roll group matching
    ``symbol`` / ``direction``; filled legs are kept as history. This
    function does not commit; the caller owns the transaction.

    Returns a dict with ``ok`` plus ``closed_groups`` / ``cancelled_legs``
    counters (and ``msg`` when the symbol is invalid).
    """
    norm = cfg.get("normalize_symbol_input")
    sym = norm(symbol) if callable(norm) else (symbol or "").strip()
    if not sym:
        return {"ok": False, "msg": "symbol 无效", "closed_groups": 0, "cancelled_legs": 0}
    direction = (direction or "long").strip().lower()
    init_strategy_tables(conn)
    rows = conn.execute(
        """SELECT g.* FROM roll_groups g
           WHERE g.status='active' AND g.symbol=? AND g.direction=?""",
        (sym, direction),
    ).fetchall()
    closed = cancelled = 0
    for row in rows:
        n_cancelled, was_closed = _finalize_roll_group(
            conn,
            cfg,
            _row_dict(row),
            reason=reason,
            symbol=sym,
            direction=direction,
        )
        cancelled += n_cancelled
        closed += int(was_closed)
    return {
        "ok": True,
        "symbol": sym,
        "direction": direction,
        "closed_groups": closed,
        "cancelled_legs": cancelled,
    }


def cancel_roll_pending_leg(cfg: dict, conn, leg_id: int) -> tuple[bool, str]:
    """Delete a pending roll leg on user request (legs cannot be edited).

    Cancels the leg's exchange order (if it has one), marks the leg
    ``cancelled`` and commits. Returns ``(ok, message)``.
    """
    init_strategy_tables(conn)
    # g.exchange_symbol is selected explicitly so the order is cancelled on
    # the group's stored exchange symbol rather than a re-derived one.
    row = conn.execute(
        "SELECT l.*, g.symbol, g.exchange_symbol, g.direction, "
        "g.status AS group_status FROM roll_legs l "
        "INNER JOIN roll_groups g ON g.id = l.roll_group_id WHERE l.id=?",
        (int(leg_id),),
    ).fetchone()
    if not row:
        return False, "滚仓腿不存在"
    leg = _row_dict(row)
    if (leg.get("status") or "").strip().lower() != "pending":
        return False, "仅监控中的腿可删除"
    _cancel_roll_leg_order(
        cfg,
        {"symbol": leg.get("symbol"), "exchange_symbol": leg.get("exchange_symbol")},
        leg,
    )
    conn.execute(
        "UPDATE roll_legs SET status='cancelled' WHERE id=? AND status='pending'",
        (int(leg_id),),
    )
    conn.commit()
    return True, "已删除滚仓监控"


def count_filled_roll_legs(conn, roll_group_id: int) -> int:
    """Return the number of ``filled`` legs in the given roll group."""
    row = conn.execute(
        "SELECT COUNT(*) FROM roll_legs WHERE roll_group_id=? AND status='filled'",
        (int(roll_group_id),),
    ).fetchone()
    return int(row[0] if row else 0)


def count_pending_roll_legs(conn, roll_group_id: int) -> int:
    """Return the number of ``pending`` legs in the given roll group."""
    row = conn.execute(
        "SELECT COUNT(*) FROM roll_legs WHERE roll_group_id=? AND status='pending'",
        (int(roll_group_id),),
    ).fetchone()
    return int(row[0] if row else 0)


def _row_dict(row) -> dict:
    """Convert a DB row to a plain dict; ``None``/unconvertible rows give ``{}``."""
    if row is None:
        return {}
    try:
        return dict(row)
    except Exception:
        return {}


def _now(cfg: dict) -> str:
    """Return the app timestamp from ``cfg["app_now_str"]``, or "" if absent."""
    fn = cfg.get("app_now_str")
    return fn() if callable(fn) else ""


def _friendly_error(cfg: dict, exc: BaseException) -> str:
    """Render ``exc`` through ``cfg["friendly_error"]`` when provided."""
    fe = cfg.get("friendly_error")
    return fe(exc) if callable(fe) else str(exc)


def _cancel_pending_legs_for_group(
    conn, cfg: dict, group: dict, *, status: str = "cancelled"
) -> int:
    """Cancel exchange orders of the group's pending legs and set ``status``.

    Returns the number of pending legs that were visited.
    """
    gid = int(group["id"])
    n = 0
    for leg in conn.execute(
        "SELECT * FROM roll_legs WHERE roll_group_id=? AND status='pending'",
        (gid,),
    ).fetchall():
        ld = _row_dict(leg)
        _cancel_roll_leg_order(cfg, group, ld)
        conn.execute(
            "UPDATE roll_legs SET status=? WHERE id=? AND status='pending'",
            (status, ld["id"]),
        )
        n += 1
    return n


def _finalize_roll_group(
    conn,
    cfg: dict,
    group: dict,
    *,
    reason: str,
    symbol: Optional[str] = None,
    direction: Optional[str] = None,
) -> tuple[int, bool]:
    """Cancel pending legs, close the group, then notify and snapshot.

    ``symbol`` / ``direction`` override the values shown in the notification;
    by default they come from ``group``. Notification and snapshot run only
    when this call actually flipped the group from active to closed, and are
    best-effort (failures are logged, never raised).

    Returns ``(cancelled_leg_count, group_was_closed)``.
    """
    gid = int(group["id"])
    cancelled = _cancel_pending_legs_for_group(conn, cfg, group, status="cancelled")
    cur = conn.execute(
        "UPDATE roll_groups SET status='closed', updated_at=? WHERE id=? AND status='active'",
        (_now(cfg), gid),
    )
    was_closed = bool(getattr(cur, "rowcount", 0))
    if was_closed:
        try:
            from strategy_wechat_notify import notify_roll_group_ended

            notify_roll_group_ended(
                cfg,
                group_id=gid,
                symbol=symbol if symbol is not None else (group.get("symbol") or ""),
                direction=(
                    direction
                    if direction is not None
                    else (group.get("direction") or "long")
                ),
                reason=reason,
                leg_count=int(group.get("leg_count") or 0),
            )
        except Exception:
            _logger.warning("roll group %s: end notification failed", gid, exc_info=True)
        try:
            from strategy_snapshot_lib import save_roll_group_snapshot

            save_roll_group_snapshot(cfg, conn, group, result_label="结束")
        except Exception:
            _logger.warning("roll group %s: snapshot failed", gid, exc_info=True)
    return cancelled, was_closed


def _close_roll_group(
    conn, cfg: dict, group: dict, *, reason: str = "下单监控已结案或交易所无同向持仓"
) -> None:
    """Close an active roll group: cancel its pending legs, notify, snapshot."""
    _finalize_roll_group(conn, cfg, group, reason=reason)


def _reconcile_roll_groups(conn, cfg: dict) -> None:
    """Close active groups whose order monitor ended or whose position is gone.

    A group stays open only while its linked ``order_monitors`` row is
    ``active`` and ``cfg["get_position"]`` reports ``contracts`` > 0.
    """
    rows = conn.execute(
        """SELECT g.*, m.status AS monitor_status
           FROM roll_groups g
           LEFT JOIN order_monitors m ON m.id = g.order_monitor_id
           WHERE g.status='active'"""
    ).fetchall()
    for row in rows:
        g = _row_dict(row)
        symbol = g.get("symbol") or ""
        direction = (g.get("direction") or "long").strip().lower()
        ex_sym = g.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
        mon_ok = (g.get("monitor_status") or "").strip().lower() == "active"
        # A missing position object is treated as "no position", the same way
        # _execute_pending_roll_leg treats it.
        pos = cfg["get_position"](ex_sym, direction) or {}
        qty = float(pos.get("contracts") or 0)
        if not mon_ok or qty <= 0:
            _close_roll_group(conn, cfg, g)


def _cancel_roll_leg_order(cfg: dict, group: dict, leg: dict) -> None:
    """Best-effort cancel of the leg's exchange order, if it has an order id."""
    oid = str(leg.get("exchange_order_id") or "").strip()
    if not oid:
        return
    symbol = group.get("symbol") or ""
    ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
    cancel = cfg.get("cancel_limit_order")
    if callable(cancel):
        try:
            cancel(ex_sym, oid)
        except Exception:
            # The order may already be gone; do not block the status change.
            _logger.warning(
                "cancel of roll leg order %s on %s failed", oid, ex_sym, exc_info=True
            )


def _contract_size(cfg: dict, ex_sym: str) -> float:
    """Return the contract size from ``cfg["get_contract_size"]``, default 1.0."""
    get_cs = cfg.get("get_contract_size")
    if callable(get_cs):
        try:
            return float(get_cs(ex_sym) or 1.0)
        except Exception:
            _logger.debug("get_contract_size(%s) failed; using 1.0", ex_sym, exc_info=True)
    return 1.0


def _resolve_add_mode(leg: dict) -> str:
    """Normalise the leg's free-form ``add_mode`` to a canonical mode key.

    Empty values map to ``MARKET_MODE``. Other ``fib*`` spellings only get
    "." replaced by "_"; anything unrecognised is returned lower-cased.
    """
    raw = (leg.get("add_mode") or "").strip().lower()
    if raw in (MARKET_MODE, "market", "市价", "市价加仓"):
        return MARKET_MODE
    if "786" in raw:
        return "fib_786"
    if "618" in raw:
        return "fib_618"
    if raw in (BREAKOUT_MODE, "突破", "突破加仓"):
        return BREAKOUT_MODE
    if raw.startswith("fib"):
        return raw.replace(".", "_")
    return raw or MARKET_MODE


def _check_pending_roll_legs(conn, cfg: dict) -> None:
    """Evaluate every pending leg that belongs to an active roll group.

    Legs are isolated from one another: an error while handling one leg is
    logged and the remaining legs are still processed, so a single failure
    cannot roll back state already recorded for other legs in this tick.
    """
    rows = conn.execute(
        """SELECT l.*, g.symbol, g.exchange_symbol, g.direction,
                  g.initial_take_profit, g.order_monitor_id,
                  g.risk_percent, g.leg_count
           FROM roll_legs l
           INNER JOIN roll_groups g ON g.id = l.roll_group_id AND g.status='active'
           WHERE l.status='pending'"""
    ).fetchall()
    for row in rows:
        leg = _row_dict(row)
        try:
            group = {
                "id": leg["roll_group_id"],
                "symbol": leg["symbol"],
                "exchange_symbol": leg["exchange_symbol"],
                "direction": leg["direction"],
                "initial_take_profit": leg["initial_take_profit"],
                "order_monitor_id": leg["order_monitor_id"],
                "risk_percent": leg.get("risk_percent"),
                "leg_count": leg.get("leg_count"),
            }
            _process_pending_roll_leg(conn, cfg, group, leg)
        except Exception:
            _logger.exception(
                "roll leg %s: processing failed; continuing with remaining legs",
                leg.get("id"),
            )


def _process_pending_roll_leg(conn, cfg: dict, group: dict, leg: dict) -> None:
    """Check one pending leg against the current mark price.

    Order of operations: invalidation check, trigger check (crossing between
    the stored ``last_mark_price`` and the current mark), persist the new
    ``last_mark_price``, then execute the add if the trigger fired. Does
    nothing when no mark price is available.
    """
    symbol = group.get("symbol") or ""
    direction = (group.get("direction") or "long").strip().lower()
    ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol)
    mark_fn = cfg.get("get_mark_price") or cfg.get("get_price")
    mark = mark_fn(symbol) if callable(mark_fn) else None
    if mark is None:
        return
    mark_f = float(mark)

    prev_mark = leg.get("last_mark_price")
    try:
        prev_f = float(prev_mark) if prev_mark not in (None, "") else None
    except (TypeError, ValueError):
        prev_f = None

    mode = _resolve_add_mode(leg)
    sl = float(leg.get("new_stop_loss") or 0)
    fib_u, fib_l = leg.get("fib_upper"), leg.get("fib_lower")
    bp = leg.get("breakthrough_price")

    if mode in FIB_MODES and fib_u is not None and fib_l is not None:
        if roll_fib_invalidate(direction, mark_f, float(fib_u), float(fib_l)):
            _invalidate_roll_leg(conn, cfg, group, leg, mark_f, reason="止盈侧突破")
            return
    elif mode == BREAKOUT_MODE and sl > 0:
        if roll_breakout_invalidate(direction, mark_f, sl):
            _invalidate_roll_leg(conn, cfg, group, leg, mark_f, reason="止损侧突破")
            return

    triggered = False
    if mode in FIB_MODES:
        lp = leg.get("limit_price")
        if lp is not None and roll_fib_trigger_crossed(direction, prev_f, mark_f, float(lp)):
            triggered = True
    elif mode == BREAKOUT_MODE and bp is not None:
        if roll_breakout_trigger_crossed(direction, prev_f, mark_f, float(bp)):
            triggered = True

    conn.execute(
        "UPDATE roll_legs SET last_mark_price=? WHERE id=? AND status='pending'",
        (mark_f, int(leg["id"])),
    )
    if triggered:
        _execute_pending_roll_leg(conn, cfg, group, leg, ex_sym, direction, mark_f)


def _execute_pending_roll_leg(
    conn,
    cfg: dict,
    group: dict,
    leg: dict,
    ex_sym: str,
    direction: str,
    mark: float,
) -> None:
    """Place the market add for a triggered leg and record the fill.

    The leg is invalidated (no order placed) when the order monitor is no
    longer active, there is no position, the roll-leg limit is reached, or
    no valid add amount can be computed. If the market order itself fails, a
    failure notification is sent and the leg stays pending.

    Once the order has been placed the fill is always recorded: errors from
    resolving the fill price or replacing TP/SL are logged and reported, but
    must not leave the leg pending, where it could trigger a second add.
    """
    leg_id = int(leg["id"])
    # The group id comes from the leg row when present, else from the group.
    raw_gid = leg.get("roll_group_id")
    gid = int(raw_gid) if raw_gid is not None else int(group["id"])

    mon_id = group.get("order_monitor_id")
    mon = None
    if mon_id:
        row = conn.execute("SELECT * FROM order_monitors WHERE id=?", (mon_id,)).fetchone()
        mon = _row_dict(row) if row else None
    if not mon or (mon.get("status") or "").strip().lower() != "active":
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason="监控单已失效")
        return

    pos = cfg["get_position"](ex_sym, direction) or {}
    qty = float(pos.get("contracts") or 0)
    entry = float(pos.get("entry_price") or mon.get("trigger_price") or 0)
    if qty <= 0 or entry <= 0:
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason="无持仓")
        return

    filled = count_filled_roll_legs(conn, gid)
    if filled >= max_roll_legs(direction):
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason="滚仓次数已满")
        return

    try:
        risk_pct = float(mon.get("risk_percent") or group.get("risk_percent") or 2)
    except (TypeError, ValueError):
        risk_pct = 2.0

    # Capital is read through a separate short-lived connection.
    conn_cap = cfg["get_db"]()
    try:
        capital = float(cfg["get_trading_capital_usdt"](conn_cap))
    finally:
        conn_cap.close()

    cs = _contract_size(cfg, ex_sym)
    sl = float(leg.get("new_stop_loss") or 0)
    tp0 = float(group.get("initial_take_profit") or mon.get("take_profit") or 0)
    mode = _resolve_add_mode(leg)

    q2_raw, err = solve_add_amount_for_total_risk(
        direction, qty, entry, mark, sl, calc_risk_budget_usdt(capital, risk_pct), cs
    )
    if err or q2_raw is None or float(q2_raw) <= 0:
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason=err or "无法计算加仓张数")
        return

    amount = cfg["amount_to_precision"](ex_sym, float(q2_raw))
    if amount is None or float(amount) <= 0:
        _invalidate_roll_leg(conn, cfg, group, leg, mark, reason="加仓张数低于交易所最小精度")
        return

    lev_fn = cfg.get("default_leverage")
    leverage = int(lev_fn(group.get("symbol") or "")) if callable(lev_fn) else 5

    try:
        order = cfg["market_add"](ex_sym, direction, float(amount), leverage)
    except Exception as e:
        _notify_roll_fail(cfg, group, leg, mark, _friendly_error(cfg, e))
        return

    # From here on the order exists on the exchange; nothing below may abort
    # before the fill has been written to the database.
    fill = float(mark)
    resolve_fill = cfg.get("resolve_fill_price")
    if callable(resolve_fill):
        try:
            fill = float(resolve_fill(order, ex_sym, mark) or mark)
        except Exception:
            _logger.exception(
                "roll leg %s: resolve_fill_price failed; using mark price", leg_id
            )
            fill = float(mark)

    oid = str(order.get("id") or "") if isinstance(order, dict) else ""

    tpsl_error = ""
    try:
        cfg["replace_tpsl"](ex_sym, direction, sl, tp0, mon)
    except Exception as e:
        _logger.exception("roll leg %s: replace_tpsl failed after fill", leg_id)
        tpsl_error = _friendly_error(cfg, e) or "unknown error"

    conn.execute(
        """UPDATE roll_legs SET status='filled', fill_price=?, amount=?,
           exchange_order_id=?, new_stop_loss=? WHERE id=? AND status='pending'""",
        (fill, float(amount), oid, sl, leg_id),
    )
    conn.execute(
        "UPDATE roll_groups SET leg_count=?, current_stop_loss=?, updated_at=? WHERE id=?",
        (filled + 1, sl, _now(cfg), gid),
    )
    conn.execute(
        "UPDATE order_monitors SET stop_loss=? WHERE id=? AND status='active'",
        (sl, mon["id"]),
    )

    notify = cfg.get("send_wechat")
    if callable(notify):
        sym = group.get("symbol") or ""
        mode_lbl = leg.get("add_mode") or mode_label(mode)
        fmt = cfg.get("format_price")
        px_txt = fmt(sym, fill) if callable(fmt) else str(fill)
        sl_txt = fmt(sym, sl) if callable(fmt) else str(sl)
        acct = _wechat_account(cfg)
        dir_txt = _wechat_dir(cfg, direction)
        parts = [
            f"# ✅ {sym} 滚仓触价成交\n",
            f"**账户:{acct}**\n",
            f"- 方式:{mode_lbl}|{dir_txt}\n",
            f"- 成交价:{px_txt}|张数:{amount}\n",
            f"- 新止损:{sl_txt}(止盈仍为首仓)\n",
        ]
        if tpsl_error:
            # Only present when the exchange-side TP/SL replacement failed.
            parts.append(f"- ⚠️ 止盈止损更新失败:{tpsl_error},请手动检查\n")
        notify("".join(parts))


def _invalidate_roll_leg(
    conn,
    cfg: dict,
    group: dict,
    leg: dict,
    mark: float,
    *,
    reason: str = "",
) -> None:
    """Mark a pending leg ``invalidated``, cancel its order and notify.

    Does nothing when the leg no longer exists or is already in a terminal
    state (invalidated / filled / cancelled).
    """
    leg_id = int(leg["id"])
    status_row = conn.execute(
        "SELECT status FROM roll_legs WHERE id=?", (leg_id,)
    ).fetchone()
    if not status_row or (status_row[0] or "").strip().lower() in (
        "invalidated",
        "filled",
        "cancelled",
    ):
        return
    _cancel_roll_leg_order(cfg, group, leg)
    conn.execute(
        "UPDATE roll_legs SET status='invalidated' WHERE id=? AND status='pending'",
        (leg_id,),
    )
    _send_roll_invalidate_wechat(cfg, group, leg, mark, reason=reason)


def _notify_roll_fail(cfg: dict, group: dict, leg: dict, mark: float, reason: str) -> None:
    """Send the "triggered add failed" message via ``cfg["send_wechat"]``.

    ``mark`` is accepted for signature parity with the other notifiers and
    is not used in the message.
    """
    notify = cfg.get("send_wechat")
    if not callable(notify):
        return
    sym = group.get("symbol") or ""
    mode = leg.get("add_mode") or "滚仓"
    acct = _wechat_account(cfg)
    notify(
        f"# ❌ {sym} 滚仓触价成交失败\n"
        f"**账户:{acct}**\n"
        f"- 方式:{mode}\n"
        f"- 原因:{reason}\n"
    )


def _send_roll_invalidate_wechat(
    cfg: dict, group: dict, leg: dict, mark: float, *, reason: str = ""
) -> None:
    """Send the "roll monitor invalidated" message via ``cfg["send_wechat"]``."""
    notify = cfg.get("send_wechat")
    if not callable(notify):
        return
    sym = group.get("symbol") or ""
    direction = (group.get("direction") or "long").strip().lower()
    mode = leg.get("add_mode") or "滚仓监控"
    fmt = cfg.get("format_price")
    mark_txt = fmt(sym, mark) if callable(fmt) else str(mark)
    acct = _wechat_account(cfg)
    dir_txt = _wechat_dir(cfg, direction)
    detail = reason or "条件不满足"
    notify(
        f"# ⚠️ {sym} 滚仓监控失效\n"
        f"**账户:{acct}**\n"
        f"- 方式:{mode}|{dir_txt}\n"
        f"- 标记价 {mark_txt}|{detail}\n"
        f"- 本条监控已结案,可重新提交\n"
    )


def _wechat_account(cfg: dict) -> str:
    """Return the account label, falling back to ``cfg["exchange_display"]``."""
    fn = cfg.get("wechat_account_label")
    if callable(fn):
        try:
            return str(fn())
        except Exception:
            _logger.debug("wechat_account_label failed; using fallback", exc_info=True)
    return str(cfg.get("exchange_display") or "")


def _wechat_dir(cfg: dict, direction: str) -> str:
    """Return the direction text, falling back to a built-in long/short label."""
    fn = cfg.get("wechat_direction_text")
    if callable(fn):
        try:
            return str(fn(direction))
        except Exception:
            _logger.debug("wechat_direction_text failed; using fallback", exc_info=True)
    return "做多" if (direction or "long").strip().lower() == "long" else "做空"