diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index 966732c..60a2c7c 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -2257,6 +2257,7 @@ def insert_trade_record( closed_at=None, closed_at_ms=None, exchange_trade_id=None, + entry_reason=None, trend_plan_id=None, ): hold_minutes = calc_hold_minutes(hold_seconds) @@ -2264,13 +2265,15 @@ def insert_trade_record( close_ts = closed_at or app_now_str() open_ts_ms = _to_ms_with_fallback(opened_at_ms, open_ts) close_ts_ms = _to_ms_with_fallback(closed_at_ms, close_ts) + er = (entry_reason or "").strip() or None conn.execute( - "INSERT INTO trade_records (symbol,monitor_type,direction,trigger_price,stop_loss,initial_stop_loss,take_profit,margin_capital,leverage,pnl_amount,hold_seconds,trade_style,risk_amount,planned_rr,actual_rr,hold_minutes,opened_at,opened_at_ms,closed_at,closed_at_ms,result,miss_reason,exchange_trade_id,trend_plan_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", + "INSERT INTO trade_records (symbol,monitor_type,direction,trigger_price,stop_loss,initial_stop_loss,take_profit,margin_capital,leverage,pnl_amount,hold_seconds,trade_style,risk_amount,planned_rr,actual_rr,hold_minutes,opened_at,opened_at_ms,closed_at,closed_at_ms,result,miss_reason,exchange_trade_id,entry_reason,trend_plan_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", ( symbol, monitor_type, direction, trigger_price, stop_loss, initial_stop_loss, take_profit, margin_capital, leverage, pnl_amount, hold_seconds, trade_style, risk_amount, planned_rr, actual_rr, hold_minutes, - open_ts, open_ts_ms, close_ts, close_ts_ms, result, miss_reason, exchange_trade_id, trend_plan_id + open_ts, open_ts_ms, close_ts, close_ts_ms, result, miss_reason, exchange_trade_id, er, + trend_plan_id, ) ) diff --git a/scripts/backfill_trend_trade_records.py b/scripts/backfill_trend_trade_records.py new file mode 100644 index 0000000..306a5c9 --- /dev/null +++ b/scripts/backfill_trend_trade_records.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +"""补录缺失的趋势回调 trade_records(策略快照已有、交易记录漏写)。 + +典型原因:gate_bot insert_trade_record 曾不接受 entry_reason,_finalize_plan 写快照后插入失败。 + +用法: + python scripts/backfill_trend_trade_records.py --db crypto_monitor_gate_bot/crypto.db --dry-run + python scripts/backfill_trend_trade_records.py --db crypto_monitor_gate_bot/crypto.db --apply +""" +from __future__ import annotations + +import argparse +import json +import sqlite3 +import sys +from pathlib import Path + +_REPO_ROOT = Path(__file__).resolve().parents[1] +if str(_REPO_ROOT) not in sys.path: + sys.path.insert(0, str(_REPO_ROOT)) + +from strategy_snapshot_lib import STRATEGY_TREND # noqa: E402 +from strategy_trade_labels import ENTRY_REASON_TREND_PULLBACK, MONITOR_TYPE_TREND_PULLBACK # noqa: E402 + +STATUS_TO_RESULT = { + "stopped_sl": "止损", + "stopped_tp": "止盈", + "stopped_manual": "手动平仓", +} + + +def _row_dict(row) -> dict: + if row is None: + return {} + try: + return dict(row) + except Exception: + return {} + + +def _hold_minutes(hold_seconds: int) -> int: + try: + return max(0, int(round(float(hold_seconds) / 60.0))) + except (TypeError, ValueError): + return 0 + + +def backfill_one(conn: sqlite3.Connection, snap: dict, *, apply: bool) -> dict: + plan_id = int(snap.get("source_id") or 0) + if plan_id <= 0: + return {"plan_id": plan_id, "skipped": True, "reason": "invalid source_id"} + exists = conn.execute( + "SELECT id FROM trade_records WHERE trend_plan_id=? LIMIT 1", (plan_id,) + ).fetchone() + if exists: + return {"plan_id": plan_id, "skipped": True, "reason": "trade_exists"} + + try: + payload = json.loads(snap.get("snapshot_json") or "{}") + except Exception: + payload = {} + + plan = conn.execute( + "SELECT * FROM trend_pullback_plans WHERE id=?", (plan_id,) + ).fetchone() + plan_d = _row_dict(plan) + + symbol = snap.get("symbol") or plan_d.get("symbol") or payload.get("symbol") + direction = snap.get("direction") or plan_d.get("direction") or payload.get("direction") or "long" + result = (snap.get("result_label") or "").strip() or STATUS_TO_RESULT.get( + plan_d.get("status") or "", "手动平仓" + ) + opened_at = snap.get("opened_at") or plan_d.get("opened_at") + closed_at = snap.get("closed_at") + pnl_amount = snap.get("pnl_amount") + if pnl_amount is None: + pnl_amount = payload.get("pnl_amount") + + trigger_price = payload.get("avg_entry_price") or plan_d.get("avg_entry_price") + stop_loss = payload.get("stop_loss") or plan_d.get("stop_loss") + take_profit = payload.get("take_profit") or plan_d.get("take_profit") + margin_capital = payload.get("plan_margin_capital") or plan_d.get("plan_margin_capital") + leverage = payload.get("leverage") or plan_d.get("leverage") + + opened_ms = plan_d.get("opened_at_ms") + closed_ms = None + + hold_seconds = 0 + if opened_at and closed_at: + try: + from datetime import datetime + + fmt = "%Y-%m-%d %H:%M:%S" + o = datetime.strptime(str(opened_at).strip()[:19], fmt) + c = datetime.strptime(str(closed_at).strip()[:19], fmt) + hold_seconds = max(0, int((c - o).total_seconds())) + except Exception: + hold_seconds = 0 + + row = { + "symbol": symbol, + "monitor_type": MONITOR_TYPE_TREND_PULLBACK, + "direction": direction, + "trigger_price": trigger_price, + "stop_loss": stop_loss, + "initial_stop_loss": plan_d.get("initial_stop_loss") or stop_loss, + "take_profit": take_profit, + "margin_capital": margin_capital, + "leverage": leverage, + "pnl_amount": pnl_amount, + "hold_seconds": hold_seconds, + "trade_style": "trend_pullback", + "result": result, + "opened_at": opened_at, + "opened_at_ms": opened_ms, + "closed_at": closed_at, + "closed_at_ms": closed_ms, + "entry_reason": ENTRY_REASON_TREND_PULLBACK, + "trend_plan_id": plan_id, + } + + if not apply: + return {"plan_id": plan_id, "dry_run": True, "row": row} + + conn.execute( + """INSERT INTO trade_records ( + symbol, monitor_type, direction, trigger_price, stop_loss, initial_stop_loss, + take_profit, margin_capital, leverage, pnl_amount, hold_seconds, trade_style, + hold_minutes, opened_at, opened_at_ms, closed_at, closed_at_ms, result, + entry_reason, trend_plan_id + ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", + ( + row["symbol"], + row["monitor_type"], + row["direction"], + row["trigger_price"], + row["stop_loss"], + row["initial_stop_loss"], + row["take_profit"], + row["margin_capital"], + row["leverage"], + row["pnl_amount"], + row["hold_seconds"], + row["trade_style"], + _hold_minutes(hold_seconds), + row["opened_at"], + row["opened_at_ms"], + row["closed_at"], + row["closed_at_ms"], + row["result"], + row["entry_reason"], + row["trend_plan_id"], + ), + ) + return {"plan_id": plan_id, "inserted": True} + + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument("--db", required=True, help="实例 sqlite 路径") + ap.add_argument("--apply", action="store_true", help="写入数据库(默认 dry-run)") + args = ap.parse_args() + db_path = Path(args.db) + if not db_path.is_file(): + print(f"数据库不存在: {db_path}") + return 1 + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + snaps = conn.execute( + """SELECT * FROM strategy_trade_snapshots + WHERE strategy_type=? ORDER BY id DESC""", + (STRATEGY_TREND,), + ).fetchall() + out = [] + for s in snaps: + r = backfill_one(conn, _row_dict(s), apply=args.apply) + out.append(r) + print(r) + if args.apply: + conn.commit() + conn.close() + inserted = sum(1 for x in out if x.get("inserted")) + print(f"done: inserted={inserted} total_snapshots={len(snaps)} apply={args.apply}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/strategy_trend_register.py b/strategy_trend_register.py index f17a1af..aad9b65 100644 --- a/strategy_trend_register.py +++ b/strategy_trend_register.py @@ -645,6 +645,18 @@ def _plan_stop_status(result_label: str) -> str: return "stopped_manual" +def _call_insert_trade_record(m, plan_id: int, kwargs: dict) -> None: + """按各所 insert_trade_record 签名过滤参数,避免未知字段导致记账失败。""" + fn = getattr(m, "insert_trade_record", None) + if not callable(fn): + raise RuntimeError("app_module 缺少 insert_trade_record") + allowed = set(inspect.signature(fn).parameters.keys()) + call = {k: v for k, v in kwargs.items() if k in allowed} + if "trend_plan_id" in allowed: + call["trend_plan_id"] = int(plan_id) + fn(**call) + + def _trend_plan_trade_exists(conn, plan_id: int) -> bool: try: return conn.execute( @@ -705,6 +717,37 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) - cancel_symbol_orders(cfg, ex_sym) except Exception: pass + if not _trend_plan_trade_exists(conn, plan_id): + session_date = row["session_date"] or m.get_trading_day() + session_capital = m.update_session_capital(conn, session_date, pnl_amount) + _call_insert_trade_record( + m, + plan_id, + dict( + conn=conn, + symbol=sym, + monitor_type=MONITOR_TYPE_TREND, + direction=direction, + trigger_price=avg_e, + stop_loss=float(row["stop_loss"]), + initial_stop_loss=float(row.get("initial_stop_loss") or row["stop_loss"]), + take_profit=float(row["take_profit"]), + margin_capital=margin_cap, + leverage=lev, + pnl_amount=pnl_amount, + hold_seconds=hold_seconds, + trade_style="trend_pullback", + risk_amount=risk_amt, + planned_rr=planned_rr, + actual_rr=m.calc_actual_rr(pnl_amount, risk_amt), + result=res, + opened_at=opened_at, + closed_at=closed_at, + entry_reason=ENTRY_REASON_TREND_PULLBACK, + ), + ) + else: + session_capital = None st = _plan_stop_status(result_label) cur = conn.execute( "UPDATE trend_pullback_plans SET status=?, message=? WHERE id=? AND status='active'", @@ -728,36 +771,6 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) - ) except Exception: pass - if _trend_plan_trade_exists(conn, plan_id): - return - session_date = row["session_date"] or m.get_trading_day() - session_capital = m.update_session_capital(conn, session_date, pnl_amount) - kwargs = dict( - conn=conn, - symbol=sym, - monitor_type=MONITOR_TYPE_TREND, - direction=direction, - trigger_price=avg_e, - stop_loss=float(row["stop_loss"]), - initial_stop_loss=float(row.get("initial_stop_loss") or row["stop_loss"]), - take_profit=float(row["take_profit"]), - margin_capital=margin_cap, - leverage=lev, - pnl_amount=pnl_amount, - hold_seconds=hold_seconds, - trade_style="trend_pullback", - risk_amount=risk_amt, - planned_rr=planned_rr, - actual_rr=m.calc_actual_rr(pnl_amount, risk_amt), - result=res, - opened_at=opened_at, - closed_at=closed_at, - entry_reason=ENTRY_REASON_TREND_PULLBACK, - ) - if "trend_plan_id" in inspect.signature(m.insert_trade_record).parameters: - m.insert_trade_record(**kwargs, trend_plan_id=plan_id) - else: - m.insert_trade_record(**kwargs) extra = getattr(m, "build_wechat_close_message", None) send = getattr(m, "send_wechat_msg", None) if callable(extra) and callable(send): @@ -777,7 +790,6 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) - session_capital_fallback=session_capital, ) ) - conn.commit() def _trend_plan_open_age_sec(row, m) -> float: diff --git a/tests/test_trend_finalize_trade_record.py b/tests/test_trend_finalize_trade_record.py new file mode 100644 index 0000000..ca76b85 --- /dev/null +++ b/tests/test_trend_finalize_trade_record.py @@ -0,0 +1,92 @@ +"""趋势计划结束:须写入 trade_records(四所统一)。""" +from __future__ import annotations + +import inspect +import sqlite3 +import sys +import unittest +from pathlib import Path +from unittest.mock import MagicMock + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT)) + +from strategy_trend_register import _call_insert_trade_record # noqa: E402 + + +class _GateBotLikeModule: + """模拟 gate_bot:曾有 trend_plan_id 但缺 entry_reason 参数。""" + + @staticmethod + def insert_trade_record( + conn, + symbol, + monitor_type, + direction, + trigger_price, + stop_loss, + initial_stop_loss=None, + take_profit=None, + margin_capital=None, + leverage=None, + pnl_amount=0, + hold_seconds=0, + trade_style=None, + risk_amount=None, + planned_rr=None, + actual_rr=None, + result="", + miss_reason=None, + opened_at=None, + opened_at_ms=None, + closed_at=None, + closed_at_ms=None, + exchange_trade_id=None, + trend_plan_id=None, + ): + conn.execute( + "INSERT INTO trade_records (symbol, monitor_type, direction, result, trend_plan_id) " + "VALUES (?,?,?,?,?)", + (symbol, monitor_type, direction, result, trend_plan_id), + ) + + +class TestTrendFinalizeTradeRecord(unittest.TestCase): + def test_call_insert_filters_unknown_entry_reason(self): + conn = sqlite3.connect(":memory:") + conn.execute( + "CREATE TABLE trade_records (symbol TEXT, monitor_type TEXT, direction TEXT, " + "result TEXT, trend_plan_id INTEGER)" + ) + m = _GateBotLikeModule() + _call_insert_trade_record( + m, + 4, + dict( + conn=conn, + symbol="ONDO/USDT", + monitor_type="趋势回调", + direction="long", + trigger_price=0.35, + stop_loss=0.329, + result="止损", + entry_reason="趋势回调", + ), + ) + row = conn.execute( + "SELECT symbol, monitor_type, trend_plan_id FROM trade_records" + ).fetchone() + self.assertEqual(row[0], "ONDO/USDT") + self.assertEqual(row[1], "趋势回调") + self.assertEqual(row[2], 4) + + def test_gate_bot_insert_accepts_entry_reason(self): + from crypto_monitor_gate_bot import app as gate_bot_app # noqa: E402 + + sig = inspect.signature(gate_bot_app.insert_trade_record) + self.assertIn("entry_reason", sig.parameters) + self.assertIn("trend_plan_id", sig.parameters) + + +if __name__ == "__main__": + unittest.main()