ed3709dddf
Co-authored-by: Cursor <cursoragent@cursor.com>
158 lines
5.4 KiB
Python
158 lines
5.4 KiB
Python
"""开仓计划库:CRUD 与胜率统计。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import tempfile
|
|
from pathlib import Path
|
|
|
|
from hub_entry_plan_lib import (
|
|
compute_entry_plan_stats,
|
|
create_entry_plan,
|
|
delete_entry_plan,
|
|
init_db,
|
|
list_entry_plans,
|
|
normalize_plan_symbol,
|
|
resolve_stats_date_bounds,
|
|
update_entry_plan,
|
|
)
|
|
|
|
|
|
def _base_payload(**overrides):
|
|
data = {
|
|
"plan_date": "2026-06-14",
|
|
"exchange_key": "binance",
|
|
"symbol": "BTC",
|
|
"plan_type": "trend",
|
|
"trend_timeframe": "4h",
|
|
"entry_timeframe": "15m",
|
|
"direction": "long",
|
|
"target_level": "70000",
|
|
"current_range": "68000-69000",
|
|
"entry_scheme": "breakout",
|
|
"note": "test",
|
|
}
|
|
data.update(overrides)
|
|
return data
|
|
|
|
|
|
def test_normalize_plan_symbol():
|
|
assert normalize_plan_symbol("btc") == "BTC/USDT"
|
|
assert normalize_plan_symbol("ETH/USDT") == "ETH/USDT"
|
|
|
|
|
|
def test_create_without_entry_scheme():
|
|
with tempfile.TemporaryDirectory() as td:
|
|
db = Path(td) / "plans.db"
|
|
payload = _base_payload()
|
|
del payload["entry_scheme"]
|
|
row = create_entry_plan(payload, db_path=db)
|
|
assert row["entry_scheme"] == ""
|
|
assert row["entry_scheme_label"] == "待填写"
|
|
|
|
|
|
def test_archive_requires_entry_scheme():
|
|
with tempfile.TemporaryDirectory() as td:
|
|
db = Path(td) / "plans.db"
|
|
payload = _base_payload()
|
|
del payload["entry_scheme"]
|
|
row = create_entry_plan(payload, db_path=db)
|
|
try:
|
|
update_entry_plan(int(row["id"]), {"result": "win"}, db_path=db)
|
|
assert False, "expected ValueError"
|
|
except ValueError as e:
|
|
assert "入场方案" in str(e)
|
|
updated = update_entry_plan(
|
|
int(row["id"]),
|
|
{"entry_scheme": "breakout", "result": "win"},
|
|
db_path=db,
|
|
)
|
|
assert updated["status"] == "archived"
|
|
|
|
|
|
def test_create_list_delete_active_plan():
|
|
with tempfile.TemporaryDirectory() as td:
|
|
db = Path(td) / "plans.db"
|
|
row = create_entry_plan(_base_payload(), db_path=db)
|
|
assert row["status"] == "active"
|
|
assert row["symbol"] == "BTC/USDT"
|
|
active = list_entry_plans(status="active", db_path=db)
|
|
assert len(active) == 1
|
|
assert delete_entry_plan(int(row["id"]), db_path=db) is True
|
|
assert list_entry_plans(status="active", db_path=db) == []
|
|
|
|
|
|
def test_archive_on_result():
|
|
with tempfile.TemporaryDirectory() as td:
|
|
db = Path(td) / "plans.db"
|
|
row = create_entry_plan(_base_payload(symbol="SOL"), db_path=db)
|
|
updated = update_entry_plan(
|
|
int(row["id"]),
|
|
{"result": "win", "pnl_amount": 12.5},
|
|
db_path=db,
|
|
)
|
|
assert updated["status"] == "archived"
|
|
assert updated["result"] == "win"
|
|
assert updated["pnl_amount"] == 12.5
|
|
assert list_entry_plans(status="active", db_path=db) == []
|
|
archived = list_entry_plans(status="archived", db_path=db)
|
|
assert len(archived) == 1
|
|
|
|
|
|
def test_archive_without_pnl_amount():
|
|
with tempfile.TemporaryDirectory() as td:
|
|
db = Path(td) / "plans.db"
|
|
row = create_entry_plan(_base_payload(symbol="DOGE"), db_path=db)
|
|
updated = update_entry_plan(int(row["id"]), {"result": "loss"}, db_path=db)
|
|
assert updated["status"] == "archived"
|
|
assert updated["pnl_amount"] is None
|
|
|
|
|
|
def test_cannot_delete_archived():
|
|
with tempfile.TemporaryDirectory() as td:
|
|
db = Path(td) / "plans.db"
|
|
row = create_entry_plan(_base_payload(), db_path=db)
|
|
update_entry_plan(int(row["id"]), {"result": "win"}, db_path=db)
|
|
try:
|
|
delete_entry_plan(int(row["id"]), db_path=db)
|
|
assert False, "expected ValueError"
|
|
except ValueError as e:
|
|
assert "仅进行中" in str(e)
|
|
|
|
|
|
def test_compute_stats_by_symbol():
|
|
with tempfile.TemporaryDirectory() as td:
|
|
db = Path(td) / "plans.db"
|
|
for sym, res in (("BTC", "win"), ("BTC", "loss"), ("ETH", "win")):
|
|
row = create_entry_plan(_base_payload(symbol=sym), db_path=db)
|
|
update_entry_plan(int(row["id"]), {"result": res}, db_path=db)
|
|
stats = compute_entry_plan_stats(dimension="symbol", period="all", db_path=db)
|
|
by_sym = {it["key"]: it for it in stats["items"]}
|
|
assert by_sym["BTC/USDT"]["win_count"] == 1
|
|
assert by_sym["BTC/USDT"]["loss_count"] == 1
|
|
assert by_sym["BTC/USDT"]["win_rate"] == 50.0
|
|
assert by_sym["ETH/USDT"]["win_count"] == 1
|
|
|
|
|
|
def test_stats_period_range_filter():
|
|
with tempfile.TemporaryDirectory() as td:
|
|
db = Path(td) / "plans.db"
|
|
row1 = create_entry_plan(_base_payload(plan_date="2026-06-01"), db_path=db)
|
|
row2 = create_entry_plan(_base_payload(plan_date="2026-06-20", symbol="ETH"), db_path=db)
|
|
update_entry_plan(int(row1["id"]), {"result": "win"}, db_path=db)
|
|
update_entry_plan(int(row2["id"]), {"result": "loss"}, db_path=db)
|
|
stats = compute_entry_plan_stats(
|
|
dimension="symbol",
|
|
period="range",
|
|
date_from="2026-06-01",
|
|
date_to="2026-06-10",
|
|
db_path=db,
|
|
)
|
|
assert len(stats["items"]) == 1
|
|
assert stats["items"][0]["key"] == "BTC/USDT"
|
|
|
|
|
|
def test_resolve_stats_date_bounds():
|
|
df, dt, label = resolve_stats_date_bounds(period="all")
|
|
assert df is None and dt is None
|
|
assert "全部" in label
|