♻️ refactor(agent-runtime): clarify virtual sub-agent naming (#15737)

This commit is contained in:
Arvin Xu
2026-06-13 11:10:14 +08:00
committed by GitHub
parent ab958a0b98
commit c7e0c83174
6 changed files with 59 additions and 48 deletions
@@ -324,7 +324,7 @@ const buildPostProcessUrl = (
}; };
/** /**
* Build the per-tool-call server sub-agent runner injected into the tool * Build the per-tool-call server virtual sub-agent runner injected into the tool
* execution context. Closes over the current tool payload + parent message so * execution context. Closes over the current tool payload + parent message so
* the `callSubAgent` server tool can fork a child op without re-deriving the * the `callSubAgent` server tool can fork a child op without re-deriving the
* message anchor (which it cannot do correctly from its own context). * message anchor (which it cannot do correctly from its own context).
@@ -336,7 +336,7 @@ const buildPostProcessUrl = (
* not available (no `execVirtualSubAgent` callback, or missing agent/topic * not available (no `execVirtualSubAgent` callback, or missing agent/topic
* context). * context).
*/ */
const buildServerSubAgentRunner = ( const buildServerVirtualSubAgentRunner = (
ctx: RuntimeExecutorContext, ctx: RuntimeExecutorContext,
state: AgentState, state: AgentState,
chatToolPayload: ChatToolPayload, chatToolPayload: ChatToolPayload,
@@ -388,7 +388,7 @@ const buildServerSubAgentRunner = (
await ctx.messageModel.deleteMessage(placeholder.id); await ctx.messageModel.deleteMessage(placeholder.id);
} catch (error) { } catch (error) {
log( log(
'buildServerSubAgentRunner: failed to clean up placeholder %s: %O', 'buildServerVirtualSubAgentRunner: failed to clean up placeholder %s: %O',
placeholder.id, placeholder.id,
error, error,
); );
@@ -2483,7 +2483,7 @@ export const createRuntimeExecutors = (
scope: state.metadata?.scope, scope: state.metadata?.scope,
serverDB: ctx.serverDB, serverDB: ctx.serverDB,
skipResultTruncation: true, skipResultTruncation: true,
subAgent: buildServerSubAgentRunner( subAgent: buildServerVirtualSubAgentRunner(
ctx, ctx,
state, state,
chatToolPayload, chatToolPayload,
@@ -2725,14 +2725,15 @@ export const createRuntimeExecutors = (
log('[%s:%d] Tool execution completed', operationId, stepIndex); log('[%s:%d] Tool execution completed', operationId, stepIndex);
// When the tool result carries an execSubAgent / execSubAgents state the // When a legacy callAgent task result carries execSubAgent / execSubAgents
// GeneralChatAgent needs `stop: true` in the payload to detect it and // state, the GeneralChatAgent needs `stop: true` in the payload to detect
// emit the matching exec_sub_agent / exec_sub_agents instruction. Without // it and emit the matching exec_sub_agent / exec_sub_agents instruction.
// this flag the agent falls through to the normal LLM-call path and the // Without this flag the agent falls through to the normal LLM-call path
// sub-agent is never spawned. // and the background agent run is never spawned.
const execTaskStateType = executionResult.state?.type as string | undefined; const legacyAgentInvocationStateType = executionResult.state?.type as string | undefined;
const isExecTaskState = const isLegacyAgentInvocationState =
execTaskStateType === 'execSubAgent' || execTaskStateType === 'execSubAgents'; legacyAgentInvocationStateType === 'execSubAgent' ||
legacyAgentInvocationStateType === 'execSubAgents';
executeToolSpan.setAttributes( executeToolSpan.setAttributes(
buildExecuteToolResultAttributes({ attempts: execution.attempts, success: isSuccess }), buildExecuteToolResultAttributes({ attempts: execution.attempts, success: isSuccess }),
@@ -2748,7 +2749,7 @@ export const createRuntimeExecutors = (
isSuccess, isSuccess,
// Pass tool message ID as parentMessageId for the next LLM call // Pass tool message ID as parentMessageId for the next LLM call
parentMessageId: toolMessageId, parentMessageId: toolMessageId,
...(isExecTaskState && { stop: true }), ...(isLegacyAgentInvocationState && { stop: true }),
toolCall: chatToolPayload, toolCall: chatToolPayload,
toolCallId: chatToolPayload.id, toolCallId: chatToolPayload.id,
}, },
@@ -3055,7 +3056,7 @@ export const createRuntimeExecutors = (
scope: state.metadata?.scope, scope: state.metadata?.scope,
serverDB: ctx.serverDB, serverDB: ctx.serverDB,
skipResultTruncation: true, skipResultTruncation: true,
subAgent: buildServerSubAgentRunner( subAgent: buildServerVirtualSubAgentRunner(
ctx, ctx,
state, state,
chatToolPayload, chatToolPayload,
+27 -16
View File
@@ -417,9 +417,10 @@ export class AiAgentService {
* Execute a single agent step against this service's runtime. * Execute a single agent step against this service's runtime.
* *
* Delegates to the internal AgentRuntimeService, which is already wired with * Delegates to the internal AgentRuntimeService, which is already wired with
* the `execSubAgent` fork callback. The QStash step worker drives stepping * the agent-invocation fork callbacks. The QStash step worker drives stepping
* through here so `lobe-agent.callSubAgent` can fork sub-agents — building a * through here so `lobe-agent.callSubAgent` can fork virtual sub-agents —
* bare runtime there would lose the callback and fail with SUB_AGENT_UNAVAILABLE. * building a bare runtime there would lose the callback and fail with
* SUB_AGENT_UNAVAILABLE.
*/ */
executeStep(params: AgentExecutionParams): Promise<AgentExecutionResult> { executeStep(params: AgentExecutionParams): Promise<AgentExecutionResult> {
return this.agentRuntimeService.executeStep(params); return this.agentRuntimeService.executeStep(params);
@@ -2298,7 +2299,7 @@ export class AiAgentService {
: undefined; : undefined;
// 13. Create user message in database // 13. Create user message in database
// Include threadId if provided (for SubAgent task execution in isolated Thread) // Include threadId if provided (for isolated agent execution)
const userMessageRecord = runFromHistory const userMessageRecord = runFromHistory
? undefined ? undefined
: await this.messageModel.create({ : await this.messageModel.create({
@@ -2346,7 +2347,7 @@ export class AiAgentService {
} }
// 14. Create assistant message placeholder in database // 14. Create assistant message placeholder in database
// Include threadId if provided (for SubAgent task execution in isolated Thread) // Include threadId if provided (for isolated agent execution)
const assistantMessageRecord = await this.messageModel.create({ const assistantMessageRecord = await this.messageModel.create({
agentId: persistAgentId, agentId: persistAgentId,
content: LOADING_FLAT, content: LOADING_FLAT,
@@ -2940,7 +2941,12 @@ export class AiAgentService {
}); });
// 3. Create hooks for updating Thread metadata and source message // 3. Create hooks for updating Thread metadata and source message
const threadHooks = this.createThreadHooks(thread.id, startedAt, parentMessageId); const threadHooks = this.createThreadHooks(
thread.id,
startedAt,
parentMessageId,
options.logScope,
);
// For the virtual sub-agent path, also register the completion bridge that // For the virtual sub-agent path, also register the completion bridge that
// backfills the parent's placeholder tool message and resumes the parked // backfills the parent's placeholder tool message and resumes the parked
// parent op once the child run is done. Registered last so its tool-message // parent op once the child run is done. Registered last so its tool-message
@@ -3063,6 +3069,7 @@ export class AiAgentService {
threadId: string, threadId: string,
startedAt: string, startedAt: string,
sourceMessageId: string, sourceMessageId: string,
logScope: 'execSubAgent' | 'execVirtualSubAgent' = 'execSubAgent',
): StepLifecycleCallbacks { ): StepLifecycleCallbacks {
// Accumulator for tracking metrics across steps // Accumulator for tracking metrics across steps
let accumulatedToolCalls = 0; let accumulatedToolCalls = 0;
@@ -3088,9 +3095,9 @@ export class AiAgentService {
totalToolCalls: accumulatedToolCalls, totalToolCalls: accumulatedToolCalls,
}, },
}); });
log('execSubAgent: updated thread %s metadata after step %d', threadId, state.stepCount); log('%s: updated thread %s metadata after step %d', logScope, threadId, state.stepCount);
} catch (error) { } catch (error) {
log('execSubAgent: failed to update thread metadata: %O', error); log('%s: failed to update thread metadata: %O', logScope, error);
} }
}, },
@@ -3124,7 +3131,7 @@ export class AiAgentService {
// Log error when the isolated run fails // Log error when the isolated run fails
if (reason === 'error' && finalState.error) { if (reason === 'error' && finalState.error) {
console.error('execSubAgent: run failed for thread %s:', threadId, finalState.error); console.error('%s: run failed for thread %s:', logScope, threadId, finalState.error);
} }
try { try {
@@ -3138,7 +3145,7 @@ export class AiAgentService {
await this.messageModel.update(sourceMessageId, { await this.messageModel.update(sourceMessageId, {
content: lastAssistantMessage.content, content: lastAssistantMessage.content,
}); });
log('execSubAgent: updated source message %s with summary', sourceMessageId); log('%s: updated source message %s with summary', logScope, sourceMessageId);
} }
// Format error for proper serialization (Error objects don't serialize with JSON.stringify) // Format error for proper serialization (Error objects don't serialize with JSON.stringify)
@@ -3161,13 +3168,14 @@ export class AiAgentService {
}); });
log( log(
'execSubAgent: thread %s completed with status %s, reason: %s', '%s: thread %s completed with status %s, reason: %s',
logScope,
threadId, threadId,
status, status,
reason, reason,
); );
} catch (error) { } catch (error) {
console.error('execSubAgent: failed to update thread on completion: %O', error); console.error('%s: failed to update thread on completion: %O', logScope, error);
} }
}, },
}; };
@@ -3181,6 +3189,7 @@ export class AiAgentService {
threadId: string, threadId: string,
startedAt: string, startedAt: string,
sourceMessageId: string, sourceMessageId: string,
logScope: 'execSubAgent' | 'execVirtualSubAgent',
): AgentHook[] { ): AgentHook[] {
let accumulatedToolCalls = 0; let accumulatedToolCalls = 0;
@@ -3207,7 +3216,7 @@ export class AiAgentService {
}, },
}); });
} catch (error) { } catch (error) {
log('Thread hook afterStep: failed to update metadata: %O', error); log('%s: thread hook afterStep failed to update metadata: %O', logScope, error);
} }
}, },
id: 'thread-metadata-update', id: 'thread-metadata-update',
@@ -3247,7 +3256,8 @@ export class AiAgentService {
if (event.reason === 'error' && finalState.error) { if (event.reason === 'error' && finalState.error) {
console.error( console.error(
'Thread hook onComplete: run failed for thread %s:', '%s: thread hook onComplete run failed for thread %s:',
logScope,
threadId, threadId,
finalState.error, finalState.error,
); );
@@ -3284,13 +3294,14 @@ export class AiAgentService {
}); });
log( log(
'Thread hook onComplete: thread %s status=%s reason=%s', '%s: thread hook onComplete thread %s status=%s reason=%s',
logScope,
threadId, threadId,
status, status,
event.reason, event.reason,
); );
} catch (error) { } catch (error) {
console.error('Thread hook onComplete: failed to update: %O', error); console.error('%s: thread hook onComplete failed to update: %O', logScope, error);
} }
}, },
id: 'thread-completion', id: 'thread-completion',
@@ -43,9 +43,9 @@ export const agentManagementRuntime: ServerRuntimeRegistration = {
): Promise<ToolExecutionResult> => { ): Promise<ToolExecutionResult> => {
const { agentId, instruction, taskTitle, timeout } = params; const { agentId, instruction, taskTitle, timeout } = params;
// Server runtime always uses the task path because there is no // Server runtime always uses the legacy async invocation path because
// client-side `registerAfterCompletion` callback available to execute // there is no client-side `registerAfterCompletion` callback available
// synchronous agent calls. // to execute synchronous agent calls.
return { return {
content: `🚀 Triggered async task to call agent "${agentId}"${taskTitle ? `: ${taskTitle}` : ''}`, content: `🚀 Triggered async task to call agent "${agentId}"${taskTitle ? `: ${taskTitle}` : ''}`,
state: { state: {
@@ -570,13 +570,13 @@ export class GeneralChatAgent implements Agent {
const { data, parentMessageId, stop } = const { data, parentMessageId, stop } =
context.payload as GeneralAgentCallToolResultPayload; context.payload as GeneralAgentCallToolResultPayload;
// Check if this is a sub-agent dispatch request (lobe-agent.callSubAgent // Legacy async agent invocation path. `callAgent({ runAsTask: true })`
// and similarly-shaped tools emit state.type=execSubAgent* with stop=true // emits state.type=execSubAgent* with stop=true so the runtime can fork
// so the runtime forks a sub-agent here). // a background agent run after the tool call is persisted.
if (stop && data?.state) { if (stop && data?.state) {
const stateType = data.state.type; const stateType = data.state.type;
// Server-side sub-agent (single) // Server-side legacy agent invocation (single)
if (stateType === 'execSubAgent') { if (stateType === 'execSubAgent') {
const { parentMessageId: execParentId, task } = data.state as { const { parentMessageId: execParentId, task } = data.state as {
parentMessageId: string; parentMessageId: string;
@@ -588,7 +588,7 @@ export class GeneralChatAgent implements Agent {
}; };
} }
// Server-side sub-agents (multiple) // Server-side legacy agent invocations (multiple)
if (stateType === 'execSubAgents') { if (stateType === 'execSubAgents') {
const { parentMessageId: execParentId, tasks } = data.state as { const { parentMessageId: execParentId, tasks } = data.state as {
parentMessageId: string; parentMessageId: string;
@@ -119,7 +119,7 @@ class AgentManagementExecutor extends BaseExecutor<typeof AgentManagementApiName
} = params; } = params;
if (runAsTask) { if (runAsTask) {
// Dispatch as a sub-agent using the lobe-agent exec_sub_agent pattern // Dispatch as a legacy async agent invocation.
// Pre-load target agent config to ensure it exists // Pre-load target agent config to ensure it exists
const targetAgentExists = useAgentStore.getState().agentMap[agentId]; const targetAgentExists = useAgentStore.getState().agentMap[agentId];
if (!targetAgentExists) { if (!targetAgentExists) {
@@ -141,8 +141,8 @@ class AgentManagementExecutor extends BaseExecutor<typeof AgentManagementApiName
} }
} }
// Return special state that will be recognized by AgentRuntime's exec_sub_agent executor // Return special state recognized by AgentRuntime's legacy exec_sub_agent executor.
// Follows the lobe-agent callSubAgent pattern: stop: true + state.type = 'execSubAgent' // callAgent keeps this alias until it is redesigned as an explicit agent invocation.
return { return {
content: `🚀 Triggered async task to call agent "${agentId}"${taskTitle ? `: ${taskTitle}` : ''}`, content: `🚀 Triggered async task to call agent "${agentId}"${taskTitle ? `: ${taskTitle}` : ''}`,
state: { state: {
@@ -153,7 +153,7 @@ class AgentManagementExecutor extends BaseExecutor<typeof AgentManagementApiName
targetAgentId: agentId, // Special field for callAgent - indicates target agent targetAgentId: agentId, // Special field for callAgent - indicates target agent
timeout: timeout || 1_800_000, timeout: timeout || 1_800_000,
}, },
type: 'execSubAgent', // Same wire-level type as lobe-agent so the runtime reuses its executor type: 'execSubAgent',
}, },
stop: true, stop: true,
success: true, success: true,
@@ -1041,17 +1041,16 @@ export const createAgentExecutors = (context: {
const stateType = result.state?.type; const stateType = result.state?.type;
// Sub-agent dispatches need to be forwarded to the Agent runtime as an // Legacy agent-invocation dispatches need to be forwarded to the Agent
// exec_sub_agent / exec_sub_agents instruction. Covers both server-side // runtime as exec_sub_agent / exec_sub_agents instructions. This covers
// (execSubAgent / execSubAgents) and client-side (execClientSubAgent / // server-side callAgent task states plus the desktop client-side variants.
// execClientSubAgents) wire-level state types. const legacyAgentInvocationStateTypes = [
const subAgentStateTypes = [
'execSubAgent', 'execSubAgent',
'execSubAgents', 'execSubAgents',
'execClientSubAgent', 'execClientSubAgent',
'execClientSubAgents', 'execClientSubAgents',
]; ];
if (subAgentStateTypes.includes(stateType)) { if (legacyAgentInvocationStateTypes.includes(stateType)) {
log( log(
'[%s][call_tool] Detected %s state, passing to Agent for decision', '[%s][call_tool] Detected %s state, passing to Agent for decision',
sessionLogId, sessionLogId,