diff --git a/README.md b/README.md index 7e21aad..4e92258 100644 --- a/README.md +++ b/README.md @@ -33,6 +33,8 @@ ## 快速开始 +> **发布铁律**:本地改代码 → 提交并 `git push` → 服务器 **仅** `git pull` / `git reset --hard origin/main` 更新。**禁止 SCP 复制代码到服务器。** 详见 [部署文档 · 代码发布铁律](docs/DEPLOY.md#代码发布铁律强制不容置疑)。 + **服务器(Ubuntu)** ```bash @@ -40,7 +42,7 @@ cd /opt/qihuo && bash deploy.sh # 访问 http://:6600 ``` -**更新** +**更新**(须先在本机 `git push`) ```bash cd /opt/qihuo @@ -50,6 +52,8 @@ python scripts/run_schema_migrate.py pm2 restart ecosystem.config.cjs --update-env ``` +**禁止** 使用 `scp` / 手工复制更新服务器代码。 + 生产环境须同时维护 **`qihuo`**(Web)与 **`qihuo-ctp`**(CTP Worker)两个 PM2 进程。 详见 [部署文档](docs/DEPLOY.md)。 diff --git a/docs/DEPLOY.md b/docs/DEPLOY.md index 4cb96e1..77ba2fa 100644 --- a/docs/DEPLOY.md +++ b/docs/DEPLOY.md @@ -4,6 +4,52 @@ --- +## 代码发布铁律(强制,不容置疑) + +**所有代码变更必须且只能按以下三步执行,不得跳过、不得变通:** + +| 步骤 | 在哪里 | 做什么 | +|------|--------|--------| +| **1. 本地修改** | 开发机 / 本仓库工作区 | 改代码、自测 | +| **2. 提交仓库** | `git.bz121.com` | `git add` → `git commit` → `git push origin main`(或约定分支) | +| **3. 更新服务器** | `/opt/qihuo` | **仅** `git fetch` + `git reset --hard origin/main`(或 `git pull`)→ 依赖/迁移 → `pm2 restart` | + +### 严禁事项 + +- **禁止** 用 `scp`、`rsync`、SFTP、手工复制等方式把 `.py` / `.js` / `.html` / 模板 / 静态资源 **直接覆盖** 到服务器。 +- **禁止** 在服务器上 `vim` 改业务代码后长期不提交仓库(`.env`、日志、上传文件除外)。 +- **禁止** 「服务器上先改一版、本地以后再补提交」——服务器代码必须与远端 Git **完全一致**。 + +违反上述规则会导致:`git pull` 冲突、Web 与 Worker 版本不一致、问题无法复现、回滚困难。**一律视为部署事故。** + +### 服务器唯一合法更新命令 + +代码已推送到远端后,在服务器执行: + +```bash +cd /opt/qihuo +git fetch origin +git reset --hard origin/main +source venv/bin/activate +pip install -r requirements.txt +python scripts/run_schema_migrate.py +pm2 restart ecosystem.config.cjs --update-env +pm2 save +``` + +或使用 `bash deploy.sh`(内部同样通过 Git 拉取,见下文)。 + +### 数据与配置(不受 Git 管理) + +以下文件 **不** 随 `git pull` 更新,卸载/重装时须 **单独备份与恢复**: + +- `/opt/qihuo/.env` +- `/opt/qihuo/futures.db`(SQLite)或 PostgreSQL 数据 +- `/opt/qihuo/uploads/` +- `/opt/qihuo/backups/`(若有) + +--- + ## 部署概要 | 项目 | 默认值 | @@ -87,6 +133,56 @@ MIGRATE_SQLITE=1 sudo bash scripts/deploy_postgres.sh --- +## 服务器卸载与全新部署(Git 唯一来源) + +当服务器代码被 SCP 弄乱、版本不可信、或需要与仓库 **完全对齐** 时,按本节 **卸载后重装**。全程 **只** 通过 Git 获取代码,**不得** SCP 复制业务文件。 + +### 1. 备份(必做) + +```bash +# 在服务器上 +cp /opt/qihuo/.env /root/qihuo.env.bak +# SQLite +cp /opt/qihuo/futures.db /root/futures.db.bak 2>/dev/null || true +# PostgreSQL 见 POSTGRES.md 备份命令 +tar czf /root/qihuo_uploads.bak.tar.gz -C /opt/qihuo uploads 2>/dev/null || true +``` + +### 2. 卸载 PM2 与代码目录 + +```bash +pm2 stop qihuo qihuo-ctp 2>/dev/null || true +pm2 delete qihuo qihuo-ctp 2>/dev/null || true +pm2 save +rm -rf /opt/qihuo +``` + +> **不删除** `/root/qihuo.env.bak`、`/root/futures.db.bak` 等备份。 + +### 3. 从 Git 全新克隆并部署 + +```bash +git clone https://git.bz121.com/dekun/qihuo.git /opt/qihuo +cd /opt/qihuo +cp /root/qihuo.env.bak .env +# SQLite 恢复(若使用) +cp /root/futures.db.bak futures.db 2>/dev/null || true +bash deploy.sh +``` + +### 4. 验收 + +```bash +cd /opt/qihuo && git log -1 --oneline # 须与远端 main 最新提交一致 +pm2 status # qihuo、qihuo-ctp 均为 online +``` + +浏览器访问 `http://<服务器IP>:6600` 登录验证。 + +此后所有更新 **只** 走上文「代码发布铁律」三步,**禁止** 再使用 SCP 更新代码。 + +--- + ## 手动部署 ### 1. 安装系统依赖 @@ -206,6 +302,9 @@ mkdir -p /opt/qihuo/logs /opt/qihuo/uploads ## 更新部署 +> **强制流程**:本地修改 → `git push` → 服务器 `git fetch && git reset --hard origin/main` → 迁移 → `pm2 restart`。 +> **禁止 SCP 复制代码。** 详见上文 [代码发布铁律](#代码发布铁律强制不容置疑)。 + 代码已推送后,在服务器执行: ```bash @@ -383,7 +482,7 @@ ufw allow 6600/tcp | **下单监控无持仓** | 未连接 CTP 或确实无仓 | 先点「连接 CTP」 | | **`Could not resolve host`** | 服务器 DNS 故障 | 配置 systemd-resolved 公共 DNS,见下方 | | `database is locked` | SQLite 并发 | **推荐改 PostgreSQL**:`MIGRATE_SQLITE=1 bash scripts/deploy_postgres.sh`,见 [POSTGRES.md](./POSTGRES.md) | -| `git pull` 冲突 | 本地有修改 / SCP 部署 | `git fetch && git reset --hard origin/main` | +| `git pull` 冲突 | 曾用 SCP 覆盖文件(**禁止**) | 按 [服务器卸载与全新部署](#服务器卸载与全新部署git-唯一来源) 或 `git reset --hard origin/main` 与远端对齐 | 查看应用是否在监听: diff --git a/docs/INDEX.md b/docs/INDEX.md index 3421f47..21d9e95 100644 --- a/docs/INDEX.md +++ b/docs/INDEX.md @@ -41,7 +41,7 @@ | [TRADING.md](./TRADING.md) | 可开仓品种、计仓、SimNow/实盘 | | [SIMNOW.md](./SIMNOW.md) | SimNow 仿真注册与接入 | | [CTP_LIVE.md](./CTP_LIVE.md) | **期货公司实盘 CTP** 与开平仓对比 | -| [DEPLOY.md](./DEPLOY.md) | 部署说明 | +| [DEPLOY.md](./DEPLOY.md) | 部署说明(含 **代码发布铁律**:仅 Git 三步,禁止 SCP) | | [POSTGRES.md](./POSTGRES.md) | **PostgreSQL 生产库**(一键部署、迁移、备份恢复) | | [BACKUP.md](./BACKUP.md) | 数据备份与恢复 | diff --git a/install_trading.py b/install_trading.py index 9bfe6b1..7750efd 100644 --- a/install_trading.py +++ b/install_trading.py @@ -154,6 +154,65 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se """注册交易相关路由。""" _nav = require_nav _live_refresh_lock = threading.Lock() + _ctp_status_cache: dict = {"mode": "", "status": {}, "ts": 0.0} + _ctp_status_cache_lock = threading.Lock() + _ctp_status_refresh_flag = {"busy": False} + + def _remember_ctp_status(mode: str, st: dict) -> None: + if not isinstance(st, dict) or not st: + return + with _ctp_status_cache_lock: + _ctp_status_cache["mode"] = mode + _ctp_status_cache["status"] = dict(st) + _ctp_status_cache["ts"] = time.time() + + def _schedule_ctp_status_refresh(mode: str) -> None: + with _ctp_status_cache_lock: + if _ctp_status_refresh_flag["busy"]: + return + _ctp_status_refresh_flag["busy"] = True + + def _run() -> None: + try: + st = dict(ctp_status(mode) or {}) + _remember_ctp_status(mode, st) + snap = position_hub.get_snapshot() + if snap: + merged = dict(snap) + merged["ctp_status"] = st + position_hub.set_snapshot(merged) + except Exception as exc: + logger.debug("ctp status refresh: %s", exc) + finally: + with _ctp_status_cache_lock: + _ctp_status_refresh_flag["busy"] = False + + threading.Thread( + target=_run, + daemon=True, + name="ctp-status-refresh", + ).start() + + def _cached_ctp_status(mode: str) -> dict: + """页面/SSE 优先读快照与内存缓存,避免同步 worker IPC 阻塞 HTTP 线程。""" + try: + snap = position_hub.get_snapshot() or {} + st = snap.get("ctp_status") + if isinstance(st, dict) and st: + _remember_ctp_status(mode, st) + return dict(st) + except Exception: + pass + with _ctp_status_cache_lock: + if _ctp_status_cache["mode"] == mode and _ctp_status_cache["status"]: + return dict(_ctp_status_cache["status"]) + _schedule_ctp_status_refresh(mode) + return { + "connected": False, + "connecting": True, + "last_error": "", + "mode_label": trading_mode_label(get_setting), + } def _sizing_mode_label(mode: str) -> str: m = normalize_sizing_mode(mode) @@ -217,7 +276,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se get_fixed_lots_fn=lambda: get_fixed_lots(get_setting), ) - def _recommend_payload(conn) -> dict: + def _recommend_payload(conn, *, use_ctp_margin: bool = True) -> dict: mode = get_trading_mode(get_setting) return recommend_payload( conn, @@ -226,6 +285,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se trading_mode=mode, sizing_mode=get_sizing_mode(get_setting), fixed_lots=get_fixed_lots(get_setting), + use_ctp_margin=use_ctp_margin, ) def _recommend_capital(conn) -> float: @@ -2011,6 +2071,7 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M") mode = get_trading_mode(get_setting) ctp_st = ctp_status(mode) + _remember_ctp_status(mode, ctp_st) capital = _capital(conn) if ctp_st.get("connected") and not fast: _reconcile_pending(conn, mode, capital=capital) @@ -2061,45 +2122,19 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se } def _minimal_live_payload(conn) -> dict: - """CTP 直出兜底:不跑对账/写库,避免与后台 worker 争锁。""" - from zoneinfo import ZoneInfo - - tz = ZoneInfo("Asia/Shanghai") - now_iso = datetime.now(tz).strftime("%Y-%m-%dT%H:%M") + """零 IPC 兜底:仅读库 + 缓存 CTP 状态,持仓由后台 worker 补全。""" mode = get_trading_mode(get_setting) - ctp_st = ctp_status(mode) + ctp_st = _cached_ctp_status(mode) capital = _capital(conn) - rows: list[dict] = [] - if ctp_st.get("connected"): - for p in _ctp_positions(mode, refresh_if_empty=False): - lots = int(p.get("lots") or 0) - if lots <= 0: - continue - ths = _ctp_pos_to_ths_code(p) or (p.get("symbol") or "") - direction = p.get("direction") or "long" - mon = {"symbol": ths, "direction": direction} - try: - row = _compose_position_row( - conn, - mon=mon, - ctp=p, - mode=mode, - capital=capital, - now_iso=now_iso, - fast=True, - ) - if row: - rows.append(row) - except Exception as exc: - logger.warning("minimal live row failed: %s", exc) risk = get_risk_status( conn, - active_count=_effective_active_position_count(conn, mode), + active_count=count_active_trade_monitors(conn), equity=capital, ) + syncing = bool(ctp_st.get("connected") or ctp_st.get("connecting")) return { "ok": True, - "rows": rows, + "rows": [], "active_orders": [], "pending_orders": [], "capital": capital, @@ -2110,8 +2145,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se "night_session": is_night_trading_session(), "session_clock": trading_session_clock(), "pending_order_timeout_min": get_pending_order_timeout_min(get_setting), - "sync_state": trading_state.sync_state, - "sync_label": trading_state.sync_label(), + "sync_state": "syncing" if syncing else trading_state.sync_state, + "sync_label": "加载中…" if syncing else trading_state.sync_label(), } def _normalize_live_payload(payload: dict) -> dict: @@ -2445,10 +2480,10 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se conn.commit() sizing = get_sizing_mode(get_setting) max_pct = get_max_margin_pct(get_setting) - rec_cache = _recommend_payload(conn) + rec_cache = _recommend_payload(conn, use_ctp_margin=False) if rec_cache.get("needs_refresh"): _schedule_recommend_refresh() - ctp_connected = is_ctp_connected(get_setting) + ctp_connected = connected margin_rec = small_account_margin_recommendations() if not bootstrap_live: bootstrap_live = { @@ -2529,13 +2564,13 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def api_trading_stream(): from queue import Empty + @stream_with_context def generate(): + yield ": stream\n\n" q = position_hub.subscribe() try: snap = position_hub.get_snapshot() - if snap: - yield sse_format("positions", snap) - else: + if not snap: conn = get_db() try: init_strategy_tables(conn) @@ -2545,6 +2580,8 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se position_hub.set_snapshot(payload) yield sse_format("positions", payload) _push_position_snapshot_async(fast=True) + else: + yield sse_format("positions", snap) while True: try: msg = q.get(timeout=25) @@ -2866,17 +2903,6 @@ def install_trading(app, *, login_required, require_nav, get_db, get_setting, se def _roll_ui_modes(): return frozenset({ADD_MODE_MARKET, ADD_MODE_BREAKOUT}) - def _cached_ctp_status(mode: str) -> dict: - """页面渲染优先读持仓快照里的 CTP 状态,避免每次打 worker IPC。""" - try: - snap = position_hub.get_snapshot() or {} - st = snap.get("ctp_status") - if isinstance(st, dict) and st: - return dict(st) - except Exception: - pass - return dict(ctp_status(mode) or {}) - def _roll_filled_lots_map(conn, group_ids: list[int]) -> dict[int, int]: if not group_ids: return {} diff --git a/recommend_store.py b/recommend_store.py index c5287b0..f089592 100644 --- a/recommend_store.py +++ b/recommend_store.py @@ -110,15 +110,29 @@ def recommend_cache_needs_refresh( def _ctp_connected_for_mode(trading_mode: str) -> bool: try: - from vnpy_bridge import ctp_status + from position_stream import position_hub - return bool(ctp_status(trading_mode).get("connected")) + snap = position_hub.get_snapshot() or {} + st = snap.get("ctp_status") + if isinstance(st, dict) and st: + return bool(st.get("connected")) except Exception: - return False + pass + del trading_mode + return False def recommend_margin_used(trading_mode: str) -> float: """当前持仓已占用保证金(各持仓 CTP 回报之和,与柜台持仓保证金一致)。""" + try: + from position_stream import position_hub + + snap = position_hub.get_snapshot() or {} + raw = snap.get("margin_used") + if raw is not None: + return max(0.0, float(raw or 0)) + except Exception: + pass if not _ctp_connected_for_mode(trading_mode): return 0.0 try: @@ -162,6 +176,7 @@ def enrich_recommend_rows( max_margin_pct: float = 30.0, trading_mode: str = "simulation", margin_used: float = 0.0, + use_ctp_margin: bool = True, ) -> list[dict]: """用当前权益与保证金比例补算最大可开手数(兼容旧缓存)。""" cap = float(capital or 0) @@ -193,7 +208,7 @@ def enrich_recommend_rows( code_for_margin, price, direction="max", - trading_mode=trading_mode if ctp_connected else None, + trading_mode=trading_mode if (ctp_connected and use_ctp_margin) else None, ) if spec_used.get("mult"): row["mult"] = spec_used["mult"] @@ -340,19 +355,39 @@ def recommend_payload( trading_mode: str = "simulation", sizing_mode: str = "fixed", fixed_lots: int = 1, + use_ctp_margin: bool = True, ) -> dict: """读取缓存并附带当前权益(展示用,可能与缓存计算时不同)。""" payload = load_recommend_cache(conn) cap = float(live_capital or 0) pct = max(1.0, min(100.0, float(max_margin_pct or 30.0))) - used = recommend_margin_used(trading_mode) + if use_ctp_margin: + used = recommend_margin_used(trading_mode) + else: + used = 0.0 + try: + from position_stream import position_hub + + snap = position_hub.get_snapshot() or {} + raw = snap.get("margin_used") + if raw is not None: + used = max(0.0, float(raw or 0)) + except Exception: + pass + if used <= 0: + used = float(payload.get("margin_used") or 0) budget_info = margin_budget_info(cap, pct, used) payload["capital"] = cap payload["max_margin_pct"] = pct payload.update(budget_info) rows = payload.get("rows") or [] rows = enrich_recommend_rows( - rows, cap, max_margin_pct=pct, trading_mode=trading_mode, margin_used=used, + rows, + cap, + max_margin_pct=pct, + trading_mode=trading_mode, + margin_used=used, + use_ctp_margin=use_ctp_margin, ) rows = filter_rows_for_account_scope( rows, cap, ctp_connected=_ctp_connected_for_mode(trading_mode), diff --git a/trading_context.py b/trading_context.py index 169a6ba..56edda7 100644 --- a/trading_context.py +++ b/trading_context.py @@ -118,10 +118,29 @@ def _cached_ctp_account(mode: str) -> dict[str, float]: return {} +def _ctp_status_from_snapshot(mode: str) -> Optional[dict]: + """读持仓快照中的 CTP 状态,避免页面渲染同步 IPC。""" + try: + from position_stream import position_hub + + snap = position_hub.get_snapshot() or {} + st = snap.get("ctp_status") + if isinstance(st, dict) and st: + return st + except Exception: + pass + del mode + return None + + def get_account_capital(conn, get_setting: Callable[[str, str], str]) -> float: - """优先 SimNow/期货公司 CTP 权益;未连接时用最近快照或设置中的参考资金。""" + """优先读持仓/Worker 快照权益;无快照时才同步问 CTP。""" del conn mode = get_trading_mode(get_setting) + cached = _cached_ctp_account(mode) + balance = float(cached.get("balance") or 0) + if balance > 0: + return balance try: from vnpy_bridge import ctp_status, get_ctp_balance @@ -132,10 +151,6 @@ def get_account_capital(conn, get_setting: Callable[[str, str], str]) -> float: return float(bal) except Exception: pass - cached = _cached_ctp_account(mode) - balance = float(cached.get("balance") or 0) - if balance > 0: - return balance try: return float(get_setting("live_capital", "0") or 0) except (TypeError, ValueError): @@ -153,10 +168,13 @@ def get_recommend_capital(conn, get_setting: Callable[[str, str], str]) -> float def is_ctp_connected(get_setting: Callable[[str, str], str]) -> bool: """当前交易模式(SimNow / 实盘)是否已连接 CTP。""" + mode = get_trading_mode(get_setting) + st = _ctp_status_from_snapshot(mode) + if st is not None: + return bool(st.get("connected")) try: from vnpy_bridge import ctp_status - mode = get_trading_mode(get_setting) return bool(ctp_status(mode).get("connected")) except Exception: return False