""" 各 crypto_monitor_* 注册 /api/hub/* JSON 接口,供 manual_trading_hub 调用。 实例末尾:app.config["HUB_CTX"] = {...}; register_hub_routes(app) """ from __future__ import annotations import json import time from functools import wraps from flask import ( current_app, flash, get_flashed_messages, jsonify, redirect, request, session, ) from hub_auth import request_allowed from hub_sso import ( mint_hub_embed_bootstrap, safe_next_path, verify_hub_embed_bootstrap, verify_hub_sso_token, ) def _merge_query_into_path(path: str, **params: str) -> str: from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit split = urlsplit(path or "/") q = list(parse_qsl(split.query, keep_blank_values=True)) keys = {k for k, _ in q} for k, v in params.items(): if not v or k in keys: continue q.append((k, str(v))) return urlunsplit((split.scheme, split.netloc, split.path, urlencode(q), split.fragment)) def install_instance_theme_static(app) -> None: """仓库根 static/instance_theme.* 供四所页面共用。""" import os from flask import Response, send_file repo_static = os.path.join(os.path.dirname(os.path.abspath(__file__)), "static") assets = { "instance_theme.js": "application/javascript; charset=utf-8", "instance_theme_early.css": "text/css; charset=utf-8", "instance_theme.css": "text/css; charset=utf-8", "instance_ui.js": "application/javascript; charset=utf-8", "ai_review_render.js": "application/javascript; charset=utf-8", "form_submit_guard.js": "application/javascript; charset=utf-8", "time_close_ui.js": "application/javascript; charset=utf-8", "focus_chart_page.js": "application/javascript; charset=utf-8", "focus_chart_page.css": "text/css; charset=utf-8", } for name, mime in assets.items(): path = os.path.join(repo_static, name) def _view(p=path, m=mime): if not os.path.isfile(p): return Response("not found", status=404, mimetype="text/plain; charset=utf-8") return send_file(p, mimetype=m) app.add_url_rule( f"/static/{name}", endpoint=f"repo_static_{name.replace('.', '_')}", view_func=_view, ) def _hub_auth_required(f): @wraps(f) def wrapped(*args, **kwargs): from flask import current_app as cap auth_disabled = bool(cap.config.get("HUB_AUTH_DISABLED")) if not request_allowed(bool(session.get("logged_in")), auth_disabled): return jsonify({"ok": False, "msg": "未授权(登录或 HUB_BRIDGE_TOKEN)"}), 401 return f(*args, **kwargs) return wrapped def _ctx(): return current_app.config.get("HUB_CTX") or {} def _row_to_dict(row): fn = _ctx().get("row_to_dict") if fn and row is not None: return fn(row) return dict(row) if row is not None else {} def build_hub_monitor_payload( *, keys, orders, trends, rolls, enrich=None, ) -> dict: """合并 enrich 增量字段;enrich 只返回 trends 等局部时不得丢掉 keys/orders。""" payload = { "ok": True, "keys": keys, "orders": orders, "trends": trends, "rolls": rolls, "key_prices": [], } if callable(enrich): extra = enrich(keys=keys, orders=orders, trends=trends, rolls=rolls) if isinstance(extra, dict): payload.update(extra) return payload _FAIL_HINTS = ( "失败", "错误", "拒绝", "无效", "缺少", "无法", "过期", "未达", "不能为空", "已有", "不允许", "异常", ) def _invoke_view(view_name: str, path: str, form=None) -> dict: views = _ctx().get("views") or {} view = views.get(view_name) if not view: return {"ok": False, "messages": [f"未配置视图 {view_name}"]} data = form if form is not None else request.form if hasattr(data, "items") and not isinstance(data, dict): data = {k: v for k, v in data.items()} with current_app.test_request_context(path, method="POST", data=data): session["logged_in"] = True try: view() except Exception as e: return {"ok": False, "messages": [str(e)]} try: msgs = [str(x) for x in get_flashed_messages()] except Exception as e: return {"ok": False, "messages": [f"读取提示信息失败: {e}"]} ok = True for m in msgs: if any(k in m for k in _FAIL_HINTS): ok = False break return {"ok": ok, "messages": msgs} def _invoke_view_get(view_name: str, path: str) -> dict: views = _ctx().get("views") or {} view = views.get(view_name) if not view: return {"ok": False, "messages": [f"未配置视图 {view_name}"]} with current_app.test_request_context(path, method="GET"): session["logged_in"] = True try: view() except Exception as e: return {"ok": False, "messages": [str(e)]} try: msgs = [str(x) for x in get_flashed_messages()] except Exception as e: return {"ok": False, "messages": [f"读取提示信息失败: {e}"]} ok = True for m in msgs: if any(k in m for k in _FAIL_HINTS): ok = False break return {"ok": ok, "messages": msgs} def _hub_json(view_name: str, path: str, form=None): try: return jsonify(_invoke_view(view_name, path, form=form)) except Exception as e: return jsonify({"ok": False, "messages": [str(e)]}) def install_on_app( app, *, exchange: str, capabilities: list, has_trend: bool, get_db, row_to_dict, meta_fn, views: dict, ohlcv_fn=None, account_fn=None, volume_rank_fn=None, reconcile_hub_flat_fn=None, ): app.config["HUB_CTX"] = { "exchange": exchange, "capabilities": list(capabilities), "has_trend": bool(has_trend), "get_db": get_db, "row_to_dict": row_to_dict, "meta_fn": meta_fn, "account_fn": account_fn, "views": views, "ohlcv_fn": ohlcv_fn, "volume_rank_fn": volume_rank_fn, "reconcile_hub_flat_fn": reconcile_hub_flat_fn, } install_hub_embed_headers(app) configure_hub_embed_session(app) install_instance_theme_static(app) register_hub_routes(app) def configure_hub_embed_session(app): """HTTPS iframe 内嵌须 SameSite=None + Secure;hub-sso / hub-embed-auth 自动启用。""" import os allowed = (os.getenv("APP_ALLOW_HUB_EMBED") or "true").strip().lower() in ( "1", "true", "yes", "on", ) if not allowed: return secure_env = (os.getenv("APP_COOKIE_SECURE") or "auto").strip().lower() if secure_env in ("1", "true", "yes", "on"): app.config.update( SESSION_COOKIE_SECURE=True, SESSION_COOKIE_SAMESITE="None", SESSION_COOKIE_HTTPONLY=True, ) return @app.before_request def _hub_embed_session_cookie(): if request.path not in ("/hub-sso", "/hub-embed-auth"): return embed = (request.args.get("embed") or "").strip().lower() in ( "1", "true", "yes", "on", ) in_iframe = (request.headers.get("Sec-Fetch-Dest") or "").lower() == "iframe" if not embed and not in_iframe: return if not request.is_secure: return app.config["SESSION_COOKIE_SECURE"] = True app.config["SESSION_COOKIE_SAMESITE"] = "None" app.config["SESSION_COOKIE_HTTPONLY"] = True def _sso_wants_embed_auth() -> bool: embed = (request.args.get("embed") or "").strip().lower() in ( "1", "true", "yes", "on", ) in_iframe = (request.headers.get("Sec-Fetch-Dest") or "").lower() == "iframe" return bool(embed or in_iframe) def install_hub_embed_headers(app): """允许复盘中控 iframe 内嵌打开本实例(须与 hub 的 HUB_EMBED_ORIGINS 或域名一致)。""" import os allowed = (os.getenv("APP_ALLOW_HUB_EMBED") or "true").strip().lower() in ( "1", "true", "yes", "on", ) if not allowed: return origins = ( (os.getenv("HUB_EMBED_PARENT_ORIGINS") or os.getenv("HUB_EMBED_ORIGINS") or "*") .strip() ) @app.after_request def _hub_embed_frame_headers(response): if origins == "*": response.headers["Content-Security-Policy"] = "frame-ancestors *" else: response.headers["Content-Security-Policy"] = ( f"frame-ancestors 'self' {origins}" ) return response def register_hub_routes(app): auth_disabled = False try: import os auth_disabled = os.getenv("APP_AUTH_DISABLED", "false").lower() in ( "1", "true", "yes", "on", ) except Exception: pass app.config.setdefault("HUB_AUTH_DISABLED", auth_disabled) @app.route("/api/hub/ping") @_hub_auth_required def api_hub_ping(): c = _ctx() return jsonify( { "ok": True, "exchange": c.get("exchange"), "capabilities": c.get("capabilities") or [], } ) @app.route("/api/hub/meta") @_hub_auth_required def api_hub_meta(): c = _ctx() meta_fn = c.get("meta_fn") meta = meta_fn() if callable(meta_fn) else {} return jsonify({"ok": True, "meta": meta}) @app.route("/api/hub/account") @_hub_auth_required def api_hub_account(): """中控 AI:资金账户 / 交易账户余额(无需浏览器登录)。""" fn = _ctx().get("account_fn") if not callable(fn): return jsonify({"ok": False, "msg": "未配置 account_fn"}), 501 try: data = fn() if not isinstance(data, dict): data = {} return jsonify({"ok": True, **data}) except Exception as e: return jsonify({"ok": False, "msg": str(e)}), 500 @app.route("/api/hub/monitor") @_hub_auth_required def api_hub_monitor(): c = _ctx() get_db = c.get("get_db") if not get_db: return jsonify({"ok": False, "msg": "HUB_CTX 缺少 get_db"}), 500 conn = get_db() keys = [] for row in conn.execute("SELECT * FROM key_monitors ORDER BY id DESC").fetchall(): keys.append(_row_to_dict(row)) orders = [] for row in conn.execute( "SELECT * FROM order_monitors WHERE status='active' ORDER BY id DESC" ).fetchall(): od = _row_to_dict(row) try: from strategy_trade_labels import apply_order_monitor_source_labels od = apply_order_monitor_source_labels(od) except Exception: pass orders.append(od) trends = [] if c.get("has_trend"): for row in conn.execute( "SELECT * FROM trend_pullback_plans WHERE status='active' ORDER BY id DESC" ).fetchall(): trends.append(_row_to_dict(row)) rolls = [] try: for row in conn.execute( """SELECT g.* FROM roll_groups g INNER JOIN order_monitors m ON m.id = g.order_monitor_id AND m.status='active' WHERE g.status='active' ORDER BY g.id DESC""" ).fetchall(): rolls.append(_row_to_dict(row)) except Exception: pass conn.close() enrich = c.get("enrich_monitor") if callable(enrich): try: return jsonify( build_hub_monitor_payload( keys=keys, orders=orders, trends=trends, rolls=rolls, enrich=enrich, ) ) except Exception as e: return jsonify({"ok": False, "msg": str(e)}), 500 return jsonify( build_hub_monitor_payload( keys=keys, orders=orders, trends=trends, rolls=rolls, ) ) @app.route("/api/hub/trades/archive") @_hub_auth_required def api_hub_trades_archive(): """中控币种档案:近 N 天已平仓记录。""" from hub_trades_lib import fetch_trades_for_archive, summarize_trades c = _ctx() get_db = c.get("get_db") if not get_db: return jsonify({"ok": False, "msg": "HUB_CTX 缺少 get_db"}), 500 try: days = int(request.args.get("days") or "365") except ValueError: days = 365 try: limit = int(request.args.get("limit") or "2000") except ValueError: limit = 2000 try: import os reset_hour = int(os.getenv("TRADING_DAY_RESET_HOUR", "8") or "8") except ValueError: reset_hour = 8 conn = get_db() try: trades = fetch_trades_for_archive( conn, exchange_key=str(c.get("exchange") or ""), days=days, row_to_dict_fn=c.get("row_to_dict"), reset_hour=reset_hour, limit=limit, ) finally: conn.close() stats = summarize_trades(trades) return jsonify( { "ok": True, "days": max(1, min(days, 3650)), "trading_day_reset_hour": reset_hour, "trades": trades, "stats": stats, } ) @app.route("/api/hub/trades/today") @_hub_auth_required def api_hub_trades_today(): """中控 AI:当日已平仓记录(按实例交易日)。""" from hub_trades_lib import ( current_trading_day, fetch_trades_for_trading_day, summarize_trades, ) c = _ctx() get_db = c.get("get_db") if not get_db: return jsonify({"ok": False, "msg": "HUB_CTX 缺少 get_db"}), 500 day_arg = (request.args.get("trading_day") or request.args.get("date") or "").strip()[:10] try: import os reset_hour = int(os.getenv("TRADING_DAY_RESET_HOUR", "8") or "8") except ValueError: reset_hour = 8 trading_day = day_arg or current_trading_day(reset_hour=reset_hour) conn = get_db() try: trades = fetch_trades_for_trading_day( conn, trading_day, row_to_dict_fn=c.get("row_to_dict"), reset_hour=reset_hour, ) finally: conn.close() stats = summarize_trades(trades) return jsonify( { "ok": True, "trading_day": trading_day, "trading_day_reset_hour": reset_hour, "trades": trades, "stats": stats, } ) @app.route("/api/hub/volume-rank") @_hub_auth_required def api_hub_volume_rank(): fn = _ctx().get("volume_rank_fn") if not callable(fn): return jsonify({"ok": False, "msg": "该实例未配置成交量排名接口"}), 501 top_raw = (request.args.get("top") or "").strip() top_n = 20 if top_raw.isdigit(): top_n = int(top_raw) try: result = fn(top_n=top_n) if isinstance(result, dict): return jsonify(result) return jsonify({"ok": False, "msg": "成交量排名返回格式无效"}), 500 except Exception as e: return jsonify({"ok": False, "msg": str(e)}), 500 @app.route("/api/hub/ohlcv") @_hub_auth_required def api_hub_ohlcv(): fn = _ctx().get("ohlcv_fn") if not callable(fn): return jsonify({"ok": False, "msg": "该实例未配置 OHLCV 接口"}), 501 symbol = (request.args.get("symbol") or "").strip() timeframe = (request.args.get("timeframe") or "5m").strip() since_raw = (request.args.get("since_ms") or "").strip() limit_raw = (request.args.get("limit") or "").strip() since_ms = None if since_raw.isdigit(): since_ms = int(since_raw) limit = 500 if limit_raw.isdigit(): limit = int(limit_raw) try: result = fn(symbol=symbol, timeframe=timeframe, since_ms=since_ms, limit=limit) if isinstance(result, dict): return jsonify(result) return jsonify({"ok": False, "msg": "OHLCV 返回格式无效"}), 500 except Exception as e: return jsonify({"ok": False, "msg": str(e)}), 500 @app.route("/api/hub/add_order", methods=["POST"]) @_hub_auth_required def api_hub_add_order(): return _hub_json("add_order", "/add_order") @app.route("/api/hub/add_key", methods=["POST"]) @_hub_auth_required def api_hub_add_key(): return _hub_json("add_key", "/add_key") @app.route("/api/hub/trend/preview", methods=["POST"]) @_hub_auth_required def api_hub_trend_preview(): if not _ctx().get("has_trend"): return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 data = _invoke_view("preview_trend_pullback", "/trade") pid = _latest_preview_id() preview = _fetch_preview(pid) if pid else None return jsonify( { "ok": bool(data.get("ok")), "messages": data.get("messages") or [], "preview_id": pid, "preview": preview, } ) @app.route("/api/hub/trend/execute", methods=["POST"]) @_hub_auth_required def api_hub_trend_execute(): if not _ctx().get("has_trend"): return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 pid = (request.form.get("preview_id") or "").strip() if not pid: body = request.get_json(silent=True) or {} pid = str(body.get("preview_id") or "").strip() form = {"preview_id": pid} if pid else {} return jsonify(_invoke_view("execute_trend_pullback", "/trade", form=form)) @app.route("/api/hub/trend/preview/") @_hub_auth_required def api_hub_trend_preview_get(pid): if not _ctx().get("has_trend"): return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 preview = _fetch_preview(pid) if not preview: return jsonify({"ok": False, "msg": "预览不存在或已过期"}), 404 return jsonify({"ok": True, "preview": preview}) @app.route("/api/hub/trend/stop/", methods=["POST"]) @_hub_auth_required def api_hub_trend_stop(pid): if not _ctx().get("has_trend"): return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 return jsonify(_invoke_view_get("stop_trend_pullback", f"/stop_trend_pullback/{pid}")) @app.route("/api/hub/order/sync-flat", methods=["POST"]) @_hub_auth_required def api_hub_order_sync_flat(): """中控市价全平后:同步 order_monitors 并读 Gate 平仓历史写交易记录。""" fn = _ctx().get("reconcile_hub_flat_fn") if not callable(fn): return jsonify({"ok": False, "msg": "该实例未配置 order sync-flat"}), 400 body = request.get_json(silent=True) or {} symbol = (body.get("symbol") or request.form.get("symbol") or "").strip() side = ( body.get("side") or body.get("direction") or request.form.get("side") or "" ).strip().lower() if not symbol: return jsonify({"ok": False, "msg": "symbol 不能为空"}), 400 if side not in ("long", "short"): return jsonify({"ok": False, "msg": "side 须为 long 或 short"}), 400 get_db = _ctx().get("get_db") if not callable(get_db): return jsonify({"ok": False, "msg": "HUB_CTX 缺少 get_db"}), 500 conn = get_db() try: out = fn(conn, symbol, side) if not isinstance(out, dict): out = {"ok": True, "synced": int(out or 0)} conn.commit() return jsonify(out) except Exception as e: return jsonify({"ok": False, "msg": str(e)}), 500 finally: conn.close() @app.route("/api/hub/trend/sync-flat", methods=["POST"]) @_hub_auth_required def api_hub_trend_sync_flat(): """中控市价全平后:结束仍 active 的同币种同向趋势计划。""" if not _ctx().get("has_trend"): return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 body = request.get_json(silent=True) or {} symbol = (body.get("symbol") or request.form.get("symbol") or "").strip() side = ( body.get("side") or body.get("direction") or request.form.get("side") or "" ).strip().lower() if not symbol: return jsonify({"ok": False, "msg": "symbol 不能为空"}), 400 if side not in ("long", "short"): return jsonify({"ok": False, "msg": "side 须为 long 或 short"}), 400 cfg = current_app.extensions.get("strategy_trend_cfg") get_db = _ctx().get("get_db") if not cfg or not callable(get_db): return jsonify({"ok": False, "msg": "趋势配置未就绪"}), 500 from strategy_trend_register import sync_trend_plans_after_external_close conn = get_db() try: return jsonify(sync_trend_plans_after_external_close(cfg, conn, symbol, side)) except Exception as e: return jsonify({"ok": False, "msg": str(e)}), 500 finally: conn.close() @app.route("/api/hub/trend/breakeven/", methods=["POST"]) @_hub_auth_required def api_hub_trend_breakeven(pid): if not _ctx().get("has_trend"): return jsonify({"ok": False, "msg": "该实例无趋势回调"}), 400 body = request.get_json(silent=True) or {} raw = (request.form.get("breakeven_offset_pct") or body.get("breakeven_offset_pct") or "").strip() form = {} if raw != "": form["breakeven_offset_pct"] = raw return jsonify( _invoke_view( "trend_pullback_breakeven", f"/trend_pullback_breakeven/{pid}", form=form, ) ) @app.route("/hub-sso") def hub_sso_login(): """中控签发的临时链接:写入 session 后跳转,直链访问仍走 /login。""" from urllib.parse import urlencode auth_disabled = bool(current_app.config.get("HUB_AUTH_DISABLED")) next_arg = request.args.get("next") if auth_disabled: session["logged_in"] = True return redirect(safe_next_path(next_arg)) ex = str((_ctx().get("exchange") or "")).strip().lower() token = (request.args.get("token") or "").strip() ok, next_path, err = verify_hub_sso_token(token, ex) if ok: if _sso_wants_embed_auth() and request.is_secure: boot = mint_hub_embed_bootstrap(ex, next_path) if boot: from urllib.parse import urlencode as _ue qdict = {"t": boot, "next": next_path, "embed": "1"} ht0 = (request.args.get("hub_theme") or "").strip().lower() if ht0 in ("light", "dark"): qdict["hub_theme"] = ht0 return redirect(f"/hub-embed-auth?{_ue(qdict)}") session["logged_in"] = True session.modified = True dest = next_path if request.args.get("embed", "").strip().lower() in ("1", "true", "yes", "on"): dest = _merge_query_into_path(dest, embed="1") ht = (request.args.get("hub_theme") or "").strip().lower() if ht in ("light", "dark"): dest = _merge_query_into_path(dest, hub_theme=ht) return redirect(dest) hint = err or "校验失败" flash( f"中控 SSO 未生效({hint})。" "请确认中控与实例 .env 中 HUB_BRIDGE_TOKEN 一致," f"且中控设置里该账户 key 为「{ex}」。" "经本地导航 iframe 打开时,实例须 HTTPS 且可设 APP_COOKIE_SECURE=true。" ) return redirect("/login") @app.route("/hub-embed-auth") def hub_embed_auth_login(): """LocalNav 等 iframe 内嵌:单独写入 SameSite=None 会话后跳转。""" auth_disabled = bool(current_app.config.get("HUB_AUTH_DISABLED")) next_arg = request.args.get("next") if auth_disabled: session["logged_in"] = True return redirect(safe_next_path(next_arg)) ex = str((_ctx().get("exchange") or "")).strip().lower() boot = (request.args.get("t") or "").strip() ok, next_path, err = verify_hub_embed_bootstrap(boot, ex) if ok: session["logged_in"] = True session.modified = True dest = next_path if request.args.get("embed", "").strip().lower() in ("1", "true", "yes", "on"): dest = _merge_query_into_path(dest, embed="1") ht = (request.args.get("hub_theme") or "").strip().lower() if ht in ("light", "dark"): dest = _merge_query_into_path(dest, hub_theme=ht) return redirect(dest) hint = err or "校验失败" flash(f"iframe 登录未生效({hint})。可点本地导航工具栏「实例免密」重试。") return redirect("/login") def _latest_preview_id(): get_db = _ctx().get("get_db") if not get_db: return None conn = get_db() row = conn.execute( "SELECT id FROM trend_pullback_previews ORDER BY created_at DESC LIMIT 1" ).fetchone() conn.close() return row["id"] if row else None def _fetch_preview(pid): get_db = _ctx().get("get_db") if not get_db or not pid: return None conn = get_db() row = conn.execute( "SELECT * FROM trend_pullback_previews WHERE id=?", (pid,) ).fetchone() conn.close() if not row: return None d = _row_to_dict(row) now_ms = int(time.time() * 1000) d["expires_in_sec"] = max(0, int((int(d.get("expires_at_ms") or 0) - now_ms) / 1000)) try: from strategy_trend_lib import build_trend_preview_level_rows enriched, level_rows = build_trend_preview_level_rows(d) for key in ( "preview_target_rr", "preview_first_take_profit", "preview_unified_stop_loss", "preview_risk_amount_u", "preview_first_profit_u", "preview_take_profit_price", ): if key in enriched: d[key] = enriched[key] d["preview_level_rows"] = level_rows d["grid_levels"] = [ { "i": row.get("i"), "label": row.get("label"), "price": row.get("price"), "contracts": row.get("contracts"), "cum_contracts": row.get("cum_contracts"), "avg_entry": row.get("avg_entry"), "take_profit_price": row.get("take_profit_price"), "profit_u": row.get("profit_u"), "risk_u": row.get("risk_u"), "rr": row.get("rr"), "stop_loss_price": row.get("stop_loss_price"), "take_profit": row.get("profit_u"), "stop_loss": row.get("risk_u"), } for row in level_rows ] except Exception: d["grid_levels"] = [] d["preview_level_rows"] = [] return d