diff --git a/crypto_monitor_binance/.env.example b/crypto_monitor_binance/.env.example index 28f5336..bb0e506 100644 --- a/crypto_monitor_binance/.env.example +++ b/crypto_monitor_binance/.env.example @@ -75,10 +75,8 @@ BINANCE_TRIGGER_WORKING_TYPE=CONTRACT_PRICE # EXCHANGE_DISPLAY_NAME=Binance # 企业微信推送里展示的账户备注 # BINANCE_ACCOUNT_LABEL=binance实盘账户 -# 盈亏同步:false=按仓位历史口径(已实现盈亏+手续费,不含资金费);true=含资金费 +# 平仓盈亏估算:false=按仓位历史口径(已实现盈亏+手续费,不含资金费);true=含资金费 # BINANCE_PNL_INCLUDE_FUNDING=false -# 与币安 App 仓位历史对齐目标误差(USDT),默认 0.05 -# BINANCE_PNL_MATCH_TOLERANCE=0.05 # ============================================================================= # 关键位门控(页面「关键位监控」规则条与 _key_hard_checks 共用) diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 554445e..1a41356 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -44,10 +44,6 @@ from fib_key_monitor_lib import ( key_signal_type_for_trade_record, stored_key_signal_type, ) -from binance_closed_positions_lib import ( - match_trade_record_to_position, - rebuild_closed_positions_from_trades, -) from history_window_lib import ( PRESET_CUSTOM, PRESET_UTC_LAST24H, @@ -164,8 +160,6 @@ KEY_SIZING_USE_ZERO_POSITION_SNAPSHOT = os.getenv("KEY_SIZING_USE_ZERO_POSITION_ ORDER_MONITOR_TYPE_MANUAL = "下单监控" ORDER_MONITOR_TYPE_KEY_AUTO = "关键位监控" KEY_MONITOR_AUTO_TYPES = frozenset({"箱体突破", "收敛突破"}) -EXCHANGE_POSITION_SYNC_FROM_BJ = (os.getenv("EXCHANGE_POSITION_SYNC_FROM_BJ") or "").strip() -EXCHANGE_POSITION_HISTORY_LIMIT = max(50, min(1000, int(os.getenv("EXCHANGE_POSITION_HISTORY_LIMIT", "200")))) # 与币安 App「仓位历史-实现盈亏」对齐:默认仅 REALIZED_PNL(手续费另计;避免与 COMMISSION 重复扣) BINANCE_APP_PNL_INCOME_TYPES = frozenset({"REALIZED_PNL"}) BINANCE_APP_PNL_INCOME_WITH_FEE = frozenset({"REALIZED_PNL", "COMMISSION"}) @@ -177,10 +171,6 @@ BINANCE_PNL_INCLUDE_FUNDING = os.getenv("BINANCE_PNL_INCLUDE_FUNDING", "false"). "true", "yes", ) -# 与币安 App 仓位历史对齐目标误差(USDT) -BINANCE_PNL_MATCH_TOLERANCE = max(0.01, float(os.getenv("BINANCE_PNL_MATCH_TOLERANCE", "0.05"))) -_LAST_EXCHANGE_PNL_SYNC_AT = 0.0 -_BINANCE_CLOSED_POS_CACHE = {"at": 0.0, "hist": [], "trade_counts": {}, "last_err": {}} KEY_MONITOR_ALERT_ONLY_TYPES = frozenset({"关键阻力位", "关键支撑位"}) AUTO_TRANSFER_ENABLED = os.getenv("AUTO_TRANSFER_ENABLED", "false").lower() == "true" AUTO_TRANSFER_AMOUNT = float(os.getenv("AUTO_TRANSFER_AMOUNT", "30")) @@ -2268,39 +2258,6 @@ def resolve_trade_pnl_amount( if last_ts and not closed_at_str: closed_at_str = ms_to_app_local_str(int(last_ts)) close_ms = int(last_ts) - if close_ms and open_ms: - try: - hist = fetch_binance_closed_positions_history(symbols=[sym]) - fake = { - "symbol": sym, - "direction": direction, - "opened_at": opened_at_str or (row["opened_at"] if hasattr(row, "keys") else ""), - "closed_at": closed_at_str, - "opened_at_ms": open_ms, - "closed_at_ms": close_ms, - } - pos, _ = match_trade_record_to_position( - fake, - hist, - set(), - unified_symbol_fn=_unified_symbol_for_match, - to_ms_fn=_to_ms_with_fallback, - ) - if pos and pos.get("pnl") is not None: - eo = ms_to_app_local_str(pos["open_ms"]) if pos.get("open_ms") else None - ec = ms_to_app_local_str(pos["close_ms"]) if pos.get("close_ms") else None - ep = pos.get("exit_price") - if ep and (exit_price is None or float(exit_price or 0) <= 0): - exit_price = float(ep) - return ( - float(pos["pnl"]), - exit_price, - eo, - ec, - pos.get("sync_key"), - ) - except Exception: - pass net, sync_key, eo, ec = fetch_binance_net_pnl_for_trade( ex_sym, direction, open_ms, close_ms, closing_trades=closing_trades ) @@ -2468,13 +2425,7 @@ def insert_trade_record( open_ts, open_ts_ms, close_ts, close_ts_ms, result, miss_reason, exchange_trade_id, er or None ) ) - record_id = int(cur.lastrowid or 0) - if record_id and close_ts and result != "错过" and exchange_private_api_configured(): - try: - sync_trade_record_exchange_pnl(conn, record_id, commit=False) - except Exception: - pass - return record_id + return int(cur.lastrowid or 0) def calc_duration_text(open_str, close_str): @@ -5435,78 +5386,6 @@ def api_sync_positions(): return jsonify({"ok": True, "days": days, "synced": int(synced)}) -@app.route("/api/sync_exchange_pnl", methods=["POST"]) -@login_required -def api_sync_exchange_pnl(): - """立即为近期未同步记录回填 Binance 净盈亏(含手续费)。""" - if not exchange_private_api_configured(): - return jsonify({"ok": False, "msg": "未配置 Binance API,无法同步"}), 400 - payload = request.get_json(silent=True) or {} - limit = 120 - force_all = str(payload.get("force", "")).lower() in ("1", "true", "yes") - try: - if payload.get("limit") is not None: - limit = max(1, min(500, int(payload.get("limit")))) - except (TypeError, ValueError): - pass - global _LAST_EXCHANGE_PNL_SYNC_AT, _BINANCE_CLOSED_POS_CACHE - _LAST_EXCHANGE_PNL_SYNC_AT = 0.0 - _BINANCE_CLOSED_POS_CACHE["at"] = 0.0 - conn = get_db() - if force_all: - rows = conn.execute( - "SELECT id FROM trade_records ORDER BY id DESC LIMIT ?", - (limit,), - ).fetchall() - else: - rows = conn.execute( - """ - SELECT id FROM trade_records - WHERE (exchange_sync_key IS NULL OR TRIM(exchange_sync_key) = '') - ORDER BY id DESC - LIMIT ? - """, - (limit,), - ).fetchall() - sym_rows = conn.execute( - f""" - SELECT DISTINCT symbol FROM trade_records - WHERE id IN ({",".join("?" * len(rows)) if rows else "NULL"}) - """, - tuple(int(r["id"]) for r in rows), - ).fetchall() if rows else [] - symbols = [sr["symbol"] for sr in sym_rows if sr["symbol"]] - try: - fetch_binance_closed_positions_history(symbols=symbols, force_refresh=True) - except Exception: - pass - updated = sync_trade_records_from_exchange(conn, force=True) - conn.commit() - conn.close() - tc = _BINANCE_CLOSED_POS_CACHE.get("trade_counts") or {} - trade_n = sum(int(v) for v in tc.values()) - pos_n = len(_BINANCE_CLOSED_POS_CACHE.get("hist") or []) - err = _BINANCE_CLOSED_POS_CACHE.get("last_err") or {} - if trade_n == 0: - msg = f"本次未拉到成交(0 笔),未更新记录。请检查 API 权限/代理;同步起点 EXCHANGE_POSITION_SYNC_FROM_BJ 会按 7 天分段拉取。" - else: - msg = f"成交 {trade_n} 笔,重建仓位 {pos_n} 条,本次更新 {updated}/{len(rows)} 条" - if pos_n == 0: - msg += "(有成交但未识别完整平仓,请核对 BINANCE_POSITION_MODE=hedge)" - if err: - msg += f" 接口错误: {err}" - return jsonify( - { - "ok": trade_n > 0 and (updated > 0 or pos_n > 0), - "synced": updated, - "candidates": len(rows), - "positions": pos_n, - "trade_counts": tc, - "msg": msg, - } - ) - - def _coerce_ts_ms(val): if val is None or val == "": return None @@ -5521,179 +5400,6 @@ def _coerce_ts_ms(val): return int(v * 1000.0) -def _unified_symbol_for_match(symbol_str): - s = (symbol_str or "").strip().upper() - if not s: - return "" - if ":" in s: - s = s.split(":")[0] - if "_" in s and "/" not in s: - s = s.replace("_", "/") - if s.endswith("USDT") and "/" not in s and len(s) > 4: - s = f"{s[:-4]}/USDT" - return s - - -def _fetch_my_trades_paginated(exchange_symbol, since_ms, until_ms=None, max_pages=120): - """ - 分页拉取 U 本位成交。Binance 限制:startTime~endTime 窗口最长 7 天,需分段请求。 - """ - global _BINANCE_CLOSED_POS_CACHE - if not (BINANCE_API_KEY and BINANCE_API_SECRET): - _BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = "未配置 API Key" - return [] - ensure_markets_loaded() - until_ms = int(until_ms) if until_ms else int(time.time() * 1000) - start_ms = int(since_ms) if since_ms else until_ms - 7 * 24 * 60 * 60 * 1000 - if start_ms >= until_ms: - start_ms = until_ms - 7 * 24 * 60 * 60 * 1000 - week_ms = 7 * 24 * 60 * 60 * 1000 - 5000 - out = [] - seen_ids = set() - last_err = None - window_start = start_ms - pages = 0 - while window_start < until_ms and pages < max_pages: - window_end = min(window_start + week_ms, until_ms) - try: - batch = exchange.fetch_my_trades( - exchange_symbol, - since=window_start, - limit=1000, - params={"endTime": window_end}, - ) - except Exception as e: - last_err = str(e) - if pages == 0: - try: - batch = exchange.fetch_my_trades(exchange_symbol, limit=1000) - window_start = until_ms - except Exception as e2: - last_err = str(e2) - break - else: - break - pages += 1 - if batch: - for t in batch: - tid = t.get("id") - if tid is not None and tid in seen_ids: - continue - ts = _coerce_ts_ms(t.get("timestamp")) - if ts and ts < start_ms: - continue - if ts and ts > until_ms: - continue - if tid is not None: - seen_ids.add(tid) - out.append(t) - window_start = window_end + 1 - if not batch and window_start < until_ms: - continue - if last_err: - _BINANCE_CLOSED_POS_CACHE["last_err"][exchange_symbol] = last_err - return out - - -def fetch_binance_closed_positions_history(symbols=None, force_refresh=False): - """ - 从成交重建已平仓位(对齐 App 仓位历史实现盈亏,不含资金费)。 - symbols: 可选 symbol 列表(如 NEAR/USDT),为空则仅返回缓存。 - """ - global _BINANCE_CLOSED_POS_CACHE - now = time.time() - if ( - not force_refresh - and _BINANCE_CLOSED_POS_CACHE["hist"] - and now - float(_BINANCE_CLOSED_POS_CACHE["at"] or 0) < 25.0 - and not symbols - ): - return list(_BINANCE_CLOSED_POS_CACHE["hist"]) - if not exchange_private_api_configured(): - return [] - sym_list = [] - for s in symbols or []: - try: - sym_list.append(normalize_exchange_symbol(s)) - except Exception: - continue - if not sym_list: - return list(_BINANCE_CLOSED_POS_CACHE["hist"] or []) - since_ms = exchange_position_sync_since_ms() - until_ms = int(time.time() * 1000) + 120_000 - trades_by_symbol = {} - trade_counts = {} - for ex_sym in sym_list: - trades = _fetch_my_trades_paginated(ex_sym, since_ms, until_ms) - trades_by_symbol[ex_sym] = trades - trade_counts[ex_sym] = len(trades) - - def _contract_size(ex_sym): - try: - ensure_markets_loaded() - return float(exchange.market(ex_sym).get("contractSize") or 1) - except Exception: - return 1.0 - - hist = rebuild_closed_positions_from_trades( - trades_by_symbol, - unified_symbol_fn=_unified_symbol_for_match, - position_mode=BINANCE_POSITION_MODE, - contract_size_fn=_contract_size, - since_ms=since_ms, - ) - prev = list(_BINANCE_CLOSED_POS_CACHE["hist"] or []) - if prev and not force_refresh: - keys = {h.get("sync_key") for h in prev if h.get("sync_key")} - for h in hist: - sk = h.get("sync_key") - if sk and sk not in keys: - prev.append(h) - hist = sorted(prev, key=lambda x: int(x.get("close_ms") or 0), reverse=True) - _BINANCE_CLOSED_POS_CACHE["at"] = now - _BINANCE_CLOSED_POS_CACHE["hist"] = hist - _BINANCE_CLOSED_POS_CACHE["trade_counts"] = trade_counts - return hist - - -def _apply_closed_position_to_trade_record(conn, trade_id, pos): - pnl_val = pos.get("pnl") - if pnl_val is None: - return False - sk = pos.get("sync_key") or "" - eo = ms_to_app_local_str(pos["open_ms"]) if pos.get("open_ms") else None - ec = ms_to_app_local_str(pos["close_ms"]) if pos.get("close_ms") else None - conn.execute( - """ - UPDATE trade_records - SET exchange_realized_pnl = ?, exchange_opened_at = ?, exchange_closed_at = ?, - exchange_sync_key = ?, pnl_amount = ? - WHERE id = ? - """, - (float(pnl_val), eo, ec, sk, float(pnl_val), int(trade_id)), - ) - return True - - -def exchange_position_sync_since_ms(): - s = EXCHANGE_POSITION_SYNC_FROM_BJ - if s: - for fmt, ln in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d", 10)): - try: - chunk = s[:ln] if len(s) >= ln else s[:10] - dt = datetime.strptime(chunk, fmt) - aware = dt.replace(tzinfo=APP_TZ) - return int(aware.timestamp() * 1000) - except Exception: - continue - dt0 = app_now() - timedelta(days=90) - try: - aware0 = datetime(dt0.year, dt0.month, dt0.day, 0, 0, 0, tzinfo=APP_TZ) - except Exception: - aware0 = datetime.now(APP_TZ) - return int(aware0.timestamp() * 1000) - - def _fetch_binance_income_entries(exchange_symbol, start_ms, end_ms): if not hasattr(exchange, "fapiPrivateGetIncome"): return [] @@ -5783,165 +5489,6 @@ def fetch_binance_net_pnl_for_trade( return None, None, None, None -def sync_trade_record_exchange_pnl(conn, record_id, commit=True, force=False): - """单条 trade_records:优先按成交重建的已平仓位匹配(对齐 App 仓位历史)。""" - if not exchange_private_api_configured(): - return False - tr = conn.execute("SELECT * FROM trade_records WHERE id=?", (int(record_id),)).fetchone() - if not tr: - return False - sk_existing = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or "" - if not force and str(sk_existing).strip().startswith("pos|"): - return tr["exchange_realized_pnl"] is not None - if not force and str(sk_existing).strip() and not str(sk_existing).startswith("pos|"): - pass - open_ms = _to_ms_with_fallback( - tr["opened_at_ms"] if "opened_at_ms" in tr.keys() else None, tr["opened_at"] - ) - close_ms = _to_ms_with_fallback( - tr["closed_at_ms"] if "closed_at_ms" in tr.keys() else None, tr["closed_at"] - ) - if open_ms is None or close_ms is None: - return False - try: - hist = fetch_binance_closed_positions_history( - symbols=[tr["symbol"]], force_refresh=force - ) - except Exception: - hist = [] - used = set() - if force: - used = set() - else: - rows = conn.execute( - "SELECT exchange_sync_key FROM trade_records WHERE exchange_sync_key LIKE 'pos|%'" - ).fetchall() - for r in rows: - sk = (r["exchange_sync_key"] if "exchange_sync_key" in r.keys() else None) or "" - if sk: - used.add(str(sk).strip()) - pos, _ = match_trade_record_to_position( - tr, - hist, - used, - unified_symbol_fn=_unified_symbol_for_match, - to_ms_fn=_to_ms_with_fallback, - ) - if pos: - if _apply_closed_position_to_trade_record(conn, int(record_id), pos): - if commit: - try: - conn.commit() - except Exception: - pass - return True - direction = (tr["direction"] or "long").strip().lower() - try: - ex_sym = normalize_exchange_symbol(tr["symbol"]) - except Exception: - return False - closing_trades = fetch_closing_fills_for_record( - ex_sym, direction, tr["opened_at"], tr["closed_at"], opened_at_ms=open_ms, closed_at_ms=close_ms - ) - net, sync_key, eo, ec = fetch_binance_net_pnl_for_trade( - ex_sym, direction, open_ms, close_ms, closing_trades=closing_trades - ) - if net is None or not sync_key: - return False - conn.execute( - """ - UPDATE trade_records - SET exchange_realized_pnl = ?, exchange_opened_at = ?, exchange_closed_at = ?, - exchange_sync_key = ?, pnl_amount = ? - WHERE id = ? - """, - (float(net), eo, ec, sync_key, float(net), int(record_id)), - ) - if commit: - try: - conn.commit() - except Exception: - pass - return True - - -def sync_trade_records_from_exchange(conn, force=False): - """为 trade_records 回填盈亏:成交重建已平仓位 + 时间匹配(对齐 App 仓位历史)。返回本次更新条数。""" - global _LAST_EXCHANGE_PNL_SYNC_AT, _BINANCE_CLOSED_POS_CACHE - if not exchange_private_api_configured(): - return 0 - now = time.time() - if not force and now - _LAST_EXCHANGE_PNL_SYNC_AT < 25.0: - return 0 - if force: - _BINANCE_CLOSED_POS_CACHE["at"] = 0.0 - if force: - candidates = conn.execute( - "SELECT * FROM trade_records ORDER BY id DESC LIMIT 200" - ).fetchall() - else: - candidates = conn.execute( - """ - SELECT * FROM trade_records - WHERE (exchange_sync_key IS NULL OR TRIM(exchange_sync_key) = '' - OR exchange_sync_key NOT LIKE 'pos|%') - ORDER BY id DESC - LIMIT 200 - """ - ).fetchall() - if not candidates: - _LAST_EXCHANGE_PNL_SYNC_AT = now - return 0 - symbols = list({tr["symbol"] for tr in candidates if tr["symbol"]}) - try: - hist = fetch_binance_closed_positions_history(symbols=symbols, force_refresh=force) - except Exception: - hist = [] - used = set() - if not force: - for tr in candidates: - sk0 = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or "" - if sk0 and str(sk0).startswith("pos|"): - used.add(str(sk0).strip()) - updated = 0 - for tr in candidates: - rid = int(tr["id"]) - before = conn.execute( - "SELECT exchange_sync_key, exchange_realized_pnl FROM trade_records WHERE id=?", - (rid,), - ).fetchone() - pos, _ = match_trade_record_to_position( - tr, - hist, - used, - unified_symbol_fn=_unified_symbol_for_match, - to_ms_fn=_to_ms_with_fallback, - ) - if pos: - sk = pos.get("sync_key") - if sk and sk not in used: - if _apply_closed_position_to_trade_record(conn, rid, pos): - used.add(sk) - updated += 1 - continue - sk0 = (before["exchange_sync_key"] if before else None) or "" - if sk0 and str(sk0).startswith("pos|"): - continue - if sync_trade_record_exchange_pnl(conn, rid, commit=False, force=force): - after = conn.execute( - "SELECT exchange_sync_key FROM trade_records WHERE id=?", (rid,) - ).fetchone() - sk1 = (after["exchange_sync_key"] if after else None) or "" - if sk1 != sk0 or (before and before["exchange_realized_pnl"] is None): - updated += 1 - _LAST_EXCHANGE_PNL_SYNC_AT = now - try: - conn.commit() - except Exception: - pass - return updated - - # ====================== 主页面 ====================== def render_main_page(page="trade"): now = app_now() @@ -5966,11 +5513,6 @@ def render_main_page(page="trade"): order_list = [] for o in raw_order_list: order_list.append(enrich_order_item(row_to_dict(o), current_capital)) - if exchange_private_api_configured(): - try: - sync_trade_records_from_exchange(conn) - except Exception: - pass raw_records = conn.execute( "SELECT * FROM trade_records WHERE COALESCE(closed_at, created_at, opened_at) >= ? " "AND COALESCE(closed_at, created_at, opened_at) <= ? ORDER BY id DESC LIMIT 1000", diff --git a/crypto_monitor_binance/binance_closed_positions_lib.py b/crypto_monitor_binance/binance_closed_positions_lib.py deleted file mode 100644 index 0cfaff8..0000000 --- a/crypto_monitor_binance/binance_closed_positions_lib.py +++ /dev/null @@ -1,286 +0,0 @@ -""" -从 Binance U 本位成交重建「已平仓位」列表,口径对齐 App 仓位历史(实现盈亏,不含资金费)。 -""" - -from __future__ import annotations - - -def _trade_id(trade): - info = trade.get("info") if isinstance(trade.get("info"), dict) else {} - for k in ("tradeId", "trade_id"): - v = info.get(k) - if v is not None and str(v).strip() != "": - return str(v).strip() - oid = trade.get("id") - return str(oid).strip() if oid is not None else "" - - -def trade_pnl_contribution(trade): - """单笔成交对实现盈亏的贡献:realizedPnl + 手续费(不重复扣 commission)。""" - info = trade.get("info") if isinstance(trade.get("info"), dict) else {} - total = 0.0 - has = False - rp = info.get("realizedPnl") - if rp is not None and str(rp).strip() != "": - try: - total += float(rp) - has = True - except (TypeError, ValueError): - pass - fee = trade.get("fee") - if isinstance(fee, dict) and fee.get("cost") is not None: - try: - total += float(fee["cost"]) - has = True - except (TypeError, ValueError): - pass - elif info.get("commission") is not None and str(info.get("commission")).strip() != "": - try: - c = float(info["commission"]) - total -= abs(c) if c > 0 else abs(c) - has = True - except (TypeError, ValueError): - pass - return total if has else 0.0 - - -def _trade_belongs_to_direction(trade, direction, position_mode): - direction = (direction or "long").strip().lower() - info = trade.get("info") if isinstance(trade.get("info"), dict) else {} - pos_side = (info.get("posSide") or trade.get("posSide") or "").strip().lower() - side = (trade.get("side") or "").strip().lower() - if position_mode == "hedge": - if direction == "long": - if pos_side in ("short",): - return False - if pos_side in ("long",): - return True - return side in ("buy", "sell") - if pos_side in ("long",): - return False - if pos_side in ("short",): - return True - return side in ("buy", "sell") - return True - - -def _leg_delta(trade, direction, position_mode): - """正数=加仓,负数=减仓(合约张数,未乘 contractSize)。""" - if not _trade_belongs_to_direction(trade, direction, position_mode): - return 0.0 - try: - amount = abs(float(trade.get("amount") or 0)) - except (TypeError, ValueError): - return 0.0 - if amount <= 0: - return 0.0 - side = (trade.get("side") or "").strip().lower() - info = trade.get("info") if isinstance(trade.get("info"), dict) else {} - pos_side = (info.get("posSide") or trade.get("posSide") or "").strip().lower() - direction = (direction or "long").strip().lower() - if position_mode == "hedge": - if direction == "long": - return amount if side == "buy" else -amount - return amount if side == "sell" else -amount - if direction == "long": - return amount if side == "buy" else -amount - return amount if side == "sell" else -amount - - -def rebuild_closed_positions_for_leg( - symbol_u, - direction, - trades, - *, - position_mode="hedge", - contract_size=1.0, - qty_eps=1e-9, - since_ms=None, -): - """从按时间排序的成交重建某一方向的已平仓位列表。""" - legs = [t for t in trades if _trade_belongs_to_direction(t, direction, position_mode)] - legs.sort(key=lambda x: int(x.get("timestamp") or 0)) - closed = [] - qty = 0.0 - open_ms = None - since_anchor_ms = int(since_ms) if since_ms else None - if since_anchor_ms is None and legs: - since_anchor_ms = int(legs[0].get("timestamp") or 0) or None - pnl_accum = 0.0 - close_ms = None - open_cost = 0.0 - open_qty = 0.0 - close_cost = 0.0 - close_qty = 0.0 - cycle_ids = [] - - def _flush(): - nonlocal qty, open_ms, pnl_accum, close_ms, open_cost, open_qty, close_cost, close_qty, cycle_ids - if open_ms is None or close_ms is None: - return - pnl_val = round(pnl_accum, 2) - entry = (open_cost / open_qty) if open_qty > 0 else None - exit_p = (close_cost / close_qty) if close_qty > 0 else None - sk = f"pos|{symbol_u}|{direction}|{open_ms}|{close_ms}|{pnl_val}" - closed.append( - { - "symbol_u": symbol_u, - "side": direction, - "open_ms": open_ms, - "close_ms": close_ms, - "pnl": pnl_val, - "entry_price": entry, - "exit_price": exit_p, - "sync_key": sk, - "trade_ids": list(cycle_ids), - } - ) - qty = 0.0 - open_ms = None - pnl_accum = 0.0 - close_ms = None - open_cost = open_qty = close_cost = close_qty = 0.0 - cycle_ids = [] - - for t in legs: - delta = _leg_delta(t, direction, position_mode) - if delta == 0: - continue - ts = int(t.get("timestamp") or 0) - try: - price = float(t.get("price") or 0) - except (TypeError, ValueError): - price = 0.0 - coin = abs(delta) * float(contract_size) - tid = _trade_id(t) - - if qty <= qty_eps and delta > 0: - open_ms = ts - pnl_accum = 0.0 - open_cost = open_qty = close_cost = close_qty = 0.0 - cycle_ids = [] - qty = delta - open_cost += price * coin - open_qty += coin - if tid: - cycle_ids.append(tid) - continue - - if delta > 0 and qty > qty_eps: - qty += delta - open_cost += price * coin - open_qty += coin - if tid: - cycle_ids.append(tid) - continue - - if delta < 0: - if qty <= qty_eps: - # 开仓早于拉取窗口:从首笔减仓起视为一段已平仓位 - qty = abs(delta) - open_ms = open_ms or since_anchor_ms or ts - pnl_accum = 0.0 - open_cost = open_qty = close_cost = close_qty = 0.0 - cycle_ids = [] - reduce = min(qty, abs(delta)) - qty -= reduce - pnl_accum += trade_pnl_contribution(t) - close_cost += price * reduce * float(contract_size) - close_qty += reduce * float(contract_size) - close_ms = ts - if tid: - cycle_ids.append(tid) - if qty <= qty_eps: - qty = 0.0 - _flush() - - return closed - - -def rebuild_closed_positions_from_trades( - trades_by_symbol, - *, - unified_symbol_fn, - position_mode="hedge", - contract_size_fn=None, - since_ms=None, -): - """ - trades_by_symbol: {exchange_symbol: [ccxt trade dict, ...]} - 返回已平仓位列表(含 symbol_u, side, open_ms, close_ms, pnl, sync_key)。 - """ - out = [] - for ex_sym, trades in (trades_by_symbol or {}).items(): - if not trades: - continue - symbol_u = unified_symbol_fn(ex_sym) if unified_symbol_fn else ex_sym - cs = 1.0 - if contract_size_fn: - try: - cs = float(contract_size_fn(ex_sym) or 1.0) - except Exception: - cs = 1.0 - for direction in ("long", "short"): - out.extend( - rebuild_closed_positions_for_leg( - symbol_u, - direction, - trades, - position_mode=position_mode, - contract_size=cs, - since_ms=since_ms, - ) - ) - out.sort(key=lambda x: int(x.get("close_ms") or 0), reverse=True) - return out - - -def match_trade_record_to_position( - trade_row, - closed_positions, - used_sync_keys, - *, - unified_symbol_fn, - to_ms_fn, - max_close_delta_ms=120 * 60 * 1000, - max_open_before_ms=15 * 60 * 1000, - max_open_after_ms=15 * 86400 * 1000, -): - """ - 为一条 trade_records 匹配最佳已平仓位;返回 (position_dict, close_delta_ms) 或 (None, None)。 - """ - sym_u = unified_symbol_fn(trade_row["symbol"]) if unified_symbol_fn else trade_row["symbol"] - direction = (trade_row["direction"] or "long").strip().lower() - close_ms = to_ms_fn( - trade_row["closed_at_ms"] if "closed_at_ms" in trade_row.keys() else None, - trade_row["closed_at"], - ) - open_ms = to_ms_fn( - trade_row["opened_at_ms"] if "opened_at_ms" in trade_row.keys() else None, - trade_row["opened_at"], - ) - if close_ms is None: - return None, None - best = None - best_d = None - for h in closed_positions: - sk = h.get("sync_key") - if not sk or sk in used_sync_keys: - continue - if h.get("symbol_u") != sym_u or h.get("side") != direction: - continue - cm = h.get("close_ms") - if cm is None: - continue - if open_ms is not None: - if cm < open_ms - max_open_before_ms: - continue - if cm > open_ms + max_open_after_ms: - continue - d = abs(int(cm) - int(close_ms)) - if best_d is None or d < best_d: - best_d = d - best = h - if best is None or best_d is None or best_d > max_close_delta_ms: - return None, None - return best, best_d diff --git a/crypto_monitor_binance/templates/index.html b/crypto_monitor_binance/templates/index.html index f512bda..2708c0d 100644 --- a/crypto_monitor_binance/templates/index.html +++ b/crypto_monitor_binance/templates/index.html @@ -231,7 +231,6 @@ 复盘记录 关键位(当前) 关键位历史 -