""" 中控子代理:拉取交易所挂单/条件单并规范化展示;撤销单笔或按合约批量撤销;挂止盈止损(先撤条件单再挂)。 """ from __future__ import annotations import os import time from typing import Any from okx_orders_lib import fetch_okx_all_open_orders def _coerce_float(*values) -> float | None: for v in values: if v is None or v == "": continue try: return float(v) except (TypeError, ValueError): continue return None def _symbol_base_coin(symbol: str) -> str: """ZEC/USDT:USDT、ZEC-USDT-SWAP 等统一为标的币 ZEC。""" s = (symbol or "").strip().upper() if not s: return "" if "-SWAP" in s: s = s.replace("-SWAP", "") if "-" in s: return s.split("-", 1)[0] if "/" in s: return s.split("/", 1)[0] if ":" in s: return s.split(":", 1)[0] return s def symbols_match(position_symbol: str, order_symbol: str) -> bool: a = (position_symbol or "").strip().upper() b = (order_symbol or "").strip().upper() if not a or not b: return False if a == b: return True ba, bb = _symbol_base_coin(a), _symbol_base_coin(b) if ba and bb and ba == bb: return True for suf in (":USDT", "/USDT:USDT", "/USDT"): a2 = a.replace(suf, "") b2 = b.replace(suf, "") if f"{a2}/USDT" == b or f"{a2}/USDT:USDT" == b: return True if f"{b2}/USDT" == a or f"{b2}/USDT:USDT" == a: return True if a2 == b2: return True return False def _order_type_str(order: dict) -> str: info = order.get("info") or {} if isinstance(info, dict): for key in ("orderType", "type", "origType", "algoType", "ordType"): val = info.get(key) if val: return str(val).upper() return str(order.get("type") or "").upper() def _is_conditional_type(typ: str) -> bool: t = (typ or "").upper() if not t: return False keys = ("STOP", "TAKE_PROFIT", "TRAIL", "TRIGGER", "CONDITIONAL", "OCO") return any(k in t for k in keys) def _order_label(typ: str, side: str, reduce_only: bool | None) -> str: t = (typ or "").upper() side_l = (side or "").lower() parts = [] if "TAKE_PROFIT" in t: parts.append("止盈") elif "STOP" in t: parts.append("止损") elif "LIMIT" in t: parts.append("限价") elif "MARKET" in t: parts.append("市价") else: parts.append(typ or "委托") if side_l == "buy": parts.append("买入") elif side_l == "sell": parts.append("卖出") if reduce_only: parts.append("·只减仓") return " ".join(parts) def _normalize_raw_order(order: dict, *, channel: str) -> dict[str, Any] | None: if not isinstance(order, dict): return None info = order.get("info") or {} if not isinstance(info, dict): info = {} oid = order.get("id") or info.get("algoId") or info.get("orderId") or info.get("ordId") if oid is None: return None sym = str(order.get("symbol") or info.get("symbol") or info.get("instId") or "") typ = _order_type_str(order) side = str(order.get("side") or info.get("side") or "").lower() reduce_only = order.get("reduceOnly") if reduce_only is None: reduce_only = info.get("reduceOnly") try: reduce_only = bool(reduce_only) if reduce_only is not None else None except (TypeError, ValueError): reduce_only = None sl_trig = _coerce_float(info.get("slTriggerPx"), order.get("stopLossPrice")) tp_trig = _coerce_float(info.get("tpTriggerPx"), order.get("takeProfitPrice")) trig = _coerce_float( order.get("stopPrice"), order.get("triggerPrice"), info.get("triggerPrice"), info.get("stopPrice"), info.get("triggerPx"), sl_trig, tp_trig, ) price = _coerce_float(order.get("price"), info.get("price"), info.get("ordPx")) amt = _coerce_float(order.get("amount"), order.get("remaining"), info.get("quantity"), info.get("origQty"), info.get("sz")) category = "conditional" if _is_conditional_type(typ) or channel == "algo" else "limit" label = _order_label(typ, side, reduce_only) if sl_trig is not None and tp_trig is not None: label = f"止盈止损 SL={sl_trig:g} TP={tp_trig:g}" elif sl_trig is not None: label = f"止损 {sl_trig:g}" elif tp_trig is not None: label = f"止盈 {tp_trig:g}" return { "id": str(oid), "symbol": sym, "channel": channel, "category": category, "label": label, "type": typ, "side": side, "amount": amt, "trigger_price": trig, "price": price, "reduce_only": reduce_only, "status": str(order.get("status") or info.get("status") or "open"), } def _okx_normalize_orders(raw: dict, channel: str) -> list[dict[str, Any]]: """OKX 算法单常一笔同时含 SL+TP,拆成两条供中控「交易所止盈止损」展示。""" n = _normalize_raw_order(dict(raw), channel=channel) if not n: return [] info = raw.get("info") or {} if not isinstance(info, dict): info = {} sl_trig = _coerce_float(info.get("slTriggerPx"), raw.get("stopLossPrice")) tp_trig = _coerce_float(info.get("tpTriggerPx"), raw.get("takeProfitPrice")) if sl_trig is None or tp_trig is None or sl_trig == tp_trig: return [n] base_id = n["id"] rows: list[dict[str, Any]] = [] for role, px, lbl in ( ("sl", sl_trig, f"止损 {sl_trig:g}"), ("tp", tp_trig, f"止盈 {tp_trig:g}"), ): row = dict(n) row["id"] = f"{base_id}:{role}" row["algo_id"] = base_id row["label"] = lbl row["trigger_price"] = px row["category"] = "conditional" row["channel"] = channel rows.append(row) return rows def _okx_algo_order_id(order_id: str) -> str: oid = str(order_id or "") if ":" in oid: return oid.split(":", 1)[0] return oid def _binance_list(ex: Any, symbol: str | None) -> list[dict]: ex.load_markets() out: list[dict] = [] symbols: list[str] = [] if symbol: try: symbols = [ex.market(symbol)["symbol"]] except Exception: symbols = [symbol] else: symbols = [] try: for p in ex.fetch_positions() or []: sym = p.get("symbol") if sym: symbols.append(sym) except Exception: pass if symbol and not symbols: symbols = [symbol] def collect(ex_sym: str) -> None: market = ex.market(ex_sym) contract_id = market.get("id") try: for o in ex.fetch_open_orders(ex_sym) or []: item = dict(o) item["_channel"] = "regular" n = _normalize_raw_order(item, channel="regular") if n: out.append(n) except Exception: pass try: if contract_id and hasattr(ex, "fapiPrivateGetOpenAlgoOrders"): raw = ex.fapiPrivateGetOpenAlgoOrders({"symbol": contract_id}) items = raw if isinstance(raw, list) else (raw.get("orders") or raw.get("data") or []) for info in items or []: if not isinstance(info, dict): continue wrapped = { "id": info.get("algoId") or info.get("orderId"), "symbol": ex_sym, "info": info, "type": info.get("orderType") or info.get("type"), "side": (info.get("side") or "").lower(), "amount": info.get("quantity") or info.get("origQty"), "stopPrice": info.get("triggerPrice") or info.get("stopPrice"), "reduceOnly": info.get("reduceOnly"), } n = _normalize_raw_order(wrapped, channel="algo") if n: out.append(n) except Exception: pass if symbols: seen = set() for s in symbols: if s in seen: continue seen.add(s) collect(s) return out def _okx_list(ex: Any, symbol: str | None) -> list[dict]: ex.load_markets() out: list[dict] = [] symbols: list[str] = [] if symbol: try: symbols = [ex.market(symbol)["symbol"]] except Exception: symbols = [symbol] else: try: for p in ex.fetch_positions() or []: sym = p.get("symbol") if sym: symbols.append(sym) except Exception: pass if symbol and not symbols: symbols = [symbol] seen: set[tuple[str, str]] = set() for sym in symbols: try: for o in fetch_okx_all_open_orders(ex, sym): ch = "algo" if _is_conditional_type(_order_type_str(o)) else "regular" for n in _okx_normalize_orders(dict(o), channel=ch): key = (n["id"], n.get("channel") or ch) if key in seen: continue seen.add(key) out.append(n) except Exception: pass return out def _gate_trigger_params(ex: Any) -> dict: p = {"type": "swap", "trigger": True} try: ex.load_unified_status() if ex.options.get("unifiedAccount"): p["unifiedAccount"] = True except Exception: pass return p def _gate_list(ex: Any, symbol: str | None) -> list[dict]: ex.load_markets() out: list[dict] = [] symbols: list[str] = [] if symbol: try: symbols = [ex.market(symbol)["symbol"]] except Exception: symbols = [symbol] else: try: for p in ex.fetch_positions() or []: sym = p.get("symbol") if sym: symbols.append(sym) except Exception: pass if symbol and not symbols: symbols = [symbol] trig_params = _gate_trigger_params(ex) seen = set() for sym in symbols: if sym in seen: continue seen.add(sym) try: for o in ex.fetch_open_orders(sym) or []: n = _normalize_raw_order(dict(o), channel="regular") if n: out.append(n) except Exception: pass try: for o in ex.fetch_open_orders(sym, params=trig_params) or []: item = dict(o) item["type"] = item.get("type") or "trigger" n = _normalize_raw_order(item, channel="algo") if n: out.append(n) except Exception: pass return out def list_open_orders(ex: Any, exchange_kind: str, symbol: str | None = None) -> list[dict]: kind = (exchange_kind or "binance").lower() if kind == "binance": orders = _binance_list(ex, symbol) elif kind == "okx": orders = _okx_list(ex, symbol) else: orders = _gate_list(ex, symbol) if symbol: orders = [o for o in orders if symbols_match(symbol, o.get("symbol") or "")] # 去重 id+channel seen: set[tuple[str, str]] = set() uniq: list[dict] = [] for o in orders: key = (o["id"], o["channel"]) if key in seen: continue seen.add(key) uniq.append(o) return uniq def attach_orders_to_positions(positions: list[dict], orders: list[dict]) -> None: for p in positions: sym = p.get("symbol") or "" matched = [o for o in orders if symbols_match(sym, o.get("symbol") or "")] p["conditional_orders"] = [o for o in matched if o.get("category") == "conditional"] p["regular_orders"] = [o for o in matched if o.get("category") != "conditional"] def cancel_order( ex: Any, exchange_kind: str, symbol: str, order_id: str, channel: str = "regular", ) -> None: kind = (exchange_kind or "binance").lower() ex.load_markets() market = ex.market(symbol) unified = market["symbol"] ch = (channel or "regular").lower() if kind == "binance" and ch == "algo": contract_id = market.get("id") if contract_id and hasattr(ex, "fapiPrivateDeleteAlgoOrder"): ex.fapiPrivateDeleteAlgoOrder({"symbol": contract_id, "algoId": str(order_id)}) return params = None if kind == "gate" and ch == "algo": params = _gate_trigger_params(ex) oid = _okx_algo_order_id(order_id) if kind == "okx" else str(order_id) ex.cancel_order(oid, unified, params) def cancel_orders_for_symbol( ex: Any, exchange_kind: str, symbol: str, *, scope: str = "all", ) -> int: """scope: all | conditional | limit""" orders = list_open_orders(ex, exchange_kind, symbol) if scope == "conditional": orders = [o for o in orders if o.get("category") == "conditional"] elif scope == "limit": orders = [o for o in orders if o.get("category") != "conditional"] n = 0 for o in orders: try: cancel_order(ex, exchange_kind, symbol, o["id"], o.get("channel") or "regular") n += 1 except Exception: pass return n def _binance_cancel_algo_open(ex: Any, symbol: str) -> None: try: market = ex.market(symbol) cid = market.get("id") if cid and hasattr(ex, "fapiPrivateDeleteAlgoOpenOrders"): ex.fapiPrivateDeleteAlgoOpenOrders({"symbol": cid}) except Exception: pass def _binance_trigger_params() -> dict[str, Any]: wt = (os.getenv("BINANCE_TRIGGER_WORKING_TYPE") or "CONTRACT_PRICE").strip().upper() if wt not in ("CONTRACT_PRICE", "MARK_PRICE"): wt = "CONTRACT_PRICE" return {"workingType": wt} def _binance_place_tp_sl( ex: Any, symbol: str, direction: str, amount: float, stop_loss: float, take_profit: float, *, position_mode: str = "hedge", ) -> None: ex.load_markets() market = ex.market(symbol) if not market.get("swap"): raise RuntimeError("仅支持永续合约") close_side = "sell" if direction == "long" else "buy" amt = float(ex.amount_to_precision(symbol, float(amount))) if amt <= 0: raise RuntimeError("止盈止损:可平数量经精度舍入后为 0") sl_px = ex.price_to_precision(symbol, float(stop_loss)) tp_px = ex.price_to_precision(symbol, float(take_profit)) common = dict(_binance_trigger_params()) if (position_mode or "hedge").lower() in ("hedge", "dual", "double", "hedged"): common["positionSide"] = "LONG" if direction == "long" else "SHORT" last_err: Exception | None = None for attempt in range(6): try: ex.create_order( symbol, "STOP_MARKET", close_side, amt, None, dict(common, stopPrice=sl_px) ) time.sleep(0.05) ex.create_order( symbol, "TAKE_PROFIT_MARKET", close_side, amt, None, dict(common, stopPrice=tp_px), ) return except Exception as e: last_err = e cancel_orders_for_symbol(ex, "binance", symbol, scope="conditional") _binance_cancel_algo_open(ex, symbol) time.sleep(0.2 * (attempt + 1)) raise RuntimeError(f"Binance 未接受止盈/止损:{last_err}") def _okx_order_params(direction: str, *, reduce_only: bool, pos_mode: str, td_mode: str) -> dict: params: dict[str, Any] = {"tdMode": td_mode or "cross"} if (pos_mode or "hedge").lower() in ("hedge", "long_short_mode", "dual"): params["posSide"] = "long" if direction == "long" else "short" if reduce_only: params["reduceOnly"] = True return params def _okx_place_tp_sl( ex: Any, symbol: str, direction: str, amount: float, stop_loss: float, take_profit: float, *, pos_mode: str = "hedge", td_mode: str = "cross", ) -> None: ex.load_markets() close_side = "sell" if direction == "long" else "buy" amt = float(ex.amount_to_precision(symbol, float(amount))) if amt <= 0: raise RuntimeError("止盈止损:可平数量经精度舍入后为 0") base = _okx_order_params(direction, reduce_only=True, pos_mode=pos_mode, td_mode=td_mode) sl_px = float(stop_loss) tp_px = float(take_profit) last_err: Exception | None = None for attempt in range(6): try: ex.create_order( symbol, "market", close_side, amt, None, {**base, "stopLossPrice": sl_px}, ) time.sleep(0.05) ex.create_order( symbol, "market", close_side, amt, None, {**base, "takeProfitPrice": tp_px}, ) return except Exception as e: last_err = e cancel_orders_for_symbol(ex, "okx", symbol, scope="conditional") time.sleep(0.2 * (attempt + 1)) raise RuntimeError(f"OKX 未接受止盈/止损条件单:{last_err}") def _gate_tpsl_env() -> tuple[bool, int, int, str]: use_pos = (os.getenv("GATE_TPSL_USE_POSITION_ORDER") or "true").lower() in ("1", "true", "yes") exp = int(os.getenv("GATE_TPSL_TRIGGER_EXPIRATION", str(7 * 86400))) pt = int(os.getenv("GATE_TPSL_PRICE_TYPE", "0")) if pt < 0 or pt > 2: pt = 0 pos_mode = (os.getenv("GATE_POS_MODE") or "hedge").strip().lower() return use_pos, exp, pt, pos_mode def _gate_place_tp_sl_position( ex: Any, symbol: str, direction: str, stop_loss: float, take_profit: float, *, pos_mode: str, price_type: int, expiration: int, ) -> None: ex.load_markets() market = ex.market(symbol) if not market.get("swap"): raise RuntimeError("仅支持永续合约") settle = market["settleId"] contract = market["id"] order_type = "close-long-position" if direction == "long" else "close-short-position" close_side = "sell" if direction == "long" else "buy" sl_rule, tp_rule = (2, 1) if close_side == "sell" else (1, 2) initial: dict[str, Any] = { "contract": contract, "size": 0, "price": "0", "close": True, "reduce_only": True, "tif": "ioc", "text": "api", } if pos_mode in ("hedge", "dual", "double"): initial["auto_size"] = "close_long" if direction == "long" else "close_short" sl_s = ex.price_to_precision(symbol, float(stop_loss)) tp_s = ex.price_to_precision(symbol, float(take_profit)) def _payload(trigger_price: str, rule: int) -> dict: trig: dict[str, Any] = { "strategy_type": 0, "price_type": price_type, "price": trigger_price, "rule": rule, } if expiration > 0: trig["expiration"] = expiration return { "settle": settle, "initial": dict(initial), "trigger": trig, "order_type": order_type, } last_err: Exception | None = None for attempt in range(6): try: ex.privateFuturesPostSettlePriceOrders(_payload(sl_s, sl_rule)) try: ex.privateFuturesPostSettlePriceOrders(_payload(tp_s, tp_rule)) except Exception: cancel_orders_for_symbol(ex, "gate", symbol, scope="conditional") raise return except Exception as e: last_err = e time.sleep(0.2 * (attempt + 1)) raise RuntimeError(f"Gate 仓位类止盈/止损未接受:{last_err}") def _gate_place_tp_sl_legacy( ex: Any, symbol: str, direction: str, amount: float, stop_loss: float, take_profit: float, ) -> None: ex.load_markets() close_side = "sell" if direction == "long" else "buy" base = {"reduceOnly": True} last_err: Exception | None = None for attempt in range(6): try: ex.create_order( symbol, "market", close_side, amount, None, dict(base, stopLossPrice=float(stop_loss)), ) ex.create_order( symbol, "market", close_side, amount, None, dict(base, takeProfitPrice=float(take_profit)), ) return except Exception as e: last_err = e time.sleep(0.2 * (attempt + 1)) raise RuntimeError(f"Gate 条件止盈/止损未接受:{last_err}") def _gate_place_tp_sl( ex: Any, symbol: str, direction: str, amount: float, stop_loss: float, take_profit: float, ) -> None: use_pos, exp, pt, pos_mode = _gate_tpsl_env() if use_pos: try: _gate_place_tp_sl_position( ex, symbol, direction, stop_loss, take_profit, pos_mode=pos_mode, price_type=pt, expiration=exp, ) return except Exception: pass _gate_place_tp_sl_legacy(ex, symbol, direction, amount, stop_loss, take_profit) def replace_position_tpsl( ex: Any, exchange_kind: str, symbol: str, direction: str, amount: float, stop_loss: float, take_profit: float, ) -> dict[str, Any]: """ 先撤销该合约全部条件单,再挂止盈+止损。与四实例策略页逻辑对齐(读各目录 .env 中 GATE_/BINANCE_/OKX_ 参数)。 """ kind = (exchange_kind or "binance").lower() direction = (direction or "long").strip().lower() if direction not in ("long", "short"): raise ValueError("direction 须为 long 或 short") sl = float(stop_loss) tp = float(take_profit) if sl <= 0 or tp <= 0: raise ValueError("止损、止盈价格须大于 0") ex.load_markets() cancelled = cancel_orders_for_symbol(ex, kind, symbol, scope="conditional") if kind == "binance": _binance_cancel_algo_open(ex, symbol) time.sleep(0.08) amt = float(amount) if amt <= 0: raise ValueError("持仓数量无效") if kind == "binance": pm = (os.getenv("BINANCE_POSITION_MODE") or "hedge").strip().lower() _binance_place_tp_sl(ex, symbol, direction, amt, sl, tp, position_mode=pm) elif kind == "okx": pm = (os.getenv("OKX_POS_MODE") or "hedge").strip().lower() td = (os.getenv("OKX_TD_MODE") or "cross").strip() _okx_place_tp_sl(ex, symbol, direction, amt, sl, tp, pos_mode=pm, td_mode=td) else: _gate_place_tp_sl(ex, symbol, direction, amt, sl, tp) return { "symbol": symbol, "direction": direction, "amount": amt, "stop_loss": sl, "take_profit": tp, "cancelled_conditional": cancelled, }