"""品种推荐:计算、按资金过滤、SQLite 缓存。""" from __future__ import annotations import json import logging import math from datetime import datetime from typing import Callable, Optional from fee_specs import ensure_fee_rates_schema from product_recommend import list_product_recommendations logger = logging.getLogger(__name__) RECOMMEND_CACHE_SQL = """ CREATE TABLE IF NOT EXISTS product_recommend_cache ( id INTEGER PRIMARY KEY CHECK (id = 1), capital REAL NOT NULL DEFAULT 0, rows_json TEXT NOT NULL DEFAULT '[]', updated_at TEXT ) """ def ensure_recommend_tables(conn) -> None: conn.execute(RECOMMEND_CACHE_SQL) def filter_affordable_recommendations(rows: list[dict]) -> list[dict]: """仅保留当前资金可开 1 手的品种(不含资金不足、无行情)。""" return [r for r in rows if r.get("status") in ("ok", "margin_ok")] def rows_missing_max_lots(rows: list[dict]) -> bool: """缓存是否为旧版(缺少最大手数字段)。""" if not rows: return False return any("max_lots" not in r for r in rows) def recommend_cache_needs_refresh( cached: dict, *, capital: float = 0.0, ) -> bool: """是否需要重新拉行情计算推荐列表。""" if recommend_cache_stale(cached.get("updated_at")): return True rows = cached.get("rows") or [] if rows_missing_max_lots(rows): return True if float(capital or 0) > 0 and not rows: return True return False def enrich_recommend_rows( rows: list[dict], capital: float, *, max_margin_pct: float = 30.0, ) -> list[dict]: """用当前权益与保证金比例补算最大可开手数(兼容旧缓存)。""" cap = float(capital or 0) pct = max(1.0, min(100.0, float(max_margin_pct or 30.0))) budget = cap * pct / 100.0 if cap > 0 else 0.0 enriched: list[dict] = [] for raw in rows: row = dict(raw) try: margin_one = float(row.get("margin_one_lot") or 0) except (TypeError, ValueError): margin_one = 0.0 if margin_one > 0 and budget > 0: lots = int(math.floor(budget / margin_one)) else: try: lots = int(row.get("max_lots") or row.get("recommended_lots") or 0) except (TypeError, ValueError): lots = 0 row["max_lots"] = lots row.pop("recommended_lots", None) row["margin_budget"] = round(budget, 2) row["max_margin_pct"] = pct status = row.get("status") or "" if lots >= 1 and status in ("ok", "margin_ok"): row["status_label"] = ( f"最大 {lots} 手" if status == "ok" else f"最大 {lots} 手·止损偏宽" ) enriched.append(row) return enriched def refresh_recommend_cache( conn, capital: float, quote_fn: Callable[[str], Optional[dict]], *, trading_mode: str = "simulation", max_margin_pct: float = 30.0, ) -> list[dict]: """后台拉行情、筛选并写入数据库。""" ensure_recommend_tables(conn) ensure_fee_rates_schema(conn) all_rows = list_product_recommendations( capital, quote_fn, max_margin_pct=max_margin_pct, trading_mode=trading_mode, ) rows = filter_affordable_recommendations(all_rows) if not rows and float(capital or 0) > 0: logger.warning( "recommend refresh: 0 affordable rows capital=%.2f total=%d no_price=%d blocked=%d", float(capital or 0), len(all_rows), sum(1 for r in all_rows if r.get("status") == "no_price"), sum(1 for r in all_rows if r.get("status") == "blocked"), ) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") conn.execute( """INSERT INTO product_recommend_cache (id, capital, rows_json, updated_at) VALUES (1, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET capital=excluded.capital, rows_json=excluded.rows_json, updated_at=excluded.updated_at""", (float(capital or 0), json.dumps(rows, ensure_ascii=False), now), ) conn.commit() return rows def recommend_cache_stale(updated_at: Optional[str], *, now: Optional[datetime] = None) -> bool: """缓存是否不是今日更新(需重新拉行情计算)。""" if not updated_at: return True try: cached_day = datetime.strptime(str(updated_at)[:10], "%Y-%m-%d").date() except ValueError: return True today = (now or datetime.now()).date() return cached_day != today def load_recommend_cache(conn) -> dict: """优先从数据库读取推荐列表。""" ensure_recommend_tables(conn) row = conn.execute("SELECT capital, rows_json, updated_at FROM product_recommend_cache WHERE id=1").fetchone() if not row: return {"capital": 0.0, "rows": [], "updated_at": None, "stale": True} try: rows = json.loads(row["rows_json"] or "[]") except (TypeError, ValueError, json.JSONDecodeError): rows = [] updated_at = row["updated_at"] return { "capital": float(row["capital"] or 0), "rows": rows if isinstance(rows, list) else [], "updated_at": updated_at, "stale": recommend_cache_stale(updated_at), } def recommend_payload( conn, *, live_capital: float, max_margin_pct: float = 30.0, ) -> dict: """读取缓存并附带当前权益(展示用,可能与缓存计算时不同)。""" payload = load_recommend_cache(conn) cap = float(live_capital or 0) pct = max(1.0, min(100.0, float(max_margin_pct or 30.0))) payload["capital"] = cap payload["max_margin_pct"] = pct rows = payload.get("rows") or [] payload["rows"] = enrich_recommend_rows(rows, cap, max_margin_pct=pct) payload["needs_refresh"] = recommend_cache_needs_refresh(payload, capital=cap) return payload