refactor: 将共用代码迁入 lib/ 模块化目录

统一 strategy、key_monitor、trade、hub 等共用库到 lib/ 子包,并补充 lib-structure 文档,便于四所与中控维护。

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-02 16:23:09 +08:00
parent 4742a0bb9d
commit 5797d49d8a
190 changed files with 27946 additions and 27499 deletions
+248 -248
View File
@@ -1,248 +1,248 @@
#!/usr/bin/env python3
"""补录缺失的趋势回调策略结束快照(strategy_trade_snapshots)。
适用:gate_bot 等在计划结束(止盈/止损/手动)时因 strategy_trend_cfg 未注册而漏写快照的历史数据。
保本移交路径通常已有快照,本脚本默认跳过「已有任意快照」的计划。
用法(在仓库根目录,Linux 请用 python3):
python3 scripts/backfill_trend_strategy_snapshots.py \\
--db crypto_monitor_gate_bot/crypto.db --dry-run
python3 scripts/backfill_trend_strategy_snapshots.py \\
--db crypto_monitor_gate_bot/crypto.db --apply
"""
from __future__ import annotations
import argparse
import sqlite3
import sys
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from strategy_snapshot_lib import ( # noqa: E402
STRATEGY_TREND,
init_strategy_snapshot_table,
save_trend_plan_snapshot,
)
PLAN_STATUS_LABEL = {
"stopped_sl": "止损",
"stopped_tp": "止盈",
"stopped_manual": "手动平仓",
"stopped_handoff": "保本移交",
}
TRADE_RESULT_LABEL = {
"止损": "止损",
"止盈": "止盈",
"手动平仓": "手动平仓",
"移动止盈": "止盈",
"保本止盈": "止盈",
"强制清仓": "手动平仓",
}
def _row_dict(row) -> dict:
if row is None:
return {}
try:
return dict(row)
except Exception:
return {}
def infer_exit_price(
direction: str,
entry: float | None,
margin: float | None,
leverage: float | None,
pnl: float | None,
) -> float | None:
"""由本地 calc_pnl 口径反推平仓价(供补录快照 exit_price)。"""
try:
trigger = float(entry)
margin_f = float(margin)
lev = float(leverage)
pnl_f = float(pnl)
except (TypeError, ValueError):
return None
if trigger <= 0 or margin_f <= 0 or lev <= 0:
return None
notional = margin_f * lev
if notional <= 0:
return None
ratio = pnl_f / notional
if (direction or "long").strip().lower() == "short":
return round(trigger * (1.0 - ratio), 10)
return round(trigger * (1.0 + ratio), 10)
def resolve_result_label(plan: dict, trade: dict | None) -> str:
status = (plan.get("status") or "").strip()
if status in PLAN_STATUS_LABEL:
return PLAN_STATUS_LABEL[status]
if trade:
res = (trade.get("result") or "").strip()
if res in TRADE_RESULT_LABEL:
return TRADE_RESULT_LABEL[res]
if res:
return res
msg = (plan.get("message") or "").strip()
if msg:
return msg[:32]
return "结束"
def find_missing_plans(
conn: sqlite3.Connection,
*,
plan_id: int | None = None,
since: str | None = None,
) -> list[dict]:
sql = """
SELECT p.*
FROM trend_pullback_plans p
WHERE TRIM(COALESCE(p.status, '')) != 'active'
AND NOT EXISTS (
SELECT 1 FROM strategy_trade_snapshots s
WHERE s.strategy_type = ? AND s.source_id = p.id
)
"""
params: list[object] = [STRATEGY_TREND]
if plan_id is not None:
sql += " AND p.id = ?"
params.append(int(plan_id))
if since:
sql += " AND COALESCE(p.opened_at, '') >= ?"
params.append(since.strip())
sql += " ORDER BY p.id ASC"
rows = conn.execute(sql, params).fetchall()
return [_row_dict(r) for r in rows]
def fetch_trade_for_plan(conn: sqlite3.Connection, plan_id: int) -> dict | None:
row = conn.execute(
"""
SELECT * FROM trade_records
WHERE trend_plan_id = ?
ORDER BY COALESCE(closed_at_ms, 0) DESC, id DESC
LIMIT 1
""",
(int(plan_id),),
).fetchone()
return _row_dict(row) if row else None
def backfill_one(conn: sqlite3.Connection, plan: dict, *, dry_run: bool) -> dict:
plan_id = int(plan["id"])
trade = fetch_trade_for_plan(conn, plan_id)
result_label = resolve_result_label(plan, trade)
pnl_amount = None
closed_at = None
exit_price = None
entry = plan.get("avg_entry_price") or plan.get("live_price_ref")
margin = plan.get("plan_margin_capital")
leverage = plan.get("leverage")
if trade:
pnl_amount = trade.get("pnl_amount")
closed_at = trade.get("closed_at")
entry = trade.get("trigger_price") or entry
margin = trade.get("margin_capital") or margin
leverage = trade.get("leverage") or leverage
exit_price = infer_exit_price(
plan.get("direction") or trade.get("direction") or "long",
entry,
margin,
leverage,
pnl_amount,
)
info = {
"plan_id": plan_id,
"symbol": plan.get("symbol"),
"status": plan.get("status"),
"result_label": result_label,
"closed_at": closed_at,
"pnl_amount": pnl_amount,
"exit_price": exit_price,
"legs_done": plan.get("legs_done"),
"dca_legs": plan.get("dca_legs"),
"has_trade": bool(trade),
}
if dry_run:
return info
save_trend_plan_snapshot(
{},
conn,
plan,
result_label=result_label,
exit_price=exit_price,
pnl_amount=float(pnl_amount) if pnl_amount is not None else None,
closed_at=closed_at,
)
return info
def main() -> int:
parser = argparse.ArgumentParser(
description="Backfill missing trend_pullback strategy_trade_snapshots rows."
)
parser.add_argument("--db", required=True, help="Path to instance sqlite db")
parser.add_argument("--plan-id", type=int, help="Only backfill this trend plan id")
parser.add_argument(
"--since",
help="Only plans with opened_at >= YYYY-MM-DD (optional)",
)
parser.add_argument("--dry-run", action="store_true", help="Preview only (default)")
parser.add_argument("--apply", action="store_true", help="Write snapshots")
args = parser.parse_args()
if not args.dry_run and not args.apply:
args.dry_run = True
db_path = Path(args.db).expanduser().resolve()
if not db_path.is_file():
print(f"[ERR] DB not found: {db_path}")
return 1
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
init_strategy_snapshot_table(conn)
missing = find_missing_plans(
conn, plan_id=args.plan_id, since=args.since
)
if not missing:
print("[INFO] No closed trend plans missing strategy snapshots.")
conn.close()
return 0
print(f"[INFO] Found {len(missing)} plan(s) without strategy snapshot.")
applied = 0
for plan in missing:
info = backfill_one(conn, plan, dry_run=not args.apply)
trade_hint = "有交易记录" if info["has_trade"] else "无交易记录"
print(
f" - plan #{info['plan_id']} {info['symbol']} "
f"status={info['status']}{info['result_label']} "
f"closed={info['closed_at'] or ''} pnl={info['pnl_amount']} "
f"补仓 {info['legs_done']}/{info['dca_legs']} ({trade_hint})"
)
applied += 1
if args.apply:
conn.commit()
print(f"[OK] Backfilled {applied} snapshot(s).")
else:
print("[DRY-RUN] No changes written. Re-run with --apply to commit.")
conn.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""补录缺失的趋势回调策略结束快照(strategy_trade_snapshots)。
适用:gate_bot 等在计划结束(止盈/止损/手动)时因 strategy_trend_cfg 未注册而漏写快照的历史数据。
保本移交路径通常已有快照,本脚本默认跳过「已有任意快照」的计划。
用法(在仓库根目录,Linux 请用 python3):
python3 scripts/backfill_trend_strategy_snapshots.py \\
--db crypto_monitor_gate_bot/crypto.db --dry-run
python3 scripts/backfill_trend_strategy_snapshots.py \\
--db crypto_monitor_gate_bot/crypto.db --apply
"""
from __future__ import annotations
import argparse
import sqlite3
import sys
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from lib.strategy.strategy_snapshot_lib import ( # noqa: E402
STRATEGY_TREND,
init_strategy_snapshot_table,
save_trend_plan_snapshot,
)
PLAN_STATUS_LABEL = {
"stopped_sl": "止损",
"stopped_tp": "止盈",
"stopped_manual": "手动平仓",
"stopped_handoff": "保本移交",
}
TRADE_RESULT_LABEL = {
"止损": "止损",
"止盈": "止盈",
"手动平仓": "手动平仓",
"移动止盈": "止盈",
"保本止盈": "止盈",
"强制清仓": "手动平仓",
}
def _row_dict(row) -> dict:
if row is None:
return {}
try:
return dict(row)
except Exception:
return {}
def infer_exit_price(
direction: str,
entry: float | None,
margin: float | None,
leverage: float | None,
pnl: float | None,
) -> float | None:
"""由本地 calc_pnl 口径反推平仓价(供补录快照 exit_price)。"""
try:
trigger = float(entry)
margin_f = float(margin)
lev = float(leverage)
pnl_f = float(pnl)
except (TypeError, ValueError):
return None
if trigger <= 0 or margin_f <= 0 or lev <= 0:
return None
notional = margin_f * lev
if notional <= 0:
return None
ratio = pnl_f / notional
if (direction or "long").strip().lower() == "short":
return round(trigger * (1.0 - ratio), 10)
return round(trigger * (1.0 + ratio), 10)
def resolve_result_label(plan: dict, trade: dict | None) -> str:
status = (plan.get("status") or "").strip()
if status in PLAN_STATUS_LABEL:
return PLAN_STATUS_LABEL[status]
if trade:
res = (trade.get("result") or "").strip()
if res in TRADE_RESULT_LABEL:
return TRADE_RESULT_LABEL[res]
if res:
return res
msg = (plan.get("message") or "").strip()
if msg:
return msg[:32]
return "结束"
def find_missing_plans(
conn: sqlite3.Connection,
*,
plan_id: int | None = None,
since: str | None = None,
) -> list[dict]:
sql = """
SELECT p.*
FROM trend_pullback_plans p
WHERE TRIM(COALESCE(p.status, '')) != 'active'
AND NOT EXISTS (
SELECT 1 FROM strategy_trade_snapshots s
WHERE s.strategy_type = ? AND s.source_id = p.id
)
"""
params: list[object] = [STRATEGY_TREND]
if plan_id is not None:
sql += " AND p.id = ?"
params.append(int(plan_id))
if since:
sql += " AND COALESCE(p.opened_at, '') >= ?"
params.append(since.strip())
sql += " ORDER BY p.id ASC"
rows = conn.execute(sql, params).fetchall()
return [_row_dict(r) for r in rows]
def fetch_trade_for_plan(conn: sqlite3.Connection, plan_id: int) -> dict | None:
row = conn.execute(
"""
SELECT * FROM trade_records
WHERE trend_plan_id = ?
ORDER BY COALESCE(closed_at_ms, 0) DESC, id DESC
LIMIT 1
""",
(int(plan_id),),
).fetchone()
return _row_dict(row) if row else None
def backfill_one(conn: sqlite3.Connection, plan: dict, *, dry_run: bool) -> dict:
plan_id = int(plan["id"])
trade = fetch_trade_for_plan(conn, plan_id)
result_label = resolve_result_label(plan, trade)
pnl_amount = None
closed_at = None
exit_price = None
entry = plan.get("avg_entry_price") or plan.get("live_price_ref")
margin = plan.get("plan_margin_capital")
leverage = plan.get("leverage")
if trade:
pnl_amount = trade.get("pnl_amount")
closed_at = trade.get("closed_at")
entry = trade.get("trigger_price") or entry
margin = trade.get("margin_capital") or margin
leverage = trade.get("leverage") or leverage
exit_price = infer_exit_price(
plan.get("direction") or trade.get("direction") or "long",
entry,
margin,
leverage,
pnl_amount,
)
info = {
"plan_id": plan_id,
"symbol": plan.get("symbol"),
"status": plan.get("status"),
"result_label": result_label,
"closed_at": closed_at,
"pnl_amount": pnl_amount,
"exit_price": exit_price,
"legs_done": plan.get("legs_done"),
"dca_legs": plan.get("dca_legs"),
"has_trade": bool(trade),
}
if dry_run:
return info
save_trend_plan_snapshot(
{},
conn,
plan,
result_label=result_label,
exit_price=exit_price,
pnl_amount=float(pnl_amount) if pnl_amount is not None else None,
closed_at=closed_at,
)
return info
def main() -> int:
parser = argparse.ArgumentParser(
description="Backfill missing trend_pullback strategy_trade_snapshots rows."
)
parser.add_argument("--db", required=True, help="Path to instance sqlite db")
parser.add_argument("--plan-id", type=int, help="Only backfill this trend plan id")
parser.add_argument(
"--since",
help="Only plans with opened_at >= YYYY-MM-DD (optional)",
)
parser.add_argument("--dry-run", action="store_true", help="Preview only (default)")
parser.add_argument("--apply", action="store_true", help="Write snapshots")
args = parser.parse_args()
if not args.dry_run and not args.apply:
args.dry_run = True
db_path = Path(args.db).expanduser().resolve()
if not db_path.is_file():
print(f"[ERR] DB not found: {db_path}")
return 1
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
init_strategy_snapshot_table(conn)
missing = find_missing_plans(
conn, plan_id=args.plan_id, since=args.since
)
if not missing:
print("[INFO] No closed trend plans missing strategy snapshots.")
conn.close()
return 0
print(f"[INFO] Found {len(missing)} plan(s) without strategy snapshot.")
applied = 0
for plan in missing:
info = backfill_one(conn, plan, dry_run=not args.apply)
trade_hint = "有交易记录" if info["has_trade"] else "无交易记录"
print(
f" - plan #{info['plan_id']} {info['symbol']} "
f"status={info['status']}{info['result_label']} "
f"closed={info['closed_at'] or ''} pnl={info['pnl_amount']} "
f"补仓 {info['legs_done']}/{info['dca_legs']} ({trade_hint})"
)
applied += 1
if args.apply:
conn.commit()
print(f"[OK] Backfilled {applied} snapshot(s).")
else:
print("[DRY-RUN] No changes written. Re-run with --apply to commit.")
conn.close()
return 0
if __name__ == "__main__":
raise SystemExit(main())
+188 -188
View File
@@ -1,188 +1,188 @@
#!/usr/bin/env python3
"""补录缺失的趋势回调 trade_records(策略快照已有、交易记录漏写)。
典型原因:gate_bot insert_trade_record 曾不接受 entry_reason_finalize_plan 写快照后插入失败。
用法:
python scripts/backfill_trend_trade_records.py --db crypto_monitor_gate_bot/crypto.db --dry-run
python scripts/backfill_trend_trade_records.py --db crypto_monitor_gate_bot/crypto.db --apply
"""
from __future__ import annotations
import argparse
import json
import sqlite3
import sys
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from strategy_snapshot_lib import STRATEGY_TREND # noqa: E402
from strategy_trade_labels import ENTRY_REASON_TREND_PULLBACK, MONITOR_TYPE_TREND_PULLBACK # noqa: E402
STATUS_TO_RESULT = {
"stopped_sl": "止损",
"stopped_tp": "止盈",
"stopped_manual": "手动平仓",
}
def _row_dict(row) -> dict:
if row is None:
return {}
try:
return dict(row)
except Exception:
return {}
def _hold_minutes(hold_seconds: int) -> int:
try:
return max(0, int(round(float(hold_seconds) / 60.0)))
except (TypeError, ValueError):
return 0
def backfill_one(conn: sqlite3.Connection, snap: dict, *, apply: bool) -> dict:
plan_id = int(snap.get("source_id") or 0)
if plan_id <= 0:
return {"plan_id": plan_id, "skipped": True, "reason": "invalid source_id"}
exists = conn.execute(
"SELECT id FROM trade_records WHERE trend_plan_id=? LIMIT 1", (plan_id,)
).fetchone()
if exists:
return {"plan_id": plan_id, "skipped": True, "reason": "trade_exists"}
try:
payload = json.loads(snap.get("snapshot_json") or "{}")
except Exception:
payload = {}
plan = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=?", (plan_id,)
).fetchone()
plan_d = _row_dict(plan)
symbol = snap.get("symbol") or plan_d.get("symbol") or payload.get("symbol")
direction = snap.get("direction") or plan_d.get("direction") or payload.get("direction") or "long"
result = (snap.get("result_label") or "").strip() or STATUS_TO_RESULT.get(
plan_d.get("status") or "", "手动平仓"
)
opened_at = snap.get("opened_at") or plan_d.get("opened_at")
closed_at = snap.get("closed_at")
pnl_amount = snap.get("pnl_amount")
if pnl_amount is None:
pnl_amount = payload.get("pnl_amount")
trigger_price = payload.get("avg_entry_price") or plan_d.get("avg_entry_price")
stop_loss = payload.get("stop_loss") or plan_d.get("stop_loss")
take_profit = payload.get("take_profit") or plan_d.get("take_profit")
margin_capital = payload.get("plan_margin_capital") or plan_d.get("plan_margin_capital")
leverage = payload.get("leverage") or plan_d.get("leverage")
opened_ms = plan_d.get("opened_at_ms")
closed_ms = None
hold_seconds = 0
if opened_at and closed_at:
try:
from datetime import datetime
fmt = "%Y-%m-%d %H:%M:%S"
o = datetime.strptime(str(opened_at).strip()[:19], fmt)
c = datetime.strptime(str(closed_at).strip()[:19], fmt)
hold_seconds = max(0, int((c - o).total_seconds()))
except Exception:
hold_seconds = 0
row = {
"symbol": symbol,
"monitor_type": MONITOR_TYPE_TREND_PULLBACK,
"direction": direction,
"trigger_price": trigger_price,
"stop_loss": stop_loss,
"initial_stop_loss": plan_d.get("initial_stop_loss") or stop_loss,
"take_profit": take_profit,
"margin_capital": margin_capital,
"leverage": leverage,
"pnl_amount": pnl_amount,
"hold_seconds": hold_seconds,
"trade_style": "trend_pullback",
"result": result,
"opened_at": opened_at,
"opened_at_ms": opened_ms,
"closed_at": closed_at,
"closed_at_ms": closed_ms,
"entry_reason": ENTRY_REASON_TREND_PULLBACK,
"trend_plan_id": plan_id,
}
if not apply:
return {"plan_id": plan_id, "dry_run": True, "row": row}
conn.execute(
"""INSERT INTO trade_records (
symbol, monitor_type, direction, trigger_price, stop_loss, initial_stop_loss,
take_profit, margin_capital, leverage, pnl_amount, hold_seconds, trade_style,
hold_minutes, opened_at, opened_at_ms, closed_at, closed_at_ms, result,
entry_reason, trend_plan_id
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
row["symbol"],
row["monitor_type"],
row["direction"],
row["trigger_price"],
row["stop_loss"],
row["initial_stop_loss"],
row["take_profit"],
row["margin_capital"],
row["leverage"],
row["pnl_amount"],
row["hold_seconds"],
row["trade_style"],
_hold_minutes(hold_seconds),
row["opened_at"],
row["opened_at_ms"],
row["closed_at"],
row["closed_at_ms"],
row["result"],
row["entry_reason"],
row["trend_plan_id"],
),
)
return {"plan_id": plan_id, "inserted": True}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--db", required=True, help="实例 sqlite 路径")
ap.add_argument("--apply", action="store_true", help="写入数据库(默认 dry-run")
args = ap.parse_args()
db_path = Path(args.db)
if not db_path.is_file():
print(f"数据库不存在: {db_path}")
return 1
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
snaps = conn.execute(
"""SELECT * FROM strategy_trade_snapshots
WHERE strategy_type=? ORDER BY id DESC""",
(STRATEGY_TREND,),
).fetchall()
out = []
for s in snaps:
r = backfill_one(conn, _row_dict(s), apply=args.apply)
out.append(r)
print(r)
if args.apply:
conn.commit()
conn.close()
inserted = sum(1 for x in out if x.get("inserted"))
print(f"done: inserted={inserted} total_snapshots={len(snaps)} apply={args.apply}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""补录缺失的趋势回调 trade_records(策略快照已有、交易记录漏写)。
典型原因:gate_bot insert_trade_record 曾不接受 entry_reason_finalize_plan 写快照后插入失败。
用法:
python scripts/backfill_trend_trade_records.py --db crypto_monitor_gate_bot/crypto.db --dry-run
python scripts/backfill_trend_trade_records.py --db crypto_monitor_gate_bot/crypto.db --apply
"""
from __future__ import annotations
import argparse
import json
import sqlite3
import sys
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from lib.strategy.strategy_snapshot_lib import STRATEGY_TREND # noqa: E402
from lib.strategy.strategy_trade_labels import ENTRY_REASON_TREND_PULLBACK, MONITOR_TYPE_TREND_PULLBACK # noqa: E402
STATUS_TO_RESULT = {
"stopped_sl": "止损",
"stopped_tp": "止盈",
"stopped_manual": "手动平仓",
}
def _row_dict(row) -> dict:
if row is None:
return {}
try:
return dict(row)
except Exception:
return {}
def _hold_minutes(hold_seconds: int) -> int:
try:
return max(0, int(round(float(hold_seconds) / 60.0)))
except (TypeError, ValueError):
return 0
def backfill_one(conn: sqlite3.Connection, snap: dict, *, apply: bool) -> dict:
plan_id = int(snap.get("source_id") or 0)
if plan_id <= 0:
return {"plan_id": plan_id, "skipped": True, "reason": "invalid source_id"}
exists = conn.execute(
"SELECT id FROM trade_records WHERE trend_plan_id=? LIMIT 1", (plan_id,)
).fetchone()
if exists:
return {"plan_id": plan_id, "skipped": True, "reason": "trade_exists"}
try:
payload = json.loads(snap.get("snapshot_json") or "{}")
except Exception:
payload = {}
plan = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=?", (plan_id,)
).fetchone()
plan_d = _row_dict(plan)
symbol = snap.get("symbol") or plan_d.get("symbol") or payload.get("symbol")
direction = snap.get("direction") or plan_d.get("direction") or payload.get("direction") or "long"
result = (snap.get("result_label") or "").strip() or STATUS_TO_RESULT.get(
plan_d.get("status") or "", "手动平仓"
)
opened_at = snap.get("opened_at") or plan_d.get("opened_at")
closed_at = snap.get("closed_at")
pnl_amount = snap.get("pnl_amount")
if pnl_amount is None:
pnl_amount = payload.get("pnl_amount")
trigger_price = payload.get("avg_entry_price") or plan_d.get("avg_entry_price")
stop_loss = payload.get("stop_loss") or plan_d.get("stop_loss")
take_profit = payload.get("take_profit") or plan_d.get("take_profit")
margin_capital = payload.get("plan_margin_capital") or plan_d.get("plan_margin_capital")
leverage = payload.get("leverage") or plan_d.get("leverage")
opened_ms = plan_d.get("opened_at_ms")
closed_ms = None
hold_seconds = 0
if opened_at and closed_at:
try:
from datetime import datetime
fmt = "%Y-%m-%d %H:%M:%S"
o = datetime.strptime(str(opened_at).strip()[:19], fmt)
c = datetime.strptime(str(closed_at).strip()[:19], fmt)
hold_seconds = max(0, int((c - o).total_seconds()))
except Exception:
hold_seconds = 0
row = {
"symbol": symbol,
"monitor_type": MONITOR_TYPE_TREND_PULLBACK,
"direction": direction,
"trigger_price": trigger_price,
"stop_loss": stop_loss,
"initial_stop_loss": plan_d.get("initial_stop_loss") or stop_loss,
"take_profit": take_profit,
"margin_capital": margin_capital,
"leverage": leverage,
"pnl_amount": pnl_amount,
"hold_seconds": hold_seconds,
"trade_style": "trend_pullback",
"result": result,
"opened_at": opened_at,
"opened_at_ms": opened_ms,
"closed_at": closed_at,
"closed_at_ms": closed_ms,
"entry_reason": ENTRY_REASON_TREND_PULLBACK,
"trend_plan_id": plan_id,
}
if not apply:
return {"plan_id": plan_id, "dry_run": True, "row": row}
conn.execute(
"""INSERT INTO trade_records (
symbol, monitor_type, direction, trigger_price, stop_loss, initial_stop_loss,
take_profit, margin_capital, leverage, pnl_amount, hold_seconds, trade_style,
hold_minutes, opened_at, opened_at_ms, closed_at, closed_at_ms, result,
entry_reason, trend_plan_id
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
row["symbol"],
row["monitor_type"],
row["direction"],
row["trigger_price"],
row["stop_loss"],
row["initial_stop_loss"],
row["take_profit"],
row["margin_capital"],
row["leverage"],
row["pnl_amount"],
row["hold_seconds"],
row["trade_style"],
_hold_minutes(hold_seconds),
row["opened_at"],
row["opened_at_ms"],
row["closed_at"],
row["closed_at_ms"],
row["result"],
row["entry_reason"],
row["trend_plan_id"],
),
)
return {"plan_id": plan_id, "inserted": True}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--db", required=True, help="实例 sqlite 路径")
ap.add_argument("--apply", action="store_true", help="写入数据库(默认 dry-run")
args = ap.parse_args()
db_path = Path(args.db)
if not db_path.is_file():
print(f"数据库不存在: {db_path}")
return 1
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
snaps = conn.execute(
"""SELECT * FROM strategy_trade_snapshots
WHERE strategy_type=? ORDER BY id DESC""",
(STRATEGY_TREND,),
).fetchall()
out = []
for s in snaps:
r = backfill_one(conn, _row_dict(s), apply=args.apply)
out.append(r)
print(r)
if args.apply:
conn.commit()
conn.close()
inserted = sum(1 for x in out if x.get("inserted"))
print(f"done: inserted={inserted} total_snapshots={len(snaps)} apply={args.apply}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+33 -33
View File
@@ -1,33 +1,33 @@
"""Build embed_page_fragment.html from gate index.html."""
from __future__ import annotations
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
src_lines = (ROOT / "crypto_monitor_gate" / "templates" / "index.html").read_text(
encoding="utf-8"
).splitlines()
# 1-based line numbers from index.html
macro_body = src_lines[243:262] # {% macro %} … {% endmacro %}
grid_inner = src_lines[328:736] # inside .grid (exclude outer wrapper)
stats_block = src_lines[738:772]
out_lines = [
"{# Hub iframe tab fragment — shared via embed_templates #}",
*macro_body,
'<div class="grid">',
*grid_inner,
"</div>",
*stats_block,
]
out_dir = ROOT / "embed_templates"
out_dir.mkdir(exist_ok=True)
text = "\n".join(out_lines) + "\n"
text = text.replace(
"{% include 'order_monitor_rule_tips_gate.html' %}",
"{% include order_rule_tips_tpl %}",
)
(out_dir / "embed_page_fragment.html").write_text(text, encoding="utf-8")
print("wrote", out_dir / "embed_page_fragment.html", "lines", len(out_lines))
"""Build embed_page_fragment.html from gate index.html."""
from __future__ import annotations
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
src_lines = (ROOT / "crypto_monitor_gate" / "templates" / "index.html").read_text(
encoding="utf-8"
).splitlines()
# 1-based line numbers from index.html
macro_body = src_lines[243:262] # {% macro %} … {% endmacro %}
grid_inner = src_lines[328:736] # inside .grid (exclude outer wrapper)
stats_block = src_lines[738:772]
out_lines = [
"{# Hub iframe tab fragment — shared via embed_templates #}",
*macro_body,
'<div class="grid">',
*grid_inner,
"</div>",
*stats_block,
]
out_dir = ROOT / "lib" / "instance" / "templates"
out_dir.mkdir(exist_ok=True)
text = "\n".join(out_lines) + "\n"
text = text.replace(
"{% include 'order_monitor_rule_tips_gate.html' %}",
"{% include order_rule_tips_tpl %}",
)
(out_dir / "embed_page_fragment.html").write_text(text, encoding="utf-8")
print("wrote", out_dir / "embed_page_fragment.html", "lines", len(out_lines))
+93 -93
View File
@@ -1,93 +1,93 @@
#!/usr/bin/env python3
"""清空中控 K 线 SQLite 缓存(hub_kline.db),便于清库后全量重拉。
用法(Linux 云服务器,在仓库根目录):
python3 scripts/clear_hub_kline_db.py --dry-run
python3 scripts/clear_hub_kline_db.py --apply
python3 scripts/clear_hub_kline_db.py --apply --exchange binance --symbol BTC/USDT --timeframe 15m
默认库路径:环境变量 HUB_KLINE_DB_PATH,或 manual_trading_hub/data/hub_kline.db
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from hub_kline_store import ( # noqa: E402
clear_all_bars,
clear_series_bars,
default_db_path,
init_db,
)
def main() -> int:
parser = argparse.ArgumentParser(description="Clear manual-trading-hub K-line SQLite cache.")
parser.add_argument(
"--db",
default=os.getenv("HUB_KLINE_DB_PATH", "").strip() or str(default_db_path()),
help="hub_kline.db path",
)
parser.add_argument("--exchange", default="", help="exchange_key, e.g. binance")
parser.add_argument("--symbol", default="", help="symbol, e.g. BTC/USDT")
parser.add_argument("--timeframe", default="", help="optional timeframe, e.g. 15m")
parser.add_argument("--dry-run", action="store_true", help="count only")
parser.add_argument("--apply", action="store_true", help="execute delete")
args = parser.parse_args()
db_path = Path(args.db)
if not db_path.is_file():
print(f"DB not found: {db_path}", file=sys.stderr)
return 1
init_db(db_path)
ex = (args.exchange or "").strip().lower()
sym = (args.symbol or "").strip().upper()
tf = (args.timeframe or "").strip().lower() or None
if args.dry_run and not args.apply:
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
if ex and sym:
if tf:
n = conn.execute(
"SELECT COUNT(*) FROM ohlcv_bars WHERE exchange_key=? AND symbol=? AND timeframe=?",
(ex, sym, tf),
).fetchone()[0]
print(f"would delete series rows: {n} ({ex} {sym} {tf})")
else:
n = conn.execute(
"SELECT COUNT(*) FROM ohlcv_bars WHERE exchange_key=? AND symbol=?",
(ex, sym),
).fetchone()[0]
print(f"would delete symbol rows: {n} ({ex} {sym} all tf)")
else:
n = conn.execute("SELECT COUNT(*) FROM ohlcv_bars").fetchone()[0]
print(f"would delete all ohlcv_bars rows: {n}")
finally:
conn.close()
return 0
if not args.apply:
print("Specify --apply to delete (or --dry-run to preview).", file=sys.stderr)
return 1
if ex and sym:
removed = clear_series_bars(ex, sym, tf, db_path)
scope = f"{ex} {sym}" + (f" {tf}" if tf else " (all timeframes)")
print(f"cleared {removed} rows for {scope}")
else:
removed = clear_all_bars(db_path)
print(f"cleared all {removed} ohlcv_bars rows from {db_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""清空中控 K 线 SQLite 缓存(hub_kline.db),便于清库后全量重拉。
用法(Linux 云服务器,在仓库根目录):
python3 scripts/clear_hub_kline_db.py --dry-run
python3 scripts/clear_hub_kline_db.py --apply
python3 scripts/clear_hub_kline_db.py --apply --exchange binance --symbol BTC/USDT --timeframe 15m
默认库路径:环境变量 HUB_KLINE_DB_PATH,或 manual_trading_hub/data/hub_kline.db
"""
from __future__ import annotations
import argparse
import os
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from lib.hub.hub_kline_store import ( # noqa: E402
clear_all_bars,
clear_series_bars,
default_db_path,
init_db,
)
def main() -> int:
parser = argparse.ArgumentParser(description="Clear manual-trading-hub K-line SQLite cache.")
parser.add_argument(
"--db",
default=os.getenv("HUB_KLINE_DB_PATH", "").strip() or str(default_db_path()),
help="hub_kline.db path",
)
parser.add_argument("--exchange", default="", help="exchange_key, e.g. binance")
parser.add_argument("--symbol", default="", help="symbol, e.g. BTC/USDT")
parser.add_argument("--timeframe", default="", help="optional timeframe, e.g. 15m")
parser.add_argument("--dry-run", action="store_true", help="count only")
parser.add_argument("--apply", action="store_true", help="execute delete")
args = parser.parse_args()
db_path = Path(args.db)
if not db_path.is_file():
print(f"DB not found: {db_path}", file=sys.stderr)
return 1
init_db(db_path)
ex = (args.exchange or "").strip().lower()
sym = (args.symbol or "").strip().upper()
tf = (args.timeframe or "").strip().lower() or None
if args.dry_run and not args.apply:
import sqlite3
conn = sqlite3.connect(str(db_path))
try:
if ex and sym:
if tf:
n = conn.execute(
"SELECT COUNT(*) FROM ohlcv_bars WHERE exchange_key=? AND symbol=? AND timeframe=?",
(ex, sym, tf),
).fetchone()[0]
print(f"would delete series rows: {n} ({ex} {sym} {tf})")
else:
n = conn.execute(
"SELECT COUNT(*) FROM ohlcv_bars WHERE exchange_key=? AND symbol=?",
(ex, sym),
).fetchone()[0]
print(f"would delete symbol rows: {n} ({ex} {sym} all tf)")
else:
n = conn.execute("SELECT COUNT(*) FROM ohlcv_bars").fetchone()[0]
print(f"would delete all ohlcv_bars rows: {n}")
finally:
conn.close()
return 0
if not args.apply:
print("Specify --apply to delete (or --dry-run to preview).", file=sys.stderr)
return 1
if ex and sym:
removed = clear_series_bars(ex, sym, tf, db_path)
scope = f"{ex} {sym}" + (f" {tf}" if tf else " (all timeframes)")
print(f"cleared {removed} rows for {scope}")
else:
removed = clear_all_bars(db_path)
print(f"cleared all {removed} ohlcv_bars rows from {db_path}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+67 -67
View File
@@ -1,67 +1,67 @@
#!/usr/bin/env python3
"""清理 strategy_trade_snapshots 重复行(同计划 + 同结果仅保留 id 最大的一条)。
用法(在实例目录,如 crypto_monitor_gate_bot):
python ../scripts/dedupe_strategy_snapshots.py
python ../scripts/dedupe_strategy_snapshots.py --db crypto.db
"""
from __future__ import annotations
import argparse
import os
import sqlite3
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from strategy_snapshot_lib import dedupe_strategy_snapshots, init_strategy_snapshot_table # noqa: E402
def main() -> int:
parser = argparse.ArgumentParser(description="Dedupe strategy_trade_snapshots rows.")
parser.add_argument(
"--db",
default=os.getenv("DB_PATH", "crypto.db"),
help="SQLite database path (default: DB_PATH or crypto.db)",
)
parser.add_argument("--dry-run", action="store_true", help="Count only, do not delete")
args = parser.parse_args()
db_path = Path(args.db)
if not db_path.is_file():
print(f"DB not found: {db_path}", file=sys.stderr)
return 1
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
init_strategy_snapshot_table(conn)
before = conn.execute("SELECT COUNT(*) AS c FROM strategy_trade_snapshots").fetchone()["c"]
dup_groups = conn.execute(
"""SELECT strategy_type, source_id, result_label, COUNT(*) AS n
FROM strategy_trade_snapshots
GROUP BY strategy_type, source_id, result_label
HAVING n > 1
ORDER BY n DESC"""
).fetchall()
extra = sum(int(r["n"]) - 1 for r in dup_groups)
print(f"snapshots total={before}, duplicate rows to remove={extra}, groups={len(dup_groups)}")
for r in dup_groups[:20]:
print(
f" {r['strategy_type']} plan={r['source_id']} "
f"{r['result_label']} x{r['n']}"
)
if args.dry_run:
conn.close()
return 0
removed = dedupe_strategy_snapshots(conn)
conn.commit()
after = conn.execute("SELECT COUNT(*) AS c FROM strategy_trade_snapshots").fetchone()["c"]
conn.close()
print(f"removed={removed}, remaining={after}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""清理 strategy_trade_snapshots 重复行(同计划 + 同结果仅保留 id 最大的一条)。
用法(在实例目录,如 crypto_monitor_gate_bot):
python ../scripts/dedupe_strategy_snapshots.py
python ../scripts/dedupe_strategy_snapshots.py --db crypto.db
"""
from __future__ import annotations
import argparse
import os
import sqlite3
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from lib.strategy.strategy_snapshot_lib import dedupe_strategy_snapshots, init_strategy_snapshot_table # noqa: E402
def main() -> int:
parser = argparse.ArgumentParser(description="Dedupe strategy_trade_snapshots rows.")
parser.add_argument(
"--db",
default=os.getenv("DB_PATH", "crypto.db"),
help="SQLite database path (default: DB_PATH or crypto.db)",
)
parser.add_argument("--dry-run", action="store_true", help="Count only, do not delete")
args = parser.parse_args()
db_path = Path(args.db)
if not db_path.is_file():
print(f"DB not found: {db_path}", file=sys.stderr)
return 1
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
init_strategy_snapshot_table(conn)
before = conn.execute("SELECT COUNT(*) AS c FROM strategy_trade_snapshots").fetchone()["c"]
dup_groups = conn.execute(
"""SELECT strategy_type, source_id, result_label, COUNT(*) AS n
FROM strategy_trade_snapshots
GROUP BY strategy_type, source_id, result_label
HAVING n > 1
ORDER BY n DESC"""
).fetchall()
extra = sum(int(r["n"]) - 1 for r in dup_groups)
print(f"snapshots total={before}, duplicate rows to remove={extra}, groups={len(dup_groups)}")
for r in dup_groups[:20]:
print(
f" {r['strategy_type']} plan={r['source_id']} "
f"{r['result_label']} x{r['n']}"
)
if args.dry_run:
conn.close()
return 0
removed = dedupe_strategy_snapshots(conn)
conn.commit()
after = conn.execute("SELECT COUNT(*) AS c FROM strategy_trade_snapshots").fetchone()["c"]
conn.close()
print(f"removed={removed}, remaining={after}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+49 -49
View File
@@ -1,49 +1,49 @@
"""One-off: extract instance_page.css / instance_page_boot.js from gate index.html."""
from __future__ import annotations
import re
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
src = ROOT / "crypto_monitor_gate" / "templates" / "index.html"
text = src.read_text(encoding="utf-8")
m = re.search(r"<style>(.*?)</style>", text, re.S)
if m:
(ROOT / "static" / "instance_page.css").write_text(m.group(1).strip() + "\n", encoding="utf-8")
marker = '<script src="/static/manual_order_rr_preview.js?v=3"></script>'
if marker in text:
part = text.split(marker, 1)[1]
m2 = re.search(r"<script>(.*?)</script>\s*</body>", part, re.S)
if m2:
boot = m2.group(1).strip()
boot = boot.replace(
"setInterval(refreshAccountSnapshot, {{ balance_refresh_seconds * 1000 }});",
"setInterval(refreshAccountSnapshot, Number(document.body.dataset.balanceRefreshMs || 30000));",
)
boot = boot.replace(
"setInterval(refreshPriceSnapshotConditional, {{ price_refresh_seconds * 1000 }});",
"setInterval(refreshPriceSnapshotConditional, Number(document.body.dataset.priceRefreshMs || 5000));",
)
(ROOT / "static" / "instance_page_boot.js").write_text(boot + "\n", encoding="utf-8")
part2 = text.split(marker, 1)[1]
m3 = re.search(r"<script>(.*?)</script>\s*</body>", part2, re.S)
if m3:
boot_tpl = m3.group(1).strip()
boot_tpl = boot_tpl.replace(
"setInterval(refreshAccountSnapshot, {{ balance_refresh_seconds * 1000 }});",
"setInterval(refreshAccountSnapshot, Number(document.body.dataset.balanceRefreshMs || 30000));",
)
boot_tpl = boot_tpl.replace(
"setInterval(refreshPriceSnapshotConditional, {{ price_refresh_seconds * 1000 }});",
"setInterval(refreshPriceSnapshotConditional, Number(document.body.dataset.priceRefreshMs || 5000));",
)
embed_dir = ROOT / "embed_templates"
embed_dir.mkdir(exist_ok=True)
(embed_dir / "embed_boot_scripts.html").write_text(
"<script>\n" + boot_tpl + "\n</script>\n", encoding="utf-8"
)
print("done")
"""One-off: extract instance_page.css / instance_page_boot.js from gate index.html."""
from __future__ import annotations
import re
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
src = ROOT / "crypto_monitor_gate" / "templates" / "index.html"
text = src.read_text(encoding="utf-8")
m = re.search(r"<style>(.*?)</style>", text, re.S)
if m:
(ROOT / "lib" / "common" / "static" / "instance_page.css").write_text(m.group(1).strip() + "\n", encoding="utf-8")
marker = '<script src="/static/manual_order_rr_preview.js?v=3"></script>'
if marker in text:
part = text.split(marker, 1)[1]
m2 = re.search(r"<script>(.*?)</script>\s*</body>", part, re.S)
if m2:
boot = m2.group(1).strip()
boot = boot.replace(
"setInterval(refreshAccountSnapshot, {{ balance_refresh_seconds * 1000 }});",
"setInterval(refreshAccountSnapshot, Number(document.body.dataset.balanceRefreshMs || 30000));",
)
boot = boot.replace(
"setInterval(refreshPriceSnapshotConditional, {{ price_refresh_seconds * 1000 }});",
"setInterval(refreshPriceSnapshotConditional, Number(document.body.dataset.priceRefreshMs || 5000));",
)
(ROOT / "lib" / "common" / "static" / "instance_page_boot.js").write_text(boot + "\n", encoding="utf-8")
part2 = text.split(marker, 1)[1]
m3 = re.search(r"<script>(.*?)</script>\s*</body>", part2, re.S)
if m3:
boot_tpl = m3.group(1).strip()
boot_tpl = boot_tpl.replace(
"setInterval(refreshAccountSnapshot, {{ balance_refresh_seconds * 1000 }});",
"setInterval(refreshAccountSnapshot, Number(document.body.dataset.balanceRefreshMs || 30000));",
)
boot_tpl = boot_tpl.replace(
"setInterval(refreshPriceSnapshotConditional, {{ price_refresh_seconds * 1000 }});",
"setInterval(refreshPriceSnapshotConditional, Number(document.body.dataset.priceRefreshMs || 5000));",
)
embed_dir = ROOT / "lib" / "instance" / "templates"
embed_dir.mkdir(exist_ok=True)
(embed_dir / "embed_boot_scripts.html").write_text(
"<script>\n" + boot_tpl + "\n</script>\n", encoding="utf-8"
)
print("done")
+78 -78
View File
@@ -1,78 +1,78 @@
#!/usr/bin/env python3
"""修正趋势保本移交后 monitor_type 仍为「下单监控」的历史数据。"""
from __future__ import annotations
import argparse
import sqlite3
from pathlib import Path
from strategy_trade_labels import MONITOR_TYPE_TREND_PULLBACK
def main() -> int:
parser = argparse.ArgumentParser(description="Fix trend handoff order/trade monitor_type labels.")
parser.add_argument("--db", required=True, help="Path to instance sqlite db")
parser.add_argument("--dry-run", action="store_true", help="Preview only")
parser.add_argument("--apply", action="store_true", help="Apply updates")
args = parser.parse_args()
if not args.dry_run and not args.apply:
args.dry_run = True
db_path = Path(args.db).expanduser().resolve()
if not db_path.is_file():
print(f"[ERR] DB not found: {db_path}")
return 1
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute(
"""
SELECT COUNT(*) AS c FROM order_monitors
WHERE trend_plan_id IS NOT NULL AND trend_plan_id > 0
AND (monitor_type IS NULL OR TRIM(monitor_type) = '' OR monitor_type = '下单监控')
"""
)
om_n = int(cur.fetchone()["c"])
cur.execute(
"""
SELECT COUNT(*) AS c FROM trade_records
WHERE trend_plan_id IS NOT NULL AND trend_plan_id > 0
AND (monitor_type IS NULL OR TRIM(monitor_type) = '' OR monitor_type = '下单监控')
"""
)
tr_n = int(cur.fetchone()["c"])
print(f"[INFO] order_monitors to fix: {om_n}")
print(f"[INFO] trade_records to fix: {tr_n}")
if args.dry_run:
conn.close()
return 0
cur.execute(
"""
UPDATE order_monitors
SET monitor_type=?
WHERE trend_plan_id IS NOT NULL AND trend_plan_id > 0
AND (monitor_type IS NULL OR TRIM(monitor_type) = '' OR monitor_type = '下单监控')
""",
(MONITOR_TYPE_TREND_PULLBACK,),
)
cur.execute(
"""
UPDATE trade_records
SET monitor_type=?
WHERE trend_plan_id IS NOT NULL AND trend_plan_id > 0
AND (monitor_type IS NULL OR TRIM(monitor_type) = '' OR monitor_type = '下单监控')
""",
(MONITOR_TYPE_TREND_PULLBACK,),
)
conn.commit()
conn.close()
print("[OK] Applied.")
return 0
if __name__ == "__main__":
raise SystemExit(main())
#!/usr/bin/env python3
"""修正趋势保本移交后 monitor_type 仍为「下单监控」的历史数据。"""
from __future__ import annotations
import argparse
import sqlite3
from pathlib import Path
from lib.strategy.strategy_trade_labels import MONITOR_TYPE_TREND_PULLBACK
def main() -> int:
parser = argparse.ArgumentParser(description="Fix trend handoff order/trade monitor_type labels.")
parser.add_argument("--db", required=True, help="Path to instance sqlite db")
parser.add_argument("--dry-run", action="store_true", help="Preview only")
parser.add_argument("--apply", action="store_true", help="Apply updates")
args = parser.parse_args()
if not args.dry_run and not args.apply:
args.dry_run = True
db_path = Path(args.db).expanduser().resolve()
if not db_path.is_file():
print(f"[ERR] DB not found: {db_path}")
return 1
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
cur = conn.cursor()
cur.execute(
"""
SELECT COUNT(*) AS c FROM order_monitors
WHERE trend_plan_id IS NOT NULL AND trend_plan_id > 0
AND (monitor_type IS NULL OR TRIM(monitor_type) = '' OR monitor_type = '下单监控')
"""
)
om_n = int(cur.fetchone()["c"])
cur.execute(
"""
SELECT COUNT(*) AS c FROM trade_records
WHERE trend_plan_id IS NOT NULL AND trend_plan_id > 0
AND (monitor_type IS NULL OR TRIM(monitor_type) = '' OR monitor_type = '下单监控')
"""
)
tr_n = int(cur.fetchone()["c"])
print(f"[INFO] order_monitors to fix: {om_n}")
print(f"[INFO] trade_records to fix: {tr_n}")
if args.dry_run:
conn.close()
return 0
cur.execute(
"""
UPDATE order_monitors
SET monitor_type=?
WHERE trend_plan_id IS NOT NULL AND trend_plan_id > 0
AND (monitor_type IS NULL OR TRIM(monitor_type) = '' OR monitor_type = '下单监控')
""",
(MONITOR_TYPE_TREND_PULLBACK,),
)
cur.execute(
"""
UPDATE trade_records
SET monitor_type=?
WHERE trend_plan_id IS NOT NULL AND trend_plan_id > 0
AND (monitor_type IS NULL OR TRIM(monitor_type) = '' OR monitor_type = '下单监控')
""",
(MONITOR_TYPE_TREND_PULLBACK,),
)
conn.commit()
conn.close()
print("[OK] Applied.")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+252
View File
@@ -0,0 +1,252 @@
#!/usr/bin/env python3
"""One-shot: move root shared modules into lib/ and rewrite imports."""
from __future__ import annotations
import re
import subprocess
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parent.parent
PACKAGE_FILES: dict[str, list[str]] = {
"strategy": [
"strategy_config.py",
"strategy_db.py",
"strategy_exchange_base.py",
"strategy_exchange_binance.py",
"strategy_exchange_gate.py",
"strategy_exchange_okx.py",
"strategy_records_register.py",
"strategy_register.py",
"strategy_roll_lib.py",
"strategy_roll_monitor_lib.py",
"strategy_roll_ui_lib.py",
"strategy_snapshot_lib.py",
"strategy_trade_labels.py",
"strategy_trend_exchange.py",
"strategy_trend_lib.py",
"strategy_trend_register.py",
"strategy_ui.py",
"strategy_wechat_notify.py",
],
"key_monitor": [
"key_monitor_full_margin_lib.py",
"key_monitor_lib.py",
"key_monitor_schema_lib.py",
"key_sl_tp_lib.py",
"fib_key_monitor_lib.py",
"false_breakout_key_monitor_lib.py",
"trigger_entry_key_monitor_lib.py",
],
"trade": [
"trade_result_lib.py",
"trade_exchange_stats_lib.py",
"trade_stats_calendar_lib.py",
"order_monitor_display_lib.py",
"position_sizing_lib.py",
"account_risk_lib.py",
"manual_sltp_lib.py",
"time_close_lib.py",
"daily_open_limit_lib.py",
],
"hub": [
"hub_auth.py",
"hub_bridge.py",
"hub_calculator_lib.py",
"hub_calculator_market_lib.py",
"hub_entry_plan_lib.py",
"hub_fund_history_lib.py",
"hub_host_status_lib.py",
"hub_kline_store.py",
"hub_macro_calendar_lib.py",
"hub_market_info_lib.py",
"hub_ohlcv_lib.py",
"hub_position_metrics.py",
"hub_sso.py",
"hub_symbol_archive_lib.py",
"hub_trades_lib.py",
"hub_volume_rank_lib.py",
],
"ai": [
"ai_client.py",
"ai_review_lib.py",
],
"instance": [
"instance_embed_context_lib.py",
"instance_embed_lib.py",
"instance_nav_lib.py",
"focus_chart_lib.py",
"journal_chart_lib.py",
],
"exchange": [
"gate_transfer_lib.py",
"gate_position_history_lib.py",
"okx_orders_lib.py",
],
"common": [
"form_submit_lib.py",
"history_window_lib.py",
"wechat_notify_lib.py",
"auto_transfer_daily_lib.py",
],
}
DIR_MOVES: list[tuple[str, str]] = [
("strategy_templates", "lib/strategy/templates"),
("embed_templates", "lib/instance/templates"),
("static", "lib/common/static"),
]
MODULE_TO_LIB: dict[str, str] = {}
for pkg, files in PACKAGE_FILES.items():
for fname in files:
MODULE_TO_LIB[fname[:-3]] = f"lib.{pkg}.{fname[:-3]}"
IMPORT_FROM_RE = re.compile(
r"^(\s*)from\s+(" + "|".join(re.escape(m) for m in sorted(MODULE_TO_LIB, key=len, reverse=True)) + r")\s+import\s+",
re.MULTILINE,
)
IMPORT_BARE_RE = re.compile(
r"^(\s*)import\s+(" + "|".join(re.escape(m) for m in sorted(MODULE_TO_LIB, key=len, reverse=True)) + r")(\s|$)",
re.MULTILINE,
)
def git_mv(src: Path, dst: Path) -> None:
dst.parent.mkdir(parents=True, exist_ok=True)
if not src.exists():
if dst.exists():
return
raise FileNotFoundError(src)
subprocess.run(["git", "mv", str(src), str(dst)], cwd=ROOT, check=True)
def move_files() -> None:
(ROOT / "lib").mkdir(exist_ok=True)
for pkg in PACKAGE_FILES:
(ROOT / "lib" / pkg).mkdir(parents=True, exist_ok=True)
init = ROOT / "lib" / pkg / "__init__.py"
if not init.exists():
init.write_text('"""Shared library package."""\n', encoding="utf-8")
lib_init = ROOT / "lib" / "__init__.py"
if not lib_init.exists():
lib_init.write_text('"""crypto_monitor shared libraries."""\n', encoding="utf-8")
paths_py = ROOT / "lib" / "paths.py"
if not paths_py.exists():
paths_py.write_text(
'''"""Repository path helpers for lib/ assets."""
from __future__ import annotations
import os
from pathlib import Path
LIB_DIR = Path(__file__).resolve().parent
REPO_ROOT = LIB_DIR.parent
def strategy_templates_dir(repo_root: str | Path | None = None) -> str:
root = Path(repo_root) if repo_root is not None else REPO_ROOT
return str(root / "lib" / "strategy" / "templates")
def embed_templates_dir(repo_root: str | Path | None = None) -> str:
root = Path(repo_root) if repo_root is not None else REPO_ROOT
return str(root / "lib" / "instance" / "templates")
def common_static_dir(repo_root: str | Path | None = None) -> str:
root = Path(repo_root) if repo_root is not None else REPO_ROOT
return str(root / "lib" / "common" / "static")
''',
encoding="utf-8",
)
for pkg, files in PACKAGE_FILES.items():
for fname in files:
git_mv(ROOT / fname, ROOT / "lib" / pkg / fname)
for src_rel, dst_rel in DIR_MOVES:
git_mv(ROOT / src_rel, ROOT / dst_rel)
def rewrite_imports_in_text(text: str) -> str:
def from_repl(m: re.Match) -> str:
mod = m.group(2)
return f"{m.group(1)}from {MODULE_TO_LIB[mod]} import "
def bare_repl(m: re.Match) -> str:
mod = m.group(2)
return f"{m.group(1)}import {MODULE_TO_LIB[mod]}{m.group(3)}"
text = IMPORT_FROM_RE.sub(from_repl, text)
text = IMPORT_BARE_RE.sub(bare_repl, text)
return text
def patch_path_literals(text: str) -> str:
replacements = [
('os.path.join(repo_root, "strategy_templates")', 'strategy_templates_dir(repo_root)'),
('os.path.join(repo_root, "embed_templates")', 'embed_templates_dir(repo_root)'),
('os.path.join(os.path.dirname(BASE_DIR), "static")', 'common_static_dir(os.path.dirname(BASE_DIR))'),
('_REPO_ROOT / "static"', '_REPO_ROOT / "lib" / "common" / "static"'),
('ROOT / "strategy_templates"', 'ROOT / "lib" / "strategy" / "templates"'),
('ROOT / "embed_templates"', 'ROOT / "lib" / "instance" / "templates"'),
('ROOT / "static"', 'ROOT / "lib" / "common" / "static"'),
]
for old, new in replacements:
text = text.replace(old, new)
return text
def ensure_paths_import(text: str, filepath: Path) -> str:
needs = []
if "strategy_templates_dir(" in text and "from lib.paths import" not in text:
needs.append("strategy_templates_dir")
if "embed_templates_dir(" in text and "from lib.paths import" not in text:
needs.append("embed_templates_dir")
if "common_static_dir(" in text and "from lib.paths import" not in text:
needs.append("common_static_dir")
if not needs:
return text
imp = f"from lib.paths import {', '.join(sorted(set(needs)))}\n"
if text.startswith('"""') or text.startswith("'''"):
end = text.find('"""', 3) if text.startswith('"""') else text.find("'''", 3)
if end != -1:
end += 3
return text[:end] + "\n\n" + imp + text[end + 1 :]
if text.startswith("from __future__"):
lines = text.splitlines(keepends=True)
i = 0
while i < len(lines) and (
lines[i].startswith("from __future__") or lines[i].strip() == ""
):
i += 1
return "".join(lines[:i]) + imp + "".join(lines[i:])
return imp + text
def rewrite_all_py_files() -> None:
skip = {ROOT / "scripts" / "migrate_to_lib.py"}
for path in ROOT.rglob("*.py"):
if path in skip or ".venv" in path.parts or "__pycache__" in path.parts:
continue
original = path.read_text(encoding="utf-8")
updated = rewrite_imports_in_text(original)
updated = patch_path_literals(updated)
updated = ensure_paths_import(updated, path)
if updated != original:
path.write_text(updated, encoding="utf-8")
def main() -> int:
move_files()
rewrite_all_py_files()
print("Migration complete.")
return 0
if __name__ == "__main__":
sys.exit(main())
+197 -197
View File
@@ -1,197 +1,197 @@
#!/usr/bin/env python3
"""一次性:为 okx/gate/gate_bot 注入与 binance 一致的计仓模式补丁(已 patch 过则跳过)。"""
from __future__ import annotations
import re
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
IMPORT_BLOCK = '''from position_sizing_lib import (
OPEN_SOURCE_KEY_AUTO,
OPEN_SOURCE_MANUAL,
assert_open_source_allowed,
compute_full_margin_sizing,
full_margin_requires_flat_position,
is_full_margin_mode,
leverage_for_full_margin,
load_position_sizing_mode,
mode_label_zh,
)
from key_monitor_full_margin_lib import (
monitor_type_disallowed_in_full_margin,
purge_disallowed_key_monitors,
)
'''
ENV_LINE = (
"# 计仓模式:risk=以损定仓(默认);full_margin=合约可用×比例全仓杠杆(仅 env 切换,须无仓)\n"
"POSITION_SIZING_MODE = load_position_sizing_mode()\n"
)
PURGE_FN = '''
def _purge_key_monitors_if_full_margin():
if not is_full_margin_mode(POSITION_SIZING_MODE):
return
conn = get_db()
try:
cancel = globals().get("_cancel_fib_monitor_limit")
if not callable(cancel):
cancel = lambda _row: None
purge_disallowed_key_monitors(
conn,
sizing_mode=POSITION_SIZING_MODE,
select_rows=lambda c: c.execute("SELECT * FROM key_monitors").fetchall(),
cancel_fib_limit=cancel,
delete_monitor=lambda c, kid: c.execute("DELETE FROM key_monitors WHERE id=?", (kid,)),
send_wechat=send_wechat_msg,
)
conn.commit()
except Exception as e:
print(f"[full_margin] purge key monitors: {e}", flush=True)
finally:
conn.close()
'''
MARKET_OPEN_GUARD = ''' ok_src, src_msg = assert_open_source_allowed(POSITION_SIZING_MODE, OPEN_SOURCE_KEY_AUTO)
if not ok_src:
return False, src_msg, None
'''
ADD_KEY_GUARD = ''' if is_full_margin_mode(POSITION_SIZING_MODE) and monitor_type_disallowed_in_full_margin(mt):
flash(
"全仓杠杆模式下不可添加箱体/收敛突破或斐波监控;"
"请改用阻力/支撑(仅提醒),或切换 POSITION_SIZING_MODE=risk 并重启(须无持仓)。"
)
return redirect("/key_monitor")
'''
TEMPLATE_RULE = ''' <div class="rule-tip">
计仓模式:<strong>{{ position_sizing_mode_label }}</strong>(仅 .env <code>POSITION_SIZING_MODE</code>,须无仓后重启)
{% if position_sizing_mode == 'full_margin' %}
|全仓:合约可用×{{ full_margin_buffer_ratio }}BTC/ETH {{ btc_leverage }}x、其它 {{ alt_leverage }}x,单仓;张数按交易所精度
{% else %}
|以损定仓:风险 {{ risk_percent }}%
{% endif %}
|移动保本:下单可勾选关闭;开启时 {{ breakeven_rr_trigger }}R 触发(每 1R 阶梯上移),偏移 {{ breakeven_offset_pct }}%
</div>'''
APPS = [
("crypto_monitor_okx", 4, "_market_open_for_key_monitor", True),
("crypto_monitor_gate", 2, "_market_open_for_key_monitor", True),
("crypto_monitor_gate_bot", 4, None, False),
]
def patch_app(app_dir: str, funds_dec: int, market_fn: str | None, has_fib: bool):
path = ROOT / app_dir / "app.py"
text = path.read_text(encoding="utf-8")
if "POSITION_SIZING_MODE" in text:
print(f"SKIP {app_dir}/app.py (already patched)")
return
if "from position_sizing_lib import" not in text:
anchor = "from key_monitor_lib import ("
if anchor not in text:
anchor = "from form_submit_lib import"
text = text.replace(
anchor,
IMPORT_BLOCK + "\n" + anchor,
1,
)
else:
text = text.replace(anchor, IMPORT_BLOCK + anchor, 1)
if "POSITION_SIZING_MODE = load_position_sizing_mode()" not in text:
text = text.replace(
"AUTO_TRANSFER_BJ_HOUR = int(os.getenv(\"AUTO_TRANSFER_BJ_HOUR\", \"8\"))\n",
"AUTO_TRANSFER_BJ_HOUR = int(os.getenv(\"AUTO_TRANSFER_BJ_HOUR\", \"8\"))\n" + ENV_LINE,
1,
)
if "_purge_key_monitors_if_full_margin" not in text:
text = text.replace("init_db()\n\n\ndef get_db():", "init_db()" + PURGE_FN + "\ndef get_db():", 1)
text = text.replace(
"install_strategy_trend(app,",
"_purge_key_monitors_if_full_margin()\n\ninstall_strategy_trend(app,",
1,
)
if market_fn and MARKET_OPEN_GUARD.strip() not in text:
text = text.replace(
f"def {market_fn}(\n",
f"def {market_fn}(\n",
1,
)
text = text.replace(
' """\n 与手动',
MARKET_OPEN_GUARD + ' """\n 与手动',
1,
)
# fallback: after docstring closing
if MARKET_OPEN_GUARD.strip() not in text:
pat = rf"(def {market_fn}\([^)]+\):\s*\n\s*\"\"\"[^\"\"]*\"\"\"\s*\n)"
text = re.sub(pat, r"\1" + MARKET_OPEN_GUARD, text, count=1)
if has_fib and ADD_KEY_GUARD.strip() not in text:
text = text.replace(
' if mt not in allowed_types:',
ADD_KEY_GUARD + ' if mt not in allowed_types:',
1,
) if "if mt not in allowed_types:" in text else text.replace(
' rank, total = _daily_volume_rank(symbol)',
ADD_KEY_GUARD + ' rank, total = _daily_volume_rank(symbol)',
1,
)
# render_template risk_percent= add template vars
if "position_sizing_mode=POSITION_SIZING_MODE" not in text:
text = text.replace(
"risk_percent=RISK_PERCENT,\n",
"risk_percent=RISK_PERCENT,\n"
" position_sizing_mode=POSITION_SIZING_MODE,\n"
" position_sizing_mode_label=mode_label_zh(POSITION_SIZING_MODE),\n"
" open_position_button_label=(\n"
' "开仓(全仓杠杆)" if is_full_margin_mode(POSITION_SIZING_MODE) else "开仓(以损定仓)"\n'
" ),\n",
1,
)
path.write_text(text, encoding="utf-8")
print(f"DONE {app_dir}/app.py (partial — verify add_order block manually if needed)")
def patch_template(app_dir: str):
tpl = ROOT / app_dir / "templates" / "index.html"
if not tpl.exists():
return
text = tpl.read_text(encoding="utf-8")
if "position_sizing_mode_label" in text:
print(f"SKIP {tpl}")
return
old = re.search(
r'<div class="rule-tip">\s*以损定仓:风险 \{\{ risk_percent \}\}%.*?</div>',
text,
re.S,
)
if old:
text = text[: old.start()] + TEMPLATE_RULE + text[old.end() :]
text = text.replace(
'<button type="submit">开仓(以损定仓)</button>',
'<button type="submit">{{ open_position_button_label }}</button>',
)
text = text.replace(
'<input id="order-leverage" name="leverage" type="number" min="1" step="1" placeholder="杠杆(可选)">',
'{% if position_sizing_mode != \'full_margin\' %}\n'
' <input id="order-leverage" name="leverage" type="number" min="1" step="1" placeholder="杠杆(可选)">\n'
' {% endif %}',
1,
)
tpl.write_text(text, encoding="utf-8")
print(f"DONE {tpl}")
def main():
for app_dir, funds, mfn, fib in APPS:
patch_app(app_dir, funds, mfn, fib)
patch_template(app_dir)
if __name__ == "__main__":
main()
#!/usr/bin/env python3
"""一次性:为 okx/gate/gate_bot 注入与 binance 一致的计仓模式补丁(已 patch 过则跳过)。"""
from __future__ import annotations
import re
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
IMPORT_BLOCK = '''from position_sizing_lib import (
OPEN_SOURCE_KEY_AUTO,
OPEN_SOURCE_MANUAL,
assert_open_source_allowed,
compute_full_margin_sizing,
full_margin_requires_flat_position,
is_full_margin_mode,
leverage_for_full_margin,
load_position_sizing_mode,
mode_label_zh,
)
from lib.key_monitor.key_monitor_full_margin_lib import (
monitor_type_disallowed_in_full_margin,
purge_disallowed_key_monitors,
)
'''
ENV_LINE = (
"# 计仓模式:risk=以损定仓(默认);full_margin=合约可用×比例全仓杠杆(仅 env 切换,须无仓)\n"
"POSITION_SIZING_MODE = load_position_sizing_mode()\n"
)
PURGE_FN = '''
def _purge_key_monitors_if_full_margin():
if not is_full_margin_mode(POSITION_SIZING_MODE):
return
conn = get_db()
try:
cancel = globals().get("_cancel_fib_monitor_limit")
if not callable(cancel):
cancel = lambda _row: None
purge_disallowed_key_monitors(
conn,
sizing_mode=POSITION_SIZING_MODE,
select_rows=lambda c: c.execute("SELECT * FROM key_monitors").fetchall(),
cancel_fib_limit=cancel,
delete_monitor=lambda c, kid: c.execute("DELETE FROM key_monitors WHERE id=?", (kid,)),
send_wechat=send_wechat_msg,
)
conn.commit()
except Exception as e:
print(f"[full_margin] purge key monitors: {e}", flush=True)
finally:
conn.close()
'''
MARKET_OPEN_GUARD = ''' ok_src, src_msg = assert_open_source_allowed(POSITION_SIZING_MODE, OPEN_SOURCE_KEY_AUTO)
if not ok_src:
return False, src_msg, None
'''
ADD_KEY_GUARD = ''' if is_full_margin_mode(POSITION_SIZING_MODE) and monitor_type_disallowed_in_full_margin(mt):
flash(
"全仓杠杆模式下不可添加箱体/收敛突破或斐波监控;"
"请改用阻力/支撑(仅提醒),或切换 POSITION_SIZING_MODE=risk 并重启(须无持仓)。"
)
return redirect("/key_monitor")
'''
TEMPLATE_RULE = ''' <div class="rule-tip">
计仓模式:<strong>{{ position_sizing_mode_label }}</strong>(仅 .env <code>POSITION_SIZING_MODE</code>,须无仓后重启)
{% if position_sizing_mode == 'full_margin' %}
|全仓:合约可用×{{ full_margin_buffer_ratio }}BTC/ETH {{ btc_leverage }}x、其它 {{ alt_leverage }}x,单仓;张数按交易所精度
{% else %}
|以损定仓:风险 {{ risk_percent }}%
{% endif %}
|移动保本:下单可勾选关闭;开启时 {{ breakeven_rr_trigger }}R 触发(每 1R 阶梯上移),偏移 {{ breakeven_offset_pct }}%
</div>'''
APPS = [
("crypto_monitor_okx", 4, "_market_open_for_key_monitor", True),
("crypto_monitor_gate", 2, "_market_open_for_key_monitor", True),
("crypto_monitor_gate_bot", 4, None, False),
]
def patch_app(app_dir: str, funds_dec: int, market_fn: str | None, has_fib: bool):
path = ROOT / app_dir / "app.py"
text = path.read_text(encoding="utf-8")
if "POSITION_SIZING_MODE" in text:
print(f"SKIP {app_dir}/app.py (already patched)")
return
if "from position_sizing_lib import" not in text:
anchor = "from key_monitor_lib import ("
if anchor not in text:
anchor = "from form_submit_lib import"
text = text.replace(
anchor,
IMPORT_BLOCK + "\n" + anchor,
1,
)
else:
text = text.replace(anchor, IMPORT_BLOCK + anchor, 1)
if "POSITION_SIZING_MODE = load_position_sizing_mode()" not in text:
text = text.replace(
"AUTO_TRANSFER_BJ_HOUR = int(os.getenv(\"AUTO_TRANSFER_BJ_HOUR\", \"8\"))\n",
"AUTO_TRANSFER_BJ_HOUR = int(os.getenv(\"AUTO_TRANSFER_BJ_HOUR\", \"8\"))\n" + ENV_LINE,
1,
)
if "_purge_key_monitors_if_full_margin" not in text:
text = text.replace("init_db()\n\n\ndef get_db():", "init_db()" + PURGE_FN + "\ndef get_db():", 1)
text = text.replace(
"install_strategy_trend(app,",
"_purge_key_monitors_if_full_margin()\n\ninstall_strategy_trend(app,",
1,
)
if market_fn and MARKET_OPEN_GUARD.strip() not in text:
text = text.replace(
f"def {market_fn}(\n",
f"def {market_fn}(\n",
1,
)
text = text.replace(
' """\n 与手动',
MARKET_OPEN_GUARD + ' """\n 与手动',
1,
)
# fallback: after docstring closing
if MARKET_OPEN_GUARD.strip() not in text:
pat = rf"(def {market_fn}\([^)]+\):\s*\n\s*\"\"\"[^\"\"]*\"\"\"\s*\n)"
text = re.sub(pat, r"\1" + MARKET_OPEN_GUARD, text, count=1)
if has_fib and ADD_KEY_GUARD.strip() not in text:
text = text.replace(
' if mt not in allowed_types:',
ADD_KEY_GUARD + ' if mt not in allowed_types:',
1,
) if "if mt not in allowed_types:" in text else text.replace(
' rank, total = _daily_volume_rank(symbol)',
ADD_KEY_GUARD + ' rank, total = _daily_volume_rank(symbol)',
1,
)
# render_template risk_percent= add template vars
if "position_sizing_mode=POSITION_SIZING_MODE" not in text:
text = text.replace(
"risk_percent=RISK_PERCENT,\n",
"risk_percent=RISK_PERCENT,\n"
" position_sizing_mode=POSITION_SIZING_MODE,\n"
" position_sizing_mode_label=mode_label_zh(POSITION_SIZING_MODE),\n"
" open_position_button_label=(\n"
' "开仓(全仓杠杆)" if is_full_margin_mode(POSITION_SIZING_MODE) else "开仓(以损定仓)"\n'
" ),\n",
1,
)
path.write_text(text, encoding="utf-8")
print(f"DONE {app_dir}/app.py (partial — verify add_order block manually if needed)")
def patch_template(app_dir: str):
tpl = ROOT / app_dir / "templates" / "index.html"
if not tpl.exists():
return
text = tpl.read_text(encoding="utf-8")
if "position_sizing_mode_label" in text:
print(f"SKIP {tpl}")
return
old = re.search(
r'<div class="rule-tip">\s*以损定仓:风险 \{\{ risk_percent \}\}%.*?</div>',
text,
re.S,
)
if old:
text = text[: old.start()] + TEMPLATE_RULE + text[old.end() :]
text = text.replace(
'<button type="submit">开仓(以损定仓)</button>',
'<button type="submit">{{ open_position_button_label }}</button>',
)
text = text.replace(
'<input id="order-leverage" name="leverage" type="number" min="1" step="1" placeholder="杠杆(可选)">',
'{% if position_sizing_mode != \'full_margin\' %}\n'
' <input id="order-leverage" name="leverage" type="number" min="1" step="1" placeholder="杠杆(可选)">\n'
' {% endif %}',
1,
)
tpl.write_text(text, encoding="utf-8")
print(f"DONE {tpl}")
def main():
for app_dir, funds, mfn, fib in APPS:
patch_app(app_dir, funds, mfn, fib)
patch_template(app_dir)
if __name__ == "__main__":
main()