Files
crypto_monitor/strategy_trend_register.py
T
dekun ed669fab80 fix(hub): show trend plan leverage, base, ratio, mark and floating PnL
Position and trend plan cards read sizing from trend_pullback_plans; merge agent mark/PnL; compute position_ratio_pct in hub enrich.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-04 10:13:44 +08:00

1285 lines
49 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""趋势回调:路由、轮询、页面数据(四所共用,依赖各 app 模块交易所能力)。"""
from __future__ import annotations
import inspect
import json
import os
import time
import uuid
from typing import Any, Optional
from flask import Flask, flash, redirect, request, url_for
from jinja2 import ChoiceLoader, FileSystemLoader
from strategy_config import resolve_trading_app_module
from strategy_db import init_strategy_tables
from strategy_trend_exchange import (
cancel_symbol_orders,
trend_market_add,
trend_market_close,
trend_refresh_stop_only,
trend_replace_tpsl,
)
from strategy_trend_lib import (
build_grid_prices,
build_leg_amounts_json,
calc_risk_fraction,
validate_trend_bounds,
)
from strategy_trade_labels import (
ENTRY_REASON_TREND_PULLBACK,
MONITOR_TYPE_TREND_PULLBACK,
TREND_HANDOFF_KEY_SIGNAL,
TREND_HANDOFF_TRADE_NOTE,
)
# Short module-local alias for the trend-pullback monitor type label.
MONITOR_TYPE_TREND = MONITOR_TYPE_TREND_PULLBACK
# Trend pullback: a flat position reported by the exchange must be confirmed by
# N consecutive polls, so that a transient API misread (e.g. on OKX) does not
# end the plan immediately.
# Per-plan counter of consecutive "flat" polls, keyed by plan id.
_TREND_FLAT_STREAK: dict[int, int] = {}
# Number of consecutive flat polls required (env override, clamped to >= 1).
TREND_FLAT_CONFIRM_POLLS = max(1, int(os.getenv("TREND_FLAT_CONFIRM_POLLS", "5")))
# Grace period in seconds after opening (env override, clamped to >= 0).
TREND_OPEN_GRACE_SEC = max(0, int(os.getenv("TREND_OPEN_GRACE_SEC", "180")))
def trend_add_zone_label(direction: str) -> str:
    """Return the UI label naming the add-zone boundary for ``direction``.

    The direction is stripped and lower-cased first; a falsy value is treated
    as ``"long"``. Only ``"short"`` selects the lower-edge label; every other
    value yields the upper-edge label.
    """
    normalized = (direction or "long").strip().lower()
    if normalized == "short":
        return "补仓下沿"
    return "补仓上沿"
def install_strategy_trend(app: Flask, repo_root: str, app_module: Any = None, **build_kw) -> dict:
    """Wire the trend-pullback strategy into a Flask app and return its config.

    Attaches the strategy templates, builds the trend config, stores it under
    ``app.extensions["strategy_trend_cfg"]``, registers the routes, patches the
    hub monitor enrichment and exposes ``trend_add_zone_label`` to templates.

    Args:
        app: Flask application to extend.
        repo_root: Passed through to ``attach_strategy_templates``.
        app_module: Trading app module (or ``None``), forwarded to
            ``build_trend_config``.
        **build_kw: Extra keyword arguments forwarded to ``build_trend_config``.

    Returns:
        The config dict produced by ``build_trend_config``.
    """
    from strategy_register import attach_strategy_templates

    attach_strategy_templates(app, repo_root)

    trend_cfg = build_trend_config(app_module, **build_kw)
    app.extensions["strategy_trend_cfg"] = trend_cfg
    register_trend_routes(app, trend_cfg)
    _patch_hub_monitor_enrich(app, trend_cfg)

    def _trend_ctx():
        return {"trend_add_zone_label": trend_add_zone_label}

    # Same effect as decorating ``_trend_ctx`` with ``@app.context_processor``.
    app.context_processor(_trend_ctx)
    return trend_cfg
def build_trend_config(app_module: Any = None, **kw) -> dict[str, Any]:
    """Assemble the config dict shared by all trend-pullback helpers.

    Numeric settings come from environment variables with built-in defaults;
    the only keyword read from ``kw`` is ``dca_legs``, used as the default for
    ``TREND_PULLBACK_DCA_LEGS``. Module-level settings are read from the
    resolved trading app module.

    Args:
        app_module: Passed to ``resolve_trading_app_module``.
        **kw: Optional overrides; only ``dca_legs`` is consulted.

    Returns:
        Dict holding the app module, its ``login_required`` / ``get_db`` /
        ``row_to_dict`` attributes, the parsed settings and an
        ``amount_precise`` callable.
    """
    module = resolve_trading_app_module(app_module)

    legs_setting = os.getenv("TREND_PULLBACK_DCA_LEGS", kw.get("dca_legs", "5"))
    dca_legs = max(1, int(legs_setting))
    ttl_setting = os.getenv("TREND_PULLBACK_PREVIEW_TTL_SECONDS", "120")
    preview_ttl = max(10, int(ttl_setting))
    drift_pct = float(os.getenv("TREND_PREVIEW_MAX_BALANCE_DRIFT_PCT", "5"))
    breakeven_pct = float(os.getenv("TREND_PULLBACK_MANUAL_BREAKEVEN_OFFSET_PCT", "0.3"))
    margin_buffer = float(getattr(module, "FULL_MARGIN_BUFFER_RATIO", 0.95))

    def amount_precise(ex_sym, amt):
        """Round ``amt`` to exchange precision; ``None`` if the fallback fails."""
        safe_round = getattr(module, "_safe_amount_to_precision", None)
        if callable(safe_round):
            return safe_round(ex_sym, amt)
        try:
            module.ensure_markets_loaded()
            rounded = module.exchange.amount_to_precision(ex_sym, float(amt))
            return float(rounded)
        except Exception:
            return None

    config: dict[str, Any] = {"app_module": module}
    config["login_required"] = module.login_required
    config["get_db"] = module.get_db
    config["row_to_dict"] = module.row_to_dict
    config["dca_legs"] = dca_legs
    config["preview_ttl"] = preview_ttl
    config["drift_pct"] = drift_pct
    config["breakeven_offset_pct"] = breakeven_pct
    config["margin_buffer"] = margin_buffer
    config["amount_precise"] = amount_precise
    config["max_active_positions"] = int(getattr(module, "MAX_ACTIVE_POSITIONS", 1))
    config["reset_hour"] = int(getattr(module, "TRADING_DAY_RESET_HOUR", 8))
    config["monitor_type_trend"] = MONITOR_TYPE_TREND
    return config
def _m(cfg: dict):
return cfg["app_module"]
def _row(cfg, row) -> dict:
return cfg["row_to_dict"](row)
def precheck_trend_start(cfg: dict, conn) -> tuple[bool, str]:
    """Check whether a new trend-pullback plan may be started right now.

    Checks run in order: the optional position-sizing source gate, the
    trading-day reset window, the active-position limit, and whether an
    active trend plan already exists.

    Args:
        cfg: Trend config built by ``build_trend_config``.
        conn: DB connection used for the position count and plan lookup.

    Returns:
        ``(True, "")`` when allowed, otherwise ``(False, reason)``.
    """
    module = _m(cfg)
    sizing_mode = getattr(module, "POSITION_SIZING_MODE", None) or "risk"
    try:
        from position_sizing_lib import OPEN_SOURCE_TREND, assert_open_source_allowed

        allowed, deny_reason = assert_open_source_allowed(sizing_mode, OPEN_SOURCE_TREND)
        if not allowed:
            return False, deny_reason
    except Exception:
        # Best effort: any failure in the sizing gate (including a failed
        # import) is ignored and the remaining checks still run.
        pass

    if not module.trading_day_reset_allows_new_open(module.app_now()):
        return False, f"北京时间 {cfg['reset_hour']}:00 前不允许持仓"

    open_positions = module.get_active_position_count(conn)
    position_limit = cfg["max_active_positions"]
    if open_positions >= position_limit:
        reason = (
            f"已达最大持仓数({open_positions}/{position_limit}),"
            "请先结束「实盘下单」中的持仓,再启动趋势回调"
        )
        return False, reason

    count_row = conn.execute(
        "SELECT COUNT(*) FROM trend_pullback_plans WHERE status='active'"
    ).fetchone()
    if int(count_row[0] or 0) > 0:
        return False, "已存在运行中的趋势回调计划"
    return True, ""
def _cleanup_stale_previews(conn) -> None:
ms = int(time.time() * 1000)
stale = conn.execute(
"SELECT id FROM trend_pullback_previews WHERE expires_at_ms < ?", (ms,)
).fetchall()
for row in stale:
try:
conn.execute(
"UPDATE trend_pullback_preview_snapshots SET outcome='expired' "
"WHERE preview_id=? AND outcome='open'",
(row["id"],),
)
except Exception:
pass
conn.execute("DELETE FROM trend_pullback_previews WHERE expires_at_ms < ?", (ms,))
def parse_trend_plan(cfg: dict, form_dict) -> tuple[Optional[dict], Optional[str]]:
    """Validate the submitted form and compute a complete trend-pullback plan.

    Sizing: the risk budget is ``available balance * risk_percent / 100``; the
    notional is that budget divided by the risk fraction returned by
    ``calc_risk_fraction``; the planned margin is notional / leverage, capped
    at ``balance * cfg["margin_buffer"]``. Half of the target amount is the
    first order, the remainder is split into add legs along a price grid.

    Args:
        cfg: Trend config built by ``build_trend_config``.
        form_dict: Mapping with ``symbol``, ``direction``, ``sl``,
            ``add_upper``, ``take_profit``, ``risk_percent`` and ``leverage``.

    Returns:
        ``(plan, None)`` on success or ``(None, error_message)`` on failure.
    """
    module = _m(cfg)
    form = form_dict or {}

    symbol = module.normalize_symbol_input(form.get("symbol"))
    if not symbol:
        return None, "symbol 不能为空"

    direction = (form.get("direction") or "long").strip().lower()
    if direction not in ("long", "short"):
        return None, "方向错误"

    try:
        stop_loss = float(form.get("sl"))
        add_upper = float(form.get("add_upper"))
        take_profit = float(form.get("take_profit"))
        risk_percent = float(form.get("risk_percent") or "5")
    except Exception:
        return None, "价格或风险比例格式错误"

    try:
        requested_leverage = module.parse_positive_float(form.get("leverage"))
        if requested_leverage is not None:
            leverage = int(requested_leverage)
        else:
            leverage = module.infer_leverage(symbol)
    except Exception:
        return None, "杠杆格式错误"
    if leverage <= 0 or risk_percent <= 0:
        return None, "杠杆与风险比例必须大于0"

    bounds_error = validate_trend_bounds(direction, stop_loss, add_upper)
    if bounds_error:
        return None, bounds_error

    balance = module.get_available_trading_usdt()
    if balance is None or balance <= 0:
        return None, "无法读取合约账户 USDT 可用余额,请检查 API 与账户类型"

    live_price = module.get_price(symbol)
    if live_price is None:
        return None, "获取实时价格失败"

    exchange_symbol = module.normalize_exchange_symbol(symbol)
    risk_fraction = calc_risk_fraction(direction, add_upper, stop_loss)
    if risk_fraction is None or risk_fraction <= 0:
        return None, "止损与补仓区间边界组合无法计算风险比例"

    risk_budget = float(balance) * (risk_percent / 100.0)
    notional = risk_budget / risk_fraction
    margin_plan = notional / float(leverage)
    # Never plan more margin than the buffered share of the available balance.
    margin_plan = min(margin_plan, float(balance) * cfg["margin_buffer"])
    if margin_plan <= 0:
        return None, "计划保证金过小"

    try:
        target_amount, _ = module.prepare_order_amount(
            exchange_symbol, margin_plan, leverage, live_price
        )
    except Exception as exc:
        return None, str(exc)

    to_precision = cfg["amount_precise"]
    first_amount = to_precision(exchange_symbol, float(target_amount) * 0.5)
    if first_amount is None or first_amount <= 0:
        return None, "首仓张数过小(低于交易所最小张数),请提高风险比例或杠杆"

    leftover = max(0.0, float(target_amount) - float(first_amount))
    remainder_total = to_precision(exchange_symbol, leftover)
    if remainder_total is None:
        remainder_total = 0.0

    module.ensure_markets_loaded()
    market = module.exchange.market(exchange_symbol)
    min_amount = float((market.get("limits", {}).get("amount", {}) or {}).get("min") or 0)
    leg_count, leg_json, per_leg_ref = build_leg_amounts_json(
        exchange_symbol, remainder_total, cfg["dca_legs"], to_precision, min_amount
    )
    if leg_count <= 0:
        return None, "剩余计划张数不足以拆出补仓档,请提高风险比例或放宽止损与补仓区间间距"

    grid = build_grid_prices(direction, stop_loss, add_upper, leg_count)
    if len(grid) != leg_count:
        return None, "补仓网格生成失败"

    snapshot_time = module.app_now_str()
    try:
        leg_amounts = json.loads(leg_json)
    except Exception:
        leg_amounts = []

    plan = {
        "symbol": symbol,
        "exchange_symbol": exchange_symbol,
        "direction": direction,
        "leverage": leverage,
        "stop_loss": stop_loss,
        "add_upper": add_upper,
        "take_profit": take_profit,
        "risk_percent": risk_percent,
        "snapshot_available_usdt": float(balance),
        "snapshot_at": snapshot_time,
        "live_price_ref": float(live_price),
        "plan_margin_capital": float(margin_plan),
        "target_order_amount": float(target_amount),
        "first_order_amount": float(first_amount),
        "remainder_total": float(remainder_total),
        "dca_legs": int(leg_count),
        "per_leg_amount": float(per_leg_ref),
        "grid_prices_json": json.dumps(grid),
        "leg_amounts_json": leg_json,
        "grid": grid,
        "leg_amounts": leg_amounts,
    }
    return plan, None
def _insert_preview_snapshot(conn, preview_id: str, created: str, exp_ms: int, pl: dict) -> None:
conn.execute(
"""INSERT INTO trend_pullback_preview_snapshots (
preview_id,symbol,exchange_symbol,direction,leverage,stop_loss,add_upper,take_profit,risk_percent,
snapshot_available_usdt,snapshot_at,live_price_ref,plan_margin_capital,target_order_amount,first_order_amount,remainder_total,
dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,expires_at_ms,preview_created_at
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
preview_id,
pl["symbol"],
pl["exchange_symbol"],
pl["direction"],
pl["leverage"],
pl["stop_loss"],
pl["add_upper"],
pl["take_profit"],
pl["risk_percent"],
pl["snapshot_available_usdt"],
pl["snapshot_at"],
pl["live_price_ref"],
pl["plan_margin_capital"],
pl["target_order_amount"],
pl["first_order_amount"],
pl["remainder_total"],
pl["dca_legs"],
pl["per_leg_amount"],
pl["grid_prices_json"],
pl["leg_amounts_json"],
exp_ms,
created,
),
)
def _format_trend_price(cfg: dict, symbol: str, value) -> str:
    """Format ``value`` as a price string for ``symbol``.

    Returns ``""`` for ``None`` or an empty string. Otherwise tries the
    exchange's ``price_to_precision`` on the normalized symbol; if that fails,
    falls back to the module's ``format_price_for_symbol`` (called with the
    original ``symbol``) and finally to ``str(value)``.
    """
    if value in (None, ""):
        return ""

    module = _m(cfg)
    exchange_symbol = symbol or ""
    normalize = getattr(module, "normalize_exchange_symbol", None)
    if callable(normalize):
        try:
            exchange_symbol = normalize(exchange_symbol) or exchange_symbol
        except Exception:
            # Keep the un-normalized symbol when normalization fails.
            pass

    try:
        module.ensure_markets_loaded()
        precise = module.exchange.price_to_precision(exchange_symbol, float(value))
        return str(precise)
    except Exception:
        fallback = getattr(module, "format_price_for_symbol", None)
        if not callable(fallback):
            return str(value)
        return fallback(symbol, value)
