Files
qihuo/recommend_store.py
T

265 lines
8.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""可开仓品种:计算、按资金过滤、SQLite 缓存。"""
from __future__ import annotations
import json
import logging
import math
from datetime import datetime
from typing import Callable, Optional
from fee_specs import ensure_fee_rates_schema
from product_recommend import _attach_turnover, list_product_recommendations
from recommend_trend import sort_recommend_by_trend
from symbols import product_category
logger = logging.getLogger(__name__)
RECOMMEND_CACHE_SQL = """
CREATE TABLE IF NOT EXISTS product_recommend_cache (
id INTEGER PRIMARY KEY CHECK (id = 1),
capital REAL NOT NULL DEFAULT 0,
rows_json TEXT NOT NULL DEFAULT '[]',
updated_at TEXT
)
"""
def ensure_recommend_tables(conn) -> None:
conn.execute(RECOMMEND_CACHE_SQL)
def filter_affordable_recommendations(rows: list[dict]) -> list[dict]:
"""仅保留当前资金可开 1 手的品种(不含资金不足、无行情)。"""
return [r for r in rows if r.get("status") in ("ok", "margin_ok")]
def rows_missing_max_lots(rows: list[dict]) -> bool:
"""缓存是否为旧版(缺少最大手数字段)。"""
if not rows:
return False
return any("max_lots" not in r for r in rows)
def rows_missing_trend(rows: list[dict]) -> bool:
"""缓存是否为旧版(缺少走势字段)。"""
if not rows:
return False
return any("trend" not in r for r in rows)
def rows_missing_daily_stats(rows: list[dict]) -> bool:
"""缓存是否为旧版(缺少跳空/量价字段)。"""
if not rows:
return False
return any("gap" not in r for r in rows)
def rows_missing_category(rows: list[dict]) -> bool:
if not rows:
return False
return any("category" not in r for r in rows)
def rows_missing_turnover(rows: list[dict]) -> bool:
if not rows:
return False
return any("turnover" not in r for r in rows)
def recommend_cache_needs_refresh(
cached: dict,
*,
capital: float = 0.0,
) -> bool:
"""是否需要重新拉行情计算可开仓列表。"""
if recommend_cache_stale(cached.get("updated_at")):
return True
rows = cached.get("rows") or []
if rows_missing_max_lots(rows):
return True
if rows_missing_trend(rows):
return True
if rows_missing_daily_stats(rows):
return True
if rows_missing_category(rows):
return True
if rows_missing_turnover(rows):
return True
if float(capital or 0) > 0 and not rows:
return True
return False
def enrich_recommend_rows(
rows: list[dict],
capital: float,
*,
max_margin_pct: float = 30.0,
trading_mode: str = "simulation",
) -> list[dict]:
"""用当前权益与保证金比例补算最大可开手数(兼容旧缓存)。"""
cap = float(capital or 0)
pct = max(1.0, min(100.0, float(max_margin_pct or 30.0)))
budget = cap * pct / 100.0 if cap > 0 else 0.0
ctp_connected = False
try:
from vnpy_bridge import ctp_estimate_margin_one_lot, ctp_status
ctp_connected = bool(ctp_status(trading_mode).get("connected"))
except Exception:
pass
enriched: list[dict] = []
for raw in rows:
row = dict(raw)
margin_one = 0.0
try:
margin_one = float(row.get("margin_one_lot") or 0)
except (TypeError, ValueError):
margin_one = 0.0
price = float(row.get("price") or 0)
main_code = (row.get("main_code") or "").strip()
if ctp_connected and main_code and price > 0:
ctp_margin = ctp_estimate_margin_one_lot(trading_mode, main_code, price)
if ctp_margin and ctp_margin > 0:
margin_one = ctp_margin
row["margin_one_lot"] = ctp_margin
row["margin_source"] = "ctp"
if margin_one > 0 and budget > 0:
lots = int(math.floor(budget / margin_one))
else:
try:
lots = int(row.get("max_lots") or row.get("recommended_lots") or 0)
except (TypeError, ValueError):
lots = 0
row["max_lots"] = lots
row.pop("recommended_lots", None)
row["margin_budget"] = round(budget, 2)
row["max_margin_pct"] = pct
status = row.get("status") or ""
if lots >= 1 and status in ("ok", "margin_ok"):
src = "柜台" if row.get("margin_source") == "ctp" else "估算"
row["status_label"] = (
f"最大 {lots}" if status == "ok" else f"最大 {lots} 手·止损偏宽"
)
if row.get("margin_source") == "ctp":
row["status_label"] += f"{src}保证金)"
elif lots < 1 and status in ("ok", "margin_ok"):
row["status"] = "blocked"
row["status_label"] = "资金不足"
if not row.get("category"):
row["category"] = product_category(row.get("ths") or "")
_attach_turnover(row)
enriched.append(row)
return enriched
def filter_recommend_by_sizing(
rows: list[dict],
*,
sizing_mode: str,
fixed_lots: int = 1,
) -> list[dict]:
"""固定手数模式下:最大手数低于设定值的品种不展示。"""
if (sizing_mode or "").strip().lower() != "fixed":
return rows
fl = max(1, int(fixed_lots or 1))
return [r for r in rows if int(r.get("max_lots") or 0) >= fl]
def refresh_recommend_cache(
conn,
capital: float,
quote_fn: Callable[[str], Optional[dict]],
*,
trading_mode: str = "simulation",
max_margin_pct: float = 30.0,
) -> list[dict]:
"""后台拉行情、筛选并写入数据库。"""
ensure_recommend_tables(conn)
ensure_fee_rates_schema(conn)
all_rows = list_product_recommendations(
capital, quote_fn, max_margin_pct=max_margin_pct, trading_mode=trading_mode,
)
rows = filter_affordable_recommendations(all_rows)
if not rows and float(capital or 0) > 0:
logger.warning(
"recommend refresh: 0 affordable rows capital=%.2f total=%d no_price=%d blocked=%d",
float(capital or 0),
len(all_rows),
sum(1 for r in all_rows if r.get("status") == "no_price"),
sum(1 for r in all_rows if r.get("status") == "blocked"),
)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
conn.execute(
"""INSERT INTO product_recommend_cache (id, capital, rows_json, updated_at)
VALUES (1, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
capital=excluded.capital,
rows_json=excluded.rows_json,
updated_at=excluded.updated_at""",
(float(capital or 0), json.dumps(rows, ensure_ascii=False), now),
)
conn.commit()
return rows
def recommend_cache_stale(updated_at: Optional[str], *, now: Optional[datetime] = None) -> bool:
"""缓存是否不是今日更新(需重新拉行情计算)。"""
if not updated_at:
return True
try:
cached_day = datetime.strptime(str(updated_at)[:10], "%Y-%m-%d").date()
except ValueError:
return True
today = (now or datetime.now()).date()
return cached_day != today
def load_recommend_cache(conn) -> dict:
"""优先从数据库读取可开仓品种列表。"""
ensure_recommend_tables(conn)
row = conn.execute("SELECT capital, rows_json, updated_at FROM product_recommend_cache WHERE id=1").fetchone()
if not row:
return {"capital": 0.0, "rows": [], "updated_at": None, "stale": True}
try:
rows = json.loads(row["rows_json"] or "[]")
except (TypeError, ValueError, json.JSONDecodeError):
rows = []
updated_at = row["updated_at"]
return {
"capital": float(row["capital"] or 0),
"rows": rows if isinstance(rows, list) else [],
"updated_at": updated_at,
"stale": recommend_cache_stale(updated_at),
}
def recommend_payload(
conn,
*,
live_capital: float,
max_margin_pct: float = 30.0,
trading_mode: str = "simulation",
sizing_mode: str = "fixed",
fixed_lots: int = 1,
) -> dict:
"""读取缓存并附带当前权益(展示用,可能与缓存计算时不同)。"""
payload = load_recommend_cache(conn)
cap = float(live_capital or 0)
pct = max(1.0, min(100.0, float(max_margin_pct or 30.0)))
payload["capital"] = cap
payload["max_margin_pct"] = pct
rows = payload.get("rows") or []
rows = enrich_recommend_rows(
rows, cap, max_margin_pct=pct, trading_mode=trading_mode,
)
rows = filter_recommend_by_sizing(rows, sizing_mode=sizing_mode, fixed_lots=fixed_lots)
rows = sort_recommend_by_trend(rows)
payload["rows"] = rows
payload["needs_refresh"] = recommend_cache_needs_refresh(payload, capital=cap)
return payload