Use hub exchange instances for calculator contract precision.

Load enabled instances from settings, fetch market info via /api/hub/market, and apply exchange-specific amount and price precision in trend and roll calculators.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-23 18:13:02 +08:00
parent d938bc6c59
commit 5e507d0b66
14 changed files with 1140 additions and 204 deletions
+217 -79
View File
@@ -1,9 +1,9 @@
"""中控历史测算:趋势回调 / 滚仓,以损定仓(交易所精度张数按公式估算)。"""
"""中控历史测算:趋势回调 / 滚仓,以损定仓(交易所精度张数规则)。"""
from __future__ import annotations
from typing import Any, Callable, Optional, Tuple
from strategy_roll_lib import max_roll_legs, preview_roll
from strategy_roll_lib import max_roll_legs
from strategy_trend_lib import (
build_trend_preview_level_rows,
calc_risk_fraction,
@@ -12,37 +12,20 @@ from strategy_trend_lib import (
)
DEFAULT_DCA_LEGS = 5
DEFAULT_CONTRACT_SIZE = 1.0
MARGIN_BUFFER = 0.95
def _identity_amount_precise(_symbol: str, amount: float) -> Optional[float]:
try:
v = float(amount)
except (TypeError, ValueError):
return None
if v <= 0:
return None
return round(v, 8)
def _resolve_market(
exchange_id: str,
base: str,
) -> Tuple[Optional[dict[str, Any]], Optional[Callable[[float], Optional[float]]], Optional[str]]:
from hub_calculator_market_lib import get_calculator_market, make_amount_precise_fn_from_market
def amount_from_margin(
margin_capital: float,
leverage: int,
price: float,
contract_size: float = DEFAULT_CONTRACT_SIZE,
) -> Optional[float]:
try:
margin = float(margin_capital)
lev = int(leverage)
px = float(price)
cs = float(contract_size) if contract_size else DEFAULT_CONTRACT_SIZE
except (TypeError, ValueError):
return None
if margin <= 0 or lev <= 0 or px <= 0 or cs <= 0:
return None
notional = margin * lev
return notional / (px * cs)
market, err = get_calculator_market(exchange_id, base)
if err or not market:
return None, None, err or "无法解析合约"
amount_precise = make_amount_precise_fn_from_market(market)
return market, amount_precise, None
def calc_trend_calculator(
@@ -56,8 +39,15 @@ def calc_trend_calculator(
add_upper: float,
take_profit: float,
dca_legs: int = DEFAULT_DCA_LEGS,
contract_size: float = DEFAULT_CONTRACT_SIZE,
exchange_id: str = "0",
base: str = "ETH",
) -> Tuple[Optional[dict[str, Any]], Optional[str]]:
market, amount_precise, merr = _resolve_market(exchange_id, base)
if merr or not market or not amount_precise:
return None, merr or "无法解析合约"
contract_size = float(market.get("contract_size") or 1.0)
exchange_symbol = market["exchange_symbol"]
direction = (direction or "long").strip().lower()
if direction not in ("long", "short"):
return None, "方向须为 long 或 short"
@@ -70,7 +60,7 @@ def calc_trend_calculator(
upper = float(add_upper)
tp = float(take_profit)
legs = max(1, int(dca_legs))
cs = float(contract_size) if contract_size else DEFAULT_CONTRACT_SIZE
cs = float(contract_size) if contract_size else 1.0
except (TypeError, ValueError):
return None, "参数格式错误"
if capital <= 0 or rp <= 0 or lev <= 0 or entry <= 0 or sl <= 0 or upper <= 0 or tp <= 0:
@@ -90,9 +80,15 @@ def calc_trend_calculator(
if margin_plan <= 0:
return None, "计划保证金过小"
target_amt = amount_from_margin(margin_plan, lev, entry, cs)
target_amt = _amount_from_margin(margin_plan, lev, entry, cs)
if target_amt is None or target_amt <= 0:
return None, "无法计算计划张数,请检查入场价与杠杆"
target_amt = amount_precise(target_amt)
if target_amt is None or target_amt <= 0:
return None, "计划张数低于交易所最小精度"
def _amount_precise(_symbol: str, amount: float) -> Optional[float]:
return amount_precise(amount)
payload, err = compute_trend_plan_core(
direction=direction,
@@ -103,10 +99,10 @@ def calc_trend_calculator(
leverage=lev,
live_price=entry,
target_order_amount=target_amt,
exchange_symbol="CALC",
exchange_symbol=exchange_symbol,
dca_legs=legs,
amount_precise=_identity_amount_precise,
min_amount=0.0,
amount_precise=_amount_precise,
min_amount=float(market.get("min_amount") or 0.0),
full_margin_buffer_ratio=MARGIN_BUFFER,
)
if err:
@@ -117,11 +113,14 @@ def calc_trend_calculator(
payload["contract_size"] = cs
preview, rows = build_trend_preview_level_rows(payload)
def _f(v: Any, nd: int = 4) -> Any:
px_dec = int(market.get("price_decimals") or 4)
amt_dec = int(market.get("amount_decimals") or 4)
def _f(v: Any, nd: int | None = None) -> Any:
if v is None:
return None
try:
return round(float(v), nd)
return round(float(v), nd if nd is not None else 8)
except (TypeError, ValueError):
return v
@@ -130,9 +129,9 @@ def calc_trend_calculator(
table.append(
{
"label": row.get("label"),
"price": _f(row.get("price"), 8),
"contracts": _f(row.get("contracts"), 8),
"avg_entry": _f(row.get("avg_entry"), 8),
"price": _f(row.get("price"), px_dec),
"contracts": _f(row.get("contracts"), amt_dec),
"avg_entry": _f(row.get("avg_entry"), px_dec),
"profit_u": _f(row.get("profit_u")),
"risk_u": _f(row.get("risk_u")),
"rr": _f(row.get("rr"), 4),
@@ -145,20 +144,40 @@ def calc_trend_calculator(
"risk_percent": _f(rp, 2),
"risk_budget_u": _f(preview.get("preview_risk_amount_u")),
"leverage": lev,
"entry_price": _f(entry, 8),
"stop_loss": _f(sl, 8),
"add_upper": _f(upper, 8),
"take_profit": _f(tp, 8),
"entry_price": _f(entry, px_dec),
"stop_loss": _f(sl, px_dec),
"add_upper": _f(upper, px_dec),
"take_profit": _f(tp, px_dec),
"plan_margin_u": _f(preview.get("plan_margin_capital")),
"target_contracts": _f(preview.get("target_order_amount"), 8),
"first_contracts": _f(preview.get("first_order_amount"), 8),
"target_contracts": _f(preview.get("target_order_amount"), amt_dec),
"first_contracts": _f(preview.get("first_order_amount"), amt_dec),
"dca_legs": int(preview.get("dca_legs") or legs),
"first_profit_u": _f(preview.get("preview_first_profit_u")),
"first_rr": _f(preview.get("preview_target_rr"), 4),
"market": market,
"rows": table,
}, None
def _amount_from_margin(
margin_capital: float,
leverage: int,
price: float,
contract_size: float,
) -> Optional[float]:
try:
margin = float(margin_capital)
lev = int(leverage)
px = float(price)
cs = float(contract_size) if contract_size else 1.0
except (TypeError, ValueError):
return None
if margin <= 0 or lev <= 0 or px <= 0 or cs <= 0:
return None
notional = margin * lev
return notional / (px * cs)
def _round(v: Any, nd: int = 4) -> Any:
if v is None:
return None
@@ -182,28 +201,135 @@ def calc_initial_roll_qty(
entry_price: float,
stop_loss: float,
risk_budget_usdt: float,
contract_size: float = 1.0,
) -> Tuple[Optional[float], Optional[str]]:
"""首仓以损定仓:打到初始止损亏损 = 风险预算。"""
try:
entry = float(entry_price)
sl = float(stop_loss)
budget = float(risk_budget_usdt)
cs = float(contract_size) if contract_size else 1.0
except (TypeError, ValueError):
return None, "参数格式错误"
if entry <= 0 or sl <= 0 or budget <= 0:
if entry <= 0 or sl <= 0 or budget <= 0 or cs <= 0:
return None, "入场价、止损与风险预算须大于 0"
direction = (direction or "long").strip().lower()
if direction == "short":
per_unit = sl - entry
per_unit = (sl - entry) * cs
if per_unit <= 0:
return None, "做空:止损价须高于首仓入场价"
else:
per_unit = entry - sl
per_unit = (entry - sl) * cs
if per_unit <= 0:
return None, "做多:止损价须低于首仓入场价"
return budget / per_unit, None
def solve_add_amount_for_total_risk(
direction: str,
qty_existing: float,
entry_existing: float,
add_price: float,
new_stop: float,
risk_budget_usdt: float,
contract_size: float = 1.0,
) -> Tuple[Optional[float], Optional[str]]:
"""合并持仓打到新止损总亏损 = 风险预算,反推本次加仓张数。"""
try:
q1 = float(qty_existing)
e1 = float(entry_existing)
e2 = float(add_price)
sl = float(new_stop)
b = float(risk_budget_usdt)
cs = float(contract_size) if contract_size else 1.0
except (TypeError, ValueError):
return None, "参数格式错误"
if q1 <= 0 or e1 <= 0 or e2 <= 0 or b <= 0 or cs <= 0:
return None, "持仓或风险预算无效"
direction = (direction or "long").strip().lower()
if direction == "short":
denom = sl - e2
numer = b / cs - q1 * (sl - e1)
if denom <= 0:
return None, "做空:新止损须高于限价加仓价"
else:
denom = e2 - sl
numer = b / cs - q1 * (e1 - sl)
if denom <= 0:
return None, "做多:新止损须低于限价/市价加仓价"
q2 = numer / denom
if q2 <= 0:
return None, "按当前新止损与总风险%,无需加仓或无法再加(已满足风险上限)"
return q2, None
def _roll_leg_preview(
*,
direction: str,
qty_existing: float,
entry_existing: float,
take_profit: float,
add_price: float,
new_stop_loss: float,
risk_budget: float,
contract_size: float,
amount_precise: Callable[[float], Optional[float]],
) -> Tuple[Optional[dict[str, Any]], Optional[str]]:
direction = (direction or "long").strip().lower()
try:
tp = float(take_profit)
sl = float(new_stop_loss)
entry_add = float(add_price)
e1 = float(entry_existing)
except (TypeError, ValueError):
return None, "止损/止盈格式错误"
if sl <= 0 or tp <= 0 or entry_add <= 0:
return None, "止损与首仓止盈须大于0"
if direction == "long":
if sl >= entry_add:
return None, "做多:新止损须低于加仓价"
if tp <= e1:
return None, "做多:首仓止盈须高于当前持仓均价参考"
else:
if sl <= entry_add:
return None, "做空:新止损须高于加仓价"
if tp >= e1:
return None, "做空:首仓止盈须低于当前持仓均价参考"
q2_raw, err = solve_add_amount_for_total_risk(
direction,
qty_existing,
entry_existing,
entry_add,
sl,
risk_budget,
contract_size,
)
if err:
return None, err
q2 = amount_precise(float(q2_raw))
if q2 is None or q2 <= 0:
return None, "加仓张数低于交易所最小精度"
new_qty = float(qty_existing) + float(q2)
new_avg = (float(qty_existing) * float(entry_existing) + float(q2) * entry_add) / new_qty
cs = float(contract_size) if contract_size else 1.0
if direction == "long":
loss_at_sl = (new_avg - sl) * new_qty * cs
reward_at_tp = (tp - new_avg) * new_qty * cs
else:
loss_at_sl = (sl - new_avg) * new_qty * cs
reward_at_tp = (new_avg - tp) * new_qty * cs
return {
"add_amount_raw": q2,
"qty_after": new_qty,
"avg_entry_after": new_avg,
"add_price": entry_add,
"new_stop_loss": sl,
"loss_at_sl_usdt": loss_at_sl,
"reward_at_tp_usdt": reward_at_tp,
}, None
def calc_roll_calculator(
*,
direction: str,
@@ -214,12 +340,21 @@ def calc_roll_calculator(
take_profit: float,
add_legs: list[dict[str, float]] | None = None,
legs_done: int = 0,
exchange_id: str = "0",
base: str = "ETH",
) -> Tuple[Optional[dict[str, Any]], Optional[str]]:
"""
滚仓历史测算:首仓自动以损定仓;止盈锁定首仓价;最多 3 次滚仓加仓。
add_legs: [{add_price, new_stop_loss}, ...],按顺序链式计算。
legs_done: 已完成滚仓次数(仅标记,仍参与链式状态推进)。
"""
market, amount_precise, merr = _resolve_market(exchange_id, base)
if merr or not market or not amount_precise:
return None, merr or "无法解析合约"
contract_size = float(market.get("contract_size") or 1.0)
px_dec = int(market.get("price_decimals") or 4)
amt_dec = int(market.get("amount_decimals") or 4)
direction = (direction or "long").strip().lower()
if direction not in ("long", "short"):
return None, "方向须为 long 或 short"
@@ -261,34 +396,38 @@ def calc_roll_calculator(
return None, "做空:止盈价须低于首仓入场价"
risk_budget = capital * (rp / 100.0)
qty, err = calc_initial_roll_qty(direction, entry, initial_sl, risk_budget)
qty, err = calc_initial_roll_qty(direction, entry, initial_sl, risk_budget, contract_size)
if err:
return None, err
if qty is None or qty <= 0:
return None, "无法计算首仓张数"
qty_p = amount_precise(float(qty))
if qty_p is None or qty_p <= 0:
return None, "首仓张数低于交易所最小精度"
qty_f = float(qty)
qty_f = float(qty_p)
avg = entry
rows: list[dict[str, Any]] = []
cs = contract_size
if direction == "long":
first_loss = (avg - initial_sl) * qty_f
first_profit = (tp - avg) * qty_f
first_loss = (avg - initial_sl) * qty_f * cs
first_profit = (tp - avg) * qty_f * cs
else:
first_loss = (initial_sl - avg) * qty_f
first_profit = (avg - tp) * qty_f
first_loss = (initial_sl - avg) * qty_f * cs
first_profit = (avg - tp) * qty_f * cs
rows.append(
{
"label": "首仓",
"leg_index": 0,
"already_done": False,
"entry_or_add_price": _round(entry, 8),
"stop_loss": _round(initial_sl, 8),
"add_contracts": _round(qty_f, 8),
"total_contracts": _round(qty_f, 8),
"avg_entry": _round(avg, 8),
"take_profit": _round(tp, 8),
"entry_or_add_price": _round(entry, px_dec),
"stop_loss": _round(initial_sl, px_dec),
"add_contracts": _round(qty_f, amt_dec),
"total_contracts": _round(qty_f, amt_dec),
"avg_entry": _round(avg, px_dec),
"take_profit": _round(tp, px_dec),
"loss_at_sl_u": _round(first_loss),
"profit_at_tp_u": _round(first_profit),
"rr": _money_rr(first_profit, first_loss),
@@ -300,18 +439,16 @@ def calc_roll_calculator(
for i, leg in enumerate(legs_in):
leg_no = i + 1
preview, err = preview_roll(
preview, err = _roll_leg_preview(
direction=direction,
symbol="CALC",
qty_existing=current_qty,
entry_existing=current_avg,
initial_take_profit=tp,
add_mode="market",
new_stop_loss=leg["new_stop_loss"],
risk_percent=rp,
capital_base_usdt=capital,
take_profit=tp,
add_price=leg["add_price"],
legs_done=i,
new_stop_loss=leg["new_stop_loss"],
risk_budget=risk_budget,
contract_size=cs,
amount_precise=amount_precise,
)
if err:
return None, f"滚仓第 {leg_no} 次:{err}"
@@ -327,12 +464,12 @@ def calc_roll_calculator(
"label": f"滚仓{leg_no}",
"leg_index": leg_no,
"already_done": leg_no <= done,
"entry_or_add_price": _round(preview.get("add_price"), 8),
"stop_loss": _round(preview.get("new_stop_loss"), 8),
"add_contracts": _round(preview.get("add_amount_raw"), 8),
"total_contracts": _round(current_qty, 8),
"avg_entry": _round(current_avg, 8),
"take_profit": _round(tp, 8),
"entry_or_add_price": _round(preview.get("add_price"), px_dec),
"stop_loss": _round(preview.get("new_stop_loss"), px_dec),
"add_contracts": _round(preview.get("add_amount_raw"), amt_dec),
"total_contracts": _round(current_qty, amt_dec),
"avg_entry": _round(current_avg, px_dec),
"take_profit": _round(tp, px_dec),
"loss_at_sl_u": _round(loss),
"profit_at_tp_u": _round(reward),
"rr": _money_rr(reward, loss),
@@ -345,16 +482,17 @@ def calc_roll_calculator(
"capital_usdt": _round(capital),
"risk_percent": _round(rp, 2),
"risk_budget_u": _round(risk_budget),
"entry_price": _round(entry, 8),
"stop_loss": _round(initial_sl, 8),
"take_profit": _round(tp, 8),
"entry_price": _round(entry, px_dec),
"stop_loss": _round(initial_sl, px_dec),
"take_profit": _round(tp, px_dec),
"legs_done": done,
"roll_legs_planned": len(legs_in),
"first_contracts": _round(qty_f, 8),
"first_contracts": _round(qty_f, amt_dec),
"final_contracts": last.get("total_contracts"),
"final_avg_entry": last.get("avg_entry"),
"final_loss_at_sl_u": last.get("loss_at_sl_u"),
"final_profit_at_tp_u": last.get("profit_at_tp_u"),
"final_rr": last.get("rr"),
"market": market,
"rows": rows,
}, None