mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-14 03:30:19 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d5b06e080c | |||
| f770246f45 | |||
| 6021bb713c | |||
| 93c471fbb5 |
@@ -4,13 +4,7 @@ import type { Command } from 'commander';
|
||||
import pc from 'picocolors';
|
||||
|
||||
import { getTrpcClient } from '../api/client';
|
||||
import { getAgentStreamAuthInfo } from '../api/http';
|
||||
import { resolveAgentGatewayUrl } from '../settings';
|
||||
import {
|
||||
replayAgentEvents,
|
||||
streamAgentEvents,
|
||||
streamAgentEventsViaWebSocket,
|
||||
} from '../utils/agentStream';
|
||||
import { followAgentStream, replayAgentEvents } from '../utils/agentStream';
|
||||
import { resolveLocalDeviceId } from '../utils/device';
|
||||
import { confirm, outputJson, printTable, truncate } from '../utils/format';
|
||||
import { log, setVerbose } from '../utils/logger';
|
||||
@@ -355,25 +349,11 @@ export function registerAgentCommand(program: Command) {
|
||||
}
|
||||
|
||||
// 2. Connect to stream (WebSocket via Gateway, or fallback to SSE)
|
||||
const { serverUrl, headers } = await getAgentStreamAuthInfo();
|
||||
const agentGatewayUrl = options.sse ? undefined : resolveAgentGatewayUrl();
|
||||
|
||||
if (agentGatewayUrl) {
|
||||
const token = headers['Oidc-Auth'] || headers['X-API-Key'] || '';
|
||||
await streamAgentEventsViaWebSocket({
|
||||
gatewayUrl: agentGatewayUrl,
|
||||
json: options.json,
|
||||
operationId,
|
||||
token,
|
||||
verbose: options.verbose,
|
||||
});
|
||||
} else {
|
||||
const streamUrl = `${serverUrl}/api/agent/stream?operationId=${encodeURIComponent(operationId)}`;
|
||||
await streamAgentEvents(streamUrl, headers, {
|
||||
json: options.json,
|
||||
verbose: options.verbose,
|
||||
});
|
||||
}
|
||||
await followAgentStream(operationId, {
|
||||
json: options.json,
|
||||
sse: options.sse,
|
||||
verbose: options.verbose,
|
||||
});
|
||||
},
|
||||
);
|
||||
|
||||
|
||||
@@ -2,8 +2,7 @@ import type { Command } from 'commander';
|
||||
import pc from 'picocolors';
|
||||
|
||||
import { getTrpcClient } from '../../api/client';
|
||||
import { getAuthInfo } from '../../api/http';
|
||||
import { streamAgentEvents } from '../../utils/agentStream';
|
||||
import { followAgentStream } from '../../utils/agentStream';
|
||||
import { log } from '../../utils/logger';
|
||||
|
||||
export function registerLifecycleCommands(task: Command) {
|
||||
@@ -71,10 +70,7 @@ export function registerLifecycleCommands(task: Command) {
|
||||
return;
|
||||
}
|
||||
|
||||
const { serverUrl, headers } = await getAuthInfo();
|
||||
const streamUrl = `${serverUrl}/api/agent/stream?operationId=${encodeURIComponent(result.operationId)}`;
|
||||
|
||||
await streamAgentEvents(streamUrl, headers, {
|
||||
await followAgentStream(result.operationId, {
|
||||
json: options.json,
|
||||
verbose: options.verbose,
|
||||
});
|
||||
@@ -165,11 +161,8 @@ export function registerLifecycleCommands(task: Command) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Connect to SSE stream and wait for completion
|
||||
const { serverUrl, headers } = await getAuthInfo();
|
||||
const streamUrl = `${serverUrl}/api/agent/stream?operationId=${encodeURIComponent(operationId)}`;
|
||||
|
||||
await streamAgentEvents(streamUrl, headers, {
|
||||
// Connect to stream (WebSocket via Gateway, or fallback to SSE)
|
||||
await followAgentStream(operationId, {
|
||||
json: options.json,
|
||||
verbose: options.verbose,
|
||||
});
|
||||
|
||||
@@ -159,6 +159,38 @@ export function replayAgentEvents(events: AgentStreamEvent[], options: StreamOpt
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Connect to agent stream via Gateway WebSocket (preferred) or SSE fallback.
|
||||
* Shared by `lh agent run` and `lh task start/run`.
|
||||
*/
|
||||
export async function followAgentStream(
|
||||
operationId: string,
|
||||
options: { json?: boolean; sse?: boolean; verbose?: boolean } = {},
|
||||
): Promise<void> {
|
||||
const { getAgentStreamAuthInfo } = await import('../api/http');
|
||||
const { resolveAgentGatewayUrl } = await import('../settings');
|
||||
|
||||
const { serverUrl, headers } = await getAgentStreamAuthInfo();
|
||||
const agentGatewayUrl = options.sse ? undefined : resolveAgentGatewayUrl();
|
||||
|
||||
if (agentGatewayUrl) {
|
||||
const token = headers['Oidc-Auth'] || headers['X-API-Key'] || '';
|
||||
await streamAgentEventsViaWebSocket({
|
||||
gatewayUrl: agentGatewayUrl,
|
||||
json: options.json,
|
||||
operationId,
|
||||
token,
|
||||
verbose: options.verbose,
|
||||
});
|
||||
} else {
|
||||
const streamUrl = `${serverUrl}/api/agent/stream?operationId=${encodeURIComponent(operationId)}`;
|
||||
await streamAgentEvents(streamUrl, headers, {
|
||||
json: options.json,
|
||||
verbose: options.verbose,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
const HEARTBEAT_INTERVAL = 30_000;
|
||||
|
||||
/**
|
||||
|
||||
@@ -0,0 +1,57 @@
|
||||
import debug from 'debug';
|
||||
import { NextResponse } from 'next/server';
|
||||
|
||||
import { getServerDB } from '@/database/server';
|
||||
import { TaskLifecycleService } from '@/server/services/taskLifecycle';
|
||||
|
||||
const log = debug('lobe-server:workflows:task:on-topic-complete');
|
||||
|
||||
/**
|
||||
* Webhook handler for task topic completion.
|
||||
*
|
||||
* Called by AgentRuntime's onComplete hook (via QStash in production)
|
||||
* when a topic finishes executing. Triggers the task lifecycle flow:
|
||||
* heartbeat → handoff → review → checkpoint → self-schedule next topic.
|
||||
*/
|
||||
export async function POST(req: Request) {
|
||||
try {
|
||||
const body = await req.json();
|
||||
|
||||
// Hook event fields (from AgentHookEvent) + webhook body extras
|
||||
const { taskId, userId, operationId, topicId, reason, lastAssistantContent, errorMessage } =
|
||||
body;
|
||||
|
||||
if (!taskId || !userId) {
|
||||
return NextResponse.json({ error: 'Missing taskId or userId' }, { status: 400 });
|
||||
}
|
||||
|
||||
log(
|
||||
'Received: taskId=%s topicId=%s reason=%s operationId=%s',
|
||||
taskId,
|
||||
topicId,
|
||||
reason,
|
||||
operationId,
|
||||
);
|
||||
|
||||
const db = await getServerDB();
|
||||
const lifecycle = new TaskLifecycleService(db, userId);
|
||||
|
||||
await lifecycle.onTopicComplete({
|
||||
errorMessage,
|
||||
lastAssistantContent,
|
||||
operationId: operationId || '',
|
||||
reason: reason || 'done',
|
||||
taskId,
|
||||
taskIdentifier: '', // not available from webhook, lifecycle will resolve
|
||||
topicId,
|
||||
});
|
||||
|
||||
return NextResponse.json({ success: true });
|
||||
} catch (error) {
|
||||
console.error('[task:on-topic-complete] Error:', error);
|
||||
return NextResponse.json(
|
||||
{ error: error instanceof Error ? error.message : 'Internal error' },
|
||||
{ status: 500 },
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -16,14 +16,151 @@ import { AiAgentService } from '@/server/services/aiAgent';
|
||||
import { TaskService } from '@/server/services/task';
|
||||
import { TaskLifecycleService } from '@/server/services/taskLifecycle';
|
||||
import { TaskReviewService } from '@/server/services/taskReview';
|
||||
import {
|
||||
createTaskSchedulerModule,
|
||||
LocalTaskScheduler,
|
||||
type TaskSchedulerImpl,
|
||||
} from '@/server/services/taskScheduler';
|
||||
|
||||
/**
|
||||
* Shared task scheduler — wired once with a self-scheduling callback.
|
||||
* LocalTaskScheduler uses setTimeout, QStash uses delayed jobs.
|
||||
*/
|
||||
let taskSchedulerInstance: TaskSchedulerImpl | undefined;
|
||||
|
||||
function getTaskScheduler(): TaskSchedulerImpl {
|
||||
if (!taskSchedulerInstance) {
|
||||
const scheduler = createTaskSchedulerModule();
|
||||
|
||||
// Wire LocalTaskScheduler callback: when scheduleNextTopic fires,
|
||||
// execute the next topic for the task
|
||||
if (scheduler instanceof LocalTaskScheduler) {
|
||||
scheduler.setExecutionCallback(async (taskId: string, userId: string) => {
|
||||
try {
|
||||
await executeTaskTopic(taskId, userId);
|
||||
} catch (error) {
|
||||
console.error('[task-scheduler] Failed to execute next topic:', error);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
taskSchedulerInstance = scheduler;
|
||||
}
|
||||
return taskSchedulerInstance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Core task topic execution — shared by tRPC `task.run` and scheduler callback.
|
||||
* Creates a new topic for the given task and triggers agent execution.
|
||||
*/
|
||||
async function executeTaskTopic(
|
||||
taskId: string,
|
||||
userId: string,
|
||||
options?: { extraPrompt?: string },
|
||||
): Promise<Record<string, any>> {
|
||||
const { getServerDB } = await import('@/database/server');
|
||||
const db = await getServerDB();
|
||||
|
||||
const taskModel = new TaskModel(db, userId);
|
||||
const taskTopicModel = new TaskTopicModel(db, userId);
|
||||
const briefModel = new BriefModel(db, userId);
|
||||
const scheduler = getTaskScheduler();
|
||||
const taskLifecycle = new TaskLifecycleService(db, userId, scheduler);
|
||||
|
||||
const task = await taskModel.findById(taskId);
|
||||
if (!task) throw new Error(`Task ${taskId} not found`);
|
||||
if (!task.assigneeAgentId) throw new Error(`Task ${taskId} has no assigned agent`);
|
||||
|
||||
// Build prompt with handoff context
|
||||
const prompt = await buildTaskPrompt(
|
||||
task,
|
||||
{ briefModel, taskModel, taskTopicModel },
|
||||
options?.extraPrompt,
|
||||
);
|
||||
|
||||
// Ensure running status
|
||||
if (task.status !== 'running') {
|
||||
await taskModel.updateStatus(task.id, 'running', { error: null, startedAt: new Date() });
|
||||
}
|
||||
|
||||
const agentRef = task.assigneeAgentId;
|
||||
const isSlug = !agentRef.startsWith('agt_');
|
||||
const taskIdentifier = task.identifier;
|
||||
|
||||
const checkpoint = taskModel.getCheckpointConfig(task);
|
||||
const reviewConfig = taskModel.getReviewConfig(task);
|
||||
const pluginIds = [TaskSkillIdentifier, NotebookIdentifier];
|
||||
if (!reviewConfig?.enabled && checkpoint.onAgentRequest !== false) {
|
||||
pluginIds.push(BriefIdentifier);
|
||||
}
|
||||
|
||||
const taskConfig = (task.config ?? {}) as Record<string, unknown>;
|
||||
const aiAgentService = new AiAgentService(db, userId);
|
||||
|
||||
const result = await aiAgentService.execAgent({
|
||||
...(isSlug ? { slug: agentRef } : { agentId: agentRef }),
|
||||
additionalPluginIds: pluginIds,
|
||||
...(typeof taskConfig.model === 'string' && { model: taskConfig.model }),
|
||||
...(typeof taskConfig.provider === 'string' && { provider: taskConfig.provider }),
|
||||
hooks: [
|
||||
{
|
||||
handler: async () => {
|
||||
await taskModel.updateHeartbeat(taskId);
|
||||
},
|
||||
id: 'task-heartbeat',
|
||||
type: 'afterStep' as const,
|
||||
},
|
||||
{
|
||||
handler: async (event) => {
|
||||
await taskLifecycle.onTopicComplete({
|
||||
errorMessage: event.errorMessage,
|
||||
lastAssistantContent: event.lastAssistantContent,
|
||||
operationId: event.operationId,
|
||||
reason: event.reason || 'done',
|
||||
taskId,
|
||||
taskIdentifier,
|
||||
topicId: event.topicId,
|
||||
});
|
||||
},
|
||||
id: 'task-on-complete',
|
||||
type: 'onComplete' as const,
|
||||
webhook: {
|
||||
body: { taskId, userId },
|
||||
url: '/api/workflows/task/on-topic-complete',
|
||||
},
|
||||
},
|
||||
],
|
||||
prompt,
|
||||
taskId: task.id,
|
||||
title: task.name || task.identifier,
|
||||
trigger: 'task',
|
||||
userInterventionConfig: { approvalMode: 'headless' },
|
||||
});
|
||||
|
||||
// Track topic
|
||||
if (result.topicId) {
|
||||
await taskModel.incrementTopicCount(task.id);
|
||||
await taskModel.updateCurrentTopic(task.id, result.topicId);
|
||||
await taskTopicModel.add(task.id, result.topicId, {
|
||||
operationId: result.operationId,
|
||||
seq: (task.totalTopics || 0) + 1,
|
||||
});
|
||||
}
|
||||
|
||||
await taskModel.updateHeartbeat(task.id);
|
||||
|
||||
return { ...result, operationId: result.operationId, topicId: result.topicId };
|
||||
}
|
||||
|
||||
const taskProcedure = authedProcedure.use(serverDatabase).use(async (opts) => {
|
||||
const { ctx } = opts;
|
||||
const scheduler = getTaskScheduler();
|
||||
return opts.next({
|
||||
ctx: {
|
||||
briefModel: new BriefModel(ctx.serverDB, ctx.userId),
|
||||
taskLifecycle: new TaskLifecycleService(ctx.serverDB, ctx.userId),
|
||||
taskLifecycle: new TaskLifecycleService(ctx.serverDB, ctx.userId, scheduler),
|
||||
taskModel: new TaskModel(ctx.serverDB, ctx.userId),
|
||||
taskScheduler: scheduler,
|
||||
taskService: new TaskService(ctx.serverDB, ctx.userId),
|
||||
taskTopicModel: new TaskTopicModel(ctx.serverDB, ctx.userId),
|
||||
topicModel: new TopicModel(ctx.serverDB, ctx.userId),
|
||||
@@ -759,7 +896,6 @@ export const taskRouter = router({
|
||||
const existingTopics = await ctx.taskTopicModel.findByTaskId(task.id);
|
||||
|
||||
if (continueTopicId) {
|
||||
// If continuing a topic that's already running, reject
|
||||
const target = existingTopics.find((t) => t.topicId === continueTopicId);
|
||||
if (target?.status === 'running') {
|
||||
throw new TRPCError({
|
||||
@@ -768,7 +904,6 @@ export const taskRouter = router({
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// If there's already a running topic, reject creating a new one
|
||||
const runningTopic = existingTopics.find((t) => t.status === 'running');
|
||||
if (runningTopic) {
|
||||
throw new TRPCError({
|
||||
@@ -786,99 +921,82 @@ export const taskRouter = router({
|
||||
}
|
||||
}
|
||||
|
||||
// Build prompt with handoff context from previous topics
|
||||
const prompt = await buildTaskPrompt(task, ctx, extraPrompt);
|
||||
|
||||
// Update task status to running if not already, clear previous error
|
||||
if (task.status !== 'running') {
|
||||
await model.updateStatus(task.id, 'running', { error: null, startedAt: new Date() });
|
||||
} else if (task.error) {
|
||||
await model.update(task.id, { error: null });
|
||||
}
|
||||
|
||||
// Call AiAgentService.execAgent
|
||||
// assigneeAgentId can be either a raw agentId (agt_xxx) or a slug (inbox)
|
||||
const agentRef = task.assigneeAgentId!;
|
||||
const isSlug = !agentRef.startsWith('agt_');
|
||||
|
||||
const aiAgentService = new AiAgentService(ctx.serverDB, ctx.userId);
|
||||
const taskId = task.id;
|
||||
const taskIdentifier = task.identifier;
|
||||
const { taskLifecycle } = ctx;
|
||||
const db = ctx.serverDB;
|
||||
const userId = ctx.userId;
|
||||
|
||||
// Task execution always injects: Task skill (auto-activated) + Notebook tool (for document output)
|
||||
// Conditionally inject Brief tool based on checkpoint/review config
|
||||
const checkpoint = model.getCheckpointConfig(task);
|
||||
const reviewConfig = model.getReviewConfig(task);
|
||||
const pluginIds = [TaskSkillIdentifier, NotebookIdentifier];
|
||||
if (!reviewConfig?.enabled && checkpoint.onAgentRequest !== false) {
|
||||
pluginIds.push(BriefIdentifier);
|
||||
}
|
||||
|
||||
// Read per-task model/provider overrides from task.config
|
||||
const taskConfig = (task.config ?? {}) as Record<string, unknown>;
|
||||
|
||||
const result = await aiAgentService.execAgent({
|
||||
...(isSlug ? { slug: agentRef } : { agentId: agentRef }),
|
||||
additionalPluginIds: pluginIds,
|
||||
...(typeof taskConfig.model === 'string' && { model: taskConfig.model }),
|
||||
...(typeof taskConfig.provider === 'string' && { provider: taskConfig.provider }),
|
||||
hooks: [
|
||||
{
|
||||
handler: async (event) => {
|
||||
await taskLifecycle.onTopicComplete({
|
||||
errorMessage: event.errorMessage,
|
||||
lastAssistantContent: event.lastAssistantContent,
|
||||
operationId: event.operationId,
|
||||
reason: event.reason || 'done',
|
||||
taskId,
|
||||
taskIdentifier,
|
||||
topicId: event.topicId,
|
||||
});
|
||||
},
|
||||
id: 'task-on-complete',
|
||||
type: 'onComplete' as const,
|
||||
webhook: {
|
||||
body: { taskId, userId },
|
||||
url: '/api/workflows/task/on-topic-complete',
|
||||
},
|
||||
},
|
||||
],
|
||||
prompt,
|
||||
taskId: task.id,
|
||||
title: extraPrompt ? extraPrompt.slice(0, 100) : task.name || task.identifier,
|
||||
trigger: 'task',
|
||||
userInterventionConfig: { approvalMode: 'headless' },
|
||||
// Continue on existing topic if specified
|
||||
...(continueTopicId && { appContext: { topicId: continueTopicId } }),
|
||||
});
|
||||
|
||||
// Update task topic count, current topic, and association
|
||||
if (result.topicId) {
|
||||
if (continueTopicId) {
|
||||
// Continuing existing topic — update status and operationId
|
||||
await ctx.taskTopicModel.updateStatus(task.id, continueTopicId, 'running');
|
||||
await ctx.taskTopicModel.updateOperationId(
|
||||
task.id,
|
||||
continueTopicId,
|
||||
result.operationId,
|
||||
);
|
||||
await model.updateCurrentTopic(task.id, continueTopicId);
|
||||
} else {
|
||||
// New topic
|
||||
await model.incrementTopicCount(task.id);
|
||||
await model.updateCurrentTopic(task.id, result.topicId);
|
||||
await ctx.taskTopicModel.add(task.id, result.topicId, {
|
||||
operationId: result.operationId,
|
||||
seq: (task.totalTopics || 0) + 1,
|
||||
});
|
||||
// For continue-topic mode, handle it directly; otherwise delegate to shared function
|
||||
if (continueTopicId) {
|
||||
const prompt = await buildTaskPrompt(task, ctx, extraPrompt);
|
||||
if (task.status !== 'running') {
|
||||
await model.updateStatus(task.id, 'running', { error: null, startedAt: new Date() });
|
||||
}
|
||||
|
||||
const agentRef = task.assigneeAgentId!;
|
||||
const isSlug = !agentRef.startsWith('agt_');
|
||||
const aiAgentService = new AiAgentService(ctx.serverDB, ctx.userId);
|
||||
|
||||
const checkpoint = model.getCheckpointConfig(task);
|
||||
const reviewConfig = model.getReviewConfig(task);
|
||||
const pluginIds = [TaskSkillIdentifier, NotebookIdentifier];
|
||||
if (!reviewConfig?.enabled && checkpoint.onAgentRequest !== false) {
|
||||
pluginIds.push(BriefIdentifier);
|
||||
}
|
||||
|
||||
const taskConfig = (task.config ?? {}) as Record<string, unknown>;
|
||||
const taskId = task.id;
|
||||
const taskIdentifier = task.identifier;
|
||||
const { taskLifecycle } = ctx;
|
||||
|
||||
const result = await aiAgentService.execAgent({
|
||||
...(isSlug ? { slug: agentRef } : { agentId: agentRef }),
|
||||
additionalPluginIds: pluginIds,
|
||||
...(typeof taskConfig.model === 'string' && { model: taskConfig.model }),
|
||||
...(typeof taskConfig.provider === 'string' && { provider: taskConfig.provider }),
|
||||
appContext: { topicId: continueTopicId },
|
||||
hooks: [
|
||||
{
|
||||
handler: async () => {
|
||||
await model.updateHeartbeat(taskId);
|
||||
},
|
||||
id: 'task-heartbeat',
|
||||
type: 'afterStep' as const,
|
||||
},
|
||||
{
|
||||
handler: async (event) => {
|
||||
await taskLifecycle.onTopicComplete({
|
||||
errorMessage: event.errorMessage,
|
||||
lastAssistantContent: event.lastAssistantContent,
|
||||
operationId: event.operationId,
|
||||
reason: event.reason || 'done',
|
||||
taskId,
|
||||
taskIdentifier,
|
||||
topicId: event.topicId,
|
||||
});
|
||||
},
|
||||
id: 'task-on-complete',
|
||||
type: 'onComplete' as const,
|
||||
webhook: {
|
||||
body: { taskId, userId: ctx.userId },
|
||||
url: '/api/workflows/task/on-topic-complete',
|
||||
},
|
||||
},
|
||||
],
|
||||
prompt,
|
||||
taskId,
|
||||
title: extraPrompt ? extraPrompt.slice(0, 100) : task.name || task.identifier,
|
||||
trigger: 'task',
|
||||
userInterventionConfig: { approvalMode: 'headless' },
|
||||
});
|
||||
|
||||
await ctx.taskTopicModel.updateStatus(task.id, continueTopicId, 'running');
|
||||
await ctx.taskTopicModel.updateOperationId(task.id, continueTopicId, result.operationId);
|
||||
await model.updateCurrentTopic(task.id, continueTopicId);
|
||||
await model.updateHeartbeat(task.id);
|
||||
|
||||
return { ...result, taskId: task.id, taskIdentifier: task.identifier };
|
||||
}
|
||||
|
||||
// Update heartbeat
|
||||
await model.updateHeartbeat(task.id);
|
||||
// New topic — use shared executeTaskTopic
|
||||
const result = await executeTaskTopic(task.id, ctx.userId, {
|
||||
extraPrompt,
|
||||
});
|
||||
|
||||
return {
|
||||
...result,
|
||||
|
||||
@@ -10,6 +10,7 @@ import type { LobeChatDatabase } from '@/database/type';
|
||||
import { initModelRuntimeFromDB } from '@/server/modules/ModelRuntime';
|
||||
import { SystemAgentService } from '@/server/services/systemAgent';
|
||||
import { TaskReviewService } from '@/server/services/taskReview';
|
||||
import { createTaskSchedulerModule, type TaskSchedulerImpl } from '@/server/services/taskScheduler';
|
||||
|
||||
const log = debug('task-lifecycle');
|
||||
|
||||
@@ -30,13 +31,14 @@ export interface TopicCompleteParams {
|
||||
export class TaskLifecycleService {
|
||||
private briefModel: BriefModel;
|
||||
private db: LobeChatDatabase;
|
||||
private scheduler: TaskSchedulerImpl;
|
||||
private systemAgentService: SystemAgentService;
|
||||
private taskModel: TaskModel;
|
||||
private taskTopicModel: TaskTopicModel;
|
||||
private topicModel: TopicModel;
|
||||
private userId: string;
|
||||
|
||||
constructor(db: LobeChatDatabase, userId: string) {
|
||||
constructor(db: LobeChatDatabase, userId: string, scheduler?: TaskSchedulerImpl) {
|
||||
this.db = db;
|
||||
this.userId = userId;
|
||||
this.taskModel = new TaskModel(db, userId);
|
||||
@@ -44,6 +46,7 @@ export class TaskLifecycleService {
|
||||
this.briefModel = new BriefModel(db, userId);
|
||||
this.topicModel = new TopicModel(db, userId);
|
||||
this.systemAgentService = new SystemAgentService(db, userId);
|
||||
this.scheduler = scheduler || createTaskSchedulerModule();
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -102,6 +105,9 @@ export class TaskLifecycleService {
|
||||
if (currentTask && this.taskModel.shouldPauseOnTopicComplete(currentTask)) {
|
||||
await this.taskModel.updateStatus(taskId, 'paused', { error: null });
|
||||
}
|
||||
|
||||
// 6. Self-schedule next topic if task should continue
|
||||
await this.maybeScheduleNextTopic(taskId, currentTask);
|
||||
} else if (reason === 'error') {
|
||||
if (topicId) await this.taskTopicModel.updateStatus(taskId, topicId, 'failed');
|
||||
|
||||
@@ -121,6 +127,50 @@ export class TaskLifecycleService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Decide whether to schedule the next topic for a task.
|
||||
*
|
||||
* Conditions to continue:
|
||||
* - Task status is still 'running' (not paused by checkpoint/review)
|
||||
* - maxTopics not reached (or maxTopics is null = unlimited)
|
||||
* - No 'result' brief was produced (Agent didn't signal completion)
|
||||
*/
|
||||
private async maybeScheduleNextTopic(taskId: string, task: any): Promise<void> {
|
||||
if (!task) return;
|
||||
|
||||
// Re-read task status (may have been paused by checkpoint or review above)
|
||||
const freshTask = await this.taskModel.findById(taskId);
|
||||
if (!freshTask || freshTask.status !== 'running') {
|
||||
log('skip scheduling: task %s status=%s', taskId, freshTask?.status);
|
||||
return;
|
||||
}
|
||||
|
||||
// Check maxTopics
|
||||
if (freshTask.maxTopics && (freshTask.totalTopics || 0) >= freshTask.maxTopics) {
|
||||
log('skip scheduling: task %s reached maxTopics (%d)', taskId, freshTask.maxTopics);
|
||||
await this.taskModel.updateStatus(taskId, 'paused', { error: null });
|
||||
return;
|
||||
}
|
||||
|
||||
// Check if Agent signaled completion via result brief
|
||||
const briefs = await this.briefModel.findByTaskId(taskId);
|
||||
const latestBrief = briefs[0];
|
||||
if (latestBrief?.type === 'result') {
|
||||
log('skip scheduling: task %s has result brief', taskId);
|
||||
return;
|
||||
}
|
||||
|
||||
log('scheduling next topic for task %s', taskId);
|
||||
try {
|
||||
await this.scheduler.scheduleNextTopic({
|
||||
taskId,
|
||||
userId: this.userId,
|
||||
});
|
||||
} catch (error) {
|
||||
log('failed to schedule next topic for task %s: %O', taskId, error);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Generate handoff summary and update topic title via LLM.
|
||||
* Writes to task_topics handoff fields + updates topic title.
|
||||
|
||||
Reference in New Issue
Block a user