"""Per-instance closed-trade record queries.

Used by hub_bridge ``/api/hub/trades/today`` and by the central-control AI
aggregation. All helpers take a DB-API style connection whose ``execute()``
returns a cursor with ``fetchall()`` (SQLite dialect: ``PRAGMA table_info``,
``REPLACE``, ``NULLIF``, ``TRIM``).
"""
from __future__ import annotations

from datetime import datetime, timedelta
from typing import Any, Callable, Optional

# Result labels that mark a trade as closed. These are runtime values compared
# against DB content, so they must stay byte-identical.
TRADE_COMPLETED_RESULTS = (
    "止盈",
    "止损",
    "保本止盈",
    "移动止盈",
    "手动平仓",
    "强制清仓",
    "外部平仓",
)

_DT_FMT = "%Y-%m-%d %H:%M:%S"

# (format, number of leading characters to parse), tried in this order.
_DT_PARSE_FORMATS = (
    (_DT_FMT, 19),
    ("%Y-%m-%d %H:%M", 16),
    ("%Y-%m-%d", 10),
)

# Timestamp columns consulted, in priority order, to decide when a trade closed.
_TRADE_TS_COLUMNS = ("reviewed_closed_at", "closed_at", "created_at", "opened_at")
_SNAPSHOT_TS_COLUMNS = ("closed_at", "opened_at", "created_at")


def trading_day_from_dt(dt: datetime, reset_hour: int = 8) -> str:
    """Return the trading day (``YYYY-MM-DD``) that *dt* belongs to.

    Mirrors the instance-side ``get_trading_day``: a time whose hour is below
    *reset_hour* is attributed to the previous calendar day.
    """
    if dt.hour < reset_hour:
        dt = dt - timedelta(days=1)
    return dt.strftime("%Y-%m-%d")


def current_trading_day(*, now: datetime | None = None, reset_hour: int = 8) -> str:
    """Return the trading day for *now* (defaults to ``datetime.now()``)."""
    return trading_day_from_dt(now or datetime.now(), reset_hour)


def parse_dt_for_trading_day(raw: Any) -> datetime | None:
    """Parse a loosely formatted timestamp into a naive ``datetime``.

    ``raw`` is stringified, ``Z`` is dropped and ``T`` becomes a space; the
    value is then truncated to the length of each candidate format, so
    fractional seconds or a trailing offset are ignored rather than rejected.
    Returns ``None`` when *raw* is ``None``, blank or unparseable.
    """
    if raw is None:
        return None
    text = str(raw).strip().replace("Z", "").replace("T", " ")
    if not text:
        return None
    for fmt, length in _DT_PARSE_FORMATS:
        try:
            return datetime.strptime(text[:length], fmt)
        except ValueError:
            continue
    return None


def trading_day_window_bounds(trading_day: str, reset_hour: int = 8) -> tuple[str, str]:
    """Return the inclusive ``(start, end)`` strings covering one trading day.

    The window is ``[reset_hour, next day reset_hour)`` expressed as a closed
    interval at one-second resolution, i.e. ``end`` is one second before the
    next reset. The original author documents these as Beijing-time strings.

    Raises:
        ValueError: if *trading_day* does not start with a ``YYYY-MM-DD`` date.
    """
    day = datetime.strptime((trading_day or "").strip()[:10], "%Y-%m-%d")
    start = day.replace(hour=reset_hour, minute=0, second=0, microsecond=0)
    end = start + timedelta(days=1) - timedelta(seconds=1)
    return start.strftime(_DT_FMT), end.strftime(_DT_FMT)


def _ts_sql_expr(columns: list[str]) -> str:
    """Build the SQL expression for a row's effective timestamp.

    Each column is wrapped in ``NULLIF(TRIM(col), '')`` so blank strings fall
    through to the next column, matching how ``_effective_field`` treats blank
    values on the Python side. ``COALESCE`` is only emitted for two or more
    columns because SQLite rejects it with a single argument. ISO ``T``
    separators are normalised to a space so string comparison works.
    """
    parts = [f"NULLIF(TRIM({col}), '')" for col in columns]
    if not parts:
        return "''"
    inner = parts[0] if len(parts) == 1 else f"COALESCE({', '.join(parts)})"
    return f"REPLACE({inner}, 'T', ' ')"


