"""交易复盘 / 订单 K 线拼图(Binance / Gate / OKX 共用)。""" import math try: from PIL import Image, ImageDraw, ImageFont except ImportError: Image = None # type: ignore ImageDraw = None # type: ignore ImageFont = None # type: ignore JOURNAL_CHART_TF_CHOICES = ("1m", "3m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "12h", "1d") JOURNAL_CHART_DEFAULT_TF1 = "15m" JOURNAL_CHART_DEFAULT_TF2 = "1h" JOURNAL_CHART_DEFAULT_LIMIT = 300 JOURNAL_CHART_LIMIT_MIN = 50 JOURNAL_CHART_LIMIT_MAX = 500 def _load_font(size): if not ImageFont: return None for name in ("msyh.ttc", "Microsoft YaHei.ttf", "arial.ttf", "Arial.ttf"): try: return ImageFont.truetype(name, size) except Exception: continue try: return ImageFont.load_default() except Exception: return None def ohlcv_to_rows(ohlcv): rows = [] for bar in ohlcv or []: if not bar or len(bar) < 6: continue try: rows.append( { "ts": int(bar[0]), "o": float(bar[1]), "h": float(bar[2]), "l": float(bar[3]), "c": float(bar[4]), "v": float(bar[5]), } ) except Exception: continue return rows def marker_tag_label(tag): t = str(tag or "").strip().upper() if t == "ENTRY": return "开仓" if t == "EXIT": return "平仓" if t == "STOP": return "止损" return str(tag or "") def pick_marker_point(rows, target_ts_ms, target_price=None): if not rows or target_ts_ms is None: return None, None idx = min(range(len(rows)), key=lambda i: abs(int(rows[i]["ts"]) - int(target_ts_ms))) if target_price is not None: try: p = float(target_price) if p > 0: return idx, p except Exception: pass return idx, float(rows[idx]["c"]) def parse_positive_price(raw): if raw is None: return None s = str(raw).strip() if not s: return None try: p = float(s) return p if p > 0 else None except (TypeError, ValueError): return None def parse_journal_chart_limit(raw, fallback=None): fb = int(fallback if fallback is not None else JOURNAL_CHART_DEFAULT_LIMIT) try: n = int(str(raw or "").strip() or fb) except (TypeError, ValueError): n = fb return max(JOURNAL_CHART_LIMIT_MIN, min(JOURNAL_CHART_LIMIT_MAX, n)) def normalize_chart_timeframe(raw): tf = str(raw or "").strip().lower() if tf in JOURNAL_CHART_TF_CHOICES: return tf return "" def parse_journal_chart_timeframes(tf1, tf2, fallback_tfs=None): """复盘表单:最多两个周期,去重保序。""" out = [] for raw in (tf1, tf2): tf = normalize_chart_timeframe(raw) if tf and tf not in out: out.append(tf) if out: return out[:2] fb = [normalize_chart_timeframe(x) for x in (fallback_tfs or (JOURNAL_CHART_DEFAULT_TF1, JOURNAL_CHART_DEFAULT_TF2))] fb = [x for x in fb if x] return fb[:2] if fb else [JOURNAL_CHART_DEFAULT_TF1, JOURNAL_CHART_DEFAULT_TF2] def marker_points_for_timeframe(rows, marker_payload): points = [] if not marker_payload or not rows: return points entry_idx, entry_price = pick_marker_point( rows, marker_payload.get("entry_ts_ms"), marker_payload.get("entry_price") ) exit_idx, exit_price = pick_marker_point( rows, marker_payload.get("exit_ts_ms"), marker_payload.get("exit_price") ) if entry_idx is not None and entry_price is not None: points.append({"idx": entry_idx, "price": entry_price, "tag": "ENTRY"}) if exit_idx is not None and exit_price is not None: points.append({"idx": exit_idx, "price": exit_price, "tag": "EXIT"}) return points def price_levels_from_marker_payload(marker_payload): levels = [] if not marker_payload: return levels sl = parse_positive_price(marker_payload.get("stop_loss_price")) if sl is not None: levels.append({"price": sl, "label": "止损", "color": (255, 152, 0)}) return levels def render_candles_subplot( rows, title, width, height, bg_rgb=(255, 255, 255), marker_points=None, price_levels=None, ): if not Image or not ImageDraw: raise RuntimeError("缺少依赖:Pillow(pip install Pillow)") img = Image.new("RGB", (width, height), bg_rgb) draw = ImageDraw.Draw(img) font = _load_font(14) small = _load_font(12) pad_l, pad_r, pad_t, pad_b = 46, 12, 26, 28 plot_w = max(10, width - pad_l - pad_r) plot_h = max(10, height - pad_t - pad_b) header_bg = (245, 247, 250) draw.rectangle((0, 0, width, pad_t), fill=header_bg) if font: draw.text((10, 6), title, fill=(25, 35, 60), font=font) else: draw.text((10, 6), title, fill=(25, 35, 60)) if not rows: if small: draw.text((pad_l, pad_t + 10), "无K线数据", fill=(90, 100, 120), font=small) else: draw.text((pad_l, pad_t + 10), "无K线数据", fill=(90, 100, 120)) return img lo = min(r["l"] for r in rows) hi = max(r["h"] for r in rows) for pl in price_levels or []: try: p = float(pl.get("price")) if p > 0: lo = min(lo, p) hi = max(hi, p) except (TypeError, ValueError): pass if hi <= lo: hi = lo + 1e-12 n = len(rows) marker_by_idx = {} for mp in marker_points or []: try: idx = int(mp.get("idx")) except Exception: continue if idx < 0 or idx >= n: continue marker_by_idx.setdefault(idx, []).append(mp) x0 = pad_l for i, r in enumerate(rows): x1 = pad_l + int((i + 1) * plot_w / n) x_mid = (x0 + x1) // 2 wick_x = x_mid y_high = pad_t + int((hi - r["h"]) / (hi - lo) * plot_h) y_low = pad_t + int((hi - r["l"]) / (hi - lo) * plot_h) y_open = pad_t + int((hi - r["o"]) / (hi - lo) * plot_h) y_close = pad_t + int((hi - r["c"]) / (hi - lo) * plot_h) top = min(y_open, y_close) bot = max(y_open, y_close) up = r["c"] >= r["o"] wick_color = (120, 120, 120) edge_color = (20, 20, 20) draw.line((wick_x, y_high, wick_x, y_low), fill=wick_color) body_w = max(1, (x1 - x0) - 2) left = x0 + 1 if bot - top < 2: mid = (top + bot) // 2 draw.rectangle((left, mid, left + body_w, mid + 1), fill=edge_color) else: if up: draw.rectangle((left, top, left + body_w, bot), fill=(255, 255, 255), outline=edge_color, width=1) else: draw.rectangle((left, top, left + body_w, bot), fill=edge_color, outline=edge_color, width=1) for j, mp in enumerate(marker_by_idx.get(i, [])): tag = str(mp.get("tag") or "") label = marker_tag_label(tag) m_price = float(mp.get("price") or r["c"]) y_m = pad_t + int((hi - m_price) / (hi - lo) * plot_h) y_m = max(pad_t + 4, min(pad_t + plot_h - 4, y_m)) x_off = (j - (len(marker_by_idx[i]) - 1) / 2.0) * 14 x_draw = int(x_mid + x_off) if tag == "ENTRY": m_color = (0, 195, 95) tri = [(x_draw, y_m - 20), (x_draw - 9, y_m - 4), (x_draw + 9, y_m - 4)] text_y = y_m - 36 else: m_color = (235, 65, 65) tri = [(x_draw, y_m + 20), (x_draw - 9, y_m + 4), (x_draw + 9, y_m + 4)] text_y = y_m + 12 draw.ellipse((x_draw - 5, y_m - 5, x_draw + 5, y_m + 5), fill=m_color, outline=(255, 255, 255), width=1) draw.polygon(tri, fill=m_color) draw.line((x_draw, y_m, x_draw, y_m - 16 if tag == "ENTRY" else y_m + 16), fill=m_color, width=3) if font: draw.text((x_draw + 8, text_y), label, fill=m_color, font=font) else: draw.text((x_draw + 8, text_y), label, fill=m_color) x0 = x1 x_right = pad_l + plot_w for pl in price_levels or []: try: p = float(pl.get("price")) except (TypeError, ValueError): continue if p <= 0: continue y_sl = pad_t + int((hi - p) / (hi - lo) * plot_h) color = tuple(pl.get("color") or (255, 152, 0)) label = str(pl.get("label") or "止损") for xx in range(pad_l, x_right, 10): draw.line((xx, y_sl, min(xx + 6, x_right), y_sl), fill=color, width=2) if font: draw.text((x_right - 72, y_sl - 18), label, fill=color, font=small or font) else: draw.text((x_right - 72, y_sl - 18), label, fill=color) if len(marker_points or []) >= 2: try: entry = next((m for m in marker_points if m.get("tag") == "ENTRY"), None) exitp = next((m for m in marker_points if m.get("tag") == "EXIT"), None) if entry is not None and exitp is not None: ex_i, ex_p = int(entry["idx"]), float(entry["price"]) xx_i, xx_p = int(exitp["idx"]), float(exitp["price"]) x_ex = pad_l + int((ex_i + 0.5) * plot_w / n) x_xx = pad_l + int((xx_i + 0.5) * plot_w / n) y_ex = pad_t + int((hi - ex_p) / (hi - lo) * plot_h) y_xx = pad_t + int((hi - xx_p) / (hi - lo) * plot_h) draw.line((x_ex, y_ex, x_xx, y_xx), fill=(35, 135, 255), width=3) except Exception: pass if small: draw.text((width - 210, height - 22), f"L={lo:.6g} H={hi:.6g}", fill=(120, 125, 135), font=small) return img def compose_chart_panels(panels, layout="grid", cell_w=980, cell_h=520, gap=10): if not panels or not Image: return None if layout == "vertical": cols = 1 rows_n = len(panels) else: cols = 2 rows_n = int(math.ceil(len(panels) / cols)) w = cols * cell_w + (cols - 1) * gap h = rows_n * cell_h + (rows_n - 1) * gap out = Image.new("RGB", (w, h), (255, 255, 255)) idx = 0 for r in range(rows_n): for c in range(cols): if idx >= len(panels): break x = c * (cell_w + gap) y = r * (cell_h + gap) out.paste(panels[idx], (x, y)) idx += 1 if ImageDraw and layout != "vertical" and rows_n >= 1: draw_out = ImageDraw.Draw(out) line_col = (220, 225, 232) x_mid = cell_w + gap // 2 if w > x_mid >= 0: draw_out.line((x_mid, 0, x_mid, h), fill=line_col, width=2) for rr in range(1, rows_n): y_mid = rr * cell_h + (rr - 1) * gap + gap // 2 if 0 <= y_mid <= h: draw_out.line((0, y_mid, w, y_mid), fill=line_col, width=2) elif ImageDraw and layout == "vertical" and rows_n >= 2: draw_out = ImageDraw.Draw(out) line_col = (220, 225, 232) for rr in range(1, rows_n): y_mid = rr * cell_h + (rr - 1) * gap + gap // 2 if 0 <= y_mid <= h: draw_out.line((0, y_mid, w, y_mid), fill=line_col, width=2) return out