# Source: crypto_monitor/strategy_trend_lib.py (533 lines, 18 KiB, Python)

"""趋势回调策略:纯计算与校验(无 ccxt / Flask)。各所 adapter 负责张数精度与下单。"""
from __future__ import annotations
import json
from typing import Any, Callable, Optional, Tuple
AmountPreciseFn = Callable[[str, float], Optional[float]]
def calc_risk_fraction(direction: str, entry_price: float, stop_loss: float) -> Optional[float]:
    """Return the stop-loss distance as a fraction of the entry price.

    For ``"short"`` the distance is ``stop_loss - entry_price``; every other
    direction value (including empty/``None``) is handled as long, where the
    distance is ``entry_price - stop_loss``.

    Returns:
        ``distance / entry_price``, or ``None`` when a price is not numeric,
        a price is not positive, or the distance is not positive.
    """
    try:
        entry_px = float(entry_price)
        stop_px = float(stop_loss)
    except (TypeError, ValueError):
        return None
    if entry_px <= 0 or stop_px <= 0:
        return None
    side = (direction or "long").strip().lower()
    distance = stop_px - entry_px if side == "short" else entry_px - stop_px
    if distance <= 0:
        return None
    return distance / entry_px
def trend_effective_margin_capital(plan: dict) -> float:
    """Pro-rate the planned margin by the share of the target size that is open.

    Reads ``plan_margin_capital``, ``target_order_amount``,
    ``order_amount_open`` and ``first_order_amount`` from *plan*.

    Resolution order:
      1. Missing, malformed or non-positive planned margin -> ``0.0``.
      2. Malformed target/open amount -> the full planned margin.
      3. Target and open amount both positive -> margin * min(1, open / target).
      4. Otherwise, target and first order amount both positive ->
         margin * min(1, first / target).
      5. Otherwise the full planned margin.

    Args:
        plan: Plan mapping; ``None`` or an empty mapping yields ``0.0``.

    Returns:
        The effective margin, rounded to 8 decimals when pro-rated.
    """
    data = plan or {}

    def _num(key: str) -> Optional[float]:
        # Missing/empty values count as 0; unparsable values are reported as None.
        try:
            return float(data.get(key) or 0)
        except (TypeError, ValueError):
            return None

    plan_margin = _num("plan_margin_capital")
    # Previously a malformed margin re-raised from inside the except handler
    # and plan=None raised AttributeError; both now degrade to "no margin".
    if plan_margin is None or plan_margin <= 0:
        return 0.0

    target = _num("target_order_amount")
    open_amt = _num("order_amount_open")
    if target is None or open_amt is None:
        # Keep the historical fallback: without usable sizes, report the full margin.
        return plan_margin

    if target > 0 and open_amt > 0:
        return round(plan_margin * min(1.0, open_amt / target), 8)

    # Nothing open yet: estimate with the first-order share instead.
    first = _num("first_order_amount") or 0.0
    if target > 0 and first > 0:
        return round(plan_margin * min(1.0, first / target), 8)
    return plan_margin
def trend_dca_level_reached(direction: str, mark_price: float, level: float) -> bool:
    """Tell whether the mark price has touched or crossed a DCA trigger level.

    Long: reached when the price has fallen to or below the level.
    Any other direction value is handled as short: reached when the price has
    risen to or above the level. An empty/``None`` direction defaults to long.

    Returns ``False`` when either price cannot be converted to ``float``.
    """
    side = (direction or "long").strip().lower()
    try:
        mark, trigger = float(mark_price), float(level)
    except (TypeError, ValueError):
        return False
    return mark <= trigger if side == "long" else mark >= trigger
def validate_trend_bounds(direction: str, stop_loss: float, add_upper: float) -> Optional[str]:
    """Check that the stop loss lies on the correct side of the add-zone bound.

    Long requires ``stop_loss < add_upper``; any other direction value requires
    ``stop_loss > add_upper``. An empty/``None`` direction defaults to long.

    Returns:
        ``None`` when the bounds are valid, otherwise a user-facing error
        message.

    Raises:
        TypeError, ValueError: if a price cannot be converted to ``float``
            (the conversion is not guarded here).
    """
    side = (direction or "long").strip().lower()
    stop_px = float(stop_loss)
    bound_px = float(add_upper)
    if side == "long":
        return None if stop_px < bound_px else "做多:止损价须低于补仓上沿"
    return None if stop_px > bound_px else "做空:止损价须高于补仓下沿"
