"""品种推荐:计算、按资金过滤、SQLite 缓存。""" from __future__ import annotations import json from datetime import datetime from typing import Callable, Optional from product_recommend import list_product_recommendations RECOMMEND_CACHE_SQL = """ CREATE TABLE IF NOT EXISTS product_recommend_cache ( id INTEGER PRIMARY KEY CHECK (id = 1), capital REAL NOT NULL DEFAULT 0, rows_json TEXT NOT NULL DEFAULT '[]', updated_at TEXT ) """ def ensure_recommend_tables(conn) -> None: conn.execute(RECOMMEND_CACHE_SQL) def filter_affordable_recommendations(rows: list[dict]) -> list[dict]: """仅保留当前资金可开 1 手的品种(不含资金不足、无行情)。""" return [r for r in rows if r.get("status") in ("ok", "margin_ok")] def refresh_recommend_cache( conn, capital: float, quote_fn: Callable[[str], Optional[dict]], ) -> list[dict]: """后台拉行情、筛选并写入数据库。""" ensure_recommend_tables(conn) all_rows = list_product_recommendations(capital, quote_fn) rows = filter_affordable_recommendations(all_rows) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") conn.execute( """INSERT INTO product_recommend_cache (id, capital, rows_json, updated_at) VALUES (1, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET capital=excluded.capital, rows_json=excluded.rows_json, updated_at=excluded.updated_at""", (float(capital or 0), json.dumps(rows, ensure_ascii=False), now), ) conn.commit() return rows def load_recommend_cache(conn) -> dict: """优先从数据库读取推荐列表。""" ensure_recommend_tables(conn) row = conn.execute("SELECT capital, rows_json, updated_at FROM product_recommend_cache WHERE id=1").fetchone() if not row: return {"capital": 0.0, "rows": [], "updated_at": None} try: rows = json.loads(row["rows_json"] or "[]") except (TypeError, ValueError, json.JSONDecodeError): rows = [] return { "capital": float(row["capital"] or 0), "rows": rows if isinstance(rows, list) else [], "updated_at": row["updated_at"], }