🐛 fix: fix thread not working issue

This commit is contained in:
arvinxx
2025-12-21 15:39:16 +08:00
parent c632b22d97
commit 7dd30ebb98
9 changed files with 884 additions and 170 deletions
@@ -0,0 +1,745 @@
import { eq, sql } from 'drizzle-orm';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { agents, messages, sessions, threads, topics, users } from '../../../schemas';
import { LobeChatDatabase } from '../../../type';
import { MessageModel } from '../../message';
import { getTestDB } from '../_util';
const serverDB: LobeChatDatabase = await getTestDB();
const userId = 'thread-query-test-user';
const messageModel = new MessageModel(serverDB, userId);
beforeEach(async () => {
await serverDB.transaction(async (trx) => {
await trx.delete(users).where(eq(users.id, userId));
await trx.insert(users).values([{ id: userId }]);
});
});
afterEach(async () => {
await serverDB.transaction(async (trx) => {
await trx.delete(users).where(eq(users.id, userId));
});
});
describe('MessageModel thread query', () => {
describe('query with threadId - complete thread data', () => {
it('should return parent messages + thread messages for Continuation type', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
// Create thread with Continuation type
await trx.insert(threads).values([
{
id: 'thread1',
userId,
topicId: 'topic1',
sourceMessageId: 'msg2',
type: 'continuation',
},
]);
// Create main conversation messages (parent messages)
await trx.insert(messages).values([
{
id: 'msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'first message',
createdAt: new Date('2023-01-01'),
},
{
id: 'msg2',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'assistant',
content: 'second message - source',
createdAt: new Date('2023-01-02'),
},
{
id: 'msg3',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'third message - after source',
createdAt: new Date('2023-01-03'),
},
// Thread messages
{
id: 'thread-msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread1',
role: 'user',
content: 'thread message 1',
createdAt: new Date('2023-01-02T10:00:00'),
},
{
id: 'thread-msg2',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread1',
role: 'assistant',
content: 'thread message 2',
createdAt: new Date('2023-01-02T11:00:00'),
},
]);
});
const result = await messageModel.query({ threadId: 'thread1' });
// Should include parent messages (msg1, msg2) + thread messages (thread-msg1, thread-msg2)
// Should NOT include msg3 (after source message)
expect(result).toHaveLength(4);
expect(result.map((m) => m.id)).toEqual(['msg1', 'msg2', 'thread-msg1', 'thread-msg2']);
});
it('should return parent messages + thread messages when querying with agentId', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(agents).values([{ id: 'agent1', userId }]);
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
// Create thread with Continuation type
await trx.insert(threads).values([
{
id: 'thread1',
userId,
topicId: 'topic1',
sourceMessageId: 'msg2',
type: 'continuation',
},
]);
// Create main conversation messages (parent messages) with agentId
await trx.insert(messages).values([
{
id: 'msg1',
userId,
agentId: 'agent1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'first message',
createdAt: new Date('2023-01-01'),
},
{
id: 'msg2',
userId,
agentId: 'agent1',
topicId: 'topic1',
threadId: null,
role: 'assistant',
content: 'second message - source',
createdAt: new Date('2023-01-02'),
},
// Thread messages
{
id: 'thread-msg1',
userId,
agentId: 'agent1',
topicId: 'topic1',
threadId: 'thread1',
role: 'user',
content: 'thread message 1',
createdAt: new Date('2023-01-02T10:00:00'),
},
{
id: 'thread-msg2',
userId,
agentId: 'agent1',
topicId: 'topic1',
threadId: 'thread1',
role: 'assistant',
content: 'thread message 2',
createdAt: new Date('2023-01-02T11:00:00'),
},
]);
});
// Query with both agentId and threadId
const result = await messageModel.query({ agentId: 'agent1', threadId: 'thread1' });
// Should include parent messages (msg1, msg2) + thread messages (thread-msg1, thread-msg2)
expect(result).toHaveLength(4);
expect(result.map((m) => m.id)).toEqual(['msg1', 'msg2', 'thread-msg1', 'thread-msg2']);
});
it('should return parent messages + thread messages when querying with topicId and threadId', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(agents).values([{ id: 'agent1', userId }]);
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([
{ id: 'topic1', sessionId: 'session1', userId },
{ id: 'topic2', sessionId: 'session1', userId },
]);
// Create thread with Continuation type
await trx.insert(threads).values([
{
id: 'thread1',
userId,
topicId: 'topic1',
sourceMessageId: 'msg2',
type: 'continuation',
},
]);
// Create messages in topic1
await trx.insert(messages).values([
{
id: 'msg1',
userId,
agentId: 'agent1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'topic1 first message',
createdAt: new Date('2023-01-01'),
},
{
id: 'msg2',
userId,
agentId: 'agent1',
topicId: 'topic1',
threadId: null,
role: 'assistant',
content: 'topic1 second message - source',
createdAt: new Date('2023-01-02'),
},
// Thread messages in topic1
{
id: 'thread-msg1',
userId,
agentId: 'agent1',
topicId: 'topic1',
threadId: 'thread1',
role: 'user',
content: 'thread message 1',
createdAt: new Date('2023-01-02T10:00:00'),
},
// Messages in topic2 (should not be included)
{
id: 'topic2-msg1',
userId,
agentId: 'agent1',
topicId: 'topic2',
threadId: null,
role: 'user',
content: 'topic2 message',
createdAt: new Date('2023-01-01T05:00:00'),
},
]);
});
// Query with agentId, topicId and threadId
const result = await messageModel.query({
agentId: 'agent1',
topicId: 'topic1',
threadId: 'thread1',
});
// Should include parent messages (msg1, msg2) + thread message (thread-msg1)
// Should NOT include topic2-msg1
expect(result).toHaveLength(3);
expect(result.map((m) => m.id)).toEqual(['msg1', 'msg2', 'thread-msg1']);
});
it('should return only source message + thread messages for Standalone type', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
// Create thread with Standalone type
await trx.insert(threads).values([
{
id: 'thread1',
userId,
topicId: 'topic1',
sourceMessageId: 'msg2',
type: 'standalone',
},
]);
// Create main conversation messages
await trx.insert(messages).values([
{
id: 'msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'first message',
createdAt: new Date('2023-01-01'),
},
{
id: 'msg2',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'assistant',
content: 'second message - source',
createdAt: new Date('2023-01-02'),
},
{
id: 'msg3',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'third message',
createdAt: new Date('2023-01-03'),
},
// Thread messages
{
id: 'thread-msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread1',
role: 'user',
content: 'thread message 1',
createdAt: new Date('2023-01-02T10:00:00'),
},
{
id: 'thread-msg2',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread1',
role: 'assistant',
content: 'thread message 2',
createdAt: new Date('2023-01-02T11:00:00'),
},
]);
});
const result = await messageModel.query({ threadId: 'thread1' });
// For Standalone: should include only source message (msg2) + thread messages
// Should NOT include msg1 or msg3
expect(result).toHaveLength(3);
expect(result.map((m) => m.id)).toEqual(['msg2', 'thread-msg1', 'thread-msg2']);
});
it('should return only thread messages when thread has no sourceMessageId', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
// Create thread without sourceMessageId
await trx.insert(threads).values([
{
id: 'thread1',
userId,
topicId: 'topic1',
sourceMessageId: null,
type: 'continuation',
},
]);
await trx.insert(messages).values([
{
id: 'msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'main message',
createdAt: new Date('2023-01-01'),
},
{
id: 'thread-msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread1',
role: 'user',
content: 'thread message',
createdAt: new Date('2023-01-02'),
},
]);
});
const result = await messageModel.query({ threadId: 'thread1' });
// Should only return thread messages (fallback to original behavior)
expect(result).toHaveLength(1);
expect(result[0].id).toBe('thread-msg1');
});
it('should handle messages in correct chronological order', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
// Create thread
await trx.insert(threads).values([
{
id: 'thread1',
userId,
topicId: 'topic1',
sourceMessageId: 'msg2',
type: 'continuation',
},
]);
// Insert messages in non-chronological order
await trx.insert(messages).values([
{
id: 'thread-msg2',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread1',
role: 'assistant',
content: 'thread message 2',
createdAt: new Date('2023-01-02T11:00:00'),
},
{
id: 'msg2',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'assistant',
content: 'source message',
createdAt: new Date('2023-01-02'),
},
{
id: 'thread-msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread1',
role: 'user',
content: 'thread message 1',
createdAt: new Date('2023-01-02T10:00:00'),
},
{
id: 'msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'first message',
createdAt: new Date('2023-01-01'),
},
]);
});
const result = await messageModel.query({ threadId: 'thread1' });
// Should be in chronological order: msg1 -> msg2 -> thread-msg1 -> thread-msg2
expect(result).toHaveLength(4);
expect(result[0].id).toBe('msg1');
expect(result[1].id).toBe('msg2');
expect(result[2].id).toBe('thread-msg1');
expect(result[3].id).toBe('thread-msg2');
});
it('should exclude messages from other threads', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
// Create two threads
await trx.insert(threads).values([
{
id: 'thread1',
userId,
topicId: 'topic1',
sourceMessageId: 'msg1',
type: 'continuation',
},
{
id: 'thread2',
userId,
topicId: 'topic1',
sourceMessageId: 'msg1',
type: 'continuation',
},
]);
await trx.insert(messages).values([
{
id: 'msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'source message',
createdAt: new Date('2023-01-01'),
},
{
id: 'thread1-msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread1',
role: 'user',
content: 'thread 1 message',
createdAt: new Date('2023-01-02'),
},
{
id: 'thread2-msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread2',
role: 'user',
content: 'thread 2 message',
createdAt: new Date('2023-01-02T01:00:00'),
},
]);
});
const result = await messageModel.query({ threadId: 'thread1' });
// Should include parent (msg1) + thread1 messages only
// Should NOT include thread2 messages
expect(result).toHaveLength(2);
expect(result.map((m) => m.id)).toEqual(['msg1', 'thread1-msg1']);
});
});
describe('getThreadParentMessages', () => {
it('should return only source message for Standalone thread type', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
await trx.insert(messages).values([
{
id: 'msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'first',
createdAt: new Date('2023-01-01'),
},
{
id: 'msg2',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'assistant',
content: 'second - source',
createdAt: new Date('2023-01-02'),
},
{
id: 'msg3',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'third',
createdAt: new Date('2023-01-03'),
},
]);
});
const result = await messageModel.getThreadParentMessages({
sourceMessageId: 'msg2',
topicId: 'topic1',
threadType: 'standalone' as any,
});
// Standalone should only return the source message
expect(result).toHaveLength(1);
expect(result[0].id).toBe('msg2');
});
it('should return all messages up to source message for Continuation thread type', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
await trx.insert(messages).values([
{
id: 'msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'first',
createdAt: new Date('2023-01-01'),
},
{
id: 'msg2',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'assistant',
content: 'second - source',
createdAt: new Date('2023-01-02'),
},
{
id: 'msg3',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'third - after source',
createdAt: new Date('2023-01-03'),
},
]);
});
const result = await messageModel.getThreadParentMessages({
sourceMessageId: 'msg2',
topicId: 'topic1',
threadType: 'continuation' as any,
});
// Continuation should return msg1 and msg2 (up to and including source)
expect(result).toHaveLength(2);
expect(result[0].id).toBe('msg1');
expect(result[1].id).toBe('msg2');
});
it('should exclude messages from other threads in Continuation mode', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
// Create thread first due to foreign key constraint
await trx.insert(threads).values([
{
id: 'thread1',
userId,
topicId: 'topic1',
sourceMessageId: 'msg3',
type: 'continuation',
},
]);
await trx.insert(messages).values([
{
id: 'msg1',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'first',
createdAt: new Date('2023-01-01'),
},
{
id: 'msg2',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'assistant',
content: 'second',
createdAt: new Date('2023-01-02'),
},
{
id: 'msg-in-thread',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: 'thread1',
role: 'user',
content: 'message in thread',
createdAt: new Date('2023-01-02T12:00:00'), // Same day but in thread
},
{
id: 'msg3',
userId,
sessionId: 'session1',
topicId: 'topic1',
threadId: null,
role: 'user',
content: 'third - source',
createdAt: new Date('2023-01-03'),
},
]);
});
const result = await messageModel.getThreadParentMessages({
sourceMessageId: 'msg3',
topicId: 'topic1',
threadType: 'continuation' as any,
});
// Should include msg1, msg2, msg3
// Should NOT include msg-in-thread (belongs to another thread)
expect(result).toHaveLength(3);
expect(result.map((m) => m.id)).toEqual(['msg1', 'msg2', 'msg3']);
});
it('should return empty array when source message not found', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
});
const result = await messageModel.getThreadParentMessages({
sourceMessageId: 'non-existent',
topicId: 'topic1',
threadType: 'continuation' as any,
});
expect(result).toHaveLength(0);
});
it('should include source message even with microsecond timestamp precision difference', async () => {
// This test simulates the production scenario where:
// - PostgreSQL timestamptz has microsecond precision (e.g., 06:56:15.804448)
// - JavaScript Date only has millisecond precision (e.g., 06:56:15.804)
// When Drizzle reads the timestamp, microseconds are truncated, causing lte comparison to fail
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
// Insert messages with raw SQL to set microsecond-precision timestamps
// msg1: 06:56:15.074286 (earlier)
// msg2: 06:56:15.804448 (source message with microseconds)
await trx.execute(sql`
INSERT INTO messages (id, user_id, session_id, topic_id, thread_id, role, content, created_at)
VALUES
('msg1', ${userId}, 'session1', 'topic1', NULL, 'user', 'first message', '2023-01-01 06:56:15.074286+00'),
('msg2', ${userId}, 'session1', 'topic1', NULL, 'assistant', 'source message', '2023-01-01 06:56:15.804448+00')
`);
});
const result = await messageModel.getThreadParentMessages({
sourceMessageId: 'msg2',
topicId: 'topic1',
threadType: 'continuation' as any,
});
// Should include BOTH messages, including source message (msg2)
// This tests the fix for microsecond precision issue
expect(result).toHaveLength(2);
expect(result.map((m) => m.id)).toEqual(['msg1', 'msg2']);
});
});
});
+6 -1
View File
@@ -517,6 +517,8 @@ export class MessageModel {
if (!sourceMessage) return [];
// Get all main conversation messages up to and including the source message
// Use `or` with explicit id match to handle timestamp precision issues
// (JavaScript Date has millisecond precision, but PostgreSQL timestamptz has microsecond precision)
const result = await this.db
.select()
.from(messages)
@@ -525,7 +527,10 @@ export class MessageModel {
eq(messages.userId, this.userId),
eq(messages.topicId, topicId),
isNull(messages.threadId), // Only main conversation messages (not in any thread)
lte(messages.createdAt, sourceMessage.createdAt),
or(
lte(messages.createdAt, sourceMessage.createdAt),
eq(messages.id, sourceMessageId), // Ensure source message is always included
),
),
)
.orderBy(asc(messages.createdAt));
+1 -1
View File
@@ -38,6 +38,7 @@ const SideBarDrawer = memo<SideBarDrawerProps>(
rootStyle={{
position: 'absolute',
}}
size={280}
styles={{
body: {
background: theme.colorBgLayout,
@@ -78,7 +79,6 @@ const SideBarDrawer = memo<SideBarDrawerProps>(
{subHeader}
</>
}
width={280}
>
<Suspense
fallback={
+35 -16
View File
@@ -14,8 +14,10 @@ import {
useConversationStore,
} from '@/features/Conversation';
import SkeletonList from '@/features/Conversation/components/SkeletonList';
import { useOperationState } from '@/hooks/useOperationState';
import { useChatStore } from '@/store/chat';
import { threadSelectors } from '@/store/chat/selectors';
import { MessageMapKeyInput, messageMapKey } from '@/store/chat/utils/messageMapKey';
import ThreadDivider from './ThreadDivider';
import { useThreadActionsBarConfig } from './useThreadActionsBarConfig';
@@ -102,13 +104,8 @@ ThreadChatContent.displayName = 'ThreadChatContent';
* Thread Chat Component
*
* Two modes:
* 1. With portalThreadId: Uses ConversationProvider to fetch complete thread data from backend
* 2. Without portalThreadId (creating new thread): Uses main conversation's displayMessages slice
*
* Thread context is determined by:
* - agentId: current active agent
* - topicId: current active topic
* - threadId: portal thread ID (optional)
* 1. Creating new thread (!portalThreadId): Uses 'thread_xxx_new' key (isNew: true)
* 2. Existing thread (portalThreadId): Uses 'thread_xxx_topicId_threadId' key
*/
const ThreadChat = memo(() => {
// Get thread context from ChatStore
@@ -121,12 +118,6 @@ const ThreadChat = memo(() => {
s.newThreadMode,
]);
// When creating new thread (no portalThreadId), get messages from main conversation
// Use s.portalThreadId directly to avoid stale closure
const messagesFromMain = useChatStore((s) =>
!s.portalThreadId ? threadSelectors.portalDisplayChats(s) : undefined,
);
// Get thread-specific actionsBar config
const actionsBarConfig = useThreadActionsBarConfig();
@@ -134,6 +125,7 @@ const ThreadChat = memo(() => {
// When creating new thread (!portalThreadId), use isNew + scope: 'thread'
const isCreatingNewThread = !portalThreadId && !!threadStartMessageId;
// Context for ConversationProvider (includes sourceMessageId/threadType for new thread creation)
const context: ConversationContext = useMemo(
() => ({
agentId: activeAgentId,
@@ -155,7 +147,29 @@ const ThreadChat = memo(() => {
isCreatingNewThread,
],
);
console.log('Thread Chat', context);
// Context for messageMapKey (only needs fields used in key generation)
const keyContext = useMemo<MessageMapKeyInput>(
() => ({
agentId: activeAgentId,
isNew: isCreatingNewThread,
scope: 'thread',
threadId: portalThreadId,
topicId: activeTopicId,
}),
[activeAgentId, activeTopicId, portalThreadId, isCreatingNewThread],
);
// Generate messageMapKey for direct subscription to dbMessagesMap
const chatKey = useMemo(() => messageMapKey(keyContext), [keyContext]);
// Subscribe directly to dbMessagesMap for reactive updates
// This ensures optimistic updates work (read/write use same key)
const replaceMessages = useChatStore((s) => s.replaceMessages);
const messages = useChatStore((s) => s.dbMessagesMap[chatKey]);
// Get operation state for reactive updates
const operationState = useOperationState(context);
// Hooks to handle post-message-creation tasks for new thread
const hooks: ConversationHooks = useMemo(
@@ -187,9 +201,14 @@ const ThreadChat = memo(() => {
<ConversationProvider
actionsBar={actionsBarConfig}
context={context}
hasInitMessages={!!messages}
hooks={hooks}
messages={messagesFromMain}
skipFetch={!!messagesFromMain}
messages={messages}
onMessagesChange={(msgs) => {
replaceMessages(msgs, { context });
}}
operationState={operationState}
skipFetch={isCreatingNewThread}
>
<ThreadChatContent />
</ConversationProvider>
+1
View File
@@ -181,6 +181,7 @@ export const aiChatRouter = router({
groupId: input.groupId,
includeTopic: isCreateNewTopic,
sessionId,
threadId,
topicId,
});
+1
View File
@@ -25,6 +25,7 @@ export class AiChatService {
includeTopic?: boolean;
pageSize?: number;
sessionId?: string;
threadId?: string;
topicId?: string;
}) {
const [messages, topics] = await Promise.all([
+22
View File
@@ -18,7 +18,9 @@ import { systemAgentSelectors } from '@/store/user/selectors';
import { merge } from '@/utils/merge';
import { setNamespace } from '@/utils/storeDebug';
import { displayMessageSelectors } from '../message/selectors';
import { ThreadDispatch, threadReducer } from './reducer';
import { genParentMessages } from './selectors';
const n = setNamespace('thd');
const SWR_USE_FETCH_THREADS = 'SWR_USE_FETCH_THREADS';
@@ -61,6 +63,26 @@ export const chatThreadMessage: StateCreator<
},
openThreadCreator: (messageId) => {
const { activeAgentId, activeTopicId, newThreadMode, replaceMessages } = get();
// Get parent messages up to and including the source message
const displayMessages = displayMessageSelectors.activeDisplayMessages(get());
// Filter out messages that have threadId (they belong to other threads)
const mainMessages = displayMessages.filter((m) => !m.threadId);
const parentMessages = genParentMessages(mainMessages, messageId, newThreadMode);
// Initialize messages in thread scope for optimistic update
// This ensures the UI can display messages immediately
if (parentMessages.length > 0) {
const context = {
agentId: activeAgentId,
isNew: true,
scope: 'thread' as const,
topicId: activeTopicId,
};
replaceMessages(parentMessages, { action: 'initThreadMessages', context });
}
set(
{ threadStartMessageId: messageId, portalThreadId: undefined, startToForkThread: true },
false,
+66 -151
View File
@@ -1,4 +1,3 @@
import { THREAD_DRAFT_ID } from '@lobechat/const';
import { ThreadItem, UIChatMessage } from '@lobechat/types';
import { useAgentStore } from '@/store/agent';
@@ -7,7 +6,9 @@ import type { ChatStoreState } from '@/store/chat';
import { chatHelpers } from '@/store/chat/helpers';
import { displayMessageSelectors } from '../../message/selectors';
import { genMessage } from './util';
import { genParentMessages } from './util';
// ============= Thread List Selectors ============= //
const currentTopicThreads = (s: ChatStoreState) => {
if (!s.activeTopicId) return [];
@@ -23,135 +24,12 @@ const currentPortalThread = (s: ChatStoreState): ThreadItem | undefined => {
return threads.find((t) => t.id === s.portalThreadId);
};
const threadStartMessageId = (s: ChatStoreState) => s.threadStartMessageId;
const threadSourceMessageId = (s: ChatStoreState) => {
if (s.startToForkThread) return threadStartMessageId(s);
const portalThread = currentPortalThread(s);
return portalThread?.sourceMessageId;
};
const getTheadParentMessages = (s: ChatStoreState, data: UIChatMessage[]) => {
if (s.startToForkThread) {
const startMessageId = threadStartMessageId(s)!;
// 存在 threadId 的消息是子消息,在创建付消息时需要忽略
const messages = data.filter((m) => !m.threadId);
return genMessage(messages, startMessageId, s.newThreadMode);
}
const portalThread = currentPortalThread(s);
return genMessage(data, portalThread?.sourceMessageId, portalThread?.type);
};
// ======= Portal Thread Display Chats ======= //
// =========================================== //
/**
* 获取当前 thread 的父级消息
*/
const portalDisplayParentMessages = (s: ChatStoreState): UIChatMessage[] => {
const data = displayMessageSelectors.activeDisplayMessages(s);
return getTheadParentMessages(s, data);
};
/**
* these messages are the messages that are in the thread
*
*/
const portalDisplayChildChatsByThreadId =
(id?: string) =>
(s: ChatStoreState): UIChatMessage[] => {
// skip tool message
const data = displayMessageSelectors.activeDisplayMessages(s);
return data.filter((m) => !!id && m.threadId === id);
};
const portalDisplayChats = (s: ChatStoreState) => {
const parentMessages = portalDisplayParentMessages(s);
const afterMessages = portalDisplayChildChatsByThreadId(s.portalThreadId)(s);
// use for optimistic update
const draftMessage = displayMessageSelectors
.activeDisplayMessages(s)
.find((m) => m.threadId === THREAD_DRAFT_ID);
return [...parentMessages, draftMessage, ...afterMessages].filter(Boolean) as UIChatMessage[];
};
const portalDisplayChatsLength = (s: ChatStoreState) => {
// history length include a thread divider
return portalDisplayChats(s).length;
};
const portalDisplayChatsString = (s: ChatStoreState) => {
const messages = portalDisplayChats(s);
return messages.map((m) => m.content).join('');
};
const portalDisplayChatIDs = (s: ChatStoreState): string[] =>
portalDisplayChats(s).map((i) => i.id);
// ========= Portal Thread AI Chats ========= //
// ========================================== //
const portalAIParentMessages = (s: ChatStoreState): UIChatMessage[] => {
const data = displayMessageSelectors.activeDisplayMessages(s);
return getTheadParentMessages(s, data);
};
const portalAIChildChatsByThreadId =
(id?: string) =>
(s: ChatStoreState): UIChatMessage[] => {
// skip tool message
const data = displayMessageSelectors.activeDisplayMessages(s);
return data.filter((m) => !!id && m.threadId === id);
};
const portalAIChats = (s: ChatStoreState) => {
const parentMessages = portalAIParentMessages(s);
const afterMessages = portalAIChildChatsByThreadId(s.portalThreadId)(s);
return [...parentMessages, ...afterMessages].filter(Boolean) as UIChatMessage[];
};
const portalAIChatsWithHistoryConfig = (s: ChatStoreState) => {
const parentMessages = portalAIParentMessages(s);
const afterMessages = portalAIChildChatsByThreadId(s.portalThreadId)(s);
const messages = [...parentMessages, ...afterMessages].filter(Boolean) as UIChatMessage[];
const enableHistoryCount = agentChatConfigSelectors.enableHistoryCount(useAgentStore.getState());
const historyCount = agentChatConfigSelectors.historyCount(useAgentStore.getState());
return chatHelpers.getSlicedMessages(messages, {
enableHistoryCount,
historyCount,
});
};
const threadSourceMessageIndex = (s: ChatStoreState) => {
const theadMessageId = threadSourceMessageId(s);
const data = portalDisplayChats(s);
return !theadMessageId ? -1 : data.findIndex((d) => d.id === theadMessageId);
};
const getThreadsByTopic = (topicId?: string) => (s: ChatStoreState) => {
if (!topicId) return;
return s.threadMaps[topicId];
};
const getFirstThreadBySourceMsgId = (id: string) => (s: ChatStoreState) => {
const threads = currentTopicThreads(s);
return threads.find((t) => t.sourceMessageId === id);
};
const getThreadsBySourceMsgId = (id: string) => (s: ChatStoreState) => {
const threads = currentTopicThreads(s);
@@ -164,43 +42,80 @@ const hasThreadBySourceMsgId = (id: string) => (s: ChatStoreState) => {
return threads.some((t) => t.sourceMessageId === id);
};
const isThreadAIGenerating = (s: ChatStoreState) => {
const { operationSelectors } = require('../../operation/selectors');
return operationSelectors.isAnyMessageLoading(portalDisplayChatIDs(s))(s);
};
const isCreatingMessage = (s: ChatStoreState) => s.isCreatingThreadMessage;
const isHasMessageLoading = (s: ChatStoreState) =>
s.messageLoadingIds.some((id) => portalDisplayChatIDs(s).includes(id));
// ============= Thread Messages Selectors ============= //
// These are kept for Token calculation and AI title summarization
// Thread Chat component now uses dbMessagesMap directly
/**
* this function is used to determine whether the send button should be disabled
* Internal helper to get parent messages for a thread
*/
const isSendButtonDisabledByMessage = (s: ChatStoreState) =>
// 1. when there is message loading
isHasMessageLoading(s) ||
// 2. when is creating the topic
s.isCreatingThread ||
// 3. when is creating the message
isCreatingMessage(s);
const getThreadParentMessages = (s: ChatStoreState, data: UIChatMessage[]) => {
if (s.startToForkThread) {
const startMessageId = s.threadStartMessageId!;
// Filter out messages that belong to other threads
const messages = data.filter((m) => !m.threadId);
return genParentMessages(messages, startMessageId, s.newThreadMode);
}
const portalThread = currentPortalThread(s);
return genParentMessages(data, portalThread?.sourceMessageId, portalThread?.type);
};
/**
* Get thread child messages by thread ID
*/
const getThreadChildMessages =
(id?: string) =>
(s: ChatStoreState): UIChatMessage[] => {
const data = displayMessageSelectors.activeDisplayMessages(s);
return data.filter((m) => !!id && m.threadId === id);
};
/**
* Portal AI chats - used for AI title summarization
*/
const portalAIChats = (s: ChatStoreState) => {
const data = displayMessageSelectors.activeDisplayMessages(s);
const parentMessages = getThreadParentMessages(s, data);
const childMessages = getThreadChildMessages(s.portalThreadId)(s);
return [...parentMessages, ...childMessages].filter(Boolean) as UIChatMessage[];
};
/**
* Portal AI chats with history config - used for workflow
*/
const portalAIChatsWithHistoryConfig = (s: ChatStoreState) => {
const messages = portalAIChats(s);
const enableHistoryCount = agentChatConfigSelectors.enableHistoryCount(useAgentStore.getState());
const historyCount = agentChatConfigSelectors.historyCount(useAgentStore.getState());
return chatHelpers.getSlicedMessages(messages, {
enableHistoryCount,
historyCount,
});
};
/**
* Portal display chats string - used for Token calculation
*/
const portalDisplayChatsString = (s: ChatStoreState) => {
const messages = portalAIChats(s);
return messages.map((m) => m.content).join('');
};
export const threadSelectors = {
currentPortalThread,
currentTopicThreads,
getFirstThreadBySourceMsgId,
getThreadsBySourceMsgId,
getThreadsByTopic,
hasThreadBySourceMsgId,
isSendButtonDisabledByMessage,
isThreadAIGenerating,
portalAIChats,
portalAIChatsWithHistoryConfig,
portalDisplayChatIDs,
portalDisplayChats,
portalDisplayChatsLength,
portalDisplayChatsString,
portalDisplayChildChatsByThreadId,
threadSourceMessageId,
threadSourceMessageIndex,
threadStartMessageId,
};
// Re-export utility function for use in action.ts
export { genParentMessages } from './util';
@@ -1,6 +1,12 @@
import { IThreadType, ThreadType, UIChatMessage } from '@lobechat/types';
export const genMessage = (
/**
* Generate parent messages for thread display
* Based on thread type:
* - Standalone: only include the source message
* - Continuation: include all messages up to and including the source message
*/
export const genParentMessages = (
messages: UIChatMessage[],
startMessageId: string | null | undefined,
threadMode?: IThreadType,