# Repository web-page header captured together with the file (not module code):
# Files
# dekun e5a586f903 Restructure into modules/ with single-process CTP and config/ layout.
# Move business code under modules/, env template to config/, PM2 single qihuo process, and _legacy shims for old imports.

# Co-authored-by: Cursor <cursoragent@cursor.com>
# 2026-07-01 14:42:16 +08:00

# 400 lines
# 14 KiB
# Python
# Raw Permalink Blame History

# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""可开仓品种:计算、按资金过滤、SQLite 缓存。"""
from __future__ import annotations
import json
import logging
import math
from datetime import datetime
from typing import Callable, Optional
from modules.core.contract_specs import get_contract_spec, margin_one_lot
from modules.fees.fee_specs import ensure_fee_rates_schema
from modules.trading.product_recommend import (
_attach_turnover,
filter_rows_for_account_scope,
list_product_recommendations,
)
from modules.trading.recommend_trend import sort_recommend_by_trend
from modules.core.symbols import product_category
logger = logging.getLogger(__name__)
# DDL for the recommendation cache table. The CHECK (id = 1) constraint pins
# the table to a single row that stores the capital, the JSON-encoded row list
# and the time of the last update.
RECOMMEND_CACHE_SQL = """
CREATE TABLE IF NOT EXISTS product_recommend_cache (
id INTEGER PRIMARY KEY CHECK (id = 1),
capital REAL NOT NULL DEFAULT 0,
rows_json TEXT NOT NULL DEFAULT '[]',
updated_at TEXT
)
"""
def ensure_recommend_tables(conn) -> None:
    """Create the recommendation cache table if it does not exist yet.

    Args:
        conn: Database connection exposing ``execute``. The statement run is
            ``RECOMMEND_CACHE_SQL``; no commit is issued here.
    """
    conn.execute(RECOMMEND_CACHE_SQL)
def filter_affordable_recommendations(rows: list[dict]) -> list[dict]:
    """Keep only products for which one lot can currently be opened.

    Rows are kept when their ``status`` is ``"ok"`` or ``"margin_ok"``; every
    other status (for example insufficient funds or no quote) is dropped.

    Args:
        rows: Recommendation rows, each a dict that may carry a ``status`` key.

    Returns:
        A new list with the affordable rows, in their original order.
    """
    affordable_statuses = ("ok", "margin_ok")
    return [row for row in rows if row.get("status") in affordable_statuses]
def rows_missing_max_lots(rows: list[dict]) -> bool:
    """Tell whether the cache is an old version lacking the max-lots field.

    Args:
        rows: Cached recommendation rows; an empty or ``None`` value is never
            reported as outdated.

    Returns:
        True when at least one row has no ``max_lots`` key.
    """
    return bool(rows) and any("max_lots" not in row for row in rows)
def rows_missing_trend(rows: list[dict]) -> bool:
    """Tell whether the cache is an old version lacking the trend field.

    Args:
        rows: Cached recommendation rows; an empty or ``None`` value is never
            reported as outdated.

    Returns:
        True when at least one row has no ``trend`` key.
    """
    return bool(rows) and any("trend" not in row for row in rows)
def rows_missing_daily_stats(rows: list[dict]) -> bool:
    """Tell whether the cache is an old version lacking gap/volume-price data.

    Only the ``gap`` key is inspected as the marker for that group of fields.

    Args:
        rows: Cached recommendation rows; an empty or ``None`` value is never
            reported as outdated.

    Returns:
        True when at least one row has no ``gap`` key.
    """
    return bool(rows) and any("gap" not in row for row in rows)
def rows_missing_category(rows: list[dict]) -> bool:
    """Tell whether any cached row lacks the ``category`` key.

    Args:
        rows: Cached recommendation rows; an empty or ``None`` value is never
            reported as outdated.

    Returns:
        True when at least one row has no ``category`` key.
    """
    return bool(rows) and any("category" not in row for row in rows)
def rows_missing_turnover(rows: list[dict]) -> bool:
    """Tell whether any cached row lacks the ``turnover`` key.

    Args:
        rows: Cached recommendation rows; an empty or ``None`` value is never
            reported as outdated.

    Returns:
        True when at least one row has no ``turnover`` key.
    """
    return bool(rows) and any("turnover" not in row for row in rows)
