Files
crypto_monitor/hub_symbol_archive_lib.py
T
2026-06-21 09:03:21 +08:00

1529 lines
50 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""中控币种档案:永久 5m K 线库(建档种子 + 4h 增量),交易缓存与 overlay。"""
from __future__ import annotations
import json
import os
import sqlite3
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, Optional
from zoneinfo import ZoneInfo
CHART_DISPLAY_TZ = ZoneInfo(os.getenv("APP_TIMEZONE", "Asia/Shanghai"))
from hub_ohlcv_lib import (
TIMEFRAME_MS,
aggregate_ohlcv_bars,
normalize_chart_timeframe,
normalize_perpetual_symbol,
)
from hub_trades_lib import (
display_entry_type_label,
effective_hold_minutes,
format_hold_minutes,
)
# Chart timeframes the archive is able to serve (all aggregated from stored 5m bars).
ARCHIVE_TIMEFRAMES = frozenset({"5m", "15m", "1h", "4h"})
# Timeframe used when the caller does not supply a usable one.
ARCHIVE_DEFAULT_TIMEFRAME = "15m"
# Number of days before the earliest trade open that the seed download reaches back.
ARCHIVE_SEED_LOOKBACK_DAYS = 30
# Default number of visible bars for the anchor-centred chart window.
ARCHIVE_VISIBLE_BARS_DEFAULT = 200
# Per-timeframe cap on the number of candles returned in "history" range mode.
ARCHIVE_MAX_CANDLES: dict[str, int] = {
    "5m": 9000,
    "15m": 15000,
    "1h": 4000,
    "4h": 2000,
}
# Environment-tunable limits. NOTE(review): the next four are not referenced in
# the visible part of this file; presumably consumed by callers or later code.
ARCHIVE_SYNC_INTERVAL_SEC = int(os.getenv("HUB_ARCHIVE_SYNC_INTERVAL_SEC", str(4 * 3600)))
ARCHIVE_TRADE_DAYS = int(os.getenv("HUB_ARCHIVE_TRADE_DAYS", "365"))
ARCHIVE_TRADE_LIMIT = int(os.getenv("HUB_ARCHIVE_TRADE_LIMIT", "2000"))
ARCHIVE_QUOTES_MAX = int(os.getenv("HUB_ARCHIVE_QUOTES_MAX", "100"))
# Local hour (in CHART_DISPLAY_TZ) at which a new trading day starts.
TRADING_DAY_RESET_HOUR = int(os.getenv("TRADING_DAY_RESET_HOUR", "8"))
# NOTE(review): looks like a length cap for review-quote text — confirm against its user.
ARCHIVE_QUOTE_MAX_LEN = 5000
# Behaviour tags accepted on a trade overlay; the empty string means "no tag".
BEHAVIOR_TAGS = frozenset({"", "sick", "emotion"})
def default_db_path() -> Path:
    """Return the location of the archive SQLite file.

    A non-blank ``HUB_ARCHIVE_DB_PATH`` environment variable wins. Otherwise
    the file lives in ``manual_trading_hub/data`` next to this module, and
    that directory is created on demand.
    """
    override = os.getenv("HUB_ARCHIVE_DB_PATH")
    override = override.strip() if override else ""
    if override:
        return Path(override)
    data_dir = Path(__file__).resolve().parent.joinpath("manual_trading_hub", "data")
    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir / "hub_symbol_archive.db"
def _connect(db_path: Path | None = None) -> sqlite3.Connection:
path = db_path or default_db_path()
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path), timeout=30, isolation_level=None)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
return conn
def init_db(db_path: Path | None = None) -> None:
    """Create every archive table and index that does not exist yet.

    All statements use ``IF NOT EXISTS``, so repeated calls are harmless.
    The connection is closed even when a statement fails.
    """
    schema_statements = (
        # Per (exchange, symbol) bookkeeping: seed state and last sync times.
        """
        CREATE TABLE IF NOT EXISTS archive_meta (
        exchange_key TEXT NOT NULL,
        symbol TEXT NOT NULL,
        first_trade_opened_ms INTEGER,
        archive_started_at INTEGER NOT NULL,
        last_kline_sync_ms INTEGER,
        last_trade_sync_ms INTEGER,
        seed_complete INTEGER NOT NULL DEFAULT 0,
        PRIMARY KEY (exchange_key, symbol)
        )
        """,
        # Permanent 5m bar store.
        """
        CREATE TABLE IF NOT EXISTS archive_bars_5m (
        exchange_key TEXT NOT NULL,
        symbol TEXT NOT NULL,
        open_time_ms INTEGER NOT NULL,
        open REAL NOT NULL,
        high REAL NOT NULL,
        low REAL NOT NULL,
        close REAL NOT NULL,
        volume REAL NOT NULL DEFAULT 0,
        updated_at INTEGER NOT NULL,
        PRIMARY KEY (exchange_key, symbol, open_time_ms)
        )
        """,
        # NOTE(review): this index repeats the primary-key columns of
        # archive_bars_5m and looks redundant — confirm before dropping it.
        """
        CREATE INDEX IF NOT EXISTS idx_archive_bars_series
        ON archive_bars_5m (exchange_key, symbol, open_time_ms)
        """,
        # Local copy of the trades reported by each exchange instance.
        """
        CREATE TABLE IF NOT EXISTS archive_trade_cache (
        exchange_key TEXT NOT NULL,
        trade_id INTEGER NOT NULL,
        symbol TEXT NOT NULL,
        direction TEXT,
        result TEXT,
        pnl_amount REAL,
        opened_at TEXT,
        closed_at TEXT,
        opened_at_ms INTEGER,
        closed_at_ms INTEGER,
        monitor_type TEXT,
        entry_reason TEXT,
        payload_json TEXT,
        synced_at INTEGER NOT NULL,
        PRIMARY KEY (exchange_key, trade_id)
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_archive_trades_sym
        ON archive_trade_cache (exchange_key, symbol, closed_at_ms)
        """,
        # User annotations (behaviour tag + note) layered on cached trades.
        """
        CREATE TABLE IF NOT EXISTS trade_overlay (
        exchange_key TEXT NOT NULL,
        trade_id INTEGER NOT NULL,
        behavior_tag TEXT NOT NULL DEFAULT '',
        note TEXT NOT NULL DEFAULT '',
        updated_at INTEGER NOT NULL,
        PRIMARY KEY (exchange_key, trade_id)
        )
        """,
        # One review quote per date.
        """
        CREATE TABLE IF NOT EXISTS archive_review_quotes (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        quote_date TEXT NOT NULL UNIQUE,
        content TEXT NOT NULL DEFAULT '',
        created_at INTEGER NOT NULL,
        updated_at INTEGER NOT NULL
        )
        """,
        """
        CREATE INDEX IF NOT EXISTS idx_archive_quotes_date
        ON archive_review_quotes (quote_date DESC)
        """,
    )
    connection = _connect(db_path)
    try:
        for ddl in schema_statements:
            connection.execute(ddl)
    finally:
        connection.close()
def _now_ms() -> int:
return int(time.time() * 1000)
def parse_wall_clock_ms(raw: Any, *, tz: ZoneInfo = CHART_DISPLAY_TZ) -> int | None:
    """Convert a timestamp-like value to UTC epoch milliseconds.

    Accepted inputs:

    * ``int`` / ``float`` / all-digit strings: taken as epoch milliseconds
      when greater than 1e12, otherwise as epoch seconds.
    * ``YYYY-MM-DD[ HH:MM[:SS]]`` strings (a ``T`` separator is accepted and
      any ``Z`` is stripped): interpreted as wall-clock time in ``tz``.
      Anything after the matched prefix (fractions, offsets) is ignored.

    Returns ``None`` for ``None``, blank, non-finite or unparseable input.
    """
    if raw in (None, ""):
        return None
    if isinstance(raw, (int, float)):
        try:
            v = int(raw)
        except (TypeError, ValueError, OverflowError):
            # NaN raises ValueError, +/-inf raises OverflowError; neither is a time.
            return None
        return v if v > 1_000_000_000_000 else v * 1000
    s = str(raw).strip().replace("Z", "").replace("T", " ")
    if not s:
        return None
    if s.isdigit():
        # str.isdigit() also accepts characters such as superscripts that
        # int() rejects, so the conversion still needs guarding.
        try:
            v = int(s)
        except ValueError:
            v = None
        if v is not None:
            return v if v > 1_000_000_000_000 else v * 1000
    for fmt, ln in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d %H:%M", 16), ("%Y-%m-%d", 10)):
        try:
            dt = datetime.strptime(s[:ln], fmt)
        except ValueError:
            continue
        aware = dt.replace(tzinfo=tz)
        return int(aware.timestamp() * 1000)
    return None
def ms_to_wall_clock_str(ms: int, *, tz: ZoneInfo = CHART_DISPLAY_TZ) -> str:
    """Format UTC epoch milliseconds as ``YYYY-MM-DD HH:MM:SS`` wall-clock time in ``tz``."""
    seconds = int(ms) / 1000.0
    utc_moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return utc_moment.astimezone(tz).strftime("%Y-%m-%d %H:%M:%S")
def _parse_dt_ms(raw: Any) -> int | None:
    """Shorthand for ``parse_wall_clock_ms`` using the default display timezone."""
    parsed = parse_wall_clock_ms(raw)
    return parsed
def _trade_entry_reason_for_cache(t: dict[str, Any]) -> str:
for key in ("entry_type", "entry_reason", "reviewed_entry_reason"):
raw = t.get(key)
if raw is not None and str(raw).strip():
return str(raw).strip()
return display_entry_type_label(t) if isinstance(t, dict) else ""
def purge_stale_trades_cache(
    exchange_key: str,
    active_trade_ids: list[int] | set[int],
    *,
    db_path: Path | None = None,
) -> int:
    """Delete cached trades of one exchange that are no longer active.

    Every cached trade whose id is not in ``active_trade_ids`` is removed
    together with its overlay row. An empty ``active_trade_ids`` therefore
    clears the whole cache (and matching overlays) for that exchange.

    Returns the number of rows deleted from ``archive_trade_cache``; ``0``
    when ``exchange_key`` is blank.
    """
    ex_k = (exchange_key or "").strip().lower()
    if not ex_k:
        return 0
    keep: set[int] = set()
    for raw in active_trade_ids or []:
        try:
            keep.add(int(raw))
        except (TypeError, ValueError):
            continue
    # Stay well below SQLite's bound-variable limit (999 on older builds);
    # the previous ``NOT IN (?, ...)`` used one variable per active id.
    chunk_size = 500
    conn = _connect(db_path)
    try:
        cached = conn.execute(
            "SELECT trade_id FROM archive_trade_cache WHERE exchange_key=?",
            (ex_k,),
        ).fetchall()
        stale_ids = [tid for tid in (int(r["trade_id"]) for r in cached) if tid not in keep]
        if not stale_ids:
            return 0
        removed = 0
        # The connection is in autocommit mode, so open the transaction
        # explicitly to keep cache and overlay deletions consistent.
        conn.execute("BEGIN")
        try:
            for start in range(0, len(stale_ids), chunk_size):
                chunk = stale_ids[start : start + chunk_size]
                placeholders = ",".join("?" * len(chunk))
                cur = conn.execute(
                    f"""
                    DELETE FROM archive_trade_cache
                    WHERE exchange_key=? AND trade_id IN ({placeholders})
                    """,
                    (ex_k, *chunk),
                )
                removed += int(cur.rowcount or 0)
                conn.execute(
                    f"""
                    DELETE FROM trade_overlay
                    WHERE exchange_key=? AND trade_id IN ({placeholders})
                    """,
                    (ex_k, *chunk),
                )
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise
        return removed
    finally:
        conn.close()
def delete_trade_from_archive(
    exchange_key: str,
    trade_id: int,
    *,
    db_path: Path | None = None,
) -> bool:
    """Remove one trade from the cache and drop its overlay row.

    Returns ``True`` when a cache row was actually deleted.
    """
    key = ((exchange_key or "").strip().lower(), int(trade_id))
    connection = _connect(db_path)
    try:
        cache_cursor = connection.execute(
            """
            DELETE FROM archive_trade_cache
            WHERE exchange_key=? AND trade_id=?
            """,
            key,
        )
        connection.execute(
            "DELETE FROM trade_overlay WHERE exchange_key=? AND trade_id=?",
            key,
        )
        deleted = int(cache_cursor.rowcount or 0)
        return deleted > 0
    finally:
        connection.close()
def upsert_trades_cache(
    exchange_key: str,
    trades: list[dict[str, Any]],
    *,
    db_path: Path | None = None,
    prune_missing: bool = True,
) -> dict[str, int]:
    """Insert or refresh the cached trades of one exchange.

    Items that are not dicts, have no integer ``id`` or no ``symbol`` are
    skipped. Each stored row keeps the full trade as JSON in
    ``payload_json`` (with ``exchange_key`` forced to the normalised key and
    ``account_exchange_key`` dropped). All rows are written in a single
    transaction, so a failure leaves the cache untouched.

    When ``prune_missing`` is true, cached trades that are absent from
    ``trades`` are deleted afterwards.

    Returns ``{"upserted": <rows written>, "removed": <rows pruned>}``.
    """
    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    if not ex_k:
        return {"upserted": 0, "removed": 0}
    now = _now_ms()
    active_ids: list[int] = []
    rows: list[tuple[Any, ...]] = []
    for t in trades or []:
        if not isinstance(t, dict):
            continue
        try:
            tid = int(t.get("id"))
        except (TypeError, ValueError):
            continue
        sym = (t.get("symbol") or "").strip().upper()
        if not sym:
            continue
        active_ids.append(tid)
        payload = dict(t)
        payload["exchange_key"] = ex_k
        payload.pop("account_exchange_key", None)
        rows.append(
            (
                ex_k,
                tid,
                sym,
                t.get("direction"),
                t.get("result"),
                float(t.get("pnl_amount") or 0),
                t.get("opened_at"),
                t.get("closed_at"),
                t.get("opened_at_ms") or _parse_dt_ms(t.get("opened_at")),
                t.get("closed_at_ms") or _parse_dt_ms(t.get("closed_at")),
                t.get("monitor_type"),
                _trade_entry_reason_for_cache(t),
                json.dumps(payload, ensure_ascii=False, default=str),
                now,
            )
        )
    conn = _connect(db_path)
    try:
        if rows:
            # Autocommit connection: without an explicit transaction every
            # row would be committed on its own.
            conn.execute("BEGIN")
            try:
                conn.executemany(
                    """
                    INSERT INTO archive_trade_cache (
                    exchange_key, trade_id, symbol, direction, result, pnl_amount,
                    opened_at, closed_at, opened_at_ms, closed_at_ms,
                    monitor_type, entry_reason, payload_json, synced_at
                    ) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?)
                    ON CONFLICT(exchange_key, trade_id) DO UPDATE SET
                    symbol=excluded.symbol,
                    direction=excluded.direction,
                    result=excluded.result,
                    pnl_amount=excluded.pnl_amount,
                    opened_at=excluded.opened_at,
                    closed_at=excluded.closed_at,
                    opened_at_ms=excluded.opened_at_ms,
                    closed_at_ms=excluded.closed_at_ms,
                    monitor_type=excluded.monitor_type,
                    entry_reason=excluded.entry_reason,
                    payload_json=excluded.payload_json,
                    synced_at=excluded.synced_at
                    """,
                    rows,
                )
                conn.execute("COMMIT")
            except Exception:
                conn.execute("ROLLBACK")
                raise
    finally:
        conn.close()
    removed = 0
    if prune_missing:
        removed = purge_stale_trades_cache(ex_k, active_ids, db_path=db_path)
    return {"upserted": len(rows), "removed": removed}
def _enrich_trade_display_fields(out: dict[str, Any]) -> dict[str, Any]:
    """Fill in display fields on a cached trade dict, in place.

    Adds missing millisecond / text timestamps, the entry-type label, hold
    minutes (number and text) and a ``reviewed`` flag, so rows synced by
    older versions still render. Returns the same dict.
    """
    opened_ms = out.get("opened_at_ms") or _parse_dt_ms(out.get("opened_at"))
    closed_ms = out.get("closed_at_ms") or _parse_dt_ms(out.get("closed_at"))
    for ms_key, ms_value in (("opened_at_ms", opened_ms), ("closed_at_ms", closed_ms)):
        if ms_value:
            out[ms_key] = int(ms_value)
    for text_key, ms_value in (("opened_at", opened_ms), ("closed_at", closed_ms)):
        if ms_value and not out.get(text_key):
            out[text_key] = ms_to_wall_clock_str(int(ms_value))

    label = display_entry_type_label(out)
    if label:
        out["entry_type"] = label
        out["entry_reason"] = label

    minutes = out.get("hold_minutes")
    if minutes in (None, ""):
        minutes = effective_hold_minutes(
            out,
            opened_ms=out.get("opened_at_ms"),
            closed_ms=out.get("closed_at_ms"),
        )
    try:
        minutes = max(0, int(minutes or 0))
    except (TypeError, ValueError):
        minutes = 0
    out["hold_minutes"] = minutes
    out["hold_minutes_text"] = out.get("hold_minutes_text") or format_hold_minutes(minutes)

    if "reviewed" not in out:
        review_markers = (
            "reviewed_at",
            "reviewed_result",
            "reviewed_opened_at",
            "reviewed_closed_at",
            "reviewed_entry_reason",
            "reviewed_hold_minutes",
        )
        # Any truthy review field marks the trade as reviewed.
        out["reviewed"] = any(out.get(marker) for marker in review_markers)
    return out
def _trade_row_to_dict(row: sqlite3.Row, overlay: dict | None = None) -> dict[str, Any]:
    """Turn a cached trade row plus its overlay into a display dict.

    The JSON payload is the base; table columns fill in keys the payload
    lacks, and non-empty column values then override the payload for the
    core columns. Overlay tag and note are attached, ``trade_id`` falls back
    to the payload ``id``, and display fields are enriched.
    """
    columns = dict(row)
    encoded = columns.pop("payload_json", None)
    payload = {}
    if encoded:
        try:
            payload = json.loads(encoded)
        except (json.JSONDecodeError, TypeError):
            payload = {}
    out = {**payload}
    for name, value in columns.items():
        if name not in payload:
            out[name] = value
    authoritative = (
        "exchange_key",
        "symbol",
        "trade_id",
        "direction",
        "result",
        "pnl_amount",
        "opened_at",
        "closed_at",
        "opened_at_ms",
        "closed_at_ms",
        "monitor_type",
        "entry_reason",
        "synced_at",
    )
    for name in authoritative:
        if name not in columns:
            continue
        value = columns[name]
        if value not in (None, ""):
            out[name] = value
    annotations = overlay or {}
    out["behavior_tag"] = annotations.get("behavior_tag") or ""
    out["note"] = annotations.get("note") or ""
    out["trade_id"] = out.get("trade_id") or out.get("id")
    normalized_exchange = str(columns.get("exchange_key") or "").strip().lower()
    if normalized_exchange:
        out["exchange_key"] = normalized_exchange
    out.pop("account_exchange_key", None)
    return _enrich_trade_display_fields(out)
def load_overlays(
    exchange_key: str,
    trade_ids: list[int] | None = None,
    *,
    db_path: Path | None = None,
) -> dict[int, dict[str, Any]]:
    """Load overlay rows (behaviour tag + note) for one exchange.

    With ``trade_ids`` only those trades are looked up; with ``None`` or an
    empty list every overlay of the exchange is returned.

    Returns a mapping ``trade_id -> {"behavior_tag", "note", "updated_at"}``.
    """
    ex_k = (exchange_key or "").strip().lower()
    base_sql = """
        SELECT exchange_key, trade_id, behavior_tag, note, updated_at
        FROM trade_overlay WHERE exchange_key=?
    """
    # Stay well below SQLite's bound-variable limit (999 on older builds).
    chunk_size = 500
    conn = _connect(db_path)
    try:
        if trade_ids:
            wanted = list(trade_ids)
            rows: list[Any] = []
            for start in range(0, len(wanted), chunk_size):
                chunk = wanted[start : start + chunk_size]
                placeholders = ",".join("?" * len(chunk))
                rows.extend(
                    conn.execute(
                        f"{base_sql} AND trade_id IN ({placeholders})",
                        (ex_k, *chunk),
                    ).fetchall()
                )
        else:
            rows = conn.execute(base_sql, (ex_k,)).fetchall()
        return {
            int(r["trade_id"]): {
                "behavior_tag": r["behavior_tag"] or "",
                "note": r["note"] or "",
                "updated_at": r["updated_at"],
            }
            for r in rows
        }
    finally:
        conn.close()
def upsert_trade_overlay(
    exchange_key: str,
    trade_id: int,
    *,
    behavior_tag: str | None = None,
    note: str | None = None,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Create or replace the overlay (behaviour tag + note) of one trade.

    Tags outside ``BEHAVIOR_TAGS`` are stored as the empty string; the note
    is stripped and cut to 2000 characters. Returns the stored values.

    NOTE(review): passing ``None`` for either field overwrites the stored
    value with an empty string rather than leaving it alone — confirm that
    callers always send both fields.
    """
    init_db(db_path)
    normalized_exchange = (exchange_key or "").strip().lower()
    numeric_id = int(trade_id)
    cleaned_tag = (behavior_tag or "").strip().lower()
    stored_tag = cleaned_tag if cleaned_tag in BEHAVIOR_TAGS else ""
    stored_note = (note or "").strip()[:2000]
    stamp = _now_ms()
    connection = _connect(db_path)
    try:
        connection.execute(
            """
            INSERT INTO trade_overlay (exchange_key, trade_id, behavior_tag, note, updated_at)
            VALUES (?,?,?,?,?)
            ON CONFLICT(exchange_key, trade_id) DO UPDATE SET
            behavior_tag=excluded.behavior_tag,
            note=excluded.note,
            updated_at=excluded.updated_at
            """,
            (normalized_exchange, numeric_id, stored_tag, stored_note, stamp),
        )
    finally:
        connection.close()
    return {
        "exchange_key": normalized_exchange,
        "trade_id": numeric_id,
        "behavior_tag": stored_tag,
        "note": stored_note,
    }
