Files
crypto_monitor/strategy_records_register.py
T
dekun 3fb2023efb feat: strategy trade snapshots, DCA detail, and hub trend layout
Persist ended trend pullback and roll group snapshots to a unified records page; show replenishment tiers on instance and hub cards with horizontal single-position layout.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-04 10:45:27 +08:00

56 lines
1.8 KiB
Python

"""策略交易记录页:已结束趋势 / 顺势加仓快照(四所统一)。"""
from __future__ import annotations
import json
from typing import Any
from flask import flash, redirect, url_for
from strategy_snapshot_lib import list_strategy_snapshots
def load_strategy_records_page(conn, *, limit: int = 200) -> dict[str, Any]:
    """Collect the data shown on the strategy records page.

    Args:
        conn: Database connection, passed unchanged to
            ``list_strategy_snapshots``.
        limit: Maximum number of snapshots requested from that helper.

    Returns:
        A dict holding the helper's result under ``"strategy_snapshots"`` and
        the limit that was used under ``"strategy_records_limit"``.
    """
    page_data: dict[str, Any] = {
        "strategy_snapshots": list_strategy_snapshots(conn, limit=limit),
        "strategy_records_limit": limit,
    }
    return page_data
def register_strategy_records(app, cfg: dict[str, Any]) -> None:
    """Register the strategy-records routes on ``app``.

    Two views are added:

    * ``/strategy/records`` renders the records list through
      ``cfg["app_module"].render_main_page("strategy_records")``.
    * ``/strategy/records/<int:snap_id>`` looks up one snapshot row, flashes a
      one-line summary of it and redirects back to the list.

    Args:
        app: Flask application (anything exposing a Flask-style ``route``
            decorator factory).
        cfg: Wiring dict. ``cfg["login_required"]`` is the view decorator
            applied to both views and ``cfg["get_db"]`` is a zero-argument
            callable returning a DB connection. The optional
            ``cfg["app_module"]`` is read on each request to the list page.

    Raises:
        KeyError: If ``login_required`` or ``get_db`` is missing from ``cfg``.
    """
    login_required = cfg["login_required"]
    get_db = cfg["get_db"]

    # ``app.route`` registers exactly the function it is handed, so it must be
    # the outermost decorator. With the auth decorator on top, the raw and
    # unguarded function is what ends up registered. The endpoint is pinned
    # explicitly so ``url_for`` keeps resolving even when ``login_required``
    # does not preserve ``__name__``.
    @app.route("/strategy/records", endpoint="strategy_records_page")
    @login_required
    def strategy_records_page():
        """Render the records list, or redirect if no renderer is configured."""
        app_module = cfg.get("app_module")
        render = getattr(app_module, "render_main_page", None)
        if not callable(render):
            flash("render_main_page 未配置")
            return redirect(url_for("strategy_trading_page"))
        return render("strategy_records")

    @app.route(
        "/strategy/records/<int:snap_id>",
        endpoint="strategy_records_detail",
    )
    @login_required
    def strategy_records_detail(snap_id: int):
        """Flash a summary of snapshot ``snap_id`` and return to the list."""
        conn = get_db()
        try:
            row = conn.execute(
                "SELECT * FROM strategy_trade_snapshots WHERE id=?",
                (int(snap_id),),
            ).fetchone()
        finally:
            # Close even when the query raises so the connection is not leaked.
            conn.close()

        if not row:
            flash("未找到该策略快照")
            return redirect(url_for("strategy_records_page"))

        # Best effort: a missing column or malformed JSON payload degrades to
        # an empty snapshot instead of failing the request.
        try:
            snap = json.loads(row["snapshot_json"] or "{}")
        except (TypeError, ValueError, KeyError, IndexError):
            snap = {}
        if not isinstance(snap, dict):
            # Valid JSON that is not an object has no ``dca_levels`` to read.
            snap = {}

        dca = snap.get("dca_levels") or []
        dca_count = len(dca) if isinstance(dca, (list, tuple, dict)) else 0
        flash(
            f"快照 #{snap_id} {row['strategy_type']} {row['symbol']} "
            f"{row['result_label']} · 补仓档 {dca_count} 项(详情见列表页)"
        )
        return redirect(url_for("strategy_records_page"))