Add daily trend status to product recommendations with breakout priority
Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -0,0 +1,158 @@
|
||||
"""品种推荐:近一周日线走势(多头 / 空头 / 震荡 / 转多 / 转空)。"""
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from typing import Callable, Optional
|
||||
|
||||
from kline_chart import fetch_sina_klines
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
DAILY_LOOKBACK = 7
|
||||
OVERLAP_WINDOW = 3
|
||||
OVERLAP_RANGE_THRESHOLD = 0.70
|
||||
|
||||
TREND_LONG = "long"
|
||||
TREND_SHORT = "short"
|
||||
TREND_RANGE = "range"
|
||||
TREND_BREAK_LONG = "break_long"
|
||||
TREND_BREAK_SHORT = "break_short"
|
||||
|
||||
|
||||
def _bar_ohlc(bar: dict) -> tuple[float, float, float, float]:
|
||||
o = float(bar.get("o") or bar.get("open") or 0)
|
||||
h = float(bar.get("h") or bar.get("high") or o)
|
||||
l = float(bar.get("l") or bar.get("low") or o)
|
||||
c = float(bar.get("c") or bar.get("close") or o)
|
||||
return o, h, l, c
|
||||
|
||||
|
||||
def kline_overlap_ratio(bars: list) -> float:
|
||||
"""三根 K 线高低价区间的重叠度 = 交集 / 并集(0~1)。"""
|
||||
if len(bars) < OVERLAP_WINDOW:
|
||||
return 0.0
|
||||
chunk = bars[-OVERLAP_WINDOW:]
|
||||
lows, highs = [], []
|
||||
for bar in chunk:
|
||||
_, h, l, _ = _bar_ohlc(bar)
|
||||
if h <= 0 and l <= 0:
|
||||
continue
|
||||
lows.append(l)
|
||||
highs.append(h)
|
||||
if len(lows) < OVERLAP_WINDOW:
|
||||
return 0.0
|
||||
overlap = max(0.0, min(highs) - max(lows))
|
||||
union = max(highs) - min(lows)
|
||||
if union <= 0:
|
||||
return 1.0 if overlap > 0 else 0.0
|
||||
return overlap / union
|
||||
|
||||
|
||||
def _direction_from_closes(bars: list) -> str:
|
||||
if len(bars) < 2:
|
||||
return TREND_RANGE
|
||||
closes = [_bar_ohlc(b)[3] for b in bars if _bar_ohlc(b)[3] > 0]
|
||||
if len(closes) < 2:
|
||||
return TREND_RANGE
|
||||
if closes[-1] > closes[0]:
|
||||
return TREND_LONG
|
||||
if closes[-1] < closes[0]:
|
||||
return TREND_SHORT
|
||||
return TREND_RANGE
|
||||
|
||||
|
||||
def analyze_daily_trend(bars: list, *, overlap_threshold: float = OVERLAP_RANGE_THRESHOLD) -> dict:
|
||||
"""根据近一周日线判断走势;最近三天重叠度≥阈值视为震荡。"""
|
||||
empty = {
|
||||
"trend": "",
|
||||
"trend_label": "—",
|
||||
"trend_transition": False,
|
||||
"trend_overlap_pct": None,
|
||||
"trend_prev_overlap_pct": None,
|
||||
}
|
||||
if len(bars) < OVERLAP_WINDOW:
|
||||
return empty
|
||||
|
||||
recent = bars[-DAILY_LOOKBACK:] if len(bars) > DAILY_LOOKBACK else bars
|
||||
curr_overlap = kline_overlap_ratio(recent)
|
||||
prev_overlap = kline_overlap_ratio(recent[:-OVERLAP_WINDOW]) if len(recent) >= OVERLAP_WINDOW * 2 else 0.0
|
||||
|
||||
curr_range = curr_overlap >= overlap_threshold
|
||||
prev_range = prev_overlap >= overlap_threshold
|
||||
|
||||
if curr_range:
|
||||
trend, label = TREND_RANGE, "震荡"
|
||||
transition = False
|
||||
else:
|
||||
direction = _direction_from_closes(recent[-OVERLAP_WINDOW:])
|
||||
if direction == TREND_LONG:
|
||||
trend, label = TREND_LONG, "多头"
|
||||
elif direction == TREND_SHORT:
|
||||
trend, label = TREND_SHORT, "空头"
|
||||
else:
|
||||
trend, label = TREND_RANGE, "震荡"
|
||||
transition = prev_range and trend in (TREND_LONG, TREND_SHORT)
|
||||
if transition:
|
||||
if trend == TREND_LONG:
|
||||
trend, label = TREND_BREAK_LONG, "转多"
|
||||
else:
|
||||
trend, label = TREND_BREAK_SHORT, "转空"
|
||||
|
||||
return {
|
||||
"trend": trend,
|
||||
"trend_label": label,
|
||||
"trend_transition": transition,
|
||||
"trend_overlap_pct": round(curr_overlap * 100, 1),
|
||||
"trend_prev_overlap_pct": round(prev_overlap * 100, 1) if prev_overlap else None,
|
||||
}
|
||||
|
||||
|
||||
def fetch_week_daily_bars(
|
||||
symbol: str,
|
||||
*,
|
||||
fetch_fn: Callable[[str, str], list] | None = None,
|
||||
) -> list:
|
||||
fn = fetch_fn or fetch_sina_klines
|
||||
try:
|
||||
bars = fn(symbol, "d") or []
|
||||
except Exception as exc:
|
||||
logger.debug("fetch week daily failed %s: %s", symbol, exc)
|
||||
return []
|
||||
if not bars:
|
||||
return []
|
||||
return bars[-DAILY_LOOKBACK:]
|
||||
|
||||
|
||||
def analyze_product_trend(
|
||||
symbol: str,
|
||||
*,
|
||||
fetch_fn: Callable[[str, str], list] | None = None,
|
||||
) -> dict:
|
||||
"""拉取主力合约一周日线并分析走势。"""
|
||||
sym = (symbol or "").strip()
|
||||
if not sym:
|
||||
return analyze_daily_trend([])
|
||||
bars = fetch_week_daily_bars(sym, fetch_fn=fetch_fn)
|
||||
return analyze_daily_trend(bars)
|
||||
|
||||
|
||||
def trend_sort_key(row: dict) -> tuple:
|
||||
"""转多/转空优先,其次多头/空头,震荡靠后。"""
|
||||
trend = (row.get("trend") or "").strip()
|
||||
priority = {
|
||||
TREND_BREAK_LONG: 0,
|
||||
TREND_BREAK_SHORT: 0,
|
||||
TREND_LONG: 1,
|
||||
TREND_SHORT: 1,
|
||||
TREND_RANGE: 2,
|
||||
}
|
||||
status_order = {"ok": 0, "margin_ok": 1, "blocked": 2, "no_price": 3}
|
||||
return (
|
||||
priority.get(trend, 3),
|
||||
status_order.get(row.get("status") or "", 9),
|
||||
-(int(row.get("max_lots") or 0)),
|
||||
)
|
||||
|
||||
|
||||
def sort_recommend_by_trend(rows: list[dict]) -> list[dict]:
|
||||
return sorted(rows, key=trend_sort_key)
|
||||
Reference in New Issue
Block a user