@@ -4,7 +4,12 @@ from __future__ import annotations
from statistics import mean
from . candle_rows import rows_to_ohlcv
from . candle_rows import field_float
def _row_vol ( item : list [ str ] ) - > float :
v = field_float ( item , 5 )
return float ( v ) if v is not None else 0.0
def key_hard_checks_from_rows (
@@ -21,45 +26,67 @@ def key_hard_checks_from_rows(
volume_rank_total : int = 0 ,
volume_rank_max : int = 30 ,
) - > dict :
""" rows:Gate K 线行,最后一根应为最近一根已闭合 5m(调用方负责剔除未闭合)。 """
"""
rows:Gate K 线行(时间正序),最后一根应为最近一根已闭合 5m。
突破 K / 确认 K 固定为 rows[-2] / rows[-1](与 crypto_monitor_gate 一致)。
"""
out : dict = { " ok " : False }
_ , ah , al , ac , av = rows_to_ohlcv ( rows )
vol_lb = max ( 5 , int ( volume_ma_bars ) )
min_need = vol_lb + 3
if len ( ac ) < min_need :
out [ " reason " ] = f " insufficient_candles have= { len ( ac ) } need>= { min_need } "
if len ( rows ) < min_need :
out [ " reason " ] = f " insufficient_candles have= { len ( rows ) } need>= { min_need } "
return out
breakout_close = float ( ac [ - 2 ] )
confirm_close = float ( ac [ - 1 ] )
breakout_high = float ( ah [ - 2 ] )
breakout_low = float ( al [ - 2 ] )
try :
open_b = float ( rows [ - 2 ] [ 1 ] )
except ( IndexError , TypeError , ValueError ) :
open_b = breakout_close
br_row = rows [ - 2 ]
cf_row = rows [ - 1 ]
open_b = field_float ( br_row , 1 )
breakout_high = field_float ( br_row , 2 )
breakout_low = field_float ( br_row , 3 )
breakout_close = field_float ( br_row , 4 )
confirm_close = field_float ( cf_row , 4 )
if None in ( open_b , breakout_high , breakout_low , breakout_close , confirm_close ) :
out [ " reason " ] = " invalid_breakout_or_confirm_ohlc "
return out
prev_vol = av [ - vol_lb - 2 : - 2 ]
open_b = float ( open_b )
breakout_high = float ( breakout_high )
breakout_low = float ( breakout_low )
breakout_close = float ( breakout_close )
confirm_close = float ( confirm_close )
hist = rows [ - vol_lb - 2 : - 2 ]
prev_vol = [ _row_vol ( x ) for x in hist if _row_vol ( x ) > 0 ]
avg20 = mean ( prev_vol ) if prev_vol else 0.0
vol_break = float ( av [ - 2 ] )
vol_break = _row_vol ( br_row )
vol_ok = vol_break > avg20 * volume_ratio_min if avg20 > 0 else False
amp_pct = abs ( breakout_close - open_b ) / open_b * 100 if open_b > 0 else 0.0
amp_ok = ( amp_pct > breakout_amp_min_pct ) and ( amp_pct < breakout_amp_max_pct )
direction = ( direction or " long " ) . strip ( ) . lower ( )
edge = float ( upper ) if direction == " long " else float ( lower )
breakout_ok = ( breakout_close > float ( upper ) ) if direction == " long " else ( breakout_close < float ( lower ) )
confirm_ok_raw = ( confirm_close > edge ) if direction == " long " else ( confirm_close < edge )
amp_ok = amp_ok and breakout_ok
confirm_ok = confirm_ok_raw and breakout_ok
edge_ref = edge if edge > 0 else 1.0
if direction == " long " :
breach_pct = ( breakout_close - edge ) / edge_ref * 100.0 if breakout_close > edge else 0.0
breakout_ok = breakout_close > float ( upper )
confirm_ok = confirm_close > edge
else :
breach_pct = ( edge - breakout_close ) / edge_ref * 100.0 if breakout_close < edge else 0.0
breakout_ok = breakout_close < float ( lower )
confirm_ok = confirm_close < edge
body_pct = abs ( breakout_close - open_b ) / open_b * 100.0 if open_b > 0 else 0.0
amp_ok = (
breakout_ok
and breach_pct > breakout_amp_min_pct
and breach_pct < breakout_amp_max_pct
)
confirm_ok = confirm_ok and breakout_ok
rank_ok = ( volume_rank is not None ) and ( int ( volume_rank ) < = volume_rank_max )
try :
seg = rows [ - 48 : ] if len ( rows ) > = 48 else rows
hh = max ( float ( x [ 2 ] ) for x in seg )
ll = min ( float ( x [ 3 ] ) for x in seg )
hh = max ( float ( x [ 2 ] ) for x in seg if field_float ( x , 2 ) is not None )
ll = min ( float ( x [ 3 ] ) for x in seg if field_float ( x , 3 ) is not None )
swing4h_pct = ( ( hh - ll ) / ll * 100.0 ) if ll > 0 else 0.0
except Exception :
swing4h_pct = 0.0
@@ -71,7 +98,9 @@ def key_hard_checks_from_rows(
" avg20 " : avg20 ,
" vol_break " : vol_break ,
" amp_ok " : amp_ok ,
" amp_pct " : amp_pct ,
" amp_pct " : breach_pct ,
" breach_pct " : breach_pct ,
" body_pct " : body_pct ,
" breakout_ok " : breakout_ok ,
" breakout_close " : breakout_close ,
" confirm_ok " : confirm_ok ,
@@ -82,18 +111,27 @@ def key_hard_checks_from_rows(
" rank_ok " : rank_ok ,
" breakout_high " : breakout_high ,
" breakout_low " : breakout_low ,
" swing4h_pct " : swing4h_pct ,
" breakout_open " : open_b ,
" direction " : direction ,
" swing4h_pct " : swing4h_pct ,
}
)
return out
def key_hard_lines_from_checks ( checks : dict , * , volume_ratio_min : float ) - > list [ str ] :
breach = float ( checks . get ( " breach_pct " ) if checks . get ( " breach_pct " ) is not None else checks . get ( " amp_pct " ) or 0 )
body = float ( checks . get ( " body_pct " ) or 0 )
br_hi = checks . get ( " breakout_high " )
br_lo = checks . get ( " breakout_low " )
return [
f " 量能: { ' 通过 ' if checks . get ( ' vol_ok ' ) else ' 不通过 ' } (突破K量 { round ( float ( checks . get ( ' vol_break ' ) or 0 ) , 4 ) } / 前20均量 { round ( float ( checks . get ( ' avg20 ' ) or 0 ) , 4 ) } ,阈值 { volume_ratio_min : g } x) " ,
f " 突破价位: { ' 通过 ' if checks . get ( ' breakout_ok ' ) else ' 不通过 ' } (突破K收盘 { round ( float ( checks . get ( ' breakout_close ' ) or 0 ) , 8 ) } ,关键位 { checks . get ( ' edge_price ' ) } ) " ,
f " 突破K幅度: { ' 通过 ' if checks . get ( ' amp_ok ' ) else ' 不通过 ' } ( { round ( float ( checks . get ( ' amp_pct ' ) or 0 ) , 4 ) } %,要求0.03%~0.5%) " ,
(
f " 突破越过关键位: { ' 通过 ' if checks . get ( ' amp_ok ' ) else ' 不通过 ' } "
f " (越过 { round ( breach , 4 ) } %, K线实体 { round ( body , 4 ) } %,要求越过 0.03%~0.5%) "
) ,
f " 第二根确认: { ' 通过 ' if checks . get ( ' confirm_ok ' ) else ' 不通过 ' } (确认收盘 { checks . get ( ' confirm_close ' ) } ,关键位 { checks . get ( ' edge_price ' ) } ) " ,
f " 日成交量排名: { ' 通过 ' if checks . get ( ' rank_ok ' ) else ' 不通过 ' } ( { checks . get ( ' rank ' ) } / { checks . get ( ' rank_total ' ) } ,要求前30) " ,
f " 突破K极值:高 { br_hi } |低 { br_lo } (止损据此 ± 外扩%) " ,
]