Compare commits

...

4 Commits

Author SHA1 Message Date
Arvin Xu d5b06e080c 🐛 fix: return full execAgent result from executeTaskTopic
Preserve success/operationId/topicId fields so CLI result.success check works.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 19:08:16 +08:00
Arvin Xu f770246f45 feat: wire task self-scheduling — onTopicComplete triggers next topic
- Add webhook route /api/workflows/task/on-topic-complete for production
- TaskLifecycleService.onTopicComplete now calls maybeScheduleNextTopic
  after handoff/review/checkpoint to auto-schedule next topic
- Extract executeTaskTopic shared function from task.run router
- Wire LocalTaskScheduler callback to call executeTaskTopic
- Self-scheduling conditions: task still running, maxTopics not reached,
  no result brief produced

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 19:08:16 +08:00
Arvin Xu 6021bb713c ♻️ refactor(cli): extract followAgentStream as shared gateway/SSE helper
Deduplicate gateway WebSocket + SSE fallback logic from agent.ts and
task/lifecycle.ts into a single followAgentStream() in agentStream.ts.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 19:08:16 +08:00
Arvin Xu 93c471fbb5 🐛 fix: use gateway WebSocket for task stream and add step-level heartbeat
- CLI task start/run: use agent-gateway WebSocket with SSE fallback
  (same pattern as `lh agent run`)
- task.run router: add afterStep hook to update heartbeat on every step,
  not just on topic completion — prevents watchdog false positives
  during long-running tool calls
