feat(hub): add symbol archive with permanent 5m klines

Add /archive page, hub_symbol_archive.db, trade overlay, 4h background sync, and instance /api/hub/trades/archive. Document in hub-symbol-archive-kline.md with cross-links.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-07 22:51:48 +08:00
parent 32b66fc343
commit 6a56928d59
13 changed files with 2174 additions and 5 deletions
+257 -2
View File
@@ -16,6 +16,21 @@ if str(_REPO_ROOT) not in sys.path:
from hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days
from hub_ohlcv_lib import CHART_TIMEFRAME_ORDER, CHART_TIMEFRAMES, bar_limit_for_timeframe
from hub_symbol_archive_lib import (
ARCHIVE_DEFAULT_TIMEFRAME,
ARCHIVE_SEED_LOOKBACK_DAYS,
ARCHIVE_SYNC_INTERVAL_SEC,
ARCHIVE_TIMEFRAMES,
ARCHIVE_TRADE_DAYS,
ARCHIVE_TRADE_LIMIT,
ARCHIVE_VISIBLE_BARS_DEFAULT,
init_db as init_archive_db,
list_symbol_rows,
load_symbol_trades,
resolve_archive_chart,
sync_exchange_symbol_archives,
upsert_trade_overlay,
)
from env_load import load_hub_dotenv
load_hub_dotenv()
@@ -77,7 +92,9 @@ _allow_pub_raw = (os.getenv("HUB_ALLOW_PUBLIC") or "").strip().lower()
# 云服务器 + 域名反代时设为 true:不做 IP 限制,仅靠 HUB_PASSWORD / 登录页保护
HUB_ALLOW_PUBLIC = _allow_pub_raw in ("1", "true", "yes", "on")
DIR = Path(__file__).resolve().parent
HUB_BUILD = "20260606-hub-ai"
HUB_BUILD = "20260607-hub-archive"
_archive_sync_stop: asyncio.Event | None = None
_archive_sync_task: asyncio.Task | None = None
HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8"))
HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10"))
HUB_BOARD_TIMEOUT = float(os.getenv("HUB_BOARD_TIMEOUT", "45"))
@@ -221,13 +238,91 @@ def _schedule_board_refresh() -> None:
board_store.request_refresh()
async def _run_archive_sync_once() -> dict:
init_archive_db()
settings = load_settings()
targets = enabled_exchanges(settings)
results: list[dict] = []
for ex in targets:
ex_key = str(ex.get("key") or "").strip().lower()
if not ex_key:
continue
trades_resp = await asyncio.to_thread(
_fetch_instance_trades_archive_sync,
ex,
days=ARCHIVE_TRADE_DAYS,
limit=ARCHIVE_TRADE_LIMIT,
)
if not trades_resp.get("ok"):
results.append(
{
"exchange_key": ex_key,
"ok": False,
"msg": trades_resp.get("msg") or trades_resp.get("error") or "拉取交易失败",
}
)
continue
trades = trades_resp.get("trades") or []
for t in trades:
if isinstance(t, dict):
t.setdefault("exchange_key", ex_key)
def remote_fetch(**kwargs):
return _fetch_instance_ohlcv_sync(
ex,
symbol=kwargs.get("symbol") or "",
timeframe=kwargs.get("timeframe") or "5m",
since_ms=kwargs.get("since_ms"),
limit=int(kwargs.get("limit") or 500),
)
r = await asyncio.to_thread(
sync_exchange_symbol_archives,
ex_key,
trades,
remote_fetch,
)
results.append(r)
return {"ok": True, "exchanges": len(targets), "results": results}
async def _archive_sync_loop() -> None:
global _archive_sync_stop
stop = _archive_sync_stop
if stop is None:
return
init_archive_db()
while not stop.is_set():
try:
await _run_archive_sync_once()
except Exception:
pass
try:
await asyncio.wait_for(stop.wait(), timeout=float(ARCHIVE_SYNC_INTERVAL_SEC))
except asyncio.TimeoutError:
pass
@asynccontextmanager
async def _hub_lifespan(_app: FastAPI):
global _archive_sync_stop, _archive_sync_task
await board_store.start(_run_board_aggregate)
await chart_poll_store.start(_run_chart_poll)
_archive_sync_stop = asyncio.Event()
_archive_sync_task = asyncio.create_task(_archive_sync_loop(), name="hub-archive-sync")
try:
yield
finally:
if _archive_sync_stop:
_archive_sync_stop.set()
if _archive_sync_task:
_archive_sync_task.cancel()
try:
await _archive_sync_task
except asyncio.CancelledError:
pass
_archive_sync_task = None
_archive_sync_stop = None
await chart_poll_store.stop()
await board_store.stop()
@@ -377,6 +472,7 @@ def root_redirect():
@app.get("/monitor")
@app.get("/market")
@app.get("/archive")
@app.get("/ai")
@app.get("/settings")
def shell_pages():
@@ -436,6 +532,30 @@ def _find_exchange_by_key(exchange_key: str) -> dict | None:
return None
def _fetch_instance_trades_archive_sync(
ex: dict,
*,
days: int = 365,
limit: int = 2000,
) -> dict:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return {"ok": False, "msg": "未配置 flask_url"}
params = {"days": str(int(days)), "limit": str(int(limit))}
url = f"{base}/api/hub/trades/archive?{urlencode(params)}"
try:
with httpx.Client(timeout=HUB_FLASK_TIMEOUT) as client:
r = client.get(url, headers=_hub_headers())
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
return parsed
data = r.json() if r.content else {}
return data if isinstance(data, dict) else {"ok": False, "msg": "无效 JSON"}
except Exception as e:
return {"ok": False, "msg": str(e)}
def _fetch_instance_ohlcv_sync(
ex: dict,
*,
@@ -1503,6 +1623,141 @@ def _trade_removed_response():
)
def _parse_anchor_ms(at: str = "", anchor_ms: str = "") -> int | None:
raw = (anchor_ms or at or "").strip()
if not raw:
return None
if raw.isdigit():
v = int(raw)
return v if v > 1_000_000_000_000 else v * 1000
s = raw.replace("Z", "").replace("T", " ")
for fmt, ln in (("%Y-%m-%d %H:%M:%S", 19), ("%Y-%m-%d %H:%M", 16), ("%Y-%m-%d", 10)):
try:
from datetime import datetime
dt = datetime.strptime(s[:ln], fmt)
return int(dt.timestamp() * 1000)
except ValueError:
continue
return None
@app.get("/api/archive/meta")
def api_archive_meta():
init_archive_db()
exchanges = []
for ex in enabled_exchanges(load_settings()):
exchanges.append(
{
"id": ex.get("id"),
"key": ex.get("key"),
"name": ex.get("name"),
}
)
return {
"ok": True,
"timeframes": sorted(ARCHIVE_TIMEFRAMES),
"default_timeframe": ARCHIVE_DEFAULT_TIMEFRAME,
"seed_lookback_days": ARCHIVE_SEED_LOOKBACK_DAYS,
"sync_interval_sec": ARCHIVE_SYNC_INTERVAL_SEC,
"visible_bars_default": ARCHIVE_VISIBLE_BARS_DEFAULT,
"exchanges": exchanges,
}
@app.get("/api/archive/list")
def api_archive_list(
exchange_key: str = "",
filter_profit: str = "",
filter_loss: str = "",
filter_sick: str = "",
filter_emotion: str = "",
):
init_archive_db()
rows = list_symbol_rows(
exchange_key=exchange_key,
filter_profit=(filter_profit or "").lower() in ("1", "true", "yes", "on"),
filter_loss=(filter_loss or "").lower() in ("1", "true", "yes", "on"),
filter_sick=(filter_sick or "").lower() in ("1", "true", "yes", "on"),
filter_emotion=(filter_emotion or "").lower() in ("1", "true", "yes", "on"),
)
return {"ok": True, "rows": rows, "count": len(rows)}
@app.get("/api/archive/detail")
def api_archive_detail(exchange_key: str = "", symbol: str = ""):
ex_k = (exchange_key or "").strip().lower()
sym = (symbol or "").strip().upper()
if not ex_k or not sym:
raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol")
init_archive_db()
trades = load_symbol_trades(ex_k, sym)
return {"ok": True, "exchange_key": ex_k, "symbol": sym, "trades": trades}
@app.get("/api/archive/ohlcv")
def api_archive_ohlcv(
exchange_key: str = "",
symbol: str = "",
timeframe: str = ARCHIVE_DEFAULT_TIMEFRAME,
mode: str = "hold",
anchor_ms: str = "",
at: str = "",
bars: str = "",
):
ex_k = (exchange_key or "").strip().lower()
sym = (symbol or "").strip().upper()
if not ex_k or not sym:
raise HTTPException(status_code=400, detail="缺少 exchange_key 或 symbol")
init_archive_db()
anchor = _parse_anchor_ms(at, anchor_ms)
try:
bar_n = int(bars) if (bars or "").strip().isdigit() else ARCHIVE_VISIBLE_BARS_DEFAULT
except ValueError:
bar_n = ARCHIVE_VISIBLE_BARS_DEFAULT
result = resolve_archive_chart(
ex_k,
sym,
timeframe,
anchor_ms=anchor,
mode=mode,
bars=bar_n,
)
if not result.get("ok"):
raise HTTPException(status_code=404, detail=result.get("msg") or "无 K 线")
return result
class ArchiveOverlayBody(BaseModel):
behavior_tag: str = ""
note: str = ""
@app.patch("/api/archive/trade/{exchange_key}/{trade_id}")
def api_archive_trade_overlay(
exchange_key: str,
trade_id: int,
body: ArchiveOverlayBody = Body(...),
):
ex_k = (exchange_key or "").strip().lower()
if not ex_k:
raise HTTPException(status_code=400, detail="缺少 exchange_key")
init_archive_db()
out = upsert_trade_overlay(
ex_k,
int(trade_id),
behavior_tag=body.behavior_tag,
note=body.note,
)
return {"ok": True, "overlay": out}
@app.post("/api/archive/sync")
async def api_archive_sync():
body = await _run_archive_sync_once()
return body
@app.get("/api/ping")
def api_ping():
return {
@@ -1510,7 +1765,7 @@ def api_ping():
"service": "manual-trading-hub",
"build": HUB_BUILD,
"trade_ui": False,
"features": ["monitor", "settings", "auth", "board_sse"],
"features": ["monitor", "settings", "auth", "board_sse", "archive"],
"board_poll_interval_sec": HUB_BOARD_POLL_INTERVAL,
"board_version": board_store.version,
"board_aggregating": board_store.aggregating,