"""策略结束快照:趋势回调 / 顺势加仓(四所共用)。""" from __future__ import annotations import json from datetime import datetime, timezone from typing import Any, Callable, Optional STRATEGY_TREND = "trend_pullback" STRATEGY_ROLL = "roll" STRATEGY_SNAPSHOTS_MAX_ROWS = 100 STRATEGY_SNAPSHOTS_SQL = """ CREATE TABLE IF NOT EXISTS strategy_trade_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, strategy_type TEXT NOT NULL, source_id INTEGER, symbol TEXT, exchange_symbol TEXT, direction TEXT, result_label TEXT, status_at_close TEXT, opened_at TEXT, closed_at TEXT, pnl_amount REAL, snapshot_json TEXT NOT NULL, created_at TEXT ) """ def init_strategy_snapshot_table(conn) -> None: conn.execute(STRATEGY_SNAPSHOTS_SQL) conn.execute( "CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_closed " "ON strategy_trade_snapshots(closed_at DESC)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_type " "ON strategy_trade_snapshots(strategy_type, source_id)" ) def _row_dict(row) -> dict: if row is None: return {} try: return dict(row) except Exception: return {} def _json_dumps(obj: Any) -> str: return json.dumps(obj, ensure_ascii=False, separators=(",", ":")) def build_trend_dca_levels(plan: dict) -> list[dict]: """首仓 + 补仓档位列表(供策略页 / 中控)。""" out: list[dict] = [] p = plan or {} try: legs_done = int(p.get("legs_done") or 0) except (TypeError, ValueError): legs_done = 0 try: dca_legs = int(p.get("dca_legs") or 0) except (TypeError, ValueError): dca_legs = 0 first_done = int(p.get("first_order_done") or 0) != 0 try: grid = json.loads(p.get("grid_prices_json") or "[]") if not isinstance(grid, list): grid = [] except Exception: grid = [] try: leg_amounts = json.loads(p.get("leg_amounts_json") or "[]") if not isinstance(leg_amounts, list): leg_amounts = [] except Exception: leg_amounts = [] out.append( { "i": 0, "leg_key": "first", "label": "首仓", "price": None, "contracts": p.get("first_order_amount"), "status": "done" if first_done else "pending", "status_label": "已开仓" if first_done else "待开仓", } ) n = max(len(grid), len(leg_amounts), dca_legs) for idx in range(n): leg_i = idx + 1 price = grid[idx] if idx < len(grid) else None contracts = leg_amounts[idx] if idx < len(leg_amounts) else None done = leg_i <= legs_done out.append( { "i": leg_i, "leg_key": f"dca_{leg_i}", "label": f"补仓{leg_i}", "price": price, "contracts": contracts, "status": "done" if done else "pending", "status_label": "已补仓" if done else "待补仓", } ) return out def attach_trend_dca_levels(plan: dict) -> dict: d = dict(plan or {}) d["dca_levels"] = build_trend_dca_levels(d) return d def save_trend_plan_snapshot( cfg: dict, conn, plan_row: Any, *, result_label: str, exit_price: float | None = None, pnl_amount: float | None = None, ) -> None: init_strategy_snapshot_table(conn) row = _row_dict(plan_row) plan_id = int(row.get("id") or 0) if plan_id <= 0: return m = cfg.get("app_module") closed_at = ( m.app_now_str() if m is not None and hasattr(m, "app_now_str") else datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") ) payload = attach_trend_dca_levels(row) payload["result_label"] = result_label payload["exit_price"] = exit_price payload["pnl_amount"] = pnl_amount payload["status_at_close"] = row.get("status") conn.execute( """INSERT INTO strategy_trade_snapshots ( strategy_type, source_id, symbol, exchange_symbol, direction, result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", ( STRATEGY_TREND, plan_id, row.get("symbol"), row.get("exchange_symbol"), row.get("direction"), result_label, row.get("status"), row.get("opened_at"), closed_at, pnl_amount, _json_dumps(payload), closed_at, ), ) prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS) def save_roll_group_snapshot( cfg: dict, conn, group: dict, *, result_label: str = "结束", pnl_amount: float | None = None, ) -> None: init_strategy_snapshot_table(conn) g = dict(group or {}) gid = int(g.get("id") or 0) if gid <= 0: return legs = [] for leg in conn.execute( "SELECT * FROM roll_legs WHERE roll_group_id=? ORDER BY leg_index ASC, id ASC", (gid,), ).fetchall(): ld = _row_dict(leg) try: from strategy_roll_monitor_lib import roll_leg_status_label ld["status_label"] = roll_leg_status_label(ld.get("status")) except Exception: ld["status_label"] = ld.get("status") or "" legs.append(ld) m = cfg.get("app_module") closed_at = ( m.app_now_str() if m is not None and hasattr(m, "app_now_str") else datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") ) payload = { "group": g, "legs": legs, "result_label": result_label, "pnl_amount": pnl_amount, } conn.execute( """INSERT INTO strategy_trade_snapshots ( strategy_type, source_id, symbol, exchange_symbol, direction, result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", ( STRATEGY_ROLL, gid, g.get("symbol"), g.get("exchange_symbol"), g.get("direction"), result_label, g.get("status"), g.get("created_at"), closed_at, pnl_amount, _json_dumps(payload), closed_at, ), ) prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS) def prune_strategy_snapshots(conn, *, keep: int = STRATEGY_SNAPSHOTS_MAX_ROWS) -> None: """仅保留最近 keep 条策略快照(按 closed_at / id 倒序)。""" k = max(1, min(int(keep), 500)) conn.execute( """DELETE FROM strategy_trade_snapshots WHERE id NOT IN ( SELECT id FROM strategy_trade_snapshots ORDER BY COALESCE(closed_at, created_at, '') DESC, id DESC LIMIT ? )""", (k,), ) def _snapshot_pnl(row: dict, snap: dict) -> float | None: for key in ("pnl_amount",): v = row.get(key) if v is not None and v != "": try: return float(v) except (TypeError, ValueError): pass v = snap.get("pnl_amount") if v is not None and v != "": try: return float(v) except (TypeError, ValueError): pass return None def _trend_dca_stats(snap: dict) -> dict: levels = snap.get("dca_levels") or build_trend_dca_levels(snap) dca_only = [ lv for lv in levels if (lv.get("leg_key") or "") != "first" and (lv.get("label") or "") != "首仓" ] done = sum(1 for lv in dca_only if lv.get("status") == "done") total = len(dca_only) pending = total - done if total <= 0: tag = "na" elif done <= 0: tag = "no_dca" elif done >= total: tag = "dca_done" else: tag = "dca_partial" return { "dca_done": done, "dca_total": total, "dca_pending": pending, "dca_tag": tag, } def _roll_leg_stats(snap: dict) -> dict: legs = snap.get("legs") or [] if not isinstance(legs, list): legs = [] filled = sum(1 for lg in legs if (lg.get("status") or "").lower() == "filled") total = len(legs) pending = total - filled if total <= 0: tag = "na" elif filled <= 0: tag = "no_dca" elif filled >= total: tag = "dca_done" else: tag = "dca_partial" return { "dca_done": filled, "dca_total": total, "dca_pending": pending, "dca_tag": tag, } def enrich_strategy_snapshot_row(row: dict) -> dict: d = dict(row or {}) snap = d.get("snapshot") or {} st = (d.get("strategy_type") or "").strip() pnl = _snapshot_pnl(d, snap) if pnl is not None: if pnl > 1e-9: d["filter_pnl"] = "profit" elif pnl < -1e-9: d["filter_pnl"] = "loss" else: d["filter_pnl"] = "flat" else: d["filter_pnl"] = "unknown" sym = (d.get("symbol") or d.get("exchange_symbol") or "").strip() d["filter_symbol"] = sym.upper().split("/")[0].split(":")[0] if sym else "" closed = (d.get("closed_at") or d.get("created_at") or "").strip() d["sort_ts"] = closed if st == STRATEGY_TREND: stats = _trend_dca_stats(snap) d.update(stats) legs_txt = ( f"{stats['dca_done']}/{stats['dca_total']}" if stats["dca_total"] > 0 else "0/0" ) d["summary_dca"] = legs_txt else: stats = _roll_leg_stats(snap) d.update(stats) d["summary_dca"] = ( f"{stats['dca_done']}/{stats['dca_total']}腿" if stats["dca_total"] > 0 else "—" ) return d def list_strategy_snapshots(conn, *, limit: int = 200) -> list[dict]: init_strategy_snapshot_table(conn) rows = conn.execute( "SELECT * FROM strategy_trade_snapshots ORDER BY id DESC LIMIT ?", (max(1, min(int(limit), 500)),), ).fetchall() out = [] for r in rows: d = _row_dict(r) try: d["snapshot"] = json.loads(d.get("snapshot_json") or "{}") except Exception: d["snapshot"] = {} st = (d.get("strategy_type") or "").strip() d["strategy_label"] = "趋势回调" if st == STRATEGY_TREND else "顺势加仓" out.append(enrich_strategy_snapshot_row(d)) return out def list_strategy_snapshots_split( conn, *, limit: int = STRATEGY_SNAPSHOTS_MAX_ROWS ) -> tuple[list[dict], list[dict], list[str]]: """趋势 / 顺势分组,及筛选用币种列表。""" all_rows = list_strategy_snapshots(conn, limit=limit) trend = [r for r in all_rows if (r.get("strategy_type") or "") == STRATEGY_TREND] roll = [r for r in all_rows if (r.get("strategy_type") or "") == STRATEGY_ROLL] symbols = sorted({r.get("filter_symbol") or "" for r in all_rows if r.get("filter_symbol")}) return trend, roll, symbols