""" 子账户极轻代理:GET /status、挂单/条件单查询与撤销、POST /emergency/close-all、POST /emergency/close-position,仅监听 127.0.0.1。 与仓库内四个策略/监控目录一一对应时,典型用法(各目录自己的 .env 里已有密钥;子代理用环境变量 PORT,勿与 Flask 的 APP_PORT 相同): EXCHANGE=binance → crypto_monitor_binance(BINANCE_*) EXCHANGE=okx → crypto_monitor_okx(OKX_*) EXCHANGE=gate → crypto_monitor_gate / crypto_monitor_gate_bot(GATE_*) 环境变量: EXCHANGE binance(默认)| okx | gate PORT 默认 15200(与 crypto_monitor_* 的 Flask APP_PORT 错开;中控默认聚合 15200–15203) HOST 默认 127.0.0.1 CONTROL_TOKEN 可选;请求头 X-Control-Token Binance:BINANCE_API_KEY / BINANCE_API_SECRET;余额为 **U 本位永续合约账户** USDT(与 `crypto_monitor_binance` 的合约口径一致,非现货钱包);BINANCE_POSITION_MODE;BINANCE_MARGIN_MODE OKX:OKX_API_KEY / OKX_API_SECRET / OKX_API_PASSPHRASE;OKX_TD_MODE;OKX_POS_MODE Gate:GATE_API_KEY / GATE_API_SECRET;GATE_TD_MODE;GATE_POS_MODE 代理与主项目一致时可设:BINANCE_SOCKS_PROXY / OKX_SOCKS_PROXY / GATE_SOCKS_PROXY(或 HTTP(S)_PROXY)。 """ from __future__ import annotations import math import os import time from typing import Any import ccxt from fastapi import FastAPI, Header, HTTPException, Request from fastapi.responses import JSONResponse from pydantic import BaseModel from exchange_orders import ( attach_orders_to_positions, cancel_order as hub_cancel_order, cancel_orders_for_symbol, list_open_orders, replace_position_tpsl, symbols_match, ) HOST = os.getenv("HOST", "127.0.0.1") PORT = int(os.getenv("PORT", "15200")) CONTROL_TOKEN = (os.getenv("CONTROL_TOKEN") or "").strip() _raw_ex = (os.getenv("EXCHANGE") or "binance").strip().lower() if _raw_ex in ("binance", "bnb", "ba"): EXCHANGE_KIND = "binance" elif _raw_ex in ("okx", "okex"): EXCHANGE_KIND = "okx" elif _raw_ex in ("gate", "gateio"): EXCHANGE_KIND = "gate" else: EXCHANGE_KIND = "binance" # —— Binance —— _bin_pos = (os.getenv("BINANCE_POSITION_MODE") or "hedge").strip().lower() BINANCE_POSITION_MODE = "hedge" if _bin_pos in ("hedge", "dual", "double", "hedged") else "oneway" _bin_margin = (os.getenv("BINANCE_MARGIN_MODE") or "cross").strip().lower() 
BINANCE_DEFAULT_MARGIN_MODE = "cross" if _bin_margin in ("cross", "cross_margin") else "isolated"

# --- OKX ---
OKX_TD_MODE = (os.getenv("OKX_TD_MODE") or "cross").strip()
_okx_pos = (os.getenv("OKX_POS_MODE") or "hedge").strip().lower()
OKX_POS_MODE = "hedge" if _okx_pos in ("hedge", "long_short_mode", "dual") else "net"

# --- Gate ---
_gate_td = (os.getenv("GATE_TD_MODE") or "cross").strip().lower()
GATE_DEFAULT_MARGIN_MODE = "cross" if _gate_td in ("cross", "cross_margin") else "isolated"
_gate_pos = (os.getenv("GATE_POS_MODE") or "hedge").strip().lower()
GATE_POS_MODE = "hedge" if _gate_pos in ("hedge", "dual", "double") else "single"

# Interactive docs are disabled (docs_url / redoc_url are None).
app = FastAPI(title="sub-agent", docs_url=None, redoc_url=None)

# Lazily created ccxt client and a flag recording that markets were loaded.
_ccxt_ex: Any = None
_markets_loaded = False


