"""中控宏观关键数据日历:手动录入 FOMC / CPI / 非农档发布时间,±1h 风控前置窗口。""" from __future__ import annotations import os import sqlite3 import time from datetime import datetime from pathlib import Path from typing import Any from zoneinfo import ZoneInfo from hub_symbol_archive_lib import parse_wall_clock_ms DISPLAY_TZ = ZoneInfo(os.getenv("APP_TIMEZONE", "Asia/Shanghai")) MACRO_EVENT_TYPES = ("fomc", "cpi", "employment") MACRO_EVENT_LABELS: dict[str, str] = { "fomc": "FOMC 联邦基金利率", "cpi": "美国 CPI 通胀", "employment": "就业与劳工数据", } WINDOW_BEFORE_MS = int(os.getenv("HUB_MACRO_WINDOW_BEFORE_SEC", str(3600))) * 1000 WINDOW_AFTER_MS = int(os.getenv("HUB_MACRO_WINDOW_AFTER_SEC", str(3600))) * 1000 IMMINENT_BEFORE_MS = int(os.getenv("HUB_MACRO_IMMINENT_BEFORE_SEC", str(1800))) * 1000 LIST_FUTURE_DAYS = int(os.getenv("HUB_MACRO_LIST_FUTURE_DAYS", "60")) def default_db_path() -> Path: raw = (os.getenv("HUB_MACRO_CALENDAR_DB_PATH") or "").strip() if raw: return Path(raw) hub_dir = Path(__file__).resolve().parent / "manual_trading_hub" / "data" hub_dir.mkdir(parents=True, exist_ok=True) return hub_dir / "hub_macro_calendar.db" def _connect(db_path: Path | None = None) -> sqlite3.Connection: path = db_path or default_db_path() path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(path), timeout=30, isolation_level=None) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") return conn def init_db(db_path: Path | None = None) -> None: conn = _connect(db_path) try: conn.execute( """ CREATE TABLE IF NOT EXISTS macro_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, event_type TEXT NOT NULL, event_at_ms INTEGER NOT NULL, note TEXT NOT NULL DEFAULT '', created_at_ms INTEGER NOT NULL, updated_at_ms INTEGER NOT NULL ) """ ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_macro_events_at ON macro_events(event_at_ms)" ) finally: conn.close() def normalize_event_type(raw: str) -> str: key = (raw or "").strip().lower() if key not in MACRO_EVENT_TYPES: raise ValueError(f"事件类型须为: {', '.join(MACRO_EVENT_LABELS.values())}") return key def parse_event_at_ms(raw: Any) -> int: ms = parse_wall_clock_ms(raw, tz=DISPLAY_TZ) if ms is None: raise ValueError("发布时间格式错误,请使用 YYYY-MM-DD HH:MM 或 YYYY-MM-DDTHH:MM") return int(ms) def format_event_at(ms: int) -> str: dt = datetime.fromtimestamp(ms / 1000, tz=DISPLAY_TZ) return dt.strftime("%Y-%m-%d %H:%M") def _row_to_dict(row: sqlite3.Row) -> dict[str, Any]: ms = int(row["event_at_ms"]) et = str(row["event_type"]) return { "id": int(row["id"]), "event_type": et, "event_type_label": MACRO_EVENT_LABELS.get(et, et), "event_at_ms": ms, "event_at": format_event_at(ms), "note": str(row["note"] or ""), "created_at_ms": int(row["created_at_ms"]), "updated_at_ms": int(row["updated_at_ms"]), } def _window_bounds(event_at_ms: int) -> tuple[int, int]: start = int(event_at_ms) - WINDOW_BEFORE_MS end = int(event_at_ms) + WINDOW_AFTER_MS return start, end def enrich_alert(row: dict[str, Any], now_ms: int | None = None) -> dict[str, Any] | None: now = int(now_ms if now_ms is not None else time.time() * 1000) event_at_ms = int(row["event_at_ms"]) window_start, window_end = _window_bounds(event_at_ms) if now < window_start or now > window_end: return None imminent = now >= (event_at_ms - IMMINENT_BEFORE_MS) and now <= window_end mins_to_event = max(0, int((event_at_ms - now) / 60000)) mins_from_event = max(0, int((now - event_at_ms) / 60000)) return { **row, "window_start_ms": window_start, "window_end_ms": window_end, "window_start": format_event_at(window_start), "window_end": format_event_at(window_end), "phase": "imminent" if imminent else "window", "phase_label": "即将发布" if imminent and now < event_at_ms else "高波动窗口", "minutes_to_event": mins_to_event if now < event_at_ms else 0, "minutes_from_event": mins_from_event if now >= event_at_ms else 0, } def list_events( *, now_ms: int | None = None, include_expired_hours: int = 24, db_path: Path | None = None, ) -> list[dict[str, Any]]: init_db(db_path) now = int(now_ms if now_ms is not None else time.time() * 1000) horizon = now + LIST_FUTURE_DAYS * 86400 * 1000 expired_cutoff = now - max(0, int(include_expired_hours)) * 3600 * 1000 - WINDOW_AFTER_MS conn = _connect(db_path) try: rows = conn.execute( """ SELECT * FROM macro_events WHERE event_at_ms >= ? AND event_at_ms <= ? ORDER BY event_at_ms ASC, id ASC """, (expired_cutoff, horizon), ).fetchall() return [_row_to_dict(r) for r in rows] finally: conn.close() def get_event(event_id: int, db_path: Path | None = None) -> dict[str, Any] | None: init_db(db_path) conn = _connect(db_path) try: row = conn.execute("SELECT * FROM macro_events WHERE id=?", (int(event_id),)).fetchone() return _row_to_dict(row) if row else None finally: conn.close() def _assert_no_duplicate( conn: sqlite3.Connection, event_type: str, event_at_ms: int, *, exclude_id: int | None = None, ) -> None: if exclude_id is None: row = conn.execute( "SELECT id FROM macro_events WHERE event_type=? AND event_at_ms=? LIMIT 1", (event_type, int(event_at_ms)), ).fetchone() else: row = conn.execute( """ SELECT id FROM macro_events WHERE event_type=? AND event_at_ms=? AND id<>? LIMIT 1 """, (event_type, int(event_at_ms), int(exclude_id)), ).fetchone() if row: raise ValueError("同类型、同发布时间的记录已存在") def create_event( event_type: str, event_at: Any, *, note: str = "", db_path: Path | None = None, ) -> dict[str, Any]: init_db(db_path) et = normalize_event_type(event_type) event_at_ms = parse_event_at_ms(event_at) note_s = str(note or "").strip()[:500] now_ms = int(time.time() * 1000) conn = _connect(db_path) try: _assert_no_duplicate(conn, et, event_at_ms) cur = conn.execute( """ INSERT INTO macro_events (event_type, event_at_ms, note, created_at_ms, updated_at_ms) VALUES (?, ?, ?, ?, ?) """, (et, event_at_ms, note_s, now_ms, now_ms), ) eid = int(cur.lastrowid) finally: conn.close() row = get_event(eid, db_path=db_path) assert row is not None return row def update_event( event_id: int, *, event_type: str | None = None, event_at: Any | None = None, note: str | None = None, db_path: Path | None = None, ) -> dict[str, Any] | None: init_db(db_path) existing = get_event(event_id, db_path=db_path) if not existing: return None et = normalize_event_type(event_type if event_type is not None else existing["event_type"]) event_at_ms = ( parse_event_at_ms(event_at) if event_at is not None else int(existing["event_at_ms"]) ) note_s = existing["note"] if note is None else str(note or "").strip()[:500] now_ms = int(time.time() * 1000) conn = _connect(db_path) try: _assert_no_duplicate(conn, et, event_at_ms, exclude_id=int(event_id)) conn.execute( """ UPDATE macro_events SET event_type=?, event_at_ms=?, note=?, updated_at_ms=? WHERE id=? """, (et, event_at_ms, note_s, now_ms, int(event_id)), ) finally: conn.close() return get_event(event_id, db_path=db_path) def delete_event(event_id: int, db_path: Path | None = None) -> bool: init_db(db_path) conn = _connect(db_path) try: cur = conn.execute("DELETE FROM macro_events WHERE id=?", (int(event_id),)) return cur.rowcount > 0 finally: conn.close() def list_active_alerts( now_ms: int | None = None, db_path: Path | None = None, ) -> list[dict[str, Any]]: now = int(now_ms if now_ms is not None else time.time() * 1000) lookback = now - WINDOW_BEFORE_MS - IMMINENT_BEFORE_MS lookahead = now + WINDOW_AFTER_MS init_db(db_path) conn = _connect(db_path) try: rows = conn.execute( """ SELECT * FROM macro_events WHERE event_at_ms >= ? AND event_at_ms <= ? ORDER BY event_at_ms ASC, id ASC """, (lookback, lookahead), ).fetchall() finally: conn.close() alerts: list[dict[str, Any]] = [] for row in rows: item = enrich_alert(_row_to_dict(row), now_ms=now) if item: alerts.append(item) return alerts def build_banner_message(alert: dict[str, Any], *, has_positions: bool) -> str: label = alert.get("event_type_label") or alert.get("event_type") or "宏观数据" phase = alert.get("phase") or "window" if has_positions: if phase == "imminent" and int(alert.get("minutes_to_event") or 0) > 0: return ( f"「{label}」即将发布(约 {alert['minutes_to_event']} 分钟)," "注意仓位风险:勿加仓,检查止损/减仓" ) return f"「{label}」高波动窗口(±1h),注意仓位风险:勿加仓,检查止损/减仓" if phase == "imminent" and int(alert.get("minutes_to_event") or 0) > 0: return ( f"「{label}」即将发布(约 {alert['minutes_to_event']} 分钟)," "建议等待,避免新开仓" ) return f"「{label}」高波动窗口(±1h),建议等待,避免新开仓"