♻️ refactor(agent): split virtual sub-agent entry (#15733)

This commit is contained in:
Arvin Xu
2026-06-13 02:10:47 +08:00
committed by GitHub
parent 6887930428
commit 5362be4078
6 changed files with 144 additions and 98 deletions
@@ -61,6 +61,7 @@ import { chainCompressContext } from '@lobechat/prompts';
import {
type ChatToolPayload,
type ExecSubAgentParams,
type ExecVirtualSubAgentParams,
type MessageToolCall,
type UIChatMessage,
} from '@lobechat/types';
@@ -331,8 +332,9 @@ const buildPostProcessUrl = (
* The runner creates the pending placeholder tool message that anchors the
* isolation thread (so the UI shows a loading state and the completion bridge
* has a message to backfill), then kicks off the child op asynchronously and
* returns immediately. Returns `undefined` when sub-agent execution is not
* available (no `execSubAgent` callback, or missing agent/topic context).
* returns immediately. Returns `undefined` when virtual sub-agent execution is
* not available (no `execVirtualSubAgent` callback, or missing agent/topic
* context).
*/
const buildServerSubAgentRunner = (
ctx: RuntimeExecutorContext,
@@ -340,8 +342,8 @@ const buildServerSubAgentRunner = (
chatToolPayload: ChatToolPayload,
parentMessageId: string,
): ServerSubAgentRunner | undefined => {
const execSubAgent = ctx.execSubAgent;
if (!execSubAgent) return undefined;
const execVirtualSubAgent = ctx.execVirtualSubAgent;
if (!execVirtualSubAgent) return undefined;
const agentId = state.metadata?.agentId;
const topicId = ctx.topicId ?? state.metadata?.topicId;
@@ -364,17 +366,15 @@ const buildServerSubAgentRunner = (
topicId,
});
// 2. Fork the child op anchored to the placeholder. `resumeParentOnComplete`
// tells execSubAgent to register the completion bridge that
// backfills this tool message and resumes the parent op.
const result = (await execSubAgent({
// 2. Fork the virtual child op anchored to the placeholder. The virtual
// entry marks the child as `isSubAgent` and registers the completion
// bridge that backfills this tool message and resumes the parent op.
const result = (await execVirtualSubAgent({
agentId: targetAgentId ?? agentId,
groupId: state.metadata?.groupId ?? undefined,
instruction,
isSubAgent: true,
parentMessageId: placeholder.id,
parentOperationId: ctx.operationId,
resumeParentOnComplete: true,
timeout,
title: description,
topicId,
@@ -523,11 +523,17 @@ export interface RuntimeExecutorContext {
discordContext?: any;
evalContext?: EvalContext;
/**
* Callback to spawn a sub-agent task server-side.
* Callback to run a legacy agent invocation server-side.
* Injected by AiAgentService so exec_sub_agent / exec_sub_agents executors
* can dispatch callAgent-triggered tasks without a circular import.
* can dispatch callAgent-triggered runs without a circular import.
*/
execSubAgent?: (params: ExecSubAgentParams) => Promise<unknown>;
/**
* Callback to fork a `lobe-agent.callSubAgent` virtual child run. Unlike
* execSubAgent, this path installs the async completion bridge and marks the
* child operation as a sub-agent.
*/
execVirtualSubAgent?: (params: ExecVirtualSubAgentParams) => Promise<unknown>;
hookDispatcher?: HookDispatcher;
loadAgentState?: (operationId: string) => Promise<AgentState | null>;
messageModel: MessageModel;
@@ -25,7 +25,12 @@ import {
invokeAgentSpanName,
tracer as agentRuntimeTracer,
} from '@lobechat/observability-otel/modules/agent-runtime';
import { type ChatToolPayload, type ExecSubAgentParams, type UIChatMessage } from '@lobechat/types';
import {
type ChatToolPayload,
type ExecSubAgentParams,
type ExecVirtualSubAgentParams,
type UIChatMessage,
} from '@lobechat/types';
import debug from 'debug';
import urlJoin from 'url-join';
@@ -126,13 +131,17 @@ const toAgentSignalSnapshotEvents = (
*/
export interface AgentRuntimeDelegate {
/**
* Fork a sub-agent through the full high-level pipeline
* Run a legacy agent invocation through the full high-level pipeline
* (AiAgentService.execSubAgent → execAgent: agent-config resolution, tool
* engine, context engineering, createOperation). Returns a deferred result;
* the parent op parks (`waiting_for_async_tool`) until the completion bridge
* backfills the placeholder and resumes it.
* engine, context engineering, createOperation).
*/
execSubAgent?: (params: ExecSubAgentParams) => Promise<unknown>;
/**
* Fork a `lobe-agent.callSubAgent` virtual child run. The child is marked as a
* sub-agent and owns the completion bridge that backfills the parent tool
* placeholder before resuming the parked parent operation.
*/
execVirtualSubAgent?: (params: ExecVirtualSubAgentParams) => Promise<unknown>;
}
export interface AgentRuntimeServiceOptions {
@@ -1864,10 +1873,7 @@ export class AgentRuntimeService {
if (!tool || typeof tool !== 'object') continue;
const toolPayload = tool as { id?: unknown; result_msg_id?: unknown };
if (
typeof toolPayload.id === 'string' &&
typeof toolPayload.result_msg_id === 'string'
) {
if (typeof toolPayload.id === 'string' && typeof toolPayload.result_msg_id === 'string') {
toolResultMessageIds.set(toolPayload.id, toolPayload.result_msg_id);
}
}
@@ -1944,6 +1950,7 @@ export class AgentRuntimeService {
userTimezone: metadata?.userTimezone,
evalContext: metadata?.evalContext,
execSubAgent: this.delegate.execSubAgent,
execVirtualSubAgent: this.delegate.execVirtualSubAgent,
hookDispatcher,
loadAgentState: this.coordinator.loadAgentState.bind(this.coordinator),
messageModel: this.messageModel,
@@ -121,7 +121,7 @@ describe('AiAgentService.execSubAgent', () => {
service = new AiAgentService(mockDb, userId);
});
describe('successful task execution', () => {
describe('successful isolated execution', () => {
it('should create Thread with correct parameters', async () => {
// Mock execAgent to return success
vi.spyOn(service, 'execAgent').mockResolvedValue({
@@ -214,6 +214,7 @@ describe('AiAgentService.execSubAgent', () => {
agentId: 'agent-1',
appContext: {
groupId: 'group-1',
isSubAgent: false,
threadId: 'thread-123',
topicId: 'topic-1',
},
@@ -229,7 +230,7 @@ describe('AiAgentService.execSubAgent', () => {
});
});
it('should mark deferred lobe-agent sub-agent children as sub-agents', async () => {
it('should run deferred lobe-agent children through execVirtualSubAgent', async () => {
const execAgentSpy = vi.spyOn(service, 'execAgent').mockResolvedValue({
agentId: 'agent-1',
assistantMessageId: 'assistant-msg-1',
@@ -245,13 +246,11 @@ describe('AiAgentService.execSubAgent', () => {
userMessageId: 'user-msg-1',
});
await service.execSubAgent({
await service.execVirtualSubAgent({
agentId: 'agent-1',
instruction: 'Nested research task',
isSubAgent: true,
parentMessageId: 'tool-msg-1',
parentOperationId: 'parent-op-1',
resumeParentOnComplete: true,
topicId: 'topic-1',
});
@@ -262,6 +261,9 @@ describe('AiAgentService.execSubAgent', () => {
threadId: 'thread-123',
topicId: 'topic-1',
}),
hooks: expect.arrayContaining([
expect.objectContaining({ id: 'sub-agent-bridge', type: 'onComplete' }),
]),
parentOperationId: 'parent-op-1',
trigger: 'cli',
}),
@@ -454,7 +456,7 @@ describe('AiAgentService.execSubAgent', () => {
parentMessageId: 'parent-msg-1',
topicId: 'topic-1',
}),
).rejects.toThrow('Failed to create thread for task execution');
).rejects.toThrow('Failed to create thread for agent execution');
});
it('should throw error when Thread creation throws', async () => {
@@ -472,7 +474,7 @@ describe('AiAgentService.execSubAgent', () => {
});
});
describe('task message summary update', () => {
describe('source message summary update', () => {
it('should pass sourceMessageId (parentMessageId) to callbacks for summary update', async () => {
const execAgentSpy = vi.spyOn(service, 'execAgent').mockResolvedValue({
agentId: 'agent-1',
+60 -47
View File
@@ -36,6 +36,7 @@ import type {
ExecGroupAgentResult,
ExecSubAgentParams,
ExecSubAgentResult,
ExecVirtualSubAgentParams,
LobeAgentAgencyConfig,
MessagePluginItem,
UserInterventionConfig,
@@ -318,9 +319,10 @@ export class AiAgentService {
// high-level pipelines mid-step. See AgentRuntimeDelegate. New high-level
// capabilities the runtime calls into go in this `delegate` object.
//
// `execSubAgent` is an auto-bound arrow field, so no `.bind(this)`.
// Arrow fields are auto-bound, so no `.bind(this)`.
delegate: {
execSubAgent: this.execSubAgent,
execVirtualSubAgent: this.execVirtualSubAgent,
},
workspaceId: wsId,
});
@@ -2856,36 +2858,46 @@ export class AiAgentService {
}
/**
* Execute SubAgent task (supports both Group and Single Agent mode)
* Execute an agent in an isolated Thread context.
*
* This method is called by Supervisor (Group mode) or Agent (Single mode)
* to delegate tasks to SubAgents. Each task runs in an isolated Thread context.
*
* - Group mode: pass groupId, Thread will be associated with the Group
* - Single Agent mode: omit groupId, Thread will only be associated with the Agent
*
* Flow:
* 1. Create Thread (type='isolation', status='processing')
* 2. Delegate to execAgent with threadId in appContext
* 3. Store operationId in Thread metadata
* Group/callAgent paths use this entry. It does not mark the child as a
* virtual sub-agent and it does not install the async completion bridge.
*/
// Arrow field (not a method) so it stays bound to this instance when handed to
// AgentRuntimeService as the `execSubAgent` fork callback — no `.bind(this)`.
execSubAgent = async (params: ExecSubAgentParams): Promise<ExecSubAgentResult> => {
const {
groupId,
topicId,
parentMessageId,
agentId,
instruction,
isSubAgent,
title,
parentOperationId,
resumeParentOnComplete,
} = params;
// Arrow field (not a method) so it stays bound when handed to AgentRuntimeService.
execSubAgent = async (params: ExecSubAgentParams): Promise<ExecSubAgentResult> =>
this.execAgentThreadRun(params, {
isSubAgent: false,
logScope: 'execSubAgent',
});
/**
* Execute a virtual sub-agent created by `lobe-agent.callSubAgent`.
*
* This path is a child operation of the current agent run. It is marked as a
* sub-agent so it cannot recursively spawn more sub-agents, and it registers
* the bridge that backfills the parent's placeholder tool message.
*/
execVirtualSubAgent = async (params: ExecVirtualSubAgentParams): Promise<ExecSubAgentResult> =>
this.execAgentThreadRun(params, {
isSubAgent: true,
logScope: 'execVirtualSubAgent',
resumeParentOnComplete: true,
});
private async execAgentThreadRun(
params: ExecSubAgentParams | ExecVirtualSubAgentParams,
options: {
isSubAgent: boolean;
logScope: 'execSubAgent' | 'execVirtualSubAgent';
resumeParentOnComplete?: boolean;
},
): Promise<ExecSubAgentResult> {
const { groupId, topicId, parentMessageId, agentId, instruction, title, parentOperationId } =
params;
log(
'execSubAgent: agentId=%s, groupId=%s, topicId=%s, instruction=%s',
'%s: agentId=%s, groupId=%s, topicId=%s, instruction=%s',
options.logScope,
agentId,
groupId,
topicId,
@@ -2904,7 +2916,7 @@ export class AiAgentService {
.catch(() => {});
}
// 1. Create Thread for isolated task execution
// 1. Create Thread for isolated agent execution
const thread = await this.threadModel.create({
agentId,
groupId,
@@ -2915,10 +2927,10 @@ export class AiAgentService {
});
if (!thread) {
throw new Error('Failed to create thread for task execution');
throw new Error('Failed to create thread for agent execution');
}
log('execSubAgent: created thread %s', thread.id);
log('%s: created thread %s', options.logScope, thread.id);
// 2. Update Thread status to processing with startedAt timestamp
const startedAt = new Date().toISOString();
@@ -2927,14 +2939,14 @@ export class AiAgentService {
status: ThreadStatus.Processing,
});
// 3. Create hooks for updating Thread metadata and task message
// 3. Create hooks for updating Thread metadata and source message
const threadHooks = this.createThreadHooks(thread.id, startedAt, parentMessageId);
// For the deferred-tool path, also register the completion bridge that
// For the virtual sub-agent path, also register the completion bridge that
// backfills the parent's placeholder tool message and resumes the parked
// parent op once the whole batch is done. Registered last so its
// tool-message backfill (content + pluginState) is the final write.
// parent op once the child run is done. Registered last so its tool-message
// backfill (content + pluginState) is the final write.
const hooks =
resumeParentOnComplete && parentOperationId
options.resumeParentOnComplete && parentOperationId
? [
...threadHooks,
this.createSubAgentBridgeHook(parentOperationId, parentMessageId, thread.id),
@@ -2954,20 +2966,20 @@ export class AiAgentService {
).findById(parentOperationId);
inheritedTrigger = parentOp?.trigger ?? undefined;
} catch (error) {
log('execSubAgent: failed to read parent operation trigger: %O', error);
log('%s: failed to read parent operation trigger: %O', options.logScope, error);
}
}
const appContext: NonNullable<InternalExecAgentParams['appContext']> = {
groupId,
isSubAgent: options.isSubAgent,
threadId: thread.id,
topicId,
};
if (isSubAgent) appContext.isSubAgent = true;
// 4. Delegate to execAgent with threadId in appContext and hooks
// The instruction will be created as user message in the Thread
// Use headless mode to skip human approval in async task execution
// Use headless mode to skip human approval in async agent execution
const result = await this.execAgent({
agentId,
appContext,
@@ -2980,7 +2992,8 @@ export class AiAgentService {
});
log(
'execSubAgent: delegated to execAgent, operationId=%s, success=%s',
'%s: delegated to execAgent, operationId=%s, success=%s',
options.logScope,
result.operationId,
result.success,
);
@@ -3036,7 +3049,7 @@ export class AiAgentService {
success: result.success ?? false,
threadId: thread.id,
};
};
}
/**
* Create step lifecycle callbacks for updating Thread metadata
@@ -3044,7 +3057,7 @@ export class AiAgentService {
*
* @param threadId - The Thread ID to update
* @param startedAt - The start time ISO string
* @param sourceMessageId - The task message ID (sourceMessageId from Thread) to update with summary
* @param sourceMessageId - The source message ID from Thread to update with summary
*/
private createThreadMetadataCallbacks(
threadId: string,
@@ -3109,13 +3122,13 @@ export class AiAgentService {
}
}
// Log error when task fails
// Log error when the isolated run fails
if (reason === 'error' && finalState.error) {
console.error('execSubAgent: task failed for thread %s:', threadId, finalState.error);
console.error('execSubAgent: run failed for thread %s:', threadId, finalState.error);
}
try {
// Extract summary from last assistant message and update task message content
// Extract summary from last assistant message and update source message content
const lastAssistantMessage = finalState.messages
?.slice()
.reverse()
@@ -3125,7 +3138,7 @@ export class AiAgentService {
await this.messageModel.update(sourceMessageId, {
content: lastAssistantMessage.content,
});
log('execSubAgent: updated task message %s with summary', sourceMessageId);
log('execSubAgent: updated source message %s with summary', sourceMessageId);
}
// Format error for proper serialization (Error objects don't serialize with JSON.stringify)
@@ -3234,14 +3247,14 @@ export class AiAgentService {
if (event.reason === 'error' && finalState.error) {
console.error(
'Thread hook onComplete: task failed for thread %s:',
'Thread hook onComplete: run failed for thread %s:',
threadId,
finalState.error,
);
}
try {
// Update task message with summary
// Update source message with summary
const lastAssistantMessage = finalState.messages
?.slice()
.reverse()
@@ -61,9 +61,9 @@ export interface ToolExecutionContext {
/** Current page document ID for page-scoped conversations */
documentId?: string | null;
/**
* Spawn a sub-agent as an independent async operation. Injected by the agent
* runtime (forwarded from `RuntimeExecutorContext.execSubAgent`) so the
* `callSubAgent` server tool can fork a child op without a circular import.
* Legacy agent invocation callback forwarded from RuntimeExecutorContext.
* Kept for tool runtimes that still dispatch through exec_sub_agent style
* flows; `lobe-agent.callSubAgent` uses the per-call `subAgent` runner below.
*/
execSubAgent?: (params: ExecSubAgentParams) => Promise<unknown>;
/** Per-call execution timeout resolved by the agent runtime. */
+38 -20
View File
@@ -250,39 +250,57 @@ export interface ExecGroupAgentResponse {
userMessageId: string;
}
// ============ SubAgent Task Execution Types ============
// ============ SubAgent Execution Types ============
/**
* Parameters for execSubAgent - execute SubAgent task
* Parameters for execSubAgent - execute an agent in an isolated thread
* Supports both Group mode and Single Agent mode
*
* - Group mode: pass groupId, Thread will be associated with the Group
* - Single Agent mode: omit groupId, Thread will only be associated with the Agent
*/
export interface ExecSubAgentParams {
/** The SubAgent ID to execute the task */
/** The agent ID to execute */
agentId: string;
/** The Group ID (optional, only for Group mode) */
groupId?: string;
/** Task instruction/prompt for the SubAgent */
/** Instruction/prompt for the agent */
instruction: string;
/** Whether the child run is a lobe-agent virtual sub-agent and must not spawn more sub-agents */
isSubAgent?: boolean;
/** The parent message ID (Supervisor's tool call message or task message) */
/** The parent message ID that anchors the isolated thread */
parentMessageId: string;
/** Parent operation ID for dispatching callAgent hooks */
parentOperationId?: string;
/**
* When true, register the completion bridge that backfills the parent's
* placeholder tool message with this sub-agent's result and resumes the
* parked parent op (`waiting_for_async_tool` → running). Used by the server
* `callSubAgent` deferred-tool path; left false for the legacy fire-and-forget
* task dispatch.
*/
resumeParentOnComplete?: boolean;
/** Timeout in milliseconds (optional) */
timeout?: number;
/** Task title (shown in UI, used as thread title) */
/** Thread title shown in UI */
title?: string;
/** The Topic ID */
topicId: string;
}
/**
* Parameters for execVirtualSubAgent - execute a `lobe-agent.callSubAgent`
* child run.
*
* Virtual sub-agents are tool-created isolated runs. They are marked with
* `appContext.isSubAgent` so the child cannot recursively spawn more
* sub-agents, and they install the completion bridge that backfills the
* parent's placeholder tool message before resuming the parent operation.
*/
export interface ExecVirtualSubAgentParams {
/** The agent ID to execute */
agentId: string;
/** The Group ID inherited from the parent operation, when present */
groupId?: string;
/** Instruction/prompt for the virtual sub-agent */
instruction: string;
/** The parent placeholder tool message ID */
parentMessageId: string;
/** Parent operation ID to bridge and resume on completion */
parentOperationId: string;
/** Timeout in milliseconds (optional) */
timeout?: number;
/** Thread title shown in UI */
title?: string;
/** The Topic ID */
topicId: string;
@@ -292,15 +310,15 @@ export interface ExecSubAgentParams {
* Result from execSubAgent
*/
export interface ExecSubAgentResult {
/** The assistant message ID created for this task */
/** The assistant message ID created for this run */
assistantMessageId: string;
/** Error message if task failed to start */
/** Error message if execution failed to start */
error?: string;
/** Operation ID for tracking execution status */
operationId: string;
/** Whether the task was created successfully */
/** Whether the execution was created successfully */
success: boolean;
/** The Thread ID where the task is executed */
/** The Thread ID where the execution is isolated */
threadId: string;
}