Files
qihuo/recommend_store.py
T
2026-06-24 14:20:15 +08:00

90 lines
3.1 KiB
Python

"""品种推荐:计算、按资金过滤、SQLite 缓存。"""
from __future__ import annotations
import json
from datetime import datetime
from typing import Callable, Optional
from product_recommend import list_product_recommendations
RECOMMEND_CACHE_SQL = """
CREATE TABLE IF NOT EXISTS product_recommend_cache (
id INTEGER PRIMARY KEY CHECK (id = 1),
capital REAL NOT NULL DEFAULT 0,
rows_json TEXT NOT NULL DEFAULT '[]',
updated_at TEXT
)
"""
def ensure_recommend_tables(conn) -> None:
conn.execute(RECOMMEND_CACHE_SQL)
def filter_affordable_recommendations(rows: list[dict]) -> list[dict]:
"""仅保留当前资金可开 1 手的品种(不含资金不足、无行情)。"""
return [r for r in rows if r.get("status") in ("ok", "margin_ok")]
def refresh_recommend_cache(
conn,
capital: float,
quote_fn: Callable[[str], Optional[dict]],
*,
trading_mode: str = "simulation",
) -> list[dict]:
"""后台拉行情、筛选并写入数据库。"""
ensure_recommend_tables(conn)
all_rows = list_product_recommendations(capital, quote_fn, trading_mode=trading_mode)
rows = filter_affordable_recommendations(all_rows)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
conn.execute(
"""INSERT INTO product_recommend_cache (id, capital, rows_json, updated_at)
VALUES (1, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
capital=excluded.capital,
rows_json=excluded.rows_json,
updated_at=excluded.updated_at""",
(float(capital or 0), json.dumps(rows, ensure_ascii=False), now),
)
conn.commit()
return rows
def recommend_cache_stale(updated_at: Optional[str], *, now: Optional[datetime] = None) -> bool:
"""缓存是否不是今日更新(需重新拉行情计算)。"""
if not updated_at:
return True
try:
cached_day = datetime.strptime(str(updated_at)[:10], "%Y-%m-%d").date()
except ValueError:
return True
today = (now or datetime.now()).date()
return cached_day != today
def load_recommend_cache(conn) -> dict:
"""优先从数据库读取推荐列表。"""
ensure_recommend_tables(conn)
row = conn.execute("SELECT capital, rows_json, updated_at FROM product_recommend_cache WHERE id=1").fetchone()
if not row:
return {"capital": 0.0, "rows": [], "updated_at": None, "stale": True}
try:
rows = json.loads(row["rows_json"] or "[]")
except (TypeError, ValueError, json.JSONDecodeError):
rows = []
updated_at = row["updated_at"]
return {
"capital": float(row["capital"] or 0),
"rows": rows if isinstance(rows, list) else [],
"updated_at": updated_at,
"stale": recommend_cache_stale(updated_at),
}
def recommend_payload(conn, *, live_capital: float) -> dict:
"""读取缓存并附带当前权益(展示用,可能与缓存计算时不同)。"""
payload = load_recommend_cache(conn)
payload["capital"] = float(live_capital or 0)
return payload