From 33042890b5319774e4c4dc65990a339294d77e3c Mon Sep 17 00:00:00 2001 From: dekun Date: Thu, 28 May 2026 14:43:15 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E6=BB=9A=E4=BB=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crypto_monitor_binance/app.py | 5 + crypto_monitor_gate/app.py | 5 + crypto_monitor_gate_bot/app.py | 5 + crypto_monitor_okx/app.py | 5 + hub_bridge.py | 4 +- strategy_config.py | 61 +++++ strategy_register.py | 1 + strategy_roll_monitor_lib.py | 286 ++++++++++++++++++++ strategy_templates/strategy_roll.html | 2 +- strategy_templates/strategy_roll_panel.html | 2 +- strategy_ui.py | 17 +- 11 files changed, 387 insertions(+), 6 deletions(-) create mode 100644 strategy_roll_monitor_lib.py diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 46f499c..1804a54 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -5545,6 +5545,11 @@ def background_task(): conn.close() force_close_before_reset() check_fib_key_monitors() + _roll_cfg = app.extensions.get("strategy_roll_cfg") + if _roll_cfg: + from strategy_roll_monitor_lib import check_roll_monitors + + check_roll_monitors(_roll_cfg) check_key_monitors() check_order_monitors() cfg = app.extensions.get("strategy_trend_cfg") diff --git a/crypto_monitor_gate/app.py b/crypto_monitor_gate/app.py index 5413984..c23a962 100644 --- a/crypto_monitor_gate/app.py +++ b/crypto_monitor_gate/app.py @@ -5490,6 +5490,11 @@ def background_task(): conn.close() force_close_before_reset() check_fib_key_monitors() + _roll_cfg = app.extensions.get("strategy_roll_cfg") + if _roll_cfg: + from strategy_roll_monitor_lib import check_roll_monitors + + check_roll_monitors(_roll_cfg) check_key_monitors() check_order_monitors() cfg = app.extensions.get("strategy_trend_cfg") diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index b37824a..d0d6044 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -5079,6 +5079,11 @@ def background_task(): conn.close() force_close_before_reset() check_trend_pullback_plans() + _roll_cfg = app.extensions.get("strategy_roll_cfg") + if _roll_cfg: + from strategy_roll_monitor_lib import check_roll_monitors + + check_roll_monitors(_roll_cfg) check_order_monitors() except: pass diff --git a/crypto_monitor_okx/app.py b/crypto_monitor_okx/app.py index 7ebfbc7..e5b9997 100644 --- a/crypto_monitor_okx/app.py +++ b/crypto_monitor_okx/app.py @@ -5015,6 +5015,11 @@ def background_task(): conn.close() force_close_before_reset() check_fib_key_monitors() + _roll_cfg = app.extensions.get("strategy_roll_cfg") + if _roll_cfg: + from strategy_roll_monitor_lib import check_roll_monitors + + check_roll_monitors(_roll_cfg) check_key_monitors() check_order_monitors() cfg = app.extensions.get("strategy_trend_cfg") diff --git a/hub_bridge.py b/hub_bridge.py index 356b28b..a7fbcc6 100644 --- a/hub_bridge.py +++ b/hub_bridge.py @@ -171,7 +171,9 @@ def register_hub_routes(app): rolls = [] try: for row in conn.execute( - "SELECT * FROM roll_groups WHERE status='active' ORDER BY id DESC" + """SELECT g.* FROM roll_groups g + INNER JOIN order_monitors m ON m.id = g.order_monitor_id AND m.status='active' + WHERE g.status='active' ORDER BY g.id DESC""" ).fetchall(): rolls.append(_row_to_dict(row)) except Exception: diff --git a/strategy_config.py b/strategy_config.py index 79823e5..1c67a7d 100644 --- a/strategy_config.py +++ b/strategy_config.py @@ -136,6 +136,60 @@ def build_strategy_config( except TypeError: return fn(err) + def limit_order_status(ex_sym, order_id): + fn = getattr(m, "fib_limit_order_status", None) + if callable(fn): + return fn(ex_sym, order_id) + return "unknown" + + def cancel_limit_order(ex_sym, order_id): + fn = getattr(m, "cancel_fib_limit_order", None) + if callable(fn): + try: + return fn(ex_sym, order_id) + except Exception: + pass + if not order_id: + return False + try: + m.exchange.cancel_order(str(order_id), ex_sym) + return True + except Exception: + return False + + def get_mark_price(symbol): + fn = getattr(m, "get_symbol_mark_price", None) or getattr(m, "get_price", None) + if not callable(fn): + return None + try: + return fn(symbol) + except Exception: + return None + + def wechat_account_label(): + fn = getattr(m, "_wechat_account_label", None) + if callable(fn): + try: + return fn() + except Exception: + pass + return getattr(m, "EXCHANGE_DISPLAY_NAME", "") or "" + + def wechat_direction_text(direction): + fn = getattr(m, "_wechat_direction_text", None) + if callable(fn): + try: + return fn(direction) + except Exception: + pass + d = (direction or "long").strip().lower() + return "做多" if d == "long" else "做空" + + def send_wechat(content): + fn = getattr(m, "send_wechat_msg", None) + if callable(fn): + fn(content) + note = trend_disabled_note or ( "趋势回调(自动补仓)请在 Gate 趋势机器人实例使用:/strategy/trend" ) @@ -163,4 +217,11 @@ def build_strategy_config( "resolve_fill_price": m.resolve_order_entry_price, "price_fmt": m.format_price_for_symbol, "count_active_trend_plans": count_trends if trend_enabled else count_trends, + "limit_order_status": limit_order_status, + "cancel_limit_order": cancel_limit_order, + "get_mark_price": get_mark_price, + "send_wechat": send_wechat, + "format_price": getattr(m, "format_price_for_symbol", None), + "wechat_account_label": wechat_account_label, + "wechat_direction_text": wechat_direction_text, } diff --git a/strategy_register.py b/strategy_register.py index 3e17ef7..345886b 100644 --- a/strategy_register.py +++ b/strategy_register.py @@ -20,6 +20,7 @@ def install_strategy_trading(app: Flask, repo_root: str, app_module: Any = None, attach_strategy_templates(app, repo_root) cfg = build_strategy_config(app_module, **build_kw) register_strategy_trading(app, cfg) + app.extensions["strategy_roll_cfg"] = cfg def attach_strategy_templates(app: Flask, repo_root: str) -> None: diff --git a/strategy_roll_monitor_lib.py b/strategy_roll_monitor_lib.py new file mode 100644 index 0000000..5000d00 --- /dev/null +++ b/strategy_roll_monitor_lib.py @@ -0,0 +1,286 @@ +"""滚仓挂单监控:斐波限价止盈侧突破撤单、成交同步、活跃组结案(各所共用)。""" +from __future__ import annotations + +from typing import Any, Optional + +from fib_key_monitor_lib import fib_invalidate_by_mark +from strategy_db import init_strategy_tables + +ROLL_LEG_STATUS_LABELS = { + "pending": "挂单中", + "filled": "已成交", + "cancelled": "已撤销", + "invalidated": "止盈侧突破失效", +} + + +def roll_leg_status_label(status: Optional[str]) -> str: + s = (status or "").strip().lower() + return ROLL_LEG_STATUS_LABELS.get(s, status or "—") + + +def check_roll_monitors(cfg: dict[str, Any]) -> None: + get_db = cfg["get_db"] + conn = get_db() + try: + init_strategy_tables(conn) + _reconcile_roll_groups(conn, cfg) + _check_pending_roll_legs(conn, cfg) + conn.commit() + except Exception: + try: + conn.rollback() + except Exception: + pass + finally: + try: + conn.close() + except Exception: + pass + + +def _row_dict(row) -> dict: + if row is None: + return {} + try: + return dict(row) + except Exception: + return {} + + +def _now(cfg: dict) -> str: + fn = cfg.get("app_now_str") + return fn() if callable(fn) else "" + + +def _close_roll_group( + conn, + cfg: dict, + group: dict, + *, + cancel_pending: bool = True, +) -> None: + gid = int(group["id"]) + if cancel_pending: + for leg in conn.execute( + "SELECT * FROM roll_legs WHERE roll_group_id=? AND status='pending'", + (gid,), + ).fetchall(): + ld = _row_dict(leg) + _cancel_roll_leg_order(cfg, group, ld) + conn.execute( + "UPDATE roll_legs SET status='cancelled' WHERE id=? AND status='pending'", + (ld["id"],), + ) + conn.execute( + "UPDATE roll_groups SET status='closed', updated_at=? WHERE id=? AND status='active'", + (_now(cfg), gid), + ) + + +def _reconcile_roll_groups(conn, cfg: dict) -> None: + rows = conn.execute( + """SELECT g.*, m.status AS monitor_status + FROM roll_groups g + LEFT JOIN order_monitors m ON m.id = g.order_monitor_id + WHERE g.status='active'""" + ).fetchall() + for row in rows: + g = _row_dict(row) + symbol = g.get("symbol") or "" + direction = (g.get("direction") or "long").strip().lower() + ex_sym = g.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol) + mon_ok = (row["monitor_status"] or "").strip().lower() == "active" + pos = cfg["get_position"](ex_sym, direction) + qty = float(pos.get("contracts") or 0) + if not mon_ok or qty <= 0: + _close_roll_group(conn, cfg, g, cancel_pending=True) + + +def _cancel_roll_leg_order(cfg: dict, group: dict, leg: dict) -> None: + oid = (leg.get("exchange_order_id") or "").strip() + if not oid: + return + symbol = group.get("symbol") or "" + ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol) + cancel = cfg.get("cancel_limit_order") + if callable(cancel): + try: + cancel(ex_sym, oid) + except Exception: + pass + + +def _check_pending_roll_legs(conn, cfg: dict) -> None: + rows = conn.execute( + """SELECT l.*, g.symbol, g.exchange_symbol, g.direction, g.initial_take_profit, + g.order_monitor_id + FROM roll_legs l + INNER JOIN roll_groups g ON g.id = l.roll_group_id AND g.status='active' + WHERE l.status='pending'""" + ).fetchall() + for row in rows: + leg = _row_dict(row) + group = { + "id": leg["roll_group_id"], + "symbol": leg["symbol"], + "exchange_symbol": leg["exchange_symbol"], + "direction": leg["direction"], + "initial_take_profit": leg["initial_take_profit"], + "order_monitor_id": leg["order_monitor_id"], + } + _process_pending_roll_leg(conn, cfg, group, leg) + + +def _process_pending_roll_leg(conn, cfg: dict, group: dict, leg: dict) -> None: + symbol = group.get("symbol") or "" + direction = (group.get("direction") or "long").strip().lower() + ex_sym = group.get("exchange_symbol") or cfg["normalize_exchange_symbol"](symbol) + oid = (leg.get("exchange_order_id") or "").strip() + mark_fn = cfg.get("get_mark_price") or cfg.get("get_price") + mark = mark_fn(symbol) if callable(mark_fn) else None + if mark is None: + return + + order_status_fn = cfg.get("limit_order_status") + order_st = order_status_fn(ex_sym, oid) if callable(order_status_fn) and oid else "missing" + + fib_u, fib_l = leg.get("fib_upper"), leg.get("fib_lower") + has_fib = fib_u is not None and fib_l is not None + + if order_st == "filled": + _finalize_roll_leg_fill(conn, cfg, group, leg, ex_sym, direction, float(mark)) + return + + if has_fib and fib_invalidate_by_mark(direction, mark, fib_u, fib_l): + if order_st == "open": + _cancel_roll_leg_order(cfg, group, leg) + _invalidate_roll_leg(conn, cfg, group, leg, float(mark)) + return + + if order_st in ("canceled", "missing", "unknown") and has_fib: + if fib_invalidate_by_mark(direction, mark, fib_u, fib_l): + _invalidate_roll_leg(conn, cfg, group, leg, float(mark)) + + +def _invalidate_roll_leg( + conn, cfg: dict, group: dict, leg: dict, mark: float +) -> None: + leg_id = int(leg["id"]) + gid = int(group["id"]) + cur = conn.execute( + "SELECT status FROM roll_legs WHERE id=?", (leg_id,) + ).fetchone() + if not cur or (cur[0] or "").strip().lower() == "invalidated": + return + conn.execute( + "UPDATE roll_legs SET status='invalidated' WHERE id=? AND status='pending'", + (leg_id,), + ) + conn.execute( + """UPDATE roll_groups SET leg_count = CASE WHEN leg_count > 0 THEN leg_count - 1 ELSE 0 END, + updated_at=? WHERE id=?""", + (_now(cfg), gid), + ) + _send_roll_invalidate_wechat(cfg, group, leg, mark) + + +def _finalize_roll_leg_fill( + conn, + cfg: dict, + group: dict, + leg: dict, + ex_sym: str, + direction: str, + mark: float, +) -> None: + leg_id = int(leg["id"]) + gid = int(group["id"]) + new_sl = float(leg.get("new_stop_loss") or 0) + tp0 = float(group.get("initial_take_profit") or 0) + fill_px = float(leg.get("limit_price") or mark) + conn.execute( + "UPDATE roll_legs SET status='filled', fill_price=? WHERE id=? AND status='pending'", + (fill_px, leg_id), + ) + if new_sl > 0: + conn.execute( + "UPDATE roll_groups SET current_stop_loss=?, updated_at=? WHERE id=?", + (new_sl, _now(cfg), gid), + ) + mon_id = group.get("order_monitor_id") + if mon_id and new_sl > 0: + conn.execute( + "UPDATE order_monitors SET stop_loss=? WHERE id=? AND status='active'", + (new_sl, mon_id), + ) + replace = cfg.get("replace_tpsl") + if callable(replace) and new_sl > 0 and tp0 > 0: + mon = None + if mon_id: + row = conn.execute( + "SELECT * FROM order_monitors WHERE id=?", (mon_id,) + ).fetchone() + mon = _row_dict(row) if row else None + try: + replace(ex_sym, direction, new_sl, tp0, mon) + except Exception: + pass + notify = cfg.get("send_wechat") + if callable(notify): + sym = group.get("symbol") or "" + mode = leg.get("add_mode") or "限价" + fmt = cfg.get("format_price") + px_txt = fmt(sym, fill_px) if callable(fmt) else str(fill_px) + sl_txt = fmt(sym, new_sl) if callable(fmt) else str(new_sl) + acct = _wechat_account(cfg) + dir_txt = _wechat_dir(cfg, direction) + notify( + f"# ✅ {sym} 滚仓限价已成交\n" + f"**账户:{acct}**\n" + f"- 方式:{mode}|{dir_txt}\n" + f"- 成交价:{px_txt}|新止损:{sl_txt}\n" + f"- 交易所止损已尝试同步(止盈仍为首仓)\n" + ) + + +def _send_roll_invalidate_wechat( + cfg: dict, group: dict, leg: dict, mark: float +) -> None: + notify = cfg.get("send_wechat") + if not callable(notify): + return + sym = group.get("symbol") or "" + direction = (group.get("direction") or "long").strip().lower() + mode = leg.get("add_mode") or "斐波限价" + fmt = cfg.get("format_price") + mark_txt = fmt(sym, mark) if callable(fmt) else str(mark) + acct = _wechat_account(cfg) + dir_txt = _wechat_dir(cfg, direction) + notify( + f"# ⚠️ {sym} 滚仓斐波挂单失效\n" + f"**账户:{acct}**\n" + f"- 方式:{mode}|{dir_txt}\n" + f"- 标记价 {mark_txt} 已触达止盈侧(未成交),已撤限价加仓单\n" + f"- 本条滚仓腿已结案,可继续下一档或重新挂单\n" + ) + + +def _wechat_account(cfg: dict) -> str: + fn = cfg.get("wechat_account_label") + if callable(fn): + try: + return str(fn()) + except Exception: + pass + return str(cfg.get("exchange_display") or "") + + +def _wechat_dir(cfg: dict, direction: str) -> str: + fn = cfg.get("wechat_direction_text") + if callable(fn): + try: + return str(fn(direction)) + except Exception: + pass + return "做多" if (direction or "long").strip().lower() == "long" else "做空" diff --git a/strategy_templates/strategy_roll.html b/strategy_templates/strategy_roll.html index 46b2cd0..739d375 100644 --- a/strategy_templates/strategy_roll.html +++ b/strategy_templates/strategy_roll.html @@ -94,7 +94,7 @@ {{ leg.add_mode }} {{ leg.amount }} {{ leg.new_stop_loss }} - {{ leg.status }} + {{ leg.status_label or leg.status }} {% else %} 暂无 diff --git a/strategy_templates/strategy_roll_panel.html b/strategy_templates/strategy_roll_panel.html index e9ea286..aab9042 100644 --- a/strategy_templates/strategy_roll_panel.html +++ b/strategy_templates/strategy_roll_panel.html @@ -60,7 +60,7 @@ {{ leg.add_mode }} {{ leg.amount }} {{ leg.new_stop_loss }} - {{ leg.status }} + {{ leg.status_label or leg.status }} {% else %} 暂无 diff --git a/strategy_ui.py b/strategy_ui.py index 487eb08..e46f072 100644 --- a/strategy_ui.py +++ b/strategy_ui.py @@ -4,6 +4,7 @@ from __future__ import annotations from typing import Any, Callable, Optional from strategy_db import init_strategy_tables +from strategy_roll_monitor_lib import roll_leg_status_label def _row_to_dict(row) -> dict: @@ -42,14 +43,24 @@ def fetch_roll_page_data( monitors.append(_row_to_dict(row)) roll_groups = [] for row in conn.execute( - "SELECT * FROM roll_groups WHERE status='active' ORDER BY id DESC" + """SELECT g.* FROM roll_groups g + INNER JOIN order_monitors m ON m.id = g.order_monitor_id AND m.status='active' + WHERE g.status='active' + ORDER BY g.id DESC""" ).fetchall(): roll_groups.append(_row_to_dict(row)) + active_gids = {int(g["id"]) for g in roll_groups if g.get("id") is not None} roll_legs = [] for row in conn.execute( - "SELECT * FROM roll_legs ORDER BY id DESC LIMIT 50" + "SELECT * FROM roll_legs ORDER BY id DESC LIMIT 80" ).fetchall(): - roll_legs.append(_row_to_dict(row)) + leg = _row_to_dict(row) + gid = leg.get("roll_group_id") + if gid is not None and int(gid) not in active_gids: + continue + leg["status_label"] = roll_leg_status_label(leg.get("status")) + roll_legs.append(leg) + roll_legs = roll_legs[:50] return { "roll_monitors": monitors, "roll_groups": roll_groups,