Files
crypto_monitor/hub_bridge.py
T
2026-05-30 12:08:56 +08:00

332 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
各 crypto_monitor_* 注册 /api/hub/* JSON 接口,供 manual_trading_hub 调用。
实例末尾:app.config["HUB_CTX"] = {...}; register_hub_routes(app)
"""
from __future__ import annotations
import json
import time
from functools import wraps
from flask import current_app, get_flashed_messages, jsonify, redirect, request, session
from hub_auth import request_allowed
from hub_sso import safe_next_path, verify_hub_sso_token
def _hub_auth_required(f):
@wraps(f)
def wrapped(*args, **kwargs):
from flask import current_app as cap
auth_disabled = bool(cap.config.get("HUB_AUTH_DISABLED"))
if not request_allowed(bool(session.get("logged_in")), auth_disabled):
return jsonify({"ok": False, "msg": "未授权(登录或 HUB_BRIDGE_TOKEN"}), 401
return f(*args, **kwargs)
return wrapped
def _ctx():
return current_app.config.get("HUB_CTX") or {}
def _row_to_dict(row):
fn = _ctx().get("row_to_dict")
if fn and row is not None:
return fn(row)
return dict(row) if row is not None else {}
_FAIL_HINTS = (
"失败",
"错误",
"拒绝",
"无效",
"缺少",
"无法",
"过期",
"未达",
"不能为空",
"已有",
"不允许",
"异常",
)
def _invoke_view(view_name: str, path: str, form=None) -> dict:
views = _ctx().get("views") or {}
view = views.get(view_name)
if not view:
return {"ok": False, "messages": [f"未配置视图 {view_name}"]}
data = form if form is not None else request.form
if hasattr(data, "items") and not isinstance(data, dict):
data = {k: v for k, v in data.items()}
with current_app.test_request_context(path, method="POST", data=data):
session["logged_in"] = True
try:
view()
except Exception as e:
return {"ok": False, "messages": [str(e)]}
try:
msgs = [str(x) for x in get_flashed_messages()]
except Exception as e:
return {"ok": False, "messages": [f"读取提示信息失败: {e}"]}
ok = True
for m in msgs:
if any(k in m for k in _FAIL_HINTS):
ok = False
break
return {"ok": ok, "messages": msgs}
def _hub_json(view_name: str, path: str, form=None):
try:
return jsonify(_invoke_view(view_name, path, form=form))
except Exception as e:
return jsonify({"ok": False, "messages": [str(e)]})
def install_on_app(
app,
*,
exchange: str,
capabilities: list,
has_trend: bool,
get_db,
row_to_dict,
meta_fn,
views: dict,
):
app.config["HUB_CTX"] = {
"exchange": exchange,
"capabilities": list(capabilities),
"has_trend": bool(has_trend),
"get_db": get_db,
"row_to_dict": row_to_dict,
"meta_fn": meta_fn,
"views": views,
}
install_hub_embed_headers(app)
register_hub_routes(app)
def install_hub_embed_headers(app):
"""允许复盘中控 iframe 内嵌打开本实例(须与 hub 的 HUB_EMBED_ORIGINS 或域名一致)。"""
import os
allowed = (os.getenv("APP_ALLOW_HUB_EMBED") or "true").strip().lower() in (
"1",
"true",
"yes",
"on",
)
if not allowed:
return
origins = (
(os.getenv("HUB_EMBED_PARENT_ORIGINS") or os.getenv("HUB_EMBED_ORIGINS") or "*")
.strip()
)
@app.after_request
def _hub_embed_frame_headers(response):
if origins == "*":
response.headers["Content-Security-Policy"] = "frame-ancestors *"
else:
response.headers["Content-Security-Policy"] = (
f"frame-ancestors 'self' {origins}"
)
return response
def register_hub_routes(app):
auth_disabled = False
try:
import os
auth_disabled = os.getenv("APP_AUTH_DISABLED", "false").lower() in (
"1",
"true",
"yes",
"on",
)
except Exception:
pass
app.config.setdefault("HUB_AUTH_DISABLED", auth_disabled)
@app.route("/api/hub/ping")
@_hub_auth_required
def api_hub_ping():
c = _ctx()
return jsonify(
{
"ok": True,
"exchange": c.get("exchange"),
"capabilities": c.get("capabilities") or [],
}
)
@app.route("/api/hub/meta")
@_hub_auth_required
def api_hub_meta():
c = _ctx()
meta_fn = c.get("meta_fn")
meta = meta_fn() if callable(meta_fn) else {}
return jsonify({"ok": True, "meta": meta})
@app.route("/api/hub/monitor")
@_hub_auth_required
def api_hub_monitor():
c = _ctx()
get_db = c.get("get_db")
if not get_db:
return jsonify({"ok": False, "msg": "HUB_CTX 缺少 get_db"}), 500
conn = get_db()
keys = []
for row in conn.execute("SELECT * FROM key_monitors ORDER BY id DESC").fetchall():
keys.append(_row_to_dict(row))
orders = []
for row in conn.execute(
"SELECT * FROM order_monitors WHERE status='active' ORDER BY id DESC"
).fetchall():
orders.append(_row_to_dict(row))
trends = []
if c.get("has_trend"):
for row in conn.execute(
"SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC"
).fetchall():
trends.append(_row_to_dict(row))
rolls = []
try:
for row in conn.execute(
"""SELECT g.* FROM roll_groups g
INNER JOIN order_monitors m ON m.id = g.order_monitor_id AND m.status='active'
WHERE g.status='active' ORDER BY g.id DESC"""
).fetchall():
rolls.append(_row_to_dict(row))
except Exception:
pass
conn.close()
enrich = c.get("enrich_monitor")
if callable(enrich):
try:
payload = enrich(keys=keys, orders=orders, trends=trends, rolls=rolls)
if isinstance(payload, dict):
return jsonify({"ok": True, **payload})
except Exception as e:
return jsonify({"ok": False, "msg": str(e)}), 500
return jsonify(
{
"ok": True,
"keys": keys,
"orders": orders,
"trends": trends,
"rolls": rolls,
"key_prices": [],
}
)
@app.route("/api/hub/add_order", methods=["POST"])
@_hub_auth_required
def api_hub_add_order():
return _hub_json("add_order", "/add_order")
@app.route("/api/hub/add_key", methods=["POST"])
@_hub_auth_required
def api_hub_add_key():
return _hub_json("add_key", "/add_key")
@app.route("/api/hub/trend/preview", methods=["POST"])
@_hub_auth_required
def api_hub_trend_preview():
if not _ctx().get("has_trend"):
return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400
data = _invoke_view("preview_trend_pullback", "/trade")
pid = _latest_preview_id()
preview = _fetch_preview(pid) if pid else None
return jsonify(
{
"ok": bool(data.get("ok")),
"messages": data.get("messages") or [],
"preview_id": pid,
"preview": preview,
}
)
@app.route("/api/hub/trend/execute", methods=["POST"])
@_hub_auth_required
def api_hub_trend_execute():
if not _ctx().get("has_trend"):
return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400
pid = (request.form.get("preview_id") or "").strip()
if not pid:
body = request.get_json(silent=True) or {}
pid = str(body.get("preview_id") or "").strip()
form = {"preview_id": pid} if pid else {}
return jsonify(_invoke_view("execute_trend_pullback", "/trade", form=form))
@app.route("/api/hub/trend/preview/<pid>")
@_hub_auth_required
def api_hub_trend_preview_get(pid):
if not _ctx().get("has_trend"):
return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400
preview = _fetch_preview(pid)
if not preview:
return jsonify({"ok": False, "msg": "预览不存在或已过期"}), 404
return jsonify({"ok": True, "preview": preview})
@app.route("/hub-sso")
def hub_sso_login():
"""中控签发的临时链接:写入 session 后跳转,直链访问仍走 /login。"""
auth_disabled = bool(current_app.config.get("HUB_AUTH_DISABLED"))
next_arg = request.args.get("next")
if auth_disabled:
session["logged_in"] = True
return redirect(safe_next_path(next_arg))
ex = str((_ctx().get("exchange") or "")).strip().lower()
token = (request.args.get("token") or "").strip()
ok, next_path, _err = verify_hub_sso_token(token, ex)
if ok:
session["logged_in"] = True
return redirect(next_path)
return redirect("/login")
def _latest_preview_id():
get_db = _ctx().get("get_db")
if not get_db:
return None
conn = get_db()
row = conn.execute(
"SELECT id FROM trend_pullback_previews ORDER BY created_at DESC LIMIT 1"
).fetchone()
conn.close()
return row["id"] if row else None
def _fetch_preview(pid):
get_db = _ctx().get("get_db")
if not get_db or not pid:
return None
conn = get_db()
row = conn.execute(
"SELECT * FROM trend_pullback_previews WHERE id=?", (pid,)
).fetchone()
conn.close()
if not row:
return None
d = _row_to_dict(row)
now_ms = int(time.time() * 1000)
d["expires_in_sec"] = max(0, int((int(d.get("expires_at_ms") or 0) - now_ms) / 1000))
try:
grid = json.loads(d.get("grid_prices_json") or "[]")
legs = json.loads(d.get("leg_amounts_json") or "[]")
d["grid_levels"] = [
{"i": i + 1, "price": grid[i], "contracts": legs[i] if i < len(legs) else None}
for i in range(len(grid))
]
except Exception:
d["grid_levels"] = []
return d