Files
crypto_monitor/hub_entry_plan_lib.py
T
2026-06-22 16:19:56 +08:00

449 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""中控开仓计划:进行中 / 历史归档 / 胜率统计。"""
from __future__ import annotations
import math
import os
import sqlite3
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
# Plan categories: stored key -> display label.
PLAN_TYPES = dict(
    trend="趋势单",
    swing="波段单",
    intraday="日内短线",
)

# Values accepted for the trend_timeframe / entry_timeframe columns.
TREND_TIMEFRAMES = ("5m", "15m", "30m", "1h", "4h", "1d")
ENTRY_TIMEFRAMES = ("1m", "5m", "15m", "30m", "1h")

# Trade direction: stored key -> display label.
# NOTE(review): both labels are empty strings here, so direction_label and the
# "directions" metadata carry no visible text -- confirm the intended labels.
DIRECTIONS = dict(long="", short="")

# Entry schemes: stored key -> display label.
ENTRY_SCHEMES = dict(
    breakout="突破方案",
    false_breakout="假突破突破方案",
    box_inflection="箱体拐点方案",
)

# Trade outcome: stored key -> display label.
# NOTE(review): labels are empty strings here as well -- confirm the intended text.
RESULTS = dict(win="", loss="")

# Dimension keys accepted by the statistics query.
STAT_DIMENSIONS = ("symbol", "trend_tf", "entry_scheme")

# Time zone used to decide what "today" is; the first non-empty value of
# HUB_ENTRY_PLAN_TZ, HUB_VOLUME_RANK_TZ wins, otherwise Asia/Shanghai.
DISPLAY_TZ = ZoneInfo(
    (
        os.environ.get("HUB_ENTRY_PLAN_TZ")
        or os.environ.get("HUB_VOLUME_RANK_TZ")
        or "Asia/Shanghai"
    ).strip()
    or "Asia/Shanghai"
)
def default_db_path() -> Path:
    """Return the SQLite file path used when no explicit path is given.

    A non-blank ``HUB_ENTRY_PLAN_DB_PATH`` environment variable takes
    precedence and is returned with surrounding whitespace trimmed.
    Otherwise the path is ``manual_trading_hub/data/hub_entry_plans.db``
    next to this module; that data directory is created if it is missing.
    """
    override = os.getenv("HUB_ENTRY_PLAN_DB_PATH")
    override = override.strip() if override else ""
    if override:
        return Path(override)
    data_dir = Path(__file__).resolve().parent.joinpath("manual_trading_hub", "data")
    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir.joinpath("hub_entry_plans.db")
def _now_ms() -> int:
    """Return the current ``time.time()`` reading as whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
def _connect(db_path: Path | None = None) -> sqlite3.Connection:
    """Open a SQLite connection to the plan database.

    Args:
        db_path: Database file to open; when falsy, ``default_db_path()`` is used.

    Returns:
        A connection with ``sqlite3.Row`` as row factory, ``isolation_level=None``,
        a 30 second busy timeout, WAL journaling and ``synchronous=NORMAL``.
        The parent directory of the file is created if needed. The caller is
        responsible for closing the connection.
    """
    target = db_path if db_path else default_db_path()
    target.parent.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(str(target), timeout=30, isolation_level=None)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA journal_mode=WAL", "PRAGMA synchronous=NORMAL"):
        connection.execute(pragma)
    return connection
