mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-14 03:30:19 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d4e9553ea8 |
@@ -30,6 +30,50 @@ const messageProcedure = authedProcedure.use(serverDatabase).use(async (opts) =>
|
||||
});
|
||||
|
||||
export const messageRouter = router({
|
||||
/**
|
||||
* Batch operations for messages
|
||||
* Executes multiple operations in a single request and returns the updated message list
|
||||
* This reduces HTTP calls by combining create/update/delete operations
|
||||
*/
|
||||
batchOperations: messageProcedure
|
||||
.input(
|
||||
z
|
||||
.object({
|
||||
operations: z.array(
|
||||
z.object({
|
||||
data: z.record(z.any()).optional(),
|
||||
messageId: z.string(),
|
||||
type: z.enum([
|
||||
'create',
|
||||
'update',
|
||||
'updateMetadata',
|
||||
'updateToolMessage',
|
||||
'updateToolArguments',
|
||||
'delete',
|
||||
]),
|
||||
}),
|
||||
),
|
||||
})
|
||||
.extend(basicContextSchema.shape),
|
||||
)
|
||||
.mutation(async ({ input, ctx }) => {
|
||||
const { operations, agentId, ...options } = input;
|
||||
|
||||
if (!agentId) {
|
||||
throw new TRPCError({
|
||||
code: 'BAD_REQUEST',
|
||||
message: 'agentId is required for batch operations',
|
||||
});
|
||||
}
|
||||
|
||||
const resolved = await resolveContext({ agentId, ...options }, ctx.serverDB, ctx.userId);
|
||||
|
||||
return ctx.messageService.batchOperations(operations, {
|
||||
...resolved,
|
||||
agentId,
|
||||
});
|
||||
}),
|
||||
|
||||
addFilesToMessage: messageProcedure
|
||||
.input(
|
||||
z
|
||||
|
||||
@@ -256,4 +256,94 @@ export class MessageService {
|
||||
}
|
||||
return this.queryWithSuccess(options);
|
||||
}
|
||||
|
||||
/**
|
||||
* Batch operations for messages
|
||||
* Executes multiple operations in a single transaction and returns the updated message list
|
||||
*
|
||||
* Supported operation types:
|
||||
* - create: Create a new message
|
||||
* - update: Update message content/error/reasoning
|
||||
* - updateMetadata: Update message metadata
|
||||
* - updateToolMessage: Update tool message (content, metadata, pluginState, pluginError)
|
||||
* - delete: Delete a message
|
||||
*
|
||||
* @param operations - Array of operations to execute
|
||||
* @param options - Query options for returning updated messages
|
||||
*/
|
||||
async batchOperations(
|
||||
operations: Array<{
|
||||
data?: Record<string, any>;
|
||||
messageId: string;
|
||||
type: 'create' | 'update' | 'updateMetadata' | 'updateToolMessage' | 'updateToolArguments' | 'delete';
|
||||
}>,
|
||||
options: QueryOptions & { agentId: string },
|
||||
): Promise<{ messages: UIChatMessage[]; success: boolean }> {
|
||||
// Execute all operations sequentially
|
||||
// Note: We don't wrap in a transaction here because each model method
|
||||
// may have its own transaction logic. For true atomicity, we'd need
|
||||
// to refactor the model layer.
|
||||
for (const op of operations) {
|
||||
try {
|
||||
switch (op.type) {
|
||||
case 'create': {
|
||||
await this.messageModel.create({
|
||||
...op.data,
|
||||
agentId: options.agentId,
|
||||
groupId: options.groupId,
|
||||
id: op.messageId, // Use frontend-generated ID
|
||||
topicId: options.topicId,
|
||||
} as CreateMessageParams);
|
||||
break;
|
||||
}
|
||||
case 'update': {
|
||||
await this.messageModel.update(op.messageId, op.data as any);
|
||||
break;
|
||||
}
|
||||
case 'updateMetadata': {
|
||||
if (op.data) {
|
||||
await this.messageModel.updateMetadata(op.messageId, op.data);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'updateToolMessage': {
|
||||
await this.messageModel.updateToolMessage(op.messageId, op.data as any);
|
||||
break;
|
||||
}
|
||||
case 'updateToolArguments': {
|
||||
if (op.data?.toolCallId && op.data?.arguments) {
|
||||
const argsString =
|
||||
typeof op.data.arguments === 'string'
|
||||
? op.data.arguments
|
||||
: JSON.stringify(op.data.arguments);
|
||||
await this.messageModel.updateToolArguments(op.data.toolCallId, argsString);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'delete': {
|
||||
await this.messageModel.deleteMessage(op.messageId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
console.error(`Batch operation failed for ${op.type} on message ${op.messageId}:`, error);
|
||||
// Continue with other operations even if one fails
|
||||
}
|
||||
}
|
||||
|
||||
// Query and return the updated message list
|
||||
const messages = await this.messageModel.query(
|
||||
{
|
||||
agentId: options.agentId,
|
||||
current: 0,
|
||||
groupId: options.groupId,
|
||||
pageSize: 9999,
|
||||
threadId: options.threadId,
|
||||
topicId: options.topicId,
|
||||
},
|
||||
this.getQueryOptions(),
|
||||
);
|
||||
|
||||
return { messages, success: true };
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,387 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { MessagesBatcher } from '../batcher';
|
||||
|
||||
// Mock the lambdaClient
|
||||
vi.mock('@/libs/trpc/client', () => ({
|
||||
lambdaClient: {
|
||||
message: {
|
||||
batchOperations: {
|
||||
mutate: vi.fn(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}));
|
||||
|
||||
describe('MessagesBatcher', () => {
|
||||
let batcher: MessagesBatcher;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
batcher = new MessagesBatcher();
|
||||
});
|
||||
|
||||
describe('dispatch', () => {
|
||||
it('should add operation to queue', () => {
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
expect(batcher.getQueueLength()).toBe(1);
|
||||
});
|
||||
|
||||
it('should merge create + update operations', () => {
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello world' },
|
||||
type: 'update',
|
||||
});
|
||||
|
||||
// Should still be 1 operation (merged)
|
||||
expect(batcher.getQueueLength()).toBe(1);
|
||||
});
|
||||
|
||||
it('should cancel out create + delete operations', () => {
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: {},
|
||||
type: 'delete',
|
||||
});
|
||||
|
||||
// Should be empty (cancelled out)
|
||||
expect(batcher.getQueueLength()).toBe(0);
|
||||
});
|
||||
|
||||
it('should merge update + update operations', () => {
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'update',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { reasoning: 'thinking...' },
|
||||
type: 'update',
|
||||
});
|
||||
|
||||
expect(batcher.getQueueLength()).toBe(1);
|
||||
});
|
||||
|
||||
it('should replace update with delete', () => {
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'update',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: {},
|
||||
type: 'delete',
|
||||
});
|
||||
|
||||
expect(batcher.getQueueLength()).toBe(1);
|
||||
});
|
||||
|
||||
it('should merge updateMetadata operations', () => {
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { key1: 'value1' },
|
||||
type: 'updateMetadata',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { key2: 'value2' },
|
||||
type: 'updateMetadata',
|
||||
});
|
||||
|
||||
expect(batcher.getQueueLength()).toBe(1);
|
||||
});
|
||||
|
||||
it('should merge updateToolMessage operations', () => {
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { pluginState: { step: 1 } },
|
||||
type: 'updateToolMessage',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'result' },
|
||||
type: 'updateToolMessage',
|
||||
});
|
||||
|
||||
expect(batcher.getQueueLength()).toBe(1);
|
||||
});
|
||||
|
||||
it('should keep operations for different messages separate', () => {
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-2',
|
||||
payload: { content: 'world' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
expect(batcher.getQueueLength()).toBe(2);
|
||||
});
|
||||
|
||||
it('should keep operations for different contexts separate', () => {
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1', topicId: 'topic-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'update',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1', topicId: 'topic-2' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'world' },
|
||||
type: 'update',
|
||||
});
|
||||
|
||||
// Same messageId but different context, so they should be separate
|
||||
expect(batcher.getQueueLength()).toBe(2);
|
||||
});
|
||||
});
|
||||
|
||||
describe('microtask flush', () => {
|
||||
it('should flush via microtask after dispatch', async () => {
|
||||
const { lambdaClient } = await import('@/libs/trpc/client');
|
||||
vi.mocked(lambdaClient.message.batchOperations.mutate).mockResolvedValue({
|
||||
messages: [],
|
||||
success: true,
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
// Not flushed yet (synchronously)
|
||||
expect(lambdaClient.message.batchOperations.mutate).not.toHaveBeenCalled();
|
||||
|
||||
// Wait for microtask to complete
|
||||
await Promise.resolve();
|
||||
await Promise.resolve(); // Extra tick for the flush to complete
|
||||
|
||||
expect(lambdaClient.message.batchOperations.mutate).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('should batch multiple synchronous dispatches into one flush', async () => {
|
||||
const { lambdaClient } = await import('@/libs/trpc/client');
|
||||
vi.mocked(lambdaClient.message.batchOperations.mutate).mockResolvedValue({
|
||||
messages: [],
|
||||
success: true,
|
||||
});
|
||||
|
||||
// Multiple synchronous dispatches
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-2',
|
||||
payload: { content: 'world' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
// Wait for microtask
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
|
||||
// Should have been called only once with both operations
|
||||
expect(lambdaClient.message.batchOperations.mutate).toHaveBeenCalledTimes(1);
|
||||
expect(lambdaClient.message.batchOperations.mutate).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
operations: expect.arrayContaining([
|
||||
expect.objectContaining({ messageId: 'msg-1' }),
|
||||
expect.objectContaining({ messageId: 'msg-2' }),
|
||||
]),
|
||||
}),
|
||||
);
|
||||
});
|
||||
|
||||
it('should clear the queue after flush', async () => {
|
||||
const { lambdaClient } = await import('@/libs/trpc/client');
|
||||
vi.mocked(lambdaClient.message.batchOperations.mutate).mockResolvedValue({
|
||||
messages: [],
|
||||
success: true,
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
// Wait for flush
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
|
||||
expect(batcher.getQueueLength()).toBe(0);
|
||||
});
|
||||
});
|
||||
|
||||
describe('error handling', () => {
|
||||
it('should log error but not re-queue on failure', async () => {
|
||||
const { lambdaClient } = await import('@/libs/trpc/client');
|
||||
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
|
||||
vi.mocked(lambdaClient.message.batchOperations.mutate).mockRejectedValue(
|
||||
new Error('Network error'),
|
||||
);
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
// Wait for flush
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
await Promise.resolve(); // Extra tick for error handling
|
||||
|
||||
// Error should be logged
|
||||
expect(consoleErrorSpy).toHaveBeenCalled();
|
||||
|
||||
// Queue should be empty (not re-queued - local cache is source of truth)
|
||||
expect(batcher.getQueueLength()).toBe(0);
|
||||
|
||||
consoleErrorSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
|
||||
describe('complex scenarios', () => {
|
||||
it('should handle rapid create-update-delete sequence', () => {
|
||||
// Simulate rapid user actions
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello world' },
|
||||
type: 'update',
|
||||
});
|
||||
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: {},
|
||||
type: 'delete',
|
||||
});
|
||||
|
||||
// All should cancel out
|
||||
expect(batcher.getQueueLength()).toBe(0);
|
||||
});
|
||||
|
||||
it('should handle multiple messages with mixed operations', async () => {
|
||||
const { lambdaClient } = await import('@/libs/trpc/client');
|
||||
vi.mocked(lambdaClient.message.batchOperations.mutate).mockResolvedValue({
|
||||
messages: [],
|
||||
success: true,
|
||||
});
|
||||
|
||||
// Create msg-1
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
// Create msg-2
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-2',
|
||||
payload: { content: 'world' },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
// Update msg-1
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-1',
|
||||
payload: { content: 'hello updated' },
|
||||
type: 'update',
|
||||
});
|
||||
|
||||
// Delete msg-2
|
||||
batcher.dispatch({
|
||||
context: { agentId: 'agent-1' },
|
||||
messageId: 'msg-2',
|
||||
payload: {},
|
||||
type: 'delete',
|
||||
});
|
||||
|
||||
// Should have 1 operation (create msg-1 with updated content)
|
||||
// msg-2 create + delete should cancel out
|
||||
expect(batcher.getQueueLength()).toBe(1);
|
||||
|
||||
// Wait for flush
|
||||
await Promise.resolve();
|
||||
await Promise.resolve();
|
||||
|
||||
expect(lambdaClient.message.batchOperations.mutate).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
operations: [
|
||||
expect.objectContaining({
|
||||
data: expect.objectContaining({ content: 'hello updated' }),
|
||||
messageId: 'msg-1',
|
||||
type: 'create',
|
||||
}),
|
||||
],
|
||||
}),
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,157 +0,0 @@
|
||||
import { beforeEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { lambdaClient } from '@/libs/trpc/client';
|
||||
|
||||
import { MessageService } from '../index';
|
||||
|
||||
vi.mock('@/libs/trpc/client', () => ({
|
||||
lambdaClient: {
|
||||
message: {
|
||||
updateMetadata: {
|
||||
mutate: vi.fn(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}));
|
||||
|
||||
describe('MessageService - Race Condition Control', () => {
|
||||
let messageService: MessageService;
|
||||
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks();
|
||||
messageService = new MessageService();
|
||||
});
|
||||
|
||||
describe('updateMessageMetadata race condition', () => {
|
||||
it('should cancel previous request when new update is triggered for same message', async () => {
|
||||
const messageId = 'test-message-id';
|
||||
let firstRequestAborted = false;
|
||||
let secondRequestCompleted = false;
|
||||
|
||||
// Mock first request (slow)
|
||||
vi.mocked(lambdaClient.message.updateMetadata.mutate).mockImplementationOnce(
|
||||
(_params, options) =>
|
||||
new Promise((resolve, reject) => {
|
||||
const signal = options?.signal;
|
||||
if (signal) {
|
||||
signal.addEventListener('abort', () => {
|
||||
firstRequestAborted = true;
|
||||
reject(new Error('Aborted'));
|
||||
});
|
||||
}
|
||||
// Simulate slow request
|
||||
setTimeout(() => resolve({ success: true, messages: [] }), 200);
|
||||
}),
|
||||
);
|
||||
|
||||
// Mock second request (fast)
|
||||
vi.mocked(lambdaClient.message.updateMetadata.mutate).mockImplementationOnce(
|
||||
async (_params, _options) => {
|
||||
secondRequestCompleted = true;
|
||||
return { success: true, messages: [] };
|
||||
},
|
||||
);
|
||||
|
||||
// Start first update
|
||||
const firstPromise = messageService.updateMessageMetadata(messageId, { compare: true });
|
||||
|
||||
// Wait a bit then start second update
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
const secondPromise = messageService.updateMessageMetadata(messageId, { compare: false });
|
||||
|
||||
// First should be aborted
|
||||
await expect(firstPromise).rejects.toThrow('Aborted');
|
||||
expect(firstRequestAborted).toBe(true);
|
||||
|
||||
// Second should complete successfully
|
||||
await expect(secondPromise).resolves.toEqual({ success: true, messages: [] });
|
||||
expect(secondRequestCompleted).toBe(true);
|
||||
});
|
||||
|
||||
it('should allow concurrent updates for different messages', async () => {
|
||||
const message1Id = 'message-1';
|
||||
const message2Id = 'message-2';
|
||||
|
||||
vi.mocked(lambdaClient.message.updateMetadata.mutate).mockResolvedValue({
|
||||
success: true,
|
||||
messages: [],
|
||||
});
|
||||
|
||||
const [result1, result2] = await Promise.all([
|
||||
messageService.updateMessageMetadata(message1Id, { cost: 0.001 }),
|
||||
messageService.updateMessageMetadata(message2Id, { cost: 0.002 }),
|
||||
]);
|
||||
|
||||
expect(result1).toEqual({ success: true, messages: [] });
|
||||
expect(result2).toEqual({ success: true, messages: [] });
|
||||
expect(lambdaClient.message.updateMetadata.mutate).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('should handle rapid successive updates correctly', async () => {
|
||||
const messageId = 'test-message-id';
|
||||
let completedUpdates = 0;
|
||||
const abortedUpdates: number[] = [];
|
||||
|
||||
// All but the last request should be aborted
|
||||
let callIndex = 0;
|
||||
vi.mocked(lambdaClient.message.updateMetadata.mutate).mockImplementation(
|
||||
(_params, options) => {
|
||||
const currentIndex = callIndex++;
|
||||
return new Promise((resolve, reject) => {
|
||||
const signal = options?.signal;
|
||||
let isAborted = false;
|
||||
|
||||
if (signal) {
|
||||
signal.addEventListener('abort', () => {
|
||||
isAborted = true;
|
||||
abortedUpdates.push(currentIndex);
|
||||
reject(new Error('Aborted'));
|
||||
});
|
||||
}
|
||||
|
||||
setTimeout(() => {
|
||||
if (!isAborted) {
|
||||
completedUpdates++;
|
||||
resolve({ success: true, messages: [] });
|
||||
}
|
||||
}, 50);
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
// Trigger 5 rapid updates sequentially with catch to prevent unhandled rejections
|
||||
const promise1 = messageService
|
||||
.updateMessageMetadata(messageId, { cost: 0.001 })
|
||||
.catch((e) => e);
|
||||
await new Promise((resolve) => setTimeout(resolve, 5));
|
||||
const promise2 = messageService
|
||||
.updateMessageMetadata(messageId, { cost: 0.002 })
|
||||
.catch((e) => e);
|
||||
await new Promise((resolve) => setTimeout(resolve, 5));
|
||||
const promise3 = messageService.updateMessageMetadata(messageId, { tps: 10 }).catch((e) => e);
|
||||
await new Promise((resolve) => setTimeout(resolve, 5));
|
||||
const promise4 = messageService.updateMessageMetadata(messageId, { tps: 20 }).catch((e) => e);
|
||||
await new Promise((resolve) => setTimeout(resolve, 5));
|
||||
const promise5 = messageService
|
||||
.updateMessageMetadata(messageId, { compare: true })
|
||||
.catch((e) => e);
|
||||
|
||||
// Wait for all to settle
|
||||
const results = await Promise.all([promise1, promise2, promise3, promise4, promise5]);
|
||||
|
||||
// First 4 should be errors (aborted), last should succeed
|
||||
expect(results[0]).toBeInstanceOf(Error);
|
||||
expect(results[1]).toBeInstanceOf(Error);
|
||||
expect(results[2]).toBeInstanceOf(Error);
|
||||
expect(results[3]).toBeInstanceOf(Error);
|
||||
expect(results[4]).toEqual({ success: true, messages: [] });
|
||||
|
||||
// 4 requests should have been aborted
|
||||
expect(abortedUpdates.length).toBe(4);
|
||||
expect(abortedUpdates).toEqual([0, 1, 2, 3]);
|
||||
|
||||
// Only the last request should complete
|
||||
expect(completedUpdates).toBe(1);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,218 @@
|
||||
import { nanoid } from 'nanoid';
|
||||
|
||||
import { lambdaClient } from '@/libs/trpc/client';
|
||||
import { messageMapKey, type MessageMapKeyInput } from '@/store/chat/utils/messageMapKey';
|
||||
|
||||
// ==========================================
|
||||
// Types
|
||||
// ==========================================
|
||||
|
||||
export type BatchOperationType =
|
||||
| 'create'
|
||||
| 'update'
|
||||
| 'updateMetadata'
|
||||
| 'updateToolMessage'
|
||||
| 'updateToolArguments'
|
||||
| 'delete';
|
||||
|
||||
export interface BatchOperation {
|
||||
context: MessageMapKeyInput;
|
||||
contextKey: string;
|
||||
messageId: string;
|
||||
payload: Record<string, any>;
|
||||
type: BatchOperationType;
|
||||
}
|
||||
|
||||
interface BatchOperationsInput {
|
||||
agentId?: string;
|
||||
groupId?: string;
|
||||
operations: Array<{
|
||||
data?: Record<string, any>;
|
||||
messageId: string;
|
||||
type: BatchOperationType;
|
||||
}>;
|
||||
threadId?: string | null;
|
||||
topicId?: string | null;
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// MessagesBatcher Class
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Messages Batcher - batches multiple message operations into a single request
|
||||
*
|
||||
* Uses microtask scheduling to merge operations within the same event loop,
|
||||
* then flushes them asynchronously without blocking the caller.
|
||||
*/
|
||||
export class MessagesBatcher {
|
||||
private flushPromise: Promise<void> | null = null;
|
||||
private queue: BatchOperation[] = [];
|
||||
|
||||
// ==========================================
|
||||
// Public API
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Dispatch an operation to the batcher queue
|
||||
* Operations are automatically merged if they target the same message
|
||||
* Triggers an async flush via microtask
|
||||
*/
|
||||
dispatch(op: Omit<BatchOperation, 'contextKey'>): void {
|
||||
const contextKey = messageMapKey(op.context);
|
||||
this.mergeOrPush({ ...op, contextKey });
|
||||
this.scheduleFlush();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current queue length (for testing/debugging)
|
||||
*/
|
||||
getQueueLength(): number {
|
||||
return this.queue.length;
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// Queue Management
|
||||
// ==========================================
|
||||
|
||||
private mergeOrPush(op: BatchOperation): void {
|
||||
const existingIndex = this.queue.findIndex(
|
||||
(o) => o.messageId === op.messageId && o.contextKey === op.contextKey,
|
||||
);
|
||||
|
||||
if (existingIndex >= 0) {
|
||||
const existing = this.queue[existingIndex];
|
||||
|
||||
// Merge strategies based on operation type combinations
|
||||
if (existing.type === 'create' && op.type === 'update') {
|
||||
// create + update = create with merged payload
|
||||
existing.payload = { ...existing.payload, ...op.payload };
|
||||
} else if (existing.type === 'create' && op.type === 'updateMetadata') {
|
||||
// create + updateMetadata = create with merged metadata
|
||||
existing.payload = {
|
||||
...existing.payload,
|
||||
metadata: { ...existing.payload.metadata, ...op.payload },
|
||||
};
|
||||
} else if (existing.type === 'create' && op.type === 'delete') {
|
||||
// create + delete = cancel out (remove from queue)
|
||||
this.queue.splice(existingIndex, 1);
|
||||
return;
|
||||
} else if (existing.type === 'update' && op.type === 'update') {
|
||||
// update + update = merge updates
|
||||
existing.payload = { ...existing.payload, ...op.payload };
|
||||
} else if (existing.type === 'update' && op.type === 'updateMetadata') {
|
||||
// update + updateMetadata = merge
|
||||
existing.payload = {
|
||||
...existing.payload,
|
||||
metadata: { ...existing.payload.metadata, ...op.payload },
|
||||
};
|
||||
} else if (existing.type === 'update' && op.type === 'delete') {
|
||||
// update + delete = just delete
|
||||
this.queue[existingIndex] = op;
|
||||
} else if (existing.type === 'updateMetadata' && op.type === 'updateMetadata') {
|
||||
// metadata + metadata = merge
|
||||
existing.payload = { ...existing.payload, ...op.payload };
|
||||
} else if (existing.type === 'updateMetadata' && op.type === 'delete') {
|
||||
// metadata + delete = just delete
|
||||
this.queue[existingIndex] = op;
|
||||
} else if (existing.type === 'updateToolMessage' && op.type === 'updateToolMessage') {
|
||||
// toolMessage + toolMessage = merge
|
||||
existing.payload = { ...existing.payload, ...op.payload };
|
||||
} else if (existing.type === 'updateToolMessage' && op.type === 'delete') {
|
||||
// toolMessage + delete = just delete
|
||||
this.queue[existingIndex] = op;
|
||||
} else {
|
||||
// Other combinations: just append
|
||||
this.queue.push(op);
|
||||
}
|
||||
} else {
|
||||
this.queue.push(op);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Schedule a flush using microtask
|
||||
* This ensures all synchronous operations in the same event loop are batched together
|
||||
*/
|
||||
private scheduleFlush(): void {
|
||||
if (!this.flushPromise) {
|
||||
this.flushPromise = Promise.resolve().then(() => {
|
||||
this.flushPromise = null;
|
||||
this.flush();
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// Flush Operations
|
||||
// ==========================================
|
||||
|
||||
private async flush(): Promise<void> {
|
||||
if (this.queue.length === 0) return;
|
||||
|
||||
const ops = [...this.queue];
|
||||
this.queue = [];
|
||||
|
||||
// Group by context
|
||||
const byContext = this.groupByContext(ops);
|
||||
|
||||
// Send batches in parallel for different contexts
|
||||
await Promise.all(
|
||||
Object.entries(byContext).map(([contextKey, contextOps]) =>
|
||||
this.sendBatch(contextKey, contextOps),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
private groupByContext(ops: BatchOperation[]): Record<string, BatchOperation[]> {
|
||||
return ops.reduce(
|
||||
(acc, op) => {
|
||||
if (!acc[op.contextKey]) acc[op.contextKey] = [];
|
||||
acc[op.contextKey].push(op);
|
||||
return acc;
|
||||
},
|
||||
{} as Record<string, BatchOperation[]>,
|
||||
);
|
||||
}
|
||||
|
||||
private async sendBatch(_contextKey: string, ops: BatchOperation[]): Promise<void> {
|
||||
try {
|
||||
const context = ops[0].context;
|
||||
|
||||
const input: BatchOperationsInput = {
|
||||
agentId: context.agentId,
|
||||
groupId: context.groupId,
|
||||
operations: ops.map((op) => ({
|
||||
data: op.payload,
|
||||
messageId: op.messageId,
|
||||
type: op.type,
|
||||
})),
|
||||
threadId: context.threadId,
|
||||
topicId: context.topicId,
|
||||
};
|
||||
|
||||
await lambdaClient.message.batchOperations.mutate(input);
|
||||
} catch (error) {
|
||||
// Log error but don't re-queue - local cache is source of truth
|
||||
console.error('Batch sync failed:', error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ==========================================
|
||||
// Helper: Generate Message ID
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Generate a new message ID (frontend-generated)
|
||||
* This allows optimistic updates without waiting for server
|
||||
*/
|
||||
export const generateMessageId = (): string => {
|
||||
return nanoid();
|
||||
};
|
||||
|
||||
// ==========================================
|
||||
// Singleton Instance
|
||||
// ==========================================
|
||||
|
||||
export const messagesBatcher = new MessagesBatcher();
|
||||
@@ -0,0 +1,124 @@
|
||||
import type { UIChatMessage } from '@lobechat/types';
|
||||
|
||||
import { messageMapKey, type MessageMapKeyInput } from '@/store/chat/utils/messageMapKey';
|
||||
|
||||
/**
|
||||
* Local messages cache for optimistic updates
|
||||
* Stores messages grouped by context key (agentId + topicId + threadId)
|
||||
*/
|
||||
class MessagesCache {
|
||||
private cache: Map<string, UIChatMessage[]> = new Map();
|
||||
|
||||
/**
|
||||
* Get context key from input
|
||||
*/
|
||||
private getKey(ctx: MessageMapKeyInput): string {
|
||||
return messageMapKey(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get messages for a context
|
||||
*/
|
||||
get(ctx: MessageMapKeyInput): UIChatMessage[] | undefined {
|
||||
return this.cache.get(this.getKey(ctx));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if cache has messages for a context
|
||||
*/
|
||||
has(ctx: MessageMapKeyInput): boolean {
|
||||
return this.cache.has(this.getKey(ctx));
|
||||
}
|
||||
|
||||
/**
|
||||
* Set messages for a context (used when loading from backend)
|
||||
*/
|
||||
set(ctx: MessageMapKeyInput, messages: UIChatMessage[]): void {
|
||||
this.cache.set(this.getKey(ctx), [...messages]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a message to the cache (optimistic create)
|
||||
*/
|
||||
addMessage(ctx: MessageMapKeyInput, message: UIChatMessage): UIChatMessage[] {
|
||||
const key = this.getKey(ctx);
|
||||
const messages = this.cache.get(key) || [];
|
||||
const updated = [...messages, message];
|
||||
this.cache.set(key, updated);
|
||||
return updated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update a message in the cache (optimistic update)
|
||||
*/
|
||||
updateMessage(
|
||||
ctx: MessageMapKeyInput,
|
||||
id: string,
|
||||
value: Partial<UIChatMessage>,
|
||||
): UIChatMessage[] {
|
||||
const key = this.getKey(ctx);
|
||||
const messages = this.cache.get(key) || [];
|
||||
const updated = messages.map((m) => (m.id === id ? { ...m, ...value, updatedAt: Date.now() } : m));
|
||||
this.cache.set(key, updated);
|
||||
return updated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update message metadata in the cache
|
||||
*/
|
||||
updateMessageMetadata(
|
||||
ctx: MessageMapKeyInput,
|
||||
id: string,
|
||||
metadata: Record<string, any>,
|
||||
): UIChatMessage[] {
|
||||
const key = this.getKey(ctx);
|
||||
const messages = this.cache.get(key) || [];
|
||||
const updated = messages.map((m) =>
|
||||
m.id === id
|
||||
? { ...m, metadata: { ...m.metadata, ...metadata }, updatedAt: Date.now() }
|
||||
: m,
|
||||
);
|
||||
this.cache.set(key, updated);
|
||||
return updated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a message from the cache (optimistic delete)
|
||||
*/
|
||||
removeMessage(ctx: MessageMapKeyInput, id: string): UIChatMessage[] {
|
||||
const key = this.getKey(ctx);
|
||||
const messages = this.cache.get(key) || [];
|
||||
const updated = messages.filter((m) => m.id !== id);
|
||||
this.cache.set(key, updated);
|
||||
return updated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove multiple messages from the cache
|
||||
*/
|
||||
removeMessages(ctx: MessageMapKeyInput, ids: string[]): UIChatMessage[] {
|
||||
const key = this.getKey(ctx);
|
||||
const messages = this.cache.get(key) || [];
|
||||
const idSet = new Set(ids);
|
||||
const updated = messages.filter((m) => !idSet.has(m.id));
|
||||
this.cache.set(key, updated);
|
||||
return updated;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear cache for a specific context
|
||||
*/
|
||||
clear(ctx: MessageMapKeyInput): void {
|
||||
this.cache.delete(this.getKey(ctx));
|
||||
}
|
||||
|
||||
/**
|
||||
* Clear all cache
|
||||
*/
|
||||
clearAll(): void {
|
||||
this.cache.clear();
|
||||
}
|
||||
}
|
||||
|
||||
// Singleton instance
|
||||
export const messagesCache = new MessagesCache();
|
||||
+257
-76
@@ -4,20 +4,19 @@ import {
|
||||
type ChatTTS,
|
||||
type ChatTranslate,
|
||||
type CreateMessageParams,
|
||||
type CreateMessageResult,
|
||||
type MessageMetadata,
|
||||
type MessagePluginItem,
|
||||
type ModelRankItem,
|
||||
type UIChatMessage,
|
||||
type UpdateMessageParams,
|
||||
type UpdateMessageRAGParams,
|
||||
type UpdateMessageResult,
|
||||
} from '@lobechat/types';
|
||||
import type { HeatmapsProps } from '@lobehub/charts';
|
||||
|
||||
import { lambdaClient } from '@/libs/trpc/client';
|
||||
|
||||
import { abortableRequest } from '../utils/abortableRequest';
|
||||
import { generateMessageId, messagesBatcher } from './batcher';
|
||||
import { messagesCache } from './cache';
|
||||
|
||||
/**
|
||||
* Query context for message operations
|
||||
@@ -32,16 +31,246 @@ export interface MessageQueryContext {
|
||||
}
|
||||
|
||||
export class MessageService {
|
||||
createMessage = async (params: CreateMessageParams): Promise<CreateMessageResult> => {
|
||||
return lambdaClient.message.createMessage.mutate(params as any);
|
||||
// ==========================================
|
||||
// Batched Operations (with local cache)
|
||||
// These return immediately from local cache
|
||||
// ==========================================
|
||||
|
||||
/**
|
||||
* Create a message with optimistic update
|
||||
* Updates local cache immediately, dispatches to batcher
|
||||
*/
|
||||
createMessage = async (
|
||||
params: CreateMessageParams & { id?: string },
|
||||
): Promise<{ id: string; messages: UIChatMessage[] }> => {
|
||||
const id = params.id || generateMessageId();
|
||||
const now = Date.now();
|
||||
|
||||
if (!params.agentId) {
|
||||
console.warn('createMessage: agentId is required');
|
||||
return { id, messages: [] };
|
||||
}
|
||||
|
||||
const ctx = {
|
||||
agentId: params.agentId,
|
||||
groupId: params.groupId ?? undefined,
|
||||
threadId: params.threadId,
|
||||
topicId: params.topicId,
|
||||
};
|
||||
|
||||
// Create the message object for local cache
|
||||
const newMessage: UIChatMessage = {
|
||||
...params,
|
||||
content: params.content || '',
|
||||
createdAt: now,
|
||||
id,
|
||||
meta: {},
|
||||
updatedAt: now,
|
||||
} as UIChatMessage;
|
||||
|
||||
// Update local cache
|
||||
const messages = messagesCache.addMessage(ctx, newMessage);
|
||||
|
||||
// Dispatch to batcher (async, non-blocking)
|
||||
messagesBatcher.dispatch({
|
||||
context: ctx,
|
||||
messageId: id,
|
||||
payload: { ...params, id },
|
||||
type: 'create',
|
||||
});
|
||||
|
||||
return { id, messages };
|
||||
};
|
||||
|
||||
/**
|
||||
* Update a message with optimistic update
|
||||
*/
|
||||
updateMessage = async (
|
||||
id: string,
|
||||
value: Partial<UpdateMessageParams>,
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<{ messages?: UIChatMessage[]; success: boolean }> => {
|
||||
if (!ctx?.agentId) {
|
||||
console.warn('updateMessage: agentId is required');
|
||||
return { success: false };
|
||||
}
|
||||
|
||||
const context = {
|
||||
agentId: ctx.agentId,
|
||||
groupId: ctx.groupId,
|
||||
threadId: ctx.threadId,
|
||||
topicId: ctx.topicId,
|
||||
};
|
||||
|
||||
// Update local cache
|
||||
const messages = messagesCache.updateMessage(context, id, value as Partial<UIChatMessage>);
|
||||
|
||||
// Dispatch to batcher
|
||||
messagesBatcher.dispatch({
|
||||
context,
|
||||
messageId: id,
|
||||
payload: value,
|
||||
type: 'update',
|
||||
});
|
||||
|
||||
return { messages, success: true };
|
||||
};
|
||||
|
||||
/**
|
||||
* Update message metadata with optimistic update
|
||||
*/
|
||||
updateMessageMetadata = async (
|
||||
id: string,
|
||||
value: Partial<MessageMetadata>,
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<{ messages?: UIChatMessage[]; success: boolean }> => {
|
||||
if (!ctx?.agentId) {
|
||||
console.warn('updateMessageMetadata: agentId is required');
|
||||
return { success: false };
|
||||
}
|
||||
|
||||
const context = {
|
||||
agentId: ctx.agentId,
|
||||
groupId: ctx.groupId,
|
||||
threadId: ctx.threadId,
|
||||
topicId: ctx.topicId,
|
||||
};
|
||||
|
||||
// Update local cache
|
||||
const messages = messagesCache.updateMessageMetadata(context, id, value);
|
||||
|
||||
// Dispatch to batcher
|
||||
messagesBatcher.dispatch({
|
||||
context,
|
||||
messageId: id,
|
||||
payload: value,
|
||||
type: 'updateMetadata',
|
||||
});
|
||||
|
||||
return { messages, success: true };
|
||||
};
|
||||
|
||||
/**
|
||||
* Update tool message with optimistic update
|
||||
*/
|
||||
updateToolMessage = async (
|
||||
id: string,
|
||||
value: {
|
||||
content?: string;
|
||||
metadata?: Record<string, any>;
|
||||
pluginError?: any;
|
||||
pluginState?: Record<string, any>;
|
||||
},
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<{ messages?: UIChatMessage[]; success: boolean }> => {
|
||||
if (!ctx?.agentId) {
|
||||
console.warn('updateToolMessage: agentId is required');
|
||||
return { success: false };
|
||||
}
|
||||
|
||||
const context = {
|
||||
agentId: ctx.agentId,
|
||||
groupId: ctx.groupId,
|
||||
threadId: ctx.threadId,
|
||||
topicId: ctx.topicId,
|
||||
};
|
||||
|
||||
// Update local cache with the relevant fields
|
||||
const updateValue: Partial<UIChatMessage> = {};
|
||||
if (value.content !== undefined) updateValue.content = value.content;
|
||||
if (value.metadata !== undefined) updateValue.metadata = value.metadata;
|
||||
if (value.pluginError !== undefined) (updateValue as any).pluginError = value.pluginError;
|
||||
if (value.pluginState !== undefined) (updateValue as any).pluginState = value.pluginState;
|
||||
|
||||
const messages = messagesCache.updateMessage(context, id, updateValue);
|
||||
|
||||
// Dispatch to batcher
|
||||
messagesBatcher.dispatch({
|
||||
context,
|
||||
messageId: id,
|
||||
payload: value,
|
||||
type: 'updateToolMessage',
|
||||
});
|
||||
|
||||
return { messages, success: true };
|
||||
};
|
||||
|
||||
/**
|
||||
* Remove a message with optimistic update
|
||||
*/
|
||||
removeMessage = async (
|
||||
id: string,
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<{ messages?: UIChatMessage[]; success: boolean }> => {
|
||||
if (!ctx?.agentId) {
|
||||
console.warn('removeMessage: agentId is required');
|
||||
return { success: false };
|
||||
}
|
||||
|
||||
const context = {
|
||||
agentId: ctx.agentId,
|
||||
groupId: ctx.groupId,
|
||||
threadId: ctx.threadId,
|
||||
topicId: ctx.topicId,
|
||||
};
|
||||
|
||||
// Remove from local cache
|
||||
const messages = messagesCache.removeMessage(context, id);
|
||||
|
||||
// Dispatch to batcher
|
||||
messagesBatcher.dispatch({
|
||||
context,
|
||||
messageId: id,
|
||||
payload: {},
|
||||
type: 'delete',
|
||||
});
|
||||
|
||||
return { messages, success: true };
|
||||
};
|
||||
|
||||
// ==========================================
|
||||
// Query Operations (with cache)
|
||||
// ==========================================
|
||||
|
||||
getMessages = async (params: MessageQueryContext): Promise<UIChatMessage[]> => {
|
||||
const data = await lambdaClient.message.getMessages.query(params);
|
||||
// For shared topics, always fetch from backend
|
||||
if (params.topicShareId) {
|
||||
const data = await lambdaClient.message.getMessages.query(params);
|
||||
return data as unknown as UIChatMessage[];
|
||||
}
|
||||
|
||||
// Check if agentId is provided for cache
|
||||
if (params.agentId) {
|
||||
const ctx = {
|
||||
agentId: params.agentId,
|
||||
groupId: params.groupId,
|
||||
threadId: params.threadId,
|
||||
topicId: params.topicId,
|
||||
};
|
||||
|
||||
// Check cache first
|
||||
const cached = messagesCache.get(ctx);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
|
||||
// Fetch from backend and cache
|
||||
const data = await lambdaClient.message.getMessages.query(params);
|
||||
const messages = data as unknown as UIChatMessage[];
|
||||
messagesCache.set(ctx, messages);
|
||||
return messages;
|
||||
}
|
||||
|
||||
// Fallback to backend
|
||||
const data = await lambdaClient.message.getMessages.query(params);
|
||||
return data as unknown as UIChatMessage[];
|
||||
};
|
||||
|
||||
// ==========================================
|
||||
// Direct API Operations (no batching)
|
||||
// These operations need immediate server response
|
||||
// ==========================================
|
||||
|
||||
countMessages = async (params?: {
|
||||
endDate?: string;
|
||||
range?: [string, string];
|
||||
@@ -86,10 +315,6 @@ export class MessageService {
|
||||
/**
|
||||
* Update tool arguments by toolCallId - updates both tool message and parent assistant message in one transaction
|
||||
* This is the preferred method for updating tool arguments as it prevents race conditions
|
||||
*
|
||||
* @param toolCallId - The tool call ID (stable identifier from AI response)
|
||||
* @param value - The new arguments value
|
||||
* @param ctx - Message query context
|
||||
*/
|
||||
updateToolArguments = async (
|
||||
toolCallId: string,
|
||||
@@ -99,18 +324,6 @@ export class MessageService {
|
||||
return lambdaClient.message.updateToolArguments.mutate({ ...ctx, toolCallId, value });
|
||||
};
|
||||
|
||||
updateMessage = async (
|
||||
id: string,
|
||||
value: Partial<UpdateMessageParams>,
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<UpdateMessageResult> => {
|
||||
return lambdaClient.message.update.mutate({
|
||||
...ctx,
|
||||
id,
|
||||
value,
|
||||
});
|
||||
};
|
||||
|
||||
updateMessageTranslate = async (id: string, translate: Partial<ChatTranslate> | false) => {
|
||||
return lambdaClient.message.updateTranslate.mutate({ id, value: translate as ChatTranslate });
|
||||
};
|
||||
@@ -119,21 +332,11 @@ export class MessageService {
|
||||
return lambdaClient.message.updateTTS.mutate({ id, value: tts });
|
||||
};
|
||||
|
||||
updateMessageMetadata = async (
|
||||
id: string,
|
||||
value: Partial<MessageMetadata>,
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<UpdateMessageResult> => {
|
||||
return abortableRequest.execute(`message-metadata-${id}`, (signal) =>
|
||||
lambdaClient.message.updateMetadata.mutate({ ...ctx, id, value }, { signal }),
|
||||
);
|
||||
};
|
||||
|
||||
updateMessagePluginState = async (
|
||||
id: string,
|
||||
value: Record<string, any>,
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<UpdateMessageResult> => {
|
||||
) => {
|
||||
return lambdaClient.message.updatePluginState.mutate({ ...ctx, id, value });
|
||||
};
|
||||
|
||||
@@ -141,7 +344,7 @@ export class MessageService {
|
||||
id: string,
|
||||
error: ChatMessagePluginError | null,
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<UpdateMessageResult> => {
|
||||
) => {
|
||||
return lambdaClient.message.updatePluginError.mutate({ ...ctx, id, value: error as any });
|
||||
};
|
||||
|
||||
@@ -149,46 +352,26 @@ export class MessageService {
|
||||
id: string,
|
||||
value: Partial<Omit<MessagePluginItem, 'id'>>,
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<UpdateMessageResult> => {
|
||||
) => {
|
||||
return lambdaClient.message.updateMessagePlugin.mutate({ ...ctx, id, value });
|
||||
};
|
||||
|
||||
updateMessageRAG = async (
|
||||
id: string,
|
||||
data: UpdateMessageRAGParams,
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<UpdateMessageResult> => {
|
||||
updateMessageRAG = async (id: string, data: UpdateMessageRAGParams, ctx?: MessageQueryContext) => {
|
||||
return lambdaClient.message.updateMessageRAG.mutate({ ...ctx, id, value: data });
|
||||
};
|
||||
|
||||
/**
|
||||
* Update tool message with content, metadata, pluginState, and pluginError in a single request
|
||||
* This prevents race conditions when updating multiple fields
|
||||
* Uses abortableRequest to cancel previous requests for the same message
|
||||
*/
|
||||
updateToolMessage = async (
|
||||
id: string,
|
||||
value: {
|
||||
content?: string;
|
||||
metadata?: Record<string, any>;
|
||||
pluginError?: any;
|
||||
pluginState?: Record<string, any>;
|
||||
},
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<UpdateMessageResult> => {
|
||||
return abortableRequest.execute(`tool-message-${id}`, (signal) =>
|
||||
lambdaClient.message.updateToolMessage.mutate({ ...ctx, id, value }, { signal }),
|
||||
);
|
||||
};
|
||||
removeMessages = async (ids: string[], ctx?: MessageQueryContext) => {
|
||||
// Update local cache if context is provided
|
||||
if (ctx?.agentId) {
|
||||
const context = {
|
||||
agentId: ctx.agentId,
|
||||
groupId: ctx.groupId,
|
||||
threadId: ctx.threadId,
|
||||
topicId: ctx.topicId,
|
||||
};
|
||||
messagesCache.removeMessages(context, ids);
|
||||
}
|
||||
|
||||
removeMessage = async (id: string, ctx?: MessageQueryContext): Promise<UpdateMessageResult> => {
|
||||
return lambdaClient.message.removeMessage.mutate({ ...ctx, id });
|
||||
};
|
||||
|
||||
removeMessages = async (
|
||||
ids: string[],
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<UpdateMessageResult> => {
|
||||
return lambdaClient.message.removeMessages.mutate({ ...ctx, ids });
|
||||
};
|
||||
|
||||
@@ -201,20 +384,18 @@ export class MessageService {
|
||||
};
|
||||
|
||||
removeAllMessages = async () => {
|
||||
// Clear all cache
|
||||
messagesCache.clearAll();
|
||||
return lambdaClient.message.removeAllMessages.mutate();
|
||||
};
|
||||
|
||||
/**
|
||||
* Add files to a message
|
||||
* Used to associate exported files from code interpreter with the tool message
|
||||
*/
|
||||
addFilesToMessage = async (
|
||||
id: string,
|
||||
fileIds: string[],
|
||||
ctx?: MessageQueryContext,
|
||||
): Promise<UpdateMessageResult> => {
|
||||
addFilesToMessage = async (id: string, fileIds: string[], ctx?: MessageQueryContext) => {
|
||||
return lambdaClient.message.addFilesToMessage.mutate({ ...ctx, fileIds, id });
|
||||
};
|
||||
}
|
||||
|
||||
export const messageService = new MessageService();
|
||||
|
||||
// Re-export batcher utilities
|
||||
export { generateMessageId, messagesBatcher } from './batcher';
|
||||
export { messagesCache } from './cache';
|
||||
|
||||
Reference in New Issue
Block a user