def build_grid_prices(direction: str, sl: float, upper: float, n_legs: int) -> list[float]:
    """Generate ``n_legs`` evenly spaced trigger prices strictly between two bounds.

    The bounds are the stop loss and the far edge of the add zone; neither
    endpoint is included. For long the prices are returned in descending
    order, for any other direction value in ascending order. Each price is
    rounded to 10 decimals.

    Returns an empty list when ``n_legs`` is not positive or when the bounds
    are inverted for the direction (long needs ``upper > sl``, otherwise
    ``sl > upper``).
    """
    stop_px, bound_px = float(sl), float(upper)
    if n_legs <= 0:
        return []
    is_long = (direction or "long").strip().lower() == "long"
    # Normalise to (low, high) so both directions share the same interpolation.
    low, high = (stop_px, bound_px) if is_long else (bound_px, stop_px)
    if high <= low:
        return []
    width = high - low
    steps = float(n_legs + 1)
    levels = [low + (k / steps) * width for k in range(1, n_legs + 1)]
    ordered = sorted(levels, reverse=is_long)
    return [round(px, 10) for px in ordered]
def pick_dca_legs_and_per_leg(
    exchange_symbol: str,
    remainder_total: float,
    want_legs: int,
    amount_precise: AmountPreciseFn,
    min_amount: float = 0.0,
) -> Tuple[int, float]:
    """Pick the largest leg count whose per-leg size is tradable.

    Starting from ``max(1, want_legs)`` and counting down, the remainder is
    split evenly and passed through ``amount_precise``. The first count whose
    precise per-leg size is positive and not below ``min_amount`` wins.

    If no count qualifies, the whole remainder is tried as one leg. Note that
    this last-resort leg is NOT re-checked against ``min_amount``.

    Returns:
        ``(leg_count, per_leg_amount)``; ``(0, 0.0)`` when even the whole
        remainder has no positive precise size.
    """
    start_legs = max(1, int(want_legs))
    total = float(remainder_total)
    floor_amt = float(min_amount or 0.0)

    for candidate in range(start_legs, 0, -1):
        sized = amount_precise(exchange_symbol, total / candidate)
        if sized is None or sized <= 0:
            continue
        # Small epsilon so a size equal to the minimum is not rejected by float noise.
        if floor_amt and sized + 1e-12 < floor_amt:
            continue
        return candidate, sized

    whole = amount_precise(exchange_symbol, total)
    if whole is None or whole <= 0:
        return 0, 0.0
    return 1, whole
def build_leg_amounts_json(
    exchange_symbol: str,
    remainder_total: float,
    want_legs: int,
    amount_precise: AmountPreciseFn,
    min_amount: float = 0.0,
) -> Tuple[int, str, float]:
    """Split the remaining size into per-leg amounts and serialise them as JSON.

    The leg count comes from ``pick_dca_legs_and_per_leg``. With more than one
    leg, the first ``n - 1`` legs each get the precise even share and the last
    leg gets the precise leftover. Whenever the even share or the leftover is
    not a positive precise amount, the result collapses to a single leg
    holding the whole remainder.

    Returns:
        ``(leg_count, json_list_of_amounts, per_leg_reference)``;
        ``(0, "[]", 0.0)`` when nothing tradable remains.
    """
    total = amount_precise(exchange_symbol, float(remainder_total))
    if total is None or total <= 0:
        return 0, "[]", 0.0

    def _single_leg() -> Tuple[int, str, float]:
        # Put the whole remainder into one leg, or give up if it is not tradable.
        whole = amount_precise(exchange_symbol, total)
        if whole is None or whole <= 0:
            return 0, "[]", 0.0
        return 1, json.dumps([whole]), whole

    leg_count, _ = pick_dca_legs_and_per_leg(
        exchange_symbol, total, want_legs, amount_precise, min_amount
    )
    if leg_count <= 0:
        return 0, "[]", 0.0
    if leg_count <= 1:
        return _single_leg()

    per_leg = amount_precise(exchange_symbol, total / leg_count)
    if per_leg is None or per_leg <= 0:
        return _single_leg()

    amounts: list[float] = []
    allocated = 0.0
    while len(amounts) < leg_count - 1:
        amounts.append(per_leg)
        allocated += per_leg

    # The last leg absorbs whatever precision rounding left over.
    tail = amount_precise(exchange_symbol, max(0.0, total - allocated))
    if tail is None or tail <= 0:
        return _single_leg()
    amounts.append(tail)
    return leg_count, json.dumps(amounts), per_leg