def _trend_add_leg_fields(cfg: dict, d: dict) -> dict:
    """Derive the completed add-leg count and the grid prices already reached.

    Shared by the strategy page and the hub monitor. Works on a copy of ``d``
    and adds ``add_count``, ``add_count_total``, ``add_prices``,
    ``add_prices_display`` plus a ``<field>_display`` entry for each of
    ``stop_loss``, ``take_profit``, ``add_upper`` and ``avg_entry_price``
    that is neither ``None`` nor empty.
    """
    import json

    result = dict(d)

    def _int_field(key: str) -> int:
        # Unparseable or missing counters count as zero.
        try:
            return int(result.get(key) or 0)
        except (TypeError, ValueError):
            return 0

    completed_legs = _int_field("legs_done")
    planned_legs = _int_field("dca_legs")

    try:
        grid_levels = json.loads(result.get("grid_prices_json") or "[]")
    except Exception:
        grid_levels = []
    else:
        if not isinstance(grid_levels, list):
            grid_levels = []

    reached_prices: list[float] = []
    for level in grid_levels[:completed_legs]:
        try:
            reached_prices.append(float(level))
        except (TypeError, ValueError):
            continue

    symbol = result.get("exchange_symbol") or result.get("symbol") or ""
    result["add_count"] = completed_legs
    result["add_count_total"] = planned_legs
    result["add_prices"] = reached_prices
    result["add_prices_display"] = [
        _format_trend_price(cfg, symbol, price) for price in reached_prices
    ]
    for name in ("stop_loss", "take_profit", "add_upper", "avg_entry_price"):
        if result.get(name) in (None, ""):
            continue
        result[f"{name}_display"] = _format_trend_price(cfg, symbol, result.get(name))
    return result
