From 5f24d179d4a3a5b27c9cdebe750bdb7d4e42b092 Mon Sep 17 00:00:00 2001 From: Arvin Xu Date: Mon, 11 May 2026 02:16:24 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(hetero-agent):=20support=20Ask?= =?UTF-8?q?UserQuestion=20tools=20for=20claude=20code=20(#14639)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ✨ feat(hetero-agent): AskUserQuestion MCP server + bridge skeleton (LOBE-8725 step 1+2) Foundation for LOBE-8725 — interactive AskUserQuestion via local MCP. CC's built-in tool short-circuits in `-p` mode, so we host an in-process MCP server that exposes an equivalent `ask_user_question` tool. The handler blocks until the consumer submits an answer (or the 5min deadline / op shutdown fires), surfacing a structured `agent_intervention_request` / `agent_intervention_response` round-trip on the existing event stream. Added in this commit: - `packages/heterogeneous-agents/src/askUser/` - `AskUserBridge` — per-op pending map with timeout / cancel / progress keepalive support; emits an async-iterable of outbound events - `AskUserMcpServer` — process-wide HTTP/Streamable MCP server, `?op=` query routes via `AsyncLocalStorage` → `onsessioninitialized` → sessionId↔opId map; tool handler hands off to the matching bridge and pumps `notifications/progress` back to CC every 30s as wire-level keepalive (required for >5min waits, see spike notes) - `constants.ts` — shared tool/server names + the stable `apiName` the adapter rewrites to - Unit tests cover bridge lifecycle (resolve / cancel / timeout / progress / event stream) and an end-to-end MCP probe via `StreamableHTTPClientTransport` - `packages/agent-gateway-client/src/types.ts` — wire-level `agent_intervention_request` / `agent_intervention_response` event variants + payload interfaces. Re-exported through the package barrel. 
- `packages/heterogeneous-agents/src/adapters/claudeCode.ts` — when CC's `tool_use` carries `mcp__lobe_cc__ask_user_question`, the adapter rewrites `apiName` to `askUserQuestion` so the renderer routes on a clean domain key. Identifier stays `claude-code`. Applied to both the main-agent and subagent paths for symmetry (subagent ask isn't expected today, but doesn't hurt). - `src/server/routers/lambda/aiAgent.ts` — Zod input schema for `aiAgent.heteroIngest` extended with the two new event types so the CLI sandbox can forward them through the server. No producer wiring yet — Steps 3-5 plug this into Electron main, the renderer executor, and the new UI. * ✨ feat(hetero-agent): wire AskUserQuestion MCP into Electron CC driver (LOBE-8725 step 3) Plug the Step 1 skeleton (`AskUserMcpServer` + `AskUserBridge`) into the desktop Claude Code spawn path. CC's local MCP `ask_user_question` tool now goes live during real prompts; renderer-submitted answers route back via new IPC. Changes - `apps/desktop/src/main/modules/heterogeneousAgent/types.ts` — add optional `mcpConfigPath` to `HeterogeneousAgentBuildPlanParams` so controller-managed temp configs flow into the driver. - `apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.ts` — append `--mcp-config ` when provided. Disallowed-tools pin stays so CC's built-in AskUserQuestion remains off (avoids double- registration of the same tool name). - `apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts` - Lazy-singleton `AskUserMcpServer` started on first claude-code prompt (de-duped concurrent first-callers via in-flight promise). - Per-op `setupInterventionForOp(opId, sessionId)`: registers an `AskUserBridge`, writes `os.tmpdir()/lobe-cc-mcp-.json` with `alwaysLoad: true` so CC eager-loads the tool (1-hop call, no ToolSearch detour — see LOBE-8725 spike), pumps `bridge.events()` into the existing `heteroAgentEvent` broadcast. 
- Cleanup paths: exit handler `await intervention.cleanup()` settles pending MCP handlers + unlinks the temp config; pre-spawn errors short-circuit the same cleanup so we don't leak bridges on `buildSpawnPlan` / trace-session failures. - `before-quit` stops the MCP server (in addition to killing CC processes). - New `@IpcMethod() submitIntervention({ operationId, toolCallId, result?, cancelled?, cancelReason? })` — renderer side will dispatch answers / cancellations through this in Steps 4/5. - codex unchanged — bridge setup is gated on `agentType === 'claude-code'`. - `src/services/electron/heterogeneousAgent.ts` — renderer-side proxy for `submitIntervention`. - New `claudeCode.test.ts` covers the four driver-arg paths (`--mcp-config` presence, ordering vs `--resume`, AskUserQuestion stays disallowed). Existing 28 controller tests still pass. What still doesn't run end-to-end - The renderer `heteroExecutor` doesn't consume `agent_intervention_request` yet — events go through the broadcast but the chat store ignores them. - No UI to render the intervention card or to call `submitIntervention`. Both land in Steps 4/5 next. * ✨ feat(hetero-agent): correlate intervention with tool message + renderer handler (LOBE-8725 step 3.5+4) Bridge now uses the caller-supplied toolCallId (CC's `claudecode/toolUseId` from MCP `_meta`) instead of a random UUID, so the `agent_intervention_request` event references the same id as the existing tool message on the renderer side. Renderer-side `heteroExecutor` learns the new event: - Added `persistInterventionRequest(...)` next to `persistToolResult` — stamps `pluginState.askUserQuestion` (apiName + identifier + questions parsed from `arguments` + deadline + status='pending' + toolCallId) onto the matching tool message via `messageService.updateToolMessage`.
- New branch in `handleStreamEvent` for `'agent_intervention_request'`: defers behind `persistQueue` (so it lands AFTER `persistToolBatch` populates `toolMsgIdByCallId`), then mirrors the same pluginState onto the in-memory message via `internal_dispatchMessage` so the UI lights up immediately — no fetchAndReplaceMessages round-trip needed. - The eventual `tool_result` for the same toolCallId hits the existing `tool_result` branch unchanged: it overwrites `pluginState` with whatever the result carries (typically undefined for our MCP tool, so `pluginState.askUserQuestion` clears and the intervention UI yields to the regular Render). Bridge tests cover the new contract: - caller-supplied toolCallId becomes the wire correlation key - duplicate-toolCallId pendings reject loudly so two-handler clobbers surface immediately 153 package tests + 1167 desktop main tests + 51 hetero executor tests still green; type-check clean. * ✨ feat(claude-code): AskUserQuestion intervention render component (LOBE-8725 step 5) Dedicated Render for the synthetic `askUserQuestion` apiName the adapter rewrites the local MCP `mcp__lobe_cc__ask_user_question` tool to. Lives under CC's render registry so the existing chat tool-detail flow picks it up automatically — no changes to the conversation framework. - New `AskUserQuestionItem` / `AskUserQuestionArgs` / `AskUserQuestionPluginState` types (mirrors CC's own AskUserQuestion schema verbatim). - `ClaudeCodeApiName` gains an `AskUserQuestion = 'askUserQuestion'` member so the renders / inspectors / streamings registries can key off the same enum value. - `client/Render/AskUserQuestion/index.tsx` is the component: - `pluginState.askUserQuestion?.status === 'pending'` → renders the questions form (Select for single-select, CheckboxGroup for multi-select), a 5-min countdown ticking once a second, Submit / Skip buttons. Reads `operationId` via `messageOperationMap` so we can route through `heterogeneousAgentService.submitIntervention`. 
- Otherwise → renders the questions as muted captions plus the final answer text from `content`. Surfaces a warning when the tool_result was an error (timeout / cancelled / session ended). - Submit button stays disabled until every question has a selection; Skip always enabled (sends `cancelled: true`). - `ClaudeCodeRenders[ClaudeCodeApiName.AskUserQuestion]` registers the new component. What this does NOT do - Doesn't touch `BuiltinToolInterventions` — the form is rendered inside the regular tool body (Render slot), not the canonical intervention slot. Cleanest for now: the framework intervention flow assumes `submitToolInteraction` store actions, which would fight our IPC path. We can refactor onto that surface later if CC grows additional interactions (approval, file picker). - Doesn't translate strings — i18n in a follow-up. Type-check clean. Step 6 (real desktop e2e via CC) is next. * ✨ feat(claude-code): render AskUserQuestion form during pending state (LOBE-8725 step 5 follow-up) Step 5 registered the Render component but stopped at the registry — the chat tool-detail still returned the loading placeholder while `isToolCalling` was true, so users only ever saw a spinner during the 5 min intervention window. Detect `pluginState.askUserQuestion?.status === 'pending'` (only set on CC + apiName=askUserQuestion tool messages) and route to the registered builtin Render inline before the placeholder branch. Once the intervention resolves, the eventual `tool_result` clears `pluginState.askUserQuestion` and the regular Render takes over. Co-Authored-By: Claude Opus 4.7 (1M context) * ✨ feat(hetero-agent): wire regenerate / continue for hetero runtime (LOBE-8519 follow-up) LOBE-8519 left two TODOs in `generationSlice` where hetero runtime silently fell through to client mode — regenerate would secretly hit the agent's underlying LLM, and continue would synthesize a fake "please continue" turn that confuses CC / Codex. 
- regenerateMessage: re-create the assistant row branched off the same user message, resolve resume sessionId (drop on cwd mismatch), then spawn a child `execHeterogeneousAgent` op so Stop only kills the executor, not the parent regenerate op. Mirrors sendMessage's hetero branch. - continueGenerationMessage: hetero CLIs have no continue primitive — each prompt is a fresh user turn — so bail out instead of polluting the session. - continueGenerationMessage: gateway mode now branches a server-side resume run instead of falling through to client. Surfaced while testing CC AskUserQuestion end-to-end on the LOBE-8725 branch (regenerating after an answered question went through the wrong runtime). Co-Authored-By: Claude Opus 4.7 (1M context) * 🐛 fix(local-testing): electron-dev.sh boots on macOS bash 3.2 Two bugs surfaced when invoking the local-testing helper from a fresh session on macOS: - `find_project_pids` / `do_stop` end with `grep -v '^$'` whose exit code propagates through `pipefail`. With `set -e`, an empty pid set silently kills the whole script — `do_start` reported success, no Electron, no error. Trail with `|| true`. - `setsid` is GNU coreutils, not on macOS. Fall back to plain `bash -c`; process-tree teardown still works because `expand_descendants` walks the tree directly. Co-Authored-By: Claude Opus 4.7 (1M context) * 🐛 fix(hetero-agent): per-session MCP transport for sequential ops (LOBE-8725) `AskUserMcpServer` shared a single `StreamableHTTPServerTransport` across every CC subprocess. The SDK transport latches `_initialized=true` after the first `initialize`, so the second op's CC subprocess sees `Invalid Request: Server already initialized` (400) and reports the `lobe_cc` server as `failed`. From the model's POV the MCP tool is absent — it falls back to ToolSearch, can't find anything, and verbalizes the question instead. 
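The failure mode above, and how a per-session lookup sidesteps it, can be sketched without the MCP SDK. This is only an illustration under stated assumptions: `LatchingTransport` and `PerSessionRouter` are hypothetical stand-ins (not SDK classes), the session ids are made up, and the 404 for an unknown session is an assumption rather than something this patch states.

```typescript
// Hypothetical stand-in for a transport that latches after the first `initialize`.
type InitResult = { status: number; sessionId?: string; error?: string };

class LatchingTransport {
  private initialized = false;

  initialize(): InitResult {
    if (this.initialized) {
      // Mirrors the 400 quoted in the commit message.
      return { status: 400, error: 'Invalid Request: Server already initialized' };
    }
    this.initialized = true;
    return { status: 200 };
  }
}

// Hypothetical router: one transport per session, looked up by session id.
class PerSessionRouter {
  private transports = new Map<string, LatchingTransport>();
  private counter = 0;

  // A request without a session id is treated as `initialize` and gets a fresh transport.
  handle(sessionId?: string): InitResult {
    if (sessionId === undefined) {
      const id = `session-${++this.counter}`;
      const transport = new LatchingTransport();
      this.transports.set(id, transport);
      return { ...transport.initialize(), sessionId: id };
    }
    // Assumption: unknown ids are rejected with 404.
    return this.transports.has(sessionId)
      ? { status: 200, sessionId }
      : { status: 404, error: 'Unknown session' };
  }

  // Counterpart of a session-closed hook: drop the stored transport.
  close(sessionId: string): void {
    this.transports.delete(sessionId);
  }
}

// Shared transport: the second subprocess's `initialize` is rejected.
const shared = new LatchingTransport();
const sharedStatuses = [shared.initialize().status, shared.initialize().status];

// Per-session transports: three sequential ops each initialize cleanly.
const router = new PerSessionRouter();
const routed = [router.handle(), router.handle(), router.handle()];
const routedStatuses = routed.map((r) => r.status);
```

The point of the sketch is only that the latch lives on the transport, so giving every session its own transport keeps one op's `initialize` from poisoning the next.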
Refactor to the canonical multi-tenant pattern: one transport + one `McpServer` per session, looked up by the SDK-managed `mcp-session-id` header. New transports are minted on the first POST without a session id (must be an `initialize` request); subsequent requests route via the stored map; `onsessionclosed` cleans up. The first run of any process still works as before — this only matters once a second op spins up. Added a 3-op sequential regression test that fails on the old single-transport implementation and passes now. Co-Authored-By: Claude Opus 4.7 (1M context) * ♻️ refactor(claude-code): move AskUserQuestion onto canonical Intervention surface (LOBE-8725) Step 5's first cut shoehorned the pending form into the Render slot and drove submit/skip with a custom `pluginState.askUserQuestion.status` field, which forced three layers of glue: - `Tool/Detail` had to bypass the loading placeholder via an identifier+apiName hardcode so the form would surface during `isToolCalling` - The executor had to `messageService.getMessages → replaceMessages` after `agent_intervention_request` to drag the freshly-created tool row into in-memory state (the framework's own `tool_end → fetchAndReplaceMessages` only fires after the user answers) - The executor also had to `associateMessageWithOperation` for the tool row so the form could look up the running CC op for IPC All three were patches around skipping the canonical surface. This commit moves AskUserQuestion onto `pluginIntervention.status='pending'` and the `BuiltinToolInterventions` registry, which the framework already drives end-to-end: - `packages/builtin-tool-claude-code/src/client/Intervention/AskUserQuestion.tsx` — pure form, no IPC, no store reads. Resolves through the standard `onInteractionAction({type:'submit'|'skip'|'cancel'})` callback. - `Render/AskUserQuestion` shrinks to the answered/aborted view only; the framework hides Render while pending, so no status switching. 
- New `Inspector/AskUserQuestion` shows a compact "askUserQuestion · {header}" chip in the inline tool body, matching the rest of CC's tools. - Registries: `ClaudeCodeInspectors`, `ClaudeCodeRenders`, and the new `ClaudeCodeInterventions` all key off `ClaudeCodeApiName.AskUserQuestion`; `BuiltinToolInterventions` gains a `[ClaudeCodeIdentifier]` entry. Hetero needs a different action handler than `submitToolInteraction` (which spawns `executeClientAgent` — wrong for a CC subprocess that's already blocked on an MCP call). Two thin pieces wire that: - `submitHeteroIntervention` (chat store) — sets `pluginIntervention` via `optimisticUpdateMessagePlugin` (which already syncs DB + in-memory + parent-assistant `tools[].intervention` in one shot), then forwards the answer through `heterogeneousAgentService.submitIntervention` IPC. Operation lookup walks the tool message's `parentId` to hit the assistant's `messageOperationMap` entry — drops the explicit `associateMessageWithOperation` call from the executor. - `customInteractionHandlers.isHeteroInteractionIdentifier` flags `ClaudeCodeIdentifier`; `Tool/Detail/Intervention` short-circuits there before reaching the existing `submitToolInteraction` path. Executor change collapses to one line: `optimisticUpdateMessagePlugin(toolMsgId, { intervention: { status: 'pending' } })`. The post-intervention refresh, the associate call, and the `persistInterventionRequest` helper all go away. 
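The `parentId` lookup and the identifier gate described above can be sketched roughly as follows. The function names `isHeteroInteractionIdentifier` and the `messageOperationMap` concept come from this commit; the `ChatMessage` shape, the `resolveOperationId` helper, and the sample ids are hypothetical and only illustrate the idea, not the store's actual code.

```typescript
// Hypothetical, trimmed-down message shape for illustration only.
interface ChatMessage {
  id: string;
  parentId?: string;
  role: 'assistant' | 'tool' | 'user';
}

// Assumption: the gate is a simple set membership check on the tool identifier.
const HETERO_IDENTIFIERS: ReadonlySet<string> = new Set(['claude-code']);

function isHeteroInteractionIdentifier(identifier: string): boolean {
  return HETERO_IDENTIFIERS.has(identifier);
}

// Walk tool message -> parent assistant message -> operation id.
function resolveOperationId(
  toolMessageId: string,
  messages: ReadonlyMap<string, ChatMessage>,
  messageOperationMap: Readonly<Record<string, string>>,
): string | undefined {
  const parentId = messages.get(toolMessageId)?.parentId;
  return parentId === undefined ? undefined : messageOperationMap[parentId];
}

const messages = new Map<string, ChatMessage>([
  ['msg_assistant', { id: 'msg_assistant', role: 'assistant' }],
  ['msg_tool', { id: 'msg_tool', parentId: 'msg_assistant', role: 'tool' }],
]);
const messageOperationMap = { msg_assistant: 'op_1' };

const operationId = resolveOperationId('msg_tool', messages, messageOperationMap);
const orphanOperationId = resolveOperationId('msg_missing', messages, messageOperationMap);
```

Because only the assistant row needs an entry in the map, the tool row never has to be associated with the operation explicitly.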
Removed: - `AskUserQuestionPluginState` type (custom field is gone) - `Tool/Detail` `askUserPending` inline-render branch - Executor `messageService.getMessages + replaceMessages` round-trip - Executor `associateMessageWithOperation` for tool rows - `persistInterventionRequest` helper Verified end-to-end against a real CC subprocess on desktop: - Inline body shows the new Inspector chip; pending form lives in the bottom InterventionBar (canonical surface) - Submit ships answer through MCP, CC continues with structured result - Skip flips status to `rejected`, framework's RejectedResponse shows "User skipped"; CC receives isError and falls back to text - `mcp_servers.lobe_cc.status === 'connected'` on a 3rd sequential op (the per-session transport fix from the previous commit) - `alwaysLoad: true` still produces 1-hop calls (no ToolSearch hop) Co-Authored-By: Claude Opus 4.7 (1M context) * 💄 style(claude-code): inline numbered option cards for AskUserQuestion intervention (LOBE-8725) Select dropdown was the wrong primitive — it hides options behind an extra click and doesn't read like a question to answer. CC's underlying tool is 1-4 questions × 2-4 options, so the whole option set always fits inline. - Each option renders as a clickable card: numbered chip (1/2/3/4) + bold label + secondary description on a single row. Hover tints the background; selected state lights up `colorPrimary` on both the chip and the card outline so the pick is unmistakable at a glance. - Multi-select (`q.multiSelect`) toggles instead of replacing, with a "(multi-select)" hint in the question header. - Multi-question support gets a proper visual hierarchy: each question past the first sits below a dashed divider, headed by a `Q1/N` tag + the original `q.header` chip. The `Q*/N` lets the user track progress without counting. - Inspector picks up the question count too: now shows "askUserQuestion · {first header} +N" when multiple are queued. 
Verified end-to-end on desktop with a CC-driven 2-question prompt (4-option + 3-option). Both selections feed back to CC as a single "User answers" payload, CC echoes both picks in its continuation. Co-Authored-By: Claude Opus 4.7 (1M context) * ✨ feat(claude-code): tabbed multi-question + draft + timeout fallback for AskUserQuestion (LOBE-8725) - Multi-question forms now use a top tab strip; single question renders inline. - Picking a single-select option auto-advances to the next unanswered question. - Drafts persist to tool message `pluginState.askUserDraft` so picks survive remount / HMR; new `setInterventionDraft` action on the chat store dispatches the pluginState patch. - Timeout fallback: when the 5-min countdown expires, auto-submit option 1 for every unanswered question instead of letting the bridge time out into a cancelled isError — model gets a structured answer it can act on. - Visual: selected option now uses filled `colorPrimaryBg` + right-aligned check icon; index chip stays neutral. Co-Authored-By: Claude Opus 4.7 (1M context) * 🐛 fix(hetero-agent): synchronously unlink temp mcp.json on app quit (LOBE-8725) The async exit-handler cleanup raced Electron's main-process teardown and left `lobe-cc-mcp-.json` files in `os.tmpdir()` after every quit. Sync unlink in the quit hook is the only reliable guarantee. Also handle SIGTERM / SIGINT — `before-quit` only fires on user-driven Cmd+Q or `app.quit()`, not on external kills (test harness, OS shutdown). Verified by manual test: pending askUserQuestion forms now leave zero residue after both Cmd+Q and SIGTERM paths. Co-Authored-By: Claude Opus 4.7 (1M context) * ✨ feat(claude-code): persist structured AskUserQuestion answers + Q&A render (LOBE-8725) Submit now writes the structured `{ questionText: pickedLabel(s) }` payload to the tool message's `pluginState.askUserAnswers` (in-memory + DB merge), so Render no longer has to scrape the bridge's prose `User answers:` content. 
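A minimal sketch of how the `{ questionText: pickedLabel(s) }` payload could be assembled, including the "option 1 for every unanswered question" timeout fallback from the earlier commit. The field names `question`, `options` and `label`, the `Picks` shape, and the `buildAnswers` helper are assumptions made for illustration; only `multiSelect` and `header` are named in this patch.

```typescript
// Assumed question shape; the real types mirror CC's AskUserQuestion schema.
interface AskUserOption {
  description?: string;
  label: string;
}

interface AskUserQuestionItem {
  header: string;
  multiSelect?: boolean;
  options: AskUserOption[];
  question: string;
}

// Hypothetical draft shape: question index -> picked option indices.
type Picks = Record<number, number[]>;
type Answers = Record<string, string | string[]>;

function buildAnswers(
  questions: AskUserQuestionItem[],
  picks: Picks,
  fillUnanswered = false,
): Answers {
  const answers: Answers = {};
  questions.forEach((q, index) => {
    let picked = picks[index] ?? [];
    // Timeout fallback: default to the first option when nothing was picked.
    if (picked.length === 0 && fillUnanswered) picked = [0];
    if (picked.length === 0) return;
    const labels = picked.map((optionIndex) => q.options[optionIndex].label);
    // Multi-select keeps every label; single-select keeps just one.
    answers[q.question] = q.multiSelect ? labels : labels[0];
  });
  return answers;
}

const questions: AskUserQuestionItem[] = [
  { header: 'Lang', options: [{ label: 'Go' }, { label: 'Rust' }], question: 'Which language?' },
  {
    header: 'Checks',
    multiSelect: true,
    options: [{ label: 'Lint' }, { label: 'Tests' }],
    question: 'Which checks?',
  },
];

const submitted = buildAnswers(questions, { 0: [1] });
const timedOut = buildAnswers(questions, { 0: [1] }, true);
```

Keying by question text keeps the payload readable for the model, at the cost of assuming question texts are unique within one call.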
Render shows one Q&A block per question — header + question + a checkmark card per picked option (multi-select fans out into multiple rows). Falls back to a `—` placeholder when answers are missing (older messages or skipped flows), and keeps the existing `pluginError` warning for cancel / no-answer paths. Also surfaces the answers in the Skill state inspector tab, which was previously empty for completed askUserQuestion messages. Co-Authored-By: Claude Opus 4.7 (1M context) * ✅ test(hetero-agent): cover synchronous quit cleanup of AskUserQuestion temp configs (LOBE-8725) Locks down the regression fixed in c0de0cdb7c — async exit-handler cleanup losing to Electron's main-process teardown. Four cases: `before-quit` (Cmd+Q / `app.quit()` path), `SIGTERM` (test harness / OS shutdown), `SIGINT` (Ctrl-C), and idempotency (already-deleted temp file must not throw on the second pass). `process.on` and `process.exit` are stubbed in the signal-path tests so the controller's listener attaches to a spy, not the test runner's process — otherwise we'd leak a real SIGTERM listener every test. 
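The idempotency case can be illustrated with a small standalone sketch. It assumes Node's `fs.unlinkSync` and swallows every unlink error the way the controller's quit hook does; the helper name `unlinkAllSync` and the temp file name are hypothetical.

```typescript
import { existsSync, unlinkSync, writeFileSync } from 'node:fs';
import { tmpdir } from 'node:os';
import { join } from 'node:path';

// Hypothetical helper: synchronously remove every path, ignoring files that are already gone.
function unlinkAllSync(paths: Iterable<string>): number {
  let removed = 0;
  for (const filePath of paths) {
    try {
      unlinkSync(filePath);
      removed += 1;
    } catch {
      // Already deleted (or never written): a second pass must not throw.
    }
  }
  return removed;
}

// Illustrative temp file, not the controller's real naming scheme.
const tmpPath = join(tmpdir(), `ask-user-sketch-${process.pid}.json`);
writeFileSync(tmpPath, JSON.stringify({ mcpServers: {} }), 'utf8');

const firstPass = unlinkAllSync([tmpPath]);
const secondPass = unlinkAllSync([tmpPath]);
```

Running the helper twice mirrors the case where both `before-quit` and a signal handler fire during the same shutdown.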
Co-Authored-By: Claude Opus 4.7 (1M context) --------- Co-authored-by: Claude Opus 4.7 (1M context) --- .../local-testing/scripts/electron-dev.sh | 17 +- .../main/controllers/HeterogeneousAgentCtr.ts | 271 ++++++++++- .../__tests__/HeterogeneousAgentCtr.test.ts | 127 +++++ .../drivers/claudeCode.test.ts | 62 +++ .../heterogeneousAgent/drivers/claudeCode.ts | 5 + .../main/modules/heterogeneousAgent/types.ts | 6 + packages/agent-gateway-client/src/index.ts | 2 + packages/agent-gateway-client/src/types.ts | 50 ++ .../src/client/Inspector/AskUserQuestion.tsx | 60 +++ .../src/client/Inspector/index.ts | 2 + .../client/Intervention/AskUserQuestion.tsx | 388 +++++++++++++++ .../src/client/Intervention/index.ts | 14 + .../client/Render/AskUserQuestion/index.tsx | 131 +++++ .../src/client/Render/index.ts | 2 + .../src/client/index.ts | 1 + .../builtin-tool-claude-code/src/index.ts | 3 + .../builtin-tool-claude-code/src/types.ts | 35 ++ packages/builtin-tools/src/interventions.ts | 5 + packages/heterogeneous-agents/package.json | 5 +- .../src/adapters/claudeCode.test.ts | 47 ++ .../src/adapters/claudeCode.ts | 30 +- .../src/askUser/AskUserBridge.test.ts | 255 ++++++++++ .../src/askUser/AskUserBridge.ts | 272 +++++++++++ .../src/askUser/AskUserMcpServer.test.ts | 257 ++++++++++ .../src/askUser/AskUserMcpServer.ts | 448 ++++++++++++++++++ .../src/askUser/constants.ts | 21 + .../heterogeneous-agents/src/askUser/index.ts | 30 ++ .../Intervention/customInteractionHandlers.ts | 17 +- .../Tool/Detail/Intervention/index.tsx | 12 + .../store/slices/generation/action.ts | 154 +++++- src/server/routers/lambda/aiAgent.ts | 2 + src/services/electron/heterogeneousAgent.ts | 15 + .../aiChat/actions/conversationControl.ts | 173 +++++++ .../actions/heterogeneousAgentExecutor.ts | 43 +- 34 files changed, 2910 insertions(+), 52 deletions(-) create mode 100644 apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.test.ts create mode 100644 
packages/builtin-tool-claude-code/src/client/Inspector/AskUserQuestion.tsx create mode 100644 packages/builtin-tool-claude-code/src/client/Intervention/AskUserQuestion.tsx create mode 100644 packages/builtin-tool-claude-code/src/client/Intervention/index.ts create mode 100644 packages/builtin-tool-claude-code/src/client/Render/AskUserQuestion/index.tsx create mode 100644 packages/heterogeneous-agents/src/askUser/AskUserBridge.test.ts create mode 100644 packages/heterogeneous-agents/src/askUser/AskUserBridge.ts create mode 100644 packages/heterogeneous-agents/src/askUser/AskUserMcpServer.test.ts create mode 100644 packages/heterogeneous-agents/src/askUser/AskUserMcpServer.ts create mode 100644 packages/heterogeneous-agents/src/askUser/constants.ts create mode 100644 packages/heterogeneous-agents/src/askUser/index.ts diff --git a/.agents/skills/local-testing/scripts/electron-dev.sh b/.agents/skills/local-testing/scripts/electron-dev.sh index c40975e1f4..e69cb8ed34 100755 --- a/.agents/skills/local-testing/scripts/electron-dev.sh +++ b/.agents/skills/local-testing/scripts/electron-dev.sh @@ -76,7 +76,9 @@ find_project_pids() { port_pid=$(lsof -ti tcp:"$CDP_PORT" -sTCP:LISTEN 2>/dev/null || true) pids="$pids $port_pid" - echo "$pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' ' + # `|| true` because `grep -v '^$'` exits 1 when input has no non-empty + # lines, which (with pipefail + set -e) silently kills the caller. + echo "$pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' ' || true } # Wait for the CDP HTTP endpoint to respond, with a deadline + early bail-out @@ -146,7 +148,7 @@ do_stop() { for pid in $seed_pids; do all_pids="$all_pids $(expand_descendants "$pid")" done - all_pids=$(echo "$all_pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' ') + all_pids=$(echo "$all_pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' ' || true) if [ -z "$all_pids" ]; then echo "[electron-dev] No project Electron/vite processes found." 
@@ -270,10 +272,17 @@ do_start() { # Launch in a new session (setsid) so the whole process tree shares a PGID # we can later signal in one shot. `setsid bash -c '... exec ...' &` keeps # the bash shell as the session leader; its PID is what we save. - setsid bash -c " + # macOS doesn't ship setsid by default — fall back to plain bash; cleanup + # still works via `expand_descendants` walking the process tree. + local launch_cmd=" cd '$PROJECT_ROOT/apps/desktop' exec npx electron-vite dev -- --remote-debugging-port=$CDP_PORT - " >> "$ELECTRON_LOG" 2>&1 < /dev/null & + " + if command -v setsid >/dev/null 2>&1; then + setsid bash -c "$launch_cmd" >> "$ELECTRON_LOG" 2>&1 < /dev/null & + else + bash -c "$launch_cmd" >> "$ELECTRON_LOG" 2>&1 < /dev/null & + fi local launcher_pid=$! echo "$launcher_pid" > "$PIDFILE" echo "[electron-dev] Launcher PID (session leader): $launcher_pid" diff --git a/apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts b/apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts index b930697920..b27f851688 100644 --- a/apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts +++ b/apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts @@ -1,7 +1,9 @@ import type { ChildProcess } from 'node:child_process'; import { spawn } from 'node:child_process'; import { randomUUID } from 'node:crypto'; -import { access, appendFile, mkdir, writeFile } from 'node:fs/promises'; +import { unlinkSync } from 'node:fs'; +import { access, appendFile, mkdir, unlink, writeFile } from 'node:fs/promises'; +import os from 'node:os'; import path from 'node:path'; import type { Readable, Writable } from 'node:stream'; import { finished as streamFinished } from 'node:stream/promises'; @@ -14,6 +16,8 @@ import { CODEX_CLI_INSTALL_DOCS_URL, HeterogeneousAgentSessionErrorCode, } from '@lobechat/electron-client-ipc'; +import type { AskUserBridge } from '@lobechat/heterogeneous-agents/askUser'; +import { AskUserMcpServer } from 
'@lobechat/heterogeneous-agents/askUser'; import type { AgentContentBlock } from '@lobechat/heterogeneous-agents/spawn'; import { AgentStreamPipeline, @@ -99,6 +103,18 @@ interface CancelSessionParams { sessionId: string; } +interface SubmitInterventionParams { + cancelled?: boolean; + /** When set, signals user-cancelled or timeout — the bridge resolves with isError. */ + cancelReason?: 'timeout' | 'user_cancelled'; + /** Operation id stamped on the request the renderer is responding to. */ + operationId: string; + /** Structured user answer; ignored when `cancelled` is true. */ + result?: unknown; + /** Correlation key carried on the original `agent_intervention_request`. */ + toolCallId: string; +} + interface StopSessionParams { sessionId: string; } @@ -150,10 +166,28 @@ interface CliTraceSession { * * Lifecycle: startSession → sendPrompt → (heteroAgentEvent broadcasts) → stopSession */ +interface InterventionSlot { + bridge: AskUserBridge; + /** Resolves once bridge.events() iterator ends (after `cancelAll`). */ + pumpDone: Promise<void>; + /** Path to the per-op temp `mcp.json` we wrote for `--mcp-config`. */ + tmpConfigPath: string; +} + export default class HeterogeneousAgentCtr extends ControllerModule { static override readonly groupName = 'heterogeneousAgent'; private sessions = new Map(); + /** + * Per-operation AskUserQuestion bridge state. Keyed by `operationId` so the + * `submitIntervention` IPC can route an answer to the right pending MCP + * handler regardless of which `sessionId` it belongs to (one session can + * fire many ops over its lifetime). + */ + private opIdToIntervention = new Map(); + /** Lazy single MCP server, started on first claude-code prompt. 
*/ + private askUserMcpServer?: AskUserMcpServer; + private askUserMcpStartPromise?: Promise<AskUserMcpServer>; private resolveSessionCommand(session: AgentSession): string { const resolvedCommand = session.command.trim(); @@ -567,6 +601,92 @@ export default class HeterogeneousAgentCtr extends ControllerModule { } } + // ─── AskUserQuestion MCP server (LOBE-8725) ─── + + /** + * Lazy single-instance MCP server for CC's AskUserQuestion replacement. + * First claude-code prompt triggers `start()`; subsequent prompts reuse + * the same listener. Concurrent first-callers de-dupe via the in-flight + * promise so we don't bind two ports. + */ + private async ensureAskUserMcpServerStarted(): Promise<AskUserMcpServer> { + if (this.askUserMcpServer) return this.askUserMcpServer; + if (!this.askUserMcpStartPromise) { + this.askUserMcpStartPromise = (async () => { + const server = new AskUserMcpServer(); + await server.start(); + this.askUserMcpServer = server; + logger.info('AskUserQuestion MCP server started:', server.url); + return server; + })().catch((err) => { + // Reset so a later sendPrompt can retry; surface the error. + this.askUserMcpStartPromise = undefined; + logger.error('Failed to start AskUserQuestion MCP server:', err); + throw err; + }); + } + return this.askUserMcpStartPromise; + } + + /** + * Register a per-op AskUserQuestion bridge, write its temp `mcp.json`, + * and start pumping the bridge's outbound events into the regular + * `heteroAgentEvent` broadcast. Caller must invoke the returned cleanup + * after the spawn finishes (success, error, or cancel) to remove the + * temp file and tear down the bridge. + * + * Pump errors are logged but never thrown — they don't fail the spawn. 
+ */ + private async setupInterventionForOp( + operationId: string, + sessionId: string, + ): Promise<{ cleanup: () => Promise<void>; tmpConfigPath: string }> { + const server = await this.ensureAskUserMcpServerStarted(); + const bridge = server.registerOperation(operationId); + const tmpConfigPath = path.join(os.tmpdir(), `lobe-cc-mcp-${operationId}.json`); + + // `alwaysLoad: true` is the undocumented CC flag that promotes our + // server's tool out of the deferred set so the model calls it directly + // (no ToolSearch hop). See LOBE-8725 spike notes — falls back to the + // 2-hop ToolSearch path if a future CC drops the flag, no breakage. + const config = { + mcpServers: { + lobe_cc: { + alwaysLoad: true, + type: 'http' as const, + url: server.urlForOperation(operationId), + }, + }, + }; + await writeFile(tmpConfigPath, JSON.stringify(config), 'utf8'); + + // Pump bridge.events() into the `heteroAgentEvent` broadcast. The + // iterator only ends after `cancelAll()`, so `pumpDone` resolves at + // cleanup time and gates teardown. + const pumpDone = (async () => { + for await (const event of bridge.events()) { + this.broadcast('heteroAgentEvent', { event, sessionId }); + } + })().catch((err) => { + logger.warn('AskUserQuestion bridge pump error:', err); + }); + + this.opIdToIntervention.set(operationId, { bridge, pumpDone, tmpConfigPath }); + + const cleanup = async () => { + // Unregistering on the server cancels all bridge pendings AND closes + // the events iterator (cancelAll fires from within unregisterOperation). 
+ this.askUserMcpServer?.unregisterOperation(operationId); + await pumpDone; + this.opIdToIntervention.delete(operationId); + await unlink(tmpConfigPath).catch(() => { + /* file may already be gone if app crashed mid-prompt */ + }); + }; + + return { cleanup, tmpConfigPath }; + } + // ─── File cache ─── private get fileCacheDir(): string { @@ -697,32 +817,58 @@ export default class HeterogeneousAgentCtr extends ControllerModule { throw new Error(preflightError.message); } - const driver = getHeterogeneousAgentDriver(session.agentType); - const spawnPlan = await driver.buildSpawnPlan({ - args: session.args, - helpers: { - buildClaudeStreamJsonInput: (prompt, imageList) => - this.buildStreamJsonInput(prompt, imageList), - resolveCliImagePaths: (imageList) => this.resolveCliImagePaths(imageList), - }, - imageList: params.imageList ?? [], - prompt: params.prompt, - resumeSessionId: session.agentSessionId, - }); + // Stand up the AskUserQuestion MCP bridge for claude-code prompts BEFORE + // building the spawn plan so the driver can wire the temp config path + // into `--mcp-config`. Codex / future agents skip this entirely. + const intervention = + session.agentType === 'claude-code' + ? await this.setupInterventionForOp(params.operationId, session.sessionId).catch((err) => { + logger.warn('Failed to set up AskUserQuestion bridge — proceeding without it:', err); + return undefined; + }) + : undefined; + + let spawnPlan; + let traceSession; + let cwd: string; + try { + const driver = getHeterogeneousAgentDriver(session.agentType); + spawnPlan = await driver.buildSpawnPlan({ + args: session.args, + helpers: { + buildClaudeStreamJsonInput: (prompt, imageList) => + this.buildStreamJsonInput(prompt, imageList), + resolveCliImagePaths: (imageList) => this.resolveCliImagePaths(imageList), + }, + imageList: params.imageList ?? 
[], + mcpConfigPath: intervention?.tmpConfigPath, + prompt: params.prompt, + resumeSessionId: session.agentSessionId, + }); + + // Fall back to the user's Desktop so the process never inherits + // the Electron parent's cwd (which is `/` when launched from Finder). + cwd = session.cwd || electronApp.getPath('desktop'); + traceSession = await this.createCliTraceSession({ + cliArgs: spawnPlan.args, + cwd, + imageList: params.imageList ?? [], + session, + stdinPayload: spawnPlan.stdinPayload, + }); + } catch (err) { + // We never made it to spawn — the `proc.on('exit')` cleanup path + // won't run, so tear the intervention bridge down right here. + if (intervention) { + await intervention.cleanup().catch((cleanupErr) => { + logger.warn('AskUserQuestion cleanup error during pre-spawn failure:', cleanupErr); + }); + } + throw err; + } const useStdin = spawnPlan.stdinPayload !== undefined; const cliArgs = spawnPlan.args; - // Fall back to the user's Desktop so the process never inherits - // the Electron parent's cwd (which is `/` when launched from Finder). - const cwd = session.cwd || electronApp.getPath('desktop'); - const traceSession = await this.createCliTraceSession({ - cliArgs, - cwd, - imageList: params.imageList ?? [], - session, - stdinPayload: spawnPlan.stdinPayload, - }); - return new Promise((resolve, reject) => { logger.info('Spawning agent:', session.command, cliArgs.join(' '), `(cwd: ${cwd})`); @@ -838,6 +984,15 @@ export default class HeterogeneousAgentCtr extends ControllerModule { void stdoutDrained .then(() => stdoutBroadcastQueue) .finally(async () => { + // Tear down the AskUserQuestion bridge / temp `mcp.json` for this + // op. Pending MCP handlers get a `session_ended` cancellation so + // they return cleanly even if CC was killed mid-tool-call. 
+ if (intervention) { + await intervention.cleanup().catch((err) => { + logger.warn('AskUserQuestion cleanup error:', err); + }); + } + + void this.writeCliTraceJson(traceSession, 'exit.json', { code, finishedAt: new Date().toISOString(), @@ -972,10 +1127,54 @@ export default class HeterogeneousAgentCtr extends ControllerModule { } /** - * Cleanup on app quit. + * Renderer → main: deliver the user's answer to a pending CC AskUserQuestion + * (or signal cancellation). The matching bridge resolves its blocked + * `pending()` Promise, the local MCP handler returns to CC, and CC's + * `tool_result` flows back through the normal stream pipeline. + * + * Idempotent: late submissions for already-resolved tool calls are no-ops. + * No-op when called for an unknown opId; the bridge may have been cleaned + * up already (op finished / cancelled). + */ + @IpcMethod() + async submitIntervention(params: SubmitInterventionParams): Promise<void> { + const slot = this.opIdToIntervention.get(params.operationId); + if (!slot) { + logger.warn('submitIntervention: no active intervention for operationId', params.operationId); + return; + } + slot.bridge.resolve(params.toolCallId, { + cancelReason: params.cancelled ? (params.cancelReason ?? 'user_cancelled') : undefined, + cancelled: params.cancelled, + result: params.result, + }); + } + + /** + * Synchronously unlink every pending intervention's temp `mcp.json`. The + * async exit-handler cleanup loses to Electron's main-process teardown + * often enough that we'd leak `lobe-cc-mcp-.json` files into + * `os.tmpdir()` on real shutdowns; sync unlink here is the only reliable + * guarantee. Safe to call multiple times. + */ + private unlinkPendingInterventionConfigsSync = (): void => { + for (const [, intervention] of this.opIdToIntervention) { + try { + unlinkSync(intervention.tmpConfigPath); + } catch { + /* file may already be gone, which is fine */ + } + } + }; + + /** + * Cleanup on app quit.
`before-quit` covers the user-driven Cmd+Q / + * `app.quit()` path; SIGTERM / SIGINT cover external kills (test + * harnesses, OS shutdown) where Electron's lifecycle events never fire. */ afterAppReady() { electronApp.on('before-quit', () => { + this.unlinkPendingInterventionConfigsSync(); for (const [, session] of this.sessions) { if (session.process && !session.process.killed) { session.cancelledByUs = true; @@ -983,6 +1182,28 @@ export default class HeterogeneousAgentCtr extends ControllerModule { } } this.sessions.clear(); + // The exit handlers will tear each per-op intervention down, but if + // CC's stdio close races shutdown we'd leave the MCP server bound to + // a port. Stopping it here cancels every still-pending bridge with + // `session_ended` and closes the listener. + void this.askUserMcpServer?.stop().catch((err) => { + logger.warn('AskUserQuestion MCP server stop error:', err); + }); }); + + const onSignal = (signal: NodeJS.Signals) => { + this.unlinkPendingInterventionConfigsSync(); + // Defer to Electron's normal quit flow so the rest of the app gets a + // chance to tear down. The `before-quit` handler above is idempotent. + try { + electronApp.quit(); + } catch { + /* during late shutdown app.quit may throw — fine */ + } + // Last-resort exit if Electron is wedged and won't quit on its own. + setTimeout(() => process.exit(signal === 'SIGINT' ? 
130 : 143), 1000).unref(); + }; + process.on('SIGTERM', onSignal); + process.on('SIGINT', onSignal); } } diff --git a/apps/desktop/src/main/controllers/__tests__/HeterogeneousAgentCtr.test.ts b/apps/desktop/src/main/controllers/__tests__/HeterogeneousAgentCtr.test.ts index cf4f56da47..330dc3e483 100644 --- a/apps/desktop/src/main/controllers/__tests__/HeterogeneousAgentCtr.test.ts +++ b/apps/desktop/src/main/controllers/__tests__/HeterogeneousAgentCtr.test.ts @@ -802,4 +802,131 @@ describe('HeterogeneousAgentCtr', () => { expect(toolEnds.length).toBeGreaterThan(0); }); }); + + describe('app-quit cleanup of AskUserQuestion temp configs (LOBE-8725)', () => { + // The async exit-handler cleanup races Electron's main-process teardown + // and used to leak `lobe-cc-mcp-.json` files in `os.tmpdir()` on + // every quit. The controller now unlinks pending intervention temp + // configs *synchronously* from `before-quit` AND from process signal + // handlers (SIGTERM / SIGINT — `before-quit` doesn't fire on external + // kills). These tests exercise both paths against real files. + + /** + * Drop a temp `lobe-cc-mcp-.json` and stash it on the controller's + * `opIdToIntervention` map under the same key, so the quit hook treats + * it like a real pending intervention and tries to unlink it. 
+ */ + const seedPendingIntervention = async (ctr: HeterogeneousAgentCtr, opId: string) => { + const tmpConfigPath = path.join(tmpdir(), `lobe-cc-mcp-test-${opId}.json`); + await writeFile(tmpConfigPath, '{"mcpServers":{}}'); + const slot = { + bridge: {} as any, + pumpDone: Promise.resolve(), + tmpConfigPath, + }; + (ctr as any).opIdToIntervention.set(opId, slot); + return tmpConfigPath; + }; + + const captureRegisteredHandler = ( + registerSpy: ReturnType | ReturnType, + eventName: string, + ): (() => void) => { + const calls = (registerSpy as any).mock.calls as Array<[string, () => void]>; + const match = calls.findLast(([evt]) => evt === eventName); + if (!match) throw new Error(`no handler registered for "${eventName}"`); + return match[1]; + }; + + it('before-quit synchronously unlinks every pending intervention temp config', async () => { + const electron = (await import('electron')) as any; + electron.app.on.mockClear(); + + const ctr = new HeterogeneousAgentCtr({ + appStoragePath, + storeManager: { get: vi.fn() }, + } as any); + + const fileA = await seedPendingIntervention(ctr, 'opA'); + const fileB = await seedPendingIntervention(ctr, 'opB'); + + ctr.afterAppReady(); + const beforeQuit = captureRegisteredHandler(electron.app.on, 'before-quit'); + beforeQuit(); + + await expect(access(fileA)).rejects.toThrow(); + await expect(access(fileB)).rejects.toThrow(); + }); + + it('SIGTERM handler unlinks pending intervention temp configs (external-kill path)', async () => { + // External kills (test harness, OS shutdown) skip Electron's lifecycle + // events entirely — `before-quit` never fires, so the controller has to + // hook the raw process signal too. Stub `process.on` so the handler is + // *recorded* but never actually attached to the test runner's process + // (otherwise the test leaks a SIGTERM listener that survives the test). 
+ // Same for `process.exit` — the controller's fail-safe shouldn't get a + // chance to actually exit the worker if its `setTimeout(...).unref()` + // ever fires before mockRestore. + const electron = (await import('electron')) as any; + electron.app.on.mockClear(); + const processOnSpy = vi.spyOn(process, 'on').mockImplementation(() => process); + const processExitSpy = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never); + + const ctr = new HeterogeneousAgentCtr({ + appStoragePath, + storeManager: { get: vi.fn() }, + } as any); + const file = await seedPendingIntervention(ctr, 'opSigterm'); + + ctr.afterAppReady(); + const sigterm = captureRegisteredHandler(processOnSpy, 'SIGTERM'); + sigterm(); + + await expect(access(file)).rejects.toThrow(); + + processOnSpy.mockRestore(); + processExitSpy.mockRestore(); + }); + + it('SIGINT handler unlinks pending intervention temp configs (Ctrl-C path)', async () => { + const electron = (await import('electron')) as any; + electron.app.on.mockClear(); + const processOnSpy = vi.spyOn(process, 'on').mockImplementation(() => process); + const processExitSpy = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never); + + const ctr = new HeterogeneousAgentCtr({ + appStoragePath, + storeManager: { get: vi.fn() }, + } as any); + const file = await seedPendingIntervention(ctr, 'opSigint'); + + ctr.afterAppReady(); + const sigint = captureRegisteredHandler(processOnSpy, 'SIGINT'); + sigint(); + + await expect(access(file)).rejects.toThrow(); + + processOnSpy.mockRestore(); + processExitSpy.mockRestore(); + }); + + it('cleanup is idempotent — already-deleted files do not throw', async () => { + const electron = (await import('electron')) as any; + electron.app.on.mockClear(); + + const ctr = new HeterogeneousAgentCtr({ + appStoragePath, + storeManager: { get: vi.fn() }, + } as any); + const file = await seedPendingIntervention(ctr, 'opIdempotent'); + + // Pre-delete the file out from under the 
controller, simulating a + // partial cleanup race where the async exit handler beat us to it. + await unlink(file); + + ctr.afterAppReady(); + const beforeQuit = captureRegisteredHandler(electron.app.on, 'before-quit'); + expect(() => beforeQuit()).not.toThrow(); + }); + }); }); diff --git a/apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.test.ts b/apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.test.ts new file mode 100644 index 0000000000..3cb3e1ec75 --- /dev/null +++ b/apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.test.ts @@ -0,0 +1,62 @@ +import { describe, expect, it } from 'vitest'; + +import type { + HeterogeneousAgentBuildPlanHelpers, + HeterogeneousAgentBuildPlanParams, +} from '../types'; +import { claudeCodeDriver } from './claudeCode'; + +const stubHelpers: HeterogeneousAgentBuildPlanHelpers = { + buildClaudeStreamJsonInput: async () => '{"type":"user","message":{}}\n', + resolveCliImagePaths: async () => [], +}; + +const buildParams = ( + overrides: Partial<HeterogeneousAgentBuildPlanParams> = {}, +): HeterogeneousAgentBuildPlanParams => ({ + args: [], + helpers: stubHelpers, + imageList: [], + prompt: 'hi', + ...overrides, +}); + +describe('claudeCodeDriver', () => { + it('omits --mcp-config when mcpConfigPath is undefined', async () => { + const { args } = await claudeCodeDriver.buildSpawnPlan(buildParams()); + expect(args).not.toContain('--mcp-config'); + }); + + it('appends --mcp-config when mcpConfigPath is provided', async () => { + const { args } = await claudeCodeDriver.buildSpawnPlan( + buildParams({ mcpConfigPath: '/tmp/lobe-cc-mcp-op-1.json' }), + ); + const idx = args.indexOf('--mcp-config'); + expect(idx).toBeGreaterThan(-1); + expect(args[idx + 1]).toBe('/tmp/lobe-cc-mcp-op-1.json'); + }); + + it('still pins --disallowedTools AskUserQuestion alongside --mcp-config', async () => { + // Even with our local MCP replacement available, CC's built-in stays + // disabled: leaving both visible would let the model
double-register + // the same name and pick the broken one. + const { args } = await claudeCodeDriver.buildSpawnPlan( + buildParams({ mcpConfigPath: '/tmp/x.json' }), + ); + const disallowedIdx = args.indexOf('--disallowedTools'); + expect(disallowedIdx).toBeGreaterThan(-1); + expect(args[disallowedIdx + 1]).toBe('AskUserQuestion'); + }); + + it('--mcp-config goes before --resume so user --args can still override the resume id', async () => { + const { args } = await claudeCodeDriver.buildSpawnPlan( + buildParams({ mcpConfigPath: '/tmp/x.json', resumeSessionId: 'cc-prev-1' }), + ); + const mcpIdx = args.indexOf('--mcp-config'); + const resumeIdx = args.indexOf('--resume'); + expect(mcpIdx).toBeGreaterThan(-1); + expect(resumeIdx).toBeGreaterThan(-1); + expect(mcpIdx).toBeLessThan(resumeIdx); + expect(args[resumeIdx + 1]).toBe('cc-prev-1'); + }); +}); diff --git a/apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.ts b/apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.ts index a0a1705859..6eb2ee8618 100644 --- a/apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.ts +++ b/apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.ts @@ -18,6 +18,7 @@ export const claudeCodeDriver: HeterogeneousAgentDriver = { args, helpers, imageList, + mcpConfigPath, prompt, resumeSessionId, }: HeterogeneousAgentBuildPlanParams) { @@ -26,6 +27,10 @@ export const claudeCodeDriver: HeterogeneousAgentDriver = { return { args: [ ...DESKTOP_CLAUDE_CODE_ARGS, + // Wire the controller-managed temp mcp.json (AskUserQuestion server, + // see LOBE-8725) when present. Path-based config is required — CC + // does not accept inline JSON for `--mcp-config`. + ...(mcpConfigPath ? ['--mcp-config', mcpConfigPath] : []), ...(resumeSessionId ? 
['--resume', resumeSessionId] : []), ...args, ], diff --git a/apps/desktop/src/main/modules/heterogeneousAgent/types.ts b/apps/desktop/src/main/modules/heterogeneousAgent/types.ts index 49bcdfe660..89803793f7 100644 --- a/apps/desktop/src/main/modules/heterogeneousAgent/types.ts +++ b/apps/desktop/src/main/modules/heterogeneousAgent/types.ts @@ -20,6 +20,12 @@ export interface HeterogeneousAgentBuildPlanParams { args: string[]; helpers: HeterogeneousAgentBuildPlanHelpers; imageList: HeterogeneousAgentImageAttachment[]; + /** + * Optional path to an MCP config JSON written by the controller (e.g. for + * the local `lobe_cc` AskUserQuestion server). Drivers that recognize the + * field append `--mcp-config `; others ignore it. + */ + mcpConfigPath?: string; prompt: string; resumeSessionId?: string; } diff --git a/packages/agent-gateway-client/src/index.ts b/packages/agent-gateway-client/src/index.ts index 9467593c06..18314d6aed 100644 --- a/packages/agent-gateway-client/src/index.ts +++ b/packages/agent-gateway-client/src/index.ts @@ -1,5 +1,7 @@ export { AgentStreamClient } from './client'; export type { + AgentInterventionRequestData, + AgentInterventionResponseData, AgentStreamClientEvents, AgentStreamClientOptions, AgentStreamEvent, diff --git a/packages/agent-gateway-client/src/types.ts b/packages/agent-gateway-client/src/types.ts index 408783beff..187f402e04 100644 --- a/packages/agent-gateway-client/src/types.ts +++ b/packages/agent-gateway-client/src/types.ts @@ -16,6 +16,23 @@ export type AgentStreamEventType = * wire union so consumers can pattern-match without casting. */ | 'tool_result' + /** + * Producer needs structured input from the user mid-run (e.g. CC's + * AskUserQuestion delivered via a local MCP server). Distinct from + * `tool_execute` — that one means "client, please run this tool"; this + * one means "user, please answer these questions". 
Renderer surfaces a + * dedicated UI; the producer's MCP handler stays pending until the + * paired `agent_intervention_response` resolves it (or the deadline + * passes / the op is cancelled). + */ + | 'agent_intervention_request' + /** + * The user's answer to a prior `agent_intervention_request`. Flows back + * to the producer (Electron main → MCP handler resolve, sandbox → + * server bus → CLI). Carries either a structured `result` or a + * cancellation marker. + */ + | 'agent_intervention_response' | 'step_start' | 'step_complete' | 'error'; @@ -78,6 +95,39 @@ export interface StepCompleteData { reasonDetail?: string; } +/** + * Producer → consumer: structured-input request the user must answer + * directly (no tool execution involved). The producer's tool handler stays + * blocked until a matching `agent_intervention_response` (correlated by + * `toolCallId`) flows back, or the `deadline` is reached. + */ +export interface AgentInterventionRequestData { + /** Tool API name (e.g. `'askUserQuestion'`). */ + apiName: string; + /** JSON-encoded payload the UI renders (e.g. `{ questions: [...] }`). */ + arguments: string; + /** Unix-ms wall-clock at which the producer will give up waiting. */ + deadline: number; + /** Tool plugin identifier (e.g. `'claude-code'`). */ + identifier: string; + /** Correlation key. Stable for the lifetime of the intervention. */ + toolCallId: string; +} + +/** + * Consumer → producer: the user's answer to a prior intervention request. + * Either `result` (success) or `cancelled: true` (timeout / user cancel). + */ +export interface AgentInterventionResponseData { + /** Set when the user cancelled or the deadline elapsed. */ + cancelled?: boolean; + /** When `cancelled`, optional reason for telemetry/logging. */ + cancelReason?: 'timeout' | 'user_cancelled' | 'session_ended'; + /** User-supplied answer (JSON-serializable). Absent when cancelled. 
*/ + result?: unknown; + toolCallId: string; +} + /** * Server → Client: request the client to execute a tool locally and return the result. */ diff --git a/packages/builtin-tool-claude-code/src/client/Inspector/AskUserQuestion.tsx b/packages/builtin-tool-claude-code/src/client/Inspector/AskUserQuestion.tsx new file mode 100644 index 0000000000..74074737db --- /dev/null +++ b/packages/builtin-tool-claude-code/src/client/Inspector/AskUserQuestion.tsx @@ -0,0 +1,60 @@ +'use client'; + +import { inspectorTextStyles, shinyTextStyles } from '@lobechat/shared-tool-ui/styles'; +import type { BuiltinInspectorProps } from '@lobechat/types'; +import { createStaticStyles, cx } from 'antd-style'; +import { memo } from 'react'; +import { useTranslation } from 'react-i18next'; + +import { type AskUserQuestionArgs, ClaudeCodeApiName } from '../../types'; + +const styles = createStaticStyles(({ css, cssVar }) => ({ + chip: css` + overflow: hidden; + + min-width: 0; + margin-inline-start: 6px; + padding-block: 2px; + padding-inline: 10px; + border-radius: 999px; + + font-size: 12px; + color: ${cssVar.colorText}; + text-overflow: ellipsis; + white-space: nowrap; + + background: ${cssVar.colorFillTertiary}; + `, +})); + +export const AskUserQuestionInspector = memo>( + ({ args, partialArgs, isArgumentsStreaming, isLoading }) => { + const { t } = useTranslation('plugin'); + const label = t(ClaudeCodeApiName.AskUserQuestion as any); + const questions = args?.questions ?? partialArgs?.questions ?? []; + const summary = + questions.length === 0 + ? undefined + : questions.length === 1 + ? questions[0]?.header || questions[0]?.question + : `${questions[0]?.header || questions[0]?.question} +${questions.length - 1}`; + + if (isArgumentsStreaming && !summary) { + return
{label}
; + } + + return ( +
+ {label} + {summary && {summary}} +
+ ); + }, +); + +AskUserQuestionInspector.displayName = 'ClaudeCodeAskUserQuestionInspector'; diff --git a/packages/builtin-tool-claude-code/src/client/Inspector/index.ts b/packages/builtin-tool-claude-code/src/client/Inspector/index.ts index ad39c86189..12a1766ba8 100644 --- a/packages/builtin-tool-claude-code/src/client/Inspector/index.ts +++ b/packages/builtin-tool-claude-code/src/client/Inspector/index.ts @@ -8,6 +8,7 @@ import { import { ClaudeCodeApiName } from '../../types'; import { AgentInspector } from './Agent'; +import { AskUserQuestionInspector } from './AskUserQuestion'; import { EditInspector } from './Edit'; import { ReadInspector } from './Read'; import { ScheduleWakeupInspector } from './ScheduleWakeup'; @@ -28,6 +29,7 @@ import { WriteInspector } from './Write'; // state for diff stats), so they live in their own sibling files. export const ClaudeCodeInspectors = { [ClaudeCodeApiName.Agent]: AgentInspector, + [ClaudeCodeApiName.AskUserQuestion]: AskUserQuestionInspector, [ClaudeCodeApiName.Bash]: createRunCommandInspector(ClaudeCodeApiName.Bash), [ClaudeCodeApiName.Edit]: EditInspector, [ClaudeCodeApiName.Glob]: createGlobLocalFilesInspector(ClaudeCodeApiName.Glob), diff --git a/packages/builtin-tool-claude-code/src/client/Intervention/AskUserQuestion.tsx b/packages/builtin-tool-claude-code/src/client/Intervention/AskUserQuestion.tsx new file mode 100644 index 0000000000..2232176487 --- /dev/null +++ b/packages/builtin-tool-claude-code/src/client/Intervention/AskUserQuestion.tsx @@ -0,0 +1,388 @@ +'use client'; + +import type { BuiltinInterventionProps } from '@lobechat/types'; +import { Button, Flexbox, Icon, Tabs, Text } from '@lobehub/ui'; +import { createStaticStyles, cx } from 'antd-style'; +import { Check, Send, X } from 'lucide-react'; +import { memo, useCallback, useEffect, useMemo, useState } from 'react'; + +import { useConversationStore } from '@/features/Conversation/store'; +import { dataSelectors } from 
'@/features/Conversation/store/slices/data/selectors'; +import { useChatStore } from '@/store/chat'; + +import type { AskUserQuestionArgs, AskUserQuestionItem } from '../../types'; + +/** + * Server-side bridge timeout (matches `AskUserMcpServer.pendingTimeoutMs`). + * Not strictly synchronized — server is authoritative — but keeps the on-screen + * countdown close to reality without plumbing a deadline through every layer. + */ +const COUNTDOWN_MS = 5 * 60 * 1000; + +/** Key under tool message `pluginState` where in-progress draft answers live. */ +const DRAFT_PLUGIN_STATE_KEY = 'askUserDraft'; + +const formatRemaining = (msLeft: number): string => { + const totalSec = Math.max(0, Math.floor(msLeft / 1000)); + const min = Math.floor(totalSec / 60); + const sec = totalSec % 60; + return `${min}:${sec.toString().padStart(2, '0')}`; +}; + +const styles = createStaticStyles(({ css, cssVar }) => ({ + // Card sits inline with the chat — no surrounding panel chrome. Hover + // tints the row so the stack reads as clickable; selection swaps to a + // filled `colorPrimaryBg` so the pick is visually weighty. + option: css` + cursor: pointer; + + padding-block: 10px; + padding-inline: 12px; + border-radius: 8px; + + transition: background 0.12s ease; + + &:hover { + background: ${cssVar.colorFillQuaternary}; + } + `, + optionCheck: css` + flex-shrink: 0; + color: ${cssVar.colorPrimary}; + `, + optionDescription: css` + font-size: 12px; + line-height: 1.45; + color: ${cssVar.colorTextSecondary}; + `, + // Neutral 1/2/3/4 chip — stays the same colour whether selected or not so + // the selection signal lives on the filled background + checkmark. 
+ optionIndex: css` + flex-shrink: 0; + + box-sizing: border-box; + width: 22px; + height: 22px; + border-radius: 6px; + + font-family: ${cssVar.fontFamilyCode}; + font-size: 12px; + font-weight: 600; + line-height: 22px; + color: ${cssVar.colorTextSecondary}; + text-align: center; + + background: ${cssVar.colorFillTertiary}; + `, + optionLabel: css` + font-weight: 500; + `, + optionSelected: css` + background: ${cssVar.colorPrimaryBg}; + + &:hover { + background: ${cssVar.colorPrimaryBgHover}; + } + `, +})); + +interface OptionCardProps { + description?: string; + disabled?: boolean; + index: number; + label: string; + onToggle: () => void; + selected: boolean; +} + +/** + * One numbered option in a question. Outlined when picked, neutral otherwise; + * a right-side checkmark seals the selection so the state reads cleanly even + * with the number chip kept neutral. + */ +const OptionCard = memo( + ({ index, label, description, selected, disabled, onToggle }) => ( + { + if (!disabled) onToggle(); + }} + > + {index} + + {label} + {description && {description}} + + {selected && } + + ), +); + +OptionCard.displayName = 'CCAskUserQuestionOption'; + +interface QuestionPanelProps { + answer: string | string[] | undefined; + disabled: boolean; + onToggle: (q: AskUserQuestionItem, label: string) => void; + question: AskUserQuestionItem; +} + +const QuestionPanel = memo(({ question, answer, disabled, onToggle }) => { + const isOptionSelected = (label: string): boolean => + question.multiSelect ? Array.isArray(answer) && answer.includes(label) : answer === label; + + return ( + + + {question.header && {question.header}} + {question.multiSelect && ( + + (multi-select) + + )} + + {question.question} + + + {question.options.map((opt, optIdx) => ( + onToggle(question, opt.label)} + /> + ))} + + + ); +}); + +QuestionPanel.displayName = 'CCAskUserQuestionPanel'; + +/** + * CC AskUserQuestion intervention component. 
+ * + * Pure form — `onInteractionAction` ({type:'submit'|'skip'}) is the only + * outbound side effect. The framework's `handleInteractionAction` (or the + * hetero branch the chat conversation wires up) is responsible for marking + * `pluginIntervention.status` and forwarding the answer to CC over IPC. + * + * Layout + * - One question → renders the question + options directly, no tab strip. + * - Multiple questions → top tab bar (Q1, Q2, …), one panel visible at a + * time. Picking an answer auto-advances to the next unanswered question + * so the user sweeps through without re-clicking the tabs. + * + * Draft persistence + * - Per-message state lives on the tool message's `pluginState.askUserDraft` + * (see `setInterventionDraft` in the chat store). HMR reloads, store + * re-mounts, and tab switches all keep the partial answers around — only + * a fresh `tool_use` (different toolCallId / messageId) starts blank. + */ +const AskUserQuestionIntervention = memo>( + ({ args, messageId, onInteractionAction }) => { + const questions = args?.questions ?? []; + + // Persisted draft (survives unmount / HMR / refresh) — read from the tool + // message's pluginState so the form stays where the user left it. + const persistedDraft = useConversationStore((s) => { + const msg = dataSelectors.getDbMessageById(messageId)(s); + return ( + msg?.pluginState as { [DRAFT_PLUGIN_STATE_KEY]?: Record } + )?.[DRAFT_PLUGIN_STATE_KEY]; + }); + const setInterventionDraft = useChatStore((s) => s.setInterventionDraft); + + const [answers, setAnswers] = useState>( + () => persistedDraft ?? {}, + ); + const [submitting, setSubmitting] = useState(false); + const [activeTab, setActiveTab] = useState(() => { + // Resume on the first unanswered question so coming back lands the user + // where they left off rather than always at Q1. + const initial = persistedDraft ?? {}; + const firstUnanswered = questions.findIndex((q) => { + const a = initial[q.question]; + return q.multiSelect ? 
!Array.isArray(a) || a.length === 0 : !a; + }); + const idx = firstUnanswered >= 0 ? firstUnanswered : 0; + return String(idx); + }); + + // Mounted-time deadline; server has its own clock and will return + // isError if it expires first. Drift of a few seconds is fine. + const deadline = useMemo(() => Date.now() + COUNTDOWN_MS, []); + const [now, setNow] = useState(() => Date.now()); + useEffect(() => { + const id = setInterval(() => setNow(Date.now()), 1000); + return () => clearInterval(id); + }, []); + const expired = now >= deadline; + + const handleToggle = useCallback( + (q: AskUserQuestionItem, label: string) => { + setAnswers((prev) => { + let next: Record; + if (q.multiSelect) { + const current = (prev[q.question] as string[] | undefined) ?? []; + const updated = current.includes(label) + ? current.filter((x) => x !== label) + : [...current, label]; + next = { ...prev, [q.question]: updated }; + } else { + next = { ...prev, [q.question]: label }; + } + // Persist to pluginState so the picks survive remount / refresh. + setInterventionDraft(messageId, next); + + // Single-select auto-advance: if there's a next unanswered question, + // jump to it. Multi-select stays on the same panel so the user can + // toggle additional options. + if (!q.multiSelect && questions.length > 1) { + const nextUnanswered = questions.findIndex((qq, idx) => { + if (qq.question === q.question) return false; + const a = next[qq.question]; + if (idx < 0) return false; + return qq.multiSelect ? !Array.isArray(a) || a.length === 0 : !a; + }); + if (nextUnanswered >= 0) setActiveTab(String(nextUnanswered)); + } + return next; + }); + }, + [messageId, questions, setInterventionDraft], + ); + + /** + * Submit `payload` exactly as given. Used by both the explicit "Submit" + * button (with whatever the user picked) and the timeout fallback (with + * option 1 of each unanswered question merged in). 
+ */ + const submitWith = useCallback( + async (payload: Record) => { + if (!onInteractionAction || submitting) return; + setSubmitting(true); + try { + await onInteractionAction({ payload, type: 'submit' }); + } catch (err) { + console.error('[AskUserQuestion] submit failed:', err); + setSubmitting(false); + } + }, + [onInteractionAction, submitting], + ); + + const handleSubmit = useCallback(() => submitWith(answers), [answers, submitWith]); + + const handleSkip = useCallback(async () => { + if (!onInteractionAction || submitting) return; + setSubmitting(true); + try { + await onInteractionAction({ type: 'skip' }); + } catch (err) { + console.error('[AskUserQuestion] skip failed:', err); + setSubmitting(false); + } + }, [onInteractionAction, submitting]); + + const allAnswered = useMemo( + () => + questions.every((q) => { + const a = answers[q.question]; + return q.multiSelect ? Array.isArray(a) && a.length > 0 : !!a; + }), + [answers, questions], + ); + + // Timeout fallback: when the countdown hits zero and the user hasn't + // submitted, fill in option 1 of each unanswered question and submit. + // Beats letting the server-side bridge time out into a `cancelled` + // result — the model gets a structured answer it can act on instead of + // a "user didn't respond" isError. Single-shot via `submitting` guard. + useEffect(() => { + if (!expired || submitting || questions.length === 0) return; + const fallback: Record = { ...answers }; + for (const q of questions) { + const a = fallback[q.question]; + const unanswered = q.multiSelect ? !Array.isArray(a) || a.length === 0 : !a; + if (unanswered && q.options.length > 0) { + const first = q.options[0].label; + fallback[q.question] = q.multiSelect ? [first] : first; + } + } + void submitWith(fallback); + }, [expired, submitting, questions, answers, submitWith]); + + const isMulti = questions.length > 1; + const activeQuestion = questions[Number(activeTab)] ?? 
questions[0]; + + return ( + + {isMulti && ( + { + const a = answers[q.question]; + const done = q.multiSelect ? Array.isArray(a) && a.length > 0 : !!a; + return { + key: String(idx), + label: ( + + Q{idx + 1} + {done && } + + ), + }; + })} + onChange={(key) => setActiveTab(key as string)} + /> + )} + + {activeQuestion && ( + + )} + + + + {expired + ? 'Time expired — using option 1 of each question.' + : `Time remaining: ${formatRemaining(deadline - now)} · ` + + 'unanswered questions default to option 1 on timeout.'} + + + + + + + + ); + }, +); + +AskUserQuestionIntervention.displayName = 'CCAskUserQuestionIntervention'; + +export default AskUserQuestionIntervention; diff --git a/packages/builtin-tool-claude-code/src/client/Intervention/index.ts b/packages/builtin-tool-claude-code/src/client/Intervention/index.ts new file mode 100644 index 0000000000..23c64e3b89 --- /dev/null +++ b/packages/builtin-tool-claude-code/src/client/Intervention/index.ts @@ -0,0 +1,14 @@ +import type { BuiltinIntervention } from '@lobechat/types'; + +import { ClaudeCodeApiName } from '../../types'; +import AskUserQuestionIntervention from './AskUserQuestion'; + +/** + * Claude Code Intervention components. + * + * Currently only `askUserQuestion` (CC's clarifying-question MCP tool) needs + * one. Approval / file-picker etc. would slot in alongside. 
+ */ +export const ClaudeCodeInterventions: Record = { + [ClaudeCodeApiName.AskUserQuestion]: AskUserQuestionIntervention as BuiltinIntervention, +}; diff --git a/packages/builtin-tool-claude-code/src/client/Render/AskUserQuestion/index.tsx b/packages/builtin-tool-claude-code/src/client/Render/AskUserQuestion/index.tsx new file mode 100644 index 0000000000..1234d86c73 --- /dev/null +++ b/packages/builtin-tool-claude-code/src/client/Render/AskUserQuestion/index.tsx @@ -0,0 +1,131 @@ +'use client'; + +import type { BuiltinRenderProps } from '@lobechat/types'; +import { Flexbox, Icon, Text } from '@lobehub/ui'; +import { createStaticStyles, cx } from 'antd-style'; +import { Check } from 'lucide-react'; +import { memo } from 'react'; + +import type { AskUserQuestionArgs, AskUserQuestionItem } from '../../../types'; + +/** Persisted draft + answer shape stored on `pluginState`. */ +interface AskUserQuestionState { + askUserAnswers?: Record; + askUserDraft?: Record; +} + +const styles = createStaticStyles(({ css, cssVar }) => ({ + answer: css` + color: ${cssVar.colorText}; + `, + answerRow: css` + padding-block: 6px; + padding-inline: 10px; + border-radius: 6px; + background: ${cssVar.colorBgContainer}; + `, + check: css` + flex-shrink: 0; + color: ${cssVar.colorPrimary}; + `, + container: css` + padding: 12px; + border-radius: ${cssVar.borderRadiusLG}; + background: ${cssVar.colorFillQuaternary}; + `, + header: css` + font-size: 12px; + color: ${cssVar.colorTextSecondary}; + `, + question: css` + font-weight: 500; + `, +})); + +interface QABlockProps { + answer?: string | string[]; + question: AskUserQuestionItem; +} + +/** + * One question/answer pair for the completed Render. The original question + * stays visible (header + body); the answer renders as one card per picked + * option (multi-select fans out into multiple rows). 
When `answer` is + * absent — older messages persisted before LOBE-8725 added structured + * storage — we show a `—` placeholder so the layout stays uniform. + */ +const QABlock = memo(({ question, answer }) => { + const labels: string[] = Array.isArray(answer) ? answer : answer ? [answer] : []; + const optionByLabel = new Map(question.options.map((o) => [o.label, o])); + + return ( + + {question.header && {question.header}} + {question.question} + {labels.length > 0 ? ( + + {labels.map((label) => { + const opt = optionByLabel.get(label); + return ( + + + + {label} + {opt?.description && opt.description !== label && ( + {opt.description} + )} + + + ); + })} + + ) : ( + + )} + + ); +}); + +QABlock.displayName = 'CCAskUserQuestionQABlock'; + +/** + * CC `askUserQuestion` Render — answered / aborted state only. + * + * The pending form lives on the canonical Intervention surface + * (`BuiltinToolInterventions['claude-code']['askUserQuestion']`) — the + * framework hides this Render while `pluginIntervention.status === 'pending'`, + * then yields to it once the user submits / skips and a `tool_result` arrives. + * + * Structured rendering reads `pluginState.askUserAnswers`, written by + * `setInterventionAnswers` in `conversationControl` at submit time. If the + * key is missing (older messages, or skipped/cancelled flows where there's + * nothing to show), we fall back to the question list with a status hint. + */ +const AskUserQuestion = memo< + BuiltinRenderProps +>(({ args, pluginError, pluginState }) => { + const questions = args?.questions ?? []; + const answers = pluginState?.askUserAnswers; + const isError = !!pluginError; + + return ( + + {questions.map((q, idx) => ( + + ))} + {isError && ( + (No answer received — model continued without their input.) 
+ )} + + ); +}); + +AskUserQuestion.displayName = 'CCAskUserQuestion'; + +export default AskUserQuestion; diff --git a/packages/builtin-tool-claude-code/src/client/Render/index.ts b/packages/builtin-tool-claude-code/src/client/Render/index.ts index aa343d71ab..3781fb04e1 100644 --- a/packages/builtin-tool-claude-code/src/client/Render/index.ts +++ b/packages/builtin-tool-claude-code/src/client/Render/index.ts @@ -3,6 +3,7 @@ import type { RenderDisplayControl } from '@lobechat/types'; import { ClaudeCodeApiName } from '../../types'; import Agent from './Agent'; +import AskUserQuestion from './AskUserQuestion'; import Edit from './Edit'; import Glob from './Glob'; import Grep from './Grep'; @@ -19,6 +20,7 @@ import Write from './Write'; */ export const ClaudeCodeRenders = { [ClaudeCodeApiName.Agent]: Agent, + [ClaudeCodeApiName.AskUserQuestion]: AskUserQuestion, // RunCommand already renders `args.command` + combined output the way CC emits — // use the shared component directly instead of wrapping it in a re-export file. 
[ClaudeCodeApiName.Bash]: RunCommandRender, diff --git a/packages/builtin-tool-claude-code/src/client/index.ts b/packages/builtin-tool-claude-code/src/client/index.ts index 76bc5678b3..8b7566ff43 100644 --- a/packages/builtin-tool-claude-code/src/client/index.ts +++ b/packages/builtin-tool-claude-code/src/client/index.ts @@ -1,5 +1,6 @@ export { ClaudeCodeApiName, ClaudeCodeIdentifier } from '../types'; export { ClaudeCodeInspectors } from './Inspector'; +export { ClaudeCodeInterventions } from './Intervention'; export { ClaudeCodeRenderDisplayControls, ClaudeCodeRenders } from './Render'; export { ClaudeCodeStreamings } from './Streaming'; export { CC_SUBAGENT_TYPES, type CCSubagentTypeInfo, resolveCCSubagentType } from './subagentTypes'; diff --git a/packages/builtin-tool-claude-code/src/index.ts b/packages/builtin-tool-claude-code/src/index.ts index 6c44fc419a..561026f32d 100644 --- a/packages/builtin-tool-claude-code/src/index.ts +++ b/packages/builtin-tool-claude-code/src/index.ts @@ -1,5 +1,8 @@ export { type AgentArgs, + type AskUserQuestionArgs, + type AskUserQuestionItem, + type AskUserQuestionOption, ClaudeCodeApiName, ClaudeCodeIdentifier, type ClaudeCodeTodoItem, diff --git a/packages/builtin-tool-claude-code/src/types.ts b/packages/builtin-tool-claude-code/src/types.ts index 3ab2fdd7e4..4c34660690 100644 --- a/packages/builtin-tool-claude-code/src/types.ts +++ b/packages/builtin-tool-claude-code/src/types.ts @@ -23,6 +23,13 @@ export enum ClaudeCodeApiName { * different concept. */ Agent = 'Agent', + /** + * Synthetic apiName the adapter rewrites the local + * `mcp__lobe_cc__ask_user_question` MCP tool to. Routes the dedicated + * intervention UI for CC's clarifying-question flow (LOBE-8725); not + * something CC's CLI emits directly. 
+ */ + AskUserQuestion = 'askUserQuestion', Bash = 'Bash', Edit = 'Edit', Glob = 'Glob', @@ -117,3 +124,31 @@ export interface TaskStopArgs { shell_id?: string; task_id?: string; } + +/** + * One option on an AskUserQuestion question — `label` is what the user picks, + * `description` is the supporting text shown alongside. + */ +export interface AskUserQuestionOption { + description: string; + label: string; +} + +/** + * One question in an `AskUserQuestion` invocation — header is short (≤12 + * chars per CC's contract), `options` is 2-4 entries, `multiSelect` is opt-in. + */ +export interface AskUserQuestionItem { + header: string; + multiSelect?: boolean; + options: AskUserQuestionOption[]; + question: string; +} + +/** + * `AskUserQuestion` tool arguments — mirrors CC's own schema verbatim so the + * model's existing prompts work unchanged. 1-4 questions per call. + */ +export interface AskUserQuestionArgs { + questions: AskUserQuestionItem[]; +} diff --git a/packages/builtin-tools/src/interventions.ts b/packages/builtin-tools/src/interventions.ts index 1ccdd711cf..6d96d793d9 100644 --- a/packages/builtin-tools/src/interventions.ts +++ b/packages/builtin-tools/src/interventions.ts @@ -2,6 +2,10 @@ import { AgentBuilderInterventions, AgentBuilderManifest, } from '@lobechat/builtin-tool-agent-builder/client'; +import { + ClaudeCodeIdentifier, + ClaudeCodeInterventions, +} from '@lobechat/builtin-tool-claude-code/client'; import { CloudSandboxManifest } from '@lobechat/builtin-tool-cloud-sandbox'; import { CloudSandboxInterventions } from '@lobechat/builtin-tool-cloud-sandbox/client'; import { @@ -37,6 +41,7 @@ import { type BuiltinIntervention } from '@lobechat/types'; export const BuiltinToolInterventions: Record> = { [AgentBuilderManifest.identifier]: AgentBuilderInterventions, [AgentMarketplaceManifest.identifier]: AgentMarketplaceInterventions, + [ClaudeCodeIdentifier]: ClaudeCodeInterventions, [CloudSandboxManifest.identifier]: CloudSandboxInterventions, 
[GroupManagementManifest.identifier]: GroupManagementInterventions, [GTDManifest.identifier]: GTDInterventions, diff --git a/packages/heterogeneous-agents/package.json b/packages/heterogeneous-agents/package.json index 589dd85835..4a9f6a65b9 100644 --- a/packages/heterogeneous-agents/package.json +++ b/packages/heterogeneous-agents/package.json @@ -4,6 +4,7 @@ "private": true, "exports": { ".": "./src/index.ts", + "./askUser": "./src/askUser/index.ts", "./client": "./src/client/index.ts", "./spawn": "./src/spawn/index.ts" }, @@ -15,7 +16,9 @@ "dependencies": { "@lobechat/agent-gateway-client": "workspace:*", "@lobehub/icons": "^5.4.0", - "diff": "^8.0.4" + "@modelcontextprotocol/sdk": "^1.26.0", + "diff": "^8.0.4", + "zod": "^3.25.76" }, "devDependencies": { "@lobechat/types": "workspace:*" diff --git a/packages/heterogeneous-agents/src/adapters/claudeCode.test.ts b/packages/heterogeneous-agents/src/adapters/claudeCode.test.ts index b2475e5b4a..b81141c803 100644 --- a/packages/heterogeneous-agents/src/adapters/claudeCode.test.ts +++ b/packages/heterogeneous-agents/src/adapters/claudeCode.test.ts @@ -158,6 +158,53 @@ describe('ClaudeCodeAdapter', () => { const toolStart = events.find((e) => e.type === 'tool_start'); expect(toolStart).toBeDefined(); }); + + it('rewrites mcp__lobe_cc__ask_user_question to apiName=askUserQuestion', () => { + const adapter = new ClaudeCodeAdapter(); + adapter.adapt({ subtype: 'init', type: 'system' }); + + const askInput = { + questions: [ + { + header: 'Color', + options: [ + { description: 'Red', label: 'Red' }, + { description: 'Blue', label: 'Blue' }, + ], + question: 'Pick a color?', + }, + ], + }; + + const events = adapter.adapt({ + message: { + id: 'msg_1', + content: [ + { + id: 'tu_aq_1', + input: askInput, + name: 'mcp__lobe_cc__ask_user_question', + type: 'tool_use', + }, + ], + }, + type: 'assistant', + }); + + const chunk = events.find( + (e) => e.type === 'stream_chunk' && e.data.chunkType === 'tools_calling', + ); + 
expect(chunk!.data.toolsCalling).toEqual([ + { + // Wire-prefixed name is rewritten to the stable domain key. + apiName: 'askUserQuestion', + arguments: JSON.stringify(askInput), + id: 'tu_aq_1', + identifier: 'claude-code', + type: 'default', + }, + ]); + }); }); describe('tool_result in user events', () => { diff --git a/packages/heterogeneous-agents/src/adapters/claudeCode.ts b/packages/heterogeneous-agents/src/adapters/claudeCode.ts index 2f9aae7b81..4437436d4f 100644 --- a/packages/heterogeneous-agents/src/adapters/claudeCode.ts +++ b/packages/heterogeneous-agents/src/adapters/claudeCode.ts @@ -61,6 +61,22 @@ import type { */ const CC_TODO_WRITE_TOOL_NAME = 'TodoWrite'; +/** + * Tool name CC sees for the LobeHub-hosted MCP `ask_user_question` server. + * Source of truth lives in `../askUser/constants.ts`; replicated here as a + * literal so the adapter compiles in browser bundles without dragging in + * any of the askUser package's runtime (node:http, MCP SDK, etc.) by + * accident. Keep in sync. + */ +const ASK_USER_MCP_TOOL_NAME = 'mcp__lobe_cc__ask_user_question'; + +/** + * apiName the adapter rewrites the MCP tool to so the renderer routes on + * a stable key, not the wire-prefixed MCP name. Source of truth same as + * above. + */ +const ASK_USER_API_NAME = 'askUserQuestion'; + /** Status of a single todo item in CC's `TodoWrite` tool_use. */ type ClaudeCodeTodoStatus = 'pending' | 'in_progress' | 'completed'; @@ -414,8 +430,13 @@ export class ClaudeCodeAdapter implements AgentEventAdapter { break; } case 'tool_use': { + // Rewrite our local MCP `ask_user_question` tool to a stable + // apiName so the renderer routes on `askUserQuestion` (clean, + // domain-named) instead of the wire-prefixed MCP form. Identifier + // stays `claude-code` because this remains a CC-side tool. + const apiName = block.name === ASK_USER_MCP_TOOL_NAME ? 
ASK_USER_API_NAME : block.name; newToolCalls.push({ - apiName: block.name, + apiName, arguments: JSON.stringify(block.input || {}), id: block.id, identifier: 'claude-code', @@ -507,8 +528,13 @@ export class ClaudeCodeAdapter implements AgentEventAdapter { break; } case 'tool_use': { + // Rewrite our local MCP `ask_user_question` tool to a stable + // apiName so the renderer routes on `askUserQuestion` (clean, + // domain-named) instead of the wire-prefixed MCP form. Identifier + // stays `claude-code` because this remains a CC-side tool. + const apiName = block.name === ASK_USER_MCP_TOOL_NAME ? ASK_USER_API_NAME : block.name; newToolCalls.push({ - apiName: block.name, + apiName, arguments: JSON.stringify(block.input || {}), id: block.id, identifier: 'claude-code', diff --git a/packages/heterogeneous-agents/src/askUser/AskUserBridge.test.ts b/packages/heterogeneous-agents/src/askUser/AskUserBridge.test.ts new file mode 100644 index 0000000000..ee29220bee --- /dev/null +++ b/packages/heterogeneous-agents/src/askUser/AskUserBridge.test.ts @@ -0,0 +1,255 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'; + +import { AskUserBridge } from './AskUserBridge'; + +describe('AskUserBridge', () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + describe('pending() → resolve()', () => { + it('emits an agent_intervention_request and resolves with the user-supplied result', async () => { + const bridge = new AskUserBridge('op-1'); + const events: any[] = []; + const iter = bridge.events()[Symbol.asyncIterator](); + + // Pump events asynchronously into the array. 
+ const pumped = (async () => { + const e = await iter.next(); + if (!e.done) events.push(e.value); + })(); + + const pending = bridge.pending({ arguments: { questions: [{ q: 'foo' }] } }); + await pumped; + + expect(events).toHaveLength(1); + const req = events[0]; + expect(req.type).toBe('agent_intervention_request'); + expect(req.operationId).toBe('op-1'); + expect(req.data.identifier).toBe('claude-code'); + expect(req.data.apiName).toBe('askUserQuestion'); + expect(req.data.toolCallId).toMatch(/^[\da-f-]{36}$/); + expect(JSON.parse(req.data.arguments)).toEqual({ questions: [{ q: 'foo' }] }); + + bridge.resolve(req.data.toolCallId, { result: { foo: 'bar' } }); + await expect(pending).resolves.toEqual({ result: { foo: 'bar' } }); + }); + + it('uses caller-supplied toolCallId as the wire correlation key', async () => { + const bridge = new AskUserBridge('op-1'); + const drain = drainEvents(bridge); + const pending = bridge.pending({ + arguments: { questions: [] }, + toolCallId: 'cc-tool-use-abc', + }); + const event = await drain.firstEvent; + expect(event.data.toolCallId).toBe('cc-tool-use-abc'); + + bridge.resolve('cc-tool-use-abc', { result: { picked: 'red' } }); + await expect(pending).resolves.toEqual({ result: { picked: 'red' } }); + drain.stop(); + }); + + it('rejects pending() when the same toolCallId is already in flight', async () => { + const bridge = new AskUserBridge('op-1'); + const drain = drainEvents(bridge); + void bridge.pending({ arguments: {}, toolCallId: 'dup' }); + await expect(bridge.pending({ arguments: {}, toolCallId: 'dup' })).rejects.toThrow( + /duplicate toolCallId/, + ); + bridge.cancelAll(); + drain.stop(); + }); + + it('ignores resolve() for unknown toolCallId', async () => { + const bridge = new AskUserBridge('op-1'); + // Drain emitted events into the void so pending() can run. 
+ const drain = drainEvents(bridge); + const pending = bridge.pending({ arguments: {} }); + + bridge.resolve('not-a-real-id', { result: 'x' }); + + // Promise should still be unresolved — fast-forward past timeout to confirm. + vi.advanceTimersByTime(5 * 60 * 1000 + 1); + const answer = await pending; + expect(answer.cancelled).toBe(true); + expect(answer.cancelReason).toBe('timeout'); + drain.stop(); + }); + }); + + describe('cancellation paths', () => { + it('resolves with cancelled=true on user_cancelled via cancel()', async () => { + const bridge = new AskUserBridge('op-1'); + const drain = drainEvents(bridge); + const pending = bridge.pending({ arguments: {} }); + const toolCallId = (await drain.firstEvent).data.toolCallId; + + bridge.cancel(toolCallId); + + const answer = await pending; + expect(answer).toEqual({ cancelReason: 'user_cancelled', cancelled: true }); + drain.stop(); + }); + + it('resolves with cancelled=true on session_ended via cancelAll()', async () => { + const bridge = new AskUserBridge('op-1'); + const drain = drainEvents(bridge); + const pending = bridge.pending({ arguments: {} }); + await drain.firstEvent; + + bridge.cancelAll('session_ended'); + + const answer = await pending; + expect(answer).toEqual({ cancelReason: 'session_ended', cancelled: true }); + drain.stop(); + }); + + it('resolves with cancelled=true on timeout (default 5 min)', async () => { + const bridge = new AskUserBridge('op-1'); + const drain = drainEvents(bridge); + const pending = bridge.pending({ arguments: {} }); + + vi.advanceTimersByTime(5 * 60 * 1000 - 1); + const stillPending = await Promise.race([pending, Promise.resolve('not-yet')]); + expect(stillPending).toBe('not-yet'); + + vi.advanceTimersByTime(2); + const answer = await pending; + expect(answer).toEqual({ cancelReason: 'timeout', cancelled: true }); + drain.stop(); + }); + + it('honors a custom timeoutMs', async () => { + const bridge = new AskUserBridge('op-1'); + const drain = drainEvents(bridge); + 
const pending = bridge.pending({ arguments: {} }, { timeoutMs: 1000 }); + + vi.advanceTimersByTime(1001); + const answer = await pending; + expect(answer.cancelled).toBe(true); + drain.stop(); + }); + + it('cancelAll() rejects future pending() calls', async () => { + const bridge = new AskUserBridge('op-1'); + bridge.cancelAll(); + await expect(bridge.pending({ arguments: {} })).rejects.toThrow(/closed/); + }); + }); + + describe('progress notifications (keepalive)', () => { + it('calls onProgress at the configured interval until resolved', async () => { + const bridge = new AskUserBridge('op-1'); + const drain = drainEvents(bridge); + const onProgress = vi.fn(); + const pending = bridge.pending( + { arguments: {} }, + { onProgress, progressIntervalMs: 100, timeoutMs: 1000 }, + ); + const toolCallId = (await drain.firstEvent).data.toolCallId; + + vi.advanceTimersByTime(350); + // Three ticks elapsed (100, 200, 300) — Node's setInterval fires + // exactly at multiples, so 3 calls with monotonically increasing + // elapsed values. + expect(onProgress).toHaveBeenCalledTimes(3); + expect(onProgress.mock.calls[0][0]).toBeGreaterThanOrEqual(100); + expect(onProgress.mock.calls[2][0]).toBeGreaterThanOrEqual(300); + expect(onProgress.mock.calls[0][1]).toBe(1000); + + bridge.resolve(toolCallId, { result: 'done' }); + await pending; + + // After resolve, no further ticks even after more time passes. + vi.advanceTimersByTime(500); + expect(onProgress).toHaveBeenCalledTimes(3); + drain.stop(); + }); + + it('skips onProgress entirely when not provided', async () => { + const bridge = new AskUserBridge('op-1'); + const drain = drainEvents(bridge); + // No onProgress — just verify timeout still works without it. 
+ const pending = bridge.pending({ arguments: {} }, { timeoutMs: 50 }); + vi.advanceTimersByTime(60); + await expect(pending).resolves.toMatchObject({ cancelled: true }); + drain.stop(); + }); + }); + + describe('event stream', () => { + it('emits one request event per pending() call', async () => { + const bridge = new AskUserBridge('op-1'); + const drain = drainEvents(bridge); + + bridge.pending({ arguments: { q: 1 } }); + bridge.pending({ arguments: { q: 2 } }); + bridge.pending({ arguments: { q: 3 } }); + + // Advance timers so the resolved-via-cancellation cleanup paths don't + // interfere with the assertion below. + const events = await Promise.all([ + drain.firstEvent, + drain.events.next(), + drain.events.next(), + ]); + // first 3 events are all intervention requests + const types = events.map((e: any) => e.value?.type ?? e.type); + expect(types.every((t) => t === 'agent_intervention_request')).toBe(true); + + bridge.cancelAll(); + drain.stop(); + }); + + it('stamps stepIndex via the configured getter', async () => { + let step = 7; + const bridge = new AskUserBridge('op-1', { getStepIndex: () => step }); + const drain = drainEvents(bridge); + + bridge.pending({ arguments: {} }); + const e1 = await drain.firstEvent; + expect(e1.stepIndex).toBe(7); + + step = 11; + bridge.pending({ arguments: {} }); + const e2 = await drain.events.next(); + expect((e2.value as any).stepIndex).toBe(11); + + bridge.cancelAll(); + }); + + it('iterator ends after cancelAll()', async () => { + const bridge = new AskUserBridge('op-1'); + const iter = bridge.events()[Symbol.asyncIterator](); + const next = iter.next(); + bridge.cancelAll(); + await expect(next).resolves.toMatchObject({ done: true }); + }); + }); +}); + +// Helper: drain events from a bridge in the background, exposing the first +// emitted event as a promise and a way to call iter.next() on demand. 
+const drainEvents = (bridge: AskUserBridge) => { + const events = bridge.events()[Symbol.asyncIterator](); + const firstEvent = events.next().then((r) => { + if (r.done) throw new Error('stream ended before first event'); + return r.value as any; + }); + let stopped = false; + return { + events, + firstEvent, + stop: () => { + stopped = true; + }, + get stopped() { + return stopped; + }, + }; +}; diff --git a/packages/heterogeneous-agents/src/askUser/AskUserBridge.ts b/packages/heterogeneous-agents/src/askUser/AskUserBridge.ts new file mode 100644 index 0000000000..5d2dc795fb --- /dev/null +++ b/packages/heterogeneous-agents/src/askUser/AskUserBridge.ts @@ -0,0 +1,272 @@ +import { randomUUID } from 'node:crypto'; + +import type { + AgentInterventionRequestData, + AgentStreamEvent, +} from '@lobechat/agent-gateway-client'; + +/** + * What the MCP handler gets back from `bridge.pending()`. + * + * `result` carries the user's structured answer when they submit; `cancelled` + * with a reason when the deadline elapses, the user cancels, or the producer + * tears the session down. Mutually exclusive — exactly one of `result` / + * `cancelled` is set. + */ +export interface InterventionAnswer { + cancelled?: boolean; + cancelReason?: 'timeout' | 'user_cancelled' | 'session_ended'; + result?: unknown; +} + +export interface PendingArgs { + /** Whatever the MCP tool's input schema accepted (e.g. `{ questions: [...] }`). */ + arguments: unknown; + /** + * Wire correlation key for this intervention. Used as the `toolCallId` + * on outbound `agent_intervention_request` events and looked up by + * `resolve()` / `cancel()` when the user submits an answer. + * + * For CC, the producer should pass `extra._meta['claudecode/toolUseId']` + * here so it equals the existing tool message id on the renderer side + * (the assistant `tool_use` for `mcp__lobe_cc__ask_user_question` and + * the intervention request both reference the same tool bubble). 
+ * + * If omitted, the bridge synthesizes a random UUID, which is fine for + * stand-alone tests, but the renderer won't be able to correlate. + */ + toolCallId?: string; +} + +export interface PendingOptions { + /** + * Called every `progressIntervalMs` while the call is pending. Use it to + * push MCP `notifications/progress` to keep the SSE channel from timing + * out (CC's HTTP transport drops at ~5min without keepalive). + * + * `elapsedMs` is the wall-clock millis since `pending()` was called; `totalMs` is the effective `timeoutMs`. + */ + onProgress?: (elapsedMs: number, totalMs: number) => void | Promise; + /** How often to call `onProgress`, in milliseconds. Default: 30 000 (30s). */ + progressIntervalMs?: number; + /** + * Timeout in milliseconds, measured from the `pending()` call (the absolute + * deadline is `Date.now() + timeoutMs`). When it elapses, the + * pending promise resolves with `{ cancelled: true, cancelReason: 'timeout' }`. + * Default: 5 minutes. + */ + timeoutMs?: number; +} + +interface PendingEntry { + cleanup: () => void; + reject: (err: unknown) => void; + resolve: (answer: InterventionAnswer) => void; +} + +interface BridgeOptions { + /** + * Stamps `stepIndex` on emitted events. The bridge has no visibility into + * the CC adapter's own step counter, so the producer (which owns the + * merged stream) provides it. Defaults to a constant `0`, which is fine for unit + * tests, but real producers should plug in their adapter's current value. + */ + getStepIndex?: () => number; +} + +/** + * Per-operation channel between an MCP tool handler (which awaits the user) + * and the producer's outbound stream (which carries the request to UI and + * receives the user's answer back). + * + * Lifecycle: + * 1. Producer constructs a bridge for an `operationId`. + * 2. The MCP handler calls `pending(args, opts)` and gets a Promise back. + * 3. Bridge takes the caller-supplied `toolCallId` (or synthesizes one) and emits an + * `agent_intervention_request` AgentStreamEvent on `events()` for the + * producer to forward. + * 4. 
Producer eventually calls `resolve(toolCallId, payload)` (from the + * consumer's `agent_intervention_response`) and the Promise resolves. + * 5. Or: deadline / `cancel()` / `cancelAll()` resolves the pending Promise with a + * `{ cancelled: true, cancelReason }` answer (no exception thrown). + * + * Errors only surface from `pending()` if the bridge itself is misused + * (e.g. calling it after `cancelAll()`, or reusing an in-flight `toolCallId`). Cancellation/timeout is normal flow, + * not an exception. + */ +export class AskUserBridge { + private readonly pending_ = new Map(); + private readonly outboundQueue: AgentStreamEvent[] = []; + private readonly outboundWaiters: Array<(value: IteratorResult) => void> = []; + private readonly getStepIndex: () => number; + private closed = false; + + constructor( + public readonly operationId: string, + options: BridgeOptions = {}, + ) { + this.getStepIndex = options.getStepIndex ?? (() => 0); + } + + /** Currently-blocked MCP handler count. Useful for telemetry / shutdown gates. */ + get pendingCount(): number { + return this.pending_.size; + } + + /** + * Block the caller until the consumer answers (or the deadline / cancel + * fires). Resolves in every normal flow; rejects only if the bridge is already + * closed or the `toolCallId` is already in flight (programming errors). + */ + pending(args: PendingArgs, options: PendingOptions = {}): Promise { + if (this.closed) { + return Promise.reject(new Error('AskUserBridge is closed; cannot accept new pending calls')); + } + + const toolCallId = args.toolCallId ?? randomUUID(); + if (this.pending_.has(toolCallId)) { + // Two pendings on the same key would clobber each other. Caller bug; + // surface it loudly rather than silently lose one resolve. + return Promise.reject( + new Error(`AskUserBridge: duplicate toolCallId in flight: ${toolCallId}`), + ); + } + const timeoutMs = options.timeoutMs ?? 5 * 60 * 1000; + const progressIntervalMs = options.progressIntervalMs ?? 
30_000; + const startedAt = Date.now(); + const deadline = startedAt + timeoutMs; + + return new Promise((resolve, reject) => { + const timeoutTimer = setTimeout(() => { + this.pending_.delete(toolCallId); + clearInterval(progressTimer); + resolve({ cancelled: true, cancelReason: 'timeout' }); + }, timeoutMs); + + const progressTimer: ReturnType | undefined = options.onProgress + ? setInterval(() => { + const elapsed = Date.now() - startedAt; + // Fire-and-forget; consumer-side errors are logged by caller. + void Promise.resolve(options.onProgress!(elapsed, timeoutMs)).catch(() => {}); + }, progressIntervalMs) + : undefined; + + const cleanup = () => { + clearTimeout(timeoutTimer); + if (progressTimer) clearInterval(progressTimer); + }; + + this.pending_.set(toolCallId, { cleanup, reject, resolve }); + + // Emit the intervention request AFTER setting up the pending entry, + // so any synchronous resolve from a test fixture finds the slot. + // Hardcoded to AskUserQuestion for now — the only intervention shape + // we support. Take an explicit `apiName` in PendingArgs when adding + // more (e.g. CC approval, file picker). + const data: AgentInterventionRequestData = { + apiName: 'askUserQuestion', + arguments: JSON.stringify(args.arguments ?? {}), + deadline, + identifier: 'claude-code', + toolCallId, + }; + this.emit({ + data, + operationId: this.operationId, + stepIndex: this.getStepIndex(), + timestamp: startedAt, + type: 'agent_intervention_request', + }); + }); + } + + /** + * Producer-side: called when an `agent_intervention_response` arrives + * from the consumer. No-op if the toolCallId is unknown (already + * timed out, cancelled, or a stale duplicate). 
+ */ + resolve( + toolCallId: string, + payload: { + cancelled?: boolean; + cancelReason?: InterventionAnswer['cancelReason']; + result?: unknown; + }, + ): void { + const entry = this.pending_.get(toolCallId); + if (!entry) return; + this.pending_.delete(toolCallId); + entry.cleanup(); + entry.resolve( + payload.cancelled + ? { cancelReason: payload.cancelReason ?? 'user_cancelled', cancelled: true } + : { result: payload.result }, + ); + } + + /** + * Cancel a single pending call. Used when the consumer explicitly aborts + * one intervention without ending the whole op. + */ + cancel(toolCallId: string, reason: InterventionAnswer['cancelReason'] = 'user_cancelled'): void { + this.resolve(toolCallId, { cancelReason: reason, cancelled: true }); + } + + /** + * Tear down the bridge. Every pending handler is resolved with + * `{ cancelled: true, cancelReason }` (`'session_ended'` by default, so its MCP tool returns + * cleanly to CC), the outbound event stream closes, and subsequent + * `pending()` calls reject. + */ + cancelAll(reason: InterventionAnswer['cancelReason'] = 'session_ended'): void { + if (this.closed) return; + this.closed = true; + for (const entry of this.pending_.values()) { + entry.cleanup(); + entry.resolve({ cancelReason: reason, cancelled: true }); + } + this.pending_.clear(); + // Drain any waiters with a "done" so consumers can break their loop. + while (this.outboundWaiters.length > 0) { + const waiter = this.outboundWaiters.shift()!; + waiter({ done: true, value: undefined as any }); + } + } + + /** + * Async iterable over outbound events the producer should forward to the + * consumer. One iterator per bridge; multi-consumer fan-out is the + * producer's job. Iterator ends after `cancelAll()`. 
+ */ + events(): AsyncIterable { + return { + [Symbol.asyncIterator]: () => this.makeIterator(), + }; + } + + private makeIterator(): AsyncIterator { + return { + next: () => { + const buffered = this.outboundQueue.shift(); + if (buffered) { + return Promise.resolve({ done: false, value: buffered }); + } + if (this.closed) { + return Promise.resolve({ done: true, value: undefined as any }); + } + return new Promise((resolveWaiter) => { + this.outboundWaiters.push(resolveWaiter); + }); + }, + return: () => Promise.resolve({ done: true, value: undefined as any }), + }; + } + + private emit(event: AgentStreamEvent): void { + const waiter = this.outboundWaiters.shift(); + if (waiter) { + waiter({ done: false, value: event }); + } else { + this.outboundQueue.push(event); + } + } +} diff --git a/packages/heterogeneous-agents/src/askUser/AskUserMcpServer.test.ts b/packages/heterogeneous-agents/src/askUser/AskUserMcpServer.test.ts new file mode 100644 index 0000000000..20ae0c2f06 --- /dev/null +++ b/packages/heterogeneous-agents/src/askUser/AskUserMcpServer.test.ts @@ -0,0 +1,257 @@ +import { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js'; +import { afterEach, describe, expect, it } from 'vitest'; + +import { AskUserBridge } from './AskUserBridge'; +import { AskUserMcpServer } from './AskUserMcpServer'; +import { ASK_USER_MCP_SERVER_NAME, ASK_USER_TOOL_FULL_NAME, ASK_USER_TOOL_NAME } from './constants'; + +let server: AskUserMcpServer; + +afterEach(async () => { + await server?.stop(); +}); + +describe('AskUserMcpServer', () => { + describe('lifecycle', () => { + it('starts on port=0 (auto-assigned), exposes a localhost URL, stops cleanly', async () => { + server = new AskUserMcpServer(); + const { port, url } = await server.start(); + expect(port).toBeGreaterThan(0); + expect(url).toMatch(/^http:\/\/127\.0\.0\.1:\d+\/mcp$/); + + await server.stop(); + // After 
stop, calling .url throws — server is no longer listening. + expect(() => server.url).toThrow(); + }); + + it('start() is idempotent', async () => { + server = new AskUserMcpServer(); + const a = await server.start(); + const b = await server.start(); + expect(a.port).toBe(b.port); + }); + }); + + describe('per-op routing', () => { + it('hasOperation / operationCount track register / unregister', async () => { + server = new AskUserMcpServer(); + await server.start(); + + expect(server.operationCount).toBe(0); + const bridge1 = server.registerOperation('op-1'); + const bridge2 = server.registerOperation('op-2'); + expect(server.operationCount).toBe(2); + expect(server.hasOperation('op-1')).toBe(true); + expect(server.hasOperation('op-3')).toBe(false); + expect(bridge1).toBeInstanceOf(AskUserBridge); + expect(bridge2).toBeInstanceOf(AskUserBridge); + expect(bridge1).not.toBe(bridge2); + + server.unregisterOperation('op-1'); + expect(server.operationCount).toBe(1); + expect(server.hasOperation('op-1')).toBe(false); + }); + + it('rejects double-registering the same op id', async () => { + server = new AskUserMcpServer(); + await server.start(); + server.registerOperation('op-1'); + expect(() => server.registerOperation('op-1')).toThrow(/already registered/); + }); + + it('urlForOperation appends ?op= to the base url', async () => { + server = new AskUserMcpServer(); + await server.start(); + server.registerOperation('op-7'); + const url = server.urlForOperation('op-7'); + expect(url).toContain('/mcp'); + expect(new URL(url).searchParams.get('op')).toBe('op-7'); + }); + + it('unregisterOperation cancels pending bridges', async () => { + server = new AskUserMcpServer(); + await server.start(); + const bridge = server.registerOperation('op-1'); + + const pending = bridge.pending({ arguments: { questions: [{ q: 'foo' }] } }); + // Drain the request event so the iterator's queue doesn't deadlock. 
+ void bridge.events()[Symbol.asyncIterator]().next(); + + server.unregisterOperation('op-1'); + const answer = await pending; + expect(answer).toEqual({ cancelReason: 'session_ended', cancelled: true }); + }); + + it('publishes the canonical mcp__lobe_cc__ask_user_question tool', async () => { + server = new AskUserMcpServer(); + await server.start(); + server.registerOperation('probe-op'); + + const client = await connectClient(server.urlForOperation('probe-op')); + try { + const list = await client.listTools(); + expect(list.tools).toHaveLength(1); + expect(list.tools[0].name).toBe(ASK_USER_TOOL_NAME); + expect(ASK_USER_TOOL_FULL_NAME).toBe( + `mcp__${ASK_USER_MCP_SERVER_NAME}__${ASK_USER_TOOL_NAME}`, + ); + } finally { + await client.close(); + } + }); + }); + + describe('end-to-end tool call', () => { + it('routes a tools/call to the right per-op bridge and returns the user answer', async () => { + server = new AskUserMcpServer({ pendingTimeoutMs: 30_000, progressIntervalMs: 1000 }); + await server.start(); + const bridge = server.registerOperation('op-A'); + + // Producer-side: when an intervention_request shows up on bridge.events, + // resolve it with a fake user answer. + let interventionRequestSeen: any; + const producerLoop = (async () => { + for await (const e of bridge.events()) { + if (e.type === 'agent_intervention_request') { + interventionRequestSeen = e; + bridge.resolve(e.data.toolCallId, { + result: { 'What color do you want?': 'Red' }, + }); + break; + } + } + })(); + + // Client-side: behave like CC — initialize, list, call. 
+ const client = await connectClient(server.urlForOperation('op-A')); + try { + const list = await client.listTools(); + expect(list.tools[0].name).toBe(ASK_USER_TOOL_NAME); + + const callResult = (await client.callTool({ + arguments: { + questions: [ + { + header: 'Color', + options: [ + { description: 'Red color', label: 'Red' }, + { description: 'Blue color', label: 'Blue' }, + ], + question: 'What color do you want?', + }, + ], + }, + name: ASK_USER_TOOL_NAME, + })) as { content: Array<{ text: string; type: string }>; isError?: boolean }; + + expect(callResult.isError).toBeFalsy(); + expect(callResult.content[0].text).toContain('User answers'); + expect(callResult.content[0].text).toContain('What color do you want?: Red'); + } finally { + await client.close(); + } + + await producerLoop; + expect(interventionRequestSeen).toBeDefined(); + expect(interventionRequestSeen.operationId).toBe('op-A'); + expect(interventionRequestSeen.data.identifier).toBe('claude-code'); + expect(interventionRequestSeen.data.apiName).toBe('askUserQuestion'); + }); + + it('returns an isError tool result when the user cancels', async () => { + server = new AskUserMcpServer({ pendingTimeoutMs: 30_000, progressIntervalMs: 1000 }); + await server.start(); + const bridge = server.registerOperation('op-cancel'); + + const producerLoop = (async () => { + for await (const e of bridge.events()) { + if (e.type === 'agent_intervention_request') { + bridge.resolve(e.data.toolCallId, { cancelled: true, cancelReason: 'user_cancelled' }); + break; + } + } + })(); + + const client = await connectClient(server.urlForOperation('op-cancel')); + try { + const callResult = (await client.callTool({ + arguments: { + questions: [ + { + header: 'X', + options: [ + { description: 'a', label: 'A' }, + { description: 'b', label: 'B' }, + ], + question: 'pick', + }, + ], + }, + name: ASK_USER_TOOL_NAME, + })) as { content: Array<{ text: string; type: string }>; isError?: boolean }; + + 
expect(callResult.isError).toBe(true); + expect(callResult.content[0].text.toLowerCase()).toContain('cancel'); + } finally { + await client.close(); + } + await producerLoop; + }); + + /** + * Regression: a single shared `StreamableHTTPServerTransport` rejects the + * second `initialize` with `Server already initialized`, breaking every + * op after the first. Ensures we mint one transport+McpServer per session. + */ + it('handles sequential ops on independent sessions', async () => { + server = new AskUserMcpServer({ pendingTimeoutMs: 30_000, progressIntervalMs: 1000 }); + await server.start(); + + for (const opId of ['op-seq-1', 'op-seq-2', 'op-seq-3']) { + const bridge = server.registerOperation(opId); + const producerLoop = (async () => { + for await (const e of bridge.events()) { + if (e.type === 'agent_intervention_request') { + bridge.resolve(e.data.toolCallId, { result: { pick: 'A' } }); + break; + } + } + })(); + + const client = await connectClient(server.urlForOperation(opId)); + try { + const callResult = (await client.callTool({ + arguments: { + questions: [ + { + header: opId, + options: [ + { description: 'a', label: 'A' }, + { description: 'b', label: 'B' }, + ], + question: 'pick', + }, + ], + }, + name: ASK_USER_TOOL_NAME, + })) as { content: Array<{ text: string }>; isError?: boolean }; + + expect(callResult.isError).toBeFalsy(); + expect(callResult.content[0].text).toContain('pick: A'); + } finally { + await client.close(); + } + await producerLoop; + server.unregisterOperation(opId); + } + }); + }); +}); + +const connectClient = async (url: string): Promise => { + const transport = new StreamableHTTPClientTransport(new URL(url)); + const client = new Client({ name: 'unit-test', version: '0' }, { capabilities: {} }); + await client.connect(transport); + return client; +}; diff --git a/packages/heterogeneous-agents/src/askUser/AskUserMcpServer.ts b/packages/heterogeneous-agents/src/askUser/AskUserMcpServer.ts new file mode 100644 index 
0000000000..eb0dec35a5 --- /dev/null +++ b/packages/heterogeneous-agents/src/askUser/AskUserMcpServer.ts @@ -0,0 +1,448 @@ +import { AsyncLocalStorage } from 'node:async_hooks'; +import { randomUUID } from 'node:crypto'; +import http from 'node:http'; +import type { AddressInfo } from 'node:net'; + +import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js'; +import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js'; +import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js'; +import { z } from 'zod'; + +import type { InterventionAnswer } from './AskUserBridge'; +import { AskUserBridge } from './AskUserBridge'; +import { ASK_USER_MCP_SERVER_NAME, ASK_USER_TOOL_NAME } from './constants'; + +/** + * Mirrors CC's built-in `AskUserQuestion` schema. CC's schema: + * - 1-4 questions + * - each: header (≤12 chars), question, options (2-4), multiSelect? + * - each option: label, description + * We replicate it so the model can call our tool with the exact shape CC + * trained on. + */ +const askUserOptionShape = z.object({ + description: z.string(), + label: z.string(), +}); + +const askUserQuestionShape = z.object({ + header: z.string(), + multiSelect: z.boolean().optional(), + options: z.array(askUserOptionShape).min(2).max(4), + question: z.string(), +}); + +const askUserInputShape = { + questions: z.array(askUserQuestionShape).min(1).max(4), +}; + +/** Tool description seen by the model. Kept terse — full schema lives in `inputSchema`. */ +const ASK_USER_TOOL_DESCRIPTION = + 'Ask the user one or more clarifying questions with multiple-choice options. ' + + "Use this whenever the user's intent is ambiguous and you need them to pick."; + +export interface StartedServer { + /** Effective listen port (auto-assigned when constructed with port=0). */ + port: number; + /** Base URL the producer hands to CC via `--mcp-config`. 
*/ + url: string; +} + +export interface AskUserMcpServerOptions { + /** + * Per-call timeout passed to `bridge.pending()`. Default 5 minutes — + * matches the issue's UX requirement and the tested CC keepalive ceiling. + */ + pendingTimeoutMs?: number; + /** + * Port to bind. `0` (default) lets the OS pick a free one. + */ + port?: number; + /** + * Progress notification cadence. Default 30s. CC's HTTP transport drops + * SSE around 5min idle without a wire-level message — `notifications/progress` + * counts as a wire-level message. + */ + progressIntervalMs?: number; +} + +interface RegisteredOperation { + bridge: AskUserBridge; +} + +interface SessionEntry { + mcp: McpServer; + transport: StreamableHTTPServerTransport; +} + +/** + * Process-wide MCP server that exposes a single `ask_user_question` tool to + * CC over HTTP/SSE. One server, many concurrent operations — each spawn + * registers a per-op `AskUserBridge` and gets back a URL with `?op=` + * that CC's `--mcp-config` points at. Tool invocations route to the matching + * bridge by query param. + * + * Lifecycle: + * server.start() // once per process + * bridge = server.registerOperation(opId) + * ...spawn CC pointing at server.url + ?op=opId... + * server.unregisterOperation(opId) // releases bridge resources + * server.stop() // on app shutdown + * + * ## Per-session transport + * + * Each MCP `initialize` from a new CC subprocess gets its own + * `StreamableHTTPServerTransport` + `McpServer` pair. The SDK's transport + * stores `_initialized=true` and a `sessionId` per instance, so reusing a + * single transport across sequential ops makes the second `initialize` fail + * with `Invalid Request: Server already initialized`. Subsequent requests + * from the same CC subprocess (carrying `mcp-session-id`) route back to the + * matching transport via `sessionTransports` lookup. + */ +export class AskUserMcpServer { + private httpServer?: http.Server; + /** sessionId → transport+mcp pair. 
Populated on initialize, removed on session close. */ + private readonly sessionTransports = new Map(); + private readonly operations = new Map(); + /** + * MCP session id → operationId. Populated when a CC initialize POST + * arrives at `/mcp?op=`; the URL's `op` is captured via + * `AsyncLocalStorage`, the SDK's `onsessioninitialized` hook reads it + * at session-create time, and tool handler lookups use `extra.sessionId`. + */ + private readonly sessionIdToOpId = new Map(); + /** Per-request op id, populated for the duration of `handleRequest`. */ + private readonly opIdContext = new AsyncLocalStorage(); + private startedUrl?: string; + private readonly pendingTimeoutMs: number; + private readonly progressIntervalMs: number; + + constructor(private readonly options: AskUserMcpServerOptions = {}) { + this.pendingTimeoutMs = options.pendingTimeoutMs ?? 5 * 60 * 1000; + this.progressIntervalMs = options.progressIntervalMs ?? 30_000; + } + + /** URL only valid after `start()` resolves. */ + get url(): string { + if (!this.startedUrl) { + throw new Error('AskUserMcpServer not started yet — call start() first'); + } + return this.startedUrl; + } + + async start(): Promise { + if (this.httpServer) { + // idempotent — repeat calls return the existing started state. + return { port: (this.httpServer.address() as AddressInfo).port, url: this.url }; + } + + const httpServer = http.createServer(async (req, res) => { + // Only the `/mcp` path is part of our contract. Anything else is + // either a misroute or a probe — answer with 404 so it's loud. + if (!req.url || !req.url.startsWith('/mcp')) { + res.writeHead(404, { 'content-type': 'text/plain' }); + res.end('Not Found'); + return; + } + + // Producer encodes operationId as `?op=` on the URL it hands + // to CC. Capture it now so `onsessioninitialized` can bind it to the + // generated sessionId. AsyncLocalStorage keeps interleaved requests + // from clobbering each other. 
+ const parsed = new URL(req.url, 'http://127.0.0.1'); + const opId = parsed.searchParams.get('op') ?? undefined; + + // The MCP transport reads from req directly. We only need to extract + // the JSON body for POST requests so it can be passed in. + let body: unknown; + if (req.method === 'POST') { + body = await readJsonBody(req).catch(() => undefined); + } + + await this.opIdContext.run(opId, async () => { + try { + const transport = await this.resolveTransport(req, body); + if (!transport) { + res.writeHead(400, { 'content-type': 'application/json' }); + res.end( + JSON.stringify({ + error: { + code: -32_000, + message: 'Bad Request: no session and no initialize request', + }, + id: null, + jsonrpc: '2.0', + }), + ); + return; + } + await transport.handleRequest(req, res, body); + } catch (err) { + if (!res.headersSent) { + res.writeHead(500, { 'content-type': 'text/plain' }); + res.end(String((err as Error)?.message ?? err)); + } + } + }); + }); + + this.httpServer = httpServer; + await new Promise((resolve, reject) => { + httpServer.once('error', reject); + httpServer.listen(this.options.port ?? 0, '127.0.0.1', () => { + httpServer.off('error', reject); + resolve(); + }); + }); + + const port = (httpServer.address() as AddressInfo).port; + this.startedUrl = `http://127.0.0.1:${port}/mcp`; + return { port, url: this.startedUrl }; + } + + async stop(): Promise { + // Close all bridges first so pending MCP handlers return quickly. + for (const [opId] of this.operations) this.unregisterOperation(opId); + // Tear down every per-session transport + MCP server pair. 
+ for (const [, entry] of this.sessionTransports) { + await entry.transport.close().catch(() => {}); + await entry.mcp.close().catch(() => {}); + } + this.sessionTransports.clear(); + await new Promise((resolve) => { + if (!this.httpServer) { + resolve(); + return; + } + this.httpServer.close(() => resolve()); + }); + this.httpServer = undefined; + this.startedUrl = undefined; + } + + /** + * Allocate a bridge for a new operation and return the + * `--mcp-config`-ready URL. The caller spawns CC pointing at this URL + * and merges `bridge.events()` into the producer's outbound stream. + */ + registerOperation(operationId: string, bridge?: AskUserBridge): AskUserBridge { + if (this.operations.has(operationId)) { + throw new Error(`AskUserMcpServer: operation already registered: ${operationId}`); + } + const created = bridge ?? new AskUserBridge(operationId); + this.operations.set(operationId, { bridge: created }); + return created; + } + + unregisterOperation(operationId: string): void { + const entry = this.operations.get(operationId); + if (!entry) return; + entry.bridge.cancelAll('session_ended'); + this.operations.delete(operationId); + // Drop the reverse mapping for any sessions that were bound to this op. + for (const [sid, oid] of this.sessionIdToOpId) { + if (oid === operationId) this.sessionIdToOpId.delete(sid); + } + } + + /** Build the per-op URL the producer writes into the temp `mcp-config` JSON. */ + urlForOperation(operationId: string): string { + const base = new URL(this.url); + base.searchParams.set('op', operationId); + return base.toString(); + } + + /** Test/inspection helper. */ + hasOperation(operationId: string): boolean { + return this.operations.has(operationId); + } + + /** Currently-registered operation count. */ + get operationCount(): number { + return this.operations.size; + } + + /** Active MCP session count (initialize succeeded, not yet closed). 
*/ + get sessionCount(): number { + return this.sessionTransports.size; + } + + /** + * Locate (or build) the transport that should handle this request. + * + * - Existing session id (header) → matching stored transport + * - No session id + initialize body → fresh transport+mcp pair, registered + * on `onsessioninitialized` so the very next message from this client + * finds its session + * - Anything else (including an unknown session id) → `undefined` + * (caller responds with 400) + */ + private async resolveTransport( + req: http.IncomingMessage, + body: unknown, + ): Promise<StreamableHTTPServerTransport | undefined> { + const sessionId = (req.headers['mcp-session-id'] as string | undefined) ?? undefined; + if (sessionId) { + const entry = this.sessionTransports.get(sessionId); + if (entry) return entry.transport; + // Unknown session id: no stored transport can serve it, so return + // undefined and let the caller answer with the 400 response. + return undefined; + } + + if (body && isInitializeRequest(body)) { + return this.createSessionTransport(); + } + + return undefined; + } + + /** + * Build a fresh `StreamableHTTPServerTransport` + `McpServer` pair for a + * new MCP session. The pair is registered into `sessionTransports` from + * the `onsessioninitialized` callback, so every subsequent request + * tagged with that sessionId routes back here without reconstruction. 
+ */ + private createSessionTransport(): StreamableHTTPServerTransport { + const mcp = new McpServer( + { name: ASK_USER_MCP_SERVER_NAME, version: '1.0.0' }, + { capabilities: { tools: {} } }, + ); + this.registerAskUserTool(mcp); + + const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({ + onsessionclosed: (sessionId: string) => { + this.sessionTransports.delete(sessionId); + this.sessionIdToOpId.delete(sessionId); + }, + onsessioninitialized: (sessionId: string) => { + this.sessionTransports.set(sessionId, { mcp, transport }); + const opId = this.opIdContext.getStore(); + if (opId) this.sessionIdToOpId.set(sessionId, opId); + }, + sessionIdGenerator: () => randomUUID(), + }); + + // Connect synchronously so the first request's body can be processed + // by the same transport we just built. `mcp.connect(transport)` is + // idempotent at this stage and resolves before `handleRequest` is + // called by the caller. + void mcp.connect(transport); + return transport; + } + + private registerAskUserTool(mcp: McpServer) { + mcp.registerTool( + ASK_USER_TOOL_NAME, + { + description: ASK_USER_TOOL_DESCRIPTION, + inputSchema: askUserInputShape, + title: 'Ask User Question', + }, + async (args, extra) => { + const sessionId = (extra as { sessionId?: string } | undefined)?.sessionId; + const operationId = sessionId ? this.sessionIdToOpId.get(sessionId) : undefined; + if (!operationId) { + return errorResult( + "Missing 'op' query parameter on MCP server URL — producer should append ?op=", + ); + } + const op = this.operations.get(operationId); + if (!op) { + return errorResult( + `No active operation for id '${operationId}'. 
The op may have ended before the tool call landed.`, + ); + } + + const ccToolUseId = (extra?._meta as { 'claudecode/toolUseId'?: string } | undefined)?.[ + 'claudecode/toolUseId' + ]; + const progressToken = (extra?._meta as { progressToken?: string | number } | undefined) + ?.progressToken; + // Use CC's own tool_use id as the bridge correlation key so the + // outbound `agent_intervention_request` shares an id with the + // existing tool message on the renderer side. Without this the + // renderer can't tie the intervention card to its tool bubble. + const toolCallId = ccToolUseId; + + // SSE keepalive: every progressIntervalMs send a progress + // notification so CC's transport doesn't time out on long waits. + // Empirically required for >~5min — we cap at 5min anyway, but + // tick from the start so even 4-minute pendings get periodic life. + const onProgress = + progressToken !== undefined && extra?.sendNotification + ? async (elapsedMs: number, totalMs: number) => { + try { + await extra.sendNotification!({ + method: 'notifications/progress', + params: { + message: `Waiting for user (${Math.round(elapsedMs / 1000)}s)`, + progress: elapsedMs, + progressToken, + total: totalMs, + }, + }); + } catch { + // Non-fatal — the underlying transport may be torn down + // mid-flight; the next setInterval tick will skip too. + } + } + : undefined; + + const answer = await op.bridge.pending( + { arguments: args, toolCallId }, + { + onProgress, + progressIntervalMs: this.progressIntervalMs, + timeoutMs: this.pendingTimeoutMs, + }, + ); + + return formatAnswerForCC(answer, args); + }, + ); + } +} + +const readJsonBody = async (req: http.IncomingMessage): Promise => { + const chunks: Buffer[] = []; + for await (const chunk of req) { + chunks.push(typeof chunk === 'string' ? 
Buffer.from(chunk) : (chunk as Buffer)); + } + if (chunks.length === 0) return undefined; + return JSON.parse(Buffer.concat(chunks).toString('utf8')); +}; + +const errorResult = (message: string) => ({ + content: [{ text: message, type: 'text' as const }], + isError: true, +}); + +const formatAnswerForCC = (answer: InterventionAnswer, args: unknown) => { + if (answer.cancelled) { + const reasonText = + answer.cancelReason === 'timeout' + ? 'No answer received within the wait window; the user did not respond. Continue without their input or ask in plain text.' + : answer.cancelReason === 'session_ended' + ? 'The session was ended before the user could respond.' + : 'The user cancelled the question.'; + return { + content: [{ text: reasonText, type: 'text' as const }], + isError: true, + }; + } + + // Success: format the structured answer back as text. CC's built-in + // AskUserQuestion returns "User answers:\n- <question>: <answer>" style; + // match it so the model handles our payload identically. + const answerObj = (answer.result ?? {}) as Record<string, unknown>; + const questions = (args as { questions?: Array<{ question: string }> }).questions ?? []; + const lines = ['User answers:']; + for (const q of questions) { + const a = answerObj[q.question]; + const formatted = Array.isArray(a) ? a.join(', ') : a == null ? '(no answer)' : String(a); + lines.push(`- ${q.question}: ${formatted}`); + } + return { + content: [{ text: lines.join('\n'), type: 'text' as const }], + }; +}; diff --git a/packages/heterogeneous-agents/src/askUser/constants.ts b/packages/heterogeneous-agents/src/askUser/constants.ts new file mode 100644 index 0000000000..46a6b4f81c --- /dev/null +++ b/packages/heterogeneous-agents/src/askUser/constants.ts @@ -0,0 +1,21 @@ +/** + * Public constants shared between the producer-side MCP server (Node-only) + * and the consumer-side adapter / renderer (browser-safe). Kept in a + * dependency-free module so importers don't accidentally pull node:http + * etc. into the renderer bundle. 
+ */ + +/** MCP server name as it appears in the tool name prefix. */ +export const ASK_USER_MCP_SERVER_NAME = 'lobe_cc'; + +/** MCP tool name (without the `mcp__lobe_cc__` prefix). */ +export const ASK_USER_TOOL_NAME = 'ask_user_question'; + +/** Full tool name as the CC model sees it on the wire. */ +export const ASK_USER_TOOL_FULL_NAME = `mcp__${ASK_USER_MCP_SERVER_NAME}__${ASK_USER_TOOL_NAME}`; + +/** + * Stable apiName the adapter rewrites the MCP tool to so that downstream + * UI / persistence routes on a clean key, not the wire-prefixed MCP name. + */ +export const ASK_USER_API_NAME = 'askUserQuestion'; diff --git a/packages/heterogeneous-agents/src/askUser/index.ts b/packages/heterogeneous-agents/src/askUser/index.ts new file mode 100644 index 0000000000..f7532850ff --- /dev/null +++ b/packages/heterogeneous-agents/src/askUser/index.ts @@ -0,0 +1,30 @@ +/** + * Producer-side MCP server + per-op bridge for Claude Code's AskUserQuestion + * via local HTTP MCP. See `LOBE-8725` for the full design. + * + * Used by: + * - Electron main (`HeterogeneousAgentCtr`) — local app + * - Sandbox CLI (`lh hetero exec`) — phase 2; for now the CLI doesn't + * register a server and CC falls back to text questions + * + * Consumer (renderer / web client) talks to the producer via the existing + * `AgentStreamEvent` pipeline — `agent_intervention_request` flows out, + * `agent_intervention_response` flows back. 
+ */ +export { + AskUserBridge, + type InterventionAnswer, + type PendingArgs, + type PendingOptions, +} from './AskUserBridge'; +export { + AskUserMcpServer, + type AskUserMcpServerOptions, + type StartedServer, +} from './AskUserMcpServer'; +export { + ASK_USER_API_NAME, + ASK_USER_MCP_SERVER_NAME, + ASK_USER_TOOL_FULL_NAME, + ASK_USER_TOOL_NAME, +} from './constants'; diff --git a/src/features/Conversation/Messages/AssistantGroup/Tool/Detail/Intervention/customInteractionHandlers.ts b/src/features/Conversation/Messages/AssistantGroup/Tool/Detail/Intervention/customInteractionHandlers.ts index 56ace1238c..35c727351c 100644 --- a/src/features/Conversation/Messages/AssistantGroup/Tool/Detail/Intervention/customInteractionHandlers.ts +++ b/src/features/Conversation/Messages/AssistantGroup/Tool/Detail/Intervention/customInteractionHandlers.ts @@ -1,3 +1,4 @@ +import { ClaudeCodeIdentifier } from '@lobechat/builtin-tool-claude-code'; import { UserInteractionIdentifier } from '@lobechat/builtin-tool-user-interaction'; import { AgentMarketplaceIdentifier, @@ -117,8 +118,22 @@ const customInteractionSubmitHandlers = new Map([ClaudeCodeIdentifier]); + +export const isHeteroInteractionIdentifier = (identifier: string) => + HETERO_CUSTOM_INTERACTION_IDENTIFIERS.has(identifier); + export const isCustomInteractionIdentifier = (identifier: string) => - identifier === UserInteractionIdentifier || customInteractionSubmitHandlers.has(identifier); + identifier === UserInteractionIdentifier || + isHeteroInteractionIdentifier(identifier) || + customInteractionSubmitHandlers.has(identifier); export const prepareCustomInteractionSubmit = async ( identifier: string, diff --git a/src/features/Conversation/Messages/AssistantGroup/Tool/Detail/Intervention/index.tsx b/src/features/Conversation/Messages/AssistantGroup/Tool/Detail/Intervention/index.tsx index 68eb8fe66b..5f820b8b35 100644 --- a/src/features/Conversation/Messages/AssistantGroup/Tool/Detail/Intervention/index.tsx +++ 
b/src/features/Conversation/Messages/AssistantGroup/Tool/Detail/Intervention/index.tsx @@ -4,6 +4,7 @@ import { Flexbox } from '@lobehub/ui'; import { memo, Suspense, useCallback, useMemo, useRef, useState } from 'react'; import { createPortal } from 'react-dom'; +import { useChatStore } from '@/store/chat'; import { useUserStore } from '@/store/user'; import { toolInterventionSelectors } from '@/store/user/selectors'; @@ -12,6 +13,7 @@ import Arguments from '../Arguments'; import ApprovalActions from './ApprovalActions'; import { isCustomInteractionIdentifier, + isHeteroInteractionIdentifier, prepareCustomInteractionSubmit, recordCustomInteractionResolution, } from './customInteractionHandlers'; @@ -98,6 +100,11 @@ const Intervention = memo( const submitToolInteraction = useConversationStore((s) => s.submitToolInteraction); const skipToolInteraction = useConversationStore((s) => s.skipToolInteraction); const cancelToolInteraction = useConversationStore((s) => s.cancelToolInteraction); + // Hetero (CC / Codex) interventions ship the answer back through IPC to a + // running CLI subprocess instead of starting a fresh `executeClientAgent` + // turn. Pull the chat-store action lazily so non-hetero interactions stay + // on the existing path with no behavior change. 
+ const submitHeteroIntervention = useChatStore((s) => s.submitHeteroIntervention); const handleInteractionAction = useCallback( async ( @@ -106,6 +113,10 @@ const Intervention = memo( | { type: 'skip'; payload?: Record; reason?: string } | { type: 'cancel'; payload?: Record }, ) => { + if (isHeteroInteractionIdentifier(identifier)) { + await submitHeteroIntervention(id, action.type, action.payload); + return; + } switch (action.type) { case 'submit': { const { payload, options } = await prepareCustomInteractionSubmit( @@ -149,6 +160,7 @@ const Intervention = memo( identifier, parsedArgs, skipToolInteraction, + submitHeteroIntervention, submitToolInteraction, topicId, ], diff --git a/src/features/Conversation/store/slices/generation/action.ts b/src/features/Conversation/store/slices/generation/action.ts index c1a64b4a2b..7eb7a6db17 100644 --- a/src/features/Conversation/store/slices/generation/action.ts +++ b/src/features/Conversation/store/slices/generation/action.ts @@ -1,16 +1,23 @@ import { AgentManagementIdentifier } from '@lobechat/builtin-tool-agent-management'; +import { LOADING_FLAT } from '@lobechat/const'; +import type { ConversationContext, HeterogeneousProviderConfig } from '@lobechat/types'; +import { t } from 'i18next'; import { type StateCreator } from 'zustand'; +import { message as antdMessage } from '@/components/AntdStaticMethods'; import { MESSAGE_CANCEL_FLAT } from '@/const/index'; +import { messageService } from '@/services/message'; import { getAgentStoreState } from '@/store/agent'; -import { agentSelectors } from '@/store/agent/selectors'; +import { agentByIdSelectors, agentSelectors } from '@/store/agent/selectors'; import { useChatStore } from '@/store/chat'; +import { topicSelectors } from '@/store/chat/selectors'; import { selectRuntimeType } from '@/store/chat/slices/aiChat/actions/agentDispatcher'; import { parseMentionedAgentsFromEditorData, parseSelectedSkillsFromEditorData, parseSelectedToolsFromEditorData, } from 
'@/store/chat/slices/aiChat/actions/commandBus'; +import { resolveHeteroResume } from '@/store/chat/slices/aiChat/actions/heteroResume'; import { operationSelectors } from '@/store/chat/slices/operation/selectors'; import { INPUT_LOADING_OPERATION_TYPES } from '@/store/chat/slices/operation/types'; import { @@ -49,6 +56,91 @@ const buildRetryInitialContext = (editorData: Record | null | undef }; }; +/** + * Branch a hetero (Claude Code / Codex) turn off an existing user message. + * + * Used by regenerate (parent = user msg, prompt = original user content). + * Pre-creates the assistant row so `executeHeterogeneousAgent` has a stable + * `assistantMessageId` to stream into, then runs an `execHeterogeneousAgent` + * op as a child of the caller's parent op so Stop cancels the executor + * without killing the parent op early. + */ +const runHeterogeneousFromExistingMessage = async ( + chatStore: ReturnType, + params: { + context: ConversationContext; + heterogeneousProvider: HeterogeneousProviderConfig; + parentMessageId: string; + parentOperationId: string; + prompt: string; + }, +): Promise => { + const { context, heterogeneousProvider, parentMessageId, parentOperationId, prompt } = params; + const agentId = context.agentId; + if (!agentId) throw new Error('agentId is required for heterogeneous agent'); + + // Resolve workingDirectory: topic-level pin (set when bound to a project) + // wins over the agent-level default. Mirrors the sendMessage hetero branch + // so regenerate stays on the same project as the original turn. + const topic = context.topicId + ? 
topicSelectors.getTopicById(context.topicId)(chatStore) + : undefined; + const agentWorkingDirectory = + agentByIdSelectors.getAgentWorkingDirectoryById(agentId)(getAgentStoreState()); + const workingDirectory = topic?.metadata?.workingDirectory || agentWorkingDirectory; + + // Drops the saved sessionId when its bound cwd disagrees with the current + // one — without this CC emits "No conversation found with session ID". + const { cwdChanged, resumeSessionId } = resolveHeteroResume(topic?.metadata, workingDirectory); + if (cwdChanged) antdMessage.info(t('heteroAgent.resumeReset.cwdChanged', { ns: 'chat' })); + + const assistantMsg = await messageService.createMessage({ + agentId, + content: LOADING_FLAT, + parentId: parentMessageId, + // External CLIs own model selection; persist only the runtime provider up + // front. The adapter backfills the actual model later if the CLI reports it. + provider: heterogeneousProvider.type, + role: 'assistant', + threadId: context.threadId ?? undefined, + topicId: context.topicId ?? undefined, + }); + + // Pull the new row into the store so the loading bubble is visible while + // the executor runs (the executor only dispatches updates, not creates). 
+ await chatStore.refreshMessages(); + + if (context.topicId) chatStore.internal_updateTopicLoading(context.topicId, true); + + const { operationId: heteroOpId } = chatStore.startOperation({ + context, + label: 'Heterogeneous Agent Execution', + metadata: { heterogeneousType: heterogeneousProvider.type }, + parentOperationId, + type: 'execHeterogeneousAgent', + }); + chatStore.associateMessageWithOperation(assistantMsg.id, heteroOpId); + + try { + const { executeHeterogeneousAgent } = + await import('@/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor'); + await executeHeterogeneousAgent(() => useChatStore.getState(), { + assistantMessageId: assistantMsg.id, + context, + heterogeneousProvider, + message: prompt, + operationId: heteroOpId, + resumeSessionId, + workingDirectory, + }); + } finally { + if (context.topicId) + useChatStore.getState().internal_updateTopicLoading(context.topicId, false); + } + + return assistantMsg.id; +}; + /** * Generation Actions * @@ -223,15 +315,11 @@ export const generationSlice: StateCreator< isGatewayMode: chatStore.isGatewayModeEnabled(), }); - // TODO(LOBE-8519 follow-up): continue is currently only wired for the client - // runtime. Gateway / hetero continue both fall through to the client path - // here; a proper implementation needs runtime-specific resume semantics. - if (runtimeType !== 'client') { - console.warn( - `[continueGenerationMessage] runtime=${runtimeType} not yet supported; ` + - 'falling through to client mode', - ); - } + // Hetero CLIs (CC / Codex) have no "continue a cut-off response" primitive + // — each prompt is a fresh user turn from their perspective. Bail out + // rather than synthesize a fake "please continue" turn that would pollute + // the session and confuse the model. The button is a no-op in this mode. 
+ if (runtimeType === 'hetero') return; // Create continue operation with ConversationStore context (includes groupId) const { operationId } = chatStore.startOperation({ @@ -240,7 +328,24 @@ export const generationSlice: StateCreator< }); try { - // Execute agent runtime with full context from ConversationStore + // ── Gateway mode: branch a server-side run from the cut-off message ── + // `parentMessageId` triggers `resume: true` on the router, so the server + // skips user-message creation and continues from the existing chain. + // Empty prompt is intentional and matches the approve/reject resume path. + if (runtimeType === 'gateway') { + await chatStore.executeGatewayAgent({ + context, + message: '', + onComplete: () => { + chatStore.completeOperation(operationId); + if (hooks.onContinueComplete) hooks.onContinueComplete(displayMessageId); + }, + parentMessageId: dbMessageId, + }); + return; + } + + // ── Client mode: run agent locally ── await chatStore.executeClientAgent({ context, messages: displayMessages, @@ -381,8 +486,9 @@ export const generationSlice: StateCreator< }); const agentConfig = agentSelectors.getAgentConfigById(context.agentId)(getAgentStoreState()); + const heterogeneousProvider = agentConfig?.agencyConfig?.heterogeneousProvider; const runtimeType = selectRuntimeType({ - heterogeneousProvider: agentConfig?.agencyConfig?.heterogeneousProvider, + heterogeneousProvider, isGatewayMode: chatStore.isGatewayModeEnabled(), }); @@ -405,13 +511,25 @@ export const generationSlice: StateCreator< return; } - // ── Client mode: run agent locally ── - // TODO(LOBE-8519 follow-up): hetero regenerate is not yet implemented and - // currently falls through to client mode (silently uses the agent's underlying - // LLM instead of routing back through the heterogeneous CLI). Implementing it - // requires the same persistence + executeHeterogeneousAgent setup as - // sendMessage's hetero branch. 
+ // ── Hetero mode: re-run the local CLI against the original user prompt ── + // Creates a fresh assistant row branched off the existing user message so + // the CC / Codex turn replaces the previous attempt without rewriting + // history, and resumes the same session id (when the cwd still matches) + // so prior context is preserved. + if (runtimeType === 'hetero' && heterogeneousProvider) { + await runHeterogeneousFromExistingMessage(chatStore, { + context, + heterogeneousProvider, + parentMessageId: messageId, + parentOperationId: operationId, + prompt: item.content, + }); + chatStore.completeOperation(operationId); + if (hooks.onRegenerateComplete) hooks.onRegenerateComplete(messageId); + return; + } + // ── Client mode: run agent locally ── // Execute agent runtime with full context from ConversationStore await chatStore.executeClientAgent({ context, diff --git a/src/server/routers/lambda/aiAgent.ts b/src/server/routers/lambda/aiAgent.ts index db90c5d95a..2d6b818004 100644 --- a/src/server/routers/lambda/aiAgent.ts +++ b/src/server/routers/lambda/aiAgent.ts @@ -357,6 +357,8 @@ const AgentStreamEventSchema = z.object({ 'tool_end', 'tool_execute', 'tool_result', + 'agent_intervention_request', + 'agent_intervention_response', 'step_start', 'step_complete', 'error', diff --git a/src/services/electron/heterogeneousAgent.ts b/src/services/electron/heterogeneousAgent.ts index 2a5ba1d5a7..560ce4708e 100644 --- a/src/services/electron/heterogeneousAgent.ts +++ b/src/services/electron/heterogeneousAgent.ts @@ -39,6 +39,21 @@ class HeterogeneousAgentService { async getSessionInfo(sessionId: string) { return this.ipc.heterogeneousAgent.getSessionInfo({ sessionId }); } + + /** + * Submit the user's answer (or cancellation) for a pending CC + * AskUserQuestion intervention. The main process routes it to the + * matching MCP bridge so the blocked tool handler can return to CC. 
+ */ + async submitIntervention(params: { + cancelReason?: 'timeout' | 'user_cancelled'; + cancelled?: boolean; + operationId: string; + result?: unknown; + toolCallId: string; + }) { + return this.ipc.heterogeneousAgent.submitIntervention(params); + } } export const heterogeneousAgentService = new HeterogeneousAgentService(); diff --git a/src/store/chat/slices/aiChat/actions/conversationControl.ts b/src/store/chat/slices/aiChat/actions/conversationControl.ts index 191c232a5e..a9ef1468cc 100644 --- a/src/store/chat/slices/aiChat/actions/conversationControl.ts +++ b/src/store/chat/slices/aiChat/actions/conversationControl.ts @@ -631,6 +631,179 @@ export class ConversationControlActionImpl { completeOperation(operationId); }; + /** + * Resolve a heterogeneous-runtime intervention (CC AskUserQuestion, …). + * + * Why this action exists separately from `submitToolInteraction`: + * - The CC subprocess is already running and blocked on an MCP call — + * we need to feed the answer back through the IPC bridge, not spawn + * a fresh `executeClientAgent` turn. + * - Once the answer ships, CC's existing stream emits `tool_result` and + * keeps going on its own; no synthetic user message, no new op. + * + * The framework's intervention surface still drives the UI: we just + * stamp `pluginIntervention.status` and the eventual `tool_result` + * content via the same optimistic primitives, so the InterventionBar / + * inline tool body update synchronously and the answered Render takes + * over once `pluginIntervention.status === 'approved' | 'rejected'`. 
+ * + * `actionType`: + * - `'submit'` → mark approved, ship `payload` as the answer + * - `'skip' | 'cancel'` → mark rejected, ship `cancelled: true` so the + * bridge resolves with `cancelReason` and CC sees an isError result + * (it'll fall back to plain-text questioning) + */ + submitHeteroIntervention = async ( + toolMessageId: string, + actionType: 'submit' | 'skip' | 'cancel', + payload?: Record, + context?: ConversationContext, + ): Promise => { + const toolMessage = dbMessageSelectors.getDbMessageById(toolMessageId)(this.#get()); + if (!toolMessage) return; + + const toolCallId = toolMessage.tool_call_id; + if (!toolCallId) { + console.warn('[submitHeteroIntervention] tool message has no tool_call_id', toolMessageId); + return; + } + + // Walk up to the assistant that owns this tool — its operation is the + // running CC stream we need to address. Falls through to the tool + // message id itself if a producer ever associated it directly. + const { messageOperationMap } = this.#get(); + const operationId = + (toolMessage.parentId && messageOperationMap?.[toolMessage.parentId]) ?? + messageOperationMap?.[toolMessageId]; + + if (!operationId) { + console.warn('[submitHeteroIntervention] no operationId for', toolMessageId); + return; + } + + const effectiveContext: ConversationContext = context ?? { + agentId: this.#get().activeAgentId, + topicId: this.#get().activeTopicId, + threadId: this.#get().activeThreadId, + }; + const optimisticContext: OptimisticUpdateContext = { operationId }; + + if (actionType === 'submit') { + await this.#get().optimisticUpdateMessagePlugin( + toolMessageId, + { intervention: { status: 'approved' } }, + optimisticContext, + ); + // Persist the structured `{ [questionText]: selectedLabel(s) }` answers + // to `pluginState.askUserAnswers` so the Render component can show + // Q&A pairs instead of parsing the bridge's prose `User answers:` + // dump out of `content`. Best-effort — never block the IPC submit. 
+ await this.setInterventionAnswers(toolMessageId, payload ?? {}, optimisticContext); + // Bridge formats its own "User answers:" string for CC, so the eventual + // tool_result overwrites this content. The optimistic write is just + // for the brief gap between Submit and CC echoing the result back. + const summary = `User submitted: ${JSON.stringify(payload ?? {})}`; + await this.#get().optimisticUpdateMessageContent( + toolMessageId, + summary, + undefined, + optimisticContext, + ); + } else { + const reason = actionType === 'skip' ? 'User skipped' : 'User cancelled'; + await this.#get().optimisticUpdateMessagePlugin( + toolMessageId, + { intervention: { rejectedReason: reason, status: 'rejected' } }, + optimisticContext, + ); + await this.#get().optimisticUpdateMessageContent( + toolMessageId, + `${reason} this interaction.`, + undefined, + optimisticContext, + ); + } + + // Forward to the producer (Electron main → bridge.resolve). Dynamic + // import keeps `@/services/electron/*` out of non-Electron bundles. + try { + const { heterogeneousAgentService } = await import('@/services/electron/heterogeneousAgent'); + await heterogeneousAgentService.submitIntervention( + actionType === 'submit' + ? { operationId, result: payload ?? {}, toolCallId } + : { + cancelReason: 'user_cancelled', + cancelled: true, + operationId, + toolCallId, + }, + ); + } catch (err) { + console.error('[submitHeteroIntervention] IPC submitIntervention failed:', err); + } + + void effectiveContext; + }; + + /** + * In-memory draft store for an intervention form. Backs the renderer's + * "remember what I'd partially answered" behaviour without paying for a + * DB round-trip on every keystroke; drafts only matter while the + * intervention is pending (5 min cap), and the canonical pluginState + * mirror is enough to survive HMR / panel re-mounts. 
+ * + * `askUserDraft` is irrelevant after submit (the form unmounts), so we + * don't bother clearing it; it stays buried under `askUserAnswers` in + * `pluginState` and never affects the completed Render. + */ + setInterventionDraft = (toolMessageId: string, draft: Record): void => { + this.#get().internal_dispatchMessage({ + id: toolMessageId, + key: 'askUserDraft', + type: 'updatePluginState', + value: draft, + }); + }; + + /** + * Persist the structured intervention answers (`{ questionText: + * selectedLabel | selectedLabel[] }`) to the tool message's + * `pluginState.askUserAnswers`. Drives structured Q&A rendering in the + * `Render` component without re-parsing the bridge's prose tool_result. + * + * Both writes are merge-style by key: the in-memory `updatePluginState` + * reducer (`message/reducer.ts:142`) and the DB + * `messageModel.updatePluginState` both shallow-merge, so co-existing keys + * (`askUserDraft` etc.) survive. The DB write is best-effort: a failure is + * logged and swallowed so the IPC submit that follows still runs. 
+ */ + setInterventionAnswers = async ( + toolMessageId: string, + answers: Record, + context?: OptimisticUpdateContext, + ): Promise => { + this.#get().internal_dispatchMessage( + { + id: toolMessageId, + key: 'askUserAnswers', + type: 'updatePluginState', + value: answers, + }, + context, + ); + try { + const { messageService } = await import('@/services/message'); + const ctx = this.#get().internal_getConversationContext(context); + await messageService.updateMessagePluginState( + toolMessageId, + { askUserAnswers: answers }, + ctx, + ); + } catch (err) { + console.warn('[setInterventionAnswers] persist failed:', err); + } + }; + rejectToolCalling = async ( messageId: string, reason?: string, diff --git a/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts b/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts index 4e6fa79a8b..1fbd4ed004 100644 --- a/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts +++ b/src/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor.ts @@ -1,4 +1,7 @@ -import type { AgentStreamEvent } from '@lobechat/agent-gateway-client'; +import type { + AgentInterventionRequestData, + AgentStreamEvent, +} from '@lobechat/agent-gateway-client'; import { isDesktop } from '@lobechat/const'; import { CLAUDE_CODE_CLI_INSTALL_DOCS_URL, @@ -1319,6 +1322,44 @@ export const executeHeterogeneousAgent = async ( // Record for debugging trace.push({ event, timestamp: Date.now() }); + // ─── agent_intervention_request: CC AskUserQuestion needs user input ─── + // Stamp the canonical `pluginIntervention.status='pending'` on the + // matching tool message via `optimisticUpdateMessagePlugin` — that + // single primitive (1) writes to DB, (2) updates the in-memory + // `dbMessagesMap` reducer, AND (3) mirrors the same intervention onto + // the parent assistant's `tools[].intervention` so both surfaces + // (inline tool body + bottom InterventionBar) light up immediately. 
+ // The Intervention component registered under + // `BuiltinToolInterventions['claude-code'][askUserQuestion]` is + // rendered automatically by the framework while pending; the + // eventual `tool_result` content (formatted answer text) gets + // overwritten via the existing `tool_result` branch below. + // Deferred behind `persistQueue` so it lands AFTER `persistToolBatch` + // populates `toolMsgIdByCallId`. + if (event.type === 'agent_intervention_request') { + const data = event.data as AgentInterventionRequestData; + persistQueue = persistQueue.then(async () => { + const toolMsgId = toolMsgIdByCallId.get(data.toolCallId); + if (!toolMsgId) { + console.warn( + '[HeterogeneousAgent] intervention_request for unknown toolCallId:', + data.toolCallId, + ); + return; + } + try { + await get().optimisticUpdateMessagePlugin( + toolMsgId, + { intervention: { status: 'pending' } }, + { operationId }, + ); + } catch (err) { + console.error('[HeterogeneousAgent] persist intervention pending failed:', err); + } + }); + return; + } + // ─── tool_result: update tool message content in DB (ACP-only) ─── if (event.type === 'tool_result') { const { content, isError, pluginState, toolCallId } = event.data as {