diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 0fe0a0d..4d79498 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -2002,23 +2002,196 @@ def format_hold_minutes(minutes): return f"{mins}分钟" -def calc_pnl(direction, trigger_price, exit_price, margin_capital, leverage): +def calc_pnl(direction, trigger_price, exit_price, margin_capital, leverage, notional_usdt=None): + """估算盈亏(USDT)。优先用名义价值 notional_usdt,否则 margin×leverage。""" try: trigger = float(trigger_price) exit_p = float(exit_price) - margin = float(margin_capital) - lev = float(leverage) if trigger <= 0: return 0.0 + if notional_usdt is not None: + notional = float(notional_usdt) + else: + margin = float(margin_capital) + lev = float(leverage) + notional = margin * lev + if notional <= 0: + return 0.0 if direction == "short": pnl_ratio = (trigger - exit_p) / trigger else: pnl_ratio = (exit_p - trigger) / trigger - return round(margin * lev * pnl_ratio, FUNDS_DECIMALS) + return round(notional * pnl_ratio, FUNDS_DECIMALS) except Exception: return 0.0 +def get_plan_notional_usdt(row_or_dict): + """计划名义价值(USDT),与开仓 sizing 口径一致。""" + if row_or_dict is None: + return None + try: + if hasattr(row_or_dict, "keys"): + nv = row_or_dict["notional_value"] if "notional_value" in row_or_dict.keys() else None + margin = row_or_dict["margin_capital"] if "margin_capital" in row_or_dict.keys() else None + lev = row_or_dict["leverage"] if "leverage" in row_or_dict.keys() else None + sym = row_or_dict["symbol"] if "symbol" in row_or_dict.keys() else "" + else: + nv = row_or_dict.get("notional_value") + margin = row_or_dict.get("margin_capital") + lev = row_or_dict.get("leverage") + sym = row_or_dict.get("symbol") or "" + except Exception: + return None + try: + if nv is not None and str(nv).strip() != "": + v = float(nv) + if v > 0: + return round(v, FUNDS_DECIMALS) + except (TypeError, ValueError): + pass + try: + margin = float(margin or 0) + lev = float(lev or infer_leverage(sym) or 0) + if margin > 0 and lev > 0: + return round(margin * lev, FUNDS_DECIMALS) + except (TypeError, ValueError): + pass + return None + + +def _trade_ids_from_fills(trades): + ids = set() + for t in trades or []: + info = t.get("info") if isinstance(t.get("info"), dict) else {} + for src in (t, info): + if not isinstance(src, dict): + continue + for k in ("tradeId", "trade_id", "id"): + v = src.get(k) + if v is not None and str(v).strip() != "": + ids.add(str(v).strip()) + return ids + + +def _income_entry_trade_id(entry): + if not isinstance(entry, dict): + return "" + info = entry.get("info") if isinstance(entry.get("info"), dict) else {} + for src in (entry, info): + for k in ("tradeId", "trade_id"): + v = src.get(k) + if v is not None and str(v).strip() != "": + return str(v).strip() + return "" + + +def calc_pnl_from_closing_trades(direction, entry_price, trades, exchange_symbol=None): + """按减仓成交数量×价差汇总盈亏(不含资金费;比单点标记价更接近交易所)。""" + try: + entry = float(entry_price) + except (TypeError, ValueError): + return None + if entry <= 0 or not trades: + return None + contract_size = 1.0 + if exchange_symbol and BINANCE_API_KEY and BINANCE_API_SECRET: + try: + ensure_markets_loaded() + contract_size = float(exchange.market(exchange_symbol).get("contractSize") or 1) + except Exception: + contract_size = 1.0 + pnl = 0.0 + qty = 0.0 + for t in trades: + try: + price = float(t.get("price") or 0) + amount = float(t.get("amount") or 0) * contract_size + except (TypeError, ValueError): + continue + if price <= 0 or amount <= 0: + continue + qty += amount + if direction == "short": + pnl += amount * (entry - price) + else: + pnl += amount * (price - entry) + if qty <= 0: + return None + return round(pnl, FUNDS_DECIMALS) + + +def resolve_trade_pnl_amount( + row, + entry_price, + exit_price=None, + opened_at_str=None, + opened_at_ms=None, + closed_at_str=None, + closed_at_ms=None, +): + """ + 平仓盈亏:优先 Binance income 净额(含手续费),其次按减仓成交汇总,最后用计划名义×涨跌。 + 返回 (pnl, exit_price, exchange_opened_at, exchange_closed_at, exchange_sync_key)。 + """ + direction = (row["direction"] if hasattr(row, "keys") else row.get("direction") or "long").strip().lower() + sym = row["symbol"] if hasattr(row, "keys") else row.get("symbol") + ex_sym = ( + row["exchange_symbol"] + if hasattr(row, "keys") and "exchange_symbol" in row.keys() + else row.get("exchange_symbol") + ) or normalize_exchange_symbol(sym) + open_ms = _to_ms_with_fallback( + opened_at_ms if opened_at_ms is not None else (row["opened_at_ms"] if hasattr(row, "keys") and "opened_at_ms" in row.keys() else None), + opened_at_str or (row["opened_at"] if hasattr(row, "keys") else row.get("opened_at")), + ) + close_ms = _to_ms_with_fallback( + closed_at_ms, + closed_at_str, + ) + closing_trades = [] + if open_ms and (close_ms or closed_at_str): + closing_trades = fetch_closing_fills_for_record( + ex_sym, + direction, + opened_at_str or (row["opened_at"] if hasattr(row, "keys") else ""), + closed_at_str, + opened_at_ms=open_ms, + closed_at_ms=close_ms, + ) + if closing_trades: + wexit = calc_weighted_exit_price(closing_trades) + if wexit and (exit_price is None or float(exit_price or 0) <= 0): + exit_price = wexit + last_ts = closing_trades[-1].get("timestamp") + if last_ts and not closed_at_str: + closed_at_str = ms_to_app_local_str(int(last_ts)) + close_ms = int(last_ts) + net, sync_key, eo, ec = fetch_binance_net_pnl_for_trade( + ex_sym, direction, open_ms, close_ms, closing_trades=closing_trades + ) + if net is not None: + return net, exit_price, eo, ec, sync_key + if closing_trades: + fill_pnl = calc_pnl_from_closing_trades(direction, entry_price, closing_trades, ex_sym) + if fill_pnl is not None: + return fill_pnl, exit_price, None, None, None + notional = get_plan_notional_usdt(row) + margin = row["margin_capital"] if hasattr(row, "keys") else row.get("margin_capital") + lev = row["leverage"] if hasattr(row, "keys") else row.get("leverage") + if exit_price: + pnl = calc_pnl( + direction, + entry_price, + exit_price, + margin or DAILY_START_CAPITAL, + lev or infer_leverage(sym), + notional_usdt=notional, + ) + return pnl, exit_price, None, None, None + return 0.0, exit_price, None, None, None + + def calc_rr_ratio(direction, entry_price, stop_loss, take_profit): try: entry = float(entry_price) @@ -2149,7 +2322,7 @@ def insert_trade_record( kst = key_signal_type_for_trade_record(key_signal_type, KEY_MONITOR_AUTO_TYPES) snap_sl = initial_stop_loss if initial_stop_loss not in (None, "") else stop_loss er = (entry_reason or "").strip() or entry_reason_from_key_signal(kst) or "" - conn.execute( + cur = conn.execute( "INSERT INTO trade_records (symbol,monitor_type,key_signal_type,direction,trigger_price,stop_loss,initial_stop_loss,take_profit,margin_capital,leverage,pnl_amount,hold_seconds,trade_style,risk_amount,planned_rr,actual_rr,hold_minutes,opened_at,opened_at_ms,closed_at,closed_at_ms,result,miss_reason,exchange_trade_id,entry_reason) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)", ( symbol, monitor_type, kst, direction, trigger_price, snap_sl, snap_sl, take_profit, @@ -2158,6 +2331,13 @@ def insert_trade_record( open_ts, open_ts_ms, close_ts, close_ts_ms, result, miss_reason, exchange_trade_id, er or None ) ) + record_id = int(cur.lastrowid or 0) + if record_id and close_ts and result != "错过" and exchange_private_api_configured(): + try: + sync_trade_record_exchange_pnl(conn, record_id, commit=False) + except Exception: + pass + return record_id def calc_duration_text(open_str, close_str): @@ -3484,49 +3664,79 @@ def resolve_synced_flat_close(row, opened_at_str, opened_at_ms=None): trigger_price = row["trigger_price"] stop_loss = row["stop_loss"] take_profit = row["take_profit"] - margin_capital = row["margin_capital"] or DAILY_START_CAPITAL - leverage = row["leverage"] or infer_leverage(sym) exchange_symbol = row["exchange_symbol"] or normalize_exchange_symbol(sym) - trade = fetch_latest_closing_fill(exchange_symbol, direction, opened_at_str, opened_at_ms=opened_at_ms) - exit_px = None closed_at_str = app_now_str() - if trade: - try: - exit_px = float(trade.get("price") or 0) or None - except (TypeError, ValueError): - exit_px = None - ts = trade.get("timestamp") - if ts: - closed_at_str = ms_to_app_local_str(int(ts)) + closed_at_ms = None + closing_trades = fetch_closing_fills_for_record( + exchange_symbol, direction, opened_at_str, None, opened_at_ms=opened_at_ms + ) + exit_px = calc_weighted_exit_price(closing_trades) if closing_trades else None + if exit_px is None: + trade = fetch_latest_closing_fill(exchange_symbol, direction, opened_at_str, opened_at_ms=opened_at_ms) + if trade: + try: + exit_px = float(trade.get("price") or 0) or None + except (TypeError, ValueError): + exit_px = None + if not closing_trades: + closing_trades = [trade] + if closing_trades: + last_ts = closing_trades[-1].get("timestamp") + if last_ts: + closed_at_str = ms_to_app_local_str(int(last_ts)) + closed_at_ms = int(last_ts) + + open_ms = _to_ms_with_fallback( + row["opened_at_ms"] if "opened_at_ms" in row.keys() else None, opened_at_str + ) + close_ms = _to_ms_with_fallback(closed_at_ms, closed_at_str) + pnl, exit_px2, _, _, _ = resolve_trade_pnl_amount( + row, + trigger_price, + exit_px, + opened_at_str=opened_at_str, + opened_at_ms=open_ms, + closed_at_str=closed_at_str, + closed_at_ms=close_ms, + ) + if exit_px2: + exit_px = float(exit_px2) if exit_px is None or exit_px <= 0: p = get_price(sym) if p: guessed = classify_exit_by_levels(direction, trigger_price, stop_loss, take_profit, p) if guessed: - pnl = calc_pnl(direction, trigger_price, p, margin_capital, leverage) + pnl2, _, _, _, _ = resolve_trade_pnl_amount( + row, + trigger_price, + p, + opened_at_str=opened_at_str, + opened_at_ms=open_ms, + closed_at_str=closed_at_str, + closed_at_ms=close_ms, + ) return ( guessed, - pnl, + pnl2, closed_at_str, "未能拉取成交明细,按当前市价与止盈/止损位近似归类(建议核对交易所账单)", ) return ( "外部平仓", - 0.0, + pnl, closed_at_str, "检测到交易所仓位已关闭,且无法从成交记录还原平仓价", ) result = classify_exit_by_levels(direction, trigger_price, stop_loss, take_profit, exit_px) - pnl = calc_pnl(direction, trigger_price, exit_px, margin_capital, leverage) if result: return ( result, pnl, closed_at_str, - "按交易所成交记录同步为止盈/止损平仓", + "按交易所成交/流水同步为止盈/止损平仓", ) return ( "外部平仓", @@ -4733,6 +4943,7 @@ def check_order_monitors(): else: res = normalize_result_with_pnl(res, pnl_amount) close_order_id = "" + exit_p = None try: close_resp = close_exchange_order(r) close_order_id = close_resp.get("id", "") @@ -4804,6 +5015,16 @@ def check_order_monitors(): hold_seconds = calc_hold_seconds( opened_at, parse_dt_for_trading_day(closed_at) or now ) + exit_ref = exit_p if exit_p and float(exit_p) > 0 else p + pnl_amount, _, _, _, _ = resolve_trade_pnl_amount( + r, + trigger_price, + exit_ref, + opened_at_str=opened_at, + opened_at_ms=_to_ms_with_fallback(opened_at_ms, opened_at), + closed_at_str=closed_at, + closed_at_ms=_to_ms_with_fallback(None, closed_at), + ) insert_trade_record( conn, symbol=sym, @@ -4859,6 +5080,16 @@ def check_order_monitors(): ) continue cancel_binance_futures_open_orders(r["exchange_symbol"] or normalize_exchange_symbol(sym)) + exit_ref = exit_p if exit_p and float(exit_p) > 0 else p + pnl_amount, _, _, _, _ = resolve_trade_pnl_amount( + r, + trigger_price, + exit_ref, + opened_at_str=opened_at, + opened_at_ms=_to_ms_with_fallback(opened_at_ms, opened_at), + closed_at_str=closed_at, + closed_at_ms=_to_ms_with_fallback(None, closed_at), + ) session_capital = update_session_capital(conn, session_date, pnl_amount) send_wechat_msg( build_wechat_close_message( @@ -5077,6 +5308,47 @@ def api_sync_positions(): return jsonify({"ok": True, "days": days, "synced": int(synced)}) +@app.route("/api/sync_exchange_pnl", methods=["POST"]) +@login_required +def api_sync_exchange_pnl(): + """立即为近期未同步记录回填 Binance 净盈亏(含手续费)。""" + if not exchange_private_api_configured(): + return jsonify({"ok": False, "msg": "未配置 Binance API,无法同步"}), 400 + payload = request.get_json(silent=True) or {} + limit = 120 + force_all = str(payload.get("force", "")).lower() in ("1", "true", "yes") + try: + if payload.get("limit") is not None: + limit = max(1, min(500, int(payload.get("limit")))) + except (TypeError, ValueError): + pass + global _LAST_EXCHANGE_PNL_SYNC_AT + _LAST_EXCHANGE_PNL_SYNC_AT = 0.0 + conn = get_db() + if force_all: + rows = conn.execute( + "SELECT id FROM trade_records ORDER BY id DESC LIMIT ?", + (limit,), + ).fetchall() + else: + rows = conn.execute( + """ + SELECT id FROM trade_records + WHERE (exchange_sync_key IS NULL OR TRIM(exchange_sync_key) = '') + ORDER BY id DESC + LIMIT ? + """, + (limit,), + ).fetchall() + synced = 0 + for r in rows: + if sync_trade_record_exchange_pnl(conn, int(r["id"]), commit=False, force=True): + synced += 1 + conn.commit() + conn.close() + return jsonify({"ok": True, "synced": synced, "candidates": len(rows)}) + + def _coerce_ts_ms(val): if val is None or val == "": return None @@ -5153,7 +5425,9 @@ def _fetch_binance_income_entries(exchange_symbol, start_ms, end_ms): return out -def fetch_binance_net_pnl_for_trade(exchange_symbol, direction, open_ms, close_ms): +def fetch_binance_net_pnl_for_trade( + exchange_symbol, direction, open_ms, close_ms, closing_trades=None +): if open_ms is None or close_ms is None or close_ms < open_ms: return None, None, None, None buffer_ms = 5 * 60 * 1000 @@ -5162,6 +5436,7 @@ def fetch_binance_net_pnl_for_trade(exchange_symbol, direction, open_ms, close_m ) if not entries: return None, None, None, None + trade_ids = _trade_ids_from_fills(closing_trades) if closing_trades else None net = 0.0 first_t = None last_t = None @@ -5169,6 +5444,10 @@ def fetch_binance_net_pnl_for_trade(exchange_symbol, direction, open_ms, close_m it = (e.get("incomeType") or e.get("income_type") or "").strip() if it not in BINANCE_NET_INCOME_TYPES: continue + if trade_ids and it in ("REALIZED_PNL", "COMMISSION"): + tid = _income_entry_trade_id(e) + if tid and tid not in trade_ids: + continue try: net += float(e.get("income") or 0) except (TypeError, ValueError): @@ -5189,6 +5468,76 @@ def fetch_binance_net_pnl_for_trade(exchange_symbol, direction, open_ms, close_m return net, sync_key, eo, ec +def sync_trade_record_exchange_pnl(conn, record_id, commit=True, force=False): + """单条 trade_records 回填 Binance 净盈亏;成功时同时更新 pnl_amount 便于统计。""" + if not exchange_private_api_configured(): + return False + tr = conn.execute("SELECT * FROM trade_records WHERE id=?", (int(record_id),)).fetchone() + if not tr: + return False + sk_existing = (tr["exchange_sync_key"] if "exchange_sync_key" in tr.keys() else None) or "" + if not force and str(sk_existing).strip(): + return tr["exchange_realized_pnl"] is not None + direction = (tr["direction"] or "long").strip().lower() + open_ms = _to_ms_with_fallback( + tr["opened_at_ms"] if "opened_at_ms" in tr.keys() else None, tr["opened_at"] + ) + close_ms = _to_ms_with_fallback( + tr["closed_at_ms"] if "closed_at_ms" in tr.keys() else None, tr["closed_at"] + ) + if open_ms is None or close_ms is None: + return False + try: + ex_sym = normalize_exchange_symbol(tr["symbol"]) + except Exception: + return False + closing_trades = fetch_closing_fills_for_record( + ex_sym, + direction, + tr["opened_at"], + tr["closed_at"], + opened_at_ms=open_ms, + closed_at_ms=close_ms, + ) + net, sync_key, eo, ec = None, None, None, None + for attempt in range(3): + if attempt: + time.sleep(0.7) + net, sync_key, eo, ec = fetch_binance_net_pnl_for_trade( + ex_sym, direction, open_ms, close_ms, closing_trades=closing_trades + ) + if net is not None and sync_key: + break + if net is None: + net = calc_pnl_from_closing_trades( + direction, tr["trigger_price"], closing_trades, ex_sym + ) + if net is not None: + try: + ensure_markets_loaded() + cid = exchange.market(ex_sym).get("id") or ex_sym + except Exception: + cid = ex_sym + sync_key = f"fills|{cid}|{direction}|{open_ms}|{close_ms}|{net}" + if net is None or not sync_key: + return False + conn.execute( + """ + UPDATE trade_records + SET exchange_realized_pnl = ?, exchange_opened_at = ?, exchange_closed_at = ?, + exchange_sync_key = ?, pnl_amount = ? + WHERE id = ? + """, + (float(net), eo, ec, sync_key, float(net), int(record_id)), + ) + if commit: + try: + conn.commit() + except Exception: + pass + return True + + def sync_trade_records_from_exchange(conn): """为未同步的 trade_records 回填交易所口径净盈亏(Binance:income 流水汇总)。""" global _LAST_EXCHANGE_PNL_SYNC_AT @@ -5199,8 +5548,7 @@ def sync_trade_records_from_exchange(conn): return candidates = conn.execute( """ - SELECT id, symbol, direction, opened_at, opened_at_ms, closed_at, closed_at_ms - FROM trade_records + SELECT id FROM trade_records WHERE (exchange_sync_key IS NULL OR TRIM(exchange_sync_key) = '') ORDER BY id DESC LIMIT 120 @@ -5210,30 +5558,7 @@ def sync_trade_records_from_exchange(conn): _LAST_EXCHANGE_PNL_SYNC_AT = now return for tr in candidates: - direction = (tr["direction"] or "long").strip().lower() - open_ms = _to_ms_with_fallback( - tr["opened_at_ms"] if "opened_at_ms" in tr.keys() else None, tr["opened_at"] - ) - close_ms = _to_ms_with_fallback( - tr["closed_at_ms"] if "closed_at_ms" in tr.keys() else None, tr["closed_at"] - ) - if open_ms is None or close_ms is None: - continue - try: - ex_sym = normalize_exchange_symbol(tr["symbol"]) - except Exception: - continue - net, sync_key, eo, ec = fetch_binance_net_pnl_for_trade(ex_sym, direction, open_ms, close_ms) - if net is None or not sync_key: - continue - conn.execute( - """ - UPDATE trade_records - SET exchange_realized_pnl = ?, exchange_opened_at = ?, exchange_closed_at = ?, exchange_sync_key = ? - WHERE id = ? - """, - (float(net), eo, ec, sync_key, int(tr["id"])), - ) + sync_trade_record_exchange_pnl(conn, int(tr["id"]), commit=False) _LAST_EXCHANGE_PNL_SYNC_AT = now try: conn.commit() @@ -6548,21 +6873,55 @@ def del_order(id): return redirect("/") if row["status"] == "active": try: - p = get_price(row["symbol"]) or float(row["trigger_price"]) opened_at = get_opened_at_value(row) - closed_at = app_now_str() - hold_seconds = calc_hold_seconds(opened_at, app_now()) - pnl_amount = calc_pnl( - row["direction"], - row["trigger_price"], - p, - row["margin_capital"] or DAILY_START_CAPITAL, - row["leverage"] or infer_leverage(row["symbol"]) + opened_at_ms = _to_ms_with_fallback( + row["opened_at_ms"] if "opened_at_ms" in row.keys() else None, opened_at ) close_resp = close_exchange_order(row) close_order_id = close_resp.get("id", "") cancel_binance_futures_open_orders(row["exchange_symbol"] or normalize_exchange_symbol(row["symbol"])) - session_date = row["session_date"] or get_trading_day() + exit_p = extract_trade_price_from_order(close_resp) + closed_at = app_now_str() + closed_at_ms = None + if not exit_p or float(exit_p) <= 0: + tr_fill = fetch_latest_closing_fill( + row["exchange_symbol"] or normalize_exchange_symbol(row["symbol"]), + row["direction"], + opened_at, + opened_at_ms=opened_at_ms, + ) + if tr_fill and tr_fill.get("price"): + try: + exit_p = float(tr_fill["price"]) + except (TypeError, ValueError): + exit_p = None + ts = tr_fill.get("timestamp") + if ts: + closed_at = ms_to_app_local_str(int(ts)) + closed_at_ms = int(ts) + else: + tr_fill = fetch_latest_closing_fill( + row["exchange_symbol"] or normalize_exchange_symbol(row["symbol"]), + row["direction"], + opened_at, + opened_at_ms=opened_at_ms, + ) + if tr_fill and tr_fill.get("timestamp"): + closed_at = ms_to_app_local_str(int(tr_fill["timestamp"])) + closed_at_ms = int(tr_fill["timestamp"]) + pnl_amount, exit_p, _, _, _ = resolve_trade_pnl_amount( + row, + row["trigger_price"], + exit_p, + opened_at_str=opened_at, + opened_at_ms=opened_at_ms, + closed_at_str=closed_at, + closed_at_ms=closed_at_ms, + ) + p = exit_p or get_price(row["symbol"]) or float(row["trigger_price"]) + closed_at_dt = parse_dt_for_trading_day(closed_at) or app_now() + hold_seconds = calc_hold_seconds(opened_at, closed_at_dt) + session_date = row["session_date"] or get_trading_day(closed_at_dt) session_capital = update_session_capital(conn, session_date, pnl_amount) insert_trade_record( conn, diff --git a/crypto_monitor_binance/templates/index.html b/crypto_monitor_binance/templates/index.html index 2708c0d..4867d5f 100644 --- a/crypto_monitor_binance/templates/index.html +++ b/crypto_monitor_binance/templates/index.html @@ -231,6 +231,7 @@ 复盘记录 关键位(当前) 关键位历史 +