"""Deploy night session symbol filter.""" import paramiko import sys from pathlib import Path sys.stdout.reconfigure(encoding="utf-8", errors="replace") root = Path(__file__).resolve().parents[1] FILES = [ "market_sessions.py", "symbols.py", "product_recommend.py", "recommend_store.py", "install_trading.py", "templates/trade.html", "static/js/symbol.js", "static/js/trade.js", "static/css/base.css", ] VERIFY = r""" import sys sys.path.insert(0, "/opt/qihuo") from market_sessions import is_night_trading_session, is_trading_session from symbols import product_has_night_session, list_recommended_symbols_grouped, enrich_recommend_row print("trading", is_trading_session(), "night", is_night_trading_session()) print("IF night", product_has_night_session("IF")) print("ag night", product_has_night_session("ag")) print("jd night", product_has_night_session("jd")) rows = [ enrich_recommend_row({"ths": "IF", "name": "沪深300", "status": "ok", "max_lots": 1}), enrich_recommend_row({"ths": "ag", "name": "白银", "status": "ok", "max_lots": 1}), ] groups = list_recommended_symbols_grouped(rows) items = [i for g in groups for i in g.get("items", [])] ths_set = {i.get("ths_code", "")[:2].lower() for i in items} print("group ths prefixes", ths_set) if is_night_trading_session() and any(x.startswith("if") for x in ths_set): print("FAIL IF shown during night") elif is_night_trading_session() and not any(x.startswith("ag") for x in ths_set): print("FAIL ag missing during night") else: print("VERIFY PASS") """ def main() -> None: c = paramiko.SSHClient() c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) c.connect("192.168.8.21", username="root", password="woaini88", timeout=15) sftp = c.open_sftp() for rel in FILES: sftp.put(str(root / rel), f"/opt/qihuo/{rel.replace(chr(92), '/')}") print("uploaded", rel) sftp.close() for cmd in ("cd /opt/qihuo && pm2 restart qihuo", "sleep 3"): print(">>>", cmd) _, o, e = c.exec_command(cmd) print(o.read().decode("utf-8", errors="replace")) err = e.read().decode("utf-8", errors="replace") if err.strip(): print(err.strip()) sftp = c.open_sftp() with sftp.open("/tmp/verify_night.py", "w") as f: f.write(VERIFY) sftp.close() _, o, e = c.exec_command("cd /opt/qihuo && /opt/qihuo/venv/bin/python /tmp/verify_night.py") print(o.read().decode("utf-8", errors="replace")) print(e.read().decode("utf-8", errors="replace")) c.close() if __name__ == "__main__": main()