Files
crypto_monitor/strategy_snapshot_lib.py
dekun cfa28e7f4e Fix hub full-close double-booking trend plans.
Sync active plans after hub position close, merge final close snapshots per plan, and backfill missing trade records when ending an already-stopped plan.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-08 09:06:36 +08:00

530 lines
17 KiB
Python

"""策略结束快照:趋势回调 / 顺势加仓(四所共用)。"""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Any, Callable, Optional
STRATEGY_TREND = "trend_pullback"
STRATEGY_ROLL = "roll"
STRATEGY_SNAPSHOTS_MAX_ROWS = 100
# 同一趋势计划只允许一条「结束类」快照(中控全平 + 监控止损 + 实例结束计划)
FINAL_TREND_CLOSE_RANK = {
"手动平仓": 3,
"止盈": 2,
"止损": 1,
}
FINAL_TREND_CLOSE_LABELS = tuple(FINAL_TREND_CLOSE_RANK.keys())
STRATEGY_SNAPSHOTS_SQL = """
CREATE TABLE IF NOT EXISTS strategy_trade_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
strategy_type TEXT NOT NULL,
source_id INTEGER,
symbol TEXT,
exchange_symbol TEXT,
direction TEXT,
result_label TEXT,
status_at_close TEXT,
opened_at TEXT,
closed_at TEXT,
pnl_amount REAL,
snapshot_json TEXT NOT NULL,
created_at TEXT
)
"""
def init_strategy_snapshot_table(conn) -> None:
conn.execute(STRATEGY_SNAPSHOTS_SQL)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_closed "
"ON strategy_trade_snapshots(closed_at DESC)"
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_type "
"ON strategy_trade_snapshots(strategy_type, source_id)"
)
def _row_dict(row) -> dict:
if row is None:
return {}
try:
return dict(row)
except Exception:
return {}
def _json_dumps(obj: Any) -> str:
return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
def build_trend_dca_levels(plan: dict) -> list[dict]:
"""首仓 + 补仓档位列表(供策略页 / 中控)。"""
out: list[dict] = []
p = plan or {}
try:
legs_done = int(p.get("legs_done") or 0)
except (TypeError, ValueError):
legs_done = 0
try:
dca_legs = int(p.get("dca_legs") or 0)
except (TypeError, ValueError):
dca_legs = 0
first_done = int(p.get("first_order_done") or 0) != 0
try:
grid = json.loads(p.get("grid_prices_json") or "[]")
if not isinstance(grid, list):
grid = []
except Exception:
grid = []
try:
leg_amounts = json.loads(p.get("leg_amounts_json") or "[]")
if not isinstance(leg_amounts, list):
leg_amounts = []
except Exception:
leg_amounts = []
out.append(
{
"i": 0,
"leg_key": "first",
"label": "首仓",
"price": None,
"contracts": p.get("first_order_amount"),
"status": "done" if first_done else "pending",
"status_label": "已开仓" if first_done else "待开仓",
}
)
n = max(len(grid), len(leg_amounts), dca_legs)
for idx in range(n):
leg_i = idx + 1
price = grid[idx] if idx < len(grid) else None
contracts = leg_amounts[idx] if idx < len(leg_amounts) else None
done = leg_i <= legs_done
out.append(
{
"i": leg_i,
"leg_key": f"dca_{leg_i}",
"label": f"补仓{leg_i}",
"price": price,
"contracts": contracts,
"status": "done" if done else "pending",
"status_label": "已补仓" if done else "待补仓",
}
)
return out
def attach_trend_dca_levels(plan: dict) -> dict:
from strategy_trend_lib import enrich_trend_dca_levels_with_tp
d = dict(plan or {})
levels = build_trend_dca_levels(d)
d["dca_levels"] = enrich_trend_dca_levels_with_tp(d, levels)
return d
def _snapshot_key_exists(
conn, strategy_type: str, source_id: int, result_label: str
) -> bool:
if source_id <= 0:
return False
label = (result_label or "").strip()
row = conn.execute(
"""SELECT 1 FROM strategy_trade_snapshots
WHERE strategy_type=? AND source_id=? AND result_label=?
LIMIT 1""",
(strategy_type, int(source_id), label),
).fetchone()
return row is not None
def _final_trend_close_rank(result_label: str) -> int:
return int(FINAL_TREND_CLOSE_RANK.get((result_label or "").strip(), 0))
def _purge_weaker_trend_final_snapshots(
conn, plan_id: int, result_label: str
) -> None:
"""写入更高优先级结束快照时,删除同计划较弱的结束记录。"""
rank = _final_trend_close_rank(result_label)
if rank <= 0 or plan_id <= 0:
return
for label, lr in FINAL_TREND_CLOSE_RANK.items():
if lr < rank:
conn.execute(
"""DELETE FROM strategy_trade_snapshots
WHERE strategy_type=? AND source_id=? AND result_label=?""",
(STRATEGY_TREND, int(plan_id), label),
)
def dedupe_strategy_snapshots(conn) -> int:
"""删除重复快照:同结果去重 + 同计划仅保留最高优先级结束类记录。"""
init_strategy_snapshot_table(conn)
removed = 0
cur = conn.execute(
"""DELETE FROM strategy_trade_snapshots
WHERE id IN (
SELECT s1.id FROM strategy_trade_snapshots s1
INNER JOIN strategy_trade_snapshots s2
ON s1.strategy_type = s2.strategy_type
AND s1.source_id = s2.source_id
AND s1.result_label = s2.result_label
AND s1.id < s2.id
)"""
)
removed += int(getattr(cur, "rowcount", 0) or 0)
rows = conn.execute(
f"""SELECT id, source_id, result_label FROM strategy_trade_snapshots
WHERE strategy_type=? AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""",
(STRATEGY_TREND, *FINAL_TREND_CLOSE_LABELS),
).fetchall()
by_plan: dict[int, list] = {}
for row in rows:
d = _row_dict(row)
try:
pid = int(d.get("source_id") or 0)
except (TypeError, ValueError):
pid = 0
if pid <= 0:
continue
by_plan.setdefault(pid, []).append(d)
drop_ids: list[int] = []
for snaps in by_plan.values():
if len(snaps) <= 1:
continue
best = max(
snaps,
key=lambda s: (
_final_trend_close_rank(str(s.get("result_label") or "")),
int(s.get("id") or 0),
),
)
keep_id = int(best.get("id") or 0)
for s in snaps:
sid = int(s.get("id") or 0)
if sid and sid != keep_id:
drop_ids.append(sid)
if drop_ids:
placeholders = ",".join("?" * len(drop_ids))
cur2 = conn.execute(
f"DELETE FROM strategy_trade_snapshots WHERE id IN ({placeholders})",
drop_ids,
)
removed += int(getattr(cur2, "rowcount", 0) or 0)
return removed
def save_trend_plan_snapshot(
cfg: dict,
conn,
plan_row: Any,
*,
result_label: str,
exit_price: float | None = None,
pnl_amount: float | None = None,
closed_at: str | None = None,
) -> None:
init_strategy_snapshot_table(conn)
row = _row_dict(plan_row)
plan_id = int(row.get("id") or 0)
if plan_id <= 0:
return
label = (result_label or "").strip()
close_rank = _final_trend_close_rank(label)
if close_rank > 0:
existing = conn.execute(
f"""SELECT result_label FROM strategy_trade_snapshots
WHERE strategy_type=? AND source_id=? AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""",
(STRATEGY_TREND, plan_id, *FINAL_TREND_CLOSE_LABELS),
).fetchall()
for ex in existing:
ex_label = str(_row_dict(ex).get("result_label") or "")
if _final_trend_close_rank(ex_label) >= close_rank:
return
_purge_weaker_trend_final_snapshots(conn, plan_id, label)
elif _snapshot_key_exists(conn, STRATEGY_TREND, plan_id, label):
return
m = cfg.get("app_module")
close_ts = (closed_at or "").strip() or (
m.app_now_str()
if m is not None and hasattr(m, "app_now_str")
else datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
)
payload = attach_trend_dca_levels(row)
payload["result_label"] = result_label
payload["exit_price"] = exit_price
payload["pnl_amount"] = pnl_amount
payload["status_at_close"] = row.get("status")
conn.execute(
"""INSERT INTO strategy_trade_snapshots (
strategy_type, source_id, symbol, exchange_symbol, direction,
result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
(
STRATEGY_TREND,
plan_id,
row.get("symbol"),
row.get("exchange_symbol"),
row.get("direction"),
result_label,
row.get("status"),
row.get("opened_at"),
close_ts,
pnl_amount,
_json_dumps(payload),
close_ts,
),
)
prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS)
def save_roll_group_snapshot(
cfg: dict,
conn,
group: dict,
*,
result_label: str = "结束",
pnl_amount: float | None = None,
) -> None:
init_strategy_snapshot_table(conn)
g = dict(group or {})
gid = int(g.get("id") or 0)
if gid <= 0:
return
label = (result_label or "结束").strip()
if _snapshot_key_exists(conn, STRATEGY_ROLL, gid, label):
return
legs = []
for leg in conn.execute(
"SELECT * FROM roll_legs WHERE roll_group_id=? ORDER BY leg_index ASC, id ASC",
(gid,),
).fetchall():
ld = _row_dict(leg)
try:
from strategy_roll_monitor_lib import roll_leg_status_label
ld["status_label"] = roll_leg_status_label(ld.get("status"))
except Exception:
ld["status_label"] = ld.get("status") or ""
legs.append(ld)
m = cfg.get("app_module")
closed_at = (
m.app_now_str()
if m is not None and hasattr(m, "app_now_str")
else datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
)
payload = {
"group": g,
"legs": legs,
"result_label": result_label,
"pnl_amount": pnl_amount,
}
conn.execute(
"""INSERT INTO strategy_trade_snapshots (
strategy_type, source_id, symbol, exchange_symbol, direction,
result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
(
STRATEGY_ROLL,
gid,
g.get("symbol"),
g.get("exchange_symbol"),
g.get("direction"),
result_label,
g.get("status"),
g.get("created_at"),
closed_at,
pnl_amount,
_json_dumps(payload),
closed_at,
),
)
prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS)
def prune_strategy_snapshots(conn, *, keep: int = STRATEGY_SNAPSHOTS_MAX_ROWS) -> None:
"""仅保留最近 keep 条策略快照(按 closed_at / id 倒序)。"""
dedupe_strategy_snapshots(conn)
k = max(1, min(int(keep), 500))
conn.execute(
"""DELETE FROM strategy_trade_snapshots
WHERE id NOT IN (
SELECT id FROM strategy_trade_snapshots
ORDER BY COALESCE(closed_at, created_at, '') DESC, id DESC
LIMIT ?
)""",
(k,),
)
def _snapshot_pnl(row: dict, snap: dict) -> float | None:
for key in ("pnl_amount",):
v = row.get(key)
if v is not None and v != "":
try:
return float(v)
except (TypeError, ValueError):
pass
v = snap.get("pnl_amount")
if v is not None and v != "":
try:
return float(v)
except (TypeError, ValueError):
pass
return None
def _trend_dca_stats(snap: dict) -> dict:
levels = snap.get("dca_levels") or build_trend_dca_levels(snap)
dca_only = [
lv
for lv in levels
if (lv.get("leg_key") or "") != "first" and (lv.get("label") or "") != "首仓"
]
done = sum(1 for lv in dca_only if lv.get("status") == "done")
total = len(dca_only)
pending = total - done
if total <= 0:
tag = "na"
elif done <= 0:
tag = "no_dca"
elif done >= total:
tag = "dca_done"
else:
tag = "dca_partial"
return {
"dca_done": done,
"dca_total": total,
"dca_pending": pending,
"dca_tag": tag,
}
def _roll_leg_stats(snap: dict) -> dict:
legs = snap.get("legs") or []
if not isinstance(legs, list):
legs = []
filled = sum(1 for lg in legs if (lg.get("status") or "").lower() == "filled")
total = len(legs)
pending = total - filled
if total <= 0:
tag = "na"
elif filled <= 0:
tag = "no_dca"
elif filled >= total:
tag = "dca_done"
else:
tag = "dca_partial"
return {
"dca_done": filled,
"dca_total": total,
"dca_pending": pending,
"dca_tag": tag,
}
def enrich_strategy_snapshot_row(row: dict) -> dict:
d = dict(row or {})
snap = d.get("snapshot") or {}
st = (d.get("strategy_type") or "").strip()
pnl = _snapshot_pnl(d, snap)
if pnl is not None:
if pnl > 1e-9:
d["filter_pnl"] = "profit"
elif pnl < -1e-9:
d["filter_pnl"] = "loss"
else:
d["filter_pnl"] = "flat"
else:
d["filter_pnl"] = "unknown"
snap_sym = ""
if isinstance(snap, dict):
snap_sym = (snap.get("symbol") or snap.get("exchange_symbol") or "").strip()
sym = (d.get("symbol") or d.get("exchange_symbol") or snap_sym or "").strip()
if sym:
d["symbol"] = d.get("symbol") or sym
d["exchange_symbol"] = d.get("exchange_symbol") or sym
d["filter_symbol"] = sym.upper().split("/")[0].split(":")[0] if sym else ""
closed = (d.get("closed_at") or d.get("created_at") or "").strip()
d["sort_ts"] = closed
if st == STRATEGY_TREND:
stats = _trend_dca_stats(snap)
d.update(stats)
legs_txt = (
f"{stats['dca_done']}/{stats['dca_total']}"
if stats["dca_total"] > 0
else "0/0"
)
d["summary_dca"] = legs_txt
else:
stats = _roll_leg_stats(snap)
d.update(stats)
d["summary_dca"] = (
f"{stats['dca_done']}/{stats['dca_total']}"
if stats["dca_total"] > 0
else ""
)
return d
def list_strategy_snapshots(conn, *, limit: int = 200) -> list[dict]:
init_strategy_snapshot_table(conn)
rows = conn.execute(
"SELECT * FROM strategy_trade_snapshots ORDER BY id DESC LIMIT ?",
(max(1, min(int(limit), 500)),),
).fetchall()
out = []
seen: dict[tuple[str, int, str], int] = {}
for r in rows:
d = _row_dict(r)
try:
d["snapshot"] = json.loads(d.get("snapshot_json") or "{}")
except Exception:
d["snapshot"] = {}
st = (d.get("strategy_type") or "").strip()
d["strategy_label"] = "趋势回调" if st == STRATEGY_TREND else "顺势加仓"
enriched = enrich_strategy_snapshot_row(d)
try:
source_id = int(enriched.get("source_id") or 0)
except (TypeError, ValueError):
source_id = 0
result_label = (enriched.get("result_label") or "").strip()
close_rank = _final_trend_close_rank(result_label)
if st == STRATEGY_TREND and source_id > 0 and close_rank > 0:
plan_key = (st, source_id)
snap_id = int(enriched.get("id") or 0)
prev = seen.get(plan_key)
if prev is not None:
prev_id, prev_rank = prev
if prev_rank > close_rank or (prev_rank == close_rank and prev_id >= snap_id):
continue
out = [x for x in out if int(x.get("id") or 0) != prev_id]
seen[plan_key] = (snap_id, close_rank)
out.append(enriched)
continue
key = (st, source_id, result_label)
snap_id = int(enriched.get("id") or 0)
prev = seen.get(key)
if prev is not None and prev[0] >= snap_id:
continue
if prev is not None:
out = [x for x in out if int(x.get("id") or 0) != prev[0]]
seen[key] = (snap_id, 0)
out.append(enriched)
return out
def list_strategy_snapshots_split(
conn, *, limit: int = STRATEGY_SNAPSHOTS_MAX_ROWS
) -> tuple[list[dict], list[dict], list[str]]:
"""趋势 / 顺势分组,及筛选用币种列表。"""
all_rows = list_strategy_snapshots(conn, limit=limit)
trend = [r for r in all_rows if (r.get("strategy_type") or "") == STRATEGY_TREND]
roll = [r for r in all_rows if (r.get("strategy_type") or "") == STRATEGY_ROLL]
symbols = sorted({r.get("filter_symbol") or "" for r in all_rows if r.get("filter_symbol")})
return trend, roll, symbols