♻️ refactor(hetero-agent): extract producer pipeline into shared package (#14425)

* 💄 style(todo-progress): use colorFillSecondary so left/right borders are visible against QueueTray

The colorBorderSecondary stroke nearly vanished against the dark elevated bg, so the TODO card looked open on the sides when stacked under QueueTray. Match QueueTray's outer border token (colorFillSecondary) for a consistent visible seam; inner dividers keep colorBorderSecondary as a softer secondary level.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ♻️ refactor(hetero-agent): extract producer pipeline into shared package

LOBE-8516 phase 0. Move the JSONL framing + adapter conversion + toStreamEvent
chain out of the renderer into a new `@lobechat/heterogeneous-agents/spawn`
entry, then have desktop main run it before broadcasting. Renderer now
consumes ready-made `AgentStreamEvent`s on `heteroAgentEvent`, dropping ~50
lines of in-renderer adapter wiring.

This unifies the wire shape across desktop main, the upcoming `lh hetero exec`
CLI, and the server `heteroIngest` handler — every consumer gets the same
stamped `AgentStreamEvent` with no per-consumer adapter step.

The desktop CC flow is unchanged behavior-wise: same adapter, same persistence
ordering, same step-boundary semantics; only the seam between main and
renderer moved.
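
The JSONL framing and stamping stages of that chain can be sketched roughly as follows. This is an illustration only, not the package's code: `JsonlFramer`, `toStreamEvent`, and the `SketchStreamEvent` shape are hypothetical names, and the sketch takes string chunks rather than `Buffer`s to stay short.

```typescript
// Hypothetical sketch: names and event shape are illustrative assumptions,
// not the actual `@lobechat/heterogeneous-agents/spawn` API.

interface SketchStreamEvent {
  data: unknown;
  operationId: string;
  type: string;
}

/** Buffers stdout chunks and returns one parsed JSON value per complete line. */
class JsonlFramer {
  private buffer = '';

  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last element is an incomplete line (or '' when the chunk ended on '\n').
    this.buffer = lines.pop() ?? '';
    return this.parse(lines);
  }

  /** Called once stdout ends: parse whatever is still buffered. */
  flush(): unknown[] {
    const rest = this.buffer;
    this.buffer = '';
    return this.parse([rest]);
  }

  private parse(lines: string[]): unknown[] {
    const out: unknown[] = [];
    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) continue;
      try {
        out.push(JSON.parse(trimmed));
      } catch {
        // Skip non-JSON noise instead of failing the whole stream.
      }
    }
    return out;
  }
}

/** Stamps a parsed payload with the operation it belongs to. */
const toStreamEvent = (payload: unknown, operationId: string): SketchStreamEvent => {
  const hasType =
    typeof payload === 'object' &&
    payload !== null &&
    typeof (payload as { type?: unknown }).type === 'string';
  const type = hasType ? (payload as { type: string }).type : 'unknown';
  return { data: payload, operationId, type };
};
```

A line split across two chunks is held back until its newline arrives, which is why the consumer needs an explicit `flush()` when the stream ends.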

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* ♻️ refactor(hetero-agent): pull codex tracker into shared spawn, drop desktop's gateway-client dep

Two cleanups on top of the phase 0 refactor:

1. Move `CodexFileChangeTracker` (+ its test) out of `apps/desktop/src/main/modules/heterogeneousAgent/` into `packages/heterogeneous-agents/src/spawn/`. `AgentStreamPipeline` now auto-instantiates it when `agentType === 'codex'`, so the desktop controller (and the future `lh hetero exec` CLI) stays agent-agnostic — no more "if codex { wire tracker via transformPayload }" branching at the call site. The public `transformPayload` hook is removed since it had no other consumer.

2. Re-export `AgentStreamEvent` / `AgentStreamEventType` from `@lobechat/heterogeneous-agents/spawn` and drop `@lobechat/agent-gateway-client` from `apps/desktop/package.json`. The gateway-client package is a browser-side WebSocket client; producer-side callers (desktop main, sandbox CLI) shouldn't carry it as a direct dep — they only need the type, which now flows through the producer-side entry.

Type predicate on Codex payloads tightened to a non-`Required<>` shape so the moved file passes the root tsconfig's `strict: true` (apps/desktop's tsconfig was lax).
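
As a rough illustration of the two points above (the pipeline choosing its own tracker, and a strict-mode-safe type predicate), consider the sketch below. Every name in it (`SketchPipeline`, `SketchCodexTracker`, `FileChangePayload`, the `'claude-code'` agent type, the `changeIndex` field) is a hypothetical stand-in; the real tracker is asynchronous and computes line stats, which this sketch does not attempt.

```typescript
// Hypothetical sketch: not the real AgentStreamPipeline or CodexFileChangeTracker.

type SketchAgentType = 'claude-code' | 'codex'; // assumed set of agent types

interface FileChangePayload {
  path: string;
  type: 'file_change';
}

/** Type predicate that narrows from `unknown` and passes under `strict: true`. */
const isFileChangePayload = (value: unknown): value is FileChangePayload => {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as Partial<FileChangePayload>;
  return candidate.type === 'file_change' && typeof candidate.path === 'string';
};

/** Stand-in tracker: numbers each file-change payload it sees. */
class SketchCodexTracker {
  private seen = 0;

  track(payload: unknown): unknown {
    if (!isFileChangePayload(payload)) return payload;
    this.seen += 1;
    return { ...payload, changeIndex: this.seen };
  }
}

class SketchPipeline {
  private readonly operationId: string;
  private readonly tracker?: SketchCodexTracker;

  constructor(options: { agentType: SketchAgentType; operationId: string }) {
    this.operationId = options.operationId;
    // The pipeline decides internally; callers pass no transform hook.
    this.tracker = options.agentType === 'codex' ? new SketchCodexTracker() : undefined;
  }

  process(payload: unknown): { operationId: string; payload: unknown } {
    const tracked = this.tracker ? this.tracker.track(payload) : payload;
    return { operationId: this.operationId, payload: tracked };
  }
}
```

With this shape the call site only supplies `agentType` and `operationId`, so adding another agent-specific tracker would not change any caller.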

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🧑‍💻 chore(local-testing): harden electron-dev.sh process management

Lifecycle improvements for the local-testing helper so smoke runs against the desktop dev session are reliable:

- `find_project_pids` now also catches user-started `bun run dev` Electron sessions (matches by project electron path, not just `--remote-debugging-port`), the launcher subshell saved to PIDFILE, and any process bound to the CDP port. Vite match tightened to `electron-vite[/.].*\bdev\b` so unrelated Vite invocations aren't swept up.
- `do_stop` expands seed PIDs into their descendant trees (DFS via `pgrep -P`), SIGTERMs the whole tree, waits 5s, then SIGKILLs survivors. Belt-and-suspenders sweep for stragglers + anything still bound to the CDP port. Closes the long-standing "Helper processes survive the kill" gotcha.
- `do_start` detects existing project Electron/vite before tearing it down so the user sees what's being killed; waits for port + user-data-dir locks to release before relaunching to avoid the "user data directory in use" race.
- `wait_for_cdp` uses an explicit deadline + early bail-out if the launcher PID dies, instead of the previous fixed-step loop. `wait_for_renderer` no longer pre-sleeps 10s.
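
The descendant expansion in `do_stop` is a plain depth-first walk. The script does it in bash via `pgrep -P`; the sketch below restates the same idea in TypeScript over an in-memory parent-to-children table so it can be read and tested in isolation. `ChildTable`, `expandDescendants`, and `expandAll` are hypothetical names, and the `seen` guard is an addition the shell version does not have.

```typescript
// Hypothetical sketch of the seed-PID expansion; the table stands in for `pgrep -P <pid>`.

type ChildTable = ReadonlyMap<number, readonly number[]>;

/** Returns `pid` followed by every descendant, depth-first. */
const expandDescendants = (
  pid: number,
  children: ChildTable,
  seen: Set<number> = new Set<number>(),
): number[] => {
  if (seen.has(pid)) return []; // guard against revisiting a pid
  seen.add(pid);
  const out: number[] = [pid];
  for (const child of children.get(pid) ?? []) {
    out.push(...expandDescendants(child, children, seen));
  }
  return out;
};

/** Expands every seed and deduplicates, like the script's `sort -u` step. */
const expandAll = (seeds: readonly number[], children: ChildTable): number[] => {
  const all = new Set<number>();
  for (const seed of seeds) {
    for (const pid of expandDescendants(seed, children)) all.add(pid);
  }
  return Array.from(all).sort((a, b) => a - b);
};
```

Deduplication matters because overlapping seeds (for example a launcher and one of its own children) would otherwise be signalled twice.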

`setsid` use is intentional; it puts the launched Electron in its own session so the whole tree shares a PGID we can signal in one shot. Note: `setsid` ships with util-linux, not GNU coreutils, and macOS does not provide it by default, so without `brew install util-linux` the script will fail at the launch step. Documented as a known limitation; no fallback added.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(hetero-agent): gate session-complete on stdout fully drained

Node may emit `proc.on('exit')` BEFORE child stdio fully closes (documented
in child_process: "stdio streams might still be open"). Phase 0 of LOBE-8516
moved adapter ownership to main, so renderer no longer flushes its own
adapter on session-complete — meaning trailing events synthesized by
`pipeline.flush()` (e.g. Codex's `tool_end` for unfinished tool calls) would
race against, and lose to, the `heteroAgentSessionComplete` broadcast,
leaving renderer-side persistence to finalize on incomplete state.

Fix: in `proc.on('exit')`, await `streamFinished(stdout)` (covers `'end'`,
`'close'`, and `'error'`) BEFORE awaiting the broadcast queue. The first
await ensures the `stdout.on('end')` handler has had a chance to schedule
`pipeline.flush()` onto the queue; the second drains it. Only then do we
broadcast complete / error.
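
The two-step gate can be reproduced in isolation. The sketch below is a simplified model, not the controller code: a `PassThrough` stands in for the child's stdout, `runOrderingDemo` and the log labels are hypothetical, and the "exit" handler is started before stdout has ended to mimic the race.

```typescript
import { PassThrough } from 'node:stream';
import { finished } from 'node:stream/promises';

// Hypothetical model of the gate: wait for stdout to finish, then drain the queue.
async function runOrderingDemo(): Promise<string[]> {
  const log: string[] = [];
  const stdout = new PassThrough();

  // Serialized broadcast queue, in the style of `stdoutBroadcastQueue`.
  let queue: Promise<void> = Promise.resolve();
  const enqueue = (label: string): void => {
    queue = queue.then(async () => {
      log.push(label);
    });
  };

  stdout.on('data', (chunk: Buffer) => enqueue(`event:${chunk.toString()}`));
  stdout.on('end', () => enqueue('event:flush'));

  // Simulated `exit` handler, invoked while stdout is still open.
  const onExit = async (): Promise<void> => {
    // Step 1: stdout has ended/closed, so the 'end' handler has enqueued its flush.
    await finished(stdout, { writable: false }).catch(() => {
      /* an errored stream still counts as done */
    });
    // Step 2: drain everything that was enqueued, including the flush.
    await queue;
    log.push('session-complete');
  };
  const exitHandled = onExit();

  stdout.write('a');
  stdout.end();
  await exitHandled;
  return log;
}
```

Reading `queue` only after the first await is what makes the flush visible; capturing it before stdout finished would miss anything enqueued by the `'end'` handler.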

Regression test repros the documented Node race by emitting `exit` before
`stdout.end()` and asserts every `heteroAgentEvent` (including the
synthesized `tool_end` from `pipeline.flush()`) lands before
`heteroAgentSessionComplete`. Bisected: test fails without the gate, passes
with it.

Also: add `packages/heterogeneous-agents` to `apps/desktop/pnpm-workspace.yaml`
to mirror the new workspace dep added in the phase 0 refactor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(hetero-agent): drop builtin-tool-claude-code dep, inline the 3 CC wire shapes the adapter needs

Phase 0 added `@lobechat/heterogeneous-agents` as a runtime dep of the desktop
main process. That transitively pulled in `@lobechat/builtin-tool-claude-code`
(declared in the shared package's deps), which the desktop pnpm workspace
doesn't list — CI install on the desktop project fails:

    ERR_PNPM_WORKSPACE_PKG_NOT_FOUND  In ../../packages/heterogeneous-agents:
    "@lobechat/builtin-tool-claude-code@workspace:*" is in the dependencies but
    no package named "@lobechat/builtin-tool-claude-code" is present in the
    workspace

The dep is also a layer-violation: `heterogeneous-agents` is the producer
side (CLI stream → AgentStreamEvent), `builtin-tool-claude-code` is the UI
tool definition (renderers / inspectors / agent template). Producer
shouldn't depend on UI-tool packages, even if today the import is just
types/constants — the dep cascade still drags `shared-tool-ui` etc. into
every workspace that wants the adapter.

Fix: inline the three things the adapter actually uses (`'TodoWrite'` tool
name string, `TodoWriteArgs` interface, `ClaudeCodeTodoItem` interface).
They reflect upstream Claude Code's wire schema — if `claude` ever renames
`TodoWrite`, the adapter and the downstream renderers must both update
regardless of whether they share a constant. Renderer-side packages
(`builtin-tools/codex/TodoListRender`, etc.) keep importing the canonical
`ClaudeCodeApiName` from `@lobechat/builtin-tool-claude-code`.
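
For illustration, the inlined declarations might look roughly like the sketch below. The identifiers `TodoWriteArgs` and `ClaudeCodeTodoItem` and the `'TodoWrite'` string come from the description above; the field names, the status values, the `TODO_WRITE_TOOL_NAME` constant, and the `isTodoWriteCall` helper are assumptions made for this example and may not match the adapter.

```typescript
// Hypothetical sketch of producer-side inlined wire shapes.
// Field names and status values are assumptions, not confirmed by this commit.

const TODO_WRITE_TOOL_NAME = 'TodoWrite';

interface ClaudeCodeTodoItem {
  content: string;
  status: 'pending' | 'in_progress' | 'completed';
}

interface TodoWriteArgs {
  todos: ClaudeCodeTodoItem[];
}

/** Narrows a tool call to TodoWrite without importing any UI-tool package. */
const isTodoWriteCall = (toolName: string, args: unknown): args is TodoWriteArgs => {
  if (toolName !== TODO_WRITE_TOOL_NAME) return false;
  if (typeof args !== 'object' || args === null) return false;
  const todos = (args as { todos?: unknown }).todos;
  if (!Array.isArray(todos)) return false;
  return todos.every(
    (item) =>
      typeof item === 'object' &&
      item !== null &&
      typeof (item as { content?: unknown }).content === 'string' &&
      typeof (item as { status?: unknown }).status === 'string',
  );
};
```

Keeping these declarations local costs a few duplicated lines but lets the producer package install without any renderer-side dependency.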

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Arvin Xu
2026-05-05 01:04:09 +08:00
committed by GitHub
parent b5e4cd0805
commit fe65741a32
23 changed files with 1144 additions and 678 deletions
@@ -11,86 +11,167 @@
# Environment variables:
# CDP_PORT — Chrome DevTools Protocol port (default: 9222)
# ELECTRON_LOG — Log file path (default: /tmp/electron-dev.log)
# ELECTRON_WAIT_S — Max seconds to wait for Electron process (default: 60)
# RENDERER_WAIT_S — Max seconds to wait for renderer/SPA (default: 60)
# ELECTRON_WAIT_S — Max seconds to wait for CDP to become reachable (default: 90)
# RENDERER_WAIT_S — Max seconds to wait for SPA after CDP is up (default: 60)
# FORCE_KILL_USER — When set to 1, silently kill the user's `bun run dev`
# Electron without confirmation (default: always confirm-by-action)
#
set -euo pipefail
CDP_PORT="${CDP_PORT:-9222}"
ELECTRON_LOG="${ELECTRON_LOG:-/tmp/electron-dev.log}"
ELECTRON_WAIT_S="${ELECTRON_WAIT_S:-60}"
ELECTRON_WAIT_S="${ELECTRON_WAIT_S:-90}"
RENDERER_WAIT_S="${RENDERER_WAIT_S:-60}"
SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/../../../.." && pwd)"
PIDFILE="/tmp/electron-dev-cdp-${CDP_PORT}.pid"
# Project-scoped electron path prefix used for pgrep matching. Any Electron
# binary from this project (main + helpers, with or without --remote-debugging-port)
# starts with this string in its argv[0], so a single substring match catches all.
PROJECT_ELECTRON_PATH="${PROJECT_ROOT}/apps/desktop/node_modules/.pnpm/electron@"
# ── Helpers ──────────────────────────────────────────────────────────
# Get the Electron binary path used by this project
electron_bin_pattern() {
echo "${PROJECT_ROOT}/apps/desktop/node_modules/.pnpm/electron@*/node_modules/electron/dist/Electron.app"
# Print pid + every descendant pid (DFS via pgrep -P).
expand_descendants() {
local pid="$1"
echo "$pid"
local children
children=$(pgrep -P "$pid" 2>/dev/null || true)
for c in $children; do
expand_descendants "$c"
done
}
# Find all PIDs related to the project's Electron dev session
find_electron_pids() {
# Find seed PIDs related to this project's Electron dev session.
# Matches REGARDLESS of whether --remote-debugging-port was passed, so it also
# catches a plain `bun run dev` session the user started outside this script.
find_project_pids() {
local pids=""
# 1. Main Electron process (launched with --remote-debugging-port)
local main_pids
main_pids=$(pgrep -f "Electron\.app.*--remote-debugging-port=${CDP_PORT}" 2>/dev/null || true)
[ -n "$main_pids" ] && pids="$pids $main_pids"
# 1. Any process whose command line mentions this project's electron path
# (covers the main Electron binary AND every Helper subprocess)
local electron_pids
electron_pids=$(pgrep -f "$PROJECT_ELECTRON_PATH" 2>/dev/null || true)
pids="$pids $electron_pids"
# 2. Electron Helper processes (gpu, renderer, utility) spawned from the project's electron binary
local helper_pids
helper_pids=$(pgrep -f "${PROJECT_ROOT}/apps/desktop/node_modules/.*Electron Helper" 2>/dev/null || true)
[ -n "$helper_pids" ] && pids="$pids $helper_pids"
# 3. electron-vite dev server
# 2. electron-vite dev server (narrow match to avoid catching unrelated Vite invocations)
local vite_pids
vite_pids=$(pgrep -f "electron-vite.*dev" 2>/dev/null || true)
[ -n "$vite_pids" ] && pids="$pids $vite_pids"
vite_pids=$(pgrep -f "electron-vite[/.].*\\bdev\\b" 2>/dev/null || true)
pids="$pids $vite_pids"
# 4. PID from pidfile (fallback)
# 3. The launcher subshell from a previous `start` (saved to pidfile)
if [ -f "$PIDFILE" ]; then
local saved_pid
saved_pid=$(cat "$PIDFILE")
if kill -0 "$saved_pid" 2>/dev/null; then
saved_pid=$(cat "$PIDFILE" 2>/dev/null || true)
if [ -n "$saved_pid" ] && kill -0 "$saved_pid" 2>/dev/null; then
pids="$pids $saved_pid"
fi
fi
# Deduplicate
echo "$pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' ' || true
# 4. Whatever is currently bound to the CDP port — catches strays whose
# binary path doesn't match (e.g. orphaned from a crashed restart)
local port_pid
port_pid=$(lsof -ti tcp:"$CDP_PORT" -sTCP:LISTEN 2>/dev/null || true)
pids="$pids $port_pid"
echo "$pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' '
}
# Wait for the CDP HTTP endpoint to respond, with a deadline + early bail-out
# if the launcher process died (no point waiting if Electron crashed).
wait_for_cdp() {
local deadline=$(( $(date +%s) + ELECTRON_WAIT_S ))
echo "[electron-dev] Waiting for CDP on port ${CDP_PORT} (up to ${ELECTRON_WAIT_S}s)..."
while [ "$(date +%s)" -lt "$deadline" ]; do
if curl -sf --max-time 2 "http://localhost:${CDP_PORT}/json/version" >/dev/null 2>&1; then
echo "[electron-dev] CDP is reachable."
return 0
fi
# If our launcher subshell died, abort early so we don't hang the full timeout
if [ -f "$PIDFILE" ]; then
local saved_pid
saved_pid=$(cat "$PIDFILE" 2>/dev/null || true)
if [ -n "$saved_pid" ] && ! kill -0 "$saved_pid" 2>/dev/null; then
echo "[electron-dev] Launcher PID $saved_pid is gone before CDP came up."
echo "[electron-dev] Last 30 lines of $ELECTRON_LOG:"
tail -30 "$ELECTRON_LOG" 2>/dev/null || true
return 1
fi
fi
sleep 2
done
echo "[electron-dev] ERROR: CDP did not respond within ${ELECTRON_WAIT_S}s"
echo "[electron-dev] Last 30 lines of $ELECTRON_LOG:"
tail -30 "$ELECTRON_LOG" 2>/dev/null || true
return 1
}
# After CDP is up, wait until the SPA renders interactive elements.
wait_for_renderer() {
local deadline=$(( $(date +%s) + RENDERER_WAIT_S ))
echo "[electron-dev] Waiting for SPA to load (up to ${RENDERER_WAIT_S}s)..."
while [ "$(date +%s)" -lt "$deadline" ]; do
local snap
snap=$(agent-browser --cdp "$CDP_PORT" snapshot -i 2>&1 || true)
if echo "$snap" | grep -qE '\b(link|button)\b'; then
echo "[electron-dev] Renderer ready."
return 0
fi
sleep 2
done
echo "[electron-dev] WARNING: Renderer not interactive within ${RENDERER_WAIT_S}s — proceeding anyway."
return 0
}
# ── Commands ─────────────────────────────────────────────────────────
do_stop() {
echo "[electron-dev] Stopping Electron dev environment..."
local pids
pids=$(find_electron_pids)
local seed_pids
seed_pids=$(find_project_pids)
if [ -z "$pids" ]; then
echo "[electron-dev] No Electron processes found."
# Expand to include all descendants — catches helpers spawned by the main
# process AFTER our pgrep snapshot, and the launcher's child node/electron-vite
# process tree.
local all_pids=""
for pid in $seed_pids; do
all_pids="$all_pids $(expand_descendants "$pid")"
done
all_pids=$(echo "$all_pids" | tr ' ' '\n' | sort -u | grep -v '^$' | tr '\n' ' ')
if [ -z "$all_pids" ]; then
echo "[electron-dev] No project Electron/vite processes found."
else
echo "[electron-dev] Killing PIDs: $pids"
for pid in $pids; do
local count
count=$(echo "$all_pids" | tr ' ' '\n' | grep -c .)
echo "[electron-dev] Sending SIGTERM to $count process(es): $all_pids"
for pid in $all_pids; do
kill "$pid" 2>/dev/null || true
done
# Wait up to 5s for graceful exit, then force-kill survivors
# Wait up to 5s for graceful exit
local waited=0
while [ $waited -lt 5 ]; do
local alive=""
for pid in $pids; do
kill -0 "$pid" 2>/dev/null && alive="$alive $pid"
local any_alive=0
for pid in $all_pids; do
if kill -0 "$pid" 2>/dev/null; then any_alive=1; break; fi
done
[ -z "$alive" ] && break
[ "$any_alive" = "0" ] && break
sleep 1
waited=$((waited + 1))
done
# Force-kill any remaining
for pid in $pids; do
# SIGKILL anyone still alive
for pid in $all_pids; do
if kill -0 "$pid" 2>/dev/null; then
echo "[electron-dev] Force-killing PID $pid"
kill -9 "$pid" 2>/dev/null || true
@@ -98,7 +179,27 @@ do_stop() {
done
fi
# Also close any agent-browser sessions connected to this port
# Belt-and-suspenders: anything still bound to the CDP port goes away
local port_pid
port_pid=$(lsof -ti tcp:"$CDP_PORT" -sTCP:LISTEN 2>/dev/null || true)
if [ -n "$port_pid" ]; then
echo "[electron-dev] Port $CDP_PORT still bound by PID $port_pid; force-killing"
# shellcheck disable=SC2086
kill -9 $port_pid 2>/dev/null || true
fi
# Also re-sweep the project's electron processes — sometimes the OS spawns
# new helpers during shutdown that didn't exist when we first enumerated.
local stragglers
stragglers=$(pgrep -f "$PROJECT_ELECTRON_PATH" 2>/dev/null || true)
if [ -n "$stragglers" ]; then
echo "[electron-dev] Cleaning up stragglers: $stragglers"
for pid in $stragglers; do
kill -9 "$pid" 2>/dev/null || true
done
fi
# Close any agent-browser sessions connected to this port
agent-browser --cdp "$CDP_PORT" close --all 2>/dev/null || true
rm -f "$PIDFILE"
@@ -107,113 +208,84 @@ do_stop() {
do_status() {
local pids
pids=$(find_electron_pids)
pids=$(find_project_pids)
if [ -z "$pids" ]; then
echo "[electron-dev] Electron is NOT running."
echo "[electron-dev] No project Electron processes found."
return 1
fi
echo "[electron-dev] Electron is running (PIDs: $pids)"
echo "[electron-dev] Project processes: $pids"
# Check CDP connectivity
if agent-browser --cdp "$CDP_PORT" get url >/dev/null 2>&1; then
if curl -sf --max-time 2 "http://localhost:${CDP_PORT}/json/version" >/dev/null 2>&1; then
local url
url=$(agent-browser --cdp "$CDP_PORT" get url 2>&1 | tail -1)
url=$(agent-browser --cdp "$CDP_PORT" get url 2>&1 | tail -1 || echo "?")
echo "[electron-dev] CDP port ${CDP_PORT} is reachable. URL: $url"
return 0
else
echo "[electron-dev] CDP port ${CDP_PORT} is NOT reachable (Electron may still be loading)."
echo "[electron-dev] CDP port ${CDP_PORT} is NOT reachable (no --remote-debugging-port, or still loading)."
return 2
fi
}
wait_for_electron() {
echo "[electron-dev] Waiting for Electron process (up to ${ELECTRON_WAIT_S}s)..."
local elapsed=0
local interval=3
while [ $elapsed -lt "$ELECTRON_WAIT_S" ]; do
if strings "$ELECTRON_LOG" 2>/dev/null | grep -q "starting electron"; then
echo "[electron-dev] Electron process started."
return 0
fi
sleep "$interval"
elapsed=$((elapsed + interval))
echo "[electron-dev] Still waiting... (${elapsed}/${ELECTRON_WAIT_S}s)"
done
echo "[electron-dev] ERROR: Electron did not start within ${ELECTRON_WAIT_S}s"
echo "[electron-dev] Last 20 lines of log:"
tail -20 "$ELECTRON_LOG" 2>/dev/null || true
return 1
}
wait_for_renderer() {
echo "[electron-dev] Waiting for renderer/SPA to load (up to ${RENDERER_WAIT_S}s)..."
# Initial delay — renderer needs time to bootstrap
sleep 10
local elapsed=10
local interval=5
while [ $elapsed -lt "$RENDERER_WAIT_S" ]; do
if agent-browser --cdp "$CDP_PORT" wait 2000 >/dev/null 2>&1; then
# Check if interactive elements are present (SPA loaded)
local snap
snap=$(agent-browser --cdp "$CDP_PORT" snapshot -i 2>&1 || true)
if echo "$snap" | grep -qE 'link |button '; then
echo "[electron-dev] Renderer ready (interactive elements found)."
return 0
fi
fi
sleep "$interval"
elapsed=$((elapsed + interval))
echo "[electron-dev] SPA still loading... (${elapsed}/${RENDERER_WAIT_S}s)"
done
echo "[electron-dev] WARNING: Timed out waiting for renderer, proceeding anyway."
return 0
}
do_start() {
# If already running and healthy, skip
local status_ok=0
do_status >/dev/null 2>&1 || status_ok=$?
if [ "$status_ok" -eq 0 ]; then
echo "[electron-dev] Electron is already running and CDP is reachable. Skipping start."
echo "[electron-dev] Use 'restart' to force a fresh session, or 'stop' to tear down."
# Already up and CDP is reachable → nothing to do
if curl -sf --max-time 2 "http://localhost:${CDP_PORT}/json/version" >/dev/null 2>&1; then
echo "[electron-dev] CDP already reachable on port $CDP_PORT. Skipping start."
echo "[electron-dev] Use 'restart' to force a fresh session."
return 0
fi
# Clean up any stale processes
# Detect the user's existing dev session (or stale processes) BEFORE killing
local existing
existing=$(find_project_pids)
if [ -n "$existing" ]; then
echo "[electron-dev] Existing project Electron/vite processes detected:"
echo "$existing" | tr ' ' '\n' | sed 's/^/[electron-dev] PID /'
echo "[electron-dev] Tearing them down so we can start a CDP-enabled session..."
fi
do_stop
# Start fresh
# Wait for port + user-data-dir locks to release. Without this, the new
# Electron may fail with "user data directory in use" or fail to bind CDP.
local waited=0
while [ $waited -lt 10 ]; do
if ! lsof -i tcp:"$CDP_PORT" >/dev/null 2>&1 \
&& ! pgrep -f "$PROJECT_ELECTRON_PATH" >/dev/null 2>&1; then
break
fi
[ $waited -eq 0 ] && echo "[electron-dev] Waiting for port + Electron locks to release..."
sleep 1
waited=$((waited + 1))
done
echo "[electron-dev] Starting Electron dev server..."
echo "[electron-dev] Project: $PROJECT_ROOT"
echo "[electron-dev] Project: $PROJECT_ROOT"
echo "[electron-dev] CDP port: $CDP_PORT"
echo "[electron-dev] Log: $ELECTRON_LOG"
echo "[electron-dev] Log: $ELECTRON_LOG"
: > "$ELECTRON_LOG" # Truncate log
(
cd "$PROJECT_ROOT/apps/desktop" && \
ELECTRON_ENABLE_LOGGING=1 npx electron-vite dev -- --remote-debugging-port="$CDP_PORT" \
>> "$ELECTRON_LOG" 2>&1
) &
local bg_pid=$!
echo "$bg_pid" > "$PIDFILE"
echo "[electron-dev] Background PID: $bg_pid"
# Launch in a new session (setsid) so the whole process tree shares a PGID
# we can later signal in one shot. The inner `exec` replaces the bash shell
# with npx under the same PID, so the PID we save is the session leader.
setsid bash -c "
cd '$PROJECT_ROOT/apps/desktop'
exec npx electron-vite dev -- --remote-debugging-port=$CDP_PORT
" >> "$ELECTRON_LOG" 2>&1 < /dev/null &
local launcher_pid=$!
echo "$launcher_pid" > "$PIDFILE"
echo "[electron-dev] Launcher PID (session leader): $launcher_pid"
# Wait for Electron process to start
if ! wait_for_electron; then
echo "[electron-dev] Failed to start. Cleaning up..."
if ! wait_for_cdp; then
echo "[electron-dev] Failed to bring up CDP. Cleaning up..."
do_stop
return 1
fi
# Wait for renderer to be interactive
if ! wait_for_renderer; then
echo "[electron-dev] Renderer not ready, but Electron is running. You may need to wait more."
echo "[electron-dev] Renderer not interactive — you may need to wait more."
fi
echo "[electron-dev] Ready! Use: agent-browser --cdp $CDP_PORT snapshot -i"
@@ -221,7 +293,7 @@ do_start() {
do_restart() {
do_stop
sleep 2
sleep 1
do_start
}
@@ -235,10 +307,12 @@ case "${1:-help}" in
*)
echo "Usage: $0 {start|stop|status|restart}"
echo ""
echo " start — Start Electron dev with CDP (idempotent, skips if already running)"
echo " stop — Kill all Electron dev processes (main + helpers + vite)"
echo " status — Check if Electron is running and CDP is reachable"
echo " restart — Stop then start"
echo " start — Start Electron dev with CDP. Detects + tears down any"
echo " existing project Electron (e.g. \`bun run dev\`) first."
echo " stop — Kill all project Electron/vite processes (main + helpers"
echo " + descendants), with SIGTERM → 5s wait → SIGKILL fallback."
echo " status — Check if Electron is running and CDP is reachable."
echo " restart — Stop then start."
exit 1
;;
esac
@@ -58,6 +58,7 @@
"@lobechat/electron-client-ipc": "workspace:*",
"@lobechat/electron-server-ipc": "workspace:*",
"@lobechat/file-loaders": "workspace:*",
"@lobechat/heterogeneous-agents": "workspace:*",
"@lobechat/local-file-shell": "workspace:*",
"@lobehub/i18n-cli": "^1.25.1",
"@modelcontextprotocol/sdk": "^1.24.3",
@@ -1,6 +1,7 @@
packages:
- '../cli'
- '../../packages/agent-gateway-client'
- '../../packages/heterogeneous-agents'
- '../../packages/const'
- '../../packages/electron-server-ipc'
- '../../packages/electron-client-ipc'
@@ -4,6 +4,7 @@ import { createHash, randomUUID } from 'node:crypto';
import { access, appendFile, mkdir, readFile, writeFile } from 'node:fs/promises';
import path from 'node:path';
import type { Readable, Writable } from 'node:stream';
import { finished as streamFinished } from 'node:stream/promises';
import type { HeterogeneousAgentSessionError } from '@lobechat/electron-client-ipc';
import {
@@ -13,14 +14,11 @@ import {
CODEX_CLI_INSTALL_DOCS_URL,
HeterogeneousAgentSessionErrorCode,
} from '@lobechat/electron-client-ipc';
import { AgentStreamPipeline } from '@lobechat/heterogeneous-agents/spawn';
import { app as electronApp, BrowserWindow } from 'electron';
import { getHeterogeneousAgentDriver } from '@/modules/heterogeneousAgent';
import { CodexFileChangeTracker } from '@/modules/heterogeneousAgent/codexFileChangeTracker';
import type {
HeterogeneousAgentImageAttachment,
HeterogeneousAgentParsedOutput,
} from '@/modules/heterogeneousAgent/types';
import type { HeterogeneousAgentImageAttachment } from '@/modules/heterogeneousAgent/types';
import { buildProxyEnv } from '@/modules/networkProxy/envBuilder';
import { detectHeterogeneousCliCommand } from '@/modules/toolDetectors';
import { createLogger } from '@/utils/logger';
@@ -91,6 +89,12 @@ interface StartSessionResult {
interface SendPromptParams {
/** Image attachments to include in the prompt (downloaded from url, cached by id) */
imageList?: HeterogeneousAgentImageAttachment[];
/**
* Renderer-side operation id stamped onto every emitted `AgentStreamEvent`.
* Required: producer-side conversion is the V3 contract — by the time events
* reach the renderer they must already carry the operation they belong to.
*/
operationId: string;
prompt: string;
sessionId: string;
}
@@ -148,7 +152,7 @@ interface CliTraceSession {
* prompt transport, resume semantics, and raw stream shape without turning
* this controller into a giant `switch`.
*
* Lifecycle: startSession → sendPrompt → (heteroAgentRawLine broadcasts) → stopSession
* Lifecycle: startSession → sendPrompt → (heteroAgentEvent broadcasts) → stopSession
*/
export default class HeterogeneousAgentCtr extends ControllerModule {
static override readonly groupName = 'heterogeneousAgent';
@@ -780,8 +784,9 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
/**
* Send a prompt to an agent session.
*
* Spawns the CLI process with preset flags. Broadcasts each stdout line
* as an `heteroAgentRawLine` event — Renderer side parses and adapts.
* Spawns the CLI process with preset flags. Pipes each stdout chunk through
* the shared `AgentStreamPipeline` (JSONL → adapter → toStreamEvent) and
* broadcasts the resulting `AgentStreamEvent`s on `heteroAgentEvent`.
*/
@IpcMethod()
async sendPrompt(params: SendPromptParams): Promise<void> {
@@ -853,42 +858,49 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
}
session.process = proc;
const streamProcessor = driver.createStreamProcessor();
const codexFileChangeTracker =
session.agentType === 'codex' ? new CodexFileChangeTracker() : undefined;
// Producer-side conversion (V3 contract): JSONL framing + adapter +
// toStreamEvent all run inside the shared pipeline, so renderer + future
// server `heteroIngest` see the same `AgentStreamEvent` wire shape with
// no per-consumer adapter. The pipeline auto-wires the Codex
// file-change line-stat tracker when `agentType === 'codex'`, so this
// controller stays agent-agnostic.
const pipeline = new AgentStreamPipeline({
agentType: session.agentType,
operationId: params.operationId,
});
let stdoutBroadcastQueue: Promise<void> = Promise.resolve();
const broadcastParsedOutputs = (parsedOutputs: HeterogeneousAgentParsedOutput[]) => {
const broadcastPipelineBatch = (produce: () => ReturnType<AgentStreamPipeline['push']>) => {
stdoutBroadcastQueue = stdoutBroadcastQueue
.then(async () => {
for (const parsedOutput of parsedOutputs) {
if (parsedOutput.agentSessionId) {
session.agentSessionId = parsedOutput.agentSessionId;
}
const line = codexFileChangeTracker
? await codexFileChangeTracker.track(parsedOutput.payload)
: parsedOutput.payload;
this.broadcast('heteroAgentRawLine', {
line,
const events = await produce();
// Adapter-extracted CC/Codex session id powers `--resume` on the
// next prompt; surface it through the existing `getSessionInfo`
// IPC by mirroring the freshest value onto the session record.
if (pipeline.sessionId && pipeline.sessionId !== session.agentSessionId) {
session.agentSessionId = pipeline.sessionId;
}
for (const event of events) {
this.broadcast('heteroAgentEvent', {
event,
sessionId: session.sessionId,
});
}
})
.catch((error) => {
logger.error('Failed to broadcast parsed agent output:', error);
logger.error('Failed to broadcast agent stream batch:', error);
});
};
// Stream stdout events as raw provider payloads to Renderer.
// Stream stdout events through the producer pipeline.
const stdout = proc.stdout as Readable;
stdout.on('data', (chunk: Buffer) => {
void this.appendCliTraceFile(traceSession, 'stdout.jsonl', chunk);
broadcastParsedOutputs(streamProcessor.push(chunk));
broadcastPipelineBatch(() => pipeline.push(chunk));
});
stdout.on('end', () => {
broadcastParsedOutputs(streamProcessor.flush());
broadcastPipelineBatch(() => pipeline.flush());
});
// Capture stderr
@@ -915,44 +927,59 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
});
proc.on('exit', (code, signal) => {
void stdoutBroadcastQueue.finally(async () => {
void this.writeCliTraceJson(traceSession, 'exit.json', {
code,
finishedAt: new Date().toISOString(),
signal,
});
await this.flushCliTrace(traceSession);
logger.info('Agent process exited:', { code, sessionId: session.sessionId, signal });
session.process = undefined;
// If *we* killed it (cancel / stop / before-quit), treat the non-zero
// exit as a clean shutdown — surfacing it as an error would make a
// user-initiated cancel look like an agent failure, and an Electron
// shutdown affecting OTHER running CC sessions would pollute their
// topics with a misleading "Agent exited with code 143" message.
if (session.cancelledByUs) {
this.broadcast('heteroAgentSessionComplete', { sessionId: session.sessionId });
resolve();
return;
}
if (code === 0) {
this.broadcast('heteroAgentSessionComplete', { sessionId: session.sessionId });
resolve();
} else {
const stderrOutput = stderrChunks.join('').trim();
const errorMsg = this.getExitErrorMessage(code, session, stderrOutput);
const sessionError = this.getSessionErrorPayload(errorMsg, session);
this.broadcast('heteroAgentSessionError', {
error: sessionError,
sessionId: session.sessionId,
});
reject(
new Error(typeof sessionError === 'string' ? sessionError : sessionError.message),
);
}
// Node may emit `'exit'` BEFORE stdio finishes draining (documented:
// child_process docs note "stdio streams might still be open" at exit
// time). Wait for stdout to fully end/close so the `stdout.on('end')`
// handler has scheduled `pipeline.flush()` onto `stdoutBroadcastQueue`,
// THEN wait for the queue itself to settle. Without this two-step
// gate, trailing flushed events (final synthesized tool_end /
// tool_result) would race against — and lose to — the
// `heteroAgentSessionComplete` broadcast, leaving renderer-side
// persistence to finalize on incomplete state.
const stdoutDrained = streamFinished(stdout, { writable: false }).catch(() => {
/* end / close / error are all "done"; we still want to settle. */
});
void stdoutDrained
.then(() => stdoutBroadcastQueue)
.finally(async () => {
void this.writeCliTraceJson(traceSession, 'exit.json', {
code,
finishedAt: new Date().toISOString(),
signal,
});
await this.flushCliTrace(traceSession);
logger.info('Agent process exited:', { code, sessionId: session.sessionId, signal });
session.process = undefined;
// If *we* killed it (cancel / stop / before-quit), treat the non-zero
// exit as a clean shutdown — surfacing it as an error would make a
// user-initiated cancel look like an agent failure, and an Electron
// shutdown affecting OTHER running CC sessions would pollute their
// topics with a misleading "Agent exited with code 143" message.
if (session.cancelledByUs) {
this.broadcast('heteroAgentSessionComplete', { sessionId: session.sessionId });
resolve();
return;
}
if (code === 0) {
this.broadcast('heteroAgentSessionComplete', { sessionId: session.sessionId });
resolve();
} else {
const stderrOutput = stderrChunks.join('').trim();
const errorMsg = this.getExitErrorMessage(code, session, stderrOutput);
const sessionError = this.getSessionErrorPayload(errorMsg, session);
this.broadcast('heteroAgentSessionError', {
error: sessionError,
sessionId: session.sessionId,
});
reject(
new Error(typeof sessionError === 'string' ? sessionError : sessionError.message),
);
}
});
});
});
}
@@ -11,8 +11,12 @@ import HeterogeneousAgentCtr from '../HeterogeneousAgentCtr';
const FAKE_DESKTOP_PATH = '/Users/fake/Desktop';
const { mockGetAllWindows } = vi.hoisted(() => ({
mockGetAllWindows: vi.fn<() => any[]>(() => []),
}));
vi.mock('electron', () => ({
BrowserWindow: { getAllWindows: () => [] },
BrowserWindow: { getAllWindows: () => mockGetAllWindows() },
app: {
getPath: vi.fn((name: string) => (name === 'desktop' ? FAKE_DESKTOP_PATH : `/fake/${name}`)),
isPackaged: false,
@@ -199,7 +203,7 @@ describe('HeterogeneousAgentCtr', () => {
command: 'claude',
...sessionOverrides,
});
await ctr.sendPrompt({ prompt, sessionId, ...sendPromptOverrides });
await ctr.sendPrompt({ operationId: 'op-test', prompt, sessionId, ...sendPromptOverrides });
const { args: cliArgs, command, options } = spawnCalls[0];
return { cliArgs, command, ctr, options, sessionId, writes };
@@ -314,7 +318,7 @@ describe('HeterogeneousAgentCtr', () => {
command: 'codex',
...sessionOverrides,
});
await ctr.sendPrompt({ prompt, sessionId, ...sendPromptOverrides });
await ctr.sendPrompt({ operationId: 'op-test', prompt, sessionId, ...sendPromptOverrides });
const { args: cliArgs, command, options } = spawnCalls[0];
return { cliArgs, command, ctr, options, sessionId, writes };
@@ -332,9 +336,9 @@ describe('HeterogeneousAgentCtr', () => {
command: 'codex',
});
await expect(ctr.sendPrompt({ prompt: 'hello', sessionId })).rejects.toThrow(
'Codex CLI was not found',
);
await expect(
ctr.sendPrompt({ operationId: 'op-test', prompt: 'hello', sessionId }),
).rejects.toThrow('Codex CLI was not found');
expect(detect).toHaveBeenCalledWith('codex', true);
expect(spawnCalls).toHaveLength(0);
@@ -352,9 +356,9 @@ describe('HeterogeneousAgentCtr', () => {
command: 'claude',
});
await expect(ctr.sendPrompt({ prompt: 'hello', sessionId })).rejects.toThrow(
'Claude Code CLI was not found',
);
await expect(
ctr.sendPrompt({ operationId: 'op-test', prompt: 'hello', sessionId }),
).rejects.toThrow('Claude Code CLI was not found');
expect(detect).toHaveBeenCalledWith('claude', true);
expect(spawnCalls).toHaveLength(0);
@@ -390,9 +394,9 @@ describe('HeterogeneousAgentCtr', () => {
command: 'claude-alt',
});
await expect(ctr.sendPrompt({ prompt: 'hello', sessionId })).rejects.toThrow(
'Claude Code CLI was not found',
);
await expect(
ctr.sendPrompt({ operationId: 'op-test', prompt: 'hello', sessionId }),
).rejects.toThrow('Claude Code CLI was not found');
expect(detect).not.toHaveBeenCalled();
expect(spawnCalls).toHaveLength(0);
@@ -493,6 +497,7 @@ describe('HeterogeneousAgentCtr', () => {
await expect(
ctr.sendPrompt({
imageList,
operationId: 'op-test',
prompt: 'inspect the screenshots',
sessionId,
}),
@@ -526,9 +531,9 @@ describe('HeterogeneousAgentCtr', () => {
command: 'codex',
});
await expect(ctr.sendPrompt({ prompt: 'hello', sessionId })).rejects.toThrow(
'Agent exited with code 1',
);
await expect(
ctr.sendPrompt({ operationId: 'op-test', prompt: 'hello', sessionId }),
).rejects.toThrow('Agent exited with code 1');
});
it('uses codex exec resume syntax when continuing an existing thread', async () => {
@@ -672,4 +677,108 @@ describe('HeterogeneousAgentCtr', () => {
});
});
});
/**
* Node may emit `proc.on('exit')` BEFORE stdout fully drains (documented in
* child_process docs as "stdio streams might still be open"). The phase 0
* refactor moved adapter ownership to main, so renderer no longer flushes
* its own adapter on session-complete — meaning trailing events from
* `pipeline.flush()` (e.g. Codex's synthesized `tool_end` for unfinished
* tool calls) would race against — and lose to — the
* `heteroAgentSessionComplete` broadcast without an explicit gate.
*
* The fix in `proc.on('exit')` is to await stdout `'end'/'close'` (so the
* `stdout.on('end')` handler can schedule `pipeline.flush()` onto the
* broadcast queue), then drain the queue, then broadcast complete.
*/
describe('exit-before-end ordering (LOBE-8516 phase 0 race)', () => {
let broadcasts: Array<{ channel: string; data: any }>;
beforeEach(() => {
spawnCalls.length = 0;
execFileMock.mockReset();
broadcasts = [];
mockGetAllWindows.mockImplementation(() => [
{
isDestroyed: () => false,
webContents: {
send: (channel: string, data: any) => broadcasts.push({ channel, data }),
},
},
]);
});
afterEach(() => {
mockGetAllWindows.mockReset();
mockGetAllWindows.mockReturnValue([]);
});
it('delivers pipeline.flush() events BEFORE heteroAgentSessionComplete even when proc exit precedes stdout end', async () => {
// Codex `item.started` for a tool — adapter buffers it as a pending
// tool call. On flush, adapter synthesizes a trailing `tool_end`. This
// is exactly the kind of event the race would lose against complete.
const itemStarted = `${JSON.stringify({
item: {
aggregated_output: '',
command: 'echo hi',
id: 'cmd-1',
status: 'in_progress',
type: 'command_execution',
},
type: 'item.started',
})}\n`;
const threadStarted = `${JSON.stringify({ thread_id: 't1', type: 'thread.started' })}\n`;
const proc = new EventEmitter() as any;
const stdout = new PassThrough();
const stderr = new PassThrough();
proc.stdout = stdout;
proc.stderr = stderr;
proc.stdin = {
end: vi.fn(),
write: vi.fn((_chunk: any, cb?: () => void) => {
cb?.();
return true;
}),
};
proc.kill = vi.fn();
proc.killed = false;
proc.__start = () => {
setImmediate(() => {
stdout.write(threadStarted);
stdout.write(itemStarted);
stderr.end();
// ⚠️ Reproduce the documented Node race: emit exit BEFORE stdout
// ends. Without the streamFinished gate in the controller, the
// broadcast queue settles immediately (no flush queued yet) and
// complete fires before the trailing tool_end ever broadcasts.
proc.emit('exit', 0);
setImmediate(() => stdout.end());
});
};
nextFakeProc = proc;
const ctr = new HeterogeneousAgentCtr({
appStoragePath,
storeManager: { get: vi.fn() },
} as any);
const { sessionId } = await ctr.startSession({ agentType: 'codex', command: 'codex' });
await ctr.sendPrompt({ operationId: 'op-test', prompt: 'hello', sessionId });
const events = broadcasts.filter((b) => b.channel === 'heteroAgentEvent');
const completeIdx = broadcasts.findIndex((b) => b.channel === 'heteroAgentSessionComplete');
const lastEventIdx = broadcasts.findLastIndex((b) => b.channel === 'heteroAgentEvent');
expect(completeIdx).toBeGreaterThan(-1);
expect(events.length).toBeGreaterThan(0);
// Every stream event must land before complete — no trailing events
// sneak in after the renderer has been told the session is done.
expect(lastEventIdx).toBeLessThan(completeIdx);
// Specifically: the synthesized tool_end for the pending command
// execution (emitted only by adapter.flush()) is in the broadcast.
const toolEnds = events.filter((b) => (b.data as any)?.event?.type === 'tool_end');
expect(toolEnds.length).toBeGreaterThan(0);
});
});
});
@@ -1,4 +1,3 @@
import { JsonlStreamProcessor } from '../jsonlProcessor';
import type { HeterogeneousAgentBuildPlanParams, HeterogeneousAgentDriver } from '../types';
const CLAUDE_CODE_BASE_ARGS = [
@@ -32,10 +31,4 @@ export const claudeCodeDriver: HeterogeneousAgentDriver = {
stdinPayload,
};
},
createStreamProcessor() {
return new JsonlStreamProcessor({
extractSessionId: (payload) =>
payload?.type === 'system' && payload?.subtype === 'init' ? payload?.session_id : undefined,
});
},
};
@@ -1,4 +1,3 @@
import { JsonlStreamProcessor } from '../jsonlProcessor';
import type { HeterogeneousAgentBuildPlanParams, HeterogeneousAgentDriver } from '../types';
const CODEX_REQUIRED_ARGS = ['--json', '--skip-git-repo-check'] as const;
@@ -41,10 +40,4 @@ export const codexDriver: HeterogeneousAgentDriver = {
stdinPayload: prompt,
};
},
createStreamProcessor() {
return new JsonlStreamProcessor({
extractSessionId: (payload) =>
payload?.type === 'thread.started' ? payload?.thread_id : undefined,
});
},
};
@@ -1,61 +0,0 @@
import type { HeterogeneousAgentParsedOutput, HeterogeneousAgentStreamProcessor } from './types';
export interface JsonlProcessorOptions {
extractSessionId?: (payload: any) => string | undefined;
}
/**
* Parses stdout as JSONL / NDJSON while tolerating non-JSON noise lines.
* Different CLIs still end up sharing this framing logic even when the
* payload schema differs.
*/
export class JsonlStreamProcessor implements HeterogeneousAgentStreamProcessor {
private buffer = '';
constructor(private readonly options: JsonlProcessorOptions = {}) {}
push(chunk: Buffer | string): HeterogeneousAgentParsedOutput[] {
this.buffer += chunk instanceof Buffer ? chunk.toString('utf8') : chunk;
return this.drainCompleteLines();
}
flush(): HeterogeneousAgentParsedOutput[] {
const trailing = this.buffer.trim();
this.buffer = '';
if (!trailing) return [];
try {
return [this.toParsedOutput(JSON.parse(trailing))];
} catch {
return [];
}
}
private drainCompleteLines(): HeterogeneousAgentParsedOutput[] {
const lines = this.buffer.split('\n');
this.buffer = lines.pop() || '';
const parsed: HeterogeneousAgentParsedOutput[] = [];
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
parsed.push(this.toParsedOutput(JSON.parse(trimmed)));
} catch {
// Ignore non-JSON stdout noise.
}
}
return parsed;
}
private toParsedOutput(payload: any): HeterogeneousAgentParsedOutput {
return {
agentSessionId: this.options.extractSessionId?.(payload),
payload,
};
}
}
@@ -24,19 +24,13 @@ export interface HeterogeneousAgentBuildPlanParams {
resumeSessionId?: string;
}
export interface HeterogeneousAgentParsedOutput {
agentSessionId?: string;
payload: any;
}
export interface HeterogeneousAgentStreamProcessor {
flush: () => HeterogeneousAgentParsedOutput[];
push: (chunk: Buffer | string) => HeterogeneousAgentParsedOutput[];
}
/**
* Per-agent CLI flag composition + stdin shape. Stream framing is no longer the
* driver's concern — `AgentStreamPipeline` (`@lobechat/heterogeneous-agents/spawn`)
* runs JSONL parsing + adapter conversion uniformly for every agent type.
*/
export interface HeterogeneousAgentDriver {
buildSpawnPlan: (
params: HeterogeneousAgentBuildPlanParams,
) => Promise<HeterogeneousAgentBuildPlan>;
createStreamProcessor: () => HeterogeneousAgentStreamProcessor;
}
@@ -10,6 +10,12 @@ export type AgentStreamEventType =
| 'tool_start'
| 'tool_end'
| 'tool_execute'
/**
* Producer-side tool result content (heterogeneous CLI agents emit this
* separately from `tool_end`; gateway-driven runs do not). Kept in the
* wire union so consumers can pattern-match without casting.
*/
| 'tool_result'
| 'step_start'
| 'step_complete'
| 'error';
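With `tool_result` in the union, a consumer can narrow on `type` without casting. The sketch below uses a trimmed, hypothetical copy of the union (`SketchEventType`) and a hypothetical `summarize` helper; it is not part of the package.

```typescript
// Hypothetical, trimmed stand-in for the wire union shown in the diff.
type SketchEventType = 'tool_start' | 'tool_end' | 'tool_result' | 'error';

interface SketchEvent {
  data: Record<string, unknown>;
  type: SketchEventType;
}

// Exhaustive switch: adding a member to the union without a matching case
// makes this function fail to type-check under strict settings.
const summarize = (event: SketchEvent): string => {
  switch (event.type) {
    case 'tool_result': {
      return `result for ${String(event.data.toolCallId)}`;
    }
    case 'tool_start':
    case 'tool_end': {
      return `lifecycle:${event.type}`;
    }
    case 'error': {
      return 'error';
    }
  }
};
```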
@@ -4,7 +4,8 @@
"private": true,
"exports": {
".": "./src/index.ts",
"./client": "./src/client/index.ts"
"./client": "./src/client/index.ts",
"./spawn": "./src/spawn/index.ts"
},
"main": "./src/index.ts",
"scripts": {
@@ -12,8 +13,9 @@
"test:coverage": "vitest --coverage --silent='passed-only'"
},
"dependencies": {
"@lobechat/builtin-tool-claude-code": "workspace:*",
"@lobehub/icons": "^5.4.0"
"@lobechat/agent-gateway-client": "workspace:*",
"@lobehub/icons": "^5.4.0",
"diff": "^8.0.4"
},
"devDependencies": {
"@lobechat/types": "workspace:*"
@@ -34,12 +34,6 @@
* - `tool_result` blocks are in `type: 'user'` events, not assistant events
*/
import {
ClaudeCodeApiName,
type ClaudeCodeTodoItem,
type TodoWriteArgs,
} from '@lobechat/builtin-tool-claude-code';
import type {
AgentCLIPreset,
AgentEventAdapter,
@@ -53,6 +47,35 @@ import type {
UsageData,
} from '../types';
/**
* The CC tool_use `name` we synthesize `pluginState.todos` for. Inlined here
* (rather than imported from `@lobechat/builtin-tool-claude-code`) to keep
* the adapter package free of UI-tool-package coupling — the canonical
* `ClaudeCodeApiName` enum still lives in `@lobechat/builtin-tool-claude-code`
* for renderer / inspector / streaming consumers, but those packages are
* downstream of the adapter, not upstream.
*
* The string is upstream wire data emitted by `claude` itself, so a change
* would require both sides (adapter + downstream renderers) to update
* regardless of whether they share a constant.
*/
const CC_TODO_WRITE_TOOL_NAME = 'TodoWrite';
/** Status of a single todo item in CC's `TodoWrite` tool_use. */
type ClaudeCodeTodoStatus = 'pending' | 'in_progress' | 'completed';
interface ClaudeCodeTodoItem {
/** Present-continuous form, shown while the item is in progress. */
activeForm: string;
/** Imperative description, shown in pending & completed states. */
content: string;
status: ClaudeCodeTodoStatus;
}
interface TodoWriteArgs {
todos: ClaudeCodeTodoItem[];
}
const CLAUDE_CODE_CLI_INSTALL_DOCS_URL = 'https://docs.anthropic.com/en/docs/claude-code/setup';
const CLI_AUTH_REQUIRED_PATTERNS = [
@@ -405,7 +428,7 @@ export class ClaudeCodeAdapter implements AgentEventAdapter {
// used (`Task`, `Agent`, etc.). Non-spawn tools occupy a tiny
// amount of memory and get pruned naturally when the run ends.
if (block.input) this.mainToolInputsById.set(block.id, block.input);
if (block.name === ClaudeCodeApiName.TodoWrite && block.input) {
if (block.name === CC_TODO_WRITE_TOOL_NAME && block.input) {
this.todoWriteInputs.set(block.id, block.input as TodoWriteArgs);
}
break;
@@ -492,7 +515,7 @@ export class ClaudeCodeAdapter implements AgentEventAdapter {
type: 'default',
});
this.pendingToolCalls.add(block.id);
if (block.name === ClaudeCodeApiName.TodoWrite && block.input) {
if (block.name === CC_TODO_WRITE_TOOL_NAME && block.input) {
this.todoWriteInputs.set(block.id, block.input as TodoWriteArgs);
}
break;
@@ -0,0 +1,84 @@
import { describe, expect, it } from 'vitest';
import { AgentStreamPipeline } from './agentStreamPipeline';
const init = (sessionId = 'cc-1') =>
`${JSON.stringify({
model: 'claude-sonnet-4-6',
session_id: sessionId,
subtype: 'init',
type: 'system',
})}\n`;
const ccText = (msgId: string, text: string) =>
`${JSON.stringify({
message: {
content: [{ text, type: 'text' }],
id: msgId,
model: 'claude-sonnet-4-6',
role: 'assistant',
},
type: 'assistant',
})}\n`;
describe('AgentStreamPipeline', () => {
it('runs JSONL → adapter → toStreamEvent and stamps operationId', async () => {
const pipeline = new AgentStreamPipeline({
agentType: 'claude-code',
operationId: 'op-42',
});
const events = await pipeline.push(init() + ccText('msg_01', 'hello'));
expect(events.length).toBeGreaterThan(0);
for (const event of events) {
expect(event.operationId).toBe('op-42');
}
expect(pipeline.sessionId).toBe('cc-1');
});
it('exposes the adapter session id once the init event is parsed', async () => {
const pipeline = new AgentStreamPipeline({
agentType: 'claude-code',
operationId: 'op-1',
});
expect(pipeline.sessionId).toBeUndefined();
await pipeline.push(init('cc-99'));
expect(pipeline.sessionId).toBe('cc-99');
});
it('auto-wires the Codex file-change tracker for codex agents only', async () => {
// claude-code → no codex tracker, file_change payloads pass through untouched
const claude = new AgentStreamPipeline({ agentType: 'claude-code', operationId: 'op-1' });
expect((claude as any).codexTracker).toBeUndefined();
// codex → tracker is instantiated automatically; consumers stay agent-agnostic
const codex = new AgentStreamPipeline({ agentType: 'codex', operationId: 'op-1' });
expect((codex as any).codexTracker).toBeDefined();
});
it('drops non-JSON noise lines instead of throwing', async () => {
const pipeline = new AgentStreamPipeline({
agentType: 'claude-code',
operationId: 'op-1',
});
const events = await pipeline.push(`not-json-line\n${init()}`);
expect(pipeline.sessionId).toBe('cc-1');
expect(events.length).toBeGreaterThan(0);
});
it('flushes adapter-buffered events on stream end', async () => {
const pipeline = new AgentStreamPipeline({
agentType: 'claude-code',
operationId: 'op-1',
});
await pipeline.push(init());
const flushed = await pipeline.flush();
expect(Array.isArray(flushed)).toBe(true);
});
});
@@ -0,0 +1,76 @@
import type { AgentStreamEvent } from '@lobechat/agent-gateway-client';
import { createAdapter } from '../registry';
import type { AgentEventAdapter } from '../types';
import { CodexFileChangeTracker } from './codexFileChangeTracker';
import { JsonlStreamProcessor } from './jsonlProcessor';
import { toStreamEvent } from './streamEvent';
export interface AgentStreamPipelineOptions {
/** Agent type key (e.g. `claude-code`, `codex`). */
agentType: string;
/** Operation id to stamp onto every emitted `AgentStreamEvent`. */
operationId: string;
}
/**
* Producer-side pipeline that converts CLI stdout chunks into
* `AgentStreamEvent` batches. Composes the building blocks the
* heterogeneous-agent contract requires:
*
* stdout chunk → JsonlStreamProcessor → (codex tracker, if applicable) → adapter → toStreamEvent
*
* Both the desktop main process and the future `lh hetero exec` CLI feed
* stdout into this pipeline so consumers (renderer / server) only ever see a
* single, unified wire shape. Codex's file-change line-stat enrichment is
* baked in here so consumers don't need to know it exists.
*/
export class AgentStreamPipeline {
private readonly processor = new JsonlStreamProcessor();
private readonly adapter: AgentEventAdapter;
private readonly operationId: string;
private readonly codexTracker?: CodexFileChangeTracker;
constructor(options: AgentStreamPipelineOptions) {
this.adapter = createAdapter(options.agentType);
this.operationId = options.operationId;
this.codexTracker = options.agentType === 'codex' ? new CodexFileChangeTracker() : undefined;
}
/** CC/Codex session id extracted by the underlying adapter (`adapter.sessionId`). */
get sessionId(): string | undefined {
return this.adapter.sessionId;
}
/**
* Push a stdout chunk through the pipeline. Resolves with the resulting
* `AgentStreamEvent` batch in arrival order. Async because the codex
* tracker reads pre-edit file snapshots from disk for diff stats.
*/
async push(chunk: Buffer | string): Promise<AgentStreamEvent[]> {
return this.processPayloads(this.processor.push(chunk));
}
/**
* Drain any trailing buffered line + flush adapter-buffered events. Call
* when the upstream stdout stream emits `end`.
*/
async flush(): Promise<AgentStreamEvent[]> {
const trailing = await this.processPayloads(this.processor.flush());
const flushed = this.adapter.flush().map((event) => toStreamEvent(event, this.operationId));
return [...trailing, ...flushed];
}
private async processPayloads(payloads: unknown[]): Promise<AgentStreamEvent[]> {
const out: AgentStreamEvent[] = [];
for (const raw of payloads) {
const payload = this.codexTracker ? await this.codexTracker.track(raw as any) : raw;
for (const event of this.adapter.adapt(payload)) {
out.push(toStreamEvent(event, this.operationId));
}
}
return out;
}
}
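The push/flush contract of the pipeline can be illustrated with a stripped-down stand-in. Everything here is an assumption for illustration: `SketchAdapter`, `SketchPipeline`, and the event shapes are hypothetical, the adapter logic is invented, and JSONL framing and the Codex tracker are left out. The point is only that `push` stamps adapter output with the operation id and `flush` surfaces events the adapter was still holding.

```typescript
// Hypothetical event shapes; the real types live in the packages named in the diff.
interface AdapterEvent {
  data: unknown;
  stepIndex: number;
  timestamp: number;
  type: string;
}
interface StampedEvent extends AdapterEvent {
  operationId: string;
}

// Hypothetical adapter: records the thread id, tracks unfinished tool calls,
// and synthesizes a trailing tool_end for each of them on flush().
class SketchAdapter {
  sessionId: string | undefined;
  private pending = new Set<string>();

  adapt(raw: any): AdapterEvent[] {
    if (raw?.type === 'thread.started') {
      this.sessionId = raw.thread_id;
      return [];
    }
    if (raw?.type === 'item.started' && raw.item?.id) {
      this.pending.add(raw.item.id);
      return [this.event('tool_start', raw.item.id)];
    }
    return [];
  }

  flush(): AdapterEvent[] {
    const trailing = Array.from(this.pending).map((id) => this.event('tool_end', id));
    this.pending.clear();
    return trailing;
  }

  private event(type: string, toolCallId: string): AdapterEvent {
    return { data: { toolCallId }, stepIndex: 0, timestamp: Date.now(), type };
  }
}

class SketchPipeline {
  private readonly adapter = new SketchAdapter();
  private readonly operationId: string;

  constructor(operationId: string) {
    this.operationId = operationId;
  }

  get sessionId(): string | undefined {
    return this.adapter.sessionId;
  }

  // Takes already-parsed payloads; the real pipeline parses stdout chunks first.
  push(payloads: unknown[]): StampedEvent[] {
    return payloads.flatMap((p) => this.adapter.adapt(p)).map((e) => this.stamp(e));
  }

  flush(): StampedEvent[] {
    return this.adapter.flush().map((e) => this.stamp(e));
  }

  private stamp(event: AdapterEvent): StampedEvent {
    return { ...event, operationId: this.operationId };
  }
}

const pipeline = new SketchPipeline('op-42');
const pushed = pipeline.push([
  { thread_id: 't1', type: 'thread.started' },
  { item: { id: 'cmd-1' }, type: 'item.started' },
]);
const flushed = pipeline.flush();
```

Here `pushed` holds the stamped `tool_start` and `flushed` holds the synthesized `tool_end`, which is the kind of trailing event a caller loses if it never calls `flush()` on stream end.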
@@ -23,6 +23,10 @@ interface CodexFileChangePayload {
type?: string;
}
type CodexFileChangePayloadWithId = CodexFileChangePayload & {
item: CodexFileChangeItem & { id: string };
};
interface CodexFileChangeLineStats {
linesAdded: number;
linesDeleted: number;
@@ -36,7 +40,7 @@ interface CodexTrackedFileChangeItem extends CodexFileChangeItem, CodexFileChang
const isCodexFileChangePayload = (
payload: CodexFileChangePayload,
): payload is Required<CodexFileChangePayload> =>
): payload is CodexFileChangePayloadWithId =>
payload?.item?.type === 'file_change' && !!payload.item.id;
const readTextFileSnapshot = async (filePath: string): Promise<CodexFileChangeSnapshot> => {
@@ -0,0 +1,19 @@
/**
* Producer-side helpers for converting external agent CLI output into the
* unified `AgentStreamEvent` wire shape. Imported by:
* - Electron main (`HeterogeneousAgentCtr`) — desktop CC / Codex flow
* - The future `lh hetero exec` CLI — sandbox + terminal flow (LOBE-8516)
*
* Consumers (renderer executor, server `heteroIngest` handler) never need to
* touch adapters — every event reaching them is already an `AgentStreamEvent`.
*
* `AgentStreamEvent` itself is re-exported here so producer-side callers
* (desktop main, CLI sandbox) only depend on this package, not on
* `@lobechat/agent-gateway-client` (which is a browser-side WebSocket client
* that producers have no business pulling in).
*/
export { AgentStreamPipeline, type AgentStreamPipelineOptions } from './agentStreamPipeline';
export { CodexFileChangeTracker } from './codexFileChangeTracker';
export { JsonlStreamProcessor } from './jsonlProcessor';
export { toStreamEvent } from './streamEvent';
export type { AgentStreamEvent, AgentStreamEventType } from '@lobechat/agent-gateway-client';
@@ -0,0 +1,46 @@
/**
* Parses stdout as JSONL / NDJSON while tolerating non-JSON noise lines.
* Different CLIs still end up sharing this framing logic even when the
* payload schema differs.
*/
export class JsonlStreamProcessor {
private buffer = '';
push(chunk: Buffer | string): unknown[] {
this.buffer += chunk instanceof Buffer ? chunk.toString('utf8') : chunk;
return this.drainCompleteLines();
}
flush(): unknown[] {
const trailing = this.buffer.trim();
this.buffer = '';
if (!trailing) return [];
try {
return [JSON.parse(trailing)];
} catch {
return [];
}
}
private drainCompleteLines(): unknown[] {
const lines = this.buffer.split('\n');
this.buffer = lines.pop() || '';
const parsed: unknown[] = [];
for (const line of lines) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
parsed.push(JSON.parse(trimmed));
} catch {
// Ignore non-JSON stdout noise.
}
}
return parsed;
}
}
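A short usage sketch of the framing behavior, assuming the same buffering rules as the class above. `LineFramer` is a hypothetical stand-in so the snippet runs on its own; it accepts strings only. It shows a JSON line split across two chunks, a noise line being dropped, and an incomplete trailing fragment being discarded on flush.

```typescript
// Hypothetical stand-in mirroring the framing logic shown in the diff.
class LineFramer {
  private buffer = '';

  push(chunk: string): unknown[] {
    this.buffer += chunk;
    const lines = this.buffer.split('\n');
    // The last element is an incomplete line ('' when the chunk ended in '\n').
    this.buffer = lines.pop() ?? '';
    const parsed: unknown[] = [];
    for (const line of lines) {
      const trimmed = line.trim();
      if (!trimmed) continue;
      try {
        parsed.push(JSON.parse(trimmed));
      } catch {
        // Non-JSON noise is dropped rather than thrown.
      }
    }
    return parsed;
  }

  flush(): unknown[] {
    const trailing = this.buffer.trim();
    this.buffer = '';
    if (!trailing) return [];
    try {
      return [JSON.parse(trailing)];
    } catch {
      return [];
    }
  }
}

const framer = new LineFramer();
// First chunk ends mid-object, so nothing is emitted yet.
const first = framer.push('{"type":"thread.sta');
// Second chunk completes the object, adds a noise line, and starts a new fragment.
const second = framer.push('rted","thread_id":"t1"}\nwarning: noise\n{"type":"tur');
// The leftover fragment is not valid JSON, so flush discards it.
const third = framer.flush();
```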
@@ -0,0 +1,20 @@
import type { AgentStreamEvent } from '@lobechat/agent-gateway-client';
import type { HeterogeneousAgentEvent } from '../types';
/**
* Stamp an `operationId` onto an adapter-emitted `HeterogeneousAgentEvent` to
* produce an `AgentStreamEvent` — the wire shape the gateway handler and the
* server's `StreamEventManager.publish` both consume. Producer-side conversion
* keeps consumer code (renderer, server handler) free of any adapter awareness.
*/
export const toStreamEvent = (
event: HeterogeneousAgentEvent,
operationId: string,
): AgentStreamEvent => ({
data: event.data,
operationId,
stepIndex: event.stepIndex,
timestamp: event.timestamp,
type: event.type,
});
@@ -32,7 +32,7 @@ const styles = createStaticStyles(({ css, cssVar }) => ({
padding-block: 8px 10px;
padding-inline: 12px;
border: 1px solid ${cssVar.colorBorderSecondary};
border: 1px solid ${cssVar.colorFillSecondary};
border-block-end: none;
border-start-start-radius: 12px;
border-start-end-radius: 12px;
@@ -22,9 +22,10 @@ class HeterogeneousAgentService {
async sendPrompt(
sessionId: string,
prompt: string,
operationId: string,
imageList?: Array<{ id: string; url: string }>,
) {
return this.ipc.heterogeneousAgent.sendPrompt({ imageList, prompt, sessionId });
return this.ipc.heterogeneousAgent.sendPrompt({ imageList, operationId, prompt, sessionId });
}
async cancelSession(sessionId: string) {
@@ -12,6 +12,8 @@
import path from 'node:path';
import { HeterogeneousAgentSessionErrorCode } from '@lobechat/electron-client-ipc';
import type { AgentEventAdapter } from '@lobechat/heterogeneous-agents';
import { createAdapter } from '@lobechat/heterogeneous-agents';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { createGatewayEventHandler } from '../gatewayEventHandler';
@@ -81,14 +83,60 @@ function setupIpcCapture() {
},
};
// After subscribeBroadcasts is called, extract the callbacks
// by intercepting the IPC .on() calls
/**
* Per-IPC-session adapter — mimics the desktop main pipeline:
* raw stdout JSON → adapter.adapt() → AgentStreamEvent → broadcast.
* Test fixtures still feed raw CC/Codex events, so the existing ~2.8k lines
* of stream-shape tests stay intact while the renderer's input boundary
* becomes the new `heteroAgentEvent` channel.
*/
const adapters = new Map<string, AgentEventAdapter>();
/**
* IPC-session → agent type. Defaults to `claude-code` so tests that don't
* explicitly register codex still work; the multi-session resume test (and
* any codex-only suite) registers explicitly via `setAgentType`.
*/
const sessionAgentType = new Map<string, string>();
const getAdapter = (sessionId: string) => {
if (!adapters.has(sessionId)) {
adapters.set(sessionId, createAdapter(sessionAgentType.get(sessionId) ?? 'claude-code'));
}
return adapters.get(sessionId)!;
};
return {
getListeners: () => listeners,
/** Simulate a raw line broadcast from Electron main */
emitRawLine: (sessionId: string, line: any) => {
const handler = listeners.get('heteroAgentRawLine');
handler?.(null, { line, sessionId });
/** Register the agent type for an IPC session before emitting raw events. */
setAgentType: (sessionId: string, type: string) => {
sessionAgentType.set(sessionId, type);
},
/**
* Look up the underlying adapter for an IPC session — used by the
* `getSessionInfo` mock to mirror what main's `AgentStreamPipeline.sessionId`
* returns to the renderer's post-prompt session-id sync.
*/
getAdapterSessionId: (sessionId: string) => adapters.get(sessionId)?.sessionId,
/**
* Simulate the desktop main's per-stdout-line forwarding: feed `raw`
* through the session's adapter, then broadcast each resulting
* `AgentStreamEvent` over the `heteroAgentEvent` channel.
*/
emitRawLine: (sessionId: string, raw: any) => {
const handler = listeners.get('heteroAgentEvent');
const adapter = getAdapter(sessionId);
for (const event of adapter.adapt(raw)) {
handler?.(null, {
event: {
data: event.data,
operationId: defaultParams.operationId,
stepIndex: event.stepIndex,
timestamp: event.timestamp,
type: event.type,
},
sessionId,
});
}
},
/** Simulate session completion */
emitComplete: (sessionId: string) => {
@@ -322,10 +370,25 @@ describe('heterogeneousAgentExecutor DB persistence', () => {
beforeEach(() => {
vi.clearAllMocks();
ipc = setupIpcCapture();
mockStartSession.mockResolvedValue({ sessionId: 'ipc-sess-1' });
// Register the IPC session's agent type from the params the executor
// hands to startSession, so the helper picks the right adapter when the
// test starts emitting raw events.
mockStartSession.mockImplementation(async (params: any) => {
ipc.setAgentType('ipc-sess-1', params.agentType ?? 'claude-code');
return { sessionId: 'ipc-sess-1' };
});
mockSendPrompt.mockResolvedValue(undefined);
mockStopSession.mockResolvedValue(undefined);
mockGetSessionInfo.mockResolvedValue({ agentSessionId: 'cc-sess-1' });
// Mirror the desktop main: `getSessionInfo` returns whatever the producer
// pipeline's adapter has extracted from the JSONL stream so far. Tests
// that never emit an init / thread.started event get `agentSessionId:
// undefined`, matching pre-Phase-0 behavior where the renderer-side
// adapter never observed one either. The renderer service hands the raw
// sessionId string straight to the mock — it's the underlying IPC handler
// that wraps `{ sessionId }`, and that's stubbed here.
mockGetSessionInfo.mockImplementation(async (sessionId: string) => ({
agentSessionId: ipc.getAdapterSessionId(sessionId),
}));
mockGetMessages.mockResolvedValue([]);
mockCreateMessage.mockImplementation(async (params: any) => ({
id: `created-${params.role}-${Date.now()}-${Math.random().toString(36).slice(2, 6)}`,
@@ -1111,7 +1174,7 @@ describe('heterogeneousAgentExecutor DB persistence', () => {
imageList,
});
expect(mockSendPrompt).toHaveBeenCalledWith('ipc-sess-1', 'test prompt', imageList);
expect(mockSendPrompt).toHaveBeenCalledWith('ipc-sess-1', 'test prompt', 'op-1', imageList);
});
it('should clear stale resume metadata and retry once without resume for recoverable Codex errors', async () => {
@@ -1122,9 +1185,17 @@ describe('heterogeneousAgentExecutor DB persistence', () => {
{ reject: (reason?: unknown) => void; resolve: () => void }
>();
mockStartSession
.mockResolvedValueOnce({ sessionId: 'ipc-sess-1' })
.mockResolvedValueOnce({ sessionId: 'ipc-sess-2' });
// Both spawned IPC sessions are codex; register so the helper's adapter
// pipeline yields the right shape when the test emits raw codex events.
ipc.setAgentType('ipc-sess-1', 'codex');
ipc.setAgentType('ipc-sess-2', 'codex');
let startCount = 0;
mockStartSession.mockImplementation(async (params: any) => {
startCount += 1;
const sid = startCount === 1 ? 'ipc-sess-1' : 'ipc-sess-2';
ipc.setAgentType(sid, params.agentType ?? 'claude-code');
return { sessionId: sid };
});
mockSendPrompt.mockImplementation(
(sessionId: string) =>
new Promise<void>((resolve, reject) => {
@@ -6,12 +6,7 @@ import {
type HeterogeneousAgentSessionError,
HeterogeneousAgentSessionErrorCode,
} from '@lobechat/electron-client-ipc';
import type {
HeterogeneousAgentEvent,
SubagentEventContext,
ToolCallPayload,
} from '@lobechat/heterogeneous-agents';
import { createAdapter } from '@lobechat/heterogeneous-agents';
import type { SubagentEventContext, ToolCallPayload } from '@lobechat/heterogeneous-agents';
import type {
ChatMessageError,
ChatToolPayload,
@@ -218,34 +213,26 @@ const resolveAdapterType = (config: HeterogeneousProviderConfig): string => {
};
/**
* Convert HeterogeneousAgentEvent to AgentStreamEvent (add operationId).
*/
const toStreamEvent = (event: HeterogeneousAgentEvent, operationId: string): AgentStreamEvent => ({
data: event.data,
operationId,
stepIndex: event.stepIndex,
timestamp: event.timestamp,
type: event.type as AgentStreamEvent['type'],
});
/**
* Subscribe to Electron IPC broadcasts for raw agent lines.
* Returns unsubscribe function.
* Subscribe to Electron IPC broadcasts. As of LOBE-8516 phase 0, the main
* process runs JSONL framing + adapter conversion + `toStreamEvent` itself
* (`AgentStreamPipeline` from `@lobechat/heterogeneous-agents/spawn`), so the
* renderer receives ready-made `AgentStreamEvent`s with no per-event adapter
* step. Returns unsubscribe function.
*/
const subscribeBroadcasts = (
sessionId: string,
callbacks: {
onComplete: () => void;
onError: (error: HeterogeneousAgentSessionError | string) => void;
onRawLine: (line: any) => void;
onStreamEvent: (event: AgentStreamEvent) => void;
},
): (() => void) => {
if (!window.electron?.ipcRenderer) return () => {};
const ipc = window.electron.ipcRenderer;
const onLine = (_e: any, data: { line: any; sessionId: string }) => {
if (data.sessionId === sessionId) callbacks.onRawLine(data.line);
const onStreamEvent = (_e: any, data: { event: AgentStreamEvent; sessionId: string }) => {
if (data.sessionId === sessionId) callbacks.onStreamEvent(data.event);
};
const onComplete = (_e: any, data: { sessionId: string }) => {
if (data.sessionId === sessionId) callbacks.onComplete();
@@ -257,12 +244,12 @@ const subscribeBroadcasts = (
if (data.sessionId === sessionId) callbacks.onError(data.error);
};
ipc.on('heteroAgentRawLine' as any, onLine);
ipc.on('heteroAgentEvent' as any, onStreamEvent);
ipc.on('heteroAgentSessionComplete' as any, onComplete);
ipc.on('heteroAgentSessionError' as any, onError);
return () => {
ipc.removeListener('heteroAgentRawLine' as any, onLine);
ipc.removeListener('heteroAgentEvent' as any, onStreamEvent);
ipc.removeListener('heteroAgentSessionComplete' as any, onComplete);
ipc.removeListener('heteroAgentSessionError' as any, onError);
};
@@ -1029,9 +1016,10 @@ const persistToolResult = async (
* Execute a prompt via an external agent CLI.
*
* Flow:
* 1. Subscribe to IPC broadcasts
* 1. Subscribe to IPC broadcasts (`heteroAgentEvent` carrying `AgentStreamEvent`)
* 2. Spawn agent process via heterogeneousAgentService
* 3. Raw stdout lines → Adapter → HeterogeneousAgentEvent → AgentStreamEvent
* 3. Main runs JSONL framing + adapter + toStreamEvent (`AgentStreamPipeline`)
* so events arrive renderer-side already in the unified wire shape.
* 4. Feed AgentStreamEvents into createGatewayEventHandler (unified handler)
* 5. Tool messages created via messageService before emitting tool events
*/
@@ -1050,9 +1038,7 @@ export const executeHeterogeneousAgent = async (
workingDirectory,
} = params;
// Create adapter for this agent type
const adapterType = resolveAdapterType(heterogeneousProvider);
const adapter = createAdapter(adapterType);
// Create the unified event handler (same one Gateway uses)
const eventHandler = createGatewayEventHandler(get, {
@@ -1156,7 +1142,7 @@ export const executeHeterogeneousAgent = async (
* fetchAndReplaceMessages which would clobber our in-flight content
* writes with stale DB state. onComplete forwards after persistence.
*/
let deferredTerminalEvent: HeterogeneousAgentEvent | null = null;
let deferredTerminalEvent: AgentStreamEvent | null = null;
/**
* True while a step transition is in flight (stream_start queued but not yet
* forwarded to handler). Events that would normally be forwarded sync must
@@ -1314,356 +1300,344 @@ export const executeHeterogeneousAgent = async (
});
// ─── Debug tracing (dev only) ───
const trace: Array<{ adaptedEvents: any[]; rawLine: any; timestamp: number }> = [];
const trace: Array<{ event: AgentStreamEvent; timestamp: number }> = [];
if (typeof window !== 'undefined') {
(window as any).__HETERO_AGENT_TRACE = trace;
}
// Subscribe to broadcasts BEFORE sending prompt
unsubscribe = subscribeBroadcasts(agentSessionId, {
onRawLine: (line) => {
// Once the user cancels, drop any trailing events the CLI emits before
// exit so they don't leak into DB writes.
if (isAborted()) return;
const events = adapter.adapt(line);
/**
* Process a single `AgentStreamEvent` from main. As of LOBE-8516 phase 0,
* main runs the adapter and `toStreamEvent` itself, so each IPC arrival
* carries exactly one already-stamped `AgentStreamEvent` (no per-line
* batch). Per-event branches still mirror the pre-phase-0 inner loop.
*/
const handleStreamEvent = (event: AgentStreamEvent) => {
// Once the user cancels, drop any trailing events the CLI emits before
// exit so they don't leak into DB writes.
if (isAborted()) return;
// Record for debugging
trace.push({
adaptedEvents: events.map((e) => ({ data: e.data, type: e.type })),
rawLine: line,
timestamp: Date.now(),
// Record for debugging
trace.push({ event, timestamp: Date.now() });
// ─── tool_result: update tool message content in DB (ACP-only) ───
if (event.type === 'tool_result') {
const { content, isError, pluginState, toolCallId } = event.data as {
content: string;
isError?: boolean;
pluginState?: Record<string, any>;
subagent?: SubagentEventContext;
toolCallId: string;
};
// Subagent vs main lookup is transparent — one global
// `toolMsgIdByCallId` map spans both scopes.
persistQueue = persistQueue.then(() =>
persistToolResult(
toolCallId,
content,
!!isError,
toolMsgIdByCallId,
context,
pluginState,
),
);
// Mirror the tool_result content into the owning subagent
// run's thread bucket so the in-thread tool bubble stops
// showing "loading" and renders the result the moment it
// arrives (main-topic fetchAndReplace does not refresh
// thread buckets, so without this the subagent UI would
// stay stuck on the spinner until the user re-opens the
// Thread). Lookup is deferred into the queue because the
// prior `persistSubagentToolChunk` that adds this toolCallId
// to the run's `persistedIds` is still pending when the
// tool_result event arrives.
persistQueue = persistQueue.then(() => {
const run = findRunByInnerToolCallId(toolCallId);
if (!run) return;
const toolMsgId = toolMsgIdByCallId.get(toolCallId);
if (!toolMsgId) return;
const update: Partial<UIChatMessage> = { content };
if (pluginState) (update as any).pluginState = pluginState;
if (isError) (update as any).pluginError = { message: content };
run.stream.update(toolMsgId, update);
});
// If this tool_result IS for a subagent's spawning tool_use
// (tool_result lands on the MAIN side but its toolCallId
// matches a subagent run's parent), the subagent run just
// ended — finalize so the terminal assistant with the
// authoritative result lands in DB before fetchAndReplace.
//
// The `subagentRuns.has` check is deferred INTO the queue so
// that any subagent tool_use/text chunks from earlier in the
// same stream batch — which register the run via
// `persistSubagent*Chunk` — have already drained. Checking
// synchronously here races with those writes and silently
// misses the run in pure-tools subagents (no preceding text
// event to force an earlier registration).
persistQueue = persistQueue.then(() => {
if (!subagentRuns.has(toolCallId)) return;
return finalizeSubagentRun({
completeSubOp: completeSubagentOp,
context,
parentToolCallId: toolCallId,
resultContent: content,
subagentRuns,
});
});
// Don't forward — the tool_end that follows triggers fetchAndReplaceMessages
// which reads the updated content from DB.
return;
}
// ─── step_complete with turn_metadata: persist per-step usage ───
// `turn_metadata.usage` is the per-turn delta (deduped by adapter per
// message.id) and already normalized to the MessageMetadata.usage
// shape — write it straight through to the current step's assistant
// message. Queue the write so it lands after any in-flight
// stream_start(newStep) that may still be swapping
// `currentAssistantMessageId` to the new step's message.
//
// `result_usage` (grand total across all turns) is intentionally
// ignored — applying it would overwrite the last step with the sum
// of all prior steps. Sum of turn_metadata equals result_usage for
// a healthy run.
if (event.type === 'step_complete' && event.data?.phase === 'turn_metadata') {
if (event.data.model) lastModel = event.data.model;
if (event.data.provider) lastProvider = event.data.provider;
const turnUsage = event.data.usage;
if (turnUsage) {
persistQueue = persistQueue.then(async () => {
await messageService
.updateMessage(
currentAssistantMessageId,
{ metadata: { usage: turnUsage } },
{ agentId: context.agentId, topicId: context.topicId },
)
.catch(console.error);
});
}
// Don't forward turn metadata — it's internal bookkeeping
return;
}
// ─── stream_start with newStep: new LLM turn, create new assistant message ───
if (event.type === 'stream_start' && event.data?.newStep) {
// ⚠️ Snapshot CONTENT accumulators synchronously — stream_chunk events for
// the new step arrive in the same stream batch and would contaminate.
// Tool state (toolMsgIdByCallId) is populated ASYNC by persistQueue, so
// it must be read inside the queue where previous persists have completed.
const prevContent = accumulatedContent;
const prevReasoning = accumulatedReasoning;
const prevModel = lastModel;
const prevProvider = lastProvider;
// Reset content accumulators synchronously so new-step chunks go to fresh state
accumulatedContent = '';
accumulatedReasoning = '';
// Mark that we're in a step transition. Events from the same stream
// batch (stream_chunk, tool_start, etc.) must be deferred through
// persistQueue so the handler receives stream_start FIRST — otherwise
// it dispatches tools to the OLD assistant (orphan tool bug).
pendingStepTransition = true;
persistQueue = persistQueue.then(async () => {
// Persist previous step's content to its assistant message
const prevUpdate: Record<string, any> = {};
if (prevContent) prevUpdate.content = prevContent;
if (prevReasoning) prevUpdate.reasoning = { content: prevReasoning };
if (prevModel) prevUpdate.model = prevModel;
if (prevProvider) prevUpdate.provider = prevProvider;
if (Object.keys(prevUpdate).length > 0) {
await messageService
.updateMessage(currentAssistantMessageId, prevUpdate, {
agentId: context.agentId,
topicId: context.topicId,
})
.catch(console.error);
}
// Create new assistant message for this step.
// parentId should point to the last tool message from the previous step
// (if any), forming the chain: assistant → tool → assistant → tool → ...
// If no tool was used, fall back to the previous assistant message.
//
// Read from `toolState.payloads` (not the global
// `toolMsgIdByCallId`) so we only pick up MAIN-agent tools —
// the global map also holds subagent tool msg ids which
// would break the main-agent step chain.
const lastToolMsgId = [...toolState.payloads]
.reverse()
.find((p) => !!p.result_msg_id)?.result_msg_id;
const stepParentId = lastToolMsgId || currentAssistantMessageId;
const newMsg = await messageService.createMessage({
agentId: context.agentId,
content: '',
model: lastModel,
parentId: stepParentId,
provider: lastProvider,
role: 'assistant',
topicId: context.topicId ?? undefined,
});
currentAssistantMessageId = newMsg.id;
// Associate the new message with the operation
get().associateMessageWithOperation(currentAssistantMessageId, operationId);
// Reset tool state AFTER reading — new-step tool persists are queued
// AFTER this handler, so they'll write to the clean state.
toolState.payloads = [];
toolState.persistedIds.clear();
// toolMsgIdByCallId is NOT cleared — it's the global
// id→row lookup and subagent tool_results from a previous
// step may still land after the step boundary.
});
for (const event of events) {
// ─── tool_result: update tool message content in DB (ACP-only) ───
if (event.type === 'tool_result') {
const { content, isError, pluginState, toolCallId } = event.data as {
content: string;
isError?: boolean;
pluginState?: Record<string, any>;
subagent?: SubagentEventContext;
toolCallId: string;
};
// Subagent vs main lookup is transparent — one global
// `toolMsgIdByCallId` map spans both scopes.
// Update the stream_start event to carry the new message ID
// so the gateway handler can switch to it
persistQueue = persistQueue.then(() => {
event.data.assistantMessage = { id: currentAssistantMessageId };
eventHandler(event);
// Step transition complete — handler has the new assistant ID now
pendingStepTransition = false;
});
return;
}
// ─── Defer terminal events so content writes complete first ───
// Gateway handler's agent_runtime_end/error triggers fetchAndReplaceMessages,
// which would read stale DB state (before we persist final content + usage).
if (event.type === 'agent_runtime_end' || event.type === 'error') {
deferredTerminalEvent = event;
return;
}
// ─── stream_chunk: accumulate content + persist tool_use ───
if (event.type === 'stream_chunk') {
const chunk = event.data;
const chunkSubagentCtx = chunk?.subagent as SubagentEventContext | undefined;
if (chunk?.chunkType === 'text' && chunk.content) {
if (chunkSubagentCtx) {
// Subagent text → accumulates on the run's in-thread
// assistant, NOT on the main assistant's content.
const mainAsstId = currentAssistantMessageId;
persistQueue = persistQueue.then(() =>
persistToolResult(
toolCallId,
content,
!!isError,
toolMsgIdByCallId,
persistSubagentTextChunk(
'text',
chunk.content,
chunkSubagentCtx,
mainAsstId,
context,
pluginState,
subagentRuns,
beginSubagentRun,
onSubagentThreadCreated,
),
);
// Mirror the tool_result content into the owning subagent
// run's thread bucket so the in-thread tool bubble stops
// showing "loading" and renders the result the moment it
// arrives (main-topic fetchAndReplace does not refresh
// thread buckets, so without this the subagent UI would
// stay stuck on the spinner until the user re-opens the
// Thread). Lookup is deferred into the queue because the
// prior `persistSubagentToolChunk` that adds this toolCallId
// to the run's `persistedIds` is still pending when the
// tool_result event arrives.
persistQueue = persistQueue.then(() => {
const run = findRunByInnerToolCallId(toolCallId);
if (!run) return;
const toolMsgId = toolMsgIdByCallId.get(toolCallId);
if (!toolMsgId) return;
const update: Partial<UIChatMessage> = { content };
if (pluginState) (update as any).pluginState = pluginState;
if (isError) (update as any).pluginError = { message: content };
run.stream.update(toolMsgId, update);
});
// If this tool_result IS for a subagent's spawning tool_use
// (tool_result lands on the MAIN side but its toolCallId
// matches a subagent run's parent), the subagent run just
// ended — finalize so the terminal assistant with the
// authoritative result lands in DB before fetchAndReplace.
//
// The `subagentRuns.has` check is deferred INTO the queue so
// that any subagent tool_use/text chunks from earlier in the
// same onRawLine batch — which register the run via
// `persistSubagent*Chunk` — have already drained. Checking
// synchronously here races with those writes and silently
// misses the run in pure-tools subagents (no preceding text
// event to force an earlier registration).
persistQueue = persistQueue.then(() => {
if (!subagentRuns.has(toolCallId)) return;
return finalizeSubagentRun({
completeSubOp: completeSubagentOp,
context,
parentToolCallId: toolCallId,
resultContent: content,
subagentRuns,
});
});
// Don't forward — the tool_end that follows triggers fetchAndReplaceMessages
// which reads the updated content from DB.
continue;
}
// ─── step_complete with turn_metadata: persist per-step usage ───
// `turn_metadata.usage` is the per-turn delta (deduped by adapter per
// message.id) and already normalized to the MessageMetadata.usage
// shape — write it straight through to the current step's assistant
// message. Queue the write so it lands after any in-flight
// stream_start(newStep) that may still be swapping
// `currentAssistantMessageId` to the new step's message.
//
// `result_usage` (grand total across all turns) is intentionally
// ignored — applying it would overwrite the last step with the sum
// of all prior steps. Sum of turn_metadata equals result_usage for
// a healthy run.
if (event.type === 'step_complete' && event.data?.phase === 'turn_metadata') {
if (event.data.model) lastModel = event.data.model;
if (event.data.provider) lastProvider = event.data.provider;
const turnUsage = event.data.usage;
if (turnUsage) {
persistQueue = persistQueue.then(async () => {
await messageService
.updateMessage(
currentAssistantMessageId,
{ metadata: { usage: turnUsage } },
{ agentId: context.agentId, topicId: context.topicId },
)
.catch(console.error);
});
}
// Don't forward turn metadata — it's internal bookkeeping
continue;
}
// ─── stream_start with newStep: new LLM turn, create new assistant message ───
if (event.type === 'stream_start' && event.data?.newStep) {
// ⚠️ Snapshot CONTENT accumulators synchronously — stream_chunk events for
// the new step arrive in the same onRawLine batch and would contaminate.
// Tool state (toolMsgIdByCallId) is populated ASYNC by persistQueue, so
// it must be read inside the queue where previous persists have completed.
const prevContent = accumulatedContent;
const prevReasoning = accumulatedReasoning;
const prevModel = lastModel;
const prevProvider = lastProvider;
// Reset content accumulators synchronously so new-step chunks go to fresh state
accumulatedContent = '';
accumulatedReasoning = '';
// Mark that we're in a step transition. Events from the same onRawLine
// batch (stream_chunk, tool_start, etc.) must be deferred through
// persistQueue so the handler receives stream_start FIRST — otherwise
// it dispatches tools to the OLD assistant (orphan tool bug).
pendingStepTransition = true;
persistQueue = persistQueue.then(async () => {
// Persist previous step's content to its assistant message
const prevUpdate: Record<string, any> = {};
if (prevContent) prevUpdate.content = prevContent;
if (prevReasoning) prevUpdate.reasoning = { content: prevReasoning };
if (prevModel) prevUpdate.model = prevModel;
if (prevProvider) prevUpdate.provider = prevProvider;
if (Object.keys(prevUpdate).length > 0) {
await messageService
.updateMessage(currentAssistantMessageId, prevUpdate, {
agentId: context.agentId,
topicId: context.topicId,
})
.catch(console.error);
}
// Create new assistant message for this step.
// parentId should point to the last tool message from the previous step
// (if any), forming the chain: assistant → tool → assistant → tool → ...
// If no tool was used, fall back to the previous assistant message.
//
// Read from `toolState.payloads` (not the global
// `toolMsgIdByCallId`) so we only pick up MAIN-agent tools —
// the global map also holds subagent tool msg ids which
// would break the main-agent step chain.
const lastToolMsgId = [...toolState.payloads]
.reverse()
.find((p) => !!p.result_msg_id)?.result_msg_id;
const stepParentId = lastToolMsgId || currentAssistantMessageId;
const newMsg = await messageService.createMessage({
agentId: context.agentId,
content: '',
model: lastModel,
parentId: stepParentId,
provider: lastProvider,
role: 'assistant',
topicId: context.topicId ?? undefined,
});
currentAssistantMessageId = newMsg.id;
// Associate the new message with the operation
get().associateMessageWithOperation(currentAssistantMessageId, operationId);
// Reset tool state AFTER reading — new-step tool persists are queued
// AFTER this handler, so they'll write to the clean state.
toolState.payloads = [];
toolState.persistedIds.clear();
// toolMsgIdByCallId is NOT cleared — it's the global
// id→row lookup and subagent tool_results from a previous
// step may still land after the step boundary.
});
// Update the stream_start event to carry the new message ID
// so the gateway handler can switch to it
persistQueue = persistQueue.then(() => {
event.data.assistantMessage = { id: currentAssistantMessageId };
eventHandler(toStreamEvent(event, operationId));
// Step transition complete — handler has the new assistant ID now
pendingStepTransition = false;
});
continue;
}
// ─── Defer terminal events so content writes complete first ───
// Gateway handler's agent_runtime_end/error triggers fetchAndReplaceMessages,
// which would read stale DB state (before we persist final content + usage).
if (event.type === 'agent_runtime_end' || event.type === 'error') {
deferredTerminalEvent = event;
continue;
}
// ─── stream_chunk: accumulate content + persist tool_use ───
if (event.type === 'stream_chunk') {
const chunk = event.data;
const chunkSubagentCtx = chunk?.subagent as SubagentEventContext | undefined;
if (chunk?.chunkType === 'text' && chunk.content) {
if (chunkSubagentCtx) {
// Subagent text → accumulates on the run's in-thread
// assistant, NOT on the main assistant's content.
const mainAsstId = currentAssistantMessageId;
persistQueue = persistQueue.then(() =>
persistSubagentTextChunk(
'text',
chunk.content,
chunkSubagentCtx,
mainAsstId,
context,
subagentRuns,
beginSubagentRun,
onSubagentThreadCreated,
),
);
} else {
accumulatedContent += chunk.content;
}
}
if (chunk?.chunkType === 'reasoning' && chunk.reasoning) {
if (chunkSubagentCtx) {
const mainAsstId = currentAssistantMessageId;
persistQueue = persistQueue.then(() =>
persistSubagentTextChunk(
'reasoning',
chunk.reasoning,
chunkSubagentCtx,
mainAsstId,
context,
subagentRuns,
beginSubagentRun,
onSubagentThreadCreated,
),
);
} else {
accumulatedReasoning += chunk.reasoning;
}
}
if (chunk?.chunkType === 'tools_calling') {
const tools = chunk.toolsCalling as ToolCallPayload[];
const subagentCtx = chunk.subagent as SubagentEventContext | undefined;
if (tools?.length) {
if (subagentCtx) {
// Subagent chunk → lazy-create Thread + in-thread
// assistant, then persist into that scope. Kept off the
// main path so main-agent snapshot logic stays untouched.
const mainAsstId = currentAssistantMessageId;
persistQueue = persistQueue.then(() =>
persistSubagentToolChunk(
tools,
subagentCtx,
mainAsstId,
context,
subagentRuns,
toolMsgIdByCallId,
beginSubagentRun,
onSubagentThreadCreated,
),
);
} else {
// Main-agent chunk — existing path.
// Snapshot accumulators sync — must travel with the
// same step's assistantMessageId. A late-bound getter
// would read NEW step's content if a step transition
// lands between scheduling and execution, while
// assistantMessageId would still be the OLD one (also
// captured sync) → cross-step contamination.
const snapshot = {
content: accumulatedContent,
reasoning: accumulatedReasoning,
};
persistQueue = persistQueue.then(() =>
persistToolBatch(
tools,
toolState,
currentAssistantMessageId,
context,
snapshot,
toolMsgIdByCallId,
),
);
}
}
}
}
// Subagent-tagged stream_chunks are persisted above via
// persistSubagent*Chunk into the in-thread assistant. The gateway
// handler is main-agent-only: forwarding would dispatch
// `updateMessage { tools }` onto `currentAssistantMessageId` (main),
// overwriting main.tools[] with subagent tools — main's own
// tool_use messages then lose their tools[] pairing and render
// as orphans until the next fetchAndReplaceMessages. Text /
// reasoning chunks similarly bleed subagent content into the
// main bubble. DB state is already correct (the subagent persist
// path writes to the thread scope), so dropping the forward
// keeps in-memory state aligned with DB.
if (event.type === 'stream_chunk' && (event.data as any)?.subagent) {
continue;
}
// Forward to the unified Gateway handler.
// If a step transition is pending, defer through persistQueue so the
// handler receives stream_start (with new assistant ID) FIRST.
if (pendingStepTransition) {
const snapshot = toStreamEvent(event, operationId);
persistQueue = persistQueue.then(() => {
eventHandler(snapshot);
});
} else {
eventHandler(toStreamEvent(event, operationId));
accumulatedContent += chunk.content;
}
}
},
if (chunk?.chunkType === 'reasoning' && chunk.reasoning) {
if (chunkSubagentCtx) {
const mainAsstId = currentAssistantMessageId;
persistQueue = persistQueue.then(() =>
persistSubagentTextChunk(
'reasoning',
chunk.reasoning,
chunkSubagentCtx,
mainAsstId,
context,
subagentRuns,
beginSubagentRun,
onSubagentThreadCreated,
),
);
} else {
accumulatedReasoning += chunk.reasoning;
}
}
if (chunk?.chunkType === 'tools_calling') {
const tools = chunk.toolsCalling as ToolCallPayload[];
const subagentCtx = chunk.subagent as SubagentEventContext | undefined;
if (tools?.length) {
if (subagentCtx) {
// Subagent chunk → lazy-create Thread + in-thread
// assistant, then persist into that scope. Kept off the
// main path so main-agent snapshot logic stays untouched.
const mainAsstId = currentAssistantMessageId;
persistQueue = persistQueue.then(() =>
persistSubagentToolChunk(
tools,
subagentCtx,
mainAsstId,
context,
subagentRuns,
toolMsgIdByCallId,
beginSubagentRun,
onSubagentThreadCreated,
),
);
} else {
// Main-agent chunk — existing path.
// Snapshot accumulators sync — must travel with the
// same step's assistantMessageId. A late-bound getter
// would read NEW step's content if a step transition
// lands between scheduling and execution, while
// assistantMessageId would still be the OLD one (also
// captured sync) → cross-step contamination.
const snapshot = {
content: accumulatedContent,
reasoning: accumulatedReasoning,
};
persistQueue = persistQueue.then(() =>
persistToolBatch(
tools,
toolState,
currentAssistantMessageId,
context,
snapshot,
toolMsgIdByCallId,
),
);
}
}
}
}
// Subagent-tagged stream_chunks are persisted above via
// persistSubagent*Chunk into the in-thread assistant. The gateway
// handler is main-agent-only: forwarding would dispatch
// `updateMessage { tools }` onto `currentAssistantMessageId` (main),
// overwriting main.tools[] with subagent tools — main's own
// tool_use messages then lose their tools[] pairing and render
// as orphans until the next fetchAndReplaceMessages. Text /
// reasoning chunks similarly bleed subagent content into the
// main bubble. DB state is already correct (the subagent persist
// path writes to the thread scope), so dropping the forward
// keeps in-memory state aligned with DB.
if (event.type === 'stream_chunk' && (event.data as any)?.subagent) {
return;
}
// Forward to the unified Gateway handler.
// If a step transition is pending, defer through persistQueue so the
// handler receives stream_start (with new assistant ID) FIRST.
if (pendingStepTransition) {
persistQueue = persistQueue.then(() => {
eventHandler(event);
});
} else {
eventHandler(event);
}
};
unsubscribe = subscribeBroadcasts(agentSessionId, {
onStreamEvent: handleStreamEvent,
onComplete: async () => {
if (completed) return;
completed = true;
// Flush remaining adapter state (e.g., still-open tool_end events — but
// NOT agent_runtime_end; that's deferred below)
const flushEvents = adapter.flush();
for (const event of flushEvents) {
if (event.type === 'agent_runtime_end' || event.type === 'error') {
deferredTerminalEvent = event;
continue;
}
eventHandler(toStreamEvent(event, operationId));
}
// Wait for all tool persistence to finish before writing final state
await persistQueue.catch(console.error);
@@ -1714,13 +1688,14 @@ export const executeHeterogeneousAgent = async (
} else {
// NOW forward the deferred terminal event — handler will fetchAndReplaceMessages
// and pick up the final persisted state.
const terminal = deferredTerminalEvent ?? {
const terminal: AgentStreamEvent = deferredTerminalEvent ?? {
data: {},
operationId,
stepIndex: 0,
timestamp: Date.now(),
type: 'agent_runtime_end' as const,
type: 'agent_runtime_end',
};
eventHandler(toStreamEvent(terminal, operationId));
eventHandler(terminal);
}
// Signal completion to the user — dock badge + (window-hidden) notification.
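The completion path above holds the terminal event until queued writes have settled, then forwards either the held event or a synthesized `agent_runtime_end`. A minimal sketch of that gate, under the assumption that writes are chained on one promise; `TerminalEvent` and `createTerminalGate` are hypothetical names, and the event shape is trimmed for illustration.

```typescript
// Hypothetical, trimmed event shape for illustration only.
type TerminalEvent = {
  operationId: string;
  timestamp: number;
  type: 'agent_runtime_end' | 'error';
};

function createTerminalGate(operationId: string, forward: (event: TerminalEvent) => void) {
  let deferred: TerminalEvent | null = null;
  let writes: Promise<void> = Promise.resolve();

  return {
    // Wait for the queued writes (ignoring a failed one), then forward the
    // held event or a synthesized agent_runtime_end.
    async complete(): Promise<void> {
      await writes.catch(() => undefined);
      forward(deferred ?? { operationId, timestamp: Date.now(), type: 'agent_runtime_end' });
    },
    // Hold the terminal event instead of forwarding it right away.
    defer(event: TerminalEvent): void {
      deferred = event;
    },
    // Chain a write behind the previously queued ones.
    enqueueWrite(task: () => Promise<void>): void {
      writes = writes.then(task);
    },
  };
}
```

Forwarding only after the writes settle means whatever the handler reads back on the terminal event reflects the persisted state rather than a stale snapshot.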
@@ -1778,7 +1753,7 @@ export const executeHeterogeneousAgent = async (
});
// Send the prompt — blocks until process exits
await heterogeneousAgentService.sendPrompt(agentSessionId, message, imageList);
await heterogeneousAgentService.sendPrompt(agentSessionId, message, operationId, imageList);
// Persist heterogeneous-agent session id + the cwd it was created under,
// for multi-turn resume. CC stores sessions per-cwd
@@ -1786,9 +1761,17 @@ export const executeHeterogeneousAgent = async (
// cwd hasn't changed before `--resume`. Reuses `workingDirectory` as the
// topic-level binding — pinning the topic to this cwd once the agent has
// executed here.
if (adapter.sessionId && context.topicId) {
//
// Source of truth shifted from renderer's adapter to main's pipeline as of
// LOBE-8516 phase 0; pull it back through the existing `getSessionInfo`
// IPC, which already returns the freshest `agentSessionId` main has
// mirrored from `pipeline.sessionId`.
const sessionInfo = await heterogeneousAgentService
.getSessionInfo(agentSessionId)
.catch(() => undefined);
if (sessionInfo?.agentSessionId && context.topicId) {
await updateTopicMetadata?.(context.topicId, {
heteroSessionId: adapter.sessionId,
heteroSessionId: sessionInfo.agentSessionId,
workingDirectory: workingDirectory ?? '',
});
}
@@ -1804,7 +1787,7 @@ export const executeHeterogeneousAgent = async (
// for the user to manage via QueueTray; "send now" = stop + send.
// Cast: TS narrows the closure-mutated `deferredTerminalEvent` back to
// `null` in linear flow (it can't see writes from the async IPC handler).
const terminalEvent = deferredTerminalEvent as HeterogeneousAgentEvent | null;
const terminalEvent = deferredTerminalEvent as AgentStreamEvent | null;
if (!isAborted() && terminalEvent?.type !== 'error') {
const contextKey = messageMapKey(context);
const remainingQueued = get().drainQueuedMessages?.(contextKey) ?? [];