"""计算器:从已配置交易实例读取 USDT 永续合约精度与张数规则。""" from __future__ import annotations import json import threading import time import urllib.error import urllib.request from typing import Any, Callable, Optional, Tuple from urllib.parse import urlencode from manual_trading_hub.settings_store import enabled_exchanges, load_settings MARKET_CACHE: dict[str, tuple[float, dict[str, Any]]] = {} MARKET_LOCK = threading.Lock() MARKET_TTL_SEC = 300.0 HUB_FLASK_TIMEOUT = float(__import__("os").getenv("HUB_FLASK_TIMEOUT", "12")) def normalize_base_symbol(text: str) -> str: s = str(text or "").upper().strip() for suf in ("USDT:USDT", "/USDT:USDT", "/USDT", "USDT", "-USDT-SWAP"): if s.endswith(suf) and len(s) > len(suf): s = s[: -len(suf)].strip("-/") break if "/" in s: s = s.split("/", 1)[0].strip() if ":" in s: s = s.split(":", 1)[0].strip() return s def resolve_usdt_perp_symbol(exchange: Any, base: str) -> Tuple[Optional[str], Optional[str]]: base_u = normalize_base_symbol(base) if not base_u: return None, "请输入币种,如 ETH" candidates = [f"{base_u}/USDT:USDT", f"{base_u}/USDT"] markets = getattr(exchange, "markets", None) or {} for sym in candidates: m = markets.get(sym) if not m: continue if m.get("active") is False: continue if m.get("swap") or m.get("linear") or m.get("contract"): return sym, None for sym, m in markets.items(): if m.get("active") is False: continue if not (m.get("swap") or m.get("linear")): continue if (m.get("quote") or "").upper() != "USDT": continue if (m.get("base") or "").upper() == base_u: return sym, None return None, f"未找到 {base_u}/USDT 永续合约" def _decimals_from_precision_value(value: Any) -> Optional[int]: if value in (None, ""): return None try: p = float(value) except (TypeError, ValueError): return None if p >= 1 and abs(p - round(p)) < 1e-9 and p <= 12: return int(round(p)) if 0 < p < 1: s = f"{p:.12f}".rstrip("0") if "." 
in s: return min(12, len(s.split(".", 1)[1])) return None def _decimals_from_ccxt_str(text: str) -> int: s = str(text or "").strip() if not s or "." not in s: return 0 frac = s.split(".", 1)[1] if not frac: return 0 return min(12, len(frac.rstrip("0") or frac)) def amount_decimals_from_exchange(exchange: Any, exchange_symbol: str) -> int: try: return _decimals_from_ccxt_str(exchange.amount_to_precision(exchange_symbol, 1.23456789)) except Exception: market = exchange.market(exchange_symbol) prec = (market.get("precision") or {}).get("amount") d = _decimals_from_precision_value(prec) return d if d is not None else 4 def price_decimals_from_exchange( exchange: Any, exchange_symbol: str, price_tick: Optional[float] ) -> int: from hub_ohlcv_lib import normalize_price_tick tick = normalize_price_tick(price_tick) if tick and tick > 0: if tick >= 1: return 0 s = f"{tick:.12f}".rstrip("0") if "." in s: return min(12, len(s.split(".", 1)[1])) try: return _decimals_from_ccxt_str(exchange.price_to_precision(exchange_symbol, 12345.678901234)) except Exception: market = exchange.market(exchange_symbol) prec = (market.get("precision") or {}).get("price") d = _decimals_from_precision_value(prec) return d if d is not None else 4 def make_amount_precise_fn_from_market(market: dict[str, Any]) -> Callable[[float], Optional[float]]: dec = max(0, int(market.get("amount_decimals") or 4)) min_amt = market.get("min_amount") def _fn(amount: float) -> Optional[float]: try: v = float(amount) except (TypeError, ValueError): return None if v <= 0: return None factor = 10**dec v = int(v * factor + 1e-12) / factor if min_amt is not None: try: if v < float(min_amt): return None except (TypeError, ValueError): pass if v <= 0: return None return v return _fn def find_exchange(exchange_id: str) -> dict | None: needle = str(exchange_id or "").strip() if not needle: return None for ex in load_settings().get("exchanges") or []: if str(ex.get("id") or "").strip() == needle: return ex if 
str(ex.get("key") or "").strip().lower() == needle.lower(): return ex return None def list_calculator_exchanges() -> list[dict[str, Any]]: rows: list[dict[str, Any]] = [] for ex in enabled_exchanges(): rows.append( { "id": str(ex.get("id") or ""), "key": str(ex.get("key") or ""), "name": str(ex.get("name") or ex.get("key") or ""), "enabled": bool(ex.get("enabled")), } ) return rows def _hub_headers() -> dict[str, str]: import os token = (os.getenv("HUB_BRIDGE_TOKEN") or os.getenv("CONTROL_TOKEN") or "").strip() if token: return {"X-Hub-Token": token} return {} def fetch_instance_market_sync(ex: dict, *, base: str) -> dict[str, Any]: base_url = (ex.get("flask_url") or "").rstrip("/") if not base_url: return {"ok": False, "msg": "未配置 flask_url"} params = urlencode({"base": normalize_base_symbol(base) or base}) url = f"{base_url}/api/hub/market?{params}" req = urllib.request.Request(url, headers=_hub_headers(), method="GET") try: with urllib.request.urlopen(req, timeout=HUB_FLASK_TIMEOUT) as resp: status = int(getattr(resp, "status", 200) or 200) raw = resp.read().decode("utf-8", errors="replace") data = json.loads(raw) if raw else {} if not isinstance(data, dict): return {"ok": False, "msg": "无效 JSON"} if status >= 400: data.setdefault("ok", False) return data except urllib.error.HTTPError as exc: try: raw = exc.read().decode("utf-8", errors="replace") body = json.loads(raw) if raw else {} except Exception: body = {"ok": False, "msg": raw if "raw" in locals() else str(exc)} if isinstance(body, dict): body.setdefault("ok", False) return body return {"ok": False, "msg": f"HTTP {exc.code}"} except Exception as exc: return {"ok": False, "msg": str(exc)} def _enrich_market_from_settings(ex: dict, payload: dict[str, Any]) -> dict[str, Any]: out = dict(payload) out["exchange_id"] = str(ex.get("id") or "") out["exchange_key"] = str(ex.get("key") or "") out["exchange_name"] = str(ex.get("name") or ex.get("key") or "") out["exchange_label"] = out["exchange_name"] return out 
def get_calculator_market(
    exchange_id: str,
    base: str,
    *,
    ex: dict | None = None,
) -> Tuple[Optional[dict[str, Any]], Optional[str]]:
    """Fetch contract precision for ``base`` from the configured trading instance.

    Results are cached per ``(exchange id, base)`` for ``MARKET_TTL_SEC`` seconds.

    Args:
        exchange_id: Exchange id or key, resolved via ``find_exchange`` when
            ``ex`` is not supplied.
        base: Base asset in any form accepted by ``normalize_base_symbol``.
        ex: Optional already-resolved exchange settings entry.

    Returns:
        ``(market, None)`` on success or ``(None, error_message)`` on failure.
        The returned dict is independent of the cached entry at the top level,
        so callers may add or replace keys without affecting later calls.
    """
    row = ex or find_exchange(exchange_id)
    if not row:
        return None, "未找到该交易所配置"
    if not row.get("enabled"):
        return None, f"{row.get('name') or exchange_id} 未启用"
    base_u = normalize_base_symbol(base)
    if not base_u:
        return None, "请输入币种,如 ETH"

    cache_key = f"{row.get('id')}:{base_u}"
    now = time.time()
    with MARKET_LOCK:
        cached = MARKET_CACHE.get(cache_key)
        if cached and now - cached[0] < MARKET_TTL_SEC:
            return dict(cached[1]), None

    # The fetch runs outside the lock so a slow instance does not block other
    # lookups; concurrent misses for the same key may each fetch once.
    remote = fetch_instance_market_sync(row, base=base_u)
    if not remote.get("ok"):
        return None, str(remote.get("msg") or "实例返回失败")
    data = _enrich_market_from_settings(row, remote)
    with MARKET_LOCK:
        # Store a copy: the hit path hands out copies, so the miss path must
        # not hand out the very object that lives in the cache.
        MARKET_CACHE[cache_key] = (now, dict(data))
    return data, None


def clear_market_cache() -> None:
    """Drop every cached market payload."""
    with MARKET_LOCK:
        MARKET_CACHE.clear()