Files
Binance_Altcoin_Monitor/backend/app/db.py
T
2026-05-22 13:06:42 +08:00

122 lines
3.6 KiB
Python

import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Any
from .config import settings
def _ensure_db_dir() -> None:
Path(settings.db_path).parent.mkdir(parents=True, exist_ok=True)
@contextmanager
def get_conn():
_ensure_db_dir()
conn = sqlite3.connect(settings.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
finally:
conn.close()
def init_db() -> None:
with get_conn() as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS period_snapshots (
id INTEGER PRIMARY KEY AUTOINCREMENT,
period_type TEXT NOT NULL,
period_start TEXT NOT NULL,
period_end TEXT NOT NULL,
snapshot_json TEXT NOT NULL,
created_at TEXT NOT NULL,
UNIQUE(period_type, period_start, period_end)
);
CREATE TABLE IF NOT EXISTS push_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
period_start TEXT NOT NULL,
period_end TEXT NOT NULL,
pushed_at TEXT NOT NULL,
success INTEGER NOT NULL,
message TEXT
);
"""
)
def save_snapshot(
period_type: str,
period_start: datetime,
period_end: datetime,
data: list[dict[str, Any]],
) -> None:
with get_conn() as conn:
conn.execute(
"""
INSERT INTO period_snapshots (period_type, period_start, period_end, snapshot_json, created_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(period_type, period_start, period_end) DO UPDATE SET
snapshot_json = excluded.snapshot_json,
created_at = excluded.created_at
""",
(
period_type,
period_start.isoformat(),
period_end.isoformat(),
json.dumps(data, ensure_ascii=False),
datetime.now().isoformat(),
),
)
def get_latest_snapshot(period_type: str) -> dict[str, Any] | None:
with get_conn() as conn:
row = conn.execute(
"""
SELECT period_start, period_end, snapshot_json, created_at
FROM period_snapshots
WHERE period_type = ?
ORDER BY period_end DESC
LIMIT 1
""",
(period_type,),
).fetchone()
if not row:
return None
return {
"period_start": row["period_start"],
"period_end": row["period_end"],
"created_at": row["created_at"],
"items": json.loads(row["snapshot_json"]),
}
def log_push(period_start: str, period_end: str, success: bool, message: str = "") -> None:
with get_conn() as conn:
conn.execute(
"""
INSERT INTO push_log (period_start, period_end, pushed_at, success, message)
VALUES (?, ?, ?, ?, ?)
""",
(period_start, period_end, datetime.now().isoformat(), int(success), message),
)
def was_pushed_today(period_start: str, period_end: str) -> bool:
with get_conn() as conn:
row = conn.execute(
"""
SELECT 1 FROM push_log
WHERE period_start = ? AND period_end = ? AND success = 1
LIMIT 1
""",
(period_start, period_end),
).fetchone()
return row is not None