Compare commits

...

11 Commits

Author SHA1 Message Date
Arvin Xu a6c2a57f54 feat(agent-runtime): support client-side tool dispatch via device gateway
- Add clientRuntime field to AgentToolsEngine to support tool dispatch from client
- Add clientRuntime type to BuiltinToolManifest for client-executable tools
- Update AgentToolsEngine to route clientRuntime tools through gateway sendToolExecute
- Add deviceToolPipeline tests for client-side tool execution flow
- Update aiAgent router/service to pass gateway context for client dispatch
- Update gateway store action to handle tool execution requests from server
2026-05-27 12:33:52 +08:00
Arvin Xu eedf46a11d ♻️ refactor(agent-runtime): route desktop callers through device-gateway (#15157)
Removes the Phase 6.4 `clientRuntime === 'desktop'` short-circuit so the
desktop UI, web UI, and IM/Bot callers all converge on a single tool
dispatch path: the device-gateway proxy to a registered device. The
Agent Gateway WS-back-to-caller mechanism is deprecated.

This is the second half of LOBE-9378. PR #15087 fixed the IM/Web
single-online-device auto-activate so `deviceSystemInfo` was fetched
and the `<user_context>` Mustache template substituted (`{{hostname}}`,
`{{workingDirectory}}`, `{{homePath}}`). But on cloud canary the desktop
Electron client took the Phase 6.4 branch instead — `lobe-local-system`
was enabled via `hasClientExecutor` and `executor:'client'` was stamped
on the manifest, bypassing both `activeDeviceId` resolution AND
`fetchDeviceSystemInfoForTemplate`. So `state.metadata.deviceSystemInfo`
stayed undefined and the literal `{{workingDirectory}}` reached the LLM
even after the LOBE-9378 fix shipped. With this refactor, the desktop
client registers with device-gateway like the CLI does, gets picked up
by `queryDeviceList`, auto-activates as the single online device, and
the existing template substitution kicks in unchanged.

Changes:
- AgentToolsEngine: drop `hasClientExecutor` / `clientRuntime` param.
  `platform` is now `hasDeviceProxy ? 'desktop' : 'web'`. LocalSystem
  enable rule is the single device-gateway path; RemoteDevice no longer
  has the `!hasClientExecutor` carve-out.
- aiAgent.execAgent: drop `clientRuntime` param. `shouldDispatchToClient`
  collapses to `!gatewayConfigured`, preserving the standalone-Electron
  path where there is no gateway and tools run in-process.
- tRPC input + shared types (`packages/types/src/agentExecution`,
  `src/services/aiAgent.ts`) drop the `clientRuntime` field.
- Store: stop sending `clientRuntime: isDesktop ? 'desktop' : 'web'`.
- Tests: remove the Phase 6.4 describe blocks and the
  `clientRuntime`-forwarding tests; add coverage that local-system /
  stdio MCP `executor` stays unset when the gateway is configured so
  routing goes through Remote Device.
- `executors` doc on builtin tool manifests rewritten to describe the
  remaining standalone path (no more "client dispatched via Agent
  Gateway WS").

The unrelated `clientRuntimeStart` / `clientRuntimeComplete` agent
signal source-types are about run lifecycle events, not request runtime,
and are untouched.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 23:01:12 +08:00
YuTengjing ff61f4b3fa 💄 style: add Qwen3.7 Max locale (#15150) 2026-05-24 21:49:34 +08:00
Innei 192111840c 💄 style(workflow): normalize block spacing (#15169) 2026-05-24 20:17:30 +08:00
Arvin Xu 837a3daa58 feat(chat): consume gateway uiMessages snapshot as SoT at step boundaries (#15153)
* ♻️ refactor(chat-store): useFetchMessages accepts options object

LOBE-9501

Replace the positional `skipFetch?: boolean` second argument with an
`options?: { skipFetch?, revalidateOnFocus? }` object on both
`useChatStore.useFetchMessages` and `useConversationStore.useFetchMessages`.
Plumb `revalidateOnFocus` through to the underlying SWR config so callers
can suppress focus revalidate per-call (default behaviour unchanged).

Mechanically migrate all 7 call sites to the new shape. No behaviour
change in this commit — the streaming-aware `revalidateOnFocus: false`
follow-up lives in the next commit.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

*  feat(chat): consume gateway uiMessages snapshot as SoT at step boundaries

LOBE-9501

Server attaches the canonical UIChatMessage[] snapshot to step_start and
agent_runtime_end events (#15152). The client now uses that pushed payload
as the source of truth instead of refetching from DB:

- step_start handler calls replaceMessages(uiMessages, { context }) when
  the snapshot is present, so the assistant tab-switch / next-step path
  no longer issues a refetch that returns a stale assistant placeholder.
- agent_runtime_end handler does the same for the terminal step — the
  last step has no later step_start to carry a fresh snapshot, so this
  branch is the only one that reconciles the final commit.
- step_complete on phase=tool_execution stops calling refreshMessages.
  That refetch was the direct cause of the assistantGroup→assistant
  clobber regression captured by the agent-gateway probe scripts.
- ChatList disables SWR revalidateOnFocus while the current topic is
  streaming (via operationSelectors.isAgentRuntimeRunningByContext) and
  automatically restores it after the run ends. Tab-focus during a run
  no longer triggers the stale DB read.

Doesn't touch streamingExecutor.ts (homogeneous runtime — parallel path).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(chat-store): wire gateway handler to consume server-pushed uiMessages SoT

LOBE-9501

#15152 (server) attaches the canonical UIChatMessage[] snapshot to both
the Redis SSE channel and the gateway /push-event channel. The earlier
client patch wired the consumer into `runAgent.ts`, but that file only
runs on the Group Chat SSE path. The actual gateway entry point
(`createGatewayEventHandler` in `gatewayEventHandler.ts`, used by single
agent, sub-agent, and hetero-CLI flows) ignored the field entirely and
kept refetching from DB.

Fix the gateway handler:

- step_start: consume `event.data.uiMessages` and replaceMessages with
  the pushed SoT. Skipped when absent — hetero adapters don't emit
  step_start at all (HeterogeneousEventType excludes it), so the new
  branch is invisible to hetero.

- agent_runtime_end: same SoT consumption; the existing
  `fetchAndReplaceMessages` becomes the fallback for events without the
  field. Claude Code adapter emits agent_runtime_end with empty data,
  so hetero terminal behavior is preserved by the fallback.

- stream_start: gate the DB fetch on `!newAssistantMessageId`. Native
  gateway streams carry `assistantMessage.id` (the preceding step_start
  also delivered the SoT), so the await is unnecessary — AND it was
  blocking the enqueue chain. Live chunks queued behind that await
  could not dispatch, which manifested as "streaming content never
  lands in messagesMap" during tab-switch and slow-network repros.
  Hetero CLI streams never set `assistantMessage.id`, so the fetch
  still runs for them on every stream_start.

Verified with the agent-gateway probe (separate commit): chunks now
land in real time (cLen grows 3 → 529 monotonically), and tab-switch
mid-stream no longer rolls the streamed assistantGroup back to the
LOADING placeholder (ROLLBACKS=none in the analyzer output).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🧪 chore(local-testing): rewrite agent-gateway probes in TS + add CLI

LOBE-9501

Convert the local-testing agent-gateway probes from .js/.mjs to TypeScript
and add a unified `run.ts` CLI that bundles via Bun.build (no extra
deps) and persists dumps to a gitignored `.agent-gateway/` directory for
use as streaming-replay test fixtures.

- types.ts: shared dump shape (ProbeStreamEvent / ProbeTimelineSample /
  ProbeDump) and `declare global` for the `window.__PROBE_*` surface
- probe-events.ts: WebSocket + fetch interception (gateway WS captures
  any socket with `operationId=`; fetch captures `/api/agent/stream` for
  direct SSE). Per-key timeline samples every 200ms so we can see
  which messagesMap key streaming chunks actually land in
- probe-dump.ts: stops the timeline timer and stashes JSON dump on
  `window.__PROBE_LAST_DUMP_JSON` (runner returns that global)
- analyze-events.ts: stream events (non-chunk) + chunks summary +
  action-call stacks + correlation + per-key assistant growth +
  rollback detection. Per-key growth was added specifically to
  diagnose "chunks arrive but assistant cLen never moves"
- run.ts: `install` | `dump [name]` | `analyze [path]` CLI. Bundles via
  Bun.build, wraps as IIFE with explicit return, pipes to
  `agent-browser eval --stdin`. Dumps land at
  `.agent-gateway/<name>-<YYYYMMDD-HHmmss>.json`

`.agent-gateway/` is gitignored so dumps accumulate across debugging
sessions without polluting git.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🐛 fix(local-testing): repair run.ts after autofix mangled path imports

LOBE-9501

The eslint --fix run during the previous commit applied the unicorn
`import-style` rule and renamed every `join(` / `dirname(` / `resolve(`
to `path.join(` / `path.dirname(` / `path.resolve(`, but the replacement
was a naive text substitution that:

1. rewrote `array.join('\n')` to `array.path.join('\n')` — broke bundle
   error reporting (would TypeError on the build-failure path)
2. produced `const path = path.join(DUMP_DIR, filename)` inside cmdDump
   — shadowed the `path` module with itself, ReferenceError on every
   dump invocation

Rename the local `path` to `dumpPath` and drop the spurious `.path`
prefix on the array `.join`. Verified round-trip: install + dump now
write a valid capture to `.agent-gateway/`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🧪 chore(local-testing): capture per-call message snapshot in probe

LOBE-9501

The probe's `replaceMessages` wrapper used to record only `count` and
`params` — enough to see "two messages were written" but not WHICH two.
For post-stream collapse debugging we need to see whether each call
restored streamed content (cLen=N) or wiped to LOADING_FLAT (cLen=3).

Two changes:

- Capture `snapshot` field on every replaceMessages call: last 2
  messages' id / role / cLen / rLen / updatedAt. The analyzer prints
  this inline next to each call so reviewers can see content drift /
  collapse without re-reading the dump.

- Make wrapping idempotent across re-installs. The old guard
  `chat.__probeWrapped = true` froze the first-installed wrapper across
  re-installs, so updates to the probe body had no effect without a
  page reload. Stash the originals on
  `window.__PROBE_ORIG_REFRESH_MESSAGES` /
  `window.__PROBE_ORIG_REPLACE_MESSAGES` and re-wrap from those on
  every install.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🧪 chore(local-testing): add mutation log + dispatchMessage wrap to probe

LOBE-9501

The replaceMessages-only wrap couldn't catch chunk-level writes (those go
through internal_dispatchMessage) or attribute post-stream collapses to a
specific writer. Add:

- `__PROBE_MUTATIONS` — unified ordered log of every dbMessagesMap[key]
  reference change, with `last`/`prevLast` summaries and a `delta` field
  that tags interesting transitions (`cLen↓N→M`, `rLen↓`, `id:A→B`,
  `n↓prev→cur`). Both writers — replaceMessages AND internal_dispatchMessage
  — push to the same buffer so a single timeline shows all stores writes.

- Idempotent action wrapping. Originals are stashed on
  `window.__PROBE_ORIG_*` and re-wrapped from there on every install, so
  probe edits take effect without a page reload (previous
  `chat.__probeWrapped` flag froze the first wrapper).

- Snapshot field on replaceMessages — last 2 messages'
  id/role/cLen/rLen/updatedAt — so reviewers can see WHICH content each
  call is writing instead of just the count.

- Dump file now carries the `mutations` array alongside streamEvents,
  actionCalls, timeline.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🐛 fix(chat-store): gate SWR onData by isStreaming for streaming topic

LOBE-9501

Backstop for the post-stream cLen collapse that survives even with the
gateway SoT consume in place. Reproduction (confirmed):

1. Send a stream that lands lots of WS chunks into ChatStore
2. Immediately reload the page

If the page reload races against server-side chunk fan-out into Postgres,
SWR's fresh fetch returns the assistant row in its LOADING_FLAT placeholder
state (cLen=3) and writes that to ChatStore via the conversation-store
mirror — even though the WS push at agent_runtime_end carried the
correct full content moments earlier.

`mergeFetchedMessagesWithLocalState`'s updatedAt tie-breaker handles
this for in-session repros (local message wins when its updatedAt is
newer), but it degenerates when:

- The SoT consume just wrote server's snapshot updatedAt onto the local
  message, equalising the timestamps so the next stale DB fetch wins
- The user reloads (no local state to merge against — fresh fetch wins
  outright)

Add a gate at the bottom of `ConversationStore.useFetchMessages.onData`:
while `isAgentRuntimeRunningByContext(context)` is true, drop the SWR
write entirely. SWR's own cache still updates, so once streaming ends a
normal revalidate writes through correctly.

This is layered defense — it does NOT fix the underlying server-side
fan-out lag (filed as separate Linear issue). It does prevent the
client-side flash users currently see during the lag window.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🧪 test(chat-store): align gateway handler tests with SoT contract

The previous assertions still expected `stream_start` to issue a DB refetch
on every native gateway stream — the very behaviour LOBE-9501 removes
(`acb9523a04`). Update the three failing cases to the new contract:

- `stream_start > should associate new message with operation`:
  assert `messageService.getMessages` is NOT called when
  `assistantMessage.id` is present (the SoT snapshot from the preceding
  `step_start` already pre-populated `dbMessagesMap`).
- `sequential processing`: rewrite around the surviving ordering guarantee
  — `associate` (stream_start) must precede `dispatch` (stream_chunk) so
  the chunk targets the new id. Add a sibling case for hetero CLI streams
  (no `assistantMessage.id` → DB fetch is still mandatory).
- `multi-step integration > full LLM → tools → LLM cycle`: keep the
  post-`tool_end` `replaceMessages` assertion (tool_end still refreshes
  from DB), invert the post-`stream_start` assertion for step 2.

42 tests passing (was 41 + 1 new hetero fallback test).

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 20:05:58 +08:00
AmAzing- 5f6f053039 🐛 fix(agent): hide community publish for heterogeneous agents (#15166) 2026-05-24 18:39:05 +08:00
AmAzing- 775be47513 🐛 fix(agent): align settings defaults and locale state (#15163) 2026-05-24 16:29:22 +08:00
Arvin Xu 2f265a9307 🐛 fix(conversation): only swap model name for remote hetero agents in Usage (#15156)
* 🐛 fix(conversation): only swap model name for remote hetero agents in Usage

Local CLI hetero agents (claude-code, codex) report their actual model
id on `turn_metadata` and persist it on the assistant message, but the
Usage extra was unconditionally replacing it with the provider brand
label ("Claude Code" / "Codex") whenever `HETEROGENEOUS_TYPE_LABELS`
had an entry. Gate the swap to remote platform agents (openclaw,
hermes) — those don't expose a real model id — so CC/Codex turns show
the underlying model again.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

*  test(desktop): update GatewayConnectionCtr tests for lh hetero exec route

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 13:08:21 +08:00
Arvin Xu 0fa2e2349c 🐛 fix(desktop): route gateway agent runs through lh hetero exec (#15132)
* feat(desktop): route gateway agent runs through lh hetero exec

Replace the desktop-side GatewayConnectionCtr.executeAgentRun() flow
(startSession -> sendPrompt with local AgentStreamPipeline) with a direct
lh hetero exec spawn. The lh CLI handles spawn -> adapt -> BatchIngester ->
heteroIngest/heteroFinish, matching the cloud sandbox path exactly.

Changes:
- HeterogeneousAgentCtr: add spawnLhHeteroExec() method
- GatewayConnectionCtr: executeAgentRun() now delegates to the new method

* 🐛 fix(desktop): remove duplicate lh token from hetero exec args

spawn('lh', args) already invokes the lh binary, so the leading 'lh'
in args made the effective command `lh lh hetero exec ...` and failed
before heteroIngest could run, breaking the gateway-triggered agent
run flow.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: LobeHub Agent <agent@lobehub.com>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 02:54:00 +08:00
Arvin Xu 930344ae23 feat(agent-runtime): push UIChatMessage snapshot at gateway step boundaries (#15152)
* 🧪 chore(local-testing): add agent-gateway probe scripts for stream SoT validation

Probe + tab-switch + analyzer scripts under .agents/skills/local-testing/scripts/agent-gateway/
to capture in-browser snapshots of the message store during gateway streaming and detect
regressions where assistantGroup messages get clobbered by stale DB refetches.

Used to verify LOBE-9501.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

*  feat(agent-runtime): push canonical UIChatMessage snapshot at step boundaries

LOBE-9501

Gateway-mode streaming previously let the client refetch from DB on every
step_complete or tab-focus; with stream chunks landing before the DB write
fans out, the refetch returned a stale assistant placeholder that clobbered
the in-memory streamed assistantGroup (reasoning / tool calls / content).

Server now attaches the canonical UIChatMessage[] snapshot to step_start
and agent_runtime_end events so the client can use the pushed payload as
Source of Truth instead of refetching:

- step_start now loads agent state first, queries messages, and attaches
  uiMessages to the event data when topic context is known
- publishAgentRuntimeEnd signature switched to a params object (additive
  uiMessages field) and the coordinator resolves the snapshot through an
  optional uiMessagesResolver hook before publishing terminal events
- AgentRuntimeService wires the resolver through a lazily-instantiated
  MessageService so tests without S3 env still construct cleanly
- MessageService.queryMessages exposes the same read path as the
  message.getMessages trpc lambda (FileService postProcessUrl included)

Pure additive on the wire: legacy consumers see new uiMessages field, old
finalState payload unchanged. Existing call sites in agentNotify and
aiAgent migrated to the params shape. Failures in the resolver fall back
to publishing without uiMessages so streaming never fails the step.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(agent-runtime): forward uiMessages in gateway /push-event payload

LOBE-9501

GatewayStreamNotifier.publishAgentRuntimeEnd was delegating uiMessages to
the inner manager (Redis SSE) but reconstructing its own push-event data
object that only carried { errorType, finalState, reason, reasonDetail }.
In gateway mode, clients consume /push-event rather than Redis directly,
so the canonical UIChatMessage[] snapshot never reached them at terminal
state — and the final step has no later step_start to carry a fresh one.

Forward uiMessages via the same conditional-spread pattern used in the
inner managers; add two tests covering the present/absent branches.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 01:23:21 +08:00
Arvin Xu 538195dfb4 🐛 fix(agent-runtime): route context engine payload out of the events stream (#15151)
* 🐛 fix(agent-runtime): route context engine payload out of the events stream

`call_llm` previously pushed a `context_engine_result` event carrying the
full `contextEngineInput` (agentDocuments, systemRole, knowledge, …) into
the per-step events array. That array is the same one persisted into
Redis `agent_runtime_events`, so every step shipped the heavy CE payload
into the state pipeline even though the only consumer was the trace
recorder, which extracted CE into the typed `contextEngine` snapshot
field and immediately filtered the event back out.

Wire a typed `recordContextEngine` callback through
`RuntimeExecutorContext` instead. `AgentRuntimeService.executeStep`
buffers the call per step and hands it to
`OperationTraceRecorder.appendStep` via a new `contextEngine` param.
Trace snapshots are byte-identical; the events stream — and therefore
the Redis state blob — no longer carries CE.

Step toward LOBE-9110 (split state vs trace pipeline). Viewer keeps
the legacy `context_engine_result` reader for back-compat with older
on-disk snapshots.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🎨 refactor(agent-runtime): rename recordContextEngine to tracingContextEngine

The callback name now signals its role as the trace-pipeline channel,
matching the `tracing` prefix used elsewhere for non-state observability
wiring. Pure rename, no behavior change.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 01:14:12 +08:00
67 changed files with 3606 additions and 435 deletions
+6 -5
View File
@@ -14,7 +14,7 @@ In `NODE_ENV=development`, `AgentRuntimeService.executeStep()` automatically rec
**Data flow**: executeStep loop -> build `StepPresentationData` -> write partial snapshot to disk -> on completion, finalize to `.agent-tracing/{timestamp}_{traceId}.json`
**Context engine capture**: In `RuntimeExecutors.ts`, the `call_llm` executor emits a `context_engine_result` event after `serverMessagesEngine()` processes messages. This event carries the full `contextEngineInput` (DB messages, systemRole, model, knowledge, tools, userMemory, etc.) and the processed `output` messages (the final LLM payload).
**Context engine capture**: In `RuntimeExecutors.ts`, the `call_llm` executor calls `ctx.tracingContextEngine(input, output)` after `serverMessagesEngine()` processes messages. `AgentRuntimeService.executeStep` buffers the call per step and forwards it to `OperationTraceRecorder.appendStep` as the typed `contextEngine` field. CE flows through this side channel rather than the `events` array so its heavy payload (agentDocuments, systemRole, …) never enters the Redis state pipeline (LOBE-9110).
## Package Location
@@ -199,9 +199,10 @@ interface StepSnapshot {
messages?: any[]; // DB messages before step
context?: { phase: string; payload?: unknown; stepContext?: unknown };
events?: Array<{ type: string; [key: string]: unknown }>;
// context_engine_result event contains:
// input: full contextEngineInput (messages, systemRole, model, knowledge, tools, userMemory, ...)
// output: processed messages array (final LLM payload)
contextEngine?: {
input?: unknown; // contextEngineInput minus messages + toolsConfig (reconstructible from baseline)
output?: unknown; // processed messages array (final LLM payload)
};
}
```
@@ -216,5 +217,5 @@ When using `--messages`, the output shows three sections (if context engine data
## Integration Points
- **Recording**: `src/server/services/agentRuntime/AgentRuntimeService.ts` — in the `executeStep()` method, after building `stepPresentationData`, writes partial snapshot in dev mode
- **Context engine event**: `src/server/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, emits `context_engine_result` event
- **Context engine capture**: `src/server/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, calls `ctx.tracingContextEngine(input, output)`. `AgentRuntimeService.executeStep` buffers it per step and passes it to `traceRecorder.appendStep` as the typed `contextEngine` field (kept off the `events` array to stay out of Redis state).
- **Store**: `FileSnapshotStore` reads/writes to `.agent-tracing/` relative to `process.cwd()`
@@ -0,0 +1,93 @@
# LobeHub gateway streaming + tab-switch test harness
Captures store + DOM state at 200ms intervals so we can prove or disprove
claims like "切回 tab 后消息回到了很早以前". Built for gateway-mode chat but
works for any LobeHub streaming session.
## Files
`scripts/agent-gateway/`
| File | Role |
| --------------- | ---------------------------------------------------------------- |
| `probe.js` | Injects a 200ms sampler + `__PROBE_EVENT` marker + `__switchTab` |
| `probe-dump.js` | Stops the sampler and returns `{events, samples}` as JSON string |
| `tab-switch.js` | Runs N round-trip switches between two tabs, marks each step |
| `analyze.mjs` | Node post-processor: timeline + regression detection |
## Standard workflow
```bash
# 1. Start Electron with CDP
./.agents/skills/local-testing/scripts/electron-dev.sh start
# 2. Navigate to a chat, switch runtime to Cloud Sandbox (gateway mode)
# 3. Install the probe + helpers
agent-browser --cdp 9222 eval --stdin \
< .agents/skills/local-testing/scripts/agent-gateway/probe.js
# 4. Send a tool-call message — manually or via type+press
agent-browser --cdp 9222 eval "window.__PROBE_EVENT('SENT')"
# 5. Run the multi-switch driver (auto-picks active tab as BACK and the
# rightmost inactive tab as AWAY — edit ROUND_TRIPS / DWELL_MS in the
# file if you want different timing)
agent-browser --cdp 9222 eval --stdin \
< .agents/skills/local-testing/scripts/agent-gateway/tab-switch.js
# 6. Wait for streaming to finish, then dump
agent-browser --cdp 9222 eval --stdin \
< .agents/skills/local-testing/scripts/agent-gateway/probe-dump.js \
> /tmp/probe.json
# 7. Analyze
node .agents/skills/local-testing/scripts/agent-gateway/analyze.mjs /tmp/probe.json
```
The analyzer prints three sections: EVENTS, TIMELINE, REGRESSIONS. If
REGRESSIONS is non-empty it means content/reasoning/childN dropped on the
same topic — the symptom users describe.
## What the probe tracks (and why)
`chat.messagesMap` only stores the top-level `assistantGroup` shell. The
actual streamed content, reasoning, and tool calls live in
`assistantGroup.children: AssistantContentBlock[]`. Any probe that only
reads `m.content` / `m.reasoning` will see zeros throughout streaming and
miss everything that matters. probe.js walks both levels and sums:
- `cT` total content length
- `rT` total reasoning length
- `toolT` total tool-call count
- `childN` number of content blocks
Plus DOM-side signals (`domLen`, search/crawl indicator counts) so you can
tell store-side regressions apart from render-side regressions.
## Gotchas
- **Optimistic new-topic state.** Before the first chunk lands, messages
live under the `<scope>_new` key with `tmp_*` ids and no `topicId` field.
probe.js falls back to those when `activeTopicId` is null.
- **Reasoning resets to 0 are not bugs.** When the assistant finishes
thinking and starts tool-use or text, the streaming reasoning buffer
empties and the finalised reasoning gets sealed into a completed block.
Filter these out manually if needed.
- **DOM length jitters by a handful of chars** because counters like "(10)"
in tool-call labels change as results arrive. analyze.mjs only flags
`domLen` drops greater than 100 chars to ignore that noise.
- **Never identify tabs by innerText.** The active tab's text embeds a
` · <agent name>` suffix, so a search like `'LobeHub Growth'` matches the
active tab when the active agent happens to be LobeHub Growth — and you
end up clicking the tab you're already on. probe.js uses the stable
`data-contextmenu-trigger` attribute (a React `useId()` value that's set
per-tab and survives focus changes) plus `data-active="true"` to mark
the active one. Helpers exposed:
`__listTabs()` / `__clickTabByKey(key)` / `__clickTabByIndex(i)` /
`__activeTabKey()`.
- **`tab-switch.js` fires-and-forgets.** The IIFE kicks off an async loop
and returns immediately so the agent-browser CLI eval doesn't blow past
its default 25 s timeout. Wait on the `SWITCH_LOOP_DONE` event marker
before dumping. Re-running while a loop is in flight is refused — the
chaotic data from overlapping runs is not worth debugging.
@@ -0,0 +1,243 @@
// Analyzer for probe-events dumps. Reads a JSON file produced by `run.ts dump`
// and prints a layered breakdown:
//
// 1. STREAM EVENTS — every non-chunk WS/SSE event in receipt order
// 2. CHUNKS SUMMARY — collapsed per-step chunk counts (otherwise floods)
// 3. ACTION CALLS — replaceMessages / refreshMessages / MARK:* with stack
// 4. CORRELATION — calls ↔ nearest stream event within ±300ms
// 5. PER-KEY ASSISTANT GROWTH — for each messagesMap key, when the leading
// assistant message's cLen / rLen actually moves (this is what reveals
// "chunks arrived but the message never grew" regressions)
// 6. ROLLBACKS — msgN / childN / role drops in the active-topic timeline
//
// Usage:
// bun run .agents/skills/local-testing/scripts/agent-gateway/analyze-events.ts <dump.json>
import { readFileSync } from 'node:fs';
import type {
ProbeActionCall,
ProbeDump,
ProbeMessageSummary,
ProbeStreamEvent,
ProbeTimelineSample,
} from './types';
const file = process.argv[2];
if (!file) {
console.error('usage: bun run analyze-events.ts <dump.json>');
process.exit(1);
}
const raw = readFileSync(file, 'utf8');
// agent-browser eval --stdin wraps return values in quotes when the value is
// a string — so the JSON file may be double-encoded depending on how it was
// captured. Handle both.
const parsedOnce = JSON.parse(raw) as ProbeDump | string;
const dump: ProbeDump = typeof parsedOnce === 'string' ? JSON.parse(parsedOnce) : parsedOnce;
const { streamEvents = [], actionCalls = [], timeline = [] } = dump;
const pad = (v: unknown, n: number) => String(v).padStart(n);
// ── META ───────────────────────────────────────────────────────────
console.log('=== META ===');
console.log(` events: ${streamEvents.length}`);
console.log(` calls: ${actionCalls.length}`);
console.log(` timeline: ${timeline.length}`);
// ── 1. STREAM EVENTS (non-chunk) ───────────────────────────────────
const nonChunkEvents = streamEvents.filter((e) => e.type !== 'stream_chunk');
const chunkEvents = streamEvents.filter((e) => e.type === 'stream_chunk');
console.log(
`\n=== STREAM EVENTS (${nonChunkEvents.length} non-chunk + ${chunkEvents.length} chunks elided) ===`,
);
for (const e of nonChunkEvents) {
const dataStr = e.dataKeys?.length ? ` [${e.dataKeys.join(',')}]` : '';
const data = e.data as Record<string, unknown> | undefined;
const uiHint = data?.uiMessagesPreview
? ` uiPreview=${JSON.stringify(data.uiMessagesPreview)}`
: data?.uiMessagesTotal
? ` uiTotal=${data.uiMessagesTotal}`
: '';
const phaseHint = data?.phase ? ` phase=${data.phase}` : '';
const extra = e.serverType ? ` serverType=${e.serverType}` : '';
console.log(
` t=${pad(e.t, 7)} [${(e.transport ?? '?').padEnd(3)}] step=${pad(e.stepIndex ?? '-', 2)} ` +
`type=${(e.type ?? '').padEnd(22)} op=${e.opIdTail ?? '-'}${phaseHint}${uiHint}${extra}${dataStr}`,
);
}
// ── 2. CHUNK SUMMARY ───────────────────────────────────────────────
console.log('\n=== CHUNKS SUMMARY (per step / chunkType) ===');
const chunkBuckets = new Map<string, { count: number; firstT: number; lastT: number }>();
for (const c of chunkEvents) {
const data = c.data as Record<string, unknown> | undefined;
const ct = (data?.chunkType as string | undefined) ?? '?';
const key = `step=${c.stepIndex ?? '-'} chunkType=${ct.padEnd(8)} op=${c.opIdTail}`;
const slot = chunkBuckets.get(key);
if (slot) {
slot.count += 1;
slot.lastT = c.t;
} else {
chunkBuckets.set(key, { count: 1, firstT: c.t, lastT: c.t });
}
}
for (const [k, v] of chunkBuckets) {
console.log(` ${k} count=${pad(v.count, 4)} t=${pad(v.firstT, 7)}..${pad(v.lastT, 7)}`);
}
// ── 3. ACTION CALLS ───────────────────────────────────────────────
console.log('\n=== ACTION CALLS (replace/refresh/MARK) ===');
for (const c of actionCalls) {
if (c.name?.startsWith('MARK:')) {
console.log(` t=${pad(c.t, 7)} ${c.name}`);
continue;
}
const snapshot = (c.args as any)?.snapshot as
| Array<{ id: string; role: string; cLen: number; rLen: number }>
| undefined;
const snapStr = snapshot?.length
? ' snapshot=' + snapshot.map((m) => `${m.id}:${m.role}/c${m.cLen}/r${m.rLen}`).join(' | ')
: '';
const summary =
c.name === 'replaceMessages'
? `count=${c.args?.count} action=${(c.args?.params as any)?.action ?? '-'}${snapStr}`
: c.name === 'refreshMessages'
? `ctx=${JSON.stringify(c.args?.context)}`
: c.error
? `error=${c.error}`
: '';
console.log(` t=${pad(c.t, 7)} ${c.name.padEnd(20)} ${summary}`);
if (c.stack) {
const frames = c.stack
.split(' ← ')
.filter((f) => !!f && !f.includes('Object.<anonymous>'))
.slice(0, 3);
for (const f of frames) console.log(`${f}`);
}
}
// ── 4. CORRELATION ────────────────────────────────────────────────
function nearestEventForCall(
call: ProbeActionCall,
windowMs = 300,
): { event: ProbeStreamEvent; delta: number } | null {
let best: ProbeStreamEvent | null = null;
let bestDelta = Infinity;
for (const e of streamEvents) {
const d = Math.abs(e.t - call.t);
if (d < bestDelta && d <= windowMs) {
bestDelta = d;
best = e;
}
}
return best ? { event: best, delta: bestDelta } : null;
}
console.log('\n=== CORRELATION (replace/refresh ↔ nearest event within ±300ms) ===');
for (const c of actionCalls) {
if (c.name !== 'refreshMessages' && c.name !== 'replaceMessages') continue;
const hit = nearestEventForCall(c);
if (hit) {
const phase = (hit.event.data as Record<string, unknown> | undefined)?.phase;
console.log(
` t=${pad(c.t, 7)} ${c.name.padEnd(16)} ← Δ${pad(hit.delta, 4)}ms ${hit.event.type}` +
(phase ? ` phase=${phase}` : ''),
);
} else {
console.log(` t=${pad(c.t, 7)} ${c.name.padEnd(16)} ← (no event nearby — external trigger)`);
}
}
// ── 5. PER-KEY ASSISTANT GROWTH ───────────────────────────────────
// For each messagesMap key, find the trailing assistant message and report
// the points in time where its cLen / rLen actually changed. If the timeline
// shows chunks arriving but the assistant cLen never moves, that's the
// signature of "dispatch queue blocked / messageId mismatch".
console.log('\n=== PER-KEY ASSISTANT GROWTH ===');
const keysEverSeen = new Set<string>();
for (const s of timeline) for (const k of Object.keys(s.byKey ?? {})) keysEverSeen.add(k);
for (const key of keysEverSeen) {
console.log(`\n key=${key}`);
let lastSig: string | null = null;
for (const s of timeline) {
const slot = s.byKey?.[key];
if (!slot) continue;
const last = slot.msgs.at(-1) as ProbeMessageSummary | undefined;
if (!last) continue;
const sig = `${last.id}|c${last.cLen}|r${last.rLen}|n${slot.n}`;
if (sig === lastSig) continue;
lastSig = sig;
console.log(
` t=${pad(s.t, 7)} msgN=${pad(slot.n, 3)} ` +
`lastAssistant=${last.id} cLen=${pad(last.cLen, 5)} rLen=${pad(last.rLen, 5)}` +
` runOps=${s.runOps}`,
);
}
}
// ── 6. ROLLBACKS (active-topic msgN / childN / role drops) ─────────
console.log('\n=== ROLLBACKS (active-topic msgN / childN / role drops) ===');
let prev: ProbeTimelineSample | null = null;
const rollbacks: Array<{ t: number; topic: string | null; drops: string[] }> = [];
const flatten = (s: ProbeTimelineSample) => {
if (!s.activeTopic) return [];
return Object.entries(s.byKey ?? {})
.filter(([k]) => k.includes(s.activeTopic!))
.flatMap(([, v]) => v.msgs);
};
for (const s of timeline) {
if (s.err) {
prev = null;
continue;
}
if (!prev || prev.activeTopic !== s.activeTopic) {
prev = s;
continue;
}
const prevMsgs = flatten(prev);
const curMsgs = flatten(s);
const drops: string[] = [];
if (curMsgs.length < prevMsgs.length) drops.push(`msgN ${prevMsgs.length}${curMsgs.length}`);
let prevChild = 0;
let curChild = 0;
for (const m of prevMsgs) prevChild += m.chN ?? 0;
for (const m of curMsgs) curChild += m.chN ?? 0;
if (curChild < prevChild) drops.push(`childN ${prevChild}${curChild}`);
const prevById = new Map(prevMsgs.map((m) => [m.id, m]));
for (const m of curMsgs) {
const pr = prevById.get(m.id);
if (!pr) continue;
if (m.cLen < pr.cLen) drops.push(`cLen[${m.id}] ${pr.cLen}${m.cLen}`);
if (m.rLen < pr.rLen) drops.push(`rLen[${m.id}] ${pr.rLen}${m.rLen}`);
}
if (drops.length) rollbacks.push({ t: s.t, topic: s.activeTopic, drops });
prev = s;
}
if (rollbacks.length === 0) {
console.log(' (none)');
} else {
for (const r of rollbacks) {
const nearEvent = streamEvents
.filter((e) => Math.abs(e.t - r.t) <= 300)
.map((e) => `${e.type}${(e.data as any)?.phase ? ':' + (e.data as any).phase : ''}`);
const nearCall = actionCalls
.filter((c) => Math.abs(c.t - r.t) <= 300 && !c.name?.startsWith('MARK:'))
.map((c) => c.name);
console.log(
` t=${pad(r.t, 7)} topic=${r.topic} ${r.drops.join(' | ')}` +
(nearEvent.length ? ` near-event:[${nearEvent.join(',')}]` : '') +
(nearCall.length ? ` near-call:[${nearCall.join(',')}]` : ''),
);
}
}
@@ -0,0 +1,119 @@
#!/usr/bin/env node
// Analyze a probe dump captured by probe.js + probe-dump.js.
//
// node analyze.mjs /tmp/probe.json
//
// Prints:
// 1. EVENTS — user-action markers with their relative timestamps
// 2. TIMELINE — periodic samples (~1 per second + event-adjacent samples)
// showing every interesting field; columns:
// t(ms) | runOps | msgN | childN | content | reasoning | tools | domLen | search | crawl | topic | event
// 3. REGRESSIONS — every place a tracked counter *dropped* on the same
// topic between adjacent samples. A "true" UI rollback shows up as a
// drop in content/reasoning/tools/childN/domLen without a topic change.
//
// Whitelisted transitions (not flagged):
// - topic change → all drops expected (focus moved away)
// - reasoning length 0 after content starts → reasoning gets sealed into a
// completed sub-block; the parent's running reasoning resets to ''.
// - msgN drop when topic transitions from `_new` placeholder to a real id.
import fs from 'node:fs';
const file = process.argv[2];
if (!file) {
console.error('usage: node analyze.mjs <probe.json>');
process.exit(1);
}
const raw = JSON.parse(fs.readFileSync(file, 'utf8'));
// probe-dump.js wraps the payload in JSON.stringify so agent-browser returns
// it as a single quoted string. Unwrap.
const data = typeof raw === 'string' ? JSON.parse(raw) : raw;
const { events, samples } = data;
const fmt = {
pad(v, n) {
return String(v).padStart(n);
},
};
console.log('=== EVENTS ===');
for (const e of events) console.log(` t=${fmt.pad(e.t, 7)} ${e.name}`);
console.log(
'\n=== TIMELINE (~1s cadence, plus event-adjacent samples) ===\n' +
' t(ms) runOps msgN childN content reasoning tools domLen search crawl topic event',
);
let lastSampledAt = -1e9;
const eventBuckets = events.map((e) => e.t);
for (let i = 0; i < samples.length; i++) {
const s = samples[i];
const nearEvent = eventBuckets.some((et) => Math.abs(et - s.t) < 110);
if (!nearEvent && s.t - lastSampledAt < 1000) continue;
lastSampledAt = s.t;
const ev = events.find((e) => Math.abs(e.t - s.t) < 110);
const evMarker = ev ? `${ev.name}` : '';
const topicSuffix = s.topicId ? s.topicId.slice(-6) : '(none)';
const search = s.ind?.search ?? 0;
const crawl = s.ind?.crawl ?? 0;
console.log(
` ${fmt.pad(s.t, 6)} ` +
`${fmt.pad(s.runOps, 6)} ` +
`${fmt.pad(s.msgN, 4)} ` +
`${fmt.pad(s.childN ?? 0, 5)} ` +
`${fmt.pad(s.cT ?? 0, 8)} ` +
`${fmt.pad(s.rT ?? 0, 9)} ` +
`${fmt.pad(s.toolT ?? 0, 5)} ` +
`${fmt.pad(s.domLen ?? 0, 7)} ` +
`${fmt.pad(search, 6)} ` +
`${fmt.pad(crawl, 5)} ` +
`${topicSuffix.padEnd(8)}${evMarker}`,
);
}
console.log('\n=== REGRESSIONS (same topic, value dropped) ===');
const regressions = [];
for (let i = 1; i < samples.length; i++) {
const prev = samples[i - 1];
const cur = samples[i];
if (!cur.topicId || prev.topicId !== cur.topicId) continue;
const drops = [];
if (cur.msgN < prev.msgN) drops.push(`msgN: ${prev.msgN}${cur.msgN}`);
if ((cur.childN ?? 0) < (prev.childN ?? 0)) drops.push(`childN: ${prev.childN}${cur.childN}`);
if ((cur.cT ?? 0) < (prev.cT ?? 0)) drops.push(`content: ${prev.cT}${cur.cT}`);
if ((cur.rT ?? 0) < (prev.rT ?? 0)) drops.push(`reasoning: ${prev.rT}${cur.rT}`);
if ((cur.toolT ?? 0) < (prev.toolT ?? 0)) drops.push(`tools: ${prev.toolT}${cur.toolT}`);
// domLen jitters by a few chars from counter labels — only flag big drops.
if ((cur.domLen ?? 0) < (prev.domLen ?? 0) - 100) {
drops.push(`domLen: ${prev.domLen}${cur.domLen}`);
}
if (drops.length === 0) continue;
const nearbyEv = events.filter((e) => Math.abs(e.t - cur.t) < 600).map((e) => e.name);
regressions.push({ t: cur.t, topic: cur.topicId.slice(-6), drops, nearbyEv });
}
if (regressions.length === 0) {
console.log(' (none)');
} else {
for (const r of regressions) {
const evStr = r.nearbyEv.length ? ` near:[${r.nearbyEv.join(',')}]` : '';
console.log(` t=${fmt.pad(r.t, 7)} topic=${r.topic} ${r.drops.join(' | ')}${evStr}`);
}
}
console.log(`\n=== SUMMARY ===`);
console.log(` samples: ${samples.length}`);
console.log(` events: ${events.length}`);
console.log(` regressions: ${regressions.length}`);
if (samples.length) {
const last = samples.at(-1);
console.log(
` final: msgN=${last.msgN} childN=${last.childN ?? 0} content=${last.cT ?? 0} ` +
`reasoning=${last.rT ?? 0} tools=${last.toolT ?? 0} runOps=${last.runOps}`,
);
}
@@ -0,0 +1,17 @@
// Stop the probe and serialize collected data.
//
// agent-browser --cdp 9222 eval --stdin < probe-dump.js > /tmp/probe.json
//
// The whole thing is wrapped in a JSON.stringify so agent-browser returns it
// as a single quoted string — the analyzer double-parses to handle that.
(function () {
if (window.__PROBE_TIMER) {
clearInterval(window.__PROBE_TIMER);
window.__PROBE_TIMER = null;
}
return JSON.stringify({
events: window.__PROBE_EVENTS || [],
samples: window.__PROBE_SAMPLES || [],
});
})();
@@ -0,0 +1,37 @@
// Stops the events-probe timeline timer and stashes the full capture as a
// JSON string on `window.__PROBE_LAST_DUMP_JSON`. `run.ts` wraps the bundle
// in an IIFE that returns that global, which `agent-browser eval` prints to
// stdout — the runner then persists it under `.agent-gateway/`.
import type { ProbeDump } from './types';
declare global {
interface Window {
__PROBE_LAST_DUMP_JSON?: string;
}
}
const w = window;
if (w.__PROBE_TIMELINE_TIMER) {
clearInterval(w.__PROBE_TIMELINE_TIMER);
w.__PROBE_TIMELINE_TIMER = null;
}
const mutations = w.__PROBE_MUTATIONS ?? [];
const dump: ProbeDump & { mutations: typeof mutations } = {
meta: {
t0: w.__PROBE_T0 ?? 0,
collectedAt: Date.now(),
sampleCount: (w.__PROBE_MSG_TIMELINE ?? []).length,
eventCount: (w.__PROBE_STREAM_EVENTS ?? []).length,
callCount: (w.__PROBE_ACTION_CALLS ?? []).length,
},
streamEvents: w.__PROBE_STREAM_EVENTS ?? [],
actionCalls: w.__PROBE_ACTION_CALLS ?? [],
timeline: w.__PROBE_MSG_TIMELINE ?? [],
mutations,
};
w.__PROBE_LAST_DUMP_JSON = JSON.stringify(dump);
@@ -0,0 +1,637 @@
// LobeHub gateway raw-event-stream probe.
//
// Gateway-mode chats subscribe via WebSocket — NOT via the `/api/agent/stream`
// SSE endpoint (that one belongs to the direct/client durable-agent runtime).
// `AgentStreamClient` (`packages/agent-gateway-client/src/client.ts`) opens
// `new WebSocket('wss://.../ws?operationId=...')`, then parses JSON frames in
// its `onmessage` handler and re-emits `agent_event.event` objects to the
// chat store.
//
// To capture the RAW gateway events before the store touches them, we wrap
// `window.WebSocket` so that for any socket whose URL contains `operationId=`
// we intercept the `onmessage` handler / `addEventListener('message')` and
// log every `agent_event` frame.
//
// We *also* keep the `window.fetch` hook for `/api/agent/stream` so this
// probe still works for direct-mode runs — but gateway-mode events come
// through the WebSocket path.
//
// Buffers (read via `dump`):
// __PROBE_STREAM_EVENTS — raw events parsed off the wire
// __PROBE_ACTION_CALLS — replaceMessages / refreshMessages calls (best-effort)
// __PROBE_MSG_TIMELINE — 200ms snapshots of every messagesMap key
import type {
ProbeActionCall,
ProbeMessageSummary,
ProbeStreamEvent,
ProbeTimelineSample,
} from './types';
// Bundled by esbuild as an IIFE. Top-level code runs once on injection.
const w = window;
// ── Buffers ─────────────────────────────────────────────────────────
declare global {
interface Window {
__PROBE_MUTATIONS?: Array<{
t: number;
key: string;
n: number;
last?: { id: string; role: string; cLen: number; rLen: number; updatedAt?: unknown };
prevLast?: { id: string; role: string; cLen: number; rLen: number };
delta?: string;
}>;
__PROBE_STORE_UNSUB?: () => void;
}
}
const events: ProbeStreamEvent[] = (w.__PROBE_STREAM_EVENTS ??= []);
const calls: ProbeActionCall[] = (w.__PROBE_ACTION_CALLS ??= []);
const timeline: ProbeTimelineSample[] = (w.__PROBE_MSG_TIMELINE ??= []);
const mutations = (w.__PROBE_MUTATIONS ??= []);
events.length = 0;
calls.length = 0;
timeline.length = 0;
mutations.length = 0;
const t0 = Date.now();
w.__PROBE_T0 = t0;
const now = (): number => Date.now() - t0;
// ── Helpers ─────────────────────────────────────────────────────────
function summarizeData(data: unknown): Record<string, unknown> | unknown {
if (!data || typeof data !== 'object') return data;
const src = data as Record<string, unknown>;
const out: Record<string, unknown> = {};
for (const k of Object.keys(src)) {
const v = src[k];
if (v == null) {
out[k] = v;
} else if (Array.isArray(v)) {
out[k] = `Array(${v.length})`;
if (k === 'uiMessages') {
out.uiMessagesPreview = v.slice(0, 5).map((m: any) => ({
id: (m.id ?? '').slice(-8),
role: m.role,
cLen: (m.content ?? '').length,
children: (m.children ?? []).length,
tools: (m.tools ?? []).length,
reasoning: (m.reasoning?.content ?? '').length,
}));
out.uiMessagesTotal = v.length;
}
} else if (typeof v === 'object') {
const obj = v as Record<string, unknown>;
out[k] =
'Object{' +
Object.keys(obj)
.slice(0, 6)
.map((kk) => kk + (typeof obj[kk] === 'string' ? `=${(obj[kk] as string).length}ch` : ''))
.join(',') +
'}';
} else if (typeof v === 'string') {
out[k] = v.length > 100 ? v.slice(0, 100) + `…(${v.length})` : v;
} else {
out[k] = v;
}
}
return out;
}
function summarizeMessages(msgs: any[]): ProbeMessageSummary[] {
return (msgs ?? []).slice(0, 80).map((m) => ({
id: (m.id ?? '').slice(-8),
role: m.role,
cLen: (m.content ?? '').length,
rLen: (m.reasoning?.content ?? '').length,
tools: (m.tools ?? []).length,
chN: (m.children ?? []).length,
}));
}
function shortStack(): string {
const raw = new Error('probe-stack').stack ?? '';
return raw
.split('\n')
.slice(3)
.filter((l) => !l.includes('probe-events') && !l.includes('node_modules'))
.map((l) => l.trim().replace(/^at\s+/, ''))
.slice(0, 6)
.join(' ← ');
}
function recordAgentEvent(args: {
transport: 'ws' | 'sse';
opId: string | null;
agentEvent: any;
eventId?: string | null;
rawLen?: number;
}): void {
const { transport, opId, agentEvent, eventId, rawLen } = args;
if (!agentEvent || typeof agentEvent !== 'object') return;
events.push({
t: now(),
transport,
opIdTail: (opId ?? '').slice(-10),
eventId: eventId ?? null,
type: agentEvent.type,
stepIndex: agentEvent.stepIndex,
dataKeys: agentEvent.data ? Object.keys(agentEvent.data) : [],
data: summarizeData(agentEvent.data) as Record<string, unknown>,
rawLen,
});
}
// ── 1. Patch window.WebSocket for gateway WS events ────────────────
if (!w.__PROBE_ORIG_WEBSOCKET) w.__PROBE_ORIG_WEBSOCKET = w.WebSocket;
const OrigWS = w.__PROBE_ORIG_WEBSOCKET;
function extractOpIdFromWsUrl(url: string | URL): string | null {
const m = String(url ?? '').match(/operationId=([^&]+)/);
return m ? decodeURIComponent(m[1]) : null;
}
function isGatewayWs(url: string | URL): boolean {
return String(url ?? '').includes('operationId=');
}
function handleWsFrame(rawData: unknown, opId: string | null): void {
const rawLen = typeof rawData === 'string' ? rawData.length : -1;
let parsed: any;
try {
parsed = typeof rawData === 'string' ? JSON.parse(rawData) : null;
} catch {
events.push({
t: now(),
transport: 'ws',
opIdTail: (opId ?? '').slice(-10),
type: '_PARSE_ERROR_',
raw: typeof rawData === 'string' && rawData.length < 400 ? rawData : '(non-string or large)',
});
return;
}
if (!parsed) return;
if (parsed.type === 'agent_event') {
recordAgentEvent({
transport: 'ws',
opId,
agentEvent: parsed.event,
eventId: parsed.id,
rawLen,
});
} else {
events.push({
t: now(),
transport: 'ws',
opIdTail: (opId ?? '').slice(-10),
type: '_SERVER_MSG_',
serverType: parsed.type,
rawLen,
});
}
}
// Wrap the constructor. Instance `constructor` will still reflect OrigWS
// (we share prototypes), so use the `_WS_OPEN_` sentinel events to confirm
// the patch is firing.
function PatchedWebSocket(this: WebSocket, url: string | URL, protocols?: string | string[]) {
const ws: WebSocket = protocols == null ? new OrigWS(url) : new OrigWS(url, protocols);
const opId = extractOpIdFromWsUrl(url);
if (!isGatewayWs(url)) return ws;
events.push({
t: now(),
transport: 'ws',
opIdTail: (opId ?? '').slice(-10),
type: '_WS_OPEN_',
url: String(url),
});
// One observer listener that always fires, regardless of how the consumer
// (AgentStreamClient uses `ws.onmessage = …`) subscribes.
ws.addEventListener('message', (e) => {
try {
handleWsFrame((e as MessageEvent).data, opId);
} catch {
/* swallow */
}
});
ws.addEventListener('close', () => {
events.push({
t: now(),
transport: 'ws',
opIdTail: (opId ?? '').slice(-10),
type: '_WS_CLOSE_',
});
});
return ws;
}
// Preserve prototype + static fields so `instanceof WebSocket` and
// `WebSocket.OPEN` constants still work.
(PatchedWebSocket as unknown as { prototype: WebSocket }).prototype = OrigWS.prototype;
for (const k of Object.keys(OrigWS) as Array<keyof typeof OrigWS>) {
try {
(PatchedWebSocket as any)[k] = (OrigWS as any)[k];
} catch {
/* readonly */
}
}
(['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'] as const).forEach((k) => {
(PatchedWebSocket as any)[k] = (OrigWS as any)[k];
});
w.WebSocket = PatchedWebSocket as unknown as typeof WebSocket;
// ── 2. Patch window.fetch for `/api/agent/stream` (direct-mode SSE) ─
if (!w.__PROBE_ORIG_FETCH) w.__PROBE_ORIG_FETCH = w.fetch.bind(w);
const origFetch = w.__PROBE_ORIG_FETCH;
function isAgentStreamUrl(input: RequestInfo | URL): boolean {
let url = '';
if (typeof input === 'string') url = input;
else if (input instanceof URL) url = input.toString();
else if (input && typeof (input as Request).url === 'string') url = (input as Request).url;
return url.includes('/api/agent/stream');
}
function extractOpIdFromHttpUrl(input: RequestInfo | URL): string | null {
const url = typeof input === 'string' ? input : (input as Request | URL).toString();
const m = url.match(/operationId=([^&]+)/);
return m ? decodeURIComponent(m[1]) : null;
}
function pushFromSSEFrame(rawFrame: string, opId: string | null): void {
const lines = rawFrame.split('\n');
let dataJson = '';
let evtName = 'message';
for (const line of lines) {
if (line.startsWith('event:')) evtName = line.slice(6).trim();
else if (line.startsWith('data:')) dataJson += line.slice(5).trim();
}
if (!dataJson) return;
let parsed: any;
try {
parsed = JSON.parse(dataJson);
} catch {
events.push({
t: now(),
transport: 'sse',
opIdTail: (opId ?? '').slice(-10),
type: '_PARSE_ERROR_',
sseEvent: evtName,
raw: dataJson.length > 400 ? dataJson.slice(0, 400) + '…' : dataJson,
});
return;
}
recordAgentEvent({
transport: 'sse',
opId,
agentEvent: parsed,
eventId: null,
rawLen: dataJson.length,
});
}
async function teeAndDrain(response: Response, opId: string | null): Promise<Response> {
if (!response.body) return response;
const [a, b] = response.body.tee();
void (async () => {
const reader = b.getReader();
const decoder = new TextDecoder();
let buf = '';
try {
while (true) {
const { value, done } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
let idx: number;
while ((idx = buf.indexOf('\n\n')) !== -1) {
const frame = buf.slice(0, idx);
buf = buf.slice(idx + 2);
if (frame.trim()) pushFromSSEFrame(frame, opId);
}
}
if (buf.trim()) pushFromSSEFrame(buf, opId);
} catch (e: any) {
events.push({
t: now(),
transport: 'sse',
opIdTail: (opId ?? '').slice(-10),
type: '_TEE_ERROR_',
message: String(e?.message ?? e),
});
}
})();
return new Response(a, {
headers: response.headers,
status: response.status,
statusText: response.statusText,
});
}
w.fetch = async function patchedFetch(input: RequestInfo | URL, init?: RequestInit) {
const response = await origFetch(input as any, init);
if (!isAgentStreamUrl(input)) return response;
const opId = extractOpIdFromHttpUrl(input);
const url =
typeof input === 'string'
? input.split('?')[0]
: (input as Request | URL).toString().split('?')[0];
events.push({
t: now(),
transport: 'sse',
opIdTail: (opId ?? '').slice(-10),
type: '_CONNECTED_',
url,
status: response.status,
});
return teeAndDrain(response, opId);
} as typeof fetch;
// ── 3. Wrap store actions (best-effort for "who called replace") ────
// Side-global stash for the original chat-store actions. Re-installs ALWAYS
// rewrap from the originals so updates to the probe body take effect
// without a page reload — using only a `__probeWrapped` flag on the chat
// state object would freeze the first-installed wrapper across re-installs.
declare global {
interface Window {
__PROBE_ORIG_REFRESH_MESSAGES?: any;
__PROBE_ORIG_REPLACE_MESSAGES?: any;
}
}
try {
const chat = w.__LOBE_STORES?.chat?.();
if (chat) {
// First-time install: cache the originals. Re-install: restore from
// the cached originals before wrapping again.
if (!w.__PROBE_ORIG_REFRESH_MESSAGES) w.__PROBE_ORIG_REFRESH_MESSAGES = chat.refreshMessages;
if (!w.__PROBE_ORIG_REPLACE_MESSAGES) w.__PROBE_ORIG_REPLACE_MESSAGES = chat.replaceMessages;
const origRefresh = w.__PROBE_ORIG_REFRESH_MESSAGES;
const origReplace = w.__PROBE_ORIG_REPLACE_MESSAGES;
chat.refreshMessages = origRefresh;
chat.replaceMessages = origReplace;
chat.refreshMessages = async function probeRefresh(this: unknown, ...args: any[]) {
calls.push({
t: now(),
name: 'refreshMessages',
args: { context: args[0] ?? null },
stack: shortStack(),
});
return origRefresh.apply(this, args);
};
chat.replaceMessages = function probeReplace(this: unknown, ...args: any[]) {
const msgs = (args[0] as any[]) ?? [];
const snapshot = msgs.slice(-2).map((m) => ({
id: (m.id ?? '').slice(-8),
role: m.role,
cLen: (m.content ?? '').length,
rLen: (m.reasoning?.content ?? '').length,
updatedAt: m.updatedAt,
}));
calls.push({
t: now(),
name: 'replaceMessages',
args: { count: msgs.length, params: args[1] ?? null, snapshot } as any,
stack: shortStack(),
});
// Pair the call with a mutation row so the analyzer can build a
// single ordered timeline across replaceMessages + dispatchMessage.
const stackTop = shortStack().split(' ← ')[0]?.slice(0, 80);
const last = msgs.at(-1);
const lastSum = last
? {
id: (last.id ?? '').slice(-8),
role: last.role,
cLen: (last.content ?? '').length,
rLen: (last.reasoning?.content ?? '').length,
updatedAt: last.updatedAt,
}
: undefined;
const params: any = args[1] ?? {};
const ctxKey = params.context
? `main_${params.context.agentId ?? '?'}_${
params.context.topicId ? 'tpc_' + params.context.topicId : 'new'
}`.replace('main_tpc_', 'main_') // crude key inference
: '(no-ctx)';
mutations.push({
t: now(),
key: ctxKey,
n: msgs.length,
last: lastSum,
delta: `replaceMessages(action=${params.action ?? '-'}) src=${stackTop ?? '-'}`,
});
return origReplace.apply(this, args);
};
}
} catch (e: any) {
calls.push({ t: now(), name: '_WRAP_ERROR_', error: String(e?.message ?? e) });
}
// ── 3.5. Mutation log — wrap the TWO ChatStore writers (replaceMessages,
// internal_dispatchMessage) to record EVERY dbMessagesMap[key] reference
// change with a one-line "before/after last assistant message" delta. This
// reveals dispatchMessage-driven collapses that the replaceMessages wrap
// alone cannot see.
declare global {
interface Window {
__PROBE_ORIG_DISPATCH_MESSAGE?: any;
}
}
try {
const chat = w.__LOBE_STORES?.chat?.();
if (chat?.internal_dispatchMessage) {
if (!w.__PROBE_ORIG_DISPATCH_MESSAGE)
w.__PROBE_ORIG_DISPATCH_MESSAGE = chat.internal_dispatchMessage;
const origDispatch = w.__PROBE_ORIG_DISPATCH_MESSAGE;
chat.internal_dispatchMessage = origDispatch;
chat.internal_dispatchMessage = function probeDispatch(this: unknown, payload: any, ctx?: any) {
// Snapshot BEFORE — read the would-be target key + last message.
const before = (() => {
try {
const state = w.__LOBE_STORES?.chat?.();
if (!state) return null;
// Replicate state.internal_getConversationContext logic enough to
// resolve a key — but most callers pass operationId on ctx, and
// operationId-keyed lookup needs store internals. Easiest: snapshot
// ALL keys' last-assistant cLen and compare BEFORE vs AFTER below.
const map = state.dbMessagesMap ?? {};
const out: Record<string, any> = {};
for (const k of Object.keys(map)) {
const last = (map[k] ?? []).at(-1);
out[k] = last
? {
id: (last.id ?? '').slice(-8),
cLen: (last.content ?? '').length,
rLen: (last.reasoning?.content ?? '').length,
n: map[k].length,
}
: { n: 0 };
}
return out;
} catch {
return null;
}
})();
const result = origDispatch.apply(this, [payload, ctx]);
// Snapshot AFTER — find which key(s) actually changed.
try {
const state = w.__LOBE_STORES?.chat?.();
if (state && before) {
const map = state.dbMessagesMap ?? {};
for (const k of Object.keys(map)) {
const last = (map[k] ?? []).at(-1);
const beforeSnap = before[k];
const afterSnap = last
? {
id: (last.id ?? '').slice(-8),
cLen: (last.content ?? '').length,
rLen: (last.reasoning?.content ?? '').length,
n: map[k].length,
}
: { n: 0 };
const changed =
!beforeSnap ||
beforeSnap.n !== afterSnap.n ||
beforeSnap.id !== (afterSnap as any).id ||
beforeSnap.cLen !== (afterSnap as any).cLen ||
beforeSnap.rLen !== (afterSnap as any).rLen;
if (!changed) continue;
let delta = '';
if (beforeSnap?.id !== undefined && beforeSnap.id !== (afterSnap as any).id)
delta += `id:${beforeSnap.id}${(afterSnap as any).id};`;
if (
beforeSnap?.cLen !== undefined &&
(afterSnap as any).cLen !== undefined &&
(afterSnap as any).cLen < beforeSnap.cLen
)
delta += `cLen↓${beforeSnap.cLen}${(afterSnap as any).cLen};`;
if (
beforeSnap?.rLen !== undefined &&
(afterSnap as any).rLen !== undefined &&
(afterSnap as any).rLen < beforeSnap.rLen
)
delta += `rLen↓${beforeSnap.rLen}${(afterSnap as any).rLen};`;
if (beforeSnap?.n !== undefined && afterSnap.n < beforeSnap.n)
delta += `n↓${beforeSnap.n}${afterSnap.n};`;
mutations.push({
t: now(),
key: k,
n: afterSnap.n,
last: (afterSnap as any).id ? (afterSnap as any) : undefined,
prevLast: beforeSnap?.id ? beforeSnap : undefined,
delta: delta || `dispatch:${payload?.type}`,
});
}
}
} catch (e: any) {
mutations.push({
t: now(),
key: '_DISPATCH_PROBE_ERROR_',
n: -1,
delta: String(e?.message ?? e),
});
}
return result;
};
}
} catch (e: any) {
calls.push({ t: now(), name: '_DISPATCH_WRAP_ERROR_', error: String(e?.message ?? e) });
}
// ── 4. Periodic per-key timeline snapshots ─────────────────────────
function captureTimeline(): void {
try {
const c = w.__LOBE_STORES?.chat?.();
if (!c) return;
const msgsMap = (c.messagesMap ?? {}) as Record<string, any[]>;
const dbMap = (c.dbMessagesMap ?? {}) as Record<string, any[]>;
const byKey: ProbeTimelineSample['byKey'] = {};
for (const k of Object.keys(msgsMap)) {
const display = msgsMap[k] ?? [];
const db = dbMap[k] ?? [];
if (display.length === 0 && db.length === 0) continue;
byKey[k] = {
n: display.length,
dbN: db.length,
msgs: summarizeMessages(display),
};
}
const ops = Object.values((c.operations ?? {}) as Record<string, any>);
timeline.push({
t: now(),
activeTopic: ((c.activeTopicId as string | null) ?? '').slice(-10) || null,
keys: Object.keys(byKey),
byKey,
runOps: ops.filter((o: any) => o.status === 'running').length,
});
} catch (e: any) {
timeline.push({
t: now(),
activeTopic: null,
keys: [],
byKey: {},
runOps: 0,
err: e?.message ?? String(e),
});
}
}
captureTimeline();
if (w.__PROBE_TIMELINE_TIMER) clearInterval(w.__PROBE_TIMELINE_TIMER);
w.__PROBE_TIMELINE_TIMER = setInterval(captureTimeline, 200);
// ── 5. Tab-switch helpers ──────────────────────────────────────────
function listTopBarTabs(): HTMLElement[] {
return Array.from(
document.querySelectorAll<HTMLElement>(
'[data-insp-path*="TabItem.tsx"][data-contextmenu-trigger]',
),
).filter((t) => t.getBoundingClientRect().top < 30);
}
w.__listTabs = () =>
listTopBarTabs().map((t, i) => ({
i,
key: t.getAttribute('data-contextmenu-trigger'),
active: t.getAttribute('data-active') === 'true',
title: (t.innerText ?? '').slice(0, 60),
}));
w.__clickTabByKey = (key: string) => {
const tab = listTopBarTabs().find((t) => t.getAttribute('data-contextmenu-trigger') === key);
if (!tab) return 'not found: ' + key;
if (tab.getAttribute('data-active') === 'true') return 'already active: ' + key;
tab.click();
return 'clicked key=' + key;
};
w.__PROBE_EVENT = (name: string) => {
calls.push({ t: now(), name: 'MARK:' + name });
};
// `run.ts` wraps the bundle in an IIFE and appends a `return <confirmation>`
// after the bundle body — agent-browser then prints the confirmation back to
// the operator. Nothing to do here at the end of the module body.
@@ -0,0 +1,204 @@
// LobeHub chat streaming time-series probe.
//
// Inject into the renderer (via agent-browser eval) to record store + DOM
// snapshots every 200ms during a streaming session. Designed to surface
// "UI rolled back to an earlier state" symptoms — especially around
// gateway-mode tab switches that happen while the assistant is still writing.
//
// Usage:
// agent-browser --cdp 9222 eval --stdin < probe.js
// # ...do test interactions, call window.__PROBE_EVENT('LABEL') to mark moments...
// agent-browser --cdp 9222 eval --stdin < probe-dump.js > /tmp/probe.json
// node analyze.mjs /tmp/probe.json
//
// What it captures per sample:
// - activeTopicId
// - msgN: top-level messages in chat.messagesMap for this topic
// - childN: total assistantGroup.children blocks across all msgs (THIS is
// where streaming content actually lives — top-level assistantGroup stays empty)
// - cT / rT / toolT: totals across messages AND their children
// (content, reasoning, tool-call count)
// - perMsg: per-message breakdown so regressions can be located precisely
// - runOps: number of running operations (execServerAgentRuntime etc.)
// - domLen: total innerText length of the rendered chat list area
// - ind: visible UI indicators (Search pages, Crawled pages, Deeply Thought, Sending)
//
// Event markers: window.__PROBE_EVENT('NAME') records {t, name} into
// __PROBE_EVENTS, used by the analyzer to align state changes with
// user-driven actions (SENT, AWAY_1, BACK_1, ...).
(function () {
if (window.__PROBE_TIMER) clearInterval(window.__PROBE_TIMER);
window.__PROBE_SAMPLES = [];
window.__PROBE_EVENTS = [];
const t0 = Date.now();
function snapshot() {
try {
const chat = window.__LOBE_STORES.chat();
const topicId = chat.activeTopicId;
const idTail = topicId ? topicId.replace('tpc_', '') : null;
const keys = Object.keys(chat.messagesMap || {});
// Collect messages for the active topic. Before a topic is committed,
// optimistic messages live under the `<agentScope>_new` key — fall
// back to those when no topic is active yet.
let msgs = [];
if (idTail) {
keys.forEach((k) => {
if (k.includes(idTail)) msgs = msgs.concat(chat.messagesMap[k] || []);
});
} else {
keys
.filter((k) => k.endsWith('_new'))
.forEach((k) => {
msgs = msgs.concat(chat.messagesMap[k] || []);
});
}
// Walk top-level + assistantGroup.children. children carry the actual
// streamed content / reasoning / tool calls; the parent assistantGroup
// remains a placeholder (cLen=0, rLen=0) for its whole lifetime.
let totalContent = 0;
let totalReason = 0;
let totalTools = 0;
let childCount = 0;
const perMsg = msgs.map((m) => {
const cLen = (m.content || '').length;
const rLen = ((m.reasoning && m.reasoning.content) || '').length;
const tools = (m.tools || []).length;
totalContent += cLen;
totalReason += rLen;
totalTools += tools;
const children = m.children || [];
let chC = 0;
let chR = 0;
let chT = 0;
children.forEach((c) => {
chC += (c.content || '').length;
chR += ((c.reasoning && c.reasoning.content) || '').length;
chT += (c.tools || []).length;
});
totalContent += chC;
totalReason += chR;
totalTools += chT;
childCount += children.length;
return {
id: (m.id || '').slice(-8),
role: m.role,
cLen,
rLen,
tools,
chCount: children.length,
chC,
chR,
chT,
};
});
const ops = Object.values(chat.operations || {});
const runningOps = ops.filter((o) => o.status === 'running');
// DOM probe: total rendered text in the chat scroll area (proxy for
// "how much is actually visible to the user").
const convScroll =
document.querySelector(
'[data-chat-list], [class*="ChatList"], [class*="ConversationList"]',
) ||
document.querySelector('main [class*="scroll"]') ||
document.querySelector('main');
const domTxt = convScroll ? convScroll.innerText || '' : '';
const bodyTxt = document.body.innerText || '';
const searchMatches = (bodyTxt.match(/Search pages?:|Searched the web/g) || []).length;
const crawlMatches = (bodyTxt.match(/Crawl(ed|ing) pages?/g) || []).length;
window.__PROBE_SAMPLES.push({
t: Date.now() - t0,
topicId,
msgN: msgs.length,
childN: childCount,
cT: totalContent,
rT: totalReason,
toolT: totalTools,
perMsg,
runOps: runningOps.length,
runOpTypes: runningOps.map((o) => o.type),
domLen: domTxt.length,
ind: {
search: searchMatches,
crawl: crawlMatches,
sending: bodyTxt.includes('Sending message'),
deeplyThinking: bodyTxt.includes('Deeply Thinking'),
deeplyThought: bodyTxt.includes('Deeply Thought'),
},
});
} catch (e) {
window.__PROBE_SAMPLES.push({ t: Date.now() - t0, err: e.message });
}
}
snapshot();
window.__PROBE_TIMER = setInterval(snapshot, 200);
window.__PROBE_EVENT = function (name) {
window.__PROBE_EVENTS.push({ t: Date.now() - t0, name });
};
// Tab-switch helpers installed alongside the probe.
//
// The Electron tab bar mounts each tab as a div with data-insp-path
// ending in `TabItem.tsx:...`. The active tab is marked with
// data-active="true". DO NOT search by innerText — the active tab's text
// includes a ` · <agent name>` suffix that produces false matches when
// your search string happens to overlap with the agent name.
function listTabs() {
return Array.from(
document.querySelectorAll('[data-insp-path*="TabItem.tsx"][data-contextmenu-trigger]'),
).filter((t) => t.getBoundingClientRect().top < 30);
}
function tabKey(el) {
// Stable for the tab's lifetime; survives focus changes.
return el.getAttribute('data-contextmenu-trigger');
}
function findActiveTab() {
return listTabs().find((t) => t.getAttribute('data-active') === 'true') || null;
}
// Click by stable key captured earlier (preferred for round-trips).
window.__clickTabByKey = function (key) {
const tab = listTabs().find((t) => tabKey(t) === key);
if (!tab) return 'not found: key=' + key;
if (tab.getAttribute('data-active') === 'true') return 'already active: ' + key;
tab.click();
return 'clicked key=' + key;
};
// Click by index in the tab strip (0-based, left-to-right).
window.__clickTabByIndex = function (i) {
const tabs = listTabs();
if (i < 0 || i >= tabs.length) return 'index out of range: ' + i + '/' + tabs.length;
const t = tabs[i];
if (t.getAttribute('data-active') === 'true') return 'already active: i=' + i;
t.click();
return 'clicked i=' + i + ' key=' + tabKey(t);
};
// Snapshot all tabs in order: [{key, active, title (first 60 chars of innerText)}]
window.__listTabs = function () {
return listTabs().map((t, i) => ({
i,
key: tabKey(t),
active: t.getAttribute('data-active') === 'true',
title: (t.innerText || '').slice(0, 60),
}));
};
window.__activeTabKey = function () {
const a = findActiveTab();
return a ? tabKey(a) : null;
};
return 'probe installed';
})();
@@ -0,0 +1,211 @@
// CLI for the agent-gateway probe.
//
// Bundles the TS probes with esbuild, pipes them into `agent-browser eval`,
// and persists dumps under `.agent-gateway/` (gitignored) for later use as
// streaming-replay test fixtures.
//
// Commands:
// bun run .agents/skills/local-testing/scripts/agent-gateway/run.ts install
// Bundle probe-events.ts and inject into the CDP-attached browser.
// Re-installing clears all buffers and re-patches WebSocket / fetch.
//
// bun run .agents/skills/local-testing/scripts/agent-gateway/run.ts dump [name]
// Stop the timeline timer, fetch the capture as JSON, write it to
// `.agent-gateway/<name>-<YYYYMMDD-HHmmss>.json`. `name` defaults to
// `dump`. Prints the absolute path written.
//
// bun run .agents/skills/local-testing/scripts/agent-gateway/run.ts analyze [path]
// Run analyze-events.ts on the dump. `path` defaults to the most
// recently modified file in `.agent-gateway/`.
//
// Optional flags:
// --cdp <port> CDP port (default 9222)
// --browser <bin> agent-browser binary (default 'agent-browser')
import { spawn } from 'node:child_process';
import { mkdirSync, readdirSync, statSync, writeFileSync } from 'node:fs';
import path from 'node:path';
import { fileURLToPath } from 'node:url';
const SCRIPT_DIR = path.dirname(fileURLToPath(import.meta.url));
// .agents/skills/local-testing/scripts/agent-gateway/ → 5 levels up
const PROJECT_ROOT = path.resolve(SCRIPT_DIR, '../../../../..');
const DUMP_DIR = path.join(PROJECT_ROOT, '.agent-gateway');
interface Flags {
browser: string;
cdp: string;
positional: string[];
}
function parseFlags(argv: string[]): Flags {
const out: Flags = { cdp: '9222', browser: 'agent-browser', positional: [] };
for (let i = 0; i < argv.length; i++) {
const a = argv[i];
if (a === '--cdp') out.cdp = argv[++i] ?? out.cdp;
else if (a === '--browser') out.browser = argv[++i] ?? out.browser;
else out.positional.push(a);
}
return out;
}
async function bundle(entry: string): Promise<string> {
// Bun.build is built into the Bun runtime — no external dep needed.
const r = await Bun.build({
entrypoints: [path.join(SCRIPT_DIR, entry)],
target: 'browser',
format: 'esm',
minify: false,
});
if (!r.success) {
const msgs = r.logs.map((l) => `${l.level}: ${l.message}`).join('\n');
throw new Error(`bundle failed for ${entry}:\n${msgs}`);
}
return await r.outputs[0].text();
}
function wrapIife(body: string, returnExpr: string): string {
// Wrap as an IIFE that swallows the bundled top-level (top-level `const`
// declarations get scoped to the IIFE, so re-injection doesn't conflict)
// and returns the configured expression — which `agent-browser eval`
// captures and prints to stdout.
return `(() => {\n${body}\n;return ${returnExpr};\n})()`;
}
function runAgentBrowserEval(flags: Flags, script: string): Promise<string> {
return new Promise((resolveP, rejectP) => {
const child = spawn(flags.browser, ['--cdp', flags.cdp, 'eval', '--stdin'], {
stdio: ['pipe', 'pipe', 'inherit'],
});
let stdout = '';
child.stdout.on('data', (chunk: Buffer) => {
stdout += chunk.toString('utf8');
});
child.on('error', rejectP);
child.on('close', (code) => {
if (code === 0) resolveP(stdout);
else rejectP(new Error(`agent-browser exited ${code}`));
});
child.stdin.write(script);
child.stdin.end();
});
}
// agent-browser prints eval results as JSON (string values are quoted).
function unquoteAgentBrowserResult(raw: string): string {
const trimmed = raw.trim();
if (trimmed.startsWith('"') && trimmed.endsWith('"')) {
try {
return JSON.parse(trimmed) as string;
} catch {
/* fall through */
}
}
return trimmed;
}
function isoStamp(): string {
const d = new Date();
const yyyy = d.getFullYear();
const mm = String(d.getMonth() + 1).padStart(2, '0');
const dd = String(d.getDate()).padStart(2, '0');
const hh = String(d.getHours()).padStart(2, '0');
const mi = String(d.getMinutes()).padStart(2, '0');
const ss = String(d.getSeconds()).padStart(2, '0');
return `${yyyy}${mm}${dd}-${hh}${mi}${ss}`;
}
function ensureDumpDir(): void {
mkdirSync(DUMP_DIR, { recursive: true });
}
function latestDump(): string | null {
ensureDumpDir();
const entries = readdirSync(DUMP_DIR)
.filter((f) => f.endsWith('.json'))
.map((f) => ({ f, mtime: statSync(path.join(DUMP_DIR, f)).mtimeMs }))
.sort((a, b) => b.mtime - a.mtime);
return entries[0] ? path.join(DUMP_DIR, entries[0].f) : null;
}
// ── Commands ────────────────────────────────────────────────────────
async function cmdInstall(flags: Flags): Promise<void> {
const body = await bundle('probe-events.ts');
const installMsg = JSON.stringify(
'events probe installed: WebSocket+fetch interception. ' +
'WS captures operationId= sockets (gateway), fetch captures /api/agent/stream (direct).',
);
const script = wrapIife(body, installMsg);
const out = await runAgentBrowserEval(flags, script);
console.log(unquoteAgentBrowserResult(out));
}
async function cmdDump(flags: Flags): Promise<void> {
const name = flags.positional[1] ?? 'dump';
const body = await bundle('probe-dump.ts');
const script = wrapIife(body, 'window.__PROBE_LAST_DUMP_JSON');
const raw = await runAgentBrowserEval(flags, script);
const json = unquoteAgentBrowserResult(raw);
ensureDumpDir();
const filename = `${name}-${isoStamp()}.json`;
const dumpPath = path.join(DUMP_DIR, filename);
writeFileSync(dumpPath, json, 'utf8');
// Validate by parsing the meta header so we error early on bad capture
try {
const parsed = JSON.parse(json) as {
meta?: { eventCount?: number; callCount?: number; sampleCount?: number };
};
const meta = parsed.meta ?? {};
console.log(
`wrote ${dumpPath} (${json.length} bytes events=${meta.eventCount ?? '?'} ` +
`calls=${meta.callCount ?? '?'} samples=${meta.sampleCount ?? '?'})`,
);
} catch {
console.log(`wrote ${dumpPath} (${json.length} bytes — JSON.parse failed; see file)`);
}
}
async function cmdAnalyze(flags: Flags): Promise<void> {
const target = flags.positional[1] ?? latestDump();
if (!target) {
console.error('no dump file found. run `dump` first or pass a path.');
process.exit(1);
}
const child = spawn('bun', ['run', path.join(SCRIPT_DIR, 'analyze-events.ts'), target], {
stdio: 'inherit',
});
await new Promise<void>((resolveP, rejectP) => {
child.on('error', rejectP);
child.on('close', (code) => (code === 0 ? resolveP() : rejectP(new Error(`exit ${code}`))));
});
}
// ── Entry point ─────────────────────────────────────────────────────
const flags = parseFlags(process.argv.slice(2));
const cmd = flags.positional[0];
const usage = `usage:
bun run run.ts install [--cdp 9222]
bun run run.ts dump [name] [--cdp 9222]
bun run run.ts analyze [path]
`;
if (!cmd) {
console.error(usage);
process.exit(1);
}
try {
if (cmd === 'install') await cmdInstall(flags);
else if (cmd === 'dump') await cmdDump(flags);
else if (cmd === 'analyze') await cmdAnalyze(flags);
else {
console.error(`unknown command: ${cmd}\n\n${usage}`);
process.exit(1);
}
} catch (e: any) {
console.error(e?.stack ?? e);
process.exit(1);
}
@@ -0,0 +1,72 @@
// Run N round-trip tab switches with event markers timed against the probe.
//
// agent-browser --cdp 9222 eval --stdin < tab-switch.js
//
// Captures the currently-active tab as the BACK target and the rightmost
// inactive tab as the AWAY target. Both are addressed by their stable
// data-contextmenu-trigger key (NOT by visible title — the active tab's
// innerText embeds a ` · <agent name>` suffix that breaks text matching).
//
// Fires the loop in the background and returns immediately so the
// agent-browser eval doesn't have to await the full ROUND_TRIPS × DWELL_MS
// duration. Wait on the `SWITCH_LOOP_DONE` event before dumping.
//
// Refuses to launch if a previous loop is still in flight.
//
// Requires probe.js to have been installed first (provides
// window.__PROBE_EVENT / __listTabs / __clickTabByKey / __activeTabKey).
(function () {
const ROUND_TRIPS = 4;
const DWELL_MS = 10_000;
if (!window.__PROBE_EVENT || !window.__listTabs || !window.__clickTabByKey) {
return 'probe not installed — eval probe.js first';
}
if (window.__SWITCH_LOOP_RUNNING) {
return 'switch loop already running — wait for SWITCH_LOOP_DONE first';
}
const tabs = window.__listTabs();
const activeTab = tabs.find((t) => t.active);
if (!activeTab) return 'no active tab — abort';
// Pick the first inactive tab as AWAY target. With multiple inactive tabs
// you'll usually want the one that's stable across the test — feel free
// to swap to tabs[tabs.length-1] if you want the rightmost.
const inactives = tabs.filter((t) => !t.active);
if (inactives.length === 0) return 'no inactive tab to switch to — abort';
const awayTab = inactives.at(-1); // rightmost inactive
const BACK_KEY = activeTab.key;
const AWAY_KEY = awayTab.key;
window.__SWITCH_LOOP_RUNNING = true;
window.__PROBE_EVENT('SWITCH_LOOP_CONFIG:back=' + BACK_KEY + ',away=' + AWAY_KEY);
(async function () {
function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}
try {
window.__PROBE_EVENT('SWITCH_LOOP_START');
for (let i = 1; i <= ROUND_TRIPS; i++) {
window.__PROBE_EVENT('AWAY_' + i);
const awayResult = window.__clickTabByKey(AWAY_KEY);
window.__PROBE_EVENT('AWAY_' + i + '_RES:' + awayResult.slice(0, 50));
await sleep(DWELL_MS);
window.__PROBE_EVENT('BACK_' + i);
const backResult = window.__clickTabByKey(BACK_KEY);
window.__PROBE_EVENT('BACK_' + i + '_RES:' + backResult.slice(0, 50));
await sleep(DWELL_MS);
}
window.__PROBE_EVENT('SWITCH_LOOP_DONE');
} finally {
window.__SWITCH_LOOP_RUNNING = false;
}
})();
return 'switch loop kicked off (BACK=' + BACK_KEY + ', AWAY=' + AWAY_KEY + ')';
})();
@@ -0,0 +1,113 @@
// Shared types between the in-browser probe and the Node-side analyzer.
// Kept tiny on purpose — anything the analyzer can re-derive is left off.
export interface ProbeStreamEvent {
/** Summarized payload — long strings truncated, arrays printed as Array(N) */
data?: Record<string, unknown>;
/** Keys present on the event's `data` payload — useful at a glance */
dataKeys?: string[];
/** ServerMessage.id — gateway WS frames carry an event-id we may resume from */
eventId?: string | null;
message?: string;
/** Last 10 chars of the operationId (full id is excessively long) */
opIdTail: string;
raw?: string;
/** Raw frame byte length, when applicable */
rawLen?: number;
/** For non-agent_event server frames (auth_success, heartbeat_ack, …) */
serverType?: string;
sseEvent?: string;
status?: number;
stepIndex?: number;
/** Milliseconds since the probe's t0 (install time). */
t: number;
/** 'ws' for gateway WebSocket frames, 'sse' for direct /api/agent/stream */
transport: 'ws' | 'sse';
/** Either the AgentStreamEvent.type, or a probe sentinel like `_WS_OPEN_` */
type: string;
url?: string;
}
export interface ProbeActionCall {
args?: {
count?: number;
context?: unknown;
params?: unknown;
};
error?: string;
/** `replaceMessages` / `refreshMessages` / `MARK:<label>` / `_WRAP_ERROR_` */
name: string;
stack?: string;
t: number;
}
export interface ProbeMessageSummary {
/** children.length */
chN: number;
/** content.length */
cLen: number;
/** Last 8 chars of the message id */
id: string;
/** reasoning.content.length */
rLen: number;
role: string;
/** tools.length */
tools: number;
}
export interface ProbeTimelineSample {
/** Last 10 chars of activeTopicId, or null */
activeTopic: string | null;
/** Per-key breakdown: display count, db count, message summaries */
byKey: Record<
string,
{
n: number;
dbN: number;
msgs: ProbeMessageSummary[];
}
>;
err?: string;
/** All messagesMap keys that have content at this moment */
keys: string[];
/** Number of operations in 'running' status */
runOps: number;
t: number;
}
export interface ProbeDumpMeta {
callCount: number;
/** Date.now() at dump call */
collectedAt: number;
eventCount: number;
sampleCount: number;
/** Date.now() at probe install */
t0: number;
}
export interface ProbeDump {
actionCalls: ProbeActionCall[];
meta: ProbeDumpMeta;
streamEvents: ProbeStreamEvent[];
timeline: ProbeTimelineSample[];
}
/**
* Globals the probe attaches to `window`. Keeps `as any` casts at the boundary
* instead of sprinkling them through the probe body.
*/
declare global {
interface Window {
__clickTabByKey?: (key: string) => string;
__listTabs?: () => Array<{ i: number; key: string | null; active: boolean; title: string }>;
__LOBE_STORES?: Record<string, () => any>;
__PROBE_ACTION_CALLS?: ProbeActionCall[];
__PROBE_EVENT?: (label: string) => void;
__PROBE_MSG_TIMELINE?: ProbeTimelineSample[];
__PROBE_ORIG_FETCH?: typeof fetch;
__PROBE_ORIG_WEBSOCKET?: typeof WebSocket;
__PROBE_STREAM_EVENTS?: ProbeStreamEvent[];
__PROBE_T0?: number;
__PROBE_TIMELINE_TIMER?: ReturnType<typeof setInterval> | null;
}
}
+3
View File
@@ -28,6 +28,9 @@ prd
# Recordings
.records/
# Agent-gateway probe captures (local debugging dumps)
.agent-gateway/
# Temporary files
.temp/
temp/
@@ -172,36 +172,25 @@ export default class GatewayConnectionCtr extends ControllerModule {
request: AgentRunRequestMessage,
): Promise<{ reason?: string; status: 'accepted' | 'rejected' }> {
try {
const ctr = this.heterogeneousAgentCtr;
const serverUrl = await this.remoteServerConfigCtr.getRemoteServerUrl();
if (!serverUrl) {
return { reason: 'Remote server URL not configured', status: 'rejected' };
}
// Map agentType to binary name.
// claude-code → `claude` CLI; all other platforms use their type name as the binary.
const command = request.agentType === 'claude-code' ? 'claude' : request.agentType;
// Create a session for the hetero agent.
const { sessionId } = await ctr.startSession({
// Fire-and-forget: lh hetero exec handles spawn -> adapt ->
// BatchIngester -> heteroIngest/heteroFinish -> server -> Gateway -> clients.
// Same command as spawnHeteroSandbox() on the server side.
this.heterogeneousAgentCtr.spawnLhHeteroExec({
agentType: request.agentType,
args: [],
command,
cwd: request.cwd,
// Inject LOBEHUB_JWT so the CLI authenticates against heteroIngest.
env: { LOBEHUB_JWT: request.jwt },
jwt: request.jwt,
operationId: request.operationId,
prompt: request.prompt,
resumeSessionId: request.resumeSessionId,
serverUrl,
topicId: request.topicId,
});
// Fire-and-forget: sendPrompt runs the CLI until completion.
ctr
.sendPrompt({
operationId: request.operationId,
prompt: request.prompt,
sessionId,
})
.catch((err: Error) => {
// Errors are surfaced via heteroFinish on the server side.
// Log locally for desktop debugging only.
console.error('[GatewayConnectionCtr] agent run failed:', err.message);
});
return { status: 'accepted' };
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
@@ -1251,4 +1251,69 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
process.on('SIGTERM', onSignal);
process.on('SIGINT', onSignal);
}
/**
* Spawn `lh hetero exec` for gateway-driven agent runs.
* The `lh` CLI handles everything downstream — no local
* AgentStreamPipeline or IPC broadcast needed. Mirrors
* `spawnHeteroSandbox()` on the server side.
*/
spawnLhHeteroExec(params: {
agentType: string;
cwd?: string;
jwt: string;
operationId: string;
prompt: string;
resumeSessionId?: string;
serverUrl: string;
topicId: string;
}): void {
const { agentType, cwd, jwt, operationId, prompt, resumeSessionId, serverUrl, topicId } =
params;
const workDir = cwd ?? process.cwd();
const args = [
'hetero',
'exec',
'--type',
agentType,
'--operation-id',
operationId,
'--topic',
topicId,
'--render',
'none',
'--input-json',
'-',
'--cwd',
workDir,
...(resumeSessionId ? ['--resume', resumeSessionId] : []),
];
const env = {
...process.env,
...buildProxyEnv(this.app.storeManager.get('networkProxy')),
LOBEHUB_JWT: jwt,
LOBEHUB_SERVER: serverUrl,
};
logger.info('spawnLhHeteroExec: type=%s op=%s topic=%s', agentType, operationId, topicId);
const child = spawn('lh', args, {
cwd: workDir,
env,
stdio: ['pipe', 'inherit', 'inherit'],
});
child.stdin.write(JSON.stringify(prompt));
child.stdin.end();
child.on('error', (err) => {
logger.error('spawnLhHeteroExec: spawn failed — %s', err.message);
});
child.on('exit', (code, signal) => {
logger.info('spawnLhHeteroExec: exited — op=%s code=%s signal=%s', operationId, code, signal);
});
}
}
@@ -200,11 +200,13 @@ const mockShellCommandCtr = {
const mockHeterogeneousAgentCtr = {
sendPrompt: vi.fn().mockResolvedValue(undefined),
spawnLhHeteroExec: vi.fn(),
startSession: vi.fn().mockResolvedValue({ sessionId: 'mock-session-id' }),
} as unknown as HeterogeneousAgentCtr;
const mockRemoteServerConfigCtr = {
getAccessToken: vi.fn().mockResolvedValue('mock-access-token'),
getRemoteServerUrl: vi.fn().mockResolvedValue('https://server.example.com'),
isRemoteServerConfigured: vi.fn().mockResolvedValue(true),
refreshAccessToken: vi.fn().mockResolvedValue({ success: true }),
} as unknown as RemoteServerConfigCtr;
@@ -631,26 +633,23 @@ describe('GatewayConnectionCtr', () => {
}
beforeEach(() => {
vi.mocked(mockHeterogeneousAgentCtr.startSession).mockClear();
vi.mocked(mockHeterogeneousAgentCtr.sendPrompt).mockClear();
vi.mocked(mockHeterogeneousAgentCtr.spawnLhHeteroExec).mockClear();
});
it.each([
['openclaw', 'openclaw'],
['hermes', 'hermes'],
['codex', 'codex'],
['claude-code', 'claude'],
] as const)('uses command "%s" for agentType "%s"', async (agentType, expectedCommand) => {
const client = await connectAndOpen();
client.simulateAgentRunRequest(agentType);
await vi.advanceTimersByTimeAsync(0);
it.each(['openclaw', 'hermes', 'codex', 'claude-code'] as const)(
'forwards agentType "%s" to spawnLhHeteroExec',
async (agentType) => {
const client = await connectAndOpen();
client.simulateAgentRunRequest(agentType);
await vi.advanceTimersByTimeAsync(0);
expect(mockHeterogeneousAgentCtr.startSession).toHaveBeenCalledWith(
expect.objectContaining({ agentType, command: expectedCommand }),
);
});
expect(mockHeterogeneousAgentCtr.spawnLhHeteroExec).toHaveBeenCalledWith(
expect.objectContaining({ agentType }),
);
},
);
it('sends accepted ack and fires sendPrompt', async () => {
it('sends accepted ack and spawns lh hetero exec', async () => {
const client = await connectAndOpen();
client.simulateAgentRunRequest('openclaw', 'op-xyz');
await vi.advanceTimersByTimeAsync(0);
@@ -659,15 +658,37 @@ describe('GatewayConnectionCtr', () => {
operationId: 'op-xyz',
status: 'accepted',
});
expect(mockHeterogeneousAgentCtr.sendPrompt).toHaveBeenCalledWith(
expect.objectContaining({ operationId: 'op-xyz', sessionId: 'mock-session-id' }),
expect(mockHeterogeneousAgentCtr.spawnLhHeteroExec).toHaveBeenCalledWith(
expect.objectContaining({
agentType: 'openclaw',
jwt: 'mock-jwt',
operationId: 'op-xyz',
prompt: 'hello',
serverUrl: 'https://server.example.com',
topicId: 'topic-1',
}),
);
});
it('sends rejected ack when startSession throws', async () => {
vi.mocked(mockHeterogeneousAgentCtr.startSession).mockRejectedValueOnce(
new Error('binary not found'),
);
it('sends rejected ack when remote server URL is not configured', async () => {
vi.mocked(mockRemoteServerConfigCtr.getRemoteServerUrl).mockResolvedValueOnce('');
const client = await connectAndOpen();
client.simulateAgentRunRequest('openclaw', 'op-fail');
await vi.advanceTimersByTimeAsync(0);
expect(client.sendAgentRunAck).toHaveBeenCalledWith({
operationId: 'op-fail',
reason: 'Remote server URL not configured',
status: 'rejected',
});
expect(mockHeterogeneousAgentCtr.spawnLhHeteroExec).not.toHaveBeenCalled();
});
it('sends rejected ack when spawnLhHeteroExec throws', async () => {
vi.mocked(mockHeterogeneousAgentCtr.spawnLhHeteroExec).mockImplementationOnce(() => {
throw new Error('binary not found');
});
const client = await connectAndOpen();
client.simulateAgentRunRequest('openclaw', 'op-fail');
+3
View File
@@ -34,6 +34,9 @@ export const DEFAULT_AGENT_CHAT_CONFIG: LobeAgentChatConfig = {
reasoningBudgetToken: 1024,
searchFCModel: DEFAULT_AGENT_SEARCH_FC_MODEL,
searchMode: 'auto',
selfIteration: {
enabled: false,
},
};
export const DEFAULT_AGENT_CONFIG: LobeAgentConfig = {
@@ -0,0 +1,18 @@
import { describe, expect, it } from 'vitest';
import { DEFAULT_AGENT_CONFIG } from '@/const/settings';
import { type Store } from './action';
import { selectors } from './selectors';
describe('AgentSetting selectors', () => {
describe('currentChatConfig', () => {
it('should include disabled self iteration by default', () => {
const state = {
config: DEFAULT_AGENT_CONFIG,
} as Store;
expect(selectors.currentChatConfig(state).selfIteration).toEqual({ enabled: false });
});
});
});
+8 -1
View File
@@ -7,6 +7,7 @@ import { useFetchAgentDocuments } from '@/hooks/useFetchAgentDocuments';
import { useFetchTopicMemories } from '@/hooks/useFetchMemoryForTopic';
import { useFetchNotebookDocuments } from '@/hooks/useFetchNotebookDocuments';
import { useChatStore } from '@/store/chat';
import { operationSelectors } from '@/store/chat/selectors';
import { featureFlagsSelectors, useServerConfigStore } from '@/store/serverConfig';
import { useUserStore } from '@/store/user';
import { settingsSelectors } from '@/store/user/selectors';
@@ -84,8 +85,14 @@ const ChatList = memo<ChatListProps>(
s.useFetchMessages,
]);
const activeAgentId = useChatStore((s) => s.activeAgentId);
// Suppress SWR focus revalidate while the current topic is streaming —
// the server-pushed UIChatMessage[] snapshot at step boundaries is the
// source of truth during that window. A focus refetch could hit DB
// mid-fan-out and clobber the in-memory streamed state with a stale
// assistant placeholder.
const isStreaming = useChatStore(operationSelectors.isAgentRuntimeRunningByContext(context));
const { enableAgentSelfIteration } = useServerConfigStore(featureFlagsSelectors);
useFetchMessages(context, skipFetch);
useFetchMessages(context, { revalidateOnFocus: !isStreaming, skipFetch });
const displayMessages = useConversationStore(dataSelectors.displayMessages);
const displayMessageIds = useConversationStore(dataSelectors.displayMessageIds);
const latestMessageId = displayMessageIds.at(-1);
@@ -9,7 +9,9 @@ import ContentBlocksScroll from './ContentBlocksScroll';
import type { RenderableAssistantContentBlock } from './types';
vi.mock('@lobehub/ui', () => ({
Flexbox: ({ children }: { children?: ReactNode }) => <div>{children}</div>,
Flexbox: ({ children, gap }: { children?: ReactNode; gap?: number }) => (
<div data-gap={gap}>{children}</div>
),
ScrollArea: ({ children }: { children?: ReactNode }) => <div>{children}</div>,
}));
@@ -62,4 +64,20 @@ describe('ContentBlocksScroll', () => {
'true',
);
});
it('uses a consistent gap between workflow blocks', () => {
const { container } = render(
<ContentBlocksScroll
assistantId="assistant-1"
blocks={[
{ content: 'first workflow block', id: 'block-1' },
{ content: 'second workflow block', id: 'block-2' },
]}
scroll={false}
variant="workflow"
/>,
);
expect(container.querySelector('[data-gap="8"]')).toBeInTheDocument();
});
});
@@ -68,7 +68,7 @@ const ContentBlocksScroll = memo<ContentBlocksScrollProps>((props) => {
}, [assistantIdFromProps, blocksFromProps, messagesList]);
const list = (
<Flexbox>
<Flexbox gap={variant === 'workflow' ? 8 : undefined}>
{blocks.map((block) => (
<ContentBlock
key={block.renderKey ?? block.id}
@@ -76,7 +76,7 @@ const ClientTaskItem = memo<ClientTaskItemProps>(({ item }) => {
);
// Fetch thread messages (skip when executing - messages come from real-time updates)
useFetchMessages(threadContext, isProcessing);
useFetchMessages(threadContext, { skipFetch: isProcessing });
// Get thread messages from store using selector
const threadMessages = useChatStore((s) =>
@@ -59,7 +59,7 @@ export const useClientTaskStats = ({
);
// Fetch thread messages (skip when disabled or no threadId)
useFetchMessages(threadContext, !enabled || !threadId);
useFetchMessages(threadContext, { skipFetch: !enabled || !threadId });
// Get thread messages from store using selector
const threadMessages = useChatStore((s) =>
@@ -54,7 +54,7 @@ const ClientTaskDetail = memo<ClientTaskDetailProps>(
);
// Fetch thread messages (skip when executing - messages come from real-time updates)
useFetchMessages(threadContext, isExecuting);
useFetchMessages(threadContext, { skipFetch: isExecuting });
// Get thread messages from store using selector
const threadMessages = useChatStore((s) =>
@@ -61,7 +61,7 @@ const ClientTaskItem = memo<ClientTaskItemProps>(({ item }) => {
);
// Fetch thread messages (skip when executing - messages come from real-time updates)
useFetchMessages(threadContext, isProcessing);
useFetchMessages(threadContext, { skipFetch: isProcessing });
// Get thread messages from store using selector
const threadMessages = useChatStore((s) =>
@@ -59,7 +59,7 @@ export const useClientTaskStats = ({
);
// Fetch thread messages (skip when disabled or no threadId)
useFetchMessages(threadContext, !enabled || !threadId);
useFetchMessages(threadContext, { skipFetch: !enabled || !threadId });
// Get thread messages from store using selector
const threadMessages = useChatStore((s) =>
@@ -1,4 +1,7 @@
import { HETEROGENEOUS_TYPE_LABELS } from '@lobechat/heterogeneous-agents';
import {
HETEROGENEOUS_TYPE_LABELS,
isRemoteHeterogeneousType,
} from '@lobechat/heterogeneous-agents';
import { type ModelPerformance, type ModelUsage } from '@lobechat/types';
import { ModelIcon } from '@lobehub/icons';
import { Center, Flexbox } from '@lobehub/ui';
@@ -33,7 +36,14 @@ const Usage = memo<UsageProps>(({ model, usage, performance, provider }) => {
if (!isDev && onboardingAgentId && conversationAgentId === onboardingAgentId) return null;
const heteroName = provider ? HETEROGENEOUS_TYPE_LABELS[provider] : undefined;
// Only remote platform agents (openclaw, hermes) replace the model name with
// the brand label — they don't expose a real model id. Local CLI agents
// (claude-code, codex) report their actual model on `turn_metadata` and
// should keep showing it.
const heteroName =
provider && isRemoteHeterogeneousType(provider)
? HETEROGENEOUS_TYPE_LABELS[provider]
: undefined;
return (
<Flexbox
@@ -6,6 +6,8 @@ import { type StateCreator } from 'zustand/vanilla';
import { useClientDataSWRWithSync } from '@/libs/swr';
import { messageService } from '@/services/message';
import { getChatStoreState } from '@/store/chat';
import { operationSelectors } from '@/store/chat/selectors';
import { messageMapKey } from '@/store/chat/utils/messageMapKey';
import { type Store as ConversationStore } from '../../action';
@@ -67,14 +69,17 @@ export interface DataAction {
switchMessageBranch: (messageId: string, branchIndex: number) => Promise<void>;
/**
* Fetch messages for this conversation using SWR
* Fetch messages for this conversation using SWR.
*
* @param context - Conversation context with sessionId and topicId
* @param skipFetch - When true, SWR key is null and no fetch occurs
* @param options.skipFetch - When true, SWR key is null and no fetch occurs
* @param options.revalidateOnFocus - Override SWR's default focus revalidate.
* Pass `false` while a streaming flow owns the in-memory message state so
* a focus refetch doesn't clobber it with a stale DB snapshot.
*/
useFetchMessages: (
context: ConversationContext,
skipFetch?: boolean,
options?: { revalidateOnFocus?: boolean; skipFetch?: boolean },
) => SWRResponse<UIChatMessage[]>;
}
@@ -184,7 +189,8 @@ export const dataSlice: StateCreator<
await state.updateMessageMetadata(message.parentId, { activeBranchIndex: branchIndex });
},
useFetchMessages: (context, skipFetch) => {
useFetchMessages: (context, options) => {
const { skipFetch, revalidateOnFocus } = options ?? {};
// When skipFetch is true, SWR key is null - no fetch occurs
// This is used when external messages are provided (e.g., creating new thread)
// Also skip fetch when topicId is null (new conversation state) - there's no server data,
@@ -206,10 +212,27 @@ export const dataSlice: StateCreator<
() => messageService.getMessages(context),
{
...(revalidateOnFocus !== undefined && { revalidateOnFocus }),
onData: (data) => {
if (!data) return;
if (!context.topicId) return;
// Defense-in-depth gate (LOBE-9501): drop any SWR onData while the
// topic is streaming. DB fan-out for chunk writes is async and lags
// the WS push by anywhere from 100ms to several seconds; an SWR
// refetch that lands inside that window returns the assistant row
// as the LOADING_FLAT placeholder (cLen=3) and would collapse the
// in-memory streamed content. SWR's own cache still receives the
// value, so once streaming ends a normal revalidate writes through.
//
// This is the catch-all backstop sitting BELOW the SoT consumption
// in gatewayEventHandler — `mergeFetchedMessagesWithLocalState`'s
// updatedAt tie-breaker handles most cases on its own, but the
// updatedAt comparison degenerates when server's pushed snapshot
// carries a DB updatedAt equal to a later stale fetch's row.
if (operationSelectors.isAgentRuntimeRunningByContext(context)(getChatStoreState()))
return;
const prevDbMessages = get().dbMessages;
const mergedMessages = mergeFetchedMessagesWithLocalState(data, prevDbMessages);
const storeContextKey = messageMapKey(get().context);
@@ -64,7 +64,7 @@ const ShareDataProvider = memo<PropsWithChildren<ShareDataProviderProps>>(
}, [activeAgentId, activeGroupId, activeThreadId, activeTopicId, context]);
const shouldSkipFetch = !resolvedContext.agentId || !resolvedContext.topicId;
const { isLoading } = useFetchMessages(resolvedContext, shouldSkipFetch);
const { isLoading } = useFetchMessages(resolvedContext, { skipFetch: shouldSkipFetch });
const messageKey = useMemo(() => {
if (!resolvedContext.agentId) return undefined;
+15 -11
View File
@@ -18,6 +18,8 @@ import { LOBE_THEME_NEUTRAL_COLOR, LOBE_THEME_PRIMARY_COLOR } from '@/const/them
import { isDesktop } from '@/const/version';
import { useIsDark } from '@/hooks/useIsDark';
import { getUILocaleAndResources } from '@/libs/getUILocaleAndResources';
import type { UILocaleResources } from '@/libs/getUILocaleAndResources.utils';
import { resolveUILocale } from '@/libs/getUILocaleAndResources.utils';
import Image from '@/libs/next/Image';
import { useGlobalStore } from '@/store/global';
import { systemStatusSelectors } from '@/store/global/selectors';
@@ -115,20 +117,22 @@ const AppTheme = memo<AppThemeProps>(
[messageTop],
);
const [uiResources, setUIResources] = useState<any>(null);
const uiLocale = useMemo(() => {
if (language.startsWith('zh')) return 'zh-CN';
if (language.startsWith('en')) return 'en-US';
return 'en-US';
}, [language]);
const [uiResources, setUIResources] = useState<UILocaleResources>();
const [uiLocale, setUILocale] = useState(() => resolveUILocale(language).uiLocale);
useEffect(() => {
let mounted = true;
getUILocaleAndResources(language).then(({ resources }) => {
if (mounted) {
setUIResources(resources);
}
});
setUILocale(resolveUILocale(language).uiLocale);
getUILocaleAndResources(language)
.then(({ locale, resources }) => {
if (mounted) {
setUILocale(locale);
setUIResources(resources);
}
})
.catch((error) => {
console.error('Failed to load UI locale resources:', error);
});
return () => {
mounted = false;
};
+16 -18
View File
@@ -1,24 +1,23 @@
import { en, zhCn } from '@lobehub/ui/es/i18n/resources/index';
import { normalizeLocale } from '@/locales/resources';
type UILocaleResources = Record<string, Record<string, string>>;
import type { UILocaleResourceInput, UILocaleResources } from './getUILocaleAndResources.utils';
import {
mergeUILocaleResources,
normalizeUILocaleResources,
resolveUILocale,
} from './getUILocaleAndResources.utils';
// eager: true — UI locale fully inlined at build time
const uiLocaleModules = import.meta.glob<{ default: UILocaleResources }>('/locales/*/ui.json', {
const uiLocaleModules = import.meta.glob<{ default: UILocaleResourceInput }>('/locales/*/ui.json', {
eager: true,
});
const getUILocale = (locale: string): string => {
if (locale.startsWith('zh')) return 'zh-CN';
if (locale.startsWith('en')) return 'en-US';
return locale;
};
const loadBusinessResources = (locale: string): UILocaleResources | null => {
const key = `/locales/${locale}/ui.json`;
const mod = uiLocaleModules[key];
return mod ? (mod.default as UILocaleResources) : null;
const resources = mod?.default as UILocaleResourceInput | null | undefined;
return resources ? normalizeUILocaleResources(resources) : null;
};
const loadLobeUIBuiltinResources = (locale: string): UILocaleResources | null => {
@@ -29,15 +28,14 @@ const loadLobeUIBuiltinResources = (locale: string): UILocaleResources | null =>
export const getUILocaleAndResources = async (
locale: string | 'auto',
): Promise<{ locale: string; resources: UILocaleResources }> => {
const effectiveLocale = locale === 'auto' ? 'en-US' : locale;
const normalizedLocale = normalizeLocale(effectiveLocale);
const uiLocale = getUILocale(normalizedLocale);
const { normalizedLocale, uiLocale } = resolveUILocale(locale);
const resources =
loadBusinessResources(normalizedLocale) ??
loadLobeUIBuiltinResources(normalizedLocale) ??
loadBusinessResources('en-US') ??
loadLobeUIBuiltinResources('en-US');
mergeUILocaleResources(
loadLobeUIBuiltinResources(normalizedLocale),
loadBusinessResources(normalizedLocale),
) ??
mergeUILocaleResources(loadLobeUIBuiltinResources('en-US'), loadBusinessResources('en-US'));
if (!resources)
throw new Error(
+41 -4
View File
@@ -2,6 +2,11 @@ import { describe, expect, it, vi } from 'vitest';
import { getUILocaleAndResources } from './getUILocaleAndResources';
const translateFromUILocaleResources = (
resources: Record<string, Record<string, string>>,
key: string,
) => Object.assign({}, ...Object.values(resources))[key];
describe('getUILocaleAndResources', () => {
it('should return zh-CN locale and zhCn resources for zh-CN', async () => {
const result = await getUILocaleAndResources('zh-CN');
@@ -9,6 +14,30 @@ describe('getUILocaleAndResources', () => {
expect(result.resources).toBeDefined();
});
it('should normalize business ui.json into a @lobehub/ui consumable resource map', async () => {
const result = await getUILocaleAndResources('zh-CN');
expect(translateFromUILocaleResources(result.resources, 'form.submit')).toBe('提交');
});
it('should merge built-in resources with partial business ui.json resources', async () => {
const result = await getUILocaleAndResources('zh-CN');
expect(translateFromUILocaleResources(result.resources, 'image.copy')).toBe('复制');
expect(translateFromUILocaleResources(result.resources, 'hotkey.clear')).toBe('清除绑定');
expect(translateFromUILocaleResources(result.resources, 'form.submit')).toBe('提交');
});
it('should merge en built-in fallback resources for non-en/zh partial business ui.json resources', async () => {
const result = await getUILocaleAndResources('de-DE');
expect(result.locale).toBe('de-DE');
expect(translateFromUILocaleResources(result.resources, 'image.copy')).toBe('Copy');
expect(translateFromUILocaleResources(result.resources, 'hotkey.clear')).toBe('Clear binding');
expect(translateFromUILocaleResources(result.resources, 'common.empty')).toBe('(empty)');
expect(translateFromUILocaleResources(result.resources, 'form.submit')).toBe('Absenden');
});
it('should return zh-CN locale and zhCn resources for zh-TW', async () => {
const result = await getUILocaleAndResources('zh-TW');
expect(result.locale).toBe('zh-CN');
@@ -27,10 +56,18 @@ describe('getUILocaleAndResources', () => {
expect(result.resources).toBeDefined();
});
it('should return en-US locale and en resources for auto', async () => {
const result = await getUILocaleAndResources('auto');
expect(result.locale).toBe('en-US');
expect(result.resources).toBeDefined();
it('should resolve auto from the current document language', async () => {
const previousLang = document.documentElement.lang;
document.documentElement.lang = 'zh-CN';
try {
const result = await getUILocaleAndResources('auto');
expect(result.locale).toBe('zh-CN');
expect(translateFromUILocaleResources(result.resources, 'form.submit')).toBe('提交');
} finally {
document.documentElement.lang = previousLang;
}
});
it('should return ar locale and custom resources for ar', async () => {
+18 -21
View File
@@ -1,17 +1,16 @@
import { normalizeLocale } from '@/locales/resources';
type UILocaleResources = Record<string, Record<string, string>>;
const getUILocale = (locale: string): string => {
if (locale.startsWith('zh')) return 'zh-CN';
if (locale.startsWith('en')) return 'en-US';
return locale;
};
import type { UILocaleResourceInput, UILocaleResources } from './getUILocaleAndResources.utils';
import {
mergeUILocaleResources,
normalizeUILocaleResources,
resolveUILocale,
} from './getUILocaleAndResources.utils';
const loadBusinessResources = async (locale: string): Promise<UILocaleResources | null> => {
try {
const resourcesModule = await import(`@/../locales/${locale}/ui.json`);
return resourcesModule.default as UILocaleResources;
const resources = resourcesModule.default as UILocaleResourceInput | null;
return resources ? normalizeUILocaleResources(resources) : null;
} catch {
return null;
}
@@ -31,19 +30,17 @@ const loadLobeUIBuiltinResources = async (locale: string): Promise<UILocaleResou
export const getUILocaleAndResources = async (
locale: string | 'auto',
): Promise<{ locale: string; resources: UILocaleResources }> => {
const effectiveLocale = locale === 'auto' ? 'en-US' : locale;
const normalizedLocale = normalizeLocale(effectiveLocale);
const uiLocale = getUILocale(normalizedLocale);
const { normalizedLocale, uiLocale } = resolveUILocale(locale);
// Priority:
// 1) business-defined ui.json
// 2) @lobehub/ui built-in resources (en/zh)
// 3) fallback to default en
const resources =
(await loadBusinessResources(normalizedLocale)) ??
(await loadLobeUIBuiltinResources(normalizedLocale)) ??
(await loadBusinessResources('en-US')) ??
(await loadLobeUIBuiltinResources('en-US'));
mergeUILocaleResources(
await loadLobeUIBuiltinResources(normalizedLocale),
await loadBusinessResources(normalizedLocale),
) ??
mergeUILocaleResources(
await loadLobeUIBuiltinResources('en-US'),
await loadBusinessResources('en-US'),
);
if (!resources)
throw new Error(
+60
View File
@@ -0,0 +1,60 @@
import { DEFAULT_LANG } from '@/const/locale';
import { normalizeLocale } from '@/locales/resources';
export type UILocaleResourceBundle = Record<string, string>;
export type UILocaleResources = Record<string, UILocaleResourceBundle>;
export type UILocaleResourceInput = UILocaleResourceBundle | UILocaleResources;
const getDocumentLocale = () => {
if (typeof document === 'undefined') return;
return document.documentElement.lang || undefined;
};
const getNavigatorLocale = () => {
if (typeof navigator === 'undefined') return;
return navigator.language || undefined;
};
const getUILocale = (locale: string): string => {
if (locale.startsWith('zh')) return 'zh-CN';
if (locale.startsWith('en')) return 'en-US';
return locale;
};
const isFlatUILocaleResources = (
resources: UILocaleResourceInput,
): resources is UILocaleResourceBundle =>
Object.values(resources).every((value) => typeof value === 'string');
const flattenUILocaleResources = (resources: UILocaleResourceInput): UILocaleResourceBundle =>
isFlatUILocaleResources(resources) ? resources : Object.assign({}, ...Object.values(resources));
export const normalizeUILocaleResources = (
resources: UILocaleResourceInput,
): UILocaleResources => ({
app: flattenUILocaleResources(resources),
});
export const mergeUILocaleResources = (
...resourcesList: (UILocaleResourceInput | null)[]
): UILocaleResources | null => {
const mergedResources = Object.assign(
{},
...resourcesList.filter(Boolean).map((resources) => flattenUILocaleResources(resources!)),
);
return Object.keys(mergedResources).length > 0 ? { app: mergedResources } : null;
};
export const resolveUILocale = (locale: string | 'auto') => {
const effectiveLocale =
locale === 'auto' ? (getDocumentLocale() ?? getNavigatorLocale() ?? DEFAULT_LANG) : locale;
const normalizedLocale = normalizeLocale(effectiveLocale);
return {
normalizedLocale,
uiLocale: getUILocale(normalizedLocale),
};
};
+19 -18
View File
@@ -1,14 +1,11 @@
import { normalizeLocale } from '@/locales/resources';
import type { UILocaleResourceInput, UILocaleResources } from './getUILocaleAndResources.utils';
import {
mergeUILocaleResources,
normalizeUILocaleResources,
resolveUILocale,
} from './getUILocaleAndResources.utils';
type UILocaleResources = Record<string, Record<string, string>>;
const uiLocaleLoaders = import.meta.glob<{ default: UILocaleResources }>('/locales/*/ui.json');
const getUILocale = (locale: string): string => {
if (locale.startsWith('zh')) return 'zh-CN';
if (locale.startsWith('en')) return 'en-US';
return locale;
};
const uiLocaleLoaders = import.meta.glob<{ default: UILocaleResourceInput }>('/locales/*/ui.json');
const loadBusinessResources = async (locale: string): Promise<UILocaleResources | null> => {
const key = `/locales/${locale}/ui.json`;
@@ -16,7 +13,9 @@ const loadBusinessResources = async (locale: string): Promise<UILocaleResources
if (!loader) return null;
try {
const mod = await loader();
return mod.default as UILocaleResources;
const resources = mod.default as UILocaleResourceInput | null;
return resources ? normalizeUILocaleResources(resources) : null;
} catch {
return null;
}
@@ -36,15 +35,17 @@ const loadLobeUIBuiltinResources = async (locale: string): Promise<UILocaleResou
export const getUILocaleAndResources = async (
locale: string | 'auto',
): Promise<{ locale: string; resources: UILocaleResources }> => {
const effectiveLocale = locale === 'auto' ? 'en-US' : locale;
const normalizedLocale = normalizeLocale(effectiveLocale);
const uiLocale = getUILocale(normalizedLocale);
const { normalizedLocale, uiLocale } = resolveUILocale(locale);
const resources =
(await loadBusinessResources(normalizedLocale)) ??
(await loadLobeUIBuiltinResources(normalizedLocale)) ??
(await loadBusinessResources('en-US')) ??
(await loadLobeUIBuiltinResources('en-US'));
mergeUILocaleResources(
await loadLobeUIBuiltinResources(normalizedLocale),
await loadBusinessResources(normalizedLocale),
) ??
mergeUILocaleResources(
await loadLobeUIBuiltinResources('en-US'),
await loadBusinessResources('en-US'),
);
if (!resources)
throw new Error(
+2
View File
@@ -15,6 +15,8 @@ const lobeHubOnlineModelLocales = {
'grok-4.20-beta-0309-non-reasoning.description': 'A non-reasoning variant for simple use cases',
'MiniMax-M2.1-Lightning.description':
'Powerful multilingual programming capabilities with faster and more efficient inference.',
'qwen3.7-max.description':
"Qwen3.7-Max is Alibaba Cloud's flagship agent-era model for complex coding, reasoning, office automation, and long-horizon autonomous workflows.",
'seedream-5-0-260128.description':
'ByteDance-Seedream-5.0-lite by BytePlus features web-retrieval-augmented generation for real-time information, enhanced complex prompt interpretation, and improved reference consistency for professional visual creation.',
'fal-ai/bytedance/seedream/v4.5.description':
@@ -0,0 +1,120 @@
import { render, screen } from '@testing-library/react';
import type { PropsWithChildren, ReactNode } from 'react';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import { ChatSettingsTabs } from '@/store/global/initialState';
import Content from './Content';
const mocks = vi.hoisted(() => ({
agentState: {
activeAgentId: 'inbox-agent',
config: {},
isInbox: true,
meta: {},
optimisticUpdateAgentConfig: vi.fn(),
optimisticUpdateAgentMeta: vi.fn(),
},
serverState: {
featureFlags: {
enableAgentSelfIteration: true,
},
},
}));
vi.mock('@lobehub/ui', () => ({
Avatar: () => <div data-testid="avatar" />,
Block: ({ children }: PropsWithChildren) => <div>{children}</div>,
Flexbox: ({ children }: PropsWithChildren) => <div>{children}</div>,
Icon: () => <span />,
Text: ({ children }: PropsWithChildren) => <span>{children}</span>,
}));
vi.mock('@/components/Menu', () => ({
default: ({
items = [],
onClick,
selectedKeys = [],
}: {
items?: { key?: string; label?: ReactNode }[];
onClick?: ({ key }: { key: string }) => void;
selectedKeys?: string[];
}) => (
<div data-selected={selectedKeys.join(',')} data-testid="agent-settings-menu">
{items.map((item) => (
<button
key={item.key}
type="button"
onClick={() => item.key && onClick?.({ key: item.key })}
>
{item.label}
</button>
))}
</div>
),
}));
vi.mock('@/features/AgentSetting', () => ({
AgentSettings: ({ tab }: { tab: ChatSettingsTabs }) => (
<div data-tab={tab} data-testid="agent-settings-content" />
),
}));
vi.mock('@/store/agent', () => {
const useAgentStore = (selector: (state: typeof mocks.agentState) => unknown) =>
selector(mocks.agentState);
useAgentStore.getState = () => mocks.agentState;
return { useAgentStore };
});
vi.mock('@/store/agent/selectors', () => ({
agentSelectors: {
currentAgentConfig: (state: typeof mocks.agentState) => state.config,
currentAgentMeta: (state: typeof mocks.agentState) => state.meta,
},
builtinAgentSelectors: {
isInboxAgent: (state: typeof mocks.agentState) => state.isInbox,
},
}));
vi.mock('@/store/serverConfig', () => ({
featureFlagsSelectors: (state: typeof mocks.serverState) => state.featureFlags,
useServerConfigStore: (selector: (state: typeof mocks.serverState) => unknown) =>
selector(mocks.serverState),
}));
vi.mock('antd-style', () => ({
useTheme: () => ({
colorBgLayout: '#fff',
colorBorderSecondary: '#eee',
}),
}));
vi.mock('react-i18next', () => ({
useTranslation: () => ({
t: (key: string) => key,
}),
}));
describe('AgentSettings Content', () => {
beforeEach(() => {
mocks.agentState.isInbox = true;
mocks.serverState.featureFlags.enableAgentSelfIteration = true;
});
it('should select self iteration when inbox hides opening settings', () => {
render(<Content />);
expect(screen.queryByRole('button', { name: 'agentTab.opening' })).not.toBeInTheDocument();
expect(screen.getByRole('button', { name: 'agentTab.selfIteration' })).toBeInTheDocument();
expect(screen.getByTestId('agent-settings-menu')).toHaveAttribute(
'data-selected',
ChatSettingsTabs.SelfIteration,
);
expect(screen.getByTestId('agent-settings-content')).toHaveAttribute(
'data-tab',
ChatSettingsTabs.SelfIteration,
);
});
});
@@ -5,7 +5,7 @@ import { type ItemType } from 'antd/es/menu/interface';
import { useTheme } from 'antd-style';
import isEqual from 'fast-deep-equal';
import { ActivityIcon, MessageSquareHeartIcon } from 'lucide-react';
import { memo, useMemo, useState } from 'react';
import { memo, useEffect, useMemo, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { shallow } from 'zustand/shallow';
@@ -29,6 +29,21 @@ const Content = memo(() => {
const { enableAgentSelfIteration } = useServerConfigStore(featureFlagsSelectors);
const [tab, setTab] = useState(ChatSettingsTabs.Opening);
const availableTabs = useMemo(
() =>
[
!isInbox ? ChatSettingsTabs.Opening : null,
enableAgentSelfIteration ? ChatSettingsTabs.SelfIteration : null,
].filter(Boolean) as ChatSettingsTabs[],
[isInbox, enableAgentSelfIteration],
);
const activeTab = availableTabs.includes(tab) ? tab : availableTabs[0];
useEffect(() => {
if (activeTab && activeTab !== tab) setTab(activeTab);
}, [activeTab, tab]);
const updateAgentConfig = async (config: any) => {
if (!agentId) return;
await useAgentStore.getState().optimisticUpdateAgentConfig(agentId, config);
@@ -41,23 +56,30 @@ const Content = memo(() => {
const menuItems: ItemType[] = useMemo(
() =>
[
!isInbox
? {
icon: <Icon icon={MessageSquareHeartIcon} />,
key: ChatSettingsTabs.Opening,
label: t('agentTab.opening'),
availableTabs
.map((tab) => {
switch (tab) {
case ChatSettingsTabs.Opening: {
return {
icon: <Icon icon={MessageSquareHeartIcon} />,
key: ChatSettingsTabs.Opening,
label: t('agentTab.opening'),
};
}
: null,
enableAgentSelfIteration
? {
icon: <Icon icon={ActivityIcon} />,
key: ChatSettingsTabs.SelfIteration,
label: t('agentTab.selfIteration'),
case ChatSettingsTabs.SelfIteration: {
return {
icon: <Icon icon={ActivityIcon} />,
key: ChatSettingsTabs.SelfIteration,
label: t('agentTab.selfIteration'),
};
}
: null,
].filter(Boolean) as ItemType[],
[t, isInbox, enableAgentSelfIteration],
default: {
return null;
}
}
})
.filter(Boolean) as ItemType[],
[availableTabs, t],
);
const displayTitle = isInbox ? 'Lobe AI' : meta.title || t('defaultSession', { ns: 'common' });
@@ -105,7 +127,7 @@ const Content = memo(() => {
<Menu
selectable
items={menuItems}
selectedKeys={[tab]}
selectedKeys={activeTab ? [activeTab] : []}
style={{ width: '100%' }}
onClick={({ key }) => setTab(key as ChatSettingsTabs)}
/>
@@ -116,15 +138,17 @@ const Content = memo(() => {
paddingInline={64}
style={{ overflow: 'scroll', width: '100%' }}
>
<Settings
config={config}
id={agentId}
loading={false}
meta={meta}
tab={tab}
onConfigChange={updateAgentConfig}
onMetaChange={updateAgentMeta}
/>
{activeTab && (
<Settings
config={config}
id={agentId}
loading={false}
meta={meta}
tab={activeTab}
onConfigChange={updateAgentConfig}
onMetaChange={updateAgentMeta}
/>
)}
</Flexbox>
</Flexbox>
);
@@ -1,45 +0,0 @@
import isEqual from 'fast-deep-equal';
import { memo, useCallback, useState } from 'react';
import { useAgentStore } from '@/store/agent';
import { agentSelectors } from '@/store/agent/selectors';
import PublishButton from './PublishButton';
import PublishResultModal from './PublishResultModal';
/**
* Agent Publish Button Component
*
* Simplified version - backend now handles ownership check automatically.
* The action type (submit vs upload) is determined by backend based on:
* 1. Whether the identifier exists
* 2. Whether the current user is the owner
*/
const AgentPublishButton = memo(() => {
const meta = useAgentStore(agentSelectors.currentAgentMeta, isEqual);
const [showResultModal, setShowResultModal] = useState(false);
const [publishedIdentifier, setPublishedIdentifier] = useState<string>();
const handlePublishSuccess = useCallback((identifier: string) => {
setPublishedIdentifier(identifier);
setShowResultModal(true);
}, []);
// Determine action based on whether we have an existing marketIdentifier
// Backend will verify ownership and decide to create new or update
const action = meta?.marketIdentifier ? 'upload' : 'submit';
return (
<>
<PublishButton action={action} onPublishSuccess={handlePublishSuccess} />
<PublishResultModal
identifier={publishedIdentifier}
open={showResultModal}
onCancel={() => setShowResultModal(false)}
/>
</>
);
});
export default AgentPublishButton;
@@ -0,0 +1,214 @@
import { render, screen } from '@testing-library/react';
import type { PropsWithChildren, ReactNode } from 'react';
import { beforeEach, describe, expect, it, vi } from 'vitest';
import Header from './index';
const mocks = vi.hoisted(() => ({
agentState: {
activeAgentId: 'agent-1',
canCurrentAgentPublishToCommunity: true,
isCurrentAgentHeterogeneous: false,
meta: {
title: 'Test Agent',
},
systemRole: 'You are helpful.',
},
globalState: {
isStatusInit: true,
showAgentBuilderPanel: false,
toggleAgentBuilderPanel: vi.fn(),
},
homeState: {
removeAgent: vi.fn(),
},
marketAuth: {
isAuthenticated: true,
isLoading: false,
signIn: vi.fn(),
},
marketPublish: {
checkOwnership: vi.fn(),
isPublishing: false,
publish: vi.fn(),
},
navigate: vi.fn(),
versionReviewStatus: {
isUnderReview: false,
},
}));
vi.mock('@lobehub/ui', () => ({
ActionIcon: () => <button aria-label="more" type="button" />,
DropdownMenu: ({
children,
items = [],
}: PropsWithChildren<{
items?: Array<{ key?: string; label?: ReactNode; type?: string }>;
}>) => (
<div>
{children}
<div data-testid="agent-profile-menu">
{items
.filter((item) => item.type !== 'divider')
.map((item) => (
<button key={item.key} type="button">
{item.label}
</button>
))}
</div>
</div>
),
Flexbox: ({ children }: PropsWithChildren) => <div>{children}</div>,
Icon: () => <span />,
}));
vi.mock('@lobehub/ui/icons', () => ({
ShapesUploadIcon: () => null,
}));
vi.mock('antd', () => ({
App: {
useApp: () => ({
modal: {
confirm: vi.fn(),
},
}),
},
Modal: {
confirm: vi.fn(),
},
}));
vi.mock('lucide-react', () => ({
BotMessageSquareIcon: () => null,
MoreHorizontal: () => null,
Settings2Icon: () => null,
Trash: () => null,
}));
vi.mock('react-i18next', () => ({
useTranslation: () => ({
t: (key: string) => key,
}),
}));
vi.mock('react-router-dom', () => ({
useNavigate: () => mocks.navigate,
}));
vi.mock('@/components/AntdStaticMethods', () => ({
message: {
error: vi.fn(),
success: vi.fn(),
warning: vi.fn(),
},
}));
vi.mock('@/const/layoutTokens', () => ({
DESKTOP_HEADER_ICON_SMALL_SIZE: 24,
}));
vi.mock('@/features/NavHeader', () => ({
default: ({ left, right }: { left?: ReactNode; right?: ReactNode }) => (
<header>
{left}
{right}
</header>
),
}));
vi.mock('@/features/RightPanel/ToggleRightPanelButton', () => ({
default: () => <button type="button">agentBuilder</button>,
}));
vi.mock('@/layout/AuthProvider/MarketAuth', () => ({
useMarketAuth: () => mocks.marketAuth,
}));
vi.mock('@/layout/AuthProvider/MarketAuth/errors', () => ({
resolveMarketAuthError: () => ({ code: 'unknown' }),
}));
vi.mock('@/store/agent', () => ({
useAgentStore: (selector: (state: typeof mocks.agentState) => unknown) =>
selector(mocks.agentState),
}));
vi.mock('@/store/agent/selectors', () => ({
agentSelectors: {
canCurrentAgentPublishToCommunity: (state: typeof mocks.agentState) =>
state.canCurrentAgentPublishToCommunity,
currentAgentMeta: (state: typeof mocks.agentState) => state.meta,
currentAgentSystemRole: (state: typeof mocks.agentState) => state.systemRole,
isCurrentAgentHeterogeneous: (state: typeof mocks.agentState) =>
state.isCurrentAgentHeterogeneous,
},
}));
vi.mock('@/store/global', () => ({
useGlobalStore: (selector: (state: typeof mocks.globalState) => unknown) =>
selector(mocks.globalState),
}));
vi.mock('@/store/global/selectors', () => ({
systemStatusSelectors: {
isStatusInit: (state: typeof mocks.globalState) => state.isStatusInit,
showAgentBuilderPanel: (state: typeof mocks.globalState) => state.showAgentBuilderPanel,
},
}));
vi.mock('@/store/home', () => ({
useHomeStore: (selector: (state: typeof mocks.homeState) => unknown) => selector(mocks.homeState),
}));
vi.mock('./AgentForkTag', () => ({
default: () => null,
}));
vi.mock('./AgentPublishButton/ForkConfirmModal', () => ({
default: () => null,
}));
vi.mock('./AgentPublishButton/PublishResultModal', () => ({
default: () => null,
}));
vi.mock('./AgentPublishButton/useMarketPublish', () => ({
useMarketPublish: () => mocks.marketPublish,
}));
vi.mock('./AgentStatusTag', () => ({
default: () => null,
}));
vi.mock('./AutoSaveHint', () => ({
default: () => null,
}));
vi.mock('./AgentVersionReviewTag', () => ({
default: () => null,
useVersionReviewStatus: () => mocks.versionReviewStatus,
}));
describe('Agent profile Header', () => {
beforeEach(() => {
mocks.agentState.canCurrentAgentPublishToCommunity = true;
mocks.agentState.isCurrentAgentHeterogeneous = false;
});
it('should show the community publish action for normal agents', () => {
render(<Header />);
expect(screen.getByRole('button', { name: 'publishToCommunity' })).toBeInTheDocument();
});
it('should hide the community publish action for heterogeneous and platform agents', () => {
mocks.agentState.canCurrentAgentPublishToCommunity = false;
mocks.agentState.isCurrentAgentHeterogeneous = true;
render(<Header />);
expect(screen.queryByRole('button', { name: 'publishToCommunity' })).not.toBeInTheDocument();
});
});
@@ -36,6 +36,7 @@ const Header = memo(() => {
const systemRole = useAgentStore(agentSelectors.currentAgentSystemRole);
const activeAgentId = useAgentStore((s) => s.activeAgentId);
const isHeterogeneous = useAgentStore(agentSelectors.isCurrentAgentHeterogeneous);
const canPublishToCommunity = useAgentStore(agentSelectors.canCurrentAgentPublishToCommunity);
const [showAgentBuilderPanel, toggleAgentBuilderPanel, isStatusInit] = useGlobalStore((s) => [
systemStatusSelectors.showAgentBuilderPanel(s),
s.toggleAgentBuilderPanel,
@@ -148,13 +149,17 @@ const Header = memo(() => {
onClick: () => useAgentStore.setState({ showAgentSetting: true }),
},
{ type: 'divider' as const },
{
icon: <Icon icon={ShapesUploadIcon} />,
key: 'publish',
label: t('publishToCommunity', { ns: 'setting' }),
onClick: handlePublishClick,
},
{ type: 'divider' as const },
...(canPublishToCommunity
? [
{
icon: <Icon icon={ShapesUploadIcon} />,
key: 'publish',
label: t('publishToCommunity', { ns: 'setting' }),
onClick: handlePublishClick,
},
{ type: 'divider' as const },
]
: []),
{
danger: true,
icon: <Icon icon={Trash} />,
@@ -163,7 +168,7 @@ const Header = memo(() => {
onClick: handleDelete,
},
],
[handlePublishClick, handleDelete, t],
[canPublishToCommunity, handlePublishClick, handleDelete, t],
);
return (
@@ -182,7 +187,7 @@ const Header = memo(() => {
<DropdownMenu items={menuItems}>
<ActionIcon
icon={MoreHorizontal}
loading={isPublishing || isAuthLoading}
loading={canPublishToCommunity && (isPublishing || isAuthLoading)}
size={DESKTOP_HEADER_ICON_SMALL_SIZE}
/>
</DropdownMenu>
@@ -1,4 +1,5 @@
import { type AgentState } from '@lobechat/agent-runtime';
import { type UIChatMessage } from '@lobechat/types';
import debug from 'debug';
import { type AgentOperationMetadata, type StepResult } from './AgentStateManager';
@@ -43,6 +44,17 @@ export interface AgentRuntimeCoordinatorOptions {
* Defaults to automatic selection based on Redis availability
*/
streamEventManager?: IStreamEventManager;
/**
* Resolve the canonical UIChatMessage[] snapshot for a terminal-state
* agent run, attached to `agent_runtime_end` events so the client can
* use the pushed payload as Source of Truth instead of refetching from
* DB.
*
* Optional: when omitted (e.g. tests, embedded usage without DB access)
* the coordinator falls back to publishing without `uiMessages` and the
* client behaves as before.
*/
uiMessagesResolver?: (state: AgentState) => Promise<UIChatMessage[] | undefined>;
}
/**
@@ -59,10 +71,12 @@ export interface AgentRuntimeCoordinatorOptions {
export class AgentRuntimeCoordinator {
private stateManager: IAgentStateManager;
private streamEventManager: IStreamEventManager;
private uiMessagesResolver?: (state: AgentState) => Promise<UIChatMessage[] | undefined>;
constructor(options?: AgentRuntimeCoordinatorOptions) {
this.stateManager = options?.stateManager ?? createAgentStateManager();
this.streamEventManager = options?.streamEventManager ?? createStreamEventManager();
this.uiMessagesResolver = options?.uiMessagesResolver;
}
/**
@@ -94,6 +108,22 @@ export class AgentRuntimeCoordinator {
}
}
/**
* Invoke the optional uiMessagesResolver and shield callers from its
* failures — stream-event publishing must never fail the surrounding
* save. Errors are logged and surfaced to the client as a missing field,
* which falls back to the legacy refresh path.
*/
private async resolveUiMessages(state: AgentState): Promise<UIChatMessage[] | undefined> {
if (!this.uiMessagesResolver) return undefined;
try {
return await this.uiMessagesResolver(state);
} catch (error) {
console.error('Failed to resolve uiMessages for agent_runtime_end:', error);
return undefined;
}
}
/**
* Save Agent state and handle corresponding events
*/
@@ -106,12 +136,13 @@ export class AgentRuntimeCoordinator {
// Send a terminal event once the operation first enters a terminal state.
if (hasEnteredStreamEndState(previousState?.status, state.status)) {
await this.streamEventManager.publishAgentRuntimeEnd(
await this.streamEventManager.publishAgentRuntimeEnd({
finalState: state,
operationId,
state.stepCount ?? previousState?.stepCount ?? 0,
state,
state.status,
);
reason: state.status,
stepIndex: state.stepCount ?? previousState?.stepCount ?? 0,
uiMessages: await this.resolveUiMessages(state),
});
log('[%s] Agent runtime reached terminal state: %s', operationId, state.status);
}
} catch (error) {
@@ -133,12 +164,14 @@ export class AgentRuntimeCoordinator {
// This ensures agent_runtime_end is sent after all step events.
if (hasEnteredStreamEndState(previousState?.status, stepResult.newState.status)) {
await this.streamEventManager.publishAgentRuntimeEnd(
await this.streamEventManager.publishAgentRuntimeEnd({
finalState: stepResult.newState,
operationId,
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,
stepResult.newState,
stepResult.newState.status,
);
reason: stepResult.newState.status,
stepIndex:
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,
uiMessages: await this.resolveUiMessages(stepResult.newState),
});
log(
'[%s] Agent runtime reached terminal state after step result: %s',
operationId,
@@ -7,7 +7,7 @@ import {
type StreamChunkData,
type StreamEvent,
} from './StreamEventManager';
import type { IStreamEventManager } from './types';
import type { IStreamEventManager, PublishAgentRuntimeEndParams } from './types';
const log = debug('lobe-server:agent-runtime:gateway-notifier');
@@ -78,26 +78,25 @@ export class GatewayStreamNotifier implements IStreamEventManager {
return result;
}
async publishAgentRuntimeEnd(
operationId: string,
stepIndex: number,
finalState: any,
reason?: string,
reasonDetail?: string,
): Promise<string> {
const result = await this.inner.publishAgentRuntimeEnd(
operationId,
stepIndex,
finalState,
reason,
reasonDetail,
);
async publishAgentRuntimeEnd(params: PublishAgentRuntimeEndParams): Promise<string> {
const { operationId, stepIndex, finalState, reason, reasonDetail, uiMessages } = params;
const result = await this.inner.publishAgentRuntimeEnd(params);
const effectiveReasonDetail = reasonDetail || getDefaultReasonDetail(finalState, reason);
const errorType = finalState?.error?.type || finalState?.error?.errorType;
this.pushEvent(operationId, {
data: { errorType, finalState, reason, reasonDetail: effectiveReasonDetail },
// Forward `uiMessages` to the gateway push channel so terminal-state
// clients consuming /push-event get the canonical UIChatMessage[]
// snapshot — the final step has no later step_start to carry a fresh
// snapshot, so dropping it here would break the SoT contract.
data: {
errorType,
finalState,
reason,
reasonDetail: effectiveReasonDetail,
...(uiMessages !== undefined && { uiMessages }),
},
operationId,
stepIndex,
timestamp: Date.now(),
@@ -1,7 +1,7 @@
import debug from 'debug';
import { type StreamChunkData, type StreamEvent } from './StreamEventManager';
import { type IStreamEventManager } from './types';
import { type IStreamEventManager, type PublishAgentRuntimeEndParams } from './types';
const log = debug('lobe-server:agent-runtime:in-memory-stream-event-manager');
@@ -97,13 +97,14 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
});
}
async publishAgentRuntimeEnd(
operationId: string,
stepIndex: number,
finalState: any,
reason?: string,
reasonDetail?: string,
): Promise<string> {
async publishAgentRuntimeEnd({
operationId,
stepIndex,
finalState,
reason,
reasonDetail,
uiMessages,
}: PublishAgentRuntimeEndParams): Promise<string> {
return this.publishStreamEvent(operationId, {
data: {
finalState,
@@ -111,6 +112,7 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
phase: 'execution_complete',
reason: reason || 'completed',
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
...(uiMessages !== undefined && { uiMessages }),
},
stepIndex,
type: 'agent_runtime_end',
@@ -272,6 +272,14 @@ export interface RuntimeExecutorContext {
streamManager: IStreamEventManager;
toolExecutionService: ToolExecutionService;
topicId?: string;
/**
* Trace-pipeline sink for context engine input/output. Wired by
* AgentRuntimeService so the trace recorder can pick CE data up
* out-of-band, keeping the heavy CE payload (agentDocuments, systemRole, …)
* out of the `events` array and therefore out of the Redis state pipeline.
* See LOBE-9110.
*/
tracingContextEngine?: (input: unknown, output: unknown) => void;
userId?: string;
userTimezone?: string;
}
@@ -714,7 +722,7 @@ export const createRuntimeExecutors = (
processedMessages = await serverMessagesEngine(contextEngineInput);
// Emit context engine event for tracing
// Hand context engine input/output to the trace sink out-of-band.
// Omit large/redundant fields to reduce snapshot size:
// - input.messages: reconstructible from step's messagesBaseline + messagesDelta
// - input.toolsConfig: static per operation, ~47KB of manifests repeated every call_llm step
@@ -724,14 +732,10 @@ export const createRuntimeExecutors = (
toolsConfig: _toolsConfig,
...contextEngineInputLite
} = contextEngineInput;
events.push({
input: {
...contextEngineInputLite,
toolCount: _toolsConfig?.tools?.length ?? 0,
},
output: processedMessages,
type: 'context_engine_result',
} as any);
ctx.tracingContextEngine?.(
{ ...contextEngineInputLite, toolCount: _toolsConfig?.tools?.length ?? 0 },
processedMessages,
);
} else {
processedMessages = llmPayload.messages;
}
@@ -4,6 +4,7 @@ import debug from 'debug';
import { type Redis } from 'ioredis';
import { getAgentRuntimeRedisClient } from './redis';
import { type PublishAgentRuntimeEndParams } from './types';
const log = debug('lobe-server:agent-runtime:stream-event-manager');
const timing = debug('lobe-server:agent-runtime:timing');
@@ -182,13 +183,14 @@ export class StreamEventManager {
/**
* Publish Agent runtime end event
*/
async publishAgentRuntimeEnd(
operationId: string,
stepIndex: number,
finalState: any,
reason?: string,
reasonDetail?: string,
): Promise<string> {
async publishAgentRuntimeEnd({
operationId,
stepIndex,
finalState,
reason,
reasonDetail,
uiMessages,
}: PublishAgentRuntimeEndParams): Promise<string> {
return this.publishStreamEvent(operationId, {
data: {
finalState,
@@ -196,6 +198,7 @@ export class StreamEventManager {
phase: 'execution_complete',
reason: reason || 'completed',
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
...(uiMessages !== undefined && { uiMessages }),
},
stepIndex,
type: 'agent_runtime_end',
@@ -95,12 +95,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStateManager.saveAgentState).toHaveBeenCalledWith(operationId, newState);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
newState.stepCount,
newState,
'done',
);
reason: 'done',
stepIndex: newState.stepCount,
uiMessages: undefined,
});
});
it('should publish end event when status changes to error', async () => {
@@ -112,12 +113,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
newState.stepCount,
newState,
'error',
);
reason: 'error',
stepIndex: newState.stepCount,
uiMessages: undefined,
});
});
it('should fallback to previous stepCount when terminal state is missing stepCount', async () => {
@@ -129,12 +131,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
previousState.stepCount,
newState,
'error',
);
reason: 'error',
stepIndex: previousState.stepCount,
uiMessages: undefined,
});
});
it('should publish end event when status changes to interrupted', async () => {
@@ -146,12 +149,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
newState.stepCount,
newState,
'interrupted',
);
reason: 'interrupted',
stepIndex: newState.stepCount,
uiMessages: undefined,
});
});
it('should publish end event when status changes to waiting_for_human so the client releases its loading state', async () => {
@@ -163,12 +167,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
newState.stepCount,
newState,
'waiting_for_human',
);
reason: 'waiting_for_human',
stepIndex: newState.stepCount,
uiMessages: undefined,
});
});
it('should not publish end event when status was already done', async () => {
@@ -214,12 +219,13 @@ describe('AgentRuntimeCoordinator', () => {
expect(mockStateManager.loadAgentState).toHaveBeenCalledWith(operationId);
expect(mockStateManager.saveStepResult).toHaveBeenCalledWith(operationId, stepResult);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
5,
stepResult.newState,
'done',
);
reason: 'done',
stepIndex: 5,
uiMessages: undefined,
});
});
it('should publish end event when status becomes error', async () => {
@@ -234,12 +240,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
5,
stepResult.newState,
'error',
);
reason: 'error',
stepIndex: 5,
uiMessages: undefined,
});
});
it('should fallback to stepResult.stepIndex when terminal step result state is missing stepCount', async () => {
@@ -254,12 +261,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
stepResult.stepIndex,
stepResult.newState,
'error',
);
reason: 'error',
stepIndex: stepResult.stepIndex,
uiMessages: undefined,
});
});
it('should publish end event when status becomes waiting_for_human (paused awaiting approval)', async () => {
@@ -274,12 +282,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
4,
stepResult.newState,
'waiting_for_human',
);
reason: 'waiting_for_human',
stepIndex: 4,
uiMessages: undefined,
});
});
it('should publish end event when status becomes interrupted', async () => {
@@ -294,12 +303,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
5,
stepResult.newState,
'interrupted',
);
reason: 'interrupted',
stepIndex: 5,
uiMessages: undefined,
});
});
it('should not publish end event when status is not done', async () => {
@@ -395,4 +405,105 @@ describe('AgentRuntimeCoordinator', () => {
expect(result).toBe(expectedHistory);
});
});
// Terminal events should carry the canonical UIChatMessage[] snapshot
// when a resolver is wired so the client can use the pushed payload as
// Source of Truth instead of refetching from DB.
describe('uiMessagesResolver on agent_runtime_end', () => {
it('passes resolver result through saveAgentState terminal publish', async () => {
const uiMessages = [{ id: 'msg_1', role: 'user' }] as any[];
const resolver = vi.fn().mockResolvedValue(uiMessages);
const coordinatorWithResolver = new AgentRuntimeCoordinator({
stateManager: mockStateManager,
streamEventManager: mockStreamManager,
uiMessagesResolver: resolver,
});
const previousState = { status: 'running', stepCount: 3 };
const newState = { status: 'done', stepCount: 5 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinatorWithResolver.saveAgentState('op-1', newState as any);
expect(resolver).toHaveBeenCalledWith(newState);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId: 'op-1',
reason: 'done',
stepIndex: 5,
uiMessages,
});
});
it('passes resolver result through saveStepResult terminal publish', async () => {
const uiMessages = [{ id: 'msg_a', role: 'assistantGroup' }] as any[];
const resolver = vi.fn().mockResolvedValue(uiMessages);
const coordinatorWithResolver = new AgentRuntimeCoordinator({
stateManager: mockStateManager,
streamEventManager: mockStreamManager,
uiMessagesResolver: resolver,
});
const stepResult = {
executionTime: 100,
newState: { status: 'done', stepCount: 4 },
stepIndex: 4,
};
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 3 });
await coordinatorWithResolver.saveStepResult('op-2', stepResult as any);
expect(resolver).toHaveBeenCalledWith(stepResult.newState);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId: 'op-2',
reason: 'done',
stepIndex: 4,
uiMessages,
});
});
it('publishes with uiMessages=undefined when resolver rejects (must never fail the surrounding save)', async () => {
const resolver = vi.fn().mockRejectedValue(new Error('db down'));
const coordinatorWithResolver = new AgentRuntimeCoordinator({
stateManager: mockStateManager,
streamEventManager: mockStreamManager,
uiMessagesResolver: resolver,
});
const previousState = { status: 'running', stepCount: 3 };
const newState = { status: 'error', stepCount: 5 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinatorWithResolver.saveAgentState('op-3', newState as any);
expect(resolver).toHaveBeenCalled();
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId: 'op-3',
reason: 'error',
stepIndex: 5,
uiMessages: undefined,
});
});
it('publishes with uiMessages=undefined when no resolver is wired (default constructor)', async () => {
// The default `coordinator` from the outer beforeEach is constructed
// without a resolver — proves the field is genuinely optional and
// legacy call sites stay unaffected.
const previousState = { status: 'running', stepCount: 3 };
const newState = { status: 'done', stepCount: 5 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinator.saveAgentState('op-4', newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId: 'op-4',
reason: 'done',
stepIndex: 5,
uiMessages: undefined,
});
});
});
});
@@ -131,21 +131,27 @@ describe('GatewayStreamNotifier', () => {
it('delegates to inner and returns its result', async () => {
const finalState = { status: 'done' };
const result = await notifier.publishAgentRuntimeEnd('op-1', 2, finalState, 'completed');
const params = {
finalState,
operationId: 'op-1',
reason: 'completed',
stepIndex: 2,
};
const result = await notifier.publishAgentRuntimeEnd(params);
expect(result).toBe('publishAgentRuntimeEnd-result');
expect(inner.calls.publishAgentRuntimeEnd).toHaveLength(1);
expect(inner.calls.publishAgentRuntimeEnd[0]).toEqual([
'op-1',
2,
finalState,
'completed',
undefined,
]);
expect(inner.calls.publishAgentRuntimeEnd[0]).toEqual([params]);
});
it('calls gateway push-event endpoint only (no update-status)', async () => {
await notifier.publishAgentRuntimeEnd('op-1', 2, {}, 'completed', 'All done');
await notifier.publishAgentRuntimeEnd({
finalState: {},
operationId: 'op-1',
reason: 'completed',
reasonDetail: 'All done',
stepIndex: 2,
});
await new Promise((r) => setTimeout(r, 50));
@@ -163,7 +169,12 @@ describe('GatewayStreamNotifier', () => {
},
};
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
await notifier.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'error',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
@@ -176,7 +187,13 @@ describe('GatewayStreamNotifier', () => {
error: { message: 'Some error' },
};
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error', 'Custom detail');
await notifier.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'error',
reasonDetail: 'Custom detail',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
@@ -189,7 +206,12 @@ describe('GatewayStreamNotifier', () => {
error: { message: 'Budget exceeded', type: 'InsufficientBudgetForModel' },
};
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
await notifier.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'error',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
@@ -205,7 +227,12 @@ describe('GatewayStreamNotifier', () => {
},
};
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
await notifier.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'error',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
@@ -214,13 +241,49 @@ describe('GatewayStreamNotifier', () => {
});
it('errorType is undefined when no error in finalState', async () => {
await notifier.publishAgentRuntimeEnd('op-1', 0, { status: 'done' }, 'completed');
await notifier.publishAgentRuntimeEnd({
finalState: { status: 'done' },
operationId: 'op-1',
reason: 'completed',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
const body = JSON.parse(pushCall![1].body);
expect(body.event.data.errorType).toBeUndefined();
});
it('forwards uiMessages to the gateway push payload when provided', async () => {
const uiMessages = [{ id: 'msg_z', role: 'assistantGroup' }] as any;
await notifier.publishAgentRuntimeEnd({
finalState: { status: 'done' },
operationId: 'op-1',
reason: 'completed',
stepIndex: 4,
uiMessages,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
const body = JSON.parse(pushCall![1].body);
expect(body.event.data.uiMessages).toEqual(uiMessages);
});
it('omits uiMessages from the gateway push payload when not provided', async () => {
await notifier.publishAgentRuntimeEnd({
finalState: { status: 'done' },
operationId: 'op-1',
reason: 'completed',
stepIndex: 4,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
const body = JSON.parse(pushCall![1].body);
expect(body.event.data).not.toHaveProperty('uiMessages');
});
});
// ─── Read/subscribe methods: must delegate directly to inner ───
@@ -306,7 +369,12 @@ describe('GatewayStreamNotifier', () => {
() => new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), 10)),
);
const result = await notifier.publishAgentRuntimeEnd('op-1', 0, {}, 'completed');
const result = await notifier.publishAgentRuntimeEnd({
finalState: {},
operationId: 'op-1',
reason: 'completed',
stepIndex: 0,
});
expect(result).toBe('publishAgentRuntimeEnd-result');
expect(inner.calls.publishAgentRuntimeEnd).toHaveLength(1);
@@ -157,4 +157,46 @@ describe('InMemoryStreamEventManager', () => {
expect(manager.getAllEvents('op-1')).toHaveLength(0);
});
});
// agent_runtime_end optionally carries the canonical UIChatMessage[]
// snapshot so the client can use the pushed payload as Source of Truth
// instead of refetching from DB.
describe('publishAgentRuntimeEnd uiMessages', () => {
it('includes uiMessages in event data when provided', async () => {
const uiMessages = [
{ id: 'msg_u', role: 'user' },
{ id: 'msg_a', role: 'assistantGroup' },
] as any[];
const finalState = { status: 'done', stepCount: 3 };
await manager.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'done',
stepIndex: 3,
uiMessages,
});
const events = manager.getAllEvents('op-1');
expect(events).toHaveLength(1);
expect(events[0].type).toBe('agent_runtime_end');
expect(events[0].data.uiMessages).toEqual(uiMessages);
expect(events[0].data.finalState).toEqual(finalState);
});
it('omits uiMessages when not provided (legacy callers stay unaffected)', async () => {
const finalState = { status: 'done', stepCount: 3 };
await manager.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'done',
stepIndex: 3,
});
const events = manager.getAllEvents('op-1');
expect(events[0].data).not.toHaveProperty('uiMessages');
expect(events[0].data.finalState).toEqual(finalState);
});
});
});
@@ -75,7 +75,11 @@ describe('StreamEventManager', () => {
mockRedis.xadd.mockResolvedValue('event-id-456');
const result = await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState);
const result = await streamManager.publishAgentRuntimeEnd({
finalState,
operationId,
stepIndex,
});
expect(result).toBe('event-id-456');
expect(mockRedis.xadd).toHaveBeenCalledWith(
@@ -103,6 +107,50 @@ describe('StreamEventManager', () => {
);
});
// agent_runtime_end optionally carries the canonical UIChatMessage[]
// snapshot so the client can use the pushed payload as Source of Truth
// instead of refetching from DB.
it('should include uiMessages in serialized data when provided', async () => {
const operationId = 'test-operation-id';
const stepIndex = 5;
const finalState = { status: 'done', stepCount: 5 };
const uiMessages = [{ id: 'msg_a', role: 'assistantGroup' }] as any;
mockRedis.xadd.mockResolvedValue('event-id-ui');
await streamManager.publishAgentRuntimeEnd({
finalState,
operationId,
reason: 'done',
stepIndex,
uiMessages,
});
// Find the serialized `data` argument inline so this test stays robust
// if other positional args shift around.
const dataArg = mockRedis.xadd.mock.calls[0]?.find(
(a: any) => typeof a === 'string' && a.startsWith('{'),
);
const parsed = JSON.parse(dataArg);
expect(parsed.uiMessages).toEqual(uiMessages);
expect(parsed.finalState).toEqual(finalState);
});
it('should omit uiMessages from serialized data when not provided', async () => {
const operationId = 'test-operation-id';
const finalState = { status: 'done', stepCount: 3 };
mockRedis.xadd.mockResolvedValue('event-id-noui');
await streamManager.publishAgentRuntimeEnd({ finalState, operationId, stepIndex: 3 });
const dataArg = mockRedis.xadd.mock.calls[0]?.find(
(a: any) => typeof a === 'string' && a.startsWith('{'),
);
const parsed = JSON.parse(dataArg);
expect(parsed).not.toHaveProperty('uiMessages');
});
it('should accept custom reason and reasonDetail', async () => {
const operationId = 'test-operation-id';
const stepIndex = 3;
@@ -112,13 +160,13 @@ describe('StreamEventManager', () => {
mockRedis.xadd.mockResolvedValue('event-id-789');
await streamManager.publishAgentRuntimeEnd(
operationId,
stepIndex,
await streamManager.publishAgentRuntimeEnd({
finalState,
operationId,
reason,
reasonDetail,
);
stepIndex,
});
expect(mockRedis.xadd).toHaveBeenCalledWith(
expect.any(String),
@@ -158,7 +206,12 @@ describe('StreamEventManager', () => {
mockRedis.xadd.mockResolvedValue('event-id-790');
await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState, 'error');
await streamManager.publishAgentRuntimeEnd({
finalState,
operationId,
reason: 'error',
stepIndex,
});
expect(mockRedis.xadd).toHaveBeenCalledWith(
expect.any(String),
+22 -8
View File
@@ -1,9 +1,24 @@
import type { ToolExecuteData } from '@lobechat/agent-gateway-client';
import { type AgentState } from '@lobechat/agent-runtime';
import { type UIChatMessage } from '@lobechat/types';
import { type AgentOperationMetadata, type StepResult } from './AgentStateManager';
import { type StreamChunkData, type StreamEvent } from './StreamEventManager';
export interface PublishAgentRuntimeEndParams {
finalState: any;
operationId: string;
reason?: string;
reasonDetail?: string;
stepIndex: number;
/**
* Canonical UIChatMessage[] snapshot of the topic at terminal-state time.
* When present, the client uses this directly as Source of Truth instead
* of refetching from DB.
*/
uiMessages?: UIChatMessage[];
}
/**
* Agent State Manager Interface
* Abstract interface for state persistence, supports Redis and in-memory implementations
@@ -114,15 +129,14 @@ export interface IStreamEventManager {
getStreamHistory: (operationId: string, count?: number) => Promise<StreamEvent[]>;
/**
* Publish Agent runtime end event
* Publish Agent runtime end event.
*
* `uiMessages` is the canonical UIChatMessage[] snapshot of the topic at
* terminal-state time so the client can use the pushed payload as Source
* of Truth instead of refetching from DB. Optional: callers without DB
* access may omit it and the client falls back to its existing behaviour.
*/
publishAgentRuntimeEnd: (
operationId: string,
stepIndex: number,
finalState: any,
reason?: string,
reasonDetail?: string,
) => Promise<string>;
publishAgentRuntimeEnd: (params: PublishAgentRuntimeEndParams) => Promise<string>;
/**
* Publish Agent runtime initialization event
+7 -7
View File
@@ -132,13 +132,13 @@ export const agentNotifyRouter = router({
const stream = getStreamManager();
if (done) {
// Signal task completion — frontend gateway WS subscription closes.
await stream.publishAgentRuntimeEnd(
remoteOperationId,
0,
{ reason: 'success' },
'success',
'Remote hetero agent task completed',
);
await stream.publishAgentRuntimeEnd({
finalState: { reason: 'success' },
operationId: remoteOperationId,
reason: 'success',
reasonDetail: 'Remote hetero agent task completed',
stepIndex: 0,
});
} else {
// Lightweight invalidation — frontend calls fetchAndReplaceMessages.
await stream.publishStreamEvent(remoteOperationId, {
@@ -1478,4 +1478,55 @@ describe('AgentRuntimeService', () => {
expect(mockCoordinator.saveAgentState).not.toHaveBeenCalled();
});
});
// Stream events at step / operation boundaries should carry the canonical
// UIChatMessage[] snapshot so the client can use the pushed payload as
// Source of Truth instead of refetching from DB.
describe('queryUiMessages', () => {
const stubMessageService = (svc: any, queryMessages: ReturnType<typeof vi.fn>) => {
svc.messageServiceInstance = { queryMessages };
};
it('returns messageService.queryMessages result when agentId + topicId are present', async () => {
const stubMessages = [{ id: 'msg_x', role: 'user' }];
const queryMessages = vi.fn().mockResolvedValue(stubMessages);
stubMessageService(service, queryMessages);
const result = await service.queryUiMessages({
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
} as any);
expect(queryMessages).toHaveBeenCalledWith({ agentId: 'agt_1', topicId: 'tpc_1' });
expect(result).toEqual(stubMessages);
});
it('returns undefined when agentId or topicId is missing (skips empty-array push)', async () => {
const queryMessages = vi.fn();
stubMessageService(service, queryMessages);
const noAgent = await service.queryUiMessages({
metadata: { topicId: 'tpc_1' },
} as any);
const noTopic = await service.queryUiMessages({
metadata: { agentId: 'agt_1' },
} as any);
const noMeta = await service.queryUiMessages({} as any);
expect(noAgent).toBeUndefined();
expect(noTopic).toBeUndefined();
expect(noMeta).toBeUndefined();
expect(queryMessages).not.toHaveBeenCalled();
});
it('returns undefined and never throws when DB query fails (stream events must not fail the step)', async () => {
const queryMessages = vi.fn().mockRejectedValue(new Error('db down'));
stubMessageService(service, queryMessages);
const result = await service.queryUiMessages({
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
} as any);
expect(result).toBeUndefined();
});
});
});
@@ -13,6 +13,7 @@ import {
ChatErrorType,
type ChatMessageError,
type ExecSubAgentTaskParams,
type UIChatMessage,
} from '@lobechat/types';
import debug from 'debug';
import urlJoin from 'url-join';
@@ -30,6 +31,7 @@ import { type IStreamEventManager } from '@/server/modules/AgentRuntime/types';
import { emitAgentSignalSourceEvent } from '@/server/services/agentSignal';
import { toAgentSignalTraceEvents } from '@/server/services/agentSignal/observability/traceEvents';
import { mcpService } from '@/server/services/mcp';
import { MessageService } from '@/server/services/message';
import { QueueService } from '@/server/services/queue';
import { LocalQueueServiceImpl } from '@/server/services/queue/impls';
import { ToolExecutionService } from '@/server/services/toolExecution';
@@ -182,6 +184,17 @@ export class AgentRuntimeService {
private serverDB: LobeChatDatabase;
private userId: string;
private messageModel: MessageModel;
// Lazily constructed because MessageService instantiates a FileService
// which eagerly creates the S3 client and throws when S3 env vars are
// missing — eager construction would break every test that builds an
// AgentRuntimeService without mocking the file backend.
private messageServiceInstance?: MessageService;
private get messageService(): MessageService {
if (!this.messageServiceInstance) {
this.messageServiceInstance = new MessageService(this.serverDB, this.userId);
}
return this.messageServiceInstance;
}
constructor(db: LobeChatDatabase, userId: string, options?: AgentRuntimeServiceOptions) {
// Use factory function to auto-select Redis or InMemory implementation
@@ -192,6 +205,10 @@ export class AgentRuntimeService {
this.coordinator = new AgentRuntimeCoordinator({
...options?.coordinatorOptions,
streamEventManager: this.streamManager,
// Provide the canonical UIChatMessage[] for terminal-state events so
// the client can use the pushed payload directly instead of refetching
// from DB. Falls back gracefully when topicId isn't set.
uiMessagesResolver: (state) => this.queryUiMessages(state),
});
this.queueService =
options?.queueService === null ? null : (options?.queueService ?? new QueueService());
@@ -474,6 +491,31 @@ export class AgentRuntimeService {
}
}
/**
* Query the canonical UIChatMessage[] snapshot for the active topic the
* same shape the `message.getMessages` trpc lambda returns to the client.
* Attached to step_start / agent_runtime_end stream events so the client
* can use the pushed payload directly instead of refetching from DB.
*
* Returns undefined when the topic isn't known yet (e.g. very early in
* bootstrap before the topic row has been committed) so callers can skip
* the `uiMessages` field entirely instead of pushing an empty array.
*/
async queryUiMessages(agentState: AgentState): Promise<UIChatMessage[] | undefined> {
const agentId: string | undefined = agentState?.metadata?.agentId;
const topicId: string | undefined = agentState?.metadata?.topicId;
if (!agentId || !topicId) return undefined;
try {
return await this.messageService.queryMessages({ agentId, topicId });
} catch (error) {
// Stream events must never fail the step. If the DB hiccups, fall back
// to letting the client refresh as before.
console.error('[queryUiMessages] Failed to load uiMessages snapshot: %O', error);
return undefined;
}
}
/**
* Execute Agent step
*/
@@ -515,20 +557,27 @@ export class AgentRuntimeService {
try {
log('[%s][%d] Start step executing...', operationId, stepIndex);
// Publish step start event
await this.streamManager.publishStreamEvent(operationId, {
data: {},
stepIndex,
type: 'step_start',
});
// Get operation state and metadata
// Load agent state BEFORE publishing step_start so we can attach the
// canonical UIChatMessage snapshot to the event payload. step_start
// fires after the previous step's DB writes are awaited durable, so
// the snapshot query here reflects strongly-consistent state — that's
// the contract that lets the client treat the pushed uiMessages as
// the source of truth instead of doing its own refetch.
const agentState = await this.coordinator.loadAgentState(operationId);
if (!agentState) {
throw new Error(`Agent state not found for operation ${operationId}`);
}
const stepStartUiMessages = await this.queryUiMessages(agentState);
await this.streamManager.publishStreamEvent(operationId, {
data: {
...(stepStartUiMessages !== undefined && { uiMessages: stepStartUiMessages }),
},
stepIndex,
type: 'step_start',
});
agentState.metadata = {
...agentState.metadata,
externalRetryCount,
@@ -622,6 +671,14 @@ export class AgentRuntimeService {
log('[%s] beforeStep hook dispatch error: %O', operationId, hookError);
}
// Per-step buffer for context engine input/output. Populated by the
// `tracingContextEngine` callback passed into the executor context;
// consumed by traceRecorder.appendStep below. Routing CE this way keeps
// its heavy payload (agentDocuments, systemRole, …) out of
// `stepResult.events` and therefore out of the Redis state pipeline.
// See LOBE-9110.
let contextEnginePayload: { input: unknown; output: unknown } | undefined;
// Create Agent and Runtime instances
// Use agentState.metadata which contains the full app context (topicId, agentId, etc.)
// operationMetadata only contains basic fields (agentConfig, modelRuntimeConfig, userId)
@@ -629,6 +686,9 @@ export class AgentRuntimeService {
metadata: agentState?.metadata,
operationId,
stepIndex,
tracingContextEngine: (input, output) => {
contextEnginePayload = { input, output };
},
});
// Handle human intervention
@@ -800,6 +860,7 @@ export class AgentRuntimeService {
afterStepSignalEvents,
agentState,
beforeStepSignalEvents,
contextEngine: contextEnginePayload,
currentContext,
externalRetryCount,
presentation: stepPresentationData,
@@ -1337,10 +1398,12 @@ export class AgentRuntimeService {
metadata,
operationId,
stepIndex,
tracingContextEngine,
}: {
metadata?: any;
operationId: string;
stepIndex: number;
tracingContextEngine?: (input: unknown, output: unknown) => void;
}) {
const contextWindowTokens =
metadata?.modelRuntimeConfig?.model && metadata?.modelRuntimeConfig?.provider
@@ -1386,6 +1449,7 @@ export class AgentRuntimeService {
streamManager: this.streamManager,
toolExecutionService: this.toolExecutionService,
topicId: metadata?.topicId,
tracingContextEngine,
userId: metadata?.userId,
};
@@ -15,6 +15,13 @@ export interface AppendStepParams {
*/
agentState: any;
beforeStepSignalEvents: SignalEvent[];
/**
* Context engine input/output captured for this step. Delivered via
* `RuntimeExecutorContext.tracingContextEngine` rather than through the
* `events` array, so CE payloads (agentDocuments, systemRole, ) stay out
* of the Redis state pipeline. See LOBE-9110.
*/
contextEngine?: { input?: unknown; output?: unknown };
currentContext?: { payload?: unknown; phase?: string; stepContext?: unknown };
externalRetryCount: number;
presentation: StepPresentationData;
@@ -220,6 +227,7 @@ export class OperationTraceRecorder {
agentState,
afterStepSignalEvents,
beforeStepSignalEvents,
contextEngine: ceInput,
currentContext,
externalRetryCount,
presentation,
@@ -237,21 +245,20 @@ export class OperationTraceRecorder {
const isBaseline = stepIndex === 0 || isCompression;
const messagesDelta = afterMessages.slice(prevMessages.length);
// Extract context_engine_result into contextEngine (dedicated typed field).
// CE data is structural state, not a streaming event — it lives separately
// from events and uses the same delta pattern as messagesBaseline/messagesDelta.
const rawEvents = (stepResult.events as any[]) ?? [];
const ceEvent = rawEvents.find((e: any) => e.type === 'context_engine_result') as any;
const contextEngine: StepSnapshot['contextEngine'] = ceEvent
? { input: ceEvent.input, output: ceEvent.output }
// CE data is structural state, not a streaming event — delivered via the
// typed `contextEngine` field on AppendStepParams (sourced from
// RuntimeExecutorContext.tracingContextEngine). Uses the same delta
// pattern as messagesBaseline/messagesDelta.
const contextEngine: StepSnapshot['contextEngine'] = ceInput
? { input: ceInput.input, output: ceInput.output }
: undefined;
// Strip heavy/redundant data from events before persisting to snapshot.
// context_engine_result is excluded — stored in contextEngine instead.
const rawEvents = (stepResult.events as any[]) ?? [];
const snapshotEvents = [
...beforeStepSignalEvents,
...rawEvents
.filter((e) => e.type !== 'llm_stream' && e.type !== 'context_engine_result')
.filter((e) => e.type !== 'llm_stream')
.map((e) => {
if (e.type === 'done' && e.finalState) {
// Remove reconstructible fields from finalState:
@@ -358,26 +358,29 @@ describe('OperationTraceRecorder', () => {
recorder = new OperationTraceRecorder(store as any);
});
const buildCeEvent = (overrides: Record<string, unknown> = {}) => ({
const buildCe = (overrides: { input?: unknown; output?: unknown } = {}) => ({
input: { messages: ['hello'] },
output: { tokens: 42 },
type: 'context_engine_result',
...overrides,
});
const appendStepWithCe = (ceEvent: Record<string, unknown>, prevStepsInStore: any[]) => {
const appendStepWithCe = (
ce: { input?: unknown; output?: unknown },
prevStepsInStore: any[],
) => {
store.loadPartial.mockResolvedValue({ startedAt: 1, steps: prevStepsInStore });
return recorder.appendStep('op-ce', {
afterStepSignalEvents: [],
agentState: { messages: [] },
beforeStepSignalEvents: [],
contextEngine: ce,
currentContext: { phase: 'user_input' },
externalRetryCount: 0,
presentation: buildPresentation(),
startedAt: 1000,
stepIndex: prevStepsInStore.length,
stepResult: {
events: [ceEvent],
events: [],
newState: { activatedStepTools: [], messages: [] },
},
});
@@ -388,8 +391,8 @@ describe('OperationTraceRecorder', () => {
return saved.steps.find((s: any) => s.stepIndex === newStepIndex);
};
it('extracts CE event into contextEngine, not in events', async () => {
await appendStepWithCe(buildCeEvent(), []);
it('routes CE input/output into contextEngine, never into events', async () => {
await appendStepWithCe(buildCe(), []);
const step = getSavedStep(0);
// CE data lives in contextEngine, not events
@@ -398,7 +401,7 @@ describe('OperationTraceRecorder', () => {
});
it('keeps both input and output on the first step (no previous CE to compare against)', async () => {
await appendStepWithCe(buildCeEvent(), []);
await appendStepWithCe(buildCe(), []);
const step = getSavedStep(0);
expect(step.contextEngine.input).toEqual({ messages: ['hello'] });
@@ -411,7 +414,7 @@ describe('OperationTraceRecorder', () => {
stepIndex: 0,
stepType: 'call_llm',
};
await appendStepWithCe(buildCeEvent(), [prevStep]);
await appendStepWithCe(buildCe(), [prevStep]);
const step = getSavedStep(1);
expect(step.contextEngine.input).toBeUndefined();
@@ -424,10 +427,9 @@ describe('OperationTraceRecorder', () => {
stepIndex: 0,
stepType: 'call_llm',
};
await appendStepWithCe(
buildCeEvent({ input: { messages: ['hello'] }, output: { tokens: 99 } }),
[prevStep],
);
await appendStepWithCe(buildCe({ input: { messages: ['hello'] }, output: { tokens: 99 } }), [
prevStep,
]);
const step = getSavedStep(1);
expect(step.contextEngine.input).toBeUndefined();
@@ -440,10 +442,9 @@ describe('OperationTraceRecorder', () => {
stepIndex: 0,
stepType: 'call_llm',
};
await appendStepWithCe(
buildCeEvent({ input: { messages: ['world'] }, output: { tokens: 42 } }),
[prevStep],
);
await appendStepWithCe(buildCe({ input: { messages: ['world'] }, output: { tokens: 42 } }), [
prevStep,
]);
const step = getSavedStep(1);
expect(step.contextEngine.input).toEqual({ messages: ['world'] });
@@ -456,10 +457,9 @@ describe('OperationTraceRecorder', () => {
stepIndex: 0,
stepType: 'call_llm',
};
await appendStepWithCe(
buildCeEvent({ input: { messages: ['new'] }, output: { tokens: 20 } }),
[prevStep],
);
await appendStepWithCe(buildCe({ input: { messages: ['new'] }, output: { tokens: 20 } }), [
prevStep,
]);
const step = getSavedStep(1);
expect(step.contextEngine.input).toEqual({ messages: ['new'] });
@@ -476,7 +476,7 @@ describe('OperationTraceRecorder', () => {
// step 1 has no contextEngine — dedup must skip it and walk back to step 0
{ stepIndex: 1, stepType: 'call_tool' },
];
await appendStepWithCe(buildCeEvent(), prevSteps);
await appendStepWithCe(buildCe(), prevSteps);
const step = getSavedStep(2);
expect(step.contextEngine.input).toBeUndefined();
@@ -492,7 +492,7 @@ describe('OperationTraceRecorder', () => {
{ contextEngine: { output: { tokens: 42 } }, stepIndex: 1, stepType: 'call_llm' },
];
await appendStepWithCe(
buildCeEvent({ input: { messages: ['hello'] }, output: { tokens: 42 } }),
buildCe({ input: { messages: ['hello'] }, output: { tokens: 42 } }),
prevSteps,
);
@@ -501,7 +501,7 @@ describe('OperationTraceRecorder', () => {
expect(step.contextEngine.output).toBeUndefined();
});
it('sets contextEngine to undefined when the step has no context_engine_result event', async () => {
it('sets contextEngine to undefined when no CE was recorded for the step', async () => {
const prevStep = {
contextEngine: { input: { messages: ['hello'] }, output: { tokens: 42 } },
stepIndex: 0,
@@ -898,3 +898,82 @@ describe('AgentRuntimeService.executeStep - error-path snapshot finalize ()', ()
dispatchSpy.mockRestore();
});
});
// step_start event should carry the canonical UIChatMessage[] so the
// client can use the pushed payload as Source of Truth.
describe('AgentRuntimeService.executeStep - step_start uiMessages payload', () => {
const createService = () => {
return new AgentRuntimeService({} as any, 'user-1', { queueService: null });
};
it('attaches uiMessages to step_start data when topic context is known', async () => {
const service = createService();
const coordinator = (service as any).coordinator;
const streamManager = (service as any).streamManager;
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
// Force early-exit path so we don't need to mock the entire runtime
// execution surface — terminal-state short-circuits right after
// step_start publishes.
coordinator.loadAgentState = vi.fn().mockResolvedValue({
status: 'done',
stepCount: 3,
lastModified: new Date().toISOString(),
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
});
streamManager.publishStreamEvent = vi.fn().mockResolvedValue(undefined);
// Inject a uiMessages-returning messageService — the runtime queries
// through MessageService (not the bare messageModel) so that file URLs
// go through FileService postProcessUrl.
const stubMessages = [{ id: 'msg_1', role: 'user' }];
(service as any).messageServiceInstance = {
queryMessages: vi.fn().mockResolvedValue(stubMessages),
};
await service.executeStep({
operationId: 'op-uimsg',
stepIndex: 5,
context: { phase: 'user_input' } as any,
});
// First publish call is step_start; assert its payload carries uiMessages.
const stepStartCall = streamManager.publishStreamEvent.mock.calls.find(
([, evt]: any) => evt?.type === 'step_start',
);
expect(stepStartCall).toBeDefined();
expect(stepStartCall[1].data.uiMessages).toEqual(stubMessages);
});
it('omits uiMessages from step_start data when topic context is unknown', async () => {
const service = createService();
const coordinator = (service as any).coordinator;
const streamManager = (service as any).streamManager;
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
coordinator.loadAgentState = vi.fn().mockResolvedValue({
status: 'done',
stepCount: 3,
lastModified: new Date().toISOString(),
metadata: {}, // no agentId/topicId
});
streamManager.publishStreamEvent = vi.fn().mockResolvedValue(undefined);
const queryMock = vi.fn();
(service as any).messageServiceInstance = { queryMessages: queryMock };
await service.executeStep({
operationId: 'op-noctx',
stepIndex: 5,
context: { phase: 'user_input' } as any,
});
const stepStartCall = streamManager.publishStreamEvent.mock.calls.find(
([, evt]: any) => evt?.type === 'step_start',
);
expect(stepStartCall).toBeDefined();
expect(stepStartCall[1].data).not.toHaveProperty('uiMessages');
// Did not even attempt the DB query when context is missing.
expect(queryMock).not.toHaveBeenCalled();
});
});
@@ -256,7 +256,11 @@ describe('AgentRuntimeService.executeSync', () => {
// Publish event after a small delay
setTimeout(async () => {
await streamEventManager.publishAgentRuntimeEnd(operationId, 1, { status: 'done' });
await streamEventManager.publishAgentRuntimeEnd({
finalState: { status: 'done' },
operationId,
stepIndex: 1,
});
}, 10);
const event = await streamEventManager.waitForEvent(operationId, 'agent_runtime_end', 1000);
+7 -1
View File
@@ -873,7 +873,13 @@ export class AiAgentService {
if (!result.success) {
log('execAgent: remote hetero dispatch failed: %s', result.error);
await streamManager
.publishAgentRuntimeEnd(operationId, 0, { error: result.error }, 'error', result.error)
.publishAgentRuntimeEnd({
finalState: { error: result.error },
operationId,
reason: 'error',
reasonDetail: result.error,
stepIndex: 0,
})
.catch(() => {});
await this.messageModel.update(assistantMsg.id, {
content: '',
+13
View File
@@ -2,6 +2,7 @@ import { type LobeChatDatabase } from '@lobechat/database';
import { CompressionRepository } from '@lobechat/database';
import {
type CreateMessageParams,
type QueryMessageParams,
type UIChatMessage,
type UpdateMessageParams,
} from '@lobechat/types';
@@ -111,6 +112,18 @@ export class MessageService {
return { messages, success: true };
}
/**
* Fetch the canonical message list for an agent / topic scope, using the
* standard UIChatMessage shape (file URLs resolved through FileService).
*
* Mirrors the read path exposed by the `message.getMessages` trpc lambda
* so server-internal callers (e.g. agent runtime stream events) can push
* the same payload the client would otherwise fetch.
*/
async queryMessages(params: QueryMessageParams): Promise<UIChatMessage[]> {
return this.messageModel.query(params, this.getQueryOptions());
}
/**
* Create a new message and return the complete message list
* Pattern: create + query
@@ -360,6 +360,50 @@ describe('agentSelectors', () => {
});
});
describe('canCurrentAgentPublishToCommunity', () => {
it('should allow publishing normal agents', () => {
const state = createState({
activeAgentId: 'agent-1',
agentMap: { 'agent-1': { id: 'agent-1' } },
});
expect(agentSelectors.canCurrentAgentPublishToCommunity(state)).toBe(true);
});
it('should prevent publishing local heterogeneous agents', () => {
const state = createState({
activeAgentId: 'agent-1',
agentMap: {
'agent-1': {
agencyConfig: {
heterogeneousProvider: { command: 'codex', type: 'codex' },
},
id: 'agent-1',
},
},
});
expect(agentSelectors.canCurrentAgentPublishToCommunity(state)).toBe(false);
});
it('should prevent publishing platform agents', () => {
const state = createState({
activeAgentId: 'agent-1',
agentMap: {
'agent-1': {
agencyConfig: {
boundDeviceId: 'device-1',
heterogeneousProvider: { type: 'openclaw' },
},
id: 'agent-1',
},
},
});
expect(agentSelectors.canCurrentAgentPublishToCommunity(state)).toBe(false);
});
});
describe('currentKnowledgeIds', () => {
it('should return enabled file and knowledge base IDs', () => {
const state = createState({
+4
View File
@@ -286,6 +286,9 @@ const isCurrentAgentExternal = (s: AgentStoreState): boolean => !currentAgentDat
const isCurrentAgentHeterogeneous = (s: AgentStoreState): boolean =>
!!currentAgentConfig(s)?.agencyConfig?.heterogeneousProvider;
const canCurrentAgentPublishToCommunity = (s: AgentStoreState): boolean =>
!!currentAgentData(s) && !isCurrentAgentHeterogeneous(s);
const currentAgentHeterogeneousProviderType = (s: AgentStoreState) =>
currentAgentConfig(s)?.agencyConfig?.heterogeneousProvider?.type;
@@ -293,6 +296,7 @@ const getAgentDocumentsById = (agentId: string) => (s: AgentStoreState) =>
s.agentDocumentsMap[agentId];
export const agentSelectors = {
canCurrentAgentPublishToCommunity,
currentAgentHeterogeneousProviderType,
currentAgentAvatar,
currentAgentBackgroundColor,
@@ -378,6 +378,139 @@ describe('runAgent actions', () => {
}),
);
});
it('replaces messages from server-pushed uiMessages snapshot (SoT)', async () => {
const replaceMessages = vi.fn();
act(() => {
useChatStore.setState({
replaceMessages,
operations: {
[TEST_IDS.OPERATION_ID]: {
abortController: new AbortController(),
context: { agentId: 'agent-1', topicId: 'topic-1' },
id: TEST_IDS.OPERATION_ID,
metadata: { lastEventId: '0', startTime: Date.now(), stepCount: 0 },
status: 'running',
type: 'groupAgentGenerate',
},
},
});
});
const { result } = renderHook(() => useChatStore());
const context = createStreamingContext({ assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID });
const uiMessages = [{ id: 'msg_a', role: 'user' }] as any;
const event: StreamEvent = {
type: 'step_start',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
data: { phase: 'tool_execution', uiMessages },
};
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
event,
context,
);
});
expect(replaceMessages).toHaveBeenCalledWith(uiMessages, {
context: { agentId: 'agent-1', topicId: 'topic-1' },
});
});
it('does not call replaceMessages when uiMessages absent on step_start', async () => {
const replaceMessages = vi.fn();
act(() => {
useChatStore.setState({ replaceMessages });
});
const { result } = renderHook(() => useChatStore());
const context = createStreamingContext({ assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID });
const event: StreamEvent = {
type: 'step_start',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
data: { phase: 'tool_execution' }, // no uiMessages
};
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
event,
context,
);
});
expect(replaceMessages).not.toHaveBeenCalled();
});
});
describe('agent_runtime_end event', () => {
it('replaces messages from terminal uiMessages snapshot (final-step SoT)', async () => {
const replaceMessages = vi.fn();
act(() => {
useChatStore.setState({
replaceMessages,
operations: {
[TEST_IDS.OPERATION_ID]: {
abortController: new AbortController(),
context: { agentId: 'agent-1', topicId: 'topic-1' },
id: TEST_IDS.OPERATION_ID,
metadata: { lastEventId: '0', startTime: Date.now(), stepCount: 0 },
status: 'running',
type: 'groupAgentGenerate',
},
},
});
});
const { result } = renderHook(() => useChatStore());
const uiMessages = [{ id: 'msg_final', role: 'assistantGroup' }] as any;
const event: StreamEvent = {
type: 'agent_runtime_end',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
data: { finalState: { status: 'done' }, reason: 'done', uiMessages },
};
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
event,
createStreamingContext({ assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID }),
);
});
expect(replaceMessages).toHaveBeenCalledWith(uiMessages, {
context: { agentId: 'agent-1', topicId: 'topic-1' },
});
});
});
describe('step_complete event', () => {
// The previous DB-refetch on tool_execution was the source of the
// assistantGroup-clobber regression (LOBE-9501) — tool results are
// now reconciled via the next step_start's uiMessages snapshot.
it('does NOT refreshMessages on tool_execution phase', async () => {
const { result } = renderHook(() => useChatStore());
const event: StreamEvent = {
type: 'step_complete',
timestamp: Date.now(),
operationId: TEST_IDS.OPERATION_ID,
data: { executionTime: 10, phase: 'tool_execution', result: { ok: true } },
};
await act(async () => {
await result.current.internal_handleAgentStreamEvent(
TEST_IDS.OPERATION_ID,
event,
createStreamingContext({ assistantId: TEST_IDS.ASSISTANT_MESSAGE_ID }),
);
});
expect(result.current.refreshMessages).not.toHaveBeenCalled();
});
});
});
});
@@ -118,9 +118,19 @@ export class AgentActionImpl {
case 'agent_runtime_end': {
// Agent runtime finished - this is the definitive signal that generation is complete
const { reason, reasonDetail, finalState } = event.data || {};
const { reason, reasonDetail, finalState, uiMessages } = event.data || {};
log(`Agent runtime ended for ${assistantId}: reason=${reason}, detail=${reasonDetail}`);
// Server pushes the canonical UIChatMessage[] snapshot for the
// topic as the Source of Truth on terminal-state. The last step
// has no later step_start to carry a fresh snapshot, so without
// this branch the streamed assistantGroup would only be reconciled
// with DB once a refetch fires — losing the SoT guarantee.
if (Array.isArray(uiMessages)) {
log(`Replacing messages from agent_runtime_end uiMessages (${uiMessages.length} msgs)`);
this.#get().replaceMessages(uiMessages, { context: operation.context });
}
// Update operation metadata with final state
if (finalState) {
this.#get().updateOperationMetadata(operationId, {
@@ -276,7 +286,19 @@ export class AgentActionImpl {
}
case 'step_start': {
const { phase, toolCall, pendingToolsCalling, requiresApproval } = event.data || {};
const { phase, toolCall, pendingToolsCalling, requiresApproval, uiMessages } =
event.data || {};
// Server attaches the canonical UIChatMessage[] snapshot to
// step_start so the client uses the pushed payload as Source of
// Truth instead of refetching from DB (the DB fan-out from the
// previous step's stream chunks is async — a refetch here would
// return a stale assistant placeholder that clobbers the
// streamed assistantGroup).
if (Array.isArray(uiMessages)) {
log(`Replacing messages from step_start uiMessages (${uiMessages.length} msgs)`);
this.#get().replaceMessages(uiMessages, { context: operation.context });
}
if (phase === 'human_approval' && requiresApproval) {
// Requires human approval
@@ -301,8 +323,10 @@ export class AgentActionImpl {
if (phase === 'tool_execution' && result) {
log(`Tool execution completed for ${assistantId} in ${executionTime}ms:`, result);
// Refresh messages to display tool results
await this.#get().refreshMessages();
// Tool results are reconciled via the canonical uiMessages
// snapshot the server pushes on the next step_start; no need
// to refetch from DB here (the refetch was the source of the
// assistantGroup-clobber regression that LOBE-9501 fixes).
} else if (phase === 'execution_complete' && finalState) {
// Agent execution complete
log(`Agent execution completed for ${assistantId}:`, finalState);
@@ -82,7 +82,7 @@ describe('createGatewayEventHandler', () => {
});
describe('stream_start', () => {
it('should associate new message with operation', async () => {
it('should associate new message with operation and skip the DB refetch (LOBE-9501)', async () => {
const store = createMockStore();
const handler = createHandler(store);
@@ -90,7 +90,12 @@ describe('createGatewayEventHandler', () => {
await flush();
expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-step2', 'op-1');
expect(store.replaceMessages).toHaveBeenCalled();
// Native gateway streams carry the new assistant id directly + a SoT
// uiMessages snapshot on the preceding step_start, so stream_start must
// NOT trigger a DB refetch (the refetch is what clobbered the streamed
// assistantGroup with a stale placeholder).
expect(messageService.getMessages).not.toHaveBeenCalled();
expect(store.replaceMessages).not.toHaveBeenCalled();
expect(emitClientAgentSignalSourceEvent).toHaveBeenCalledWith(
expect.objectContaining({
payload: expect.objectContaining({
@@ -843,17 +848,14 @@ describe('createGatewayEventHandler', () => {
});
describe('sequential processing', () => {
it('should process stream_chunk only after stream_start refresh completes', async () => {
it('should dispatch stream_chunk to the new assistant id after stream_start switches it', async () => {
// Native gateway streams no longer await a DB fetch on stream_start
// (LOBE-9501) — but stream_chunk must still queue behind stream_start
// so the chunk targets the NEW assistant id (from stream_start.data),
// not the previous one.
const store = createMockStore();
const callOrder: string[] = [];
const { messageService } = await import('@/services/message');
(messageService.getMessages as any).mockImplementation(async () => {
callOrder.push('refresh_start');
await new Promise((r) => setTimeout(r, 10));
callOrder.push('refresh_end');
return [];
});
store.internal_dispatchMessage.mockImplementation(() => {
callOrder.push('dispatch');
});
@@ -867,10 +869,34 @@ describe('createGatewayEventHandler', () => {
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Hello' }));
await flush();
const refreshEndIdx = callOrder.indexOf('refresh_end');
// associate (from stream_start) precedes dispatch (from stream_chunk)
const associateIdx = callOrder.indexOf('associate');
const dispatchIdx = callOrder.indexOf('dispatch');
expect(refreshEndIdx).toBeGreaterThan(-1);
expect(dispatchIdx).toBeGreaterThan(refreshEndIdx);
expect(associateIdx).toBeGreaterThan(-1);
expect(dispatchIdx).toBeGreaterThan(associateIdx);
// Chunk targets the new id, proving the queue ordering held
expect(store.internal_dispatchMessage).toHaveBeenLastCalledWith(
expect.objectContaining({ id: 'msg-new', value: { content: 'Hello' } }),
{ operationId: 'op-1' },
);
// And no DB refetch was issued for the native stream
expect(messageService.getMessages).not.toHaveBeenCalled();
});
it('should still fetch from DB on stream_start when assistantMessage id is absent (hetero CLI)', async () => {
// Hetero CLI adapters (Claude Code / Codex) never set
// `assistantMessage.id` on stream_start, so the DB read is still
// mandatory — it pulls the executor-created placeholder into
// `dbMessagesMap` so subsequent chunks have a target.
const store = createMockStore();
const handler = createHandler(store);
handler(makeEvent('stream_start', {}));
await flush();
expect(messageService.getMessages).toHaveBeenCalled();
expect(store.replaceMessages).toHaveBeenCalled();
});
});
@@ -901,18 +927,22 @@ describe('createGatewayEventHandler', () => {
// Loading stays active between steps — only tool streaming is cleared
expect(store.internal_toggleToolCallingStreaming).toHaveBeenCalledWith('msg-1', undefined);
// Tool execution
// Tool execution — tool_end still refreshes from DB to pick up the
// server-created tool message row.
handler(makeEvent('tool_start', { parentMessageId: 'msg-1', toolCalling: tools[0] }));
handler(makeEvent('tool_end', { isSuccess: true }));
await flush();
expect(store.replaceMessages).toHaveBeenCalled();
// Step 2: Next LLM call with new assistant message
// Step 2: Next LLM call with new assistant message — native stream_start
// carries the id directly, so it must NOT trigger a DB refetch
// (LOBE-9501). Only the association switch happens.
vi.clearAllMocks();
handler(makeEvent('stream_start', { assistantMessage: { id: 'msg-2' } }));
await flush();
expect(store.replaceMessages).toHaveBeenCalled();
expect(store.associateMessageWithOperation).toHaveBeenCalledWith('msg-2', 'op-1');
expect(messageService.getMessages).not.toHaveBeenCalled();
expect(store.replaceMessages).not.toHaveBeenCalled();
handler(makeEvent('stream_chunk', { chunkType: 'text', content: 'Here are the results.' }));
await flush();
@@ -7,7 +7,12 @@ import type {
ToolExecuteData,
ToolStartData,
} from '@lobechat/agent-gateway-client';
import type { BuiltinToolResult, ChatMessageError, ConversationContext } from '@lobechat/types';
import type {
BuiltinToolResult,
ChatMessageError,
ConversationContext,
UIChatMessage,
} from '@lobechat/types';
import { AgentRuntimeErrorType } from '@lobechat/types';
import { messageService } from '@/services/message';
@@ -276,24 +281,35 @@ export const createGatewayEventHandler = (
accumulatedContent = '';
accumulatedReasoning = '';
// Heterogeneous CLI adapters emit `stream_start { newStep: true }`
// without a server-side assistant id. Pull the freshly created step
// assistant from DB so subsequent live chunks update the RIGHT row
// instead of appending onto the previous step's assistant.
const messages = await fetchAndReplaceMessages(get, context).catch((error) => {
console.error(error);
return undefined;
});
// Skip the DB read ONLY for native gateway streams — those carry
// `assistantMessage.id` directly on stream_start AND the preceding
// `step_start` already carried the SoT uiMessages snapshot, so
// chunks have a valid target in `dbMessagesMap` already. Removing
// the await here is what un-blocks the enqueue chain so live
// chunks can land mid-stream (LOBE-9501).
//
// Hetero CLI adapters (Claude Code / Codex) never set
// `assistantMessage.id` on stream_start, so the DB read stays
// mandatory for them — it (a) pulls the executor-created
// placeholder into `dbMessagesMap` so subsequent chunks can
// dispatch to it, and (b) resolves the next-step assistant id for
// the `newStep` fallback.
if (!newAssistantMessageId) {
const messages = await fetchAndReplaceMessages(get, context).catch((error) => {
console.error(error);
return undefined;
});
if (!newAssistantMessageId && data?.newStep) {
const resolvedAssistantMessageId = findNextAssistantMessageId(
messages as GatewayMessageLike[] | undefined,
currentAssistantMessageId,
);
if (data?.newStep) {
const resolvedAssistantMessageId = findNextAssistantMessageId(
messages as GatewayMessageLike[] | undefined,
currentAssistantMessageId,
);
if (resolvedAssistantMessageId) {
currentAssistantMessageId = resolvedAssistantMessageId;
get().associateMessageWithOperation(currentAssistantMessageId, operationId);
if (resolvedAssistantMessageId) {
currentAssistantMessageId = resolvedAssistantMessageId;
get().associateMessageWithOperation(currentAssistantMessageId, operationId);
}
}
}
@@ -406,8 +422,18 @@ export const createGatewayEventHandler = (
pendingToolsCalling?: unknown[];
phase?: string;
requiresApproval?: boolean;
uiMessages?: UIChatMessage[];
};
// Server attaches the canonical UIChatMessage[] snapshot at every
// step boundary (agent-runtime #15152). Use it as Source of Truth
// instead of issuing a DB refetch — the refetch returns a stale
// assistant placeholder while DB fan-out is still in flight, which
// clobbers the in-memory streamed assistantGroup (LOBE-9501).
if (Array.isArray(data?.uiMessages)) {
get().replaceMessages(data.uiMessages, { action: 'gateway/step_start', context });
}
if (data?.phase === 'human_approval' && data.requiresApproval && data.pendingToolsCalling) {
void notifyDesktopHumanApprovalRequired(get, context);
// Persist a paused marker so the sidebar reflects "waiting on user" across reload.
@@ -475,6 +501,8 @@ export const createGatewayEventHandler = (
case 'agent_runtime_end': {
enqueue(async () => {
const data = event.data as { uiMessages?: UIChatMessage[] } | undefined;
void emitClientAgentSignalSourceEvent({
payload: {
agentId: context.agentId,
@@ -499,7 +527,18 @@ export const createGatewayEventHandler = (
get().markUnreadCompleted(completedOp.context.agentId, completedOp.context.topicId);
}
await fetchAndReplaceMessages(get, context).catch(console.error);
// Terminal step has no later step_start to carry SoT — server
// pushes the canonical snapshot directly on this event. Fall back
// to a DB refetch only if the snapshot is absent (older server
// builds, or push-event delivery edge cases).
if (Array.isArray(data?.uiMessages)) {
get().replaceMessages(data.uiMessages, {
action: 'gateway/agent_runtime_end',
context,
});
} else {
await fetchAndReplaceMessages(get, context).catch(console.error);
}
});
break;
}
+18 -1
View File
@@ -106,8 +106,24 @@ export class MessageQueryActionImpl {
useFetchMessages = (
context: ConversationContext,
skipFetch?: boolean,
options?: {
/**
* Skip the fetch entirely (e.g. while another flow owns the data).
* Equivalent to passing a null SWR key.
*/
skipFetch?: boolean;
/**
* Revalidate when the window regains focus. Defaults to SWR's
* client-data default (true). Pass `false` to suppress the focus
* refetch used during streaming so the in-memory stream payload
* (Source of Truth) isn't clobbered by a stale DB read while DB
* fan-out writes are still in flight.
*/
revalidateOnFocus?: boolean;
},
): SWRResponse<UIChatMessage[]> => {
const { skipFetch, revalidateOnFocus } = options ?? {};
// Skip fetch when skipFetch is true or required fields are missing
const shouldFetch = !skipFetch && !!context.agentId && !!context.topicId;
@@ -121,6 +137,7 @@ export class MessageQueryActionImpl {
// Use replaceMessages to store the fetched messages
this.#get().replaceMessages(data, { action: 'useFetchMessages', context });
},
...(revalidateOnFocus !== undefined && { revalidateOnFocus }),
},
);
};
@@ -122,6 +122,9 @@ exports[`settingsSelectors > defaultAgent > should merge DEFAULT_AGENT and s.set
"provider": "deepseek",
},
"searchMode": "auto",
"selfIteration": {
"enabled": false,
},
},
"model": "gpt-3.5-turbo",
"openingQuestions": [],
@@ -166,6 +169,9 @@ exports[`settingsSelectors > defaultAgentConfig > should merge DEFAULT_AGENT_CON
"provider": "deepseek",
},
"searchMode": "auto",
"selfIteration": {
"enabled": false,
},
},
"model": "gpt-4",
"openingQuestions": [],