/*
 * Source: lobe-chat/apps/server/src/routers/lambda/task.ts
 * (1198 lines, 40 KiB, TypeScript — captured from a repository history view)
 */
import { TASK_STATUSES } from '@lobechat/builtin-tool-task';
import type { TaskListItem, TaskParticipant } from '@lobechat/types';
2026-03-26 17:43:51 +08:00
import { TRPCError } from '@trpc/server';
2026-06-09 15:54:26 +08:00
import { and, eq, isNull } from 'drizzle-orm';
2026-03-26 17:43:51 +08:00
import { z } from 'zod';
2026-06-09 15:54:26 +08:00
import { withScopedPermission } from '@/business/server/trpc-middlewares/rbacPermission';
import { wsCompatProcedure } from '@/business/server/trpc-middlewares/workspaceAuth';
import { AgentModel } from '@/database/models/agent';
2026-03-26 17:43:51 +08:00
import { BriefModel } from '@/database/models/brief';
import { TaskModel } from '@/database/models/task';
import { TaskTopicModel } from '@/database/models/taskTopic';
2026-06-09 15:54:26 +08:00
import { TopicModel } from '@/database/models/topic';
import { workspaceMembers } from '@/database/schemas';
import { router } from '@/libs/trpc/lambda';
2026-03-26 17:43:51 +08:00
import { serverDatabase } from '@/libs/trpc/lambda/middleware';
import { EditLockService } from '@/server/services/editLock';
import { publishResourceEvent } from '@/server/services/resourceEvents';
2026-03-26 17:43:51 +08:00
import { TaskService } from '@/server/services/task';
2026-06-09 15:54:26 +08:00
import { TaskLifecycleService } from '@/server/services/taskLifecycle';
import { TaskRunnerService } from '@/server/services/taskRunner';
2026-06-09 15:54:26 +08:00
import { TransferErrorCode } from '@/types/transferError';
2026-03-26 17:43:51 +08:00
2026-06-09 15:54:26 +08:00
/**
 * Base procedure for every task endpoint.
 *
 * Chains the `serverDatabase` middleware onto `wsCompatProcedure`, then builds
 * one instance of each model/service the handlers below read from `ctx`. All of
 * them except `EditLockService` receive the server DB handle, the caller's
 * user id and the workspace id.
 */
const taskProcedure = wsCompatProcedure.use(serverDatabase).use(async (opts) => {
  const { ctx } = opts;
  // A null workspace id is passed on as `undefined`.
  const wsId = ctx.workspaceId ?? undefined;

  return opts.next({
    ctx: {
      agentModel: new AgentModel(ctx.serverDB, ctx.userId, wsId),
      briefModel: new BriefModel(ctx.serverDB, ctx.userId, wsId),
      editLockService: new EditLockService(ctx.userId),
      taskLifecycle: new TaskLifecycleService(ctx.serverDB, ctx.userId, wsId),
      taskModel: new TaskModel(ctx.serverDB, ctx.userId, wsId),
      taskService: new TaskService(ctx.serverDB, ctx.userId, wsId),
      taskTopicModel: new TaskTopicModel(ctx.serverDB, ctx.userId, wsId),
      topicModel: new TopicModel(ctx.serverDB, ctx.userId, wsId),
    },
  });
});
2026-06-09 15:54:26 +08:00
/**
 * Write variant of `taskProcedure`: additionally requires the `agent:update`
 * scoped permission, which keeps viewers out of task mutations (create /
 * update / delete / run). Read endpoints stay on `taskProcedure` so viewers
 * can still inspect tasks and their status.
 */
const taskProcedureWrite = taskProcedure.use(
  withScopedPermission('agent:update'),
);
2026-03-26 17:43:51 +08:00
/**
 * Shared input for procedures addressed by task id. The `id` string may be a
 * raw id (task_xxx) or a human identifier (TASK-1); it is resolved in the
 * model layer via `model.resolve()`.
 */
const idInput = z.object({
  id: z.string(),
});
// Priority: 0=None, 1=Urgent, 2=High, 3=Normal, 4=Low
/**
 * Input accepted by `create`. Only `instruction` is mandatory (non-empty);
 * every other field is optional.
 */
const createSchema = z.object({
  assigneeAgentId: z.string().optional(),
  assigneeUserId: z.string().optional(),
  // Optional schedule wiring at create time. When `automationMode` is
  // 'schedule', `schedulePattern` (cron) is required for the central
  // schedule-dispatch sweep to pick the task up. Note that this schema does
  // not enforce that pairing itself.
  automationMode: z.enum(['heartbeat', 'schedule']).optional(),
  createdByAgentId: z.string().optional(),
  description: z.string().optional(),
  editorData: z.unknown().optional(),
  identifierPrefix: z.string().optional(),
  instruction: z.string().min(1),
  name: z.string().optional(),
  parentTaskId: z.string().optional(),
  // Priority is an enumerated 0..4 scale, so fractional values are rejected
  // at validation time instead of reaching the persistence layer.
  priority: z.number().int().min(0).max(4).optional(),
  schedulePattern: z.string().optional(),
  scheduleTimezone: z.string().optional(),
});
/**
 * Patch accepted by `update` (merged with `idInput` at the call site). Every
 * field is optional; fields declared `.nullable()` additionally accept an
 * explicit `null`.
 */
const updateSchema = z.object({
  assigneeAgentId: z.string().nullable().optional(),
  assigneeUserId: z.string().nullable().optional(),
  automationMode: z.enum(['heartbeat', 'schedule']).nullable().optional(),
  config: z.record(z.unknown()).optional(),
  context: z.record(z.unknown()).optional(),
  description: z.string().optional(),
  editorData: z.unknown().optional(),
  // 0 clears the interval (disables heartbeat); any positive value must be
  // ≥600s (10 min) to match the UI minimum and prevent sub-minute ticks if an
  // LLM calls setTaskSchedule with a tiny number.
  heartbeatInterval: z
    .number()
    .int()
    .refine((v) => v === 0 || v >= 600, {
      message: 'heartbeatInterval must be 0 (disabled) or at least 600 seconds (10 minutes)',
    })
    .optional(),
  heartbeatTimeout: z.number().min(1).nullable().optional(),
  instruction: z.string().optional(),
  name: z.string().optional(),
  parentTaskId: z.string().nullable().optional(),
  // Same enumerated 0..4 scale as on create; integers only.
  priority: z.number().int().min(0).max(4).optional(),
  schedulePattern: z.string().nullable().optional(),
  scheduleTimezone: z.string().nullable().optional(),
});
/**
 * Query accepted by `list`: paging (`limit` 1..100, default 50; `offset` ≥ 0,
 * default 0) plus optional filters. `parentIdentifier` is resolved to a parent
 * task id by the handler before the model is queried.
 */
