Files
crypto_monitor/strategy_trend_register.py
T
2026-05-23 11:33:01 +08:00

834 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Trend pullback strategy: routes, polling, and page data.

Shared by the four exchange apps; relies on the exchange capabilities that
each trading app module provides.
"""
from __future__ import annotations

import inspect
import json
import logging
import math
import os
import time
import uuid
from typing import Any, Optional

from flask import Flask, flash, redirect, request, url_for
from jinja2 import ChoiceLoader, FileSystemLoader

from strategy_config import resolve_trading_app_module
from strategy_db import init_strategy_tables
from strategy_trend_exchange import (
    cancel_symbol_orders,
    trend_market_add,
    trend_market_close,
    trend_refresh_stop_only,
)
from strategy_trend_lib import (
    build_grid_prices,
    build_leg_amounts_json,
    calc_risk_fraction,
    validate_trend_bounds,
)
# Label identifying this strategy: exposed in the strategy config, passed as the
# trade record's monitor type, and appended to the result in close notifications.
MONITOR_TYPE_TREND = "趋势回调"
def trend_add_zone_label(direction: str) -> str:
    """Return the UI label for the add-zone boundary of a trade direction.

    A short direction gets the "lower edge" label. Any other value, including
    an empty or missing direction (treated as long), gets the "upper edge"
    label. Matching ignores case and surrounding whitespace.
    """
    normalized = (direction or "long").strip().lower()
    if normalized == "short":
        return "补仓下沿"
    return "补仓上沿"
def install_strategy_trend(app: Flask, repo_root: str, app_module: Any = None, **build_kw) -> dict:
    """Wire the trend pullback strategy into a Flask app.

    Attaches the strategy templates, builds the strategy config, stores it
    under ``app.extensions["strategy_trend_cfg"]``, registers the routes and
    exposes ``trend_add_zone_label`` to templates via a context processor.

    Args:
        app: Flask application to extend.
        repo_root: Passed through to ``attach_strategy_templates``.
        app_module: Trading app module (or ``None``), passed to
            ``build_trend_config``.
        **build_kw: Extra keyword options forwarded to ``build_trend_config``.

    Returns:
        The config dict that was built and stored on the app.
    """
    # Function-scope import kept from the original code
    # (presumably avoids an import cycle -- TODO confirm).
    from strategy_register import attach_strategy_templates

    attach_strategy_templates(app, repo_root)

    trend_cfg = build_trend_config(app_module, **build_kw)
    app.extensions["strategy_trend_cfg"] = trend_cfg
    register_trend_routes(app, trend_cfg)

    def _trend_ctx():
        return {"trend_add_zone_label": trend_add_zone_label}

    app.context_processor(_trend_ctx)
    return trend_cfg
def build_trend_config(app_module: Any = None, **kw) -> dict[str, Any]:
    """Build the config dict shared by every trend pullback function.

    Numeric settings are read from environment variables:

    * ``TREND_PULLBACK_DCA_LEGS`` (default: ``kw["dca_legs"]`` or 5, min 1)
    * ``TREND_PULLBACK_PREVIEW_TTL_SECONDS`` (default 120, min 10)
    * ``TREND_PREVIEW_MAX_BALANCE_DRIFT_PCT`` (default 5)
    * ``TREND_PULLBACK_MANUAL_BREAKEVEN_OFFSET_PCT`` (default 0.3)

    The remaining values come from attributes of the resolved trading app
    module. A non-numeric environment value raises ``ValueError``.

    Args:
        app_module: Passed to ``resolve_trading_app_module``.
        **kw: Only ``dca_legs`` is consulted, as the fallback leg count.

    Returns:
        The config dict, including an ``amount_precise`` callable.
    """
    module = resolve_trading_app_module(app_module)

    legs_raw = os.getenv("TREND_PULLBACK_DCA_LEGS", kw.get("dca_legs", "5"))
    leg_count = max(1, int(legs_raw))
    ttl_seconds = max(10, int(os.getenv("TREND_PULLBACK_PREVIEW_TTL_SECONDS", "120")))
    max_drift_pct = float(os.getenv("TREND_PREVIEW_MAX_BALANCE_DRIFT_PCT", "5"))
    breakeven_pct = float(os.getenv("TREND_PULLBACK_MANUAL_BREAKEVEN_OFFSET_PCT", "0.3"))
    margin_buffer = float(getattr(module, "FULL_MARGIN_BUFFER_RATIO", 0.95))

    def amount_precise(ex_sym, amt):
        """Round ``amt`` to the exchange's amount precision, or None on failure."""
        safe_fn = getattr(module, "_safe_amount_to_precision", None)
        if not callable(safe_fn):
            # No module-level helper: go to the exchange object directly.
            try:
                module.ensure_markets_loaded()
                return float(module.exchange.amount_to_precision(ex_sym, float(amt)))
            except Exception:
                return None
        return safe_fn(ex_sym, amt)

    config: dict[str, Any] = {
        "app_module": module,
        "login_required": module.login_required,
        "get_db": module.get_db,
        "row_to_dict": module.row_to_dict,
        "dca_legs": leg_count,
        "preview_ttl": ttl_seconds,
        "drift_pct": max_drift_pct,
        "breakeven_offset_pct": breakeven_pct,
        "margin_buffer": margin_buffer,
        "amount_precise": amount_precise,
        "max_active_positions": int(getattr(module, "MAX_ACTIVE_POSITIONS", 1)),
        "reset_hour": int(getattr(module, "TRADING_DAY_RESET_HOUR", 8)),
        "monitor_type_trend": MONITOR_TYPE_TREND,
    }
    return config