def _socks_proxy_url(prefix: str) -> str:
    """Return the stripped ``<prefix>_SOCKS_PROXY`` value, or "" when unset."""
    return (os.getenv(f"{prefix}_SOCKS_PROXY") or "").strip()


def _http_https_proxy(prefix: str) -> dict[str, str] | None:
    """Build a ``{"http": ..., "https": ...}`` proxy mapping for *prefix*.

    ``<prefix>_SOCKS_PROXY`` takes precedence and is used for both schemes.
    Otherwise ``<prefix>_HTTP_PROXY`` / ``<prefix>_HTTPS_PROXY`` are used; a
    scheme whose variable is unset is mapped to "". Returns None when none of
    the three variables is set.
    """
    http = (os.getenv(f"{prefix}_HTTP_PROXY") or "").strip()
    https = (os.getenv(f"{prefix}_HTTPS_PROXY") or "").strip()
    socks = _socks_proxy_url(prefix)
    if socks:
        return {"http": socks, "https": socks}
    if http or https:
        return {"http": http, "https": https}
    return None


def _attach_proxies(ex: Any, prefix: str) -> None:
    """Set ``ex.proxies`` from the *prefix* environment variables, if any."""
    p = _http_https_proxy(prefix)
    if p:
        ex.proxies = p


def _make_exchange() -> Any:
    """Create the ccxt client for ``EXCHANGE_KIND`` with swap defaults.

    Raises:
        RuntimeError: when the API credentials for the selected exchange are
            missing from the environment.
    """
    if EXCHANGE_KIND == "binance":
        key = (os.getenv("BINANCE_API_KEY") or "").strip()
        secret = (os.getenv("BINANCE_API_SECRET") or "").strip()
        if not key or not secret:
            raise RuntimeError("缺少 BINANCE_API_KEY / BINANCE_API_SECRET")
        ex = ccxt.binance(
            {
                "apiKey": key,
                "secret": secret,
                "enableRateLimit": True,
                "options": {
                    "defaultType": "swap",
                    # ccxt's fetch_balance goes to spot by default; pin it to
                    # the USDT-margined futures wallet, as the monitor project does.
                    "fetchBalance": {"defaultType": "swap"},
                    "defaultMarginMode": BINANCE_DEFAULT_MARGIN_MODE,
                    "adjustForTimeDifference": True,
                },
            }
        )
        _attach_proxies(ex, "BINANCE")
        return ex

    if EXCHANGE_KIND == "okx":
        key = (os.getenv("OKX_API_KEY") or "").strip()
        secret = (os.getenv("OKX_API_SECRET") or "").strip()
        password = (os.getenv("OKX_API_PASSPHRASE") or "").strip()
        if not key or not secret or not password:
            raise RuntimeError("缺少 OKX_API_KEY / OKX_API_SECRET / OKX_API_PASSPHRASE")
        ex = ccxt.okx(
            {
                "apiKey": key,
                "secret": secret,
                "password": password,
                "enableRateLimit": True,
                "options": {"defaultType": "swap"},
            }
        )
        _attach_proxies(ex, "OKX")
        return ex

    # gate
    key = (os.getenv("GATE_API_KEY") or "").strip()
    secret = (os.getenv("GATE_API_SECRET") or "").strip()
    if not key or not secret:
        raise RuntimeError("缺少 GATE_API_KEY / GATE_API_SECRET")
    ex = ccxt.gateio(
        {
            "apiKey": key,
            "secret": secret,
            "enableRateLimit": True,
            "options": {
                "defaultType": "swap",
                "defaultMarginMode": GATE_DEFAULT_MARGIN_MODE,
            },
        }
    )
    _attach_proxies(ex, "GATE")
    return ex


def get_exchange() -> Any:
    """Return the module-wide ccxt client, creating it on first use."""
    global _ccxt_ex
    if _ccxt_ex is None:
        _ccxt_ex = _make_exchange()
    return _ccxt_ex


def _ensure_markets() -> None:
    """Call ``load_markets()`` once; later calls are no-ops after a success."""
    global _markets_loaded
    if not _markets_loaded:
        get_exchange().load_markets()
        _markets_loaded = True


