"""Check qihuo web CTP status and qihuo-ctp worker health.""" from __future__ import annotations import sys import paramiko sys.stdout.reconfigure(encoding="utf-8", errors="replace") c = paramiko.SSHClient() c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) c.connect("192.168.8.21", username="root", password="woaini88", timeout=15) cmds = [ 'curl -s --max-time 8 -H "X-Qihuo-CTP-Token: qihuo-local-ctp" http://127.0.0.1:6601/health', r'''python3 - <<'PY' import http.cookiejar, json, sqlite3, urllib.parse, urllib.request conn = sqlite3.connect("/opt/qihuo/futures.db") row = conn.execute("SELECT value FROM settings WHERE key='admin_username'").fetchone() conn.close() user = row[0] if row else "admin" pw = "" for line in open("/opt/qihuo/.env", encoding="utf-8-sig", errors="replace"): if line.startswith("ADMIN_PASSWORD="): pw = line.split("=", 1)[1].strip().strip('"').strip("'") jar = http.cookiejar.CookieJar() op = urllib.request.build_opener(urllib.request.HTTPCookieProcessor(jar)) op.open(urllib.request.Request( "http://127.0.0.1:6600/login", urllib.parse.urlencode({"username": user, "password": pw}).encode(), ), timeout=8) raw = op.open("http://127.0.0.1:6600/api/ctp/status", timeout=8).read() print(raw.decode("utf-8", "replace")[:2000]) PY''', "pm2 jlist | python3 -c \"import sys,json; rows=json.load(sys.stdin); print([(r.get('name'), (r.get('pm2_env') or {}).get('status'), (r.get('pm2_env') or {}).get('restart_time')) for r in rows if r.get('name') in ('qihuo','qihuo-ctp')])\"", ] try: for cmd in cmds: print("===", cmd) _, o, e = c.exec_command(cmd, timeout=30) out = o.read().decode("utf-8", "replace") err = e.read().decode("utf-8", "replace") print(out[:3000]) if err.strip(): print("ERR:", err[:1000]) finally: c.close()