def init_db(db_path: Path | None = None) -> None:
    """Create the ``entry_plans`` table and its listing index if they are missing.

    Both statements use ``IF NOT EXISTS``, so calling this repeatedly is safe.
    The connection is always closed before returning.

    Args:
        db_path: Database file; forwarded to ``_connect``.
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS entry_plans (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            plan_date TEXT NOT NULL,
            exchange_key TEXT NOT NULL,
            symbol TEXT NOT NULL,
            plan_type TEXT NOT NULL,
            trend_timeframe TEXT NOT NULL,
            entry_timeframe TEXT NOT NULL,
            direction TEXT NOT NULL,
            target_level TEXT NOT NULL DEFAULT '',
            current_range TEXT NOT NULL DEFAULT '',
            entry_scheme TEXT NOT NULL,
            result TEXT,
            pnl_amount REAL,
            note TEXT NOT NULL DEFAULT '',
            status TEXT NOT NULL DEFAULT 'active',
            created_at INTEGER NOT NULL,
            updated_at INTEGER NOT NULL,
            archived_at INTEGER
        )
        """
    # Matches the "WHERE status=? ORDER BY plan_date DESC, id DESC" listing query.
    create_index_sql = """
        CREATE INDEX IF NOT EXISTS idx_entry_plans_status_date
        ON entry_plans (status, plan_date DESC, id DESC)
        """
    conn = _connect(db_path)
    try:
        for statement in (create_table_sql, create_index_sql):
            conn.execute(statement)
    finally:
        conn.close()
def normalize_plan_symbol(raw: str) -> str:
    """Normalize a user-supplied symbol to upper-case ``BASE/QUOTE`` form.

    Accepted spellings (case-insensitive, surrounding whitespace ignored):

    * ``BASE/QUOTE`` -- an empty quote defaults to ``USDT``
    * ``BASE/QUOTE:SETTLE`` -- everything from the first ``:`` is dropped
    * ``BASEUSDT`` -- split into ``BASE/USDT``
    * ``BASE`` -- quoted in ``USDT``

    Args:
        raw: Symbol text as entered by the user.

    Returns:
        The normalized ``BASE/QUOTE`` string.

    Raises:
        ValueError: If the input is blank, or nothing usable remains for the
            base part (for example ``":USDT"`` or ``"/USDT"``).
    """
    text = str(raw or "").strip().upper()
    if not text:
        raise ValueError("缺少币种")
    # Drop the settle suffix, then strip again: whitespace may sit before the
    # colon, and the part before the colon may be empty altogether.
    text = text.partition(":")[0].strip()
    if not text:
        raise ValueError("币种无效")
    if "/" in text:
        base, _, quote = text.partition("/")
        base = base.strip()
        quote = quote.strip() or "USDT"
        if not base:
            raise ValueError("币种无效")
        return f"{base}/{quote}"
    if text.endswith("USDT") and len(text) > 4:
        # Concatenated form such as "BTCUSDT"; rstrip covers "BTC USDT".
        return f"{text[:-4].rstrip()}/USDT"
    return f"{text}/USDT"
def _validate_choice(value: str, allowed: dict[str, str] | tuple[str, ...], field: str) -> str:
    """Return ``value`` trimmed and lower-cased if it is one of ``allowed``.

    Args:
        value: Raw input; falsy values are treated as an empty string.
        allowed: Either a dict (its keys are the valid choices) or a tuple of
            valid choices.
        field: Field name used as the prefix of the error message.

    Raises:
        ValueError: If the normalized value is not an allowed choice.
    """
    candidate = str(value or "").strip().lower()
    # ``in`` tests dict keys and tuple items alike, so one check covers both.
    if candidate in allowed:
        return candidate
    raise ValueError(f"{field} 无效")
def _row_to_dict(row: sqlite3.Row | None) -> dict[str, Any] | None:
    """Convert a database row into a plain dict enriched with display labels.

    Adds ``plan_type_label``, ``direction_label``, ``entry_scheme_label`` and
    ``result_label``. For the first three, an unknown key falls back to the raw
    stored value; ``result_label`` is ``""`` when no result is stored or the
    result is unknown.

    Returns:
        The enriched dict, or ``None`` when ``row`` is ``None``.
    """
    if row is None:
        return None
    record = dict(row)
    plan_type = record.get("plan_type") or ""
    direction = record.get("direction") or ""
    scheme = record.get("entry_scheme") or ""
    outcome = record.get("result")
    record["plan_type_label"] = PLAN_TYPES.get(plan_type, plan_type)
    record["direction_label"] = DIRECTIONS.get(direction, direction)
    record["entry_scheme_label"] = ENTRY_SCHEMES.get(scheme, scheme)
    record["result_label"] = RESULTS.get(outcome, "") if outcome else ""
    return record
