增加K线

This commit is contained in:
dekun
2026-05-30 10:35:25 +08:00
parent 74bd241579
commit 53959a9008
8 changed files with 683 additions and 134 deletions
+17 -5
View File
@@ -236,11 +236,7 @@ class BinanceFuturesClient:
)
return rows
async def get_daily_klines(self, symbol: str, limit: int = 300) -> list[dict]:
raw = await self._get(
"/fapi/v1/klines",
{"symbol": symbol.upper(), "interval": "1d", "limit": min(limit, 1500)},
)
def _parse_kline_rows(self, raw: list | None) -> list[dict]:
candles = []
for k in raw or []:
candles.append(
@@ -256,5 +252,21 @@ class BinanceFuturesClient:
)
return candles
async def get_klines_limit(
self, symbol: str, interval: str, limit: int = 500
) -> list[dict]:
raw = await self._get(
"/fapi/v1/klines",
{
"symbol": symbol.upper(),
"interval": interval,
"limit": min(limit, 1500),
},
)
return self._parse_kline_rows(raw)
async def get_daily_klines(self, symbol: str, limit: int = 300) -> list[dict]:
return await self.get_klines_limit(symbol, "1d", limit)
binance_client = BinanceFuturesClient()
+44
View File
@@ -0,0 +1,44 @@
"""K 线周期常量与每周期 bar 数 / 缓存 TTL。"""
CHART_INTERVALS: tuple[str, ...] = ("5m", "15m", "30m", "1h", "4h", "1d", "1w")
LONG_INTERVALS: frozenset[str] = frozenset({"1d", "1w"})
# 1d 及以上 500 根;以下 1000 根
INTERVAL_LIMITS: dict[str, int] = {
"5m": 1000,
"15m": 1000,
"30m": 1000,
"1h": 1000,
"4h": 1000,
"1d": 500,
"1w": 500,
}
# 预取 / 本地 freshness(分钟)
INTERVAL_CACHE_MINUTES: dict[str, int] = {
"5m": 5,
"15m": 15,
"30m": 15,
"1h": 30,
"4h": 30,
"1d": 60,
"1w": 60,
}
def validate_interval(interval: str) -> str:
iv = interval.lower().strip()
if iv not in CHART_INTERVALS:
raise ValueError(f"unsupported interval: {interval}")
return iv
def limit_for_interval(interval: str) -> int:
iv = validate_interval(interval)
return INTERVAL_LIMITS[iv]
def cache_minutes_for_interval(interval: str) -> int:
iv = validate_interval(interval)
return INTERVAL_CACHE_MINUTES[iv]
+173 -14
View File
@@ -47,6 +47,23 @@ def init_db() -> None:
message TEXT
);
CREATE TABLE IF NOT EXISTS klines (
symbol TEXT NOT NULL,
interval TEXT NOT NULL,
open_time INTEGER NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume REAL NOT NULL,
quote_volume REAL NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL,
PRIMARY KEY (symbol, interval, open_time)
);
CREATE INDEX IF NOT EXISTS idx_klines_symbol_interval
ON klines(symbol, interval, open_time);
CREATE TABLE IF NOT EXISTS daily_klines (
symbol TEXT NOT NULL,
open_time INTEGER NOT NULL,
@@ -64,9 +81,11 @@ def init_db() -> None:
ON daily_klines(symbol, open_time);
CREATE TABLE IF NOT EXISTS kline_meta (
symbol TEXT PRIMARY KEY,
symbol TEXT NOT NULL,
interval TEXT NOT NULL DEFAULT '1d',
last_fetch_at TEXT NOT NULL,
bar_count INTEGER NOT NULL
bar_count INTEGER NOT NULL,
PRIMARY KEY (symbol, interval)
);
CREATE TABLE IF NOT EXISTS funding_history (
@@ -99,6 +118,57 @@ def init_db() -> None:
"""
)
_migrate_klines_if_needed(conn)
def _migrate_klines_if_needed(conn: sqlite3.Connection) -> None:
"""从旧版 daily_klines / 单 symbol kline_meta 迁移到多周期表。"""
cols = conn.execute("PRAGMA table_info(kline_meta)").fetchall()
col_names = {c[1] for c in cols}
if "interval" not in col_names and cols:
rows = conn.execute(
"SELECT symbol, last_fetch_at, bar_count FROM kline_meta"
).fetchall()
conn.execute("DROP TABLE kline_meta")
conn.execute(
"""
CREATE TABLE kline_meta (
symbol TEXT NOT NULL,
interval TEXT NOT NULL DEFAULT '1d',
last_fetch_at TEXT NOT NULL,
bar_count INTEGER NOT NULL,
PRIMARY KEY (symbol, interval)
)
"""
)
for r in rows:
conn.execute(
"""
INSERT INTO kline_meta (symbol, interval, last_fetch_at, bar_count)
VALUES (?, '1d', ?, ?)
""",
(r[0], r[1], r[2]),
)
has_klines = conn.execute(
"SELECT 1 FROM klines LIMIT 1"
).fetchone()
if has_klines:
return
daily_count = conn.execute("SELECT COUNT(*) FROM daily_klines").fetchone()[0]
if not daily_count:
return
conn.execute(
"""
INSERT OR IGNORE INTO klines (
symbol, interval, open_time, open, high, low, close, volume, quote_volume, updated_at
)
SELECT symbol, '1d', open_time, open, high, low, close, volume, quote_volume, updated_at
FROM daily_klines
"""
)
def save_snapshot(
@@ -159,7 +229,82 @@ def log_push(period_start: str, period_end: str, success: bool, message: str = "
)
def save_klines(symbol: str, interval: str, candles: list[dict[str, Any]]) -> None:
sym = symbol.upper()
iv = interval.lower()
now = datetime.now().isoformat()
with get_conn() as conn:
for c in candles:
conn.execute(
"""
INSERT INTO klines (
symbol, interval, open_time, open, high, low, close, volume, quote_volume, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(symbol, interval, open_time) DO UPDATE SET
open = excluded.open,
high = excluded.high,
low = excluded.low,
close = excluded.close,
volume = excluded.volume,
quote_volume = excluded.quote_volume,
updated_at = excluded.updated_at
""",
(
sym,
iv,
int(c["time"]),
float(c["open"]),
float(c["high"]),
float(c["low"]),
float(c["close"]),
float(c.get("volume", 0)),
float(c.get("quote_volume", 0)),
now,
),
)
conn.execute(
"""
INSERT INTO kline_meta (symbol, interval, last_fetch_at, bar_count)
VALUES (?, ?, ?, ?)
ON CONFLICT(symbol, interval) DO UPDATE SET
last_fetch_at = excluded.last_fetch_at,
bar_count = excluded.bar_count
""",
(sym, iv, now, len(candles)),
)
def get_klines_from_db(symbol: str, interval: str, limit: int) -> list[dict[str, Any]]:
sym = symbol.upper()
iv = interval.lower()
with get_conn() as conn:
rows = conn.execute(
"""
SELECT open_time, open, high, low, close, volume, quote_volume
FROM klines
WHERE symbol = ? AND interval = ?
ORDER BY open_time DESC
LIMIT ?
""",
(sym, iv, limit),
).fetchall()
rows = list(reversed(rows))
return [
{
"time": int(r["open_time"]),
"open": float(r["open"]),
"high": float(r["high"]),
"low": float(r["low"]),
"close": float(r["close"]),
"volume": float(r["volume"]),
"quote_volume": float(r["quote_volume"]),
}
for r in rows
]
def save_daily_klines(symbol: str, candles: list[dict[str, Any]]) -> None:
save_klines(symbol, "1d", candles)
sym = symbol.upper()
now = datetime.now().isoformat()
with get_conn() as conn:
@@ -190,19 +335,12 @@ def save_daily_klines(symbol: str, candles: list[dict[str, Any]]) -> None:
now,
),
)
conn.execute(
"""
INSERT INTO kline_meta (symbol, last_fetch_at, bar_count)
VALUES (?, ?, ?)
ON CONFLICT(symbol) DO UPDATE SET
last_fetch_at = excluded.last_fetch_at,
bar_count = excluded.bar_count
""",
(sym, now, len(candles)),
)
def get_daily_klines_from_db(symbol: str, limit: int) -> list[dict[str, Any]]:
stored = get_klines_from_db(symbol, "1d", limit)
if stored:
return stored
sym = symbol.upper()
with get_conn() as conn:
rows = conn.execute(
@@ -230,12 +368,33 @@ def get_daily_klines_from_db(symbol: str, limit: int) -> list[dict[str, Any]]:
]
def get_kline_meta(symbol: str) -> dict[str, Any] | None:
def get_kline_meta(symbol: str, interval: str = "1d") -> dict[str, Any] | None:
sym = symbol.upper()
iv = interval.lower()
with get_conn() as conn:
row = conn.execute(
"SELECT last_fetch_at, bar_count FROM kline_meta WHERE symbol = ? AND interval = ?",
(sym, iv),
).fetchone()
if not row:
if iv == "1d":
return _legacy_kline_meta(sym)
return None
return {
"last_fetch_at": row["last_fetch_at"],
"bar_count": row["bar_count"],
}
def _legacy_kline_meta(symbol: str) -> dict[str, Any] | None:
"""兼容旧库仅有 symbol 维度的 meta(迁移前)。"""
with get_conn() as conn:
cols = {c[1] for c in conn.execute("PRAGMA table_info(kline_meta)").fetchall()}
if "interval" in cols:
return None
row = conn.execute(
"SELECT last_fetch_at, bar_count FROM kline_meta WHERE symbol = ?",
(sym,),
(symbol,),
).fetchone()
if not row:
return None
+56 -32
View File
@@ -1,18 +1,23 @@
""" K 线:优先 SQLite 本地库,不足或过期再请求币安。"""
"""多周期 K 线:优先 SQLite,不足或过期再请求币安。"""
import logging
from datetime import datetime
from .binance import binance_client
from .config import settings
from .db import get_daily_klines_from_db, get_kline_meta, save_daily_klines
from .chart_intervals import (
CHART_INTERVALS,
cache_minutes_for_interval,
limit_for_interval,
validate_interval,
)
from .db import get_kline_meta, get_klines_from_db, save_klines
from .exceptions import BinanceRateLimitedError
logger = logging.getLogger(__name__)
def _is_db_fresh(symbol: str, min_bars: int) -> bool:
meta = get_kline_meta(symbol)
def _is_db_fresh(symbol: str, interval: str, min_bars: int) -> bool:
meta = get_kline_meta(symbol, interval)
if not meta or meta.get("bar_count", 0) < min_bars:
return False
try:
@@ -20,21 +25,29 @@ def _is_db_fresh(symbol: str, min_bars: int) -> bool:
except ValueError:
return False
age = (datetime.now() - last_fetch).total_seconds()
return age < settings.chart_cache_minutes * 60
return age < cache_minutes_for_interval(interval) * 60
async def sync_daily_klines(symbol: str, limit: int | None = None) -> list[dict]:
"""从币安拉取并写入本地库。"""
async def sync_klines(
symbol: str, interval: str, limit: int | None = None
) -> list[dict]:
"""从币安拉取指定周期并写入本地库。"""
sym = symbol.upper()
n = min(limit or settings.chart_kline_limit, 1500)
candles = await binance_client.get_daily_klines(sym, n)
save_daily_klines(sym, candles)
logger.info("Saved %d daily klines for %s to DB", len(candles), sym)
iv = validate_interval(interval)
n = min(limit or limit_for_interval(iv), 1500)
candles = await binance_client.get_klines_limit(sym, iv, n)
save_klines(sym, iv, candles)
logger.info("Saved %d %s klines for %s to DB", len(candles), iv, sym)
return candles
async def get_daily_candles(
async def sync_daily_klines(symbol: str, limit: int | None = None) -> list[dict]:
return await sync_klines(symbol, "1d", limit)
async def get_candles(
symbol: str,
interval: str = "1d",
limit: int | None = None,
force_refresh: bool = False,
) -> tuple[list[dict], str]:
@@ -43,23 +56,24 @@ async def get_daily_candles(
source: db | db_stale | binance
"""
sym = symbol.upper().strip()
n = min(limit or settings.chart_kline_limit, 1500)
iv = validate_interval(interval)
n = min(limit or limit_for_interval(iv), 1500)
min_bars = min(n, 50)
if not force_refresh and _is_db_fresh(sym, min_bars):
candles = get_daily_klines_from_db(sym, n)
if not force_refresh and _is_db_fresh(sym, iv, min_bars):
candles = get_klines_from_db(sym, iv, n)
if len(candles) >= min_bars:
return candles, "db"
stored = get_daily_klines_from_db(sym, n)
stored = get_klines_from_db(sym, iv, n)
if binance_client.is_rate_limited():
if stored:
logger.warning("Rate limited, serve stale DB klines for %s", sym)
logger.warning("Rate limited, serve stale DB klines for %s %s", sym, iv)
return stored, "db_stale"
raise BinanceRateLimitedError(binance_client.rate_limit_remaining_sec(), sym)
try:
candles = await sync_daily_klines(sym, n)
candles = await sync_klines(sym, iv, n)
return candles, "binance"
except BinanceRateLimitedError:
if stored:
@@ -71,23 +85,33 @@ async def get_daily_candles(
raise
async def get_daily_candles(
symbol: str,
limit: int | None = None,
force_refresh: bool = False,
) -> tuple[list[dict], str]:
return await get_candles(symbol, "1d", limit, force_refresh)
async def prefetch_symbols(symbols: list[str]) -> None:
"""后台预拉 Top 币种 K 入库(串行,避免 418)。"""
"""后台预拉 Top 币种全周期 K 线入库(串行,避免 418)。"""
seen: set[str] = set()
for raw in symbols:
sym = raw.upper().strip()
if not sym or sym in seen or not sym.endswith("USDT"):
continue
seen.add(sym)
if _is_db_fresh(sym, min(50, settings.chart_kline_limit)):
continue
if binance_client.is_rate_limited():
logger.warning("Prefetch stopped — rate limited")
break
try:
await sync_daily_klines(sym)
except BinanceRateLimitedError:
logger.warning("Prefetch rate limited at %s", sym)
break
except Exception as e:
logger.warning("Prefetch %s failed: %s", sym, e)
for interval in CHART_INTERVALS:
n = limit_for_interval(interval)
if _is_db_fresh(sym, interval, min(50, n)):
continue
if binance_client.is_rate_limited():
logger.warning("Prefetch stopped — rate limited")
return
try:
await sync_klines(sym, interval, n)
except BinanceRateLimitedError:
logger.warning("Prefetch rate limited at %s %s", sym, interval)
return
except Exception as e:
logger.warning("Prefetch %s %s failed: %s", sym, interval, e)
+56 -2
View File
@@ -8,7 +8,8 @@ from fastapi.staticfiles import StaticFiles
from .config import ROOT_DIR, settings
from .funding_store import get_funding_bundle
from .kline_store import get_daily_candles, sync_daily_klines
from .kline_store import get_candles, get_daily_candles, sync_daily_klines, sync_klines
from .chart_intervals import CHART_INTERVALS, limit_for_interval, validate_interval
from .db import get_latest_snapshot, init_db, log_push, save_snapshot
from .exceptions import BinanceRateLimitedError
from .period_api import get_period_top30
@@ -144,9 +145,44 @@ async def api_refresh_daybefore():
return get_latest_snapshot("daybefore") or {"message": "done"}
@app.get("/api/chart/{symbol}")
async def api_chart(
symbol: str,
interval: str = "1d",
limit: int | None = None,
refresh: bool = False,
):
"""合约 K 线:优先读本地 SQLite,过期再拉币安入库。"""
sym = symbol.upper().strip()
if not sym.endswith("USDT"):
raise HTTPException(400, "invalid symbol")
try:
iv = validate_interval(interval)
except ValueError as e:
raise HTTPException(400, str(e)) from e
default_limit = limit_for_interval(iv)
try:
candles, source = await get_candles(
sym, iv, limit or default_limit, force_refresh=refresh
)
return {
"symbol": sym,
"interval": iv,
"limit": len(candles),
"candles": candles,
"source": source,
"intervals": list(CHART_INTERVALS),
}
except BinanceRateLimitedError as e:
raise HTTPException(503, f"币安限流,请 {e.retry_after_sec} 秒后再试") from e
except Exception as e:
logger.error("chart %s %s failed: %s", sym, iv, e)
raise HTTPException(502, "K线获取失败") from e
@app.get("/api/chart/{symbol}/daily")
async def api_chart_daily(symbol: str, limit: int | None = None, refresh: bool = False):
"""合约日 K 线:优先读本地 SQLite,过期再拉币安入库"""
"""合约日 K 线(兼容旧路径)"""
sym = symbol.upper().strip()
if not sym.endswith("USDT"):
raise HTTPException(400, "invalid symbol")
@@ -158,6 +194,7 @@ async def api_chart_daily(symbol: str, limit: int | None = None, refresh: bool =
"limit": len(candles),
"candles": candles,
"source": source,
"intervals": list(CHART_INTERVALS),
}
except BinanceRateLimitedError as e:
raise HTTPException(503, f"币安限流,请 {e.retry_after_sec} 秒后再试") from e
@@ -180,6 +217,23 @@ async def api_funding_history(symbol: str, limit: int | None = None, refresh: bo
raise HTTPException(502, "资金费率获取失败") from e
@app.post("/api/chart/{symbol}/refresh")
async def api_chart_refresh(
symbol: str, interval: str = "1d", limit: int | None = None
):
"""强制从币安同步 K 线到本地库。"""
sym = symbol.upper().strip()
try:
iv = validate_interval(interval)
except ValueError as e:
raise HTTPException(400, str(e)) from e
try:
candles = await sync_klines(sym, iv, limit)
return {"symbol": sym, "interval": iv, "saved": len(candles), "source": "binance"}
except BinanceRateLimitedError as e:
raise HTTPException(503, f"币安限流,请 {e.retry_after_sec} 秒后再试") from e
@app.post("/api/chart/{symbol}/daily/refresh")
async def api_chart_daily_refresh(symbol: str, limit: int | None = None):
"""强制从币安同步日 K 到本地库。"""