mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-14 03:30:19 +00:00
🐛 fix(cli): handle agent_run_request in lh connect so device dispatch doesn't time out (#15634)
* 🐛 fix(cli): handle agent_run_request in `lh connect` so device dispatch doesn't time out `lh connect` auto-registers the CLI as a device, so the gateway can pick it as the dispatch target for a heterogeneous agent run (`agent_run_request`). But the connect daemon only listened for `system_info_request` and `tool_call_request` — it never handled `agent_run_request`, so it never sent `agent_run_ack`. The gateway waited out its ack window and returned `{error:'TIMEOUT',success:false}`, surfaced server-side as "Hetero agent device dispatch failed". Add an `agent_run_request` handler mirroring the desktop app: spawn `lh hetero exec` fire-and-forget and ack `accepted` immediately. The spawned process owns the full execution + server-ingest pipeline. It re-invokes the current CLI entry (process.execPath + argv[1]) rather than relying on `lh` being on PATH, so it works inside the detached daemon. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> * fix: bump the cli version * chore: bump the cli manifest * 🐛 fix(cli): ack agent run only after spawn succeeds, reject on spawn error `child_process.spawn` reports a missing/inaccessible cwd asynchronously via the child's `error` event, after the handler had already sent an `accepted` ack. The gateway/server then recorded dispatch success while no `lh hetero exec` process existed to emit `heteroFinish`, leaving the assistant message stuck instead of surfacing a failure. `spawnHeteroAgentRun` now resolves on the child's outcome: `accepted` on the `spawn` event (stdin is written only then), `rejected` on an early `error`. A rejected ack returns the gateway 422 → execAgent writes a ServerAgentRuntimeError onto the assistant message, so a failed dispatch is visible. Still resolves in milliseconds, well within the gateway's 10s ack window. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
.\" Code generated by `npm run man:generate`; DO NOT EDIT.
|
||||
.\" Manual command details come from the Commander command tree.
|
||||
.TH LH 1 "" "@lobehub/cli 0.0.27" "User Commands"
|
||||
.TH LH 1 "" "@lobehub/cli 0.0.29" "User Commands"
|
||||
.SH NAME
|
||||
lh \- LobeHub CLI \- manage and connect to LobeHub services
|
||||
.SH SYNOPSIS
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "@lobehub/cli",
|
||||
"version": "0.0.27",
|
||||
"version": "0.0.29",
|
||||
"type": "module",
|
||||
"bin": {
|
||||
"lh": "./dist/index.js",
|
||||
|
||||
@@ -3,6 +3,7 @@ import os from 'node:os';
|
||||
import path from 'node:path';
|
||||
|
||||
import type {
|
||||
AgentRunRequestMessage,
|
||||
DeviceSystemInfo,
|
||||
SystemInfoRequestMessage,
|
||||
ToolCallRequestMessage,
|
||||
@@ -25,6 +26,7 @@ import {
|
||||
stopDaemon,
|
||||
writeStatus,
|
||||
} from '../daemon/manager';
|
||||
import { spawnHeteroAgentRun } from '../device/agentRun';
|
||||
import { registerDevice, resolveDeviceIdentity } from '../device/register';
|
||||
import { loadOrCreateConnectionId, loadSettings, normalizeUrl, saveSettings } from '../settings';
|
||||
import { executeToolCall } from '../tools';
|
||||
@@ -286,6 +288,38 @@ async function runConnect(options: ConnectOptions, isDaemonChild: boolean) {
|
||||
});
|
||||
});
|
||||
|
||||
// Handle gateway-dispatched agent runs (heterogeneous agents, e.g. Claude
|
||||
// Code). Mirrors the desktop app: spawn `lh hetero exec`, which owns the full
|
||||
// execution + server-ingest pipeline. Ack with the spawn outcome — `accepted`
|
||||
// once the child starts, `rejected` if it fails to spawn (e.g. bad cwd) — so
|
||||
// a failed dispatch surfaces as an error instead of a stuck assistant message.
|
||||
client.on('agent_run_request', async (request: AgentRunRequestMessage) => {
|
||||
info(
|
||||
`Received agent_run_request: operationId=${request.operationId} type=${request.agentType}`,
|
||||
);
|
||||
try {
|
||||
const ack = await spawnHeteroAgentRun(
|
||||
{
|
||||
agentType: request.agentType,
|
||||
cwd: request.cwd,
|
||||
jwt: request.jwt,
|
||||
operationId: request.operationId,
|
||||
prompt: request.prompt,
|
||||
resumeSessionId: request.resumeSessionId,
|
||||
serverUrl: auth.serverUrl,
|
||||
systemContext: request.systemContext,
|
||||
topicId: request.topicId,
|
||||
},
|
||||
{ error, info },
|
||||
);
|
||||
client.sendAgentRunAck({ operationId: request.operationId, ...ack });
|
||||
} catch (err) {
|
||||
const reason = err instanceof Error ? err.message : String(err);
|
||||
error(`agent_run_request failed: ${reason}`);
|
||||
client.sendAgentRunAck({ operationId: request.operationId, reason, status: 'rejected' });
|
||||
}
|
||||
});
|
||||
|
||||
client.on('connected', () => {
|
||||
updateStatus('connected');
|
||||
});
|
||||
|
||||
@@ -0,0 +1,125 @@
|
||||
import { EventEmitter } from 'node:events';
|
||||
|
||||
import { afterEach, describe, expect, it, vi } from 'vitest';
|
||||
|
||||
import { spawnHeteroAgentRun } from './agentRun';
|
||||
|
||||
const { spawnMock } = vi.hoisted(() => ({ spawnMock: vi.fn() }));
|
||||
|
||||
vi.mock('node:child_process', () => ({ spawn: spawnMock }));
|
||||
|
||||
const makeFakeChild = () => {
|
||||
const child = new EventEmitter() as EventEmitter & {
|
||||
stdin: { end: ReturnType<typeof vi.fn>; write: ReturnType<typeof vi.fn> };
|
||||
};
|
||||
child.stdin = { end: vi.fn(), write: vi.fn() };
|
||||
return child;
|
||||
};
|
||||
|
||||
const baseParams = {
|
||||
agentType: 'claudeCode',
|
||||
jwt: 'jwt',
|
||||
operationId: 'op',
|
||||
prompt: 'hi',
|
||||
serverUrl: 'https://app.lobehub.com',
|
||||
topicId: 'tpc',
|
||||
};
|
||||
|
||||
describe('spawnHeteroAgentRun', () => {
|
||||
afterEach(() => {
|
||||
spawnMock.mockReset();
|
||||
});
|
||||
|
||||
it('spawns `lh hetero exec` in server-ingest mode via the current CLI entry', async () => {
|
||||
const child = makeFakeChild();
|
||||
spawnMock.mockReturnValue(child);
|
||||
|
||||
const ackPromise = spawnHeteroAgentRun({
|
||||
...baseParams,
|
||||
cwd: '/work/dir',
|
||||
jwt: 'jwt-token',
|
||||
operationId: 'op-1',
|
||||
topicId: 'tpc-1',
|
||||
});
|
||||
|
||||
expect(spawnMock).toHaveBeenCalledTimes(1);
|
||||
const [bin, args, opts] = spawnMock.mock.calls[0];
|
||||
|
||||
expect(bin).toBe(process.execPath);
|
||||
expect(args).toEqual([
|
||||
...process.execArgv,
|
||||
process.argv[1],
|
||||
'hetero',
|
||||
'exec',
|
||||
'--type',
|
||||
'claudeCode',
|
||||
'--operation-id',
|
||||
'op-1',
|
||||
'--topic',
|
||||
'tpc-1',
|
||||
'--render',
|
||||
'none',
|
||||
'--input-json',
|
||||
'-',
|
||||
'--cwd',
|
||||
'/work/dir',
|
||||
]);
|
||||
expect(opts).toMatchObject({
|
||||
cwd: '/work/dir',
|
||||
env: expect.objectContaining({
|
||||
LOBEHUB_JWT: 'jwt-token',
|
||||
LOBEHUB_SERVER: 'https://app.lobehub.com',
|
||||
}),
|
||||
});
|
||||
|
||||
// stdin is only written after the child actually spawns.
|
||||
expect(child.stdin.write).not.toHaveBeenCalled();
|
||||
child.emit('spawn');
|
||||
|
||||
await expect(ackPromise).resolves.toEqual({ status: 'accepted' });
|
||||
expect(child.stdin.write).toHaveBeenCalledWith(JSON.stringify('hi'));
|
||||
expect(child.stdin.end).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('rejects (no stuck run) when the child errors before spawning, e.g. bad cwd', async () => {
|
||||
const child = makeFakeChild();
|
||||
spawnMock.mockReturnValue(child);
|
||||
|
||||
const ackPromise = spawnHeteroAgentRun({ ...baseParams, cwd: '/missing' });
|
||||
child.emit('error', new Error('spawn ENOENT'));
|
||||
|
||||
await expect(ackPromise).resolves.toEqual({ reason: 'spawn ENOENT', status: 'rejected' });
|
||||
expect(child.stdin.write).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('appends --resume when resuming a session', () => {
|
||||
const child = makeFakeChild();
|
||||
spawnMock.mockReturnValue(child);
|
||||
|
||||
void spawnHeteroAgentRun({ ...baseParams, resumeSessionId: 'sess-9' });
|
||||
|
||||
const [, args] = spawnMock.mock.calls[0];
|
||||
expect(args).toContain('--resume');
|
||||
expect(args).toContain('sess-9');
|
||||
});
|
||||
|
||||
it('sends a content-block array to stdin when systemContext is provided', async () => {
|
||||
const child = makeFakeChild();
|
||||
spawnMock.mockReturnValue(child);
|
||||
|
||||
const ackPromise = spawnHeteroAgentRun({
|
||||
...baseParams,
|
||||
prompt: 'do it',
|
||||
systemContext: 'workspace rules',
|
||||
});
|
||||
child.emit('spawn');
|
||||
await ackPromise;
|
||||
|
||||
expect(child.stdin.write).toHaveBeenCalledWith(
|
||||
JSON.stringify([
|
||||
{ text: 'workspace rules', type: 'text' },
|
||||
{ text: 'do it', type: 'text' },
|
||||
]),
|
||||
);
|
||||
});
|
||||
});
|
||||
@@ -0,0 +1,130 @@
|
||||
import { spawn } from 'node:child_process';
|
||||
|
||||
export interface SpawnHeteroAgentRunParams {
|
||||
agentType: string;
|
||||
cwd?: string;
|
||||
jwt: string;
|
||||
operationId: string;
|
||||
prompt: string;
|
||||
resumeSessionId?: string;
|
||||
serverUrl: string;
|
||||
systemContext?: string;
|
||||
topicId: string;
|
||||
}
|
||||
|
||||
export interface AgentRunAckResult {
|
||||
reason?: string;
|
||||
status: 'accepted' | 'rejected';
|
||||
}
|
||||
|
||||
interface SpawnHeteroAgentRunLogger {
|
||||
error?: (msg: string) => void;
|
||||
info?: (msg: string) => void;
|
||||
}
|
||||
|
||||
/**
|
||||
* Spawn `lh hetero exec` for a gateway-dispatched agent run. Mirrors the
|
||||
* desktop app's `spawnLhHeteroExec`: the spawned CLI owns the full pipeline
|
||||
* (spawn -> adapt -> BatchIngester -> server ingest), so the connect daemon
|
||||
* needs no local stream handling — it only kicks off the process.
|
||||
*
|
||||
* Re-invokes the current CLI entry (`process.execPath` + `process.argv[1]`)
|
||||
* instead of relying on `lh` being on `PATH`, so it also works inside the
|
||||
* detached `lh connect --daemon` child where `PATH` may be minimal.
|
||||
*
|
||||
* Resolves only once the child's outcome is known: `accepted` on the `spawn`
|
||||
* event, `rejected` on an early `error`. `spawn()` reports failures (missing or
|
||||
* inaccessible `cwd`, etc.) asynchronously via `error`, so acking eagerly would
|
||||
* report a false success and leave the run with no process to emit
|
||||
* `heteroFinish` — surfacing as a stuck assistant message. A rejected ack
|
||||
* instead flows back as a dispatch failure the user can see.
|
||||
*/
|
||||
export function spawnHeteroAgentRun(
|
||||
params: SpawnHeteroAgentRunParams,
|
||||
logger?: SpawnHeteroAgentRunLogger,
|
||||
): Promise<AgentRunAckResult> {
|
||||
const {
|
||||
agentType,
|
||||
cwd,
|
||||
jwt,
|
||||
operationId,
|
||||
prompt,
|
||||
resumeSessionId,
|
||||
serverUrl,
|
||||
systemContext,
|
||||
topicId,
|
||||
} = params;
|
||||
const workDir = cwd ?? process.cwd();
|
||||
|
||||
// Server-ingest mode (--topic + --operation-id): events are batch-POSTed to
|
||||
// the server, not rendered. `--input-json -` reads the prompt from stdin.
|
||||
const cliArgs = [
|
||||
process.argv[1],
|
||||
'hetero',
|
||||
'exec',
|
||||
'--type',
|
||||
agentType,
|
||||
'--operation-id',
|
||||
operationId,
|
||||
'--topic',
|
||||
topicId,
|
||||
'--render',
|
||||
'none',
|
||||
'--input-json',
|
||||
'-',
|
||||
'--cwd',
|
||||
workDir,
|
||||
...(resumeSessionId ? ['--resume', resumeSessionId] : []),
|
||||
];
|
||||
|
||||
// With systemContext, send a content-block array so the agent sees the
|
||||
// context block first, then the user's actual prompt — mirrors the desktop
|
||||
// path. `lh hetero exec` coerces both shapes via coerceJsonPrompt.
|
||||
const stdinPayload = systemContext
|
||||
? JSON.stringify([
|
||||
{ text: systemContext, type: 'text' },
|
||||
{ text: prompt, type: 'text' },
|
||||
])
|
||||
: JSON.stringify(prompt);
|
||||
|
||||
return new Promise<AgentRunAckResult>((resolve) => {
|
||||
let settled = false;
|
||||
const settle = (result: AgentRunAckResult) => {
|
||||
if (settled) return;
|
||||
settled = true;
|
||||
resolve(result);
|
||||
};
|
||||
|
||||
const child = spawn(process.execPath, [...process.execArgv, ...cliArgs], {
|
||||
cwd: workDir,
|
||||
env: {
|
||||
...process.env,
|
||||
LOBEHUB_JWT: jwt,
|
||||
LOBEHUB_SERVER: serverUrl,
|
||||
},
|
||||
stdio: ['pipe', 'inherit', 'inherit'],
|
||||
});
|
||||
|
||||
child.once('spawn', () => {
|
||||
// Only safe to write stdin once the process actually started.
|
||||
try {
|
||||
child.stdin?.write(stdinPayload);
|
||||
child.stdin?.end();
|
||||
} catch (err) {
|
||||
logger?.error?.(
|
||||
`hetero exec stdin write failed (op=${operationId}): ${(err as Error).message}`,
|
||||
);
|
||||
}
|
||||
settle({ status: 'accepted' });
|
||||
});
|
||||
|
||||
child.once('error', (err) => {
|
||||
logger?.error?.(`hetero exec spawn failed (op=${operationId}): ${err.message}`);
|
||||
settle({ reason: err.message, status: 'rejected' });
|
||||
});
|
||||
|
||||
child.on('exit', (code, signal) => {
|
||||
logger?.info?.(`hetero exec exited (op=${operationId}) code=${code} signal=${signal}`);
|
||||
});
|
||||
});
|
||||
}
|
||||
Reference in New Issue
Block a user