diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index 60a2c7c..593f251 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -6902,6 +6902,30 @@ def stop_trend_pullback(pid): conn = get_db() row = conn.execute("SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,)).fetchone() if not row: + from strategy_trend_register import ( + _ensure_trend_plan_trade_record, + _trend_plan_trade_exists, + build_trend_config, + ) + + cfg = app.extensions.get("strategy_trend_cfg") or build_trend_config( + sys.modules[__name__] + ) + stopped = conn.execute( + "SELECT id FROM trend_pullback_plans WHERE id=? " + "AND status IN ('stopped_sl','stopped_tp','stopped_manual')", + (pid,), + ).fetchone() + if stopped and not _trend_plan_trade_exists(conn, pid): + try: + if _ensure_trend_plan_trade_record(cfg, conn, pid, prefer_label="手动平仓"): + conn.close() + flash("计划已结束,已补录缺失的交易记录") + return redirect(url_for("strategy_trend_page")) + except Exception as e: + conn.close() + flash(f"补录交易记录失败:{e}") + return redirect(url_for("strategy_trend_page")) conn.close() flash("未找到运行中的趋势回调计划") return redirect("/trade") diff --git a/hub_bridge.py b/hub_bridge.py index 29da12b..db1255c 100644 --- a/hub_bridge.py +++ b/hub_bridge.py @@ -587,6 +587,38 @@ def register_hub_routes(app): return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 return jsonify(_invoke_view_get("stop_trend_pullback", f"/stop_trend_pullback/{pid}")) + @app.route("/api/hub/trend/sync-flat", methods=["POST"]) + @_hub_auth_required + def api_hub_trend_sync_flat(): + """中控市价全平后:结束仍 active 的同币种同向趋势计划。""" + if not _ctx().get("has_trend"): + return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 + body = request.get_json(silent=True) or {} + symbol = (body.get("symbol") or request.form.get("symbol") or "").strip() + side = ( + body.get("side") + or body.get("direction") + or request.form.get("side") + or "" + ).strip().lower() + if not symbol: + return jsonify({"ok": False, "msg": "symbol 不能为空"}), 400 + if side not in ("long", "short"): + return jsonify({"ok": False, "msg": "side 须为 long 或 short"}), 400 + cfg = current_app.extensions.get("strategy_trend_cfg") + get_db = _ctx().get("get_db") + if not cfg or not callable(get_db): + return jsonify({"ok": False, "msg": "趋势配置未就绪"}), 500 + from strategy_trend_register import sync_trend_plans_after_external_close + + conn = get_db() + try: + return jsonify(sync_trend_plans_after_external_close(cfg, conn, symbol, side)) + except Exception as e: + return jsonify({"ok": False, "msg": str(e)}), 500 + finally: + conn.close() + @app.route("/api/hub/trend/breakeven/", methods=["POST"]) @_hub_auth_required def api_hub_trend_breakeven(pid): diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 53042e7..83c0e12 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -1586,6 +1586,17 @@ async def api_close_position(exchange_id: str, body: ClosePositionBody): "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } + if out.get("ok") and "trend" in (ex.get("capabilities") or []): + async with httpx.AsyncClient() as flask_client: + sync_parsed = await _fetch_flask_json( + flask_client, + ex, + "/api/hub/trend/sync-flat", + method="POST", + json_body={"symbol": sym, "side": side}, + ) + if isinstance(sync_parsed, dict): + out["trend_sync"] = sync_parsed _schedule_board_refresh() return out diff --git a/strategy_snapshot_lib.py b/strategy_snapshot_lib.py index 31b107e..caeb141 100644 --- a/strategy_snapshot_lib.py +++ b/strategy_snapshot_lib.py @@ -8,6 +8,13 @@ from typing import Any, Callable, Optional STRATEGY_TREND = "trend_pullback" STRATEGY_ROLL = "roll" STRATEGY_SNAPSHOTS_MAX_ROWS = 100 +# 同一趋势计划只允许一条「结束类」快照(中控全平 + 监控止损 + 实例结束计划) +FINAL_TREND_CLOSE_RANK = { + "手动平仓": 3, + "止盈": 2, + "止损": 1, +} +FINAL_TREND_CLOSE_LABELS = tuple(FINAL_TREND_CLOSE_RANK.keys()) STRATEGY_SNAPSHOTS_SQL = """ CREATE TABLE IF NOT EXISTS strategy_trade_snapshots ( @@ -134,9 +141,30 @@ def _snapshot_key_exists( return row is not None +def _final_trend_close_rank(result_label: str) -> int: + return int(FINAL_TREND_CLOSE_RANK.get((result_label or "").strip(), 0)) + + +def _purge_weaker_trend_final_snapshots( + conn, plan_id: int, result_label: str +) -> None: + """写入更高优先级结束快照时,删除同计划较弱的结束记录。""" + rank = _final_trend_close_rank(result_label) + if rank <= 0 or plan_id <= 0: + return + for label, lr in FINAL_TREND_CLOSE_RANK.items(): + if lr < rank: + conn.execute( + """DELETE FROM strategy_trade_snapshots + WHERE strategy_type=? AND source_id=? AND result_label=?""", + (STRATEGY_TREND, int(plan_id), label), + ) + + def dedupe_strategy_snapshots(conn) -> int: - """删除同源同结果的重复快照,仅保留每组最大 id。""" + """删除重复快照:同结果去重 + 同计划仅保留最高优先级结束类记录。""" init_strategy_snapshot_table(conn) + removed = 0 cur = conn.execute( """DELETE FROM strategy_trade_snapshots WHERE id IN ( @@ -148,7 +176,46 @@ def dedupe_strategy_snapshots(conn) -> int: AND s1.id < s2.id )""" ) - return int(getattr(cur, "rowcount", 0) or 0) + removed += int(getattr(cur, "rowcount", 0) or 0) + rows = conn.execute( + f"""SELECT id, source_id, result_label FROM strategy_trade_snapshots + WHERE strategy_type=? AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""", + (STRATEGY_TREND, *FINAL_TREND_CLOSE_LABELS), + ).fetchall() + by_plan: dict[int, list] = {} + for row in rows: + d = _row_dict(row) + try: + pid = int(d.get("source_id") or 0) + except (TypeError, ValueError): + pid = 0 + if pid <= 0: + continue + by_plan.setdefault(pid, []).append(d) + drop_ids: list[int] = [] + for snaps in by_plan.values(): + if len(snaps) <= 1: + continue + best = max( + snaps, + key=lambda s: ( + _final_trend_close_rank(str(s.get("result_label") or "")), + int(s.get("id") or 0), + ), + ) + keep_id = int(best.get("id") or 0) + for s in snaps: + sid = int(s.get("id") or 0) + if sid and sid != keep_id: + drop_ids.append(sid) + if drop_ids: + placeholders = ",".join("?" * len(drop_ids)) + cur2 = conn.execute( + f"DELETE FROM strategy_trade_snapshots WHERE id IN ({placeholders})", + drop_ids, + ) + removed += int(getattr(cur2, "rowcount", 0) or 0) + return removed def save_trend_plan_snapshot( @@ -167,7 +234,19 @@ def save_trend_plan_snapshot( if plan_id <= 0: return label = (result_label or "").strip() - if _snapshot_key_exists(conn, STRATEGY_TREND, plan_id, label): + close_rank = _final_trend_close_rank(label) + if close_rank > 0: + existing = conn.execute( + f"""SELECT result_label FROM strategy_trade_snapshots + WHERE strategy_type=? AND source_id=? AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""", + (STRATEGY_TREND, plan_id, *FINAL_TREND_CLOSE_LABELS), + ).fetchall() + for ex in existing: + ex_label = str(_row_dict(ex).get("result_label") or "") + if _final_trend_close_rank(ex_label) >= close_rank: + return + _purge_weaker_trend_final_snapshots(conn, plan_id, label) + elif _snapshot_key_exists(conn, STRATEGY_TREND, plan_id, label): return m = cfg.get("app_module") close_ts = (closed_at or "").strip() or ( @@ -413,14 +492,28 @@ def list_strategy_snapshots(conn, *, limit: int = 200) -> list[dict]: source_id = int(enriched.get("source_id") or 0) except (TypeError, ValueError): source_id = 0 - key = (st, source_id, (enriched.get("result_label") or "").strip()) + result_label = (enriched.get("result_label") or "").strip() + close_rank = _final_trend_close_rank(result_label) + if st == STRATEGY_TREND and source_id > 0 and close_rank > 0: + plan_key = (st, source_id) + snap_id = int(enriched.get("id") or 0) + prev = seen.get(plan_key) + if prev is not None: + prev_id, prev_rank = prev + if prev_rank > close_rank or (prev_rank == close_rank and prev_id >= snap_id): + continue + out = [x for x in out if int(x.get("id") or 0) != prev_id] + seen[plan_key] = (snap_id, close_rank) + out.append(enriched) + continue + key = (st, source_id, result_label) snap_id = int(enriched.get("id") or 0) prev = seen.get(key) - if prev is not None and prev >= snap_id: + if prev is not None and prev[0] >= snap_id: continue if prev is not None: - out = [x for x in out if int(x.get("id") or 0) != prev] - seen[key] = snap_id + out = [x for x in out if int(x.get("id") or 0) != prev[0]] + seen[key] = (snap_id, 0) out.append(enriched) return out diff --git a/strategy_trend_register.py b/strategy_trend_register.py index a054756..ba32346 100644 --- a/strategy_trend_register.py +++ b/strategy_trend_register.py @@ -657,6 +657,136 @@ def _call_insert_trade_record(m, plan_id: int, kwargs: dict) -> None: fn(**call) +def _best_trend_close_snapshot(conn, plan_id: int) -> dict | None: + from strategy_snapshot_lib import ( + FINAL_TREND_CLOSE_LABELS, + STRATEGY_TREND, + _final_trend_close_rank, + ) + + rows = conn.execute( + f"""SELECT * FROM strategy_trade_snapshots + WHERE strategy_type=? AND source_id=? + AND result_label IN ({",".join("?" * len(FINAL_TREND_CLOSE_LABELS))})""", + (STRATEGY_TREND, int(plan_id), *FINAL_TREND_CLOSE_LABELS), + ).fetchall() + if not rows: + return None + parsed = [_row_dict(row) for row in rows] + return max( + parsed, + key=lambda d: ( + _final_trend_close_rank(str(d.get("result_label") or "")), + int(d.get("id") or 0), + ), + ) + + +def _ensure_trend_plan_trade_record( + cfg: dict, conn, plan_id: int, *, prefer_label: str = "手动平仓" +) -> bool: + """计划已结束但 trade_records 缺失时,从策略快照补录一条。""" + if _trend_plan_trade_exists(conn, plan_id): + return True + m = _m(cfg) + plan = conn.execute( + "SELECT * FROM trend_pullback_plans WHERE id=?", (int(plan_id),) + ).fetchone() + if not plan: + return False + plan_d = _row_dict(plan) + snap = _best_trend_close_snapshot(conn, plan_id) + if not snap: + return False + try: + payload = json.loads(snap.get("snapshot_json") or "{}") + except Exception: + payload = {} + sym = snap.get("symbol") or plan_d.get("symbol") or payload.get("symbol") + direction = snap.get("direction") or plan_d.get("direction") or "long" + result = (prefer_label or "").strip() or (snap.get("result_label") or "").strip() or "手动平仓" + opened_at = snap.get("opened_at") or plan_d.get("opened_at") + closed_at = snap.get("closed_at") + pnl_amount = snap.get("pnl_amount") + if pnl_amount is None: + pnl_amount = payload.get("pnl_amount") + avg_e = float(payload.get("avg_entry_price") or plan_d.get("avg_entry_price") or 0) + margin_cap = trend_effective_margin_capital(plan_d) + lev = int(plan_d.get("leverage") or 1) + hold_seconds = m.calc_hold_seconds( + opened_at or "", + m.parse_dt_for_trading_day(closed_at) or m.app_now(), + ) + res = m.normalize_result_with_pnl(result, float(pnl_amount or 0)) + risk_amt = m.calc_risk_amount_from_plan( + direction, + float(plan_d.get("add_upper") or 0), + float(plan_d.get("stop_loss") or 0), + float(plan_d.get("plan_margin_capital") or 0), + lev, + ) + planned_rr = m.calc_rr_ratio( + direction, + avg_e, + float(plan_d.get("stop_loss") or 0), + float(plan_d.get("take_profit") or 0), + ) + session_date = plan_d.get("session_date") or m.get_trading_day() + _bump_session_capital_no_commit(m, conn, session_date, float(pnl_amount or 0)) + _call_insert_trade_record( + m, + plan_id, + dict( + conn=conn, + symbol=sym, + monitor_type=MONITOR_TYPE_TREND, + direction=direction, + trigger_price=avg_e, + stop_loss=float(plan_d.get("stop_loss") or 0), + initial_stop_loss=float(plan_d.get("initial_stop_loss") or plan_d.get("stop_loss") or 0), + take_profit=float(plan_d.get("take_profit") or 0), + margin_capital=margin_cap, + leverage=lev, + pnl_amount=pnl_amount, + hold_seconds=hold_seconds, + trade_style="trend_pullback", + risk_amount=risk_amt, + planned_rr=planned_rr, + actual_rr=m.calc_actual_rr(pnl_amount, risk_amt), + result=res, + opened_at=opened_at, + closed_at=closed_at, + entry_reason=ENTRY_REASON_TREND_PULLBACK, + ), + ) + conn.commit() + return True + + +def sync_trend_plans_after_external_close( + cfg: dict, conn, symbol: str, direction: str +) -> dict[str, Any]: + """中控/外部全平后:结束仍 active 的同币种同向趋势计划(避免监控再记一条止损)。""" + m = _m(cfg) + sym = m.normalize_symbol_input(symbol) if hasattr(m, "normalize_symbol_input") else (symbol or "").strip() + if not sym: + return {"ok": False, "msg": "symbol 无效", "finalized": 0} + direction = (direction or "long").strip().lower() + rows = conn.execute( + "SELECT * FROM trend_pullback_plans WHERE status='active' AND symbol=? AND direction=?", + (sym, direction), + ).fetchall() + finalized = 0 + for row in rows: + px = m.get_price(row["symbol"]) + exit_p = float(px) if px is not None else 0.0 + before = _trend_plan_trade_exists(conn, int(row["id"])) + _finalize_plan(cfg, conn, row, "手动平仓", exit_p) + if not before: + finalized += 1 + return {"ok": True, "finalized": finalized, "symbol": sym, "direction": direction} + + def _trend_plan_trade_exists(conn, plan_id: int) -> bool: try: return conn.execute( @@ -1664,6 +1794,21 @@ def register_trend_routes(app: Flask, cfg: dict) -> None: "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,) ).fetchone() if not row: + stopped = conn.execute( + "SELECT * FROM trend_pullback_plans WHERE id=? " + "AND status IN ('stopped_sl','stopped_tp','stopped_manual')", + (pid,), + ).fetchone() + if stopped and not _trend_plan_trade_exists(conn, pid): + try: + if _ensure_trend_plan_trade_record(cfg, conn, pid, prefer_label="手动平仓"): + conn.close() + flash("计划已结束,已补录缺失的交易记录") + return _redirect_trend() + except Exception as e: + conn.close() + flash(f"补录交易记录失败:{e}") + return _redirect_trend() conn.close() flash("未找到运行中的趋势回调计划") return _redirect_trend() diff --git a/tests/test_strategy_snapshot_dedup.py b/tests/test_strategy_snapshot_dedup.py index ef1282c..6395159 100644 --- a/tests/test_strategy_snapshot_dedup.py +++ b/tests/test_strategy_snapshot_dedup.py @@ -143,9 +143,41 @@ def test_list_strategy_snapshots_hides_duplicate_keys(): assert int(stop_rows[0]["id"]) == 12 +def test_dedupe_keeps_manual_over_stop_loss(): + conn = _mem_conn() + payload = json.dumps({"symbol": "ONDO/USDT"}, ensure_ascii=False) + for snap_id, label in ((10, "止损"), (11, "手动平仓")): + conn.execute( + """INSERT INTO strategy_trade_snapshots ( + id, strategy_type, source_id, symbol, result_label, snapshot_json, closed_at, created_at, pnl_amount + ) VALUES (?,?,?,?,?,?,?,?,?)""", + ( + snap_id, + STRATEGY_TREND, + 7, + "ONDO/USDT", + label, + payload, + "2026-06-08 08:44:00", + "2026-06-08 08:44:00", + -2.23, + ), + ) + conn.commit() + removed = dedupe_strategy_snapshots(conn) + conn.commit() + assert removed == 1 + row = conn.execute( + "SELECT result_label FROM strategy_trade_snapshots WHERE source_id=?", + (7,), + ).fetchone() + assert row["result_label"] == "手动平仓" + + if __name__ == "__main__": test_save_trend_plan_snapshot_skips_duplicate_result() test_dedupe_strategy_snapshots_handles_many_duplicates() test_dedupe_strategy_snapshots_keeps_latest_id() test_list_strategy_snapshots_hides_duplicate_keys() + test_dedupe_keeps_manual_over_stop_loss() print("all ok")