feat: strategy records dual panels with filters and 100-row cap

Split trend and roll snapshot lists with expandable rows, client filters, and DB prune to latest 100 entries.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-04 10:55:34 +08:00
parent 7037dc2334
commit f1e95afb89
3 changed files with 402 additions and 72 deletions
+133 -1
View File
@@ -7,6 +7,7 @@ from typing import Any, Callable, Optional
STRATEGY_TREND = "trend_pullback"
STRATEGY_ROLL = "roll"
STRATEGY_SNAPSHOTS_MAX_ROWS = 100
STRATEGY_SNAPSHOTS_SQL = """
CREATE TABLE IF NOT EXISTS strategy_trade_snapshots (
@@ -160,6 +161,7 @@ def save_trend_plan_snapshot(
closed_at,
),
)
prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS)
def save_roll_group_snapshot(
@@ -220,6 +222,125 @@ def save_roll_group_snapshot(
closed_at,
),
)
prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS)
def prune_strategy_snapshots(conn, *, keep: int = STRATEGY_SNAPSHOTS_MAX_ROWS) -> None:
"""仅保留最近 keep 条策略快照(按 closed_at / id 倒序)。"""
k = max(1, min(int(keep), 500))
conn.execute(
"""DELETE FROM strategy_trade_snapshots
WHERE id NOT IN (
SELECT id FROM strategy_trade_snapshots
ORDER BY COALESCE(closed_at, created_at, '') DESC, id DESC
LIMIT ?
)""",
(k,),
)
def _snapshot_pnl(row: dict, snap: dict) -> float | None:
for key in ("pnl_amount",):
v = row.get(key)
if v is not None and v != "":
try:
return float(v)
except (TypeError, ValueError):
pass
v = snap.get("pnl_amount")
if v is not None and v != "":
try:
return float(v)
except (TypeError, ValueError):
pass
return None
def _trend_dca_stats(snap: dict) -> dict:
levels = snap.get("dca_levels") or build_trend_dca_levels(snap)
dca_only = [
lv
for lv in levels
if (lv.get("leg_key") or "") != "first" and (lv.get("label") or "") != "首仓"
]
done = sum(1 for lv in dca_only if lv.get("status") == "done")
total = len(dca_only)
pending = total - done
if total <= 0:
tag = "na"
elif done <= 0:
tag = "no_dca"
elif done >= total:
tag = "dca_done"
else:
tag = "dca_partial"
return {
"dca_done": done,
"dca_total": total,
"dca_pending": pending,
"dca_tag": tag,
}
def _roll_leg_stats(snap: dict) -> dict:
legs = snap.get("legs") or []
if not isinstance(legs, list):
legs = []
filled = sum(1 for lg in legs if (lg.get("status") or "").lower() == "filled")
total = len(legs)
pending = total - filled
if total <= 0:
tag = "na"
elif filled <= 0:
tag = "no_dca"
elif filled >= total:
tag = "dca_done"
else:
tag = "dca_partial"
return {
"dca_done": filled,
"dca_total": total,
"dca_pending": pending,
"dca_tag": tag,
}
def enrich_strategy_snapshot_row(row: dict) -> dict:
d = dict(row or {})
snap = d.get("snapshot") or {}
st = (d.get("strategy_type") or "").strip()
pnl = _snapshot_pnl(d, snap)
if pnl is not None:
if pnl > 1e-9:
d["filter_pnl"] = "profit"
elif pnl < -1e-9:
d["filter_pnl"] = "loss"
else:
d["filter_pnl"] = "flat"
else:
d["filter_pnl"] = "unknown"
sym = (d.get("symbol") or d.get("exchange_symbol") or "").strip()
d["filter_symbol"] = sym.upper().split("/")[0].split(":")[0] if sym else ""
closed = (d.get("closed_at") or d.get("created_at") or "").strip()
d["sort_ts"] = closed
if st == STRATEGY_TREND:
stats = _trend_dca_stats(snap)
d.update(stats)
legs_txt = (
f"{stats['dca_done']}/{stats['dca_total']}"
if stats["dca_total"] > 0
else "0/0"
)
d["summary_dca"] = legs_txt
else:
stats = _roll_leg_stats(snap)
d.update(stats)
d["summary_dca"] = (
f"{stats['dca_done']}/{stats['dca_total']}"
if stats["dca_total"] > 0
else ""
)
return d
def list_strategy_snapshots(conn, *, limit: int = 200) -> list[dict]:
@@ -237,5 +358,16 @@ def list_strategy_snapshots(conn, *, limit: int = 200) -> list[dict]:
d["snapshot"] = {}
st = (d.get("strategy_type") or "").strip()
d["strategy_label"] = "趋势回调" if st == STRATEGY_TREND else "顺势加仓"
out.append(d)
out.append(enrich_strategy_snapshot_row(d))
return out
def list_strategy_snapshots_split(
conn, *, limit: int = STRATEGY_SNAPSHOTS_MAX_ROWS
) -> tuple[list[dict], list[dict], list[str]]:
"""趋势 / 顺势分组,及筛选用币种列表。"""
all_rows = list_strategy_snapshots(conn, limit=limit)
trend = [r for r in all_rows if (r.get("strategy_type") or "") == STRATEGY_TREND]
roll = [r for r in all_rows if (r.get("strategy_type") or "") == STRATEGY_ROLL]
symbols = sorted({r.get("filter_symbol") or "" for r in all_rows if r.get("filter_symbol")})
return trend, roll, symbols