""" OKX 挂单聚合(子代理本地副本,避免依赖仓库根 PYTHONPATH)。 普通委托 + 算法单 conditional / oco / trigger。 """ from __future__ import annotations from typing import Any def _order_dedupe_key(order: dict) -> str: info = order.get("info") or {} if not isinstance(info, dict): info = {} return str(order.get("id") or info.get("algoId") or info.get("ordId") or "") def fetch_okx_all_open_orders(ex, exchange_symbol: str) -> list[dict]: """合并 OKX 普通挂单与算法挂单(去重)。""" if not exchange_symbol: return [] ex.load_markets() sym = exchange_symbol try: sym = ex.market(exchange_symbol)["symbol"] except Exception: pass seen: set[str] = set() out: list[dict] = [] def add_batch(batch: list | None) -> None: for o in batch or []: if not isinstance(o, dict): continue k = _order_dedupe_key(o) if not k or k in seen: continue seen.add(k) out.append(o) try: add_batch(ex.fetch_open_orders(sym)) except Exception: pass for params in ( {"ordType": "conditional"}, {"ordType": "oco"}, {"trigger": True}, ): try: add_batch(ex.fetch_open_orders(sym, params=dict(params))) except Exception: pass return out