"""中控币种档案:永久 5m K 线库(建档种子 + 4h 增量),交易缓存与 overlay。""" from __future__ import annotations import json import os import sqlite3 import time from datetime import datetime from pathlib import Path from typing import Any, Callable, Optional from hub_ohlcv_lib import ( TIMEFRAME_MS, aggregate_ohlcv_bars, normalize_chart_timeframe, ) ARCHIVE_TIMEFRAMES = frozenset({"5m", "15m", "1h", "4h"}) ARCHIVE_DEFAULT_TIMEFRAME = "15m" ARCHIVE_SEED_LOOKBACK_DAYS = 30 ARCHIVE_VISIBLE_BARS_DEFAULT = 200 ARCHIVE_MAX_CANDLES: dict[str, int] = { "5m": 4000, "15m": 2500, "1h": 1200, "4h": 600, } ARCHIVE_SYNC_INTERVAL_SEC = int(os.getenv("HUB_ARCHIVE_SYNC_INTERVAL_SEC", str(4 * 3600))) ARCHIVE_TRADE_DAYS = int(os.getenv("HUB_ARCHIVE_TRADE_DAYS", "365")) ARCHIVE_TRADE_LIMIT = int(os.getenv("HUB_ARCHIVE_TRADE_LIMIT", "2000")) BEHAVIOR_TAGS = frozenset({"", "sick", "emotion"}) def default_db_path() -> Path: raw = (os.getenv("HUB_ARCHIVE_DB_PATH") or "").strip() if raw: return Path(raw) hub_dir = Path(__file__).resolve().parent / "manual_trading_hub" / "data" hub_dir.mkdir(parents=True, exist_ok=True) return hub_dir / "hub_symbol_archive.db" def _connect(db_path: Path | None = None) -> sqlite3.Connection: path = db_path or default_db_path() path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(path), timeout=30, isolation_level=None) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") return conn def init_db(db_path: Path | None = None) -> None: conn = _connect(db_path) try: conn.execute( """ CREATE TABLE IF NOT EXISTS archive_meta ( exchange_key TEXT NOT NULL, symbol TEXT NOT NULL, first_trade_opened_ms INTEGER, archive_started_at INTEGER NOT NULL, last_kline_sync_ms INTEGER, last_trade_sync_ms INTEGER, seed_complete INTEGER NOT NULL DEFAULT 0, PRIMARY KEY (exchange_key, symbol) ) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS archive_bars_5m ( exchange_key TEXT NOT NULL, symbol TEXT NOT NULL, open_time_ms INTEGER NOT NULL, open REAL NOT NULL, high REAL NOT NULL, low REAL NOT NULL, close REAL NOT NULL, volume REAL NOT NULL DEFAULT 0, updated_at INTEGER NOT NULL, PRIMARY KEY (exchange_key, symbol, open_time_ms) ) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_archive_bars_series ON archive_bars_5m (exchange_key, symbol, open_time_ms) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS archive_trade_cache ( exchange_key TEXT NOT NULL, trade_id INTEGER NOT NULL, symbol TEXT NOT NULL, direction TEXT, result TEXT, pnl_amount REAL, opened_at TEXT, closed_at TEXT, opened_at_ms INTEGER, closed_at_ms INTEGER, monitor_type TEXT, entry_reason TEXT, payload_json TEXT, synced_at INTEGER NOT NULL, PRIMARY KEY (exchange_key, trade_id) ) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_archive_trades_sym ON archive_trade_cache (exchange_key, symbol, closed_at_ms) """ ) conn.execute( """ CREATE TABLE IF NOT EXISTS trade_overlay ( exchange_key TEXT NOT NULL, trade_id INTEGER NOT NULL, behavior_tag TEXT NOT NULL DEFAULT '', note TEXT NOT NULL DEFAULT '', updated_at INTEGER NOT NULL, PRIMARY KEY (exchange_key, trade_id) ) """ ) finally: conn.close() def _now_ms() -> int: return int(time.time() * 1000) def _parse_dt_ms(raw: Any) -> int | None: if raw in (None, ""): return None try: if isinstance(raw, (int, float)): v = int(raw) return v if v > 1_000_000_000_000 else v * 1000 except (TypeError, ValueError): pass s = str(raw).strip().replace("Z", "").replace("T", " ") if not s: return None for fmt, ln in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d %H:%M", 16), ("%Y-%m-%d", 10)): try: dt = datetime.strptime(s[:ln], fmt) return int(dt.timestamp() * 1000) except ValueError: continue return None def upsert_trades_cache( exchange_key: str, trades: list[dict[str, Any]], *, db_path: Path | None = None, ) -> int: init_db(db_path) ex_k = (exchange_key or "").strip().lower() if not ex_k: return 0 now = _now_ms() n = 0 conn = _connect(db_path) try: for t in trades or []: try: tid = int(t.get("id")) except (TypeError, ValueError): continue sym = (t.get("symbol") or "").strip().upper() if not sym: continue payload = {k: t.get(k) for k in t.keys()} conn.execute( """ INSERT INTO archive_trade_cache ( exchange_key, trade_id, symbol, direction, result, pnl_amount, opened_at, closed_at, opened_at_ms, closed_at_ms, monitor_type, entry_reason, payload_json, synced_at ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?) ON CONFLICT(exchange_key, trade_id) DO UPDATE SET symbol=excluded.symbol, direction=excluded.direction, result=excluded.result, pnl_amount=excluded.pnl_amount, opened_at=excluded.opened_at, closed_at=excluded.closed_at, opened_at_ms=excluded.opened_at_ms, closed_at_ms=excluded.closed_at_ms, monitor_type=excluded.monitor_type, entry_reason=excluded.entry_reason, payload_json=excluded.payload_json, synced_at=excluded.synced_at """, ( ex_k, tid, sym, t.get("direction"), t.get("result"), float(t.get("pnl_amount") or 0), t.get("opened_at"), t.get("closed_at"), t.get("opened_at_ms") or _parse_dt_ms(t.get("opened_at")), t.get("closed_at_ms") or _parse_dt_ms(t.get("closed_at")), t.get("monitor_type"), t.get("entry_reason"), json.dumps(payload, ensure_ascii=False, default=str), now, ), ) n += 1 finally: conn.close() return n def _trade_row_to_dict(row: sqlite3.Row, overlay: dict | None = None) -> dict[str, Any]: d = dict(row) payload = {} raw = d.pop("payload_json", None) if raw: try: payload = json.loads(raw) except (json.JSONDecodeError, TypeError): payload = {} out = {**payload, **{k: d[k] for k in d.keys() if k not in payload}} ov = overlay or {} out["behavior_tag"] = ov.get("behavior_tag") or "" out["note"] = ov.get("note") or "" out["trade_id"] = out.get("trade_id") or out.get("id") return out def load_overlays( exchange_key: str, trade_ids: list[int] | None = None, *, db_path: Path | None = None, ) -> dict[int, dict[str, Any]]: ex_k = (exchange_key or "").strip().lower() conn = _connect(db_path) try: if trade_ids: placeholders = ",".join("?" * len(trade_ids)) rows = conn.execute( f""" SELECT exchange_key, trade_id, behavior_tag, note, updated_at FROM trade_overlay WHERE exchange_key=? AND trade_id IN ({placeholders}) """, (ex_k, *trade_ids), ).fetchall() else: rows = conn.execute( """ SELECT exchange_key, trade_id, behavior_tag, note, updated_at FROM trade_overlay WHERE exchange_key=? """, (ex_k,), ).fetchall() return { int(r["trade_id"]): { "behavior_tag": r["behavior_tag"] or "", "note": r["note"] or "", "updated_at": r["updated_at"], } for r in rows } finally: conn.close() def upsert_trade_overlay( exchange_key: str, trade_id: int, *, behavior_tag: str | None = None, note: str | None = None, db_path: Path | None = None, ) -> dict[str, Any]: init_db(db_path) ex_k = (exchange_key or "").strip().lower() tid = int(trade_id) tag = (behavior_tag or "").strip().lower() if tag not in BEHAVIOR_TAGS: tag = "" note_text = (note or "").strip()[:2000] now = _now_ms() conn = _connect(db_path) try: conn.execute( """ INSERT INTO trade_overlay (exchange_key, trade_id, behavior_tag, note, updated_at) VALUES (?,?,?,?,?) ON CONFLICT(exchange_key, trade_id) DO UPDATE SET behavior_tag=excluded.behavior_tag, note=excluded.note, updated_at=excluded.updated_at """, (ex_k, tid, tag, note_text, now), ) finally: conn.close() return {"exchange_key": ex_k, "trade_id": tid, "behavior_tag": tag, "note": note_text} def list_symbol_rows( *, exchange_key: str = "", filter_profit: bool = False, filter_loss: bool = False, filter_sick: bool = False, filter_emotion: bool = False, db_path: Path | None = None, ) -> list[dict[str, Any]]: """一所一币一行汇总。""" init_db(db_path) conn = _connect(db_path) try: params: list[Any] = [] where = "1=1" ex_filter = (exchange_key or "").strip().lower() if ex_filter: where += " AND t.exchange_key=?" params.append(ex_filter) rows = conn.execute( f""" SELECT t.exchange_key, t.symbol, COUNT(*) AS trade_count, SUM(CASE WHEN t.pnl_amount > 0.0001 THEN 1 ELSE 0 END) AS win_count, SUM(CASE WHEN t.pnl_amount < -0.0001 THEN 1 ELSE 0 END) AS loss_count, SUM(COALESCE(t.pnl_amount, 0)) AS total_pnl, MIN(COALESCE(t.opened_at_ms, 0)) AS first_opened_ms, MAX(COALESCE(t.closed_at_ms, 0)) AS last_closed_ms FROM archive_trade_cache t WHERE {where} GROUP BY t.exchange_key, t.symbol ORDER BY last_closed_ms DESC """, params, ).fetchall() overlays_by_ex: dict[str, dict[int, dict]] = {} out: list[dict[str, Any]] = [] for r in rows: ex_k = r["exchange_key"] sym = r["symbol"] if ex_k not in overlays_by_ex: overlays_by_ex[ex_k] = load_overlays(ex_k, db_path=db_path) trade_rows = conn.execute( """ SELECT trade_id, pnl_amount FROM archive_trade_cache WHERE exchange_key=? AND symbol=? """, (ex_k, sym), ).fetchall() has_profit = any(float(x["pnl_amount"] or 0) > 0.0001 for x in trade_rows) has_loss = any(float(x["pnl_amount"] or 0) < -0.0001 for x in trade_rows) has_sick = False has_emotion = False ov_map = overlays_by_ex.get(ex_k) or {} for tr in trade_rows: ov = ov_map.get(int(tr["trade_id"])) or {} if ov.get("behavior_tag") == "sick": has_sick = True if ov.get("behavior_tag") == "emotion": has_emotion = True if filter_profit and not has_profit: continue if filter_loss and not has_loss: continue if filter_sick and not has_sick: continue if filter_emotion and not has_emotion: continue meta = conn.execute( "SELECT seed_complete, last_kline_sync_ms FROM archive_meta WHERE exchange_key=? AND symbol=?", (ex_k, sym), ).fetchone() out.append( { "exchange_key": ex_k, "symbol": sym, "trade_count": int(r["trade_count"] or 0), "win_count": int(r["win_count"] or 0), "loss_count": int(r["loss_count"] or 0), "total_pnl": round(float(r["total_pnl"] or 0), 4), "first_opened_ms": int(r["first_opened_ms"] or 0) or None, "last_closed_ms": int(r["last_closed_ms"] or 0) or None, "seed_complete": bool(meta["seed_complete"]) if meta else False, "last_kline_sync_ms": int(meta["last_kline_sync_ms"] or 0) if meta else None, } ) return out finally: conn.close() def load_symbol_trades( exchange_key: str, symbol: str, *, db_path: Path | None = None, ) -> list[dict[str, Any]]: ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() conn = _connect(db_path) try: rows = conn.execute( """ SELECT * FROM archive_trade_cache WHERE exchange_key=? AND symbol=? ORDER BY COALESCE(closed_at_ms, 0) DESC, trade_id DESC """, (ex_k, sym), ).fetchall() ids = [int(r["trade_id"]) for r in rows] ov = load_overlays(ex_k, ids, db_path=db_path) return [_trade_row_to_dict(r, ov.get(int(r["trade_id"]))) for r in rows] finally: conn.close() def upsert_bars_5m( exchange_key: str, symbol: str, bars: list[dict[str, Any]], *, db_path: Path | None = None, ) -> int: init_db(db_path) ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() now = _now_ms() n = 0 conn = _connect(db_path) try: for b in bars or []: try: conn.execute( """ INSERT INTO archive_bars_5m ( exchange_key, symbol, open_time_ms, open, high, low, close, volume, updated_at ) VALUES (?,?,?,?,?,?,?,?,?) ON CONFLICT(exchange_key, symbol, open_time_ms) DO UPDATE SET open=excluded.open, high=excluded.high, low=excluded.low, close=excluded.close, volume=excluded.volume, updated_at=excluded.updated_at """, ( ex_k, sym, int(b["open_time_ms"]), float(b["open"]), float(b["high"]), float(b["low"]), float(b["close"]), float(b.get("volume") or 0), now, ), ) n += 1 except (KeyError, TypeError, ValueError): continue finally: conn.close() return n def load_bars_5m_range( exchange_key: str, symbol: str, start_ms: int, end_ms: int, *, db_path: Path | None = None, ) -> list[dict[str, Any]]: ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() conn = _connect(db_path) try: rows = conn.execute( """ SELECT open_time_ms, open, high, low, close, volume FROM archive_bars_5m WHERE exchange_key=? AND symbol=? AND open_time_ms >= ? AND open_time_ms <= ? ORDER BY open_time_ms ASC """, (ex_k, sym, int(start_ms), int(end_ms)), ).fetchall() return [ { "open_time_ms": int(r["open_time_ms"]), "open": float(r["open"]), "high": float(r["high"]), "low": float(r["low"]), "close": float(r["close"]), "volume": float(r["volume"] or 0), } for r in rows ] finally: conn.close() def _to_candles(bars: list[dict[str, Any]]) -> list[dict[str, Any]]: out = [] for b in bars or []: try: out.append( { "time": int(b["open_time_ms"] // 1000), "open": float(b["open"]), "high": float(b["high"]), "low": float(b["low"]), "close": float(b["close"]), "volume": float(b.get("volume") or 0), } ) except (KeyError, TypeError, ValueError): continue return out def _snap_to_bar_grid(ts_ms: int, origin_ms: int, step_ms: int) -> int: step = max(1, int(step_ms)) origin = int(origin_ms) if ts_ms <= origin: return origin idx = (int(ts_ms) - origin + step - 1) // step return origin + idx * step def _fill_missing_bars( bars: list[dict[str, Any]], period_ms: int, start_ms: int, end_ms: int, ) -> list[dict[str, Any]]: """5m 缺口用上一根收盘价填平,保证聚合后 K 线时间轴连续。""" by_ts: dict[int, dict[str, Any]] = {} for b in bars or []: try: by_ts[int(b["open_time_ms"])] = b except (KeyError, TypeError, ValueError): continue if not by_ts: return [] keys = sorted(by_ts.keys()) step_ms = max(1, int(period_ms)) origin = keys[0] aligned_start = _snap_to_bar_grid(int(start_ms), origin, step_ms) aligned_end = max(int(end_ms), keys[-1]) out: list[dict[str, Any]] = [] last: dict[str, Any] | None = None for ts_key in keys: if ts_key <= aligned_start: last = by_ts[ts_key] ts = aligned_start while ts <= aligned_end: cur = by_ts.get(ts) if cur is not None: last = cur out.append(cur) elif last is not None: c = float(last["close"]) out.append( { "open_time_ms": ts, "open": c, "high": c, "low": c, "close": c, "volume": 0.0, "filled": True, } ) ts += step_ms return out def _archive_earliest_bar_ms( exchange_key: str, symbol: str, *, db_path: Path | None = None, ) -> int | None: ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() conn = _connect(db_path) try: row = conn.execute( "SELECT MIN(open_time_ms) AS mn FROM archive_bars_5m WHERE exchange_key=? AND symbol=?", (ex_k, sym), ).fetchone() if row and row["mn"] is not None: return int(row["mn"]) finally: conn.close() return None def _trim_bars_for_cap( bars: list[dict[str, Any]], *, end_ms: int, max_n: int, ) -> list[dict[str, Any]]: """超长时优先保留到平仓,再从最古老端截断。""" if len(bars) <= max_n: return bars cut_end = len(bars) for i in range(len(bars) - 1, -1, -1): if int(bars[i]["open_time_ms"]) <= int(end_ms): cut_end = i + 1 break essential = bars[:cut_end] if len(essential) <= max_n: return essential return essential[len(essential) - max_n :] def resolve_archive_chart( exchange_key: str, symbol: str, timeframe: str = ARCHIVE_DEFAULT_TIMEFRAME, *, anchor_ms: int | None = None, opened_ms: int | None = None, closed_ms: int | None = None, mode: str = "hold", bars: int = ARCHIVE_VISIBLE_BARS_DEFAULT, range_mode: str = "window", db_path: Path | None = None, ) -> dict[str, Any]: """从永久 5m 库聚合出档案 K 线视窗。 range_mode=history:建档起点 → 平仓(不含「到现在」),供拖动/缩放查看建仓前全局形态。 """ tf = normalize_chart_timeframe(timeframe, default=ARCHIVE_DEFAULT_TIMEFRAME) if tf not in ARCHIVE_TIMEFRAMES: return {"ok": False, "msg": f"档案仅支持 {', '.join(sorted(ARCHIVE_TIMEFRAMES))}"} ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() if not ex_k or not sym: return {"ok": False, "msg": "缺少 exchange_key 或 symbol"} period = TIMEFRAME_MS[tf] period_5m = TIMEFRAME_MS["5m"] hold_open = int(opened_ms) if opened_ms else None hold_close = int(closed_ms) if closed_ms else None rm = (range_mode or "window").strip().lower() if hold_open and hold_close and hold_close >= hold_open and rm == "history": seed_back = max(0, hold_open - ARCHIVE_SEED_LOOKBACK_DAYS * 86400000) earliest = _archive_earliest_bar_ms(ex_k, sym, db_path=db_path) if earliest is not None: start_ms = min(earliest, seed_back) else: start_ms = seed_back end_ms = hold_close + max(period * 16, period_5m * 8) anchor = hold_close if (mode or "hold").strip().lower() != "entry" else hold_open elif hold_open and hold_close and hold_close >= hold_open: hold_len = hold_close - hold_open pad = max(period * 24, hold_len // 3, period_5m * 12) start_ms = max(0, hold_open - pad) end_ms = hold_close + pad anchor = hold_close if (mode or "hold").strip().lower() != "entry" else hold_open else: visible = max(50, min(int(bars or ARCHIVE_VISIBLE_BARS_DEFAULT), 500)) anchor = int(anchor_ms) if anchor_ms else _now_ms() half = visible // 2 start_ms = max(0, anchor - half * period) end_ms = anchor + half * period raw_5m = load_bars_5m_range( ex_k, sym, start_ms - period_5m * 6, end_ms + period_5m * 6, db_path=db_path, ) if not raw_5m: return {"ok": False, "msg": "档案库暂无 K 线,请等待同步或手动刷新"} filled_5m = _fill_missing_bars(raw_5m, period_5m, start_ms - period_5m * 2, end_ms + period_5m * 2) if tf == "5m": merged = [b for b in filled_5m if start_ms <= int(b["open_time_ms"]) <= end_ms] else: agg = aggregate_ohlcv_bars(filled_5m, tf) merged = [b for b in agg if start_ms <= int(b["open_time_ms"]) <= end_ms] max_n = ARCHIVE_MAX_CANDLES.get(tf, 2000) if rm == "history" and merged: merged = _trim_bars_for_cap(merged, end_ms=end_ms, max_n=max_n) candles = _to_candles(merged) if not candles: return {"ok": False, "msg": "视窗内无 K 线"} return { "ok": True, "exchange_key": ex_k, "symbol": sym, "timeframe": tf, "mode": (mode or "hold").strip().lower(), "range_mode": rm, "anchor_ms": anchor, "opened_ms": hold_open, "closed_ms": hold_close, "window_start_ms": start_ms, "window_end_ms": end_ms, "candles": candles, "bar_count": len(candles), "gaps_filled": sum(1 for b in filled_5m if b.get("filled")), } def _ensure_meta( exchange_key: str, symbol: str, first_opened_ms: int | None, *, db_path: Path | None = None, ) -> None: ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() now = _now_ms() conn = _connect(db_path) try: row = conn.execute( "SELECT first_trade_opened_ms FROM archive_meta WHERE exchange_key=? AND symbol=?", (ex_k, sym), ).fetchone() if row: if first_opened_ms and ( not row["first_trade_opened_ms"] or int(first_opened_ms) < int(row["first_trade_opened_ms"]) ): conn.execute( """ UPDATE archive_meta SET first_trade_opened_ms=? WHERE exchange_key=? AND symbol=? """, (int(first_opened_ms), ex_k, sym), ) return conn.execute( """ INSERT INTO archive_meta ( exchange_key, symbol, first_trade_opened_ms, archive_started_at, last_kline_sync_ms, last_trade_sync_ms, seed_complete ) VALUES (?,?,?,?,?,?,0) """, (ex_k, sym, int(first_opened_ms) if first_opened_ms else None, now, None, None), ) finally: conn.close() def _mark_meta_sync( exchange_key: str, symbol: str, *, kline_ms: int | None = None, trade_ms: int | None = None, seed_complete: bool | None = None, db_path: Path | None = None, ) -> None: ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() conn = _connect(db_path) try: sets = [] params: list[Any] = [] if kline_ms is not None: sets.append("last_kline_sync_ms=?") params.append(int(kline_ms)) if trade_ms is not None: sets.append("last_trade_sync_ms=?") params.append(int(trade_ms)) if seed_complete is not None: sets.append("seed_complete=?") params.append(1 if seed_complete else 0) if not sets: return params.extend([ex_k, sym]) conn.execute( f"UPDATE archive_meta SET {', '.join(sets)} WHERE exchange_key=? AND symbol=?", params, ) finally: conn.close() def fetch_remote_5m_range( remote_fetch: Callable[..., dict[str, Any]], symbol: str, start_ms: int, end_ms: int, ) -> list[dict[str, Any]]: """经实例 /api/hub/ohlcv 分页拉取 5m。""" period = TIMEFRAME_MS["5m"] since = max(0, int(start_ms)) end = int(end_ms) merged: dict[int, dict[str, Any]] = {} guard = 0 while since < end and guard < 120: guard += 1 remote = remote_fetch(symbol=symbol, timeframe="5m", since_ms=since, limit=500) if not remote.get("ok"): break batch = remote.get("bars") or [] if not batch: break for b in batch: try: ts = int(b["open_time_ms"]) merged[ts] = b except (KeyError, TypeError, ValueError): continue last_ts = max(int(b["open_time_ms"]) for b in batch) next_since = last_ts + period if next_since <= since: break since = next_since if last_ts >= end: break return [merged[k] for k in sorted(merged.keys()) if start_ms <= k <= end] def seed_symbol_archive( exchange_key: str, symbol: str, first_opened_ms: int, remote_fetch: Callable[..., dict[str, Any]], *, db_path: Path | None = None, ) -> dict[str, Any]: """建档:最早开仓向前 30 天 5m 种子。""" init_db(db_path) ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() anchor = int(first_opened_ms) start_ms = max(0, anchor - ARCHIVE_SEED_LOOKBACK_DAYS * 86400000) end_ms = _now_ms() _ensure_meta(ex_k, sym, anchor, db_path=db_path) bars = fetch_remote_5m_range(remote_fetch, sym, start_ms, end_ms) n = upsert_bars_5m(ex_k, sym, bars, db_path=db_path) now = _now_ms() _mark_meta_sync(ex_k, sym, kline_ms=now, seed_complete=True, db_path=db_path) return {"ok": True, "seed_bars": n, "start_ms": start_ms, "end_ms": end_ms} def sync_symbol_klines_incremental( exchange_key: str, symbol: str, remote_fetch: Callable[..., dict[str, Any]], *, db_path: Path | None = None, ) -> dict[str, Any]: """增量补 5m 至当前。""" init_db(db_path) ex_k = (exchange_key or "").strip().lower() sym = (symbol or "").strip().upper() conn = _connect(db_path) try: row = conn.execute( "SELECT MAX(open_time_ms) AS mx FROM archive_bars_5m WHERE exchange_key=? AND symbol=?", (ex_k, sym), ).fetchone() last_bar = int(row["mx"]) if row and row["mx"] else None finally: conn.close() period = TIMEFRAME_MS["5m"] start_ms = max(0, (last_bar + period) if last_bar else 0) end_ms = _now_ms() if start_ms >= end_ms - period: return {"ok": True, "appended": 0, "skipped": True} bars = fetch_remote_5m_range(remote_fetch, sym, start_ms, end_ms) n = upsert_bars_5m(ex_k, sym, bars, db_path=db_path) now = _now_ms() _mark_meta_sync(ex_k, sym, kline_ms=now, db_path=db_path) return {"ok": True, "appended": n, "start_ms": start_ms, "end_ms": end_ms} def sync_exchange_symbol_archives( exchange_key: str, trades: list[dict[str, Any]], remote_fetch: Callable[..., dict[str, Any]], *, db_path: Path | None = None, ) -> dict[str, Any]: """同步单所:交易缓存 + 各币种 K 线种子/增量。""" ex_k = (exchange_key or "").strip().lower() upsert_trades_cache(ex_k, trades, db_path=db_path) by_sym: dict[str, int] = {} for t in trades or []: sym = (t.get("symbol") or "").strip().upper() if not sym: continue oms = t.get("opened_at_ms") or _parse_dt_ms(t.get("opened_at")) if oms: cur = by_sym.get(sym) if cur is None or int(oms) < cur: by_sym[sym] = int(oms) seeded = 0 appended = 0 for sym, first_ms in by_sym.items(): _ensure_meta(ex_k, sym, first_ms, db_path=db_path) conn = _connect(db_path) try: meta = conn.execute( "SELECT seed_complete FROM archive_meta WHERE exchange_key=? AND symbol=?", (ex_k, sym), ).fetchone() finally: conn.close() if not meta or not int(meta["seed_complete"] or 0): r = seed_symbol_archive(ex_k, sym, first_ms, remote_fetch, db_path=db_path) seeded += int(r.get("seed_bars") or 0) else: r = sync_symbol_klines_incremental(ex_k, sym, remote_fetch, db_path=db_path) appended += int(r.get("appended") or 0) return { "ok": True, "exchange_key": ex_k, "symbols": len(by_sym), "seed_bars": seeded, "appended_bars": appended, "trades": len(trades or []), }