""" 子账户极轻代理:GET /status、POST /emergency/close-all、POST /emergency/close-position,仅监听 127.0.0.1。 与仓库内四个策略/监控目录一一对应时,典型用法(各目录自己的 .env 里已有密钥;子代理用环境变量 PORT,勿与 Flask 的 APP_PORT 相同): EXCHANGE=binance → crypto_monitor_binance(BINANCE_*) EXCHANGE=okx → crypto_monitor_okx(OKX_*) EXCHANGE=gate → crypto_monitor_gate / crypto_monitor_gate_bot(GATE_*) 环境变量: EXCHANGE binance(默认)| okx | gate PORT 默认 15200(与 crypto_monitor_* 的 Flask APP_PORT 错开;中控默认聚合 15200–15203) HOST 默认 127.0.0.1 CONTROL_TOKEN 可选;请求头 X-Control-Token Binance:BINANCE_API_KEY / BINANCE_API_SECRET;余额为 **U 本位永续合约账户** USDT(与 `crypto_monitor_binance` 的合约口径一致,非现货钱包);BINANCE_POSITION_MODE;BINANCE_MARGIN_MODE OKX:OKX_API_KEY / OKX_API_SECRET / OKX_API_PASSPHRASE;OKX_TD_MODE;OKX_POS_MODE Gate:GATE_API_KEY / GATE_API_SECRET;GATE_TD_MODE;GATE_POS_MODE 代理与主项目一致时可设:BINANCE_SOCKS_PROXY / OKX_SOCKS_PROXY / GATE_SOCKS_PROXY(或 HTTP(S)_PROXY)。 """ from __future__ import annotations import math import os import time from typing import Any import ccxt from fastapi import FastAPI, Header, HTTPException, Request from fastapi.responses import JSONResponse from pydantic import BaseModel HOST = os.getenv("HOST", "127.0.0.1") PORT = int(os.getenv("PORT", "15200")) CONTROL_TOKEN = (os.getenv("CONTROL_TOKEN") or "").strip() _raw_ex = (os.getenv("EXCHANGE") or "binance").strip().lower() if _raw_ex in ("binance", "bnb", "ba"): EXCHANGE_KIND = "binance" elif _raw_ex in ("okx", "okex"): EXCHANGE_KIND = "okx" elif _raw_ex in ("gate", "gateio"): EXCHANGE_KIND = "gate" else: EXCHANGE_KIND = "binance" # —— Binance —— _bin_pos = (os.getenv("BINANCE_POSITION_MODE") or "hedge").strip().lower() BINANCE_POSITION_MODE = "hedge" if _bin_pos in ("hedge", "dual", "double", "hedged") else "oneway" _bin_margin = (os.getenv("BINANCE_MARGIN_MODE") or "cross").strip().lower() BINANCE_DEFAULT_MARGIN_MODE = "cross" if _bin_margin in ("cross", "cross_margin") else "isolated" # —— OKX —— OKX_TD_MODE = (os.getenv("OKX_TD_MODE") or "cross").strip() _okx_pos = (os.getenv("OKX_POS_MODE") or "hedge").strip().lower() OKX_POS_MODE = "hedge" if _okx_pos in ("hedge", "long_short_mode", "dual") else "net" # —— Gate —— _gate_td = (os.getenv("GATE_TD_MODE") or "cross").strip().lower() GATE_DEFAULT_MARGIN_MODE = "cross" if _gate_td in ("cross", "cross_margin") else "isolated" _gate_pos = (os.getenv("GATE_POS_MODE") or "hedge").strip().lower() GATE_POS_MODE = "hedge" if _gate_pos in ("hedge", "dual", "double") else "single" app = FastAPI(title="sub-agent", docs_url=None, redoc_url=None) _ccxt_ex: Any = None _markets_loaded = False def _socks_proxy_url(prefix: str) -> str: return (os.getenv(f"{prefix}_SOCKS_PROXY") or "").strip() def _http_https_proxy(prefix: str) -> dict[str, str] | None: http = (os.getenv(f"{prefix}_HTTP_PROXY") or "").strip() https = (os.getenv(f"{prefix}_HTTPS_PROXY") or "").strip() socks = _socks_proxy_url(prefix) if socks: return {"http": socks, "https": socks} if http or https: return {"http": http, "https": https} return None def _attach_proxies(ex: Any, prefix: str) -> None: p = _http_https_proxy(prefix) if p: ex.proxies = p def _make_exchange() -> Any: if EXCHANGE_KIND == "binance": key = (os.getenv("BINANCE_API_KEY") or "").strip() secret = (os.getenv("BINANCE_API_SECRET") or "").strip() if not key or not secret: raise RuntimeError("缺少 BINANCE_API_KEY / BINANCE_API_SECRET") ex = ccxt.binance( { "apiKey": key, "secret": secret, "enableRateLimit": True, "options": { "defaultType": "swap", # ccxt 默认 fetch_balance 走现货;与监控项目一致,固定为 U 本位合约钱包 "fetchBalance": {"defaultType": "swap"}, "defaultMarginMode": BINANCE_DEFAULT_MARGIN_MODE, "adjustForTimeDifference": True, }, } ) _attach_proxies(ex, "BINANCE") return ex if EXCHANGE_KIND == "okx": key = (os.getenv("OKX_API_KEY") or "").strip() secret = (os.getenv("OKX_API_SECRET") or "").strip() password = (os.getenv("OKX_API_PASSPHRASE") or "").strip() if not key or not secret or not password: raise RuntimeError("缺少 OKX_API_KEY / OKX_API_SECRET / OKX_API_PASSPHRASE") ex = ccxt.okx( { "apiKey": key, "secret": secret, "password": password, "enableRateLimit": True, "options": {"defaultType": "swap"}, } ) _attach_proxies(ex, "OKX") return ex # gate key = (os.getenv("GATE_API_KEY") or "").strip() secret = (os.getenv("GATE_API_SECRET") or "").strip() if not key or not secret: raise RuntimeError("缺少 GATE_API_KEY / GATE_API_SECRET") ex = ccxt.gateio( { "apiKey": key, "secret": secret, "enableRateLimit": True, "options": { "defaultType": "swap", "defaultMarginMode": GATE_DEFAULT_MARGIN_MODE, }, } ) _attach_proxies(ex, "GATE") return ex def get_exchange() -> Any: global _ccxt_ex if _ccxt_ex is None: _ccxt_ex = _make_exchange() return _ccxt_ex def _ensure_markets() -> None: global _markets_loaded if not _markets_loaded: get_exchange().load_markets() _markets_loaded = True def _check_token(x_control_token: str | None) -> None: if not CONTROL_TOKEN: return if (x_control_token or "").strip() != CONTROL_TOKEN: raise HTTPException(status_code=401, detail="invalid token") def _position_mode_label() -> str: if EXCHANGE_KIND == "binance": return BINANCE_POSITION_MODE if EXCHANGE_KIND == "okx": return OKX_POS_MODE return GATE_POS_MODE def _close_param_candidates_binance(direction: str) -> list[dict[str, Any]]: ps = "LONG" if direction == "long" else "SHORT" hedge_ro = {"positionSide": ps, "reduceOnly": True} hedge_plain = {"positionSide": ps} oneway_ro = {"reduceOnly": True} oneway_plain: dict[str, Any] = {} if BINANCE_POSITION_MODE == "hedge": return [hedge_ro, hedge_plain, oneway_ro, oneway_plain] return [oneway_ro, oneway_plain, hedge_ro, hedge_plain] def _close_param_candidates_okx(direction: str) -> list[dict[str, Any]]: base: dict[str, Any] = {"tdMode": OKX_TD_MODE} out: list[dict[str, Any]] = [] if OKX_POS_MODE == "hedge": ps = "long" if direction == "long" else "short" out.extend( [ {**base, "posSide": ps, "reduceOnly": True}, {**base, "posSide": ps}, ] ) out.extend([{**base, "reduceOnly": True}, dict(base)]) return out def _close_param_candidates_gate(_direction: str) -> list[dict[str, Any]]: return [{"reduceOnly": True}, {}] def _close_param_candidates(direction: str) -> list[dict[str, Any]]: if EXCHANGE_KIND == "binance": return _close_param_candidates_binance(direction) if EXCHANGE_KIND == "okx": return _close_param_candidates_okx(direction) return _close_param_candidates_gate(direction) def _retryable_close_err(msg: str) -> bool: s = (msg or "").lower() if "-4061" in s: return True if "-1106" in s and "reduceonly" in s: return True if "reduceonly" in s or "reduce only" in s: return True if "position side" in s or "positionside" in s or "pos side" in s: return True if "dual side" in s or "position mode" in s: return True return False def _position_contracts(p: dict[str, Any]) -> float: raw = p.get("contracts") if raw is not None: try: return float(raw) except (TypeError, ValueError): pass info = p.get("info") or {} for k in ("positionAmt", "positionamt", "pos", "size"): if k in info: try: v = float(info[k]) if v != 0: return v except (TypeError, ValueError): pass return 0.0 def _position_side(p: dict[str, Any], contracts: float) -> str: s = (p.get("side") or "").lower() if s in ("long", "short"): return s if contracts > 0: return "long" if contracts < 0: return "short" return "long" def _cancel_symbol_orders(ex: Any, sym: str) -> None: try: ex.cancel_all_orders(sym, params={}) except Exception: pass if EXCHANGE_KIND != "binance": return try: m = ex.market(sym) cid = m.get("id") if cid and hasattr(ex, "fapiPrivateDeleteAlgoOpenOrders"): ex.fapiPrivateDeleteAlgoOpenOrders({"symbol": cid}) except Exception: pass class EmergencyClosePositionBody(BaseModel): symbol: str side: str def _close_position_market( ex: Any, sym: str, side: str, contracts: float ) -> tuple[dict[str, Any] | None, str | None]: """市价平掉指定合约、方向;返回 (closed_info, error_message)。""" side_n = (side or "").strip().lower() if side_n not in ("long", "short"): return None, f"无效方向: {side}" close_side = "sell" if side_n == "long" else "buy" direction = side_n try: amt = float(ex.amount_to_precision(sym, abs(float(contracts)))) except Exception: amt = abs(float(contracts)) if amt <= 0: return None, f"{sym}: 可平张数为 0" order_resp = None last_err: Exception | None = None for params in _close_param_candidates(direction): try: order_resp = ex.create_order(sym, "market", close_side, amt, None, params) last_err = None break except Exception as e: last_err = e if _retryable_close_err(str(e)): continue return None, f"{sym}: {e}" if order_resp is None: return None, f"{sym}: {last_err or '下单失败'}" _cancel_symbol_orders(ex, sym) return ( {"symbol": sym, "side": side_n, "amount": amt, "order_id": order_resp.get("id")}, None, ) def _is_local(host: str | None) -> bool: if not host: return False h = host.lower() return h in ("127.0.0.1", "::1", "localhost") or h.startswith("::ffff:127.0.0.1") def _finite_or_none(x: Any) -> float | None: try: f = float(x) return f if math.isfinite(f) else None except (TypeError, ValueError): return None def _extract_usdt_total(balance: dict[str, Any]) -> float | None: """从 ccxt balance 结构中尽量取出 USDT 总额(与 crypto_monitor_binance 一致)。""" usdt_info = balance.get("USDT") or {} if not isinstance(usdt_info, dict): usdt_info = {} total_map = balance.get("total") or {} if not isinstance(total_map, dict): total_map = {} free_map = balance.get("free") or {} if not isinstance(free_map, dict): free_map = {} total = usdt_info.get("total") if total is None: total = usdt_info.get("equity") if total is None: total = total_map.get("USDT") if total is None: total = usdt_info.get("free") if total is None: total = free_map.get("USDT") try: return float(total) if total is not None else None except (TypeError, ValueError): return None def _binance_futures_usdt_asset_row(balance: Any) -> dict[str, Any] | None: """U 本位合约 fetch_balance(type=swap) 的 info.assets 中 USDT 一行(与币安合约后台口径一致)。""" if not isinstance(balance, dict): return None info = balance.get("info") if not isinstance(info, dict): return None assets = info.get("assets") if not isinstance(assets, list): return None for a in assets: if isinstance(a, dict) and str(a.get("asset") or "").upper() == "USDT": return a return None def _binance_swap_usdt_total(ex: Any) -> float | None: """仅 U 本位永续合约账户 USDT(显式 type=swap,不用现货余额)。""" try: bal = ex.fetch_balance({"type": "swap"}) except Exception: return None row = _binance_futures_usdt_asset_row(bal) if row: for k in ("marginBalance", "walletBalance", "crossWalletBalance", "balance"): x = row.get(k) if x is not None and str(x).strip() != "": try: fv = float(x) if fv >= 0: return fv except (TypeError, ValueError): pass v = _extract_usdt_total(bal) return float(v) if v is not None else None @app.middleware("http") async def local_only(request: Request, call_next): if request.client and not _is_local(request.client.host): return JSONResponse({"detail": "forbidden"}, status_code=403) return await call_next(request) @app.get("/health") def health(): return {"ok": True, "exchange": EXCHANGE_KIND} @app.get("/status") def status(x_control_token: str | None = Header(default=None, alias="X-Control-Token")): try: return _status_inner(x_control_token) except HTTPException: raise except Exception as e: return JSONResponse( { "ok": False, "error": f"status: {e}", "exchange": EXCHANGE_KIND, "balance_usdt": None, "positions": [], "total_unrealized_pnl": None, }, status_code=200, ) def _status_inner(x_control_token: str | None) -> Any: _check_token(x_control_token) try: ex = get_exchange() except RuntimeError as e: return JSONResponse( { "ok": False, "error": str(e), "exchange": EXCHANGE_KIND, "balance_usdt": None, "positions": [], "total_unrealized_pnl": None, }, status_code=200, ) try: _ensure_markets() except Exception as e: return JSONResponse( { "ok": False, "error": f"load_markets: {e}", "exchange": EXCHANGE_KIND, "balance_usdt": None, "positions": [], "total_unrealized_pnl": None, }, status_code=200, ) balance_usdt: float | None = None try: if EXCHANGE_KIND == "binance": balance_usdt = _binance_swap_usdt_total(ex) else: bal = ex.fetch_balance() u = bal.get("USDT") or {} if isinstance(u, dict) and u.get("total") is not None: balance_usdt = _finite_or_none(u["total"]) except Exception: pass positions_out: list[dict[str, Any]] = [] total_upnl = 0.0 try: raw = ex.fetch_positions() or [] except Exception as e: return JSONResponse( { "ok": False, "error": str(e), "exchange": EXCHANGE_KIND, "balance_usdt": balance_usdt, "positions": [], "total_unrealized_pnl": None, }, status_code=200, ) for p in raw: if not isinstance(p, dict): continue c = _position_contracts(p) if abs(c) < 1e-12: continue sym = p.get("symbol") or "" side = _position_side(p, c) upnl = p.get("unrealizedPnl") try: upnl_f = float(upnl) if upnl is not None else 0.0 except (TypeError, ValueError): upnl_f = 0.0 total_upnl += upnl_f notional = p.get("notional") try: notional_f = float(notional) if notional is not None else None except (TypeError, ValueError): notional_f = None entry = p.get("entryPrice") try: entry_f = float(entry) if entry is not None else None except (TypeError, ValueError): entry_f = None positions_out.append( { "symbol": sym, "side": side, "contracts": abs(c), "contracts_signed": c, "notional_usdt": _finite_or_none(notional_f) if notional_f is not None else None, "unrealized_pnl": _finite_or_none(upnl_f), "entry_price": _finite_or_none(entry_f) if entry_f is not None else None, } ) try: pm = _position_mode_label() except Exception: pm = EXCHANGE_KIND return { "ok": True, "exchange": EXCHANGE_KIND, "balance_usdt": balance_usdt, "positions": positions_out, "total_unrealized_pnl": _finite_or_none(total_upnl), "position_mode": pm, } @app.post("/emergency/close-all") def emergency_close_all(x_control_token: str | None = Header(default=None, alias="X-Control-Token")): _check_token(x_control_token) try: ex = get_exchange() except RuntimeError as e: raise HTTPException(status_code=503, detail=str(e)) from e try: _ensure_markets() except Exception as e: return JSONResponse( {"ok": False, "error": f"load_markets: {e}", "closed": [], "errors": [str(e)], "exchange": EXCHANGE_KIND}, status_code=200, ) errors: list[str] = [] closed: list[dict[str, Any]] = [] try: raw = ex.fetch_positions() or [] except Exception as e: raise HTTPException(status_code=502, detail=f"fetch_positions: {e}") from e for p in raw: if not isinstance(p, dict): continue c = _position_contracts(p) if abs(c) < 1e-12: continue sym = p.get("symbol") if not sym: continue side = _position_side(p, c) info, err = _close_position_market(ex, sym, side, abs(c)) if err: errors.append(err) elif info: closed.append(info) time.sleep(0.05) return {"ok": len(errors) == 0, "closed": closed, "errors": errors, "exchange": EXCHANGE_KIND} @app.post("/emergency/close-position") def emergency_close_position( body: EmergencyClosePositionBody, x_control_token: str | None = Header(default=None, alias="X-Control-Token"), ): _check_token(x_control_token) sym = (body.symbol or "").strip() want_side = (body.side or "").strip().lower() if not sym: raise HTTPException(status_code=400, detail="symbol 不能为空") if want_side not in ("long", "short"): raise HTTPException(status_code=400, detail="side 须为 long 或 short") try: ex = get_exchange() except RuntimeError as e: raise HTTPException(status_code=503, detail=str(e)) from e try: _ensure_markets() except Exception as e: return JSONResponse( { "ok": False, "error": f"load_markets: {e}", "closed": None, "exchange": EXCHANGE_KIND, }, status_code=200, ) try: raw = ex.fetch_positions() or [] except Exception as e: raise HTTPException(status_code=502, detail=f"fetch_positions: {e}") from e matched = None for p in raw: if not isinstance(p, dict): continue if (p.get("symbol") or "").strip() != sym: continue c = _position_contracts(p) if abs(c) < 1e-12: continue side = _position_side(p, c) if side != want_side: continue matched = (sym, side, abs(c)) break if not matched: return JSONResponse( { "ok": False, "error": f"未找到持仓: {sym} {want_side}", "closed": None, "exchange": EXCHANGE_KIND, }, status_code=200, ) sym, side, c = matched info, err = _close_position_market(ex, sym, side, c) if err: return JSONResponse( {"ok": False, "error": err, "closed": None, "exchange": EXCHANGE_KIND}, status_code=200, ) return {"ok": True, "closed": info, "errors": [], "exchange": EXCHANGE_KIND} def main(): import uvicorn uvicorn.run(app, host=HOST, port=PORT, log_level="warning", access_log=False) if __name__ == "__main__": main()