重构统计分析页:汇总指标、分项下拉与后台缓存

新增 stats_engine 与 stats_cache,提供 API 自动加载 8 种统计维度;交易与复盘变更时自动刷新缓存。

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-15 16:46:06 +08:00
parent 0e385b057d
commit e8b4dbbaca
4 changed files with 581 additions and 179 deletions
+310
View File
@@ -0,0 +1,310 @@
"""交易统计计算与缓存结构。"""
from __future__ import annotations
import json
from datetime import datetime
from typing import Any, Optional
from zoneinfo import ZoneInfo
TZ = ZoneInfo("Asia/Shanghai")
STATS_VIEWS = [
{"key": "by_time", "label": "按时间统计"},
{"key": "by_week", "label": "周统计"},
{"key": "by_month", "label": "月统计"},
{"key": "by_symbol", "label": "按品种统计"},
{"key": "by_fee", "label": "按手续费统计"},
{"key": "by_direction", "label": "按方向统计"},
{"key": "by_trade_type", "label": "按交易类型统计"},
{"key": "by_emotion", "label": "情绪单统计"},
]
BREAKDOWN_COLUMNS = [
{"key": "label", "label": "维度"},
{"key": "count", "label": "交易次数"},
{"key": "wins", "label": "盈利笔数"},
{"key": "losses", "label": "亏损笔数"},
{"key": "win_rate", "label": "胜率(%)"},
{"key": "avg_profit", "label": "平均盈利"},
{"key": "avg_loss", "label": "平均亏损"},
{"key": "profit_loss_ratio", "label": "盈亏比"},
{"key": "total_fee", "label": "累计手续费"},
{"key": "total_net", "label": "净盈亏合计"},
{"key": "max_loss", "label": "最大亏损"},
{"key": "max_profit", "label": "最大盈利"},
]
def _parse_dt(value: str) -> Optional[datetime]:
if not value:
return None
text = value.strip().replace(" ", "T")
try:
return datetime.fromisoformat(text)
except ValueError:
return None
def _row_dict(row) -> dict:
return dict(row) if row is not None else {}
def _net_pnl(row: dict) -> float:
if row.get("pnl_net") is not None:
return float(row["pnl_net"])
pnl = float(row.get("pnl") or 0)
fee = float(row.get("fee") or 0)
return round(pnl - fee, 2)
def _fee(row: dict) -> float:
return float(row.get("fee") or 0)
def _margin_pct(pnl_net: float, margin: Optional[float]) -> Optional[float]:
if margin and margin > 0:
return round(pnl_net / margin * 100, 2)
return None
def _agg_group(rows: list[dict], key_fn) -> list[dict]:
groups: dict[str, list[dict]] = {}
for row in rows:
key = key_fn(row) or "未知"
groups.setdefault(key, []).append(row)
result = []
for label, items in sorted(groups.items(), key=lambda x: x[0]):
result.append(_agg_metrics(label, items))
return result
def _agg_metrics(label: str, items: list[dict]) -> dict:
nets = [_net_pnl(r) for r in items]
wins = [n for n in nets if n > 0]
losses = [n for n in nets if n < 0]
count = len(items)
win_cnt = len(wins)
loss_cnt = len(losses)
avg_profit = round(sum(wins) / len(wins), 2) if wins else 0.0
avg_loss = round(sum(losses) / len(losses), 2) if losses else 0.0
pl_ratio = round(avg_profit / abs(avg_loss), 2) if wins and losses and avg_loss != 0 else 0.0
total_fee = round(sum(_fee(r) for r in items), 2)
total_net = round(sum(nets), 2)
max_loss = round(min(nets), 2) if nets else 0.0
max_profit = round(max(nets), 2) if nets else 0.0
win_rate = round(win_cnt / count * 100, 2) if count else 0.0
return {
"label": label,
"count": count,
"wins": win_cnt,
"losses": loss_cnt,
"win_rate": win_rate,
"avg_profit": avg_profit,
"avg_loss": avg_loss,
"profit_loss_ratio": pl_ratio,
"total_fee": total_fee,
"total_net": total_net,
"max_loss": max_loss,
"max_profit": max_profit,
}
def _max_consecutive_losses(nets: list[float]) -> int:
streak = 0
best = 0
for n in nets:
if n < 0:
streak += 1
best = max(best, streak)
else:
streak = 0
return best
def _max_drawdown(nets: list[float], initial_capital: float) -> tuple[float, float]:
equity = initial_capital
peak = initial_capital
max_dd = 0.0
max_dd_pct = 0.0
for n in nets:
equity += n
if equity > peak:
peak = equity
dd = peak - equity
if dd > max_dd:
max_dd = dd
if peak > 0:
pct = dd / peak * 100
if pct > max_dd_pct:
max_dd_pct = pct
return round(max_dd, 2), round(max_dd_pct, 2)
def fetch_trade_rows(conn) -> list[dict]:
rows = conn.execute(
"SELECT * FROM trade_logs ORDER BY close_time ASC, id ASC"
).fetchall()
return [_row_dict(r) for r in rows]
def fetch_review_rows(conn) -> list[dict]:
rows = conn.execute(
"SELECT * FROM review_records ORDER BY close_time ASC, id ASC"
).fetchall()
return [_row_dict(r) for r in rows]
def compute_summary(trades: list[dict], reviews: list[dict], live_capital: float) -> dict:
nets = [_net_pnl(t) for t in trades]
count = len(trades)
wins = [n for n in nets if n > 0]
losses = [n for n in nets if n < 0]
win_cnt = len(wins)
loss_cnt = len(losses)
avg_profit = round(sum(wins) / len(wins), 2) if wins else 0.0
avg_loss = round(sum(losses) / len(losses), 2) if losses else 0.0
pl_ratio = round(avg_profit / abs(avg_loss), 2) if wins and losses and avg_loss != 0 else 0.0
total_fee = round(sum(_fee(t) for t in trades) + sum(_fee(r) for r in reviews), 2)
max_loss_amt = round(min(nets), 2) if nets else 0.0
max_profit_amt = round(max(nets), 2) if nets else 0.0
margins_loss = [
_margin_pct(_net_pnl(t), t.get("margin"))
for t in trades
if _net_pnl(t) < 0 and t.get("margin")
]
margins_profit = [
_margin_pct(_net_pnl(t), t.get("margin"))
for t in trades
if _net_pnl(t) > 0 and t.get("margin")
]
max_loss_pct = round(min(margins_loss), 2) if margins_loss else 0.0
max_profit_pct = round(max(margins_profit), 2) if margins_profit else 0.0
consec_loss = _max_consecutive_losses(nets)
max_dd, max_dd_pct = _max_drawdown(nets, live_capital)
emotion_cnt = sum(1 for r in reviews if r.get("is_emotion"))
review_cnt = len(reviews)
denom = count if count else review_cnt
emotion_ratio = round(emotion_cnt / denom * 100, 2) if denom else 0.0
return {
"total_trades": count,
"win_rate": round(win_cnt / count * 100, 2) if count else 0.0,
"avg_profit": avg_profit,
"avg_loss": avg_loss,
"profit_loss_ratio": pl_ratio,
"consecutive_losses": consec_loss,
"max_drawdown": max_dd,
"max_drawdown_pct": max_dd_pct,
"max_loss_amount": max_loss_amt,
"max_loss_pct": max_loss_pct,
"max_profit_amount": max_profit_amt,
"max_profit_pct": max_profit_pct,
"total_fee": total_fee,
"emotion_count": emotion_cnt,
"emotion_ratio": emotion_ratio,
"review_count": review_cnt,
"win_count": win_cnt,
"loss_count": loss_cnt,
}
def compute_breakdowns(trades: list[dict], reviews: list[dict]) -> dict[str, dict]:
def day_key(row: dict) -> str:
dt = _parse_dt(row.get("close_time") or row.get("created_at") or "")
return dt.date().isoformat() if dt else "未知"
def week_key(row: dict) -> str:
dt = _parse_dt(row.get("close_time") or row.get("created_at") or "")
if not dt:
return "未知"
iso = dt.isocalendar()
return f"{iso.year}-W{iso.week:02d}"
def month_key(row: dict) -> str:
dt = _parse_dt(row.get("close_time") or row.get("created_at") or "")
return dt.strftime("%Y-%m") if dt else "未知"
def symbol_key(row: dict) -> str:
return row.get("symbol_name") or row.get("symbol") or "未知"
def direction_key(row: dict) -> str:
d = row.get("direction") or ""
return "做多" if d == "long" else ("做空" if d == "short" else d or "未知")
def type_key(row: dict) -> str:
return row.get("monitor_type") or "未知"
by_fee_rows = []
fee_groups = {}
for t in trades:
key = symbol_key(t)
fee_groups.setdefault(key, []).append(t)
for label, items in sorted(fee_groups.items()):
row = _agg_metrics(label, items)
row["avg_fee"] = round(row["total_fee"] / row["count"], 2) if row["count"] else 0.0
by_fee_rows.append(row)
emotion_trades = [r for r in reviews if r.get("is_emotion")]
non_emotion = [r for r in reviews if not r.get("is_emotion")]
emotion_rows = [
_agg_metrics("情绪单", emotion_trades),
_agg_metrics("非情绪单", non_emotion),
]
fee_columns = BREAKDOWN_COLUMNS + [{"key": "avg_fee", "label": "平均手续费"}]
return {
"by_time": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, day_key)},
"by_week": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, week_key)},
"by_month": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, month_key)},
"by_symbol": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, symbol_key)},
"by_fee": {"columns": fee_columns, "rows": by_fee_rows},
"by_direction": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, direction_key)},
"by_trade_type": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, type_key)},
"by_emotion": {"columns": BREAKDOWN_COLUMNS, "rows": emotion_rows},
}
def build_all_stats(conn, live_capital: float = 0.0) -> dict:
trades = fetch_trade_rows(conn)
reviews = fetch_review_rows(conn)
summary = compute_summary(trades, reviews, live_capital)
breakdowns = compute_breakdowns(trades, reviews)
return {
"updated_at": datetime.now(TZ).isoformat(timespec="seconds"),
"summary": summary,
"views": STATS_VIEWS,
"breakdowns": breakdowns,
}
def save_stats_cache(conn, data: dict) -> None:
conn.execute(
"""INSERT INTO stats_cache (key, data_json, updated_at)
VALUES ('all', ?, ?)
ON CONFLICT(key) DO UPDATE SET data_json=excluded.data_json, updated_at=excluded.updated_at""",
(json.dumps(data, ensure_ascii=False), data["updated_at"]),
)
conn.commit()
def load_stats_cache(conn) -> Optional[dict]:
row = conn.execute(
"SELECT data_json FROM stats_cache WHERE key='all'"
).fetchone()
if not row:
return None
try:
return json.loads(row["data_json"])
except json.JSONDecodeError:
return None
def refresh_stats_cache(conn, live_capital: float = 0.0) -> dict:
data = build_all_stats(conn, live_capital)
save_stats_cache(conn, data)
return data