From eb975b0133f5e47b33213e2d62386aa58345b604 Mon Sep 17 00:00:00 2001 From: dekun Date: Sat, 4 Jul 2026 22:44:16 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BA=A4=E6=98=93=E5=AE=89=E5=85=A8?= =?UTF-8?q?=E5=AE=A1=E8=AE=A1=E4=BF=AE=E5=A4=8D=20=E2=80=94=20=E8=A1=A5?= =?UTF-8?q?=E5=81=BF=E5=B9=B3=E4=BB=93=E3=80=81=E4=B8=AD=E6=8E=A7=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E3=80=81=E6=BB=9A=E4=BB=93/=E8=B6=8B=E5=8A=BF?= =?UTF-8?q?=E9=98=B2=E6=8A=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Cursor --- crypto_monitor_binance/app.py | 125 +++++++++- crypto_monitor_gate/app.py | 223 +++++++++++------- crypto_monitor_okx/app.py | 122 +++++++++- lib/hub/hub_reconcile_flat_lib.py | 95 ++++++++ .../trigger_entry_key_monitor_lib.py | 28 +++ lib/strategy/strategy_roll_monitor_lib.py | 17 +- lib/strategy/strategy_trend_register.py | 85 +++++-- lib/trade/compensating_close_lib.py | 16 ++ manual_trading_hub/agent.py | 2 +- manual_trading_hub/exchange_orders.py | 9 +- manual_trading_hub/hub.py | 115 ++++++--- 11 files changed, 675 insertions(+), 162 deletions(-) create mode 100644 lib/hub/hub_reconcile_flat_lib.py create mode 100644 lib/trade/compensating_close_lib.py diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 7fc485c..31b40ca 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -130,6 +130,9 @@ from lib.key_monitor.trigger_entry_key_monitor_lib import ( TRIGGER_ENTRY_VALIDITY_HOURS, check_trigger_entry_intent_limit, count_pending_trigger_entries, + acquire_trigger_entry_exec_lock, + is_trigger_entry_in_flight_row, + release_trigger_entry_exec_lock, is_breakout_trigger_entry_key_monitor_type, is_trigger_entry_expired, is_trigger_entry_key_monitor_type, @@ -3469,6 +3472,40 @@ def ensure_markets_loaded(force=False): MARKETS_LOADED = True +def _abort_market_open_after_tpsl_failure(exchange_symbol, direction, order, planned_amount): + from lib.trade.compensating_close_lib import run_compensating_close + + def _close(): + ensure_markets_loaded() + try: + cancel_binance_futures_open_orders(exchange_symbol) + except Exception: + pass + live = get_live_position_contracts(exchange_symbol, direction) + amt = live if live is not None and live > 0 else _filled_amount_for_tpsl(order, planned_amount) + if amt is None or float(amt) <= 0: + return + side = "sell" if direction == "long" else "buy" + try: + amount = float(exchange.amount_to_precision(exchange_symbol, float(amt))) + except Exception: + amount = float(amt) + last_err = None + for params in _binance_market_close_param_candidates(direction): + try: + exchange.create_order(exchange_symbol, "market", side, amount, None, params) + return + except Exception as e: + last_err = e + if _is_binance_close_param_retryable(str(e)): + continue + raise + if last_err: + raise last_err + + run_compensating_close(_close, log_prefix="binance_compensating_close") + + def place_exchange_order(exchange_symbol, direction, amount, leverage, stop_loss=None, take_profit=None): ensure_markets_loaded() mm = "cross" if BINANCE_MARGIN_MODE in ("cross", "cross_margin") else "isolated" @@ -3487,8 +3524,10 @@ def place_exchange_order(exchange_symbol, direction, amount, leverage, stop_loss _binance_place_tp_sl_orders(exchange_symbol, direction, pos_amt, stop_loss, take_profit) order["tpsl_attached"] = True except RuntimeError: + _abort_market_open_after_tpsl_failure(exchange_symbol, direction, order, amount) raise except Exception as e: + _abort_market_open_after_tpsl_failure(exchange_symbol, direction, order, amount) raise RuntimeError(f"交易所未接受条件止盈/止损委托,已拒绝开仓:{str(e)}") from e return order @@ -4495,6 +4534,72 @@ def resolve_synced_flat_close(row, opened_at_str, opened_at_ms=None): ) +def _finalize_hub_flat_monitor_binance(conn, r, *, result, pnl_amount, closed_at, miss_reason): + opened_at = get_opened_at_value(r) + closed_at_dt = parse_dt_for_trading_day(closed_at) or app_now() + hold_seconds = calc_hold_seconds(opened_at, closed_at_dt) + session_date = r["session_date"] or get_trading_day(closed_at_dt) + update_session_capital(conn, session_date, pnl_amount) + insert_trade_record( + conn, + symbol=r["symbol"], + monitor_type=trade_record_monitor_type(conn, r), + trend_plan_id=trend_plan_id_from_monitor_row(r), + key_signal_type=order_row_key_signal_type(r), + direction=r["direction"], + trigger_price=r["trigger_price"], + stop_loss=r["stop_loss"], + initial_stop_loss=r["initial_stop_loss"] or r["stop_loss"], + take_profit=r["take_profit"], + margin_capital=r["margin_capital"], + leverage=r["leverage"], + pnl_amount=pnl_amount, + hold_seconds=hold_seconds, + trade_style=r["trade_style"], + risk_amount=r["risk_amount"], + planned_rr=calc_rr_ratio( + r["direction"], + r["trigger_price"], + r["initial_stop_loss"] or r["stop_loss"], + r["take_profit"], + ), + actual_rr=calc_actual_rr(pnl_amount, r["risk_amount"]), + result=result, + miss_reason=handoff_trade_miss_reason(miss_reason, r), + opened_at=opened_at, + closed_at=closed_at, + ) + conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (r["id"],)) + clear_key_sizing_snapshot_if_flat(conn, r["session_date"] or get_trading_day()) + + +def reconcile_hub_external_close(conn, symbol, direction): + from lib.hub.hub_reconcile_flat_lib import reconcile_hub_external_close_impl + from lib.hub.hub_symbol_lib import symbols_match + + global _RECONCILE_FLAT_STREAK + + return reconcile_hub_external_close_impl( + conn, + symbol, + direction, + exchange_configured=exchange_private_api_configured, + not_configured_msg="未配置 BINANCE_API_KEY / BINANCE_API_SECRET", + symbols_match=symbols_match, + get_opened_at_value=get_opened_at_value, + resolve_monitor_exchange_symbol=resolve_monitor_exchange_symbol, + get_live_position_contracts=get_live_position_contracts, + cancel_conditional_orders=cancel_binance_futures_open_orders, + resolve_synced_flat_close=resolve_synced_flat_close, + finalize_stopped_monitor=_finalize_hub_flat_monitor_binance, + sync_trade_records=None, + reconcile_flat_streak=_RECONCILE_FLAT_STREAK, + to_ms_with_fallback=_to_ms_with_fallback, + prefer_manual_resolve=False, + order_row_monitor_type=order_row_monitor_type, + ) + + def reconcile_external_closes(conn, days=None): global _RECONCILE_FLAT_STREAK if not exchange_private_api_configured(): @@ -5777,7 +5882,7 @@ def _market_open_for_trigger_entry( def _execute_trigger_entry_cross(conn, row): - """标记价触达计划入场:先删监控行防重复触发,再市价开仓。""" + """标记价触达计划入场:加锁防重复触发,成交成功后再删监控行。""" symbol = row["symbol"] direction = (row["direction"] or "long").lower() ex_sym = normalize_exchange_symbol(symbol) @@ -5788,7 +5893,8 @@ def _execute_trigger_entry_cross(conn, row): tc_en, tc_h, _ = time_close_settings_from_row(row) kid = int(row["id"]) - conn.execute("DELETE FROM key_monitors WHERE id=?", (kid,)) + if not acquire_trigger_entry_exec_lock(conn, kid): + return False, "触价开仓进行中" conn.commit() try: @@ -5806,6 +5912,8 @@ def _execute_trigger_entry_cross(conn, row): time_close_hours=tc_h, ) except Exception as e: + release_trigger_entry_exec_lock(conn, kid) + conn.commit() fail_msg = friendly_exchange_error(e) send_wechat_msg( f"# ❌ {symbol} 触价开仓异常\n" @@ -5817,6 +5925,8 @@ def _execute_trigger_entry_cross(conn, row): return False, fail_msg if ok and det: + conn.execute("DELETE FROM key_monitors WHERE id=?", (kid,)) + conn.commit() rr_txt = format_wechat_scalar_2dp(det.get("planned_rr_fill")) if det.get("planned_rr_fill") is not None else "-" msg = ( f"# ✅ {symbol} 触价开仓成交\n" @@ -5833,6 +5943,8 @@ def _execute_trigger_entry_cross(conn, row): send_wechat_msg(msg) insert_key_monitor_history(conn, row, 0, msg, TRIGGER_ENTRY_CLOSE_FILLED) return True, None + release_trigger_entry_exec_lock(conn, kid) + conn.commit() fail_msg = err or "触价触发后开仓失败" send_wechat_msg( f"# ❌ {symbol} 触价开仓失败\n" @@ -5860,6 +5972,8 @@ def check_trigger_entry_key_monitors(): sl = float(_sqlite_row_val(r, "fib_stop_loss") or 0) tp = float(_sqlite_row_val(r, "fib_take_profit") or 0) kid = int(r["id"]) + if is_trigger_entry_in_flight_row(r): + continue if entry <= 0 or sl <= 0 or tp <= 0: _finalize_key_monitor_one_shot(conn, r, "触价计划价位无效", "fib_plan_invalid") continue @@ -6388,7 +6502,7 @@ def check_order_monitors(): new_sl = round_price_to_exchange(ex_sym, new_sl) tp_ex = float(take_profit or 0) ok_live, _live_reason = ensure_exchange_live_ready() - synced_ex = not ok_live + synced_ex = False if ok_live and tp_ex > 0: try: replace_active_monitor_tpsl_on_exchange(r, new_sl, tp_ex) @@ -6818,8 +6932,8 @@ def background_task(): from lib.strategy.strategy_trend_register import check_trend_pullback_plans check_trend_pullback_plans(cfg) - except: - pass + except Exception as e: + print(f"[monitor_loop] {e}", flush=True) time.sleep(MONITOR_POLL_SECONDS) @@ -9667,6 +9781,7 @@ try: ohlcv_fn=_hub_fetch_ohlcv, volume_rank_fn=_hub_fetch_volume_rank, market_fn=_hub_fetch_market, + reconcile_hub_flat_fn=reconcile_hub_external_close, risk_status_fn=hub_account_risk_status, user_close_fn=hub_user_initiated_close, render_main_page_fn=render_main_page, diff --git a/crypto_monitor_gate/app.py b/crypto_monitor_gate/app.py index 1ed39c8..52675ab 100644 --- a/crypto_monitor_gate/app.py +++ b/crypto_monitor_gate/app.py @@ -131,6 +131,9 @@ from lib.key_monitor.trigger_entry_key_monitor_lib import ( TRIGGER_ENTRY_VALIDITY_HOURS, check_trigger_entry_intent_limit, count_pending_trigger_entries, + acquire_trigger_entry_exec_lock, + is_trigger_entry_in_flight_row, + release_trigger_entry_exec_lock, is_breakout_trigger_entry_key_monitor_type, is_trigger_entry_expired, is_trigger_entry_key_monitor_type, @@ -3135,7 +3138,7 @@ def _gate_place_tp_sl_orders_position_price_orders(exchange_symbol, direction, s try: exchange.privateFuturesPostSettlePriceOrders(_payload(tp_s, tp_rule)) except Exception: - cancel_gate_swap_trigger_orders(exchange_symbol) + # 保留已挂止损,仅放弃本次 TP;上层可补偿平仓或重试 raise return except Exception as e: @@ -3252,6 +3255,27 @@ def ensure_markets_loaded(force=False): MARKETS_LOADED = True +def _abort_market_open_after_tpsl_failure(exchange_symbol, direction, order, planned_amount): + """TP/SL 挂失败时市价平掉刚开的仓并撤残留条件单。""" + from lib.trade.compensating_close_lib import run_compensating_close + + def _close(): + ensure_markets_loaded() + try: + cancel_gate_swap_trigger_orders(exchange_symbol) + except Exception: + pass + live = get_live_position_contracts(exchange_symbol, direction) + amt = live if live is not None and live > 0 else _gate_contracts_amount_for_tpsl(order, planned_amount) + if amt is None or float(amt) <= 0: + return + side = "sell" if direction == "long" else "buy" + params = build_gate_order_params(direction, reduce_only=True) + exchange.create_order(exchange_symbol, "market", side, float(amt), None, params) + + run_compensating_close(_close, log_prefix="gate_compensating_close") + + def place_exchange_order(exchange_symbol, direction, amount, leverage, stop_loss=None, take_profit=None): ensure_markets_loaded() exchange.set_leverage(leverage, exchange_symbol) @@ -3265,22 +3289,48 @@ def place_exchange_order(exchange_symbol, direction, amount, leverage, stop_loss _gate_place_tp_sl_orders(exchange_symbol, direction, contracts_amt, stop_loss, take_profit) order["tpsl_attached"] = True except RuntimeError: + _abort_market_open_after_tpsl_failure(exchange_symbol, direction, order, amount) raise except Exception as e: + _abort_market_open_after_tpsl_failure(exchange_symbol, direction, order, amount) raise RuntimeError(f"交易所未接受条件止盈/止损委托,已拒绝开仓:{str(e)}") from e return order def close_exchange_order(order_row): + """ + 市价全平。数量优先取交易所当前持仓张数,避免仅用入库 order_amount 导致平不干净。 + """ ensure_markets_loaded() exchange_symbol = order_row["exchange_symbol"] or normalize_exchange_symbol(order_row["symbol"]) - amount = float(order_row["order_amount"] or 0) - if amount <= 0: - raise ValueError("平仓失败:缺少有效下单数量") direction = order_row["direction"] + db_amt = float(order_row["order_amount"] or 0) side = "sell" if direction == "long" else "buy" - params = build_gate_order_params(direction, reduce_only=True) - return exchange.create_order(exchange_symbol, "market", side, amount, None, params) + last_resp = None + for _ in range(3): + live = get_live_position_contracts(exchange_symbol, direction) + if live is not None and live > 0: + raw_amt = live + else: + raw_amt = db_amt + if raw_amt <= 0: + if last_resp is not None: + return last_resp + raise ValueError("平仓失败:缺少有效下单数量") + try: + amount = float(exchange.amount_to_precision(exchange_symbol, raw_amt)) + except Exception: + amount = float(raw_amt) + if amount <= 0: + if last_resp is not None: + return last_resp + raise ValueError("平仓失败:数量经精度舍入后为 0") + params = build_gate_order_params(direction, reduce_only=True) + last_resp = exchange.create_order(exchange_symbol, "market", side, amount, None, params) + live_after = get_live_position_contracts(exchange_symbol, direction) + if live_after is None or live_after <= 0: + return last_resp + return last_resp def _gate_swap_trigger_order_params(): @@ -4113,89 +4163,71 @@ def resolve_synced_flat_close(row, opened_at_str, opened_at_ms=None, *, prefer_m ) +def _finalize_hub_flat_monitor(conn, r, *, result, pnl_amount, closed_at, miss_reason): + opened_at = get_opened_at_value(r) + closed_at_dt = parse_dt_for_trading_day(closed_at) or app_now() + hold_seconds = calc_hold_seconds(opened_at, closed_at_dt) + session_date = r["session_date"] or get_trading_day(closed_at_dt) + update_session_capital(conn, session_date, pnl_amount) + insert_trade_record( + conn, + symbol=r["symbol"], + monitor_type=trade_record_monitor_type(conn, r), + trend_plan_id=trend_plan_id_from_monitor_row(r), + key_signal_type=order_row_key_signal_type(r), + direction=r["direction"], + trigger_price=r["trigger_price"], + stop_loss=r["stop_loss"], + initial_stop_loss=r["initial_stop_loss"] or r["stop_loss"], + take_profit=r["take_profit"], + margin_capital=margin_capital_for_trade_record(r), + leverage=r["leverage"], + pnl_amount=pnl_amount, + hold_seconds=hold_seconds, + trade_style=r["trade_style"], + risk_amount=r["risk_amount"], + planned_rr=calc_rr_ratio( + r["direction"], + r["trigger_price"], + r["initial_stop_loss"] or r["stop_loss"], + r["take_profit"], + ), + actual_rr=calc_actual_rr(pnl_amount, r["risk_amount"]), + result=result, + miss_reason=handoff_trade_miss_reason(miss_reason, r), + opened_at=opened_at, + closed_at=closed_at, + ) + conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (r["id"],)) + clear_key_sizing_snapshot_if_flat(conn, r["session_date"] or get_trading_day()) + + def reconcile_hub_external_close(conn, symbol, direction): """中控市价全平后:立即同步匹配 order_monitor,并读 Gate 平仓历史。""" - if not exchange_private_api_configured(): - return {"ok": False, "msg": "未配置 GATE_API_KEY / GATE_API_SECRET", "synced": 0} - from lib.exchange.gate_position_history_lib import unified_symbol_for_match + from lib.hub.hub_reconcile_flat_lib import reconcile_hub_external_close_impl + from lib.hub.hub_symbol_lib import symbols_match - sym_u = unified_symbol_for_match(symbol) - dir_l = (direction or "").strip().lower() - if dir_l not in ("long", "short"): - return {"ok": False, "msg": "side 须为 long 或 short", "synced": 0} - synced = 0 - rows = conn.execute( - "SELECT * FROM order_monitors WHERE status IN ('active', 'error')" - ).fetchall() - for r in rows: - if unified_symbol_for_match(r["symbol"]) != sym_u: - continue - if (r["direction"] or "").strip().lower() != dir_l: - continue - oid = int(r["id"]) - if r["status"] == "error": - opened_at_chk = get_opened_at_value(r) - existing = conn.execute( - "SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1", - (r["symbol"], opened_at_chk, order_row_monitor_type(r)), - ).fetchone() - if existing: - conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,)) - synced += 1 - continue - exchange_symbol = resolve_monitor_exchange_symbol(r) - live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) - if live_contracts is None: - continue - if live_contracts > 0: - time.sleep(0.6) - live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) - if live_contracts is None or live_contracts > 0: - continue - global _RECONCILE_FLAT_STREAK - _RECONCILE_FLAT_STREAK.pop(oid, None) - cancel_gate_swap_trigger_orders(exchange_symbol) - opened_at = get_opened_at_value(r) - opened_at_ms = _to_ms_with_fallback(r["opened_at_ms"] if "opened_at_ms" in r.keys() else None, opened_at) - result, pnl_amount, closed_at, miss_reason = resolve_synced_flat_close( - r, opened_at, opened_at_ms=opened_at_ms, prefer_manual=True - ) - closed_at_dt = parse_dt_for_trading_day(closed_at) or app_now() - hold_seconds = calc_hold_seconds(opened_at, closed_at_dt) - session_date = r["session_date"] or get_trading_day(closed_at_dt) - update_session_capital(conn, session_date, pnl_amount) - insert_trade_record( - conn, - symbol=r["symbol"], - monitor_type=trade_record_monitor_type(conn, r), - trend_plan_id=trend_plan_id_from_monitor_row(r), - key_signal_type=order_row_key_signal_type(r), - direction=r["direction"], - trigger_price=r["trigger_price"], - stop_loss=r["stop_loss"], - initial_stop_loss=r["initial_stop_loss"] or r["stop_loss"], - take_profit=r["take_profit"], - margin_capital=margin_capital_for_trade_record(r), - leverage=r["leverage"], - pnl_amount=pnl_amount, - hold_seconds=hold_seconds, - trade_style=r["trade_style"], - risk_amount=r["risk_amount"], - planned_rr=calc_rr_ratio(r["direction"], r["trigger_price"], r["initial_stop_loss"] or r["stop_loss"], r["take_profit"]), - actual_rr=calc_actual_rr(pnl_amount, r["risk_amount"]), - result=result, - miss_reason=handoff_trade_miss_reason(miss_reason, r), - opened_at=opened_at, - closed_at=closed_at, - ) - conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (r["id"],)) - clear_key_sizing_snapshot_if_flat(conn, r["session_date"] or get_trading_day()) - synced += 1 - try: - sync_trade_records_from_exchange(conn, force=True) - except Exception: - pass - return {"ok": True, "synced": synced} + global _RECONCILE_FLAT_STREAK + + return reconcile_hub_external_close_impl( + conn, + symbol, + direction, + exchange_configured=exchange_private_api_configured, + not_configured_msg="未配置 GATE_API_KEY / GATE_API_SECRET", + symbols_match=symbols_match, + get_opened_at_value=get_opened_at_value, + resolve_monitor_exchange_symbol=resolve_monitor_exchange_symbol, + get_live_position_contracts=get_live_position_contracts, + cancel_conditional_orders=cancel_gate_swap_trigger_orders, + resolve_synced_flat_close=resolve_synced_flat_close, + finalize_stopped_monitor=_finalize_hub_flat_monitor, + sync_trade_records=sync_trade_records_from_exchange, + reconcile_flat_streak=_RECONCILE_FLAT_STREAK, + to_ms_with_fallback=_to_ms_with_fallback, + prefer_manual_resolve=True, + order_row_monitor_type=order_row_monitor_type, + ) def reconcile_external_closes(conn, days=None): @@ -5491,7 +5523,7 @@ def _market_open_for_trigger_entry( def _execute_trigger_entry_cross(conn, row): - """标记价触达计划入场:先删监控行防重复触发,再市价开仓。""" + """标记价触达计划入场:加锁防重复触发,成交成功后再删监控行。""" symbol = row["symbol"] direction = (row["direction"] or "long").lower() ex_sym = normalize_exchange_symbol(symbol) @@ -5502,7 +5534,8 @@ def _execute_trigger_entry_cross(conn, row): tc_en, tc_h, _ = time_close_settings_from_row(row) kid = int(row["id"]) - conn.execute("DELETE FROM key_monitors WHERE id=?", (kid,)) + if not acquire_trigger_entry_exec_lock(conn, kid): + return False, "触价开仓进行中" conn.commit() try: @@ -5520,6 +5553,8 @@ def _execute_trigger_entry_cross(conn, row): time_close_hours=tc_h, ) except Exception as e: + release_trigger_entry_exec_lock(conn, kid) + conn.commit() fail_msg = friendly_exchange_error(e) send_wechat_msg( f"# ❌ {symbol} 触价开仓异常\n" @@ -5531,6 +5566,8 @@ def _execute_trigger_entry_cross(conn, row): return False, fail_msg if ok and det: + conn.execute("DELETE FROM key_monitors WHERE id=?", (kid,)) + conn.commit() rr_txt = format_wechat_scalar_2dp(det.get("planned_rr_fill")) if det.get("planned_rr_fill") is not None else "-" msg = ( f"# ✅ {symbol} 触价开仓成交\n" @@ -5547,6 +5584,8 @@ def _execute_trigger_entry_cross(conn, row): send_wechat_msg(msg) insert_key_monitor_history(conn, row, 0, msg, TRIGGER_ENTRY_CLOSE_FILLED) return True, None + release_trigger_entry_exec_lock(conn, kid) + conn.commit() fail_msg = err or "触价触发后开仓失败" send_wechat_msg( f"# ❌ {symbol} 触价开仓失败\n" @@ -5574,6 +5613,8 @@ def check_trigger_entry_key_monitors(): sl = float(_sqlite_row_val(r, "fib_stop_loss") or 0) tp = float(_sqlite_row_val(r, "fib_take_profit") or 0) kid = int(r["id"]) + if is_trigger_entry_in_flight_row(r): + continue if entry <= 0 or sl <= 0 or tp <= 0: _finalize_key_monitor_one_shot(conn, r, "触价计划价位无效", "fib_plan_invalid") continue @@ -6117,7 +6158,7 @@ def check_order_monitors(): new_sl = round_price_to_exchange(ex_sym, new_sl) tp_ex = float(take_profit or 0) ok_live, _live_reason = ensure_exchange_live_ready() - synced_ex = not ok_live + synced_ex = False if ok_live and tp_ex > 0: try: replace_active_monitor_tpsl_on_exchange(r, new_sl, tp_ex) @@ -6526,8 +6567,8 @@ def background_task(): from lib.strategy.strategy_trend_register import check_trend_pullback_plans check_trend_pullback_plans(cfg) - except: - pass + except Exception as e: + print(f"[monitor_loop] {e}", flush=True) time.sleep(MONITOR_POLL_SECONDS) diff --git a/crypto_monitor_okx/app.py b/crypto_monitor_okx/app.py index e0be0bd..99f6214 100644 --- a/crypto_monitor_okx/app.py +++ b/crypto_monitor_okx/app.py @@ -131,6 +131,9 @@ from lib.key_monitor.trigger_entry_key_monitor_lib import ( TRIGGER_ENTRY_VALIDITY_HOURS, check_trigger_entry_intent_limit, count_pending_trigger_entries, + acquire_trigger_entry_exec_lock, + is_trigger_entry_in_flight_row, + release_trigger_entry_exec_lock, is_breakout_trigger_entry_key_monitor_type, is_trigger_entry_expired, is_trigger_entry_key_monitor_type, @@ -2754,15 +2757,39 @@ def place_exchange_order(exchange_symbol, direction, amount, leverage, stop_loss def close_exchange_order(order_row): + """ + 市价全平。数量优先取交易所当前持仓张数,避免仅用入库 order_amount 导致平不干净。 + """ ensure_markets_loaded() exchange_symbol = order_row["exchange_symbol"] or normalize_okx_symbol(order_row["symbol"]) - amount = float(order_row["order_amount"] or 0) - if amount <= 0: - raise ValueError("平仓失败:缺少有效下单数量") direction = order_row["direction"] + db_amt = float(order_row["order_amount"] or 0) side = "sell" if direction == "long" else "buy" - params = build_okx_order_params(direction, reduce_only=True) - return exchange.create_order(exchange_symbol, "market", side, amount, None, params) + last_resp = None + for _ in range(3): + live = get_live_position_contracts(exchange_symbol, direction) + if live is not None and live > 0: + raw_amt = live + else: + raw_amt = db_amt + if raw_amt <= 0: + if last_resp is not None: + return last_resp + raise ValueError("平仓失败:缺少有效下单数量") + try: + amount = float(exchange.amount_to_precision(exchange_symbol, raw_amt)) + except Exception: + amount = float(raw_amt) + if amount <= 0: + if last_resp is not None: + return last_resp + raise ValueError("平仓失败:数量经精度舍入后为 0") + params = build_okx_order_params(direction, reduce_only=True) + last_resp = exchange.create_order(exchange_symbol, "market", side, amount, None, params) + live_after = get_live_position_contracts(exchange_symbol, direction) + if live_after is None or live_after <= 0: + return last_resp + return last_resp def cancel_okx_swap_open_orders(exchange_symbol): @@ -3557,6 +3584,71 @@ def resolve_synced_flat_close(row, opened_at_str, opened_at_ms=None): ) +def _finalize_hub_flat_monitor_okx(conn, r, *, result, pnl_amount, closed_at, miss_reason): + opened_at = get_opened_at_value(r) + closed_at_dt = parse_dt_for_trading_day(closed_at) or app_now() + hold_seconds = calc_hold_seconds(opened_at, closed_at_dt) + session_date = r["session_date"] or get_trading_day(closed_at_dt) + update_session_capital(conn, session_date, pnl_amount) + insert_trade_record( + conn, + symbol=r["symbol"], + monitor_type=trade_record_monitor_type(conn, r), + trend_plan_id=trend_plan_id_from_monitor_row(r), + key_signal_type=order_row_key_signal_type(r), + direction=r["direction"], + trigger_price=r["trigger_price"], + stop_loss=r["stop_loss"], + initial_stop_loss=r["initial_stop_loss"] or r["stop_loss"], + take_profit=r["take_profit"], + margin_capital=r["margin_capital"], + leverage=r["leverage"], + pnl_amount=pnl_amount, + hold_seconds=hold_seconds, + trade_style=r["trade_style"], + risk_amount=r["risk_amount"], + planned_rr=calc_rr_ratio( + r["direction"], + r["trigger_price"], + r["initial_stop_loss"] or r["stop_loss"], + r["take_profit"], + ), + actual_rr=calc_actual_rr(pnl_amount, r["risk_amount"]), + result=result, + miss_reason=handoff_trade_miss_reason(miss_reason, r), + opened_at=opened_at, + closed_at=closed_at, + ) + conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (r["id"],)) + + +def reconcile_hub_external_close(conn, symbol, direction): + from lib.hub.hub_reconcile_flat_lib import reconcile_hub_external_close_impl + from lib.hub.hub_symbol_lib import symbols_match + + global _RECONCILE_FLAT_STREAK + + return reconcile_hub_external_close_impl( + conn, + symbol, + direction, + exchange_configured=exchange_private_api_configured, + not_configured_msg="未配置 OKX_API_KEY / OKX_API_SECRET", + symbols_match=symbols_match, + get_opened_at_value=get_opened_at_value, + resolve_monitor_exchange_symbol=resolve_monitor_exchange_symbol, + get_live_position_contracts=get_live_position_contracts, + cancel_conditional_orders=cancel_okx_swap_open_orders, + resolve_synced_flat_close=resolve_synced_flat_close, + finalize_stopped_monitor=_finalize_hub_flat_monitor_okx, + sync_trade_records=sync_trade_records_from_exchange, + reconcile_flat_streak=_RECONCILE_FLAT_STREAK, + to_ms_with_fallback=_to_ms_with_fallback, + prefer_manual_resolve=False, + order_row_monitor_type=order_row_monitor_type, + ) + + def reconcile_external_closes(conn, days=None): global _RECONCILE_FLAT_STREAK if not exchange_private_api_configured(): @@ -5006,7 +5098,7 @@ def _market_open_for_trigger_entry( def _execute_trigger_entry_cross(conn, row): - """标记价触达计划入场:先删监控行防重复触发,再市价开仓。""" + """标记价触达计划入场:加锁防重复触发,成交成功后再删监控行。""" symbol = row["symbol"] direction = (row["direction"] or "long").lower() ex_sym = normalize_exchange_symbol(symbol) @@ -5017,7 +5109,8 @@ def _execute_trigger_entry_cross(conn, row): tc_en, tc_h, _ = time_close_settings_from_row(row) kid = int(row["id"]) - conn.execute("DELETE FROM key_monitors WHERE id=?", (kid,)) + if not acquire_trigger_entry_exec_lock(conn, kid): + return False, "触价开仓进行中" conn.commit() try: @@ -5035,6 +5128,8 @@ def _execute_trigger_entry_cross(conn, row): time_close_hours=tc_h, ) except Exception as e: + release_trigger_entry_exec_lock(conn, kid) + conn.commit() fail_msg = friendly_exchange_error(e) send_wechat_msg( f"# ❌ {symbol} 触价开仓异常\n" @@ -5046,6 +5141,8 @@ def _execute_trigger_entry_cross(conn, row): return False, fail_msg if ok and det: + conn.execute("DELETE FROM key_monitors WHERE id=?", (kid,)) + conn.commit() rr_txt = format_wechat_scalar_2dp(det.get("planned_rr_fill")) if det.get("planned_rr_fill") is not None else "-" msg = ( f"# ✅ {symbol} 触价开仓成交\n" @@ -5062,6 +5159,8 @@ def _execute_trigger_entry_cross(conn, row): send_wechat_msg(msg) insert_key_monitor_history(conn, row, 0, msg, TRIGGER_ENTRY_CLOSE_FILLED) return True, None + release_trigger_entry_exec_lock(conn, kid) + conn.commit() fail_msg = err or "触价触发后开仓失败" send_wechat_msg( f"# ❌ {symbol} 触价开仓失败\n" @@ -5089,6 +5188,8 @@ def check_trigger_entry_key_monitors(): sl = float(_sqlite_row_val(r, "fib_stop_loss") or 0) tp = float(_sqlite_row_val(r, "fib_take_profit") or 0) kid = int(r["id"]) + if is_trigger_entry_in_flight_row(r): + continue if entry <= 0 or sl <= 0 or tp <= 0: _finalize_key_monitor_one_shot(conn, r, "触价计划价位无效", "fib_plan_invalid") continue @@ -5856,7 +5957,7 @@ def check_order_monitors(): new_sl = round_price_to_exchange(ex_sym, new_sl) tp_ex = float(take_profit or 0) ok_live, _live_reason = ensure_okx_live_ready() - synced_ex = not ok_live + synced_ex = False last_ex_sync = float(_BREAKEVEN_LAST_EX_SYNC.get(pid, 0)) interval_ok = ( time.time() - last_ex_sync @@ -6265,8 +6366,8 @@ def background_task(): from lib.strategy.strategy_trend_register import check_trend_pullback_plans check_trend_pullback_plans(cfg) - except: - pass + except Exception as e: + print(f"[monitor_loop] {e}", flush=True) time.sleep(MONITOR_POLL_SECONDS) @@ -9051,6 +9152,7 @@ try: ohlcv_fn=_hub_fetch_ohlcv, volume_rank_fn=_hub_fetch_volume_rank, market_fn=_hub_fetch_market, + reconcile_hub_flat_fn=reconcile_hub_external_close, risk_status_fn=hub_account_risk_status, user_close_fn=hub_user_initiated_close, render_main_page_fn=render_main_page, diff --git a/lib/hub/hub_reconcile_flat_lib.py b/lib/hub/hub_reconcile_flat_lib.py new file mode 100644 index 0000000..e604789 --- /dev/null +++ b/lib/hub/hub_reconcile_flat_lib.py @@ -0,0 +1,95 @@ +"""Hub 中控市价全平后立即同步 order_monitors(三所共用)。""" +from __future__ import annotations + +import time +from typing import Any, Callable + + +def reconcile_hub_external_close_impl( + conn, + symbol: str, + direction: str, + *, + exchange_configured: Callable[[], bool], + not_configured_msg: str, + symbols_match: Callable[[str, str], bool], + get_opened_at_value: Callable[[Any], str], + resolve_monitor_exchange_symbol: Callable[[Any], str], + get_live_position_contracts: Callable[[str, str], float | None], + cancel_conditional_orders: Callable[[str], None], + resolve_synced_flat_close: Callable[..., tuple], + finalize_stopped_monitor: Callable[..., None], + sync_trade_records: Callable[..., None] | None = None, + reconcile_flat_streak: dict | None = None, + to_ms_with_fallback: Callable[..., int | None] | None = None, + prefer_manual_resolve: bool = False, + order_row_monitor_type: Callable[[Any], str] | None = None, +) -> dict[str, Any]: + if not exchange_configured(): + return {"ok": False, "msg": not_configured_msg, "synced": 0} + sym_req = (symbol or "").strip() + dir_l = (direction or "").strip().lower() + if dir_l not in ("long", "short"): + return {"ok": False, "msg": "side 须为 long 或 short", "synced": 0} + synced = 0 + streak = reconcile_flat_streak if reconcile_flat_streak is not None else {} + rows = conn.execute( + "SELECT * FROM order_monitors WHERE status IN ('active', 'error')" + ).fetchall() + for r in rows: + if not symbols_match(str(r["symbol"] or ""), sym_req): + continue + if (r["direction"] or "").strip().lower() != dir_l: + continue + oid = int(r["id"]) + if r["status"] == "error": + opened_at_chk = get_opened_at_value(r) + mtype = order_row_monitor_type(r) if order_row_monitor_type else r["monitor_type"] + existing = conn.execute( + "SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1", + (r["symbol"], opened_at_chk, mtype), + ).fetchone() + if existing: + conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,)) + synced += 1 + continue + exchange_symbol = resolve_monitor_exchange_symbol(r) + live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) + if live_contracts is None: + continue + if live_contracts > 0: + time.sleep(0.6) + live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) + if live_contracts is None or live_contracts > 0: + continue + streak.pop(oid, None) + cancel_conditional_orders(exchange_symbol) + opened_at = get_opened_at_value(r) + opened_at_ms = None + if to_ms_with_fallback is not None: + keys = r.keys() if hasattr(r, "keys") else () + opened_at_ms = to_ms_with_fallback( + r["opened_at_ms"] if "opened_at_ms" in keys else None, + opened_at, + ) + resolve_kw = {"opened_at_ms": opened_at_ms} + if prefer_manual_resolve: + resolve_kw["prefer_manual"] = True + result, pnl_amount, closed_at, miss_reason = resolve_synced_flat_close( + r, opened_at, **resolve_kw + ) + finalize_stopped_monitor( + conn, + r, + result=result, + pnl_amount=pnl_amount, + closed_at=closed_at, + miss_reason=miss_reason, + ) + synced += 1 + if sync_trade_records is not None: + try: + sync_trade_records(conn, force=True) + except Exception: + pass + return {"ok": True, "synced": synced} diff --git a/lib/key_monitor/trigger_entry_key_monitor_lib.py b/lib/key_monitor/trigger_entry_key_monitor_lib.py index cffe72f..5e7d8d2 100644 --- a/lib/key_monitor/trigger_entry_key_monitor_lib.py +++ b/lib/key_monitor/trigger_entry_key_monitor_lib.py @@ -37,6 +37,34 @@ KEY_ENTRY_REASON_CALLBACK = "关键位回调触价开仓" KEY_ENTRY_REASON_BREAKOUT = "关键位突破触价开仓" KEY_ENTRY_REASON_TRIGGER_LEGACY = "关键位触价开仓" +TRIGGER_ENTRY_IN_FLIGHT_OID = "__trigger_entry_in_flight__" + + +def is_trigger_entry_in_flight_row(row: Any) -> bool: + if row is None: + return False + try: + v = row["fib_limit_order_id"] + except (KeyError, IndexError, TypeError): + v = getattr(row, "fib_limit_order_id", None) + return (v or "").strip() == TRIGGER_ENTRY_IN_FLIGHT_OID + + +def acquire_trigger_entry_exec_lock(conn: Any, monitor_id: int) -> bool: + cur = conn.execute( + "UPDATE key_monitors SET fib_limit_order_id=? WHERE id=? " + "AND (fib_limit_order_id IS NULL OR fib_limit_order_id='')", + (TRIGGER_ENTRY_IN_FLIGHT_OID, int(monitor_id)), + ) + return int(cur.rowcount or 0) == 1 + + +def release_trigger_entry_exec_lock(conn: Any, monitor_id: int) -> None: + conn.execute( + "UPDATE key_monitors SET fib_limit_order_id=NULL WHERE id=? AND fib_limit_order_id=?", + (int(monitor_id), TRIGGER_ENTRY_IN_FLIGHT_OID), + ) + def normalize_trigger_entry_monitor_type(monitor_type: Optional[str]) -> str: mt = (monitor_type or "").strip() diff --git a/lib/strategy/strategy_roll_monitor_lib.py b/lib/strategy/strategy_roll_monitor_lib.py index 64107a2..28a6efd 100644 --- a/lib/strategy/strategy_roll_monitor_lib.py +++ b/lib/strategy/strategy_roll_monitor_lib.py @@ -40,7 +40,8 @@ def check_roll_monitors(cfg: dict[str, Any]) -> None: _reconcile_roll_groups(conn, cfg) _check_pending_roll_legs(conn, cfg) conn.commit() - except Exception: + except Exception as e: + print(f"[roll_monitor] {e}", flush=True) try: conn.rollback() except Exception: @@ -408,7 +409,19 @@ def _execute_pending_roll_leg( return oid = str(order.get("id") or "") if isinstance(order, dict) else "" - cfg["replace_tpsl"](ex_sym, direction, sl, tp0, mon) + try: + cfg["replace_tpsl"](ex_sym, direction, sl, tp0, mon) + except Exception as tpsl_err: + fe = cfg.get("friendly_error") + msg = fe(tpsl_err) if callable(fe) else str(tpsl_err) + conn.execute( + """UPDATE roll_legs SET status='error', exchange_order_id=?, fill_price=?, amount=? + WHERE id=? AND status='pending'""", + (oid, fill, float(amount), leg_id), + ) + _notify_roll_fail(cfg, group, leg, mark, f"加仓成交但止盈止损更新失败: {msg}") + return + conn.execute( """UPDATE roll_legs SET status='filled', fill_price=?, amount=?, exchange_order_id=?, new_stop_loss=? WHERE id=? AND status='pending'""", diff --git a/lib/strategy/strategy_trend_register.py b/lib/strategy/strategy_trend_register.py index 1dfb9c0..30b74f2 100644 --- a/lib/strategy/strategy_trend_register.py +++ b/lib/strategy/strategy_trend_register.py @@ -244,7 +244,7 @@ def _row(cfg, row) -> dict: return cfg["row_to_dict"](row) -def precheck_trend_start(cfg: dict, conn) -> tuple[bool, str]: +def precheck_trend_start(cfg: dict, conn, *, symbol: str = "", direction: str = "long") -> tuple[bool, str]: m = _m(cfg) mode = getattr(m, "POSITION_SIZING_MODE", None) or "risk" try: @@ -255,9 +255,41 @@ def precheck_trend_start(cfg: dict, conn) -> tuple[bool, str]: return False, src_msg except Exception: pass - now = m.app_now() - if not m.trading_day_reset_allows_new_open(now): - return False, f"北京时间 {cfg['reset_hour']}:00 前不允许持仓" + sym = (symbol or "").strip() + dir_l = (direction or "long").strip().lower() + if sym and dir_l in ("long", "short") and hasattr(m, "precheck_risk"): + ok_risk, risk_msg = m.precheck_risk(conn, sym, dir_l) + if not ok_risk: + return False, risk_msg + else: + now = m.app_now() + if not m.trading_day_reset_allows_new_open(now): + return False, f"北京时间 {cfg['reset_hour']}:00 前不允许持仓" + from lib.trade.account_risk_lib import account_risk_blocks_trading, position_limit_reached + + ok_risk, risk_reason = account_risk_blocks_trading( + conn, + trading_day=m.get_trading_day(now), + now=now, + fmt_local_ms=getattr(m, "ms_to_app_local_str", lambda _x: ""), + ) + if not ok_risk: + return False, risk_reason + reached, active_count, mx = position_limit_reached( + conn, max_active_positions=cfg["max_active_positions"] + ) + if reached: + return False, f"已达最大持仓数({active_count}/{mx})" + from lib.trade.daily_open_limit_lib import check_daily_open_hard_limit + + ok_daily, daily_reason, _opens = check_daily_open_hard_limit( + conn, + m.get_trading_day(now), + getattr(m, "DAILY_OPEN_HARD_LIMIT", 0), + cfg["reset_hour"], + ) + if not ok_daily: + return False, daily_reason active = m.get_active_position_count(conn) if active >= cfg["max_active_positions"]: return ( @@ -1604,22 +1636,27 @@ def register_trend_routes(app: Flask, cfg: dict) -> None: def preview_trend_pullback(): conn = get_db() init_strategy_tables(conn) - okp, msg = precheck_trend_start(cfg, conn) - if not okp: - conn.close() - flash(msg) - return _redirect_trend() m = _m(cfg) - ok_live, reason = m.ensure_exchange_live_ready() - if not ok_live: - conn.close() - flash(reason) - return _redirect_trend() payload, err = parse_trend_plan(cfg, request.form) if err: conn.close() flash(err) return _redirect_trend() + okp, msg = precheck_trend_start( + cfg, + conn, + symbol=str(payload.get("symbol") or ""), + direction=str(payload.get("direction") or "long"), + ) + if not okp: + conn.close() + flash(msg) + return _redirect_trend() + ok_live, reason = m.ensure_exchange_live_ready() + if not ok_live: + conn.close() + flash(reason) + return _redirect_trend() pid = str(uuid.uuid4()) exp_ms = int(time.time() * 1000) + cfg["preview_ttl"] * 1000 created = m.app_now_str() @@ -1678,7 +1715,12 @@ def register_trend_routes(app: Flask, cfg: dict) -> None: conn.close() flash("预览已过期或不存在,请重新生成预览") return _redirect_trend() - okp, msg = precheck_trend_start(cfg, conn) + okp, msg = precheck_trend_start( + cfg, + conn, + symbol=str(pr["symbol"] or ""), + direction=str(pr["direction"] or "long"), + ) if not okp: conn.close() flash(msg) @@ -1718,7 +1760,18 @@ def register_trend_routes(app: Flask, cfg: dict) -> None: exchange_symbol, direction, first_amt, leverage, stop_loss=None, take_profit=None ) fill1 = m.resolve_order_entry_price(o1, exchange_symbol, live_price) - trend_refresh_stop_only(cfg, exchange_symbol, direction, stop_loss) + try: + trend_refresh_stop_only(cfg, exchange_symbol, direction, stop_loss) + except Exception as sl_err: + from lib.strategy.strategy_trend_exchange import cancel_symbol_orders, trend_market_close + + try: + pos_qty = m.get_live_position_contracts(exchange_symbol, direction) or first_amt + trend_market_close(cfg, exchange_symbol, direction, float(pos_qty), leverage) + cancel_symbol_orders(cfg, exchange_symbol) + except Exception as close_err: + print(f"[trend_start] compensating close failed: {close_err}", flush=True) + raise sl_err except Exception as e: conn.close() fe = getattr(m, "friendly_exchange_error", lambda x, **k: str(x)) diff --git a/lib/trade/compensating_close_lib.py b/lib/trade/compensating_close_lib.py new file mode 100644 index 0000000..2798b66 --- /dev/null +++ b/lib/trade/compensating_close_lib.py @@ -0,0 +1,16 @@ +"""开仓后挂 TP/SL 失败时的补偿平仓(避免裸仓)。""" +from __future__ import annotations + +from typing import Callable + + +def log_compensating_close_error(prefix: str, exc: BaseException) -> None: + print(f"[{prefix}] {exc}", flush=True) + + +def run_compensating_close(close_fn: Callable[[], None], *, log_prefix: str = "compensating_close") -> None: + """执行补偿平仓;二次失败只打日志,不掩盖原始异常。""" + try: + close_fn() + except Exception as e: + log_compensating_close_error(log_prefix, e) diff --git a/manual_trading_hub/agent.py b/manual_trading_hub/agent.py index 1e37dce..8a57f1c 100644 --- a/manual_trading_hub/agent.py +++ b/manual_trading_hub/agent.py @@ -868,7 +868,7 @@ def emergency_close_position( for p in raw: if not isinstance(p, dict): continue - if (p.get("symbol") or "").strip() != sym: + if not symbols_match(sym, (p.get("symbol") or "").strip()): continue c = _position_contracts(p) if abs(c) < 1e-12: diff --git a/manual_trading_hub/exchange_orders.py b/manual_trading_hub/exchange_orders.py index 7e9ecae..1d2e5d4 100644 --- a/manual_trading_hub/exchange_orders.py +++ b/manual_trading_hub/exchange_orders.py @@ -453,8 +453,11 @@ def cancel_orders_for_symbol( try: cancel_order(ex, exchange_kind, symbol, o["id"], o.get("channel") or "regular") n += 1 - except Exception: - pass + except Exception as e: + print( + f"[cancel_orders_for_symbol] {exchange_kind} {symbol} id={o.get('id')}: {e}", + flush=True, + ) return n @@ -655,7 +658,7 @@ def _gate_place_tp_sl_position( try: ex.privateFuturesPostSettlePriceOrders(_payload(tp_s, tp_rule)) except Exception: - cancel_orders_for_symbol(ex, "gate", symbol, scope="conditional") + # 保留已挂止损,仅放弃本次 TP raise return except Exception as e: diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index 377a44d..b598d41 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -1535,6 +1535,50 @@ async def _notify_instance_user_close( ) +async def _sync_flask_after_position_close( + client: httpx.AsyncClient, + ex: dict, + *, + symbol: str, + side: str, +) -> dict: + """中控/agent 平仓后同步 Flask order_monitors、趋势与滚仓状态。""" + sym = (symbol or "").strip() + side_l = (side or "").strip().lower() + out: dict = {} + if not sym or side_l not in ("long", "short"): + return out + order_sync = await _fetch_flask_json( + client, + ex, + "/api/hub/order/sync-flat", + method="POST", + json_body={"symbol": sym, "side": side_l}, + ) + if isinstance(order_sync, dict): + out["order_sync"] = order_sync + if "trend" in (ex.get("capabilities") or []): + sync_parsed = await _fetch_flask_json( + client, + ex, + "/api/hub/trend/sync-flat", + method="POST", + json_body={"symbol": sym, "side": side_l}, + ) + if isinstance(sync_parsed, dict): + out["trend_sync"] = sync_parsed + roll_sync = await _fetch_flask_json( + client, + ex, + "/api/hub/roll/sync-flat", + method="POST", + json_body={"symbol": sym, "side": side_l}, + ) + if isinstance(roll_sync, dict): + out["roll_sync"] = roll_sync + return out + + def _flask_error_from_hub_mon(hub_mon: dict | None) -> str | None: if not isinstance(hub_mon, dict) or hub_mon.get("ok") is not False: return None @@ -2346,37 +2390,11 @@ async def api_close_position(exchange_id: str, body: ClosePositionBody): "ok": bool(isinstance(payload, dict) and payload.get("ok")), } if out.get("ok"): - ex_key = (ex.get("key") or "").strip().lower() async with httpx.AsyncClient() as flask_client: - if ex_key == "gate": - order_sync = await _fetch_flask_json( - flask_client, - ex, - "/api/hub/order/sync-flat", - method="POST", - json_body={"symbol": sym, "side": side}, - ) - if isinstance(order_sync, dict): - out["order_sync"] = order_sync - if "trend" in (ex.get("capabilities") or []): - sync_parsed = await _fetch_flask_json( - flask_client, - ex, - "/api/hub/trend/sync-flat", - method="POST", - json_body={"symbol": sym, "side": side}, - ) - if isinstance(sync_parsed, dict): - out["trend_sync"] = sync_parsed - roll_sync = await _fetch_flask_json( - flask_client, - ex, - "/api/hub/roll/sync-flat", - method="POST", - json_body={"symbol": sym, "side": side}, + sync_bundle = await _sync_flask_after_position_close( + flask_client, ex, symbol=sym, side=side ) - if isinstance(roll_sync, dict): - out["roll_sync"] = roll_sync + out.update(sync_bundle) risk_sync = await _notify_instance_user_close(flask_client, ex, count=1) if isinstance(risk_sync, dict): out["risk_sync"] = risk_sync @@ -2414,6 +2432,14 @@ async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody): "ok": bool(isinstance(payload, dict) and payload.get("ok")), } if out.get("ok") and (ex.get("flask_url") or "").strip(): + placed = payload.get("placed") if isinstance(payload, dict) else None + sl_sync = body.stop_loss + tp_sync = body.take_profit + if isinstance(placed, dict): + if placed.get("stop_loss") is not None: + sl_sync = placed["stop_loss"] + if placed.get("take_profit") is not None: + tp_sync = placed["take_profit"] async with httpx.AsyncClient() as flask_client: sync_parsed = await _fetch_flask_json( flask_client, @@ -2423,8 +2449,8 @@ async def api_place_tpsl(exchange_id: str, body: PlaceTpslBody): json_body={ "symbol": body.symbol, "side": body.side, - "stop_loss": body.stop_loss, - "take_profit": body.take_profit, + "stop_loss": sl_sync, + "take_profit": tp_sync, }, ) if isinstance(sync_parsed, dict): @@ -2451,9 +2477,19 @@ async def api_close_exchange(exchange_id: str): closed = body.get("closed") or [] n = len(closed) if isinstance(closed, list) else 0 if n > 0: - risk_sync = await _notify_instance_user_close(client, ex, count=n) - if isinstance(risk_sync, dict): - out["risk_sync"] = risk_sync + async with httpx.AsyncClient() as flask_client: + for item in closed: + if not isinstance(item, dict): + continue + sym_i = (item.get("symbol") or "").strip() + side_i = (item.get("side") or "").strip().lower() + if sym_i and side_i in ("long", "short"): + await _sync_flask_after_position_close( + flask_client, ex, symbol=sym_i, side=side_i + ) + risk_sync = await _notify_instance_user_close(flask_client, ex, count=n) + if isinstance(risk_sync, dict): + out["risk_sync"] = risk_sync _schedule_board_refresh() return out @@ -2478,6 +2514,17 @@ async def api_close_all(body: CloseAllBody | None = Body(default=None)): closed = payload.get("closed") or [] n = len(closed) if isinstance(closed, list) else 0 if n > 0: + for item in closed: + if not isinstance(item, dict): + continue + sym_i = (item.get("symbol") or "").strip() + side_i = (item.get("side") or "").strip().lower() + if sym_i and side_i in ("long", "short"): + sync_bundle = await _sync_flask_after_position_close( + client, ex, symbol=sym_i, side=side_i + ) + if sync_bundle: + row["flask_sync"] = sync_bundle risk_sync = await _notify_instance_user_close(client, ex, count=n) if isinstance(risk_sync, dict): row["risk_sync"] = risk_sync