lobe-chat/src/server/routers/lambda/__tests__/integration/task.integration.test.ts
Arvin Xu 093fa7bcae feat: support agent tasks system (#13289)
* feat: agent task system — CLI, review rubrics, workspace, comments, brief tool split

support import md

Major changes:
- Split task CLI into modular files (task/, lifecycle, topic, doc, review, checkpoint, dep)
- Split builtin-tool-task into task + brief tools (conditional injection)
- Task review uses EvalBenchmarkRubric from @lobechat/eval-rubric
- Task workspace: documents auto-pin via Notebook, tree view with folders
- Task comments system (task_comments table)
- Task topics: dedicated TaskTopicModel with userId, handoff fields, review results
- Heartbeat timeout auto-detection in detail API
- Run idempotency (reject duplicate runs) + error rollback
- Topic cancel/delete by topicId only (no taskId needed)
- Integration tests for task router (13 tests)
- interruptOperation fix (string param, not object)
- Global TRPC error handler in CLI

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

task document workflow

task handoff loop

🗃️ chore: consolidate task system migrations into single 0095

Merged 7 separate migrations (0095-0101) into one:
- tasks, briefs, task_comments, task_dependencies, task_documents, task_topics tables
- All fields including sort_order, resolved_action/comment, review fields
- Idempotent CREATE TABLE IF NOT EXISTS, DROP/ADD CONSTRAINT, CREATE INDEX IF NOT EXISTS

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

fix interruptOperation

topic auto review workflow

topic handoff workflow

finish run topic and brief workflow

support task tool

improve task schema

update

feat: add onComplete hook to task.run for completion callbacks

When agent execution completes, the hook:
- Updates task heartbeat
- Creates a result Brief (on success) with assistant content summary
- Creates an error Brief (on failure) with error message
- Supports both local (handler) and production (webhook) modes

Uses the new Agent Runtime Hooks system instead of raw stepCallbacks.

LOBE-6160 LOBE-6208

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

feat: add Review system — LLM-as-Judge automated review

Task review uses an independent LLM call to evaluate topic output
quality against configurable criteria with pass/fail thresholds.

- TaskReviewService: structured LLM review via generateObject,
  auto-resolves model/provider from user's system agent defaults
- Model: getReviewConfig, updateReviewConfig on TaskModel
- Router: getReview, updateReview, runReview procedures
- CLI: `task review set/view/run` commands
- Auto-creates Brief with review results
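
As an illustration of pass/fail thresholds, here is a minimal sketch of how per-rubric scores might be aggregated. It is not the TaskReviewService implementation: `RubricOutcome`, `summarizeReview`, and `containsScore` are hypothetical names, and the default threshold of 1 is an assumption.

```typescript
// Hypothetical rubric outcome: a score in [0, 1] plus weight/threshold from the review config.
interface RubricOutcome {
  id: string;
  score: number;
  threshold?: number; // assumed to default to 1 (the rubric must fully pass)
  weight: number;
}

// Weighted mean score; the review passes only if every rubric meets its threshold.
function summarizeReview(outcomes: RubricOutcome[]): {
  failedIds: string[];
  passed: boolean;
  score: number;
} {
  const totalWeight = outcomes.reduce((sum, o) => sum + o.weight, 0);
  const weighted = outcomes.reduce((sum, o) => sum + o.score * o.weight, 0);
  const failedIds = outcomes.filter((o) => o.score < (o.threshold ?? 1)).map((o) => o.id);
  return {
    failedIds,
    passed: failedIds.length === 0,
    score: totalWeight === 0 ? 0 : weighted / totalWeight,
  };
}

// A 'contains' rubric could score 1 when the output includes the configured value, else 0.
const containsScore = (output: string, value: string): number =>
  output.includes(value) ? 1 : 0;
```

In this sketch an LLM-judged rubric would supply its own score, while deterministic rubrics such as `contains` are computed locally.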

LOBE-6165

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

feat: add TaskScheduler, multi-topic execution, and handoff context

- TaskScheduler: interface + Local implementation (setTimeout-based),
  following QueueService dual-mode pattern
- Multi-topic execution: `task run --topics N --delay S` runs N topics
  in sequence with optional delay between them
- Handoff context: buildTaskPrompt() queries previous topics by
  metadata.taskId and injects handoff summaries into the next topic's
  prompt (sliding window: latest full, older summaries only)
- Heartbeat auto-update between topics
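
The sliding window described above can be sketched as follows. This is an illustration only, not the actual `buildTaskPrompt()` code: the `TopicHandoff` shape and the `buildHandoffContext` helper are hypothetical names, and the output format is an assumption.

```typescript
// Hypothetical shape of a previous topic's handoff data (not the real schema).
interface TopicHandoff {
  detail: string; // full handoff notes
  seq: number; // order in which the topic ran
  summary: string; // short summary
}

// Latest topic contributes summary + full detail; older topics contribute summaries only.
function buildHandoffContext(handoffs: TopicHandoff[]): string {
  if (handoffs.length === 0) return '';
  const sorted = [...handoffs].sort((a, b) => a.seq - b.seq);
  const latest = sorted[sorted.length - 1];
  const older = sorted.slice(0, -1);
  const lines = older.map((h) => `Topic ${h.seq} (summary): ${h.summary}`);
  lines.push(`Topic ${latest.seq} (full): ${latest.summary}\n${latest.detail}`);
  return lines.join('\n');
}
```

Keeping only summaries for older topics bounds prompt growth as the number of topics increases.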

LOBE-6161

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

feat: add Heartbeat watchdog + heartbeat CLI

Watchdog scans running tasks with expired heartbeats, marks them as
failed, and creates urgent error Briefs. Heartbeat CLI allows manual
heartbeat reporting for testing.

- Model: refactored to use Drizzle operators (isNull, isNotNull, ne)
  instead of raw SQL where possible; fixed findStuckTasks to skip
  tasks without heartbeat data
- Router: heartbeat (manual report), watchdog (scan + fail + brief)
- Router: updateSchema now includes heartbeatInterval, heartbeatTimeout
- CLI: `task heartbeat <id>`, `task watchdog`, `task edit` with
  --heartbeat-timeout, --heartbeat-interval, --description
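
A rough in-memory sketch of the watchdog's selection rule, assuming the timeout is expressed in seconds. The real model queries the database with Drizzle operators; `HeartbeatTask` and `selectExpiredTasks` are hypothetical names used only for illustration.

```typescript
// Hypothetical task shape; field names and the seconds unit are assumptions.
interface HeartbeatTask {
  heartbeatTimeout: number | null; // seconds (assumed)
  id: string;
  lastHeartbeatAt: Date | null;
  status: 'backlog' | 'running' | 'paused' | 'completed' | 'failed';
}

// Running tasks whose last heartbeat is older than their timeout.
function selectExpiredTasks(tasks: HeartbeatTask[], now: Date): HeartbeatTask[] {
  return tasks.filter((t) => {
    if (t.status !== 'running') return false;
    // Tasks without heartbeat data are skipped rather than treated as stuck.
    if (t.lastHeartbeatAt === null || t.heartbeatTimeout === null) return false;
    const elapsedMs = now.getTime() - t.lastHeartbeatAt.getTime();
    return elapsedMs > t.heartbeatTimeout * 1000;
  });
}
```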

LOBE-6161

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

♻️ refactor: move CheckpointConfig to @lobechat/types

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

feat: add task run — trigger agent execution for tasks

Task.run creates a topic, triggers AiAgentService.execAgent with task
context, and streams results via SSE. Supports both agentId and slug.

- Service: added taskId to ExecAgentParams, included in topic metadata
- Router: task.run procedure — resolves agent, builds prompt, calls execAgent,
  updates topic count and heartbeat
- CLI: `task run <id>` command with SSE streaming, --prompt, --verbose

LOBE-6160

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

feat: add Checkpoint system for task review gates

Checkpoint allows configuring pause points in task execution flow.
Supports beforeIds (pause before subtask starts) and afterIds (pause
after subtask completes) on parent tasks.

- Model: CheckpointConfig type, getCheckpointConfig, updateCheckpointConfig,
  shouldPauseBeforeStart, shouldPauseAfterComplete
- Router: getCheckpoint, updateCheckpoint procedures; integrated with
  updateStatus for automatic checkpoint triggering
- CLI: `task checkpoint view/set` commands with --before, --after,
  --topic-before, --topic-after, --on-agent-request options
- Tests: 3 new checkpoint tests (37 total)
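
The before/after pause rules can be sketched as below. This simplified config covers only `beforeIds` and `afterIds` from the description above; `CheckpointSketch`, `pausesBefore`, and `pausesAfter` are hypothetical names, not the real `CheckpointConfig` API.

```typescript
// Simplified, hypothetical checkpoint config stored on a parent task.
interface CheckpointSketch {
  afterIds?: string[]; // pause after these subtasks complete
  beforeIds?: string[]; // pause before these subtasks start
}

// True when the parent's config asks for a pause before the subtask starts.
const pausesBefore = (config: CheckpointSketch | undefined, subtaskId: string): boolean =>
  config?.beforeIds?.includes(subtaskId) ?? false;

// True when the parent's config asks for a pause after the subtask completes.
const pausesAfter = (config: CheckpointSketch | undefined, subtaskId: string): boolean =>
  config?.afterIds?.includes(subtaskId) ?? false;
```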

LOBE-6162

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

feat: add dependency unlocking on task completion

When a task completes, automatically check and unlock blocked tasks
whose dependencies are all satisfied (backlog → running). Also notify
when all subtasks of a parent are completed.

- Model: getUnlockedTasks, areAllSubtasksCompleted (Drizzle, no raw SQL)
- Router: updateStatus hook triggers unlocking on completion
- CLI: shows unlocked tasks and parent completion notification
- Tests: 3 new tests (34 total)
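
A minimal in-memory sketch of the unlocking rule, assuming each task carries a list of dependency ids. The real `getUnlockedTasks` runs against the database; `TaskNode` and `findUnlockableTaskIds` are hypothetical names for illustration.

```typescript
// Hypothetical in-memory task shape; the real data lives in tasks/task_dependencies.
interface TaskNode {
  dependsOn: string[];
  id: string;
  status: string;
}

// Backlog tasks that depend on the just-completed task and whose dependencies are all completed.
function findUnlockableTaskIds(tasks: TaskNode[], completedId: string): string[] {
  const statusById = new Map(tasks.map((t) => [t.id, t.status] as const));
  return tasks
    .filter(
      (t) =>
        t.status === 'backlog' &&
        t.dependsOn.includes(completedId) &&
        t.dependsOn.every((dep) => statusById.get(dep) === 'completed'),
    )
    .map((t) => t.id);
}
```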

LOBE-6164

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

feat: add Brief system — schema, model, router, CLI

Brief is a universal Agent-to-User reporting mechanism, not limited to
Tasks. CronJobs, Agents, and future systems can all produce Briefs.

- Schema: briefs table with polymorphic source (taskId, cronJobId, agentId)
- Model: BriefModel with CRUD, listUnresolved (Daily Brief), markRead, resolve
- Router: TRPC brief router with taskId identifier resolution
- CLI: `lh brief` command (list/view/read/resolve)
- Tests: 11 model tests
- Migration: 0096_add_briefs_table.sql

LOBE-6163

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

feat: add Task system — schema, model, router, CLI

Implement the foundational Task system for managing long-running,
multi-topic agent tasks with subtask trees and dependency chains.

- Schema: tasks, task_dependencies, task_documents tables
- Model: TaskModel with CRUD, tree queries, heartbeat, dependencies, document pinning
- Router: TRPC task router with identifier/id resolution
- CLI: `lh task` command (list/view/create/edit/delete/start/pause/resume/complete/cancel/tree/dep)
- Tests: 31 model tests
- Migration: 0095_add_task_tables.sql

LOBE-6036 LOBE-6054

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* update

* 🐛 fix: update brief model import path and add raw-md vitest plugin

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: eslint import sort in vitest config

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: brief ID validation, auto-review retry, and continueTopicId operationId

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: task integration tests — create test agent for FK, fix children spread

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: task integration tests — correct identifier prefix and agent ID

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: remove unused toolsActivatorRuntime import

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: create real topic in task integration tests to satisfy FK constraint

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: type errors in task prompt tests, handoff schema, and activity mapping

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* 🐛 fix: create real agent/topic/brief records in database model tests for FK constraints

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 17:43:51 +08:00

386 lines
11 KiB
TypeScript

// @vitest-environment node
import { type LobeChatDatabase } from '@lobechat/database';
import { getTestDB } from '@lobechat/database/test-utils';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';

import { taskRouter } from '../../task';
import {
  cleanupTestUser,
  createTestAgent,
  createTestContext,
  createTestTopic,
  createTestUser,
} from './setup';

// Mock getServerDB
let testDB: LobeChatDatabase;
vi.mock('@/database/core/db-adaptor', () => ({
  getServerDB: vi.fn(() => testDB),
}));

// Mock AiAgentService
const mockExecAgent = vi.fn().mockResolvedValue({
  operationId: 'op_test',
  success: true,
  topicId: 'tpc_test',
});
const mockInterruptTask = vi.fn().mockResolvedValue({ success: true });
vi.mock('@/server/services/aiAgent', () => ({
  AiAgentService: vi.fn().mockImplementation(() => ({
    execAgent: mockExecAgent,
    interruptTask: mockInterruptTask,
  })),
}));

// Mock TaskLifecycleService
vi.mock('@/server/services/taskLifecycle', () => ({
  TaskLifecycleService: vi.fn().mockImplementation(() => ({
    onTopicComplete: vi.fn(),
  })),
}));

// Mock TaskReviewService
vi.mock('@/server/services/taskReview', () => ({
  TaskReviewService: vi.fn().mockImplementation(() => ({
    review: vi.fn(),
  })),
}));

// Mock initModelRuntimeFromDB
vi.mock('@/server/modules/ModelRuntime', () => ({
  initModelRuntimeFromDB: vi.fn(),
}));

describe('Task Router Integration', () => {
  let serverDB: LobeChatDatabase;
  let userId: string;
  let testAgentId: string;
  let testTopicId: string;
  let caller: ReturnType<typeof taskRouter.createCaller>;

  beforeEach(async () => {
    vi.clearAllMocks();
    serverDB = await getTestDB();
    testDB = serverDB;
    userId = await createTestUser(serverDB);
    testAgentId = await createTestAgent(serverDB, userId, 'agt_test');
    testTopicId = await createTestTopic(serverDB, userId, 'tpc_test');
    // Update mock to return the real topic ID
    mockExecAgent.mockResolvedValue({
      operationId: 'op_test',
      success: true,
      topicId: testTopicId,
    });
    caller = taskRouter.createCaller(createTestContext(userId));
  });

  afterEach(async () => {
    await cleanupTestUser(serverDB, userId);
  });

  describe('create + find + detail', () => {
    it('should create a task and retrieve it', async () => {
      const result = await caller.create({
        instruction: 'Write a book',
        name: 'Write Book',
      });
      expect(result.data.identifier).toBe('T-1');
      expect(result.data.name).toBe('Write Book');
      expect(result.data.status).toBe('backlog');

      // find
      const found = await caller.find({ id: 'T-1' });
      expect(found.data.id).toBe(result.data.id);

      // detail
      const detail = await caller.detail({ id: 'T-1' });
      expect(detail.data.identifier).toBe('T-1');
      expect(detail.data.subtasks).toHaveLength(0);
      expect(detail.data.activities).toBeUndefined();
    });
  });

  describe('subtasks + dependencies', () => {
    it('should create subtasks and set dependencies', async () => {
      const parent = await caller.create({
        instruction: 'Write a book',
        name: 'Book',
      });
      const ch1 = await caller.create({
        instruction: 'Write chapter 1',
        name: 'Chapter 1',
        parentTaskId: parent.data.id,
      });
      const ch2 = await caller.create({
        instruction: 'Write chapter 2',
        name: 'Chapter 2',
        parentTaskId: parent.data.id,
      });

      // Add dependency: ch2 blocks on ch1
      await caller.addDependency({
        dependsOnId: ch1.data.id,
        taskId: ch2.data.id,
      });

      const detail = await caller.detail({ id: parent.data.identifier });
      expect(detail.data.subtasks).toHaveLength(2);

      // ch2 should have blockedBy pointing to ch1's identifier
      const ch2Sub = detail.data.subtasks!.find((s) => s.name === 'Chapter 2');
      expect(ch2Sub?.blockedBy).toBeTruthy();
    });
  });

  describe('status transitions', () => {
    it('should transition backlog → running → paused → completed', async () => {
      const task = await caller.create({ instruction: 'Test' });

      // backlog → running
      const running = await caller.updateStatus({
        id: task.data.id,
        status: 'running',
      });
      expect(running.data.status).toBe('running');

      // running → paused
      const paused = await caller.updateStatus({
        id: task.data.id,
        status: 'paused',
      });
      expect(paused.data.status).toBe('paused');

      // paused → completed
      const completed = await caller.updateStatus({
        id: task.data.id,
        status: 'completed',
      });
      expect(completed.data.status).toBe('completed');
    });
  });

  describe('comments', () => {
    it('should add and retrieve comments', async () => {
      const task = await caller.create({ instruction: 'Test' });
      await caller.addComment({
        content: 'First comment',
        id: task.data.id,
      });
      await caller.addComment({
        content: 'Second comment',
        id: task.data.id,
      });

      const detail = await caller.detail({ id: task.data.identifier });
      const commentActivities = detail.data.activities?.filter((a) => a.type === 'comment');
      expect(commentActivities).toHaveLength(2);
      expect(commentActivities?.[0].content).toBe('First comment');
    });
  });

  describe('review config', () => {
    it('should set and retrieve review rubrics', async () => {
      const task = await caller.create({ instruction: 'Test' });
      await caller.updateReview({
        id: task.data.id,
        review: {
          autoRetry: true,
          enabled: true,
          maxIterations: 3,
          rubrics: [
            {
              config: { criteria: 'Content accuracy' },
              id: 'r1',
              name: 'Accuracy',
              threshold: 0.8,
              type: 'llm-rubric',
              weight: 1,
            },
            {
              config: { value: '```' },
              id: 'r2',
              name: 'Contains code',
              type: 'contains',
              weight: 1,
            },
          ],
        },
      });

      const review = await caller.getReview({ id: task.data.id });
      expect(review.data!.enabled).toBe(true);
      expect(review.data!.rubrics).toHaveLength(2);
      expect(review.data!.rubrics[0].type).toBe('llm-rubric');
    });
  });

  describe('run idempotency', () => {
    it('should reject run when a topic is already running', async () => {
      const task = await caller.create({
        assigneeAgentId: testAgentId,
        instruction: 'Test',
      });

      // First run succeeds
      await caller.run({ id: task.data.id });

      // Second run should fail with CONFLICT
      await expect(caller.run({ id: task.data.id })).rejects.toThrow(/already has a running topic/);
    });

    it('should reject continue on already running topic', async () => {
      const task = await caller.create({
        assigneeAgentId: testAgentId,
        instruction: 'Test',
      });

      // First run starts the topic returned by the mocked execAgent (testTopicId)
      await caller.run({ id: task.data.id });

      // Continuing that same topic while it is still running should be rejected
      await expect(
        caller.run({ continueTopicId: testTopicId, id: task.data.id }),
      ).rejects.toThrow(/already running/);
    });
  });

  describe('run error rollback', () => {
    it('should rollback task status to paused on run failure', async () => {
      mockExecAgent.mockRejectedValueOnce(new Error('LLM failed'));
      const task = await caller.create({
        assigneeAgentId: testAgentId,
        instruction: 'Test',
      });

      await expect(caller.run({ id: task.data.id })).rejects.toThrow();

      // Task should be rolled back to paused with error
      const found = await caller.find({ id: task.data.id });
      expect(found.data.status).toBe('paused');
      expect(found.data.error).toContain('LLM failed');
    });
  });

  describe('clearAll', () => {
    it('should delete all tasks for user', async () => {
      await caller.create({ instruction: 'Task 1' });
      await caller.create({ instruction: 'Task 2' });
      await caller.create({ instruction: 'Task 3' });

      const result = await caller.clearAll();
      expect(result.count).toBe(3);

      const list = await caller.list({});
      expect(list.data).toHaveLength(0);
    });
  });

  describe('cancelTopic', () => {
    it('should cancel a running topic and pause task', async () => {
      const task = await caller.create({
        assigneeAgentId: testAgentId,
        instruction: 'Test',
      });
      await caller.run({ id: task.data.id });

      // Cancel the topic
      await caller.cancelTopic({ topicId: testTopicId });

      // Task should be paused
      const found = await caller.find({ id: task.data.id });
      expect(found.data.status).toBe('paused');
    });

    it('should reject cancel on non-running topic', async () => {
      const task = await caller.create({
        assigneeAgentId: testAgentId,
        instruction: 'Test',
      });
      await caller.run({ id: task.data.id });
      await caller.cancelTopic({ topicId: testTopicId });

      // Try to cancel again: the topic is no longer running, so this should fail
      await expect(caller.cancelTopic({ topicId: testTopicId })).rejects.toThrow(/not running/);
    });
  });

  describe('workspace documents', () => {
    it('should pin and show documents in detail', async () => {
      const task = await caller.create({ instruction: 'Test' });

      // Create a document via the documents table directly
      const { documents } = await import('@/database/schemas');
      const [doc] = await serverDB
        .insert(documents)
        .values({
          content: 'Test content',
          fileType: 'markdown',
          source: 'test',
          sourceType: 'api',
          title: 'Test Doc',
          totalCharCount: 12,
          totalLineCount: 1,
          userId,
        })
        .returning();

      // Pin to task
      await caller.pinDocument({
        documentId: doc.id,
        pinnedBy: 'user',
        taskId: task.data.id,
      });

      // Check detail workspace
      const detail = await caller.detail({ id: task.data.identifier });
      expect(detail.data.workspace).toBeDefined();

      // Document should appear somewhere in the workspace tree
      const allDocs = detail.data.workspace!.flatMap((f) => [
        { documentId: f.documentId, title: f.title },
        ...(f.children ?? []),
      ]);
      expect(allDocs.find((d) => d.documentId === doc.id)?.title).toBe('Test Doc');

      // Unpin
      await caller.unpinDocument({
        documentId: doc.id,
        taskId: task.data.id,
      });
      const detail2 = await caller.detail({ id: task.data.identifier });
      expect(detail2.data.workspace).toBeUndefined();
    });
  });

  describe('heartbeat timeout detection', () => {
    it('should auto-detect timeout on detail and pause task', async () => {
      const task = await caller.create({
        assigneeAgentId: testAgentId,
        instruction: 'Test',
      });

      // Set a very short heartbeat timeout, then start the run
      await caller.update({
        heartbeatTimeout: 1,
        id: task.data.id,
      });
      await caller.run({ id: task.data.id });

      // Wait long enough for the heartbeat to expire
      await new Promise((r) => setTimeout(r, 1500));

      // detail should auto-detect timeout and pause
      const detail = await caller.detail({ id: task.data.identifier });
      expect(detail.data.status).toBe('paused');

      // Verify stale timeout error gets cleared via find
      const found = await caller.find({ id: task.data.id });
      expect(found.data.error).toBeNull();
    });
  });
});