Fix hub full-close double-booking trend plans.

Sync active plans after hub position close, merge final close snapshots per plan, and backfill missing trade records when ending an already-stopped plan.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-08 09:06:36 +08:00
parent e71bfe095c
commit cfa28e7f4e
6 changed files with 344 additions and 7 deletions
+24
View File
@@ -6902,6 +6902,30 @@ def stop_trend_pullback(pid):
conn = get_db() conn = get_db()
row = conn.execute("SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,)).fetchone() row = conn.execute("SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,)).fetchone()
if not row: if not row:
from strategy_trend_register import (
_ensure_trend_plan_trade_record,
_trend_plan_trade_exists,
build_trend_config,
)
cfg = app.extensions.get("strategy_trend_cfg") or build_trend_config(
sys.modules[__name__]
)
stopped = conn.execute(
"SELECT id FROM trend_pullback_plans WHERE id=? "
"AND status IN ('stopped_sl','stopped_tp','stopped_manual')",
(pid,),
).fetchone()
if stopped and not _trend_plan_trade_exists(conn, pid):
try:
if _ensure_trend_plan_trade_record(cfg, conn, pid, prefer_label="手动平仓"):
conn.close()
flash("计划已结束,已补录缺失的交易记录")
return redirect(url_for("strategy_trend_page"))
except Exception as e:
conn.close()
flash(f"补录交易记录失败:{e}")
return redirect(url_for("strategy_trend_page"))
conn.close() conn.close()
flash("未找到运行中的趋势回调计划") flash("未找到运行中的趋势回调计划")
return redirect("/trade") return redirect("/trade")
+32
View File
@@ -587,6 +587,38 @@ def register_hub_routes(app):
return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400
return jsonify(_invoke_view_get("stop_trend_pullback", f"/stop_trend_pullback/{pid}")) return jsonify(_invoke_view_get("stop_trend_pullback", f"/stop_trend_pullback/{pid}"))
@app.route("/api/hub/trend/sync-flat", methods=["POST"])
@_hub_auth_required
def api_hub_trend_sync_flat():
"""中控市价全平后:结束仍 active 的同币种同向趋势计划。"""
if not _ctx().get("has_trend"):
return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400
body = request.get_json(silent=True) or {}
symbol = (body.get("symbol") or request.form.get("symbol") or "").strip()
side = (
body.get("side")
or body.get("direction")
or request.form.get("side")
or ""
).strip().lower()
if not symbol:
return jsonify({"ok": False, "msg": "symbol 不能为空"}), 400
if side not in ("long", "short"):
return jsonify({"ok": False, "msg": "side 须为 long 或 short"}), 400
cfg = current_app.extensions.get("strategy_trend_cfg")
get_db = _ctx().get("get_db")
if not cfg or not callable(get_db):
return jsonify({"ok": False, "msg": "趋势配置未就绪"}), 500
from strategy_trend_register import sync_trend_plans_after_external_close
conn = get_db()
try:
return jsonify(sync_trend_plans_after_external_close(cfg, conn, symbol, side))
except Exception as e:
return jsonify({"ok": False, "msg": str(e)}), 500
finally:
conn.close()
@app.route("/api/hub/trend/breakeven/<int:pid>", methods=["POST"]) @app.route("/api/hub/trend/breakeven/<int:pid>", methods=["POST"])
@_hub_auth_required @_hub_auth_required
def api_hub_trend_breakeven(pid): def api_hub_trend_breakeven(pid):
+11
View File
@@ -1586,6 +1586,17 @@ async def api_close_position(exchange_id: str, body: ClosePositionBody):
"payload": payload, "payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")), "ok": bool(isinstance(payload, dict) and payload.get("ok")),
} }
if out.get("ok") and "trend" in (ex.get("capabilities") or []):
async with httpx.AsyncClient() as flask_client:
sync_parsed = await _fetch_flask_json(
flask_client,
ex,
"/api/hub/trend/sync-flat",
method="POST",
json_body={"symbol": sym, "side": side},
)
if isinstance(sync_parsed, dict):
out["trend_sync"] = sync_parsed
_schedule_board_refresh() _schedule_board_refresh()
return out return out
+100 -7
View File
@@ -8,6 +8,13 @@ from typing import Any, Callable, Optional
STRATEGY_TREND = "trend_pullback" STRATEGY_TREND = "trend_pullback"
STRATEGY_ROLL = "roll" STRATEGY_ROLL = "roll"
STRATEGY_SNAPSHOTS_MAX_ROWS = 100 STRATEGY_SNAPSHOTS_MAX_ROWS = 100
# 同一趋势计划只允许一条「结束类」快照(中控全平 + 监控止损 + 实例结束计划)
FINAL_TREND_CLOSE_RANK = {
"手动平仓": 3,
"止盈": 2,
"止损": 1,
}
FINAL_TREND_CLOSE_LABELS = tuple(FINAL_TREND_CLOSE_RANK.keys())
STRATEGY_SNAPSHOTS_SQL = """ STRATEGY_SNAPSHOTS_SQL = """
CREATE TABLE IF NOT EXISTS strategy_trade_snapshots ( CREATE TABLE IF NOT EXISTS strategy_trade_snapshots (
@@ -134,9 +141,30 @@ def _snapshot_key_exists(
return row is not None return row is not None
def _final_trend_close_rank(result_label: str) -> int:
return int(FINAL_TREND_CLOSE_RANK.get((result_label or "").strip(), 0))
def _purge_weaker_trend_final_snapshots(
conn, plan_id: int, result_label: str
) -> None:
"""写入更高优先级结束快照时,删除同计划较弱的结束记录。"""
rank = _final_trend_close_rank(result_label)
if rank <= 0 or plan_id <= 0:
return
for label, lr in FINAL_TREND_CLOSE_RANK.items():
if lr < rank:
conn.execute(
"""DELETE FROM strategy_trade_snapshots
WHERE strategy_type=? AND source_id=? AND result_label=?""",
(STRATEGY_TREND, int(plan_id), label),
)
def dedupe_strategy_snapshots(conn) -> int: def dedupe_strategy_snapshots(conn) -> int:
"""删除同源同结果的重复快照,仅保留每组最大 id""" """删除重复快照:同结果去重 + 同计划仅保留最高优先级结束类记录"""
init_strategy_snapshot_table(conn) init_strategy_snapshot_table(conn)
removed = 0
cur = conn.execute( cur = conn.execute(
"""DELETE FROM strategy_trade_snapshots """DELETE FROM strategy_trade_snapshots
WHERE id IN ( WHERE id IN (
@@ -148,7 +176,46 @@ def dedupe_strategy_snapshots(conn) -> int:
AND s1.id < s2.id AND s1.id < s2.id
)""" )"""
) )
return int(getattr(cur, "rowcount", 0) or 0) removed += int(getattr(cur, "rowcount", 0) or 0)
rows = conn.execute(
f"""SELECT id, source_id, result_label FROM strategy_trade_snapshots
WHERE strategy_type=? AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""",
(STRATEGY_TREND, *FINAL_TREND_CLOSE_LABELS),
).fetchall()
by_plan: dict[int, list] = {}
for row in rows:
d = _row_dict(row)
try:
pid = int(d.get("source_id") or 0)
except (TypeError, ValueError):
pid = 0
if pid <= 0:
continue
by_plan.setdefault(pid, []).append(d)
drop_ids: list[int] = []
for snaps in by_plan.values():
if len(snaps) <= 1:
continue
best = max(
snaps,
key=lambda s: (
_final_trend_close_rank(str(s.get("result_label") or "")),
int(s.get("id") or 0),
),
)
keep_id = int(best.get("id") or 0)
for s in snaps:
sid = int(s.get("id") or 0)
if sid and sid != keep_id:
drop_ids.append(sid)
if drop_ids:
placeholders = ",".join("?" * len(drop_ids))
cur2 = conn.execute(
f"DELETE FROM strategy_trade_snapshots WHERE id IN ({placeholders})",
drop_ids,
)
removed += int(getattr(cur2, "rowcount", 0) or 0)
return removed
def save_trend_plan_snapshot( def save_trend_plan_snapshot(
@@ -167,7 +234,19 @@ def save_trend_plan_snapshot(
if plan_id <= 0: if plan_id <= 0:
return return
label = (result_label or "").strip() label = (result_label or "").strip()
if _snapshot_key_exists(conn, STRATEGY_TREND, plan_id, label): close_rank = _final_trend_close_rank(label)
if close_rank > 0:
existing = conn.execute(
f"""SELECT result_label FROM strategy_trade_snapshots
WHERE strategy_type=? AND source_id=? AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""",
(STRATEGY_TREND, plan_id, *FINAL_TREND_CLOSE_LABELS),
).fetchall()
for ex in existing:
ex_label = str(_row_dict(ex).get("result_label") or "")
if _final_trend_close_rank(ex_label) >= close_rank:
return
_purge_weaker_trend_final_snapshots(conn, plan_id, label)
elif _snapshot_key_exists(conn, STRATEGY_TREND, plan_id, label):
return return
m = cfg.get("app_module") m = cfg.get("app_module")
close_ts = (closed_at or "").strip() or ( close_ts = (closed_at or "").strip() or (
@@ -413,14 +492,28 @@ def list_strategy_snapshots(conn, *, limit: int = 200) -> list[dict]:
source_id = int(enriched.get("source_id") or 0) source_id = int(enriched.get("source_id") or 0)
except (TypeError, ValueError): except (TypeError, ValueError):
source_id = 0 source_id = 0
key = (st, source_id, (enriched.get("result_label") or "").strip()) result_label = (enriched.get("result_label") or "").strip()
close_rank = _final_trend_close_rank(result_label)
if st == STRATEGY_TREND and source_id > 0 and close_rank > 0:
plan_key = (st, source_id)
snap_id = int(enriched.get("id") or 0)
prev = seen.get(plan_key)
if prev is not None:
prev_id, prev_rank = prev
if prev_rank > close_rank or (prev_rank == close_rank and prev_id >= snap_id):
continue
out = [x for x in out if int(x.get("id") or 0) != prev_id]
seen[plan_key] = (snap_id, close_rank)
out.append(enriched)
continue
key = (st, source_id, result_label)
snap_id = int(enriched.get("id") or 0) snap_id = int(enriched.get("id") or 0)
prev = seen.get(key) prev = seen.get(key)
if prev is not None and prev >= snap_id: if prev is not None and prev[0] >= snap_id:
continue continue
if prev is not None: if prev is not None:
out = [x for x in out if int(x.get("id") or 0) != prev] out = [x for x in out if int(x.get("id") or 0) != prev[0]]
seen[key] = snap_id seen[key] = (snap_id, 0)
out.append(enriched) out.append(enriched)
return out return out
+145
View File
@@ -657,6 +657,136 @@ def _call_insert_trade_record(m, plan_id: int, kwargs: dict) -> None:
fn(**call) fn(**call)
def _best_trend_close_snapshot(conn, plan_id: int) -> dict | None:
from strategy_snapshot_lib import (
FINAL_TREND_CLOSE_LABELS,
STRATEGY_TREND,
_final_trend_close_rank,
)
rows = conn.execute(
f"""SELECT * FROM strategy_trade_snapshots
WHERE strategy_type=? AND source_id=?
AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""",
(STRATEGY_TREND, int(plan_id), *FINAL_TREND_CLOSE_LABELS),
).fetchall()
if not rows:
return None
parsed = [_row_dict(row) for row in rows]
return max(
parsed,
key=lambda d: (
_final_trend_close_rank(str(d.get("result_label") or "")),
int(d.get("id") or 0),
),
)
def _ensure_trend_plan_trade_record(
cfg: dict, conn, plan_id: int, *, prefer_label: str = "手动平仓"
) -> bool:
"""计划已结束但 trade_records 缺失时,从策略快照补录一条。"""
if _trend_plan_trade_exists(conn, plan_id):
return True
m = _m(cfg)
plan = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=?", (int(plan_id),)
).fetchone()
if not plan:
return False
plan_d = _row_dict(plan)
snap = _best_trend_close_snapshot(conn, plan_id)
if not snap:
return False
try:
payload = json.loads(snap.get("snapshot_json") or "{}")
except Exception:
payload = {}
sym = snap.get("symbol") or plan_d.get("symbol") or payload.get("symbol")
direction = snap.get("direction") or plan_d.get("direction") or "long"
result = (prefer_label or "").strip() or (snap.get("result_label") or "").strip() or "手动平仓"
opened_at = snap.get("opened_at") or plan_d.get("opened_at")
closed_at = snap.get("closed_at")
pnl_amount = snap.get("pnl_amount")
if pnl_amount is None:
pnl_amount = payload.get("pnl_amount")
avg_e = float(payload.get("avg_entry_price") or plan_d.get("avg_entry_price") or 0)
margin_cap = trend_effective_margin_capital(plan_d)
lev = int(plan_d.get("leverage") or 1)
hold_seconds = m.calc_hold_seconds(
opened_at or "",
m.parse_dt_for_trading_day(closed_at) or m.app_now(),
)
res = m.normalize_result_with_pnl(result, float(pnl_amount or 0))
risk_amt = m.calc_risk_amount_from_plan(
direction,
float(plan_d.get("add_upper") or 0),
float(plan_d.get("stop_loss") or 0),
float(plan_d.get("plan_margin_capital") or 0),
lev,
)
planned_rr = m.calc_rr_ratio(
direction,
avg_e,
float(plan_d.get("stop_loss") or 0),
float(plan_d.get("take_profit") or 0),
)
session_date = plan_d.get("session_date") or m.get_trading_day()
_bump_session_capital_no_commit(m, conn, session_date, float(pnl_amount or 0))
_call_insert_trade_record(
m,
plan_id,
dict(
conn=conn,
symbol=sym,
monitor_type=MONITOR_TYPE_TREND,
direction=direction,
trigger_price=avg_e,
stop_loss=float(plan_d.get("stop_loss") or 0),
initial_stop_loss=float(plan_d.get("initial_stop_loss") or plan_d.get("stop_loss") or 0),
take_profit=float(plan_d.get("take_profit") or 0),
margin_capital=margin_cap,
leverage=lev,
pnl_amount=pnl_amount,
hold_seconds=hold_seconds,
trade_style="trend_pullback",
risk_amount=risk_amt,
planned_rr=planned_rr,
actual_rr=m.calc_actual_rr(pnl_amount, risk_amt),
result=res,
opened_at=opened_at,
closed_at=closed_at,
entry_reason=ENTRY_REASON_TREND_PULLBACK,
),
)
conn.commit()
return True
def sync_trend_plans_after_external_close(
cfg: dict, conn, symbol: str, direction: str
) -> dict[str, Any]:
"""中控/外部全平后:结束仍 active 的同币种同向趋势计划(避免监控再记一条止损)。"""
m = _m(cfg)
sym = m.normalize_symbol_input(symbol) if hasattr(m, "normalize_symbol_input") else (symbol or "").strip()
if not sym:
return {"ok": False, "msg": "symbol 无效", "finalized": 0}
direction = (direction or "long").strip().lower()
rows = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE status='active' AND symbol=? AND direction=?",
(sym, direction),
).fetchall()
finalized = 0
for row in rows:
px = m.get_price(row["symbol"])
exit_p = float(px) if px is not None else 0.0
before = _trend_plan_trade_exists(conn, int(row["id"]))
_finalize_plan(cfg, conn, row, "手动平仓", exit_p)
if not before:
finalized += 1
return {"ok": True, "finalized": finalized, "symbol": sym, "direction": direction}
def _trend_plan_trade_exists(conn, plan_id: int) -> bool: def _trend_plan_trade_exists(conn, plan_id: int) -> bool:
try: try:
return conn.execute( return conn.execute(
@@ -1664,6 +1794,21 @@ def register_trend_routes(app: Flask, cfg: dict) -> None:
"SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,) "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,)
).fetchone() ).fetchone()
if not row: if not row:
stopped = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=? "
"AND status IN ('stopped_sl','stopped_tp','stopped_manual')",
(pid,),
).fetchone()
if stopped and not _trend_plan_trade_exists(conn, pid):
try:
if _ensure_trend_plan_trade_record(cfg, conn, pid, prefer_label="手动平仓"):
conn.close()
flash("计划已结束,已补录缺失的交易记录")
return _redirect_trend()
except Exception as e:
conn.close()
flash(f"补录交易记录失败:{e}")
return _redirect_trend()
conn.close() conn.close()
flash("未找到运行中的趋势回调计划") flash("未找到运行中的趋势回调计划")
return _redirect_trend() return _redirect_trend()
+32
View File
@@ -143,9 +143,41 @@ def test_list_strategy_snapshots_hides_duplicate_keys():
assert int(stop_rows[0]["id"]) == 12 assert int(stop_rows[0]["id"]) == 12
def test_dedupe_keeps_manual_over_stop_loss():
conn = _mem_conn()
payload = json.dumps({"symbol": "ONDO/USDT"}, ensure_ascii=False)
for snap_id, label in ((10, "止损"), (11, "手动平仓")):
conn.execute(
"""INSERT INTO strategy_trade_snapshots (
id, strategy_type, source_id, symbol, result_label, snapshot_json, closed_at, created_at, pnl_amount
) VALUES (?,?,?,?,?,?,?,?,?)""",
(
snap_id,
STRATEGY_TREND,
7,
"ONDO/USDT",
label,
payload,
"2026-06-08 08:44:00",
"2026-06-08 08:44:00",
-2.23,
),
)
conn.commit()
removed = dedupe_strategy_snapshots(conn)
conn.commit()
assert removed == 1
row = conn.execute(
"SELECT result_label FROM strategy_trade_snapshots WHERE source_id=?",
(7,),
).fetchone()
assert row["result_label"] == "手动平仓"
if __name__ == "__main__": if __name__ == "__main__":
test_save_trend_plan_snapshot_skips_duplicate_result() test_save_trend_plan_snapshot_skips_duplicate_result()
test_dedupe_strategy_snapshots_handles_many_duplicates() test_dedupe_strategy_snapshots_handles_many_duplicates()
test_dedupe_strategy_snapshots_keeps_latest_id() test_dedupe_strategy_snapshots_keeps_latest_id()
test_list_strategy_snapshots_hides_duplicate_keys() test_list_strategy_snapshots_hides_duplicate_keys()
test_dedupe_keeps_manual_over_stop_loss()
print("all ok") print("all ok")