142 lines
5.6 KiB
Python
142 lines
5.6 KiB
Python
"""关键位 5m 硬门控(逻辑自 crypto_monitor_gate _key_hard_checks,使用 Gate K 线)。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from statistics import mean
|
||
|
||
from .candle_rows import field_float
|
||
|
||
|
||
def _row_vol(item: list[str]) -> float:
|
||
v = field_float(item, 5)
|
||
return float(v) if v is not None else 0.0
|
||
|
||
|
||
def key_hard_checks_from_rows(
|
||
rows: list[list[str]],
|
||
*,
|
||
direction: str,
|
||
upper: float,
|
||
lower: float,
|
||
volume_ma_bars: int = 20,
|
||
volume_ratio_min: float = 1.3,
|
||
breakout_amp_min_pct: float = 0.03,
|
||
breakout_amp_max_pct: float = 0.5,
|
||
volume_rank: int | None = None,
|
||
volume_rank_total: int = 0,
|
||
volume_rank_max: int = 30,
|
||
) -> dict:
|
||
"""
|
||
rows:Gate K 线行(时间正序),最后一根应为最近一根已闭合 5m。
|
||
突破 K / 确认 K 固定为 rows[-2] / rows[-1](与 crypto_monitor_gate 一致)。
|
||
"""
|
||
out: dict = {"ok": False}
|
||
vol_lb = max(5, int(volume_ma_bars))
|
||
min_need = vol_lb + 3
|
||
if len(rows) < min_need:
|
||
out["reason"] = f"insufficient_candles have={len(rows)} need>={min_need}"
|
||
return out
|
||
|
||
br_row = rows[-2]
|
||
cf_row = rows[-1]
|
||
open_b = field_float(br_row, 1)
|
||
breakout_high = field_float(br_row, 2)
|
||
breakout_low = field_float(br_row, 3)
|
||
breakout_close = field_float(br_row, 4)
|
||
confirm_close = field_float(cf_row, 4)
|
||
if None in (open_b, breakout_high, breakout_low, breakout_close, confirm_close):
|
||
out["reason"] = "invalid_breakout_or_confirm_ohlc"
|
||
return out
|
||
|
||
open_b = float(open_b)
|
||
breakout_high = float(breakout_high)
|
||
breakout_low = float(breakout_low)
|
||
breakout_close = float(breakout_close)
|
||
confirm_close = float(confirm_close)
|
||
|
||
hist = rows[-vol_lb - 2 : -2]
|
||
prev_vol = [_row_vol(x) for x in hist if _row_vol(x) > 0]
|
||
avg20 = mean(prev_vol) if prev_vol else 0.0
|
||
vol_break = _row_vol(br_row)
|
||
vol_ok = vol_break > avg20 * volume_ratio_min if avg20 > 0 else False
|
||
|
||
direction = (direction or "long").strip().lower()
|
||
edge = float(upper) if direction == "long" else float(lower)
|
||
edge_ref = edge if edge > 0 else 1.0
|
||
|
||
if direction == "long":
|
||
breach_pct = (breakout_close - edge) / edge_ref * 100.0 if breakout_close > edge else 0.0
|
||
breakout_ok = breakout_close > float(upper)
|
||
confirm_ok = confirm_close > edge
|
||
else:
|
||
breach_pct = (edge - breakout_close) / edge_ref * 100.0 if breakout_close < edge else 0.0
|
||
breakout_ok = breakout_close < float(lower)
|
||
confirm_ok = confirm_close < edge
|
||
|
||
body_pct = abs(breakout_close - open_b) / open_b * 100.0 if open_b > 0 else 0.0
|
||
amp_ok = (
|
||
breakout_ok
|
||
and breach_pct > breakout_amp_min_pct
|
||
and breach_pct < breakout_amp_max_pct
|
||
)
|
||
confirm_ok = confirm_ok and breakout_ok
|
||
|
||
rank_ok = (volume_rank is not None) and (int(volume_rank) <= volume_rank_max)
|
||
|
||
try:
|
||
seg = rows[-48:] if len(rows) >= 48 else rows
|
||
hh = max(float(x[2]) for x in seg if field_float(x, 2) is not None)
|
||
ll = min(float(x[3]) for x in seg if field_float(x, 3) is not None)
|
||
swing4h_pct = ((hh - ll) / ll * 100.0) if ll > 0 else 0.0
|
||
except Exception:
|
||
swing4h_pct = 0.0
|
||
|
||
out.update(
|
||
{
|
||
"ok": all([vol_ok, amp_ok, breakout_ok, confirm_ok, rank_ok]),
|
||
"vol_ok": vol_ok,
|
||
"avg20": avg20,
|
||
"vol_break": vol_break,
|
||
"amp_ok": amp_ok,
|
||
"amp_pct": breach_pct,
|
||
"breach_pct": breach_pct,
|
||
"body_pct": body_pct,
|
||
"breakout_ok": breakout_ok,
|
||
"breakout_close": breakout_close,
|
||
"confirm_ok": confirm_ok,
|
||
"confirm_close": confirm_close,
|
||
"edge_price": edge,
|
||
"rank": volume_rank,
|
||
"rank_total": volume_rank_total,
|
||
"rank_ok": rank_ok,
|
||
"breakout_high": breakout_high,
|
||
"breakout_low": breakout_low,
|
||
"breakout_open": open_b,
|
||
"direction": direction,
|
||
"swing4h_pct": swing4h_pct,
|
||
"amp_min_pct": breakout_amp_min_pct,
|
||
"amp_max_pct": breakout_amp_max_pct,
|
||
}
|
||
)
|
||
return out
|
||
|
||
|
||
def key_hard_lines_from_checks(checks: dict, *, volume_ratio_min: float) -> list[str]:
|
||
breach = float(checks.get("breach_pct") if checks.get("breach_pct") is not None else checks.get("amp_pct") or 0)
|
||
body = float(checks.get("body_pct") or 0)
|
||
amp_min = float(checks.get("amp_min_pct") or 0.03)
|
||
amp_max = float(checks.get("amp_max_pct") or 0.5)
|
||
br_hi = checks.get("breakout_high")
|
||
br_lo = checks.get("breakout_low")
|
||
return [
|
||
f"量能:{'通过' if checks.get('vol_ok') else '不通过'}(突破K量 {round(float(checks.get('vol_break') or 0), 4)} / 前20均量 {round(float(checks.get('avg20') or 0), 4)},阈值{volume_ratio_min:g}x)",
|
||
f"突破价位:{'通过' if checks.get('breakout_ok') else '不通过'}(突破K收盘 {round(float(checks.get('breakout_close') or 0), 8)},关键位 {checks.get('edge_price')})",
|
||
(
|
||
f"突破越过关键位:{'通过' if checks.get('amp_ok') else '不通过'}"
|
||
f"(越过 {round(breach, 4)}%,K线实体 {round(body, 4)}%,要求越过 {amp_min:g}%~{amp_max:g}%)"
|
||
),
|
||
f"第二根确认:{'通过' if checks.get('confirm_ok') else '不通过'}(确认收盘 {checks.get('confirm_close')},关键位 {checks.get('edge_price')})",
|
||
f"日成交量排名:{'通过' if checks.get('rank_ok') else '不通过'}({checks.get('rank')}/{checks.get('rank_total')},要求前30)",
|
||
f"突破K极值:高 {br_hi}|低 {br_lo}(空→高点+外扩%|多→低点−外扩%)",
|
||
]
|