def compute_trend_plan_core(
    *,
    direction: str,
    stop_loss: float,
    add_upper: float,
    risk_percent: float,
    snapshot_usdt: float,
    leverage: int,
    live_price: float,
    target_order_amount: float,
    exchange_symbol: str,
    dca_legs: int,
    amount_precise: AmountPreciseFn,
    min_amount: float = 0.0,
    full_margin_buffer_ratio: float = 0.95,
) -> Tuple[Optional[dict[str, Any]], Optional[str]]:
    """Assemble the preview payload for a plan whose target size is already known.

    Steps:
      1. Risk fraction from ``calc_risk_fraction`` with ``add_upper`` as the
         entry and ``stop_loss`` as the stop.
      2. Risk budget = ``snapshot_usdt * risk_percent / 100``; notional =
         budget / risk fraction; planned margin = notional / leverage, capped
         at ``snapshot_usdt * full_margin_buffer_ratio``.
      3. First order = precise half of ``target_order_amount``; the precise
         remainder is split into DCA legs by ``build_leg_amounts_json``.
      4. One trigger price per leg from ``build_grid_prices``.

    Args:
        live_price: Only echoed into the payload as ``live_price_ref``.
        amount_precise: Callback mapping ``(symbol, amount)`` to the
            precise amount, or ``None``.
        min_amount: Minimum per-leg size passed to the leg splitter.

    Returns:
        ``(payload, None)`` on success, or ``(None, message)`` with a
        user-facing error message.
    """
    rf = calc_risk_fraction(direction, add_upper, stop_loss)
    if rf is None or rf <= 0:
        return None, "止损与补仓区间边界组合无法计算风险比例"

    # A zero leverage used to raise ZeroDivisionError below; report it through
    # the same (None, message) channel as every other invalid input.
    lev = float(leverage)
    if lev <= 0:
        return None, "杠杆倍数必须大于 0"

    risk_budget = float(snapshot_usdt) * (float(risk_percent) / 100.0)
    notional = risk_budget / rf
    margin_plan = notional / lev
    # Never plan to use more than the buffered share of the available balance.
    margin_plan = min(margin_plan, float(snapshot_usdt) * float(full_margin_buffer_ratio))
    if margin_plan <= 0:
        return None, "计划保证金过小"

    first_amt = amount_precise(exchange_symbol, float(target_order_amount) * 0.5)
    if first_amt is None or first_amt <= 0:
        return None, "首仓张数过小(低于交易所最小张数),请提高风险比例或杠杆"

    remainder_total = amount_precise(
        exchange_symbol, max(0.0, float(target_order_amount) - float(first_amt))
    )
    if remainder_total is None:
        remainder_total = 0.0

    n_legs, leg_json, per_ref = build_leg_amounts_json(
        exchange_symbol, remainder_total, dca_legs, amount_precise, min_amount
    )
    if n_legs <= 0:
        return None, "剩余计划张数不足以拆出补仓档,请提高风险比例或放宽止损与补仓区间间距"

    grid = build_grid_prices(direction, stop_loss, add_upper, n_legs)
    if len(grid) != n_legs:
        return None, "补仓网格生成失败"

    try:
        leg_list = json.loads(leg_json)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; fall back to an empty list.
        leg_list = []

    payload = {
        "direction": direction,
        "stop_loss": float(stop_loss),
        "add_upper": float(add_upper),
        "risk_percent": float(risk_percent),
        "snapshot_available_usdt": float(snapshot_usdt),
        "live_price_ref": float(live_price),
        "plan_margin_capital": float(margin_plan),
        "target_order_amount": float(target_order_amount),
        "first_order_amount": float(first_amt),
        "remainder_total": float(remainder_total),
        "dca_legs": int(n_legs),
        "per_leg_amount": float(per_ref),
        "grid_prices_json": json.dumps(grid),
        "leg_amounts_json": leg_json,
        "grid": grid,
        "leg_amounts": leg_list,
    }
    return payload, None
