Compare commits

...

4 Commits

Author SHA1 Message Date
ONLY-yours 8ef7e7e84e 🐛 fix: restore cold replica state in HeterogeneousPersistenceHandler
Vercel serverless functions are stateless per-request, so `operationStates`
is empty on every `heteroIngest` call. loadOrCreateState always cold-creates.

#14539 fixed `toolMsgIdByCallId` restoration but left `accumulatedContent`,
`toolState.payloads`, and `toolState.persistedIds` empty on cold load,
causing two bugs:

- Content truncation: cold instance starts with `accumulatedContent=''`,
  accumulates only the current batch's text, then writes that shorter string
  on the next step boundary or terminal — overwriting the longer content the
  previous write had already stored in DB.

- Tool duplication / tools[] overwrite: `persistedIds={}` on cold load
  means every `tools_calling` event re-creates already-persisted tool
  messages, and `payloads=[]` means phase 1/3 writes only the current
  batch's tools, wiping previous tools from `assistant.tools[]`.

Fix: in `loadOrCreateState`, fetch the current assistant message and restore
`accumulatedContent`, `accumulatedReasoning`, `toolState.payloads`, and
`toolState.persistedIds` from it. Cold load is now equivalent to warm load.

Also adds two regression tests covering the cold-replica scenarios.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 23:25:47 +08:00
ONLY-yours aaa8de0254 🐛 fix: extend reconnect guard to cover all in-flight connection statuses
The previous guard only skipped reconnect for 'connecting'/'connected'
but the connection can already be in 'authenticating' or 'reconnecting'
by the time useGatewayReconnect fires, leaving the race window open.

Flip the condition: skip for any status that is not 'disconnected'.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 23:10:35 +08:00
ONLY-yours 644d1b6788 🐛 fix: always pass --cwd /workspace for cloud CC to ensure session resume
CC stores session files at ~/.claude/projects/<encoded-cwd>/.
Without an explicit --cwd the actual working directory can differ
between sandbox invocations, so --resume <heteroSessionId> fails
to locate the previous session files even though the container is
persistent and the ID is correctly stored in topic.metadata.

Default cwd to /workspace for cloud runs (desktop keeps its own
explicit path), guaranteeing a stable session-file location across
page reloads within the same sandbox lifecycle.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 22:37:58 +08:00
ONLY-yours c4f7995863 🐛 fix: skip reconnect when gateway action already established a connection
Race condition on new-topic first message:
1. switchTopic loads runningOperation → useGatewayReconnect fires
2. executeGatewayAgent calls connectToGateway (status: connecting)
3. reconnectToGatewayOperation overwrites with resumeOnConnect:true
4. Gateway sees resume on a brand-new session → no events → stuck

Second message works because the client store's runningOperation is
stale (from the first op), so SWR deduplications and no reconnect fires.

