"""清空本地 futures.db 交易/监控数据,保留系统设置与 CTP 配置。""" from __future__ import annotations import os import shutil import sqlite3 import sys from datetime import datetime from pathlib import Path ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) from db_conn import DB_PATH # 保留:settings、fee_rates(柜台费率)、stats_cache 等配置/缓存 KEEP_TABLES = frozenset({ "settings", "fee_rates", "stats_cache", "sqlite_sequence", }) # 清空后重置 sim 账户默认值 RESET_SIM_ACCOUNT = True def _backup_db(db_path: Path) -> Path: ts = datetime.now().strftime("%Y%m%d_%H%M%S") dest = db_path.with_suffix(f".bak.{ts}.db") shutil.copy2(db_path, dest) return dest def clear_trading_data(db_path: Path | None = None, *, backup: bool = True) -> dict: path = Path(db_path or DB_PATH) if not path.is_file(): raise FileNotFoundError(f"数据库不存在: {path}") if backup: bak = _backup_db(path) print(f"已备份 -> {bak}") conn = sqlite3.connect(str(path)) conn.row_factory = sqlite3.Row try: tables = [ r[0] for r in conn.execute( "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" ).fetchall() ] cleared: dict[str, int] = {} for name in tables: if name in KEEP_TABLES: continue before = conn.execute(f"SELECT COUNT(*) AS n FROM [{name}]").fetchone()[0] conn.execute(f"DELETE FROM [{name}]") cleared[name] = int(before) if RESET_SIM_ACCOUNT and "ctp_sim_account" in tables: conn.execute( "INSERT OR REPLACE INTO ctp_sim_account (id, balance, available, updated_at) " "VALUES (1, 100000, 100000, datetime('now'))" ) conn.commit() conn.execute("VACUUM") conn.commit() return cleared finally: conn.close() def main() -> None: import argparse parser = argparse.ArgumentParser(description="清空 futures.db 交易数据,保留 settings") parser.add_argument( "--remote", action="store_true", help="清空服务器 /opt/qihuo/futures.db(需 scripts 内 SSH 配置)", ) parser.add_argument("--no-backup", action="store_true", help="不备份原库") args = parser.parse_args() sys.stdout.reconfigure(encoding="utf-8", errors="replace") if args.remote: import paramiko import time remote_path = "/opt/qihuo/futures.db" script_body = Path(__file__).read_text(encoding="utf-8") remote_py = ( "import sqlite3, shutil, sys\n" "from datetime import datetime\n" f"KEEP={sorted(KEEP_TABLES)!r}\n" f"RESET_SIM={RESET_SIM_ACCOUNT!r}\n" f"path='{remote_path}'\n" "bak=path.replace('.db', f'.bak.{datetime.now():%Y%m%d_%H%M%S}.db')\n" "shutil.copy2(path, bak); print('backup', bak)\n" "c=sqlite3.connect(path)\n" "tables=[r[0] for r in c.execute(\"SELECT name FROM sqlite_master WHERE type='table'\")]\n" "cleared={}\n" "for name in tables:\n" " if name in KEEP: continue\n" " n=c.execute(f'SELECT COUNT(*) FROM [{name}]').fetchone()[0]\n" " c.execute(f'DELETE FROM [{name}]'); cleared[name]=n\n" "if RESET_SIM and 'ctp_sim_account' in tables:\n" " c.execute(\"INSERT OR REPLACE INTO ctp_sim_account (id,balance,available,updated_at) VALUES (1,100000,100000,datetime('now'))\")\n" "c.commit(); c.execute('VACUUM'); c.commit(); c.close()\n" "print('cleared', cleared)\n" ) c = paramiko.SSHClient() c.set_missing_host_key_policy(paramiko.AutoAddPolicy()) host = os.getenv("QIHUO_SERVER_HOST", "192.168.8.21") user = os.getenv("QIHUO_SERVER_USER", "root") password = os.getenv("QIHUO_SERVER_PASSWORD", "") if not password: raise SystemExit("远程清库需设置环境变量 QIHUO_SERVER_PASSWORD") c.connect(host, username=user, password=password, timeout=15) sftp = c.open_sftp() with sftp.file("/tmp/clear_qihuo_db.py", "w") as f: f.write(remote_py) sftp.close() _, o, e = c.exec_command( "cd /opt/qihuo && python3 /tmp/clear_qihuo_db.py && pm2 restart qihuo", timeout=120, ) print(o.read().decode("utf-8", "replace")) err = e.read().decode("utf-8", "replace") if err.strip(): print(err) time.sleep(2) c.close() return print(f"目标库: {DB_PATH}") cleared = clear_trading_data(backup=not args.no_backup) if not cleared: print("无需要清空的表") return print("已清空:") for name, n in sorted(cleared.items()): print(f" {name}: {n} 行") print("保留表:", ", ".join(sorted(KEEP_TABLES - {"sqlite_sequence"}))) if __name__ == "__main__": main()