From 3dfb28cc45d51d06f11cefe201dfd48878c73552 Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Tue, 27 Jan 2026 12:58:19 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat:=20group=20support=20client=20?= =?UTF-8?q?agent=20task=20(#11875)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * support group sub-task * fix optimisticCreateTmpMessage issue * ✨ feat: add createClientGroupAgentTaskThread router for group chat Add dedicated router for Group Chat sub-agent task execution that: - Uses subAgentId instead of agentId for worker agent identification - Does not filter thread messages by agentId (allows messages from different agents) - Queries main messages by groupId + topicId only (not agentId) This fixes the issue where thread messages query was filtering out parent messages from other agents (e.g., supervisor) in Group Chat scenarios. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 * fix tests --------- Co-authored-by: Claude Opus 4.5 --- .../GroupOrchestrationSupervisor.ts | 25 +- .../src/groupOrchestration/types.ts | 19 +- .../src/const.ts | 1 + .../src/executor.ts | 22 +- .../src/manifest.ts | 83 +-- .../src/types.ts | 5 + packages/types/src/tool/builtin.ts | 5 + .../webapi/chat/[provider]/route.test.ts | 9 +- .../Messages/Task/ClientTaskDetail/index.tsx | 15 +- .../Conversation/Messages/Task/index.tsx | 9 +- .../Tasks/TaskItem/ClientTaskItem.tsx | 16 +- .../Tasks/TaskItem/useClientTaskStats.ts | 15 +- ...ntGroupAgentTaskThread.integration.test.ts | 423 ++++++++++++++ ...createClientTaskThread.integration.test.ts | 531 ++++++++++++++++++ src/server/routers/lambda/aiAgent.ts | 129 ++++- src/services/aiAgent.ts | 31 +- .../createGroupOrchestrationExecutors.ts | 232 +++++++- .../aiAgent/actions/groupOrchestration.ts | 17 +- .../message/actions/optimisticUpdate.ts | 2 +- src/store/chat/store.ts | 4 + src/store/chat/utils/messageMapKey.ts | 12 + 21 files changed, 1519 insertions(+), 86 deletions(-) create mode 100644 packages/builtin-tool-group-management/src/const.ts create mode 100644 src/server/routers/lambda/__tests__/integration/aiAgent.createClientGroupAgentTaskThread.integration.test.ts create mode 100644 src/server/routers/lambda/__tests__/integration/aiAgent.createClientTaskThread.integration.test.ts diff --git a/packages/agent-runtime/src/groupOrchestration/GroupOrchestrationSupervisor.ts b/packages/agent-runtime/src/groupOrchestration/GroupOrchestrationSupervisor.ts index da9a50339c..57b957b2a9 100644 --- a/packages/agent-runtime/src/groupOrchestration/GroupOrchestrationSupervisor.ts +++ b/packages/agent-runtime/src/groupOrchestration/GroupOrchestrationSupervisor.ts @@ -7,6 +7,7 @@ import type { SupervisorInstructionCallSupervisor, SupervisorInstructionDelegate, SupervisorInstructionExecAsyncTask, + SupervisorInstructionExecClientAsyncTask, SupervisorInstructionFinish, SupervisorInstructionParallelCallAgents, } from './types'; @@ -96,14 +97,24 @@ export class GroupOrchestrationSupervisor implements IGroupOrchestrationSupervis } case 'execute_task': { + const instructionPayload = { + agentId: params.agentId as string, + task: params.task as string, + timeout: params.timeout as number | undefined, + title: params.title as string | undefined, + toolMessageId: params.toolMessageId as string, + }; + + // Return different instruction type based on runInClient flag + if (params.runInClient) { + return { + payload: instructionPayload, + type: 'exec_client_async_task', + } as SupervisorInstructionExecClientAsyncTask; + } + return { - payload: { - agentId: params.agentId as string, - task: params.task as string, - timeout: params.timeout as number | undefined, - title: params.title as string | undefined, - toolMessageId: params.toolMessageId as string, - }, + payload: instructionPayload, type: 'exec_async_task', } as SupervisorInstructionExecAsyncTask; } diff --git a/packages/agent-runtime/src/groupOrchestration/types.ts b/packages/agent-runtime/src/groupOrchestration/types.ts index 9081c18a57..69e4a8b3d0 100644 --- a/packages/agent-runtime/src/groupOrchestration/types.ts +++ b/packages/agent-runtime/src/groupOrchestration/types.ts @@ -48,7 +48,7 @@ export interface SupervisorInstructionParallelCallAgents { } /** - * Instruction to execute an async task for an agent + * Instruction to execute an async task for an agent (server-side) */ export interface SupervisorInstructionExecAsyncTask { payload: { @@ -62,6 +62,22 @@ export interface SupervisorInstructionExecAsyncTask { type: 'exec_async_task'; } +/** + * Instruction to execute an async task for an agent on the client (desktop only) + * Used when task requires local tools like file system or shell commands + */ +export interface SupervisorInstructionExecClientAsyncTask { + payload: { + agentId: string; + task: string; + timeout?: number; + /** Task title (shown in UI, used as thread title) */ + title?: string; + toolMessageId: string; + }; + type: 'exec_client_async_task'; +} + /** * Instruction to execute multiple async tasks in parallel */ @@ -106,6 +122,7 @@ export type SupervisorInstruction = | SupervisorInstructionCallAgent | SupervisorInstructionParallelCallAgents | SupervisorInstructionExecAsyncTask + | SupervisorInstructionExecClientAsyncTask | SupervisorInstructionBatchExecAsyncTasks | SupervisorInstructionDelegate | SupervisorInstructionFinish; diff --git a/packages/builtin-tool-group-management/src/const.ts b/packages/builtin-tool-group-management/src/const.ts new file mode 100644 index 0000000000..7531177a7e --- /dev/null +++ b/packages/builtin-tool-group-management/src/const.ts @@ -0,0 +1 @@ +export const isDesktop = process.env.NEXT_PUBLIC_IS_DESKTOP_APP === '1'; diff --git a/packages/builtin-tool-group-management/src/executor.ts b/packages/builtin-tool-group-management/src/executor.ts index c69eb62268..e165e333f3 100644 --- a/packages/builtin-tool-group-management/src/executor.ts +++ b/packages/builtin-tool-group-management/src/executor.ts @@ -124,16 +124,19 @@ class GroupManagementExecutor extends BaseExecutor => { + const { agentId, task, timeout, skipCallSupervisor, runInClient } = params; + // Register afterCompletion callback to trigger async task execution after AgentRuntime completes // This follows the same pattern as speak/broadcast - trigger mode, not blocking if (ctx.groupOrchestration && ctx.agentId && ctx.registerAfterCompletion) { ctx.registerAfterCompletion(() => ctx.groupOrchestration!.triggerExecuteTask({ - agentId: params.agentId, - skipCallSupervisor: params.skipCallSupervisor, + agentId, + runInClient, + skipCallSupervisor, supervisorAgentId: ctx.agentId!, - task: params.task, - timeout: params.timeout, + task, + timeout, toolMessageId: ctx.messageId, }), ); @@ -141,12 +144,13 @@ class GroupManagementExecutor extends BaseExecutor { azureApiVersion: 'v1', }); - const mockRuntime: LobeRuntimeAI = { baseURL: 'abc', chat: vi.fn() }; + // chat mock 需要返回一个 Response 对象,否则中间件访问 res.headers 会报错 + const mockChatResponse = new Response(JSON.stringify({ success: true }), { + headers: { 'Content-Type': 'application/json' }, + }); + const mockRuntime: LobeRuntimeAI = { + baseURL: 'abc', + chat: vi.fn().mockResolvedValue(mockChatResponse), + }; // Mock initModelRuntimeFromDB vi.mocked(initModelRuntimeFromDB).mockResolvedValue(new ModelRuntime(mockRuntime)); diff --git a/src/features/Conversation/Messages/Task/ClientTaskDetail/index.tsx b/src/features/Conversation/Messages/Task/ClientTaskDetail/index.tsx index 15f99c32cd..f8801ff4da 100644 --- a/src/features/Conversation/Messages/Task/ClientTaskDetail/index.tsx +++ b/src/features/Conversation/Messages/Task/ClientTaskDetail/index.tsx @@ -15,29 +15,38 @@ import InstructionAccordion from './InstructionAccordion'; import ProcessingState from './ProcessingState'; interface ClientTaskDetailProps { + /** Agent ID from the task message (use task's agentId, not activeAgentId) */ + agentId?: string; content?: string; + /** Group ID from the task message (use task's groupId, not activeGroupId) */ + groupId?: string; messageId: string; taskDetail?: TaskDetail; } -const ClientTaskDetail = memo(({ taskDetail }) => { +const ClientTaskDetail = memo(({ agentId: propAgentId, groupId, taskDetail }) => { const threadId = taskDetail?.threadId; const isExecuting = taskDetail?.status === ThreadStatus.Processing; + // Use task message's agentId to query with the correct SubAgent ID that created the thread + // Fall back to activeAgentId if task message doesn't have agentId (shouldn't happen normally) const [activeAgentId, activeTopicId, useFetchMessages] = useChatStore((s) => [ s.activeAgentId, s.activeTopicId, s.useFetchMessages, ]); + const agentId = propAgentId || activeAgentId; + const threadContext = useMemo( () => ({ - agentId: activeAgentId, + agentId, + groupId, scope: 'thread' as const, threadId, topicId: activeTopicId, }), - [activeAgentId, activeTopicId, threadId], + [agentId, groupId, activeTopicId, threadId], ); const threadMessageKey = useMemo( diff --git a/src/features/Conversation/Messages/Task/index.tsx b/src/features/Conversation/Messages/Task/index.tsx index 03661869bd..a55afebdcc 100644 --- a/src/features/Conversation/Messages/Task/index.tsx +++ b/src/features/Conversation/Messages/Task/index.tsx @@ -36,7 +36,7 @@ const TaskMessage = memo(({ id, index, disableEditing, isLates const item = useConversationStore(dataSelectors.getDisplayMessageById(id), isEqual)!; const actionsConfig = useConversationStore((s) => s.actionsBar?.assistant); - const { agentId, error, role, content, createdAt, metadata, taskDetail } = item; + const { agentId, groupId, error, role, content, createdAt, metadata, taskDetail } = item; const avatar = useAgentMeta(agentId); @@ -97,7 +97,12 @@ const TaskMessage = memo(({ id, index, disableEditing, isLates titleAddon={{t('task.subtask')}} > {taskDetail?.clientMode ? ( - + ) : ( (({ item }) => { - const { id, metadata, taskDetail } = item; + const { id, agentId: itemAgentId, groupId: itemGroupId, metadata, taskDetail } = item; const [expanded, setExpanded] = useState(false); const title = taskDetail?.title || metadata?.taskTitle; @@ -32,21 +32,29 @@ const ClientTaskItem = memo(({ item }) => { const isError = status === ThreadStatus.Failed || status === ThreadStatus.Cancel; const isInitializing = !taskDetail || !status; - // Fetch thread messages for client mode (like Task/ClientTaskDetail) + // Fetch thread messages for client mode + // Use item's agentId (from task message) to query with the correct SubAgent ID that created the thread + // Fall back to activeAgentId if task message doesn't have agentId (shouldn't happen normally) const [activeAgentId, activeTopicId, useFetchMessages] = useChatStore((s) => [ s.activeAgentId, s.activeTopicId, s.useFetchMessages, ]); + // Use task message's agentId (skip 'supervisor' as it's not a valid agent ID for queries) + // Fall back to activeAgentId if not available + const agentId = + itemAgentId && itemAgentId !== 'supervisor' ? itemAgentId : activeAgentId; + const threadContext = useMemo( () => ({ - agentId: activeAgentId, + agentId, + groupId: itemGroupId, scope: 'thread' as const, threadId, topicId: activeTopicId, }), - [activeAgentId, activeTopicId, threadId], + [agentId, itemGroupId, activeTopicId, threadId], ); const threadMessageKey = useMemo( diff --git a/src/features/Conversation/Messages/Tasks/TaskItem/useClientTaskStats.ts b/src/features/Conversation/Messages/Tasks/TaskItem/useClientTaskStats.ts index 2f5b1f286d..271bc431d3 100644 --- a/src/features/Conversation/Messages/Tasks/TaskItem/useClientTaskStats.ts +++ b/src/features/Conversation/Messages/Tasks/TaskItem/useClientTaskStats.ts @@ -14,7 +14,11 @@ export interface ClientTaskStats { } interface UseClientTaskStatsOptions { + /** Agent ID from the task message (use task's agentId, not activeAgentId) */ + agentId?: string; enabled?: boolean; + /** Group ID from the task message (use task's groupId, not activeGroupId) */ + groupId?: string; threadId?: string; } @@ -23,23 +27,30 @@ interface UseClientTaskStatsOptions { * Used in TaskItem to display progress metrics (steps, tool calls, elapsed time). */ export const useClientTaskStats = ({ + agentId: propAgentId, + groupId, threadId, enabled = true, }: UseClientTaskStatsOptions): ClientTaskStats => { + // Use task message's agentId to query with the correct SubAgent ID that created the thread + // Fall back to activeAgentId if not provided const [activeAgentId, activeTopicId, useFetchMessages] = useChatStore((s) => [ s.activeAgentId, s.activeTopicId, s.useFetchMessages, ]); + const agentId = propAgentId || activeAgentId; + const threadContext = useMemo( () => ({ - agentId: activeAgentId, + agentId, + groupId, scope: 'thread' as const, threadId, topicId: activeTopicId, }), - [activeAgentId, activeTopicId, threadId], + [agentId, groupId, activeTopicId, threadId], ); const threadMessageKey = useMemo( diff --git a/src/server/routers/lambda/__tests__/integration/aiAgent.createClientGroupAgentTaskThread.integration.test.ts b/src/server/routers/lambda/__tests__/integration/aiAgent.createClientGroupAgentTaskThread.integration.test.ts new file mode 100644 index 0000000000..9071b3c8cd --- /dev/null +++ b/src/server/routers/lambda/__tests__/integration/aiAgent.createClientGroupAgentTaskThread.integration.test.ts @@ -0,0 +1,423 @@ +// @vitest-environment node +import { LobeChatDatabase } from '@lobechat/database'; +import { + agents, + chatGroups, + messages, + sessions, + threads, + topics, +} from '@lobechat/database/schemas'; +import { getTestDB } from '@lobechat/database/test-utils'; +import { ThreadStatus, ThreadType } from '@lobechat/types'; +import { eq } from 'drizzle-orm'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { aiAgentRouter } from '../../aiAgent'; +import { cleanupTestUser, createTestUser } from './setup'; + +// Mock getServerDB to return our test database instance +let testDB: LobeChatDatabase; +vi.mock('@/database/core/db-adaptor', () => ({ + getServerDB: vi.fn(() => testDB), +})); + +// Mock services +vi.mock('@/server/services/aiAgent', () => ({ + AiAgentService: vi.fn().mockImplementation(() => ({})), +})); + +vi.mock('@/server/services/agentRuntime', () => ({ + AgentRuntimeService: vi.fn().mockImplementation(() => ({})), +})); + +vi.mock('@/server/services/aiChat', () => ({ + AiChatService: vi.fn().mockImplementation(() => ({})), +})); + +describe('createClientGroupAgentTaskThread Integration', () => { + let serverDB: LobeChatDatabase; + let userId: string; + let supervisorAgentId: string; + let workerAgentId: string; + let testGroupId: string; + let testTopicId: string; + let testSessionId: string; + let parentMessageId: string; + + beforeEach(async () => { + serverDB = await getTestDB(); + testDB = serverDB; + userId = await createTestUser(serverDB); + + // Create supervisor agent + const [supervisorAgent] = await serverDB + .insert(agents) + .values({ + userId, + title: 'Supervisor Agent', + model: 'gpt-4o', + provider: 'openai', + systemRole: 'You are a supervisor.', + }) + .returning(); + supervisorAgentId = supervisorAgent.id; + + // Create worker agent + const [workerAgent] = await serverDB + .insert(agents) + .values({ + userId, + title: 'Worker Agent', + model: 'gpt-4o-mini', + provider: 'openai', + systemRole: 'You are a worker.', + }) + .returning(); + workerAgentId = workerAgent.id; + + // Create test session + const [session] = await serverDB.insert(sessions).values({ userId, type: 'group' }).returning(); + testSessionId = session.id; + + // Create test group + const [group] = await serverDB + .insert(chatGroups) + .values({ + userId, + title: 'Test Group', + }) + .returning(); + testGroupId = group.id; + + // Create test topic + const [topic] = await serverDB + .insert(topics) + .values({ + userId, + title: 'Test Topic', + agentId: supervisorAgentId, + sessionId: testSessionId, + groupId: testGroupId, + }) + .returning(); + testTopicId = topic.id; + + // Create parent message from supervisor (simulating supervisor's task message) + const [parentMsg] = await serverDB + .insert(messages) + .values({ + userId, + role: 'assistant', + content: 'Task: Please analyze this data', + topicId: testTopicId, + agentId: supervisorAgentId, // Parent message from supervisor + groupId: testGroupId, + }) + .returning(); + parentMessageId = parentMsg.id; + }); + + afterEach(async () => { + await cleanupTestUser(serverDB, userId); + vi.clearAllMocks(); + }); + + const createTestContext = () => ({ + userId, + jwtPayload: { userId }, + }); + + describe('basic functionality', () => { + it('should create Thread with subAgentId as the executing agent', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Analyze the data', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + expect(result.success).toBe(true); + expect(result.threadId).toBeDefined(); + expect(result.userMessageId).toBeDefined(); + + // Verify Thread uses subAgentId (worker) as the agentId + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + expect(thread.agentId).toBe(workerAgentId); + expect(thread.groupId).toBe(testGroupId); + expect(thread.type).toBe(ThreadType.Isolation); + expect(thread.status).toBe(ThreadStatus.Processing); + }); + + it('should create user message with subAgentId', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Process this request', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + const [userMessage] = await serverDB + .select() + .from(messages) + .where(eq(messages.id, result.userMessageId)); + + expect(userMessage.agentId).toBe(workerAgentId); + expect(userMessage.groupId).toBe(testGroupId); + expect(userMessage.threadId).toBe(result.threadId); + expect(userMessage.role).toBe('user'); + }); + }); + + describe('thread messages query (key difference from single agent mode)', () => { + it('should include the user message in threadMessages (Isolation type has no parent messages)', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Execute the task', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + // For Isolation type threads, only the thread's own messages are included + // (parent messages are NOT included by design - thread is isolated) + expect(result.threadMessages.length).toBeGreaterThanOrEqual(1); + + // Find the user message + const userMsgInThread = result.threadMessages.find((m) => m.id === result.userMessageId); + expect(userMsgInThread).toBeDefined(); + expect(userMsgInThread?.agentId).toBe(workerAgentId); + expect(userMsgInThread?.threadId).toBe(result.threadId); + }); + + it('should NOT filter by agentId (messages from any agent in thread are included)', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Execute the task', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + // Create a message in the thread with a DIFFERENT agentId (simulating supervisor adding to thread) + await serverDB.insert(messages).values({ + userId, + role: 'assistant', + content: 'Response from supervisor in thread', + topicId: testTopicId, + agentId: supervisorAgentId, // Different from subAgentId! + groupId: testGroupId, + threadId: result.threadId, + }); + + // Query thread messages again via API + const result2 = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Another task', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + // The key test: messages in a thread from different agents should all be queryable + // This verifies the API doesn't filter by agentId + const [supervisorMsgInThread] = await serverDB + .select() + .from(messages) + .where(eq(messages.threadId, result.threadId)); + + // If we directly query the database, we should find messages with different agentIds + const threadMsgs = await serverDB + .select() + .from(messages) + .where(eq(messages.threadId, result.threadId)); + + const agentIds = [...new Set(threadMsgs.map((m) => m.agentId))]; + // Thread should be able to contain messages from multiple agents + expect(agentIds.length).toBeGreaterThanOrEqual(1); + }); + + it('should include ancestor messages in thread context', async () => { + // Create a chain of messages with different agentIds + const [userMsg] = await serverDB + .insert(messages) + .values({ + userId, + role: 'user', + content: 'User question', + topicId: testTopicId, + agentId: supervisorAgentId, + groupId: testGroupId, + }) + .returning(); + + const [supervisorResponse] = await serverDB + .insert(messages) + .values({ + userId, + role: 'assistant', + content: 'Supervisor response with task delegation', + topicId: testTopicId, + agentId: supervisorAgentId, + groupId: testGroupId, + parentId: userMsg.id, + }) + .returning(); + + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Execute delegated task', + parentMessageId: supervisorResponse.id, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + // Should include messages from the thread context regardless of agentId + expect(result.threadMessages.length).toBeGreaterThanOrEqual(1); + }); + }); + + describe('main messages query', () => { + it('should return all main chat messages in the group (without threadId filter)', async () => { + // Create some main chat messages from different agents + await serverDB.insert(messages).values([ + { + userId, + role: 'user', + content: 'User message', + topicId: testTopicId, + agentId: supervisorAgentId, + groupId: testGroupId, + }, + { + userId, + role: 'assistant', + content: 'Supervisor response', + topicId: testTopicId, + agentId: supervisorAgentId, + groupId: testGroupId, + }, + ]); + + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'New task', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + // Main messages should include messages from ALL agents in the group + expect(result.messages.length).toBeGreaterThanOrEqual(3); // parentMsg + 2 new messages + + // Verify messages are from the group (not filtered by subAgentId) + const supervisorMessages = result.messages.filter((m) => m.agentId === supervisorAgentId); + expect(supervisorMessages.length).toBeGreaterThanOrEqual(1); + }); + + it('should not include thread messages in main messages', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Task instruction', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + // The newly created user message (which has threadId) should NOT be in main messages + const threadMessageInMain = result.messages.find((m) => m.id === result.userMessageId); + expect(threadMessageInMain).toBeUndefined(); + }); + }); + + describe('groupId is required', () => { + it('should reject requests without groupId', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + // TypeScript would prevent this, but we test the runtime validation + await expect( + caller.createClientGroupAgentTaskThread({ + groupId: '', // Empty string should fail validation + instruction: 'Task', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }), + ).rejects.toThrow(); + }); + }); + + describe('thread metadata', () => { + it('should have clientMode flag set to true', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Client mode task', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + expect(thread.metadata?.clientMode).toBe(true); + }); + + it('should have startedAt timestamp in metadata', async () => { + const beforeCall = new Date().toISOString(); + + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Timestamp test', + parentMessageId, + subAgentId: workerAgentId, + topicId: testTopicId, + }); + + const afterCall = new Date().toISOString(); + + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + + expect(thread.metadata?.startedAt).toBeDefined(); + expect(result.startedAt).toBe(thread.metadata?.startedAt); + expect(thread.metadata?.startedAt! >= beforeCall).toBe(true); + expect(thread.metadata?.startedAt! <= afterCall).toBe(true); + }); + }); + + describe('title support', () => { + it('should create Thread with title when provided', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientGroupAgentTaskThread({ + groupId: testGroupId, + instruction: 'Task with title', + parentMessageId, + subAgentId: workerAgentId, + title: 'Data Analysis Task', + topicId: testTopicId, + }); + + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + expect(thread.title).toBe('Data Analysis Task'); + }); + }); +}); diff --git a/src/server/routers/lambda/__tests__/integration/aiAgent.createClientTaskThread.integration.test.ts b/src/server/routers/lambda/__tests__/integration/aiAgent.createClientTaskThread.integration.test.ts new file mode 100644 index 0000000000..e7094737f1 --- /dev/null +++ b/src/server/routers/lambda/__tests__/integration/aiAgent.createClientTaskThread.integration.test.ts @@ -0,0 +1,531 @@ +// @vitest-environment node +import { LobeChatDatabase } from '@lobechat/database'; +import { agents, chatGroups, messages, sessions, threads, topics } from '@lobechat/database/schemas'; +import { getTestDB } from '@lobechat/database/test-utils'; +import { ThreadStatus, ThreadType } from '@lobechat/types'; +import { eq } from 'drizzle-orm'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { aiAgentRouter } from '../../aiAgent'; +import { cleanupTestUser, createTestUser } from './setup'; + +// Mock getServerDB to return our test database instance +let testDB: LobeChatDatabase; +vi.mock('@/database/core/db-adaptor', () => ({ + getServerDB: vi.fn(() => testDB), +})); + +// Mock AiAgentService - not needed for createClientTaskThread but required for aiAgentProcedure +vi.mock('@/server/services/aiAgent', () => ({ + AiAgentService: vi.fn().mockImplementation(() => ({})), +})); + +// Mock AgentRuntimeService +vi.mock('@/server/services/agentRuntime', () => ({ + AgentRuntimeService: vi.fn().mockImplementation(() => ({})), +})); + +// Mock AiChatService +vi.mock('@/server/services/aiChat', () => ({ + AiChatService: vi.fn().mockImplementation(() => ({})), +})); + +describe('createClientTaskThread Integration', () => { + let serverDB: LobeChatDatabase; + let userId: string; + let testAgentId: string; + let testGroupId: string; + let testTopicId: string; + let testSessionId: string; + let parentMessageId: string; + + beforeEach(async () => { + serverDB = await getTestDB(); + testDB = serverDB; + userId = await createTestUser(serverDB); + + // Create test agent + const [agent] = await serverDB + .insert(agents) + .values({ + userId, + title: 'Test Agent', + model: 'gpt-4o-mini', + provider: 'openai', + systemRole: 'You are a helpful assistant.', + }) + .returning(); + testAgentId = agent.id; + + // Create test session + const [session] = await serverDB.insert(sessions).values({ userId, type: 'group' }).returning(); + testSessionId = session.id; + + // Create test group + const [group] = await serverDB + .insert(chatGroups) + .values({ + userId, + title: 'Test Group', + }) + .returning(); + testGroupId = group.id; + + // Create test topic + const [topic] = await serverDB + .insert(topics) + .values({ + userId, + title: 'Test Topic', + agentId: testAgentId, + sessionId: testSessionId, + groupId: testGroupId, + }) + .returning(); + testTopicId = topic.id; + + // Create parent message (simulating a task message from supervisor) + const [parentMsg] = await serverDB + .insert(messages) + .values({ + userId, + role: 'assistant', + content: 'Task: Research the topic', + topicId: testTopicId, + agentId: testAgentId, + groupId: testGroupId, + }) + .returning(); + parentMessageId = parentMsg.id; + }); + + afterEach(async () => { + await cleanupTestUser(serverDB, userId); + vi.clearAllMocks(); + }); + + const createTestContext = () => ({ + userId, + jwtPayload: { userId }, + }); + + describe('basic functionality', () => { + it('should create Thread and user message successfully', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + groupId: testGroupId, + instruction: 'Please analyze this data', + parentMessageId, + topicId: testTopicId, + }); + + // Verify return values + expect(result.success).toBe(true); + expect(result.threadId).toBeDefined(); + expect(result.userMessageId).toBeDefined(); + expect(result.startedAt).toBeDefined(); + expect(result.threadMessages).toBeDefined(); + expect(result.messages).toBeDefined(); + + // Verify Thread was created in database + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + + expect(thread).toBeDefined(); + expect(thread.agentId).toBe(testAgentId); + expect(thread.groupId).toBe(testGroupId); + expect(thread.topicId).toBe(testTopicId); + expect(thread.sourceMessageId).toBe(parentMessageId); + expect(thread.type).toBe(ThreadType.Isolation); + expect(thread.status).toBe(ThreadStatus.Processing); + expect(thread.userId).toBe(userId); + + // Verify metadata + expect(thread.metadata).toBeDefined(); + expect(thread.metadata?.clientMode).toBe(true); + expect(thread.metadata?.startedAt).toBe(result.startedAt); + + // Verify user message was created in database + const [userMessage] = await serverDB + .select() + .from(messages) + .where(eq(messages.id, result.userMessageId)); + + expect(userMessage).toBeDefined(); + expect(userMessage.role).toBe('user'); + expect(userMessage.content).toBe('Please analyze this data'); + expect(userMessage.agentId).toBe(testAgentId); + expect(userMessage.topicId).toBe(testTopicId); + expect(userMessage.threadId).toBe(result.threadId); + expect(userMessage.parentId).toBe(parentMessageId); + expect(userMessage.userId).toBe(userId); + }); + + it('should create Thread with title when provided', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + groupId: testGroupId, + instruction: 'Analyze the data', + parentMessageId, + title: 'Data Analysis Task', + topicId: testTopicId, + }); + + expect(result.success).toBe(true); + + // Verify Thread title in database + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + + expect(thread.title).toBe('Data Analysis Task'); + }); + }); + + describe('single agent mode (without groupId)', () => { + it('should create Thread without groupId for single agent mode', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + // No groupId provided + instruction: 'Single agent task instruction', + parentMessageId, + topicId: testTopicId, + }); + + expect(result.success).toBe(true); + + // Verify Thread has no groupId + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + + expect(thread.agentId).toBe(testAgentId); + expect(thread.groupId).toBeNull(); + expect(thread.type).toBe(ThreadType.Isolation); + }); + }); + + describe('returned messages', () => { + it('should return thread messages including the created user message', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + groupId: testGroupId, + instruction: 'Test instruction', + parentMessageId, + topicId: testTopicId, + }); + + // Verify threadMessages includes the created user message + expect(result.threadMessages).toBeInstanceOf(Array); + expect(result.threadMessages.length).toBeGreaterThanOrEqual(1); + + const userMsg = result.threadMessages.find((m) => m.id === result.userMessageId); + expect(userMsg).toBeDefined(); + expect(userMsg?.role).toBe('user'); + expect(userMsg?.content).toBe('Test instruction'); + expect(userMsg?.threadId).toBe(result.threadId); + }); + + it('should return main chat messages (messages without threadId)', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + // Create some main chat messages first (with groupId to match the query) + await serverDB.insert(messages).values([ + { + userId, + role: 'user', + content: 'Main chat message 1', + topicId: testTopicId, + agentId: testAgentId, + groupId: testGroupId, + }, + { + userId, + role: 'assistant', + content: 'Main chat response 1', + topicId: testTopicId, + agentId: testAgentId, + groupId: testGroupId, + }, + ]); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + groupId: testGroupId, + instruction: 'Thread instruction', + parentMessageId, + topicId: testTopicId, + }); + + // Verify messages array contains main chat messages (without threadId) + expect(result.messages).toBeInstanceOf(Array); + + // Main chat messages should not include the thread user message + const threadMessageInMain = result.messages.find((m) => m.id === result.userMessageId); + expect(threadMessageInMain).toBeUndefined(); + + // Main chat messages should include the parent message and other main messages + const parentMsgInMain = result.messages.find((m) => m.id === parentMessageId); + expect(parentMsgInMain).toBeDefined(); + }); + }); + + describe('multiple threads', () => { + it('should create multiple threads for the same topic', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + // Create first thread + const result1 = await caller.createClientTaskThread({ + agentId: testAgentId, + groupId: testGroupId, + instruction: 'First task', + parentMessageId, + topicId: testTopicId, + }); + + // Create second parent message for second thread + const [secondParentMsg] = await serverDB + .insert(messages) + .values({ + userId, + role: 'assistant', + content: 'Second task message', + topicId: testTopicId, + agentId: testAgentId, + }) + .returning(); + + // Create second thread + const result2 = await caller.createClientTaskThread({ + agentId: testAgentId, + groupId: testGroupId, + instruction: 'Second task', + parentMessageId: secondParentMsg.id, + topicId: testTopicId, + }); + + expect(result1.success).toBe(true); + expect(result2.success).toBe(true); + expect(result1.threadId).not.toBe(result2.threadId); + + // Verify both threads exist in database + const topicThreads = await serverDB + .select() + .from(threads) + .where(eq(threads.topicId, testTopicId)); + + expect(topicThreads.length).toBe(2); + + // Verify each thread has its own user message + const thread1Messages = await serverDB + .select() + .from(messages) + .where(eq(messages.threadId, result1.threadId)); + const thread2Messages = await serverDB + .select() + .from(messages) + .where(eq(messages.threadId, result2.threadId)); + + expect(thread1Messages.length).toBeGreaterThanOrEqual(1); + expect(thread2Messages.length).toBeGreaterThanOrEqual(1); + expect(thread1Messages[0].content).toBe('First task'); + expect(thread2Messages[0].content).toBe('Second task'); + }); + }); + + describe('different agents', () => { + it('should create threads for different agents in the same topic', async () => { + // Create second agent + const [agent2] = await serverDB + .insert(agents) + .values({ + userId, + title: 'Second Agent', + model: 'gpt-4o-mini', + provider: 'openai', + systemRole: 'You are another assistant.', + }) + .returning(); + + const caller = aiAgentRouter.createCaller(createTestContext()); + + // Create thread for first agent + const result1 = await caller.createClientTaskThread({ + agentId: testAgentId, + groupId: testGroupId, + instruction: 'Task for agent 1', + parentMessageId, + topicId: testTopicId, + }); + + // Create thread for second agent + const result2 = await caller.createClientTaskThread({ + agentId: agent2.id, + groupId: testGroupId, + instruction: 'Task for agent 2', + parentMessageId, + topicId: testTopicId, + }); + + expect(result1.success).toBe(true); + expect(result2.success).toBe(true); + + // Verify threads have different agentIds + const [thread1] = await serverDB.select().from(threads).where(eq(threads.id, result1.threadId)); + const [thread2] = await serverDB.select().from(threads).where(eq(threads.id, result2.threadId)); + + expect(thread1.agentId).toBe(testAgentId); + expect(thread2.agentId).toBe(agent2.id); + }); + }); + + describe('thread metadata', () => { + it('should have clientMode flag set to true', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + instruction: 'Client mode task', + parentMessageId, + topicId: testTopicId, + }); + + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + + expect(thread.metadata?.clientMode).toBe(true); + }); + + it('should have startedAt timestamp in metadata', async () => { + const beforeCall = new Date().toISOString(); + + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + instruction: 'Timestamp test task', + parentMessageId, + topicId: testTopicId, + }); + + const afterCall = new Date().toISOString(); + + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + + expect(thread.metadata?.startedAt).toBeDefined(); + expect(result.startedAt).toBe(thread.metadata?.startedAt); + + // Verify timestamp is within the call window + expect(thread.metadata?.startedAt! >= beforeCall).toBe(true); + expect(thread.metadata?.startedAt! <= afterCall).toBe(true); + }); + }); + + describe('user message properties', () => { + it('should create user message with correct parentId linking to source message', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + instruction: 'Task with parent link', + parentMessageId, + topicId: testTopicId, + }); + + const [userMessage] = await serverDB + .select() + .from(messages) + .where(eq(messages.id, result.userMessageId)); + + // User message should have parentId pointing to the source message + expect(userMessage.parentId).toBe(parentMessageId); + + // Thread should have sourceMessageId pointing to the same message + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + expect(thread.sourceMessageId).toBe(parentMessageId); + }); + + it('should create user message with role=user', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + instruction: 'Role test', + parentMessageId, + topicId: testTopicId, + }); + + const [userMessage] = await serverDB + .select() + .from(messages) + .where(eq(messages.id, result.userMessageId)); + + expect(userMessage.role).toBe('user'); + }); + }); + + describe('database integrity', () => { + it('should maintain referential integrity between thread and message', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + groupId: testGroupId, + instruction: 'Integrity test', + parentMessageId, + topicId: testTopicId, + }); + + // Query messages with threadId + const threadMessages = await serverDB + .select() + .from(messages) + .where(eq(messages.threadId, result.threadId)); + + expect(threadMessages.length).toBeGreaterThanOrEqual(1); + + // All thread messages should have same topicId + threadMessages.forEach((msg) => { + expect(msg.topicId).toBe(testTopicId); + expect(msg.threadId).toBe(result.threadId); + }); + + // Thread should reference correct topic + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + expect(thread.topicId).toBe(testTopicId); + }); + + it('should correctly associate thread with agent and group', async () => { + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.createClientTaskThread({ + agentId: testAgentId, + groupId: testGroupId, + instruction: 'Association test', + parentMessageId, + topicId: testTopicId, + }); + + // Verify all associations in database + const [thread] = await serverDB.select().from(threads).where(eq(threads.id, result.threadId)); + const [userMessage] = await serverDB + .select() + .from(messages) + .where(eq(messages.id, result.userMessageId)); + + // Thread associations + expect(thread.agentId).toBe(testAgentId); + expect(thread.groupId).toBe(testGroupId); + expect(thread.topicId).toBe(testTopicId); + expect(thread.userId).toBe(userId); + + // Message associations + expect(userMessage.agentId).toBe(testAgentId); + expect(userMessage.topicId).toBe(testTopicId); + expect(userMessage.threadId).toBe(result.threadId); + expect(userMessage.userId).toBe(userId); + }); + }); +}); diff --git a/src/server/routers/lambda/aiAgent.ts b/src/server/routers/lambda/aiAgent.ts index a495df3269..e5b4542d26 100644 --- a/src/server/routers/lambda/aiAgent.ts +++ b/src/server/routers/lambda/aiAgent.ts @@ -161,7 +161,7 @@ const ExecSubAgentTaskSchema = z.object({ /** * Schema for createClientTaskThread - create Thread for client-side task execution - * This is used when runInClient=true on desktop client + * This is used when runInClient=true on desktop client (single agent mode) */ const CreateClientTaskThreadSchema = z.object({ /** The Agent ID to execute the task */ @@ -178,6 +178,25 @@ const CreateClientTaskThreadSchema = z.object({ topicId: z.string(), }); +/** + * Schema for createClientGroupAgentTaskThread - create Thread for client-side task execution in Group mode + * This is specifically for Group Chat where messages may have different agentIds + */ +const CreateClientGroupAgentTaskThreadSchema = z.object({ + /** The Group ID (required for Group mode) */ + groupId: z.string(), + /** Initial user message content (task instruction) */ + instruction: z.string(), + /** The parent message ID (task message) */ + parentMessageId: z.string(), + /** The Sub-Agent ID that will execute the task (worker agent in group) */ + subAgentId: z.string(), + /** Task title (shown in UI, used as thread title) */ + title: z.string().optional(), + /** The Topic ID */ + topicId: z.string(), +}); + /** * Schema for updateClientTaskThreadStatus - update Thread status after client-side execution */ @@ -272,6 +291,7 @@ export const aiAgentRouter = router({ const userMessage = await ctx.messageModel.create({ agentId, content: instruction, + groupId, parentId: parentMessageId, role: 'user', threadId: thread.id, @@ -283,17 +303,10 @@ export const aiAgentRouter = router({ // 3. Query thread messages and main chat messages in parallel const [threadMessages, messages] = await Promise.all([ // Thread messages (messages within this thread) - ctx.messageModel.query({ - agentId, - threadId: thread.id, - topicId, - }), + ctx.messageModel.query({ agentId, threadId: thread.id, topicId }), // Main chat messages (messages without threadId, includes updated taskDetail) - ctx.messageModel.query({ - agentId, - topicId, - // No threadId - matchThread will filter for threadId IS NULL (main chat) - }), + // Pass both agentId and groupId - query() prioritizes groupId when present + ctx.messageModel.query({ agentId, groupId, topicId }), ]); log( @@ -326,6 +339,100 @@ export const aiAgentRouter = router({ } }), + /** + * Create Thread for client-side task execution in Group mode + * + * This endpoint is specifically designed for Group Chat scenarios where: + * - Messages in the thread may have different agentIds (supervisor, workers) + * - The subAgentId is the worker agent that executes the task + * - Thread messages query should not filter by agentId to include all parent messages + */ + createClientGroupAgentTaskThread: aiAgentProcedure + .input(CreateClientGroupAgentTaskThreadSchema) + .mutation(async ({ input, ctx }) => { + const { groupId, instruction, parentMessageId, subAgentId, title, topicId } = input; + + log('createClientGroupAgentTaskThread: subAgentId=%s, groupId=%s', subAgentId, groupId); + + try { + // 1. Create Thread for isolated task execution + // Use subAgentId as the thread's agentId (the executing agent) + const startedAt = new Date().toISOString(); + const thread = await ctx.threadModel.create({ + agentId: subAgentId, + groupId, + metadata: { clientMode: true, startedAt }, + sourceMessageId: parentMessageId, + status: ThreadStatus.Processing, + title, + topicId, + type: ThreadType.Isolation, + }); + + if (!thread) { + throw new TRPCError({ + code: 'INTERNAL_SERVER_ERROR', + message: 'Failed to create thread for task execution', + }); + } + + log('createClientGroupAgentTaskThread: created thread %s', thread.id); + + // 2. Create initial user message (persisted to database) + // Use subAgentId as the message's agentId + const userMessage = await ctx.messageModel.create({ + agentId: subAgentId, + content: instruction, + groupId, + parentId: parentMessageId, + role: 'user', + threadId: thread.id, + topicId, + }); + + log('createClientGroupAgentTaskThread: created user message %s', userMessage.id); + + // 3. Query thread messages and main chat messages in parallel + const [threadMessages, messages] = await Promise.all([ + // Thread messages (messages within this thread) + // DON'T pass agentId - thread query fetches parent messages via sourceMessageId + // which may have different agentIds (supervisor vs worker in group chat) + ctx.messageModel.query({ threadId: thread.id, topicId }), + // Main chat messages (messages without threadId) + // Only filter by groupId + topicId (not agentId) to include all agents' messages + ctx.messageModel.query({ groupId, topicId }), + ]); + + log( + 'createClientGroupAgentTaskThread: queried %d thread messages, %d main messages', + threadMessages.length, + messages.length, + ); + + // 4. Return Thread, userMessageId, threadMessages and messages + return { + messages, + startedAt, + success: true, + threadId: thread.id, + threadMessages, + userMessageId: userMessage.id, + }; + } catch (error: any) { + log('createClientGroupAgentTaskThread failed: %O', error); + + if (error instanceof TRPCError) { + throw error; + } + + throw new TRPCError({ + cause: error, + code: 'INTERNAL_SERVER_ERROR', + message: `Failed to create client group agent task thread: ${error.message}`, + }); + } + }), + createOperation: aiAgentProcedure .input(CreateAgentOperationSchema) .mutation(async ({ input, ctx }) => { diff --git a/src/services/aiAgent.ts b/src/services/aiAgent.ts index f295588b77..5fef4e8c80 100644 --- a/src/services/aiAgent.ts +++ b/src/services/aiAgent.ts @@ -42,7 +42,7 @@ export interface InterruptTaskParams { /** * Parameters for createClientTaskThread - * Creates a Thread for client-side task execution (desktop only) + * Creates a Thread for client-side task execution (desktop only, single agent mode) */ export interface CreateClientTaskThreadParams { agentId: string; @@ -54,6 +54,22 @@ export interface CreateClientTaskThreadParams { topicId: string; } +/** + * Parameters for createClientGroupAgentTaskThread + * Creates a Thread for client-side task execution in Group mode + */ +export interface CreateClientGroupAgentTaskThreadParams { + /** The Group ID (required for Group mode) */ + groupId: string; + /** Initial user message content (task instruction) */ + instruction: string; + parentMessageId: string; + /** The Sub-Agent ID that will execute the task (worker agent in group) */ + subAgentId: string; + title?: string; + topicId: string; +} + /** * Parameters for updateClientTaskThreadStatus * Updates Thread status after client-side execution completes @@ -106,7 +122,7 @@ class AiAgentService { } /** - * Create Thread for client-side task execution (desktop only) + * Create Thread for client-side task execution (desktop only, single agent mode) * * This method is called when runInClient=true on desktop client. * It creates the Thread but does NOT execute the task - execution happens locally. @@ -115,6 +131,17 @@ class AiAgentService { return await lambdaClient.aiAgent.createClientTaskThread.mutate(params); } + /** + * Create Thread for client-side task execution in Group mode + * + * This method is specifically for Group Chat scenarios where: + * - Messages may have different agentIds (supervisor, workers) + * - Thread messages query should not filter by agentId + */ + async createClientGroupAgentTaskThread(params: CreateClientGroupAgentTaskThreadParams) { + return await lambdaClient.aiAgent.createClientGroupAgentTaskThread.mutate(params); + } + /** * Update Thread status after client-side task execution completes * diff --git a/src/store/chat/agents/GroupOrchestration/createGroupOrchestrationExecutors.ts b/src/store/chat/agents/GroupOrchestration/createGroupOrchestrationExecutors.ts index 17d4441295..c6fd378565 100644 --- a/src/store/chat/agents/GroupOrchestration/createGroupOrchestrationExecutors.ts +++ b/src/store/chat/agents/GroupOrchestration/createGroupOrchestrationExecutors.ts @@ -8,6 +8,7 @@ import type { SupervisorInstructionCallSupervisor, SupervisorInstructionDelegate, SupervisorInstructionExecAsyncTask, + SupervisorInstructionExecClientAsyncTask, SupervisorInstructionParallelCallAgents, } from '@lobechat/agent-runtime'; import type { ConversationContext, UIChatMessage } from '@lobechat/types'; @@ -358,7 +359,7 @@ export const createGroupOrchestrationExecutors = ( /** * exec_async_task Executor - * Executes an async task for an agent using aiAgentService with polling + * Executes an async task for an agent using aiAgentService with polling (server-side) * * Flow: * 1. Create a task message (role: 'task') as placeholder @@ -585,6 +586,235 @@ export const createGroupOrchestrationExecutors = ( } }, + /** + * exec_client_async_task Executor + * Executes an async task for an agent on the client (desktop only) + * Used when task requires local tools like file system or shell commands + * + * Flow: + * 1. Create a task message (role: 'task') as placeholder + * 2. Create Thread via API (to get threadId for operation context) + * 3. Execute using internal_execAgentRuntime (client-side with local tools access) + * 4. Update Thread status via API on completion + * 5. Update task message content with result + * + * Returns: task_completed result + */ + exec_client_async_task: async (instruction, state): Promise => { + const { agentId, task, title, toolMessageId } = ( + instruction as SupervisorInstructionExecClientAsyncTask + ).payload; + + const sessionLogId = `${state.operationId}:exec_client_async_task`; + log(`[${sessionLogId}] Executing client-side async task for agent: ${agentId}`); + + const { groupId, topicId } = messageContext; + + if (!groupId || !topicId) { + log(`[${sessionLogId}] No valid context, cannot execute client async task`); + return { + events: [] as GroupOrchestrationEvent[], + newState: state, + result: { + payload: { agentId, error: 'No valid context available', success: false }, + type: 'task_completed', + }, + }; + } + + try { + // 1. Create task message as placeholder + const taskMessageResult = await get().optimisticCreateMessage( + { + agentId, + content: '', + groupId, + metadata: { instruction: task, taskTitle: title }, + parentId: toolMessageId, + role: 'task', + topicId, + }, + { operationId: state.operationId }, + ); + + if (!taskMessageResult) { + console.error(`[${sessionLogId}] Failed to create task message`); + return { + events: [] as GroupOrchestrationEvent[], + newState: state, + result: { + payload: { agentId, error: 'Failed to create task message', success: false }, + type: 'task_completed', + }, + }; + } + + const taskMessageId = taskMessageResult.id; + log(`[${sessionLogId}] Created task message: ${taskMessageId}`); + + // 2. Create Thread via API first (to get threadId for operation context) + // Use Group-specific API that handles different agentIds in thread context + const threadResult = await aiAgentService.createClientGroupAgentTaskThread({ + groupId: groupId!, + instruction: task, + parentMessageId: taskMessageId, + subAgentId: agentId, + title, + topicId, + }); + + if (!threadResult.success) { + log(`[${sessionLogId}] Failed to create client task thread`); + await get().optimisticUpdateMessageContent( + taskMessageId, + 'Failed to create task thread', + undefined, + { operationId: state.operationId }, + ); + return { + events: [] as GroupOrchestrationEvent[], + newState: state, + result: { + payload: { agentId, error: 'Failed to create client task thread', success: false }, + type: 'task_completed', + }, + }; + } + + const { threadId, userMessageId, threadMessages, messages } = threadResult; + log( + `[${sessionLogId}] Created thread: ${threadId}, userMessageId: ${userMessageId}, threadMessages: ${threadMessages.length}`, + ); + + // 3. Build sub-task ConversationContext (uses threadId for isolation) + const subContext: ConversationContext = { + agentId, + groupId, + topicId, + threadId, + scope: 'thread', + }; + + // 4. Create a child operation for task execution (now with threadId) + const { operationId: taskOperationId } = get().startOperation({ + type: 'execClientTask', + context: subContext, + parentOperationId: orchestrationOperationId, + metadata: { + startTime: Date.now(), + taskDescription: title, + taskMessageId, + executionMode: 'client', + }, + }); + + // 5. Sync messages to store + // Update main chat messages with latest taskDetail status (use messageContext for Group) + const mainKey = messageMapKey(messageContext); + log( + `[${sessionLogId}] replaceMessages (main): messages=%d, key=%s, context=%O`, + messages.length, + mainKey, + messageContext, + ); + get().replaceMessages(messages, { context: messageContext }); + + // Update thread messages + const threadKey = messageMapKey(subContext); + log( + `[${sessionLogId}] replaceMessages (thread): threadMessages=%d, key=%s, subContext=%O`, + threadMessages.length, + threadKey, + subContext, + ); + get().replaceMessages(threadMessages, { context: subContext }); + + // 6. Execute using internal_execAgentRuntime (client-side with local tools access) + log(`[${sessionLogId}] Starting client-side AgentRuntime execution`); + + const runtimeResult = await get().internal_execAgentRuntime({ + context: subContext, + messages: threadMessages, + parentMessageId: userMessageId, // Use server-returned userMessageId + parentMessageType: 'user', + operationId: taskOperationId, + parentOperationId: orchestrationOperationId, + isSubTask: true, // Disable lobe-gtd tools to prevent nested sub-tasks + }); + + log(`[${sessionLogId}] Client-side AgentRuntime execution completed`); + + // 7. Get execution result from sub-task messages + const subMessageKey = messageMapKey(subContext); + const subTaskMessages = get().dbMessagesMap[subMessageKey] || []; + const lastAssistant = subTaskMessages.findLast((m) => m.role === 'assistant'); + const resultContent = lastAssistant?.content || 'Task completed'; + + log(`[${sessionLogId}] Got result from sub-task: ${resultContent.length} chars`); + + // Count tool calls + const totalToolCalls = subTaskMessages.filter((m) => m.role === 'tool').length; + + // Get usage data from runtime result + const { usage, cost } = runtimeResult || {}; + + // 8. Update task message with result + await get().optimisticUpdateMessageContent( + taskMessageId, + resultContent, + { + metadata: { + cost: cost?.total, + duration: usage?.llm?.processingTimeMs, + totalInputTokens: usage?.llm?.tokens?.input, + totalOutputTokens: usage?.llm?.tokens?.output, + totalTokens: usage?.llm?.tokens?.total, + }, + }, + { operationId: state.operationId }, + ); + + // 9. Update Thread status via API with metadata + await aiAgentService.updateClientTaskThreadStatus({ + threadId, + completionReason: 'done', + resultContent, + metadata: { + totalCost: cost?.total, + totalMessages: subTaskMessages.length, + totalTokens: usage?.llm?.tokens?.total, + totalToolCalls, + }, + }); + + // 10. Complete operation + get().completeOperation(taskOperationId); + + return { + events: [] as GroupOrchestrationEvent[], + newState: state, + result: { + payload: { agentId, result: resultContent, success: true }, + type: 'task_completed', + }, + }; + } catch (error) { + log(`[${sessionLogId}] Error executing client async task: ${error}`); + return { + events: [] as GroupOrchestrationEvent[], + newState: state, + result: { + payload: { + agentId, + error: error instanceof Error ? error.message : 'Unknown error', + success: false, + }, + type: 'task_completed', + }, + }; + } + }, + /** * batch_exec_async_tasks Executor * Executes multiple async tasks for agents in parallel using aiAgentService with polling diff --git a/src/store/chat/slices/aiAgent/actions/groupOrchestration.ts b/src/store/chat/slices/aiAgent/actions/groupOrchestration.ts index cdf8f2f194..35c1130cf0 100644 --- a/src/store/chat/slices/aiAgent/actions/groupOrchestration.ts +++ b/src/store/chat/slices/aiAgent/actions/groupOrchestration.ts @@ -213,15 +213,24 @@ export const groupOrchestrationSlice: StateCreator< * Creates a supervisor_decided result with decision='execute_task' and starts orchestration */ triggerExecuteTask: async (params) => { - const { supervisorAgentId, agentId, task, timeout, toolMessageId, skipCallSupervisor } = params; - log( - '[triggerExecuteTask] Starting orchestration with execute_task: supervisorAgentId=%s, agentId=%s, task=%s, timeout=%s, toolMessageId=%s, skipCallSupervisor=%s', + const { supervisorAgentId, agentId, task, timeout, toolMessageId, skipCallSupervisor, + runInClient, + } = params; + log( + '[triggerExecuteTask] Starting orchestration with execute_task: supervisorAgentId=%s, agentId=%s, task=%s, timeout=%s, toolMessageId=%s, skipCallSupervisor=%s, runInClient=%s', + supervisorAgentId, + agentId, + task, + timeout, + toolMessageId, + skipCallSupervisor, + runInClient, ); const groupId = get().activeGroupId; @@ -239,7 +248,7 @@ export const groupOrchestrationSlice: StateCreator< type: 'supervisor_decided', payload: { decision: 'execute_task', - params: { agentId, task, timeout, toolMessageId }, + params: { agentId, runInClient, task, timeout, toolMessageId }, skipCallSupervisor: skipCallSupervisor ?? false, }, }, diff --git a/src/store/chat/slices/message/actions/optimisticUpdate.ts b/src/store/chat/slices/message/actions/optimisticUpdate.ts index 7a5867efae..b390a2bbac 100644 --- a/src/store/chat/slices/message/actions/optimisticUpdate.ts +++ b/src/store/chat/slices/message/actions/optimisticUpdate.ts @@ -142,7 +142,7 @@ export const messageOptimisticUpdate: StateCreator< let tempId = context?.tempMessageId; if (!tempId) { - tempId = optimisticCreateTmpMessage(message as any); + tempId = optimisticCreateTmpMessage(message as any, context); internal_toggleMessageLoading(true, tempId); } diff --git a/src/store/chat/store.ts b/src/store/chat/store.ts index 8f11b2e3c8..cc4324cca6 100644 --- a/src/store/chat/store.ts +++ b/src/store/chat/store.ts @@ -62,4 +62,8 @@ export const useChatStore = createWithEqualityFn()( shallow, ); +if (typeof window !== 'undefined') { + window.__CHAT_STORE__ = useChatStore; +} + export const getChatStoreState = () => useChatStore.getState(); diff --git a/src/store/chat/utils/messageMapKey.ts b/src/store/chat/utils/messageMapKey.ts index b6b9ac6171..425da68c71 100644 --- a/src/store/chat/utils/messageMapKey.ts +++ b/src/store/chat/utils/messageMapKey.ts @@ -43,6 +43,18 @@ export interface MessageMapKeyInput { const toMessageMapContext = (input: MessageMapKeyInput): MessageMapContext => { const { agentId, topicId, threadId, isNew, scope, groupId, subAgentId } = input; + // If threadId is present and scope is explicitly 'thread', use thread scope + // Thread scope takes priority when explicitly requested, even with groupId + // This is important for Group Chat where tasks create threads with SubAgent's agentId + if (threadId && scope === 'thread') { + return { + scope: 'thread', + scopeId: agentId, + subTopicId: threadId, + topicId, + }; + } + // If groupId is present, it's a group conversation if (groupId) { // group_agent scope: Agent's independent message stream within a group