diff --git a/apps/server/src/routers/lambda/__tests__/aiChat.test.ts b/apps/server/src/routers/lambda/__tests__/aiChat.test.ts index b7335dd71d..be44a249f8 100644 --- a/apps/server/src/routers/lambda/__tests__/aiChat.test.ts +++ b/apps/server/src/routers/lambda/__tests__/aiChat.test.ts @@ -119,7 +119,7 @@ describe('aiChatRouter', () => { expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledTimes(1); expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledWith( expect.any(Object), - expect.objectContaining({ touchTopicUpdatedAt: false }), + expect.not.objectContaining({ touchTopicUpdatedAt: expect.anything() }), ); expect(mockGet).toHaveBeenCalledWith( @@ -161,7 +161,7 @@ describe('aiChatRouter', () => { expect(mockCreateMessage).toHaveBeenCalled(); expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledWith( expect.any(Object), - expect.objectContaining({ touchTopicUpdatedAt: true }), + expect.not.objectContaining({ touchTopicUpdatedAt: expect.anything() }), ); expect(mockGet).toHaveBeenCalledWith( expect.objectContaining({ diff --git a/apps/server/src/routers/lambda/aiChat.ts b/apps/server/src/routers/lambda/aiChat.ts index 12bf7af7c7..891464babc 100644 --- a/apps/server/src/routers/lambda/aiChat.ts +++ b/apps/server/src/routers/lambda/aiChat.ts @@ -370,7 +370,6 @@ export const aiChatRouter = router({ { assistantMessage, userMessage }, { ...(modelTiming ? { timing: modelTiming } : {}), - touchTopicUpdatedAt: !isCreateNewTopic, }, ); }, diff --git a/packages/database/src/models/__tests__/messages/message.create.test.ts b/packages/database/src/models/__tests__/messages/message.create.test.ts index ae4c2cf147..9637ab074f 100644 --- a/packages/database/src/models/__tests__/messages/message.create.test.ts +++ b/packages/database/src/models/__tests__/messages/message.create.test.ts @@ -339,18 +339,15 @@ describe('MessageModel Create Tests', () => { (event) => event === 'db.message.createUserAndAssistant.messages.insert:start', ), ).toHaveLength(1); - expect( - timingEvents.filter( - (event) => event === 'db.message.createUserAndAssistant.topic.touchUpdatedAt:start', - ), - ).toHaveLength(1); + expect(timingEvents.some((event) => event.includes('topic.touchUpdatedAt'))).toBe(false); }); - it('should skip topic touch when creating a pair for an already-created topic', async () => { + it('should not touch topic updatedAt when creating a pair for an existing topic', async () => { await serverDB.insert(topics).values({ id: 'topic-pair-no-touch', sessionId: '1', title: 'Topic pair no touch', + updatedAt: new Date('2024-01-01T00:00:00Z'), userId, }); @@ -376,9 +373,11 @@ describe('MessageModel Create Tests', () => { timing: { log: (event) => timingEvents.push(event), }, - touchTopicUpdatedAt: false, }, ); + const topic = await serverDB.query.topics.findFirst({ + where: (table, { eq }) => eq(table.id, 'topic-pair-no-touch'), + }); expect(result.userMessage.id).toBeDefined(); expect(result.assistantMessage.parentId).toBe(result.userMessage.id); @@ -387,11 +386,8 @@ describe('MessageModel Create Tests', () => { (event) => event === 'db.message.createUserAndAssistant.messages.insert:start', ), ).toHaveLength(1); - expect( - timingEvents.filter( - (event) => event === 'db.message.createUserAndAssistant.topic.touchUpdatedAt:start', - ), - ).toHaveLength(0); + expect(timingEvents.some((event) => event.includes('topic.touchUpdatedAt'))).toBe(false); + expect(topic?.updatedAt.toISOString()).toBe('2024-01-01T00:00:00.000Z'); }); describe('create with advanced parameters', () => { diff --git a/packages/database/src/models/__tests__/recent.test.ts b/packages/database/src/models/__tests__/recent.test.ts index cdd2aa7967..52a8713a1a 100644 --- a/packages/database/src/models/__tests__/recent.test.ts +++ b/packages/database/src/models/__tests__/recent.test.ts @@ -2,7 +2,16 @@ import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { getTestDB } from '../../core/getTestDB'; -import { agents, chatGroups, documents, knowledgeBases, tasks, topics, users } from '../../schemas'; +import { + agents, + chatGroups, + documents, + knowledgeBases, + messages, + tasks, + topics, + users, +} from '../../schemas'; import type { LobeChatDatabase } from '../../type'; import { RecentModel } from '../recent'; @@ -104,6 +113,41 @@ describe('RecentModel', () => { }); }); + it('orders topic rows by latest message activity', async () => { + await serverDB.insert(agents).values({ id: 'agent-activity', userId, virtual: false }); + await serverDB.insert(topics).values([ + { + agentId: 'agent-activity', + id: 'topic-old-row-latest-message', + title: 'latest message wins', + updatedAt: minutesAgo(30), + userId, + }, + { + agentId: 'agent-activity', + id: 'topic-new-row-old-message', + title: 'newer topic row', + updatedAt: minutesAgo(5), + userId, + }, + ]); + await serverDB.insert(messages).values({ + id: 'recent-topic-latest-message', + role: 'user', + topicId: 'topic-old-row-latest-message', + updatedAt: now(), + userId, + }); + + const result = await recentModel.queryRecent(); + + expect(result.map((row) => row.id)).toEqual([ + 'topic-old-row-latest-message', + 'topic-new-row-old-message', + ]); + expect(result[0].updatedAt.getTime()).toBeGreaterThan(result[1].updatedAt.getTime()); + }); + it('includes topics on non-virtual non-group agents', async () => { await serverDB.insert(agents).values({ id: 'agent-real', userId, virtual: false }); diff --git a/packages/database/src/models/__tests__/topics/topic.query.test.ts b/packages/database/src/models/__tests__/topics/topic.query.test.ts index 34d38341c2..c8e82409d6 100644 --- a/packages/database/src/models/__tests__/topics/topic.query.test.ts +++ b/packages/database/src/models/__tests__/topics/topic.query.test.ts @@ -162,7 +162,7 @@ describe('TopicModel - Query', () => { ]); }); - it('should keep updatedAt ordering by default (no sortBy)', async () => { + it('should order by latest message activity by default (no sortBy)', async () => { await serverDB.insert(topics).values([ { id: 'waiting', @@ -173,11 +173,27 @@ describe('TopicModel - Query', () => { }, { id: 'active', sessionId, updatedAt: new Date('2023-05-01'), userId }, ]); + await serverDB.insert(messages).values([ + { + id: 'waiting-latest-message', + role: 'user', + topicId: 'waiting', + updatedAt: new Date('2023-06-01'), + userId, + }, + { + id: 'active-older-message', + role: 'user', + topicId: 'active', + updatedAt: new Date('2023-04-01'), + userId, + }, + ]); const result = await topicModel.query({ containerId: sessionId }); - // Without status sort, most-recently-updated wins even if lower priority - expect(result.items.map((t) => t.id)).toEqual(['active', 'waiting']); + // Without status sort, most-recent message activity wins even if topic.updatedAt is older. + expect(result.items.map((t) => t.id)).toEqual(['waiting', 'active']); }); it('should query topics with pagination', async () => { @@ -1591,6 +1607,43 @@ describe('TopicModel - Query', () => { expect(result).toHaveLength(2); }); + it('should order recent topics by latest message activity', async () => { + await serverDB.transaction(async (tx) => { + await tx.insert(agents).values([{ id: 'activity-agent', userId, title: 'Activity Agent' }]); + await tx.insert(topics).values([ + { + agentId: 'activity-agent', + id: 'activity-topic-old-topic-row', + title: 'Older topic row', + updatedAt: new Date('2023-01-01'), + userId, + }, + { + agentId: 'activity-agent', + id: 'activity-topic-new-topic-row', + title: 'Newer topic row', + updatedAt: new Date('2023-05-01'), + userId, + }, + ]); + await tx.insert(messages).values({ + id: 'activity-topic-latest-message', + role: 'user', + topicId: 'activity-topic-old-topic-row', + updatedAt: new Date('2023-06-01'), + userId, + }); + }); + + const result = await topicModel.queryRecent(); + + expect(result.map((topic) => topic.id)).toEqual([ + 'activity-topic-old-topic-row', + 'activity-topic-new-topic-row', + ]); + expect(result[0].updatedAt.toISOString()).toBe('2023-06-01T00:00:00.000Z'); + }); + it('should return null agentId when topic has groupId but no agentId', async () => { // Topics with groupId are included even without agentId await serverDB.transaction(async (tx) => { diff --git a/packages/database/src/models/message.ts b/packages/database/src/models/message.ts index c2959c9f8d..79d45eeea5 100644 --- a/packages/database/src/models/message.ts +++ b/packages/database/src/models/message.ts @@ -156,7 +156,6 @@ interface CreateUserAndAssistantMessagesParams { interface CreateUserAndAssistantMessagesOptions { timing?: ModelTimingContext; - touchTopicUpdatedAt?: boolean; } interface CreateMessageInsertParams { @@ -219,22 +218,6 @@ export class MessageModel { private agentsToSessionsOwnership = () => buildWorkspaceWhere({ userId: this.userId, workspaceId: this.workspaceId }, agentsToSessions); - /** - * Touch topics' updatedAt timestamp within a transaction - */ - private async touchTopicUpdatedAt(trx: Transaction, topicIds: string[]) { - if (topicIds.length === 0) return; - await trx - .update(topics) - .set({ updatedAt: new Date() }) - .where( - and( - inArray(topics.id, topicIds), - buildWorkspaceWhere({ userId: this.userId, workspaceId: this.workspaceId }, topics), - ), - ); - } - // **************** Query *************** // /** @@ -1877,16 +1860,6 @@ export class MessageModel { this.db.transaction(async (trx) => { const item = await this.createInTransaction(trx, params, id, timing); - // Touch topic's updatedAt when creating a message in a topic - if (params.topicId) { - await runTimedStage( - timing, - 'db.message.create.topic.touchUpdatedAt', - () => this.touchTopicUpdatedAt(trx, [params.topicId!]), - { topicCount: 1 }, - ); - } - return item; }), { @@ -1900,7 +1873,7 @@ export class MessageModel { createUserAndAssistantMessages = async ( { userMessage, assistantMessage }: CreateUserAndAssistantMessagesParams, - { timing, touchTopicUpdatedAt = true }: CreateUserAndAssistantMessagesOptions = {}, + { timing }: CreateUserAndAssistantMessagesOptions = {}, ): Promise<{ assistantMessage: DBMessageItem; userMessage: DBMessageItem }> => { const userMessageId = this.genId(); const assistantMessageId = this.genId(); @@ -1964,15 +1937,6 @@ export class MessageModel { 'db.message.createUserAndAssistant.assistant', ); - if (touchTopicUpdatedAt && topicIds.length > 0) { - await runTimedStage( - timing, - 'db.message.createUserAndAssistant.topic.touchUpdatedAt', - () => this.touchTopicUpdatedAt(trx, topicIds), - { topicCount: topicIds.length }, - ); - } - const userMessageItem = messageMap.get(userMessageId); const assistantMessageItem = messageMap.get(assistantMessageId); @@ -1999,13 +1963,9 @@ export class MessageModel { ), ); - const topicIds = [...new Set(newMessages.map((m) => m.topicId).filter(Boolean))] as string[]; - return this.db.transaction(async (trx) => { const result = await trx.insert(messages).values(messagesToInsert); - await this.touchTopicUpdatedAt(trx, topicIds); - return result; }); }; @@ -2094,15 +2054,7 @@ export class MessageModel { { hasMetadata: !!metadataPatch, valueKeys: Object.keys(message) }, ); - // Touch topic's updatedAt when updating a message if (updated?.topicId) { - await runTimedStage( - timing, - 'db.message.update.topic.touchUpdatedAt', - () => this.touchTopicUpdatedAt(trx, [updated.topicId!]), - { topicCount: 1 }, - ); - // When this write carries token usage (assistant finalize / hetero // step), recompute the topic's denormalized usage rollup from its // messages. Gated on the *incoming* payload so streaming diff --git a/packages/database/src/models/recent.ts b/packages/database/src/models/recent.ts index 7edabe6fbc..dfbc04b91e 100644 --- a/packages/database/src/models/recent.ts +++ b/packages/database/src/models/recent.ts @@ -2,7 +2,7 @@ import type { TaskStatus } from '@lobechat/types'; import { and, desc, eq, inArray, isNotNull, isNull, ne, not, or, sql } from 'drizzle-orm'; import { unionAll } from 'drizzle-orm/pg-core'; -import { agents, DOCUMENT_FOLDER_TYPE, documents, tasks, topics } from '../schemas'; +import { agents, DOCUMENT_FOLDER_TYPE, documents, messages, tasks, topics } from '../schemas'; import type { LobeChatDatabase } from '../type'; import { buildWorkspaceWhere } from '../utils/workspace'; @@ -48,6 +48,17 @@ export class RecentModel { const taskScopeWhere = this.workspaceId ? eq(tasks.workspaceId, this.workspaceId) : and(eq(tasks.createdByUserId, this.userId), isNull(tasks.workspaceId)); + const latestTopicMessageAtSubquery = this.db + .select({ value: messages.updatedAt }) + .from(messages) + .where(and(eq(messages.topicId, topics.id), buildWorkspaceWhere(scope, messages))) + .orderBy(desc(messages.updatedAt)) + .limit(1); + + const topicActivityAt = + sql`COALESCE((${latestTopicMessageAtSubquery}), ${topics.updatedAt})`.mapWith( + topics.updatedAt, + ); const topicArm = this.db .select({ @@ -58,7 +69,7 @@ export class RecentModel { status: sql`NULL`.as('status'), title: sql`COALESCE(${topics.title}, 'Untitled Topic')`.as('title'), type: sql`'topic'`.as('type'), - updatedAt: topics.updatedAt, + updatedAt: topicActivityAt.as('updated_at'), }) .from(topics) .leftJoin(agents, eq(topics.agentId, agents.id)) diff --git a/packages/database/src/models/topic.ts b/packages/database/src/models/topic.ts index c05ce9e513..ffdb6ab339 100644 --- a/packages/database/src/models/topic.ts +++ b/packages/database/src/models/topic.ts @@ -134,10 +134,10 @@ const STATUS_SORT_RANK = sql`CASE ${topics.status} // Favorites always float to the top; the rest are ordered by the requested // strategy. `status` adds the priority bucket before the recency tiebreaker. -const buildTopicOrderBy = (sortBy?: TopicQuerySortBy): SQL[] => +const buildTopicOrderBy = (topicActivityAt: SQL, sortBy?: TopicQuerySortBy): SQL[] => sortBy === 'status' - ? [desc(topics.favorite), asc(STATUS_SORT_RANK), desc(topics.updatedAt)] - : [desc(topics.favorite), desc(topics.updatedAt)]; + ? [desc(topics.favorite), asc(STATUS_SORT_RANK), desc(topicActivityAt)] + : [desc(topics.favorite), desc(topicActivityAt)]; export class TopicModel { private userId: string; @@ -171,7 +171,6 @@ export class TopicModel { triggers, withDetails = false, }: QueryTopicParams = {}) => { - const orderBy = buildTopicOrderBy(sortBy); const queryStartedAt = Date.now(); logTiming(timing, 'db.topic.query:start', { current, @@ -208,6 +207,17 @@ export class TopicModel { .select({ value: sql`count(*)::int` }) .from(messages) .where(eq(messages.topicId, topics.id)); + const latestMessageAtSubquery = this.db + .select({ value: messages.updatedAt }) + .from(messages) + .where(and(eq(messages.topicId, topics.id), this.messageOwnership())) + .orderBy(desc(messages.updatedAt)) + .limit(1); + const topicActivityAt = + sql`COALESCE((${latestMessageAtSubquery}), ${topics.updatedAt})`.mapWith( + topics.updatedAt, + ); + const orderBy = buildTopicOrderBy(topicActivityAt, sortBy); const detailColumns = withDetails ? { @@ -556,6 +566,17 @@ export class TopicModel { * - For inbox: includes topics with slug='inbox' */ queryRecent = async (limit: number = 12) => { + const latestMessageAtSubquery = this.db + .select({ value: messages.updatedAt }) + .from(messages) + .where(and(eq(messages.topicId, topics.id), this.messageOwnership())) + .orderBy(desc(messages.updatedAt)) + .limit(1); + const topicActivityAt = + sql`COALESCE((${latestMessageAtSubquery}), ${topics.updatedAt})`.mapWith( + topics.updatedAt, + ); + const result = await this.db .select({ agentId: topics.agentId, @@ -563,7 +584,7 @@ export class TopicModel { id: topics.id, sessionId: topics.sessionId, title: topics.title, - updatedAt: topics.updatedAt, + updatedAt: topicActivityAt, }) .from(topics) .leftJoin(agents, eq(topics.agentId, agents.id)) @@ -580,12 +601,13 @@ export class TopicModel { ), ), ) - .orderBy(desc(topics.updatedAt)) + .orderBy(desc(topicActivityAt)) .limit(limit); return result.map((item) => ({ ...item, type: item.groupId ? ('group' as const) : ('agent' as const), + updatedAt: item.updatedAt instanceof Date ? item.updatedAt : new Date(item.updatedAt), })); }; diff --git a/packages/database/src/models/topicUsage.ts b/packages/database/src/models/topicUsage.ts index 695a435b12..cd2e7221a6 100644 --- a/packages/database/src/models/topicUsage.ts +++ b/packages/database/src/models/topicUsage.ts @@ -52,10 +52,15 @@ const num = (v: unknown): number => (v == null ? 0 : Number(v)); * usage (e.g. after deletions), the columns are reset to NULL ("not measured"), * so deletes / regenerations are reflected correctly. * - * NOTE: this writes through drizzle, whose `topics.updatedAt` has `$onUpdate`, - * so calling it bumps `updated_at`. That's intended for the live path (the - * topic is active anyway). The historical backfill must NOT use this — it runs - * its own raw-SQL aggregate that leaves `updated_at` untouched. + * Keep activity timestamps stable: recency is derived from `messages.updated_at`, + * so this projection update must not bump `topics.updated_at` / `accessed_at`. + * Drizzle `$onUpdate` is bypassed by explicitly assigning the columns to + * themselves. + * + * TODO: This still updates the `topics` row for usage/cost/token rollups. Under + * high-concurrency assistant finalization for the same topic, it can still + * serialize on the topic row lock. Consider moving this projection to a + * debounced/asynchronous rollup or a separate per-topic usage table. */ export const recomputeTopicUsage = async ( trx: Transaction, @@ -99,6 +104,7 @@ export const recomputeTopicUsage = async ( await trx .update(topics) .set({ + accessedAt: topics.accessedAt, cost: null, model: null, provider: null, @@ -106,6 +112,7 @@ export const recomputeTopicUsage = async ( totalInputTokens: null, totalOutputTokens: null, totalTokens: null, + updatedAt: topics.updatedAt, usage: null, }) .where(and(eq(topics.id, topicId), buildWorkspaceWhere({ userId, workspaceId }, topics))); @@ -191,6 +198,7 @@ export const recomputeTopicUsage = async ( await trx .update(topics) .set({ + accessedAt: topics.accessedAt, cost, model: primary?.model ?? null, provider: primary?.provider ?? null, @@ -198,6 +206,7 @@ export const recomputeTopicUsage = async ( totalInputTokens, totalOutputTokens, totalTokens, + updatedAt: topics.updatedAt, usage, }) .where(and(eq(topics.id, topicId), buildWorkspaceWhere({ userId, workspaceId }, topics)));