diff --git a/crypto_monitor_binance/app.py b/crypto_monitor_binance/app.py index 5ffbb77..63a252c 100644 --- a/crypto_monitor_binance/app.py +++ b/crypto_monitor_binance/app.py @@ -84,6 +84,7 @@ from key_monitor_lib import ( KEY_MONITOR_RS_TYPES, auto_amp_ok, auto_confirm_ok, + claim_rs_level_notify, detect_rs_box_break, format_auto_amp_line, format_auto_confirm_line, @@ -4329,16 +4330,20 @@ def _process_key_rs_level_alert(conn, row): max_n = int(tick["notify_max"]) interval = int(tick["interval_min"]) bar_ts = tick.get("bar_ts") + prior_count = int(tick.get("prior_count", notify_index - 1)) - if tick.get("need_claim_first"): - conn.execute( - "UPDATE key_monitors SET notification_count=1, direction=?, last_notified_at=?, last_rs_bar_ts=? " - "WHERE id=? AND COALESCE(notification_count,0)=0", - (br["direction"], app_now_str(), bar_ts, row["id"]), - ) - if conn.total_changes == 0: - return - conn.commit() + notified_at = app_now_str() + if not claim_rs_level_notify( + conn, + row["id"], + notify_index, + br["direction"], + notified_at, + bar_ts, + prior_count=prior_count, + ): + return + conn.commit() trigger_time = ms_to_app_local_str(int(ts)) if ts else app_now_str() msg = build_wechat_rs_level_message( @@ -4358,9 +4363,8 @@ def _process_key_rs_level_alert(conn, row): ) send_wechat_msg(msg) conn.execute( - "UPDATE key_monitors SET direction=?, notification_count=?, last_notified_at=?, " - "last_alert_message=?, last_rs_bar_ts=? WHERE id=?", - (br["direction"], notify_index, app_now_str(), msg, bar_ts, row["id"]), + "UPDATE key_monitors SET last_alert_message=? WHERE id=?", + (msg, row["id"]), ) conn.commit() if notify_index >= max_n: diff --git a/crypto_monitor_gate/app.py b/crypto_monitor_gate/app.py index 99855f4..7990586 100644 --- a/crypto_monitor_gate/app.py +++ b/crypto_monitor_gate/app.py @@ -85,6 +85,7 @@ from key_monitor_lib import ( KEY_MONITOR_RS_TYPES, auto_amp_ok, auto_confirm_ok, + claim_rs_level_notify, detect_rs_box_break, format_auto_amp_line, format_auto_confirm_line, @@ -4287,16 +4288,20 @@ def _process_key_rs_level_alert(conn, row): max_n = int(tick["notify_max"]) interval = int(tick["interval_min"]) bar_ts = tick.get("bar_ts") + prior_count = int(tick.get("prior_count", notify_index - 1)) - if tick.get("need_claim_first"): - conn.execute( - "UPDATE key_monitors SET notification_count=1, direction=?, last_notified_at=?, last_rs_bar_ts=? " - "WHERE id=? AND COALESCE(notification_count,0)=0", - (br["direction"], app_now_str(), bar_ts, row["id"]), - ) - if conn.total_changes == 0: - return - conn.commit() + notified_at = app_now_str() + if not claim_rs_level_notify( + conn, + row["id"], + notify_index, + br["direction"], + notified_at, + bar_ts, + prior_count=prior_count, + ): + return + conn.commit() trigger_time = ms_to_app_local_str(int(ts)) if ts else app_now_str() msg = build_wechat_rs_level_message( @@ -4316,9 +4321,8 @@ def _process_key_rs_level_alert(conn, row): ) send_wechat_msg(msg) conn.execute( - "UPDATE key_monitors SET direction=?, notification_count=?, last_notified_at=?, " - "last_alert_message=?, last_rs_bar_ts=? WHERE id=?", - (br["direction"], notify_index, app_now_str(), msg, bar_ts, row["id"]), + "UPDATE key_monitors SET last_alert_message=? WHERE id=?", + (msg, row["id"]), ) conn.commit() if notify_index >= max_n: diff --git a/crypto_monitor_okx/app.py b/crypto_monitor_okx/app.py index 613d8b9..029522c 100644 --- a/crypto_monitor_okx/app.py +++ b/crypto_monitor_okx/app.py @@ -85,6 +85,7 @@ from key_monitor_lib import ( KEY_MONITOR_RS_TYPES, auto_amp_ok, auto_confirm_ok, + claim_rs_level_notify, detect_rs_box_break, format_auto_amp_line, format_auto_confirm_line, @@ -4115,16 +4116,20 @@ def _process_key_rs_level_alert(conn, row): max_n = int(tick["notify_max"]) interval = int(tick["interval_min"]) bar_ts = tick.get("bar_ts") + prior_count = int(tick.get("prior_count", notify_index - 1)) - if tick.get("need_claim_first"): - conn.execute( - "UPDATE key_monitors SET notification_count=1, direction=?, last_notified_at=?, last_rs_bar_ts=? " - "WHERE id=? AND COALESCE(notification_count,0)=0", - (br["direction"], app_now_str(), bar_ts, row["id"]), - ) - if conn.total_changes == 0: - return - conn.commit() + notified_at = app_now_str() + if not claim_rs_level_notify( + conn, + row["id"], + notify_index, + br["direction"], + notified_at, + bar_ts, + prior_count=prior_count, + ): + return + conn.commit() trigger_time = ms_to_app_local_str(int(ts)) if ts else app_now_str() msg = build_wechat_rs_level_message( @@ -4145,9 +4150,8 @@ def _process_key_rs_level_alert(conn, row): ) send_wechat_msg(msg) conn.execute( - "UPDATE key_monitors SET direction=?, notification_count=?, last_notified_at=?, " - "last_alert_message=?, last_rs_bar_ts=? WHERE id=?", - (br["direction"], notify_index, app_now_str(), msg, bar_ts, row["id"]), + "UPDATE key_monitors SET last_alert_message=? WHERE id=?", + (msg, row["id"]), ) conn.commit() if notify_index >= max_n: diff --git a/key_monitor_lib.py b/key_monitor_lib.py index 600edee..2c400ef 100644 --- a/key_monitor_lib.py +++ b/key_monitor_lib.py @@ -115,6 +115,56 @@ def rs_break_infer_from_close(close: float, upper: float, lower: float) -> dict[ } +def _parse_notify_datetime(raw: Optional[str]) -> Optional[datetime]: + s = str(raw or "").strip() + if not s: + return None + try: + dt = datetime.fromisoformat(s.replace("Z", "+00:00")) + if dt.tzinfo is not None: + dt = dt.replace(tzinfo=None) + return dt + except Exception: + pass + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"): + try: + return datetime.strptime(s[:19], fmt) + except Exception: + continue + return None + + +def claim_rs_level_notify( + conn: Any, + monitor_id: int, + notify_index: int, + direction: str, + notified_at: str, + bar_ts: Optional[int], + *, + prior_count: Optional[int] = None, +) -> bool: + """ + 原子占位:仅在 notification_count 仍为 prior_count 时推进到 notify_index。 + 须在发送企业微信之前调用并 commit,避免 (2/3) 重复刷屏。 + """ + prior = int(prior_count if prior_count is not None else notify_index - 1) + if prior < 0 or notify_index != prior + 1: + return False + bar_val: Optional[int] = None + if bar_ts is not None: + try: + bar_val = int(bar_ts) + except (TypeError, ValueError): + bar_val = None + cur = conn.execute( + "UPDATE key_monitors SET notification_count=?, direction=?, last_notified_at=?, last_rs_bar_ts=? " + "WHERE id=? AND COALESCE(notification_count,0)=?", + (notify_index, direction, notified_at, bar_val, int(monitor_id), prior), + ) + return int(cur.rowcount or 0) > 0 + + def parse_last_rs_bar_ts(row: Any) -> Optional[int]: if row is None: return None @@ -142,7 +192,7 @@ def run_rs_level_alert_tick( ) -> Optional[dict[str, Any]]: """ 判定本轮回合是否应推送阻力/支撑提醒。 - 首条:仅在新 5m 闭合 K 越线时触发,并 need_claim_first 防 3 秒轮询刷屏。 + 首条:仅在新闭合 K 越线时触发;发送前须 claim_rs_level_notify 占位防轮询/多进程重复。 """ up, lo = float(row["upper"]), float(row["lower"]) if up <= lo: @@ -170,10 +220,10 @@ def run_rs_level_alert_tick( return { "break_info": br, "notify_index": 1, + "prior_count": 0, "notify_max": max_n, "interval_min": interval, "bar_ts": bar_ts_i, - "need_claim_first": True, } if not notify_interval_elapsed(row["last_notified_at"], interval, now_dt): @@ -184,10 +234,10 @@ def run_rs_level_alert_tick( return { "break_info": br, "notify_index": count + 1, + "prior_count": count, "notify_max": max_n, "interval_min": interval, "bar_ts": bar_ts_i, - "need_claim_first": False, } @@ -223,13 +273,10 @@ def notify_interval_elapsed( now_dt: datetime, ) -> bool: if not last_notified_at: - return True - try: - last_dt = datetime.fromisoformat(str(last_notified_at).replace("Z", "+00:00")) - if last_dt.tzinfo is not None: - last_dt = last_dt.replace(tzinfo=None) - except Exception: - return True + return False + last_dt = _parse_notify_datetime(last_notified_at) + if last_dt is None: + return False return (now_dt - last_dt).total_seconds() >= max(1, int(interval_min)) * 60 diff --git a/tests/test_key_monitor_rs_alert.py b/tests/test_key_monitor_rs_alert.py new file mode 100644 index 0000000..d2ac828 --- /dev/null +++ b/tests/test_key_monitor_rs_alert.py @@ -0,0 +1,86 @@ +"""阻力/支撑提醒:占位与间隔防重复推送。""" +from __future__ import annotations + +import sqlite3 +import unittest +from datetime import datetime, timedelta + +from key_monitor_lib import ( + claim_rs_level_notify, + notify_interval_elapsed, + run_rs_level_alert_tick, +) + + +def _row(**kwargs): + base = { + "upper": 2.174, + "lower": 1.694, + "notification_count": 0, + "max_notify": 3, + "notify_interval_min": 5, + "direction": "watch", + "last_notified_at": None, + "last_rs_bar_ts": None, + } + base.update(kwargs) + return base + + +class TestRsLevelAlertClaim(unittest.TestCase): + def setUp(self): + self.conn = sqlite3.connect(":memory:") + self.conn.execute( + "CREATE TABLE key_monitors (" + "id INTEGER PRIMARY KEY, notification_count INTEGER DEFAULT 0, " + "direction TEXT, last_notified_at TEXT, last_rs_bar_ts INTEGER)" + ) + self.conn.execute( + "INSERT INTO key_monitors (id, notification_count, direction) VALUES (1, 0, 'watch')" + ) + self.conn.commit() + + def test_claim_advances_once_per_index(self): + ok1 = claim_rs_level_notify( + self.conn, 1, 1, "long", "2026-06-02 00:25:00", 1000, prior_count=0 + ) + self.conn.commit() + self.assertTrue(ok1) + ok_dup = claim_rs_level_notify( + self.conn, 1, 1, "long", "2026-06-02 00:25:03", 1000, prior_count=0 + ) + self.assertFalse(ok_dup) + ok2 = claim_rs_level_notify( + self.conn, 1, 2, "long", "2026-06-02 00:30:00", 1000, prior_count=1 + ) + self.conn.commit() + self.assertTrue(ok2) + row = self.conn.execute( + "SELECT notification_count FROM key_monitors WHERE id=1" + ).fetchone() + self.assertEqual(row[0], 2) + + def test_second_push_requires_interval(self): + now = datetime(2026, 6, 2, 0, 26, 0) + row = _row( + notification_count=1, + direction="long", + last_notified_at="2026-06-02 00:25:00", + ) + tick = run_rs_level_alert_tick(row, 2.18, 1000, now, default_max_notify=3, default_interval_min=5) + self.assertIsNone(tick) + later = datetime(2026, 6, 2, 0, 30, 1) + tick2 = run_rs_level_alert_tick( + row, 2.18, 1000, later, default_max_notify=3, default_interval_min=5 + ) + self.assertIsNotNone(tick2) + self.assertEqual(tick2["notify_index"], 2) + self.assertEqual(tick2["prior_count"], 1) + + def test_notify_interval_invalid_timestamp_does_not_spam(self): + now = datetime(2026, 6, 2, 1, 0, 0) + self.assertFalse(notify_interval_elapsed("not-a-date", 5, now)) + + +if __name__ == "__main__": + unittest.main()