Files
crypto_monitor/hub_macro_calendar_lib.py
dekun e470c5952f feat(hub): add macro calendar for pre-release risk alerts
Manual FOMC/CPI/employment entries in settings drive ±1h monitor banners without touching exchange instances.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-18 11:52:30 +08:00

312 lines
10 KiB
Python

"""中控宏观关键数据日历:手动录入 FOMC / CPI / 非农档发布时间,±1h 风控前置窗口。"""
from __future__ import annotations
import os
import sqlite3
import time
from datetime import datetime
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
from hub_symbol_archive_lib import parse_wall_clock_ms
DISPLAY_TZ = ZoneInfo(os.getenv("APP_TIMEZONE", "Asia/Shanghai"))
MACRO_EVENT_TYPES = ("fomc", "cpi", "employment")
MACRO_EVENT_LABELS: dict[str, str] = {
"fomc": "FOMC 联邦基金利率",
"cpi": "美国 CPI 通胀",
"employment": "就业与劳工数据",
}
WINDOW_BEFORE_MS = int(os.getenv("HUB_MACRO_WINDOW_BEFORE_SEC", str(3600))) * 1000
WINDOW_AFTER_MS = int(os.getenv("HUB_MACRO_WINDOW_AFTER_SEC", str(3600))) * 1000
IMMINENT_BEFORE_MS = int(os.getenv("HUB_MACRO_IMMINENT_BEFORE_SEC", str(1800))) * 1000
LIST_FUTURE_DAYS = int(os.getenv("HUB_MACRO_LIST_FUTURE_DAYS", "60"))
def default_db_path() -> Path:
raw = (os.getenv("HUB_MACRO_CALENDAR_DB_PATH") or "").strip()
if raw:
return Path(raw)
hub_dir = Path(__file__).resolve().parent / "manual_trading_hub" / "data"
hub_dir.mkdir(parents=True, exist_ok=True)
return hub_dir / "hub_macro_calendar.db"
def _connect(db_path: Path | None = None) -> sqlite3.Connection:
path = db_path or default_db_path()
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path), timeout=30, isolation_level=None)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
return conn
def init_db(db_path: Path | None = None) -> None:
conn = _connect(db_path)
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS macro_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
event_type TEXT NOT NULL,
event_at_ms INTEGER NOT NULL,
note TEXT NOT NULL DEFAULT '',
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL
)
"""
)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_macro_events_at ON macro_events(event_at_ms)"
)
finally:
conn.close()
def normalize_event_type(raw: str) -> str:
key = (raw or "").strip().lower()
if key not in MACRO_EVENT_TYPES:
raise ValueError(f"事件类型须为: {', '.join(MACRO_EVENT_LABELS.values())}")
return key
def parse_event_at_ms(raw: Any) -> int:
ms = parse_wall_clock_ms(raw, tz=DISPLAY_TZ)
if ms is None:
raise ValueError("发布时间格式错误,请使用 YYYY-MM-DD HH:MM 或 YYYY-MM-DDTHH:MM")
return int(ms)
def format_event_at(ms: int) -> str:
dt = datetime.fromtimestamp(ms / 1000, tz=DISPLAY_TZ)
return dt.strftime("%Y-%m-%d %H:%M")
def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]:
ms = int(row["event_at_ms"])
et = str(row["event_type"])
return {
"id": int(row["id"]),
"event_type": et,
"event_type_label": MACRO_EVENT_LABELS.get(et, et),
"event_at_ms": ms,
"event_at": format_event_at(ms),
"note": str(row["note"] or ""),
"created_at_ms": int(row["created_at_ms"]),
"updated_at_ms": int(row["updated_at_ms"]),
}
def _window_bounds(event_at_ms: int) -> tuple[int, int]:
start = int(event_at_ms) - WINDOW_BEFORE_MS
end = int(event_at_ms) + WINDOW_AFTER_MS
return start, end
def enrich_alert(row: dict[str, Any], now_ms: int | None = None) -> dict[str, Any] | None:
now = int(now_ms if now_ms is not None else time.time() * 1000)
event_at_ms = int(row["event_at_ms"])
window_start, window_end = _window_bounds(event_at_ms)
if now < window_start or now > window_end:
return None
imminent = now >= (event_at_ms - IMMINENT_BEFORE_MS) and now <= window_end
mins_to_event = max(0, int((event_at_ms - now) / 60000))
mins_from_event = max(0, int((now - event_at_ms) / 60000))
return {
**row,
"window_start_ms": window_start,
"window_end_ms": window_end,
"window_start": format_event_at(window_start),
"window_end": format_event_at(window_end),
"phase": "imminent" if imminent else "window",
"phase_label": "即将发布" if imminent and now < event_at_ms else "高波动窗口",
"minutes_to_event": mins_to_event if now < event_at_ms else 0,
"minutes_from_event": mins_from_event if now >= event_at_ms else 0,
}
def list_events(
*,
now_ms: int | None = None,
include_expired_hours: int = 24,
db_path: Path | None = None,
) -> list[dict[str, Any]]:
init_db(db_path)
now = int(now_ms if now_ms is not None else time.time() * 1000)
horizon = now + LIST_FUTURE_DAYS * 86400 * 1000
expired_cutoff = now - max(0, int(include_expired_hours)) * 3600 * 1000 - WINDOW_AFTER_MS
conn = _connect(db_path)
try:
rows = conn.execute(
"""
SELECT * FROM macro_events
WHERE event_at_ms >= ? AND event_at_ms <= ?
ORDER BY event_at_ms ASC, id ASC
""",
(expired_cutoff, horizon),
).fetchall()
return [_row_to_dict(r) for r in rows]
finally:
conn.close()
def get_event(event_id: int, db_path: Path | None = None) -> dict[str, Any] | None:
init_db(db_path)
conn = _connect(db_path)
try:
row = conn.execute("SELECT * FROM macro_events WHERE id=?", (int(event_id),)).fetchone()
return _row_to_dict(row) if row else None
finally:
conn.close()
def _assert_no_duplicate(
conn: sqlite3.Connection,
event_type: str,
event_at_ms: int,
*,
exclude_id: int | None = None,
) -> None:
if exclude_id is None:
row = conn.execute(
"SELECT id FROM macro_events WHERE event_type=? AND event_at_ms=? LIMIT 1",
(event_type, int(event_at_ms)),
).fetchone()
else:
row = conn.execute(
"""
SELECT id FROM macro_events
WHERE event_type=? AND event_at_ms=? AND id<>?
LIMIT 1
""",
(event_type, int(event_at_ms), int(exclude_id)),
).fetchone()
if row:
raise ValueError("同类型、同发布时间的记录已存在")
def create_event(
event_type: str,
event_at: Any,
*,
note: str = "",
db_path: Path | None = None,
) -> dict[str, Any]:
init_db(db_path)
et = normalize_event_type(event_type)
event_at_ms = parse_event_at_ms(event_at)
note_s = str(note or "").strip()[:500]
now_ms = int(time.time() * 1000)
conn = _connect(db_path)
try:
_assert_no_duplicate(conn, et, event_at_ms)
cur = conn.execute(
"""
INSERT INTO macro_events (event_type, event_at_ms, note, created_at_ms, updated_at_ms)
VALUES (?, ?, ?, ?, ?)
""",
(et, event_at_ms, note_s, now_ms, now_ms),
)
eid = int(cur.lastrowid)
finally:
conn.close()
row = get_event(eid, db_path=db_path)
assert row is not None
return row
def update_event(
event_id: int,
*,
event_type: str | None = None,
event_at: Any | None = None,
note: str | None = None,
db_path: Path | None = None,
) -> dict[str, Any] | None:
init_db(db_path)
existing = get_event(event_id, db_path=db_path)
if not existing:
return None
et = normalize_event_type(event_type if event_type is not None else existing["event_type"])
event_at_ms = (
parse_event_at_ms(event_at) if event_at is not None else int(existing["event_at_ms"])
)
note_s = existing["note"] if note is None else str(note or "").strip()[:500]
now_ms = int(time.time() * 1000)
conn = _connect(db_path)
try:
_assert_no_duplicate(conn, et, event_at_ms, exclude_id=int(event_id))
conn.execute(
"""
UPDATE macro_events
SET event_type=?, event_at_ms=?, note=?, updated_at_ms=?
WHERE id=?
""",
(et, event_at_ms, note_s, now_ms, int(event_id)),
)
finally:
conn.close()
return get_event(event_id, db_path=db_path)
def delete_event(event_id: int, db_path: Path | None = None) -> bool:
init_db(db_path)
conn = _connect(db_path)
try:
cur = conn.execute("DELETE FROM macro_events WHERE id=?", (int(event_id),))
return cur.rowcount > 0
finally:
conn.close()
def list_active_alerts(
now_ms: int | None = None,
db_path: Path | None = None,
) -> list[dict[str, Any]]:
now = int(now_ms if now_ms is not None else time.time() * 1000)
lookback = now - WINDOW_BEFORE_MS - IMMINENT_BEFORE_MS
lookahead = now + WINDOW_AFTER_MS
init_db(db_path)
conn = _connect(db_path)
try:
rows = conn.execute(
"""
SELECT * FROM macro_events
WHERE event_at_ms >= ? AND event_at_ms <= ?
ORDER BY event_at_ms ASC, id ASC
""",
(lookback, lookahead),
).fetchall()
finally:
conn.close()
alerts: list[dict[str, Any]] = []
for row in rows:
item = enrich_alert(_row_to_dict(row), now_ms=now)
if item:
alerts.append(item)
return alerts
def build_banner_message(alert: dict[str, Any], *, has_positions: bool) -> str:
label = alert.get("event_type_label") or alert.get("event_type") or "宏观数据"
phase = alert.get("phase") or "window"
if has_positions:
if phase == "imminent" and int(alert.get("minutes_to_event") or 0) > 0:
return (
f"{label}」即将发布(约 {alert['minutes_to_event']} 分钟),"
"注意仓位风险:勿加仓,检查止损/减仓"
)
return f"{label}」高波动窗口(±1h),注意仓位风险:勿加仓,检查止损/减仓"
if phase == "imminent" and int(alert.get("minutes_to_event") or 0) > 0:
return (
f"{label}」即将发布(约 {alert['minutes_to_event']} 分钟),"
"建议等待,避免新开仓"
)
return f"{label}」高波动窗口(±1h),建议等待,避免新开仓"