From 1e360864653f976e276c82173cc8347802c488fd Mon Sep 17 00:00:00 2001 From: dekun Date: Mon, 25 May 2026 11:25:14 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8Dokx=E6=AD=A2=E7=9B=88?= =?UTF-8?q?=E6=AD=A2=E6=8D=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- manual_trading_hub/agent.py | 9 +- manual_trading_hub/ecosystem.config.cjs | 1 + manual_trading_hub/hub.py | 129 ++++++++++++++++++++++-- manual_trading_hub/okx_orders_lib.py | 53 ++++++++++ manual_trading_hub/scripts/run_agent.sh | 2 + manual_trading_hub/static/app.js | 31 +++++- manual_trading_hub/static/index.html | 2 +- 7 files changed, 211 insertions(+), 16 deletions(-) create mode 100644 manual_trading_hub/okx_orders_lib.py diff --git a/manual_trading_hub/agent.py b/manual_trading_hub/agent.py index 32e21b6..b0f68f5 100644 --- a/manual_trading_hub/agent.py +++ b/manual_trading_hub/agent.py @@ -550,12 +550,14 @@ def _status_inner(x_control_token: str | None) -> Any: } ) + orders_fetch_error: str | None = None try: attach_orders_to_positions( positions_out, list_open_orders(ex, EXCHANGE_KIND, None), ) - except Exception: + except Exception as e: + orders_fetch_error = str(e) for p in positions_out: p.setdefault("conditional_orders", []) p.setdefault("regular_orders", []) @@ -564,7 +566,7 @@ def _status_inner(x_control_token: str | None) -> Any: pm = _position_mode_label() except Exception: pm = EXCHANGE_KIND - return { + out = { "ok": True, "exchange": EXCHANGE_KIND, "balance_usdt": balance_usdt, @@ -572,6 +574,9 @@ def _status_inner(x_control_token: str | None) -> Any: "total_unrealized_pnl": _finite_or_none(total_upnl), "position_mode": pm, } + if orders_fetch_error: + out["orders_fetch_error"] = orders_fetch_error + return out @app.get("/open-orders") diff --git a/manual_trading_hub/ecosystem.config.cjs b/manual_trading_hub/ecosystem.config.cjs index 16b409d..44a954e 100644 --- a/manual_trading_hub/ecosystem.config.cjs +++ b/manual_trading_hub/ecosystem.config.cjs @@ -39,6 +39,7 @@ function agentApp(name, exchangeDir, exchange, port) { EXCHANGE: exchange, PORT: String(port), HOST: "127.0.0.1", + PYTHONPATH: REPO_ROOT, }, }; } diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 66ac374..619e06a 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -37,13 +37,22 @@ from hub_web_auth import ( ) from url_public import browser_url, default_review_url, public_origin +try: + from exchange_orders import symbols_match as _symbols_match +except ImportError: + + def _symbols_match(position_symbol: str, order_symbol: str) -> bool: + a = (position_symbol or "").strip().upper() + b = (order_symbol or "").strip().upper() + return bool(a and b and a == b) + HUB_HOST = os.getenv("HUB_HOST", "0.0.0.0") HUB_PORT = int(os.getenv("HUB_PORT", "5100")) HUB_BRIDGE_TOKEN = (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip() _trust_raw = (os.getenv("HUB_TRUST_LAN", "true") or "").strip().lower() HUB_TRUST_LAN = _trust_raw not in ("0", "false", "no", "off") DIR = Path(__file__).resolve().parent -HUB_BUILD = "20260525-okx-tpsl" +HUB_BUILD = "20260525-okx-tpsl2" HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8")) HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10")) _board_key_prices_raw = (os.getenv("HUB_BOARD_KEY_PRICES", "true") or "").strip().lower() @@ -352,33 +361,131 @@ def _flask_error_from_hub_mon(hub_mon: dict | None) -> str | None: ) +def _tpsl_slots_to_conditional_orders(exchange_tpsl: dict, symbol: str) -> list[dict]: + """将实例 price_snapshot 的 exchange_tpsl 转为中控条件单结构。""" + out: list[dict] = [] + if not isinstance(exchange_tpsl, dict): + return out + for role, label in (("sl", "止损"), ("tp", "止盈")): + slot = exchange_tpsl.get(role) + if not isinstance(slot, dict): + continue + trig = slot.get("trigger_price") + oid = slot.get("order_id") + if trig is None or oid is None: + continue + try: + trig_f = float(trig) + except (TypeError, ValueError): + continue + out.append( + { + "id": str(oid), + "symbol": symbol, + "channel": "algo", + "category": "conditional", + "label": f"{label} {trig_f:g}", + "trigger_price": trig_f, + "amount": None, + "status": "open", + } + ) + return out + + +def _find_exchange_tpsl_for_position( + symbol: str, + side: str, + order_prices: list, + hub_orders: list, +) -> dict | None: + side_l = (side or "").lower() + op_by_id = { + op.get("id"): op + for op in order_prices + if isinstance(op, dict) and op.get("id") is not None + } + for o in hub_orders: + if not isinstance(o, dict): + continue + o_sym = o.get("exchange_symbol") or o.get("symbol") or "" + if not _symbols_match(symbol, o_sym): + continue + if (o.get("direction") or "").lower() != side_l: + continue + op = op_by_id.get(o.get("id")) + if not isinstance(op, dict): + continue + et = op.get("exchange_tpsl") + if isinstance(et, dict) and (et.get("sl") or et.get("tp")): + return et + for op in order_prices: + if not isinstance(op, dict): + continue + if not _symbols_match(symbol, op.get("symbol") or ""): + continue + et = op.get("exchange_tpsl") + if isinstance(et, dict) and (et.get("sl") or et.get("tp")): + return et + return None + + +def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None: + """子代理挂单为空时,用实例 Flask 已算好的 exchange_tpsl 补全展示。""" + ag = agent_row.get("agent") + if not isinstance(ag, dict): + return + positions = ag.get("positions") + if not isinstance(positions, list) or not positions: + return + if not isinstance(snap, dict): + return + order_prices = snap.get("order_prices") or [] + hub_orders = [] + if isinstance(hub_mon, dict): + hub_orders = hub_mon.get("orders") or [] + for p in positions: + if not isinstance(p, dict): + continue + sym = p.get("symbol") or "" + side = p.get("side") or "" + et = _find_exchange_tpsl_for_position(sym, side, order_prices, hub_orders) + if not et: + continue + p["exchange_tpsl"] = et + cond = p.get("conditional_orders") or [] + if not cond: + p["conditional_orders"] = _tpsl_slots_to_conditional_orders(et, sym) + + async def _fetch_exchange_flask_bundle( client: httpx.AsyncClient, ex: dict -) -> tuple[dict | None, dict | None, list | None]: - """单所 Flask:monitor / meta /(可选)price_snapshot 并行拉取。""" +) -> tuple[dict | None, dict | None, list | None, dict | None]: + """单所 Flask:monitor / meta / price_snapshot(有 flask_url 时)并行拉取。""" caps = ex.get("capabilities") or [] tasks = [ _fetch_flask_json(client, ex, "/api/hub/monitor"), _fetch_flask_json(client, ex, "/api/hub/meta"), ] - want_prices = HUB_BOARD_KEY_PRICES and "key" in caps - if want_prices: + has_flask = bool((ex.get("flask_url") or "").strip()) + if has_flask: tasks.append(_fetch_flask_json(client, ex, "/api/price_snapshot")) results = await asyncio.gather(*tasks) hub_mon = results[0] meta = results[1] + snap = results[2] if has_flask and len(results) > 2 else None key_prices = None - if want_prices and len(results) > 2: - snap = results[2] - if isinstance(snap, dict): - key_prices = snap.get("key_prices") - return hub_mon, meta, key_prices + want_prices = HUB_BOARD_KEY_PRICES and "key" in caps + if want_prices and isinstance(snap, dict): + key_prices = snap.get("key_prices") + return hub_mon, meta, key_prices, snap if isinstance(snap, dict) else None async def _assemble_board_row( client: httpx.AsyncClient, ex: dict, agent_row: dict ) -> dict: - hub_mon, meta, key_prices = await _fetch_exchange_flask_bundle(client, ex) + hub_mon, meta, key_prices, snap = await _fetch_exchange_flask_bundle(client, ex) + _merge_flask_exchange_tpsl(agent_row, snap, hub_mon if isinstance(hub_mon, dict) else None) flask_ok = isinstance(hub_mon, dict) and hub_mon.get("ok") is not False raw_review = (ex.get("review_url") or "").strip() review_link = browser_url(raw_review) if raw_review else default_review_url( diff --git a/manual_trading_hub/okx_orders_lib.py b/manual_trading_hub/okx_orders_lib.py new file mode 100644 index 0000000..3311a4d --- /dev/null +++ b/manual_trading_hub/okx_orders_lib.py @@ -0,0 +1,53 @@ +""" +OKX 挂单聚合(子代理本地副本,避免依赖仓库根 PYTHONPATH)。 +普通委托 + 算法单 conditional / oco / trigger。 +""" +from __future__ import annotations + +from typing import Any + + +def _order_dedupe_key(order: dict) -> str: + info = order.get("info") or {} + if not isinstance(info, dict): + info = {} + return str(order.get("id") or info.get("algoId") or info.get("ordId") or "") + + +def fetch_okx_all_open_orders(ex, exchange_symbol: str) -> list[dict]: + """合并 OKX 普通挂单与算法挂单(去重)。""" + if not exchange_symbol: + return [] + ex.load_markets() + sym = exchange_symbol + try: + sym = ex.market(exchange_symbol)["symbol"] + except Exception: + pass + seen: set[str] = set() + out: list[dict] = [] + + def add_batch(batch: list | None) -> None: + for o in batch or []: + if not isinstance(o, dict): + continue + k = _order_dedupe_key(o) + if not k or k in seen: + continue + seen.add(k) + out.append(o) + + try: + add_batch(ex.fetch_open_orders(sym)) + except Exception: + pass + for params in ( + {"ordType": "conditional"}, + {"ordType": "oco"}, + {"trigger": True}, + ): + try: + add_batch(ex.fetch_open_orders(sym, params=dict(params))) + except Exception: + pass + return out diff --git a/manual_trading_hub/scripts/run_agent.sh b/manual_trading_hub/scripts/run_agent.sh index 98c4995..8c90dba 100644 --- a/manual_trading_hub/scripts/run_agent.sh +++ b/manual_trading_hub/scripts/run_agent.sh @@ -4,6 +4,8 @@ set -e set -o pipefail HUB_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +REPO_ROOT="$(cd "${HUB_DIR}/.." && pwd)" +export PYTHONPATH="${REPO_ROOT}${PYTHONPATH:+:${PYTHONPATH}}" # shellcheck source=lib_load_dotenv.sh source "${HUB_DIR}/scripts/lib_load_dotenv.sh" diff --git a/manual_trading_hub/static/app.js b/manual_trading_hub/static/app.js index 8af5487..2dcd5f7 100644 --- a/manual_trading_hub/static/app.js +++ b/manual_trading_hub/static/app.js @@ -168,6 +168,33 @@ return localStorage.getItem(ordersCollapseKey(exchangeId, symbol)) === "1"; } + function condOrdersFromPosition(pos) { + const cond = Array.isArray(pos.conditional_orders) ? pos.conditional_orders : []; + if (cond.length) return cond; + const et = pos.exchange_tpsl; + if (!et) return []; + const out = []; + if (et.sl && et.sl.trigger_price != null) { + out.push({ + label: "止损", + trigger_price: Number(et.sl.trigger_price), + amount: null, + id: et.sl.order_id, + channel: "algo", + }); + } + if (et.tp && et.tp.trigger_price != null) { + out.push({ + label: "止盈", + trigger_price: Number(et.tp.trigger_price), + amount: null, + id: et.tp.order_id, + channel: "algo", + }); + } + return out; + } + function findMonitorOrder(orders, symbol, side) { const want = (side || "").toLowerCase(); for (const o of orders || []) { @@ -452,7 +479,7 @@ const sideCn = side === "long" ? "做多" : "做空"; const sideCls = side === "long" ? "pos-side-long" : "pos-side-short"; const mo = monitorOrder || {}; - const cond = Array.isArray(pos.conditional_orders) ? pos.conditional_orders : []; + const cond = condOrdersFromPosition(pos); const reg = Array.isArray(pos.regular_orders) ? pos.regular_orders : []; const guess = guessTpslFromCondOrders(side, cond); const symAttr = esc(symbol).replace(/"/g, """); @@ -583,7 +610,7 @@ const symAttr = esc(x.symbol || "").replace(/"/g, """); const sideAttr = esc((x.side || "").toLowerCase()).replace(/"/g, """); const contractsAttr = esc(String(x.contracts != null ? x.contracts : "")).replace(/"/g, """); - const cond = Array.isArray(x.conditional_orders) ? x.conditional_orders : []; + const cond = condOrdersFromPosition(x); const reg = Array.isArray(x.regular_orders) ? x.regular_orders : []; const guess = guessTpslFromCondOrders(x.side, cond); const slAttr = esc(String(guess.sl)).replace(/"/g, """); diff --git a/manual_trading_hub/static/index.html b/manual_trading_hub/static/index.html index 722e38b..0966276 100644 --- a/manual_trading_hub/static/index.html +++ b/manual_trading_hub/static/index.html @@ -109,6 +109,6 @@
- +