diff --git a/lib/hub/hub_bridge.py b/lib/hub/hub_bridge.py index a32c920..f416a5b 100644 --- a/lib/hub/hub_bridge.py +++ b/lib/hub/hub_bridge.py @@ -777,6 +777,46 @@ def register_hub_routes(app): return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 return jsonify(_invoke_view_get("stop_trend_pullback", f"/stop_trend_pullback/{pid}")) + @app.route("/api/hub/order/sync-tpsl", methods=["POST"]) + @_hub_auth_required + def api_hub_order_sync_tpsl(): + """中控 agent 已挂 TP/SL 后:同步 order_monitors 计划价,避免刷新仍显示旧止损止盈。""" + body = request.get_json(silent=True) or {} + symbol = (body.get("symbol") or request.form.get("symbol") or "").strip() + side = ( + body.get("side") + or body.get("direction") + or request.form.get("side") + or "" + ).strip().lower() + if not symbol: + return jsonify({"ok": False, "msg": "symbol 不能为空"}), 400 + if side not in ("long", "short"): + return jsonify({"ok": False, "msg": "side 须为 long 或 short"}), 400 + try: + sl = float(body.get("stop_loss")) + tp = float(body.get("take_profit")) + except (TypeError, ValueError): + return jsonify({"ok": False, "msg": "stop_loss / take_profit 须为数字"}), 400 + get_db = _ctx().get("get_db") + if not callable(get_db): + return jsonify({"ok": False, "msg": "HUB_CTX 缺少 get_db"}), 500 + from lib.hub.hub_symbol_lib import symbols_match + from lib.hub.hub_order_sync_lib import sync_active_monitor_tpsl_prices + + conn = get_db() + try: + out = sync_active_monitor_tpsl_prices( + conn, symbol, side, sl, tp, symbols_match=symbols_match + ) + if out.get("ok"): + conn.commit() + return jsonify(out) + except Exception as e: + return jsonify({"ok": False, "msg": str(e)}), 500 + finally: + conn.close() + @app.route("/api/hub/order/sync-flat", methods=["POST"]) @_hub_auth_required def api_hub_order_sync_flat(): diff --git a/lib/hub/hub_order_sync_lib.py b/lib/hub/hub_order_sync_lib.py new file mode 100644 index 0000000..6269661 --- /dev/null +++ b/lib/hub/hub_order_sync_lib.py @@ -0,0 +1,114 @@ +"""中控改委托后同步实例 order_monitors 计划价(交易所已由 agent 挂单)。""" +from __future__ import annotations + +from typing import Any, Callable + + +def cond_order_role(row: dict[str, Any]) -> str | None: + lbl = (row.get("label") or "").strip().lower() + if "止损" in lbl and "止盈止损" not in lbl: + return "sl" + if "止盈" in lbl: + return "tp" + return None + + +def dedupe_conditional_orders_by_role(orders: list) -> list: + """同一持仓条件单列表:每种止盈/止损只保留一条(避免 OKX OCO 拆分 + Flask 补全重复)。""" + if not orders: + return [] + by_role: dict[str, dict] = {} + others: list[dict] = [] + for row in orders: + if not isinstance(row, dict): + continue + role = cond_order_role(row) + if role: + by_role[role] = row + else: + others.append(row) + out = list(others) + for role in ("tp", "sl"): + if role in by_role: + out.append(by_role[role]) + return out + + +def exchange_tpsl_from_cond_orders(cond: list) -> dict[str, Any] | None: + """从子代理条件单列表还原 exchange_tpsl 槽位。""" + slots: dict[str, Any] = {"sl": None, "tp": None} + for row in cond or []: + if not isinstance(row, dict): + continue + role = cond_order_role(row) + if role not in ("sl", "tp"): + continue + trig = row.get("trigger_price") + if trig is None: + continue + try: + trig_f = float(trig) + except (TypeError, ValueError): + continue + oid = row.get("algo_id") or row.get("id") or "" + slots[role] = { + "order_id": str(oid) if oid not in (None, "") else "", + "trigger_price": trig_f, + "trigger_display": f"{trig_f:g}", + "amount": row.get("amount"), + "type": row.get("type") or "", + } + if not slots["sl"] and not slots["tp"]: + return None + return slots + + +def sync_active_monitor_tpsl_prices( + conn, + symbol: str, + direction: str, + stop_loss: float, + take_profit: float, + *, + symbols_match: Callable[[str, str], bool], +) -> dict[str, Any]: + """按 symbol+方向更新 active 下单监控的 stop_loss / take_profit。""" + sym = (symbol or "").strip() + side = (direction or "").strip().lower() + if not sym: + return {"ok": False, "msg": "symbol 不能为空"} + if side not in ("long", "short"): + return {"ok": False, "msg": "side 须为 long 或 short"} + try: + sl = float(stop_loss) + tp = float(take_profit) + except (TypeError, ValueError): + return {"ok": False, "msg": "stop_loss / take_profit 须为数字"} + if sl <= 0 or tp <= 0: + return {"ok": False, "msg": "止损、止盈须大于 0"} + + rows = conn.execute( + "SELECT id, symbol, exchange_symbol, direction FROM order_monitors WHERE status='active'" + ).fetchall() + updated_ids: list[int] = [] + for row in rows: + r_sym = row["exchange_symbol"] if "exchange_symbol" in row.keys() else row["symbol"] + r_sym = r_sym or row["symbol"] + if not symbols_match(sym, r_sym or ""): + continue + r_dir = (row["direction"] or "").strip().lower() + if r_dir and r_dir != side: + continue + oid = int(row["id"]) + conn.execute( + "UPDATE order_monitors SET stop_loss=?, take_profit=? WHERE id=? AND status='active'", + (sl, tp, oid), + ) + updated_ids.append(oid) + return { + "ok": True, + "updated": len(updated_ids), + "order_monitor_ids": updated_ids, + "stop_loss": sl, + "take_profit": tp, + } diff --git a/lib/hub/hub_symbol_lib.py b/lib/hub/hub_symbol_lib.py new file mode 100644 index 0000000..dae6bdf --- /dev/null +++ b/lib/hub/hub_symbol_lib.py @@ -0,0 +1,38 @@ +"""合约 symbol 匹配(持仓 vs 监控/挂单)。""" + + +def _symbol_base_coin(symbol: str) -> str: + s = (symbol or "").strip().upper() + if not s: + return "" + if "-SWAP" in s: + s = s.replace("-SWAP", "") + if "-" in s: + return s.split("-", 1)[0] + if "/" in s: + return s.split("/", 1)[0] + if ":" in s: + return s.split(":", 1)[0] + return s + + +def symbols_match(position_symbol: str, order_symbol: str) -> bool: + a = (position_symbol or "").strip().upper() + b = (order_symbol or "").strip().upper() + if not a or not b: + return False + if a == b: + return True + ba, bb = _symbol_base_coin(a), _symbol_base_coin(b) + if ba and bb and ba == bb: + return True + for suf in (":USDT", "/USDT:USDT", "/USDT"): + a2 = a.replace(suf, "") + b2 = b.replace(suf, "") + if f"{a2}/USDT" == b or f"{a2}/USDT:USDT" == b: + return True + if f"{b2}/USDT" == a or f"{b2}/USDT:USDT" == a: + return True + if a2 == b2: + return True + return False diff --git a/manual_trading_hub/exchange_orders.py b/manual_trading_hub/exchange_orders.py index 268565b..9b702e7 100644 --- a/manual_trading_hub/exchange_orders.py +++ b/manual_trading_hub/exchange_orders.py @@ -8,6 +8,7 @@ import time from typing import Any from lib.exchange.okx_orders_lib import fetch_okx_all_open_orders +from lib.hub.hub_symbol_lib import symbols_match def _coerce_float(*values) -> float | None: @@ -37,28 +38,6 @@ def _symbol_base_coin(symbol: str) -> str: return s -def symbols_match(position_symbol: str, order_symbol: str) -> bool: - a = (position_symbol or "").strip().upper() - b = (order_symbol or "").strip().upper() - if not a or not b: - return False - if a == b: - return True - ba, bb = _symbol_base_coin(a), _symbol_base_coin(b) - if ba and bb and ba == bb: - return True - for suf in (":USDT", "/USDT:USDT", "/USDT"): - a2 = a.replace(suf, "") - b2 = b.replace(suf, "") - if f"{a2}/USDT" == b or f"{a2}/USDT:USDT" == b: - return True - if f"{b2}/USDT" == a or f"{b2}/USDT:USDT" == a: - return True - if a2 == b2: - return True - return False - - def _order_type_str(order: dict) -> str: info = order.get("info") or {} if isinstance(info, dict): @@ -424,7 +403,9 @@ def attach_orders_to_positions(positions: list[dict], orders: list[dict]) -> Non matched = [o for o in orders if symbols_match(sym, o.get("symbol") or "")] cond = [o for o in matched if o.get("category") == "conditional"] _enrich_gate_conditional_labels(cond, p.get("side") or "long") - p["conditional_orders"] = cond + from lib.hub.hub_order_sync_lib import dedupe_conditional_orders_by_role + + p["conditional_orders"] = dedupe_conditional_orders_by_role(cond) p["regular_orders"] = [o for o in matched if o.get("category") != "conditional"] diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 47b5c9d..38620f2 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -14,6 +14,11 @@ _REPO_ROOT = Path(__file__).resolve().parent.parent if str(_REPO_ROOT) not in sys.path: sys.path.insert(0, str(_REPO_ROOT)) +from lib.hub.hub_order_sync_lib import ( + cond_order_role, + dedupe_conditional_orders_by_role, + exchange_tpsl_from_cond_orders, +) from lib.hub.hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days from lib.hub.hub_ohlcv_lib import ( CHART_TIMEFRAME_ORDER, @@ -1941,7 +1946,7 @@ def _merge_flask_position_mark_price( def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None: - """子代理挂单为空时,用实例 Flask 已算好的 exchange_tpsl 补全展示。""" + """子代理条件单优先;Flask exchange_tpsl 仅补缺失槽位,避免重复止损/止盈行。""" ag = agent_row.get("agent") if not isinstance(ag, dict): return @@ -1949,8 +1954,8 @@ def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict if not isinstance(positions, list) or not positions: return if not isinstance(snap, dict): - return - order_prices = snap.get("order_prices") or [] + snap = None + order_prices = (snap or {}).get("order_prices") or [] hub_orders = [] if isinstance(hub_mon, dict): hub_orders = hub_mon.get("orders") or [] @@ -1959,15 +1964,31 @@ def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict continue sym = p.get("symbol") or "" side = p.get("side") or "" - et = _find_exchange_tpsl_for_position(sym, side, order_prices, hub_orders) - if not et: - et = _exchange_tpsl_from_hub_order(hub_orders, sym, side) + cond = dedupe_conditional_orders_by_role(p.get("conditional_orders") or []) + roles_in_cond = { + r for row in cond if (r := cond_order_role(row)) in ("sl", "tp") + } + et = exchange_tpsl_from_cond_orders(cond) + if not et or roles_in_cond != {"sl", "tp"}: + flask_et = _find_exchange_tpsl_for_position(sym, side, order_prices, hub_orders) + if not flask_et: + flask_et = _exchange_tpsl_from_hub_order(hub_orders, sym, side) + if flask_et: + if et: + for role in ("sl", "tp"): + if not et.get(role) and flask_et.get(role): + et[role] = flask_et[role] + else: + et = flask_et if not et: + p["conditional_orders"] = cond continue p["exchange_tpsl"] = et - cond = p.get("conditional_orders") or [] merged = _tpsl_slots_to_conditional_orders(et, sym) - p["conditional_orders"] = _merge_conditional_orders_no_dup(cond, merged) + extra = [r for r in merged if cond_order_role(r) not in roles_in_cond] + p["conditional_orders"] = dedupe_conditional_orders_by_role( + _merge_conditional_orders_no_dup(cond, extra) + ) async def _fetch_exchange_flask_bundle( @@ -2392,6 +2413,22 @@ async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody): "payload": payload, "ok": bool(isinstance(payload, dict) and payload.get("ok")), } + if out.get("ok") and (ex.get("flask_url") or "").strip(): + async with httpx.AsyncClient() as flask_client: + sync_parsed = await _fetch_flask_json( + flask_client, + ex, + "/api/hub/order/sync-tpsl", + method="POST", + json_body={ + "symbol": body.symbol, + "side": body.side, + "stop_loss": body.stop_loss, + "take_profit": body.take_profit, + }, + ) + if isinstance(sync_parsed, dict): + out["order_sync"] = sync_parsed _schedule_board_refresh() return out diff --git a/manual_trading_hub/static/app.js b/manual_trading_hub/static/app.js index a8e6279..2c7c300 100644 --- a/manual_trading_hub/static/app.js +++ b/manual_trading_hub/static/app.js @@ -1831,6 +1831,29 @@ return localStorage.getItem(ordersCollapseKey(exchangeId, symbol)) === "1"; } + function condOrderRole(o) { + const lb = (o && o.label) || ""; + if (/止盈止损/.test(lb)) return null; + if (/止损/.test(lb)) return "sl"; + if (/止盈/.test(lb)) return "tp"; + return null; + } + + function dedupeCondOrdersByRole(orders) { + const list = Array.isArray(orders) ? orders : []; + const byRole = {}; + const others = []; + for (const o of list) { + const role = condOrderRole(o); + if (role) byRole[role] = o; + else others.push(o); + } + const out = others.slice(); + if (byRole.tp) out.push(byRole.tp); + if (byRole.sl) out.push(byRole.sl); + return out; + } + function dedupeCondOrdersByTrigger(orders) { const list = Array.isArray(orders) ? orders : []; const seen = new Set(); @@ -1869,9 +1892,10 @@ } function condOrdersFromPosition(pos) { - const cond = dedupeCondOrdersByTrigger( + let cond = dedupeCondOrdersByRole( Array.isArray(pos.conditional_orders) ? pos.conditional_orders : [] ); + cond = dedupeCondOrdersByTrigger(cond); const et = pos.exchange_tpsl; if (!et) return cond; upsertExTpslCondOrder(cond, "sl", et.sl); diff --git a/tests/test_hub_cond_orders_dedupe.py b/tests/test_hub_cond_orders_dedupe.py index 0313c08..503e1f6 100644 --- a/tests/test_hub_cond_orders_dedupe.py +++ b/tests/test_hub_cond_orders_dedupe.py @@ -1,6 +1,6 @@ """中控条件单列表:子代理与 Flask exchange_tpsl 合并去重。""" -from manual_trading_hub.hub import _merge_conditional_orders_no_dup +from manual_trading_hub.hub import _merge_conditional_orders_no_dup, _merge_flask_exchange_tpsl def test_merge_skips_duplicate_trigger_prices(): @@ -30,3 +30,38 @@ def test_merge_skips_duplicate_trigger_prices(): def test_merge_uses_extra_when_existing_empty(): extra = [{"id": "1", "label": "止损 57", "trigger_price": 57}] assert _merge_conditional_orders_no_dup([], extra) == extra + + +def test_merge_flask_skips_duplicate_sl_when_agent_has_both(): + agent_row = { + "agent": { + "positions": [ + { + "symbol": "SOL/USDT:USDT", + "side": "short", + "conditional_orders": [ + {"label": "止盈 76", "trigger_price": 76, "algo_id": "1"}, + {"label": "止损 84.1", "trigger_price": 84.1, "algo_id": "1"}, + {"label": "止损", "trigger_price": 84.1}, + ], + } + ] + } + } + snap = { + "order_prices": [ + { + "symbol": "SOL/USDT:USDT", + "side": "short", + "exchange_tpsl": { + "sl": {"trigger_price": 84.1, "order_id": "old"}, + "tp": {"trigger_price": 76, "order_id": "old"}, + }, + } + ] + } + _merge_flask_exchange_tpsl(agent_row, snap, None) + cond = agent_row["agent"]["positions"][0]["conditional_orders"] + sl_rows = [o for o in cond if "止损" in (o.get("label") or "")] + assert len(sl_rows) == 1 + assert len(cond) == 2 diff --git a/tests/test_hub_order_sync_lib.py b/tests/test_hub_order_sync_lib.py new file mode 100644 index 0000000..fe39e44 --- /dev/null +++ b/tests/test_hub_order_sync_lib.py @@ -0,0 +1,74 @@ +"""中控改委托同步与条件单按角色去重。""" + +from lib.hub.hub_order_sync_lib import ( + cond_order_role, + dedupe_conditional_orders_by_role, + exchange_tpsl_from_cond_orders, + sync_active_monitor_tpsl_prices, +) +from lib.hub.hub_symbol_lib import symbols_match + + +def test_cond_order_role(): + assert cond_order_role({"label": "止损 84.1"}) == "sl" + assert cond_order_role({"label": "止盈 76"}) == "tp" + assert cond_order_role({"label": "市价 买入"}) is None + + +def test_dedupe_conditional_orders_by_role_keeps_one_sl(): + rows = [ + {"label": "止盈 76", "trigger_price": 76}, + {"label": "止损", "trigger_price": 84.1}, + {"label": "止损 84.1", "trigger_price": 84.1, "id": "x:sl"}, + ] + out = dedupe_conditional_orders_by_role(rows) + assert len(out) == 2 + sl_rows = [r for r in out if cond_order_role(r) == "sl"] + assert len(sl_rows) == 1 + assert sl_rows[0]["label"] == "止损 84.1" + + +def test_exchange_tpsl_from_cond_orders(): + cond = [ + {"label": "止损 84.1", "trigger_price": 84.1, "algo_id": "1"}, + {"label": "止盈 76", "trigger_price": 76, "algo_id": "1"}, + ] + et = exchange_tpsl_from_cond_orders(cond) + assert et["sl"]["trigger_price"] == 84.1 + assert et["tp"]["trigger_price"] == 76 + + +def test_sync_active_monitor_tpsl_prices_updates_matching_order(): + class Row(dict): + def __getitem__(self, key): + return dict.get(self, key) + + class Conn: + def __init__(self): + self.rows = [ + Row( + id=5, + symbol="SOL/USDT:USDT", + exchange_symbol="SOL/USDT:USDT", + direction="short", + ) + ] + self.updates = [] + + def execute(self, sql, params=None): + if "SELECT" in sql: + return self + if "UPDATE" in sql and params: + self.updates.append(params) + return self + + def fetchall(self): + return self.rows + + conn = Conn() + out = sync_active_monitor_tpsl_prices( + conn, "SOL/USDT:USDT", "short", 85.0, 75.0, symbols_match=symbols_match + ) + assert out["ok"] is True + assert out["updated"] == 1 + assert conn.updates == [(85.0, 75.0, 5)]