def _row_dict(row, row_to_dict: Optional[Callable] = None) -> dict:
    """Convert a DB row to a plain dict on a best-effort basis.

    Tries, in order: the caller-supplied *row_to_dict*, the row's own
    ``keys()`` (e.g. ``sqlite3.Row`` or a dict), then ``dict(row)``. Failures
    are swallowed deliberately; an unconvertible row yields ``{}``.
    """
    if row is None:
        return {}
    if row_to_dict:
        try:
            return dict(row_to_dict(row))
        except Exception:
            pass
    try:
        keys = row.keys() if hasattr(row, "keys") else ()
        if keys:
            return {k: row[k] for k in keys}
    except Exception:
        pass
    try:
        return dict(row)
    except Exception:
        return {}


def _effective_field(d: dict, reviewed_key: str, base_key: str, default: Any = None) -> Any:
    """Return the reviewed value if non-blank, else the base value, else *default*."""
    for key in (reviewed_key, base_key):
        value = d.get(key)
        if value is not None and str(value).strip() != "":
            return value
    return default


def _effective_pnl(d: dict) -> float:
    """Return the PnL to report for a trade row.

    Priority: ``reviewed_pnl_amount``, then ``exchange_realized_pnl``, then
    ``pnl_amount``. Blank or non-numeric values are skipped; the final
    fallback is ``0.0``.
    """
    for key in ("reviewed_pnl_amount", "exchange_realized_pnl"):
        value = d.get(key)
        if value is not None and str(value).strip() != "":
            try:
                return float(value)
            except (TypeError, ValueError):
                pass
    try:
        return float(d.get("pnl_amount") or 0)
    except (TypeError, ValueError):
        return 0.0


def _trade_close_dt(d: dict) -> datetime | None:
    """Return the parsed close time of a trade row, or ``None``.

    Uses the reviewed/base ``closed_at``; if both are blank, falls back to
    ``created_at`` and then ``opened_at``.
    """
    raw = _effective_field(d, "reviewed_closed_at", "closed_at")
    if raw is None or str(raw).strip() == "":
        raw = d.get("created_at") or d.get("opened_at")
    return parse_dt_for_trading_day(raw)


def _normalize_trade_row(
    d: dict,
    *,
    trading_day: str,
    reset_hour: int,
) -> dict[str, Any] | None:
    """Normalise one ``trade_records`` row for the per-day listing.

    Returns ``None`` when the effective result is not a completed result, the
    close time cannot be parsed, or the close time belongs to another trading
    day.
    """
    effective_result = str(_effective_field(d, "reviewed_result", "result") or "").strip()
    if effective_result not in TRADE_COMPLETED_RESULTS:
        return None
    close_dt = _trade_close_dt(d)
    if not close_dt:
        return None
    if trading_day_from_dt(close_dt, reset_hour) != trading_day:
        return None
    pnl = _effective_pnl(d)
    return {
        "symbol": d.get("symbol"),
        "direction": d.get("direction"),
        "result": effective_result,
        "pnl_amount": round(pnl, 4),
        "closed_at": _effective_field(d, "reviewed_closed_at", "closed_at"),
        "opened_at": _effective_field(d, "reviewed_opened_at", "opened_at"),
        "monitor_type": d.get("monitor_type"),
        "actual_rr": d.get("actual_rr"),
        "planned_rr": d.get("planned_rr"),
        "trade_style": d.get("trade_style"),
        "entry_reason": d.get("entry_reason"),
        "reviewed": bool(d.get("reviewed_at") or d.get("reviewed_result")),
    }


