feat: archive entry type from review, prune stale trades on sync, manual delete

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-08 12:39:27 +08:00
parent e68e29629e
commit 46963a4498
7 changed files with 260 additions and 19 deletions
+115 -8
View File
@@ -15,7 +15,11 @@ from hub_ohlcv_lib import (
aggregate_ohlcv_bars,
normalize_chart_timeframe,
)
from hub_trades_lib import effective_entry_type, effective_hold_minutes, format_hold_minutes
from hub_trades_lib import (
display_entry_type_label,
effective_hold_minutes,
format_hold_minutes,
)
ARCHIVE_TIMEFRAMES = frozenset({"5m", "15m", "1h", "4h"})
ARCHIVE_DEFAULT_TIMEFRAME = "15m"
@@ -160,18 +164,114 @@ def _parse_dt_ms(raw: Any) -> int | None:
return None
def _trade_entry_reason_for_cache(t: dict[str, Any]) -> str:
for key in ("entry_type", "entry_reason", "reviewed_entry_reason"):
raw = t.get(key)
if raw is not None and str(raw).strip():
return str(raw).strip()
return display_entry_type_label(t) if isinstance(t, dict) else ""
def purge_stale_trades_cache(
exchange_key: str,
active_trade_ids: list[int] | set[int],
*,
db_path: Path | None = None,
) -> int:
"""删除该所缓存中已不在复盘/交易记录里的条目。"""
ex_k = (exchange_key or "").strip().lower()
if not ex_k:
return 0
ids: list[int] = []
for raw in active_trade_ids or []:
try:
ids.append(int(raw))
except (TypeError, ValueError):
continue
conn = _connect(db_path)
try:
if not ids:
rows = conn.execute(
"SELECT trade_id FROM archive_trade_cache WHERE exchange_key=?",
(ex_k,),
).fetchall()
stale_ids = [int(r["trade_id"]) for r in rows]
cur = conn.execute(
"DELETE FROM archive_trade_cache WHERE exchange_key=?",
(ex_k,),
)
else:
placeholders = ",".join("?" * len(ids))
rows = conn.execute(
f"""
SELECT trade_id FROM archive_trade_cache
WHERE exchange_key=? AND trade_id NOT IN ({placeholders})
""",
(ex_k, *ids),
).fetchall()
stale_ids = [int(r["trade_id"]) for r in rows]
cur = conn.execute(
f"""
DELETE FROM archive_trade_cache
WHERE exchange_key=? AND trade_id NOT IN ({placeholders})
""",
(ex_k, *ids),
)
removed = int(cur.rowcount or 0)
if stale_ids:
ph2 = ",".join("?" * len(stale_ids))
conn.execute(
f"""
DELETE FROM trade_overlay
WHERE exchange_key=? AND trade_id IN ({ph2})
""",
(ex_k, *stale_ids),
)
return removed
finally:
conn.close()
def delete_trade_from_archive(
exchange_key: str,
trade_id: int,
*,
db_path: Path | None = None,
) -> bool:
ex_k = (exchange_key or "").strip().lower()
tid = int(trade_id)
conn = _connect(db_path)
try:
cur = conn.execute(
"""
DELETE FROM archive_trade_cache
WHERE exchange_key=? AND trade_id=?
""",
(ex_k, tid),
)
conn.execute(
"DELETE FROM trade_overlay WHERE exchange_key=? AND trade_id=?",
(ex_k, tid),
)
return int(cur.rowcount or 0) > 0
finally:
conn.close()
def upsert_trades_cache(
exchange_key: str,
trades: list[dict[str, Any]],
*,
db_path: Path | None = None,
) -> int:
prune_missing: bool = True,
) -> dict[str, int]:
init_db(db_path)
ex_k = (exchange_key or "").strip().lower()
if not ex_k:
return 0
return {"upserted": 0, "removed": 0}
now = _now_ms()
n = 0
active_ids: list[int] = []
conn = _connect(db_path)
try:
for t in trades or []:
@@ -182,7 +282,9 @@ def upsert_trades_cache(
sym = (t.get("symbol") or "").strip().upper()
if not sym:
continue
active_ids.append(tid)
payload = {k: t.get(k) for k in t.keys()}
entry_label = _trade_entry_reason_for_cache(t)
conn.execute(
"""
INSERT INTO archive_trade_cache (
@@ -216,7 +318,7 @@ def upsert_trades_cache(
t.get("opened_at_ms") or _parse_dt_ms(t.get("opened_at")),
t.get("closed_at_ms") or _parse_dt_ms(t.get("closed_at")),
t.get("monitor_type"),
t.get("entry_reason"),
entry_label,
json.dumps(payload, ensure_ascii=False, default=str),
now,
),
@@ -224,7 +326,10 @@ def upsert_trades_cache(
n += 1
finally:
conn.close()
return n
removed = 0
if prune_missing:
removed = purge_stale_trades_cache(ex_k, active_ids, db_path=db_path)
return {"upserted": n, "removed": removed}
def _enrich_trade_display_fields(out: dict[str, Any]) -> dict[str, Any]:
@@ -243,8 +348,8 @@ def _enrich_trade_display_fields(out: dict[str, Any]) -> dict[str, Any]:
out["closed_at"] = datetime.fromtimestamp(int(closed_ms) / 1000).strftime(
"%Y-%m-%d %H:%M:%S"
)
entry_type = (out.get("entry_type") or effective_entry_type(out) or "").strip()
if entry_type:
entry_type = display_entry_type_label(out)
if entry_type and entry_type != "":
out["entry_type"] = entry_type
out["entry_reason"] = entry_type
hold_m = out.get("hold_minutes")
@@ -961,7 +1066,7 @@ def sync_exchange_symbol_archives(
) -> dict[str, Any]:
"""同步单所:交易缓存 + 各币种 K 线种子/增量。"""
ex_k = (exchange_key or "").strip().lower()
upsert_trades_cache(ex_k, trades, db_path=db_path)
cache_stats = upsert_trades_cache(ex_k, trades, db_path=db_path, prune_missing=True)
by_sym: dict[str, int] = {}
for t in trades or []:
@@ -997,6 +1102,8 @@ def sync_exchange_symbol_archives(
"ok": True,
"exchange_key": ex_k,
"symbols": len(by_sym),
"trades_upserted": int(cache_stats.get("upserted") or 0),
"trades_removed": int(cache_stats.get("removed") or 0),
"seed_bars": seeded,
"appended_bars": appended,
"trades": len(trades or []),