Compare commits

...

1 Commits

Author SHA1 Message Date
YuTengjing 445ce7025c 🐛 fix: isolate stream callback failures 2026-05-12 22:40:05 +08:00
2 changed files with 96 additions and 30 deletions
@@ -701,6 +701,56 @@ describe('createCallbacksTransformer', () => {
expect(onFinal).toHaveBeenCalledWith(expectedData);
});
it('should keep streaming when per-chunk callbacks throw', async () => {
const error = new Error('text callback failed');
const consoleError = vi.spyOn(console, 'error').mockImplementation(() => {});
const onCompletion = vi.fn();
const onText = vi.fn(() => {
throw error;
});
const transformer = createCallbacksTransformer({ onCompletion, onText });
const chunks = ['event: text\n', 'data: "Hello"\n\n'];
try {
await expect(processChunks(transformer, chunks)).resolves.toBeDefined();
expect(onText).toHaveBeenCalledWith('Hello');
expect(onCompletion).toHaveBeenCalledWith(expect.objectContaining({ text: 'Hello' }));
expect(consoleError).toHaveBeenCalledWith(
'[createCallbacksTransformer] onText callback error:',
error,
);
} finally {
consoleError.mockRestore();
}
});
it('should continue final callbacks when completion callback throws', async () => {
const error = new Error('completion callback failed');
const consoleError = vi.spyOn(console, 'error').mockImplementation(() => {});
const onCompletion = vi.fn(() => {
throw error;
});
const onFinal = vi.fn();
const transformer = createCallbacksTransformer({ onCompletion, onFinal });
const chunks = ['event: text\n', 'data: "Hello"\n\n'];
try {
await expect(processChunks(transformer, chunks)).resolves.toBeDefined();
expect(onCompletion).toHaveBeenCalledOnce();
expect(onFinal).toHaveBeenCalledWith(expect.objectContaining({ text: 'Hello' }));
expect(consoleError).toHaveBeenCalledWith(
'[createCallbacksTransformer] onCompletion callback error:',
error,
);
} finally {
consoleError.mockRestore();
}
});
it('should capture finishReason from stop chunks and include in final data', async () => {
const onCompletion = vi.fn();
const onFinal = vi.fn();
@@ -361,6 +361,19 @@ export const createSSEProtocolTransformer = (
});
};
const runStreamCallback = async (
name: keyof ChatStreamCallbacks,
callback: (() => Promise<void> | void) | undefined,
) => {
if (!callback) return;
try {
await callback();
} catch (e) {
console.error(`[createCallbacksTransformer] ${name} callback error:`, e);
}
};
export function createCallbacksTransformer(cb: ChatStreamCallbacks | undefined) {
const textEncoder = new TextEncoder();
let aggregatedText = '';
@@ -390,17 +403,12 @@ export function createCallbacksTransformer(cb: ChatStreamCallbacks | undefined)
usage,
};
if (callbacks.onCompletion) {
await callbacks.onCompletion(data);
}
if (callbacks.onFinal) {
await callbacks.onFinal(data);
}
await runStreamCallback('onCompletion', () => callbacks.onCompletion?.(data));
await runStreamCallback('onFinal', () => callbacks.onFinal?.(data));
},
async start(): Promise<void> {
if (callbacks.onStart) await callbacks.onStart();
await runStreamCallback('onStart', () => callbacks.onStart?.());
},
async transform(chunk: string, controller): Promise<void> {
@@ -421,7 +429,7 @@ export function createCallbacksTransformer(cb: ChatStreamCallbacks | undefined)
switch (currentType) {
case 'text': {
aggregatedText += data;
await callbacks.onText?.(data);
await runStreamCallback('onText', () => callbacks.onText?.(data));
break;
}
@@ -431,7 +439,7 @@ export function createCallbacksTransformer(cb: ChatStreamCallbacks | undefined)
}
aggregatedThinking += data;
await callbacks.onThinking?.(data);
await runStreamCallback('onThinking', () => callbacks.onThinking?.(data));
break;
}
@@ -439,40 +447,46 @@ export function createCallbacksTransformer(cb: ChatStreamCallbacks | undefined)
// data format: { image: { id, data }, images: [...] }
const imageData = data as { image: { data: string; id: string }; images: any[] };
base64Images.push(imageData.image);
await callbacks.onBase64Image?.({
image: imageData.image,
images: base64Images,
});
await runStreamCallback('onBase64Image', () =>
callbacks.onBase64Image?.({
image: imageData.image,
images: base64Images,
}),
);
break;
}
case 'content_part': {
// data format: StreamPartChunkData
const partData = data as StreamPartChunkData;
await callbacks.onContentPart?.({
content: partData.content,
mimeType: partData.mimeType,
partType: partData.partType,
thoughtSignature: partData.thoughtSignature,
});
await runStreamCallback('onContentPart', () =>
callbacks.onContentPart?.({
content: partData.content,
mimeType: partData.mimeType,
partType: partData.partType,
thoughtSignature: partData.thoughtSignature,
}),
);
break;
}
case 'reasoning_part': {
// data format: StreamPartChunkData
const partData = data as StreamPartChunkData;
await callbacks.onReasoningPart?.({
content: partData.content,
mimeType: partData.mimeType,
partType: partData.partType,
thoughtSignature: partData.thoughtSignature,
});
await runStreamCallback('onReasoningPart', () =>
callbacks.onReasoningPart?.({
content: partData.content,
mimeType: partData.mimeType,
partType: partData.partType,
thoughtSignature: partData.thoughtSignature,
}),
);
break;
}
case 'usage': {
usage = data;
await callbacks.onUsage?.(data);
await runStreamCallback('onUsage', () => callbacks.onUsage?.(data));
break;
}
@@ -483,7 +497,7 @@ export function createCallbacksTransformer(cb: ChatStreamCallbacks | undefined)
case 'grounding': {
grounding = data;
await callbacks.onGrounding?.(data);
await runStreamCallback('onGrounding', () => callbacks.onGrounding?.(data));
break;
}
@@ -491,7 +505,9 @@ export function createCallbacksTransformer(cb: ChatStreamCallbacks | undefined)
if (!toolsCalling) toolsCalling = [];
toolsCalling = parseToolCalls(toolsCalling, data);
await callbacks.onToolsCalling?.({ chunk: data, toolsCalling });
await runStreamCallback('onToolsCalling', () =>
callbacks.onToolsCalling?.({ chunk: data, toolsCalling }),
);
break;
}
@@ -513,7 +529,7 @@ export function createCallbacksTransformer(cb: ChatStreamCallbacks | undefined)
case 'error': {
streamError = data;
await callbacks.onError?.(data);
await runStreamCallback('onError', () => callbacks.onError?.(data));
break;
}
}