edf4bb835d
Co-authored-by: Cursor <cursoragent@cursor.com>
114 lines
6.6 KiB
Python
114 lines
6.6 KiB
Python
#!/usr/bin/env python3
|
|
"""补打 binance / okx 触价开仓补丁。"""
|
|
from pathlib import Path
|
|
|
|
# Directory two levels above this script file (the parent of the folder that
# contains it); every app path below is built relative to it.
ROOT = Path(__file__).resolve().parents[1]

# File the patch material is read from (see extract_block and ADD_BRANCH).
GATE = ROOT.joinpath("crypto_monitor_gate", "app.py")
|
|
|
|
def extract_block(path: "Path | None" = None) -> str:
    """Return the trigger-entry source block copied out of the gate app.

    The block starts at the text ``def _trigger_entry_exists_for_symbol`` and
    runs up to, but does not include, ``def check_fib_key_monitors():``.

    Args:
        path: File to read. Defaults to the module-level ``GATE`` path.

    Returns:
        The slice of source text between the two markers.

    Raises:
        SystemExit: If the start marker is missing, or the end marker does
            not occur after it. Previously a missing marker surfaced as a
            bare ``ValueError`` and a misordered pair produced an empty
            block without any error.
    """
    source = GATE if path is None else path
    text = source.read_text(encoding="utf-8")

    start_marker = "def _trigger_entry_exists_for_symbol"
    end_marker = "def check_fib_key_monitors():"

    start = text.find(start_marker)
    if start < 0:
        raise SystemExit(f"{source.name}: missing\n{start_marker}")

    # Search from the start marker onward so the slice can never be empty
    # because the end marker happened to appear earlier in the file.
    end = text.find(end_marker, start)
    if end < 0:
        raise SystemExit(f"{source.name}: missing\n{end_marker}")

    return text[start:end]
