"""按交易日聚合实例 trade_records 盈亏,供统计分析页日历 API 使用。""" from __future__ import annotations from datetime import datetime, timedelta from typing import Any, Callable def build_trade_stats_calendar( pnls: list[tuple], year: int, month: int, segment_key: str, row_matches_fn: Callable[[Any, str], bool], *, reset_hour: int = 8, ) -> dict[str, Any]: """pnls: _load_completed_trade_pnls 返回值 (pnl, close_dt, trading_day, row)。""" y = int(year) m = int(month) if m < 1 or m > 12: raise ValueError("month 无效") first = f"{y:04d}-{m:02d}-01" if m == 12: next_first = datetime(y + 1, 1, 1) else: next_first = datetime(y, m + 1, 1) last = (next_first - timedelta(days=1)).strftime("%Y-%m-%d") seg = (segment_key or "all").strip() or "all" days: dict[str, dict[str, Any]] = {} for pnl, _close_dt, td, row in pnls: if not td or td < first or td > last: continue if not row_matches_fn(row, seg): continue bucket = days.setdefault( td, { "trading_day": td, "open_count": 0, "pnl_total": 0.0, "turnover_total": 0.0, "commission_total": 0.0, "has_sick": False, "sick_count": 0, }, ) bucket["open_count"] += 1 bucket["pnl_total"] += float(pnl or 0) try: bucket["turnover_total"] += float(row["exchange_turnover_usdt"] or 0) except (TypeError, ValueError, KeyError): pass try: bucket["commission_total"] += float(row["exchange_commission_usdt"] or 0) except (TypeError, ValueError, KeyError): pass for d in days.values(): d["pnl_total"] = round(float(d["pnl_total"]), 4) d["turnover_total"] = round(float(d["turnover_total"]), 4) d["commission_total"] = round(float(d["commission_total"]), 4) month_pnl = sum(float(d["pnl_total"]) for d in days.values()) month_count = sum(int(d["open_count"]) for d in days.values()) return { "year": y, "month": m, "date_from": first, "date_to": last, "segment": seg, "reset_hour": int(reset_hour), "days": days, "month_pnl_total": round(month_pnl, 4), "month_open_count": month_count, }