🐛 fix(server): rehydrate subagent runs from DB on cold replica

Server-side hetero persistence kept per-operation state in a module-level
map. On a cold serverless replica (or any cross-replica batch), the main
agent state is rebuilt from DB but `MainAgentRunState.subagents` was seeded
empty. A continuing subagent event then hit the `!existing` branch of
`ensureRun` and forked a brand-new isolation thread for a parentToolCallId
that already had one — producing piles of generic "Subagent" threads that
were never attached to the right thread. Desktop never hit this (one
long-lived run-state closure).

Rebuild `state.main.subagents` from DB the same way the main half is
rehydrated: add `rehydrateSubagentRunsState` to @lobechat/heterogeneous-agents
and call a new `refreshSubagentRunsFromDb` each ingest. Only runs MISSING
from memory are rehydrated (warm accumulators win); finalized (Active)
threads are excluded so completed spawns are never resurrected.

Sibling of #15783 (main message chaining) — same root cause, subagent half.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-06-14 01:49:13 +08:00
parent 99411041b9
commit 0d4502c6c0
5 changed files with 420 additions and 2 deletions
@@ -6,9 +6,14 @@ import type {
MainAgentRunState,
MainAgentTurnToolState,
SubagentIntent,
SubagentRunSnapshot,
ToolCallPayload,
} from '@lobechat/heterogeneous-agents';
import { createMainAgentRunState, reduceMainAgent } from '@lobechat/heterogeneous-agents';
import {
createMainAgentRunState,
reduceMainAgent,
rehydrateSubagentRunsState,
} from '@lobechat/heterogeneous-agents';
import {
AgentRuntimeErrorType,
type ChatMessageError,
@@ -219,6 +224,7 @@ export class HeterogeneousPersistenceHandler {
await this.refreshToolMessageIndex(state);
await this.refreshMainStateFromDb(state);
await this.refreshSubagentRunsFromDb(state);
for (const event of params.events) {
const key = eventKey(event);
@@ -549,6 +555,87 @@ export class HeterogeneousPersistenceHandler {
}
}
/**
* Rebuild the in-flight subagent runs (`state.main.subagents`) from DB.
*
* The shared reducer keys runs by `parentToolCallId` and only lazy-creates a
* thread when the run is ABSENT from this map. On a cold serverless replica
* `createMainAgentRunState` seeds an empty map, so a subagent event whose
* thread already exists (created by an earlier batch / another replica) would
* fork a brand-new thread — the "大量无意义的 Subagent" bug. `refreshMainStateFromDb`
* rebuilds the main-agent half; this rebuilds the subagent half the same way.
*
* Merge semantics: only runs MISSING from the in-memory map are rehydrated, so
* a warm replica's live per-turn accumulators (`accContent`, current
* `toolState`) are never clobbered by the DB projection. Finalized runs are
* excluded (their thread is `Active`, not `Processing`), so a completed spawn
* is never resurrected.
*
* Best-effort: any DB hiccup (or a partial test mock without the query
* methods) leaves `state.main.subagents` untouched rather than aborting the
* whole ingest.
*/
private async refreshSubagentRunsFromDb(state: OperationState): Promise<void> {
try {
const threads = await this.deps.threadModel.queryByTopicId(state.topicId);
const existing = state.main.subagents.runs;
const snapshots: SubagentRunSnapshot[] = [];
for (const thread of threads ?? []) {
if (thread.type !== ThreadType.Isolation) continue;
if (thread.status !== ThreadStatus.Processing) continue;
const parentToolCallId = (thread.metadata as { sourceToolCallId?: string } | null)
?.sourceToolCallId;
if (!parentToolCallId || existing.has(parentToolCallId)) continue;
const messages = await this.deps.messageModel.query({
threadId: thread.id,
topicId: state.topicId,
});
const snapshot = this.buildSubagentSnapshot(parentToolCallId, thread.id, messages);
if (snapshot) snapshots.push(snapshot);
}
if (snapshots.length === 0) return;
// Union: rehydrated (missing) runs + the in-memory ones (which win, since
// they carry live accumulators the DB hasn't caught up to yet).
const merged = rehydrateSubagentRunsState(snapshots);
for (const [parentToolCallId, run] of existing) merged.runs.set(parentToolCallId, run);
state.main = { ...state.main, subagents: merged };
} catch (err) {
log('refreshSubagentRunsFromDb failed op=%s err=%O', state.operationId, err);
}
}
/**
* Reconstruct one {@link SubagentRunSnapshot} from a thread's persisted
* messages (ordered `createdAt` asc by the query). Returns undefined when the
* thread has no assistant yet — without one there is nothing to attach a
* continuation turn to, and the first-event path will (correctly) seed it.
*/
private buildSubagentSnapshot(
parentToolCallId: string,
threadId: string,
messages: Array<{ id: string; parentId?: string | null; role: string; tool_call_id?: string }>,
): SubagentRunSnapshot | undefined {
const assistants = messages.filter((m) => m.role === 'assistant');
if (assistants.length === 0) return undefined;
const currentAssistant = assistants.at(-1);
const toolRows = messages.filter((m) => m.role === 'tool' && m.tool_call_id);
const childTools = toolRows.filter((m) => m.parentId === currentAssistant.id);
const lastChainParentId = childTools.length > 0 ? childTools.at(-1).id : currentAssistant.id;
return {
currentAssistantId: currentAssistant.id,
lastChainParentId,
lifetimeToolCallIds: toolRows.map((m) => m.tool_call_id!),
parentToolCallId,
threadId,
};
}
private async syncAssistantPointerForAdvancedStep(state: OperationState): Promise<void> {
const topic = await this.deps.topicModel.findById(state.topicId);
const running = topic?.metadata?.runningOperation;
@@ -0,0 +1,270 @@
// @vitest-environment node
import type { AgentStreamEvent } from '@lobechat/agent-gateway-client';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
__resetOperationStatesForTesting,
HeterogeneousPersistenceHandler,
} from '../HeterogeneousPersistenceHandler';
/**
* Regression for the SERVER-ONLY "大量无意义的 SubAgent" bug.
*
* Root cause: `HeterogeneousPersistenceHandler` keeps per-operation state in a
* module-level `operationStates` map. On Vercel serverless, consecutive ingest
* batches for one operation can land on DIFFERENT (cold) replicas, so that map
* is empty on the next batch. `loadOrCreateState` rehydrates the MAIN-agent
* state from DB (accumulatedContent, toolState, toolMsgIdByCallId,
* currentAssistantMessageId) — but initializes `subagentState` with an empty
* `createSubagentRunsState()` and NEVER reconstructs the in-flight subagent
* runs from DB.
*
* Consequence: when a subagent run spans multiple batches, the first subagent
* event seen by each fresh replica hits the `!existing` branch of `ensureRun`
* and creates a BRAND-NEW thread for a `parentToolCallId` that already has one.
* The duplicates get the generic "Subagent" title because spawnMetadata only
* rides the first subagent event per parent (adapter `announcedSpawns`).
*
* The desktop client never hits this — it has a single long-lived
* `subagentState` closure for the whole run.
*
* This test simulates a cold replica between batches via
* `__resetOperationStatesForTesting()` (the in-memory map is dropped while the
* mock DB — `threads` / `messages` — persists, exactly like a fresh Lambda).
*/
interface FakeMessage {
agentId: string | null;
content: string;
id: string;
metadata?: any;
model?: string;
parentId?: string | null;
plugin?: any;
reasoning?: any;
role: 'user' | 'assistant' | 'tool' | 'task' | 'system';
threadId?: string | null;
tool_call_id?: string;
tools?: any[];
topicId: string | null;
}
interface FakeThread {
id: string;
metadata?: any;
sourceMessageId?: string | null;
status: string;
title: string;
topicId: string;
type: string;
}
const createHarness = (params: {
assistantMessageId: string;
operationId: string;
topicId: string;
}) => {
let nextMsgIdSeq = 0;
const messages = new Map<string, FakeMessage>();
const threads = new Map<string, FakeThread>();
messages.set(params.assistantMessageId, {
agentId: null,
content: '',
id: params.assistantMessageId,
role: 'assistant',
topicId: params.topicId,
});
const messageModel = {
create: vi.fn(async (input: Partial<FakeMessage>, id?: string) => {
nextMsgIdSeq += 1;
const msgId = id ?? `msg_${nextMsgIdSeq}`;
const msg: FakeMessage = {
agentId: input.agentId ?? null,
content: input.content ?? '',
id: msgId,
metadata: input.metadata,
model: input.model,
parentId: input.parentId ?? null,
plugin: input.plugin,
provider: undefined,
reasoning: input.reasoning,
role: input.role!,
threadId: input.threadId ?? null,
tool_call_id: input.tool_call_id,
topicId: input.topicId ?? null,
} as FakeMessage;
messages.set(msgId, msg);
return msg;
}),
update: vi.fn(async (id: string, patch: Partial<FakeMessage>) => {
const existing = messages.get(id);
if (!existing) return { success: false };
messages.set(id, { ...existing, ...patch });
return { success: true };
}),
updateToolMessage: vi.fn(async (id: string, patch: any) => {
const existing = messages.get(id);
if (!existing) return { success: false };
messages.set(id, { ...existing, content: patch.content ?? existing.content });
return { success: true };
}),
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
query: vi.fn(async (params: { threadId?: string; topicId?: string }) => {
if (params?.threadId) {
return [...messages.values()].filter((m) => m.threadId === params.threadId);
}
return [...messages.values()].filter((m) => !m.threadId && m.topicId === params?.topicId);
}),
getLastChildToolMessageId: vi.fn(async (assistantMessageId: string) => {
const match = [...messages.values()].findLast(
(m) => m.role === 'tool' && m.parentId === assistantMessageId && !m.threadId,
);
return match?.id;
}),
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => {
// Mirror the real query: every persisted tool row's (toolCallId → id).
return [...messages.values()]
.filter((m) => m.role === 'tool' && m.tool_call_id)
.map((m) => ({ id: m.id, toolCallId: m.tool_call_id! }));
}),
};
const threadModel = {
create: vi.fn(async (input: Partial<FakeThread>) => {
const thread: FakeThread = {
id: input.id!,
metadata: input.metadata,
sourceMessageId: input.sourceMessageId,
status: input.status ?? 'active',
title: input.title ?? '',
topicId: input.topicId ?? params.topicId,
type: input.type ?? 'isolation',
};
threads.set(thread.id, thread);
return thread;
}),
findById: vi.fn(async (id: string) => threads.get(id) ?? null),
queryByTopicId: vi.fn(async (topicId: string) =>
[...threads.values()].filter((t) => t.topicId === topicId),
),
update: vi.fn(async (id: string, patch: Partial<FakeThread>) => {
const existing = threads.get(id);
if (!existing) return;
threads.set(id, { ...existing, ...patch });
}),
};
const topicModel = {
findById: vi.fn(async (id: string) => {
if (id !== params.topicId) return null;
return {
agentId: null,
id,
metadata: {
runningOperation: {
assistantMessageId: params.assistantMessageId,
operationId: params.operationId,
},
},
};
}),
updateMetadata: vi.fn(async () => {}),
};
const handler = new HeterogeneousPersistenceHandler({
messageModel: messageModel as any,
threadModel: threadModel as any,
topicModel: topicModel as any,
});
return { handler, messages, threadModel, threads };
};
const buildEvent = (
type: AgentStreamEvent['type'],
stepIndex: number,
data: Record<string, unknown>,
): AgentStreamEvent => ({
data,
operationId: 'op-1',
stepIndex,
timestamp: 1_700_000_000_000 + stepIndex,
type,
});
const innerTool = (id: string) => ({
apiName: 'Bash',
arguments: '{}',
id,
identifier: 'bash',
type: 'default',
});
describe('HeterogeneousPersistenceHandler — subagent run survives a cold replica', () => {
beforeEach(() => __resetOperationStatesForTesting());
afterEach(() => __resetOperationStatesForTesting());
it('does NOT spawn a duplicate thread when a later batch of the SAME subagent run lands on a fresh replica', async () => {
const h = createHarness({
assistantMessageId: 'asst-1',
operationId: 'op-1',
topicId: 'topic-1',
});
const PARENT = 'tc-spawn-1';
// ── Batch 1 (replica A): first subagent turn. Carries spawnMetadata, so the
// thread is created with a real title. ──
await h.handler.ingest({
assistantMessageId: 'asst-1',
events: [
buildEvent('stream_chunk', 0, {
chunkType: 'tools_calling',
subagent: {
parentToolCallId: PARENT,
spawnMetadata: {
description: 'Explore session/agent topic data model',
prompt: 'investigate',
subagentType: 'Explore',
},
subagentMessageId: 'sub-msg-1',
},
toolsCalling: [innerTool('inner-1')],
}),
],
operationId: 'op-1',
topicId: 'topic-1',
});
expect(h.threads.size).toBe(1);
// ── Cold replica: the warm in-memory operation state is gone, but the DB
// (threads + messages) persists. ──
__resetOperationStatesForTesting();
// ── Batch 2 (replica B): the SAME subagent run continues with a new turn.
// Mirroring the adapter, this later event carries NO spawnMetadata. ──
await h.handler.ingest({
assistantMessageId: 'asst-1',
events: [
buildEvent('stream_chunk', 1, {
chunkType: 'tools_calling',
subagent: {
parentToolCallId: PARENT,
subagentMessageId: 'sub-msg-2',
},
toolsCalling: [innerTool('inner-2')],
}),
],
operationId: 'op-1',
topicId: 'topic-1',
});
// The continuation must attach to the EXISTING thread, not fork a new one.
expect(h.threads.size).toBe(1);
// And we must never produce a generic-titled "Subagent" duplicate.
expect([...h.threads.values()].some((t) => t.title === 'Subagent')).toBe(false);
});
});
@@ -38,6 +38,7 @@ export type {
StreamContentIntent,
SubagentIntent,
SubagentReduceCtx,
SubagentRunSnapshot,
SubagentRunsState,
} from './subagentCoordinator';
export {
@@ -45,6 +46,7 @@ export {
type EventScope,
getEventScope,
reduceSubagentRuns,
rehydrateSubagentRunsState,
} from './subagentCoordinator';
export type {
AgentEventAdapter,
@@ -13,7 +13,8 @@ export type {
SubagentIntent,
SubagentReduceCtx,
SubagentRun,
SubagentRunSnapshot,
SubagentRunsState,
SubagentTurnToolState,
} from './types';
export { createSubagentRunsState } from './types';
export { createSubagentRunsState, rehydrateSubagentRunsState } from './types';
@@ -79,6 +79,64 @@ export const createSubagentRunsState = (): SubagentRunsState => ({
runs: new Map(),
});
/**
* DB-derived snapshot of one in-flight subagent run, used to rebuild a
* {@link SubagentRun} after the in-memory coordinator state was lost.
*
* Why this exists: the desktop renderer keeps one long-lived `SubagentRunsState`
* closure for a whole CC run, so its `runs` map always has the entry for an
* active spawn. The server (`HeterogeneousPersistenceHandler`) keeps per-operation
* state in a module-level map that a COLD serverless replica starts empty — and
* if that empty state reaches `reduce`, the next subagent event hits the
* `!existing` branch of `ensureRun` and forks a BRAND-NEW thread for a
* `parentToolCallId` that already has one (the "大量无意义的 Subagent" bug). The
* server rebuilds main-agent state from DB on cold start; this lets it rebuild
* the subagent runs the same way.
*
* Only the fields needed to keep the run attached to its EXISTING thread are
* required. `currentSubagentMessageId` is intentionally NOT recoverable from DB
* (CC's per-turn `message.id` is not persisted) — leaving it empty makes the
* first post-rehydration subagent event read as a turn boundary, cutting a fresh
* in-thread assistant chained off `lastChainParentId`. That is correct and safe:
* it reuses the thread (no duplicate) and never appends to a half-written turn.
*/
export interface SubagentRunSnapshot {
/** Latest in-thread assistant id (where a continuation turn would otherwise append). */
currentAssistantId: string;
/** Chain anchor for the next turn's assistant — last tool row of the thread, else the assistant. */
lastChainParentId?: string;
/** Every inner tool_call_id already persisted in the thread (delayed tool_results resolve via this). */
lifetimeToolCallIds?: string[];
/** The spawn tool_use id (`thread.metadata.sourceToolCallId`) — the run key. */
parentToolCallId: string;
/** The existing isolation Thread this run owns. */
threadId: string;
}
/**
* Rebuild a {@link SubagentRunsState} from DB-derived snapshots of in-flight
* runs. Use on a cold start so a continuing subagent reuses its existing thread
* instead of forking a new one. `accContent` / `accReasoning` / per-turn
* `toolState` start empty — the next turn boundary opens a fresh in-thread
* assistant, and inner tool_results still resolve through `lifetimeToolCallIds`.
*/
export const rehydrateSubagentRunsState = (snapshots: SubagentRunSnapshot[]): SubagentRunsState => {
const runs = new Map<string, SubagentRun>();
for (const s of snapshots) {
runs.set(s.parentToolCallId, {
accContent: '',
accReasoning: '',
currentAssistantId: s.currentAssistantId,
currentSubagentMessageId: '',
lastChainParentId: s.lastChainParentId ?? s.currentAssistantId,
lifetimeToolCallIds: new Set(s.lifetimeToolCallIds ?? []),
threadId: s.threadId,
toolState: { payloads: [], persistedIds: new Set(), toolMsgIdByCallId: new Map() },
});
}
return { runs };
};
// ─── Reduce context (per event) ───
/**