修复交易记录bug

This commit is contained in:
dekun
2026-05-28 22:47:18 +08:00
parent 33042890b5
commit 0b1b5d7672
4 changed files with 308 additions and 21 deletions
+73 -2
View File
@@ -3508,6 +3508,7 @@ def is_no_position_error(err_msg):
"nothing to close", "nothing to close",
"pos size is 0", "pos size is 0",
"position amount is 0", "position amount is 0",
"empty position",
] ]
return any(k in msg for k in keywords) return any(k in msg for k in keywords)
@@ -3943,7 +3944,9 @@ def reconcile_external_closes(conn, days=None):
cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000) cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000)
except Exception: except Exception:
cutoff_ms = None cutoff_ms = None
rows = conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall() rows = conn.execute(
"SELECT * FROM order_monitors WHERE status IN ('active', 'error')"
).fetchall()
for r in rows: for r in rows:
if cutoff_ms is not None: if cutoff_ms is not None:
opened_at_v = get_opened_at_value(r) opened_at_v = get_opened_at_value(r)
@@ -3951,6 +3954,17 @@ def reconcile_external_closes(conn, days=None):
# 手动同步按最近 N 天过滤,避免把更早历史单误同步进来 # 手动同步按最近 N 天过滤,避免把更早历史单误同步进来
if opened_ms is None or opened_ms < cutoff_ms: if opened_ms is None or opened_ms < cutoff_ms:
continue continue
oid = int(r["id"])
if r["status"] == "error":
opened_at_chk = get_opened_at_value(r)
existing = conn.execute(
"SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1",
(r["symbol"], opened_at_chk, order_row_monitor_type(r)),
).fetchone()
if existing:
conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,))
synced_count += 1
continue
exchange_symbol = r["exchange_symbol"] or normalize_exchange_symbol(r["symbol"]) exchange_symbol = r["exchange_symbol"] or normalize_exchange_symbol(r["symbol"])
live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"])
if live_contracts is None: if live_contracts is None:
@@ -5386,7 +5400,47 @@ def check_order_monitors():
conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,)) conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,))
conn.commit() conn.commit()
continue continue
conn.execute("UPDATE order_monitors SET status='error' WHERE id=?", (pid,)) ex_sym_fail = r["exchange_symbol"] or normalize_exchange_symbol(sym)
cancel_binance_futures_open_orders(ex_sym_fail)
live_contracts = get_live_position_contracts(ex_sym_fail, direction)
if live_contracts is not None and live_contracts <= 0:
record_res, record_pnl, record_closed, sync_miss = resolve_synced_flat_close(
r, opened_at, opened_at_ms=opened_at_ms
)
record_miss = f"{sync_miss};本地触发{res}时平仓API失败:{e}"
monitor_status = "stopped"
else:
record_res, record_pnl, record_closed = res, pnl_amount, closed_at
record_miss = f"触发{res}后交易所平仓失败(请核对交易所仓位):{e}"
monitor_status = "error"
record_hold = calc_hold_seconds(
opened_at, parse_dt_for_trading_day(record_closed) or now
)
insert_trade_record(
conn,
symbol=sym,
monitor_type=order_row_monitor_type(r),
key_signal_type=order_row_key_signal_type(r),
direction=direction,
trigger_price=trigger_price,
stop_loss=stop_loss,
initial_stop_loss=r["initial_stop_loss"] or stop_loss,
take_profit=take_profit,
margin_capital=margin_capital,
leverage=leverage,
pnl_amount=record_pnl,
hold_seconds=record_hold,
trade_style=r["trade_style"],
risk_amount=r["risk_amount"],
planned_rr=calc_rr_ratio(direction, trigger_price, r["initial_stop_loss"] or stop_loss, take_profit),
actual_rr=calc_actual_rr(record_pnl, r["risk_amount"]),
result=record_res,
miss_reason=record_miss,
opened_at=opened_at,
closed_at=record_closed,
)
session_capital = update_session_capital(conn, session_date, record_pnl)
conn.execute("UPDATE order_monitors SET status=? WHERE id=?", (monitor_status, pid))
conn.commit() conn.commit()
send_wechat_msg( send_wechat_msg(
build_wechat_monitor_error_message( build_wechat_monitor_error_message(
@@ -5396,6 +5450,23 @@ def check_order_monitors():
error_text=str(e), error_text=str(e),
) )
) )
if monitor_status == "stopped":
send_wechat_msg(
build_wechat_close_message(
symbol=sym,
direction=direction,
result=f"{record_res}(已补记入交易记录)",
pnl_amount=record_pnl,
hold_seconds=record_hold,
trigger_price=trigger_price,
current_price=p,
stop_loss=stop_loss,
take_profit=take_profit,
close_order_id="-",
extra_note=record_miss,
session_capital_fallback=session_capital,
)
)
continue continue
cancel_binance_futures_open_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym)) cancel_binance_futures_open_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym))
exit_ref = exit_p if exit_p and float(exit_p) > 0 else p exit_ref = exit_p if exit_p and float(exit_p) > 0 else p
+80 -4
View File
@@ -3383,7 +3383,8 @@ def is_no_position_error(err_msg):
msg = (err_msg or "").lower() msg = (err_msg or "").lower()
keywords = [ keywords = [
"no position", "position does not exist", "position not exist", "no position", "position does not exist", "position not exist",
"pos size is 0", "nothing to close", "reduceonly", "51008" "pos size is 0", "nothing to close", "reduceonly", "51008",
"empty position", "increase_position",
] ]
return any(k in msg for k in keywords) return any(k in msg for k in keywords)
@@ -3884,7 +3885,9 @@ def reconcile_external_closes(conn, days=None):
cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000) cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000)
except Exception: except Exception:
cutoff_ms = None cutoff_ms = None
rows = conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall() rows = conn.execute(
"SELECT * FROM order_monitors WHERE status IN ('active', 'error')"
).fetchall()
for r in rows: for r in rows:
if cutoff_ms is not None: if cutoff_ms is not None:
opened_at_v = get_opened_at_value(r) opened_at_v = get_opened_at_value(r)
@@ -3892,15 +3895,26 @@ def reconcile_external_closes(conn, days=None):
# 手动同步按最近 N 天过滤,避免把更早历史单误同步进来 # 手动同步按最近 N 天过滤,避免把更早历史单误同步进来
if opened_ms is None or opened_ms < cutoff_ms: if opened_ms is None or opened_ms < cutoff_ms:
continue continue
oid = int(r["id"])
if r["status"] == "error":
opened_at_chk = get_opened_at_value(r)
existing = conn.execute(
"SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1",
(r["symbol"], opened_at_chk, order_row_monitor_type(r)),
).fetchone()
if existing:
conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,))
synced_count += 1
continue
exchange_symbol = resolve_monitor_exchange_symbol(r) exchange_symbol = resolve_monitor_exchange_symbol(r)
live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"])
oid = int(r["id"])
if live_contracts is None: if live_contracts is None:
_RECONCILE_FLAT_STREAK.pop(oid, None) _RECONCILE_FLAT_STREAK.pop(oid, None)
continue continue
if live_contracts > 0: if live_contracts > 0:
_RECONCILE_FLAT_STREAK.pop(oid, None) _RECONCILE_FLAT_STREAK.pop(oid, None)
continue continue
if r["status"] != "error":
streak = int(_RECONCILE_FLAT_STREAK.get(oid, 0)) + 1 streak = int(_RECONCILE_FLAT_STREAK.get(oid, 0)) + 1
_RECONCILE_FLAT_STREAK[oid] = streak _RECONCILE_FLAT_STREAK[oid] = streak
if streak < RECONCILE_FLAT_CONFIRM_POLLS: if streak < RECONCILE_FLAT_CONFIRM_POLLS:
@@ -3910,6 +3924,11 @@ def reconcile_external_closes(conn, days=None):
f"[reconcile_external_closes] {r['symbol']} id={oid} " f"[reconcile_external_closes] {r['symbol']} id={oid} "
f"flat x{streak} polls -> sync close" f"flat x{streak} polls -> sync close"
) )
else:
_RECONCILE_FLAT_STREAK.pop(oid, None)
print(
f"[reconcile_external_closes] error recovery {r['symbol']} id={oid} flat -> sync close"
)
cancel_gate_swap_trigger_orders(exchange_symbol) cancel_gate_swap_trigger_orders(exchange_symbol)
opened_at = get_opened_at_value(r) opened_at = get_opened_at_value(r)
opened_at_ms = _to_ms_with_fallback(r["opened_at_ms"] if "opened_at_ms" in r.keys() else None, opened_at) opened_at_ms = _to_ms_with_fallback(r["opened_at_ms"] if "opened_at_ms" in r.keys() else None, opened_at)
@@ -5341,7 +5360,47 @@ def check_order_monitors():
conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,)) conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,))
conn.commit() conn.commit()
continue continue
conn.execute("UPDATE order_monitors SET status='error' WHERE id=?", (pid,)) ex_sym_fail = r["exchange_symbol"] or normalize_exchange_symbol(sym)
cancel_gate_swap_trigger_orders(ex_sym_fail)
live_contracts = get_live_position_contracts(ex_sym_fail, direction)
if live_contracts is not None and live_contracts <= 0:
record_res, record_pnl, record_closed, sync_miss = resolve_synced_flat_close(
r, opened_at, opened_at_ms=opened_at_ms
)
record_miss = f"{sync_miss};本地触发{res}时平仓API失败:{e}"
monitor_status = "stopped"
else:
record_res, record_pnl, record_closed = res, pnl_amount, closed_at
record_miss = f"触发{res}后交易所平仓失败(请核对交易所仓位):{e}"
monitor_status = "error"
record_hold = calc_hold_seconds(
opened_at, parse_dt_for_trading_day(record_closed) or now
)
insert_trade_record(
conn,
symbol=sym,
monitor_type=order_row_monitor_type(r),
key_signal_type=order_row_key_signal_type(r),
direction=direction,
trigger_price=trigger_price,
stop_loss=stop_loss,
initial_stop_loss=r["initial_stop_loss"] or stop_loss,
take_profit=take_profit,
margin_capital=margin_capital_for_trade_record(trade_basis_row),
leverage=leverage,
pnl_amount=record_pnl,
hold_seconds=record_hold,
trade_style=r["trade_style"],
risk_amount=r["risk_amount"],
planned_rr=calc_rr_ratio(direction, trigger_price, r["initial_stop_loss"] or stop_loss, take_profit),
actual_rr=calc_actual_rr(record_pnl, r["risk_amount"]),
result=record_res,
miss_reason=record_miss,
opened_at=opened_at,
closed_at=record_closed,
)
session_capital = update_session_capital(conn, session_date, record_pnl)
conn.execute("UPDATE order_monitors SET status=? WHERE id=?", (monitor_status, pid))
conn.commit() conn.commit()
send_wechat_msg( send_wechat_msg(
build_wechat_monitor_error_message( build_wechat_monitor_error_message(
@@ -5351,6 +5410,23 @@ def check_order_monitors():
error_text=str(e), error_text=str(e),
) )
) )
if monitor_status == "stopped":
send_wechat_msg(
build_wechat_close_message(
symbol=sym,
direction=direction,
result=f"{record_res}(已补记入交易记录)",
pnl_amount=record_pnl,
hold_seconds=record_hold,
trigger_price=trigger_price,
current_price=p,
stop_loss=stop_loss,
take_profit=take_profit,
close_order_id="-",
extra_note=record_miss,
session_capital_fallback=session_capital,
)
)
continue continue
cancel_gate_swap_trigger_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym)) cancel_gate_swap_trigger_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym))
session_capital = update_session_capital(conn, session_date, pnl_amount) session_capital = update_session_capital(conn, session_date, pnl_amount)
+73 -3
View File
@@ -3288,7 +3288,8 @@ def is_no_position_error(err_msg):
msg = (err_msg or "").lower() msg = (err_msg or "").lower()
keywords = [ keywords = [
"no position", "position does not exist", "position not exist", "no position", "position does not exist", "position not exist",
"pos size is 0", "nothing to close", "reduceonly", "51008" "pos size is 0", "nothing to close", "reduceonly", "51008",
"empty position", "increase_position",
] ]
return any(k in msg for k in keywords) return any(k in msg for k in keywords)
@@ -3966,7 +3967,9 @@ def reconcile_external_closes(conn, days=None):
cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000) cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000)
except Exception: except Exception:
cutoff_ms = None cutoff_ms = None
rows = conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall() rows = conn.execute(
"SELECT * FROM order_monitors WHERE status IN ('active', 'error')"
).fetchall()
for r in rows: for r in rows:
if cutoff_ms is not None: if cutoff_ms is not None:
opened_at_v = get_opened_at_value(r) opened_at_v = get_opened_at_value(r)
@@ -3974,6 +3977,17 @@ def reconcile_external_closes(conn, days=None):
# 手动同步按最近 N 天过滤,避免把更早历史单误同步进来 # 手动同步按最近 N 天过滤,避免把更早历史单误同步进来
if opened_ms is None or opened_ms < cutoff_ms: if opened_ms is None or opened_ms < cutoff_ms:
continue continue
oid = int(r["id"])
if r["status"] == "error":
opened_at_chk = get_opened_at_value(r)
existing = conn.execute(
"SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type='下单监控' LIMIT 1",
(r["symbol"], opened_at_chk),
).fetchone()
if existing:
conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,))
synced_count += 1
continue
exchange_symbol = r["exchange_symbol"] or normalize_exchange_symbol(r["symbol"]) exchange_symbol = r["exchange_symbol"] or normalize_exchange_symbol(r["symbol"])
live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"])
if live_contracts is None: if live_contracts is None:
@@ -4933,7 +4947,46 @@ def check_order_monitors():
conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,)) conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,))
conn.commit() conn.commit()
continue continue
conn.execute("UPDATE order_monitors SET status='error' WHERE id=?", (pid,)) ex_sym_fail = r["exchange_symbol"] or normalize_exchange_symbol(sym)
cancel_gate_swap_trigger_orders(ex_sym_fail)
live_contracts = get_live_position_contracts(ex_sym_fail, direction)
if live_contracts is not None and live_contracts <= 0:
record_res, record_pnl, record_closed, sync_miss = resolve_synced_flat_close(
r, opened_at, opened_at_ms=opened_at_ms
)
record_miss = f"{sync_miss};本地触发{res}时平仓API失败:{e}"
monitor_status = "stopped"
else:
record_res, record_pnl, record_closed = res, pnl_amount, closed_at
record_miss = f"触发{res}后交易所平仓失败(请核对交易所仓位):{e}"
monitor_status = "error"
record_hold = calc_hold_seconds(
opened_at, parse_dt_for_trading_day(record_closed) or now
)
insert_trade_record(
conn,
symbol=sym,
monitor_type="下单监控",
direction=direction,
trigger_price=trigger_price,
stop_loss=stop_loss,
initial_stop_loss=r["initial_stop_loss"] or stop_loss,
take_profit=take_profit,
margin_capital=margin_capital,
leverage=leverage,
pnl_amount=record_pnl,
hold_seconds=record_hold,
trade_style=r["trade_style"],
risk_amount=r["risk_amount"],
planned_rr=calc_rr_ratio(direction, trigger_price, r["initial_stop_loss"] or stop_loss, take_profit),
actual_rr=calc_actual_rr(record_pnl, r["risk_amount"]),
result=record_res,
miss_reason=record_miss,
opened_at=opened_at,
closed_at=record_closed,
)
session_capital = update_session_capital(conn, session_date, record_pnl)
conn.execute("UPDATE order_monitors SET status=? WHERE id=?", (monitor_status, pid))
conn.commit() conn.commit()
send_wechat_msg( send_wechat_msg(
build_wechat_monitor_error_message( build_wechat_monitor_error_message(
@@ -4943,6 +4996,23 @@ def check_order_monitors():
error_text=str(e), error_text=str(e),
) )
) )
if monitor_status == "stopped":
send_wechat_msg(
build_wechat_close_message(
symbol=sym,
direction=direction,
result=f"{record_res}(已补记入交易记录)",
pnl_amount=record_pnl,
hold_seconds=record_hold,
trigger_price=trigger_price,
current_price=p,
stop_loss=stop_loss,
take_profit=take_profit,
close_order_id="-",
extra_note=record_miss,
session_capital_fallback=session_capital,
)
)
continue continue
cancel_gate_swap_trigger_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym)) cancel_gate_swap_trigger_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym))
session_capital = update_session_capital(conn, session_date, pnl_amount) session_capital = update_session_capital(conn, session_date, pnl_amount)
+73 -3
View File
@@ -2952,7 +2952,8 @@ def is_no_position_error(err_msg):
msg = (err_msg or "").lower() msg = (err_msg or "").lower()
keywords = [ keywords = [
"no position", "position does not exist", "position not exist", "no position", "position does not exist", "position not exist",
"pos size is 0", "nothing to close", "reduceonly", "51008" "pos size is 0", "nothing to close", "reduceonly", "51008",
"empty position", "increase_position",
] ]
return any(k in msg for k in keywords) return any(k in msg for k in keywords)
@@ -3264,7 +3265,9 @@ def reconcile_external_closes(conn, days=None):
cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000) cutoff_ms = int((app_now() - timedelta(days=d)).timestamp() * 1000)
except Exception: except Exception:
cutoff_ms = None cutoff_ms = None
rows = conn.execute("SELECT * FROM order_monitors WHERE status='active'").fetchall() rows = conn.execute(
"SELECT * FROM order_monitors WHERE status IN ('active', 'error')"
).fetchall()
for r in rows: for r in rows:
if cutoff_ms is not None: if cutoff_ms is not None:
opened_at_v = get_opened_at_value(r) opened_at_v = get_opened_at_value(r)
@@ -3272,6 +3275,17 @@ def reconcile_external_closes(conn, days=None):
# 手动同步按最近 N 天过滤,避免把更早历史单误同步进来 # 手动同步按最近 N 天过滤,避免把更早历史单误同步进来
if opened_ms is None or opened_ms < cutoff_ms: if opened_ms is None or opened_ms < cutoff_ms:
continue continue
oid = int(r["id"])
if r["status"] == "error":
opened_at_chk = get_opened_at_value(r)
existing = conn.execute(
"SELECT id FROM trade_records WHERE symbol=? AND opened_at=? AND monitor_type=? LIMIT 1",
(r["symbol"], opened_at_chk, order_row_monitor_type(r)),
).fetchone()
if existing:
conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (oid,))
synced_count += 1
continue
exchange_symbol = r["exchange_symbol"] or normalize_okx_symbol(r["symbol"]) exchange_symbol = r["exchange_symbol"] or normalize_okx_symbol(r["symbol"])
live_contracts = get_live_position_contracts(exchange_symbol, r["direction"]) live_contracts = get_live_position_contracts(exchange_symbol, r["direction"])
if live_contracts is None: if live_contracts is None:
@@ -4869,7 +4883,46 @@ def check_order_monitors():
conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,)) conn.execute("UPDATE order_monitors SET status='stopped' WHERE id=?", (pid,))
conn.commit() conn.commit()
continue continue
conn.execute("UPDATE order_monitors SET status='error' WHERE id=?", (pid,)) ex_sym_fail = r["exchange_symbol"] or normalize_okx_symbol(sym)
live_contracts = get_live_position_contracts(ex_sym_fail, direction)
if live_contracts is not None and live_contracts <= 0:
record_res, record_pnl, record_closed, sync_miss = resolve_synced_flat_close(
r, opened_at, opened_at_ms=opened_at_ms
)
record_miss = f"{sync_miss};本地触发{res}时平仓API失败:{e}"
monitor_status = "stopped"
else:
record_res, record_pnl, record_closed = res, pnl_amount, closed_at
record_miss = f"触发{res}后交易所平仓失败(请核对交易所仓位):{e}"
monitor_status = "error"
record_hold = calc_hold_seconds(
opened_at, parse_dt_for_trading_day(record_closed) or now
)
insert_trade_record(
conn,
symbol=sym,
monitor_type=order_row_monitor_type(r),
key_signal_type=order_row_key_signal_type(r),
direction=direction,
trigger_price=trigger_price,
stop_loss=stop_loss,
initial_stop_loss=r["initial_stop_loss"] or stop_loss,
take_profit=take_profit,
margin_capital=margin_capital,
leverage=leverage,
pnl_amount=record_pnl,
hold_seconds=record_hold,
trade_style=r["trade_style"],
risk_amount=r["risk_amount"],
planned_rr=calc_rr_ratio(direction, trigger_price, r["initial_stop_loss"] or stop_loss, take_profit),
actual_rr=calc_actual_rr(record_pnl, r["risk_amount"]),
result=record_res,
miss_reason=record_miss,
opened_at=opened_at,
closed_at=record_closed,
)
session_capital = update_session_capital(conn, session_date, record_pnl)
conn.execute("UPDATE order_monitors SET status=? WHERE id=?", (monitor_status, pid))
conn.commit() conn.commit()
send_wechat_msg( send_wechat_msg(
build_wechat_monitor_error_message( build_wechat_monitor_error_message(
@@ -4879,6 +4932,23 @@ def check_order_monitors():
error_text=str(e), error_text=str(e),
) )
) )
if monitor_status == "stopped":
send_wechat_msg(
build_wechat_close_message(
symbol=sym,
direction=direction,
result=f"{record_res}(已补记入交易记录)",
pnl_amount=record_pnl,
hold_seconds=record_hold,
trigger_price=trigger_price,
current_price=p,
stop_loss=stop_loss,
take_profit=take_profit,
close_order_id="-",
extra_note=record_miss,
session_capital_fallback=session_capital,
)
)
continue continue
session_capital = update_session_capital(conn, session_date, pnl_amount) session_capital = update_session_capital(conn, session_date, pnl_amount)
send_wechat_msg( send_wechat_msg(