Add entry plan page with CRUD, archive flow, and win-rate stats.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-22 16:19:56 +08:00
parent bd759c42d6
commit 091317276d
9 changed files with 1876 additions and 1 deletions
+448
View File
@@ -0,0 +1,448 @@
"""中控开仓计划:进行中 / 历史归档 / 胜率统计。"""
from __future__ import annotations
import os
import sqlite3
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
PLAN_TYPES = {
"trend": "趋势单",
"swing": "波段单",
"intraday": "日内短线",
}
TREND_TIMEFRAMES = ("5m", "15m", "30m", "1h", "4h", "1d")
ENTRY_TIMEFRAMES = ("1m", "5m", "15m", "30m", "1h")
DIRECTIONS = {"long": "", "short": ""}
ENTRY_SCHEMES = {
"breakout": "突破方案",
"false_breakout": "假突破突破方案",
"box_inflection": "箱体拐点方案",
}
RESULTS = {"win": "", "loss": ""}
STAT_DIMENSIONS = ("symbol", "trend_tf", "entry_scheme")
DISPLAY_TZ = ZoneInfo(
(os.getenv("HUB_ENTRY_PLAN_TZ") or os.getenv("HUB_VOLUME_RANK_TZ") or "Asia/Shanghai").strip()
or "Asia/Shanghai"
)
def default_db_path() -> Path:
raw = (os.getenv("HUB_ENTRY_PLAN_DB_PATH") or "").strip()
if raw:
return Path(raw)
hub_dir = Path(__file__).resolve().parent / "manual_trading_hub" / "data"
hub_dir.mkdir(parents=True, exist_ok=True)
return hub_dir / "hub_entry_plans.db"
def _now_ms() -> int:
return int(time.time() * 1000)
def _connect(db_path: Path | None = None) -> sqlite3.Connection:
path = db_path or default_db_path()
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path), timeout=30, isolation_level=None)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
return conn
def init_db(db_path: Path | None = None) -> None:
conn = _connect(db_path)
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS entry_plans (
id INTEGER PRIMARY KEY AUTOINCREMENT,
plan_date TEXT NOT NULL,
exchange_key TEXT NOT NULL,
symbol TEXT NOT NULL,
plan_type TEXT NOT NULL,
trend_timeframe TEXT NOT NULL,
entry_timeframe TEXT NOT NULL,
direction TEXT NOT NULL,
target_level TEXT NOT NULL DEFAULT '',
current_range TEXT NOT NULL DEFAULT '',
entry_scheme TEXT NOT NULL,
result TEXT,
pnl_amount REAL,
note TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'active',
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
archived_at INTEGER
)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_entry_plans_status_date
ON entry_plans (status, plan_date DESC, id DESC)
"""
)
finally:
conn.close()
def normalize_plan_symbol(raw: str) -> str:
s = str(raw or "").strip().upper()
if not s:
raise ValueError("缺少币种")
if ":" in s:
s = s.split(":", 1)[0]
if "/" in s:
base, quote = s.split("/", 1)
base = base.strip()
quote = (quote or "USDT").strip() or "USDT"
if not base:
raise ValueError("币种无效")
return f"{base}/{quote}"
if s.endswith("USDT") and len(s) > 4:
return f"{s[:-4]}/{s[-4:]}"
return f"{s}/USDT"
def _validate_choice(value: str, allowed: dict[str, str] | tuple[str, ...], field: str) -> str:
key = str(value or "").strip().lower()
if isinstance(allowed, dict):
if key not in allowed:
raise ValueError(f"{field} 无效")
return key
if key not in allowed:
raise ValueError(f"{field} 无效")
return key
def _row_to_dict(row: sqlite3.Row | None) -> dict[str, Any] | None:
if row is None:
return None
d = dict(row)
d["plan_type_label"] = PLAN_TYPES.get(d.get("plan_type") or "", d.get("plan_type") or "")
d["direction_label"] = DIRECTIONS.get(d.get("direction") or "", d.get("direction") or "")
d["entry_scheme_label"] = ENTRY_SCHEMES.get(
d.get("entry_scheme") or "", d.get("entry_scheme") or ""
)
res = d.get("result")
d["result_label"] = RESULTS.get(res, "") if res else ""
return d
def _parse_optional_pnl(raw: Any) -> float | None:
if raw is None or raw == "":
return None
try:
return round(float(raw), 4)
except (TypeError, ValueError) as e:
raise ValueError("盈亏金额无效") from e
def create_entry_plan(payload: dict[str, Any], *, db_path: Path | None = None) -> dict[str, Any]:
init_db(db_path)
plan_date = str(payload.get("plan_date") or "").strip()[:10]
if not plan_date:
raise ValueError("缺少 plan_date")
exchange_key = str(payload.get("exchange_key") or "").strip().lower()
if not exchange_key:
raise ValueError("缺少 exchange_key")
symbol = normalize_plan_symbol(payload.get("symbol") or "")
plan_type = _validate_choice(payload.get("plan_type"), PLAN_TYPES, "类型")
trend_tf = _validate_choice(payload.get("trend_timeframe"), TREND_TIMEFRAMES, "趋势周期")
entry_tf = _validate_choice(payload.get("entry_timeframe"), ENTRY_TIMEFRAMES, "入场周期")
direction = _validate_choice(payload.get("direction"), DIRECTIONS, "方向")
entry_scheme = _validate_choice(payload.get("entry_scheme"), ENTRY_SCHEMES, "入场方案")
target_level = str(payload.get("target_level") or "").strip()
current_range = str(payload.get("current_range") or "").strip()
note = str(payload.get("note") or "").strip()
now = _now_ms()
conn = _connect(db_path)
try:
cur = conn.execute(
"""
INSERT INTO entry_plans (
plan_date, exchange_key, symbol, plan_type, trend_timeframe, entry_timeframe,
direction, target_level, current_range, entry_scheme, note, status,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'active', ?, ?)
""",
(
plan_date,
exchange_key,
symbol,
plan_type,
trend_tf,
entry_tf,
direction,
target_level,
current_range,
entry_scheme,
note,
now,
now,
),
)
row = conn.execute(
"SELECT * FROM entry_plans WHERE id=?",
(int(cur.lastrowid),),
).fetchone()
return _row_to_dict(row) or {}
finally:
conn.close()
def list_entry_plans(
*,
status: str = "active",
db_path: Path | None = None,
) -> list[dict[str, Any]]:
init_db(db_path)
st = (status or "active").strip().lower()
if st not in ("active", "archived"):
raise ValueError("status 无效")
conn = _connect(db_path)
try:
rows = conn.execute(
"""
SELECT * FROM entry_plans
WHERE status=?
ORDER BY plan_date DESC, id DESC
""",
(st,),
).fetchall()
return [_row_to_dict(r) for r in rows if r]
finally:
conn.close()
def get_entry_plan(plan_id: int, *, db_path: Path | None = None) -> dict[str, Any] | None:
init_db(db_path)
conn = _connect(db_path)
try:
row = conn.execute("SELECT * FROM entry_plans WHERE id=?", (int(plan_id),)).fetchone()
return _row_to_dict(row)
finally:
conn.close()
def update_entry_plan(
plan_id: int,
payload: dict[str, Any],
*,
db_path: Path | None = None,
) -> dict[str, Any] | None:
init_db(db_path)
conn = _connect(db_path)
try:
row = conn.execute("SELECT * FROM entry_plans WHERE id=?", (int(plan_id),)).fetchone()
if not row:
return None
if row["status"] == "archived":
raise ValueError("已归档计划不可修改")
fields: dict[str, Any] = {}
if "plan_date" in payload:
qd = str(payload.get("plan_date") or "").strip()[:10]
if not qd:
raise ValueError("缺少 plan_date")
fields["plan_date"] = qd
if "exchange_key" in payload:
ex = str(payload.get("exchange_key") or "").strip().lower()
if not ex:
raise ValueError("缺少 exchange_key")
fields["exchange_key"] = ex
if "symbol" in payload:
fields["symbol"] = normalize_plan_symbol(payload.get("symbol") or "")
if "plan_type" in payload:
fields["plan_type"] = _validate_choice(payload.get("plan_type"), PLAN_TYPES, "类型")
if "trend_timeframe" in payload:
fields["trend_timeframe"] = _validate_choice(
payload.get("trend_timeframe"), TREND_TIMEFRAMES, "趋势周期"
)
if "entry_timeframe" in payload:
fields["entry_timeframe"] = _validate_choice(
payload.get("entry_timeframe"), ENTRY_TIMEFRAMES, "入场周期"
)
if "direction" in payload:
fields["direction"] = _validate_choice(payload.get("direction"), DIRECTIONS, "方向")
if "entry_scheme" in payload:
fields["entry_scheme"] = _validate_choice(
payload.get("entry_scheme"), ENTRY_SCHEMES, "入场方案"
)
if "target_level" in payload:
fields["target_level"] = str(payload.get("target_level") or "").strip()
if "current_range" in payload:
fields["current_range"] = str(payload.get("current_range") or "").strip()
if "note" in payload:
fields["note"] = str(payload.get("note") or "").strip()
if "pnl_amount" in payload:
fields["pnl_amount"] = _parse_optional_pnl(payload.get("pnl_amount"))
archive_now = False
if "result" in payload:
res_raw = payload.get("result")
if res_raw is None or str(res_raw).strip() == "":
fields["result"] = None
else:
fields["result"] = _validate_choice(res_raw, RESULTS, "结果")
archive_now = True
if not fields:
return _row_to_dict(row)
now = _now_ms()
fields["updated_at"] = now
if archive_now:
fields["status"] = "archived"
fields["archived_at"] = now
sets = ", ".join(f"{k}=?" for k in fields)
conn.execute(
f"UPDATE entry_plans SET {sets} WHERE id=?",
(*fields.values(), int(plan_id)),
)
updated = conn.execute("SELECT * FROM entry_plans WHERE id=?", (int(plan_id),)).fetchone()
return _row_to_dict(updated)
finally:
conn.close()
def delete_entry_plan(plan_id: int, *, db_path: Path | None = None) -> bool:
init_db(db_path)
conn = _connect(db_path)
try:
row = conn.execute("SELECT status FROM entry_plans WHERE id=?", (int(plan_id),)).fetchone()
if not row:
return False
if row["status"] != "active":
raise ValueError("仅进行中的计划可删除")
cur = conn.execute("DELETE FROM entry_plans WHERE id=? AND status='active'", (int(plan_id),))
return int(cur.rowcount or 0) > 0
finally:
conn.close()
def _today_iso() -> str:
return datetime.now(DISPLAY_TZ).strftime("%Y-%m-%d")
def resolve_stats_date_bounds(
*,
period: str = "all",
date_from: str = "",
date_to: str = "",
) -> tuple[str | None, str | None, str]:
"""返回 (date_from, date_to, label)all 时 bounds 为 None。"""
p = (period or "all").strip().lower() or "all"
today = _today_iso()
if p == "all":
return None, None, "全部历史"
if p == "week":
day_dt = datetime.strptime(today, "%Y-%m-%d")
monday = (day_dt - timedelta(days=day_dt.weekday())).strftime("%Y-%m-%d")
return monday, today, f"本周 {monday}{today}"
if p == "month":
day_dt = datetime.strptime(today, "%Y-%m-%d")
first = day_dt.replace(day=1).strftime("%Y-%m-%d")
return first, today, f"本月 {first}{today}"
if p == "range":
df = (date_from or "").strip()[:10] or today
dt = (date_to or "").strip()[:10] or df
if df > dt:
df, dt = dt, df
label = f"区间 {df}{dt}" if df != dt else f"区间 {df}"
return df, dt, label
return None, None, "全部历史"
def compute_entry_plan_stats(
*,
dimension: str = "symbol",
period: str = "all",
date_from: str = "",
date_to: str = "",
db_path: Path | None = None,
) -> dict[str, Any]:
init_db(db_path)
dim = (dimension or "symbol").strip().lower()
if dim not in STAT_DIMENSIONS:
raise ValueError("dimension 无效")
df_bound, dt_bound, period_label = resolve_stats_date_bounds(
period=period, date_from=date_from, date_to=date_to
)
col_map = {
"symbol": "symbol",
"trend_tf": "trend_timeframe",
"entry_scheme": "entry_scheme",
}
col = col_map[dim]
conn = _connect(db_path)
try:
where = "status='archived' AND result IN ('win','loss')"
params: list[Any] = []
if df_bound:
where += " AND plan_date >= ? AND plan_date <= ?"
params.extend([df_bound, dt_bound])
rows = conn.execute(
f"""
SELECT {col} AS dim_key,
COUNT(*) AS total,
SUM(CASE WHEN result='win' THEN 1 ELSE 0 END) AS win_count,
SUM(CASE WHEN result='loss' THEN 1 ELSE 0 END) AS loss_count
FROM entry_plans
WHERE {where}
GROUP BY {col}
ORDER BY total DESC, dim_key ASC
""",
params,
).fetchall()
items = []
for r in rows:
total = int(r["total"] or 0)
wins = int(r["win_count"] or 0)
losses = int(r["loss_count"] or 0)
key = str(r["dim_key"] or "")
label = key
if dim == "entry_scheme":
label = ENTRY_SCHEMES.get(key, key)
elif dim == "trend_tf":
label = key
win_rate = round(wins / total * 100, 1) if total else None
items.append(
{
"key": key,
"label": label,
"total": total,
"win_count": wins,
"loss_count": losses,
"win_rate": win_rate,
}
)
return {
"dimension": dim,
"period": period,
"period_label": period_label,
"date_from": df_bound,
"date_to": dt_bound,
"items": items,
}
finally:
conn.close()
def meta_payload(exchanges: list[dict[str, Any]] | None = None) -> dict[str, Any]:
return {
"plan_types": [{"value": k, "label": v} for k, v in PLAN_TYPES.items()],
"trend_timeframes": list(TREND_TIMEFRAMES),
"entry_timeframes": list(ENTRY_TIMEFRAMES),
"directions": [{"value": k, "label": v} for k, v in DIRECTIONS.items()],
"entry_schemes": [{"value": k, "label": v} for k, v in ENTRY_SCHEMES.items()],
"results": [{"value": k, "label": v} for k, v in RESULTS.items()],
"stat_dimensions": [
{"value": "symbol", "label": "币种"},
{"value": "trend_tf", "label": "趋势周期"},
{"value": "entry_scheme", "label": "入场方案"},
],
"exchanges": exchanges or [],
}