diff --git a/docs/hub-symbol-archive-kline.md b/docs/hub-symbol-archive-kline.md index 693756e..2ce61ce 100644 --- a/docs/hub-symbol-archive-kline.md +++ b/docs/hub-symbol-archive-kline.md @@ -11,7 +11,7 @@ | 项 | 约定 | |----|------| | 列表粒度 | 一所一币一行 | -| 交易来源 | 四所 `trade_records`,经 `/api/hub/trades/archive` 拉取 | +| 交易来源 | 四所 `trade_records` + 未落库的 `strategy_trade_snapshots`(gate_bot 趋势漏记时补全),经 `/api/hub/trades/archive` 拉取 | | 筛选 | 交易所、有盈利单、有亏损单、犯病、情绪(中控 overlay) | | K 线真源 | 仅 **5m** 写入 `hub_symbol_archive.db` | | 建档种子 | 该币 **最早开仓** 向前 **30 天** 5m | diff --git a/hub_trades_lib.py b/hub_trades_lib.py index f98bb22..5351f6b 100644 --- a/hub_trades_lib.py +++ b/hub_trades_lib.py @@ -228,6 +228,195 @@ def _normalize_archive_trade_row( } +_SNAPSHOT_STATUS_TO_RESULT = { + "stopped_sl": "止损", + "stopped_tp": "止盈", + "stopped_manual": "手动平仓", + "stopped_external": "外部平仓", +} + + +def _table_columns(conn, table: str) -> set[str]: + try: + rows = conn.execute(f"PRAGMA table_info({table})").fetchall() + except Exception: + return set() + out: set[str] = set() + for r in rows: + try: + out.add(str(r[1])) + except (IndexError, TypeError): + try: + out.add(str(r["name"])) + except Exception: + continue + return out + + +def _archive_ts_expr(cols: set[str]) -> str: + parts = [c for c in ("reviewed_closed_at", "closed_at", "created_at", "opened_at") if c in cols] + if not parts: + return "''" + return f"REPLACE(COALESCE({', '.join(parts)}), 'T', ' ')" + + +def _archive_trade_select_sql(cols: set[str]) -> str: + wanted = [ + "id", + "symbol", + "direction", + "result", + "reviewed_result", + "pnl_amount", + "reviewed_pnl_amount", + "exchange_realized_pnl", + "closed_at", + "reviewed_closed_at", + "opened_at", + "reviewed_opened_at", + "opened_at_ms", + "closed_at_ms", + "created_at", + "monitor_type", + "actual_rr", + "planned_rr", + "trade_style", + "entry_reason", + "trigger_price", + "stop_loss", + "take_profit", + "reviewed_stop_loss", + "reviewed_take_profit", + "reviewed_at", + "trend_plan_id", + ] + select_cols = [c for c in wanted if c in cols] + if "id" not in select_cols: + select_cols = ["id"] + select_cols + return ", ".join(select_cols) + + +def _existing_trend_plan_ids(conn) -> set[int]: + cols = _table_columns(conn, "trade_records") + if "trend_plan_id" not in cols: + return set() + rows = conn.execute( + "SELECT DISTINCT trend_plan_id FROM trade_records WHERE trend_plan_id IS NOT NULL" + ).fetchall() + out: set[int] = set() + for row in rows: + d = _row_dict(row) + try: + out.add(int(d.get("trend_plan_id"))) + except (TypeError, ValueError): + continue + return out + + +def _normalize_snapshot_archive_row( + snap: dict, + *, + reset_hour: int = 8, +) -> dict[str, Any] | None: + result = str(snap.get("result_label") or "").strip() + if not result: + result = _SNAPSHOT_STATUS_TO_RESULT.get( + str(snap.get("status_at_close") or "").strip(), "" + ) + if result not in TRADE_COMPLETED_RESULTS: + return None + closed_at = snap.get("closed_at") + close_dt = parse_dt_for_trading_day(closed_at) + if not close_dt: + return None + opened_at = snap.get("opened_at") + opened_ms = _parse_ms_from_row(snap.get("opened_at")) + closed_ms = _parse_ms_from_row(closed_at) + try: + snap_id = int(snap.get("id")) + except (TypeError, ValueError): + return None + try: + pnl = float(snap.get("pnl_amount") or 0) + except (TypeError, ValueError): + pnl = 0.0 + st = str(snap.get("strategy_type") or "").strip() + monitor_type = "trend_pullback" if st == "trend_pullback" else ("roll" if st == "roll" else st) + return { + "id": -snap_id, + "symbol": (snap.get("symbol") or "").strip().upper(), + "direction": snap.get("direction"), + "result": result, + "pnl_amount": round(pnl, 4), + "closed_at": closed_at, + "opened_at": opened_at, + "opened_at_ms": opened_ms, + "closed_at_ms": closed_ms, + "monitor_type": monitor_type, + "entry_reason": "trend_pullback" if st == "trend_pullback" else monitor_type, + "from_snapshot": True, + "snapshot_id": snap_id, + "trend_plan_id": snap.get("source_id"), + "trading_day": trading_day_from_dt(close_dt, reset_hour), + } + + +def _parse_ms_from_row(raw: Any) -> int | None: + if raw in (None, ""): + return None + try: + if isinstance(raw, (int, float)): + v = int(raw) + return v if v > 1_000_000_000_000 else v * 1000 + except (TypeError, ValueError): + pass + dt = parse_dt_for_trading_day(raw) + return int(dt.timestamp() * 1000) if dt else None + + +def _fetch_strategy_snapshots_for_archive( + conn, + *, + days: int = 365, + reset_hour: int = 8, + limit: int = 2000, + skip_plan_ids: set[int] | None = None, +) -> list[dict[str, Any]]: + cols = _table_columns(conn, "strategy_trade_snapshots") + if not cols: + return [] + lim = max(1, min(int(limit or 2000), 5000)) + day_span = max(1, min(int(days or 365), 3650)) + cutoff = datetime.now() - timedelta(days=day_span) + cutoff_s = cutoff.strftime("%Y-%m-%d %H:%M:%S") + ts_expr = "REPLACE(COALESCE(closed_at, opened_at, created_at), 'T', ' ')" + rows = conn.execute( + f""" + SELECT * FROM strategy_trade_snapshots + WHERE {ts_expr} >= ? + ORDER BY {ts_expr} DESC + LIMIT ? + """, + (cutoff_s, lim * 2), + ).fetchall() + skip = skip_plan_ids or set() + out: list[dict[str, Any]] = [] + for row in rows: + d = _row_dict(row) + try: + source_id = int(d.get("source_id") or 0) + except (TypeError, ValueError): + source_id = 0 + if source_id > 0 and source_id in skip: + continue + norm = _normalize_snapshot_archive_row(d, reset_hour=reset_hour) + if norm: + out.append(norm) + if len(out) >= lim: + break + return out + + def fetch_trades_for_archive( conn, *, @@ -235,36 +424,60 @@ def fetch_trades_for_archive( row_to_dict_fn: Optional[Callable] = None, reset_hour: int = 8, limit: int = 2000, + include_strategy_snapshots: bool = True, ) -> list[dict[str, Any]]: - """返回近 N 天已平仓记录(供币种档案聚合)。""" + """返回近 N 天已平仓记录(trade_records + 未落库的 strategy 快照)。""" lim = max(1, min(int(limit or 2000), 5000)) day_span = max(1, min(int(days or 365), 3650)) cutoff = datetime.now() - timedelta(days=day_span) cutoff_s = cutoff.strftime("%Y-%m-%d %H:%M:%S") - ts_expr = "REPLACE(COALESCE(reviewed_closed_at, closed_at, created_at, opened_at), 'T', ' ')" - rows = conn.execute( - f""" - SELECT id, symbol, direction, result, reviewed_result, pnl_amount, reviewed_pnl_amount, - exchange_realized_pnl, closed_at, reviewed_closed_at, opened_at, reviewed_opened_at, - opened_at_ms, closed_at_ms, created_at, monitor_type, actual_rr, planned_rr, - trade_style, entry_reason, trigger_price, stop_loss, take_profit, - reviewed_stop_loss, reviewed_take_profit, reviewed_at - FROM trade_records - WHERE {ts_expr} >= ? - ORDER BY {ts_expr} DESC - LIMIT ? - """, - (cutoff_s, lim * 2), - ).fetchall() - out: list[dict[str, Any]] = [] - for row in rows: - d = _row_dict(row, row_to_dict_fn) - norm = _normalize_archive_trade_row(d, reset_hour=reset_hour) - if norm: - out.append(norm) - if len(out) >= lim: - break - return out + cols = _table_columns(conn, "trade_records") + if not cols: + records: list[dict[str, Any]] = [] + else: + ts_expr = _archive_ts_expr(cols) + sql = f""" + SELECT {_archive_trade_select_sql(cols)} + FROM trade_records + WHERE {ts_expr} >= ? + ORDER BY {ts_expr} DESC + LIMIT ? + """ + rows = conn.execute(sql, (cutoff_s, lim * 2)).fetchall() + records = [] + for row in rows: + d = _row_dict(row, row_to_dict_fn) + norm = _normalize_archive_trade_row(d, reset_hour=reset_hour) + if norm: + records.append(norm) + if len(records) >= lim: + break + + if not include_strategy_snapshots: + return records + + skip_ids = _existing_trend_plan_ids(conn) + for rec in records: + try: + pid = int(rec.get("trend_plan_id") or 0) + except (TypeError, ValueError): + pid = 0 + if pid > 0: + skip_ids.add(pid) + + snaps = _fetch_strategy_snapshots_for_archive( + conn, + days=days, + reset_hour=reset_hour, + limit=max(0, lim - len(records)), + skip_plan_ids=skip_ids, + ) + merged = records + snaps + merged.sort( + key=lambda x: int(x.get("closed_at_ms") or 0), + reverse=True, + ) + return merged[:lim] def summarize_trades(trades: list[dict]) -> dict[str, Any]: diff --git a/manual_trading_hub/hub.py b/manual_trading_hub/hub.py index a92df4c..32bb8ca 100644 --- a/manual_trading_hub/hub.py +++ b/manual_trading_hub/hub.py @@ -95,6 +95,7 @@ DIR = Path(__file__).resolve().parent HUB_BUILD = "20260607-hub-archive" _archive_sync_stop: asyncio.Event | None = None _archive_sync_task: asyncio.Task | None = None +_last_archive_sync: dict | None = None HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8")) HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10")) HUB_BOARD_TIMEOUT = float(os.getenv("HUB_BOARD_TIMEOUT", "45")) @@ -239,6 +240,7 @@ def _schedule_board_refresh() -> None: async def _run_archive_sync_once() -> dict: + global _last_archive_sync init_archive_db() settings = load_settings() targets = enabled_exchanges(settings) @@ -254,11 +256,25 @@ async def _run_archive_sync_once() -> dict: limit=ARCHIVE_TRADE_LIMIT, ) if not trades_resp.get("ok"): + st = trades_resp.get("status") + msg = ( + trades_resp.get("msg") + or trades_resp.get("error") + or trades_resp.get("detail") + or "拉取交易失败" + ) + if st == 404: + msg = ( + "HTTP 404:该 Flask 未注册 /api/hub/trades/archive。" + "请在仓库根目录 git pull 后 pm2 restart crypto_gate crypto_gate_bot" + ) results.append( { "exchange_key": ex_key, + "name": ex.get("name"), "ok": False, - "msg": trades_resp.get("msg") or trades_resp.get("error") or "拉取交易失败", + "status": st, + "msg": msg, } ) continue @@ -282,8 +298,17 @@ async def _run_archive_sync_once() -> dict: trades, remote_fetch, ) + r["name"] = ex.get("name") + r["trade_count"] = len(trades) results.append(r) - return {"ok": True, "exchanges": len(targets), "results": results} + out = { + "ok": True, + "exchanges": len(targets), + "results": results, + "updated_at": __import__("datetime").datetime.now().isoformat(timespec="seconds"), + } + _last_archive_sync = out + return out async def _archive_sync_loop() -> None: @@ -549,9 +574,13 @@ def _fetch_instance_trades_archive_sync( if r.status_code >= 400: parsed = _parse_http_json_body(r) parsed.setdefault("ok", False) + parsed.setdefault("status", r.status_code) return parsed data = r.json() if r.content else {} - return data if isinstance(data, dict) else {"ok": False, "msg": "无效 JSON"} + if isinstance(data, dict): + data.setdefault("ok", True) + return data + return {"ok": False, "msg": "无效 JSON"} except Exception as e: return {"ok": False, "msg": str(e)} @@ -1662,6 +1691,7 @@ def api_archive_meta(): "sync_interval_sec": ARCHIVE_SYNC_INTERVAL_SEC, "visible_bars_default": ARCHIVE_VISIBLE_BARS_DEFAULT, "exchanges": exchanges, + "last_sync": _last_archive_sync, } diff --git a/manual_trading_hub/static/archive.js b/manual_trading_hub/static/archive.js index e1950b2..13bd70d 100644 --- a/manual_trading_hub/static/archive.js +++ b/manual_trading_hub/static/archive.js @@ -410,6 +410,9 @@ const r = await apiFetch("/api/archive/meta"); meta = await r.json(); timeframe = (meta && meta.default_timeframe) || "15m"; + if (meta && meta.last_sync && elStatus && !elStatus.textContent) { + setStatus(formatSyncSummary(meta.last_sync)); + } renderExchangeOptions(); if (elTfTabs) { elTfTabs.querySelectorAll(".archive-tf-btn").forEach(function (btn) { @@ -418,16 +421,30 @@ } } + function formatSyncSummary(j) { + const results = j.results || []; + const okN = results.filter(function (x) { + return x.ok !== false; + }).length; + const parts = ["同步完成 · " + okN + "/" + (j.exchanges || 0) + " 所"]; + results.forEach(function (row) { + const label = row.exchange_key || row.name || "?"; + if (row.ok === false) { + parts.push(label + " 失败: " + (row.msg || "未知错误")); + } else { + parts.push(label + " " + (row.trade_count != null ? row.trade_count : row.trades || 0) + " 笔"); + } + }); + return parts.join(" · "); + } + async function syncAll() { setStatus("同步中(可能需数分钟)…"); elBtnSync && (elBtnSync.disabled = true); try { const r = await apiFetch("/api/archive/sync", { method: "POST" }); const j = await r.json(); - const okN = (j.results || []).filter(function (x) { - return x.ok !== false; - }).length; - setStatus("同步完成 · " + okN + "/" + (j.exchanges || 0) + " 所"); + setStatus(formatSyncSummary(j)); await loadList(); if (selected) await openDetail(selected.exchange_key, selected.symbol); } catch (e) { diff --git a/manual_trading_hub/static/index.html b/manual_trading_hub/static/index.html index f52c2d4..fdc860f 100644 --- a/manual_trading_hub/static/index.html +++ b/manual_trading_hub/static/index.html @@ -349,7 +349,7 @@
- +