def _check_token(x_control_token: str | None) -> None:
    """Validate the X-Control-Token header against ``CONTROL_TOKEN``.

    No check is performed when ``CONTROL_TOKEN`` is empty.

    Raises:
        HTTPException: 401 when a token is configured and the header differs.
    """
    if not CONTROL_TOKEN:
        return
    import hmac  # local import keeps this block self-contained

    supplied = (x_control_token or "").strip()
    # Constant-time comparison so response timing does not leak how much of the
    # token matched. Compare bytes: compare_digest rejects non-ASCII str, and
    # "surrogatepass" keeps the encoding total for any str value.
    same = hmac.compare_digest(
        supplied.encode("utf-8", "surrogatepass"),
        CONTROL_TOKEN.encode("utf-8", "surrogatepass"),
    )
    if not same:
        raise HTTPException(status_code=401, detail="invalid token")


def _position_mode_label() -> str:
    """Return the configured position-mode string of the active exchange."""
    if EXCHANGE_KIND == "binance":
        return BINANCE_POSITION_MODE
    if EXCHANGE_KIND == "okx":
        return OKX_POS_MODE
    return GATE_POS_MODE


def _close_param_candidates_binance(direction: str) -> list[dict[str, Any]]:
    """Order-param variants to try for a Binance close, most likely first.

    The variants matching ``BINANCE_POSITION_MODE`` come first; the ones for
    the other mode follow as fallbacks.
    """
    ps = "LONG" if direction == "long" else "SHORT"
    hedge_ro = {"positionSide": ps, "reduceOnly": True}
    hedge_plain = {"positionSide": ps}
    oneway_ro = {"reduceOnly": True}
    oneway_plain: dict[str, Any] = {}
    if BINANCE_POSITION_MODE == "hedge":
        return [hedge_ro, hedge_plain, oneway_ro, oneway_plain]
    return [oneway_ro, oneway_plain, hedge_ro, hedge_plain]


def _close_param_candidates_okx(direction: str) -> list[dict[str, Any]]:
    """Order-param variants to try for an OKX close.

    Every variant carries ``tdMode``. In hedge mode the ``posSide`` variants
    are tried first; the variants without ``posSide`` are always appended.
    """
    base: dict[str, Any] = {"tdMode": OKX_TD_MODE}
    out: list[dict[str, Any]] = []
    if OKX_POS_MODE == "hedge":
        ps = "long" if direction == "long" else "short"
        out.extend(
            [
                {**base, "posSide": ps, "reduceOnly": True},
                {**base, "posSide": ps},
            ]
        )
    out.extend([{**base, "reduceOnly": True}, dict(base)])
    return out


def _close_param_candidates_gate(_direction: str) -> list[dict[str, Any]]:
    """Order-param variants to try for a Gate close (direction is unused)."""
    return [{"reduceOnly": True}, {}]


def _close_param_candidates(direction: str) -> list[dict[str, Any]]:
    """Dispatch to the candidate builder of the active exchange."""
    if EXCHANGE_KIND == "binance":
        return _close_param_candidates_binance(direction)
    if EXCHANGE_KIND == "okx":
        return _close_param_candidates_okx(direction)
    return _close_param_candidates_gate(direction)


# Lower-case substrings of an error message that mean "retry with the next
# parameter variant". The former '"-1106" and "reduceonly"' test is covered by
# the plain "reduceonly" marker, so results are unchanged.
_RETRYABLE_CLOSE_MARKERS = (
    "-4061",
    "reduceonly",
    "reduce only",
    "position side",
    "positionside",
    "pos side",
    "dual side",
    "position mode",
)


def _retryable_close_err(msg: str) -> bool:
    """Return True when *msg* looks like a position-mode/reduce-only mismatch."""
    s = (msg or "").lower()
    return any(marker in s for marker in _RETRYABLE_CLOSE_MARKERS)


