fix(trend): write trade_records when hub closes plan on gate_bot

Gate_bot insert_trade_record lacked entry_reason, causing _finalize_plan to save strategy snapshots but fail trade insert. Filter kwargs by signature, insert before plan commit, and add backfill script.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-07 20:32:44 +08:00
parent 08082eb88f
commit 80226eebcf
4 changed files with 328 additions and 33 deletions
+5 -2
View File
@@ -2257,6 +2257,7 @@ def insert_trade_record(
closed_at=None,
closed_at_ms=None,
exchange_trade_id=None,
entry_reason=None,
trend_plan_id=None,
):
hold_minutes = calc_hold_minutes(hold_seconds)
@@ -2264,13 +2265,15 @@ def insert_trade_record(
close_ts = closed_at or app_now_str()
open_ts_ms = _to_ms_with_fallback(opened_at_ms, open_ts)
close_ts_ms = _to_ms_with_fallback(closed_at_ms, close_ts)
er = (entry_reason or "").strip() or None
conn.execute(
"INSERT INTO trade_records (symbol,monitor_type,direction,trigger_price,stop_loss,initial_stop_loss,take_profit,margin_capital,leverage,pnl_amount,hold_seconds,trade_style,risk_amount,planned_rr,actual_rr,hold_minutes,opened_at,opened_at_ms,closed_at,closed_at_ms,result,miss_reason,exchange_trade_id,trend_plan_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
"INSERT INTO trade_records (symbol,monitor_type,direction,trigger_price,stop_loss,initial_stop_loss,take_profit,margin_capital,leverage,pnl_amount,hold_seconds,trade_style,risk_amount,planned_rr,actual_rr,hold_minutes,opened_at,opened_at_ms,closed_at,closed_at_ms,result,miss_reason,exchange_trade_id,entry_reason,trend_plan_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)",
(
symbol, monitor_type, direction, trigger_price, stop_loss, initial_stop_loss, take_profit,
margin_capital, leverage, pnl_amount, hold_seconds,
trade_style, risk_amount, planned_rr, actual_rr, hold_minutes,
open_ts, open_ts_ms, close_ts, close_ts_ms, result, miss_reason, exchange_trade_id, trend_plan_id
open_ts, open_ts_ms, close_ts, close_ts_ms, result, miss_reason, exchange_trade_id, er,
trend_plan_id,
)
)
+188
View File
@@ -0,0 +1,188 @@
#!/usr/bin/env python3
"""补录缺失的趋势回调 trade_records(策略快照已有、交易记录漏写)。
典型原因:gate_bot insert_trade_record 曾不接受 entry_reason_finalize_plan 写快照后插入失败。
用法:
python scripts/backfill_trend_trade_records.py --db crypto_monitor_gate_bot/crypto.db --dry-run
python scripts/backfill_trend_trade_records.py --db crypto_monitor_gate_bot/crypto.db --apply
"""
from __future__ import annotations
import argparse
import json
import sqlite3
import sys
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parents[1]
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from strategy_snapshot_lib import STRATEGY_TREND # noqa: E402
from strategy_trade_labels import ENTRY_REASON_TREND_PULLBACK, MONITOR_TYPE_TREND_PULLBACK # noqa: E402
STATUS_TO_RESULT = {
"stopped_sl": "止损",
"stopped_tp": "止盈",
"stopped_manual": "手动平仓",
}
def _row_dict(row) -> dict:
if row is None:
return {}
try:
return dict(row)
except Exception:
return {}
def _hold_minutes(hold_seconds: int) -> int:
try:
return max(0, int(round(float(hold_seconds) / 60.0)))
except (TypeError, ValueError):
return 0
def backfill_one(conn: sqlite3.Connection, snap: dict, *, apply: bool) -> dict:
plan_id = int(snap.get("source_id") or 0)
if plan_id <= 0:
return {"plan_id": plan_id, "skipped": True, "reason": "invalid source_id"}
exists = conn.execute(
"SELECT id FROM trade_records WHERE trend_plan_id=? LIMIT 1", (plan_id,)
).fetchone()
if exists:
return {"plan_id": plan_id, "skipped": True, "reason": "trade_exists"}
try:
payload = json.loads(snap.get("snapshot_json") or "{}")
except Exception:
payload = {}
plan = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=?", (plan_id,)
).fetchone()
plan_d = _row_dict(plan)
symbol = snap.get("symbol") or plan_d.get("symbol") or payload.get("symbol")
direction = snap.get("direction") or plan_d.get("direction") or payload.get("direction") or "long"
result = (snap.get("result_label") or "").strip() or STATUS_TO_RESULT.get(
plan_d.get("status") or "", "手动平仓"
)
opened_at = snap.get("opened_at") or plan_d.get("opened_at")
closed_at = snap.get("closed_at")
pnl_amount = snap.get("pnl_amount")
if pnl_amount is None:
pnl_amount = payload.get("pnl_amount")
trigger_price = payload.get("avg_entry_price") or plan_d.get("avg_entry_price")
stop_loss = payload.get("stop_loss") or plan_d.get("stop_loss")
take_profit = payload.get("take_profit") or plan_d.get("take_profit")
margin_capital = payload.get("plan_margin_capital") or plan_d.get("plan_margin_capital")
leverage = payload.get("leverage") or plan_d.get("leverage")
opened_ms = plan_d.get("opened_at_ms")
closed_ms = None
hold_seconds = 0
if opened_at and closed_at:
try:
from datetime import datetime
fmt = "%Y-%m-%d %H:%M:%S"
o = datetime.strptime(str(opened_at).strip()[:19], fmt)
c = datetime.strptime(str(closed_at).strip()[:19], fmt)
hold_seconds = max(0, int((c - o).total_seconds()))
except Exception:
hold_seconds = 0
row = {
"symbol": symbol,
"monitor_type": MONITOR_TYPE_TREND_PULLBACK,
"direction": direction,
"trigger_price": trigger_price,
"stop_loss": stop_loss,
"initial_stop_loss": plan_d.get("initial_stop_loss") or stop_loss,
"take_profit": take_profit,
"margin_capital": margin_capital,
"leverage": leverage,
"pnl_amount": pnl_amount,
"hold_seconds": hold_seconds,
"trade_style": "trend_pullback",
"result": result,
"opened_at": opened_at,
"opened_at_ms": opened_ms,
"closed_at": closed_at,
"closed_at_ms": closed_ms,
"entry_reason": ENTRY_REASON_TREND_PULLBACK,
"trend_plan_id": plan_id,
}
if not apply:
return {"plan_id": plan_id, "dry_run": True, "row": row}
conn.execute(
"""INSERT INTO trade_records (
symbol, monitor_type, direction, trigger_price, stop_loss, initial_stop_loss,
take_profit, margin_capital, leverage, pnl_amount, hold_seconds, trade_style,
hold_minutes, opened_at, opened_at_ms, closed_at, closed_at_ms, result,
entry_reason, trend_plan_id
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
row["symbol"],
row["monitor_type"],
row["direction"],
row["trigger_price"],
row["stop_loss"],
row["initial_stop_loss"],
row["take_profit"],
row["margin_capital"],
row["leverage"],
row["pnl_amount"],
row["hold_seconds"],
row["trade_style"],
_hold_minutes(hold_seconds),
row["opened_at"],
row["opened_at_ms"],
row["closed_at"],
row["closed_at_ms"],
row["result"],
row["entry_reason"],
row["trend_plan_id"],
),
)
return {"plan_id": plan_id, "inserted": True}
def main() -> int:
ap = argparse.ArgumentParser()
ap.add_argument("--db", required=True, help="实例 sqlite 路径")
ap.add_argument("--apply", action="store_true", help="写入数据库(默认 dry-run")
args = ap.parse_args()
db_path = Path(args.db)
if not db_path.is_file():
print(f"数据库不存在: {db_path}")
return 1
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
snaps = conn.execute(
"""SELECT * FROM strategy_trade_snapshots
WHERE strategy_type=? ORDER BY id DESC""",
(STRATEGY_TREND,),
).fetchall()
out = []
for s in snaps:
r = backfill_one(conn, _row_dict(s), apply=args.apply)
out.append(r)
print(r)
if args.apply:
conn.commit()
conn.close()
inserted = sum(1 for x in out if x.get("inserted"))
print(f"done: inserted={inserted} total_snapshots={len(snaps)} apply={args.apply}")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+43 -31
View File
@@ -645,6 +645,18 @@ def _plan_stop_status(result_label: str) -> str:
return "stopped_manual"
def _call_insert_trade_record(m, plan_id: int, kwargs: dict) -> None:
"""按各所 insert_trade_record 签名过滤参数,避免未知字段导致记账失败。"""
fn = getattr(m, "insert_trade_record", None)
if not callable(fn):
raise RuntimeError("app_module 缺少 insert_trade_record")
allowed = set(inspect.signature(fn).parameters.keys())
call = {k: v for k, v in kwargs.items() if k in allowed}
if "trend_plan_id" in allowed:
call["trend_plan_id"] = int(plan_id)
fn(**call)
def _trend_plan_trade_exists(conn, plan_id: int) -> bool:
try:
return conn.execute(
@@ -705,6 +717,37 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) -
cancel_symbol_orders(cfg, ex_sym)
except Exception:
pass
if not _trend_plan_trade_exists(conn, plan_id):
session_date = row["session_date"] or m.get_trading_day()
session_capital = m.update_session_capital(conn, session_date, pnl_amount)
_call_insert_trade_record(
m,
plan_id,
dict(
conn=conn,
symbol=sym,
monitor_type=MONITOR_TYPE_TREND,
direction=direction,
trigger_price=avg_e,
stop_loss=float(row["stop_loss"]),
initial_stop_loss=float(row.get("initial_stop_loss") or row["stop_loss"]),
take_profit=float(row["take_profit"]),
margin_capital=margin_cap,
leverage=lev,
pnl_amount=pnl_amount,
hold_seconds=hold_seconds,
trade_style="trend_pullback",
risk_amount=risk_amt,
planned_rr=planned_rr,
actual_rr=m.calc_actual_rr(pnl_amount, risk_amt),
result=res,
opened_at=opened_at,
closed_at=closed_at,
entry_reason=ENTRY_REASON_TREND_PULLBACK,
),
)
else:
session_capital = None
st = _plan_stop_status(result_label)
cur = conn.execute(
"UPDATE trend_pullback_plans SET status=?, message=? WHERE id=? AND status='active'",
@@ -728,36 +771,6 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) -
)
except Exception:
pass
if _trend_plan_trade_exists(conn, plan_id):
return
session_date = row["session_date"] or m.get_trading_day()
session_capital = m.update_session_capital(conn, session_date, pnl_amount)
kwargs = dict(
conn=conn,
symbol=sym,
monitor_type=MONITOR_TYPE_TREND,
direction=direction,
trigger_price=avg_e,
stop_loss=float(row["stop_loss"]),
initial_stop_loss=float(row.get("initial_stop_loss") or row["stop_loss"]),
take_profit=float(row["take_profit"]),
margin_capital=margin_cap,
leverage=lev,
pnl_amount=pnl_amount,
hold_seconds=hold_seconds,
trade_style="trend_pullback",
risk_amount=risk_amt,
planned_rr=planned_rr,
actual_rr=m.calc_actual_rr(pnl_amount, risk_amt),
result=res,
opened_at=opened_at,
closed_at=closed_at,
entry_reason=ENTRY_REASON_TREND_PULLBACK,
)
if "trend_plan_id" in inspect.signature(m.insert_trade_record).parameters:
m.insert_trade_record(**kwargs, trend_plan_id=plan_id)
else:
m.insert_trade_record(**kwargs)
extra = getattr(m, "build_wechat_close_message", None)
send = getattr(m, "send_wechat_msg", None)
if callable(extra) and callable(send):
@@ -777,7 +790,6 @@ def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) -
session_capital_fallback=session_capital,
)
)
conn.commit()
def _trend_plan_open_age_sec(row, m) -> float:
+92
View File
@@ -0,0 +1,92 @@
"""趋势计划结束:须写入 trade_records(四所统一)。"""
from __future__ import annotations
import inspect
import sqlite3
import sys
import unittest
from pathlib import Path
from unittest.mock import MagicMock
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from strategy_trend_register import _call_insert_trade_record # noqa: E402
class _GateBotLikeModule:
"""模拟 gate_bot:曾有 trend_plan_id 但缺 entry_reason 参数。"""
@staticmethod
def insert_trade_record(
conn,
symbol,
monitor_type,
direction,
trigger_price,
stop_loss,
initial_stop_loss=None,
take_profit=None,
margin_capital=None,
leverage=None,
pnl_amount=0,
hold_seconds=0,
trade_style=None,
risk_amount=None,
planned_rr=None,
actual_rr=None,
result="",
miss_reason=None,
opened_at=None,
opened_at_ms=None,
closed_at=None,
closed_at_ms=None,
exchange_trade_id=None,
trend_plan_id=None,
):
conn.execute(
"INSERT INTO trade_records (symbol, monitor_type, direction, result, trend_plan_id) "
"VALUES (?,?,?,?,?)",
(symbol, monitor_type, direction, result, trend_plan_id),
)
class TestTrendFinalizeTradeRecord(unittest.TestCase):
def test_call_insert_filters_unknown_entry_reason(self):
conn = sqlite3.connect(":memory:")
conn.execute(
"CREATE TABLE trade_records (symbol TEXT, monitor_type TEXT, direction TEXT, "
"result TEXT, trend_plan_id INTEGER)"
)
m = _GateBotLikeModule()
_call_insert_trade_record(
m,
4,
dict(
conn=conn,
symbol="ONDO/USDT",
monitor_type="趋势回调",
direction="long",
trigger_price=0.35,
stop_loss=0.329,
result="止损",
entry_reason="趋势回调",
),
)
row = conn.execute(
"SELECT symbol, monitor_type, trend_plan_id FROM trade_records"
).fetchone()
self.assertEqual(row[0], "ONDO/USDT")
self.assertEqual(row[1], "趋势回调")
self.assertEqual(row[2], 4)
def test_gate_bot_insert_accepts_entry_reason(self):
from crypto_monitor_gate_bot import app as gate_bot_app # noqa: E402
sig = inspect.signature(gate_bot_app.insert_trade_record)
self.assertIn("entry_reason", sig.parameters)
self.assertIn("trend_plan_id", sig.parameters)
if __name__ == "__main__":
unittest.main()