def rows_missing_contract_spec(rows: list[dict]) -> bool:
    """Tell whether any cached row lacks the contract-spec keys.

    Args:
        rows: Cached recommendation rows; an empty or ``None`` value is never
            reported as outdated.

    Returns:
        True when at least one row is missing ``mult`` or ``tick_size``.
    """
    return bool(rows) and any(
        "mult" not in row or "tick_size" not in row for row in rows
    )
def recommend_cache_needs_refresh(
    cached: dict,
    *,
    capital: float = 0.0,
) -> bool:
    """Decide whether quotes must be pulled again to rebuild the list.

    A refresh is needed when the cache timestamp is stale, when any row lacks
    one of the fields checked by the ``rows_missing_*`` helpers, or when
    there is positive capital but no rows at all.

    Args:
        cached: Cache payload; its ``updated_at`` and ``rows`` keys are read.
        capital: Current capital; ``None``/0 is treated as no capital.

    Returns:
        True when the cached list should be recomputed.
    """
    if recommend_cache_stale(cached.get("updated_at")):
        return True
    rows = cached.get("rows") or []
    # Each predicate detects rows written by an older cache layout.
    legacy_checks = (
        rows_missing_max_lots,
        rows_missing_trend,
        rows_missing_daily_stats,
        rows_missing_category,
        rows_missing_turnover,
        rows_missing_contract_spec,
    )
    if any(check(rows) for check in legacy_checks):
        return True
    return float(capital or 0) > 0 and not rows
def _ctp_connected_for_mode(trading_mode: str) -> bool:
try:
from modules.trading.position_stream import position_hub
snap = position_hub.get_snapshot() or {}
st = snap.get("ctp_status")
if isinstance(st, dict) and st:
return bool(st.get("connected"))
except Exception:
pass
del trading_mode
return False
def recommend_margin_used(trading_mode: str) -> float:
    """Return the margin currently occupied by open positions.

    Lookup order:
      1. ``margin_used`` from the position hub snapshot, when present.
      2. If CTP is reported connected, the sum of per-position margins from
         the CTP bridge, falling back to the account-level margin figure.

    Every step is best-effort; failures are logged at debug level and the
    function falls through to the next source or to 0.0.

    Args:
        trading_mode: Trading mode forwarded to the CTP bridge helpers.

    Returns:
        A non-negative margin amount, 0.0 when nothing could be determined.
    """
    try:
        from modules.trading.position_stream import position_hub

        snapshot = position_hub.get_snapshot() or {}
        raw = snapshot.get("margin_used")
        if raw is not None:
            return max(0.0, float(raw or 0))
    except Exception as exc:
        logger.debug("recommend_margin_used snapshot: %s", exc)

    if not _ctp_connected_for_mode(trading_mode):
        return 0.0

    try:
        from modules.ctp.vnpy_bridge import ctp_account_margin_used, ctp_sum_position_margins

        total = ctp_sum_position_margins(
            trading_mode, refresh_if_empty=False, refresh_margin=True,
        )
        if total > 0:
            return total
        used = ctp_account_margin_used(trading_mode)
        return float(used) if used and used > 0 else 0.0
    except Exception as exc:
        logger.debug("recommend_margin_used: %s", exc)
        return 0.0
def margin_budget_info(
    capital: float,
    max_margin_pct: float,
    margin_used: float = 0.0,
) -> dict[str, float]:
    """Compute the total margin cap, the used margin and the remaining budget.

    Args:
        capital: Account capital; ``None``/0 or negative yields a zero budget.
        max_margin_pct: Maximum share of capital usable as margin, in percent.
            A falsy value defaults to 30 and the result is clamped to
            the range 1..100.
        margin_used: Margin already occupied; negative values count as 0.

    Returns:
        A dict with ``margin_budget_total``, ``margin_used`` and
        ``margin_budget_remaining`` (each rounded to 2 decimals) plus the
        clamped ``max_margin_pct``.
    """
    cap = float(capital or 0)
    pct = max(1.0, min(100.0, float(max_margin_pct or 30.0)))
    total = cap * pct / 100.0 if cap > 0 else 0.0
    used = max(0.0, float(margin_used or 0))
    # The remaining budget never goes negative, even when over-committed.
    remaining = max(0.0, total - used)
    return {
        "margin_budget_total": round(total, 2),
        "margin_used": round(used, 2),
        "margin_budget_remaining": round(remaining, 2),
        "max_margin_pct": pct,
    }
