diff --git a/apps/server/src/modules/AgentRuntime/StreamEventManager.ts b/apps/server/src/modules/AgentRuntime/StreamEventManager.ts index efe43b29a7..8dd52e7ee1 100644 --- a/apps/server/src/modules/AgentRuntime/StreamEventManager.ts +++ b/apps/server/src/modules/AgentRuntime/StreamEventManager.ts @@ -121,6 +121,12 @@ export interface StreamChunkData { | 'content_part' | 'reasoning_part'; content?: string; + /** + * Defaults to `delta`. + * `snapshot` means `content` is the full current text for this stream step, + * not an append-only token delta. + */ + contentMode?: 'delta' | 'snapshot'; /** Multimodal content parts (text + images) */ contentParts?: Array<{ text: string; type: 'text' } | { image: string; type: 'image' }>; /** Grounding/search data */ diff --git a/packages/agent-gateway-client/src/types.ts b/packages/agent-gateway-client/src/types.ts index 46ac9f44c3..0fc4394756 100644 --- a/packages/agent-gateway-client/src/types.ts +++ b/packages/agent-gateway-client/src/types.ts @@ -66,6 +66,12 @@ export type StreamChunkType = export interface StreamChunkData { chunkType: StreamChunkType; content?: string; + /** + * Defaults to `delta`. + * `snapshot` means `content` is the full current text for this stream step, + * not an append-only token delta. + */ + contentMode?: 'delta' | 'snapshot'; contentParts?: Array<{ text: string; type: 'text' } | { image: string; type: 'image' }>; grounding?: any; imageList?: any[]; diff --git a/packages/heterogeneous-agents/src/adapters/codex.test.ts b/packages/heterogeneous-agents/src/adapters/codex.test.ts index faaf4edd63..56392f9800 100644 --- a/packages/heterogeneous-agents/src/adapters/codex.test.ts +++ b/packages/heterogeneous-agents/src/adapters/codex.test.ts @@ -43,7 +43,110 @@ describe('CodexAdapter', () => { type: 'stream_start', }); expect(text[0]).toMatchObject({ - data: { chunkType: 'text', content: 'hello from codex' }, + data: { chunkType: 'text', content: 'hello from codex', contentMode: 'snapshot' }, + type: 'stream_chunk', + }); + }); + + it('treats Codex agent message updates as replaceable text snapshots', () => { + const adapter = new CodexAdapter(); + + adapter.adapt({ type: 'turn.started' }); + const draft = adapter.adapt({ + item: { + id: 'item_0', + text: 'I will inspect the whole repository.', + type: 'agent_message', + }, + type: 'item.updated', + }); + const shortened = adapter.adapt({ + item: { + id: 'item_0', + text: 'I will inspect the repo.', + type: 'agent_message', + }, + type: 'item.updated', + }); + const completed = adapter.adapt({ + item: { + id: 'item_0', + text: 'I will inspect the repo first.', + type: 'agent_message', + }, + type: 'item.completed', + }); + + expect(draft[0]).toMatchObject({ + data: { + chunkType: 'text', + content: 'I will inspect the whole repository.', + contentMode: 'snapshot', + }, + type: 'stream_chunk', + }); + expect(shortened[0]).toMatchObject({ + data: { + chunkType: 'text', + content: 'I will inspect the repo.', + contentMode: 'snapshot', + }, + type: 'stream_chunk', + }); + expect(completed[0]).toMatchObject({ + data: { + chunkType: 'text', + content: 'I will inspect the repo first.', + contentMode: 'snapshot', + }, + type: 'stream_chunk', + }); + }); + + it('keeps prior agent message text when a later item updates in the same step', () => { + const adapter = new CodexAdapter(); + + adapter.adapt({ type: 'turn.started' }); + adapter.adapt({ + item: { + id: 'item_0', + text: 'First status update.', + type: 'agent_message', + }, + type: 'item.completed', + }); + + const secondDraft = adapter.adapt({ + item: { + id: 'item_1', + text: 'Second draft.', + type: 'agent_message', + }, + type: 'item.updated', + }); + const secondRevision = adapter.adapt({ + item: { + id: 'item_1', + text: 'Second revised status update.', + type: 'agent_message', + }, + type: 'item.updated', + }); + + expect(secondDraft[0]).toMatchObject({ + data: { + chunkType: 'text', + content: 'First status update.\n\nSecond draft.', + contentMode: 'snapshot', + }, + type: 'stream_chunk', + }); + expect(secondRevision[0]).toMatchObject({ + data: { + chunkType: 'text', + content: 'First status update.\n\nSecond revised status update.', + contentMode: 'snapshot', + }, type: 'stream_chunk', }); }); @@ -231,7 +334,11 @@ describe('CodexAdapter', () => { expect(secondMessage).toHaveLength(1); expect(secondMessage[0]).toMatchObject({ - data: { chunkType: 'text', content: '\n\nSecond status update.' }, + data: { + chunkType: 'text', + content: 'First status update.\n\nSecond status update.', + contentMode: 'snapshot', + }, stepIndex: 0, type: 'stream_chunk', }); @@ -289,7 +396,11 @@ describe('CodexAdapter', () => { expect(nextMessage).toHaveLength(1); expect(nextMessage[0]).toMatchObject({ - data: { chunkType: 'text', content: '\n\nThe broad search is done; continuing.' }, + data: { + chunkType: 'text', + content: 'Continuing with narrower checks.\n\nThe broad search is done; continuing.', + contentMode: 'snapshot', + }, stepIndex: 1, type: 'stream_chunk', }); diff --git a/packages/heterogeneous-agents/src/adapters/codex.ts b/packages/heterogeneous-agents/src/adapters/codex.ts index 9ce272ed56..d28b10ec0c 100644 --- a/packages/heterogeneous-agents/src/adapters/codex.ts +++ b/packages/heterogeneous-agents/src/adapters/codex.ts @@ -522,9 +522,17 @@ const getCodexTerminalErrorStderr = (raw: any): string | undefined => { ); }; +const getAgentMessageText = (item: unknown): string | undefined => { + if (!isRecord(item)) return; + const text = item.text; + return typeof text === 'string' ? text : undefined; +}; + export class CodexAdapter implements AgentEventAdapter { private currentAgentMessageItemId?: string; + private currentAgentMessageText = ''; private currentModel?: string; + private currentStepText = ''; sessionId?: string; private hasTextInCurrentStep = false; @@ -563,6 +571,9 @@ export class CodexAdapter implements AgentEventAdapter { case 'item.started': { return this.handleItemStarted(raw.item); } + case 'item.updated': { + return this.handleItemUpdated(raw.item); + } case 'item.completed': { return this.handleItemCompleted(raw.item); } @@ -638,6 +649,8 @@ export class CodexAdapter implements AgentEventAdapter { private handleTurnStarted(): HeterogeneousAgentEvent[] { this.currentAgentMessageItemId = undefined; + this.currentAgentMessageText = ''; + this.currentStepText = ''; this.hasTextInCurrentStep = false; this.hasToolActivitySinceAgentMessage = false; this.resetStepToolCalls(); @@ -666,42 +679,16 @@ export class CodexAdapter implements AgentEventAdapter { return this.emitToolChunk(tool); } + private handleItemUpdated(item: any): HeterogeneousAgentEvent[] { + if (item?.type === 'agent_message') return this.handleAgentMessageItem(item); + return []; + } + private handleItemCompleted(item: any): HeterogeneousAgentEvent[] { if (!item?.type) return []; if (item.type === 'agent_message') { - if (!item.text) return []; - - const events: HeterogeneousAgentEvent[] = []; - const shouldStartNewStep = - this.hasToolActivitySinceAgentMessage && - !!item.id && - item.id !== this.currentAgentMessageItemId; - - if (shouldStartNewStep) { - this.stepIndex += 1; - this.resetStepToolCalls(); - this.hasTextInCurrentStep = false; - events.push(this.makeEvent('stream_end', {})); - events.push(this.makeEvent('stream_start', this.getStreamStartData({ newStep: true }))); - } - - const content = - this.hasTextInCurrentStep && item.id !== this.currentAgentMessageItemId - ? `\n\n${item.text}` - : item.text; - - this.currentAgentMessageItemId = item.id; - this.hasTextInCurrentStep = true; - this.hasToolActivitySinceAgentMessage = false; - events.push( - this.makeEvent('stream_chunk', { - chunkType: 'text', - content, - }), - ); - - return events; + return this.handleAgentMessageItem(item); } if (!item.id) return []; @@ -731,6 +718,50 @@ export class CodexAdapter implements AgentEventAdapter { return events; } + private handleAgentMessageItem(item: any): HeterogeneousAgentEvent[] { + const text = getAgentMessageText(item); + if (text === undefined) return []; + + const events: HeterogeneousAgentEvent[] = []; + const isNewAgentMessageItem = !!item.id && item.id !== this.currentAgentMessageItemId; + const shouldStartNewStep = this.hasToolActivitySinceAgentMessage && isNewAgentMessageItem; + + if (shouldStartNewStep) { + this.stepIndex += 1; + this.resetStepToolCalls(); + this.currentAgentMessageText = ''; + this.currentStepText = ''; + this.hasTextInCurrentStep = false; + events.push(this.makeEvent('stream_end', {})); + events.push(this.makeEvent('stream_start', this.getStreamStartData({ newStep: true }))); + } + + const separator = this.hasTextInCurrentStep && isNewAgentMessageItem ? '\n\n' : ''; + if (isNewAgentMessageItem) { + this.currentStepText = `${this.currentStepText}${separator}${text}`; + } else { + const prefixLength = Math.max( + 0, + this.currentStepText.length - this.currentAgentMessageText.length, + ); + this.currentStepText = `${this.currentStepText.slice(0, prefixLength)}${text}`; + } + + this.currentAgentMessageItemId = item.id; + this.currentAgentMessageText = text; + this.hasTextInCurrentStep = true; + this.hasToolActivitySinceAgentMessage = false; + events.push( + this.makeEvent('stream_chunk', { + chunkType: 'text', + content: this.currentStepText, + contentMode: 'snapshot', + }), + ); + + return events; + } + private drainPendingToolEndEvents(): HeterogeneousAgentEvent[] { const events = [...this.pendingToolCalls].map((toolCallId) => this.makeEvent('tool_end', { diff --git a/packages/heterogeneous-agents/src/types.ts b/packages/heterogeneous-agents/src/types.ts index 5b70fea090..750863b7a0 100644 --- a/packages/heterogeneous-agents/src/types.ts +++ b/packages/heterogeneous-agents/src/types.ts @@ -166,6 +166,12 @@ export interface SubagentEventContext { export interface StreamChunkData { chunkType: StreamChunkType; content?: string; + /** + * Defaults to `delta`. + * `snapshot` means `content` is the full current text for this stream step, + * not an append-only token delta. + */ + contentMode?: 'delta' | 'snapshot'; reasoning?: string; /** * Subagent context for the entire chunk — peer to `toolsCalling`, diff --git a/packages/openapi/src/services/responses.service.ts b/packages/openapi/src/services/responses.service.ts index f3b071c7d5..ffbbdb9be5 100644 --- a/packages/openapi/src/services/responses.service.ts +++ b/packages/openapi/src/services/responses.service.ts @@ -601,20 +601,35 @@ export class ResponsesService extends BaseService { if (event.type === 'stream_chunk') { const chunk = event.data as StreamChunkData; - if (chunk.chunkType === 'text' && chunk.content) { + if ( + chunk.chunkType === 'text' && + typeof chunk.content === 'string' && + (chunk.content || chunk.contentMode === 'snapshot') + ) { // Start text message output item if not already started yield* startTextMessage(seq); - accumulatedText += chunk.content; - yield { - content_index: 0, - delta: chunk.content, - item_id: currentTextItemId, - logprobs: [], - output_index: currentOutputIndex, - sequence_number: seq.n++, - type: 'response.output_text.delta' as const, - }; + const nextText = + chunk.contentMode === 'snapshot' ? chunk.content : accumulatedText + chunk.content; + const delta = + chunk.contentMode === 'snapshot' + ? nextText.startsWith(accumulatedText) + ? nextText.slice(accumulatedText.length) + : '' + : chunk.content; + accumulatedText = nextText; + + if (delta) { + yield { + content_index: 0, + delta, + item_id: currentTextItemId, + logprobs: [], + output_index: currentOutputIndex, + sequence_number: seq.n++, + type: 'response.output_text.delta' as const, + }; + } } else if (chunk.chunkType === 'tools_calling' && chunk.toolsCalling) { // Close any open text message before emitting tool calls yield* finishTextMessage(seq, accumulatedText); diff --git a/src/features/AgentMockDevtools/hooks/createMockStoreInjector.ts b/src/features/AgentMockDevtools/hooks/createMockStoreInjector.ts index 8c437bd645..862b931698 100644 --- a/src/features/AgentMockDevtools/hooks/createMockStoreInjector.ts +++ b/src/features/AgentMockDevtools/hooks/createMockStoreInjector.ts @@ -101,8 +101,13 @@ export const createMockStoreInjector = (get: () => ChatStore, params: MockStoreI const data = event.data as StreamChunkData | undefined; if (!data) break; - if (data.chunkType === 'text' && data.content) { - accumulatedContent += data.content; + if ( + data.chunkType === 'text' && + typeof data.content === 'string' && + (data.content || data.contentMode === 'snapshot') + ) { + accumulatedContent = + data.contentMode === 'snapshot' ? data.content : accumulatedContent + data.content; get().internal_dispatchMessage( { id: assistantMessageId, diff --git a/src/store/chat/slices/aiChat/actions/__tests__/gatewayEventHandler.test.ts b/src/store/chat/slices/aiChat/actions/__tests__/gatewayEventHandler.test.ts index 8e030b1ddf..f1abead8f1 100644 --- a/src/store/chat/slices/aiChat/actions/__tests__/gatewayEventHandler.test.ts +++ b/src/store/chat/slices/aiChat/actions/__tests__/gatewayEventHandler.test.ts @@ -193,6 +193,31 @@ describe('createGatewayEventHandler', () => { ); }); + it('should replace text content for snapshot chunks', async () => { + const store = createMockStore(); + const handler = createHandler(store); + + handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Draft with tail' })); + handler( + makeEvent('stream_chunk', { + chunkType: 'text', + content: 'Draft', + contentMode: 'snapshot', + }), + ); + handler(makeEvent('stream_chunk', { chunkType: 'text', content: ' final' })); + await flush(); + + expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith( + { + id: 'msg-initial', + type: 'updateMessage', + value: { content: 'Draft final' }, + }, + { operationId: 'op-1' }, + ); + }); + it('should accumulate reasoning content', async () => { const store = createMockStore(); const handler = createHandler(store); diff --git a/src/store/chat/slices/aiChat/actions/__tests__/heterogeneousAgentExecutor.test.ts b/src/store/chat/slices/aiChat/actions/__tests__/heterogeneousAgentExecutor.test.ts index 869fab8416..b6c47396fa 100644 --- a/src/store/chat/slices/aiChat/actions/__tests__/heterogeneousAgentExecutor.test.ts +++ b/src/store/chat/slices/aiChat/actions/__tests__/heterogeneousAgentExecutor.test.ts @@ -356,6 +356,15 @@ const codexAgentMessage = (id: string, text: string) => ({ type: 'item.completed', }); +const codexAgentMessageUpdated = (id: string, text: string) => ({ + item: { + id, + text, + type: 'agent_message', + }, + type: 'item.updated', +}); + const codexCommandStarted = (id: string, command: string) => ({ item: { aggregated_output: '', @@ -1322,6 +1331,38 @@ describe('heterogeneousAgentExecutor DB persistence', () => { }); }); + it('should persist the latest Codex agent_message snapshot instead of appending snapshots', async () => { + const contentUpdates: Array<{ assistantId: string; content: string }> = []; + mockUpdateMessage.mockImplementation(async (id: string, val: any) => { + if (typeof val.content === 'string') { + contentUpdates.push({ assistantId: id, content: val.content }); + } + }); + + await runWithEvents( + [ + codexThreadStarted(), + codexTurnStarted(), + codexAgentMessageUpdated('item_0', 'Draft with stale tail'), + codexAgentMessageUpdated('item_0', 'Draft'), + codexAgentMessage('item_0', 'Draft final'), + codexTurnCompleted({ input_tokens: 10, output_tokens: 3 }), + ], + { + params: { + heterogeneousProvider: { command: 'codex', type: 'codex' as const }, + }, + }, + ); + + expect( + contentUpdates.findLast((update) => update.assistantId === 'ast-initial')?.content, + ).toBe('Draft final'); + expect(contentUpdates.map((update) => update.content)).not.toContain( + 'Draft with stale tailDraftDraft final', + ); + }); + it('should switch to a new assistant before persisting the next turn tool', async () => { const idCounter = { assistant: 0, tool: 0 }; mockCreateMessage.mockImplementation(async (params: any) => { diff --git a/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts b/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts index ef49cadb0d..2bb27cdc07 100644 --- a/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts +++ b/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts @@ -350,11 +350,16 @@ export const createGatewayEventHandler = ( const data = event.data as StreamChunkData | undefined; if (!data) return; - if (data.chunkType === 'text' && data.content) { + if ( + data.chunkType === 'text' && + typeof data.content === 'string' && + (data.content || data.contentMode === 'snapshot') + ) { // Text after reasoning marks the end of the thinking pass — see // `StreamingHandler.handleText` for the same transition. endReasoningIfNeeded(); - accumulatedContent += data.content; + accumulatedContent = + data.contentMode === 'snapshot' ? data.content : accumulatedContent + data.content; hasStreamedContent = true; get().internal_dispatchMessage( { diff --git a/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts b/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts index 1db326e373..35ea8cae2f 100644 --- a/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts +++ b/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts @@ -1369,8 +1369,13 @@ export const executeHeterogeneousAgent = async ( const mainAsstId = currentAssistantMessageId; persistQueue = persistQueue.then(() => reduceAndApplySubagent(event, mainAsstId)); } else { - if (chunk?.chunkType === 'text' && chunk.content) { - accumulatedContent += chunk.content; + if ( + chunk?.chunkType === 'text' && + typeof chunk.content === 'string' && + (chunk.content || chunk.contentMode === 'snapshot') + ) { + accumulatedContent = + chunk.contentMode === 'snapshot' ? chunk.content : accumulatedContent + chunk.content; } if (chunk?.chunkType === 'reasoning' && chunk.reasoning) { accumulatedReasoning += chunk.reasoning;