Files
crypto_monitor/hub_symbol_archive_lib.py
T
dekun 92ff945d72 fix(hub): correct _mark_meta_sync NameError in archive seed
seed_symbol_archive and sync_symbol_klines_incremental called undefined _mark_meta.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-07 23:05:25 +08:00

812 lines
26 KiB
Python

"""中控币种档案:永久 5m K 线库(建档种子 + 4h 增量),交易缓存与 overlay。"""
from __future__ import annotations
import json
import os
import sqlite3
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Callable, Optional
from hub_ohlcv_lib import (
TIMEFRAME_MS,
aggregate_ohlcv_bars,
normalize_chart_timeframe,
)
# Timeframes the archive chart can serve; everything other than 5m is
# aggregated on the fly from the stored 5m bars (see resolve_archive_chart).
ARCHIVE_TIMEFRAMES = frozenset({"5m", "15m", "1h", "4h"})
# Timeframe used when the caller does not supply a usable one.
ARCHIVE_DEFAULT_TIMEFRAME = "15m"
# Days of 5m history fetched before a symbol's earliest trade when seeding.
ARCHIVE_SEED_LOOKBACK_DAYS = 30
# Default chart window size in bars (resolve_archive_chart clamps to 50..500).
ARCHIVE_VISIBLE_BARS_DEFAULT = 200
# Incremental sync interval in seconds (default 4 hours), overridable through
# the HUB_ARCHIVE_SYNC_INTERVAL_SEC environment variable.
# NOTE(review): not referenced in the visible part of this module — presumably
# consumed by the scheduler that calls into it; confirm.
ARCHIVE_SYNC_INTERVAL_SEC = int(os.getenv("HUB_ARCHIVE_SYNC_INTERVAL_SEC", str(4 * 3600)))
# Trade-history window (days) and row cap, overridable through environment
# variables. NOTE(review): also unused in the visible code; verify against callers.
ARCHIVE_TRADE_DAYS = int(os.getenv("HUB_ARCHIVE_TRADE_DAYS", "365"))
ARCHIVE_TRADE_LIMIT = int(os.getenv("HUB_ARCHIVE_TRADE_LIMIT", "2000"))
# Values accepted for trade_overlay.behavior_tag; "" means "no tag".
BEHAVIOR_TAGS = frozenset({"", "sick", "emotion"})
def default_db_path() -> Path:
    """Return the location of the archive SQLite file.

    A non-blank ``HUB_ARCHIVE_DB_PATH`` environment variable wins and is
    returned as-is (no directory is created for it here). Otherwise the file
    lives in ``manual_trading_hub/data`` next to this module, and that
    directory is created on demand.
    """
    override = os.getenv("HUB_ARCHIVE_DB_PATH")
    configured = override.strip() if override else ""
    if not configured:
        data_dir = Path(__file__).resolve().parent.joinpath("manual_trading_hub", "data")
        data_dir.mkdir(parents=True, exist_ok=True)
        return data_dir / "hub_symbol_archive.db"
    return Path(configured)
def _connect(db_path: Path | None = None) -> sqlite3.Connection:
    """Open an autocommit SQLite connection to the archive database.

    The parent directory is created if needed. Rows come back as
    ``sqlite3.Row`` and the connection is switched to WAL journaling with
    ``synchronous=NORMAL``.

    Args:
        db_path: Database file; falls back to ``default_db_path()`` when falsy.

    Returns:
        An open connection that the caller must close.
    """
    target = db_path if db_path else default_db_path()
    target.parent.mkdir(parents=True, exist_ok=True)
    # isolation_level=None puts the sqlite3 module in autocommit mode.
    connection = sqlite3.connect(str(target), timeout=30, isolation_level=None)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL"):
        connection.execute(pragma)
    return connection