def enrich_recommend_rows(
    rows: list[dict],
    capital: float,
    *,
    max_margin_pct: float = 30.0,
    trading_mode: str = "simulation",
    margin_used: float = 0.0,
    use_ctp_margin: bool = True,
) -> list[dict]:
    """Recompute the maximum openable lots from current equity and margin cap.

    Each input row is copied, missing contract-spec fields are filled in, the
    one-lot margin is recomputed when a price is available, and ``max_lots``
    plus the budget fields and status label are rewritten. Works on rows
    from older caches as well.

    Args:
        rows: Cached recommendation rows; they are not mutated.
        capital: Current account equity.
        max_margin_pct: Maximum share of equity usable as margin, in percent.
        trading_mode: Trading mode passed to the margin lookup when CTP is
            connected and ``use_ctp_margin`` is set.
        margin_used: Margin already occupied by open positions.
        use_ctp_margin: Whether the CTP margin lookup may be used.

    Returns:
        The enriched rows after ``filter_for_trading_session`` was applied.
    """
    # Function-scope import kept local as in the original, but hoisted out of
    # the per-row loop where it was previously re-executed for every row.
    from modules.core.symbols import enrich_recommend_row, filter_for_trading_session

    cap = float(capital or 0)
    budget_info = margin_budget_info(cap, max_margin_pct, margin_used)
    pct = budget_info["max_margin_pct"]
    budget = budget_info["margin_budget_remaining"]
    ctp_connected = _ctp_connected_for_mode(trading_mode)
    # Loop-invariant: a mode is only forwarded when CTP margins may be used.
    margin_mode = trading_mode if (ctp_connected and use_ctp_margin) else None

    enriched: list[dict] = []
    for raw in rows:
        row = dict(raw)
        ths = (row.get("ths") or "").strip()
        main_code = (row.get("main_code") or "").strip()
        spec_code = main_code or (ths + "8888" if ths else "")
        if spec_code:
            spec = get_contract_spec(spec_code)
            if row.get("mult") in (None, ""):
                row["mult"] = spec["mult"]
            if row.get("tick_size") in (None, ""):
                row["tick_size"] = float(spec.get("tick_size") or 1.0)

        try:
            margin_one = float(row.get("margin_one_lot") or 0)
        except (TypeError, ValueError):
            margin_one = 0.0
        # Guarded like the margin parse above so one malformed cached price
        # degrades to "no price" instead of aborting the whole payload.
        try:
            price = float(row.get("price") or 0)
        except (TypeError, ValueError):
            price = 0.0

        code_for_margin = main_code or spec_code
        if price > 0 and code_for_margin:
            margin_one, margin_source, spec_used = margin_one_lot(
                code_for_margin,
                price,
                direction="max",
                trading_mode=margin_mode,
            )
            if spec_used.get("mult"):
                row["mult"] = spec_used["mult"]
            if spec_used.get("tick_size"):
                row["tick_size"] = spec_used["tick_size"]
            row["margin_one_lot"] = margin_one
            if margin_source == "ctp":
                row["margin_source"] = "ctp"
                row["spec_source"] = "ctp"

        if margin_one > 0 and budget > 0:
            lots = int(math.floor(budget / margin_one))
        else:
            # No usable margin/budget: fall back to what the cache recorded.
            try:
                lots = int(row.get("max_lots") or row.get("recommended_lots") or 0)
            except (TypeError, ValueError):
                lots = 0
        row["max_lots"] = lots
        row.pop("recommended_lots", None)
        row["margin_budget"] = round(budget, 2)
        row["margin_budget_total"] = budget_info["margin_budget_total"]
        row["margin_used"] = budget_info["margin_used"]
        row["max_margin_pct"] = pct

        status = row.get("status") or ""
        if lots >= 1 and status in ("ok", "margin_ok"):
            src = "柜台" if row.get("margin_source") == "ctp" else "估算"
            row["status_label"] = (
                f"最大 {lots}" if status == "ok" else f"最大 {lots} 手·止损偏宽"
            )
            if row.get("margin_source") == "ctp":
                # NOTE(review): the suffix closes a parenthesis that is never
                # opened; label text left unchanged, confirm intended wording.
                row["status_label"] += f"{src}保证金)"
            # NOTE(review): treated as independent of the margin source;
            # confirm against the original nesting.
            if budget_info["margin_used"] > 0:
                row["status_label"] += "·扣持仓"
        elif lots < 1 and status in ("ok", "margin_ok"):
            # Previously affordable, but the remaining budget no longer
            # covers a single lot.
            row["status"] = "blocked"
            row["status_label"] = "资金不足"

        if not row.get("category"):
            row["category"] = product_category(row.get("ths") or "")
        row = enrich_recommend_row(row)
        _attach_turnover(row)
        enriched.append(row)

    return filter_for_trading_session(enriched)