const listSchema = z.object({
  assigneeAgentId: z.string().optional(),
  limit: z.number().min(1).max(100).default(50),
  offset: z.number().min(0).default(0),
  parentIdentifier: z.string().optional(),
  parentTaskId: z.string().nullable().optional(),
  // Priority filter values use the same integer 0..4 scale as task priority.
  priorities: z.array(z.number().int().min(0).max(4)).max(5).optional(),
  statuses: z.array(z.enum(TASK_STATUSES)).max(10).optional(),
});
/**
 * Query accepted by `groupList`: between 1 and 10 groups, each with its own
 * key, paging window and a non-empty status set (at most 10 entries).
 */
const groupListSchema = z.object({
  assigneeAgentId: z.string().optional(),
  groups: z
    .array(
      z.object({
        key: z.string(),
        limit: z.number().min(1).max(100).default(50),
        offset: z.number().min(0).default(0),
        // Validated against the known status list, matching `listSchema`,
        // so an unknown status is rejected instead of silently matching nothing.
        statuses: z.array(z.enum(TASK_STATUSES)).min(1).max(10),
      }),
    )
    .min(1)
    .max(10),
  parentTaskId: z.string().nullable().optional(),
});
2026-03-26 17:43:51 +08:00
/**
 * Resolves a task from a raw id or identifier through `model.resolve()`.
 *
 * @returns the resolved task.
 * @throws TRPCError with code NOT_FOUND when `model.resolve()` yields nothing.
 */
async function resolveOrThrow(model: TaskModel, id: string) {
  const resolved = await model.resolve(id);
  if (resolved) return resolved;

  throw new TRPCError({ code: 'NOT_FOUND', message: 'Task not found' });
}
2026-04-29 01:21:40 +08:00
/**
 * Guards an agent reference supplied by the client.
 *
 * A missing, null or empty id is accepted without a lookup. Otherwise the id
 * must be reported as existing by `model.existsById()`.
 * NOTE(review): presumably that lookup is scoped to the user/workspace the
 * model was constructed with — confirm in AgentModel.
 *
 * @throws TRPCError with code BAD_REQUEST when the agent does not exist.
 */
async function assertAssigneeAgentBelongsToUser(
  model: AgentModel,
  assigneeAgentId?: string | null,
) {
  if (!assigneeAgentId) return;
  if (await model.existsById(assigneeAgentId)) return;

  throw new TRPCError({
    code: 'BAD_REQUEST',
    message: 'Assignee agent not found',
  });
}
/**
 * Resolves the requested parent of `taskId` and rejects re-parenting that
 * would create a cycle.
 *
 * @param taskId       raw id of the task being re-parented.
 * @param parentTaskId id/identifier of the new parent, or `null` to detach.
 * @returns the parent's raw id, or `null` when `parentTaskId` is `null`.
 * @throws TRPCError NOT_FOUND when the parent cannot be resolved, BAD_REQUEST
 *         when the parent is the task itself or one of its descendants.
 */