def enrich_trend_plan_for_hub(cfg: dict, raw: dict) -> dict:
    """Enrich a plan for the hub ``/api/hub/monitor`` endpoint.

    Produces the same fields as the running-plan card on the strategy page
    (floating PnL, mark price, risk/reward ratio, ...) and additionally sets
    ``monitor_source``, ``planned_rr`` (when computable) and
    ``position_ratio_pct`` (planned margin as a percentage of the balance
    snapshot, rounded to two decimals).
    """
    plan = enrich_trend_plan(cfg, dict(raw or {}))
    plan["monitor_source"] = "趋势回调计划"

    module = _m(cfg)
    side = (plan.get("direction") or "long").lower()

    try:
        entry_price = float(plan["avg_entry_price"])
        stop_price = float(plan["stop_loss"])
        target_price = float(plan["take_profit"])
        rr_calc = getattr(module, "calc_rr_ratio", None)
        if callable(rr_calc):
            ratio = rr_calc(side, entry_price, stop_price, target_price)
            if ratio is not None:
                plan["planned_rr"] = float(ratio)
    except (TypeError, ValueError, KeyError):
        # Missing or non-numeric prices: leave planned_rr unset.
        pass

    try:
        balance_snapshot = float(plan.get("snapshot_available_usdt") or 0)
        planned_margin = float(plan.get("plan_margin_capital") or 0)
        if balance_snapshot > 0 and planned_margin > 0:
            plan["position_ratio_pct"] = round(planned_margin / balance_snapshot * 100.0, 2)
    except (TypeError, ValueError):
        pass
    return plan
def _patch_hub_monitor_enrich(app: Flask, cfg: dict) -> None:
ctx = dict(app.config.get("HUB_CTX") or {})
prev = ctx.get("enrich_monitor")
def enrich_monitor(keys=None, orders=None, trends=None, rolls=None):
payload: dict[str, Any] = {}
if callable(prev):
try:
prev_out = prev(keys=keys, orders=orders, trends=trends, rolls=rolls)
if isinstance(prev_out, dict):
payload.update(prev_out)
except Exception:
pass
if trends:
payload["trends"] = [
enrich_trend_plan_for_hub(cfg, t) for t in trends if isinstance(t, dict)
]
return payload
ctx["enrich_monitor"] = enrich_monitor
app.config["HUB_CTX"] = ctx
def enrich_trend_plan(cfg: dict, row) -> dict:
    """Convert a plan row to a dict and attach display and live-position fields.

    Adds the add-leg fields, normalizes ``breakeven_applied`` to a bool and
    sets ``floating_pnl`` / ``floating_mark``. Floating PnL prefers the
    exchange's ``unrealized_pnl``; otherwise it is computed with the module's
    ``calc_pnl`` from the mark price, average entry, planned margin and
    leverage. Both fields are ``None`` when no live metrics are available.
    """
    module = _m(cfg)
    plan = _trend_add_leg_fields(cfg, _row(cfg, row))

    try:
        plan["breakeven_applied"] = int(plan.get("breakeven_applied") or 0) != 0
    except Exception:
        plan["breakeven_applied"] = False

    exchange_symbol = plan.get("exchange_symbol") or module.normalize_exchange_symbol(
        plan.get("symbol") or ""
    )
    side = (plan.get("direction") or "long").lower()

    fetch_metrics = getattr(module, "get_live_position_exchange_metrics", None)
    if not callable(fetch_metrics):
        plan["floating_pnl"] = None
        plan["floating_mark"] = None
        return plan

    try:
        order_leverage = int(plan.get("leverage") or 0) or None
    except (TypeError, ValueError):
        order_leverage = None
    try:
        metrics = fetch_metrics(exchange_symbol, side, order_leverage=order_leverage)
    except TypeError:
        # Retry without the keyword for implementations that do not accept it.
        metrics = fetch_metrics(exchange_symbol, side)

    floating_pnl = None
    if metrics and metrics.get("unrealized_pnl") is not None:
        floating_pnl = float(metrics["unrealized_pnl"])
    elif (
        metrics
        and metrics.get("mark_price") is not None
        and plan.get("avg_entry_price") is not None
    ):
        try:
            entry_price = float(plan["avg_entry_price"])
            mark_price = float(metrics["mark_price"])
            planned_margin = float(plan.get("plan_margin_capital") or 0)
            plan_leverage = int(plan.get("leverage") or 1)
            pnl_calc = getattr(module, "calc_pnl", None)
            if callable(pnl_calc) and entry_price > 0 and planned_margin > 0:
                floating_pnl = float(
                    pnl_calc(side, entry_price, mark_price, planned_margin, plan_leverage)
                )
        except (TypeError, ValueError):
            floating_pnl = None
    plan["floating_pnl"] = floating_pnl

    has_mark = bool(metrics) and metrics.get("mark_price") is not None
    plan["floating_mark"] = float(metrics["mark_price"]) if has_mark else None
    return plan
def _weighted_avg(old_avg, old_amt, fill_px, add_amt):
try:
oa, aa = float(old_amt), float(add_amt)
if oa <= 0:
return float(fill_px)
return (float(old_avg) * oa + float(fill_px) * aa) / (oa + aa)
except Exception:
return float(fill_px or 0)
def _plan_stop_status(result_label: str) -> str:
if result_label == "止盈":
return "stopped_tp"
if result_label == "止损":
return "stopped_sl"
return "stopped_manual"
def _trend_plan_trade_exists(conn, plan_id: int) -> bool:
try:
return conn.execute(
"SELECT id FROM trade_records WHERE trend_plan_id=? LIMIT 1",
(int(plan_id),),
).fetchone() is not None
except Exception:
return False
def _finalize_plan(cfg: dict, conn, row, result_label: str, exit_price: float) -> None:
    """Close an active plan: cancel orders, mark it stopped, record the trade.

    The plan row is re-read and must still be ``active``; the status UPDATE is
    guarded by ``status='active'`` as well, so only one caller finalizes a
    plan. After the status is committed a trade record is inserted (unless one
    already exists for the plan) and, when the module provides the helpers, a
    WeChat close message is sent.

    Args:
        cfg: Trend config built by ``build_trend_config``.
        conn: DB connection; this function commits on it.
        row: Plan row; only ``row["id"]`` is used before the re-read.
        result_label: Result label; decides the stopped status via
            ``_plan_stop_status``.
        exit_price: Price used for the PnL computation.
    """
    m = _m(cfg)
    plan_id = int(row["id"])
    active = conn.execute(
        "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'",
        (plan_id,),
    ).fetchone()
    if not active:
        return
    row = active
    sym = row["symbol"]
    direction = row["direction"] or "long"
    ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(sym)
    closed_at = m.app_now_str()
    opened_at = row["opened_at"] or closed_at
    hold_seconds = m.calc_hold_seconds(
        opened_at, m.parse_dt_for_trading_day(closed_at) or m.app_now()
    )
    margin_cap = float(row["plan_margin_capital"] or 0)
    lev = int(row["leverage"] or 1)
    avg_e = float(row["avg_entry_price"] or 0)
    pnl_amount = m.calc_pnl(direction, avg_e, float(exit_price), margin_cap, lev)
    res = m.normalize_result_with_pnl(result_label, pnl_amount)
    risk_amt = m.calc_risk_amount_from_plan(
        direction, float(row["add_upper"]), float(row["stop_loss"]), margin_cap, lev
    )
    planned_rr = m.calc_rr_ratio(
        direction, avg_e, float(row["stop_loss"]), float(row["take_profit"])
    )
    # Read the optional column by index: sqlite3.Row has no ``.get`` and
    # raises IndexError for an unknown column, while a dict raises KeyError.
    try:
        initial_sl_raw = row["initial_stop_loss"]
    except (KeyError, IndexError):
        initial_sl_raw = None
    initial_stop_loss = float(initial_sl_raw or row["stop_loss"])
    try:
        cancel_symbol_orders(cfg, ex_sym)
    except Exception:
        # Best effort: leftover exchange orders must not block finalization.
        pass
    st = _plan_stop_status(result_label)
    cur = conn.execute(
        "UPDATE trend_pullback_plans SET status=?, message=? WHERE id=? AND status='active'",
        (st, res, plan_id),
    )
    if not getattr(cur, "rowcount", 0):
        # Another caller finalized the plan between the SELECT and the UPDATE.
        return
    conn.commit()
    if _trend_plan_trade_exists(conn, plan_id):
        return
    session_date = row["session_date"] or m.get_trading_day()
    session_capital = m.update_session_capital(conn, session_date, pnl_amount)
    kwargs = dict(
        conn=conn,
        symbol=sym,
        monitor_type=MONITOR_TYPE_TREND,
        direction=direction,
        trigger_price=avg_e,
        stop_loss=float(row["stop_loss"]),
        initial_stop_loss=initial_stop_loss,
        take_profit=float(row["take_profit"]),
        margin_capital=margin_cap,
        leverage=lev,
        pnl_amount=pnl_amount,
        hold_seconds=hold_seconds,
        trade_style="trend_pullback",
        risk_amount=risk_amt,
        planned_rr=planned_rr,
        actual_rr=m.calc_actual_rr(pnl_amount, risk_amt),
        result=res,
        opened_at=opened_at,
        closed_at=closed_at,
        entry_reason=ENTRY_REASON_TREND_PULLBACK,
    )
    # Pass the plan id only when the module's insert function accepts it.
    if "trend_plan_id" in inspect.signature(m.insert_trade_record).parameters:
        m.insert_trade_record(**kwargs, trend_plan_id=plan_id)
    else:
        m.insert_trade_record(**kwargs)
    extra = getattr(m, "build_wechat_close_message", None)
    send = getattr(m, "send_wechat_msg", None)
    if callable(extra) and callable(send):
        send(
            extra(
                symbol=sym,
                direction=direction,
                result=f"{res}{MONITOR_TYPE_TREND}",
                pnl_amount=pnl_amount,
                hold_seconds=hold_seconds,
                trigger_price=avg_e,
                current_price=float(exit_price),
                stop_loss=float(row["stop_loss"]),
                take_profit=float(row["take_profit"]),
                close_order_id="-",
                extra_note="计划本金口径:启动时合约可用余额快照;止盈由程序监控",
                session_capital_fallback=session_capital,
            )
        )
    conn.commit()
def _trend_plan_open_age_sec(row, m) -> float:
opened_ms = None
try:
if "opened_at_ms" in row.keys() and row["opened_at_ms"]:
opened_ms = int(row["opened_at_ms"])
except Exception:
opened_ms = None
to_ms = getattr(m, "_to_ms_with_fallback", None)
if callable(to_ms):
opened_ms = to_ms(opened_ms, row["opened_at"] if "opened_at" in row.keys() else None)
if opened_ms is None and "opened_at" in row.keys():
opened_ms = to_ms(None, row["opened_at"])
if not opened_ms:
return 0.0
return max(0.0, (time.time() * 1000 - opened_ms) / 1000.0)
def _trend_hit_take_profit(direction: str, mark_price: float, take_profit: float, avg_entry: float) -> bool:
try:
pf = float(mark_price)
tp = float(take_profit)
entry = float(avg_entry)
except (TypeError, ValueError):
return False
if entry <= 0 or tp <= 0:
return False
direction = (direction or "long").lower()
if direction == "long":
return tp > entry and pf >= tp
return tp < entry and pf <= tp
def _should_finalize_trend_flat(row, pos, plan_id: int, m) -> bool:
    """Decide whether a plan that looks flat on the exchange should be closed.

    After the first order, an exchange report of no position only counts once
    the open grace period has passed and the flat state has been seen on
    enough consecutive polls, to avoid a false stop-loss. The required streak
    doubles while a local open amount exists and the plan is younger than
    twice the grace period. The per-plan streak is reset when a position is
    seen or while inside the grace period.
    """
    if pos is None:
        return False
    if float(pos) > 0:
        _TREND_FLAT_STREAK.pop(plan_id, None)
        return False

    first_leg_done = int(row["first_order_done"] or 0)
    if not first_leg_done:
        return False

    age = _trend_plan_open_age_sec(row, m)
    if age < TREND_OPEN_GRACE_SEC:
        _TREND_FLAT_STREAK.pop(plan_id, None)
        return False

    try:
        locally_open = float(row["order_amount_open"] or 0)
    except (TypeError, ValueError):
        locally_open = 0.0

    polls_needed = TREND_FLAT_CONFIRM_POLLS
    if locally_open > 0 and age < TREND_OPEN_GRACE_SEC * 2:
        polls_needed = max(polls_needed, TREND_FLAT_CONFIRM_POLLS * 2)

    streak = int(_TREND_FLAT_STREAK.get(plan_id, 0)) + 1
    _TREND_FLAT_STREAK[plan_id] = streak
    if streak < polls_needed:
        return False

    print(
        f"[trend_pullback] flat finalize plan={plan_id} sym={row['symbol']} "
        f"age={age:.0f}s streak={streak} local_open={locally_open}",
        flush=True,
    )
    return True
