chore(scripts): remove one-off trigger entry patch scripts

These temporary patch helpers are no longer needed after the feature landed in the main apps.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-14 01:11:55 +08:00
parent 4573ccca9a
commit 42c06c0f38
3 changed files with 0 additions and 433 deletions
-113
View File
@@ -1,113 +0,0 @@
#!/usr/bin/env python3
"""补打 binance / okx 触价开仓补丁。"""
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
GATE = ROOT / "crypto_monitor_gate" / "app.py"
def extract_block():
text = GATE.read_text(encoding="utf-8")
i = text.index("def _trigger_entry_exists_for_symbol")
j = text.index("def check_fib_key_monitors():")
return text[i:j]
ADD_BRANCH = GATE.read_text(encoding="utf-8").split(
" if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):"
)[1].split(" if is_false_breakout_key_monitor_type(mt):")[0]
TE_BRANCH = """ elif is_te:
direction = (r["direction"] or "long").lower()
entry = _sqlite_row_val(r, "fib_entry_price")
tp_v = _sqlite_row_val(r, "fib_take_profit")
entry_txt = format_price_for_symbol(r["symbol"], entry) if entry else "-"
tp_txt = format_price_for_symbol(r["symbol"], tp_v) if tp_v else "-"
tp_inv = trigger_entry_invalidate_by_tp(direction, price, float(tp_v)) if tp_v else False
prev = trigger_entry_gate_preview(
entry_display=entry_txt,
take_profit_display=tp_txt,
created_at=_sqlite_row_val(r, "created_at"),
now=app_now(),
tp_invalidated=tp_inv,
hours=TRIGGER_ENTRY_VALIDITY_HOURS,
)
gate_summary = prev.get("summary") or "-"
gate_metrics = prev.get("metrics") or ""
fib_gate_ok = bool(prev.get("gate_ok"))
"""
TRIG_IMPORT = """from trigger_entry_key_monitor_lib import (
TRIGGER_ENTRY_CLOSE_EXCHANGE_FAILED,
TRIGGER_ENTRY_CLOSE_EXPIRED,
TRIGGER_ENTRY_CLOSE_FILLED,
TRIGGER_ENTRY_CLOSE_TP_INVALIDATE,
TRIGGER_ENTRY_MONITOR_TYPE,
TRIGGER_ENTRY_VALIDITY_HOURS,
check_trigger_entry_intent_limit,
count_pending_trigger_entries,
is_trigger_entry_expired,
is_trigger_entry_key_monitor_type,
trigger_entry_expires_at_text,
trigger_entry_gate_preview,
trigger_entry_invalidate_by_tp,
trigger_entry_reached,
validate_trigger_entry_geometry,
validate_trigger_entry_rr,
)
"""
def patch(path: Path, block: str) -> None:
text = path.read_text(encoding="utf-8")
if "trigger_entry_key_monitor_lib" not in text:
text = text.replace("from position_sizing_lib import (", TRIG_IMPORT + "from position_sizing_lib import (", 1)
if "OPEN_SOURCE_KEY_TRIGGER" not in text:
text = text.replace(
" OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_MANUAL,",
" OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_KEY_TRIGGER,\n OPEN_SOURCE_MANUAL,",
1,
)
reps = [
(' "关键位假突破",\n) + STRATEGY_ENTRY_REASON_OPTIONS', ' "关键位假突破",\n "关键位触价开仓",\n) + STRATEGY_ENTRY_REASON_OPTIONS'),
(' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n)', ' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n ("key_trigger", "关键位触价开仓", {"segment": "key_trigger"}),\n)'),
(' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n ):', ' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n "ALTER TABLE key_monitors ADD COLUMN session_date TEXT",\n ):'),
(' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n return False', ' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n if segment_key == "key_trigger":\n return kst == TRIGGER_ENTRY_MONITOR_TYPE\n return False'),
(' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n }', ' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n "key_trigger": TRIGGER_ENTRY_MONITOR_TYPE,\n }'),
(" check_fib_key_monitors()\n _roll_cfg", " check_fib_key_monitors()\n check_trigger_entry_key_monitors()\n _roll_cfg"),
(' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_limit_order_id,created_at FROM key_monitors"', ' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_stop_loss,fib_take_profit,fib_limit_order_id,created_at FROM key_monitors"'),
(' is_fb = is_false_breakout_key_monitor_type(r["monitor_type"])\n if is_fib or is_fb:\n price = get_symbol_mark_price(r["symbol"])', ' is_fb = is_false_breakout_key_monitor_type(r["monitor_type"])\n is_te = is_trigger_entry_key_monitor_type(r["monitor_type"])\n if is_fib or is_fb or is_te:\n price = get_symbol_mark_price(r["symbol"])'),
]
for a, b in reps:
if a not in text:
raise SystemExit(f"{path.name}: missing\n{a[:70]}")
text = text.replace(a, b, 1)
if " + (TRIGGER_ENTRY_MONITOR_TYPE,)" not in text:
text = text.replace(
" + (FALSE_BREAKOUT_MONITOR_TYPE,)\n )",
" + (FALSE_BREAKOUT_MONITOR_TYPE,)\n + (TRIGGER_ENTRY_MONITOR_TYPE,)\n )",
1,
)
te_anchor = ' fb_gate_ok = bool(prev.get("gate_ok"))\n elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:'
if te_anchor not in text:
raise SystemExit(f"{path.name}: te anchor")
text = text.replace(te_anchor, ' fb_gate_ok = bool(prev.get("gate_ok"))\n' + TE_BRANCH + ' elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:', 1)
add_anchor = " if tc_en and not tc_h:\n tc_en = 0\n if is_false_breakout_key_monitor_type(mt):"
if add_anchor not in text:
raise SystemExit(f"{path.name}: add anchor")
text = text.replace(
add_anchor,
" if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):" + ADD_BRANCH + " if is_false_breakout_key_monitor_type(mt):",
1,
)
func_anchor = " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\ndef check_fib_key_monitors():"
if func_anchor not in text:
raise SystemExit(f"{path.name}: func anchor")
text = text.replace(func_anchor, " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\n" + block, 1)
path.write_text(text, encoding="utf-8")
print("patched", path.relative_to(ROOT))
def main():
block = extract_block()
patch(ROOT / "crypto_monitor_binance" / "app.py", block)
patch(ROOT / "crypto_monitor_okx" / "app.py", block)
if __name__ == "__main__":
main()
-111
View File
@@ -1,111 +0,0 @@
"""补全 okx / binance 触价开仓:import、stats、background、add_key。"""
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
GATE = ROOT / "crypto_monitor_gate" / "app.py"
ADD_INSERT = GATE.read_text(encoding="utf-8").split(
" if is_trigger_entry_key_monitor_type(mt):"
)[1].split(" if is_false_breakout_key_monitor_type(mt):")[0]
TRIG_IMPORT = """from trigger_entry_key_monitor_lib import (
TRIGGER_ENTRY_CLOSE_EXCHANGE_FAILED,
TRIGGER_ENTRY_CLOSE_EXPIRED,
TRIGGER_ENTRY_CLOSE_FILLED,
TRIGGER_ENTRY_CLOSE_TP_INVALIDATE,
TRIGGER_ENTRY_MONITOR_TYPE,
TRIGGER_ENTRY_VALIDITY_HOURS,
check_trigger_entry_intent_limit,
count_pending_trigger_entries,
is_trigger_entry_expired,
is_trigger_entry_key_monitor_type,
trigger_entry_expires_at_text,
trigger_entry_gate_preview,
trigger_entry_invalidate_by_tp,
trigger_entry_reached,
validate_trigger_entry_geometry,
validate_trigger_entry_rr,
)
"""
def patch_okx():
p = ROOT / "crypto_monitor_okx" / "app.py"
t = p.read_text(encoding="utf-8")
if "trigger_entry_key_monitor_lib" not in t:
t = t.replace("from position_sizing_lib import (", TRIG_IMPORT + "from position_sizing_lib import (", 1)
if "OPEN_SOURCE_KEY_TRIGGER" not in t:
t = t.replace(
" OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_MANUAL,",
" OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_KEY_TRIGGER,\n OPEN_SOURCE_MANUAL,",
1,
)
if '"关键位触价开仓"' not in t:
t = t.replace(
' "关键位假突破",\n) + STRATEGY_ENTRY_REASON_OPTIONS',
' "关键位假突破",\n "关键位触价开仓",\n) + STRATEGY_ENTRY_REASON_OPTIONS',
1,
)
if "key_trigger" not in t:
t = t.replace(
' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n)',
' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n ("key_trigger", "关键位触价开仓", {"segment": "key_trigger"}),\n)',
1,
)
if "key_monitors ADD COLUMN session_date" not in t:
t = t.replace(
' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n ):',
' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n "ALTER TABLE key_monitors ADD COLUMN session_date TEXT",\n ):',
1,
)
if 'segment_key == "key_trigger"' not in t:
t = t.replace(
' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n return False',
' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n if segment_key == "key_trigger":\n return kst == TRIGGER_ENTRY_MONITOR_TYPE\n return False',
1,
)
if '"key_trigger":' not in t:
t = t.replace(
' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n }',
' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n "key_trigger": TRIGGER_ENTRY_MONITOR_TYPE,\n }',
1,
)
if "check_trigger_entry_key_monitors()" not in t:
t = t.replace(
" check_fib_key_monitors()\n _roll_cfg",
" check_fib_key_monitors()\n check_trigger_entry_key_monitors()\n _roll_cfg",
1,
)
if " + (TRIGGER_ENTRY_MONITOR_TYPE,)" not in t:
t = t.replace(
" + (FALSE_BREAKOUT_MONITOR_TYPE,)\n )",
" + (FALSE_BREAKOUT_MONITOR_TYPE,)\n + (TRIGGER_ENTRY_MONITOR_TYPE,)\n )",
1,
)
anchor = " be_flag = parse_breakeven_enabled_form(d.get(\"breakeven_enabled\"))\n if is_false_breakout_key_monitor_type(mt):"
if "is_trigger_entry_key_monitor_type(mt)" not in t:
t = t.replace(
anchor,
" be_flag = parse_breakeven_enabled_form(d.get(\"breakeven_enabled\"))\n tc_en = parse_time_close_enabled_form(d.get(\"time_close_enabled\"))\n tc_h = parse_time_close_hours_form(d.get(\"time_close_hours\")) if tc_en else None\n if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):" + ADD_INSERT + " if is_false_breakout_key_monitor_type(mt):",
1,
)
p.write_text(t, encoding="utf-8")
print("okx done")
def patch_binance_add_key():
p = ROOT / "crypto_monitor_binance" / "app.py"
t = p.read_text(encoding="utf-8")
anchor = " be_flag = parse_breakeven_enabled_form(d.get(\"breakeven_enabled\"))\n if is_false_breakout_key_monitor_type(mt):"
if "is_trigger_entry_key_monitor_type(mt)" in t:
print("binance add_key skip")
return
t = t.replace(
anchor,
" be_flag = parse_breakeven_enabled_form(d.get(\"breakeven_enabled\"))\n tc_en = parse_time_close_enabled_form(d.get(\"time_close_enabled\"))\n tc_h = parse_time_close_hours_form(d.get(\"time_close_hours\")) if tc_en else None\n if tc_en and not tc_h:\n tc_en = 0\n if is_trigger_entry_key_monitor_type(mt):" + ADD_INSERT + " if is_false_breakout_key_monitor_type(mt):",
1,
)
p.write_text(t, encoding="utf-8")
print("binance add_key done")
if __name__ == "__main__":
patch_okx()
patch_binance_add_key()
-209
View File
@@ -1,209 +0,0 @@
#!/usr/bin/env python3
"""将触价开仓补丁同步到四所 app.py(与 crypto_monitor_gate 对齐)。"""
from __future__ import annotations
import re
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
GATE = ROOT / "crypto_monitor_gate" / "app.py"
TARGETS = [
ROOT / "crypto_monitor_gate_bot" / "app.py",
ROOT / "crypto_monitor_binance" / "app.py",
ROOT / "crypto_monitor_okx" / "app.py",
]
# 从 gate 提取触价相关函数块
FUNC_BLOCK_START = "def _trigger_entry_exists_for_symbol(conn, symbol):"
FUNC_BLOCK_END = "def check_fib_key_monitors():"
def extract_gate_block() -> str:
text = GATE.read_text(encoding="utf-8")
i = text.index(FUNC_BLOCK_START)
j = text.index(FUNC_BLOCK_END)
return text[i:j]
def ensure_imports(text: str) -> str:
trigger_import = """from trigger_entry_key_monitor_lib import (
TRIGGER_ENTRY_CLOSE_EXCHANGE_FAILED,
TRIGGER_ENTRY_CLOSE_EXPIRED,
TRIGGER_ENTRY_CLOSE_FILLED,
TRIGGER_ENTRY_CLOSE_TP_INVALIDATE,
TRIGGER_ENTRY_MONITOR_TYPE,
TRIGGER_ENTRY_VALIDITY_HOURS,
check_trigger_entry_intent_limit,
count_pending_trigger_entries,
is_trigger_entry_expired,
is_trigger_entry_key_monitor_type,
trigger_entry_expires_at_text,
trigger_entry_gate_preview,
trigger_entry_invalidate_by_tp,
trigger_entry_reached,
validate_trigger_entry_geometry,
validate_trigger_entry_rr,
)
"""
if "from trigger_entry_key_monitor_lib import" not in text:
text = text.replace(
"from position_sizing_lib import (",
trigger_import + "from position_sizing_lib import (",
1,
)
if "OPEN_SOURCE_KEY_TRIGGER" not in text:
text = text.replace(
" OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_MANUAL,",
" OPEN_SOURCE_KEY_AUTO,\n OPEN_SOURCE_KEY_TRIGGER,\n OPEN_SOURCE_MANUAL,",
1,
)
return text
def patch_file(path: Path, func_block: str) -> None:
text = path.read_text(encoding="utf-8")
text = ensure_imports(text)
replacements = [
(
' "关键位假突破",\n) + STRATEGY_ENTRY_REASON_OPTIONS',
' "关键位假突破",\n "关键位触价开仓",\n) + STRATEGY_ENTRY_REASON_OPTIONS',
),
(
' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n)',
' ("key_false_breakout", "关键位假突破", {"segment": "key_false_breakout"}),\n ("key_trigger", "关键位触价开仓", {"segment": "key_trigger"}),\n)',
),
(
' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n ):',
' "ALTER TABLE key_monitors ADD COLUMN last_rs_bar_ts INTEGER",\n "ALTER TABLE key_monitors ADD COLUMN session_date TEXT",\n ):',
),
(
' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n return False',
' if segment_key == "key_false_breakout":\n return kst == FALSE_BREAKOUT_MONITOR_TYPE\n if segment_key == "key_trigger":\n return kst == TRIGGER_ENTRY_MONITOR_TYPE\n return False',
),
(
' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n }',
' "key_false_breakout": FALSE_BREAKOUT_MONITOR_TYPE,\n "key_trigger": TRIGGER_ENTRY_MONITOR_TYPE,\n }',
),
(
" check_fib_key_monitors()\n _roll_cfg",
" check_fib_key_monitors()\n check_trigger_entry_key_monitors()\n _roll_cfg",
),
(
" + (FALSE_BREAKOUT_MONITOR_TYPE,)\n )",
" + (FALSE_BREAKOUT_MONITOR_TYPE,)\n + (TRIGGER_ENTRY_MONITOR_TYPE,)\n )",
),
(
' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_limit_order_id,created_at FROM key_monitors"',
' "SELECT id,symbol,monitor_type,direction,upper,lower,fib_entry_price,fib_stop_loss,fib_take_profit,fib_limit_order_id,created_at FROM key_monitors"',
),
(
" is_fb = is_false_breakout_key_monitor_type(r[\"monitor_type\"])\n if is_fib or is_fb:\n price = get_symbol_mark_price(r[\"symbol\"])",
" is_fb = is_false_breakout_key_monitor_type(r[\"monitor_type\"])\n is_te = is_trigger_entry_key_monitor_type(r[\"monitor_type\"])\n if is_fib or is_fb or is_te:\n price = get_symbol_mark_price(r[\"symbol\"])",
),
]
for old, new in replacements:
if old not in text:
raise SystemExit(f"[{path.name}] missing pattern:\n{old[:80]}...")
text = text.replace(old, new, 1)
# api_price_snapshot te branch
te_branch_old = """ gate_summary = prev.get("summary") or "-"
gate_metrics = prev.get("metrics") or ""
fb_gate_ok = bool(prev.get("gate_ok"))
elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:"""
te_branch_new = """ gate_summary = prev.get("summary") or "-"
gate_metrics = prev.get("metrics") or ""
fb_gate_ok = bool(prev.get("gate_ok"))
elif is_te:
direction = (r["direction"] or "long").lower()
entry = _sqlite_row_val(r, "fib_entry_price")
tp_v = _sqlite_row_val(r, "fib_take_profit")
entry_txt = format_price_for_symbol(r["symbol"], entry) if entry else "-"
tp_txt = format_price_for_symbol(r["symbol"], tp_v) if tp_v else "-"
tp_inv = trigger_entry_invalidate_by_tp(direction, price, float(tp_v)) if tp_v else False
prev = trigger_entry_gate_preview(
entry_display=entry_txt,
take_profit_display=tp_txt,
created_at=_sqlite_row_val(r, "created_at"),
now=app_now(),
tp_invalidated=tp_inv,
hours=TRIGGER_ENTRY_VALIDITY_HOURS,
)
gate_summary = prev.get("summary") or "-"
gate_metrics = prev.get("metrics") or ""
fib_gate_ok = bool(prev.get("gate_ok"))
elif (r["monitor_type"] or "").strip() in KEY_MONITOR_RS_TYPES:"""
if te_branch_old not in text:
raise SystemExit(f"[{path.name}] missing api te branch anchor")
text = text.replace(te_branch_old, te_branch_new, 1)
# add_key trigger branch
add_key_old = """ if tc_en and not tc_h:
tc_en = 0
if is_false_breakout_key_monitor_type(mt):"""
add_key_new = """ if tc_en and not tc_h:
tc_en = 0
if is_trigger_entry_key_monitor_type(mt):
if direction_sel not in ("long", "short"):
conn.close()
conn = None
flash("触价开仓请选择做多或做空")
return redirect("/key_monitor")
try:
entry_px = float(d.get("trigger_entry") or 0)
sl_px = float(d.get("trigger_sl") or 0)
tp_px = float(d.get("trigger_tp") or 0)
except (TypeError, ValueError):
entry_px = sl_px = tp_px = 0
if entry_px <= 0 or sl_px <= 0 or tp_px <= 0:
conn.close()
conn = None
flash("触价开仓须填写有效的入场价、止损价、止盈价")
return redirect("/key_monitor")
ok_te, err_te = _add_trigger_entry_key_monitor(
conn, symbol, direction_sel, entry_px, sl_px, tp_px, breakeven_enabled=be_flag,
time_close_enabled=tc_en, time_close_hours=tc_h,
)
conn.commit()
conn.close()
conn = None
if not ok_te:
flash(err_te or "触价开仓监控添加失败")
return redirect("/key_monitor")
flash(
f"触价开仓已添加({symbol} 日成交量排名 {rank}/{total}"
f"|有效期 {TRIGGER_ENTRY_VALIDITY_HOURS}h"
f"|标记价触达入场价后下一轮询市价开仓"
f"|移动保本:{'' if be_flag else ''}"
+ (f"{time_close_label(tc_h)}" if tc_en else "")
)
return redirect("/key_monitor")
if is_false_breakout_key_monitor_type(mt):"""
if add_key_old not in text:
raise SystemExit(f"[{path.name}] missing add_key anchor")
text = text.replace(add_key_old, add_key_new, 1)
# function block
anchor = " send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\ndef check_fib_key_monitors():"
if anchor not in text:
raise SystemExit(f"[{path.name}] missing func insert anchor")
text = text.replace(
anchor,
" send_wechat_msg(succ)\n _finalize_key_monitor_one_shot(conn, row, succ, close_reason)\n\n\n" + func_block,
1,
)
path.write_text(text, encoding="utf-8")
print(f"patched {path.relative_to(ROOT)}")
def main() -> None:
block = extract_gate_block()
for t in TARGETS:
patch_file(t, block)
print("done")
if __name__ == "__main__":
main()