diff --git a/install_trading.py b/install_trading.py index b4167dd..d0ea32a 100644 --- a/install_trading.py +++ b/install_trading.py @@ -24,12 +24,10 @@ from position_sizing import ( normalize_sizing_mode, ) from recommend_store import ( - load_recommend_cache, - recommend_cache_needs_refresh, recommend_payload, refresh_recommend_cache, ) -from recommend_stream import recommend_hub, start_recommend_worker +from recommend_stream import recommend_hub, schedule_recommend_refresh, start_recommend_worker from position_stream import position_hub, start_position_worker from ctp_reconnect import start_ctp_reconnect_worker from ctp_premarket_connect import start_ctp_premarket_connect_worker @@ -100,6 +98,20 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se return "固定金额" return "固定手数" + def _schedule_recommend_refresh() -> None: + from db_conn import DB_PATH + + schedule_recommend_refresh( + db_path=DB_PATH, + get_capital_fn=_capital, + quote_fn=_main_quote, + init_tables_fn=lambda c: init_strategy_tables(c), + get_mode_fn=lambda: get_trading_mode(get_setting), + get_max_margin_pct_fn=lambda: get_max_margin_pct(get_setting), + get_sizing_mode_fn=lambda: get_sizing_mode(get_setting), + get_fixed_lots_fn=lambda: get_fixed_lots(get_setting), + ) + def _recommend_payload(conn) -> dict: mode = get_trading_mode(get_setting) return recommend_payload( @@ -937,41 +949,9 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se conn.commit() sizing = get_sizing_mode(get_setting) max_pct = get_max_margin_pct(get_setting) - rec_loaded = load_recommend_cache(conn) - if recommend_cache_needs_refresh(rec_loaded, capital=capital): - try: - refresh_recommend_cache( - conn, capital, _main_quote, trading_mode=mode, max_margin_pct=max_pct, - ) - except Exception as exc: - logger.warning("positions recommend refresh failed: %s", exc) rec_cache = _recommend_payload(conn) - if not rec_cache.get("rows") and capital > 0: - try: - from product_recommend import list_product_recommendations - from recommend_store import ( - enrich_recommend_rows, - filter_affordable_recommendations, - filter_recommend_by_sizing, - ) - - live_rows = filter_affordable_recommendations( - list_product_recommendations( - capital, _main_quote, max_margin_pct=max_pct, trading_mode=mode, - ) - ) - if live_rows: - enriched = enrich_recommend_rows( - live_rows, capital, max_margin_pct=max_pct, trading_mode=mode, - ) - rec_cache["rows"] = filter_recommend_by_sizing( - enriched, - sizing_mode=sizing, - fixed_lots=get_fixed_lots(get_setting), - ) - rec_cache["updated_at"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - except Exception as exc: - logger.warning("positions recommend live fallback failed: %s", exc) + if rec_cache.get("needs_refresh"): + _schedule_recommend_refresh() return render_template( "trade.html", trading_mode=mode, diff --git a/recommend_stream.py b/recommend_stream.py index ea056d8..c15ae51 100644 --- a/recommend_stream.py +++ b/recommend_stream.py @@ -20,6 +20,76 @@ from recommend_store import ( logger = logging.getLogger(__name__) CHECK_INTERVAL_SEC = 3600 +_refresh_lock = threading.Lock() +_refresh_running = False + + +def schedule_recommend_refresh( + *, + db_path: str, + get_capital_fn: Callable, + quote_fn: Callable[[str], Optional[dict]], + init_tables_fn: Callable | None = None, + get_mode_fn: Callable[[], str] | None = None, + get_max_margin_pct_fn: Callable[[], float] | None = None, + get_sizing_mode_fn: Callable[[], str] | None = None, + get_fixed_lots_fn: Callable[[], int] | None = None, +) -> None: + """后台刷新推荐缓存(不阻塞页面请求)。""" + global _refresh_running + with _refresh_lock: + if _refresh_running: + return + _refresh_running = True + + def _run() -> None: + global _refresh_running + try: + conn = connect_db(db_path) + try: + if init_tables_fn: + init_tables_fn(conn) + capital = float(get_capital_fn(conn) or 0) + mode = get_mode_fn() if get_mode_fn else "simulation" + max_pct = float(get_max_margin_pct_fn()) if get_max_margin_pct_fn else 30.0 + cached = load_recommend_cache(conn) + if not recommend_cache_needs_refresh(cached, capital=capital): + payload = recommend_payload( + conn, + live_capital=capital, + max_margin_pct=max_pct, + trading_mode=mode, + sizing_mode=get_sizing_mode_fn() if get_sizing_mode_fn else "fixed", + fixed_lots=get_fixed_lots_fn() if get_fixed_lots_fn else 1, + ) + recommend_hub.broadcast("recommend", {"ok": True, **payload}) + return + refresh_recommend_cache( + conn, capital, quote_fn, trading_mode=mode, max_margin_pct=max_pct, + ) + cached = load_recommend_cache(conn) + logger.info( + "品种推荐后台刷新完成,capital=%.2f rows=%d", + capital, len(cached.get("rows") or []), + ) + payload = recommend_payload( + conn, + live_capital=capital, + max_margin_pct=max_pct, + trading_mode=mode, + sizing_mode=get_sizing_mode_fn() if get_sizing_mode_fn else "fixed", + fixed_lots=get_fixed_lots_fn() if get_fixed_lots_fn else 1, + ) + finally: + conn.close() + recommend_hub.broadcast("recommend", {"ok": True, **payload}) + except Exception as exc: + logger.warning("recommend background refresh failed: %s", exc) + finally: + with _refresh_lock: + _refresh_running = False + + threading.Thread(target=_run, daemon=True, name="recommend-refresh").start() class RecommendStreamHub: @@ -71,31 +141,16 @@ def start_recommend_worker( def _loop() -> None: while True: try: - conn = connect_db(db_path) - try: - if init_tables_fn: - init_tables_fn(conn) - capital = float(get_capital_fn(conn) or 0) - mode = get_mode_fn() if get_mode_fn else "simulation" - max_pct = float(get_max_margin_pct_fn()) if get_max_margin_pct_fn else 30.0 - cached = load_recommend_cache(conn) - if recommend_cache_needs_refresh(cached, capital=capital): - refresh_recommend_cache( - conn, capital, quote_fn, trading_mode=mode, max_margin_pct=max_pct, - ) - cached = load_recommend_cache(conn) - logger.info("品种推荐刷新完成,capital=%.2f rows=%d", capital, len(cached.get("rows") or [])) - payload = recommend_payload( - conn, - live_capital=capital, - max_margin_pct=max_pct, - trading_mode=mode, - sizing_mode=get_sizing_mode_fn() if get_sizing_mode_fn else "fixed", - fixed_lots=get_fixed_lots_fn() if get_fixed_lots_fn else 1, - ) - finally: - conn.close() - recommend_hub.broadcast("recommend", {"ok": True, **payload}) + schedule_recommend_refresh( + db_path=db_path, + get_capital_fn=get_capital_fn, + quote_fn=quote_fn, + init_tables_fn=init_tables_fn, + get_mode_fn=get_mode_fn, + get_max_margin_pct_fn=get_max_margin_pct_fn, + get_sizing_mode_fn=get_sizing_mode_fn, + get_fixed_lots_fn=get_fixed_lots_fn, + ) except Exception as exc: logger.warning("recommend worker failed: %s", exc) time.sleep(max(300, interval)) diff --git a/recommend_trend.py b/recommend_trend.py index 491162a..8bc515a 100644 --- a/recommend_trend.py +++ b/recommend_trend.py @@ -4,13 +4,16 @@ from __future__ import annotations import logging from typing import Callable, Optional -from kline_chart import fetch_sina_klines +import requests + +from kline_chart import fetch_sina_klines, ths_to_sina_chart_symbol logger = logging.getLogger(__name__) DAILY_LOOKBACK = 7 OVERLAP_WINDOW = 3 OVERLAP_RANGE_THRESHOLD = 0.70 +KLINE_FETCH_TIMEOUT = 5 TREND_LONG = "long" TREND_SHORT = "short" @@ -107,20 +110,75 @@ def analyze_daily_trend(bars: list, *, overlap_threshold: float = OVERLAP_RANGE_ } +def _normalize_daily_bars(raw: list) -> list: + out = [] + for row in raw: + if isinstance(row, list) and len(row) >= 5: + out.append({ + "d": str(row[0]), + "o": float(row[1]), + "h": float(row[2]), + "l": float(row[3]), + "c": float(row[4]), + }) + elif isinstance(row, dict) and row.get("d"): + out.append({ + "d": str(row["d"]), + "o": float(row.get("o", 0) or 0), + "h": float(row.get("h", 0) or 0), + "l": float(row.get("l", 0) or 0), + "c": float(row.get("c", 0) or 0), + }) + return out + + +def _fetch_sina_daily_quick(chart_sym: str) -> list: + url = ( + "https://stock2.finance.sina.com.cn/futures/api/json.php/" + f"IndexService.getInnerFuturesDailyKLine?symbol={chart_sym}" + ) + try: + resp = requests.get( + url, timeout=KLINE_FETCH_TIMEOUT, + headers={"Referer": "https://finance.sina.com.cn"}, + ) + raw = resp.json() + if raw and isinstance(raw, list): + bars = _normalize_daily_bars(raw) + if bars: + return bars + except Exception as exc: + logger.debug("quick daily kline failed %s: %s", chart_sym, exc) + return [] + + def fetch_week_daily_bars( symbol: str, *, fetch_fn: Callable[[str, str], list] | None = None, ) -> list: - fn = fetch_fn or fetch_sina_klines - try: - bars = fn(symbol, "d") or [] - except Exception as exc: - logger.debug("fetch week daily failed %s: %s", symbol, exc) + sym = (symbol or "").strip() + if not sym: return [] + if fetch_fn: + try: + bars = fetch_fn(sym, "d") or [] + except Exception as exc: + logger.debug("fetch week daily failed %s: %s", sym, exc) + return [] + return bars[-DAILY_LOOKBACK:] if bars else [] + + chart_sym = ths_to_sina_chart_symbol(sym) + if not chart_sym: + return [] + bars = _fetch_sina_daily_quick(chart_sym) if not bars: - return [] - return bars[-DAILY_LOOKBACK:] + try: + bars = fetch_sina_klines(sym, "d") or [] + except Exception as exc: + logger.debug("fetch week daily fallback failed %s: %s", sym, exc) + return [] + return bars[-DAILY_LOOKBACK:] if bars else [] def analyze_product_trend(