修复币安交易记录

This commit is contained in:
dekun
2026-05-19 18:25:21 +08:00
parent c365a89c47
commit a95ee401bc
4 changed files with 538 additions and 50 deletions
+262 -49
View File
@@ -44,6 +44,10 @@ from fib_key_monitor_lib import (
key_signal_type_for_trade_record,
stored_key_signal_type,
)
from binance_closed_positions_lib import (
match_trade_record_to_position,
rebuild_closed_positions_from_trades,
)
from history_window_lib import (
PRESET_CUSTOM,
PRESET_UTC_LAST24H,
@@ -173,7 +177,10 @@ BINANCE_PNL_INCLUDE_FUNDING = os.getenv("BINANCE_PNL_INCLUDE_FUNDING", "false").
"true",
"yes",
)
# 与币安 App 仓位历史对齐目标误差(USDT)
BINANCE_PNL_MATCH_TOLERANCE = max(0.01, float(os.getenv("BINANCE_PNL_MATCH_TOLERANCE", "0.05")))
_LAST_EXCHANGE_PNL_SYNC_AT = 0.0
_BINANCE_CLOSED_POS_CACHE = {"at": 0.0, "hist": []}
KEY_MONITOR_ALERT_ONLY_TYPES = frozenset({"关键阻力位", "关键支撑位"})
AUTO_TRANSFER_ENABLED = os.getenv("AUTO_TRANSFER_ENABLED", "false").lower() == "true"
AUTO_TRANSFER_AMOUNT = float(os.getenv("AUTO_TRANSFER_AMOUNT", "30"))
@@ -2261,6 +2268,39 @@ def resolve_trade_pnl_amount(
if last_ts and not closed_at_str:
closed_at_str = ms_to_app_local_str(int(last_ts))
close_ms = int(last_ts)
if close_ms and open_ms:
try:
hist = fetch_binance_closed_positions_history(symbols=[sym])
fake = {
"symbol": sym,
"direction": direction,
"opened_at": opened_at_str or (row["opened_at"] if hasattr(row, "keys") else ""),
"closed_at": closed_at_str,
"opened_at_ms": open_ms,
"closed_at_ms": close_ms,
}
pos, _ = match_trade_record_to_position(
fake,
hist,
set(),
unified_symbol_fn=_unified_symbol_for_match,
to_ms_fn=_to_ms_with_fallback,
)
if pos and pos.get("pnl") is not None:
eo = ms_to_app_local_str(pos["open_ms"]) if pos.get("open_ms") else None
ec = ms_to_app_local_str(pos["close_ms"]) if pos.get("close_ms") else None
ep = pos.get("exit_price")
if ep and (exit_price is None or float(exit_price or 0) <= 0):
exit_price = float(ep)
return (
float(pos["pnl"]),
exit_price,
eo,
ec,
pos.get("sync_key"),
)
except Exception:
pass
net, sync_key, eo, ec = fetch_binance_net_pnl_for_trade(
ex_sym, direction, open_ms, close_ms, closing_trades=closing_trades
)
@@ -5409,8 +5449,9 @@ def api_sync_exchange_pnl():
limit = max(1, min(500, int(payload.get("limit"))))
except (TypeError, ValueError):
pass
global _LAST_EXCHANGE_PNL_SYNC_AT
global _LAST_EXCHANGE_PNL_SYNC_AT, _BINANCE_CLOSED_POS_CACHE
_LAST_EXCHANGE_PNL_SYNC_AT = 0.0
_BINANCE_CLOSED_POS_CACHE["at"] = 0.0
conn = get_db()
if force_all:
rows = conn.execute(
@@ -5427,13 +5468,29 @@ def api_sync_exchange_pnl():
""",
(limit,),
).fetchall()
synced = 0
sym_rows = conn.execute(
f"""
SELECT DISTINCT symbol FROM trade_records
WHERE id IN ({",".join("?" * len(rows)) if rows else "NULL"})
""",
tuple(int(r["id"]) for r in rows),
).fetchall() if rows else []
symbols = [sr["symbol"] for sr in sym_rows if sr["symbol"]]
try:
fetch_binance_closed_positions_history(symbols=symbols, force_refresh=True)
except Exception:
pass
sync_trade_records_from_exchange(conn, force=True)
matched = 0
for r in rows:
if sync_trade_record_exchange_pnl(conn, int(r["id"]), commit=False, force=True):
synced += 1
tr = conn.execute(
"SELECT exchange_sync_key FROM trade_records WHERE id=?", (int(r["id"]),)
).fetchone()
if tr and str(tr["exchange_sync_key"] or "").startswith("pos|"):
matched += 1
conn.commit()
conn.close()
return jsonify({"ok": True, "synced": synced, "candidates": len(rows)})
return jsonify({"ok": True, "synced": matched, "candidates": len(rows), "positions": len(_BINANCE_CLOSED_POS_CACHE.get("hist") or [])})
def _coerce_ts_ms(val):
@@ -5463,6 +5520,114 @@ def _unified_symbol_for_match(symbol_str):
return s
def _fetch_my_trades_paginated(exchange_symbol, since_ms, until_ms=None, max_pages=40):
"""分页拉取成交(Binance userTrades)。"""
if not (BINANCE_API_KEY and BINANCE_API_SECRET):
return []
ensure_markets_loaded()
out = []
since = int(since_ms) if since_ms else None
until_ms = int(until_ms) if until_ms else None
for _ in range(max_pages):
try:
batch = exchange.fetch_my_trades(exchange_symbol, since=since, limit=1000)
except Exception:
break
if not batch:
break
last_ts = None
for t in batch:
ts = _coerce_ts_ms(t.get("timestamp"))
if until_ms and ts and ts > until_ms:
continue
if since and ts and ts < since:
continue
out.append(t)
if ts:
last_ts = ts
if len(batch) < 1000:
break
if last_ts is None:
break
since = last_ts + 1
return out
def fetch_binance_closed_positions_history(symbols=None, force_refresh=False):
"""
从成交重建已平仓位对齐 App 仓位历史实现盈亏不含资金费
symbols: 可选 symbol 列表 NEAR/USDT为空则仅返回缓存
"""
global _BINANCE_CLOSED_POS_CACHE
now = time.time()
if (
not force_refresh
and _BINANCE_CLOSED_POS_CACHE["hist"]
and now - float(_BINANCE_CLOSED_POS_CACHE["at"] or 0) < 25.0
and not symbols
):
return list(_BINANCE_CLOSED_POS_CACHE["hist"])
if not exchange_private_api_configured():
return []
sym_list = []
for s in symbols or []:
try:
sym_list.append(normalize_exchange_symbol(s))
except Exception:
continue
if not sym_list:
return list(_BINANCE_CLOSED_POS_CACHE["hist"] or [])
since_ms = exchange_position_sync_since_ms()
until_ms = int(time.time() * 1000) + 120_000
trades_by_symbol = {}
for ex_sym in sym_list:
trades_by_symbol[ex_sym] = _fetch_my_trades_paginated(ex_sym, since_ms, until_ms)
def _contract_size(ex_sym):
try:
ensure_markets_loaded()
return float(exchange.market(ex_sym).get("contractSize") or 1)
except Exception:
return 1.0
hist = rebuild_closed_positions_from_trades(
trades_by_symbol,
unified_symbol_fn=_unified_symbol_for_match,
position_mode=BINANCE_POSITION_MODE,
contract_size_fn=_contract_size,
)
prev = list(_BINANCE_CLOSED_POS_CACHE["hist"] or [])
if prev and not force_refresh:
keys = {h.get("sync_key") for h in prev if h.get("sync_key")}
for h in hist:
sk = h.get("sync_key")
if sk and sk not in keys:
prev.append(h)
hist = sorted(prev, key=lambda x: int(x.get("close_ms") or 0), reverse=True)
_BINANCE_CLOSED_POS_CACHE["at"] = now
_BINANCE_CLOSED_POS_CACHE["hist"] = hist
return hist
def _apply_closed_position_to_trade_record(conn, trade_id, pos):
pnl_val = pos.get("pnl")
if pnl_val is None:
return False
sk = pos.get("sync_key") or ""
eo = ms_to_app_local_str(pos["open_ms"]) if pos.get("open_ms") else None
ec = ms_to_app_local_str(pos["close_ms"]) if pos.get("close_ms") else None
conn.execute(
"""
UPDATE trade_records
SET exchange_realized_pnl = ?, exchange_opened_at = ?, exchange_closed_at = ?,
exchange_sync_key = ?, pnl_amount = ?
WHERE id = ?
""",
(float(pnl_val), eo, ec, sk, float(pnl_val), int(trade_id)),
)
return True
def exchange_position_sync_since_ms():
s = EXCHANGE_POSITION_SYNC_FROM_BJ
if s:
@@ -5572,16 +5737,17 @@ def fetch_binance_net_pnl_for_trade(
def sync_trade_record_exchange_pnl(conn, record_id, commit=True, force=False):
"""单条 trade_records 回填 Binance 净盈亏;成功时同时更新 pnl_amount 便于统计"""
"""单条 trade_records:优先按成交重建的已平仓位匹配(对齐 App 仓位历史)"""
if not exchange_private_api_configured():
return False
tr = conn.execute("SELECT * FROM trade_records WHERE id=?", (int(record_id),)).fetchone()
if not tr:
return False
sk_existing = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or ""
if not force and str(sk_existing).strip():
if not force and str(sk_existing).strip().startswith("pos|"):
return tr["exchange_realized_pnl"] is not None
direction = (tr["direction"] or "long").strip().lower()
if not force and str(sk_existing).strip() and not str(sk_existing).startswith("pos|"):
pass
open_ms = _to_ms_with_fallback(
tr["opened_at_ms"] if "opened_at_ms" in tr.keys() else None, tr["opened_at"]
)
@@ -5590,40 +5756,49 @@ def sync_trade_record_exchange_pnl(conn, record_id, commit=True, force=False):
)
if open_ms is None or close_ms is None:
return False
try:
hist = fetch_binance_closed_positions_history(
symbols=[tr["symbol"]], force_refresh=force
)
except Exception:
hist = []
used = set()
if force:
used = set()
else:
rows = conn.execute(
"SELECT exchange_sync_key FROM trade_records WHERE exchange_sync_key LIKE 'pos|%'"
).fetchall()
for r in rows:
sk = (r["exchange_sync_key"] if "exchange_sync_key" in r.keys() else None) or ""
if sk:
used.add(str(sk).strip())
pos, _ = match_trade_record_to_position(
tr,
hist,
used,
unified_symbol_fn=_unified_symbol_for_match,
to_ms_fn=_to_ms_with_fallback,
)
if pos:
if _apply_closed_position_to_trade_record(conn, int(record_id), pos):
if commit:
try:
conn.commit()
except Exception:
pass
return True
direction = (tr["direction"] or "long").strip().lower()
try:
ex_sym = normalize_exchange_symbol(tr["symbol"])
except Exception:
return False
closing_trades = fetch_closing_fills_for_record(
ex_sym,
direction,
tr["opened_at"],
tr["closed_at"],
opened_at_ms=open_ms,
closed_at_ms=close_ms,
ex_sym, direction, tr["opened_at"], tr["closed_at"], opened_at_ms=open_ms, closed_at_ms=close_ms
)
net, sync_key, eo, ec = fetch_binance_net_pnl_for_trade(
ex_sym, direction, open_ms, close_ms, closing_trades=closing_trades
)
net, sync_key, eo, ec = None, None, None, None
for attempt in range(3):
if attempt:
time.sleep(0.7)
net, sync_key, eo, ec = fetch_binance_net_pnl_for_trade(
ex_sym, direction, open_ms, close_ms, closing_trades=closing_trades
)
if net is not None and sync_key:
break
if net is None:
net = calc_binance_realized_pnl_from_trades(closing_trades)
if net is None:
net = calc_pnl_from_closing_trades(
direction, tr["trigger_price"], closing_trades, ex_sym
)
if net is not None:
try:
ensure_markets_loaded()
cid = exchange.market(ex_sym).get("id") or ex_sym
except Exception:
cid = ex_sym
sync_key = f"fills|{cid}|{direction}|{open_ms}|{close_ms}|{net}"
if net is None or not sync_key:
return False
conn.execute(
@@ -5643,27 +5818,65 @@ def sync_trade_record_exchange_pnl(conn, record_id, commit=True, force=False):
return True
def sync_trade_records_from_exchange(conn):
"""未同步的 trade_records 回填交易所口径净盈亏(Binance:income 流水汇总)。"""
global _LAST_EXCHANGE_PNL_SYNC_AT
def sync_trade_records_from_exchange(conn, force=False):
"""为 trade_records 回填盈亏:成交重建已平仓位 + 时间匹配(对齐 App 仓位历史)。"""
global _LAST_EXCHANGE_PNL_SYNC_AT, _BINANCE_CLOSED_POS_CACHE
if not exchange_private_api_configured():
return
now = time.time()
if now - _LAST_EXCHANGE_PNL_SYNC_AT < 25.0:
if not force and now - _LAST_EXCHANGE_PNL_SYNC_AT < 25.0:
return
candidates = conn.execute(
"""
SELECT id FROM trade_records
WHERE (exchange_sync_key IS NULL OR TRIM(exchange_sync_key) = '')
ORDER BY id DESC
LIMIT 120
"""
).fetchall()
if force:
_BINANCE_CLOSED_POS_CACHE["at"] = 0.0
if force:
candidates = conn.execute(
"SELECT * FROM trade_records ORDER BY id DESC LIMIT 200"
).fetchall()
else:
candidates = conn.execute(
"""
SELECT * FROM trade_records
WHERE (exchange_sync_key IS NULL OR TRIM(exchange_sync_key) = ''
OR exchange_sync_key NOT LIKE 'pos|%')
ORDER BY id DESC
LIMIT 200
"""
).fetchall()
if not candidates:
_LAST_EXCHANGE_PNL_SYNC_AT = now
return
symbols = list({tr["symbol"] for tr in candidates if tr["symbol"]})
try:
hist = fetch_binance_closed_positions_history(symbols=symbols, force_refresh=force)
except Exception:
hist = []
used = set()
for tr in candidates:
sync_trade_record_exchange_pnl(conn, int(tr["id"]), commit=False)
sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or ""
if sk0 and str(sk0).startswith("pos|"):
used.add(str(sk0).strip())
matched = 0
for tr in candidates:
pos, _ = match_trade_record_to_position(
tr,
hist,
used,
unified_symbol_fn=_unified_symbol_for_match,
to_ms_fn=_to_ms_with_fallback,
)
if not pos:
continue
sk = pos.get("sync_key")
if not sk or sk in used:
continue
if _apply_closed_position_to_trade_record(conn, int(tr["id"]), pos):
used.add(sk)
matched += 1
for tr in candidates:
sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or ""
if sk0 and str(sk0).startswith("pos|"):
continue
sync_trade_record_exchange_pnl(conn, int(tr["id"]), commit=False, force=force)
_LAST_EXCHANGE_PNL_SYNC_AT = now
try:
conn.commit()