feat(hetero-agent): support AskUserQuestion tools for claude code (#14639)

*  feat(hetero-agent): AskUserQuestion MCP server + bridge skeleton (LOBE-8725 step 1+2)

Foundation for LOBE-8725 — interactive AskUserQuestion via local MCP. CC's
built-in tool short-circuits in `-p` mode, so we host an in-process MCP
server that exposes an equivalent `ask_user_question` tool. The handler
blocks until the consumer submits an answer (or the 5min deadline / op
shutdown fires), surfacing a structured `agent_intervention_request` /
`agent_intervention_response` round-trip on the existing event stream.

Added in this commit:

- `packages/heterogeneous-agents/src/askUser/`
  - `AskUserBridge` — per-op pending map with timeout / cancel / progress
    keepalive support; emits an async-iterable of outbound events
  - `AskUserMcpServer` — process-wide HTTP/Streamable MCP server,
    `?op=<id>` query routes via `AsyncLocalStorage` →
    `onsessioninitialized` → sessionId↔opId map; tool handler hands off
    to the matching bridge and pumps `notifications/progress` back to CC
    every 30s as wire-level keepalive (required for >5min waits, see
    spike notes)
  - `constants.ts` — shared tool/server names + the stable `apiName`
    the adapter rewrites to
  - Unit tests cover bridge lifecycle (resolve / cancel / timeout /
    progress / event stream) and an end-to-end MCP probe via
    `StreamableHTTPClientTransport`

- `packages/agent-gateway-client/src/types.ts` — wire-level
  `agent_intervention_request` / `agent_intervention_response` event
  variants + payload interfaces. Re-exported through the package barrel.

- `packages/heterogeneous-agents/src/adapters/claudeCode.ts` — when CC's
  `tool_use` carries `mcp__lobe_cc__ask_user_question`, the adapter
  rewrites `apiName` to `askUserQuestion` so the renderer routes on a
  clean domain key. Identifier stays `claude-code`. Applied to both the
  main-agent and subagent paths for symmetry (subagent ask isn't
  expected today, but doesn't hurt).

- `src/server/routers/lambda/aiAgent.ts` — Zod input schema for
  `aiAgent.heteroIngest` extended with the two new event types so the
  CLI sandbox can forward them through the server.

No producer wiring yet — Steps 3-5 plug this into Electron main, the
renderer executor, and the new UI.

*  feat(hetero-agent): wire AskUserQuestion MCP into Electron CC driver (LOBE-8725 step 3)

Plug the Step 1 skeleton (`AskUserMcpServer` + `AskUserBridge`) into the
desktop Claude Code spawn path. CC's local MCP `ask_user_question` tool now
goes live during real prompts; renderer-submitted answers route back via
new IPC.

Changes
- `apps/desktop/src/main/modules/heterogeneousAgent/types.ts` — add
  optional `mcpConfigPath` to `HeterogeneousAgentBuildPlanParams` so
  controller-managed temp configs flow into the driver.
- `apps/desktop/src/main/modules/heterogeneousAgent/drivers/claudeCode.ts`
  — append `--mcp-config <path>` when provided. Disallowed-tools pin
  stays so CC's built-in AskUserQuestion remains off (avoids double-
  registration of the same tool name).
- `apps/desktop/src/main/controllers/HeterogeneousAgentCtr.ts`
  - Lazy-singleton `AskUserMcpServer` started on first claude-code prompt
    (de-duped concurrent first-callers via in-flight promise).
  - Per-op `setupInterventionForOp(opId, sessionId)`: registers an
    `AskUserBridge`, writes `os.tmpdir()/lobe-cc-mcp-<opId>.json` with
    `alwaysLoad: true` so CC eager-loads the tool (1-hop call, no
    ToolSearch detour — see LOBE-8725 spike), pumps `bridge.events()`
    into the existing `heteroAgentEvent` broadcast.
  - Cleanup paths: exit handler `await intervention.cleanup()` settles
    pending MCP handlers + unlinks the temp config; pre-spawn errors
    short-circuit the same cleanup so we don't leak bridges on
    `buildSpawnPlan` / trace-session failures.
  - `before-quit` stops the MCP server (in addition to killing CC
    processes).
  - New `@IpcMethod() submitIntervention({ operationId, toolCallId,
    result?, cancelled?, cancelReason? })` — renderer side will dispatch
    answers / cancellations through this in Step 4/5.
  - codex unchanged — bridge setup is gated on `agentType === 'claude-code'`.
- `src/services/electron/heterogeneousAgent.ts` — renderer-side proxy
  for `submitIntervention`.
- New `claudeCode.test.ts` covers the four driver-arg paths
  (`--mcp-config` presence, ordering vs `--resume`, AskUserQuestion stay
  disallowed). Existing 28 controller tests still pass.

What still doesn't run end-to-end
- The renderer `heteroExecutor` doesn't consume `agent_intervention_request`
  yet — events go through the broadcast but the chat store ignores them.
- No UI to render the intervention card or to call `submitIntervention`.
Both lands in Steps 4/5 next.

*  feat(hetero-agent): correlate intervention with tool message + renderer handler (LOBE-8725 step 3.5+4)

Bridge now uses the caller-supplied toolCallId (CC's `claudecode/toolUseId`
from MCP `_meta`) instead of a random UUID, so the
`agent_intervention_request` event references the same id as the existing
tool message on the renderer side.

Renderer-side `heteroExecutor` learns the new event:

- Added `persistInterventionRequest(...)` next to `persistToolResult` —
  stamps `pluginState.askUserQuestion` (apiName + identifier + questions
  parsed from `arguments` + deadline + status='pending' + toolCallId)
  onto the matching tool message via `messageService.updateToolMessage`.
- New branch in `handleStreamEvent` for `'agent_intervention_request'`:
  defers behind `persistQueue` (so it lands AFTER `persistToolBatch`
  populates `toolMsgIdByCallId`), then mirrors the same pluginState onto
  the in-memory message via `internal_dispatchMessage` so the UI lights
  up immediately — no fetchAndReplaceMessages round-trip needed.
- The eventual `tool_result` for the same toolCallId hits the existing
  `tool_result` branch unchanged: it overwrites `pluginState` with
  whatever the result carries (typically undefined for our MCP tool, so
  `pluginState.askUserQuestion` clears and the intervention UI yields to
  the regular Render).

Bridge tests cover the new contract:
- caller-supplied toolCallId becomes the wire correlation key
- duplicate-toolCallId pendings reject loudly so two-handler clobbers
  surface immediately

153 package tests + 1167 desktop main tests + 51 hetero executor tests
still green; type-check clean.

*  feat(claude-code): AskUserQuestion intervention render component (LOBE-8725 step 5)

Dedicated Render for the synthetic `askUserQuestion` apiName the adapter
rewrites the local MCP `mcp__lobe_cc__ask_user_question` tool to. Lives
under CC's render registry so the existing chat tool-detail flow picks
it up automatically — no changes to the conversation framework.

- New `AskUserQuestionItem` / `AskUserQuestionArgs` /
  `AskUserQuestionPluginState` types (mirrors CC's own
  AskUserQuestion schema verbatim).
- `ClaudeCodeApiName` gains an `AskUserQuestion = 'askUserQuestion'`
  member so the renders / inspectors / streamings registries can key
  off the same enum value.
- `client/Render/AskUserQuestion/index.tsx` is the component:
  - `pluginState.askUserQuestion?.status === 'pending'` → renders the
    questions form (Select for single-select, CheckboxGroup for
    multi-select), a 5-min countdown ticking once a second, Submit /
    Skip buttons. Reads `operationId` via `messageOperationMap` so we
    can route through `heterogeneousAgentService.submitIntervention`.
  - Otherwise → renders the questions as muted captions plus the
    final answer text from `content`. Surfaces a warning when the
    tool_result was an error (timeout / cancelled / session ended).
  - Submit button stays disabled until every question has a
    selection; Skip always enabled (sends `cancelled: true`).
- `ClaudeCodeRenders[ClaudeCodeApiName.AskUserQuestion]` registers
  the new component.

What this does NOT do
- Doesn't touch `BuiltinToolInterventions` — the form is rendered
  inside the regular tool body (Render slot), not the canonical
  intervention slot. Cleanest for now: the framework intervention
  flow assumes `submitToolInteraction` store actions, which would
  fight our IPC path. We can refactor onto that surface later if
  CC grows additional interactions (approval, file picker).
- Doesn't translate strings — i18n in a follow-up.

Type-check clean. Step 6 (real desktop e2e via CC) is next.

*  feat(claude-code): render AskUserQuestion form during pending state (LOBE-8725 step 5 follow-up)

Step 5 registered the Render component but stopped at the registry — the
chat tool-detail still returned the loading placeholder while
`isToolCalling` was true, so users only ever saw a spinner during the 5
min intervention window.

Detect `pluginState.askUserQuestion?.status === 'pending'` (only set on
CC + apiName=askUserQuestion tool messages) and route to the registered
builtin Render inline before the placeholder branch. Once the
intervention resolves, the eventual `tool_result` clears
`pluginState.askUserQuestion` and the regular Render takes over.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

*  feat(hetero-agent): wire regenerate / continue for hetero runtime (LOBE-8519 follow-up)

LOBE-8519 left two TODOs in `generationSlice` where hetero runtime
silently fell through to client mode — regenerate would secretly hit the
agent's underlying LLM, and continue would synthesize a fake "please
continue" turn that confuses CC / Codex.

- regenerateMessage: re-create the assistant row branched off the same
  user message, resolve resume sessionId (drop on cwd mismatch), then
  spawn a child `execHeterogeneousAgent` op so Stop only kills the
  executor, not the parent regenerate op. Mirrors sendMessage's hetero
  branch.
- continueGenerationMessage: hetero CLIs have no continue primitive —
  each prompt is a fresh user turn — so bail out instead of polluting
  the session.
- continueGenerationMessage: gateway mode now branches a server-side
  resume run instead of falling through to client.

Surfaced while testing CC AskUserQuestion end-to-end on the
LOBE-8725 branch (regenerating after an answered question went through
the wrong runtime).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(local-testing): electron-dev.sh boots on macOS bash 3.2

Two bugs surfaced when invoking the local-testing helper from a fresh
session on macOS:

- `find_project_pids` / `do_stop` end with `grep -v '^$'` whose exit
  code propagates through `pipefail`. With `set -e`, an empty pid set
  silently kills the whole script — `do_start` reported success, no
  Electron, no error. Trail with `|| true`.
- `setsid` is GNU coreutils, not on macOS. Fall back to plain `bash -c`;
  process-tree teardown still works because `expand_descendants` walks
  the tree directly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(hetero-agent): per-session MCP transport for sequential ops (LOBE-8725)

`AskUserMcpServer` shared a single `StreamableHTTPServerTransport` across
every CC subprocess. The SDK transport latches `_initialized=true`
after the first `initialize`, so the second op's CC subprocess sees
`Invalid Request: Server already initialized` (400) and reports the
`lobe_cc` server as `failed`. From the model's POV the MCP tool is
absent — it falls back to ToolSearch, can't find anything, and
verbalizes the question instead.

Refactor to the canonical multi-tenant pattern: one transport + one
`McpServer` per session, looked up by the SDK-managed `mcp-session-id`
header. New transports are minted on the first POST without a session
id (must be an `initialize` request); subsequent requests route via
the stored map; `onsessionclosed` cleans up.

The first run of any process still works as before — this only matters
once a second op spins up. Added a 3-op sequential regression test
that fails on the old single-transport implementation and passes now.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ♻️ refactor(claude-code): move AskUserQuestion onto canonical Intervention surface (LOBE-8725)

Step 5's first cut shoehorned the pending form into the Render slot and
drove submit/skip with a custom `pluginState.askUserQuestion.status`
field, which forced three layers of glue:

- `Tool/Detail` had to bypass the loading placeholder via an
  identifier+apiName hardcode so the form would surface during
  `isToolCalling`
- The executor had to `messageService.getMessages → replaceMessages`
  after `agent_intervention_request` to drag the freshly-created tool
  row into in-memory state (the framework's own `tool_end →
  fetchAndReplaceMessages` only fires after the user answers)
- The executor also had to `associateMessageWithOperation` for the tool
  row so the form could look up the running CC op for IPC

All three were patches around skipping the canonical surface. This
commit moves AskUserQuestion onto `pluginIntervention.status='pending'`
and the `BuiltinToolInterventions` registry, which the framework
already drives end-to-end:

- `packages/builtin-tool-claude-code/src/client/Intervention/AskUserQuestion.tsx`
  — pure form, no IPC, no store reads. Resolves through the standard
  `onInteractionAction({type:'submit'|'skip'|'cancel'})` callback.
- `Render/AskUserQuestion` shrinks to the answered/aborted view only;
  the framework hides Render while pending, so no status switching.
- New `Inspector/AskUserQuestion` shows a compact "askUserQuestion · {header}"
  chip in the inline tool body, matching the rest of CC's tools.
- Registries: `ClaudeCodeInspectors`, `ClaudeCodeRenders`, and the new
  `ClaudeCodeInterventions` all key off `ClaudeCodeApiName.AskUserQuestion`;
  `BuiltinToolInterventions` gains a `[ClaudeCodeIdentifier]` entry.

Hetero needs a different action handler than `submitToolInteraction`
(which spawns `executeClientAgent` — wrong for a CC subprocess that's
already blocked on an MCP call). Two thin pieces wire that:

- `submitHeteroIntervention` (chat store) — sets
  `pluginIntervention` via `optimisticUpdateMessagePlugin` (which
  already syncs DB + in-memory + parent-assistant `tools[].intervention`
  in one shot), then forwards the answer through
  `heterogeneousAgentService.submitIntervention` IPC. Operation lookup
  walks the tool message's `parentId` to hit the assistant's
  `messageOperationMap` entry — drops the explicit
  `associateMessageWithOperation` call from the executor.
- `customInteractionHandlers.isHeteroInteractionIdentifier` flags
  `ClaudeCodeIdentifier`; `Tool/Detail/Intervention` short-circuits
  there before reaching the existing `submitToolInteraction` path.

Executor change collapses to one line:
`optimisticUpdateMessagePlugin(toolMsgId, { intervention: { status: 'pending' } })`.
The post-intervention refresh, the associate call, and the
`persistInterventionRequest` helper all go away.

Removed:
- `AskUserQuestionPluginState` type (custom field is gone)
- `Tool/Detail` `askUserPending` inline-render branch
- Executor `messageService.getMessages + replaceMessages` round-trip
- Executor `associateMessageWithOperation` for tool rows
- `persistInterventionRequest` helper

Verified end-to-end against a real CC subprocess on desktop:
- Inline body shows the new Inspector chip; pending form lives in the
  bottom InterventionBar (canonical surface)
- Submit ships answer through MCP, CC continues with structured result
- Skip flips status to `rejected`, framework's RejectedResponse
  shows "User skipped"; CC receives isError and falls back to text
- `mcp_servers.lobe_cc.status === 'connected'` on a 3rd sequential op
  (the per-session transport fix from the previous commit)
- `alwaysLoad: true` still produces 1-hop calls (no ToolSearch hop)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 💄 style(claude-code): inline numbered option cards for AskUserQuestion intervention (LOBE-8725)

Select dropdown was the wrong primitive — it hides options behind an extra
click and doesn't read like a question to answer. CC's underlying tool is
1-4 questions × 2-4 options, so the whole option set always fits inline.

- Each option renders as a clickable card: numbered chip (1/2/3/4) +
  bold label + secondary description on a single row. Hover tints the
  background; selected state lights up `colorPrimary` on both the chip
  and the card outline so the pick is unmistakable at a glance.
- Multi-select (`q.multiSelect`) toggles instead of replacing, with a
  "(multi-select)" hint in the question header.
- Multi-question support gets a proper visual hierarchy: each question
  past the first sits below a dashed divider, headed by a `Q1/N` tag
  + the original `q.header` chip. The `Q*/N` lets the user track
  progress without counting.
- Inspector picks up the question count too: now shows
  "askUserQuestion · {first header} +N" when multiple are queued.

Verified end-to-end on desktop with a CC-driven 2-question prompt
(4-option + 3-option). Both selections feed back to CC as a single
"User answers" payload, CC echoes both picks in its continuation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

*  feat(claude-code): tabbed multi-question + draft + timeout fallback for AskUserQuestion (LOBE-8725)

- Multi-question forms now use a top tab strip; single question renders inline.
- Picking a single-select option auto-advances to the next unanswered question.
- Drafts persist to tool message `pluginState.askUserDraft` so picks survive
  remount / HMR; new `setInterventionDraft` action on the chat store dispatches
  the pluginState patch.
- Timeout fallback: when the 5-min countdown expires, auto-submit option 1 for
  every unanswered question instead of letting the bridge time out into a
  cancelled isError — model gets a structured answer it can act on.
- Visual: selected option now uses filled `colorPrimaryBg` + right-aligned
  check icon; index chip stays neutral.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(hetero-agent): synchronously unlink temp mcp.json on app quit (LOBE-8725)

The async exit-handler cleanup raced Electron's main-process teardown and
left `lobe-cc-mcp-<opId>.json` files in `os.tmpdir()` after every quit. Sync
unlink in the quit hook is the only reliable guarantee.

Also handle SIGTERM / SIGINT — `before-quit` only fires on user-driven Cmd+Q
or `app.quit()`, not on external kills (test harness, OS shutdown).

Verified by manual test: pending askUserQuestion forms now leave zero
residue after both Cmd+Q and SIGTERM paths.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

*  feat(claude-code): persist structured AskUserQuestion answers + Q&A render (LOBE-8725)

Submit now writes the structured `{ questionText: pickedLabel(s) }` payload
to the tool message's `pluginState.askUserAnswers` (in-memory + DB merge), so
Render no longer has to scrape the bridge's prose `User answers:` content.

Render shows one Q&A block per question — header + question + a checkmark
card per picked option (multi-select fans out into multiple rows). Falls
back to a `—` placeholder when answers are missing (older messages or
skipped flows), and keeps the existing `pluginError` warning for cancel /
no-answer paths.

Also surfaces the answers in the Skill state inspector tab, which was
previously empty for completed askUserQuestion messages.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

*  test(hetero-agent): cover synchronous quit cleanup of AskUserQuestion temp configs (LOBE-8725)

Locks down the regression fixed in c0de0cdb7c — async exit-handler cleanup
losing to Electron's main-process teardown. Four cases: `before-quit`
(Cmd+Q / `app.quit()` path), `SIGTERM` (test harness / OS shutdown),
`SIGINT` (Ctrl-C), and idempotency (already-deleted temp file must not
throw on the second pass).

`process.on` and `process.exit` are stubbed in the signal-path tests so the
controller's listener attaches to a spy, not the test runner's process —
otherwise we'd leak a real SIGTERM listener every test.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Arvin Xu
2026-05-11 02:16:24 +08:00
committed by GitHub
parent ccc8ee1315
commit 5f24d179d4
34 changed files with 2910 additions and 52 deletions
@@ -76,7 +76,9 @@ find_project_pids() {
port_pid=$(lsof -ti tcp:"$CDP_PORT" -sTCP:LISTEN 2>/dev/null || true)
pids="$pids $port_pid"
echo "$pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' '
# `|| true` because `grep -v '^$'` exits 1 when input has no non-empty
# lines, which (with pipefail + set -e) silently kills the caller.
echo "$pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' ' || true
}
# Wait for the CDP HTTP endpoint to respond, with a deadline + early bail-out
@@ -146,7 +148,7 @@ do_stop() {
for pid in $seed_pids; do
all_pids="$all_pids $(expand_descendants "$pid")"
done
all_pids=$(echo "$all_pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' ')
all_pids=$(echo "$all_pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' ' || true)
if [ -z "$all_pids" ]; then
echo "[electron-dev] No project Electron/vite processes found."
@@ -270,10 +272,17 @@ do_start() {
# Launch in a new session (setsid) so the whole process tree shares a PGID
# we can later signal in one shot. `setsid bash -c '... exec ...' &` keeps
# the bash shell as the session leader; its PID is what we save.
setsid bash -c "
# macOS doesn't ship setsid by default — fall back to plain bash; cleanup
# still works via `expand_descendants` walking the process tree.
local launch_cmd="
cd '$PROJECT_ROOT/apps/desktop'
exec npx electron-vite dev -- --remote-debugging-port=$CDP_PORT
" >> "$ELECTRON_LOG" 2>&1 < /dev/null &
"
if command -v setsid >/dev/null 2>&1; then
setsid bash -c "$launch_cmd" >> "$ELECTRON_LOG" 2>&1 < /dev/null &
else
bash -c "$launch_cmd" >> "$ELECTRON_LOG" 2>&1 < /dev/null &
fi
local launcher_pid=$!
echo "$launcher_pid" > "$PIDFILE"
echo "[electron-dev] Launcher PID (session leader): $launcher_pid"
@@ -1,7 +1,9 @@
import type { ChildProcess } from 'node:child_process';
import { spawn } from 'node:child_process';
import { randomUUID } from 'node:crypto';
import { access, appendFile, mkdir, writeFile } from 'node:fs/promises';
import { unlinkSync } from 'node:fs';
import { access, appendFile, mkdir, unlink, writeFile } from 'node:fs/promises';
import os from 'node:os';
import path from 'node:path';
import type { Readable, Writable } from 'node:stream';
import { finished as streamFinished } from 'node:stream/promises';
@@ -14,6 +16,8 @@ import {
CODEX_CLI_INSTALL_DOCS_URL,
HeterogeneousAgentSessionErrorCode,
} from '@lobechat/electron-client-ipc';
import type { AskUserBridge } from '@lobechat/heterogeneous-agents/askUser';
import { AskUserMcpServer } from '@lobechat/heterogeneous-agents/askUser';
import type { AgentContentBlock } from '@lobechat/heterogeneous-agents/spawn';
import {
AgentStreamPipeline,
@@ -99,6 +103,18 @@ interface CancelSessionParams {
sessionId: string;
}
interface SubmitInterventionParams {
cancelled?: boolean;
/** When set, signals user-cancelled or timeout — the bridge resolves with isError. */
cancelReason?: 'timeout' | 'user_cancelled';
/** Operation id stamped on the request the renderer is responding to. */
operationId: string;
/** Structured user answer; ignored when `cancelled` is true. */
result?: unknown;
/** Correlation key carried on the original `agent_intervention_request`. */
toolCallId: string;
}
interface StopSessionParams {
sessionId: string;
}
@@ -150,10 +166,28 @@ interface CliTraceSession {
*
* Lifecycle: startSession → sendPrompt → (heteroAgentEvent broadcasts) → stopSession
*/
interface InterventionSlot {
bridge: AskUserBridge;
/** Resolves once bridge.events() iterator ends (after `cancelAll`). */
pumpDone: Promise<void>;
/** Path to the per-op temp `mcp.json` we wrote for `--mcp-config`. */
tmpConfigPath: string;
}
export default class HeterogeneousAgentCtr extends ControllerModule {
static override readonly groupName = 'heterogeneousAgent';
private sessions = new Map<string, AgentSession>();
/**
* Per-operation AskUserQuestion bridge state. Keyed by `operationId` so the
* `submitIntervention` IPC can route an answer to the right pending MCP
* handler regardless of which `sessionId` it belongs to (one session can
* fire many ops over its lifetime).
*/
private opIdToIntervention = new Map<string, InterventionSlot>();
/** Lazy single MCP server, started on first claude-code prompt. */
private askUserMcpServer?: AskUserMcpServer;
private askUserMcpStartPromise?: Promise<AskUserMcpServer>;
private resolveSessionCommand(session: AgentSession): string {
const resolvedCommand = session.command.trim();
@@ -567,6 +601,92 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
}
}
// ─── AskUserQuestion MCP server (LOBE-8725) ───
/**
* Lazy single-instance MCP server for CC's AskUserQuestion replacement.
* First claude-code prompt triggers `start()`; subsequent prompts reuse
* the same listener. Concurrent first-callers de-dupe via the in-flight
* promise so we don't bind two ports.
*/
private async ensureAskUserMcpServerStarted(): Promise<AskUserMcpServer> {
if (this.askUserMcpServer) return this.askUserMcpServer;
if (!this.askUserMcpStartPromise) {
this.askUserMcpStartPromise = (async () => {
const server = new AskUserMcpServer();
await server.start();
this.askUserMcpServer = server;
logger.info('AskUserQuestion MCP server started:', server.url);
return server;
})().catch((err) => {
// Reset so a later sendPrompt can retry; surface the error.
this.askUserMcpStartPromise = undefined;
logger.error('Failed to start AskUserQuestion MCP server:', err);
throw err;
});
}
return this.askUserMcpStartPromise;
}
/**
* Register a per-op AskUserQuestion bridge, write its temp `mcp.json`,
* and start pumping the bridge's outbound events into the regular
* `heteroAgentEvent` broadcast. Caller must invoke the returned cleanup
* after the spawn finishes (success, error, or cancel) to remove the
* temp file and tear down the bridge.
*
* Pump errors are logged but never thrown — they don't fail the spawn.
*/
private async setupInterventionForOp(
operationId: string,
sessionId: string,
): Promise<{ cleanup: () => Promise<void>; tmpConfigPath: string }> {
const server = await this.ensureAskUserMcpServerStarted();
const bridge = server.registerOperation(operationId);
const tmpConfigPath = path.join(os.tmpdir(), `lobe-cc-mcp-${operationId}.json`);
// `alwaysLoad: true` is the undocumented CC flag that promotes our
// server's tool out of the deferred set so the model calls it directly
// (no ToolSearch hop). See LOBE-8725 spike notes — falls back to the
// 2-hop ToolSearch path if a future CC drops the flag, no breakage.
const config = {
mcpServers: {
lobe_cc: {
alwaysLoad: true,
type: 'http' as const,
url: server.urlForOperation(operationId),
},
},
};
await writeFile(tmpConfigPath, JSON.stringify(config), 'utf8');
// Pump bridge.events() into the `heteroAgentEvent` broadcast. The
// iterator only ends after `cancelAll()`, so `pumpDone` resolves at
// cleanup time and gates teardown.
const pumpDone = (async () => {
for await (const event of bridge.events()) {
this.broadcast('heteroAgentEvent', { event, sessionId });
}
})().catch((err) => {
logger.warn('AskUserQuestion bridge pump error:', err);
});
this.opIdToIntervention.set(operationId, { bridge, pumpDone, tmpConfigPath });
const cleanup = async () => {
// Unregistering on the server cancels all bridge pendings AND closes
// the events iterator (cancelAll fires from within unregisterOperation).
this.askUserMcpServer?.unregisterOperation(operationId);
await pumpDone;
this.opIdToIntervention.delete(operationId);
await unlink(tmpConfigPath).catch(() => {
/* file may already be gone if app crashed mid-prompt */
});
};
return { cleanup, tmpConfigPath };
}
// ─── File cache ───
private get fileCacheDir(): string {
@@ -697,32 +817,58 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
throw new Error(preflightError.message);
}
const driver = getHeterogeneousAgentDriver(session.agentType);
const spawnPlan = await driver.buildSpawnPlan({
args: session.args,
helpers: {
buildClaudeStreamJsonInput: (prompt, imageList) =>
this.buildStreamJsonInput(prompt, imageList),
resolveCliImagePaths: (imageList) => this.resolveCliImagePaths(imageList),
},
imageList: params.imageList ?? [],
prompt: params.prompt,
resumeSessionId: session.agentSessionId,
});
// Stand up the AskUserQuestion MCP bridge for claude-code prompts BEFORE
// building the spawn plan so the driver can wire the temp config path
// into `--mcp-config`. Codex / future agents skip this entirely.
const intervention =
session.agentType === 'claude-code'
? await this.setupInterventionForOp(params.operationId, session.sessionId).catch((err) => {
logger.warn('Failed to set up AskUserQuestion bridge — proceeding without it:', err);
return undefined;
})
: undefined;
let spawnPlan;
let traceSession;
let cwd: string;
try {
const driver = getHeterogeneousAgentDriver(session.agentType);
spawnPlan = await driver.buildSpawnPlan({
args: session.args,
helpers: {
buildClaudeStreamJsonInput: (prompt, imageList) =>
this.buildStreamJsonInput(prompt, imageList),
resolveCliImagePaths: (imageList) => this.resolveCliImagePaths(imageList),
},
imageList: params.imageList ?? [],
mcpConfigPath: intervention?.tmpConfigPath,
prompt: params.prompt,
resumeSessionId: session.agentSessionId,
});
// Fall back to the user's Desktop so the process never inherits
// the Electron parent's cwd (which is `/` when launched from Finder).
cwd = session.cwd || electronApp.getPath('desktop');
traceSession = await this.createCliTraceSession({
cliArgs: spawnPlan.args,
cwd,
imageList: params.imageList ?? [],
session,
stdinPayload: spawnPlan.stdinPayload,
});
} catch (err) {
// We never made it to spawn — the `proc.on('exit')` cleanup path
// won't run, so tear the intervention bridge down right here.
if (intervention) {
await intervention.cleanup().catch((cleanupErr) => {
logger.warn('AskUserQuestion cleanup error during pre-spawn failure:', cleanupErr);
});
}
throw err;
}
const useStdin = spawnPlan.stdinPayload !== undefined;
const cliArgs = spawnPlan.args;
// Fall back to the user's Desktop so the process never inherits
// the Electron parent's cwd (which is `/` when launched from Finder).
const cwd = session.cwd || electronApp.getPath('desktop');
const traceSession = await this.createCliTraceSession({
cliArgs,
cwd,
imageList: params.imageList ?? [],
session,
stdinPayload: spawnPlan.stdinPayload,
});
return new Promise<void>((resolve, reject) => {
logger.info('Spawning agent:', session.command, cliArgs.join(' '), `(cwd: ${cwd})`);
@@ -838,6 +984,15 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
void stdoutDrained
.then(() => stdoutBroadcastQueue)
.finally(async () => {
// Tear down the AskUserQuestion bridge / temp `mcp.json` for this
// op. Pending MCP handlers get a `session_ended` cancellation so
// they return cleanly even if CC was killed mid-tool-call.
if (intervention) {
await intervention.cleanup().catch((err) => {
logger.warn('AskUserQuestion cleanup error:', err);
});
}
void this.writeCliTraceJson(traceSession, 'exit.json', {
code,
finishedAt: new Date().toISOString(),
@@ -972,10 +1127,54 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
}
/**
* Cleanup on app quit.
* Renderer → main: deliver the user's answer to a pending CC AskUserQuestion
* (or signal cancellation). The matching bridge resolves its blocked
* `pending()` Promise, the local MCP handler returns to CC, and CC's
* `tool_result` flows back through the normal stream pipeline.
*
* Idempotent — late submissions for already-resolved tool calls are no-ops.
* No-op when called for an unknown opId; the bridge may have been cleaned
* up already (op finished / cancelled).
*/
@IpcMethod()
async submitIntervention(params: SubmitInterventionParams): Promise<void> {
const slot = this.opIdToIntervention.get(params.operationId);
if (!slot) {
logger.warn('submitIntervention: no active intervention for operationId', params.operationId);
return;
}
slot.bridge.resolve(params.toolCallId, {
cancelReason: params.cancelled ? (params.cancelReason ?? 'user_cancelled') : undefined,
cancelled: params.cancelled,
result: params.result,
});
}
/**
* Synchronously unlink every pending intervention's temp `mcp.json`. The
* async exit-handler cleanup loses to Electron's main-process teardown
* often enough that we'd leak `lobe-cc-mcp-<opId>.json` files into
* `os.tmpdir()` on real shutdowns; sync unlink here is the only reliable
* guarantee. Safe to call multiple times.
*/
private unlinkPendingInterventionConfigsSync = (): void => {
for (const [, intervention] of this.opIdToIntervention) {
try {
unlinkSync(intervention.tmpConfigPath);
} catch {
/* file may already be gone — fine */
}
}
};
/**
* Cleanup on app quit. `before-quit` covers the user-driven Cmd+Q /
* `app.quit()` path; SIGTERM / SIGINT cover external kills (test
* harnesses, OS shutdown) where Electron's lifecycle events never fire.
*/
afterAppReady() {
electronApp.on('before-quit', () => {
this.unlinkPendingInterventionConfigsSync();
for (const [, session] of this.sessions) {
if (session.process && !session.process.killed) {
session.cancelledByUs = true;
@@ -983,6 +1182,28 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
}
}
this.sessions.clear();
// The exit handlers will tear each per-op intervention down, but if
// CC's stdio close races shutdown we'd leave the MCP server bound to
// a port. Stopping it here cancels every still-pending bridge with
// `session_ended` and closes the listener.
void this.askUserMcpServer?.stop().catch((err) => {
logger.warn('AskUserQuestion MCP server stop error:', err);
});
});
const onSignal = (signal: NodeJS.Signals) => {
this.unlinkPendingInterventionConfigsSync();
// Defer to Electron's normal quit flow so the rest of the app gets a
// chance to tear down. The `before-quit` handler above is idempotent.
try {
electronApp.quit();
} catch {
/* during late shutdown app.quit may throw — fine */
}
// Last-resort exit if Electron is wedged and won't quit on its own.
setTimeout(() => process.exit(signal === 'SIGINT' ? 130 : 143), 1000).unref();
};
process.on('SIGTERM', onSignal);
process.on('SIGINT', onSignal);
}
}
@@ -802,4 +802,131 @@ describe('HeterogeneousAgentCtr', () => {
expect(toolEnds.length).toBeGreaterThan(0);
});
});
describe('app-quit cleanup of AskUserQuestion temp configs (LOBE-8725)', () => {
// The async exit-handler cleanup races Electron's main-process teardown
// and used to leak `lobe-cc-mcp-<opId>.json` files in `os.tmpdir()` on
// every quit. The controller now unlinks pending intervention temp
// configs *synchronously* from `before-quit` AND from process signal
// handlers (SIGTERM / SIGINT — `before-quit` doesn't fire on external
// kills). These tests exercise both paths against real files.
/**
* Drop a temp `lobe-cc-mcp-<id>.json` and stash it on the controller's
* `opIdToIntervention` map under the same key, so the quit hook treats
* it like a real pending intervention and tries to unlink it.
*/
const seedPendingIntervention = async (ctr: HeterogeneousAgentCtr, opId: string) => {
const tmpConfigPath = path.join(tmpdir(), `lobe-cc-mcp-test-${opId}.json`);
await writeFile(tmpConfigPath, '{"mcpServers":{}}');
const slot = {
bridge: {} as any,
pumpDone: Promise.resolve(),
tmpConfigPath,
};
(ctr as any).opIdToIntervention.set(opId, slot);
return tmpConfigPath;
};
const captureRegisteredHandler = (
registerSpy: ReturnType<typeof vi.fn> | ReturnType<typeof vi.spyOn>,
eventName: string,
): (() => void) => {
const calls = (registerSpy as any).mock.calls as Array<[string, () => void]>;
const match = calls.findLast(([evt]) => evt === eventName);
if (!match) throw new Error(`no handler registered for "${eventName}"`);
return match[1];
};
it('before-quit synchronously unlinks every pending intervention temp config', async () => {
const electron = (await import('electron')) as any;
electron.app.on.mockClear();
const ctr = new HeterogeneousAgentCtr({
appStoragePath,
storeManager: { get: vi.fn() },
} as any);
const fileA = await seedPendingIntervention(ctr, 'opA');
const fileB = await seedPendingIntervention(ctr, 'opB');
ctr.afterAppReady();
const beforeQuit = captureRegisteredHandler(electron.app.on, 'before-quit');
beforeQuit();
await expect(access(fileA)).rejects.toThrow();
await expect(access(fileB)).rejects.toThrow();
});
it('SIGTERM handler unlinks pending intervention temp configs (external-kill path)', async () => {
// External kills (test harness, OS shutdown) skip Electron's lifecycle
// events entirely — `before-quit` never fires, so the controller has to
// hook the raw process signal too. Stub `process.on` so the handler is
// *recorded* but never actually attached to the test runner's process
// (otherwise the test leaks a SIGTERM listener that survives the test).
// Same for `process.exit` — the controller's fail-safe shouldn't get a
// chance to actually exit the worker if its `setTimeout(...).unref()`
// ever fires before mockRestore.
const electron = (await import('electron')) as any;
electron.app.on.mockClear();
const processOnSpy = vi.spyOn(process, 'on').mockImplementation(() => process);
const processExitSpy = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never);
const ctr = new HeterogeneousAgentCtr({
appStoragePath,
storeManager: { get: vi.fn() },
} as any);
const file = await seedPendingIntervention(ctr, 'opSigterm');
ctr.afterAppReady();
const sigterm = captureRegisteredHandler(processOnSpy, 'SIGTERM');
sigterm();
await expect(access(file)).rejects.toThrow();
processOnSpy.mockRestore();
processExitSpy.mockRestore();
});
it('SIGINT handler unlinks pending intervention temp configs (Ctrl-C path)', async () => {
const electron = (await import('electron')) as any;
electron.app.on.mockClear();
const processOnSpy = vi.spyOn(process, 'on').mockImplementation(() => process);
const processExitSpy = vi.spyOn(process, 'exit').mockImplementation(() => undefined as never);
const ctr = new HeterogeneousAgentCtr({
appStoragePath,
storeManager: { get: vi.fn() },
} as any);
const file = await seedPendingIntervention(ctr, 'opSigint');
ctr.afterAppReady();
const sigint = captureRegisteredHandler(processOnSpy, 'SIGINT');
sigint();
await expect(access(file)).rejects.toThrow();
processOnSpy.mockRestore();
processExitSpy.mockRestore();
});
it('cleanup is idempotent — already-deleted files do not throw', async () => {
const electron = (await import('electron')) as any;
electron.app.on.mockClear();
const ctr = new HeterogeneousAgentCtr({
appStoragePath,
storeManager: { get: vi.fn() },
} as any);
const file = await seedPendingIntervention(ctr, 'opIdempotent');
// Pre-delete the file out from under the controller — simulates a
// partial cleanup race where the async exit handler beat us to it.
await unlink(file);
ctr.afterAppReady();
const beforeQuit = captureRegisteredHandler(electron.app.on, 'before-quit');
expect(() => beforeQuit()).not.toThrow();
});
});
});
@@ -0,0 +1,62 @@
import { describe, expect, it } from 'vitest';
import type {
HeterogeneousAgentBuildPlanHelpers,
HeterogeneousAgentBuildPlanParams,
} from '../types';
import { claudeCodeDriver } from './claudeCode';
const stubHelpers: HeterogeneousAgentBuildPlanHelpers = {
buildClaudeStreamJsonInput: async () => '{"type":"user","message":{}}\n',
resolveCliImagePaths: async () => [],
};
const buildParams = (
overrides: Partial<HeterogeneousAgentBuildPlanParams> = {},
): HeterogeneousAgentBuildPlanParams => ({
args: [],
helpers: stubHelpers,
imageList: [],
prompt: 'hi',
...overrides,
});
describe('claudeCodeDriver', () => {
it('omits --mcp-config when mcpConfigPath is undefined', async () => {
const { args } = await claudeCodeDriver.buildSpawnPlan(buildParams());
expect(args).not.toContain('--mcp-config');
});
it('appends --mcp-config <path> when mcpConfigPath is provided', async () => {
const { args } = await claudeCodeDriver.buildSpawnPlan(
buildParams({ mcpConfigPath: '/tmp/lobe-cc-mcp-op-1.json' }),
);
const idx = args.indexOf('--mcp-config');
expect(idx).toBeGreaterThan(-1);
expect(args[idx + 1]).toBe('/tmp/lobe-cc-mcp-op-1.json');
});
it('still pins --disallowedTools AskUserQuestion alongside --mcp-config', async () => {
// Even with our local MCP replacement available, CC's built-in stays
// disabled — leaving both visible would let the model double-register
// the same name and pick the broken one.
const { args } = await claudeCodeDriver.buildSpawnPlan(
buildParams({ mcpConfigPath: '/tmp/x.json' }),
);
const disallowedIdx = args.indexOf('--disallowedTools');
expect(disallowedIdx).toBeGreaterThan(-1);
expect(args[disallowedIdx + 1]).toBe('AskUserQuestion');
});
it('--mcp-config goes before --resume so user --args can still override the resume id', async () => {
const { args } = await claudeCodeDriver.buildSpawnPlan(
buildParams({ mcpConfigPath: '/tmp/x.json', resumeSessionId: 'cc-prev-1' }),
);
const mcpIdx = args.indexOf('--mcp-config');
const resumeIdx = args.indexOf('--resume');
expect(mcpIdx).toBeGreaterThan(-1);
expect(resumeIdx).toBeGreaterThan(-1);
expect(mcpIdx).toBeLessThan(resumeIdx);
expect(args[resumeIdx + 1]).toBe('cc-prev-1');
});
});
@@ -18,6 +18,7 @@ export const claudeCodeDriver: HeterogeneousAgentDriver = {
args,
helpers,
imageList,
mcpConfigPath,
prompt,
resumeSessionId,
}: HeterogeneousAgentBuildPlanParams) {
@@ -26,6 +27,10 @@ export const claudeCodeDriver: HeterogeneousAgentDriver = {
return {
args: [
...DESKTOP_CLAUDE_CODE_ARGS,
// Wire the controller-managed temp mcp.json (AskUserQuestion server,
// see LOBE-8725) when present. Path-based config is required — CC
// does not accept inline JSON for `--mcp-config`.
...(mcpConfigPath ? ['--mcp-config', mcpConfigPath] : []),
...(resumeSessionId ? ['--resume', resumeSessionId] : []),
...args,
],
@@ -20,6 +20,12 @@ export interface HeterogeneousAgentBuildPlanParams {
args: string[];
helpers: HeterogeneousAgentBuildPlanHelpers;
imageList: HeterogeneousAgentImageAttachment[];
/**
* Optional path to an MCP config JSON written by the controller (e.g. for
* the local `lobe_cc` AskUserQuestion server). Drivers that recognize the
* field append `--mcp-config <path>`; others ignore it.
*/
mcpConfigPath?: string;
prompt: string;
resumeSessionId?: string;
}
@@ -1,5 +1,7 @@
export { AgentStreamClient } from './client';
export type {
AgentInterventionRequestData,
AgentInterventionResponseData,
AgentStreamClientEvents,
AgentStreamClientOptions,
AgentStreamEvent,
@@ -16,6 +16,23 @@ export type AgentStreamEventType =
* wire union so consumers can pattern-match without casting.
*/
| 'tool_result'
/**
* Producer needs structured input from the user mid-run (e.g. CC's
* AskUserQuestion delivered via a local MCP server). Distinct from
* `tool_execute` — that one means "client, please run this tool"; this
* one means "user, please answer these questions". Renderer surfaces a
* dedicated UI; the producer's MCP handler stays pending until the
* paired `agent_intervention_response` resolves it (or the deadline
* passes / the op is cancelled).
*/
| 'agent_intervention_request'
/**
* The user's answer to a prior `agent_intervention_request`. Flows back
* to the producer (Electron main → MCP handler resolve, sandbox →
* server bus → CLI). Carries either a structured `result` or a
* cancellation marker.
*/
| 'agent_intervention_response'
| 'step_start'
| 'step_complete'
| 'error';
@@ -78,6 +95,39 @@ export interface StepCompleteData {
reasonDetail?: string;
}
/**
* Producer → consumer: structured-input request the user must answer
* directly (no tool execution involved). The producer's tool handler stays
* blocked until a matching `agent_intervention_response` (correlated by
* `toolCallId`) flows back, or the `deadline` is reached.
*/
export interface AgentInterventionRequestData {
/** Tool API name (e.g. `'askUserQuestion'`). */
apiName: string;
/** JSON-encoded payload the UI renders (e.g. `{ questions: [...] }`). */
arguments: string;
/** Unix-ms wall-clock at which the producer will give up waiting. */
deadline: number;
/** Tool plugin identifier (e.g. `'claude-code'`). */
identifier: string;
/** Correlation key. Stable for the lifetime of the intervention. */
toolCallId: string;
}
/**
* Consumer → producer: the user's answer to a prior intervention request.
* Either `result` (success) or `cancelled: true` (timeout / user cancel).
*/
export interface AgentInterventionResponseData {
/** Set when the user cancelled or the deadline elapsed. */
cancelled?: boolean;
/** When `cancelled`, optional reason for telemetry/logging. */
cancelReason?: 'timeout' | 'user_cancelled' | 'session_ended';
/** User-supplied answer (JSON-serializable). Absent when cancelled. */
result?: unknown;
toolCallId: string;
}
/**
* Server → Client: request the client to execute a tool locally and return the result.
*/
@@ -0,0 +1,60 @@
'use client';
import { inspectorTextStyles, shinyTextStyles } from '@lobechat/shared-tool-ui/styles';
import type { BuiltinInspectorProps } from '@lobechat/types';
import { createStaticStyles, cx } from 'antd-style';
import { memo } from 'react';
import { useTranslation } from 'react-i18next';
import { type AskUserQuestionArgs, ClaudeCodeApiName } from '../../types';
const styles = createStaticStyles(({ css, cssVar }) => ({
chip: css`
overflow: hidden;
min-width: 0;
margin-inline-start: 6px;
padding-block: 2px;
padding-inline: 10px;
border-radius: 999px;
font-size: 12px;
color: ${cssVar.colorText};
text-overflow: ellipsis;
white-space: nowrap;
background: ${cssVar.colorFillTertiary};
`,
}));
export const AskUserQuestionInspector = memo<BuiltinInspectorProps<AskUserQuestionArgs>>(
({ args, partialArgs, isArgumentsStreaming, isLoading }) => {
const { t } = useTranslation('plugin');
const label = t(ClaudeCodeApiName.AskUserQuestion as any);
const questions = args?.questions ?? partialArgs?.questions ?? [];
const summary =
questions.length === 0
? undefined
: questions.length === 1
? questions[0]?.header || questions[0]?.question
: `${questions[0]?.header || questions[0]?.question} +${questions.length - 1}`;
if (isArgumentsStreaming && !summary) {
return <div className={cx(inspectorTextStyles.root, shinyTextStyles.shinyText)}>{label}</div>;
}
return (
<div
className={cx(
inspectorTextStyles.root,
(isArgumentsStreaming || isLoading) && shinyTextStyles.shinyText,
)}
>
<span>{label}</span>
{summary && <span className={styles.chip}>{summary}</span>}
</div>
);
},
);
AskUserQuestionInspector.displayName = 'ClaudeCodeAskUserQuestionInspector';
@@ -8,6 +8,7 @@ import {
import { ClaudeCodeApiName } from '../../types';
import { AgentInspector } from './Agent';
import { AskUserQuestionInspector } from './AskUserQuestion';
import { EditInspector } from './Edit';
import { ReadInspector } from './Read';
import { ScheduleWakeupInspector } from './ScheduleWakeup';
@@ -28,6 +29,7 @@ import { WriteInspector } from './Write';
// state for diff stats), so they live in their own sibling files.
export const ClaudeCodeInspectors = {
[ClaudeCodeApiName.Agent]: AgentInspector,
[ClaudeCodeApiName.AskUserQuestion]: AskUserQuestionInspector,
[ClaudeCodeApiName.Bash]: createRunCommandInspector(ClaudeCodeApiName.Bash),
[ClaudeCodeApiName.Edit]: EditInspector,
[ClaudeCodeApiName.Glob]: createGlobLocalFilesInspector(ClaudeCodeApiName.Glob),
@@ -0,0 +1,388 @@
'use client';
import type { BuiltinInterventionProps } from '@lobechat/types';
import { Button, Flexbox, Icon, Tabs, Text } from '@lobehub/ui';
import { createStaticStyles, cx } from 'antd-style';
import { Check, Send, X } from 'lucide-react';
import { memo, useCallback, useEffect, useMemo, useState } from 'react';
import { useConversationStore } from '@/features/Conversation/store';
import { dataSelectors } from '@/features/Conversation/store/slices/data/selectors';
import { useChatStore } from '@/store/chat';
import type { AskUserQuestionArgs, AskUserQuestionItem } from '../../types';
/**
* Server-side bridge timeout (matches `AskUserMcpServer.pendingTimeoutMs`).
* Not strictly synchronized — server is authoritative — but keeps the on-screen
* countdown close to reality without plumbing a deadline through every layer.
*/
const COUNTDOWN_MS = 5 * 60 * 1000;
/** Key under tool message `pluginState` where in-progress draft answers live. */
const DRAFT_PLUGIN_STATE_KEY = 'askUserDraft';
const formatRemaining = (msLeft: number): string => {
const totalSec = Math.max(0, Math.floor(msLeft / 1000));
const min = Math.floor(totalSec / 60);
const sec = totalSec % 60;
return `${min}:${sec.toString().padStart(2, '0')}`;
};
const styles = createStaticStyles(({ css, cssVar }) => ({
// Card sits inline with the chat — no surrounding panel chrome. Hover
// tints the row so the stack reads as clickable; selection swaps to a
// filled `colorPrimaryBg` so the pick is visually weighty.
option: css`
cursor: pointer;
padding-block: 10px;
padding-inline: 12px;
border-radius: 8px;
transition: background 0.12s ease;
&:hover {
background: ${cssVar.colorFillQuaternary};
}
`,
optionCheck: css`
flex-shrink: 0;
color: ${cssVar.colorPrimary};
`,
optionDescription: css`
font-size: 12px;
line-height: 1.45;
color: ${cssVar.colorTextSecondary};
`,
// Neutral 1/2/3/4 chip — stays the same colour whether selected or not so
// the selection signal lives on the filled background + checkmark.
optionIndex: css`
flex-shrink: 0;
box-sizing: border-box;
width: 22px;
height: 22px;
border-radius: 6px;
font-family: ${cssVar.fontFamilyCode};
font-size: 12px;
font-weight: 600;
line-height: 22px;
color: ${cssVar.colorTextSecondary};
text-align: center;
background: ${cssVar.colorFillTertiary};
`,
optionLabel: css`
font-weight: 500;
`,
optionSelected: css`
background: ${cssVar.colorPrimaryBg};
&:hover {
background: ${cssVar.colorPrimaryBgHover};
}
`,
}));
interface OptionCardProps {
description?: string;
disabled?: boolean;
index: number;
label: string;
onToggle: () => void;
selected: boolean;
}
/**
* One numbered option in a question. Outlined when picked, neutral otherwise;
* a right-side checkmark seals the selection so the state reads cleanly even
* with the number chip kept neutral.
*/
const OptionCard = memo<OptionCardProps>(
({ index, label, description, selected, disabled, onToggle }) => (
<Flexbox
horizontal
align="center"
aria-selected={selected}
className={cx(styles.option, selected && styles.optionSelected)}
gap={12}
role="option"
onClick={() => {
if (!disabled) onToggle();
}}
>
<span className={styles.optionIndex}>{index}</span>
<Flexbox flex={1} gap={2}>
<Text className={styles.optionLabel}>{label}</Text>
{description && <span className={styles.optionDescription}>{description}</span>}
</Flexbox>
{selected && <Icon className={styles.optionCheck} icon={Check} size={16} />}
</Flexbox>
),
);
OptionCard.displayName = 'CCAskUserQuestionOption';
interface QuestionPanelProps {
answer: string | string[] | undefined;
disabled: boolean;
onToggle: (q: AskUserQuestionItem, label: string) => void;
question: AskUserQuestionItem;
}
const QuestionPanel = memo<QuestionPanelProps>(({ question, answer, disabled, onToggle }) => {
const isOptionSelected = (label: string): boolean =>
question.multiSelect ? Array.isArray(answer) && answer.includes(label) : answer === label;
return (
<Flexbox gap={10}>
<Flexbox horizontal align="center" gap={8}>
{question.header && <Text type="secondary">{question.header}</Text>}
{question.multiSelect && (
<Text fontSize={12} type="secondary">
(multi-select)
</Text>
)}
</Flexbox>
<Text strong>{question.question}</Text>
<Flexbox gap={4} role="listbox">
{question.options.map((opt, optIdx) => (
<OptionCard
description={opt.description}
disabled={disabled}
index={optIdx + 1}
key={opt.label}
label={opt.label}
selected={isOptionSelected(opt.label)}
onToggle={() => onToggle(question, opt.label)}
/>
))}
</Flexbox>
</Flexbox>
);
});
QuestionPanel.displayName = 'CCAskUserQuestionPanel';
/**
* CC AskUserQuestion intervention component.
*
* Pure form — `onInteractionAction` ({type:'submit'|'skip'}) is the only
* outbound side effect. The framework's `handleInteractionAction` (or the
* hetero branch the chat conversation wires up) is responsible for marking
* `pluginIntervention.status` and forwarding the answer to CC over IPC.
*
* Layout
* - One question → renders the question + options directly, no tab strip.
* - Multiple questions → top tab bar (Q1, Q2, …), one panel visible at a
* time. Picking an answer auto-advances to the next unanswered question
* so the user sweeps through without re-clicking the tabs.
*
* Draft persistence
* - Per-message state lives on the tool message's `pluginState.askUserDraft`
* (see `setInterventionDraft` in the chat store). HMR reloads, store
* re-mounts, and tab switches all keep the partial answers around — only
* a fresh `tool_use` (different toolCallId / messageId) starts blank.
*/
const AskUserQuestionIntervention = memo<BuiltinInterventionProps<AskUserQuestionArgs>>(
({ args, messageId, onInteractionAction }) => {
const questions = args?.questions ?? [];
// Persisted draft (survives unmount / HMR / refresh) — read from the tool
// message's pluginState so the form stays where the user left it.
const persistedDraft = useConversationStore((s) => {
const msg = dataSelectors.getDbMessageById(messageId)(s);
return (
msg?.pluginState as { [DRAFT_PLUGIN_STATE_KEY]?: Record<string, string | string[]> }
)?.[DRAFT_PLUGIN_STATE_KEY];
});
const setInterventionDraft = useChatStore((s) => s.setInterventionDraft);
const [answers, setAnswers] = useState<Record<string, string | string[]>>(
() => persistedDraft ?? {},
);
const [submitting, setSubmitting] = useState(false);
const [activeTab, setActiveTab] = useState<string>(() => {
// Resume on the first unanswered question so coming back lands the user
// where they left off rather than always at Q1.
const initial = persistedDraft ?? {};
const firstUnanswered = questions.findIndex((q) => {
const a = initial[q.question];
return q.multiSelect ? !Array.isArray(a) || a.length === 0 : !a;
});
const idx = firstUnanswered >= 0 ? firstUnanswered : 0;
return String(idx);
});
// Mounted-time deadline; server has its own clock and will return
// isError if it expires first. Drift of a few seconds is fine.
const deadline = useMemo(() => Date.now() + COUNTDOWN_MS, []);
const [now, setNow] = useState(() => Date.now());
useEffect(() => {
const id = setInterval(() => setNow(Date.now()), 1000);
return () => clearInterval(id);
}, []);
const expired = now >= deadline;
const handleToggle = useCallback(
(q: AskUserQuestionItem, label: string) => {
setAnswers((prev) => {
let next: Record<string, string | string[]>;
if (q.multiSelect) {
const current = (prev[q.question] as string[] | undefined) ?? [];
const updated = current.includes(label)
? current.filter((x) => x !== label)
: [...current, label];
next = { ...prev, [q.question]: updated };
} else {
next = { ...prev, [q.question]: label };
}
// Persist to pluginState so the picks survive remount / refresh.
setInterventionDraft(messageId, next);
// Single-select auto-advance: if there's a next unanswered question,
// jump to it. Multi-select stays on the same panel so the user can
// toggle additional options.
if (!q.multiSelect && questions.length > 1) {
const nextUnanswered = questions.findIndex((qq, idx) => {
if (qq.question === q.question) return false;
const a = next[qq.question];
if (idx < 0) return false;
return qq.multiSelect ? !Array.isArray(a) || a.length === 0 : !a;
});
if (nextUnanswered >= 0) setActiveTab(String(nextUnanswered));
}
return next;
});
},
[messageId, questions, setInterventionDraft],
);
/**
* Submit `payload` exactly as given. Used by both the explicit "Submit"
* button (with whatever the user picked) and the timeout fallback (with
* option 1 of each unanswered question merged in).
*/
const submitWith = useCallback(
async (payload: Record<string, string | string[]>) => {
if (!onInteractionAction || submitting) return;
setSubmitting(true);
try {
await onInteractionAction({ payload, type: 'submit' });
} catch (err) {
console.error('[AskUserQuestion] submit failed:', err);
setSubmitting(false);
}
},
[onInteractionAction, submitting],
);
const handleSubmit = useCallback(() => submitWith(answers), [answers, submitWith]);
const handleSkip = useCallback(async () => {
if (!onInteractionAction || submitting) return;
setSubmitting(true);
try {
await onInteractionAction({ type: 'skip' });
} catch (err) {
console.error('[AskUserQuestion] skip failed:', err);
setSubmitting(false);
}
}, [onInteractionAction, submitting]);
const allAnswered = useMemo(
() =>
questions.every((q) => {
const a = answers[q.question];
return q.multiSelect ? Array.isArray(a) && a.length > 0 : !!a;
}),
[answers, questions],
);
// Timeout fallback: when the countdown hits zero and the user hasn't
// submitted, fill in option 1 of each unanswered question and submit.
// Beats letting the server-side bridge time out into a `cancelled`
// result — the model gets a structured answer it can act on instead of
// a "user didn't respond" isError. Single-shot via `submitting` guard.
useEffect(() => {
if (!expired || submitting || questions.length === 0) return;
const fallback: Record<string, string | string[]> = { ...answers };
for (const q of questions) {
const a = fallback[q.question];
const unanswered = q.multiSelect ? !Array.isArray(a) || a.length === 0 : !a;
if (unanswered && q.options.length > 0) {
const first = q.options[0].label;
fallback[q.question] = q.multiSelect ? [first] : first;
}
}
void submitWith(fallback);
}, [expired, submitting, questions, answers, submitWith]);
const isMulti = questions.length > 1;
const activeQuestion = questions[Number(activeTab)] ?? questions[0];
return (
<Flexbox gap={12}>
{isMulti && (
<Tabs
compact
activeKey={activeTab}
items={questions.map((q, idx) => {
const a = answers[q.question];
const done = q.multiSelect ? Array.isArray(a) && a.length > 0 : !!a;
return {
key: String(idx),
label: (
<Flexbox horizontal align="center" gap={6}>
<Text>Q{idx + 1}</Text>
{done && <Icon icon={Check} size={12} />}
</Flexbox>
),
};
})}
onChange={(key) => setActiveTab(key as string)}
/>
)}
{activeQuestion && (
<QuestionPanel
answer={answers[activeQuestion.question]}
disabled={expired || submitting}
question={activeQuestion}
onToggle={handleToggle}
/>
)}
<Flexbox horizontal align="center" gap={8} justify="space-between">
<Text fontSize={12} type="secondary">
{expired
? 'Time expired — using option 1 of each question.'
: `Time remaining: ${formatRemaining(deadline - now)} · ` +
'unanswered questions default to option 1 on timeout.'}
</Text>
<Flexbox horizontal gap={8}>
<Button disabled={submitting} icon={X} onClick={handleSkip}>
Skip
</Button>
<Button
disabled={!allAnswered || expired || submitting}
icon={Send}
loading={submitting}
type="primary"
onClick={handleSubmit}
>
Submit
</Button>
</Flexbox>
</Flexbox>
</Flexbox>
);
},
);
AskUserQuestionIntervention.displayName = 'CCAskUserQuestionIntervention';
export default AskUserQuestionIntervention;
@@ -0,0 +1,14 @@
import type { BuiltinIntervention } from '@lobechat/types';
import { ClaudeCodeApiName } from '../../types';
import AskUserQuestionIntervention from './AskUserQuestion';
/**
* Claude Code Intervention components.
*
* Currently only `askUserQuestion` (CC's clarifying-question MCP tool) needs
* one. Approval / file-picker etc. would slot in alongside.
*/
export const ClaudeCodeInterventions: Record<string, BuiltinIntervention> = {
[ClaudeCodeApiName.AskUserQuestion]: AskUserQuestionIntervention as BuiltinIntervention,
};
@@ -0,0 +1,131 @@
'use client';
import type { BuiltinRenderProps } from '@lobechat/types';
import { Flexbox, Icon, Text } from '@lobehub/ui';
import { createStaticStyles, cx } from 'antd-style';
import { Check } from 'lucide-react';
import { memo } from 'react';
import type { AskUserQuestionArgs, AskUserQuestionItem } from '../../../types';
/** Persisted draft + answer shape stored on `pluginState`. */
interface AskUserQuestionState {
askUserAnswers?: Record<string, string | string[]>;
askUserDraft?: Record<string, string | string[]>;
}
const styles = createStaticStyles(({ css, cssVar }) => ({
answer: css`
color: ${cssVar.colorText};
`,
answerRow: css`
padding-block: 6px;
padding-inline: 10px;
border-radius: 6px;
background: ${cssVar.colorBgContainer};
`,
check: css`
flex-shrink: 0;
color: ${cssVar.colorPrimary};
`,
container: css`
padding: 12px;
border-radius: ${cssVar.borderRadiusLG};
background: ${cssVar.colorFillQuaternary};
`,
header: css`
font-size: 12px;
color: ${cssVar.colorTextSecondary};
`,
question: css`
font-weight: 500;
`,
}));
interface QABlockProps {
answer?: string | string[];
question: AskUserQuestionItem;
}
/**
* One question/answer pair for the completed Render. The original question
* stays visible (header + body); the answer renders as one card per picked
* option (multi-select fans out into multiple rows). When `answer` is
* absent — older messages persisted before LOBE-8725 added structured
* storage — we show a `—` placeholder so the layout stays uniform.
*/
const QABlock = memo<QABlockProps>(({ question, answer }) => {
const labels: string[] = Array.isArray(answer) ? answer : answer ? [answer] : [];
const optionByLabel = new Map(question.options.map((o) => [o.label, o]));
return (
<Flexbox gap={6}>
{question.header && <span className={styles.header}>{question.header}</span>}
<Text className={styles.question}>{question.question}</Text>
{labels.length > 0 ? (
<Flexbox gap={4}>
{labels.map((label) => {
const opt = optionByLabel.get(label);
return (
<Flexbox
horizontal
align="center"
className={cx(styles.answerRow)}
gap={8}
key={label}
>
<Icon className={styles.check} icon={Check} size={14} />
<Flexbox flex={1} gap={2}>
<Text className={styles.answer}>{label}</Text>
{opt?.description && opt.description !== label && (
<span className={styles.header}>{opt.description}</span>
)}
</Flexbox>
</Flexbox>
);
})}
</Flexbox>
) : (
<Text type="secondary"></Text>
)}
</Flexbox>
);
});
QABlock.displayName = 'CCAskUserQuestionQABlock';
/**
* CC `askUserQuestion` Render — answered / aborted state only.
*
* The pending form lives on the canonical Intervention surface
* (`BuiltinToolInterventions['claude-code']['askUserQuestion']`) — the
* framework hides this Render while `pluginIntervention.status === 'pending'`,
* then yields to it once the user submits / skips and a `tool_result` arrives.
*
* Structured rendering reads `pluginState.askUserAnswers`, written by
* `setInterventionAnswers` in `conversationControl` at submit time. If the
* key is missing (older messages, or skipped/cancelled flows where there's
* nothing to show), we fall back to the question list with a status hint.
*/
const AskUserQuestion = memo<
BuiltinRenderProps<AskUserQuestionArgs, AskUserQuestionState, unknown>
>(({ args, pluginError, pluginState }) => {
const questions = args?.questions ?? [];
const answers = pluginState?.askUserAnswers;
const isError = !!pluginError;
return (
<Flexbox className={styles.container} gap={12}>
{questions.map((q, idx) => (
<QABlock answer={answers?.[q.question]} key={`${q.question}-${idx}`} question={q} />
))}
{isError && (
<Text type="warning">(No answer received model continued without their input.)</Text>
)}
</Flexbox>
);
});
AskUserQuestion.displayName = 'CCAskUserQuestion';
export default AskUserQuestion;
@@ -3,6 +3,7 @@ import type { RenderDisplayControl } from '@lobechat/types';
import { ClaudeCodeApiName } from '../../types';
import Agent from './Agent';
import AskUserQuestion from './AskUserQuestion';
import Edit from './Edit';
import Glob from './Glob';
import Grep from './Grep';
@@ -19,6 +20,7 @@ import Write from './Write';
*/
export const ClaudeCodeRenders = {
[ClaudeCodeApiName.Agent]: Agent,
[ClaudeCodeApiName.AskUserQuestion]: AskUserQuestion,
// RunCommand already renders `args.command` + combined output the way CC emits —
// use the shared component directly instead of wrapping it in a re-export file.
[ClaudeCodeApiName.Bash]: RunCommandRender,
@@ -1,5 +1,6 @@
export { ClaudeCodeApiName, ClaudeCodeIdentifier } from '../types';
export { ClaudeCodeInspectors } from './Inspector';
export { ClaudeCodeInterventions } from './Intervention';
export { ClaudeCodeRenderDisplayControls, ClaudeCodeRenders } from './Render';
export { ClaudeCodeStreamings } from './Streaming';
export { CC_SUBAGENT_TYPES, type CCSubagentTypeInfo, resolveCCSubagentType } from './subagentTypes';
@@ -1,5 +1,8 @@
export {
type AgentArgs,
type AskUserQuestionArgs,
type AskUserQuestionItem,
type AskUserQuestionOption,
ClaudeCodeApiName,
ClaudeCodeIdentifier,
type ClaudeCodeTodoItem,
@@ -23,6 +23,13 @@ export enum ClaudeCodeApiName {
* different concept.
*/
Agent = 'Agent',
/**
* Synthetic apiName the adapter rewrites the local
* `mcp__lobe_cc__ask_user_question` MCP tool to. Routes the dedicated
* intervention UI for CC's clarifying-question flow (LOBE-8725); not
* something CC's CLI emits directly.
*/
AskUserQuestion = 'askUserQuestion',
Bash = 'Bash',
Edit = 'Edit',
Glob = 'Glob',
@@ -117,3 +124,31 @@ export interface TaskStopArgs {
shell_id?: string;
task_id?: string;
}
/**
* One option on an AskUserQuestion question — `label` is what the user picks,
* `description` is the supporting text shown alongside.
*/
export interface AskUserQuestionOption {
description: string;
label: string;
}
/**
* One question in an `AskUserQuestion` invocation — header is short (≤12
* chars per CC's contract), `options` is 2-4 entries, `multiSelect` is opt-in.
*/
export interface AskUserQuestionItem {
header: string;
multiSelect?: boolean;
options: AskUserQuestionOption[];
question: string;
}
/**
* `AskUserQuestion` tool arguments — mirrors CC's own schema verbatim so the
* model's existing prompts work unchanged. 1-4 questions per call.
*/
export interface AskUserQuestionArgs {
questions: AskUserQuestionItem[];
}
@@ -2,6 +2,10 @@ import {
AgentBuilderInterventions,
AgentBuilderManifest,
} from '@lobechat/builtin-tool-agent-builder/client';
import {
ClaudeCodeIdentifier,
ClaudeCodeInterventions,
} from '@lobechat/builtin-tool-claude-code/client';
import { CloudSandboxManifest } from '@lobechat/builtin-tool-cloud-sandbox';
import { CloudSandboxInterventions } from '@lobechat/builtin-tool-cloud-sandbox/client';
import {
@@ -37,6 +41,7 @@ import { type BuiltinIntervention } from '@lobechat/types';
export const BuiltinToolInterventions: Record<string, Record<string, any>> = {
[AgentBuilderManifest.identifier]: AgentBuilderInterventions,
[AgentMarketplaceManifest.identifier]: AgentMarketplaceInterventions,
[ClaudeCodeIdentifier]: ClaudeCodeInterventions,
[CloudSandboxManifest.identifier]: CloudSandboxInterventions,
[GroupManagementManifest.identifier]: GroupManagementInterventions,
[GTDManifest.identifier]: GTDInterventions,
+4 -1
View File
@@ -4,6 +4,7 @@
"private": true,
"exports": {
".": "./src/index.ts",
"./askUser": "./src/askUser/index.ts",
"./client": "./src/client/index.ts",
"./spawn": "./src/spawn/index.ts"
},
@@ -15,7 +16,9 @@
"dependencies": {
"@lobechat/agent-gateway-client": "workspace:*",
"@lobehub/icons": "^5.4.0",
"diff": "^8.0.4"
"@modelcontextprotocol/sdk": "^1.26.0",
"diff": "^8.0.4",
"zod": "^3.25.76"
},
"devDependencies": {
"@lobechat/types": "workspace:*"
@@ -158,6 +158,53 @@ describe('ClaudeCodeAdapter', () => {
const toolStart = events.find((e) => e.type === 'tool_start');
expect(toolStart).toBeDefined();
});
it('rewrites mcp__lobe_cc__ask_user_question to apiName=askUserQuestion', () => {
const adapter = new ClaudeCodeAdapter();
adapter.adapt({ subtype: 'init', type: 'system' });
const askInput = {
questions: [
{
header: 'Color',
options: [
{ description: 'Red', label: 'Red' },
{ description: 'Blue', label: 'Blue' },
],
question: 'Pick a color?',
},
],
};
const events = adapter.adapt({
message: {
id: 'msg_1',
content: [
{
id: 'tu_aq_1',
input: askInput,
name: 'mcp__lobe_cc__ask_user_question',
type: 'tool_use',
},
],
},
type: 'assistant',
});
const chunk = events.find(
(e) => e.type === 'stream_chunk' && e.data.chunkType === 'tools_calling',
);
expect(chunk!.data.toolsCalling).toEqual([
{
// Wire-prefixed name is rewritten to the stable domain key.
apiName: 'askUserQuestion',
arguments: JSON.stringify(askInput),
id: 'tu_aq_1',
identifier: 'claude-code',
type: 'default',
},
]);
});
});
describe('tool_result in user events', () => {
@@ -61,6 +61,22 @@ import type {
*/
const CC_TODO_WRITE_TOOL_NAME = 'TodoWrite';
/**
* Tool name CC sees for the LobeHub-hosted MCP `ask_user_question` server.
* Source of truth lives in `../askUser/constants.ts`; replicated here as a
* literal so the adapter compiles in browser bundles without dragging in
* any of the askUser package's runtime (node:http, MCP SDK, etc.) by
* accident. Keep in sync.
*/
const ASK_USER_MCP_TOOL_NAME = 'mcp__lobe_cc__ask_user_question';
/**
* apiName the adapter rewrites the MCP tool to so the renderer routes on
* a stable key, not the wire-prefixed MCP name. Source of truth same as
* above.
*/
const ASK_USER_API_NAME = 'askUserQuestion';
/** Status of a single todo item in CC's `TodoWrite` tool_use. */
type ClaudeCodeTodoStatus = 'pending' | 'in_progress' | 'completed';
@@ -414,8 +430,13 @@ export class ClaudeCodeAdapter implements AgentEventAdapter {
break;
}
case 'tool_use': {
// Rewrite our local MCP `ask_user_question` tool to a stable
// apiName so the renderer routes on `askUserQuestion` (clean,
// domain-named) instead of the wire-prefixed MCP form. Identifier
// stays `claude-code` because this remains a CC-side tool.
const apiName = block.name === ASK_USER_MCP_TOOL_NAME ? ASK_USER_API_NAME : block.name;
newToolCalls.push({
apiName: block.name,
apiName,
arguments: JSON.stringify(block.input || {}),
id: block.id,
identifier: 'claude-code',
@@ -507,8 +528,13 @@ export class ClaudeCodeAdapter implements AgentEventAdapter {
break;
}
case 'tool_use': {
// Rewrite our local MCP `ask_user_question` tool to a stable
// apiName so the renderer routes on `askUserQuestion` (clean,
// domain-named) instead of the wire-prefixed MCP form. Identifier
// stays `claude-code` because this remains a CC-side tool.
const apiName = block.name === ASK_USER_MCP_TOOL_NAME ? ASK_USER_API_NAME : block.name;
newToolCalls.push({
apiName: block.name,
apiName,
arguments: JSON.stringify(block.input || {}),
id: block.id,
identifier: 'claude-code',
@@ -0,0 +1,255 @@
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { AskUserBridge } from './AskUserBridge';
describe('AskUserBridge', () => {
beforeEach(() => {
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
describe('pending() → resolve()', () => {
it('emits an agent_intervention_request and resolves with the user-supplied result', async () => {
const bridge = new AskUserBridge('op-1');
const events: any[] = [];
const iter = bridge.events()[Symbol.asyncIterator]();
// Pump events asynchronously into the array.
const pumped = (async () => {
const e = await iter.next();
if (!e.done) events.push(e.value);
})();
const pending = bridge.pending({ arguments: { questions: [{ q: 'foo' }] } });
await pumped;
expect(events).toHaveLength(1);
const req = events[0];
expect(req.type).toBe('agent_intervention_request');
expect(req.operationId).toBe('op-1');
expect(req.data.identifier).toBe('claude-code');
expect(req.data.apiName).toBe('askUserQuestion');
expect(req.data.toolCallId).toMatch(/^[\da-f-]{36}$/);
expect(JSON.parse(req.data.arguments)).toEqual({ questions: [{ q: 'foo' }] });
bridge.resolve(req.data.toolCallId, { result: { foo: 'bar' } });
await expect(pending).resolves.toEqual({ result: { foo: 'bar' } });
});
it('uses caller-supplied toolCallId as the wire correlation key', async () => {
const bridge = new AskUserBridge('op-1');
const drain = drainEvents(bridge);
const pending = bridge.pending({
arguments: { questions: [] },
toolCallId: 'cc-tool-use-abc',
});
const event = await drain.firstEvent;
expect(event.data.toolCallId).toBe('cc-tool-use-abc');
bridge.resolve('cc-tool-use-abc', { result: { picked: 'red' } });
await expect(pending).resolves.toEqual({ result: { picked: 'red' } });
drain.stop();
});
it('rejects pending() when the same toolCallId is already in flight', async () => {
const bridge = new AskUserBridge('op-1');
const drain = drainEvents(bridge);
void bridge.pending({ arguments: {}, toolCallId: 'dup' });
await expect(bridge.pending({ arguments: {}, toolCallId: 'dup' })).rejects.toThrow(
/duplicate toolCallId/,
);
bridge.cancelAll();
drain.stop();
});
it('ignores resolve() for unknown toolCallId', async () => {
const bridge = new AskUserBridge('op-1');
// Drain emitted events into the void so pending() can run.
const drain = drainEvents(bridge);
const pending = bridge.pending({ arguments: {} });
bridge.resolve('not-a-real-id', { result: 'x' });
// Promise should still be unresolved — fast-forward past timeout to confirm.
vi.advanceTimersByTime(5 * 60 * 1000 + 1);
const answer = await pending;
expect(answer.cancelled).toBe(true);
expect(answer.cancelReason).toBe('timeout');
drain.stop();
});
});
describe('cancellation paths', () => {
it('resolves with cancelled=true on user_cancelled via cancel()', async () => {
const bridge = new AskUserBridge('op-1');
const drain = drainEvents(bridge);
const pending = bridge.pending({ arguments: {} });
const toolCallId = (await drain.firstEvent).data.toolCallId;
bridge.cancel(toolCallId);
const answer = await pending;
expect(answer).toEqual({ cancelReason: 'user_cancelled', cancelled: true });
drain.stop();
});
it('resolves with cancelled=true on session_ended via cancelAll()', async () => {
const bridge = new AskUserBridge('op-1');
const drain = drainEvents(bridge);
const pending = bridge.pending({ arguments: {} });
await drain.firstEvent;
bridge.cancelAll('session_ended');
const answer = await pending;
expect(answer).toEqual({ cancelReason: 'session_ended', cancelled: true });
drain.stop();
});
it('resolves with cancelled=true on timeout (default 5 min)', async () => {
const bridge = new AskUserBridge('op-1');
const drain = drainEvents(bridge);
const pending = bridge.pending({ arguments: {} });
vi.advanceTimersByTime(5 * 60 * 1000 - 1);
const stillPending = await Promise.race([pending, Promise.resolve('not-yet')]);
expect(stillPending).toBe('not-yet');
vi.advanceTimersByTime(2);
const answer = await pending;
expect(answer).toEqual({ cancelReason: 'timeout', cancelled: true });
drain.stop();
});
it('honors a custom timeoutMs', async () => {
const bridge = new AskUserBridge('op-1');
const drain = drainEvents(bridge);
const pending = bridge.pending({ arguments: {} }, { timeoutMs: 1000 });
vi.advanceTimersByTime(1001);
const answer = await pending;
expect(answer.cancelled).toBe(true);
drain.stop();
});
it('cancelAll() rejects future pending() calls', async () => {
const bridge = new AskUserBridge('op-1');
bridge.cancelAll();
await expect(bridge.pending({ arguments: {} })).rejects.toThrow(/closed/);
});
});
describe('progress notifications (keepalive)', () => {
it('calls onProgress at the configured interval until resolved', async () => {
const bridge = new AskUserBridge('op-1');
const drain = drainEvents(bridge);
const onProgress = vi.fn();
const pending = bridge.pending(
{ arguments: {} },
{ onProgress, progressIntervalMs: 100, timeoutMs: 1000 },
);
const toolCallId = (await drain.firstEvent).data.toolCallId;
vi.advanceTimersByTime(350);
// Three ticks elapsed (100, 200, 300) — Node's setInterval fires
// exactly at multiples, so 3 calls with monotonically increasing
// elapsed values.
expect(onProgress).toHaveBeenCalledTimes(3);
expect(onProgress.mock.calls[0][0]).toBeGreaterThanOrEqual(100);
expect(onProgress.mock.calls[2][0]).toBeGreaterThanOrEqual(300);
expect(onProgress.mock.calls[0][1]).toBe(1000);
bridge.resolve(toolCallId, { result: 'done' });
await pending;
// After resolve, no further ticks even after more time passes.
vi.advanceTimersByTime(500);
expect(onProgress).toHaveBeenCalledTimes(3);
drain.stop();
});
it('skips onProgress entirely when not provided', async () => {
const bridge = new AskUserBridge('op-1');
const drain = drainEvents(bridge);
// No onProgress — just verify timeout still works without it.
const pending = bridge.pending({ arguments: {} }, { timeoutMs: 50 });
vi.advanceTimersByTime(60);
await expect(pending).resolves.toMatchObject({ cancelled: true });
drain.stop();
});
});
describe('event stream', () => {
it('emits one request event per pending() call', async () => {
const bridge = new AskUserBridge('op-1');
const drain = drainEvents(bridge);
bridge.pending({ arguments: { q: 1 } });
bridge.pending({ arguments: { q: 2 } });
bridge.pending({ arguments: { q: 3 } });
// Advance timers so the resolved-via-cancellation cleanup paths don't
// interfere with the assertion below.
const events = await Promise.all([
drain.firstEvent,
drain.events.next(),
drain.events.next(),
]);
// first 3 events are all intervention requests
const types = events.map((e: any) => e.value?.type ?? e.type);
expect(types.every((t) => t === 'agent_intervention_request')).toBe(true);
bridge.cancelAll();
drain.stop();
});
it('stamps stepIndex via the configured getter', async () => {
let step = 7;
const bridge = new AskUserBridge('op-1', { getStepIndex: () => step });
const drain = drainEvents(bridge);
bridge.pending({ arguments: {} });
const e1 = await drain.firstEvent;
expect(e1.stepIndex).toBe(7);
step = 11;
bridge.pending({ arguments: {} });
const e2 = await drain.events.next();
expect((e2.value as any).stepIndex).toBe(11);
bridge.cancelAll();
});
it('iterator ends after cancelAll()', async () => {
const bridge = new AskUserBridge('op-1');
const iter = bridge.events()[Symbol.asyncIterator]();
const next = iter.next();
bridge.cancelAll();
await expect(next).resolves.toMatchObject({ done: true });
});
});
});
// Helper: drain events from a bridge in the background, exposing the first
// emitted event as a promise and a way to call iter.next() on demand.
const drainEvents = (bridge: AskUserBridge) => {
const events = bridge.events()[Symbol.asyncIterator]();
const firstEvent = events.next().then((r) => {
if (r.done) throw new Error('stream ended before first event');
return r.value as any;
});
let stopped = false;
return {
events,
firstEvent,
stop: () => {
stopped = true;
},
get stopped() {
return stopped;
},
};
};
@@ -0,0 +1,272 @@
import { randomUUID } from 'node:crypto';
import type {
AgentInterventionRequestData,
AgentStreamEvent,
} from '@lobechat/agent-gateway-client';
/**
* What the MCP handler gets back from `bridge.pending()`.
*
* `result` carries the user's structured answer when they submit; `cancelled`
* with a reason when the deadline elapses, the user cancels, or the producer
* tears the session down. Mutually exclusive — exactly one of `result` /
* `cancelled` is set.
*/
export interface InterventionAnswer {
cancelled?: boolean;
cancelReason?: 'timeout' | 'user_cancelled' | 'session_ended';
result?: unknown;
}
export interface PendingArgs {
/** Whatever the MCP tool's input schema accepted (e.g. `{ questions: [...] }`). */
arguments: unknown;
/**
* Wire correlation key for this intervention. Used as the `toolCallId`
* on outbound `agent_intervention_request` events and looked up by
* `resolve()` / `cancel()` when the user submits an answer.
*
* For CC, the producer should pass `extra._meta['claudecode/toolUseId']`
* here so it equals the existing tool message id on the renderer side
* (the assistant `tool_use` for `mcp__lobe_cc__ask_user_question` and
* the intervention request both reference the same tool bubble).
*
* If omitted, the bridge synthesizes a random UUID — fine for
* stand-alone tests, but the renderer won't be able to correlate.
*/
toolCallId?: string;
}
export interface PendingOptions {
/**
* Called every `progressIntervalMs` while the call is pending. Use it to
* push MCP `notifications/progress` to keep the SSE channel from timing
* out (CC's HTTP transport drops at ~5min without keepalive).
*
* `elapsedMs` is the wall-clock millis since `pending()` was called.
*/
onProgress?: (elapsedMs: number, totalMs: number) => void | Promise<void>;
/** How often to call `onProgress`. Default: 30 000 (30s). */
progressIntervalMs?: number;
/**
* Absolute deadline (`Date.now() + timeoutMs`). When it elapses, the
* pending promise resolves with `{ cancelled: true, cancelReason: 'timeout' }`.
* Default: 5 minutes.
*/
timeoutMs?: number;
}
interface PendingEntry {
cleanup: () => void;
reject: (err: unknown) => void;
resolve: (answer: InterventionAnswer) => void;
}
interface BridgeOptions {
/**
* Stamps `stepIndex` on emitted events. The bridge has no visibility into
* the CC adapter's own step counter, so the producer (which owns the
* merged stream) provides it. Defaults to a constant `0` — fine for unit
* tests, but real producers should plug in their adapter's current value.
*/
getStepIndex?: () => number;
}
/**
* Per-operation channel between an MCP tool handler (which awaits the user)
* and the producer's outbound stream (which carries the request to UI and
* receives the user's answer back).
*
* Lifecycle:
* 1. Producer constructs a bridge for an `operationId`.
* 2. The MCP handler calls `pending(args, opts)` — gets a Promise.
* 3. Bridge synthesizes a `toolCallId`, emits an
* `agent_intervention_request` AgentStreamEvent on `events()` for the
* producer to forward.
* 4. Producer eventually calls `resolve(toolCallId, payload)` (from the
* consumer's `agent_intervention_response`) — Promise resolves.
* 5. Or: deadline / `cancelAll()` rejects the pending Promise with a
* `{ cancelled: true, cancelReason }` answer (no exception thrown).
*
* Errors only surface from `pending()` if the bridge itself is misused
* (e.g. emitting after `cancelAll`). Cancellation/timeout is normal flow,
* not an exception.
*/
export class AskUserBridge {
private readonly pending_ = new Map<string, PendingEntry>();
private readonly outboundQueue: AgentStreamEvent[] = [];
private readonly outboundWaiters: Array<(value: IteratorResult<AgentStreamEvent>) => void> = [];
private readonly getStepIndex: () => number;
private closed = false;
constructor(
public readonly operationId: string,
options: BridgeOptions = {},
) {
this.getStepIndex = options.getStepIndex ?? (() => 0);
}
/** Currently-blocked MCP handler count. Useful for telemetry / shutdown gates. */
get pendingCount(): number {
return this.pending_.size;
}
/**
* Block the caller until the consumer answers (or the deadline / cancel
* fires). Always resolves; never throws unless the bridge is already
* closed (programming error).
*/
pending(args: PendingArgs, options: PendingOptions = {}): Promise<InterventionAnswer> {
if (this.closed) {
return Promise.reject(new Error('AskUserBridge is closed; cannot accept new pending calls'));
}
const toolCallId = args.toolCallId ?? randomUUID();
if (this.pending_.has(toolCallId)) {
// Two pendings on the same key would clobber each other. Caller bug;
// surface it loudly rather than silently lose one resolve.
return Promise.reject(
new Error(`AskUserBridge: duplicate toolCallId in flight: ${toolCallId}`),
);
}
const timeoutMs = options.timeoutMs ?? 5 * 60 * 1000;
const progressIntervalMs = options.progressIntervalMs ?? 30_000;
const startedAt = Date.now();
const deadline = startedAt + timeoutMs;
return new Promise<InterventionAnswer>((resolve, reject) => {
const timeoutTimer = setTimeout(() => {
this.pending_.delete(toolCallId);
clearInterval(progressTimer);
resolve({ cancelled: true, cancelReason: 'timeout' });
}, timeoutMs);
const progressTimer: ReturnType<typeof setInterval> | undefined = options.onProgress
? setInterval(() => {
const elapsed = Date.now() - startedAt;
// Fire-and-forget; consumer-side errors are logged by caller.
void Promise.resolve(options.onProgress!(elapsed, timeoutMs)).catch(() => {});
}, progressIntervalMs)
: undefined;
const cleanup = () => {
clearTimeout(timeoutTimer);
if (progressTimer) clearInterval(progressTimer);
};
this.pending_.set(toolCallId, { cleanup, reject, resolve });
// Emit the intervention request AFTER setting up the pending entry,
// so any synchronous resolve from a test fixture finds the slot.
// Hardcoded to AskUserQuestion for now — the only intervention shape
// we support. Take an explicit `apiName` in PendingArgs when adding
// more (e.g. CC approval, file picker).
const data: AgentInterventionRequestData = {
apiName: 'askUserQuestion',
arguments: JSON.stringify(args.arguments ?? {}),
deadline,
identifier: 'claude-code',
toolCallId,
};
this.emit({
data,
operationId: this.operationId,
stepIndex: this.getStepIndex(),
timestamp: startedAt,
type: 'agent_intervention_request',
});
});
}
/**
* Producer-side: called when an `agent_intervention_response` arrives
* from the consumer. No-op if the toolCallId is unknown (already
* timed out, cancelled, or a stale duplicate).
*/
resolve(
toolCallId: string,
payload: {
cancelled?: boolean;
cancelReason?: InterventionAnswer['cancelReason'];
result?: unknown;
},
): void {
const entry = this.pending_.get(toolCallId);
if (!entry) return;
this.pending_.delete(toolCallId);
entry.cleanup();
entry.resolve(
payload.cancelled
? { cancelReason: payload.cancelReason ?? 'user_cancelled', cancelled: true }
: { result: payload.result },
);
}
/**
* Cancel a single pending call. Used when the consumer explicitly aborts
* one intervention without ending the whole op.
*/
cancel(toolCallId: string, reason: InterventionAnswer['cancelReason'] = 'user_cancelled'): void {
this.resolve(toolCallId, { cancelReason: reason, cancelled: true });
}
/**
* Tear down the bridge. Every pending handler is resolved with
* `cancelled: true, reason='session_ended'` (so its MCP tool returns
* cleanly to CC), the outbound event stream closes, and subsequent
* `pending()` calls reject.
*/
cancelAll(reason: InterventionAnswer['cancelReason'] = 'session_ended'): void {
if (this.closed) return;
this.closed = true;
for (const entry of this.pending_.values()) {
entry.cleanup();
entry.resolve({ cancelReason: reason, cancelled: true });
}
this.pending_.clear();
// Drain any waiters with a "done" so consumers can break their loop.
while (this.outboundWaiters.length > 0) {
const waiter = this.outboundWaiters.shift()!;
waiter({ done: true, value: undefined as any });
}
}
/**
* Async iterable over outbound events the producer should forward to the
* consumer. One iterator per bridge — multi-consumer fan-out is the
* producer's job. Iterator ends after `cancelAll()`.
*/
events(): AsyncIterable<AgentStreamEvent> {
return {
[Symbol.asyncIterator]: () => this.makeIterator(),
};
}
private makeIterator(): AsyncIterator<AgentStreamEvent> {
return {
next: () => {
const buffered = this.outboundQueue.shift();
if (buffered) {
return Promise.resolve({ done: false, value: buffered });
}
if (this.closed) {
return Promise.resolve({ done: true, value: undefined as any });
}
return new Promise((resolveWaiter) => {
this.outboundWaiters.push(resolveWaiter);
});
},
return: () => Promise.resolve({ done: true, value: undefined as any }),
};
}
private emit(event: AgentStreamEvent): void {
const waiter = this.outboundWaiters.shift();
if (waiter) {
waiter({ done: false, value: event });
} else {
this.outboundQueue.push(event);
}
}
}
@@ -0,0 +1,257 @@
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StreamableHTTPClientTransport } from '@modelcontextprotocol/sdk/client/streamableHttp.js';
import { afterEach, describe, expect, it } from 'vitest';
import { AskUserBridge } from './AskUserBridge';
import { AskUserMcpServer } from './AskUserMcpServer';
import { ASK_USER_MCP_SERVER_NAME, ASK_USER_TOOL_FULL_NAME, ASK_USER_TOOL_NAME } from './constants';
let server: AskUserMcpServer;
afterEach(async () => {
await server?.stop();
});
describe('AskUserMcpServer', () => {
describe('lifecycle', () => {
it('starts on port=0 (auto-assigned), exposes a localhost URL, stops cleanly', async () => {
server = new AskUserMcpServer();
const { port, url } = await server.start();
expect(port).toBeGreaterThan(0);
expect(url).toMatch(/^http:\/\/127\.0\.0\.1:\d+\/mcp$/);
await server.stop();
// After stop, calling .url throws — server is no longer listening.
expect(() => server.url).toThrow();
});
it('start() is idempotent', async () => {
server = new AskUserMcpServer();
const a = await server.start();
const b = await server.start();
expect(a.port).toBe(b.port);
});
});
describe('per-op routing', () => {
it('hasOperation / operationCount track register / unregister', async () => {
server = new AskUserMcpServer();
await server.start();
expect(server.operationCount).toBe(0);
const bridge1 = server.registerOperation('op-1');
const bridge2 = server.registerOperation('op-2');
expect(server.operationCount).toBe(2);
expect(server.hasOperation('op-1')).toBe(true);
expect(server.hasOperation('op-3')).toBe(false);
expect(bridge1).toBeInstanceOf(AskUserBridge);
expect(bridge2).toBeInstanceOf(AskUserBridge);
expect(bridge1).not.toBe(bridge2);
server.unregisterOperation('op-1');
expect(server.operationCount).toBe(1);
expect(server.hasOperation('op-1')).toBe(false);
});
it('rejects double-registering the same op id', async () => {
server = new AskUserMcpServer();
await server.start();
server.registerOperation('op-1');
expect(() => server.registerOperation('op-1')).toThrow(/already registered/);
});
it('urlForOperation appends ?op=<id> to the base url', async () => {
server = new AskUserMcpServer();
await server.start();
server.registerOperation('op-7');
const url = server.urlForOperation('op-7');
expect(url).toContain('/mcp');
expect(new URL(url).searchParams.get('op')).toBe('op-7');
});
it('unregisterOperation cancels pending bridges', async () => {
server = new AskUserMcpServer();
await server.start();
const bridge = server.registerOperation('op-1');
const pending = bridge.pending({ arguments: { questions: [{ q: 'foo' }] } });
// Drain the request event so the iterator's queue doesn't deadlock.
void bridge.events()[Symbol.asyncIterator]().next();
server.unregisterOperation('op-1');
const answer = await pending;
expect(answer).toEqual({ cancelReason: 'session_ended', cancelled: true });
});
it('publishes the canonical mcp__lobe_cc__ask_user_question tool', async () => {
server = new AskUserMcpServer();
await server.start();
server.registerOperation('probe-op');
const client = await connectClient(server.urlForOperation('probe-op'));
try {
const list = await client.listTools();
expect(list.tools).toHaveLength(1);
expect(list.tools[0].name).toBe(ASK_USER_TOOL_NAME);
expect(ASK_USER_TOOL_FULL_NAME).toBe(
`mcp__${ASK_USER_MCP_SERVER_NAME}__${ASK_USER_TOOL_NAME}`,
);
} finally {
await client.close();
}
});
});
describe('end-to-end tool call', () => {
it('routes a tools/call to the right per-op bridge and returns the user answer', async () => {
server = new AskUserMcpServer({ pendingTimeoutMs: 30_000, progressIntervalMs: 1000 });
await server.start();
const bridge = server.registerOperation('op-A');
// Producer-side: when an intervention_request shows up on bridge.events,
// resolve it with a fake user answer.
let interventionRequestSeen: any;
const producerLoop = (async () => {
for await (const e of bridge.events()) {
if (e.type === 'agent_intervention_request') {
interventionRequestSeen = e;
bridge.resolve(e.data.toolCallId, {
result: { 'What color do you want?': 'Red' },
});
break;
}
}
})();
// Client-side: behave like CC — initialize, list, call.
const client = await connectClient(server.urlForOperation('op-A'));
try {
const list = await client.listTools();
expect(list.tools[0].name).toBe(ASK_USER_TOOL_NAME);
const callResult = (await client.callTool({
arguments: {
questions: [
{
header: 'Color',
options: [
{ description: 'Red color', label: 'Red' },
{ description: 'Blue color', label: 'Blue' },
],
question: 'What color do you want?',
},
],
},
name: ASK_USER_TOOL_NAME,
})) as { content: Array<{ text: string; type: string }>; isError?: boolean };
expect(callResult.isError).toBeFalsy();
expect(callResult.content[0].text).toContain('User answers');
expect(callResult.content[0].text).toContain('What color do you want?: Red');
} finally {
await client.close();
}
await producerLoop;
expect(interventionRequestSeen).toBeDefined();
expect(interventionRequestSeen.operationId).toBe('op-A');
expect(interventionRequestSeen.data.identifier).toBe('claude-code');
expect(interventionRequestSeen.data.apiName).toBe('askUserQuestion');
});
it('returns an isError tool result when the user cancels', async () => {
server = new AskUserMcpServer({ pendingTimeoutMs: 30_000, progressIntervalMs: 1000 });
await server.start();
const bridge = server.registerOperation('op-cancel');
const producerLoop = (async () => {
for await (const e of bridge.events()) {
if (e.type === 'agent_intervention_request') {
bridge.resolve(e.data.toolCallId, { cancelled: true, cancelReason: 'user_cancelled' });
break;
}
}
})();
const client = await connectClient(server.urlForOperation('op-cancel'));
try {
const callResult = (await client.callTool({
arguments: {
questions: [
{
header: 'X',
options: [
{ description: 'a', label: 'A' },
{ description: 'b', label: 'B' },
],
question: 'pick',
},
],
},
name: ASK_USER_TOOL_NAME,
})) as { content: Array<{ text: string; type: string }>; isError?: boolean };
expect(callResult.isError).toBe(true);
expect(callResult.content[0].text.toLowerCase()).toContain('cancel');
} finally {
await client.close();
}
await producerLoop;
});
/**
* Regression: a single shared `StreamableHTTPServerTransport` rejects the
* second `initialize` with `Server already initialized`, breaking every
* op after the first. Ensures we mint one transport+McpServer per session.
*/
it('handles sequential ops on independent sessions', async () => {
server = new AskUserMcpServer({ pendingTimeoutMs: 30_000, progressIntervalMs: 1000 });
await server.start();
for (const opId of ['op-seq-1', 'op-seq-2', 'op-seq-3']) {
const bridge = server.registerOperation(opId);
const producerLoop = (async () => {
for await (const e of bridge.events()) {
if (e.type === 'agent_intervention_request') {
bridge.resolve(e.data.toolCallId, { result: { pick: 'A' } });
break;
}
}
})();
const client = await connectClient(server.urlForOperation(opId));
try {
const callResult = (await client.callTool({
arguments: {
questions: [
{
header: opId,
options: [
{ description: 'a', label: 'A' },
{ description: 'b', label: 'B' },
],
question: 'pick',
},
],
},
name: ASK_USER_TOOL_NAME,
})) as { content: Array<{ text: string }>; isError?: boolean };
expect(callResult.isError).toBeFalsy();
expect(callResult.content[0].text).toContain('pick: A');
} finally {
await client.close();
}
await producerLoop;
server.unregisterOperation(opId);
}
});
});
});
const connectClient = async (url: string): Promise<Client> => {
const transport = new StreamableHTTPClientTransport(new URL(url));
const client = new Client({ name: 'unit-test', version: '0' }, { capabilities: {} });
await client.connect(transport);
return client;
};
@@ -0,0 +1,448 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import { randomUUID } from 'node:crypto';
import http from 'node:http';
import type { AddressInfo } from 'node:net';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import { StreamableHTTPServerTransport } from '@modelcontextprotocol/sdk/server/streamableHttp.js';
import { isInitializeRequest } from '@modelcontextprotocol/sdk/types.js';
import { z } from 'zod';
import type { InterventionAnswer } from './AskUserBridge';
import { AskUserBridge } from './AskUserBridge';
import { ASK_USER_MCP_SERVER_NAME, ASK_USER_TOOL_NAME } from './constants';
/**
* Mirrors CC's built-in `AskUserQuestion` schema. CC's schema:
* - 1-4 questions
* - each: header (≤12 chars), question, options (2-4), multiSelect?
* - each option: label, description
* We replicate it so the model can call our tool with the exact shape CC
* trained on.
*/
const askUserOptionShape = z.object({
description: z.string(),
label: z.string(),
});
const askUserQuestionShape = z.object({
header: z.string(),
multiSelect: z.boolean().optional(),
options: z.array(askUserOptionShape).min(2).max(4),
question: z.string(),
});
const askUserInputShape = {
questions: z.array(askUserQuestionShape).min(1).max(4),
};
/** Tool description seen by the model. Kept terse — full schema lives in `inputSchema`. */
const ASK_USER_TOOL_DESCRIPTION =
'Ask the user one or more clarifying questions with multiple-choice options. ' +
"Use this whenever the user's intent is ambiguous and you need them to pick.";
export interface StartedServer {
/** Effective listen port (auto-assigned when constructed with port=0). */
port: number;
/** Base URL the producer hands to CC via `--mcp-config`. */
url: string;
}
export interface AskUserMcpServerOptions {
/**
* Per-call timeout passed to `bridge.pending()`. Default 5 minutes —
* matches the issue's UX requirement and the tested CC keepalive ceiling.
*/
pendingTimeoutMs?: number;
/**
* Port to bind. `0` (default) lets the OS pick a free one.
*/
port?: number;
/**
* Progress notification cadence. Default 30s. CC's HTTP transport drops
* SSE around 5min idle without a wire-level message — `notifications/progress`
* counts as a wire-level message.
*/
progressIntervalMs?: number;
}
interface RegisteredOperation {
bridge: AskUserBridge;
}
interface SessionEntry {
mcp: McpServer;
transport: StreamableHTTPServerTransport;
}
/**
* Process-wide MCP server that exposes a single `ask_user_question` tool to
* CC over HTTP/SSE. One server, many concurrent operations — each spawn
* registers a per-op `AskUserBridge` and gets back a URL with `?op=<opId>`
* that CC's `--mcp-config` points at. Tool invocations route to the matching
* bridge by query param.
*
* Lifecycle:
* server.start() // once per process
* bridge = server.registerOperation(opId)
* ...spawn CC pointing at server.url + ?op=opId...
* server.unregisterOperation(opId) // releases bridge resources
* server.stop() // on app shutdown
*
* ## Per-session transport
*
* Each MCP `initialize` from a new CC subprocess gets its own
* `StreamableHTTPServerTransport` + `McpServer` pair. The SDK's transport
* stores `_initialized=true` and a `sessionId` per instance, so reusing a
* single transport across sequential ops makes the second `initialize` fail
* with `Invalid Request: Server already initialized`. Subsequent requests
* from the same CC subprocess (carrying `mcp-session-id`) route back to the
* matching transport via `sessionTransports` lookup.
*/
export class AskUserMcpServer {
private httpServer?: http.Server;
/** sessionId → transport+mcp pair. Populated on initialize, removed on session close. */
private readonly sessionTransports = new Map<string, SessionEntry>();
private readonly operations = new Map<string, RegisteredOperation>();
/**
* MCP session id → operationId. Populated when a CC initialize POST
* arrives at `/mcp?op=<opId>`; the URL's `op` is captured via
* `AsyncLocalStorage`, the SDK's `onsessioninitialized` hook reads it
* at session-create time, and tool handler lookups use `extra.sessionId`.
*/
private readonly sessionIdToOpId = new Map<string, string>();
/** Per-request op id, populated for the duration of `handleRequest`. */
private readonly opIdContext = new AsyncLocalStorage<string | undefined>();
private startedUrl?: string;
private readonly pendingTimeoutMs: number;
private readonly progressIntervalMs: number;
constructor(private readonly options: AskUserMcpServerOptions = {}) {
this.pendingTimeoutMs = options.pendingTimeoutMs ?? 5 * 60 * 1000;
this.progressIntervalMs = options.progressIntervalMs ?? 30_000;
}
/** URL only valid after `start()` resolves. */
get url(): string {
if (!this.startedUrl) {
throw new Error('AskUserMcpServer not started yet — call start() first');
}
return this.startedUrl;
}
async start(): Promise<StartedServer> {
if (this.httpServer) {
// idempotent — repeat calls return the existing started state.
return { port: (this.httpServer.address() as AddressInfo).port, url: this.url };
}
const httpServer = http.createServer(async (req, res) => {
// Only the `/mcp` path is part of our contract. Anything else is
// either a misroute or a probe — answer with 404 so it's loud.
if (!req.url || !req.url.startsWith('/mcp')) {
res.writeHead(404, { 'content-type': 'text/plain' });
res.end('Not Found');
return;
}
// Producer encodes operationId as `?op=<opId>` on the URL it hands
// to CC. Capture it now so `onsessioninitialized` can bind it to the
// generated sessionId. AsyncLocalStorage keeps interleaved requests
// from clobbering each other.
const parsed = new URL(req.url, 'http://127.0.0.1');
const opId = parsed.searchParams.get('op') ?? undefined;
// The MCP transport reads from req directly. We only need to extract
// the JSON body for POST requests so it can be passed in.
let body: unknown;
if (req.method === 'POST') {
body = await readJsonBody(req).catch(() => undefined);
}
await this.opIdContext.run(opId, async () => {
try {
const transport = await this.resolveTransport(req, body);
if (!transport) {
res.writeHead(400, { 'content-type': 'application/json' });
res.end(
JSON.stringify({
error: {
code: -32_000,
message: 'Bad Request: no session and no initialize request',
},
id: null,
jsonrpc: '2.0',
}),
);
return;
}
await transport.handleRequest(req, res, body);
} catch (err) {
if (!res.headersSent) {
res.writeHead(500, { 'content-type': 'text/plain' });
res.end(String((err as Error)?.message ?? err));
}
}
});
});
this.httpServer = httpServer;
await new Promise<void>((resolve, reject) => {
httpServer.once('error', reject);
httpServer.listen(this.options.port ?? 0, '127.0.0.1', () => {
httpServer.off('error', reject);
resolve();
});
});
const port = (httpServer.address() as AddressInfo).port;
this.startedUrl = `http://127.0.0.1:${port}/mcp`;
return { port, url: this.startedUrl };
}
async stop(): Promise<void> {
// Close all bridges first so pending MCP handlers return quickly.
for (const [opId] of this.operations) this.unregisterOperation(opId);
// Tear down every per-session transport + MCP server pair.
for (const [, entry] of this.sessionTransports) {
await entry.transport.close().catch(() => {});
await entry.mcp.close().catch(() => {});
}
this.sessionTransports.clear();
await new Promise<void>((resolve) => {
if (!this.httpServer) {
resolve();
return;
}
this.httpServer.close(() => resolve());
});
this.httpServer = undefined;
this.startedUrl = undefined;
}
/**
* Allocate a bridge for a new operation and return the
* `--mcp-config`-ready URL. The caller spawns CC pointing at this URL
* and merges `bridge.events()` into the producer's outbound stream.
*/
registerOperation(operationId: string, bridge?: AskUserBridge): AskUserBridge {
if (this.operations.has(operationId)) {
throw new Error(`AskUserMcpServer: operation already registered: ${operationId}`);
}
const created = bridge ?? new AskUserBridge(operationId);
this.operations.set(operationId, { bridge: created });
return created;
}
unregisterOperation(operationId: string): void {
const entry = this.operations.get(operationId);
if (!entry) return;
entry.bridge.cancelAll('session_ended');
this.operations.delete(operationId);
// Drop the reverse mapping for any sessions that were bound to this op.
for (const [sid, oid] of this.sessionIdToOpId) {
if (oid === operationId) this.sessionIdToOpId.delete(sid);
}
}
/** Build the per-op URL the producer writes into the temp `mcp-config` JSON. */
urlForOperation(operationId: string): string {
const base = new URL(this.url);
base.searchParams.set('op', operationId);
return base.toString();
}
/** Test/inspection helper. */
hasOperation(operationId: string): boolean {
return this.operations.has(operationId);
}
/** Currently-registered operation count. */
get operationCount(): number {
return this.operations.size;
}
/** Active MCP session count (initialize succeeded, not yet closed). */
get sessionCount(): number {
return this.sessionTransports.size;
}
/**
* Locate (or build) the transport that should handle this request.
*
* - Existing session id (header) → matching stored transport
* - No session id + initialize body → fresh transport+mcp pair, registered
* on `onsessioninitialized` so the very next message from this client
* finds its session
* - Anything else → null (caller responds with 400)
*/
private async resolveTransport(
req: http.IncomingMessage,
body: unknown,
): Promise<StreamableHTTPServerTransport | undefined> {
const sessionId = (req.headers['mcp-session-id'] as string | undefined) ?? undefined;
if (sessionId) {
const entry = this.sessionTransports.get(sessionId);
if (entry) return entry.transport;
// Unknown session id — let the SDK respond 404; we still return the
// transport-less response below.
return undefined;
}
if (body && isInitializeRequest(body)) {
return this.createSessionTransport();
}
return undefined;
}
/**
* Build a fresh `StreamableHTTPServerTransport` + `McpServer` pair for a
* new MCP session. The pair is registered into `sessionTransports` from
* the `onsessioninitialized` callback, so every subsequent request
* tagged with that sessionId routes back here without reconstruction.
*/
private createSessionTransport(): StreamableHTTPServerTransport {
const mcp = new McpServer(
{ name: ASK_USER_MCP_SERVER_NAME, version: '1.0.0' },
{ capabilities: { tools: {} } },
);
this.registerAskUserTool(mcp);
const transport: StreamableHTTPServerTransport = new StreamableHTTPServerTransport({
onsessionclosed: (sessionId: string) => {
this.sessionTransports.delete(sessionId);
this.sessionIdToOpId.delete(sessionId);
},
onsessioninitialized: (sessionId: string) => {
this.sessionTransports.set(sessionId, { mcp, transport });
const opId = this.opIdContext.getStore();
if (opId) this.sessionIdToOpId.set(sessionId, opId);
},
sessionIdGenerator: () => randomUUID(),
});
// Connect synchronously so the first request's body can be processed
// by the same transport we just built. `mcp.connect(transport)` is
// idempotent at this stage and resolves before `handleRequest` is
// called by the caller.
void mcp.connect(transport);
return transport;
}
private registerAskUserTool(mcp: McpServer) {
mcp.registerTool(
ASK_USER_TOOL_NAME,
{
description: ASK_USER_TOOL_DESCRIPTION,
inputSchema: askUserInputShape,
title: 'Ask User Question',
},
async (args, extra) => {
const sessionId = (extra as { sessionId?: string } | undefined)?.sessionId;
const operationId = sessionId ? this.sessionIdToOpId.get(sessionId) : undefined;
if (!operationId) {
return errorResult(
"Missing 'op' query parameter on MCP server URL — producer should append ?op=<operationId>",
);
}
const op = this.operations.get(operationId);
if (!op) {
return errorResult(
`No active operation for id '${operationId}'. The op may have ended before the tool call landed.`,
);
}
const ccToolUseId = (extra?._meta as { 'claudecode/toolUseId'?: string } | undefined)?.[
'claudecode/toolUseId'
];
const progressToken = (extra?._meta as { progressToken?: string | number } | undefined)
?.progressToken;
// Use CC's own tool_use id as the bridge correlation key so the
// outbound `agent_intervention_request` shares an id with the
// existing tool message on the renderer side. Without this the
// renderer can't tie the intervention card to its tool bubble.
const toolCallId = ccToolUseId;
// SSE keepalive: every progressIntervalMs send a progress
// notification so CC's transport doesn't time out on long waits.
// Empirically required for >~5min — we cap at 5min anyway, but
// tick from the start so even 4-minute pendings get periodic life.
const onProgress =
progressToken !== undefined && extra?.sendNotification
? async (elapsedMs: number, totalMs: number) => {
try {
await extra.sendNotification!({
method: 'notifications/progress',
params: {
message: `Waiting for user (${Math.round(elapsedMs / 1000)}s)`,
progress: elapsedMs,
progressToken,
total: totalMs,
},
});
} catch {
// Non-fatal — the underlying transport may be torn down
// mid-flight; the next setInterval tick will skip too.
}
}
: undefined;
const answer = await op.bridge.pending(
{ arguments: args, toolCallId },
{
onProgress,
progressIntervalMs: this.progressIntervalMs,
timeoutMs: this.pendingTimeoutMs,
},
);
return formatAnswerForCC(answer, args);
},
);
}
}
const readJsonBody = async (req: http.IncomingMessage): Promise<unknown> => {
const chunks: Buffer[] = [];
for await (const chunk of req) {
chunks.push(typeof chunk === 'string' ? Buffer.from(chunk) : (chunk as Buffer));
}
if (chunks.length === 0) return undefined;
return JSON.parse(Buffer.concat(chunks).toString('utf8'));
};
const errorResult = (message: string) => ({
content: [{ text: message, type: 'text' as const }],
isError: true,
});
const formatAnswerForCC = (answer: InterventionAnswer, args: unknown) => {
if (answer.cancelled) {
const reasonText =
answer.cancelReason === 'timeout'
? 'No answer received within the wait window; the user did not respond. Continue without their input or ask in plain text.'
: answer.cancelReason === 'session_ended'
? 'The session was ended before the user could respond.'
: 'The user cancelled the question.';
return {
content: [{ text: reasonText, type: 'text' as const }],
isError: true,
};
}
// Success: format the structured answer back as text. CC's built-in
// AskUserQuestion returns "User answers:\n- <q>: <a>" style — match it
// so the model handles our payload identically.
const answerObj = (answer.result ?? {}) as Record<string, unknown>;
const questions = (args as { questions?: Array<{ question: string }> }).questions ?? [];
const lines = ['User answers:'];
for (const q of questions) {
const a = answerObj[q.question];
const formatted = Array.isArray(a) ? a.join(', ') : a == null ? '(no answer)' : String(a);
lines.push(`- ${q.question}: ${formatted}`);
}
return {
content: [{ text: lines.join('\n'), type: 'text' as const }],
};
};
@@ -0,0 +1,21 @@
/**
* Public constants shared between the producer-side MCP server (Node-only)
* and the consumer-side adapter / renderer (browser-safe). Kept in a
* dependency-free module so importers don't accidentally pull node:http
* etc. into the renderer bundle.
*/
/** MCP server name as it appears in the tool name prefix. */
export const ASK_USER_MCP_SERVER_NAME = 'lobe_cc';
/** MCP tool name (without the `mcp__lobe_cc__` prefix). */
export const ASK_USER_TOOL_NAME = 'ask_user_question';
/** Full tool name as the CC model sees it on the wire. */
export const ASK_USER_TOOL_FULL_NAME = `mcp__${ASK_USER_MCP_SERVER_NAME}__${ASK_USER_TOOL_NAME}`;
/**
* Stable apiName the adapter rewrites the MCP tool to so that downstream
* UI / persistence routes on a clean key, not the wire-prefixed MCP name.
*/
export const ASK_USER_API_NAME = 'askUserQuestion';
@@ -0,0 +1,30 @@
/**
* Producer-side MCP server + per-op bridge for Claude Code's AskUserQuestion
* via local HTTP MCP. See `LOBE-8725` for the full design.
*
* Used by:
* - Electron main (`HeterogeneousAgentCtr`) — local app
* - Sandbox CLI (`lh hetero exec`) — phase 2; for now the CLI doesn't
* register a server and CC falls back to text questions
*
* Consumer (renderer / web client) talks to the producer via the existing
* `AgentStreamEvent` pipeline — `agent_intervention_request` flows out,
* `agent_intervention_response` flows back.
*/
export {
AskUserBridge,
type InterventionAnswer,
type PendingArgs,
type PendingOptions,
} from './AskUserBridge';
export {
AskUserMcpServer,
type AskUserMcpServerOptions,
type StartedServer,
} from './AskUserMcpServer';
export {
ASK_USER_API_NAME,
ASK_USER_MCP_SERVER_NAME,
ASK_USER_TOOL_FULL_NAME,
ASK_USER_TOOL_NAME,
} from './constants';
@@ -1,3 +1,4 @@
import { ClaudeCodeIdentifier } from '@lobechat/builtin-tool-claude-code';
import { UserInteractionIdentifier } from '@lobechat/builtin-tool-user-interaction';
import {
AgentMarketplaceIdentifier,
@@ -117,8 +118,22 @@ const customInteractionSubmitHandlers = new Map<string, CustomInteractionSubmitH
[AgentMarketplaceIdentifier, handleAgentMarketplaceSubmit],
]);
/**
* Identifiers whose intervention component renders inline as a form (with
* `onInteractionAction` callbacks) rather than the default approve / reject
* approval UI. Hetero CLIs (CC AskUserQuestion etc.) need this surface
* because the answer ships back through IPC, not through a synthetic user
* turn.
*/
const HETERO_CUSTOM_INTERACTION_IDENTIFIERS = new Set<string>([ClaudeCodeIdentifier]);
export const isHeteroInteractionIdentifier = (identifier: string) =>
HETERO_CUSTOM_INTERACTION_IDENTIFIERS.has(identifier);
export const isCustomInteractionIdentifier = (identifier: string) =>
identifier === UserInteractionIdentifier || customInteractionSubmitHandlers.has(identifier);
identifier === UserInteractionIdentifier ||
isHeteroInteractionIdentifier(identifier) ||
customInteractionSubmitHandlers.has(identifier);
export const prepareCustomInteractionSubmit = async (
identifier: string,
@@ -4,6 +4,7 @@ import { Flexbox } from '@lobehub/ui';
import { memo, Suspense, useCallback, useMemo, useRef, useState } from 'react';
import { createPortal } from 'react-dom';
import { useChatStore } from '@/store/chat';
import { useUserStore } from '@/store/user';
import { toolInterventionSelectors } from '@/store/user/selectors';
@@ -12,6 +13,7 @@ import Arguments from '../Arguments';
import ApprovalActions from './ApprovalActions';
import {
isCustomInteractionIdentifier,
isHeteroInteractionIdentifier,
prepareCustomInteractionSubmit,
recordCustomInteractionResolution,
} from './customInteractionHandlers';
@@ -98,6 +100,11 @@ const Intervention = memo<InterventionProps>(
const submitToolInteraction = useConversationStore((s) => s.submitToolInteraction);
const skipToolInteraction = useConversationStore((s) => s.skipToolInteraction);
const cancelToolInteraction = useConversationStore((s) => s.cancelToolInteraction);
// Hetero (CC / Codex) interventions ship the answer back through IPC to a
// running CLI subprocess instead of starting a fresh `executeClientAgent`
// turn. Pull the chat-store action lazily so non-hetero interactions stay
// on the existing path with no behavior change.
const submitHeteroIntervention = useChatStore((s) => s.submitHeteroIntervention);
const handleInteractionAction = useCallback(
async (
@@ -106,6 +113,10 @@ const Intervention = memo<InterventionProps>(
| { type: 'skip'; payload?: Record<string, unknown>; reason?: string }
| { type: 'cancel'; payload?: Record<string, unknown> },
) => {
if (isHeteroInteractionIdentifier(identifier)) {
await submitHeteroIntervention(id, action.type, action.payload);
return;
}
switch (action.type) {
case 'submit': {
const { payload, options } = await prepareCustomInteractionSubmit(
@@ -149,6 +160,7 @@ const Intervention = memo<InterventionProps>(
identifier,
parsedArgs,
skipToolInteraction,
submitHeteroIntervention,
submitToolInteraction,
topicId,
],
@@ -1,16 +1,23 @@
import { AgentManagementIdentifier } from '@lobechat/builtin-tool-agent-management';
import { LOADING_FLAT } from '@lobechat/const';
import type { ConversationContext, HeterogeneousProviderConfig } from '@lobechat/types';
import { t } from 'i18next';
import { type StateCreator } from 'zustand';
import { message as antdMessage } from '@/components/AntdStaticMethods';
import { MESSAGE_CANCEL_FLAT } from '@/const/index';
import { messageService } from '@/services/message';
import { getAgentStoreState } from '@/store/agent';
import { agentSelectors } from '@/store/agent/selectors';
import { agentByIdSelectors, agentSelectors } from '@/store/agent/selectors';
import { useChatStore } from '@/store/chat';
import { topicSelectors } from '@/store/chat/selectors';
import { selectRuntimeType } from '@/store/chat/slices/aiChat/actions/agentDispatcher';
import {
parseMentionedAgentsFromEditorData,
parseSelectedSkillsFromEditorData,
parseSelectedToolsFromEditorData,
} from '@/store/chat/slices/aiChat/actions/commandBus';
import { resolveHeteroResume } from '@/store/chat/slices/aiChat/actions/heteroResume';
import { operationSelectors } from '@/store/chat/slices/operation/selectors';
import { INPUT_LOADING_OPERATION_TYPES } from '@/store/chat/slices/operation/types';
import {
@@ -49,6 +56,91 @@ const buildRetryInitialContext = (editorData: Record<string, any> | null | undef
};
};
/**
* Branch a hetero (Claude Code / Codex) turn off an existing user message.
*
* Used by regenerate (parent = user msg, prompt = original user content).
* Pre-creates the assistant row so `executeHeterogeneousAgent` has a stable
* `assistantMessageId` to stream into, then runs an `execHeterogeneousAgent`
* op as a child of the caller's parent op so Stop cancels the executor
* without killing the parent op early.
*/
const runHeterogeneousFromExistingMessage = async (
chatStore: ReturnType<typeof useChatStore.getState>,
params: {
context: ConversationContext;
heterogeneousProvider: HeterogeneousProviderConfig;
parentMessageId: string;
parentOperationId: string;
prompt: string;
},
): Promise<string> => {
const { context, heterogeneousProvider, parentMessageId, parentOperationId, prompt } = params;
const agentId = context.agentId;
if (!agentId) throw new Error('agentId is required for heterogeneous agent');
// Resolve workingDirectory: topic-level pin (set when bound to a project)
// wins over the agent-level default. Mirrors the sendMessage hetero branch
// so regenerate stays on the same project as the original turn.
const topic = context.topicId
? topicSelectors.getTopicById(context.topicId)(chatStore)
: undefined;
const agentWorkingDirectory =
agentByIdSelectors.getAgentWorkingDirectoryById(agentId)(getAgentStoreState());
const workingDirectory = topic?.metadata?.workingDirectory || agentWorkingDirectory;
// Drops the saved sessionId when its bound cwd disagrees with the current
// one — without this CC emits "No conversation found with session ID".
const { cwdChanged, resumeSessionId } = resolveHeteroResume(topic?.metadata, workingDirectory);
if (cwdChanged) antdMessage.info(t('heteroAgent.resumeReset.cwdChanged', { ns: 'chat' }));
const assistantMsg = await messageService.createMessage({
agentId,
content: LOADING_FLAT,
parentId: parentMessageId,
// External CLIs own model selection; persist only the runtime provider up
// front. The adapter backfills the actual model later if the CLI reports it.
provider: heterogeneousProvider.type,
role: 'assistant',
threadId: context.threadId ?? undefined,
topicId: context.topicId ?? undefined,
});
// Pull the new row into the store so the loading bubble is visible while
// the executor runs (the executor only dispatches updates, not creates).
await chatStore.refreshMessages();
if (context.topicId) chatStore.internal_updateTopicLoading(context.topicId, true);
const { operationId: heteroOpId } = chatStore.startOperation({
context,
label: 'Heterogeneous Agent Execution',
metadata: { heterogeneousType: heterogeneousProvider.type },
parentOperationId,
type: 'execHeterogeneousAgent',
});
chatStore.associateMessageWithOperation(assistantMsg.id, heteroOpId);
try {
const { executeHeterogeneousAgent } =
await import('@/store/chat/slices/aiChat/actions/heterogeneousAgentExecutor');
await executeHeterogeneousAgent(() => useChatStore.getState(), {
assistantMessageId: assistantMsg.id,
context,
heterogeneousProvider,
message: prompt,
operationId: heteroOpId,
resumeSessionId,
workingDirectory,
});
} finally {
if (context.topicId)
useChatStore.getState().internal_updateTopicLoading(context.topicId, false);
}
return assistantMsg.id;
};
/**
* Generation Actions
*
@@ -223,15 +315,11 @@ export const generationSlice: StateCreator<
isGatewayMode: chatStore.isGatewayModeEnabled(),
});
// TODO(LOBE-8519 follow-up): continue is currently only wired for the client
// runtime. Gateway / hetero continue both fall through to the client path
// here; a proper implementation needs runtime-specific resume semantics.
if (runtimeType !== 'client') {
console.warn(
`[continueGenerationMessage] runtime=${runtimeType} not yet supported; ` +
'falling through to client mode',
);
}
// Hetero CLIs (CC / Codex) have no "continue a cut-off response" primitive
// — each prompt is a fresh user turn from their perspective. Bail out
// rather than synthesize a fake "please continue" turn that would pollute
// the session and confuse the model. The button is a no-op in this mode.
if (runtimeType === 'hetero') return;
// Create continue operation with ConversationStore context (includes groupId)
const { operationId } = chatStore.startOperation({
@@ -240,7 +328,24 @@ export const generationSlice: StateCreator<
});
try {
// Execute agent runtime with full context from ConversationStore
// ── Gateway mode: branch a server-side run from the cut-off message ──
// `parentMessageId` triggers `resume: true` on the router, so the server
// skips user-message creation and continues from the existing chain.
// Empty prompt is intentional and matches the approve/reject resume path.
if (runtimeType === 'gateway') {
await chatStore.executeGatewayAgent({
context,
message: '',
onComplete: () => {
chatStore.completeOperation(operationId);
if (hooks.onContinueComplete) hooks.onContinueComplete(displayMessageId);
},
parentMessageId: dbMessageId,
});
return;
}
// ── Client mode: run agent locally ──
await chatStore.executeClientAgent({
context,
messages: displayMessages,
@@ -381,8 +486,9 @@ export const generationSlice: StateCreator<
});
const agentConfig = agentSelectors.getAgentConfigById(context.agentId)(getAgentStoreState());
const heterogeneousProvider = agentConfig?.agencyConfig?.heterogeneousProvider;
const runtimeType = selectRuntimeType({
heterogeneousProvider: agentConfig?.agencyConfig?.heterogeneousProvider,
heterogeneousProvider,
isGatewayMode: chatStore.isGatewayModeEnabled(),
});
@@ -405,13 +511,25 @@ export const generationSlice: StateCreator<
return;
}
// ── Client mode: run agent locally ──
// TODO(LOBE-8519 follow-up): hetero regenerate is not yet implemented and
// currently falls through to client mode (silently uses the agent's underlying
// LLM instead of routing back through the heterogeneous CLI). Implementing it
// requires the same persistence + executeHeterogeneousAgent setup as
// sendMessage's hetero branch.
// ── Hetero mode: re-run the local CLI against the original user prompt ──
// Creates a fresh assistant row branched off the existing user message so
// the CC / Codex turn replaces the previous attempt without rewriting
// history, and resumes the same session id (when the cwd still matches)
// so prior context is preserved.
if (runtimeType === 'hetero' && heterogeneousProvider) {
await runHeterogeneousFromExistingMessage(chatStore, {
context,
heterogeneousProvider,
parentMessageId: messageId,
parentOperationId: operationId,
prompt: item.content,
});
chatStore.completeOperation(operationId);
if (hooks.onRegenerateComplete) hooks.onRegenerateComplete(messageId);
return;
}
// ── Client mode: run agent locally ──
// Execute agent runtime with full context from ConversationStore
await chatStore.executeClientAgent({
context,
+2
View File
@@ -357,6 +357,8 @@ const AgentStreamEventSchema = z.object({
'tool_end',
'tool_execute',
'tool_result',
'agent_intervention_request',
'agent_intervention_response',
'step_start',
'step_complete',
'error',
@@ -39,6 +39,21 @@ class HeterogeneousAgentService {
async getSessionInfo(sessionId: string) {
return this.ipc.heterogeneousAgent.getSessionInfo({ sessionId });
}
/**
* Submit the user's answer (or cancellation) for a pending CC
* AskUserQuestion intervention. The main process routes it to the
* matching MCP bridge so the blocked tool handler can return to CC.
*/
async submitIntervention(params: {
cancelReason?: 'timeout' | 'user_cancelled';
cancelled?: boolean;
operationId: string;
result?: unknown;
toolCallId: string;
}) {
return this.ipc.heterogeneousAgent.submitIntervention(params);
}
}
export const heterogeneousAgentService = new HeterogeneousAgentService();
@@ -631,6 +631,179 @@ export class ConversationControlActionImpl {
completeOperation(operationId);
};
/**
* Resolve a heterogeneous-runtime intervention (CC AskUserQuestion, …).
*
* Why this action exists separately from `submitToolInteraction`:
* - The CC subprocess is already running and blocked on an MCP call —
* we need to feed the answer back through the IPC bridge, not spawn
* a fresh `executeClientAgent` turn.
* - Once the answer ships, CC's existing stream emits `tool_result` and
* keeps going on its own; no synthetic user message, no new op.
*
* The framework's intervention surface still drives the UI: we just
* stamp `pluginIntervention.status` and the eventual `tool_result`
* content via the same optimistic primitives, so the InterventionBar /
* inline tool body update synchronously and the answered Render takes
* over once `pluginIntervention.status === 'approved' | 'rejected'`.
*
* `actionType`:
* - `'submit'` → mark approved, ship `payload` as the answer
* - `'skip' | 'cancel'` → mark rejected, ship `cancelled: true` so the
* bridge resolves with `cancelReason` and CC sees an isError result
* (it'll fall back to plain-text questioning)
*/
submitHeteroIntervention = async (
toolMessageId: string,
actionType: 'submit' | 'skip' | 'cancel',
payload?: Record<string, unknown>,
context?: ConversationContext,
): Promise<void> => {
const toolMessage = dbMessageSelectors.getDbMessageById(toolMessageId)(this.#get());
if (!toolMessage) return;
const toolCallId = toolMessage.tool_call_id;
if (!toolCallId) {
console.warn('[submitHeteroIntervention] tool message has no tool_call_id', toolMessageId);
return;
}
// Walk up to the assistant that owns this tool — its operation is the
// running CC stream we need to address. Falls through to the tool
// message id itself if a producer ever associated it directly.
const { messageOperationMap } = this.#get();
const operationId =
(toolMessage.parentId && messageOperationMap?.[toolMessage.parentId]) ??
messageOperationMap?.[toolMessageId];
if (!operationId) {
console.warn('[submitHeteroIntervention] no operationId for', toolMessageId);
return;
}
const effectiveContext: ConversationContext = context ?? {
agentId: this.#get().activeAgentId,
topicId: this.#get().activeTopicId,
threadId: this.#get().activeThreadId,
};
const optimisticContext: OptimisticUpdateContext = { operationId };
if (actionType === 'submit') {
await this.#get().optimisticUpdateMessagePlugin(
toolMessageId,
{ intervention: { status: 'approved' } },
optimisticContext,
);
// Persist the structured `{ [questionText]: selectedLabel(s) }` answers
// to `pluginState.askUserAnswers` so the Render component can show
// Q&A pairs instead of parsing the bridge's prose `User answers:`
// dump out of `content`. Best-effort — never block the IPC submit.
await this.setInterventionAnswers(toolMessageId, payload ?? {}, optimisticContext);
// Bridge formats its own "User answers:" string for CC, so the eventual
// tool_result re-rewrites this content. The optimistic write is just
// for the brief gap between Submit and CC echoing the result back.
const summary = `User submitted: ${JSON.stringify(payload ?? {})}`;
await this.#get().optimisticUpdateMessageContent(
toolMessageId,
summary,
undefined,
optimisticContext,
);
} else {
const reason = actionType === 'skip' ? 'User skipped' : 'User cancelled';
await this.#get().optimisticUpdateMessagePlugin(
toolMessageId,
{ intervention: { rejectedReason: reason, status: 'rejected' } },
optimisticContext,
);
await this.#get().optimisticUpdateMessageContent(
toolMessageId,
`${reason} this interaction.`,
undefined,
optimisticContext,
);
}
// Forward to the producer (Electron main → bridge.resolve). Dynamic
// import keeps `@/services/electron/*` out of non-Electron bundles.
try {
const { heterogeneousAgentService } = await import('@/services/electron/heterogeneousAgent');
await heterogeneousAgentService.submitIntervention(
actionType === 'submit'
? { operationId, result: payload ?? {}, toolCallId }
: {
cancelReason: actionType === 'skip' ? 'user_cancelled' : 'user_cancelled',
cancelled: true,
operationId,
toolCallId,
},
);
} catch (err) {
console.error('[submitHeteroIntervention] IPC submitIntervention failed:', err);
}
void effectiveContext;
};
/**
* In-memory draft store for an intervention form. Backs the renderer's
* "remember what I'd partially answered" behaviour without paying for a
* DB round-trip on every keystroke — drafts only matter while the
* intervention is pending (5 min cap), and the canonical pluginState
* mirror is enough to survive HMR / panel re-mounts.
*
* `askUserDraft` is irrelevant after submit (the form unmounts), so we
* don't bother clearing it — it stays buried under `askUserAnswers` in
* `pluginState` and never affects the completed Render.
*/
setInterventionDraft = (toolMessageId: string, draft: Record<string, unknown>): void => {
this.#get().internal_dispatchMessage({
id: toolMessageId,
key: 'askUserDraft',
type: 'updatePluginState',
value: draft,
});
};
/**
* Persist the structured intervention answers (`{ questionText:
* selectedLabel | selectedLabel[] }`) to the tool message's
* `pluginState.askUserAnswers`. Drives structured Q&A rendering on the
* `Render` component without re-parsing the bridge's prose tool_result.
*
* Both writes are merge-style by key — the in-memory `updatePluginState`
* reducer (`message/reducer.ts:142`) and the DB
* `messageModel.updatePluginState` shallow-merge so co-existing keys
* (`askUserDraft` etc.) survive. DB write is best-effort: a slow lambda
* must not strand the IPC submit that follows.
*/
setInterventionAnswers = async (
toolMessageId: string,
answers: Record<string, unknown>,
context?: OptimisticUpdateContext,
): Promise<void> => {
this.#get().internal_dispatchMessage(
{
id: toolMessageId,
key: 'askUserAnswers',
type: 'updatePluginState',
value: answers,
},
context,
);
try {
const { messageService } = await import('@/services/message');
const ctx = this.#get().internal_getConversationContext(context);
await messageService.updateMessagePluginState(
toolMessageId,
{ askUserAnswers: answers },
ctx,
);
} catch (err) {
console.warn('[setInterventionAnswers] persist failed:', err);
}
};
rejectToolCalling = async (
messageId: string,
reason?: string,
@@ -1,4 +1,7 @@
import type { AgentStreamEvent } from '@lobechat/agent-gateway-client';
import type {
AgentInterventionRequestData,
AgentStreamEvent,
} from '@lobechat/agent-gateway-client';
import { isDesktop } from '@lobechat/const';
import {
CLAUDE_CODE_CLI_INSTALL_DOCS_URL,
@@ -1319,6 +1322,44 @@ export const executeHeterogeneousAgent = async (
// Record for debugging
trace.push({ event, timestamp: Date.now() });
// ─── agent_intervention_request: CC AskUserQuestion needs user input ───
// Stamp the canonical `pluginIntervention.status='pending'` on the
// matching tool message via `optimisticUpdateMessagePlugin` — that
// single primitive (1) writes to DB, (2) updates the in-memory
// `dbMessagesMap` reducer, AND (3) mirrors the same intervention onto
// the parent assistant's `tools[].intervention` so both surfaces
// (inline tool body + bottom InterventionBar) light up immediately.
// The Intervention component registered under
// `BuiltinToolInterventions['claude-code'][askUserQuestion]` is
// rendered automatically by the framework while pending; the
// eventual `tool_result` content (formatted answer text) gets
// overwritten via the existing `tool_result` branch below.
// Deferred behind `persistQueue` so it lands AFTER `persistToolBatch`
// populates `toolMsgIdByCallId`.
if (event.type === 'agent_intervention_request') {
const data = event.data as AgentInterventionRequestData;
persistQueue = persistQueue.then(async () => {
const toolMsgId = toolMsgIdByCallId.get(data.toolCallId);
if (!toolMsgId) {
console.warn(
'[HeterogeneousAgent] intervention_request for unknown toolCallId:',
data.toolCallId,
);
return;
}
try {
await get().optimisticUpdateMessagePlugin(
toolMsgId,
{ intervention: { status: 'pending' } },
{ operationId },
);
} catch (err) {
console.error('[HeterogeneousAgent] persist intervention pending failed:', err);
}
});
return;
}
// ─── tool_result: update tool message content in DB (ACP-only) ───
if (event.type === 'tool_result') {
const { content, isError, pluginState, toolCallId } = event.data as {