def _position_contracts(p: dict[str, Any]) -> float:
    """Return the position size of a ccxt position dict, 0.0 when unknown.

    Uses the unified ``contracts`` field when it converts to float; otherwise
    the first non-zero of ``positionAmt`` / ``positionamt`` / ``pos`` /
    ``size`` in the raw ``info`` payload. The ``info`` values are returned
    as-is, so they may be negative.
    """
    raw = p.get("contracts")
    if raw is not None:
        try:
            return float(raw)
        except (TypeError, ValueError):
            pass
    info = p.get("info") or {}
    for k in ("positionAmt", "positionamt", "pos", "size"):
        if k in info:
            try:
                v = float(info[k])
                if v != 0:
                    return v
            except (TypeError, ValueError):
                pass
    return 0.0


def _position_side(p: dict[str, Any], contracts: float) -> str:
    """Return "long" or "short" for a position.

    Prefers the ``side`` field; otherwise the sign of *contracts*. Falls back
    to "long" when neither decides.
    """
    s = (p.get("side") or "").lower()
    if s in ("long", "short"):
        return s
    if contracts > 0:
        return "long"
    if contracts < 0:
        return "short"
    return "long"


def _cancel_symbol_orders(ex: Any, sym: str) -> None:
    """Best-effort cancellation of all orders on *sym*; never raises.

    On Binance it additionally calls ``fapiPrivateDeleteAlgoOpenOrders`` for
    the market id when the client exposes that method.
    """
    try:
        ex.cancel_all_orders(sym, params={})
    except Exception:
        # Deliberate best-effort: the position is already closed at this point.
        pass
    if EXCHANGE_KIND != "binance":
        return
    try:
        m = ex.market(sym)
        cid = m.get("id")
        if cid and hasattr(ex, "fapiPrivateDeleteAlgoOpenOrders"):
            ex.fapiPrivateDeleteAlgoOpenOrders({"symbol": cid})
    except Exception:
        pass


class EmergencyClosePositionBody(BaseModel):
    """Request body of POST /emergency/close-position."""

    symbol: str
    side: str


class CancelOrderBody(BaseModel):
    """Request body of POST /orders/cancel."""

    symbol: str
    order_id: str
    channel: str = "regular"


class CancelSymbolOrdersBody(BaseModel):
    """Request body of POST /orders/cancel-symbol."""

    symbol: str
    scope: str = "all"  # all | conditional | limit


class PlaceTpslBody(BaseModel):
    """Request body of POST /orders/place-tpsl."""

    symbol: str
    side: str  # long | short
    stop_loss: float
    take_profit: float
    contracts: float | None = None


def _close_position_market(
    ex: Any, sym: str, side: str, contracts: float
) -> tuple[dict[str, Any] | None, str | None]:
    """Market-close the given contract and direction.

    Tries each parameter variant from ``_close_param_candidates`` until one
    order is accepted, moving to the next variant only on errors recognised by
    ``_retryable_close_err``. After a successful order, the symbol's orders
    are cancelled best-effort.

    Returns:
        ``(closed_info, None)`` on success, ``(None, error_message)`` otherwise.
    """
    side_n = (side or "").strip().lower()
    if side_n not in ("long", "short"):
        return None, f"无效方向: {side}"
    close_side = "sell" if side_n == "long" else "buy"
    direction = side_n
    try:
        amt = float(ex.amount_to_precision(sym, abs(float(contracts))))
    except Exception:
        # Fall back to the unrounded amount when precision lookup fails.
        amt = abs(float(contracts))
    if amt <= 0:
        return None, f"{sym}: 可平张数为 0"

    order_resp = None
    last_err: Exception | None = None
    for params in _close_param_candidates(direction):
        try:
            order_resp = ex.create_order(sym, "market", close_side, amt, None, params)
            last_err = None
            break
        except Exception as e:
            last_err = e
            if _retryable_close_err(str(e)):
                continue
            return None, f"{sym}: {e}"
    if order_resp is None:
        return None, f"{sym}: {last_err or '下单失败'}"

    _cancel_symbol_orders(ex, sym)
    return (
        {"symbol": sym, "side": side_n, "amount": amt, "order_id": order_resp.get("id")},
        None,
    )


def _is_local(host: str | None) -> bool:
    """Return True when *host* is a recognised loopback name or address."""
    if not host:
        return False
    h = host.lower()
    return h in ("127.0.0.1", "::1", "localhost") or h.startswith("::ffff:127.0.0.1")


