修复
This commit is contained in:
@@ -261,27 +261,32 @@ def get_node_base(node: Dict[str, Any]) -> str:
|
|||||||
return f"http://{host}:{port}".rstrip("/")
|
return f"http://{host}:{port}".rstrip("/")
|
||||||
|
|
||||||
|
|
||||||
|
def _runtime_unlocked(nid: str) -> Dict[str, Any]:
|
||||||
|
"""获取节点运行时状态(调用方已持有 _nodes_lock 时使用)。"""
|
||||||
|
if nid not in _node_runtime:
|
||||||
|
_node_runtime[nid] = {
|
||||||
|
"in_flight": 0,
|
||||||
|
"healthy": None,
|
||||||
|
"last_check": None,
|
||||||
|
"last_error": "",
|
||||||
|
}
|
||||||
|
return _node_runtime[nid]
|
||||||
|
|
||||||
|
|
||||||
def _runtime(nid: str) -> Dict[str, Any]:
|
def _runtime(nid: str) -> Dict[str, Any]:
|
||||||
with _nodes_lock:
|
with _nodes_lock:
|
||||||
if nid not in _node_runtime:
|
return _runtime_unlocked(nid)
|
||||||
_node_runtime[nid] = {
|
|
||||||
"in_flight": 0,
|
|
||||||
"healthy": None,
|
|
||||||
"last_check": None,
|
|
||||||
"last_error": "",
|
|
||||||
}
|
|
||||||
return _node_runtime[nid]
|
|
||||||
|
|
||||||
|
|
||||||
def node_in_flight_inc(nid: str) -> None:
|
def node_in_flight_inc(nid: str) -> None:
|
||||||
with _nodes_lock:
|
with _nodes_lock:
|
||||||
rt = _runtime(nid)
|
rt = _runtime_unlocked(nid)
|
||||||
rt["in_flight"] = int(rt.get("in_flight", 0)) + 1
|
rt["in_flight"] = int(rt.get("in_flight", 0)) + 1
|
||||||
|
|
||||||
|
|
||||||
def node_in_flight_dec(nid: str) -> None:
|
def node_in_flight_dec(nid: str) -> None:
|
||||||
with _nodes_lock:
|
with _nodes_lock:
|
||||||
rt = _runtime(nid)
|
rt = _runtime_unlocked(nid)
|
||||||
rt["in_flight"] = max(0, int(rt.get("in_flight", 0)) - 1)
|
rt["in_flight"] = max(0, int(rt.get("in_flight", 0)) - 1)
|
||||||
|
|
||||||
|
|
||||||
@@ -362,7 +367,7 @@ async def refresh_all_node_health(client: httpx.AsyncClient) -> None:
|
|||||||
continue
|
continue
|
||||||
ok, err = await probe_node(client, node)
|
ok, err = await probe_node(client, node)
|
||||||
with _nodes_lock:
|
with _nodes_lock:
|
||||||
rt = _runtime(nid)
|
rt = _runtime_unlocked(nid)
|
||||||
rt["healthy"] = ok
|
rt["healthy"] = ok
|
||||||
rt["last_error"] = err
|
rt["last_error"] = err
|
||||||
rt["last_check"] = datetime.now(timezone.utc).isoformat()
|
rt["last_check"] = datetime.now(timezone.utc).isoformat()
|
||||||
@@ -1494,7 +1499,6 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]:
|
|||||||
timeout = httpx.Timeout(600.0, connect=30.0)
|
timeout = httpx.Timeout(600.0, connect=30.0)
|
||||||
app.state.http = httpx.AsyncClient(timeout=timeout)
|
app.state.http = httpx.AsyncClient(timeout=timeout)
|
||||||
global _health_task
|
global _health_task
|
||||||
await refresh_all_node_health(app.state.http)
|
|
||||||
_health_task = asyncio.create_task(_health_loop(app.state.http))
|
_health_task = asyncio.create_task(_health_loop(app.state.http))
|
||||||
try:
|
try:
|
||||||
yield
|
yield
|
||||||
@@ -1636,7 +1640,7 @@ async def api_nodes_test(
|
|||||||
client: httpx.AsyncClient = request.app.state.http
|
client: httpx.AsyncClient = request.app.state.http
|
||||||
ok, err = await probe_node(client, node)
|
ok, err = await probe_node(client, node)
|
||||||
with _nodes_lock:
|
with _nodes_lock:
|
||||||
rt = _runtime(node_id)
|
rt = _runtime_unlocked(node_id)
|
||||||
rt["healthy"] = ok
|
rt["healthy"] = ok
|
||||||
rt["last_error"] = err
|
rt["last_error"] = err
|
||||||
rt["last_check"] = datetime.now(timezone.utc).isoformat()
|
rt["last_check"] = datetime.now(timezone.utc).isoformat()
|
||||||
|
|||||||
Reference in New Issue
Block a user