diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 84982e7..554445e 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -5480,32 +5480,25 @@ def api_sync_exchange_pnl(): fetch_binance_closed_positions_history(symbols=symbols, force_refresh=True) except Exception: pass - sync_trade_records_from_exchange(conn, force=True) - matched = 0 - for r in rows: - tr = conn.execute( - "SELECT exchange_realized_pnl, exchange_sync_key FROM trade_records WHERE id=?", - (int(r["id"]),), - ).fetchone() - if tr and tr["exchange_realized_pnl"] is not None: - matched += 1 + updated = sync_trade_records_from_exchange(conn, force=True) conn.commit() conn.close() tc = _BINANCE_CLOSED_POS_CACHE.get("trade_counts") or {} trade_n = sum(int(v) for v in tc.values()) pos_n = len(_BINANCE_CLOSED_POS_CACHE.get("hist") or []) err = _BINANCE_CLOSED_POS_CACHE.get("last_err") or {} - msg = f"成交 {trade_n} 笔,重建仓位 {pos_n} 条,已写入 {matched}/{len(rows)} 条记录" if trade_n == 0: - msg += "(未拉到成交,请检查 API 权限/代理/EXCHANGE_POSITION_SYNC_FROM_BJ)" - if pos_n == 0 and trade_n > 0: - msg += "(有成交但未识别出完整平仓,请核对持仓模式 BINANCE_POSITION_MODE)" + msg = f"本次未拉到成交(0 笔),未更新记录。请检查 API 权限/代理;同步起点 EXCHANGE_POSITION_SYNC_FROM_BJ 会按 7 天分段拉取。" + else: + msg = f"成交 {trade_n} 笔,重建仓位 {pos_n} 条,本次更新 {updated}/{len(rows)} 条" + if pos_n == 0: + msg += "(有成交但未识别完整平仓,请核对 BINANCE_POSITION_MODE=hedge)" if err: - msg += f" 错误: {err}" + msg += f" 接口错误: {err}" return jsonify( { - "ok": True, - "synced": matched, + "ok": trade_n > 0 and (updated > 0 or pos_n > 0), + "synced": updated, "candidates": len(rows), "positions": pos_n, "trade_counts": tc, @@ -5541,45 +5534,62 @@ def _unified_symbol_for_match(symbol_str): return s -def _fetch_my_trades_paginated(exchange_symbol, since_ms, until_ms=None, max_pages=40): - """分页拉取成交(Binance userTrades)。""" +def _fetch_my_trades_paginated(exchange_symbol, since_ms, until_ms=None, max_pages=120): + """ + 分页拉取 U 本位成交。Binance 限制:startTime~endTime 窗口最长 7 天,需分段请求。 + """ global _BINANCE_CLOSED_POS_CACHE if not (BINANCE_API_KEY and BINANCE_API_SECRET): _BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = "未配置 API Key" return [] ensure_markets_loaded() + until_ms = int(until_ms) if until_ms else int(time.time() * 1000) + start_ms = int(since_ms) if since_ms else until_ms - 7 * 24 * 60 * 60 * 1000 + if start_ms >= until_ms: + start_ms = until_ms - 7 * 24 * 60 * 60 * 1000 + week_ms = 7 * 24 * 60 * 60 * 1000 - 5000 out = [] - since = int(since_ms) if since_ms else None - until_ms = int(until_ms) if until_ms else None + seen_ids = set() last_err = None - for page in range(max_pages): + window_start = start_ms + pages = 0 + while window_start < until_ms and pages < max_pages: + window_end = min(window_start + week_ms, until_ms) try: - batch = exchange.fetch_my_trades(exchange_symbol, since=since, limit=1000) + batch = exchange.fetch_my_trades( + exchange_symbol, + since=window_start, + limit=1000, + params={"endTime": window_end}, + ) except Exception as e: last_err = str(e) - if page == 0 and since: + if pages == 0: try: batch = exchange.fetch_my_trades(exchange_symbol, limit=1000) + window_start = until_ms except Exception as e2: last_err = str(e2) break else: break - if not batch: - break - last_ts = None - for t in batch: - ts = _coerce_ts_ms(t.get("timestamp")) - if until_ms and ts and ts > until_ms: - continue - out.append(t) - if ts: - last_ts = ts - if len(batch) < 1000: - break - if last_ts is None: - break - since = last_ts + 1 + pages += 1 + if batch: + for t in batch: + tid = t.get("id") + if tid is not None and tid in seen_ids: + continue + ts = _coerce_ts_ms(t.get("timestamp")) + if ts and ts < start_ms: + continue + if ts and ts > until_ms: + continue + if tid is not None: + seen_ids.add(tid) + out.append(t) + window_start = window_end + 1 + if not batch and window_start < until_ms: + continue if last_err: _BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = last_err return out @@ -5856,13 +5866,13 @@ def sync_trade_record_exchange_pnl(conn, record_id, commit=True, force=False): def sync_trade_records_from_exchange(conn, force=False): - """为 trade_records 回填盈亏:成交重建已平仓位 + 时间匹配(对齐 App 仓位历史)。""" + """为 trade_records 回填盈亏:成交重建已平仓位 + 时间匹配(对齐 App 仓位历史)。返回本次更新条数。""" global _LAST_EXCHANGE_PNL_SYNC_AT, _BINANCE_CLOSED_POS_CACHE if not exchange_private_api_configured(): - return + return 0 now = time.time() if not force and now - _LAST_EXCHANGE_PNL_SYNC_AT < 25.0: - return + return 0 if force: _BINANCE_CLOSED_POS_CACHE["at"] = 0.0 if force: @@ -5881,7 +5891,7 @@ def sync_trade_records_from_exchange(conn, force=False): ).fetchall() if not candidates: _LAST_EXCHANGE_PNL_SYNC_AT = now - return + return 0 symbols = list({tr["symbol"] for tr in candidates if tr["symbol"]}) try: hist = fetch_binance_closed_positions_history(symbols=symbols, force_refresh=force) @@ -5893,8 +5903,13 @@ def sync_trade_records_from_exchange(conn, force=False): sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or "" if sk0 and str(sk0).startswith("pos|"): used.add(str(sk0).strip()) - matched = 0 + updated = 0 for tr in candidates: + rid = int(tr["id"]) + before = conn.execute( + "SELECT exchange_sync_key, exchange_realized_pnl FROM trade_records WHERE id=?", + (rid,), + ).fetchone() pos, _ = match_trade_record_to_position( tr, hist, @@ -5902,24 +5917,29 @@ def sync_trade_records_from_exchange(conn, force=False): unified_symbol_fn=_unified_symbol_for_match, to_ms_fn=_to_ms_with_fallback, ) - if not pos: - continue - sk = pos.get("sync_key") - if not sk or sk in used: - continue - if _apply_closed_position_to_trade_record(conn, int(tr["id"]), pos): - used.add(sk) - matched += 1 - for tr in candidates: - sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or "" + if pos: + sk = pos.get("sync_key") + if sk and sk not in used: + if _apply_closed_position_to_trade_record(conn, rid, pos): + used.add(sk) + updated += 1 + continue + sk0 = (before["exchange_sync_key"] if before else None) or "" if sk0 and str(sk0).startswith("pos|"): continue - sync_trade_record_exchange_pnl(conn, int(tr["id"]), commit=False, force=force) + if sync_trade_record_exchange_pnl(conn, rid, commit=False, force=force): + after = conn.execute( + "SELECT exchange_sync_key FROM trade_records WHERE id=?", (rid,) + ).fetchone() + sk1 = (after["exchange_sync_key"] if after else None) or "" + if sk1 != sk0 or (before and before["exchange_realized_pnl"] is None): + updated += 1 _LAST_EXCHANGE_PNL_SYNC_AT = now try: conn.commit() except Exception: pass + return updated # ====================== 主页面 ======================