修复币安交易记录
This commit is contained in:
@@ -5480,32 +5480,25 @@ def api_sync_exchange_pnl():
|
|||||||
fetch_binance_closed_positions_history(symbols=symbols, force_refresh=True)
|
fetch_binance_closed_positions_history(symbols=symbols, force_refresh=True)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
sync_trade_records_from_exchange(conn, force=True)
|
updated = sync_trade_records_from_exchange(conn, force=True)
|
||||||
matched = 0
|
|
||||||
for r in rows:
|
|
||||||
tr = conn.execute(
|
|
||||||
"SELECT exchange_realized_pnl, exchange_sync_key FROM trade_records WHERE id=?",
|
|
||||||
(int(r["id"]),),
|
|
||||||
).fetchone()
|
|
||||||
if tr and tr["exchange_realized_pnl"] is not None:
|
|
||||||
matched += 1
|
|
||||||
conn.commit()
|
conn.commit()
|
||||||
conn.close()
|
conn.close()
|
||||||
tc = _BINANCE_CLOSED_POS_CACHE.get("trade_counts") or {}
|
tc = _BINANCE_CLOSED_POS_CACHE.get("trade_counts") or {}
|
||||||
trade_n = sum(int(v) for v in tc.values())
|
trade_n = sum(int(v) for v in tc.values())
|
||||||
pos_n = len(_BINANCE_CLOSED_POS_CACHE.get("hist") or [])
|
pos_n = len(_BINANCE_CLOSED_POS_CACHE.get("hist") or [])
|
||||||
err = _BINANCE_CLOSED_POS_CACHE.get("last_err") or {}
|
err = _BINANCE_CLOSED_POS_CACHE.get("last_err") or {}
|
||||||
msg = f"成交 {trade_n} 笔,重建仓位 {pos_n} 条,已写入 {matched}/{len(rows)} 条记录"
|
|
||||||
if trade_n == 0:
|
if trade_n == 0:
|
||||||
msg += "(未拉到成交,请检查 API 权限/代理/EXCHANGE_POSITION_SYNC_FROM_BJ)"
|
msg = f"本次未拉到成交(0 笔),未更新记录。请检查 API 权限/代理;同步起点 EXCHANGE_POSITION_SYNC_FROM_BJ 会按 7 天分段拉取。"
|
||||||
if pos_n == 0 and trade_n > 0:
|
else:
|
||||||
msg += "(有成交但未识别出完整平仓,请核对持仓模式 BINANCE_POSITION_MODE)"
|
msg = f"成交 {trade_n} 笔,重建仓位 {pos_n} 条,本次更新 {updated}/{len(rows)} 条"
|
||||||
|
if pos_n == 0:
|
||||||
|
msg += "(有成交但未识别完整平仓,请核对 BINANCE_POSITION_MODE=hedge)"
|
||||||
if err:
|
if err:
|
||||||
msg += f" 错误: {err}"
|
msg += f" 接口错误: {err}"
|
||||||
return jsonify(
|
return jsonify(
|
||||||
{
|
{
|
||||||
"ok": True,
|
"ok": trade_n > 0 and (updated > 0 or pos_n > 0),
|
||||||
"synced": matched,
|
"synced": updated,
|
||||||
"candidates": len(rows),
|
"candidates": len(rows),
|
||||||
"positions": pos_n,
|
"positions": pos_n,
|
||||||
"trade_counts": tc,
|
"trade_counts": tc,
|
||||||
@@ -5541,45 +5534,62 @@ def _unified_symbol_for_match(symbol_str):
|
|||||||
return s
|
return s
|
||||||
|
|
||||||
|
|
||||||
def _fetch_my_trades_paginated(exchange_symbol, since_ms, until_ms=None, max_pages=40):
|
def _fetch_my_trades_paginated(exchange_symbol, since_ms, until_ms=None, max_pages=120):
|
||||||
"""分页拉取成交(Binance userTrades)。"""
|
"""
|
||||||
|
分页拉取 U 本位成交。Binance 限制:startTime~endTime 窗口最长 7 天,需分段请求。
|
||||||
|
"""
|
||||||
global _BINANCE_CLOSED_POS_CACHE
|
global _BINANCE_CLOSED_POS_CACHE
|
||||||
if not (BINANCE_API_KEY and BINANCE_API_SECRET):
|
if not (BINANCE_API_KEY and BINANCE_API_SECRET):
|
||||||
_BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = "未配置 API Key"
|
_BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = "未配置 API Key"
|
||||||
return []
|
return []
|
||||||
ensure_markets_loaded()
|
ensure_markets_loaded()
|
||||||
|
until_ms = int(until_ms) if until_ms else int(time.time() * 1000)
|
||||||
|
start_ms = int(since_ms) if since_ms else until_ms - 7 * 24 * 60 * 60 * 1000
|
||||||
|
if start_ms >= until_ms:
|
||||||
|
start_ms = until_ms - 7 * 24 * 60 * 60 * 1000
|
||||||
|
week_ms = 7 * 24 * 60 * 60 * 1000 - 5000
|
||||||
out = []
|
out = []
|
||||||
since = int(since_ms) if since_ms else None
|
seen_ids = set()
|
||||||
until_ms = int(until_ms) if until_ms else None
|
|
||||||
last_err = None
|
last_err = None
|
||||||
for page in range(max_pages):
|
window_start = start_ms
|
||||||
|
pages = 0
|
||||||
|
while window_start < until_ms and pages < max_pages:
|
||||||
|
window_end = min(window_start + week_ms, until_ms)
|
||||||
try:
|
try:
|
||||||
batch = exchange.fetch_my_trades(exchange_symbol, since=since, limit=1000)
|
batch = exchange.fetch_my_trades(
|
||||||
|
exchange_symbol,
|
||||||
|
since=window_start,
|
||||||
|
limit=1000,
|
||||||
|
params={"endTime": window_end},
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
last_err = str(e)
|
last_err = str(e)
|
||||||
if page == 0 and since:
|
if pages == 0:
|
||||||
try:
|
try:
|
||||||
batch = exchange.fetch_my_trades(exchange_symbol, limit=1000)
|
batch = exchange.fetch_my_trades(exchange_symbol, limit=1000)
|
||||||
|
window_start = until_ms
|
||||||
except Exception as e2:
|
except Exception as e2:
|
||||||
last_err = str(e2)
|
last_err = str(e2)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
if not batch:
|
pages += 1
|
||||||
break
|
if batch:
|
||||||
last_ts = None
|
for t in batch:
|
||||||
for t in batch:
|
tid = t.get("id")
|
||||||
ts = _coerce_ts_ms(t.get("timestamp"))
|
if tid is not None and tid in seen_ids:
|
||||||
if until_ms and ts and ts > until_ms:
|
continue
|
||||||
continue
|
ts = _coerce_ts_ms(t.get("timestamp"))
|
||||||
out.append(t)
|
if ts and ts < start_ms:
|
||||||
if ts:
|
continue
|
||||||
last_ts = ts
|
if ts and ts > until_ms:
|
||||||
if len(batch) < 1000:
|
continue
|
||||||
break
|
if tid is not None:
|
||||||
if last_ts is None:
|
seen_ids.add(tid)
|
||||||
break
|
out.append(t)
|
||||||
since = last_ts + 1
|
window_start = window_end + 1
|
||||||
|
if not batch and window_start < until_ms:
|
||||||
|
continue
|
||||||
if last_err:
|
if last_err:
|
||||||
_BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = last_err
|
_BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = last_err
|
||||||
return out
|
return out
|
||||||
@@ -5856,13 +5866,13 @@ def sync_trade_record_exchange_pnl(conn, record_id, commit=True, force=False):
|
|||||||
|
|
||||||
|
|
||||||
def sync_trade_records_from_exchange(conn, force=False):
|
def sync_trade_records_from_exchange(conn, force=False):
|
||||||
"""为 trade_records 回填盈亏:成交重建已平仓位 + 时间匹配(对齐 App 仓位历史)。"""
|
"""为 trade_records 回填盈亏:成交重建已平仓位 + 时间匹配(对齐 App 仓位历史)。返回本次更新条数。"""
|
||||||
global _LAST_EXCHANGE_PNL_SYNC_AT, _BINANCE_CLOSED_POS_CACHE
|
global _LAST_EXCHANGE_PNL_SYNC_AT, _BINANCE_CLOSED_POS_CACHE
|
||||||
if not exchange_private_api_configured():
|
if not exchange_private_api_configured():
|
||||||
return
|
return 0
|
||||||
now = time.time()
|
now = time.time()
|
||||||
if not force and now - _LAST_EXCHANGE_PNL_SYNC_AT < 25.0:
|
if not force and now - _LAST_EXCHANGE_PNL_SYNC_AT < 25.0:
|
||||||
return
|
return 0
|
||||||
if force:
|
if force:
|
||||||
_BINANCE_CLOSED_POS_CACHE["at"] = 0.0
|
_BINANCE_CLOSED_POS_CACHE["at"] = 0.0
|
||||||
if force:
|
if force:
|
||||||
@@ -5881,7 +5891,7 @@ def sync_trade_records_from_exchange(conn, force=False):
|
|||||||
).fetchall()
|
).fetchall()
|
||||||
if not candidates:
|
if not candidates:
|
||||||
_LAST_EXCHANGE_PNL_SYNC_AT = now
|
_LAST_EXCHANGE_PNL_SYNC_AT = now
|
||||||
return
|
return 0
|
||||||
symbols = list({tr["symbol"] for tr in candidates if tr["symbol"]})
|
symbols = list({tr["symbol"] for tr in candidates if tr["symbol"]})
|
||||||
try:
|
try:
|
||||||
hist = fetch_binance_closed_positions_history(symbols=symbols, force_refresh=force)
|
hist = fetch_binance_closed_positions_history(symbols=symbols, force_refresh=force)
|
||||||
@@ -5893,8 +5903,13 @@ def sync_trade_records_from_exchange(conn, force=False):
|
|||||||
sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or ""
|
sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or ""
|
||||||
if sk0 and str(sk0).startswith("pos|"):
|
if sk0 and str(sk0).startswith("pos|"):
|
||||||
used.add(str(sk0).strip())
|
used.add(str(sk0).strip())
|
||||||
matched = 0
|
updated = 0
|
||||||
for tr in candidates:
|
for tr in candidates:
|
||||||
|
rid = int(tr["id"])
|
||||||
|
before = conn.execute(
|
||||||
|
"SELECT exchange_sync_key, exchange_realized_pnl FROM trade_records WHERE id=?",
|
||||||
|
(rid,),
|
||||||
|
).fetchone()
|
||||||
pos, _ = match_trade_record_to_position(
|
pos, _ = match_trade_record_to_position(
|
||||||
tr,
|
tr,
|
||||||
hist,
|
hist,
|
||||||
@@ -5902,24 +5917,29 @@ def sync_trade_records_from_exchange(conn, force=False):
|
|||||||
unified_symbol_fn=_unified_symbol_for_match,
|
unified_symbol_fn=_unified_symbol_for_match,
|
||||||
to_ms_fn=_to_ms_with_fallback,
|
to_ms_fn=_to_ms_with_fallback,
|
||||||
)
|
)
|
||||||
if not pos:
|
if pos:
|
||||||
continue
|
sk = pos.get("sync_key")
|
||||||
sk = pos.get("sync_key")
|
if sk and sk not in used:
|
||||||
if not sk or sk in used:
|
if _apply_closed_position_to_trade_record(conn, rid, pos):
|
||||||
continue
|
used.add(sk)
|
||||||
if _apply_closed_position_to_trade_record(conn, int(tr["id"]), pos):
|
updated += 1
|
||||||
used.add(sk)
|
continue
|
||||||
matched += 1
|
sk0 = (before["exchange_sync_key"] if before else None) or ""
|
||||||
for tr in candidates:
|
|
||||||
sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or ""
|
|
||||||
if sk0 and str(sk0).startswith("pos|"):
|
if sk0 and str(sk0).startswith("pos|"):
|
||||||
continue
|
continue
|
||||||
sync_trade_record_exchange_pnl(conn, int(tr["id"]), commit=False, force=force)
|
if sync_trade_record_exchange_pnl(conn, rid, commit=False, force=force):
|
||||||
|
after = conn.execute(
|
||||||
|
"SELECT exchange_sync_key FROM trade_records WHERE id=?", (rid,)
|
||||||
|
).fetchone()
|
||||||
|
sk1 = (after["exchange_sync_key"] if after else None) or ""
|
||||||
|
if sk1 != sk0 or (before and before["exchange_realized_pnl"] is None):
|
||||||
|
updated += 1
|
||||||
_LAST_EXCHANGE_PNL_SYNC_AT = now
|
_LAST_EXCHANGE_PNL_SYNC_AT = now
|
||||||
try:
|
try:
|
||||||
conn.commit()
|
conn.commit()
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
return updated
|
||||||
|
|
||||||
|
|
||||||
# ====================== 主页面 ======================
|
# ====================== 主页面 ======================
|
||||||
|
|||||||
Reference in New Issue
Block a user