def fetch_trades_for_trading_day(
    conn,
    trading_day: str,
    *,
    row_to_dict_fn: Optional[Callable] = None,
    reset_hour: int = 8,
    limit: int = 200,
) -> list[dict[str, Any]]:
    """Return the closed trades of one trading day, oldest first.

    Matches the ``/records`` trade listing: reviewed fields take precedence
    over the raw ones. *limit* is clamped to ``[1, 500]``. An empty
    *trading_day* yields ``[]``.

    Raises:
        ValueError: if *trading_day* is non-empty but not ``YYYY-MM-DD``.
    """
    day = (trading_day or "").strip()[:10]
    if not day:
        return []
    lim = max(1, min(int(limit or 200), 500))
    start_bj, end_bj = trading_day_window_bounds(day, reset_hour)
    # Compare against an exclusive upper bound (the next reset instant) so a
    # close with fractional seconds in the last second of the day is kept;
    # `<= '...:59:59'` would sort '...:59:59.500' after the bound.
    end_exclusive = (
        datetime.strptime(end_bj, _DT_FMT) + timedelta(seconds=1)
    ).strftime(_DT_FMT)
    ts_expr = _ts_sql_expr(list(_TRADE_TS_COLUMNS))
    # Over-fetch (3x) because rows are filtered again in Python below.
    rows = conn.execute(
        f"""
        SELECT symbol, direction, result, reviewed_result,
               pnl_amount, reviewed_pnl_amount, exchange_realized_pnl,
               closed_at, reviewed_closed_at, opened_at, reviewed_opened_at, created_at,
               monitor_type, actual_rr, planned_rr, trade_style, entry_reason, reviewed_at
        FROM trade_records
        WHERE {ts_expr} >= ? AND {ts_expr} < ?
        ORDER BY {ts_expr} ASC
        LIMIT ?
        """,
        (start_bj, end_exclusive, lim * 3),
    ).fetchall()
    out: list[dict[str, Any]] = []
    for row in rows:
        d = _row_dict(row, row_to_dict_fn)
        norm = _normalize_trade_row(d, trading_day=day, reset_hour=reset_hour)
        if norm:
            out.append(norm)
        if len(out) >= lim:
            break
    return out


def _normalize_archive_trade_row(
    d: dict,
    *,
    exchange_key: str = "",
    reset_hour: int = 8,
) -> dict[str, Any] | None:
    """Normalise one ``trade_records`` row for the full-history archive.

    Unlike ``_normalize_trade_row`` there is no trading-day filter. Returns
    ``None`` when the trade is not completed, has no parseable close time, or
    has no integer ``id``.
    """
    effective_result = str(_effective_field(d, "reviewed_result", "result") or "").strip()
    if effective_result not in TRADE_COMPLETED_RESULTS:
        return None
    close_dt = _trade_close_dt(d)
    if not close_dt:
        return None
    pnl = _effective_pnl(d)
    closed_at = _effective_field(d, "reviewed_closed_at", "closed_at")
    opened_at = _effective_field(d, "reviewed_opened_at", "opened_at")
    opened_ms = d.get("opened_at_ms")
    closed_ms = d.get("closed_at_ms")
    # NOTE(review): the fallbacks below call .timestamp() on naive datetimes,
    # which Python interprets in the process-local timezone - confirm the
    # service runs in the timezone the stored strings are written in.
    if opened_ms in (None, ""):
        odt = parse_dt_for_trading_day(opened_at)
        opened_ms = int(odt.timestamp() * 1000) if odt else None
    if closed_ms in (None, ""):
        closed_ms = int(close_dt.timestamp() * 1000)
    try:
        trade_id = int(d.get("id"))
    except (TypeError, ValueError):
        return None
    return {
        "id": trade_id,
        "exchange_key": (exchange_key or "").strip().lower(),
        "symbol": (d.get("symbol") or "").strip().upper(),
        "direction": d.get("direction"),
        "result": effective_result,
        "pnl_amount": round(pnl, 4),
        "closed_at": closed_at,
        "opened_at": opened_at,
        "opened_at_ms": int(opened_ms) if opened_ms else None,
        "closed_at_ms": int(closed_ms) if closed_ms else None,
        "monitor_type": d.get("monitor_type"),
        "actual_rr": d.get("actual_rr"),
        "planned_rr": d.get("planned_rr"),
        "trade_style": d.get("trade_style"),
        "entry_reason": d.get("entry_reason"),
        "trigger_price": d.get("trigger_price"),
        "stop_loss": _effective_field(d, "reviewed_stop_loss", "stop_loss"),
        "take_profit": _effective_field(d, "reviewed_take_profit", "take_profit"),
        "reviewed": bool(d.get("reviewed_at") or d.get("reviewed_result")),
        # Carried through so fetch_trades_for_archive can skip snapshots whose
        # plan already has a trade record (snapshot rows expose the same key).
        "trend_plan_id": d.get("trend_plan_id"),
        "trading_day": trading_day_from_dt(close_dt, reset_hour),
    }


