# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """交易统计计算与缓存结构。""" from __future__ import annotations import calendar import json import threading from datetime import date, datetime from typing import Any, Optional from zoneinfo import ZoneInfo from db_conn import commit_retry, execute_retry _stats_refresh_lock = threading.Lock() TZ = ZoneInfo("Asia/Shanghai") STATS_VIEWS = [ {"key": "by_time", "label": "按时间统计"}, {"key": "by_week", "label": "周统计"}, {"key": "by_month", "label": "月统计"}, {"key": "by_symbol", "label": "按品种统计"}, {"key": "by_fee", "label": "按手续费统计"}, {"key": "by_direction", "label": "按方向统计"}, {"key": "by_trade_type", "label": "按交易类型统计"}, {"key": "by_emotion", "label": "情绪单统计"}, ] BREAKDOWN_COLUMNS = [ {"key": "label", "label": "维度"}, {"key": "count", "label": "交易次数"}, {"key": "wins", "label": "盈利笔数"}, {"key": "losses", "label": "亏损笔数"}, {"key": "win_rate", "label": "胜率(%)"}, {"key": "avg_profit", "label": "平均盈利"}, {"key": "avg_loss", "label": "平均亏损"}, {"key": "profit_loss_ratio", "label": "盈亏比"}, {"key": "total_fee", "label": "累计手续费"}, {"key": "total_net", "label": "净盈亏合计"}, {"key": "max_loss", "label": "最大亏损"}, {"key": "max_profit", "label": "最大盈利"}, ] def _parse_dt(value: str) -> Optional[datetime]: if not value: return None text = value.strip().replace(" ", "T") try: return datetime.fromisoformat(text) except ValueError: return None def _row_dict(row) -> dict: return dict(row) if row is not None else {} def _net_pnl(row: dict) -> float: if row.get("pnl_net") is not None: return float(row["pnl_net"]) pnl = float(row.get("pnl") or 0) fee = float(row.get("fee") or 0) return round(pnl - fee, 2) def _fee(row: dict) -> float: return float(row.get("fee") or 0) def _margin_pct(pnl_net: float, margin: Optional[float]) -> Optional[float]: if margin and margin > 0: return round(pnl_net / margin * 100, 2) return None def _agg_group(rows: list[dict], key_fn) -> list[dict]: groups: dict[str, list[dict]] = {} for row in rows: key = key_fn(row) or "未知" groups.setdefault(key, []).append(row) result = [] for label, items in sorted(groups.items(), key=lambda x: x[0]): result.append(_agg_metrics(label, items)) return result def _agg_metrics(label: str, items: list[dict]) -> dict: nets = [_net_pnl(r) for r in items] wins = [n for n in nets if n > 0] losses = [n for n in nets if n < 0] count = len(items) win_cnt = len(wins) loss_cnt = len(losses) avg_profit = round(sum(wins) / len(wins), 2) if wins else 0.0 avg_loss = round(sum(losses) / len(losses), 2) if losses else 0.0 pl_ratio = round(avg_profit / abs(avg_loss), 2) if wins and losses and avg_loss != 0 else 0.0 total_fee = round(sum(_fee(r) for r in items), 2) total_net = round(sum(nets), 2) max_loss = round(min(losses), 2) if losses else 0.0 max_profit = round(max(wins), 2) if wins else 0.0 win_rate = round(win_cnt / count * 100, 2) if count else 0.0 return { "label": label, "count": count, "wins": win_cnt, "losses": loss_cnt, "win_rate": win_rate, "avg_profit": avg_profit, "avg_loss": avg_loss, "profit_loss_ratio": pl_ratio, "total_fee": total_fee, "total_net": total_net, "max_loss": max_loss, "max_profit": max_profit, } def _max_consecutive_losses(nets: list[float]) -> int: streak = 0 best = 0 for n in nets: if n < 0: streak += 1 best = max(best, streak) else: streak = 0 return best def _max_drawdown(nets: list[float], initial_capital: float) -> tuple[float, float]: equity = initial_capital peak = initial_capital max_dd = 0.0 max_dd_pct = 0.0 for n in nets: equity += n if equity > peak: peak = equity dd = peak - equity if dd > max_dd: max_dd = dd if peak > 0: pct = dd / peak * 100 if pct > max_dd_pct: max_dd_pct = pct return round(max_dd, 2), round(max_dd_pct, 2) def fetch_trade_rows(conn) -> list[dict]: rows = conn.execute( "SELECT * FROM trade_logs ORDER BY close_time ASC, id ASC" ).fetchall() return [_row_dict(r) for r in rows] def fetch_review_rows(conn) -> list[dict]: rows = conn.execute( "SELECT * FROM review_records ORDER BY close_time ASC, id ASC" ).fetchall() return [_row_dict(r) for r in rows] def compute_summary(trades: list[dict], reviews: list[dict], live_capital: float) -> dict: nets = [_net_pnl(t) for t in trades] count = len(trades) wins = [n for n in nets if n > 0] losses = [n for n in nets if n < 0] win_cnt = len(wins) loss_cnt = len(losses) avg_profit = round(sum(wins) / len(wins), 2) if wins else 0.0 avg_loss = round(sum(losses) / len(losses), 2) if losses else 0.0 pl_ratio = round(avg_profit / abs(avg_loss), 2) if wins and losses and avg_loss != 0 else 0.0 total_fee = round(sum(_fee(t) for t in trades) + sum(_fee(r) for r in reviews), 2) max_loss_amt = round(min(losses), 2) if losses else 0.0 max_profit_amt = round(max(wins), 2) if wins else 0.0 margins_loss = [ _margin_pct(_net_pnl(t), t.get("margin")) for t in trades if _net_pnl(t) < 0 and t.get("margin") ] margins_profit = [ _margin_pct(_net_pnl(t), t.get("margin")) for t in trades if _net_pnl(t) > 0 and t.get("margin") ] max_loss_pct = round(min(margins_loss), 2) if margins_loss else 0.0 max_profit_pct = round(max(margins_profit), 2) if margins_profit else 0.0 consec_loss = _max_consecutive_losses(nets) max_dd, max_dd_pct = _max_drawdown(nets, live_capital) emotion_cnt = sum(1 for r in reviews if r.get("is_emotion")) review_cnt = len(reviews) denom = count if count else review_cnt emotion_ratio = round(emotion_cnt / denom * 100, 2) if denom else 0.0 return { "total_trades": count, "win_rate": round(win_cnt / count * 100, 2) if count else 0.0, "avg_profit": avg_profit, "avg_loss": avg_loss, "profit_loss_ratio": pl_ratio, "consecutive_losses": consec_loss, "max_drawdown": max_dd, "max_drawdown_pct": max_dd_pct, "max_loss_amount": max_loss_amt, "max_loss_pct": max_loss_pct, "max_profit_amount": max_profit_amt, "max_profit_pct": max_profit_pct, "total_fee": total_fee, "emotion_count": emotion_cnt, "emotion_ratio": emotion_ratio, "review_count": review_cnt, "win_count": win_cnt, "loss_count": loss_cnt, } def compute_breakdowns(trades: list[dict], reviews: list[dict]) -> dict[str, dict]: def day_key(row: dict) -> str: dt = _parse_dt(row.get("close_time") or row.get("created_at") or "") return dt.date().isoformat() if dt else "未知" def week_key(row: dict) -> str: dt = _parse_dt(row.get("close_time") or row.get("created_at") or "") if not dt: return "未知" iso = dt.isocalendar() return f"{iso.year}-W{iso.week:02d}" def month_key(row: dict) -> str: dt = _parse_dt(row.get("close_time") or row.get("created_at") or "") return dt.strftime("%Y-%m") if dt else "未知" def symbol_key(row: dict) -> str: return row.get("symbol_name") or row.get("symbol") or "未知" def direction_key(row: dict) -> str: d = row.get("direction") or "" return "做多" if d == "long" else ("做空" if d == "short" else d or "未知") def type_key(row: dict) -> str: return row.get("monitor_type") or "未知" by_fee_rows = [] fee_groups = {} for t in trades: key = symbol_key(t) fee_groups.setdefault(key, []).append(t) for label, items in sorted(fee_groups.items()): row = _agg_metrics(label, items) row["avg_fee"] = round(row["total_fee"] / row["count"], 2) if row["count"] else 0.0 by_fee_rows.append(row) emotion_trades = [r for r in reviews if r.get("is_emotion")] non_emotion = [r for r in reviews if not r.get("is_emotion")] emotion_rows = [ _agg_metrics("情绪单", emotion_trades), _agg_metrics("非情绪单", non_emotion), ] fee_columns = BREAKDOWN_COLUMNS + [{"key": "avg_fee", "label": "平均手续费"}] return { "by_time": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, day_key)}, "by_week": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, week_key)}, "by_month": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, month_key)}, "by_symbol": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, symbol_key)}, "by_fee": {"columns": fee_columns, "rows": by_fee_rows}, "by_direction": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, direction_key)}, "by_trade_type": {"columns": BREAKDOWN_COLUMNS, "rows": _agg_group(trades, type_key)}, "by_emotion": {"columns": BREAKDOWN_COLUMNS, "rows": emotion_rows}, } def build_all_stats(conn, live_capital: float = 0.0) -> dict: trades = fetch_trade_rows(conn) reviews = fetch_review_rows(conn) summary = compute_summary(trades, reviews, live_capital) breakdowns = compute_breakdowns(trades, reviews) return { "updated_at": datetime.now(TZ).isoformat(timespec="seconds"), "summary": summary, "views": STATS_VIEWS, "breakdowns": breakdowns, } def save_stats_cache(conn, data: dict) -> None: execute_retry( conn, """INSERT INTO stats_cache (key, data_json, updated_at) VALUES ('all', ?, ?) ON CONFLICT(key) DO UPDATE SET data_json=excluded.data_json, updated_at=excluded.updated_at""", (json.dumps(data, ensure_ascii=False), data["updated_at"]), ) commit_retry(conn) def load_stats_cache(conn) -> Optional[dict]: row = conn.execute( "SELECT data_json FROM stats_cache WHERE key='all'" ).fetchone() if not row: return None try: return json.loads(row["data_json"]) except json.JSONDecodeError: return None def refresh_stats_cache(conn, live_capital: float = 0.0) -> dict: with _stats_refresh_lock: data = build_all_stats(conn, live_capital) save_stats_cache(conn, data) return data def _norm_symbol(symbol: str) -> str: s = (symbol or "").strip().lower() if "." in s: s = s.split(".")[0] return s def _close_day_key(row: dict) -> str: dt = _parse_dt(row.get("close_time") or row.get("created_at") or "") return dt.date().isoformat() if dt else "" def _close_ts(row: dict) -> float: dt = _parse_dt(row.get("close_time") or row.get("created_at") or "") return dt.timestamp() if dt else 0.0 def _direction_label(direction: str) -> str: if direction == "long": return "做多" if direction == "short": return "做空" return direction or "" def _index_reviews_by_day_sym(reviews: list[dict]) -> dict[tuple[str, str], list[dict]]: index: dict[tuple[str, str], list[dict]] = {} for review in reviews: day = _close_day_key(review) if not day: continue sym = _norm_symbol(review.get("symbol") or "") index.setdefault((day, sym), []).append(review) return index def _review_match_score(trade: dict, review: dict) -> float: score = abs(_close_ts(trade) - _close_ts(review)) lots_t = trade.get("lots") lots_r = review.get("lots") if lots_t is not None and lots_r is not None and float(lots_t) != float(lots_r): score += 86400.0 entry_t = trade.get("entry_price") entry_r = review.get("entry_price") if entry_t is not None and entry_r is not None and abs(float(entry_t) - float(entry_r)) > 0.01: score += 3600.0 return score def _find_review_for_trade( trade: dict, review_index: dict[tuple[str, str], list[dict]], used_review_ids: set[int], ) -> Optional[dict]: day = _close_day_key(trade) sym = _norm_symbol(trade.get("symbol") or "") candidates = [ r for r in review_index.get((day, sym), []) if r.get("id") not in used_review_ids ] if not candidates: return None return min(candidates, key=lambda r: _review_match_score(trade, r)) def _format_day_entry( *, trade: Optional[dict] = None, review: Optional[dict] = None, source: str, ) -> dict: row = review if source == "review" and review else trade or review or {} symbol = row.get("symbol") or "" pnl_net = _net_pnl(row) tags = (row.get("behavior_tags") or "").strip() is_emotion = bool(row.get("is_emotion")) return { "source": source, "trade_id": trade.get("id") if trade else None, "review_id": review.get("id") if review else None, "symbol": row.get("symbol_name") or symbol, "symbol_code": symbol, "direction": _direction_label(row.get("direction") or ""), "lots": row.get("lots"), "entry_price": row.get("entry_price"), "close_price": row.get("close_price"), "stop_loss": row.get("stop_loss"), "take_profit": row.get("take_profit"), "open_time": row.get("open_time") or "", "close_time": row.get("close_time") or "", "pnl": row.get("pnl"), "fee": row.get("fee"), "pnl_net": pnl_net, "result": row.get("result") if trade else None, "monitor_type": row.get("monitor_type") if trade else None, "is_emotion": is_emotion, "behavior_tags": tags, "open_type": row.get("open_type") if review else None, "exit_trigger": row.get("exit_trigger") if review else None, "exit_supplement": row.get("exit_supplement") if review else None, "holding_duration": row.get("holding_duration") if review else None, "initial_pnl": row.get("initial_pnl") if review else None, "actual_pnl": row.get("actual_pnl") if review else None, "timeframe": row.get("timeframe") if review else None, "notes": row.get("notes") if review else None, "screenshot": row.get("screenshot") if review else None, } def build_day_detail(trades: list[dict], reviews: list[dict], day: str) -> list[dict]: day_trades = [t for t in trades if _close_day_key(t) == day] day_reviews = [r for r in reviews if _close_day_key(r) == day] review_index = _index_reviews_by_day_sym(day_reviews) used_review_ids: set[int] = set() items: list[dict] = [] for trade in day_trades: review = _find_review_for_trade(trade, review_index, used_review_ids) if review: used_review_ids.add(int(review["id"])) items.append(_format_day_entry(trade=trade, review=review, source="review")) else: items.append(_format_day_entry(trade=trade, source="trade")) for review in day_reviews: if int(review.get("id") or 0) in used_review_ids: continue items.append(_format_day_entry(review=review, source="review")) items.sort(key=lambda x: _close_ts(x), reverse=True) return items def build_calendar_month(trades: list[dict], reviews: list[dict], year: int, month: int) -> dict: review_index = _index_reviews_by_day_sym(reviews) day_map: dict[str, dict] = {} matched_review_ids: dict[str, set[int]] = {} for trade in trades: dt = _parse_dt(trade.get("close_time") or "") if not dt or dt.year != year or dt.month != month: continue day = dt.date().isoformat() bucket = day_map.setdefault( day, { "date": day, "count": 0, "total_net": 0.0, "review_count": 0, "emotion_count": 0, "has_emotion": False, }, ) bucket["count"] += 1 used = matched_review_ids.setdefault(day, set()) review = _find_review_for_trade(trade, review_index, used) if review: rid = int(review["id"]) used.add(rid) bucket["total_net"] = round(bucket["total_net"] + _net_pnl(review), 2) bucket["review_count"] += 1 if review.get("is_emotion"): bucket["emotion_count"] += 1 bucket["has_emotion"] = True else: bucket["total_net"] = round(bucket["total_net"] + _net_pnl(trade), 2) for review in reviews: if not review.get("is_emotion"): continue day = _close_day_key(review) if not day: continue try: dt = date.fromisoformat(day) except ValueError: continue if dt.year != year or dt.month != month: continue bucket = day_map.setdefault( day, { "date": day, "count": 0, "total_net": 0.0, "review_count": 0, "emotion_count": 0, "has_emotion": False, }, ) bucket["has_emotion"] = True rid = int(review.get("id") or 0) if rid and rid not in matched_review_ids.get(day, set()): bucket["emotion_count"] += 1 _, last_day = calendar.monthrange(year, month) days = [] for d in range(1, last_day + 1): iso = date(year, month, d).isoformat() if iso in day_map: row = day_map[iso] row["total_net"] = round(row["total_net"], 2) days.append(row) else: days.append( { "date": iso, "count": 0, "total_net": 0.0, "review_count": 0, "emotion_count": 0, "has_emotion": False, } ) return { "year": year, "month": month, "days": days, "weekday_start": date(year, month, 1).weekday(), } def get_calendar_month(conn, year: int, month: int) -> dict: trades = fetch_trade_rows(conn) reviews = fetch_review_rows(conn) return build_calendar_month(trades, reviews, year, month) def get_calendar_day(conn, day: str) -> dict: trades = fetch_trade_rows(conn) reviews = fetch_review_rows(conn) items = build_day_detail(trades, reviews, day) total_net = round(sum(float(i.get("pnl_net") or 0) for i in items), 2) emotion_count = sum(1 for i in items if i.get("is_emotion")) return { "date": day, "count": len(items), "total_net": total_net, "emotion_count": emotion_count, "items": items, }