去掉大模型

This commit is contained in:
dekun
2026-05-26 10:20:45 +08:00
parent 1845018151
commit b91721d315
13 changed files with 39 additions and 962 deletions
-66
View File
@@ -1,66 +0,0 @@
"""服务端生成日K+成交量 PNG,供大模型视觉解读。"""
import io
from datetime import datetime
from .kline_store import get_daily_candles
async def render_daily_chart_png_async(symbol: str, limit: int = 300) -> bytes:
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
candles, _ = await get_daily_candles(symbol, limit)
if not candles:
raise ValueError(f"no klines for {symbol}")
times = [datetime.fromtimestamp(c["time"] / 1000) for c in candles]
opens = [c["open"] for c in candles]
highs = [c["high"] for c in candles]
lows = [c["low"] for c in candles]
closes = [c["close"] for c in candles]
vols = [c.get("quote_volume") or c.get("volume") or 0 for c in candles]
fig, (ax1, ax2) = plt.subplots(
2, 1, figsize=(12, 7), gridspec_kw={"height_ratios": [3, 1]}, facecolor="#0d1118"
)
fig.subplots_adjust(hspace=0.08)
for i in range(len(candles)):
t = times[i]
o, h, l, cl = opens[i], highs[i], lows[i], closes[i]
color = "#0ecb81" if cl >= o else "#f6465d"
ax1.plot([t, t], [l, h], color=color, linewidth=0.8)
ax1.add_patch(
plt.Rectangle(
(mdates.date2num(t) - 0.3, min(o, cl)),
0.6,
abs(cl - o) or 0.001,
facecolor=color,
edgecolor=color,
)
)
ax1.set_facecolor("#0d1118")
ax1.tick_params(colors="#8b9cb3")
ax1.set_title(f"{symbol} 日K + 成交量", color="#e7ecf3", fontsize=14)
ax1.grid(True, alpha=0.2)
colors_vol = ["#0ecb81" if closes[i] >= opens[i] else "#f6465d" for i in range(len(candles))]
ax2.bar(times, vols, color=colors_vol, alpha=0.7, width=0.8)
ax2.set_facecolor("#0d1118")
ax2.tick_params(colors="#8b9cb3")
ax2.set_ylabel("成交额", color="#8b9cb3")
ax2.grid(True, alpha=0.2)
for ax in (ax1, ax2):
ax.xaxis.set_major_formatter(mdates.DateFormatter("%m-%d"))
fig.autofmt_xdate()
buf = io.BytesIO()
fig.savefig(buf, format="png", dpi=120, facecolor="#0d1118")
plt.close(fig)
buf.seek(0)
return buf.read()
-5
View File
@@ -37,11 +37,6 @@ class Settings(BaseSettings):
proxy_enabled: bool = False
proxy_url: str = "socks5h://192.168.8.4:1081"
proxy_for: str = "binance" # binance | wecom | all
llm_base_url: str = "http://op.bz121.com"
llm_api_key: str = ""
llm_model: str = "gemma4:e4b"
llm_symbol_interval_sec: int = 180
llm_auto_on_startup: bool = True
settings = Settings()
-91
View File
@@ -97,16 +97,6 @@ def init_db() -> None:
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS llm_interpretations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
batch_id TEXT NOT NULL,
content TEXT NOT NULL,
created_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_llm_symbol_batch
ON llm_interpretations(symbol, batch_id);
"""
)
@@ -364,87 +354,6 @@ def save_funding_current_bulk(data: dict[str, dict[str, Any]]) -> None:
)
def save_llm_interpretation(symbol: str, batch_id: str, content: str) -> None:
with get_conn() as conn:
conn.execute(
"""
INSERT INTO llm_interpretations (symbol, batch_id, content, created_at)
VALUES (?, ?, ?, ?)
""",
(symbol.upper(), batch_id, content, datetime.now().isoformat()),
)
def get_llm_interpretations(batch_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
with get_conn() as conn:
if batch_id:
rows = conn.execute(
"""
SELECT symbol, batch_id, content, created_at
FROM llm_interpretations
WHERE batch_id = ?
ORDER BY id DESC
LIMIT ?
""",
(batch_id, limit),
).fetchall()
else:
rows = conn.execute(
"""
SELECT symbol, batch_id, content, created_at
FROM llm_interpretations
WHERE batch_id = (
SELECT batch_id FROM llm_interpretations ORDER BY id DESC LIMIT 1
)
ORDER BY id ASC
LIMIT ?
""",
(limit,),
).fetchall()
return [
{
"symbol": r["symbol"],
"batch_id": r["batch_id"],
"content": r["content"],
"created_at": r["created_at"],
}
for r in rows
]
def get_llm_interpretation(symbol: str, batch_id: str | None = None) -> dict[str, Any] | None:
sym = symbol.upper()
with get_conn() as conn:
if batch_id:
row = conn.execute(
"""
SELECT symbol, batch_id, content, created_at
FROM llm_interpretations
WHERE symbol = ? AND batch_id = ?
ORDER BY id DESC LIMIT 1
""",
(sym, batch_id),
).fetchone()
else:
row = conn.execute(
"""
SELECT symbol, batch_id, content, created_at
FROM llm_interpretations
WHERE symbol = ?
ORDER BY id DESC LIMIT 1
""",
(sym,),
).fetchone()
if not row:
return None
return {
"symbol": row["symbol"],
"batch_id": row["batch_id"],
"content": row["content"],
"created_at": row["created_at"],
}
def was_pushed_today(period_start: str, period_end: str) -> bool:
with get_conn() as conn:
row = conn.execute(
-206
View File
@@ -1,206 +0,0 @@
"""大模型解读(OpenAI 兼容接口 + 图表图片)。"""
import asyncio
import base64
import logging
from datetime import datetime
import httpx
from .chart_image import render_daily_chart_png_async
from .config import settings
from .db import save_llm_interpretation
from .stats import compute_three_day_stats
logger = logging.getLogger(__name__)
_interpret_lock = asyncio.Lock()
_interpret_state: dict = {
"running": False,
"current_symbol": "",
"done": 0,
"total": 0,
"batch_id": "",
"last_error": "",
}
def get_interpret_state() -> dict:
return dict(_interpret_state)
def init_interpret_batch() -> dict:
"""同步初始化批次(API 立即返回 batch_id,避免前端刷新拉错旧批次)。"""
if _interpret_lock.locked() or _interpret_state.get("running"):
return {"ok": False, "message": "解读任务进行中", **get_interpret_state()}
stats = compute_three_day_stats()
if not stats.get("ok"):
return {"ok": False, "message": stats.get("message", "统计数据未就绪")}
sym_list = stats.get("symbols") or [x["symbol"] for x in stats.get("items", [])]
if not sym_list:
return {"ok": False, "message": "三日交集为空"}
bid = datetime.now().strftime("%Y-%m-%d-%H%M")
_interpret_state.update(
{
"running": True,
"current_symbol": "",
"done": 0,
"total": len(sym_list),
"batch_id": bid,
"last_error": "",
}
)
return {"ok": True, "batch_id": bid, "total": len(sym_list), **get_interpret_state()}
def _api_url() -> str:
base = settings.llm_base_url.rstrip("/")
if base.endswith("/v1"):
return f"{base}/chat/completions"
return f"{base}/v1/chat/completions"
def _build_prompt(symbol: str, stats_row: dict | None) -> str:
lines = [
f"你是加密货币合约分析师。请根据附图({symbol} 近300日K+成交量)及数据给出中文简析。",
"关注:趋势、关键支撑阻力、成交量变化、资金费率含义、未来1-3日可能节奏。",
"控制在 200-350 字,条理清晰,不要废话。",
]
if stats_row:
t, y, b = stats_row.get("today", {}), stats_row.get("yesterday", {}), stats_row.get("daybefore", {})
lines.append(
f"\n三日均为成交额Top30交集:"
f"\n今日 排名{t.get('rank')} 涨跌{t.get('price_change_pct_fmt')}{t.get('quote_volume_fmt')}"
f"\n昨日 排名{y.get('rank')} 涨跌{y.get('price_change_pct_fmt')}{y.get('quote_volume_fmt')}"
f"\n前日 排名{b.get('rank')} 涨跌{b.get('price_change_pct_fmt')}{b.get('quote_volume_fmt')}"
f"\n资金费率(当前){t.get('funding_rate_fmt', '')}"
)
return "\n".join(lines)
async def interpret_symbol(
symbol: str,
stats_row: dict | None = None,
batch_id: str | None = None,
) -> str:
if not settings.llm_api_key.strip():
raise RuntimeError("LLM_API_KEY 未配置")
png = await render_daily_chart_png_async(symbol, settings.chart_kline_limit)
b64 = base64.standard_b64encode(png).decode("ascii")
prompt = _build_prompt(symbol, stats_row)
payload = {
"model": settings.llm_model,
"messages": [
{
"role": "user",
"content": [
{"type": "text", "text": prompt},
{
"type": "image_url",
"image_url": {"url": f"data:image/png;base64,{b64}"},
},
],
}
],
"max_tokens": 800,
"temperature": 0.4,
}
headers = {
"Authorization": f"Bearer {settings.llm_api_key}",
"Content-Type": "application/json",
}
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(_api_url(), json=payload, headers=headers)
if resp.status_code >= 400:
# 部分模型不支持 vision,降级纯文本
logger.warning("LLM vision failed %s, fallback text", resp.status_code)
payload["messages"] = [{"role": "user", "content": prompt + "\n(附图日K+成交量未能传入,请基于数据简析)"}]
resp = await client.post(_api_url(), json=payload, headers=headers)
resp.raise_for_status()
data = resp.json()
content = data["choices"][0]["message"]["content"]
bid = batch_id or datetime.now().strftime("%Y-%m-%d-%H%M")
save_llm_interpretation(symbol, bid, content)
return content
async def run_interpretation_batch(
symbols: list[str] | None = None,
*,
batch_id: str | None = None,
) -> dict:
if _interpret_lock.locked():
return {"ok": False, "message": "解读任务进行中"}
stats = compute_three_day_stats()
if not stats.get("ok"):
_interpret_state["running"] = False
return {"ok": False, "message": stats.get("message", "统计数据未就绪")}
sym_list = symbols or stats.get("symbols") or [x["symbol"] for x in stats.get("items", [])]
if not sym_list:
_interpret_state["running"] = False
return {"ok": False, "message": "三日交集为空"}
stats_map = {x["symbol"]: x for x in stats.get("items", [])}
bid = batch_id or _interpret_state.get("batch_id") or datetime.now().strftime("%Y-%m-%d-%H%M")
interval = settings.llm_symbol_interval_sec
async with _interpret_lock:
_interpret_state.update(
{
"running": True,
"current_symbol": "",
"done": _interpret_state.get("done", 0),
"total": len(sym_list),
"batch_id": bid,
}
)
for i, sym in enumerate(sym_list):
_interpret_state["current_symbol"] = sym
try:
await interpret_symbol(sym, stats_map.get(sym), bid)
logger.info("LLM interpreted %s (%d/%d)", sym, i + 1, len(sym_list))
except Exception as e:
_interpret_state["last_error"] = str(e)
logger.error("LLM %s failed: %s", sym, e)
save_llm_interpretation(sym, bid, f"[解读失败] {e}")
_interpret_state["done"] = i + 1
if i < len(sym_list) - 1:
await asyncio.sleep(interval)
_interpret_state["running"] = False
_interpret_state["current_symbol"] = ""
return {
"ok": True,
"batch_id": bid,
"count": len(sym_list),
"interval_sec": interval,
}
def schedule_interpret_background(symbols: list[str] | None = None) -> None:
"""后台启动解读,不阻塞请求。"""
info = init_interpret_batch()
if not info.get("ok"):
logger.info("Startup LLM skip: %s", info.get("message"))
return
bid = info.get("batch_id")
async def _run():
try:
await run_interpretation_batch(symbols, batch_id=bid)
except Exception as e:
logger.error("Background LLM batch failed: %s", e)
_interpret_state["running"] = False
asyncio.create_task(_run())
+3 -67
View File
@@ -2,19 +2,17 @@ import logging
from contextlib import asynccontextmanager
from pathlib import Path
from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.responses import FileResponse, Response
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from .config import ROOT_DIR, settings
from .funding_store import get_funding_bundle
from .kline_store import get_daily_candles, sync_daily_klines
from .db import get_latest_snapshot, get_llm_interpretations, init_db, log_push, save_snapshot
from .db import get_latest_snapshot, init_db, log_push, save_snapshot
from .exceptions import BinanceRateLimitedError
from .period_api import get_period_top30
from .periods import get_daybefore_period, get_today_period, get_yesterday_period
from .chart_image import render_daily_chart_png_async
from .llm_service import get_interpret_state, init_interpret_batch, run_interpretation_batch
from .scheduler import job_finalize_yesterday, job_push_wecom, job_refresh_today, start_scheduler, startup_tasks, stop_scheduler
from .stats import compute_three_day_stats
from .aggregator import aggregate_period
@@ -179,68 +177,6 @@ async def api_funding_history(symbol: str, limit: int | None = None, refresh: bo
raise HTTPException(502, "资金费率获取失败") from e
@app.get("/api/chart/{symbol}/daily.png")
async def api_chart_daily_png(symbol: str, limit: int | None = None):
sym = symbol.upper().strip()
if not sym.endswith("USDT"):
raise HTTPException(400, "invalid symbol")
try:
png = await render_daily_chart_png_async(sym, limit or settings.chart_kline_limit)
return Response(content=png, media_type="image/png")
except ValueError as e:
raise HTTPException(404, str(e)) from e
except Exception as e:
logger.error("chart png %s failed: %s", sym, e)
raise HTTPException(502, "图表生成失败") from e
@app.get("/api/llm/status")
async def api_llm_status():
state = get_interpret_state()
return {
**state,
"enabled": bool(settings.llm_api_key.strip()),
"model": settings.llm_model,
"base_url": settings.llm_base_url,
"interval_sec": settings.llm_symbol_interval_sec,
}
@app.get("/api/llm/interpretations")
async def api_llm_interpretations(batch_id: str | None = None, limit: int = 100):
"""返回解读列表;进行中时优先当前批次(即使尚无记录)。"""
st = get_interpret_state()
bid = batch_id or (st.get("batch_id") if st.get("running") else None)
items = get_llm_interpretations(bid, limit) if bid else get_llm_interpretations(None, limit)
if not bid and items:
bid = items[0].get("batch_id", "")
return {
"items": items,
"batch_id": bid or st.get("batch_id", ""),
"running": st.get("running", False),
"done": st.get("done", 0),
"total": st.get("total", 0),
"current_symbol": st.get("current_symbol", ""),
}
@app.post("/api/llm/interpret/run")
async def api_llm_interpret_run(background_tasks: BackgroundTasks):
if not settings.llm_api_key.strip():
raise HTTPException(400, "LLM_API_KEY 未配置")
info = init_interpret_batch()
if not info.get("ok"):
return info
bid = info.get("batch_id")
background_tasks.add_task(run_interpretation_batch, batch_id=bid)
return {
"ok": True,
"message": "已启动三日交集解读队列",
"batch_id": bid,
**get_interpret_state(),
}
@app.post("/api/chart/{symbol}/daily/refresh")
async def api_chart_daily_refresh(symbol: str, limit: int | None = None):
"""强制从币安同步日 K 到本地库。"""
+1 -32
View File
@@ -13,8 +13,6 @@ from .periods import get_daybefore_period, get_today_period, get_yesterday_perio
from .state import get_today_cache, set_today_cache
from .funding_store import prefetch_funding
from .kline_store import prefetch_symbols
from .llm_service import run_interpretation_batch, schedule_interpret_background
from .stats import compute_three_day_stats
from .wecom import build_push_payload, send_wecom_markdown
logger = logging.getLogger(__name__)
@@ -156,18 +154,6 @@ async def job_refresh_today() -> None:
_restore_today_from_db()
async def job_llm_interpret() -> None:
"""08:05 对三日交集币种逐个大模型解读(每币间隔 3 分钟)。"""
logger.info("Job: LLM interpret three-day intersection")
if not settings.llm_api_key.strip():
logger.info("LLM_API_KEY not set, skip")
return
try:
await run_interpretation_batch()
except Exception as e:
logger.error("LLM job failed: %s", e)
async def startup_tasks() -> None:
init_db()
now = now_shanghai()
@@ -215,13 +201,6 @@ async def startup_tasks() -> None:
except Exception as e:
logger.error("Startup catch-up push failed: %s", e)
if settings.llm_api_key.strip() and settings.llm_auto_on_startup:
stats = compute_three_day_stats()
if stats.get("ok") and stats.get("symbols"):
logger.info("Startup: schedule one LLM interpret batch")
schedule_interpret_background()
def start_scheduler() -> None:
scheduler.add_job(
job_finalize_yesterday,
@@ -242,19 +221,9 @@ def start_scheduler() -> None:
id="refresh_today",
replace_existing=True,
)
scheduler.add_job(
job_llm_interpret,
CronTrigger(hour=8, minute=5, timezone="Asia/Shanghai"),
id="llm_interpret",
replace_existing=True,
)
if not scheduler.running:
scheduler.start()
logger.info(
"Scheduler started (today every %dh, LLM 08:05, interval %ds)",
refresh_hours,
settings.llm_symbol_interval_sec,
)
logger.info("Scheduler started (today every %dh)", refresh_hours)
def stop_scheduler() -> None: