修复支撑阻力企业微信重复

This commit is contained in:
dekun
2026-06-02 06:55:57 +08:00
parent 9115523df7
commit ef99fb6c2e
5 changed files with 191 additions and 46 deletions
+14 -10
View File
@@ -84,6 +84,7 @@ from key_monitor_lib import (
KEY_MONITOR_RS_TYPES,
auto_amp_ok,
auto_confirm_ok,
claim_rs_level_notify,
detect_rs_box_break,
format_auto_amp_line,
format_auto_confirm_line,
@@ -4329,14 +4330,18 @@ def _process_key_rs_level_alert(conn, row):
max_n = int(tick["notify_max"])
interval = int(tick["interval_min"])
bar_ts = tick.get("bar_ts")
prior_count = int(tick.get("prior_count", notify_index - 1))
if tick.get("need_claim_first"):
conn.execute(
"UPDATE key_monitors SET notification_count=1, direction=?, last_notified_at=?, last_rs_bar_ts=? "
"WHERE id=? AND COALESCE(notification_count,0)=0",
(br["direction"], app_now_str(), bar_ts, row["id"]),
)
if conn.total_changes == 0:
notified_at = app_now_str()
if not claim_rs_level_notify(
conn,
row["id"],
notify_index,
br["direction"],
notified_at,
bar_ts,
prior_count=prior_count,
):
return
conn.commit()
@@ -4358,9 +4363,8 @@ def _process_key_rs_level_alert(conn, row):
)
send_wechat_msg(msg)
conn.execute(
"UPDATE key_monitors SET direction=?, notification_count=?, last_notified_at=?, "
"last_alert_message=?, last_rs_bar_ts=? WHERE id=?",
(br["direction"], notify_index, app_now_str(), msg, bar_ts, row["id"]),
"UPDATE key_monitors SET last_alert_message=? WHERE id=?",
(msg, row["id"]),
)
conn.commit()
if notify_index >= max_n:
+14 -10
View File
@@ -85,6 +85,7 @@ from key_monitor_lib import (
KEY_MONITOR_RS_TYPES,
auto_amp_ok,
auto_confirm_ok,
claim_rs_level_notify,
detect_rs_box_break,
format_auto_amp_line,
format_auto_confirm_line,
@@ -4287,14 +4288,18 @@ def _process_key_rs_level_alert(conn, row):
max_n = int(tick["notify_max"])
interval = int(tick["interval_min"])
bar_ts = tick.get("bar_ts")
prior_count = int(tick.get("prior_count", notify_index - 1))
if tick.get("need_claim_first"):
conn.execute(
"UPDATE key_monitors SET notification_count=1, direction=?, last_notified_at=?, last_rs_bar_ts=? "
"WHERE id=? AND COALESCE(notification_count,0)=0",
(br["direction"], app_now_str(), bar_ts, row["id"]),
)
if conn.total_changes == 0:
notified_at = app_now_str()
if not claim_rs_level_notify(
conn,
row["id"],
notify_index,
br["direction"],
notified_at,
bar_ts,
prior_count=prior_count,
):
return
conn.commit()
@@ -4316,9 +4321,8 @@ def _process_key_rs_level_alert(conn, row):
)
send_wechat_msg(msg)
conn.execute(
"UPDATE key_monitors SET direction=?, notification_count=?, last_notified_at=?, "
"last_alert_message=?, last_rs_bar_ts=? WHERE id=?",
(br["direction"], notify_index, app_now_str(), msg, bar_ts, row["id"]),
"UPDATE key_monitors SET last_alert_message=? WHERE id=?",
(msg, row["id"]),
)
conn.commit()
if notify_index >= max_n:
+14 -10
View File
@@ -85,6 +85,7 @@ from key_monitor_lib import (
KEY_MONITOR_RS_TYPES,
auto_amp_ok,
auto_confirm_ok,
claim_rs_level_notify,
detect_rs_box_break,
format_auto_amp_line,
format_auto_confirm_line,
@@ -4115,14 +4116,18 @@ def _process_key_rs_level_alert(conn, row):
max_n = int(tick["notify_max"])
interval = int(tick["interval_min"])
bar_ts = tick.get("bar_ts")
prior_count = int(tick.get("prior_count", notify_index - 1))
if tick.get("need_claim_first"):
conn.execute(
"UPDATE key_monitors SET notification_count=1, direction=?, last_notified_at=?, last_rs_bar_ts=? "
"WHERE id=? AND COALESCE(notification_count,0)=0",
(br["direction"], app_now_str(), bar_ts, row["id"]),
)
if conn.total_changes == 0:
notified_at = app_now_str()
if not claim_rs_level_notify(
conn,
row["id"],
notify_index,
br["direction"],
notified_at,
bar_ts,
prior_count=prior_count,
):
return
conn.commit()
@@ -4145,9 +4150,8 @@ def _process_key_rs_level_alert(conn, row):
)
send_wechat_msg(msg)
conn.execute(
"UPDATE key_monitors SET direction=?, notification_count=?, last_notified_at=?, "
"last_alert_message=?, last_rs_bar_ts=? WHERE id=?",
(br["direction"], notify_index, app_now_str(), msg, bar_ts, row["id"]),
"UPDATE key_monitors SET last_alert_message=? WHERE id=?",
(msg, row["id"]),
)
conn.commit()
if notify_index >= max_n:
+57 -10
View File
@@ -115,6 +115,56 @@ def rs_break_infer_from_close(close: float, upper: float, lower: float) -> dict[
}
def _parse_notify_datetime(raw: Optional[str]) -> Optional[datetime]:
s = str(raw or "").strip()
if not s:
return None
try:
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
if dt.tzinfo is not None:
dt = dt.replace(tzinfo=None)
return dt
except Exception:
pass
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S"):
try:
return datetime.strptime(s[:19], fmt)
except Exception:
continue
return None
def claim_rs_level_notify(
conn: Any,
monitor_id: int,
notify_index: int,
direction: str,
notified_at: str,
bar_ts: Optional[int],
*,
prior_count: Optional[int] = None,
) -> bool:
"""
原子占位:仅在 notification_count 仍为 prior_count 时推进到 notify_index。
须在发送企业微信之前调用并 commit,避免 (2/3) 重复刷屏。
"""
prior = int(prior_count if prior_count is not None else notify_index - 1)
if prior < 0 or notify_index != prior + 1:
return False
bar_val: Optional[int] = None
if bar_ts is not None:
try:
bar_val = int(bar_ts)
except (TypeError, ValueError):
bar_val = None
cur = conn.execute(
"UPDATE key_monitors SET notification_count=?, direction=?, last_notified_at=?, last_rs_bar_ts=? "
"WHERE id=? AND COALESCE(notification_count,0)=?",
(notify_index, direction, notified_at, bar_val, int(monitor_id), prior),
)
return int(cur.rowcount or 0) > 0
def parse_last_rs_bar_ts(row: Any) -> Optional[int]:
if row is None:
return None
@@ -142,7 +192,7 @@ def run_rs_level_alert_tick(
) -> Optional[dict[str, Any]]:
"""
判定本轮回合是否应推送阻力/支撑提醒。
首条:仅在新 5m 闭合 K 越线时触发,并 need_claim_first 防 3 秒轮询刷屏
首条:仅在新闭合 K 越线时触发;发送前须 claim_rs_level_notify 占位防轮询/多进程重复
"""
up, lo = float(row["upper"]), float(row["lower"])
if up <= lo:
@@ -170,10 +220,10 @@ def run_rs_level_alert_tick(
return {
"break_info": br,
"notify_index": 1,
"prior_count": 0,
"notify_max": max_n,
"interval_min": interval,
"bar_ts": bar_ts_i,
"need_claim_first": True,
}
if not notify_interval_elapsed(row["last_notified_at"], interval, now_dt):
@@ -184,10 +234,10 @@ def run_rs_level_alert_tick(
return {
"break_info": br,
"notify_index": count + 1,
"prior_count": count,
"notify_max": max_n,
"interval_min": interval,
"bar_ts": bar_ts_i,
"need_claim_first": False,
}
@@ -223,13 +273,10 @@ def notify_interval_elapsed(
now_dt: datetime,
) -> bool:
if not last_notified_at:
return True
try:
last_dt = datetime.fromisoformat(str(last_notified_at).replace("Z", "+00:00"))
if last_dt.tzinfo is not None:
last_dt = last_dt.replace(tzinfo=None)
except Exception:
return True
return False
last_dt = _parse_notify_datetime(last_notified_at)
if last_dt is None:
return False
return (now_dt - last_dt).total_seconds() >= max(1, int(interval_min)) * 60
+86
View File
@@ -0,0 +1,86 @@
"""阻力/支撑提醒:占位与间隔防重复推送。"""
from __future__ import annotations
import sqlite3
import unittest
from datetime import datetime, timedelta
from key_monitor_lib import (
claim_rs_level_notify,
notify_interval_elapsed,
run_rs_level_alert_tick,
)
def _row(**kwargs):
base = {
"upper": 2.174,
"lower": 1.694,
"notification_count": 0,
"max_notify": 3,
"notify_interval_min": 5,
"direction": "watch",
"last_notified_at": None,
"last_rs_bar_ts": None,
}
base.update(kwargs)
return base
class TestRsLevelAlertClaim(unittest.TestCase):
def setUp(self):
self.conn = sqlite3.connect(":memory:")
self.conn.execute(
"CREATE TABLE key_monitors ("
"id INTEGER PRIMARY KEY, notification_count INTEGER DEFAULT 0, "
"direction TEXT, last_notified_at TEXT, last_rs_bar_ts INTEGER)"
)
self.conn.execute(
"INSERT INTO key_monitors (id, notification_count, direction) VALUES (1, 0, 'watch')"
)
self.conn.commit()
def test_claim_advances_once_per_index(self):
ok1 = claim_rs_level_notify(
self.conn, 1, 1, "long", "2026-06-02 00:25:00", 1000, prior_count=0
)
self.conn.commit()
self.assertTrue(ok1)
ok_dup = claim_rs_level_notify(
self.conn, 1, 1, "long", "2026-06-02 00:25:03", 1000, prior_count=0
)
self.assertFalse(ok_dup)
ok2 = claim_rs_level_notify(
self.conn, 1, 2, "long", "2026-06-02 00:30:00", 1000, prior_count=1
)
self.conn.commit()
self.assertTrue(ok2)
row = self.conn.execute(
"SELECT notification_count FROM key_monitors WHERE id=1"
).fetchone()
self.assertEqual(row[0], 2)
def test_second_push_requires_interval(self):
now = datetime(2026, 6, 2, 0, 26, 0)
row = _row(
notification_count=1,
direction="long",
last_notified_at="2026-06-02 00:25:00",
)
tick = run_rs_level_alert_tick(row, 2.18, 1000, now, default_max_notify=3, default_interval_min=5)
self.assertIsNone(tick)
later = datetime(2026, 6, 2, 0, 30, 1)
tick2 = run_rs_level_alert_tick(
row, 2.18, 1000, later, default_max_notify=3, default_interval_min=5
)
self.assertIsNotNone(tick2)
self.assertEqual(tick2["notify_index"], 2)
self.assertEqual(tick2["prior_count"], 1)
def test_notify_interval_invalid_timestamp_does_not_spam(self):
now = datetime(2026, 6, 2, 1, 0, 0)
self.assertFalse(notify_interval_elapsed("not-a-date", 5, now))
if __name__ == "__main__":
unittest.main()