"""SQLite persistence for period snapshots and the push log."""

import json
import sqlite3
from collections.abc import Iterator
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any

from .config import settings


def _ensure_db_dir() -> None:
    """Create the parent directory of ``settings.db_path`` if it is missing."""
    Path(settings.db_path).parent.mkdir(parents=True, exist_ok=True)


@contextmanager
def get_conn() -> Iterator[sqlite3.Connection]:
    """Yield a SQLite connection to ``settings.db_path``.

    Rows come back as ``sqlite3.Row`` so columns can be read by name.
    ``commit()`` is called only when the ``with`` body finishes without
    raising; the connection is closed in every case.

    Yields:
        An open ``sqlite3.Connection``.
    """
    _ensure_db_dir()
    conn = sqlite3.connect(settings.db_path)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        # Only reached when the caller's block did not raise.
        conn.commit()
    finally:
        conn.close()


def init_db() -> None:
    """Create the ``period_snapshots`` and ``push_log`` tables if absent."""
    with get_conn() as conn:
        conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS period_snapshots (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                period_type TEXT NOT NULL,
                period_start TEXT NOT NULL,
                period_end TEXT NOT NULL,
                snapshot_json TEXT NOT NULL,
                created_at TEXT NOT NULL,
                UNIQUE(period_type, period_start, period_end)
            );
            CREATE TABLE IF NOT EXISTS push_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                period_start TEXT NOT NULL,
                period_end TEXT NOT NULL,
                pushed_at TEXT NOT NULL,
                success INTEGER NOT NULL,
                message TEXT
            );
            """
        )


def save_snapshot(
    period_type: str,
    period_start: datetime,
    period_end: datetime,
    data: list[dict[str, Any]],
) -> None:
    """Insert or replace the snapshot for one period.

    The row is keyed by ``(period_type, period_start, period_end)``; when a
    row with that key already exists its JSON payload and ``created_at`` are
    overwritten.

    Args:
        period_type: Label stored in the ``period_type`` column.
        period_start: Start of the period; stored via ``isoformat()``.
        period_end: End of the period; stored via ``isoformat()``.
        data: JSON-serialisable list of items, stored as text.
    """
    with get_conn() as conn:
        conn.execute(
            """
            INSERT INTO period_snapshots
                (period_type, period_start, period_end, snapshot_json, created_at)
            VALUES (?, ?, ?, ?, ?)
            ON CONFLICT(period_type, period_start, period_end)
            DO UPDATE SET
                snapshot_json = excluded.snapshot_json,
                created_at = excluded.created_at
            """,
            (
                period_type,
                period_start.isoformat(),
                period_end.isoformat(),
                json.dumps(data, ensure_ascii=False),
                datetime.now().isoformat(),
            ),
        )


def get_latest_snapshot(period_type: str) -> dict[str, Any] | None:
    """Return the snapshot with the greatest ``period_end`` for a period type.

    Args:
        period_type: Value to match against the ``period_type`` column.

    Returns:
        A dict with ``period_start``, ``period_end``, ``created_at`` (the
        stored strings) and ``items`` (the decoded JSON payload), or ``None``
        when no snapshot of that type exists.
    """
    with get_conn() as conn:
        # period_end is stored as ISO-8601 text, so this is a text ordering.
        row = conn.execute(
            """
            SELECT period_start, period_end, snapshot_json, created_at
            FROM period_snapshots
            WHERE period_type = ?
            ORDER BY period_end DESC
            LIMIT 1
            """,
            (period_type,),
        ).fetchone()
    if row is None:
        return None
    return {
        "period_start": row["period_start"],
        "period_end": row["period_end"],
        "created_at": row["created_at"],
        "items": json.loads(row["snapshot_json"]),
    }


def log_push(period_start: str, period_end: str, success: bool, message: str = "") -> None:
    """Append one push attempt to ``push_log``.

    Args:
        period_start: Period start, stored as given.
        period_end: Period end, stored as given.
        success: Outcome of the push; stored as ``1`` or ``0``.
        message: Optional free-form text stored alongside the attempt.
    """
    with get_conn() as conn:
        conn.execute(
            """
            INSERT INTO push_log (period_start, period_end, pushed_at, success, message)
            VALUES (?, ?, ?, ?, ?)
            """,
            (period_start, period_end, datetime.now().isoformat(), int(success), message),
        )


def was_pushed_today(period_start: str, period_end: str) -> bool:
    """Report whether this period was pushed successfully on the current day.

    A push counts only when ``success = 1`` and the date part of ``pushed_at``
    equals today's local date. Without the date condition a successful push
    from any earlier day would make this return ``True`` indefinitely.

    Args:
        period_start: Period start, compared verbatim with the stored value.
        period_end: Period end, compared verbatim with the stored value.

    Returns:
        ``True`` if a matching successful push was logged today, else ``False``.
    """
    # log_push() stores pushed_at as datetime.now().isoformat(), whose first
    # ten characters are the local YYYY-MM-DD date, so compare on that prefix.
    today = datetime.now().date().isoformat()
    with get_conn() as conn:
        row = conn.execute(
            """
            SELECT 1 FROM push_log
            WHERE period_start = ? AND period_end = ? AND success = 1
              AND substr(pushed_at, 1, 10) = ?
            LIMIT 1
            """,
            (period_start, period_end, today),
        ).fetchone()
    return row is not None