From 42c06c0f3831e964b11c4b7f79da3869b7159131 Mon Sep 17 00:00:00 2001 From: dekun Date: Sun, 14 Jun 2026 01:11:55 +0800 Subject: [PATCH] chore(scripts): remove one-off trigger entry patch scripts These temporary patch helpers are no longer needed after the feature landed in the main apps. Co-authored-by: Cursor --- scripts/patch_trigger_entry_binance_okx.py | 113 ----------- scripts/patch_trigger_entry_finish.py | 111 ----------- scripts/patch_trigger_entry_to_exchanges.py | 209 -------------------- 3 files changed, 433 deletions(-) delete mode 100644 scripts/patch_trigger_entry_binance_okx.py delete mode 100644 scripts/patch_trigger_entry_finish.py delete mode 100644 scripts/patch_trigger_entry_to_exchanges.py diff --git a/scripts/patch_trigger_entry_binance_okx.py b/scripts/patch_trigger_entry_binance_okx.py deleted file mode 100644 index 1fb548b..0000000 --- a/scripts/patch_trigger_entry_binance_okx.py +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/env python3 -"""补打 binance / okx 触价开仓补丁。""" -from pathlib import Path - -ROOT = Path(__file__).resolve().parents[1] -GATE = ROOT / "crypto_monitor_gate" / "app.py" - -def extract_block(): - text = GATE.read_text(encoding="utf-8") - i = text.index("def _trigger_entry_exists_for_symbol") - j = text.index("def check_fib_key_monitors():") - return text[i:j] - -ADD_BRANCH = GATE.read_text(encoding="utf-8").split( - " if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):" -)[1].split(" if is_false_breakout_key_monitor_type(mt):")[0] - -TE_BRANCH = """ elif is_te: - direction = (r["direction"] or "long").lower() - entry = _sqlite_row_val(r, "fib_entry_price") - tp_v = _sqlite_row_val(r, "fib_take_profit") - entry_txt = format_price_for_symbol(r["symbol"], entry) if entry else "-" - tp_txt = format_price_for_symbol(r["symbol"], tp_v) if tp_v else "-" - tp_inv = trigger_entry_invalidate_by_tp(direction, price, float(tp_v)) if tp_v else False - prev = trigger_entry_gate_preview( - entry_display=entry_txt, - take_profit_display=tp_txt, - created_at=_sqlite_row_val(r, "created_at"), - now=app_now(), - tp_invalidated=tp_inv, - hours=TRIGGER_ENTRY_VALIDITY_HOURS, - ) - gate_summary = prev.get("summary") or "-" - gate_metrics = prev.get("metrics") or "" - fib_gate_ok = bool(prev.get("gate_ok")) -""" - -TRIG_IMPORT = """from trigger_entry_key_monitor_lib import ( - TRIGGER_ENTRY_CLOSE_EXCHANGE_FAILED, - TRIGGER_ENTRY_CLOSE_EXPIRED, - TRIGGER_ENTRY_CLOSE_FILLED, - TRIGGER_ENTRY_CLOSE_TP_INVALIDATE, - TRIGGER_ENTRY_MONITOR_TYPE, - TRIGGER_ENTRY_VALIDITY_HOURS, - check_trigger_entry_intent_limit, - count_pending_trigger_entries, - is_trigger_entry_expired, - is_trigger_entry_key_monitor_type, - trigger_entry_expires_at_text, - trigger_entry_gate_preview, - trigger_entry_invalidate_by_tp, - trigger_entry_reached, - validate_trigger_entry_geometry, - validate_trigger_entry_rr, -) -""" - -def patch(path: Path, block: str) -> None: - text = path.read_text(encoding="utf-8") - if "trigger_entry_key_monitor_lib" not in text: - text = text.replace("from position_sizing_lib import (", TRIG_IMPORT + "from position_sizing_lib import (", 1) - if "OPEN_SOURCE_KEY_TRIGGER" not in text: - text = text.replace( - " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_MANUAL,", - " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_KEY_TRIGGER,\n OPEN_SOURCE_MANUAL,", - 1, - ) - reps = [ - (' "关键位假突破",\n) + STRATEGY_ENTRY_REASON_OPTIONS', ' "关键位假突破",\n "关键位触价开仓",\n) + STRATEGY_ENTRY_REASON_OPTIONS'), - (' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n)', ' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n ("key_trigger", "关键位触价开仓", {"segment": "key_trigger"}),\n)'), - (' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n ):', ' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n "ALTER TABLE key_monitors ADD COLUMN session_date TEXT",\n ):'), - (' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n return False', ' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n if segment_key == "key_trigger":\n return kst == TRIGGER_ENTRY_MONITOR_TYPE\n return False'), - (' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n }', ' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n "key_trigger": TRIGGER_ENTRY_MONITOR_TYPE,\n }'), - (" check_fib_key_monitors()\n _roll_cfg", " check_fib_key_monitors()\n check_trigger_entry_key_monitors()\n _roll_cfg"), - (' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_limit_order_id,created_at FROM key_monitors"', ' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_stop_loss,fib_take_profit,fib_limit_order_id,created_at FROM key_monitors"'), - (' is_fb = is_false_breakout_key_monitor_type(r["monitor_type"])\n if is_fib or is_fb:\n price = get_symbol_mark_price(r["symbol"])', ' is_fb = is_false_breakout_key_monitor_type(r["monitor_type"])\n is_te = is_trigger_entry_key_monitor_type(r["monitor_type"])\n if is_fib or is_fb or is_te:\n price = get_symbol_mark_price(r["symbol"])'), - ] - for a, b in reps: - if a not in text: - raise SystemExit(f"{path.name}: missing\n{a[:70]}") - text = text.replace(a, b, 1) - if " + (TRIGGER_ENTRY_MONITOR_TYPE,)" not in text: - text = text.replace( - " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n )", - " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n + (TRIGGER_ENTRY_MONITOR_TYPE,)\n )", - 1, - ) - te_anchor = ' fb_gate_ok = bool(prev.get("gate_ok"))\n elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:' - if te_anchor not in text: - raise SystemExit(f"{path.name}: te anchor") - text = text.replace(te_anchor, ' fb_gate_ok = bool(prev.get("gate_ok"))\n' + TE_BRANCH + ' elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:', 1) - add_anchor = " if tc_en and not tc_h:\n tc_en = 0\n if is_false_breakout_key_monitor_type(mt):" - if add_anchor not in text: - raise SystemExit(f"{path.name}: add anchor") - text = text.replace( - add_anchor, - " if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):" + ADD_BRANCH + " if is_false_breakout_key_monitor_type(mt):", - 1, - ) - func_anchor = " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\ndef check_fib_key_monitors():" - if func_anchor not in text: - raise SystemExit(f"{path.name}: func anchor") - text = text.replace(func_anchor, " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\n" + block, 1) - path.write_text(text, encoding="utf-8") - print("patched", path.relative_to(ROOT)) - -def main(): - block = extract_block() - patch(ROOT / "crypto_monitor_binance" / "app.py", block) - patch(ROOT / "crypto_monitor_okx" / "app.py", block) - -if __name__ == "__main__": - main() diff --git a/scripts/patch_trigger_entry_finish.py b/scripts/patch_trigger_entry_finish.py deleted file mode 100644 index 9a5153a..0000000 --- a/scripts/patch_trigger_entry_finish.py +++ /dev/null @@ -1,111 +0,0 @@ -"""补全 okx / binance 触价开仓:import、stats、background、add_key。""" -from pathlib import Path - -ROOT = Path(__file__).resolve().parents[1] -GATE = ROOT / "crypto_monitor_gate" / "app.py" - -ADD_INSERT = GATE.read_text(encoding="utf-8").split( - " if is_trigger_entry_key_monitor_type(mt):" -)[1].split(" if is_false_breakout_key_monitor_type(mt):")[0] - -TRIG_IMPORT = """from trigger_entry_key_monitor_lib import ( - TRIGGER_ENTRY_CLOSE_EXCHANGE_FAILED, - TRIGGER_ENTRY_CLOSE_EXPIRED, - TRIGGER_ENTRY_CLOSE_FILLED, - TRIGGER_ENTRY_CLOSE_TP_INVALIDATE, - TRIGGER_ENTRY_MONITOR_TYPE, - TRIGGER_ENTRY_VALIDITY_HOURS, - check_trigger_entry_intent_limit, - count_pending_trigger_entries, - is_trigger_entry_expired, - is_trigger_entry_key_monitor_type, - trigger_entry_expires_at_text, - trigger_entry_gate_preview, - trigger_entry_invalidate_by_tp, - trigger_entry_reached, - validate_trigger_entry_geometry, - validate_trigger_entry_rr, -) -""" - -def patch_okx(): - p = ROOT / "crypto_monitor_okx" / "app.py" - t = p.read_text(encoding="utf-8") - if "trigger_entry_key_monitor_lib" not in t: - t = t.replace("from position_sizing_lib import (", TRIG_IMPORT + "from position_sizing_lib import (", 1) - if "OPEN_SOURCE_KEY_TRIGGER" not in t: - t = t.replace( - " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_MANUAL,", - " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_KEY_TRIGGER,\n OPEN_SOURCE_MANUAL,", - 1, - ) - if '"关键位触价开仓"' not in t: - t = t.replace( - ' "关键位假突破",\n) + STRATEGY_ENTRY_REASON_OPTIONS', - ' "关键位假突破",\n "关键位触价开仓",\n) + STRATEGY_ENTRY_REASON_OPTIONS', - 1, - ) - if "key_trigger" not in t: - t = t.replace( - ' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n)', - ' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n ("key_trigger", "关键位触价开仓", {"segment": "key_trigger"}),\n)', - 1, - ) - if "key_monitors ADD COLUMN session_date" not in t: - t = t.replace( - ' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n ):', - ' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n "ALTER TABLE key_monitors ADD COLUMN session_date TEXT",\n ):', - 1, - ) - if 'segment_key == "key_trigger"' not in t: - t = t.replace( - ' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n return False', - ' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n if segment_key == "key_trigger":\n return kst == TRIGGER_ENTRY_MONITOR_TYPE\n return False', - 1, - ) - if '"key_trigger":' not in t: - t = t.replace( - ' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n }', - ' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n "key_trigger": TRIGGER_ENTRY_MONITOR_TYPE,\n }', - 1, - ) - if "check_trigger_entry_key_monitors()" not in t: - t = t.replace( - " check_fib_key_monitors()\n _roll_cfg", - " check_fib_key_monitors()\n check_trigger_entry_key_monitors()\n _roll_cfg", - 1, - ) - if " + (TRIGGER_ENTRY_MONITOR_TYPE,)" not in t: - t = t.replace( - " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n )", - " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n + (TRIGGER_ENTRY_MONITOR_TYPE,)\n )", - 1, - ) - anchor = " be_flag = parse_breakeven_enabled_form(d.get(\"breakeven_enabled\"))\n if is_false_breakout_key_monitor_type(mt):" - if "is_trigger_entry_key_monitor_type(mt)" not in t: - t = t.replace( - anchor, - " be_flag = parse_breakeven_enabled_form(d.get(\"breakeven_enabled\"))\n tc_en = parse_time_close_enabled_form(d.get(\"time_close_enabled\"))\n tc_h = parse_time_close_hours_form(d.get(\"time_close_hours\")) if tc_en else None\n if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):" + ADD_INSERT + " if is_false_breakout_key_monitor_type(mt):", - 1, - ) - p.write_text(t, encoding="utf-8") - print("okx done") - -def patch_binance_add_key(): - p = ROOT / "crypto_monitor_binance" / "app.py" - t = p.read_text(encoding="utf-8") - anchor = " be_flag = parse_breakeven_enabled_form(d.get(\"breakeven_enabled\"))\n if is_false_breakout_key_monitor_type(mt):" - if "is_trigger_entry_key_monitor_type(mt)" in t: - print("binance add_key skip") - return - t = t.replace( - anchor, - " be_flag = parse_breakeven_enabled_form(d.get(\"breakeven_enabled\"))\n tc_en = parse_time_close_enabled_form(d.get(\"time_close_enabled\"))\n tc_h = parse_time_close_hours_form(d.get(\"time_close_hours\")) if tc_en else None\n if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):" + ADD_INSERT + " if is_false_breakout_key_monitor_type(mt):", - 1, - ) - p.write_text(t, encoding="utf-8") - print("binance add_key done") - -if __name__ == "__main__": - patch_okx() - patch_binance_add_key() diff --git a/scripts/patch_trigger_entry_to_exchanges.py b/scripts/patch_trigger_entry_to_exchanges.py deleted file mode 100644 index 3dcd72d..0000000 --- a/scripts/patch_trigger_entry_to_exchanges.py +++ /dev/null @@ -1,209 +0,0 @@ -#!/usr/bin/env python3 -"""将触价开仓补丁同步到四所 app.py(与 crypto_monitor_gate 对齐)。""" -from __future__ import annotations - -import re -from pathlib import Path - -ROOT = Path(__file__).resolve().parents[1] -GATE = ROOT / "crypto_monitor_gate" / "app.py" -TARGETS = [ - ROOT / "crypto_monitor_gate_bot" / "app.py", - ROOT / "crypto_monitor_binance" / "app.py", - ROOT / "crypto_monitor_okx" / "app.py", -] - -# 从 gate 提取触价相关函数块 -FUNC_BLOCK_START = "def _trigger_entry_exists_for_symbol(conn, symbol):" -FUNC_BLOCK_END = "def check_fib_key_monitors():" - - -def extract_gate_block() -> str: - text = GATE.read_text(encoding="utf-8") - i = text.index(FUNC_BLOCK_START) - j = text.index(FUNC_BLOCK_END) - return text[i:j] - - -def ensure_imports(text: str) -> str: - trigger_import = """from trigger_entry_key_monitor_lib import ( - TRIGGER_ENTRY_CLOSE_EXCHANGE_FAILED, - TRIGGER_ENTRY_CLOSE_EXPIRED, - TRIGGER_ENTRY_CLOSE_FILLED, - TRIGGER_ENTRY_CLOSE_TP_INVALIDATE, - TRIGGER_ENTRY_MONITOR_TYPE, - TRIGGER_ENTRY_VALIDITY_HOURS, - check_trigger_entry_intent_limit, - count_pending_trigger_entries, - is_trigger_entry_expired, - is_trigger_entry_key_monitor_type, - trigger_entry_expires_at_text, - trigger_entry_gate_preview, - trigger_entry_invalidate_by_tp, - trigger_entry_reached, - validate_trigger_entry_geometry, - validate_trigger_entry_rr, -) -""" - if "from trigger_entry_key_monitor_lib import" not in text: - text = text.replace( - "from position_sizing_lib import (", - trigger_import + "from position_sizing_lib import (", - 1, - ) - if "OPEN_SOURCE_KEY_TRIGGER" not in text: - text = text.replace( - " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_MANUAL,", - " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_KEY_TRIGGER,\n OPEN_SOURCE_MANUAL,", - 1, - ) - return text - - -def patch_file(path: Path, func_block: str) -> None: - text = path.read_text(encoding="utf-8") - text = ensure_imports(text) - - replacements = [ - ( - ' "关键位假突破",\n) + STRATEGY_ENTRY_REASON_OPTIONS', - ' "关键位假突破",\n "关键位触价开仓",\n) + STRATEGY_ENTRY_REASON_OPTIONS', - ), - ( - ' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n)', - ' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n ("key_trigger", "关键位触价开仓", {"segment": "key_trigger"}),\n)', - ), - ( - ' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n ):', - ' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n "ALTER TABLE key_monitors ADD COLUMN session_date TEXT",\n ):', - ), - ( - ' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n return False', - ' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n if segment_key == "key_trigger":\n return kst == TRIGGER_ENTRY_MONITOR_TYPE\n return False', - ), - ( - ' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n }', - ' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n "key_trigger": TRIGGER_ENTRY_MONITOR_TYPE,\n }', - ), - ( - " check_fib_key_monitors()\n _roll_cfg", - " check_fib_key_monitors()\n check_trigger_entry_key_monitors()\n _roll_cfg", - ), - ( - " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n )", - " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n + (TRIGGER_ENTRY_MONITOR_TYPE,)\n )", - ), - ( - ' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_limit_order_id,created_at FROM key_monitors"', - ' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_stop_loss,fib_take_profit,fib_limit_order_id,created_at FROM key_monitors"', - ), - ( - " is_fb = is_false_breakout_key_monitor_type(r[\"monitor_type\"])\n if is_fib or is_fb:\n price = get_symbol_mark_price(r[\"symbol\"])", - " is_fb = is_false_breakout_key_monitor_type(r[\"monitor_type\"])\n is_te = is_trigger_entry_key_monitor_type(r[\"monitor_type\"])\n if is_fib or is_fb or is_te:\n price = get_symbol_mark_price(r[\"symbol\"])", - ), - ] - for old, new in replacements: - if old not in text: - raise SystemExit(f"[{path.name}] missing pattern:\n{old[:80]}...") - text = text.replace(old, new, 1) - - # api_price_snapshot te branch - te_branch_old = """ gate_summary = prev.get("summary") or "-" - gate_metrics = prev.get("metrics") or "" - fb_gate_ok = bool(prev.get("gate_ok")) - elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:""" - te_branch_new = """ gate_summary = prev.get("summary") or "-" - gate_metrics = prev.get("metrics") or "" - fb_gate_ok = bool(prev.get("gate_ok")) - elif is_te: - direction = (r["direction"] or "long").lower() - entry = _sqlite_row_val(r, "fib_entry_price") - tp_v = _sqlite_row_val(r, "fib_take_profit") - entry_txt = format_price_for_symbol(r["symbol"], entry) if entry else "-" - tp_txt = format_price_for_symbol(r["symbol"], tp_v) if tp_v else "-" - tp_inv = trigger_entry_invalidate_by_tp(direction, price, float(tp_v)) if tp_v else False - prev = trigger_entry_gate_preview( - entry_display=entry_txt, - take_profit_display=tp_txt, - created_at=_sqlite_row_val(r, "created_at"), - now=app_now(), - tp_invalidated=tp_inv, - hours=TRIGGER_ENTRY_VALIDITY_HOURS, - ) - gate_summary = prev.get("summary") or "-" - gate_metrics = prev.get("metrics") or "" - fib_gate_ok = bool(prev.get("gate_ok")) - elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:""" - if te_branch_old not in text: - raise SystemExit(f"[{path.name}] missing api te branch anchor") - text = text.replace(te_branch_old, te_branch_new, 1) - - # add_key trigger branch - add_key_old = """ if tc_en and not tc_h: - tc_en = 0 - if is_false_breakout_key_monitor_type(mt):""" - add_key_new = """ if tc_en and not tc_h: - tc_en = 0 - if is_trigger_entry_key_monitor_type(mt): - if direction_sel not in ("long", "short"): - conn.close() - conn = None - flash("触价开仓请选择做多或做空") - return redirect("/key_monitor") - try: - entry_px = float(d.get("trigger_entry") or 0) - sl_px = float(d.get("trigger_sl") or 0) - tp_px = float(d.get("trigger_tp") or 0) - except (TypeError, ValueError): - entry_px = sl_px = tp_px = 0 - if entry_px <= 0 or sl_px <= 0 or tp_px <= 0: - conn.close() - conn = None - flash("触价开仓须填写有效的入场价、止损价、止盈价") - return redirect("/key_monitor") - ok_te, err_te = _add_trigger_entry_key_monitor( - conn, symbol, direction_sel, entry_px, sl_px, tp_px, breakeven_enabled=be_flag, - time_close_enabled=tc_en, time_close_hours=tc_h, - ) - conn.commit() - conn.close() - conn = None - if not ok_te: - flash(err_te or "触价开仓监控添加失败") - return redirect("/key_monitor") - flash( - f"触价开仓已添加({symbol} 日成交量排名 {rank}/{total})" - f"|有效期 {TRIGGER_ENTRY_VALIDITY_HOURS}h" - f"|标记价触达入场价后下一轮询市价开仓" - f"|移动保本:{'开' if be_flag else '关'}" - + (f"|{time_close_label(tc_h)}" if tc_en else "") - ) - return redirect("/key_monitor") - if is_false_breakout_key_monitor_type(mt):""" - if add_key_old not in text: - raise SystemExit(f"[{path.name}] missing add_key anchor") - text = text.replace(add_key_old, add_key_new, 1) - - # function block - anchor = " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\ndef check_fib_key_monitors():" - if anchor not in text: - raise SystemExit(f"[{path.name}] missing func insert anchor") - text = text.replace( - anchor, - " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\n" + func_block, - 1, - ) - - path.write_text(text, encoding="utf-8") - print(f"patched {path.relative_to(ROOT)}") - - -def main() -> None: - block = extract_gate_block() - for t in TARGETS: - patch_file(t, block) - print("done") - - -if __name__ == "__main__": - main()