Fix: bail out of reconnectToGatewayOperation if gatewayConnections
already shows connecting/connected for that operationId.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-09 22:29:36 +08:00
7 changed files with 155 additions and 11 deletions
@@ -349,9 +349,25 @@ export class HeterogeneousPersistenceHandler {
if (plugin.toolCallId) toolMsgIdByCallId.set(plugin.toolCallId, plugin.id);
}
// Restore in-progress accumulators and tool state from the current assistant
// message so a cold replica (Vercel serverless — each request is a new process)
// continues from where the previous request left off rather than overwriting
// with an empty/shorter value. Without this, every ingest call would reset
// accumulatedContent to '' and toolState.payloads to [], causing:
// - content truncation: warm instance writes "hello world", cold instance
// accumulates only " more text" and overwrites with that shorter string.
// - tool duplication: cold instance sees persistedIds={}, re-creates already-
// persisted tool messages, and overwrites assistant.tools[] with only the
// current batch's tools (losing all previous ones).
const currentMsg = await this.deps.messageModel.findById(currentAssistantMessageId);
const restoredContent = (currentMsg?.content ?? '') as string;
const restoredReasoning = (currentMsg?.reasoning as { content?: string } | null)?.content ?? '';
const restoredTools = (currentMsg?.tools ?? []) as ChatToolPayload[];
const restoredPersistedIds = new Set(restoredTools.map((t) => t.id));
state = {
accumulatedContent: '',
accumulatedReasoning: '',
accumulatedContent: restoredContent,
accumulatedReasoning: restoredReasoning,
agentId: topic.agentId ?? null,
currentAssistantMessageId,
lastModel: undefined,
@@ -360,16 +376,18 @@ export class HeterogeneousPersistenceHandler {
processedKeys: new Set(),
subagentRuns: new Map(),
toolMsgIdByCallId,
toolState: { payloads: [], persistedIds: new Set() },
toolState: { payloads: restoredTools, persistedIds: restoredPersistedIds },
topicId,
};
operationStates.set(operationId, state);
log(
'created state for operation %s on topic %s msgId=%s tools=%d',
'created state for operation %s on topic %s msgId=%s tools=%d restored(content=%d tools=%d)',
operationId,
topicId,
currentAssistantMessageId,
toolMsgIdByCallId.size,
restoredContent.length,
restoredTools.length,
);
return state;
}
@@ -121,6 +121,7 @@ const createHarness = (
return { success: true };
},
),
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => []),
};
@@ -181,6 +181,7 @@ const createHarness = () => {
});
return { success: true };
}),
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => []),
};
@@ -101,6 +101,7 @@ const createHarness = (params: {
return { success: true };
},
),
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => []),
};
@@ -715,4 +716,112 @@ describe('HeterogeneousPersistenceHandler', () => {
).resolves.not.toThrow();
});
});
describe('cold replica state restoration (Vercel serverless)', () => {
it('restores accumulatedContent from DB so a cold instance does not truncate previous text', async () => {
const h = createHarness({
assistantMessageId: 'asst-1',
operationId: 'op-1',
topicId: 'topic-1',
});
// Batch 1 (warm instance): stream two text chunks, flush happens via flushBatchContent
await h.handler.ingest({
events: [
buildEvent('stream_chunk', 0, { chunkType: 'text', content: 'hello ' }, 1000),
buildEvent('stream_chunk', 0, { chunkType: 'text', content: 'world' }, 1001),
],
operationId: 'op-1',
topicId: 'topic-1',
});
// DB should have the partial content written by flushBatchContent
expect(h.messages.get('asst-1')?.content).toBe('hello world');
// Simulate cold replica: drop the in-memory operation state
__resetOperationStatesForTesting();
// Batch 2 (cold instance): receives more text.
// Without restoration the new instance would start with accumulatedContent='' and
// write only " more" — truncating "hello world".
await h.handler.ingest({
events: [buildEvent('agent_runtime_end', 0, { reason: 'success' }, 2000)],
operationId: 'op-1',
topicId: 'topic-1',
});
// The terminal flush should preserve the previously accumulated content.
expect(h.messages.get('asst-1')?.content).toBe('hello world');
});
it('restores toolState.payloads and persistedIds so cold replica does not duplicate tools or overwrite tools[]', async () => {
const h = createHarness({
assistantMessageId: 'asst-1',
operationId: 'op-1',
topicId: 'topic-1',
});
// Batch 1 (warm): persist tool1
const tool1: any = {
apiName: 'tool1',
arguments: '{}',
id: 'tc-1',
identifier: 'tool1',
type: 'default',
};
await h.handler.ingest({
events: [
buildEvent(
'stream_chunk',
0,
{ chunkType: 'tools_calling', toolsCalling: [tool1] },
1000,
),
],
operationId: 'op-1',
topicId: 'topic-1',
});
// assistant.tools[] should have tool1
const asstAfterBatch1 = h.messages.get('asst-1')!;
expect(asstAfterBatch1.tools).toHaveLength(1);
expect(asstAfterBatch1.tools![0].id).toBe('tc-1');
// tool message created for tool1
const toolMsgsBatch1 = [...h.messages.values()].filter((m) => m.role === 'tool');
expect(toolMsgsBatch1).toHaveLength(1);
// Simulate cold replica: drop in-memory state
__resetOperationStatesForTesting();
// Batch 2 (cold): receives tool2 — should ADD to tools[], not overwrite
const tool2: any = {
apiName: 'tool2',
arguments: '{}',
id: 'tc-2',
identifier: 'tool2',
type: 'default',
};
await h.handler.ingest({
events: [
buildEvent(
'stream_chunk',
1,
{ chunkType: 'tools_calling', toolsCalling: [tool2] },
2000,
),
],
operationId: 'op-1',
topicId: 'topic-1',
});
const asstAfterBatch2 = h.messages.get('asst-1')!;
// Both tools should be present — cold restore kept tool1 in payloads
expect(asstAfterBatch2.tools).toHaveLength(2);
// tool1 should NOT be duplicated — persistedIds was restored
const allToolMsgs = [...h.messages.values()].filter((m) => m.role === 'tool');
const tool1Msgs = allToolMsgs.filter((m) => m.tool_call_id === 'tc-1');
expect(tool1Msgs).toHaveLength(1);
});
});
});
@@ -32,6 +32,7 @@ describe('HeterogeneousAgentService — phase 2c session id persistence + resume
// Real handler so we exercise the persistSessionId path end-to-end
const handler = new HeterogeneousPersistenceHandler({
messageModel: {
findById: vi.fn(async () => null),
listMessagePluginsByTopic: vi.fn(async () => []),
update: vi.fn(async () => ({ success: true })),
} as any,
@@ -87,6 +88,7 @@ describe('HeterogeneousAgentService — phase 2c session id persistence + resume
const handler = new HeterogeneousPersistenceHandler({
messageModel: {
findById: vi.fn(async () => null),
listMessagePluginsByTopic: vi.fn(async () => []),
update: vi.fn(async () => ({ success: true })),
} as any,
@@ -138,6 +140,7 @@ describe('HeterogeneousAgentService — phase 2c session id persistence + resume
const handler = new HeterogeneousPersistenceHandler({
messageModel: {
findById: vi.fn(async () => null),
listMessagePluginsByTopic: vi.fn(async () => []),
update: vi.fn(async () => ({ success: true })),
} as any,
@@ -196,6 +199,7 @@ describe('HeterogeneousAgentService — phase 2c session id persistence + resume
const handler = new HeterogeneousPersistenceHandler({
messageModel: {
findById: vi.fn(async () => null),
listMessagePluginsByTopic: vi.fn(async () => []),
update: vi.fn(async () => ({ success: true })),
} as any,
@@ -257,6 +261,7 @@ describe('HeterogeneousAgentService — phase 2c session id persistence + resume
const handler = new HeterogeneousPersistenceHandler({
messageModel: {
findById: vi.fn(async () => null),
listMessagePluginsByTopic: vi.fn(async () => []),
update: vi.fn(async () => ({ success: true })),
} as any,
@@ -90,10 +90,11 @@ export async function spawnHeteroSandbox(params: SandboxRunParams): Promise<void
userId,
} = params;
// cwd is only set when explicitly passed (e.g. desktop local path).
// For cloud sandbox, CC always runs in the sandbox root (/workspace); repos are cloned
// as subdirectories there and CC navigates into them via its own tools.
const { cwd } = params;
// For cloud sandbox, default cwd is /workspace — must be explicit so CC stores and
// finds session files at the same path on every invocation (session files live under
// ~/.claude/projects/<encoded-cwd>/). Without a consistent --cwd the session id stored
// in topic.metadata.heteroSessionId can't be resolved on --resume after a page reload.
const cwd = params.cwd ?? '/workspace';
// Build the `lh hetero exec` command string.
// Prompt is passed via --input-json stdin ('-') to avoid shell quoting issues
@@ -116,9 +117,7 @@ export async function spawnHeteroSandbox(params: SandboxRunParams): Promise<void
if (resumeSessionId) {
args.push('--resume', resumeSessionId);
}
if (cwd) {
args.push('--cwd', cwd);
}
args.push('--cwd', cwd);
// Encode the prompt as base64 to avoid all shell quoting issues.
// echo + shell quoting mangled inner JSON quotes; base64 is quote-safe.
@@ -467,6 +467,17 @@ export class GatewayActionImpl {
window.global_serverConfigStore?.getState()?.serverConfig?.agentGatewayUrl;
if (!agentGatewayUrl) return;
// Skip reconnect if the gateway action already established (or is establishing)
// a fresh connection for this operation. This prevents a race on new-topic creation
// where switchTopic loads runningOperation → useGatewayReconnect fires → overwrites
// the connectToGateway call made by executeGatewayAgent with resumeOnConnect: true,
// causing the gateway to treat a brand-new session as a resume → stuck / no events.
// Any status other than 'disconnected' means the gateway action already owns this
// connection (connecting / authenticating / reconnecting / connected). Skip to avoid
// overwriting the fresh non-resume connect with resumeOnConnect:true.
const existingStatus = this.#get().gatewayConnections[operationId]?.status;
if (existingStatus && existingStatus !== 'disconnected') return;
// Get a fresh JWT token (original expired after 5 min)
const { token } = await aiAgentService.refreshGatewayToken(topicId);