"""顺势加仓(滚仓):纯计算。人工触发;止盈锁定首仓;程序监控触价市价成交。""" from __future__ import annotations from typing import Any, Optional, Tuple from fib_key_monitor_lib import calc_fib_plan, fib_invalidate_by_mark ROLL_MAX_LEGS_LONG = 3 ROLL_MAX_LEGS_SHORT = 3 MARKET_MODE = "market" FIB_MODES = frozenset({"fib_618", "fib_786"}) BREAKOUT_MODE = "breakout" MODE_LABELS = { MARKET_MODE: "市价加仓", "fib_618": "斐波0.618", "fib_786": "斐波0.786", BREAKOUT_MODE: "突破加仓", } def fib_ratio_from_mode(mode: str) -> Optional[float]: m = (mode or "").strip().lower() if m in ("fib_618", "618", "0.618"): return 0.618 if m in ("fib_786", "786", "0.786"): return 0.786 return None def mode_label(mode: str) -> str: m = (mode or MARKET_MODE).strip().lower() return MODE_LABELS.get(m, m) def fib_limit_entry(direction: str, upper: float, lower: float, mode: str) -> Tuple[Optional[float], Optional[str]]: """H/L 仅用于计算限价加仓价;多:下沿=止损侧;空:上沿=止损侧。""" ratio = fib_ratio_from_mode(mode) if ratio is None: return None, "斐波档位无效" h, l = float(upper), float(lower) if h <= l: return None, "上沿须大于下沿" direction = (direction or "long").strip().lower() if direction == "short": plan = calc_fib_plan("short", h, l, ratio) else: plan = calc_fib_plan("long", h, l, ratio) if not plan: return None, "无法计算斐波限价" entry, _sl, _tp = plan return float(entry), None def max_roll_legs(direction: str) -> int: return ROLL_MAX_LEGS_LONG if (direction or "long").strip().lower() == "long" else ROLL_MAX_LEGS_SHORT def avg_entry_after_add( qty_existing: float, entry_existing: float, add_qty: float, add_price: float, ) -> float: q1 = float(qty_existing) e1 = float(entry_existing) q2 = float(add_qty) e2 = float(add_price) total = q1 + q2 if total <= 0: return 0.0 return (q1 * e1 + q2 * e2) / total def calc_risk_budget_usdt(capital_base_usdt: float, risk_percent: float) -> float: return float(capital_base_usdt) * (float(risk_percent) / 100.0) def solve_add_amount_for_total_risk( direction: str, qty_existing: float, entry_existing: float, add_price: float, new_stop: float, risk_budget_usdt: float, contract_size: float = 1.0, ) -> Tuple[Optional[float], Optional[str]]: """ 合并持仓打到 new_stop 时总亏损 ≈ risk_budget(方案 C)。 long: (avg - SL) * (Q1+Q2) * cs = B => Q2 = (B/cs - Q1*(E1-SL)) / (E2-SL) short: (SL - avg) * (Q1+Q2) * cs = B => Q2 = (B/cs - Q1*(SL-E1)) / (SL-E2) """ try: q1 = float(qty_existing) e1 = float(entry_existing) e2 = float(add_price) sl = float(new_stop) b = float(risk_budget_usdt) cs = float(contract_size) if contract_size else 1.0 except (TypeError, ValueError): return None, "参数格式错误" if q1 <= 0 or e1 <= 0 or e2 <= 0 or b <= 0 or cs <= 0: return None, "持仓或风险预算无效" direction = (direction or "long").strip().lower() if direction == "short": denom = sl - e2 numer = b / cs - q1 * (sl - e1) if denom <= 0: return None, "做空:新止损须高于加仓价" else: denom = e2 - sl numer = b / cs - q1 * (e1 - sl) if denom <= 0: return None, "做多:新止损须低于加仓价" q2 = numer / denom if q2 <= 0: return None, "按当前新止损与风险预算,无需加仓或无法再加(已满足风险上限)" return q2, None def loss_at_stop_usdt( direction: str, avg: float, qty: float, stop: float, contract_size: float = 1.0, ) -> float: cs = float(contract_size or 1.0) direction = (direction or "long").strip().lower() if direction == "short": return (float(stop) - float(avg)) * float(qty) * cs return (float(avg) - float(stop)) * float(qty) * cs def reward_at_tp_usdt( direction: str, avg: float, take_profit: float, qty: float, contract_size: float = 1.0, ) -> float: cs = float(contract_size or 1.0) direction = (direction or "long").strip().lower() if direction == "short": return (float(avg) - float(take_profit)) * float(qty) * cs return (float(take_profit) - float(avg)) * float(qty) * cs def roll_fib_trigger_crossed( direction: str, prev_mark: Optional[float], mark: float, limit_price: float, ) -> bool: """斐波:多=向下穿越限价;空=向上穿越限价。""" try: m = float(mark) lv = float(limit_price) pm = float(prev_mark) if prev_mark is not None else None except (TypeError, ValueError): return False direction = (direction or "long").strip().lower() if direction == "long": if pm is None: return m <= lv return pm > lv and m <= lv if pm is None: return m >= lv return pm < lv and m >= lv def roll_breakout_trigger_crossed( direction: str, prev_mark: Optional[float], mark: float, breakthrough_price: float, ) -> bool: """突破:多=向上穿越突破价;空=向下穿越突破价。""" try: m = float(mark) bp = float(breakthrough_price) pm = float(prev_mark) if prev_mark is not None else None except (TypeError, ValueError): return False direction = (direction or "long").strip().lower() if direction == "long": if pm is None: return m > bp return pm <= bp and m > bp if pm is None: return m < bp return pm >= bp and m < bp def roll_fib_invalidate(direction: str, mark: float, upper: float, lower: float) -> bool: """斐波 pending 失效:止盈侧突破(多 mark>=H;空 mark<=L)。""" return fib_invalidate_by_mark(direction, mark, upper, lower) def roll_breakout_invalidate(direction: str, mark: float, stop_loss: float) -> bool: """突破 pending 失效:未到突破价先触达止损侧(多 mark<=S;空 mark>=S)。""" try: m = float(mark) sl = float(stop_loss) except (TypeError, ValueError): return False direction = (direction or "long").strip().lower() if direction == "long": return m <= sl return m >= sl def validate_roll_geometry( direction: str, add_mode: str, *, new_stop_loss: float, add_price: Optional[float] = None, fib_upper: Optional[float] = None, fib_lower: Optional[float] = None, breakthrough_price: Optional[float] = None, entry_existing: float = 0.0, initial_take_profit: float = 0.0, mark_price: Optional[float] = None, ) -> Optional[str]: direction = (direction or "long").strip().lower() mode = (add_mode or MARKET_MODE).strip().lower() try: sl = float(new_stop_loss) tp = float(initial_take_profit) e1 = float(entry_existing or 0) except (TypeError, ValueError): return "止损/止盈格式错误" if sl <= 0 or tp <= 0: return "止损与首仓止盈须大于0" if direction == "long": if e1 > 0 and tp <= e1: return "做多:首仓止盈须高于当前持仓均价" else: if e1 > 0 and tp >= e1: return "做空:首仓止盈须低于当前持仓均价" if mode == MARKET_MODE: if add_price is None or float(add_price) <= 0: return "市价加仓需要有效参考价" entry_add = float(add_price) elif mode in FIB_MODES: if fib_upper is None or fib_lower is None: return "斐波须填写上沿 H 与下沿 L" entry_add, err = fib_limit_entry(direction, float(fib_upper), float(fib_lower), mode) if err: return err if entry_add is None or entry_add <= 0: return "无法计算斐波限价" elif mode == BREAKOUT_MODE: if breakthrough_price is None: return "突破加仓须填写突破价" try: bp = float(breakthrough_price) except (TypeError, ValueError): return "突破价格式错误" if bp <= 0: return "突破价须大于0" entry_add = bp if direction == "long": if sl >= bp: return "做多:止损须低于突破价" if mark_price is not None and float(mark_price) >= bp: return "做多:当前价须低于突破价(等待向上突破)" else: if sl <= bp: return "做空:止损须高于突破价" if mark_price is not None and float(mark_price) <= bp: return "做空:当前价须高于突破价(等待向下跌破)" else: return "加仓方式无效" if mode != BREAKOUT_MODE: entry_add = float(entry_add) # type: ignore[arg-type] if direction == "long": if sl >= entry_add: return "做多:新止损须低于加仓价" else: if sl <= entry_add: return "做空:新止损须高于加仓价" return None def preview_roll( *, direction: str, symbol: str, qty_existing: float, entry_existing: float, initial_take_profit: float, add_mode: str, new_stop_loss: Optional[float] = None, risk_percent: float, capital_base_usdt: float, add_price: Optional[float] = None, fib_upper: Optional[float] = None, fib_lower: Optional[float] = None, breakthrough_price: Optional[float] = None, legs_done: int = 0, contract_size: float = 1.0, ) -> Tuple[Optional[dict[str, Any]], Optional[str]]: direction = (direction or "long").strip().lower() if legs_done >= max_roll_legs(direction): return None, f"{'做多' if direction == 'long' else '做空'}滚仓已达 {max_roll_legs(direction)} 次上限" mode = (add_mode or MARKET_MODE).strip().lower() if new_stop_loss is None: return None, "请填写新止损价" try: sl = float(new_stop_loss) except (TypeError, ValueError): return None, "止损价格式错误" if sl <= 0: return None, "止损须大于0" geom_err = validate_roll_geometry( direction, mode, new_stop_loss=sl, add_price=add_price, fib_upper=fib_upper, fib_lower=fib_lower, breakthrough_price=breakthrough_price, entry_existing=entry_existing, initial_take_profit=initial_take_profit, mark_price=add_price if mode == BREAKOUT_MODE else add_price, ) if geom_err: return None, geom_err if mode == MARKET_MODE: entry_add = float(add_price) # validated elif mode in FIB_MODES: entry_add, _ = fib_limit_entry(direction, float(fib_upper), float(fib_lower), mode) entry_add = float(entry_add or 0) else: entry_add = float(breakthrough_price or 0) risk_budget = calc_risk_budget_usdt(capital_base_usdt, risk_percent) q2_raw, err = solve_add_amount_for_total_risk( direction, qty_existing, entry_existing, entry_add, sl, risk_budget, contract_size, ) if err: return None, err q2 = float(q2_raw) new_qty = qty_existing + q2 new_avg = avg_entry_after_add(qty_existing, entry_existing, q2, entry_add) cs = float(contract_size or 1.0) loss_sl = loss_at_stop_usdt(direction, new_avg, new_qty, sl, cs) reward_tp = reward_at_tp_usdt(direction, new_avg, initial_take_profit, new_qty, cs) return { "symbol": symbol, "direction": direction, "add_mode": mode, "add_mode_label": mode_label(mode), "add_price": round(entry_add, 10), "new_stop_loss": round(sl, 10), "breakthrough_price": float(breakthrough_price) if breakthrough_price not in (None, "") else None, "initial_take_profit": float(initial_take_profit), "risk_percent": float(risk_percent), "risk_budget_usdt": round(risk_budget, 4), "add_amount_raw": q2, "qty_existing": float(qty_existing), "entry_existing": float(entry_existing), "qty_after": new_qty, "avg_entry_after": round(new_avg, 10), "loss_at_sl_usdt": round(loss_sl, 4), "reward_at_tp_usdt": round(reward_tp, 4), "legs_done": int(legs_done), "leg_index_next": int(legs_done) + 1, "fib_upper": fib_upper, "fib_lower": fib_lower, "contract_size": cs, }, None