♻️ refactor(chat): expose unified agent run lifecycle dispatcher

This commit is contained in:
Arvin Xu
2026-06-13 21:07:43 +08:00
parent b5122dcea2
commit af79c8a24f
8 changed files with 120 additions and 53 deletions
@@ -4,12 +4,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest';
import { topicService } from '@/services/topic';
import { emitClientAgentSignalSourceEvent } from '@/store/chat/slices/aiChat/actions/agentSignalBridge';
import {
completeAgentRunLifecycle,
completeAgentRunOperationLifecycle,
runAgentRunEventLifecycle,
startAgentRunLifecycle,
} from '../agentRunLifecycle';
import { runAgentRunLifecycle } from '../agentRunLifecycle';
const desktopEnv = vi.hoisted(() => ({ isDesktop: false }));
const desktopNotificationMock = vi.hoisted(() => ({
@@ -102,7 +97,7 @@ const createStore = (overrides: Record<string, unknown> = {}) => {
} as any;
};
describe('completeAgentRunLifecycle', () => {
describe('runAgentRunLifecycle', () => {
beforeEach(() => {
vi.clearAllMocks();
desktopEnv.isDesktop = false;
@@ -114,7 +109,7 @@ describe('completeAgentRunLifecycle', () => {
const store = createStore();
const get = vi.fn(() => store);
const result = await completeAgentRunLifecycle({
const result = await runAgentRunLifecycle({
afterRunComplete: [
() => {
store.events.push('afterRunComplete');
@@ -130,6 +125,7 @@ describe('completeAgentRunLifecycle', () => {
context: { agentId: 'agent-1', topicId: 'topic-1' } as any,
get,
operationId: 'op-1',
phase: 'runComplete',
queueDrainDelayMs: 0,
runtimeType: 'client',
status: 'completed',
@@ -173,7 +169,7 @@ describe('completeAgentRunLifecycle', () => {
const consoleErrorSpy = vi.spyOn(console, 'error').mockImplementation(() => {});
const store = createStore();
await completeAgentRunLifecycle({
await runAgentRunLifecycle({
beforeRunComplete: [
async () => {
throw new Error('metadata write failed');
@@ -182,6 +178,7 @@ describe('completeAgentRunLifecycle', () => {
context: { agentId: 'agent-1', topicId: 'topic-1' } as any,
get: () => store,
operationId: 'op-1',
phase: 'runComplete',
queueDrainDelayMs: 0,
runtimeType: 'heterogeneous',
status: 'completed',
@@ -201,18 +198,20 @@ describe('completeAgentRunLifecycle', () => {
it('does not drain queued messages for failed or cancelled terminal states', async () => {
const store = createStore();
await completeAgentRunLifecycle({
await runAgentRunLifecycle({
context: { agentId: 'agent-1', topicId: 'topic-1' } as any,
get: () => store,
operationId: 'op-1',
phase: 'runComplete',
runtimeType: 'gateway',
status: 'failed',
});
await completeAgentRunLifecycle({
await runAgentRunLifecycle({
context: { agentId: 'agent-1', topicId: 'topic-1' } as any,
get: () => store,
operationId: 'op-1',
phase: 'runComplete',
runtimeType: 'gateway',
status: 'cancelled',
});
@@ -221,17 +220,19 @@ describe('completeAgentRunLifecycle', () => {
expect(store.drainQueuedMessages).not.toHaveBeenCalled();
});
it('emits client runtime start from the start lifecycle only for client runs', () => {
startAgentRunLifecycle({
it('emits client runtime start from the start lifecycle only for client runs', async () => {
await runAgentRunLifecycle({
context: { agentId: 'agent-1', threadId: 'thread-1', topicId: 'topic-1' } as any,
operationId: 'op-1',
parentMessageId: 'user-1',
parentMessageType: 'user',
phase: 'runStart',
runtimeType: 'client',
});
startAgentRunLifecycle({
await runAgentRunLifecycle({
context: { agentId: 'agent-1', topicId: 'topic-1' } as any,
operationId: 'op-2',
phase: 'runStart',
runtimeType: 'gateway',
});
@@ -249,13 +250,14 @@ describe('completeAgentRunLifecycle', () => {
);
});
it('emits gateway event signals from the event lifecycle', () => {
runAgentRunEventLifecycle({
it('emits gateway event signals from the event lifecycle', async () => {
await runAgentRunLifecycle({
anchorMessageId: 'asst-1',
assistantMessageId: 'asst-1',
context: { agentId: 'agent-1', topicId: 'topic-1' } as any,
eventType: 'stream_start',
operationId: 'op-1',
phase: 'runEvent',
runtimeType: 'gateway',
stepIndex: 2,
});
@@ -278,11 +280,12 @@ describe('completeAgentRunLifecycle', () => {
const onComplete = vi.fn();
const store = createStore();
await completeAgentRunOperationLifecycle({
await runAgentRunLifecycle({
context: { agentId: 'agent-1', groupId: 'group-1', topicId: 'topic-1' } as any,
get: () => store,
onComplete,
operationId: 'op-1',
phase: 'operationComplete',
runtimeType: 'gateway',
});
@@ -313,12 +316,13 @@ describe('completeAgentRunLifecycle', () => {
},
});
await completeAgentRunLifecycle({
await runAgentRunLifecycle({
assistantMessageId: 'asst-1',
context: { agentId: 'agent-1', topicId: 'topic-1' } as any,
drainQueuedMessages: false,
get: () => store,
operationId: 'op-1',
phase: 'runComplete',
runtimeType: 'client',
status: 'completed',
});
@@ -6,17 +6,22 @@ import type * as ConstVersion from '@/const/version';
import { aiAgentService } from '@/services/aiAgent';
import { messageService } from '@/services/message';
import type * as AgentRunLifecycleModule from '@/store/chat/slices/aiChat/actions/agentRunLifecycle';
import { runAfterUserMessagePersistedLifecycle } from '@/store/chat/slices/aiChat/actions/agentRunLifecycle';
import type { GatewayConnection } from '../gateway';
import { GatewayActionImpl } from '../gateway';
const postPersistLifecycleMock = vi.hoisted(() => vi.fn().mockResolvedValue(undefined));
vi.mock('@/store/chat/slices/aiChat/actions/agentRunLifecycle', async (importOriginal) => {
const actual = await importOriginal<typeof AgentRunLifecycleModule>();
return {
...actual,
runAfterUserMessagePersistedLifecycle: vi.fn().mockResolvedValue(undefined),
runAgentRunLifecycle: vi.fn((event: AgentRunLifecycleModule.AgentRunLifecycleEvent) => {
if (event.phase === 'afterUserMessagePersisted') return postPersistLifecycleMock(event);
return actual.runAgentRunLifecycle(event as any);
}),
};
});
@@ -131,7 +136,7 @@ describe('GatewayActionImpl', () => {
beforeEach(() => {
vi.clearAllMocks();
vi.mocked(messageService.getMessages).mockResolvedValue([]);
vi.mocked(runAfterUserMessagePersistedLifecycle).mockResolvedValue(undefined);
postPersistLifecycleMock.mockResolvedValue(undefined);
});
afterEach(() => {
@@ -532,12 +537,13 @@ describe('GatewayActionImpl', () => {
});
await vi.waitFor(() => {
expect(runAfterUserMessagePersistedLifecycle).toHaveBeenCalledWith(
expect(postPersistLifecycleMock).toHaveBeenCalledWith(
expect.objectContaining({
agentId: 'agent-1',
assistantMessageId: 'ast-1',
isCreateNewTopic: false,
messages: [],
phase: 'afterUserMessagePersisted',
topicId: 'topic-1',
}),
);
@@ -555,7 +561,7 @@ describe('GatewayActionImpl', () => {
refreshTopic.mockImplementation(async () => {
events.push('refreshTopic');
});
vi.mocked(runAfterUserMessagePersistedLifecycle).mockImplementation(async () => {
postPersistLifecycleMock.mockImplementation(async () => {
events.push('postPersistLifecycle');
});
vi.mocked(messageService.getMessages).mockResolvedValueOnce(persistedMessages);
@@ -582,12 +588,13 @@ describe('GatewayActionImpl', () => {
await vi.waitFor(() => {
expect(events).toEqual(['refreshTopic', 'postPersistLifecycle']);
});
expect(runAfterUserMessagePersistedLifecycle).toHaveBeenCalledWith(
expect(postPersistLifecycleMock).toHaveBeenCalledWith(
expect.objectContaining({
agentId: 'agent-1',
assistantMessageId: 'ast-1',
isCreateNewTopic: true,
messages: persistedMessages,
phase: 'afterUserMessagePersisted',
topicId: 'topic-new',
}),
);
@@ -71,7 +71,7 @@ export interface CompleteAgentRunOperationLifecycleParams {
runtimeType: AgentRunRuntimeType;
}
interface AfterUserMessagePersistedParams {
export interface AfterUserMessagePersistedLifecycleParams {
agentId: string;
assistantMessageId: string;
get: () => ChatStore;
@@ -80,6 +80,51 @@ interface AfterUserMessagePersistedParams {
topicId?: string | null;
}
export type AgentRunLifecycleEvent =
| ({ phase: 'afterUserMessagePersisted' } & AfterUserMessagePersistedLifecycleParams)
| ({ phase: 'operationComplete' } & CompleteAgentRunOperationLifecycleParams)
| ({ phase: 'runComplete' } & CompleteAgentRunLifecycleParams)
| ({ phase: 'runEvent' } & AgentRunEventLifecycleParams)
| ({ phase: 'runStart' } & StartAgentRunLifecycleParams);
type AgentRunLifecycleNonCompleteEvent = Exclude<AgentRunLifecycleEvent, { phase: 'runComplete' }>;
export function runAgentRunLifecycle(
event: { phase: 'runComplete' } & CompleteAgentRunLifecycleParams,
): Promise<CompleteAgentRunLifecycleResult>;
export function runAgentRunLifecycle(event: AgentRunLifecycleNonCompleteEvent): Promise<void>;
export async function runAgentRunLifecycle(
event: AgentRunLifecycleEvent,
): Promise<CompleteAgentRunLifecycleResult | void> {
switch (event.phase) {
case 'afterUserMessagePersisted': {
const { phase: _, ...params } = event;
return runAfterUserMessagePersistedLifecycle(params);
}
case 'operationComplete': {
const { phase: _, ...params } = event;
return completeAgentRunOperationLifecycle(params);
}
case 'runComplete': {
const { phase: _, ...params } = event;
return completeAgentRunLifecycle(params);
}
case 'runEvent': {
const { phase: _, ...params } = event;
runAgentRunEventLifecycle(params);
return;
}
case 'runStart': {
const { phase: _, ...params } = event;
startAgentRunLifecycle(params);
}
}
}
const runCallbacks = async (
phase: string,
callbacks: AgentRunLifecycleCallback[] | undefined,
@@ -149,7 +194,7 @@ const drainQueuedMessagesAfterComplete = ({
return remainingQueued.length;
};
export const startAgentRunLifecycle = ({
const startAgentRunLifecycle = ({
context,
operationId,
parentMessageId,
@@ -173,7 +218,7 @@ export const startAgentRunLifecycle = ({
});
};
export const runAgentRunEventLifecycle = ({
const runAgentRunEventLifecycle = ({
anchorMessageId,
assistantMessageId,
context,
@@ -307,7 +352,7 @@ const runClientCompleteLifecycle = async ({
}
};
export const completeAgentRunLifecycle = async ({
const completeAgentRunLifecycle = async ({
afterRunComplete,
anchorMessageId,
assistantMessageId,
@@ -379,7 +424,7 @@ export const completeAgentRunLifecycle = async ({
return { contextKey, queuedMessageCount };
};
export const completeAgentRunOperationLifecycle = async ({
const completeAgentRunOperationLifecycle = async ({
context,
get,
onComplete,
@@ -426,14 +471,14 @@ const applyTopicTitle = async (
console.info('[dev] sliced topic title (NEXT_PUBLIC_DEV_DISABLE_AUTO_TOPIC=1):', title);
};
export const runAfterUserMessagePersistedLifecycle = async ({
const runAfterUserMessagePersistedLifecycle = async ({
agentId,
assistantMessageId,
get,
isCreateNewTopic,
messages,
topicId,
}: AfterUserMessagePersistedParams): Promise<void> => {
}: AfterUserMessagePersistedLifecycleParams): Promise<void> => {
if (!topicId) return;
if (isCreateNewTopic) {
@@ -40,7 +40,7 @@ import {
} from '@/store/agent/selectors';
import { agentGroupByIdSelectors, getChatGroupStoreState } from '@/store/agentGroup';
import { selectRuntimeType } from '@/store/chat/slices/aiChat/actions/agentDispatcher';
import { runAfterUserMessagePersistedLifecycle } from '@/store/chat/slices/aiChat/actions/agentRunLifecycle';
import { runAgentRunLifecycle } from '@/store/chat/slices/aiChat/actions/agentRunLifecycle';
import { resolveHeteroResume } from '@/store/chat/slices/aiChat/actions/heteroResume';
import { dispatchNonHeteroSubAgent } from '@/store/chat/slices/aiChat/actions/nonHeteroSubAgentDispatcher';
import { PortalViewType } from '@/store/chat/slices/portal/initialState';
@@ -645,12 +645,13 @@ export class ConversationLifecycleActionImpl {
{ operationId },
);
runAfterUserMessagePersistedLifecycle({
runAgentRunLifecycle({
agentId,
assistantMessageId: heteroData.assistantMessageId,
get: this.#get,
isCreateNewTopic: heteroData.isCreateNewTopic,
messages: heteroMessages,
phase: 'afterUserMessagePersisted',
topicId: heteroData.topicId ?? heteroContext.topicId,
}).catch(console.error);
@@ -992,12 +993,13 @@ export class ConversationLifecycleActionImpl {
if (data.topicId) this.#get().internal_updateTopicLoading(data.topicId, true);
runAfterUserMessagePersistedLifecycle({
runAgentRunLifecycle({
agentId,
assistantMessageId: data.assistantMessageId,
get: this.#get,
isCreateNewTopic: data.isCreateNewTopic,
messages: data.messages,
phase: 'afterUserMessagePersisted',
topicId: data.topicId,
}).catch(console.error);
@@ -23,10 +23,7 @@ import type { ChatStore } from '@/store/chat/store';
import type { StoreSetter } from '@/store/types';
import { useUserStore } from '@/store/user';
import {
completeAgentRunOperationLifecycle,
runAfterUserMessagePersistedLifecycle,
} from './agentRunLifecycle';
import { runAgentRunLifecycle } from './agentRunLifecycle';
import { createGatewayEventHandler } from './gatewayEventHandler';
/**
@@ -456,12 +453,13 @@ export class GatewayActionImpl {
void refreshCreatedTopic
.then(() =>
runAfterUserMessagePersistedLifecycle({
runAgentRunLifecycle({
agentId: context.agentId,
assistantMessageId: result.assistantMessageId,
get: this.#get,
isCreateNewTopic,
messages: persistedMessagesForLifecycle,
phase: 'afterUserMessagePersisted',
topicId: result.topicId,
}),
)
@@ -553,11 +551,12 @@ export class GatewayActionImpl {
gatewayUrl: agentGatewayUrl,
onEvent: eventHandler,
onOperationComplete: () => {
void completeAgentRunOperationLifecycle({
void runAgentRunLifecycle({
context: execContext,
get: this.#get,
onComplete,
operationId: gatewayOpId,
phase: 'operationComplete',
runtimeType: 'gateway',
});
},
@@ -665,10 +664,11 @@ export class GatewayActionImpl {
gatewayUrl: agentGatewayUrl,
onEvent: eventHandler,
onOperationComplete: () => {
void completeAgentRunOperationLifecycle({
void runAgentRunLifecycle({
context,
get: this.#get,
operationId: gatewayOpId,
phase: 'operationComplete',
runtimeType: 'gateway',
});
},
@@ -21,7 +21,7 @@ import { notifyDesktopHumanApprovalRequired } from '@/store/chat/utils/desktopNo
import { addUsageToOperationMetrics, type OperationUsageLike } from '@/utils/operationUsageMetrics';
import type { AgentRunLifecycleCallback, AgentRunRuntimeType } from './agentRunLifecycle';
import { completeAgentRunLifecycle, runAgentRunEventLifecycle } from './agentRunLifecycle';
import { runAgentRunLifecycle } from './agentRunLifecycle';
// Lazy-loaded to break the import cycle:
// gateway.ts → gatewayEventHandler.ts → executors/index.ts (which pulls in
@@ -337,12 +337,13 @@ export const createGatewayEventHandler = (
}
}
runAgentRunEventLifecycle({
void runAgentRunLifecycle({
anchorMessageId: currentAssistantMessageId,
assistantMessageId: currentAssistantMessageId,
context,
eventType: 'stream_start',
operationId,
phase: 'runEvent',
runtimeType,
stepIndex: event.stepIndex,
});
@@ -517,10 +518,11 @@ export const createGatewayEventHandler = (
// Refresh on execution_complete to ensure final step state is consistent
if (data?.phase === 'execution_complete') {
enqueue(async () => {
runAgentRunEventLifecycle({
void runAgentRunLifecycle({
context,
eventType: 'step_complete',
operationId,
phase: 'runEvent',
runtimeType,
stepIndex: event.stepIndex,
});
@@ -538,12 +540,13 @@ export const createGatewayEventHandler = (
? 'cancelled'
: 'completed';
runAgentRunEventLifecycle({
void runAgentRunLifecycle({
anchorMessageId: currentAssistantMessageId,
assistantMessageId: currentAssistantMessageId,
context,
eventType: 'runtime_end',
operationId,
phase: 'runEvent',
runtimeType,
});
get().internal_toggleToolCallingStreaming(currentAssistantMessageId, undefined);
@@ -584,7 +587,7 @@ export const createGatewayEventHandler = (
await fetchAndReplaceMessages(get, context).catch(console.error);
}
await completeAgentRunLifecycle({
await runAgentRunLifecycle({
afterRunComplete: params.afterRunComplete,
anchorMessageId: currentAssistantMessageId,
assistantMessageId: currentAssistantMessageId,
@@ -593,6 +596,7 @@ export const createGatewayEventHandler = (
drainQueuedMessages: params.drainQueuedMessages,
get,
operationId,
phase: 'runComplete',
runtimeType,
status: terminalStatus,
});
@@ -614,11 +618,12 @@ export const createGatewayEventHandler = (
const messageError = toChatMessageError(event.data);
const errorMessage = messageError.message;
runAgentRunEventLifecycle({
void runAgentRunLifecycle({
context,
errorMessage,
eventType: 'error',
operationId,
phase: 'runEvent',
runtimeType,
});
@@ -655,7 +660,7 @@ export const createGatewayEventHandler = (
dispatchContext,
);
await completeAgentRunLifecycle({
await runAgentRunLifecycle({
afterRunComplete: params.afterRunComplete,
anchorMessageId: currentAssistantMessageId,
assistantMessageId: currentAssistantMessageId,
@@ -664,6 +669,7 @@ export const createGatewayEventHandler = (
drainQueuedMessages: false,
get,
operationId,
phase: 'runComplete',
runtimeType,
status: 'failed',
});
@@ -40,7 +40,7 @@ import { resolveNotificationNavigatePath } from '@/store/chat/utils/desktopNotif
import { markdownToTxt } from '@/utils/markdownToTxt';
import { addUsageToOperationMetrics } from '@/utils/operationUsageMetrics';
import { completeAgentRunLifecycle } from './agentRunLifecycle';
import { runAgentRunLifecycle } from './agentRunLifecycle';
import { createGatewayEventHandler } from './gatewayEventHandler';
/** Mirrors `idGenerator('threads', 16)` on the server so sync-allocated ids have the same shape. */
@@ -472,13 +472,14 @@ export const executeHeterogeneousAgent = async (
{ operationId },
);
await completeAgentRunLifecycle({
await runAgentRunLifecycle({
anchorMessageId: mainState.currentAssistantId,
assistantMessageId: mainState.currentAssistantId,
context,
drainQueuedMessages: false,
get,
operationId,
phase: 'runComplete',
runtimeType: 'heterogeneous',
status: 'failed',
});
@@ -52,7 +52,7 @@ import {
selectActivatedToolIdsFromMessages,
selectTodosFromMessages,
} from '../../message/selectors/dbMessage';
import { completeAgentRunLifecycle, startAgentRunLifecycle } from './agentRunLifecycle';
import { runAgentRunLifecycle } from './agentRunLifecycle';
const log = debug('lobe-store:streaming-executor');
@@ -536,11 +536,12 @@ export class StreamingExecutorActionImpl {
originalMessages.length,
disableTools,
);
startAgentRunLifecycle({
void runAgentRunLifecycle({
context,
operationId,
parentMessageId,
parentMessageType,
phase: 'runStart',
runtimeType: 'client',
});
@@ -830,12 +831,13 @@ export class StreamingExecutorActionImpl {
}
const runtimeCompletePayload = getRuntimeCompletePayload();
await completeAgentRunLifecycle({
await runAgentRunLifecycle({
anchorMessageId: runtimeCompletePayload.assistantMessageId,
assistantMessageId: runtimeCompletePayload.assistantMessageId,
context,
get: this.#get,
operationId,
phase: 'runComplete',
runtimeType: 'client',
status: runtimeCompletePayload.status ?? 'failed',
triggerMessageId: runtimeCompletePayload.triggerMessageId,