# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """可开仓品种:计算、按资金过滤、SQLite 缓存。""" from __future__ import annotations import json import logging import math from datetime import datetime from typing import Callable, Optional from contract_specs import get_contract_spec from fee_specs import ensure_fee_rates_schema from product_recommend import _attach_turnover, list_product_recommendations from recommend_trend import sort_recommend_by_trend from symbols import product_category logger = logging.getLogger(__name__) RECOMMEND_CACHE_SQL = """ CREATE TABLE IF NOT EXISTS product_recommend_cache ( id INTEGER PRIMARY KEY CHECK (id = 1), capital REAL NOT NULL DEFAULT 0, rows_json TEXT NOT NULL DEFAULT '[]', updated_at TEXT ) """ def ensure_recommend_tables(conn) -> None: conn.execute(RECOMMEND_CACHE_SQL) def filter_affordable_recommendations(rows: list[dict]) -> list[dict]: """仅保留当前资金可开 1 手的品种(不含资金不足、无行情)。""" return [r for r in rows if r.get("status") in ("ok", "margin_ok")] def rows_missing_max_lots(rows: list[dict]) -> bool: """缓存是否为旧版(缺少最大手数字段)。""" if not rows: return False return any("max_lots" not in r for r in rows) def rows_missing_trend(rows: list[dict]) -> bool: """缓存是否为旧版(缺少走势字段)。""" if not rows: return False return any("trend" not in r for r in rows) def rows_missing_daily_stats(rows: list[dict]) -> bool: """缓存是否为旧版(缺少跳空/量价字段)。""" if not rows: return False return any("gap" not in r for r in rows) def rows_missing_category(rows: list[dict]) -> bool: if not rows: return False return any("category" not in r for r in rows) def rows_missing_turnover(rows: list[dict]) -> bool: if not rows: return False return any("turnover" not in r for r in rows) def rows_missing_contract_spec(rows: list[dict]) -> bool: if not rows: return False return any("mult" not in r or "tick_size" not in r for r in rows) def recommend_cache_needs_refresh( cached: dict, *, capital: float = 0.0, ) -> bool: """是否需要重新拉行情计算可开仓列表。""" if recommend_cache_stale(cached.get("updated_at")): return True rows = cached.get("rows") or [] if rows_missing_max_lots(rows): return True if rows_missing_trend(rows): return True if rows_missing_daily_stats(rows): return True if rows_missing_category(rows): return True if rows_missing_turnover(rows): return True if rows_missing_contract_spec(rows): return True if float(capital or 0) > 0 and not rows: return True return False def enrich_recommend_rows( rows: list[dict], capital: float, *, max_margin_pct: float = 30.0, trading_mode: str = "simulation", ) -> list[dict]: """用当前权益与保证金比例补算最大可开手数(兼容旧缓存)。""" cap = float(capital or 0) pct = max(1.0, min(100.0, float(max_margin_pct or 30.0))) budget = cap * pct / 100.0 if cap > 0 else 0.0 ctp_connected = False ctp_lookup_spec = None ctp_estimate_margin_one_lot_fn = None try: from vnpy_bridge import ctp_estimate_margin_one_lot, ctp_lookup_contract_spec, ctp_status ctp_connected = bool(ctp_status(trading_mode).get("connected")) ctp_lookup_spec = ctp_lookup_contract_spec ctp_estimate_margin_one_lot_fn = ctp_estimate_margin_one_lot except Exception: pass enriched: list[dict] = [] for raw in rows: row = dict(raw) ths = (row.get("ths") or "").strip() main_code = (row.get("main_code") or "").strip() spec_code = main_code or (ths + "8888" if ths else "") if spec_code: spec = get_contract_spec(spec_code) if row.get("mult") in (None, ""): row["mult"] = spec["mult"] if row.get("tick_size") in (None, ""): row["tick_size"] = float(spec.get("tick_size") or 1.0) if ctp_connected and main_code and ctp_lookup_spec: ctp_spec = ctp_lookup_spec(trading_mode, main_code) if ctp_spec: if ctp_spec.get("mult"): row["mult"] = ctp_spec["mult"] if ctp_spec.get("tick_size"): row["tick_size"] = ctp_spec["tick_size"] row["spec_source"] = "ctp" margin_one = 0.0 try: margin_one = float(row.get("margin_one_lot") or 0) except (TypeError, ValueError): margin_one = 0.0 price = float(row.get("price") or 0) if ctp_connected and main_code and price > 0 and ctp_estimate_margin_one_lot_fn: ctp_margin = ctp_estimate_margin_one_lot_fn(trading_mode, main_code, price) if ctp_margin and ctp_margin > 0: margin_one = ctp_margin row["margin_one_lot"] = ctp_margin row["margin_source"] = "ctp" if margin_one > 0 and budget > 0: lots = int(math.floor(budget / margin_one)) else: try: lots = int(row.get("max_lots") or row.get("recommended_lots") or 0) except (TypeError, ValueError): lots = 0 row["max_lots"] = lots row.pop("recommended_lots", None) row["margin_budget"] = round(budget, 2) row["max_margin_pct"] = pct status = row.get("status") or "" if lots >= 1 and status in ("ok", "margin_ok"): src = "柜台" if row.get("margin_source") == "ctp" else "估算" row["status_label"] = ( f"最大 {lots} 手" if status == "ok" else f"最大 {lots} 手·止损偏宽" ) if row.get("margin_source") == "ctp": row["status_label"] += f"({src}保证金)" elif lots < 1 and status in ("ok", "margin_ok"): row["status"] = "blocked" row["status_label"] = "资金不足" if not row.get("category"): row["category"] = product_category(row.get("ths") or "") _attach_turnover(row) enriched.append(row) return enriched def filter_recommend_by_sizing( rows: list[dict], *, sizing_mode: str, fixed_lots: int = 1, ) -> list[dict]: """固定手数模式下:最大手数低于设定值的品种不展示。""" if (sizing_mode or "").strip().lower() != "fixed": return rows fl = max(1, int(fixed_lots or 1)) return [r for r in rows if int(r.get("max_lots") or 0) >= fl] def refresh_recommend_cache( conn, capital: float, quote_fn: Callable[[str], Optional[dict]], *, trading_mode: str = "simulation", max_margin_pct: float = 30.0, ) -> list[dict]: """后台拉行情、筛选并写入数据库。""" ensure_recommend_tables(conn) ensure_fee_rates_schema(conn) all_rows = list_product_recommendations( capital, quote_fn, max_margin_pct=max_margin_pct, trading_mode=trading_mode, ) rows = filter_affordable_recommendations(all_rows) if not rows and float(capital or 0) > 0: logger.warning( "recommend refresh: 0 affordable rows capital=%.2f total=%d no_price=%d blocked=%d", float(capital or 0), len(all_rows), sum(1 for r in all_rows if r.get("status") == "no_price"), sum(1 for r in all_rows if r.get("status") == "blocked"), ) now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") conn.execute( """INSERT INTO product_recommend_cache (id, capital, rows_json, updated_at) VALUES (1, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET capital=excluded.capital, rows_json=excluded.rows_json, updated_at=excluded.updated_at""", (float(capital or 0), json.dumps(rows, ensure_ascii=False), now), ) conn.commit() return rows def recommend_cache_stale(updated_at: Optional[str], *, now: Optional[datetime] = None) -> bool: """缓存是否不是今日更新(需重新拉行情计算)。""" if not updated_at: return True try: cached_day = datetime.strptime(str(updated_at)[:10], "%Y-%m-%d").date() except ValueError: return True today = (now or datetime.now()).date() return cached_day != today def load_recommend_cache(conn) -> dict: """优先从数据库读取可开仓品种列表。""" ensure_recommend_tables(conn) row = conn.execute("SELECT capital, rows_json, updated_at FROM product_recommend_cache WHERE id=1").fetchone() if not row: return {"capital": 0.0, "rows": [], "updated_at": None, "stale": True} try: rows = json.loads(row["rows_json"] or "[]") except (TypeError, ValueError, json.JSONDecodeError): rows = [] updated_at = row["updated_at"] return { "capital": float(row["capital"] or 0), "rows": rows if isinstance(rows, list) else [], "updated_at": updated_at, "stale": recommend_cache_stale(updated_at), } def recommend_payload( conn, *, live_capital: float, max_margin_pct: float = 30.0, trading_mode: str = "simulation", sizing_mode: str = "fixed", fixed_lots: int = 1, ) -> dict: """读取缓存并附带当前权益(展示用,可能与缓存计算时不同)。""" payload = load_recommend_cache(conn) cap = float(live_capital or 0) pct = max(1.0, min(100.0, float(max_margin_pct or 30.0))) payload["capital"] = cap payload["max_margin_pct"] = pct rows = payload.get("rows") or [] rows = enrich_recommend_rows( rows, cap, max_margin_pct=pct, trading_mode=trading_mode, ) rows = filter_recommend_by_sizing(rows, sizing_mode=sizing_mode, fixed_lots=fixed_lots) rows = sort_recommend_by_trend(rows) payload["rows"] = rows payload["needs_refresh"] = recommend_cache_needs_refresh(payload, capital=cap) return payload