diff --git a/apps/server/src/services/heterogeneousAgent/HeterogeneousPersistenceHandler.ts b/apps/server/src/services/heterogeneousAgent/HeterogeneousPersistenceHandler.ts index c0f439f03b..7be99fc7f8 100644 --- a/apps/server/src/services/heterogeneousAgent/HeterogeneousPersistenceHandler.ts +++ b/apps/server/src/services/heterogeneousAgent/HeterogeneousPersistenceHandler.ts @@ -1,11 +1,14 @@ import type { AgentStreamEvent } from '@lobechat/agent-gateway-client'; import { LOADING_FLAT } from '@lobechat/const'; import type { + MainAgentIntent, + MainAgentReduceCtx, + MainAgentRunState, + MainAgentTurnToolState, SubagentIntent, - SubagentReduceCtx, - SubagentRunsState, + ToolCallPayload, } from '@lobechat/heterogeneous-agents'; -import { createSubagentRunsState, reduceSubagentRuns } from '@lobechat/heterogeneous-agents'; +import { createMainAgentRunState, reduceMainAgent } from '@lobechat/heterogeneous-agents'; import { AgentRuntimeErrorType, type ChatMessageError, @@ -74,78 +77,46 @@ const eventKey = (event: AgentStreamEvent): string => { return `${event.stepIndex}:${event.type}:${event.timestamp}:${fnv1a(dataJson)}`; }; -const normalizeErrorText = (value?: string) => value?.replaceAll(/\s+/g, ' ').trim(); +interface AssistantDbSnapshot { + content: string; + metadata: Record; + model: string | undefined; + parentId: string | null | undefined; + provider: string | undefined; + reasoning: string; + textSnapshotSeq: number; + tools: ChatToolPayload[]; +} -/** - * CC sometimes streams the error string into `content` BEFORE emitting the - * structured error event (e.g. AuthRequired echoes the stderr line). Mirrors - * the renderer's `shouldSuppressTerminalErrorEcho` (lines 113–130 of - * heterogeneousAgentExecutor.ts): only suppress when the body is explicitly - * marked or matches the AuthRequired code, AND the trimmed strings are - * equal. Anything else stays — accidental partial overlaps are not echo. - */ -const shouldSuppressTerminalErrorEcho = ( - content: string, - error: ChatMessageError | undefined, -): boolean => { - if (!error) return false; - const errorBody = error.body as - | { - clearEchoedContent?: boolean; - code?: string; - message?: string; - stderr?: string; - } - | undefined; - // The renderer guards on either an explicit flag or AuthRequired (the most - // common echo source). Other error codes might echo too, but we err on the - // side of preserving content unless the producer asks for the cleanup. - const ECHO_TRIGGER_CODES = new Set(['AuthRequired']); - if ( - !errorBody?.clearEchoedContent && - (!errorBody?.code || !ECHO_TRIGGER_CODES.has(errorBody.code)) - ) { - return false; - } - const normalizedContent = normalizeErrorText(content); - const normalizedError = normalizeErrorText( - errorBody?.stderr || errorBody?.message || error.message, - ); - return !!normalizedContent && !!normalizedError && normalizedContent === normalizedError; -}; - -interface ToolCallPayload extends ChatToolPayload {} - -/** Per-assistant-message tool persistence state (main or sub-agent scope). */ -interface ToolPersistenceState { - payloads: ChatToolPayload[]; - persistedIds: Set; +interface AssistantMessageDbLike { + content?: unknown; + metadata?: Record | null; + model?: string; + parentId?: string | null; + provider?: string; + reasoning?: { content?: string } | null; + tools?: ChatToolPayload[] | null; } /** * Per-operation in-memory state. Lifetime spans the whole CLI run from first - * `heteroIngest` batch through `heteroFinish`. Multi-replica caveat: state is - * per-Node-process; cloud sandbox routing must be sticky to a single replica - * for one operationId, otherwise turn boundaries on the second replica will - * lose the chain-parent and pre-existing tool map. (Phase 3 sandbox owns the - * endpoint per-instance, so this is not a problem in practice.) + * `heteroIngest` batch through `heteroFinish`. Main-agent state is projected + * back from DB at each ingest boundary; active subagent run state is still the + * in-memory part of the operation. */ interface OperationState { - accumulatedContent: string; - accumulatedReasoning: string; agentId: string | null; - currentAssistantMessageId: string; - lastModel: string | undefined; - lastProvider: string | undefined; lastStepIndex: number; - lastTextSnapshotSeq: number; - messageMetadata: Record; + main: MainAgentRunState; operationId: string; processedKeys: Set; - /** Shared subagent run coordinator state (see `@lobechat/heterogeneous-agents`). */ - subagentState: SubagentRunsState; + /** + * Run-global DB index for every tool message in the topic, keyed by + * `tool_call_id`. Main and subagent reducers keep only their per-turn maps; + * this map lets a `tool_result` land even when its `tools_calling` was + * reduced by another serverless replica. + */ toolMsgIdByCallId: Map; - toolState: ToolPersistenceState; topicId: string; } @@ -246,105 +217,8 @@ export class HeterogeneousPersistenceHandler { await this.syncAssistantPointerForAdvancedStep(state); } - // Refresh content/reasoning baseline from DB before processing this batch. - // - // Root cause of truncation: Vercel serverless routes consecutive batches to - // different Lambda instances. A warm replica's in-memory `accumulatedContent` - // reflects only the batches IT processed — it has no visibility into batches - // handled by other replicas. When that warm replica later processes a - // tools_calling event, `persistMainToolBatch` writes the stale short content - // alongside the new tools, overwriting the correct (longer) DB value. - // - // Fix: re-read the current assistant message from DB at the start of every - // ingest call. Since `flushBatchContent` always writes at the end of each - // batch, DB is authoritative. Reading here gives us the freshest flushed - // content as the new baseline, so any text accumulated in this batch extends - // the correct full string rather than a stale partial. - // - // Cost: one extra `findById` round-trip per warm ingest call (cold calls - // already read the message in `loadOrCreateState` — the second read is - // redundant but harmless and keeps the logic uniform). - const refreshed = await this.deps.messageModel.findById(state.currentAssistantMessageId); - const rawDbContent = (refreshed?.content ?? '') as string; - const dbContent = rawDbContent === LOADING_FLAT ? '' : rawDbContent; - const dbReasoning = (refreshed?.reasoning as { content?: string } | null)?.content ?? ''; - const dbMetadata = ((refreshed?.metadata as Record | null) ?? {}) as Record< - string, - any - >; - const dbTextSnapshotSeq = Number(dbMetadata.heteroTextSnapshotSeq ?? 0); - - // Adopt DB value only when it is LONGER than what this instance holds in memory. - // This correctly handles two competing cases without introducing a dirty flag: - // - // 1. Multi-replica stale (the problem this refresh was added to fix): - // Another replica flushed more content to DB than this warm instance - // has in memory → dbContent is longer → adopt it so new text in this - // batch extends the correct full string rather than a stale partial. - // - // 2. flushBatchContent retry on the same warm instance (P1 concern): - // Events were already processed and marked in processedKeys, but the - // end-of-batch flush threw a transient DB error. DB still holds the - // shorter pre-batch value; in-memory already has the correct result. - // Unconditionally overwriting with the DB value would wipe those - // chunks permanently (processedKeys prevents replay). Taking the - // longer in-memory value keeps them safe. - if (dbContent.length > state.accumulatedContent.length) { - state.accumulatedContent = dbContent; - } - if (dbReasoning.length > state.accumulatedReasoning.length) { - state.accumulatedReasoning = dbReasoning; - } - if (Number.isFinite(dbTextSnapshotSeq) && dbTextSnapshotSeq > state.lastTextSnapshotSeq) { - state.lastTextSnapshotSeq = dbTextSnapshotSeq; - state.messageMetadata = dbMetadata; - } else if ( - Object.keys(state.messageMetadata).length === 0 && - Object.keys(dbMetadata).length > 0 - ) { - state.messageMetadata = dbMetadata; - } - - // Same multi-replica concern for `tools[]` and `lastModel`/`lastProvider`. - // - // Why this is necessary: `handleStepStart` computes the new assistant's - // parentId from `state.toolState.payloads` and copies model/provider from - // `state.lastModel` / `state.lastProvider`. Those are populated by - // `persistMainToolBatch` and `handleTurnMetadata` respectively — both - // run on whichever replica drains the relevant event. When the replica - // driving the next step boundary is NOT the one that drained the prior - // step's tools_calling / step_complete, the in-memory state is empty: - // - parentId falls back to `state.currentAssistantMessageId`, so the - // new turn chains off the previous assistant instead of the tool - // message (observed in prod: 4/11 step boundaries in one topic). - // - model/provider are written as null on the new assistant. - // - // Adopt the DB view as authoritative whenever it carries more resolved - // state than memory. `tools[]` is rewritten end-to-end on every Phase-3 - // backfill, so it's safe to replace wholesale rather than merge by id — - // the same-batch transient where mem has a tool DB hasn't seen yet does - // not happen at refresh time (refresh runs before the event loop). - const dbTools = (refreshed?.tools ?? []) as ChatToolPayload[]; - const dbResolvedToolCount = dbTools.filter((t) => !!t.result_msg_id).length; - const memResolvedToolCount = state.toolState.payloads.filter((t) => !!t.result_msg_id).length; - if ( - dbTools.length > state.toolState.payloads.length || - dbResolvedToolCount > memResolvedToolCount - ) { - state.toolState = { - payloads: [...dbTools], - // Only treat tool ids whose `result_msg_id` is already filled in as - // persisted. Phase 1 of `persistToolBatch` writes `tools[]` before - // creating the `role:'tool'` row (Phase 2), so a refresh that lands - // between the two would see an unresolved id. Marking that id as - // persisted would cause a subsequent retry of the same tools_calling - // event to skip the create (Phase 2) entirely — leaving the tool - // permanently without a tool message / result_msg_id. - persistedIds: new Set(dbTools.filter((t) => !!t.result_msg_id).map((t) => t.id)), - }; - } - if (!state.lastModel && refreshed?.model) state.lastModel = refreshed.model; - if (!state.lastProvider && refreshed?.provider) state.lastProvider = refreshed.provider; + await this.refreshToolMessageIndex(state); + await this.refreshMainStateFromDb(state); for (const event of params.events) { const key = eventKey(event); @@ -510,75 +384,171 @@ export class HeterogeneousPersistenceHandler { ? (stored.msgId ?? baseAssistantMessageId) : baseAssistantMessageId; - // Restore toolMsgIdByCallId from the DB so tool_results that arrive on a - // different replica than their tool_use can still be matched and persisted. - const toolPlugins = await this.deps.messageModel.listMessagePluginsByTopic(topicId); - const toolMsgIdByCallId = new Map(); - for (const plugin of toolPlugins) { - if (plugin.toolCallId) toolMsgIdByCallId.set(plugin.toolCallId, plugin.id); - } - - // Restore in-progress accumulators and tool state from the current assistant - // message so a cold replica (Vercel serverless — each request is a new process) - // continues from where the previous request left off rather than overwriting - // with an empty/shorter value. Without this, every ingest call would reset - // accumulatedContent to '' and toolState.payloads to [], causing: - // - content truncation: warm instance writes "hello world", cold instance - // accumulates only " more text" and overwrites with that shorter string. - // - tool duplication: cold instance sees persistedIds={}, re-creates already- - // persisted tool messages, and overwrites assistant.tools[] with only the - // current batch's tools (losing all previous ones). - const currentMsg = await this.deps.messageModel.findById(currentAssistantMessageId); - // Skip LOADING_FLAT placeholder — it is the initial DB value before any - // content arrives and must not be treated as real accumulated text. - const rawContent = (currentMsg?.content ?? '') as string; - const restoredContent = rawContent === LOADING_FLAT ? '' : rawContent; - const restoredReasoning = (currentMsg?.reasoning as { content?: string } | null)?.content ?? ''; - const restoredMetadata = ((currentMsg?.metadata as Record | null) ?? {}) as Record< - string, - any - >; - const restoredTools = (currentMsg?.tools ?? []) as ChatToolPayload[]; - const restoredTextSnapshotSeq = Number(restoredMetadata.heteroTextSnapshotSeq ?? 0); - // Phase 1 of `persistToolBatch` writes `tools[]` BEFORE the tool message - // row is created (Phase 2 sets `result_msg_id`). Only ids that already - // carry a `result_msg_id` are truly persisted — restoring an unresolved - // id into `persistedIds` would make a retry of the same tools_calling - // event skip the Phase 2 create, orphaning the tool forever. - const restoredPersistedIds = new Set( - restoredTools.filter((t) => !!t.result_msg_id).map((t) => t.id), - ); - state = { - accumulatedContent: restoredContent, - accumulatedReasoning: restoredReasoning, agentId: topic?.agentId ?? null, - currentAssistantMessageId, - lastModel: undefined, - lastProvider: undefined, lastStepIndex: 0, - lastTextSnapshotSeq: Number.isFinite(restoredTextSnapshotSeq) ? restoredTextSnapshotSeq : 0, - messageMetadata: restoredMetadata, + main: createMainAgentRunState(currentAssistantMessageId), operationId, processedKeys: new Set(), - subagentState: createSubagentRunsState(), - toolMsgIdByCallId, - toolState: { payloads: restoredTools, persistedIds: restoredPersistedIds }, + toolMsgIdByCallId: new Map(), topicId, }; + await this.refreshToolMessageIndex(state); + await this.refreshMainStateFromDb(state); operationStates.set(operationId, state); log( 'created state for operation %s on topic %s msgId=%s tools=%d restored(content=%d tools=%d)', operationId, topicId, currentAssistantMessageId, - toolMsgIdByCallId.size, - restoredContent.length, - restoredTools.length, + state.toolMsgIdByCallId.size, + state.main.accContent.length, + state.main.toolState.payloads.length, ); return state; } + private createEmptyMainToolState(): MainAgentTurnToolState { + return { payloads: [], persistedIds: new Set(), toolMsgIdByCallId: new Map() }; + } + + private toAssistantSnapshot( + message: AssistantMessageDbLike | null | undefined, + ): AssistantDbSnapshot { + const rawContent = (message?.content ?? '') as string; + const metadata = ((message?.metadata as Record | null) ?? {}) as Record< + string, + any + >; + const textSnapshotSeq = Number(metadata.heteroTextSnapshotSeq ?? 0); + return { + content: rawContent === LOADING_FLAT ? '' : rawContent, + metadata, + model: message?.model, + parentId: message?.parentId, + provider: message?.provider, + reasoning: (message?.reasoning as { content?: string } | null)?.content ?? '', + textSnapshotSeq: Number.isFinite(textSnapshotSeq) ? textSnapshotSeq : 0, + tools: (message?.tools ?? []) as ChatToolPayload[], + }; + } + + private toToolPayload(tool: ChatToolPayload): ToolCallPayload { + return { + apiName: tool.apiName, + arguments: tool.arguments, + id: tool.id, + identifier: tool.identifier, + type: tool.type, + }; + } + + private buildMainToolStateFromSnapshot( + snapshot: AssistantDbSnapshot, + toolMsgIdByCallId: Map, + ): MainAgentTurnToolState { + const toolState = this.createEmptyMainToolState(); + const seen = new Set(); + + for (const tool of snapshot.tools) { + if (!tool.id || seen.has(tool.id)) continue; + const toolMessageId = tool.result_msg_id ?? toolMsgIdByCallId.get(tool.id); + if (!toolMessageId) continue; + + seen.add(tool.id); + toolState.payloads.push(this.toToolPayload(tool)); + toolState.persistedIds.add(tool.id); + toolState.toolMsgIdByCallId.set(tool.id, toolMessageId); + } + + return toolState; + } + + private getLastSnapshotToolMessageId( + snapshot: AssistantDbSnapshot, + toolMsgIdByCallId: Map, + ): string | undefined { + for (const tool of [...snapshot.tools].reverse()) { + const toolMessageId = tool.result_msg_id ?? toolMsgIdByCallId.get(tool.id); + if (toolMessageId) return toolMessageId; + } + return undefined; + } + + private async refreshToolMessageIndex(state: OperationState): Promise { + const toolPlugins = await this.deps.messageModel.listMessagePluginsByTopic(state.topicId); + for (const plugin of toolPlugins) { + if (plugin.toolCallId) state.toolMsgIdByCallId.set(plugin.toolCallId, plugin.id); + } + } + + private async getLastChildToolMessageId(assistantMessageId: string): Promise { + return await this.deps.messageModel.getLastChildToolMessageId?.(assistantMessageId); + } + + /** + * Rehydrate reducer state from the DB projection of the current assistant. + * This preserves the shared pure reducer as the single state machine while + * keeping the serverless-specific "another replica already wrote this" + * recovery outside the package. + */ + private async refreshMainStateFromDb(state: OperationState): Promise { + const currentMsg = await this.deps.messageModel.findById(state.main.currentAssistantId); + const snapshot = this.toAssistantSnapshot(currentMsg); + + if (snapshot.textSnapshotSeq > state.main.lastTextSnapshotSeq) { + state.main.accContent = snapshot.content; + state.main.lastTextSnapshotSeq = snapshot.textSnapshotSeq; + state.main.turnMetadata = snapshot.metadata; + } else { + if (snapshot.content.length > state.main.accContent.length) { + state.main.accContent = snapshot.content; + } + if ( + Object.keys(state.main.turnMetadata).length === 0 && + Object.keys(snapshot.metadata).length > 0 + ) { + state.main.turnMetadata = snapshot.metadata; + } + } + + if (snapshot.reasoning.length > state.main.accReasoning.length) { + state.main.accReasoning = snapshot.reasoning; + } + + const dbToolState = this.buildMainToolStateFromSnapshot(snapshot, state.toolMsgIdByCallId); + for (const [toolCallId, toolMessageId] of dbToolState.toolMsgIdByCallId) { + state.toolMsgIdByCallId.set(toolCallId, toolMessageId); + } + if ( + dbToolState.payloads.length > state.main.toolState.payloads.length || + dbToolState.persistedIds.size > state.main.toolState.persistedIds.size + ) { + state.main.toolState = dbToolState; + } + + if (snapshot.model) state.main.turnModel = snapshot.model; + if (snapshot.provider) state.main.turnProvider = snapshot.provider; + + // Prefer the authoritative child tool row over the assistant.tools[] JSONB + // mirror. During multi-tool batches, an earlier tool may already have + // result_msg_id backfilled while a later tool row exists but Phase 3 has not + // rewritten the JSONB payload yet; anchoring from the snapshot would pick + // the earlier tool and fork the main wire. + const currentTurnToolId = + (await this.getLastChildToolMessageId(state.main.currentAssistantId)) ?? + this.getLastSnapshotToolMessageId(snapshot, state.toolMsgIdByCallId); + if (currentTurnToolId) { + state.main.lastToolMsgIdEver = currentTurnToolId; + return; + } + + const toolMessageIds = new Set(state.toolMsgIdByCallId.values()); + if (snapshot.parentId && toolMessageIds.has(snapshot.parentId)) { + state.main.lastToolMsgIdEver = snapshot.parentId; + } + } + private async syncAssistantPointerForAdvancedStep(state: OperationState): Promise { const topic = await this.deps.topicModel.findById(state.topicId); const running = topic?.metadata?.runningOperation; @@ -597,38 +567,22 @@ export class HeterogeneousPersistenceHandler { if ( !authoritativeAssistantMessageId || - authoritativeAssistantMessageId === state.currentAssistantMessageId + authoritativeAssistantMessageId === state.main.currentAssistantId ) { return; } - const currentMsg = await this.deps.messageModel.findById(authoritativeAssistantMessageId); - const restoredContent = (currentMsg?.content ?? '') as string; - const restoredReasoning = (currentMsg?.reasoning as { content?: string } | null)?.content ?? ''; - const restoredMetadata = ((currentMsg?.metadata as Record | null) ?? {}) as Record< - string, - any - >; - const restoredTools = (currentMsg?.tools ?? []) as ChatToolPayload[]; - const restoredTextSnapshotSeq = Number(restoredMetadata.heteroTextSnapshotSeq ?? 0); - - state.currentAssistantMessageId = authoritativeAssistantMessageId; - state.accumulatedContent = restoredContent; - state.accumulatedReasoning = restoredReasoning; - state.lastTextSnapshotSeq = Number.isFinite(restoredTextSnapshotSeq) - ? restoredTextSnapshotSeq - : 0; - state.messageMetadata = restoredMetadata; - state.toolState = { - payloads: restoredTools, - // Same `persistedIds` invariant as `loadOrCreateState`: only ids with a - // backfilled `result_msg_id` count as persisted. An unresolved id (Phase - // 1 written, Phase 2 not yet) must remain re-createable so a retry on - // this replica can complete the tool message. - persistedIds: new Set( - restoredTools.filter((tool) => !!tool.result_msg_id).map((tool) => tool.id), - ), + state.main = { + ...state.main, + accContent: '', + accReasoning: '', + currentAssistantId: authoritativeAssistantMessageId, + lastTextSnapshotSeq: 0, + toolState: this.createEmptyMainToolState(), + turnMetadata: {}, }; + await this.refreshToolMessageIndex(state); + await this.refreshMainStateFromDb(state); log( 'synced warm state op=%s to assistant=%s after step advance', @@ -640,295 +594,183 @@ export class HeterogeneousPersistenceHandler { // ─── Event dispatch ────────────────────────────────────────────────────── private async handleEvent(state: OperationState, event: AgentStreamEvent): Promise { - switch (event.type) { - case 'tool_result': { - await this.handleToolResult(state, event); - return; - } - - case 'step_complete': { - if (event.data?.phase === 'turn_metadata') { - await this.handleTurnMetadata(state, event); - } - return; - } - - case 'stream_start': { - if (event.data?.newStep) { - await this.handleStepStart(state); - } else { - await this.handleStreamInit(state, event); - } - return; - } - - case 'stream_chunk': { - await this.handleStreamChunk(state, event); - return; - } - - case 'agent_runtime_end': - case 'error': { - await this.handleTerminal(state, event); - return; - } - - // tool_start / tool_end / step_start / stream_end / agent_runtime_init / - // tool_execute / stream_retry: no-op — server only persists the - // adapter-level events, lifecycle markers are renderer-side concerns. - default: { - return; - } - } + await this.reduceAndApply(state, event); } - // ─── Per-event handlers ────────────────────────────────────────────────── + // ─── Main-agent reducer interpreter ────────────────────────────────────── - /** - * The adapter's FIRST `stream_start` (CC's system/init, `newStep` unset) - * carries the CLI's authoritative model/provider (e.g. claude-sonnet-x / - * 'claude-code'). Capture it into step state and backfill the placeholder - * assistant so the model tag shows the real CLI model from the very first - * turn — even before (or entirely without) any usage-bearing `turn_metadata`. - * - * The placeholder is created with only `provider: heteroType` and no model - * (see `aiAgent.execAgent`), so without this the first turn would render an - * empty model until `turn_metadata` lands, and a usage-less run would never - * resolve a real model at all. - */ - private async handleStreamInit(state: OperationState, event: AgentStreamEvent) { - const { model, provider } = event.data ?? {}; - const update: Record = {}; - if (model) { - state.lastModel = model; - update.model = model; - } - if (provider) { - state.lastProvider = provider; - update.provider = provider; - } - if (Object.keys(update).length === 0) return; - await this.deps.messageModel.update(state.currentAssistantMessageId, update); - } - - private async handleTurnMetadata(state: OperationState, event: AgentStreamEvent) { - const { model, provider, usage } = event.data ?? {}; - - // Subagent-tagged usage routes through the coordinator (RecordUsage intent → - // written onto the subagent's in-thread assistant). It must NOT touch - // `state.lastModel` / `state.lastProvider`, which carry main-agent step - // boundary state and would contaminate the next main-agent assistant. - if ((event.data as any)?.subagent) { - await this.reduceAndApply(state, event); - return; - } - - if (model) state.lastModel = model; - if (provider) state.lastProvider = provider; - - // Persist model/provider/usage to DB so a replica that didn't drain this - // event can still recover lastModel/lastProvider via the ingest-refresh - // path. Previously only `metadata.usage` was written, which left - // model/provider in-memory only — and the next step boundary on a - // different replica created assistants with model=null/provider=null. - const update: Record = {}; - if (usage) { - state.messageMetadata = { ...state.messageMetadata, usage }; - update.metadata = state.messageMetadata; - } - if (model) update.model = model; - if (provider) update.provider = provider; - if (Object.keys(update).length === 0) return; - - await this.deps.messageModel.update(state.currentAssistantMessageId, update); - } - - /** - * `stream_start { newStep: true }` opens a new assistant turn within the - * same operation. Mirrors renderer logic: - * - * 1. Flush prior assistant's accumulated content / reasoning / model - * 2. Create the new assistant — chained off the last main-agent tool - * message (so the wire becomes `asst → tool → asst → tool → ...`), - * falling back to the prev assistant when the prior step had no tools - * 3. Reset main-agent tool state (NOT the global `toolMsgIdByCallId` — - * late subagent tool_results from prior steps still resolve via it) - */ - private async handleStepStart(state: OperationState) { - const prevUpdate: Record = {}; - if (state.accumulatedContent) prevUpdate.content = state.accumulatedContent; - if (state.accumulatedReasoning) prevUpdate.reasoning = { content: state.accumulatedReasoning }; - if (state.lastModel) prevUpdate.model = state.lastModel; - if (state.lastProvider) prevUpdate.provider = state.lastProvider; - - if (Object.keys(prevUpdate).length > 0) { - await this.deps.messageModel.update(state.currentAssistantMessageId, prevUpdate); - } - - let lastToolMsgId = [...state.toolState.payloads] - .reverse() - .find((p) => !!p.result_msg_id)?.result_msg_id; - - // In-memory tool state can be empty or unresolved here: a different - // replica drained this step's `tools_calling`, or the batch-start refresh - // read the assistant's `tools[]` JSONB before its Phase-3 `result_msg_id` - // backfill was visible. Chaining off `currentAssistantMessageId` in that - // case forks the wire (`asst1 → asst2` with the tools as a dead branch), - // which renders as two disconnected bubbles. Fall back to the - // authoritative source — the `role:'tool'` rows themselves (Phase 2), - // which are committed earlier and independently of the JSONB mirror. - if (!lastToolMsgId) { - lastToolMsgId = await this.deps.messageModel.getLastChildToolMessageId( - state.currentAssistantMessageId, - ); - } - - const stepParentId = lastToolMsgId || state.currentAssistantMessageId; - - const newMsg = await this.deps.messageModel.create({ - agentId: state.agentId ?? undefined, - content: '', - model: state.lastModel, - parentId: stepParentId, - provider: state.lastProvider, - role: 'assistant', + private mainReduceCtx(state: OperationState): MainAgentReduceCtx { + return { + agentId: state.agentId, + newId: (kind) => (kind === 'thread' ? generateThreadId() : `msg_${createNanoId(18)()}`), topicId: state.topicId, - }); - - // Persist BEFORE advancing in-memory state (P2 fix). If this write fails - // transiently and the event is retried, state is still at the previous step - // so handleStepStart re-creates the new message with the correct parent - // rather than chaining off the partially-created one. The first attempt's - // empty message becomes an orphan but does not corrupt the turn chain. - await this.deps.topicModel.updateMetadata(state.topicId, { - heteroCurrentMsgId: { msgId: newMsg.id, operationId: state.operationId }, - }); - - // Advance state only after the DB write lands. - state.currentAssistantMessageId = newMsg.id; - state.accumulatedContent = ''; - state.accumulatedReasoning = ''; - state.lastTextSnapshotSeq = 0; - state.messageMetadata = {}; - state.toolState = { payloads: [], persistedIds: new Set() }; + }; } - private async handleStreamChunk(state: OperationState, event: AgentStreamEvent) { - const chunk = event.data ?? {}; + /** + * Single reducer entry point for the server persistence path. The reducer owns + * both the main thread and nested subagent runs; this interpreter only applies + * declarative intents to DB models. State commits after every intent succeeds, + * so a failing DB write leaves the event unmarked and the BatchIngester retry + * replays it against the previous reducer state. + */ + private async reduceAndApply(state: OperationState, event: AgentStreamEvent) { + const { intents, state: next } = reduceMainAgent(state.main, event, this.mainReduceCtx(state)); - // Subagent-scoped chunks (text / reasoning / tools_calling) are routed - // through the shared run coordinator — it owns thread create, turn - // boundaries, tool persistence, and finalize. - if (chunk.subagent) { - await this.reduceAndApply(state, event); - return; - } - - if (chunk.chunkType === 'text' && typeof chunk.content === 'string') { - const snapshotMode = chunk.snapshotMode; - const snapshotSeq = - typeof chunk.snapshotSeq === 'number' ? Number(chunk.snapshotSeq) : undefined; - - if (snapshotMode === 'replace' && snapshotSeq !== undefined) { - if (snapshotSeq <= state.lastTextSnapshotSeq) { - log( - 'skip stale text snapshot op=%s seq=%d current=%d', - state.operationId, - snapshotSeq, - state.lastTextSnapshotSeq, - ); - return; - } - state.lastTextSnapshotSeq = snapshotSeq; - state.messageMetadata = { - ...state.messageMetadata, - heteroTextSnapshotSeq: snapshotSeq, - }; - state.accumulatedContent = chunk.content; + for (const intent of intents) { + if ('threadId' in intent) { + await this.applySubagentIntent(state, intent as SubagentIntent); } else { - state.accumulatedContent += chunk.content; + await this.applyMainIntent(state, intent as MainAgentIntent); } - return; } - if (chunk.chunkType === 'reasoning' && typeof chunk.reasoning === 'string') { - state.accumulatedReasoning += chunk.reasoning; - return; - } + state.main = next; + } - if (chunk.chunkType === 'tools_calling') { - const tools = chunk.toolsCalling as ToolCallPayload[] | undefined; - if (!tools?.length) return; - await this.persistMainToolBatch(state, tools); + private async applyMainIntent(state: OperationState, intent: MainAgentIntent) { + switch (intent.kind) { + case 'createAssistant': { + await this.deps.messageModel.create( + { + agentId: intent.agentId ?? undefined, + content: '', + ...(intent.signal ? { metadata: { signal: intent.signal } } : {}), + model: intent.model, + parentId: intent.parentId, + provider: intent.provider, + role: 'assistant', + topicId: intent.topicId ?? state.topicId, + } as any, + intent.messageId, + ); + + await this.deps.topicModel.updateMetadata(state.topicId, { + heteroCurrentMsgId: { msgId: intent.messageId, operationId: state.operationId }, + }); + return; + } + + case 'persistAssistant': { + const update: Record = {}; + if (intent.content !== undefined) update.content = intent.content; + if (intent.reasoning !== undefined) update.reasoning = { content: intent.reasoning }; + if (intent.model) update.model = intent.model; + if (intent.provider) update.provider = intent.provider; + if (intent.metadata) update.metadata = intent.metadata; + if (Object.keys(update).length > 0) { + await this.deps.messageModel.update(intent.messageId, update); + } + return; + } + + // Token-level live updates are renderer-only. The server persists durable + // snapshots via persistAssistant / persistToolBatch / flushBatchContent. + case 'streamContent': { + return; + } + + case 'persistToolBatch': { + const buildUpdate = (withResult: boolean) => + this.buildToolBatchUpdate(intent.tools, { + content: intent.content, + reasoning: intent.reasoning, + withResult, + }); + + // Phase 1: assistant.tools[] without result_msg_id. + await this.deps.messageModel.update(intent.assistantMessageId, buildUpdate(false)); + + // Phase 2: create new tool rows with reducer-preallocated ids. + for (const tool of intent.tools) { + if (!tool.isNew) continue; + await this.deps.messageModel.create( + { + agentId: state.agentId ?? undefined, + content: '', + parentId: intent.assistantMessageId, + plugin: { + apiName: tool.payload.apiName, + arguments: tool.payload.arguments, + identifier: tool.payload.identifier, + type: tool.payload.type, + }, + role: 'tool', + threadId: null, + tool_call_id: tool.payload.id, + topicId: state.topicId, + } as any, + tool.toolMessageId, + ); + state.toolMsgIdByCallId.set(tool.payload.id, tool.toolMessageId); + } + + // Phase 3: backfill result_msg_id. + await this.deps.messageModel.update(intent.assistantMessageId, buildUpdate(true)); + return; + } + + case 'resolveToolResult': { + await this.applyToolResult(state, intent); + return; + } + + case 'recordUsage': { + const update: Record = {}; + if (intent.usage !== undefined) { + update.metadata = { ...state.main.turnMetadata, usage: intent.usage }; + } + if (intent.model) update.model = intent.model; + if (intent.provider) update.provider = intent.provider; + if (Object.keys(update).length > 0) { + await this.deps.messageModel.update(intent.messageId, update); + } + return; + } + + case 'setError': { + const update: Record = { error: this.toChatMessageError(intent.errorData) }; + if (intent.clearContent) update.content = ''; + await this.deps.messageModel.update(intent.messageId, update); + return; + } } } - private async handleToolResult(state: OperationState, event: AgentStreamEvent) { - const data = event.data ?? {}; - const toolCallId: string | undefined = data.toolCallId; - if (!toolCallId) return; - - const content: string = data.content ?? ''; - const isError: boolean = !!data.isError; - const pluginState: Record | undefined = data.pluginState; - - const toolMsgId = state.toolMsgIdByCallId.get(toolCallId); - if (toolMsgId) { - await this.deps.messageModel.updateToolMessage(toolMsgId, { - content, - pluginError: isError ? { message: content } : undefined, - pluginState, - }); - } else { - // Late-arriving result for a call we never saw the tool_use for is - // recoverable on a follow-up batch (out-of-order delivery); log and - // move on so the rest of the batch lands. - log('tool_result for unknown toolCallId=%s op=%s', toolCallId, state.operationId); + private async applyToolResult( + state: OperationState, + intent: { + content: string; + isError: boolean; + pluginState?: Record; + toolCallId: string; + }, + ) { + const toolMsgId = state.toolMsgIdByCallId.get(intent.toolCallId); + if (!toolMsgId) { + log('tool_result for unknown toolCallId=%s op=%s', intent.toolCallId, state.operationId); + return; } - // Route through the coordinator. For a parent-spawn tool_result it - // finalizes the run (terminal assistant + thread Active); for a subagent - // inner tool_result the coordinator emits ResolveToolResult, which the - // server interpreter no-ops because the generic `toolMsgIdByCallId` update - // above already wrote the DB content. Main-agent tool_results yield no - // intents. - await this.reduceAndApply(state, event); + await this.deps.messageModel.updateToolMessage(toolMsgId, { + content: intent.content, + pluginError: intent.isError ? { message: intent.content } : undefined, + pluginState: intent.pluginState, + }); } - private async handleTerminal(state: OperationState, event: AgentStreamEvent) { - const isError = event.type === 'error'; - const messageError = isError ? this.toChatMessageError(event.data) : undefined; - const suppressEcho = - !!messageError && shouldSuppressTerminalErrorEcho(state.accumulatedContent, messageError); - - const updateValue: Record = {}; - if (suppressEcho) { - // CC sometimes streams the error string into `content` BEFORE emitting - // the structured error event. When the two payloads echo each other, - // surface only the structured error and clear the duplicate text. - updateValue.content = ''; - } else if (state.accumulatedContent) { - updateValue.content = state.accumulatedContent; - } - if (state.accumulatedReasoning) updateValue.reasoning = { content: state.accumulatedReasoning }; - if (state.lastModel) updateValue.model = state.lastModel; - if (state.lastProvider) updateValue.provider = state.lastProvider; - if (messageError) updateValue.error = messageError; - - if (Object.keys(updateValue).length > 0) { - await this.deps.messageModel.update(state.currentAssistantMessageId, updateValue); - } - - // Drain any subagent runs that never saw their parent tool_result (CLI - // crashed mid-spawn, or main never closed the spawn). The coordinator - // flushes each run's trailing content and marks the thread Active — no - // terminal assistant, since there's no authoritative resultContent. - await this.reduceAndApply(state, event); - - // Reset accumulators so a `finish()` flush after a terminal event in the - // stream is a no-op (idempotent finalize). - state.accumulatedContent = ''; - state.accumulatedReasoning = ''; + private buildToolBatchUpdate( + tools: Array<{ payload: ToolCallPayload; toolMessageId: string }>, + options: { content?: string; reasoning?: string; withResult: boolean }, + ): Record { + const update: Record = { + tools: tools.map(({ payload, toolMessageId }) => + options.withResult ? { ...payload, result_msg_id: toolMessageId } : { ...payload }, + ), + }; + if (options.content) update.content = options.content; + if (options.reasoning) update.reasoning = { content: options.reasoning }; + return update; } /** Final safety flush triggered by `heteroFinish`. */ @@ -937,14 +779,14 @@ export class HeterogeneousPersistenceHandler { error: { message: string; type: string } | undefined, result: 'success' | 'error' | 'cancelled', ) { - if (!state.accumulatedContent && !state.accumulatedReasoning && !error && result !== 'error') { + if (!state.main.accContent && !state.main.accReasoning && !error && result !== 'error') { // Nothing pending — terminal event already flushed in-stream. return; } const updateValue: Record = {}; - if (state.accumulatedContent) updateValue.content = state.accumulatedContent; - if (state.accumulatedReasoning) updateValue.reasoning = { content: state.accumulatedReasoning }; + if (state.main.accContent) updateValue.content = state.main.accContent; + if (state.main.accReasoning) updateValue.reasoning = { content: state.main.accReasoning }; if (error) { // `error.type` is a free-form string from the CLI; coerce to the // shared union via `as` since the runtime contract accepts arbitrary @@ -959,7 +801,7 @@ export class HeterogeneousPersistenceHandler { } if (Object.keys(updateValue).length > 0) { - await this.deps.messageModel.update(state.currentAssistantMessageId, updateValue); + await this.deps.messageModel.update(state.main.currentAssistantId, updateValue); } } @@ -970,12 +812,12 @@ export class HeterogeneousPersistenceHandler { * event (which are the normal flush triggers). */ private async flushBatchContent(state: OperationState): Promise { - if (!state.accumulatedContent && !state.accumulatedReasoning) return; + if (!state.main.accContent && !state.main.accReasoning) return; const update: Record = {}; - if (state.accumulatedContent) update.content = state.accumulatedContent; - if (state.accumulatedReasoning) update.reasoning = { content: state.accumulatedReasoning }; - if (Object.keys(state.messageMetadata).length > 0) update.metadata = state.messageMetadata; - await this.deps.messageModel.update(state.currentAssistantMessageId, update); + if (state.main.accContent) update.content = state.main.accContent; + if (state.main.accReasoning) update.reasoning = { content: state.main.accReasoning }; + if (Object.keys(state.main.turnMetadata).length > 0) update.metadata = state.main.turnMetadata; + await this.deps.messageModel.update(state.main.currentAssistantId, update); } private toChatMessageError(data: unknown): ChatMessageError { @@ -996,133 +838,6 @@ export class HeterogeneousPersistenceHandler { }; } - // ─── 3-phase tool persist (main agent) ─────────────────────────────────── - - /** - * Same shape as renderer's `persistToolBatch` (lines 319–411 in - * `heterogeneousAgentExecutor.ts`): - * - * 1. Append fresh tools to `state.payloads`, write them on the assistant - * together with the latest streamed content / reasoning so DB stays - * in sync (no orphan-tool window once the parser sees them). - * 2. Create a `role:'tool'` message per fresh tool_use, capture its DB - * id into the global `toolMsgIdByCallId` lookup, and write - * `result_msg_id` onto the matching `state.payloads` entry. - * 3. Re-write `state.payloads` so phase 2's backfilled `result_msg_id` - * lands on the assistant row. - * - * Idempotent on retry: tool_use ids already in `state.persistedIds` are - * skipped up front. - */ - private async persistToolBatch( - incoming: ToolCallPayload[], - persistState: ToolPersistenceState, - assistantMessageId: string, - state: OperationState, - snapshot: { content: string; reasoning: string }, - threadId?: string, - ): Promise<{ newToolMsgIds: string[] }> { - // Merge incoming tools into the payloads array, de-duped by id. On a - // retry of the same event, payloads already has these entries — skip the - // re-push to keep phase 1/3 writes idempotent. - for (const tool of incoming) { - if (!persistState.payloads.some((p) => p.id === tool.id)) { - persistState.payloads.push({ ...tool }); - } - } - - const buildUpdate = (): Record => { - const update: Record = { tools: persistState.payloads }; - if (snapshot.content) update.content = snapshot.content; - if (snapshot.reasoning) update.reasoning = { content: snapshot.reasoning }; - return update; - }; - - // ─── Phase 1: pre-register tools[] on the assistant ─── - // Idempotent re-write of the tools[] JSONB column. Throws propagate so - // the outer ingest loop leaves the event un-marked → retry replays. - await this.deps.messageModel.update(assistantMessageId, buildUpdate()); - - // ─── Phase 2: create tool messages, capture ids ─── - // Only create rows for tools that haven't been persisted yet. On retry - // after a phase 2 mid-batch failure, this skips the ones that already - // landed (their ids are in `persistedIds`) and re-tries the rest. - const newToolMsgIds: string[] = []; - const freshForCreate = incoming.filter((t) => !persistState.persistedIds.has(t.id)); - for (const tool of freshForCreate) { - const result = await this.deps.messageModel.create({ - agentId: state.agentId ?? undefined, - content: '', - parentId: assistantMessageId, - plugin: { - apiName: tool.apiName, - arguments: tool.arguments, - identifier: tool.identifier, - type: tool.type, - }, - role: 'tool', - threadId: threadId ?? null, - tool_call_id: tool.id, - topicId: state.topicId, - }); - // Mark persisted ONLY after the create resolves cleanly — a thrown - // create leaves the id absent so retries re-attempt this tool. - state.toolMsgIdByCallId.set(tool.id, result.id); - persistState.persistedIds.add(tool.id); - newToolMsgIds.push(result.id); - const entry = persistState.payloads.find((p) => p.id === tool.id); - if (entry) entry.result_msg_id = result.id; - } - - // ─── Phase 3: backfill result_msg_id on assistant.tools[] ─── - // Always runs: even if every tool was already persisted in a prior call, - // a phase 3 retry after a partial-failure replay needs to land the - // up-to-date payloads. The write is idempotent (same JSONB). - await this.deps.messageModel.update(assistantMessageId, buildUpdate()); - - return { newToolMsgIds }; - } - - private async persistMainToolBatch(state: OperationState, tools: ToolCallPayload[]) { - await this.persistToolBatch(tools, state.toolState, state.currentAssistantMessageId, state, { - content: state.accumulatedContent, - reasoning: state.accumulatedReasoning, - }); - } - - // ─── Subagent thread + turn tracking ───────────────────────────────────── - - // ─── Subagent run coordinator (shared reducer) interpreter ─────────────── - - private subagentReduceCtx(state: OperationState): SubagentReduceCtx { - return { - agentId: state.agentId, - mainAssistantId: state.currentAssistantMessageId, - // Pre-allocate DB-compatible ids so the reducer can chain parentId - // without a create→read round-trip. `thd_…` matches `generateThreadId`. - newId: (kind) => (kind === 'thread' ? generateThreadId() : `msg_${createNanoId(18)()}`), - topicId: state.topicId, - }; - } - - /** - * Reduce one event through the shared subagent coordinator and apply the - * resulting intents. Commit-on-success: `state.subagentState` is only - * advanced after every intent lands, so a throwing intent leaves the event - * unmarked in `processedKeys` and the CLI BatchIngester replays it against - * the unchanged state (same resilience the per-event idempotency loop relies - * on elsewhere). - */ - private async reduceAndApply(state: OperationState, event: AgentStreamEvent) { - const { state: next, intents } = reduceSubagentRuns( - state.subagentState, - event, - this.subagentReduceCtx(state), - ); - for (const intent of intents) await this.applySubagentIntent(state, intent); - state.subagentState = next; - } - private async applySubagentIntent(state: OperationState, intent: SubagentIntent) { switch (intent.kind) { case 'createThread': { @@ -1163,9 +878,8 @@ export class HeterogeneousPersistenceHandler { return; } - // Inner subagent tool_result DB content is already written by the generic - // `toolMsgIdByCallId` update in handleToolResult; nothing extra server-side. case 'resolveToolResult': { + await this.applyToolResult(state, intent); return; } @@ -1180,16 +894,12 @@ export class HeterogeneousPersistenceHandler { } case 'persistToolBatch': { - const buildUpdate = (withResult: boolean): Record => { - const update: Record = { - tools: intent.tools.map((t) => - withResult ? { ...t.payload, result_msg_id: t.toolMessageId } : { ...t.payload }, - ), - }; - if (intent.content) update.content = intent.content; - if (intent.reasoning) update.reasoning = { content: intent.reasoning }; - return update; - }; + const buildUpdate = (withResult: boolean) => + this.buildToolBatchUpdate(intent.tools, { + content: intent.content, + reasoning: intent.reasoning, + withResult, + }); // Phase 1: pre-register assistant.tools[] (no result_msg_id yet). await this.deps.messageModel.update(intent.assistantMessageId, buildUpdate(false)); diff --git a/apps/server/src/services/heterogeneousAgent/__tests__/HeterogeneousPersistenceHandler.test.ts b/apps/server/src/services/heterogeneousAgent/__tests__/HeterogeneousPersistenceHandler.test.ts index 24483c6053..18653d8fae 100644 --- a/apps/server/src/services/heterogeneousAgent/__tests__/HeterogeneousPersistenceHandler.test.ts +++ b/apps/server/src/services/heterogeneousAgent/__tests__/HeterogeneousPersistenceHandler.test.ts @@ -486,9 +486,9 @@ describe('HeterogeneousPersistenceHandler', () => { if (id === 'asst-1') order.push('update-asst'); return origUpdate(id, patch); }); - h.messageModel.create.mockImplementation(async (input: any) => { + h.messageModel.create.mockImplementation(async (input: any, id?: string) => { order.push(input.role === 'tool' ? 'create-tool' : 'create-other'); - return origCreate(input); + return origCreate(input, id); }); const tool = { @@ -767,6 +767,91 @@ describe('HeterogeneousPersistenceHandler', () => { expect(step2Asst!.parentId).toBe('tool-row-only'); }); + it('chains off the latest tool row when parallel tools are only partially backfilled', async () => { + // Regression for main-chain breaks with parallel/multi tool calls: + // tool A is visible in assistant.tools[].result_msg_id, while tool B's + // row exists but Phase 3 has not backfilled assistant.tools[] yet. The + // step anchor must be tool B, not the earlier resolved tool A. + const h = createHarness({ + assistantMessageId: 'asst-init', + operationId: 'op-1', + topicId: 'topic-1', + }); + + const metaState: FakeTopicMetadata = { + runningOperation: { assistantMessageId: 'asst-init', operationId: 'op-1' }, + }; + h.topicModel.findById.mockImplementation(async (id: string) => { + if (id !== 'topic-1') return null; + return { agentId: null, id, metadata: { ...metaState } }; + }); + h.topicModel.updateMetadata.mockImplementation(async (_id: string, patch: any) => { + Object.assign(metaState, patch); + }); + + await h.handler.ingest({ + events: [buildEvent('stream_start', 1, { newStep: true })], + operationId: 'op-1', + topicId: 'topic-1', + }); + const step1Asst = [...h.messages.values()].find( + (m) => m.role === 'assistant' && m.id !== 'asst-init', + )!; + + h.messages.set('tool-a-backfilled', { + agentId: null, + content: 'tool A result', + id: 'tool-a-backfilled', + parentId: step1Asst.id, + role: 'tool', + threadId: null, + tool_call_id: 'tc-a', + topicId: 'topic-1', + }); + h.messages.set('tool-b-row-only', { + agentId: null, + content: 'tool B result', + id: 'tool-b-row-only', + parentId: step1Asst.id, + role: 'tool', + threadId: null, + tool_call_id: 'tc-b', + topicId: 'topic-1', + }); + h.messages.set(step1Asst.id, { + ...h.messages.get(step1Asst.id)!, + tools: [ + { + apiName: 'Read', + arguments: '{}', + id: 'tc-a', + identifier: 'read', + result_msg_id: 'tool-a-backfilled', + type: 'default', + }, + { + apiName: 'Bash', + arguments: '{}', + id: 'tc-b', + identifier: 'bash', + type: 'default', + }, + ], + }); + + await h.handler.ingest({ + events: [buildEvent('stream_start', 2, { newStep: true })], + operationId: 'op-1', + topicId: 'topic-1', + }); + + const step2Asst = [...h.messages.values()].find( + (m) => m.role === 'assistant' && m.id !== 'asst-init' && m.id !== step1Asst.id, + ); + expect(step2Asst).toBeDefined(); + expect(step2Asst!.parentId).toBe('tool-b-row-only'); + }); + it('ignores subagent tool rows (threadId set) when resolving the step anchor', async () => { // A subagent tool row lives on its own thread and must never anchor the // main-agent wire. If the only `role:'tool'` child carries a threadId,