diff --git a/apps/server/src/modules/AgentRuntime/RuntimeExecutors.ts b/apps/server/src/modules/AgentRuntime/RuntimeExecutors.ts index 09695f9e4f..d4f384b0e8 100644 --- a/apps/server/src/modules/AgentRuntime/RuntimeExecutors.ts +++ b/apps/server/src/modules/AgentRuntime/RuntimeExecutors.ts @@ -61,6 +61,7 @@ import { chainCompressContext } from '@lobechat/prompts'; import { type ChatToolPayload, type ExecSubAgentParams, + type ExecVirtualSubAgentParams, type MessageToolCall, type UIChatMessage, } from '@lobechat/types'; @@ -331,8 +332,9 @@ const buildPostProcessUrl = ( * The runner creates the pending placeholder tool message that anchors the * isolation thread (so the UI shows a loading state and the completion bridge * has a message to backfill), then kicks off the child op asynchronously and - * returns immediately. Returns `undefined` when sub-agent execution is not - * available (no `execSubAgent` callback, or missing agent/topic context). + * returns immediately. Returns `undefined` when virtual sub-agent execution is + * not available (no `execVirtualSubAgent` callback, or missing agent/topic + * context). */ const buildServerSubAgentRunner = ( ctx: RuntimeExecutorContext, @@ -340,8 +342,8 @@ const buildServerSubAgentRunner = ( chatToolPayload: ChatToolPayload, parentMessageId: string, ): ServerSubAgentRunner | undefined => { - const execSubAgent = ctx.execSubAgent; - if (!execSubAgent) return undefined; + const execVirtualSubAgent = ctx.execVirtualSubAgent; + if (!execVirtualSubAgent) return undefined; const agentId = state.metadata?.agentId; const topicId = ctx.topicId ?? state.metadata?.topicId; @@ -364,17 +366,15 @@ const buildServerSubAgentRunner = ( topicId, }); - // 2. Fork the child op anchored to the placeholder. `resumeParentOnComplete` - // tells execSubAgent to register the completion bridge that - // backfills this tool message and resumes the parent op. - const result = (await execSubAgent({ + // 2. Fork the virtual child op anchored to the placeholder. The virtual + // entry marks the child as `isSubAgent` and registers the completion + // bridge that backfills this tool message and resumes the parent op. + const result = (await execVirtualSubAgent({ agentId: targetAgentId ?? agentId, groupId: state.metadata?.groupId ?? undefined, instruction, - isSubAgent: true, parentMessageId: placeholder.id, parentOperationId: ctx.operationId, - resumeParentOnComplete: true, timeout, title: description, topicId, @@ -523,11 +523,17 @@ export interface RuntimeExecutorContext { discordContext?: any; evalContext?: EvalContext; /** - * Callback to spawn a sub-agent task server-side. + * Callback to run a legacy agent invocation server-side. * Injected by AiAgentService so exec_sub_agent / exec_sub_agents executors - * can dispatch callAgent-triggered tasks without a circular import. + * can dispatch callAgent-triggered runs without a circular import. */ execSubAgent?: (params: ExecSubAgentParams) => Promise; + /** + * Callback to fork a `lobe-agent.callSubAgent` virtual child run. Unlike + * execSubAgent, this path installs the async completion bridge and marks the + * child operation as a sub-agent. + */ + execVirtualSubAgent?: (params: ExecVirtualSubAgentParams) => Promise; hookDispatcher?: HookDispatcher; loadAgentState?: (operationId: string) => Promise; messageModel: MessageModel; diff --git a/apps/server/src/services/agentRuntime/AgentRuntimeService.ts b/apps/server/src/services/agentRuntime/AgentRuntimeService.ts index 038b593fa6..be21776f5b 100644 --- a/apps/server/src/services/agentRuntime/AgentRuntimeService.ts +++ b/apps/server/src/services/agentRuntime/AgentRuntimeService.ts @@ -25,7 +25,12 @@ import { invokeAgentSpanName, tracer as agentRuntimeTracer, } from '@lobechat/observability-otel/modules/agent-runtime'; -import { type ChatToolPayload, type ExecSubAgentParams, type UIChatMessage } from '@lobechat/types'; +import { + type ChatToolPayload, + type ExecSubAgentParams, + type ExecVirtualSubAgentParams, + type UIChatMessage, +} from '@lobechat/types'; import debug from 'debug'; import urlJoin from 'url-join'; @@ -126,13 +131,17 @@ const toAgentSignalSnapshotEvents = ( */ export interface AgentRuntimeDelegate { /** - * Fork a sub-agent through the full high-level pipeline + * Run a legacy agent invocation through the full high-level pipeline * (AiAgentService.execSubAgent → execAgent: agent-config resolution, tool - * engine, context engineering, createOperation). Returns a deferred result; - * the parent op parks (`waiting_for_async_tool`) until the completion bridge - * backfills the placeholder and resumes it. + * engine, context engineering, createOperation). */ execSubAgent?: (params: ExecSubAgentParams) => Promise; + /** + * Fork a `lobe-agent.callSubAgent` virtual child run. The child is marked as a + * sub-agent and owns the completion bridge that backfills the parent tool + * placeholder before resuming the parked parent operation. + */ + execVirtualSubAgent?: (params: ExecVirtualSubAgentParams) => Promise; } export interface AgentRuntimeServiceOptions { @@ -1864,10 +1873,7 @@ export class AgentRuntimeService { if (!tool || typeof tool !== 'object') continue; const toolPayload = tool as { id?: unknown; result_msg_id?: unknown }; - if ( - typeof toolPayload.id === 'string' && - typeof toolPayload.result_msg_id === 'string' - ) { + if (typeof toolPayload.id === 'string' && typeof toolPayload.result_msg_id === 'string') { toolResultMessageIds.set(toolPayload.id, toolPayload.result_msg_id); } } @@ -1944,6 +1950,7 @@ export class AgentRuntimeService { userTimezone: metadata?.userTimezone, evalContext: metadata?.evalContext, execSubAgent: this.delegate.execSubAgent, + execVirtualSubAgent: this.delegate.execVirtualSubAgent, hookDispatcher, loadAgentState: this.coordinator.loadAgentState.bind(this.coordinator), messageModel: this.messageModel, diff --git a/apps/server/src/services/aiAgent/__tests__/execGroupSubAgentTask.test.ts b/apps/server/src/services/aiAgent/__tests__/execSubAgent.test.ts similarity index 97% rename from apps/server/src/services/aiAgent/__tests__/execGroupSubAgentTask.test.ts rename to apps/server/src/services/aiAgent/__tests__/execSubAgent.test.ts index 2eaf3a3018..44ef1e7d0b 100644 --- a/apps/server/src/services/aiAgent/__tests__/execGroupSubAgentTask.test.ts +++ b/apps/server/src/services/aiAgent/__tests__/execSubAgent.test.ts @@ -121,7 +121,7 @@ describe('AiAgentService.execSubAgent', () => { service = new AiAgentService(mockDb, userId); }); - describe('successful task execution', () => { + describe('successful isolated execution', () => { it('should create Thread with correct parameters', async () => { // Mock execAgent to return success vi.spyOn(service, 'execAgent').mockResolvedValue({ @@ -214,6 +214,7 @@ describe('AiAgentService.execSubAgent', () => { agentId: 'agent-1', appContext: { groupId: 'group-1', + isSubAgent: false, threadId: 'thread-123', topicId: 'topic-1', }, @@ -229,7 +230,7 @@ describe('AiAgentService.execSubAgent', () => { }); }); - it('should mark deferred lobe-agent sub-agent children as sub-agents', async () => { + it('should run deferred lobe-agent children through execVirtualSubAgent', async () => { const execAgentSpy = vi.spyOn(service, 'execAgent').mockResolvedValue({ agentId: 'agent-1', assistantMessageId: 'assistant-msg-1', @@ -245,13 +246,11 @@ describe('AiAgentService.execSubAgent', () => { userMessageId: 'user-msg-1', }); - await service.execSubAgent({ + await service.execVirtualSubAgent({ agentId: 'agent-1', instruction: 'Nested research task', - isSubAgent: true, parentMessageId: 'tool-msg-1', parentOperationId: 'parent-op-1', - resumeParentOnComplete: true, topicId: 'topic-1', }); @@ -262,6 +261,9 @@ describe('AiAgentService.execSubAgent', () => { threadId: 'thread-123', topicId: 'topic-1', }), + hooks: expect.arrayContaining([ + expect.objectContaining({ id: 'sub-agent-bridge', type: 'onComplete' }), + ]), parentOperationId: 'parent-op-1', trigger: 'cli', }), @@ -454,7 +456,7 @@ describe('AiAgentService.execSubAgent', () => { parentMessageId: 'parent-msg-1', topicId: 'topic-1', }), - ).rejects.toThrow('Failed to create thread for task execution'); + ).rejects.toThrow('Failed to create thread for agent execution'); }); it('should throw error when Thread creation throws', async () => { @@ -472,7 +474,7 @@ describe('AiAgentService.execSubAgent', () => { }); }); - describe('task message summary update', () => { + describe('source message summary update', () => { it('should pass sourceMessageId (parentMessageId) to callbacks for summary update', async () => { const execAgentSpy = vi.spyOn(service, 'execAgent').mockResolvedValue({ agentId: 'agent-1', diff --git a/apps/server/src/services/aiAgent/index.ts b/apps/server/src/services/aiAgent/index.ts index 40ca10057d..1ae3c4356f 100644 --- a/apps/server/src/services/aiAgent/index.ts +++ b/apps/server/src/services/aiAgent/index.ts @@ -36,6 +36,7 @@ import type { ExecGroupAgentResult, ExecSubAgentParams, ExecSubAgentResult, + ExecVirtualSubAgentParams, LobeAgentAgencyConfig, MessagePluginItem, UserInterventionConfig, @@ -318,9 +319,10 @@ export class AiAgentService { // high-level pipelines mid-step. See AgentRuntimeDelegate. New high-level // capabilities the runtime calls into go in this `delegate` object. // - // `execSubAgent` is an auto-bound arrow field, so no `.bind(this)`. + // Arrow fields are auto-bound, so no `.bind(this)`. delegate: { execSubAgent: this.execSubAgent, + execVirtualSubAgent: this.execVirtualSubAgent, }, workspaceId: wsId, }); @@ -2856,36 +2858,46 @@ export class AiAgentService { } /** - * Execute SubAgent task (supports both Group and Single Agent mode) + * Execute an agent in an isolated Thread context. * - * This method is called by Supervisor (Group mode) or Agent (Single mode) - * to delegate tasks to SubAgents. Each task runs in an isolated Thread context. - * - * - Group mode: pass groupId, Thread will be associated with the Group - * - Single Agent mode: omit groupId, Thread will only be associated with the Agent - * - * Flow: - * 1. Create Thread (type='isolation', status='processing') - * 2. Delegate to execAgent with threadId in appContext - * 3. Store operationId in Thread metadata + * Group/callAgent paths use this entry. It does not mark the child as a + * virtual sub-agent and it does not install the async completion bridge. */ - // Arrow field (not a method) so it stays bound to this instance when handed to - // AgentRuntimeService as the `execSubAgent` fork callback — no `.bind(this)`. - execSubAgent = async (params: ExecSubAgentParams): Promise => { - const { - groupId, - topicId, - parentMessageId, - agentId, - instruction, - isSubAgent, - title, - parentOperationId, - resumeParentOnComplete, - } = params; + // Arrow field (not a method) so it stays bound when handed to AgentRuntimeService. + execSubAgent = async (params: ExecSubAgentParams): Promise => + this.execAgentThreadRun(params, { + isSubAgent: false, + logScope: 'execSubAgent', + }); + + /** + * Execute a virtual sub-agent created by `lobe-agent.callSubAgent`. + * + * This path is a child operation of the current agent run. It is marked as a + * sub-agent so it cannot recursively spawn more sub-agents, and it registers + * the bridge that backfills the parent's placeholder tool message. + */ + execVirtualSubAgent = async (params: ExecVirtualSubAgentParams): Promise => + this.execAgentThreadRun(params, { + isSubAgent: true, + logScope: 'execVirtualSubAgent', + resumeParentOnComplete: true, + }); + + private async execAgentThreadRun( + params: ExecSubAgentParams | ExecVirtualSubAgentParams, + options: { + isSubAgent: boolean; + logScope: 'execSubAgent' | 'execVirtualSubAgent'; + resumeParentOnComplete?: boolean; + }, + ): Promise { + const { groupId, topicId, parentMessageId, agentId, instruction, title, parentOperationId } = + params; log( - 'execSubAgent: agentId=%s, groupId=%s, topicId=%s, instruction=%s', + '%s: agentId=%s, groupId=%s, topicId=%s, instruction=%s', + options.logScope, agentId, groupId, topicId, @@ -2904,7 +2916,7 @@ export class AiAgentService { .catch(() => {}); } - // 1. Create Thread for isolated task execution + // 1. Create Thread for isolated agent execution const thread = await this.threadModel.create({ agentId, groupId, @@ -2915,10 +2927,10 @@ export class AiAgentService { }); if (!thread) { - throw new Error('Failed to create thread for task execution'); + throw new Error('Failed to create thread for agent execution'); } - log('execSubAgent: created thread %s', thread.id); + log('%s: created thread %s', options.logScope, thread.id); // 2. Update Thread status to processing with startedAt timestamp const startedAt = new Date().toISOString(); @@ -2927,14 +2939,14 @@ export class AiAgentService { status: ThreadStatus.Processing, }); - // 3. Create hooks for updating Thread metadata and task message + // 3. Create hooks for updating Thread metadata and source message const threadHooks = this.createThreadHooks(thread.id, startedAt, parentMessageId); - // For the deferred-tool path, also register the completion bridge that + // For the virtual sub-agent path, also register the completion bridge that // backfills the parent's placeholder tool message and resumes the parked - // parent op once the whole batch is done. Registered last so its - // tool-message backfill (content + pluginState) is the final write. + // parent op once the child run is done. Registered last so its tool-message + // backfill (content + pluginState) is the final write. const hooks = - resumeParentOnComplete && parentOperationId + options.resumeParentOnComplete && parentOperationId ? [ ...threadHooks, this.createSubAgentBridgeHook(parentOperationId, parentMessageId, thread.id), @@ -2954,20 +2966,20 @@ export class AiAgentService { ).findById(parentOperationId); inheritedTrigger = parentOp?.trigger ?? undefined; } catch (error) { - log('execSubAgent: failed to read parent operation trigger: %O', error); + log('%s: failed to read parent operation trigger: %O', options.logScope, error); } } const appContext: NonNullable = { groupId, + isSubAgent: options.isSubAgent, threadId: thread.id, topicId, }; - if (isSubAgent) appContext.isSubAgent = true; // 4. Delegate to execAgent with threadId in appContext and hooks // The instruction will be created as user message in the Thread - // Use headless mode to skip human approval in async task execution + // Use headless mode to skip human approval in async agent execution const result = await this.execAgent({ agentId, appContext, @@ -2980,7 +2992,8 @@ export class AiAgentService { }); log( - 'execSubAgent: delegated to execAgent, operationId=%s, success=%s', + '%s: delegated to execAgent, operationId=%s, success=%s', + options.logScope, result.operationId, result.success, ); @@ -3036,7 +3049,7 @@ export class AiAgentService { success: result.success ?? false, threadId: thread.id, }; - }; + } /** * Create step lifecycle callbacks for updating Thread metadata @@ -3044,7 +3057,7 @@ export class AiAgentService { * * @param threadId - The Thread ID to update * @param startedAt - The start time ISO string - * @param sourceMessageId - The task message ID (sourceMessageId from Thread) to update with summary + * @param sourceMessageId - The source message ID from Thread to update with summary */ private createThreadMetadataCallbacks( threadId: string, @@ -3109,13 +3122,13 @@ export class AiAgentService { } } - // Log error when task fails + // Log error when the isolated run fails if (reason === 'error' && finalState.error) { - console.error('execSubAgent: task failed for thread %s:', threadId, finalState.error); + console.error('execSubAgent: run failed for thread %s:', threadId, finalState.error); } try { - // Extract summary from last assistant message and update task message content + // Extract summary from last assistant message and update source message content const lastAssistantMessage = finalState.messages ?.slice() .reverse() @@ -3125,7 +3138,7 @@ export class AiAgentService { await this.messageModel.update(sourceMessageId, { content: lastAssistantMessage.content, }); - log('execSubAgent: updated task message %s with summary', sourceMessageId); + log('execSubAgent: updated source message %s with summary', sourceMessageId); } // Format error for proper serialization (Error objects don't serialize with JSON.stringify) @@ -3234,14 +3247,14 @@ export class AiAgentService { if (event.reason === 'error' && finalState.error) { console.error( - 'Thread hook onComplete: task failed for thread %s:', + 'Thread hook onComplete: run failed for thread %s:', threadId, finalState.error, ); } try { - // Update task message with summary + // Update source message with summary const lastAssistantMessage = finalState.messages ?.slice() .reverse() diff --git a/apps/server/src/services/toolExecution/types.ts b/apps/server/src/services/toolExecution/types.ts index 0866ddeeec..d482ea0f77 100644 --- a/apps/server/src/services/toolExecution/types.ts +++ b/apps/server/src/services/toolExecution/types.ts @@ -61,9 +61,9 @@ export interface ToolExecutionContext { /** Current page document ID for page-scoped conversations */ documentId?: string | null; /** - * Spawn a sub-agent as an independent async operation. Injected by the agent - * runtime (forwarded from `RuntimeExecutorContext.execSubAgent`) so the - * `callSubAgent` server tool can fork a child op without a circular import. + * Legacy agent invocation callback forwarded from RuntimeExecutorContext. + * Kept for tool runtimes that still dispatch through exec_sub_agent style + * flows; `lobe-agent.callSubAgent` uses the per-call `subAgent` runner below. */ execSubAgent?: (params: ExecSubAgentParams) => Promise; /** Per-call execution timeout resolved by the agent runtime. */ diff --git a/packages/types/src/agentExecution/index.ts b/packages/types/src/agentExecution/index.ts index a4cb7cd6ad..6a1c208c46 100644 --- a/packages/types/src/agentExecution/index.ts +++ b/packages/types/src/agentExecution/index.ts @@ -250,39 +250,57 @@ export interface ExecGroupAgentResponse { userMessageId: string; } -// ============ SubAgent Task Execution Types ============ +// ============ SubAgent Execution Types ============ /** - * Parameters for execSubAgent - execute SubAgent task + * Parameters for execSubAgent - execute an agent in an isolated thread * Supports both Group mode and Single Agent mode * * - Group mode: pass groupId, Thread will be associated with the Group * - Single Agent mode: omit groupId, Thread will only be associated with the Agent */ export interface ExecSubAgentParams { - /** The SubAgent ID to execute the task */ + /** The agent ID to execute */ agentId: string; /** The Group ID (optional, only for Group mode) */ groupId?: string; - /** Task instruction/prompt for the SubAgent */ + /** Instruction/prompt for the agent */ instruction: string; - /** Whether the child run is a lobe-agent virtual sub-agent and must not spawn more sub-agents */ - isSubAgent?: boolean; - /** The parent message ID (Supervisor's tool call message or task message) */ + /** The parent message ID that anchors the isolated thread */ parentMessageId: string; /** Parent operation ID for dispatching callAgent hooks */ parentOperationId?: string; - /** - * When true, register the completion bridge that backfills the parent's - * placeholder tool message with this sub-agent's result and resumes the - * parked parent op (`waiting_for_async_tool` → running). Used by the server - * `callSubAgent` deferred-tool path; left false for the legacy fire-and-forget - * task dispatch. - */ - resumeParentOnComplete?: boolean; /** Timeout in milliseconds (optional) */ timeout?: number; - /** Task title (shown in UI, used as thread title) */ + /** Thread title shown in UI */ + title?: string; + /** The Topic ID */ + topicId: string; +} + +/** + * Parameters for execVirtualSubAgent - execute a `lobe-agent.callSubAgent` + * child run. + * + * Virtual sub-agents are tool-created isolated runs. They are marked with + * `appContext.isSubAgent` so the child cannot recursively spawn more + * sub-agents, and they install the completion bridge that backfills the + * parent's placeholder tool message before resuming the parent operation. + */ +export interface ExecVirtualSubAgentParams { + /** The agent ID to execute */ + agentId: string; + /** The Group ID inherited from the parent operation, when present */ + groupId?: string; + /** Instruction/prompt for the virtual sub-agent */ + instruction: string; + /** The parent placeholder tool message ID */ + parentMessageId: string; + /** Parent operation ID to bridge and resume on completion */ + parentOperationId: string; + /** Timeout in milliseconds (optional) */ + timeout?: number; + /** Thread title shown in UI */ title?: string; /** The Topic ID */ topicId: string; @@ -292,15 +310,15 @@ export interface ExecSubAgentParams { * Result from execSubAgent */ export interface ExecSubAgentResult { - /** The assistant message ID created for this task */ + /** The assistant message ID created for this run */ assistantMessageId: string; - /** Error message if task failed to start */ + /** Error message if execution failed to start */ error?: string; /** Operation ID for tracking execution status */ operationId: string; - /** Whether the task was created successfully */ + /** Whether the execution was created successfully */ success: boolean; - /** The Thread ID where the task is executed */ + /** The Thread ID where the execution is isolated */ threadId: string; }