"""趋势回调:路由、轮询、页面数据(四所共用,依赖各 app 模块交易所能力)。""" from __future__ import annotations import inspect import json import os import time import uuid from typing import Any, Optional from flask import Flask, flash, redirect, request, url_for from jinja2 import ChoiceLoader, FileSystemLoader from strategy_config import resolve_trading_app_module from strategy_db import init_strategy_tables from strategy_trend_exchange import ( cancel_symbol_orders, trend_market_add, trend_market_close, trend_refresh_stop_only, ) from strategy_trend_lib import ( build_grid_prices, build_leg_amounts_json, calc_risk_fraction, validate_trend_bounds, ) from strategy_trade_labels import ( ENTRY_REASON_TREND_PULLBACK, MONITOR_TYPE_TREND_PULLBACK, ) MONITOR_TYPE_TREND = MONITOR_TYPE_TREND_PULLBACK # 趋势回调:交易所报空仓需连续 N 次轮询确认,避免 OKX 等 API 瞬时误判立即结束计划 _TREND_FLAT_STREAK: dict[int, int] = {} TREND_FLAT_CONFIRM_POLLS = max(1, int(os.getenv("TREND_FLAT_CONFIRM_POLLS", "5"))) TREND_OPEN_GRACE_SEC = max(0, int(os.getenv("TREND_OPEN_GRACE_SEC", "180"))) def trend_add_zone_label(direction: str) -> str: return "补仓下沿" if (direction or "long").strip().lower() == "short" else "补仓上沿" def install_strategy_trend(app: Flask, repo_root: str, app_module: Any = None, **build_kw) -> dict: from strategy_register import attach_strategy_templates attach_strategy_templates(app, repo_root) cfg = build_trend_config(app_module, **build_kw) app.extensions["strategy_trend_cfg"] = cfg register_trend_routes(app, cfg) @app.context_processor def _trend_ctx(): return {"trend_add_zone_label": trend_add_zone_label} return cfg def build_trend_config(app_module: Any = None, **kw) -> dict[str, Any]: m = resolve_trading_app_module(app_module) dca = max(1, int(os.getenv("TREND_PULLBACK_DCA_LEGS", kw.get("dca_legs", "5")))) preview_ttl = max(10, int(os.getenv("TREND_PULLBACK_PREVIEW_TTL_SECONDS", "120"))) drift = float(os.getenv("TREND_PREVIEW_MAX_BALANCE_DRIFT_PCT", "5")) be_pct = float(os.getenv("TREND_PULLBACK_MANUAL_BREAKEVEN_OFFSET_PCT", "0.3")) buf = float(getattr(m, "FULL_MARGIN_BUFFER_RATIO", 0.95)) def amount_precise(ex_sym, amt): fn = getattr(m, "_safe_amount_to_precision", None) if callable(fn): return fn(ex_sym, amt) try: m.ensure_markets_loaded() return float(m.exchange.amount_to_precision(ex_sym, float(amt))) except Exception: return None return { "app_module": m, "login_required": m.login_required, "get_db": m.get_db, "row_to_dict": m.row_to_dict, "dca_legs": dca, "preview_ttl": preview_ttl, "drift_pct": drift, "breakeven_offset_pct": be_pct, "margin_buffer": buf, "amount_precise": amount_precise, "max_active_positions": int(getattr(m, "MAX_ACTIVE_POSITIONS", 1)), "reset_hour": int(getattr(m, "TRADING_DAY_RESET_HOUR", 8)), "monitor_type_trend": MONITOR_TYPE_TREND, } def _m(cfg: dict): return cfg["app_module"] def _row(cfg, row) -> dict: return cfg["row_to_dict"](row) def precheck_trend_start(cfg: dict, conn) -> tuple[bool, str]: m = _m(cfg) now = m.app_now() if not m.trading_day_reset_allows_new_open(now): return False, f"北京时间 {cfg['reset_hour']}:00 前不允许持仓" active = m.get_active_position_count(conn) if active >= cfg["max_active_positions"]: return ( False, f"已达最大持仓数({active}/{cfg['max_active_positions']})," "请先结束「实盘下单」中的持仓,再启动趋势回调", ) trend_n = conn.execute( "SELECT COUNT(*) FROM trend_pullback_plans WHERE status='active'" ).fetchone()[0] if int(trend_n or 0) > 0: return False, "已存在运行中的趋势回调计划" return True, "" def _cleanup_stale_previews(conn) -> None: ms = int(time.time() * 1000) stale = conn.execute( "SELECT id FROM trend_pullback_previews WHERE expires_at_ms < ?", (ms,) ).fetchall() for row in stale: try: conn.execute( "UPDATE trend_pullback_preview_snapshots SET outcome='expired' " "WHERE preview_id=? AND outcome='open'", (row["id"],), ) except Exception: pass conn.execute("DELETE FROM trend_pullback_previews WHERE expires_at_ms < ?", (ms,)) def parse_trend_plan(cfg: dict, form_dict) -> tuple[Optional[dict], Optional[str]]: m = _m(cfg) d = form_dict or {} symbol = m.normalize_symbol_input(d.get("symbol")) if not symbol: return None, "symbol 不能为空" direction = (d.get("direction") or "long").strip().lower() if direction not in ("long", "short"): return None, "方向错误" try: stop_loss = float(d.get("sl")) add_upper = float(d.get("add_upper")) take_profit = float(d.get("take_profit")) risk_percent = float(d.get("risk_percent") or "5") except Exception: return None, "价格或风险比例格式错误" try: lev_raw = m.parse_positive_float(d.get("leverage")) leverage = int(lev_raw) if lev_raw is not None else m.infer_leverage(symbol) except Exception: return None, "杠杆格式错误" if leverage <= 0 or risk_percent <= 0: return None, "杠杆与风险比例必须大于0" bound_err = validate_trend_bounds(direction, stop_loss, add_upper) if bound_err: return None, bound_err snap = m.get_available_trading_usdt() if snap is None or snap <= 0: return None, "无法读取合约账户 USDT 可用余额,请检查 API 与账户类型" live_price = m.get_price(symbol) if live_price is None: return None, "获取实时价格失败" exchange_symbol = m.normalize_exchange_symbol(symbol) rf = calc_risk_fraction(direction, add_upper, stop_loss) if rf is None or rf <= 0: return None, "止损与补仓区间边界组合无法计算风险比例" risk_budget = float(snap) * (risk_percent / 100.0) notional = risk_budget / rf margin_plan = notional / float(leverage) margin_plan = min(margin_plan, float(snap) * cfg["margin_buffer"]) if margin_plan <= 0: return None, "计划保证金过小" try: target_amt, _ = m.prepare_order_amount(exchange_symbol, margin_plan, leverage, live_price) except Exception as e: return None, str(e) ap = cfg["amount_precise"] first_amt = ap(exchange_symbol, float(target_amt) * 0.5) if first_amt is None or first_amt <= 0: return None, "首仓张数过小(低于交易所最小张数),请提高风险比例或杠杆" remainder_total = ap(exchange_symbol, max(0.0, float(target_amt) - float(first_amt))) if remainder_total is None: remainder_total = 0.0 m.ensure_markets_loaded() market = m.exchange.market(exchange_symbol) min_amt = float((market.get("limits", {}).get("amount", {}) or {}).get("min") or 0) n_legs, leg_json, per_ref = build_leg_amounts_json( exchange_symbol, remainder_total, cfg["dca_legs"], ap, min_amt ) if n_legs <= 0: return None, "剩余计划张数不足以拆出补仓档,请提高风险比例或放宽止损与补仓区间间距" grid = build_grid_prices(direction, stop_loss, add_upper, n_legs) if len(grid) != n_legs: return None, "补仓网格生成失败" opened_at = m.app_now_str() try: leg_list = json.loads(leg_json) except Exception: leg_list = [] return { "symbol": symbol, "exchange_symbol": exchange_symbol, "direction": direction, "leverage": leverage, "stop_loss": stop_loss, "add_upper": add_upper, "take_profit": take_profit, "risk_percent": risk_percent, "snapshot_available_usdt": float(snap), "snapshot_at": opened_at, "live_price_ref": float(live_price), "plan_margin_capital": float(margin_plan), "target_order_amount": float(target_amt), "first_order_amount": float(first_amt), "remainder_total": float(remainder_total), "dca_legs": int(n_legs), "per_leg_amount": float(per_ref), "grid_prices_json": json.dumps(grid), "leg_amounts_json": leg_json, "grid": grid, "leg_amounts": leg_list, }, None def _insert_preview_snapshot(conn, preview_id: str, created: str, exp_ms: int, pl: dict) -> None: conn.execute( """INSERT INTO trend_pullback_preview_snapshots ( preview_id,symbol,exchange_symbol,direction,leverage,stop_loss,add_upper,take_profit,risk_percent, snapshot_available_usdt,snapshot_at,live_price_ref,plan_margin_capital,target_order_amount,first_order_amount,remainder_total, dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,expires_at_ms,preview_created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", ( preview_id, pl["symbol"], pl["exchange_symbol"], pl["direction"], pl["leverage"], pl["stop_loss"], pl["add_upper"], pl["take_profit"], pl["risk_percent"], pl["snapshot_available_usdt"], pl["snapshot_at"], pl["live_price_ref"], pl["plan_margin_capital"], pl["target_order_amount"], pl["first_order_amount"], pl["remainder_total"], pl["dca_legs"], pl["per_leg_amount"], pl["grid_prices_json"], pl["leg_amounts_json"], exp_ms, created, ), ) def enrich_trend_plan(cfg: dict, row) -> dict: m = _m(cfg) d = _row(cfg, row) try: d["breakeven_applied"] = int(d.get("breakeven_applied") or 0) != 0 except Exception: d["breakeven_applied"] = False ex_sym = d.get("exchange_symbol") or m.normalize_exchange_symbol(d.get("symbol") or "") direction = (d.get("direction") or "long").lower() metrics_fn = getattr(m, "get_live_position_exchange_metrics", None) if callable(metrics_fn): met = metrics_fn(ex_sym, direction) if met and met.get("unrealized_pnl") is not None: d["floating_pnl"] = float(met["unrealized_pnl"]) else: d["floating_pnl"] = None if met and met.get("mark_price") is not None: d["floating_mark"] = float(met["mark_price"]) else: d["floating_mark"] = None else: d["floating_pnl"] = d["floating_mark"] = None return d def _weighted_avg(old_avg, old_amt, fill_px, add_amt): try: oa, aa = float(old_amt), float(add_amt) if oa <= 0: return float(fill_px) return (float(old_avg) * oa + float(fill_px) * aa) / (oa + aa) except Exception: return float(fill_px or 0) def _plan_stop_status(result_label: str) -> str: if result_label == "止盈": return "stopped_tp" if result_label == "止损": return "stopped_sl" return "stopped_manual" def _trend_plan_trade_exists(conn, plan_id: int) -> bool: try: return conn.execute( "SELECT id FROM trade_records WHERE trend_plan_id=? LIMIT 1", (int(plan_id),), ).fetchone() is not None except Exception: return False def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) -> None: m = _m(cfg) plan_id = int(row["id"]) active = conn.execute( "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (plan_id,), ).fetchone() if not active: return row = active sym = row["symbol"] direction = row["direction"] or "long" ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(sym) closed_at = m.app_now_str() opened_at = row["opened_at"] or closed_at hold_seconds = m.calc_hold_seconds(opened_at, m.parse_dt_for_trading_day(closed_at) or m.app_now()) margin_cap = float(row["plan_margin_capital"] or 0) lev = int(row["leverage"] or 1) avg_e = float(row["avg_entry_price"] or 0) pnl_amount = m.calc_pnl(direction, avg_e, float(exit_price), margin_cap, lev) res = m.normalize_result_with_pnl(result_label, pnl_amount) risk_amt = m.calc_risk_amount_from_plan( direction, float(row["add_upper"]), float(row["stop_loss"]), margin_cap, lev ) planned_rr = m.calc_rr_ratio(direction, avg_e, float(row["stop_loss"]), float(row["take_profit"])) try: cancel_symbol_orders(cfg, ex_sym) except Exception: pass st = _plan_stop_status(result_label) cur = conn.execute( "UPDATE trend_pullback_plans SET status=?, message=? WHERE id=? AND status='active'", (st, res, plan_id), ) if not getattr(cur, "rowcount", 0): return conn.commit() if _trend_plan_trade_exists(conn, plan_id): return session_date = row["session_date"] or m.get_trading_day() session_capital = m.update_session_capital(conn, session_date, pnl_amount) kwargs = dict( conn=conn, symbol=sym, monitor_type=MONITOR_TYPE_TREND, direction=direction, trigger_price=avg_e, stop_loss=float(row["stop_loss"]), initial_stop_loss=float(row.get("initial_stop_loss") or row["stop_loss"]), take_profit=float(row["take_profit"]), margin_capital=margin_cap, leverage=lev, pnl_amount=pnl_amount, hold_seconds=hold_seconds, trade_style="trend_pullback", risk_amount=risk_amt, planned_rr=planned_rr, actual_rr=m.calc_actual_rr(pnl_amount, risk_amt), result=res, opened_at=opened_at, closed_at=closed_at, entry_reason=ENTRY_REASON_TREND_PULLBACK, ) if "trend_plan_id" in inspect.signature(m.insert_trade_record).parameters: m.insert_trade_record(**kwargs, trend_plan_id=plan_id) else: m.insert_trade_record(**kwargs) extra = getattr(m, "build_wechat_close_message", None) send = getattr(m, "send_wechat_msg", None) if callable(extra) and callable(send): send( extra( symbol=sym, direction=direction, result=f"{res}({MONITOR_TYPE_TREND})", pnl_amount=pnl_amount, hold_seconds=hold_seconds, trigger_price=avg_e, current_price=float(exit_price), stop_loss=float(row["stop_loss"]), take_profit=float(row["take_profit"]), close_order_id="-", extra_note="计划本金口径:启动时合约可用余额快照;止盈由程序监控", session_capital_fallback=session_capital, ) ) conn.commit() def _trend_plan_open_age_sec(row, m) -> float: opened_ms = None try: if "opened_at_ms" in row.keys() and row["opened_at_ms"]: opened_ms = int(row["opened_at_ms"]) except Exception: opened_ms = None to_ms = getattr(m, "_to_ms_with_fallback", None) if callable(to_ms): opened_ms = to_ms(opened_ms, row["opened_at"] if "opened_at" in row.keys() else None) if opened_ms is None and "opened_at" in row.keys(): opened_ms = to_ms(None, row["opened_at"]) if not opened_ms: return 0.0 return max(0.0, (time.time() * 1000 - opened_ms) / 1000.0) def _trend_hit_take_profit(direction: str, mark_price: float, take_profit: float, avg_entry: float) -> bool: try: pf = float(mark_price) tp = float(take_profit) entry = float(avg_entry) except (TypeError, ValueError): return False if entry <= 0 or tp <= 0: return False direction = (direction or "long").lower() if direction == "long": return tp > entry and pf >= tp return tp < entry and pf <= tp def _should_finalize_trend_flat(row, pos, plan_id: int, m) -> bool: """首仓后交易所报无仓:需过开仓宽限期 + 连续空仓轮询,避免误判止损。""" if pos is None: return False if float(pos) > 0: _TREND_FLAT_STREAK.pop(plan_id, None) return False if not int(row["first_order_done"] or 0): return False age = _trend_plan_open_age_sec(row, m) if age < TREND_OPEN_GRACE_SEC: _TREND_FLAT_STREAK.pop(plan_id, None) return False try: local_open = float(row["order_amount_open"] or 0) except (TypeError, ValueError): local_open = 0.0 required = TREND_FLAT_CONFIRM_POLLS if local_open > 0 and age < TREND_OPEN_GRACE_SEC * 2: required = max(required, TREND_FLAT_CONFIRM_POLLS * 2) streak = int(_TREND_FLAT_STREAK.get(plan_id, 0)) + 1 _TREND_FLAT_STREAK[plan_id] = streak if streak >= required: print( f"[trend_pullback] flat finalize plan={plan_id} sym={row['symbol']} " f"age={age:.0f}s streak={streak} local_open={local_open}", flush=True, ) return True return False def check_trend_pullback_plans(cfg: dict) -> None: m = _m(cfg) ok_live, _ = m.ensure_exchange_live_ready() if not ok_live: return conn = cfg["get_db"]() rows = conn.execute( "SELECT * FROM trend_pullback_plans WHERE status='active'" ).fetchall() for row in rows: try: plan_id = int(row["id"]) sym = row["symbol"] direction = (row["direction"] or "long").lower() ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(sym) sl = float(row["stop_loss"]) tp = float(row["take_profit"]) lev = int(row["leverage"] or 1) p = m.get_price(sym) if not p: continue pf = float(p) last_p = row["last_mark_price"] last_pf = float(last_p) if last_p is not None else pf pos = m.get_live_position_contracts(ex_sym, direction) if pos is None: continue try: local_open = float(row["order_amount_open"] or 0) except (TypeError, ValueError): local_open = 0.0 if float(pos) <= 0 and local_open > 0: age = _trend_plan_open_age_sec(row, m) if age < TREND_OPEN_GRACE_SEC * 2: print( f"[trend_pullback] pos fallback plan={plan_id} sym={sym} " f"ex_pos=0 local_open={local_open} age={age:.0f}s", flush=True, ) pos = local_open legs_done = int(row["legs_done"] or 0) try: leg_amounts = [float(x) for x in json.loads(row["leg_amounts_json"] or "[]")] except Exception: leg_amounts = [] try: grid = json.loads(row["grid_prices_json"] or "[]") except Exception: grid = [] avg_e = float(row["avg_entry_price"] or pf or 0) hit_tp = _trend_hit_take_profit(direction, pf, tp, avg_e) if hit_tp and pos > 0: try: close_resp = trend_market_close(cfg, ex_sym, direction, float(pos), lev) exit_p = m.extract_trade_price_from_order(close_resp) or pf except Exception as e: if not m.is_no_position_error(str(e)): continue exit_p = pf _finalize_plan(cfg, conn, row, "止盈", exit_p) _TREND_FLAT_STREAK.pop(plan_id, None) continue if _should_finalize_trend_flat(row, pos, plan_id, m): _finalize_plan(cfg, conn, row, "止损", pf) _TREND_FLAT_STREAK.pop(plan_id, None) continue if int(row["first_order_done"] or 0) and legs_done < len(grid) and legs_done < len(leg_amounts): level = float(grid[legs_done]) fired = False if direction == "long": fired = last_pf > level and pf <= level else: fired = last_pf < level and pf >= level if fired: amt = float(m.exchange.amount_to_precision(ex_sym, leg_amounts[legs_done])) if amt > 0: add_resp = trend_market_add(cfg, ex_sym, direction, amt, lev) fill_px = m.extract_trade_price_from_order(add_resp) or pf old_avg = float(row["avg_entry_price"] or fill_px) old_open = float(row["order_amount_open"] or 0) new_avg = _weighted_avg(old_avg, old_open, fill_px, amt) conn.execute( "UPDATE trend_pullback_plans SET legs_done=?, avg_entry_price=?, " "order_amount_open=?, last_mark_price=? WHERE id=?", (legs_done + 1, new_avg, old_open + amt, pf, row["id"]), ) row = conn.execute( "SELECT * FROM trend_pullback_plans WHERE id=?", (row["id"],) ).fetchone() try: trend_refresh_stop_only(cfg, ex_sym, direction, sl) except Exception: pass conn.execute( "UPDATE trend_pullback_plans SET last_mark_price=? WHERE id=?", (pf, row["id"]), ) except Exception: continue conn.commit() conn.close() def apply_manual_breakeven(cfg: dict, conn, row, offset_pct=None) -> tuple[bool, Optional[str]]: m = _m(cfg) if (row["status"] or "").strip() != "active": return False, "计划已结束" if not int(row["first_order_done"] or 0): return False, "尚未完成首仓,无法保本" avg_e = float(row["avg_entry_price"] or 0) if avg_e <= 0: return False, "缺少有效持仓均价" direction = (row["direction"] or "long").lower() ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(row["symbol"]) pos = m.get_live_position_contracts(ex_sym, direction) if pos is None or float(pos) <= 0: return False, "交易所当前无该方向持仓" be_fn = getattr(m, "calc_trend_manual_breakeven_stop", None) if not callable(be_fn): pct = float(offset_pct if offset_pct is not None else cfg["breakeven_offset_pct"]) if direction == "short": new_sl_raw = avg_e * (1.0 - pct / 100.0) else: new_sl_raw = avg_e * (1.0 + pct / 100.0) else: new_sl_raw = be_fn(direction, avg_e, offset_pct) if new_sl_raw is None: return False, "保本价计算失败" new_sl = m.round_price_to_exchange(ex_sym, new_sl_raw) if new_sl is None: return False, "保本价经交易所精度舍入后无效" new_sl = float(new_sl) cur_sl = float(row["stop_loss"] or 0) if direction == "long": if new_sl <= cur_sl: return False, f"新止损 {new_sl} 未高于当前止损 {cur_sl}(多仓需上移)" else: if new_sl >= cur_sl: return False, f"新止损 {new_sl} 未低于当前止损 {cur_sl}(空仓需下移)" try: trend_refresh_stop_only(cfg, ex_sym, direction, new_sl) except Exception as e: fe = getattr(m, "friendly_exchange_error", None) return False, fe(e) if callable(fe) else str(e) conn.execute( "UPDATE trend_pullback_plans SET stop_loss=?, breakeven_applied=1, breakeven_applied_at=? WHERE id=?", (new_sl, m.app_now_str(), row["id"]), ) return True, None def load_trend_page_context(conn, request_obj, cfg: dict) -> dict[str, Any]: m = _m(cfg) _cleanup_stale_previews(conn) trend_active = int( conn.execute( "SELECT COUNT(*) FROM trend_pullback_plans WHERE status='active'" ).fetchone()[0] or 0 ) trend_plans = [] for r in conn.execute( "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC" ).fetchall(): try: trend_plans.append(enrich_trend_plan(cfg, r)) except Exception: trend_plans.append(_row(cfg, r)) now = m.app_now() active_count = m.get_active_position_count(conn) can_trade_trend = ( m.trading_day_reset_allows_new_open(now) and active_count < cfg["max_active_positions"] and trend_active == 0 ) trend_preview = None trend_preview_levels = [] preview_expires_ms = None trend_preview_expired = False pid_arg = (request_obj.args.get("preview_id") or "").strip() if pid_arg: pr = conn.execute( "SELECT * FROM trend_pullback_previews WHERE id=?", (pid_arg,) ).fetchone() now_ms = int(time.time() * 1000) if pr and int(pr["expires_at_ms"] or 0) >= now_ms: trend_preview = _row(cfg, pr) preview_expires_ms = int(pr["expires_at_ms"]) try: grid = json.loads(trend_preview.get("grid_prices_json") or "[]") legs = json.loads(trend_preview.get("leg_amounts_json") or "[]") except Exception: grid, legs = [], [] for i, pair in enumerate(zip(grid, legs), 1): trend_preview_levels.append({"i": i, "price": pair[0], "contracts": pair[1]}) elif pr: trend_preview_expired = True return { "trend_plans": trend_plans, "trend_active": trend_active, "can_trade_trend": can_trade_trend, "trend_preview": trend_preview, "trend_preview_levels": trend_preview_levels, "preview_expires_ms": preview_expires_ms, "trend_preview_expired": trend_preview_expired, "trend_pullback_dca_legs": cfg["dca_legs"], "trend_pullback_preview_ttl": cfg["preview_ttl"], "trend_preview_max_drift_pct": cfg["drift_pct"], "trend_manual_breakeven_offset_pct": cfg["breakeven_offset_pct"], } def register_trend_routes(app: Flask, cfg: dict) -> None: lr = cfg["login_required"] get_db = cfg["get_db"] def _redirect_trend(**kw): return redirect(url_for("strategy_trading_page", **kw)) @app.route("/preview_trend_pullback", methods=["POST"]) @lr def preview_trend_pullback(): conn = get_db() init_strategy_tables(conn) okp, msg = precheck_trend_start(cfg, conn) if not okp: conn.close() flash(msg) return _redirect_trend() m = _m(cfg) ok_live, reason = m.ensure_exchange_live_ready() if not ok_live: conn.close() flash(reason) return _redirect_trend() payload, err = parse_trend_plan(cfg, request.form) if err: conn.close() flash(err) return _redirect_trend() pid = str(uuid.uuid4()) exp_ms = int(time.time() * 1000) + cfg["preview_ttl"] * 1000 created = m.app_now_str() conn.execute( """INSERT INTO trend_pullback_previews ( id,symbol,exchange_symbol,direction,leverage,stop_loss,add_upper,take_profit,risk_percent, snapshot_available_usdt,snapshot_at,live_price_ref,plan_margin_capital,target_order_amount,first_order_amount,remainder_total, dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,expires_at_ms,created_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", ( pid, payload["symbol"], payload["exchange_symbol"], payload["direction"], payload["leverage"], payload["stop_loss"], payload["add_upper"], payload["take_profit"], payload["risk_percent"], payload["snapshot_available_usdt"], payload["snapshot_at"], payload["live_price_ref"], payload["plan_margin_capital"], payload["target_order_amount"], payload["first_order_amount"], payload["remainder_total"], payload["dca_legs"], payload["per_leg_amount"], payload["grid_prices_json"], payload["leg_amounts_json"], exp_ms, created, ), ) _insert_preview_snapshot(conn, pid, created, exp_ms, payload) conn.commit() conn.close() flash(f"预览已生成,有效期 {cfg['preview_ttl']} 秒,请核对后点击「确认执行」。") return _redirect_trend(preview_id=pid) @app.route("/execute_trend_pullback", methods=["POST"]) @lr def execute_trend_pullback(): pid = (request.form.get("preview_id") or "").strip() if not pid: flash("缺少预览 ID") return _redirect_trend() conn = get_db() init_strategy_tables(conn) _cleanup_stale_previews(conn) pr = conn.execute( "SELECT * FROM trend_pullback_previews WHERE id=?", (pid,) ).fetchone() now_ms = int(time.time() * 1000) if not pr or int(pr["expires_at_ms"] or 0) < now_ms: conn.close() flash("预览已过期或不存在,请重新生成预览") return _redirect_trend() okp, msg = precheck_trend_start(cfg, conn) if not okp: conn.close() flash(msg) return _redirect_trend(preview_id=pid) m = _m(cfg) ok_live, reason = m.ensure_exchange_live_ready() if not ok_live: conn.close() flash(reason) return _redirect_trend(preview_id=pid) snap_prev = float(pr["snapshot_available_usdt"] or 0) snap_now = m.get_available_trading_usdt() if snap_now is None or snap_now <= 0: conn.close() flash("无法读取当前合约可用余额,请稍后重试") return _redirect_trend(preview_id=pid) drift = abs(float(snap_now) - snap_prev) / max(snap_prev, 1e-9) * 100.0 if drift > cfg["drift_pct"]: conn.close() flash( f"当前可用余额与预览快照偏差 {drift:.2f}%,超过允许 {cfg['drift_pct']}%,请重新生成预览" ) return _redirect_trend(preview_id=pid) symbol = pr["symbol"] exchange_symbol = pr["exchange_symbol"] direction = pr["direction"] or "long" leverage = int(pr["leverage"] or 1) stop_loss = float(pr["stop_loss"]) first_amt = float(pr["first_order_amount"] or 0) live_price = m.get_price(symbol) if live_price is None: conn.close() flash("获取实时价格失败") return _redirect_trend(preview_id=pid) try: o1 = m.place_exchange_order( exchange_symbol, direction, first_amt, leverage, stop_loss=None, take_profit=None ) fill1 = m.resolve_order_entry_price(o1, exchange_symbol, live_price) trend_refresh_stop_only(cfg, exchange_symbol, direction, stop_loss) except Exception as e: conn.close() fe = getattr(m, "friendly_exchange_error", lambda x, **k: str(x)) flash(fe(e, available_usdt=snap_now)) return _redirect_trend(preview_id=pid) trading_day = m.get_trading_day(m.app_now()) opened_at = m.app_now_str() opened_ms = getattr(m, "_to_ms_with_fallback", lambda a, b: None)(None, opened_at) cur = conn.execute( """INSERT INTO trend_pullback_plans ( status,symbol,exchange_symbol,direction,leverage,stop_loss,initial_stop_loss,add_upper,take_profit,risk_percent, snapshot_available_usdt,snapshot_at,plan_margin_capital,target_order_amount,first_order_amount,remainder_total, dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,legs_done,first_order_done,last_mark_price,avg_entry_price,order_amount_open,opened_at,opened_at_ms,session_date,message ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", ( "active", symbol, exchange_symbol, direction, leverage, stop_loss, stop_loss, float(pr["add_upper"]), float(pr["take_profit"]), float(pr["risk_percent"] or 5), float(snap_now), opened_at, float(pr["plan_margin_capital"] or 0), float(pr["target_order_amount"] or 0), first_amt, float(pr["remainder_total"] or 0), int(pr["dca_legs"] or 0), float(pr["per_leg_amount"] or 0), pr["grid_prices_json"] or "[]", pr["leg_amounts_json"] or "[]", 0, 1, float(live_price), fill1, first_amt, opened_at, opened_ms, trading_day, f"预览ID:{pid[:8]}…", ), ) new_id = int(cur.lastrowid) conn.execute( "UPDATE trend_pullback_preview_snapshots SET outcome='executed', executed_plan_id=? WHERE preview_id=?", (new_id, pid), ) conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,)) conn.commit() conn.close() flash("趋势回调已执行:首仓已成交并挂交易所止损,止盈由程序监控。") return _redirect_trend() @app.route("/cancel_trend_pullback_preview", methods=["POST"]) @lr def cancel_trend_pullback_preview(): pid = (request.form.get("preview_id") or "").strip() conn = get_db() if pid: conn.execute( "UPDATE trend_pullback_preview_snapshots SET outcome='cancelled' WHERE preview_id=? AND outcome='open'", (pid,), ) conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,)) conn.commit() conn.close() flash("已取消预览") return _redirect_trend() @app.route("/trend_pullback_breakeven/", methods=["POST"]) @lr def trend_pullback_breakeven(pid: int): offset_pct = None raw = (request.form.get("breakeven_offset_pct") or "").strip() if raw: try: offset_pct = float(raw) if offset_pct < 0: raise ValueError except ValueError: flash("保本偏移% 格式无效") return _redirect_trend() conn = get_db() row = conn.execute( "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,) ).fetchone() if not row: conn.close() flash("未找到运行中的趋势回调计划") return _redirect_trend() ok, err = apply_manual_breakeven(cfg, conn, row, offset_pct=offset_pct) conn.commit() conn.close() flash("已手动保本" if ok else (err or "手动保本失败")) return _redirect_trend() @app.route("/stop_trend_pullback/") @lr def stop_trend_pullback(pid: int): conn = get_db() row = conn.execute( "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,) ).fetchone() if not row: conn.close() flash("未找到运行中的趋势回调计划") return _redirect_trend() m = _m(cfg) ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(row["symbol"]) direction = row["direction"] or "long" lev = int(row["leverage"] or 1) px = m.get_price(row["symbol"]) exit_p = float(px) if px is not None else 0.0 ok_live, _ = m.ensure_exchange_live_ready() if ok_live: pos = m.get_live_position_contracts(ex_sym, direction) if pos is not None and pos > 0: try: close_resp = trend_market_close(cfg, ex_sym, direction, float(pos), lev) ep = m.extract_trade_price_from_order(close_resp) if ep: exit_p = float(ep) except Exception as e: if not m.is_no_position_error(str(e)): conn.close() flash(f"平仓失败:{e}") return _redirect_trend() try: cancel_symbol_orders(cfg, ex_sym) except Exception: pass try: _finalize_plan(cfg, conn, row, "手动平仓", exit_p) except Exception as e: conn.execute( "UPDATE trend_pullback_plans SET status='stopped_manual', message=? " "WHERE id=? AND status='active'", (f"结束异常:{e}", pid), ) conn.commit() conn.close() flash(f"计划已结束但记账可能不完整:{e}") return _redirect_trend() conn.close() flash("已结束趋势回调计划") return _redirect_trend()