def filter_recommend_by_sizing(
    rows: list[dict],
    *,
    sizing_mode: str,
    fixed_lots: int = 1,
) -> list[dict]:
    """Hide products whose max lots are below the fixed lot size.

    The filter only applies in fixed-lot mode; for any other mode the input
    list object is returned unchanged.

    Args:
        rows: Recommendation rows carrying an optional ``max_lots`` value.
        sizing_mode: Sizing mode; compared case-insensitively and stripped
            against ``"fixed"``. ``None`` is treated as not fixed.
        fixed_lots: Configured lot size; falsy or below 1 is raised to 1.

    Returns:
        The rows whose ``max_lots`` is at least the configured lot size.
    """
    if (sizing_mode or "").strip().lower() != "fixed":
        return rows
    required = max(1, int(fixed_lots or 1))
    return [row for row in rows if int(row.get("max_lots") or 0) >= required]
def refresh_recommend_cache(
    conn,
    capital: float,
    quote_fn: Callable[[str], Optional[dict]],
    *,
    trading_mode: str = "simulation",
    max_margin_pct: float = 30.0,
    margin_used: float | None = None,
) -> list[dict]:
    """Pull quotes, filter the affordable products and persist them.

    The single cache row (id 1) is inserted or overwritten and the
    transaction is committed before returning.

    Args:
        conn: Database connection exposing ``execute`` and ``commit``.
        capital: Capital used for the computation and stored with the cache.
        quote_fn: Callable returning a quote dict (or None) for a code;
            forwarded to ``list_product_recommendations``.
        trading_mode: Trading mode forwarded to the recommendation builder.
        max_margin_pct: Maximum share of capital usable as margin, in percent.
        margin_used: Occupied margin; when None it is looked up through
            ``recommend_margin_used``.

    Returns:
        The affordable rows that were written to the cache.
    """
    ensure_recommend_tables(conn)
    ensure_fee_rates_schema(conn)
    ctp_connected = _ctp_connected_for_mode(trading_mode)
    if margin_used is not None:
        used = float(margin_used)
    else:
        used = recommend_margin_used(trading_mode)

    all_rows = list_product_recommendations(
        capital,
        quote_fn,
        max_margin_pct=max_margin_pct,
        trading_mode=trading_mode,
        ctp_connected=ctp_connected,
        margin_used=used,
    )
    rows = filter_affordable_recommendations(all_rows)
    cap = float(capital or 0)
    if not rows and cap > 0:
        # Capital is available yet nothing is affordable: record a status
        # breakdown to make the empty list diagnosable.
        logger.warning(
            "recommend refresh: 0 affordable rows capital=%.2f total=%d no_price=%d blocked=%d",
            cap,
            len(all_rows),
            sum(1 for r in all_rows if r.get("status") == "no_price"),
            sum(1 for r in all_rows if r.get("status") == "blocked"),
        )

    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    conn.execute(
        """INSERT INTO product_recommend_cache (id, capital, rows_json, updated_at)
VALUES (1, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
capital=excluded.capital,
rows_json=excluded.rows_json,
updated_at=excluded.updated_at""",
        (cap, json.dumps(rows, ensure_ascii=False), now),
    )
    conn.commit()
    return rows
def recommend_cache_stale(updated_at: Optional[str], *, now: Optional[datetime] = None) -> bool:
    """Tell whether the cache was last updated on a day other than today.

    Args:
        updated_at: Timestamp text whose first 10 characters are parsed as
            ``YYYY-MM-DD``. A missing or unparsable value counts as stale.
        now: Reference time; defaults to ``datetime.now()``.

    Returns:
        True when quotes must be pulled again, False when the cache date
        equals the reference date.
    """
    if not updated_at:
        return True
    try:
        cached_day = datetime.strptime(str(updated_at)[:10], "%Y-%m-%d").date()
    except ValueError:
        return True
    reference = now or datetime.now()
    return cached_day != reference.date()