def _parse_optional_pnl(raw: Any) -> float | None:
    """Parse an optional profit/loss amount.

    Args:
        raw: ``None`` or ``""`` for "no amount"; otherwise anything ``float()``
            accepts.

    Returns:
        ``None`` when no amount was supplied, else the amount rounded to four
        decimal places.

    Raises:
        ValueError: If the value is not numeric, is too large to be a float,
            or is not finite (NaN / infinity).
    """
    if raw is None or raw == "":
        return None
    try:
        amount = float(raw)
    except (TypeError, ValueError, OverflowError) as e:
        # OverflowError: an int too large for a float, e.g. from a JSON payload.
        raise ValueError("盈亏金额无效") from e
    # float() parses "nan" / "inf" without error; neither is a usable amount.
    if not math.isfinite(amount):
        raise ValueError("盈亏金额无效")
    return round(amount, 4)
def create_entry_plan(payload: dict[str, Any], *, db_path: Path | None = None) -> dict[str, Any]:
    """Validate ``payload`` and insert a new plan with status ``active``.

    Required keys: ``plan_date`` (only the first 10 characters are kept),
    ``exchange_key``, ``symbol``, ``plan_type``, ``trend_timeframe``,
    ``entry_timeframe``, ``direction`` and ``entry_scheme``. Optional free-text
    keys: ``target_level``, ``current_range`` and ``note``.

    Args:
        payload: Field values for the new plan.
        db_path: Database file; forwarded to ``init_db`` / ``_connect``.

    Returns:
        The inserted row as a dict with display labels, or ``{}`` if the row
        cannot be read back.

    Raises:
        ValueError: If a required field is missing or a choice is invalid.
    """
    init_db(db_path)

    def _text(key: str) -> str:
        # Missing / falsy values collapse to an empty string.
        return str(payload.get(key) or "").strip()

    plan_date = _text("plan_date")[:10]
    if not plan_date:
        raise ValueError("缺少 plan_date")
    exchange_key = _text("exchange_key").lower()
    if not exchange_key:
        raise ValueError("缺少 exchange_key")

    symbol = normalize_plan_symbol(payload.get("symbol") or "")
    plan_type = _validate_choice(payload.get("plan_type"), PLAN_TYPES, "类型")
    trend_tf = _validate_choice(payload.get("trend_timeframe"), TREND_TIMEFRAMES, "趋势周期")
    entry_tf = _validate_choice(payload.get("entry_timeframe"), ENTRY_TIMEFRAMES, "入场周期")
    direction = _validate_choice(payload.get("direction"), DIRECTIONS, "方向")
    entry_scheme = _validate_choice(payload.get("entry_scheme"), ENTRY_SCHEMES, "入场方案")
    target_level = _text("target_level")
    current_range = _text("current_range")
    note = _text("note")

    # created_at and updated_at share one timestamp on insert.
    stamp = _now_ms()
    params = (
        plan_date, exchange_key, symbol, plan_type, trend_tf, entry_tf,
        direction, target_level, current_range, entry_scheme, note,
        stamp, stamp,
    )

    conn = _connect(db_path)
    try:
        inserted = conn.execute(
            """
            INSERT INTO entry_plans (
                plan_date, exchange_key, symbol, plan_type, trend_timeframe, entry_timeframe,
                direction, target_level, current_range, entry_scheme, note, status,
                created_at, updated_at
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active', ?, ?)
            """,
            params,
        )
        new_id = int(inserted.lastrowid)
        stored = conn.execute("SELECT * FROM entry_plans WHERE id=?", (new_id,)).fetchone()
        return _row_to_dict(stored) or {}
    finally:
        conn.close()
