Files
lobe-chat/apps/server/src/routers/lambda/topic.ts
T
Innei 30835b27d2 feat(server): standalone Hono dev runtime for apps/server (dev:hono-lite)
Ports PR #14800's standalone Hono dev runtime onto the new apps/server
package. bun run dev:hono-lite boots Hono on :3011 + Vite on :9876 with
NO Next.js process, serving the full near-parity API surface:

  /trpc/*   - lambda / mobile / tools / async TRPC routers
  /webapi/* - chat, models, createImage, speech, trace
  /market/* - agent / model / plugin market
  /oidc/*   - OIDC provider
  /f/*      - fileProxy + userAvatar
  /api/*    - auth, webhooks, dev, v1, memory (catch-all, registered last)

Better Auth's /api/auth/* and dev-local-login flow run end-to-end;
authenticated tRPC queries return real user data. bun run dev (Next)
stays unbroken.

Squashed from 11 commits ported from the original branch:
  - L0 core: runtime-neutral scheduleAfterResponse (drop next/server
    from apps/server), tRPC runtime handlers + Hono /trpc sub-app,
    standalone Hono root app + Node entry, dev:hono-lite topology
    (devHonoLite.mts + devTopology.ts + viteNodeServer.config.ts),
    Better Auth dev-local-login bootstrap, @lobehub/editor dedupe
  - L1 webapi: chat, models, createImage, speech, trace
  - L2 misc: market, oidc, fileProxy, userAvatar
  - L3 agent + api catch-all: agentStream, agentEvalRunWorkflow,
    agent-hono / workflows-hono mounts, api-hono catch-all

Dev ergonomics:
  - vite-node@3.2.4 as a workspace devDep (was bunx-fetched per-run)
  - npx vite-node instead of bunx vite-node@3.2.4 in dev:hono:server
  - unified process.title across dev nodes: lobe-dev, lobe-dev-hono-lite,
    lobe-dev-hono-${port}, lobe-dev-vite-${platform}, lobe-dev-login
  - fix lobe-dev-proxy-print plugin swallowing Vite's Local/Network
    URLs in printUrls; now: Local + Network + Hono API + Debug Proxy

Docs: apps/server/README.md (routes, ports, dev-login gate, known
gaps, troubleshooting); docs/development/start.mdx EN/zh-CN section.

Tier T1 (dev-runnable only): gray-release machinery, production-
deployable apps/server (T2/T3), and a handful of post-#14800 routes
(oauth/connector/callback, api/dev/test-push, webapi/revalidate,
agent-eval-run extras) are intentionally out of scope.
2026-06-13 16:16:50 +08:00

712 lines
23 KiB
TypeScript

import {
type RecentTopic,
type RecentTopicGroup,
type RecentTopicGroupMember,
} from '@lobechat/types';
import { cleanObject } from '@lobechat/utils';
import { eq, inArray } from 'drizzle-orm';
import { z } from 'zod';
import { withScopedPermission } from '@/business/server/trpc-middlewares/rbacPermission';
import { wsCompatProcedure } from '@/business/server/trpc-middlewares/workspaceAuth';
import { AgentOperationModel } from '@/database/models/agentOperation';
import { MessageModel } from '@/database/models/message';
import { TopicModel } from '@/database/models/topic';
import { TopicShareModel } from '@/database/models/topicShare';
import { AgentMigrationRepo } from '@/database/repositories/agentMigration';
import { TopicImporterRepo } from '@/database/repositories/topicImporter';
import { agents, chatGroups, chatGroupsAgents } from '@/database/schemas';
import { router } from '@/libs/trpc/lambda';
import { serverDatabase } from '@/libs/trpc/lambda/middleware';
import { scheduleAfterResponse } from '@/server/utils/scheduleAfterResponse';
import { type BatchTaskResult } from '@/types/service';
import {
batchResolveAgentIdFromSessions,
resolveAgentIdFromSession,
resolveContext,
} from './_helpers/resolveContext';
import { basicContextSchema } from './_schema/context';
const topicProcedure = wsCompatProcedure.use(serverDatabase).use(async (opts) => {
const { ctx } = opts;
const wsId = ctx.workspaceId ?? undefined;
return opts.next({
ctx: {
agentMigrationRepo: new AgentMigrationRepo(ctx.serverDB, ctx.userId, wsId),
agentOperationModel: new AgentOperationModel(ctx.serverDB, ctx.userId, wsId),
topicImporterRepo: new TopicImporterRepo(ctx.serverDB, ctx.userId, wsId),
topicModel: new TopicModel(ctx.serverDB, ctx.userId, wsId),
topicShareModel: new TopicShareModel(ctx.serverDB, ctx.userId, wsId),
},
});
});
export const topicRouter = router({
getTopicDetail: topicProcedure
.input(z.object({ id: z.string() }))
.query(async ({ input, ctx }) => {
const topic = await ctx.topicModel.findById(input.id);
if (!topic) return null;
return topic;
}),
getTopicContext: topicProcedure
.input(z.object({ topicId: z.string() }))
.query(async ({ input, ctx }) => {
const topic = await ctx.topicModel.findById(input.topicId);
if (!topic) {
return { content: `Topic not found: ${input.topicId}`, success: false };
}
const title = topic.title || 'Untitled';
// Prefer historySummary if available
if (topic.historySummary) {
return {
content: `# Topic: ${title}\n\n## Summary\n${topic.historySummary}`,
success: true,
};
}
// Fallback: fetch recent messages with correct agentId/groupId
const wsId = ctx.workspaceId ?? undefined;
const messageModel = new MessageModel(ctx.serverDB, ctx.userId, wsId);
const messages = await messageModel.query({
agentId: topic.agentId ?? undefined,
groupId: topic.groupId ?? undefined,
topicId: input.topicId,
});
const recentMessages = messages.slice(-30);
const lines = [`# Topic: ${title}`, '', '## Recent Messages', ''];
for (const msg of recentMessages) {
const role =
msg.role === 'user' ? 'User' : msg.role === 'assistant' ? 'Assistant' : msg.role;
const content = (msg.content || '').trim();
if (content) {
lines.push(`**${role}**: ${content}`, '');
}
}
return { content: lines.join('\n'), success: true };
}),
batchCreateTopics: topicProcedure
.use(withScopedPermission('topic:create'))
.input(
z.array(
z
.object({
favorite: z.boolean().optional(),
id: z.string().optional(),
messages: z.array(z.string()).optional(),
title: z.string(),
})
.extend(basicContextSchema.shape),
),
)
.mutation(async ({ input, ctx }): Promise<BatchTaskResult> => {
// Resolve sessionId for each topic
const resolvedTopics = await Promise.all(
input.map(async (item) => {
const { agentId, ...rest } = item;
const resolved = await resolveContext(
{ agentId, sessionId: rest.sessionId },
ctx.serverDB,
ctx.userId,
ctx.workspaceId ?? undefined,
);
return { ...rest, sessionId: resolved.sessionId };
}),
);
const data = await ctx.topicModel.batchCreate(resolvedTopics as any);
return { added: data.length, ids: [], skips: [], success: true };
}),
batchDelete: topicProcedure
.use(withScopedPermission('topic:delete'))
.input(z.object({ ids: z.array(z.string()) }))
.mutation(async ({ input, ctx }) => {
return ctx.topicModel.batchDelete(input.ids);
}),
batchDeleteByAgentId: topicProcedure
.use(withScopedPermission('topic:delete'))
.input(z.object({ agentId: z.string() }))
.mutation(async ({ input, ctx }) => {
return ctx.topicModel.batchDeleteByAgentId(input.agentId);
}),
batchDeleteBySessionId: topicProcedure
.use(withScopedPermission('topic:delete'))
.input(
z.object({
agentId: z.string().optional(),
id: z.string().nullable().optional(),
}),
)
.mutation(async ({ input, ctx }) => {
const resolved = await resolveContext(
{ agentId: input.agentId, sessionId: input.id },
ctx.serverDB,
ctx.userId,
ctx.workspaceId ?? undefined,
);
return ctx.topicModel.batchDeleteBySessionId(resolved.sessionId);
}),
cloneTopic: topicProcedure
.use(withScopedPermission('topic:create'))
.input(z.object({ id: z.string(), newTitle: z.string().optional() }))
.mutation(async ({ input, ctx }) => {
const data = await ctx.topicModel.duplicate(input.id, input.newTitle);
return data.topic.id;
}),
countTopics: topicProcedure
.input(
z
.object({
agentId: z.string().optional(),
containerId: z.string().nullable().optional(),
endDate: z.string().optional(),
range: z.tuple([z.string(), z.string()]).optional(),
startDate: z.string().optional(),
})
.optional(),
)
.query(async ({ ctx, input }) => {
return ctx.topicModel.count(input);
}),
createTopic: topicProcedure
.use(withScopedPermission('topic:create'))
.input(
z
.object({
favorite: z.boolean().optional(),
groupId: z.string().nullable().optional(),
messages: z.array(z.string()).optional(),
title: z.string(),
trigger: z.string().optional(),
})
.extend(basicContextSchema.shape),
)
.mutation(async ({ input, ctx }) => {
const { agentId, ...rest } = input;
const resolved = await resolveContext(
{ agentId, sessionId: rest.sessionId },
ctx.serverDB,
ctx.userId,
ctx.workspaceId ?? undefined,
);
const data = await ctx.topicModel.create({ ...rest, sessionId: resolved.sessionId });
return data.id;
}),
/**
* Disable sharing for a topic (deletes share record)
*/
disableSharing: topicProcedure
.use(withScopedPermission('topic:update'))
.input(z.object({ topicId: z.string() }))
.mutation(async ({ input, ctx }) => {
return ctx.topicShareModel.deleteByTopicId(input.topicId);
}),
/**
* Enable sharing for a topic (creates share record)
*/
enableSharing: topicProcedure
.use(withScopedPermission('topic:update'))
.input(
z.object({
topicId: z.string(),
visibility: z.enum(['private', 'link']).optional(),
}),
)
.mutation(async ({ input, ctx }) => {
return ctx.topicShareModel.create(input.topicId, input.visibility);
}),
getAllTopics: topicProcedure.query(async ({ ctx }) => {
return ctx.topicModel.queryAll();
}),
getShareInfo: topicProcedure
.input(z.object({ topicId: z.string() }))
.query(async ({ input, ctx }) => {
return ctx.topicShareModel.getByTopicId(input.topicId);
}),
getTopics: topicProcedure
.input(
z.object({
agentId: z.string().nullable().optional(),
current: z.number().optional(),
excludeStatuses: z.array(z.string()).optional(),
excludeTriggers: z.array(z.string()).optional(),
groupId: z.string().nullable().optional(),
includeTriggers: z.array(z.string()).optional(),
isInbox: z.boolean().optional(),
pageSize: z.number().max(100).optional(),
sessionId: z.string().nullable().optional(),
/**
* Server-side ordering. Defaults to `updatedAt`; `status` orders by
* status priority for the sidebar "group by status" mode.
*/
sortBy: z.enum(['updatedAt', 'status']).optional(),
triggers: z.array(z.string()).optional(),
/**
* When true, returns extra card-detail columns (firstUserMessage,
* messageCount, cost, tokenUsage, description, trigger). Default false
* so the sidebar list stays cheap — only the management page opts in.
*/
withDetails: z.boolean().optional(),
}),
)
.query(async ({ input, ctx }) => {
const {
sessionId,
isInbox,
groupId,
excludeStatuses,
excludeTriggers,
includeTriggers,
triggers,
...rest
} = input;
// If groupId is provided, query by groupId directly
if (groupId) {
const result = await ctx.topicModel.query({
excludeStatuses,
excludeTriggers,
groupId,
includeTriggers,
triggers,
...rest,
});
return { items: result.items, total: result.total };
}
// If sessionId is provided but no agentId, need to reverse lookup agentId
let effectiveAgentId = rest.agentId;
if (!effectiveAgentId && sessionId) {
effectiveAgentId = await resolveAgentIdFromSession(
sessionId,
ctx.serverDB,
ctx.userId,
ctx.workspaceId ?? undefined,
);
}
const result = await ctx.topicModel.query({
...rest,
agentId: effectiveAgentId,
excludeStatuses,
excludeTriggers,
includeTriggers,
isInbox,
triggers,
});
// Runtime migration: backfill agentId for ALL legacy topics and messages under this agent
const runMigration = async () => {
if (!effectiveAgentId) return;
// Get the associated sessionId for migration
const resolved = await resolveContext(
{ agentId: effectiveAgentId },
ctx.serverDB,
ctx.userId,
ctx.workspaceId ?? undefined,
);
const migrationParams = isInbox
? { agentId: effectiveAgentId, isInbox: true as const, sessionId: resolved.sessionId }
: resolved.sessionId
? { agentId: effectiveAgentId, sessionId: resolved.sessionId }
: null;
if (migrationParams) {
try {
await ctx.agentMigrationRepo.migrateAgentId(migrationParams);
} catch (error) {
console.error('[AgentMigration] Failed to migrate agentId:', error);
}
}
};
// Use Next.js after() for non-blocking execution
scheduleAfterResponse(runMigration, 'AgentMigration:list');
return { items: result.items, total: result.total };
}),
hasTopics: topicProcedure.query(async ({ ctx }) => {
return (await ctx.topicModel.count()) === 0;
}),
importTopic: topicProcedure
.use(withScopedPermission('topic:create'))
.input(
z.object({
agentId: z.string(),
data: z.string(),
groupId: z.string().nullable().optional(),
}),
)
.mutation(async ({ input, ctx }) => {
const result = await ctx.topicImporterRepo.importTopic({
agentId: input.agentId,
data: input.data,
groupId: input.groupId,
});
return result;
}),
getMaxTaskDuration: topicProcedure.query(async ({ ctx }) => {
return ctx.agentOperationModel.getMaxDurationSeconds();
}),
rankTopics: topicProcedure.input(z.number().max(50).optional()).query(async ({ ctx, input }) => {
return ctx.topicModel.rank(input);
}),
recentTopics: topicProcedure
.input(z.object({ limit: z.number().max(50).optional() }).optional())
.query(async ({ ctx, input }): Promise<RecentTopic[]> => {
const recentTopics = await ctx.topicModel.queryRecent(input?.limit ?? 12);
// Separate agent topics and group topics
const agentTopics = recentTopics.filter((t) => t.type === 'agent');
const groupTopics = recentTopics.filter((t) => t.type === 'group');
// Find legacy topics: no agentId but has sessionId
const legacyTopics = agentTopics.filter(
(topic) => topic.agentId === null && topic.sessionId !== null,
);
// Batch resolve agentId for legacy topics
const sessionIds = [...new Set(legacyTopics.map((t) => t.sessionId!))];
const sessionAgentMap = await batchResolveAgentIdFromSessions(
sessionIds,
ctx.serverDB,
ctx.userId,
ctx.workspaceId ?? undefined,
);
// Build agentId map: merge existing agentId with resolved ones
const topicAgentIdMap = new Map<string, string>();
for (const topic of agentTopics) {
if (topic.agentId) {
topicAgentIdMap.set(topic.id, topic.agentId);
} else if (topic.sessionId) {
const resolvedAgentId = sessionAgentMap.get(topic.sessionId);
if (resolvedAgentId) {
topicAgentIdMap.set(topic.id, resolvedAgentId);
}
}
}
// Collect all agentIds to fetch agent info
const allAgentIds = [...new Set(topicAgentIdMap.values())];
// Batch query agent info
const agentInfoMap = new Map<
string,
{ avatar: string | null; backgroundColor: string | null; id: string; title: string | null }
>();
if (allAgentIds.length > 0) {
const agentInfos = await ctx.serverDB
.select({
avatar: agents.avatar,
backgroundColor: agents.backgroundColor,
id: agents.id,
title: agents.title,
})
.from(agents)
.where(inArray(agents.id, allAgentIds));
for (const agent of agentInfos) {
agentInfoMap.set(agent.id, agent);
}
}
// Batch query group info with member avatars
const groupInfoMap = new Map<string, RecentTopicGroup>();
const allGroupIds = [...new Set(groupTopics.map((t) => t.groupId!).filter(Boolean))];
if (allGroupIds.length > 0) {
// Query chat groups
const chatGroupInfos = await ctx.serverDB
.select({
id: chatGroups.id,
title: chatGroups.title,
})
.from(chatGroups)
.where(inArray(chatGroups.id, allGroupIds));
// Query group member agents (get avatar info)
const groupMembersRaw = await ctx.serverDB
.select({
agentAvatar: agents.avatar,
agentBackgroundColor: agents.backgroundColor,
chatGroupId: chatGroupsAgents.chatGroupId,
order: chatGroupsAgents.order,
})
.from(chatGroupsAgents)
.leftJoin(agents, eq(chatGroupsAgents.agentId, agents.id))
.where(inArray(chatGroupsAgents.chatGroupId, allGroupIds));
// Group members by chatGroupId
const groupMembersMap = new Map<string, RecentTopicGroupMember[]>();
for (const member of groupMembersRaw) {
const members = groupMembersMap.get(member.chatGroupId) || [];
members.push({
avatar: member.agentAvatar,
backgroundColor: member.agentBackgroundColor,
});
groupMembersMap.set(member.chatGroupId, members);
}
// Build group info map
for (const group of chatGroupInfos) {
groupInfoMap.set(group.id, {
id: group.id,
members: groupMembersMap.get(group.id) || [],
title: group.title,
});
}
}
// Runtime migration: backfill agentId for legacy topics
const runMigration = async () => {
for (const [sessionId, agentId] of sessionAgentMap) {
try {
await ctx.agentMigrationRepo.migrateAgentId({ agentId, sessionId });
} catch (error) {
console.error('[AgentMigration] Failed to migrate agentId for recentTopics:', error);
}
}
};
// Use Next.js after() for non-blocking execution
scheduleAfterResponse(runMigration, 'AgentMigration:recentTopics');
// Assemble final result
return recentTopics.map((topic) => {
if (topic.type === 'group' && topic.groupId) {
const groupInfo = groupInfoMap.get(topic.groupId);
return {
agent: null,
group: groupInfo ?? null,
id: topic.id,
title: topic.title,
type: 'group' as const,
updatedAt: topic.updatedAt,
};
}
// Agent topic
const agentId = topicAgentIdMap.get(topic.id);
const agentInfo = agentId ? agentInfoMap.get(agentId) : null;
// Always return agent with id if agentId exists (even if avatar/title are null)
// Frontend needs agent.id to generate links
const validAgent = agentInfo ? cleanObject(agentInfo) : null;
return {
agent: validAgent,
group: null,
id: topic.id,
title: topic.title,
type: 'agent' as const,
updatedAt: topic.updatedAt,
};
});
}),
removeAllTopics: topicProcedure
.use(withScopedPermission('topic:delete'))
.mutation(async ({ ctx }) => {
return ctx.topicModel.deleteAll();
}),
removeTopic: topicProcedure
.use(withScopedPermission('topic:delete'))
.input(z.object({ id: z.string() }))
.mutation(async ({ input, ctx }) => {
return ctx.topicModel.delete(input.id);
}),
searchTopics: topicProcedure
.input(
z.object({
agentId: z.string().optional(),
groupId: z.string().nullable().optional(),
keywords: z.string(),
sessionId: z.string().nullable().optional(),
}),
)
.query(async ({ input, ctx }) => {
const resolved = await resolveContext(
{ agentId: input.agentId, sessionId: input.sessionId },
ctx.serverDB,
ctx.userId,
ctx.workspaceId ?? undefined,
);
return ctx.topicModel.queryByKeyword(input.keywords, resolved.sessionId);
}),
/**
* Update share visibility
*/
updateShareVisibility: topicProcedure
.use(withScopedPermission('topic:update'))
.input(
z.object({
topicId: z.string(),
visibility: z.enum(['private', 'link']),
}),
)
.mutation(async ({ input, ctx }) => {
return ctx.topicShareModel.updateVisibility(input.topicId, input.visibility);
}),
updateTopic: topicProcedure
.use(withScopedPermission('topic:update'))
.input(
z.object({
id: z.string(),
value: z.object({
agentId: z.string().optional(),
completedAt: z.date().nullable().optional(),
favorite: z.boolean().optional(),
historySummary: z.string().optional(),
messages: z.array(z.string()).optional(),
metadata: z
.object({
model: z.string().optional(),
provider: z.string().optional(),
})
.optional(),
sessionId: z.string().optional(),
status: z
.enum([
'active',
'running',
'paused',
'waitingForHuman',
'failed',
'completed',
'archived',
])
.nullable()
.optional(),
title: z.string().optional(),
}),
}),
)
.mutation(async ({ input, ctx }) => {
const { agentId, ...restValue } = input.value;
// If agentId is provided, resolve to sessionId
let resolvedSessionId = restValue.sessionId;
if (agentId && !resolvedSessionId) {
const resolved = await resolveContext(
{ agentId },
ctx.serverDB,
ctx.userId,
ctx.workspaceId ?? undefined,
);
resolvedSessionId = resolved.sessionId ?? undefined;
}
return ctx.topicModel.update(input.id, { ...restValue, sessionId: resolvedSessionId });
}),
updateTopicMetadata: topicProcedure
.use(withScopedPermission('topic:update'))
.input(
z.object({
id: z.string(),
metadata: z.object({
boundDeviceId: z.string().optional(),
heteroSessionId: z.string().optional(),
model: z.string().optional(),
onboardingFeedback: z
.object({
comment: z.string().max(500).optional(),
rating: z.enum(['good', 'bad']),
submittedAt: z.string(),
})
.optional(),
onboardingSession: z
.object({
agentIdentityCompletedAt: z.string().optional(),
agentMarketplacePick: z
.object({
categoryHints: z.array(z.string()),
installedAgentIds: z.array(z.string()).optional(),
requestId: z.string(),
resolvedAt: z.string(),
selectedTemplateIds: z.array(z.string()).optional(),
skipReason: z.string().optional(),
skippedAgentIds: z.array(z.string()).optional(),
status: z.enum(['cancelled', 'skipped', 'submitted']),
})
.optional(),
discoveryCompletedAt: z.string().optional(),
finalAgentNames: z.array(z.string()).optional(),
finishedAt: z.string().optional(),
lastActiveAt: z.string().optional(),
phase: z.enum(['agent_identity', 'user_identity', 'discovery', 'summary']).optional(),
startedAt: z.string().optional(),
userIdentityCompletedAt: z.string().optional(),
version: z.number().optional(),
})
.optional(),
provider: z.string().optional(),
runningOperation: z
.object({
assistantMessageId: z.string(),
completionWebhook: z
.object({
body: z.record(z.unknown()).optional(),
delivery: z.enum(['fetch', 'qstash']).optional(),
url: z.string(),
})
.optional(),
operationId: z.string(),
scope: z.string().optional(),
threadId: z.string().nullable().optional(),
})
.nullable()
.optional(),
repos: z.array(z.string()).optional(),
workingDirectory: z.string().optional(),
}),
}),
)
.mutation(async ({ input, ctx }) => {
return ctx.topicModel.updateMetadata(input.id, input.metadata);
}),
});
export type TopicRouter = typeof topicRouter;