From 0b1b5d7672a1be6ef9492c4c02e199ee23d6a1fc Mon Sep 17 00:00:00 2001 From: dekun Date: Thu, 28 May 2026 22:47:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E4=BA=A4=E6=98=93=E8=AE=B0?= =?UTF-8?q?=E5=BD=95bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crypto_monitor_binance/app.py | 75 +++++++++++++++++++++++- crypto_monitor_gate/app.py | 102 ++++++++++++++++++++++++++++----- crypto_monitor_gate_bot/app.py | 76 +++++++++++++++++++++++- crypto_monitor_okx/app.py | 76 +++++++++++++++++++++++- 4 files changed, 308 insertions(+), 21 deletions(-) diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 1804a54..48f4d66 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -3508,6 +3508,7 @@ def is_no_position_error(err_msg): "nothing to close", "pos size is 0", "position amount is 0", + "empty position", ] return any(k in msg for k in keywords) @@ -3943,7 +3944,9 @@ def reconcile_external_closes(conn, days=None): cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000) except Exception: cutoff_ms = None - rows = conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall() + rows = conn.execute( + "SELECT * FROM order_monitors WHERE status IN ('active', 'error')" + ).fetchall() for r in rows: if cutoff_ms is not None: opened_at_v = get_opened_at_value(r) @@ -3951,6 +3954,17 @@ def reconcile_external_closes(conn, days=None): # 手动同步按最近 N 天过滤,避免把更早历史单误同步进来 if opened_ms is None or opened_ms < cutoff_ms: continue + oid = int(r["id"]) + if r["status"] == "error": + opened_at_chk = get_opened_at_value(r) + existing = conn.execute( + "SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1", + (r["symbol"], opened_at_chk, order_row_monitor_type(r)), + ).fetchone() + if existing: + conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,)) + synced_count += 1 + continue exchange_symbol = r["exchange_symbol"] or normalize_exchange_symbol(r["symbol"]) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) if live_contracts is None: @@ -5386,7 +5400,47 @@ def check_order_monitors(): conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,)) conn.commit() continue - conn.execute("UPDATE order_monitors SET status='error' WHERE id=?", (pid,)) + ex_sym_fail = r["exchange_symbol"] or normalize_exchange_symbol(sym) + cancel_binance_futures_open_orders(ex_sym_fail) + live_contracts = get_live_position_contracts(ex_sym_fail, direction) + if live_contracts is not None and live_contracts <= 0: + record_res, record_pnl, record_closed, sync_miss = resolve_synced_flat_close( + r, opened_at, opened_at_ms=opened_at_ms + ) + record_miss = f"{sync_miss};本地触发{res}时平仓API失败:{e}" + monitor_status = "stopped" + else: + record_res, record_pnl, record_closed = res, pnl_amount, closed_at + record_miss = f"触发{res}后交易所平仓失败(请核对交易所仓位):{e}" + monitor_status = "error" + record_hold = calc_hold_seconds( + opened_at, parse_dt_for_trading_day(record_closed) or now + ) + insert_trade_record( + conn, + symbol=sym, + monitor_type=order_row_monitor_type(r), + key_signal_type=order_row_key_signal_type(r), + direction=direction, + trigger_price=trigger_price, + stop_loss=stop_loss, + initial_stop_loss=r["initial_stop_loss"] or stop_loss, + take_profit=take_profit, + margin_capital=margin_capital, + leverage=leverage, + pnl_amount=record_pnl, + hold_seconds=record_hold, + trade_style=r["trade_style"], + risk_amount=r["risk_amount"], + planned_rr=calc_rr_ratio(direction, trigger_price, r["initial_stop_loss"] or stop_loss, take_profit), + actual_rr=calc_actual_rr(record_pnl, r["risk_amount"]), + result=record_res, + miss_reason=record_miss, + opened_at=opened_at, + closed_at=record_closed, + ) + session_capital = update_session_capital(conn, session_date, record_pnl) + conn.execute("UPDATE order_monitors SET status=? WHERE id=?", (monitor_status, pid)) conn.commit() send_wechat_msg( build_wechat_monitor_error_message( @@ -5396,6 +5450,23 @@ def check_order_monitors(): error_text=str(e), ) ) + if monitor_status == "stopped": + send_wechat_msg( + build_wechat_close_message( + symbol=sym, + direction=direction, + result=f"{record_res}(已补记入交易记录)", + pnl_amount=record_pnl, + hold_seconds=record_hold, + trigger_price=trigger_price, + current_price=p, + stop_loss=stop_loss, + take_profit=take_profit, + close_order_id="-", + extra_note=record_miss, + session_capital_fallback=session_capital, + ) + ) continue cancel_binance_futures_open_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym)) exit_ref = exit_p if exit_p and float(exit_p) > 0 else p diff --git a/crypto_monitor_gate/app.py b/crypto_monitor_gate/app.py index c23a962..216d6db 100644 --- a/crypto_monitor_gate/app.py +++ b/crypto_monitor_gate/app.py @@ -3383,7 +3383,8 @@ def is_no_position_error(err_msg): msg = (err_msg or "").lower() keywords = [ "no position", "position does not exist", "position not exist", - "pos size is 0", "nothing to close", "reduceonly", "51008" + "pos size is 0", "nothing to close", "reduceonly", "51008", + "empty position", "increase_position", ] return any(k in msg for k in keywords) @@ -3884,7 +3885,9 @@ def reconcile_external_closes(conn, days=None): cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000) except Exception: cutoff_ms = None - rows = conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall() + rows = conn.execute( + "SELECT * FROM order_monitors WHERE status IN ('active', 'error')" + ).fetchall() for r in rows: if cutoff_ms is not None: opened_at_v = get_opened_at_value(r) @@ -3892,24 +3895,40 @@ def reconcile_external_closes(conn, days=None): # 手动同步按最近 N 天过滤,避免把更早历史单误同步进来 if opened_ms is None or opened_ms < cutoff_ms: continue + oid = int(r["id"]) + if r["status"] == "error": + opened_at_chk = get_opened_at_value(r) + existing = conn.execute( + "SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1", + (r["symbol"], opened_at_chk, order_row_monitor_type(r)), + ).fetchone() + if existing: + conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,)) + synced_count += 1 + continue exchange_symbol = resolve_monitor_exchange_symbol(r) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) - oid = int(r["id"]) if live_contracts is None: _RECONCILE_FLAT_STREAK.pop(oid, None) continue if live_contracts > 0: _RECONCILE_FLAT_STREAK.pop(oid, None) continue - streak = int(_RECONCILE_FLAT_STREAK.get(oid, 0)) + 1 - _RECONCILE_FLAT_STREAK[oid] = streak - if streak < RECONCILE_FLAT_CONFIRM_POLLS: - continue - _RECONCILE_FLAT_STREAK.pop(oid, None) - print( - f"[reconcile_external_closes] {r['symbol']} id={oid} " - f"flat x{streak} polls -> sync close" - ) + if r["status"] != "error": + streak = int(_RECONCILE_FLAT_STREAK.get(oid, 0)) + 1 + _RECONCILE_FLAT_STREAK[oid] = streak + if streak < RECONCILE_FLAT_CONFIRM_POLLS: + continue + _RECONCILE_FLAT_STREAK.pop(oid, None) + print( + f"[reconcile_external_closes] {r['symbol']} id={oid} " + f"flat x{streak} polls -> sync close" + ) + else: + _RECONCILE_FLAT_STREAK.pop(oid, None) + print( + f"[reconcile_external_closes] error recovery {r['symbol']} id={oid} flat -> sync close" + ) cancel_gate_swap_trigger_orders(exchange_symbol) opened_at = get_opened_at_value(r) opened_at_ms = _to_ms_with_fallback(r["opened_at_ms"] if "opened_at_ms" in r.keys() else None, opened_at) @@ -5341,7 +5360,47 @@ def check_order_monitors(): conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,)) conn.commit() continue - conn.execute("UPDATE order_monitors SET status='error' WHERE id=?", (pid,)) + ex_sym_fail = r["exchange_symbol"] or normalize_exchange_symbol(sym) + cancel_gate_swap_trigger_orders(ex_sym_fail) + live_contracts = get_live_position_contracts(ex_sym_fail, direction) + if live_contracts is not None and live_contracts <= 0: + record_res, record_pnl, record_closed, sync_miss = resolve_synced_flat_close( + r, opened_at, opened_at_ms=opened_at_ms + ) + record_miss = f"{sync_miss};本地触发{res}时平仓API失败:{e}" + monitor_status = "stopped" + else: + record_res, record_pnl, record_closed = res, pnl_amount, closed_at + record_miss = f"触发{res}后交易所平仓失败(请核对交易所仓位):{e}" + monitor_status = "error" + record_hold = calc_hold_seconds( + opened_at, parse_dt_for_trading_day(record_closed) or now + ) + insert_trade_record( + conn, + symbol=sym, + monitor_type=order_row_monitor_type(r), + key_signal_type=order_row_key_signal_type(r), + direction=direction, + trigger_price=trigger_price, + stop_loss=stop_loss, + initial_stop_loss=r["initial_stop_loss"] or stop_loss, + take_profit=take_profit, + margin_capital=margin_capital_for_trade_record(trade_basis_row), + leverage=leverage, + pnl_amount=record_pnl, + hold_seconds=record_hold, + trade_style=r["trade_style"], + risk_amount=r["risk_amount"], + planned_rr=calc_rr_ratio(direction, trigger_price, r["initial_stop_loss"] or stop_loss, take_profit), + actual_rr=calc_actual_rr(record_pnl, r["risk_amount"]), + result=record_res, + miss_reason=record_miss, + opened_at=opened_at, + closed_at=record_closed, + ) + session_capital = update_session_capital(conn, session_date, record_pnl) + conn.execute("UPDATE order_monitors SET status=? WHERE id=?", (monitor_status, pid)) conn.commit() send_wechat_msg( build_wechat_monitor_error_message( @@ -5351,6 +5410,23 @@ def check_order_monitors(): error_text=str(e), ) ) + if monitor_status == "stopped": + send_wechat_msg( + build_wechat_close_message( + symbol=sym, + direction=direction, + result=f"{record_res}(已补记入交易记录)", + pnl_amount=record_pnl, + hold_seconds=record_hold, + trigger_price=trigger_price, + current_price=p, + stop_loss=stop_loss, + take_profit=take_profit, + close_order_id="-", + extra_note=record_miss, + session_capital_fallback=session_capital, + ) + ) continue cancel_gate_swap_trigger_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym)) session_capital = update_session_capital(conn, session_date, pnl_amount) diff --git a/crypto_monitor_gate_bot/app.py b/crypto_monitor_gate_bot/app.py index d0d6044..b1aac99 100644 --- a/crypto_monitor_gate_bot/app.py +++ b/crypto_monitor_gate_bot/app.py @@ -3288,7 +3288,8 @@ def is_no_position_error(err_msg): msg = (err_msg or "").lower() keywords = [ "no position", "position does not exist", "position not exist", - "pos size is 0", "nothing to close", "reduceonly", "51008" + "pos size is 0", "nothing to close", "reduceonly", "51008", + "empty position", "increase_position", ] return any(k in msg for k in keywords) @@ -3966,7 +3967,9 @@ def reconcile_external_closes(conn, days=None): cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000) except Exception: cutoff_ms = None - rows = conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall() + rows = conn.execute( + "SELECT * FROM order_monitors WHERE status IN ('active', 'error')" + ).fetchall() for r in rows: if cutoff_ms is not None: opened_at_v = get_opened_at_value(r) @@ -3974,6 +3977,17 @@ def reconcile_external_closes(conn, days=None): # 手动同步按最近 N 天过滤,避免把更早历史单误同步进来 if opened_ms is None or opened_ms < cutoff_ms: continue + oid = int(r["id"]) + if r["status"] == "error": + opened_at_chk = get_opened_at_value(r) + existing = conn.execute( + "SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type='下单监控' LIMIT 1", + (r["symbol"], opened_at_chk), + ).fetchone() + if existing: + conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,)) + synced_count += 1 + continue exchange_symbol = r["exchange_symbol"] or normalize_exchange_symbol(r["symbol"]) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) if live_contracts is None: @@ -4933,7 +4947,46 @@ def check_order_monitors(): conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,)) conn.commit() continue - conn.execute("UPDATE order_monitors SET status='error' WHERE id=?", (pid,)) + ex_sym_fail = r["exchange_symbol"] or normalize_exchange_symbol(sym) + cancel_gate_swap_trigger_orders(ex_sym_fail) + live_contracts = get_live_position_contracts(ex_sym_fail, direction) + if live_contracts is not None and live_contracts <= 0: + record_res, record_pnl, record_closed, sync_miss = resolve_synced_flat_close( + r, opened_at, opened_at_ms=opened_at_ms + ) + record_miss = f"{sync_miss};本地触发{res}时平仓API失败:{e}" + monitor_status = "stopped" + else: + record_res, record_pnl, record_closed = res, pnl_amount, closed_at + record_miss = f"触发{res}后交易所平仓失败(请核对交易所仓位):{e}" + monitor_status = "error" + record_hold = calc_hold_seconds( + opened_at, parse_dt_for_trading_day(record_closed) or now + ) + insert_trade_record( + conn, + symbol=sym, + monitor_type="下单监控", + direction=direction, + trigger_price=trigger_price, + stop_loss=stop_loss, + initial_stop_loss=r["initial_stop_loss"] or stop_loss, + take_profit=take_profit, + margin_capital=margin_capital, + leverage=leverage, + pnl_amount=record_pnl, + hold_seconds=record_hold, + trade_style=r["trade_style"], + risk_amount=r["risk_amount"], + planned_rr=calc_rr_ratio(direction, trigger_price, r["initial_stop_loss"] or stop_loss, take_profit), + actual_rr=calc_actual_rr(record_pnl, r["risk_amount"]), + result=record_res, + miss_reason=record_miss, + opened_at=opened_at, + closed_at=record_closed, + ) + session_capital = update_session_capital(conn, session_date, record_pnl) + conn.execute("UPDATE order_monitors SET status=? WHERE id=?", (monitor_status, pid)) conn.commit() send_wechat_msg( build_wechat_monitor_error_message( @@ -4943,6 +4996,23 @@ def check_order_monitors(): error_text=str(e), ) ) + if monitor_status == "stopped": + send_wechat_msg( + build_wechat_close_message( + symbol=sym, + direction=direction, + result=f"{record_res}(已补记入交易记录)", + pnl_amount=record_pnl, + hold_seconds=record_hold, + trigger_price=trigger_price, + current_price=p, + stop_loss=stop_loss, + take_profit=take_profit, + close_order_id="-", + extra_note=record_miss, + session_capital_fallback=session_capital, + ) + ) continue cancel_gate_swap_trigger_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym)) session_capital = update_session_capital(conn, session_date, pnl_amount) diff --git a/crypto_monitor_okx/app.py b/crypto_monitor_okx/app.py index e5b9997..c115065 100644 --- a/crypto_monitor_okx/app.py +++ b/crypto_monitor_okx/app.py @@ -2952,7 +2952,8 @@ def is_no_position_error(err_msg): msg = (err_msg or "").lower() keywords = [ "no position", "position does not exist", "position not exist", - "pos size is 0", "nothing to close", "reduceonly", "51008" + "pos size is 0", "nothing to close", "reduceonly", "51008", + "empty position", "increase_position", ] return any(k in msg for k in keywords) @@ -3264,7 +3265,9 @@ def reconcile_external_closes(conn, days=None): cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000) except Exception: cutoff_ms = None - rows = conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall() + rows = conn.execute( + "SELECT * FROM order_monitors WHERE status IN ('active', 'error')" + ).fetchall() for r in rows: if cutoff_ms is not None: opened_at_v = get_opened_at_value(r) @@ -3272,6 +3275,17 @@ def reconcile_external_closes(conn, days=None): # 手动同步按最近 N 天过滤,避免把更早历史单误同步进来 if opened_ms is None or opened_ms < cutoff_ms: continue + oid = int(r["id"]) + if r["status"] == "error": + opened_at_chk = get_opened_at_value(r) + existing = conn.execute( + "SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1", + (r["symbol"], opened_at_chk, order_row_monitor_type(r)), + ).fetchone() + if existing: + conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,)) + synced_count += 1 + continue exchange_symbol = r["exchange_symbol"] or normalize_okx_symbol(r["symbol"]) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) if live_contracts is None: @@ -4869,7 +4883,46 @@ def check_order_monitors(): conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,)) conn.commit() continue - conn.execute("UPDATE order_monitors SET status='error' WHERE id=?", (pid,)) + ex_sym_fail = r["exchange_symbol"] or normalize_okx_symbol(sym) + live_contracts = get_live_position_contracts(ex_sym_fail, direction) + if live_contracts is not None and live_contracts <= 0: + record_res, record_pnl, record_closed, sync_miss = resolve_synced_flat_close( + r, opened_at, opened_at_ms=opened_at_ms + ) + record_miss = f"{sync_miss};本地触发{res}时平仓API失败:{e}" + monitor_status = "stopped" + else: + record_res, record_pnl, record_closed = res, pnl_amount, closed_at + record_miss = f"触发{res}后交易所平仓失败(请核对交易所仓位):{e}" + monitor_status = "error" + record_hold = calc_hold_seconds( + opened_at, parse_dt_for_trading_day(record_closed) or now + ) + insert_trade_record( + conn, + symbol=sym, + monitor_type=order_row_monitor_type(r), + key_signal_type=order_row_key_signal_type(r), + direction=direction, + trigger_price=trigger_price, + stop_loss=stop_loss, + initial_stop_loss=r["initial_stop_loss"] or stop_loss, + take_profit=take_profit, + margin_capital=margin_capital, + leverage=leverage, + pnl_amount=record_pnl, + hold_seconds=record_hold, + trade_style=r["trade_style"], + risk_amount=r["risk_amount"], + planned_rr=calc_rr_ratio(direction, trigger_price, r["initial_stop_loss"] or stop_loss, take_profit), + actual_rr=calc_actual_rr(record_pnl, r["risk_amount"]), + result=record_res, + miss_reason=record_miss, + opened_at=opened_at, + closed_at=record_closed, + ) + session_capital = update_session_capital(conn, session_date, record_pnl) + conn.execute("UPDATE order_monitors SET status=? WHERE id=?", (monitor_status, pid)) conn.commit() send_wechat_msg( build_wechat_monitor_error_message( @@ -4879,6 +4932,23 @@ def check_order_monitors(): error_text=str(e), ) ) + if monitor_status == "stopped": + send_wechat_msg( + build_wechat_close_message( + symbol=sym, + direction=direction, + result=f"{record_res}(已补记入交易记录)", + pnl_amount=record_pnl, + hold_seconds=record_hold, + trigger_price=trigger_price, + current_price=p, + stop_loss=stop_loss, + take_profit=take_profit, + close_order_id="-", + extra_note=record_miss, + session_capital_fallback=session_capital, + ) + ) continue session_capital = update_session_capital(conn, session_date, pnl_amount) send_wechat_msg(