️ perf(app,database): derive topic activity from messages (#15726)

This commit is contained in:
Neko
2026-06-13 00:57:45 +08:00
committed by Neko Ayaka
parent 553d3d8fc7
commit 33ffc88656
9 changed files with 166 additions and 80 deletions
@@ -119,7 +119,7 @@ describe('aiChatRouter', () => {
expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledTimes(1);
expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({ touchTopicUpdatedAt: false }),
expect.not.objectContaining({ touchTopicUpdatedAt: expect.anything() }),
);
expect(mockGet).toHaveBeenCalledWith(
@@ -161,7 +161,7 @@ describe('aiChatRouter', () => {
expect(mockCreateMessage).toHaveBeenCalled();
expect(mockCreateUserAndAssistantMessages).toHaveBeenCalledWith(
expect.any(Object),
expect.objectContaining({ touchTopicUpdatedAt: true }),
expect.not.objectContaining({ touchTopicUpdatedAt: expect.anything() }),
);
expect(mockGet).toHaveBeenCalledWith(
expect.objectContaining({
-1
View File
@@ -330,7 +330,6 @@ export const aiChatRouter = router({
{ assistantMessage, userMessage },
{
...(modelTiming ? { timing: modelTiming } : {}),
touchTopicUpdatedAt: !isCreateNewTopic,
},
);
},
@@ -339,18 +339,15 @@ describe('MessageModel Create Tests', () => {
(event) => event === 'db.message.createUserAndAssistant.messages.insert:start',
),
).toHaveLength(1);
expect(
timingEvents.filter(
(event) => event === 'db.message.createUserAndAssistant.topic.touchUpdatedAt:start',
),
).toHaveLength(1);
expect(timingEvents.some((event) => event.includes('topic.touchUpdatedAt'))).toBe(false);
});
it('should skip topic touch when creating a pair for an already-created topic', async () => {
it('should not touch topic updatedAt when creating a pair for an existing topic', async () => {
await serverDB.insert(topics).values({
id: 'topic-pair-no-touch',
sessionId: '1',
title: 'Topic pair no touch',
updatedAt: new Date('2024-01-01T00:00:00Z'),
userId,
});
@@ -376,9 +373,11 @@ describe('MessageModel Create Tests', () => {
timing: {
log: (event) => timingEvents.push(event),
},
touchTopicUpdatedAt: false,
},
);
const topic = await serverDB.query.topics.findFirst({
where: (table, { eq }) => eq(table.id, 'topic-pair-no-touch'),
});
expect(result.userMessage.id).toBeDefined();
expect(result.assistantMessage.parentId).toBe(result.userMessage.id);
@@ -387,11 +386,8 @@ describe('MessageModel Create Tests', () => {
(event) => event === 'db.message.createUserAndAssistant.messages.insert:start',
),
).toHaveLength(1);
expect(
timingEvents.filter(
(event) => event === 'db.message.createUserAndAssistant.topic.touchUpdatedAt:start',
),
).toHaveLength(0);
expect(timingEvents.some((event) => event.includes('topic.touchUpdatedAt'))).toBe(false);
expect(topic?.updatedAt.toISOString()).toBe('2024-01-01T00:00:00.000Z');
});
describe('create with advanced parameters', () => {
@@ -2,7 +2,16 @@
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { getTestDB } from '../../core/getTestDB';
import { agents, chatGroups, documents, knowledgeBases, tasks, topics, users } from '../../schemas';
import {
agents,
chatGroups,
documents,
knowledgeBases,
messages,
tasks,
topics,
users,
} from '../../schemas';
import type { LobeChatDatabase } from '../../type';
import { RecentModel } from '../recent';
@@ -104,6 +113,41 @@ describe('RecentModel', () => {
});
});
it('orders topic rows by latest message activity', async () => {
await serverDB.insert(agents).values({ id: 'agent-activity', userId, virtual: false });
await serverDB.insert(topics).values([
{
agentId: 'agent-activity',
id: 'topic-old-row-latest-message',
title: 'latest message wins',
updatedAt: minutesAgo(30),
userId,
},
{
agentId: 'agent-activity',
id: 'topic-new-row-old-message',
title: 'newer topic row',
updatedAt: minutesAgo(5),
userId,
},
]);
await serverDB.insert(messages).values({
id: 'recent-topic-latest-message',
role: 'user',
topicId: 'topic-old-row-latest-message',
updatedAt: now(),
userId,
});
const result = await recentModel.queryRecent();
expect(result.map((row) => row.id)).toEqual([
'topic-old-row-latest-message',
'topic-new-row-old-message',
]);
expect(result[0].updatedAt.getTime()).toBeGreaterThan(result[1].updatedAt.getTime());
});
it('includes topics on non-virtual non-group agents', async () => {
await serverDB.insert(agents).values({ id: 'agent-real', userId, virtual: false });
@@ -162,7 +162,7 @@ describe('TopicModel - Query', () => {
]);
});
it('should keep updatedAt ordering by default (no sortBy)', async () => {
it('should order by latest message activity by default (no sortBy)', async () => {
await serverDB.insert(topics).values([
{
id: 'waiting',
@@ -173,11 +173,27 @@ describe('TopicModel - Query', () => {
},
{ id: 'active', sessionId, updatedAt: new Date('2023-05-01'), userId },
]);
await serverDB.insert(messages).values([
{
id: 'waiting-latest-message',
role: 'user',
topicId: 'waiting',
updatedAt: new Date('2023-06-01'),
userId,
},
{
id: 'active-older-message',
role: 'user',
topicId: 'active',
updatedAt: new Date('2023-04-01'),
userId,
},
]);
const result = await topicModel.query({ containerId: sessionId });
// Without status sort, most-recently-updated wins even if lower priority
expect(result.items.map((t) => t.id)).toEqual(['active', 'waiting']);
// Without status sort, most-recent message activity wins even if topic.updatedAt is older.
expect(result.items.map((t) => t.id)).toEqual(['waiting', 'active']);
});
it('should query topics with pagination', async () => {
@@ -1591,6 +1607,43 @@ describe('TopicModel - Query', () => {
expect(result).toHaveLength(2);
});
it('should order recent topics by latest message activity', async () => {
await serverDB.transaction(async (tx) => {
await tx.insert(agents).values([{ id: 'activity-agent', userId, title: 'Activity Agent' }]);
await tx.insert(topics).values([
{
agentId: 'activity-agent',
id: 'activity-topic-old-topic-row',
title: 'Older topic row',
updatedAt: new Date('2023-01-01'),
userId,
},
{
agentId: 'activity-agent',
id: 'activity-topic-new-topic-row',
title: 'Newer topic row',
updatedAt: new Date('2023-05-01'),
userId,
},
]);
await tx.insert(messages).values({
id: 'activity-topic-latest-message',
role: 'user',
topicId: 'activity-topic-old-topic-row',
updatedAt: new Date('2023-06-01'),
userId,
});
});
const result = await topicModel.queryRecent();
expect(result.map((topic) => topic.id)).toEqual([
'activity-topic-old-topic-row',
'activity-topic-new-topic-row',
]);
expect(result[0].updatedAt.toISOString()).toBe('2023-06-01T00:00:00.000Z');
});
it('should return null agentId when topic has groupId but no agentId', async () => {
// Topics with groupId are included even without agentId
await serverDB.transaction(async (tx) => {
+1 -49
View File
@@ -156,7 +156,6 @@ interface CreateUserAndAssistantMessagesParams {
interface CreateUserAndAssistantMessagesOptions {
timing?: ModelTimingContext;
touchTopicUpdatedAt?: boolean;
}
interface CreateMessageInsertParams {
@@ -219,22 +218,6 @@ export class MessageModel {
private agentsToSessionsOwnership = () =>
buildWorkspaceWhere({ userId: this.userId, workspaceId: this.workspaceId }, agentsToSessions);
/**
* Touch topics' updatedAt timestamp within a transaction
*/
private async touchTopicUpdatedAt(trx: Transaction, topicIds: string[]) {
if (topicIds.length === 0) return;
await trx
.update(topics)
.set({ updatedAt: new Date() })
.where(
and(
inArray(topics.id, topicIds),
buildWorkspaceWhere({ userId: this.userId, workspaceId: this.workspaceId }, topics),
),
);
}
// **************** Query *************** //
/**
@@ -1877,16 +1860,6 @@ export class MessageModel {
this.db.transaction(async (trx) => {
const item = await this.createInTransaction(trx, params, id, timing);
// Touch topic's updatedAt when creating a message in a topic
if (params.topicId) {
await runTimedStage(
timing,
'db.message.create.topic.touchUpdatedAt',
() => this.touchTopicUpdatedAt(trx, [params.topicId!]),
{ topicCount: 1 },
);
}
return item;
}),
{
@@ -1900,7 +1873,7 @@ export class MessageModel {
createUserAndAssistantMessages = async (
{ userMessage, assistantMessage }: CreateUserAndAssistantMessagesParams,
{ timing, touchTopicUpdatedAt = true }: CreateUserAndAssistantMessagesOptions = {},
{ timing }: CreateUserAndAssistantMessagesOptions = {},
): Promise<{ assistantMessage: DBMessageItem; userMessage: DBMessageItem }> => {
const userMessageId = this.genId();
const assistantMessageId = this.genId();
@@ -1964,15 +1937,6 @@ export class MessageModel {
'db.message.createUserAndAssistant.assistant',
);
if (touchTopicUpdatedAt && topicIds.length > 0) {
await runTimedStage(
timing,
'db.message.createUserAndAssistant.topic.touchUpdatedAt',
() => this.touchTopicUpdatedAt(trx, topicIds),
{ topicCount: topicIds.length },
);
}
const userMessageItem = messageMap.get(userMessageId);
const assistantMessageItem = messageMap.get(assistantMessageId);
@@ -1999,13 +1963,9 @@ export class MessageModel {
),
);
const topicIds = [...new Set(newMessages.map((m) => m.topicId).filter(Boolean))] as string[];
return this.db.transaction(async (trx) => {
const result = await trx.insert(messages).values(messagesToInsert);
await this.touchTopicUpdatedAt(trx, topicIds);
return result;
});
};
@@ -2094,15 +2054,7 @@ export class MessageModel {
{ hasMetadata: !!metadataPatch, valueKeys: Object.keys(message) },
);
// Touch topic's updatedAt when updating a message
if (updated?.topicId) {
await runTimedStage(
timing,
'db.message.update.topic.touchUpdatedAt',
() => this.touchTopicUpdatedAt(trx, [updated.topicId!]),
{ topicCount: 1 },
);
// When this write carries token usage (assistant finalize / hetero
// step), recompute the topic's denormalized usage rollup from its
// messages. Gated on the *incoming* payload so streaming
+13 -2
View File
@@ -2,7 +2,7 @@ import type { TaskStatus } from '@lobechat/types';
import { and, desc, eq, inArray, isNotNull, isNull, ne, not, or, sql } from 'drizzle-orm';
import { unionAll } from 'drizzle-orm/pg-core';
import { agents, DOCUMENT_FOLDER_TYPE, documents, tasks, topics } from '../schemas';
import { agents, DOCUMENT_FOLDER_TYPE, documents, messages, tasks, topics } from '../schemas';
import type { LobeChatDatabase } from '../type';
import { buildWorkspaceWhere } from '../utils/workspace';
@@ -48,6 +48,17 @@ export class RecentModel {
const taskScopeWhere = this.workspaceId
? eq(tasks.workspaceId, this.workspaceId)
: and(eq(tasks.createdByUserId, this.userId), isNull(tasks.workspaceId));
const latestTopicMessageAtSubquery = this.db
.select({ value: messages.updatedAt })
.from(messages)
.where(and(eq(messages.topicId, topics.id), buildWorkspaceWhere(scope, messages)))
.orderBy(desc(messages.updatedAt))
.limit(1);
const topicActivityAt =
sql<Date>`COALESCE((${latestTopicMessageAtSubquery}), ${topics.updatedAt})`.mapWith(
topics.updatedAt,
);
const topicArm = this.db
.select({
@@ -58,7 +69,7 @@ export class RecentModel {
status: sql<TaskStatus | null>`NULL`.as('status'),
title: sql<string>`COALESCE(${topics.title}, 'Untitled Topic')`.as('title'),
type: sql<RecentDbItem['type']>`'topic'`.as('type'),
updatedAt: topics.updatedAt,
updatedAt: topicActivityAt.as('updated_at'),
})
.from(topics)
.leftJoin(agents, eq(topics.agentId, agents.id))
+28 -6
View File
@@ -134,10 +134,10 @@ const STATUS_SORT_RANK = sql`CASE ${topics.status}
// Favorites always float to the top; the rest are ordered by the requested
// strategy. `status` adds the priority bucket before the recency tiebreaker.
const buildTopicOrderBy = (sortBy?: TopicQuerySortBy): SQL[] =>
const buildTopicOrderBy = (topicActivityAt: SQL, sortBy?: TopicQuerySortBy): SQL[] =>
sortBy === 'status'
? [desc(topics.favorite), asc(STATUS_SORT_RANK), desc(topics.updatedAt)]
: [desc(topics.favorite), desc(topics.updatedAt)];
? [desc(topics.favorite), asc(STATUS_SORT_RANK), desc(topicActivityAt)]
: [desc(topics.favorite), desc(topicActivityAt)];
export class TopicModel {
private userId: string;
@@ -171,7 +171,6 @@ export class TopicModel {
triggers,
withDetails = false,
}: QueryTopicParams = {}) => {
const orderBy = buildTopicOrderBy(sortBy);
const queryStartedAt = Date.now();
logTiming(timing, 'db.topic.query:start', {
current,
@@ -208,6 +207,17 @@ export class TopicModel {
.select({ value: sql<number>`count(*)::int` })
.from(messages)
.where(eq(messages.topicId, topics.id));
const latestMessageAtSubquery = this.db
.select({ value: messages.updatedAt })
.from(messages)
.where(and(eq(messages.topicId, topics.id), this.messageOwnership()))
.orderBy(desc(messages.updatedAt))
.limit(1);
const topicActivityAt =
sql<Date>`COALESCE((${latestMessageAtSubquery}), ${topics.updatedAt})`.mapWith(
topics.updatedAt,
);
const orderBy = buildTopicOrderBy(topicActivityAt, sortBy);
const detailColumns = withDetails
? {
@@ -556,6 +566,17 @@ export class TopicModel {
* - For inbox: includes topics with slug='inbox'
*/
queryRecent = async (limit: number = 12) => {
const latestMessageAtSubquery = this.db
.select({ value: messages.updatedAt })
.from(messages)
.where(and(eq(messages.topicId, topics.id), this.messageOwnership()))
.orderBy(desc(messages.updatedAt))
.limit(1);
const topicActivityAt =
sql<Date>`COALESCE((${latestMessageAtSubquery}), ${topics.updatedAt})`.mapWith(
topics.updatedAt,
);
const result = await this.db
.select({
agentId: topics.agentId,
@@ -563,7 +584,7 @@ export class TopicModel {
id: topics.id,
sessionId: topics.sessionId,
title: topics.title,
updatedAt: topics.updatedAt,
updatedAt: topicActivityAt,
})
.from(topics)
.leftJoin(agents, eq(topics.agentId, agents.id))
@@ -580,12 +601,13 @@ export class TopicModel {
),
),
)
.orderBy(desc(topics.updatedAt))
.orderBy(desc(topicActivityAt))
.limit(limit);
return result.map((item) => ({
...item,
type: item.groupId ? ('group' as const) : ('agent' as const),
updatedAt: item.updatedAt instanceof Date ? item.updatedAt : new Date(item.updatedAt),
}));
};
+13 -4
View File
@@ -52,10 +52,15 @@ const num = (v: unknown): number => (v == null ? 0 : Number(v));
* usage (e.g. after deletions), the columns are reset to NULL ("not measured"),
* so deletes / regenerations are reflected correctly.
*
* NOTE: this writes through drizzle, whose `topics.updatedAt` has `$onUpdate`,
* so calling it bumps `updated_at`. That's intended for the live path (the
* topic is active anyway). The historical backfill must NOT use this it runs
* its own raw-SQL aggregate that leaves `updated_at` untouched.
* Keep activity timestamps stable: recency is derived from `messages.updated_at`,
* so this projection update must not bump `topics.updated_at` / `accessed_at`.
* Drizzle `$onUpdate` is bypassed by explicitly assigning the columns to
* themselves.
*
* TODO: This still updates the `topics` row for usage/cost/token rollups. Under
* high-concurrency assistant finalization for the same topic, it can still
* serialize on the topic row lock. Consider moving this projection to a
* debounced/asynchronous rollup or a separate per-topic usage table.
*/
export const recomputeTopicUsage = async (
trx: Transaction,
@@ -99,6 +104,7 @@ export const recomputeTopicUsage = async (
await trx
.update(topics)
.set({
accessedAt: topics.accessedAt,
cost: null,
model: null,
provider: null,
@@ -106,6 +112,7 @@ export const recomputeTopicUsage = async (
totalInputTokens: null,
totalOutputTokens: null,
totalTokens: null,
updatedAt: topics.updatedAt,
usage: null,
})
.where(and(eq(topics.id, topicId), buildWorkspaceWhere({ userId, workspaceId }, topics)));
@@ -191,6 +198,7 @@ export const recomputeTopicUsage = async (
await trx
.update(topics)
.set({
accessedAt: topics.accessedAt,
cost,
model: primary?.model ?? null,
provider: primary?.provider ?? null,
@@ -198,6 +206,7 @@ export const recomputeTopicUsage = async (
totalInputTokens,
totalOutputTokens,
totalTokens,
updatedAt: topics.updatedAt,
usage,
})
.where(and(eq(topics.id, topicId), buildWorkspaceWhere({ userId, workspaceId }, topics)));