diff --git a/apps/cli/src/daemon/taskRegistry.ts b/apps/cli/src/daemon/taskRegistry.ts index a3ffcc1b17..8ce430080a 100644 --- a/apps/cli/src/daemon/taskRegistry.ts +++ b/apps/cli/src/daemon/taskRegistry.ts @@ -4,7 +4,7 @@ import path from 'node:path'; export interface TaskEntry { agentId?: string; - agentType: 'hermes' | 'openclaw'; + agentType: string; operationId: string; pid: number; startedAt: string; diff --git a/apps/cli/src/device/agentRun.test.ts b/apps/cli/src/device/agentRun.test.ts index fe756b9a9d..420b2ae492 100644 --- a/apps/cli/src/device/agentRun.test.ts +++ b/apps/cli/src/device/agentRun.test.ts @@ -4,14 +4,24 @@ import { afterEach, describe, expect, it, vi } from 'vitest'; import { spawnHeteroAgentRun } from './agentRun'; -const { spawnMock } = vi.hoisted(() => ({ spawnMock: vi.fn() })); +const { removeTaskMock, saveTaskMock, spawnMock } = vi.hoisted(() => ({ + removeTaskMock: vi.fn(), + saveTaskMock: vi.fn(), + spawnMock: vi.fn(), +})); vi.mock('node:child_process', () => ({ spawn: spawnMock })); +vi.mock('../daemon/taskRegistry', () => ({ + removeTask: removeTaskMock, + saveTask: saveTaskMock, +})); -const makeFakeChild = () => { +const makeFakeChild = (pid = 1234) => { const child = new EventEmitter() as EventEmitter & { + pid: number; stdin: { end: ReturnType; write: ReturnType }; }; + child.pid = pid; child.stdin = { end: vi.fn(), write: vi.fn() }; return child; }; @@ -27,6 +37,8 @@ const baseParams = { describe('spawnHeteroAgentRun', () => { afterEach(() => { + removeTaskMock.mockReset(); + saveTaskMock.mockReset(); spawnMock.mockReset(); }); @@ -66,6 +78,7 @@ describe('spawnHeteroAgentRun', () => { ]); expect(opts).toMatchObject({ cwd: '/work/dir', + detached: process.platform !== 'win32', env: expect.objectContaining({ LOBEHUB_JWT: 'jwt-token', LOBEHUB_SERVER: 'https://app.lobehub.com', @@ -79,6 +92,15 @@ describe('spawnHeteroAgentRun', () => { await expect(ackPromise).resolves.toEqual({ status: 'accepted' }); expect(child.stdin.write).toHaveBeenCalledWith(JSON.stringify('hi')); expect(child.stdin.end).toHaveBeenCalledTimes(1); + expect(saveTaskMock).toHaveBeenCalledWith( + expect.objectContaining({ + agentType: 'claudeCode', + operationId: 'op-1', + pid: 1234, + taskId: 'op-1', + topicId: 'tpc-1', + }), + ); }); it('rejects (no stuck run) when the child errors before spawning, e.g. bad cwd', async () => { @@ -90,6 +112,24 @@ describe('spawnHeteroAgentRun', () => { await expect(ackPromise).resolves.toEqual({ reason: 'spawn ENOENT', status: 'rejected' }); expect(child.stdin.write).not.toHaveBeenCalled(); + expect(removeTaskMock).toHaveBeenCalledWith('op'); + }); + + it('removes the registered task when the child exits', async () => { + const child = makeFakeChild(4321); + spawnMock.mockReturnValue(child); + + const ackPromise = spawnHeteroAgentRun({ + ...baseParams, + operationId: 'op-exit', + topicId: 'tpc-exit', + }); + child.emit('spawn'); + await ackPromise; + + child.emit('exit', 0, null); + + expect(removeTaskMock).toHaveBeenCalledWith('op-exit'); }); it('appends --resume when resuming a session', () => { diff --git a/apps/cli/src/device/agentRun.ts b/apps/cli/src/device/agentRun.ts index 1bbe8ee3d2..3d7c38b0c8 100644 --- a/apps/cli/src/device/agentRun.ts +++ b/apps/cli/src/device/agentRun.ts @@ -5,6 +5,8 @@ import { type HeteroExecImageRef, } from '@lobechat/heterogeneous-agents/protocol'; +import { removeTask, saveTask } from '../daemon/taskRegistry'; + export interface SpawnHeteroAgentRunParams { agentType: string; cwd?: string; @@ -101,6 +103,7 @@ export function spawnHeteroAgentRun( const child = spawn(process.execPath, [...process.execArgv, ...cliArgs], { cwd: workDir, + detached: process.platform !== 'win32', env: { ...process.env, LOBEHUB_JWT: jwt, @@ -109,7 +112,27 @@ export function spawnHeteroAgentRun( stdio: ['pipe', 'inherit', 'inherit'], }); + let taskSaved = false; + const saveRunningTask = () => { + if (taskSaved || child.pid === undefined) return; + taskSaved = true; + saveTask({ + agentType, + operationId, + pid: child.pid, + startedAt: new Date().toISOString(), + taskId: operationId, + topicId, + }); + }; + + saveRunningTask(); + child.once('spawn', () => { + if (child.pid !== undefined) { + saveRunningTask(); + } + // Only safe to write stdin once the process actually started. try { child.stdin?.write(stdinPayload); @@ -123,11 +146,13 @@ export function spawnHeteroAgentRun( }); child.once('error', (err) => { + removeTask(operationId); logger?.error?.(`hetero exec spawn failed (op=${operationId}): ${err.message}`); settle({ reason: err.message, status: 'rejected' }); }); child.on('exit', (code, signal) => { + removeTask(operationId); logger?.info?.(`hetero exec exited (op=${operationId}) code=${code} signal=${signal}`); }); }); diff --git a/apps/cli/src/tools/__tests__/heteroTask.test.ts b/apps/cli/src/tools/__tests__/heteroTask.test.ts index c2e6a775b4..8732274450 100644 --- a/apps/cli/src/tools/__tests__/heteroTask.test.ts +++ b/apps/cli/src/tools/__tests__/heteroTask.test.ts @@ -1,7 +1,7 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { removeTask, saveTask } from '../../daemon/taskRegistry'; -import { runHeteroTask } from '../heteroTask'; +import { cancelHeteroTask, runHeteroTask } from '../heteroTask'; // ─── Mocks ─── @@ -249,3 +249,31 @@ describe('runHeteroTask (openclaw)', () => { killSpy.mockRestore(); }); }); + +describe('cancelHeteroTask', () => { + beforeEach(() => { + vi.clearAllMocks(); + for (const key of Object.keys(taskStore)) delete taskStore[key]; + }); + + afterEach(() => { + vi.restoreAllMocks(); + }); + + it('signals the process group for a registered codex task', async () => { + const killSpy = vi.spyOn(process, 'kill').mockImplementation(() => true); + taskStore['op-codex'] = { + agentType: 'codex', + operationId: 'op-codex', + pid: 4321, + startedAt: '2026-01-01T00:00:00.000Z', + taskId: 'op-codex', + topicId: 'topic-1', + }; + + const result = await cancelHeteroTask({ taskId: 'op-codex' }); + + expect(result).toBe(JSON.stringify({ pid: 4321, signal: 'SIGINT', taskId: 'op-codex' })); + expect(killSpy).toHaveBeenCalledWith(process.platform === 'win32' ? 4321 : -4321, 'SIGINT'); + }); +}); diff --git a/apps/cli/src/tools/heteroTask.ts b/apps/cli/src/tools/heteroTask.ts index 635c1eb36b..67ce63ed65 100644 --- a/apps/cli/src/tools/heteroTask.ts +++ b/apps/cli/src/tools/heteroTask.ts @@ -64,6 +64,19 @@ export interface CancelHeteroTaskParams { taskId: string; } +function signalTaskProcess(pid: number, signal: NodeJS.Signals): void { + if (process.platform === 'win32') { + process.kill(pid, signal); + return; + } + + try { + process.kill(-pid, signal); + } catch { + process.kill(pid, signal); + } +} + async function sendAutoNotify( topicId: string, taskId: string, @@ -320,9 +333,11 @@ export async function cancelHeteroTask(params: CancelHeteroTaskParams): Promise< return JSON.stringify({ message: `No task found with taskId: ${taskId}`, success: false }); } - // Both openclaw and hermes: kill by PID and let the child's close handler send the notify. + // Signal the whole process group when available. Local CLI agent runs + // (claude-code / codex) can spawn their own tool subprocesses, so a + // parent-only signal is not enough. try { - process.kill(entry.pid, signal); + signalTaskProcess(entry.pid, signal); } catch (err) { // Process already exited — exit handler won't fire; clean up manually. log.warn( diff --git a/apps/server/src/routers/lambda/__tests__/aiAgent.interruptTask.test.ts b/apps/server/src/routers/lambda/__tests__/aiAgent.interruptTask.test.ts index ebf0888a97..45ee7c0211 100644 --- a/apps/server/src/routers/lambda/__tests__/aiAgent.interruptTask.test.ts +++ b/apps/server/src/routers/lambda/__tests__/aiAgent.interruptTask.test.ts @@ -9,6 +9,11 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { aiAgentRouter } from '../aiAgent'; import { cleanupTestUser, createTestUser } from './integration/setup'; +const { mockExecuteToolCall, mockSandboxCallTool } = vi.hoisted(() => ({ + mockExecuteToolCall: vi.fn(), + mockSandboxCallTool: vi.fn(), +})); + // Mock getServerDB to return our test database instance let testDB: LobeChatDatabase; vi.mock('@/database/core/db-adaptor', () => ({ @@ -29,6 +34,18 @@ vi.mock('@/server/services/aiChat', () => ({ AiChatService: vi.fn().mockImplementation(() => ({})), })); +vi.mock('@/server/services/deviceGateway', () => ({ + deviceGateway: { + executeToolCall: mockExecuteToolCall, + }, +})); + +vi.mock('@/server/services/sandbox', () => ({ + createSandboxService: vi.fn(() => ({ + callTool: mockSandboxCallTool, + })), +})); + describe('aiAgentRouter.interruptTask', () => { let serverDB: LobeChatDatabase; let userId: string; @@ -43,6 +60,10 @@ describe('aiAgentRouter.interruptTask', () => { userId = await createTestUser(serverDB); mockInterruptOperation.mockReset(); mockInterruptOperation.mockResolvedValue(true); + mockExecuteToolCall.mockReset(); + mockExecuteToolCall.mockResolvedValue({ success: true }); + mockSandboxCallTool.mockReset(); + mockSandboxCallTool.mockResolvedValue({ success: true }); // Create test agent const [agent] = await serverDB @@ -203,6 +224,104 @@ describe('aiAgentRouter.interruptTask', () => { expect(updatedThread.status).toBe(ThreadStatus.Cancel); }); + + it('should dispatch cancelHeteroTask for a device-dispatched codex operation', async () => { + await serverDB + .update(topics) + .set({ + metadata: { + runningOperation: { + assistantMessageId: 'assistant-msg-1', + deviceId: 'device-1', + heteroType: 'codex', + operationId: 'op-codex', + }, + }, + }) + .where(eq(topics.id, testTopicId)); + + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.interruptTask({ + operationId: 'op-codex', + topicId: testTopicId, + }); + + expect(result.success).toBe(true); + expect(mockExecuteToolCall).toHaveBeenCalledWith( + { deviceId: 'device-1', userId }, + { + apiName: 'cancelHeteroTask', + arguments: JSON.stringify({ signal: 'SIGINT', taskId: 'op-codex' }), + identifier: 'cancelHeteroTask', + }, + 5000, + ); + + const [updatedTopic] = await serverDB.select().from(topics).where(eq(topics.id, testTopicId)); + expect(updatedTopic.metadata?.runningOperation?.cancelRequestedAt).toBeDefined(); + }); + + it('should kill the sandbox background command for a sandbox codex operation', async () => { + await serverDB + .update(topics) + .set({ + metadata: { + runningOperation: { + assistantMessageId: 'assistant-msg-1', + heteroType: 'codex', + operationId: 'op-sandbox', + sandboxCommandId: 'cmd-1', + }, + }, + }) + .where(eq(topics.id, testTopicId)); + + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.interruptTask({ + operationId: 'op-sandbox', + topicId: testTopicId, + }); + + expect(result.success).toBe(true); + expect(mockSandboxCallTool).toHaveBeenCalledWith('killCommand', { commandId: 'cmd-1' }); + + const [updatedTopic] = await serverDB.select().from(topics).where(eq(topics.id, testTopicId)); + expect(updatedTopic.metadata?.runningOperation?.cancelRequestedAt).toBeDefined(); + }); + + it('should not cancel a topic runningOperation that belongs to another operation', async () => { + await serverDB + .update(topics) + .set({ + metadata: { + runningOperation: { + assistantMessageId: 'assistant-msg-current', + deviceId: 'device-current', + heteroType: 'codex', + operationId: 'op-current', + sandboxCommandId: 'cmd-current', + }, + }, + }) + .where(eq(topics.id, testTopicId)); + + const caller = aiAgentRouter.createCaller(createTestContext()); + + const result = await caller.interruptTask({ + operationId: 'op-stale', + topicId: testTopicId, + }); + + expect(result.success).toBe(true); + expect(mockExecuteToolCall).not.toHaveBeenCalled(); + expect(mockSandboxCallTool).not.toHaveBeenCalled(); + + const [updatedTopic] = await serverDB.select().from(topics).where(eq(topics.id, testTopicId)); + expect(updatedTopic.metadata?.runningOperation?.cancelRequestedAt).toBeUndefined(); + expect(updatedTopic.metadata?.runningOperation?.operationId).toBe('op-current'); + }); }); describe('interrupt failure handling', () => { diff --git a/apps/server/src/routers/lambda/aiAgent.ts b/apps/server/src/routers/lambda/aiAgent.ts index 69695eff14..4f2a53b52f 100644 --- a/apps/server/src/routers/lambda/aiAgent.ts +++ b/apps/server/src/routers/lambda/aiAgent.ts @@ -329,9 +329,9 @@ const InterruptTaskSchema = z /** Thread ID */ threadId: z.string().optional(), /** - * Topic ID — required to cancel remote hetero tasks (openclaw / hermes). - * When provided and the topic's runningOperation has a deviceId, the server - * will dispatch a cancelHeteroTask tool call to kill the remote process. + * Topic ID — required to cancel hetero work that lives outside the server + * process. When provided, the topic's runningOperation can route cancellation + * to a connected device process or a sandbox background command. */ topicId: z.string().optional(), }) diff --git a/apps/server/src/routers/lambda/topic.ts b/apps/server/src/routers/lambda/topic.ts index 1856562a62..402faba18b 100644 --- a/apps/server/src/routers/lambda/topic.ts +++ b/apps/server/src/routers/lambda/topic.ts @@ -685,6 +685,7 @@ export const topicRouter = router({ runningOperation: z .object({ assistantMessageId: z.string(), + cancelRequestedAt: z.string().optional(), completionWebhook: z .object({ body: z.record(z.unknown()).optional(), @@ -692,7 +693,10 @@ export const topicRouter = router({ url: z.string(), }) .optional(), + deviceId: z.string().optional(), + heteroType: z.string().optional(), operationId: z.string(), + sandboxCommandId: z.string().optional(), scope: z.string().optional(), threadId: z.string().nullable().optional(), }) diff --git a/apps/server/src/services/aiAgent/__tests__/execAgent.heteroFiles.test.ts b/apps/server/src/services/aiAgent/__tests__/execAgent.heteroFiles.test.ts index c760c1d9e0..2fde99b47f 100644 --- a/apps/server/src/services/aiAgent/__tests__/execAgent.heteroFiles.test.ts +++ b/apps/server/src/services/aiAgent/__tests__/execAgent.heteroFiles.test.ts @@ -2,13 +2,17 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; import { AiAgentService } from '../index'; -const { mockMessageCreate, mockResolveAttachmentMetadata, mockSpawnHeteroSandbox } = vi.hoisted( - () => ({ - mockMessageCreate: vi.fn(), - mockResolveAttachmentMetadata: vi.fn(), - mockSpawnHeteroSandbox: vi.fn().mockResolvedValue(undefined), - }), -); +const { + mockMessageCreate, + mockResolveAttachmentMetadata, + mockSandboxCallTool, + mockSpawnHeteroSandbox, +} = vi.hoisted(() => ({ + mockMessageCreate: vi.fn(), + mockResolveAttachmentMetadata: vi.fn(), + mockSandboxCallTool: vi.fn(), + mockSpawnHeteroSandbox: vi.fn().mockResolvedValue({}), +})); vi.mock('@/libs/trusted-client', () => ({ generateTrustedClientToken: vi.fn().mockReturnValue(undefined), @@ -99,6 +103,12 @@ vi.mock('@/server/services/heterogeneousAgent/sandboxRunner', () => ({ spawnHeteroSandbox: mockSpawnHeteroSandbox, })); +vi.mock('@/server/services/sandbox', () => ({ + createSandboxService: vi.fn(() => ({ + callTool: mockSandboxCallTool, + })), +})); + vi.mock('@/server/services/file/resolveAttachments', () => ({ resolveAttachmentMetadata: mockResolveAttachmentMetadata, resolveAttachmentsByFileIds: vi.fn().mockResolvedValue({ @@ -148,7 +158,8 @@ describe('AiAgentService.execAgent - hetero early-exit file attachments', () => topicMock.updateMetadata.mockResolvedValue(undefined); mockMessageCreate.mockResolvedValue({ id: 'msg-1' }); mockResolveAttachmentMetadata.mockResolvedValue([]); - mockSpawnHeteroSandbox.mockResolvedValue(undefined); + mockSandboxCallTool.mockResolvedValue({ success: true }); + mockSpawnHeteroSandbox.mockResolvedValue({}); service = new AiAgentService(mockDb, userId); }); @@ -290,4 +301,48 @@ describe('AiAgentService.execAgent - hetero early-exit file attachments', () => expect(mockResolveAttachmentMetadata).not.toHaveBeenCalled(); }); }); + + describe('sandbox stop race', () => { + it('should kill the sandbox command when stop was requested before commandId is persisted', async () => { + mockSpawnHeteroSandbox.mockResolvedValue({ commandId: 'cmd-delayed' }); + topicMock.findById.mockImplementation(async () => { + const seededRunningOperation = topicMock.updateMetadata.mock.calls.find( + ([, metadata]) => metadata.runningOperation?.operationId, + )?.[1].runningOperation; + + return { + id: 'topic-1', + metadata: { + runningOperation: seededRunningOperation + ? { + ...seededRunningOperation, + cancelRequestedAt: '2026-01-01T00:00:00.000Z', + } + : undefined, + }, + }; + }); + + await service.execAgent({ + agentId: 'agent-1', + prompt: 'Run in sandbox', + }); + + await vi.waitFor(() => { + expect(mockSandboxCallTool).toHaveBeenCalledWith('killCommand', { + commandId: 'cmd-delayed', + }); + }); + + expect(topicMock.updateMetadata).toHaveBeenCalledWith( + 'topic-1', + expect.objectContaining({ + runningOperation: expect.objectContaining({ + cancelRequestedAt: '2026-01-01T00:00:00.000Z', + sandboxCommandId: 'cmd-delayed', + }), + }), + ); + }); + }); }); diff --git a/apps/server/src/services/aiAgent/index.ts b/apps/server/src/services/aiAgent/index.ts index 2ebbdac012..0f37aa60a8 100644 --- a/apps/server/src/services/aiAgent/index.ts +++ b/apps/server/src/services/aiAgent/index.ts @@ -29,6 +29,7 @@ import { buildTaskManagerDefaultsPrompt } from '@lobechat/prompts'; import type { ChatFileItem, ChatTopicBotContext, + ChatTopicMetadata, ChatVideoItem, ExecAgentParams, ExecAgentResult, @@ -106,6 +107,7 @@ import { HeterogeneousAgentService } from '@/server/services/heterogeneousAgent' import type { ConversationHistoryEntry } from '@/server/services/heterogeneousAgent/cloudHeteroContext'; import { KlavisService } from '@/server/services/klavis'; import { MarketService } from '@/server/services/market'; +import { createSandboxService } from '@/server/services/sandbox'; import { markdownToTxt } from '@/utils/markdownToTxt'; import { resolveDeviceAccessPolicy } from './deviceAccessPolicy'; @@ -1040,22 +1042,49 @@ export class AiAgentService { const remoteDeviceId = requestedDeviceId || agentConfig.agencyConfig?.boundDeviceId || undefined; + type RunningOperationMetadata = NonNullable; + const buildRunningOperationMetadata = ( + extra: Partial = {}, + ): RunningOperationMetadata => ({ + assistantMessageId: assistantMsg.id, + completionWebhook: hooks?.find((h) => h.type === 'onComplete')?.webhook, + heteroType, + operationId, + scope: appContext?.scope ?? undefined, + threadId: appContext?.threadId ?? undefined, + ...extra, + }); + const updateRunningOperationMetadata = async ( + extra: Partial, + ): Promise => { + const latestTopic = await this.topicModel.findById(topicId); + const current = latestTopic?.metadata?.runningOperation; + if (current && current.operationId !== operationId) { + log( + 'execAgent: skip runningOperation update for stale op=%s current=%s', + operationId, + current.operationId, + ); + return; + } + const runningOperation = { + ...buildRunningOperationMetadata(), + ...current, + ...extra, + }; + await this.topicModel.updateMetadata(topicId, { + runningOperation, + }); + return runningOperation; + }; // Seed topic.metadata.runningOperation so heteroIngest can validate the operation. // completionWebhook is stored so heteroFinish can call back to the IM bot-callback // endpoint even though the hetero path bypasses the normal hook registration flow. await this.topicModel.updateMetadata(topicId, { - runningOperation: { - assistantMessageId: assistantMsg.id, - completionWebhook: hooks?.find((h) => h.type === 'onComplete')?.webhook, - // Store deviceId + heteroType so interruptTask can cancel remote processes - ...(isRemoteHetero && remoteDeviceId - ? { deviceId: remoteDeviceId, heteroType } - : undefined), - operationId, - scope: appContext?.scope ?? undefined, - threadId: appContext?.threadId ?? undefined, - }, + runningOperation: buildRunningOperationMetadata( + isRemoteHetero && remoteDeviceId ? { deviceId: remoteDeviceId } : {}, + ), }); // Remote hetero agents (openclaw / hermes) dispatch to the device identified @@ -1241,6 +1270,8 @@ export class AiAgentService { userMessageId: userMsg?.id ?? parentMessageId ?? '', }; } + await updateRunningOperationMetadata({ deviceId: dispatchDeviceId }); + // Resolve the working directory for the run: a topic-level override // wins, else the device's user-configured defaultCwd. The device row // lives in the DB (the gateway only knows live connections), so read @@ -1318,9 +1349,24 @@ export class AiAgentService { ...heteroParams, agentType: heteroType as 'claude-code' | 'codex', marketService: this.marketService, - }).catch((err) => { - log('execAgent: hetero sandbox spawn failed: %O', err); - }); + }) + .then(async ({ commandId }) => { + if (!commandId) return; + const runningOperation = await updateRunningOperationMetadata({ + sandboxCommandId: commandId, + }); + if (!runningOperation?.cancelRequestedAt) return; + await createSandboxService({ + marketService: this.marketService, + topicId, + userId: this.userId, + }) + .callTool('killCommand', { commandId }) + .catch((err) => log('execAgent: delayed sandbox killCommand failed: %O', err)); + }) + .catch((err) => { + log('execAgent: hetero sandbox spawn failed: %O', err); + }); } } @@ -3410,31 +3456,50 @@ export class AiAgentService { throw new Error('Operation ID not found'); } - // 2. Cancel remote hetero process (openclaw / hermes) if applicable. - // Check topic.metadata.runningOperation for device + heteroType info seeded by execAgent. + // 2. Cancel hetero processes when the run lives outside the server process. + // Device-dispatched local CLI agents (claude-code / codex) and remote + // platform agents (openclaw / hermes) are killed through the connected + // device. Sandbox-dispatched local CLI agents are killed through sandbox + // command cancellation when the background command id is available. // This runs regardless of whether interruptOperation succeeds — the remote process // is independent of the local operation registry. if (topicId) { const topic = await this.topicModel.findById(topicId); - const runningOp = (topic?.metadata as any)?.runningOperation as - | { deviceId?: string; heteroType?: string; operationId?: string } - | undefined; + const runningOp = topic?.metadata?.runningOperation; - if ( - runningOp?.deviceId && - runningOp.heteroType && - isRemoteHeterogeneousType(runningOp.heteroType) - ) { - const taskId = runningOp.operationId ?? resolvedOperationId; + const runningOperation = + runningOp?.operationId === resolvedOperationId + ? { + ...runningOp, + operationId: resolvedOperationId, + } + : undefined; + + if (runningOp && runningOp.operationId !== resolvedOperationId) { log( - 'interruptTask: cancelling remote hetero process heteroType=%s deviceId=%s taskId=%s', - runningOp.heteroType, - runningOp.deviceId, + 'interruptTask: skip hetero process cancel for stale op=%s current=%s topicId=%s', + resolvedOperationId, + runningOp.operationId, + topicId, + ); + } else if (runningOperation) { + const cancelRequestedAt = runningOperation.cancelRequestedAt ?? new Date().toISOString(); + await this.topicModel.updateMetadata(topicId, { + runningOperation: { ...runningOperation, cancelRequestedAt }, + }); + } + + if (runningOperation?.deviceId && runningOperation.heteroType) { + const taskId = runningOperation.operationId; + log( + 'interruptTask: cancelling hetero device process heteroType=%s deviceId=%s taskId=%s', + runningOperation.heteroType, + runningOperation.deviceId, taskId, ); await deviceGateway .executeToolCall( - { deviceId: runningOp.deviceId, userId: this.userId }, + { deviceId: runningOperation.deviceId, userId: this.userId }, { apiName: 'cancelHeteroTask', arguments: JSON.stringify({ signal: 'SIGINT', taskId }), @@ -3444,6 +3509,21 @@ export class AiAgentService { ) .catch((err) => log('interruptTask: cancelHeteroTask dispatch failed: %O', err)); } + + if (runningOperation?.sandboxCommandId) { + log( + 'interruptTask: cancelling hetero sandbox command commandId=%s topicId=%s', + runningOperation.sandboxCommandId, + topicId, + ); + await createSandboxService({ + marketService: this.marketService, + topicId, + userId: this.userId, + }) + .callTool('killCommand', { commandId: runningOperation.sandboxCommandId }) + .catch((err) => log('interruptTask: sandbox killCommand failed: %O', err)); + } } // 3. Interrupt the runtime operation first. Only mark the thread cancelled diff --git a/apps/server/src/services/heterogeneousAgent/sandboxRunner.ts b/apps/server/src/services/heterogeneousAgent/sandboxRunner.ts index 536eec9168..4a9b8fa835 100644 --- a/apps/server/src/services/heterogeneousAgent/sandboxRunner.ts +++ b/apps/server/src/services/heterogeneousAgent/sandboxRunner.ts @@ -47,6 +47,10 @@ export interface SandboxRunParams { userId: string; } +export interface SandboxRunResult { + commandId?: string; +} + /** * Derive the local directory name from a repo identifier. * Accepts "owner/repo", "https://github.com/owner/repo", or "https://github.com/owner/repo.git". @@ -121,7 +125,7 @@ function buildRepoSetupScript(repos: string[], githubToken?: string): string | n * Fire-and-forget: the caller does NOT await this — the sandbox pushes events * back to the server via `heteroIngest` tRPC batches independently. */ -export async function spawnHeteroSandbox(params: SandboxRunParams): Promise { +export async function spawnHeteroSandbox(params: SandboxRunParams): Promise { const { agentType, assistantMessageId, @@ -215,4 +219,16 @@ export async function spawnHeteroSandbox(params: SandboxRunParams): Promise).commandId || + (resultData as Record).shell_id || + '', + ) || undefined + : undefined; + + return { commandId }; } diff --git a/packages/types/src/task/index.ts b/packages/types/src/task/index.ts index 602a39e1ad..c0924b3e1b 100644 --- a/packages/types/src/task/index.ts +++ b/packages/types/src/task/index.ts @@ -280,7 +280,11 @@ export interface TaskDetailActivity { */ runningOperation?: { assistantMessageId: string; + cancelRequestedAt?: string; + deviceId?: string; + heteroType?: string; operationId: string; + sandboxCommandId?: string; scope?: string; threadId?: string | null; } | null; diff --git a/packages/types/src/topic/topic.ts b/packages/types/src/topic/topic.ts index e8f604591d..221adacc7e 100644 --- a/packages/types/src/topic/topic.ts +++ b/packages/types/src/topic/topic.ts @@ -155,6 +155,11 @@ export interface ChatTopicMetadata { */ runningOperation?: { assistantMessageId: string; + /** + * Set when a stop request arrives before all out-of-process handles + * (for example sandbox command id) are available. + */ + cancelRequestedAt?: string; /** * Webhook to fire when the operation completes. * Populated by the IM bot path so heterogeneous agents (Claude Code / Codex) @@ -166,7 +171,19 @@ export interface ChatTopicMetadata { delivery?: 'fetch' | 'qstash'; url: string; }; + /** + * Device id for hetero runs dispatched through the device gateway. + */ + deviceId?: string; + /** + * Heterogeneous runtime type, used to route cancellation. + */ + heteroType?: string; operationId: string; + /** + * Background command id for sandbox-dispatched hetero runs. + */ + sandboxCommandId?: string; scope?: string; threadId?: string | null; } | null;