diff --git a/apps/server/src/modules/AgentRuntime/AgentRuntimeCoordinator.ts b/apps/server/src/modules/AgentRuntime/AgentRuntimeCoordinator.ts index 03a6998f32..fbf7180a61 100644 --- a/apps/server/src/modules/AgentRuntime/AgentRuntimeCoordinator.ts +++ b/apps/server/src/modules/AgentRuntime/AgentRuntimeCoordinator.ts @@ -17,24 +17,23 @@ const log = debug('lobe-server:agent-runtime:coordinator'); * decision) starts, but that resume runs under a **new** operationId with * its own event stream. For the paused operationId no further events will * arrive, so clients should stop waiting the same way they do on done. + * + * `waiting_for_async_tool` is different: deferred tools such as server + * sub-agents resume the SAME operationId after the out-of-band result is + * backfilled. Ending the stream at park time makes the client mark the turn + * as stopped while the server is still waiting for sub-agents. */ const STREAM_END_STATUSES = new Set([ 'done', 'error', 'interrupted', 'waiting_for_human', - 'waiting_for_async_tool', ]); const hasEnteredStreamEndState = ( previousStatus?: AgentState['status'], nextStatus?: AgentState['status'], -): nextStatus is - | 'done' - | 'error' - | 'interrupted' - | 'waiting_for_human' - | 'waiting_for_async_tool' => { +): nextStatus is 'done' | 'error' | 'interrupted' | 'waiting_for_human' => { const wasStreamEnd = previousStatus ? STREAM_END_STATUSES.has(previousStatus) : false; return Boolean(nextStatus && STREAM_END_STATUSES.has(nextStatus) && !wasStreamEnd); }; diff --git a/apps/server/src/modules/AgentRuntime/__tests__/AgentRuntimeCoordinator.test.ts b/apps/server/src/modules/AgentRuntime/__tests__/AgentRuntimeCoordinator.test.ts index 789559d835..a2aae2a9f8 100644 --- a/apps/server/src/modules/AgentRuntime/__tests__/AgentRuntimeCoordinator.test.ts +++ b/apps/server/src/modules/AgentRuntime/__tests__/AgentRuntimeCoordinator.test.ts @@ -176,6 +176,19 @@ describe('AgentRuntimeCoordinator', () => { }); }); + it('should not publish end event when status changes to waiting_for_async_tool because the same stream will resume', async () => { + const operationId = 'test-operation-id'; + const previousState = { status: 'running', stepCount: 3 }; + const newState = { status: 'waiting_for_async_tool', stepCount: 4 }; + + mockStateManager.loadAgentState.mockResolvedValue(previousState); + + await coordinator.saveAgentState(operationId, newState as any); + + expect(mockStateManager.saveAgentState).toHaveBeenCalledWith(operationId, newState); + expect(mockStreamManager.publishAgentRuntimeEnd).not.toHaveBeenCalled(); + }); + it('should not publish end event when status was already done', async () => { const operationId = 'test-operation-id'; const previousState = { status: 'done', stepCount: 5 }; @@ -291,6 +304,22 @@ describe('AgentRuntimeCoordinator', () => { }); }); + it('should not publish end event when status becomes waiting_for_async_tool because deferred tools resume this operation', async () => { + const operationId = 'test-operation-id'; + const stepResult = { + executionTime: 1000, + newState: { status: 'waiting_for_async_tool', stepCount: 4 }, + stepIndex: 4, + }; + + mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 3 }); + + await coordinator.saveStepResult(operationId, stepResult as any); + + expect(mockStateManager.saveStepResult).toHaveBeenCalledWith(operationId, stepResult); + expect(mockStreamManager.publishAgentRuntimeEnd).not.toHaveBeenCalled(); + }); + it('should publish end event when status becomes interrupted', async () => { const operationId = 'test-operation-id'; const stepResult = { diff --git a/apps/server/src/services/agentRuntime/AgentRuntimeService.test.ts b/apps/server/src/services/agentRuntime/AgentRuntimeService.test.ts index 9058aa2147..63a0599f40 100644 --- a/apps/server/src/services/agentRuntime/AgentRuntimeService.test.ts +++ b/apps/server/src/services/agentRuntime/AgentRuntimeService.test.ts @@ -522,6 +522,76 @@ describe('AgentRuntimeService', () => { expect(mockQueueService.scheduleMessage).toHaveBeenCalled(); }); + it('should resume async tools with the last pending tool result as parentMessageId', async () => { + const pendingTools = [ + { + apiName: 'callSubAgent', + arguments: '{}', + id: 'tool-call-1', + identifier: 'agent-management', + type: 'default', + }, + { + apiName: 'callSubAgent', + arguments: '{}', + id: 'tool-call-2', + identifier: 'agent-management', + type: 'default', + }, + ]; + const parkedState = { + ...mockState, + interruption: { + canResume: true, + interruptedAt: new Date().toISOString(), + reason: 'async_tool', + }, + pendingToolsCalling: pendingTools, + status: 'waiting_for_async_tool', + }; + const refreshedMessages = [ + { content: 'use tools', id: 'user-msg-1', role: 'user' }, + { + children: [ + { + id: 'assistant-msg-1', + role: 'assistant', + tools: [ + { ...pendingTools[0], result_msg_id: 'tool-msg-1' }, + { ...pendingTools[1], result_msg_id: 'tool-msg-2' }, + ], + }, + ], + id: 'assistant-group-1', + role: 'assistantGroup', + }, + ]; + const mockStepResult = { + events: [], + newState: { ...parkedState, pendingToolsCalling: [], status: 'done', stepCount: 2 }, + nextContext: null, + }; + const mockRuntime = { step: vi.fn().mockResolvedValue(mockStepResult) }; + + mockCoordinator.loadAgentState.mockResolvedValue(parkedState); + vi.spyOn(service as any, 'refreshMessagesFromDB').mockResolvedValue(refreshedMessages); + vi.spyOn(service as any, 'createAgentRuntime').mockReturnValue({ runtime: mockRuntime }); + + await service.executeStep({ ...mockParams, resumeAsyncTool: true }); + + expect(mockRuntime.step).toHaveBeenCalledWith( + expect.objectContaining({ + messages: refreshedMessages, + pendingToolsCalling: [], + status: 'running', + }), + expect.objectContaining({ + payload: { parentMessageId: 'tool-msg-2' }, + phase: 'user_input', + }), + ); + }); + it('should handle missing agent state', async () => { mockCoordinator.loadAgentState.mockResolvedValue(null); diff --git a/apps/server/src/services/agentRuntime/AgentRuntimeService.ts b/apps/server/src/services/agentRuntime/AgentRuntimeService.ts index d68f0b0184..4f190a7fea 100644 --- a/apps/server/src/services/agentRuntime/AgentRuntimeService.ts +++ b/apps/server/src/services/agentRuntime/AgentRuntimeService.ts @@ -813,6 +813,11 @@ export class AgentRuntimeService { // results written out-of-band), and re-enter the LLM with them. if (resumeAsyncTool && currentState.status === 'waiting_for_async_tool') { const refreshed = await this.refreshMessagesFromDB(currentState); + const pendingTools = (currentState.pendingToolsCalling ?? []) as ChatToolPayload[]; + const resumeParentMessageId = this.resolveAsyncToolResumeParentMessageId( + refreshed, + pendingTools, + ); currentState = structuredClone(currentState); currentState.messages = refreshed; currentState.pendingToolsCalling = []; @@ -820,14 +825,15 @@ export class AgentRuntimeService { currentState.interruption = undefined; currentState.lastModified = new Date().toISOString(); currentContext = { - payload: { parentMessageId: refreshed.at(-1)?.id }, + payload: { parentMessageId: resumeParentMessageId }, phase: 'user_input', } as AgentRuntimeContext; log( - '[%s][%d] Resuming from async tool with %d messages', + '[%s][%d] Resuming from async tool with %d messages (parent=%s)', operationId, stepIndex, refreshed.length, + resumeParentMessageId, ); } @@ -1828,6 +1834,65 @@ export class AgentRuntimeService { return flatList as AgentState['messages']; } + private resolveAsyncToolResumeParentMessageId( + messages: AgentState['messages'], + pendingTools: ChatToolPayload[], + ): string | undefined { + const fallbackParentMessageId = messages.at(-1)?.id; + if (pendingTools.length === 0) return fallbackParentMessageId; + + const toolResultMessageIds = new Map(); + + const collectToolResultIds = (message: unknown) => { + if (!message || typeof message !== 'object') return; + + const candidate = message as { + children?: unknown; + id?: unknown; + tool_call_id?: unknown; + tools?: unknown; + }; + + if (typeof candidate.tool_call_id === 'string' && typeof candidate.id === 'string') { + toolResultMessageIds.set(candidate.tool_call_id, candidate.id); + } + + if (Array.isArray(candidate.tools)) { + for (const tool of candidate.tools) { + if (!tool || typeof tool !== 'object') continue; + + const toolPayload = tool as { id?: unknown; result_msg_id?: unknown }; + if ( + typeof toolPayload.id === 'string' && + typeof toolPayload.result_msg_id === 'string' + ) { + toolResultMessageIds.set(toolPayload.id, toolPayload.result_msg_id); + } + } + } + + if (Array.isArray(candidate.children)) { + for (const child of candidate.children) { + collectToolResultIds(child); + } + } + }; + + for (const message of messages) { + collectToolResultIds(message); + } + + for (let index = pendingTools.length - 1; index >= 0; index -= 1) { + const pendingTool = pendingTools[index]; + if (pendingTool.result_msg_id) return pendingTool.result_msg_id; + + const resultMessageId = toolResultMessageIds.get(pendingTool.id); + if (resultMessageId) return resultMessageId; + } + + return fallbackParentMessageId; + } + /** * Create Agent Runtime instance */