bfffc7d984
- fetch_ohlcv_for_hub:无 since 时按目标根数分页拉取(OKX/Gate 单次约 300) - hub_kline_store 全量补拉传 fetch_start_ms - 行情区:数字键切换周期、Ctrl+空格全屏、Esc 退出全屏 Co-authored-by: Cursor <cursoragent@cursor.com>
241 lines
7.6 KiB
Python
241 lines
7.6 KiB
Python
"""中控行情区:各实例 ccxt OHLCV 拉取(hub_bridge /api/hub/ohlcv 共用)。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
import time
|
|
from typing import Any, Callable, Optional
|
|
|
|
# Timeframes the chart area accepts; anything else is replaced by a default
# in normalize_chart_timeframe().
CHART_TIMEFRAMES = frozenset(("1m", "5m", "15m", "1h", "4h", "1d", "1w"))

# Daily-and-longer timeframes; bar_limit_for_timeframe() gives these a
# smaller default bar count.
DAILY_PLUS_TIMEFRAMES = frozenset(("1d", "1w"))

# Duration of a single bar of each supported timeframe, in milliseconds.
TIMEFRAME_MS: dict[str, int] = {
    "1m": 60_000,
    "5m": 300_000,
    "15m": 900_000,
    "1h": 3_600_000,
    "4h": 14_400_000,
    "1d": 86_400_000,
    "1w": 604_800_000,
}
|
|
|
|
|
|
def normalize_chart_timeframe(raw: str | None, default: str = "5m") -> str:
    """Return a supported chart timeframe, falling back to ``default``.

    ``raw`` (or ``default`` when ``raw`` is falsy) is stripped of surrounding
    whitespace and lower-cased. If the cleaned value is not a member of
    ``CHART_TIMEFRAMES``, ``default`` is returned unchanged.
    """
    candidate = raw if raw else default
    cleaned = candidate.strip().lower()
    if cleaned in CHART_TIMEFRAMES:
        return cleaned
    return default
|
|
|
|
|
|
def bar_limit_for_timeframe(timeframe: str) -> int:
    """Return the default bar count for ``timeframe``.

    Daily and weekly timeframes get 500 bars; every other (normalized)
    timeframe gets 1000.
    """
    normalized = normalize_chart_timeframe(timeframe)
    if normalized in DAILY_PLUS_TIMEFRAMES:
        return 500
    return 1000
|
|
|
|
|
|
def last_closed_bar_open_ms(timeframe: str, now_ms: int | None = None) -> int:
    """Return the open time (epoch milliseconds) of the last closed bar.

    Bar boundaries are computed as whole multiples of the timeframe's
    period counted from the epoch; the bar containing "now" is treated as
    still open, so the one before it is returned.

    Args:
        timeframe: Chart timeframe; normalized before use.
        now_ms: Reference time in epoch milliseconds. Defaults to the
            current system time.
    """
    period_ms = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    if now_ms is None:
        reference_ms = int(time.time() * 1000)
    else:
        reference_ms = int(now_ms)
    # Index of the bar that is currently forming; the previous index is the
    # most recent bar that has already closed.
    forming_index = reference_ms // period_ms
    return int((forming_index - 1) * period_ms)
|
|
|
|
|
|
def window_start_ms(timeframe: str, need: int, retention_days: int, now_ms: int | None = None) -> int:
    """Return the start (epoch ms) of the local-store cleanup/read window.

    The window reaches back ``need`` bars, but never further than
    ``retention_days`` days: the later of the two start points wins. Both
    counts are clamped to a minimum of 1.

    Args:
        timeframe: Chart timeframe; normalized before use.
        need: Number of bars wanted.
        retention_days: Maximum look-back in days.
        now_ms: Reference time in epoch milliseconds. Defaults to the
            current system time.
    """
    reference_ms = int(time.time() * 1000) if now_ms is None else int(now_ms)
    bar_ms = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    days_kept = max(1, int(retention_days))
    oldest_retained = reference_ms - days_kept * 86400000
    bars_wanted = max(1, int(need))
    oldest_wanted = reference_ms - bars_wanted * bar_ms
    if oldest_wanted > oldest_retained:
        return oldest_wanted
    return oldest_retained
|
|
|
|
|
|
def chart_fetch_start_ms(timeframe: str, need: int, now_ms: int | None = None) -> int:
    """Return the fetch start time (epoch ms) for displaying a chart.

    Looks back ``need`` bars (clamped to at least 1) from the reference
    time and never returns a negative timestamp. Unlike
    ``window_start_ms`` there is no retention-days limit.

    Args:
        timeframe: Chart timeframe; normalized before use.
        need: Number of bars to look back.
        now_ms: Reference time in epoch milliseconds. Defaults to the
            current system time.
    """
    reference_ms = int(time.time() * 1000) if now_ms is None else int(now_ms)
    bar_ms = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
    lookback_ms = max(1, int(need)) * bar_ms
    start_ms = reference_ms - lookback_ms
    return start_ms if start_ms > 0 else 0
|
|
|
|
|
|
def _positive_finite(raw: Any) -> Optional[float]:
    """Parse ``raw`` as a float; return it only if it is positive and finite."""
    if raw in (None, ""):
        return None
    try:
        number = float(raw)
    except (TypeError, ValueError, OverflowError):
        return None
    if number > 0 and math.isfinite(number):
        return number
    return None


def price_tick_from_market(exchange, exchange_symbol: str) -> Optional[float]:
    """Best-effort lookup of the smallest price increment for a market.

    Sources are tried in order and the first usable value wins:

    1. raw exchange fields in ``market["info"]`` (``tickSize``, ``tickSz``,
       ``price_increment``, ``order_price_round``, ``quote_increment``);
    2. ``market["limits"]["price"]["min"]``;
    3. the number of decimals ``exchange.price_to_precision`` keeps on a
       sample price (``1.0`` when it keeps none);
    4. ``market["precision"]["price"]`` -- only reached when
       ``price_to_precision`` raises. An integer-like value in ``[1, 12]``
       is read as a count of decimal places, a value in ``(0, 1)`` as the
       tick itself.

    Args:
        exchange: Exchange object exposing ``markets``, ``load_markets()``,
            ``market(symbol)`` and ``price_to_precision(symbol, price)``.
        exchange_symbol: Symbol in the form the exchange object expects.

    Returns:
        A positive, finite tick size, or ``None`` when none can be
        determined. Malformed market metadata is skipped rather than raised.
    """
    try:
        if not getattr(exchange, "markets", None):
            exchange.load_markets()
        market = exchange.market(exchange_symbol)
    except Exception:
        # Best effort: a failed market load or unknown symbol means "no tick".
        return None

    info = market.get("info")
    if isinstance(info, dict):
        for key in ("tickSize", "tickSz", "price_increment", "order_price_round", "quote_increment"):
            tick = _positive_finite(info.get(key))
            if tick is not None:
                return tick

    limits = market.get("limits")
    price_limits = limits.get("price") if isinstance(limits, dict) else None
    if isinstance(price_limits, dict):
        tick = _positive_finite(price_limits.get("min"))
        if tick is not None:
            return tick

    try:
        rendered = str(exchange.price_to_precision(exchange_symbol, 12345.678901234)).strip()
    except Exception:
        # Formatting support is optional; fall through to the precision field.
        rendered = None
    if rendered is not None:
        decimals = rendered.partition(".")[2]
        return 10 ** (-len(decimals)) if decimals else 1.0

    precision = market.get("precision")
    raw_precision = precision.get("price") if isinstance(precision, dict) else None
    if raw_precision is None:
        return None
    try:
        p = float(raw_precision)
    except (TypeError, ValueError, OverflowError):
        return None
    if not math.isfinite(p):
        # round() would raise on inf/nan; such metadata carries no tick.
        return None
    nearest = round(p)
    if 1 <= p <= 12 and abs(p - nearest) < 1e-9:
        # Use the rounded value so 2.9999999999 means 3 decimals, not 2.
        return 10 ** (-nearest)
    if 0 < p < 1:
        return p
    return None
|
|
|
|
|
|
def _decimals_from_tick(tick: float) -> int:
|
|
if tick >= 1:
|
|
return 0
|
|
s = f"{tick:.12f}".rstrip("0")
|
|
if "." in s:
|
|
frac = s.split(".", 1)[1]
|
|
if frac:
|
|
return min(12, len(frac))
|
|
return max(0, min(12, int(round(-math.log10(tick)))))
|
|
|
|
|
|
def format_price_by_tick(value: Any, tick: Optional[float]) -> str:
    """Format a price for display.

    * ``None`` or ``""`` renders as ``"-"``.
    * A value that cannot be converted to ``float`` is returned as
      ``str(value)``.
    * Zero renders as ``"0"``.
    * With a positive ``tick`` the value is printed with the number of
      decimals that tick implies (trailing zeros are kept).
    * Without a tick the decimals depend on the magnitude (2 for >= 10000,
      3 for >= 100, 4 for >= 1, 6 for >= 0.01, otherwise 8) and trailing
      zeros are trimmed.
    """
    if value in (None, ""):
        return "-"
    try:
        number = float(value)
    except (TypeError, ValueError):
        return str(value)
    if number == 0:
        return "0"

    if tick and tick > 0:
        places = _decimals_from_tick(float(tick))
        return f"{number:.{places}f}"

    magnitude = abs(number)
    # (lower bound, decimals) pairs, checked from the largest bound down.
    for lower_bound, decimals in ((10000, 2), (100, 3), (1, 4), (0.01, 6)):
        if magnitude >= lower_bound:
            places = decimals
            break
    else:
        places = 8

    rendered = f"{number:.{places}f}"
    if "." not in rendered:
        return rendered
    return rendered.rstrip("0").rstrip(".")
|
|
|
|
|
|
def _bars_to_dicts(ohlcv: list) -> list[dict[str, Any]]:
|
|
out: list[dict[str, Any]] = []
|
|
for bar in ohlcv or []:
|
|
if not bar or len(bar) < 6:
|
|
continue
|
|
try:
|
|
out.append(
|
|
{
|
|
"open_time_ms": int(bar[0]),
|
|
"open": float(bar[1]),
|
|
"high": float(bar[2]),
|
|
"low": float(bar[3]),
|
|
"close": float(bar[4]),
|
|
"volume": float(bar[5]),
|
|
}
|
|
)
|
|
except (TypeError, ValueError):
|
|
continue
|
|
return out
|
|
|
|
|
|
def fetch_ohlcv_for_hub(
    *,
    symbol: str,
    timeframe: str,
    since_ms: int | None = None,
    limit: int = 500,
    normalize_symbol_input: Callable[[Any], str],
    normalize_exchange_symbol: Callable[[str], str],
    ensure_markets_loaded: Callable[[], None],
    exchange,
    friendly_error: Callable[[Exception], str] | None = None,
) -> dict[str, Any]:
    """Fetch OHLCV bars through ``exchange.fetch_ohlcv`` for the hub API.

    Bars are requested in pages of at most 300, moving the ``since`` cursor
    forward after each page, until the wanted number of rows has been
    collected, the exchange returns an empty page, the cursor stops
    advancing, or 80 requests have been made. The result is de-duplicated
    by open time, sorted ascending and trimmed to the newest ``want`` bars.

    Args:
        symbol: User-supplied symbol; passed through
            ``normalize_symbol_input``.
        timeframe: Chart timeframe; normalized before use.
        since_ms: Start time in epoch milliseconds. When ``None`` or not
            positive, the start is "now minus ``want`` bars".
        limit: Number of bars wanted, clamped to ``[1, 1500]``; a falsy
            value selects the timeframe's default bar count.
        normalize_symbol_input: Cleans the user-supplied symbol.
        normalize_exchange_symbol: Maps the cleaned symbol to the form the
            exchange object expects.
        ensure_markets_loaded: Called once before any request is made.
        exchange: Object exposing ``fetch_ohlcv(symbol, timeframe=, since=,
            limit=)`` plus whatever ``price_tick_from_market`` needs.
        friendly_error: Optional converter from an exception to a message.

    Returns:
        On success ``{"ok": True, "symbol", "exchange_symbol", "timeframe",
        "price_tick", "bars"}``; otherwise ``{"ok": False, "msg": ...}``.
        Exceptions raised after symbol normalization are reported through
        the failure dictionary rather than propagated.
    """
    tf = normalize_chart_timeframe(timeframe)
    sym = normalize_symbol_input(symbol)
    if not sym:
        return {"ok": False, "msg": "symbol 不能为空"}
    try:
        ensure_markets_loaded()
        ex_sym = normalize_exchange_symbol(sym)
        want = max(1, min(int(limit or bar_limit_for_timeframe(tf)), 1500))
        chunk_max = 300
        max_requests = 80
        period = TIMEFRAME_MS[tf]
        collected: list = []

        if since_ms is not None and int(since_ms) > 0:
            since = int(since_ms)
        else:
            # Without `since`, exchanges such as OKX/Gate often cap a single
            # call at ~300 bars, so start at the target origin and page
            # forward from there.
            since = max(0, int(time.time() * 1000) - want * period)

        for _ in range(max_requests):
            if len(collected) >= want:
                break
            req_limit = min(chunk_max, want - len(collected))
            batch = exchange.fetch_ohlcv(
                ex_sym, timeframe=tf, since=since, limit=req_limit
            )
            if not batch:
                break
            collected.extend(batch)
            next_since = int(batch[-1][0]) + period
            if next_since <= since:
                # The cursor did not move past the request we just made
                # (e.g. the exchange ignored `since`); asking again would
                # only return the same bars.
                break
            since = next_since

        bars = _bars_to_dicts(collected)
        if not bars:
            return {"ok": False, "msg": "交易所未返回 K 线"}

        tick = price_tick_from_market(exchange, ex_sym)
        # Pages may overlap; keep the last occurrence of each open time.
        uniq: dict[int, dict] = {int(b["open_time_ms"]): b for b in bars}
        merged = [uniq[k] for k in sorted(uniq)]
        if len(merged) > want:
            merged = merged[-want:]

        return {
            "ok": True,
            "symbol": sym,
            "exchange_symbol": ex_sym,
            "timeframe": tf,
            "price_tick": tick,
            "bars": merged,
        }
    except Exception as e:
        # API boundary: report any failure as a message instead of raising.
        msg = friendly_error(e) if friendly_error else str(e)
        return {"ok": False, "msg": f"K线加载失败:{msg}"}
|