Add TTS markdown sanitization and expand deployment docs.
Strip Markdown and stage directions before ChatTTS synthesis with chunked long scripts; document model pre-download, server-update, and microphone HTTPS notes. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
+162
-10
@@ -8,12 +8,13 @@ from __future__ import annotations
|
||||
import inspect
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import traceback
|
||||
import uuid
|
||||
import warnings
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional, Tuple
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
import numpy as np
|
||||
import torch
|
||||
@@ -29,6 +30,7 @@ from config import (
|
||||
SPEAKER_EMB_PATH,
|
||||
SPEAKER_SAMPLE_MAX_SEC,
|
||||
SPEAKER_SAMPLE_MIN_SEC,
|
||||
TTS_MAX_CHARS_PER_CHUNK,
|
||||
TTS_SAMPLE_RATE,
|
||||
TTS_SPEED_PROMPT,
|
||||
TTS_TEMPERATURE,
|
||||
@@ -377,6 +379,121 @@ def speaker_is_ready() -> Tuple[bool, str]:
|
||||
return True, f"已加载固定音色: {SPEAKER_EMB_PATH}"
|
||||
|
||||
|
||||
# Emoji / pictograph code points that must never reach the synthesizer.
# Besides the pictograph blocks this also removes regional-indicator flags and
# the invisible joiners (VS16, ZWJ) that would otherwise be left behind as
# "non-empty" residue once the visible glyph has been stripped.
_EMOJI_RE = re.compile(
    "["
    "\U0001F300-\U0001FAFF"
    "\U0001F1E6-\U0001F1FF"
    "\U00002700-\U000027BF"
    "\U00002600-\U000026FF"
    "\uFE0F\u200D"
    "]+",
    flags=re.UNICODE,
)

# Everything from the first occurrence of any of these markers to the end of
# the text is treated as editor notes and dropped.
_TTS_NOTE_MARKERS = (
    "💡",
    "量化交易员的修改笔记",
    "修改笔记(供你参考)",
    "修改笔记",
    "供你参考",
)

# A short parenthesised remark containing a stage-direction keyword.
# Fullwidth parentheses are written as \uFF08 / \uFF09 escapes so that both the
# ASCII and the fullwidth form are matched regardless of how this file is
# normalised.
_STAGE_DIRECTION_RE = re.compile(
    r"[(\uFF08][^)\uFF09]{0,80}"
    r"(?:前奏|转场|语气|背景|BGM|配乐|节奏|环节)"
    r"[^)\uFF09]{0,80}[)\uFF09]"
)

# Boilerplate openers the model tends to emit before the actual script.
# Anchored at the very start of the text (no MULTILINE); DOTALL lets the lazy
# ``.*?`` span line breaks. \uFF1A is the fullwidth colon.
_TTS_PREAMBLE_RES = tuple(
    re.compile(pattern, flags=re.DOTALL)
    for pattern in (
        r"^作为一名极其严谨的量化交易员.*?配音稿。\s*",
        r"^以下是为你润色后的文案[:\uFF1A]*\s*",
        r"^以下(?:是|为).*?润色.*?文案[:\uFF1A]*\s*",
    )
)


def prepare_text_for_tts(text: str) -> str:
    """Turn an LLM-polished script into plain text that ChatTTS can read aloud.

    Removes trailing editor notes, known preamble sentences, Markdown markup
    (rules, headings, list markers, emphasis), parenthesised stage directions
    and emoji.

    Args:
        text: The polished script, possibly containing Markdown.

    Returns:
        The speakable lines joined by ``"\\n"``; an empty string when nothing
        speakable is left (or when ``text`` is empty).
    """
    if not text:
        return ""

    # Normalise every newline flavour (CRLF and bare CR) to "\n".
    cleaned = text.replace("\r\n", "\n").replace("\r", "\n").strip()

    # Cut the text at the earliest position of each note marker in turn.
    for marker in _TTS_NOTE_MARKERS:
        idx = cleaned.find(marker)
        if idx >= 0:
            cleaned = cleaned[:idx]

    # Drop common model preambles so reading starts at the title or body.
    for preamble_re in _TTS_PREAMBLE_RES:
        cleaned = preamble_re.sub("", cleaned, count=1)

    # Horizontal rules and heading markers.
    cleaned = re.sub(r"^\*{3,}\s*$", "", cleaned, flags=re.MULTILINE)
    cleaned = re.sub(r"^-{3,}\s*$", "", cleaned, flags=re.MULTILINE)
    cleaned = re.sub(r"^#{1,6}\s*", "", cleaned, flags=re.MULTILINE)

    # Bullets must go before emphasis: otherwise the "*" of "* item *em*"
    # pairs with the opening "*" of the emphasis and leaves a stray asterisk.
    cleaned = re.sub(r"^[-*]\s+", "", cleaned, flags=re.MULTILINE)

    # Emphasis markers: keep the inner text only.
    cleaned = re.sub(r"\*\*([^*\n]+)\*\*", r"\1", cleaned)
    cleaned = re.sub(r"\*([^*\n]+)\*", r"\1", cleaned)
    cleaned = re.sub(r"__([^_\n]+)__", r"\1", cleaned)

    cleaned = _STAGE_DIRECTION_RE.sub("", cleaned)
    cleaned = _EMOJI_RE.sub("", cleaned)

    # Ordered-list numbering ("1. " / "1."). The negative lookahead keeps
    # decimals such as "3.5%" intact when they open a line.
    cleaned = re.sub(r"^\d+\.(?!\d)\s*", "", cleaned, flags=re.MULTILINE)

    # Trim each line, then drop blank lines and lines made only of leftover
    # markup characters.
    stripped_lines = (line.strip() for line in cleaned.split("\n"))
    spoken_lines = [
        line
        for line in stripped_lines
        if line and not re.fullmatch(r"[*\-#]+", line)
    ]
    return "\n".join(spoken_lines).strip()
|
||||
|
||||
|
||||
def split_text_for_tts(text: str, max_chars: int = TTS_MAX_CHARS_PER_CHUNK) -> List[str]:
    """Split a long script into chunks of at most ``max_chars`` characters.

    The text is first cut after sentence-ending punctuation and at line
    breaks; consecutive pieces are then packed greedily into chunks. A single
    piece that is still longer than ``max_chars`` is hard-cut into fixed-size
    slices.

    Args:
        text: The cleaned, speakable text.
        max_chars: Upper bound for the length of each chunk; must be >= 1.

    Returns:
        The non-empty chunks in reading order; an empty list for blank input.

    Raises:
        ValueError: If ``max_chars`` is smaller than 1 and ``text`` is not
            blank (a non-positive limit could only drop text or crash later).
    """
    text = text.strip()
    if not text:
        return []
    if max_chars < 1:
        raise ValueError(f"max_chars must be >= 1, got {max_chars}")
    if len(text) <= max_chars:
        return [text]

    # Split after sentence enders (ASCII and fullwidth; the fullwidth ones are
    # written as escapes: \uFF01 !, \uFF1F ?, \uFF1B ;) and at line breaks.
    # The lookbehind keeps the punctuation attached to its sentence.
    pieces = re.split(r"(?<=[。!?;\uFF01\uFF1F\uFF1B])\s*|\n+", text)

    chunks: List[str] = []
    pending = ""  # chunk currently being filled

    for piece in pieces:
        piece = piece.strip()
        if not piece:
            continue

        # Pieces are joined without a separator.
        if len(pending) + len(piece) <= max_chars:
            pending += piece
            continue

        # The piece does not fit: flush what has been collected so far.
        if pending:
            chunks.append(pending)
            pending = ""

        if len(piece) <= max_chars:
            pending = piece
        else:
            # One over-long sentence: fall back to fixed-size slices.
            chunks.extend(
                piece[start : start + max_chars]
                for start in range(0, len(piece), max_chars)
            )

    if pending:
        chunks.append(pending)

    # Hard-cut slices may start or end with whitespace; trim and drop blanks.
    return [chunk.strip() for chunk in chunks if chunk.strip()]
|
||||
|
||||
|
||||
def _concat_wavs(
|
||||
wavs: List[np.ndarray],
|
||||
sample_rate: int,
|
||||
pause_sec: float = 0.35,
|
||||
) -> np.ndarray:
|
||||
if not wavs:
|
||||
return np.array([], dtype=np.float32)
|
||||
|
||||
pause = np.zeros(int(sample_rate * pause_sec), dtype=np.float32)
|
||||
segments: List[np.ndarray] = []
|
||||
for i, wav in enumerate(wavs):
|
||||
segments.append(np.asarray(wav, dtype=np.float32).flatten())
|
||||
if i < len(wavs) - 1:
|
||||
segments.append(pause)
|
||||
return np.concatenate(segments)
|
||||
|
||||
|
||||
def generate_voice(refined_text: str) -> Tuple[bool, str, Optional[str]]:
|
||||
"""
|
||||
使用 ChatTTS 将润色后的文稿合成为 wav 配音。
|
||||
@@ -401,6 +518,19 @@ def generate_voice(refined_text: str) -> Tuple[bool, str, Optional[str]]:
|
||||
try:
|
||||
import ChatTTS
|
||||
|
||||
speak_text = prepare_text_for_tts(refined_text)
|
||||
if not speak_text:
|
||||
return (
|
||||
False,
|
||||
"清洗后无有效朗读文本。请删除 Markdown(#、**)、emoji、舞台提示和「修改笔记」,"
|
||||
"只保留可念出的正文后再合成。",
|
||||
None,
|
||||
)
|
||||
|
||||
chunks = split_text_for_tts(speak_text)
|
||||
if not chunks:
|
||||
return False, "无法切分朗读文本,请检查润色稿内容。", None
|
||||
|
||||
spk_emb = payload.get("spk_emb")
|
||||
spk_smp = payload.get("spk_smp")
|
||||
txt_smp = payload.get("txt_smp", "")
|
||||
@@ -419,17 +549,35 @@ def generate_voice(refined_text: str) -> Tuple[bool, str, Optional[str]]:
|
||||
prompt="[oral_2][laugh_0][break_4]",
|
||||
)
|
||||
|
||||
wavs = chat.infer(
|
||||
refined_text.strip(),
|
||||
skip_refine_text=False,
|
||||
params_refine_text=params_refine_text,
|
||||
params_infer_code=params_infer_code,
|
||||
logger.info(
|
||||
"TTS 合成: 原文 %d 字 → 清洗后 %d 字,分 %d 段",
|
||||
len(refined_text),
|
||||
len(speak_text),
|
||||
len(chunks),
|
||||
)
|
||||
|
||||
if not wavs or len(wavs) == 0:
|
||||
return False, "ChatTTS 未生成有效音频。", None
|
||||
segment_wavs: List[np.ndarray] = []
|
||||
for idx, chunk in enumerate(chunks, start=1):
|
||||
wavs = chat.infer(
|
||||
chunk,
|
||||
skip_refine_text=False,
|
||||
params_refine_text=params_refine_text,
|
||||
params_infer_code=params_infer_code,
|
||||
)
|
||||
if not wavs or len(wavs) == 0:
|
||||
return (
|
||||
False,
|
||||
f"ChatTTS 第 {idx}/{len(chunks)} 段未生成音频。"
|
||||
f"(段内容前 40 字: {chunk[:40]}…)",
|
||||
None,
|
||||
)
|
||||
segment_wavs.append(np.asarray(wavs[0], dtype=np.float32))
|
||||
|
||||
wav_array = np.asarray(wavs[0], dtype=np.float32)
|
||||
wav_array = (
|
||||
segment_wavs[0]
|
||||
if len(segment_wavs) == 1
|
||||
else _concat_wavs(segment_wavs, TTS_SAMPLE_RATE)
|
||||
)
|
||||
|
||||
peak = np.max(np.abs(wav_array)) or 1.0
|
||||
wav_int16 = (wav_array / peak * 32767).astype(np.int16)
|
||||
@@ -440,7 +588,11 @@ def generate_voice(refined_text: str) -> Tuple[bool, str, Optional[str]]:
|
||||
|
||||
wavfile.write(str(output_path), TTS_SAMPLE_RATE, wav_int16)
|
||||
|
||||
msg = f"配音合成成功: {output_path}"
|
||||
chunk_note = f",共 {len(chunks)} 段拼接" if len(chunks) > 1 else ""
|
||||
msg = (
|
||||
f"配音合成成功: {output_path}"
|
||||
f"(朗读 {len(speak_text)} 字{chunk_note})"
|
||||
)
|
||||
logger.info(msg)
|
||||
return True, msg, str(output_path)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user