"""中控历史测算:趋势回调 / 滚仓,以损定仓(按交易所精度与张数规则)。""" from __future__ import annotations from typing import Any, Callable, Optional, Tuple from strategy_roll_lib import max_roll_legs from strategy_trend_lib import ( build_trend_preview_level_rows, calc_risk_fraction, compute_trend_plan_core, validate_trend_bounds, ) DEFAULT_DCA_LEGS = 5 MARGIN_BUFFER = 0.95 def _resolve_market( exchange_id: str, base: str, ) -> Tuple[Optional[dict[str, Any]], Optional[Callable[[float], Optional[float]]], Optional[str]]: from hub_calculator_market_lib import get_calculator_market, make_amount_precise_fn_from_market market, err = get_calculator_market(exchange_id, base) if err or not market: return None, None, err or "无法解析合约" amount_precise = make_amount_precise_fn_from_market(market) return market, amount_precise, None def calc_trend_calculator( *, direction: str, capital_usdt: float, risk_percent: float, leverage: int, entry_price: float, stop_loss: float, add_upper: float, take_profit: float, dca_legs: int = DEFAULT_DCA_LEGS, exchange_id: str = "0", base: str = "ETH", ) -> Tuple[Optional[dict[str, Any]], Optional[str]]: market, amount_precise, merr = _resolve_market(exchange_id, base) if merr or not market or not amount_precise: return None, merr or "无法解析合约" contract_size = float(market.get("contract_size") or 1.0) exchange_symbol = market["exchange_symbol"] direction = (direction or "long").strip().lower() if direction not in ("long", "short"): return None, "方向须为 long 或 short" try: capital = float(capital_usdt) rp = float(risk_percent) lev = int(leverage) entry = float(entry_price) sl = float(stop_loss) upper = float(add_upper) tp = float(take_profit) legs = max(1, int(dca_legs)) cs = float(contract_size) if contract_size else 1.0 except (TypeError, ValueError): return None, "参数格式错误" if capital <= 0 or rp <= 0 or lev <= 0 or entry <= 0 or sl <= 0 or upper <= 0 or tp <= 0: return None, "资金、风险、杠杆与价格须大于 0" bound_err = validate_trend_bounds(direction, sl, upper) if bound_err: return None, bound_err rf = calc_risk_fraction(direction, upper, sl) if rf is None or rf <= 0: return None, "止损与补仓区间边界组合无法计算风险比例" risk_budget = capital * (rp / 100.0) notional = risk_budget / rf margin_plan = min(notional / float(lev), capital * MARGIN_BUFFER) if margin_plan <= 0: return None, "计划保证金过小" target_amt = _amount_from_margin(margin_plan, lev, entry, cs) if target_amt is None or target_amt <= 0: return None, "无法计算计划张数,请检查入场价与杠杆" target_amt = amount_precise(target_amt) if target_amt is None or target_amt <= 0: return None, "计划张数低于交易所最小精度" def _amount_precise(_symbol: str, amount: float) -> Optional[float]: return amount_precise(amount) payload, err = compute_trend_plan_core( direction=direction, stop_loss=sl, add_upper=upper, risk_percent=rp, snapshot_usdt=capital, leverage=lev, live_price=entry, target_order_amount=target_amt, exchange_symbol=exchange_symbol, dca_legs=legs, amount_precise=_amount_precise, min_amount=float(market.get("min_amount") or 0.0), full_margin_buffer_ratio=MARGIN_BUFFER, ) if err: return None, err payload["take_profit"] = tp payload["leverage"] = lev payload["contract_size"] = cs preview, rows = build_trend_preview_level_rows(payload) px_dec = int(market.get("price_decimals") or 4) amt_dec = int(market.get("amount_decimals") or 4) def _f(v: Any, nd: int | None = None) -> Any: if v is None: return None try: return round(float(v), nd if nd is not None else 8) except (TypeError, ValueError): return v table = [] for row in rows: table.append( { "label": row.get("label"), "price": _f(row.get("price"), px_dec), "contracts": _f(row.get("contracts"), amt_dec), "avg_entry": _f(row.get("avg_entry"), px_dec), "profit_u": _f(row.get("profit_u")), "risk_u": _f(row.get("risk_u")), "rr": _f(row.get("rr"), 4), } ) return { "direction": direction, "capital_usdt": _f(capital), "risk_percent": _f(rp, 2), "risk_budget_u": _f(preview.get("preview_risk_amount_u")), "leverage": lev, "entry_price": _f(entry, px_dec), "stop_loss": _f(sl, px_dec), "add_upper": _f(upper, px_dec), "take_profit": _f(tp, px_dec), "plan_margin_u": _f(preview.get("plan_margin_capital")), "target_contracts": _f(preview.get("target_order_amount"), amt_dec), "first_contracts": _f(preview.get("first_order_amount"), amt_dec), "dca_legs": int(preview.get("dca_legs") or legs), "first_profit_u": _f(preview.get("preview_first_profit_u")), "first_rr": _f(preview.get("preview_target_rr"), 4), "market": market, "rows": table, }, None def _amount_from_margin( margin_capital: float, leverage: int, price: float, contract_size: float, ) -> Optional[float]: try: margin = float(margin_capital) lev = int(leverage) px = float(price) cs = float(contract_size) if contract_size else 1.0 except (TypeError, ValueError): return None if margin <= 0 or lev <= 0 or px <= 0 or cs <= 0: return None notional = margin * lev return notional / (px * cs) def _round(v: Any, nd: int = 4) -> Any: if v is None: return None try: return round(float(v), nd) except (TypeError, ValueError): return v def _money_rr(profit_u: Optional[float], risk_u: Optional[float]) -> Optional[float]: try: if risk_u is None or float(risk_u) <= 0 or profit_u is None: return None return round(float(profit_u) / float(risk_u), 4) except (TypeError, ValueError): return None def calc_initial_roll_qty( direction: str, entry_price: float, stop_loss: float, risk_budget_usdt: float, contract_size: float = 1.0, ) -> Tuple[Optional[float], Optional[str]]: """首仓以损定仓:打到初始止损亏损 = 风险预算。""" try: entry = float(entry_price) sl = float(stop_loss) budget = float(risk_budget_usdt) cs = float(contract_size) if contract_size else 1.0 except (TypeError, ValueError): return None, "参数格式错误" if entry <= 0 or sl <= 0 or budget <= 0 or cs <= 0: return None, "入场价、止损与风险预算须大于 0" direction = (direction or "long").strip().lower() if direction == "short": per_unit = (sl - entry) * cs if per_unit <= 0: return None, "做空:止损价须高于首仓入场价" else: per_unit = (entry - sl) * cs if per_unit <= 0: return None, "做多:止损价须低于首仓入场价" return budget / per_unit, None def solve_add_amount_for_total_risk( direction: str, qty_existing: float, entry_existing: float, add_price: float, new_stop: float, risk_budget_usdt: float, contract_size: float = 1.0, ) -> Tuple[Optional[float], Optional[str]]: """合并持仓打到新止损总亏损 = 风险预算,反推本次加仓张数。""" try: q1 = float(qty_existing) e1 = float(entry_existing) e2 = float(add_price) sl = float(new_stop) b = float(risk_budget_usdt) cs = float(contract_size) if contract_size else 1.0 except (TypeError, ValueError): return None, "参数格式错误" if q1 <= 0 or e1 <= 0 or e2 <= 0 or b <= 0 or cs <= 0: return None, "持仓或风险预算无效" direction = (direction or "long").strip().lower() if direction == "short": denom = sl - e2 numer = b / cs - q1 * (sl - e1) if denom <= 0: return None, "做空:新止损须高于限价加仓价" else: denom = e2 - sl numer = b / cs - q1 * (e1 - sl) if denom <= 0: return None, "做多:新止损须低于限价/市价加仓价" q2 = numer / denom if q2 <= 0: return None, "按当前新止损与总风险%,无需加仓或无法再加(已满足风险上限)" return q2, None def _roll_leg_preview( *, direction: str, qty_existing: float, entry_existing: float, take_profit: float, add_price: float, new_stop_loss: float, risk_budget: float, contract_size: float, amount_precise: Callable[[float], Optional[float]], ) -> Tuple[Optional[dict[str, Any]], Optional[str]]: direction = (direction or "long").strip().lower() try: tp = float(take_profit) sl = float(new_stop_loss) entry_add = float(add_price) e1 = float(entry_existing) except (TypeError, ValueError): return None, "止损/止盈格式错误" if sl <= 0 or tp <= 0 or entry_add <= 0: return None, "止损与首仓止盈须大于0" if direction == "long": if sl >= entry_add: return None, "做多:新止损须低于加仓价" if tp <= e1: return None, "做多:首仓止盈须高于当前持仓均价参考" else: if sl <= entry_add: return None, "做空:新止损须高于加仓价" if tp >= e1: return None, "做空:首仓止盈须低于当前持仓均价参考" q2_raw, err = solve_add_amount_for_total_risk( direction, qty_existing, entry_existing, entry_add, sl, risk_budget, contract_size, ) if err: return None, err q2 = amount_precise(float(q2_raw)) if q2 is None or q2 <= 0: return None, "加仓张数低于交易所最小精度" new_qty = float(qty_existing) + float(q2) new_avg = (float(qty_existing) * float(entry_existing) + float(q2) * entry_add) / new_qty cs = float(contract_size) if contract_size else 1.0 if direction == "long": loss_at_sl = (new_avg - sl) * new_qty * cs reward_at_tp = (tp - new_avg) * new_qty * cs else: loss_at_sl = (sl - new_avg) * new_qty * cs reward_at_tp = (new_avg - tp) * new_qty * cs return { "add_amount_raw": q2, "qty_after": new_qty, "avg_entry_after": new_avg, "add_price": entry_add, "new_stop_loss": sl, "loss_at_sl_usdt": loss_at_sl, "reward_at_tp_usdt": reward_at_tp, }, None def calc_roll_calculator( *, direction: str, capital_usdt: float, risk_percent: float, entry_price: float, stop_loss: float, take_profit: float, add_legs: list[dict[str, float]] | None = None, legs_done: int = 0, exchange_id: str = "0", base: str = "ETH", ) -> Tuple[Optional[dict[str, Any]], Optional[str]]: """ 滚仓历史测算:首仓自动以损定仓;止盈锁定首仓价;最多 3 次滚仓加仓。 add_legs: [{add_price, new_stop_loss}, ...],按顺序链式计算。 legs_done: 已完成滚仓次数(仅标记,仍参与链式状态推进)。 """ market, amount_precise, merr = _resolve_market(exchange_id, base) if merr or not market or not amount_precise: return None, merr or "无法解析合约" contract_size = float(market.get("contract_size") or 1.0) px_dec = int(market.get("price_decimals") or 4) amt_dec = int(market.get("amount_decimals") or 4) direction = (direction or "long").strip().lower() if direction not in ("long", "short"): return None, "方向须为 long 或 short" try: capital = float(capital_usdt) rp = float(risk_percent) entry = float(entry_price) initial_sl = float(stop_loss) tp = float(take_profit) done = max(0, int(legs_done)) except (TypeError, ValueError): return None, "参数格式错误" if capital <= 0 or rp <= 0 or entry <= 0 or initial_sl <= 0 or tp <= 0: return None, "资金、风险与价格须大于 0" if done > max_roll_legs(direction): return None, f"已完成滚仓次数不能超过 {max_roll_legs(direction)} 次" legs_in: list[dict[str, float]] = [] for raw in add_legs or []: if not isinstance(raw, dict): continue try: ap = float(raw.get("add_price")) nsl = float(raw.get("new_stop_loss")) except (TypeError, ValueError): return None, "加仓价与新止损须为有效数字" if ap <= 0 or nsl <= 0: return None, "加仓价与新止损须大于 0" legs_in.append({"add_price": ap, "new_stop_loss": nsl}) if done + len(legs_in) > max_roll_legs(direction): return None, f"已完成 {done} 次 + 待测算 {len(legs_in)} 次,合计不能超过 {max_roll_legs(direction)} 次滚仓" if direction == "long": if tp <= entry: return None, "做多:止盈价须高于首仓入场价" else: if tp >= entry: return None, "做空:止盈价须低于首仓入场价" risk_budget = capital * (rp / 100.0) qty, err = calc_initial_roll_qty(direction, entry, initial_sl, risk_budget, contract_size) if err: return None, err if qty is None or qty <= 0: return None, "无法计算首仓张数" qty_p = amount_precise(float(qty)) if qty_p is None or qty_p <= 0: return None, "首仓张数低于交易所最小精度" qty_f = float(qty_p) avg = entry rows: list[dict[str, Any]] = [] cs = contract_size if direction == "long": first_loss = (avg - initial_sl) * qty_f * cs first_profit = (tp - avg) * qty_f * cs else: first_loss = (initial_sl - avg) * qty_f * cs first_profit = (avg - tp) * qty_f * cs rows.append( { "label": "首仓", "leg_index": 0, "already_done": False, "entry_or_add_price": _round(entry, px_dec), "stop_loss": _round(initial_sl, px_dec), "add_contracts": _round(qty_f, amt_dec), "total_contracts": _round(qty_f, amt_dec), "avg_entry": _round(avg, px_dec), "take_profit": _round(tp, px_dec), "loss_at_sl_u": _round(first_loss), "profit_at_tp_u": _round(first_profit), "rr": _money_rr(first_profit, first_loss), } ) current_qty = qty_f current_avg = avg for i, leg in enumerate(legs_in): leg_no = i + 1 preview, err = _roll_leg_preview( direction=direction, qty_existing=current_qty, entry_existing=current_avg, take_profit=tp, add_price=leg["add_price"], new_stop_loss=leg["new_stop_loss"], risk_budget=risk_budget, contract_size=cs, amount_precise=amount_precise, ) if err: return None, f"滚仓第 {leg_no} 次:{err}" if not preview: return None, f"滚仓第 {leg_no} 次计算失败" current_qty = float(preview["qty_after"]) current_avg = float(preview["avg_entry_after"]) loss = preview.get("loss_at_sl_usdt") reward = preview.get("reward_at_tp_usdt") rows.append( { "label": f"滚仓{leg_no}", "leg_index": leg_no, "already_done": leg_no <= done, "entry_or_add_price": _round(preview.get("add_price"), px_dec), "stop_loss": _round(preview.get("new_stop_loss"), px_dec), "add_contracts": _round(preview.get("add_amount_raw"), amt_dec), "total_contracts": _round(current_qty, amt_dec), "avg_entry": _round(current_avg, px_dec), "take_profit": _round(tp, px_dec), "loss_at_sl_u": _round(loss), "profit_at_tp_u": _round(reward), "rr": _money_rr(reward, loss), } ) last = rows[-1] return { "direction": direction, "capital_usdt": _round(capital), "risk_percent": _round(rp, 2), "risk_budget_u": _round(risk_budget), "entry_price": _round(entry, px_dec), "stop_loss": _round(initial_sl, px_dec), "take_profit": _round(tp, px_dec), "legs_done": done, "roll_legs_planned": len(legs_in), "first_contracts": _round(qty_f, amt_dec), "final_contracts": last.get("total_contracts"), "final_avg_entry": last.get("avg_entry"), "final_loss_at_sl_u": last.get("loss_at_sl_u"), "final_profit_at_tp_u": last.get("profit_at_tp_u"), "final_rr": last.get("rr"), "market": market, "rows": rows, }, None