aad88a9e98
Show category, turnover, and per-industry counts; clarify volume is in lots. Prevent trade-save button from stretching full column width. Co-authored-by: Cursor <cursoragent@cursor.com>
335 lines
11 KiB
Python
335 lines
11 KiB
Python
"""品种推荐:近一周日线走势(多头 / 空头 / 震荡 / 转多 / 转空)。"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Callable, Optional
|
|
|
|
import requests
|
|
|
|
from kline_chart import fetch_sina_klines, ths_to_sina_chart_symbol
|
|
|
|
# Module-level logger; the fetch helpers in this module report failures at DEBUG level.
logger = logging.getLogger(__name__)

# --- Analysis window settings -------------------------------------------
# Number of most recent daily bars kept for analysis.
DAILY_LOOKBACK = 7
# Number of trailing bars whose high/low ranges are compared for overlap.
OVERLAP_WINDOW = 3
# Overlap ratio at or above which the window is classified as ranging.
OVERLAP_RANGE_THRESHOLD = 0.70
# Timeout value handed to ``requests.get`` for the quick daily K-line fetch.
KLINE_FETCH_TIMEOUT = 5

# --- Trend codes stored under the "trend" key of analysis results -------
TREND_LONG = "long"
TREND_SHORT = "short"
TREND_RANGE = "range"
TREND_BREAK_LONG = "break_long"
TREND_BREAK_SHORT = "break_short"
|
|
|
|
|
|
def _bar_ohlc(bar: dict) -> tuple[float, float, float, float]:
|
|
o = float(bar.get("o") or bar.get("open") or 0)
|
|
h = float(bar.get("h") or bar.get("high") or o)
|
|
l = float(bar.get("l") or bar.get("low") or o)
|
|
c = float(bar.get("c") or bar.get("close") or o)
|
|
return o, h, l, c
|
|
|
|
|
|
def kline_overlap_ratio(bars: list) -> float:
    """Return the high/low range overlap of the last ``OVERLAP_WINDOW`` bars.

    The ratio is intersection / union of the bars' ``[low, high]`` intervals:
    ``max(0, min(highs) - max(lows)) / (max(highs) - min(lows))``, which lies
    in ``[0, 1]``.

    Returns ``0.0`` when:
      * fewer than ``OVERLAP_WINDOW`` bars are supplied;
      * any bar in the window has neither a positive high nor a positive low
        (the window is then incomplete);
      * the union has zero or negative width (e.g. every bar is the same
        single price). The intersection can never exceed the union, so it
        is necessarily 0 in that case as well.
    """
    if len(bars) < OVERLAP_WINDOW:
        return 0.0

    lows, highs = [], []
    for bar in bars[-OVERLAP_WINDOW:]:
        _, high, low, _ = _bar_ohlc(bar)
        if high <= 0 and low <= 0:
            # No usable price on this bar; leave it out of the window.
            continue
        lows.append(low)
        highs.append(high)
    if len(lows) < OVERLAP_WINDOW:
        return 0.0

    overlap = max(0.0, min(highs) - max(lows))
    union = max(highs) - min(lows)
    if union <= 0:
        # Degenerate window: nothing to divide by, and overlap is 0 here too.
        return 0.0
    return overlap / union
|
|
|
|
|
|
def _direction_from_closes(bars: list) -> str:
    """Classify direction by comparing the last positive close with the first.

    Bars whose close is not positive are ignored. Returns ``TREND_LONG`` when
    the last remaining close is above the first, ``TREND_SHORT`` when it is
    below, and ``TREND_RANGE`` when they are equal or when fewer than two
    bars / positive closes are available.
    """
    if len(bars) < 2:
        return TREND_RANGE

    positive_closes = []
    for bar in bars:
        close = _bar_ohlc(bar)[3]
        if close > 0:
            positive_closes.append(close)
    if len(positive_closes) < 2:
        return TREND_RANGE

    first, last = positive_closes[0], positive_closes[-1]
    if last > first:
        return TREND_LONG
    return TREND_SHORT if last < first else TREND_RANGE
|
|
|
|
|
|
def _bar_ohlcv(bar: dict) -> tuple[float, float, float, float, float]:
    """Extract ``(open, high, low, close, volume)`` from a bar dict.

    Prices come from ``_bar_ohlc``; volume is read from ``"v"`` and then
    ``"volume"``, with a missing or falsy value becoming ``0``.
    """
    prices = _bar_ohlc(bar)
    volume = float(bar.get("v") or bar.get("volume") or 0)
    return (*prices, volume)
|
|
|
|
|
|
def compute_daily_quote_stats(bars: list) -> dict:
    """Extract gap, previous-close, change, amplitude and volume from daily bars.

    The last bar is treated as today, the one before it as yesterday and,
    when present, the third-from-last as the day before yesterday.

    Returned keys:
        gap, gap_label, gap_pct: today's open compared with yesterday's
            close (``"up"`` / ``"down"`` / ``"none"``). ``"none"`` with
            ``0.0`` is also reported when today's open is not positive.
        prev_close: yesterday's close rounded to 4 places.
        today_open: today's open rounded to 4 places, or ``None`` when it
            is not positive.
        yesterday_change, yesterday_change_pct, yesterday_amplitude_pct:
            yesterday's close-to-base move and high-low span, both relative
            to a base price; ``None`` when no positive base is available.
        volume: yesterday's volume if positive, else today's, else ``None``.
        volume_unit: always ``"lot"``.

    With fewer than two bars, or a non-positive close yesterday, every
    statistic is blank / ``None``. ``volume_unit`` is still included so the
    result carries the same keys on every path.
    """
    blank = {
        "gap": "",
        "gap_label": "—",
        "gap_pct": None,
        "prev_close": None,
        "today_open": None,
        "yesterday_change": None,
        "yesterday_change_pct": None,
        "yesterday_amplitude_pct": None,
        "volume": None,
        # Present on the normal result too; keeps the schema uniform.
        "volume_unit": "lot",
    }
    if len(bars) < 2:
        return blank

    t_o, _, _, _, t_v = _bar_ohlcv(bars[-1])
    y_o, y_h, y_l, y_c, y_v = _bar_ohlcv(bars[-2])
    if y_c <= 0:
        return blank

    prev_close = round(y_c, 4)
    today_open = round(t_o, 4) if t_o > 0 else None

    # Gap: any open above / below yesterday's close.
    gap, gap_label, gap_pct = "none", "否", 0.0
    if today_open is not None and today_open > y_c:
        gap, gap_label = "up", "跳空高开"
        gap_pct = (today_open - y_c) / y_c * 100
    elif today_open is not None and today_open < y_c:
        gap, gap_label = "down", "跳空低开"
        gap_pct = (today_open - y_c) / y_c * 100

    # Base price for yesterday's change: the prior close when a third bar
    # exists, otherwise yesterday's own open (then yesterday's close).
    if len(bars) >= 3:
        p_c = _bar_ohlcv(bars[-3])[3]
        base = p_c if p_c > 0 else y_o
    else:
        base = y_o if y_o > 0 else y_c

    if base > 0:
        y_change = y_c - base
        y_change_pct = y_change / base * 100
        y_amp = (y_h - y_l) / base * 100 if y_h >= y_l else None
    else:
        y_change = y_change_pct = y_amp = None

    # Prefer yesterday's volume; fall back to today's when it is missing.
    vol = y_v if y_v > 0 else (t_v if t_v > 0 else None)

    return {
        "gap": gap,
        "gap_label": gap_label,
        "gap_pct": round(gap_pct, 2) if gap != "none" else 0.0,
        "prev_close": prev_close,
        "today_open": today_open,
        "yesterday_change": round(y_change, 4) if y_change is not None else None,
        "yesterday_change_pct": round(y_change_pct, 2) if y_change_pct is not None else None,
        "yesterday_amplitude_pct": round(y_amp, 2) if y_amp is not None else None,
        "volume": int(vol) if vol is not None else None,
        "volume_unit": "lot",
    }
|
|
|
|
|
|
def analyze_daily_trend(bars: list, *, overlap_threshold: float = OVERLAP_RANGE_THRESHOLD) -> dict:
    """Classify the recent daily trend from the last ``DAILY_LOOKBACK`` bars.

    The overlap ratio of the latest ``OVERLAP_WINDOW`` bars decides whether
    the market is ranging (ratio >= ``overlap_threshold``). Otherwise the
    direction of closes over that window gives long / short / range. A long
    or short reading is upgraded to a breakout ("break_long" /
    "break_short") when the window just before the latest one was ranging;
    that earlier window is only measured when at least
    ``2 * OVERLAP_WINDOW`` bars are available.

    Returns a dict with ``trend``, ``trend_label``, ``trend_transition``,
    ``trend_overlap_pct`` and ``trend_prev_overlap_pct``. With fewer than
    ``OVERLAP_WINDOW`` bars the trend is blank and both percentages are
    ``None``. ``trend_prev_overlap_pct`` is ``None`` whenever the previous
    overlap is zero.
    """
    if len(bars) < OVERLAP_WINDOW:
        return {
            "trend": "",
            "trend_label": "—",
            "trend_transition": False,
            "trend_overlap_pct": None,
            "trend_prev_overlap_pct": None,
        }

    recent = bars[-DAILY_LOOKBACK:]
    curr_overlap = kline_overlap_ratio(recent)
    if len(recent) >= OVERLAP_WINDOW * 2:
        prev_overlap = kline_overlap_ratio(recent[:-OVERLAP_WINDOW])
    else:
        prev_overlap = 0.0

    # direction -> (plain code, plain label, breakout code, breakout label)
    directional = {
        TREND_LONG: (TREND_LONG, "多头", TREND_BREAK_LONG, "转多"),
        TREND_SHORT: (TREND_SHORT, "空头", TREND_BREAK_SHORT, "转空"),
    }

    # Default outcome: ranging, no transition.
    trend, label, transition = TREND_RANGE, "震荡", False
    if not curr_overlap >= overlap_threshold:
        direction = _direction_from_closes(recent[-OVERLAP_WINDOW:])
        if direction in directional:
            plain_code, plain_label, break_code, break_label = directional[direction]
            # Leaving a ranging window in a clear direction counts as a breakout.
            transition = prev_overlap >= overlap_threshold
            if transition:
                trend, label = break_code, break_label
            else:
                trend, label = plain_code, plain_label

    return {
        "trend": trend,
        "trend_label": label,
        "trend_transition": transition,
        "trend_overlap_pct": round(curr_overlap * 100, 1),
        "trend_prev_overlap_pct": round(prev_overlap * 100, 1) if prev_overlap else None,
    }
|
|
|
|
|
|
def _normalize_daily_bars(raw: list) -> list:
|
|
out = []
|
|
for row in raw:
|
|
if isinstance(row, list) and len(row) >= 5:
|
|
out.append({
|
|
"d": str(row[0]),
|
|
"o": float(row[1]),
|
|
"h": float(row[2]),
|
|
"l": float(row[3]),
|
|
"c": float(row[4]),
|
|
"v": float(row[5]) if len(row) > 5 and row[5] else 0.0,
|
|
})
|
|
elif isinstance(row, dict) and row.get("d"):
|
|
out.append({
|
|
"d": str(row["d"]),
|
|
"o": float(row.get("o", 0) or 0),
|
|
"h": float(row.get("h", 0) or 0),
|
|
"l": float(row.get("l", 0) or 0),
|
|
"c": float(row.get("c", 0) or 0),
|
|
"v": float(row.get("v", 0) or 0),
|
|
})
|
|
return out
|
|
|
|
|
|
def _fetch_sina_daily_quick(chart_sym: str) -> list:
    """Fetch daily K-lines for ``chart_sym`` from the Sina futures endpoint.

    Best-effort: any failure (network error, non-2xx HTTP status, invalid
    JSON, rows that cannot be normalized) is logged at DEBUG level and an
    empty list is returned, so the caller can fall back to another source.

    Returns the normalized bars, or ``[]`` when nothing usable was obtained.
    """
    url = (
        "https://stock2.finance.sina.com.cn/futures/api/json.php/"
        f"IndexService.getInnerFuturesDailyKLine?symbol={chart_sym}"
    )
    try:
        resp = requests.get(
            url,
            timeout=KLINE_FETCH_TIMEOUT,
            headers={"Referer": "https://finance.sina.com.cn"},
        )
        # Surface HTTP errors explicitly instead of parsing an error page.
        resp.raise_for_status()
        raw = resp.json()
        if raw and isinstance(raw, list):
            bars = _normalize_daily_bars(raw)
            if bars:
                return bars
    except Exception as exc:
        # Deliberately broad: this is an optional fast path.
        logger.debug("quick daily kline failed %s: %s", chart_sym, exc)
    return []
|
|
|
|
|
|
def fetch_week_daily_bars(
    symbol: str,
    *,
    fetch_fn: Callable[[str, str], list] | None = None,
) -> list:
    """Return up to the last ``DAILY_LOOKBACK`` daily bars for ``symbol``.

    When ``fetch_fn`` is given it is the only source and is called as
    ``fetch_fn(symbol, "d")``. Otherwise the symbol is mapped with
    ``ths_to_sina_chart_symbol`` and the quick Sina endpoint is tried first,
    with ``fetch_sina_klines(symbol, "d")`` as the fallback.

    Returns ``[]`` for a blank symbol, an unmappable symbol, an empty
    result, or when the fetch raises (logged at DEBUG level).
    """

    def _tail(rows: list) -> list:
        # Keep only the most recent bars; an empty result stays a fresh [].
        return rows[-DAILY_LOOKBACK:] if rows else []

    sym = (symbol or "").strip()
    if not sym:
        return []

    if fetch_fn:
        try:
            custom_bars = fetch_fn(sym, "d") or []
        except Exception as exc:
            logger.debug("fetch week daily failed %s: %s", sym, exc)
            return []
        return _tail(custom_bars)

    chart_sym = ths_to_sina_chart_symbol(sym)
    if not chart_sym:
        return []

    quick_bars = _fetch_sina_daily_quick(chart_sym)
    if quick_bars:
        return _tail(quick_bars)

    try:
        fallback_bars = fetch_sina_klines(sym, "d") or []
    except Exception as exc:
        logger.debug("fetch week daily fallback failed %s: %s", sym, exc)
        return []
    return _tail(fallback_bars)
|
|
|
|
|
|
def analyze_product_daily(
    symbol: str,
    *,
    fetch_fn: Callable[[str, str], list] | None = None,
) -> dict:
    """Fetch about a week of daily bars for ``symbol`` and analyze them.

    The result merges the trend classification from ``analyze_daily_trend``
    with the gap / volume statistics from ``compute_daily_quote_stats``.
    A blank symbol is analyzed as an empty bar list without fetching.
    """
    sym = (symbol or "").strip()
    bars = fetch_week_daily_bars(sym, fetch_fn=fetch_fn) if sym else []
    return {
        **analyze_daily_trend(bars),
        **compute_daily_quote_stats(bars),
    }
|
|
|
|
|
|
def analyze_product_trend(
    symbol: str,
    *,
    fetch_fn: Callable[[str, str], list] | None = None,
) -> dict:
    """Alias for ``analyze_product_daily``: same arguments, same result."""
    result = analyze_product_daily(symbol, fetch_fn=fetch_fn)
    return result
|
|
|
|
|
|
# Rank looked up from a row's "gap" value; "" stands for a row with no gap data.
GAP_SORT_RANK = {
    "up": 2,
    "down": 1,
    "none": 0,
    "": -1,
}

# Rank looked up from a row's "trend" value. Both breakout codes share rank 0;
# "" (no trend available) gets the largest number.
TREND_SORT_RANK = {
    TREND_BREAK_LONG: 0,
    TREND_BREAK_SHORT: 0,
    TREND_LONG: 1,
    TREND_SHORT: 2,
    TREND_RANGE: 3,
    "": 9,
}
|
|
|
|
|
|
def recommend_sort_key(row: dict, sort_by: str = "trend", *, desc: bool = True) -> tuple:
    """Build the sort key for one recommendation row.

    ``sort_by`` may be ``"gap"``, ``"volume"`` or ``"amplitude"``; any other
    value (including ``"trend"`` and blank) sorts by trend.

    Every branch produces a ``(primary, secondary)`` score in which a larger
    value means the row should come earlier in descending order:
      * gap: ``GAP_SORT_RANK`` of the gap, then absolute ``gap_pct``;
      * volume: ``volume``;
      * amplitude: ``yesterday_amplitude_pct``;
      * trend: the negated ``TREND_SORT_RANK`` (that table is
        "lower is better"), then ``max_lots``.

    With ``desc=True`` the scores are negated so that a plain ascending
    ``sorted()`` puts the highest scores first; for trend that means
    breakouts first, rows without a trend last, and more ``max_lots`` first
    within the same trend. With ``desc=False`` the order is reversed. The
    row ``name`` is always the final ascending tie-breaker.
    """
    key = (sort_by or "trend").strip().lower()
    if key == "gap":
        primary = GAP_SORT_RANK.get(row.get("gap") or "", -1)
        secondary = abs(float(row.get("gap_pct") or 0))
    elif key == "volume":
        primary = float(row.get("volume") or 0)
        secondary = 0.0
    elif key == "amplitude":
        primary = float(row.get("yesterday_amplitude_pct") or 0)
        secondary = 0.0
    else:
        # TREND_SORT_RANK is ascending-priority (0 = best), unlike the other
        # scores; negate it so that the ``desc`` handling below does not
        # invert the trend order.
        primary = -TREND_SORT_RANK.get(row.get("trend") or "", 9)
        secondary = int(row.get("max_lots") or 0)

    name = row.get("name") or ""
    if desc:
        return (-primary, -secondary, name)
    return (primary, secondary, name)
|
|
|
|
|
|
def sort_recommend_rows(
    rows: list[dict],
    *,
    sort_by: str = "trend",
    desc: bool = True,
) -> list[dict]:
    """Return a new list of ``rows`` ordered by ``recommend_sort_key``.

    ``sort_by`` and ``desc`` are forwarded unchanged to the key function;
    the input list itself is not modified.
    """

    def _key(row: dict) -> tuple:
        return recommend_sort_key(row, sort_by, desc=desc)

    return sorted(rows, key=_key)
|
|
|
|
|
|
def trend_sort_key(row: dict) -> tuple:
    """Sort key placing breakouts first, then long/short, then ranging rows.

    The key is ``(trend priority, status priority, -max_lots)``. Unknown
    trends rank after ranging ones, unknown statuses rank after the known
    ones, and a larger ``max_lots`` sorts earlier within the same group.
    """
    trend = (row.get("trend") or "").strip()
    if trend in (TREND_BREAK_LONG, TREND_BREAK_SHORT):
        trend_priority = 0
    elif trend in (TREND_LONG, TREND_SHORT):
        trend_priority = 1
    elif trend == TREND_RANGE:
        trend_priority = 2
    else:
        trend_priority = 3

    status_order = {"ok": 0, "margin_ok": 1, "blocked": 2, "no_price": 3}
    status_priority = status_order.get(row.get("status") or "", 9)

    lots = int(row.get("max_lots") or 0)
    return (trend_priority, status_priority, -lots)
|
|
|
|
|
|
def sort_recommend_by_trend(rows: list[dict]) -> list[dict]:
    """Sort ``rows`` via ``sort_recommend_rows`` with ``sort_by="trend"`` and ``desc=True``."""
    ordered = sort_recommend_rows(rows, sort_by="trend", desc=True)
    return ordered
|