2026-04-08 19:08:16 +08:00
6 changed files with 361 additions and 131 deletions
+6 -26
View File
@@ -4,13 +4,7 @@ import type { Command } from 'commander';
import pc from 'picocolors';
import { getTrpcClient } from '../api/client';
import { getAgentStreamAuthInfo } from '../api/http';
import { resolveAgentGatewayUrl } from '../settings';
import {
replayAgentEvents,
streamAgentEvents,
streamAgentEventsViaWebSocket,
} from '../utils/agentStream';
import { followAgentStream, replayAgentEvents } from '../utils/agentStream';
import { resolveLocalDeviceId } from '../utils/device';
import { confirm, outputJson, printTable, truncate } from '../utils/format';
import { log, setVerbose } from '../utils/logger';
@@ -355,25 +349,11 @@ export function registerAgentCommand(program: Command) {
}
// 2. Connect to stream (WebSocket via Gateway, or fallback to SSE)
const { serverUrl, headers } = await getAgentStreamAuthInfo();
const agentGatewayUrl = options.sse ? undefined : resolveAgentGatewayUrl();
if (agentGatewayUrl) {
const token = headers['Oidc-Auth'] || headers['X-API-Key'] || '';
await streamAgentEventsViaWebSocket({
gatewayUrl: agentGatewayUrl,
json: options.json,
operationId,
token,
verbose: options.verbose,
});
} else {
const streamUrl = `${serverUrl}/api/agent/stream?operationId=${encodeURIComponent(operationId)}`;
await streamAgentEvents(streamUrl, headers, {
json: options.json,
verbose: options.verbose,
});
}
await followAgentStream(operationId, {
json: options.json,
sse: options.sse,
verbose: options.verbose,
});
},
);
+4 -11
View File
@@ -2,8 +2,7 @@ import type { Command } from 'commander';
import pc from 'picocolors';
import { getTrpcClient } from '../../api/client';
import { getAuthInfo } from '../../api/http';
import { streamAgentEvents } from '../../utils/agentStream';
import { followAgentStream } from '../../utils/agentStream';
import { log } from '../../utils/logger';
export function registerLifecycleCommands(task: Command) {
@@ -71,10 +70,7 @@ export function registerLifecycleCommands(task: Command) {
return;
}
const { serverUrl, headers } = await getAuthInfo();
const streamUrl = `${serverUrl}/api/agent/stream?operationId=${encodeURIComponent(result.operationId)}`;
await streamAgentEvents(streamUrl, headers, {
await followAgentStream(result.operationId, {
json: options.json,
verbose: options.verbose,
});
@@ -165,11 +161,8 @@ export function registerLifecycleCommands(task: Command) {
return;
}
// Connect to SSE stream and wait for completion
const { serverUrl, headers } = await getAuthInfo();
const streamUrl = `${serverUrl}/api/agent/stream?operationId=${encodeURIComponent(operationId)}`;
await streamAgentEvents(streamUrl, headers, {
// Connect to stream (WebSocket via Gateway, or fallback to SSE)
await followAgentStream(operationId, {
json: options.json,
verbose: options.verbose,
});
+32
View File
@@ -159,6 +159,38 @@ export function replayAgentEvents(events: AgentStreamEvent[], options: StreamOpt
}
}
/**
* Connect to agent stream via Gateway WebSocket (preferred) or SSE fallback.
* Shared by `lh agent run` and `lh task start/run`.
*/
export async function followAgentStream(
operationId: string,
options: { json?: boolean; sse?: boolean; verbose?: boolean } = {},
): Promise<void> {
const { getAgentStreamAuthInfo } = await import('../api/http');
const { resolveAgentGatewayUrl } = await import('../settings');
const { serverUrl, headers } = await getAgentStreamAuthInfo();
const agentGatewayUrl = options.sse ? undefined : resolveAgentGatewayUrl();
if (agentGatewayUrl) {
const token = headers['Oidc-Auth'] || headers['X-API-Key'] || '';
await streamAgentEventsViaWebSocket({
gatewayUrl: agentGatewayUrl,
json: options.json,
operationId,
token,
verbose: options.verbose,
});
} else {
const streamUrl = `${serverUrl}/api/agent/stream?operationId=${encodeURIComponent(operationId)}`;
await streamAgentEvents(streamUrl, headers, {
json: options.json,
verbose: options.verbose,
});
}
}
const HEARTBEAT_INTERVAL = 30_000;
/**
@@ -0,0 +1,57 @@
import debug from 'debug';
import { NextResponse } from 'next/server';
import { getServerDB } from '@/database/server';
import { TaskLifecycleService } from '@/server/services/taskLifecycle';
const log = debug('lobe-server:workflows:task:on-topic-complete');
/**
* Webhook handler for task topic completion.
*
* Called by AgentRuntime's onComplete hook (via QStash in production)
* when a topic finishes executing. Triggers the task lifecycle flow:
* heartbeat → handoff → review → checkpoint → self-schedule next topic.
*/
export async function POST(req: Request) {
try {
const body = await req.json();
// Hook event fields (from AgentHookEvent) + webhook body extras
const { taskId, userId, operationId, topicId, reason, lastAssistantContent, errorMessage } =
body;
if (!taskId || !userId) {
return NextResponse.json({ error: 'Missing taskId or userId' }, { status: 400 });
}
log(
'Received: taskId=%s topicId=%s reason=%s operationId=%s',
taskId,
topicId,
reason,
operationId,
);
const db = await getServerDB();
const lifecycle = new TaskLifecycleService(db, userId);
await lifecycle.onTopicComplete({
errorMessage,
lastAssistantContent,
operationId: operationId || '',
reason: reason || 'done',
taskId,
taskIdentifier: '', // not available from webhook, lifecycle will resolve
topicId,
});
return NextResponse.json({ success: true });
} catch (error) {
console.error('[task:on-topic-complete] Error:', error);
return NextResponse.json(
{ error: error instanceof Error ? error.message : 'Internal error' },
{ status: 500 },
);
}
}
+211 -93
View File
@@ -16,14 +16,151 @@ import { AiAgentService } from '@/server/services/aiAgent';
import { TaskService } from '@/server/services/task';
import { TaskLifecycleService } from '@/server/services/taskLifecycle';
import { TaskReviewService } from '@/server/services/taskReview';
import {
createTaskSchedulerModule,
LocalTaskScheduler,
type TaskSchedulerImpl,
} from '@/server/services/taskScheduler';
/**
* Shared task scheduler — wired once with a self-scheduling callback.
* LocalTaskScheduler uses setTimeout, QStash uses delayed jobs.
*/
let taskSchedulerInstance: TaskSchedulerImpl | undefined;
function getTaskScheduler(): TaskSchedulerImpl {
if (!taskSchedulerInstance) {
const scheduler = createTaskSchedulerModule();
// Wire LocalTaskScheduler callback: when scheduleNextTopic fires,
// execute the next topic for the task
if (scheduler instanceof LocalTaskScheduler) {
scheduler.setExecutionCallback(async (taskId: string, userId: string) => {
try {
await executeTaskTopic(taskId, userId);
} catch (error) {
console.error('[task-scheduler] Failed to execute next topic:', error);
}
});
}
taskSchedulerInstance = scheduler;
}
return taskSchedulerInstance;
}
/**
* Core task topic execution — shared by tRPC `task.run` and scheduler callback.
* Creates a new topic for the given task and triggers agent execution.
*/
async function executeTaskTopic(
taskId: string,
userId: string,
options?: { extraPrompt?: string },
): Promise<Record<string, any>> {
const { getServerDB } = await import('@/database/server');
const db = await getServerDB();
const taskModel = new TaskModel(db, userId);
const taskTopicModel = new TaskTopicModel(db, userId);
const briefModel = new BriefModel(db, userId);
const scheduler = getTaskScheduler();
const taskLifecycle = new TaskLifecycleService(db, userId, scheduler);
const task = await taskModel.findById(taskId);
if (!task) throw new Error(`Task ${taskId} not found`);
if (!task.assigneeAgentId) throw new Error(`Task ${taskId} has no assigned agent`);
// Build prompt with handoff context
const prompt = await buildTaskPrompt(
task,
{ briefModel, taskModel, taskTopicModel },
options?.extraPrompt,
);
// Ensure running status
if (task.status !== 'running') {
await taskModel.updateStatus(task.id, 'running', { error: null, startedAt: new Date() });
}
const agentRef = task.assigneeAgentId;
const isSlug = !agentRef.startsWith('agt_');
const taskIdentifier = task.identifier;
const checkpoint = taskModel.getCheckpointConfig(task);
const reviewConfig = taskModel.getReviewConfig(task);
const pluginIds = [TaskSkillIdentifier, NotebookIdentifier];
if (!reviewConfig?.enabled && checkpoint.onAgentRequest !== false) {
pluginIds.push(BriefIdentifier);
}
const taskConfig = (task.config ?? {}) as Record<string, unknown>;
const aiAgentService = new AiAgentService(db, userId);
const result = await aiAgentService.execAgent({
...(isSlug ? { slug: agentRef } : { agentId: agentRef }),
additionalPluginIds: pluginIds,
...(typeof taskConfig.model === 'string' && { model: taskConfig.model }),
...(typeof taskConfig.provider === 'string' && { provider: taskConfig.provider }),
hooks: [
{
handler: async () => {
await taskModel.updateHeartbeat(taskId);
},
id: 'task-heartbeat',
type: 'afterStep' as const,
},
{
handler: async (event) => {
await taskLifecycle.onTopicComplete({
errorMessage: event.errorMessage,
lastAssistantContent: event.lastAssistantContent,
operationId: event.operationId,
reason: event.reason || 'done',
taskId,
taskIdentifier,
topicId: event.topicId,
});
},
id: 'task-on-complete',
type: 'onComplete' as const,
webhook: {
body: { taskId, userId },
url: '/api/workflows/task/on-topic-complete',
},
},
],
prompt,
taskId: task.id,
title: task.name || task.identifier,
trigger: 'task',
userInterventionConfig: { approvalMode: 'headless' },
});
// Track topic
if (result.topicId) {
await taskModel.incrementTopicCount(task.id);
await taskModel.updateCurrentTopic(task.id, result.topicId);
await taskTopicModel.add(task.id, result.topicId, {
operationId: result.operationId,
seq: (task.totalTopics || 0) + 1,
});
}
await taskModel.updateHeartbeat(task.id);
return { ...result, operationId: result.operationId, topicId: result.topicId };
}
const taskProcedure = authedProcedure.use(serverDatabase).use(async (opts) => {
const { ctx } = opts;
const scheduler = getTaskScheduler();
return opts.next({
ctx: {
briefModel: new BriefModel(ctx.serverDB, ctx.userId),
taskLifecycle: new TaskLifecycleService(ctx.serverDB, ctx.userId),
taskLifecycle: new TaskLifecycleService(ctx.serverDB, ctx.userId, scheduler),
taskModel: new TaskModel(ctx.serverDB, ctx.userId),
taskScheduler: scheduler,
taskService: new TaskService(ctx.serverDB, ctx.userId),
taskTopicModel: new TaskTopicModel(ctx.serverDB, ctx.userId),
topicModel: new TopicModel(ctx.serverDB, ctx.userId),
@@ -759,7 +896,6 @@ export const taskRouter = router({
const existingTopics = await ctx.taskTopicModel.findByTaskId(task.id);
if (continueTopicId) {
// If continuing a topic that's already running, reject
const target = existingTopics.find((t) => t.topicId === continueTopicId);
if (target?.status === 'running') {
throw new TRPCError({
@@ -768,7 +904,6 @@ export const taskRouter = router({
});
}
} else {
// If there's already a running topic, reject creating a new one
const runningTopic = existingTopics.find((t) => t.status === 'running');
if (runningTopic) {
throw new TRPCError({
@@ -786,99 +921,82 @@ export const taskRouter = router({
}
}
// Build prompt with handoff context from previous topics
const prompt = await buildTaskPrompt(task, ctx, extraPrompt);
// Update task status to running if not already, clear previous error
if (task.status !== 'running') {
await model.updateStatus(task.id, 'running', { error: null, startedAt: new Date() });
} else if (task.error) {
await model.update(task.id, { error: null });
}
// Call AiAgentService.execAgent
// assigneeAgentId can be either a raw agentId (agt_xxx) or a slug (inbox)
const agentRef = task.assigneeAgentId!;
const isSlug = !agentRef.startsWith('agt_');
const aiAgentService = new AiAgentService(ctx.serverDB, ctx.userId);
const taskId = task.id;
const taskIdentifier = task.identifier;
const { taskLifecycle } = ctx;
const db = ctx.serverDB;
const userId = ctx.userId;
// Task execution always injects: Task skill (auto-activated) + Notebook tool (for document output)
// Conditionally inject Brief tool based on checkpoint/review config
const checkpoint = model.getCheckpointConfig(task);
const reviewConfig = model.getReviewConfig(task);
const pluginIds = [TaskSkillIdentifier, NotebookIdentifier];
if (!reviewConfig?.enabled && checkpoint.onAgentRequest !== false) {
pluginIds.push(BriefIdentifier);
}
// Read per-task model/provider overrides from task.config
const taskConfig = (task.config ?? {}) as Record<string, unknown>;
const result = await aiAgentService.execAgent({
...(isSlug ? { slug: agentRef } : { agentId: agentRef }),
additionalPluginIds: pluginIds,
...(typeof taskConfig.model === 'string' && { model: taskConfig.model }),
...(typeof taskConfig.provider === 'string' && { provider: taskConfig.provider }),
hooks: [
{
handler: async (event) => {
await taskLifecycle.onTopicComplete({
errorMessage: event.errorMessage,
lastAssistantContent: event.lastAssistantContent,
operationId: event.operationId,
reason: event.reason || 'done',
taskId,
taskIdentifier,
topicId: event.topicId,
});
},
id: 'task-on-complete',
type: 'onComplete' as const,
webhook: {
body: { taskId, userId },
url: '/api/workflows/task/on-topic-complete',
},
},
],
prompt,
taskId: task.id,
title: extraPrompt ? extraPrompt.slice(0, 100) : task.name || task.identifier,
trigger: 'task',
userInterventionConfig: { approvalMode: 'headless' },
// Continue on existing topic if specified
...(continueTopicId && { appContext: { topicId: continueTopicId } }),
});
// Update task topic count, current topic, and association
if (result.topicId) {
if (continueTopicId) {
// Continuing existing topic — update status and operationId
await ctx.taskTopicModel.updateStatus(task.id, continueTopicId, 'running');
await ctx.taskTopicModel.updateOperationId(
task.id,
continueTopicId,
result.operationId,
);
await model.updateCurrentTopic(task.id, continueTopicId);
} else {
// New topic
await model.incrementTopicCount(task.id);
await model.updateCurrentTopic(task.id, result.topicId);
await ctx.taskTopicModel.add(task.id, result.topicId, {
operationId: result.operationId,
seq: (task.totalTopics || 0) + 1,
});
// For continue-topic mode, handle it directly; otherwise delegate to shared function
if (continueTopicId) {
const prompt = await buildTaskPrompt(task, ctx, extraPrompt);
if (task.status !== 'running') {
await model.updateStatus(task.id, 'running', { error: null, startedAt: new Date() });
}
const agentRef = task.assigneeAgentId!;
const isSlug = !agentRef.startsWith('agt_');
const aiAgentService = new AiAgentService(ctx.serverDB, ctx.userId);
const checkpoint = model.getCheckpointConfig(task);
const reviewConfig = model.getReviewConfig(task);
const pluginIds = [TaskSkillIdentifier, NotebookIdentifier];
if (!reviewConfig?.enabled && checkpoint.onAgentRequest !== false) {
pluginIds.push(BriefIdentifier);
}
const taskConfig = (task.config ?? {}) as Record<string, unknown>;
const taskId = task.id;
const taskIdentifier = task.identifier;
const { taskLifecycle } = ctx;
const result = await aiAgentService.execAgent({
...(isSlug ? { slug: agentRef } : { agentId: agentRef }),
additionalPluginIds: pluginIds,
...(typeof taskConfig.model === 'string' && { model: taskConfig.model }),
...(typeof taskConfig.provider === 'string' && { provider: taskConfig.provider }),
appContext: { topicId: continueTopicId },
hooks: [
{
handler: async () => {
await model.updateHeartbeat(taskId);
},
id: 'task-heartbeat',
type: 'afterStep' as const,
},
{
handler: async (event) => {
await taskLifecycle.onTopicComplete({
errorMessage: event.errorMessage,
lastAssistantContent: event.lastAssistantContent,
operationId: event.operationId,
reason: event.reason || 'done',
taskId,
taskIdentifier,
topicId: event.topicId,
});
},
id: 'task-on-complete',
type: 'onComplete' as const,
webhook: {
body: { taskId, userId: ctx.userId },
url: '/api/workflows/task/on-topic-complete',
},
},
],
prompt,
taskId,
title: extraPrompt ? extraPrompt.slice(0, 100) : task.name || task.identifier,
trigger: 'task',
userInterventionConfig: { approvalMode: 'headless' },
});
await ctx.taskTopicModel.updateStatus(task.id, continueTopicId, 'running');
await ctx.taskTopicModel.updateOperationId(task.id, continueTopicId, result.operationId);
await model.updateCurrentTopic(task.id, continueTopicId);
await model.updateHeartbeat(task.id);
return { ...result, taskId: task.id, taskIdentifier: task.identifier };
}
// Update heartbeat
await model.updateHeartbeat(task.id);
// New topic — use shared executeTaskTopic
const result = await executeTaskTopic(task.id, ctx.userId, {
extraPrompt,
});
return {
...result,
+51 -1
View File
@@ -10,6 +10,7 @@ import type { LobeChatDatabase } from '@/database/type';
import { initModelRuntimeFromDB } from '@/server/modules/ModelRuntime';
import { SystemAgentService } from '@/server/services/systemAgent';
import { TaskReviewService } from '@/server/services/taskReview';
import { createTaskSchedulerModule, type TaskSchedulerImpl } from '@/server/services/taskScheduler';
const log = debug('task-lifecycle');
@@ -30,13 +31,14 @@ export interface TopicCompleteParams {
export class TaskLifecycleService {
private briefModel: BriefModel;
private db: LobeChatDatabase;
private scheduler: TaskSchedulerImpl;
private systemAgentService: SystemAgentService;
private taskModel: TaskModel;
private taskTopicModel: TaskTopicModel;
private topicModel: TopicModel;
private userId: string;
constructor(db: LobeChatDatabase, userId: string) {
constructor(db: LobeChatDatabase, userId: string, scheduler?: TaskSchedulerImpl) {
this.db = db;
this.userId = userId;
this.taskModel = new TaskModel(db, userId);
@@ -44,6 +46,7 @@ export class TaskLifecycleService {
this.briefModel = new BriefModel(db, userId);
this.topicModel = new TopicModel(db, userId);
this.systemAgentService = new SystemAgentService(db, userId);
this.scheduler = scheduler || createTaskSchedulerModule();
}
/**
@@ -102,6 +105,9 @@ export class TaskLifecycleService {
if (currentTask && this.taskModel.shouldPauseOnTopicComplete(currentTask)) {
await this.taskModel.updateStatus(taskId, 'paused', { error: null });
}
// 6. Self-schedule next topic if task should continue
await this.maybeScheduleNextTopic(taskId, currentTask);
} else if (reason === 'error') {
if (topicId) await this.taskTopicModel.updateStatus(taskId, topicId, 'failed');
@@ -121,6 +127,50 @@ export class TaskLifecycleService {
}
}
/**
* Decide whether to schedule the next topic for a task.
*
* Conditions to continue:
* - Task status is still 'running' (not paused by checkpoint/review)
* - maxTopics not reached (or maxTopics is null = unlimited)
* - No 'result' brief was produced (Agent didn't signal completion)
*/
private async maybeScheduleNextTopic(taskId: string, task: any): Promise<void> {
if (!task) return;
// Re-read task status (may have been paused by checkpoint or review above)
const freshTask = await this.taskModel.findById(taskId);
if (!freshTask || freshTask.status !== 'running') {
log('skip scheduling: task %s status=%s', taskId, freshTask?.status);
return;
}
// Check maxTopics
if (freshTask.maxTopics && (freshTask.totalTopics || 0) >= freshTask.maxTopics) {
log('skip scheduling: task %s reached maxTopics (%d)', taskId, freshTask.maxTopics);
await this.taskModel.updateStatus(taskId, 'paused', { error: null });
return;
}
// Check if Agent signaled completion via result brief
const briefs = await this.briefModel.findByTaskId(taskId);
const latestBrief = briefs[0];
if (latestBrief?.type === 'result') {
log('skip scheduling: task %s has result brief', taskId);
return;
}
log('scheduling next topic for task %s', taskId);
try {
await this.scheduler.scheduleNextTopic({
taskId,
userId: this.userId,
});
} catch (error) {
log('failed to schedule next topic for task %s: %O', taskId, error);
}
}
/**
* Generate handoff summary and update topic title via LLM.
* Writes to task_topics handoff fields + updates topic title.