🐛 fix(server): restore sub-agent forking in QStash step worker (#15609)

* 🐛 fix(server): restore sub-agent forking in QStash step worker

In QStash mode every agent step runs in a fresh HTTP request via the
hono `runStep` handler, which built a bare AgentRuntimeService without
the `execSubAgent` fork callback. As a result `lobe-agent.callSubAgent`
failed with SUB_AGENT_UNAVAILABLE in cloud (the in-process callback
never survives the queue boundary).

Step through AiAgentService.executeStep instead, reusing its internal
runtime that is already wired with the fork callback — no second runtime,
no manual rebinding.

Also rename the internal `execSubAgentTask` → `execSubAgent` (method,
runtime/tool context fields, options, ExecSubAgent{Params,Result} types)
to separate the "task" concept from "sub-agent", and make the method an
auto-bound arrow field so it no longer needs `.bind(this)`. The external
lambda procedure name (`execSubAgentTask`) and the client service are
left unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* ♻️ refactor(server): group runtime upward-calls into an AgentRuntimeDelegate

`execSubAgent` was a loose top-level option on AgentRuntimeService, which
hid that it is not ordinary config but an upward call: the low-level
runtime, mid-step, triggering a high-level pipeline that lives in
AiAgentService (the layer above it).

Introduce `AgentRuntimeDelegate` as the single named home for these
upward-call capabilities, and inject it as `delegate: { execSubAgent }`.
The interface doc states the convention so future "runtime must trigger a
higher-layer pipeline" capabilities land in the same place instead of
sprawling as ad-hoc options.

Scope is deliberately the injection surface (options + service field +
AiAgentService wiring). The downstream executor/tool context keeps its
flat `execSubAgent` field — the tool runner wants the unpacked capability,
not the whole delegate.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-06-10 00:41:01 +08:00
committed by GitHub
parent aa46864df6
commit 4b5e001934
15 changed files with 155 additions and 94 deletions
@@ -60,7 +60,7 @@ import {
import { chainCompressContext } from '@lobechat/prompts';
import {
type ChatToolPayload,
type ExecSubAgentTaskParams,
type ExecSubAgentParams,
type MessageToolCall,
type UIChatMessage,
} from '@lobechat/types';
@@ -286,7 +286,7 @@ const buildPostProcessUrl = (
* isolation thread (so the UI shows a loading state and the completion bridge
* has a message to backfill), then kicks off the child op asynchronously and
* returns immediately. Returns `undefined` when sub-agent execution is not
* available (no `execSubAgentTask` callback, or missing agent/topic context).
* available (no `execSubAgent` callback, or missing agent/topic context).
*/
const buildServerSubAgentRunner = (
ctx: RuntimeExecutorContext,
@@ -294,8 +294,8 @@ const buildServerSubAgentRunner = (
chatToolPayload: ChatToolPayload,
parentMessageId: string,
): ServerSubAgentRunner | undefined => {
const execSubAgentTask = ctx.execSubAgentTask;
if (!execSubAgentTask) return undefined;
const execSubAgent = ctx.execSubAgent;
if (!execSubAgent) return undefined;
const agentId = state.metadata?.agentId;
const topicId = ctx.topicId ?? state.metadata?.topicId;
@@ -319,9 +319,9 @@ const buildServerSubAgentRunner = (
});
// 2. Fork the child op anchored to the placeholder. `resumeParentOnComplete`
// tells execSubAgentTask to register the completion bridge that
// tells execSubAgent to register the completion bridge that
// backfills this tool message and resumes the parent op.
const result = (await execSubAgentTask({
const result = (await execSubAgent({
agentId: targetAgentId ?? agentId,
groupId: state.metadata?.groupId ?? undefined,
instruction,
@@ -480,7 +480,7 @@ export interface RuntimeExecutorContext {
* Injected by AiAgentService so exec_sub_agent / exec_sub_agents executors
* can dispatch callAgent-triggered tasks without a circular import.
*/
execSubAgentTask?: (params: ExecSubAgentTaskParams) => Promise<unknown>;
execSubAgent?: (params: ExecSubAgentParams) => Promise<unknown>;
hookDispatcher?: HookDispatcher;
loadAgentState?: (operationId: string) => Promise<AgentState | null>;
messageModel: MessageModel;
@@ -2402,7 +2402,7 @@ export const createRuntimeExecutors = (
activeDeviceId: state.metadata?.activeDeviceId,
agentId: state.metadata?.agentId,
documentId: state.metadata?.documentId,
execSubAgentTask: ctx.execSubAgentTask,
execSubAgent: ctx.execSubAgent,
executionTimeoutMs: timeoutMs,
groupId: state.metadata?.groupId,
memoryToolPermission: agentConfig?.chatConfig?.memory?.toolPermission,
@@ -2982,7 +2982,7 @@ export const createRuntimeExecutors = (
activeDeviceId: state.metadata?.activeDeviceId,
agentId: state.metadata?.agentId,
documentId: state.metadata?.documentId,
execSubAgentTask: ctx.execSubAgentTask,
execSubAgent: ctx.execSubAgent,
executionTimeoutMs: timeoutMs,
groupId: state.metadata?.groupId,
memoryToolPermission: batchAgentConfig?.chatConfig?.memory?.toolPermission,
@@ -3349,7 +3349,7 @@ export const createRuntimeExecutors = (
* Mirrors the client-side exec_sub_agent executor in createAgentExecutors.ts
* but runs entirely server-side (no polling required). Flow:
* 1. Create a task message (role: 'task') as a placeholder visible in the UI.
* 2. Fire execSubAgentTask via the injected callback so the sub-agent runs as
* 2. Fire execSubAgent via the injected callback so the sub-agent runs as
* an independent QStash operation.
* 3. Return a sub_agent_result context so GeneralChatAgent calls the LLM once
* more and the parent agent can acknowledge the delegation.
@@ -3390,9 +3390,9 @@ export const createRuntimeExecutors = (
const effectiveTaskMessageId = taskMessageId ?? parentMessageId;
let dispatched = false;
if (ctx.execSubAgentTask && topicId && agentId) {
if (ctx.execSubAgent && topicId && agentId) {
try {
await ctx.execSubAgentTask({
await ctx.execSubAgent({
agentId: targetAgentId,
groupId: state.metadata?.groupId ?? undefined,
instruction: task.instruction,
@@ -3417,7 +3417,7 @@ export const createRuntimeExecutors = (
}
}
} else {
log('[%s] execSubAgentTask not available, skipping sub-agent dispatch', taskLogId);
log('[%s] execSubAgent not available, skipping sub-agent dispatch', taskLogId);
}
return {
@@ -3447,7 +3447,7 @@ export const createRuntimeExecutors = (
* Server-side exec_sub_agents executor
*
* Same as exec_sub_agent but for a batch. Each sub-agent is fired
* independently via execSubAgentTask and a task message is created for each.
* independently via execSubAgent and a task message is created for each.
*/
exec_sub_agents: async (instruction, state) => {
const { payload } = instruction as AgentInstructionExecSubAgents;
@@ -3489,9 +3489,9 @@ export const createRuntimeExecutors = (
}
let taskDispatched = false;
if (ctx.execSubAgentTask && topicId && agentId) {
if (ctx.execSubAgent && topicId && agentId) {
try {
await ctx.execSubAgentTask({
await ctx.execSubAgent({
agentId: targetAgentId,
groupId: state.metadata?.groupId ?? undefined,
instruction: task.instruction,
@@ -51,7 +51,7 @@ export const getDefaultReasonDetail = (finalState: any, reason?: string): string
*
* - `messages` — canonical copy lives in the DB (UIChatMessage rows)
* and the runtime in-memory state; in-process consumers that need
* it (e.g. `execSubAgentTask.onComplete`) receive the full state
* it (e.g. `execSubAgent.onComplete`) receive the full state
* via the local `HookContext` channel, not via the stream.
* - `operationToolSet`, `toolManifestMap`, `toolSourceMap`, `tools`
* — operation-level snapshot; back-compat copies of one struct.
@@ -4717,13 +4717,13 @@ describe('RuntimeExecutors', () => {
expect((result.nextContext?.payload as any).stop).toBe(true);
});
it('exec_sub_agent executor creates task message and calls execSubAgentTask callback', async () => {
it('exec_sub_agent executor creates task message and calls execSubAgent callback', async () => {
const mockExecSubAgentTask = vi
.fn()
.mockResolvedValue({ success: true, operationId: 'child-op', threadId: 'thread-child' });
const ctxWithCallback = {
...ctx,
execSubAgentTask: mockExecSubAgentTask,
execSubAgent: mockExecSubAgentTask,
topicId: 'topic-123',
};
@@ -4755,7 +4755,7 @@ describe('RuntimeExecutors', () => {
}),
);
// execSubAgentTask callback fired with targetAgentId
// execSubAgent callback fired with targetAgentId
expect(mockExecSubAgentTask).toHaveBeenCalledWith(
expect.objectContaining({
agentId: 'target-agent-id',
@@ -4769,7 +4769,7 @@ describe('RuntimeExecutors', () => {
expect(result.nextContext?.phase).toBe('sub_agent_result');
});
it('exec_sub_agent gracefully skips dispatch when execSubAgentTask not injected', async () => {
it('exec_sub_agent gracefully skips dispatch when execSubAgent not injected', async () => {
// No callback injected (e.g. in tests that don't set it up)
const executors = createRuntimeExecutors(ctx);
const state = createMockState();
@@ -18,7 +18,7 @@ vi.mock('@/database/core/db-adaptor', () => ({
const mockExecGroupSubAgentTask = vi.fn();
vi.mock('@/server/services/aiAgent', () => ({
AiAgentService: vi.fn().mockImplementation(() => ({
execSubAgentTask: mockExecGroupSubAgentTask,
execSubAgent: mockExecGroupSubAgentTask,
})),
}));
@@ -20,7 +20,7 @@ const mockExecGroupSubAgentTask = vi.fn();
const mockInterruptTask = vi.fn();
vi.mock('@/server/services/aiAgent', () => ({
AiAgentService: vi.fn().mockImplementation(() => ({
execSubAgentTask: mockExecGroupSubAgentTask,
execSubAgent: mockExecGroupSubAgentTask,
interruptTask: mockInterruptTask,
})),
}));
+2 -1
View File
@@ -836,7 +836,8 @@ export const aiAgentRouter = router({
log('execSubAgentTask: agentId=%s, groupId=%s', agentId, groupId);
try {
return await ctx.aiAgentService.execSubAgentTask({
// External procedure name stays `execSubAgentTask`; the service method is `execSubAgent`.
return await ctx.aiAgentService.execSubAgent({
agentId,
groupId,
instruction,
@@ -25,11 +25,7 @@ import {
invokeAgentSpanName,
tracer as agentRuntimeTracer,
} from '@lobechat/observability-otel/modules/agent-runtime';
import {
type ChatToolPayload,
type ExecSubAgentTaskParams,
type UIChatMessage,
} from '@lobechat/types';
import { type ChatToolPayload, type ExecSubAgentParams, type UIChatMessage } from '@lobechat/types';
import debug from 'debug';
import urlJoin from 'url-join';
@@ -93,6 +89,32 @@ const toAgentSignalSnapshotEvents = (
});
};
/**
* Operations the runtime delegates UP to its owning layer (AiAgentService).
*
* The dependency arrow is one-way: AiAgentService AgentRuntimeService. The
* runtime is the low-level step executor it cannot resolve agent configs,
* build tool engines, manage threads, or run the full `execAgent` pipeline;
* those live in the layer above it. Yet some tools (e.g. `lobe-agent.callSubAgent`)
* need exactly such a high-level action *mid-step*. Rather than import
* AiAgentService (which would be a circular dependency), the runtime delegates
* these operations back to its owner through callbacks injected here.
*
* Convention: every future "the runtime, mid-execution, must trigger a
* higher-layer pipeline" capability belongs on this delegate not as a loose
* top-level option. One named home for the whole upward-call surface.
*/
export interface AgentRuntimeDelegate {
/**
* Fork a sub-agent through the full high-level pipeline
* (AiAgentService.execSubAgent execAgent: agent-config resolution, tool
* engine, context engineering, createOperation). Returns a deferred result;
* the parent op parks (`waiting_for_async_tool`) until the completion bridge
* backfills the placeholder and resumes it.
*/
execSubAgent?: (params: ExecSubAgentParams) => Promise<unknown>;
}
export interface AgentRuntimeServiceOptions {
/**
* Custom agent factory. When provided, this function is called instead of
@@ -107,11 +129,12 @@ export interface AgentRuntimeServiceOptions {
*/
coordinatorOptions?: AgentRuntimeCoordinatorOptions;
/**
* Callback to spawn a sub-agent task from within a running server-side agent.
* Injected by AiAgentService to wire up the exec_task / exec_tasks executors
* without creating a circular import between RuntimeExecutors and AiAgentService.
* Operations the runtime delegates up to its owning layer. See
* {@link AgentRuntimeDelegate}. Injected by AiAgentService so the runtime can
* trigger high-level pipelines (e.g. sub-agent forking) mid-step without a
* circular import.
*/
execSubAgentTask?: (params: ExecSubAgentTaskParams) => Promise<unknown>;
delegate?: AgentRuntimeDelegate;
/**
* Custom QueueService
* Set to null to disable queue scheduling (for synchronous execution tests)
@@ -156,7 +179,7 @@ export class AgentRuntimeService {
private agentFactory?: (config: GeneralAgentConfig) => Agent;
private completionLifecycle: CompletionLifecycle;
private coordinator: AgentRuntimeCoordinator;
private execSubAgentTaskCallback?: (params: ExecSubAgentTaskParams) => Promise<unknown>;
private delegate: AgentRuntimeDelegate;
private humanIntervention: HumanInterventionHandler;
private streamManager: IStreamEventManager;
private queueService: QueueService | null;
@@ -207,7 +230,7 @@ export class AgentRuntimeService {
options?.snapshotStore ?? this.createDefaultSnapshotStore(),
);
this.agentFactory = options?.agentFactory;
this.execSubAgentTaskCallback = options?.execSubAgentTask;
this.delegate = options?.delegate ?? {};
this.serverDB = db;
this.userId = userId;
this.workspaceId = options?.workspaceId;
@@ -1661,7 +1684,7 @@ export class AgentRuntimeService {
discordContext: metadata?.discordContext,
userTimezone: metadata?.userTimezone,
evalContext: metadata?.evalContext,
execSubAgentTask: this.execSubAgentTaskCallback,
execSubAgent: this.delegate.execSubAgent,
hookDispatcher,
loadAgentState: this.coordinator.loadAgentState.bind(this.coordinator),
messageModel: this.messageModel,
@@ -213,7 +213,7 @@ export interface OperationCreationParams {
operationSkillSet?: OperationSkillSet;
/**
* Operation ID of the parent run when this operation is a sub-agent
* invocation (e.g. spawned via `execSubAgentTask`). Persisted to
* invocation (e.g. spawned via `execSubAgent`). Persisted to
* `agent_operations.parent_operation_id` so analytics can join the
* sub-tree back to its root.
*/
@@ -92,7 +92,7 @@ vi.mock('@/server/modules/ModelRuntime', () => ({
initModelRuntimeFromDB: vi.fn(),
}));
describe('AiAgentService.execSubAgentTask', () => {
describe('AiAgentService.execSubAgent', () => {
let service: AiAgentService;
const mockDb = {} as any;
const userId = 'test-user-id';
@@ -133,7 +133,7 @@ describe('AiAgentService.execSubAgentTask', () => {
userMessageId: 'user-msg-1',
});
await service.execSubAgentTask({
await service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -166,7 +166,7 @@ describe('AiAgentService.execSubAgentTask', () => {
userMessageId: 'user-msg-1',
});
await service.execSubAgentTask({
await service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -196,7 +196,7 @@ describe('AiAgentService.execSubAgentTask', () => {
userMessageId: 'user-msg-1',
});
await service.execSubAgentTask({
await service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -239,7 +239,7 @@ describe('AiAgentService.execSubAgentTask', () => {
userMessageId: 'user-msg-1',
});
await service.execSubAgentTask({
await service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -269,7 +269,7 @@ describe('AiAgentService.execSubAgentTask', () => {
userMessageId: 'user-msg-1',
});
const result = await service.execSubAgentTask({
const result = await service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -304,7 +304,7 @@ describe('AiAgentService.execSubAgentTask', () => {
userMessageId: 'user-msg-1',
});
await service.execSubAgentTask({
await service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -341,7 +341,7 @@ describe('AiAgentService.execSubAgentTask', () => {
userMessageId: 'user-msg-1',
});
await service.execSubAgentTask({
await service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -379,7 +379,7 @@ describe('AiAgentService.execSubAgentTask', () => {
userMessageId: 'user-msg-1',
});
const result = await service.execSubAgentTask({
const result = await service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -402,7 +402,7 @@ describe('AiAgentService.execSubAgentTask', () => {
mockThreadModel.create.mockResolvedValue(null);
await expect(
service.execSubAgentTask({
service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -416,7 +416,7 @@ describe('AiAgentService.execSubAgentTask', () => {
mockThreadModel.create.mockRejectedValue(new Error('Database connection failed'));
await expect(
service.execSubAgentTask({
service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
@@ -444,7 +444,7 @@ describe('AiAgentService.execSubAgentTask', () => {
userMessageId: 'user-msg-1',
});
await service.execSubAgentTask({
await service.execSubAgent({
agentId: 'agent-1',
groupId: 'group-1',
instruction: 'Test instruction',
+44 -20
View File
@@ -34,8 +34,8 @@ import type {
ExecAgentResult,
ExecGroupAgentParams,
ExecGroupAgentResult,
ExecSubAgentTaskParams,
ExecSubAgentTaskResult,
ExecSubAgentParams,
ExecSubAgentResult,
LobeAgentAgencyConfig,
MessagePluginItem,
UserInterventionConfig,
@@ -71,7 +71,11 @@ import { createServerAgentToolsEngine } from '@/server/modules/Mecha';
import type { ServerUserMemoryConfig } from '@/server/modules/Mecha/ContextEngineering/types';
import { AgentService } from '@/server/services/agent';
import { AgentDocumentsService } from '@/server/services/agentDocuments';
import type { AgentRuntimeServiceOptions } from '@/server/services/agentRuntime';
import type {
AgentExecutionParams,
AgentExecutionResult,
AgentRuntimeServiceOptions,
} from '@/server/services/agentRuntime';
import { AgentRuntimeService } from '@/server/services/agentRuntime';
import { getAbortError, isAbortError, throwIfAborted } from '@/server/services/agentRuntime/abort';
import { hookDispatcher } from '@/server/services/agentRuntime/hooks';
@@ -298,7 +302,17 @@ export class AiAgentService {
this.topicModel = new TopicModel(db, userId, wsId);
this.agentRuntimeService = new AgentRuntimeService(db, userId, {
...options?.runtimeOptions,
execSubAgentTask: this.execSubAgentTask.bind(this),
// ── Runtime delegate ─────────────────────────────────────────────────
// Operations the runtime delegates back UP to this layer. The dependency
// arrow is one-way (AiAgentService → AgentRuntimeService), so the runtime
// can't import us; instead we hand it the callbacks it needs to trigger
// high-level pipelines mid-step. See AgentRuntimeDelegate. New high-level
// capabilities the runtime calls into go in this `delegate` object.
//
// `execSubAgent` is an auto-bound arrow field, so no `.bind(this)`.
delegate: {
execSubAgent: this.execSubAgent,
},
workspaceId: wsId,
});
this.marketService = new MarketService({ userInfo: { userId } });
@@ -388,6 +402,18 @@ export class AiAgentService {
}
}
/**
* Execute a single agent step against this service's runtime.
*
* Delegates to the internal AgentRuntimeService, which is already wired with
* the `execSubAgent` fork callback. The QStash step worker drives stepping
* through here so `lobe-agent.callSubAgent` can fork sub-agents building a
* bare runtime there would lose the callback and fail with SUB_AGENT_UNAVAILABLE.
*/
executeStep(params: AgentExecutionParams): Promise<AgentExecutionResult> {
return this.agentRuntimeService.executeStep(params);
}
/**
* Execute agent with just a prompt
*
@@ -2721,7 +2747,9 @@ export class AiAgentService {
* 2. Delegate to execAgent with threadId in appContext
* 3. Store operationId in Thread metadata
*/
async execSubAgentTask(params: ExecSubAgentTaskParams): Promise<ExecSubAgentTaskResult> {
// Arrow field (not a method) so it stays bound to this instance when handed to
// AgentRuntimeService as the `execSubAgent` fork callback — no `.bind(this)`.
execSubAgent = async (params: ExecSubAgentParams): Promise<ExecSubAgentResult> => {
const {
groupId,
topicId,
@@ -2734,7 +2762,7 @@ export class AiAgentService {
} = params;
log(
'execSubAgentTask: agentId=%s, groupId=%s, topicId=%s, instruction=%s',
'execSubAgent: agentId=%s, groupId=%s, topicId=%s, instruction=%s',
agentId,
groupId,
topicId,
@@ -2767,7 +2795,7 @@ export class AiAgentService {
throw new Error('Failed to create thread for task execution');
}
log('execSubAgentTask: created thread %s', thread.id);
log('execSubAgent: created thread %s', thread.id);
// 2. Update Thread status to processing with startedAt timestamp
const startedAt = new Date().toISOString();
@@ -2803,7 +2831,7 @@ export class AiAgentService {
).findById(parentOperationId);
inheritedTrigger = parentOp?.trigger ?? undefined;
} catch (error) {
log('execSubAgentTask: failed to read parent operation trigger: %O', error);
log('execSubAgent: failed to read parent operation trigger: %O', error);
}
}
@@ -2822,7 +2850,7 @@ export class AiAgentService {
});
log(
'execSubAgentTask: delegated to execAgent, operationId=%s, success=%s',
'execSubAgent: delegated to execAgent, operationId=%s, success=%s',
result.operationId,
result.success,
);
@@ -2878,7 +2906,7 @@ export class AiAgentService {
success: result.success ?? false,
threadId: thread.id,
};
}
};
/**
* Create step lifecycle callbacks for updating Thread metadata
@@ -2917,13 +2945,9 @@ export class AiAgentService {
totalToolCalls: accumulatedToolCalls,
},
});
log(
'execSubAgentTask: updated thread %s metadata after step %d',
threadId,
state.stepCount,
);
log('execSubAgent: updated thread %s metadata after step %d', threadId, state.stepCount);
} catch (error) {
log('execSubAgentTask: failed to update thread metadata: %O', error);
log('execSubAgent: failed to update thread metadata: %O', error);
}
},
@@ -2957,7 +2981,7 @@ export class AiAgentService {
// Log error when task fails
if (reason === 'error' && finalState.error) {
console.error('execSubAgentTask: task failed for thread %s:', threadId, finalState.error);
console.error('execSubAgent: task failed for thread %s:', threadId, finalState.error);
}
try {
@@ -2971,7 +2995,7 @@ export class AiAgentService {
await this.messageModel.update(sourceMessageId, {
content: lastAssistantMessage.content,
});
log('execSubAgentTask: updated task message %s with summary', sourceMessageId);
log('execSubAgent: updated task message %s with summary', sourceMessageId);
}
// Format error for proper serialization (Error objects don't serialize with JSON.stringify)
@@ -2994,13 +3018,13 @@ export class AiAgentService {
});
log(
'execSubAgentTask: thread %s completed with status %s, reason: %s',
'execSubAgent: thread %s completed with status %s, reason: %s',
threadId,
status,
reason,
);
} catch (error) {
console.error('execSubAgentTask: failed to update thread on completion: %O', error);
console.error('execSubAgent: failed to update thread on completion: %O', error);
}
},
};
@@ -3,7 +3,7 @@ import { type LobeChatDatabase } from '@lobechat/database';
import {
type ChatToolPayload,
type ClientSecretPayload,
type ExecSubAgentTaskParams,
type ExecSubAgentParams,
} from '@lobechat/types';
export interface ToolExecutionMemoryEmbeddingRuntime {
@@ -62,10 +62,10 @@ export interface ToolExecutionContext {
documentId?: string | null;
/**
* Spawn a sub-agent as an independent async operation. Injected by the agent
* runtime (forwarded from `RuntimeExecutorContext.execSubAgentTask`) so the
* runtime (forwarded from `RuntimeExecutorContext.execSubAgent`) so the
* `callSubAgent` server tool can fork a child op without a circular import.
*/
execSubAgentTask?: (params: ExecSubAgentTaskParams) => Promise<unknown>;
execSubAgent?: (params: ExecSubAgentParams) => Promise<unknown>;
/** Per-call execution timeout resolved by the agent runtime. */
executionTimeoutMs?: number;
/** Current group ID for group chat context */
+8 -8
View File
@@ -248,13 +248,13 @@ export interface ExecGroupAgentResponse {
// ============ SubAgent Task Execution Types ============
/**
* Parameters for execSubAgentTask - execute SubAgent task
* Parameters for execSubAgent - execute SubAgent task
* Supports both Group mode and Single Agent mode
*
* - Group mode: pass groupId, Thread will be associated with the Group
* - Single Agent mode: omit groupId, Thread will only be associated with the Agent
*/
export interface ExecSubAgentTaskParams {
export interface ExecSubAgentParams {
/** The SubAgent ID to execute the task */
agentId: string;
/** The Group ID (optional, only for Group mode) */
@@ -282,9 +282,9 @@ export interface ExecSubAgentTaskParams {
}
/**
* Result from execSubAgentTask
* Result from execSubAgent
*/
export interface ExecSubAgentTaskResult {
export interface ExecSubAgentResult {
/** The assistant message ID created for this task */
assistantMessageId: string;
/** Error message if task failed to start */
@@ -298,14 +298,14 @@ export interface ExecSubAgentTaskResult {
}
/**
* @deprecated Use ExecSubAgentTaskParams instead
* @deprecated Use ExecSubAgentParams instead
*/
export type ExecGroupSubAgentTaskParams = ExecSubAgentTaskParams;
export type ExecGroupSubAgentTaskParams = ExecSubAgentParams;
/**
* @deprecated Use ExecSubAgentTaskResult instead
* @deprecated Use ExecSubAgentResult instead
*/
export type ExecGroupSubAgentTaskResult = ExecSubAgentTaskResult;
export type ExecGroupSubAgentTaskResult = ExecSubAgentResult;
/**
* Current activity for real-time progress display
+2 -2
View File
@@ -140,8 +140,8 @@ export type {
ExecGroupAgentResult,
ExecGroupSubAgentTaskParams,
ExecGroupSubAgentTaskResult,
ExecSubAgentTaskParams,
ExecSubAgentTaskResult,
ExecSubAgentParams,
ExecSubAgentResult,
TaskCurrentActivity,
TaskStatusResult,
} from '../agentExecution';
@@ -1,7 +1,7 @@
// @vitest-environment node
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { AgentRuntimeService } from '@/server/services/agentRuntime';
import { AiAgentService } from '@/server/services/aiAgent';
import { runStep, runStepHealth } from '../runStep';
@@ -14,8 +14,8 @@ vi.mock('@/server/modules/AgentRuntime', () => ({
})),
}));
vi.mock('@/server/services/agentRuntime', () => ({
AgentRuntimeService: vi.fn().mockImplementation(() => ({
vi.mock('@/server/services/aiAgent', () => ({
AiAgentService: vi.fn().mockImplementation(() => ({
executeStep: mockExecuteStep,
})),
}));
@@ -86,11 +86,16 @@ describe('runStep handler', () => {
expect(mockExecuteStep).not.toHaveBeenCalled();
});
it('constructs AgentRuntimeService with the workspaceId from operation metadata', async () => {
// Regression: a workspace-scoped binding (e.g. Discord bot active agent) runs
// its steps through this QStash worker. Dropping workspaceId here makes the
// runtime personal-scoped, so the parent-message lookup misses the
// workspace-scoped row and throws ConversationParentMissing.
it('steps through AiAgentService scoped to the operation workspace', async () => {
// Regression (two invariants in one path):
// 1. workspaceId — a workspace-scoped binding (e.g. Discord bot active agent)
// runs its steps through this QStash worker. Dropping it makes the runtime
// personal-scoped, so the parent-message lookup misses the workspace-scoped
// row → ConversationParentMissing.
// 2. sub-agent forking — stepping MUST go through AiAgentService (not a bare
// AgentRuntimeService), because only AiAgentService's runtime carries the
// in-process `execSubAgent` fork callback. A bare runtime here makes
// `lobe-agent.callSubAgent` fail with SUB_AGENT_UNAVAILABLE.
mockGetOperationMetadata.mockResolvedValue({ userId: 'user-1', workspaceId: 'ws-1' });
mockExecuteStep.mockResolvedValue({
nextStepScheduled: false,
@@ -101,11 +106,14 @@ describe('runStep handler', () => {
const { ctx } = buildContext({ body: validBody });
await runStep(ctx);
expect(AgentRuntimeService).toHaveBeenCalledWith(
expect(AiAgentService).toHaveBeenCalledWith(
expect.anything(),
'user-1',
expect.objectContaining({ workspaceId: 'ws-1' }),
);
expect(mockExecuteStep).toHaveBeenCalledWith(
expect.objectContaining({ operationId: 'op-1', stepIndex: 2 }),
);
});
it('returns 429 with Retry-After header when the step is locked', async () => {
+8 -3
View File
@@ -3,7 +3,7 @@ import type { Context } from 'hono';
import { getServerDB } from '@/database/core/db-adaptor';
import { AgentRuntimeCoordinator } from '@/server/modules/AgentRuntime';
import { AgentRuntimeService } from '@/server/services/agentRuntime';
import { AiAgentService } from '@/server/services/aiAgent';
const log = debug('lobe-server:agent:run-step');
@@ -59,14 +59,19 @@ export async function runStep(c: Context): Promise<Response> {
}
const serverDB = await getServerDB();
// Step through AiAgentService so the runtime keeps its `execSubAgent`
// fork callback (needed by `lobe-agent.callSubAgent`). In QStash mode every
// step is a fresh HTTP request, and a bare AgentRuntimeService would lose the
// in-process callback → SUB_AGENT_UNAVAILABLE.
//
// Thread the operation's workspace through so the runtime's models stay
// workspace-scoped. Without it the worker is personal-scoped and the
// parent-message lookup misses workspace-scoped rows → ConversationParentMissing.
const agentRuntimeService = new AgentRuntimeService(serverDB, metadata.userId, {
const aiAgentService = new AiAgentService(serverDB, metadata.userId, {
workspaceId: metadata.workspaceId,
});
const result = await agentRuntimeService.executeStep({
const result = await aiAgentService.executeStep({
approvedToolCall,
context,
externalRetryCount,