️ perf(app,database): derive topic activity from messages (#15726)

This commit is contained in:
Neko
2026-06-13 00:57:45 +08:00
committed by GitHub
parent 52eaf2702e
commit f31c94490d
9 changed files with 166 additions and 80 deletions
@@ -119,7 +119,7 @@ describe('aiChatRouter', () => {
expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledTimes(1); expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledTimes(1);
expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledWith( expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledWith(
expect.any(Object), expect.any(Object),
expect.objectContaining({ touchTopicUpdatedAt: false }), expect.not.objectContaining({ touchTopicUpdatedAt: expect.anything() }),
); );
expect(mockGet).toHaveBeenCalledWith( expect(mockGet).toHaveBeenCalledWith(
@@ -161,7 +161,7 @@ describe('aiChatRouter', () => {
expect(mockCreateMessage).toHaveBeenCalled(); expect(mockCreateMessage).toHaveBeenCalled();
expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledWith( expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledWith(
expect.any(Object), expect.any(Object),
expect.objectContaining({ touchTopicUpdatedAt: true }), expect.not.objectContaining({ touchTopicUpdatedAt: expect.anything() }),
); );
expect(mockGet).toHaveBeenCalledWith( expect(mockGet).toHaveBeenCalledWith(
expect.objectContaining({ expect.objectContaining({
-1
View File
@@ -370,7 +370,6 @@ export const aiChatRouter = router({
{ assistantMessage, userMessage }, { assistantMessage, userMessage },
{ {
...(modelTiming ? { timing: modelTiming } : {}), ...(modelTiming ? { timing: modelTiming } : {}),
touchTopicUpdatedAt: !isCreateNewTopic,
}, },
); );
}, },
@@ -339,18 +339,15 @@ describe('MessageModel Create Tests', () => {
(event) => event === 'db.message.createUserAndAssistant.messages.insert:start', (event) => event === 'db.message.createUserAndAssistant.messages.insert:start',
), ),
).toHaveLength(1); ).toHaveLength(1);
expect( expect(timingEvents.some((event) => event.includes('topic.touchUpdatedAt'))).toBe(false);
timingEvents.filter(
(event) => event === 'db.message.createUserAndAssistant.topic.touchUpdatedAt:start',
),
).toHaveLength(1);
}); });
it('should skip topic touch when creating a pair for an already-created topic', async () => { it('should not touch topic updatedAt when creating a pair for an existing topic', async () => {
await serverDB.insert(topics).values({ await serverDB.insert(topics).values({
id: 'topic-pair-no-touch', id: 'topic-pair-no-touch',
sessionId: '1', sessionId: '1',
title: 'Topic pair no touch', title: 'Topic pair no touch',
updatedAt: new Date('2024-01-01T00:00:00Z'),
userId, userId,
}); });
@@ -376,9 +373,11 @@ describe('MessageModel Create Tests', () => {
timing: { timing: {
log: (event) => timingEvents.push(event), log: (event) => timingEvents.push(event),
}, },
touchTopicUpdatedAt: false,
}, },
); );
const topic = await serverDB.query.topics.findFirst({
where: (table, { eq }) => eq(table.id, 'topic-pair-no-touch'),
});
expect(result.userMessage.id).toBeDefined(); expect(result.userMessage.id).toBeDefined();
expect(result.assistantMessage.parentId).toBe(result.userMessage.id); expect(result.assistantMessage.parentId).toBe(result.userMessage.id);
@@ -387,11 +386,8 @@ describe('MessageModel Create Tests', () => {
(event) => event === 'db.message.createUserAndAssistant.messages.insert:start', (event) => event === 'db.message.createUserAndAssistant.messages.insert:start',
), ),
).toHaveLength(1); ).toHaveLength(1);
expect( expect(timingEvents.some((event) => event.includes('topic.touchUpdatedAt'))).toBe(false);
timingEvents.filter( expect(topic?.updatedAt.toISOString()).toBe('2024-01-01T00:00:00.000Z');
(event) => event === 'db.message.createUserAndAssistant.topic.touchUpdatedAt:start',
),
).toHaveLength(0);
}); });
describe('create with advanced parameters', () => { describe('create with advanced parameters', () => {
@@ -2,7 +2,16 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest'; import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { getTestDB } from '../../core/getTestDB'; import { getTestDB } from '../../core/getTestDB';
import { agents, chatGroups, documents, knowledgeBases, tasks, topics, users } from '../../schemas'; import {
agents,
chatGroups,
documents,
knowledgeBases,
messages,
tasks,
topics,
users,
} from '../../schemas';
import type { LobeChatDatabase } from '../../type'; import type { LobeChatDatabase } from '../../type';
import { RecentModel } from '../recent'; import { RecentModel } from '../recent';
@@ -104,6 +113,41 @@ describe('RecentModel', () => {
}); });
}); });
it('orders topic rows by latest message activity', async () => {
await serverDB.insert(agents).values({ id: 'agent-activity', userId, virtual: false });
await serverDB.insert(topics).values([
{
agentId: 'agent-activity',
id: 'topic-old-row-latest-message',
title: 'latest message wins',
updatedAt: minutesAgo(30),
userId,
},
{
agentId: 'agent-activity',
id: 'topic-new-row-old-message',
title: 'newer topic row',
updatedAt: minutesAgo(5),
userId,
},
]);
await serverDB.insert(messages).values({
id: 'recent-topic-latest-message',
role: 'user',
topicId: 'topic-old-row-latest-message',
updatedAt: now(),
userId,
});
const result = await recentModel.queryRecent();
expect(result.map((row) => row.id)).toEqual([
'topic-old-row-latest-message',
'topic-new-row-old-message',
]);
expect(result[0].updatedAt.getTime()).toBeGreaterThan(result[1].updatedAt.getTime());
});
it('includes topics on non-virtual non-group agents', async () => { it('includes topics on non-virtual non-group agents', async () => {
await serverDB.insert(agents).values({ id: 'agent-real', userId, virtual: false }); await serverDB.insert(agents).values({ id: 'agent-real', userId, virtual: false });
@@ -162,7 +162,7 @@ describe('TopicModel - Query', () => {
]); ]);
}); });
it('should keep updatedAt ordering by default (no sortBy)', async () => { it('should order by latest message activity by default (no sortBy)', async () => {
await serverDB.insert(topics).values([ await serverDB.insert(topics).values([
{ {
id: 'waiting', id: 'waiting',
@@ -173,11 +173,27 @@ describe('TopicModel - Query', () => {
}, },
{ id: 'active', sessionId, updatedAt: new Date('2023-05-01'), userId }, { id: 'active', sessionId, updatedAt: new Date('2023-05-01'), userId },
]); ]);
await serverDB.insert(messages).values([
{
id: 'waiting-latest-message',
role: 'user',
topicId: 'waiting',
updatedAt: new Date('2023-06-01'),
userId,
},
{
id: 'active-older-message',
role: 'user',
topicId: 'active',
updatedAt: new Date('2023-04-01'),
userId,
},
]);
const result = await topicModel.query({ containerId: sessionId }); const result = await topicModel.query({ containerId: sessionId });
// Without status sort, most-recently-updated wins even if lower priority // Without status sort, most-recent message activity wins even if topic.updatedAt is older.
expect(result.items.map((t) => t.id)).toEqual(['active', 'waiting']); expect(result.items.map((t) => t.id)).toEqual(['waiting', 'active']);
}); });
it('should query topics with pagination', async () => { it('should query topics with pagination', async () => {
@@ -1591,6 +1607,43 @@ describe('TopicModel - Query', () => {
expect(result).toHaveLength(2); expect(result).toHaveLength(2);
}); });
it('should order recent topics by latest message activity', async () => {
await serverDB.transaction(async (tx) => {
await tx.insert(agents).values([{ id: 'activity-agent', userId, title: 'Activity Agent' }]);
await tx.insert(topics).values([
{
agentId: 'activity-agent',
id: 'activity-topic-old-topic-row',
title: 'Older topic row',
updatedAt: new Date('2023-01-01'),
userId,
},
{
agentId: 'activity-agent',
id: 'activity-topic-new-topic-row',
title: 'Newer topic row',
updatedAt: new Date('2023-05-01'),
userId,
},
]);
await tx.insert(messages).values({
id: 'activity-topic-latest-message',
role: 'user',
topicId: 'activity-topic-old-topic-row',
updatedAt: new Date('2023-06-01'),
userId,
});
});
const result = await topicModel.queryRecent();
expect(result.map((topic) => topic.id)).toEqual([
'activity-topic-old-topic-row',
'activity-topic-new-topic-row',
]);
expect(result[0].updatedAt.toISOString()).toBe('2023-06-01T00:00:00.000Z');
});
it('should return null agentId when topic has groupId but no agentId', async () => { it('should return null agentId when topic has groupId but no agentId', async () => {
// Topics with groupId are included even without agentId // Topics with groupId are included even without agentId
await serverDB.transaction(async (tx) => { await serverDB.transaction(async (tx) => {
+1 -49
View File
@@ -156,7 +156,6 @@ interface CreateUserAndAssistantMessagesParams {
interface CreateUserAndAssistantMessagesOptions { interface CreateUserAndAssistantMessagesOptions {
timing?: ModelTimingContext; timing?: ModelTimingContext;
touchTopicUpdatedAt?: boolean;
} }
interface CreateMessageInsertParams { interface CreateMessageInsertParams {
@@ -219,22 +218,6 @@ export class MessageModel {
private agentsToSessionsOwnership = () => private agentsToSessionsOwnership = () =>
buildWorkspaceWhere({ userId: this.userId, workspaceId: this.workspaceId }, agentsToSessions); buildWorkspaceWhere({ userId: this.userId, workspaceId: this.workspaceId }, agentsToSessions);
/**
* Touch topics' updatedAt timestamp within a transaction
*/
private async touchTopicUpdatedAt(trx: Transaction, topicIds: string[]) {
if (topicIds.length === 0) return;
await trx
.update(topics)
.set({ updatedAt: new Date() })
.where(
and(
inArray(topics.id, topicIds),
buildWorkspaceWhere({ userId: this.userId, workspaceId: this.workspaceId }, topics),
),
);
}
// **************** Query *************** // // **************** Query *************** //
/** /**
@@ -1877,16 +1860,6 @@ export class MessageModel {
this.db.transaction(async (trx) => { this.db.transaction(async (trx) => {
const item = await this.createInTransaction(trx, params, id, timing); const item = await this.createInTransaction(trx, params, id, timing);
// Touch topic's updatedAt when creating a message in a topic
if (params.topicId) {
await runTimedStage(
timing,
'db.message.create.topic.touchUpdatedAt',
() => this.touchTopicUpdatedAt(trx, [params.topicId!]),
{ topicCount: 1 },
);
}
return item; return item;
}), }),
{ {
@@ -1900,7 +1873,7 @@ export class MessageModel {
createUserAndAssistantMessages = async ( createUserAndAssistantMessages = async (
{ userMessage, assistantMessage }: CreateUserAndAssistantMessagesParams, { userMessage, assistantMessage }: CreateUserAndAssistantMessagesParams,
{ timing, touchTopicUpdatedAt = true }: CreateUserAndAssistantMessagesOptions = {}, { timing }: CreateUserAndAssistantMessagesOptions = {},
): Promise<{ assistantMessage: DBMessageItem; userMessage: DBMessageItem }> => { ): Promise<{ assistantMessage: DBMessageItem; userMessage: DBMessageItem }> => {
const userMessageId = this.genId(); const userMessageId = this.genId();
const assistantMessageId = this.genId(); const assistantMessageId = this.genId();
@@ -1964,15 +1937,6 @@ export class MessageModel {
'db.message.createUserAndAssistant.assistant', 'db.message.createUserAndAssistant.assistant',
); );
if (touchTopicUpdatedAt && topicIds.length > 0) {
await runTimedStage(
timing,
'db.message.createUserAndAssistant.topic.touchUpdatedAt',
() => this.touchTopicUpdatedAt(trx, topicIds),
{ topicCount: topicIds.length },
);
}
const userMessageItem = messageMap.get(userMessageId); const userMessageItem = messageMap.get(userMessageId);
const assistantMessageItem = messageMap.get(assistantMessageId); const assistantMessageItem = messageMap.get(assistantMessageId);
@@ -1999,13 +1963,9 @@ export class MessageModel {
), ),
); );
const topicIds = [...new Set(newMessages.map((m) => m.topicId).filter(Boolean))] as string[];
return this.db.transaction(async (trx) => { return this.db.transaction(async (trx) => {
const result = await trx.insert(messages).values(messagesToInsert); const result = await trx.insert(messages).values(messagesToInsert);
await this.touchTopicUpdatedAt(trx, topicIds);
return result; return result;
}); });
}; };
@@ -2094,15 +2054,7 @@ export class MessageModel {
{ hasMetadata: !!metadataPatch, valueKeys: Object.keys(message) }, { hasMetadata: !!metadataPatch, valueKeys: Object.keys(message) },
); );
// Touch topic's updatedAt when updating a message
if (updated?.topicId) { if (updated?.topicId) {
await runTimedStage(
timing,
'db.message.update.topic.touchUpdatedAt',
() => this.touchTopicUpdatedAt(trx, [updated.topicId!]),
{ topicCount: 1 },
);
// When this write carries token usage (assistant finalize / hetero // When this write carries token usage (assistant finalize / hetero
// step), recompute the topic's denormalized usage rollup from its // step), recompute the topic's denormalized usage rollup from its
// messages. Gated on the *incoming* payload so streaming // messages. Gated on the *incoming* payload so streaming
+13 -2
View File
@@ -2,7 +2,7 @@ import type { TaskStatus } from '@lobechat/types';
import { and, desc, eq, inArray, isNotNull, isNull, ne, not, or, sql } from 'drizzle-orm'; import { and, desc, eq, inArray, isNotNull, isNull, ne, not, or, sql } from 'drizzle-orm';
import { unionAll } from 'drizzle-orm/pg-core'; import { unionAll } from 'drizzle-orm/pg-core';
import { agents, DOCUMENT_FOLDER_TYPE, documents, tasks, topics } from '../schemas'; import { agents, DOCUMENT_FOLDER_TYPE, documents, messages, tasks, topics } from '../schemas';
import type { LobeChatDatabase } from '../type'; import type { LobeChatDatabase } from '../type';
import { buildWorkspaceWhere } from '../utils/workspace'; import { buildWorkspaceWhere } from '../utils/workspace';
@@ -48,6 +48,17 @@ export class RecentModel {
const taskScopeWhere = this.workspaceId const taskScopeWhere = this.workspaceId
? eq(tasks.workspaceId, this.workspaceId) ? eq(tasks.workspaceId, this.workspaceId)
: and(eq(tasks.createdByUserId, this.userId), isNull(tasks.workspaceId)); : and(eq(tasks.createdByUserId, this.userId), isNull(tasks.workspaceId));
const latestTopicMessageAtSubquery = this.db
.select({ value: messages.updatedAt })
.from(messages)
.where(and(eq(messages.topicId, topics.id), buildWorkspaceWhere(scope, messages)))
.orderBy(desc(messages.updatedAt))
.limit(1);
const topicActivityAt =
sql<Date>`COALESCE((${latestTopicMessageAtSubquery}), ${topics.updatedAt})`.mapWith(
topics.updatedAt,
);
const topicArm = this.db const topicArm = this.db
.select({ .select({
@@ -58,7 +69,7 @@ export class RecentModel {
status: sql<TaskStatus | null>`NULL`.as('status'), status: sql<TaskStatus | null>`NULL`.as('status'),
title: sql<string>`COALESCE(${topics.title}, 'Untitled Topic')`.as('title'), title: sql<string>`COALESCE(${topics.title}, 'Untitled Topic')`.as('title'),
type: sql<RecentDbItem['type']>`'topic'`.as('type'), type: sql<RecentDbItem['type']>`'topic'`.as('type'),
updatedAt: topics.updatedAt, updatedAt: topicActivityAt.as('updated_at'),
}) })
.from(topics) .from(topics)
.leftJoin(agents, eq(topics.agentId, agents.id)) .leftJoin(agents, eq(topics.agentId, agents.id))
+28 -6
View File
@@ -134,10 +134,10 @@ const STATUS_SORT_RANK = sql`CASE ${topics.status}
// Favorites always float to the top; the rest are ordered by the requested // Favorites always float to the top; the rest are ordered by the requested
// strategy. `status` adds the priority bucket before the recency tiebreaker. // strategy. `status` adds the priority bucket before the recency tiebreaker.
const buildTopicOrderBy = (sortBy?: TopicQuerySortBy): SQL[] => const buildTopicOrderBy = (topicActivityAt: SQL, sortBy?: TopicQuerySortBy): SQL[] =>
sortBy === 'status' sortBy === 'status'
? [desc(topics.favorite), asc(STATUS_SORT_RANK), desc(topics.updatedAt)] ? [desc(topics.favorite), asc(STATUS_SORT_RANK), desc(topicActivityAt)]
: [desc(topics.favorite), desc(topics.updatedAt)]; : [desc(topics.favorite), desc(topicActivityAt)];
export class TopicModel { export class TopicModel {
private userId: string; private userId: string;
@@ -171,7 +171,6 @@ export class TopicModel {
triggers, triggers,
withDetails = false, withDetails = false,
}: QueryTopicParams = {}) => { }: QueryTopicParams = {}) => {
const orderBy = buildTopicOrderBy(sortBy);
const queryStartedAt = Date.now(); const queryStartedAt = Date.now();
logTiming(timing, 'db.topic.query:start', { logTiming(timing, 'db.topic.query:start', {
current, current,
@@ -208,6 +207,17 @@ export class TopicModel {
.select({ value: sql<number>`count(*)::int` }) .select({ value: sql<number>`count(*)::int` })
.from(messages) .from(messages)
.where(eq(messages.topicId, topics.id)); .where(eq(messages.topicId, topics.id));
const latestMessageAtSubquery = this.db
.select({ value: messages.updatedAt })
.from(messages)
.where(and(eq(messages.topicId, topics.id), this.messageOwnership()))
.orderBy(desc(messages.updatedAt))
.limit(1);
const topicActivityAt =
sql<Date>`COALESCE((${latestMessageAtSubquery}), ${topics.updatedAt})`.mapWith(
topics.updatedAt,
);
const orderBy = buildTopicOrderBy(topicActivityAt, sortBy);
const detailColumns = withDetails const detailColumns = withDetails
? { ? {
@@ -556,6 +566,17 @@ export class TopicModel {
* - For inbox: includes topics with slug='inbox' * - For inbox: includes topics with slug='inbox'
*/ */
queryRecent = async (limit: number = 12) => { queryRecent = async (limit: number = 12) => {
const latestMessageAtSubquery = this.db
.select({ value: messages.updatedAt })
.from(messages)
.where(and(eq(messages.topicId, topics.id), this.messageOwnership()))
.orderBy(desc(messages.updatedAt))
.limit(1);
const topicActivityAt =
sql<Date>`COALESCE((${latestMessageAtSubquery}), ${topics.updatedAt})`.mapWith(
topics.updatedAt,
);
const result = await this.db const result = await this.db
.select({ .select({
agentId: topics.agentId, agentId: topics.agentId,
@@ -563,7 +584,7 @@ export class TopicModel {
id: topics.id, id: topics.id,
sessionId: topics.sessionId, sessionId: topics.sessionId,
title: topics.title, title: topics.title,
updatedAt: topics.updatedAt, updatedAt: topicActivityAt,
}) })
.from(topics) .from(topics)
.leftJoin(agents, eq(topics.agentId, agents.id)) .leftJoin(agents, eq(topics.agentId, agents.id))
@@ -580,12 +601,13 @@ export class TopicModel {
), ),
), ),
) )
.orderBy(desc(topics.updatedAt)) .orderBy(desc(topicActivityAt))
.limit(limit); .limit(limit);
return result.map((item) => ({ return result.map((item) => ({
...item, ...item,
type: item.groupId ? ('group' as const) : ('agent' as const), type: item.groupId ? ('group' as const) : ('agent' as const),
updatedAt: item.updatedAt instanceof Date ? item.updatedAt : new Date(item.updatedAt),
})); }));
}; };
+13 -4
View File
@@ -52,10 +52,15 @@ const num = (v: unknown): number => (v == null ? 0 : Number(v));
* usage (e.g. after deletions), the columns are reset to NULL ("not measured"), * usage (e.g. after deletions), the columns are reset to NULL ("not measured"),
* so deletes / regenerations are reflected correctly. * so deletes / regenerations are reflected correctly.
* *
* NOTE: this writes through drizzle, whose `topics.updatedAt` has `$onUpdate`, * Keep activity timestamps stable: recency is derived from `messages.updated_at`,
* so calling it bumps `updated_at`. That's intended for the live path (the * so this projection update must not bump `topics.updated_at` / `accessed_at`.
* topic is active anyway). The historical backfill must NOT use this — it runs * Drizzle `$onUpdate` is bypassed by explicitly assigning the columns to
* its own raw-SQL aggregate that leaves `updated_at` untouched. * themselves.
*
* TODO: This still updates the `topics` row for usage/cost/token rollups. Under
* high-concurrency assistant finalization for the same topic, it can still
* serialize on the topic row lock. Consider moving this projection to a
* debounced/asynchronous rollup or a separate per-topic usage table.
*/ */
export const recomputeTopicUsage = async ( export const recomputeTopicUsage = async (
trx: Transaction, trx: Transaction,
@@ -99,6 +104,7 @@ export const recomputeTopicUsage = async (
await trx await trx
.update(topics) .update(topics)
.set({ .set({
accessedAt: topics.accessedAt,
cost: null, cost: null,
model: null, model: null,
provider: null, provider: null,
@@ -106,6 +112,7 @@ export const recomputeTopicUsage = async (
totalInputTokens: null, totalInputTokens: null,
totalOutputTokens: null, totalOutputTokens: null,
totalTokens: null, totalTokens: null,
updatedAt: topics.updatedAt,
usage: null, usage: null,
}) })
.where(and(eq(topics.id, topicId), buildWorkspaceWhere({ userId, workspaceId }, topics))); .where(and(eq(topics.id, topicId), buildWorkspaceWhere({ userId, workspaceId }, topics)));
@@ -191,6 +198,7 @@ export const recomputeTopicUsage = async (
await trx await trx
.update(topics) .update(topics)
.set({ .set({
accessedAt: topics.accessedAt,
cost, cost,
model: primary?.model ?? null, model: primary?.model ?? null,
provider: primary?.provider ?? null, provider: primary?.provider ?? null,
@@ -198,6 +206,7 @@ export const recomputeTopicUsage = async (
totalInputTokens, totalInputTokens,
totalOutputTokens, totalOutputTokens,
totalTokens, totalTokens,
updatedAt: topics.updatedAt,
usage, usage,
}) })
.where(and(eq(topics.id, topicId), buildWorkspaceWhere({ userId, workspaceId }, topics))); .where(and(eq(topics.id, topicId), buildWorkspaceWhere({ userId, workspaceId }, topics)));