Compare commits

...

6 Commits

Author SHA1 Message Date
Arvin Xu d450efd3b2 feat(agent-runtime): park call_tools_batch on deferred tools
Mirror the call_tool deferred-park on the parallel path: deferred (async)
tools are collected during the concurrent batch and, once server tools
settle, the operation parks (waiting_for_async_tool + pendingToolsCalling)
alongside any client tools — so K parallel sub-agents in one round all
resolve before the parent resumes.

Part of the server sub-agent suspend/resume mechanism (LOBE-9763).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 20:37:03 +08:00
Arvin Xu ab2e95ad7d feat(agent-runtime): add deferred-tool park infrastructure
Introduce a generic `deferred` result flag (BuiltinServerRuntimeOutput /
ToolExecutionResult). When a tool returns deferred, call_tool parks the
operation (waiting_for_async_tool + pendingToolsCalling) without writing a
tool_result — mirroring the client-tool pause — so the result can be
delivered out-of-band later by a completion bridge. Thread the existing
execSubAgentTask DI seam into ToolExecutionContext so async tools can spawn
a child op without a circular import.

Part of the server sub-agent suspend/resume mechanism (LOBE-9763).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 20:33:20 +08:00
Arvin Xu 9ffd9d39e3 Revert "♻️ refactor(aiAgent): rename execSubAgentTask -> execSubAgent"
This reverts commit f1ea407d74.
2026-05-28 20:26:28 +08:00
Arvin Xu f1ea407d74 ♻️ refactor(aiAgent): rename execSubAgentTask -> execSubAgent
Full rename of the service method, its `ExecSubAgentTaskParams`/`ExecSubAgentTaskResult`
types, the tRPC endpoint, the injected `RuntimeExecutorContext`/`AgentRuntimeServiceOptions`
callback, and tests. Group-mode `execGroupSubAgent*` identifiers are intentionally left
untouched. Prep for the server sub-agent suspend/resume work (LOBE-9763).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 20:11:59 +08:00
Arvin Xu 98d77166cc ♻️ refactor(agent-runtime): extract isParkedStatus / isBlockedStatus predicates
Replace the repeated `status === 'waiting_for_human' || ... === 'waiting_for_async_tool' || ... === 'interrupted'`
chains with named predicates so the parked/blocked semantics live in one place
(runtime step-loop break, completion lifecycle completedAt, executeSync pause,
operation isActive).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 18:01:56 +08:00
Arvin Xu 9821328d43 feat(agent-runtime): add waiting_for_async_tool parked state for deferred tools
Add a dedicated `waiting_for_async_tool` operation status that mirrors
`waiting_for_human` as a non-terminal, resumable pause, and migrate the
client-tool execution pause off `interrupted` onto it — so `interrupted`
once again means only user-initiated cancellation.

Also add the AgentOperationModel primitives the upcoming server sub-agent
bridge needs: queryByParentOperationId (reconcile child ops) and
tryResumeFromAsyncTool (atomic single-fire CAS).