def calc_planned_reward_risk_ratio(
    direction: str, entry_price: float, stop_loss: float, take_profit: float
) -> Optional[float]:
    """Return the price-based reward/risk ratio, rounded to 4 decimals.

    Short: risk is ``stop_loss - entry``, reward is ``entry - take_profit``.
    Any other direction value is handled as long: risk is ``entry - stop_loss``,
    reward is ``take_profit - entry``.

    Returns ``None`` when a price is not numeric or not positive, or when
    either the risk or the reward is not positive.
    """
    try:
        entry_px = float(entry_price)
        stop_px = float(stop_loss)
        target_px = float(take_profit)
    except (TypeError, ValueError):
        return None
    if entry_px <= 0 or stop_px <= 0 or target_px <= 0:
        return None

    side = (direction or "long").strip().lower()
    if side == "short":
        loss_dist, gain_dist = stop_px - entry_px, entry_px - target_px
    else:
        loss_dist, gain_dist = entry_px - stop_px, target_px - entry_px

    if loss_dist <= 0 or gain_dist <= 0:
        return None
    return round(gain_dist / loss_dist, 4)
def calc_take_profit_for_rr(
    direction: str, entry_price: float, stop_loss: float, reward_risk_ratio: float
) -> Optional[float]:
    """Derive the take-profit price that yields the requested reward/risk ratio.

    Short: ``entry - ratio * (stop_loss - entry)``.
    Any other direction value is handled as long:
    ``entry + ratio * (entry - stop_loss)``.
    The result is rounded to 10 decimals.

    Returns ``None`` when an input is not numeric or not positive, or when the
    stop is on the wrong side of the entry (risk distance <= 0).
    """
    try:
        entry_px = float(entry_price)
        stop_px = float(stop_loss)
        ratio = float(reward_risk_ratio)
    except (TypeError, ValueError):
        return None
    if entry_px <= 0 or stop_px <= 0 or ratio <= 0:
        return None

    is_short = (direction or "long").strip().lower() == "short"
    loss_dist = stop_px - entry_px if is_short else entry_px - stop_px
    if loss_dist <= 0:
        return None
    if is_short:
        return round(entry_px - ratio * loss_dist, 10)
    return round(entry_px + ratio * loss_dist, 10)
def calc_risk_budget_usdt(snapshot_usdt: float, risk_percent: float) -> Optional[float]:
    """Return the planned stop-loss amount: snapshot balance times risk percent.

    The result is ``snapshot_usdt * risk_percent / 100`` rounded to 4 decimals.
    Returns ``None`` when an input is not numeric or not positive.
    """
    try:
        balance = float(snapshot_usdt)
        pct = float(risk_percent)
    except (TypeError, ValueError):
        return None
    if balance <= 0 or pct <= 0:
        return None
    return round(balance * pct / 100.0, 4)
def calc_money_reward_risk_ratio(profit_u: float, risk_u: float) -> Optional[float]:
    """Return the money-based reward/risk ratio: profit amount / risk amount.

    The ratio is rounded to 4 decimals. Returns ``None`` when an input is not
    numeric or when the risk amount is not positive. The profit may be
    negative, in which case the ratio is negative.
    """
    try:
        risk_amount = float(risk_u)
        profit_amount = float(profit_u)
    except (TypeError, ValueError):
        return None
    if risk_amount <= 0:
        return None
    return round(profit_amount / risk_amount, 4)
def calc_tp_profit_usdt(
    direction: str,
    avg_entry: float,
    take_profit_price: float,
    contracts: float,
    contract_size: float = 1.0,
) -> Optional[float]:
    """Profit amount when the take-profit price is hit for the given position.

    Delegates to ``hub_position_metrics.estimate_linear_swap_upnl_usdt`` with
    the direction and the four numeric inputs converted to ``float``.
    NOTE(review): the exact PnL formula lives in that helper — confirm there.

    Returns ``None`` when a conversion or the helper raises ``TypeError`` or
    ``ValueError``. An import failure of the helper module is not caught.
    """
    try:
        # Imported lazily, as in the original, so this module loads without it.
        from hub_position_metrics import estimate_linear_swap_upnl_usdt

        entry_px = float(avg_entry)
        exit_px = float(take_profit_price)
        qty = float(contracts)
        multiplier = float(contract_size)
        return estimate_linear_swap_upnl_usdt(direction, entry_px, exit_px, qty, multiplier)
    except (TypeError, ValueError):
        return None
