mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-13 19:20:04 +00:00
🐛 fix: fix Codex resumed usage reporting for heterogeneous agents (#15751)
🐛 fix(heterogeneous-agent): normalize codex resumed usage
This commit is contained in:
@@ -23,7 +23,7 @@ import type {
|
||||
HeteroExecImageRef,
|
||||
} from '@lobechat/heterogeneous-agents/protocol';
|
||||
import { buildHeteroExecStdinPayload } from '@lobechat/heterogeneous-agents/protocol';
|
||||
import type { AgentStreamEvent } from '@lobechat/heterogeneous-agents/spawn';
|
||||
import type { AgentStreamEvent, UsageData } from '@lobechat/heterogeneous-agents/spawn';
|
||||
import {
|
||||
AgentStreamPipeline,
|
||||
buildAgentInput,
|
||||
@@ -911,6 +911,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
let spawnPlan;
|
||||
let traceSession;
|
||||
let cwd: string;
|
||||
let initialCumulativeUsage: UsageData | undefined;
|
||||
let spawnEnv: NodeJS.ProcessEnv;
|
||||
try {
|
||||
const driver = getHeterogeneousAgentDriver(session.agentType);
|
||||
@@ -945,6 +946,12 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
session.model = initialModel.model;
|
||||
session.modelSource = initialModel.source;
|
||||
}
|
||||
|
||||
if (session.agentSessionId) {
|
||||
initialCumulativeUsage = (
|
||||
await readCodexSessionModel(session.agentSessionId, { env: spawnEnv })
|
||||
)?.cumulativeUsage;
|
||||
}
|
||||
}
|
||||
|
||||
traceSession = await this.createCliTraceSession({
|
||||
@@ -1001,6 +1008,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
reject,
|
||||
resolve,
|
||||
session,
|
||||
initialCumulativeUsage,
|
||||
spawnEnv,
|
||||
traceSession,
|
||||
useStdin,
|
||||
@@ -1070,6 +1078,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
|
||||
private handleSpawnedAgentProcess({
|
||||
cwd,
|
||||
initialCumulativeUsage,
|
||||
intervention,
|
||||
params,
|
||||
proc,
|
||||
@@ -1088,6 +1097,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
reject: (reason?: unknown) => void;
|
||||
resolve: () => void;
|
||||
session: AgentSession;
|
||||
initialCumulativeUsage?: UsageData | undefined;
|
||||
spawnEnv: NodeJS.ProcessEnv;
|
||||
spawnPlan: HeterogeneousAgentBuildPlan;
|
||||
traceSession: CliTraceSession | undefined;
|
||||
@@ -1128,6 +1138,7 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
const pipeline = new AgentStreamPipeline({
|
||||
agentType: session.agentType,
|
||||
cwd,
|
||||
initialCumulativeUsage,
|
||||
initialModel: session.model,
|
||||
operationId: params.operationId,
|
||||
});
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { readFile } from 'node:fs/promises';
|
||||
|
||||
import { describe, expect, it } from 'vitest';
|
||||
import { describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { CodexAdapter } from './codex';
|
||||
|
||||
@@ -107,6 +107,40 @@ describe('CodexAdapter', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('classifies Codex usage-limit errors with retry metadata', () => {
|
||||
vi.useFakeTimers();
|
||||
vi.setSystemTime(new Date(2026, 5, 13, 3, 9, 27));
|
||||
|
||||
try {
|
||||
const adapter = new CodexAdapter();
|
||||
const message =
|
||||
"You've hit your usage limit. Visit https://chatgpt.com/codex/settings/usage to purchase more credits or try again at 3:10 AM.";
|
||||
const expectedResetAt = Math.floor(new Date(2026, 5, 13, 3, 10).getTime() / 1000);
|
||||
|
||||
adapter.adapt({ type: 'turn.started' });
|
||||
const events = adapter.adapt({
|
||||
message,
|
||||
type: 'error',
|
||||
});
|
||||
|
||||
expect(events.map((event) => event.type)).toEqual(['stream_end', 'error']);
|
||||
expect(events[1].data).toMatchObject({
|
||||
agentType: 'codex',
|
||||
clearEchoedContent: true,
|
||||
code: 'rate_limit',
|
||||
docsUrl: 'https://chatgpt.com/codex/settings/usage',
|
||||
message,
|
||||
rateLimitInfo: {
|
||||
resetsAt: expectedResetAt,
|
||||
status: 'rejected',
|
||||
},
|
||||
stderr: message,
|
||||
});
|
||||
} finally {
|
||||
vi.useRealTimers();
|
||||
}
|
||||
});
|
||||
|
||||
it('deduplicates the following turn.failed after a Codex JSONL error event', () => {
|
||||
const adapter = new CodexAdapter();
|
||||
|
||||
@@ -668,8 +702,16 @@ describe('CodexAdapter', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('keeps a real collab_tool_call stream fixture readable and drains unfinished attempts', async () => {
|
||||
const adapter = new CodexAdapter();
|
||||
it('keeps a real collab_tool_call stream fixture readable and subtracts resumed usage', async () => {
|
||||
const adapter = new CodexAdapter({
|
||||
initialCumulativeUsage: {
|
||||
inputCachedTokens: 42_000,
|
||||
inputCacheMissTokens: 52_000,
|
||||
totalInputTokens: 94_000,
|
||||
totalOutputTokens: 300,
|
||||
totalTokens: 94_300,
|
||||
},
|
||||
});
|
||||
const rawEvents = await loadFixture('collab_tool_call.spawn_wait.jsonl');
|
||||
|
||||
const adapted = rawEvents.flatMap((event) => adapter.adapt(event));
|
||||
@@ -715,6 +757,21 @@ describe('CodexAdapter', () => {
|
||||
}),
|
||||
]),
|
||||
);
|
||||
expect(
|
||||
adapted.find(
|
||||
(event) => event.type === 'step_complete' && event.data?.phase === 'turn_metadata',
|
||||
),
|
||||
).toMatchObject({
|
||||
data: {
|
||||
usage: {
|
||||
inputCachedTokens: 1008,
|
||||
inputCacheMissTokens: 937,
|
||||
totalInputTokens: 1945,
|
||||
totalOutputTokens: 116,
|
||||
totalTokens: 2061,
|
||||
},
|
||||
},
|
||||
});
|
||||
expect(flushed).toEqual([]);
|
||||
});
|
||||
|
||||
@@ -895,6 +952,42 @@ describe('CodexAdapter', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('subtracts the previous cumulative Codex usage for resumed turns', () => {
|
||||
const adapter = new CodexAdapter({
|
||||
initialCumulativeUsage: {
|
||||
inputCachedTokens: 4,
|
||||
inputCacheMissTokens: 10,
|
||||
totalInputTokens: 14,
|
||||
totalOutputTokens: 3,
|
||||
totalTokens: 17,
|
||||
},
|
||||
});
|
||||
|
||||
const events = adapter.adapt({
|
||||
type: 'turn.completed',
|
||||
usage: {
|
||||
cached_input_tokens: 9,
|
||||
input_tokens: 25,
|
||||
output_tokens: 11,
|
||||
},
|
||||
});
|
||||
|
||||
expect(events[0]).toMatchObject({
|
||||
data: {
|
||||
phase: 'turn_metadata',
|
||||
provider: 'codex',
|
||||
usage: {
|
||||
inputCachedTokens: 5,
|
||||
inputCacheMissTokens: 15,
|
||||
totalInputTokens: 20,
|
||||
totalOutputTokens: 8,
|
||||
totalTokens: 28,
|
||||
},
|
||||
},
|
||||
type: 'step_complete',
|
||||
});
|
||||
});
|
||||
|
||||
it('hydrates turn metadata model from session_configured when turn.completed omits it', () => {
|
||||
const adapter = new CodexAdapter();
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type {
|
||||
AgentEventAdapter,
|
||||
HeterogeneousAgentEvent,
|
||||
HeterogeneousRateLimitInfo,
|
||||
HeterogeneousTerminalErrorData,
|
||||
StepCompleteData,
|
||||
StreamStartData,
|
||||
@@ -8,6 +9,7 @@ import type {
|
||||
ToolResultData,
|
||||
UsageData,
|
||||
} from '../types';
|
||||
import { toCodexUsageData, toTurnUsageFromCumulative } from '../utils/codexUsage';
|
||||
|
||||
const CODEX_IDENTIFIER = 'codex';
|
||||
const CODEX_COLLAB_TOOL_CALL_API = 'collab_tool_call';
|
||||
@@ -15,6 +17,15 @@ const CODEX_COMMAND_API = 'command_execution';
|
||||
const CODEX_FILE_CHANGE_API = 'file_change';
|
||||
const CODEX_MCP_TOOL_CALL_API = 'mcp_tool_call';
|
||||
const CODEX_TODO_LIST_API = 'todo_list';
|
||||
const CODEX_USAGE_SETTINGS_URL = 'https://chatgpt.com/codex/settings/usage';
|
||||
|
||||
const CODEX_USER_RATE_LIMIT_PATTERNS = [
|
||||
/you'?ve hit your usage limit/i,
|
||||
/purchase more credits/i,
|
||||
/\busage limit\b/i,
|
||||
] as const;
|
||||
|
||||
const CODEX_RETRY_AT_PATTERN = /\btry again at\s+(\d{1,2})(?::(\d{2}))?\s*(AM|PM)?\b/i;
|
||||
|
||||
interface CodexBaseItem {
|
||||
id: string;
|
||||
@@ -96,34 +107,6 @@ const isMcpToolCallItem = (item: CodexToolItem): item is CodexMcpToolCallItem =>
|
||||
const isTodoListItem = (item: CodexToolItem): item is CodexTodoListItem =>
|
||||
item.type === CODEX_TODO_LIST_API;
|
||||
|
||||
const toUsageData = (
|
||||
raw:
|
||||
| {
|
||||
cached_input_tokens?: number;
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
}
|
||||
| null
|
||||
| undefined,
|
||||
): UsageData | undefined => {
|
||||
if (!raw) return undefined;
|
||||
|
||||
const inputCacheMissTokens = raw.input_tokens || 0;
|
||||
const inputCachedTokens = raw.cached_input_tokens || 0;
|
||||
const totalInputTokens = inputCacheMissTokens + inputCachedTokens;
|
||||
const totalOutputTokens = raw.output_tokens || 0;
|
||||
|
||||
if (totalInputTokens + totalOutputTokens === 0) return undefined;
|
||||
|
||||
return {
|
||||
inputCachedTokens: inputCachedTokens || undefined,
|
||||
inputCacheMissTokens,
|
||||
totalInputTokens,
|
||||
totalOutputTokens,
|
||||
totalTokens: totalInputTokens + totalOutputTokens,
|
||||
};
|
||||
};
|
||||
|
||||
const normalizeTodoListItems = (item: CodexTodoListItem) =>
|
||||
(item.items || [])
|
||||
.map((todo) => ({
|
||||
@@ -522,9 +505,50 @@ const getCodexTerminalErrorStderr = (raw: any): string | undefined => {
|
||||
);
|
||||
};
|
||||
|
||||
const parseCodexRetryAt = (message: string, now = new Date()): number | undefined => {
|
||||
const match = CODEX_RETRY_AT_PATTERN.exec(message);
|
||||
if (!match) return;
|
||||
|
||||
const hour = Number(match[1]);
|
||||
const minute = match[2] ? Number(match[2]) : 0;
|
||||
const meridiem = match[3]?.toUpperCase();
|
||||
|
||||
if (!Number.isInteger(hour) || !Number.isInteger(minute) || minute < 0 || minute > 59) {
|
||||
return;
|
||||
}
|
||||
|
||||
let normalizedHour = hour;
|
||||
if (meridiem) {
|
||||
if (hour < 1 || hour > 12) return;
|
||||
normalizedHour = (hour % 12) + (meridiem === 'PM' ? 12 : 0);
|
||||
} else if (hour < 0 || hour > 23) {
|
||||
return;
|
||||
}
|
||||
|
||||
const resetAt = new Date(now);
|
||||
resetAt.setHours(normalizedHour, minute, 0, 0);
|
||||
if (resetAt.getTime() <= now.getTime()) {
|
||||
resetAt.setDate(resetAt.getDate() + 1);
|
||||
}
|
||||
|
||||
return Math.floor(resetAt.getTime() / 1000);
|
||||
};
|
||||
|
||||
const getCodexRateLimitInfo = (message: string): HeterogeneousRateLimitInfo | undefined => {
|
||||
if (!CODEX_USER_RATE_LIMIT_PATTERNS.some((pattern) => pattern.test(message))) return;
|
||||
|
||||
const resetsAt = parseCodexRetryAt(message);
|
||||
|
||||
return {
|
||||
...(resetsAt ? { resetsAt } : {}),
|
||||
status: 'rejected',
|
||||
};
|
||||
};
|
||||
|
||||
export class CodexAdapter implements AgentEventAdapter {
|
||||
private currentAgentMessageItemId?: string;
|
||||
private currentModel?: string;
|
||||
private lastCumulativeUsage?: UsageData;
|
||||
sessionId?: string;
|
||||
|
||||
private hasTextInCurrentStep = false;
|
||||
@@ -538,6 +562,10 @@ export class CodexAdapter implements AgentEventAdapter {
|
||||
private terminalEndEmitted = false;
|
||||
private terminalErrorEmitted = false;
|
||||
|
||||
constructor(options: { initialCumulativeUsage?: UsageData | undefined } = {}) {
|
||||
this.lastCumulativeUsage = options.initialCumulativeUsage;
|
||||
}
|
||||
|
||||
adapt(raw: any): HeterogeneousAgentEvent[] {
|
||||
if (!raw || typeof raw !== 'object') return [];
|
||||
|
||||
@@ -583,7 +611,9 @@ export class CodexAdapter implements AgentEventAdapter {
|
||||
const model = getEventModel(raw) || this.currentModel;
|
||||
if (model) this.currentModel = model;
|
||||
|
||||
const usage = toUsageData(raw.usage);
|
||||
const cumulativeUsage = toCodexUsageData(raw.usage);
|
||||
const usage = toTurnUsageFromCumulative(cumulativeUsage, this.lastCumulativeUsage);
|
||||
if (cumulativeUsage) this.lastCumulativeUsage = cumulativeUsage;
|
||||
const events = this.drainPendingToolEndEvents();
|
||||
|
||||
if (usage || model) {
|
||||
@@ -607,11 +637,21 @@ export class CodexAdapter implements AgentEventAdapter {
|
||||
if (this.terminalErrorEmitted || this.terminalEndEmitted) return [];
|
||||
|
||||
this.terminalErrorEmitted = true;
|
||||
const message = getCodexTerminalErrorMessage(raw);
|
||||
const stderr = getCodexTerminalErrorStderr(raw);
|
||||
const rateLimitInfo = getCodexRateLimitInfo(message);
|
||||
const data: HeterogeneousTerminalErrorData = {
|
||||
agentType: CODEX_IDENTIFIER,
|
||||
clearEchoedContent: true,
|
||||
message: getCodexTerminalErrorMessage(raw),
|
||||
stderr: getCodexTerminalErrorStderr(raw),
|
||||
...(rateLimitInfo
|
||||
? {
|
||||
code: 'rate_limit',
|
||||
docsUrl: CODEX_USAGE_SETTINGS_URL,
|
||||
rateLimitInfo,
|
||||
}
|
||||
: {}),
|
||||
message,
|
||||
stderr,
|
||||
};
|
||||
|
||||
const events: HeterogeneousAgentEvent[] = this.started
|
||||
@@ -623,6 +663,10 @@ export class CodexAdapter implements AgentEventAdapter {
|
||||
}
|
||||
|
||||
private handleSessionConfigured(raw: any): HeterogeneousAgentEvent[] {
|
||||
if (raw.initialCumulativeUsage) {
|
||||
this.lastCumulativeUsage = raw.initialCumulativeUsage;
|
||||
}
|
||||
|
||||
const model = getEventModel(raw);
|
||||
if (!model || model === this.currentModel) return [];
|
||||
|
||||
|
||||
@@ -84,6 +84,44 @@ describe('AgentStreamPipeline', () => {
|
||||
});
|
||||
});
|
||||
|
||||
it('passes initial Codex cumulative usage into the adapter for resumed turns', async () => {
|
||||
const pipeline = new AgentStreamPipeline({
|
||||
agentType: 'codex',
|
||||
initialCumulativeUsage: {
|
||||
inputCacheMissTokens: 100,
|
||||
totalInputTokens: 100,
|
||||
totalOutputTokens: 20,
|
||||
totalTokens: 120,
|
||||
},
|
||||
operationId: 'op-codex',
|
||||
});
|
||||
|
||||
const events = await pipeline.push(
|
||||
`${JSON.stringify({
|
||||
type: 'turn.completed',
|
||||
usage: {
|
||||
input_tokens: 180,
|
||||
output_tokens: 45,
|
||||
},
|
||||
})}\n`,
|
||||
);
|
||||
|
||||
expect(events[0]).toMatchObject({
|
||||
data: {
|
||||
phase: 'turn_metadata',
|
||||
provider: 'codex',
|
||||
usage: {
|
||||
inputCacheMissTokens: 80,
|
||||
totalInputTokens: 80,
|
||||
totalOutputTokens: 25,
|
||||
totalTokens: 105,
|
||||
},
|
||||
},
|
||||
operationId: 'op-codex',
|
||||
type: 'step_complete',
|
||||
});
|
||||
});
|
||||
|
||||
it('drops non-JSON noise lines instead of throwing', async () => {
|
||||
const pipeline = new AgentStreamPipeline({
|
||||
agentType: 'claude-code',
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import type { AgentStreamEvent } from '@lobechat/agent-gateway-client';
|
||||
|
||||
import { createAdapter } from '../registry';
|
||||
import type { AgentEventAdapter, HeterogeneousAgentEvent } from '../types';
|
||||
import type { AgentEventAdapter, HeterogeneousAgentEvent, UsageData } from '../types';
|
||||
import { CodexFileChangeTracker } from './codexFileChangeTracker';
|
||||
import { JsonlStreamProcessor } from './jsonlProcessor';
|
||||
import { toStreamEvent } from './streamEvent';
|
||||
@@ -11,8 +11,10 @@ export interface AgentStreamPipelineOptions {
|
||||
agentType: string;
|
||||
/** Working directory used to resolve relative file paths emitted by CLI tools. */
|
||||
cwd?: string;
|
||||
/** Last known Codex cumulative usage before a resumed turn starts. */
|
||||
initialCumulativeUsage?: UsageData | undefined;
|
||||
/** Host-known model to emit before the CLI's first stdout payload. */
|
||||
initialModel?: string;
|
||||
initialModel?: string | undefined;
|
||||
/** Operation id to stamp onto every emitted `AgentStreamEvent`. */
|
||||
operationId: string;
|
||||
}
|
||||
@@ -42,8 +44,13 @@ export class AgentStreamPipeline {
|
||||
this.codexTracker =
|
||||
options.agentType === 'codex' ? new CodexFileChangeTracker(options.cwd) : undefined;
|
||||
|
||||
if (options.initialModel) {
|
||||
this.queuedEvents.push(...this.configureSession({ model: options.initialModel }));
|
||||
if (options.initialModel || options.initialCumulativeUsage) {
|
||||
this.queuedEvents.push(
|
||||
...this.configureSession({
|
||||
initialCumulativeUsage: options.initialCumulativeUsage,
|
||||
model: options.initialModel,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,7 +78,10 @@ export class AgentStreamPipeline {
|
||||
return [...trailing, ...flushed];
|
||||
}
|
||||
|
||||
configureSession(data: { model?: string }): AgentStreamEvent[] {
|
||||
configureSession(data: {
|
||||
initialCumulativeUsage?: UsageData | undefined;
|
||||
model?: string | undefined;
|
||||
}): AgentStreamEvent[] {
|
||||
return this.toStreamEvents(
|
||||
this.adapter.adapt({
|
||||
...data,
|
||||
|
||||
@@ -109,4 +109,35 @@ describe('codex model metadata helpers', () => {
|
||||
provider: 'openai',
|
||||
});
|
||||
});
|
||||
|
||||
it('reads the latest cumulative usage from a Codex rollout session file', async () => {
|
||||
const codexHome = await makeTempCodexHome();
|
||||
const sessionDir = path.join(codexHome, 'sessions', '2026', '06', '11');
|
||||
await mkdir(sessionDir, { recursive: true });
|
||||
await writeFile(
|
||||
path.join(sessionDir, 'rollout-2026-06-11T01-31-27-thread-usage.jsonl'),
|
||||
[
|
||||
JSON.stringify({
|
||||
payload: { usage: { input_tokens: 10, output_tokens: 2 } },
|
||||
type: 'event_msg',
|
||||
}),
|
||||
JSON.stringify({
|
||||
type: 'turn.completed',
|
||||
usage: { cached_input_tokens: 5, input_tokens: 25, output_tokens: 9 },
|
||||
}),
|
||||
].join('\n'),
|
||||
);
|
||||
|
||||
await expect(
|
||||
readCodexSessionModel('thread-usage', { env: { CODEX_HOME: codexHome } }),
|
||||
).resolves.toMatchObject({
|
||||
cumulativeUsage: {
|
||||
inputCachedTokens: 5,
|
||||
inputCacheMissTokens: 25,
|
||||
totalInputTokens: 30,
|
||||
totalOutputTokens: 9,
|
||||
totalTokens: 39,
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -2,6 +2,9 @@ import { readdir, readFile, stat } from 'node:fs/promises';
|
||||
import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
|
||||
import type { UsageData } from '../types';
|
||||
import { toCodexUsageData } from '../utils/codexUsage';
|
||||
|
||||
type CodexEnv = Record<string, string | undefined>;
|
||||
|
||||
export type CodexInitialModelSource = 'args' | 'config';
|
||||
@@ -14,6 +17,7 @@ export interface CodexInitialModelResolution {
|
||||
|
||||
export interface CodexSessionModelInfo {
|
||||
contextWindow?: number;
|
||||
cumulativeUsage?: UsageData | undefined;
|
||||
line?: number;
|
||||
model?: string;
|
||||
provider?: string;
|
||||
@@ -51,7 +55,7 @@ const parseTomlStringAssignment = (line: string, key: string): string | undefine
|
||||
if (!match?.[1]) return;
|
||||
|
||||
const value = unquoteTomlString(match[1]);
|
||||
return value ? value : undefined;
|
||||
return value || undefined;
|
||||
};
|
||||
|
||||
const normalizeProfileName = (raw: string): string => unquoteTomlString(raw.trim());
|
||||
@@ -73,7 +77,7 @@ export const getCodexHome = (
|
||||
homeDir: string = os.homedir(),
|
||||
): string => {
|
||||
const configured = env.CODEX_HOME?.trim();
|
||||
return configured ? configured : path.join(homeDir, '.codex');
|
||||
return configured || path.join(homeDir, '.codex');
|
||||
};
|
||||
|
||||
export const parseCodexModelFromArgs = (args: string[]): string | undefined => {
|
||||
@@ -265,6 +269,7 @@ export const readCodexSessionModel = async (
|
||||
let model: string | undefined;
|
||||
let provider: string | undefined;
|
||||
let contextWindow: number | undefined;
|
||||
let cumulativeUsage: UsageData | undefined;
|
||||
let lineNumber: number | undefined;
|
||||
|
||||
const content = await readFile(sourceFile, 'utf8').catch(() => undefined);
|
||||
@@ -278,6 +283,9 @@ export const readCodexSessionModel = async (
|
||||
try {
|
||||
const record = JSON.parse(line);
|
||||
const payload = record?.payload;
|
||||
const usage = toCodexUsageData(record?.usage) || toCodexUsageData(payload?.usage);
|
||||
if (usage) cumulativeUsage = usage;
|
||||
|
||||
const payloadModel =
|
||||
getStringValue(payload?.model) ||
|
||||
getStringValue(payload?.collaboration_mode?.settings?.model);
|
||||
@@ -293,7 +301,7 @@ export const readCodexSessionModel = async (
|
||||
}
|
||||
}
|
||||
|
||||
return model || provider || contextWindow
|
||||
? { contextWindow, line: lineNumber, model, provider, sourceFile }
|
||||
return model || provider || contextWindow || cumulativeUsage
|
||||
? { contextWindow, cumulativeUsage, line: lineNumber, model, provider, sourceFile }
|
||||
: undefined;
|
||||
};
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
* `@lobechat/agent-gateway-client` (which is a browser-side WebSocket client
|
||||
* that producers have no business pulling in).
|
||||
*/
|
||||
export type { UsageData } from '../types';
|
||||
export { AgentStreamPipeline, type AgentStreamPipelineOptions } from './agentStreamPipeline';
|
||||
export { type CliSpawnPlan, resolveCliSpawnPlan } from './cliSpawn';
|
||||
export { CodexFileChangeTracker } from './codexFileChangeTracker';
|
||||
|
||||
@@ -1,12 +1,15 @@
|
||||
import * as childProcess from 'node:child_process';
|
||||
import { EventEmitter } from 'node:events';
|
||||
import { mkdir, mkdtemp, readFile, rm, writeFile } from 'node:fs/promises';
|
||||
import * as os from 'node:os';
|
||||
import path from 'node:path';
|
||||
import { PassThrough } from 'node:stream';
|
||||
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
const spawnCalls: Array<{ args: string[]; command: string; options: any }> = [];
|
||||
let nextFakeProc: any = null;
|
||||
const tempDirs: string[] = [];
|
||||
|
||||
const platformMock = vi.mocked(os.platform);
|
||||
const execFileMock = vi.mocked(childProcess.execFile);
|
||||
@@ -103,8 +106,9 @@ describe('spawnAgent', () => {
|
||||
execFileMock.mockReset();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
afterEach(async () => {
|
||||
nextFakeProc = null;
|
||||
await Promise.all(tempDirs.splice(0).map((dir) => rm(dir, { force: true, recursive: true })));
|
||||
});
|
||||
|
||||
it('spawns claude with stream-json flags + writes prompt as user message to stdin', async () => {
|
||||
@@ -222,10 +226,13 @@ describe('spawnAgent', () => {
|
||||
});
|
||||
|
||||
it('uses codex `exec resume` form with thread id + `-` stdin marker on resume', async () => {
|
||||
const codexHome = await mkdtemp(path.join(os.tmpdir(), 'lobe-codex-spawn-empty-'));
|
||||
tempDirs.push(codexHome);
|
||||
nextFakeProc = createFakeProc().proc;
|
||||
const { spawnAgent } = await import('./spawnAgent');
|
||||
await spawnAgent({
|
||||
agentType: 'codex',
|
||||
env: { CODEX_HOME: codexHome },
|
||||
operationId: 'op-1',
|
||||
prompt: 'continue',
|
||||
resumeSessionId: 'thread_abc',
|
||||
@@ -237,10 +244,77 @@ describe('spawnAgent', () => {
|
||||
expect(args.at(-1)).toBe('-');
|
||||
});
|
||||
|
||||
it('seeds a real Codex resumed stream with the previous cumulative usage from the session file', async () => {
|
||||
const threadId = '019dba1e-eec2-7a22-bdfb-ac6175e03081';
|
||||
const realCodexFixture = await readFile(
|
||||
new URL('../adapters/__fixtures__/codex/collab_tool_call.spawn_wait.jsonl', import.meta.url),
|
||||
'utf8',
|
||||
);
|
||||
const codexHome = await mkdtemp(path.join(os.tmpdir(), 'lobe-codex-spawn-'));
|
||||
tempDirs.push(codexHome);
|
||||
const sessionDir = path.join(codexHome, 'sessions', '2026', '06', '11');
|
||||
await mkdir(sessionDir, { recursive: true });
|
||||
await writeFile(
|
||||
path.join(sessionDir, `rollout-2026-06-11T01-31-27-${threadId}.jsonl`),
|
||||
JSON.stringify({
|
||||
type: 'turn.completed',
|
||||
usage: { cached_input_tokens: 42_000, input_tokens: 52_000, output_tokens: 300 },
|
||||
}),
|
||||
);
|
||||
|
||||
const fake = createFakeProc({
|
||||
stdoutChunks: [realCodexFixture],
|
||||
});
|
||||
nextFakeProc = fake.proc;
|
||||
|
||||
const { spawnAgent } = await import('./spawnAgent');
|
||||
const handle = await spawnAgent({
|
||||
agentType: 'codex',
|
||||
env: { CODEX_HOME: codexHome },
|
||||
operationId: 'op-1',
|
||||
prompt: 'continue',
|
||||
resumeSessionId: threadId,
|
||||
});
|
||||
fake.start();
|
||||
|
||||
const events: any[] = [];
|
||||
for await (const event of handle.events) events.push(event);
|
||||
await handle.exit;
|
||||
|
||||
const usageEvent = events.find(
|
||||
(event) => event.type === 'step_complete' && event.data?.phase === 'turn_metadata',
|
||||
);
|
||||
expect(usageEvent).toMatchObject({
|
||||
data: {
|
||||
phase: 'turn_metadata',
|
||||
usage: {
|
||||
inputCachedTokens: 1008,
|
||||
inputCacheMissTokens: 937,
|
||||
totalInputTokens: 1945,
|
||||
totalOutputTokens: 116,
|
||||
totalTokens: 2061,
|
||||
},
|
||||
},
|
||||
type: 'step_complete',
|
||||
});
|
||||
expect(usageEvent?.data.usage.totalTokens).not.toBe(96_361);
|
||||
expect(events).toEqual(
|
||||
expect.arrayContaining([
|
||||
expect.objectContaining({
|
||||
data: expect.objectContaining({
|
||||
content: 'Wait completed: 2 + 2 = 4',
|
||||
toolCallId: 'item_4',
|
||||
}),
|
||||
type: 'tool_result',
|
||||
}),
|
||||
]),
|
||||
);
|
||||
});
|
||||
|
||||
it('serializes multimodal content blocks into the CC stream-json user message', async () => {
|
||||
nextFakeProc = createFakeProc().proc;
|
||||
const { spawnAgent } = await import('./spawnAgent');
|
||||
const pngBytes = Buffer.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00]);
|
||||
const pngBytes = Buffer.from('89504e470d0a1a0a00', 'hex');
|
||||
await spawnAgent({
|
||||
agentType: 'claude-code',
|
||||
operationId: 'op-1',
|
||||
@@ -275,7 +349,7 @@ describe('spawnAgent', () => {
|
||||
const fsp = await import('node:fs/promises');
|
||||
const cacheDir = await fsp.mkdtemp(`${os.tmpdir()}/spawn-agent-codex-`);
|
||||
|
||||
const pngBytes = Buffer.from([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00]);
|
||||
const pngBytes = Buffer.from('89504e470d0a1a0a00', 'hex');
|
||||
const { spawnAgent } = await import('./spawnAgent');
|
||||
await spawnAgent({
|
||||
agentType: 'codex',
|
||||
|
||||
@@ -5,7 +5,7 @@ import type { AgentStreamEvent } from '@lobechat/agent-gateway-client';
|
||||
|
||||
import { AgentStreamPipeline } from './agentStreamPipeline';
|
||||
import { resolveCliSpawnPlan } from './cliSpawn';
|
||||
import { resolveCodexInitialModel } from './codexModel';
|
||||
import { readCodexSessionModel, resolveCodexInitialModel } from './codexModel';
|
||||
import type { AgentPromptInput, BuildAgentInputOptions } from './input';
|
||||
import { buildAgentInput } from './input';
|
||||
|
||||
@@ -262,6 +262,11 @@ export const spawnAgent = async (options: SpawnAgentOptions): Promise<SpawnAgent
|
||||
options.agentType === 'codex'
|
||||
? (await resolveCodexInitialModel({ args, env: childEnv }))?.model
|
||||
: undefined;
|
||||
const resumedCodexSession =
|
||||
options.agentType === 'codex' && options.resumeSessionId
|
||||
? await readCodexSessionModel(options.resumeSessionId, { env: childEnv })
|
||||
: undefined;
|
||||
const initialCumulativeUsage = resumedCodexSession?.cumulativeUsage;
|
||||
|
||||
const cliSpawnPlan = await resolveCliSpawnPlan(command, args);
|
||||
const proc = spawn(cliSpawnPlan.command, cliSpawnPlan.args, {
|
||||
@@ -287,6 +292,7 @@ export const spawnAgent = async (options: SpawnAgentOptions): Promise<SpawnAgent
|
||||
const pipeline = new AgentStreamPipeline({
|
||||
agentType: options.agentType,
|
||||
cwd,
|
||||
initialCumulativeUsage,
|
||||
initialModel,
|
||||
operationId: options.operationId,
|
||||
});
|
||||
|
||||
@@ -0,0 +1,59 @@
|
||||
import type { UsageData } from '../types';
|
||||
|
||||
export interface CodexUsagePayload {
|
||||
cached_input_tokens?: number;
|
||||
input_tokens?: number;
|
||||
output_tokens?: number;
|
||||
}
|
||||
|
||||
export const toCodexUsageData = (
|
||||
raw: CodexUsagePayload | null | undefined,
|
||||
): UsageData | undefined => {
|
||||
if (!raw) return undefined;
|
||||
|
||||
const inputCacheMissTokens = raw.input_tokens || 0;
|
||||
const inputCachedTokens = raw.cached_input_tokens || 0;
|
||||
const totalInputTokens = inputCacheMissTokens + inputCachedTokens;
|
||||
const totalOutputTokens = raw.output_tokens || 0;
|
||||
|
||||
if (totalInputTokens + totalOutputTokens === 0) return undefined;
|
||||
|
||||
return {
|
||||
inputCachedTokens: inputCachedTokens || undefined,
|
||||
inputCacheMissTokens,
|
||||
totalInputTokens,
|
||||
totalOutputTokens,
|
||||
totalTokens: totalInputTokens + totalOutputTokens,
|
||||
};
|
||||
};
|
||||
|
||||
const isMonotonicUsage = (current: UsageData, previous: UsageData) =>
|
||||
current.inputCacheMissTokens >= previous.inputCacheMissTokens &&
|
||||
(current.inputCachedTokens || 0) >= (previous.inputCachedTokens || 0) &&
|
||||
current.totalInputTokens >= previous.totalInputTokens &&
|
||||
current.totalOutputTokens >= previous.totalOutputTokens &&
|
||||
current.totalTokens >= previous.totalTokens;
|
||||
|
||||
export const toTurnUsageFromCumulative = (
|
||||
current: UsageData | undefined,
|
||||
previous: UsageData | undefined,
|
||||
): UsageData | undefined => {
|
||||
if (!current || !previous) return current;
|
||||
if (!isMonotonicUsage(current, previous)) return current;
|
||||
|
||||
const inputCacheMissTokens = current.inputCacheMissTokens - previous.inputCacheMissTokens;
|
||||
const inputCachedTokens = (current.inputCachedTokens || 0) - (previous.inputCachedTokens || 0);
|
||||
const totalInputTokens = current.totalInputTokens - previous.totalInputTokens;
|
||||
const totalOutputTokens = current.totalOutputTokens - previous.totalOutputTokens;
|
||||
const totalTokens = totalInputTokens + totalOutputTokens;
|
||||
|
||||
if (totalTokens === 0) return undefined;
|
||||
|
||||
return {
|
||||
inputCachedTokens: inputCachedTokens || undefined,
|
||||
inputCacheMissTokens,
|
||||
totalInputTokens,
|
||||
totalOutputTokens,
|
||||
totalTokens,
|
||||
};
|
||||
};
|
||||
Reference in New Issue
Block a user