def _m(cfg: dict):
return cfg["app_module"]
def _row(cfg, row) -> dict:
return cfg["row_to_dict"](row)
def precheck_trend_start(cfg: dict, conn) -> tuple[bool, str]:
    """Check whether a new trend pullback plan may be started right now.

    Three gates are evaluated in order: the trading-day reset window, the
    global active-position limit, and the presence of another active trend
    pullback plan.

    Returns:
        ``(True, "")`` when all gates pass, otherwise ``(False, reason)``
        with a user-facing reason.
    """
    module = _m(cfg)

    if not module.trading_day_reset_allows_new_open(module.app_now()):
        return False, f"北京时间 {cfg['reset_hour']}:00 前不允许持仓"

    open_positions = module.get_active_position_count(conn)
    limit = cfg["max_active_positions"]
    if open_positions >= limit:
        reason = (
            f"已达最大持仓数({open_positions}/{limit}),"
            "请先结束「实盘下单」中的持仓,再启动趋势回调"
        )
        return False, reason

    count_row = conn.execute(
        "SELECT COUNT(*) FROM trend_pullback_plans WHERE status='active'"
    ).fetchone()
    running_plans = int(count_row[0] or 0)
    if running_plans > 0:
        return False, "已存在运行中的趋势回调计划"

    return True, ""
def _cleanup_stale_previews(conn) -> None:
ms = int(time.time() * 1000)
stale = conn.execute(
"SELECT id FROM trend_pullback_previews WHERE expires_at_ms < ?", (ms,)
).fetchall()
for row in stale:
try:
conn.execute(
"UPDATE trend_pullback_preview_snapshots SET outcome='expired' "
"WHERE preview_id=? AND outcome='open'",
(row["id"],),
)
except Exception:
pass
conn.execute("DELETE FROM trend_pullback_previews WHERE expires_at_ms < ?", (ms,))
def parse_trend_plan(cfg: dict, form_dict) -> tuple[Optional[dict], Optional[str]]:
    """Validate the submitted form and size a trend pullback plan.

    The plan risks ``risk_percent`` of the currently available USDT between
    the add-zone boundary and the stop loss. Half of the target amount is
    the first order; the remainder is split into DCA legs on a price grid.

    Args:
        cfg: Strategy config from ``build_trend_config``.
        form_dict: Mapping with ``symbol``, ``direction``, ``sl``,
            ``add_upper``, ``take_profit`` and optional ``risk_percent``
            (default 5) and ``leverage``. ``None`` is treated as empty.

    Returns:
        ``(plan, None)`` on success, or ``(None, message)`` with a
        user-facing error message when validation or sizing fails.
    """
    m = cfg["app_module"]
    d = form_dict or {}
    symbol = m.normalize_symbol_input(d.get("symbol"))
    if not symbol:
        return None, "symbol 不能为空"
    direction = (d.get("direction") or "long").strip().lower()
    if direction not in ("long", "short"):
        return None, "方向错误"
    try:
        stop_loss = float(d.get("sl"))
        add_upper = float(d.get("add_upper"))
        take_profit = float(d.get("take_profit"))
        risk_percent = float(d.get("risk_percent") or "5")
    except Exception:
        return None, "价格或风险比例格式错误"
    # float() accepts "nan"/"inf"; NaN slips through every "<= 0" guard below
    # and would poison sizing and the stored prices, so reject it here.
    if not all(math.isfinite(v) for v in (stop_loss, add_upper, take_profit, risk_percent)):
        return None, "价格或风险比例格式错误"
    try:
        lev_raw = m.parse_positive_float(d.get("leverage"))
        leverage = int(lev_raw) if lev_raw is not None else m.infer_leverage(symbol)
    except Exception:
        return None, "杠杆格式错误"
    if leverage <= 0 or risk_percent <= 0:
        return None, "杠杆与风险比例必须大于0"
    bound_err = validate_trend_bounds(direction, stop_loss, add_upper)
    if bound_err:
        return None, bound_err
    snap = m.get_available_trading_usdt()
    if snap is None or snap <= 0:
        return None, "无法读取合约账户 USDT 可用余额,请检查 API 与账户类型"
    live_price = m.get_price(symbol)
    if live_price is None:
        return None, "获取实时价格失败"
    exchange_symbol = m.normalize_exchange_symbol(symbol)
    rf = calc_risk_fraction(direction, add_upper, stop_loss)
    if rf is None or rf <= 0:
        return None, "止损与补仓区间边界组合无法计算风险比例"
    # Notional sized so that a move of fraction ``rf`` loses the risk budget;
    # the margin is capped at the configured share of the available balance.
    risk_budget = float(snap) * (risk_percent / 100.0)
    notional = risk_budget / rf
    margin_plan = notional / float(leverage)
    margin_plan = min(margin_plan, float(snap) * cfg["margin_buffer"])
    if margin_plan <= 0:
        return None, "计划保证金过小"
    try:
        target_amt, _ = m.prepare_order_amount(exchange_symbol, margin_plan, leverage, live_price)
    except Exception as e:
        return None, str(e)
    ap = cfg["amount_precise"]
    # First order takes half of the target; the rest is spread over the legs.
    first_amt = ap(exchange_symbol, float(target_amt) * 0.5)
    if first_amt is None or first_amt <= 0:
        return None, "首仓张数过小(低于交易所最小张数),请提高风险比例或杠杆"
    remainder_total = ap(exchange_symbol, max(0.0, float(target_amt) - float(first_amt)))
    if remainder_total is None:
        remainder_total = 0.0
    m.ensure_markets_loaded()
    market = m.exchange.market(exchange_symbol)
    min_amt = float((market.get("limits", {}).get("amount", {}) or {}).get("min") or 0)
    n_legs, leg_json, per_ref = build_leg_amounts_json(
        exchange_symbol, remainder_total, cfg["dca_legs"], ap, min_amt
    )
    if n_legs <= 0:
        return None, "剩余计划张数不足以拆出补仓档,请提高风险比例或放宽止损与补仓区间间距"
    grid = build_grid_prices(direction, stop_loss, add_upper, n_legs)
    if len(grid) != n_legs:
        return None, "补仓网格生成失败"
    opened_at = m.app_now_str()
    try:
        leg_list = json.loads(leg_json)
    except Exception:
        leg_list = []
    return {
        "symbol": symbol,
        "exchange_symbol": exchange_symbol,
        "direction": direction,
        "leverage": leverage,
        "stop_loss": stop_loss,
        "add_upper": add_upper,
        "take_profit": take_profit,
        "risk_percent": risk_percent,
        "snapshot_available_usdt": float(snap),
        "snapshot_at": opened_at,
        "live_price_ref": float(live_price),
        "plan_margin_capital": float(margin_plan),
        "target_order_amount": float(target_amt),
        "first_order_amount": float(first_amt),
        "remainder_total": float(remainder_total),
        "dca_legs": int(n_legs),
        "per_leg_amount": float(per_ref),
        "grid_prices_json": json.dumps(grid),
        "leg_amounts_json": leg_json,
        "grid": grid,
        "leg_amounts": leg_list,
    }, None