def list_symbol_rows(
    *,
    exchange_key: str = "",
    filter_profit: bool = False,
    filter_loss: bool = False,
    filter_sick: bool = False,
    filter_emotion: bool = False,
    db_path: Path | None = None,
) -> list[dict[str, Any]]:
    """Summarise cached trades as one row per (exchange, symbol).

    ``exchange_key`` restricts the result to one exchange. Each ``filter_*``
    flag keeps only symbols that have at least one matching trade (a winning
    trade, a losing trade, a trade tagged ``sick`` / ``emotion``).

    Rows are ordered by most recent close first and carry trade counts,
    total PnL, first-open / last-close times and the K-line seed state.
    """
    init_db(db_path)
    ex_filter = (exchange_key or "").strip().lower()
    where = "1=1"
    params: list[Any] = []
    if ex_filter:
        where += " AND t.exchange_key=?"
        params.append(ex_filter)
    conn = _connect(db_path)
    try:
        # MIN() skips NULLs by itself; coalescing to 0 would let one trade
        # without an open time wipe out the symbol's first_opened_ms.
        rows = conn.execute(
            f"""
            SELECT t.exchange_key, t.symbol,
            COUNT(*) AS trade_count,
            SUM(CASE WHEN t.pnl_amount > 0.0001 THEN 1 ELSE 0 END) AS win_count,
            SUM(CASE WHEN t.pnl_amount < -0.0001 THEN 1 ELSE 0 END) AS loss_count,
            SUM(COALESCE(t.pnl_amount, 0)) AS total_pnl,
            MIN(NULLIF(t.opened_at_ms, 0)) AS first_opened_ms,
            MAX(COALESCE(t.closed_at_ms, 0)) AS last_closed_ms
            FROM archive_trade_cache t
            WHERE {where}
            GROUP BY t.exchange_key, t.symbol
            ORDER BY last_closed_ms DESC
            """,
            params,
        ).fetchall()

        # Behaviour tags per series, gathered with one query instead of one
        # query per symbol, and only when a tag filter is actually active.
        tags_by_series: dict[tuple[str, str], set[str]] = {}
        if filter_sick or filter_emotion:
            overlays_by_ex: dict[str, dict[int, dict]] = {}
            id_rows = conn.execute(
                f"""
                SELECT t.exchange_key, t.symbol, t.trade_id
                FROM archive_trade_cache t
                WHERE {where}
                """,
                params,
            ).fetchall()
            for tr in id_rows:
                ex_k = tr["exchange_key"]
                if ex_k not in overlays_by_ex:
                    overlays_by_ex[ex_k] = load_overlays(ex_k, db_path=db_path)
                ov = overlays_by_ex[ex_k].get(int(tr["trade_id"])) or {}
                tag = ov.get("behavior_tag")
                if tag:
                    tags_by_series.setdefault((ex_k, tr["symbol"]), set()).add(tag)

        meta_by_series = {
            (m["exchange_key"], m["symbol"]): m
            for m in conn.execute(
                "SELECT exchange_key, symbol, seed_complete, last_kline_sync_ms FROM archive_meta"
            ).fetchall()
        }

        out: list[dict[str, Any]] = []
        for r in rows:
            ex_k = r["exchange_key"]
            sym = r["symbol"]
            win_count = int(r["win_count"] or 0)
            loss_count = int(r["loss_count"] or 0)
            # The SQL counters use the same +/-0.0001 thresholds the
            # profit/loss filters are defined by.
            if filter_profit and win_count == 0:
                continue
            if filter_loss and loss_count == 0:
                continue
            tags = tags_by_series.get((ex_k, sym), ())
            if filter_sick and "sick" not in tags:
                continue
            if filter_emotion and "emotion" not in tags:
                continue
            meta = meta_by_series.get((ex_k, sym))
            out.append(
                {
                    "exchange_key": ex_k,
                    "symbol": sym,
                    "trade_count": int(r["trade_count"] or 0),
                    "win_count": win_count,
                    "loss_count": loss_count,
                    "total_pnl": round(float(r["total_pnl"] or 0), 4),
                    "first_opened_ms": int(r["first_opened_ms"] or 0) or None,
                    "last_closed_ms": int(r["last_closed_ms"] or 0) or None,
                    "seed_complete": bool(meta["seed_complete"]) if meta else False,
                    "last_kline_sync_ms": int(meta["last_kline_sync_ms"] or 0) if meta else None,
                }
            )
        return out
    finally:
        conn.close()
