From af79c8a24f2128af89ac45864f39f21975d71153 Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Sat, 13 Jun 2026 21:07:43 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor(chat):=20expose?= =?UTF-8?q?=20unified=20agent=20run=20lifecycle=20dispatcher?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../__tests__/agentRunLifecycle.test.ts | 40 +++++++------ .../aiChat/actions/__tests__/gateway.test.ts | 19 ++++-- .../aiChat/actions/agentRunLifecycle.ts | 59 ++++++++++++++++--- .../aiChat/actions/conversationLifecycle.ts | 8 ++- .../chat/slices/aiChat/actions/gateway.ts | 14 ++--- .../aiChat/actions/gatewayEventHandler.ts | 20 ++++--- .../actions/heterogeneousAgentExecutor.ts | 5 +- .../aiChat/actions/streamingExecutor.ts | 8 ++- 8 files changed, 120 insertions(+), 53 deletions(-) diff --git a/src/store/chat/slices/aiChat/actions/__tests__/agentRunLifecycle.test.ts b/src/store/chat/slices/aiChat/actions/__tests__/agentRunLifecycle.test.ts index b201b18c0f..ed5abf4612 100644 --- a/src/store/chat/slices/aiChat/actions/__tests__/agentRunLifecycle.test.ts +++ b/src/store/chat/slices/aiChat/actions/__tests__/agentRunLifecycle.test.ts @@ -4,12 +4,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'; import { topicService } from '@/services/topic'; import { emitClientAgentSignalSourceEvent } from '@/store/chat/slices/aiChat/actions/agentSignalBridge'; -import { - completeAgentRunLifecycle, - completeAgentRunOperationLifecycle, - runAgentRunEventLifecycle, - startAgentRunLifecycle, -} from '../agentRunLifecycle'; +import { runAgentRunLifecycle } from '../agentRunLifecycle'; const desktopEnv = vi.hoisted(() => ({ isDesktop: false })); const desktopNotificationMock = vi.hoisted(() => ({ @@ -102,7 +97,7 @@ const createStore = (overrides: Record = {}) => { } as any; }; -describe('completeAgentRunLifecycle', () => { +describe('runAgentRunLifecycle', () => { beforeEach(() => { vi.clearAllMocks(); desktopEnv.isDesktop = false; @@ -114,7 +109,7 @@ describe('completeAgentRunLifecycle', () => { const store = createStore(); const get = vi.fn(() => store); - const result = await completeAgentRunLifecycle({ + const result = await runAgentRunLifecycle({ afterRunComplete: [ () => { store.events.push('afterRunComplete'); @@ -130,6 +125,7 @@ describe('completeAgentRunLifecycle', () => { context: { agentId: 'agent-1', topicId: 'topic-1' } as any, get, operationId: 'op-1', + phase: 'runComplete', queueDrainDelayMs: 0, runtimeType: 'client', status: 'completed', @@ -173,7 +169,7 @@ describe('completeAgentRunLifecycle', () => { const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {}); const store = createStore(); - await completeAgentRunLifecycle({ + await runAgentRunLifecycle({ beforeRunComplete: [ async () => { throw new Error('metadata write failed'); @@ -182,6 +178,7 @@ describe('completeAgentRunLifecycle', () => { context: { agentId: 'agent-1', topicId: 'topic-1' } as any, get: () => store, operationId: 'op-1', + phase: 'runComplete', queueDrainDelayMs: 0, runtimeType: 'heterogeneous', status: 'completed', @@ -201,18 +198,20 @@ describe('completeAgentRunLifecycle', () => { it('does not drain queued messages for failed or cancelled terminal states', async () => { const store = createStore(); - await completeAgentRunLifecycle({ + await runAgentRunLifecycle({ context: { agentId: 'agent-1', topicId: 'topic-1' } as any, get: () => store, operationId: 'op-1', + phase: 'runComplete', runtimeType: 'gateway', status: 'failed', }); - await completeAgentRunLifecycle({ + await runAgentRunLifecycle({ context: { agentId: 'agent-1', topicId: 'topic-1' } as any, get: () => store, operationId: 'op-1', + phase: 'runComplete', runtimeType: 'gateway', status: 'cancelled', }); @@ -221,17 +220,19 @@ describe('completeAgentRunLifecycle', () => { expect(store.drainQueuedMessages).not.toHaveBeenCalled(); }); - it('emits client runtime start from the start lifecycle only for client runs', () => { - startAgentRunLifecycle({ + it('emits client runtime start from the start lifecycle only for client runs', async () => { + await runAgentRunLifecycle({ context: { agentId: 'agent-1', threadId: 'thread-1', topicId: 'topic-1' } as any, operationId: 'op-1', parentMessageId: 'user-1', parentMessageType: 'user', + phase: 'runStart', runtimeType: 'client', }); - startAgentRunLifecycle({ + await runAgentRunLifecycle({ context: { agentId: 'agent-1', topicId: 'topic-1' } as any, operationId: 'op-2', + phase: 'runStart', runtimeType: 'gateway', }); @@ -249,13 +250,14 @@ describe('completeAgentRunLifecycle', () => { ); }); - it('emits gateway event signals from the event lifecycle', () => { - runAgentRunEventLifecycle({ + it('emits gateway event signals from the event lifecycle', async () => { + await runAgentRunLifecycle({ anchorMessageId: 'asst-1', assistantMessageId: 'asst-1', context: { agentId: 'agent-1', topicId: 'topic-1' } as any, eventType: 'stream_start', operationId: 'op-1', + phase: 'runEvent', runtimeType: 'gateway', stepIndex: 2, }); @@ -278,11 +280,12 @@ describe('completeAgentRunLifecycle', () => { const onComplete = vi.fn(); const store = createStore(); - await completeAgentRunOperationLifecycle({ + await runAgentRunLifecycle({ context: { agentId: 'agent-1', groupId: 'group-1', topicId: 'topic-1' } as any, get: () => store, onComplete, operationId: 'op-1', + phase: 'operationComplete', runtimeType: 'gateway', }); @@ -313,12 +316,13 @@ describe('completeAgentRunLifecycle', () => { }, }); - await completeAgentRunLifecycle({ + await runAgentRunLifecycle({ assistantMessageId: 'asst-1', context: { agentId: 'agent-1', topicId: 'topic-1' } as any, drainQueuedMessages: false, get: () => store, operationId: 'op-1', + phase: 'runComplete', runtimeType: 'client', status: 'completed', }); diff --git a/src/store/chat/slices/aiChat/actions/__tests__/gateway.test.ts b/src/store/chat/slices/aiChat/actions/__tests__/gateway.test.ts index f86e61309c..f8ea5c4812 100644 --- a/src/store/chat/slices/aiChat/actions/__tests__/gateway.test.ts +++ b/src/store/chat/slices/aiChat/actions/__tests__/gateway.test.ts @@ -6,17 +6,22 @@ import type * as ConstVersion from '@/const/version'; import { aiAgentService } from '@/services/aiAgent'; import { messageService } from '@/services/message'; import type * as AgentRunLifecycleModule from '@/store/chat/slices/aiChat/actions/agentRunLifecycle'; -import { runAfterUserMessagePersistedLifecycle } from '@/store/chat/slices/aiChat/actions/agentRunLifecycle'; import type { GatewayConnection } from '../gateway'; import { GatewayActionImpl } from '../gateway'; +const postPersistLifecycleMock = vi.hoisted(() => vi.fn().mockResolvedValue(undefined)); + vi.mock('@/store/chat/slices/aiChat/actions/agentRunLifecycle', async (importOriginal) => { const actual = await importOriginal(); return { ...actual, - runAfterUserMessagePersistedLifecycle: vi.fn().mockResolvedValue(undefined), + runAgentRunLifecycle: vi.fn((event: AgentRunLifecycleModule.AgentRunLifecycleEvent) => { + if (event.phase === 'afterUserMessagePersisted') return postPersistLifecycleMock(event); + + return actual.runAgentRunLifecycle(event as any); + }), }; }); @@ -131,7 +136,7 @@ describe('GatewayActionImpl', () => { beforeEach(() => { vi.clearAllMocks(); vi.mocked(messageService.getMessages).mockResolvedValue([]); - vi.mocked(runAfterUserMessagePersistedLifecycle).mockResolvedValue(undefined); + postPersistLifecycleMock.mockResolvedValue(undefined); }); afterEach(() => { @@ -532,12 +537,13 @@ describe('GatewayActionImpl', () => { }); await vi.waitFor(() => { - expect(runAfterUserMessagePersistedLifecycle).toHaveBeenCalledWith( + expect(postPersistLifecycleMock).toHaveBeenCalledWith( expect.objectContaining({ agentId: 'agent-1', assistantMessageId: 'ast-1', isCreateNewTopic: false, messages: [], + phase: 'afterUserMessagePersisted', topicId: 'topic-1', }), ); @@ -555,7 +561,7 @@ describe('GatewayActionImpl', () => { refreshTopic.mockImplementation(async () => { events.push('refreshTopic'); }); - vi.mocked(runAfterUserMessagePersistedLifecycle).mockImplementation(async () => { + postPersistLifecycleMock.mockImplementation(async () => { events.push('postPersistLifecycle'); }); vi.mocked(messageService.getMessages).mockResolvedValueOnce(persistedMessages); @@ -582,12 +588,13 @@ describe('GatewayActionImpl', () => { await vi.waitFor(() => { expect(events).toEqual(['refreshTopic', 'postPersistLifecycle']); }); - expect(runAfterUserMessagePersistedLifecycle).toHaveBeenCalledWith( + expect(postPersistLifecycleMock).toHaveBeenCalledWith( expect.objectContaining({ agentId: 'agent-1', assistantMessageId: 'ast-1', isCreateNewTopic: true, messages: persistedMessages, + phase: 'afterUserMessagePersisted', topicId: 'topic-new', }), ); diff --git a/src/store/chat/slices/aiChat/actions/agentRunLifecycle.ts b/src/store/chat/slices/aiChat/actions/agentRunLifecycle.ts index 85c24b244a..ab20a055e6 100644 --- a/src/store/chat/slices/aiChat/actions/agentRunLifecycle.ts +++ b/src/store/chat/slices/aiChat/actions/agentRunLifecycle.ts @@ -71,7 +71,7 @@ export interface CompleteAgentRunOperationLifecycleParams { runtimeType: AgentRunRuntimeType; } -interface AfterUserMessagePersistedParams { +export interface AfterUserMessagePersistedLifecycleParams { agentId: string; assistantMessageId: string; get: () => ChatStore; @@ -80,6 +80,51 @@ interface AfterUserMessagePersistedParams { topicId?: string | null; } +export type AgentRunLifecycleEvent = + | ({ phase: 'afterUserMessagePersisted' } & AfterUserMessagePersistedLifecycleParams) + | ({ phase: 'operationComplete' } & CompleteAgentRunOperationLifecycleParams) + | ({ phase: 'runComplete' } & CompleteAgentRunLifecycleParams) + | ({ phase: 'runEvent' } & AgentRunEventLifecycleParams) + | ({ phase: 'runStart' } & StartAgentRunLifecycleParams); + +type AgentRunLifecycleNonCompleteEvent = Exclude; + +export function runAgentRunLifecycle( + event: { phase: 'runComplete' } & CompleteAgentRunLifecycleParams, +): Promise; +export function runAgentRunLifecycle(event: AgentRunLifecycleNonCompleteEvent): Promise; +export async function runAgentRunLifecycle( + event: AgentRunLifecycleEvent, +): Promise { + switch (event.phase) { + case 'afterUserMessagePersisted': { + const { phase: _, ...params } = event; + return runAfterUserMessagePersistedLifecycle(params); + } + + case 'operationComplete': { + const { phase: _, ...params } = event; + return completeAgentRunOperationLifecycle(params); + } + + case 'runComplete': { + const { phase: _, ...params } = event; + return completeAgentRunLifecycle(params); + } + + case 'runEvent': { + const { phase: _, ...params } = event; + runAgentRunEventLifecycle(params); + return; + } + + case 'runStart': { + const { phase: _, ...params } = event; + startAgentRunLifecycle(params); + } + } +} + const runCallbacks = async ( phase: string, callbacks: AgentRunLifecycleCallback[] | undefined, @@ -149,7 +194,7 @@ const drainQueuedMessagesAfterComplete = ({ return remainingQueued.length; }; -export const startAgentRunLifecycle = ({ +const startAgentRunLifecycle = ({ context, operationId, parentMessageId, @@ -173,7 +218,7 @@ export const startAgentRunLifecycle = ({ }); }; -export const runAgentRunEventLifecycle = ({ +const runAgentRunEventLifecycle = ({ anchorMessageId, assistantMessageId, context, @@ -307,7 +352,7 @@ const runClientCompleteLifecycle = async ({ } }; -export const completeAgentRunLifecycle = async ({ +const completeAgentRunLifecycle = async ({ afterRunComplete, anchorMessageId, assistantMessageId, @@ -379,7 +424,7 @@ export const completeAgentRunLifecycle = async ({ return { contextKey, queuedMessageCount }; }; -export const completeAgentRunOperationLifecycle = async ({ +const completeAgentRunOperationLifecycle = async ({ context, get, onComplete, @@ -426,14 +471,14 @@ const applyTopicTitle = async ( console.info('[dev] sliced topic title (NEXT_PUBLIC_DEV_DISABLE_AUTO_TOPIC=1):', title); }; -export const runAfterUserMessagePersistedLifecycle = async ({ +const runAfterUserMessagePersistedLifecycle = async ({ agentId, assistantMessageId, get, isCreateNewTopic, messages, topicId, -}: AfterUserMessagePersistedParams): Promise => { +}: AfterUserMessagePersistedLifecycleParams): Promise => { if (!topicId) return; if (isCreateNewTopic) { diff --git a/src/store/chat/slices/aiChat/actions/conversationLifecycle.ts b/src/store/chat/slices/aiChat/actions/conversationLifecycle.ts index 3b5b071724..7173d3e814 100644 --- a/src/store/chat/slices/aiChat/actions/conversationLifecycle.ts +++ b/src/store/chat/slices/aiChat/actions/conversationLifecycle.ts @@ -40,7 +40,7 @@ import { } from '@/store/agent/selectors'; import { agentGroupByIdSelectors, getChatGroupStoreState } from '@/store/agentGroup'; import { selectRuntimeType } from '@/store/chat/slices/aiChat/actions/agentDispatcher'; -import { runAfterUserMessagePersistedLifecycle } from '@/store/chat/slices/aiChat/actions/agentRunLifecycle'; +import { runAgentRunLifecycle } from '@/store/chat/slices/aiChat/actions/agentRunLifecycle'; import { resolveHeteroResume } from '@/store/chat/slices/aiChat/actions/heteroResume'; import { dispatchNonHeteroSubAgent } from '@/store/chat/slices/aiChat/actions/nonHeteroSubAgentDispatcher'; import { PortalViewType } from '@/store/chat/slices/portal/initialState'; @@ -645,12 +645,13 @@ export class ConversationLifecycleActionImpl { { operationId }, ); - runAfterUserMessagePersistedLifecycle({ + runAgentRunLifecycle({ agentId, assistantMessageId: heteroData.assistantMessageId, get: this.#get, isCreateNewTopic: heteroData.isCreateNewTopic, messages: heteroMessages, + phase: 'afterUserMessagePersisted', topicId: heteroData.topicId ?? heteroContext.topicId, }).catch(console.error); @@ -992,12 +993,13 @@ export class ConversationLifecycleActionImpl { if (data.topicId) this.#get().internal_updateTopicLoading(data.topicId, true); - runAfterUserMessagePersistedLifecycle({ + runAgentRunLifecycle({ agentId, assistantMessageId: data.assistantMessageId, get: this.#get, isCreateNewTopic: data.isCreateNewTopic, messages: data.messages, + phase: 'afterUserMessagePersisted', topicId: data.topicId, }).catch(console.error); diff --git a/src/store/chat/slices/aiChat/actions/gateway.ts b/src/store/chat/slices/aiChat/actions/gateway.ts index 97f8470cad..b9e424c65e 100644 --- a/src/store/chat/slices/aiChat/actions/gateway.ts +++ b/src/store/chat/slices/aiChat/actions/gateway.ts @@ -23,10 +23,7 @@ import type { ChatStore } from '@/store/chat/store'; import type { StoreSetter } from '@/store/types'; import { useUserStore } from '@/store/user'; -import { - completeAgentRunOperationLifecycle, - runAfterUserMessagePersistedLifecycle, -} from './agentRunLifecycle'; +import { runAgentRunLifecycle } from './agentRunLifecycle'; import { createGatewayEventHandler } from './gatewayEventHandler'; /** @@ -456,12 +453,13 @@ export class GatewayActionImpl { void refreshCreatedTopic .then(() => - runAfterUserMessagePersistedLifecycle({ + runAgentRunLifecycle({ agentId: context.agentId, assistantMessageId: result.assistantMessageId, get: this.#get, isCreateNewTopic, messages: persistedMessagesForLifecycle, + phase: 'afterUserMessagePersisted', topicId: result.topicId, }), ) @@ -553,11 +551,12 @@ export class GatewayActionImpl { gatewayUrl: agentGatewayUrl, onEvent: eventHandler, onOperationComplete: () => { - void completeAgentRunOperationLifecycle({ + void runAgentRunLifecycle({ context: execContext, get: this.#get, onComplete, operationId: gatewayOpId, + phase: 'operationComplete', runtimeType: 'gateway', }); }, @@ -665,10 +664,11 @@ export class GatewayActionImpl { gatewayUrl: agentGatewayUrl, onEvent: eventHandler, onOperationComplete: () => { - void completeAgentRunOperationLifecycle({ + void runAgentRunLifecycle({ context, get: this.#get, operationId: gatewayOpId, + phase: 'operationComplete', runtimeType: 'gateway', }); }, diff --git a/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts b/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts index 9669b7b7e4..882524e7f1 100644 --- a/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts +++ b/src/store/chat/slices/aiChat/actions/gatewayEventHandler.ts @@ -21,7 +21,7 @@ import { notifyDesktopHumanApprovalRequired } from '@/store/chat/utils/desktopNo import { addUsageToOperationMetrics, type OperationUsageLike } from '@/utils/operationUsageMetrics'; import type { AgentRunLifecycleCallback, AgentRunRuntimeType } from './agentRunLifecycle'; -import { completeAgentRunLifecycle, runAgentRunEventLifecycle } from './agentRunLifecycle'; +import { runAgentRunLifecycle } from './agentRunLifecycle'; // Lazy-loaded to break the import cycle: // gateway.ts → gatewayEventHandler.ts → executors/index.ts (which pulls in @@ -337,12 +337,13 @@ export const createGatewayEventHandler = ( } } - runAgentRunEventLifecycle({ + void runAgentRunLifecycle({ anchorMessageId: currentAssistantMessageId, assistantMessageId: currentAssistantMessageId, context, eventType: 'stream_start', operationId, + phase: 'runEvent', runtimeType, stepIndex: event.stepIndex, }); @@ -517,10 +518,11 @@ export const createGatewayEventHandler = ( // Refresh on execution_complete to ensure final step state is consistent if (data?.phase === 'execution_complete') { enqueue(async () => { - runAgentRunEventLifecycle({ + void runAgentRunLifecycle({ context, eventType: 'step_complete', operationId, + phase: 'runEvent', runtimeType, stepIndex: event.stepIndex, }); @@ -538,12 +540,13 @@ export const createGatewayEventHandler = ( ? 'cancelled' : 'completed'; - runAgentRunEventLifecycle({ + void runAgentRunLifecycle({ anchorMessageId: currentAssistantMessageId, assistantMessageId: currentAssistantMessageId, context, eventType: 'runtime_end', operationId, + phase: 'runEvent', runtimeType, }); get().internal_toggleToolCallingStreaming(currentAssistantMessageId, undefined); @@ -584,7 +587,7 @@ export const createGatewayEventHandler = ( await fetchAndReplaceMessages(get, context).catch(console.error); } - await completeAgentRunLifecycle({ + await runAgentRunLifecycle({ afterRunComplete: params.afterRunComplete, anchorMessageId: currentAssistantMessageId, assistantMessageId: currentAssistantMessageId, @@ -593,6 +596,7 @@ export const createGatewayEventHandler = ( drainQueuedMessages: params.drainQueuedMessages, get, operationId, + phase: 'runComplete', runtimeType, status: terminalStatus, }); @@ -614,11 +618,12 @@ export const createGatewayEventHandler = ( const messageError = toChatMessageError(event.data); const errorMessage = messageError.message; - runAgentRunEventLifecycle({ + void runAgentRunLifecycle({ context, errorMessage, eventType: 'error', operationId, + phase: 'runEvent', runtimeType, }); @@ -655,7 +660,7 @@ export const createGatewayEventHandler = ( dispatchContext, ); - await completeAgentRunLifecycle({ + await runAgentRunLifecycle({ afterRunComplete: params.afterRunComplete, anchorMessageId: currentAssistantMessageId, assistantMessageId: currentAssistantMessageId, @@ -664,6 +669,7 @@ export const createGatewayEventHandler = ( drainQueuedMessages: false, get, operationId, + phase: 'runComplete', runtimeType, status: 'failed', }); diff --git a/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts b/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts index 0c26f8e21e..6a1153f978 100644 --- a/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts +++ b/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts @@ -40,7 +40,7 @@ import { resolveNotificationNavigatePath } from '@/store/chat/utils/desktopNotif import { markdownToTxt } from '@/utils/markdownToTxt'; import { addUsageToOperationMetrics } from '@/utils/operationUsageMetrics'; -import { completeAgentRunLifecycle } from './agentRunLifecycle'; +import { runAgentRunLifecycle } from './agentRunLifecycle'; import { createGatewayEventHandler } from './gatewayEventHandler'; /** Mirrors `idGenerator('threads', 16)` on the server so sync-allocated ids have the same shape. */ @@ -472,13 +472,14 @@ export const executeHeterogeneousAgent = async ( { operationId }, ); - await completeAgentRunLifecycle({ + await runAgentRunLifecycle({ anchorMessageId: mainState.currentAssistantId, assistantMessageId: mainState.currentAssistantId, context, drainQueuedMessages: false, get, operationId, + phase: 'runComplete', runtimeType: 'heterogeneous', status: 'failed', }); diff --git a/src/store/chat/slices/aiChat/actions/streamingExecutor.ts b/src/store/chat/slices/aiChat/actions/streamingExecutor.ts index 79e8fe5497..60721e916f 100644 --- a/src/store/chat/slices/aiChat/actions/streamingExecutor.ts +++ b/src/store/chat/slices/aiChat/actions/streamingExecutor.ts @@ -52,7 +52,7 @@ import { selectActivatedToolIdsFromMessages, selectTodosFromMessages, } from '../../message/selectors/dbMessage'; -import { completeAgentRunLifecycle, startAgentRunLifecycle } from './agentRunLifecycle'; +import { runAgentRunLifecycle } from './agentRunLifecycle'; const log = debug('lobe-store:streaming-executor'); @@ -536,11 +536,12 @@ export class StreamingExecutorActionImpl { originalMessages.length, disableTools, ); - startAgentRunLifecycle({ + void runAgentRunLifecycle({ context, operationId, parentMessageId, parentMessageType, + phase: 'runStart', runtimeType: 'client', }); @@ -830,12 +831,13 @@ export class StreamingExecutorActionImpl { } const runtimeCompletePayload = getRuntimeCompletePayload(); - await completeAgentRunLifecycle({ + await runAgentRunLifecycle({ anchorMessageId: runtimeCompletePayload.assistantMessageId, assistantMessageId: runtimeCompletePayload.assistantMessageId, context, get: this.#get, operationId, + phase: 'runComplete', runtimeType: 'client', status: runtimeCompletePayload.status ?? 'failed', triggerMessageId: runtimeCompletePayload.triggerMessageId,