🐛 fix(model-runtime): emit stop:abort instead of error when stream is aborted (#13677)

* 🐛 fix(model-runtime): emit stop:abort instead of error when stream request is aborted

When user cancels a streaming request, the provider SDK throws abort errors
(e.g. "Request was aborted"). Previously these were propagated as error chunks,
causing the client to display a provider error message. Now abort errors emit
a stop:abort event through the SSE pipeline, allowing the client to handle
cancellation gracefully.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix(model-runtime): fix type error in abort pipeline test

Use `as const` for type literal to satisfy StreamProtocolChunk union type.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

*  test(fetch-sse): add planUpgradeAfterFinish to onFinish expectations

#15616 added planUpgradeAfterFinish to the onFinish context but missed
updating fetchSSE.test.ts, breaking 13 tests on canary.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* 🐛 fix(model-runtime): harden abort detection against non-Error throws

isAbortError assumed error.message is always a string, but catch
clauses receive unknown — a non-Error throw (string, object without
message) would make the abort check itself throw inside the stream
error handler, swallowing both ABORT_CHUNK and the first-chunk error.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-06-10 15:56:39 +08:00
committed by GitHub
parent 8b342c600f
commit 4c5c8795ef
3 changed files with 270 additions and 3 deletions
@@ -43,6 +43,7 @@ describe('fetchSSE', () => {
expect(mockOnMessageHandle).toHaveBeenNthCalledWith(1, { text: 'Hello World', type: 'text' });
expect(mockOnFinish).toHaveBeenCalledWith('Hello World', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
traceId: null,
type: 'done',
@@ -90,6 +91,7 @@ describe('fetchSSE', () => {
});
expect(mockOnFinish).toHaveBeenCalledWith('', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: [
{ id: '1', type: 'function', function: { name: 'func1', arguments: 'arg1' } },
{ id: '2', type: 'function', function: { name: 'func2', arguments: 'arg2' } },
@@ -115,6 +117,7 @@ describe('fetchSSE', () => {
expect(mockOnMessageHandle).toHaveBeenCalledWith({ text: 'Hello World', type: 'text' });
expect(mockOnFinish).toHaveBeenCalledWith('Hello World', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
traceId: null,
type: 'done',
@@ -146,6 +149,7 @@ describe('fetchSSE', () => {
expect(mockOnFinish).toHaveBeenCalledWith('Hello World', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
traceId: null,
type: 'done',
@@ -179,6 +183,7 @@ describe('fetchSSE', () => {
expect(mockOnFinish).toHaveBeenCalledWith('Hello World', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
traceId: null,
type: 'done',
@@ -212,6 +217,7 @@ describe('fetchSSE', () => {
expect(mockOnFinish).toHaveBeenCalledWith('hi', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
reasoning: { content: 'Hello World' },
traceId: null,
@@ -263,6 +269,7 @@ describe('fetchSSE', () => {
// Verify output is accumulated correctly
expect(mockOnFinish).toHaveBeenCalledWith('Hello World', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
traceId: null,
type: 'done',
@@ -300,6 +307,7 @@ describe('fetchSSE', () => {
// Verify reasoning is accumulated correctly
expect(mockOnFinish).toHaveBeenCalledWith('Final answer', {
observationId: null,
planUpgradeAfterFinish: false,
reasoning: { content: 'Thinking: step 1' },
toolCalls: undefined,
traceId: null,
@@ -342,6 +350,7 @@ describe('fetchSSE', () => {
// Output should be empty since image content is not accumulated
expect(mockOnFinish).toHaveBeenCalledWith('', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
traceId: null,
type: 'done',
@@ -374,6 +383,7 @@ describe('fetchSSE', () => {
expect(mockOnFinish).toHaveBeenCalledWith('hi', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
grounding: 'Hello',
traceId: null,
@@ -435,6 +445,7 @@ describe('fetchSSE', () => {
expect(mockOnFinish).toHaveBeenCalledWith('', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: [
{ id: '1', type: 'function', function: { name: 'func1', arguments: 'arg1' } },
{ id: '2', type: 'function', function: { name: 'func2', arguments: 'arg2' } },
@@ -473,6 +484,7 @@ describe('fetchSSE', () => {
expect(mockOnFinish).toHaveBeenCalledWith('Hello World', {
type: 'done',
observationId: null,
planUpgradeAfterFinish: false,
traceId: null,
});
});
@@ -492,6 +504,7 @@ describe('fetchSSE', () => {
expect(mockOnFinish).toHaveBeenCalledWith('Hello', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
traceId: null,
type: 'abort',
@@ -509,6 +522,7 @@ describe('fetchSSE', () => {
expect(mockOnFinish).toHaveBeenCalledWith('Hello', {
observationId: null,
planUpgradeAfterFinish: false,
toolCalls: undefined,
traceId: null,
type: 'error',
@@ -1,6 +1,7 @@
import { describe, expect, it, vi } from 'vitest';
import {
ABORT_CHUNK,
convertIterableToStream,
createCallbacksTransformer,
createFirstErrorHandleTransformer,
@@ -8,6 +9,7 @@ import {
createSSEProtocolTransformer,
createTokenSpeedCalculator,
FIRST_CHUNK_ERROR_KEY,
readableFromAsyncIterable,
} from './protocol';
describe('createSSEDataExtractor', () => {
@@ -375,6 +377,203 @@ describe('convertIterableToStream', () => {
expect(chunks[1].cause.big).toBe('9007199254740993');
expect(chunks[1].cause.ref.self).toBe('[Circular]');
});
it('should emit ABORT_CHUNK when AbortError occurs during pull', async () => {
async function* abortingStream() {
yield 'first';
const err = new Error('The operation was aborted');
err.name = 'AbortError';
throw err;
}
const readable = convertIterableToStream(abortingStream());
const reader = readable.getReader();
const chunks: any[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks).toEqual(['first', ABORT_CHUNK]);
});
it('should emit ABORT_CHUNK when "Request was aborted" error occurs during pull', async () => {
async function* abortingStream() {
yield 'first';
throw new Error('Request was aborted.');
}
const readable = convertIterableToStream(abortingStream());
const reader = readable.getReader();
const chunks: any[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks).toEqual(['first', ABORT_CHUNK]);
});
it('should emit ABORT_CHUNK when abort error occurs during start', async () => {
async function* abortingStream(): AsyncGenerator<string> {
yield* []; // eslint: require-yield
throw new Error('Request was aborted.');
}
const readable = convertIterableToStream(abortingStream());
const reader = readable.getReader();
const chunks: any[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks).toEqual([ABORT_CHUNK]);
});
it('should emit ABORT_CHUNK when "cancelled" error occurs during pull', async () => {
async function* cancelledStream() {
yield 'data';
throw new Error('The request was cancelled');
}
const readable = convertIterableToStream(cancelledStream());
const reader = readable.getReader();
const chunks: any[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks).toEqual(['data', ABORT_CHUNK]);
});
it('should emit ABORT_CHUNK for AbortError-named throw without a message', async () => {
async function* abortingStream() {
yield 'first';
// some SDKs throw bare objects rather than Error instances
throw { name: 'AbortError' };
}
const readable = convertIterableToStream(abortingStream());
const reader = readable.getReader();
const chunks: any[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks).toEqual(['first', ABORT_CHUNK]);
});
it('should fall back to error chunk when iterator throws a non-Error value', async () => {
async function* erroringStream() {
yield 'first';
// a string throw must not crash the abort check — it should still
// surface as a serialized first-chunk error instead of killing the pipe
throw 'upstream exploded';
}
const chunks = await drain(
convertIterableToStream(erroringStream()).pipeThrough(createFirstErrorHandleTransformer()),
);
expect(chunks[0]).toBe('first');
expect(chunks[1][FIRST_CHUNK_ERROR_KEY]).toBe(true);
});
it('should fall back to error chunk when thrown object has no message', async () => {
async function* erroringStream() {
yield 'first';
throw { code: 'ECONNRESET' };
}
const chunks = await drain(
convertIterableToStream(erroringStream()).pipeThrough(createFirstErrorHandleTransformer()),
);
expect(chunks[0]).toBe('first');
expect(chunks[1][FIRST_CHUNK_ERROR_KEY]).toBe(true);
});
it('should produce stop:abort SSE event through full pipeline when request is aborted', async () => {
async function* abortingStream() {
yield { type: 'message_start', message: { id: 'msg_1', content: [] } };
throw new Error('Request was aborted.');
}
const identity = (chunk: any) => ({ data: chunk, id: 'msg_1', type: 'data' as const });
const readable = convertIterableToStream(abortingStream())
.pipeThrough(createTokenSpeedCalculator(identity))
.pipeThrough(createSSEProtocolTransformer((c) => c));
const reader = readable.getReader();
const chunks: string[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value as string);
}
// Last 3 chunks should be the stop:abort SSE event
const stopLines = chunks.slice(-3);
expect(stopLines).toEqual(['id: \n', 'event: stop\n', `data: ${JSON.stringify('abort')}\n\n`]);
});
});
describe('readableFromAsyncIterable', () => {
it('should emit ABORT_CHUNK when abort error occurs during pull', async () => {
async function* abortingStream() {
yield 'first';
throw new Error('Request was aborted.');
}
const readable = readableFromAsyncIterable(abortingStream());
const reader = readable.getReader();
const chunks: any[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks).toEqual(['first', ABORT_CHUNK]);
});
it('should still surface non-abort errors as error chunks', async () => {
async function* erroringStream() {
yield 'first';
throw new Error('rate limit');
}
const readable = readableFromAsyncIterable(erroringStream()).pipeThrough(
createFirstErrorHandleTransformer(),
);
const reader = readable.getReader();
const chunks: any[] = [];
while (true) {
const { done, value } = await reader.read();
if (done) break;
chunks.push(value);
}
expect(chunks[0]).toBe('first');
expect(chunks[1][FIRST_CHUNK_ERROR_KEY]).toBe(true);
expect(chunks[1].message).toBe('rate limit');
});
});
describe('createSSEProtocolTransformer', () => {
@@ -144,6 +144,22 @@ const chatStreamable = async function* <T>(stream: AsyncIterable<T>) {
const ERROR_CHUNK_PREFIX = '%FIRST_CHUNK_ERROR%: ';
export const ABORT_CHUNK = '%ABORT_CHUNK%';
const isAbortError = (error: unknown): boolean => {
// SDK iterators may throw non-Error values (strings, plain objects without
// a `message`) — guard before touching `.name`/`.message` so the abort
// check itself can't blow up inside the stream error handler.
if (!error || typeof error !== 'object') return false;
const { name, message } = error as { message?: unknown; name?: unknown };
return (
name === 'AbortError' ||
(typeof message === 'string' && (message.includes('aborted') || message.includes('cancelled')))
);
};
/**
* Optional diagnostic context attached to errors that surface from the
* provider SDK iterator. Lets the FIRST_CHUNK_ERROR payload carry
@@ -271,7 +287,15 @@ export function readableFromAsyncIterable<T>(
if (done) controller.close();
else controller.enqueue(value);
} catch (e) {
controller.enqueue(buildStreamErrorPayload(e as Error, context) as T);
const error = e as Error;
if (isAbortError(error)) {
controller.enqueue(ABORT_CHUNK as T);
controller.close();
return;
}
controller.enqueue(buildStreamErrorPayload(error, context) as T);
controller.close();
}
},
@@ -299,7 +323,15 @@ export const convertIterableToStream = <T>(
if (done) controller.close();
else controller.enqueue(value);
} catch (e) {
controller.enqueue(buildStreamErrorPayload(e as Error, context) as T);
const error = e as Error;
if (isAbortError(error)) {
controller.enqueue(ABORT_CHUNK as T);
controller.close();
return;
}
controller.enqueue(buildStreamErrorPayload(error, context) as T);
controller.close();
}
},
@@ -310,7 +342,15 @@ export const convertIterableToStream = <T>(
if (done) controller.close();
else controller.enqueue(value);
} catch (e) {
controller.enqueue(buildStreamErrorPayload(e as Error, context) as T);
const error = e as Error;
if (isAbortError(error)) {
controller.enqueue(ABORT_CHUNK as T);
controller.close();
return;
}
controller.enqueue(buildStreamErrorPayload(error, context) as T);
controller.close();
}
},
@@ -530,6 +570,11 @@ export const createFirstErrorHandleTransformer = (
) => {
return new TransformStream({
transform(chunk, controller) {
if (chunk === ABORT_CHUNK) {
controller.enqueue(chunk);
return;
}
if (chunk.toString().startsWith(ERROR_CHUNK_PREFIX)) {
const errorData = JSON.parse(chunk.toString().replace(ERROR_CHUNK_PREFIX, ''));
@@ -643,6 +688,15 @@ export const createTokenSpeedCalculator = (
return new TransformStream({
transform(chunk, controller) {
if (chunk === ABORT_CHUNK) {
controller.enqueue({
data: 'abort',
id: streamStack?.id || '',
type: 'stop',
} as StreamProtocolChunk);
return;
}
let result = transformer(chunk, streamStack || { id: '' });
if (!Array.isArray(result)) result = [result];
result.forEach((r) => {