def init_db(db_path: Path | None = None) -> None:
    """Create every archive table and index that does not exist yet.

    Safe to call repeatedly: each statement uses ``IF NOT EXISTS``.

    Args:
        db_path: Database file to initialise; ``None`` lets ``_connect``
            pick the default location.
    """
    schema_statements = (
        # Per (exchange, symbol) bookkeeping: seed state and last sync times.
        """
        CREATE TABLE IF NOT EXISTS archive_meta (
        exchange_key TEXT NOT NULL,
        symbol TEXT NOT NULL,
        first_trade_opened_ms INTEGER,
        archive_started_at INTEGER NOT NULL,
        last_kline_sync_ms INTEGER,
        last_trade_sync_ms INTEGER,
        seed_complete INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (exchange_key, symbol)
        )
        """,
        # Permanent 5m OHLCV store, one row per bar.
        """
        CREATE TABLE IF NOT EXISTS archive_bars_5m (
        exchange_key TEXT NOT NULL,
        symbol TEXT NOT NULL,
        open_time_ms INTEGER NOT NULL,
        open REAL NOT NULL,
        high REAL NOT NULL,
        low REAL NOT NULL,
        close REAL NOT NULL,
        volume REAL NOT NULL DEFAULT 0,
        updated_at INTEGER NOT NULL,
        PRIMARY KEY (exchange_key, symbol, open_time_ms)
        )
        """,
        # NOTE(review): this index lists the same columns as the primary key
        # of archive_bars_5m above.
        """
        CREATE INDEX IF NOT EXISTS idx_archive_bars_series
        ON archive_bars_5m (exchange_key, symbol, open_time_ms)
        """,
        # Cached copy of trades; the full source dict is kept in payload_json.
        """
        CREATE TABLE IF NOT EXISTS archive_trade_cache (
        exchange_key TEXT NOT NULL,
        trade_id INTEGER NOT NULL,
        symbol TEXT NOT NULL,
        direction TEXT,
        result TEXT,
        pnl_amount REAL,
        opened_at TEXT,
        closed_at TEXT,
        opened_at_ms INTEGER,
        closed_at_ms INTEGER,
        monitor_type TEXT,
        entry_reason TEXT,
        payload_json TEXT,
        synced_at INTEGER NOT NULL,
        PRIMARY KEY (exchange_key, trade_id)
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_archive_trades_sym
        ON archive_trade_cache (exchange_key, symbol, closed_at_ms)
        """,
        # User annotations (behavior tag + note) attached to a trade.
        """
        CREATE TABLE IF NOT EXISTS trade_overlay (
        exchange_key TEXT NOT NULL,
        trade_id INTEGER NOT NULL,
        behavior_tag TEXT NOT NULL DEFAULT '',
        note TEXT NOT NULL DEFAULT '',
        updated_at INTEGER NOT NULL,
        PRIMARY KEY (exchange_key, trade_id)
        )
        """,
    )
    conn = _connect(db_path)
    try:
        for statement in schema_statements:
            conn.execute(statement)
    finally:
        conn.close()
