Files
2026-05-21 16:44:31 +08:00

569 lines
18 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
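Minimal sub-account agent: serves GET /status and POST /emergency/close-all (plus a GET /health probe) and listens on 127.0.0.1 only.
Typical usage when one agent is paired with each of the repository's four strategy/monitor directories (each directory's own .env already holds the keys; the agent reads the PORT environment variable, which must differ from Flask's APP_PORT):
EXCHANGE=binance -> crypto_monitor_binance: BINANCE_*
EXCHANGE=okx -> crypto_monitor_okx: OKX_*
EXCHANGE=gate -> crypto_monitor_gate / crypto_monitor_gate_bot: GATE_*
Environment variables:
EXCHANGE binance (default) | okx | gate
PORT default 15200 (kept apart from the crypto_monitor_* Flask APP_PORT; the control center aggregates 15200-15203 by default)
HOST default 127.0.0.1
CONTROL_TOKEN optional; sent in the X-Control-Token request header
Binance: BINANCE_API_KEY / BINANCE_API_SECRET; the balance is the USDT of the **USDT-margined perpetual futures account** (same futures basis as `crypto_monitor_binance`, not the spot wallet); BINANCE_POSITION_MODE, BINANCE_MARGIN_MODE
OKX: OKX_API_KEY / OKX_API_SECRET / OKX_API_PASSPHRASE; OKX_TD_MODE, OKX_POS_MODE
Gate: GATE_API_KEY / GATE_API_SECRET; GATE_TD_MODE, GATE_POS_MODE
To use the same proxy as the main project, set BINANCE_SOCKS_PROXY / OKX_SOCKS_PROXY / GATE_SOCKS_PROXY (or the matching <PREFIX>_HTTP_PROXY / <PREFIX>_HTTPS_PROXY).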
"""
from __future__ import annotations
import math
import os
import time
from typing import Any
import ccxt
from fastapi import FastAPI, Header, HTTPException, Request
from fastapi.responses import JSONResponse
# Listen address, port and optional shared secret, all taken from the environment.
HOST = os.getenv("HOST", "127.0.0.1")
PORT = int(os.getenv("PORT", "15200"))
CONTROL_TOKEN = (os.getenv("CONTROL_TOKEN") or "").strip()

# Accepted spellings of EXCHANGE; anything unrecognised falls back to "binance".
_EXCHANGE_ALIASES = {
    "binance": "binance",
    "bnb": "binance",
    "ba": "binance",
    "okx": "okx",
    "okex": "okx",
    "gate": "gate",
    "gateio": "gate",
}
_raw_ex = (os.getenv("EXCHANGE") or "binance").strip().lower()
EXCHANGE_KIND = _EXCHANGE_ALIASES.get(_raw_ex, "binance")

# -- Binance --
_bin_pos = (os.getenv("BINANCE_POSITION_MODE") or "hedge").strip().lower()
if _bin_pos in {"hedge", "dual", "double", "hedged"}:
    BINANCE_POSITION_MODE = "hedge"
else:
    BINANCE_POSITION_MODE = "oneway"
_bin_margin = (os.getenv("BINANCE_MARGIN_MODE") or "cross").strip().lower()
if _bin_margin in {"cross", "cross_margin"}:
    BINANCE_DEFAULT_MARGIN_MODE = "cross"
else:
    BINANCE_DEFAULT_MARGIN_MODE = "isolated"

# -- OKX --
# The trade mode is passed through as written (trimmed only, not lower-cased).
OKX_TD_MODE = (os.getenv("OKX_TD_MODE") or "cross").strip()
_okx_pos = (os.getenv("OKX_POS_MODE") or "hedge").strip().lower()
if _okx_pos in {"hedge", "long_short_mode", "dual"}:
    OKX_POS_MODE = "hedge"
else:
    OKX_POS_MODE = "net"

# -- Gate --
_gate_td = (os.getenv("GATE_TD_MODE") or "cross").strip().lower()
if _gate_td in {"cross", "cross_margin"}:
    GATE_DEFAULT_MARGIN_MODE = "cross"
else:
    GATE_DEFAULT_MARGIN_MODE = "isolated"
_gate_pos = (os.getenv("GATE_POS_MODE") or "hedge").strip().lower()
if _gate_pos in {"hedge", "dual", "double"}:
    GATE_POS_MODE = "hedge"
else:
    GATE_POS_MODE = "single"
# ASGI application; docs_url/redoc_url are set to None so no docs routes are registered.
app = FastAPI(title="sub-agent", docs_url=None, redoc_url=None)
# Process-wide exchange client, built on first use by get_exchange().
_ccxt_ex: Any = None
# Flipped to True by _ensure_markets() after load_markets() has succeeded once.
_markets_loaded = False
def _socks_proxy_url(prefix: str) -> str:
    """Return the trimmed ``<prefix>_SOCKS_PROXY`` environment value, or ``""`` if unset/empty."""
    configured = os.getenv(f"{prefix}_SOCKS_PROXY")
    if not configured:
        return ""
    return configured.strip()
def _http_https_proxy(prefix: str) -> dict[str, str] | None:
    """Build a proxies mapping from the ``<prefix>_*_PROXY`` environment variables.

    ``<prefix>_SOCKS_PROXY`` takes precedence and is used for both schemes.
    Otherwise ``<prefix>_HTTP_PROXY`` / ``<prefix>_HTTPS_PROXY`` are returned as
    given (one of them may be an empty string). Returns ``None`` when nothing
    is configured.
    """

    def _env(suffix: str) -> str:
        return (os.getenv(f"{prefix}_{suffix}") or "").strip()

    socks = _env("SOCKS_PROXY")
    if socks:
        return dict.fromkeys(("http", "https"), socks)
    plain = {"http": _env("HTTP_PROXY"), "https": _env("HTTPS_PROXY")}
    if any(plain.values()):
        return plain
    return None
def _attach_proxies(ex: Any, prefix: str) -> None:
    """Set ``ex.proxies`` from the ``<prefix>`` proxy environment, if any is configured."""
    proxy_map = _http_https_proxy(prefix)
    if not proxy_map:
        return
    ex.proxies = proxy_map
def _make_exchange() -> Any:
    """Create the ccxt client for ``EXCHANGE_KIND`` using credentials from the environment.

    Every client is configured for swap markets with rate limiting enabled and
    gets the exchange-specific proxy settings attached.

    Raises:
        RuntimeError: when a required credential variable is missing or blank.
    """

    def _cred(name: str) -> str:
        return (os.getenv(name) or "").strip()

    if EXCHANGE_KIND == "binance":
        api_key = _cred("BINANCE_API_KEY")
        api_secret = _cred("BINANCE_API_SECRET")
        if not (api_key and api_secret):
            raise RuntimeError("缺少 BINANCE_API_KEY / BINANCE_API_SECRET")
        binance_options = {
            "defaultType": "swap",
            # ccxt's fetch_balance defaults to spot; pin it to the USDT-margined
            # futures wallet, consistent with the monitor project.
            "fetchBalance": {"defaultType": "swap"},
            "defaultMarginMode": BINANCE_DEFAULT_MARGIN_MODE,
            "adjustForTimeDifference": True,
        }
        client = ccxt.binance(
            {
                "apiKey": api_key,
                "secret": api_secret,
                "enableRateLimit": True,
                "options": binance_options,
            }
        )
        _attach_proxies(client, "BINANCE")
        return client

    if EXCHANGE_KIND == "okx":
        api_key = _cred("OKX_API_KEY")
        api_secret = _cred("OKX_API_SECRET")
        passphrase = _cred("OKX_API_PASSPHRASE")
        if not all((api_key, api_secret, passphrase)):
            raise RuntimeError("缺少 OKX_API_KEY / OKX_API_SECRET / OKX_API_PASSPHRASE")
        client = ccxt.okx(
            {
                "apiKey": api_key,
                "secret": api_secret,
                "password": passphrase,
                "enableRateLimit": True,
                "options": {"defaultType": "swap"},
            }
        )
        _attach_proxies(client, "OKX")
        return client

    # Any other kind is treated as Gate.
    api_key = _cred("GATE_API_KEY")
    api_secret = _cred("GATE_API_SECRET")
    if not (api_key and api_secret):
        raise RuntimeError("缺少 GATE_API_KEY / GATE_API_SECRET")
    gate_options = {
        "defaultType": "swap",
        "defaultMarginMode": GATE_DEFAULT_MARGIN_MODE,
    }
    client = ccxt.gateio(
        {
            "apiKey": api_key,
            "secret": api_secret,
            "enableRateLimit": True,
            "options": gate_options,
        }
    )
    _attach_proxies(client, "GATE")
    return client
def get_exchange() -> Any:
    """Return the shared exchange client, building it via ``_make_exchange()`` on first call."""
    global _ccxt_ex
    if _ccxt_ex is not None:
        return _ccxt_ex
    _ccxt_ex = _make_exchange()
    return _ccxt_ex
def _ensure_markets() -> None:
    """Call ``load_markets()`` on the shared client once; later calls are no-ops.

    The flag is only set after ``load_markets()`` returns, so a failed load is
    retried on the next call.
    """
    global _markets_loaded
    if _markets_loaded:
        return
    get_exchange().load_markets()
    _markets_loaded = True
def _check_token(x_control_token: str | None) -> None:
    """Validate the ``X-Control-Token`` header against ``CONTROL_TOKEN``.

    When ``CONTROL_TOKEN`` is empty, authentication is disabled and every
    request passes.

    Args:
        x_control_token: Raw header value, or ``None`` when the header is absent.

    Raises:
        HTTPException: 401 when a token is configured and the supplied value
            (after trimming whitespace) does not match.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    if not CONTROL_TOKEN:
        return
    supplied = (x_control_token or "").strip()
    # Constant-time comparison so response timing does not leak how much of the
    # secret matched; bytes are used because compare_digest rejects non-ASCII str.
    if not hmac.compare_digest(supplied.encode("utf-8"), CONTROL_TOKEN.encode("utf-8")):
        raise HTTPException(status_code=401, detail="invalid token")
def _position_mode_label() -> str:
    """Return the configured position-mode string for the active exchange."""
    per_exchange = {
        "binance": BINANCE_POSITION_MODE,
        "okx": OKX_POS_MODE,
    }
    # Anything that is not Binance or OKX is treated as Gate.
    return per_exchange.get(EXCHANGE_KIND, GATE_POS_MODE)
def _close_param_candidates_binance(direction: str) -> list[dict[str, Any]]:
    """List Binance order-param variants to try, in order, when closing a position.

    The variants matching the configured ``BINANCE_POSITION_MODE`` come first;
    the other mode's variants follow as a fallback.
    """
    position_side = "LONG" if direction == "long" else "SHORT"
    hedge_variants: list[dict[str, Any]] = [
        {"positionSide": position_side, "reduceOnly": True},
        {"positionSide": position_side},
    ]
    oneway_variants: list[dict[str, Any]] = [{"reduceOnly": True}, {}]
    if BINANCE_POSITION_MODE == "hedge":
        return hedge_variants + oneway_variants
    return oneway_variants + hedge_variants
def _close_param_candidates_okx(direction: str) -> list[dict[str, Any]]:
    """List OKX order-param variants to try, in order, when closing a position.

    Every variant carries ``tdMode``. In hedge mode the ``posSide`` variants
    are tried first, followed by the variants without ``posSide``.
    """

    def _variant(**extra: Any) -> dict[str, Any]:
        return {"tdMode": OKX_TD_MODE, **extra}

    candidates: list[dict[str, Any]] = []
    if OKX_POS_MODE == "hedge":
        pos_side = "long" if direction == "long" else "short"
        candidates.append(_variant(posSide=pos_side, reduceOnly=True))
        candidates.append(_variant(posSide=pos_side))
    candidates.append(_variant(reduceOnly=True))
    candidates.append(_variant())
    return candidates
def _close_param_candidates_gate(_direction: str) -> list[dict[str, Any]]:
    """List Gate order-param variants: reduce-only first, then no extra params."""
    reduce_only: dict[str, Any] = {"reduceOnly": True}
    bare: dict[str, Any] = {}
    return [reduce_only, bare]
def _close_param_candidates(direction: str) -> list[dict[str, Any]]:
    """Dispatch to the close-param builder of the active exchange (Gate is the fallback)."""
    builders = {
        "binance": _close_param_candidates_binance,
        "okx": _close_param_candidates_okx,
    }
    build = builders.get(EXCHANGE_KIND, _close_param_candidates_gate)
    return build(direction)
def _retryable_close_err(msg: str) -> bool:
    """Tell whether a close-order error suggests retrying with the next param variant.

    The match is a case-insensitive substring search for markers about
    reduce-only or position-side/position-mode mismatches.
    """
    text = (msg or "").lower()
    # A "-1106" error only counts together with "reduceonly", which the plain
    # "reduceonly" marker already covers.
    markers = (
        "-4061",
        "reduceonly",
        "reduce only",
        "position side",
        "positionside",
        "pos side",
        "dual side",
        "position mode",
    )
    return any(marker in text for marker in markers)
def _position_contracts(p: dict[str, Any]) -> float:
    """Extract the position size from a position dict.

    The top-level ``contracts`` value is used when it parses as a float (even
    if it is zero). Otherwise the first non-zero parseable value among the
    ``info`` keys ``positionAmt``, ``positionamt``, ``pos`` and ``size`` is
    returned, or ``0.0`` when none qualifies.
    """
    unified = p.get("contracts")
    if unified is not None:
        try:
            return float(unified)
        except (TypeError, ValueError):
            pass  # fall through to the raw exchange payload

    raw_info = p.get("info") or {}
    for key in ("positionAmt", "positionamt", "pos", "size"):
        if key not in raw_info:
            continue
        try:
            amount = float(raw_info[key])
        except (TypeError, ValueError):
            continue
        if amount != 0:
            return amount
    return 0.0
def _position_side(p: dict[str, Any], contracts: float) -> str:
    """Return ``"long"`` or ``"short"`` for a position.

    An explicit ``side`` of long/short (case-insensitive) wins; otherwise the
    sign of ``contracts`` decides, with zero treated as long.
    """
    declared = (p.get("side") or "").lower()
    if declared == "long" or declared == "short":
        return declared
    return "short" if contracts < 0 else "long"
def _cancel_symbol_orders(ex: Any, sym: str) -> None:
    """Best-effort cancellation of open orders for ``sym``; all failures are ignored.

    On Binance a second call removes algo open orders through
    ``fapiPrivateDeleteAlgoOpenOrders`` when the client exposes that method.
    """
    try:
        ex.cancel_all_orders(sym, params={})
    except Exception:
        pass  # deliberate: cleanup must never abort the close-all run

    if EXCHANGE_KIND != "binance":
        return

    try:
        market_id = ex.market(sym).get("id")
        if not market_id:
            return
        delete_algo_orders = getattr(ex, "fapiPrivateDeleteAlgoOpenOrders", None)
        if delete_algo_orders is not None:
            delete_algo_orders({"symbol": market_id})
    except Exception:
        pass  # deliberate: same best-effort policy as above
def _is_local(host: str | None) -> bool:
    """Tell whether ``host`` names the loopback interface.

    Accepts ``127.0.0.1``, ``::1`` and ``localhost`` (case-insensitive), plus
    anything starting with the IPv4-mapped prefix ``::ffff:127.0.0.1``.
    """
    if not host:
        return False
    normalized = host.lower()
    if normalized.startswith("::ffff:127.0.0.1"):
        return True
    return normalized in {"127.0.0.1", "::1", "localhost"}
def _finite_or_none(x: Any) -> float | None:
    """Convert ``x`` to a finite float, or return ``None`` when that is impossible.

    Args:
        x: Any value; numbers and numeric strings are accepted.

    Returns:
        The float value, or ``None`` for NaN, infinities, unparseable input,
        or an integer too large to be represented as a float.
    """
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() raises it for ints beyond the float range.
        return None
    return value if math.isfinite(value) else None
def _extract_usdt_total(balance: dict[str, Any]) -> float | None:
    """Pull a USDT total out of a ccxt balance structure, trying several locations.

    Lookup order: ``USDT.total``, ``USDT.equity``, ``total.USDT``,
    ``USDT.free``, ``free.USDT``. The first value that is not ``None`` is
    converted to float; if that conversion fails the result is ``None``
    (later candidates are not tried).
    """

    def _section(key: str) -> dict[str, Any]:
        # Missing, falsy or non-dict sections are treated as empty.
        value = balance.get(key) or {}
        return value if isinstance(value, dict) else {}

    usdt = _section("USDT")
    totals = _section("total")
    frees = _section("free")
    candidates = (
        usdt.get("total"),
        usdt.get("equity"),
        totals.get("USDT"),
        usdt.get("free"),
        frees.get("USDT"),
    )
    chosen = next((value for value in candidates if value is not None), None)
    if chosen is None:
        return None
    try:
        return float(chosen)
    except (TypeError, ValueError):
        return None
def _binance_futures_usdt_asset_row(balance: Any) -> dict[str, Any] | None:
    """Return the USDT entry of ``balance["info"]["assets"]``, or ``None``.

    ``None`` is returned whenever the expected dict/list nesting is absent or
    no entry has ``asset`` equal to ``USDT`` (case-insensitive).
    """
    if not isinstance(balance, dict):
        return None
    info = balance.get("info")
    assets = info.get("assets") if isinstance(info, dict) else None
    if not isinstance(assets, list):
        return None
    return next(
        (
            row
            for row in assets
            if isinstance(row, dict) and str(row.get("asset") or "").upper() == "USDT"
        ),
        None,
    )
def _binance_swap_usdt_total(ex: Any) -> float | None:
    """Return the USDT balance from ``ex.fetch_balance({"type": "swap"})``.

    The USDT row of ``info.assets`` is preferred: the first of
    ``marginBalance``, ``walletBalance``, ``crossWalletBalance``, ``balance``
    that parses to a value ``>= 0`` is returned. Otherwise the generic
    ``_extract_usdt_total`` lookup is used. Returns ``None`` when the balance
    request fails or nothing usable is found.
    """
    try:
        bal = ex.fetch_balance({"type": "swap"})
    except Exception:
        return None

    usdt_row = _binance_futures_usdt_asset_row(bal) or {}
    for field in ("marginBalance", "walletBalance", "crossWalletBalance", "balance"):
        raw_value = usdt_row.get(field)
        if raw_value is None or str(raw_value).strip() == "":
            continue
        try:
            amount = float(raw_value)
        except (TypeError, ValueError):
            continue
        if amount >= 0:
            return amount

    fallback = _extract_usdt_total(bal)
    return None if fallback is None else float(fallback)
@app.middleware("http")
async def local_only(request: Request, call_next):
    """Reject requests whose peer address is not loopback with HTTP 403.

    Requests without client information (``request.client`` falsy) are let
    through unchanged.
    """
    peer = request.client
    if peer and not _is_local(peer.host):
        return JSONResponse({"detail": "forbidden"}, status_code=403)
    response = await call_next(request)
    return response
@app.get("/health")
def health():
    """Liveness probe: reports ``ok`` and the active exchange kind; no token check."""
    payload = {"ok": True, "exchange": EXCHANGE_KIND}
    return payload
@app.get("/status")
def status(x_control_token: str | None = Header(default=None, alias="X-Control-Token")):
    """GET /status: balance and open positions of the sub-account.

    ``HTTPException`` (such as the 401 from the token check) propagates. Any
    other exception is turned into an HTTP 200 payload with ``ok: False``.
    """
    try:
        result = _status_inner(x_control_token)
    except HTTPException:
        raise
    except Exception as exc:
        failure = {
            "ok": False,
            "error": f"status: {exc}",
            "exchange": EXCHANGE_KIND,
            "balance_usdt": None,
            "positions": [],
            "total_unrealized_pnl": None,
        }
        return JSONResponse(failure, status_code=200)
    return result
def _status_inner(x_control_token: str | None) -> Any:
    """Collect balance and open positions for the /status endpoint.

    Returns a plain dict on success. Client creation, market loading and
    position fetching failures produce an HTTP 200 ``JSONResponse`` with
    ``ok: False``. A failing balance lookup is ignored and leaves
    ``balance_usdt`` as ``None``.

    Raises:
        HTTPException: 401 from the token check.
    """

    def _failure(error: str, balance: float | None = None) -> JSONResponse:
        # Shared shape of every error payload of this endpoint.
        return JSONResponse(
            {
                "ok": False,
                "error": error,
                "exchange": EXCHANGE_KIND,
                "balance_usdt": balance,
                "positions": [],
                "total_unrealized_pnl": None,
            },
            status_code=200,
        )

    _check_token(x_control_token)

    try:
        ex = get_exchange()
    except RuntimeError as exc:
        return _failure(str(exc))

    try:
        _ensure_markets()
    except Exception as exc:
        return _failure(f"load_markets: {exc}")

    balance_usdt: float | None = None
    try:
        if EXCHANGE_KIND == "binance":
            balance_usdt = _binance_swap_usdt_total(ex)
        else:
            usdt_entry = ex.fetch_balance().get("USDT") or {}
            if isinstance(usdt_entry, dict) and usdt_entry.get("total") is not None:
                balance_usdt = _finite_or_none(usdt_entry["total"])
    except Exception:
        pass  # balance is optional; positions are still reported

    try:
        raw_positions = ex.fetch_positions() or []
    except Exception as exc:
        return _failure(str(exc), balance_usdt)

    positions_out: list[dict[str, Any]] = []
    total_upnl = 0.0
    for pos in raw_positions:
        if not isinstance(pos, dict):
            continue
        signed = _position_contracts(pos)
        if abs(signed) < 1e-12:
            continue  # skip empty position slots

        symbol = pos.get("symbol") or ""
        side = _position_side(pos, signed)

        # Missing or unparseable PnL counts as 0.0 in the position and in the total.
        pnl_raw = pos.get("unrealizedPnl")
        try:
            pnl = 0.0 if pnl_raw is None else float(pnl_raw)
        except (TypeError, ValueError):
            pnl = 0.0
        total_upnl += pnl

        positions_out.append(
            {
                "symbol": symbol,
                "side": side,
                "contracts": abs(signed),
                "contracts_signed": signed,
                "notional_usdt": _finite_or_none(pos.get("notional")),
                "unrealized_pnl": _finite_or_none(pnl),
                "entry_price": _finite_or_none(pos.get("entryPrice")),
            }
        )

    try:
        mode_label = _position_mode_label()
    except Exception:
        mode_label = EXCHANGE_KIND

    return {
        "ok": True,
        "exchange": EXCHANGE_KIND,
        "balance_usdt": balance_usdt,
        "positions": positions_out,
        "total_unrealized_pnl": _finite_or_none(total_upnl),
        "position_mode": mode_label,
    }
@app.post("/emergency/close-all")
def emergency_close_all(x_control_token: str | None = Header(default=None, alias="X-Control-Token")):
    """POST /emergency/close-all: market-close every open position.

    For each non-empty position the close-param variants of the active
    exchange are tried in order; the next variant is attempted only when the
    error is classified as retryable. After each position, open orders on its
    symbol are cancelled on a best-effort basis.

    Returns:
        ``{"ok", "closed", "errors", "exchange"}`` where ``ok`` is true only
        when no error was recorded. A market-loading failure yields the same
        shape (plus ``error``) as an HTTP 200 ``JSONResponse``.

    Raises:
        HTTPException: 401 for a bad token, 503 when the exchange client
            cannot be created, 502 when positions cannot be fetched.
    """
    _check_token(x_control_token)
    try:
        ex = get_exchange()
    except RuntimeError as e:
        raise HTTPException(status_code=503, detail=str(e)) from e
    try:
        _ensure_markets()
    except Exception as e:
        return JSONResponse(
            {"ok": False, "error": f"load_markets: {e}", "closed": [], "errors": [str(e)], "exchange": EXCHANGE_KIND},
            status_code=200,
        )

    errors: list[str] = []
    closed: list[dict[str, Any]] = []
    try:
        raw = ex.fetch_positions() or []
    except Exception as e:
        raise HTTPException(status_code=502, detail=f"fetch_positions: {e}") from e

    for p in raw:
        if not isinstance(p, dict):
            continue
        c = _position_contracts(p)
        if abs(c) < 1e-12:
            continue
        sym = p.get("symbol")
        if not sym:
            continue
        side = _position_side(p, c)
        close_side = "sell" if side == "long" else "buy"
        direction = "long" if side == "long" else "short"
        try:
            amt = float(ex.amount_to_precision(sym, abs(c)))
        except Exception:
            amt = abs(c)  # fall back to the unrounded size
        if amt <= 0:
            # NOTE(review): a size that rounds down to zero is skipped without
            # being reported in `errors` -- confirm this is intended.
            continue

        order_resp = None
        last_err: Exception | None = None
        for params in _close_param_candidates(direction):
            try:
                order_resp = ex.create_order(sym, "market", close_side, amt, None, params)
            except Exception as e:
                last_err = e
                if not _retryable_close_err(str(e)):
                    break  # a non-retryable failure ends the attempts for this position
            else:
                last_err = None
                break

        if order_resp is not None:
            closed.append({"symbol": sym, "side": side, "amount": amt, "order_id": order_resp.get("id")})
        elif last_err is not None:
            # Record exactly one error per failed position. The previous
            # substring test against all earlier errors dropped the failure of
            # a second position on the same symbol (hedge mode long + short).
            errors.append(f"{sym}: {last_err}")

        _cancel_symbol_orders(ex, sym)
        time.sleep(0.05)  # brief pause between positions

    return {"ok": len(errors) == 0, "closed": closed, "errors": errors, "exchange": EXCHANGE_KIND}
def main():
    """Serve ``app`` with uvicorn on ``HOST``:``PORT``, warning-level logs and no access log."""
    import uvicorn

    server_options = {
        "host": HOST,
        "port": PORT,
        "log_level": "warning",
        "access_log": False,
    }
    uvicorn.run(app, **server_options)


if __name__ == "__main__":
    main()