"""中控改委托后同步实例 order_monitors 计划价(交易所已由 agent 挂单)。""" from __future__ import annotations from typing import Any, Callable def cond_order_role(row: dict[str, Any]) -> str | None: lbl = (row.get("label") or "").strip().lower() if "止损" in lbl and "止盈止损" not in lbl: return "sl" if "止盈" in lbl: return "tp" return None def dedupe_conditional_orders_by_role(orders: list) -> list: """同一持仓条件单列表:每种止盈/止损只保留一条(避免 OKX OCO 拆分 + Flask 补全重复)。""" if not orders: return [] by_role: dict[str, dict] = {} others: list[dict] = [] for row in orders: if not isinstance(row, dict): continue role = cond_order_role(row) if role: by_role[role] = row else: others.append(row) out = list(others) for role in ("tp", "sl"): if role in by_role: out.append(by_role[role]) return out def exchange_tpsl_from_cond_orders(cond: list) -> dict[str, Any] | None: """从子代理条件单列表还原 exchange_tpsl 槽位。""" slots: dict[str, Any] = {"sl": None, "tp": None} for row in cond or []: if not isinstance(row, dict): continue role = cond_order_role(row) if role not in ("sl", "tp"): continue trig = row.get("trigger_price") if trig is None: continue try: trig_f = float(trig) except (TypeError, ValueError): continue oid = row.get("algo_id") or row.get("id") or "" slots[role] = { "order_id": str(oid) if oid not in (None, "") else "", "trigger_price": trig_f, "trigger_display": f"{trig_f:g}", "amount": row.get("amount"), "type": row.get("type") or "", } if not slots["sl"] and not slots["tp"]: return None return slots def sync_active_monitor_tpsl_prices( conn, symbol: str, direction: str, stop_loss: float, take_profit: float, *, symbols_match: Callable[[str, str], bool], ) -> dict[str, Any]: """按 symbol+方向更新 active 下单监控的 stop_loss / take_profit。""" sym = (symbol or "").strip() side = (direction or "").strip().lower() if not sym: return {"ok": False, "msg": "symbol 不能为空"} if side not in ("long", "short"): return {"ok": False, "msg": "side 须为 long 或 short"} try: sl = float(stop_loss) tp = float(take_profit) except (TypeError, ValueError): return {"ok": False, "msg": "stop_loss / take_profit 须为数字"} if sl <= 0 or tp <= 0: return {"ok": False, "msg": "止损、止盈须大于 0"} rows = conn.execute( "SELECT id, symbol, exchange_symbol, direction FROM order_monitors WHERE status='active'" ).fetchall() updated_ids: list[int] = [] for row in rows: r_sym = row["exchange_symbol"] if "exchange_symbol" in row.keys() else row["symbol"] r_sym = r_sym or row["symbol"] if not symbols_match(sym, r_sym or ""): continue r_dir = (row["direction"] or "").strip().lower() if r_dir and r_dir != side: continue oid = int(row["id"]) conn.execute( "UPDATE order_monitors SET stop_loss=?, take_profit=? WHERE id=? AND status='active'", (sl, tp, oid), ) updated_ids.append(oid) return { "ok": True, "updated": len(updated_ids), "order_monitor_ids": updated_ids, "stop_loss": sl, "take_profit": tp, }