Fix AI stream by returning stream.value directly from Server Actions.

createStreamableValue must be created and returned in the action itself; wrapping in { data } or a helper return caused production RSC serialization errors.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
dekun
2026-06-10 22:24:57 +08:00
parent 3933905d66
commit 0f3bc2c50a
7 changed files with 102 additions and 109 deletions
+6 -2
View File
@@ -1,6 +1,7 @@
"use server"; "use server";
import { streamAIResponse } from "@/lib/ai/stream"; import { createStreamableValue } from "ai/rsc";
import { pumpAIStream } from "@/lib/ai/stream";
import { import {
calculateBazi, calculateBazi,
formatBaziForPrompt, formatBaziForPrompt,
@@ -16,7 +17,9 @@ export async function getBaziAnswer(
const chart = calculateBazi(input); const chart = calculateBazi(input);
const chartText = formatBaziForPrompt(chart); const chartText = formatBaziForPrompt(chart);
return streamAIResponse( const stream = createStreamableValue<string>();
pumpAIStream(
stream,
BAZI_SYSTEM_PROMPT, BAZI_SYSTEM_PROMPT,
`【出生时空】 `【出生时空】
出生地:${birthPlaceName} 出生地:${birthPlaceName}
@@ -27,4 +30,5 @@ ${chartText}
【问事】 【问事】
${question}`, ${question}`,
); );
return stream.value;
} }
+6 -2
View File
@@ -1,6 +1,7 @@
"use server"; "use server";
import { streamAIResponse } from "@/lib/ai/stream"; import { createStreamableValue } from "ai/rsc";
import { pumpAIStream } from "@/lib/ai/stream";
import { import {
calculateBazi, calculateBazi,
formatBaziForPrompt, formatBaziForPrompt,
@@ -69,7 +70,9 @@ export async function getCombinedAnswer(input: CombinedInput) {
} }
} }
return streamAIResponse( const stream = createStreamableValue<string>();
pumpAIStream(
stream,
COMBINED_SYSTEM_PROMPT, COMBINED_SYSTEM_PROMPT,
`【人和 · 八字排盘】 `【人和 · 八字排盘】
出生地:${input.birthPlaceName} 出生地:${input.birthPlaceName}
@@ -81,4 +84,5 @@ ${hexagramText}
【问事】 【问事】
${input.question}`, ${input.question}`,
); );
return stream.value;
} }
+6 -2
View File
@@ -1,6 +1,7 @@
"use server"; "use server";
import { streamAIResponse } from "@/lib/ai/stream"; import { createStreamableValue } from "ai/rsc";
import { pumpAIStream } from "@/lib/ai/stream";
import { import {
formatLiuyaoTimingForPrompt, formatLiuyaoTimingForPrompt,
getTimingInfoWithLongitude, getTimingInfoWithLongitude,
@@ -51,7 +52,9 @@ export async function getLiuyaoAnswer(input: LiuyaoInput) {
guaDetailText = ""; guaDetailText = "";
} }
return streamAIResponse( const stream = createStreamableValue<string>();
pumpAIStream(
stream,
LIUYAO_SYSTEM_PROMPT, LIUYAO_SYSTEM_PROMPT,
`${timingText} `${timingText}
@@ -63,4 +66,5 @@ ${input.question}
${guaDetailText}`, ${guaDetailText}`,
); );
return stream.value;
} }
+8 -14
View File
@@ -70,25 +70,19 @@ export default function BaziForm() {
setIsLoading(true); setIsLoading(true);
setShowAi(true); setShowAi(true);
try { try {
const { data, error: apiError } = await getBaziAnswer( const stream = await getBaziAnswer(
input, input,
question, question,
location!.name, location!.name,
); );
if (apiError) { let ret = "";
setError(apiError); for await (const delta of readStreamableValue(stream)) {
return; if (typeof delta === "string" && delta.startsWith(ERROR_PREFIX)) {
} setError(delta.slice(ERROR_PREFIX.length));
if (data) { return;
let ret = "";
for await (const delta of readStreamableValue(data)) {
if (typeof delta === "string" && delta.startsWith(ERROR_PREFIX)) {
setError(delta.slice(ERROR_PREFIX.length));
return;
}
ret += delta ?? "";
setCompletion(ret);
} }
ret += delta ?? "";
setCompletion(ret);
} }
} catch (err) { } catch (err) {
setError(err instanceof Error ? err.message : String(err)); setError(err instanceof Error ? err.message : String(err));
+8 -14
View File
@@ -109,7 +109,7 @@ export default function CombinedForm() {
setShowAi(true); setShowAi(true);
try { try {
const { data, error: apiError } = await getCombinedAnswer({ const stream = await getCombinedAnswer({
birth: { birth: {
date: birthDate, date: birthDate,
time: unknownHour ? "12:00" : birthTime, time: unknownHour ? "12:00" : birthTime,
@@ -133,20 +133,14 @@ export default function CombinedForm() {
: undefined, : undefined,
}); });
if (apiError) { let ret = "";
setError(apiError); for await (const delta of readStreamableValue(stream)) {
return; if (typeof delta === "string" && delta.startsWith(ERROR_PREFIX)) {
} setError(delta.slice(ERROR_PREFIX.length));
if (data) { return;
let ret = "";
for await (const delta of readStreamableValue(data)) {
if (typeof delta === "string" && delta.startsWith(ERROR_PREFIX)) {
setError(delta.slice(ERROR_PREFIX.length));
return;
}
ret += delta ?? "";
setCompletion(ret);
} }
ret += delta ?? "";
setCompletion(ret);
} }
} catch (e) { } catch (e) {
setError(e instanceof Error ? e.message : String(e)); setError(e instanceof Error ? e.message : String(e));
+8 -14
View File
@@ -72,7 +72,7 @@ export default function LiuyaoForm() {
setShowAi(true); setShowAi(true);
try { try {
const { data, error: apiError } = await getLiuyaoAnswer({ const stream = await getLiuyaoAnswer({
question, question,
calcDate, calcDate,
calcTime, calcTime,
@@ -84,20 +84,14 @@ export default function LiuyaoForm() {
guaChange: guaData!.result.guaChange, guaChange: guaData!.result.guaChange,
}); });
if (apiError) { let ret = "";
setError(apiError); for await (const delta of readStreamableValue(stream)) {
return; if (typeof delta === "string" && delta.startsWith(ERROR_PREFIX)) {
} setError(delta.slice(ERROR_PREFIX.length));
if (data) { return;
let ret = "";
for await (const delta of readStreamableValue(data)) {
if (typeof delta === "string" && delta.startsWith(ERROR_PREFIX)) {
setError(delta.slice(ERROR_PREFIX.length));
return;
}
ret += delta ?? "";
setCompletion(ret);
} }
ret += delta ?? "";
setCompletion(ret);
} }
} catch (e) { } catch (e) {
setError(e instanceof Error ? e.message : String(e)); setError(e instanceof Error ? e.message : String(e));
+60 -61
View File
@@ -3,6 +3,8 @@ import { createOpenAI } from "@ai-sdk/openai";
import { createStreamableValue } from "ai/rsc"; import { createStreamableValue } from "ai/rsc";
import { ERROR_PREFIX } from "@/lib/constant"; import { ERROR_PREFIX } from "@/lib/constant";
export type AIStream = ReturnType<typeof createStreamableValue<string>>;
function getOpenAiBaseUrl(): string { function getOpenAiBaseUrl(): string {
return ( return (
process.env.OPENAI_BASE_URL ?? process.env.OPENAI_BASE_URL ??
@@ -14,77 +16,74 @@ function getOpenAiBaseUrl(): string {
const model = const model =
process.env.OPENAI_MODEL ?? "huihui_ai/gemma-4-abliterated:e4b"; process.env.OPENAI_MODEL ?? "huihui_ai/gemma-4-abliterated:e4b";
export async function streamAIResponse( export function assertOpenAiKey(): string {
system: string,
user: string,
): Promise<{ data?: ReturnType<typeof createStreamableValue<string>>["value"]; error?: string }> {
const apiKey = process.env.OPENAI_API_KEY?.trim(); const apiKey = process.env.OPENAI_API_KEY?.trim();
if (!apiKey) { if (!apiKey) {
return { error: "未配置 OPENAI_API_KEY,请在 .env.local 或 Docker env_file 中设置" }; throw new Error(
"未配置 OPENAI_API_KEY,请在 .env.local 或 Docker env_file 中设置",
);
} }
return apiKey;
}
/** 在 Server Action 内 createStreamableValue 后调用,不可从子函数返回 stream.value */
export function pumpAIStream(
stream: AIStream,
system: string,
user: string,
): void {
const openai = createOpenAI({ const openai = createOpenAI({
apiKey, apiKey: assertOpenAiKey(),
baseURL: getOpenAiBaseUrl(), baseURL: getOpenAiBaseUrl(),
}); });
const stream = createStreamableValue<string>(); const { fullStream } = streamText({
temperature: 0.5,
model: openai(model),
messages: [
{ role: "system", content: system },
{ role: "user", content: user },
],
maxRetries: 0,
});
try { let buffer = "";
const { fullStream } = streamText({ let done = false;
temperature: 0.5, const intervalId = setInterval(() => {
model: openai(model), if (done && buffer.length === 0) {
messages: [ clearInterval(intervalId);
{ role: "system", content: system }, stream.done();
{ role: "user", content: user }, return;
], }
maxRetries: 0, if (buffer.length <= 6) {
}); stream.update(buffer);
buffer = "";
} else {
const chunk = buffer.slice(0, 6);
buffer = buffer.slice(6);
stream.update(chunk);
}
}, 60);
let buffer = ""; (async () => {
let done = false; for await (const part of fullStream) {
const intervalId = setInterval(() => { switch (part.type) {
if (done && buffer.length === 0) { case "text-delta":
clearInterval(intervalId); buffer += part.textDelta;
stream.done(); break;
return; case "error": {
} const err = part.error as { message?: string };
if (buffer.length <= 6) { stream.update(ERROR_PREFIX + (err.message ?? String(part.error)));
stream.update(buffer); break;
buffer = "";
} else {
const chunk = buffer.slice(0, 6);
buffer = buffer.slice(6);
stream.update(chunk);
}
}, 60);
(async () => {
for await (const part of fullStream) {
switch (part.type) {
case "text-delta":
buffer += part.textDelta;
break;
case "error": {
const err = part.error as { message?: string };
stream.update(ERROR_PREFIX + (err.message ?? String(part.error)));
break;
}
} }
} }
})() }
.catch((err) => { })()
const message = err instanceof Error ? err.message : String(err); .catch((err) => {
stream.update(ERROR_PREFIX + message); const message = err instanceof Error ? err.message : String(err);
}) stream.update(ERROR_PREFIX + message);
.finally(() => { })
done = true; .finally(() => {
}); done = true;
});
return { data: stream.value };
} catch (err) {
stream.done();
const message = err instanceof Error ? err.message : String(err);
return { error: message };
}
} }