# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """顺势加仓(滚仓):纯计算与校验,期货版(手数整数、乘数计入盈亏)。""" from __future__ import annotations import math from typing import Any, Optional, Tuple from modules.trading.position_sizing import MODE_AMOUNT from strategy.fib_lib import calc_fib_plan ROLL_MAX_LEGS_LONG = 3 ROLL_MAX_LEGS_SHORT = 3 ROLL_STOP_OFFSET_PCT_DEFAULT = 1.0 ADD_MODE_MARKET = "market" ADD_MODE_FIB_618 = "fib_618" ADD_MODE_FIB_786 = "fib_786" ADD_MODE_BREAKOUT = "breakout" FIB_MODES = frozenset({ADD_MODE_FIB_618, ADD_MODE_FIB_786}) PENDING_MODES = frozenset({ADD_MODE_FIB_618, ADD_MODE_FIB_786, ADD_MODE_BREAKOUT}) ADD_MODE_LABELS = { ADD_MODE_MARKET: "市价加仓", ADD_MODE_FIB_618: "斐波0.618", ADD_MODE_FIB_786: "斐波0.786", ADD_MODE_BREAKOUT: "突破加仓", } LEG_STATUS_PENDING = "pending" LEG_STATUS_FILLED = "filled" LEG_STATUS_CANCELLED = "cancelled" LEG_STATUS_INVALIDATED = "invalidated" def add_mode_label(mode: str) -> str: return ADD_MODE_LABELS.get((mode or "").strip().lower(), mode or "") def fib_ratio_from_mode(mode: str) -> Optional[float]: m = (mode or "").strip().lower() if m in (ADD_MODE_FIB_618, "618", "0.618"): return 0.618 if m in (ADD_MODE_FIB_786, "786", "0.786"): return 0.786 return None def fib_limit_entry(direction: str, upper: float, lower: float, mode: str) -> Tuple[Optional[float], Optional[str]]: ratio = fib_ratio_from_mode(mode) if ratio is None: return None, "斐波档位无效" h, l = float(upper), float(lower) if h <= l: return None, "上沿须大于下沿" direction = (direction or "long").strip().lower() plan = calc_fib_plan(direction, h, l, ratio) if not plan: return None, "无法计算斐波限价" entry, _sl, _tp = plan return float(entry), None def max_roll_legs(direction: str) -> int: return ROLL_MAX_LEGS_LONG if (direction or "long").strip().lower() == "long" else ROLL_MAX_LEGS_SHORT def lots_precise(raw: float) -> int: if raw is None or raw < 1: return 0 return max(1, int(math.floor(float(raw)))) def unified_stop_from_avg(direction: str, avg: float, offset_pct: float) -> float: avg_f = float(avg) pct = float(offset_pct) / 100.0 direction = (direction or "long").strip().lower() if direction == "short": return avg_f * (1.0 + pct) return avg_f * (1.0 - pct) def avg_entry_after_add(qty_existing: float, entry_existing: float, add_qty: float, add_price: float) -> float: q1, e1, q2, e2 = float(qty_existing), float(entry_existing), float(add_qty), float(add_price) total = q1 + q2 return (q1 * e1 + q2 * e2) / total if total > 0 else 0.0 def solve_add_lots_for_total_risk( direction: str, qty_existing: float, entry_existing: float, add_price: float, new_stop: float, risk_budget: float, mult: int, ) -> Tuple[Optional[int], Optional[str]]: """方案 C:合并持仓打到新止损 S 时总亏损 ≤ B。""" q1, e1, e2, sl, b = float(qty_existing), float(entry_existing), float(add_price), float(new_stop), float(risk_budget) m = float(mult) direction = (direction or "long").strip().lower() if direction == "short": denom = (sl - e2) * m numer = b - q1 * (sl - e1) * m else: denom = (e2 - sl) * m numer = b - q1 * (e1 - sl) * m if denom <= 0: return None, "止损与加仓价关系无效" q2 = numer / denom lots = lots_precise(q2) if lots < 1: return None, "已满足风险上限或无法再加" return lots, None def roll_eligibility_error( *, sizing_mode: str, monitor: dict, has_active_trend: bool, legs_done: int = 0, has_pending_leg: bool = False, ) -> Optional[str]: if normalize_sizing_mode(sizing_mode) != MODE_AMOUNT: return "仅固定金额(以损定仓)模式可滚仓" if has_active_trend: return "趋势回调运行中,不可滚仓" if not monitor or (monitor.get("status") or "").strip().lower() != "active": return "无有效持仓监控" if int(monitor.get("trailing_be") or 0): return "移动保本持仓不可滚仓" direction = (monitor.get("direction") or "long").strip().lower() if legs_done >= max_roll_legs(direction): return f"滚仓已达 {max_roll_legs(direction)} 次上限" if has_pending_leg: return "已有监控中的加仓腿,请等待成交或删除后再提交" if int(monitor.get("lots") or 0) < 1: return "持仓手数为 0" if not float(monitor.get("take_profit") or 0): return "首仓须设置止盈(移动保本不可滚仓)" return None def normalize_sizing_mode(raw: str) -> str: from modules.trading.position_sizing import normalize_sizing_mode as _norm return _norm(raw) def resolve_risk_percent(monitor: dict, *, default: float) -> float: try: rp = float(monitor.get("risk_percent") or 0) if rp > 0: return rp except (TypeError, ValueError): pass return float(default) def validate_roll_geometry( direction: str, add_mode: str, new_stop: float, *, mark_price: float, limit_price: Optional[float] = None, breakthrough_price: Optional[float] = None, at_trigger: bool = False, off_session_pending: bool = False, ) -> Optional[str]: """几何校验。 做多斐波(回调):止损 < 触发价 < 当前价 做多突破(向上):止损 < 突破价 < 当前价 做空斐波(反弹):当前价 < 触发价 < 止损 做空突破(向下):突破价 < 当前价 < 止损(提交时);触发后当前价可 ≤ 突破价 """ direction = (direction or "long").strip().lower() mode = (add_mode or ADD_MODE_MARKET).strip().lower() sl = float(new_stop) mark = float(mark_price) if sl <= 0 or mark <= 0: return "止损或参考价无效" if mode == ADD_MODE_MARKET: if direction == "long" and sl >= mark: return "做多:新止损须低于当前价" if direction == "short" and sl <= mark: return "做空:新止损须高于当前价" return None trigger = None if mode in FIB_MODES: trigger = float(limit_price or 0) if trigger <= 0: return "须填写斐波触发价" if direction == "long": if not (sl < trigger < mark): return "做多斐波:须满足 止损 < 触发价 < 当前价" else: if not (mark < trigger < sl): return "做空斐波:须满足 当前价 < 触发价 < 止损" return None if mode == ADD_MODE_BREAKOUT: trigger = float(breakthrough_price or 0) if trigger <= 0: return "须填写突破价" if off_session_pending: if direction == "long" and not (sl < trigger): return "做多突破:休盘提交须满足 止损 < 突破价" if direction == "short" and not (trigger < sl): return "做空突破:休盘提交须满足 突破价 < 止损" return None if at_trigger: if direction == "long": if not (sl < trigger <= mark): return "做多突破:触发时须满足 止损 < 突破价 ≤ 当前价" else: if not (trigger < sl and mark < sl): return "做空突破:触发时须满足 突破价 < 止损且当前价 < 止损" return None if direction == "long": if not (sl < trigger < mark): return "做多突破:须满足 止损 < 突破价 < 当前价" else: if not (trigger < mark < sl): return "做空突破:须满足 突破价 < 当前价 < 止损" return None return "加仓方式无效" def detect_mark_cross( direction: str, add_mode: str, prev_mark: float, mark: float, trigger_price: float, ) -> bool: """标记价穿越触发价(上一 tick 与当前 tick 比较)。""" direction = (direction or "long").strip().lower() mode = (add_mode or "").strip().lower() p = float(trigger_price) prev_m = float(prev_mark) cur_m = float(mark) if p <= 0 or prev_m <= 0 or cur_m <= 0: return False if mode in FIB_MODES: if direction == "long": return prev_m > p and cur_m <= p return prev_m < p and cur_m >= p if mode == ADD_MODE_BREAKOUT: if direction == "long": return prev_m < p and cur_m >= p return prev_m > p and cur_m <= p return False def preview_roll( *, direction: str, symbol: str, qty_existing: float, entry_existing: float, initial_take_profit: float, add_mode: str, new_stop_loss: float, risk_budget: float, mult: int, mark_price: Optional[float] = None, add_price: Optional[float] = None, limit_price: Optional[float] = None, breakthrough_price: Optional[float] = None, fib_upper: Optional[float] = None, fib_lower: Optional[float] = None, legs_done: int = 0, at_trigger: bool = False, off_session_pending: bool = False, ) -> Tuple[Optional[dict[str, Any]], Optional[str]]: direction = (direction or "long").strip().lower() if legs_done >= max_roll_legs(direction): return None, f"滚仓已达 {max_roll_legs(direction)} 次上限" mode = (add_mode or ADD_MODE_MARKET).strip().lower() mark = float(mark_price or add_price or 0) if mark <= 0 and mode == ADD_MODE_BREAKOUT and off_session_pending: mark = float(breakthrough_price or 0) if mark <= 0: return None, "需要有效参考价" sl = float(new_stop_loss) tp = float(initial_take_profit) if sl <= 0 or tp <= 0: return None, "止损/止盈无效" entry_add = mark mode_label = add_mode_label(mode) trigger_price = mark is_pending = mode in PENDING_MODES if mode == ADD_MODE_MARKET: entry_add = mark elif mode in FIB_MODES: if limit_price and float(limit_price) > 0: entry_add = float(limit_price) trigger_price = entry_add elif fib_upper is not None and fib_lower is not None: entry_add, err = fib_limit_entry(direction, float(fib_upper), float(fib_lower), mode) if err: return None, err trigger_price = entry_add else: return None, "斐波须填触发价或上沿/下沿" elif mode == ADD_MODE_BREAKOUT: if not breakthrough_price or float(breakthrough_price) <= 0: return None, "须填写突破价" entry_add = float(breakthrough_price) trigger_price = entry_add else: return None, "加仓方式无效" geom_err = validate_roll_geometry( direction, mode, sl, mark_price=mark, limit_price=trigger_price if mode in FIB_MODES else None, breakthrough_price=trigger_price if mode == ADD_MODE_BREAKOUT else None, at_trigger=at_trigger, off_session_pending=off_session_pending and is_pending, ) if geom_err: return None, geom_err budget = float(risk_budget) if budget <= 0: return None, "固定金额无效" q2, err = solve_add_lots_for_total_risk( direction, qty_existing, entry_existing, entry_add, sl, budget, mult, ) if err: return None, err new_qty = qty_existing + q2 new_avg = avg_entry_after_add(qty_existing, entry_existing, q2, entry_add) m = float(mult) if direction == "long": loss_at_sl = (new_avg - sl) * new_qty * m reward_at_tp = (tp - new_avg) * new_qty * m else: loss_at_sl = (sl - new_avg) * new_qty * m reward_at_tp = (new_avg - tp) * new_qty * m return { "symbol": symbol, "direction": direction, "add_mode": mode, "add_mode_label": mode_label, "is_pending": is_pending, "add_price": round(entry_add, 4), "trigger_price": round(trigger_price, 4), "limit_price": round(trigger_price, 4) if mode in FIB_MODES else None, "breakthrough_price": round(trigger_price, 4) if mode == ADD_MODE_BREAKOUT else None, "new_stop_loss": round(sl, 4), "initial_take_profit": tp, "risk_budget": round(budget, 2), "fixed_amount": round(budget, 2), "add_lots": q2, "qty_after": int(new_qty), "avg_entry_after": round(new_avg, 4), "loss_at_sl": round(loss_at_sl, 2), "reward_at_tp": round(reward_at_tp, 2), "legs_done": legs_done, "mark_price": round(mark, 4), }, None