"""中控开仓计划:进行中 / 历史归档 / 胜率统计。""" from __future__ import annotations import os import sqlite3 import time from datetime import datetime, timedelta from pathlib import Path from typing import Any from zoneinfo import ZoneInfo PLAN_TYPES = { "trend": "趋势单", "swing": "波段单", "intraday": "日内短线", } TREND_TIMEFRAMES = ("5m", "15m", "30m", "1h", "4h", "1d") ENTRY_TIMEFRAMES = ("1m", "5m", "15m", "30m", "1h") DIRECTIONS = {"long": "多", "short": "空"} ENTRY_SCHEMES = { "breakout": "突破方案", "false_breakout": "假突破突破方案", "box_inflection": "箱体拐点方案", } RESULTS = {"win": "盈", "loss": "亏"} STAT_DIMENSIONS = ("symbol", "trend_tf", "entry_scheme") DISPLAY_TZ = ZoneInfo( (os.getenv("HUB_ENTRY_PLAN_TZ") or os.getenv("HUB_VOLUME_RANK_TZ") or "Asia/Shanghai").strip() or "Asia/Shanghai" ) def default_db_path() -> Path: raw = (os.getenv("HUB_ENTRY_PLAN_DB_PATH") or "").strip() if raw: return Path(raw) hub_dir = Path(__file__).resolve().parent / "manual_trading_hub" / "data" hub_dir.mkdir(parents=True, exist_ok=True) return hub_dir / "hub_entry_plans.db" def _now_ms() -> int: return int(time.time() * 1000) def _connect(db_path: Path | None = None) -> sqlite3.Connection: path = db_path or default_db_path() path.parent.mkdir(parents=True, exist_ok=True) conn = sqlite3.connect(str(path), timeout=30, isolation_level=None) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") return conn def init_db(db_path: Path | None = None) -> None: conn = _connect(db_path) try: conn.execute( """ CREATE TABLE IF NOT EXISTS entry_plans ( id INTEGER PRIMARY KEY AUTOINCREMENT, plan_date TEXT NOT NULL, exchange_key TEXT NOT NULL, symbol TEXT NOT NULL, plan_type TEXT NOT NULL, trend_timeframe TEXT NOT NULL, entry_timeframe TEXT NOT NULL, direction TEXT NOT NULL, target_level TEXT NOT NULL DEFAULT '', current_range TEXT NOT NULL DEFAULT '', entry_scheme TEXT NOT NULL, result TEXT, pnl_amount REAL, note TEXT NOT NULL DEFAULT '', status TEXT NOT NULL DEFAULT 'active', created_at INTEGER NOT NULL, updated_at INTEGER NOT NULL, archived_at INTEGER ) """ ) conn.execute( """ CREATE INDEX IF NOT EXISTS idx_entry_plans_status_date ON entry_plans (status, plan_date DESC, id DESC) """ ) finally: conn.close() def normalize_plan_symbol(raw: str) -> str: s = str(raw or "").strip().upper() if not s: raise ValueError("缺少币种") if ":" in s: s = s.split(":", 1)[0] if "/" in s: base, quote = s.split("/", 1) base = base.strip() quote = (quote or "USDT").strip() or "USDT" if not base: raise ValueError("币种无效") return f"{base}/{quote}" if s.endswith("USDT") and len(s) > 4: return f"{s[:-4]}/{s[-4:]}" return f"{s}/USDT" def _validate_choice(value: str, allowed: dict[str, str] | tuple[str, ...], field: str) -> str: key = str(value or "").strip().lower() if isinstance(allowed, dict): if key not in allowed: raise ValueError(f"{field} 无效") return key if key not in allowed: raise ValueError(f"{field} 无效") return key def _row_to_dict(row: sqlite3.Row | None) -> dict[str, Any] | None: if row is None: return None d = dict(row) d["plan_type_label"] = PLAN_TYPES.get(d.get("plan_type") or "", d.get("plan_type") or "") d["direction_label"] = DIRECTIONS.get(d.get("direction") or "", d.get("direction") or "") d["entry_scheme_label"] = ENTRY_SCHEMES.get( d.get("entry_scheme") or "", d.get("entry_scheme") or "" ) or "待填写" res = d.get("result") d["result_label"] = RESULTS.get(res, "") if res else "" return d def _parse_optional_pnl(raw: Any) -> float | None: if raw is None or raw == "": return None try: return round(float(raw), 4) except (TypeError, ValueError) as e: raise ValueError("盈亏金额无效") from e def create_entry_plan(payload: dict[str, Any], *, db_path: Path | None = None) -> dict[str, Any]: init_db(db_path) plan_date = str(payload.get("plan_date") or "").strip()[:10] if not plan_date: raise ValueError("缺少 plan_date") exchange_key = str(payload.get("exchange_key") or "").strip().lower() if not exchange_key: raise ValueError("缺少 exchange_key") symbol = normalize_plan_symbol(payload.get("symbol") or "") plan_type = _validate_choice(payload.get("plan_type"), PLAN_TYPES, "类型") trend_tf = _validate_choice(payload.get("trend_timeframe"), TREND_TIMEFRAMES, "趋势周期") entry_tf = _validate_choice(payload.get("entry_timeframe"), ENTRY_TIMEFRAMES, "入场周期") direction = _validate_choice(payload.get("direction"), DIRECTIONS, "方向") entry_scheme = "" if payload.get("entry_scheme"): entry_scheme = _validate_choice(payload.get("entry_scheme"), ENTRY_SCHEMES, "入场方案") target_level = str(payload.get("target_level") or "").strip() current_range = str(payload.get("current_range") or "").strip() note = str(payload.get("note") or "").strip() now = _now_ms() conn = _connect(db_path) try: cur = conn.execute( """ INSERT INTO entry_plans ( plan_date, exchange_key, symbol, plan_type, trend_timeframe, entry_timeframe, direction, target_level, current_range, entry_scheme, note, status, created_at, updated_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active', ?, ?) """, ( plan_date, exchange_key, symbol, plan_type, trend_tf, entry_tf, direction, target_level, current_range, entry_scheme, note, now, now, ), ) row = conn.execute( "SELECT * FROM entry_plans WHERE id=?", (int(cur.lastrowid),), ).fetchone() return _row_to_dict(row) or {} finally: conn.close() def list_entry_plans( *, status: str = "active", db_path: Path | None = None, ) -> list[dict[str, Any]]: init_db(db_path) st = (status or "active").strip().lower() if st not in ("active", "archived"): raise ValueError("status 无效") conn = _connect(db_path) try: rows = conn.execute( """ SELECT * FROM entry_plans WHERE status=? ORDER BY plan_date DESC, id DESC """, (st,), ).fetchall() return [_row_to_dict(r) for r in rows if r] finally: conn.close() def get_entry_plan(plan_id: int, *, db_path: Path | None = None) -> dict[str, Any] | None: init_db(db_path) conn = _connect(db_path) try: row = conn.execute("SELECT * FROM entry_plans WHERE id=?", (int(plan_id),)).fetchone() return _row_to_dict(row) finally: conn.close() def update_entry_plan( plan_id: int, payload: dict[str, Any], *, db_path: Path | None = None, ) -> dict[str, Any] | None: init_db(db_path) conn = _connect(db_path) try: row = conn.execute("SELECT * FROM entry_plans WHERE id=?", (int(plan_id),)).fetchone() if not row: return None if row["status"] == "archived": raise ValueError("已归档计划不可修改") fields: dict[str, Any] = {} if "plan_date" in payload: qd = str(payload.get("plan_date") or "").strip()[:10] if not qd: raise ValueError("缺少 plan_date") fields["plan_date"] = qd if "exchange_key" in payload: ex = str(payload.get("exchange_key") or "").strip().lower() if not ex: raise ValueError("缺少 exchange_key") fields["exchange_key"] = ex if "symbol" in payload: fields["symbol"] = normalize_plan_symbol(payload.get("symbol") or "") if "plan_type" in payload: fields["plan_type"] = _validate_choice(payload.get("plan_type"), PLAN_TYPES, "类型") if "trend_timeframe" in payload: fields["trend_timeframe"] = _validate_choice( payload.get("trend_timeframe"), TREND_TIMEFRAMES, "趋势周期" ) if "entry_timeframe" in payload: fields["entry_timeframe"] = _validate_choice( payload.get("entry_timeframe"), ENTRY_TIMEFRAMES, "入场周期" ) if "direction" in payload: fields["direction"] = _validate_choice(payload.get("direction"), DIRECTIONS, "方向") if "entry_scheme" in payload: fields["entry_scheme"] = _validate_choice( payload.get("entry_scheme"), ENTRY_SCHEMES, "入场方案" ) if "target_level" in payload: fields["target_level"] = str(payload.get("target_level") or "").strip() if "current_range" in payload: fields["current_range"] = str(payload.get("current_range") or "").strip() if "note" in payload: fields["note"] = str(payload.get("note") or "").strip() if "pnl_amount" in payload: fields["pnl_amount"] = _parse_optional_pnl(payload.get("pnl_amount")) archive_now = False if "result" in payload: res_raw = payload.get("result") if res_raw is None or str(res_raw).strip() == "": fields["result"] = None else: fields["result"] = _validate_choice(res_raw, RESULTS, "结果") archive_now = True if not fields: return _row_to_dict(row) now = _now_ms() fields["updated_at"] = now if archive_now: scheme_val = fields.get("entry_scheme", row["entry_scheme"]) if not str(scheme_val or "").strip(): raise ValueError("归档前请在进行中计划里选择入场方案") fields["status"] = "archived" fields["archived_at"] = now sets = ", ".join(f"{k}=?" for k in fields) conn.execute( f"UPDATE entry_plans SET {sets} WHERE id=?", (*fields.values(), int(plan_id)), ) updated = conn.execute("SELECT * FROM entry_plans WHERE id=?", (int(plan_id),)).fetchone() return _row_to_dict(updated) finally: conn.close() def delete_entry_plan(plan_id: int, *, db_path: Path | None = None) -> bool: init_db(db_path) conn = _connect(db_path) try: row = conn.execute("SELECT status FROM entry_plans WHERE id=?", (int(plan_id),)).fetchone() if not row: return False if row["status"] != "active": raise ValueError("仅进行中的计划可删除") cur = conn.execute("DELETE FROM entry_plans WHERE id=? AND status='active'", (int(plan_id),)) return int(cur.rowcount or 0) > 0 finally: conn.close() def _today_iso() -> str: return datetime.now(DISPLAY_TZ).strftime("%Y-%m-%d") def resolve_stats_date_bounds( *, period: str = "all", date_from: str = "", date_to: str = "", ) -> tuple[str | None, str | None, str]: """返回 (date_from, date_to, label);all 时 bounds 为 None。""" p = (period or "all").strip().lower() or "all" today = _today_iso() if p == "all": return None, None, "全部历史" if p == "week": day_dt = datetime.strptime(today, "%Y-%m-%d") monday = (day_dt - timedelta(days=day_dt.weekday())).strftime("%Y-%m-%d") return monday, today, f"本周 {monday}~{today}" if p == "month": day_dt = datetime.strptime(today, "%Y-%m-%d") first = day_dt.replace(day=1).strftime("%Y-%m-%d") return first, today, f"本月 {first}~{today}" if p == "range": df = (date_from or "").strip()[:10] or today dt = (date_to or "").strip()[:10] or df if df > dt: df, dt = dt, df label = f"区间 {df}~{dt}" if df != dt else f"区间 {df}" return df, dt, label return None, None, "全部历史" def compute_entry_plan_stats( *, dimension: str = "symbol", period: str = "all", date_from: str = "", date_to: str = "", db_path: Path | None = None, ) -> dict[str, Any]: init_db(db_path) dim = (dimension or "symbol").strip().lower() if dim not in STAT_DIMENSIONS: raise ValueError("dimension 无效") df_bound, dt_bound, period_label = resolve_stats_date_bounds( period=period, date_from=date_from, date_to=date_to ) col_map = { "symbol": "symbol", "trend_tf": "trend_timeframe", "entry_scheme": "entry_scheme", } col = col_map[dim] conn = _connect(db_path) try: where = "status='archived' AND result IN ('win','loss')" params: list[Any] = [] if df_bound: where += " AND plan_date >= ? AND plan_date <= ?" params.extend([df_bound, dt_bound]) rows = conn.execute( f""" SELECT {col} AS dim_key, COUNT(*) AS total, SUM(CASE WHEN result='win' THEN 1 ELSE 0 END) AS win_count, SUM(CASE WHEN result='loss' THEN 1 ELSE 0 END) AS loss_count FROM entry_plans WHERE {where} GROUP BY {col} ORDER BY total DESC, dim_key ASC """, params, ).fetchall() items = [] for r in rows: total = int(r["total"] or 0) wins = int(r["win_count"] or 0) losses = int(r["loss_count"] or 0) key = str(r["dim_key"] or "") label = key if dim == "entry_scheme": label = ENTRY_SCHEMES.get(key, key) elif dim == "trend_tf": label = key win_rate = round(wins / total * 100, 1) if total else None items.append( { "key": key, "label": label, "total": total, "win_count": wins, "loss_count": losses, "win_rate": win_rate, } ) return { "dimension": dim, "period": period, "period_label": period_label, "date_from": df_bound, "date_to": dt_bound, "items": items, } finally: conn.close() def meta_payload(exchanges: list[dict[str, Any]] | None = None) -> dict[str, Any]: return { "plan_types": [{"value": k, "label": v} for k, v in PLAN_TYPES.items()], "trend_timeframes": list(TREND_TIMEFRAMES), "entry_timeframes": list(ENTRY_TIMEFRAMES), "directions": [{"value": k, "label": v} for k, v in DIRECTIONS.items()], "entry_schemes": [{"value": k, "label": v} for k, v in ENTRY_SCHEMES.items()], "results": [{"value": k, "label": v} for k, v in RESULTS.items()], "stat_dimensions": [ {"value": "symbol", "label": "币种"}, {"value": "trend_tf", "label": "趋势周期"}, {"value": "entry_scheme", "label": "入场方案"}, ], "exchanges": exchanges or [], }