"""品种推荐:近一周日线走势(多头 / 空头 / 震荡 / 转多 / 转空)。""" from __future__ import annotations import logging from typing import Callable, Optional import requests from kline_chart import fetch_sina_klines, ths_to_sina_chart_symbol logger = logging.getLogger(__name__) DAILY_LOOKBACK = 7 OVERLAP_WINDOW = 3 OVERLAP_RANGE_THRESHOLD = 0.70 KLINE_FETCH_TIMEOUT = 5 TREND_LONG = "long" TREND_SHORT = "short" TREND_RANGE = "range" TREND_BREAK_LONG = "break_long" TREND_BREAK_SHORT = "break_short" def _bar_ohlc(bar: dict) -> tuple[float, float, float, float]: o = float(bar.get("o") or bar.get("open") or 0) h = float(bar.get("h") or bar.get("high") or o) l = float(bar.get("l") or bar.get("low") or o) c = float(bar.get("c") or bar.get("close") or o) return o, h, l, c def kline_overlap_ratio(bars: list) -> float: """三根 K 线高低价区间的重叠度 = 交集 / 并集(0~1)。""" if len(bars) < OVERLAP_WINDOW: return 0.0 chunk = bars[-OVERLAP_WINDOW:] lows, highs = [], [] for bar in chunk: _, h, l, _ = _bar_ohlc(bar) if h <= 0 and l <= 0: continue lows.append(l) highs.append(h) if len(lows) < OVERLAP_WINDOW: return 0.0 overlap = max(0.0, min(highs) - max(lows)) union = max(highs) - min(lows) if union <= 0: return 1.0 if overlap > 0 else 0.0 return overlap / union def _direction_from_closes(bars: list) -> str: if len(bars) < 2: return TREND_RANGE closes = [_bar_ohlc(b)[3] for b in bars if _bar_ohlc(b)[3] > 0] if len(closes) < 2: return TREND_RANGE if closes[-1] > closes[0]: return TREND_LONG if closes[-1] < closes[0]: return TREND_SHORT return TREND_RANGE def _bar_ohlcv(bar: dict) -> tuple[float, float, float, float, float]: o, h, l, c = _bar_ohlc(bar) v = float(bar.get("v") or bar.get("volume") or 0) return o, h, l, c, v def compute_daily_quote_stats(bars: list) -> dict: """从日线提取:跳空、昨收、今开、昨涨跌、昨振幅、成交量。""" empty = { "gap": "", "gap_label": "—", "gap_pct": None, "prev_close": None, "today_open": None, "yesterday_change": None, "yesterday_change_pct": None, "yesterday_amplitude_pct": None, "volume": None, } if len(bars) < 2: return empty t_o, _, _, _, t_v = _bar_ohlcv(bars[-1]) y_o, y_h, y_l, y_c, y_v = _bar_ohlcv(bars[-2]) if y_c <= 0: return empty prev_close = round(y_c, 4) today_open = round(t_o, 4) if t_o > 0 else None gap, gap_label, gap_pct = "none", "否", 0.0 if today_open is not None and today_open > y_c: gap, gap_label = "up", "跳空高开" gap_pct = (today_open - y_c) / y_c * 100 elif today_open is not None and today_open < y_c: gap, gap_label = "down", "跳空低开" gap_pct = (today_open - y_c) / y_c * 100 if len(bars) >= 3: _, _, _, p_c, _ = _bar_ohlcv(bars[-3]) base = p_c if p_c > 0 else y_o else: base = y_o if y_o > 0 else y_c y_change = y_c - base if base > 0 else None y_change_pct = (y_change / base * 100) if y_change is not None and base > 0 else None y_amp = ((y_h - y_l) / base * 100) if base > 0 and y_h >= y_l else None vol = y_v if y_v > 0 else (t_v if t_v > 0 else None) return { "gap": gap, "gap_label": gap_label, "gap_pct": round(gap_pct, 2) if gap != "none" else 0.0, "prev_close": prev_close, "today_open": today_open, "yesterday_change": round(y_change, 4) if y_change is not None else None, "yesterday_change_pct": round(y_change_pct, 2) if y_change_pct is not None else None, "yesterday_amplitude_pct": round(y_amp, 2) if y_amp is not None else None, "volume": int(vol) if vol is not None else None, } def analyze_daily_trend(bars: list, *, overlap_threshold: float = OVERLAP_RANGE_THRESHOLD) -> dict: """根据近一周日线判断走势;最近三天重叠度≥阈值视为震荡。""" empty = { "trend": "", "trend_label": "—", "trend_transition": False, "trend_overlap_pct": None, "trend_prev_overlap_pct": None, } if len(bars) < OVERLAP_WINDOW: return empty recent = bars[-DAILY_LOOKBACK:] if len(bars) > DAILY_LOOKBACK else bars curr_overlap = kline_overlap_ratio(recent) prev_overlap = kline_overlap_ratio(recent[:-OVERLAP_WINDOW]) if len(recent) >= OVERLAP_WINDOW * 2 else 0.0 curr_range = curr_overlap >= overlap_threshold prev_range = prev_overlap >= overlap_threshold if curr_range: trend, label = TREND_RANGE, "震荡" transition = False else: direction = _direction_from_closes(recent[-OVERLAP_WINDOW:]) if direction == TREND_LONG: trend, label = TREND_LONG, "多头" elif direction == TREND_SHORT: trend, label = TREND_SHORT, "空头" else: trend, label = TREND_RANGE, "震荡" transition = prev_range and trend in (TREND_LONG, TREND_SHORT) if transition: if trend == TREND_LONG: trend, label = TREND_BREAK_LONG, "转多" else: trend, label = TREND_BREAK_SHORT, "转空" return { "trend": trend, "trend_label": label, "trend_transition": transition, "trend_overlap_pct": round(curr_overlap * 100, 1), "trend_prev_overlap_pct": round(prev_overlap * 100, 1) if prev_overlap else None, } def _normalize_daily_bars(raw: list) -> list: out = [] for row in raw: if isinstance(row, list) and len(row) >= 5: out.append({ "d": str(row[0]), "o": float(row[1]), "h": float(row[2]), "l": float(row[3]), "c": float(row[4]), "v": float(row[5]) if len(row) > 5 and row[5] else 0.0, }) elif isinstance(row, dict) and row.get("d"): out.append({ "d": str(row["d"]), "o": float(row.get("o", 0) or 0), "h": float(row.get("h", 0) or 0), "l": float(row.get("l", 0) or 0), "c": float(row.get("c", 0) or 0), "v": float(row.get("v", 0) or 0), }) return out def _fetch_sina_daily_quick(chart_sym: str) -> list: url = ( "https://stock2.finance.sina.com.cn/futures/api/json.php/" f"IndexService.getInnerFuturesDailyKLine?symbol={chart_sym}" ) try: resp = requests.get( url, timeout=KLINE_FETCH_TIMEOUT, headers={"Referer": "https://finance.sina.com.cn"}, ) raw = resp.json() if raw and isinstance(raw, list): bars = _normalize_daily_bars(raw) if bars: return bars except Exception as exc: logger.debug("quick daily kline failed %s: %s", chart_sym, exc) return [] def fetch_week_daily_bars( symbol: str, *, fetch_fn: Callable[[str, str], list] | None = None, ) -> list: sym = (symbol or "").strip() if not sym: return [] if fetch_fn: try: bars = fetch_fn(sym, "d") or [] except Exception as exc: logger.debug("fetch week daily failed %s: %s", sym, exc) return [] return bars[-DAILY_LOOKBACK:] if bars else [] chart_sym = ths_to_sina_chart_symbol(sym) if not chart_sym: return [] bars = _fetch_sina_daily_quick(chart_sym) if not bars: try: bars = fetch_sina_klines(sym, "d") or [] except Exception as exc: logger.debug("fetch week daily fallback failed %s: %s", sym, exc) return [] return bars[-DAILY_LOOKBACK:] if bars else [] def analyze_product_daily( symbol: str, *, fetch_fn: Callable[[str, str], list] | None = None, ) -> dict: """拉取主力合约一周日线:走势 + 跳空/量价统计。""" sym = (symbol or "").strip() if not sym: out = analyze_daily_trend([]) out.update(compute_daily_quote_stats([])) return out bars = fetch_week_daily_bars(sym, fetch_fn=fetch_fn) out = analyze_daily_trend(bars) out.update(compute_daily_quote_stats(bars)) return out def analyze_product_trend( symbol: str, *, fetch_fn: Callable[[str, str], list] | None = None, ) -> dict: return analyze_product_daily(symbol, fetch_fn=fetch_fn) GAP_SORT_RANK = {"up": 2, "down": 1, "none": 0, "": -1} TREND_SORT_RANK = { TREND_BREAK_LONG: 0, TREND_BREAK_SHORT: 0, TREND_LONG: 1, TREND_SHORT: 2, TREND_RANGE: 3, "": 9, } def recommend_sort_key(row: dict, sort_by: str = "trend", *, desc: bool = True) -> tuple: """可排序字段:trend / gap / volume / amplitude。""" key = (sort_by or "trend").strip().lower() if key == "gap": primary = GAP_SORT_RANK.get(row.get("gap") or "", -1) secondary = abs(float(row.get("gap_pct") or 0)) elif key == "volume": primary = float(row.get("volume") or 0) secondary = 0.0 elif key == "amplitude": primary = float(row.get("yesterday_amplitude_pct") or 0) secondary = 0.0 else: primary = TREND_SORT_RANK.get(row.get("trend") or "", 9) secondary = -(int(row.get("max_lots") or 0)) if desc: return (-primary, -secondary, row.get("name") or "") return (primary, secondary, row.get("name") or "") def sort_recommend_rows( rows: list[dict], *, sort_by: str = "trend", desc: bool = True, ) -> list[dict]: return sorted(rows, key=lambda r: recommend_sort_key(r, sort_by, desc=desc)) def trend_sort_key(row: dict) -> tuple: """转多/转空优先,其次多头/空头,震荡靠后。""" trend = (row.get("trend") or "").strip() priority = { TREND_BREAK_LONG: 0, TREND_BREAK_SHORT: 0, TREND_LONG: 1, TREND_SHORT: 1, TREND_RANGE: 2, } status_order = {"ok": 0, "margin_ok": 1, "blocked": 2, "no_price": 3} return ( priority.get(trend, 3), status_order.get(row.get("status") or "", 9), -(int(row.get("max_lots") or 0)), ) def sort_recommend_by_trend(rows: list[dict]) -> list[dict]: return sort_recommend_rows(rows, sort_by="trend", desc=True)