def list_entry_plans(
    *,
    status: str = "active",
    db_path: Path | None = None,
) -> list[dict[str, Any]]:
    """List plans with the given status, newest ``plan_date`` (then id) first.

    Args:
        status: ``"active"`` or ``"archived"`` (case-insensitive); a falsy
            value means ``"active"``.
        db_path: Database file; forwarded to ``init_db`` / ``_connect``.

    Raises:
        ValueError: If ``status`` is neither of the two accepted values.
    """
    init_db(db_path)
    wanted = (status or "active").strip().lower()
    if wanted != "active" and wanted != "archived":
        raise ValueError("status 无效")
    conn = _connect(db_path)
    try:
        cursor = conn.execute(
            """
            SELECT * FROM entry_plans
            WHERE status=?
            ORDER BY plan_date DESC, id DESC
            """,
            (wanted,),
        )
        return [_row_to_dict(record) for record in cursor.fetchall() if record]
    finally:
        conn.close()
def get_entry_plan(plan_id: int, *, db_path: Path | None = None) -> dict[str, Any] | None:
    """Fetch one plan by id.

    Returns:
        The row as a dict with display labels, or ``None`` if no plan has
        that id.
    """
    init_db(db_path)
    conn = _connect(db_path)
    try:
        cursor = conn.execute("SELECT * FROM entry_plans WHERE id=?", (int(plan_id),))
        found = cursor.fetchone()
        return _row_to_dict(found)
    finally:
        conn.close()
def update_entry_plan(
    plan_id: int,
    payload: dict[str, Any],
    *,
    db_path: Path | None = None,
) -> dict[str, Any] | None:
    """Apply a partial update to an active plan.

    Only keys present in ``payload`` are validated and written. Supplying a
    non-blank, valid ``result`` also archives the plan (``status`` becomes
    ``archived`` and ``archived_at`` is set). A blank or ``None`` ``result``
    clears the stored result without archiving.

    Args:
        plan_id: Id of the plan to change.
        payload: Subset of plan fields to change.
        db_path: Database file; forwarded to ``init_db`` / ``_connect``.

    Returns:
        The updated row as a dict, the unchanged row when ``payload`` holds no
        recognised key, or ``None`` if the plan does not exist.

    Raises:
        ValueError: If the plan is already archived or a supplied value is
            invalid.
    """
    init_db(db_path)
    conn = _connect(db_path)
    try:
        pid = int(plan_id)
        select_sql = "SELECT * FROM entry_plans WHERE id=?"
        current = conn.execute(select_sql, (pid,)).fetchone()
        if not current:
            return None
        if current["status"] == "archived":
            raise ValueError("已归档计划不可修改")

        changes: dict[str, Any] = {}

        if "plan_date" in payload:
            plan_date = str(payload.get("plan_date") or "").strip()[:10]
            if not plan_date:
                raise ValueError("缺少 plan_date")
            changes["plan_date"] = plan_date
        if "exchange_key" in payload:
            exchange_key = str(payload.get("exchange_key") or "").strip().lower()
            if not exchange_key:
                raise ValueError("缺少 exchange_key")
            changes["exchange_key"] = exchange_key
        if "symbol" in payload:
            changes["symbol"] = normalize_plan_symbol(payload.get("symbol") or "")

        # Choice columns: (column, allowed values, field name for the error).
        choice_rules = (
            ("plan_type", PLAN_TYPES, "类型"),
            ("trend_timeframe", TREND_TIMEFRAMES, "趋势周期"),
            ("entry_timeframe", ENTRY_TIMEFRAMES, "入场周期"),
            ("direction", DIRECTIONS, "方向"),
            ("entry_scheme", ENTRY_SCHEMES, "入场方案"),
        )
        for column, allowed, label in choice_rules:
            if column in payload:
                changes[column] = _validate_choice(payload.get(column), allowed, label)

        # Free-text columns are stored trimmed, with no further validation.
        for column in ("target_level", "current_range", "note"):
            if column in payload:
                changes[column] = str(payload.get(column) or "").strip()

        if "pnl_amount" in payload:
            changes["pnl_amount"] = _parse_optional_pnl(payload.get("pnl_amount"))

        should_archive = False
        if "result" in payload:
            outcome = payload.get("result")
            if outcome is None or not str(outcome).strip():
                changes["result"] = None
            else:
                changes["result"] = _validate_choice(outcome, RESULTS, "结果")
                should_archive = True

        if not changes:
            return _row_to_dict(current)

        stamp = _now_ms()
        changes["updated_at"] = stamp
        if should_archive:
            changes["status"] = "archived"
            changes["archived_at"] = stamp

        # Column names come only from the fixed keys above, never from payload.
        assignments = ", ".join(f"{column}=?" for column in changes)
        conn.execute(
            f"UPDATE entry_plans SET {assignments} WHERE id=?",
            (*changes.values(), pid),
        )
        refreshed = conn.execute(select_sql, (pid,)).fetchone()
        return _row_to_dict(refreshed)
    finally:
        conn.close()