def _now_ms() -> int:
    """Return the current wall-clock time as whole epoch milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
def _parse_dt_ms(raw: Any) -> int | None:
    """Convert a timestamp-like value to epoch milliseconds.

    Accepted inputs:
      * ``None`` or ``""`` -> ``None``.
      * int / float: values above 1e12 are taken as milliseconds, anything
        else as seconds. NaN and infinity yield ``None``.
      * str in ``YYYY-MM-DD HH:MM:SS``, ``YYYY-MM-DD HH:MM`` or ``YYYY-MM-DD``
        form. A ``T`` separator is accepted and trailing text (for example
        fractional seconds) is ignored. A trailing ``Z`` marks the value as
        UTC; without it the value is interpreted in the local timezone.

    Returns:
        Epoch milliseconds, or ``None`` when the value cannot be parsed.
    """
    from datetime import timezone

    if raw in (None, ""):
        return None
    if isinstance(raw, (int, float)):
        try:
            value = int(raw)
        except (OverflowError, ValueError):
            # int() rejects inf (OverflowError) and NaN (ValueError).
            return None
        return value if value > 1_000_000_000_000 else value * 1000

    stripped = str(raw).strip()
    # "Z" is the ISO 8601 UTC designator; remember it before it is removed so
    # the wall-clock fields are not mistaken for local time.
    is_utc = stripped.endswith(("Z", "z"))
    text = stripped.replace("Z", "").replace("T", " ")
    if not text:
        return None
    for fmt, width in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d %H:%M", 16), ("%Y-%m-%d", 10)):
        try:
            parsed = datetime.strptime(text[:width], fmt)
            if is_utc:
                parsed = parsed.replace(tzinfo=timezone.utc)
            return int(parsed.timestamp() * 1000)
        except (ValueError, OverflowError, OSError):
            # Wrong format, or a date the platform cannot convert to a timestamp.
            continue
    return None
def upsert_trades_cache(
    exchange_key: str,
    trades: list[dict[str, Any]],
    *,
    db_path: Path | None = None,
) -> int:
    """Insert or refresh trades in ``archive_trade_cache``.

    Trades without an integer ``id`` or without a symbol are skipped. The
    whole trade dict is also stored as JSON in ``payload_json``.

    All rows are written in a single transaction: the connection is in
    autocommit mode, so issuing one statement per row would commit once per
    trade. Either the whole batch is stored or, on error, none of it.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        trades: Trade dicts as delivered by the remote instance.
        db_path: Optional database file override.

    Returns:
        Number of trades written (0 when ``exchange_key`` is blank).

    Raises:
        ValueError: If a trade's ``pnl_amount`` is not numeric. This is now
            raised before anything is written.
    """
    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    if not ex_k:
        return 0
    now = _now_ms()

    # Validate and convert first so the transaction only covers the writes.
    rows: list[tuple[Any, ...]] = []
    for t in trades or []:
        try:
            tid = int(t.get("id"))
        except (TypeError, ValueError):
            continue
        sym = (t.get("symbol") or "").strip().upper()
        if not sym:
            continue
        rows.append(
            (
                ex_k,
                tid,
                sym,
                t.get("direction"),
                t.get("result"),
                float(t.get("pnl_amount") or 0),
                t.get("opened_at"),
                t.get("closed_at"),
                t.get("opened_at_ms") or _parse_dt_ms(t.get("opened_at")),
                t.get("closed_at_ms") or _parse_dt_ms(t.get("closed_at")),
                t.get("monitor_type"),
                t.get("entry_reason"),
                json.dumps(dict(t), ensure_ascii=False, default=str),
                now,
            )
        )
    if not rows:
        return 0

    upsert_sql = """
        INSERT INTO archive_trade_cache (
        exchange_key, trade_id, symbol, direction, result, pnl_amount,
        opened_at, closed_at, opened_at_ms, closed_at_ms,
        monitor_type, entry_reason, payload_json, synced_at
        ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
        ON CONFLICT(exchange_key, trade_id) DO UPDATE SET
        symbol=excluded.symbol,
        direction=excluded.direction,
        result=excluded.result,
        pnl_amount=excluded.pnl_amount,
        opened_at=excluded.opened_at,
        closed_at=excluded.closed_at,
        opened_at_ms=excluded.opened_at_ms,
        closed_at_ms=excluded.closed_at_ms,
        monitor_type=excluded.monitor_type,
        entry_reason=excluded.entry_reason,
        payload_json=excluded.payload_json,
        synced_at=excluded.synced_at
    """
    conn = _connect(db_path)
    try:
        # Explicit transaction: autocommit mode never opens one implicitly.
        conn.execute("BEGIN IMMEDIATE")
        try:
            conn.executemany(upsert_sql, rows)
        except BaseException:
            conn.rollback()
            raise
        conn.commit()
    finally:
        conn.close()
    return len(rows)
def _trade_row_to_dict(row: sqlite3.Row, overlay: dict | None = None) -> dict[str, Any]:
    """Flatten a cached trade row plus its overlay into one dict.

    The JSON stored in ``payload_json`` is merged with the table columns;
    payload keys take precedence over same-named columns. ``behavior_tag``
    and ``note`` always come from ``overlay`` (empty string when absent) and
    ``trade_id`` falls back to the payload's ``id``.

    A payload that cannot be decoded, or that decodes to something other
    than a JSON object, is ignored instead of raising.

    Args:
        row: A row of ``archive_trade_cache`` (any mapping works).
        overlay: Optional overlay dict for this trade.

    Returns:
        The merged trade dict, without the ``payload_json`` key.
    """
    record = dict(row)
    payload: dict[str, Any] = {}
    raw = record.pop("payload_json", None)
    if raw:
        try:
            decoded = json.loads(raw)
        except (ValueError, TypeError):
            # ValueError covers JSONDecodeError and undecodable bytes.
            decoded = None
        # Only a JSON object can be merged key by key.
        if isinstance(decoded, dict):
            payload = decoded
    out = {**payload, **{k: v for k, v in record.items() if k not in payload}}
    ov = overlay or {}
    out["behavior_tag"] = ov.get("behavior_tag") or ""
    out["note"] = ov.get("note") or ""
    out["trade_id"] = out.get("trade_id") or out.get("id")
    return out
def load_overlays(
    exchange_key: str,
    trade_ids: list[int] | None = None,
    *,
    db_path: Path | None = None,
) -> dict[int, dict[str, Any]]:
    """Load trade overlays (behavior tag + note) for one exchange.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        trade_ids: Restrict the result to these trade ids. ``None`` or an
            empty list returns every overlay of the exchange.
        db_path: Optional database file override.

    Returns:
        Mapping of trade id to ``{"behavior_tag", "note", "updated_at"}``.
    """
    # Make sure the table exists so a fresh database yields {} instead of
    # "no such table".
    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    base_sql = """
        SELECT exchange_key, trade_id, behavior_tag, note, updated_at
        FROM trade_overlay
        WHERE exchange_key=?
    """
    # SQLite caps the number of bound parameters per statement (999 on older
    # builds), so the IN list is queried in slices.
    chunk_size = 500
    conn = _connect(db_path)
    try:
        rows: list[sqlite3.Row] = []
        if trade_ids:
            ids = list(trade_ids)
            for offset in range(0, len(ids), chunk_size):
                chunk = ids[offset:offset + chunk_size]
                placeholders = ",".join("?" * len(chunk))
                rows.extend(
                    conn.execute(
                        f"{base_sql} AND trade_id IN ({placeholders})",
                        (ex_k, *chunk),
                    ).fetchall()
                )
        else:
            rows = conn.execute(base_sql, (ex_k,)).fetchall()
        return {
            int(r["trade_id"]): {
                "behavior_tag": r["behavior_tag"] or "",
                "note": r["note"] or "",
                "updated_at": r["updated_at"],
            }
            for r in rows
        }
    finally:
        conn.close()
def upsert_trade_overlay(
    exchange_key: str,
    trade_id: int,
    *,
    behavior_tag: str | None = None,
    note: str | None = None,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Create or overwrite the overlay (tag + note) of a single trade.

    Both fields are always written: a missing or unknown ``behavior_tag``
    is stored as ``""`` and a missing ``note`` as ``""``. Notes are trimmed
    and cut to 2000 characters.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        trade_id: Trade id; coerced with ``int()``.
        behavior_tag: One of ``BEHAVIOR_TAGS`` (case-insensitive).
        note: Free-text note.
        db_path: Optional database file override.

    Returns:
        The normalized values that were stored.
    """
    init_db(db_path)
    record: dict[str, Any] = {
        "exchange_key": (exchange_key or "").strip().lower(),
        "trade_id": int(trade_id),
    }
    normalized_tag = (behavior_tag or "").strip().lower()
    record["behavior_tag"] = normalized_tag if normalized_tag in BEHAVIOR_TAGS else ""
    record["note"] = (note or "").strip()[:2000]
    stamp = _now_ms()
    conn = _connect(db_path)
    try:
        conn.execute(
            """
            INSERT INTO trade_overlay (exchange_key, trade_id, behavior_tag, note, updated_at)
            VALUES (?,?,?,?,?)
            ON CONFLICT(exchange_key, trade_id) DO UPDATE SET
            behavior_tag=excluded.behavior_tag,
            note=excluded.note,
            updated_at=excluded.updated_at
            """,
            (
                record["exchange_key"],
                record["trade_id"],
                record["behavior_tag"],
                record["note"],
                stamp,
            ),
        )
    finally:
        conn.close()
    return record
def list_symbol_rows(
    *,
    exchange_key: str = "",
    filter_profit: bool = False,
    filter_loss: bool = False,
    filter_sick: bool = False,
    filter_emotion: bool = False,
    db_path: Path | None = None,
) -> list[dict[str, Any]]:
    """Return one summary row per (exchange, symbol) from the trade cache.

    Args:
        exchange_key: Restrict to this exchange (trimmed, lower-cased);
            blank means all exchanges.
        filter_profit: Keep only symbols with at least one trade whose
            ``pnl_amount`` is above 0.0001.
        filter_loss: Keep only symbols with at least one trade whose
            ``pnl_amount`` is below -0.0001.
        filter_sick: Keep only symbols with a trade tagged ``sick``.
        filter_emotion: Keep only symbols with a trade tagged ``emotion``.
        db_path: Optional database file override.

    Returns:
        Summary dicts ordered by most recent close first. Each carries trade
        counts, total PnL, first open / last close times (``None`` when
        unknown) and the K-line seed/sync state from ``archive_meta``.
    """
    init_db(db_path)
    conn = _connect(db_path)
    try:
        params: list[Any] = []
        where = "1=1"
        ex_filter = (exchange_key or "").strip().lower()
        if ex_filter:
            where += " AND t.exchange_key=?"
            params.append(ex_filter)
        # MIN() already ignores NULL. Coalescing missing open times to 0 made
        # a single trade without opened_at_ms erase the symbol's first-open time.
        rows = conn.execute(
            f"""
            SELECT t.exchange_key, t.symbol,
            COUNT(*) AS trade_count,
            SUM(CASE WHEN t.pnl_amount > 0.0001 THEN 1 ELSE 0 END) AS win_count,
            SUM(CASE WHEN t.pnl_amount < -0.0001 THEN 1 ELSE 0 END) AS loss_count,
            SUM(COALESCE(t.pnl_amount, 0)) AS total_pnl,
            MIN(t.opened_at_ms) AS first_opened_ms,
            MAX(COALESCE(t.closed_at_ms, 0)) AS last_closed_ms
            FROM archive_trade_cache t
            WHERE {where}
            GROUP BY t.exchange_key, t.symbol
            ORDER BY last_closed_ms DESC
            """,
            params,
        ).fetchall()

        # Overlays only matter for the tag filters, so skip loading them and
        # the per-symbol trade scan when no tag filter is requested.
        need_tags = filter_sick or filter_emotion
        overlays_by_ex: dict[str, dict[int, dict]] = {}
        out: list[dict[str, Any]] = []
        for r in rows:
            ex_k = r["exchange_key"]
            sym = r["symbol"]
            win_count = int(r["win_count"] or 0)
            loss_count = int(r["loss_count"] or 0)
            # The aggregates use the same thresholds as the profit/loss filters.
            if filter_profit and win_count == 0:
                continue
            if filter_loss and loss_count == 0:
                continue
            if need_tags:
                if ex_k not in overlays_by_ex:
                    overlays_by_ex[ex_k] = load_overlays(ex_k, db_path=db_path)
                ov_map = overlays_by_ex[ex_k]
                tags: set[str] = set()
                if ov_map:
                    id_rows = conn.execute(
                        """
                        SELECT trade_id FROM archive_trade_cache
                        WHERE exchange_key=? AND symbol=?
                        """,
                        (ex_k, sym),
                    ).fetchall()
                    for tr in id_rows:
                        ov = ov_map.get(int(tr["trade_id"]))
                        if ov:
                            tags.add(ov.get("behavior_tag") or "")
                if filter_sick and "sick" not in tags:
                    continue
                if filter_emotion and "emotion" not in tags:
                    continue
            meta = conn.execute(
                "SELECT seed_complete, last_kline_sync_ms FROM archive_meta WHERE exchange_key=? AND symbol=?",
                (ex_k, sym),
            ).fetchone()
            out.append(
                {
                    "exchange_key": ex_k,
                    "symbol": sym,
                    "trade_count": int(r["trade_count"] or 0),
                    "win_count": win_count,
                    "loss_count": loss_count,
                    "total_pnl": round(float(r["total_pnl"] or 0), 4),
                    "first_opened_ms": int(r["first_opened_ms"] or 0) or None,
                    "last_closed_ms": int(r["last_closed_ms"] or 0) or None,
                    "seed_complete": bool(meta["seed_complete"]) if meta else False,
                    "last_kline_sync_ms": int(meta["last_kline_sync_ms"] or 0) if meta else None,
                }
            )
        return out
    finally:
        conn.close()
def load_symbol_trades(
    exchange_key: str,
    symbol: str,
    *,
    db_path: Path | None = None,
) -> list[dict[str, Any]]:
    """Return the cached trades of one symbol, newest close first.

    Each trade is the stored payload merged with its table columns and its
    overlay (``behavior_tag`` / ``note``).

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        symbol: Symbol; trimmed and upper-cased.
        db_path: Optional database file override.

    Returns:
        Trade dicts ordered by ``closed_at_ms`` then ``trade_id``, descending.
    """
    # Like list_symbol_rows, make sure the schema exists so a fresh database
    # yields an empty list instead of "no such table".
    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    conn = _connect(db_path)
    try:
        rows = conn.execute(
            """
            SELECT * FROM archive_trade_cache
            WHERE exchange_key=? AND symbol=?
            ORDER BY COALESCE(closed_at_ms, 0) DESC, trade_id DESC
            """,
            (ex_k, sym),
        ).fetchall()
        ids = [int(r["trade_id"]) for r in rows]
        ov = load_overlays(ex_k, ids, db_path=db_path)
        return [_trade_row_to_dict(r, ov.get(int(r["trade_id"]))) for r in rows]
    finally:
        conn.close()
def upsert_bars_5m(
    exchange_key: str,
    symbol: str,
    bars: list[dict[str, Any]],
    *,
    db_path: Path | None = None,
) -> int:
    """Insert or overwrite 5m bars in ``archive_bars_5m``.

    Bars with missing or non-numeric fields are skipped, as are bars holding
    NaN or infinite values (SQLite binds NaN as NULL, which would violate the
    NOT NULL columns and abort the batch).

    All rows are written in one transaction: the connection is in autocommit
    mode, so one statement per bar would mean one commit per bar.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        symbol: Symbol; trimmed and upper-cased.
        bars: Dicts with ``open_time_ms``, ``open``, ``high``, ``low``,
            ``close`` and optional ``volume``.
        db_path: Optional database file override.

    Returns:
        Number of bars written.
    """
    import math

    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    now = _now_ms()

    rows: list[tuple[Any, ...]] = []
    for b in bars or []:
        try:
            open_time = int(b["open_time_ms"])
            values = (
                float(b["open"]),
                float(b["high"]),
                float(b["low"]),
                float(b["close"]),
                float(b.get("volume") or 0),
            )
        except (KeyError, TypeError, ValueError):
            continue
        if not all(math.isfinite(v) for v in values):
            continue
        rows.append((ex_k, sym, open_time, *values, now))
    if not rows:
        return 0

    upsert_sql = """
        INSERT INTO archive_bars_5m (
        exchange_key, symbol, open_time_ms, open, high, low, close, volume, updated_at
        ) VALUES (?,?,?,?,?,?,?,?,?)
        ON CONFLICT(exchange_key, symbol, open_time_ms) DO UPDATE SET
        open=excluded.open,
        high=excluded.high,
        low=excluded.low,
        close=excluded.close,
        volume=excluded.volume,
        updated_at=excluded.updated_at
    """
    conn = _connect(db_path)
    try:
        # Explicit transaction: autocommit mode never opens one implicitly.
        conn.execute("BEGIN IMMEDIATE")
        try:
            conn.executemany(upsert_sql, rows)
        except BaseException:
            conn.rollback()
            raise
        conn.commit()
    finally:
        conn.close()
    return len(rows)
def load_bars_5m_range(
    exchange_key: str,
    symbol: str,
    start_ms: int,
    end_ms: int,
    *,
    db_path: Path | None = None,
) -> list[dict[str, Any]]:
    """Read stored 5m bars whose open time lies in ``[start_ms, end_ms]``.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        symbol: Symbol; trimmed and upper-cased.
        start_ms: Inclusive lower bound on ``open_time_ms``.
        end_ms: Inclusive upper bound on ``open_time_ms``.
        db_path: Optional database file override.

    Returns:
        Bars as dicts, ordered by ``open_time_ms`` ascending; empty when
        nothing is stored for the range.
    """
    # Ensure the table exists: on a database that was never synced this
    # returns [] so callers can report "no data yet" rather than crash with
    # "no such table".
    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    conn = _connect(db_path)
    try:
        rows = conn.execute(
            """
            SELECT open_time_ms, open, high, low, close, volume
            FROM archive_bars_5m
            WHERE exchange_key=? AND symbol=?
            AND open_time_ms >= ? AND open_time_ms <= ?
            ORDER BY open_time_ms ASC
            """,
            (ex_k, sym, int(start_ms), int(end_ms)),
        ).fetchall()
        return [
            {
                "open_time_ms": int(r["open_time_ms"]),
                "open": float(r["open"]),
                "high": float(r["high"]),
                "low": float(r["low"]),
                "close": float(r["close"]),
                "volume": float(r["volume"] or 0),
            }
            for r in rows
        ]
    finally:
        conn.close()
def _to_candles(bars: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Convert stored bars into chart candles.

    ``open_time_ms`` becomes ``time`` in whole seconds and the price fields
    are coerced to float. A missing or falsy ``volume`` becomes 0.0. Bars
    that lack a field or hold a non-numeric value are dropped.
    """
    candles: list[dict[str, Any]] = []
    for bar in bars or []:
        try:
            candle: dict[str, Any] = {"time": int(bar["open_time_ms"] // 1000)}
            for field in ("open", "high", "low", "close"):
                candle[field] = float(bar[field])
            candle["volume"] = float(bar.get("volume") or 0)
        except (KeyError, TypeError, ValueError):
            continue
        candles.append(candle)
    return candles
def resolve_archive_chart(
    exchange_key: str,
    symbol: str,
    timeframe: str = ARCHIVE_DEFAULT_TIMEFRAME,
    *,
    anchor_ms: int | None = None,
    mode: str = "hold",
    bars: int = ARCHIVE_VISIBLE_BARS_DEFAULT,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Build a chart window from the permanent 5m store.

    The window is centred on ``anchor_ms`` (current time when omitted) and
    spans ``bars`` candles of the requested timeframe, clamped to 50..500.
    Timeframes other than 5m are aggregated from the 5m bars.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        symbol: Symbol; trimmed and upper-cased.
        timeframe: Requested timeframe; must normalize to one of
            ``ARCHIVE_TIMEFRAMES``.
        anchor_ms: Centre of the window in epoch milliseconds.
        mode: Echoed back, lower-cased, in the response.
        bars: Desired number of visible candles.
        db_path: Optional database file override.

    Returns:
        ``{"ok": True, ...}`` with the candles and window bounds, or
        ``{"ok": False, "msg": ...}`` when the request cannot be served.
    """
    tf = normalize_chart_timeframe(timeframe, default=ARCHIVE_DEFAULT_TIMEFRAME)
    if tf not in ARCHIVE_TIMEFRAMES:
        supported = ", ".join(sorted(ARCHIVE_TIMEFRAMES))
        return {"ok": False, "msg": f"档案仅支持 {supported}"}

    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    if not (ex_k and sym):
        return {"ok": False, "msg": "缺少 exchange_key 或 symbol"}

    period = TIMEFRAME_MS[tf]
    requested = int(bars or ARCHIVE_VISIBLE_BARS_DEFAULT)
    visible = max(50, min(requested, 500))
    anchor = int(anchor_ms) if anchor_ms else _now_ms()
    half_span = (visible // 2) * period
    start_ms = max(0, anchor - half_span)
    end_ms = anchor + half_span

    # Read three extra periods on each side of the window before aggregating.
    padding = period * 3
    raw_5m = load_bars_5m_range(ex_k, sym, start_ms - padding, end_ms + padding, db_path=db_path)
    if not raw_5m:
        return {"ok": False, "msg": "档案库暂无 K 线,请等待同步或手动刷新"}

    series = raw_5m if tf == "5m" else aggregate_ohlcv_bars(raw_5m, tf)
    in_window = [b for b in series if start_ms <= int(b["open_time_ms"]) <= end_ms]
    candles = _to_candles(in_window)
    if not candles:
        return {"ok": False, "msg": "视窗内无 K 线"}

    return {
        "ok": True,
        "exchange_key": ex_k,
        "symbol": sym,
        "timeframe": tf,
        "mode": (mode or "hold").strip().lower(),
        "anchor_ms": anchor,
        "window_start_ms": start_ms,
        "window_end_ms": end_ms,
        "candles": candles,
        "bar_count": len(candles),
    }
def _ensure_meta(
    exchange_key: str,
    symbol: str,
    first_opened_ms: int | None,
    *,
    db_path: Path | None = None,
) -> None:
    """Make sure an ``archive_meta`` row exists for the symbol.

    A missing row is inserted with ``seed_complete=0``. For an existing row
    only ``first_trade_opened_ms`` may change, and only towards an earlier
    (or previously unset) value.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        symbol: Symbol; trimmed and upper-cased.
        first_opened_ms: Earliest known trade open time, if any.
        db_path: Optional database file override.
    """
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    created_at = _now_ms()
    conn = _connect(db_path)
    try:
        existing = conn.execute(
            "SELECT first_trade_opened_ms FROM archive_meta WHERE exchange_key=? AND symbol=?",
            (ex_k, sym),
        ).fetchone()
        if existing is None:
            initial_open = int(first_opened_ms) if first_opened_ms else None
            conn.execute(
                """
                INSERT INTO archive_meta (
                exchange_key, symbol, first_trade_opened_ms,
                archive_started_at, last_kline_sync_ms, last_trade_sync_ms, seed_complete
                ) VALUES (?,?,?,?,?,?,0)
                """,
                (ex_k, sym, initial_open, created_at, None, None),
            )
            return
        if not first_opened_ms:
            return
        stored = existing["first_trade_opened_ms"]
        if not stored or int(first_opened_ms) < int(stored):
            conn.execute(
                """
                UPDATE archive_meta SET first_trade_opened_ms=?
                WHERE exchange_key=? AND symbol=?
                """,
                (int(first_opened_ms), ex_k, sym),
            )
    finally:
        conn.close()
def _mark_meta_sync(
    exchange_key: str,
    symbol: str,
    *,
    kline_ms: int | None = None,
    trade_ms: int | None = None,
    seed_complete: bool | None = None,
    db_path: Path | None = None,
) -> None:
    """Update sync bookkeeping columns of a symbol's ``archive_meta`` row.

    Only arguments that are not ``None`` are written; with none supplied no
    statement is executed. A missing row is not created.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        symbol: Symbol; trimmed and upper-cased.
        kline_ms: New ``last_kline_sync_ms``.
        trade_ms: New ``last_trade_sync_ms``.
        seed_complete: New ``seed_complete`` flag (stored as 1/0).
        db_path: Optional database file override.
    """
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    conn = _connect(db_path)
    try:
        updates: list[tuple[str, int]] = []
        if kline_ms is not None:
            updates.append(("last_kline_sync_ms", int(kline_ms)))
        if trade_ms is not None:
            updates.append(("last_trade_sync_ms", int(trade_ms)))
        if seed_complete is not None:
            updates.append(("seed_complete", 1 if seed_complete else 0))
        if updates:
            # Column names come from the fixed list above, never from input.
            assignments = ", ".join(f"{column}=?" for column, _ in updates)
            values: list[Any] = [value for _, value in updates]
            values += [ex_k, sym]
            conn.execute(
                f"UPDATE archive_meta SET {assignments} WHERE exchange_key=? AND symbol=?",
                values,
            )
    finally:
        conn.close()
def fetch_remote_5m_range(
    remote_fetch: Callable[..., dict[str, Any]],
    symbol: str,
    start_ms: int,
    end_ms: int,
) -> list[dict[str, Any]]:
    """Page through the remote 5m OHLCV endpoint for a time range.

    ``remote_fetch`` is called with ``symbol``, ``timeframe="5m"``,
    ``since_ms`` and ``limit=500`` until the range is covered, the remote
    reports failure or an empty page, no progress can be made, or 120 pages
    have been requested.

    Bars without a usable ``open_time_ms`` are ignored. They no longer abort
    paging, and a page holding only such bars ends it.

    Args:
        remote_fetch: Callable returning ``{"ok": bool, "bars": [...]}``.
        symbol: Symbol passed through to ``remote_fetch``.
        start_ms: Inclusive start of the range in epoch milliseconds.
        end_ms: Inclusive end of the range in epoch milliseconds.

    Returns:
        De-duplicated bars inside ``[start_ms, end_ms]``, ordered by open time.
    """
    period = TIMEFRAME_MS["5m"]
    start = int(start_ms)
    since = max(0, start)
    end = int(end_ms)
    max_pages = 120
    merged: dict[int, dict[str, Any]] = {}
    for _ in range(max_pages):
        if since >= end:
            break
        remote = remote_fetch(symbol=symbol, timeframe="5m", since_ms=since, limit=500)
        if not isinstance(remote, dict) or not remote.get("ok"):
            break
        batch = remote.get("bars") or []
        if not batch:
            break
        # Track the newest timestamp among the bars that actually parsed, so
        # a malformed bar cannot raise while computing the next cursor.
        last_ts: int | None = None
        for b in batch:
            try:
                ts = int(b["open_time_ms"])
            except (KeyError, TypeError, ValueError):
                continue
            merged[ts] = b
            if last_ts is None or ts > last_ts:
                last_ts = ts
        if last_ts is None:
            break
        next_since = last_ts + period
        if next_since <= since:
            # The remote keeps returning data at or before the cursor.
            break
        since = next_since
        if last_ts >= end:
            break
    return [merged[k] for k in sorted(merged) if start <= k <= end]
def seed_symbol_archive(
    exchange_key: str,
    symbol: str,
    first_opened_ms: int,
    remote_fetch: Callable[..., dict[str, Any]],
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Seed a symbol's archive with 5m bars.

    Fetches from ``ARCHIVE_SEED_LOOKBACK_DAYS`` days before the earliest
    trade open time up to now and stores the result.

    The symbol is marked ``seed_complete`` only when at least one bar was
    stored. ``fetch_remote_5m_range`` returns an empty list on a remote
    failure, so marking an empty seed as complete would permanently skip the
    seed window after a transient error.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        symbol: Symbol; trimmed and upper-cased.
        first_opened_ms: Earliest trade open time in epoch milliseconds.
        remote_fetch: Remote OHLCV fetcher handed to ``fetch_remote_5m_range``.
        db_path: Optional database file override.

    Returns:
        ``{"ok", "seed_bars", "start_ms", "end_ms"}``. When no bar could be
        stored, ``ok`` is ``False`` and a ``msg`` key explains why.
    """
    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    anchor = int(first_opened_ms)
    start_ms = max(0, anchor - ARCHIVE_SEED_LOOKBACK_DAYS * 86400000)
    end_ms = _now_ms()
    _ensure_meta(ex_k, sym, anchor, db_path=db_path)
    bars = fetch_remote_5m_range(remote_fetch, sym, start_ms, end_ms)
    n = upsert_bars_5m(ex_k, sym, bars, db_path=db_path)
    if n <= 0:
        # Leave seed_complete at 0 so the next sync retries the seed.
        return {
            "ok": False,
            "seed_bars": 0,
            "start_ms": start_ms,
            "end_ms": end_ms,
            "msg": "未获取到任何 5m K 线,建档未完成,将在下次同步时重试",
        }
    _mark_meta_sync(ex_k, sym, kline_ms=_now_ms(), seed_complete=True, db_path=db_path)
    return {"ok": True, "seed_bars": n, "start_ms": start_ms, "end_ms": end_ms}
def sync_symbol_klines_incremental(
    exchange_key: str,
    symbol: str,
    remote_fetch: Callable[..., dict[str, Any]],
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Append 5m bars from the newest stored bar up to now.

    Fetching starts one period after the newest stored bar, or at 0 when no
    bar is stored. Nothing is fetched when that start lies within one period
    of the current time.

    NOTE(review): the newest stored bar is never re-fetched. If the remote
    returns the still-forming candle, its partial values would stay in the
    archive. Confirm against the remote endpoint's behaviour.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        symbol: Symbol; trimmed and upper-cased.
        remote_fetch: Remote OHLCV fetcher handed to ``fetch_remote_5m_range``.
        db_path: Optional database file override.

    Returns:
        ``{"ok": True, "appended": n, "start_ms", "end_ms"}``, or
        ``{"ok": True, "appended": 0, "skipped": True}`` when already current.
    """
    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()

    conn = _connect(db_path)
    try:
        newest_row = conn.execute(
            "SELECT MAX(open_time_ms) AS mx FROM archive_bars_5m WHERE exchange_key=? AND symbol=?",
            (ex_k, sym),
        ).fetchone()
        newest = newest_row["mx"] if newest_row else None
        last_bar = int(newest) if newest else None
    finally:
        conn.close()

    period = TIMEFRAME_MS["5m"]
    start_ms = max(0, last_bar + period) if last_bar else 0
    end_ms = _now_ms()
    if end_ms - period <= start_ms:
        return {"ok": True, "appended": 0, "skipped": True}

    fetched = fetch_remote_5m_range(remote_fetch, sym, start_ms, end_ms)
    stored = upsert_bars_5m(ex_k, sym, fetched, db_path=db_path)
    _mark_meta_sync(ex_k, sym, kline_ms=_now_ms(), db_path=db_path)
    return {"ok": True, "appended": stored, "start_ms": start_ms, "end_ms": end_ms}
def sync_exchange_symbol_archives(
    exchange_key: str,
    trades: list[dict[str, Any]],
    remote_fetch: Callable[..., dict[str, Any]],
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Sync one exchange: refresh the trade cache, then each symbol's K-lines.

    Every symbol that has at least one trade with a known open time is
    seeded if its seed is not complete yet, otherwise topped up
    incrementally.

    Args:
        exchange_key: Exchange identifier; trimmed and lower-cased.
        trades: Trade dicts of the exchange.
        remote_fetch: Remote OHLCV fetcher used for seeding and syncing.
        db_path: Optional database file override.

    Returns:
        Summary with the number of symbols processed, seeded bars, appended
        bars and trades received.
    """
    ex_k = (exchange_key or "").strip().lower()
    upsert_trades_cache(ex_k, trades, db_path=db_path)

    # Earliest known open time per symbol.
    earliest_open: dict[str, int] = {}
    for trade in trades or []:
        sym = (trade.get("symbol") or "").strip().upper()
        if not sym:
            continue
        opened = trade.get("opened_at_ms") or _parse_dt_ms(trade.get("opened_at"))
        if not opened:
            continue
        opened_ms = int(opened)
        known = earliest_open.get(sym)
        if known is None or opened_ms < known:
            earliest_open[sym] = opened_ms

    totals = {"seed": 0, "append": 0}
    for sym, first_ms in earliest_open.items():
        _ensure_meta(ex_k, sym, first_ms, db_path=db_path)
        conn = _connect(db_path)
        try:
            meta = conn.execute(
                "SELECT seed_complete FROM archive_meta WHERE exchange_key=? AND symbol=?",
                (ex_k, sym),
            ).fetchone()
        finally:
            conn.close()
        already_seeded = bool(meta) and bool(int(meta["seed_complete"] or 0))
        if already_seeded:
            result = sync_symbol_klines_incremental(ex_k, sym, remote_fetch, db_path=db_path)
            totals["append"] += int(result.get("appended") or 0)
        else:
            result = seed_symbol_archive(ex_k, sym, first_ms, remote_fetch, db_path=db_path)
            totals["seed"] += int(result.get("seed_bars") or 0)

    return {
        "ok": True,
        "exchange_key": ex_k,
        "symbols": len(earliest_open),
        "seed_bars": totals["seed"],
        "appended_bars": totals["append"],
        "trades": len(trades or []),
    }