|
|
|
|
# Text lifted verbatim from the gate app: everything that follows the opening
# marker (which ends with the trigger-entry type check) and precedes the next
# false-breakout type check. patch() re-inserts it into each target file.
_ADD_BRANCH_OPEN = " if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):"
_ADD_BRANCH_CLOSE = " if is_false_breakout_key_monitor_type(mt):"
_after_open = GATE.read_text(encoding="utf-8").split(_ADD_BRANCH_OPEN)[1]
ADD_BRANCH = _after_open.split(_ADD_BRANCH_CLOSE)[0]
|
|
|
|
# Source text of the `elif is_te:` branch that patch() splices into each target
# file, between the `fb_gate_ok = ...` assignment and the following
# `elif ... in KEY_MONITOR_RS_TYPES:` branch.
# NOTE(review): the literal is inserted verbatim, so its whitespace must match
# the indentation of the surrounding code in the target — verify before running.
# NOTE(review): this branch assigns `fib_gate_ok`, while the anchor it follows
# assigns `fb_gate_ok` — confirm that is the intended variable.
TE_BRANCH = """ elif is_te:
|
|
direction = (r["direction"] or "long").lower()
|
|
entry = _sqlite_row_val(r, "fib_entry_price")
|
|
tp_v = _sqlite_row_val(r, "fib_take_profit")
|
|
entry_txt = format_price_for_symbol(r["symbol"], entry) if entry else "-"
|
|
tp_txt = format_price_for_symbol(r["symbol"], tp_v) if tp_v else "-"
|
|
tp_inv = trigger_entry_invalidate_by_tp(direction, price, float(tp_v)) if tp_v else False
|
|
prev = trigger_entry_gate_preview(
|
|
entry_display=entry_txt,
|
|
take_profit_display=tp_txt,
|
|
created_at=_sqlite_row_val(r, "created_at"),
|
|
now=app_now(),
|
|
tp_invalidated=tp_inv,
|
|
hours=TRIGGER_ENTRY_VALIDITY_HOURS,
|
|
)
|
|
gate_summary = prev.get("summary") or "-"
|
|
gate_metrics = prev.get("metrics") or ""
|
|
fib_gate_ok = bool(prev.get("gate_ok"))
|
|
"""
|
|
|
|
# Import statement text that patch() inserts immediately before the first
# "from position_sizing_lib import (" occurrence, but only when the target file
# does not already mention trigger_entry_key_monitor_lib.
TRIG_IMPORT = """from trigger_entry_key_monitor_lib import (
|
|
TRIGGER_ENTRY_CLOSE_EXCHANGE_FAILED,
|
|
TRIGGER_ENTRY_CLOSE_EXPIRED,
|
|
TRIGGER_ENTRY_CLOSE_FILLED,
|
|
TRIGGER_ENTRY_CLOSE_TP_INVALIDATE,
|
|
TRIGGER_ENTRY_MONITOR_TYPE,
|
|
TRIGGER_ENTRY_VALIDITY_HOURS,
|
|
check_trigger_entry_intent_limit,
|
|
count_pending_trigger_entries,
|
|
is_trigger_entry_expired,
|
|
is_trigger_entry_key_monitor_type,
|
|
trigger_entry_expires_at_text,
|
|
trigger_entry_gate_preview,
|
|
trigger_entry_invalidate_by_tp,
|
|
trigger_entry_reached,
|
|
validate_trigger_entry_geometry,
|
|
validate_trigger_entry_rr,
|
|
)
|
|
"""
|
|
|
|
def patch(path: Path, block: str) -> None:
    """Apply the trigger-entry patch to one app file, rewriting it in place.

    The file is read, a fixed sequence of textual substitutions is applied
    (each replacing only the first occurrence of its anchor), and the result
    is written back.

    Args:
        path: Target file to modify; must be located under ``ROOT`` because
            the final progress message prints the path relative to it.
        block: Source text inserted in front of
            ``def check_fib_key_monitors():`` (see ``extract_block``).

    Raises:
        SystemExit: If any required anchor is absent from the target file.
    """
    text = path.read_text(encoding="utf-8")

    # Guarded edits: skipped when the target already contains the addition.
    # str.replace returns the text unchanged when the anchor is absent, so
    # these steps do not fail if their anchor is missing.
    if "trigger_entry_key_monitor_lib" not in text:
        text = text.replace(
            "from position_sizing_lib import (",
            TRIG_IMPORT + "from position_sizing_lib import (",
            1,
        )
    if "OPEN_SOURCE_KEY_TRIGGER" not in text:
        text = text.replace(
            " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_MANUAL,",
            " OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_KEY_TRIGGER,\n OPEN_SOURCE_MANUAL,",
            1,
        )

    # Mandatory (old, new) substitutions: a missing anchor aborts the run.
    # NOTE(review): whitespace inside every literal below must match the
    # target files exactly for the anchors to be found.
    reps = [
        (
            ' "关键位假突破",\n) + STRATEGY_ENTRY_REASON_OPTIONS',
            ' "关键位假突破",\n "关键位触价开仓",\n) + STRATEGY_ENTRY_REASON_OPTIONS',
        ),
        (
            ' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n)',
            ' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n ("key_trigger", "关键位触价开仓", {"segment": "key_trigger"}),\n)',
        ),
        (
            ' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n ):',
            ' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n "ALTER TABLE key_monitors ADD COLUMN session_date TEXT",\n ):',
        ),
        (
            ' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n return False',
            ' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n if segment_key == "key_trigger":\n return kst == TRIGGER_ENTRY_MONITOR_TYPE\n return False',
        ),
        (
            ' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n }',
            ' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n "key_trigger": TRIGGER_ENTRY_MONITOR_TYPE,\n }',
        ),
        (
            " check_fib_key_monitors()\n _roll_cfg",
            " check_fib_key_monitors()\n check_trigger_entry_key_monitors()\n _roll_cfg",
        ),
        (
            ' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_limit_order_id,created_at FROM key_monitors"',
            ' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_stop_loss,fib_take_profit,fib_limit_order_id,created_at FROM key_monitors"',
        ),
        (
            ' is_fb = is_false_breakout_key_monitor_type(r["monitor_type"])\n if is_fib or is_fb:\n price = get_symbol_mark_price(r["symbol"])',
            ' is_fb = is_false_breakout_key_monitor_type(r["monitor_type"])\n is_te = is_trigger_entry_key_monitor_type(r["monitor_type"])\n if is_fib or is_fb or is_te:\n price = get_symbol_mark_price(r["symbol"])',
        ),
    ]
    for old, new in reps:
        if old not in text:
            raise SystemExit(f"{path.name}: missing\n{old[:70]}")
        text = text.replace(old, new, 1)

    if " + (TRIGGER_ENTRY_MONITOR_TYPE,)" not in text:
        text = text.replace(
            " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n )",
            " + (FALSE_BREAKOUT_MONITOR_TYPE,)\n + (TRIGGER_ENTRY_MONITOR_TYPE,)\n )",
            1,
        )

    # Splice the trigger-entry display branch between the false-breakout gate
    # assignment and the KEY_MONITOR_RS_TYPES branch.
    te_head = ' fb_gate_ok = bool(prev.get("gate_ok"))\n'
    te_tail = ' elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:'
    te_anchor = te_head + te_tail
    if te_anchor not in text:
        raise SystemExit(f"{path.name}: te anchor")
    text = text.replace(te_anchor, te_head + TE_BRANCH + te_tail, 1)

    # Insert the trigger-entry branch ahead of the false-breakout check.
    add_head = " if tc_en and not tc_h:\n tc_en = 0\n"
    add_tail = " if is_false_breakout_key_monitor_type(mt):"
    add_anchor = add_head + add_tail
    if add_anchor not in text:
        raise SystemExit(f"{path.name}: add anchor")
    text = text.replace(
        add_anchor,
        add_head + " if is_trigger_entry_key_monitor_type(mt):" + ADD_BRANCH + add_tail,
        1,
    )

    # Insert the extracted function block in front of check_fib_key_monitors.
    func_head = " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\n"
    fib_def = "def check_fib_key_monitors():"
    func_anchor = func_head + fib_def
    if func_anchor not in text:
        raise SystemExit(f"{path.name}: func anchor")
    # The anchor consumes the `def check_fib_key_monitors():` header and the
    # block stops just before that header, so it must be re-appended here;
    # otherwise the function definition disappears from the patched file.
    text = text.replace(func_anchor, func_head + block + fib_def, 1)

    path.write_text(text, encoding="utf-8")
    print("patched", path.relative_to(ROOT))
|
|
|
|
def main():
    """Extract the block from the gate app once and patch both target apps."""
    block = extract_block()
    # Order matters only for the printed progress lines: binance, then okx.
    for app_dir in ("crypto_monitor_binance", "crypto_monitor_okx"):
        patch(ROOT / app_dir / "app.py", block)


if __name__ == "__main__":
    main()
|