def check_trend_pullback_plans(cfg: dict) -> None:
    """Poll every active trend-pullback plan once.

    For each plan, in order: close at market and finalize when the take-profit
    is hit; finalize as stop-loss when the position is confirmed flat; place
    the next add leg when the price crosses the next grid level; finally store
    the current price as ``last_mark_price``.

    A failure while processing one plan is logged and does not stop the other
    plans. The DB connection is always closed, even when the query or the
    final commit raises.

    Args:
        cfg: Trend config built by ``build_trend_config``.
    """
    m = _m(cfg)
    ok_live, _ = m.ensure_exchange_live_ready()
    if not ok_live:
        return
    conn = cfg["get_db"]()
    try:
        rows = conn.execute(
            "SELECT * FROM trend_pullback_plans WHERE status='active'"
        ).fetchall()
        for row in rows:
            plan_id = None
            try:
                plan_id = int(row["id"])
                sym = row["symbol"]
                direction = (row["direction"] or "long").lower()
                ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(sym)
                sl = float(row["stop_loss"])
                tp = float(row["take_profit"])
                lev = int(row["leverage"] or 1)
                p = m.get_price(sym)
                if not p:
                    continue
                pf = float(p)
                last_p = row["last_mark_price"]
                last_pf = float(last_p) if last_p is not None else pf
                pos = m.get_live_position_contracts(ex_sym, direction)
                if pos is None:
                    continue
                try:
                    local_open = float(row["order_amount_open"] or 0)
                except (TypeError, ValueError):
                    local_open = 0.0
                # Shortly after opening, trust the locally recorded size when
                # the exchange reports a flat position.
                if float(pos) <= 0 and local_open > 0:
                    age = _trend_plan_open_age_sec(row, m)
                    if age < TREND_OPEN_GRACE_SEC * 2:
                        print(
                            f"[trend_pullback] pos fallback plan={plan_id} sym={sym} "
                            f"ex_pos=0 local_open={local_open} age={age:.0f}s",
                            flush=True,
                        )
                        pos = local_open
                legs_done = int(row["legs_done"] or 0)
                try:
                    leg_amounts = [float(x) for x in json.loads(row["leg_amounts_json"] or "[]")]
                except Exception:
                    leg_amounts = []
                try:
                    grid = json.loads(row["grid_prices_json"] or "[]")
                except Exception:
                    grid = []
                avg_e = float(row["avg_entry_price"] or pf or 0)
                hit_tp = _trend_hit_take_profit(direction, pf, tp, avg_e)
                if hit_tp and pos > 0:
                    try:
                        close_resp = trend_market_close(cfg, ex_sym, direction, float(pos), lev)
                        exit_p = m.extract_trade_price_from_order(close_resp) or pf
                    except Exception as e:
                        # Only a "no position" error still counts as closed.
                        if not m.is_no_position_error(str(e)):
                            continue
                        exit_p = pf
                    _finalize_plan(cfg, conn, row, "止盈", exit_p)
                    _TREND_FLAT_STREAK.pop(plan_id, None)
                    continue
                if _should_finalize_trend_flat(row, pos, plan_id, m):
                    _finalize_plan(cfg, conn, row, "止损", pf)
                    _TREND_FLAT_STREAK.pop(plan_id, None)
                    continue
                if (
                    int(row["first_order_done"] or 0)
                    and legs_done < len(grid)
                    and legs_done < len(leg_amounts)
                ):
                    level = float(grid[legs_done])
                    # A leg fires only when the price crosses the level
                    # between the previous poll and this one.
                    if direction == "long":
                        fired = last_pf > level and pf <= level
                    else:
                        fired = last_pf < level and pf >= level
                    if fired:
                        amt = float(m.exchange.amount_to_precision(ex_sym, leg_amounts[legs_done]))
                        if amt > 0:
                            add_resp = trend_market_add(cfg, ex_sym, direction, amt, lev)
                            fill_px = m.extract_trade_price_from_order(add_resp) or pf
                            old_avg = float(row["avg_entry_price"] or fill_px)
                            old_open = float(row["order_amount_open"] or 0)
                            new_avg = _weighted_avg(old_avg, old_open, fill_px, amt)
                            conn.execute(
                                "UPDATE trend_pullback_plans SET legs_done=?, avg_entry_price=?, "
                                "order_amount_open=?, last_mark_price=? WHERE id=?",
                                (legs_done + 1, new_avg, old_open + amt, pf, row["id"]),
                            )
                            row = conn.execute(
                                "SELECT * FROM trend_pullback_plans WHERE id=?", (row["id"],)
                            ).fetchone()
                            try:
                                trend_refresh_stop_only(cfg, ex_sym, direction, sl)
                            except Exception:
                                # Best effort: a failed stop refresh must not
                                # undo the recorded add leg.
                                pass
                conn.execute(
                    "UPDATE trend_pullback_plans SET last_mark_price=? WHERE id=?",
                    (pf, row["id"]),
                )
            except Exception as exc:
                # Keep polling the remaining plans, but leave a trace instead
                # of dropping the error silently.
                print(
                    f"[trend_pullback] poll failed plan={plan_id}: {exc!r}",
                    flush=True,
                )
                continue
        conn.commit()
    finally:
        conn.close()
# Plan status written when manual break-even hands the position over to the order monitor.
TREND_PLAN_STATUS_HANDOFF = "stopped_handoff"
def _order_monitor_manual_type(m) -> str:
return getattr(m, "ORDER_MONITOR_TYPE_MANUAL", None) or "下单监控"
def _insert_trend_handoff_order_monitor(
    cfg: dict,
    conn,
    plan_row,
    *,
    new_sl: float,
    pos_amt: float,
) -> int:
    """Insert an ``order_monitors`` row that takes over a trend plan's position.

    Prices, sizing and timestamps are copied from ``plan_row``; break-even
    parameters come from the trading app module (with defaults). After the
    insert, the module's optional ``try_persist_exchange_margin_for_order``
    is called on a best-effort basis.

    Args:
        cfg: Trend config built by ``build_trend_config``.
        conn: DB connection; the caller commits.
        plan_row: Row of the plan being handed off.
        new_sl: Stop-loss stored on the new monitor.
        pos_amt: Position size stored as ``order_amount``.

    Returns:
        The id of the inserted ``order_monitors`` row.
    """
    module = _m(cfg)
    symbol = plan_row["symbol"]
    side = (plan_row["direction"] or "long").lower()
    exchange_symbol = plan_row["exchange_symbol"] or module.normalize_exchange_symbol(symbol)
    plan_id = int(plan_row["id"])
    entry_price = float(plan_row["avg_entry_price"] or 0)
    target_price = float(plan_row["take_profit"] or 0)
    leverage = int(plan_row["leverage"] or 1)
    margin = float(plan_row["plan_margin_capital"] or 0)

    # Fall back to the current stop-loss when no initial stop-loss is stored.
    if plan_row["initial_stop_loss"] not in (None, ""):
        initial_stop = float(plan_row["initial_stop_loss"])
    else:
        initial_stop = float(plan_row["stop_loss"] or 0)

    risk_percent = float(plan_row["risk_percent"] or 5)
    risk_amount = None
    risk_calc = getattr(module, "calc_risk_amount_from_plan", None)
    if callable(risk_calc):
        try:
            risk_amount = risk_calc(side, entry_price, initial_stop, margin, leverage)
        except Exception:
            risk_amount = None

    be_rr_trigger = float(getattr(module, "BREAKEVEN_RR_TRIGGER", 1) or 1)
    be_offset_pct = float(getattr(module, "BREAKEVEN_OFFSET_PCT", 0.3) or 0.3)
    be_step_r = float(getattr(module, "BREAKEVEN_STEP_R", 1) or 1)
    if side == "short":
        breakeven_price = round(entry_price * (1 - be_offset_pct / 100.0), 8)
    else:
        breakeven_price = round(entry_price * (1 + be_offset_pct / 100.0), 8)
    round_price = getattr(module, "round_price_to_exchange", None)
    if callable(round_price):
        try:
            breakeven_price = float(round_price(exchange_symbol, breakeven_price) or breakeven_price)
        except Exception:
            # Keep the unrounded price when exchange rounding fails.
            pass

    opened_at = plan_row["opened_at"] or module.app_now_str()
    to_ms = getattr(module, "_to_ms_with_fallback", None)
    if callable(to_ms):
        stored_ms = plan_row["opened_at_ms"] if "opened_at_ms" in plan_row.keys() else None
        opened_ms = to_ms(stored_ms, opened_at)
    else:
        opened_ms = None

    trading_day = plan_row["session_date"] or getattr(module, "get_trading_day", lambda: None)()
    if not trading_day and callable(getattr(module, "get_trading_day", None)):
        trading_day = module.get_trading_day()

    notional = margin * leverage if margin and leverage else None
    monitor_type = MONITOR_TYPE_TREND_PULLBACK

    insert_sql = (
        "INSERT INTO order_monitors "
        "(symbol, exchange_symbol, direction, trigger_price, stop_loss, initial_stop_loss, take_profit, "
        "margin_capital, leverage, trade_style, risk_percent, risk_amount, "
        "breakeven_rr_trigger, breakeven_offset_pct, breakeven_step_r, breakeven_armed, breakeven_price, "
        "breakeven_enabled, notional_value, position_ratio, base_amount, order_amount, exchange_order_id, "
        "opened_at, opened_at_ms, session_date, monitor_type, key_signal_type, trend_plan_id) "
        "VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
    )
    params = (
        symbol,
        exchange_symbol,
        side,
        entry_price,
        new_sl,
        initial_stop,
        target_price,
        margin,
        leverage,
        "trend_pullback_handoff",
        risk_percent,
        risk_amount,
        be_rr_trigger,
        be_offset_pct,
        be_step_r,
        0,  # breakeven_armed
        breakeven_price,
        0,  # breakeven_enabled
        notional,
        None,  # position_ratio
        None,  # base_amount
        float(pos_amt),
        "",  # exchange_order_id
        opened_at,
        opened_ms,
        trading_day,
        monitor_type,
        TREND_HANDOFF_KEY_SIGNAL,
        plan_id,
    )
    conn.execute(insert_sql, params)
    monitor_id = int(conn.execute("SELECT last_insert_rowid()").fetchone()[0])

    persist_margin = getattr(module, "try_persist_exchange_margin_for_order", None)
    if callable(persist_margin):
        try:
            persist_margin(conn, monitor_id, exchange_symbol, side, order_leverage=leverage)
        except Exception:
            pass
    return monitor_id
