Files
crypto_monitor/tests/test_hub_symbol_archive_lib.py
T
2026-06-08 16:47:09 +08:00

219 lines
6.4 KiB
Python

"""币种档案库:5m 聚合与视窗计算。"""
from __future__ import annotations
import tempfile
from pathlib import Path
from hub_ohlcv_lib import aggregate_ohlcv_bars
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from hub_symbol_archive_lib import (
CHART_DISPLAY_TZ,
_fill_missing_bars,
init_db,
load_symbol_trades,
ms_to_wall_clock_str,
parse_wall_clock_ms,
resolve_archive_chart,
upsert_bars_5m,
upsert_trade_overlay,
list_symbol_rows,
upsert_trades_cache,
)
def _seed_5m_bars(
db: Path,
start_ms: int,
count: int,
step: int = 300_000,
*,
ex: str = "gate",
sym: str = "ONDO",
) -> None:
bars = []
price = 1.0
for i in range(count):
o = start_ms + i * step
price += 0.001
bars.append(
{
"open_time_ms": o,
"open": price,
"high": price + 0.002,
"low": price - 0.001,
"close": price + 0.001,
"volume": 100 + i,
}
)
upsert_bars_5m(ex, sym, bars, db_path=db)
def test_aggregate_15m_from_5m():
start = 1_700_000_000_000
bars = []
for i in range(6):
t = start + i * 300_000
bars.append(
{
"open_time_ms": t,
"open": 1.0,
"high": 1.1,
"low": 0.9,
"close": 1.05,
"volume": 10,
}
)
agg = aggregate_ohlcv_bars(bars, "15m")
assert len(agg) >= 1
assert agg[-1]["close"] == bars[-1]["close"]
assert agg[0]["open_time_ms"] <= agg[1]["open_time_ms"]
def test_resolve_archive_chart_15m():
with tempfile.TemporaryDirectory() as td:
db = Path(td) / "archive.db"
init_db(db)
anchor = 1_700_000_000_000
_seed_5m_bars(db, anchor - 50 * 300_000, 120)
out = resolve_archive_chart(
"gate",
"ONDO",
"15m",
anchor_ms=anchor,
mode="hold",
bars=40,
db_path=db,
)
assert out["ok"] is True
assert out["timeframe"] == "15m"
assert len(out["candles"]) >= 10
def test_fill_missing_bars_continuity():
period = 300_000
start = (1_700_000_000_000 // period) * period
bars = [
{
"open_time_ms": start,
"open": 1.0,
"high": 1.1,
"low": 0.9,
"close": 1.05,
"volume": 10,
},
{
"open_time_ms": start + period * 2,
"open": 1.05,
"high": 1.15,
"low": 1.0,
"close": 1.1,
"volume": 8,
},
]
filled = _fill_missing_bars(bars, period, start, start + period * 2)
assert len(filled) >= 3
assert any(b.get("filled") for b in filled)
def test_resolve_archive_chart_history_range():
with tempfile.TemporaryDirectory() as td:
db = Path(td) / "archive.db"
init_db(db)
open_ms = 1_700_000_000_000
close_ms = open_ms + 6 * 3600_000
_seed_5m_bars(db, open_ms - 20 * 300_000, 200, ex="gate", sym="BNB/USDT")
out = resolve_archive_chart(
"gate",
"BNB/USDT",
"15m",
opened_ms=open_ms,
closed_ms=close_ms,
mode="hold",
range_mode="history",
db_path=db,
)
assert out["ok"] is True
assert out.get("range_mode") == "history"
assert out.get("window_end_ms") <= close_ms + 4 * 3600_000
assert len(out["candles"]) >= 40
def test_sync_prunes_missing_trades():
with tempfile.TemporaryDirectory() as td:
db = Path(td) / "archive.db"
init_db(db)
upsert_trades_cache(
"gate",
[
{"id": 1, "symbol": "BNB/USDT", "result": "止损", "pnl_amount": -1},
{"id": 2, "symbol": "BNB/USDT", "result": "止盈", "pnl_amount": 1},
],
db_path=db,
prune_missing=False,
)
stats = upsert_trades_cache(
"gate",
[{"id": 1, "symbol": "BNB/USDT", "result": "止损", "pnl_amount": -1}],
db_path=db,
prune_missing=True,
)
rows = load_symbol_trades("gate", "BNB/USDT", db_path=db)
assert len(rows) == 1
assert rows[0]["trade_id"] == 1
assert stats["removed"] == 1
def test_list_with_overlay_filters():
with tempfile.TemporaryDirectory() as td:
db = Path(td) / "archive.db"
init_db(db)
upsert_trades_cache(
"gate",
[
{
"id": 1,
"symbol": "ONDO",
"direction": "long",
"result": "止盈",
"pnl_amount": 12.5,
"opened_at": "2026-01-01 10:00:00",
"closed_at": "2026-01-01 12:00:00",
"opened_at_ms": 1_700_000_000_000,
"closed_at_ms": 1_700_007_200_000,
},
{
"id": 2,
"symbol": "ONDO",
"direction": "short",
"result": "止损",
"pnl_amount": -3.2,
"opened_at": "2026-01-02 10:00:00",
"closed_at": "2026-01-02 11:00:00",
"opened_at_ms": 1_700_086_400_000,
"closed_at_ms": 1_700_090_000_000,
},
],
db_path=db,
)
upsert_trade_overlay("gate", 2, behavior_tag="sick", note="追高", db_path=db)
rows = list_symbol_rows(db_path=db)
assert len(rows) == 1
assert rows[0]["trade_count"] == 2
sick_only = list_symbol_rows(filter_sick=True, db_path=db)
assert len(sick_only) == 1
profit_only = list_symbol_rows(filter_profit=True, db_path=db)
assert len(profit_only) == 1
def test_parse_wall_clock_ms_uses_utc_plus_8():
ms = parse_wall_clock_ms("2026-06-07 20:30:00")
assert ms is not None
dt_utc = datetime.fromtimestamp(ms / 1000.0, tz=timezone.utc)
dt_bj = dt_utc.astimezone(CHART_DISPLAY_TZ)
assert dt_bj.strftime("%Y-%m-%d %H:%M:%S") == "2026-06-07 20:30:00"
assert ms_to_wall_clock_str(ms) == "2026-06-07 20:30:00"
assert parse_wall_clock_ms("2026-06-07 20:30") == ms