# Maps a snapshot's ``status_at_close`` to a completed-result label.
_SNAPSHOT_STATUS_TO_RESULT = {
    "stopped_sl": "止损",
    "stopped_tp": "止盈",
    "stopped_manual": "手动平仓",
    "stopped_external": "外部平仓",
}


def _table_columns(conn, table: str) -> set[str]:
    """Return the column names of *table*, or an empty set if unavailable.

    *table* is interpolated into the ``PRAGMA`` statement, so it must be a
    trusted identifier (callers in this module pass literals only).
    """
    try:
        rows = conn.execute(f"PRAGMA table_info({table})").fetchall()
    except Exception:
        return set()
    out: set[str] = set()
    for r in rows:
        try:
            out.add(str(r[1]))
        except (IndexError, KeyError, TypeError):
            # Mapping-style rows (e.g. a dict row_factory) raise KeyError on
            # positional access; fall back to the column name.
            try:
                out.add(str(r["name"]))
            except Exception:
                continue
    return out


def _archive_ts_expr(cols: set[str]) -> str:
    """Return the effective-timestamp SQL for ``trade_records`` given its columns."""
    return _ts_sql_expr([c for c in _TRADE_TS_COLUMNS if c in cols])


def _archive_trade_select_sql(cols: set[str]) -> str:
    """Return the comma-separated SELECT list of archive columns present in *cols*.

    ``id`` is always selected, even if it is not reported in *cols*.
    """
    wanted = [
        "id", "symbol", "direction", "result", "reviewed_result",
        "pnl_amount", "reviewed_pnl_amount", "exchange_realized_pnl",
        "closed_at", "reviewed_closed_at", "opened_at", "reviewed_opened_at",
        "opened_at_ms", "closed_at_ms", "created_at",
        "monitor_type", "actual_rr", "planned_rr", "trade_style", "entry_reason",
        "trigger_price", "stop_loss", "take_profit",
        "reviewed_stop_loss", "reviewed_take_profit", "reviewed_at", "trend_plan_id",
    ]
    select_cols = [c for c in wanted if c in cols]
    if "id" not in select_cols:
        select_cols = ["id"] + select_cols
    return ", ".join(select_cols)


def _existing_trend_plan_ids(conn) -> set[int]:
    """Return every non-null ``trend_plan_id`` present in ``trade_records``."""
    cols = _table_columns(conn, "trade_records")
    if "trend_plan_id" not in cols:
        return set()
    rows = conn.execute(
        "SELECT DISTINCT trend_plan_id FROM trade_records WHERE trend_plan_id IS NOT NULL"
    ).fetchall()
    out: set[int] = set()
    for row in rows:
        d = _row_dict(row)
        try:
            out.add(int(d.get("trend_plan_id")))
        except (TypeError, ValueError):
            continue
    return out


