diff --git a/app.py b/app.py index 90d91f5..f73cb7e 100644 --- a/app.py +++ b/app.py @@ -11,7 +11,7 @@ _legacy = os.path.join(_ROOT, "_legacy") if _legacy not in sys.path: sys.path.insert(0, _legacy) -from modules.core.paths import ROOT, UPLOADS_DIR, DB_PATH, ensure_runtime_dirs, resolve_env_file +from modules.core.paths import ROOT, UPLOADS_DIR, DB_PATH, KLINE_DB_PATH, ensure_runtime_dirs, resolve_env_file from locale_fix import ensure_process_locale ensure_process_locale() @@ -60,7 +60,7 @@ from stats_engine import ( load_stats_cache, refresh_stats_cache, ) -from kline_store import ensure_kline_tables +from kline_store import connect_kline_db, ensure_kline_tables from kline_stream import kline_hub, sse_format from kline_chart import generate_review_kline_chart, fetch_market_klines, MARKET_PERIODS from market import ( @@ -456,7 +456,6 @@ def init_db(): if not is_schema_migration_error(exc): raise rollback_if_postgres(conn) - ensure_kline_tables(conn) init_strategy_tables(conn) from risk.account_risk_lib import ensure_account_risk_schema from recommend_store import ensure_recommend_tables @@ -761,7 +760,6 @@ def check_order_plans(): def check_key_monitors(): - from db_conn import DB_PATH from key_monitor_lib import run_key_monitor_check from trading_context import get_trading_mode @@ -770,7 +768,7 @@ def check_key_monitors(): execute_fn = getattr(app, "_execute_key_breakout", None) run_key_monitor_check( conn, - db_path=DB_PATH, + db_path=KLINE_DB_PATH, get_trading_mode_fn=lambda: get_trading_mode(get_setting), send_wechat=send_wechat_msg, execute_breakout_fn=execute_fn, @@ -799,11 +797,20 @@ def background_task(): def start_background_threads(): from trading_context import get_trading_mode + from modules.market.kline_seed import start_small_account_kline_seed + + try: + kconn = connect_kline_db(KLINE_DB_PATH) + ensure_kline_tables(kconn) + kconn.close() + except Exception as exc: + app.logger.warning("kline db init: %s", exc) + start_small_account_kline_seed(db_path=KLINE_DB_PATH) threading.Thread(target=background_task, daemon=True).start() threading.Thread( target=lambda: kline_hub.worker_loop( - DB_PATH, + KLINE_DB_PATH, lambda sym, mc, sc: build_market_quote_payload( sym, mc, sc, prefer_sina=False, ), @@ -849,6 +856,7 @@ if os.getenv("QIHUO_INIT_ONLY") != "1": start_background_threads=start_background_threads, tz=TZ, db_path=DB_PATH, + kline_db_path=KLINE_DB_PATH, upload_dir=UPLOAD_DIR, open_types=OPEN_TYPES, exit_triggers=EXIT_TRIGGERS, diff --git a/modules/core/db_conn.py b/modules/core/db_conn.py index 5c976bb..cccf41c 100644 --- a/modules/core/db_conn.py +++ b/modules/core/db_conn.py @@ -13,9 +13,10 @@ import threading import time from typing import Any, Iterable, Optional, Sequence -from modules.core.paths import DB_PATH as _ROOT_DB_PATH +from modules.core.paths import DB_PATH as _ROOT_DB_PATH, KLINE_DB_PATH as _KLINE_DB_PATH DB_PATH = _ROOT_DB_PATH +KLINE_DB_PATH = _KLINE_DB_PATH _backend_lock = threading.Lock() _backend: Optional[str] = None diff --git a/modules/core/deps.py b/modules/core/deps.py index 8cfaeb1..a5e64fb 100644 --- a/modules/core/deps.py +++ b/modules/core/deps.py @@ -29,6 +29,7 @@ class AppDeps: start_background_threads: Callable tz: Any db_path: str + kline_db_path: str upload_dir: str open_types: list exit_triggers: list diff --git a/modules/core/paths.py b/modules/core/paths.py index 9550061..a3aa213 100644 --- a/modules/core/paths.py +++ b/modules/core/paths.py @@ -19,6 +19,7 @@ UPLOADS_DIR = ROOT / "uploads" LOGS_DIR = ROOT / "logs" DB_PATH = str(ROOT / "futures.db") +KLINE_DB_PATH = str(DATA_DIR / "kline.db") def ensure_runtime_dirs() -> None: diff --git a/modules/market/kline_chart.py b/modules/market/kline_chart.py index f495364..4724693 100644 --- a/modules/market/kline_chart.py +++ b/modules/market/kline_chart.py @@ -17,7 +17,12 @@ import requests from modules.core.symbols import ths_to_codes from modules.core.db_conn import connect_db -from modules.market.kline_store import ensure_kline_tables, get_cached_entry, save_bars +from modules.market.kline_store import ( + connect_kline_db, + ensure_kline_tables, + get_cached_entry, + save_bars, +) logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") @@ -261,6 +266,14 @@ def bars_to_api(bars: list) -> list[dict]: return result +def _resolve_kline_db_path(db_path: Optional[str]) -> str: + if db_path: + return db_path + from modules.core.paths import KLINE_DB_PATH + + return KLINE_DB_PATH + + def fetch_market_klines( symbol: str, period: str, @@ -270,6 +283,7 @@ def fetch_market_klines( trading_mode: Optional[str] = None, prefer_ctp: bool = False, ) -> dict: + db_path = _resolve_kline_db_path(db_path) chart_sym = ths_to_sina_chart_symbol(symbol) p = (period or "15m").lower() if p == "timeshare": @@ -308,18 +322,21 @@ def fetch_market_klines( bars = ctp_bars source = "ctp" - if not bars and db_path and chart_sym and not force_remote and need_sina: + local_cached: Optional[dict] = None + if db_path and chart_sym and not force_remote: try: - conn = connect_db(db_path) - cached = get_cached_entry(conn, chart_sym, p) + conn = connect_kline_db(db_path) + local_cached = get_cached_entry(conn, chart_sym, p) conn.close() - if cached and cached.get("fresh"): - bars = cached["bars"] - source = "local" - cached_at = cached.get("updated_at") except Exception as exc: logger.warning("kline cache read failed %s %s: %s", chart_sym, p, exc) + if not bars and local_cached and local_cached.get("bars") and need_sina: + if local_cached.get("fresh"): + bars = local_cached["bars"] + source = "local" + cached_at = local_cached.get("updated_at") + if need_sina and (not bars or len(ctp_bars) < MIN_CTP_KLINE_BARS or not prefer_ctp): remote_bars = fetch_sina_klines(symbol, p) if remote_bars: @@ -331,7 +348,7 @@ def fetch_market_klines( source = "remote" if db_path and chart_sym and not ctp_connected: try: - conn = connect_db(db_path) + conn = connect_kline_db(db_path) ensure_kline_tables(conn) save_bars(conn, chart_sym, p, remote_bars) meta = conn.execute( @@ -342,17 +359,10 @@ def fetch_market_klines( cached_at = meta[0] if meta else None except Exception as exc: logger.warning("kline cache write failed %s %s: %s", chart_sym, p, exc) - elif not bars and db_path and chart_sym: - try: - conn = connect_db(db_path) - cached = get_cached_entry(conn, chart_sym, p) - conn.close() - if cached and cached.get("bars"): - bars = cached["bars"] - source = "local" - cached_at = cached.get("updated_at") - except Exception as exc: - logger.warning("kline cache fallback failed %s %s: %s", chart_sym, p, exc) + elif not bars and local_cached and local_cached.get("bars"): + bars = local_cached["bars"] + source = "local_seed" + cached_at = local_cached.get("updated_at") api_bars = bars_to_api(bars) prev_close = None diff --git a/modules/market/kline_seed.py b/modules/market/kline_seed.py new file mode 100644 index 0000000..86fcffc --- /dev/null +++ b/modules/market/kline_seed.py @@ -0,0 +1,137 @@ +# Copyright (c) 2025-2026 马建军. All rights reserved. +# 专有软件 — 未经授权禁止复制、传播、转售。 +# 详见 LICENSE.zh-CN.txt + +"""20 万以下四品种 K 线预下载(玉米、豆粕、甲醇、螺纹钢)。""" +from __future__ import annotations + +import logging +import threading +import time +from typing import Optional + +from modules.core.symbols import PRODUCTS, resolve_main_contract +from modules.market.kline_chart import fetch_sina_klines, ths_to_sina_chart_symbol +from modules.market.kline_store import ( + connect_kline_db, + ensure_kline_tables, + load_meta, + save_bars, +) +from modules.trading.product_recommend import ( + SMALL_ACCOUNT_PRODUCT_THS, + normalize_product_ths, +) + +logger = logging.getLogger(__name__) + +# 小账户默认预下载周期(行情页 + 关键位常用) +SMALL_ACCOUNT_KLINE_PERIODS = ("5m", "15m", "1h", "d") +MIN_SEED_BARS = 30 +_SEED_LOCK = threading.Lock() +_SEED_STARTED = False + + +def _small_account_products() -> list[dict]: + allowed = {x.upper() for x in SMALL_ACCOUNT_PRODUCT_THS} + out: list[dict] = [] + for p in PRODUCTS: + root = normalize_product_ths(p.get("ths") or "") + if root.upper() in allowed: + out.append(p) + return out + + +def _resolve_main_symbol(product: dict) -> Optional[str]: + try: + from modules.core.symbols import _main_index, _main_index_lock + + with _main_index_lock: + cached = (_main_index or {}).get(product.get("sina") or "") + if cached and cached.get("ths_code"): + return str(cached["ths_code"]) + except Exception: + pass + try: + main = resolve_main_contract(product) + if main and main.get("ths_code"): + return str(main["ths_code"]) + except Exception as exc: + logger.debug("resolve main for seed %s: %s", product.get("ths"), exc) + try: + from modules.core.symbols import _stub_main_contract + + stub = _stub_main_contract(product) + if stub and stub.get("ths_code"): + return str(stub["ths_code"]) + except Exception: + pass + return None + + +def seed_small_account_klines( + *, + db_path: Optional[str] = None, + force: bool = False, +) -> dict[str, int]: + """下载四品种主力合约 K 线到独立库;已存在且充足时跳过。""" + conn = connect_kline_db(db_path) + try: + ensure_kline_tables(conn) + saved: dict[str, int] = {} + for product in _small_account_products(): + sym = _resolve_main_symbol(product) + if not sym: + continue + chart_sym = ths_to_sina_chart_symbol(sym) + if not chart_sym: + continue + for period in SMALL_ACCOUNT_KLINE_PERIODS: + key = f"{sym}:{period}" + meta = load_meta(conn, chart_sym, period) + if ( + not force + and meta + and int(meta.get("bar_count") or 0) >= MIN_SEED_BARS + ): + continue + try: + bars = fetch_sina_klines(sym, period) or [] + except Exception as exc: + logger.warning("seed kline fetch failed %s %s: %s", sym, period, exc) + continue + if len(bars) < MIN_SEED_BARS: + logger.debug( + "seed kline too few bars %s %s: %d", + sym, period, len(bars), + ) + continue + n = save_bars(conn, chart_sym, period, bars) + saved[key] = n + logger.info("seeded kline %s %s (%d bars)", sym, period, n) + return saved + finally: + conn.close() + + +def start_small_account_kline_seed(*, db_path: Optional[str] = None, delay_sec: float = 8.0) -> None: + """后台预下载(仅执行一次)。""" + global _SEED_STARTED + with _SEED_LOCK: + if _SEED_STARTED: + return + _SEED_STARTED = True + + def _run() -> None: + if delay_sec > 0: + time.sleep(delay_sec) + try: + saved = seed_small_account_klines(db_path=db_path) + if saved: + logger.info("small-account kline seed done: %s", ", ".join(saved.keys())) + else: + logger.debug("small-account kline seed: nothing new to download") + except Exception as exc: + logger.warning("small-account kline seed failed: %s", exc) + + threading.Thread(target=_run, daemon=True, name="kline-seed").start() diff --git a/modules/market/kline_store.py b/modules/market/kline_store.py index 6f6a066..abf239b 100644 --- a/modules/market/kline_store.py +++ b/modules/market/kline_store.py @@ -3,9 +3,10 @@ # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md -"""K 线本地 SQLite 缓存。""" +"""K 线本地 SQLite 缓存(独立库 data/kline.db,与业务 futures.db 分离)。""" from __future__ import annotations +import os import sqlite3 from datetime import datetime, timedelta from typing import Optional @@ -27,6 +28,24 @@ REFRESH_SECONDS = { } +def connect_kline_db(path: Optional[str] = None) -> sqlite3.Connection: + """K 线专用 SQLite(生产环境业务库可为 PostgreSQL,K 线仍走本地文件)。""" + from modules.core.paths import KLINE_DB_PATH + + db_path = path or KLINE_DB_PATH + parent = os.path.dirname(db_path) + if parent: + os.makedirs(parent, exist_ok=True) + conn = sqlite3.connect(db_path, timeout=30, check_same_thread=False) + conn.row_factory = sqlite3.Row + conn.execute("PRAGMA busy_timeout=30000") + try: + conn.execute("PRAGMA journal_mode=WAL") + except sqlite3.OperationalError: + pass + return conn + + def ensure_kline_tables(conn: sqlite3.Connection) -> None: conn.execute( """CREATE TABLE IF NOT EXISTS kline_bars ( diff --git a/modules/market/kline_stream.py b/modules/market/kline_stream.py index 4116a26..9eb0e28 100644 --- a/modules/market/kline_stream.py +++ b/modules/market/kline_stream.py @@ -17,7 +17,7 @@ from typing import Callable, Optional from zoneinfo import ZoneInfo from modules.market.kline_chart import fetch_market_klines, ths_to_sina_chart_symbol -from modules.market.kline_store import is_cache_fresh, load_meta, ensure_kline_tables +from modules.market.kline_store import connect_kline_db, is_cache_fresh, load_meta, ensure_kline_tables from modules.market.market_sessions import is_trading_session logger = logging.getLogger(__name__) @@ -87,8 +87,10 @@ class KlineStreamHub: if is_trading_session() and sub.period in FAST_PERIODS: return True try: - from modules.core.db_conn import connect_db - conn = connect_db(db_path) + from modules.core.paths import KLINE_DB_PATH + + db_path = db_path or KLINE_DB_PATH + conn = connect_kline_db(db_path) ensure_kline_tables(conn) meta = load_meta(conn, chart_sym, sub.period) conn.close() diff --git a/modules/market/routes.py b/modules/market/routes.py index 1be2c5e..8d64c31 100644 --- a/modules/market/routes.py +++ b/modules/market/routes.py @@ -35,6 +35,7 @@ def register(deps) -> None: expire_old_plans = deps.expire_old_plans TZ = deps.tz DB_PATH = deps.db_path + KLINE_DB_PATH = deps.kline_db_path UPLOAD_DIR = deps.upload_dir OPEN_TYPES = deps.open_types EXIT_TRIGGERS = deps.exit_triggers @@ -147,7 +148,7 @@ def register(deps) -> None: from modules.core.trading_context import get_trading_mode data = fetch_market_klines( - symbol, period, DB_PATH, prefer_ctp=False, + symbol, period, KLINE_DB_PATH, prefer_ctp=False, ) except Exception as exc: app.logger.warning("kline api failed: %s", exc) @@ -175,7 +176,7 @@ def register(deps) -> None: sub = kline_hub.subscribe(symbol, period, market_code, sina_code) try: kline_data = fetch_market_klines( - symbol, period, DB_PATH, prefer_ctp=False, + symbol, period, KLINE_DB_PATH, prefer_ctp=False, ) if kline_data.get("bars"): yield sse_format("kline", kline_data) diff --git a/modules/web/static/js/market.js b/modules/web/static/js/market.js index 3b50dda..5abf2f8 100644 --- a/modules/web/static/js/market.js +++ b/modules/web/static/js/market.js @@ -521,6 +521,7 @@ function klineSourceLabel(src) { if (src === 'local') return '本地缓存'; + if (src === 'local_seed') return '本地预下载'; return '新浪'; } diff --git a/modules/web/templates/settings.html b/modules/web/templates/settings.html index 5a26d06..5489167 100644 --- a/modules/web/templates/settings.html +++ b/modules/web/templates/settings.html @@ -398,6 +398,7 @@

当前行情源:{{ quote_label }}
现价、浮盈、关键位等业务数据均使用CTP 柜台行情(需已连接);仅行情页 K 线图表使用新浪接口。
+ 权益 20 万以下四品种(玉米、豆粕、甲醇、螺纹钢)K 线预存在独立库 data/kline.db
合约代码按同花顺格式(如 ag2608、IF2606)。