mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-15 12:10:16 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d450efd3b2 | |||
| ab2e95ad7d | |||
| 9ffd9d39e3 | |||
| f1ea407d74 | |||
| 98d77166cc | |||
| 9821328d43 |
@@ -17,6 +17,7 @@ import type {
|
||||
ToolsCalling,
|
||||
Usage,
|
||||
} from '../types';
|
||||
import { isBlockedStatus } from '../utils/status';
|
||||
|
||||
/**
|
||||
* Simplified Agent Runtime - The "Engine" that executes instructions from an "Agent" (Brain).
|
||||
@@ -197,7 +198,7 @@ export class AgentRuntime {
|
||||
}
|
||||
|
||||
// Stop execution if blocked
|
||||
if (currentState.status === 'waiting_for_human' || currentState.status === 'interrupted') {
|
||||
if (isBlockedStatus(currentState.status)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -214,12 +215,10 @@ export class AgentRuntime {
|
||||
return {
|
||||
events: allEvents,
|
||||
newState: currentState,
|
||||
// When execution is blocked (waiting for human or interrupted),
|
||||
// clear nextContext so the outer loop stops instead of continuing
|
||||
nextContext:
|
||||
currentState.status === 'waiting_for_human' || currentState.status === 'interrupted'
|
||||
? undefined
|
||||
: finalNextContext,
|
||||
// When execution is blocked (waiting for human, waiting for an async
|
||||
// tool result, or interrupted), clear nextContext so the outer loop
|
||||
// stops instead of continuing
|
||||
nextContext: isBlockedStatus(currentState.status) ? undefined : finalNextContext,
|
||||
};
|
||||
} catch (error) {
|
||||
const errorState = structuredClone(state);
|
||||
|
||||
@@ -113,7 +113,14 @@ export interface AgentState {
|
||||
*/
|
||||
securityBlacklist?: SecurityBlacklistConfig;
|
||||
// --- State Machine ---
|
||||
status: 'idle' | 'running' | 'waiting_for_human' | 'done' | 'error' | 'interrupted';
|
||||
status:
|
||||
| 'idle'
|
||||
| 'running'
|
||||
| 'waiting_for_human'
|
||||
| 'waiting_for_async_tool'
|
||||
| 'done'
|
||||
| 'error'
|
||||
| 'interrupted';
|
||||
|
||||
// --- Execution Tracking ---
|
||||
/**
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
export * from './messageSelectors';
|
||||
export * from './status';
|
||||
export * from './stepContextComputer';
|
||||
export * from './tokenCounter';
|
||||
|
||||
@@ -0,0 +1,43 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import type { AgentState } from '../types/state';
|
||||
import { isBlockedStatus, isParkedStatus } from './status';
|
||||
|
||||
// Every value of `AgentState['status']`, listed once. The suites below use it
// to derive the "everything else" side of each predicate, so keep it in sync
// with the union in `../types/state`.
const ALL_STATUSES: AgentState['status'][] = [
  'idle',
  'running',
  'waiting_for_human',
  'waiting_for_async_tool',
  'done',
  'error',
  'interrupted',
];
|
||||
|
||||
describe('isParkedStatus', () => {
  // The two resumable pauses; every other status must be reported as not parked.
  const parked: AgentState['status'][] = ['waiting_for_human', 'waiting_for_async_tool'];

  it('is true only for the non-terminal resumable pauses', () => {
    for (const status of parked) expect(isParkedStatus(status)).toBe(true);
  });

  it('is false for running, terminal, and interrupted', () => {
    const nonParked = ALL_STATUSES.filter((s) => !parked.includes(s));
    for (const status of nonParked) expect(isParkedStatus(status)).toBe(false);
  });
});
|
||||
|
||||
describe('isBlockedStatus', () => {
  // Statuses that halt the step loop: the two parked pauses plus a user interrupt.
  const blocked: AgentState['status'][] = [
    'waiting_for_human',
    'waiting_for_async_tool',
    'interrupted',
  ];

  it('is true for parked statuses and user interrupt', () => {
    for (const status of blocked) expect(isBlockedStatus(status)).toBe(true);
  });

  it('is false for idle, running, and terminal statuses', () => {
    // Derived from ALL_STATUSES (like the isParkedStatus suite) so a newly added
    // status is exercised here without having to extend a hand-written list.
    const nonBlocked = ALL_STATUSES.filter((s) => !blocked.includes(s));
    for (const status of nonBlocked) expect(isBlockedStatus(status)).toBe(false);
  });
});
|
||||
@@ -0,0 +1,19 @@
|
||||
import type { AgentState } from '../types/state';
|
||||
|
||||
/**
 * Report whether `status` is a parked status: a non-terminal, resumable pause
 * in which the operation is still alive but waiting on something out-of-band —
 * human approval (`waiting_for_human`) or an async tool / sub-agent result
 * (`waiting_for_async_tool`).
 *
 * Parked statuses are deliberately distinct from `interrupted` (user cancel)
 * and the terminal `done` / `error`, so the completion lifecycle never stamps
 * `completedAt` and the scheduler keeps treating them as active.
 *
 * @param status - The agent state status to classify.
 * @returns `true` only for `waiting_for_human` and `waiting_for_async_tool`.
 */
