Files
qihuo/recommend_store.py
T
dekun 840e88daad Add key-level auto trade, AI analysis, and trading UX improvements.
Key monitors use 5m close triggers with WeChat alerts and box/convergence auto orders; add pending-order worker, structured WeChat notify, AI settings/messages, session clock, CTP margin sizing, and dual-layer position limits.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-28 10:36:56 +08:00

310 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""可开仓品种:计算、按资金过滤、SQLite 缓存。"""
from __future__ import annotations
import json
import logging
import math
from datetime import datetime
from typing import Callable, Optional
from contract_specs import get_contract_spec, margin_one_lot
from fee_specs import ensure_fee_rates_schema
from product_recommend import (
_attach_turnover,
filter_rows_for_account_scope,
list_product_recommendations,
)
from recommend_trend import sort_recommend_by_trend
from symbols import product_category
logger = logging.getLogger(__name__)
RECOMMEND_CACHE_SQL = """
CREATE TABLE IF NOT EXISTS product_recommend_cache (
id INTEGER PRIMARY KEY CHECK (id = 1),
capital REAL NOT NULL DEFAULT 0,
rows_json TEXT NOT NULL DEFAULT '[]',
updated_at TEXT
)
"""
def ensure_recommend_tables(conn) -> None:
conn.execute(RECOMMEND_CACHE_SQL)
def filter_affordable_recommendations(rows: list[dict]) -> list[dict]:
"""仅保留当前资金可开 1 手的品种(不含资金不足、无行情)。"""
return [r for r in rows if r.get("status") in ("ok", "margin_ok")]
def rows_missing_max_lots(rows: list[dict]) -> bool:
"""缓存是否为旧版(缺少最大手数字段)。"""
if not rows:
return False
return any("max_lots" not in r for r in rows)
def rows_missing_trend(rows: list[dict]) -> bool:
"""缓存是否为旧版(缺少走势字段)。"""
if not rows:
return False
return any("trend" not in r for r in rows)
def rows_missing_daily_stats(rows: list[dict]) -> bool:
"""缓存是否为旧版(缺少跳空/量价字段)。"""
if not rows:
return False
return any("gap" not in r for r in rows)
def rows_missing_category(rows: list[dict]) -> bool:
if not rows:
return False
return any("category" not in r for r in rows)
def rows_missing_turnover(rows: list[dict]) -> bool:
if not rows:
return False
return any("turnover" not in r for r in rows)
def rows_missing_contract_spec(rows: list[dict]) -> bool:
if not rows:
return False
return any("mult" not in r or "tick_size" not in r for r in rows)
def recommend_cache_needs_refresh(
cached: dict,
*,
capital: float = 0.0,
) -> bool:
"""是否需要重新拉行情计算可开仓列表。"""
if recommend_cache_stale(cached.get("updated_at")):
return True
rows = cached.get("rows") or []
if rows_missing_max_lots(rows):
return True
if rows_missing_trend(rows):
return True
if rows_missing_daily_stats(rows):
return True
if rows_missing_category(rows):
return True
if rows_missing_turnover(rows):
return True
if rows_missing_contract_spec(rows):
return True
if float(capital or 0) > 0 and not rows:
return True
return False
def _ctp_connected_for_mode(trading_mode: str) -> bool:
try:
from vnpy_bridge import ctp_status
return bool(ctp_status(trading_mode).get("connected"))
except Exception:
return False
def enrich_recommend_rows(
rows: list[dict],
capital: float,
*,
max_margin_pct: float = 30.0,
trading_mode: str = "simulation",
) -> list[dict]:
"""用当前权益与保证金比例补算最大可开手数(兼容旧缓存)。"""
cap = float(capital or 0)
pct = max(1.0, min(100.0, float(max_margin_pct or 30.0)))
budget = cap * pct / 100.0 if cap > 0 else 0.0
ctp_connected = _ctp_connected_for_mode(trading_mode)
enriched: list[dict] = []
for raw in rows:
row = dict(raw)
ths = (row.get("ths") or "").strip()
main_code = (row.get("main_code") or "").strip()
spec_code = main_code or (ths + "8888" if ths else "")
if spec_code:
spec = get_contract_spec(spec_code)
if row.get("mult") in (None, ""):
row["mult"] = spec["mult"]
if row.get("tick_size") in (None, ""):
row["tick_size"] = float(spec.get("tick_size") or 1.0)
margin_one = 0.0
try:
margin_one = float(row.get("margin_one_lot") or 0)
except (TypeError, ValueError):
margin_one = 0.0
price = float(row.get("price") or 0)
code_for_margin = main_code or spec_code
if price > 0 and code_for_margin:
margin_one, margin_source, spec_used = margin_one_lot(
code_for_margin,
price,
trading_mode=trading_mode if ctp_connected else None,
)
if spec_used.get("mult"):
row["mult"] = spec_used["mult"]
if spec_used.get("tick_size"):
row["tick_size"] = spec_used["tick_size"]
row["margin_one_lot"] = margin_one
if margin_source == "ctp":
row["margin_source"] = "ctp"
row["spec_source"] = "ctp"
if margin_one > 0 and budget > 0:
lots = int(math.floor(budget / margin_one))
else:
try:
lots = int(row.get("max_lots") or row.get("recommended_lots") or 0)
except (TypeError, ValueError):
lots = 0
row["max_lots"] = lots
row.pop("recommended_lots", None)
row["margin_budget"] = round(budget, 2)
row["max_margin_pct"] = pct
status = row.get("status") or ""
if lots >= 1 and status in ("ok", "margin_ok"):
src = "柜台" if row.get("margin_source") == "ctp" else "估算"
row["status_label"] = (
f"最大 {lots}" if status == "ok" else f"最大 {lots} 手·止损偏宽"
)
if row.get("margin_source") == "ctp":
row["status_label"] += f"{src}保证金)"
elif lots < 1 and status in ("ok", "margin_ok"):
row["status"] = "blocked"
row["status_label"] = "资金不足"
if not row.get("category"):
row["category"] = product_category(row.get("ths") or "")
from symbols import enrich_recommend_row
row = enrich_recommend_row(row)
_attach_turnover(row)
enriched.append(row)
from symbols import filter_for_trading_session
return filter_for_trading_session(enriched)
def filter_recommend_by_sizing(
rows: list[dict],
*,
sizing_mode: str,
fixed_lots: int = 1,
) -> list[dict]:
"""固定手数模式下:最大手数低于设定值的品种不展示。"""
if (sizing_mode or "").strip().lower() != "fixed":
return rows
fl = max(1, int(fixed_lots or 1))
return [r for r in rows if int(r.get("max_lots") or 0) >= fl]
def refresh_recommend_cache(
conn,
capital: float,
quote_fn: Callable[[str], Optional[dict]],
*,
trading_mode: str = "simulation",
max_margin_pct: float = 30.0,
) -> list[dict]:
"""后台拉行情、筛选并写入数据库。"""
ensure_recommend_tables(conn)
ensure_fee_rates_schema(conn)
ctp_connected = _ctp_connected_for_mode(trading_mode)
all_rows = list_product_recommendations(
capital,
quote_fn,
max_margin_pct=max_margin_pct,
trading_mode=trading_mode,
ctp_connected=ctp_connected,
)
rows = filter_affordable_recommendations(all_rows)
if not rows and float(capital or 0) > 0:
logger.warning(
"recommend refresh: 0 affordable rows capital=%.2f total=%d no_price=%d blocked=%d",
float(capital or 0),
len(all_rows),
sum(1 for r in all_rows if r.get("status") == "no_price"),
sum(1 for r in all_rows if r.get("status") == "blocked"),
)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
conn.execute(
"""INSERT INTO product_recommend_cache (id, capital, rows_json, updated_at)
VALUES (1, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
capital=excluded.capital,
rows_json=excluded.rows_json,
updated_at=excluded.updated_at""",
(float(capital or 0), json.dumps(rows, ensure_ascii=False), now),
)
conn.commit()
return rows
def recommend_cache_stale(updated_at: Optional[str], *, now: Optional[datetime] = None) -> bool:
"""缓存是否不是今日更新(需重新拉行情计算)。"""
if not updated_at:
return True
try:
cached_day = datetime.strptime(str(updated_at)[:10], "%Y-%m-%d").date()
except ValueError:
return True
today = (now or datetime.now()).date()
return cached_day != today
def load_recommend_cache(conn) -> dict:
"""优先从数据库读取可开仓品种列表。"""
ensure_recommend_tables(conn)
row = conn.execute("SELECT capital, rows_json, updated_at FROM product_recommend_cache WHERE id=1").fetchone()
if not row:
return {"capital": 0.0, "rows": [], "updated_at": None, "stale": True}
try:
rows = json.loads(row["rows_json"] or "[]")
except (TypeError, ValueError, json.JSONDecodeError):
rows = []
updated_at = row["updated_at"]
return {
"capital": float(row["capital"] or 0),
"rows": rows if isinstance(rows, list) else [],
"updated_at": updated_at,
"stale": recommend_cache_stale(updated_at),
}
def recommend_payload(
conn,
*,
live_capital: float,
max_margin_pct: float = 30.0,
trading_mode: str = "simulation",
sizing_mode: str = "fixed",
fixed_lots: int = 1,
) -> dict:
"""读取缓存并附带当前权益(展示用,可能与缓存计算时不同)。"""
payload = load_recommend_cache(conn)
cap = float(live_capital or 0)
pct = max(1.0, min(100.0, float(max_margin_pct or 30.0)))
payload["capital"] = cap
payload["max_margin_pct"] = pct
rows = payload.get("rows") or []
rows = enrich_recommend_rows(
rows, cap, max_margin_pct=pct, trading_mode=trading_mode,
)
rows = filter_rows_for_account_scope(
rows, cap, ctp_connected=_ctp_connected_for_mode(trading_mode),
)
rows = filter_recommend_by_sizing(rows, sizing_mode=sizing_mode, fixed_lots=fixed_lots)
rows = sort_recommend_by_trend(rows)
payload["rows"] = rows
payload["needs_refresh"] = recommend_cache_needs_refresh(payload, capital=cap)
return payload