#!/usr/bin/env python3 """补打 binance / okx 触价开仓补丁。""" from pathlib import Path ROOT = Path(__file__).resolve().parents[1] GATE = ROOT / "crypto_monitor_gate" / "app.py" def extract_block(): text = GATE.read_text(encoding="utf-8") i = text.index("def _trigger_entry_exists_for_symbol") j = text.index("def check_fib_key_monitors():") return text[i:j] ADD_BRANCH = GATE.read_text(encoding="utf-8").split( " if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):" )[1].split(" if is_false_breakout_key_monitor_type(mt):")[0] TE_BRANCH = """ elif is_te: direction = (r["direction"] or "long").lower() entry = _sqlite_row_val(r, "fib_entry_price") tp_v = _sqlite_row_val(r, "fib_take_profit") entry_txt = format_price_for_symbol(r["symbol"], entry) if entry else "-" tp_txt = format_price_for_symbol(r["symbol"], tp_v) if tp_v else "-" tp_inv = trigger_entry_invalidate_by_tp(direction, price, float(tp_v)) if tp_v else False prev = trigger_entry_gate_preview( entry_display=entry_txt, take_profit_display=tp_txt, created_at=_sqlite_row_val(r, "created_at"), now=app_now(), tp_invalidated=tp_inv, hours=TRIGGER_ENTRY_VALIDITY_HOURS, ) gate_summary = prev.get("summary") or "-" gate_metrics = prev.get("metrics") or "" fib_gate_ok = bool(prev.get("gate_ok")) """ TRIG_IMPORT = """from trigger_entry_key_monitor_lib import ( TRIGGER_ENTRY_CLOSE_EXCHANGE_FAILED, TRIGGER_ENTRY_CLOSE_EXPIRED, TRIGGER_ENTRY_CLOSE_FILLED, TRIGGER_ENTRY_CLOSE_TP_INVALIDATE, TRIGGER_ENTRY_MONITOR_TYPE, TRIGGER_ENTRY_VALIDITY_HOURS, check_trigger_entry_intent_limit, count_pending_trigger_entries, is_trigger_entry_expired, is_trigger_entry_key_monitor_type, trigger_entry_expires_at_text, trigger_entry_gate_preview, trigger_entry_invalidate_by_tp, trigger_entry_reached, validate_trigger_entry_geometry, validate_trigger_entry_rr, ) """ def patch(path: Path, block: str) -> None: text = path.read_text(encoding="utf-8") if "trigger_entry_key_monitor_lib" not in text: text = text.replace("from position_sizing_lib import (", TRIG_IMPORT + "from position_sizing_lib import (", 1) if "OPEN_SOURCE_KEY_TRIGGER" not in text: text = text.replace( " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_MANUAL,", " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_KEY_TRIGGER,\n OPEN_SOURCE_MANUAL,", 1, ) reps = [ (' "关键位假突破",\n) + STRATEGY_ENTRY_REASON_OPTIONS', ' "关键位假突破",\n "关键位触价开仓",\n) + STRATEGY_ENTRY_REASON_OPTIONS'), (' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n)', ' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n ("key_trigger", "关键位触价开仓", {"segment": "key_trigger"}),\n)'), (' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n ):', ' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n "ALTER TABLE key_monitors ADD COLUMN session_date TEXT",\n ):'), (' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n return False', ' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n if segment_key == "key_trigger":\n return kst == TRIGGER_ENTRY_MONITOR_TYPE\n return False'), (' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n }', ' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n "key_trigger": TRIGGER_ENTRY_MONITOR_TYPE,\n }'), (" check_fib_key_monitors()\n _roll_cfg", " check_fib_key_monitors()\n check_trigger_entry_key_monitors()\n _roll_cfg"), (' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_limit_order_id,created_at FROM key_monitors"', ' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_stop_loss,fib_take_profit,fib_limit_order_id,created_at FROM key_monitors"'), (' is_fb = is_false_breakout_key_monitor_type(r["monitor_type"])\n if is_fib or is_fb:\n price = get_symbol_mark_price(r["symbol"])', ' is_fb = is_false_breakout_key_monitor_type(r["monitor_type"])\n is_te = is_trigger_entry_key_monitor_type(r["monitor_type"])\n if is_fib or is_fb or is_te:\n price = get_symbol_mark_price(r["symbol"])'), ] for a, b in reps: if a not in text: raise SystemExit(f"{path.name}: missing\n{a[:70]}") text = text.replace(a, b, 1) if " + (TRIGGER_ENTRY_MONITOR_TYPE,)" not in text: text = text.replace( " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n )", " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n + (TRIGGER_ENTRY_MONITOR_TYPE,)\n )", 1, ) te_anchor = ' fb_gate_ok = bool(prev.get("gate_ok"))\n elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:' if te_anchor not in text: raise SystemExit(f"{path.name}: te anchor") text = text.replace(te_anchor, ' fb_gate_ok = bool(prev.get("gate_ok"))\n' + TE_BRANCH + ' elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:', 1) add_anchor = " if tc_en and not tc_h:\n tc_en = 0\n if is_false_breakout_key_monitor_type(mt):" if add_anchor not in text: raise SystemExit(f"{path.name}: add anchor") text = text.replace( add_anchor, " if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):" + ADD_BRANCH + " if is_false_breakout_key_monitor_type(mt):", 1, ) func_anchor = " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\ndef check_fib_key_monitors():" if func_anchor not in text: raise SystemExit(f"{path.name}: func anchor") text = text.replace(func_anchor, " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\n" + block, 1) path.write_text(text, encoding="utf-8") print("patched", path.relative_to(ROOT)) def main(): block = extract_block() patch(ROOT / "crypto_monitor_binance" / "app.py", block) patch(ROOT / "crypto_monitor_okx" / "app.py", block) if __name__ == "__main__": main()