def _finite_or_none(x: Any) -> float | None:
    """Convert *x* to a finite float; None if not convertible or not finite."""
    try:
        f = float(x)
        return f if math.isfinite(f) else None
    except (TypeError, ValueError):
        return None


def _extract_usdt_total(balance: dict[str, Any]) -> float | None:
    """Extract a USDT total from a ccxt balance structure, as best it can.

    Tries, in order: ``USDT.total``, ``USDT.equity``, ``total.USDT``,
    ``USDT.free``, ``free.USDT`` (same approach as crypto_monitor_binance, per
    the original note). Returns None when nothing usable is found.
    """
    usdt_info = balance.get("USDT") or {}
    if not isinstance(usdt_info, dict):
        usdt_info = {}
    total_map = balance.get("total") or {}
    if not isinstance(total_map, dict):
        total_map = {}
    free_map = balance.get("free") or {}
    if not isinstance(free_map, dict):
        free_map = {}

    total = usdt_info.get("total")
    if total is None:
        total = usdt_info.get("equity")
    if total is None:
        total = total_map.get("USDT")
    if total is None:
        total = usdt_info.get("free")
    if total is None:
        total = free_map.get("USDT")
    try:
        return float(total) if total is not None else None
    except (TypeError, ValueError):
        return None


def _binance_futures_usdt_asset_row(balance: Any) -> dict[str, Any] | None:
    """Return the USDT entry of ``balance["info"]["assets"]``, or None.

    This is the shape of a swap-type fetch_balance on Binance USDT-margined
    futures, per the original note.
    """
    if not isinstance(balance, dict):
        return None
    info = balance.get("info")
    if not isinstance(info, dict):
        return None
    assets = info.get("assets")
    if not isinstance(assets, list):
        return None
    for a in assets:
        if isinstance(a, dict) and str(a.get("asset") or "").upper() == "USDT":
            return a
    return None


def _binance_swap_usdt_total(ex: Any) -> float | None:
    """Return the USDT of the Binance USDT-margined futures account only.

    Fetches the balance with an explicit ``type=swap`` (never spot). Prefers
    the first non-negative of marginBalance / walletBalance /
    crossWalletBalance / balance from the raw USDT asset row, then falls back
    to ``_extract_usdt_total``. Returns None when the fetch fails.
    """
    try:
        bal = ex.fetch_balance({"type": "swap"})
    except Exception:
        return None
    row = _binance_futures_usdt_asset_row(bal)
    if row:
        for k in ("marginBalance", "walletBalance", "crossWalletBalance", "balance"):
            x = row.get(k)
            if x is not None and str(x).strip() != "":
                try:
                    fv = float(x)
                    if fv >= 0:
                        return fv
                except (TypeError, ValueError):
                    pass
    v = _extract_usdt_total(bal)
    return float(v) if v is not None else None


@app.middleware("http")
async def local_only(request: Request, call_next):
    """Reject with 403 any request whose client address is not loopback.

    Requests without client information are let through.
    """
    if request.client and not _is_local(request.client.host):
        return JSONResponse({"detail": "forbidden"}, status_code=403)
    return await call_next(request)


@app.get("/health")
def health():
    """Liveness probe; reports the configured exchange."""
    return {"ok": True, "exchange": EXCHANGE_KIND}


def _status_failure(error: str, balance_usdt: float | None = None) -> JSONResponse:
    """Build the HTTP-200 ``ok: False`` payload shared by all /status failures."""
    return JSONResponse(
        {
            "ok": False,
            "error": error,
            "exchange": EXCHANGE_KIND,
            "balance_usdt": balance_usdt,
            "positions": [],
            "total_unrealized_pnl": None,
        },
        status_code=200,
    )


@app.get("/status")
def status(x_control_token: str | None = Header(default=None, alias="X-Control-Token")):
    """Return balance, open positions and their orders.

    Unexpected errors are reported as an ``ok: False`` payload with HTTP 200;
    HTTPException (e.g. the 401 from the token check) propagates.
    """
    try:
        return _status_inner(x_control_token)
    except HTTPException:
        raise
    except Exception as e:
        return _status_failure(f"status: {e}")


