Files
crypto_monitor/lib/hub/hub_order_sync_lib.py
T
2026-07-04 21:12:00 +08:00

115 lines
3.7 KiB
Python

"""中控改委托后同步实例 order_monitors 计划价(交易所已由 agent 挂单)。"""
from __future__ import annotations
from typing import Any, Callable
def cond_order_role(row: dict[str, Any]) -> str | None:
lbl = (row.get("label") or "").strip().lower()
if "止损" in lbl and "止盈止损" not in lbl:
return "sl"
if "止盈" in lbl:
return "tp"
return None
def dedupe_conditional_orders_by_role(orders: list) -> list:
"""同一持仓条件单列表:每种止盈/止损只保留一条(避免 OKX OCO 拆分 + Flask 补全重复)。"""
if not orders:
return []
by_role: dict[str, dict] = {}
others: list[dict] = []
for row in orders:
if not isinstance(row, dict):
continue
role = cond_order_role(row)
if role:
by_role[role] = row
else:
others.append(row)
out = list(others)
for role in ("tp", "sl"):
if role in by_role:
out.append(by_role[role])
return out
def exchange_tpsl_from_cond_orders(cond: list) -> dict[str, Any] | None:
"""从子代理条件单列表还原 exchange_tpsl 槽位。"""
slots: dict[str, Any] = {"sl": None, "tp": None}
for row in cond or []:
if not isinstance(row, dict):
continue
role = cond_order_role(row)
if role not in ("sl", "tp"):
continue
trig = row.get("trigger_price")
if trig is None:
continue
try:
trig_f = float(trig)
except (TypeError, ValueError):
continue
oid = row.get("algo_id") or row.get("id") or ""
slots[role] = {
"order_id": str(oid) if oid not in (None, "") else "",
"trigger_price": trig_f,
"trigger_display": f"{trig_f:g}",
"amount": row.get("amount"),
"type": row.get("type") or "",
}
if not slots["sl"] and not slots["tp"]:
return None
return slots
def sync_active_monitor_tpsl_prices(
conn,
symbol: str,
direction: str,
stop_loss: float,
take_profit: float,
*,
symbols_match: Callable[[str, str], bool],
) -> dict[str, Any]:
"""按 symbol+方向更新 active 下单监控的 stop_loss / take_profit。"""
sym = (symbol or "").strip()
side = (direction or "").strip().lower()
if not sym:
return {"ok": False, "msg": "symbol 不能为空"}
if side not in ("long", "short"):
return {"ok": False, "msg": "side 须为 long 或 short"}
try:
sl = float(stop_loss)
tp = float(take_profit)
except (TypeError, ValueError):
return {"ok": False, "msg": "stop_loss / take_profit 须为数字"}
if sl <= 0 or tp <= 0:
return {"ok": False, "msg": "止损、止盈须大于 0"}
rows = conn.execute(
"SELECT id, symbol, exchange_symbol, direction FROM order_monitors WHERE status='active'"
).fetchall()
updated_ids: list[int] = []
for row in rows:
r_sym = row["exchange_symbol"] if "exchange_symbol" in row.keys() else row["symbol"]
r_sym = r_sym or row["symbol"]
if not symbols_match(sym, r_sym or ""):
continue
r_dir = (row["direction"] or "").strip().lower()
if r_dir and r_dir != side:
continue
oid = int(row["id"])
conn.execute(
"UPDATE order_monitors SET stop_loss=?, take_profit=? WHERE id=? AND status='active'",
(sl, tp, oid),
)
updated_ids.append(oid)
return {
"ok": True,
"updated": len(updated_ids),
"order_monitor_ids": updated_ids,
"stop_loss": sl,
"take_profit": tp,
}