🐛 fix(agent): handle Codex message snapshots

This commit is contained in:
Arvin Xu
2026-06-13 11:15:12 +08:00
parent c7e0c83174
commit 500758e9f4
11 changed files with 308 additions and 52 deletions
@@ -121,6 +121,12 @@ export interface StreamChunkData {
| 'content_part'
| 'reasoning_part';
content?: string;
/**
* Defaults to `delta`.
* `snapshot` means `content` is the full current text for this stream step,
* not an append-only token delta.
*/
contentMode?: 'delta' | 'snapshot';
/** Multimodal content parts (text + images) */
contentParts?: Array<{ text: string; type: 'text' } | { image: string; type: 'image' }>;
/** Grounding/search data */
@@ -66,6 +66,12 @@ export type StreamChunkType =
export interface StreamChunkData {
chunkType: StreamChunkType;
content?: string;
/**
* Defaults to `delta`.
* `snapshot` means `content` is the full current text for this stream step,
* not an append-only token delta.
*/
contentMode?: 'delta' | 'snapshot';
contentParts?: Array<{ text: string; type: 'text' } | { image: string; type: 'image' }>;
grounding?: any;
imageList?: any[];
@@ -43,7 +43,110 @@ describe('CodexAdapter', () => {
type: 'stream_start',
});
expect(text[0]).toMatchObject({
data: { chunkType: 'text', content: 'hello from codex' },
data: { chunkType: 'text', content: 'hello from codex', contentMode: 'snapshot' },
type: 'stream_chunk',
});
});
// A single `agent_message` item (same id) is revised twice via `item.updated`
// and then finalized via `item.completed`. Every adapted chunk is expected to
// carry the item's full current text with `contentMode: 'snapshot'`, including
// the case where the revised text is shorter than the previous draft.
it('treats Codex agent message updates as replaceable text snapshots', () => {
const adapter = new CodexAdapter();
adapter.adapt({ type: 'turn.started' });
// First draft of item_0.
const draft = adapter.adapt({
item: {
id: 'item_0',
text: 'I will inspect the whole repository.',
type: 'agent_message',
},
type: 'item.updated',
});
// Revision that is shorter than the first draft.
const shortened = adapter.adapt({
item: {
id: 'item_0',
text: 'I will inspect the repo.',
type: 'agent_message',
},
type: 'item.updated',
});
// Final text for the same item id, delivered as `item.completed`.
const completed = adapter.adapt({
item: {
id: 'item_0',
text: 'I will inspect the repo first.',
type: 'agent_message',
},
type: 'item.completed',
});
// The first event returned by each call must be a text chunk holding exactly
// the text of that revision (no concatenation with earlier drafts).
expect(draft[0]).toMatchObject({
data: {
chunkType: 'text',
content: 'I will inspect the whole repository.',
contentMode: 'snapshot',
},
type: 'stream_chunk',
});
expect(shortened[0]).toMatchObject({
data: {
chunkType: 'text',
content: 'I will inspect the repo.',
contentMode: 'snapshot',
},
type: 'stream_chunk',
});
expect(completed[0]).toMatchObject({
data: {
chunkType: 'text',
content: 'I will inspect the repo first.',
contentMode: 'snapshot',
},
type: 'stream_chunk',
});
});
// Two different `agent_message` items arrive with no tool activity between
// them. The snapshot emitted for the second item is expected to contain the
// first item's completed text, a blank-line separator, and then only the
// latest revision of the second item.
it('keeps prior agent message text when a later item updates in the same step', () => {
const adapter = new CodexAdapter();
adapter.adapt({ type: 'turn.started' });
// item_0 is completed first; its return value is not inspected here.
adapter.adapt({
item: {
id: 'item_0',
text: 'First status update.',
type: 'agent_message',
},
type: 'item.completed',
});
// item_1 starts as a draft...
const secondDraft = adapter.adapt({
item: {
id: 'item_1',
text: 'Second draft.',
type: 'agent_message',
},
type: 'item.updated',
});
// ...and is then revised under the same id.
const secondRevision = adapter.adapt({
item: {
id: 'item_1',
text: 'Second revised status update.',
type: 'agent_message',
},
type: 'item.updated',
});
expect(secondDraft[0]).toMatchObject({
data: {
chunkType: 'text',
content: 'First status update.\n\nSecond draft.',
contentMode: 'snapshot',
},
type: 'stream_chunk',
});
// The revision replaces 'Second draft.' while the item_0 prefix is retained.
expect(secondRevision[0]).toMatchObject({
data: {
chunkType: 'text',
content: 'First status update.\n\nSecond revised status update.',
contentMode: 'snapshot',
},
type: 'stream_chunk',
});
});
@@ -231,7 +334,11 @@ describe('CodexAdapter', () => {
expect(secondMessage).toHaveLength(1);
expect(secondMessage[0]).toMatchObject({
data: { chunkType: 'text', content: '\n\nSecond status update.' },
data: {
chunkType: 'text',
content: 'First status update.\n\nSecond status update.',
contentMode: 'snapshot',
},
stepIndex: 0,
type: 'stream_chunk',
});
@@ -289,7 +396,11 @@ describe('CodexAdapter', () => {
expect(nextMessage).toHaveLength(1);
expect(nextMessage[0]).toMatchObject({
data: { chunkType: 'text', content: '\n\nThe broad search is done; continuing.' },
data: {
chunkType: 'text',
content: 'Continuing with narrower checks.\n\nThe broad search is done; continuing.',
contentMode: 'snapshot',
},
stepIndex: 1,
type: 'stream_chunk',
});
@@ -522,9 +522,17 @@ const getCodexTerminalErrorStderr = (raw: any): string | undefined => {
);
};
/**
 * Reads the `text` field of a Codex item.
 *
 * @param item - Raw item payload; may be any value.
 * @returns The `text` value when `item` passes `isRecord` and `text` is a
 * string (the empty string included); otherwise `undefined`.
 */
const getAgentMessageText = (item: unknown): string | undefined => {
  const candidate = isRecord(item) ? item.text : undefined;
  if (typeof candidate === 'string') return candidate;
  return undefined;
};
export class CodexAdapter implements AgentEventAdapter {
private currentAgentMessageItemId?: string;
private currentAgentMessageText = '';
private currentModel?: string;
private currentStepText = '';
sessionId?: string;
private hasTextInCurrentStep = false;
@@ -563,6 +571,9 @@ export class CodexAdapter implements AgentEventAdapter {
case 'item.started': {
return this.handleItemStarted(raw.item);
}
case 'item.updated': {
return this.handleItemUpdated(raw.item);
}
case 'item.completed': {
return this.handleItemCompleted(raw.item);
}
@@ -638,6 +649,8 @@ export class CodexAdapter implements AgentEventAdapter {
private handleTurnStarted(): HeterogeneousAgentEvent[] {
this.currentAgentMessageItemId = undefined;
this.currentAgentMessageText = '';
this.currentStepText = '';
this.hasTextInCurrentStep = false;
this.hasToolActivitySinceAgentMessage = false;
this.resetStepToolCalls();
@@ -666,42 +679,16 @@ export class CodexAdapter implements AgentEventAdapter {
return this.emitToolChunk(tool);
}
/**
 * Handles an `item.updated` event payload.
 *
 * Only `agent_message` items are forwarded (to `handleAgentMessageItem`);
 * every other item type, and a missing item, yields no events.
 */
private handleItemUpdated(item: any): HeterogeneousAgentEvent[] {
  const isAgentMessage = item?.type === 'agent_message';
  return isAgentMessage ? this.handleAgentMessageItem(item) : [];
}
private handleItemCompleted(item: any): HeterogeneousAgentEvent[] {
if (!item?.type) return [];
if (item.type === 'agent_message') {
if (!item.text) return [];
const events: HeterogeneousAgentEvent[] = [];
const shouldStartNewStep =
this.hasToolActivitySinceAgentMessage &&
!!item.id &&
item.id !== this.currentAgentMessageItemId;
if (shouldStartNewStep) {
this.stepIndex += 1;
this.resetStepToolCalls();
this.hasTextInCurrentStep = false;
events.push(this.makeEvent('stream_end', {}));
events.push(this.makeEvent('stream_start', this.getStreamStartData({ newStep: true })));
}
const content =
this.hasTextInCurrentStep && item.id !== this.currentAgentMessageItemId
? `\n\n${item.text}`
: item.text;
this.currentAgentMessageItemId = item.id;
this.hasTextInCurrentStep = true;
this.hasToolActivitySinceAgentMessage = false;
events.push(
this.makeEvent('stream_chunk', {
chunkType: 'text',
content,
}),
);
return events;
return this.handleAgentMessageItem(item);
}
if (!item.id) return [];
@@ -731,6 +718,50 @@ export class CodexAdapter implements AgentEventAdapter {
return events;
}
/**
 * Converts an `agent_message` item into a text `stream_chunk` whose `content`
 * is the full text accumulated for the current step (`contentMode: 'snapshot'`).
 *
 * - Returns `[]` when the item has no string `text`.
 * - An item whose id is truthy and differs from the tracked id counts as a new
 *   message: its text is appended to the step text, separated by a blank line
 *   when the step already has text.
 * - Otherwise the item is treated as a revision of the tracked message: the
 *   tail of the step text that held the previous revision is replaced.
 * - A new message arriving while `hasToolActivitySinceAgentMessage` is set
 *   first advances `stepIndex`, clears the per-step text state, and emits
 *   `stream_end` followed by `stream_start`.
 */
private handleAgentMessageItem(item: any): HeterogeneousAgentEvent[] {
  const snapshotText = getAgentMessageText(item);
  if (snapshotText === undefined) return [];

  const output: HeterogeneousAgentEvent[] = [];
  const startsNewMessage = Boolean(item.id) && item.id !== this.currentAgentMessageItemId;

  if (this.hasToolActivitySinceAgentMessage && startsNewMessage) {
    // Step state is reset before the boundary events are built.
    this.stepIndex += 1;
    this.resetStepToolCalls();
    this.currentAgentMessageText = '';
    this.currentStepText = '';
    this.hasTextInCurrentStep = false;
    output.push(
      this.makeEvent('stream_end', {}),
      this.makeEvent('stream_start', this.getStreamStartData({ newStep: true })),
    );
  }

  if (startsNewMessage) {
    const joiner = this.hasTextInCurrentStep ? '\n\n' : '';
    this.currentStepText = this.currentStepText + joiner + snapshotText;
  } else {
    // Keep everything that precedes the previous revision of this message.
    const keptLength = Math.max(
      0,
      this.currentStepText.length - this.currentAgentMessageText.length,
    );
    this.currentStepText = this.currentStepText.slice(0, keptLength) + snapshotText;
  }

  this.currentAgentMessageItemId = item.id;
  this.currentAgentMessageText = snapshotText;
  this.hasTextInCurrentStep = true;
  this.hasToolActivitySinceAgentMessage = false;

  const chunk = this.makeEvent('stream_chunk', {
    chunkType: 'text',
    content: this.currentStepText,
    contentMode: 'snapshot',
  });
  output.push(chunk);
  return output;
}
private drainPendingToolEndEvents(): HeterogeneousAgentEvent[] {
const events = [...this.pendingToolCalls].map((toolCallId) =>
this.makeEvent('tool_end', {
@@ -166,6 +166,12 @@ export interface SubagentEventContext {
export interface StreamChunkData {
chunkType: StreamChunkType;
content?: string;
/**
* Defaults to `delta`.
* `snapshot` means `content` is the full current text for this stream step,
* not an append-only token delta.
*/
contentMode?: 'delta' | 'snapshot';
reasoning?: string;
/**
* Subagent context for the entire chunk — peer to `toolsCalling`,
@@ -601,20 +601,35 @@ export class ResponsesService extends BaseService {
if (event.type === 'stream_chunk') {
const chunk = event.data as StreamChunkData;
if (chunk.chunkType === 'text' && chunk.content) {
if (
chunk.chunkType === 'text' &&
typeof chunk.content === 'string' &&
(chunk.content || chunk.contentMode === 'snapshot')
) {
// Start text message output item if not already started
yield* startTextMessage(seq);
accumulatedText += chunk.content;
yield {
content_index: 0,
delta: chunk.content,
item_id: currentTextItemId,
logprobs: [],
output_index: currentOutputIndex,
sequence_number: seq.n++,
type: 'response.output_text.delta' as const,
};
const nextText =
chunk.contentMode === 'snapshot' ? chunk.content : accumulatedText + chunk.content;
const delta =
chunk.contentMode === 'snapshot'
? nextText.startsWith(accumulatedText)
? nextText.slice(accumulatedText.length)
: ''
: chunk.content;
accumulatedText = nextText;
if (delta) {
yield {
content_index: 0,
delta,
item_id: currentTextItemId,
logprobs: [],
output_index: currentOutputIndex,
sequence_number: seq.n++,
type: 'response.output_text.delta' as const,
};
}
} else if (chunk.chunkType === 'tools_calling' && chunk.toolsCalling) {
// Close any open text message before emitting tool calls
yield* finishTextMessage(seq, accumulatedText);
@@ -101,8 +101,13 @@ export const createMockStoreInjector = (get: () => ChatStore, params: MockStoreI
const data = event.data as StreamChunkData | undefined;
if (!data) break;
if (data.chunkType === 'text' && data.content) {
accumulatedContent += data.content;
if (
data.chunkType === 'text' &&
typeof data.content === 'string' &&
(data.content || data.contentMode === 'snapshot')
) {
accumulatedContent =
data.contentMode === 'snapshot' ? data.content : accumulatedContent + data.content;
get().internal_dispatchMessage(
{
id: assistantMessageId,
@@ -193,6 +193,31 @@ describe('createGatewayEventHandler', () => {
);
});
// Mixes the two content modes on one handler: a chunk without `contentMode`,
// then a `snapshot` chunk, then another chunk without `contentMode`. The last
// dispatched message content is expected to be the snapshot text followed by
// the trailing chunk, with the pre-snapshot text gone.
it('should replace text content for snapshot chunks', async () => {
const store = createMockStore();
const handler = createHandler(store);
// No `contentMode` on this chunk.
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Draft with tail' }));
// Snapshot chunk carrying a shorter text than what was sent so far.
handler(
makeEvent('stream_chunk', {
chunkType: 'text',
content: 'Draft',
contentMode: 'snapshot',
}),
);
// Another chunk without `contentMode`, expected to append to the snapshot.
handler(makeEvent('stream_chunk', { chunkType: 'text', content: ' final' }));
await flush();
expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith(
{
id: 'msg-initial',
type: 'updateMessage',
value: { content: 'Draft final' },
},
{ operationId: 'op-1' },
);
});
it('should accumulate reasoning content', async () => {
const store = createMockStore();
const handler = createHandler(store);
@@ -356,6 +356,15 @@ const codexAgentMessage = (id: string, text: string) => ({
type: 'item.completed',
});
/**
 * Test fixture: builds a Codex `item.updated` event wrapping an
 * `agent_message` item with the given id and text.
 */
const codexAgentMessageUpdated = (id: string, text: string) => {
  const item = { id, text, type: 'agent_message' };
  return { item, type: 'item.updated' };
};
const codexCommandStarted = (id: string, command: string) => ({
item: {
aggregated_output: '',
@@ -1322,6 +1331,38 @@ describe('heterogeneousAgentExecutor DB persistence', () => {
});
});
// Feeds two `item.updated` revisions and one `item.completed` for the same
// Codex item through the executor and records every string `content` passed
// to the mocked `updateMessage`. The last recorded content for 'ast-initial'
// must be the final text, and the concatenation of all three revisions must
// never have been written.
it('should persist the latest Codex agent_message snapshot instead of appending snapshots', async () => {
const contentUpdates: Array<{ assistantId: string; content: string }> = [];
// Capture only updates that carry a string `content`.
mockUpdateMessage.mockImplementation(async (id: string, val: any) => {
if (typeof val.content === 'string') {
contentUpdates.push({ assistantId: id, content: val.content });
}
});
await runWithEvents(
[
codexThreadStarted(),
codexTurnStarted(),
codexAgentMessageUpdated('item_0', 'Draft with stale tail'),
codexAgentMessageUpdated('item_0', 'Draft'),
codexAgentMessage('item_0', 'Draft final'),
codexTurnCompleted({ input_tokens: 10, output_tokens: 3 }),
],
{
params: {
heterogeneousProvider: { command: 'codex', type: 'codex' as const },
},
},
);
expect(
contentUpdates.findLast((update) => update.assistantId === 'ast-initial')?.content,
).toBe('Draft final');
// This string is what appending the three revisions back to back would produce.
expect(contentUpdates.map((update) => update.content)).not.toContain(
'Draft with stale tailDraftDraft final',
);
});
it('should switch to a new assistant before persisting the next turn tool', async () => {
const idCounter = { assistant: 0, tool: 0 };
mockCreateMessage.mockImplementation(async (params: any) => {
@@ -350,11 +350,16 @@ export const createGatewayEventHandler = (
const data = event.data as StreamChunkData | undefined;
if (!data) return;
if (data.chunkType === 'text' && data.content) {
if (
data.chunkType === 'text' &&
typeof data.content === 'string' &&
(data.content || data.contentMode === 'snapshot')
) {
// Text after reasoning marks the end of the thinking pass — see
// `StreamingHandler.handleText` for the same transition.
endReasoningIfNeeded();
accumulatedContent += data.content;
accumulatedContent =
data.contentMode === 'snapshot' ? data.content : accumulatedContent + data.content;
hasStreamedContent = true;
get().internal_dispatchMessage(
{
@@ -1369,8 +1369,13 @@ export const executeHeterogeneousAgent = async (
const mainAsstId = currentAssistantMessageId;
persistQueue = persistQueue.then(() => reduceAndApplySubagent(event, mainAsstId));
} else {
if (chunk?.chunkType === 'text' && chunk.content) {
accumulatedContent += chunk.content;
if (
chunk?.chunkType === 'text' &&
typeof chunk.content === 'string' &&
(chunk.content || chunk.contentMode === 'snapshot')
) {
accumulatedContent =
chunk.contentMode === 'snapshot' ? chunk.content : accumulatedContent + chunk.content;
}
if (chunk?.chunkType === 'reasoning' && chunk.reasoning) {
accumulatedReasoning += chunk.reasoning;