def load_symbol_trades(
    exchange_key: str,
    symbol: str,
    *,
    db_path: Path | None = None,
) -> list[dict[str, Any]]:
    """Return the cached trades of one symbol, most recently closed first.

    Each trade is merged with its overlay and enriched for display.
    """
    normalized_exchange = (exchange_key or "").strip().lower()
    normalized_symbol = (symbol or "").strip().upper()
    connection = _connect(db_path)
    try:
        cached = connection.execute(
            """
            SELECT * FROM archive_trade_cache
            WHERE exchange_key=? AND symbol=?
            ORDER BY COALESCE(closed_at_ms, 0) DESC, trade_id DESC
            """,
            (normalized_exchange, normalized_symbol),
        ).fetchall()
        overlays = load_overlays(
            normalized_exchange,
            [int(item["trade_id"]) for item in cached],
            db_path=db_path,
        )
        trades: list[dict[str, Any]] = []
        for item in cached:
            trades.append(_trade_row_to_dict(item, overlays.get(int(item["trade_id"]))))
        return trades
    finally:
        connection.close()
def upsert_bars_5m(
    exchange_key: str,
    symbol: str,
    bars: list[dict[str, Any]],
    *,
    db_path: Path | None = None,
) -> int:
    """Insert or overwrite 5m bars of one series.

    Bars that are not dicts, lack a required field or hold a non-numeric
    value are skipped. All valid bars are written in one transaction.

    Returns the number of bars written.
    """
    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    now = _now_ms()
    rows: list[tuple[Any, ...]] = []
    for b in bars or []:
        try:
            rows.append(
                (
                    ex_k,
                    sym,
                    int(b["open_time_ms"]),
                    float(b["open"]),
                    float(b["high"]),
                    float(b["low"]),
                    float(b["close"]),
                    float(b.get("volume") or 0),
                    now,
                )
            )
        except (AttributeError, KeyError, TypeError, ValueError):
            continue
    if not rows:
        return 0
    conn = _connect(db_path)
    try:
        # Autocommit connection: a seed can carry thousands of bars, so
        # batch them in one explicit transaction instead of one commit each.
        conn.execute("BEGIN")
        try:
            conn.executemany(
                """
                INSERT INTO archive_bars_5m (
                exchange_key, symbol, open_time_ms, open, high, low, close, volume, updated_at
                ) VALUES (?,?,?,?,?,?,?,?,?)
                ON CONFLICT(exchange_key, symbol, open_time_ms) DO UPDATE SET
                open=excluded.open,
                high=excluded.high,
                low=excluded.low,
                close=excluded.close,
                volume=excluded.volume,
                updated_at=excluded.updated_at
                """,
                rows,
            )
            conn.execute("COMMIT")
        except Exception:
            conn.execute("ROLLBACK")
            raise
    finally:
        conn.close()
    return len(rows)
