Compare commits

...

1 Commits

Author SHA1 Message Date
arvinxx d4e9553ea8 try new message engine 2026-01-16 09:49:38 +08:00
7 changed files with 1120 additions and 233 deletions
+44
View File
@@ -30,6 +30,50 @@ const messageProcedure = authedProcedure.use(serverDatabase).use(async (opts) =>
});
export const messageRouter = router({
/**
* Batch operations for messages
* Executes multiple operations in a single request and returns the updated message list
* This reduces HTTP calls by combining create/update/delete operations
*/
batchOperations: messageProcedure
.input(
z
.object({
operations: z.array(
z.object({
data: z.record(z.any()).optional(),
messageId: z.string(),
type: z.enum([
'create',
'update',
'updateMetadata',
'updateToolMessage',
'updateToolArguments',
'delete',
]),
}),
),
})
.extend(basicContextSchema.shape),
)
.mutation(async ({ input, ctx }) => {
const { operations, agentId, ...options } = input;
if (!agentId) {
throw new TRPCError({
code: 'BAD_REQUEST',
message: 'agentId is required for batch operations',
});
}
const resolved = await resolveContext({ agentId, ...options }, ctx.serverDB, ctx.userId);
return ctx.messageService.batchOperations(operations, {
...resolved,
agentId,
});
}),
addFilesToMessage: messageProcedure
.input(
z
+90
View File
@@ -256,4 +256,94 @@ export class MessageService {
}
return this.queryWithSuccess(options);
}
/**
* Batch operations for messages
* Executes multiple operations in a single transaction and returns the updated message list
*
* Supported operation types:
* - create: Create a new message
* - update: Update message content/error/reasoning
* - updateMetadata: Update message metadata
* - updateToolMessage: Update tool message (content, metadata, pluginState, pluginError)
* - delete: Delete a message
*
* @param operations - Array of operations to execute
* @param options - Query options for returning updated messages
*/
async batchOperations(
operations: Array<{
data?: Record<string, any>;
messageId: string;
type: 'create' | 'update' | 'updateMetadata' | 'updateToolMessage' | 'updateToolArguments' | 'delete';
}>,
options: QueryOptions & { agentId: string },
): Promise<{ messages: UIChatMessage[]; success: boolean }> {
// Execute all operations sequentially
// Note: We don't wrap in a transaction here because each model method
// may have its own transaction logic. For true atomicity, we'd need
// to refactor the model layer.
for (const op of operations) {
try {
switch (op.type) {
case 'create': {
await this.messageModel.create({
...op.data,
agentId: options.agentId,
groupId: options.groupId,
id: op.messageId, // Use frontend-generated ID
topicId: options.topicId,
} as CreateMessageParams);
break;
}
case 'update': {
await this.messageModel.update(op.messageId, op.data as any);
break;
}
case 'updateMetadata': {
if (op.data) {
await this.messageModel.updateMetadata(op.messageId, op.data);
}
break;
}
case 'updateToolMessage': {
await this.messageModel.updateToolMessage(op.messageId, op.data as any);
break;
}
case 'updateToolArguments': {
if (op.data?.toolCallId && op.data?.arguments) {
const argsString =
typeof op.data.arguments === 'string'
? op.data.arguments
: JSON.stringify(op.data.arguments);
await this.messageModel.updateToolArguments(op.data.toolCallId, argsString);
}
break;
}
case 'delete': {
await this.messageModel.deleteMessage(op.messageId);
break;
}
}
} catch (error) {
console.error(`Batch operation failed for ${op.type} on message ${op.messageId}:`, error);
// Continue with other operations even if one fails
}
}
// Query and return the updated message list
const messages = await this.messageModel.query(
{
agentId: options.agentId,
current: 0,
groupId: options.groupId,
pageSize: 9999,
threadId: options.threadId,
topicId: options.topicId,
},
this.getQueryOptions(),
);
return { messages, success: true };
}
}
@@ -0,0 +1,387 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { MessagesBatcher } from '../batcher';
// Mock the lambdaClient
vi.mock('@/libs/trpc/client', () => ({
lambdaClient: {
message: {
batchOperations: {
mutate: vi.fn(),
},
},
},
}));
describe('MessagesBatcher', () => {
let batcher: MessagesBatcher;
beforeEach(() => {
vi.clearAllMocks();
batcher = new MessagesBatcher();
});
describe('dispatch', () => {
it('should add operation to queue', () => {
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
expect(batcher.getQueueLength()).toBe(1);
});
it('should merge create + update operations', () => {
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello world' },
type: 'update',
});
// Should still be 1 operation (merged)
expect(batcher.getQueueLength()).toBe(1);
});
it('should cancel out create + delete operations', () => {
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: {},
type: 'delete',
});
// Should be empty (cancelled out)
expect(batcher.getQueueLength()).toBe(0);
});
it('should merge update + update operations', () => {
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'update',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { reasoning: 'thinking...' },
type: 'update',
});
expect(batcher.getQueueLength()).toBe(1);
});
it('should replace update with delete', () => {
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'update',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: {},
type: 'delete',
});
expect(batcher.getQueueLength()).toBe(1);
});
it('should merge updateMetadata operations', () => {
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { key1: 'value1' },
type: 'updateMetadata',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { key2: 'value2' },
type: 'updateMetadata',
});
expect(batcher.getQueueLength()).toBe(1);
});
it('should merge updateToolMessage operations', () => {
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { pluginState: { step: 1 } },
type: 'updateToolMessage',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'result' },
type: 'updateToolMessage',
});
expect(batcher.getQueueLength()).toBe(1);
});
it('should keep operations for different messages separate', () => {
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-2',
payload: { content: 'world' },
type: 'create',
});
expect(batcher.getQueueLength()).toBe(2);
});
it('should keep operations for different contexts separate', () => {
batcher.dispatch({
context: { agentId: 'agent-1', topicId: 'topic-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'update',
});
batcher.dispatch({
context: { agentId: 'agent-1', topicId: 'topic-2' },
messageId: 'msg-1',
payload: { content: 'world' },
type: 'update',
});
// Same messageId but different context, so they should be separate
expect(batcher.getQueueLength()).toBe(2);
});
});
describe('microtask flush', () => {
it('should flush via microtask after dispatch', async () => {
const { lambdaClient } = await import('@/libs/trpc/client');
vi.mocked(lambdaClient.message.batchOperations.mutate).mockResolvedValue({
messages: [],
success: true,
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
// Not flushed yet (synchronously)
expect(lambdaClient.message.batchOperations.mutate).not.toHaveBeenCalled();
// Wait for microtask to complete
await Promise.resolve();
await Promise.resolve(); // Extra tick for the flush to complete
expect(lambdaClient.message.batchOperations.mutate).toHaveBeenCalledTimes(1);
});
it('should batch multiple synchronous dispatches into one flush', async () => {
const { lambdaClient } = await import('@/libs/trpc/client');
vi.mocked(lambdaClient.message.batchOperations.mutate).mockResolvedValue({
messages: [],
success: true,
});
// Multiple synchronous dispatches
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-2',
payload: { content: 'world' },
type: 'create',
});
// Wait for microtask
await Promise.resolve();
await Promise.resolve();
// Should have been called only once with both operations
expect(lambdaClient.message.batchOperations.mutate).toHaveBeenCalledTimes(1);
expect(lambdaClient.message.batchOperations.mutate).toHaveBeenCalledWith(
expect.objectContaining({
operations: expect.arrayContaining([
expect.objectContaining({ messageId: 'msg-1' }),
expect.objectContaining({ messageId: 'msg-2' }),
]),
}),
);
});
it('should clear the queue after flush', async () => {
const { lambdaClient } = await import('@/libs/trpc/client');
vi.mocked(lambdaClient.message.batchOperations.mutate).mockResolvedValue({
messages: [],
success: true,
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
// Wait for flush
await Promise.resolve();
await Promise.resolve();
expect(batcher.getQueueLength()).toBe(0);
});
});
describe('error handling', () => {
it('should log error but not re-queue on failure', async () => {
const { lambdaClient } = await import('@/libs/trpc/client');
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
vi.mocked(lambdaClient.message.batchOperations.mutate).mockRejectedValue(
new Error('Network error'),
);
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
// Wait for flush
await Promise.resolve();
await Promise.resolve();
await Promise.resolve(); // Extra tick for error handling
// Error should be logged
expect(consoleErrorSpy).toHaveBeenCalled();
// Queue should be empty (not re-queued - local cache is source of truth)
expect(batcher.getQueueLength()).toBe(0);
consoleErrorSpy.mockRestore();
});
});
describe('complex scenarios', () => {
it('should handle rapid create-update-delete sequence', () => {
// Simulate rapid user actions
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello world' },
type: 'update',
});
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: {},
type: 'delete',
});
// All should cancel out
expect(batcher.getQueueLength()).toBe(0);
});
it('should handle multiple messages with mixed operations', async () => {
const { lambdaClient } = await import('@/libs/trpc/client');
vi.mocked(lambdaClient.message.batchOperations.mutate).mockResolvedValue({
messages: [],
success: true,
});
// Create msg-1
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello' },
type: 'create',
});
// Create msg-2
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-2',
payload: { content: 'world' },
type: 'create',
});
// Update msg-1
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-1',
payload: { content: 'hello updated' },
type: 'update',
});
// Delete msg-2
batcher.dispatch({
context: { agentId: 'agent-1' },
messageId: 'msg-2',
payload: {},
type: 'delete',
});
// Should have 1 operation (create msg-1 with updated content)
// msg-2 create + delete should cancel out
expect(batcher.getQueueLength()).toBe(1);
// Wait for flush
await Promise.resolve();
await Promise.resolve();
expect(lambdaClient.message.batchOperations.mutate).toHaveBeenCalledWith(
expect.objectContaining({
operations: [
expect.objectContaining({
data: expect.objectContaining({ content: 'hello updated' }),
messageId: 'msg-1',
type: 'create',
}),
],
}),
);
});
});
});
@@ -1,157 +0,0 @@
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { lambdaClient } from '@/libs/trpc/client';
import { MessageService } from '../index';
vi.mock('@/libs/trpc/client', () => ({
lambdaClient: {
message: {
updateMetadata: {
mutate: vi.fn(),
},
},
},
}));
describe('MessageService - Race Condition Control', () => {
let messageService: MessageService;
beforeEach(() => {
vi.clearAllMocks();
messageService = new MessageService();
});
describe('updateMessageMetadata race condition', () => {
it('should cancel previous request when new update is triggered for same message', async () => {
const messageId = 'test-message-id';
let firstRequestAborted = false;
let secondRequestCompleted = false;
// Mock first request (slow)
vi.mocked(lambdaClient.message.updateMetadata.mutate).mockImplementationOnce(
(_params, options) =>
new Promise((resolve, reject) => {
const signal = options?.signal;
if (signal) {
signal.addEventListener('abort', () => {
firstRequestAborted = true;
reject(new Error('Aborted'));
});
}
// Simulate slow request
setTimeout(() => resolve({ success: true, messages: [] }), 200);
}),
);
// Mock second request (fast)
vi.mocked(lambdaClient.message.updateMetadata.mutate).mockImplementationOnce(
async (_params, _options) => {
secondRequestCompleted = true;
return { success: true, messages: [] };
},
);
// Start first update
const firstPromise = messageService.updateMessageMetadata(messageId, { compare: true });
// Wait a bit then start second update
await new Promise((resolve) => setTimeout(resolve, 10));
const secondPromise = messageService.updateMessageMetadata(messageId, { compare: false });
// First should be aborted
await expect(firstPromise).rejects.toThrow('Aborted');
expect(firstRequestAborted).toBe(true);
// Second should complete successfully
await expect(secondPromise).resolves.toEqual({ success: true, messages: [] });
expect(secondRequestCompleted).toBe(true);
});
it('should allow concurrent updates for different messages', async () => {
const message1Id = 'message-1';
const message2Id = 'message-2';
vi.mocked(lambdaClient.message.updateMetadata.mutate).mockResolvedValue({
success: true,
messages: [],
});
const [result1, result2] = await Promise.all([
messageService.updateMessageMetadata(message1Id, { cost: 0.001 }),
messageService.updateMessageMetadata(message2Id, { cost: 0.002 }),
]);
expect(result1).toEqual({ success: true, messages: [] });
expect(result2).toEqual({ success: true, messages: [] });
expect(lambdaClient.message.updateMetadata.mutate).toHaveBeenCalledTimes(2);
});
it('should handle rapid successive updates correctly', async () => {
const messageId = 'test-message-id';
let completedUpdates = 0;
const abortedUpdates: number[] = [];
// All but the last request should be aborted
let callIndex = 0;
vi.mocked(lambdaClient.message.updateMetadata.mutate).mockImplementation(
(_params, options) => {
const currentIndex = callIndex++;
return new Promise((resolve, reject) => {
const signal = options?.signal;
let isAborted = false;
if (signal) {
signal.addEventListener('abort', () => {
isAborted = true;
abortedUpdates.push(currentIndex);
reject(new Error('Aborted'));
});
}
setTimeout(() => {
if (!isAborted) {
completedUpdates++;
resolve({ success: true, messages: [] });
}
}, 50);
});
},
);
// Trigger 5 rapid updates sequentially with catch to prevent unhandled rejections
const promise1 = messageService
.updateMessageMetadata(messageId, { cost: 0.001 })
.catch((e) => e);
await new Promise((resolve) => setTimeout(resolve, 5));
const promise2 = messageService
.updateMessageMetadata(messageId, { cost: 0.002 })
.catch((e) => e);
await new Promise((resolve) => setTimeout(resolve, 5));
const promise3 = messageService.updateMessageMetadata(messageId, { tps: 10 }).catch((e) => e);
await new Promise((resolve) => setTimeout(resolve, 5));
const promise4 = messageService.updateMessageMetadata(messageId, { tps: 20 }).catch((e) => e);
await new Promise((resolve) => setTimeout(resolve, 5));
const promise5 = messageService
.updateMessageMetadata(messageId, { compare: true })
.catch((e) => e);
// Wait for all to settle
const results = await Promise.all([promise1, promise2, promise3, promise4, promise5]);
// First 4 should be errors (aborted), last should succeed
expect(results[0]).toBeInstanceOf(Error);
expect(results[1]).toBeInstanceOf(Error);
expect(results[2]).toBeInstanceOf(Error);
expect(results[3]).toBeInstanceOf(Error);
expect(results[4]).toEqual({ success: true, messages: [] });
// 4 requests should have been aborted
expect(abortedUpdates.length).toBe(4);
expect(abortedUpdates).toEqual([0, 1, 2, 3]);
// Only the last request should complete
expect(completedUpdates).toBe(1);
});
});
});
+218
View File
@@ -0,0 +1,218 @@
import { nanoid } from 'nanoid';
import { lambdaClient } from '@/libs/trpc/client';
import { messageMapKey, type MessageMapKeyInput } from '@/store/chat/utils/messageMapKey';
// ==========================================
// Types
// ==========================================
export type BatchOperationType =
| 'create'
| 'update'
| 'updateMetadata'
| 'updateToolMessage'
| 'updateToolArguments'
| 'delete';
export interface BatchOperation {
context: MessageMapKeyInput;
contextKey: string;
messageId: string;
payload: Record<string, any>;
type: BatchOperationType;
}
interface BatchOperationsInput {
agentId?: string;
groupId?: string;
operations: Array<{
data?: Record<string, any>;
messageId: string;
type: BatchOperationType;
}>;
threadId?: string | null;
topicId?: string | null;
}
// ==========================================
// MessagesBatcher Class
// ==========================================
/**
* Messages Batcher - batches multiple message operations into a single request
*
* Uses microtask scheduling to merge operations within the same event loop,
* then flushes them asynchronously without blocking the caller.
*/
export class MessagesBatcher {
private flushPromise: Promise<void> | null = null;
private queue: BatchOperation[] = [];
// ==========================================
// Public API
// ==========================================
/**
* Dispatch an operation to the batcher queue
* Operations are automatically merged if they target the same message
* Triggers an async flush via microtask
*/
dispatch(op: Omit<BatchOperation, 'contextKey'>): void {
const contextKey = messageMapKey(op.context);
this.mergeOrPush({ ...op, contextKey });
this.scheduleFlush();
}
/**
* Get the current queue length (for testing/debugging)
*/
getQueueLength(): number {
return this.queue.length;
}
// ==========================================
// Queue Management
// ==========================================
private mergeOrPush(op: BatchOperation): void {
const existingIndex = this.queue.findIndex(
(o) => o.messageId === op.messageId && o.contextKey === op.contextKey,
);
if (existingIndex >= 0) {
const existing = this.queue[existingIndex];
// Merge strategies based on operation type combinations
if (existing.type === 'create' && op.type === 'update') {
// create + update = create with merged payload
existing.payload = { ...existing.payload, ...op.payload };
} else if (existing.type === 'create' && op.type === 'updateMetadata') {
// create + updateMetadata = create with merged metadata
existing.payload = {
...existing.payload,
metadata: { ...existing.payload.metadata, ...op.payload },
};
} else if (existing.type === 'create' && op.type === 'delete') {
// create + delete = cancel out (remove from queue)
this.queue.splice(existingIndex, 1);
return;
} else if (existing.type === 'update' && op.type === 'update') {
// update + update = merge updates
existing.payload = { ...existing.payload, ...op.payload };
} else if (existing.type === 'update' && op.type === 'updateMetadata') {
// update + updateMetadata = merge
existing.payload = {
...existing.payload,
metadata: { ...existing.payload.metadata, ...op.payload },
};
} else if (existing.type === 'update' && op.type === 'delete') {
// update + delete = just delete
this.queue[existingIndex] = op;
} else if (existing.type === 'updateMetadata' && op.type === 'updateMetadata') {
// metadata + metadata = merge
existing.payload = { ...existing.payload, ...op.payload };
} else if (existing.type === 'updateMetadata' && op.type === 'delete') {
// metadata + delete = just delete
this.queue[existingIndex] = op;
} else if (existing.type === 'updateToolMessage' && op.type === 'updateToolMessage') {
// toolMessage + toolMessage = merge
existing.payload = { ...existing.payload, ...op.payload };
} else if (existing.type === 'updateToolMessage' && op.type === 'delete') {
// toolMessage + delete = just delete
this.queue[existingIndex] = op;
} else {
// Other combinations: just append
this.queue.push(op);
}
} else {
this.queue.push(op);
}
}
/**
* Schedule a flush using microtask
* This ensures all synchronous operations in the same event loop are batched together
*/
private scheduleFlush(): void {
if (!this.flushPromise) {
this.flushPromise = Promise.resolve().then(() => {
this.flushPromise = null;
this.flush();
});
}
}
// ==========================================
// Flush Operations
// ==========================================
private async flush(): Promise<void> {
if (this.queue.length === 0) return;
const ops = [...this.queue];
this.queue = [];
// Group by context
const byContext = this.groupByContext(ops);
// Send batches in parallel for different contexts
await Promise.all(
Object.entries(byContext).map(([contextKey, contextOps]) =>
this.sendBatch(contextKey, contextOps),
),
);
}
private groupByContext(ops: BatchOperation[]): Record<string, BatchOperation[]> {
return ops.reduce(
(acc, op) => {
if (!acc[op.contextKey]) acc[op.contextKey] = [];
acc[op.contextKey].push(op);
return acc;
},
{} as Record<string, BatchOperation[]>,
);
}
private async sendBatch(_contextKey: string, ops: BatchOperation[]): Promise<void> {
try {
const context = ops[0].context;
const input: BatchOperationsInput = {
agentId: context.agentId,
groupId: context.groupId,
operations: ops.map((op) => ({
data: op.payload,
messageId: op.messageId,
type: op.type,
})),
threadId: context.threadId,
topicId: context.topicId,
};
await lambdaClient.message.batchOperations.mutate(input);
} catch (error) {
// Log error but don't re-queue - local cache is source of truth
console.error('Batch sync failed:', error);
}
}
}
// ==========================================
// Helper: Generate Message ID
// ==========================================
/**
* Generate a new message ID (frontend-generated)
* This allows optimistic updates without waiting for server
*/
export const generateMessageId = (): string => {
return nanoid();
};
// ==========================================
// Singleton Instance
// ==========================================
export const messagesBatcher = new MessagesBatcher();
+124
View File
@@ -0,0 +1,124 @@
import type { UIChatMessage } from '@lobechat/types';
import { messageMapKey, type MessageMapKeyInput } from '@/store/chat/utils/messageMapKey';
/**
* Local messages cache for optimistic updates
* Stores messages grouped by context key (agentId + topicId + threadId)
*/
class MessagesCache {
private cache: Map<string, UIChatMessage[]> = new Map();
/**
* Get context key from input
*/
private getKey(ctx: MessageMapKeyInput): string {
return messageMapKey(ctx);
}
/**
* Get messages for a context
*/
get(ctx: MessageMapKeyInput): UIChatMessage[] | undefined {
return this.cache.get(this.getKey(ctx));
}
/**
* Check if cache has messages for a context
*/
has(ctx: MessageMapKeyInput): boolean {
return this.cache.has(this.getKey(ctx));
}
/**
* Set messages for a context (used when loading from backend)
*/
set(ctx: MessageMapKeyInput, messages: UIChatMessage[]): void {
this.cache.set(this.getKey(ctx), [...messages]);
}
/**
* Add a message to the cache (optimistic create)
*/
addMessage(ctx: MessageMapKeyInput, message: UIChatMessage): UIChatMessage[] {
const key = this.getKey(ctx);
const messages = this.cache.get(key) || [];
const updated = [...messages, message];
this.cache.set(key, updated);
return updated;
}
/**
* Update a message in the cache (optimistic update)
*/
updateMessage(
ctx: MessageMapKeyInput,
id: string,
value: Partial<UIChatMessage>,
): UIChatMessage[] {
const key = this.getKey(ctx);
const messages = this.cache.get(key) || [];
const updated = messages.map((m) => (m.id === id ? { ...m, ...value, updatedAt: Date.now() } : m));
this.cache.set(key, updated);
return updated;
}
/**
* Update message metadata in the cache
*/
updateMessageMetadata(
ctx: MessageMapKeyInput,
id: string,
metadata: Record<string, any>,
): UIChatMessage[] {
const key = this.getKey(ctx);
const messages = this.cache.get(key) || [];
const updated = messages.map((m) =>
m.id === id
? { ...m, metadata: { ...m.metadata, ...metadata }, updatedAt: Date.now() }
: m,
);
this.cache.set(key, updated);
return updated;
}
/**
* Remove a message from the cache (optimistic delete)
*/
removeMessage(ctx: MessageMapKeyInput, id: string): UIChatMessage[] {
const key = this.getKey(ctx);
const messages = this.cache.get(key) || [];
const updated = messages.filter((m) => m.id !== id);
this.cache.set(key, updated);
return updated;
}
/**
* Remove multiple messages from the cache
*/
removeMessages(ctx: MessageMapKeyInput, ids: string[]): UIChatMessage[] {
const key = this.getKey(ctx);
const messages = this.cache.get(key) || [];
const idSet = new Set(ids);
const updated = messages.filter((m) => !idSet.has(m.id));
this.cache.set(key, updated);
return updated;
}
/**
* Clear cache for a specific context
*/
clear(ctx: MessageMapKeyInput): void {
this.cache.delete(this.getKey(ctx));
}
/**
* Clear all cache
*/
clearAll(): void {
this.cache.clear();
}
}
// Singleton instance
export const messagesCache = new MessagesCache();
+257 -76
View File
@@ -4,20 +4,19 @@ import {
type ChatTTS,
type ChatTranslate,
type CreateMessageParams,
type CreateMessageResult,
type MessageMetadata,
type MessagePluginItem,
type ModelRankItem,
type UIChatMessage,
type UpdateMessageParams,
type UpdateMessageRAGParams,
type UpdateMessageResult,
} from '@lobechat/types';
import type { HeatmapsProps } from '@lobehub/charts';
import { lambdaClient } from '@/libs/trpc/client';
import { abortableRequest } from '../utils/abortableRequest';
import { generateMessageId, messagesBatcher } from './batcher';
import { messagesCache } from './cache';
/**
* Query context for message operations
@@ -32,16 +31,246 @@ export interface MessageQueryContext {
}
export class MessageService {
createMessage = async (params: CreateMessageParams): Promise<CreateMessageResult> => {
return lambdaClient.message.createMessage.mutate(params as any);
// ==========================================
// Batched Operations (with local cache)
// These return immediately from local cache
// ==========================================
/**
* Create a message with optimistic update
* Updates local cache immediately, dispatches to batcher
*/
createMessage = async (
params: CreateMessageParams & { id?: string },
): Promise<{ id: string; messages: UIChatMessage[] }> => {
const id = params.id || generateMessageId();
const now = Date.now();
if (!params.agentId) {
console.warn('createMessage: agentId is required');
return { id, messages: [] };
}
const ctx = {
agentId: params.agentId,
groupId: params.groupId ?? undefined,
threadId: params.threadId,
topicId: params.topicId,
};
// Create the message object for local cache
const newMessage: UIChatMessage = {
...params,
content: params.content || '',
createdAt: now,
id,
meta: {},
updatedAt: now,
} as UIChatMessage;
// Update local cache
const messages = messagesCache.addMessage(ctx, newMessage);
// Dispatch to batcher (async, non-blocking)
messagesBatcher.dispatch({
context: ctx,
messageId: id,
payload: { ...params, id },
type: 'create',
});
return { id, messages };
};
/**
* Update a message with optimistic update
*/
updateMessage = async (
id: string,
value: Partial<UpdateMessageParams>,
ctx?: MessageQueryContext,
): Promise<{ messages?: UIChatMessage[]; success: boolean }> => {
if (!ctx?.agentId) {
console.warn('updateMessage: agentId is required');
return { success: false };
}
const context = {
agentId: ctx.agentId,
groupId: ctx.groupId,
threadId: ctx.threadId,
topicId: ctx.topicId,
};
// Update local cache
const messages = messagesCache.updateMessage(context, id, value as Partial<UIChatMessage>);
// Dispatch to batcher
messagesBatcher.dispatch({
context,
messageId: id,
payload: value,
type: 'update',
});
return { messages, success: true };
};
/**
* Update message metadata with optimistic update
*/
updateMessageMetadata = async (
id: string,
value: Partial<MessageMetadata>,
ctx?: MessageQueryContext,
): Promise<{ messages?: UIChatMessage[]; success: boolean }> => {
if (!ctx?.agentId) {
console.warn('updateMessageMetadata: agentId is required');
return { success: false };
}
const context = {
agentId: ctx.agentId,
groupId: ctx.groupId,
threadId: ctx.threadId,
topicId: ctx.topicId,
};
// Update local cache
const messages = messagesCache.updateMessageMetadata(context, id, value);
// Dispatch to batcher
messagesBatcher.dispatch({
context,
messageId: id,
payload: value,
type: 'updateMetadata',
});
return { messages, success: true };
};
/**
* Update tool message with optimistic update
*/
updateToolMessage = async (
id: string,
value: {
content?: string;
metadata?: Record<string, any>;
pluginError?: any;
pluginState?: Record<string, any>;
},
ctx?: MessageQueryContext,
): Promise<{ messages?: UIChatMessage[]; success: boolean }> => {
if (!ctx?.agentId) {
console.warn('updateToolMessage: agentId is required');
return { success: false };
}
const context = {
agentId: ctx.agentId,
groupId: ctx.groupId,
threadId: ctx.threadId,
topicId: ctx.topicId,
};
// Update local cache with the relevant fields
const updateValue: Partial<UIChatMessage> = {};
if (value.content !== undefined) updateValue.content = value.content;
if (value.metadata !== undefined) updateValue.metadata = value.metadata;
if (value.pluginError !== undefined) (updateValue as any).pluginError = value.pluginError;
if (value.pluginState !== undefined) (updateValue as any).pluginState = value.pluginState;
const messages = messagesCache.updateMessage(context, id, updateValue);
// Dispatch to batcher
messagesBatcher.dispatch({
context,
messageId: id,
payload: value,
type: 'updateToolMessage',
});
return { messages, success: true };
};
/**
* Remove a message with optimistic update
*/
removeMessage = async (
id: string,
ctx?: MessageQueryContext,
): Promise<{ messages?: UIChatMessage[]; success: boolean }> => {
if (!ctx?.agentId) {
console.warn('removeMessage: agentId is required');
return { success: false };
}
const context = {
agentId: ctx.agentId,
groupId: ctx.groupId,
threadId: ctx.threadId,
topicId: ctx.topicId,
};
// Remove from local cache
const messages = messagesCache.removeMessage(context, id);
// Dispatch to batcher
messagesBatcher.dispatch({
context,
messageId: id,
payload: {},
type: 'delete',
});
return { messages, success: true };
};
// ==========================================
// Query Operations (with cache)
// ==========================================
getMessages = async (params: MessageQueryContext): Promise<UIChatMessage[]> => {
const data = await lambdaClient.message.getMessages.query(params);
// For shared topics, always fetch from backend
if (params.topicShareId) {
const data = await lambdaClient.message.getMessages.query(params);
return data as unknown as UIChatMessage[];
}
// Check if agentId is provided for cache
if (params.agentId) {
const ctx = {
agentId: params.agentId,
groupId: params.groupId,
threadId: params.threadId,
topicId: params.topicId,
};
// Check cache first
const cached = messagesCache.get(ctx);
if (cached) {
return cached;
}
// Fetch from backend and cache
const data = await lambdaClient.message.getMessages.query(params);
const messages = data as unknown as UIChatMessage[];
messagesCache.set(ctx, messages);
return messages;
}
// Fallback to backend
const data = await lambdaClient.message.getMessages.query(params);
return data as unknown as UIChatMessage[];
};
// ==========================================
// Direct API Operations (no batching)
// These operations need immediate server response
// ==========================================
countMessages = async (params?: {
endDate?: string;
range?: [string, string];
@@ -86,10 +315,6 @@ export class MessageService {
/**
* Update tool arguments by toolCallId - updates both tool message and parent assistant message in one transaction
* This is the preferred method for updating tool arguments as it prevents race conditions
*
* @param toolCallId - The tool call ID (stable identifier from AI response)
* @param value - The new arguments value
* @param ctx - Message query context
*/
updateToolArguments = async (
toolCallId: string,
@@ -99,18 +324,6 @@ export class MessageService {
return lambdaClient.message.updateToolArguments.mutate({ ...ctx, toolCallId, value });
};
updateMessage = async (
id: string,
value: Partial<UpdateMessageParams>,
ctx?: MessageQueryContext,
): Promise<UpdateMessageResult> => {
return lambdaClient.message.update.mutate({
...ctx,
id,
value,
});
};
updateMessageTranslate = async (id: string, translate: Partial<ChatTranslate> | false) => {
return lambdaClient.message.updateTranslate.mutate({ id, value: translate as ChatTranslate });
};
@@ -119,21 +332,11 @@ export class MessageService {
return lambdaClient.message.updateTTS.mutate({ id, value: tts });
};
updateMessageMetadata = async (
id: string,
value: Partial<MessageMetadata>,
ctx?: MessageQueryContext,
): Promise<UpdateMessageResult> => {
return abortableRequest.execute(`message-metadata-${id}`, (signal) =>
lambdaClient.message.updateMetadata.mutate({ ...ctx, id, value }, { signal }),
);
};
updateMessagePluginState = async (
id: string,
value: Record<string, any>,
ctx?: MessageQueryContext,
): Promise<UpdateMessageResult> => {
) => {
return lambdaClient.message.updatePluginState.mutate({ ...ctx, id, value });
};
@@ -141,7 +344,7 @@ export class MessageService {
id: string,
error: ChatMessagePluginError | null,
ctx?: MessageQueryContext,
): Promise<UpdateMessageResult> => {
) => {
return lambdaClient.message.updatePluginError.mutate({ ...ctx, id, value: error as any });
};
@@ -149,46 +352,26 @@ export class MessageService {
id: string,
value: Partial<Omit<MessagePluginItem, 'id'>>,
ctx?: MessageQueryContext,
): Promise<UpdateMessageResult> => {
) => {
return lambdaClient.message.updateMessagePlugin.mutate({ ...ctx, id, value });
};
updateMessageRAG = async (
id: string,
data: UpdateMessageRAGParams,
ctx?: MessageQueryContext,
): Promise<UpdateMessageResult> => {
updateMessageRAG = async (id: string, data: UpdateMessageRAGParams, ctx?: MessageQueryContext) => {
return lambdaClient.message.updateMessageRAG.mutate({ ...ctx, id, value: data });
};
/**
* Update tool message with content, metadata, pluginState, and pluginError in a single request
* This prevents race conditions when updating multiple fields
* Uses abortableRequest to cancel previous requests for the same message
*/
updateToolMessage = async (
id: string,
value: {
content?: string;
metadata?: Record<string, any>;
pluginError?: any;
pluginState?: Record<string, any>;
},
ctx?: MessageQueryContext,
): Promise<UpdateMessageResult> => {
return abortableRequest.execute(`tool-message-${id}`, (signal) =>
lambdaClient.message.updateToolMessage.mutate({ ...ctx, id, value }, { signal }),
);
};
removeMessages = async (ids: string[], ctx?: MessageQueryContext) => {
// Update local cache if context is provided
if (ctx?.agentId) {
const context = {
agentId: ctx.agentId,
groupId: ctx.groupId,
threadId: ctx.threadId,
topicId: ctx.topicId,
};
messagesCache.removeMessages(context, ids);
}
removeMessage = async (id: string, ctx?: MessageQueryContext): Promise<UpdateMessageResult> => {
return lambdaClient.message.removeMessage.mutate({ ...ctx, id });
};
removeMessages = async (
ids: string[],
ctx?: MessageQueryContext,
): Promise<UpdateMessageResult> => {
return lambdaClient.message.removeMessages.mutate({ ...ctx, ids });
};
@@ -201,20 +384,18 @@ export class MessageService {
};
removeAllMessages = async () => {
// Clear all cache
messagesCache.clearAll();
return lambdaClient.message.removeAllMessages.mutate();
};
/**
* Add files to a message
* Used to associate exported files from code interpreter with the tool message
*/
addFilesToMessage = async (
id: string,
fileIds: string[],
ctx?: MessageQueryContext,
): Promise<UpdateMessageResult> => {
addFilesToMessage = async (id: string, fileIds: string[], ctx?: MessageQueryContext) => {
return lambdaClient.message.addFilesToMessage.mutate({ ...ctx, fileIds, id });
};
}
export const messageService = new MessageService();
// Re-export batcher utilities
export { generateMessageId, messagesBatcher } from './batcher';
export { messagesCache } from './cache';