fix(hub): merge strategy snapshots into archive for gate_bot
Include strategy_trade_snapshots when trade_records is empty, harden SQL for older schemas, and show per-exchange sync errors in the archive UI. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+238
-25
@@ -228,6 +228,195 @@ def _normalize_archive_trade_row(
|
||||
}
|
||||
|
||||
|
||||
_SNAPSHOT_STATUS_TO_RESULT = {
|
||||
"stopped_sl": "止损",
|
||||
"stopped_tp": "止盈",
|
||||
"stopped_manual": "手动平仓",
|
||||
"stopped_external": "外部平仓",
|
||||
}
|
||||
|
||||
|
||||
def _table_columns(conn, table: str) -> set[str]:
|
||||
try:
|
||||
rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
|
||||
except Exception:
|
||||
return set()
|
||||
out: set[str] = set()
|
||||
for r in rows:
|
||||
try:
|
||||
out.add(str(r[1]))
|
||||
except (IndexError, TypeError):
|
||||
try:
|
||||
out.add(str(r["name"]))
|
||||
except Exception:
|
||||
continue
|
||||
return out
|
||||
|
||||
|
||||
def _archive_ts_expr(cols: set[str]) -> str:
|
||||
parts = [c for c in ("reviewed_closed_at", "closed_at", "created_at", "opened_at") if c in cols]
|
||||
if not parts:
|
||||
return "''"
|
||||
return f"REPLACE(COALESCE({', '.join(parts)}), 'T', ' ')"
|
||||
|
||||
|
||||
def _archive_trade_select_sql(cols: set[str]) -> str:
|
||||
wanted = [
|
||||
"id",
|
||||
"symbol",
|
||||
"direction",
|
||||
"result",
|
||||
"reviewed_result",
|
||||
"pnl_amount",
|
||||
"reviewed_pnl_amount",
|
||||
"exchange_realized_pnl",
|
||||
"closed_at",
|
||||
"reviewed_closed_at",
|
||||
"opened_at",
|
||||
"reviewed_opened_at",
|
||||
"opened_at_ms",
|
||||
"closed_at_ms",
|
||||
"created_at",
|
||||
"monitor_type",
|
||||
"actual_rr",
|
||||
"planned_rr",
|
||||
"trade_style",
|
||||
"entry_reason",
|
||||
"trigger_price",
|
||||
"stop_loss",
|
||||
"take_profit",
|
||||
"reviewed_stop_loss",
|
||||
"reviewed_take_profit",
|
||||
"reviewed_at",
|
||||
"trend_plan_id",
|
||||
]
|
||||
select_cols = [c for c in wanted if c in cols]
|
||||
if "id" not in select_cols:
|
||||
select_cols = ["id"] + select_cols
|
||||
return ", ".join(select_cols)
|
||||
|
||||
|
||||
def _existing_trend_plan_ids(conn) -> set[int]:
|
||||
cols = _table_columns(conn, "trade_records")
|
||||
if "trend_plan_id" not in cols:
|
||||
return set()
|
||||
rows = conn.execute(
|
||||
"SELECT DISTINCT trend_plan_id FROM trade_records WHERE trend_plan_id IS NOT NULL"
|
||||
).fetchall()
|
||||
out: set[int] = set()
|
||||
for row in rows:
|
||||
d = _row_dict(row)
|
||||
try:
|
||||
out.add(int(d.get("trend_plan_id")))
|
||||
except (TypeError, ValueError):
|
||||
continue
|
||||
return out
|
||||
|
||||
|
||||
def _normalize_snapshot_archive_row(
|
||||
snap: dict,
|
||||
*,
|
||||
reset_hour: int = 8,
|
||||
) -> dict[str, Any] | None:
|
||||
result = str(snap.get("result_label") or "").strip()
|
||||
if not result:
|
||||
result = _SNAPSHOT_STATUS_TO_RESULT.get(
|
||||
str(snap.get("status_at_close") or "").strip(), ""
|
||||
)
|
||||
if result not in TRADE_COMPLETED_RESULTS:
|
||||
return None
|
||||
closed_at = snap.get("closed_at")
|
||||
close_dt = parse_dt_for_trading_day(closed_at)
|
||||
if not close_dt:
|
||||
return None
|
||||
opened_at = snap.get("opened_at")
|
||||
opened_ms = _parse_ms_from_row(snap.get("opened_at"))
|
||||
closed_ms = _parse_ms_from_row(closed_at)
|
||||
try:
|
||||
snap_id = int(snap.get("id"))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
try:
|
||||
pnl = float(snap.get("pnl_amount") or 0)
|
||||
except (TypeError, ValueError):
|
||||
pnl = 0.0
|
||||
st = str(snap.get("strategy_type") or "").strip()
|
||||
monitor_type = "trend_pullback" if st == "trend_pullback" else ("roll" if st == "roll" else st)
|
||||
return {
|
||||
"id": -snap_id,
|
||||
"symbol": (snap.get("symbol") or "").strip().upper(),
|
||||
"direction": snap.get("direction"),
|
||||
"result": result,
|
||||
"pnl_amount": round(pnl, 4),
|
||||
"closed_at": closed_at,
|
||||
"opened_at": opened_at,
|
||||
"opened_at_ms": opened_ms,
|
||||
"closed_at_ms": closed_ms,
|
||||
"monitor_type": monitor_type,
|
||||
"entry_reason": "trend_pullback" if st == "trend_pullback" else monitor_type,
|
||||
"from_snapshot": True,
|
||||
"snapshot_id": snap_id,
|
||||
"trend_plan_id": snap.get("source_id"),
|
||||
"trading_day": trading_day_from_dt(close_dt, reset_hour),
|
||||
}
|
||||
|
||||
|
||||
def _parse_ms_from_row(raw: Any) -> int | None:
|
||||
if raw in (None, ""):
|
||||
return None
|
||||
try:
|
||||
if isinstance(raw, (int, float)):
|
||||
v = int(raw)
|
||||
return v if v > 1_000_000_000_000 else v * 1000
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
dt = parse_dt_for_trading_day(raw)
|
||||
return int(dt.timestamp() * 1000) if dt else None
|
||||
|
||||
|
||||
def _fetch_strategy_snapshots_for_archive(
|
||||
conn,
|
||||
*,
|
||||
days: int = 365,
|
||||
reset_hour: int = 8,
|
||||
limit: int = 2000,
|
||||
skip_plan_ids: set[int] | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
cols = _table_columns(conn, "strategy_trade_snapshots")
|
||||
if not cols:
|
||||
return []
|
||||
lim = max(1, min(int(limit or 2000), 5000))
|
||||
day_span = max(1, min(int(days or 365), 3650))
|
||||
cutoff = datetime.now() - timedelta(days=day_span)
|
||||
cutoff_s = cutoff.strftime("%Y-%m-%d %H:%M:%S")
|
||||
ts_expr = "REPLACE(COALESCE(closed_at, opened_at, created_at), 'T', ' ')"
|
||||
rows = conn.execute(
|
||||
f"""
|
||||
SELECT * FROM strategy_trade_snapshots
|
||||
WHERE {ts_expr} >= ?
|
||||
ORDER BY {ts_expr} DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(cutoff_s, lim * 2),
|
||||
).fetchall()
|
||||
skip = skip_plan_ids or set()
|
||||
out: list[dict[str, Any]] = []
|
||||
for row in rows:
|
||||
d = _row_dict(row)
|
||||
try:
|
||||
source_id = int(d.get("source_id") or 0)
|
||||
except (TypeError, ValueError):
|
||||
source_id = 0
|
||||
if source_id > 0 and source_id in skip:
|
||||
continue
|
||||
norm = _normalize_snapshot_archive_row(d, reset_hour=reset_hour)
|
||||
if norm:
|
||||
out.append(norm)
|
||||
if len(out) >= lim:
|
||||
break
|
||||
return out
|
||||
|
||||
|
||||
def fetch_trades_for_archive(
|
||||
conn,
|
||||
*,
|
||||
@@ -235,36 +424,60 @@ def fetch_trades_for_archive(
|
||||
row_to_dict_fn: Optional[Callable] = None,
|
||||
reset_hour: int = 8,
|
||||
limit: int = 2000,
|
||||
include_strategy_snapshots: bool = True,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""返回近 N 天已平仓记录(供币种档案聚合)。"""
|
||||
"""返回近 N 天已平仓记录(trade_records + 未落库的 strategy 快照)。"""
|
||||
lim = max(1, min(int(limit or 2000), 5000))
|
||||
day_span = max(1, min(int(days or 365), 3650))
|
||||
cutoff = datetime.now() - timedelta(days=day_span)
|
||||
cutoff_s = cutoff.strftime("%Y-%m-%d %H:%M:%S")
|
||||
ts_expr = "REPLACE(COALESCE(reviewed_closed_at, closed_at, created_at, opened_at), 'T', ' ')"
|
||||
rows = conn.execute(
|
||||
f"""
|
||||
SELECT id, symbol, direction, result, reviewed_result, pnl_amount, reviewed_pnl_amount,
|
||||
exchange_realized_pnl, closed_at, reviewed_closed_at, opened_at, reviewed_opened_at,
|
||||
opened_at_ms, closed_at_ms, created_at, monitor_type, actual_rr, planned_rr,
|
||||
trade_style, entry_reason, trigger_price, stop_loss, take_profit,
|
||||
reviewed_stop_loss, reviewed_take_profit, reviewed_at
|
||||
FROM trade_records
|
||||
WHERE {ts_expr} >= ?
|
||||
ORDER BY {ts_expr} DESC
|
||||
LIMIT ?
|
||||
""",
|
||||
(cutoff_s, lim * 2),
|
||||
).fetchall()
|
||||
out: list[dict[str, Any]] = []
|
||||
for row in rows:
|
||||
d = _row_dict(row, row_to_dict_fn)
|
||||
norm = _normalize_archive_trade_row(d, reset_hour=reset_hour)
|
||||
if norm:
|
||||
out.append(norm)
|
||||
if len(out) >= lim:
|
||||
break
|
||||
return out
|
||||
cols = _table_columns(conn, "trade_records")
|
||||
if not cols:
|
||||
records: list[dict[str, Any]] = []
|
||||
else:
|
||||
ts_expr = _archive_ts_expr(cols)
|
||||
sql = f"""
|
||||
SELECT {_archive_trade_select_sql(cols)}
|
||||
FROM trade_records
|
||||
WHERE {ts_expr} >= ?
|
||||
ORDER BY {ts_expr} DESC
|
||||
LIMIT ?
|
||||
"""
|
||||
rows = conn.execute(sql, (cutoff_s, lim * 2)).fetchall()
|
||||
records = []
|
||||
for row in rows:
|
||||
d = _row_dict(row, row_to_dict_fn)
|
||||
norm = _normalize_archive_trade_row(d, reset_hour=reset_hour)
|
||||
if norm:
|
||||
records.append(norm)
|
||||
if len(records) >= lim:
|
||||
break
|
||||
|
||||
if not include_strategy_snapshots:
|
||||
return records
|
||||
|
||||
skip_ids = _existing_trend_plan_ids(conn)
|
||||
for rec in records:
|
||||
try:
|
||||
pid = int(rec.get("trend_plan_id") or 0)
|
||||
except (TypeError, ValueError):
|
||||
pid = 0
|
||||
if pid > 0:
|
||||
skip_ids.add(pid)
|
||||
|
||||
snaps = _fetch_strategy_snapshots_for_archive(
|
||||
conn,
|
||||
days=days,
|
||||
reset_hour=reset_hour,
|
||||
limit=max(0, lim - len(records)),
|
||||
skip_plan_ids=skip_ids,
|
||||
)
|
||||
merged = records + snaps
|
||||
merged.sort(
|
||||
key=lambda x: int(x.get("closed_at_ms") or 0),
|
||||
reverse=True,
|
||||
)
|
||||
return merged[:lim]
|
||||
|
||||
|
||||
def summarize_trades(trades: list[dict]) -> dict[str, Any]:
|
||||
|
||||
Reference in New Issue
Block a user