"""各实例当日平仓记录查询(供 hub_bridge /api/hub/trades/today 与中控 AI 聚合)。""" from __future__ import annotations from datetime import datetime, timedelta from typing import Any, Callable, Optional from strategy_trade_labels import ( MONITOR_TYPE_ROLL, MONITOR_TYPE_TREND_PULLBACK, entry_reason_for_monitor_type, ) TRADE_COMPLETED_RESULTS = ( "止盈", "止损", "保本止盈", "移动止盈", "手动平仓", "强制清仓", "外部平仓", ) def trading_day_from_dt(dt: datetime, reset_hour: int = 8) -> str: """与实例 get_trading_day 一致:小时 < reset_hour 归属上一日历日。""" if dt.hour < reset_hour: dt = dt - timedelta(days=1) return dt.strftime("%Y-%m-%d") def current_trading_day(*, now: datetime | None = None, reset_hour: int = 8) -> str: return trading_day_from_dt(now or datetime.now(), reset_hour) def parse_dt_for_trading_day(raw: Any) -> datetime | None: if raw is None: return None s = str(raw).strip().replace("Z", "").replace("T", " ") if not s: return None for fmt, ln in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d %H:%M", 16), ("%Y-%m-%d", 10)): try: return datetime.strptime(s[:ln], fmt) except ValueError: continue return None def trading_day_window_bounds(trading_day: str, reset_hour: int = 8) -> tuple[str, str]: """交易日 [reset_hour, 次日 reset_hour) 对应的北京时间字符串区间(闭区间)。""" day = datetime.strptime((trading_day or "").strip()[:10], "%Y-%m-%d") start = day.replace(hour=reset_hour, minute=0, second=0, microsecond=0) end = start + timedelta(days=1) - timedelta(seconds=1) return start.strftime("%Y-%m-%d %H:%M:%S"), end.strftime("%Y-%m-%d %H:%M:%S") def _row_dict(row, row_to_dict: Optional[Callable] = None) -> dict: if row is None: return {} if row_to_dict: try: return dict(row_to_dict(row)) except Exception: pass try: keys = row.keys() if hasattr(row, "keys") else () if keys: return {k: row[k] for k in keys} except Exception: pass try: return dict(row) except Exception: return {} def _effective_field(d: dict, reviewed_key: str, base_key: str, default: Any = None) -> Any: rv = d.get(reviewed_key) if rv is not None and str(rv).strip() != "": return rv bv = d.get(base_key) if bv is not None and str(bv).strip() != "": return bv return default def format_hold_minutes(minutes: Any) -> str: try: total = int(minutes or 0) except (TypeError, ValueError): return "0分钟" if total <= 0: return "0分钟" hours = total // 60 mins = total % 60 if hours: return f"{hours}小时{mins}分钟" return f"{mins}分钟" def _normalize_monitor_type_label(raw: Any) -> str: mt = str(raw or "").strip() if mt in ("trend_pullback", "trend"): return MONITOR_TYPE_TREND_PULLBACK if mt in ("roll",): return MONITOR_TYPE_ROLL return mt def effective_entry_type(d: dict) -> str: """复盘开仓类型优先,与实例交易记录 effective_entry_reason 一致。""" er = _effective_field(d, "reviewed_entry_reason", "entry_reason") if er is not None and str(er).strip(): return str(er).strip() mt = _normalize_monitor_type_label(d.get("monitor_type")) er2 = entry_reason_for_monitor_type(mt) if er2: return er2 kst = str(d.get("key_signal_type") or "").strip() if kst: return kst legacy = str(d.get("entry_type") or "").strip() if legacy and legacy not in ("trend_pullback", "roll", "trend"): return _normalize_monitor_type_label(legacy) or legacy return mt def display_entry_type_label(d: dict) -> str: """档案/列表展示用开仓类型(不回落为「下单监控」若已有复盘或建档类型)。""" label = effective_entry_type(d).strip() if not label: return "—" return _normalize_monitor_type_label(label) or label def effective_hold_minutes( d: dict, *, opened_ms: int | None = None, closed_ms: int | None = None, ) -> int: hm = _effective_field(d, "reviewed_hold_minutes", "hold_minutes") if hm is not None and str(hm).strip() != "": try: return max(0, int(hm)) except (TypeError, ValueError): pass hs = _effective_field(d, "reviewed_hold_seconds", "hold_seconds") if hs is not None and str(hs).strip() != "": try: return max(0, int(int(hs) // 60)) except (TypeError, ValueError): pass oms = opened_ms if opened_ms is not None else d.get("opened_at_ms") cms = closed_ms if closed_ms is not None else d.get("closed_at_ms") try: oms_i = int(oms) if oms not in (None, "") else None cms_i = int(cms) if cms not in (None, "") else None except (TypeError, ValueError): oms_i = cms_i = None if oms_i and cms_i and cms_i > oms_i: return max(0, int((cms_i - oms_i) // 60_000)) return 0 def _effective_pnl(d: dict) -> float: reviewed = d.get("reviewed_pnl_amount") if reviewed is not None and str(reviewed).strip() != "": try: return float(reviewed) except (TypeError, ValueError): pass ex = d.get("exchange_realized_pnl") if ex is not None and str(ex).strip() != "": try: return float(ex) except (TypeError, ValueError): pass try: return float(d.get("pnl_amount") or 0) except (TypeError, ValueError): return 0.0 def _trade_close_dt(d: dict) -> datetime | None: raw = _effective_field(d, "reviewed_closed_at", "closed_at") if raw is None or str(raw).strip() == "": raw = d.get("created_at") or d.get("opened_at") return parse_dt_for_trading_day(raw) def _normalize_trade_row( d: dict, *, trading_day: str, reset_hour: int, ) -> dict[str, Any] | None: effective_result = str(_effective_field(d, "reviewed_result", "result") or "").strip() if effective_result not in TRADE_COMPLETED_RESULTS: return None close_dt = _trade_close_dt(d) if not close_dt: return None if trading_day_from_dt(close_dt, reset_hour) != trading_day: return None pnl = _effective_pnl(d) closed_at = _effective_field(d, "reviewed_closed_at", "closed_at") opened_at = _effective_field(d, "reviewed_opened_at", "opened_at") return { "symbol": d.get("symbol"), "direction": d.get("direction"), "result": effective_result, "pnl_amount": round(pnl, 4), "closed_at": closed_at, "opened_at": opened_at, "monitor_type": d.get("monitor_type"), "actual_rr": d.get("actual_rr"), "planned_rr": d.get("planned_rr"), "trade_style": d.get("trade_style"), "entry_reason": d.get("entry_reason"), "reviewed": bool(d.get("reviewed_at") or d.get("reviewed_result")), } def fetch_trades_for_trading_day( conn, trading_day: str, *, row_to_dict_fn: Optional[Callable] = None, reset_hour: int = 8, limit: int = 200, ) -> list[dict[str, Any]]: """返回指定交易日的已平仓记录(与 /records 交易记录一致,复盘字段优先)。""" day = (trading_day or "").strip()[:10] if not day: return [] lim = max(1, min(int(limit or 200), 500)) start_bj, end_bj = trading_day_window_bounds(day, reset_hour) ts_expr = "REPLACE(COALESCE(reviewed_closed_at, closed_at, created_at, opened_at), 'T', ' ')" rows = conn.execute( f""" SELECT symbol, direction, result, reviewed_result, pnl_amount, reviewed_pnl_amount, exchange_realized_pnl, closed_at, reviewed_closed_at, opened_at, reviewed_opened_at, created_at, monitor_type, actual_rr, planned_rr, trade_style, entry_reason, reviewed_at FROM trade_records WHERE {ts_expr} >= ? AND {ts_expr} <= ? ORDER BY {ts_expr} ASC LIMIT ? """, (start_bj, end_bj, lim * 3), ).fetchall() out: list[dict[str, Any]] = [] for row in rows: d = _row_dict(row, row_to_dict_fn) norm = _normalize_trade_row(d, trading_day=day, reset_hour=reset_hour) if norm: out.append(norm) if len(out) >= lim: break return out def _normalize_archive_trade_row( d: dict, *, exchange_key: str = "", reset_hour: int = 8, ) -> dict[str, Any] | None: """全历史档案用:已平仓记录(不按交易日截断)。""" effective_result = str(_effective_field(d, "reviewed_result", "result") or "").strip() if effective_result not in TRADE_COMPLETED_RESULTS: return None close_dt = _trade_close_dt(d) if not close_dt: return None pnl = _effective_pnl(d) closed_at = _effective_field(d, "reviewed_closed_at", "closed_at") opened_at = _effective_field(d, "reviewed_opened_at", "opened_at") opened_ms = d.get("opened_at_ms") closed_ms = d.get("closed_at_ms") if opened_ms in (None, ""): odt = parse_dt_for_trading_day(opened_at) opened_ms = int(odt.timestamp() * 1000) if odt else None if closed_ms in (None, ""): cdt = close_dt closed_ms = int(cdt.timestamp() * 1000) if cdt else None try: trade_id = int(d.get("id")) except (TypeError, ValueError): return None opened_ms_i = int(opened_ms) if opened_ms else None closed_ms_i = int(closed_ms) if closed_ms else None hold_m = effective_hold_minutes(d, opened_ms=opened_ms_i, closed_ms=closed_ms_i) entry_type = display_entry_type_label(d) reviewed = bool( d.get("reviewed_at") or d.get("reviewed_result") or d.get("reviewed_opened_at") or d.get("reviewed_closed_at") or d.get("reviewed_entry_reason") or d.get("reviewed_hold_minutes") ) return { "id": trade_id, "exchange_key": (exchange_key or "").strip().lower(), "symbol": (d.get("symbol") or "").strip().upper(), "direction": d.get("direction"), "result": effective_result, "pnl_amount": round(pnl, 4), "closed_at": closed_at, "opened_at": opened_at, "opened_at_ms": opened_ms_i, "closed_at_ms": closed_ms_i, "monitor_type": _normalize_monitor_type_label(d.get("monitor_type")), "entry_type": entry_type, "entry_reason": entry_type, "hold_minutes": hold_m, "hold_minutes_text": format_hold_minutes(hold_m), "actual_rr": d.get("actual_rr"), "planned_rr": d.get("planned_rr"), "trade_style": d.get("trade_style"), "trigger_price": d.get("trigger_price"), "stop_loss": _effective_field(d, "reviewed_stop_loss", "stop_loss"), "take_profit": _effective_field(d, "reviewed_take_profit", "take_profit"), "reviewed": reviewed, "trading_day": trading_day_from_dt(close_dt, reset_hour), } _SNAPSHOT_STATUS_TO_RESULT = { "stopped_sl": "止损", "stopped_tp": "止盈", "stopped_manual": "手动平仓", "stopped_external": "外部平仓", } def _table_columns(conn, table: str) -> set[str]: try: rows = conn.execute(f"PRAGMA table_info({table})").fetchall() except Exception: return set() out: set[str] = set() for r in rows: try: out.add(str(r[1])) except (IndexError, TypeError): try: out.add(str(r["name"])) except Exception: continue return out def _archive_ts_expr(cols: set[str]) -> str: parts = [c for c in ("reviewed_closed_at", "closed_at", "created_at", "opened_at") if c in cols] if not parts: return "''" return f"REPLACE(COALESCE({', '.join(parts)}), 'T', ' ')" def _archive_trade_select_sql(cols: set[str]) -> str: wanted = [ "id", "symbol", "direction", "result", "reviewed_result", "pnl_amount", "reviewed_pnl_amount", "exchange_realized_pnl", "closed_at", "reviewed_closed_at", "opened_at", "reviewed_opened_at", "opened_at_ms", "closed_at_ms", "created_at", "monitor_type", "key_signal_type", "actual_rr", "planned_rr", "trade_style", "entry_reason", "reviewed_entry_reason", "hold_minutes", "reviewed_hold_minutes", "hold_seconds", "reviewed_hold_seconds", "trigger_price", "stop_loss", "take_profit", "reviewed_stop_loss", "reviewed_take_profit", "reviewed_at", "trend_plan_id", ] select_cols = [c for c in wanted if c in cols] if "id" not in select_cols: select_cols = ["id"] + select_cols return ", ".join(select_cols) def _existing_trend_plan_ids(conn) -> set[int]: cols = _table_columns(conn, "trade_records") if "trend_plan_id" not in cols: return set() rows = conn.execute( "SELECT DISTINCT trend_plan_id FROM trade_records WHERE trend_plan_id IS NOT NULL" ).fetchall() out: set[int] = set() for row in rows: d = _row_dict(row) try: out.add(int(d.get("trend_plan_id"))) except (TypeError, ValueError): continue return out def _normalize_snapshot_archive_row( snap: dict, *, reset_hour: int = 8, ) -> dict[str, Any] | None: result = str(snap.get("result_label") or "").strip() if not result: result = _SNAPSHOT_STATUS_TO_RESULT.get( str(snap.get("status_at_close") or "").strip(), "" ) if result not in TRADE_COMPLETED_RESULTS: return None closed_at = snap.get("closed_at") close_dt = parse_dt_for_trading_day(closed_at) if not close_dt: return None opened_at = snap.get("opened_at") opened_ms = _parse_ms_from_row(snap.get("opened_at")) closed_ms = _parse_ms_from_row(closed_at) try: snap_id = int(snap.get("id")) except (TypeError, ValueError): return None try: pnl = float(snap.get("pnl_amount") or 0) except (TypeError, ValueError): pnl = 0.0 st = str(snap.get("strategy_type") or "").strip() monitor_type = _normalize_monitor_type_label( "trend_pullback" if st == "trend_pullback" else ("roll" if st == "roll" else st) ) hold_m = effective_hold_minutes( {}, opened_ms=opened_ms, closed_ms=closed_ms, ) entry_type = entry_reason_for_monitor_type(monitor_type) or monitor_type return { "id": -snap_id, "symbol": (snap.get("symbol") or "").strip().upper(), "direction": snap.get("direction"), "result": result, "pnl_amount": round(pnl, 4), "closed_at": closed_at, "opened_at": opened_at, "opened_at_ms": opened_ms, "closed_at_ms": closed_ms, "monitor_type": monitor_type, "entry_type": entry_type, "entry_reason": entry_type, "hold_minutes": hold_m, "hold_minutes_text": format_hold_minutes(hold_m), "from_snapshot": True, "snapshot_id": snap_id, "trend_plan_id": snap.get("source_id"), "reviewed": False, "trading_day": trading_day_from_dt(close_dt, reset_hour), } def _parse_ms_from_row(raw: Any) -> int | None: if raw in (None, ""): return None try: if isinstance(raw, (int, float)): v = int(raw) return v if v > 1_000_000_000_000 else v * 1000 except (TypeError, ValueError): pass dt = parse_dt_for_trading_day(raw) return int(dt.timestamp() * 1000) if dt else None def _fetch_strategy_snapshots_for_archive( conn, *, days: int = 365, reset_hour: int = 8, limit: int = 2000, skip_plan_ids: set[int] | None = None, ) -> list[dict[str, Any]]: cols = _table_columns(conn, "strategy_trade_snapshots") if not cols: return [] lim = max(1, min(int(limit or 2000), 5000)) day_span = max(1, min(int(days or 365), 3650)) cutoff = datetime.now() - timedelta(days=day_span) cutoff_s = cutoff.strftime("%Y-%m-%d %H:%M:%S") ts_expr = "REPLACE(COALESCE(closed_at, opened_at, created_at), 'T', ' ')" rows = conn.execute( f""" SELECT * FROM strategy_trade_snapshots WHERE {ts_expr} >= ? ORDER BY {ts_expr} DESC LIMIT ? """, (cutoff_s, lim * 2), ).fetchall() skip = skip_plan_ids or set() out: list[dict[str, Any]] = [] for row in rows: d = _row_dict(row) try: source_id = int(d.get("source_id") or 0) except (TypeError, ValueError): source_id = 0 if source_id > 0 and source_id in skip: continue norm = _normalize_snapshot_archive_row(d, reset_hour=reset_hour) if norm: out.append(norm) if len(out) >= lim: break return out def fetch_trades_for_archive( conn, *, days: int = 365, row_to_dict_fn: Optional[Callable] = None, reset_hour: int = 8, limit: int = 2000, include_strategy_snapshots: bool = True, ) -> list[dict[str, Any]]: """返回近 N 天已平仓记录(trade_records + 未落库的 strategy 快照)。""" lim = max(1, min(int(limit or 2000), 5000)) day_span = max(1, min(int(days or 365), 3650)) cutoff = datetime.now() - timedelta(days=day_span) cutoff_s = cutoff.strftime("%Y-%m-%d %H:%M:%S") cols = _table_columns(conn, "trade_records") if not cols: records: list[dict[str, Any]] = [] else: ts_expr = _archive_ts_expr(cols) sql = f""" SELECT {_archive_trade_select_sql(cols)} FROM trade_records WHERE {ts_expr} >= ? ORDER BY {ts_expr} DESC LIMIT ? """ rows = conn.execute(sql, (cutoff_s, lim * 2)).fetchall() records = [] for row in rows: d = _row_dict(row, row_to_dict_fn) norm = _normalize_archive_trade_row(d, reset_hour=reset_hour) if norm: records.append(norm) if len(records) >= lim: break if not include_strategy_snapshots: return records skip_ids = _existing_trend_plan_ids(conn) for rec in records: try: pid = int(rec.get("trend_plan_id") or 0) except (TypeError, ValueError): pid = 0 if pid > 0: skip_ids.add(pid) snaps = _fetch_strategy_snapshots_for_archive( conn, days=days, reset_hour=reset_hour, limit=max(0, lim - len(records)), skip_plan_ids=skip_ids, ) merged = records + snaps merged.sort( key=lambda x: int(x.get("closed_at_ms") or 0), reverse=True, ) return merged[:lim] def summarize_trades(trades: list[dict]) -> dict[str, Any]: """单笔列表 → 笔数 / 盈亏 / 胜败统计。""" total_pnl = 0.0 win = loss = flat = 0 for t in trades or []: try: pnl = float(t.get("pnl_amount") or 0) except (TypeError, ValueError): pnl = 0.0 total_pnl += pnl if pnl > 1e-9: win += 1 elif pnl < -1e-9: loss += 1 else: flat += 1 return { "closed_count": len(trades or []), "win_count": win, "loss_count": loss, "flat_count": flat, "total_pnl_u": round(total_pnl, 4), }