diff --git a/crypto_monitor_okx/.env.example b/crypto_monitor_okx/.env.example index 02f13c6..9228f16 100644 --- a/crypto_monitor_okx/.env.example +++ b/crypto_monitor_okx/.env.example @@ -70,6 +70,10 @@ OKX_TD_MODE=cross OKX_POS_MODE=hedge # 仓位查询 instType(OKX) OKX_POSITION_INST_TYPE=SWAP +# 从 OKX 历史仓位同步已实现盈亏(北京时间起点,空=近 90 天 0 点起) +# EXCHANGE_POSITION_SYNC_FROM_BJ=2026-01-01 +# 单次拉取历史仓位条数上限(OKX 每页最多 100,程序会分页) +# EXCHANGE_POSITION_HISTORY_LIMIT=200 # 关键位监控:5m收线突破过滤参数 KLINE_TIMEFRAME=5m diff --git a/crypto_monitor_okx/app.py b/crypto_monitor_okx/app.py index ca70766..1ad9f36 100644 --- a/crypto_monitor_okx/app.py +++ b/crypto_monitor_okx/app.py @@ -185,6 +185,9 @@ KLINE_TIMEFRAME = os.getenv("KLINE_TIMEFRAME", "5m") FULL_MARGIN_BUFFER_RATIO = float(os.getenv("FULL_MARGIN_BUFFER_RATIO", "0.98")) TRANSFER_CCY = os.getenv("TRANSFER_CCY", "USDT") OKX_POSITION_INST_TYPE = os.getenv("OKX_POSITION_INST_TYPE", "SWAP") +EXCHANGE_POSITION_SYNC_FROM_BJ = (os.getenv("EXCHANGE_POSITION_SYNC_FROM_BJ") or "").strip() +EXCHANGE_POSITION_HISTORY_LIMIT = max(50, min(1000, int(os.getenv("EXCHANGE_POSITION_HISTORY_LIMIT", "200")))) +_LAST_EXCHANGE_PNL_SYNC_AT = 0.0 UPLOAD_FOLDER = resolve_path(os.getenv("UPLOAD_DIR", "static/images")) ORDER_CHART_ENABLED = os.getenv("ORDER_CHART_ENABLED", "true").lower() == "true" ORDER_CHART_TFS = [x.strip() for x in (os.getenv("ORDER_CHART_TFS", "4h,1h,15m,5m") or "").split(",") if x.strip()] @@ -1230,6 +1233,16 @@ def init_db(): try: c.execute("ALTER TABLE trade_records ADD COLUMN reviewed_entry_reason TEXT") except: pass + for ddl in ( + "ALTER TABLE trade_records ADD COLUMN exchange_realized_pnl REAL", + "ALTER TABLE trade_records ADD COLUMN exchange_opened_at TEXT", + "ALTER TABLE trade_records ADD COLUMN exchange_closed_at TEXT", + "ALTER TABLE trade_records ADD COLUMN exchange_sync_key TEXT", + ): + try: + c.execute(ddl) + except Exception: + pass try: c.execute("ALTER TABLE journal_entries ADD COLUMN mood_ai_score INTEGER") except: pass @@ -1757,6 +1770,29 @@ def to_effective_trade_dict(row): item["effective_hold_seconds"] = get_effective_trade_field(row, "reviewed_hold_seconds", "hold_seconds", item.get("hold_seconds")) er_eff = get_effective_trade_field(row, "reviewed_entry_reason", "entry_reason", item.get("entry_reason")) item["effective_entry_reason"] = (str(er_eff).strip() if er_eff is not None else "") or "" + try: + _keys = row.keys() if hasattr(row, "keys") else [] + except Exception: + _keys = [] + _reviewed_pnl_raw = row["reviewed_pnl_amount"] if "reviewed_pnl_amount" in _keys else None + has_reviewed_pnl = _reviewed_pnl_raw is not None and str(_reviewed_pnl_raw).strip() != "" + ex_pnl = item.get("exchange_realized_pnl") + if not has_reviewed_pnl and ex_pnl is not None and str(ex_pnl).strip() != "": + try: + item["effective_pnl_amount"] = round(float(ex_pnl), FUNDS_DECIMALS) + item["display_pnl_source"] = "exchange" + ex_open = (str(item.get("exchange_opened_at") or "").strip() or None) + ex_close = (str(item.get("exchange_closed_at") or "").strip() or None) + if ex_open: + item["effective_opened_at"] = ex_open + if ex_close: + item["effective_closed_at"] = ex_close + except (TypeError, ValueError): + item["display_pnl_source"] = "local" + elif has_reviewed_pnl: + item["display_pnl_source"] = "reviewed" + else: + item["display_pnl_source"] = "local" return item @@ -3226,6 +3262,260 @@ def reconcile_external_closes(conn, days=None): synced_count += 1 return synced_count + +def _coerce_ts_ms(val): + if val is None or val == "": + return None + try: + v = float(val) + except (TypeError, ValueError): + return None + if v > 1e12: + return int(v) + if v > 1e9: + return int(v * 1000.0) + return int(v * 1000.0) + + +def _unified_symbol_for_match(symbol_str): + """统一 ETH/USDT:USDT、ETH-USDT-SWAP 便于与 trade_records 比对。""" + s = (symbol_str or "").strip().upper() + if not s: + return "" + if ":" in s: + s = s.split(":")[0] + if "-" in s and "/" not in s: + parts = s.split("-") + if len(parts) >= 2 and parts[-1] in ("SWAP", "FUTURES", "FUTURE"): + s = f"{parts[0]}/{parts[1]}" + else: + s = s.replace("-", "/") + if "_" in s and "/" not in s: + s = s.replace("_", "/") + if s.endswith("USDT") and "/" not in s and len(s) > 4: + s = f"{s[:-4]}/USDT" + return s + + +def exchange_position_sync_since_ms(): + s = EXCHANGE_POSITION_SYNC_FROM_BJ + if s: + for fmt, ln in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d", 10)): + try: + chunk = s[:ln] if len(s) >= ln else s[:10] + dt = datetime.strptime(chunk, fmt) + aware = dt.replace(tzinfo=APP_TZ) + return int(aware.timestamp() * 1000) + except Exception: + continue + dt0 = app_now() - timedelta(days=90) + try: + aware0 = datetime(dt0.year, dt0.month, dt0.day, 0, 0, 0, tzinfo=APP_TZ) + except Exception: + aware0 = datetime.now(APP_TZ) + return int(aware0.timestamp() * 1000) + + +def _normalize_okx_position_history_entry(p): + if not p or not isinstance(p, dict): + return None + info = p.get("info") or {} + if not isinstance(info, dict): + info = {} + sym = p.get("symbol") or "" + if not sym: + inst = str(info.get("instId") or "").strip() + if inst: + try: + ensure_markets_loaded() + sym = exchange.market(inst).get("symbol") or "" + except Exception: + parts = inst.split("-") + if len(parts) >= 2: + sym = f"{parts[0]}/{parts[1]}" + side = (p.get("side") or info.get("direction") or info.get("posSide") or "").strip().lower() + if side not in ("long", "short"): + try: + pos_val = float(info.get("pos") or 0) + if pos_val > 0: + side = "long" + elif pos_val < 0: + side = "short" + except (TypeError, ValueError): + side = "" + rp = p.get("realizedPnl") + if rp is None: + rp = info.get("realizedPnl") + if rp is None: + rp = info.get("pnl") + try: + rp_f = float(rp) if rp is not None and str(rp).strip() != "" else None + except (TypeError, ValueError): + rp_f = None + close_ms = _coerce_ts_ms(p.get("lastUpdateTimestamp")) + if close_ms is None: + close_ms = _coerce_ts_ms(info.get("uTime")) + open_ms = _coerce_ts_ms(p.get("timestamp")) + if open_ms is None: + open_ms = _coerce_ts_ms(info.get("cTime")) + pos_id = str(info.get("posId") or "").strip() + inst_id = str(info.get("instId") or "").strip() + u_raw = info.get("uTime") + sync_key = pos_id or f"{inst_id}|{u_raw}|{side}" + return { + "symbol_u": _unified_symbol_for_match(sym), + "side": side, + "close_ms": close_ms, + "open_ms": open_ms, + "pnl": rp_f, + "sync_key": sync_key, + } + + +def fetch_okx_positions_close_history(): + if not exchange_private_api_configured(): + return [] + ensure_markets_loaded() + since_ms = exchange_position_sync_since_ms() + out = [] + page_limit = 100 + max_total = int(EXCHANGE_POSITION_HISTORY_LIMIT) + before = None + while len(out) < max_total: + params = {"instType": OKX_POSITION_INST_TYPE} + if before is not None: + params["before"] = str(before) + try: + rows = exchange.fetch_positions_history( + None, + since=int(since_ms), + limit=page_limit, + params=params, + ) + except Exception: + break + if not rows: + break + batch_min_u = None + for p in rows: + h = _normalize_okx_position_history_entry(p) + if h and h["close_ms"] and h["side"] in ("long", "short") and h["symbol_u"]: + out.append(h) + info = p.get("info") or {} + u = _coerce_ts_ms(info.get("uTime")) or _coerce_ts_ms(p.get("lastUpdateTimestamp")) + if u and (batch_min_u is None or u < batch_min_u): + batch_min_u = u + if len(rows) < page_limit or batch_min_u is None: + break + if before is not None and batch_min_u >= before: + break + before = batch_min_u + return out[:max_total] + + +def sync_trade_records_from_exchange(conn, force=False): + """为未同步的 trade_records 回填 OKX 历史仓位中的已实现盈亏。返回统计 dict。""" + global _LAST_EXCHANGE_PNL_SYNC_AT + stats = {"ok": False, "hist_count": 0, "matched": 0, "pending": 0, "skipped": False} + if not exchange_private_api_configured(): + stats["reason"] = "未配置 OKX_API_KEY / OKX_API_SECRET / OKX_API_PASSPHRASE" + return stats + now = time.time() + if not force and now - _LAST_EXCHANGE_PNL_SYNC_AT < 25.0: + stats["ok"] = True + stats["skipped"] = True + return stats + try: + hist = fetch_okx_positions_close_history() + except Exception as e: + stats["reason"] = str(e) + return stats + stats["hist_count"] = len(hist) + if not hist: + stats["ok"] = True + stats["reason"] = "交易所平仓历史为空(请检查 API 权限或 EXCHANGE_POSITION_SYNC_FROM_BJ)" + return stats + candidates = conn.execute( + """ + SELECT id, symbol, direction, closed_at, closed_at_ms, opened_at, opened_at_ms + FROM trade_records + WHERE (exchange_sync_key IS NULL OR TRIM(exchange_sync_key) = '') + OR exchange_realized_pnl IS NULL + ORDER BY id DESC + LIMIT 200 + """ + ).fetchall() + stats["pending"] = len(candidates) + if not candidates: + stats["ok"] = True + _LAST_EXCHANGE_PNL_SYNC_AT = now + return stats + used = set() + matched = 0 + for tr in candidates: + close_ms_trade = _to_ms_with_fallback( + tr["closed_at_ms"] if "closed_at_ms" in tr.keys() else None, tr["closed_at"] + ) or opened_at_str_to_ms(tr["closed_at"]) + open_ms_trade = _to_ms_with_fallback( + tr["opened_at_ms"] if "opened_at_ms" in tr.keys() else None, tr["opened_at"] + ) or opened_at_str_to_ms(tr["opened_at"]) + if close_ms_trade is None: + continue + best = None + best_d = None + for h in hist: + sk = h["sync_key"] + if not sk or sk in used: + continue + if h["symbol_u"] != _unified_symbol_for_match(tr["symbol"]): + continue + if h["side"] != (tr["direction"] or "long").strip().lower(): + continue + cm = h["close_ms"] + if cm is None: + continue + if open_ms_trade is not None: + if cm < open_ms_trade - 15 * 60 * 1000: + continue + if cm > open_ms_trade + 15 * 86400 * 1000: + continue + else: + if abs(cm - close_ms_trade) > 3 * 86400 * 1000: + continue + d = abs(cm - close_ms_trade) + if best_d is None or d < best_d: + best_d = d + best = h + if best is None or best_d is None or best_d > 90 * 60 * 1000: + continue + sk = best["sync_key"] + if sk in used: + continue + eo = ms_to_app_local_str(best["open_ms"]) if best.get("open_ms") else None + ec = ms_to_app_local_str(best["close_ms"]) if best.get("close_ms") else None + pnl_val = best.get("pnl") + if pnl_val is None: + pnl_val = 0.0 + conn.execute( + """ + UPDATE trade_records + SET exchange_realized_pnl = ?, exchange_opened_at = ?, exchange_closed_at = ?, exchange_sync_key = ? + WHERE id = ? + """, + (float(pnl_val), eo, ec, sk, int(tr["id"])), + ) + used.add(sk) + matched += 1 + stats["matched"] = matched + stats["ok"] = True + _LAST_EXCHANGE_PNL_SYNC_AT = now + try: + conn.commit() + except Exception: + pass + return stats + + # 获取实时价格 def get_price(symbol): try: @@ -4750,6 +5040,12 @@ def render_main_page(page="trade"): order_list = [] for o in raw_order_list: order_list.append(enrich_order_item(row_to_dict(o), current_capital)) + exchange_pnl_sync = {} + if exchange_private_api_configured(): + try: + exchange_pnl_sync = sync_trade_records_from_exchange(conn) or {} + except Exception as e: + exchange_pnl_sync = {"ok": False, "reason": str(e)} raw_records = conn.execute( f"SELECT * FROM trade_records WHERE {sql_list_time_field('closed_at', 'created_at', 'opened_at')} >= ? " f"AND {sql_list_time_field('closed_at', 'created_at', 'opened_at')} <= ? ORDER BY id DESC LIMIT 1000", @@ -4844,10 +5140,24 @@ def render_main_page(page="trade"): key_auto_min_planned_rr=KEY_AUTO_MIN_PLANNED_RR, kline_timeframe=KLINE_TIMEFRAME, funding_usdt=funding_usdt, + exchange_pnl_sync=exchange_pnl_sync, **strategy_extra, ) +@app.route("/api/sync_exchange_pnl") +@login_required +def api_sync_exchange_pnl(): + conn = get_db() + stats = sync_trade_records_from_exchange(conn, force=True) + try: + conn.commit() + except Exception: + pass + conn.close() + return jsonify(stats) + + @app.route("/") @login_required def index(): @@ -6027,7 +6337,8 @@ def export_trade_records(): rows = conn.execute( "SELECT id,symbol,monitor_type,key_signal_type,direction,trigger_price,stop_loss,initial_stop_loss,take_profit," "margin_capital,leverage,pnl_amount,hold_seconds,hold_minutes,planned_rr,actual_rr,risk_amount," - "opened_at,closed_at,result,miss_reason,entry_reason,reviewed_entry_reason,created_at " + "opened_at,closed_at,result,miss_reason,entry_reason,reviewed_entry_reason," + "exchange_realized_pnl,exchange_opened_at,exchange_closed_at,created_at " f"FROM trade_records WHERE {sql_list_time_field('closed_at', 'created_at', 'opened_at')} >= ? " f"AND {sql_list_time_field('closed_at', 'created_at', 'opened_at')} <= ? ORDER BY id ASC", (start_bj, end_bj), @@ -6038,7 +6349,7 @@ def export_trade_records(): "stop_loss_open_snapshot", "initial_stop_loss", "take_profit", "margin_capital", "leverage", "pnl_amount", "hold_seconds", "hold_minutes", "planned_rr", "actual_rr", "risk_amount", "opened_at", "closed_at", "result", "miss_reason", "entry_reason", "reviewed_entry_reason", - "created_at", "开仓类型", + "exchange_realized_pnl", "exchange_opened_at", "exchange_closed_at", "created_at", "开仓类型", ] data = [] for r in rows: @@ -6052,6 +6363,9 @@ def export_trade_records(): snap, r["initial_stop_loss"], r["take_profit"], r["margin_capital"], r["leverage"], r["pnl_amount"], r["hold_seconds"], r["hold_minutes"], r["planned_rr"], r["actual_rr"], r["risk_amount"], r["opened_at"], r["closed_at"], r["result"], r["miss_reason"], r["entry_reason"], r["reviewed_entry_reason"], + r["exchange_realized_pnl"] if "exchange_realized_pnl" in r.keys() else None, + r["exchange_opened_at"] if "exchange_opened_at" in r.keys() else None, + r["exchange_closed_at"] if "exchange_closed_at" in r.keys() else None, r["created_at"], eff, )) day = app_now().strftime("%Y%m%d") diff --git a/crypto_monitor_okx/更新文档.md b/crypto_monitor_okx/更新文档.md index 1aacc69..a499da0 100644 --- a/crypto_monitor_okx/更新文档.md +++ b/crypto_monitor_okx/更新文档.md @@ -70,9 +70,15 @@ ## 与 Gate 的差异(其余) - 无独立「关键位监控」导航页(斐波在 **交易执行** 页添加)。 -- 无交易所已实现盈亏同步(`/api/sync_exchange_pnl`)。 - 箱体/收敛仍为 **提醒** 模式,不自动市价开仓(Gate/Binance 主站为自动开仓)。 +## 交易所已实现盈亏(与 Gate 一致) + +- 打开 **交易执行 / 交易记录** 等主页面时,若已配置 `OKX_API_KEY` / `OKX_API_SECRET` / `OKX_API_PASSPHRASE`(只读即可),同进程约 **25 秒**内最多调用一次 OKX **历史仓位**(`fetch_positions_history`),为未写入 `exchange_sync_key` 的记录匹配并回填 `exchange_realized_pnl`。 +- 复盘列表盈亏优先展示交易所 U(旁标 **所**);本地公式估算标 **估**;人工复核优先。 +- 手动强制同步:`GET /api/sync_exchange_pnl`(需登录)。 +- 可选 `.env`:`EXCHANGE_POSITION_SYNC_FROM_BJ`(北京时间起点)、`EXCHANGE_POSITION_HISTORY_LIMIT`(默认 200)。 + ## 配置与部署 - 详见 `.env.example` 中 OKX(`OKX_*`)与通用风控项。