"""策略结束快照:趋势回调 / 顺势加仓(四所共用)。""" from __future__ import annotations import json from datetime import datetime, timezone from typing import Any, Callable, Optional STRATEGY_TREND = "trend_pullback" STRATEGY_ROLL = "roll" STRATEGY_SNAPSHOTS_MAX_ROWS = 100 # 同一趋势计划只允许一条「结束类」快照(中控全平 + 监控止损 + 实例结束计划) FINAL_TREND_CLOSE_RANK = { "手动平仓": 3, "止盈": 2, "止损": 1, } FINAL_TREND_CLOSE_LABELS = tuple(FINAL_TREND_CLOSE_RANK.keys()) STRATEGY_SNAPSHOTS_SQL = """ CREATE TABLE IF NOT EXISTS strategy_trade_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, strategy_type TEXT NOT NULL, source_id INTEGER, symbol TEXT, exchange_symbol TEXT, direction TEXT, result_label TEXT, status_at_close TEXT, opened_at TEXT, closed_at TEXT, pnl_amount REAL, snapshot_json TEXT NOT NULL, created_at TEXT ) """ def init_strategy_snapshot_table(conn) -> None: conn.execute(STRATEGY_SNAPSHOTS_SQL) conn.execute( "CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_closed " "ON strategy_trade_snapshots(closed_at DESC)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_type " "ON strategy_trade_snapshots(strategy_type, source_id)" ) def _row_dict(row) -> dict: if row is None: return {} try: return dict(row) except Exception: return {} def _json_dumps(obj: Any) -> str: return json.dumps(obj, ensure_ascii=False, separators=(",", ":")) def build_trend_dca_levels(plan: dict) -> list[dict]: """首仓 + 补仓档位列表(供策略页 / 中控)。""" out: list[dict] = [] p = plan or {} try: legs_done = int(p.get("legs_done") or 0) except (TypeError, ValueError): legs_done = 0 try: dca_legs = int(p.get("dca_legs") or 0) except (TypeError, ValueError): dca_legs = 0 first_done = int(p.get("first_order_done") or 0) != 0 try: grid = json.loads(p.get("grid_prices_json") or "[]") if not isinstance(grid, list): grid = [] except Exception: grid = [] try: leg_amounts = json.loads(p.get("leg_amounts_json") or "[]") if not isinstance(leg_amounts, list): leg_amounts = [] except Exception: leg_amounts = [] out.append( { "i": 0, "leg_key": "first", "label": "首仓", "price": None, "contracts": p.get("first_order_amount"), "status": "done" if first_done else "pending", "status_label": "已开仓" if first_done else "待开仓", } ) n = max(len(grid), len(leg_amounts), dca_legs) for idx in range(n): leg_i = idx + 1 price = grid[idx] if idx < len(grid) else None contracts = leg_amounts[idx] if idx < len(leg_amounts) else None done = leg_i <= legs_done out.append( { "i": leg_i, "leg_key": f"dca_{leg_i}", "label": f"补仓{leg_i}", "price": price, "contracts": contracts, "status": "done" if done else "pending", "status_label": "已补仓" if done else "待补仓", } ) return out def attach_trend_dca_levels(plan: dict) -> dict: from strategy_trend_lib import enrich_trend_dca_levels_with_tp d = dict(plan or {}) levels = build_trend_dca_levels(d) d["dca_levels"] = enrich_trend_dca_levels_with_tp(d, levels) return d def _snapshot_key_exists( conn, strategy_type: str, source_id: int, result_label: str ) -> bool: if source_id <= 0: return False label = (result_label or "").strip() row = conn.execute( """SELECT 1 FROM strategy_trade_snapshots WHERE strategy_type=? AND source_id=? AND result_label=? LIMIT 1""", (strategy_type, int(source_id), label), ).fetchone() return row is not None def _final_trend_close_rank(result_label: str) -> int: return int(FINAL_TREND_CLOSE_RANK.get((result_label or "").strip(), 0)) def _purge_weaker_trend_final_snapshots( conn, plan_id: int, result_label: str ) -> None: """写入更高优先级结束快照时,删除同计划较弱的结束记录。""" rank = _final_trend_close_rank(result_label) if rank <= 0 or plan_id <= 0: return for label, lr in FINAL_TREND_CLOSE_RANK.items(): if lr < rank: conn.execute( """DELETE FROM strategy_trade_snapshots WHERE strategy_type=? AND source_id=? AND result_label=?""", (STRATEGY_TREND, int(plan_id), label), ) def dedupe_strategy_snapshots(conn) -> int: """删除重复快照:同结果去重 + 同计划仅保留最高优先级结束类记录。""" init_strategy_snapshot_table(conn) removed = 0 cur = conn.execute( """DELETE FROM strategy_trade_snapshots WHERE id IN ( SELECT s1.id FROM strategy_trade_snapshots s1 INNER JOIN strategy_trade_snapshots s2 ON s1.strategy_type = s2.strategy_type AND s1.source_id = s2.source_id AND s1.result_label = s2.result_label AND s1.id < s2.id )""" ) removed += int(getattr(cur, "rowcount", 0) or 0) rows = conn.execute( f"""SELECT id, source_id, result_label FROM strategy_trade_snapshots WHERE strategy_type=? AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""", (STRATEGY_TREND, *FINAL_TREND_CLOSE_LABELS), ).fetchall() by_plan: dict[int, list] = {} for row in rows: d = _row_dict(row) try: pid = int(d.get("source_id") or 0) except (TypeError, ValueError): pid = 0 if pid <= 0: continue by_plan.setdefault(pid, []).append(d) drop_ids: list[int] = [] for snaps in by_plan.values(): if len(snaps) <= 1: continue best = max( snaps, key=lambda s: ( _final_trend_close_rank(str(s.get("result_label") or "")), int(s.get("id") or 0), ), ) keep_id = int(best.get("id") or 0) for s in snaps: sid = int(s.get("id") or 0) if sid and sid != keep_id: drop_ids.append(sid) if drop_ids: placeholders = ",".join("?" * len(drop_ids)) cur2 = conn.execute( f"DELETE FROM strategy_trade_snapshots WHERE id IN ({placeholders})", drop_ids, ) removed += int(getattr(cur2, "rowcount", 0) or 0) return removed def save_trend_plan_snapshot( cfg: dict, conn, plan_row: Any, *, result_label: str, exit_price: float | None = None, pnl_amount: float | None = None, closed_at: str | None = None, ) -> None: init_strategy_snapshot_table(conn) row = _row_dict(plan_row) plan_id = int(row.get("id") or 0) if plan_id <= 0: return label = (result_label or "").strip() close_rank = _final_trend_close_rank(label) if close_rank > 0: existing = conn.execute( f"""SELECT result_label FROM strategy_trade_snapshots WHERE strategy_type=? AND source_id=? AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""", (STRATEGY_TREND, plan_id, *FINAL_TREND_CLOSE_LABELS), ).fetchall() for ex in existing: ex_label = str(_row_dict(ex).get("result_label") or "") if _final_trend_close_rank(ex_label) >= close_rank: return _purge_weaker_trend_final_snapshots(conn, plan_id, label) elif _snapshot_key_exists(conn, STRATEGY_TREND, plan_id, label): return m = cfg.get("app_module") close_ts = (closed_at or "").strip() or ( m.app_now_str() if m is not None and hasattr(m, "app_now_str") else datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") ) payload = attach_trend_dca_levels(row) payload["result_label"] = result_label payload["exit_price"] = exit_price payload["pnl_amount"] = pnl_amount payload["status_at_close"] = row.get("status") conn.execute( """INSERT INTO strategy_trade_snapshots ( strategy_type, source_id, symbol, exchange_symbol, direction, result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", ( STRATEGY_TREND, plan_id, row.get("symbol"), row.get("exchange_symbol"), row.get("direction"), result_label, row.get("status"), row.get("opened_at"), close_ts, pnl_amount, _json_dumps(payload), close_ts, ), ) prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS) def save_roll_group_snapshot( cfg: dict, conn, group: dict, *, result_label: str = "结束", pnl_amount: float | None = None, ) -> None: init_strategy_snapshot_table(conn) g = dict(group or {}) gid = int(g.get("id") or 0) if gid <= 0: return label = (result_label or "结束").strip() if _snapshot_key_exists(conn, STRATEGY_ROLL, gid, label): return legs = [] for leg in conn.execute( "SELECT * FROM roll_legs WHERE roll_group_id=? ORDER BY leg_index ASC, id ASC", (gid,), ).fetchall(): ld = _row_dict(leg) try: from strategy_roll_monitor_lib import roll_leg_status_label ld["status_label"] = roll_leg_status_label(ld.get("status")) except Exception: ld["status_label"] = ld.get("status") or "" legs.append(ld) m = cfg.get("app_module") closed_at = ( m.app_now_str() if m is not None and hasattr(m, "app_now_str") else datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S") ) payload = { "group": g, "legs": legs, "result_label": result_label, "pnl_amount": pnl_amount, } conn.execute( """INSERT INTO strategy_trade_snapshots ( strategy_type, source_id, symbol, exchange_symbol, direction, result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""", ( STRATEGY_ROLL, gid, g.get("symbol"), g.get("exchange_symbol"), g.get("direction"), result_label, g.get("status"), g.get("created_at"), closed_at, pnl_amount, _json_dumps(payload), closed_at, ), ) prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS) def prune_strategy_snapshots(conn, *, keep: int = STRATEGY_SNAPSHOTS_MAX_ROWS) -> None: """仅保留最近 keep 条策略快照(按 closed_at / id 倒序)。""" dedupe_strategy_snapshots(conn) k = max(1, min(int(keep), 500)) conn.execute( """DELETE FROM strategy_trade_snapshots WHERE id NOT IN ( SELECT id FROM strategy_trade_snapshots ORDER BY COALESCE(closed_at, created_at, '') DESC, id DESC LIMIT ? )""", (k,), ) def _snapshot_pnl(row: dict, snap: dict) -> float | None: for key in ("pnl_amount",): v = row.get(key) if v is not None and v != "": try: return float(v) except (TypeError, ValueError): pass v = snap.get("pnl_amount") if v is not None and v != "": try: return float(v) except (TypeError, ValueError): pass return None def _trend_dca_stats(snap: dict) -> dict: levels = snap.get("dca_levels") or build_trend_dca_levels(snap) dca_only = [ lv for lv in levels if (lv.get("leg_key") or "") != "first" and (lv.get("label") or "") != "首仓" ] done = sum(1 for lv in dca_only if lv.get("status") == "done") total = len(dca_only) pending = total - done if total <= 0: tag = "na" elif done <= 0: tag = "no_dca" elif done >= total: tag = "dca_done" else: tag = "dca_partial" return { "dca_done": done, "dca_total": total, "dca_pending": pending, "dca_tag": tag, } def _roll_leg_stats(snap: dict) -> dict: legs = snap.get("legs") or [] if not isinstance(legs, list): legs = [] filled = sum(1 for lg in legs if (lg.get("status") or "").lower() == "filled") total = len(legs) pending = total - filled if total <= 0: tag = "na" elif filled <= 0: tag = "no_dca" elif filled >= total: tag = "dca_done" else: tag = "dca_partial" return { "dca_done": filled, "dca_total": total, "dca_pending": pending, "dca_tag": tag, } def enrich_strategy_snapshot_row(row: dict) -> dict: d = dict(row or {}) snap = d.get("snapshot") or {} st = (d.get("strategy_type") or "").strip() pnl = _snapshot_pnl(d, snap) if pnl is not None: if pnl > 1e-9: d["filter_pnl"] = "profit" elif pnl < -1e-9: d["filter_pnl"] = "loss" else: d["filter_pnl"] = "flat" else: d["filter_pnl"] = "unknown" snap_sym = "" if isinstance(snap, dict): snap_sym = (snap.get("symbol") or snap.get("exchange_symbol") or "").strip() sym = (d.get("symbol") or d.get("exchange_symbol") or snap_sym or "").strip() if sym: d["symbol"] = d.get("symbol") or sym d["exchange_symbol"] = d.get("exchange_symbol") or sym d["filter_symbol"] = sym.upper().split("/")[0].split(":")[0] if sym else "" closed = (d.get("closed_at") or d.get("created_at") or "").strip() d["sort_ts"] = closed if st == STRATEGY_TREND: stats = _trend_dca_stats(snap) d.update(stats) legs_txt = ( f"{stats['dca_done']}/{stats['dca_total']}" if stats["dca_total"] > 0 else "0/0" ) d["summary_dca"] = legs_txt else: stats = _roll_leg_stats(snap) d.update(stats) d["summary_dca"] = ( f"{stats['dca_done']}/{stats['dca_total']}腿" if stats["dca_total"] > 0 else "—" ) return d def list_strategy_snapshots(conn, *, limit: int = 200) -> list[dict]: init_strategy_snapshot_table(conn) rows = conn.execute( "SELECT * FROM strategy_trade_snapshots ORDER BY id DESC LIMIT ?", (max(1, min(int(limit), 500)),), ).fetchall() out = [] seen: dict[tuple[str, int, str], int] = {} for r in rows: d = _row_dict(r) try: d["snapshot"] = json.loads(d.get("snapshot_json") or "{}") except Exception: d["snapshot"] = {} st = (d.get("strategy_type") or "").strip() d["strategy_label"] = "趋势回调" if st == STRATEGY_TREND else "顺势加仓" enriched = enrich_strategy_snapshot_row(d) try: source_id = int(enriched.get("source_id") or 0) except (TypeError, ValueError): source_id = 0 result_label = (enriched.get("result_label") or "").strip() close_rank = _final_trend_close_rank(result_label) if st == STRATEGY_TREND and source_id > 0 and close_rank > 0: plan_key = (st, source_id) snap_id = int(enriched.get("id") or 0) prev = seen.get(plan_key) if prev is not None: prev_id, prev_rank = prev if prev_rank > close_rank or (prev_rank == close_rank and prev_id >= snap_id): continue out = [x for x in out if int(x.get("id") or 0) != prev_id] seen[plan_key] = (snap_id, close_rank) out.append(enriched) continue key = (st, source_id, result_label) snap_id = int(enriched.get("id") or 0) prev = seen.get(key) if prev is not None and prev[0] >= snap_id: continue if prev is not None: out = [x for x in out if int(x.get("id") or 0) != prev[0]] seen[key] = (snap_id, 0) out.append(enriched) return out def list_strategy_snapshots_split( conn, *, limit: int = STRATEGY_SNAPSHOTS_MAX_ROWS ) -> tuple[list[dict], list[dict], list[str]]: """趋势 / 顺势分组,及筛选用币种列表。""" all_rows = list_strategy_snapshots(conn, limit=limit) trend = [r for r in all_rows if (r.get("strategy_type") or "") == STRATEGY_TREND] roll = [r for r in all_rows if (r.get("strategy_type") or "") == STRATEGY_ROLL] symbols = sorted({r.get("filter_symbol") or "" for r in all_rows if r.get("filter_symbol")}) return trend, roll, symbols