def delete_entry_plan(plan_id: int, *, db_path: Path | None = None) -> bool:
    """Delete an active plan.

    Returns:
        ``True`` if a row was deleted, ``False`` if no plan has that id.

    Raises:
        ValueError: If the plan exists but its status is not ``active``.
    """
    init_db(db_path)
    conn = _connect(db_path)
    try:
        pid = int(plan_id)
        existing = conn.execute(
            "SELECT status FROM entry_plans WHERE id=?", (pid,)
        ).fetchone()
        if not existing:
            return False
        if existing["status"] != "active":
            raise ValueError("仅进行中的计划可删除")
        # The status filter is repeated in the DELETE itself.
        outcome = conn.execute(
            "DELETE FROM entry_plans WHERE id=? AND status='active'", (pid,)
        )
        removed = int(outcome.rowcount or 0)
        return removed > 0
    finally:
        conn.close()
def _today_iso() -> str:
    """Return today's date in ``DISPLAY_TZ`` formatted as ``YYYY-MM-DD``."""
    current = datetime.now(DISPLAY_TZ)
    return f"{current:%Y-%m-%d}"
def resolve_stats_date_bounds(
    *,
    period: str = "all",
    date_from: str = "",
    date_to: str = "",
) -> tuple[str | None, str | None, str]:
    """Resolve a statistics period into inclusive ``plan_date`` bounds.

    Args:
        period: ``"all"``, ``"week"`` (Monday of the current week through
            today), ``"month"`` (first of the current month through today) or
            ``"range"``. Case-insensitive; any other value is treated as
            ``"all"``.
        date_from: Range start, used only for ``"range"``; defaults to today.
        date_to: Range end, used only for ``"range"``; defaults to the start.
            Only the first 10 characters of each are kept, and the two are
            swapped if given in reverse order.

    Returns:
        ``(date_from, date_to, label)``. Both bounds are ``None`` for
        ``"all"``; otherwise they are ``YYYY-MM-DD`` strings.
    """
    mode = (period or "all").strip().lower() or "all"
    today = _today_iso()
    if mode == "all":
        return None, None, "全部历史"
    if mode == "week":
        today_dt = datetime.strptime(today, "%Y-%m-%d")
        # weekday() is 0 on Monday, so this steps back to the week's Monday.
        monday = (today_dt - timedelta(days=today_dt.weekday())).strftime("%Y-%m-%d")
        return monday, today, f"本周 {monday}~{today}"
    if mode == "month":
        today_dt = datetime.strptime(today, "%Y-%m-%d")
        first = today_dt.replace(day=1).strftime("%Y-%m-%d")
        return first, today, f"本月 {first}~{today}"
    if mode == "range":
        start = (date_from or "").strip()[:10] or today
        end = (date_to or "").strip()[:10] or start
        # ISO dates order correctly as plain strings.
        if start > end:
            start, end = end, start
        # A separator keeps the two dates from running together in the label.
        label = f"区间 {start}~{end}" if start != end else f"区间 {start}"
        return start, end, label
    return None, None, "全部历史"
