ea92160d54
Records were still in the database but CSS clipping hid row text; restore visible summary styling and symbol fallback from snapshots. Co-authored-by: Cursor <cursoragent@cursor.com>
384 lines
11 KiB
Python
384 lines
11 KiB
Python
"""策略结束快照:趋势回调 / 顺势加仓(四所共用)。"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Callable, Optional
|
|
|
|
# Values written to / compared against the ``strategy_type`` column.
STRATEGY_TREND = "trend_pullback"
STRATEGY_ROLL = "roll"
# Default row cap: used as the default ``keep`` when pruning and as the
# default ``limit`` of the split listing.
STRATEGY_SNAPSHOTS_MAX_ROWS = 100

# DDL for the snapshot table. ``snapshot_json`` is the only payload column
# declared NOT NULL besides ``strategy_type``; the remaining columns are
# denormalized summary fields.
STRATEGY_SNAPSHOTS_SQL = """
CREATE TABLE IF NOT EXISTS strategy_trade_snapshots (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    strategy_type TEXT NOT NULL,
    source_id INTEGER,
    symbol TEXT,
    exchange_symbol TEXT,
    direction TEXT,
    result_label TEXT,
    status_at_close TEXT,
    opened_at TEXT,
    closed_at TEXT,
    pnl_amount REAL,
    snapshot_json TEXT NOT NULL,
    created_at TEXT
)
"""
|
|
|
|
|
|
def init_strategy_snapshot_table(conn) -> None:
    """Create the snapshot table and its two indexes when they are missing.

    Every statement uses ``IF NOT EXISTS``, so repeated calls are harmless.
    """
    index_statements = (
        "CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_closed "
        "ON strategy_trade_snapshots(closed_at DESC)",
        "CREATE INDEX IF NOT EXISTS idx_strategy_snapshots_type "
        "ON strategy_trade_snapshots(strategy_type, source_id)",
    )
    # Table first, then the indexes that reference it.
    for statement in (STRATEGY_SNAPSHOTS_SQL, *index_statements):
        conn.execute(statement)
|
|
|
|
|
|
def _row_dict(row) -> dict:
|
|
if row is None:
|
|
return {}
|
|
try:
|
|
return dict(row)
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _json_dumps(obj: Any) -> str:
|
|
return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
|
|
|
|
|
|
def build_trend_dca_levels(plan: dict) -> list[dict]:
    """Build the list of entry levels: the first order plus every DCA leg.

    Used by the strategy page / control panel.

    Args:
        plan: Trend plan record. Reads ``legs_done``, ``dca_legs``,
            ``first_order_done``, ``first_order_amount`` and the JSON-encoded
            lists ``grid_prices_json`` / ``leg_amounts_json``. ``None`` is
            treated as an empty plan.

    Returns:
        One dict per level with keys ``i``, ``leg_key``, ``label``, ``price``,
        ``contracts``, ``status`` and ``status_label``. Index 0 is always the
        first order; DCA legs follow as ``dca_1`` .. ``dca_n`` where ``n`` is
        the largest of the grid length, the amounts length and ``dca_legs``.
    """

    def _as_int(value: Any) -> int:
        # Missing or unparsable counters/flags are treated as 0 instead of
        # raising; this now also covers ``first_order_done``.
        try:
            return int(value or 0)
        except (TypeError, ValueError, OverflowError):
            return 0

    def _as_list(raw: Any) -> list:
        # Invalid JSON, or JSON that is not a list, yields an empty list.
        try:
            parsed = json.loads(raw or "[]")
        except Exception:
            return []
        return parsed if isinstance(parsed, list) else []

    p = plan or {}
    legs_done = _as_int(p.get("legs_done"))
    dca_legs = _as_int(p.get("dca_legs"))
    first_done = _as_int(p.get("first_order_done")) != 0
    grid = _as_list(p.get("grid_prices_json"))
    leg_amounts = _as_list(p.get("leg_amounts_json"))

    out: list[dict] = [
        {
            "i": 0,
            "leg_key": "first",
            "label": "首仓",
            "price": None,
            "contracts": p.get("first_order_amount"),
            "status": "done" if first_done else "pending",
            "status_label": "已开仓" if first_done else "待开仓",
        }
    ]

    leg_count = max(len(grid), len(leg_amounts), dca_legs)
    for idx in range(leg_count):
        leg_i = idx + 1
        # A leg counts as done when its 1-based index is within legs_done.
        done = leg_i <= legs_done
        out.append(
            {
                "i": leg_i,
                "leg_key": f"dca_{leg_i}",
                "label": f"补仓{leg_i}",
                "price": grid[idx] if idx < len(grid) else None,
                "contracts": leg_amounts[idx] if idx < len(leg_amounts) else None,
                "status": "done" if done else "pending",
                "status_label": "已补仓" if done else "待补仓",
            }
        )
    return out
|
|
|
|
|
|
def attach_trend_dca_levels(plan: dict) -> dict:
    """Return a shallow copy of *plan* with a ``dca_levels`` entry added.

    The levels are produced by ``build_trend_dca_levels`` and then passed
    through ``enrich_trend_dca_levels_with_tp`` from ``strategy_trend_lib``.
    """
    # Function-scope import (presumably to avoid an import cycle - TODO confirm).
    from strategy_trend_lib import enrich_trend_dca_levels_with_tp

    enriched_plan = dict(plan or {})
    enriched_plan["dca_levels"] = enrich_trend_dca_levels_with_tp(
        enriched_plan, build_trend_dca_levels(enriched_plan)
    )
    return enriched_plan
|
|
|
|
|
|
def save_trend_plan_snapshot(
    cfg: dict,
    conn,
    plan_row: Any,
    *,
    result_label: str,
    exit_price: float | None = None,
    pnl_amount: float | None = None,
    closed_at: str | None = None,
) -> None:
    """Store a close-out snapshot of a trend plan, then prune old snapshots.

    Nothing is written when the plan row has no positive ``id``. The close
    timestamp is *closed_at* when it is non-blank; otherwise it comes from
    ``cfg["app_module"].app_now_str()`` if available, else the current UTC
    time. The same timestamp is stored in both ``closed_at`` and
    ``created_at``.
    """
    init_strategy_snapshot_table(conn)
    plan = _row_dict(plan_row)
    source_id = int(plan.get("id") or 0)
    if source_id <= 0:
        return

    app_module = cfg.get("app_module")
    close_ts = (closed_at or "").strip()
    if not close_ts:
        if app_module is not None and hasattr(app_module, "app_now_str"):
            close_ts = app_module.app_now_str()
        else:
            close_ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

    # The JSON payload is the plan plus its DCA levels and the close-out facts.
    snapshot = attach_trend_dca_levels(plan)
    snapshot.update(
        result_label=result_label,
        exit_price=exit_price,
        pnl_amount=pnl_amount,
        status_at_close=plan.get("status"),
    )

    insert_sql = """INSERT INTO strategy_trade_snapshots (
            strategy_type, source_id, symbol, exchange_symbol, direction,
            result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)"""
    values = (
        STRATEGY_TREND,
        source_id,
        plan.get("symbol"),
        plan.get("exchange_symbol"),
        plan.get("direction"),
        result_label,
        plan.get("status"),
        plan.get("opened_at"),
        close_ts,
        pnl_amount,
        _json_dumps(snapshot),
        close_ts,
    )
    conn.execute(insert_sql, values)
    prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS)
|
|
|
|
|
|
def save_roll_group_snapshot(
    cfg: dict,
    conn,
    group: dict,
    *,
    result_label: str = "结束",
    pnl_amount: float | None = None,
) -> None:
    """Store a close-out snapshot of a roll group and its legs, then prune.

    Args:
        cfg: App config; ``cfg["app_module"].app_now_str()`` supplies the close
            timestamp when available, otherwise the current UTC time is used.
        conn: DB connection exposing ``execute``.
        group: Roll group record. Nothing is written unless its ``id`` is a
            positive integer. Its ``created_at`` is stored as ``opened_at``.
        result_label: Outcome label stored with the snapshot.
        pnl_amount: PnL value stored with the snapshot, if known.
    """
    init_strategy_snapshot_table(conn)
    g = dict(group or {})
    gid = int(g.get("id") or 0)
    if gid <= 0:
        return

    leg_rows = conn.execute(
        "SELECT * FROM roll_legs WHERE roll_group_id=? ORDER BY leg_index ASC, id ASC",
        (gid,),
    ).fetchall()

    # Resolve the optional label helper once instead of once per leg; a failed
    # import would otherwise be retried for every row.
    try:
        from strategy_roll_monitor_lib import roll_leg_status_label
    except Exception:
        roll_leg_status_label = None

    legs = []
    for leg in leg_rows:
        ld = _row_dict(leg)
        status = ld.get("status")
        # Fall back to the raw status when the helper is missing or fails.
        label = status or ""
        if roll_leg_status_label is not None:
            try:
                label = roll_leg_status_label(status)
            except Exception:
                label = status or ""
        ld["status_label"] = label
        legs.append(ld)

    m = cfg.get("app_module")
    closed_at = (
        m.app_now_str()
        if m is not None and hasattr(m, "app_now_str")
        else datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    )
    payload = {
        "group": g,
        "legs": legs,
        "result_label": result_label,
        "pnl_amount": pnl_amount,
    }
    conn.execute(
        """INSERT INTO strategy_trade_snapshots (
            strategy_type, source_id, symbol, exchange_symbol, direction,
            result_label, status_at_close, opened_at, closed_at, pnl_amount, snapshot_json, created_at
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)""",
        (
            STRATEGY_ROLL,
            gid,
            g.get("symbol"),
            g.get("exchange_symbol"),
            g.get("direction"),
            result_label,
            g.get("status"),
            g.get("created_at"),
            closed_at,
            pnl_amount,
            _json_dumps(payload),
            # created_at mirrors closed_at for roll snapshots.
            closed_at,
        ),
    )
    prune_strategy_snapshots(conn, keep=STRATEGY_SNAPSHOTS_MAX_ROWS)
|
|
|
|
|
|
def prune_strategy_snapshots(conn, *, keep: int = STRATEGY_SNAPSHOTS_MAX_ROWS) -> None:
    """Delete all but the newest *keep* snapshots.

    "Newest" is ordered by ``closed_at`` (falling back to ``created_at``) and
    then ``id``, both descending. *keep* is clamped to the range 1..500.
    """
    row_limit = int(keep)
    if row_limit > 500:
        row_limit = 500
    if row_limit < 1:
        row_limit = 1

    delete_sql = """DELETE FROM strategy_trade_snapshots
        WHERE id NOT IN (
            SELECT id FROM strategy_trade_snapshots
            ORDER BY COALESCE(closed_at, created_at, '') DESC, id DESC
            LIMIT ?
        )"""
    conn.execute(delete_sql, (row_limit,))
|
|
|
|
|
|
def _snapshot_pnl(row: dict, snap: dict) -> float | None:
|
|
for key in ("pnl_amount",):
|
|
v = row.get(key)
|
|
if v is not None and v != "":
|
|
try:
|
|
return float(v)
|
|
except (TypeError, ValueError):
|
|
pass
|
|
v = snap.get("pnl_amount")
|
|
if v is not None and v != "":
|
|
try:
|
|
return float(v)
|
|
except (TypeError, ValueError):
|
|
pass
|
|
return None
|
|
|
|
|
|
def _trend_dca_stats(snap: dict) -> dict:
|
|
levels = snap.get("dca_levels") or build_trend_dca_levels(snap)
|
|
dca_only = [
|
|
lv
|
|
for lv in levels
|
|
if (lv.get("leg_key") or "") != "first" and (lv.get("label") or "") != "首仓"
|
|
]
|
|
done = sum(1 for lv in dca_only if lv.get("status") == "done")
|
|
total = len(dca_only)
|
|
pending = total - done
|
|
if total <= 0:
|
|
tag = "na"
|
|
elif done <= 0:
|
|
tag = "no_dca"
|
|
elif done >= total:
|
|
tag = "dca_done"
|
|
else:
|
|
tag = "dca_partial"
|
|
return {
|
|
"dca_done": done,
|
|
"dca_total": total,
|
|
"dca_pending": pending,
|
|
"dca_tag": tag,
|
|
}
|
|
|
|
|
|
def _roll_leg_stats(snap: dict) -> dict:
|
|
legs = snap.get("legs") or []
|
|
if not isinstance(legs, list):
|
|
legs = []
|
|
filled = sum(1 for lg in legs if (lg.get("status") or "").lower() == "filled")
|
|
total = len(legs)
|
|
pending = total - filled
|
|
if total <= 0:
|
|
tag = "na"
|
|
elif filled <= 0:
|
|
tag = "no_dca"
|
|
elif filled >= total:
|
|
tag = "dca_done"
|
|
else:
|
|
tag = "dca_partial"
|
|
return {
|
|
"dca_done": filled,
|
|
"dca_total": total,
|
|
"dca_pending": pending,
|
|
"dca_tag": tag,
|
|
}
|
|
|
|
|
|
def enrich_strategy_snapshot_row(row: dict) -> dict:
    """Return a copy of *row* with derived display / filter fields added.

    Reads the decoded payload from ``row["snapshot"]``; anything that is not
    a dict (e.g. a JSON list) is treated as an empty payload so one malformed
    record cannot raise.

    Added keys:
        ``filter_pnl``: ``profit`` / ``loss`` / ``flat`` / ``unknown``.
        ``symbol`` / ``exchange_symbol``: back-filled from each other or from
            the snapshot payload when empty.
        ``filter_symbol``: upper-cased symbol up to the first ``/`` or ``:``.
        ``sort_ts``: ``closed_at``, falling back to ``created_at``.
        ``dca_done`` / ``dca_total`` / ``dca_pending`` / ``dca_tag`` and
        ``summary_dca``: trend stats for trend rows, roll-leg stats otherwise.
    """
    d = dict(row or {})
    snap = d.get("snapshot")
    if not isinstance(snap, dict):
        # Normalize once so every helper below receives a dict.
        snap = {}
    st = (d.get("strategy_type") or "").strip()

    pnl = _snapshot_pnl(d, snap)
    if pnl is None:
        d["filter_pnl"] = "unknown"
    elif pnl > 1e-9:
        d["filter_pnl"] = "profit"
    elif pnl < -1e-9:
        d["filter_pnl"] = "loss"
    else:
        # Within +/-1e-9 of zero counts as break-even.
        d["filter_pnl"] = "flat"

    snap_sym = (snap.get("symbol") or snap.get("exchange_symbol") or "").strip()
    sym = (d.get("symbol") or d.get("exchange_symbol") or snap_sym or "").strip()
    if sym:
        d["symbol"] = d.get("symbol") or sym
        d["exchange_symbol"] = d.get("exchange_symbol") or sym
    d["filter_symbol"] = sym.upper().split("/")[0].split(":")[0] if sym else ""
    d["sort_ts"] = (d.get("closed_at") or d.get("created_at") or "").strip()

    if st == STRATEGY_TREND:
        stats = _trend_dca_stats(snap)
        d.update(stats)
        d["summary_dca"] = (
            f"{stats['dca_done']}/{stats['dca_total']}"
            if stats["dca_total"] > 0
            else "0/0"
        )
    else:
        # Every non-trend strategy type is summarized as a roll group.
        stats = _roll_leg_stats(snap)
        d.update(stats)
        d["summary_dca"] = (
            f"{stats['dca_done']}/{stats['dca_total']}腿"
            if stats["dca_total"] > 0
            else "—"
        )
    return d
|
|
|
|
|
|
def list_strategy_snapshots(conn, *, limit: int = 200) -> list[dict]:
    """Return the newest snapshots (by id, descending) as enriched dicts.

    *limit* is clamped to 1..500. Each record gets a decoded ``snapshot``
    (empty dict when the JSON cannot be parsed), a ``strategy_label`` and the
    fields added by ``enrich_strategy_snapshot_row``.
    """
    init_strategy_snapshot_table(conn)
    capped_limit = max(1, min(int(limit), 500))
    cursor = conn.execute(
        "SELECT * FROM strategy_trade_snapshots ORDER BY id DESC LIMIT ?",
        (capped_limit,),
    )

    records = []
    for db_row in cursor.fetchall():
        record = _row_dict(db_row)
        try:
            decoded = json.loads(record.get("snapshot_json") or "{}")
        except Exception:
            decoded = {}
        record["snapshot"] = decoded

        is_trend = (record.get("strategy_type") or "").strip() == STRATEGY_TREND
        record["strategy_label"] = "趋势回调" if is_trend else "顺势加仓"
        records.append(enrich_strategy_snapshot_row(record))
    return records
|
|
|
|
|
|
def list_strategy_snapshots_split(
    conn, *, limit: int = STRATEGY_SNAPSHOTS_MAX_ROWS
) -> tuple[list[dict], list[dict], list[str]]:
    """Return snapshots split into trend / roll lists plus filter symbols.

    The third element is the sorted set of non-empty ``filter_symbol`` values
    across all listed rows, whatever their strategy type.
    """
    trend_rows: list[dict] = []
    roll_rows: list[dict] = []
    symbol_set: set = set()

    for record in list_strategy_snapshots(conn, limit=limit):
        kind = record.get("strategy_type") or ""
        if kind == STRATEGY_TREND:
            trend_rows.append(record)
        if kind == STRATEGY_ROLL:
            roll_rows.append(record)
        filter_symbol = record.get("filter_symbol")
        if filter_symbol:
            symbol_set.add(filter_symbol)

    return trend_rows, roll_rows, sorted(symbol_set)
|