def load_recommend_cache(conn) -> dict:
    """Read the openable-products list from the database cache.

    Args:
        conn: Database connection exposing ``execute``; the fetched row is
            accessed by column name.

    Returns:
        A dict with ``capital``, ``rows``, ``updated_at`` and ``stale``. When
        no cache row exists the payload is empty and marked stale; rows JSON
        that cannot be decoded, or that is not a list, yields an empty list.
    """
    ensure_recommend_tables(conn)
    row = conn.execute("SELECT capital, rows_json, updated_at FROM product_recommend_cache WHERE id=1").fetchone()
    if not row:
        return {"capital": 0.0, "rows": [], "updated_at": None, "stale": True}
    try:
        rows = json.loads(row["rows_json"] or "[]")
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError subclass, so it is covered.
        rows = []
    updated_at = row["updated_at"]
    return {
        "capital": float(row["capital"] or 0),
        "rows": rows if isinstance(rows, list) else [],
        "updated_at": updated_at,
        "stale": recommend_cache_stale(updated_at),
    }
def recommend_payload(
    conn,
    *,
    live_capital: float,
    max_margin_pct: float = 30.0,
    trading_mode: str = "simulation",
    sizing_mode: str = "fixed",
    fixed_lots: int = 1,
    use_ctp_margin: bool = True,
) -> dict:
    """Load the cache and attach the current equity for display.

    The live equity may differ from the capital the cache was computed with,
    so rows are re-enriched, filtered by account scope and sizing, and sorted
    by trend before being returned.

    Args:
        conn: Database connection used to read the cache.
        live_capital: Current account equity.
        max_margin_pct: Maximum share of equity usable as margin, in percent;
            a falsy value defaults to 30, clamped to 1..100.
        trading_mode: Trading mode forwarded to the margin/connection helpers.
        sizing_mode: Position sizing mode forwarded to the sizing filter.
        fixed_lots: Fixed lot size forwarded to the sizing filter.
        use_ctp_margin: Whether CTP-derived margin figures may be used.

    Returns:
        The cache payload extended with ``capital``, ``max_margin_pct``, the
        margin budget fields, the processed ``rows`` and ``needs_refresh``.
    """
    payload = load_recommend_cache(conn)
    cap = float(live_capital or 0)
    pct = max(1.0, min(100.0, float(max_margin_pct or 30.0)))

    if use_ctp_margin:
        used = recommend_margin_used(trading_mode)
    else:
        # Without CTP margins only the position hub snapshot is consulted.
        used = 0.0
        try:
            from modules.trading.position_stream import position_hub

            snapshot = position_hub.get_snapshot() or {}
            raw = snapshot.get("margin_used")
            if raw is not None:
                used = max(0.0, float(raw or 0))
        except Exception as exc:
            logger.debug("recommend_payload snapshot: %s", exc)
    if used <= 0:
        # NOTE(review): the cache payload built in this module carries no
        # "margin_used" key, so this fallback presumably always yields 0.
        used = float(payload.get("margin_used") or 0)

    budget_info = margin_budget_info(cap, pct, used)
    payload["capital"] = cap
    payload["max_margin_pct"] = pct
    payload.update(budget_info)

    rows = enrich_recommend_rows(
        payload.get("rows") or [],
        cap,
        max_margin_pct=pct,
        trading_mode=trading_mode,
        margin_used=used,
        use_ctp_margin=use_ctp_margin,
    )
    rows = filter_rows_for_account_scope(
        rows, cap, ctp_connected=_ctp_connected_for_mode(trading_mode),
    )
    rows = filter_recommend_by_sizing(rows, sizing_mode=sizing_mode, fixed_lots=fixed_lots)
    rows = sort_recommend_by_trend(rows)
    payload["rows"] = rows
    # NOTE(review): the refresh decision sees the enriched and filtered rows,
    # not the raw cached ones — confirm this is intended.
    payload["needs_refresh"] = recommend_cache_needs_refresh(payload, capital=cap)
    return payload