From fa76928f62ee229f87e3764a802e93b46e5e4b62 Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Sat, 13 Jun 2026 13:34:41 +0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20fix:=20fix=20Codex=20resumed=20u?= =?UTF-8?q?sage=20reporting=20for=20heterogeneous=20agents=20(#15751)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 🐛 fix(heterogeneous-agent): normalize codex resumed usage --- .../main/controllers/HeterogeneousAgentCtr.ts | 13 ++- .../src/adapters/codex.test.ts | 99 +++++++++++++++- .../src/adapters/codex.ts | 106 +++++++++++++----- .../src/spawn/agentStreamPipeline.test.ts | 38 +++++++ .../src/spawn/agentStreamPipeline.ts | 20 +++- .../src/spawn/codexModel.test.ts | 31 +++++ .../src/spawn/codexModel.ts | 16 ++- .../heterogeneous-agents/src/spawn/index.ts | 1 + .../src/spawn/spawnAgent.test.ts | 80 ++++++++++++- .../src/spawn/spawnAgent.ts | 8 +- .../src/utils/codexUsage.ts | 59 ++++++++++ 11 files changed, 423 insertions(+), 48 deletions(-) create mode 100644 packages/heterogeneous-agents/src/utils/codexUsage.ts diff --git a/apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts b/apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts index 31d611a999..97cbc21ebe 100644 --- a/apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts +++ b/apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts @@ -23,7 +23,7 @@ import type { HeteroExecImageRef, } from '@lobechat/heterogeneous-agents/protocol'; import { buildHeteroExecStdinPayload } from '@lobechat/heterogeneous-agents/protocol'; -import type { AgentStreamEvent } from '@lobechat/heterogeneous-agents/spawn'; +import type { AgentStreamEvent, UsageData } from '@lobechat/heterogeneous-agents/spawn'; import { AgentStreamPipeline, buildAgentInput, @@ -911,6 +911,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule { let spawnPlan; let traceSession; let cwd: string; + let initialCumulativeUsage: UsageData | undefined; let spawnEnv: NodeJS.ProcessEnv; try { const driver = getHeterogeneousAgentDriver(session.agentType); @@ -945,6 +946,12 @@ export default class HeterogeneousAgentCtr extends ControllerModule { session.model = initialModel.model; session.modelSource = initialModel.source; } + + if (session.agentSessionId) { + initialCumulativeUsage = ( + await readCodexSessionModel(session.agentSessionId, { env: spawnEnv }) + )?.cumulativeUsage; + } } traceSession = await this.createCliTraceSession({ @@ -1001,6 +1008,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule { reject, resolve, session, + initialCumulativeUsage, spawnEnv, traceSession, useStdin, @@ -1070,6 +1078,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule { private handleSpawnedAgentProcess({ cwd, + initialCumulativeUsage, intervention, params, proc, @@ -1088,6 +1097,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule { reject: (reason?: unknown) => void; resolve: () => void; session: AgentSession; + initialCumulativeUsage?: UsageData | undefined; spawnEnv: NodeJS.ProcessEnv; spawnPlan: HeterogeneousAgentBuildPlan; traceSession: CliTraceSession | undefined; @@ -1128,6 +1138,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule { const pipeline = new AgentStreamPipeline({ agentType: session.agentType, cwd, + initialCumulativeUsage, initialModel: session.model, operationId: params.operationId, }); diff --git a/packages/heterogeneous-agents/src/adapters/codex.test.ts b/packages/heterogeneous-agents/src/adapters/codex.test.ts index faaf4edd63..0b6415b31d 100644 --- a/packages/heterogeneous-agents/src/adapters/codex.test.ts +++ b/packages/heterogeneous-agents/src/adapters/codex.test.ts @@ -1,6 +1,6 @@ import { readFile } from 'node:fs/promises'; -import { describe, expect, it } from 'vitest'; +import { describe, expect, it, vi } from 'vitest'; import { CodexAdapter } from './codex'; @@ -107,6 +107,40 @@ describe('CodexAdapter', () => { }); }); + it('classifies Codex usage-limit errors with retry metadata', () => { + vi.useFakeTimers(); + vi.setSystemTime(new Date(2026, 5, 13, 3, 9, 27)); + + try { + const adapter = new CodexAdapter(); + const message = + "You've hit your usage limit. Visit https://chatgpt.com/codex/settings/usage to purchase more credits or try again at 3:10 AM."; + const expectedResetAt = Math.floor(new Date(2026, 5, 13, 3, 10).getTime() / 1000); + + adapter.adapt({ type: 'turn.started' }); + const events = adapter.adapt({ + message, + type: 'error', + }); + + expect(events.map((event) => event.type)).toEqual(['stream_end', 'error']); + expect(events[1].data).toMatchObject({ + agentType: 'codex', + clearEchoedContent: true, + code: 'rate_limit', + docsUrl: 'https://chatgpt.com/codex/settings/usage', + message, + rateLimitInfo: { + resetsAt: expectedResetAt, + status: 'rejected', + }, + stderr: message, + }); + } finally { + vi.useRealTimers(); + } + }); + it('deduplicates the following turn.failed after a Codex JSONL error event', () => { const adapter = new CodexAdapter(); @@ -668,8 +702,16 @@ describe('CodexAdapter', () => { }); }); - it('keeps a real collab_tool_call stream fixture readable and drains unfinished attempts', async () => { - const adapter = new CodexAdapter(); + it('keeps a real collab_tool_call stream fixture readable and subtracts resumed usage', async () => { + const adapter = new CodexAdapter({ + initialCumulativeUsage: { + inputCachedTokens: 42_000, + inputCacheMissTokens: 52_000, + totalInputTokens: 94_000, + totalOutputTokens: 300, + totalTokens: 94_300, + }, + }); const rawEvents = await loadFixture('collab_tool_call.spawn_wait.jsonl'); const adapted = rawEvents.flatMap((event) => adapter.adapt(event)); @@ -715,6 +757,21 @@ describe('CodexAdapter', () => { }), ]), ); + expect( + adapted.find( + (event) => event.type === 'step_complete' && event.data?.phase === 'turn_metadata', + ), + ).toMatchObject({ + data: { + usage: { + inputCachedTokens: 1008, + inputCacheMissTokens: 937, + totalInputTokens: 1945, + totalOutputTokens: 116, + totalTokens: 2061, + }, + }, + }); expect(flushed).toEqual([]); }); @@ -895,6 +952,42 @@ describe('CodexAdapter', () => { }); }); + it('subtracts the previous cumulative Codex usage for resumed turns', () => { + const adapter = new CodexAdapter({ + initialCumulativeUsage: { + inputCachedTokens: 4, + inputCacheMissTokens: 10, + totalInputTokens: 14, + totalOutputTokens: 3, + totalTokens: 17, + }, + }); + + const events = adapter.adapt({ + type: 'turn.completed', + usage: { + cached_input_tokens: 9, + input_tokens: 25, + output_tokens: 11, + }, + }); + + expect(events[0]).toMatchObject({ + data: { + phase: 'turn_metadata', + provider: 'codex', + usage: { + inputCachedTokens: 5, + inputCacheMissTokens: 15, + totalInputTokens: 20, + totalOutputTokens: 8, + totalTokens: 28, + }, + }, + type: 'step_complete', + }); + }); + it('hydrates turn metadata model from session_configured when turn.completed omits it', () => { const adapter = new CodexAdapter(); diff --git a/packages/heterogeneous-agents/src/adapters/codex.ts b/packages/heterogeneous-agents/src/adapters/codex.ts index 9ce272ed56..e5958e2ed0 100644 --- a/packages/heterogeneous-agents/src/adapters/codex.ts +++ b/packages/heterogeneous-agents/src/adapters/codex.ts @@ -1,6 +1,7 @@ import type { AgentEventAdapter, HeterogeneousAgentEvent, + HeterogeneousRateLimitInfo, HeterogeneousTerminalErrorData, StepCompleteData, StreamStartData, @@ -8,6 +9,7 @@ import type { ToolResultData, UsageData, } from '../types'; +import { toCodexUsageData, toTurnUsageFromCumulative } from '../utils/codexUsage'; const CODEX_IDENTIFIER = 'codex'; const CODEX_COLLAB_TOOL_CALL_API = 'collab_tool_call'; @@ -15,6 +17,15 @@ const CODEX_COMMAND_API = 'command_execution'; const CODEX_FILE_CHANGE_API = 'file_change'; const CODEX_MCP_TOOL_CALL_API = 'mcp_tool_call'; const CODEX_TODO_LIST_API = 'todo_list'; +const CODEX_USAGE_SETTINGS_URL = 'https://chatgpt.com/codex/settings/usage'; + +const CODEX_USER_RATE_LIMIT_PATTERNS = [ + /you'?ve hit your usage limit/i, + /purchase more credits/i, + /\busage limit\b/i, +] as const; + +const CODEX_RETRY_AT_PATTERN = /\btry again at\s+(\d{1,2})(?::(\d{2}))?\s*(AM|PM)?\b/i; interface CodexBaseItem { id: string; @@ -96,34 +107,6 @@ const isMcpToolCallItem = (item: CodexToolItem): item is CodexMcpToolCallItem => const isTodoListItem = (item: CodexToolItem): item is CodexTodoListItem => item.type === CODEX_TODO_LIST_API; -const toUsageData = ( - raw: - | { - cached_input_tokens?: number; - input_tokens?: number; - output_tokens?: number; - } - | null - | undefined, -): UsageData | undefined => { - if (!raw) return undefined; - - const inputCacheMissTokens = raw.input_tokens || 0; - const inputCachedTokens = raw.cached_input_tokens || 0; - const totalInputTokens = inputCacheMissTokens + inputCachedTokens; - const totalOutputTokens = raw.output_tokens || 0; - - if (totalInputTokens + totalOutputTokens === 0) return undefined; - - return { - inputCachedTokens: inputCachedTokens || undefined, - inputCacheMissTokens, - totalInputTokens, - totalOutputTokens, - totalTokens: totalInputTokens + totalOutputTokens, - }; -}; - const normalizeTodoListItems = (item: CodexTodoListItem) => (item.items || []) .map((todo) => ({ @@ -522,9 +505,50 @@ const getCodexTerminalErrorStderr = (raw: any): string | undefined => { ); }; +const parseCodexRetryAt = (message: string, now = new Date()): number | undefined => { + const match = CODEX_RETRY_AT_PATTERN.exec(message); + if (!match) return; + + const hour = Number(match[1]); + const minute = match[2] ? Number(match[2]) : 0; + const meridiem = match[3]?.toUpperCase(); + + if (!Number.isInteger(hour) || !Number.isInteger(minute) || minute < 0 || minute > 59) { + return; + } + + let normalizedHour = hour; + if (meridiem) { + if (hour < 1 || hour > 12) return; + normalizedHour = (hour % 12) + (meridiem === 'PM' ? 12 : 0); + } else if (hour < 0 || hour > 23) { + return; + } + + const resetAt = new Date(now); + resetAt.setHours(normalizedHour, minute, 0, 0); + if (resetAt.getTime() <= now.getTime()) { + resetAt.setDate(resetAt.getDate() + 1); + } + + return Math.floor(resetAt.getTime() / 1000); +}; + +const getCodexRateLimitInfo = (message: string): HeterogeneousRateLimitInfo | undefined => { + if (!CODEX_USER_RATE_LIMIT_PATTERNS.some((pattern) => pattern.test(message))) return; + + const resetsAt = parseCodexRetryAt(message); + + return { + ...(resetsAt ? { resetsAt } : {}), + status: 'rejected', + }; +}; + export class CodexAdapter implements AgentEventAdapter { private currentAgentMessageItemId?: string; private currentModel?: string; + private lastCumulativeUsage?: UsageData; sessionId?: string; private hasTextInCurrentStep = false; @@ -538,6 +562,10 @@ export class CodexAdapter implements AgentEventAdapter { private terminalEndEmitted = false; private terminalErrorEmitted = false; + constructor(options: { initialCumulativeUsage?: UsageData | undefined } = {}) { + this.lastCumulativeUsage = options.initialCumulativeUsage; + } + adapt(raw: any): HeterogeneousAgentEvent[] { if (!raw || typeof raw !== 'object') return []; @@ -583,7 +611,9 @@ export class CodexAdapter implements AgentEventAdapter { const model = getEventModel(raw) || this.currentModel; if (model) this.currentModel = model; - const usage = toUsageData(raw.usage); + const cumulativeUsage = toCodexUsageData(raw.usage); + const usage = toTurnUsageFromCumulative(cumulativeUsage, this.lastCumulativeUsage); + if (cumulativeUsage) this.lastCumulativeUsage = cumulativeUsage; const events = this.drainPendingToolEndEvents(); if (usage || model) { @@ -607,11 +637,21 @@ export class CodexAdapter implements AgentEventAdapter { if (this.terminalErrorEmitted || this.terminalEndEmitted) return []; this.terminalErrorEmitted = true; + const message = getCodexTerminalErrorMessage(raw); + const stderr = getCodexTerminalErrorStderr(raw); + const rateLimitInfo = getCodexRateLimitInfo(message); const data: HeterogeneousTerminalErrorData = { agentType: CODEX_IDENTIFIER, clearEchoedContent: true, - message: getCodexTerminalErrorMessage(raw), - stderr: getCodexTerminalErrorStderr(raw), + ...(rateLimitInfo + ? { + code: 'rate_limit', + docsUrl: CODEX_USAGE_SETTINGS_URL, + rateLimitInfo, + } + : {}), + message, + stderr, }; const events: HeterogeneousAgentEvent[] = this.started @@ -623,6 +663,10 @@ export class CodexAdapter implements AgentEventAdapter { } private handleSessionConfigured(raw: any): HeterogeneousAgentEvent[] { + if (raw.initialCumulativeUsage) { + this.lastCumulativeUsage = raw.initialCumulativeUsage; + } + const model = getEventModel(raw); if (!model || model === this.currentModel) return []; diff --git a/packages/heterogeneous-agents/src/spawn/agentStreamPipeline.test.ts b/packages/heterogeneous-agents/src/spawn/agentStreamPipeline.test.ts index 78ce51f1ea..011fe67d0a 100644 --- a/packages/heterogeneous-agents/src/spawn/agentStreamPipeline.test.ts +++ b/packages/heterogeneous-agents/src/spawn/agentStreamPipeline.test.ts @@ -84,6 +84,44 @@ describe('AgentStreamPipeline', () => { }); }); + it('passes initial Codex cumulative usage into the adapter for resumed turns', async () => { + const pipeline = new AgentStreamPipeline({ + agentType: 'codex', + initialCumulativeUsage: { + inputCacheMissTokens: 100, + totalInputTokens: 100, + totalOutputTokens: 20, + totalTokens: 120, + }, + operationId: 'op-codex', + }); + + const events = await pipeline.push( + `${JSON.stringify({ + type: 'turn.completed', + usage: { + input_tokens: 180, + output_tokens: 45, + }, + })}\n`, + ); + + expect(events[0]).toMatchObject({ + data: { + phase: 'turn_metadata', + provider: 'codex', + usage: { + inputCacheMissTokens: 80, + totalInputTokens: 80, + totalOutputTokens: 25, + totalTokens: 105, + }, + }, + operationId: 'op-codex', + type: 'step_complete', + }); + }); + it('drops non-JSON noise lines instead of throwing', async () => { const pipeline = new AgentStreamPipeline({ agentType: 'claude-code', diff --git a/packages/heterogeneous-agents/src/spawn/agentStreamPipeline.ts b/packages/heterogeneous-agents/src/spawn/agentStreamPipeline.ts index d2dc978e05..857646f3ec 100644 --- a/packages/heterogeneous-agents/src/spawn/agentStreamPipeline.ts +++ b/packages/heterogeneous-agents/src/spawn/agentStreamPipeline.ts @@ -1,7 +1,7 @@ import type { AgentStreamEvent } from '@lobechat/agent-gateway-client'; import { createAdapter } from '../registry'; -import type { AgentEventAdapter, HeterogeneousAgentEvent } from '../types'; +import type { AgentEventAdapter, HeterogeneousAgentEvent, UsageData } from '../types'; import { CodexFileChangeTracker } from './codexFileChangeTracker'; import { JsonlStreamProcessor } from './jsonlProcessor'; import { toStreamEvent } from './streamEvent'; @@ -11,8 +11,10 @@ export interface AgentStreamPipelineOptions { agentType: string; /** Working directory used to resolve relative file paths emitted by CLI tools. */ cwd?: string; + /** Last known Codex cumulative usage before a resumed turn starts. */ + initialCumulativeUsage?: UsageData | undefined; /** Host-known model to emit before the CLI's first stdout payload. */ - initialModel?: string; + initialModel?: string | undefined; /** Operation id to stamp onto every emitted `AgentStreamEvent`. */ operationId: string; } @@ -42,8 +44,13 @@ export class AgentStreamPipeline { this.codexTracker = options.agentType === 'codex' ? new CodexFileChangeTracker(options.cwd) : undefined; - if (options.initialModel) { - this.queuedEvents.push(...this.configureSession({ model: options.initialModel })); + if (options.initialModel || options.initialCumulativeUsage) { + this.queuedEvents.push( + ...this.configureSession({ + initialCumulativeUsage: options.initialCumulativeUsage, + model: options.initialModel, + }), + ); } } @@ -71,7 +78,10 @@ export class AgentStreamPipeline { return [...trailing, ...flushed]; } - configureSession(data: { model?: string }): AgentStreamEvent[] { + configureSession(data: { + initialCumulativeUsage?: UsageData | undefined; + model?: string | undefined; + }): AgentStreamEvent[] { return this.toStreamEvents( this.adapter.adapt({ ...data, diff --git a/packages/heterogeneous-agents/src/spawn/codexModel.test.ts b/packages/heterogeneous-agents/src/spawn/codexModel.test.ts index 9732fd62b9..3ec40829a7 100644 --- a/packages/heterogeneous-agents/src/spawn/codexModel.test.ts +++ b/packages/heterogeneous-agents/src/spawn/codexModel.test.ts @@ -109,4 +109,35 @@ describe('codex model metadata helpers', () => { provider: 'openai', }); }); + + it('reads the latest cumulative usage from a Codex rollout session file', async () => { + const codexHome = await makeTempCodexHome(); + const sessionDir = path.join(codexHome, 'sessions', '2026', '06', '11'); + await mkdir(sessionDir, { recursive: true }); + await writeFile( + path.join(sessionDir, 'rollout-2026-06-11T01-31-27-thread-usage.jsonl'), + [ + JSON.stringify({ + payload: { usage: { input_tokens: 10, output_tokens: 2 } }, + type: 'event_msg', + }), + JSON.stringify({ + type: 'turn.completed', + usage: { cached_input_tokens: 5, input_tokens: 25, output_tokens: 9 }, + }), + ].join('\n'), + ); + + await expect( + readCodexSessionModel('thread-usage', { env: { CODEX_HOME: codexHome } }), + ).resolves.toMatchObject({ + cumulativeUsage: { + inputCachedTokens: 5, + inputCacheMissTokens: 25, + totalInputTokens: 30, + totalOutputTokens: 9, + totalTokens: 39, + }, + }); + }); }); diff --git a/packages/heterogeneous-agents/src/spawn/codexModel.ts b/packages/heterogeneous-agents/src/spawn/codexModel.ts index 87d6bcd55f..51d921352b 100644 --- a/packages/heterogeneous-agents/src/spawn/codexModel.ts +++ b/packages/heterogeneous-agents/src/spawn/codexModel.ts @@ -2,6 +2,9 @@ import { readdir, readFile, stat } from 'node:fs/promises'; import os from 'node:os'; import path from 'node:path'; +import type { UsageData } from '../types'; +import { toCodexUsageData } from '../utils/codexUsage'; + type CodexEnv = Record; export type CodexInitialModelSource = 'args' | 'config'; @@ -14,6 +17,7 @@ export interface CodexInitialModelResolution { export interface CodexSessionModelInfo { contextWindow?: number; + cumulativeUsage?: UsageData | undefined; line?: number; model?: string; provider?: string; @@ -51,7 +55,7 @@ const parseTomlStringAssignment = (line: string, key: string): string | undefine if (!match?.[1]) return; const value = unquoteTomlString(match[1]); - return value ? value : undefined; + return value || undefined; }; const normalizeProfileName = (raw: string): string => unquoteTomlString(raw.trim()); @@ -73,7 +77,7 @@ export const getCodexHome = ( homeDir: string = os.homedir(), ): string => { const configured = env.CODEX_HOME?.trim(); - return configured ? configured : path.join(homeDir, '.codex'); + return configured || path.join(homeDir, '.codex'); }; export const parseCodexModelFromArgs = (args: string[]): string | undefined => { @@ -265,6 +269,7 @@ export const readCodexSessionModel = async ( let model: string | undefined; let provider: string | undefined; let contextWindow: number | undefined; + let cumulativeUsage: UsageData | undefined; let lineNumber: number | undefined; const content = await readFile(sourceFile, 'utf8').catch(() => undefined); @@ -278,6 +283,9 @@ export const readCodexSessionModel = async ( try { const record = JSON.parse(line); const payload = record?.payload; + const usage = toCodexUsageData(record?.usage) || toCodexUsageData(payload?.usage); + if (usage) cumulativeUsage = usage; + const payloadModel = getStringValue(payload?.model) || getStringValue(payload?.collaboration_mode?.settings?.model); @@ -293,7 +301,7 @@ export const readCodexSessionModel = async ( } } - return model || provider || contextWindow - ? { contextWindow, line: lineNumber, model, provider, sourceFile } + return model || provider || contextWindow || cumulativeUsage + ? { contextWindow, cumulativeUsage, line: lineNumber, model, provider, sourceFile } : undefined; }; diff --git a/packages/heterogeneous-agents/src/spawn/index.ts b/packages/heterogeneous-agents/src/spawn/index.ts index 32fc9188c8..aa5b241c65 100644 --- a/packages/heterogeneous-agents/src/spawn/index.ts +++ b/packages/heterogeneous-agents/src/spawn/index.ts @@ -12,6 +12,7 @@ * `@lobechat/agent-gateway-client` (which is a browser-side WebSocket client * that producers have no business pulling in). */ +export type { UsageData } from '../types'; export { AgentStreamPipeline, type AgentStreamPipelineOptions } from './agentStreamPipeline'; export { type CliSpawnPlan, resolveCliSpawnPlan } from './cliSpawn'; export { CodexFileChangeTracker } from './codexFileChangeTracker'; diff --git a/packages/heterogeneous-agents/src/spawn/spawnAgent.test.ts b/packages/heterogeneous-agents/src/spawn/spawnAgent.test.ts index 1410778990..060110ade6 100644 --- a/packages/heterogeneous-agents/src/spawn/spawnAgent.test.ts +++ b/packages/heterogeneous-agents/src/spawn/spawnAgent.test.ts @@ -1,12 +1,15 @@ import * as childProcess from 'node:child_process'; import { EventEmitter } from 'node:events'; +import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises'; import * as os from 'node:os'; +import path from 'node:path'; import { PassThrough } from 'node:stream'; import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; const spawnCalls: Array<{ args: string[]; command: string; options: any }> = []; let nextFakeProc: any = null; +const tempDirs: string[] = []; const platformMock = vi.mocked(os.platform); const execFileMock = vi.mocked(childProcess.execFile); @@ -103,8 +106,9 @@ describe('spawnAgent', () => { execFileMock.mockReset(); }); - afterEach(() => { + afterEach(async () => { nextFakeProc = null; + await Promise.all(tempDirs.splice(0).map((dir) => rm(dir, { force: true, recursive: true }))); }); it('spawns claude with stream-json flags + writes prompt as user message to stdin', async () => { @@ -222,10 +226,13 @@ describe('spawnAgent', () => { }); it('uses codex `exec resume` form with thread id + `-` stdin marker on resume', async () => { + const codexHome = await mkdtemp(path.join(os.tmpdir(), 'lobe-codex-spawn-empty-')); + tempDirs.push(codexHome); nextFakeProc = createFakeProc().proc; const { spawnAgent } = await import('./spawnAgent'); await spawnAgent({ agentType: 'codex', + env: { CODEX_HOME: codexHome }, operationId: 'op-1', prompt: 'continue', resumeSessionId: 'thread_abc', @@ -237,10 +244,77 @@ describe('spawnAgent', () => { expect(args.at(-1)).toBe('-'); }); + it('seeds a real Codex resumed stream with the previous cumulative usage from the session file', async () => { + const threadId = '019dba1e-eec2-7a22-bdfb-ac6175e03081'; + const realCodexFixture = await readFile( + new URL('../adapters/__fixtures__/codex/collab_tool_call.spawn_wait.jsonl', import.meta.url), + 'utf8', + ); + const codexHome = await mkdtemp(path.join(os.tmpdir(), 'lobe-codex-spawn-')); + tempDirs.push(codexHome); + const sessionDir = path.join(codexHome, 'sessions', '2026', '06', '11'); + await mkdir(sessionDir, { recursive: true }); + await writeFile( + path.join(sessionDir, `rollout-2026-06-11T01-31-27-${threadId}.jsonl`), + JSON.stringify({ + type: 'turn.completed', + usage: { cached_input_tokens: 42_000, input_tokens: 52_000, output_tokens: 300 }, + }), + ); + + const fake = createFakeProc({ + stdoutChunks: [realCodexFixture], + }); + nextFakeProc = fake.proc; + + const { spawnAgent } = await import('./spawnAgent'); + const handle = await spawnAgent({ + agentType: 'codex', + env: { CODEX_HOME: codexHome }, + operationId: 'op-1', + prompt: 'continue', + resumeSessionId: threadId, + }); + fake.start(); + + const events: any[] = []; + for await (const event of handle.events) events.push(event); + await handle.exit; + + const usageEvent = events.find( + (event) => event.type === 'step_complete' && event.data?.phase === 'turn_metadata', + ); + expect(usageEvent).toMatchObject({ + data: { + phase: 'turn_metadata', + usage: { + inputCachedTokens: 1008, + inputCacheMissTokens: 937, + totalInputTokens: 1945, + totalOutputTokens: 116, + totalTokens: 2061, + }, + }, + type: 'step_complete', + }); + expect(usageEvent?.data.usage.totalTokens).not.toBe(96_361); + expect(events).toEqual( + expect.arrayContaining([ + expect.objectContaining({ + data: expect.objectContaining({ + content: 'Wait completed: 2 + 2 = 4', + toolCallId: 'item_4', + }), + type: 'tool_result', + }), + ]), + ); + }); + it('serializes multimodal content blocks into the CC stream-json user message', async () => { nextFakeProc = createFakeProc().proc; const { spawnAgent } = await import('./spawnAgent'); - const pngBytes = Buffer.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00]); + const pngBytes = Buffer.from('89504e470d0a1a0a00', 'hex'); await spawnAgent({ agentType: 'claude-code', operationId: 'op-1', @@ -275,7 +349,7 @@ describe('spawnAgent', () => { const fsp = await import('node:fs/promises'); const cacheDir = await fsp.mkdtemp(`${os.tmpdir()}/spawn-agent-codex-`); - const pngBytes = Buffer.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00]); + const pngBytes = Buffer.from('89504e470d0a1a0a00', 'hex'); const { spawnAgent } = await import('./spawnAgent'); await spawnAgent({ agentType: 'codex', diff --git a/packages/heterogeneous-agents/src/spawn/spawnAgent.ts b/packages/heterogeneous-agents/src/spawn/spawnAgent.ts index 3f5d684b7b..08b5d2b5dc 100644 --- a/packages/heterogeneous-agents/src/spawn/spawnAgent.ts +++ b/packages/heterogeneous-agents/src/spawn/spawnAgent.ts @@ -5,7 +5,7 @@ import type { AgentStreamEvent } from '@lobechat/agent-gateway-client'; import { AgentStreamPipeline } from './agentStreamPipeline'; import { resolveCliSpawnPlan } from './cliSpawn'; -import { resolveCodexInitialModel } from './codexModel'; +import { readCodexSessionModel, resolveCodexInitialModel } from './codexModel'; import type { AgentPromptInput, BuildAgentInputOptions } from './input'; import { buildAgentInput } from './input'; @@ -262,6 +262,11 @@ export const spawnAgent = async (options: SpawnAgentOptions): Promise { + if (!raw) return undefined; + + const inputCacheMissTokens = raw.input_tokens || 0; + const inputCachedTokens = raw.cached_input_tokens || 0; + const totalInputTokens = inputCacheMissTokens + inputCachedTokens; + const totalOutputTokens = raw.output_tokens || 0; + + if (totalInputTokens + totalOutputTokens === 0) return undefined; + + return { + inputCachedTokens: inputCachedTokens || undefined, + inputCacheMissTokens, + totalInputTokens, + totalOutputTokens, + totalTokens: totalInputTokens + totalOutputTokens, + }; +}; + +const isMonotonicUsage = (current: UsageData, previous: UsageData) => + current.inputCacheMissTokens >= previous.inputCacheMissTokens && + (current.inputCachedTokens || 0) >= (previous.inputCachedTokens || 0) && + current.totalInputTokens >= previous.totalInputTokens && + current.totalOutputTokens >= previous.totalOutputTokens && + current.totalTokens >= previous.totalTokens; + +export const toTurnUsageFromCumulative = ( + current: UsageData | undefined, + previous: UsageData | undefined, +): UsageData | undefined => { + if (!current || !previous) return current; + if (!isMonotonicUsage(current, previous)) return current; + + const inputCacheMissTokens = current.inputCacheMissTokens - previous.inputCacheMissTokens; + const inputCachedTokens = (current.inputCachedTokens || 0) - (previous.inputCachedTokens || 0); + const totalInputTokens = current.totalInputTokens - previous.totalInputTokens; + const totalOutputTokens = current.totalOutputTokens - previous.totalOutputTokens; + const totalTokens = totalInputTokens + totalOutputTokens; + + if (totalTokens === 0) return undefined; + + return { + inputCachedTokens: inputCachedTokens || undefined, + inputCacheMissTokens, + totalInputTokens, + totalOutputTokens, + totalTokens, + }; +};