def apply_manual_breakeven(cfg: dict, conn, row, offset_pct=None) -> tuple[bool, Optional[str]]:
    """Break-even: end the trend plan and hand the position to the order monitor.

    The position is transferred to an order monitor (noted as trend pullback)
    and the exchange gets the break-even stop-loss and the take-profit placed
    together. The new stop must improve on the current one (higher for longs,
    lower for shorts). This function does not commit.

    Args:
        cfg: Trend config built by ``build_trend_config``.
        conn: DB connection used for the plan update and the monitor insert.
        row: Active plan row.
        offset_pct: Break-even offset in percent; ``None`` uses the module's
            own calculation or ``cfg["breakeven_offset_pct"]``.

    Returns:
        ``(True, None)`` on success or ``(False, reason)`` when the hand-off
        was refused or failed.
    """
    m = _m(cfg)
    if (row["status"] or "").strip() != "active":
        return False, "计划已结束"
    if not int(row["first_order_done"] or 0):
        return False, "尚未完成首仓,无法保本"
    avg_e = float(row["avg_entry_price"] or 0)
    if avg_e <= 0:
        return False, "缺少有效持仓均价"
    direction = (row["direction"] or "long").lower()
    sym = row["symbol"]
    ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(sym)
    pos = m.get_live_position_contracts(ex_sym, direction)
    if pos is None or float(pos) <= 0:
        return False, "交易所当前无该方向持仓"
    pos_amt = float(pos)
    dup = conn.execute(
        "SELECT id FROM order_monitors WHERE status='active' AND symbol=? AND direction=? LIMIT 1",
        (sym, direction),
    ).fetchone()
    if dup:
        return False, "该币种已有运行中的下单监控,请先结束后再保本移交"
    be_fn = getattr(m, "calc_trend_manual_breakeven_stop", None)
    if callable(be_fn):
        new_sl_raw = be_fn(direction, avg_e, offset_pct)
    else:
        # No module helper: offset the average entry by the configured percent.
        pct = float(offset_pct if offset_pct is not None else cfg["breakeven_offset_pct"])
        if direction == "short":
            new_sl_raw = avg_e * (1.0 - pct / 100.0)
        else:
            new_sl_raw = avg_e * (1.0 + pct / 100.0)
    if new_sl_raw is None:
        return False, "保本价计算失败"
    new_sl = m.round_price_to_exchange(ex_sym, new_sl_raw)
    if new_sl is None:
        return False, "保本价经交易所精度舍入后无效"
    new_sl = float(new_sl)
    tp = float(row["take_profit"] or 0)
    if tp <= 0:
        return False, "计划止盈价无效"
    cur_sl = float(row["stop_loss"] or 0)
    if direction == "long":
        if new_sl <= cur_sl:
            return False, f"新止损 {new_sl} 未高于当前止损 {cur_sl}(多仓需上移)"
    else:
        if new_sl >= cur_sl:
            return False, f"新止损 {new_sl} 未低于当前止损 {cur_sl}(空仓需下移)"
    ok_live, live_reason = m.ensure_exchange_live_ready()
    if not ok_live:
        return False, live_reason or "实盘未就绪"
    plan_id = int(row["id"])
    handoff_row = {
        "symbol": sym,
        "exchange_symbol": ex_sym,
        "direction": direction,
        "order_amount": pos_amt,
    }
    try:
        trend_replace_tpsl(cfg, handoff_row, new_sl, tp)
    except Exception as e:
        fe = getattr(m, "friendly_exchange_error", None)
        return False, fe(e) if callable(fe) else str(e)
    now_s = m.app_now_str()
    _TREND_FLAT_STREAK.pop(plan_id, None)
    cur = conn.execute(
        "UPDATE trend_pullback_plans SET status=?, message=?, stop_loss=?, "
        "breakeven_applied=1, breakeven_applied_at=? WHERE id=? AND status='active'",
        (
            TREND_PLAN_STATUS_HANDOFF,
            f"保本移交下单监控({TREND_HANDOFF_TRADE_NOTE})",
            new_sl,
            now_s,
            plan_id,
        ),
    )
    if not getattr(cur, "rowcount", 0):
        return False, "计划状态更新失败(可能已被其他操作结束)"
    try:
        mon_id = _insert_trend_handoff_order_monitor(
            cfg, conn, row, new_sl=new_sl, pos_amt=pos_amt
        )
    except Exception as e:
        # Put the plan back to active so it keeps being polled.
        # NOTE(review): stop_loss / breakeven_applied written above are not
        # reverted here — confirm this is intended.
        conn.execute(
            "UPDATE trend_pullback_plans SET status='active', message=? WHERE id=?",
            (f"移交下单监控失败:{e}", plan_id),
        )
        return False, f"移交下单监控失败:{e}"
    send = getattr(m, "send_wechat_msg", None)
    pf = getattr(m, "format_price_for_symbol", None)
    fmt = (lambda s, p: pf(s, p)) if callable(pf) else (lambda _s, p: str(p))
    if callable(send):
        lines = [
            f"# ✅ {sym} 趋势回调保本移交",
            f"- 计划 ID**{plan_id}** → 下单监控 **#{mon_id}**",
            f"- 备注:**{TREND_HANDOFF_TRADE_NOTE}**",
            f"- 保本止损:{fmt(sym, new_sl)} 止盈:{fmt(sym, tp)}",
            f"- 交易所:已挂止盈止损;平仓后将写入交易记录({ENTRY_REASON_TREND_PULLBACK})",
        ]
        wl = getattr(m, "_wechat_account_label", None)
        if callable(wl):
            lines.insert(1, f"**账户:{wl()}**")
        send("\n".join(lines))
    return True, None
