Compare commits

...

1 Commits

Author SHA1 Message Date
arvinxx 79dbb67b22 feat: support agency frontend api and tool engine 2026-03-13 19:34:44 +08:00
21 changed files with 1454 additions and 33 deletions
@@ -145,6 +145,60 @@ describe('createEnableChecker', () => {
});
});
describe('default behavior - should disable unknown tools', () => {
it('should disable tools not listed in rules by default', () => {
const checker = createEnableChecker({
rules: {
'knowledge-base': true,
'memory': false,
'web-browsing': true,
},
});
// Tools in rules should follow their rule
expect(checker(makeParams('knowledge-base'))).toBe(true);
expect(checker(makeParams('memory'))).toBe(false);
expect(checker(makeParams('web-browsing'))).toBe(true);
// BUG: Tools NOT in rules currently default to true,
// but should default to false to prevent unintended tool activation
// This is the regression test for the "all 7 builtin tools enabled" bug
expect(checker(makeParams('lobe-tools'))).toBe(false);
expect(checker(makeParams('lobe-skills'))).toBe(false);
expect(checker(makeParams('lobe-skill-store'))).toBe(false);
});
});
describe('user-selected tools via rules', () => {
it('should only enable user-selected tools plus explicitly enabled defaults', () => {
// Simulates: user selected only "notebook", system enables knowledge-base and web-browsing
const userPlugins = ['notebook'];
const rules: Record<string, boolean> = {
// System-level rules
'knowledge-base': true,
'memory': false,
'web-browsing': true,
// User-selected plugins
...Object.fromEntries(userPlugins.map((id) => [id, true])),
};
const checker = createEnableChecker({ rules });
// User-selected tool: enabled
expect(checker(makeParams('notebook'))).toBe(true);
// System-enabled tools: follow their rules
expect(checker(makeParams('knowledge-base'))).toBe(true);
expect(checker(makeParams('web-browsing'))).toBe(true);
expect(checker(makeParams('memory'))).toBe(false);
// Default tools NOT in rules: should be disabled
expect(checker(makeParams('lobe-tools'))).toBe(false);
expect(checker(makeParams('lobe-skills'))).toBe(false);
expect(checker(makeParams('lobe-skill-store'))).toBe(false);
});
});
describe('priority order', () => {
it('should apply: explicitActivation > platformFilter > rules > default', () => {
const checker = createEnableChecker({
@@ -0,0 +1,90 @@
/**
* Application context for message storage
*/
export interface ExecAgentAppContext {
/** Group ID for group chat */
groupId?: string | null;
/** Scope identifier */
scope?: string | null;
/** Session ID */
sessionId?: string;
/** Thread ID for threaded conversations */
threadId?: string | null;
/** Topic ID */
topicId?: string | null;
}
/**
* Parameters for execAgent - execute a single Agent
* Either agentId or slug must be provided
*/
export interface ExecAgentParams {
/** The agent ID to run (either agentId or slug is required) */
agentId?: string;
/** Application context for message storage */
appContext?: ExecAgentAppContext;
/** Whether to auto-start execution after creating operation (default: true) */
autoStart?: boolean;
/** Optional existing message IDs to include in context */
existingMessageIds?: string[];
/** Already-uploaded file IDs to attach to the user message */
fileIds?: string[];
/** Create a new thread along with execution */
newThread?: {
/** Parent thread ID for nested threads */
parentThreadId?: string;
/** Source message ID that spawned this thread */
sourceMessageId: string;
/** Thread title */
title?: string;
/** Thread type */
type: string;
};
/** Create a new topic with additional options */
newTopic?: {
/** Existing message IDs to associate with the topic */
topicMessageIds?: string[];
};
/** Page selections metadata to attach to the user message */
pageSelections?: Array<{ content: string; title?: string; url?: string }>;
/** The user input/prompt */
prompt: string;
/** The agent slug to run (either agentId or slug is required) */
slug?: string;
}
/**
* Response from execAgent
*/
export interface ExecAgentResult {
/** The resolved agent ID */
agentId: string;
/** The assistant message ID created for this operation */
assistantMessageId: string;
/** Whether the operation was auto-started */
autoStarted: boolean;
/** Timestamp when operation was created */
createdAt: string;
/** The thread ID if a new thread was created */
createdThreadId?: string;
/** Error message if operation failed to start */
error?: string;
/** Whether a new topic was created */
isCreateNewTopic?: boolean;
/** Status message */
message: string;
/** Queue message ID if auto-started */
messageId?: string;
/** Operation ID for SSE connection */
operationId: string;
/** Operation status */
status: string;
/** Whether the operation was created successfully */
success: boolean;
/** ISO timestamp */
timestamp: string;
/** The topic ID (created or reused) */
topicId: string;
/** The user message ID created for this operation */
userMessageId: string;
}
+1
View File
@@ -1,5 +1,6 @@
export * from './agent';
export * from './agentCronJob';
export * from './agentExecution';
export * from './agentGroup';
export * from './aiChat';
export * from './aiProvider';
+3 -3
View File
@@ -3,7 +3,7 @@ import debug from 'debug';
import { type NextRequest } from 'next/server';
import { NextResponse } from 'next/server';
import { StreamEventManager } from '@/server/modules/AgentRuntime';
import { createStreamEventManager } from '@/server/modules/AgentRuntime';
const log = debug('api-route:agent:stream');
const timing = debug('lobe-server:agent-runtime:timing');
@@ -13,8 +13,8 @@ const timing = debug('lobe-server:agent-runtime:timing');
* Provides real-time Agent execution event stream for clients
*/
export async function GET(request: NextRequest) {
// Initialize stream event manager
const streamManager = new StreamEventManager();
// Initialize stream event manager (uses InMemory singleton in local dev, Redis in production)
const streamManager = createStreamEventManager();
const { searchParams } = new URL(request.url);
const operationId = searchParams.get('operationId');
+2
View File
@@ -91,6 +91,8 @@ export const createAgentToolsEngine = (
const agentState = getAgentStoreState();
const userPlugins = agentSelectors.currentAgentPlugins(agentState);
const userPlugins = agentSelectors.currentAgentPlugins(agentState);
return createToolsEngine({
defaultToolIds,
enableChecker: createEnableChecker({
@@ -130,6 +130,42 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
log('InMemoryStreamEventManager disconnected');
}
/**
* Subscribe to stream events (for SSE endpoint)
* Compatible with Redis StreamEventManager.subscribeStreamEvents
*/
async subscribeStreamEvents(
operationId: string,
_lastEventId: string,
onEvents: (events: StreamEvent[]) => void,
signal?: AbortSignal,
): Promise<void> {
return new Promise<void>((resolve) => {
const unsubscribe = this.subscribe(operationId, (events) => {
onEvents(events);
// Check if agent_runtime_end was received — caller will handle closing
const hasEnd = events.some((e) => e.type === 'agent_runtime_end');
if (hasEnd) {
unsubscribe();
resolve();
}
});
// Handle abort signal
if (signal) {
const onAbort = () => {
unsubscribe();
resolve();
};
if (signal.aborted) {
onAbort();
return;
}
signal.addEventListener('abort', onAbort, { once: true });
}
});
}
/**
* Subscribe to stream events (for testing)
*/
+10
View File
@@ -144,4 +144,14 @@ export interface IStreamEventManager {
operationId: string,
event: Omit<StreamEvent, 'operationId' | 'timestamp'>,
) => Promise<string>;
/**
* Subscribe to stream events (for SSE endpoint)
*/
subscribeStreamEvents: (
operationId: string,
lastEventId: string,
onEvents: (events: StreamEvent[]) => void,
signal?: AbortSignal,
) => Promise<void>;
}
+56 -2
View File
@@ -93,6 +93,33 @@ const ExecAgentSchema = z
autoStart: z.boolean().optional().default(true),
/** Optional existing message IDs to include in context */
existingMessageIds: z.array(z.string()).optional().default([]),
/** Already-uploaded file IDs to attach to the user message */
fileIds: z.array(z.string()).optional(),
/** Create a new thread along with execution */
newThread: z
.object({
parentThreadId: z.string().optional(),
sourceMessageId: z.string(),
title: z.string().optional(),
type: z.string(),
})
.optional(),
/** Create a new topic with additional options */
newTopic: z
.object({
topicMessageIds: z.array(z.string()).optional(),
})
.optional(),
/** Page selections metadata to attach to the user message */
pageSelections: z
.array(
z.object({
content: z.string(),
title: z.string().optional(),
url: z.string().optional(),
}),
)
.optional(),
/** The user input/prompt */
prompt: z.string(),
/** The agent slug to run (either agentId or slug is required) */
@@ -518,19 +545,46 @@ export const aiAgentRouter = router({
}),
execAgent: aiAgentProcedure.input(ExecAgentSchema).mutation(async ({ input, ctx }) => {
const { agentId, slug, prompt, appContext, autoStart = true, existingMessageIds = [] } = input;
const {
agentId,
slug,
prompt,
appContext,
autoStart = true,
existingMessageIds = [],
fileIds,
newThread,
newTopic,
pageSelections,
} = input;
log('execAgent: identifier=%s, prompt=%s', agentId || slug, prompt.slice(0, 50));
try {
return await ctx.aiAgentService.execAgent({
const result = await ctx.aiAgentService.execAgent({
agentId,
appContext,
autoStart,
existingMessageIds,
fileIds,
newThread,
newTopic,
pageSelections,
prompt,
slug,
});
// Return messages for UI sync regardless of success/failure,
// since execAgent persists user and assistant messages before starting the operation
const { messages, topics } = await ctx.aiChatService.getMessagesAndTopics({
agentId: result.agentId,
groupId: appContext?.groupId,
includeTopic: result.isCreateNewTopic,
threadId: result.createdThreadId ?? appContext?.threadId,
topicId: result.topicId,
});
return { ...result, messages, topics };
} catch (error: any) {
console.error('execAgent failed: %O', error);
@@ -576,6 +576,46 @@ describe('AgentRuntimeService', () => {
}),
);
});
it('should publish agent_runtime_end when step completes with status done', async () => {
// Regression: when runtime.step returns status='done' (simple LLM response, no tool calls),
// coordinator.saveStepResult should detect the state transition and publish agent_runtime_end.
// Without this, the SSE connection stays open and keeps sending heartbeats forever.
const mockStepResult = {
newState: { ...mockState, stepCount: 1, status: 'done' },
nextContext: null,
events: [{ type: 'done', reason: 'natural_end' }],
};
const mockRuntime = { step: vi.fn().mockResolvedValue(mockStepResult) };
vi.spyOn(service as any, 'createAgentRuntime').mockReturnValue({ runtime: mockRuntime });
await service.executeStep(mockParams);
// Verify agent_runtime_end was published via coordinator.saveStepResult
// The coordinator should detect status transition to 'done' and call publishAgentRuntimeEnd
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
'test-operation-1',
expect.any(Number), // stepCount
expect.objectContaining({ status: 'done' }),
);
});
it('should NOT publish agent_runtime_end when step completes with status running', async () => {
const mockStepResult = {
newState: { ...mockState, stepCount: 2, status: 'running' },
nextContext: mockParams.context,
events: [],
};
const mockRuntime = { step: vi.fn().mockResolvedValue(mockStepResult) };
vi.spyOn(service as any, 'createAgentRuntime').mockReturnValue({ runtime: mockRuntime });
await service.executeStep(mockParams);
// agent_runtime_end should NOT be called for running state
expect(mockStreamManager.publishAgentRuntimeEnd).not.toHaveBeenCalled();
});
});
describe('executeStep - tool result extraction', () => {
@@ -0,0 +1,408 @@
import type * as ModelBankModule from 'model-bank';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { AiAgentService } from '../index';
const {
mockMessageCreate,
mockCreateOperation,
mockTopicCreate,
mockThreadCreate,
mockUploadFromUrl,
} = vi.hoisted(() => ({
mockCreateOperation: vi.fn(),
mockMessageCreate: vi.fn(),
mockThreadCreate: vi.fn(),
mockTopicCreate: vi.fn(),
mockUploadFromUrl: vi.fn(),
}));
vi.mock('@/libs/trusted-client', () => ({
generateTrustedClientToken: vi.fn().mockReturnValue(undefined),
getTrustedClientTokenForSession: vi.fn().mockResolvedValue(undefined),
isTrustedClientEnabled: vi.fn().mockReturnValue(false),
}));
vi.mock('@/database/models/message', () => ({
MessageModel: vi.fn().mockImplementation(() => ({
create: mockMessageCreate,
query: vi.fn().mockResolvedValue([]),
update: vi.fn().mockResolvedValue({}),
})),
}));
vi.mock('@/database/models/agent', () => ({
AgentModel: vi.fn().mockImplementation(() => ({
getAgentConfig: vi.fn().mockResolvedValue({
chatConfig: {},
files: [],
id: 'agent-1',
knowledgeBases: [],
model: 'gpt-4',
plugins: [],
provider: 'openai',
systemRole: 'You are a helpful assistant',
}),
})),
}));
vi.mock('@/server/services/agent', () => ({
AgentService: vi.fn().mockImplementation(() => ({
getAgentConfig: vi.fn().mockResolvedValue({
chatConfig: {},
files: [],
id: 'agent-1',
knowledgeBases: [],
model: 'gpt-4',
plugins: [],
provider: 'openai',
systemRole: 'You are a helpful assistant',
}),
})),
}));
vi.mock('@/database/models/plugin', () => ({
PluginModel: vi.fn().mockImplementation(() => ({
query: vi.fn().mockResolvedValue([]),
})),
}));
vi.mock('@/database/models/topic', () => ({
TopicModel: vi.fn().mockImplementation(() => ({
create: mockTopicCreate,
})),
}));
vi.mock('@/database/models/thread', () => ({
ThreadModel: vi.fn().mockImplementation(() => ({
create: mockThreadCreate,
findById: vi.fn(),
update: vi.fn(),
})),
}));
vi.mock('@/server/services/agentRuntime', () => ({
AgentRuntimeService: vi.fn().mockImplementation(() => ({
createOperation: mockCreateOperation,
})),
}));
vi.mock('@/server/services/market', () => ({
MarketService: vi.fn().mockImplementation(() => ({
getLobehubSkillManifests: vi.fn().mockResolvedValue([]),
})),
}));
vi.mock('@/server/services/klavis', () => ({
KlavisService: vi.fn().mockImplementation(() => ({
getKlavisManifests: vi.fn().mockResolvedValue([]),
})),
}));
vi.mock('@/server/services/file', () => ({
FileService: vi.fn().mockImplementation(() => ({
uploadFromUrl: mockUploadFromUrl,
})),
}));
vi.mock('@/server/modules/Mecha', () => ({
createServerAgentToolsEngine: vi.fn().mockReturnValue({
generateToolsDetailed: vi.fn().mockReturnValue({ enabledToolIds: [], tools: [] }),
getEnabledPluginManifests: vi.fn().mockReturnValue(new Map()),
}),
serverMessagesEngine: vi.fn().mockResolvedValue([{ content: 'test', role: 'user' }]),
}));
vi.mock('@/server/services/toolExecution/deviceProxy', () => ({
deviceProxy: {
isConfigured: false,
queryDeviceList: vi.fn().mockResolvedValue([]),
},
}));
vi.mock('model-bank', async (importOriginal) => {
const actual = await importOriginal<typeof ModelBankModule>();
return {
...actual,
LOBE_DEFAULT_MODEL_LIST: [
{
abilities: { functionCall: true, video: false, vision: true },
id: 'gpt-4',
providerId: 'openai',
},
],
};
});
describe('AiAgentService.execAgent - regression tests for sendMessage support', () => {
let service: AiAgentService;
const mockDb = {} as any;
const userId = 'test-user-id';
beforeEach(() => {
vi.clearAllMocks();
mockMessageCreate.mockResolvedValue({ id: 'msg-1' });
mockTopicCreate.mockResolvedValue({ id: 'topic-1' });
mockCreateOperation.mockResolvedValue({
autoStarted: true,
messageId: 'queue-msg-1',
operationId: 'op-123',
success: true,
});
service = new AiAgentService(mockDb, userId);
});
afterEach(() => {
vi.clearAllMocks();
});
// ========== Gap 1: fileIds passthrough ==========
describe('fileIds - pass already-uploaded file IDs', () => {
it('should pass fileIds directly to user message without calling uploadFromUrl', async () => {
const result = await service.execAgent({
agentId: 'agent-1',
fileIds: ['file-already-1', 'file-already-2'],
prompt: 'Analyze these files',
});
// Should NOT call uploadFromUrl since files are already uploaded
expect(mockUploadFromUrl).not.toHaveBeenCalled();
// Verify user message was created with the provided fileIds
const userMessageCall = mockMessageCreate.mock.calls.find((call) => call[0].role === 'user');
expect(userMessageCall![0].files).toEqual(['file-already-1', 'file-already-2']);
expect(result.success).toBe(true);
});
it('should merge fileIds with newly uploaded file IDs', async () => {
mockUploadFromUrl.mockResolvedValue({
fileId: 'file-new-1',
key: 'files/test-user-id/xxx/photo.png',
url: 'https://app.lobehub.com/f/file-new-1',
});
await service.execAgent({
agentId: 'agent-1',
fileIds: ['file-existing-1'],
files: [
{
mimeType: 'image/png',
name: 'photo.png',
size: 12345,
url: 'https://cdn.example.com/photo.png',
},
],
prompt: 'Look at these',
});
// Verify both existing and new file IDs are included
const userMessageCall = mockMessageCreate.mock.calls.find((call) => call[0].role === 'user');
expect(userMessageCall![0].files).toContain('file-existing-1');
expect(userMessageCall![0].files).toContain('file-new-1');
expect(userMessageCall![0].files).toHaveLength(2);
});
});
// ========== Gap 2: newTopic.topicMessageIds ==========
describe('newTopic.topicMessageIds - associate existing messages with new topic', () => {
it('should pass topicMessageIds to topicModel.create when creating new topic', async () => {
await service.execAgent({
agentId: 'agent-1',
newTopic: {
topicMessageIds: ['existing-msg-1', 'existing-msg-2'],
},
prompt: 'Continue this conversation',
});
// Verify topicModel.create was called with messages
expect(mockTopicCreate).toHaveBeenCalledWith(
expect.objectContaining({
messages: ['existing-msg-1', 'existing-msg-2'],
}),
);
});
it('should create topic without topicMessageIds when not provided', async () => {
await service.execAgent({
agentId: 'agent-1',
prompt: 'Hello',
});
// topicModel.create should be called (new topic created since no topicId given)
expect(mockTopicCreate).toHaveBeenCalledWith(
expect.objectContaining({
messages: undefined,
}),
);
});
});
// ========== Gap 3: newThread creation ==========
describe('newThread - create thread during execution', () => {
it('should create a thread and use its ID for messages', async () => {
mockThreadCreate.mockResolvedValue({ id: 'thread-new-1' });
const result = await service.execAgent({
agentId: 'agent-1',
appContext: { topicId: 'topic-existing' },
newThread: {
sourceMessageId: 'source-msg-1',
title: 'Sub-task thread',
type: 'agent_task',
},
prompt: 'Execute this sub-task',
});
// Verify thread was created with correct params
expect(mockThreadCreate).toHaveBeenCalledWith(
expect.objectContaining({
sourceMessageId: 'source-msg-1',
title: 'Sub-task thread',
type: 'agent_task',
}),
);
// Verify both user and assistant messages use the new thread ID
const userMsgCall = mockMessageCreate.mock.calls.find((call) => call[0].role === 'user');
const asstMsgCall = mockMessageCreate.mock.calls.find((call) => call[0].role === 'assistant');
expect(userMsgCall![0].threadId).toBe('thread-new-1');
expect(asstMsgCall![0].threadId).toBe('thread-new-1');
// Verify createdThreadId is in the result
expect(result.createdThreadId).toBe('thread-new-1');
});
it('should use appContext.threadId when newThread is not provided', async () => {
const result = await service.execAgent({
agentId: 'agent-1',
appContext: { threadId: 'existing-thread', topicId: 'topic-existing' },
prompt: 'Reply in this thread',
});
// Should NOT create a new thread
expect(mockThreadCreate).not.toHaveBeenCalled();
// Messages should use appContext.threadId
const userMsgCall = mockMessageCreate.mock.calls.find((call) => call[0].role === 'user');
expect(userMsgCall![0].threadId).toBe('existing-thread');
// No createdThreadId
expect(result.createdThreadId).toBeUndefined();
});
it('should pass newThread.parentThreadId for nested threads', async () => {
mockThreadCreate.mockResolvedValue({ id: 'thread-nested-1' });
await service.execAgent({
agentId: 'agent-1',
newThread: {
parentThreadId: 'parent-thread-1',
sourceMessageId: 'source-msg-1',
type: 'agent_task',
},
prompt: 'Nested task',
});
expect(mockThreadCreate).toHaveBeenCalledWith(
expect.objectContaining({
parentThreadId: 'parent-thread-1',
sourceMessageId: 'source-msg-1',
}),
);
});
});
// ========== Gap 4: isCreateNewTopic and createdThreadId in response ==========
describe('response includes isCreateNewTopic and createdThreadId', () => {
it('should return isCreateNewTopic=true when a new topic was created', async () => {
// No topicId in appContext → creates new topic
const result = await service.execAgent({
agentId: 'agent-1',
prompt: 'Start new conversation',
});
expect(result.isCreateNewTopic).toBe(true);
expect(result.topicId).toBe('topic-1');
});
it('should return isCreateNewTopic=false when using existing topicId', async () => {
const result = await service.execAgent({
agentId: 'agent-1',
appContext: { topicId: 'topic-existing' },
prompt: 'Continue conversation',
});
expect(result.isCreateNewTopic).toBe(false);
expect(result.topicId).toBe('topic-existing');
});
it('should include createdThreadId when newThread is provided', async () => {
mockThreadCreate.mockResolvedValue({ id: 'thread-created' });
const result = await service.execAgent({
agentId: 'agent-1',
newThread: {
sourceMessageId: 'src-msg',
type: 'agent_task',
},
prompt: 'Task with thread',
});
expect(result.createdThreadId).toBe('thread-created');
});
it('should include isCreateNewTopic and createdThreadId even on operation error', async () => {
mockThreadCreate.mockResolvedValue({ id: 'thread-err' });
mockCreateOperation.mockRejectedValue(new Error('QStash unavailable'));
const result = await service.execAgent({
agentId: 'agent-1',
newThread: {
sourceMessageId: 'src-msg',
type: 'agent_task',
},
prompt: 'This will fail',
});
expect(result.success).toBe(false);
expect(result.isCreateNewTopic).toBe(true);
expect(result.createdThreadId).toBe('thread-err');
expect(result.error).toBe('QStash unavailable');
});
});
// ========== pageSelections metadata ==========
describe('pageSelections - attach metadata to user message', () => {
it('should include pageSelections in user message metadata', async () => {
await service.execAgent({
agentId: 'agent-1',
pageSelections: [
{ content: 'Selected text from page', title: 'Test Page', url: 'https://example.com' },
],
prompt: 'Summarize this selection',
});
const userMessageCall = mockMessageCreate.mock.calls.find((call) => call[0].role === 'user');
expect(userMessageCall![0].metadata).toEqual({
pageSelections: [
{
content: 'Selected text from page',
title: 'Test Page',
url: 'https://example.com',
},
],
});
});
it('should not set metadata when pageSelections is empty', async () => {
await service.execAgent({
agentId: 'agent-1',
prompt: 'No selections',
});
const userMessageCall = mockMessageCreate.mock.calls.find((call) => call[0].role === 'user');
expect(userMessageCall![0].metadata).toBeUndefined();
});
});
});
+59 -12
View File
@@ -93,6 +93,8 @@ interface InternalExecAgentParams extends ExecAgentParams {
discordContext?: any;
/** Eval context for injecting environment prompts into system message */
evalContext?: EvalContext;
/** Already-uploaded file IDs to attach to the user message (from frontend) */
fileIds?: string[];
/** External file URLs to download, upload to S3, and attach to the user message */
files?: Array<{
mimeType?: string;
@@ -102,6 +104,20 @@ interface InternalExecAgentParams extends ExecAgentParams {
}>;
/** Maximum steps for the agent operation */
maxSteps?: number;
/** Create a new thread along with execution */
newThread?: {
parentThreadId?: string;
sourceMessageId: string;
title?: string;
type: string;
};
/** Create a new topic with additional options (topicMessageIds) */
newTopic?: {
topicMessageIds?: string[];
};
/** Page selections metadata to attach to the user message */
pageSelections?: Array<{ content: string; title?: string; url?: string }>;
/** Step lifecycle callbacks for operation tracking (server-side only) */
stepCallbacks?: StepLifecycleCallbacks;
/**
@@ -196,8 +212,12 @@ export class AiAgentService {
botContext,
discordContext,
existingMessageIds = [],
fileIds,
files,
instructions,
newThread,
newTopic,
pageSelections,
stepCallbacks,
stream,
title,
@@ -271,6 +291,7 @@ export class AiAgentService {
// 3. Handle topic creation: if no topicId provided, create a new topic; otherwise reuse existing
let topicId = appContext?.topicId;
let isCreateNewTopic = false;
if (!topicId) {
// Prepare metadata with cronJobId and botContext if provided
const metadata =
@@ -278,14 +299,16 @@ export class AiAgentService {
? { bot: botContext, cronJobId: cronJobId || undefined }
: undefined;
const newTopic = await this.topicModel.create({
const createdTopic = await this.topicModel.create({
agentId: resolvedAgentId,
messages: newTopic?.topicMessageIds,
metadata,
title:
title !== undefined ? title : prompt.slice(0, 50) + (prompt.length > 50 ? '...' : ''),
trigger,
});
topicId = newTopic.id;
topicId = createdTopic.id;
isCreateNewTopic = true;
log(
'execAgent: created new topic %s with trigger %s, cronJobId %s',
topicId,
@@ -617,13 +640,13 @@ export class AiAgentService {
});
}
// 12. Upload external files to S3 and collect file IDs
let fileIds: string[] | undefined;
// 12. Handle file IDs: either use already-uploaded fileIds or upload external files
let resolvedFileIds: string[] | undefined = fileIds;
let imageList: Array<{ alt: string; id: string; url: string }> | undefined;
if (files && files.length > 0) {
const fileService = new FileService(this.db, this.userId);
fileIds = [];
resolvedFileIds = resolvedFileIds || [];
imageList = [];
for (const file of files) {
@@ -632,7 +655,7 @@ export class AiAgentService {
try {
const result = await fileService.uploadFromUrl(file.url, pathname);
fileIds.push(result.fileId);
resolvedFileIds.push(result.fileId);
// Build imageList for vision-capable models
const mimeType = file.mimeType || '';
@@ -644,20 +667,40 @@ export class AiAgentService {
}
}
if (fileIds.length > 0) {
log('execAgent: uploaded %d files to S3', fileIds.length);
if (resolvedFileIds.length > 0) {
log('execAgent: uploaded %d files to S3', resolvedFileIds.length);
}
if (imageList.length === 0) imageList = undefined;
}
// 12.5. Create thread if newThread is provided
let createdThreadId: string | undefined;
let threadId = appContext?.threadId ?? undefined;
if (newThread) {
const threadItem = await this.threadModel.create({
parentThreadId: newThread.parentThreadId,
sourceMessageId: newThread.sourceMessageId,
title: newThread.title,
topicId,
type: newThread.type as any,
});
if (threadItem) {
threadId = threadItem.id;
createdThreadId = threadItem.id;
log('execAgent: created new thread %s', threadId);
}
}
// 13. Create user message in database
// Include threadId if provided (for SubAgent task execution in isolated Thread)
const userMessageMetadata = pageSelections?.length ? { pageSelections } : undefined;
const userMessageRecord = await this.messageModel.create({
agentId: resolvedAgentId,
content: prompt,
files: fileIds,
files: resolvedFileIds,
metadata: userMessageMetadata,
role: 'user',
threadId: appContext?.threadId ?? undefined,
threadId,
topicId,
});
log('execAgent: created user message %s', userMessageRecord.id);
@@ -671,7 +714,7 @@ export class AiAgentService {
parentId: userMessageRecord.id,
provider,
role: 'assistant',
threadId: appContext?.threadId ?? undefined,
threadId,
topicId,
});
log('execAgent: created assistant message %s', assistantMessageRecord.id);
@@ -732,7 +775,7 @@ export class AiAgentService {
appContext: {
agentId: resolvedAgentId,
groupId: appContext?.groupId,
threadId: appContext?.threadId,
threadId,
topicId,
},
autoStart,
@@ -766,6 +809,8 @@ export class AiAgentService {
assistantMessageId: assistantMessageRecord.id,
autoStarted: result.autoStarted,
createdAt: new Date().toISOString(),
createdThreadId,
isCreateNewTopic,
message: 'Agent operation created successfully',
messageId: result.messageId,
operationId,
@@ -801,7 +846,9 @@ export class AiAgentService {
assistantMessageId: assistantMessageRecord.id,
autoStarted: false,
createdAt: new Date().toISOString(),
createdThreadId,
error: errorMessage,
isCreateNewTopic,
message: 'Agent operation failed to start',
operationId,
status: 'error',
@@ -9,6 +9,7 @@
import { type ToolExecutionContext } from '../types';
import { calculatorRuntime } from './calculator';
import { cloudSandboxRuntime } from './cloudSandbox';
import { knowledgeBaseRuntime } from './knowledgeBase';
import { localSystemRuntime } from './localSystem';
import { memoryRuntime } from './memory';
import { notebookRuntime } from './notebook';
@@ -38,6 +39,7 @@ registerRuntimes([
webBrowsingRuntime,
cloudSandboxRuntime,
calculatorRuntime,
knowledgeBaseRuntime,
notebookRuntime,
skillStoreRuntime,
skillsRuntime,
@@ -0,0 +1,47 @@
import { KnowledgeBaseIdentifier } from '@lobechat/builtin-tool-knowledge-base';
import {
KnowledgeBaseExecutionRuntime,
type KnowledgeBaseService,
} from '@lobechat/builtin-tool-knowledge-base/executionRuntime';
import { KnowledgeBaseModel } from '@/database/models/knowledgeBase';
import { type ServerRuntimeRegistration } from './types';
/**
* Knowledge Base Server Runtime
* Per-request runtime (needs serverDB, userId)
*/
export const knowledgeBaseRuntime: ServerRuntimeRegistration = {
factory: (context) => {
if (!context.userId || !context.serverDB) {
throw new Error('userId and serverDB are required for KnowledgeBase execution');
}
const knowledgeBaseModel = new KnowledgeBaseModel(context.serverDB, context.userId);
const knowledgeBaseService: KnowledgeBaseService = {
createKnowledgeBase: async (params) => {
const result = await knowledgeBaseModel.create(params);
return result.id;
},
getKnowledgeBaseList: async () => {
return knowledgeBaseModel.query();
},
updateKnowledgeBase: async (id, value) => {
await knowledgeBaseModel.update(id, value);
},
};
// Note: ragService is not available on server side for this runtime,
// search/read operations are handled by the client-side executor.
// Server runtime only handles CRUD operations.
const noopRagService = {
getFileContents: async () => [],
semanticSearchForChat: async () => ({ chunks: [], fileResults: [] }),
};
return new KnowledgeBaseExecutionRuntime(noopRagService, knowledgeBaseService);
},
identifier: KnowledgeBaseIdentifier,
};
@@ -69,6 +69,8 @@ const getAgentModeById =
const getAgentEnableModeById =
(agentId: string) =>
(s: AgentStoreState): boolean => {
// TODO: remove this override after testing
return true;
const mode = getAgentModeById(agentId)(s);
return mode !== undefined;
};
@@ -7,6 +7,13 @@ import { useChatStore } from '@/store/chat/store';
// Keep zustand mock as it's needed globally
vi.mock('zustand/traditional');
// Mock packages that have cloud-specific import chains to prevent resolution errors
vi.mock('@lobechat/business-model-runtime', () => ({}));
vi.mock('@lobechat/model-runtime', () => ({
ModelProviderList: [],
ProviderRuntimeMap: {},
}));
// Test Constants
const TEST_IDS = {
ASSISTANT_MESSAGE_ID: 'test-assistant-id',
@@ -185,7 +192,7 @@ describe('runAgent actions', () => {
expect(context.assistantId).toBe(customAssistantId);
});
it('should NOT update assistantId when it is already set', async () => {
it('should NOT update assistantId when stream_start has the same assistantId', async () => {
const { result } = renderHook(() => useChatStore());
const originalAssistantId = 'original-assistant-id';
@@ -196,7 +203,7 @@ describe('runAgent actions', () => {
const event = createStreamStartEvent({
data: {
assistantMessage: {
id: 'different-assistant-id',
id: originalAssistantId, // Same ID
role: 'assistant',
content: '',
createdAt: Date.now(),
@@ -215,6 +222,274 @@ describe('runAgent actions', () => {
// assistantId should remain unchanged
expect(context.assistantId).toBe(originalAssistantId);
// Should NOT call refreshMessages or dispatchMessage
expect(result.current.refreshMessages).not.toHaveBeenCalled();
expect(result.current.internal_dispatchMessage).not.toHaveBeenCalled();
});
});
describe('multi-step tool execution (stream_start with new assistantId)', () => {
it('should switch assistantId when a new stream_start arrives with different ID', async () => {
const { result } = renderHook(() => useChatStore());
const step0AssistantId = 'step-0-assistant-id';
const step1AssistantId = 'step-1-assistant-id';
const context = createStreamingContext({
assistantId: step0AssistantId,
content: 'Step 0 content',
reasoning: 'Step 0 reasoning',
toolsCalling: [
{
id: 'tool-1',
identifier: 'search',
type: 'default',
apiName: 'search',
arguments: '{}',
},
],
});
const event = createStreamStartEvent({
data: {
assistantMessage: {
id: step1AssistantId,
role: 'assistant',
content: '',
createdAt: Date.now(),
updatedAt: Date.now(),
},
},
stepIndex: 2,
});
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
event,
context,
);
});
// assistantId should switch to the new message
expect(context.assistantId).toBe(step1AssistantId);
// Accumulated content should be reset
expect(context.content).toBe('');
expect(context.reasoning).toBe('');
expect(context.toolsCalling).toBeUndefined();
// Should refresh messages to load tool result messages
expect(result.current.refreshMessages).toHaveBeenCalled();
// Should start loading on the new assistant message
expect(result.current.internal_toggleMessageLoading).toHaveBeenCalledWith(
true,
step1AssistantId,
);
});
it('should stream text to the correct (new) assistant message after switching', async () => {
const { result } = renderHook(() => useChatStore());
const step0AssistantId = 'step-0-assistant-id';
const step1AssistantId = 'step-1-assistant-id';
const context = createStreamingContext({
assistantId: step0AssistantId,
content: 'Step 0 content',
});
// Step 1 stream_start with new assistant ID
const streamStartEvent = createStreamStartEvent({
data: {
assistantMessage: {
id: step1AssistantId,
role: 'assistant',
content: '',
createdAt: Date.now(),
updatedAt: Date.now(),
},
},
stepIndex: 2,
});
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
streamStartEvent,
context,
);
});
// Now send a text chunk — it should go to the NEW assistant message
const textChunkEvent: StreamEvent = {
type: 'stream_chunk',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
stepIndex: 2,
data: {
chunkType: 'text',
content: 'Step 1 response',
},
};
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
textChunkEvent,
context,
);
});
// Content should only contain step 1's text (not step 0's)
expect(context.content).toBe('Step 1 response');
// Update should target the NEW assistant message
expect(result.current.internal_dispatchMessage).toHaveBeenCalledWith({
id: step1AssistantId,
type: 'updateMessage',
value: { content: 'Step 1 response' },
});
// Should NOT have updated the old assistant message with step 1 content
expect(result.current.internal_dispatchMessage).not.toHaveBeenCalledWith(
expect.objectContaining({
id: step0AssistantId,
type: 'updateMessage',
value: expect.objectContaining({ content: expect.stringContaining('Step 1') }),
}),
);
});
it('should handle full multi-step lifecycle: stream → tools → new stream', async () => {
const { result } = renderHook(() => useChatStore());
const step0AssistantId = 'step-0-assistant-id';
const step1AssistantId = 'step-1-assistant-id';
const context = createStreamingContext({ assistantId: '' });
// === Step 0: First stream_start ===
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
createStreamStartEvent({
data: {
assistantMessage: {
id: step0AssistantId,
role: 'assistant',
content: '',
createdAt: Date.now(),
updatedAt: Date.now(),
},
},
stepIndex: 0,
}),
context,
);
});
expect(context.assistantId).toBe(step0AssistantId);
// === Step 0: Text chunk ===
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
{
type: 'stream_chunk',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
stepIndex: 0,
data: { chunkType: 'text', content: 'Let me search' },
},
context,
);
});
expect(context.content).toBe('Let me search');
// === Step 0: Tools calling chunk ===
const toolPayload = [
{
id: 'call-1',
identifier: 'web-search',
type: 'default' as const,
apiName: 'search',
arguments: '{"q":"test"}',
},
];
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
{
type: 'stream_chunk',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
stepIndex: 0,
data: { chunkType: 'tools_calling', toolsCalling: toolPayload },
},
context,
);
});
expect(context.toolsCalling).toEqual(toolPayload);
// === Step 0: stream_end ===
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
{
type: 'stream_end',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
stepIndex: 0,
data: { finalContent: 'Let me search', toolCalls: toolPayload },
},
context,
);
});
// === Step 1: New stream_start with different assistant ID ===
vi.mocked(result.current.internal_dispatchMessage).mockClear();
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
createStreamStartEvent({
data: {
assistantMessage: {
id: step1AssistantId,
role: 'assistant',
content: '',
createdAt: Date.now(),
updatedAt: Date.now(),
},
},
stepIndex: 2,
}),
context,
);
});
// Context should be reset for step 1
expect(context.assistantId).toBe(step1AssistantId);
expect(context.content).toBe('');
expect(context.reasoning).toBe('');
expect(context.toolsCalling).toBeUndefined();
// === Step 1: Text chunk ===
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
{
type: 'stream_chunk',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
stepIndex: 2,
data: { chunkType: 'text', content: 'Based on the search results' },
},
context,
);
});
// Step 1 content should only have step 1's text
expect(context.content).toBe('Based on the search results');
// Should update the NEW assistant message
expect(result.current.internal_dispatchMessage).toHaveBeenCalledWith({
id: step1AssistantId,
type: 'updateMessage',
value: { content: 'Based on the search results' },
});
});
});
@@ -291,6 +566,44 @@ describe('runAgent actions', () => {
});
});
describe('agent_runtime_end event', () => {
it('should call refreshMessages to sync final state', async () => {
const { result } = renderHook(() => useChatStore());
const context = createStreamingContext({
assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID,
});
const event: StreamEvent = {
type: 'agent_runtime_end',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
stepIndex: 2,
data: {
reason: 'completed',
reasonDetail: 'Agent completed successfully',
finalState: { status: 'done' },
},
};
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
event,
context,
);
});
// Should refresh messages to ensure all tool results are synced
expect(result.current.refreshMessages).toHaveBeenCalled();
// Should stop loading
expect(result.current.internal_toggleMessageLoading).toHaveBeenCalledWith(
false,
TEST_IDS.ASSISTANT_MESSAGE_ID,
);
});
});
describe('operation validation', () => {
it('should ignore events when operation is not found', async () => {
const { result } = renderHook(() => useChatStore());
@@ -129,6 +129,9 @@ export class AgentActionImpl {
});
}
// Final refresh to ensure all messages (tool results, etc.) are in sync
await this.#get().refreshMessages();
// Stop loading state
log(`Stopping loading for completed agent runtime: ${assistantId}`);
this.#get().internal_toggleMessageLoading(false, assistantId);
@@ -136,14 +139,34 @@ export class AgentActionImpl {
}
case 'stream_start': {
// If assistantId is already set (Group Chat flow), skip message creation/deletion
// In Group Chat, messages are already synced via replaceMessages from backend response
const newAssistantId = event.data?.assistantMessage?.id;
if (context.assistantId) {
log(`Stream started for ${context.assistantId} (message already synced from backend)`);
// Multi-step: a new stream started for a different assistant message
// This happens when tools are called and the LLM is invoked again
if (newAssistantId && newAssistantId !== context.assistantId) {
log(
`New stream step started, switching assistant from ${context.assistantId} to ${newAssistantId}`,
);
// Reset accumulated content for the new step
context.content = '';
context.reasoning = '';
context.toolsCalling = undefined;
context.assistantId = newAssistantId;
// Refresh messages to show tool result messages and the new assistant message
await this.#get().refreshMessages();
// Start loading on the new assistant message
this.#get().internal_toggleMessageLoading(true, newAssistantId);
} else {
log(`Stream started for ${context.assistantId} (message already synced from backend)`);
}
break;
}
// Original logic for normal Agent flow
// Original logic for normal Agent flow (first stream_start, no assistantId yet)
log(`Stream started for ${assistantId}:`, event.data);
internal_dispatchMessage({
id: context.tmpAssistantId,
@@ -60,13 +60,16 @@ export class ConversationControlActionImpl {
}
});
// Restore editor state if it's the active session
// Restore editor content if it's the active session
if (contextKey === messageMapKey({ agentId: activeAgentId, topicId: activeTopicId })) {
// Find the latest sendMessage operation with editor state
// Find the latest sendMessage operation with saved message content
for (const opId of [...operationIds].reverse()) {
const op = this.#get().operations[opId];
if (op && op.type === 'sendMessage' && op.metadata.inputEditorTempState) {
this.#get().mainInputEditor?.setJSONState(op.metadata.inputEditorTempState);
if (op && op.type === 'sendMessage' && op.metadata.inputMessageContent) {
this.#get().mainInputEditor?.setDocument(
'markdown',
op.metadata.inputMessageContent as string,
);
break;
}
}
@@ -11,12 +11,16 @@ import {
} from '@lobechat/types';
import { nanoid } from '@lobechat/utils';
import { TRPCClientError } from '@trpc/client';
import debug from 'debug';
import { t } from 'i18next';
import { markUserValidAction } from '@/business/client/markUserValidAction';
import { lambdaClient } from '@/libs/trpc/client';
import { agentRuntimeClient, type StreamEvent } from '@/services/agentRuntime';
import { aiAgentService } from '@/services/aiAgent';
import { aiChatService } from '@/services/aiChat';
import { getAgentStoreState } from '@/store/agent';
import { agentSelectors } from '@/store/agent/selectors';
import { agentByIdSelectors, agentSelectors } from '@/store/agent/selectors';
import { agentGroupByIdSelectors, getChatGroupStoreState } from '@/store/agentGroup';
import { type ChatStore } from '@/store/chat/store';
import { getFileStoreState } from '@/store/file/store';
@@ -28,6 +32,8 @@ import { useUserMemoryStore } from '@/store/userMemory';
import { dbMessageSelectors, displayMessageSelectors, topicSelectors } from '../../../selectors';
import { messageMapKey } from '../../../utils/messageMapKey';
const log = debug('store:chat:conversationLifecycle');
/**
* Extended params for sendMessage with context
*/
@@ -208,13 +214,33 @@ export class ConversationLifecycleActionImpl {
this.#get().associateMessageWithOperation(tempId, operationId);
this.#get().associateMessageWithOperation(tempAssistantId, operationId);
// Store editor state in operation metadata for cancel restoration
const jsonState = mainInputEditor?.getJSONState();
// Store message content in operation metadata for cancel restoration
// Note: editor JSON state is already cleared at this point (clearContent in handleSend),
// so we store the raw message text instead
this.#get().updateOperationMetadata(operationId, {
inputEditorTempState: jsonState,
inputMessageContent: message,
inputSendErrorMsg: undefined,
});
// ===== Agent Mode Branch =====
// When agent mode is enabled, use server-side execution (execAgent TRPC + SSE)
// instead of the client-side agent loop (sendMessageInServer + internal_execAgentRuntime)
const isAgentMode = agentByIdSelectors.getAgentEnableModeById(agentId)(getAgentStoreState());
if (isAgentMode) {
return this.#execAgentModeSendMessage({
abortController,
context: operationContext,
fileIds: fileIdList,
message,
messages,
newThread,
operationId,
pageSelections,
tempAssistantId,
tempId,
});
}
let data: SendMessageServerResponse | undefined;
try {
const { model, provider } = agentSelectors.getAgentConfigById(agentId)(getAgentStoreState());
@@ -333,7 +359,7 @@ export class ConversationLifecycleActionImpl {
// Clear editor temp state after message created
if (data) {
this.#get().updateOperationMetadata(operationId, { inputEditorTempState: null });
this.#get().updateOperationMetadata(operationId, { inputMessageContent: null });
}
if (ENABLE_BUSINESS_FEATURES) {
@@ -416,6 +442,256 @@ export class ConversationLifecycleActionImpl {
};
};
/**
* Server-side agent execution flow (agentMode=true).
* Follows the same pattern as sendGroupMessage:
* 1. Call execAgent TRPC (creates messages + starts background task)
* 2. Sync messages from response
* 3. Connect SSE stream for real-time updates
*/
#execAgentModeSendMessage = async ({
abortController,
context,
fileIds,
message,
messages,
newThread,
operationId,
pageSelections,
tempAssistantId,
tempId,
}: {
abortController: AbortController;
context: ConversationContext;
fileIds?: string[];
message: string;
messages: { id: string }[];
newThread?: { sourceMessageId: string; type: string };
operationId: string;
pageSelections?: SendMessageParams['pageSelections'];
tempAssistantId: string;
tempId: string;
}): Promise<SendMessageResult | undefined> => {
const { agentId, topicId } = context;
try {
// 1. Call execAgent TRPC - creates messages + starts background agent task
const result = await lambdaClient.aiAgent.execAgent.mutate(
{
agentId,
appContext: {
groupId: context.groupId ?? undefined,
threadId: context.threadId ?? undefined,
topicId: topicId ?? undefined,
},
fileIds,
newThread: newThread
? { sourceMessageId: newThread.sourceMessageId, type: newThread.type }
: undefined,
newTopic: !topicId ? { topicMessageIds: messages.map((m) => m.id) } : undefined,
pageSelections,
prompt: message,
},
{ signal: abortController.signal },
);
log(
'execAgent result: operationId=%s, topicId=%s, success=%s',
result.operationId,
result.topicId,
result.success,
);
// 2. Update topics if new topic was created
if (result.topics) {
const pageSize = systemStatusSelectors.topicPageSize(useGlobalStore.getState());
this.#get().internal_updateTopics(agentId, {
groupId: context.groupId,
items: result.topics.items as any,
pageSize,
total: result.topics.total,
});
}
// 3. Switch to new topic if created
if (result.isCreateNewTopic && result.topicId) {
await this.#get().switchTopic(result.topicId, {
clearNewKey: true,
skipRefreshMessage: true,
});
}
// 4. Handle thread creation
if (result.createdThreadId) {
this.#get().updateOperationMetadata(operationId, {
createdThreadId: result.createdThreadId,
});
this.#get().openThreadInPortal(result.createdThreadId, context.sourceMessageId);
this.#get().refreshThreads();
}
// 5. Create exec context with updated topicId from server
const execContext = { ...context, topicId: result.topicId || topicId };
// 6. Replace temp messages with server messages
if (result.messages) {
this.#get().replaceMessages(result.messages, {
action: 'sendMessage/agentMode/syncMessages',
context: execContext,
});
this.#get().internal_dispatchMessage(
{ ids: [tempId, tempAssistantId], type: 'deleteMessages' },
{ operationId },
);
}
// 7. Check if operation failed to start (e.g., QStash unavailable)
if (result.success === false) {
log('Agent operation failed to start: %s', result.error);
this.#get().failOperation(operationId, {
message: result.error || 'Agent operation failed to start',
type: 'AgentStartupError',
});
this.#get().internal_toggleMessageLoading(false, result.assistantMessageId);
return {
assistantMessageId: result.assistantMessageId,
createdThreadId: result.createdThreadId,
userMessageId: result.userMessageId,
};
}
// Clear editor temp state after successful creation
this.#get().updateOperationMetadata(operationId, { inputMessageContent: null });
if (ENABLE_BUSINESS_FEATURES) {
markUserValidAction();
}
// 8. Topic title summarization (fire and forget)
if (result.topicId) {
const summaryTitle = async () => {
if (result.isCreateNewTopic) {
await this.#get().summaryTopicTitle(result.topicId, result.messages || []);
return;
}
const topic = topicSelectors.getTopicById(result.topicId)(this.#get());
if (topic && !topic.title) {
const chats = displayMessageSelectors
.getDisplayMessagesByKey(messageMapKey({ agentId, topicId: topic.id }))(this.#get())
.filter((item) => item.id !== result.assistantMessageId);
await this.#get().summaryTopicTitle(topic.id, chats);
}
};
summaryTitle().catch(console.error);
}
// 9. Complete sendMessage operation - agent execution is handled by SSE child operation
this.#get().completeOperation(operationId);
// 10. Create streaming context
const streamContext = {
assistantId: result.assistantMessageId,
content: '',
reasoning: '',
tmpAssistantId: tempAssistantId,
};
// 11. Start child operation for SSE stream using backend operationId
this.#get().startOperation({
context: { ...execContext, messageId: result.assistantMessageId },
label: 'Agent Mode Stream',
operationId: result.operationId,
parentOperationId: operationId,
type: 'agentModeStream',
});
// Associate assistant message with both operations
this.#get().associateMessageWithOperation(result.assistantMessageId, operationId);
this.#get().associateMessageWithOperation(result.assistantMessageId, result.operationId);
// 12. Connect to SSE stream
const { internal_handleAgentStreamEvent } = this.#get();
const eventSource = agentRuntimeClient.createStreamConnection(result.operationId, {
includeHistory: false,
onConnect: () => {
log('Stream connected to %s', result.operationId);
},
onDisconnect: () => {
log('Stream disconnected from %s', result.operationId);
this.#get().completeOperation(result.operationId);
if (result.topicId) this.#get().internal_updateTopicLoading(result.topicId, false);
},
onError: (error: Error) => {
log('Stream error for %s: %O', result.operationId, error);
this.#get().failOperation(result.operationId, {
message: error.message,
type: 'AgentStreamError',
});
if (streamContext.assistantId) {
this.#get().internal_handleAgentError(streamContext.assistantId, error.message);
}
},
onEvent: async (event: StreamEvent) => {
await internal_handleAgentStreamEvent(result.operationId, event, streamContext);
},
});
// 13. Register cancel handler
this.#get().onOperationCancel(result.operationId, async () => {
log('Cancelling SSE stream for operation %s', result.operationId);
eventSource.abort();
// Notify server to stop the agent execution
try {
await aiAgentService.interruptTask({ operationId: result.operationId });
} catch (e) {
log('Failed to interrupt server task: %O', e);
}
});
// 14. Topic loading state
if (result.topicId) this.#get().internal_updateTopicLoading(result.topicId, true);
return {
assistantMessageId: result.assistantMessageId,
createdThreadId: result.createdThreadId,
userMessageId: result.userMessageId,
};
} catch (error) {
const isAbortError =
error instanceof Error &&
(error.name === 'AbortError' ||
error.message.includes('aborted') ||
error.message.includes('cancelled'));
if (isAbortError) {
log('sendMessage (agentMode) aborted by user');
} else {
console.error('sendMessage (agentMode) failed:', error);
this.#get().failOperation(operationId, {
message: error instanceof Error ? error.message : 'Unknown error',
type: 'SendAgentModeMessageError',
});
if (error instanceof TRPCClientError) {
this.#get().updateOperationMetadata(operationId, { inputSendErrorMsg: error.message });
this.#get().mainInputEditor?.setDocument('markdown', message);
}
}
// Clean up temp messages
this.#get().internal_dispatchMessage(
{ ids: [tempId, tempAssistantId], type: 'deleteMessages' },
{ operationId },
);
return undefined;
} finally {
this.#get().internal_toggleMessageLoading(false, tempId);
this.#get().internal_toggleMessageLoading(false, tempAssistantId);
}
};
continueGenerationMessage = async (id: string, messageId: string): Promise<void> => {
const message = dbMessageSelectors.getDbMessageById(id)(this.#get());
if (!message) return;
@@ -564,6 +564,7 @@ export const operationSelectors = {
isAgentRuntimeRunningByContext,
isInputLoadingByContext,
isAnyMessageLoading,
isInputLoadingByContext,
isContinuing,
isInSearchWorkflow,
isMainWindowAgentRuntimeRunning,
+3
View File
@@ -16,6 +16,7 @@ export type OperationType =
| 'continue' // Continue generation
// === AI generation ===
| 'agentModeStream' // Agent mode SSE stream (server-side agent execution)
| 'execAgentRuntime' // Execute agent runtime (client-side, entire agent runtime execution)
| 'execServerAgentRuntime' // Execute server agent runtime (server-side, e.g., Group Chat)
| 'createAssistantMessage' // Create assistant message (sub-operation of execAgentRuntime)
@@ -201,10 +202,12 @@ export interface OperationFilter {
* Used for loading state indicators and animation in UI
*
* Includes:
* - agentModeStream: Agent mode SSE stream (server-side agent execution)
* - execAgentRuntime: Client-side agent execution (single chat)
* - execServerAgentRuntime: Server-side agent execution (Group Chat)
*/
export const AI_RUNTIME_OPERATION_TYPES: OperationType[] = [
'agentModeStream',
'execAgentRuntime',
'execServerAgentRuntime',
];
@@ -83,9 +83,18 @@ export class PluginInternalsActionImpl {
const repairer = new ToolArgumentsRepairer(manifest);
const repairedArgs = repairer.parse(payload.apiName, payload.arguments);
// During streaming, arguments may be partial JSON that can't be parsed.
// ToolArgumentsRepairer.parse() falls back to {} when JSON.parse fails.
// In that case, keep the raw arguments string so streaming renderers
// can use safeParsePartialJSON to extract partial fields (e.g., title, content).
const repairProducedEmpty = Object.keys(repairedArgs).length === 0;
const hasRawContent = !!payload.arguments && payload.arguments !== '{}';
const args =
repairProducedEmpty && hasRawContent ? payload.arguments : JSON.stringify(repairedArgs);
return {
...payload,
arguments: JSON.stringify(repairedArgs),
arguments: args,
source: sourceMap[payload.identifier],
};
});