fix: 中控改委托后同步计划价并去重条件单展示

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-04 21:12:00 +08:00
parent 54c1984ec7
commit cb9d27bf9f
8 changed files with 376 additions and 33 deletions
+40
View File
@@ -777,6 +777,46 @@ def register_hub_routes(app):
return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400
return jsonify(_invoke_view_get("stop_trend_pullback", f"/stop_trend_pullback/{pid}")) return jsonify(_invoke_view_get("stop_trend_pullback", f"/stop_trend_pullback/{pid}"))
@app.route("/api/hub/order/sync-tpsl", methods=["POST"])
@_hub_auth_required
def api_hub_order_sync_tpsl():
"""中控 agent 已挂 TP/SL 后:同步 order_monitors 计划价,避免刷新仍显示旧止损止盈。"""
body = request.get_json(silent=True) or {}
symbol = (body.get("symbol") or request.form.get("symbol") or "").strip()
side = (
body.get("side")
or body.get("direction")
or request.form.get("side")
or ""
).strip().lower()
if not symbol:
return jsonify({"ok": False, "msg": "symbol 不能为空"}), 400
if side not in ("long", "short"):
return jsonify({"ok": False, "msg": "side 须为 long 或 short"}), 400
try:
sl = float(body.get("stop_loss"))
tp = float(body.get("take_profit"))
except (TypeError, ValueError):
return jsonify({"ok": False, "msg": "stop_loss / take_profit 须为数字"}), 400
get_db = _ctx().get("get_db")
if not callable(get_db):
return jsonify({"ok": False, "msg": "HUB_CTX 缺少 get_db"}), 500
from lib.hub.hub_symbol_lib import symbols_match
from lib.hub.hub_order_sync_lib import sync_active_monitor_tpsl_prices
conn = get_db()
try:
out = sync_active_monitor_tpsl_prices(
conn, symbol, side, sl, tp, symbols_match=symbols_match
)
if out.get("ok"):
conn.commit()
return jsonify(out)
except Exception as e:
return jsonify({"ok": False, "msg": str(e)}), 500
finally:
conn.close()
@app.route("/api/hub/order/sync-flat", methods=["POST"]) @app.route("/api/hub/order/sync-flat", methods=["POST"])
@_hub_auth_required @_hub_auth_required
def api_hub_order_sync_flat(): def api_hub_order_sync_flat():
+114
View File
@@ -0,0 +1,114 @@
"""中控改委托后同步实例 order_monitors 计划价(交易所已由 agent 挂单)。"""
from __future__ import annotations
from typing import Any, Callable
def cond_order_role(row: dict[str, Any]) -> str | None:
lbl = (row.get("label") or "").strip().lower()
if "止损" in lbl and "止盈止损" not in lbl:
return "sl"
if "止盈" in lbl:
return "tp"
return None
def dedupe_conditional_orders_by_role(orders: list) -> list:
"""同一持仓条件单列表:每种止盈/止损只保留一条(避免 OKX OCO 拆分 + Flask 补全重复)。"""
if not orders:
return []
by_role: dict[str, dict] = {}
others: list[dict] = []
for row in orders:
if not isinstance(row, dict):
continue
role = cond_order_role(row)
if role:
by_role[role] = row
else:
others.append(row)
out = list(others)
for role in ("tp", "sl"):
if role in by_role:
out.append(by_role[role])
return out
def exchange_tpsl_from_cond_orders(cond: list) -> dict[str, Any] | None:
"""从子代理条件单列表还原 exchange_tpsl 槽位。"""
slots: dict[str, Any] = {"sl": None, "tp": None}
for row in cond or []:
if not isinstance(row, dict):
continue
role = cond_order_role(row)
if role not in ("sl", "tp"):
continue
trig = row.get("trigger_price")
if trig is None:
continue
try:
trig_f = float(trig)
except (TypeError, ValueError):
continue
oid = row.get("algo_id") or row.get("id") or ""
slots[role] = {
"order_id": str(oid) if oid not in (None, "") else "",
"trigger_price": trig_f,
"trigger_display": f"{trig_f:g}",
"amount": row.get("amount"),
"type": row.get("type") or "",
}
if not slots["sl"] and not slots["tp"]:
return None
return slots
def sync_active_monitor_tpsl_prices(
conn,
symbol: str,
direction: str,
stop_loss: float,
take_profit: float,
*,
symbols_match: Callable[[str, str], bool],
) -> dict[str, Any]:
"""按 symbol+方向更新 active 下单监控的 stop_loss / take_profit。"""
sym = (symbol or "").strip()
side = (direction or "").strip().lower()
if not sym:
return {"ok": False, "msg": "symbol 不能为空"}
if side not in ("long", "short"):
return {"ok": False, "msg": "side 须为 long 或 short"}
try:
sl = float(stop_loss)
tp = float(take_profit)
except (TypeError, ValueError):
return {"ok": False, "msg": "stop_loss / take_profit 须为数字"}
if sl <= 0 or tp <= 0:
return {"ok": False, "msg": "止损、止盈须大于 0"}
rows = conn.execute(
"SELECT id, symbol, exchange_symbol, direction FROM order_monitors WHERE status='active'"
).fetchall()
updated_ids: list[int] = []
for row in rows:
r_sym = row["exchange_symbol"] if "exchange_symbol" in row.keys() else row["symbol"]
r_sym = r_sym or row["symbol"]
if not symbols_match(sym, r_sym or ""):
continue
r_dir = (row["direction"] or "").strip().lower()
if r_dir and r_dir != side:
continue
oid = int(row["id"])
conn.execute(
"UPDATE order_monitors SET stop_loss=?, take_profit=? WHERE id=? AND status='active'",
(sl, tp, oid),
)
updated_ids.append(oid)
return {
"ok": True,
"updated": len(updated_ids),
"order_monitor_ids": updated_ids,
"stop_loss": sl,
"take_profit": tp,
}
+38
View File
@@ -0,0 +1,38 @@
"""合约 symbol 匹配(持仓 vs 监控/挂单)。"""
def _symbol_base_coin(symbol: str) -> str:
s = (symbol or "").strip().upper()
if not s:
return ""
if "-SWAP" in s:
s = s.replace("-SWAP", "")
if "-" in s:
return s.split("-", 1)[0]
if "/" in s:
return s.split("/", 1)[0]
if ":" in s:
return s.split(":", 1)[0]
return s
def symbols_match(position_symbol: str, order_symbol: str) -> bool:
a = (position_symbol or "").strip().upper()
b = (order_symbol or "").strip().upper()
if not a or not b:
return False
if a == b:
return True
ba, bb = _symbol_base_coin(a), _symbol_base_coin(b)
if ba and bb and ba == bb:
return True
for suf in (":USDT", "/USDT:USDT", "/USDT"):
a2 = a.replace(suf, "")
b2 = b.replace(suf, "")
if f"{a2}/USDT" == b or f"{a2}/USDT:USDT" == b:
return True
if f"{b2}/USDT" == a or f"{b2}/USDT:USDT" == a:
return True
if a2 == b2:
return True
return False
+4 -23
View File
@@ -8,6 +8,7 @@ import time
from typing import Any from typing import Any
from lib.exchange.okx_orders_lib import fetch_okx_all_open_orders from lib.exchange.okx_orders_lib import fetch_okx_all_open_orders
from lib.hub.hub_symbol_lib import symbols_match
def _coerce_float(*values) -> float | None: def _coerce_float(*values) -> float | None:
@@ -37,28 +38,6 @@ def _symbol_base_coin(symbol: str) -> str:
return s return s
def symbols_match(position_symbol: str, order_symbol: str) -> bool:
a = (position_symbol or "").strip().upper()
b = (order_symbol or "").strip().upper()
if not a or not b:
return False
if a == b:
return True
ba, bb = _symbol_base_coin(a), _symbol_base_coin(b)
if ba and bb and ba == bb:
return True
for suf in (":USDT", "/USDT:USDT", "/USDT"):
a2 = a.replace(suf, "")
b2 = b.replace(suf, "")
if f"{a2}/USDT" == b or f"{a2}/USDT:USDT" == b:
return True
if f"{b2}/USDT" == a or f"{b2}/USDT:USDT" == a:
return True
if a2 == b2:
return True
return False
def _order_type_str(order: dict) -> str: def _order_type_str(order: dict) -> str:
info = order.get("info") or {} info = order.get("info") or {}
if isinstance(info, dict): if isinstance(info, dict):
@@ -424,7 +403,9 @@ def attach_orders_to_positions(positions: list[dict], orders: list[dict]) -> Non
matched = [o for o in orders if symbols_match(sym, o.get("symbol") or "")] matched = [o for o in orders if symbols_match(sym, o.get("symbol") or "")]
cond = [o for o in matched if o.get("category") == "conditional"] cond = [o for o in matched if o.get("category") == "conditional"]
_enrich_gate_conditional_labels(cond, p.get("side") or "long") _enrich_gate_conditional_labels(cond, p.get("side") or "long")
p["conditional_orders"] = cond from lib.hub.hub_order_sync_lib import dedupe_conditional_orders_by_role
p["conditional_orders"] = dedupe_conditional_orders_by_role(cond)
p["regular_orders"] = [o for o in matched if o.get("category") != "conditional"] p["regular_orders"] = [o for o in matched if o.get("category") != "conditional"]
+45 -8
View File
@@ -14,6 +14,11 @@ _REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path: if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT)) sys.path.insert(0, str(_REPO_ROOT))
from lib.hub.hub_order_sync_lib import (
cond_order_role,
dedupe_conditional_orders_by_role,
exchange_tpsl_from_cond_orders,
)
from lib.hub.hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days from lib.hub.hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days
from lib.hub.hub_ohlcv_lib import ( from lib.hub.hub_ohlcv_lib import (
CHART_TIMEFRAME_ORDER, CHART_TIMEFRAME_ORDER,
@@ -1941,7 +1946,7 @@ def _merge_flask_position_mark_price(
def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None: def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict | None) -> None:
"""子代理挂单为空时,用实例 Flask 已算好的 exchange_tpsl 补全展示""" """子代理条件单优先;Flask exchange_tpsl 仅补缺失槽位,避免重复止损/止盈行"""
ag = agent_row.get("agent") ag = agent_row.get("agent")
if not isinstance(ag, dict): if not isinstance(ag, dict):
return return
@@ -1949,8 +1954,8 @@ def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict
if not isinstance(positions, list) or not positions: if not isinstance(positions, list) or not positions:
return return
if not isinstance(snap, dict): if not isinstance(snap, dict):
return snap = None
order_prices = snap.get("order_prices") or [] order_prices = (snap or {}).get("order_prices") or []
hub_orders = [] hub_orders = []
if isinstance(hub_mon, dict): if isinstance(hub_mon, dict):
hub_orders = hub_mon.get("orders") or [] hub_orders = hub_mon.get("orders") or []
@@ -1959,15 +1964,31 @@ def _merge_flask_exchange_tpsl(agent_row: dict, snap: dict | None, hub_mon: dict
continue continue
sym = p.get("symbol") or "" sym = p.get("symbol") or ""
side = p.get("side") or "" side = p.get("side") or ""
et = _find_exchange_tpsl_for_position(sym, side, order_prices, hub_orders) cond = dedupe_conditional_orders_by_role(p.get("conditional_orders") or [])
if not et: roles_in_cond = {
et = _exchange_tpsl_from_hub_order(hub_orders, sym, side) r for row in cond if (r := cond_order_role(row)) in ("sl", "tp")
}
et = exchange_tpsl_from_cond_orders(cond)
if not et or roles_in_cond != {"sl", "tp"}:
flask_et = _find_exchange_tpsl_for_position(sym, side, order_prices, hub_orders)
if not flask_et:
flask_et = _exchange_tpsl_from_hub_order(hub_orders, sym, side)
if flask_et:
if et:
for role in ("sl", "tp"):
if not et.get(role) and flask_et.get(role):
et[role] = flask_et[role]
else:
et = flask_et
if not et: if not et:
p["conditional_orders"] = cond
continue continue
p["exchange_tpsl"] = et p["exchange_tpsl"] = et
cond = p.get("conditional_orders") or []
merged = _tpsl_slots_to_conditional_orders(et, sym) merged = _tpsl_slots_to_conditional_orders(et, sym)
p["conditional_orders"] = _merge_conditional_orders_no_dup(cond, merged) extra = [r for r in merged if cond_order_role(r) not in roles_in_cond]
p["conditional_orders"] = dedupe_conditional_orders_by_role(
_merge_conditional_orders_no_dup(cond, extra)
)
async def _fetch_exchange_flask_bundle( async def _fetch_exchange_flask_bundle(
@@ -2392,6 +2413,22 @@ async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody):
"payload": payload, "payload": payload,
"ok": bool(isinstance(payload, dict) and payload.get("ok")), "ok": bool(isinstance(payload, dict) and payload.get("ok")),
} }
if out.get("ok") and (ex.get("flask_url") or "").strip():
async with httpx.AsyncClient() as flask_client:
sync_parsed = await _fetch_flask_json(
flask_client,
ex,
"/api/hub/order/sync-tpsl",
method="POST",
json_body={
"symbol": body.symbol,
"side": body.side,
"stop_loss": body.stop_loss,
"take_profit": body.take_profit,
},
)
if isinstance(sync_parsed, dict):
out["order_sync"] = sync_parsed
_schedule_board_refresh() _schedule_board_refresh()
return out return out
+25 -1
View File
@@ -1831,6 +1831,29 @@
return localStorage.getItem(ordersCollapseKey(exchangeId, symbol)) === "1"; return localStorage.getItem(ordersCollapseKey(exchangeId, symbol)) === "1";
} }
function condOrderRole(o) {
const lb = (o && o.label) || "";
if (/止盈止损/.test(lb)) return null;
if (/止损/.test(lb)) return "sl";
if (/止盈/.test(lb)) return "tp";
return null;
}
function dedupeCondOrdersByRole(orders) {
const list = Array.isArray(orders) ? orders : [];
const byRole = {};
const others = [];
for (const o of list) {
const role = condOrderRole(o);
if (role) byRole[role] = o;
else others.push(o);
}
const out = others.slice();
if (byRole.tp) out.push(byRole.tp);
if (byRole.sl) out.push(byRole.sl);
return out;
}
function dedupeCondOrdersByTrigger(orders) { function dedupeCondOrdersByTrigger(orders) {
const list = Array.isArray(orders) ? orders : []; const list = Array.isArray(orders) ? orders : [];
const seen = new Set(); const seen = new Set();
@@ -1869,9 +1892,10 @@
} }
function condOrdersFromPosition(pos) { function condOrdersFromPosition(pos) {
const cond = dedupeCondOrdersByTrigger( let cond = dedupeCondOrdersByRole(
Array.isArray(pos.conditional_orders) ? pos.conditional_orders : [] Array.isArray(pos.conditional_orders) ? pos.conditional_orders : []
); );
cond = dedupeCondOrdersByTrigger(cond);
const et = pos.exchange_tpsl; const et = pos.exchange_tpsl;
if (!et) return cond; if (!et) return cond;
upsertExTpslCondOrder(cond, "sl", et.sl); upsertExTpslCondOrder(cond, "sl", et.sl);
+36 -1
View File
@@ -1,6 +1,6 @@
"""中控条件单列表:子代理与 Flask exchange_tpsl 合并去重。""" """中控条件单列表:子代理与 Flask exchange_tpsl 合并去重。"""
from manual_trading_hub.hub import _merge_conditional_orders_no_dup from manual_trading_hub.hub import _merge_conditional_orders_no_dup, _merge_flask_exchange_tpsl
def test_merge_skips_duplicate_trigger_prices(): def test_merge_skips_duplicate_trigger_prices():
@@ -30,3 +30,38 @@ def test_merge_skips_duplicate_trigger_prices():
def test_merge_uses_extra_when_existing_empty(): def test_merge_uses_extra_when_existing_empty():
extra = [{"id": "1", "label": "止损 57", "trigger_price": 57}] extra = [{"id": "1", "label": "止损 57", "trigger_price": 57}]
assert _merge_conditional_orders_no_dup([], extra) == extra assert _merge_conditional_orders_no_dup([], extra) == extra
def test_merge_flask_skips_duplicate_sl_when_agent_has_both():
agent_row = {
"agent": {
"positions": [
{
"symbol": "SOL/USDT:USDT",
"side": "short",
"conditional_orders": [
{"label": "止盈 76", "trigger_price": 76, "algo_id": "1"},
{"label": "止损 84.1", "trigger_price": 84.1, "algo_id": "1"},
{"label": "止损", "trigger_price": 84.1},
],
}
]
}
}
snap = {
"order_prices": [
{
"symbol": "SOL/USDT:USDT",
"side": "short",
"exchange_tpsl": {
"sl": {"trigger_price": 84.1, "order_id": "old"},
"tp": {"trigger_price": 76, "order_id": "old"},
},
}
]
}
_merge_flask_exchange_tpsl(agent_row, snap, None)
cond = agent_row["agent"]["positions"][0]["conditional_orders"]
sl_rows = [o for o in cond if "止损" in (o.get("label") or "")]
assert len(sl_rows) == 1
assert len(cond) == 2
+74
View File
@@ -0,0 +1,74 @@
"""中控改委托同步与条件单按角色去重。"""
from lib.hub.hub_order_sync_lib import (
cond_order_role,
dedupe_conditional_orders_by_role,
exchange_tpsl_from_cond_orders,
sync_active_monitor_tpsl_prices,
)
from lib.hub.hub_symbol_lib import symbols_match
def test_cond_order_role():
assert cond_order_role({"label": "止损 84.1"}) == "sl"
assert cond_order_role({"label": "止盈 76"}) == "tp"
assert cond_order_role({"label": "市价 买入"}) is None
def test_dedupe_conditional_orders_by_role_keeps_one_sl():
rows = [
{"label": "止盈 76", "trigger_price": 76},
{"label": "止损", "trigger_price": 84.1},
{"label": "止损 84.1", "trigger_price": 84.1, "id": "x:sl"},
]
out = dedupe_conditional_orders_by_role(rows)
assert len(out) == 2
sl_rows = [r for r in out if cond_order_role(r) == "sl"]
assert len(sl_rows) == 1
assert sl_rows[0]["label"] == "止损 84.1"
def test_exchange_tpsl_from_cond_orders():
cond = [
{"label": "止损 84.1", "trigger_price": 84.1, "algo_id": "1"},
{"label": "止盈 76", "trigger_price": 76, "algo_id": "1"},
]
et = exchange_tpsl_from_cond_orders(cond)
assert et["sl"]["trigger_price"] == 84.1
assert et["tp"]["trigger_price"] == 76
def test_sync_active_monitor_tpsl_prices_updates_matching_order():
class Row(dict):
def __getitem__(self, key):
return dict.get(self, key)
class Conn:
def __init__(self):
self.rows = [
Row(
id=5,
symbol="SOL/USDT:USDT",
exchange_symbol="SOL/USDT:USDT",
direction="short",
)
]
self.updates = []
def execute(self, sql, params=None):
if "SELECT" in sql:
return self
if "UPDATE" in sql and params:
self.updates.append(params)
return self
def fetchall(self):
return self.rows
conn = Conn()
out = sync_active_monitor_tpsl_prices(
conn, "SOL/USDT:USDT", "short", 85.0, 75.0, symbols_match=symbols_match
)
assert out["ok"] is True
assert out["updated"] == 1
assert conn.updates == [(85.0, 75.0, 5)]