def _status_inner(x_control_token: str | None) -> Any:
    """Implementation of /status; see ``status`` for the error contract."""
    _check_token(x_control_token)
    try:
        ex = get_exchange()
    except RuntimeError as e:
        return _status_failure(str(e))
    try:
        _ensure_markets()
    except Exception as e:
        return _status_failure(f"load_markets: {e}")

    # The balance is optional: a failure here leaves it None, it is not fatal.
    balance_usdt: float | None = None
    try:
        if EXCHANGE_KIND == "binance":
            # Same finite filter as the other branch, so an inf/NaN can never
            # reach JSON serialisation.
            balance_usdt = _finite_or_none(_binance_swap_usdt_total(ex))
        else:
            bal = ex.fetch_balance()
            u = bal.get("USDT") or {}
            if isinstance(u, dict) and u.get("total") is not None:
                balance_usdt = _finite_or_none(u["total"])
    except Exception:
        pass

    try:
        raw = ex.fetch_positions() or []
    except Exception as e:
        return _status_failure(str(e), balance_usdt)

    positions_out: list[dict[str, Any]] = []
    total_upnl = 0.0
    for p in raw:
        if not isinstance(p, dict):
            continue
        c = _position_contracts(p)
        if abs(c) < 1e-12:
            continue  # flat position
        upnl = p.get("unrealizedPnl")
        try:
            upnl_f = float(upnl) if upnl is not None else 0.0
        except (TypeError, ValueError):
            upnl_f = 0.0
        total_upnl += upnl_f
        positions_out.append(
            {
                "symbol": p.get("symbol") or "",
                "side": _position_side(p, c),
                "contracts": abs(c),
                "contracts_signed": c,
                # _finite_or_none maps None / unparsable / non-finite to None.
                "notional_usdt": _finite_or_none(p.get("notional")),
                "unrealized_pnl": _finite_or_none(upnl_f),
                "entry_price": _finite_or_none(p.get("entryPrice")),
            }
        )

    # Orders are optional too: report the error but still return positions.
    orders_fetch_error: str | None = None
    try:
        attach_orders_to_positions(
            positions_out,
            list_open_orders(ex, EXCHANGE_KIND, None),
        )
    except Exception as e:
        orders_fetch_error = str(e)
    # Guarantee both keys exist whatever the attach step did.
    for p in positions_out:
        p.setdefault("conditional_orders", [])
        p.setdefault("regular_orders", [])

    try:
        pm = _position_mode_label()
    except Exception:
        pm = EXCHANGE_KIND

    out = {
        "ok": True,
        "exchange": EXCHANGE_KIND,
        "balance_usdt": balance_usdt,
        "positions": positions_out,
        "total_unrealized_pnl": _finite_or_none(total_upnl),
        "position_mode": pm,
    }
    if orders_fetch_error:
        out["orders_fetch_error"] = orders_fetch_error
    return out


@app.get("/open-orders")
def open_orders(
    symbol: str = "",
    x_control_token: str | None = Header(default=None, alias="X-Control-Token"),
):
    """List open orders, optionally limited to *symbol*.

    Failures are reported as ``ok: False`` with HTTP 200.
    """
    _check_token(x_control_token)
    try:
        ex = get_exchange()
        _ensure_markets()
        sym = (symbol or "").strip() or None
        orders = list_open_orders(ex, EXCHANGE_KIND, sym)
        return {"ok": True, "exchange": EXCHANGE_KIND, "symbol": sym, "orders": orders}
    except Exception as e:
        return JSONResponse(
            {"ok": False, "error": str(e), "exchange": EXCHANGE_KIND, "orders": []},
            status_code=200,
        )


@app.post("/orders/cancel")
def cancel_one_order(
    body: CancelOrderBody,
    x_control_token: str | None = Header(default=None, alias="X-Control-Token"),
):
    """Cancel one order by id.

    Raises:
        HTTPException: 400 when symbol or order_id is blank.
    """
    _check_token(x_control_token)
    sym = (body.symbol or "").strip()
    oid = (body.order_id or "").strip()
    if not sym or not oid:
        raise HTTPException(status_code=400, detail="symbol 与 order_id 必填")
    try:
        ex = get_exchange()
        _ensure_markets()
        hub_cancel_order(ex, EXCHANGE_KIND, sym, oid, body.channel or "regular")
        return {"ok": True, "exchange": EXCHANGE_KIND, "cancelled": {"symbol": sym, "order_id": oid}}
    except Exception as e:
        return JSONResponse(
            {"ok": False, "error": str(e), "exchange": EXCHANGE_KIND},
            status_code=200,
        )


