Files
qihuo/scripts/deploy_night_session.py
T
dekun 7f8b4cfefd Label night-session products and hide day-only symbols at night.
Mark tradable varieties with a night tag; during 21:00-02:30 filter out index futures and other products without night sessions from symbol picker and recommend list.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-06-26 22:27:47 +08:00

77 lines
2.6 KiB
Python

"""Deploy night session symbol filter."""
import paramiko
import sys
from pathlib import Path
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
root = Path(__file__).resolve().parents[1]
FILES = [
"market_sessions.py",
"symbols.py",
"product_recommend.py",
"recommend_store.py",
"install_trading.py",
"templates/trade.html",
"static/js/symbol.js",
"static/js/trade.js",
"static/css/base.css",
]
VERIFY = r"""
import sys
sys.path.insert(0, "/opt/qihuo")
from market_sessions import is_night_trading_session, is_trading_session
from symbols import product_has_night_session, list_recommended_symbols_grouped, enrich_recommend_row
print("trading", is_trading_session(), "night", is_night_trading_session())
print("IF night", product_has_night_session("IF"))
print("ag night", product_has_night_session("ag"))
print("jd night", product_has_night_session("jd"))
rows = [
enrich_recommend_row({"ths": "IF", "name": "沪深300", "status": "ok", "max_lots": 1}),
enrich_recommend_row({"ths": "ag", "name": "白银", "status": "ok", "max_lots": 1}),
]
groups = list_recommended_symbols_grouped(rows)
items = [i for g in groups for i in g.get("items", [])]
ths_set = {i.get("ths_code", "")[:2].lower() for i in items}
print("group ths prefixes", ths_set)
if is_night_trading_session() and any(x.startswith("if") for x in ths_set):
print("FAIL IF shown during night")
elif is_night_trading_session() and not any(x.startswith("ag") for x in ths_set):
print("FAIL ag missing during night")
else:
print("VERIFY PASS")
"""
def main() -> None:
c = paramiko.SSHClient()
c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
c.connect("192.168.8.21", username="root", password="woaini88", timeout=15)
sftp = c.open_sftp()
for rel in FILES:
sftp.put(str(root / rel), f"/opt/qihuo/{rel.replace(chr(92), '/')}")
print("uploaded", rel)
sftp.close()
for cmd in ("cd /opt/qihuo && pm2 restart qihuo", "sleep 3"):
print(">>>", cmd)
_, o, e = c.exec_command(cmd)
print(o.read().decode("utf-8", errors="replace"))
err = e.read().decode("utf-8", errors="replace")
if err.strip():
print(err.strip())
sftp = c.open_sftp()
with sftp.open("/tmp/verify_night.py", "w") as f:
f.write(VERIFY)
sftp.close()
_, o, e = c.exec_command("cd /opt/qihuo && /opt/qihuo/venv/bin/python /tmp/verify_night.py")
print(o.read().decode("utf-8", errors="replace"))
print(e.read().decode("utf-8", errors="replace"))
c.close()
if __name__ == "__main__":
main()