🐛 fix(server): rehydrate subagent runs from DB on cold replica (#15788)

* 🐛 fix(server): rehydrate subagent runs from DB on cold replica

Server-side hetero persistence kept per-operation state in a module-level
map. On a cold serverless replica (or any cross-replica batch), the main
agent state is rebuilt from DB but `MainAgentRunState.subagents` was seeded
empty. A continuing subagent event then hit the `!existing` branch of
`ensureRun` and forked a brand-new isolation thread for a parentToolCallId
that already had one — producing piles of generic "Subagent" threads that
were never attached to the right thread. Desktop never hit this (one
long-lived run-state closure).

Rebuild `state.main.subagents` from DB the same way the main half is
rehydrated: add `rehydrateSubagentRunsState` to @lobechat/heterogeneous-agents
and call a new `refreshSubagentRunsFromDb` each ingest. Only runs MISSING
from memory are rehydrated (warm accumulators win); finalized (Active)
threads are excluded so completed spawns are never resurrected.

Sibling of #15783 (main message chaining) — same root cause, subagent half.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* 🐛 fix(server): scope subagent rehydration to operation + de-dupe inner tools

Two follow-up fixes on the cold-replica subagent rehydration:

- P1: de-dupe inner tool creation against the run-lifetime tool set, not just
  the per-turn `persistedIds`. Per-turn state is reset on every turn boundary
  and starts empty after a rehydration, so a replayed / continued tools_calling
  on a cold replica minted a SECOND tool message for an id the run already
  wrote. `lifetimeToolCallIds` survives boundaries and is restored from DB, so
  it is the durable de-dupe key. Mirrors the main-agent retry protection.

- P2: scope `refreshSubagentRunsFromDb` to the current operation. Topics are
  reused across turns; a prior crashed/cancelled run can leave a subagent
  thread stuck `Processing`. Rehydrating purely by topic+status would merge
  that unrelated thread into the new operation's reducer state and finalize it
  on the new run's terminal drain. Stamp `operationId` on the subagent thread
  metadata at creation and filter rehydration by it.

Adds regression cases for both (each verified to fail without its fix).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-06-14 03:13:35 +08:00
committed by GitHub
parent 6dcbd387f7
commit f47e65d215
6 changed files with 561 additions and 17 deletions
@@ -6,9 +6,14 @@ import type {
MainAgentRunState,
MainAgentTurnToolState,
SubagentIntent,
SubagentRunSnapshot,
ToolCallPayload,
} from '@lobechat/heterogeneous-agents';
import { createMainAgentRunState, reduceMainAgent } from '@lobechat/heterogeneous-agents';
import {
createMainAgentRunState,
reduceMainAgent,
rehydrateSubagentRunsState,
} from '@lobechat/heterogeneous-agents';
import {
AgentRuntimeErrorType,
type ChatMessageError,
@@ -219,6 +224,7 @@ export class HeterogeneousPersistenceHandler {
await this.refreshToolMessageIndex(state);
await this.refreshMainStateFromDb(state);
await this.refreshSubagentRunsFromDb(state);
for (const event of params.events) {
const key = eventKey(event);
@@ -549,6 +555,96 @@ export class HeterogeneousPersistenceHandler {
}
}
/**
* Rebuild the in-flight subagent runs (`state.main.subagents`) from DB.
*
* The shared reducer keys runs by `parentToolCallId` and only lazy-creates a
* thread when the run is ABSENT from this map. On a cold serverless replica
* `createMainAgentRunState` seeds an empty map, so a subagent event whose
* thread already exists (created by an earlier batch / another replica) would
* fork a brand-new thread — the "大量无意义的 Subagent" bug. `refreshMainStateFromDb`
* rebuilds the main-agent half; this rebuilds the subagent half the same way.
*
* Merge semantics: only runs MISSING from the in-memory map are rehydrated, so
* a warm replica's live per-turn accumulators (`accContent`, current
* `toolState`) are never clobbered by the DB projection. Finalized runs are
* excluded (their thread is `Active`, not `Processing`), so a completed spawn
* is never resurrected.
*
* Best-effort: any DB hiccup (or a partial test mock without the query
* methods) leaves `state.main.subagents` untouched rather than aborting the
* whole ingest.
*/
private async refreshSubagentRunsFromDb(state: OperationState): Promise<void> {
try {
const threads = await this.deps.threadModel.queryByTopicId(state.topicId);
const existing = state.main.subagents.runs;
const snapshots: SubagentRunSnapshot[] = [];
for (const thread of threads ?? []) {
if (thread.type !== ThreadType.Isolation) continue;
if (thread.status !== ThreadStatus.Processing) continue;
const meta = thread.metadata as { operationId?: string; sourceToolCallId?: string } | null;
// Operation-scoped: only rehydrate threads THIS operation created.
// Topics are reused across turns, so a prior run that crashed / was
// cancelled without an ingested terminal event can leave its subagent
// thread stuck in `Processing`. Without this guard the next operation
// would merge that unrelated thread into its reducer state and then
// finalize/mutate it on its own terminal drain. Threads written before
// this field existed have no `operationId` and are skipped (safe — we
// can't attribute them, and the live run re-creates what it needs).
if (meta?.operationId !== state.operationId) continue;
const parentToolCallId = meta?.sourceToolCallId;
if (!parentToolCallId || existing.has(parentToolCallId)) continue;
const messages = await this.deps.messageModel.query({
threadId: thread.id,
topicId: state.topicId,
});
const snapshot = this.buildSubagentSnapshot(parentToolCallId, thread.id, messages);
if (snapshot) snapshots.push(snapshot);
}
if (snapshots.length === 0) return;
// Union: rehydrated (missing) runs + the in-memory ones (which win, since
// they carry live accumulators the DB hasn't caught up to yet).
const merged = rehydrateSubagentRunsState(snapshots);
for (const [parentToolCallId, run] of existing) merged.runs.set(parentToolCallId, run);
state.main = { ...state.main, subagents: merged };
} catch (err) {
log('refreshSubagentRunsFromDb failed op=%s err=%O', state.operationId, err);
}
}
/**
* Reconstruct one {@link SubagentRunSnapshot} from a thread's persisted
* messages (ordered `createdAt` asc by the query). Returns undefined when the
* thread has no assistant yet — without one there is nothing to attach a
* continuation turn to, and the first-event path will (correctly) seed it.
*/
private buildSubagentSnapshot(
parentToolCallId: string,
threadId: string,
messages: Array<{ id: string; parentId?: string | null; role: string; tool_call_id?: string }>,
): SubagentRunSnapshot | undefined {
const assistants = messages.filter((m) => m.role === 'assistant');
const currentAssistant = assistants.at(-1);
if (!currentAssistant) return undefined;
const toolRows = messages.filter((m) => m.role === 'tool' && m.tool_call_id);
const childTools = toolRows.filter((m) => m.parentId === currentAssistant.id);
const lastChainParentId = childTools.at(-1)?.id ?? currentAssistant.id;
return {
currentAssistantId: currentAssistant.id,
lastChainParentId,
lifetimeToolCallIds: toolRows.map((m) => m.tool_call_id!),
parentToolCallId,
threadId,
};
}
private async syncAssistantPointerForAdvancedStep(state: OperationState): Promise<void> {
const topic = await this.deps.topicModel.findById(state.topicId);
const running = topic?.metadata?.runningOperation;
@@ -844,6 +940,10 @@ export class HeterogeneousPersistenceHandler {
await this.deps.threadModel.create({
id: intent.threadId,
metadata: {
// Stamp the owning hetero operation so `refreshSubagentRunsFromDb`
// only rehydrates threads from THIS run — never a stale Processing
// thread a prior crashed/cancelled run left on the same topic.
operationId: state.operationId,
sourceToolCallId: intent.sourceToolCallId,
startedAt: new Date().toISOString(),
subagentType: intent.subagentType,
@@ -0,0 +1,374 @@
// @vitest-environment node
import type { AgentStreamEvent } from '@lobechat/agent-gateway-client';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
__resetOperationStatesForTesting,
HeterogeneousPersistenceHandler,
} from '../HeterogeneousPersistenceHandler';
/**
* Regression for the SERVER-ONLY "大量无意义的 SubAgent" bug.
*
* Root cause: `HeterogeneousPersistenceHandler` keeps per-operation state in a
* module-level `operationStates` map. On Vercel serverless, consecutive ingest
* batches for one operation can land on DIFFERENT (cold) replicas, so that map
* is empty on the next batch. `loadOrCreateState` rehydrates the MAIN-agent
* state from DB (accumulatedContent, toolState, toolMsgIdByCallId,
* currentAssistantMessageId) — but initializes `subagentState` with an empty
* `createSubagentRunsState()` and NEVER reconstructs the in-flight subagent
* runs from DB.
*
* Consequence: when a subagent run spans multiple batches, the first subagent
* event seen by each fresh replica hits the `!existing` branch of `ensureRun`
* and creates a BRAND-NEW thread for a `parentToolCallId` that already has one.
* The duplicates get the generic "Subagent" title because spawnMetadata only
* rides the first subagent event per parent (adapter `announcedSpawns`).
*
* The desktop client never hits this — it has a single long-lived
* `subagentState` closure for the whole run.
*
* This test simulates a cold replica between batches via
* `__resetOperationStatesForTesting()` (the in-memory map is dropped while the
* mock DB — `threads` / `messages` — persists, exactly like a fresh Lambda).
*/
interface FakeMessage {
agentId: string | null;
content: string;
id: string;
metadata?: any;
model?: string;
parentId?: string | null;
plugin?: any;
reasoning?: any;
role: 'user' | 'assistant' | 'tool' | 'task' | 'system';
threadId?: string | null;
tool_call_id?: string;
tools?: any[];
topicId: string | null;
}
interface FakeThread {
id: string;
metadata?: any;
sourceMessageId?: string | null;
status: string;
title: string;
topicId: string;
type: string;
}
const createHarness = (params: {
assistantMessageId: string;
operationId: string;
topicId: string;
}) => {
let nextMsgIdSeq = 0;
const messages = new Map<string, FakeMessage>();
const threads = new Map<string, FakeThread>();
messages.set(params.assistantMessageId, {
agentId: null,
content: '',
id: params.assistantMessageId,
role: 'assistant',
topicId: params.topicId,
});
const messageModel = {
create: vi.fn(async (input: Partial<FakeMessage>, id?: string) => {
nextMsgIdSeq += 1;
const msgId = id ?? `msg_${nextMsgIdSeq}`;
const msg: FakeMessage = {
agentId: input.agentId ?? null,
content: input.content ?? '',
id: msgId,
metadata: input.metadata,
model: input.model,
parentId: input.parentId ?? null,
plugin: input.plugin,
provider: undefined,
reasoning: input.reasoning,
role: input.role!,
threadId: input.threadId ?? null,
tool_call_id: input.tool_call_id,
topicId: input.topicId ?? null,
} as FakeMessage;
messages.set(msgId, msg);
return msg;
}),
update: vi.fn(async (id: string, patch: Partial<FakeMessage>) => {
const existing = messages.get(id);
if (!existing) return { success: false };
messages.set(id, { ...existing, ...patch });
return { success: true };
}),
updateToolMessage: vi.fn(async (id: string, patch: any) => {
const existing = messages.get(id);
if (!existing) return { success: false };
messages.set(id, { ...existing, content: patch.content ?? existing.content });
return { success: true };
}),
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
query: vi.fn(async (params: { threadId?: string; topicId?: string }) => {
if (params?.threadId) {
return [...messages.values()].filter((m) => m.threadId === params.threadId);
}
return [...messages.values()].filter((m) => !m.threadId && m.topicId === params?.topicId);
}),
getLastChildToolMessageId: vi.fn(async (assistantMessageId: string) => {
const match = [...messages.values()].findLast(
(m) => m.role === 'tool' && m.parentId === assistantMessageId && !m.threadId,
);
return match?.id;
}),
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => {
// Mirror the real query: every persisted tool row's (toolCallId → id).
return [...messages.values()]
.filter((m) => m.role === 'tool' && m.tool_call_id)
.map((m) => ({ id: m.id, toolCallId: m.tool_call_id! }));
}),
};
const threadModel = {
create: vi.fn(async (input: Partial<FakeThread>) => {
const thread: FakeThread = {
id: input.id!,
metadata: input.metadata,
sourceMessageId: input.sourceMessageId,
status: input.status ?? 'active',
title: input.title ?? '',
topicId: input.topicId ?? params.topicId,
type: input.type ?? 'isolation',
};
threads.set(thread.id, thread);
return thread;
}),
findById: vi.fn(async (id: string) => threads.get(id) ?? null),
queryByTopicId: vi.fn(async (topicId: string) =>
[...threads.values()].filter((t) => t.topicId === topicId),
),
update: vi.fn(async (id: string, patch: Partial<FakeThread>) => {
const existing = threads.get(id);
if (!existing) return;
threads.set(id, { ...existing, ...patch });
}),
};
const topicModel = {
findById: vi.fn(async (id: string) => {
if (id !== params.topicId) return null;
return {
agentId: null,
id,
metadata: {
runningOperation: {
assistantMessageId: params.assistantMessageId,
operationId: params.operationId,
},
},
};
}),
updateMetadata: vi.fn(async () => {}),
};
const handler = new HeterogeneousPersistenceHandler({
messageModel: messageModel as any,
threadModel: threadModel as any,
topicModel: topicModel as any,
});
return { handler, messages, threadModel, threads };
};
const buildEvent = (
type: AgentStreamEvent['type'],
stepIndex: number,
data: Record<string, unknown>,
): AgentStreamEvent => ({
data,
operationId: 'op-1',
stepIndex,
timestamp: 1_700_000_000_000 + stepIndex,
type,
});
const innerTool = (id: string) => ({
apiName: 'Bash',
arguments: '{}',
id,
identifier: 'bash',
type: 'default',
});
describe('HeterogeneousPersistenceHandler — subagent run survives a cold replica', () => {
beforeEach(() => __resetOperationStatesForTesting());
afterEach(() => __resetOperationStatesForTesting());
it('does NOT spawn a duplicate thread when a later batch of the SAME subagent run lands on a fresh replica', async () => {
const h = createHarness({
assistantMessageId: 'asst-1',
operationId: 'op-1',
topicId: 'topic-1',
});
const PARENT = 'tc-spawn-1';
// ── Batch 1 (replica A): first subagent turn. Carries spawnMetadata, so the
// thread is created with a real title. ──
await h.handler.ingest({
assistantMessageId: 'asst-1',
events: [
buildEvent('stream_chunk', 0, {
chunkType: 'tools_calling',
subagent: {
parentToolCallId: PARENT,
spawnMetadata: {
description: 'Explore session/agent topic data model',
prompt: 'investigate',
subagentType: 'Explore',
},
subagentMessageId: 'sub-msg-1',
},
toolsCalling: [innerTool('inner-1')],
}),
],
operationId: 'op-1',
topicId: 'topic-1',
});
expect(h.threads.size).toBe(1);
// ── Cold replica: the warm in-memory operation state is gone, but the DB
// (threads + messages) persists. ──
__resetOperationStatesForTesting();
// ── Batch 2 (replica B): the SAME subagent run continues with a new turn.
// Mirroring the adapter, this later event carries NO spawnMetadata. ──
await h.handler.ingest({
assistantMessageId: 'asst-1',
events: [
buildEvent('stream_chunk', 1, {
chunkType: 'tools_calling',
subagent: {
parentToolCallId: PARENT,
subagentMessageId: 'sub-msg-2',
},
toolsCalling: [innerTool('inner-2')],
}),
],
operationId: 'op-1',
topicId: 'topic-1',
});
// The continuation must attach to the EXISTING thread, not fork a new one.
expect(h.threads.size).toBe(1);
// And we must never produce a generic-titled "Subagent" duplicate.
expect([...h.threads.values()].some((t) => t.title === 'Subagent')).toBe(false);
});
// P1: a tools_calling batch reprocessed on a cold replica (BatchIngester
// retry, or a turn split across a cold boundary so the cumulative array is
// re-seen) must NOT mint a second tool message for an inner tool the run
// already persisted. Rehydration restores `lifetimeToolCallIds`, and the
// reducer de-dupes against it.
it('does NOT re-create an already-persisted inner tool row after a cold replica', async () => {
const h = createHarness({
assistantMessageId: 'asst-1',
operationId: 'op-1',
topicId: 'topic-1',
});
const PARENT = 'tc-spawn-1';
// Batch 1: turn sub-msg-1 persists inner-1.
await h.handler.ingest({
assistantMessageId: 'asst-1',
events: [
buildEvent('stream_chunk', 0, {
chunkType: 'tools_calling',
subagent: {
parentToolCallId: PARENT,
spawnMetadata: { prompt: 'go', subagentType: 'Explore' },
subagentMessageId: 'sub-msg-1',
},
toolsCalling: [innerTool('inner-1')],
}),
],
operationId: 'op-1',
topicId: 'topic-1',
});
__resetOperationStatesForTesting(); // cold replica
// Batch 2 (replica B): the SAME turn's cumulative array is re-seen (inner-1
// again) plus a new inner-2.
await h.handler.ingest({
assistantMessageId: 'asst-1',
events: [
buildEvent('stream_chunk', 1, {
chunkType: 'tools_calling',
subagent: { parentToolCallId: PARENT, subagentMessageId: 'sub-msg-1' },
toolsCalling: [innerTool('inner-1'), innerTool('inner-2')],
}),
],
operationId: 'op-1',
topicId: 'topic-1',
});
const toolRows = (callId: string) =>
[...h.messages.values()].filter((m) => m.role === 'tool' && m.tool_call_id === callId);
// inner-1 persisted exactly once (no duplicate row), inner-2 once.
expect(toolRows('inner-1')).toHaveLength(1);
expect(toolRows('inner-2')).toHaveLength(1);
expect(h.threads.size).toBe(1);
});
// P2: a stale `Processing` isolation thread left by a PRIOR operation on the
// same topic must not be rehydrated into — or finalized by — the current
// operation. The rehydration is scoped by `metadata.operationId`.
it('ignores a stale Processing thread from a different operation on the same topic', async () => {
const h = createHarness({
assistantMessageId: 'asst-1',
operationId: 'op-2',
topicId: 'topic-1',
});
// Seed a thread (+ its in-thread assistant) left Processing by op-1.
h.threads.set('thd-stale', {
id: 'thd-stale',
metadata: { operationId: 'op-1', sourceToolCallId: 'tc-old' },
sourceMessageId: 'asst-old',
status: 'processing',
title: 'Old Subagent',
topicId: 'topic-1',
type: 'isolation',
});
h.messages.set('stale-asst', {
agentId: null,
content: '',
id: 'stale-asst',
parentId: 'asst-old',
role: 'assistant',
threadId: 'thd-stale',
topicId: 'topic-1',
} as any);
// op-2 runs and terminates. The terminal orphan-drain would finalize every
// run in the reducer state — so if the stale thread were merged in, it would
// be flipped to Active here.
await h.handler.ingest({
assistantMessageId: 'asst-1',
events: [
buildEvent('stream_chunk', 0, { chunkType: 'text', content: 'working' }),
buildEvent('agent_runtime_end', 1, {}),
],
operationId: 'op-2',
topicId: 'topic-1',
});
// The unrelated thread is untouched: still Processing, never updated.
expect(h.threads.get('thd-stale')!.status).toBe('processing');
expect(h.threadModel.update).not.toHaveBeenCalledWith('thd-stale', expect.anything());
});
});