84abf7e7f7
Co-authored-by: Cursor <cursoragent@cursor.com>
249 lines
7.3 KiB
Python
249 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
"""补录缺失的趋势回调策略结束快照(strategy_trade_snapshots)。
|
|
|
|
适用:gate_bot 等在计划结束(止盈/止损/手动)时因 strategy_trend_cfg 未注册而漏写快照的历史数据。
|
|
保本移交路径通常已有快照,本脚本默认跳过「已有任意快照」的计划。
|
|
|
|
用法(在仓库根目录,Linux 请用 python3):
|
|
python3 scripts/backfill_trend_strategy_snapshots.py \\
|
|
--db crypto_monitor_gate_bot/crypto.db --dry-run
|
|
python3 scripts/backfill_trend_strategy_snapshots.py \\
|
|
--db crypto_monitor_gate_bot/crypto.db --apply
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import sqlite3
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
# Directory one level above this script's own directory (parents[1] of the
# resolved file path); the usage text runs the script from the repository root.
_REPO_ROOT = Path(__file__).resolve().parents[1]
# Put that directory at the front of sys.path (once) so the project-local
# import that follows can be resolved when the script is run directly.
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))
|
|
|
|
from strategy_snapshot_lib import ( # noqa: E402
|
|
STRATEGY_TREND,
|
|
init_strategy_snapshot_table,
|
|
save_trend_plan_snapshot,
|
|
)
|
|
|
|
# Maps a plan's stored ``status`` value to the result label recorded on the
# snapshot. resolve_result_label() consults this table first.
PLAN_STATUS_LABEL = {
    "stopped_sl": "止损",  # stop-loss
    "stopped_tp": "止盈",  # take-profit
    "stopped_manual": "手动平仓",  # manual close
    "stopped_handoff": "保本移交",  # break-even handoff
}
|
|
|
|
# Maps a trade record's ``result`` text onto the same label vocabulary used by
# PLAN_STATUS_LABEL. resolve_result_label() falls back to this table when the
# plan status is not recognised.
TRADE_RESULT_LABEL = {
    "止损": "止损",  # stop-loss
    "止盈": "止盈",  # take-profit
    "手动平仓": "手动平仓",  # manual close
    "移动止盈": "止盈",  # trailing take-profit -> take-profit
    "保本止盈": "止盈",  # break-even take-profit -> take-profit
    "强制清仓": "手动平仓",  # forced close-out -> manual close
}
|
|
|
|
|
|
def _row_dict(row) -> dict:
    """Convert a database row into a plain ``dict``.

    Returns an empty dict when ``row`` is ``None`` or when ``dict(row)``
    raises for any reason.
    """
    converted: dict = {}
    if row is not None:
        try:
            converted = dict(row)
        except Exception:
            # Best-effort conversion: anything dict() rejects maps to {}.
            converted = {}
    return converted
|
|
|
|
|
|
def infer_exit_price(
|
|
direction: str,
|
|
entry: float | None,
|
|
margin: float | None,
|
|
leverage: float | None,
|
|
pnl: float | None,
|
|
) -> float | None:
|
|
"""由本地 calc_pnl 口径反推平仓价(供补录快照 exit_price)。"""
|
|
try:
|
|
trigger = float(entry)
|
|
margin_f = float(margin)
|
|
lev = float(leverage)
|
|
pnl_f = float(pnl)
|
|
except (TypeError, ValueError):
|
|
return None
|
|
if trigger <= 0 or margin_f <= 0 or lev <= 0:
|
|
return None
|
|
notional = margin_f * lev
|
|
if notional <= 0:
|
|
return None
|
|
ratio = pnl_f / notional
|
|
if (direction or "long").strip().lower() == "short":
|
|
return round(trigger * (1.0 - ratio), 10)
|
|
return round(trigger * (1.0 + ratio), 10)
|
|
|
|
|
|
def resolve_result_label(plan: dict, trade: dict | None) -> str:
    """Pick the result label to record for a finished plan.

    Sources are tried in order:

    1. the plan's ``status`` looked up in ``PLAN_STATUS_LABEL``;
    2. the trade's ``result`` text, normalised through
       ``TRADE_RESULT_LABEL`` when it is listed there, otherwise used as-is;
    3. the first 32 characters of the plan's ``message``;
    4. the fixed fallback label.
    """
    plan_status = (plan.get("status") or "").strip()
    try:
        return PLAN_STATUS_LABEL[plan_status]
    except KeyError:
        pass  # unrecognised status: fall through to the trade record

    raw_result = (trade.get("result") or "").strip() if trade else ""
    if raw_result:
        return TRADE_RESULT_LABEL.get(raw_result, raw_result)

    note = (plan.get("message") or "").strip()
    return note[:32] if note else "结束"
|
|
|
|
|
|
def find_missing_plans(
    conn: sqlite3.Connection,
    *,
    plan_id: int | None = None,
    since: str | None = None,
) -> list[dict]:
    """Return non-active trend plans that have no strategy snapshot yet.

    A plan qualifies when its trimmed ``status`` is not ``'active'`` and no
    ``strategy_trade_snapshots`` row exists with ``strategy_type`` equal to
    ``STRATEGY_TREND`` and ``source_id`` equal to the plan id.

    Args:
        conn: Open SQLite connection.
        plan_id: When given, restrict the search to this plan id.
        since: When non-empty, keep only plans whose ``opened_at`` compares
            ``>=`` this string (stripped of surrounding whitespace).

    Returns:
        Matching plans as plain dicts, ordered by ascending id.
    """
    base_query = """
        SELECT p.*
        FROM trend_pullback_plans p
        WHERE TRIM(COALESCE(p.status, '')) != 'active'
        AND NOT EXISTS (
        SELECT 1 FROM strategy_trade_snapshots s
        WHERE s.strategy_type = ? AND s.source_id = p.id
        )
    """
    fragments = [base_query]
    bind_values: list[object] = [STRATEGY_TREND]

    # Optional filters are appended as extra AND clauses; every value is
    # bound as a parameter rather than interpolated into the SQL text.
    if plan_id is not None:
        fragments.append(" AND p.id = ?")
        bind_values.append(int(plan_id))
    if since:
        fragments.append(" AND COALESCE(p.opened_at, '') >= ?")
        bind_values.append(since.strip())
    fragments.append(" ORDER BY p.id ASC")

    cursor = conn.execute("".join(fragments), bind_values)
    return [_row_dict(record) for record in cursor.fetchall()]
|
|
|
|
|
|
def fetch_trade_for_plan(conn: sqlite3.Connection, plan_id: int) -> dict | None:
    """Fetch the most recent trade record linked to a trend plan.

    Rows in ``trade_records`` with a matching ``trend_plan_id`` are ordered by
    ``closed_at_ms`` (NULL treated as 0) descending, then by ``id``
    descending, and only the first one is returned.

    Returns:
        The row as a plain dict, or ``None`` when the plan has no trade row.
    """
    query = """
        SELECT * FROM trade_records
        WHERE trend_plan_id = ?
        ORDER BY COALESCE(closed_at_ms, 0) DESC, id DESC
        LIMIT 1
    """
    latest = conn.execute(query, (int(plan_id),)).fetchone()
    if not latest:
        return None
    return _row_dict(latest)
|
|
|
|
|
|
def backfill_one(conn: sqlite3.Connection, plan: dict, *, dry_run: bool) -> dict:
    """Build, and unless ``dry_run`` is set write, the snapshot for one plan.

    The latest trade record for the plan (if any) supplies ``pnl_amount`` and
    ``closed_at``, and is the preferred source for the entry price, margin and
    leverage used to infer the exit price; the plan's own fields are the
    fallback. Without a trade record those three values stay ``None``.

    Args:
        conn: Open SQLite connection.
        plan: Plan row as a dict; must contain ``"id"``.
        dry_run: When true, nothing is written and only the summary is
            returned.

    Returns:
        A summary dict describing what was (or would be) recorded.
    """
    plan_id = int(plan["id"])
    trade = fetch_trade_for_plan(conn, plan_id)
    result_label = resolve_result_label(plan, trade)

    pnl_amount = closed_at = exit_price = None
    if trade:
        pnl_amount = trade.get("pnl_amount")
        closed_at = trade.get("closed_at")
        # Trade-level values win; plan-level values are the fallback.
        plan_entry = plan.get("avg_entry_price") or plan.get("live_price_ref")
        exit_price = infer_exit_price(
            plan.get("direction") or trade.get("direction") or "long",
            trade.get("trigger_price") or plan_entry,
            trade.get("margin_capital") or plan.get("plan_margin_capital"),
            trade.get("leverage") or plan.get("leverage"),
            pnl_amount,
        )

    info = {
        "plan_id": plan_id,
        "symbol": plan.get("symbol"),
        "status": plan.get("status"),
        "result_label": result_label,
        "closed_at": closed_at,
        "pnl_amount": pnl_amount,
        "exit_price": exit_price,
        "legs_done": plan.get("legs_done"),
        "dca_legs": plan.get("dca_legs"),
        "has_trade": bool(trade),
    }

    if not dry_run:
        # NOTE(review): the empty dict is the helper's first positional
        # argument -- presumably a config mapping; confirm against
        # strategy_snapshot_lib.
        save_trend_plan_snapshot(
            {},
            conn,
            plan,
            result_label=result_label,
            exit_price=exit_price,
            pnl_amount=None if pnl_amount is None else float(pnl_amount),
            closed_at=closed_at,
        )
    return info
|
|
|
|
|
|
def main() -> int:
    """Command-line entry point: list or backfill missing trend snapshots.

    ``--dry-run`` (the default when neither flag is given) only prints what
    would be written; ``--apply`` writes the snapshots and commits. The two
    flags are mutually exclusive, so an ambiguous ``--dry-run --apply``
    invocation is rejected by argparse instead of silently writing.

    Returns:
        ``0`` on success (including "nothing to do"), ``1`` when the database
        file does not exist. Invalid arguments make argparse exit with
        status 2.
    """
    parser = argparse.ArgumentParser(
        description="Backfill missing trend_pullback strategy_trade_snapshots rows."
    )
    parser.add_argument("--db", required=True, help="Path to instance sqlite db")
    parser.add_argument("--plan-id", type=int, help="Only backfill this trend plan id")
    parser.add_argument(
        "--since",
        help="Only plans with opened_at >= YYYY-MM-DD (optional)",
    )
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument("--dry-run", action="store_true", help="Preview only (default)")
    mode.add_argument("--apply", action="store_true", help="Write snapshots")
    args = parser.parse_args()
    # Preview is the default: writing requires an explicit --apply.
    dry_run = not args.apply

    db_path = Path(args.db).expanduser().resolve()
    if not db_path.is_file():
        print(f"[ERR] DB not found: {db_path}")
        return 1

    conn = sqlite3.connect(str(db_path))
    try:
        conn.row_factory = sqlite3.Row
        # NOTE(review): this also runs in dry-run mode; if the helper creates
        # the table, a dry run is not strictly read-only -- confirm.
        init_strategy_snapshot_table(conn)

        missing = find_missing_plans(
            conn, plan_id=args.plan_id, since=args.since
        )
        if not missing:
            print("[INFO] No closed trend plans missing strategy snapshots.")
            return 0

        print(f"[INFO] Found {len(missing)} plan(s) without strategy snapshot.")
        applied = 0
        for plan in missing:
            info = backfill_one(conn, plan, dry_run=dry_run)
            trade_hint = "有交易记录" if info["has_trade"] else "无交易记录"
            print(
                f"  - plan #{info['plan_id']} {info['symbol']} "
                f"status={info['status']} → {info['result_label']} "
                f"closed={info['closed_at'] or '—'} pnl={info['pnl_amount']} "
                f"补仓 {info['legs_done']}/{info['dca_legs']} ({trade_hint})"
            )
            applied += 1

        if args.apply:
            conn.commit()
            print(f"[OK] Backfilled {applied} snapshot(s).")
        else:
            print("[DRY-RUN] No changes written. Re-run with --apply to commit.")
        return 0
    finally:
        # Always release the connection, including when a step above raises.
        conn.close()
|
|
|
|
|
|
# Run main() only when executed as a script, and use its return value as the
# process exit status.
if __name__ == "__main__":
    raise SystemExit(main())
|