def weighted_avg_entry(legs: list[tuple[float, float]]) -> Optional[float]:
    """Return the size-weighted average price over ``(price, amount)`` pairs.

    Pairs whose price or amount cannot be converted to ``float`` are skipped,
    as are pairs with a non-positive amount.

    Returns ``None`` when *legs* is empty/``None`` or no pair contributes a
    positive amount.
    """
    size_sum = 0.0
    notional_sum = 0.0
    for raw_price, raw_amount in legs or []:
        try:
            fill_px, size = float(raw_price), float(raw_amount)
        except (TypeError, ValueError):
            continue
        if size <= 0:
            continue
        size_sum += size
        notional_sum += fill_px * size
    return None if size_sum <= 0 else notional_sum / size_sum
def build_trend_preview_level_rows(preview: dict) -> tuple[dict, list[dict]]:
    """Build the preview table: one row for the first order plus one per DCA leg.

    For each row the profit at the form's take-profit price is computed for
    the cumulative position at its weighted average entry, and compared with
    the fixed risk amount (snapshot balance x risk percent) to get a
    money-based reward/risk ratio.

    Reads from *preview*: ``direction``, ``live_price_ref``, ``stop_loss``,
    ``take_profit``, ``first_order_amount``, ``snapshot_available_usdt``,
    ``risk_percent``, optional ``contract_size`` (defaults to 1.0 when missing,
    malformed or non-positive), ``grid_prices_json`` and ``leg_amounts_json``.

    Returns:
        ``(enriched_preview_copy, rows)``. ``rows`` is empty, and the copy has
        no ``preview_*`` keys added, when a required number is missing or
        malformed or the risk amount is not positive.
    """
    enriched = dict(preview or {})
    side = (enriched.get("direction") or "long").strip().lower()
    try:
        entry_ref = float(enriched.get("live_price_ref"))
        stop_px = float(enriched.get("stop_loss"))
        tp_px = float(enriched.get("take_profit"))
        first_size = float(enriched.get("first_order_amount"))
        balance = float(enriched.get("snapshot_available_usdt"))
        risk_pct = float(enriched.get("risk_percent"))
    except (TypeError, ValueError):
        return enriched, []

    risk_amount = calc_risk_budget_usdt(balance, risk_pct)
    if risk_amount is None or risk_amount <= 0:
        return enriched, []

    try:
        multiplier = float(enriched.get("contract_size") or 1.0)
    except (TypeError, ValueError):
        multiplier = 1.0
    if multiplier <= 0:
        multiplier = 1.0

    enriched["preview_risk_amount_u"] = risk_amount
    enriched["preview_take_profit_price"] = tp_px
    enriched["preview_unified_stop_loss"] = stop_px

    def _json_list(key: str) -> list:
        # Decode a JSON list stored under *key*; anything else becomes [].
        try:
            decoded = json.loads(enriched.get(key) or "[]")
        except Exception:
            return []
        return decoded if isinstance(decoded, list) else []

    grid_prices = _json_list("grid_prices_json")
    leg_sizes = _json_list("leg_amounts_json")

    def _make_row(
        index: int,
        label: str,
        price: float,
        leg_size: float,
        total_size: float,
        avg_px: float,
        is_first: bool,
    ) -> dict:
        # Profit is always measured on the cumulative position at its average entry.
        gain = calc_tp_profit_usdt(side, avg_px, tp_px, total_size, multiplier)
        ratio = None
        if gain is not None:
            ratio = calc_money_reward_risk_ratio(gain, risk_amount)
        return {
            "i": index,
            "label": label,
            "price": price,
            "contracts": leg_size,
            "cum_contracts": total_size,
            "avg_entry": avg_px,
            "take_profit_price": tp_px,
            "profit_u": gain,
            "risk_u": risk_amount,
            "rr": ratio,
            "stop_loss_price": stop_px,
            "take_profit": gain,
            "stop_loss": risk_amount,
            "is_first": is_first,
        }

    running_size = first_size
    first_gain = calc_tp_profit_usdt(side, entry_ref, tp_px, running_size, multiplier)
    first_ratio = None
    if first_gain is not None:
        first_ratio = calc_money_reward_risk_ratio(first_gain, risk_amount)
    enriched["preview_first_profit_u"] = first_gain
    enriched["preview_target_rr"] = first_ratio
    enriched["preview_first_take_profit"] = tp_px

    rows: list[dict] = [
        _make_row(0, "首仓", entry_ref, first_size, running_size, entry_ref, True)
    ]
    fills: list[tuple[float, float]] = [(entry_ref, first_size)]

    for idx, (raw_px, raw_size) in enumerate(zip(grid_prices, leg_sizes), 1):
        try:
            leg_px = float(raw_px)
            leg_size = float(raw_size)
        except (TypeError, ValueError):
            # Skipped legs keep their position number, so labels may have gaps.
            continue
        fills.append((leg_px, leg_size))
        avg_px = weighted_avg_entry(fills)
        if avg_px is None:
            continue
        running_size += leg_size
        rows.append(
            _make_row(idx, f"补仓{idx}", leg_px, leg_size, running_size, avg_px, False)
        )

    return enriched, rows