def load_trend_page_context(conn, request_obj, cfg: dict) -> dict[str, Any]:
    """Build the template context for the trend-pullback section of the page.

    Cleans up stale previews, loads and enriches the active plans (falling
    back to the plain row dict when enrichment fails), computes whether a new
    plan may be started, and resolves the preview named by the ``preview_id``
    query argument.

    Args:
        conn: DB connection.
        request_obj: Request-like object exposing ``args.get``.
        cfg: Trend config built by ``build_trend_config``.

    Returns:
        Dict of template variables for the trend page.
    """
    module = _m(cfg)
    _cleanup_stale_previews(conn)

    count_row = conn.execute(
        "SELECT COUNT(*) FROM trend_pullback_plans WHERE status='active'"
    ).fetchone()
    running_count = int(count_row[0] or 0)

    plans = []
    active_rows = conn.execute(
        "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC"
    ).fetchall()
    for plan_row in active_rows:
        try:
            plans.append(enrich_trend_plan(cfg, plan_row))
        except Exception:
            plans.append(_row(cfg, plan_row))

    now = module.app_now()
    open_positions = module.get_active_position_count(conn)
    can_trade = (
        module.trading_day_reset_allows_new_open(now)
        and open_positions < cfg["max_active_positions"]
        and running_count == 0
    )

    preview = None
    preview_levels = []
    expires_ms = None
    preview_expired = False
    requested_id = (request_obj.args.get("preview_id") or "").strip()
    if requested_id:
        preview_row = conn.execute(
            "SELECT * FROM trend_pullback_previews WHERE id=?", (requested_id,)
        ).fetchone()
        now_ms = int(time.time() * 1000)
        if preview_row and int(preview_row["expires_at_ms"] or 0) >= now_ms:
            preview = _row(cfg, preview_row)
            expires_ms = int(preview_row["expires_at_ms"])
            try:
                grid = json.loads(preview.get("grid_prices_json") or "[]")
                legs = json.loads(preview.get("leg_amounts_json") or "[]")
            except Exception:
                grid, legs = [], []
            for index, (price, contracts) in enumerate(zip(grid, legs), 1):
                preview_levels.append({"i": index, "price": price, "contracts": contracts})
        elif preview_row:
            preview_expired = True

    context: dict[str, Any] = {
        "trend_plans": plans,
        "trend_active": running_count,
        "can_trade_trend": can_trade,
        "trend_preview": preview,
        "trend_preview_levels": preview_levels,
        "preview_expires_ms": expires_ms,
        "trend_preview_expired": preview_expired,
        "trend_pullback_dca_legs": cfg["dca_legs"],
        "trend_pullback_preview_ttl": cfg["preview_ttl"],
        "trend_preview_max_drift_pct": cfg["drift_pct"],
        "trend_manual_breakeven_offset_pct": cfg["breakeven_offset_pct"],
    }
    return context