def _insert_preview_snapshot(conn, preview_id: str, created: str, exp_ms: int, pl: dict) -> None:
conn.execute(
"""INSERT INTO trend_pullback_preview_snapshots (
preview_id,symbol,exchange_symbol,direction,leverage,stop_loss,add_upper,take_profit,risk_percent,
snapshot_available_usdt,snapshot_at,live_price_ref,plan_margin_capital,target_order_amount,first_order_amount,remainder_total,
dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,expires_at_ms,preview_created_at
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
preview_id,
pl["symbol"],
pl["exchange_symbol"],
pl["direction"],
pl["leverage"],
pl["stop_loss"],
pl["add_upper"],
pl["take_profit"],
pl["risk_percent"],
pl["snapshot_available_usdt"],
pl["snapshot_at"],
pl["live_price_ref"],
pl["plan_margin_capital"],
pl["target_order_amount"],
pl["first_order_amount"],
pl["remainder_total"],
pl["dca_legs"],
pl["per_leg_amount"],
pl["grid_prices_json"],
pl["leg_amounts_json"],
exp_ms,
created,
),
)
def enrich_trend_plan(cfg: dict, row) -> dict:
    """Convert a plan row to a dict and attach live display fields.

    Adds a boolean ``breakeven_applied`` plus ``floating_pnl`` and
    ``floating_mark``. The two floating values come from the app module's
    optional ``get_live_position_exchange_metrics`` and are ``None`` when the
    helper is missing or does not report them.
    """
    module = _m(cfg)
    plan = _row(cfg, row)

    try:
        applied = int(plan.get("breakeven_applied") or 0) != 0
    except Exception:
        applied = False
    plan["breakeven_applied"] = applied

    ex_sym = plan.get("exchange_symbol") or module.normalize_exchange_symbol(
        plan.get("symbol") or ""
    )
    direction = (plan.get("direction") or "long").lower()

    floating_pnl = None
    floating_mark = None
    fetch_metrics = getattr(module, "get_live_position_exchange_metrics", None)
    if callable(fetch_metrics):
        metrics = fetch_metrics(ex_sym, direction)
        if metrics:
            if metrics.get("unrealized_pnl") is not None:
                floating_pnl = float(metrics["unrealized_pnl"])
            if metrics.get("mark_price") is not None:
                floating_mark = float(metrics["mark_price"])

    plan["floating_pnl"] = floating_pnl
    plan["floating_mark"] = floating_mark
    return plan
def _weighted_avg(old_avg, old_amt, fill_px, add_amt):
try:
oa, aa = float(old_amt), float(add_amt)
if oa <= 0:
return float(fill_px)
return (float(old_avg) * oa + float(fill_px) * aa) / (oa + aa)
except Exception:
return float(fill_px or 0)
def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) -> None:
m = _m(cfg)
sym = row["symbol"]
direction = row["direction"] or "long"
ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(sym)
closed_at = m.app_now_str()
opened_at = row["opened_at"] or closed_at
hold_seconds = m.calc_hold_seconds(opened_at, m.parse_dt_for_trading_day(closed_at) or m.app_now())
margin_cap = float(row["plan_margin_capital"] or 0)
lev = int(row["leverage"] or 1)
avg_e = float(row["avg_entry_price"] or 0)
pnl_amount = m.calc_pnl(direction, avg_e, float(exit_price), margin_cap, lev)
res = m.normalize_result_with_pnl(result_label, pnl_amount)
risk_amt = m.calc_risk_amount_from_plan(
direction, float(row["add_upper"]), float(row["stop_loss"]), margin_cap, lev
)
planned_rr = m.calc_rr_ratio(direction, avg_e, float(row["stop_loss"]), float(row["take_profit"]))
session_date = row["session_date"] or m.get_trading_day()
session_capital = m.update_session_capital(conn, session_date, pnl_amount)
try:
cancel_symbol_orders(cfg, ex_sym)
except Exception:
pass
extra = getattr(m, "build_wechat_close_message", None)
send = getattr(m, "send_wechat_msg", None)
if callable(extra) and callable(send):
send(
extra(
symbol=sym,
direction=direction,
result=f"{res}{MONITOR_TYPE_TREND}",
pnl_amount=pnl_amount,
hold_seconds=hold_seconds,
trigger_price=avg_e,
current_price=float(exit_price),
stop_loss=float(row["stop_loss"]),
take_profit=float(row["take_profit"]),
close_order_id="-",
extra_note="计划本金口径:启动时合约可用余额快照;止盈由程序监控",
session_capital_fallback=session_capital,
)
)
kwargs = dict(
conn=conn,
symbol=sym,
monitor_type=MONITOR_TYPE_TREND,
direction=direction,
trigger_price=avg_e,
stop_loss=float(row["stop_loss"]),
initial_stop_loss=float(row.get("initial_stop_loss") or row["stop_loss"]),
take_profit=float(row["take_profit"]),
margin_capital=margin_cap,
leverage=lev,
pnl_amount=pnl_amount,
hold_seconds=hold_seconds,
trade_style="trend_pullback",
risk_amount=risk_amt,
planned_rr=planned_rr,
actual_rr=m.calc_actual_rr(pnl_amount, risk_amt),
result=res,
opened_at=opened_at,
closed_at=closed_at,
)
if "trend_plan_id" in inspect.signature(m.insert_trade_record).parameters:
m.insert_trade_record(**kwargs, trend_plan_id=int(row["id"]))
else:
m.insert_trade_record(**kwargs)
st = (
"stopped_tp"
if result_label == "止盈"
else ("stopped_sl" if result_label == "止损" else "stopped_manual")
)
conn.execute(
"UPDATE trend_pullback_plans SET status=?, message=? WHERE id=?",
(st, res, row["id"]),
)
def check_trend_pullback_plans(cfg: dict) -> None:
    """Poll every active trend pullback plan once.

    For each active plan, in order:

    1. If the live price has reached the take profit and a position is open,
       market-close it and finalise the plan as take profit.
    2. If the first order was placed but no position remains, finalise the
       plan as stopped out (the stop itself lives on the exchange).
    3. If the price crossed the next grid level since the previous poll,
       market-add the next leg, update the average entry, and refresh the
       exchange stop.
    4. Store the current price as ``last_mark_price`` for the next poll.

    A failure while handling one plan is logged and the loop moves on. All
    changes are committed once at the end; the connection is always closed.
    Does nothing when the exchange is not live-ready.
    """
    m = cfg["app_module"]
    log = logging.getLogger(__name__)
    ok_live, _ = m.ensure_exchange_live_ready()
    if not ok_live:
        return
    conn = cfg["get_db"]()
    try:
        rows = conn.execute(
            "SELECT * FROM trend_pullback_plans WHERE status='active'"
        ).fetchall()
        for row in rows:
            plan_id = None
            try:
                plan_id = row["id"]
                sym = row["symbol"]
                direction = (row["direction"] or "long").lower()
                ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(sym)
                sl = float(row["stop_loss"])
                tp = float(row["take_profit"])
                lev = int(row["leverage"] or 1)
                p = m.get_price(sym)
                if not p:
                    continue
                pf = float(p)
                last_p = row["last_mark_price"]
                last_pf = float(last_p) if last_p is not None else pf
                pos = m.get_live_position_contracts(ex_sym, direction)
                if pos is None:
                    # Position state unknown: do not act on this plan now.
                    continue
                legs_done = int(row["legs_done"] or 0)
                try:
                    leg_amounts = [float(x) for x in json.loads(row["leg_amounts_json"] or "[]")]
                except Exception:
                    leg_amounts = []
                try:
                    grid = json.loads(row["grid_prices_json"] or "[]")
                except Exception:
                    grid = []
                first_done = int(row["first_order_done"] or 0)

                hit_tp = (direction == "long" and pf >= tp) or (direction == "short" and pf <= tp)
                if hit_tp and pos > 0:
                    try:
                        close_resp = trend_market_close(cfg, ex_sym, direction, float(pos), lev)
                        exit_p = m.extract_trade_price_from_order(close_resp) or pf
                    except Exception as e:
                        # A real close failure is retried on the next poll;
                        # "no position" means it is already flat.
                        if not m.is_no_position_error(str(e)):
                            continue
                        exit_p = pf
                    _finalize_plan(cfg, conn, row, "止盈", exit_p)
                    continue
                if pos <= 0 and first_done:
                    _finalize_plan(cfg, conn, row, "止损", pf)
                    continue
                if first_done and legs_done < len(grid) and legs_done < len(leg_amounts):
                    level = float(grid[legs_done])
                    # A leg fires only when the price crosses the level
                    # between two polls, not while it merely sits beyond it.
                    if direction == "long":
                        fired = last_pf > level and pf <= level
                    else:
                        fired = last_pf < level and pf >= level
                    if fired:
                        amt = float(m.exchange.amount_to_precision(ex_sym, leg_amounts[legs_done]))
                        if amt > 0:
                            add_resp = trend_market_add(cfg, ex_sym, direction, amt, lev)
                            fill_px = m.extract_trade_price_from_order(add_resp) or pf
                            old_avg = float(row["avg_entry_price"] or fill_px)
                            old_open = float(row["order_amount_open"] or 0)
                            new_avg = _weighted_avg(old_avg, old_open, fill_px, amt)
                            conn.execute(
                                "UPDATE trend_pullback_plans SET legs_done=?, avg_entry_price=?, "
                                "order_amount_open=?, last_mark_price=? WHERE id=?",
                                (legs_done + 1, new_avg, old_open + amt, pf, plan_id),
                            )
                            try:
                                # Re-place the stop so it covers the new size.
                                trend_refresh_stop_only(cfg, ex_sym, direction, sl)
                            except Exception:
                                log.exception(
                                    "stop refresh failed after leg add for plan %s", plan_id
                                )
                conn.execute(
                    "UPDATE trend_pullback_plans SET last_mark_price=? WHERE id=?",
                    (pf, plan_id),
                )
            except Exception:
                # One broken plan must not stop polling of the others.
                log.exception("trend pullback poll failed for plan %s; skipping", plan_id)
                continue
        conn.commit()
    finally:
        conn.close()
def apply_manual_breakeven(cfg: dict, conn, row, offset_pct=None) -> tuple[bool, Optional[str]]:
    """Move the plan's stop loss to a breakeven level around the average entry.

    The new stop comes from the app module's optional
    ``calc_trend_manual_breakeven_stop``; without it, the average entry is
    shifted by ``offset_pct`` percent (config default when ``None``), down
    for shorts and up otherwise. The stop is only ever tightened. On success
    the exchange stop is re-placed and the plan row updated (not committed).

    Returns:
        ``(True, None)`` on success, otherwise ``(False, message)``.
    """
    module = _m(cfg)

    status = (row["status"] or "").strip()
    if status != "active":
        return False, "计划已结束"
    if not int(row["first_order_done"] or 0):
        return False, "尚未完成首仓,无法保本"

    entry_avg = float(row["avg_entry_price"] or 0)
    if entry_avg <= 0:
        return False, "缺少有效持仓均价"

    direction = (row["direction"] or "long").lower()
    ex_sym = row["exchange_symbol"] or module.normalize_exchange_symbol(row["symbol"])

    live_pos = module.get_live_position_contracts(ex_sym, direction)
    if live_pos is None or float(live_pos) <= 0:
        return False, "交易所当前无该方向持仓"

    custom_calc = getattr(module, "calc_trend_manual_breakeven_stop", None)
    if callable(custom_calc):
        raw_stop = custom_calc(direction, entry_avg, offset_pct)
        if raw_stop is None:
            return False, "保本价计算失败"
    else:
        pct = float(cfg["breakeven_offset_pct"] if offset_pct is None else offset_pct)
        if direction == "short":
            factor = 1.0 - pct / 100.0
        else:
            factor = 1.0 + pct / 100.0
        raw_stop = entry_avg * factor

    rounded = module.round_price_to_exchange(ex_sym, raw_stop)
    if rounded is None:
        return False, "保本价经交易所精度舍入后无效"
    new_sl = float(rounded)
    cur_sl = float(row["stop_loss"] or 0)

    # Only ever tighten the stop: up for longs, down for everything else.
    if direction == "long":
        if new_sl <= cur_sl:
            return False, f"新止损 {new_sl} 未高于当前止损 {cur_sl}(多仓需上移)"
    elif new_sl >= cur_sl:
        return False, f"新止损 {new_sl} 未低于当前止损 {cur_sl}(空仓需下移)"

    try:
        trend_refresh_stop_only(cfg, ex_sym, direction, new_sl)
    except Exception as exc:
        friendly = getattr(module, "friendly_exchange_error", None)
        message = friendly(exc) if callable(friendly) else str(exc)
        return False, message

    conn.execute(
        "UPDATE trend_pullback_plans SET stop_loss=?, breakeven_applied=1, breakeven_applied_at=? WHERE id=?",
        (new_sl, module.app_now_str(), row["id"]),
    )
    return True, None
def load_trend_page_context(conn, request_obj, cfg: dict) -> dict[str, Any]:
    """Collect the template context for the trend pullback page.

    Cleans up stale previews first, then loads the active plans (enriched
    with live metrics where possible), decides whether a new plan may be
    started, and resolves the preview named by the ``preview_id`` query
    argument, if any.

    Args:
        conn: DB connection (nothing is committed here).
        request_obj: Request-like object exposing ``args.get``.
        cfg: Strategy config from ``build_trend_config``.

    Returns:
        Dict of template variables.
    """
    module = _m(cfg)
    _cleanup_stale_previews(conn)

    count_row = conn.execute(
        "SELECT COUNT(*) FROM trend_pullback_plans WHERE status='active'"
    ).fetchone()
    trend_active = int(count_row[0] or 0)

    plan_rows = conn.execute(
        "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC"
    ).fetchall()
    trend_plans = []
    for plan_row in plan_rows:
        try:
            shown = enrich_trend_plan(cfg, plan_row)
        except Exception:
            # Live metrics unavailable: show the stored row as it is.
            shown = _row(cfg, plan_row)
        trend_plans.append(shown)

    now = module.app_now()
    active_count = module.get_active_position_count(conn)
    can_trade_trend = (
        module.trading_day_reset_allows_new_open(now)
        and active_count < cfg["max_active_positions"]
        and trend_active == 0
    )

    trend_preview = None
    trend_preview_levels = []
    preview_expires_ms = None
    trend_preview_expired = False

    requested_id = (request_obj.args.get("preview_id") or "").strip()
    preview_row = None
    if requested_id:
        preview_row = conn.execute(
            "SELECT * FROM trend_pullback_previews WHERE id=?", (requested_id,)
        ).fetchone()
    if preview_row:
        now_ms = int(time.time() * 1000)
        if int(preview_row["expires_at_ms"] or 0) < now_ms:
            trend_preview_expired = True
        else:
            trend_preview = _row(cfg, preview_row)
            preview_expires_ms = int(preview_row["expires_at_ms"])
            try:
                grid = json.loads(trend_preview.get("grid_prices_json") or "[]")
                legs = json.loads(trend_preview.get("leg_amounts_json") or "[]")
            except Exception:
                grid, legs = [], []
            trend_preview_levels = [
                {"i": idx, "price": price, "contracts": contracts}
                for idx, (price, contracts) in enumerate(zip(grid, legs), 1)
            ]

    return {
        "trend_plans": trend_plans,
        "trend_active": trend_active,
        "can_trade_trend": can_trade_trend,
        "trend_preview": trend_preview,
        "trend_preview_levels": trend_preview_levels,
        "preview_expires_ms": preview_expires_ms,
        "trend_preview_expired": trend_preview_expired,
        "trend_pullback_dca_legs": cfg["dca_legs"],
        "trend_pullback_preview_ttl": cfg["preview_ttl"],
        "trend_preview_max_drift_pct": cfg["drift_pct"],
        "trend_manual_breakeven_offset_pct": cfg["breakeven_offset_pct"],
    }
def register_trend_routes(app: Flask, cfg: dict) -> None:
    """Register the trend pullback HTTP routes on ``app``.

    Routes (endpoint names equal the function names):

    * ``POST /preview_trend_pullback`` - validate the form and store a preview.
    * ``POST /execute_trend_pullback`` - place the first order for a preview
      and create the plan.
    * ``POST /cancel_trend_pullback_preview`` - discard a preview.
    * ``POST /trend_pullback_breakeven/<pid>`` - move the stop to breakeven.
    * ``GET  /stop_trend_pullback/<pid>`` - close the position and end the plan.

    Every route is wrapped with ``cfg["login_required"]`` and obtains its own
    DB connection from ``cfg["get_db"]``, which is always closed on exit.
    """
    lr = cfg["login_required"]
    get_db = cfg["get_db"]
    log = logging.getLogger(__name__)

    def _redirect_trend(**kw):
        return redirect(url_for("strategy_trend_page", **kw))

    @app.route("/preview_trend_pullback", methods=["POST"])
    @lr
    def preview_trend_pullback():
        conn = get_db()
        try:
            init_strategy_tables(conn)
            okp, msg = precheck_trend_start(cfg, conn)
            if not okp:
                flash(msg)
                return _redirect_trend()
            m = _m(cfg)
            ok_live, reason = m.ensure_exchange_live_ready()
            if not ok_live:
                flash(reason)
                return _redirect_trend()
            payload, err = parse_trend_plan(cfg, request.form)
            if err:
                flash(err)
                return _redirect_trend()
            pid = str(uuid.uuid4())
            exp_ms = int(time.time() * 1000) + cfg["preview_ttl"] * 1000
            created = m.app_now_str()
            conn.execute(
                """INSERT INTO trend_pullback_previews (
id,symbol,exchange_symbol,direction,leverage,stop_loss,add_upper,take_profit,risk_percent,
snapshot_available_usdt,snapshot_at,live_price_ref,plan_margin_capital,target_order_amount,first_order_amount,remainder_total,
dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,expires_at_ms,created_at
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (
                    pid,
                    payload["symbol"],
                    payload["exchange_symbol"],
                    payload["direction"],
                    payload["leverage"],
                    payload["stop_loss"],
                    payload["add_upper"],
                    payload["take_profit"],
                    payload["risk_percent"],
                    payload["snapshot_available_usdt"],
                    payload["snapshot_at"],
                    payload["live_price_ref"],
                    payload["plan_margin_capital"],
                    payload["target_order_amount"],
                    payload["first_order_amount"],
                    payload["remainder_total"],
                    payload["dca_legs"],
                    payload["per_leg_amount"],
                    payload["grid_prices_json"],
                    payload["leg_amounts_json"],
                    exp_ms,
                    created,
                ),
            )
            _insert_preview_snapshot(conn, pid, created, exp_ms, payload)
            conn.commit()
        finally:
            conn.close()
        flash(f"预览已生成,有效期 {cfg['preview_ttl']} 秒,请核对后点击「确认执行」。")
        return _redirect_trend(preview_id=pid)

    @app.route("/execute_trend_pullback", methods=["POST"])
    @lr
    def execute_trend_pullback():
        pid = (request.form.get("preview_id") or "").strip()
        if not pid:
            flash("缺少预览 ID")
            return _redirect_trend()
        conn = get_db()
        try:
            init_strategy_tables(conn)
            _cleanup_stale_previews(conn)
            pr = conn.execute(
                "SELECT * FROM trend_pullback_previews WHERE id=?", (pid,)
            ).fetchone()
            now_ms = int(time.time() * 1000)
            if not pr or int(pr["expires_at_ms"] or 0) < now_ms:
                flash("预览已过期或不存在,请重新生成预览")
                return _redirect_trend()
            okp, msg = precheck_trend_start(cfg, conn)
            if not okp:
                flash(msg)
                return _redirect_trend(preview_id=pid)
            m = _m(cfg)
            ok_live, reason = m.ensure_exchange_live_ready()
            if not ok_live:
                flash(reason)
                return _redirect_trend(preview_id=pid)
            # Refuse to execute when the balance moved too far from the
            # snapshot the preview was sized against.
            snap_prev = float(pr["snapshot_available_usdt"] or 0)
            snap_now = m.get_available_trading_usdt()
            if snap_now is None or snap_now <= 0:
                flash("无法读取当前合约可用余额,请稍后重试")
                return _redirect_trend(preview_id=pid)
            drift = abs(float(snap_now) - snap_prev) / max(snap_prev, 1e-9) * 100.0
            if drift > cfg["drift_pct"]:
                flash(
                    f"当前可用余额与预览快照偏差 {drift:.2f}%,超过允许 {cfg['drift_pct']}%,请重新生成预览"
                )
                return _redirect_trend(preview_id=pid)
            symbol = pr["symbol"]
            exchange_symbol = pr["exchange_symbol"]
            direction = pr["direction"] or "long"
            leverage = int(pr["leverage"] or 1)
            stop_loss = float(pr["stop_loss"])
            first_amt = float(pr["first_order_amount"] or 0)
            live_price = m.get_price(symbol)
            if live_price is None:
                flash("获取实时价格失败")
                return _redirect_trend(preview_id=pid)
            fe = getattr(m, "friendly_exchange_error", lambda x, **k: str(x))
            try:
                o1 = m.place_exchange_order(
                    exchange_symbol, direction, first_amt, leverage, stop_loss=None, take_profit=None
                )
            except Exception as e:
                # Nothing was opened on the exchange: safe to abort.
                flash(fe(e, available_usdt=snap_now))
                return _redirect_trend(preview_id=pid)

            # From here on the first order is live. The plan MUST be recorded
            # whatever fails next, otherwise the position would be untracked.
            try:
                fill1 = m.resolve_order_entry_price(o1, exchange_symbol, live_price)
            except Exception:
                log.exception("could not resolve fill price for %s; using live price", exchange_symbol)
                fill1 = float(live_price)
            stop_failure = None
            try:
                trend_refresh_stop_only(cfg, exchange_symbol, direction, stop_loss)
            except Exception as e:
                log.exception("stop-loss placement failed for %s", exchange_symbol)
                stop_failure = fe(e, available_usdt=snap_now)

            trading_day = m.get_trading_day(m.app_now())
            opened_at = m.app_now_str()
            opened_ms = getattr(m, "_to_ms_with_fallback", lambda a, b: None)(None, opened_at)
            cur = conn.execute(
                """INSERT INTO trend_pullback_plans (
status,symbol,exchange_symbol,direction,leverage,stop_loss,initial_stop_loss,add_upper,take_profit,risk_percent,
snapshot_available_usdt,snapshot_at,plan_margin_capital,target_order_amount,first_order_amount,remainder_total,
dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,legs_done,first_order_done,last_mark_price,avg_entry_price,order_amount_open,opened_at,opened_at_ms,session_date,message
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
                (
                    "active",
                    symbol,
                    exchange_symbol,
                    direction,
                    leverage,
                    stop_loss,
                    stop_loss,
                    float(pr["add_upper"]),
                    float(pr["take_profit"]),
                    float(pr["risk_percent"] or 5),
                    float(snap_now),
                    opened_at,
                    float(pr["plan_margin_capital"] or 0),
                    float(pr["target_order_amount"] or 0),
                    first_amt,
                    float(pr["remainder_total"] or 0),
                    int(pr["dca_legs"] or 0),
                    float(pr["per_leg_amount"] or 0),
                    pr["grid_prices_json"] or "[]",
                    pr["leg_amounts_json"] or "[]",
                    0,
                    1,
                    float(live_price),
                    fill1,
                    first_amt,
                    opened_at,
                    opened_ms,
                    trading_day,
                    f"预览ID:{pid[:8]}",
                ),
            )
            new_id = int(cur.lastrowid)
            conn.execute(
                "UPDATE trend_pullback_preview_snapshots SET outcome='executed', executed_plan_id=? WHERE preview_id=?",
                (new_id, pid),
            )
            conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,))
            conn.commit()
        finally:
            conn.close()
        if stop_failure is not None:
            flash(
                f"趋势回调已执行:首仓已成交,但交易所止损挂单失败({stop_failure}),请立即手动设置止损或结束计划。"
            )
        else:
            flash("趋势回调已执行:首仓已成交并挂交易所止损,止盈由程序监控。")
        return _redirect_trend()

    @app.route("/cancel_trend_pullback_preview", methods=["POST"])
    @lr
    def cancel_trend_pullback_preview():
        pid = (request.form.get("preview_id") or "").strip()
        conn = get_db()
        try:
            if pid:
                conn.execute(
                    "UPDATE trend_pullback_preview_snapshots SET outcome='cancelled' WHERE preview_id=? AND outcome='open'",
                    (pid,),
                )
                conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,))
                conn.commit()
        finally:
            conn.close()
        flash("已取消预览")
        return _redirect_trend()

    @app.route("/trend_pullback_breakeven/<int:pid>", methods=["POST"])
    @lr
    def trend_pullback_breakeven(pid: int):
        offset_pct = None
        raw = (request.form.get("breakeven_offset_pct") or "").strip()
        if raw:
            try:
                offset_pct = float(raw)
                # float() accepts "nan"/"inf", which would yield a NaN stop.
                if offset_pct < 0 or not math.isfinite(offset_pct):
                    raise ValueError
            except ValueError:
                flash("保本偏移% 格式无效")
                return _redirect_trend()
        conn = get_db()
        try:
            row = conn.execute(
                "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,)
            ).fetchone()
            if not row:
                flash("未找到运行中的趋势回调计划")
                return _redirect_trend()
            ok, err = apply_manual_breakeven(cfg, conn, row, offset_pct=offset_pct)
            conn.commit()
        finally:
            conn.close()
        flash("已手动保本" if ok else (err or "手动保本失败"))
        return _redirect_trend()

    # NOTE(review): this route closes a live position but accepts GET (no
    # ``methods``), so a prefetch or cross-site link can trigger it. Left
    # unchanged because templates outside this file may link to it;
    # consider switching it to POST together with those templates.
    @app.route("/stop_trend_pullback/<int:pid>")
    @lr
    def stop_trend_pullback(pid: int):
        conn = get_db()
        try:
            row = conn.execute(
                "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,)
            ).fetchone()
            if not row:
                flash("未找到运行中的趋势回调计划")
                return redirect("/trade")
            m = _m(cfg)
            ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(row["symbol"])
            direction = row["direction"] or "long"
            lev = int(row["leverage"] or 1)
            px = m.get_price(row["symbol"])
            if px is not None:
                exit_p = float(px)
            else:
                # No live quote: use the last polled mark, then the average
                # entry, so that pnl is never computed against a price of 0.
                exit_p = float(row["last_mark_price"] or row["avg_entry_price"] or 0.0)
            ok_live, _ = m.ensure_exchange_live_ready()
            if ok_live:
                pos = m.get_live_position_contracts(ex_sym, direction)
                if pos is not None and pos > 0:
                    try:
                        close_resp = trend_market_close(cfg, ex_sym, direction, float(pos), lev)
                        ep = m.extract_trade_price_from_order(close_resp)
                        if ep:
                            exit_p = float(ep)
                    except Exception as e:
                        # "No position" means it is already flat; anything
                        # else is a real failure and the plan stays active.
                        if not m.is_no_position_error(str(e)):
                            flash(f"平仓失败:{e}")
                            return redirect("/trade")
                try:
                    cancel_symbol_orders(cfg, ex_sym)
                except Exception:
                    log.exception("failed to cancel remaining orders for %s", ex_sym)
            _finalize_plan(cfg, conn, row, "手动平仓", exit_p)
            conn.commit()
        finally:
            conn.close()
        flash("已结束趋势回调计划")
        return redirect("/trade")