"""趋势回调策略:纯计算与校验(无 ccxt / Flask)。各所 adapter 负责张数精度与下单。""" from __future__ import annotations import json from typing import Any, Callable, Optional, Tuple AmountPreciseFn = Callable[[str, float], Optional[float]] def calc_risk_fraction(direction: str, entry_price: float, stop_loss: float) -> Optional[float]: try: entry = float(entry_price) sl = float(stop_loss) if entry <= 0 or sl <= 0: return None if (direction or "long").strip().lower() == "short": risk = sl - entry else: risk = entry - sl if risk <= 0: return None return risk / entry except (TypeError, ValueError): return None def validate_trend_bounds(direction: str, stop_loss: float, add_upper: float) -> Optional[str]: direction = (direction or "long").strip().lower() if direction == "long": if not (float(stop_loss) < float(add_upper)): return "做多:止损价须低于补仓上沿" else: if not (float(stop_loss) > float(add_upper)): return "做空:止损价须高于补仓下沿" return None def build_grid_prices(direction: str, sl: float, upper: float, n_legs: int) -> list[float]: """在 (止损, 补仓区间远侧边界) 内生成 n_legs 个触发价(不含端点)。""" sl, upper = float(sl), float(upper) out: list[float] = [] if n_legs <= 0: return out direction = (direction or "long").strip().lower() if direction == "long": if upper <= sl: return out span = upper - sl for i in range(1, n_legs + 1): t = i / float(n_legs + 1) out.append(sl + t * span) out.sort(reverse=True) else: if sl <= upper: return out span = sl - upper for i in range(1, n_legs + 1): t = i / float(n_legs + 1) out.append(upper + t * span) out.sort() return [round(p, 10) for p in out] def pick_dca_legs_and_per_leg( exchange_symbol: str, remainder_total: float, want_legs: int, amount_precise: AmountPreciseFn, min_amount: float = 0.0, ) -> Tuple[int, float]: """按最小张数约束自动减少档位数。返回 (有效档数, 每档参考张数)。""" legs = max(1, int(want_legs)) rem = float(remainder_total) min_amt = float(min_amount or 0.0) while legs >= 1: per = rem / legs per_p = amount_precise(exchange_symbol, per) if per_p is None or per_p <= 0: legs -= 1 continue if min_amt and per_p + 1e-12 < min_amt: legs -= 1 continue return legs, per_p one = amount_precise(exchange_symbol, rem) if one is None or one <= 0: return 0, 0.0 return 1, one def build_leg_amounts_json( exchange_symbol: str, remainder_total: float, want_legs: int, amount_precise: AmountPreciseFn, min_amount: float = 0.0, ) -> Tuple[int, str, float]: """拆分补仓张数 JSON。返回 (档位数, json列表, 每档参考)。""" rem = amount_precise(exchange_symbol, float(remainder_total)) if rem is None or rem <= 0: return 0, "[]", 0.0 n, _ = pick_dca_legs_and_per_leg(exchange_symbol, rem, want_legs, amount_precise, min_amount) if n <= 0: return 0, "[]", 0.0 if n <= 1: one = amount_precise(exchange_symbol, rem) if one is None or one <= 0: return 0, "[]", 0.0 return 1, json.dumps([one]), one unit = amount_precise(exchange_symbol, rem / n) if unit is None or unit <= 0: one = amount_precise(exchange_symbol, rem) if one is None or one <= 0: return 0, "[]", 0.0 return 1, json.dumps([one]), one parts: list[float] = [] acc = 0.0 for _ in range(n - 1): parts.append(unit) acc += unit last = amount_precise(exchange_symbol, max(0.0, rem - acc)) if last is None or last <= 0: one = amount_precise(exchange_symbol, rem) if one is None or one <= 0: return 0, "[]", 0.0 return 1, json.dumps([one]), one parts.append(last) return n, json.dumps(parts), unit def compute_trend_plan_core( *, direction: str, stop_loss: float, add_upper: float, risk_percent: float, snapshot_usdt: float, leverage: int, live_price: float, target_order_amount: float, exchange_symbol: str, dca_legs: int, amount_precise: AmountPreciseFn, min_amount: float = 0.0, full_margin_buffer_ratio: float = 0.95, ) -> Tuple[Optional[dict[str, Any]], Optional[str]]: """在已有 target_order_amount 时组装预览 payload(张数由调用方 prepare_order_amount 计算)。""" rf = calc_risk_fraction(direction, add_upper, stop_loss) if rf is None or rf <= 0: return None, "止损与补仓区间边界组合无法计算风险比例" risk_budget = float(snapshot_usdt) * (float(risk_percent) / 100.0) notional = risk_budget / rf margin_plan = notional / float(leverage) margin_plan = min(margin_plan, float(snapshot_usdt) * float(full_margin_buffer_ratio)) if margin_plan <= 0: return None, "计划保证金过小" first_amt = amount_precise(exchange_symbol, float(target_order_amount) * 0.5) if first_amt is None or first_amt <= 0: return None, "首仓张数过小(低于交易所最小张数),请提高风险比例或杠杆" remainder_total = amount_precise(exchange_symbol, max(0.0, float(target_order_amount) - float(first_amt))) if remainder_total is None: remainder_total = 0.0 n_legs, leg_json, per_ref = build_leg_amounts_json( exchange_symbol, remainder_total, dca_legs, amount_precise, min_amount ) if n_legs <= 0: return None, "剩余计划张数不足以拆出补仓档,请提高风险比例或放宽止损与补仓区间间距" grid = build_grid_prices(direction, stop_loss, add_upper, n_legs) if len(grid) != n_legs: return None, "补仓网格生成失败" try: leg_list = json.loads(leg_json) except Exception: leg_list = [] payload = { "direction": direction, "stop_loss": float(stop_loss), "add_upper": float(add_upper), "risk_percent": float(risk_percent), "snapshot_available_usdt": float(snapshot_usdt), "live_price_ref": float(live_price), "plan_margin_capital": float(margin_plan), "target_order_amount": float(target_order_amount), "first_order_amount": float(first_amt), "remainder_total": float(remainder_total), "dca_legs": int(n_legs), "per_leg_amount": float(per_ref), "grid_prices_json": json.dumps(grid), "leg_amounts_json": leg_json, "grid": grid, "leg_amounts": leg_list, } return payload, None def calc_planned_reward_risk_ratio( direction: str, entry_price: float, stop_loss: float, take_profit: float ) -> Optional[float]: """盈亏比(reward/risk),与四所 calc_rr_ratio 口径一致。""" try: entry = float(entry_price) sl = float(stop_loss) tp = float(take_profit) if entry <= 0 or sl <= 0 or tp <= 0: return None direction = (direction or "long").strip().lower() if direction == "short": risk = sl - entry reward = entry - tp else: risk = entry - sl reward = tp - entry if risk <= 0 or reward <= 0: return None return round(reward / risk, 4) except (TypeError, ValueError): return None def calc_take_profit_for_rr( direction: str, entry_price: float, stop_loss: float, reward_risk_ratio: float ) -> Optional[float]: """按统一止损与目标 RR 反推止盈价。""" try: entry = float(entry_price) sl = float(stop_loss) rr = float(reward_risk_ratio) if entry <= 0 or sl <= 0 or rr <= 0: return None direction = (direction or "long").strip().lower() if direction == "short": risk = sl - entry if risk <= 0: return None return round(entry - rr * risk, 10) risk = entry - sl if risk <= 0: return None return round(entry + rr * risk, 10) except (TypeError, ValueError): return None def calc_risk_budget_usdt(snapshot_usdt: float, risk_percent: float) -> Optional[float]: """计划止损金额 U = 可用快照 × 风险比例。""" try: snap = float(snapshot_usdt) rp = float(risk_percent) if snap <= 0 or rp <= 0: return None return round(snap * rp / 100.0, 4) except (TypeError, ValueError): return None def calc_money_reward_risk_ratio(profit_u: float, risk_u: float) -> Optional[float]: """金额盈亏比 = 止盈盈利 U / 止损金额 U。""" try: r = float(risk_u) p = float(profit_u) if r <= 0: return None return round(p / r, 4) except (TypeError, ValueError): return None def calc_tp_profit_usdt( direction: str, avg_entry: float, take_profit_price: float, contracts: float, contract_size: float = 1.0, ) -> Optional[float]: """到达止盈价时,按累计张数与加仓后均价的盈利 U。""" try: from hub_position_metrics import estimate_linear_swap_upnl_usdt return estimate_linear_swap_upnl_usdt( direction, float(avg_entry), float(take_profit_price), float(contracts), float(contract_size) ) except (TypeError, ValueError): return None def weighted_avg_entry(legs: list[tuple[float, float]]) -> Optional[float]: """按 (成交价, 张数) 加权均价。""" total = 0.0 cost = 0.0 for price, amount in legs or []: try: p = float(price) a = float(amount) except (TypeError, ValueError): continue if a <= 0: continue total += a cost += p * a if total <= 0: return None return cost / total def build_trend_preview_level_rows(preview: dict) -> tuple[dict, list[dict]]: """ 预览:表单止盈价下每档累计持仓的盈利 U;止损金额 = 快照×风险;盈亏比按金额对比。 返回 (增强后的 preview 字段, 表格行列表,含首仓行)。 """ p = dict(preview or {}) direction = (p.get("direction") or "long").strip().lower() try: ref = float(p.get("live_price_ref")) sl = float(p.get("stop_loss")) user_tp = float(p.get("take_profit")) first_amt = float(p.get("first_order_amount")) snapshot = float(p.get("snapshot_available_usdt")) risk_percent = float(p.get("risk_percent")) except (TypeError, ValueError): return p, [] risk_u = calc_risk_budget_usdt(snapshot, risk_percent) if risk_u is None or risk_u <= 0: return p, [] try: contract_size = float(p.get("contract_size") or 1.0) if contract_size <= 0: contract_size = 1.0 except (TypeError, ValueError): contract_size = 1.0 p["preview_risk_amount_u"] = risk_u p["preview_take_profit_price"] = user_tp p["preview_unified_stop_loss"] = sl try: grid = json.loads(p.get("grid_prices_json") or "[]") if not isinstance(grid, list): grid = [] except Exception: grid = [] try: leg_amounts = json.loads(p.get("leg_amounts_json") or "[]") if not isinstance(leg_amounts, list): leg_amounts = [] except Exception: leg_amounts = [] def _row_dict( *, i: int, label: str, price: float, leg_contracts: float, cum_contracts: float, avg: float, is_first: bool, ) -> dict: profit_u = calc_tp_profit_usdt(direction, avg, user_tp, cum_contracts, contract_size) rr_money = calc_money_reward_risk_ratio(profit_u, risk_u) if profit_u is not None else None return { "i": i, "label": label, "price": price, "contracts": leg_contracts, "cum_contracts": cum_contracts, "avg_entry": avg, "take_profit_price": user_tp, "profit_u": profit_u, "risk_u": risk_u, "rr": rr_money, "stop_loss_price": sl, "take_profit": profit_u, "stop_loss": risk_u, "is_first": is_first, } cum_contracts = first_amt first_profit = calc_tp_profit_usdt(direction, ref, user_tp, cum_contracts, contract_size) first_rr = calc_money_reward_risk_ratio(first_profit, risk_u) if first_profit is not None else None p["preview_first_profit_u"] = first_profit p["preview_target_rr"] = first_rr p["preview_first_take_profit"] = user_tp rows: list[dict] = [ _row_dict( i=0, label="首仓", price=ref, leg_contracts=first_amt, cum_contracts=cum_contracts, avg=ref, is_first=True, ) ] accumulated: list[tuple[float, float]] = [(ref, first_amt)] for i, pair in enumerate(zip(grid, leg_amounts), 1): try: price = float(pair[0]) leg_contracts = float(pair[1]) except (TypeError, ValueError): continue accumulated.append((price, leg_contracts)) avg = weighted_avg_entry(accumulated) if avg is None: continue cum_contracts += leg_contracts rows.append( _row_dict( i=i, label=f"补仓{i}", price=price, leg_contracts=leg_contracts, cum_contracts=cum_contracts, avg=avg, is_first=False, ) ) return p, rows def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict]: """运行中计划:为 dca_levels 补充加仓后均价、止盈盈利 U、止损金额 U、金额盈亏比。""" if not levels: return levels p = plan or {} direction = (p.get("direction") or "long").strip().lower() try: sl = float(p.get("stop_loss")) user_tp = float(p.get("take_profit")) first_amt = float(p.get("first_order_amount")) snapshot = float(p.get("snapshot_available_usdt")) risk_percent = float(p.get("risk_percent")) except (TypeError, ValueError): return levels risk_u = calc_risk_budget_usdt(snapshot, risk_percent) if risk_u is None or risk_u <= 0: return levels ref_raw = p.get("live_price_ref") if ref_raw in (None, ""): ref_raw = p.get("avg_entry_price") try: ref = float(ref_raw) except (TypeError, ValueError): return levels try: contract_size = float(p.get("contract_size") or 1.0) if contract_size <= 0: contract_size = 1.0 except (TypeError, ValueError): contract_size = 1.0 out: list[dict] = [] accumulated: list[tuple[float, float]] = [] cum_contracts = 0.0 for lv in levels: row = dict(lv) is_first = row.get("leg_key") == "first" or row.get("label") == "首仓" or row.get("i") == 0 if is_first: amt = row.get("contracts") try: amt_f = float(amt if amt is not None else first_amt) except (TypeError, ValueError): amt_f = first_amt accumulated = [(ref, amt_f)] cum_contracts = amt_f row["avg_entry"] = ref else: price = row.get("price") contracts = row.get("contracts") if price is not None and contracts is not None: try: leg_contracts = float(contracts) accumulated.append((float(price), leg_contracts)) avg = weighted_avg_entry(accumulated) if avg is not None: row["avg_entry"] = avg cum_contracts += leg_contracts except (TypeError, ValueError): pass avg_entry = row.get("avg_entry") if avg_entry is not None: profit_u = calc_tp_profit_usdt( direction, float(avg_entry), user_tp, cum_contracts, contract_size ) row["take_profit_price"] = user_tp row["profit_u"] = profit_u row["risk_u"] = risk_u row["rr"] = calc_money_reward_risk_ratio(profit_u, risk_u) if profit_u is not None else None row["take_profit"] = profit_u row["stop_loss"] = risk_u row["stop_loss_price"] = sl out.append(row) return out