export const isParkedStatus = (status: AgentState['status']): boolean =>
  status === 'waiting_for_human' || status === 'waiting_for_async_tool';
|
||||
|
||||
/**
 * Report whether `status` is a blocked status, i.e. one that halts the step
 * loop: either a parked pause (as classified by `isParkedStatus`) or a user
 * interrupt (`interrupted`).
 *
 * `done` / `error` are not blocked statuses; they terminate through their own
 * handling.
 *
 * @param status - The agent state status to classify.
 * @returns `true` for `waiting_for_human`, `waiting_for_async_tool` and
 *   `interrupted`; `false` otherwise.
 */
export const isBlockedStatus = (status: AgentState['status']): boolean =>
  isParkedStatus(status) || status === 'interrupted';
|
||||
@@ -34,13 +34,20 @@ export interface RecordOperationCompletionParams {
|
||||
| 'interrupted'
|
||||
| 'max_steps'
|
||||
| 'cost_limit'
|
||||
| 'waiting_for_human';
|
||||
| 'waiting_for_human'
|
||||
| 'waiting_for_async_tool';
|
||||
cost?: Record<string, unknown> | null;
|
||||
error?: AgentOperationError | null;
|
||||
interruption?: AgentOperationInterruption | null;
|
||||
llmCalls?: number | null;
|
||||
processingTimeMs?: number | null;
|
||||
status: 'running' | 'waiting_for_human' | 'done' | 'error' | 'interrupted';
|
||||
status:
|
||||
| 'running'
|
||||
| 'waiting_for_human'
|
||||
| 'waiting_for_async_tool'
|
||||
| 'done'
|
||||
| 'error'
|
||||
| 'interrupted';
|
||||
stepCount?: number | null;
|
||||
toolCalls?: number | null;
|
||||
totalCost?: number | null;
|
||||
@@ -135,4 +142,47 @@ export class AgentOperationModel {
|
||||
.limit(1);
|
||||
return row ?? null;
|
||||
}
|
||||
|
||||
/**
 * Return every sub-operation spawned by a parent op. Used by the sub-agent
 * completion bridge to reconcile which deferred tool calls have reached a
 * terminal state when deciding whether to resume the parent.
 *
 * Only rows owned by this model's `userId` are returned.
 *
 * @param parentOperationId - Id of the parent operation whose children to list.
 * @returns The matching rows, each projected to `completionReason`, `id`,
 *   `metadata` and `status`.
 */
async queryByParentOperationId(parentOperationId: string) {
  return this.db
    .select({
      completionReason: agentOperations.completionReason,
      id: agentOperations.id,
      metadata: agentOperations.metadata,
      status: agentOperations.status,
    })
    .from(agentOperations)
    .where(
      and(
        eq(agentOperations.parentOperationId, parentOperationId),
        eq(agentOperations.userId, this.userId),
      ),
    );
}
|
||||
|
||||
/**
 * Atomically flip a parked parent op from `waiting_for_async_tool` back to
 * `running`. Returns true only for the single winner (exactly one row
 * updated) so concurrent sub-op completions that lose the race no-op instead
 * of double-resuming the parent.
 *
 * @param operationId - Id of the parked operation to resume.
 * @returns `true` when this call performed the transition; `false` when no
 *   row matched (wrong owner, unknown id, or the op was not in
 *   `waiting_for_async_tool` any more).
 */
async tryResumeFromAsyncTool(operationId: string): Promise<boolean> {
  const rows = await this.db
    .update(agentOperations)
    .set({ status: 'running' })
    .where(
      and(
        eq(agentOperations.id, operationId),
        eq(agentOperations.userId, this.userId),
        // Guard on the current status: once one caller has flipped the row,
        // every later caller matches zero rows.
        eq(agentOperations.status, 'waiting_for_async_tool'),
      ),
    )
    .returning({ id: agentOperations.id });

  // Only the caller whose UPDATE actually changed the row gets a row back.
  return rows.length === 1;
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ const operationStatuses = [
|
||||
'idle',
|
||||
'running',
|
||||
'waiting_for_human',
|
||||
'waiting_for_async_tool',
|
||||
'done',
|
||||
'error',
|
||||
'interrupted',
|
||||
@@ -22,6 +23,7 @@ const completionReasons = [
|
||||
'max_steps',
|
||||
'cost_limit',
|
||||
'waiting_for_human',
|
||||
'waiting_for_async_tool',
|
||||
] as const;
|
||||
|
||||
export interface AgentOperationInterruption {
|
||||
|
||||
@@ -322,7 +322,7 @@ export class ResponsesService extends BaseService {
|
||||
const usage = this.extractUsage(finalState);
|
||||
|
||||
const isClientToolInterrupt =
|
||||
finalState.status === 'interrupted' &&
|
||||
finalState.status === 'waiting_for_async_tool' &&
|
||||
finalState.interruption?.reason === 'client_tool_execution';
|
||||
|
||||
return this.buildResponseObject({
|
||||
@@ -753,7 +753,7 @@ export class ResponsesService extends BaseService {
|
||||
|
||||
// Determine if agent was interrupted for client tool execution
|
||||
const isClientToolInterrupt =
|
||||
finalState?.status === 'interrupted' &&
|
||||
finalState?.status === 'waiting_for_async_tool' &&
|
||||
finalState?.interruption?.reason === 'client_tool_execution';
|
||||
|
||||
if (isClientToolInterrupt) {
|
||||
|
||||
@@ -326,6 +326,13 @@ export type BuiltinStreaming = <A = any>(props: BuiltinStreamingProps<A>) => Rea
|
||||
|
||||
export interface BuiltinServerRuntimeOutput {
|
||||
content: string;
|
||||
/**
|
||||
* When true, the tool executed a side-effect but its result is delivered
|
||||
* out-of-band later (e.g. an async sub-agent). The agent runtime parks the
|
||||
* operation instead of writing a tool_result, mirroring the client-tool
|
||||
* pause path. The deferred result is filled in by a completion bridge.
|
||||
*/
|
||||
deferred?: boolean;
|
||||
error?: any;
|
||||
state?: any;
|
||||
success: boolean;
|
||||
|
||||
@@ -23,12 +23,18 @@ const STREAM_END_STATUSES = new Set<AgentState['status']>([
|
||||
'error',
|
||||
'interrupted',
|
||||
'waiting_for_human',
|
||||
'waiting_for_async_tool',
|
||||
]);
|
||||
|
||||
const hasEnteredStreamEndState = (
|
||||
previousStatus?: AgentState['status'],
|
||||
nextStatus?: AgentState['status'],
|
||||
): nextStatus is 'done' | 'error' | 'interrupted' | 'waiting_for_human' => {
|
||||
): nextStatus is
|
||||
| 'done'
|
||||
| 'error'
|
||||
| 'interrupted'
|
||||
| 'waiting_for_human'
|
||||
| 'waiting_for_async_tool' => {
|
||||
const wasStreamEnd = previousStatus ? STREAM_END_STATUSES.has(previousStatus) : false;
|
||||
return Boolean(nextStatus && STREAM_END_STATUSES.has(nextStatus) && !wasStreamEnd);
|
||||
};
|
||||
@@ -127,7 +133,11 @@ export class AgentRuntimeCoordinator {
|
||||
*/
|
||||
private async resolveUiMessages(state: AgentState): Promise<UIChatMessage[] | undefined> {
|
||||
if (!this.uiMessagesResolver) return undefined;
|
||||
if (state.status === 'interrupted') return undefined;
|
||||
// `waiting_for_async_tool` is the deferred-tool pause (client tool or
|
||||
// sub-agent): the result message is written back later, so don't push a
|
||||
// premature SoT snapshot that would clobber the client's in-memory state.
|
||||
if (state.status === 'interrupted' || state.status === 'waiting_for_async_tool')
|
||||
return undefined;
|
||||
try {
|
||||
return await this.uiMessagesResolver(state);
|
||||
} catch (error) {
|
||||
|
||||
@@ -1799,7 +1799,7 @@ export const createRuntimeExecutors = (
|
||||
|
||||
const newState = structuredClone(state);
|
||||
newState.lastModified = new Date().toISOString();
|
||||
newState.status = 'interrupted';
|
||||
newState.status = 'waiting_for_async_tool';
|
||||
newState.interruption = {
|
||||
canResume: true,
|
||||
interruptedAt: new Date().toISOString(),
|
||||
@@ -1937,6 +1937,7 @@ export const createRuntimeExecutors = (
|
||||
activeDeviceId: state.metadata?.activeDeviceId,
|
||||
agentId: state.metadata?.agentId,
|
||||
documentId: state.metadata?.documentId,
|
||||
execSubAgentTask: ctx.execSubAgentTask,
|
||||
executionTimeoutMs: timeoutMs,
|
||||
groupId: state.metadata?.groupId,
|
||||
memoryToolPermission: agentConfig?.chatConfig?.memory?.toolPermission,
|
||||
@@ -1971,6 +1972,42 @@ export const createRuntimeExecutors = (
|
||||
);
|
||||
}
|
||||
|
||||
// Deferred tool (e.g. async sub-agent): the executor performed its
|
||||
// side-effect and created a pending placeholder; the real result is
|
||||
// delivered out-of-band later by a completion bridge. Park like a
|
||||
// client tool — surface the pending call, hold it in pendingToolsCalling,
|
||||
// and do not write a tool_result now.
|
||||
if (execution.result.deferred) {
|
||||
log(`[${operationLogId}] Tool ${toolName} deferred; parking for async result`);
|
||||
await streamManager.publishStreamChunk(operationId, stepIndex, {
|
||||
chunkType: 'tools_calling',
|
||||
toolsCalling: [chatToolPayload] as any,
|
||||
});
|
||||
executeToolSpan.setAttributes(
|
||||
buildExecuteToolResultAttributes({ attempts: execution.attempts, success: true }),
|
||||
);
|
||||
const newState = structuredClone(state);
|
||||
newState.lastModified = new Date().toISOString();
|
||||
newState.status = 'waiting_for_async_tool';
|
||||
newState.interruption = {
|
||||
canResume: true,
|
||||
interruptedAt: new Date().toISOString(),
|
||||
reason: 'async_tool',
|
||||
};
|
||||
newState.pendingToolsCalling = [chatToolPayload];
|
||||
return {
|
||||
events: [
|
||||
{
|
||||
canResume: true,
|
||||
interruptedAt: new Date().toISOString(),
|
||||
reason: 'async_tool',
|
||||
type: 'interrupted',
|
||||
},
|
||||
],
|
||||
newState,
|
||||
};
|
||||
}
|
||||
|
||||
const executionResult = await archiveRuntimeToolResult(execution.result, {
|
||||
agentId: state.metadata?.agentId,
|
||||
identifier: chatToolPayload.identifier,
|
||||
@@ -2293,7 +2330,7 @@ export const createRuntimeExecutors = (
|
||||
|
||||
const newState = structuredClone(state);
|
||||
newState.lastModified = new Date().toISOString();
|
||||
newState.status = 'interrupted';
|
||||
newState.status = 'waiting_for_async_tool';
|
||||
newState.interruption = {
|
||||
canResume: true,
|
||||
interruptedAt: new Date().toISOString(),
|
||||
@@ -2317,6 +2354,9 @@ export const createRuntimeExecutors = (
|
||||
// Track all tool message IDs created during execution
|
||||
const toolMessageIds: string[] = [];
|
||||
const toolResults: any[] = [];
|
||||
// Deferred (async) tools whose result is delivered out-of-band later;
|
||||
// collected here so the batch parks for them after server tools finish.
|
||||
const deferredTools: ChatToolPayload[] = [];
|
||||
|
||||
// Execute server tools concurrently (skip client tools in mixed batch)
|
||||
const toolsToExecute = serverTools.length > 0 ? serverTools : toolsCalling;
|
||||
@@ -2469,6 +2509,7 @@ export const createRuntimeExecutors = (
|
||||
activeDeviceId: state.metadata?.activeDeviceId,
|
||||
agentId: state.metadata?.agentId,
|
||||
documentId: state.metadata?.documentId,
|
||||
execSubAgentTask: ctx.execSubAgentTask,
|
||||
executionTimeoutMs: timeoutMs,
|
||||
groupId: state.metadata?.groupId,
|
||||
memoryToolPermission: batchAgentConfig?.chatConfig?.memory?.toolPermission,
|
||||
@@ -2494,6 +2535,18 @@ export const createRuntimeExecutors = (
|
||||
);
|
||||
}
|
||||
|
||||
// Deferred (async) tool: executor created a pending placeholder and
|
||||
// the real result arrives out-of-band. Skip the tool_result write;
|
||||
// the batch parks for it after all server tools settle.
|
||||
if (execution.result.deferred) {
|
||||
log(`[${operationLogId}] Tool ${toolName} deferred; will park after batch`);
|
||||
deferredTools.push(chatToolPayload);
|
||||
batchExecuteToolSpan.setAttributes(
|
||||
buildExecuteToolResultAttributes({ attempts: execution.attempts, success: true }),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const executionResult = await archiveRuntimeToolResult(execution.result, {
|
||||
agentId: state.metadata?.agentId,
|
||||
identifier: chatToolPayload.identifier,
|
||||
@@ -2748,24 +2801,31 @@ export const createRuntimeExecutors = (
|
||||
// Get the last tool message ID as parentMessageId for next LLM call
|
||||
const lastToolMessageId = toolMessageIds.at(-1);
|
||||
|
||||
// If there are remaining client tools in a mixed batch, interrupt after server tools
|
||||
if (clientTools.length > 0) {
|
||||
// Park if any tools still owe an out-of-band result: client tools (run on
|
||||
// the client) and/or deferred async tools (e.g. sub-agents). The operation
|
||||
// resumes once every pending tool's result is delivered.
|
||||
const pendingTools = [...deferredTools, ...clientTools];
|
||||
if (pendingTools.length > 0) {
|
||||
// Prefer the async-tool reason when any deferred tool is present; the
|
||||
// individual pending payloads still carry their own identity for the
|
||||
// resume gate.
|
||||
const pauseReason = deferredTools.length > 0 ? 'async_tool' : 'client_tool_execution';
|
||||
log(
|
||||
`[${operationLogId}][call_tools_batch] Mixed batch: ${serverTools.length} server tools done, pausing for ${clientTools.length} client tools`,
|
||||
`[${operationLogId}][call_tools_batch] Pausing after ${serverTools.length} server tools: ${deferredTools.length} deferred + ${clientTools.length} client`,
|
||||
);
|
||||
|
||||
await streamManager.publishStreamChunk(operationId, stepIndex, {
|
||||
chunkType: 'tools_calling',
|
||||
toolsCalling: clientTools as any,
|
||||
toolsCalling: pendingTools as any,
|
||||
});
|
||||
|
||||
newState.status = 'interrupted';
|
||||
newState.status = 'waiting_for_async_tool';
|
||||
newState.interruption = {
|
||||
canResume: true,
|
||||
interruptedAt: new Date().toISOString(),
|
||||
reason: 'client_tool_execution',
|
||||
reason: pauseReason,
|
||||
};
|
||||
newState.pendingToolsCalling = clientTools;
|
||||
newState.pendingToolsCalling = pendingTools;
|
||||
|
||||
return {
|
||||
events: [
|
||||
@@ -2773,7 +2833,7 @@ export const createRuntimeExecutors = (
|
||||
{
|
||||
canResume: true,
|
||||
interruptedAt: new Date().toISOString(),
|
||||
reason: 'client_tool_execution',
|
||||
reason: pauseReason,
|
||||
type: 'interrupted',
|
||||
},
|
||||
],
|
||||
|
||||
@@ -4,7 +4,12 @@ import type {
|
||||
AgentState,
|
||||
GeneralAgentConfig,
|
||||
} from '@lobechat/agent-runtime';
|
||||
import { AgentRuntime, findInMessages, GeneralChatAgent } from '@lobechat/agent-runtime';
|
||||
import {
|
||||
AgentRuntime,
|
||||
findInMessages,
|
||||
GeneralChatAgent,
|
||||
isParkedStatus,
|
||||
} from '@lobechat/agent-runtime';
|
||||
import type { ISnapshotStore } from '@lobechat/agent-tracing';
|
||||
import { dynamicInterventionAudits } from '@lobechat/builtin-tools/dynamicInterventionAudits';
|
||||
import { getModelPropertyWithFallback } from '@lobechat/model-runtime';
|
||||
@@ -1183,7 +1188,7 @@ export class AgentRuntimeService {
|
||||
},
|
||||
executionHistory: executionHistory?.slice(0, historyLimit),
|
||||
hasError: currentState.status === 'error',
|
||||
isActive: ['running', 'waiting_for_human'].includes(currentState.status),
|
||||
isActive: currentState.status === 'running' || isParkedStatus(currentState.status),
|
||||
isCompleted: currentState.status === 'done',
|
||||
metadata: operationMetadata,
|
||||
needsHumanInput: currentState.status === 'waiting_for_human',
|
||||
@@ -1585,6 +1590,9 @@ export class AgentRuntimeService {
|
||||
// Needs human intervention
|
||||
if (state.status === 'waiting_for_human') return false;
|
||||
|
||||
// Parked waiting for an async tool result (client tool / sub-agent)
|
||||
if (state.status === 'waiting_for_async_tool') return false;
|
||||
|
||||
// Error occurred
|
||||
if (state.status === 'error') return false;
|
||||
|
||||
@@ -1649,6 +1657,7 @@ export class AgentRuntimeService {
|
||||
if (state.status === 'error') return 'error';
|
||||
if (state.status === 'interrupted') return 'interrupted';
|
||||
if (state.status === 'waiting_for_human') return 'waiting_for_human';
|
||||
if (state.status === 'waiting_for_async_tool') return 'waiting_for_async_tool';
|
||||
if (state.maxSteps && state.stepCount >= state.maxSteps) return 'max_steps';
|
||||
if (state.costLimit && state.cost?.total >= state.costLimit.maxTotalCost) return 'cost_limit';
|
||||
return 'done';
|
||||
@@ -1722,9 +1731,11 @@ export class AgentRuntimeService {
|
||||
break;
|
||||
}
|
||||
|
||||
// Check if human intervention is needed
|
||||
if (state.status === 'waiting_for_human') {
|
||||
log('[%s] Sync execution paused: waiting for human intervention', operationId);
|
||||
// Parked on a pause (human intervention or an async tool / sub-agent
|
||||
// result) — the result is delivered out-of-band, so sync execution
|
||||
// can't resume it
|
||||
if (isParkedStatus(state.status)) {
|
||||
log('[%s] Sync execution paused: %s', operationId, state.status);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import { isParkedStatus } from '@lobechat/agent-runtime';
|
||||
import debug from 'debug';
|
||||
|
||||
import {
|
||||
@@ -65,10 +66,12 @@ export class CompletionLifecycle {
|
||||
|
||||
/**
|
||||
* Map a completion reason to the terminal `agent_operations.status` value.
|
||||
* `waiting_for_human` keeps `status='waiting_for_human'` so analytics can
|
||||
* distinguish paused ops from terminal ones.
|
||||
* `waiting_for_human` / `waiting_for_async_tool` keep their own status so
|
||||
* analytics can distinguish paused ops from terminal ones.
|
||||
*/
|
||||
private statusForReason(reason: string): 'done' | 'error' | 'interrupted' | 'waiting_for_human' {
|
||||
private statusForReason(
|
||||
reason: string,
|
||||
): 'done' | 'error' | 'interrupted' | 'waiting_for_human' | 'waiting_for_async_tool' {
|
||||
switch (reason) {
|
||||
case 'error': {
|
||||
return 'error';
|
||||
@@ -79,6 +82,9 @@ export class CompletionLifecycle {
|
||||
case 'waiting_for_human': {
|
||||
return 'waiting_for_human';
|
||||
}
|
||||
case 'waiting_for_async_tool': {
|
||||
return 'waiting_for_async_tool';
|
||||
}
|
||||
default: {
|
||||
return 'done';
|
||||
}
|
||||
@@ -92,7 +98,10 @@ export class CompletionLifecycle {
|
||||
*/
|
||||
private async persistCompletion(operationId: string, state: any, reason: string): Promise<void> {
|
||||
const completionReason: any =
|
||||
reason === 'max_steps' || reason === 'cost_limit' || reason === 'waiting_for_human'
|
||||
reason === 'max_steps' ||
|
||||
reason === 'cost_limit' ||
|
||||
reason === 'waiting_for_human' ||
|
||||
reason === 'waiting_for_async_tool'
|
||||
? reason
|
||||
: this.statusForReason(reason);
|
||||
|
||||
@@ -107,11 +116,10 @@ export class CompletionLifecycle {
|
||||
: null;
|
||||
|
||||
const status = this.statusForReason(reason);
|
||||
// `waiting_for_human` is a pause, not a true terminal state — leave
|
||||
// completedAt null so analytics doesn't read a paused op as completed.
|
||||
// The next dispatchHooks call (when the human resumes and the op truly
|
||||
// ends) will overwrite both fields.
|
||||
const completedAt = status === 'waiting_for_human' ? undefined : new Date();
|
||||
// Parked statuses are pauses, not true terminal states — leave completedAt
|
||||
// null so analytics doesn't read a paused op as completed. The next
|
||||
// dispatchHooks call (when the op resumes and truly ends) overwrites both.
|
||||
const completedAt = isParkedStatus(status) ? undefined : new Date();
|
||||
|
||||
try {
|
||||
await this.agentOperationModel.recordCompletion(operationId, {
|
||||
|
||||
@@ -112,7 +112,8 @@ export type StepCompletionReason =
|
||||
| 'interrupted'
|
||||
| 'max_steps'
|
||||
| 'cost_limit'
|
||||
| 'waiting_for_human';
|
||||
| 'waiting_for_human'
|
||||
| 'waiting_for_async_tool';
|
||||
|
||||
// ==================== Execution Params ====================
|
||||
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import { type LobeToolManifest } from '@lobechat/context-engine';
|
||||
import { type LobeChatDatabase } from '@lobechat/database';
|
||||
import { type ChatToolPayload, type ClientSecretPayload } from '@lobechat/types';
|
||||
import {
|
||||
type ChatToolPayload,
|
||||
type ClientSecretPayload,
|
||||
type ExecSubAgentTaskParams,
|
||||
} from '@lobechat/types';
|
||||
|
||||
export interface ToolExecutionMemoryEmbeddingRuntime {
|
||||
/** Embedding model id used by the memory search runtime. */
|
||||
@@ -18,6 +22,12 @@ export interface ToolExecutionContext {
|
||||
agentId?: string;
|
||||
/** Current page document ID for page-scoped conversations */
|
||||
documentId?: string | null;
|
||||
/**
|
||||
* Spawn a sub-agent as an independent async operation. Injected by the agent
|
||||
* runtime (forwarded from `RuntimeExecutorContext.execSubAgentTask`) so the
|
||||
* `callSubAgent` server tool can fork a child op without a circular import.
|
||||
*/
|
||||
execSubAgentTask?: (params: ExecSubAgentTaskParams) => Promise<unknown>;
|
||||
/** Per-call execution timeout resolved by the agent runtime. */
|
||||
executionTimeoutMs?: number;
|
||||
/** Current group ID for group chat context */
|
||||
@@ -65,6 +75,12 @@ export interface ToolExecutionContext {
|
||||
|
||||
export interface ToolExecutionResult {
|
||||
content: string;
|
||||
/**
|
||||
* When true, the result is delivered out-of-band later (e.g. an async
|
||||
* sub-agent). The agent runtime parks the operation instead of writing a
|
||||
* tool_result. Mirrors the client-tool pause path.
|
||||
*/
|
||||
deferred?: boolean;
|
||||
error?: any;
|
||||
state?: Record<string, any>;
|
||||
success: boolean;
|
||||
|
||||
@@ -551,10 +551,14 @@ export const createGatewayEventHandler = (
|
||||
action: 'gateway/agent_runtime_end',
|
||||
context,
|
||||
});
|
||||
} else if (data?.reason === 'interrupted' && hasStreamedContent) {
|
||||
// MID-stream cancel. The server's
|
||||
} else if (
|
||||
(data?.reason === 'interrupted' || data?.reason === 'waiting_for_async_tool') &&
|
||||
hasStreamedContent
|
||||
) {
|
||||
// MID-stream cancel, or a deferred-tool pause
|
||||
// (`waiting_for_async_tool`). The server's
|
||||
// `AgentRuntimeCoordinator.resolveUiMessages` omits uiMessages
|
||||
// for status='interrupted' precisely so we can preserve the
|
||||
// for both statuses precisely so we can preserve the
|
||||
// in-memory streamed content here. The executor's partial-
|
||||
// finalize catch writes the real content to DB asynchronously,
|
||||
// but it may not be durable yet — refetching here would race
|
||||
|
||||
Reference in New Issue
Block a user