def compute_entry_plan_stats(
    *,
    dimension: str = "symbol",
    period: str = "all",
    date_from: str = "",
    date_to: str = "",
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Aggregate win/loss counts of archived plans, grouped by one dimension.

    Only rows with ``status='archived'`` and a ``win`` / ``loss`` result are
    counted, optionally restricted to the ``plan_date`` window resolved by
    ``resolve_stats_date_bounds``.

    Args:
        dimension: One of ``STAT_DIMENSIONS`` (case-insensitive).
        period, date_from, date_to: Forwarded to ``resolve_stats_date_bounds``.
        db_path: Database file; forwarded to ``init_db`` / ``_connect``.

    Returns:
        A dict with ``dimension``, ``period`` (as passed in), ``period_label``,
        ``date_from``, ``date_to`` and ``items``. Each item has ``key``,
        ``label``, ``total``, ``win_count``, ``loss_count`` and ``win_rate``
        (a percentage rounded to one decimal, or ``None`` when total is 0).
        Items are ordered by total descending, then key ascending.

    Raises:
        ValueError: If ``dimension`` is not supported.
    """
    init_db(db_path)
    dim = (dimension or "symbol").strip().lower()
    if dim not in STAT_DIMENSIONS:
        raise ValueError("dimension 无效")
    lower, upper, period_label = resolve_stats_date_bounds(
        period=period, date_from=date_from, date_to=date_to
    )
    # The grouping column comes from this fixed map, never from caller text.
    column = {
        "symbol": "symbol",
        "trend_tf": "trend_timeframe",
        "entry_scheme": "entry_scheme",
    }[dim]

    def _summarize(row: sqlite3.Row) -> dict[str, Any]:
        total = int(row["total"] or 0)
        wins = int(row["win_count"] or 0)
        losses = int(row["loss_count"] or 0)
        key = str(row["dim_key"] or "")
        # Only entry schemes have display labels; other dimensions show the key.
        label = ENTRY_SCHEMES.get(key, key) if dim == "entry_scheme" else key
        return {
            "key": key,
            "label": label,
            "total": total,
            "win_count": wins,
            "loss_count": losses,
            "win_rate": round(wins / total * 100, 1) if total else None,
        }

    conn = _connect(db_path)
    try:
        where = "status='archived' AND result IN ('win','loss')"
        params: list[Any] = []
        if lower:
            where = where + " AND plan_date >= ? AND plan_date <= ?"
            params += [lower, upper]
        grouped = conn.execute(
            f"""
            SELECT {column} AS dim_key,
            COUNT(*) AS total,
            SUM(CASE WHEN result='win' THEN 1 ELSE 0 END) AS win_count,
            SUM(CASE WHEN result='loss' THEN 1 ELSE 0 END) AS loss_count
            FROM entry_plans
            WHERE {where}
            GROUP BY {column}
            ORDER BY total DESC, dim_key ASC
            """,
            params,
        ).fetchall()
        return {
            "dimension": dim,
            "period": period,
            "period_label": period_label,
            "date_from": lower,
            "date_to": upper,
            "items": [_summarize(row) for row in grouped],
        }
    finally:
        conn.close()
def meta_payload(exchanges: list[dict[str, Any]] | None = None) -> dict[str, Any]:
    """Build the option metadata describing every selectable plan field.

    Args:
        exchanges: Exchange entries to pass through unchanged; a falsy value
            becomes an empty list.

    Returns:
        A dict with ``plan_types``, ``directions``, ``entry_schemes`` and
        ``results`` as lists of ``{"value", "label"}`` pairs, the two
        timeframe lists, the statistics dimensions and ``exchanges``.
    """

    def _options(mapping: dict[str, str]) -> list[dict[str, str]]:
        return [{"value": value, "label": label} for value, label in mapping.items()]

    dimension_labels = (
        ("symbol", "币种"),
        ("trend_tf", "趋势周期"),
        ("entry_scheme", "入场方案"),
    )
    return {
        "plan_types": _options(PLAN_TYPES),
        "trend_timeframes": list(TREND_TIMEFRAMES),
        "entry_timeframes": list(ENTRY_TIMEFRAMES),
        "directions": _options(DIRECTIONS),
        "entry_schemes": _options(ENTRY_SCHEMES),
        "results": _options(RESULTS),
        "stat_dimensions": [
            {"value": value, "label": label} for value, label in dimension_labels
        ],
        "exchanges": exchanges or [],
    }