async function resolveSafeParentTaskId(
  model: TaskModel,
  taskId: string,
  parentTaskId: string | null,
): Promise<string | null> {
  if (parentTaskId === null) return null;

  const { id: parentId } = await resolveOrThrow(model, parentTaskId);
  if (parentId === taskId) {
    throw new TRPCError({
      code: 'BAD_REQUEST',
      message: 'Task cannot be parented to itself',
    });
  }

  // The new parent must not live anywhere below the task being moved.
  const subtree = await model.findAllDescendants(taskId);
  const parentIsDescendant = subtree.some((node) => node.id === parentId);
  if (parentIsDescendant) {
    throw new TRPCError({
      code: 'BAD_REQUEST',
      message: 'Task cannot be parented to its own descendant',
    });
  }

  return parentId;
}
2026-03-26 17:43:51 +08:00
export const taskRouter = router({
2026-06-09 15:54:26 +08:00
// Reorders the direct subtasks of a task. `order` lists subtask identifiers in
// the desired order; each listed subtask gets its list position as sortOrder.
// Identifiers are upper-cased before lookup. Unknown or repeated identifiers
// are rejected with BAD_REQUEST before anything is written.
reorderSubtasks: taskProcedureWrite
  .input(
    z.object({
      id: z.string(),
      // Ordered list of subtask identifiers (e.g. ['TASK-2', 'TASK-4', 'TASK-3'])
      order: z.array(z.string()),
    }),
  )
  .mutation(async ({ input, ctx }) => {
    try {
      const model = ctx.taskModel;
      const task = await resolveOrThrow(model, input.id);
      const subtasks = await model.findSubtasks(task.id);

      // Build identifier → id map
      const idMap = new Map<string, string>();
      for (const s of subtasks) idMap.set(s.identifier, s.id);

      // Validate all identifiers exist and appear at most once; a repeated
      // identifier would otherwise yield two conflicting sortOrder values
      // for the same subtask.
      const seenIds = new Set<string>();
      const reorderItems: Array<{ id: string; sortOrder: number }> = [];
      for (const [index, rawIdentifier] of input.order.entries()) {
        const identifier = rawIdentifier.toUpperCase();
        const taskId = idMap.get(identifier);
        if (!taskId) {
          throw new TRPCError({
            code: 'BAD_REQUEST',
            message: `Subtask not found: ${identifier}`,
          });
        }
        if (seenIds.has(taskId)) {
          throw new TRPCError({
            code: 'BAD_REQUEST',
            message: `Duplicate subtask in order: ${identifier}`,
          });
        }
        seenIds.add(taskId);
        reorderItems.push({ id: taskId, sortOrder: index });
      }

      await model.reorder(reorderItems);

      return {
        // Echo identifiers exactly as the caller sent them.
        data: reorderItems.map((item, i) => ({
          identifier: input.order[i],
          sortOrder: item.sortOrder,
        })),
        message: 'Subtasks reordered',
        success: true,
      };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:reorderSubtasks]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to reorder subtasks',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Adds a comment to a task. When `authorAgentId` is given it is validated with
// the agent-existence guard and the comment is attributed to that agent;
// otherwise the calling user is recorded as the author.
addComment: taskProcedureWrite
  .input(
    z.object({
      authorAgentId: z.string().optional(),
      briefId: z.string().optional(),
      content: z.string().min(1),
      editorData: z.unknown().optional(),
      id: z.string(),
      topicId: z.string().optional(),
    }),
  )
  .mutation(async ({ input, ctx }) => {
    try {
      const model = ctx.taskModel;
      const task = await resolveOrThrow(model, input.id);
      await assertAssigneeAgentBelongsToUser(ctx.agentModel, input.authorAgentId);

      const comment = await model.addComment({
        authorAgentId: input.authorAgentId,
        // Exactly one author kind is set: the agent if provided, else the user.
        authorUserId: input.authorAgentId ? undefined : ctx.userId,
        briefId: input.briefId,
        content: input.content,
        editorData: input.editorData as never,
        taskId: task.id,
        topicId: input.topicId,
        userId: ctx.userId,
      });

      return { data: comment, message: 'Comment added', success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:addComment]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to add comment',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Deletes a comment by id. Responds NOT_FOUND when the model reports that
// nothing was deleted.
deleteComment: taskProcedureWrite
  .input(z.object({ commentId: z.string() }))
  .mutation(async ({ input, ctx }) => {
    try {
      const deleted = await ctx.taskModel.deleteComment(input.commentId);
      if (!deleted) {
        throw new TRPCError({ code: 'NOT_FOUND', message: 'Comment not found' });
      }
      return { message: 'Comment deleted', success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:deleteComment]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to delete comment',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Replaces a comment's content (and optionally its editor data). Responds
// NOT_FOUND when the model returns no updated comment.
updateComment: taskProcedureWrite
  .input(
    z.object({
      commentId: z.string(),
      content: z.string().min(1),
      editorData: z.unknown().optional(),
    }),
  )
  .mutation(async ({ input, ctx }) => {
    try {
      const comment = await ctx.taskModel.updateComment(input.commentId, input.content, {
        editorData: input.editorData,
      });
      if (!comment) {
        throw new TRPCError({ code: 'NOT_FOUND', message: 'Comment not found' });
      }
      return { data: comment, message: 'Comment updated', success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:updateComment]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to update comment',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Records that `taskId` depends on `dependsOnId` (type defaults to 'blocks').
// Both sides accept a raw id or identifier and must resolve to existing tasks.
// A task may not depend on itself.
addDependency: taskProcedureWrite
  .input(
    z.object({
      dependsOnId: z.string(),
      taskId: z.string(),
      type: z.enum(['blocks', 'relates']).default('blocks'),
    }),
  )
  .mutation(async ({ input, ctx }) => {
    try {
      const model = ctx.taskModel;
      const task = await resolveOrThrow(model, input.taskId);
      const dep = await resolveOrThrow(model, input.dependsOnId);

      // Compare resolved ids so 'TASK-1' vs. its raw id is caught as well.
      if (task.id === dep.id) {
        throw new TRPCError({
          code: 'BAD_REQUEST',
          message: 'Task cannot depend on itself',
        });
      }

      await model.addDependency(task.id, dep.id, input.type);
      return { message: 'Dependency added', success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:addDependency]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to add dependency',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Cancels a task topic by delegating to `taskService.cancelTopic()`.
cancelTopic: taskProcedureWrite
  .input(z.object({ topicId: z.string() }))
  .mutation(async ({ input, ctx }) => {
    try {
      await ctx.taskService.cancelTopic(input.topicId);
      return { message: 'Topic canceled', success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:cancelTopic]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to cancel topic',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Deletes a task topic by delegating to `taskService.deleteTopic()`.
deleteTopic: taskProcedureWrite
  .input(z.object({ topicId: z.string() }))
  .mutation(async ({ input, ctx }) => {
    try {
      await ctx.taskService.deleteTopic(input.topicId);
      return { message: 'Topic deleted', success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:deleteTopic]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to delete topic',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Creates a task via `taskService.createTask()`. On an unexpected failure the
// underlying error message is appended to the response message so the caller
// can see why creation failed.
create: taskProcedureWrite.input(createSchema).mutation(async ({ input, ctx }) => {
  try {
    const task = await ctx.taskService.createTask(input);
    return { data: task, message: 'Task created', success: true };
  } catch (error) {
    if (error instanceof TRPCError) throw error;
    console.error('[task:create]', error);
    const causeMessage = error instanceof Error ? error.message : String(error);
    throw new TRPCError({
      cause: error,
      code: 'INTERNAL_SERVER_ERROR',
      message: causeMessage ? `Failed to create task: ${causeMessage}` : 'Failed to create task',
    });
  }
}),
2026-06-09 15:54:26 +08:00
// Deletes every task visible to the request's task model and reports how many
// were removed.
clearAll: taskProcedureWrite.mutation(async ({ ctx }) => {
  try {
    const model = ctx.taskModel;
    const count = await model.deleteAll();
    return { count, message: `${count} tasks deleted`, success: true };
  } catch (error) {
    // Pass through errors that already carry a tRPC code, as the other
    // handlers in this router do.
    if (error instanceof TRPCError) throw error;
    console.error('[task:clearAll]', error);
    throw new TRPCError({
      cause: error,
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Failed to clear tasks',
    });
  }
}),
2026-06-09 15:54:26 +08:00
// Deletes one task (by raw id or identifier) and returns the task as it was
// resolved before deletion.
delete: taskProcedureWrite.input(idInput).mutation(async ({ input, ctx }) => {
  try {
    const model = ctx.taskModel;
    const task = await resolveOrThrow(model, input.id);
    await model.delete(task.id);
    return { data: task, message: 'Task deleted', success: true };
  } catch (error) {
    if (error instanceof TRPCError) throw error;
    console.error('[task:delete]', error);
    throw new TRPCError({
      cause: error,
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Failed to delete task',
    });
  }
}),
// Returns the detail view built by `taskService.getTaskDetail()`, or NOT_FOUND
// when the service yields nothing for the given id.
detail: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const detail = await ctx.taskService.getTaskDetail(input.id);
    if (!detail) {
      throw new TRPCError({ code: 'NOT_FOUND', message: 'Task not found' });
    }
    return { data: detail, success: true };
  } catch (error) {
    if (error instanceof TRPCError) throw error;
    console.error('[task:detail]', error);
    throw new TRPCError({
      cause: error,
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Failed to get task detail',
    });
  }
}),
// Looks up a single task by raw id or identifier; NOT_FOUND when it does not
// resolve.
find: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const found = await resolveOrThrow(ctx.taskModel, input.id);
    return { data: found, success: true };
  } catch (err) {
    if (!(err instanceof TRPCError)) {
      console.error('[task:find]', err);
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to find task',
        cause: err,
      });
    }
    throw err;
  }
}),
// Returns the dependencies recorded for a task (by raw id or identifier).
getDependencies: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const { taskModel } = ctx;
    const { id: taskId } = await resolveOrThrow(taskModel, input.id);
    const dependencies = await taskModel.getDependencies(taskId);
    return { data: dependencies, success: true };
  } catch (err) {
    if (!(err instanceof TRPCError)) {
      console.error('[task:getDependencies]', err);
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to get dependencies',
        cause: err,
      });
    }
    throw err;
  }
}),
// Returns the documents pinned to a task (by raw id or identifier).
getPinnedDocuments: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const { taskModel } = ctx;
    const { id: taskId } = await resolveOrThrow(taskModel, input.id);
    const pinned = await taskModel.getPinnedDocuments(taskId);
    return { data: pinned, success: true };
  } catch (err) {
    if (!(err instanceof TRPCError)) {
      console.error('[task:getPinnedDocuments]', err);
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to get documents',
        cause: err,
      });
    }
    throw err;
  }
}),
// Returns the topics attached to a task, as produced by
// `taskTopicModel.findWithDetails()`.
getTopics: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const { id: taskId } = await resolveOrThrow(ctx.taskModel, input.id);
    const topics = await ctx.taskTopicModel.findWithDetails(taskId);
    return { data: topics, success: true };
  } catch (err) {
    if (!(err instanceof TRPCError)) {
      console.error('[task:getTopics]', err);
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to get task topics',
        cause: err,
      });
    }
    throw err;
  }
}),
// Returns the subtasks of a task as produced by `model.findSubtasks()`.
getSubtasks: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const { taskModel } = ctx;
    const { id: parentId } = await resolveOrThrow(taskModel, input.id);
    const children = await taskModel.findSubtasks(parentId);
    return { data: children, success: true };
  } catch (err) {
    if (!(err instanceof TRPCError)) {
      console.error('[task:getSubtasks]', err);
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to get subtasks',
        cause: err,
      });
    }
    throw err;
  }
}),
// Returns the task tree rooted at the given task, as produced by
// `model.getTaskTree()`.
getTaskTree: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const { taskModel } = ctx;
    const { id: rootId } = await resolveOrThrow(taskModel, input.id);
    const tree = await taskModel.getTaskTree(rootId);
    return { data: tree, success: true };
  } catch (err) {
    if (!(err instanceof TRPCError)) {
      console.error('[task:getTaskTree]', err);
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to get task tree',
        cause: err,
      });
    }
    throw err;
  }
}),
2026-06-09 15:54:26 +08:00
// Records a heartbeat for a task through `model.updateHeartbeat()`.
heartbeat: taskProcedureWrite.input(idInput).mutation(async ({ input, ctx }) => {
  try {
    const model = ctx.taskModel;
    const task = await resolveOrThrow(model, input.id);
    await model.updateHeartbeat(task.id);
    return { message: 'Heartbeat updated', success: true };
  } catch (error) {
    if (error instanceof TRPCError) throw error;
    console.error('[task:heartbeat]', error);
    throw new TRPCError({
      cause: error,
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Failed to update heartbeat',
    });
  }
}),
2026-06-09 15:54:26 +08:00
// Marks stuck tasks as failed and files an urgent error brief for each one.
// Stuck tasks come from the static `TaskModel.findStuckTasks()`, which is
// given only the DB handle; each task is then processed with models built
// from that task's own creator and workspace.
// NOTE(review): the sweep therefore does not look limited to the calling
// user, and the response lists the affected identifiers — confirm this
// endpoint is only reachable by trusted callers.
watchdog: taskProcedureWrite.mutation(async ({ ctx }) => {
  try {
    const stuckTasks = await TaskModel.findStuckTasks(ctx.serverDB);
    const failed: string[] = [];

    // Processed sequentially; the first failure aborts the remaining sweep.
    for (const task of stuckTasks) {
      const wsId = task.workspaceId ?? undefined;
      const model = new TaskModel(ctx.serverDB, task.createdByUserId, wsId);
      await model.updateStatus(task.id, 'failed', {
        completedAt: new Date(),
        error: 'Heartbeat timeout',
      });

      // Create error brief
      const briefModel = new BriefModel(ctx.serverDB, task.createdByUserId, wsId);
      await briefModel.create({
        agentId: task.assigneeAgentId || undefined,
        priority: 'urgent',
        summary: `Task has been running without heartbeat update for more than ${task.heartbeatTimeout} seconds.`,
        taskId: task.id,
        title: `${task.identifier} heartbeat timeout`,
        trigger: 'task',
        type: 'error',
      });

      failed.push(task.identifier);
    }

    return {
      checked: stuckTasks.length,
      failed,
      message:
        failed.length > 0
          ? `${failed.length} stuck tasks marked as failed`
          : 'No stuck tasks found',
      success: true,
    };
  } catch (error) {
    // Pass through errors that already carry a tRPC code, as the other
    // handlers in this router do.
    if (error instanceof TRPCError) throw error;
    console.error('[task:watchdog]', error);
    throw new TRPCError({
      cause: error,
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Watchdog check failed',
    });
  }
}),
// Returns tasks bucketed into the requested groups via `model.groupList()`.
groupList: taskProcedure.input(groupListSchema).query(async ({ input, ctx }) => {
  try {
    const model = ctx.taskModel;
    const groups = await model.groupList(input);
    return { data: groups, success: true };
  } catch (error) {
    // Pass through errors that already carry a tRPC code, as the other
    // handlers in this router do.
    if (error instanceof TRPCError) throw error;
    console.error('[task:groupList]', error);
    throw new TRPCError({
      cause: error,
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Failed to fetch grouped tasks',
    });
  }
}),
2026-03-26 17:43:51 +08:00
// Lists tasks with paging and filters. `parentIdentifier`, when present, is
// resolved to a task id and overrides `parentTaskId`. Each returned task gets
// a `participants` array containing its assignee agent's avatar info, when
// the task has an assignee and that agent was found.
list: taskProcedure.input(listSchema).query(async ({ input, ctx }) => {
  try {
    const model = ctx.taskModel;
    const { parentIdentifier, ...query } = input;

    let parentTaskId = query.parentTaskId;
    if (parentIdentifier) {
      const parent = await model.resolve(parentIdentifier);
      if (!parent) {
        throw new TRPCError({
          code: 'NOT_FOUND',
          message: `Parent task not found: ${parentIdentifier}`,
        });
      }
      parentTaskId = parent.id;
    }

    const result = await model.list({
      ...query,
      parentTaskId,
    });

    // Fetch avatar data once per distinct assignee rather than once per task.
    const assigneeIds = [
      ...new Set(result.tasks.map((t) => t.assigneeAgentId).filter((id): id is string => !!id)),
    ];
    const agents =
      assigneeIds.length > 0 ? await ctx.agentModel.getAgentAvatarsByIds(assigneeIds) : [];
    const agentMap = new Map(agents.map((a) => [a.id, a]));

    const data: TaskListItem[] = result.tasks.map((task) => {
      const participants: TaskParticipant[] = [];
      if (task.assigneeAgentId) {
        const agent = agentMap.get(task.assigneeAgentId);
        if (agent) {
          participants.push({
            avatar: agent.avatar,
            backgroundColor: agent.backgroundColor,
            id: agent.id,
            title: agent.title ?? '',
            type: 'agent',
          });
        }
      }
      return { ...task, participants };
    });

    return { data, success: true, total: result.total };
  } catch (error) {
    if (error instanceof TRPCError) throw error;
    console.error('[task:list]', error);
    throw new TRPCError({
      cause: error,
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Failed to list tasks',
    });
  }
}),
2026-06-09 15:54:26 +08:00
// Starts (or continues, when `continueTopicId` is given) a run of the task
// through a per-request `TaskRunnerService`. The runner's result is returned
// as-is; `prompt` is forwarded as the runner's `extraPrompt`.
run: taskProcedureWrite
  .input(
    idInput.merge(
      z.object({
        continueTopicId: z.string().optional(),
        prompt: z.string().optional(),
      }),
    ),
  )
  .mutation(async ({ input, ctx }) => {
    try {
      const runner = new TaskRunnerService(
        ctx.serverDB,
        ctx.userId,
        ctx.workspaceId ?? undefined,
      );
      return await runner.runTask({
        continueTopicId: input.continueTopicId,
        extraPrompt: input.prompt,
        taskId: input.id,
      });
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:run]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to run task',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Pins a document to a task. `pinnedBy` defaults to 'user' when omitted.
pinDocument: taskProcedureWrite
  .input(
    z.object({
      documentId: z.string(),
      pinnedBy: z.string().default('user'),
      taskId: z.string(),
    }),
  )
  .mutation(async ({ input, ctx }) => {
    try {
      const model = ctx.taskModel;
      const task = await resolveOrThrow(model, input.taskId);
      await model.pinDocument(task.id, input.documentId, input.pinnedBy);
      return { message: 'Document pinned', success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:pinDocument]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to pin document',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Removes the dependency of `taskId` on `dependsOnId`. Both sides accept a raw
// id or identifier and must resolve to existing tasks.
removeDependency: taskProcedureWrite
  .input(z.object({ dependsOnId: z.string(), taskId: z.string() }))
  .mutation(async ({ input, ctx }) => {
    try {
      const model = ctx.taskModel;
      const task = await resolveOrThrow(model, input.taskId);
      const dep = await resolveOrThrow(model, input.dependsOnId);
      await model.removeDependency(task.id, dep.id);
      return { message: 'Dependency removed', success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:removeDependency]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to remove dependency',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Unpins a document from a task.
unpinDocument: taskProcedureWrite
  .input(z.object({ documentId: z.string(), taskId: z.string() }))
  .mutation(async ({ input, ctx }) => {
    try {
      const model = ctx.taskModel;
      const task = await resolveOrThrow(model, input.taskId);
      await model.unpinDocument(task.id, input.documentId);
      return { message: 'Document unpinned', success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:unpinDocument]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to unpin document',
      });
    }
  }),
// Returns the checkpoint configuration derived from the resolved task by
// `model.getCheckpointConfig()`.
getCheckpoint: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const { taskModel } = ctx;
    const resolved = await resolveOrThrow(taskModel, input.id);
    return { data: taskModel.getCheckpointConfig(resolved), success: true };
  } catch (err) {
    if (!(err instanceof TRPCError)) {
      console.error('[task:getCheckpoint]', err);
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to get checkpoint',
        cause: err,
      });
    }
    throw err;
  }
}),
2026-06-09 15:54:26 +08:00
// Stores a new checkpoint configuration on a task and returns the
// configuration read back from the updated task. Responds NOT_FOUND when the
// task cannot be resolved or the update returns nothing.
updateCheckpoint: taskProcedureWrite
  .input(
    idInput.merge(
      z.object({
        checkpoint: z.object({
          onAgentRequest: z.boolean().optional(),
          tasks: z
            .object({
              afterIds: z.array(z.string()).optional(),
              beforeIds: z.array(z.string()).optional(),
            })
            .optional(),
          topic: z
            .object({
              after: z.boolean().optional(),
              before: z.boolean().optional(),
            })
            .optional(),
        }),
      }),
    ),
  )
  .mutation(async ({ input, ctx }) => {
    const { id, checkpoint } = input;
    try {
      const model = ctx.taskModel;
      const resolved = await resolveOrThrow(model, id);
      const task = await model.updateCheckpointConfig(resolved.id, checkpoint);
      if (!task) throw new TRPCError({ code: 'NOT_FOUND', message: 'Task not found' });
      return {
        data: model.getCheckpointConfig(task),
        message: 'Checkpoint updated',
        success: true,
      };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:updateCheckpoint]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to update checkpoint',
      });
    }
  }),
