diff --git a/apps/server/src/services/heterogeneousAgent/HeterogeneousPersistenceHandler.ts b/apps/server/src/services/heterogeneousAgent/HeterogeneousPersistenceHandler.ts index 7be99fc7f8..67892ff04b 100644 --- a/apps/server/src/services/heterogeneousAgent/HeterogeneousPersistenceHandler.ts +++ b/apps/server/src/services/heterogeneousAgent/HeterogeneousPersistenceHandler.ts @@ -6,9 +6,14 @@ import type { MainAgentRunState, MainAgentTurnToolState, SubagentIntent, + SubagentRunSnapshot, ToolCallPayload, } from '@lobechat/heterogeneous-agents'; -import { createMainAgentRunState, reduceMainAgent } from '@lobechat/heterogeneous-agents'; +import { + createMainAgentRunState, + reduceMainAgent, + rehydrateSubagentRunsState, +} from '@lobechat/heterogeneous-agents'; import { AgentRuntimeErrorType, type ChatMessageError, @@ -219,6 +224,7 @@ export class HeterogeneousPersistenceHandler { await this.refreshToolMessageIndex(state); await this.refreshMainStateFromDb(state); + await this.refreshSubagentRunsFromDb(state); for (const event of params.events) { const key = eventKey(event); @@ -549,6 +555,96 @@ export class HeterogeneousPersistenceHandler { } } + /** + * Rebuild the in-flight subagent runs (`state.main.subagents`) from DB. + * + * The shared reducer keys runs by `parentToolCallId` and only lazy-creates a + * thread when the run is ABSENT from this map. On a cold serverless replica + * `createMainAgentRunState` seeds an empty map, so a subagent event whose + * thread already exists (created by an earlier batch / another replica) would + * fork a brand-new thread — the "大量无意义的 Subagent" bug. `refreshMainStateFromDb` + * rebuilds the main-agent half; this rebuilds the subagent half the same way. + * + * Merge semantics: only runs MISSING from the in-memory map are rehydrated, so + * a warm replica's live per-turn accumulators (`accContent`, current + * `toolState`) are never clobbered by the DB projection. Finalized runs are + * excluded (their thread is `Active`, not `Processing`), so a completed spawn + * is never resurrected. + * + * Best-effort: any DB hiccup (or a partial test mock without the query + * methods) leaves `state.main.subagents` untouched rather than aborting the + * whole ingest. + */ + private async refreshSubagentRunsFromDb(state: OperationState): Promise { + try { + const threads = await this.deps.threadModel.queryByTopicId(state.topicId); + const existing = state.main.subagents.runs; + const snapshots: SubagentRunSnapshot[] = []; + + for (const thread of threads ?? []) { + if (thread.type !== ThreadType.Isolation) continue; + if (thread.status !== ThreadStatus.Processing) continue; + const meta = thread.metadata as { operationId?: string; sourceToolCallId?: string } | null; + // Operation-scoped: only rehydrate threads THIS operation created. + // Topics are reused across turns, so a prior run that crashed / was + // cancelled without an ingested terminal event can leave its subagent + // thread stuck in `Processing`. Without this guard the next operation + // would merge that unrelated thread into its reducer state and then + // finalize/mutate it on its own terminal drain. Threads written before + // this field existed have no `operationId` and are skipped (safe — we + // can't attribute them, and the live run re-creates what it needs). + if (meta?.operationId !== state.operationId) continue; + const parentToolCallId = meta?.sourceToolCallId; + if (!parentToolCallId || existing.has(parentToolCallId)) continue; + + const messages = await this.deps.messageModel.query({ + threadId: thread.id, + topicId: state.topicId, + }); + const snapshot = this.buildSubagentSnapshot(parentToolCallId, thread.id, messages); + if (snapshot) snapshots.push(snapshot); + } + + if (snapshots.length === 0) return; + + // Union: rehydrated (missing) runs + the in-memory ones (which win, since + // they carry live accumulators the DB hasn't caught up to yet). + const merged = rehydrateSubagentRunsState(snapshots); + for (const [parentToolCallId, run] of existing) merged.runs.set(parentToolCallId, run); + state.main = { ...state.main, subagents: merged }; + } catch (err) { + log('refreshSubagentRunsFromDb failed op=%s err=%O', state.operationId, err); + } + } + + /** + * Reconstruct one {@link SubagentRunSnapshot} from a thread's persisted + * messages (ordered `createdAt` asc by the query). Returns undefined when the + * thread has no assistant yet — without one there is nothing to attach a + * continuation turn to, and the first-event path will (correctly) seed it. + */ + private buildSubagentSnapshot( + parentToolCallId: string, + threadId: string, + messages: Array<{ id: string; parentId?: string | null; role: string; tool_call_id?: string }>, + ): SubagentRunSnapshot | undefined { + const assistants = messages.filter((m) => m.role === 'assistant'); + const currentAssistant = assistants.at(-1); + if (!currentAssistant) return undefined; + + const toolRows = messages.filter((m) => m.role === 'tool' && m.tool_call_id); + const childTools = toolRows.filter((m) => m.parentId === currentAssistant.id); + const lastChainParentId = childTools.at(-1)?.id ?? currentAssistant.id; + + return { + currentAssistantId: currentAssistant.id, + lastChainParentId, + lifetimeToolCallIds: toolRows.map((m) => m.tool_call_id!), + parentToolCallId, + threadId, + }; + } + private async syncAssistantPointerForAdvancedStep(state: OperationState): Promise { const topic = await this.deps.topicModel.findById(state.topicId); const running = topic?.metadata?.runningOperation; @@ -844,6 +940,10 @@ export class HeterogeneousPersistenceHandler { await this.deps.threadModel.create({ id: intent.threadId, metadata: { + // Stamp the owning hetero operation so `refreshSubagentRunsFromDb` + // only rehydrates threads from THIS run — never a stale Processing + // thread a prior crashed/cancelled run left on the same topic. + operationId: state.operationId, sourceToolCallId: intent.sourceToolCallId, startedAt: new Date().toISOString(), subagentType: intent.subagentType, diff --git a/apps/server/src/services/heterogeneousAgent/__tests__/HeterogeneousPersistenceHandler.subagentRehydration.test.ts b/apps/server/src/services/heterogeneousAgent/__tests__/HeterogeneousPersistenceHandler.subagentRehydration.test.ts new file mode 100644 index 0000000000..25b41fc9ed --- /dev/null +++ b/apps/server/src/services/heterogeneousAgent/__tests__/HeterogeneousPersistenceHandler.subagentRehydration.test.ts @@ -0,0 +1,374 @@ +// @vitest-environment node +import type { AgentStreamEvent } from '@lobechat/agent-gateway-client'; +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { + __resetOperationStatesForTesting, + HeterogeneousPersistenceHandler, +} from '../HeterogeneousPersistenceHandler'; + +/** + * Regression for the SERVER-ONLY "大量无意义的 SubAgent" bug. + * + * Root cause: `HeterogeneousPersistenceHandler` keeps per-operation state in a + * module-level `operationStates` map. On Vercel serverless, consecutive ingest + * batches for one operation can land on DIFFERENT (cold) replicas, so that map + * is empty on the next batch. `loadOrCreateState` rehydrates the MAIN-agent + * state from DB (accumulatedContent, toolState, toolMsgIdByCallId, + * currentAssistantMessageId) — but initializes `subagentState` with an empty + * `createSubagentRunsState()` and NEVER reconstructs the in-flight subagent + * runs from DB. + * + * Consequence: when a subagent run spans multiple batches, the first subagent + * event seen by each fresh replica hits the `!existing` branch of `ensureRun` + * and creates a BRAND-NEW thread for a `parentToolCallId` that already has one. + * The duplicates get the generic "Subagent" title because spawnMetadata only + * rides the first subagent event per parent (adapter `announcedSpawns`). + * + * The desktop client never hits this — it has a single long-lived + * `subagentState` closure for the whole run. + * + * This test simulates a cold replica between batches via + * `__resetOperationStatesForTesting()` (the in-memory map is dropped while the + * mock DB — `threads` / `messages` — persists, exactly like a fresh Lambda). + */ + +interface FakeMessage { + agentId: string | null; + content: string; + id: string; + metadata?: any; + model?: string; + parentId?: string | null; + plugin?: any; + reasoning?: any; + role: 'user' | 'assistant' | 'tool' | 'task' | 'system'; + threadId?: string | null; + tool_call_id?: string; + tools?: any[]; + topicId: string | null; +} + +interface FakeThread { + id: string; + metadata?: any; + sourceMessageId?: string | null; + status: string; + title: string; + topicId: string; + type: string; +} + +const createHarness = (params: { + assistantMessageId: string; + operationId: string; + topicId: string; +}) => { + let nextMsgIdSeq = 0; + const messages = new Map(); + const threads = new Map(); + + messages.set(params.assistantMessageId, { + agentId: null, + content: '', + id: params.assistantMessageId, + role: 'assistant', + topicId: params.topicId, + }); + + const messageModel = { + create: vi.fn(async (input: Partial, id?: string) => { + nextMsgIdSeq += 1; + const msgId = id ?? `msg_${nextMsgIdSeq}`; + const msg: FakeMessage = { + agentId: input.agentId ?? null, + content: input.content ?? '', + id: msgId, + metadata: input.metadata, + model: input.model, + parentId: input.parentId ?? null, + plugin: input.plugin, + provider: undefined, + reasoning: input.reasoning, + role: input.role!, + threadId: input.threadId ?? null, + tool_call_id: input.tool_call_id, + topicId: input.topicId ?? null, + } as FakeMessage; + messages.set(msgId, msg); + return msg; + }), + update: vi.fn(async (id: string, patch: Partial) => { + const existing = messages.get(id); + if (!existing) return { success: false }; + messages.set(id, { ...existing, ...patch }); + return { success: true }; + }), + updateToolMessage: vi.fn(async (id: string, patch: any) => { + const existing = messages.get(id); + if (!existing) return { success: false }; + messages.set(id, { ...existing, content: patch.content ?? existing.content }); + return { success: true }; + }), + findById: vi.fn(async (id: string) => messages.get(id) ?? null), + query: vi.fn(async (params: { threadId?: string; topicId?: string }) => { + if (params?.threadId) { + return [...messages.values()].filter((m) => m.threadId === params.threadId); + } + return [...messages.values()].filter((m) => !m.threadId && m.topicId === params?.topicId); + }), + getLastChildToolMessageId: vi.fn(async (assistantMessageId: string) => { + const match = [...messages.values()].findLast( + (m) => m.role === 'tool' && m.parentId === assistantMessageId && !m.threadId, + ); + return match?.id; + }), + listMessagePluginsByTopic: vi.fn(async (_topicId: string) => { + // Mirror the real query: every persisted tool row's (toolCallId → id). + return [...messages.values()] + .filter((m) => m.role === 'tool' && m.tool_call_id) + .map((m) => ({ id: m.id, toolCallId: m.tool_call_id! })); + }), + }; + + const threadModel = { + create: vi.fn(async (input: Partial) => { + const thread: FakeThread = { + id: input.id!, + metadata: input.metadata, + sourceMessageId: input.sourceMessageId, + status: input.status ?? 'active', + title: input.title ?? '', + topicId: input.topicId ?? params.topicId, + type: input.type ?? 'isolation', + }; + threads.set(thread.id, thread); + return thread; + }), + findById: vi.fn(async (id: string) => threads.get(id) ?? null), + queryByTopicId: vi.fn(async (topicId: string) => + [...threads.values()].filter((t) => t.topicId === topicId), + ), + update: vi.fn(async (id: string, patch: Partial) => { + const existing = threads.get(id); + if (!existing) return; + threads.set(id, { ...existing, ...patch }); + }), + }; + + const topicModel = { + findById: vi.fn(async (id: string) => { + if (id !== params.topicId) return null; + return { + agentId: null, + id, + metadata: { + runningOperation: { + assistantMessageId: params.assistantMessageId, + operationId: params.operationId, + }, + }, + }; + }), + updateMetadata: vi.fn(async () => {}), + }; + + const handler = new HeterogeneousPersistenceHandler({ + messageModel: messageModel as any, + threadModel: threadModel as any, + topicModel: topicModel as any, + }); + + return { handler, messages, threadModel, threads }; +}; + +const buildEvent = ( + type: AgentStreamEvent['type'], + stepIndex: number, + data: Record, +): AgentStreamEvent => ({ + data, + operationId: 'op-1', + stepIndex, + timestamp: 1_700_000_000_000 + stepIndex, + type, +}); + +const innerTool = (id: string) => ({ + apiName: 'Bash', + arguments: '{}', + id, + identifier: 'bash', + type: 'default', +}); + +describe('HeterogeneousPersistenceHandler — subagent run survives a cold replica', () => { + beforeEach(() => __resetOperationStatesForTesting()); + afterEach(() => __resetOperationStatesForTesting()); + + it('does NOT spawn a duplicate thread when a later batch of the SAME subagent run lands on a fresh replica', async () => { + const h = createHarness({ + assistantMessageId: 'asst-1', + operationId: 'op-1', + topicId: 'topic-1', + }); + + const PARENT = 'tc-spawn-1'; + + // ── Batch 1 (replica A): first subagent turn. Carries spawnMetadata, so the + // thread is created with a real title. ── + await h.handler.ingest({ + assistantMessageId: 'asst-1', + events: [ + buildEvent('stream_chunk', 0, { + chunkType: 'tools_calling', + subagent: { + parentToolCallId: PARENT, + spawnMetadata: { + description: 'Explore session/agent topic data model', + prompt: 'investigate', + subagentType: 'Explore', + }, + subagentMessageId: 'sub-msg-1', + }, + toolsCalling: [innerTool('inner-1')], + }), + ], + operationId: 'op-1', + topicId: 'topic-1', + }); + + expect(h.threads.size).toBe(1); + + // ── Cold replica: the warm in-memory operation state is gone, but the DB + // (threads + messages) persists. ── + __resetOperationStatesForTesting(); + + // ── Batch 2 (replica B): the SAME subagent run continues with a new turn. + // Mirroring the adapter, this later event carries NO spawnMetadata. ── + await h.handler.ingest({ + assistantMessageId: 'asst-1', + events: [ + buildEvent('stream_chunk', 1, { + chunkType: 'tools_calling', + subagent: { + parentToolCallId: PARENT, + subagentMessageId: 'sub-msg-2', + }, + toolsCalling: [innerTool('inner-2')], + }), + ], + operationId: 'op-1', + topicId: 'topic-1', + }); + + // The continuation must attach to the EXISTING thread, not fork a new one. + expect(h.threads.size).toBe(1); + // And we must never produce a generic-titled "Subagent" duplicate. + expect([...h.threads.values()].some((t) => t.title === 'Subagent')).toBe(false); + }); + + // P1: a tools_calling batch reprocessed on a cold replica (BatchIngester + // retry, or a turn split across a cold boundary so the cumulative array is + // re-seen) must NOT mint a second tool message for an inner tool the run + // already persisted. Rehydration restores `lifetimeToolCallIds`, and the + // reducer de-dupes against it. + it('does NOT re-create an already-persisted inner tool row after a cold replica', async () => { + const h = createHarness({ + assistantMessageId: 'asst-1', + operationId: 'op-1', + topicId: 'topic-1', + }); + const PARENT = 'tc-spawn-1'; + + // Batch 1: turn sub-msg-1 persists inner-1. + await h.handler.ingest({ + assistantMessageId: 'asst-1', + events: [ + buildEvent('stream_chunk', 0, { + chunkType: 'tools_calling', + subagent: { + parentToolCallId: PARENT, + spawnMetadata: { prompt: 'go', subagentType: 'Explore' }, + subagentMessageId: 'sub-msg-1', + }, + toolsCalling: [innerTool('inner-1')], + }), + ], + operationId: 'op-1', + topicId: 'topic-1', + }); + + __resetOperationStatesForTesting(); // cold replica + + // Batch 2 (replica B): the SAME turn's cumulative array is re-seen (inner-1 + // again) plus a new inner-2. + await h.handler.ingest({ + assistantMessageId: 'asst-1', + events: [ + buildEvent('stream_chunk', 1, { + chunkType: 'tools_calling', + subagent: { parentToolCallId: PARENT, subagentMessageId: 'sub-msg-1' }, + toolsCalling: [innerTool('inner-1'), innerTool('inner-2')], + }), + ], + operationId: 'op-1', + topicId: 'topic-1', + }); + + const toolRows = (callId: string) => + [...h.messages.values()].filter((m) => m.role === 'tool' && m.tool_call_id === callId); + // inner-1 persisted exactly once (no duplicate row), inner-2 once. + expect(toolRows('inner-1')).toHaveLength(1); + expect(toolRows('inner-2')).toHaveLength(1); + expect(h.threads.size).toBe(1); + }); + + // P2: a stale `Processing` isolation thread left by a PRIOR operation on the + // same topic must not be rehydrated into — or finalized by — the current + // operation. The rehydration is scoped by `metadata.operationId`. + it('ignores a stale Processing thread from a different operation on the same topic', async () => { + const h = createHarness({ + assistantMessageId: 'asst-1', + operationId: 'op-2', + topicId: 'topic-1', + }); + + // Seed a thread (+ its in-thread assistant) left Processing by op-1. + h.threads.set('thd-stale', { + id: 'thd-stale', + metadata: { operationId: 'op-1', sourceToolCallId: 'tc-old' }, + sourceMessageId: 'asst-old', + status: 'processing', + title: 'Old Subagent', + topicId: 'topic-1', + type: 'isolation', + }); + h.messages.set('stale-asst', { + agentId: null, + content: '', + id: 'stale-asst', + parentId: 'asst-old', + role: 'assistant', + threadId: 'thd-stale', + topicId: 'topic-1', + } as any); + + // op-2 runs and terminates. The terminal orphan-drain would finalize every + // run in the reducer state — so if the stale thread were merged in, it would + // be flipped to Active here. + await h.handler.ingest({ + assistantMessageId: 'asst-1', + events: [ + buildEvent('stream_chunk', 0, { chunkType: 'text', content: 'working' }), + buildEvent('agent_runtime_end', 1, {}), + ], + operationId: 'op-2', + topicId: 'topic-1', + }); + + // The unrelated thread is untouched: still Processing, never updated. + expect(h.threads.get('thd-stale')!.status).toBe('processing'); + expect(h.threadModel.update).not.toHaveBeenCalledWith('thd-stale', expect.anything()); + }); +}); diff --git a/packages/heterogeneous-agents/src/index.ts b/packages/heterogeneous-agents/src/index.ts index a408c37636..1abfaa140f 100644 --- a/packages/heterogeneous-agents/src/index.ts +++ b/packages/heterogeneous-agents/src/index.ts @@ -38,6 +38,7 @@ export type { StreamContentIntent, SubagentIntent, SubagentReduceCtx, + SubagentRunSnapshot, SubagentRunsState, } from './subagentCoordinator'; export { @@ -45,6 +46,7 @@ export { type EventScope, getEventScope, reduceSubagentRuns, + rehydrateSubagentRunsState, } from './subagentCoordinator'; export type { AgentEventAdapter, diff --git a/packages/heterogeneous-agents/src/subagentCoordinator/index.ts b/packages/heterogeneous-agents/src/subagentCoordinator/index.ts index b3a52db574..90a064dca2 100644 --- a/packages/heterogeneous-agents/src/subagentCoordinator/index.ts +++ b/packages/heterogeneous-agents/src/subagentCoordinator/index.ts @@ -13,7 +13,8 @@ export type { SubagentIntent, SubagentReduceCtx, SubagentRun, + SubagentRunSnapshot, SubagentRunsState, SubagentTurnToolState, } from './types'; -export { createSubagentRunsState } from './types'; +export { createSubagentRunsState, rehydrateSubagentRunsState } from './types'; diff --git a/packages/heterogeneous-agents/src/subagentCoordinator/reducer.ts b/packages/heterogeneous-agents/src/subagentCoordinator/reducer.ts index 9097d4fa5d..c00bc351ce 100644 --- a/packages/heterogeneous-agents/src/subagentCoordinator/reducer.ts +++ b/packages/heterogeneous-agents/src/subagentCoordinator/reducer.ts @@ -281,25 +281,34 @@ const reduceToolsChunk = ( const run = ensured.run; const intents = ensured.intents; - for (const tool of tools) run.lifetimeToolCallIds.add(tool.id); - const newToolMsgIds: string[] = []; for (const tool of tools) { - if (!run.toolState.persistedIds.has(tool.id)) { - run.toolState.persistedIds.add(tool.id); - run.toolState.payloads.push({ - apiName: tool.apiName, - arguments: tool.arguments, - id: tool.id, - identifier: tool.identifier, - type: tool.type, - }); - const toolMessageId = ctx.newId('message'); - run.toolState.toolMsgIdByCallId.set(tool.id, toolMessageId); - newToolMsgIds.push(toolMessageId); - } + // Run-lifetime de-dupe FIRST: a tool already persisted anywhere in this run + // must never be re-created. Per-turn `persistedIds` is reset on every turn + // boundary — and starts empty after a cold-replica rehydration — so it alone + // would let a replayed / continued `tools_calling` mint a SECOND tool message + // for an id the run already wrote (duplicate inner-tool row in the thread). + // `lifetimeToolCallIds` survives turn boundaries and is restored from DB on + // rehydration, so it is the durable de-dupe key. (Checked BEFORE the + // add-to-lifetime loop below, which would otherwise mark this batch's ids as + // already-seen and skip everything.) + if (run.lifetimeToolCallIds.has(tool.id)) continue; + if (run.toolState.persistedIds.has(tool.id)) continue; + run.toolState.persistedIds.add(tool.id); + run.toolState.payloads.push({ + apiName: tool.apiName, + arguments: tool.arguments, + id: tool.id, + identifier: tool.identifier, + type: tool.type, + }); + const toolMessageId = ctx.newId('message'); + run.toolState.toolMsgIdByCallId.set(tool.id, toolMessageId); + newToolMsgIds.push(toolMessageId); } + for (const tool of tools) run.lifetimeToolCallIds.add(tool.id); + intents.push({ assistantMessageId: run.currentAssistantId, content: run.accContent || undefined, diff --git a/packages/heterogeneous-agents/src/subagentCoordinator/types.ts b/packages/heterogeneous-agents/src/subagentCoordinator/types.ts index fe9b34124d..6828f3c3fc 100644 --- a/packages/heterogeneous-agents/src/subagentCoordinator/types.ts +++ b/packages/heterogeneous-agents/src/subagentCoordinator/types.ts @@ -79,6 +79,64 @@ export const createSubagentRunsState = (): SubagentRunsState => ({ runs: new Map(), }); +/** + * DB-derived snapshot of one in-flight subagent run, used to rebuild a + * {@link SubagentRun} after the in-memory coordinator state was lost. + * + * Why this exists: the desktop renderer keeps one long-lived `SubagentRunsState` + * closure for a whole CC run, so its `runs` map always has the entry for an + * active spawn. The server (`HeterogeneousPersistenceHandler`) keeps per-operation + * state in a module-level map that a COLD serverless replica starts empty — and + * if that empty state reaches `reduce`, the next subagent event hits the + * `!existing` branch of `ensureRun` and forks a BRAND-NEW thread for a + * `parentToolCallId` that already has one (the "大量无意义的 Subagent" bug). The + * server rebuilds main-agent state from DB on cold start; this lets it rebuild + * the subagent runs the same way. + * + * Only the fields needed to keep the run attached to its EXISTING thread are + * required. `currentSubagentMessageId` is intentionally NOT recoverable from DB + * (CC's per-turn `message.id` is not persisted) — leaving it empty makes the + * first post-rehydration subagent event read as a turn boundary, cutting a fresh + * in-thread assistant chained off `lastChainParentId`. That is correct and safe: + * it reuses the thread (no duplicate) and never appends to a half-written turn. + */ +export interface SubagentRunSnapshot { + /** Latest in-thread assistant id (where a continuation turn would otherwise append). */ + currentAssistantId: string; + /** Chain anchor for the next turn's assistant — last tool row of the thread, else the assistant. */ + lastChainParentId?: string; + /** Every inner tool_call_id already persisted in the thread (delayed tool_results resolve via this). */ + lifetimeToolCallIds?: string[]; + /** The spawn tool_use id (`thread.metadata.sourceToolCallId`) — the run key. */ + parentToolCallId: string; + /** The existing isolation Thread this run owns. */ + threadId: string; +} + +/** + * Rebuild a {@link SubagentRunsState} from DB-derived snapshots of in-flight + * runs. Use on a cold start so a continuing subagent reuses its existing thread + * instead of forking a new one. `accContent` / `accReasoning` / per-turn + * `toolState` start empty — the next turn boundary opens a fresh in-thread + * assistant, and inner tool_results still resolve through `lifetimeToolCallIds`. + */ +export const rehydrateSubagentRunsState = (snapshots: SubagentRunSnapshot[]): SubagentRunsState => { + const runs = new Map(); + for (const s of snapshots) { + runs.set(s.parentToolCallId, { + accContent: '', + accReasoning: '', + currentAssistantId: s.currentAssistantId, + currentSubagentMessageId: '', + lastChainParentId: s.lastChainParentId ?? s.currentAssistantId, + lifetimeToolCallIds: new Set(s.lifetimeToolCallIds ?? []), + threadId: s.threadId, + toolState: { payloads: [], persistedIds: new Set(), toolMsgIdByCallId: new Map() }, + }); + } + return { runs }; +}; + // ─── Reduce context (per event) ─── /**