import json import sqlite3 from contextlib import contextmanager from datetime import datetime from pathlib import Path from typing import Any from .config import settings def _ensure_db_dir() -> None: Path(settings.db_path).parent.mkdir(parents=True, exist_ok=True) @contextmanager def get_conn(): _ensure_db_dir() conn = sqlite3.connect(settings.db_path) conn.row_factory = sqlite3.Row try: yield conn conn.commit() finally: conn.close() def init_db() -> None: with get_conn() as conn: conn.executescript( """ CREATE TABLE IF NOT EXISTS period_snapshots ( id INTEGER PRIMARY KEY AUTOINCREMENT, period_type TEXT NOT NULL, period_start TEXT NOT NULL, period_end TEXT NOT NULL, snapshot_json TEXT NOT NULL, created_at TEXT NOT NULL, UNIQUE(period_type, period_start, period_end) ); CREATE TABLE IF NOT EXISTS push_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, period_start TEXT NOT NULL, period_end TEXT NOT NULL, pushed_at TEXT NOT NULL, success INTEGER NOT NULL, message TEXT ); CREATE TABLE IF NOT EXISTS daily_klines ( symbol TEXT NOT NULL, open_time INTEGER NOT NULL, open REAL NOT NULL, high REAL NOT NULL, low REAL NOT NULL, close REAL NOT NULL, volume REAL NOT NULL, quote_volume REAL NOT NULL DEFAULT 0, updated_at TEXT NOT NULL, PRIMARY KEY (symbol, open_time) ); CREATE INDEX IF NOT EXISTS idx_daily_klines_symbol ON daily_klines(symbol, open_time); CREATE TABLE IF NOT EXISTS kline_meta ( symbol TEXT PRIMARY KEY, last_fetch_at TEXT NOT NULL, bar_count INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS funding_history ( symbol TEXT NOT NULL, funding_time INTEGER NOT NULL, funding_rate REAL NOT NULL, mark_price REAL NOT NULL DEFAULT 0, updated_at TEXT NOT NULL, PRIMARY KEY (symbol, funding_time) ); CREATE INDEX IF NOT EXISTS idx_funding_history_symbol ON funding_history(symbol, funding_time); CREATE TABLE IF NOT EXISTS funding_meta ( symbol TEXT PRIMARY KEY, last_fetch_at TEXT NOT NULL, bar_count INTEGER NOT NULL, last_funding_rate REAL NOT NULL DEFAULT 0, next_funding_time INTEGER NOT NULL DEFAULT 0 ); CREATE TABLE IF NOT EXISTS funding_current ( symbol TEXT PRIMARY KEY, last_funding_rate REAL NOT NULL, next_funding_time INTEGER NOT NULL, mark_price REAL NOT NULL DEFAULT 0, updated_at TEXT NOT NULL ); """ ) def save_snapshot( period_type: str, period_start: datetime, period_end: datetime, data: list[dict[str, Any]], ) -> None: with get_conn() as conn: conn.execute( """ INSERT INTO period_snapshots (period_type, period_start, period_end, snapshot_json, created_at) VALUES (?, ?, ?, ?, ?) ON CONFLICT(period_type, period_start, period_end) DO UPDATE SET snapshot_json = excluded.snapshot_json, created_at = excluded.created_at """, ( period_type, period_start.isoformat(), period_end.isoformat(), json.dumps(data, ensure_ascii=False), datetime.now().isoformat(), ), ) def get_latest_snapshot(period_type: str) -> dict[str, Any] | None: with get_conn() as conn: row = conn.execute( """ SELECT period_start, period_end, snapshot_json, created_at FROM period_snapshots WHERE period_type = ? ORDER BY period_end DESC LIMIT 1 """, (period_type,), ).fetchone() if not row: return None return { "period_start": row["period_start"], "period_end": row["period_end"], "created_at": row["created_at"], "items": json.loads(row["snapshot_json"]), } def log_push(period_start: str, period_end: str, success: bool, message: str = "") -> None: with get_conn() as conn: conn.execute( """ INSERT INTO push_log (period_start, period_end, pushed_at, success, message) VALUES (?, ?, ?, ?, ?) """, (period_start, period_end, datetime.now().isoformat(), int(success), message), ) def save_daily_klines(symbol: str, candles: list[dict[str, Any]]) -> None: sym = symbol.upper() now = datetime.now().isoformat() with get_conn() as conn: for c in candles: conn.execute( """ INSERT INTO daily_klines ( symbol, open_time, open, high, low, close, volume, quote_volume, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(symbol, open_time) DO UPDATE SET open = excluded.open, high = excluded.high, low = excluded.low, close = excluded.close, volume = excluded.volume, quote_volume = excluded.quote_volume, updated_at = excluded.updated_at """, ( sym, int(c["time"]), float(c["open"]), float(c["high"]), float(c["low"]), float(c["close"]), float(c.get("volume", 0)), float(c.get("quote_volume", 0)), now, ), ) conn.execute( """ INSERT INTO kline_meta (symbol, last_fetch_at, bar_count) VALUES (?, ?, ?) ON CONFLICT(symbol) DO UPDATE SET last_fetch_at = excluded.last_fetch_at, bar_count = excluded.bar_count """, (sym, now, len(candles)), ) def get_daily_klines_from_db(symbol: str, limit: int) -> list[dict[str, Any]]: sym = symbol.upper() with get_conn() as conn: rows = conn.execute( """ SELECT open_time, open, high, low, close, volume, quote_volume FROM daily_klines WHERE symbol = ? ORDER BY open_time DESC LIMIT ? """, (sym, limit), ).fetchall() rows = list(reversed(rows)) return [ { "time": int(r["open_time"]), "open": float(r["open"]), "high": float(r["high"]), "low": float(r["low"]), "close": float(r["close"]), "volume": float(r["volume"]), "quote_volume": float(r["quote_volume"]), } for r in rows ] def get_kline_meta(symbol: str) -> dict[str, Any] | None: sym = symbol.upper() with get_conn() as conn: row = conn.execute( "SELECT last_fetch_at, bar_count FROM kline_meta WHERE symbol = ?", (sym,), ).fetchone() if not row: return None return { "last_fetch_at": row["last_fetch_at"], "bar_count": row["bar_count"], } def save_funding_history( symbol: str, rows: list[dict[str, Any]], next_funding_time: int = 0, ) -> None: sym = symbol.upper() now = datetime.now().isoformat() last_rate = 0.0 next_time = next_funding_time with get_conn() as conn: for r in rows: ft = int(r["time"]) rate = float(r["rate"]) mp = float(r.get("mark_price", 0)) last_rate = rate conn.execute( """ INSERT INTO funding_history (symbol, funding_time, funding_rate, mark_price, updated_at) VALUES (?, ?, ?, ?, ?) ON CONFLICT(symbol, funding_time) DO UPDATE SET funding_rate = excluded.funding_rate, mark_price = excluded.mark_price, updated_at = excluded.updated_at """, (sym, ft, rate, mp, now), ) if rows: conn.execute( """ INSERT INTO funding_meta (symbol, last_fetch_at, bar_count, last_funding_rate, next_funding_time) VALUES (?, ?, ?, ?, ?) ON CONFLICT(symbol) DO UPDATE SET last_fetch_at = excluded.last_fetch_at, bar_count = excluded.bar_count, last_funding_rate = excluded.last_funding_rate """, (sym, now, len(rows), last_rate, next_time), ) def get_funding_history_from_db(symbol: str, limit: int) -> list[dict[str, Any]]: sym = symbol.upper() with get_conn() as conn: rows = conn.execute( """ SELECT funding_time, funding_rate, mark_price FROM funding_history WHERE symbol = ? ORDER BY funding_time DESC LIMIT ? """, (sym, limit), ).fetchall() rows = list(reversed(rows)) return [ { "time": int(r["funding_time"]), "rate": float(r["funding_rate"]), "rate_pct": float(r["funding_rate"]) * 100, "mark_price": float(r["mark_price"]), } for r in rows ] def get_funding_meta(symbol: str) -> dict[str, Any] | None: sym = symbol.upper() with get_conn() as conn: row = conn.execute( """ SELECT last_fetch_at, bar_count, last_funding_rate, next_funding_time FROM funding_meta WHERE symbol = ? """, (sym,), ).fetchone() if not row: return None return { "last_fetch_at": row["last_fetch_at"], "bar_count": row["bar_count"], "last_funding_rate": row["last_funding_rate"], "next_funding_time": row["next_funding_time"], } def save_funding_current_bulk(data: dict[str, dict[str, Any]]) -> None: now = datetime.now().isoformat() with get_conn() as conn: for sym, info in data.items(): conn.execute( """ INSERT INTO funding_current (symbol, last_funding_rate, next_funding_time, mark_price, updated_at) VALUES (?, ?, ?, ?, ?) ON CONFLICT(symbol) DO UPDATE SET last_funding_rate = excluded.last_funding_rate, next_funding_time = excluded.next_funding_time, mark_price = excluded.mark_price, updated_at = excluded.updated_at """, ( sym.upper(), float(info.get("lastFundingRate", 0) or 0), int(info.get("nextFundingTime", 0) or 0), float(info.get("markPrice", 0) or 0), now, ), ) def was_pushed_today(period_start: str, period_end: str) -> bool: with get_conn() as conn: row = conn.execute( """ SELECT 1 FROM push_log WHERE period_start = ? AND period_end = ? AND success = 1 LIMIT 1 """, (period_start, period_end), ).fetchone() return row is not None