def enrich_trend_dca_levels_with_tp(plan: dict, levels: list[dict]) -> list[dict]:
    """Annotate a running plan's DCA levels with averaged entry and TP/risk figures.

    Each level is copied. A level counts as the first order when its
    ``leg_key`` is ``"first"``, its ``label`` is the first-order label, or its
    ``i`` equals 0; it resets the running position to
    ``(reference price, contracts)``. Other levels with both ``price`` and
    ``contracts`` are appended to the running position.

    Every level that ends up with an ``avg_entry`` gets ``take_profit_price``,
    ``profit_u``, ``risk_u``, ``rr``, ``take_profit``, ``stop_loss`` and
    ``stop_loss_price``.

    The reference price is ``live_price_ref``, falling back to
    ``avg_entry_price`` when that is ``None`` or empty.

    Returns:
        The enriched copies, or *levels* itself (unchanged) when it is empty,
        a required plan number is missing/malformed, or the risk amount is
        not positive.
    """
    if not levels:
        return levels

    cfg = plan or {}
    side = (cfg.get("direction") or "long").strip().lower()
    try:
        stop_px = float(cfg.get("stop_loss"))
        tp_px = float(cfg.get("take_profit"))
        first_size = float(cfg.get("first_order_amount"))
        balance = float(cfg.get("snapshot_available_usdt"))
        risk_pct = float(cfg.get("risk_percent"))
    except (TypeError, ValueError):
        return levels

    risk_amount = calc_risk_budget_usdt(balance, risk_pct)
    if risk_amount is None or risk_amount <= 0:
        return levels

    ref_source = cfg.get("live_price_ref")
    if ref_source in (None, ""):
        ref_source = cfg.get("avg_entry_price")
    try:
        entry_ref = float(ref_source)
    except (TypeError, ValueError):
        return levels

    try:
        multiplier = float(cfg.get("contract_size") or 1.0)
    except (TypeError, ValueError):
        multiplier = 1.0
    if multiplier <= 0:
        multiplier = 1.0

    enriched: list[dict] = []
    fills: list[tuple[float, float]] = []
    running_size = 0.0

    for level in levels:
        row = dict(level)
        is_first = (
            row.get("leg_key") == "first"
            or row.get("label") == "首仓"
            or row.get("i") == 0
        )
        if is_first:
            raw_size = row.get("contracts")
            try:
                size = float(raw_size if raw_size is not None else first_size)
            except (TypeError, ValueError):
                size = first_size
            # The first order restarts the running position at the reference price.
            fills = [(entry_ref, size)]
            running_size = size
            row["avg_entry"] = entry_ref
        else:
            raw_px = row.get("price")
            raw_size = row.get("contracts")
            if raw_px is not None and raw_size is not None:
                try:
                    leg_size = float(raw_size)
                    leg_px = float(raw_px)
                except (TypeError, ValueError):
                    pass
                else:
                    fills.append((leg_px, leg_size))
                    avg_px = weighted_avg_entry(fills)
                    if avg_px is not None:
                        row["avg_entry"] = avg_px
                        running_size += leg_size

        avg_entry = row.get("avg_entry")
        if avg_entry is not None:
            gain = calc_tp_profit_usdt(side, float(avg_entry), tp_px, running_size, multiplier)
            ratio = None
            if gain is not None:
                ratio = calc_money_reward_risk_ratio(gain, risk_amount)
            row.update(
                {
                    "take_profit_price": tp_px,
                    "profit_u": gain,
                    "risk_u": risk_amount,
                    "rr": ratio,
                    "take_profit": gain,
                    "stop_loss": risk_amount,
                    "stop_loss_price": stop_px,
                }
            )
        enriched.append(row)

    return enriched