def _normalize_snapshot_archive_row(
    snap: dict,
    *,
    reset_hour: int = 8,
) -> dict[str, Any] | None:
    """Normalise one ``strategy_trade_snapshots`` row for the archive.

    The result label comes from ``result_label`` or, if blank, from
    ``status_at_close`` via ``_SNAPSHOT_STATUS_TO_RESULT``. Returns ``None``
    when the result is not a completed result, ``closed_at`` is unparseable,
    or ``id`` is not an integer. The emitted ``id`` is the negated snapshot id
    so it cannot collide with ``trade_records`` ids.
    """
    result = str(snap.get("result_label") or "").strip()
    if not result:
        result = _SNAPSHOT_STATUS_TO_RESULT.get(
            str(snap.get("status_at_close") or "").strip(), ""
        )
    if result not in TRADE_COMPLETED_RESULTS:
        return None
    closed_at = snap.get("closed_at")
    close_dt = parse_dt_for_trading_day(closed_at)
    if not close_dt:
        return None
    opened_at = snap.get("opened_at")
    opened_ms = _parse_ms_from_row(opened_at)
    closed_ms = _parse_ms_from_row(closed_at)
    try:
        snap_id = int(snap.get("id"))
    except (TypeError, ValueError):
        return None
    try:
        pnl = float(snap.get("pnl_amount") or 0)
    except (TypeError, ValueError):
        pnl = 0.0
    # The strategy type is used verbatim as both monitor type and entry reason.
    monitor_type = str(snap.get("strategy_type") or "").strip()
    return {
        "id": -snap_id,
        "symbol": (snap.get("symbol") or "").strip().upper(),
        "direction": snap.get("direction"),
        "result": result,
        "pnl_amount": round(pnl, 4),
        "closed_at": closed_at,
        "opened_at": opened_at,
        "opened_at_ms": opened_ms,
        "closed_at_ms": closed_ms,
        "monitor_type": monitor_type,
        "entry_reason": monitor_type,
        "from_snapshot": True,
        "snapshot_id": snap_id,
        "trend_plan_id": snap.get("source_id"),
        "trading_day": trading_day_from_dt(close_dt, reset_hour),
    }


def _parse_ms_from_row(raw: Any) -> int | None:
    """Convert a raw timestamp value to epoch milliseconds, or ``None``.

    Numbers above 1e12 are taken as milliseconds, smaller ones as seconds.
    Anything else is parsed as a datetime string (local-time semantics of
    ``datetime.timestamp()`` apply to naive values).
    """
    if raw in (None, ""):
        return None
    try:
        if isinstance(raw, (int, float)):
            v = int(raw)
            return v if v > 1_000_000_000_000 else v * 1000
    except (TypeError, ValueError, OverflowError):
        # NaN / infinity cannot be converted; fall through to string parsing.
        pass
    dt = parse_dt_for_trading_day(raw)
    return int(dt.timestamp() * 1000) if dt else None


def _fetch_strategy_snapshots_for_archive(
    conn,
    *,
    days: int = 365,
    reset_hour: int = 8,
    limit: int = 2000,
    skip_plan_ids: set[int] | None = None,
) -> list[dict[str, Any]]:
    """Return normalised strategy snapshots from the last *days* days, newest first.

    Snapshots whose positive ``source_id`` is in *skip_plan_ids* are omitted.
    *limit* is clamped to ``[1, 5000]`` (a falsy limit means 2000) and *days*
    to ``[1, 3650]``. Returns ``[]`` if the snapshots table does not exist.
    """
    cols = _table_columns(conn, "strategy_trade_snapshots")
    if not cols:
        return []
    lim = max(1, min(int(limit or 2000), 5000))
    day_span = max(1, min(int(days or 365), 3650))
    cutoff_s = (datetime.now() - timedelta(days=day_span)).strftime(_DT_FMT)
    ts_expr = _ts_sql_expr([c for c in _SNAPSHOT_TS_COLUMNS if c in cols])
    # Over-fetch (2x) because some rows are skipped or rejected below.
    rows = conn.execute(
        f"""
        SELECT * FROM strategy_trade_snapshots
        WHERE {ts_expr} >= ?
        ORDER BY {ts_expr} DESC
        LIMIT ?
        """,
        (cutoff_s, lim * 2),
    ).fetchall()
    skip = skip_plan_ids or set()
    out: list[dict[str, Any]] = []
    for row in rows:
        d = _row_dict(row)
        try:
            source_id = int(d.get("source_id") or 0)
        except (TypeError, ValueError):
            source_id = 0
        if source_id > 0 and source_id in skip:
            continue
        norm = _normalize_snapshot_archive_row(d, reset_hour=reset_hour)
        if norm:
            out.append(norm)
        if len(out) >= lim:
            break
    return out


