♻️ refactor(agent): delegate callAgent via server runner

This commit is contained in:
Arvin Xu
2026-06-13 17:23:00 +08:00
parent fa58fd12a0
commit 568701ea77
8 changed files with 466 additions and 41 deletions
@@ -89,6 +89,7 @@ import { FileService } from '@/server/services/file';
import { MessageService } from '@/server/services/message';
import { OnboardingService } from '@/server/services/onboarding';
import {
type ServerAgentDelegationRunner,
type ServerSubAgentRunner,
type ToolExecutionResultResponse,
type ToolExecutionService,
@@ -405,6 +406,97 @@ const buildServerVirtualSubAgentRunner = (
};
};
/**
* Build the per-tool-call delegated-agent runner used by the server
* `agent-management.callAgent` runtime. It creates a parent-visible task card,
* then starts the target agent in an isolation thread. Unlike `callSubAgent`,
* this runner does not park the parent operation for async completion.
*/
const buildServerAgentDelegationRunner = (
ctx: RuntimeExecutorContext,
state: AgentState,
parentMessageId: string,
): ServerAgentDelegationRunner | undefined => {
const execSubAgent = ctx.execSubAgent;
if (!execSubAgent) return undefined;
const parentAgentId = state.metadata?.agentId;
const topicId = ctx.topicId ?? state.metadata?.topicId;
if (!parentAgentId || !topicId) return undefined;
return {
run: async ({ agentId: targetAgentId, description, instruction, timeout }) => {
if (state.metadata?.isSubAgent === true) {
return {
error: 'Agent delegation cannot be triggered from within a sub-agent run.',
started: false,
};
}
const taskMessage = await ctx.messageModel.create({
agentId: parentAgentId,
content: '',
groupId: state.metadata?.groupId ?? undefined,
metadata: {
instruction,
subAgentId: targetAgentId,
taskTitle: description,
},
parentId: parentMessageId,
role: 'task',
threadId: state.metadata?.threadId ?? undefined,
topicId,
});
try {
const result = (await execSubAgent({
agentId: targetAgentId,
groupId: state.metadata?.groupId ?? undefined,
instruction,
parentMessageId: taskMessage.id,
parentOperationId: ctx.operationId,
timeout,
title: description,
topicId,
})) as
| { error?: string; operationId?: string; success?: boolean; threadId?: string }
| undefined;
if (!result?.success) {
const error = result?.error || 'Delegated agent run failed to start.';
await ctx.messageModel.update(taskMessage.id, { content: error });
return {
error,
operationId: result?.operationId,
started: false,
taskMessageId: taskMessage.id,
threadId: result?.threadId,
};
}
return {
operationId: result.operationId,
started: true,
taskMessageId: taskMessage.id,
threadId: result.threadId,
};
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
await ctx.messageModel.update(taskMessage.id, {
content: `Task failed to start: ${message}`,
});
return {
error: message,
started: false,
taskMessageId: taskMessage.id,
};
}
},
};
};
const shouldRetryLLM = (kind: LLMErrorKind, attempt: number, maxRetries: number) =>
kind === 'retry' && attempt <= maxRetries;
@@ -2463,6 +2555,11 @@ export const createRuntimeExecutors = (
toolExecutionService.executeTool(chatToolPayload, {
activeDeviceId: state.metadata?.activeDeviceId,
agentId: state.metadata?.agentId,
agentDelegation: buildServerAgentDelegationRunner(
ctx,
state,
payload.parentMessageId,
),
documentId: state.metadata?.documentId,
execSubAgent: ctx.execSubAgent,
executionTimeoutMs: timeoutMs,
@@ -3045,6 +3142,11 @@ export const createRuntimeExecutors = (
toolExecutionService.executeTool(chatToolPayload, {
activeDeviceId: state.metadata?.activeDeviceId,
agentId: state.metadata?.agentId,
agentDelegation: buildServerAgentDelegationRunner(
ctx,
state,
payload.parentMessageId,
),
documentId: state.metadata?.documentId,
execSubAgent: ctx.execSubAgent,
executionTimeoutMs: timeoutMs,
@@ -3465,7 +3567,7 @@ export const createRuntimeExecutors = (
metadata: {
instruction: task.instruction,
taskTitle: task.description,
...(targetAgentId && targetAgentId !== agentId && { targetAgentId }),
...(targetAgentId && targetAgentId !== agentId && { subAgentId: targetAgentId }),
},
parentId: parentMessageId,
role: 'task',
@@ -3593,7 +3695,7 @@ export const createRuntimeExecutors = (
metadata: {
instruction: task.instruction,
taskTitle: task.description,
...(targetAgentId && targetAgentId !== agentId && { targetAgentId }),
...(targetAgentId && targetAgentId !== agentId && { subAgentId: targetAgentId }),
},
parentId: parentMessageId,
role: 'task',
@@ -4850,10 +4850,9 @@ describe('RuntimeExecutors', () => {
...overrides,
});
it('call_tool sets stop:true in tool_result payload when tool returns execSubAgent state', async () => {
// Simulate agentManagement.callAgent returning execSubAgent state
it('call_tool preserves stop:true for legacy execSubAgent state', async () => {
mockToolExecutionService.executeTool.mockResolvedValue({
content: '🚀 Triggered async task to call agent "target-agent"',
content: 'Legacy async task result',
executionTime: 10,
state: {
parentMessageId: 'tool-msg-id',
@@ -4894,13 +4893,167 @@ describe('RuntimeExecutors', () => {
expect((result.nextContext?.payload as any).stop).toBe(true);
});
it('exec_sub_agent executor creates task message and calls execSubAgent callback', async () => {
const mockExecSubAgentTask = vi
it('call_tool injects agentDelegation runner for server callAgent delegation', async () => {
const mockExecSubAgent = vi
.fn()
.mockResolvedValue({ success: true, operationId: 'child-op', threadId: 'thread-child' });
const ctxWithCallback = {
...ctx,
execSubAgent: mockExecSubAgentTask,
execSubAgent: mockExecSubAgent,
topicId: 'topic-123',
};
mockMessageModel.create
.mockResolvedValueOnce({ id: 'task-msg-id' })
.mockResolvedValueOnce({ id: 'tool-msg-id' });
mockToolExecutionService.executeTool.mockImplementation(
async (_payload: any, context: any) => {
const delegation = await context.agentDelegation.run({
agentId: 'target-agent-id',
description: 'Call agent target-agent',
instruction: 'Do something useful',
timeout: 1_800_000,
});
return {
content: 'Delegated work to agent "target-agent-id"',
executionTime: 10,
state: {
targetAgentId: 'target-agent-id',
taskMessageId: delegation.taskMessageId,
threadId: delegation.threadId,
type: 'agentDelegation',
},
success: delegation.started,
};
},
);
const executors = createRuntimeExecutors(ctxWithCallback);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'assistant-msg-id',
toolCalling: {
apiName: 'callAgent',
arguments: JSON.stringify({
agentId: 'target-agent-id',
instruction: 'Do something useful',
runAsTask: true,
}),
id: 'tool-call-1',
identifier: 'lobe-agent-management',
type: 'default' as const,
},
},
type: 'call_tool' as const,
};
const result = await executors.call_tool!(instruction, state);
expect(mockMessageModel.create).toHaveBeenCalledWith(
expect.objectContaining({
agentId: 'parent-agent-id',
metadata: expect.objectContaining({
subAgentId: 'target-agent-id',
}),
parentId: 'assistant-msg-id',
role: 'task',
topicId: 'topic-123',
}),
);
expect(mockExecSubAgent).toHaveBeenCalledWith(
expect.objectContaining({
agentId: 'target-agent-id',
instruction: 'Do something useful',
parentMessageId: 'task-msg-id',
parentOperationId: 'op-123',
title: 'Call agent target-agent',
topicId: 'topic-123',
}),
);
expect(result.nextContext?.phase).toBe('tool_result');
expect((result.nextContext?.payload as any).stop).toBeUndefined();
});
it('call_tool marks delegated task message when server callAgent delegation fails to start', async () => {
const mockExecSubAgent = vi
.fn()
.mockResolvedValue({ error: 'queue unavailable', operationId: 'child-op', success: false });
const ctxWithCallback = {
...ctx,
execSubAgent: mockExecSubAgent,
topicId: 'topic-123',
};
mockMessageModel.create
.mockResolvedValueOnce({ id: 'task-msg-id' })
.mockResolvedValueOnce({ id: 'tool-msg-id' });
mockToolExecutionService.executeTool.mockImplementation(
async (_payload: any, context: any) => {
const delegation = await context.agentDelegation.run({
agentId: 'target-agent-id',
description: 'Call agent target-agent',
instruction: 'Do something useful',
timeout: 1_800_000,
});
return {
content: delegation.error || 'Delegation failed',
error: {
code: 'AGENT_DELEGATION_START_FAILED',
message: delegation.error,
},
executionTime: 10,
state: {
operationId: delegation.operationId,
targetAgentId: 'target-agent-id',
taskMessageId: delegation.taskMessageId,
threadId: delegation.threadId,
type: 'agentDelegation',
},
success: delegation.started,
};
},
);
const executors = createRuntimeExecutors(ctxWithCallback);
const state = createMockState();
const instruction = {
payload: {
parentMessageId: 'assistant-msg-id',
toolCalling: {
apiName: 'callAgent',
arguments: JSON.stringify({
agentId: 'target-agent-id',
instruction: 'Do something useful',
runAsTask: true,
}),
id: 'tool-call-1',
identifier: 'lobe-agent-management',
type: 'default' as const,
},
},
type: 'call_tool' as const,
};
const result = await executors.call_tool!(instruction, state);
expect(mockMessageModel.update).toHaveBeenCalledWith('task-msg-id', {
content: 'queue unavailable',
});
expect(result.nextContext?.phase).toBe('tool_result');
expect((result.nextContext?.payload as any).isSuccess).toBe(false);
expect((result.nextContext?.payload as any).stop).toBeUndefined();
});
it('exec_sub_agent executor creates task message and calls execSubAgent callback', async () => {
const mockExecSubAgent = vi
.fn()
.mockResolvedValue({ success: true, operationId: 'child-op', threadId: 'thread-child' });
const ctxWithCallback = {
...ctx,
execSubAgent: mockExecSubAgent,
topicId: 'topic-123',
};
@@ -4926,6 +5079,9 @@ describe('RuntimeExecutors', () => {
expect(mockMessageModel.create).toHaveBeenCalledWith(
expect.objectContaining({
agentId: 'parent-agent-id',
metadata: expect.objectContaining({
subAgentId: 'target-agent-id',
}),
role: 'task',
parentId: 'tool-msg-id',
topicId: 'topic-123',
@@ -4933,7 +5089,7 @@ describe('RuntimeExecutors', () => {
);
// execSubAgent callback fired with targetAgentId
expect(mockExecSubAgentTask).toHaveBeenCalledWith(
expect(mockExecSubAgent).toHaveBeenCalledWith(
expect.objectContaining({
agentId: 'target-agent-id',
instruction: 'Do something useful',
@@ -4947,10 +5103,10 @@ describe('RuntimeExecutors', () => {
});
it('exec_sub_agent blocks nested dispatch when current state is already a sub-agent', async () => {
const mockExecSubAgentTask = vi.fn();
const mockExecSubAgent = vi.fn();
const ctxWithCallback = {
...ctx,
execSubAgentTask: mockExecSubAgentTask,
execSubAgent: mockExecSubAgent,
topicId: 'topic-123',
};
@@ -4983,7 +5139,7 @@ describe('RuntimeExecutors', () => {
success: false,
});
expect(mockMessageModel.create).not.toHaveBeenCalled();
expect(mockExecSubAgentTask).not.toHaveBeenCalled();
expect(mockExecSubAgent).not.toHaveBeenCalled();
});
it('exec_sub_agent gracefully skips dispatch when execSubAgent not injected', async () => {
+12 -11
View File
@@ -2941,12 +2941,12 @@ export class AiAgentService {
});
// 3. Create hooks for updating Thread metadata and source message
const threadHooks = this.createThreadHooks(
thread.id,
const threadHooks = this.createThreadHooks({
logScope: options.logScope,
sourceMessageId: parentMessageId,
startedAt,
parentMessageId,
options.logScope,
);
threadId: thread.id,
});
// For the virtual sub-agent path, also register the completion bridge that
// backfills the parent's placeholder tool message and resumes the parked
// parent op once the child run is done. Registered last so its tool-message
@@ -3185,12 +3185,13 @@ export class AiAgentService {
* Create hooks for tracking Thread metadata updates during SubAgent execution.
* Replaces the legacy createThreadMetadataCallbacks with the hooks system.
*/
private createThreadHooks(
threadId: string,
startedAt: string,
sourceMessageId: string,
logScope: 'execSubAgent' | 'execVirtualSubAgent',
): AgentHook[] {
private createThreadHooks(params: {
logScope: 'execSubAgent' | 'execVirtualSubAgent';
sourceMessageId: string;
startedAt: string;
threadId: string;
}): AgentHook[] {
const { logScope, sourceMessageId, startedAt, threadId } = params;
let accumulatedToolCalls = 0;
return [
@@ -74,6 +74,97 @@ describe('agentManagementRuntime', () => {
expect(PluginModel).toHaveBeenCalledWith(expect.anything(), 'user-1', 'workspace-1');
});
describe('callAgent', () => {
it('fails when server agent delegation is unavailable', async () => {
const runtime = createRuntime();
const result = await runtime.callAgent(
{
agentId: 'agent-target',
instruction: 'Do delegated work',
runAsTask: true,
},
{ toolManifestMap: {} },
);
expect(result.success).toBe(false);
expect(result.error).toMatchObject({ code: 'AGENT_DELEGATION_UNAVAILABLE' });
});
it('delegates server callAgent runs through the injected runner', async () => {
const run = vi.fn().mockResolvedValue({
operationId: 'op-child',
started: true,
taskMessageId: 'task-msg',
threadId: 'thread-child',
});
const runtime = createRuntime();
const result = await runtime.callAgent(
{
agentId: 'agent-target',
instruction: 'Do delegated work',
runAsTask: true,
taskTitle: 'Delegated task',
timeout: 1234,
},
{
agentDelegation: { run },
toolManifestMap: {},
},
);
expect(run).toHaveBeenCalledWith({
agentId: 'agent-target',
description: 'Delegated task',
instruction: 'Do delegated work',
timeout: 1234,
});
expect(result.success).toBe(true);
expect(result.state).toMatchObject({
operationId: 'op-child',
targetAgentId: 'agent-target',
taskMessageId: 'task-msg',
threadId: 'thread-child',
type: 'agentDelegation',
});
});
it('returns failure state when delegated run cannot start', async () => {
const run = vi.fn().mockResolvedValue({
error: 'queue unavailable',
operationId: 'op-child',
started: false,
taskMessageId: 'task-msg',
});
const runtime = createRuntime();
const result = await runtime.callAgent(
{
agentId: 'agent-target',
instruction: 'Do delegated work',
runAsTask: true,
},
{
agentDelegation: { run },
toolManifestMap: {},
},
);
expect(result.success).toBe(false);
expect(result.error).toMatchObject({
code: 'AGENT_DELEGATION_START_FAILED',
message: 'queue unavailable',
});
expect(result.state).toMatchObject({
operationId: 'op-child',
targetAgentId: 'agent-target',
taskMessageId: 'task-msg',
type: 'agentDelegation',
});
});
});
describe('searchAgent', () => {
it('reports the real total and a pagination hint when more agents exist', async () => {
mockQueryAgents.mockResolvedValue(makeAgents(20));
@@ -43,20 +43,48 @@ export const agentManagementRuntime: ServerRuntimeRegistration = {
): Promise<ToolExecutionResult> => {
const { agentId, instruction, taskTitle, timeout } = params;
// Server runtime always uses the legacy async invocation path because
// there is no client-side `registerAfterCompletion` callback available
// to execute synchronous agent calls.
return {
content: `🚀 Triggered async task to call agent "${agentId}"${taskTitle ? `: ${taskTitle}` : ''}`,
state: {
parentMessageId: ctx.messageId,
task: {
description: taskTitle || `Call agent ${agentId}`,
instruction,
// Server runtime delegates agent calls asynchronously because there is
// no client-side `registerAfterCompletion` callback available to run a
// synchronous agent handoff.
if (!ctx.agentDelegation) {
return {
content: 'Agent delegation is not available in this runtime.',
error: { code: 'AGENT_DELEGATION_UNAVAILABLE' },
success: false,
};
}
const description = taskTitle || `Call agent ${agentId}`;
const result = await ctx.agentDelegation.run({
agentId,
description,
instruction,
timeout: timeout || 1_800_000,
});
if (!result.started) {
return {
content: result.error || `Failed to delegate work to agent "${agentId}".`,
error: { code: 'AGENT_DELEGATION_START_FAILED', message: result.error },
state: {
operationId: result.operationId,
targetAgentId: agentId,
timeout: timeout || 1_800_000,
taskMessageId: result.taskMessageId,
threadId: result.threadId,
type: 'agentDelegation',
},
type: 'execSubAgent',
success: false,
};
}
return {
content: `Delegated work to agent "${agentId}"${taskTitle ? `: ${taskTitle}` : ''}`,
state: {
operationId: result.operationId,
targetAgentId: agentId,
taskMessageId: result.taskMessageId,
threadId: result.threadId,
type: 'agentDelegation',
},
success: true,
};
@@ -53,9 +53,50 @@ export interface ServerSubAgentRunner {
run: (params: ServerSubAgentRunParams) => Promise<ServerSubAgentRunResult>;
}
export interface ServerAgentDelegationRunParams {
/** Target agent id to execute the delegated work. */
agentId: string;
/** Short label shown in the parent conversation task card and thread title. */
description: string;
/** Detailed instruction/prompt for the delegated agent run. */
instruction: string;
/** Optional per-run timeout in milliseconds. */
timeout?: number;
}
export interface ServerAgentDelegationRunResult {
/** Human-readable failure detail when the delegated run could not start. */
error?: string;
/** The spawned child operation id. */
operationId?: string;
/** Whether the delegated run was actually started. */
started: boolean;
/** Parent conversation task-card message id. */
taskMessageId?: string;
/** Isolation thread holding the delegated agent's execution trace. */
threadId?: string;
}
/**
* Server-side runner for `lobe-agent-management.callAgent`.
*
* It creates a parent-conversation `role: task` card, then starts an isolated
* run using the target agent. Unlike `subAgent`, it does not park/resume the
* parent operation: this is a background delegation.
*/
export interface ServerAgentDelegationRunner {
run: (params: ServerAgentDelegationRunParams) => Promise<ServerAgentDelegationRunResult>;
}
export interface ToolExecutionContext {
/** Target device ID for device proxy tool calls */
activeDeviceId?: string;
/**
* Server-side delegated-agent runner. Used by agent-management `callAgent`
* to create a visible task card and run the target agent in an isolation
* thread without relying on legacy exec_sub_agent tool-result states.
*/
agentDelegation?: ServerAgentDelegationRunner;
/** Agent ID executing the tool call */
agentId?: string;
/** Current page document ID for page-scoped conversations */
@@ -28,7 +28,7 @@ interface TaskMessageProps {
isLatestItem?: boolean;
}
const TaskMessage = memo<TaskMessageProps>(({ id, index, disableEditing }) => {
const TaskMessage = memo<TaskMessageProps>(({ id, index: _index, disableEditing }) => {
const { t } = useTranslation('chat');
// Get message and actionsConfig from ConversationStore
@@ -37,7 +37,9 @@ const TaskMessage = memo<TaskMessageProps>(({ id, index, disableEditing }) => {
const { agentId, groupId, error, role, content, createdAt, metadata, taskDetail } = item;
const avatar = useAgentMeta(agentId);
const legacyTargetAgentId = (metadata as { targetAgentId?: string } | undefined)?.targetAgentId;
const displayAgentId = metadata?.subAgentId || legacyTargetAgentId || agentId;
const avatar = useAgentMeta(displayAgentId);
// Get editing and generating state from ConversationStore
const editing = useConversationStore(messageStateSelectors.isMessageEditing(id));
@@ -58,7 +60,7 @@ const TaskMessage = memo<TaskMessageProps>(({ id, index, disableEditing }) => {
} else {
openChatSettings();
}
}, [isInbox]);
}, [isInbox, openChatSettings, toggleSystemRole]);
const onDoubleClick = useDoubleClickEdit({ disableEditing, error, id, role });
@@ -88,7 +90,7 @@ const TaskMessage = memo<TaskMessageProps>(({ id, index, disableEditing }) => {
>
{taskDetail?.clientMode ? (
<ClientTaskDetail
agentId={agentId !== 'supervisor' ? agentId : undefined}
agentId={displayAgentId !== 'supervisor' ? displayAgentId : undefined}
groupId={groupId}
messageId={id}
taskDetail={taskDetail}
@@ -24,8 +24,12 @@ const TasksMessage = memo<TasksMessageProps>(({ id }) => {
const actionsConfig = useConversationStore((s) => s.actionsBar?.assistant);
const tasks = (item as UIChatMessage)?.tasks?.filter(Boolean) as UIChatMessage[] | undefined;
// Use first task's agentId for avatar, or fallback to undefined
const firstTaskAgentId = tasks?.[0]?.agentId;
// Use first task's delegated target agent when available.
const firstTaskMetadata = tasks?.[0]?.metadata;
const firstTaskLegacyTargetAgentId = (firstTaskMetadata as { targetAgentId?: string } | undefined)
?.targetAgentId;
const firstTaskAgentId =
firstTaskMetadata?.subAgentId || firstTaskLegacyTargetAgentId || tasks?.[0]?.agentId;
const avatar = useAgentMeta(firstTaskAgentId);
if (!tasks || tasks.length === 0) {