🐛 fix(agent-runtime): keep async sub-agent stream alive (#15646)

* 🐛 fix: keep async sub-agent stream alive

* 🐛 fix: preserve async tool resume parent chain
This commit is contained in:
Arvin Xu
2026-06-10 22:12:22 +08:00
committed by GitHub
parent 8b6905ec7e
commit e2be720726
4 changed files with 172 additions and 9 deletions
@@ -17,24 +17,23 @@ const log = debug('lobe-server:agent-runtime:coordinator');
* decision) starts, but that resume runs under a **new** operationId with
* its own event stream. For the paused operationId no further events will
* arrive, so clients should stop waiting the same way they do on done.
*
* `waiting_for_async_tool` is different: deferred tools such as server
* sub-agents resume the SAME operationId after the out-of-band result is
* backfilled. Ending the stream at park time makes the client mark the turn
* as stopped while the server is still waiting for sub-agents.
*/
const STREAM_END_STATUSES = new Set<AgentState['status']>([
'done',
'error',
'interrupted',
'waiting_for_human',
'waiting_for_async_tool',
]);
const hasEnteredStreamEndState = (
previousStatus?: AgentState['status'],
nextStatus?: AgentState['status'],
): nextStatus is
| 'done'
| 'error'
| 'interrupted'
| 'waiting_for_human'
| 'waiting_for_async_tool' => {
): nextStatus is 'done' | 'error' | 'interrupted' | 'waiting_for_human' => {
const wasStreamEnd = previousStatus ? STREAM_END_STATUSES.has(previousStatus) : false;
return Boolean(nextStatus && STREAM_END_STATUSES.has(nextStatus) && !wasStreamEnd);
};
@@ -176,6 +176,19 @@ describe('AgentRuntimeCoordinator', () => {
});
});
it('should not publish end event when status changes to waiting_for_async_tool because the same stream will resume', async () => {
const operationId = 'test-operation-id';
const previousState = { status: 'running', stepCount: 3 };
const newState = { status: 'waiting_for_async_tool', stepCount: 4 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStateManager.saveAgentState).toHaveBeenCalledWith(operationId, newState);
expect(mockStreamManager.publishAgentRuntimeEnd).not.toHaveBeenCalled();
});
it('should not publish end event when status was already done', async () => {
const operationId = 'test-operation-id';
const previousState = { status: 'done', stepCount: 5 };
@@ -291,6 +304,22 @@ describe('AgentRuntimeCoordinator', () => {
});
});
it('should not publish end event when status becomes waiting_for_async_tool because deferred tools resume this operation', async () => {
const operationId = 'test-operation-id';
const stepResult = {
executionTime: 1000,
newState: { status: 'waiting_for_async_tool', stepCount: 4 },
stepIndex: 4,
};
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 3 });
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStateManager.saveStepResult).toHaveBeenCalledWith(operationId, stepResult);
expect(mockStreamManager.publishAgentRuntimeEnd).not.toHaveBeenCalled();
});
it('should publish end event when status becomes interrupted', async () => {
const operationId = 'test-operation-id';
const stepResult = {
@@ -522,6 +522,76 @@ describe('AgentRuntimeService', () => {
expect(mockQueueService.scheduleMessage).toHaveBeenCalled();
});
it('should resume async tools with the last pending tool result as parentMessageId', async () => {
const pendingTools = [
{
apiName: 'callSubAgent',
arguments: '{}',
id: 'tool-call-1',
identifier: 'agent-management',
type: 'default',
},
{
apiName: 'callSubAgent',
arguments: '{}',
id: 'tool-call-2',
identifier: 'agent-management',
type: 'default',
},
];
const parkedState = {
...mockState,
interruption: {
canResume: true,
interruptedAt: new Date().toISOString(),
reason: 'async_tool',
},
pendingToolsCalling: pendingTools,
status: 'waiting_for_async_tool',
};
const refreshedMessages = [
{ content: 'use tools', id: 'user-msg-1', role: 'user' },
{
children: [
{
id: 'assistant-msg-1',
role: 'assistant',
tools: [
{ ...pendingTools[0], result_msg_id: 'tool-msg-1' },
{ ...pendingTools[1], result_msg_id: 'tool-msg-2' },
],
},
],
id: 'assistant-group-1',
role: 'assistantGroup',
},
];
const mockStepResult = {
events: [],
newState: { ...parkedState, pendingToolsCalling: [], status: 'done', stepCount: 2 },
nextContext: null,
};
const mockRuntime = { step: vi.fn().mockResolvedValue(mockStepResult) };
mockCoordinator.loadAgentState.mockResolvedValue(parkedState);
vi.spyOn(service as any, 'refreshMessagesFromDB').mockResolvedValue(refreshedMessages);
vi.spyOn(service as any, 'createAgentRuntime').mockReturnValue({ runtime: mockRuntime });
await service.executeStep({ ...mockParams, resumeAsyncTool: true });
expect(mockRuntime.step).toHaveBeenCalledWith(
expect.objectContaining({
messages: refreshedMessages,
pendingToolsCalling: [],
status: 'running',
}),
expect.objectContaining({
payload: { parentMessageId: 'tool-msg-2' },
phase: 'user_input',
}),
);
});
it('should handle missing agent state', async () => {
mockCoordinator.loadAgentState.mockResolvedValue(null);
@@ -813,6 +813,11 @@ export class AgentRuntimeService {
// results written out-of-band), and re-enter the LLM with them.
if (resumeAsyncTool && currentState.status === 'waiting_for_async_tool') {
const refreshed = await this.refreshMessagesFromDB(currentState);
const pendingTools = (currentState.pendingToolsCalling ?? []) as ChatToolPayload[];
const resumeParentMessageId = this.resolveAsyncToolResumeParentMessageId(
refreshed,
pendingTools,
);
currentState = structuredClone(currentState);
currentState.messages = refreshed;
currentState.pendingToolsCalling = [];
@@ -820,14 +825,15 @@ export class AgentRuntimeService {
currentState.interruption = undefined;
currentState.lastModified = new Date().toISOString();
currentContext = {
payload: { parentMessageId: refreshed.at(-1)?.id },
payload: { parentMessageId: resumeParentMessageId },
phase: 'user_input',
} as AgentRuntimeContext;
log(
'[%s][%d] Resuming from async tool with %d messages',
'[%s][%d] Resuming from async tool with %d messages (parent=%s)',
operationId,
stepIndex,
refreshed.length,
resumeParentMessageId,
);
}
@@ -1828,6 +1834,65 @@ export class AgentRuntimeService {
return flatList as AgentState['messages'];
}
private resolveAsyncToolResumeParentMessageId(
messages: AgentState['messages'],
pendingTools: ChatToolPayload[],
): string | undefined {
const fallbackParentMessageId = messages.at(-1)?.id;
if (pendingTools.length === 0) return fallbackParentMessageId;
const toolResultMessageIds = new Map<string, string>();
const collectToolResultIds = (message: unknown) => {
if (!message || typeof message !== 'object') return;
const candidate = message as {
children?: unknown;
id?: unknown;
tool_call_id?: unknown;
tools?: unknown;
};
if (typeof candidate.tool_call_id === 'string' && typeof candidate.id === 'string') {
toolResultMessageIds.set(candidate.tool_call_id, candidate.id);
}
if (Array.isArray(candidate.tools)) {
for (const tool of candidate.tools) {
if (!tool || typeof tool !== 'object') continue;
const toolPayload = tool as { id?: unknown; result_msg_id?: unknown };
if (
typeof toolPayload.id === 'string' &&
typeof toolPayload.result_msg_id === 'string'
) {
toolResultMessageIds.set(toolPayload.id, toolPayload.result_msg_id);
}
}
}
if (Array.isArray(candidate.children)) {
for (const child of candidate.children) {
collectToolResultIds(child);
}
}
};
for (const message of messages) {
collectToolResultIds(message);
}
for (let index = pendingTools.length - 1; index >= 0; index -= 1) {
const pendingTool = pendingTools[index];
if (pendingTool.result_msg_id) return pendingTool.result_msg_id;
const resultMessageId = toolResultMessageIds.get(pendingTool.id);
if (resultMessageId) return resultMessageId;
}
return fallbackParentMessageId;
}
/**
* Create Agent Runtime instance
*/