"""策略结束快照:趋势回调 / 顺势加仓(四所共用)。""" from __future__ import annotations import json from datetime import datetime, timezone from typing import Any, Callable, Optional STRATEGY_TREND = "trend_pullback" STRATEGY_ROLL = "roll" STRATEGY_SNAPSHOTS_SQL = """ CREATE TABLE IF NOT EXISTS strategy_trade_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, strategy_type TEXT NOT NULL, source_id INTEGER, symbol TEXT, exchange_symbol TEXT, direction TEXT, result_label TEXT, status_at_close TEXT, opened_at TEXT, closed_at TEXT, pnl_amount REAL, snapshot_json TEXT NOT NULL, created_at TEXT ) """ def init_strategy_snapshot_table(conn) -> None: conn.execute(STRATEGY_SNAPSHOTS_SQL) conn.execute( "CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_closed " "ON strategy_trade_snapshots(closed_at DESC)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_type " "ON strategy_trade_snapshots(strategy_type, source_id)" ) def _row_dict(row) -> dict: if row is None: return {} try: return dict(row) except Exception: return {} def _json_dumps(obj: Any) -> str: return json.dumps(obj, ensure_ascii=False, separators=(",", ":")) def build_trend_dca_levels(plan: dict) -> list[dict]: """首仓 + 补仓档位列表(供策略页 / 中控)。""" out: list[dict] = [] p = plan or {} try: legs_done = int(p.get("legs_done") or 0) except (TypeError, ValueError): legs_done = 0 try: dca_legs = int(p.get("dca_legs") or 0) except (TypeError, ValueError): dca_legs = 0 first_done = int(p.get("first_order_done") or 0) != 0 try: grid = json.loads(p.get("grid_prices_json") or "[]") if not isinstance(grid, list): grid = [] except Exception: grid = [] try: leg_amounts = json.loads(p.get("leg_amounts_json") or "[]") if not isinstance(leg_amounts, list): leg_amounts = [] except Exception: leg_amounts = [] out.append( { "i": 0, "leg_key": "first", "label": "首仓", "price": None, "contracts": p.get("first_order_amount"), "status": "done" if first_done else "pending", "status_label": "已开仓" if first_done else "待开仓", } ) n = max(len(grid), len(leg_amounts), dca_legs) for idx in range(n): leg_i = idx + 1 price = grid[idx] if idx < len(grid) else None contracts = leg_amounts[idx] if idx < len(leg_amounts) else None done = leg_i <= legs_done out.append( { "i": leg_i, "leg_key": f"dca_{leg_i}", "label": f"补仓{leg_i}", "price": price, "contracts": contracts, "status": "done" if done else "pending", "status_label": "已补仓" if done else "待补仓", } ) return out def attach_trend_dca_levels(plan: dict) -> dict: d = dict(plan or {}) d["dca_levels"] = build_trend_dca_levels(d) return d def save_trend_plan_snapshot( cfg: dict, conn, plan_row: Any, *, result_label: str, exit_price: float | None = None, pnl_amount: float | None = None, ) -> None: init_strategy_snapshot_table(conn) row = _row_dict(plan_row) plan_id = int(row.get("id") or 0) if plan_id <= 0: return m = cfg.get("app_module") closed_at = ( m.app_now_str() if m is not None and hasattr(m, "app_now_str") else datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") ) payload = attach_trend_dca_levels(row) payload["result_label"] = result_label payload["exit_price"] = exit_price payload["pnl_amount"] = pnl_amount payload["status_at_close"] = row.get("status") conn.execute( """INSERT INTO strategy_trade_snapshots ( strategy_type, source_id, symbol, exchange_symbol, direction, result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", ( STRATEGY_TREND, plan_id, row.get("symbol"), row.get("exchange_symbol"), row.get("direction"), result_label, row.get("status"), row.get("opened_at"), closed_at, pnl_amount, _json_dumps(payload), closed_at, ), ) def save_roll_group_snapshot( cfg: dict, conn, group: dict, *, result_label: str = "结束", pnl_amount: float | None = None, ) -> None: init_strategy_snapshot_table(conn) g = dict(group or {}) gid = int(g.get("id") or 0) if gid <= 0: return legs = [] for leg in conn.execute( "SELECT * FROM roll_legs WHERE roll_group_id=? ORDER BY leg_index ASC, id ASC", (gid,), ).fetchall(): ld = _row_dict(leg) try: from strategy_roll_monitor_lib import roll_leg_status_label ld["status_label"] = roll_leg_status_label(ld.get("status")) except Exception: ld["status_label"] = ld.get("status") or "" legs.append(ld) m = cfg.get("app_module") closed_at = ( m.app_now_str() if m is not None and hasattr(m, "app_now_str") else datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") ) payload = { "group": g, "legs": legs, "result_label": result_label, "pnl_amount": pnl_amount, } conn.execute( """INSERT INTO strategy_trade_snapshots ( strategy_type, source_id, symbol, exchange_symbol, direction, result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", ( STRATEGY_ROLL, gid, g.get("symbol"), g.get("exchange_symbol"), g.get("direction"), result_label, g.get("status"), g.get("created_at"), closed_at, pnl_amount, _json_dumps(payload), closed_at, ), ) def list_strategy_snapshots(conn, *, limit: int = 200) -> list[dict]: init_strategy_snapshot_table(conn) rows = conn.execute( "SELECT * FROM strategy_trade_snapshots ORDER BY id DESC LIMIT ?", (max(1, min(int(limit), 500)),), ).fetchall() out = [] for r in rows: d = _row_dict(r) try: d["snapshot"] = json.loads(d.get("snapshot_json") or "{}") except Exception: d["snapshot"] = {} st = (d.get("strategy_type") or "").strip() d["strategy_label"] = "趋势回调" if st == STRATEGY_TREND else "顺势加仓" out.append(d) return out