Add separate kline.db and pre-seed small-account four-product K-lines on startup.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-07-02 15:22:52 +08:00
parent 972ab5d08b
commit 5328673ce8
11 changed files with 215 additions and 33 deletions
+2 -1
View File
@@ -13,9 +13,10 @@ import threading
import time
from typing import Any, Iterable, Optional, Sequence
from modules.core.paths import DB_PATH as _ROOT_DB_PATH
from modules.core.paths import DB_PATH as _ROOT_DB_PATH, KLINE_DB_PATH as _KLINE_DB_PATH
DB_PATH = _ROOT_DB_PATH
KLINE_DB_PATH = _KLINE_DB_PATH
_backend_lock = threading.Lock()
_backend: Optional[str] = None
+1
View File
@@ -29,6 +29,7 @@ class AppDeps:
start_background_threads: Callable
tz: Any
db_path: str
kline_db_path: str
upload_dir: str
open_types: list
exit_triggers: list
+1
View File
@@ -19,6 +19,7 @@ UPLOADS_DIR = ROOT / "uploads"
LOGS_DIR = ROOT / "logs"
DB_PATH = str(ROOT / "futures.db")
KLINE_DB_PATH = str(DATA_DIR / "kline.db")
def ensure_runtime_dirs() -> None:
+30 -20
View File
@@ -17,7 +17,12 @@ import requests
from modules.core.symbols import ths_to_codes
from modules.core.db_conn import connect_db
from modules.market.kline_store import ensure_kline_tables, get_cached_entry, save_bars
from modules.market.kline_store import (
connect_kline_db,
ensure_kline_tables,
get_cached_entry,
save_bars,
)
logger = logging.getLogger(__name__)
TZ = ZoneInfo("Asia/Shanghai")
@@ -261,6 +266,14 @@ def bars_to_api(bars: list) -> list[dict]:
return result
def _resolve_kline_db_path(db_path: Optional[str]) -> str:
if db_path:
return db_path
from modules.core.paths import KLINE_DB_PATH
return KLINE_DB_PATH
def fetch_market_klines(
symbol: str,
period: str,
@@ -270,6 +283,7 @@ def fetch_market_klines(
trading_mode: Optional[str] = None,
prefer_ctp: bool = False,
) -> dict:
db_path = _resolve_kline_db_path(db_path)
chart_sym = ths_to_sina_chart_symbol(symbol)
p = (period or "15m").lower()
if p == "timeshare":
@@ -308,18 +322,21 @@ def fetch_market_klines(
bars = ctp_bars
source = "ctp"
if not bars and db_path and chart_sym and not force_remote and need_sina:
local_cached: Optional[dict] = None
if db_path and chart_sym and not force_remote:
try:
conn = connect_db(db_path)
cached = get_cached_entry(conn, chart_sym, p)
conn = connect_kline_db(db_path)
local_cached = get_cached_entry(conn, chart_sym, p)
conn.close()
if cached and cached.get("fresh"):
bars = cached["bars"]
source = "local"
cached_at = cached.get("updated_at")
except Exception as exc:
logger.warning("kline cache read failed %s %s: %s", chart_sym, p, exc)
if not bars and local_cached and local_cached.get("bars") and need_sina:
if local_cached.get("fresh"):
bars = local_cached["bars"]
source = "local"
cached_at = local_cached.get("updated_at")
if need_sina and (not bars or len(ctp_bars) < MIN_CTP_KLINE_BARS or not prefer_ctp):
remote_bars = fetch_sina_klines(symbol, p)
if remote_bars:
@@ -331,7 +348,7 @@ def fetch_market_klines(
source = "remote"
if db_path and chart_sym and not ctp_connected:
try:
conn = connect_db(db_path)
conn = connect_kline_db(db_path)
ensure_kline_tables(conn)
save_bars(conn, chart_sym, p, remote_bars)
meta = conn.execute(
@@ -342,17 +359,10 @@ def fetch_market_klines(
cached_at = meta[0] if meta else None
except Exception as exc:
logger.warning("kline cache write failed %s %s: %s", chart_sym, p, exc)
elif not bars and db_path and chart_sym:
try:
conn = connect_db(db_path)
cached = get_cached_entry(conn, chart_sym, p)
conn.close()
if cached and cached.get("bars"):
bars = cached["bars"]
source = "local"
cached_at = cached.get("updated_at")
except Exception as exc:
logger.warning("kline cache fallback failed %s %s: %s", chart_sym, p, exc)
elif not bars and local_cached and local_cached.get("bars"):
bars = local_cached["bars"]
source = "local_seed"
cached_at = local_cached.get("updated_at")
api_bars = bars_to_api(bars)
prev_close = None
+137
View File
@@ -0,0 +1,137 @@
# Copyright (c) 2025-2026 马建军. All rights reserved.
# 专有软件 — 未经授权禁止复制、传播、转售。
# 详见 LICENSE.zh-CN.txt
"""20 万以下四品种 K 线预下载(玉米、豆粕、甲醇、螺纹钢)。"""
from __future__ import annotations
import logging
import threading
import time
from typing import Optional
from modules.core.symbols import PRODUCTS, resolve_main_contract
from modules.market.kline_chart import fetch_sina_klines, ths_to_sina_chart_symbol
from modules.market.kline_store import (
connect_kline_db,
ensure_kline_tables,
load_meta,
save_bars,
)
from modules.trading.product_recommend import (
SMALL_ACCOUNT_PRODUCT_THS,
normalize_product_ths,
)
logger = logging.getLogger(__name__)
# 小账户默认预下载周期(行情页 + 关键位常用)
SMALL_ACCOUNT_KLINE_PERIODS = ("5m", "15m", "1h", "d")
MIN_SEED_BARS = 30
_SEED_LOCK = threading.Lock()
_SEED_STARTED = False
def _small_account_products() -> list[dict]:
allowed = {x.upper() for x in SMALL_ACCOUNT_PRODUCT_THS}
out: list[dict] = []
for p in PRODUCTS:
root = normalize_product_ths(p.get("ths") or "")
if root.upper() in allowed:
out.append(p)
return out
def _resolve_main_symbol(product: dict) -> Optional[str]:
try:
from modules.core.symbols import _main_index, _main_index_lock
with _main_index_lock:
cached = (_main_index or {}).get(product.get("sina") or "")
if cached and cached.get("ths_code"):
return str(cached["ths_code"])
except Exception:
pass
try:
main = resolve_main_contract(product)
if main and main.get("ths_code"):
return str(main["ths_code"])
except Exception as exc:
logger.debug("resolve main for seed %s: %s", product.get("ths"), exc)
try:
from modules.core.symbols import _stub_main_contract
stub = _stub_main_contract(product)
if stub and stub.get("ths_code"):
return str(stub["ths_code"])
except Exception:
pass
return None
def seed_small_account_klines(
*,
db_path: Optional[str] = None,
force: bool = False,
) -> dict[str, int]:
"""下载四品种主力合约 K 线到独立库;已存在且充足时跳过。"""
conn = connect_kline_db(db_path)
try:
ensure_kline_tables(conn)
saved: dict[str, int] = {}
for product in _small_account_products():
sym = _resolve_main_symbol(product)
if not sym:
continue
chart_sym = ths_to_sina_chart_symbol(sym)
if not chart_sym:
continue
for period in SMALL_ACCOUNT_KLINE_PERIODS:
key = f"{sym}:{period}"
meta = load_meta(conn, chart_sym, period)
if (
not force
and meta
and int(meta.get("bar_count") or 0) >= MIN_SEED_BARS
):
continue
try:
bars = fetch_sina_klines(sym, period) or []
except Exception as exc:
logger.warning("seed kline fetch failed %s %s: %s", sym, period, exc)
continue
if len(bars) < MIN_SEED_BARS:
logger.debug(
"seed kline too few bars %s %s: %d",
sym, period, len(bars),
)
continue
n = save_bars(conn, chart_sym, period, bars)
saved[key] = n
logger.info("seeded kline %s %s (%d bars)", sym, period, n)
return saved
finally:
conn.close()
def start_small_account_kline_seed(*, db_path: Optional[str] = None, delay_sec: float = 8.0) -> None:
"""后台预下载(仅执行一次)。"""
global _SEED_STARTED
with _SEED_LOCK:
if _SEED_STARTED:
return
_SEED_STARTED = True
def _run() -> None:
if delay_sec > 0:
time.sleep(delay_sec)
try:
saved = seed_small_account_klines(db_path=db_path)
if saved:
logger.info("small-account kline seed done: %s", ", ".join(saved.keys()))
else:
logger.debug("small-account kline seed: nothing new to download")
except Exception as exc:
logger.warning("small-account kline seed failed: %s", exc)
threading.Thread(target=_run, daemon=True, name="kline-seed").start()
+20 -1
View File
@@ -3,9 +3,10 @@
# 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。
# 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md
"""K 线本地 SQLite 缓存。"""
"""K 线本地 SQLite 缓存(独立库 data/kline.db,与业务 futures.db 分离)"""
from __future__ import annotations
import os
import sqlite3
from datetime import datetime, timedelta
from typing import Optional
@@ -27,6 +28,24 @@ REFRESH_SECONDS = {
}
def connect_kline_db(path: Optional[str] = None) -> sqlite3.Connection:
"""K 线专用 SQLite(生产环境业务库可为 PostgreSQL,K 线仍走本地文件)。"""
from modules.core.paths import KLINE_DB_PATH
db_path = path or KLINE_DB_PATH
parent = os.path.dirname(db_path)
if parent:
os.makedirs(parent, exist_ok=True)
conn = sqlite3.connect(db_path, timeout=30, check_same_thread=False)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA busy_timeout=30000")
try:
conn.execute("PRAGMA journal_mode=WAL")
except sqlite3.OperationalError:
pass
return conn
def ensure_kline_tables(conn: sqlite3.Connection) -> None:
conn.execute(
"""CREATE TABLE IF NOT EXISTS kline_bars (
+5 -3
View File
@@ -17,7 +17,7 @@ from typing import Callable, Optional
from zoneinfo import ZoneInfo
from modules.market.kline_chart import fetch_market_klines, ths_to_sina_chart_symbol
from modules.market.kline_store import is_cache_fresh, load_meta, ensure_kline_tables
from modules.market.kline_store import connect_kline_db, is_cache_fresh, load_meta, ensure_kline_tables
from modules.market.market_sessions import is_trading_session
logger = logging.getLogger(__name__)
@@ -87,8 +87,10 @@ class KlineStreamHub:
if is_trading_session() and sub.period in FAST_PERIODS:
return True
try:
from modules.core.db_conn import connect_db
conn = connect_db(db_path)
from modules.core.paths import KLINE_DB_PATH
db_path = db_path or KLINE_DB_PATH
conn = connect_kline_db(db_path)
ensure_kline_tables(conn)
meta = load_meta(conn, chart_sym, sub.period)
conn.close()
+3 -2
View File
@@ -35,6 +35,7 @@ def register(deps) -> None:
expire_old_plans = deps.expire_old_plans
TZ = deps.tz
DB_PATH = deps.db_path
KLINE_DB_PATH = deps.kline_db_path
UPLOAD_DIR = deps.upload_dir
OPEN_TYPES = deps.open_types
EXIT_TRIGGERS = deps.exit_triggers
@@ -147,7 +148,7 @@ def register(deps) -> None:
from modules.core.trading_context import get_trading_mode
data = fetch_market_klines(
symbol, period, DB_PATH, prefer_ctp=False,
symbol, period, KLINE_DB_PATH, prefer_ctp=False,
)
except Exception as exc:
app.logger.warning("kline api failed: %s", exc)
@@ -175,7 +176,7 @@ def register(deps) -> None:
sub = kline_hub.subscribe(symbol, period, market_code, sina_code)
try:
kline_data = fetch_market_klines(
symbol, period, DB_PATH, prefer_ctp=False,
symbol, period, KLINE_DB_PATH, prefer_ctp=False,
)
if kline_data.get("bars"):
yield sse_format("kline", kline_data)
+1
View File
@@ -521,6 +521,7 @@
function klineSourceLabel(src) {
if (src === 'local') return '本地缓存';
if (src === 'local_seed') return '本地预下载';
return '新浪';
}
+1
View File
@@ -398,6 +398,7 @@
<p class="hint" style="font-size:.88rem;line-height:1.6;margin:0">
当前行情源:<strong class="text-accent">{{ quote_label }}</strong><br>
现价、浮盈、关键位等业务数据均使用<strong>CTP 柜台行情</strong>(需已连接);仅行情页 K 线图表使用新浪接口。<br>
权益 20 万以下四品种(玉米、豆粕、甲醇、螺纹钢)K 线预存在独立库 <code>data/kline.db</code><br>
合约代码按同花顺格式(如 ag2608、IF2606)。
</p>
</div>