# Copyright (c) 2025-2026 马建军. All rights reserved. # 专有软件 — 未经授权禁止复制、传播、转售。 # 严禁用于:带单/代客理财、向他人推荐期货品种或买卖建议、融资配资等业务。 # 详见 LICENSE.zh-CN.txt 与 docs/软件购买与使用协议.md """复盘 K 线:新浪拉取 + matplotlib 生成截图。""" import json import logging import os import re import sqlite3 from datetime import datetime from typing import Optional from zoneinfo import ZoneInfo import requests from symbols import ths_to_codes from db_conn import connect_db from kline_store import ensure_kline_tables, get_cached_entry, save_bars logger = logging.getLogger(__name__) TZ = ZoneInfo("Asia/Shanghai") # CTP tick 聚合 bar 少于此数时,用新浪历史补齐走势 MIN_CTP_KLINE_BARS = 15 PERIOD_MINUTES = { "1m": "1", "3m": "3", "5m": "5", "15m": "15", "30m": "30", "1h": "60", "4h": "240", } MARKET_PERIODS = [ {"key": "timeshare", "label": "分时"}, {"key": "1m", "label": "1分"}, {"key": "2m", "label": "2分"}, {"key": "5m", "label": "5分"}, {"key": "15m", "label": "15分"}, {"key": "1h", "label": "1小时"}, {"key": "2h", "label": "2小时"}, {"key": "4h", "label": "4小时"}, {"key": "d", "label": "日线"}, {"key": "w", "label": "周线"}, ] def ths_to_sina_chart_symbol(symbol: str) -> Optional[str]: """ag2608 -> AG2608(新浪 K 线接口合约代码)。""" code = (symbol or "").strip() if not code: return None codes = ths_to_codes(code) if codes: sina = codes.get("sina_code", "") if sina.startswith("nf_"): return sina[3:] if sina.startswith("CFF_RE_"): return sina[7:] ths = codes.get("ths_code", "") return ths.upper() if ths else None m = re.match(r"^([A-Za-z]+)(\d+)$", code) if m: return m.group(1).upper() + m.group(2) return None def _parse_jsonp(text: str) -> Optional[list]: m = re.search(r"\((.*)\)\s*;?\s*$", text.strip(), re.DOTALL) if not m: return None try: data = json.loads(m.group(1)) return data if isinstance(data, list) else None except json.JSONDecodeError: return None def fetch_sina_klines(symbol: str, period: str) -> list: """拉取新浪期货 K 线(原始 bar 列表)。""" chart_sym = ths_to_sina_chart_symbol(symbol) if not chart_sym: return [] p = (period or "").lower() if p in ("1d", "d"): return _fetch_sina_daily(chart_sym) if p == "w": return _weekly_from_daily(_fetch_sina_daily(chart_sym)) if p == "timeshare": bars = _fetch_few_min_line(chart_sym, "1") return _timeshare_session(bars) if p == "2m": return _aggregate_bars(_fetch_few_min_line(chart_sym, "1"), 2) if p == "2h": return _aggregate_bars(_fetch_few_min_line(chart_sym, "60"), 2) typ = PERIOD_MINUTES.get(p) if typ: return _fetch_few_min_line(chart_sym, typ) return [] def _fetch_few_min_line(chart_sym: str, typ: str) -> list: ts = datetime.now(TZ).strftime("%Y%m%d%H%M%S") url = ( "https://stock2.finance.sina.com.cn/futures/api/jsonp.php/" f"var_{chart_sym}_{typ}_{ts}=/InnerFuturesNewService.getFewMinLine" f"?symbol={chart_sym}&type={typ}" ) try: resp = requests.get( url, timeout=20, headers={"Referer": "https://finance.sina.com.cn"}, ) bars = _parse_jsonp(resp.text) return _normalize_bars(bars or []) except Exception as exc: logger.warning("fetch kline failed %s %s: %s", chart_sym, typ, exc) return [] def _normalize_bars(raw: list) -> list: out = [] for row in raw: if isinstance(row, list) and len(row) >= 5: out.append({ "d": str(row[0]), "o": float(row[1]), "h": float(row[2]), "l": float(row[3]), "c": float(row[4]), "v": float(row[5]) if len(row) > 5 and row[5] else 0.0, }) elif isinstance(row, dict) and row.get("d"): out.append({ "d": str(row["d"]), "o": float(row.get("o", 0) or 0), "h": float(row.get("h", 0) or 0), "l": float(row.get("l", 0) or 0), "c": float(row.get("c", 0) or 0), "v": float(row.get("v", 0) or 0), }) return out def _aggregate_bars(bars: list, n: int) -> list: if n <= 1 or not bars: return bars out = [] chunk: list = [] for bar in bars: chunk.append(bar) if len(chunk) >= n: out.append(_merge_bars(chunk)) chunk = [] if chunk: out.append(_merge_bars(chunk)) return out def _merge_bars(chunk: list) -> dict: return { "d": chunk[0]["d"], "o": chunk[0]["o"], "h": max(b["h"] for b in chunk), "l": min(b["l"] for b in chunk), "c": chunk[-1]["c"], "v": sum(b.get("v", 0) for b in chunk), } def _merge_kline_bars(history: list, live: list) -> list: """新浪历史 + CTP 实时尾部(去重叠)。""" if not history: return list(live or []) if not live: return list(history) first_live = _bar_datetime(live[0]) if not first_live: return history + live trimmed = [] for bar in history: dt = _bar_datetime(bar) if dt and dt < first_live: trimmed.append(bar) merged = trimmed + list(live) return merged if merged else list(history) def _weekly_from_daily(daily: list) -> list: if not daily: return [] buckets: dict[tuple, list] = {} for bar in daily: dt = _bar_datetime(bar) if not dt: continue iso = dt.isocalendar() key = (iso[0], iso[1]) buckets.setdefault(key, []).append(bar) out = [] for key in sorted(buckets.keys()): chunk = buckets[key] out.append(_merge_bars(chunk)) out[-1]["d"] = chunk[-1]["d"] return out def _timeshare_session(bars: list) -> list: if not bars: return [] today = datetime.now(TZ).date() session = [] for bar in bars: dt = _bar_datetime(bar) if dt and dt.date() == today: session.append(bar) if session: return session[-480:] return bars[-480:] def bars_to_api(bars: list) -> list[dict]: """转为前端图表 JSON(去重、排序、数值规范化)。""" result: list[dict] = [] seen: dict[int, dict] = {} for bar in bars: dt = _bar_datetime(bar) ts = int(dt.timestamp() * 1000) if dt else None try: o = float(bar.get("o") or 0) h = float(bar.get("h") or o) l = float(bar.get("l") or o) c = float(bar.get("c") or o) v = float(bar.get("v") or 0) except (TypeError, ValueError): continue if h < l: h, l = l, h h = max(h, o, c) l = min(l, o, c) row = { "time": bar["d"], "timestamp": ts, "open": o, "high": h, "low": l, "close": c, "volume": v, } if ts is not None: seen[ts] = row else: result.append(row) if seen: result = [seen[k] for k in sorted(seen.keys())] return result def fetch_market_klines( symbol: str, period: str, db_path: Optional[str] = None, force_remote: bool = False, *, trading_mode: Optional[str] = None, prefer_ctp: bool = False, ) -> dict: chart_sym = ths_to_sina_chart_symbol(symbol) p = (period or "15m").lower() if p == "timeshare": chart_type = "line" else: chart_type = "candle" bars: list = [] source = "remote" cached_at = None ctp_connected = False ctp_bars: list = [] if prefer_ctp: try: from ctp_kline import fetch_ctp_klines from vnpy_bridge import ctp_status mode = trading_mode if not mode: try: from app import get_setting from trading_context import get_trading_mode mode = get_trading_mode(get_setting) except Exception: mode = "simulation" ctp_connected = bool(ctp_status(mode).get("connected")) if ctp_connected: ctp_bars = fetch_ctp_klines(symbol, p, mode) or [] except Exception as exc: logger.debug("ctp kline fetch failed %s %s: %s", symbol, p, exc) need_sina = force_remote or not prefer_ctp or not ctp_bars or len(ctp_bars) < MIN_CTP_KLINE_BARS if ctp_bars and len(ctp_bars) >= MIN_CTP_KLINE_BARS: bars = ctp_bars source = "ctp" if not bars and db_path and chart_sym and not force_remote and need_sina: try: conn = connect_db(db_path) cached = get_cached_entry(conn, chart_sym, p) conn.close() if cached and cached.get("fresh"): bars = cached["bars"] source = "local" cached_at = cached.get("updated_at") except Exception as exc: logger.warning("kline cache read failed %s %s: %s", chart_sym, p, exc) if not bars or len(ctp_bars) < MIN_CTP_KLINE_BARS or not prefer_ctp: remote_bars = fetch_sina_klines(symbol, p) if remote_bars: if prefer_ctp and ctp_bars and ctp_connected: bars = _merge_kline_bars(remote_bars, ctp_bars) source = "ctp+remote" else: bars = remote_bars source = "remote" if db_path and chart_sym and not ctp_connected: try: conn = connect_db(db_path) ensure_kline_tables(conn) save_bars(conn, chart_sym, p, remote_bars) meta = conn.execute( "SELECT updated_at FROM kline_meta WHERE chart_symbol=? AND period=?", (chart_sym, p), ).fetchone() conn.close() cached_at = meta[0] if meta else None except Exception as exc: logger.warning("kline cache write failed %s %s: %s", chart_sym, p, exc) elif not bars and db_path and chart_sym: try: conn = connect_db(db_path) cached = get_cached_entry(conn, chart_sym, p) conn.close() if cached and cached.get("bars"): bars = cached["bars"] source = "local" cached_at = cached.get("updated_at") except Exception as exc: logger.warning("kline cache fallback failed %s %s: %s", chart_sym, p, exc) api_bars = bars_to_api(bars) prev_close = None if len(api_bars) >= 2: prev_close = api_bars[-2]["close"] return { "symbol": symbol, "chart_symbol": chart_sym, "period": p, "chart_type": chart_type, "count": len(bars), "bars": api_bars, "prev_close": prev_close, "source": source, "cached_at": cached_at, "ctp_connected": ctp_connected, } def _fetch_sina_daily(chart_sym: str) -> list: url = ( "https://stock2.finance.sina.com.cn/futures/api/json.php/" f"IndexService.getInnerFuturesDailyKLine?symbol={chart_sym}" ) try: resp = requests.get(url, timeout=20, headers={"Referer": "https://finance.sina.com.cn"}) raw = resp.json() if raw and isinstance(raw, list): bars = _normalize_bars(raw) if bars: return bars except Exception as exc: logger.warning("fetch daily kline failed %s: %s", chart_sym, exc) return _daily_from_minutes(chart_sym) def _daily_from_minutes(chart_sym: str) -> list: """合约日线接口无数据时,由 60 分钟 K 线按日合成。""" bars_60 = _fetch_few_min_line(chart_sym, "60") if not bars_60: bars_60 = _fetch_few_min_line(chart_sym, "240") buckets: dict[str, list] = {} for bar in bars_60: dt = _bar_datetime(bar) if not dt: continue key = dt.strftime("%Y-%m-%d") buckets.setdefault(key, []).append(bar) out = [] for day in sorted(buckets.keys()): chunk = buckets[day] merged = _merge_bars(chunk) merged["d"] = day + " 15:00:00" out.append(merged) return out def _parse_dt(value: str) -> Optional[datetime]: if not value: return None v = value.strip().replace("T", " ") for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M"): try: return datetime.strptime(v, fmt).replace(tzinfo=TZ) except ValueError: continue try: return datetime.fromisoformat(value.strip()).replace(tzinfo=TZ) except ValueError: return None def _bar_datetime(bar: dict) -> Optional[datetime]: d = bar.get("d") if not d: return None try: return datetime.strptime(d, "%Y-%m-%d %H:%M:%S").replace(tzinfo=TZ) except ValueError: return None def _select_bars( bars: list, cutoff: datetime, count: int, ) -> list: filtered = [] for bar in bars: dt = _bar_datetime(bar) if dt and dt <= cutoff: filtered.append(bar) if not filtered: filtered = bars if count > 0 and len(filtered) > count: filtered = filtered[-count:] return filtered def generate_review_kline_chart( symbol: str, periods: list[str], count: int, cutoff_label: str, open_time: str, close_time: str, entry_price: Optional[float], stop_loss: Optional[float], take_profit: Optional[float], close_price: Optional[float], upload_dir: str, ) -> Optional[str]: """生成双周期 K 线复盘图,返回 uploads 目录下的文件名。""" import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import matplotlib.dates as mdates now = datetime.now(TZ) if cutoff_label == "开仓时间": cutoff = _parse_dt(open_time) or now elif cutoff_label == "当前时间": cutoff = now else: cutoff = _parse_dt(close_time) or now open_dt = _parse_dt(open_time) close_dt = _parse_dt(close_time) valid_periods = [p for p in periods if p] if not valid_periods: valid_periods = ["15m", "1h"] fig, axes = plt.subplots( len(valid_periods), 1, figsize=(14, 4.5 * len(valid_periods)), facecolor="#0a0a10", squeeze=False, ) plotted = False for idx, period in enumerate(valid_periods): ax = axes[idx, 0] bars = fetch_sina_klines(symbol, period) bars = _select_bars(bars, cutoff, count) if not bars: ax.set_facecolor("#12121a") ax.text(0.5, 0.5, f"No {period} data", ha="center", va="center", color="#888") ax.set_xticks([]) ax.set_yticks([]) continue times = [_bar_datetime(b) for b in bars] closes = [float(b["c"]) for b in bars] highs = [float(b["h"]) for b in bars] lows = [float(b["l"]) for b in bars] ax.set_facecolor("#12121a") ax.plot(times, closes, color="#4cc2ff", linewidth=1.2) ax.fill_between( times, lows, highs, color="#4cc2ff", alpha=0.12, ) levels = [ (entry_price, "#eac147", "Entry"), (stop_loss, "#ff6666", "SL"), (take_profit, "#4cd97f", "TP"), (close_price, "#c4c4ff", "Close"), ] for price, color, label in levels: if price is not None: ax.axhline(price, color=color, linewidth=0.9, linestyle="--", alpha=0.85) ax.text(times[-1], price, label, color=color, fontsize=8, va="bottom") if open_dt: ax.axvline(open_dt, color="#888", linewidth=0.8, linestyle=":", alpha=0.7) if close_dt: ax.axvline(close_dt, color="#aaa", linewidth=0.8, linestyle=":", alpha=0.7) chart_sym = ths_to_sina_chart_symbol(symbol) or symbol ax.set_title(f"{chart_sym} {period}", color="#eaeaea", fontsize=11, pad=8) ax.tick_params(colors="#888", labelsize=8) for spine in ax.spines.values(): spine.set_color("#2e2e45") ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d %H:%M")) ax.grid(True, color="#1e1e30", linewidth=0.5) plotted = True if not plotted: plt.close(fig) return None fig.tight_layout() ts = datetime.now(TZ).strftime("%Y%m%d%H%M%S") chart_sym = ths_to_sina_chart_symbol(symbol) or "chart" filename = f"{ts}_kline_{chart_sym}.png" path = os.path.join(upload_dir, filename) fig.savefig(path, dpi=120, facecolor=fig.get_facecolor()) plt.close(fig) return filename