🐛 fix(hetero): chain step boundary off tool row when tools[] backfill is unseen (#15607)

* 🐛 fix(hetero): chain step boundary off tool row when tools[] backfill is unseen

On a warm replica that did not drain the prior step's `tools_calling` (or
before the assistant's `tools[]` JSONB has its `result_msg_id` backfilled),
the in-memory tool state is empty, so the step boundary falls back to the
previous assistant and forks the wire into two disconnected bubbles.

Fall back to the authoritative anchor — the `role:'tool'` rows themselves,
committed in Phase 2 independently of the JSONB mirror's Phase-3 backfill —
via a new `MessageModel.getLastChildToolMessageId`. Excludes subagent tool
rows (threadId set) so they never anchor the main-agent wire.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* 🐛 fix(hetero): write per-device cwd when adding topic from project group

The sidebar "+ new topic in this directory" action wrote the working
directory to the legacy per-agent slot (localAgentWorkingDirectoryMap),
which sits below agencyConfig.workingDirByDevice in the resolution
precedence. Once a directory had been picked via the ControlBar (which
writes workingDirByDevice), the "+" action was silently shadowed and the
new topic was created with the previously-picked directory instead.

Route the action through useCommitWorkingDirectory.commitAgentDefault so
it writes the same high-precedence per-device slot the picker uses,
keeping the two write paths from drifting again.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

*  test(hetero): cover MessageModel.getLastChildToolMessageId

The fallback anchor query added in 599eea5bda had no DB-level test — the
persistence handler mocks it, so its real SQL was never exercised and
patch coverage dropped. Add direct PGlite tests covering all branches:
latest-tool ordering, no-tool → undefined (ignoring non-tool children),
subagent thread exclusion (threadId IS NULL), and ownership isolation.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-06-10 00:42:37 +08:00
committed by GitHub
parent 4b5e001934
commit c329696dc2
7 changed files with 353 additions and 8 deletions
@@ -795,9 +795,24 @@ export class HeterogeneousPersistenceHandler {
await this.deps.messageModel.update(state.currentAssistantMessageId, prevUpdate);
}
const lastToolMsgId = [...state.toolState.payloads]
let lastToolMsgId = [...state.toolState.payloads]
.reverse()
.find((p) => !!p.result_msg_id)?.result_msg_id;
// In-memory tool state can be empty or unresolved here: a different
// replica drained this step's `tools_calling`, or the batch-start refresh
// read the assistant's `tools[]` JSONB before its Phase-3 `result_msg_id`
// backfill was visible. Chaining off `currentAssistantMessageId` in that
// case forks the wire (`asst1 → asst2` with the tools as a dead branch),
// which renders as two disconnected bubbles. Fall back to the
// authoritative source — the `role:'tool'` rows themselves (Phase 2),
// which are committed earlier and independently of the JSONB mirror.
if (!lastToolMsgId) {
lastToolMsgId = await this.deps.messageModel.getLastChildToolMessageId(
state.currentAssistantMessageId,
);
}
const stepParentId = lastToolMsgId || state.currentAssistantMessageId;
const newMsg = await this.deps.messageModel.create({
@@ -122,6 +122,12 @@ const createHarness = (
},
),
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
getLastChildToolMessageId: vi.fn(async (assistantMessageId: string) => {
const match = [...messages.values()].findLast(
(m) => m.role === 'tool' && m.parentId === assistantMessageId && !m.threadId,
);
return match?.id;
}),
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => []),
};
@@ -117,6 +117,14 @@ const createHarness = (params: {
},
),
findById: vi.fn(async (id: string) => messages.get(id) ?? null),
getLastChildToolMessageId: vi.fn(async (assistantMessageId: string) => {
// Mirror the SQL: last-created main-agent (threadId null) tool row whose
// parentId is the assistant. Map insertion order == creation order.
const match = [...messages.values()].findLast(
(m) => m.role === 'tool' && m.parentId === assistantMessageId && !m.threadId,
);
return match?.id;
}),
listMessagePluginsByTopic: vi.fn(async (_topicId: string) => []),
};
@@ -695,6 +703,124 @@ describe('HeterogeneousPersistenceHandler', () => {
expect(step2Asst!.provider).toBe('claude-code');
});
it('chains off the tool ROW when the refresh misses the tools[] result_msg_id backfill', async () => {
// Residual race the batch-start refresh does NOT cover: the other replica
// created the tool row (Phase 2) but its assistant.tools[] result_msg_id
// backfill (Phase 3) is not yet visible. The refresh keys off
// result_msg_id, so it sees 0 resolved tools → does NOT adopt → toolState
// stays empty → pre-fix the step boundary falls back to the previous
// assistant and forks the wire. The fix queries the role:'tool' row
// itself (committed in Phase 2, independent of the JSONB mirror).
const h = createHarness({
assistantMessageId: 'asst-init',
operationId: 'op-1',
topicId: 'topic-1',
});
const metaState: FakeTopicMetadata = {
runningOperation: { assistantMessageId: 'asst-init', operationId: 'op-1' },
};
h.topicModel.findById.mockImplementation(async (id: string) => {
if (id !== 'topic-1') return null;
return { agentId: null, id, metadata: { ...metaState } };
});
h.topicModel.updateMetadata.mockImplementation(async (_id: string, patch: any) => {
Object.assign(metaState, patch);
});
// ── Batch 1: this replica drains step 1's stream_start (no tools yet) ──
await h.handler.ingest({
events: [buildEvent('stream_start', 1, { newStep: true })],
operationId: 'op-1',
topicId: 'topic-1',
});
const step1Asst = [...h.messages.values()].find(
(m) => m.role === 'assistant' && m.id !== 'asst-init',
)!;
// ── Other replica created the tool ROW but NOT the tools[] backfill ──
// Note: step1Asst.tools[] is intentionally left WITHOUT result_msg_id,
// so the ingest refresh cannot recover the anchor.
h.messages.set('tool-row-only', {
agentId: null,
content: 'result body',
id: 'tool-row-only',
parentId: step1Asst.id,
role: 'tool',
threadId: null,
tool_call_id: 'tc-1',
topicId: 'topic-1',
});
// ── Batch 2: step 2 stream_start lands on THIS (empty-state) replica ──
await h.handler.ingest({
events: [buildEvent('stream_start', 2, { newStep: true })],
operationId: 'op-1',
topicId: 'topic-1',
});
const step2Asst = [...h.messages.values()].find(
(m) => m.role === 'assistant' && m.id !== 'asst-init' && m.id !== step1Asst.id,
);
expect(step2Asst).toBeDefined();
// Chains off the tool row, NOT the previous assistant → wire stays linear.
expect(step2Asst!.parentId).toBe('tool-row-only');
});
it('ignores subagent tool rows (threadId set) when resolving the step anchor', async () => {
// A subagent tool row lives on its own thread and must never anchor the
// main-agent wire. If the only `role:'tool'` child carries a threadId,
// the fallback must skip it and chain off the previous assistant.
const h = createHarness({
assistantMessageId: 'asst-init',
operationId: 'op-1',
topicId: 'topic-1',
});
const metaState: FakeTopicMetadata = {
runningOperation: { assistantMessageId: 'asst-init', operationId: 'op-1' },
};
h.topicModel.findById.mockImplementation(async (id: string) => {
if (id !== 'topic-1') return null;
return { agentId: null, id, metadata: { ...metaState } };
});
h.topicModel.updateMetadata.mockImplementation(async (_id: string, patch: any) => {
Object.assign(metaState, patch);
});
await h.handler.ingest({
events: [buildEvent('stream_start', 1, { newStep: true })],
operationId: 'op-1',
topicId: 'topic-1',
});
const step1Asst = [...h.messages.values()].find(
(m) => m.role === 'assistant' && m.id !== 'asst-init',
)!;
h.messages.set('subagent-tool', {
agentId: null,
content: 'sub result',
id: 'subagent-tool',
parentId: step1Asst.id,
role: 'tool',
threadId: 'thread-sub',
tool_call_id: 'tc-sub',
topicId: 'topic-1',
});
await h.handler.ingest({
events: [buildEvent('stream_start', 2, { newStep: true })],
operationId: 'op-1',
topicId: 'topic-1',
});
const step2Asst = [...h.messages.values()].find(
(m) => m.role === 'assistant' && m.id !== 'asst-init' && m.id !== step1Asst.id,
);
expect(step2Asst).toBeDefined();
expect(step2Asst!.parentId).toBe(step1Asst.id);
});
it('handleTurnMetadata persists model/provider to DB so other replicas can recover lastModel/lastProvider', async () => {
const h = createHarness({
assistantMessageId: 'asst-init',
@@ -2956,4 +2956,134 @@ describe('MessageModel Query Tests', () => {
});
});
});
describe('getLastChildToolMessageId', () => {
it('should return the most recently-created tool child of an assistant message', async () => {
await serverDB.insert(messages).values([
{
id: 'asst-1',
userId,
role: 'assistant',
content: 'assistant step',
createdAt: new Date('2023-01-01T00:00:00'),
},
{
id: 'tool-early',
userId,
role: 'tool',
parentId: 'asst-1',
content: 'first tool',
createdAt: new Date('2023-01-01T00:00:01'),
},
{
id: 'tool-late',
userId,
role: 'tool',
parentId: 'asst-1',
content: 'second tool',
createdAt: new Date('2023-01-01T00:00:02'),
},
]);
const result = await messageModel.getLastChildToolMessageId('asst-1');
expect(result).toBe('tool-late');
});
it('should return undefined when the assistant produced no tool children', async () => {
await serverDB.insert(messages).values([
{
id: 'asst-no-tools',
userId,
role: 'assistant',
content: 'no tools here',
createdAt: new Date('2023-01-01'),
},
{
// a non-tool child must be ignored
id: 'child-assistant',
userId,
role: 'assistant',
parentId: 'asst-no-tools',
content: 'follow-up assistant',
createdAt: new Date('2023-01-02'),
},
]);
const result = await messageModel.getLastChildToolMessageId('asst-no-tools');
expect(result).toBeUndefined();
});
it('should exclude subagent tool rows that live on their own thread', async () => {
await serverDB.transaction(async (trx) => {
await trx.insert(sessions).values([{ id: 'session1', userId }]);
await trx.insert(topics).values([{ id: 'topic1', sessionId: 'session1', userId }]);
await trx.insert(threads).values([
{
id: 'subagent-thread',
userId,
topicId: 'topic1',
sourceMessageId: 'asst-main',
type: 'standalone',
},
]);
await trx.insert(messages).values([
{
id: 'asst-main',
userId,
role: 'assistant',
content: 'main agent step',
createdAt: new Date('2023-01-01T00:00:00'),
},
{
id: 'tool-main',
userId,
role: 'tool',
parentId: 'asst-main',
threadId: null,
content: 'main-agent tool',
createdAt: new Date('2023-01-01T00:00:01'),
},
{
// newer, but on a subagent thread — must not anchor the main wire
id: 'tool-subagent',
userId,
role: 'tool',
parentId: 'asst-main',
threadId: 'subagent-thread',
content: 'subagent tool',
createdAt: new Date('2023-01-01T00:00:02'),
},
]);
});
const result = await messageModel.getLastChildToolMessageId('asst-main');
expect(result).toBe('tool-main');
});
it('should not return tool rows belonging to other users', async () => {
const otherModel = new MessageModel(serverDB, otherUserId);
await serverDB.insert(messages).values([
{
id: 'asst-shared-id',
userId: otherUserId,
role: 'assistant',
content: 'other user assistant',
createdAt: new Date('2023-01-01T00:00:00'),
},
{
id: 'tool-other-user',
userId: otherUserId,
role: 'tool',
parentId: 'asst-shared-id',
content: 'other user tool',
createdAt: new Date('2023-01-01T00:00:01'),
},
]);
// current user must not see another user's tool rows
expect(await messageModel.getLastChildToolMessageId('asst-shared-id')).toBeUndefined();
// the owning user does
expect(await otherModel.getLastChildToolMessageId('asst-shared-id')).toBe('tool-other-user');
});
});
});
+35
View File
@@ -2392,6 +2392,41 @@ export class MessageModel {
}
};
/**
* Id of the most recently-created main-agent `role:'tool'` message under an
* assistant message, or `undefined` when the assistant produced no tools.
*
* Heterogeneous step boundaries chain the next assistant off the previous
* step's final tool message (the wire is `asst → tool → … → asst → tool`).
* The persistence handler normally derives that anchor from its in-memory
* tool state, but on a warm replica that did NOT drain the prior step's
* `tools_calling` (or before the assistant's `tools[]` JSONB has its
* `result_msg_id` backfilled) that state is empty — so it falls back here.
*
* The `role:'tool'` rows are the authoritative anchor: they are created
* (Phase 2) with `parentId` = their step's assistant, earlier than and
* independent of the JSONB mirror's `result_msg_id` backfill (Phase 3).
* `threadId IS NULL` excludes subagent tool rows, which live on their own
* thread and must not anchor the main-agent wire.
*/
getLastChildToolMessageId = async (assistantMessageId: string): Promise<string | undefined> => {
const [row] = await this.db
.select({ id: messages.id })
.from(messages)
.where(
and(
eq(messages.parentId, assistantMessageId),
eq(messages.role, 'tool'),
isNull(messages.threadId),
this.ownership(),
),
)
.orderBy(desc(messages.createdAt))
.limit(1);
return row?.id;
};
updateTranslate = async (id: string, translate: Partial<ChatTranslate>) => {
const result = await this.db.query.messageTranslates.findFirst({
where: and(eq(messageTranslates.id, id), this.translatesOwnership()),
@@ -110,6 +110,35 @@ export const useCommitWorkingDirectory = (agentId: string) => {
[activeTopic, t, writeCwd],
);
/**
* Set the agent's per-device default cwd, ignoring any active topic. Used by
* the sidebar "new topic in this directory" (+) action, which always targets
* a fresh topic regardless of what conversation is currently open — so it must
* write the same high-precedence slot the picker uses for new topics
* (`workingDirByDevice`), not the legacy per-agent fallback. The new topic
* bakes this value into its own `metadata.workingDirectory` on first send.
*/
const commitAgentDefault = useCallback(
async (newPath: string) => {
const path = newPath.trim();
if (!path) return;
if (targetDeviceId) {
const prev = agencyConfig?.workingDirByDevice ?? {};
await updateAgentConfigById(agentId, {
agencyConfig: {
...agencyConfig,
workingDirByDevice: { ...prev, [targetDeviceId]: path },
},
});
} else {
// No resolvable device (e.g. gateway id unavailable) — fall back to the
// legacy per-agent slot so the action still takes effect.
await updateAgentRuntimeEnvConfigById(agentId, { workingDirectory: path });
}
},
[agentId, agencyConfig, targetDeviceId, updateAgentConfigById, updateAgentRuntimeEnvConfigById],
);
/** Clear the current selection (falls back to the next precedence level). */
const clear = useCallback(async () => {
const run = () => writeCwd(undefined);
@@ -129,5 +158,5 @@ export const useCommitWorkingDirectory = (agentId: string) => {
await run();
}, [activeTopic, t, writeCwd]);
return { clear, commit };
return { clear, commit, commitAgentDefault };
};
@@ -5,6 +5,7 @@ import { memo, useCallback, useMemo } from 'react';
import { useTranslation } from 'react-i18next';
import { isDesktop } from '@/const/version';
import { useCommitWorkingDirectory } from '@/features/ChatInput/ControlBar/useCommitWorkingDirectory';
import { useAgentStore } from '@/store/agent';
import { useChatStore } from '@/store/chat';
@@ -22,14 +23,17 @@ const GroupItem = memo<GroupItemComponentProps>(({ group, activeTopicId, activeT
[id],
);
const agentId = useAgentStore((s) => s.activeAgentId);
const { commitAgentDefault } = useCommitWorkingDirectory(agentId ?? '');
const handleAddTopic = useCallback(async () => {
if (!workingDirectory) return;
const agentId = useAgentStore.getState().activeAgentId;
if (agentId) {
await useAgentStore.getState().updateAgentRuntimeEnvConfigById(agentId, { workingDirectory });
}
if (!workingDirectory || !agentId) return;
// Write the agent's per-device default so the new topic inherits this
// directory at creation time — the same high-precedence slot the picker
// uses, not the legacy per-agent fallback that gets shadowed by it.
await commitAgentDefault(workingDirectory);
useChatStore.getState().switchTopic(null, { skipRefreshMessage: true });
}, [workingDirectory]);
}, [workingDirectory, agentId, commitAgentDefault]);
const canAddTopic = isDesktop && !!workingDirectory;