@app.post("/orders/cancel-symbol")
def cancel_symbol_orders(
    body: CancelSymbolOrdersBody,
    x_control_token: str | None = Header(default=None, alias="X-Control-Token"),
):
    """Cancel the orders of one symbol within the requested scope.

    Raises:
        HTTPException: 400 when symbol is blank or scope is not one of
            all / conditional / limit.
    """
    _check_token(x_control_token)
    sym = (body.symbol or "").strip()
    if not sym:
        raise HTTPException(status_code=400, detail="symbol 必填")
    scope = (body.scope or "all").strip().lower()
    if scope not in ("all", "conditional", "limit"):
        raise HTTPException(status_code=400, detail="scope 须为 all / conditional / limit")
    try:
        ex = get_exchange()
        _ensure_markets()
        n = cancel_orders_for_symbol(ex, EXCHANGE_KIND, sym, scope=scope)
        return {"ok": True, "exchange": EXCHANGE_KIND, "cancelled_count": n, "scope": scope}
    except Exception as e:
        return JSONResponse(
            {"ok": False, "error": str(e), "exchange": EXCHANGE_KIND, "cancelled_count": 0},
            status_code=200,
        )


@app.post("/orders/place-tpsl")
def place_tpsl_orders(
    body: PlaceTpslBody,
    x_control_token: str | None = Header(default=None, alias="X-Control-Token"),
):
    """Replace the take-profit / stop-loss orders of a position.

    Per the original note, all conditional orders of the contract are cancelled
    first and then take-profit + stop-loss are placed, matching the strategy
    instances; that work is delegated to ``replace_position_tpsl``. When
    ``contracts`` is missing or not a positive finite number, the size is taken
    from the matching open position.

    Raises:
        HTTPException: 400 when symbol/side are invalid or a price is not a
            finite number.
    """
    _check_token(x_control_token)
    sym = (body.symbol or "").strip()
    side = (body.side or "").strip().lower()
    if not sym or side not in ("long", "short"):
        raise HTTPException(status_code=400, detail="symbol 与 side(long/short) 必填")
    try:
        sl = float(body.stop_loss)
        tp = float(body.take_profit)
    except (TypeError, ValueError) as e:
        raise HTTPException(status_code=400, detail="stop_loss / take_profit 须为数字") from e
    # float() accepts NaN/Infinity, which can never be valid trigger prices.
    if not (math.isfinite(sl) and math.isfinite(tp)):
        raise HTTPException(status_code=400, detail="stop_loss / take_profit 须为数字")
    try:
        ex = get_exchange()
        _ensure_markets()
        amt = _finite_or_none(body.contracts)
        if amt is None or amt <= 0:
            found = None
            for p in ex.fetch_positions() or []:
                # Same guards as the other position loops in this file.
                if not isinstance(p, dict):
                    continue
                if not symbols_match(sym, p.get("symbol") or ""):
                    continue
                c = abs(_position_contracts(p))
                if not math.isfinite(c) or c <= 0:
                    continue
                ps = (p.get("side") or "").lower()
                # A position without an explicit side matches either direction.
                if ps and ps != side:
                    continue
                found = c
                break
            if found is None:
                return JSONResponse(
                    {"ok": False, "error": f"未找到持仓 {sym} {side}", "exchange": EXCHANGE_KIND},
                    status_code=200,
                )
            amt = found
        info = replace_position_tpsl(ex, EXCHANGE_KIND, sym, side, float(amt), sl, tp)
        return {"ok": True, "exchange": EXCHANGE_KIND, "placed": info}
    except HTTPException:
        raise
    except Exception as e:
        return JSONResponse(
            {"ok": False, "error": str(e), "exchange": EXCHANGE_KIND},
            status_code=200,
        )
