"""Per-instance lookup of the closed trades belonging to one trading day.

Consumed by hub_bridge ``/api/hub/trades/today`` and by the central-control
AI aggregation.
"""

from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any, Callable, Optional

# Columns read from ``trade_records``. The same order drives the SELECT list,
# the positional fallback for tuple rows, and the key order of every record
# returned by ``fetch_trades_for_trading_day``.
_TRADE_COLUMNS: tuple[str, ...] = (
    "symbol",
    "exchange_symbol",
    "direction",
    "result",
    "pnl_amount",
    "closed_at",
    "opened_at",
    "session_date",
    "monitor_type",
    "actual_rr",
    "planned_rr",
    "trade_style",
    "entry_reason",
)

# Only constants are concatenated here; all runtime values are bound through
# the three ``?`` placeholders (day, day, limit).
_TRADES_FOR_DAY_SQL = (
    "SELECT " + ", ".join(_TRADE_COLUMNS) + """
    FROM trade_records
    WHERE (
        (session_date IS NOT NULL AND TRIM(session_date) != '' AND session_date = ?)
        OR (
            (session_date IS NULL OR TRIM(session_date) = '')
            AND closed_at IS NOT NULL AND TRIM(closed_at) != ''
            AND substr(closed_at, 1, 10) = ?
        )
    )
    AND result IS NOT NULL AND TRIM(result) != ''
    ORDER BY COALESCE(closed_at, opened_at) ASC
    LIMIT ?
    """
)


def trading_day_from_dt(dt: datetime, reset_hour: int = 8) -> str:
    """Return the trading day (``YYYY-MM-DD``) that *dt* belongs to.

    Timestamps whose hour is below *reset_hour* are attributed to the previous
    calendar day. The original note states this mirrors the instance-side
    ``get_trading_day`` rule.
    """
    if dt.hour < reset_hour:
        dt = dt - timedelta(days=1)
    return dt.strftime("%Y-%m-%d")


def current_trading_day(*, now: datetime | None = None, reset_hour: int = 8) -> str:
    """Return the trading day for *now*, defaulting to ``datetime.now()``."""
    return trading_day_from_dt(now or datetime.now(), reset_hour)


def _mapping_from_row(row, row_to_dict: Optional[Callable] = None) -> dict:
    """Best-effort conversion of a mapping-like row; ``{}`` when nothing works."""
    if row_to_dict:
        try:
            return dict(row_to_dict(row))
        except Exception:
            # Deliberate best effort: fall through to the generic conversions.
            pass
    try:
        keys = row.keys() if hasattr(row, "keys") else ()
        if keys:
            return {k: row[k] for k in keys}
    except Exception:
        pass
    try:
        return dict(row)
    except Exception:
        return {}


def _row_dict(
    row,
    row_to_dict: Optional[Callable] = None,
    columns: tuple[str, ...] | None = None,
) -> dict:
    """Convert a database row to a plain dict without ever raising.

    Conversion order: the caller-supplied *row_to_dict*, then the row's own
    ``keys()``, then ``dict(row)``. If all of those yield nothing and *columns*
    is given, a tuple/list row of matching length is mapped positionally, so
    rows from a connection without a mapping row factory are not silently
    reduced to an empty dict.
    """
    if row is None:
        return {}
    data = _mapping_from_row(row, row_to_dict)
    if (
        not data
        and columns
        and isinstance(row, (tuple, list))
        and len(row) == len(columns)
    ):
        return dict(zip(columns, row))
    return data


def _pnl_value(raw: Any) -> float:
    """Coerce a stored PnL value to float; missing or unparsable values become 0.0."""
    try:
        return float(raw or 0)
    except (TypeError, ValueError):
        return 0.0


def fetch_trades_for_trading_day(
    conn,
    trading_day: str,
    *,
    row_to_dict_fn: Optional[Callable] = None,
    limit: int = 200,
) -> list[dict[str, Any]]:
    """Return the closed trades recorded for *trading_day*.

    A row matches on ``session_date`` when that column is non-blank; otherwise
    on the first ten characters of ``closed_at``. Rows with a blank ``result``
    are excluded. Results are ordered by ``COALESCE(closed_at, opened_at)``
    ascending.

    Args:
        conn: Object exposing ``execute(sql, params)`` whose result supports
            ``fetchall()``. Rows may be mapping-like or plain tuples.
        trading_day: Day string; only the first ten characters after
            stripping are used. A blank value returns ``[]``.
        row_to_dict_fn: Optional converter tried first for every row.
        limit: Maximum number of rows, clamped to the range 1..500.

    Returns:
        One dict per trade, keyed by the selected columns, with
        ``pnl_amount`` coerced to a float rounded to four decimals.
    """
    day = (trading_day or "").strip()[:10]
    if not day:
        return []
    lim = max(1, min(int(limit or 200), 500))
    rows = conn.execute(_TRADES_FOR_DAY_SQL, (day, day, lim)).fetchall()

    trades: list[dict[str, Any]] = []
    for row in rows:
        data = _row_dict(row, row_to_dict_fn, _TRADE_COLUMNS)
        record: dict[str, Any] = {col: data.get(col) for col in _TRADE_COLUMNS}
        # Re-assigning an existing key keeps its position in the dict.
        record["pnl_amount"] = round(_pnl_value(data.get("pnl_amount")), 4)
        trades.append(record)
    return trades


def summarize_trades(trades: list[dict]) -> dict[str, Any]:
    """Aggregate a trade list into count, win/loss/flat tallies and total PnL.

    A trade counts as a win when its PnL exceeds ``1e-9``, as a loss when it is
    below ``-1e-9``, and as flat otherwise. ``None`` or an empty list yields
    all-zero statistics.
    """
    items = trades or []
    total_pnl = 0.0
    win = loss = flat = 0
    for trade in items:
        pnl = _pnl_value(trade.get("pnl_amount"))
        total_pnl += pnl
        if pnl > 1e-9:
            win += 1
        elif pnl < -1e-9:
            loss += 1
        else:
            flat += 1
    return {
        "closed_count": len(items),
        "win_count": win,
        "loss_count": loss,
        "flat_count": flat,
        "total_pnl_u": round(total_pnl, 4),
    }