中控行情区与 K 线本地库(15 天滚动、按需拉取)

新增行情区单图与周期切换,K 线优先读 hub_kline.db,不足时经各实例 /api/hub/ohlcv 补齐;无后台定时更新。含回滚标签说明与单元测试。

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-02 10:58:59 +08:00
parent ef99fb6c2e
commit ba681c7a58
16 changed files with 1298 additions and 3 deletions
+1
View File
@@ -16,6 +16,7 @@
**/.env.bak
**/.env.local
manual_trading_hub/hub_settings.json
manual_trading_hub/data/
# 数据库与上传(运行时生成)
**/*.sqlite
+17
View File
@@ -7950,6 +7950,22 @@ def _hub_meta_bundle():
}
def _hub_fetch_ohlcv(symbol, timeframe, since_ms=None, limit=500):
from hub_ohlcv_lib import fetch_ohlcv_for_hub
return fetch_ohlcv_for_hub(
symbol=symbol,
timeframe=timeframe,
since_ms=since_ms,
limit=limit,
normalize_symbol_input=normalize_symbol_input,
normalize_exchange_symbol=normalize_exchange_symbol,
ensure_markets_loaded=ensure_markets_loaded,
exchange=exchange,
friendly_error=friendly_exchange_error,
)
try:
import sys
from pathlib import Path
@@ -7968,6 +7984,7 @@ try:
row_to_dict=row_to_dict,
meta_fn=_hub_meta_bundle,
views={"add_order": add_order, "add_key": add_key},
ohlcv_fn=_hub_fetch_ohlcv,
)
except Exception as _hub_err:
print(f"[hub_bridge] binance: {_hub_err}")
+17
View File
@@ -8127,6 +8127,22 @@ def _hub_meta_bundle():
}
def _hub_fetch_ohlcv(symbol, timeframe, since_ms=None, limit=500):
from hub_ohlcv_lib import fetch_ohlcv_for_hub
return fetch_ohlcv_for_hub(
symbol=symbol,
timeframe=timeframe,
since_ms=since_ms,
limit=limit,
normalize_symbol_input=normalize_symbol_input,
normalize_exchange_symbol=normalize_exchange_symbol,
ensure_markets_loaded=ensure_markets_loaded,
exchange=exchange,
friendly_error=friendly_exchange_error,
)
try:
import sys
from pathlib import Path
@@ -8145,6 +8161,7 @@ try:
row_to_dict=row_to_dict,
meta_fn=_hub_meta_bundle,
views={"add_order": add_order, "add_key": add_key},
ohlcv_fn=_hub_fetch_ohlcv,
)
except Exception as _hub_err:
print(f"[hub_bridge] gate: {_hub_err}")
+17
View File
@@ -7479,6 +7479,22 @@ def _hub_meta_bundle():
}
def _hub_fetch_ohlcv(symbol, timeframe, since_ms=None, limit=500):
from hub_ohlcv_lib import fetch_ohlcv_for_hub
return fetch_ohlcv_for_hub(
symbol=symbol,
timeframe=timeframe,
since_ms=since_ms,
limit=limit,
normalize_symbol_input=normalize_symbol_input,
normalize_exchange_symbol=normalize_exchange_symbol,
ensure_markets_loaded=ensure_markets_loaded,
exchange=exchange,
friendly_error=friendly_exchange_error,
)
try:
import sys
from pathlib import Path
@@ -7502,6 +7518,7 @@ try:
"preview_trend_pullback": preview_trend_pullback,
"execute_trend_pullback": execute_trend_pullback,
},
ohlcv_fn=_hub_fetch_ohlcv,
)
except Exception as _hub_err:
print(f"[hub_bridge] gate_bot: {_hub_err}")
+17
View File
@@ -7438,6 +7438,22 @@ def _hub_meta_bundle():
}
def _hub_fetch_ohlcv(symbol, timeframe, since_ms=None, limit=500):
from hub_ohlcv_lib import fetch_ohlcv_for_hub
return fetch_ohlcv_for_hub(
symbol=symbol,
timeframe=timeframe,
since_ms=since_ms,
limit=limit,
normalize_symbol_input=normalize_symbol_input,
normalize_exchange_symbol=normalize_okx_symbol,
ensure_markets_loaded=ensure_markets_loaded,
exchange=exchange,
friendly_error=friendly_okx_error,
)
try:
import sys
from pathlib import Path
@@ -7456,6 +7472,7 @@ try:
row_to_dict=row_to_dict,
meta_fn=_hub_meta_bundle,
views={"add_order": add_order, "add_key": add_key},
ohlcv_fn=_hub_fetch_ohlcv,
)
except Exception as _hub_err:
print(f"[hub_bridge] okx: {_hub_err}")
+26
View File
@@ -111,6 +111,7 @@ def install_on_app(
row_to_dict,
meta_fn,
views: dict,
ohlcv_fn=None,
):
app.config["HUB_CTX"] = {
"exchange": exchange,
@@ -120,6 +121,7 @@ def install_on_app(
"row_to_dict": row_to_dict,
"meta_fn": meta_fn,
"views": views,
"ohlcv_fn": ohlcv_fn,
}
install_hub_embed_headers(app)
configure_hub_embed_session(app)
@@ -294,6 +296,30 @@ def register_hub_routes(app):
}
)
@app.route("/api/hub/ohlcv")
@_hub_auth_required
def api_hub_ohlcv():
fn = _ctx().get("ohlcv_fn")
if not callable(fn):
return jsonify({"ok": False, "msg": "该实例未配置 OHLCV 接口"}), 501
symbol = (request.args.get("symbol") or "").strip()
timeframe = (request.args.get("timeframe") or "5m").strip()
since_raw = (request.args.get("since_ms") or "").strip()
limit_raw = (request.args.get("limit") or "").strip()
since_ms = None
if since_raw.isdigit():
since_ms = int(since_raw)
limit = 500
if limit_raw.isdigit():
limit = int(limit_raw)
try:
result = fn(symbol=symbol, timeframe=timeframe, since_ms=since_ms, limit=limit)
if isinstance(result, dict):
return jsonify(result)
return jsonify({"ok": False, "msg": "OHLCV 返回格式无效"}), 500
except Exception as e:
return jsonify({"ok": False, "msg": str(e)}), 500
@app.route("/api/hub/add_order", methods=["POST"])
@_hub_auth_required
def api_hub_add_order():
+317
View File
@@ -0,0 +1,317 @@
"""中控 K 线 SQLite 缓存:按需拉取、15 天滚动保留。"""
from __future__ import annotations
import os
import sqlite3
import time
from pathlib import Path
from typing import Any, Callable, Optional
from hub_ohlcv_lib import (
TIMEFRAME_MS,
bar_limit_for_timeframe,
format_price_by_tick,
last_closed_bar_open_ms,
normalize_chart_timeframe,
window_start_ms,
)
_DEFAULT_RETENTION_DAYS = 15
def retention_days() -> int:
try:
return max(1, int(os.getenv("HUB_KLINE_RETENTION_DAYS", str(_DEFAULT_RETENTION_DAYS))))
except ValueError:
return _DEFAULT_RETENTION_DAYS
def default_db_path() -> Path:
raw = (os.getenv("HUB_KLINE_DB_PATH") or "").strip()
if raw:
return Path(raw)
hub_dir = Path(__file__).resolve().parent / "manual_trading_hub" / "data"
hub_dir.mkdir(parents=True, exist_ok=True)
return hub_dir / "hub_kline.db"
def _connect(db_path: Path | None = None) -> sqlite3.Connection:
path = db_path or default_db_path()
path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(path), timeout=30, isolation_level=None)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=NORMAL")
return conn
def init_db(db_path: Path | None = None) -> None:
conn = _connect(db_path)
try:
conn.execute(
"""
CREATE TABLE IF NOT EXISTS ohlcv_bars (
exchange_key TEXT NOT NULL,
symbol TEXT NOT NULL,
timeframe TEXT NOT NULL,
open_time_ms INTEGER NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume REAL NOT NULL DEFAULT 0,
updated_at INTEGER NOT NULL,
PRIMARY KEY (exchange_key, symbol, timeframe, open_time_ms)
)
"""
)
conn.execute(
"""
CREATE INDEX IF NOT EXISTS idx_ohlcv_series
ON ohlcv_bars (exchange_key, symbol, timeframe, open_time_ms)
"""
)
finally:
conn.close()
def purge_retention(db_path: Path | None = None, *, days: int | None = None) -> int:
"""删除早于 retention 的 K 线;返回删除行数。"""
keep = days if days is not None else retention_days()
cutoff = int(time.time() * 1000) - keep * 86400000
conn = _connect(db_path)
try:
cur = conn.execute("DELETE FROM ohlcv_bars WHERE open_time_ms < ?", (cutoff,))
return int(cur.rowcount or 0)
finally:
conn.close()
def upsert_bars(
exchange_key: str,
symbol: str,
timeframe: str,
bars: list[dict[str, Any]],
db_path: Path | None = None,
) -> int:
if not bars:
return 0
ex_k = (exchange_key or "").strip().lower()
sym = (symbol or "").strip().upper()
tf = normalize_chart_timeframe(timeframe)
now = int(time.time())
conn = _connect(db_path)
n = 0
try:
for b in bars:
try:
oms = int(b["open_time_ms"])
conn.execute(
"""
INSERT INTO ohlcv_bars
(exchange_key, symbol, timeframe, open_time_ms, open, high, low, close, volume, updated_at)
VALUES (?,?,?,?,?,?,?,?,?,?)
ON CONFLICT(exchange_key, symbol, timeframe, open_time_ms) DO UPDATE SET
open=excluded.open,
high=excluded.high,
low=excluded.low,
close=excluded.close,
volume=excluded.volume,
updated_at=excluded.updated_at
""",
(
ex_k,
sym,
tf,
oms,
float(b["open"]),
float(b["high"]),
float(b["low"]),
float(b["close"]),
float(b.get("volume") or 0),
now,
),
)
n += 1
except (KeyError, TypeError, ValueError):
continue
finally:
conn.close()
return n
def load_bars_range(
exchange_key: str,
symbol: str,
timeframe: str,
start_ms: int,
end_ms: int,
db_path: Path | None = None,
) -> list[dict[str, Any]]:
ex_k = (exchange_key or "").strip().lower()
sym = (symbol or "").strip().upper()
tf = normalize_chart_timeframe(timeframe)
conn = _connect(db_path)
try:
rows = conn.execute(
"""
SELECT open_time_ms, open, high, low, close, volume
FROM ohlcv_bars
WHERE exchange_key=? AND symbol=? AND timeframe=?
AND open_time_ms >= ? AND open_time_ms <= ?
ORDER BY open_time_ms ASC
""",
(ex_k, sym, tf, int(start_ms), int(end_ms)),
).fetchall()
return [
{
"open_time_ms": int(r["open_time_ms"]),
"open": float(r["open"]),
"high": float(r["high"]),
"low": float(r["low"]),
"close": float(r["close"]),
"volume": float(r["volume"] or 0),
}
for r in rows
]
finally:
conn.close()
def _to_chart_candles(bars: list[dict[str, Any]]) -> list[dict[str, Any]]:
out = []
for b in bars:
try:
out.append(
{
"time": int(b["open_time_ms"] // 1000),
"open": float(b["open"]),
"high": float(b["high"]),
"low": float(b["low"]),
"close": float(b["close"]),
"volume": float(b.get("volume") or 0),
}
)
except (KeyError, TypeError, ValueError):
continue
return out
def _merge_bars(*groups: list[dict[str, Any]]) -> list[dict[str, Any]]:
merged: dict[int, dict[str, Any]] = {}
for g in groups:
for b in g or []:
try:
merged[int(b["open_time_ms"])] = b
except (KeyError, TypeError, ValueError):
continue
return [merged[k] for k in sorted(merged.keys())]
def resolve_chart_bars(
exchange_key: str,
symbol: str,
timeframe: str,
remote_fetch: Callable[..., dict[str, Any]],
*,
db_path: Path | None = None,
force_refresh: bool = False,
) -> dict[str, Any]:
"""
按需:先读库,不足则 remote_fetch(symbol, timeframe, since_ms, limit) 补齐并写库。
无后台定时任务;每次调用时顺带 purge 15 天前数据。
"""
init_db(db_path)
purged = purge_retention(db_path)
sym = (symbol or "").strip().upper()
ex_k = (exchange_key or "").strip().lower()
tf = normalize_chart_timeframe(timeframe)
if not sym or not ex_k:
return {"ok": False, "msg": "缺少 exchange 或 symbol"}
need = bar_limit_for_timeframe(tf)
now_ms = int(time.time() * 1000)
start_ms = window_start_ms(tf, need, retention_days(), now_ms)
last_closed = last_closed_bar_open_ms(tf, now_ms)
db_rows: list[dict[str, Any]] = []
if not force_refresh:
period_ms = TIMEFRAME_MS[tf]
db_rows = load_bars_range(
ex_k, sym, tf, max(0, start_ms - period_ms), now_ms + period_ms, db_path
)
newest_db = db_rows[-1]["open_time_ms"] if db_rows else None
period_ms = TIMEFRAME_MS[tf]
newest_ok = newest_db is not None and int(newest_db) >= int(last_closed) - period_ms
need_fetch = force_refresh or len(db_rows) < need or not newest_ok
fetched = 0
price_tick: Optional[float] = None
remote_err: Optional[str] = None
if need_fetch:
since = start_ms
if db_rows and not force_refresh:
since = min(since, int(db_rows[0]["open_time_ms"]))
remote = remote_fetch(
symbol=sym,
timeframe=tf,
since_ms=since,
limit=need + 20,
)
if remote.get("ok") and remote.get("bars"):
fetched = upsert_bars(ex_k, sym, tf, remote["bars"], db_path)
price_tick = remote.get("price_tick")
db_rows = load_bars_range(ex_k, sym, tf, start_ms, now_ms, db_path)
else:
remote_err = remote.get("msg") or remote.get("error") or "实例拉取 K 线失败"
if not db_rows:
return {"ok": False, "msg": remote_err, "purged": purged}
if len(db_rows) > need:
db_rows = db_rows[-need:]
candles = _to_chart_candles(db_rows)
if not candles:
return {"ok": False, "msg": remote_err or "无 K 线数据", "purged": purged}
from_cache = max(0, len(candles) - (1 if fetched else 0))
if fetched:
from_cache = max(0, len(candles) - min(fetched, len(candles)))
hi = max(candles, key=lambda x: x["high"])
lo = min(candles, key=lambda x: x["low"])
return {
"ok": True,
"symbol": sym,
"exchange_key": ex_k,
"timeframe": tf,
"limit": need,
"retention_days": retention_days(),
"candles": candles,
"from_cache": from_cache,
"fetched": fetched,
"purged": purged,
"price_tick": price_tick,
"range_high": {"time": hi["time"], "price": hi["high"]},
"range_low": {"time": lo["time"], "price": lo["low"]},
"stale": bool(remote_err),
"stale_message": remote_err if remote_err else None,
"updated_at": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
}
def format_ohlcv_detail(bar: dict[str, Any] | None, tick: Optional[float]) -> dict[str, str]:
if not bar:
return {"open": "-", "high": "-", "low": "-", "close": "-", "volume": "-"}
return {
"open": format_price_by_tick(bar.get("open"), tick),
"high": format_price_by_tick(bar.get("high"), tick),
"low": format_price_by_tick(bar.get("low"), tick),
"close": format_price_by_tick(bar.get("close"), tick),
"volume": format_price_by_tick(bar.get("volume"), tick),
}
+189
View File
@@ -0,0 +1,189 @@
"""中控行情区:各实例 ccxt OHLCV 拉取(hub_bridge /api/hub/ohlcv 共用)。"""
from __future__ import annotations
import math
import time
from typing import Any, Callable, Optional
CHART_TIMEFRAMES = frozenset({"1m", "5m", "15m", "1h", "4h", "1d", "1w"})
DAILY_PLUS_TIMEFRAMES = frozenset({"1d", "1w"})
TIMEFRAME_MS: dict[str, int] = {
"1m": 60_000,
"5m": 5 * 60_000,
"15m": 15 * 60_000,
"1h": 60 * 60_000,
"4h": 4 * 60 * 60_000,
"1d": 24 * 60 * 60_000,
"1w": 7 * 24 * 60 * 60_000,
}
def normalize_chart_timeframe(raw: str | None, default: str = "5m") -> str:
tf = (raw or default).strip().lower()
return tf if tf in CHART_TIMEFRAMES else default
def bar_limit_for_timeframe(timeframe: str) -> int:
tf = normalize_chart_timeframe(timeframe)
return 500 if tf in DAILY_PLUS_TIMEFRAMES else 1000
def last_closed_bar_open_ms(timeframe: str, now_ms: int | None = None) -> int:
"""上一根已收盘 K 的 open_time(毫秒 UTC)。"""
tf = normalize_chart_timeframe(timeframe)
period = TIMEFRAME_MS[tf]
now = int(now_ms if now_ms is not None else time.time() * 1000)
current_open = (now // period) * period
return int(current_open - period)
def window_start_ms(timeframe: str, need: int, retention_days: int, now_ms: int | None = None) -> int:
now = int(now_ms if now_ms is not None else time.time() * 1000)
period = TIMEFRAME_MS[normalize_chart_timeframe(timeframe)]
retention_cutoff = now - max(1, int(retention_days)) * 86400000
want = now - max(1, int(need)) * period
return max(retention_cutoff, want)
def price_tick_from_market(exchange, exchange_symbol: str) -> Optional[float]:
try:
markets = getattr(exchange, "markets", None) or {}
m = markets.get(exchange_symbol) or {}
prec = m.get("precision") or {}
p = prec.get("price")
if p is not None:
p = float(p)
if p > 0:
return p
info = m.get("info") or {}
for key in ("tickSize", "price_increment", "order_price_round"):
if info.get(key) not in (None, ""):
try:
v = float(info[key])
if v > 0:
return v
except (TypeError, ValueError):
pass
except Exception:
pass
return None
def format_price_by_tick(value: Any, tick: Optional[float]) -> str:
if value in (None, ""):
return "-"
try:
v = float(value)
except (TypeError, ValueError):
return str(value)
if v == 0:
return "0"
if tick and tick > 0:
decimals = max(0, min(12, int(round(-math.log10(tick))) if tick < 1 else 0))
if tick >= 1:
decimals = 0
text = f"{v:.{decimals}f}"
return text.rstrip("0").rstrip(".") if "." in text else text
av = abs(v)
if av >= 10000:
d = 2
elif av >= 100:
d = 3
elif av >= 1:
d = 4
elif av >= 0.01:
d = 6
else:
d = 8
text = f"{v:.{d}f}"
return text.rstrip("0").rstrip(".") if "." in text else text
def _bars_to_dicts(ohlcv: list) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for bar in ohlcv or []:
if not bar or len(bar) < 6:
continue
try:
out.append(
{
"open_time_ms": int(bar[0]),
"open": float(bar[1]),
"high": float(bar[2]),
"low": float(bar[3]),
"close": float(bar[4]),
"volume": float(bar[5]),
}
)
except (TypeError, ValueError):
continue
return out
def fetch_ohlcv_for_hub(
*,
symbol: str,
timeframe: str,
since_ms: int | None = None,
limit: int = 500,
normalize_symbol_input: Callable[[Any], str],
normalize_exchange_symbol: Callable[[str], str],
ensure_markets_loaded: Callable[[], None],
exchange,
friendly_error: Callable[[Exception], str] | None = None,
) -> dict[str, Any]:
"""从 ccxt 拉 OHLCV,供 hub_bridge /api/hub/ohlcv 返回。"""
tf = normalize_chart_timeframe(timeframe)
sym = normalize_symbol_input(symbol)
if not sym:
return {"ok": False, "msg": "symbol 不能为空"}
try:
ensure_markets_loaded()
ex_sym = normalize_exchange_symbol(sym)
want = max(1, min(int(limit or bar_limit_for_timeframe(tf)), 1500))
chunk_max = 300
collected: list = []
if since_ms is not None and int(since_ms) > 0:
since = int(since_ms)
guard = 0
while len(collected) < want and guard < 20:
guard += 1
batch = exchange.fetch_ohlcv(
ex_sym, timeframe=tf, since=since, limit=min(chunk_max, want - len(collected))
)
if not batch:
break
collected.extend(batch)
since = int(batch[-1][0]) + 1
if len(batch) < min(chunk_max, want - len(collected)):
break
else:
batch = exchange.fetch_ohlcv(ex_sym, timeframe=tf, limit=want)
collected = list(batch or [])
bars = _bars_to_dicts(collected)
if not bars:
return {"ok": False, "msg": "交易所未返回 K 线"}
tick = price_tick_from_market(exchange, ex_sym)
uniq: dict[int, dict] = {}
for b in bars:
uniq[int(b["open_time_ms"])] = b
merged = [uniq[k] for k in sorted(uniq.keys())]
if len(merged) > want:
merged = merged[-want:]
return {
"ok": True,
"symbol": sym,
"exchange_symbol": ex_sym,
"timeframe": tf,
"price_tick": tick,
"bars": merged,
}
except Exception as e:
msg = friendly_error(e) if friendly_error else str(e)
return {"ok": False, "msg": f"K线加载失败:{msg}"}
+4
View File
@@ -62,6 +62,10 @@ HUB_TRUST_LAN=true
# 为 false 时不拉各实例 /api/price_snapshot(关键位门控简化为「-」,首屏明显更快)
# HUB_BOARD_KEY_PRICES=true
# ---------- 行情区 K 线库(data/hub_kline.db,默认保留 15 天)----------
# HUB_KLINE_RETENTION_DAYS=15
# HUB_KLINE_DB_PATH=/opt/crypto_monitor/manual_trading_hub/data/hub_kline.db
# --- 子代理 agent.py(在 crypto_monitor_* 目录启动时另设 EXCHANGE / PORT---
# 与 HUB_BRIDGE_TOKEN 一致时可只设其一;agent 校验请求头 X-Control-Token
# CONTROL_TOKEN=your-long-random-token
+20
View File
@@ -0,0 +1,20 @@
# 更新前快照(行情区 + K 线库)
更新前已打 Git 标签,回滚方式:
```bash
cd /opt/crypto_monitor # 或你的仓库路径
git fetch --tags
git checkout snapshot/pre-hub-market-20260528
# 恢复后重启:
pm2 restart manual-trading-hub crypto_okx crypto_binance crypto_gate crypto_gate_bot
```
回到最新主线:
```bash
git checkout main
git pull
```
K 线数据库(不纳入 Git):`manual_trading_hub/data/hub_kline.db`,回滚代码不会自动删除该文件。
+119 -1
View File
@@ -13,6 +13,9 @@ _REPO_ROOT = Path(__file__).resolve().parent.parent
if str(_REPO_ROOT) not in sys.path:
sys.path.insert(0, str(_REPO_ROOT))
from hub_kline_store import format_ohlcv_detail, resolve_chart_bars, retention_days
from hub_ohlcv_lib import CHART_TIMEFRAMES, bar_limit_for_timeframe
from env_load import load_hub_dotenv
load_hub_dotenv()
@@ -66,7 +69,7 @@ _allow_pub_raw = (os.getenv("HUB_ALLOW_PUBLIC") or "").strip().lower()
# 云服务器 + 域名反代时设为 true:不做 IP 限制,仅靠 HUB_PASSWORD / 登录页保护
HUB_ALLOW_PUBLIC = _allow_pub_raw in ("1", "true", "yes", "on")
DIR = Path(__file__).resolve().parent
HUB_BUILD = "20260526-hub-key3col"
HUB_BUILD = "20260528-hub-market"
HUB_AGENT_TIMEOUT = float(os.getenv("HUB_AGENT_TIMEOUT", "8"))
HUB_FLASK_TIMEOUT = float(os.getenv("HUB_FLASK_TIMEOUT", "10"))
_board_key_prices_raw = (os.getenv("HUB_BOARD_KEY_PRICES", "true") or "").strip().lower()
@@ -258,6 +261,7 @@ def root_redirect():
@app.get("/monitor")
@app.get("/market")
@app.get("/settings")
def shell_pages():
return _shell_page()
@@ -294,6 +298,120 @@ def api_save_settings(body: SettingsBody):
return {"ok": True, "settings": load_settings()}
def _find_exchange_by_key(exchange_key: str) -> dict | None:
key = (exchange_key or "").strip().lower()
if not key:
return None
for ex in load_settings().get("exchanges") or []:
if str(ex.get("key") or "").strip().lower() == key:
return ex
if str(ex.get("id") or "").strip() == exchange_key.strip():
return ex
return None
def _fetch_instance_ohlcv_sync(
ex: dict,
*,
symbol: str,
timeframe: str,
since_ms: int | None,
limit: int,
) -> dict:
base = (ex.get("flask_url") or "").rstrip("/")
if not base:
return {"ok": False, "msg": "未配置 flask_url"}
params = {"symbol": symbol, "timeframe": timeframe, "limit": str(int(limit))}
if since_ms is not None and int(since_ms) > 0:
params["since_ms"] = str(int(since_ms))
url = f"{base}/api/hub/ohlcv?{urlencode(params)}"
try:
with httpx.Client(timeout=HUB_FLASK_TIMEOUT) as client:
r = client.get(url, headers=_hub_headers())
if r.status_code >= 400:
parsed = _parse_http_json_body(r)
parsed.setdefault("ok", False)
return parsed
data = r.json() if r.content else {}
return data if isinstance(data, dict) else {"ok": False, "msg": "无效 JSON"}
except Exception as e:
return {"ok": False, "msg": str(e)}
@app.get("/api/chart/meta")
def api_chart_meta():
tfs = ["1m", "5m", "15m", "1h", "4h", "1d", "1w"]
exchanges = []
for ex in enabled_exchanges(load_settings()):
exchanges.append(
{
"id": ex.get("id"),
"key": ex.get("key"),
"name": ex.get("name"),
}
)
return {
"ok": True,
"timeframes": [tf for tf in tfs if tf in CHART_TIMEFRAMES],
"retention_days": retention_days(),
"limits": {tf: bar_limit_for_timeframe(tf) for tf in tfs if tf in CHART_TIMEFRAMES},
"exchanges": exchanges,
}
@app.get("/api/chart/ohlcv")
def api_chart_ohlcv(
exchange_key: str = "",
symbol: str = "",
timeframe: str = "5m",
refresh: str = "",
):
ex = _find_exchange_by_key(exchange_key)
if not ex:
raise HTTPException(status_code=400, detail="交易所不存在")
if not ex.get("enabled"):
raise HTTPException(status_code=400, detail="该交易所未启用")
sym = (symbol or "").strip().upper()
if not sym:
raise HTTPException(status_code=400, detail="请输入币种")
ex_key = str(ex.get("key") or "").strip().lower()
force = (refresh or "").strip().lower() in ("1", "true", "yes", "on")
def remote_fetch(**kwargs):
return _fetch_instance_ohlcv_sync(
ex,
symbol=kwargs.get("symbol") or sym,
timeframe=kwargs.get("timeframe") or timeframe,
since_ms=kwargs.get("since_ms"),
limit=int(kwargs.get("limit") or bar_limit_for_timeframe(timeframe)),
)
result = resolve_chart_bars(
ex_key,
sym,
timeframe,
remote_fetch,
force_refresh=force,
)
if not result.get("ok"):
raise HTTPException(status_code=502, detail=result.get("msg") or "K线加载失败")
tick = result.get("price_tick")
last = result["candles"][-1] if result.get("candles") else None
result["ohlcv"] = format_ohlcv_detail(
{
"open": last.get("open") if last else None,
"high": last.get("high") if last else None,
"low": last.get("low") if last else None,
"close": last.get("close") if last else None,
"volume": last.get("volume") if last else None,
}
if last
else None,
tick,
)
return result
@app.get("/api/settings/meta")
def api_settings_meta():
po = public_origin()
+92
View File
@@ -1920,3 +1920,95 @@ body.login-page {
white-space: normal;
}
}
/* ---------- 行情区 ---------- */
.market-toolbar {
flex-wrap: wrap;
gap: 10px;
align-items: flex-end;
}
.market-field {
display: flex;
flex-direction: column;
gap: 4px;
font-size: 0.72rem;
color: var(--muted);
}
.market-field select,
.market-field input {
min-width: 120px;
padding: 8px 10px;
border-radius: 8px;
border: 1px solid var(--border-soft);
background: var(--bg-elevated);
color: var(--text);
font-family: var(--font);
}
.market-status {
font-size: 0.8rem;
color: var(--muted);
margin: 0 0 10px;
}
.market-status.err {
color: var(--red);
}
.market-status.warn {
color: #ffb84d;
}
.market-chart-wrap {
position: relative;
height: min(72vh, 640px);
min-height: 360px;
border: 1px solid var(--border-soft);
border-radius: var(--radius);
background: #0a1018;
overflow: hidden;
}
.market-chart-host {
width: 100%;
height: 100%;
}
.market-ohlcv-overlay {
position: absolute;
top: 10px;
left: 10px;
z-index: 4;
pointer-events: none;
padding: 10px 12px;
border-radius: 8px;
background: rgba(8, 14, 24, 0.88);
border: 1px solid var(--border-soft);
font-size: 0.78rem;
min-width: 200px;
}
.market-ohlcv-title {
font-weight: 600;
color: var(--accent);
margin-bottom: 6px;
display: flex;
gap: 8px;
}
.market-ohlcv-grid {
display: grid;
grid-template-columns: 1fr 1fr;
gap: 4px 14px;
}
.market-ohlcv-grid .k {
color: var(--muted);
margin-right: 6px;
}
.market-ohlcv-grid .market-vol {
grid-column: 1 / -1;
}
+4
View File
@@ -245,6 +245,7 @@
function currentPage() {
const p = window.location.pathname.replace(/\/$/, "") || "/monitor";
if (p.includes("settings")) return "settings";
if (p.includes("market")) return "market";
return "monitor";
}
@@ -259,6 +260,9 @@
if (page === "monitor") startMonitorPoll();
else stopMonitorPoll();
if (page === "settings") loadSettingsUI();
if (page === "market" && window.hubMarketChart) {
window.hubMarketChart.init();
}
}
function stopMonitorPoll() {
+299
View File
@@ -0,0 +1,299 @@
/**
* 中控行情区单图 + 周期切换数据来自 /api/chart/ohlcv本地库优先
*/
(function () {
const TF_ORDER = ["1m", "5m", "15m", "1h", "4h", "1d", "1w"];
const chartHost = document.getElementById("market-chart");
if (!chartHost) return;
const elExchange = document.getElementById("market-exchange");
const elSymbol = document.getElementById("market-symbol");
const elTf = document.getElementById("market-timeframe");
const elRefresh = document.getElementById("market-refresh");
const elStatus = document.getElementById("market-status");
const elUpdated = document.getElementById("market-updated");
const elO = document.getElementById("mkt-o");
const elH = document.getElementById("mkt-h");
const elL = document.getElementById("mkt-l");
const elC = document.getElementById("mkt-c");
const elV = document.getElementById("mkt-v");
const elSymLabel = document.getElementById("mkt-symbol-label");
const elTfLabel = document.getElementById("mkt-tf-label");
let chart = null;
let candleSeries = null;
let priceTick = null;
let rangeMarkers = [];
let lastCandles = [];
let chartMeta = null;
let loadToken = 0;
let marketInited = false;
function fmtVol(v) {
if (v == null || Number.isNaN(Number(v))) return "-";
const n = Number(v);
if (n >= 1e9) return (n / 1e9).toFixed(2) + "B";
if (n >= 1e6) return (n / 1e6).toFixed(2) + "M";
if (n >= 1e3) return (n / 1e3).toFixed(2) + "K";
return n.toFixed(2);
}
function paintOhlcv(bar) {
if (!bar) {
["o", "h", "l", "c", "v"].forEach(function (k) {
const el = { o: elO, h: elH, l: elL, c: elC, v: elV }[k];
if (el) el.textContent = "-";
});
return;
}
if (elO) elO.textContent = bar.open != null ? String(bar.open) : "-";
if (elH) elH.textContent = bar.high != null ? String(bar.high) : "-";
if (elL) elL.textContent = bar.low != null ? String(bar.low) : "-";
if (elC) elC.textContent = bar.close != null ? String(bar.close) : "-";
if (elV) elV.textContent = fmtVol(bar.volume);
}
function ensureChart() {
if (chart && candleSeries) return true;
if (!window.LightweightCharts) {
if (elStatus) {
elStatus.className = "market-status err";
elStatus.textContent = "图表库加载失败";
}
return false;
}
chart = LightweightCharts.createChart(chartHost, {
layout: { background: { color: "#0a1018" }, textColor: "#b8d4e8" },
grid: { vertLines: { color: "#1a2838" }, horzLines: { color: "#1a2838" } },
rightPriceScale: { borderColor: "#2a4058" },
timeScale: { borderColor: "#2a4058", timeVisible: true, secondsVisible: false },
crosshair: { mode: LightweightCharts.CrosshairMode ? LightweightCharts.CrosshairMode.Normal : 0 },
});
const opts = {
upColor: "#00ff9d",
downColor: "#ff4d6d",
borderVisible: false,
wickUpColor: "#00ff9d",
wickDownColor: "#ff4d6d",
};
if (typeof chart.addCandlestickSeries === "function") {
candleSeries = chart.addCandlestickSeries(opts);
} else if (
typeof chart.addSeries === "function" &&
window.LightweightCharts &&
window.LightweightCharts.CandlestickSeries
) {
candleSeries = chart.addSeries(window.LightweightCharts.CandlestickSeries, opts);
}
if (!candleSeries) return false;
chart.subscribeCrosshairMove(function (param) {
if (!param || !param.time || !param.seriesData) return;
const d = param.seriesData.get(candleSeries);
if (!d) return;
paintOhlcv({
open: d.open,
high: d.high,
low: d.low,
close: d.close,
volume: d.volume,
});
});
window.addEventListener("resize", function () {
if (!chart) return;
chart.applyOptions({ width: chartHost.clientWidth, height: chartHost.clientHeight });
});
chart.applyOptions({ width: chartHost.clientWidth, height: chartHost.clientHeight });
return true;
}
function clearMarkers() {
rangeMarkers.forEach(function (m) {
try {
candleSeries.removePriceLine(m);
} catch (e) {}
});
rangeMarkers = [];
}
function addRangeMarkers(data) {
clearMarkers();
if (!candleSeries || !data) return;
const hi = data.range_high;
const lo = data.range_low;
if (hi && hi.price != null) {
rangeMarkers.push(
candleSeries.createPriceLine({
price: Number(hi.price),
color: "#ffb84d",
lineWidth: 1,
lineStyle: 2,
axisLabelVisible: true,
title: "区间高",
})
);
}
if (lo && lo.price != null) {
rangeMarkers.push(
candleSeries.createPriceLine({
price: Number(lo.price),
color: "#4cd97f",
lineWidth: 1,
lineStyle: 2,
axisLabelVisible: true,
title: "区间低",
})
);
}
}
function readQuery() {
const qs = new URLSearchParams(window.location.search);
const ex = qs.get("exchange_key") || qs.get("exchange") || "";
const sym = qs.get("symbol") || "";
const tf = qs.get("timeframe") || "";
if (ex && elExchange) elExchange.value = ex;
if (sym && elSymbol) elSymbol.value = sym;
if (tf && elTf) elTf.value = tf;
}
async function loadMeta() {
const r = await fetch("/api/chart/meta", { credentials: "same-origin" });
chartMeta = await r.json();
if (!elExchange || !chartMeta.exchanges) return;
elExchange.innerHTML = "";
chartMeta.exchanges.forEach(function (ex) {
const opt = document.createElement("option");
opt.value = ex.key || ex.id;
opt.textContent = ex.name || ex.key;
elExchange.appendChild(opt);
});
readQuery();
}
async function loadChart(force) {
if (!ensureChart()) return;
const exKey = (elExchange && elExchange.value) || "";
const sym = (elSymbol && elSymbol.value.trim().toUpperCase()) || "";
const tf = (elTf && elTf.value) || "5m";
if (!exKey || !sym) {
if (elStatus) {
elStatus.className = "market-status err";
elStatus.textContent = "请选择交易所并输入币种";
}
return;
}
const myToken = ++loadToken;
if (elStatus) {
elStatus.className = "market-status";
elStatus.textContent = "加载中…";
}
if (elSymLabel) elSymLabel.textContent = sym;
if (elTfLabel) elTfLabel.textContent = tf;
const qs = new URLSearchParams({
exchange_key: exKey,
symbol: sym,
timeframe: tf,
});
if (force) qs.set("refresh", "1");
try {
const r = await fetch("/api/chart/ohlcv?" + qs.toString(), { credentials: "same-origin" });
const data = await r.json();
if (myToken !== loadToken) return;
if (!r.ok) {
throw new Error(data.detail || data.msg || "请求失败");
}
if (!data.ok || !data.candles || !data.candles.length) {
throw new Error(data.msg || "无 K 线");
}
priceTick = data.price_tick;
lastCandles = data.candles;
candleSeries.setData(data.candles);
chart.timeScale().fitContent();
addRangeMarkers(data);
const ohlcv = data.ohlcv || {};
paintOhlcv({
open: ohlcv.open,
high: ohlcv.high,
low: ohlcv.low,
close: ohlcv.close,
volume: ohlcv.volume,
});
let hint =
"已加载 " +
data.candles.length +
" 根(库 " +
(data.from_cache || 0) +
" / 新拉 " +
(data.fetched || 0) +
")· 保留 " +
(data.retention_days || 15) +
" 天";
if (data.stale && data.stale_message) {
hint += " · 缓存:" + data.stale_message;
}
if (elStatus) {
elStatus.className = data.stale ? "market-status warn" : "market-status";
elStatus.textContent = hint;
}
if (elUpdated) elUpdated.textContent = data.updated_at || "--";
} catch (e) {
if (myToken !== loadToken) return;
if (elStatus) {
elStatus.className = "market-status err";
elStatus.textContent = String(e.message || e);
}
}
}
function bind() {
if (elRefresh) {
elRefresh.addEventListener("click", function () {
loadChart(true);
});
}
if (elTf) {
elTf.addEventListener("change", function () {
loadChart(false);
});
}
if (elExchange) {
elExchange.addEventListener("change", function () {
loadChart(false);
});
}
if (elSymbol) {
elSymbol.addEventListener("keydown", function (e) {
if (e.key === "Enter") loadChart(false);
});
}
const btnLoad = document.getElementById("market-load");
if (btnLoad) btnLoad.addEventListener("click", function () {
loadChart(false);
});
}
window.hubMarketChart = {
init: async function () {
if (!marketInited) {
marketInited = true;
await loadMeta();
bind();
}
await loadChart(false);
},
reload: function (force) {
loadChart(!!force);
},
};
if (document.getElementById("page-market") && !document.getElementById("page-market").classList.contains("hidden")) {
window.hubMarketChart.init();
}
})();
+62 -2
View File
@@ -8,7 +8,7 @@
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin />
<link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600&family=Orbitron:wght@500;600;700&display=swap" rel="stylesheet" media="print" onload="this.media='all'" />
<noscript><link href="https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;500;600&family=Orbitron:wght@500;600;700&display=swap" rel="stylesheet" /></noscript>
<link rel="stylesheet" href="/assets/app.css?v=20260530-hub-iframe" />
<link rel="stylesheet" href="/assets/app.css?v=20260528-hub-market" />
</head>
<body>
<div class="app-bg" aria-hidden="true"></div>
@@ -25,6 +25,7 @@
<span id="sys-status" class="sys-pill" title="系统状态">SYNC</span>
<nav class="top-nav">
<a href="/monitor" id="nav-monitor">监控区</a>
<a href="/market" id="nav-market">行情区</a>
<a href="/settings" id="nav-settings">系统设置</a>
</nav>
<button type="button" id="btn-logout" class="ghost" title="退出登录">退出</button>
@@ -56,6 +57,63 @@
<div id="monitor-grid" class="grid-monitor"></div>
</div>
<div id="page-market" class="page hidden">
<div class="page-head">
<h1><span class="head-tag">MKT</span> 行情区</h1>
<p class="page-desc">按需拉取 K 线,本地库保留 15 天(无后台自动更新)</p>
</div>
<details class="hint-box">
<summary>数据说明</summary>
<div class="hint-body">
优先读中控 <code>data/hub_kline.db</code>,不足时向所选交易所实例请求并写入库。<br />
日内周期最多 1000 根,日线/周线最多 500 根。仅在本页操作或点「刷新」时拉取交易所。
</div>
</details>
<div class="market-toolbar toolbar">
<label class="market-field">
<span>交易所</span>
<select id="market-exchange"></select>
</label>
<label class="market-field">
<span>币种</span>
<input id="market-symbol" type="text" placeholder="TON/USDT" autocomplete="off" />
</label>
<label class="market-field">
<span>周期</span>
<select id="market-timeframe">
<option value="1m">1m</option>
<option value="5m" selected>5m</option>
<option value="15m">15m</option>
<option value="1h">1h</option>
<option value="4h">4h</option>
<option value="1d">1d</option>
<option value="1w">1w</option>
</select>
</label>
<button type="button" id="market-load" class="primary">加载</button>
<button type="button" id="market-refresh" class="ghost">强制刷新</button>
<span class="toolbar-spacer"></span>
<span id="market-updated" class="toolbar-meta"></span>
</div>
<p id="market-status" class="market-status"></p>
<div class="market-chart-wrap">
<div class="market-ohlcv-overlay" aria-label="K线详情">
<div class="market-ohlcv-title">
<span id="mkt-symbol-label"></span>
<span id="mkt-tf-label">5m</span>
</div>
<div class="market-ohlcv-grid">
<div><span class="k"></span><span id="mkt-o"></span></div>
<div><span class="k"></span><span id="mkt-h"></span></div>
<div><span class="k"></span><span id="mkt-l"></span></div>
<div><span class="k"></span><span id="mkt-c"></span></div>
<div class="market-vol"><span class="k"></span><span id="mkt-v"></span></div>
</div>
</div>
<div id="market-chart" class="market-chart-host"></div>
</div>
</div>
<div id="instance-frame-shell" class="instance-frame-shell hidden" aria-hidden="true">
<div class="instance-frame-toolbar">
<button type="button" id="instance-frame-back" class="ghost">← 返回监控</button>
@@ -120,6 +178,8 @@
</div>
<div id="toast"></div>
<script src="/assets/app.js?v=20260530-hub-embed-sso"></script>
<script src="https://unpkg.com/lightweight-charts@4.2.0/dist/lightweight-charts.standalone.production.js"></script>
<script src="/assets/chart.js?v=20260528-hub-market"></script>
<script src="/assets/app.js?v=20260528-hub-market"></script>
</body>
</html>
+97
View File
@@ -0,0 +1,97 @@
"""中控 K 线库:15 天滚动与按需合并。"""
from __future__ import annotations
import sqlite3
import tempfile
import unittest
from pathlib import Path
from hub_kline_store import (
bar_limit_for_timeframe,
load_bars_range,
purge_retention,
resolve_chart_bars,
retention_days,
upsert_bars,
window_start_ms,
)
from hub_ohlcv_lib import TIMEFRAME_MS
class TestHubKlineStore(unittest.TestCase):
def setUp(self):
self.tmp = tempfile.TemporaryDirectory()
self.db = Path(self.tmp.name) / "test_hub_kline.db"
def tearDown(self):
self.tmp.cleanup()
def test_bar_limits(self):
self.assertEqual(bar_limit_for_timeframe("5m"), 1000)
self.assertEqual(bar_limit_for_timeframe("1d"), 500)
def test_purge_retention(self):
import time
from hub_kline_store import init_db
init_db(self.db)
old_ms = int(time.time() * 1000) - 20 * 86400000
upsert_bars(
"okx",
"BTC/USDT",
"5m",
[
{
"open_time_ms": old_ms,
"open": 1,
"high": 2,
"low": 0.5,
"close": 1.5,
"volume": 10,
}
],
self.db,
)
n = purge_retention(self.db, days=15)
self.assertGreaterEqual(n, 1)
rows = load_bars_range("okx", "BTC/USDT", "5m", old_ms - 1, old_ms + 1, self.db)
self.assertEqual(len(rows), 0)
def test_resolve_uses_cache_without_remote(self):
import time
from hub_kline_store import init_db
from hub_ohlcv_lib import last_closed_bar_open_ms
init_db(self.db)
now = int(time.time() * 1000)
tf = "5m"
period = TIMEFRAME_MS[tf]
last_closed = last_closed_bar_open_ms(tf, now)
bars = []
for i in range(1000):
oms = last_closed - (999 - i) * period
bars.append(
{
"open_time_ms": oms,
"open": 100 + i,
"high": 101 + i,
"low": 99 + i,
"close": 100.5 + i,
"volume": 1000 + i,
}
)
upsert_bars("okx", "ETH/USDT", tf, bars, self.db)
def remote_fetch(**kwargs):
self.fail("不应请求交易所")
out = resolve_chart_bars("okx", "ETH/USDT", tf, remote_fetch, db_path=self.db)
self.assertTrue(out.get("ok"))
self.assertGreaterEqual(len(out.get("candles") or []), 1000)
if __name__ == "__main__":
unittest.main()