mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-13 19:20:04 +00:00
🐛 fix(hetero): chain step boundary off tool row when tools[] backfill is unseen
On a warm replica that did not drain the prior step's `tools_calling` (or before the assistant's `tools[]` JSONB has its `result_msg_id` backfilled), the in-memory tool state is empty, so the step boundary falls back to the previous assistant and forks the wire into two disconnected bubbles. Fall back to the authoritative anchor — the `role:'tool'` rows themselves, committed in Phase 2 independently of the JSONB mirror's Phase-3 backfill — via a new `MessageModel.getLastChildToolMessageId`. Excludes subagent tool rows (threadId set) so they never anchor the main-agent wire. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
@@ -795,9 +795,24 @@ export class HeterogeneousPersistenceHandler {
|
||||
await this.deps.messageModel.update(state.currentAssistantMessageId, prevUpdate);
|
||||
}
|
||||
|
||||
const lastToolMsgId = [...state.toolState.payloads]
|
||||
let lastToolMsgId = [...state.toolState.payloads]
|
||||
.reverse()
|
||||
.find((p) => !!p.result_msg_id)?.result_msg_id;
|
||||
|
||||
// In-memory tool state can be empty or unresolved here: a different
|
||||
// replica drained this step's `tools_calling`, or the batch-start refresh
|
||||
// read the assistant's `tools[]` JSONB before its Phase-3 `result_msg_id`
|
||||
// backfill was visible. Chaining off `currentAssistantMessageId` in that
|
||||
// case forks the wire (`asst1 → asst2` with the tools as a dead branch),
|
||||
// which renders as two disconnected bubbles. Fall back to the
|
||||
// authoritative source — the `role:'tool'` rows themselves (Phase 2),
|
||||
// which are committed earlier and independently of the JSONB mirror.
|
||||
if (!lastToolMsgId) {
|
||||
lastToolMsgId = await this.deps.messageModel.getLastChildToolMessageId(
|
||||
state.currentAssistantMessageId,
|
||||
);
|
||||
}
|
||||
|
||||
const stepParentId = lastToolMsgId || state.currentAssistantMessageId;
|
||||
|
||||
const newMsg = await this.deps.messageModel.create({
|
||||
|
||||
+6
@@ -122,6 +122,12 @@ const createHarness = (
|
||||
},
|
||||
),
|
||||
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
|
||||
getLastChildToolMessageId: vi.fn(async (assistantMessageId: string) => {
|
||||
const match = [...messages.values()].findLast(
|
||||
(m) => m.role === 'tool' && m.parentId === assistantMessageId && !m.threadId,
|
||||
);
|
||||
return match?.id;
|
||||
}),
|
||||
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => []),
|
||||
};
|
||||
|
||||
|
||||
+126
@@ -117,6 +117,14 @@ const createHarness = (params: {
|
||||
},
|
||||
),
|
||||
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
|
||||
getLastChildToolMessageId: vi.fn(async (assistantMessageId: string) => {
|
||||
// Mirror the SQL: last-created main-agent (threadId null) tool row whose
|
||||
// parentId is the assistant. Map insertion order == creation order.
|
||||
const match = [...messages.values()].findLast(
|
||||
(m) => m.role === 'tool' && m.parentId === assistantMessageId && !m.threadId,
|
||||
);
|
||||
return match?.id;
|
||||
}),
|
||||
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => []),
|
||||
};
|
||||
|
||||
@@ -695,6 +703,124 @@ describe('HeterogeneousPersistenceHandler', () => {
|
||||
expect(step2Asst!.provider).toBe('claude-code');
|
||||
});
|
||||
|
||||
it('chains off the tool ROW when the refresh misses the tools[] result_msg_id backfill', async () => {
|
||||
// Residual race the batch-start refresh does NOT cover: the other replica
|
||||
// created the tool row (Phase 2) but its assistant.tools[] result_msg_id
|
||||
// backfill (Phase 3) is not yet visible. The refresh keys off
|
||||
// result_msg_id, so it sees 0 resolved tools → does NOT adopt → toolState
|
||||
// stays empty → pre-fix the step boundary falls back to the previous
|
||||
// assistant and forks the wire. The fix queries the role:'tool' row
|
||||
// itself (committed in Phase 2, independent of the JSONB mirror).
|
||||
const h = createHarness({
|
||||
assistantMessageId: 'asst-init',
|
||||
operationId: 'op-1',
|
||||
topicId: 'topic-1',
|
||||
});
|
||||
|
||||
const metaState: FakeTopicMetadata = {
|
||||
runningOperation: { assistantMessageId: 'asst-init', operationId: 'op-1' },
|
||||
};
|
||||
h.topicModel.findById.mockImplementation(async (id: string) => {
|
||||
if (id !== 'topic-1') return null;
|
||||
return { agentId: null, id, metadata: { ...metaState } };
|
||||
});
|
||||
h.topicModel.updateMetadata.mockImplementation(async (_id: string, patch: any) => {
|
||||
Object.assign(metaState, patch);
|
||||
});
|
||||
|
||||
// ── Batch 1: this replica drains step 1's stream_start (no tools yet) ──
|
||||
await h.handler.ingest({
|
||||
events: [buildEvent('stream_start', 1, { newStep: true })],
|
||||
operationId: 'op-1',
|
||||
topicId: 'topic-1',
|
||||
});
|
||||
const step1Asst = [...h.messages.values()].find(
|
||||
(m) => m.role === 'assistant' && m.id !== 'asst-init',
|
||||
)!;
|
||||
|
||||
// ── Other replica created the tool ROW but NOT the tools[] backfill ──
|
||||
// Note: step1Asst.tools[] is intentionally left WITHOUT result_msg_id,
|
||||
// so the ingest refresh cannot recover the anchor.
|
||||
h.messages.set('tool-row-only', {
|
||||
agentId: null,
|
||||
content: 'result body',
|
||||
id: 'tool-row-only',
|
||||
parentId: step1Asst.id,
|
||||
role: 'tool',
|
||||
threadId: null,
|
||||
tool_call_id: 'tc-1',
|
||||
topicId: 'topic-1',
|
||||
});
|
||||
|
||||
// ── Batch 2: step 2 stream_start lands on THIS (empty-state) replica ──
|
||||
await h.handler.ingest({
|
||||
events: [buildEvent('stream_start', 2, { newStep: true })],
|
||||
operationId: 'op-1',
|
||||
topicId: 'topic-1',
|
||||
});
|
||||
|
||||
const step2Asst = [...h.messages.values()].find(
|
||||
(m) => m.role === 'assistant' && m.id !== 'asst-init' && m.id !== step1Asst.id,
|
||||
);
|
||||
expect(step2Asst).toBeDefined();
|
||||
// Chains off the tool row, NOT the previous assistant → wire stays linear.
|
||||
expect(step2Asst!.parentId).toBe('tool-row-only');
|
||||
});
|
||||
|
||||
it('ignores subagent tool rows (threadId set) when resolving the step anchor', async () => {
|
||||
// A subagent tool row lives on its own thread and must never anchor the
|
||||
// main-agent wire. If the only `role:'tool'` child carries a threadId,
|
||||
// the fallback must skip it and chain off the previous assistant.
|
||||
const h = createHarness({
|
||||
assistantMessageId: 'asst-init',
|
||||
operationId: 'op-1',
|
||||
topicId: 'topic-1',
|
||||
});
|
||||
|
||||
const metaState: FakeTopicMetadata = {
|
||||
runningOperation: { assistantMessageId: 'asst-init', operationId: 'op-1' },
|
||||
};
|
||||
h.topicModel.findById.mockImplementation(async (id: string) => {
|
||||
if (id !== 'topic-1') return null;
|
||||
return { agentId: null, id, metadata: { ...metaState } };
|
||||
});
|
||||
h.topicModel.updateMetadata.mockImplementation(async (_id: string, patch: any) => {
|
||||
Object.assign(metaState, patch);
|
||||
});
|
||||
|
||||
await h.handler.ingest({
|
||||
events: [buildEvent('stream_start', 1, { newStep: true })],
|
||||
operationId: 'op-1',
|
||||
topicId: 'topic-1',
|
||||
});
|
||||
const step1Asst = [...h.messages.values()].find(
|
||||
(m) => m.role === 'assistant' && m.id !== 'asst-init',
|
||||
)!;
|
||||
|
||||
h.messages.set('subagent-tool', {
|
||||
agentId: null,
|
||||
content: 'sub result',
|
||||
id: 'subagent-tool',
|
||||
parentId: step1Asst.id,
|
||||
role: 'tool',
|
||||
threadId: 'thread-sub',
|
||||
tool_call_id: 'tc-sub',
|
||||
topicId: 'topic-1',
|
||||
});
|
||||
|
||||
await h.handler.ingest({
|
||||
events: [buildEvent('stream_start', 2, { newStep: true })],
|
||||
operationId: 'op-1',
|
||||
topicId: 'topic-1',
|
||||
});
|
||||
|
||||
const step2Asst = [...h.messages.values()].find(
|
||||
(m) => m.role === 'assistant' && m.id !== 'asst-init' && m.id !== step1Asst.id,
|
||||
);
|
||||
expect(step2Asst).toBeDefined();
|
||||
expect(step2Asst!.parentId).toBe(step1Asst.id);
|
||||
});
|
||||
|
||||
it('handleTurnMetadata persists model/provider to DB so other replicas can recover lastModel/lastProvider', async () => {
|
||||
const h = createHarness({
|
||||
assistantMessageId: 'asst-init',
|
||||
|
||||
@@ -2392,6 +2392,41 @@ export class MessageModel {
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* Id of the most recently-created main-agent `role:'tool'` message under an
|
||||
* assistant message, or `undefined` when the assistant produced no tools.
|
||||
*
|
||||
* Heterogeneous step boundaries chain the next assistant off the previous
|
||||
* step's final tool message (the wire is `asst → tool → … → asst → tool`).
|
||||
* The persistence handler normally derives that anchor from its in-memory
|
||||
* tool state, but on a warm replica that did NOT drain the prior step's
|
||||
* `tools_calling` (or before the assistant's `tools[]` JSONB has its
|
||||
* `result_msg_id` backfilled) that state is empty — so it falls back here.
|
||||
*
|
||||
* The `role:'tool'` rows are the authoritative anchor: they are created
|
||||
* (Phase 2) with `parentId` = their step's assistant, earlier than and
|
||||
* independent of the JSONB mirror's `result_msg_id` backfill (Phase 3).
|
||||
* `threadId IS NULL` excludes subagent tool rows, which live on their own
|
||||
* thread and must not anchor the main-agent wire.
|
||||
*/
|
||||
getLastChildToolMessageId = async (assistantMessageId: string): Promise<string | undefined> => {
|
||||
const [row] = await this.db
|
||||
.select({ id: messages.id })
|
||||
.from(messages)
|
||||
.where(
|
||||
and(
|
||||
eq(messages.parentId, assistantMessageId),
|
||||
eq(messages.role, 'tool'),
|
||||
isNull(messages.threadId),
|
||||
this.ownership(),
|
||||
),
|
||||
)
|
||||
.orderBy(desc(messages.createdAt))
|
||||
.limit(1);
|
||||
|
||||
return row?.id;
|
||||
};
|
||||
|
||||
updateTranslate = async (id: string, translate: Partial<ChatTranslate>) => {
|
||||
const result = await this.db.query.messageTranslates.findFirst({
|
||||
where: and(eq(messageTranslates.id, id), this.translatesOwnership()),
|
||||
|
||||
Reference in New Issue
Block a user