def register_trend_routes(app: Flask, cfg: dict) -> None:
lr = cfg["login_required"]
get_db = cfg["get_db"]
def _redirect_trend(**kw):
return redirect(url_for("strategy_trading_page", **kw))
@app.route("/preview_trend_pullback", methods=["POST"])
@lr
def preview_trend_pullback():
conn = get_db()
init_strategy_tables(conn)
okp, msg = precheck_trend_start(cfg, conn)
if not okp:
conn.close()
flash(msg)
return _redirect_trend()
m = _m(cfg)
ok_live, reason = m.ensure_exchange_live_ready()
if not ok_live:
conn.close()
flash(reason)
return _redirect_trend()
payload, err = parse_trend_plan(cfg, request.form)
if err:
conn.close()
flash(err)
return _redirect_trend()
pid = str(uuid.uuid4())
exp_ms = int(time.time() * 1000) + cfg["preview_ttl"] * 1000
created = m.app_now_str()
conn.execute(
"""INSERT INTO trend_pullback_previews (
id,symbol,exchange_symbol,direction,leverage,stop_loss,add_upper,take_profit,risk_percent,
snapshot_available_usdt,snapshot_at,live_price_ref,plan_margin_capital,target_order_amount,first_order_amount,remainder_total,
dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,expires_at_ms,created_at
) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
(
pid,
payload["symbol"],
payload["exchange_symbol"],
payload["direction"],
payload["leverage"],
payload["stop_loss"],
payload["add_upper"],
payload["take_profit"],
payload["risk_percent"],
payload["snapshot_available_usdt"],
payload["snapshot_at"],
payload["live_price_ref"],
payload["plan_margin_capital"],
payload["target_order_amount"],
payload["first_order_amount"],
payload["remainder_total"],
payload["dca_legs"],
payload["per_leg_amount"],
payload["grid_prices_json"],
payload["leg_amounts_json"],
exp_ms,
created,
),
)
_insert_preview_snapshot(conn, pid, created, exp_ms, payload)
conn.commit()
conn.close()
flash(f"预览已生成,有效期 {cfg['preview_ttl']} 秒,请核对后点击「确认执行」。")
return _redirect_trend(preview_id=pid)
@app.route("/execute_trend_pullback", methods=["POST"])
@lr
def execute_trend_pullback():
    """Execute a previously generated trend-pullback preview.

    Loads the preview named by the ``preview_id`` form field, re-runs the
    start pre-check and exchange readiness check, and rejects the request
    when the available balance has drifted more than ``cfg["drift_pct"]``
    percent from the preview snapshot.  It then places the first order,
    refreshes the exchange stop, inserts an ``active`` row into
    ``trend_pullback_plans``, marks the preview snapshot ``executed`` and
    deletes the preview row, all in one commit.

    Once the order call has succeeded the plan is always recorded, even if
    resolving the fill price or placing the stop fails afterwards, so that a
    live position is never left without a plan row; the user is warned when
    the stop could not be placed.

    Returns:
        A redirect to the strategy trading page (with ``preview_id`` kept
        when the preview is still usable for a retry).
    """
    pid = (request.form.get("preview_id") or "").strip()
    if not pid:
        flash("缺少预览 ID")
        return _redirect_trend()
    conn = get_db()
    # try/finally guarantees the connection is released on every path,
    # including unexpected exceptions from exchange or DB calls.
    try:
        init_strategy_tables(conn)
        _cleanup_stale_previews(conn)
        pr = conn.execute(
            "SELECT * FROM trend_pullback_previews WHERE id=?", (pid,)
        ).fetchone()
        now_ms = int(time.time() * 1000)
        if not pr or int(pr["expires_at_ms"] or 0) < now_ms:
            flash("预览已过期或不存在,请重新生成预览")
            return _redirect_trend()
        okp, msg = precheck_trend_start(cfg, conn)
        if not okp:
            flash(msg)
            return _redirect_trend(preview_id=pid)
        m = _m(cfg)
        ok_live, reason = m.ensure_exchange_live_ready()
        if not ok_live:
            flash(reason)
            return _redirect_trend(preview_id=pid)
        snap_prev = float(pr["snapshot_available_usdt"] or 0)
        snap_now = m.get_available_trading_usdt()
        if snap_now is None or snap_now <= 0:
            flash("无法读取当前合约可用余额,请稍后重试")
            return _redirect_trend(preview_id=pid)
        # Relative balance change since the preview, in percent; the tiny
        # floor on the divisor avoids a division by zero.
        drift = abs(float(snap_now) - snap_prev) / max(snap_prev, 1e-9) * 100.0
        if drift > cfg["drift_pct"]:
            flash(
                f"当前可用余额与预览快照偏差 {drift:.2f}%,超过允许 {cfg['drift_pct']}%,请重新生成预览"
            )
            return _redirect_trend(preview_id=pid)
        symbol = pr["symbol"]
        exchange_symbol = pr["exchange_symbol"]
        direction = pr["direction"] or "long"
        leverage = int(pr["leverage"] or 1)
        stop_loss = float(pr["stop_loss"])
        first_amt = float(pr["first_order_amount"] or 0)
        live_price = m.get_price(symbol)
        if live_price is None:
            flash("获取实时价格失败")
            return _redirect_trend(preview_id=pid)
        # Optional hook on the app module; falls back to str(exception).
        fe = getattr(m, "friendly_exchange_error", lambda x, **k: str(x))
        try:
            o1 = m.place_exchange_order(
                exchange_symbol, direction, first_amt, leverage, stop_loss=None, take_profit=None
            )
        except Exception as e:
            # The order call itself failed: report it and keep the preview
            # so the user can retry.
            flash(fe(e, available_usdt=snap_now))
            return _redirect_trend(preview_id=pid)
        # From here on a position may be live on the exchange, so later
        # failures must not prevent the plan from being recorded.
        try:
            fill1 = m.resolve_order_entry_price(o1, exchange_symbol, live_price)
        except Exception:
            # Same fallback value that is handed to the resolver above.
            fill1 = float(live_price)
        stop_error = None
        try:
            trend_refresh_stop_only(cfg, exchange_symbol, direction, stop_loss)
        except Exception as e:
            stop_error = fe(e, available_usdt=snap_now)
        trading_day = m.get_trading_day(m.app_now())
        opened_at = m.app_now_str()
        # Optional helper on the app module; yields None when absent.
        opened_ms = getattr(m, "_to_ms_with_fallback", lambda a, b: None)(None, opened_at)
        cur = conn.execute(
            """INSERT INTO trend_pullback_plans (
            status,symbol,exchange_symbol,direction,leverage,stop_loss,initial_stop_loss,add_upper,take_profit,risk_percent,
            snapshot_available_usdt,snapshot_at,plan_margin_capital,target_order_amount,first_order_amount,remainder_total,
            dca_legs,per_leg_amount,grid_prices_json,leg_amounts_json,legs_done,first_order_done,last_mark_price,avg_entry_price,order_amount_open,opened_at,opened_at_ms,session_date,message
            ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""",
            (
                "active",
                symbol,
                exchange_symbol,
                direction,
                leverage,
                stop_loss,
                stop_loss,  # initial_stop_loss starts equal to stop_loss
                float(pr["add_upper"]),
                float(pr["take_profit"]),
                float(pr["risk_percent"] or 5),
                float(snap_now),
                opened_at,
                float(pr["plan_margin_capital"] or 0),
                float(pr["target_order_amount"] or 0),
                first_amt,
                float(pr["remainder_total"] or 0),
                int(pr["dca_legs"] or 0),
                float(pr["per_leg_amount"] or 0),
                pr["grid_prices_json"] or "[]",
                pr["leg_amounts_json"] or "[]",
                0,  # legs_done
                1,  # first_order_done
                float(live_price),
                fill1,
                first_amt,
                opened_at,
                opened_ms,
                trading_day,
                f"预览ID:{pid[:8]}",
            ),
        )
        new_id = int(cur.lastrowid)
        conn.execute(
            "UPDATE trend_pullback_preview_snapshots SET outcome='executed', executed_plan_id=? WHERE preview_id=?",
            (new_id, pid),
        )
        conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,))
        conn.commit()
    finally:
        conn.close()
    if stop_error:
        # The position is open and tracked, but has no exchange-side stop.
        flash(
            f"首仓已成交,但交易所止损挂单失败:{stop_error}。计划已记录,请立即手动补挂止损或结束计划。"
        )
    else:
        flash("趋势回调已执行:首仓已成交并挂交易所止损,止盈由程序监控。")
    return _redirect_trend()
@app.route("/cancel_trend_pullback_preview", methods=["POST"])
@lr
def cancel_trend_pullback_preview():
    """Discard a pending trend-pullback preview.

    When the ``preview_id`` form field is non-empty, marks the matching
    snapshot (if still ``open``) as ``cancelled`` and deletes the preview
    row.  The confirmation message is flashed in either case.

    Returns:
        A redirect to the strategy trading page.
    """
    pid = (request.form.get("preview_id") or "").strip()
    conn = get_db()
    # try/finally so a failing statement cannot leak the connection.
    try:
        if pid:
            conn.execute(
                "UPDATE trend_pullback_preview_snapshots SET outcome='cancelled' WHERE preview_id=? AND outcome='open'",
                (pid,),
            )
            conn.execute("DELETE FROM trend_pullback_previews WHERE id=?", (pid,))
            conn.commit()
    finally:
        conn.close()
    flash("已取消预览")
    return _redirect_trend()
@app.route("/trend_pullback_breakeven/<int:pid>", methods=["POST"])
@lr
def trend_pullback_breakeven(pid: int):
    """Apply a manual breakeven to the active plan ``pid``.

    The optional ``breakeven_offset_pct`` form field must parse as a finite,
    non-negative number; an empty field passes ``None`` through to
    ``apply_manual_breakeven``.  The outcome is flashed and the DB changes
    are committed regardless of whether the breakeven succeeded.

    Args:
        pid: Primary key of the row in ``trend_pullback_plans``.

    Returns:
        A redirect to the strategy trading page.
    """
    import math  # local import: math is not among the module-level imports

    offset_pct = None
    raw = (request.form.get("breakeven_offset_pct") or "").strip()
    if raw:
        try:
            offset_pct = float(raw)
            # float() accepts "nan"/"inf"; NaN would slip past a plain
            # "< 0" comparison, so reject non-finite values explicitly.
            if not math.isfinite(offset_pct) or offset_pct < 0:
                raise ValueError
        except ValueError:
            flash("保本偏移% 格式无效")
            return _redirect_trend()
    conn = get_db()
    # try/finally so an exception inside apply_manual_breakeven cannot leak
    # the connection.
    try:
        row = conn.execute(
            "SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,)
        ).fetchone()
        if not row:
            flash("未找到运行中的趋势回调计划")
            return _redirect_trend()
        ok, err = apply_manual_breakeven(cfg, conn, row, offset_pct=offset_pct)
        conn.commit()
    finally:
        conn.close()
    flash(
        "已保本:趋势计划已结束,持仓已移交下单监控并挂止盈止损;平仓后将写入交易记录"
        if ok
        else (err or "保本移交失败")
    )
    return _redirect_trend()
# Manually stop the active trend-pullback plan ``pid``: close any live
# position at market, cancel the symbol's orders, then finalize the plan.
# NOTE(review): the route is registered without ``methods=``, so Flask serves
# it on GET even though it changes state — consider requiring POST.
# NOTE(review): ``conn`` is only closed on the explicit return paths; an
# exception from e.g. m.get_price or get_live_position_contracts would
# propagate with the connection still open — consider try/finally.
@app.route("/stop_trend_pullback/<int:pid>")
@lr
def stop_trend_pullback(pid: int):
conn = get_db()
# Only plans still marked 'active' can be stopped.
row = conn.execute(
"SELECT * FROM trend_pullback_plans WHERE id=? AND status='active'", (pid,)
).fetchone()
if not row:
conn.close()
flash("未找到运行中的趋势回调计划")
return _redirect_trend()
m = _m(cfg)
# Prefer the stored exchange symbol; otherwise derive it from the plain symbol.
ex_sym = row["exchange_symbol"] or m.normalize_exchange_symbol(row["symbol"])
direction = row["direction"] or "long"
lev = int(row["leverage"] or 1)
px = m.get_price(row["symbol"])
# Default exit price: current price, or 0.0 when no price is available.
exit_p = float(px) if px is not None else 0.0
ok_live, _ = m.ensure_exchange_live_ready()
if ok_live:
pos = m.get_live_position_contracts(ex_sym, direction)
if pos is not None and pos > 0:
try:
close_resp = trend_market_close(cfg, ex_sym, direction, float(pos), lev)
# Prefer the price extracted from the close order over the pre-close quote.
ep = m.extract_trade_price_from_order(close_resp)
if ep:
exit_p = float(ep)
except Exception as e:
# An error recognised as "no position" is tolerated; any other
# failure aborts and leaves the plan active.
if not m.is_no_position_error(str(e)):
conn.close()
flash(f"平仓失败:{e}")
return _redirect_trend()
# Best-effort cleanup of the symbol's orders; failures are ignored on purpose.
try:
cancel_symbol_orders(cfg, ex_sym)
except Exception:
pass
try:
_finalize_plan(cfg, conn, row, "手动平仓", exit_p)
except Exception as e:
# Finalization failed: still take the plan out of 'active' and record
# the error text in its message column.
conn.execute(
"UPDATE trend_pullback_plans SET status='stopped_manual', message=? "
"WHERE id=? AND status='active'",
(f"结束异常:{e}", pid),
)
conn.commit()
conn.close()
flash(f"计划已结束但记账可能不完整:{e}")
return _redirect_trend()
# NOTE(review): no commit is issued here on the success path — presumably
# _finalize_plan commits its own writes; verify.
conn.close()
flash("已结束趋势回调计划")
return _redirect_trend()