def load_bars_5m_range(
    exchange_key: str,
    symbol: str,
    start_ms: int,
    end_ms: int,
    *,
    db_path: Path | None = None,
) -> list[dict[str, Any]]:
    """Read stored 5m bars with ``start_ms <= open_time_ms <= end_ms``, oldest first."""
    query_args = (
        (exchange_key or "").strip().lower(),
        (symbol or "").strip().upper(),
        int(start_ms),
        int(end_ms),
    )
    connection = _connect(db_path)
    try:
        fetched = connection.execute(
            """
            SELECT open_time_ms, open, high, low, close, volume
            FROM archive_bars_5m
            WHERE exchange_key=? AND symbol=?
            AND open_time_ms >= ? AND open_time_ms <= ?
            ORDER BY open_time_ms ASC
            """,
            query_args,
        ).fetchall()
        result: list[dict[str, Any]] = []
        for item in fetched:
            bar: dict[str, Any] = {"open_time_ms": int(item["open_time_ms"])}
            for price_field in ("open", "high", "low", "close"):
                bar[price_field] = float(item[price_field])
            bar["volume"] = float(item["volume"] or 0)
            result.append(bar)
        return result
    finally:
        connection.close()
def _to_candles(bars: list[dict[str, Any]]) -> list[dict[str, Any]]:
out = []
for b in bars or []:
try:
out.append(
{
"time": int(b["open_time_ms"] // 1000),
"open": float(b["open"]),
"high": float(b["high"]),
"low": float(b["low"]),
"close": float(b["close"]),
"volume": float(b.get("volume") or 0),
}
)
except (KeyError, TypeError, ValueError):
continue
return out
def _snap_to_bar_grid(ts_ms: int, origin_ms: int, step_ms: int) -> int:
step = max(1, int(step_ms))
origin = int(origin_ms)
if ts_ms <= origin:
return origin
idx = (int(ts_ms) - origin + step - 1) // step
return origin + idx * step
def _fill_missing_bars(
    bars: list[dict[str, Any]],
    period_ms: int,
    start_ms: int,
    end_ms: int,
) -> list[dict[str, Any]]:
    """Return a gap-free bar series, padding holes with flat bars.

    The grid starts at the earliest real bar and advances by ``period_ms``.
    Walking from ``start_ms`` (snapped up to the grid) to the later of
    ``end_ms`` and the last real bar, every missing slot gets a zero-volume
    bar at the previous close, marked ``"filled": True``. Slots before any
    real bar are left out. Returns ``[]`` when no bar has a usable
    ``open_time_ms``.
    """
    indexed: dict[int, dict[str, Any]] = {}
    for bar in bars or []:
        try:
            indexed[int(bar["open_time_ms"])] = bar
        except (KeyError, TypeError, ValueError):
            continue
    if not indexed:
        return []
    ordered = sorted(indexed)
    step = max(1, int(period_ms))
    first_slot = _snap_to_bar_grid(int(start_ms), ordered[0], step)
    last_slot = max(int(end_ms), ordered[-1])

    # Most recent real bar at or before the first slot seeds the fill value.
    earlier = [key for key in ordered if key <= first_slot]
    previous: dict[str, Any] | None = indexed[earlier[-1]] if earlier else None

    series: list[dict[str, Any]] = []
    for slot in range(first_slot, last_slot + 1, step):
        real = indexed.get(slot)
        if real is not None:
            previous = real
            series.append(real)
            continue
        if previous is None:
            continue
        flat = float(previous["close"])
        series.append(
            {
                "open_time_ms": slot,
                "open": flat,
                "high": flat,
                "low": flat,
                "close": flat,
                "volume": 0.0,
                "filled": True,
            }
        )
    return series
def _archive_earliest_bar_ms(
    exchange_key: str,
    symbol: str,
    *,
    db_path: Path | None = None,
) -> int | None:
    """Return the open time of the oldest stored 5m bar of a series, or ``None`` if there is none."""
    normalized_exchange = (exchange_key or "").strip().lower()
    normalized_symbol = (symbol or "").strip().upper()
    connection = _connect(db_path)
    try:
        found = connection.execute(
            "SELECT MIN(open_time_ms) AS mn FROM archive_bars_5m WHERE exchange_key=? AND symbol=?",
            (normalized_exchange, normalized_symbol),
        ).fetchone()
        earliest = found["mn"] if found else None
        if earliest is None:
            return None
        return int(earliest)
    finally:
        connection.close()
def _trim_bars_for_cap(
bars: list[dict[str, Any]],
*,
end_ms: int,
max_n: int,
) -> list[dict[str, Any]]:
"""超长时优先保留到平仓,再从最古老端截断。"""
if len(bars) <= max_n:
return bars
cut_end = len(bars)
for i in range(len(bars) - 1, -1, -1):
if int(bars[i]["open_time_ms"]) <= int(end_ms):
cut_end = i + 1
break
essential = bars[:cut_end]
if len(essential) <= max_n:
return essential
return essential[len(essential) - max_n :]
def resolve_archive_chart(
    exchange_key: str,
    symbol: str,
    timeframe: str = ARCHIVE_DEFAULT_TIMEFRAME,
    *,
    anchor_ms: int | None = None,
    opened_ms: int | None = None,
    closed_ms: int | None = None,
    mode: str = "hold",
    bars: int = ARCHIVE_VISIBLE_BARS_DEFAULT,
    range_mode: str = "window",
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Build a chart window for one series by aggregating the stored 5m bars.

    Window selection:

    * ``range_mode="history"`` with a valid open/close pair: from the older
      of the first stored bar and the seed look-back before the open, up to
      a few bars past the close ("until now" is not included). Meant for
      panning and zooming over the context before the entry.
    * otherwise, with a valid open/close pair: the holding period padded on
      both sides.
    * otherwise: ``bars`` candles (clamped to 50..500) centred on
      ``anchor_ms``, or on the current time when no anchor is given.

    Gaps in the 5m data are filled with flat bars before aggregation.

    Returns ``{"ok": False, "msg": ...}`` on failure, or a dict with
    ``ok=True``, the window bounds, ``candles`` and ``gaps_filled``.
    """
    tf = normalize_chart_timeframe(timeframe, default=ARCHIVE_DEFAULT_TIMEFRAME)
    if tf not in ARCHIVE_TIMEFRAMES:
        return {"ok": False, "msg": f"档案仅支持 {', '.join(sorted(ARCHIVE_TIMEFRAMES))}"}
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    if not ex_k or not sym:
        return {"ok": False, "msg": "缺少 exchange_key 或 symbol"}
    period = TIMEFRAME_MS[tf]
    period_5m = TIMEFRAME_MS["5m"]
    hold_open = int(opened_ms) if opened_ms else None
    hold_close = int(closed_ms) if closed_ms else None
    rm = (range_mode or "window").strip().lower()
    mode_norm = (mode or "hold").strip().lower()
    has_hold = bool(hold_open and hold_close and hold_close >= hold_open)
    if has_hold and rm == "history":
        seed_back = max(0, hold_open - ARCHIVE_SEED_LOOKBACK_DAYS * 86400000)
        earliest = _archive_earliest_bar_ms(ex_k, sym, db_path=db_path)
        start_ms = min(earliest, seed_back) if earliest is not None else seed_back
        end_ms = hold_close + max(period * 16, period_5m * 8)
        anchor = hold_close if mode_norm != "entry" else hold_open
    elif has_hold:
        hold_len = hold_close - hold_open
        pad = max(period * 24, hold_len // 3, period_5m * 12)
        start_ms = max(0, hold_open - pad)
        end_ms = hold_close + pad
        anchor = hold_close if mode_norm != "entry" else hold_open
    else:
        visible = max(50, min(int(bars or ARCHIVE_VISIBLE_BARS_DEFAULT), 500))
        anchor = int(anchor_ms) if anchor_ms else _now_ms()
        half = visible // 2
        start_ms = max(0, anchor - half * period)
        end_ms = anchor + half * period
    # Load a few extra 5m bars on each side so edge candles aggregate fully.
    raw_5m = load_bars_5m_range(
        ex_k,
        sym,
        start_ms - period_5m * 6,
        end_ms + period_5m * 6,
        db_path=db_path,
    )
    if not raw_5m:
        return {"ok": False, "msg": "档案库暂无 K 线,请等待同步或手动刷新"}
    filled_5m = _fill_missing_bars(raw_5m, period_5m, start_ms - period_5m * 2, end_ms + period_5m * 2)
    source_bars = filled_5m if tf == "5m" else aggregate_ohlcv_bars(filled_5m, tf)
    merged = [b for b in source_bars if start_ms <= int(b["open_time_ms"]) <= end_ms]
    max_n = ARCHIVE_MAX_CANDLES.get(tf, 2000)
    if rm == "history" and merged and len(merged) > max_n:
        # Drop the oldest bars when over the cap. Slicing from the front
        # would keep only ancient history and cut off the trade itself.
        merged = _trim_bars_for_cap(merged, end_ms=end_ms, max_n=max_n)
    candles = _to_candles(merged)
    if not candles:
        return {"ok": False, "msg": "视窗内无 K 线"}
    ex_sym = normalize_perpetual_symbol(sym)
    return {
        "ok": True,
        "exchange_key": ex_k,
        "symbol": sym,
        "exchange_symbol": ex_sym,
        "market_type": "swap",
        "timeframe": tf,
        "mode": mode_norm,
        "range_mode": rm,
        "anchor_ms": anchor,
        "opened_ms": hold_open,
        "closed_ms": hold_close,
        "window_start_ms": start_ms,
        "window_end_ms": end_ms,
        "candles": candles,
        "bar_count": len(candles),
        "gaps_filled": sum(1 for b in filled_5m if b.get("filled")),
    }
def _ensure_meta(
    exchange_key: str,
    symbol: str,
    first_opened_ms: int | None,
    *,
    db_path: Path | None = None,
) -> None:
    """Make sure a series has an ``archive_meta`` row.

    A missing row is inserted with ``seed_complete=0``. For an existing row
    only ``first_trade_opened_ms`` is touched, and only when the given value
    is earlier than the stored one (or nothing is stored yet).
    """
    normalized_exchange = (exchange_key or "").strip().lower()
    normalized_symbol = (symbol or "").strip().upper()
    created_at = _now_ms()
    connection = _connect(db_path)
    try:
        existing = connection.execute(
            "SELECT first_trade_opened_ms FROM archive_meta WHERE exchange_key=? AND symbol=?",
            (normalized_exchange, normalized_symbol),
        ).fetchone()
        if not existing:
            first_value = int(first_opened_ms) if first_opened_ms else None
            connection.execute(
                """
                INSERT INTO archive_meta (
                exchange_key, symbol, first_trade_opened_ms,
                archive_started_at, last_kline_sync_ms, last_trade_sync_ms, seed_complete
                ) VALUES (?,?,?,?,?,?,0)
                """,
                (normalized_exchange, normalized_symbol, first_value, created_at, None, None),
            )
            return
        if not first_opened_ms:
            return
        stored = existing["first_trade_opened_ms"]
        if stored and int(first_opened_ms) >= int(stored):
            return
        connection.execute(
            """
            UPDATE archive_meta SET first_trade_opened_ms=?
            WHERE exchange_key=? AND symbol=?
            """,
            (int(first_opened_ms), normalized_exchange, normalized_symbol),
        )
    finally:
        connection.close()
def _mark_meta_sync(
    exchange_key: str,
    symbol: str,
    *,
    kline_ms: int | None = None,
    trade_ms: int | None = None,
    seed_complete: bool | None = None,
    db_path: Path | None = None,
) -> None:
    """Update the sync bookkeeping columns of one ``archive_meta`` row.

    Only arguments that are not ``None`` are written; with none given no
    statement is issued.
    """
    normalized_exchange = (exchange_key or "").strip().lower()
    normalized_symbol = (symbol or "").strip().upper()
    connection = _connect(db_path)
    try:
        changes: list[tuple[str, Any]] = []
        if kline_ms is not None:
            changes.append(("last_kline_sync_ms=?", int(kline_ms)))
        if trade_ms is not None:
            changes.append(("last_trade_sync_ms=?", int(trade_ms)))
        if seed_complete is not None:
            changes.append(("seed_complete=?", 1 if seed_complete else 0))
        if not changes:
            return
        assignments = [clause for clause, _ in changes]
        values: list[Any] = [value for _, value in changes]
        values += [normalized_exchange, normalized_symbol]
        connection.execute(
            f"UPDATE archive_meta SET {', '.join(assignments)} WHERE exchange_key=? AND symbol=?",
            values,
        )
    finally:
        connection.close()
def fetch_remote_5m_range(
    remote_fetch: Callable[..., dict[str, Any]],
    symbol: str,
    start_ms: int,
    end_ms: int,
) -> list[dict[str, Any]]:
    """Download 5m bars for ``[start_ms, end_ms]`` page by page.

    ``remote_fetch`` is called with ``symbol``, ``timeframe="5m"``,
    ``since_ms`` and ``limit=500`` and is expected to return a dict with
    ``ok`` and ``bars``. Paging stops when the response is not ok, a page
    has no usable bar, the cursor stops advancing, the end is reached, or
    after 120 pages.

    Returns the de-duplicated bars inside the range, oldest first. Whatever
    was collected before an early stop is still returned.
    """
    period = TIMEFRAME_MS["5m"]
    max_pages = 120
    since = max(0, int(start_ms))
    end = int(end_ms)
    merged: dict[int, dict[str, Any]] = {}
    pages = 0
    while since < end and pages < max_pages:
        pages += 1
        remote = remote_fetch(symbol=symbol, timeframe="5m", since_ms=since, limit=500)
        if not isinstance(remote, dict) or not remote.get("ok"):
            break
        last_ts: int | None = None
        for b in remote.get("bars") or []:
            try:
                ts = int(b["open_time_ms"])
            except (KeyError, TypeError, ValueError):
                continue
            merged[ts] = b
            # Track the cursor here: a separate max() over the raw page
            # would raise on the malformed bars this loop tolerates.
            if last_ts is None or ts > last_ts:
                last_ts = ts
        if last_ts is None:
            break
        next_since = last_ts + period
        if next_since <= since:
            break
        since = next_since
        if last_ts >= end:
            break
    return [merged[k] for k in sorted(merged) if start_ms <= k <= end]
def seed_symbol_archive(
    exchange_key: str,
    symbol: str,
    first_opened_ms: int,
    remote_fetch: Callable[..., dict[str, Any]],
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Seed a series with 5m bars from 30 days before its first trade until now.

    The seed is marked complete only when at least one bar was stored, so a
    failed or empty download is attempted again on the next sync instead of
    being recorded as done.

    Returns ``ok``, ``seed_bars``, the requested ``start_ms`` / ``end_ms``
    and ``seed_complete`` (whether this call stored any bar).
    """
    init_db(db_path)
    ex_k = (exchange_key or "").strip().lower()
    sym = (symbol or "").strip().upper()
    anchor = int(first_opened_ms)
    start_ms = max(0, anchor - ARCHIVE_SEED_LOOKBACK_DAYS * 86400000)
    end_ms = _now_ms()
    _ensure_meta(ex_k, sym, anchor, db_path=db_path)
    bars = fetch_remote_5m_range(remote_fetch, sym, start_ms, end_ms)
    n = upsert_bars_5m(ex_k, sym, bars, db_path=db_path)
    seeded = n > 0
    _mark_meta_sync(
        ex_k,
        sym,
        kline_ms=_now_ms(),
        # None leaves the stored flag untouched when nothing was downloaded.
        seed_complete=True if seeded else None,
        db_path=db_path,
    )
    return {
        "ok": True,
        "seed_bars": n,
        "start_ms": start_ms,
        "end_ms": end_ms,
        "seed_complete": seeded,
    }
def sync_symbol_klines_incremental(
    exchange_key: str,
    symbol: str,
    remote_fetch: Callable[..., dict[str, Any]],
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Append the 5m bars between the newest stored bar and now.

    The download starts one period after the newest stored bar (or at 0 when
    nothing is stored). It is skipped when less than one full period is
    missing.
    """
    init_db(db_path)
    normalized_exchange = (exchange_key or "").strip().lower()
    normalized_symbol = (symbol or "").strip().upper()
    connection = _connect(db_path)
    try:
        newest = connection.execute(
            "SELECT MAX(open_time_ms) AS mx FROM archive_bars_5m WHERE exchange_key=? AND symbol=?",
            (normalized_exchange, normalized_symbol),
        ).fetchone()
        last_bar = None
        if newest and newest["mx"]:
            last_bar = int(newest["mx"])
    finally:
        connection.close()
    period = TIMEFRAME_MS["5m"]
    start_ms = max(0, last_bar + period) if last_bar else 0
    end_ms = _now_ms()
    if start_ms >= end_ms - period:
        return {"ok": True, "appended": 0, "skipped": True}
    fetched = fetch_remote_5m_range(remote_fetch, normalized_symbol, start_ms, end_ms)
    written = upsert_bars_5m(normalized_exchange, normalized_symbol, fetched, db_path=db_path)
    _mark_meta_sync(normalized_exchange, normalized_symbol, kline_ms=_now_ms(), db_path=db_path)
    return {"ok": True, "appended": written, "start_ms": start_ms, "end_ms": end_ms}
def sync_exchange_symbol_archives(
    exchange_key: str,
    trades: list[dict[str, Any]],
    remote_fetch: Callable[..., dict[str, Any]],
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Synchronise one exchange: refresh the trade cache, then each symbol's bars.

    For every symbol with a known open time the meta row is ensured; the
    series is seeded if its seed is not complete, otherwise it is topped up
    incrementally. Returns counters for trades and bars.
    """
    normalized_exchange = (exchange_key or "").strip().lower()
    cache_stats = upsert_trades_cache(
        normalized_exchange, trades, db_path=db_path, prune_missing=True
    )

    # Earliest open time per symbol.
    earliest_open: dict[str, int] = {}
    for trade in trades or []:
        sym = (trade.get("symbol") or "").strip().upper()
        if not sym:
            continue
        opened = trade.get("opened_at_ms") or _parse_dt_ms(trade.get("opened_at"))
        if not opened:
            continue
        known = earliest_open.get(sym)
        if known is None or int(opened) < known:
            earliest_open[sym] = int(opened)

    seeded_total = 0
    appended_total = 0
    for sym, first_ms in earliest_open.items():
        _ensure_meta(normalized_exchange, sym, first_ms, db_path=db_path)
        connection = _connect(db_path)
        try:
            meta = connection.execute(
                "SELECT seed_complete FROM archive_meta WHERE exchange_key=? AND symbol=?",
                (normalized_exchange, sym),
            ).fetchone()
        finally:
            connection.close()
        already_seeded = bool(meta) and bool(int(meta["seed_complete"] or 0))
        if already_seeded:
            outcome = sync_symbol_klines_incremental(
                normalized_exchange, sym, remote_fetch, db_path=db_path
            )
            appended_total += int(outcome.get("appended") or 0)
        else:
            outcome = seed_symbol_archive(
                normalized_exchange, sym, first_ms, remote_fetch, db_path=db_path
            )
            seeded_total += int(outcome.get("seed_bars") or 0)
    return {
        "ok": True,
        "exchange_key": normalized_exchange,
        "symbols": len(earliest_open),
        "trades_upserted": int(cache_stats.get("upserted") or 0),
        "trades_removed": int(cache_stats.get("removed") or 0),
        "seed_bars": seeded_total,
        "appended_bars": appended_total,
        "trades": len(trades or []),
    }
def ms_to_trading_day(
    ms: int | None,
    *,
    reset_hour: int = TRADING_DAY_RESET_HOUR,
    tz: ZoneInfo = CHART_DISPLAY_TZ,
) -> str | None:
    """Map UTC epoch milliseconds to a trading day ``YYYY-MM-DD``.

    A trading day starts at ``reset_hour`` local time in ``tz``; earlier
    hours belong to the previous calendar day. Returns ``None`` for ``None``
    or a timestamp that cannot be converted.
    """
    if ms is None:
        return None
    try:
        dt = datetime.fromtimestamp(int(ms) / 1000.0, tz=timezone.utc).astimezone(tz)
        if dt.hour < reset_hour:
            dt = dt - timedelta(days=1)
    except (TypeError, ValueError, OverflowError, OSError):
        # OverflowError covers infinite or out-of-range timestamps.
        return None
    return dt.strftime("%Y-%m-%d")
def today_trading_day(*, reset_hour: int = TRADING_DAY_RESET_HOUR) -> str:
    """Return the current trading day, falling back to today's date in the display timezone."""
    current = ms_to_trading_day(_now_ms(), reset_hour=reset_hour)
    if current:
        return current
    return datetime.now(CHART_DISPLAY_TZ).strftime("%Y-%m-%d")
def trading_day_bounds_ms(
    trading_day: str,
    *,
    reset_hour: int = TRADING_DAY_RESET_HOUR,
    tz: ZoneInfo = CHART_DISPLAY_TZ,
) -> tuple[int, int]:
    """Return ``(start_ms, end_ms)`` of a trading day in UTC epoch milliseconds.

    The day runs from ``reset_hour`` local time on ``trading_day`` to the
    same wall-clock hour one day later. Raises ``ValueError`` when the first
    ten characters are not a ``YYYY-MM-DD`` date.
    """
    day_text = (trading_day or "").strip()[:10]
    midnight = datetime.strptime(day_text, "%Y-%m-%d")
    opens = midnight.replace(hour=reset_hour, minute=0, second=0, microsecond=0, tzinfo=tz)
    closes = opens + timedelta(days=1)
    start_ms = int(opens.timestamp() * 1000)
    end_ms = int(closes.timestamp() * 1000)
    return start_ms, end_ms
def resolve_period_bounds(
    *,
    period: str = "",
    trading_day: str = "",
    date_from: str = "",
    date_to: str = "",
    reset_hour: int = TRADING_DAY_RESET_HOUR,
) -> tuple[int, int, str, str, str]:
    """Resolve a named period into trading-day bounds.

    ``period`` may be ``week`` (Monday of the current trading week to
    today), ``month`` (first of the month to today) or ``range``
    (``date_from`` to ``date_to``, swapped when reversed). Anything else,
    including blank / ``day`` / ``today``, means the single day
    ``trading_day`` or today when that is blank.

    Returns ``(start_ms, end_ms, date_from, date_to, period_label)``.
    Raises ``ValueError`` for a malformed date string.
    """
    td = today_trading_day(reset_hour=reset_hour)
    p = (period or "today").strip().lower()
    if p in ("week", "month"):
        day_dt = datetime.strptime(td, "%Y-%m-%d")
        if p == "week":
            first = day_dt - timedelta(days=day_dt.weekday())
            prefix = "本周"
        else:
            first = day_dt.replace(day=1)
            prefix = "本月"
        df = first.strftime("%Y-%m-%d")
        start_ms, _ = trading_day_bounds_ms(df, reset_hour=reset_hour)
        _, end_ms = trading_day_bounds_ms(td, reset_hour=reset_hour)
        # Separator added: the two dates used to be glued together.
        return start_ms, end_ms, df, td, f"{prefix} {df}~{td}"
    if p == "range":
        df = (date_from or "").strip()[:10] or td
        dt = (date_to or "").strip()[:10] or df
        # ISO dates order correctly as plain strings.
        if df > dt:
            df, dt = dt, df
        start_ms, _ = trading_day_bounds_ms(df, reset_hour=reset_hour)
        _, end_ms = trading_day_bounds_ms(dt, reset_hour=reset_hour)
        label = f"区间 {df}~{dt}" if df != dt else f"区间 {df}"
        return start_ms, end_ms, df, dt, label
    # Single day: explicit day/today and every unrecognised period.
    d = (trading_day or "").strip()[:10] or td
    start_ms, end_ms = trading_day_bounds_ms(d, reset_hour=reset_hour)
    return start_ms, end_ms, d, d, f"本日 {d}"
def _pnl_side(pnl: float) -> str:
if pnl > 0.0001:
return "win"
if pnl < -0.0001:
return "loss"
return "flat"
def _empty_pnl_bucket() -> dict[str, Any]:
return {
"open_count": 0,
"sick_count": 0,
"pnl_total": 0.0,
"pnl_ex_sick": 0.0,
"win_count": 0,
"loss_count": 0,
"avg_win": None,
"avg_loss": None,
"max_win": None,
"max_loss": None,
}
def _finalize_pnl_bucket(bucket: dict[str, Any]) -> None:
wins = bucket.pop("_wins", [])
losses = bucket.pop("_losses", [])
bucket["win_count"] = len(wins)
bucket["loss_count"] = len(losses)
bucket["avg_win"] = round(sum(wins) / len(wins), 4) if wins else None
bucket["avg_loss"] = round(sum(losses) / len(losses), 4) if losses else None
bucket["max_win"] = round(max(wins), 4) if wins else None
bucket["max_loss"] = round(min(losses), 4) if losses else None
bucket["pnl_total"] = round(float(bucket.get("pnl_total") or 0), 4)
bucket["pnl_ex_sick"] = round(float(bucket.get("pnl_ex_sick") or 0), 4)
def _accumulate_trade_stat(bucket: dict[str, Any], *, pnl: float, is_sick: bool) -> None:
    """Fold one trade into *bucket* (modified in place).

    Every trade increments ``open_count`` and adds to ``pnl_total``. A sick
    trade increments ``sick_count``; any other trade adds to ``pnl_ex_sick``.
    Winning and losing amounts are appended to the scratch lists ``_wins`` /
    ``_losses``; flat trades are not recorded there.
    """
    bucket["open_count"] += 1
    bucket["pnl_total"] += pnl
    if is_sick:
        bucket["sick_count"] += 1
    else:
        bucket["pnl_ex_sick"] += pnl
    # NOTE(review): assumes win/loss samples are collected for every trade,
    # sick or not (classification not nested under the non-sick branch) — confirm.
    sample_key = {"win": "_wins", "loss": "_losses"}.get(_pnl_side(pnl))
    if sample_key is not None:
        bucket.setdefault(sample_key, []).append(pnl)
def _compute_period_stats(trade_rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Aggregate trade rows into overall and per-exchange PnL statistics.

    Each row contributes its ``pnl_amount`` (missing/falsy counts as 0) and is
    treated as sick when its ``behavior_tag`` equals ``"sick"``. Rows without
    an ``exchange_key`` are grouped under ``"?"``.

    Returns:
        The overall summary fields plus ``sick_pct`` (share of sick trades in
        percent, one decimal, 0.0 when there are no trades) and
        ``by_exchange``, a mapping of exchange key to its finalized bucket.
    """
    overall = _empty_pnl_bucket()
    per_exchange: dict[str, dict[str, Any]] = {}

    for trade in trade_rows:
        exchange = str(trade.get("exchange_key") or "?")
        amount = float(trade.get("pnl_amount") or 0)
        sick_flag = str(trade.get("behavior_tag") or "") == "sick"

        if exchange not in per_exchange:
            per_exchange[exchange] = _empty_pnl_bucket()
        for target in (overall, per_exchange[exchange]):
            _accumulate_trade_stat(target, pnl=amount, is_sick=sick_flag)

    _finalize_pnl_bucket(overall)
    for exchange_bucket in per_exchange.values():
        _finalize_pnl_bucket(exchange_bucket)

    trade_count = int(overall["open_count"] or 0)
    sick_count = int(overall["sick_count"] or 0)
    if trade_count:
        sick_share = round(sick_count / trade_count * 100, 1)
    else:
        sick_share = 0.0

    summary: dict[str, Any] = {
        "open_count": trade_count,
        "sick_count": sick_count,
        "sick_pct": sick_share,
    }
    carried_over = (
        "pnl_total",
        "pnl_ex_sick",
        "win_count",
        "loss_count",
        "avg_win",
        "avg_loss",
        "max_win",
        "max_loss",
    )
    for field in carried_over:
        summary[field] = overall[field]
    summary["by_exchange"] = per_exchange
    return summary
def list_review_quotes(*, db_path: Path | None = None) -> list[dict[str, Any]]:
    """Return stored review quotes, newest ``quote_date`` first.

    At most ``ARCHIVE_QUOTES_MAX`` rows are returned, each as a plain dict
    with ``id``, ``quote_date``, ``content``, ``created_at`` and
    ``updated_at``.
    """
    init_db(db_path)
    query = """
        SELECT id, quote_date, content, created_at, updated_at
        FROM archive_review_quotes
        ORDER BY quote_date DESC
        LIMIT ?
    """
    conn = _connect(db_path)
    try:
        fetched = conn.execute(query, (ARCHIVE_QUOTES_MAX,)).fetchall()
        return list(map(dict, fetched))
    finally:
        conn.close()
def create_review_quote(
    quote_date: str,
    content: str,
    *,
    db_path: Path | None = None,
) -> dict[str, Any]:
    """Insert a new review quote and return the stored row as a dict.

    Args:
        quote_date: Date of the quote; stripped and cut to 10 characters.
        content: Quote text; stripped before validation and storage.
        db_path: Optional database location passed to ``init_db`` / ``_connect``.

    Raises:
        ValueError: If the date or text is empty, the text exceeds
            ``ARCHIVE_QUOTE_MAX_LEN`` characters, the table already holds
            ``ARCHIVE_QUOTES_MAX`` rows, or the insert violates an integrity
            constraint (reported as "a quote already exists for this date").
    """
    init_db(db_path)

    day = (quote_date or "").strip()[:10]
    if not day:
        raise ValueError("缺少 quote_date")
    body = (content or "").strip()
    if not body:
        raise ValueError("语录内容不能为空")
    if len(body) > ARCHIVE_QUOTE_MAX_LEN:
        raise ValueError(f"语录最长 {ARCHIVE_QUOTE_MAX_LEN}")

    # NOTE(review): no explicit commit here — presumably _connect() hands out
    # an autocommit connection; confirm.
    conn = _connect(db_path)
    try:
        count_row = conn.execute(
            "SELECT COUNT(*) AS c FROM archive_review_quotes"
        ).fetchone()
        stored = int(count_row["c"] or 0)
        if stored >= ARCHIVE_QUOTES_MAX:
            raise ValueError(f"复盘语录最多保存 {ARCHIVE_QUOTES_MAX}")

        stamp = _now_ms()
        try:
            cursor = conn.execute(
                """
                INSERT INTO archive_review_quotes (quote_date, content, created_at, updated_at)
                VALUES (?,?,?,?)
                """,
                (day, body, stamp, stamp),
            )
        except sqlite3.IntegrityError as exc:
            raise ValueError("该日期已有语录,请展开编辑") from exc

        new_id = int(cursor.lastrowid)
        # Read the row back so the caller gets exactly what was stored.
        created = conn.execute(
            "SELECT id, quote_date, content, created_at, updated_at FROM archive_review_quotes WHERE id=?",
            (new_id,),
        ).fetchone()
        return dict(created)
    finally:
        conn.close()
def update_review_quote(
    quote_id: int,
    *,
    quote_date: str | None = None,
    content: str | None = None,
    db_path: Path | None = None,
) -> dict[str, Any] | None:
    """Update the date and/or text of an existing review quote.

    Args:
        quote_id: Primary key of the quote to change.
        quote_date: New date; when empty or ``None`` the stored date is kept.
            Stripped and cut to 10 characters.
        content: New text; ``None`` keeps the stored text, while an empty or
            whitespace-only string is rejected.
        db_path: Optional database location passed to ``init_db`` / ``_connect``.

    Returns:
        The updated row as a dict, or ``None`` when no quote has *quote_id*.

    Raises:
        ValueError: If the resulting date or text is empty, the text exceeds
            ``ARCHIVE_QUOTE_MAX_LEN`` characters, or the update violates an
            integrity constraint (e.g. the new date is already taken) — the
            same way ``create_review_quote`` reports that conflict.
    """
    init_db(db_path)
    # NOTE(review): no explicit commit here — presumably _connect() hands out
    # an autocommit connection; confirm.
    conn = _connect(db_path)
    try:
        row = conn.execute(
            "SELECT id, quote_date, content FROM archive_review_quotes WHERE id=?",
            (int(quote_id),),
        ).fetchone()
        if not row:
            return None
        qd = (quote_date or row["quote_date"] or "").strip()[:10]
        text = (content if content is not None else row["content"] or "").strip()
        if not qd or not text:
            raise ValueError("日期与内容均不能为空")
        if len(text) > ARCHIVE_QUOTE_MAX_LEN:
            raise ValueError(f"语录最长 {ARCHIVE_QUOTE_MAX_LEN}")
        now = _now_ms()
        try:
            conn.execute(
                """
                UPDATE archive_review_quotes
                SET quote_date=?, content=?, updated_at=?
                WHERE id=?
                """,
                (qd, text, now, int(quote_id)),
            )
        except sqlite3.IntegrityError as e:
            # Surface constraint conflicts as ValueError, matching the create
            # path, instead of leaking a raw database error to the caller.
            raise ValueError("该日期已有语录,请展开编辑") from e
        out = conn.execute(
            "SELECT id, quote_date, content, created_at, updated_at FROM archive_review_quotes WHERE id=?",
            (int(quote_id),),
        ).fetchone()
        return dict(out) if out else None
    finally:
        conn.close()
def delete_review_quote(quote_id: int, *, db_path: Path | None = None) -> bool:
    """Delete the review quote with *quote_id*.

    Returns:
        ``True`` when at least one row was removed, ``False`` otherwise.
    """
    init_db(db_path)
    # NOTE(review): no explicit commit here — presumably _connect() hands out
    # an autocommit connection; confirm.
    conn = _connect(db_path)
    try:
        cursor = conn.execute(
            "DELETE FROM archive_review_quotes WHERE id=?",
            (int(quote_id),),
        )
        removed = int(cursor.rowcount or 0)
        return removed > 0
    finally:
        conn.close()
def list_daily_trades(
    trading_day: str = "",
    *,
    period: str = "",
    date_from: str = "",
    date_to: str = "",
    exchange_key: str = "",
    filter_profit: bool = False,
    filter_loss: bool = False,
    filter_sick: bool = False,
    search: str = "",
    db_path: Path | None = None,
) -> dict[str, Any]:
    """List closed trades for a period, with sick-trade and PnL statistics.

    The period (today / this week / this month / custom range) is resolved by
    ``resolve_period_bounds`` and matched against each trade's close time.

    Args:
        trading_day: Day for the single-day period.
        period: Period selector; blank means ``"today"``.
        date_from: First day of a custom range.
        date_to: Last day of a custom range.
        exchange_key: When non-blank, only trades of this exchange (compared
            after stripping and lower-casing the argument).
        filter_profit: Keep only trades with PnL above 0.0001.
        filter_loss: Keep only trades with PnL below -0.0001.
        filter_sick: Keep only trades whose behaviour tag is ``"sick"``.
        search: Case-insensitive substring looked up in the symbol, exchange,
            direction, result, note, monitor type and entry reason fields.
        db_path: Optional database location.

    Returns:
        Dict with ``period``, ``period_label``, ``trading_day`` (same value as
        ``date_to``), ``date_from``, ``date_to``, the filtered ``trades`` and
        ``stats`` computed over those filtered trades.
    """
    init_db(db_path)
    p = (period or "today").strip().lower() or "today"
    start_ms, end_ms, df, dt, period_label = resolve_period_bounds(
        period=p,
        trading_day=trading_day,
        date_from=date_from,
        date_to=date_to,
    )

    conditions = [
        "closed_at_ms IS NOT NULL",
        "closed_at_ms >= ?",
        "closed_at_ms < ?",
    ]
    sql_args: list[Any] = [start_ms, end_ms]
    wanted_exchange = (exchange_key or "").strip().lower()
    if wanted_exchange:
        conditions.append("exchange_key=?")
        sql_args.append(wanted_exchange)
    where_sql = " AND ".join(conditions)

    searchable_fields = (
        "symbol",
        "exchange_key",
        "direction",
        "result",
        "note",
        "monitor_type",
        "entry_reason",
    )

    conn = _connect(db_path)
    try:
        fetched = conn.execute(
            f"""
            SELECT * FROM archive_trade_cache
            WHERE {where_sql}
            ORDER BY closed_at_ms DESC, trade_id DESC
            """,
            sql_args,
        ).fetchall()

        needle = (search or "").strip().lower()
        # Overlays are loaded once per exchange and reused for later rows.
        overlay_cache: dict[str, dict[int, dict]] = {}
        kept: list[dict[str, Any]] = []
        for record in fetched:
            exchange = record["exchange_key"]
            if exchange not in overlay_cache:
                overlay_cache[exchange] = load_overlays(exchange, db_path=db_path)
            overlay = overlay_cache[exchange].get(int(record["trade_id"]))
            trade = _trade_row_to_dict(record, overlay)

            amount = float(trade.get("pnl_amount") or 0)
            if filter_profit and amount <= 0.0001:
                continue
            if filter_loss and amount >= -0.0001:
                continue
            if filter_sick and (trade.get("behavior_tag") or "") != "sick":
                continue
            if needle:
                haystack = " ".join(
                    str(trade.get(field) or "") for field in searchable_fields
                ).lower()
                if needle not in haystack:
                    continue
            kept.append(trade)

        return {
            "period": p,
            "period_label": period_label,
            "trading_day": dt,
            "date_from": df,
            "date_to": dt,
            "trades": kept,
            "stats": _compute_period_stats(kept),
        }
    finally:
        conn.close()