// Returns the review configuration of a task, or `null` when
// `model.getReviewConfig()` yields a falsy value.
getReview: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const { taskModel } = ctx;
    const resolved = await resolveOrThrow(taskModel, input.id);
    const reviewConfig = taskModel.getReviewConfig(resolved) || null;
    return { data: reviewConfig, success: true };
  } catch (err) {
    if (!(err instanceof TRPCError)) {
      console.error('[task:getReview]', err);
      throw new TRPCError({
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to get review config',
        cause: err,
      });
    }
    throw err;
  }
}),
2026-06-09 15:54:26 +08:00
// Stores a new review configuration on a task and returns the configuration
// read back from the updated task. Schema defaults: `autoRetry` true, `judge`
// {}, `maxIterations` 3 (allowed 1..10), rubric `weight` 1.
updateReview: taskProcedureWrite
  .input(
    idInput.merge(
      z.object({
        review: z.object({
          autoRetry: z.boolean().default(true),
          enabled: z.boolean(),
          judge: z
            .object({
              model: z.string().optional(),
              provider: z.string().optional(),
            })
            .default({}),
          maxIterations: z.number().min(1).max(10).default(3),
          rubrics: z.array(
            z.object({
              config: z.record(z.unknown()),
              extractor: z.record(z.unknown()).optional(),
              id: z.string(),
              name: z.string(),
              threshold: z.number().min(0).max(1).optional(),
              type: z.string(),
              weight: z.number().default(1),
            }),
          ),
        }),
      }),
    ),
  )
  .mutation(async ({ input, ctx }) => {
    const { id, review } = input;
    try {
      const model = ctx.taskModel;
      const resolved = await resolveOrThrow(model, id);
      const task = await model.updateReviewConfig(resolved.id, review);
      if (!task) throw new TRPCError({ code: 'NOT_FOUND', message: 'Task not found' });
      return {
        data: model.getReviewConfig(task),
        message: 'Review config updated',
        success: true,
      };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:updateReview]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to update review config',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Runs a review for a task by handing the whole input (id plus optional
// `content` / `topicId`) to `taskService.runReview()`.
runReview: taskProcedureWrite
  .input(
    idInput.merge(
      z.object({
        content: z.string().optional(),
        topicId: z.string().optional(),
      }),
    ),
  )
  .mutation(async ({ input, ctx }) => {
    try {
      const result = await ctx.taskService.runReview(input);
      return { data: result, success: true };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:runReview]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to run review',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
// Applies a partial update to a task.
// - A supplied `assigneeAgentId` must pass the agent-existence guard.
// - In a workspace, the write is rejected with CONFLICT while another member
//   holds the edit lock on the task.
// - `parentTaskId` is only touched when the caller sent it; a non-null value
//   is resolved and checked for cycles first.
update: taskProcedureWrite.input(idInput.merge(updateSchema)).mutation(async ({ input, ctx }) => {
  const { id, parentTaskId, ...data } = input;
  try {
    const model = ctx.taskModel;
    await assertAssigneeAgentBelongsToUser(ctx.agentModel, data.assigneeAgentId);
    const resolved = await resolveOrThrow(model, id);

    // Collaborative edit lock: reject writes to a workspace task another member
    // is actively editing. Inert until a client acquires the lock.
    if (ctx.workspaceId) {
      const blockedBy = await ctx.editLockService.getBlockingHolder('task', resolved.id);
      if (blockedBy) {
        throw new TRPCError({
          cause: { data: { code: 'DocumentLocked' } },
          code: 'CONFLICT',
          message: 'Task is being edited by another user',
        });
      }
    }

    // `undefined` means "leave the parent alone"; `null` means "detach".
    const updateData =
      parentTaskId === undefined
        ? data
        : {
            ...data,
            parentTaskId: await resolveSafeParentTaskId(model, resolved.id, parentTaskId),
          };

    const task = await model.update(resolved.id, updateData);
    if (!task) throw new TRPCError({ code: 'NOT_FOUND', message: 'Task not found' });
    return { data: task, message: 'Task updated', success: true };
  } catch (error) {
    if (error instanceof TRPCError) throw error;
    console.error('[task:update]', error);
    throw new TRPCError({
      cause: error,
      code: 'INTERNAL_SERVER_ERROR',
      message: 'Failed to update task',
    });
  }
}),
// Tries to take the collaborative edit lock on a task and returns the result
// of `editLockService.acquire()`. Outside a workspace it is a no-op that
// reports "not locked". A `lock.changed` resource event is published only
// when the holder after the call differs from the holder before it.
acquireTaskLock: taskProcedureWrite.input(idInput).mutation(async ({ ctx, input }) => {
  if (!ctx.workspaceId) return { expiresAt: null, holderId: null, lockedByOther: false };

  const resolved = await resolveOrThrow(ctx.taskModel, input.id);
  const prev = await ctx.editLockService.getActiveHolder('task', resolved.id);
  const result = await ctx.editLockService.acquire('task', resolved.id);

  if ((result.holderId ?? null) !== (prev ?? null)) {
    // Fire-and-forget: the lock result does not wait on the broadcast. A
    // failed publish is logged here so it cannot surface as an unhandled
    // promise rejection.
    void Promise.resolve(
      publishResourceEvent(
        { id: resolved.id, type: 'task' },
        { actorId: ctx.userId, data: { holderId: result.holderId }, type: 'lock.changed' },
      ),
    ).catch((error: unknown) => {
      console.error('[task:acquireTaskLock] failed to publish lock.changed', error);
    });
  }

  return result;
}),
// Reports who currently holds the edit lock on a task. Outside a workspace it
// always reports "not locked". `expiresAt` is always null in this response.
// NOTE(review): this is a query but sits on the write-gated procedure, unlike
// the other read endpoints in this router — confirm viewers are meant to be
// denied here.
getTaskLock: taskProcedureWrite.input(idInput).query(async ({ ctx, input }) => {
  if (!ctx.workspaceId) return { expiresAt: null, holderId: null, lockedByOther: false };

  const { id: taskId } = await resolveOrThrow(ctx.taskModel, input.id);
  const holder = await ctx.editLockService.getActiveHolder('task', taskId);
  const lockedByOther = Boolean(holder) && holder !== ctx.userId;

  return { expiresAt: null, holderId: holder ?? null, lockedByOther };
}),
// Releases the caller's edit lock on a task. A no-op outside a workspace.
releaseTaskLock: taskProcedureWrite.input(idInput).mutation(async ({ ctx, input }) => {
  if (!ctx.workspaceId) return;

  const resolved = await resolveOrThrow(ctx.taskModel, input.id);

  // Only broadcast "unlocked" when we actually released our own lock — if the
  // lease expired and another member took over, the lock is still held.
  const released = await ctx.editLockService.release('task', resolved.id);
  if (!released) return;

  // Fire-and-forget: a failed publish is logged here so it cannot surface as
  // an unhandled promise rejection.
  void Promise.resolve(
    publishResourceEvent(
      { id: resolved.id, type: 'task' },
      { actorId: ctx.userId, data: { holderId: null }, type: 'lock.changed' },
    ),
  ).catch((error: unknown) => {
    console.error('[task:releaseTaskLock] failed to publish lock.changed', error);
  });
}),
2026-06-09 15:54:26 +08:00
// Updates a task's config through `model.updateTaskConfig()` and returns the
// updated task. Responds NOT_FOUND when the task cannot be resolved or the
// update returns nothing.
updateConfig: taskProcedureWrite
  .input(idInput.merge(z.object({ config: z.record(z.unknown()) })))
  .mutation(async ({ input, ctx }) => {
    try {
      const { taskModel } = ctx;
      const { id: taskId } = await resolveOrThrow(taskModel, input.id);
      const updated = await taskModel.updateTaskConfig(taskId, input.config);
      if (updated) return { data: updated, message: 'Config updated', success: true };

      throw new TRPCError({ code: 'NOT_FOUND', message: 'Task not found' });
    } catch (err) {
      if (!(err instanceof TRPCError)) {
        console.error('[task:updateConfig]', err);
        throw new TRPCError({
          code: 'INTERNAL_SERVER_ERROR',
          message: 'Failed to update task config',
          cause: err,
        });
      }
      throw err;
    }
  }),
/**
 * Returns the result of `taskService.previewSubtaskLayers` for the given task id.
 *
 * tRPC errors pass through unchanged; anything else is logged and rethrown as
 * INTERNAL_SERVER_ERROR with the original error as cause.
 */
previewSubtaskLayers: taskProcedure.input(idInput).query(async ({ input, ctx }) => {
  try {
    const layers = await ctx.taskService.previewSubtaskLayers(input.id);
    return { data: layers, success: true };
  } catch (error) {
    if (!(error instanceof TRPCError)) {
      console.error('[task:previewSubtaskLayers]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to plan subtask layers',
      });
    }
    throw error;
  }
}),
2026-06-09 15:54:26 +08:00
/**
 * Calls `taskService.runReadySubtasks` for the given task id.
 *
 * `success` is true only when the service result's `failed` list is empty;
 * the full result is always returned in `data`. Non-tRPC failures are logged
 * and rethrown as INTERNAL_SERVER_ERROR.
 */
runReadySubtasks: taskProcedureWrite.input(idInput).mutation(async ({ input, ctx }) => {
  try {
    const outcome = await ctx.taskService.runReadySubtasks(input.id);
    const nothingFailed = outcome.failed.length === 0;
    return { data: outcome, success: nothingFailed };
  } catch (error) {
    if (!(error instanceof TRPCError)) {
      console.error('[task:runReadySubtasks]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to run subtasks',
      });
    }
    throw error;
  }
}),
2026-06-09 15:54:26 +08:00
/**
 * Changes a task's status through `taskService.updateStatus`.
 *
 * Input: the task `id`, the new `status` (one of `TASK_STATUSES`), and an
 * optional `error` string, all forwarded to the service unchanged.
 *
 * The response always carries the updated task, a message and `success: true`.
 * The remaining fields are added only when they have something to report:
 * - `unlocked` / `paused`: included when the respective list is non-empty.
 * - `checkpointTriggered`: included (as `true`) when the service flags it.
 * - `allSubtasksDone` + `parentTaskId`: included when the service flags it.
 *
 * tRPC errors pass through unchanged; anything else is logged and rethrown as
 * INTERNAL_SERVER_ERROR with the original error as cause.
 */
updateStatus: taskProcedureWrite
  .input(
    z.object({
      error: z.string().optional(),
      id: z.string(),
      status: z.enum(TASK_STATUSES),
    }),
  )
  .mutation(async ({ input, ctx }) => {
    try {
      const result = await ctx.taskService.updateStatus(input);
      const { task, unlocked, paused, checkpointTriggered, allSubtasksDone, parentTaskId } =
        result;

      return {
        data: task,
        message: `Task ${input.status}`,
        success: true,
        // Spreading `false` adds no keys, so each field appears only when set.
        ...(unlocked.length > 0 && { unlocked }),
        ...(paused.length > 0 && { paused }),
        ...(checkpointTriggered && { checkpointTriggered: true }),
        ...(allSubtasksDone && { allSubtasksDone: true, parentTaskId }),
      };
    } catch (error) {
      if (error instanceof TRPCError) throw error;
      console.error('[task:updateStatus]', error);
      throw new TRPCError({
        cause: error,
        code: 'INTERNAL_SERVER_ERROR',
        message: 'Failed to update status',
      });
    }
  }),
2026-06-09 15:54:26 +08:00
/**
 * Moves a task to another workspace via `taskModel.transferTo`.
 *
 * Input: `taskId` and `targetWorkspaceId`. A `null` target skips the
 * target-membership check (presumably it denotes the caller's personal
 * space; confirm against `TaskModel.transferTo`).
 *
 * Checks, in order, each failing with a `TransferErrorCode` in `cause.data`:
 * 1. The task must resolve, else NOT_FOUND / ResourceNotFound.
 * 2. Inside a workspace, a caller who did not create the task must be an
 *    active `owner` member of that workspace, else FORBIDDEN / OwnerOnly.
 * 3. The target must differ from the current workspace, else
 *    BAD_REQUEST / SameWorkspace.
 * 4. For any non-null target, the caller must be an active, non-`viewer`
 *    member of it, else FORBIDDEN / TargetNoWriteAccess.
 */
transferTask: taskProcedureWrite
  .input(
    z.object({
      targetWorkspaceId: z.string().nullable(),
      taskId: z.string(),
    }),
  )
  .mutation(async ({ ctx, input }) => {
    const task = await ctx.taskModel.resolve(input.taskId);
    if (!task)
      throw new TRPCError({
        cause: { data: { code: TransferErrorCode.ResourceNotFound } },
        code: 'NOT_FOUND',
        message: 'Task not found',
      });

    // Moving someone else's task out of a workspace is reserved for owners.
    if (ctx.workspaceId && task.createdByUserId !== ctx.userId) {
      const [membership] = await ctx.serverDB
        .select({ role: workspaceMembers.role })
        .from(workspaceMembers)
        .where(
          and(
            eq(workspaceMembers.workspaceId, ctx.workspaceId),
            eq(workspaceMembers.userId, ctx.userId),
            isNull(workspaceMembers.deletedAt),
          ),
        )
        .limit(1);
      if (!membership || membership.role !== 'owner') {
        throw new TRPCError({
          cause: { data: { code: TransferErrorCode.OwnerOnly } },
          code: 'FORBIDDEN',
          message: 'Only workspace owners can transfer tasks created by others',
        });
      }
    }

    if (input.targetWorkspaceId === (ctx.workspaceId ?? null)) {
      throw new TRPCError({
        cause: { data: { code: TransferErrorCode.SameWorkspace } },
        code: 'BAD_REQUEST',
        message: 'Cannot transfer task to the same workspace',
      });
    }

    // Compare against null explicitly: a truthiness test would let an empty
    // string (accepted by the schema) skip the write-access check entirely.
    if (input.targetWorkspaceId !== null) {
      const [targetMembership] = await ctx.serverDB
        .select({ role: workspaceMembers.role })
        .from(workspaceMembers)
        .where(
          and(
            eq(workspaceMembers.workspaceId, input.targetWorkspaceId),
            eq(workspaceMembers.userId, ctx.userId),
            isNull(workspaceMembers.deletedAt),
          ),
        )
        .limit(1);
      if (!targetMembership || targetMembership.role === 'viewer') {
        throw new TRPCError({
          cause: { data: { code: TransferErrorCode.TargetNoWriteAccess } },
          code: 'FORBIDDEN',
          message: 'No write access to target workspace',
        });
      }
    }

    return ctx.taskModel.transferTo(task.id, input.targetWorkspaceId, ctx.userId);
  }),
/**
 * Copies a task into another workspace via `taskModel.copyToWorkspace`.
 *
 * Input: `taskId` and `targetWorkspaceId`. A `null` target skips the
 * target-membership check (presumably it denotes the caller's personal
 * space; confirm against `TaskModel.copyToWorkspace`).
 *
 * Fails with NOT_FOUND / ResourceNotFound when the task does not resolve, and
 * with FORBIDDEN / TargetNoWriteAccess when the caller is not an active,
 * non-`viewer` member of a non-null target workspace.
 */
copyTaskToWorkspace: taskProcedureWrite
  .input(
    z.object({
      targetWorkspaceId: z.string().nullable(),
      taskId: z.string(),
    }),
  )
  .mutation(async ({ ctx, input }) => {
    const task = await ctx.taskModel.resolve(input.taskId);
    if (!task)
      throw new TRPCError({
        cause: { data: { code: TransferErrorCode.ResourceNotFound } },
        code: 'NOT_FOUND',
        message: 'Task not found',
      });

    // Compare against null explicitly: a truthiness test would let an empty
    // string (accepted by the schema) skip the write-access check entirely.
    if (input.targetWorkspaceId !== null) {
      const [targetMembership] = await ctx.serverDB
        .select({ role: workspaceMembers.role })
        .from(workspaceMembers)
        .where(
          and(
            eq(workspaceMembers.workspaceId, input.targetWorkspaceId),
            eq(workspaceMembers.userId, ctx.userId),
            isNull(workspaceMembers.deletedAt),
          ),
        )
        .limit(1);
      if (!targetMembership || targetMembership.role === 'viewer') {
        throw new TRPCError({
          cause: { data: { code: TransferErrorCode.TargetNoWriteAccess } },
          code: 'FORBIDDEN',
          message: 'No write access to target workspace',
        });
      }
    }

    return ctx.taskModel.copyToWorkspace(task.id, input.targetWorkspaceId, ctx.userId);
  }),
2026-03-26 17:43:51 +08:00
});