修改okx交易记录

This commit is contained in:
dekun
2026-05-27 06:24:53 +08:00
parent 480e5b1d20
commit b26647060e
3 changed files with 327 additions and 3 deletions
+4
View File
@@ -70,6 +70,10 @@ OKX_TD_MODE=cross
OKX_POS_MODE=hedge OKX_POS_MODE=hedge
# 仓位查询 instTypeOKX # 仓位查询 instTypeOKX
OKX_POSITION_INST_TYPE=SWAP OKX_POSITION_INST_TYPE=SWAP
# 从 OKX 历史仓位同步已实现盈亏(北京时间起点,空=近 90 天 0 点起)
# EXCHANGE_POSITION_SYNC_FROM_BJ=2026-01-01
# 单次拉取历史仓位条数上限(OKX 每页最多 100,程序会分页)
# EXCHANGE_POSITION_HISTORY_LIMIT=200
# 关键位监控:5m收线突破过滤参数 # 关键位监控:5m收线突破过滤参数
KLINE_TIMEFRAME=5m KLINE_TIMEFRAME=5m
+316 -2
View File
@@ -185,6 +185,9 @@ KLINE_TIMEFRAME = os.getenv("KLINE_TIMEFRAME", "5m")
FULL_MARGIN_BUFFER_RATIO = float(os.getenv("FULL_MARGIN_BUFFER_RATIO", "0.98")) FULL_MARGIN_BUFFER_RATIO = float(os.getenv("FULL_MARGIN_BUFFER_RATIO", "0.98"))
TRANSFER_CCY = os.getenv("TRANSFER_CCY", "USDT") TRANSFER_CCY = os.getenv("TRANSFER_CCY", "USDT")
OKX_POSITION_INST_TYPE = os.getenv("OKX_POSITION_INST_TYPE", "SWAP") OKX_POSITION_INST_TYPE = os.getenv("OKX_POSITION_INST_TYPE", "SWAP")
EXCHANGE_POSITION_SYNC_FROM_BJ = (os.getenv("EXCHANGE_POSITION_SYNC_FROM_BJ") or "").strip()
EXCHANGE_POSITION_HISTORY_LIMIT = max(50, min(1000, int(os.getenv("EXCHANGE_POSITION_HISTORY_LIMIT", "200"))))
_LAST_EXCHANGE_PNL_SYNC_AT = 0.0
UPLOAD_FOLDER = resolve_path(os.getenv("UPLOAD_DIR", "static/images")) UPLOAD_FOLDER = resolve_path(os.getenv("UPLOAD_DIR", "static/images"))
ORDER_CHART_ENABLED = os.getenv("ORDER_CHART_ENABLED", "true").lower() == "true" ORDER_CHART_ENABLED = os.getenv("ORDER_CHART_ENABLED", "true").lower() == "true"
ORDER_CHART_TFS = [x.strip() for x in (os.getenv("ORDER_CHART_TFS", "4h,1h,15m,5m") or "").split(",") if x.strip()] ORDER_CHART_TFS = [x.strip() for x in (os.getenv("ORDER_CHART_TFS", "4h,1h,15m,5m") or "").split(",") if x.strip()]
@@ -1230,6 +1233,16 @@ def init_db():
try: try:
c.execute("ALTER TABLE trade_records ADD COLUMN reviewed_entry_reason TEXT") c.execute("ALTER TABLE trade_records ADD COLUMN reviewed_entry_reason TEXT")
except: pass except: pass
for ddl in (
"ALTER TABLE trade_records ADD COLUMN exchange_realized_pnl REAL",
"ALTER TABLE trade_records ADD COLUMN exchange_opened_at TEXT",
"ALTER TABLE trade_records ADD COLUMN exchange_closed_at TEXT",
"ALTER TABLE trade_records ADD COLUMN exchange_sync_key TEXT",
):
try:
c.execute(ddl)
except Exception:
pass
try: try:
c.execute("ALTER TABLE journal_entries ADD COLUMN mood_ai_score INTEGER") c.execute("ALTER TABLE journal_entries ADD COLUMN mood_ai_score INTEGER")
except: pass except: pass
@@ -1757,6 +1770,29 @@ def to_effective_trade_dict(row):
item["effective_hold_seconds"] = get_effective_trade_field(row, "reviewed_hold_seconds", "hold_seconds", item.get("hold_seconds")) item["effective_hold_seconds"] = get_effective_trade_field(row, "reviewed_hold_seconds", "hold_seconds", item.get("hold_seconds"))
er_eff = get_effective_trade_field(row, "reviewed_entry_reason", "entry_reason", item.get("entry_reason")) er_eff = get_effective_trade_field(row, "reviewed_entry_reason", "entry_reason", item.get("entry_reason"))
item["effective_entry_reason"] = (str(er_eff).strip() if er_eff is not None else "") or "" item["effective_entry_reason"] = (str(er_eff).strip() if er_eff is not None else "") or ""
try:
_keys = row.keys() if hasattr(row, "keys") else []
except Exception:
_keys = []
_reviewed_pnl_raw = row["reviewed_pnl_amount"] if "reviewed_pnl_amount" in _keys else None
has_reviewed_pnl = _reviewed_pnl_raw is not None and str(_reviewed_pnl_raw).strip() != ""
ex_pnl = item.get("exchange_realized_pnl")
if not has_reviewed_pnl and ex_pnl is not None and str(ex_pnl).strip() != "":
try:
item["effective_pnl_amount"] = round(float(ex_pnl), FUNDS_DECIMALS)
item["display_pnl_source"] = "exchange"
ex_open = (str(item.get("exchange_opened_at") or "").strip() or None)
ex_close = (str(item.get("exchange_closed_at") or "").strip() or None)
if ex_open:
item["effective_opened_at"] = ex_open
if ex_close:
item["effective_closed_at"] = ex_close
except (TypeError, ValueError):
item["display_pnl_source"] = "local"
elif has_reviewed_pnl:
item["display_pnl_source"] = "reviewed"
else:
item["display_pnl_source"] = "local"
return item return item
@@ -3226,6 +3262,260 @@ def reconcile_external_closes(conn, days=None):
synced_count += 1 synced_count += 1
return synced_count return synced_count
def _coerce_ts_ms(val):
if val is None or val == "":
return None
try:
v = float(val)
except (TypeError, ValueError):
return None
if v > 1e12:
return int(v)
if v > 1e9:
return int(v * 1000.0)
return int(v * 1000.0)
def _unified_symbol_for_match(symbol_str):
"""统一 ETH/USDT:USDT、ETH-USDT-SWAP 便于与 trade_records 比对。"""
s = (symbol_str or "").strip().upper()
if not s:
return ""
if ":" in s:
s = s.split(":")[0]
if "-" in s and "/" not in s:
parts = s.split("-")
if len(parts) >= 2 and parts[-1] in ("SWAP", "FUTURES", "FUTURE"):
s = f"{parts[0]}/{parts[1]}"
else:
s = s.replace("-", "/")
if "_" in s and "/" not in s:
s = s.replace("_", "/")
if s.endswith("USDT") and "/" not in s and len(s) > 4:
s = f"{s[:-4]}/USDT"
return s
def exchange_position_sync_since_ms():
s = EXCHANGE_POSITION_SYNC_FROM_BJ
if s:
for fmt, ln in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d", 10)):
try:
chunk = s[:ln] if len(s) >= ln else s[:10]
dt = datetime.strptime(chunk, fmt)
aware = dt.replace(tzinfo=APP_TZ)
return int(aware.timestamp() * 1000)
except Exception:
continue
dt0 = app_now() - timedelta(days=90)
try:
aware0 = datetime(dt0.year, dt0.month, dt0.day, 0, 0, 0, tzinfo=APP_TZ)
except Exception:
aware0 = datetime.now(APP_TZ)
return int(aware0.timestamp() * 1000)
def _normalize_okx_position_history_entry(p):
if not p or not isinstance(p, dict):
return None
info = p.get("info") or {}
if not isinstance(info, dict):
info = {}
sym = p.get("symbol") or ""
if not sym:
inst = str(info.get("instId") or "").strip()
if inst:
try:
ensure_markets_loaded()
sym = exchange.market(inst).get("symbol") or ""
except Exception:
parts = inst.split("-")
if len(parts) >= 2:
sym = f"{parts[0]}/{parts[1]}"
side = (p.get("side") or info.get("direction") or info.get("posSide") or "").strip().lower()
if side not in ("long", "short"):
try:
pos_val = float(info.get("pos") or 0)
if pos_val > 0:
side = "long"
elif pos_val < 0:
side = "short"
except (TypeError, ValueError):
side = ""
rp = p.get("realizedPnl")
if rp is None:
rp = info.get("realizedPnl")
if rp is None:
rp = info.get("pnl")
try:
rp_f = float(rp) if rp is not None and str(rp).strip() != "" else None
except (TypeError, ValueError):
rp_f = None
close_ms = _coerce_ts_ms(p.get("lastUpdateTimestamp"))
if close_ms is None:
close_ms = _coerce_ts_ms(info.get("uTime"))
open_ms = _coerce_ts_ms(p.get("timestamp"))
if open_ms is None:
open_ms = _coerce_ts_ms(info.get("cTime"))
pos_id = str(info.get("posId") or "").strip()
inst_id = str(info.get("instId") or "").strip()
u_raw = info.get("uTime")
sync_key = pos_id or f"{inst_id}|{u_raw}|{side}"
return {
"symbol_u": _unified_symbol_for_match(sym),
"side": side,
"close_ms": close_ms,
"open_ms": open_ms,
"pnl": rp_f,
"sync_key": sync_key,
}
def fetch_okx_positions_close_history():
if not exchange_private_api_configured():
return []
ensure_markets_loaded()
since_ms = exchange_position_sync_since_ms()
out = []
page_limit = 100
max_total = int(EXCHANGE_POSITION_HISTORY_LIMIT)
before = None
while len(out) < max_total:
params = {"instType": OKX_POSITION_INST_TYPE}
if before is not None:
params["before"] = str(before)
try:
rows = exchange.fetch_positions_history(
None,
since=int(since_ms),
limit=page_limit,
params=params,
)
except Exception:
break
if not rows:
break
batch_min_u = None
for p in rows:
h = _normalize_okx_position_history_entry(p)
if h and h["close_ms"] and h["side"] in ("long", "short") and h["symbol_u"]:
out.append(h)
info = p.get("info") or {}
u = _coerce_ts_ms(info.get("uTime")) or _coerce_ts_ms(p.get("lastUpdateTimestamp"))
if u and (batch_min_u is None or u < batch_min_u):
batch_min_u = u
if len(rows) < page_limit or batch_min_u is None:
break
if before is not None and batch_min_u >= before:
break
before = batch_min_u
return out[:max_total]
def sync_trade_records_from_exchange(conn, force=False):
"""为未同步的 trade_records 回填 OKX 历史仓位中的已实现盈亏。返回统计 dict。"""
global _LAST_EXCHANGE_PNL_SYNC_AT
stats = {"ok": False, "hist_count": 0, "matched": 0, "pending": 0, "skipped": False}
if not exchange_private_api_configured():
stats["reason"] = "未配置 OKX_API_KEY / OKX_API_SECRET / OKX_API_PASSPHRASE"
return stats
now = time.time()
if not force and now - _LAST_EXCHANGE_PNL_SYNC_AT < 25.0:
stats["ok"] = True
stats["skipped"] = True
return stats
try:
hist = fetch_okx_positions_close_history()
except Exception as e:
stats["reason"] = str(e)
return stats
stats["hist_count"] = len(hist)
if not hist:
stats["ok"] = True
stats["reason"] = "交易所平仓历史为空(请检查 API 权限或 EXCHANGE_POSITION_SYNC_FROM_BJ"
return stats
candidates = conn.execute(
"""
SELECT id, symbol, direction, closed_at, closed_at_ms, opened_at, opened_at_ms
FROM trade_records
WHERE (exchange_sync_key IS NULL OR TRIM(exchange_sync_key) = '')
OR exchange_realized_pnl IS NULL
ORDER BY id DESC
LIMIT 200
"""
).fetchall()
stats["pending"] = len(candidates)
if not candidates:
stats["ok"] = True
_LAST_EXCHANGE_PNL_SYNC_AT = now
return stats
used = set()
matched = 0
for tr in candidates:
close_ms_trade = _to_ms_with_fallback(
tr["closed_at_ms"] if "closed_at_ms" in tr.keys() else None, tr["closed_at"]
) or opened_at_str_to_ms(tr["closed_at"])
open_ms_trade = _to_ms_with_fallback(
tr["opened_at_ms"] if "opened_at_ms" in tr.keys() else None, tr["opened_at"]
) or opened_at_str_to_ms(tr["opened_at"])
if close_ms_trade is None:
continue
best = None
best_d = None
for h in hist:
sk = h["sync_key"]
if not sk or sk in used:
continue
if h["symbol_u"] != _unified_symbol_for_match(tr["symbol"]):
continue
if h["side"] != (tr["direction"] or "long").strip().lower():
continue
cm = h["close_ms"]
if cm is None:
continue
if open_ms_trade is not None:
if cm < open_ms_trade - 15 * 60 * 1000:
continue
if cm > open_ms_trade + 15 * 86400 * 1000:
continue
else:
if abs(cm - close_ms_trade) > 3 * 86400 * 1000:
continue
d = abs(cm - close_ms_trade)
if best_d is None or d < best_d:
best_d = d
best = h
if best is None or best_d is None or best_d > 90 * 60 * 1000:
continue
sk = best["sync_key"]
if sk in used:
continue
eo = ms_to_app_local_str(best["open_ms"]) if best.get("open_ms") else None
ec = ms_to_app_local_str(best["close_ms"]) if best.get("close_ms") else None
pnl_val = best.get("pnl")
if pnl_val is None:
pnl_val = 0.0
conn.execute(
"""
UPDATE trade_records
SET exchange_realized_pnl = ?, exchange_opened_at = ?, exchange_closed_at = ?, exchange_sync_key = ?
WHERE id = ?
""",
(float(pnl_val), eo, ec, sk, int(tr["id"])),
)
used.add(sk)
matched += 1
stats["matched"] = matched
stats["ok"] = True
_LAST_EXCHANGE_PNL_SYNC_AT = now
try:
conn.commit()
except Exception:
pass
return stats
# 获取实时价格 # 获取实时价格
def get_price(symbol): def get_price(symbol):
try: try:
@@ -4750,6 +5040,12 @@ def render_main_page(page="trade"):
order_list = [] order_list = []
for o in raw_order_list: for o in raw_order_list:
order_list.append(enrich_order_item(row_to_dict(o), current_capital)) order_list.append(enrich_order_item(row_to_dict(o), current_capital))
exchange_pnl_sync = {}
if exchange_private_api_configured():
try:
exchange_pnl_sync = sync_trade_records_from_exchange(conn) or {}
except Exception as e:
exchange_pnl_sync = {"ok": False, "reason": str(e)}
raw_records = conn.execute( raw_records = conn.execute(
f"SELECT * FROM trade_records WHERE {sql_list_time_field('closed_at', 'created_at', 'opened_at')} >= ? " f"SELECT * FROM trade_records WHERE {sql_list_time_field('closed_at', 'created_at', 'opened_at')} >= ? "
f"AND {sql_list_time_field('closed_at', 'created_at', 'opened_at')} <= ? ORDER BY id DESC LIMIT 1000", f"AND {sql_list_time_field('closed_at', 'created_at', 'opened_at')} <= ? ORDER BY id DESC LIMIT 1000",
@@ -4844,10 +5140,24 @@ def render_main_page(page="trade"):
key_auto_min_planned_rr=KEY_AUTO_MIN_PLANNED_RR, key_auto_min_planned_rr=KEY_AUTO_MIN_PLANNED_RR,
kline_timeframe=KLINE_TIMEFRAME, kline_timeframe=KLINE_TIMEFRAME,
funding_usdt=funding_usdt, funding_usdt=funding_usdt,
exchange_pnl_sync=exchange_pnl_sync,
**strategy_extra, **strategy_extra,
) )
@app.route("/api/sync_exchange_pnl")
@login_required
def api_sync_exchange_pnl():
conn = get_db()
stats = sync_trade_records_from_exchange(conn, force=True)
try:
conn.commit()
except Exception:
pass
conn.close()
return jsonify(stats)
@app.route("/") @app.route("/")
@login_required @login_required
def index(): def index():
@@ -6027,7 +6337,8 @@ def export_trade_records():
rows = conn.execute( rows = conn.execute(
"SELECT id,symbol,monitor_type,key_signal_type,direction,trigger_price,stop_loss,initial_stop_loss,take_profit," "SELECT id,symbol,monitor_type,key_signal_type,direction,trigger_price,stop_loss,initial_stop_loss,take_profit,"
"margin_capital,leverage,pnl_amount,hold_seconds,hold_minutes,planned_rr,actual_rr,risk_amount," "margin_capital,leverage,pnl_amount,hold_seconds,hold_minutes,planned_rr,actual_rr,risk_amount,"
"opened_at,closed_at,result,miss_reason,entry_reason,reviewed_entry_reason,created_at " "opened_at,closed_at,result,miss_reason,entry_reason,reviewed_entry_reason,"
"exchange_realized_pnl,exchange_opened_at,exchange_closed_at,created_at "
f"FROM trade_records WHERE {sql_list_time_field('closed_at', 'created_at', 'opened_at')} >= ? " f"FROM trade_records WHERE {sql_list_time_field('closed_at', 'created_at', 'opened_at')} >= ? "
f"AND {sql_list_time_field('closed_at', 'created_at', 'opened_at')} <= ? ORDER BY id ASC", f"AND {sql_list_time_field('closed_at', 'created_at', 'opened_at')} <= ? ORDER BY id ASC",
(start_bj, end_bj), (start_bj, end_bj),
@@ -6038,7 +6349,7 @@ def export_trade_records():
"stop_loss_open_snapshot", "initial_stop_loss", "take_profit", "margin_capital", "leverage", "stop_loss_open_snapshot", "initial_stop_loss", "take_profit", "margin_capital", "leverage",
"pnl_amount", "hold_seconds", "hold_minutes", "planned_rr", "actual_rr", "risk_amount", "pnl_amount", "hold_seconds", "hold_minutes", "planned_rr", "actual_rr", "risk_amount",
"opened_at", "closed_at", "result", "miss_reason", "entry_reason", "reviewed_entry_reason", "opened_at", "closed_at", "result", "miss_reason", "entry_reason", "reviewed_entry_reason",
"created_at", "开仓类型", "exchange_realized_pnl", "exchange_opened_at", "exchange_closed_at", "created_at", "开仓类型",
] ]
data = [] data = []
for r in rows: for r in rows:
@@ -6052,6 +6363,9 @@ def export_trade_records():
snap, r["initial_stop_loss"], r["take_profit"], r["margin_capital"], r["leverage"], snap, r["initial_stop_loss"], r["take_profit"], r["margin_capital"], r["leverage"],
r["pnl_amount"], r["hold_seconds"], r["hold_minutes"], r["planned_rr"], r["actual_rr"], r["risk_amount"], r["pnl_amount"], r["hold_seconds"], r["hold_minutes"], r["planned_rr"], r["actual_rr"], r["risk_amount"],
r["opened_at"], r["closed_at"], r["result"], r["miss_reason"], r["entry_reason"], r["reviewed_entry_reason"], r["opened_at"], r["closed_at"], r["result"], r["miss_reason"], r["entry_reason"], r["reviewed_entry_reason"],
r["exchange_realized_pnl"] if "exchange_realized_pnl" in r.keys() else None,
r["exchange_opened_at"] if "exchange_opened_at" in r.keys() else None,
r["exchange_closed_at"] if "exchange_closed_at" in r.keys() else None,
r["created_at"], eff, r["created_at"], eff,
)) ))
day = app_now().strftime("%Y%m%d") day = app_now().strftime("%Y%m%d")
+7 -1
View File
@@ -70,9 +70,15 @@
## 与 Gate 的差异(其余) ## 与 Gate 的差异(其余)
- 无独立「关键位监控」导航页(斐波在 **交易执行** 页添加)。 - 无独立「关键位监控」导航页(斐波在 **交易执行** 页添加)。
- 无交易所已实现盈亏同步(`/api/sync_exchange_pnl`)。
- 箱体/收敛仍为 **提醒** 模式,不自动市价开仓(Gate/Binance 主站为自动开仓)。 - 箱体/收敛仍为 **提醒** 模式,不自动市价开仓(Gate/Binance 主站为自动开仓)。
## 交易所已实现盈亏(与 Gate 一致)
- 打开 **交易执行 / 交易记录** 等主页面时,若已配置 `OKX_API_KEY` / `OKX_API_SECRET` / `OKX_API_PASSPHRASE`(只读即可),同进程约 **25 秒**内最多调用一次 OKX **历史仓位**`fetch_positions_history`),为未写入 `exchange_sync_key` 的记录匹配并回填 `exchange_realized_pnl`
- 复盘列表盈亏优先展示交易所 U(旁标 **所**);本地公式估算标 **估**;人工复核优先。
- 手动强制同步:`GET /api/sync_exchange_pnl`(需登录)。
- 可选 `.env``EXCHANGE_POSITION_SYNC_FROM_BJ`(北京时间起点)、`EXCHANGE_POSITION_HISTORY_LIMIT`(默认 200)。
## 配置与部署 ## 配置与部署
- 详见 `.env.example` 中 OKX`OKX_*`)与通用风控项。 - 详见 `.env.example` 中 OKX`OKX_*`)与通用风控项。