Foundation for the server sub-agent suspend/resume mechanism (LOBE-9763).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-28 17:22:47 +08:00
16 changed files with 281 additions and 43 deletions
+6 -7
View File
@@ -17,6 +17,7 @@ import type {
ToolsCalling,
Usage,
} from '../types';
import { isBlockedStatus } from '../utils/status';
/**
* Simplified Agent Runtime - The "Engine" that executes instructions from an "Agent" (Brain).
@@ -197,7 +198,7 @@ export class AgentRuntime {
}
// Stop execution if blocked
if (currentState.status === 'waiting_for_human' || currentState.status === 'interrupted') {
if (isBlockedStatus(currentState.status)) {
break;
}
}
@@ -214,12 +215,10 @@ export class AgentRuntime {
return {
events: allEvents,
newState: currentState,
// When execution is blocked (waiting for human or interrupted),
// clear nextContext so the outer loop stops instead of continuing
nextContext:
currentState.status === 'waiting_for_human' || currentState.status === 'interrupted'
? undefined
: finalNextContext,
// When execution is blocked (waiting for human, waiting for an async
// tool result, or interrupted), clear nextContext so the outer loop
// stops instead of continuing
nextContext: isBlockedStatus(currentState.status) ? undefined : finalNextContext,
};
} catch (error) {
const errorState = structuredClone(state);
+8 -1
View File
@@ -113,7 +113,14 @@ export interface AgentState {
*/
securityBlacklist?: SecurityBlacklistConfig;
// --- State Machine ---
status: 'idle' | 'running' | 'waiting_for_human' | 'done' | 'error' | 'interrupted';
status:
| 'idle'
| 'running'
| 'waiting_for_human'
| 'waiting_for_async_tool'
| 'done'
| 'error'
| 'interrupted';
// --- Execution Tracking ---
/**
@@ -1,3 +1,4 @@
export * from './messageSelectors';
export * from './status';
export * from './stepContextComputer';
export * from './tokenCounter';
@@ -0,0 +1,43 @@
import { describe, expect, it } from 'vitest';
import type { AgentState } from '../types/state';
import { isBlockedStatus, isParkedStatus } from './status';
const ALL_STATUSES: AgentState['status'][] = [
'idle',
'running',
'waiting_for_human',
'waiting_for_async_tool',
'done',
'error',
'interrupted',
];
describe('isParkedStatus', () => {
it('is true only for the non-terminal resumable pauses', () => {
expect(isParkedStatus('waiting_for_human')).toBe(true);
expect(isParkedStatus('waiting_for_async_tool')).toBe(true);
});
it('is false for running, terminal, and interrupted', () => {
const nonParked = ALL_STATUSES.filter(
(s) => s !== 'waiting_for_human' && s !== 'waiting_for_async_tool',
);
for (const status of nonParked) expect(isParkedStatus(status)).toBe(false);
});
});
describe('isBlockedStatus', () => {
it('is true for parked statuses and user interrupt', () => {
expect(isBlockedStatus('waiting_for_human')).toBe(true);
expect(isBlockedStatus('waiting_for_async_tool')).toBe(true);
expect(isBlockedStatus('interrupted')).toBe(true);
});
it('is false for idle, running, and terminal statuses', () => {
expect(isBlockedStatus('idle')).toBe(false);
expect(isBlockedStatus('running')).toBe(false);
expect(isBlockedStatus('done')).toBe(false);
expect(isBlockedStatus('error')).toBe(false);
});
});
@@ -0,0 +1,19 @@
import type { AgentState } from '../types/state';
/**
* Parked statuses are non-terminal, resumable pauses: the operation is still
* alive but waiting on something out-of-band — human approval
* (`waiting_for_human`) or an async tool / sub-agent result
* (`waiting_for_async_tool`). They are deliberately distinct from `interrupted`
* (user cancel) and the terminal `done` / `error`, so the completion lifecycle
* never stamps `completedAt` and the scheduler keeps treating them as active.
*/
export const isParkedStatus = (status: AgentState['status']): boolean =>
status === 'waiting_for_human' || status === 'waiting_for_async_tool';
/**
* Blocked statuses halt the step loop — a parked pause or a user interrupt.
* `done` / `error` terminate through their own handling.
*/
export const isBlockedStatus = (status: AgentState['status']): boolean =>
isParkedStatus(status) || status === 'interrupted';
+52 -2
View File
@@ -34,13 +34,20 @@ export interface RecordOperationCompletionParams {
| 'interrupted'
| 'max_steps'
| 'cost_limit'
| 'waiting_for_human';
| 'waiting_for_human'
| 'waiting_for_async_tool';
cost?: Record<string, unknown> | null;
error?: AgentOperationError | null;
interruption?: AgentOperationInterruption | null;
llmCalls?: number | null;
processingTimeMs?: number | null;
status: 'running' | 'waiting_for_human' | 'done' | 'error' | 'interrupted';
status:
| 'running'
| 'waiting_for_human'
| 'waiting_for_async_tool'
| 'done'
| 'error'
| 'interrupted';
stepCount?: number | null;
toolCalls?: number | null;
totalCost?: number | null;
@@ -135,4 +142,47 @@ export class AgentOperationModel {
.limit(1);
return row ?? null;
}
/**
* Return every sub-operation spawned by a parent op. Used by the
* sub-agent completion bridge to reconcile which deferred tool calls have
* reached a terminal state when deciding whether to resume the parent.
*/
async queryByParentOperationId(parentOperationId: string) {
return this.db
.select({
completionReason: agentOperations.completionReason,
id: agentOperations.id,
metadata: agentOperations.metadata,
status: agentOperations.status,
})
.from(agentOperations)
.where(
and(
eq(agentOperations.parentOperationId, parentOperationId),
eq(agentOperations.userId, this.userId),
),
);
}
/**
* Atomically flip a parked parent op from `waiting_for_async_tool` back to
* `running`. Returns true only for the single winner (affected === 1) so
* concurrent sub-op completions that lose the race no-op instead of
* double-resuming the parent.
*/
async tryResumeFromAsyncTool(operationId: string): Promise<boolean> {
const rows = await this.db
.update(agentOperations)
.set({ status: 'running' })
.where(
and(
eq(agentOperations.id, operationId),
eq(agentOperations.userId, this.userId),
eq(agentOperations.status, 'waiting_for_async_tool'),
),
)
.returning({ id: agentOperations.id });
return rows.length === 1;
}
}
@@ -10,6 +10,7 @@ const operationStatuses = [
'idle',
'running',
'waiting_for_human',
'waiting_for_async_tool',
'done',
'error',
'interrupted',
@@ -22,6 +23,7 @@ const completionReasons = [
'max_steps',
'cost_limit',
'waiting_for_human',
'waiting_for_async_tool',
] as const;
export interface AgentOperationInterruption {
@@ -322,7 +322,7 @@ export class ResponsesService extends BaseService {
const usage = this.extractUsage(finalState);
const isClientToolInterrupt =
finalState.status === 'interrupted' &&
finalState.status === 'waiting_for_async_tool' &&
finalState.interruption?.reason === 'client_tool_execution';
return this.buildResponseObject({
@@ -753,7 +753,7 @@ export class ResponsesService extends BaseService {
// Determine if agent was interrupted for client tool execution
const isClientToolInterrupt =
finalState?.status === 'interrupted' &&
finalState?.status === 'waiting_for_async_tool' &&
finalState?.interruption?.reason === 'client_tool_execution';
if (isClientToolInterrupt) {
+7
View File
@@ -326,6 +326,13 @@ export type BuiltinStreaming = <A = any>(props: BuiltinStreamingProps<A>) => Rea
export interface BuiltinServerRuntimeOutput {
content: string;
/**
* When true, the tool executed a side-effect but its result is delivered
* out-of-band later (e.g. an async sub-agent). The agent runtime parks the
* operation instead of writing a tool_result, mirroring the client-tool
* pause path. The deferred result is filled in by a completion bridge.
*/
deferred?: boolean;
error?: any;
state?: any;
success: boolean;
@@ -23,12 +23,18 @@ const STREAM_END_STATUSES = new Set<AgentState['status']>([
'error',
'interrupted',
'waiting_for_human',
'waiting_for_async_tool',
]);
const hasEnteredStreamEndState = (
previousStatus?: AgentState['status'],
nextStatus?: AgentState['status'],
): nextStatus is 'done' | 'error' | 'interrupted' | 'waiting_for_human' => {
): nextStatus is
| 'done'
| 'error'
| 'interrupted'
| 'waiting_for_human'
| 'waiting_for_async_tool' => {
const wasStreamEnd = previousStatus ? STREAM_END_STATUSES.has(previousStatus) : false;
return Boolean(nextStatus && STREAM_END_STATUSES.has(nextStatus) && !wasStreamEnd);
};
@@ -127,7 +133,11 @@ export class AgentRuntimeCoordinator {
*/
private async resolveUiMessages(state: AgentState): Promise<UIChatMessage[] | undefined> {
if (!this.uiMessagesResolver) return undefined;
if (state.status === 'interrupted') return undefined;
// `waiting_for_async_tool` is the deferred-tool pause (client tool or
// sub-agent): the result message is written back later, so don't push a
// premature SoT snapshot that would clobber the client's in-memory state.
if (state.status === 'interrupted' || state.status === 'waiting_for_async_tool')
return undefined;
try {
return await this.uiMessagesResolver(state);
} catch (error) {
@@ -1799,7 +1799,7 @@ export const createRuntimeExecutors = (
const newState = structuredClone(state);
newState.lastModified = new Date().toISOString();
newState.status = 'interrupted';
newState.status = 'waiting_for_async_tool';
newState.interruption = {
canResume: true,
interruptedAt: new Date().toISOString(),
@@ -1937,6 +1937,7 @@ export const createRuntimeExecutors = (
activeDeviceId: state.metadata?.activeDeviceId,
agentId: state.metadata?.agentId,
documentId: state.metadata?.documentId,
execSubAgentTask: ctx.execSubAgentTask,
executionTimeoutMs: timeoutMs,
groupId: state.metadata?.groupId,
memoryToolPermission: agentConfig?.chatConfig?.memory?.toolPermission,
@@ -1971,6 +1972,42 @@ export const createRuntimeExecutors = (
);
}
// Deferred tool (e.g. async sub-agent): the executor performed its
// side-effect and created a pending placeholder; the real result is
// delivered out-of-band later by a completion bridge. Park like a
// client tool — surface the pending call, hold it in pendingToolsCalling,
// and do not write a tool_result now.
if (execution.result.deferred) {
log(`[${operationLogId}] Tool ${toolName} deferred; parking for async result`);
await streamManager.publishStreamChunk(operationId, stepIndex, {
chunkType: 'tools_calling',
toolsCalling: [chatToolPayload] as any,
});
executeToolSpan.setAttributes(
buildExecuteToolResultAttributes({ attempts: execution.attempts, success: true }),
);
const newState = structuredClone(state);
newState.lastModified = new Date().toISOString();
newState.status = 'waiting_for_async_tool';
newState.interruption = {
canResume: true,
interruptedAt: new Date().toISOString(),
reason: 'async_tool',
};
newState.pendingToolsCalling = [chatToolPayload];
return {
events: [
{
canResume: true,
interruptedAt: new Date().toISOString(),
reason: 'async_tool',
type: 'interrupted',
},
],
newState,
};
}
const executionResult = await archiveRuntimeToolResult(execution.result, {
agentId: state.metadata?.agentId,
identifier: chatToolPayload.identifier,
@@ -2293,7 +2330,7 @@ export const createRuntimeExecutors = (
const newState = structuredClone(state);
newState.lastModified = new Date().toISOString();
newState.status = 'interrupted';
newState.status = 'waiting_for_async_tool';
newState.interruption = {
canResume: true,
interruptedAt: new Date().toISOString(),
@@ -2317,6 +2354,9 @@ export const createRuntimeExecutors = (
// Track all tool message IDs created during execution
const toolMessageIds: string[] = [];
const toolResults: any[] = [];
// Deferred (async) tools whose result is delivered out-of-band later;
// collected here so the batch parks for them after server tools finish.
const deferredTools: ChatToolPayload[] = [];
// Execute server tools concurrently (skip client tools in mixed batch)
const toolsToExecute = serverTools.length > 0 ? serverTools : toolsCalling;
@@ -2469,6 +2509,7 @@ export const createRuntimeExecutors = (
activeDeviceId: state.metadata?.activeDeviceId,
agentId: state.metadata?.agentId,
documentId: state.metadata?.documentId,
execSubAgentTask: ctx.execSubAgentTask,
executionTimeoutMs: timeoutMs,
groupId: state.metadata?.groupId,
memoryToolPermission: batchAgentConfig?.chatConfig?.memory?.toolPermission,
@@ -2494,6 +2535,18 @@ export const createRuntimeExecutors = (
);
}
// Deferred (async) tool: executor created a pending placeholder and
// the real result arrives out-of-band. Skip the tool_result write;
// the batch parks for it after all server tools settle.
if (execution.result.deferred) {
log(`[${operationLogId}] Tool ${toolName} deferred; will park after batch`);
deferredTools.push(chatToolPayload);
batchExecuteToolSpan.setAttributes(
buildExecuteToolResultAttributes({ attempts: execution.attempts, success: true }),
);
return;
}
const executionResult = await archiveRuntimeToolResult(execution.result, {
agentId: state.metadata?.agentId,
identifier: chatToolPayload.identifier,
@@ -2748,24 +2801,31 @@ export const createRuntimeExecutors = (
// Get the last tool message ID as parentMessageId for next LLM call
const lastToolMessageId = toolMessageIds.at(-1);
// If there are remaining client tools in a mixed batch, interrupt after server tools
if (clientTools.length > 0) {
// Park if any tools still owe an out-of-band result: client tools (run on
// the client) and/or deferred async tools (e.g. sub-agents). The operation
// resumes once every pending tool's result is delivered.
const pendingTools = [...deferredTools, ...clientTools];
if (pendingTools.length > 0) {
// Prefer the async-tool reason when any deferred tool is present; the
// individual pending payloads still carry their own identity for the
// resume gate.
const pauseReason = deferredTools.length > 0 ? 'async_tool' : 'client_tool_execution';
log(
`[${operationLogId}][call_tools_batch] Mixed batch: ${serverTools.length} server tools done, pausing for ${clientTools.length} client tools`,
`[${operationLogId}][call_tools_batch] Pausing after ${serverTools.length} server tools: ${deferredTools.length} deferred + ${clientTools.length} client`,
);
await streamManager.publishStreamChunk(operationId, stepIndex, {
chunkType: 'tools_calling',
toolsCalling: clientTools as any,
toolsCalling: pendingTools as any,
});
newState.status = 'interrupted';
newState.status = 'waiting_for_async_tool';
newState.interruption = {
canResume: true,
interruptedAt: new Date().toISOString(),
reason: 'client_tool_execution',
reason: pauseReason,
};
newState.pendingToolsCalling = clientTools;
newState.pendingToolsCalling = pendingTools;
return {
events: [
@@ -2773,7 +2833,7 @@ export const createRuntimeExecutors = (
{
canResume: true,
interruptedAt: new Date().toISOString(),
reason: 'client_tool_execution',
reason: pauseReason,
type: 'interrupted',
},
],
@@ -4,7 +4,12 @@ import type {
AgentState,
GeneralAgentConfig,
} from '@lobechat/agent-runtime';
import { AgentRuntime, findInMessages, GeneralChatAgent } from '@lobechat/agent-runtime';
import {
AgentRuntime,
findInMessages,
GeneralChatAgent,
isParkedStatus,
} from '@lobechat/agent-runtime';
import type { ISnapshotStore } from '@lobechat/agent-tracing';
import { dynamicInterventionAudits } from '@lobechat/builtin-tools/dynamicInterventionAudits';
import { getModelPropertyWithFallback } from '@lobechat/model-runtime';
@@ -1183,7 +1188,7 @@ export class AgentRuntimeService {
},
executionHistory: executionHistory?.slice(0, historyLimit),
hasError: currentState.status === 'error',
isActive: ['running', 'waiting_for_human'].includes(currentState.status),
isActive: currentState.status === 'running' || isParkedStatus(currentState.status),
isCompleted: currentState.status === 'done',
metadata: operationMetadata,
needsHumanInput: currentState.status === 'waiting_for_human',
@@ -1585,6 +1590,9 @@ export class AgentRuntimeService {
// Needs human intervention
if (state.status === 'waiting_for_human') return false;
// Parked waiting for an async tool result (client tool / sub-agent)
if (state.status === 'waiting_for_async_tool') return false;
// Error occurred
if (state.status === 'error') return false;
@@ -1649,6 +1657,7 @@ export class AgentRuntimeService {
if (state.status === 'error') return 'error';
if (state.status === 'interrupted') return 'interrupted';
if (state.status === 'waiting_for_human') return 'waiting_for_human';
if (state.status === 'waiting_for_async_tool') return 'waiting_for_async_tool';
if (state.maxSteps && state.stepCount >= state.maxSteps) return 'max_steps';
if (state.costLimit && state.cost?.total >= state.costLimit.maxTotalCost) return 'cost_limit';
return 'done';
@@ -1722,9 +1731,11 @@ export class AgentRuntimeService {
break;
}
// Check if human intervention is needed
if (state.status === 'waiting_for_human') {
log('[%s] Sync execution paused: waiting for human intervention', operationId);
// Parked on a pause (human intervention or an async tool / sub-agent
// result) — the result is delivered out-of-band, so sync execution
// can't resume it
if (isParkedStatus(state.status)) {
log('[%s] Sync execution paused: %s', operationId, state.status);
break;
}
@@ -1,3 +1,4 @@
import { isParkedStatus } from '@lobechat/agent-runtime';
import debug from 'debug';
import {
@@ -65,10 +66,12 @@ export class CompletionLifecycle {
/**
* Map a completion reason to the terminal `agent_operations.status` value.
* `waiting_for_human` keeps `status='waiting_for_human'` so analytics can
* distinguish paused ops from terminal ones.
* `waiting_for_human` / `waiting_for_async_tool` keep their own status so
* analytics can distinguish paused ops from terminal ones.
*/
private statusForReason(reason: string): 'done' | 'error' | 'interrupted' | 'waiting_for_human' {
private statusForReason(
reason: string,
): 'done' | 'error' | 'interrupted' | 'waiting_for_human' | 'waiting_for_async_tool' {
switch (reason) {
case 'error': {
return 'error';
@@ -79,6 +82,9 @@ export class CompletionLifecycle {
case 'waiting_for_human': {
return 'waiting_for_human';
}
case 'waiting_for_async_tool': {
return 'waiting_for_async_tool';
}
default: {
return 'done';
}
@@ -92,7 +98,10 @@ export class CompletionLifecycle {
*/
private async persistCompletion(operationId: string, state: any, reason: string): Promise<void> {
const completionReason: any =
reason === 'max_steps' || reason === 'cost_limit' || reason === 'waiting_for_human'
reason === 'max_steps' ||
reason === 'cost_limit' ||
reason === 'waiting_for_human' ||
reason === 'waiting_for_async_tool'
? reason
: this.statusForReason(reason);
@@ -107,11 +116,10 @@ export class CompletionLifecycle {
: null;
const status = this.statusForReason(reason);
// `waiting_for_human` is a pause, not a true terminal state — leave
// completedAt null so analytics doesn't read a paused op as completed.
// The next dispatchHooks call (when the human resumes and the op truly
// ends) will overwrite both fields.
const completedAt = status === 'waiting_for_human' ? undefined : new Date();
// Parked statuses are pauses, not true terminal states — leave completedAt
// null so analytics doesn't read a paused op as completed. The next
// dispatchHooks call (when the op resumes and truly ends) overwrites both.
const completedAt = isParkedStatus(status) ? undefined : new Date();
try {
await this.agentOperationModel.recordCompletion(operationId, {
+2 -1
View File
@@ -112,7 +112,8 @@ export type StepCompletionReason =
| 'interrupted'
| 'max_steps'
| 'cost_limit'
| 'waiting_for_human';
| 'waiting_for_human'
| 'waiting_for_async_tool';
// ==================== Execution Params ====================
+17 -1
View File
@@ -1,6 +1,10 @@
import { type LobeToolManifest } from '@lobechat/context-engine';
import { type LobeChatDatabase } from '@lobechat/database';
import { type ChatToolPayload, type ClientSecretPayload } from '@lobechat/types';
import {
type ChatToolPayload,
type ClientSecretPayload,
type ExecSubAgentTaskParams,
} from '@lobechat/types';
export interface ToolExecutionMemoryEmbeddingRuntime {
/** Embedding model id used by the memory search runtime. */
@@ -18,6 +22,12 @@ export interface ToolExecutionContext {
agentId?: string;
/** Current page document ID for page-scoped conversations */
documentId?: string | null;
/**
* Spawn a sub-agent as an independent async operation. Injected by the agent
* runtime (forwarded from `RuntimeExecutorContext.execSubAgentTask`) so the
* `callSubAgent` server tool can fork a child op without a circular import.
*/
execSubAgentTask?: (params: ExecSubAgentTaskParams) => Promise<unknown>;
/** Per-call execution timeout resolved by the agent runtime. */
executionTimeoutMs?: number;
/** Current group ID for group chat context */
@@ -65,6 +75,12 @@ export interface ToolExecutionContext {
export interface ToolExecutionResult {
content: string;
/**
* When true, the result is delivered out-of-band later (e.g. an async
* sub-agent). The agent runtime parks the operation instead of writing a
* tool_result. Mirrors the client-tool pause path.
*/
deferred?: boolean;
error?: any;
state?: Record<string, any>;
success: boolean;
@@ -551,10 +551,14 @@ export const createGatewayEventHandler = (
action: 'gateway/agent_runtime_end',
context,
});
} else if (data?.reason === 'interrupted' && hasStreamedContent) {
// MID-stream cancel. The server's
} else if (
(data?.reason === 'interrupted' || data?.reason === 'waiting_for_async_tool') &&
hasStreamedContent
) {
// MID-stream cancel, or a deferred-tool pause
// (`waiting_for_async_tool`). The server's
// `AgentRuntimeCoordinator.resolveUiMessages` omits uiMessages
// for status='interrupted' precisely so we can preserve the
// for both statuses precisely so we can preserve the
// in-memory streamed content here. The executor's partial-
// finalize catch writes the real content to DB asynchronously,
// but it may not be durable yet — refetching here would race