diff --git a/.env.example b/.env.example index 2f64cfe..89a9845 100644 --- a/.env.example +++ b/.env.example @@ -22,3 +22,5 @@ TODAY_DATA_MODE=ticker24h YESTERDAY_DATA_MODE=klines CHART_KLINE_LIMIT=300 CHART_CACHE_MINUTES=60 +FUNDING_HISTORY_LIMIT=90 +FUNDING_CACHE_MINUTES=30 diff --git a/backend/app/aggregator.py b/backend/app/aggregator.py index 3a90fec..b0b36f1 100644 --- a/backend/app/aggregator.py +++ b/backend/app/aggregator.py @@ -6,6 +6,7 @@ from datetime import datetime from .binance import binance_client from .config import settings from .exceptions import BinanceRateLimitedError +from .funding_store import enrich_items_with_funding from .periods import to_ms logger = logging.getLogger(__name__) @@ -38,14 +39,15 @@ def format_volume(vol: float) -> str: return f"{vol:.0f}" -def _finalize_top(stats: list[SymbolStats]) -> list[dict]: +async def _finalize_top(stats: list[SymbolStats]) -> list[dict]: stats.sort(key=lambda x: x.quote_volume, reverse=True) top = stats[: settings.top_n] for i, s in enumerate(top, 1): s.rank = i s.is_high_volume = s.quote_volume >= settings.volume_threshold s.is_high_change = abs(s.price_change_pct) >= settings.change_threshold - return [s.to_dict() for s in top] + items = [s.to_dict() for s in top] + return await enrich_items_with_funding(items) def _aggregate_klines(klines: list, start_ms: int, end_ms: int) -> tuple[float, float, float]: @@ -87,7 +89,7 @@ async def aggregate_from_ticker24hr() -> list[dict]: ) ) logger.info("ticker24h mode: %d symbols, 1 API call", len(stats)) - return _finalize_top(stats) + return await _finalize_top(stats) async def _fetch_symbol_stats( @@ -177,7 +179,7 @@ async def aggregate_period_klines( ) results = await asyncio.gather(*tasks) stats = [r for r in results if r is not None and r.quote_volume > 0] - return _finalize_top(stats) + return await _finalize_top(stats) async def aggregate_period( diff --git a/backend/app/binance.py b/backend/app/binance.py index a776539..2e474e5 100644 --- a/backend/app/binance.py +++ b/backend/app/binance.py @@ -212,6 +212,30 @@ class BinanceFuturesClient: sym_set = set(symbols) return {t["symbol"]: float(t["price"]) for t in tickers if t["symbol"] in sym_set} + async def get_premium_index_all(self) -> dict[str, dict]: + data = await self._get("/fapi/v1/premiumIndex") + if isinstance(data, dict): + items = [data] + else: + items = data or [] + return {str(x["symbol"]).upper(): x for x in items if x.get("symbol")} + + async def get_funding_rate_history(self, symbol: str, limit: int = 90) -> list[dict]: + raw = await self._get( + "/fapi/v1/fundingRate", + {"symbol": symbol.upper(), "limit": min(limit, 1000)}, + ) + rows = [] + for r in raw or []: + rows.append( + { + "time": int(r["fundingTime"]), + "rate": float(r["fundingRate"]), + "mark_price": float(r.get("markPrice", 0) or 0), + } + ) + return rows + async def get_daily_klines(self, symbol: str, limit: int = 300) -> list[dict]: raw = await self._get( "/fapi/v1/klines", diff --git a/backend/app/config.py b/backend/app/config.py index 44db9d0..201427d 100644 --- a/backend/app/config.py +++ b/backend/app/config.py @@ -31,6 +31,8 @@ class Settings(BaseSettings): yesterday_data_mode: str = "klines" chart_kline_limit: int = 300 chart_cache_minutes: int = 60 + funding_history_limit: int = 90 + funding_cache_minutes: int = 30 # 代理默认关闭;仅当 PROXY_ENABLED=true 时生效 proxy_enabled: bool = False proxy_url: str = "socks5h://192.168.8.4:1081" diff --git a/backend/app/db.py b/backend/app/db.py index 5125315..973551b 100644 --- a/backend/app/db.py +++ b/backend/app/db.py @@ -68,6 +68,34 @@ def init_db() -> None: last_fetch_at TEXT NOT NULL, bar_count INTEGER NOT NULL ); + + CREATE TABLE IF NOT EXISTS funding_history ( + symbol TEXT NOT NULL, + funding_time INTEGER NOT NULL, + funding_rate REAL NOT NULL, + mark_price REAL NOT NULL DEFAULT 0, + updated_at TEXT NOT NULL, + PRIMARY KEY (symbol, funding_time) + ); + + CREATE INDEX IF NOT EXISTS idx_funding_history_symbol + ON funding_history(symbol, funding_time); + + CREATE TABLE IF NOT EXISTS funding_meta ( + symbol TEXT PRIMARY KEY, + last_fetch_at TEXT NOT NULL, + bar_count INTEGER NOT NULL, + last_funding_rate REAL NOT NULL DEFAULT 0, + next_funding_time INTEGER NOT NULL DEFAULT 0 + ); + + CREATE TABLE IF NOT EXISTS funding_current ( + symbol TEXT PRIMARY KEY, + last_funding_rate REAL NOT NULL, + next_funding_time INTEGER NOT NULL, + mark_price REAL NOT NULL DEFAULT 0, + updated_at TEXT NOT NULL + ); """ ) @@ -216,6 +244,115 @@ def get_kline_meta(symbol: str) -> dict[str, Any] | None: } +def save_funding_history( + symbol: str, + rows: list[dict[str, Any]], + next_funding_time: int = 0, +) -> None: + sym = symbol.upper() + now = datetime.now().isoformat() + last_rate = 0.0 + next_time = next_funding_time + with get_conn() as conn: + for r in rows: + ft = int(r["time"]) + rate = float(r["rate"]) + mp = float(r.get("mark_price", 0)) + last_rate = rate + conn.execute( + """ + INSERT INTO funding_history (symbol, funding_time, funding_rate, mark_price, updated_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(symbol, funding_time) DO UPDATE SET + funding_rate = excluded.funding_rate, + mark_price = excluded.mark_price, + updated_at = excluded.updated_at + """, + (sym, ft, rate, mp, now), + ) + if rows: + conn.execute( + """ + INSERT INTO funding_meta (symbol, last_fetch_at, bar_count, last_funding_rate, next_funding_time) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(symbol) DO UPDATE SET + last_fetch_at = excluded.last_fetch_at, + bar_count = excluded.bar_count, + last_funding_rate = excluded.last_funding_rate + """, + (sym, now, len(rows), last_rate, next_time), + ) + + +def get_funding_history_from_db(symbol: str, limit: int) -> list[dict[str, Any]]: + sym = symbol.upper() + with get_conn() as conn: + rows = conn.execute( + """ + SELECT funding_time, funding_rate, mark_price + FROM funding_history + WHERE symbol = ? + ORDER BY funding_time DESC + LIMIT ? + """, + (sym, limit), + ).fetchall() + rows = list(reversed(rows)) + return [ + { + "time": int(r["funding_time"]), + "rate": float(r["funding_rate"]), + "rate_pct": float(r["funding_rate"]) * 100, + "mark_price": float(r["mark_price"]), + } + for r in rows + ] + + +def get_funding_meta(symbol: str) -> dict[str, Any] | None: + sym = symbol.upper() + with get_conn() as conn: + row = conn.execute( + """ + SELECT last_fetch_at, bar_count, last_funding_rate, next_funding_time + FROM funding_meta WHERE symbol = ? + """, + (sym,), + ).fetchone() + if not row: + return None + return { + "last_fetch_at": row["last_fetch_at"], + "bar_count": row["bar_count"], + "last_funding_rate": row["last_funding_rate"], + "next_funding_time": row["next_funding_time"], + } + + +def save_funding_current_bulk(data: dict[str, dict[str, Any]]) -> None: + now = datetime.now().isoformat() + with get_conn() as conn: + for sym, info in data.items(): + conn.execute( + """ + INSERT INTO funding_current (symbol, last_funding_rate, next_funding_time, mark_price, updated_at) + VALUES (?, ?, ?, ?, ?) + ON CONFLICT(symbol) DO UPDATE SET + last_funding_rate = excluded.last_funding_rate, + next_funding_time = excluded.next_funding_time, + mark_price = excluded.mark_price, + updated_at = excluded.updated_at + """, + ( + sym.upper(), + float(info.get("lastFundingRate", 0) or 0), + int(info.get("nextFundingTime", 0) or 0), + float(info.get("markPrice", 0) or 0), + now, + ), + ) + + def was_pushed_today(period_start: str, period_end: str) -> bool: with get_conn() as conn: row = conn.execute( diff --git a/backend/app/funding_store.py b/backend/app/funding_store.py new file mode 100644 index 0000000..739fae2 --- /dev/null +++ b/backend/app/funding_store.py @@ -0,0 +1,160 @@ +"""资金费率:当前 premiumIndex + 历史 fundingRate 存 SQLite。""" + +import logging +import time +from datetime import datetime +from typing import Any + +from .binance import binance_client +from .config import settings +from .db import ( + get_funding_history_from_db, + get_funding_meta, + save_funding_current_bulk, + save_funding_history, +) +from .exceptions import BinanceRateLimitedError + +logger = logging.getLogger(__name__) + +_premium_cache: dict[str, Any] = {} +_premium_cache_at: float = 0.0 + + +def _premium_ttl_sec() -> int: + return settings.funding_cache_minutes * 60 + + +async def get_current_funding_map(force: bool = False) -> dict[str, dict]: + """全市场当前资金费率(一次 premiumIndex)。""" + global _premium_cache, _premium_cache_at + now = time.time() + if not force and _premium_cache and now - _premium_cache_at < _premium_ttl_sec(): + return _premium_cache + + if binance_client.is_rate_limited(): + if _premium_cache: + return _premium_cache + raise BinanceRateLimitedError(binance_client.rate_limit_remaining_sec(), "premiumIndex") + + data = await binance_client.get_premium_index_all() + _premium_cache = data + _premium_cache_at = now + save_funding_current_bulk(data) + return data + + +def _is_history_fresh(symbol: str, min_bars: int) -> bool: + meta = get_funding_meta(symbol) + if not meta or meta.get("bar_count", 0) < min_bars: + return False + try: + last = datetime.fromisoformat(meta["last_fetch_at"]) + except ValueError: + return False + return (datetime.now() - last).total_seconds() < _premium_ttl_sec() + + +async def sync_funding_history(symbol: str, limit: int | None = None) -> list[dict]: + sym = symbol.upper() + n = min(limit or settings.funding_history_limit, 1000) + rows = await binance_client.get_funding_rate_history(sym, n) + try: + cur_map = await get_current_funding_map() + nft = int(cur_map.get(sym, {}).get("nextFundingTime", 0) or 0) + except Exception: + nft = 0 + save_funding_history(sym, rows, next_funding_time=nft) + logger.info("Saved %d funding records for %s", len(rows), sym) + return rows + + +async def get_funding_bundle( + symbol: str, + limit: int | None = None, + force_refresh: bool = False, +) -> dict[str, Any]: + sym = symbol.upper() + n = min(limit or settings.funding_history_limit, 1000) + min_bars = min(n, 10) + + current_map = await get_current_funding_map() + cur = current_map.get(sym, {}) + current = { + "rate": float(cur.get("lastFundingRate", 0) or 0), + "rate_pct": float(cur.get("lastFundingRate", 0) or 0) * 100, + "next_funding_time": int(cur.get("nextFundingTime", 0) or 0), + "mark_price": float(cur.get("markPrice", 0) or 0), + } + + if not force_refresh and _is_history_fresh(sym, min_bars): + history = get_funding_history_from_db(sym, n) + if len(history) >= min_bars: + return { + "symbol": sym, + "current": current, + "history": history, + "source": "db", + } + + stored = get_funding_history_from_db(sym, n) + if binance_client.is_rate_limited(): + if stored: + return {"symbol": sym, "current": current, "history": stored, "source": "db_stale"} + raise BinanceRateLimitedError(binance_client.rate_limit_remaining_sec(), sym) + + try: + history = await sync_funding_history(sym, n) + return {"symbol": sym, "current": current, "history": history, "source": "binance"} + except BinanceRateLimitedError: + if stored: + return {"symbol": sym, "current": current, "history": stored, "source": "db_stale"} + raise + except Exception: + if stored: + return {"symbol": sym, "current": current, "history": stored, "source": "db_stale"} + raise + + +async def enrich_items_with_funding(items: list[dict]) -> list[dict]: + try: + current_map = await get_current_funding_map() + except Exception as e: + logger.warning("Funding current map failed: %s", e) + current_map = {} + + for item in items: + sym = item.get("symbol", "") + info = current_map.get(sym, {}) + rate = float(info.get("lastFundingRate", 0) or 0) + item["funding_rate"] = rate + item["funding_rate_pct"] = rate * 100 + item["funding_rate_fmt"] = f"{rate * 100:.4f}%" + nft = info.get("nextFundingTime") + item["next_funding_time"] = int(nft) if nft else None + return items + + +async def prefetch_funding(symbols: list[str]) -> None: + seen: set[str] = set() + try: + await get_current_funding_map() + except Exception as e: + logger.warning("Prefetch premiumIndex failed: %s", e) + + for raw in symbols: + sym = raw.upper().strip() + if not sym or sym in seen: + continue + seen.add(sym) + if _is_history_fresh(sym, 10): + continue + if binance_client.is_rate_limited(): + logger.warning("Prefetch funding stopped — rate limited") + break + try: + await sync_funding_history(sym) + except BinanceRateLimitedError: + break + except Exception as e: + logger.warning("Prefetch funding %s failed: %s", sym, e) diff --git a/backend/app/main.py b/backend/app/main.py index 2a7b4fa..5a09636 100644 --- a/backend/app/main.py +++ b/backend/app/main.py @@ -8,6 +8,7 @@ from fastapi.staticfiles import StaticFiles from .aggregator import aggregate_period, enrich_snapshot_meta from .config import ROOT_DIR, settings +from .funding_store import enrich_items_with_funding, get_funding_bundle from .kline_store import get_daily_candles, sync_daily_klines from .db import get_latest_snapshot, init_db, log_push from .exceptions import BinanceRateLimitedError @@ -62,6 +63,7 @@ async def index(): async def api_yesterday_top30(): snap = get_latest_snapshot("yesterday") if snap: + items = await enrich_items_with_funding(snap["items"]) return { "period_start": snap["period_start"], "period_end": snap["period_end"], @@ -69,7 +71,7 @@ async def api_yesterday_top30(): "top_n": settings.top_n, "volume_threshold": settings.volume_threshold, "change_threshold": settings.change_threshold, - "items": snap["items"], + "items": items, } start, end = get_yesterday_period() try: @@ -87,6 +89,7 @@ async def api_yesterday_top30(): async def api_today_top30(): cached = get_today_cache() if cached: + cached["items"] = await enrich_items_with_funding(cached.get("items", [])) return cached start, end = get_today_period() try: @@ -154,6 +157,20 @@ async def api_chart_daily(symbol: str, limit: int | None = None, refresh: bool = raise HTTPException(502, "K线获取失败") from e +@app.get("/api/funding/{symbol}/history") +async def api_funding_history(symbol: str, limit: int | None = None, refresh: bool = False): + sym = symbol.upper().strip() + if not sym.endswith("USDT"): + raise HTTPException(400, "invalid symbol") + try: + return await get_funding_bundle(sym, limit, force_refresh=refresh) + except BinanceRateLimitedError as e: + raise HTTPException(503, f"币安限流,请 {e.retry_after_sec} 秒后再试") from e + except Exception as e: + logger.error("funding %s failed: %s", sym, e) + raise HTTPException(502, "资金费率获取失败") from e + + @app.post("/api/chart/{symbol}/daily/refresh") async def api_chart_daily_refresh(symbol: str, limit: int | None = None): """强制从币安同步日 K 到本地库。""" diff --git a/backend/app/scheduler.py b/backend/app/scheduler.py index 06cc2a9..2b82385 100644 --- a/backend/app/scheduler.py +++ b/backend/app/scheduler.py @@ -11,6 +11,7 @@ from .db import get_latest_snapshot, init_db, log_push, save_snapshot, was_pushe from .exceptions import BinanceRateLimitedError from .periods import get_today_period, get_yesterday_period, now_shanghai from .state import get_today_cache, set_today_cache +from .funding_store import prefetch_funding from .kline_store import prefetch_symbols from .wecom import build_markdown, send_wecom_markdown @@ -57,6 +58,7 @@ async def job_finalize_yesterday() -> None: syms = [x["symbol"] for x in items if x.get("symbol")] if syms: await prefetch_symbols(syms) + await prefetch_funding(syms) except BinanceRateLimitedError as e: logger.error("Finalize yesterday rate limited %ss", e.retry_after_sec) except Exception as e: @@ -120,6 +122,7 @@ async def job_refresh_today() -> None: syms = [x["symbol"] for x in items if x.get("symbol")] if syms: await prefetch_symbols(syms) + await prefetch_funding(syms) except BinanceRateLimitedError as e: logger.error("Refresh today rate limited %ss — use cache", e.retry_after_sec) _restore_today_from_db() diff --git a/backend/app/wecom.py b/backend/app/wecom.py index 9cdffd3..904f4d5 100644 --- a/backend/app/wecom.py +++ b/backend/app/wecom.py @@ -25,8 +25,8 @@ def build_markdown(snapshot: dict) -> str: f"> 统计周期(北京时间 8:00 切日)", f"> **{period_label}**", "", - "| 排名 | 合约 | 成交额(USDT) | 涨跌幅 | 标记 |", - "| --- | --- | --- | --- | --- |", + "| 排名 | 合约 | 成交额(USDT) | 涨跌幅 | 资金费率 | 标记 |", + "| --- | --- | --- | --- | --- | --- |", ] for row in items: tags = [] @@ -37,8 +37,9 @@ def build_markdown(snapshot: dict) -> str: tag_str = " ".join(tags) if tags else "-" vol = row.get("quote_volume_fmt") or f"{row.get('quote_volume', 0):.0f}" pct = row.get("price_change_pct_fmt") or f"{row.get('price_change_pct', 0):+.2f}%" + fr = row.get("funding_rate_fmt") or "-" lines.append( - f"| {row['rank']} | {row['symbol']} | {vol} | {pct} | {tag_str} |" + f"| {row['rank']} | {row['symbol']} | {vol} | {pct} | {fr} | {tag_str} |" ) lines.append("") lines.append("> 标记说明:千万+ = 成交额≥1000万 USDT;涨跌5%+ = |涨跌幅|≥5%") diff --git a/web/app.js b/web/app.js index f9b5c47..223a656 100644 --- a/web/app.js +++ b/web/app.js @@ -26,6 +26,7 @@ const SORT_KEYS = { if (r.is_high_change) score += 1; return score; }, + funding_rate: (r) => Number(r.funding_rate_pct) || 0, }; function formatPeriod(start, end) { @@ -88,7 +89,7 @@ function renderTable(tableId, tbody) { const items = sortItems(state.items, state.sortKey, state.sortDir); if (!items.length) { - tbody.innerHTML = '
资金费率历史(约 90 次结算,8h/次)· 虚线为零轴
+北京时间 08:00 切日 · Top30 · 合约右侧 300 日K+成交量 · 点击图表放大查看
+Top30 · 日K+成交量 · 资金费率当前+历史曲线 · 点击图表放大