def fetch_trades_for_archive(
    conn,
    *,
    days: int = 365,
    row_to_dict_fn: Optional[Callable] = None,
    reset_hour: int = 8,
    limit: int = 2000,
    include_strategy_snapshots: bool = True,
) -> list[dict[str, Any]]:
    """Return closed trades from the last *days* days, newest first.

    Combines ``trade_records`` with strategy snapshots that have no matching
    trade record (matched on ``trend_plan_id`` / snapshot ``source_id``).
    *limit* is clamped to ``[1, 5000]`` and *days* to ``[1, 3650]``. With
    ``include_strategy_snapshots=False`` only ``trade_records`` rows are
    returned, in SQL timestamp order.
    """
    lim = max(1, min(int(limit or 2000), 5000))
    day_span = max(1, min(int(days or 365), 3650))
    cutoff_s = (datetime.now() - timedelta(days=day_span)).strftime(_DT_FMT)

    records: list[dict[str, Any]] = []
    cols = _table_columns(conn, "trade_records")
    if cols:
        ts_expr = _archive_ts_expr(cols)
        sql = f"""
            SELECT {_archive_trade_select_sql(cols)}
            FROM trade_records
            WHERE {ts_expr} >= ?
            ORDER BY {ts_expr} DESC
            LIMIT ?
        """
        rows = conn.execute(sql, (cutoff_s, lim * 2)).fetchall()
        for row in rows:
            d = _row_dict(row, row_to_dict_fn)
            norm = _normalize_archive_trade_row(d, reset_hour=reset_hour)
            if norm:
                records.append(norm)
            if len(records) >= lim:
                break

    if not include_strategy_snapshots:
        return records

    skip_ids = _existing_trend_plan_ids(conn)
    for rec in records:
        try:
            pid = int(rec.get("trend_plan_id") or 0)
        except (TypeError, ValueError):
            pid = 0
        if pid > 0:
            skip_ids.add(pid)

    # Ask for up to `lim` snapshots: the newest `lim` of the union can only be
    # found if each source contributes its own newest `lim`. Passing the
    # remaining capacity would under-fetch, and a remainder of 0 would silently
    # become the callee's default of 2000.
    snaps = _fetch_strategy_snapshots_for_archive(
        conn,
        days=days,
        reset_hour=reset_hour,
        limit=lim,
        skip_plan_ids=skip_ids,
    )
    merged = records + snaps
    merged.sort(
        key=lambda x: int(x.get("closed_at_ms") or 0),
        reverse=True,
    )
    return merged[:lim]


def summarize_trades(trades: list[dict]) -> dict[str, Any]:
    """Summarise a trade list into count, PnL and win/loss/flat statistics.

    A trade counts as a win or loss when its ``pnl_amount`` is beyond
    +/-1e-9; missing or non-numeric amounts count as flat (0.0).
    """
    total_pnl = 0.0
    win = loss = flat = 0
    for t in trades or []:
        try:
            pnl = float(t.get("pnl_amount") or 0)
        except (TypeError, ValueError):
            pnl = 0.0
        total_pnl += pnl
        if pnl > 1e-9:
            win += 1
        elif pnl < -1e-9:
            loss += 1
        else:
            flat += 1
    return {
        "closed_count": len(trades or []),
        "win_count": win,
        "loss_count": loss,
        "flat_count": flat,
        "total_pnl_u": round(total_pnl, 4),
    }