🐛 fix(hetero): chain step boundary off tool row when tools[] backfill is unseen

On a warm replica that did not drain the prior step's `tools_calling` (or
before the assistant's `tools[]` JSONB has its `result_msg_id` backfilled),
the in-memory tool state is empty, so the step boundary falls back to the
previous assistant and forks the wire into two disconnected bubbles.

Fall back to the authoritative anchor — the `role:'tool'` rows themselves,
committed in Phase 2 independently of the JSONB mirror's Phase-3 backfill —
via a new `MessageModel.getLastChildToolMessageId`. The lookup excludes
subagent tool rows (`threadId` set) so they never anchor the main-agent wire.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-06-09 23:19:04 +08:00
parent af3f0ea171
commit 599eea5bda
4 changed files with 183 additions and 1 deletions
@@ -795,9 +795,24 @@ export class HeterogeneousPersistenceHandler {
await this.deps.messageModel.update(state.currentAssistantMessageId, prevUpdate);
}
const lastToolMsgId = [...state.toolState.payloads]
let lastToolMsgId = [...state.toolState.payloads]
.reverse()
.find((p) => !!p.result_msg_id)?.result_msg_id;
// In-memory tool state can be empty or unresolved here: a different
// replica drained this step's `tools_calling`, or the batch-start refresh
// read the assistant's `tools[]` JSONB before its Phase-3 `result_msg_id`
// backfill was visible. Chaining off `currentAssistantMessageId` in that
// case forks the wire (`asst1 → asst2` with the tools as a dead branch),
// which renders as two disconnected bubbles. Fall back to the
// authoritative source — the `role:'tool'` rows themselves (Phase 2),
// which are committed earlier and independently of the JSONB mirror.
if (!lastToolMsgId) {
lastToolMsgId = await this.deps.messageModel.getLastChildToolMessageId(
state.currentAssistantMessageId,
);
}
const stepParentId = lastToolMsgId || state.currentAssistantMessageId;
const newMsg = await this.deps.messageModel.create({
@@ -122,6 +122,12 @@ const createHarness = (
},
),
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
getLastChildToolMessageId: vi.fn(async (assistantMessageId: string) => {
  // Test double for the model lookup: scan the stored messages from the end
  // (iteration order of `messages` is treated as creation order here) and
  // pick the first `role:'tool'` child of the assistant that has no threadId.
  const newestFirst = [...messages.values()].reverse();
  const hit = newestFirst.find((candidate) => {
    if (candidate.role !== 'tool') return false;
    if (candidate.parentId !== assistantMessageId) return false;
    return !candidate.threadId;
  });
  return hit?.id;
}),
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => []),
};
@@ -117,6 +117,14 @@ const createHarness = (params: {
},
),
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
getLastChildToolMessageId: vi.fn(async (assistantMessageId: string) => {
  // In-memory counterpart of the SQL lookup: the last-created main-agent tool
  // row (no threadId) whose parentId is the assistant. The Map iterates in
  // insertion order, which stands in for creation order, so walking the
  // values newest-first and returning on the first match yields the last one.
  for (const row of [...messages.values()].reverse()) {
    const isToolChild = row.role === 'tool' && row.parentId === assistantMessageId;
    if (isToolChild && !row.threadId) return row.id;
  }
  return undefined;
}),
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => []),
};
@@ -695,6 +703,124 @@ describe('HeterogeneousPersistenceHandler', () => {
expect(step2Asst!.provider).toBe('claude-code');
});
it('chains off the tool ROW when the refresh misses the tools[] result_msg_id backfill', async () => {
  // Race left open by the batch-start refresh: another replica has already
  // written the tool row (Phase 2), but the matching result_msg_id backfill
  // on assistant.tools[] (Phase 3) is not visible yet. Because the refresh
  // keys off result_msg_id it resolves zero tools, adopts nothing, and
  // toolState stays empty. Before the fix the step boundary then chained off
  // the previous assistant and forked the wire; with the fix the handler asks
  // for the role:'tool' row directly, which does not depend on the JSONB mirror.
  const harness = createHarness({
    assistantMessageId: 'asst-init',
    operationId: 'op-1',
    topicId: 'topic-1',
  });

  // Mutable topic metadata shared by the two topicModel mocks below.
  const topicMeta: FakeTopicMetadata = {
    runningOperation: { assistantMessageId: 'asst-init', operationId: 'op-1' },
  };
  harness.topicModel.findById.mockImplementation(async (id: string) =>
    id === 'topic-1' ? { agentId: null, id, metadata: { ...topicMeta } } : null,
  );
  harness.topicModel.updateMetadata.mockImplementation(async (_id: string, patch: any) => {
    Object.assign(topicMeta, patch);
  });

  // First assistant message in the store whose id is not in `excludedIds`.
  const findAssistantNotIn = (excludedIds: string[]) =>
    [...harness.messages.values()].find(
      (msg) => msg.role === 'assistant' && !excludedIds.includes(msg.id),
    );

  // Batch 1 — this replica handles step 1's stream_start; no tools exist yet.
  await harness.handler.ingest({
    events: [buildEvent('stream_start', 1, { newStep: true })],
    operationId: 'op-1',
    topicId: 'topic-1',
  });
  const stepOneAssistant = findAssistantNotIn(['asst-init'])!;

  // Simulate the other replica: the tool ROW exists, but stepOneAssistant's
  // tools[] deliberately carries no result_msg_id, so the ingest refresh has
  // nothing to recover the anchor from.
  harness.messages.set('tool-row-only', {
    agentId: null,
    content: 'result body',
    id: 'tool-row-only',
    parentId: stepOneAssistant.id,
    role: 'tool',
    threadId: null,
    tool_call_id: 'tc-1',
    topicId: 'topic-1',
  });

  // Batch 2 — step 2's stream_start arrives on THIS replica, whose tool state is empty.
  await harness.handler.ingest({
    events: [buildEvent('stream_start', 2, { newStep: true })],
    operationId: 'op-1',
    topicId: 'topic-1',
  });
  const stepTwoAssistant = findAssistantNotIn(['asst-init', stepOneAssistant.id]);

  expect(stepTwoAssistant).toBeDefined();
  // Parent must be the tool row rather than the previous assistant, keeping the wire linear.
  expect(stepTwoAssistant!.parentId).toBe('tool-row-only');
});
it('ignores subagent tool rows (threadId set) when resolving the step anchor', async () => {
  // Subagent tool rows belong to their own thread and may never become the
  // anchor of the main-agent wire. When the sole `role:'tool'` child has a
  // threadId, the fallback has to pass over it and the next assistant must
  // chain off the previous assistant instead.
  const harness = createHarness({
    assistantMessageId: 'asst-init',
    operationId: 'op-1',
    topicId: 'topic-1',
  });

  // Mutable topic metadata shared by the two topicModel mocks below.
  const topicMeta: FakeTopicMetadata = {
    runningOperation: { assistantMessageId: 'asst-init', operationId: 'op-1' },
  };
  harness.topicModel.findById.mockImplementation(async (id: string) =>
    id === 'topic-1' ? { agentId: null, id, metadata: { ...topicMeta } } : null,
  );
  harness.topicModel.updateMetadata.mockImplementation(async (_id: string, patch: any) => {
    Object.assign(topicMeta, patch);
  });

  // First assistant message in the store whose id is not in `excludedIds`.
  const findAssistantNotIn = (excludedIds: string[]) =>
    [...harness.messages.values()].find(
      (msg) => msg.role === 'assistant' && !excludedIds.includes(msg.id),
    );

  // Step 1 boundary: creates the first step assistant.
  await harness.handler.ingest({
    events: [buildEvent('stream_start', 1, { newStep: true })],
    operationId: 'op-1',
    topicId: 'topic-1',
  });
  const stepOneAssistant = findAssistantNotIn(['asst-init'])!;

  // The only tool child of the step-1 assistant is a subagent row (threadId set).
  harness.messages.set('subagent-tool', {
    agentId: null,
    content: 'sub result',
    id: 'subagent-tool',
    parentId: stepOneAssistant.id,
    role: 'tool',
    threadId: 'thread-sub',
    tool_call_id: 'tc-sub',
    topicId: 'topic-1',
  });

  // Step 2 boundary: the anchor lookup must not pick the subagent row.
  await harness.handler.ingest({
    events: [buildEvent('stream_start', 2, { newStep: true })],
    operationId: 'op-1',
    topicId: 'topic-1',
  });
  const stepTwoAssistant = findAssistantNotIn(['asst-init', stepOneAssistant.id]);

  expect(stepTwoAssistant).toBeDefined();
  expect(stepTwoAssistant!.parentId).toBe(stepOneAssistant.id);
});
it('handleTurnMetadata persists model/provider to DB so other replicas can recover lastModel/lastProvider', async () => {
const h = createHarness({
assistantMessageId: 'asst-init',
+35
View File
@@ -2392,6 +2392,41 @@ export class MessageModel {
}
};
/**
 * Resolves the newest main-agent `role:'tool'` message parented to the given
 * assistant message.
 *
 * Why it exists: heterogeneous step boundaries attach the next assistant to
 * the final tool message of the previous step, producing a wire shaped like
 * `asst → tool → … → asst → tool`. The persistence handler usually reads
 * that anchor from its in-memory tool state. That state is empty on a warm
 * replica that did NOT drain the prior step's `tools_calling`, and also
 * while the assistant's `tools[]` JSONB has not yet had its `result_msg_id`
 * backfilled — in both cases the handler falls back to this lookup.
 *
 * Why the rows are trusted: `role:'tool'` rows are written in Phase 2 with
 * `parentId` set to their step's assistant, before and independently of the
 * Phase-3 `result_msg_id` backfill on the JSONB mirror.
 *
 * Subagent tool rows live on their own thread, so the `threadId IS NULL`
 * condition keeps them from anchoring the main-agent wire.
 *
 * @param assistantMessageId - id of the assistant message whose tool children are searched
 * @returns id of the matching row with the latest `createdAt`, or `undefined`
 *   when the assistant has no matching tool row
 */
getLastChildToolMessageId = async (assistantMessageId: string): Promise<string | undefined> => {
  // Direct tool children of the assistant, main-agent only, additionally
  // constrained by the model's `ownership()` condition.
  const mainAgentToolChild = and(
    eq(messages.parentId, assistantMessageId),
    eq(messages.role, 'tool'),
    isNull(messages.threadId),
    this.ownership(),
  );

  // Newest first, single row: only the latest tool message is needed.
  const rows = await this.db
    .select({ id: messages.id })
    .from(messages)
    .where(mainAgentToolChild)
    .orderBy(desc(messages.createdAt))
    .limit(1);

  return rows[0]?.id;
};
updateTranslate = async (id: string, translate: Partial<ChatTranslate>) => {
const result = await this.db.query.messageTranslates.findFirst({
where: and(eq(messageTranslates.id, id), this.translatesOwnership()),