@app.post("/emergency/close-all")
def emergency_close_all(x_control_token: str | None = Header(default=None, alias="X-Control-Token")):
    """Market-close every open position on the account.

    Each position is attempted independently; failures are collected in
    ``errors`` and ``ok`` is True only when there were none.

    Raises:
        HTTPException: 503 when the exchange client cannot be created,
            502 when positions cannot be fetched.
    """
    _check_token(x_control_token)
    try:
        ex = get_exchange()
    except RuntimeError as e:
        raise HTTPException(status_code=503, detail=str(e)) from e
    try:
        _ensure_markets()
    except Exception as e:
        return JSONResponse(
            {"ok": False, "error": f"load_markets: {e}", "closed": [], "errors": [str(e)], "exchange": EXCHANGE_KIND},
            status_code=200,
        )

    errors: list[str] = []
    closed: list[dict[str, Any]] = []
    try:
        raw = ex.fetch_positions() or []
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"fetch_positions: {e}") from e

    for p in raw:
        if not isinstance(p, dict):
            continue
        c = _position_contracts(p)
        if abs(c) < 1e-12:
            continue  # flat position
        sym = p.get("symbol")
        if not sym:
            continue
        side = _position_side(p, c)
        try:
            info, err = _close_position_market(ex, sym, side, abs(c))
        except Exception as e:
            # An unexpected failure on one position must not stop the
            # emergency close of the remaining ones.
            info, err = None, f"{sym}: {e}"
        if err:
            errors.append(err)
        elif info:
            closed.append(info)
        time.sleep(0.05)  # short pause between consecutive close orders
    return {"ok": len(errors) == 0, "closed": closed, "errors": errors, "exchange": EXCHANGE_KIND}


@app.post("/emergency/close-position")
def emergency_close_position(
    body: EmergencyClosePositionBody,
    x_control_token: str | None = Header(default=None, alias="X-Control-Token"),
):
    """Market-close one position identified by exact symbol and side.

    A missing position or a failed close is reported as ``ok: False`` with
    HTTP 200.

    Raises:
        HTTPException: 400 on a blank symbol or invalid side, 503 when the
            exchange client cannot be created, 502 when positions cannot be
            fetched.
    """
    _check_token(x_control_token)
    sym = (body.symbol or "").strip()
    want_side = (body.side or "").strip().lower()
    if not sym:
        raise HTTPException(status_code=400, detail="symbol 不能为空")
    if want_side not in ("long", "short"):
        raise HTTPException(status_code=400, detail="side 须为 long 或 short")
    try:
        ex = get_exchange()
    except RuntimeError as e:
        raise HTTPException(status_code=503, detail=str(e)) from e
    try:
        _ensure_markets()
    except Exception as e:
        return JSONResponse(
            {
                "ok": False,
                "error": f"load_markets: {e}",
                "closed": None,
                "exchange": EXCHANGE_KIND,
            },
            status_code=200,
        )
    try:
        raw = ex.fetch_positions() or []
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"fetch_positions: {e}") from e

    # First non-flat position with exactly this symbol and the wanted side.
    matched = None
    for p in raw:
        if not isinstance(p, dict):
            continue
        if (p.get("symbol") or "").strip() != sym:
            continue
        c = _position_contracts(p)
        if abs(c) < 1e-12:
            continue
        side = _position_side(p, c)
        if side != want_side:
            continue
        matched = (sym, side, abs(c))
        break
    if not matched:
        return JSONResponse(
            {
                "ok": False,
                "error": f"未找到持仓: {sym} {want_side}",
                "closed": None,
                "exchange": EXCHANGE_KIND,
            },
            status_code=200,
        )

    sym, side, c = matched
    try:
        info, err = _close_position_market(ex, sym, side, c)
    except Exception as e:
        # Keep the same JSON error shape as an ordinary close failure.
        info, err = None, f"{sym}: {e}"
    if err:
        return JSONResponse(
            {"ok": False, "error": err, "closed": None, "exchange": EXCHANGE_KIND},
            status_code=200,
        )
    return {"ok": True, "closed": info, "errors": [], "exchange": EXCHANGE_KIND}


def main() -> None:
    """Serve the app with uvicorn on HOST:PORT, access log disabled."""
    import uvicorn

    uvicorn.run(app, host=HOST, port=PORT, log_level="warning", access_log=False)


if __name__ == "__main__":
    main()