"""各实例当日平仓记录查询(供 hub_bridge /api/hub/trades/today 与中控 AI 聚合)。""" from __future__ import annotations from datetime import datetime, timedelta from typing import Any, Callable, Optional TRADE_COMPLETED_RESULTS = ( "止盈", "止损", "保本止盈", "移动止盈", "手动平仓", "强制清仓", "外部平仓", ) def trading_day_from_dt(dt: datetime, reset_hour: int = 8) -> str: """与实例 get_trading_day 一致:小时 < reset_hour 归属上一日历日。""" if dt.hour < reset_hour: dt = dt - timedelta(days=1) return dt.strftime("%Y-%m-%d") def current_trading_day(*, now: datetime | None = None, reset_hour: int = 8) -> str: return trading_day_from_dt(now or datetime.now(), reset_hour) def parse_dt_for_trading_day(raw: Any) -> datetime | None: if raw is None: return None s = str(raw).strip().replace("Z", "").replace("T", " ") if not s: return None for fmt, ln in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d %H:%M", 16), ("%Y-%m-%d", 10)): try: return datetime.strptime(s[:ln], fmt) except ValueError: continue return None def trading_day_window_bounds(trading_day: str, reset_hour: int = 8) -> tuple[str, str]: """交易日 [reset_hour, 次日 reset_hour) 对应的北京时间字符串区间(闭区间)。""" day = datetime.strptime((trading_day or "").strip()[:10], "%Y-%m-%d") start = day.replace(hour=reset_hour, minute=0, second=0, microsecond=0) end = start + timedelta(days=1) - timedelta(seconds=1) return start.strftime("%Y-%m-%d %H:%M:%S"), end.strftime("%Y-%m-%d %H:%M:%S") def _row_dict(row, row_to_dict: Optional[Callable] = None) -> dict: if row is None: return {} if row_to_dict: try: return dict(row_to_dict(row)) except Exception: pass try: keys = row.keys() if hasattr(row, "keys") else () if keys: return {k: row[k] for k in keys} except Exception: pass try: return dict(row) except Exception: return {} def _effective_field(d: dict, reviewed_key: str, base_key: str, default: Any = None) -> Any: rv = d.get(reviewed_key) if rv is not None and str(rv).strip() != "": return rv bv = d.get(base_key) if bv is not None and str(bv).strip() != "": return bv return default def _effective_pnl(d: dict) -> float: reviewed = d.get("reviewed_pnl_amount") if reviewed is not None and str(reviewed).strip() != "": try: return float(reviewed) except (TypeError, ValueError): pass ex = d.get("exchange_realized_pnl") if ex is not None and str(ex).strip() != "": try: return float(ex) except (TypeError, ValueError): pass try: return float(d.get("pnl_amount") or 0) except (TypeError, ValueError): return 0.0 def _trade_close_dt(d: dict) -> datetime | None: raw = _effective_field(d, "reviewed_closed_at", "closed_at") if raw is None or str(raw).strip() == "": raw = d.get("created_at") or d.get("opened_at") return parse_dt_for_trading_day(raw) def _normalize_trade_row( d: dict, *, trading_day: str, reset_hour: int, ) -> dict[str, Any] | None: effective_result = str(_effective_field(d, "reviewed_result", "result") or "").strip() if effective_result not in TRADE_COMPLETED_RESULTS: return None close_dt = _trade_close_dt(d) if not close_dt: return None if trading_day_from_dt(close_dt, reset_hour) != trading_day: return None pnl = _effective_pnl(d) closed_at = _effective_field(d, "reviewed_closed_at", "closed_at") opened_at = _effective_field(d, "reviewed_opened_at", "opened_at") return { "symbol": d.get("symbol"), "direction": d.get("direction"), "result": effective_result, "pnl_amount": round(pnl, 4), "closed_at": closed_at, "opened_at": opened_at, "monitor_type": d.get("monitor_type"), "actual_rr": d.get("actual_rr"), "planned_rr": d.get("planned_rr"), "trade_style": d.get("trade_style"), "entry_reason": d.get("entry_reason"), "reviewed": bool(d.get("reviewed_at") or d.get("reviewed_result")), } def fetch_trades_for_trading_day( conn, trading_day: str, *, row_to_dict_fn: Optional[Callable] = None, reset_hour: int = 8, limit: int = 200, ) -> list[dict[str, Any]]: """返回指定交易日的已平仓记录(与 /records 交易记录一致,复盘字段优先)。""" day = (trading_day or "").strip()[:10] if not day: return [] lim = max(1, min(int(limit or 200), 500)) start_bj, end_bj = trading_day_window_bounds(day, reset_hour) ts_expr = "REPLACE(COALESCE(reviewed_closed_at, closed_at, created_at, opened_at), 'T', ' ')" rows = conn.execute( f""" SELECT symbol, direction, result, reviewed_result, pnl_amount, reviewed_pnl_amount, exchange_realized_pnl, closed_at, reviewed_closed_at, opened_at, reviewed_opened_at, created_at, monitor_type, actual_rr, planned_rr, trade_style, entry_reason, reviewed_at FROM trade_records WHERE {ts_expr} >= ? AND {ts_expr} <= ? ORDER BY {ts_expr} ASC LIMIT ? """, (start_bj, end_bj, lim * 3), ).fetchall() out: list[dict[str, Any]] = [] for row in rows: d = _row_dict(row, row_to_dict_fn) norm = _normalize_trade_row(d, trading_day=day, reset_hour=reset_hour) if norm: out.append(norm) if len(out) >= lim: break return out def summarize_trades(trades: list[dict]) -> dict[str, Any]: """单笔列表 → 笔数 / 盈亏 / 胜败统计。""" total_pnl = 0.0 win = loss = flat = 0 for t in trades or []: try: pnl = float(t.get("pnl_amount") or 0) except (TypeError, ValueError): pnl = 0.0 total_pnl += pnl if pnl > 1e-9: win += 1 elif pnl < -1e-9: loss += 1 else: flat += 1 return { "closed_count": len(trades or []), "win_count": win, "loss_count": loss, "flat_count": flat, "total_pnl_u": round(total_pnl, 4), }