"""顺势加仓(滚仓):纯计算。人工触发;止盈锁定首仓;斐波仅算限价。""" from __future__ import annotations from typing import Any, Optional, Tuple from fib_key_monitor_lib import calc_fib_plan, fib_ratio_from_type ROLL_MAX_LEGS_LONG = 3 ROLL_MAX_LEGS_SHORT = 3 ROLL_STOP_OFFSET_PCT_DEFAULT = 1.0 FIB_MODES = frozenset({"fib_618", "fib_786"}) def fib_ratio_from_mode(mode: str) -> Optional[float]: m = (mode or "").strip().lower() if m in ("fib_618", "618", "0.618"): return 0.618 if m in ("fib_786", "786", "0.786"): return 0.786 return None def fib_limit_entry(direction: str, upper: float, lower: float, mode: str) -> Tuple[Optional[float], Optional[str]]: """H/L 仅用于计算限价加仓价;多:下沿=止损侧;空:上沿=止损侧。""" ratio = fib_ratio_from_mode(mode) if ratio is None: return None, "斐波档位无效" h, l = float(upper), float(lower) if h <= l: return None, "上沿须大于下沿" direction = (direction or "long").strip().lower() if direction == "short": plan = calc_fib_plan("short", h, l, ratio) else: plan = calc_fib_plan("long", h, l, ratio) if not plan: return None, "无法计算斐波限价" entry, _sl, _tp = plan return float(entry), None def max_roll_legs(direction: str) -> int: return ROLL_MAX_LEGS_LONG if (direction or "long").strip().lower() == "long" else ROLL_MAX_LEGS_SHORT def resolve_roll_stop_spec( *, new_stop_loss: Optional[float] = None, stop_offset_pct: Optional[float] = None, entry_ref: float = 0.0, ) -> tuple[str, float]: """ 解析滚仓止损输入。 - stop_offset_pct:相对合并均价的偏移%,如 1 表示 1%(多:均价下方;空:均价上方)。 - new_stop_loss:兼容旧版绝对止损价;若数值很小(如 1.0)且相对均价过低,视为偏移%。 """ if stop_offset_pct is not None: try: pct = float(stop_offset_pct) if pct > 0: return "offset", pct except (TypeError, ValueError): pass if new_stop_loss is not None: try: sl = float(new_stop_loss) if sl > 0: ref = float(entry_ref or 0) if ref > 0 and sl <= min(30.0, ref * 0.25): return "offset", sl return "absolute", sl except (TypeError, ValueError): pass return "offset", ROLL_STOP_OFFSET_PCT_DEFAULT def unified_stop_from_avg(direction: str, avg: float, offset_pct: float) -> float: """合并均价 ± offset% 作为新统一止损(非保本)。""" avg_f = float(avg) pct = float(offset_pct) / 100.0 if avg_f <= 0 or pct <= 0: return 0.0 direction = (direction or "long").strip().lower() if direction == "short": return avg_f * (1.0 + pct) return avg_f * (1.0 - pct) def avg_entry_after_add( qty_existing: float, entry_existing: float, add_qty: float, add_price: float, ) -> float: q1 = float(qty_existing) e1 = float(entry_existing) q2 = float(add_qty) e2 = float(add_price) total = q1 + q2 if total <= 0: return 0.0 return (q1 * e1 + q2 * e2) / total def solve_add_amount_for_avg_stop_offset( direction: str, qty_existing: float, entry_existing: float, add_price: float, offset_pct: float, risk_budget_usdt: float, ) -> Tuple[Optional[float], Optional[str]]: """ 合并后止损 = 合并均价 ± offset%,且触及止损时总亏损 ≈ risk_budget。 loss = offset% × (Q1·E1 + Q2·E2) => Q2 = (B/p − Q1·E1) / E2 """ try: q1 = float(qty_existing) e1 = float(entry_existing) e2 = float(add_price) b = float(risk_budget_usdt) p = float(offset_pct) / 100.0 except (TypeError, ValueError): return None, "参数格式错误" if q1 <= 0 or e1 <= 0 or e2 <= 0 or b <= 0: return None, "持仓或风险预算无效" if p <= 0 or p >= 1: return None, "止损偏移%须大于 0 且小于 100" direction = (direction or "long").strip().lower() need_notional = b / p q2 = (need_notional - q1 * e1) / e2 if q2 <= 0: return None, "按当前偏移%与总风险%,无需加仓或无法再加(已满足风险上限)" new_avg = avg_entry_after_add(q1, e1, q2, e2) sl = unified_stop_from_avg(direction, new_avg, offset_pct) if direction == "short": if sl <= e2: return None, "做空:合并后止损须高于加仓价(请减小偏移%或风险%)" else: if sl >= e2: return None, "做多:合并后止损须低于加仓价(请减小偏移%或风险%)" return q2, None def solve_add_amount_for_total_risk( direction: str, qty_existing: float, entry_existing: float, add_price: float, new_stop: float, risk_budget_usdt: float, ) -> Tuple[Optional[float], Optional[str]]: """ 已知合并后若触及 new_stop 总亏损=risk_budget,反推本次加仓张数 Q2。 long: (avg - SL) * (Q1+Q2) = B => Q2 = (B - Q1*(E1-SL)) / (E2-SL) short: (SL - avg) * (Q1+Q2) = B => Q2 = (B - Q1*(SL-E1)) / (SL-E2) """ try: q1 = float(qty_existing) e1 = float(entry_existing) e2 = float(add_price) sl = float(new_stop) b = float(risk_budget_usdt) except (TypeError, ValueError): return None, "参数格式错误" if q1 <= 0 or e1 <= 0 or e2 <= 0 or b <= 0: return None, "持仓或风险预算无效" direction = (direction or "long").strip().lower() if direction == "short": denom = sl - e2 numer = b - q1 * (sl - e1) if denom <= 0: return None, "做空:新止损须高于限价加仓价" else: denom = e2 - sl numer = b - q1 * (e1 - sl) if denom <= 0: return None, "做多:新止损须低于限价/市价加仓价" q2 = numer / denom if q2 <= 0: return None, "按当前新止损与总风险%,无需加仓或无法再加(已满足风险上限)" return q2, None def preview_roll( *, direction: str, symbol: str, qty_existing: float, entry_existing: float, initial_take_profit: float, add_mode: str, new_stop_loss: Optional[float] = None, stop_offset_pct: Optional[float] = None, risk_percent: float, capital_base_usdt: float, add_price: Optional[float] = None, fib_upper: Optional[float] = None, fib_lower: Optional[float] = None, legs_done: int = 0, ) -> Tuple[Optional[dict[str, Any]], Optional[str]]: direction = (direction or "long").strip().lower() if legs_done >= max_roll_legs(direction): return None, f"{'做多' if direction == 'long' else '做空'}滚仓已达 {max_roll_legs(direction)} 次上限" mode = (add_mode or "market").strip().lower() if mode == "market": if add_price is None or add_price <= 0: return None, "市价加仓需要有效参考价" entry_add = float(add_price) mode_label = "市价" elif mode in FIB_MODES: if fib_upper is None or fib_lower is None: return None, "斐波限价须填写上沿与下沿" entry_add, err = fib_limit_entry(direction, float(fib_upper), float(fib_lower), mode) if err: return None, err mode_label = "斐波0.618" if "618" in mode else "斐波0.786" else: return None, "加仓方式无效" try: tp = float(initial_take_profit) except (TypeError, ValueError): return None, "止盈格式错误" if tp <= 0: return None, "首仓止盈须大于0" stop_mode, stop_val = resolve_roll_stop_spec( new_stop_loss=new_stop_loss, stop_offset_pct=stop_offset_pct, entry_ref=entry_existing, ) if direction == "long": if tp <= entry_existing: return None, "做多:首仓止盈须高于当前持仓均价参考" else: if tp >= entry_existing: return None, "做空:首仓止盈须低于当前持仓均价参考" risk_budget = float(capital_base_usdt) * (float(risk_percent) / 100.0) offset_pct: Optional[float] = None if stop_mode == "offset": offset_pct = float(stop_val) q2_raw, err = solve_add_amount_for_avg_stop_offset( direction, qty_existing, entry_existing, entry_add, offset_pct, risk_budget ) else: sl = float(stop_val) if sl <= 0: return None, "止损须大于0" if direction == "long": if sl >= entry_add: return None, "做多:新止损须低于加仓价" else: if sl <= entry_add: return None, "做空:新止损须高于加仓价" q2_raw, err = solve_add_amount_for_total_risk( direction, qty_existing, entry_existing, entry_add, sl, risk_budget ) if err: return None, err q2 = float(q2_raw) new_qty = qty_existing + q2 new_avg = avg_entry_after_add(qty_existing, entry_existing, q2, entry_add) if stop_mode == "offset": sl = unified_stop_from_avg(direction, new_avg, offset_pct) if direction == "long": loss_at_sl = (new_avg - sl) * new_qty reward_at_tp = (tp - new_avg) * new_qty else: loss_at_sl = (sl - new_avg) * new_qty reward_at_tp = (new_avg - tp) * new_qty return { "symbol": symbol, "direction": direction, "add_mode": mode, "add_mode_label": mode_label, "add_price": round(entry_add, 10), "new_stop_loss": round(sl, 10), "stop_offset_pct": offset_pct, "stop_mode": stop_mode, "initial_take_profit": tp, "risk_percent": float(risk_percent), "risk_budget_usdt": round(risk_budget, 4), "add_amount_raw": q2, "qty_existing": float(qty_existing), "entry_existing": float(entry_existing), "qty_after": new_qty, "avg_entry_after": round(new_avg, 10), "loss_at_sl_usdt": round(loss_at_sl, 4), "reward_at_tp_usdt": round(reward_at_tp, 4), "legs_done": int(legs_done), "leg_index_next": int(legs_done) + 1, "fib_upper": fib_upper, "fib_lower": fib_lower, }, None def roll_stop_after_fill( direction: str, qty_before: float, entry_before: float, add_qty: float, fill_price: float, *, stop_offset_pct: Optional[float] = None, absolute_stop: Optional[float] = None, ) -> float: """成交后按合并均价重算统一止损(偏移%模式)或沿用绝对止损。""" if stop_offset_pct is not None and float(stop_offset_pct) > 0: avg = avg_entry_after_add(qty_before, entry_before, add_qty, fill_price) return unified_stop_from_avg(direction, avg, float(stop_offset_pct)) return float(absolute_stop or 0)