Compare commits

..

4 Commits

Author SHA1 Message Date
Arvin Xu e56dcf9538 feat: restructure runtime mode selector with device binding support
- Replace flat runtime mode options with hierarchical structure:
  Device group (bound devices from device-gateway) / Sandbox / Disabled
- Add 'sandbox' to RuntimeEnvMode, keep 'cloud' for backward compat
- Add deviceId field to RuntimeEnvConfig for device-specific selection
- New DeviceSelector component with platform-specific icons
- New deviceService wrapping tRPC device.listDevices query
- Update i18n with device/sandbox labels in zh-CN and en-US
2026-05-24 02:56:47 +08:00
Arvin Xu 0fa2e2349c 🐛 fix(desktop): route gateway agent runs through lh hetero exec (#15132)
* feat(desktop): route gateway agent runs through lh hetero exec

Replace the desktop-side GatewayConnectionCtr.executeAgentRun() flow
(startSession -> sendPrompt with local AgentStreamPipeline) with a direct
lh hetero exec spawn. The lh CLI handles spawn -> adapt -> BatchIngester ->
heteroIngest/heteroFinish, matching the cloud sandbox path exactly.

Changes:
- HeterogeneousAgentCtr: add spawnLhHeteroExec() method
- GatewayConnectionCtr: executeAgentRun() now delegates to the new method

* 🐛 fix(desktop): remove duplicate lh token from hetero exec args

spawn('lh', args) already invokes the lh binary, so the leading 'lh'
in args made the effective command `lh lh hetero exec ...` and failed
before heteroIngest could run, breaking the gateway-triggered agent
run flow.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: LobeHub Agent <agent@lobehub.com>
Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 02:54:00 +08:00
Arvin Xu 930344ae23 feat(agent-runtime): push UIChatMessage snapshot at gateway step boundaries (#15152)
* 🧪 chore(local-testing): add agent-gateway probe scripts for stream SoT validation

Probe + tab-switch + analyzer scripts under .agents/skills/local-testing/scripts/agent-gateway/
to capture in-browser snapshots of the message store during gateway streaming and detect
regressions where assistantGroup messages get clobbered by stale DB refetches.

Used to verify LOBE-9501.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

*  feat(agent-runtime): push canonical UIChatMessage snapshot at step boundaries

LOBE-9501

Gateway-mode streaming previously let the client refetch from DB on every
step_complete or tab-focus; with stream chunks landing before the DB write
fans out, the refetch returned a stale assistant placeholder that clobbered
the in-memory streamed assistantGroup (reasoning / tool calls / content).

Server now attaches the canonical UIChatMessage[] snapshot to step_start
and agent_runtime_end events so the client can use the pushed payload as
Source of Truth instead of refetching:

- step_start now loads agent state first, queries messages, and attaches
  uiMessages to the event data when topic context is known
- publishAgentRuntimeEnd signature switched to a params object (additive
  uiMessages field) and the coordinator resolves the snapshot through an
  optional uiMessagesResolver hook before publishing terminal events
- AgentRuntimeService wires the resolver through a lazily-instantiated
  MessageService so tests without S3 env still construct cleanly
- MessageService.queryMessages exposes the same read path as the
  message.getMessages trpc lambda (FileService postProcessUrl included)

Pure additive on the wire: legacy consumers see new uiMessages field, old
finalState payload unchanged. Existing call sites in agentNotify and
aiAgent migrated to the params shape. Failures in the resolver fall back
to publishing without uiMessages so streaming never fails the step.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* 🐛 fix(agent-runtime): forward uiMessages in gateway /push-event payload

LOBE-9501

GatewayStreamNotifier.publishAgentRuntimeEnd was delegating uiMessages to
the inner manager (Redis SSE) but reconstructing its own push-event data
object that only carried { errorType, finalState, reason, reasonDetail }.
In gateway mode, clients consume /push-event rather than Redis directly,
so the canonical UIChatMessage[] snapshot never reached them at terminal
state — and the final step has no later step_start to carry a fresh one.

Forward uiMessages via the same conditional-spread pattern used in the
inner managers; add two tests covering the present/absent branches.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-24 01:23:21 +08:00
Arvin Xu 538195dfb4 🐛 fix(agent-runtime): route context engine payload out of the events stream (#15151)
* 🐛 fix(agent-runtime): route context engine payload out of the events stream

`call_llm` previously pushed a `context_engine_result` event carrying the
full `contextEngineInput` (agentDocuments, systemRole, knowledge, …) into
the per-step events array. That array is the same one persisted into
Redis `agent_runtime_events`, so every step shipped the heavy CE payload
into the state pipeline even though the only consumer was the trace
recorder, which extracted CE into the typed `contextEngine` snapshot
field and immediately filtered the event back out.

Wire a typed `recordContextEngine` callback through
`RuntimeExecutorContext` instead. `AgentRuntimeService.executeStep`
buffers the call per step and hands it to
`OperationTraceRecorder.appendStep` via a new `contextEngine` param.
Trace snapshots are byte-identical; the events stream — and therefore
the Redis state blob — no longer carries CE.

Step toward LOBE-9110 (split state vs trace pipeline). Viewer keeps
the legacy `context_engine_result` reader for back-compat with older
on-disk snapshots.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* 🎨 refactor(agent-runtime): rename recordContextEngine to tracingContextEngine

The callback name now signals its role as the trace-pipeline channel,
matching the `tracing` prefix used elsewhere for non-state observability
wiring. Pure rename, no behavior change.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 01:14:12 +08:00
42 changed files with 1634 additions and 433 deletions
+6 -5
View File
@@ -14,7 +14,7 @@ In `NODE_ENV=development`, `AgentRuntimeService.executeStep()` automatically rec
**Data flow**: executeStep loop -> build `StepPresentationData` -> write partial snapshot to disk -> on completion, finalize to `.agent-tracing/{timestamp}_{traceId}.json`
**Context engine capture**: In `RuntimeExecutors.ts`, the `call_llm` executor emits a `context_engine_result` event after `serverMessagesEngine()` processes messages. This event carries the full `contextEngineInput` (DB messages, systemRole, model, knowledge, tools, userMemory, etc.) and the processed `output` messages (the final LLM payload).
**Context engine capture**: In `RuntimeExecutors.ts`, the `call_llm` executor calls `ctx.tracingContextEngine(input, output)` after `serverMessagesEngine()` processes messages. `AgentRuntimeService.executeStep` buffers the call per step and forwards it to `OperationTraceRecorder.appendStep` as the typed `contextEngine` field. CE flows through this side channel rather than the `events` array so its heavy payload (agentDocuments, systemRole, …) never enters the Redis state pipeline (LOBE-9110).
## Package Location
@@ -199,9 +199,10 @@ interface StepSnapshot {
messages?: any[]; // DB messages before step
context?: { phase: string; payload?: unknown; stepContext?: unknown };
events?: Array<{ type: string; [key: string]: unknown }>;
// context_engine_result event contains:
// input: full contextEngineInput (messages, systemRole, model, knowledge, tools, userMemory, ...)
// output: processed messages array (final LLM payload)
contextEngine?: {
input?: unknown; // contextEngineInput minus messages + toolsConfig (reconstructible from baseline)
output?: unknown; // processed messages array (final LLM payload)
};
}
```
@@ -216,5 +217,5 @@ When using `--messages`, the output shows three sections (if context engine data
## Integration Points
- **Recording**: `src/server/services/agentRuntime/AgentRuntimeService.ts` — in the `executeStep()` method, after building `stepPresentationData`, writes partial snapshot in dev mode
- **Context engine event**: `src/server/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, emits `context_engine_result` event
- **Context engine capture**: `src/server/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, calls `ctx.tracingContextEngine(input, output)`. `AgentRuntimeService.executeStep` buffers it per step and passes it to `traceRecorder.appendStep` as the typed `contextEngine` field (kept off the `events` array to stay out of Redis state).
- **Store**: `FileSnapshotStore` reads/writes to `.agent-tracing/` relative to `process.cwd()`
@@ -0,0 +1,93 @@
# LobeHub gateway streaming + tab-switch test harness
Captures store + DOM state at 200ms intervals so we can prove or disprove
claims like "切回 tab 后消息回到了很早以前". Built for gateway-mode chat but
works for any LobeHub streaming session.
## Files
`scripts/agent-gateway/`
| File | Role |
| --------------- | ---------------------------------------------------------------- |
| `probe.js` | Injects a 200ms sampler + `__PROBE_EVENT` marker + `__switchTab` |
| `probe-dump.js` | Stops the sampler and returns `{events, samples}` as JSON string |
| `tab-switch.js` | Runs N round-trip switches between two tabs, marks each step |
| `analyze.mjs` | Node post-processor: timeline + regression detection |
## Standard workflow
```bash
# 1. Start Electron with CDP
./.agents/skills/local-testing/scripts/electron-dev.sh start
# 2. Navigate to a chat, switch runtime to Cloud Sandbox (gateway mode)
# 3. Install the probe + helpers
agent-browser --cdp 9222 eval --stdin \
< .agents/skills/local-testing/scripts/agent-gateway/probe.js
# 4. Send a tool-call message — manually or via type+press
agent-browser --cdp 9222 eval "window.__PROBE_EVENT('SENT')"
# 5. Run the multi-switch driver (auto-picks active tab as BACK and the
# rightmost inactive tab as AWAY — edit ROUND_TRIPS / DWELL_MS in the
# file if you want different timing)
agent-browser --cdp 9222 eval --stdin \
< .agents/skills/local-testing/scripts/agent-gateway/tab-switch.js
# 6. Wait for streaming to finish, then dump
agent-browser --cdp 9222 eval --stdin \
< .agents/skills/local-testing/scripts/agent-gateway/probe-dump.js \
> /tmp/probe.json
# 7. Analyze
node .agents/skills/local-testing/scripts/agent-gateway/analyze.mjs /tmp/probe.json
```
The analyzer prints three sections: EVENTS, TIMELINE, REGRESSIONS. If
REGRESSIONS is non-empty it means content/reasoning/childN dropped on the
same topic — the symptom users describe.
## What the probe tracks (and why)
`chat.messagesMap` only stores the top-level `assistantGroup` shell. The
actual streamed content, reasoning, and tool calls live in
`assistantGroup.children: AssistantContentBlock[]`. Any probe that only
reads `m.content` / `m.reasoning` will see zeros throughout streaming and
miss everything that matters. probe.js walks both levels and sums:
- `cT` total content length
- `rT` total reasoning length
- `toolT` total tool-call count
- `childN` number of content blocks
Plus DOM-side signals (`domLen`, search/crawl indicator counts) so you can
tell store-side regressions apart from render-side regressions.
## Gotchas
- **Optimistic new-topic state.** Before the first chunk lands, messages
live under the `<scope>_new` key with `tmp_*` ids and no `topicId` field.
probe.js falls back to those when `activeTopicId` is null.
- **Reasoning resets to 0 are not bugs.** When the assistant finishes
thinking and starts tool-use or text, the streaming reasoning buffer
empties and the finalised reasoning gets sealed into a completed block.
Filter these out manually if needed.
- **DOM length jitters by a handful of chars** because counters like "(10)"
in tool-call labels change as results arrive. analyze.mjs only flags
`domLen` drops greater than 100 chars to ignore that noise.
- **Never identify tabs by innerText.** The active tab's text embeds a
` · <agent name>` suffix, so a search like `'LobeHub Growth'` matches the
active tab when the active agent happens to be LobeHub Growth — and you
end up clicking the tab you're already on. probe.js uses the stable
`data-contextmenu-trigger` attribute (a React `useId()` value that's set
per-tab and survives focus changes) plus `data-active="true"` to mark
the active one. Helpers exposed:
`__listTabs()` / `__clickTabByKey(key)` / `__clickTabByIndex(i)` /
`__activeTabKey()`.
- **`tab-switch.js` fires-and-forgets.** The IIFE kicks off an async loop
and returns immediately so the agent-browser CLI eval doesn't blow past
its default 25 s timeout. Wait on the `SWITCH_LOOP_DONE` event marker
before dumping. Re-running while a loop is in flight is refused — the
chaotic data from overlapping runs is not worth debugging.
@@ -0,0 +1,119 @@
#!/usr/bin/env node
// Analyze a probe dump captured by probe.js + probe-dump.js.
//
// node analyze.mjs /tmp/probe.json
//
// Prints:
// 1. EVENTS — user-action markers with their relative timestamps
// 2. TIMELINE — periodic samples (~1 per second + event-adjacent samples)
// showing every interesting field; columns:
// t(ms) | runOps | msgN | childN | content | reasoning | tools | domLen | search | crawl | topic | event
// 3. REGRESSIONS — every place a tracked counter *dropped* on the same
// topic between adjacent samples. A "true" UI rollback shows up as a
// drop in content/reasoning/tools/childN/domLen without a topic change.
//
// Whitelisted transitions (not flagged):
// - topic change → all drops expected (focus moved away)
// - reasoning length 0 after content starts → reasoning gets sealed into a
// completed sub-block; the parent's running reasoning resets to ''.
// - msgN drop when topic transitions from `_new` placeholder to a real id.
import fs from 'node:fs';
const file = process.argv[2];
if (!file) {
console.error('usage: node analyze.mjs <probe.json>');
process.exit(1);
}
const raw = JSON.parse(fs.readFileSync(file, 'utf8'));
// probe-dump.js wraps the payload in JSON.stringify so agent-browser returns
// it as a single quoted string. Unwrap.
const data = typeof raw === 'string' ? JSON.parse(raw) : raw;
const { events, samples } = data;
const fmt = {
pad(v, n) {
return String(v).padStart(n);
},
};
console.log('=== EVENTS ===');
for (const e of events) console.log(` t=${fmt.pad(e.t, 7)} ${e.name}`);
console.log(
'\n=== TIMELINE (~1s cadence, plus event-adjacent samples) ===\n' +
' t(ms) runOps msgN childN content reasoning tools domLen search crawl topic event',
);
let lastSampledAt = -1e9;
const eventBuckets = events.map((e) => e.t);
for (let i = 0; i < samples.length; i++) {
const s = samples[i];
const nearEvent = eventBuckets.some((et) => Math.abs(et - s.t) < 110);
if (!nearEvent && s.t - lastSampledAt < 1000) continue;
lastSampledAt = s.t;
const ev = events.find((e) => Math.abs(e.t - s.t) < 110);
const evMarker = ev ? `${ev.name}` : '';
const topicSuffix = s.topicId ? s.topicId.slice(-6) : '(none)';
const search = s.ind?.search ?? 0;
const crawl = s.ind?.crawl ?? 0;
console.log(
` ${fmt.pad(s.t, 6)} ` +
`${fmt.pad(s.runOps, 6)} ` +
`${fmt.pad(s.msgN, 4)} ` +
`${fmt.pad(s.childN ?? 0, 5)} ` +
`${fmt.pad(s.cT ?? 0, 8)} ` +
`${fmt.pad(s.rT ?? 0, 9)} ` +
`${fmt.pad(s.toolT ?? 0, 5)} ` +
`${fmt.pad(s.domLen ?? 0, 7)} ` +
`${fmt.pad(search, 6)} ` +
`${fmt.pad(crawl, 5)} ` +
`${topicSuffix.padEnd(8)}${evMarker}`,
);
}
console.log('\n=== REGRESSIONS (same topic, value dropped) ===');
const regressions = [];
for (let i = 1; i < samples.length; i++) {
const prev = samples[i - 1];
const cur = samples[i];
if (!cur.topicId || prev.topicId !== cur.topicId) continue;
const drops = [];
if (cur.msgN < prev.msgN) drops.push(`msgN: ${prev.msgN}${cur.msgN}`);
if ((cur.childN ?? 0) < (prev.childN ?? 0)) drops.push(`childN: ${prev.childN}${cur.childN}`);
if ((cur.cT ?? 0) < (prev.cT ?? 0)) drops.push(`content: ${prev.cT}${cur.cT}`);
if ((cur.rT ?? 0) < (prev.rT ?? 0)) drops.push(`reasoning: ${prev.rT}${cur.rT}`);
if ((cur.toolT ?? 0) < (prev.toolT ?? 0)) drops.push(`tools: ${prev.toolT}${cur.toolT}`);
// domLen jitters by a few chars from counter labels — only flag big drops.
if ((cur.domLen ?? 0) < (prev.domLen ?? 0) - 100) {
drops.push(`domLen: ${prev.domLen}${cur.domLen}`);
}
if (drops.length === 0) continue;
const nearbyEv = events.filter((e) => Math.abs(e.t - cur.t) < 600).map((e) => e.name);
regressions.push({ t: cur.t, topic: cur.topicId.slice(-6), drops, nearbyEv });
}
if (regressions.length === 0) {
console.log(' (none)');
} else {
for (const r of regressions) {
const evStr = r.nearbyEv.length ? ` near:[${r.nearbyEv.join(',')}]` : '';
console.log(` t=${fmt.pad(r.t, 7)} topic=${r.topic} ${r.drops.join(' | ')}${evStr}`);
}
}
console.log(`\n=== SUMMARY ===`);
console.log(` samples: ${samples.length}`);
console.log(` events: ${events.length}`);
console.log(` regressions: ${regressions.length}`);
if (samples.length) {
const last = samples.at(-1);
console.log(
` final: msgN=${last.msgN} childN=${last.childN ?? 0} content=${last.cT ?? 0} ` +
`reasoning=${last.rT ?? 0} tools=${last.toolT ?? 0} runOps=${last.runOps}`,
);
}
@@ -0,0 +1,17 @@
// Stop the probe and serialize collected data.
//
// agent-browser --cdp 9222 eval --stdin < probe-dump.js > /tmp/probe.json
//
// The whole thing is wrapped in a JSON.stringify so agent-browser returns it
// as a single quoted string — the analyzer double-parses to handle that.
(function () {
if (window.__PROBE_TIMER) {
clearInterval(window.__PROBE_TIMER);
window.__PROBE_TIMER = null;
}
return JSON.stringify({
events: window.__PROBE_EVENTS || [],
samples: window.__PROBE_SAMPLES || [],
});
})();
@@ -0,0 +1,204 @@
// LobeHub chat streaming time-series probe.
//
// Inject into the renderer (via agent-browser eval) to record store + DOM
// snapshots every 200ms during a streaming session. Designed to surface
// "UI rolled back to an earlier state" symptoms — especially around
// gateway-mode tab switches that happen while the assistant is still writing.
//
// Usage:
// agent-browser --cdp 9222 eval --stdin < probe.js
// # ...do test interactions, call window.__PROBE_EVENT('LABEL') to mark moments...
// agent-browser --cdp 9222 eval --stdin < probe-dump.js > /tmp/probe.json
// node analyze.mjs /tmp/probe.json
//
// What it captures per sample:
// - activeTopicId
// - msgN: top-level messages in chat.messagesMap for this topic
// - childN: total assistantGroup.children blocks across all msgs (THIS is
// where streaming content actually lives — top-level assistantGroup stays empty)
// - cT / rT / toolT: totals across messages AND their children
// (content, reasoning, tool-call count)
// - perMsg: per-message breakdown so regressions can be located precisely
// - runOps: number of running operations (execServerAgentRuntime etc.)
// - domLen: total innerText length of the rendered chat list area
// - ind: visible UI indicators (Search pages, Crawled pages, Deeply Thought, Sending)
//
// Event markers: window.__PROBE_EVENT('NAME') records {t, name} into
// __PROBE_EVENTS, used by the analyzer to align state changes with
// user-driven actions (SENT, AWAY_1, BACK_1, ...).
(function () {
if (window.__PROBE_TIMER) clearInterval(window.__PROBE_TIMER);
window.__PROBE_SAMPLES = [];
window.__PROBE_EVENTS = [];
const t0 = Date.now();
function snapshot() {
try {
const chat = window.__LOBE_STORES.chat();
const topicId = chat.activeTopicId;
const idTail = topicId ? topicId.replace('tpc_', '') : null;
const keys = Object.keys(chat.messagesMap || {});
// Collect messages for the active topic. Before a topic is committed,
// optimistic messages live under the `<agentScope>_new` key — fall
// back to those when no topic is active yet.
let msgs = [];
if (idTail) {
keys.forEach((k) => {
if (k.includes(idTail)) msgs = msgs.concat(chat.messagesMap[k] || []);
});
} else {
keys
.filter((k) => k.endsWith('_new'))
.forEach((k) => {
msgs = msgs.concat(chat.messagesMap[k] || []);
});
}
// Walk top-level + assistantGroup.children. children carry the actual
// streamed content / reasoning / tool calls; the parent assistantGroup
// remains a placeholder (cLen=0, rLen=0) for its whole lifetime.
let totalContent = 0;
let totalReason = 0;
let totalTools = 0;
let childCount = 0;
const perMsg = msgs.map((m) => {
const cLen = (m.content || '').length;
const rLen = ((m.reasoning && m.reasoning.content) || '').length;
const tools = (m.tools || []).length;
totalContent += cLen;
totalReason += rLen;
totalTools += tools;
const children = m.children || [];
let chC = 0;
let chR = 0;
let chT = 0;
children.forEach((c) => {
chC += (c.content || '').length;
chR += ((c.reasoning && c.reasoning.content) || '').length;
chT += (c.tools || []).length;
});
totalContent += chC;
totalReason += chR;
totalTools += chT;
childCount += children.length;
return {
id: (m.id || '').slice(-8),
role: m.role,
cLen,
rLen,
tools,
chCount: children.length,
chC,
chR,
chT,
};
});
const ops = Object.values(chat.operations || {});
const runningOps = ops.filter((o) => o.status === 'running');
// DOM probe: total rendered text in the chat scroll area (proxy for
// "how much is actually visible to the user").
const convScroll =
document.querySelector(
'[data-chat-list], [class*="ChatList"], [class*="ConversationList"]',
) ||
document.querySelector('main [class*="scroll"]') ||
document.querySelector('main');
const domTxt = convScroll ? convScroll.innerText || '' : '';
const bodyTxt = document.body.innerText || '';
const searchMatches = (bodyTxt.match(/Search pages?:|Searched the web/g) || []).length;
const crawlMatches = (bodyTxt.match(/Crawl(ed|ing) pages?/g) || []).length;
window.__PROBE_SAMPLES.push({
t: Date.now() - t0,
topicId,
msgN: msgs.length,
childN: childCount,
cT: totalContent,
rT: totalReason,
toolT: totalTools,
perMsg,
runOps: runningOps.length,
runOpTypes: runningOps.map((o) => o.type),
domLen: domTxt.length,
ind: {
search: searchMatches,
crawl: crawlMatches,
sending: bodyTxt.includes('Sending message'),
deeplyThinking: bodyTxt.includes('Deeply Thinking'),
deeplyThought: bodyTxt.includes('Deeply Thought'),
},
});
} catch (e) {
window.__PROBE_SAMPLES.push({ t: Date.now() - t0, err: e.message });
}
}
snapshot();
window.__PROBE_TIMER = setInterval(snapshot, 200);
window.__PROBE_EVENT = function (name) {
window.__PROBE_EVENTS.push({ t: Date.now() - t0, name });
};
// Tab-switch helpers installed alongside the probe.
//
// The Electron tab bar mounts each tab as a div with data-insp-path
// ending in `TabItem.tsx:...`. The active tab is marked with
// data-active="true". DO NOT search by innerText — the active tab's text
// includes a ` · <agent name>` suffix that produces false matches when
// your search string happens to overlap with the agent name.
function listTabs() {
return Array.from(
document.querySelectorAll('[data-insp-path*="TabItem.tsx"][data-contextmenu-trigger]'),
).filter((t) => t.getBoundingClientRect().top < 30);
}
function tabKey(el) {
// Stable for the tab's lifetime; survives focus changes.
return el.getAttribute('data-contextmenu-trigger');
}
function findActiveTab() {
return listTabs().find((t) => t.getAttribute('data-active') === 'true') || null;
}
// Click by stable key captured earlier (preferred for round-trips).
window.__clickTabByKey = function (key) {
const tab = listTabs().find((t) => tabKey(t) === key);
if (!tab) return 'not found: key=' + key;
if (tab.getAttribute('data-active') === 'true') return 'already active: ' + key;
tab.click();
return 'clicked key=' + key;
};
// Click by index in the tab strip (0-based, left-to-right).
window.__clickTabByIndex = function (i) {
const tabs = listTabs();
if (i < 0 || i >= tabs.length) return 'index out of range: ' + i + '/' + tabs.length;
const t = tabs[i];
if (t.getAttribute('data-active') === 'true') return 'already active: i=' + i;
t.click();
return 'clicked i=' + i + ' key=' + tabKey(t);
};
// Snapshot all tabs in order: [{key, active, title (first 60 chars of innerText)}]
window.__listTabs = function () {
return listTabs().map((t, i) => ({
i,
key: tabKey(t),
active: t.getAttribute('data-active') === 'true',
title: (t.innerText || '').slice(0, 60),
}));
};
window.__activeTabKey = function () {
const a = findActiveTab();
return a ? tabKey(a) : null;
};
return 'probe installed';
})();
@@ -0,0 +1,72 @@
// Run N round-trip tab switches with event markers timed against the probe.
//
// agent-browser --cdp 9222 eval --stdin < tab-switch.js
//
// Captures the currently-active tab as the BACK target and the rightmost
// inactive tab as the AWAY target. Both are addressed by their stable
// data-contextmenu-trigger key (NOT by visible title — the active tab's
// innerText embeds a ` · <agent name>` suffix that breaks text matching).
//
// Fires the loop in the background and returns immediately so the
// agent-browser eval doesn't have to await the full ROUND_TRIPS × DWELL_MS
// duration. Wait on the `SWITCH_LOOP_DONE` event before dumping.
//
// Refuses to launch if a previous loop is still in flight.
//
// Requires probe.js to have been installed first (provides
// window.__PROBE_EVENT / __listTabs / __clickTabByKey / __activeTabKey).
(function () {
const ROUND_TRIPS = 4;
const DWELL_MS = 10_000;
if (!window.__PROBE_EVENT || !window.__listTabs || !window.__clickTabByKey) {
return 'probe not installed — eval probe.js first';
}
if (window.__SWITCH_LOOP_RUNNING) {
return 'switch loop already running — wait for SWITCH_LOOP_DONE first';
}
const tabs = window.__listTabs();
const activeTab = tabs.find((t) => t.active);
if (!activeTab) return 'no active tab — abort';
// Pick the first inactive tab as AWAY target. With multiple inactive tabs
// you'll usually want the one that's stable across the test — feel free
// to swap to tabs[tabs.length-1] if you want the rightmost.
const inactives = tabs.filter((t) => !t.active);
if (inactives.length === 0) return 'no inactive tab to switch to — abort';
const awayTab = inactives.at(-1); // rightmost inactive
const BACK_KEY = activeTab.key;
const AWAY_KEY = awayTab.key;
window.__SWITCH_LOOP_RUNNING = true;
window.__PROBE_EVENT('SWITCH_LOOP_CONFIG:back=' + BACK_KEY + ',away=' + AWAY_KEY);
(async function () {
function sleep(ms) {
return new Promise((r) => setTimeout(r, ms));
}
try {
window.__PROBE_EVENT('SWITCH_LOOP_START');
for (let i = 1; i <= ROUND_TRIPS; i++) {
window.__PROBE_EVENT('AWAY_' + i);
const awayResult = window.__clickTabByKey(AWAY_KEY);
window.__PROBE_EVENT('AWAY_' + i + '_RES:' + awayResult.slice(0, 50));
await sleep(DWELL_MS);
window.__PROBE_EVENT('BACK_' + i);
const backResult = window.__clickTabByKey(BACK_KEY);
window.__PROBE_EVENT('BACK_' + i + '_RES:' + backResult.slice(0, 50));
await sleep(DWELL_MS);
}
window.__PROBE_EVENT('SWITCH_LOOP_DONE');
} finally {
window.__SWITCH_LOOP_RUNNING = false;
}
})();
return 'switch loop kicked off (BACK=' + BACK_KEY + ', AWAY=' + AWAY_KEY + ')';
})();
@@ -172,36 +172,25 @@ export default class GatewayConnectionCtr extends ControllerModule {
request: AgentRunRequestMessage,
): Promise<{ reason?: string; status: 'accepted' | 'rejected' }> {
try {
const ctr = this.heterogeneousAgentCtr;
const serverUrl = await this.remoteServerConfigCtr.getRemoteServerUrl();
if (!serverUrl) {
return { reason: 'Remote server URL not configured', status: 'rejected' };
}
// Map agentType to binary name.
// claude-code → `claude` CLI; all other platforms use their type name as the binary.
const command = request.agentType === 'claude-code' ? 'claude' : request.agentType;
// Create a session for the hetero agent.
const { sessionId } = await ctr.startSession({
// Fire-and-forget: lh hetero exec handles spawn -> adapt ->
// BatchIngester -> heteroIngest/heteroFinish -> server -> Gateway -> clients.
// Same command as spawnHeteroSandbox() on the server side.
this.heterogeneousAgentCtr.spawnLhHeteroExec({
agentType: request.agentType,
args: [],
command,
cwd: request.cwd,
// Inject LOBEHUB_JWT so the CLI authenticates against heteroIngest.
env: { LOBEHUB_JWT: request.jwt },
jwt: request.jwt,
operationId: request.operationId,
prompt: request.prompt,
resumeSessionId: request.resumeSessionId,
serverUrl,
topicId: request.topicId,
});
// Fire-and-forget: sendPrompt runs the CLI until completion.
ctr
.sendPrompt({
operationId: request.operationId,
prompt: request.prompt,
sessionId,
})
.catch((err: Error) => {
// Errors are surfaced via heteroFinish on the server side.
// Log locally for desktop debugging only.
console.error('[GatewayConnectionCtr] agent run failed:', err.message);
});
return { status: 'accepted' };
} catch (err) {
const reason = err instanceof Error ? err.message : String(err);
@@ -1251,4 +1251,69 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
process.on('SIGTERM', onSignal);
process.on('SIGINT', onSignal);
}
/**
* Spawn `lh hetero exec` for gateway-driven agent runs.
* The `lh` CLI handles everything downstream — no local
* AgentStreamPipeline or IPC broadcast needed. Mirrors
* `spawnHeteroSandbox()` on the server side.
*/
spawnLhHeteroExec(params: {
agentType: string;
cwd?: string;
jwt: string;
operationId: string;
prompt: string;
resumeSessionId?: string;
serverUrl: string;
topicId: string;
}): void {
const { agentType, cwd, jwt, operationId, prompt, resumeSessionId, serverUrl, topicId } =
params;
const workDir = cwd ?? process.cwd();
const args = [
'hetero',
'exec',
'--type',
agentType,
'--operation-id',
operationId,
'--topic',
topicId,
'--render',
'none',
'--input-json',
'-',
'--cwd',
workDir,
...(resumeSessionId ? ['--resume', resumeSessionId] : []),
];
const env = {
...process.env,
...buildProxyEnv(this.app.storeManager.get('networkProxy')),
LOBEHUB_JWT: jwt,
LOBEHUB_SERVER: serverUrl,
};
logger.info('spawnLhHeteroExec: type=%s op=%s topic=%s', agentType, operationId, topicId);
const child = spawn('lh', args, {
cwd: workDir,
env,
stdio: ['pipe', 'inherit', 'inherit'],
});
child.stdin.write(JSON.stringify(prompt));
child.stdin.end();
child.on('error', (err) => {
logger.error('spawnLhHeteroExec: spawn failed — %s', err.message);
});
child.on('exit', (code, signal) => {
logger.info('spawnLhHeteroExec: exited — op=%s code=%s signal=%s', operationId, code, signal);
});
}
}
+4
View File
@@ -411,12 +411,16 @@
"rag.userQuery.actions.regenerate": "Regenerate Query",
"regenerate": "Regenerate",
"roleAndArchive": "Agent Profile & History",
"runtimeEnv.device.empty": "No devices available. Connect to gateway from the desktop app first.",
"runtimeEnv.mode.cloud": "Cloud Sandbox",
"runtimeEnv.mode.cloudDesc": "Run in a secure cloud sandbox",
"runtimeEnv.mode.local": "Local",
"runtimeEnv.mode.localDesc": "Access local files and commands",
"runtimeEnv.mode.none": "Off",
"runtimeEnv.mode.noneDesc": "Disable runtime environment",
"runtimeEnv.mode.sandbox": "Sandbox",
"runtimeEnv.mode.sandboxDesc": "Run in an isolated cloud sandbox",
"runtimeEnv.section.device": "Device",
"runtimeEnv.selectMode": "Select Runtime Environment",
"runtimeEnv.title": "Runtime Environment",
"search.grounding.imageSearchQueries": "Image Search Keywords",
+4
View File
@@ -411,12 +411,16 @@
"rag.userQuery.actions.regenerate": "重新生成 Query",
"regenerate": "重新生成",
"roleAndArchive": "助理档案与记录",
"runtimeEnv.device.empty": "暂无可用设备,请先在桌面端连接到网关",
"runtimeEnv.mode.cloud": "云端沙箱",
"runtimeEnv.mode.cloudDesc": "在安全的云端沙箱中运行",
"runtimeEnv.mode.local": "本地",
"runtimeEnv.mode.localDesc": "访问本地文件和命令",
"runtimeEnv.mode.none": "关闭",
"runtimeEnv.mode.noneDesc": "禁用运行时环境",
"runtimeEnv.mode.sandbox": "沙箱",
"runtimeEnv.mode.sandboxDesc": "在隔离的云端沙箱中运行",
"runtimeEnv.section.device": "设备",
"runtimeEnv.selectMode": "选择运行环境",
"runtimeEnv.title": "运行环境",
"search.grounding.imageSearchQueries": "图片搜索关键词",
@@ -22,7 +22,6 @@ import { desensitizeUrl } from '../../utils/desensitizeUrl';
import { getModelPricing } from '../../utils/getModelPricing';
import { isAccountDeactivatedError } from '../../utils/isAccountDeactivatedError';
import { isExceededContextWindowError } from '../../utils/isExceededContextWindowError';
import { isModelNotFoundError } from '../../utils/isModelNotFoundError';
import { isQuotaLimitError } from '../../utils/isQuotaLimitError';
import { MODEL_LIST_CONFIGS, processModelList } from '../../utils/modelParse';
import { StreamingResponse } from '../../utils/response';
@@ -333,15 +332,6 @@ export const handleDefaultAnthropicError = <T extends Record<string, any> = any>
};
}
if (isModelNotFoundError(errorMsg)) {
return {
endpoint: desensitizedEndpoint,
error: errorResult,
errorType: AgentRuntimeErrorType.ModelNotFound,
message,
};
}
if (isExceededContextWindowError(errorMsg)) {
return {
endpoint: desensitizedEndpoint,
@@ -746,16 +736,6 @@ export const createAnthropicCompatibleRuntime = <T extends Record<string, any> =
});
}
if (isModelNotFoundError(errorMsg)) {
return AgentRuntimeError.chat({
endpoint: desensitizedEndpoint,
error: errorResult,
errorType: AgentRuntimeErrorType.ModelNotFound,
message,
provider: this.id,
});
}
if (isExceededContextWindowError(errorMsg)) {
return AgentRuntimeError.chat({
endpoint: desensitizedEndpoint,
@@ -42,7 +42,6 @@ import { handleOpenAIError } from '../../utils/handleOpenAIError';
import { isAccountDeactivatedError } from '../../utils/isAccountDeactivatedError';
import { isExceededContextWindowError } from '../../utils/isExceededContextWindowError';
import { isInsufficientQuotaError } from '../../utils/isInsufficientQuotaError';
import { isModelNotFoundError } from '../../utils/isModelNotFoundError';
import { isQuotaLimitError } from '../../utils/isQuotaLimitError';
import { postProcessModelList } from '../../utils/postProcessModelList';
import {
@@ -1157,17 +1156,6 @@ export const createOpenAICompatibleRuntime = <T extends Record<string, any> = an
});
}
if (isModelNotFoundError(errorMsg)) {
log('model not found error detected from message');
return AgentRuntimeError.chat({
endpoint: desensitizedEndpoint,
error: errorResult,
errorType: AgentRuntimeErrorType.ModelNotFound,
message,
provider: this.id,
});
}
if (isInsufficientQuotaError(errorMsg)) {
log('insufficient quota error detected from message');
return AgentRuntimeError.chat({
@@ -1,7 +1,6 @@
import type { ILobeAgentRuntimeErrorType } from '../types/error';
import { AgentRuntimeErrorType } from '../types/error';
import { isExceededContextWindowError } from './isExceededContextWindowError';
import { isModelNotFoundError } from './isModelNotFoundError';
import { isQuotaLimitError } from './isQuotaLimitError';
export interface ParsedError {
@@ -113,10 +112,6 @@ export function parseGoogleErrorMessage(message: string): ParsedError {
return { error: { message }, errorType: AgentRuntimeErrorType.ProviderNoImageGenerated };
}
if (isModelNotFoundError(message)) {
return { error: { message }, errorType: AgentRuntimeErrorType.ModelNotFound };
}
if (isExceededContextWindowError(message)) {
return { error: { message }, errorType: AgentRuntimeErrorType.ExceededContextWindow };
}
@@ -1,84 +0,0 @@
import { describe, expect, it } from 'vitest';
import { isModelNotFoundError } from './isModelNotFoundError';
describe('isModelNotFoundError', () => {
it('should return false for undefined/empty input', () => {
expect(isModelNotFoundError(undefined)).toBe(false);
expect(isModelNotFoundError('')).toBe(false);
});
it('should detect "model not found" errors', () => {
expect(isModelNotFoundError('The model gpt-5 was not found')).toBe(false);
expect(isModelNotFoundError('model not found: gpt-5')).toBe(true);
});
it('should detect "model_not_found" code in message', () => {
expect(isModelNotFoundError('Error: model_not_found')).toBe(true);
});
it('should detect "model ... does not exist" (OpenAI)', () => {
expect(
isModelNotFoundError('The model `gpt-5` does not exist or you do not have access to it.'),
).toBe(true);
});
it('should detect "model or endpoint ... does not exist" (Volcengine/doubao)', () => {
expect(
isModelNotFoundError(
'The model or endpoint doubao-seed-2.0-pro does not exist or you do not have access to it.',
),
).toBe(true);
});
it('should NOT match "does not exist" without model context', () => {
// API key errors that incidentally say "does not exist"
expect(isModelNotFoundError('Your API key does not exist')).toBe(false);
// Deployment/endpoint errors
expect(isModelNotFoundError('The deployment for this resource does not exist')).toBe(false);
// Generic resource errors
expect(isModelNotFoundError('This user does not exist')).toBe(false);
expect(isModelNotFoundError('The organization does not exist')).toBe(false);
});
it('should NOT match when "model" and "does not exist" are in different sentences', () => {
expect(
isModelNotFoundError(
'This feature does not exist in your plan. Contact support to enable the model.',
),
).toBe(false);
expect(isModelNotFoundError('The model is fine. Your account does not exist.')).toBe(false);
});
it('should detect "no such model" errors', () => {
expect(isModelNotFoundError('no such model: custom-model-v1')).toBe(true);
});
it('should detect "not found model" errors', () => {
expect(isModelNotFoundError('not found model abc-123')).toBe(true);
});
it('should detect "model is not accessible" errors', () => {
expect(isModelNotFoundError('The model is not accessible with your current plan')).toBe(true);
});
it('should detect "model is not available" errors', () => {
expect(isModelNotFoundError('The requested model is not available in this region')).toBe(true);
});
it('should detect "invalid model" errors', () => {
expect(isModelNotFoundError('invalid model: test-model')).toBe(true);
});
it('should be case-insensitive', () => {
expect(isModelNotFoundError('MODEL NOT FOUND')).toBe(true);
expect(isModelNotFoundError('The Model Does Not Exist')).toBe(true);
});
it('should return false for unrelated error messages', () => {
expect(isModelNotFoundError('Insufficient Balance')).toBe(false);
expect(isModelNotFoundError('Invalid API key')).toBe(false);
expect(isModelNotFoundError('Rate limit reached')).toBe(false);
expect(isModelNotFoundError('context length exceeded')).toBe(false);
});
});
@@ -1,32 +0,0 @@
const MODEL_NOT_FOUND_PATTERNS = [
'model not found', // OpenAI / generic
'model_not_found', // OpenAI (code in message)
'no such model', // generic
'not found model', // some providers
'model is not accessible', // access-related model errors
'model is not available', // generic
'invalid model', // generic
];
// "does not exist" on its own is too broad (it can show up in API key,
// deployment, or unrelated errors). Require explicit model context:
// the word "model" must appear before "does not exist" within the same
// sentence. The char class excludes sentence terminators (. ! ?) but
// allows a period when followed by a digit, so version numbers like
// "doubao-seed-2.0-pro" don't accidentally break the match.
//
// Matches:
// - OpenAI: "The model `gpt-5` does not exist or you do not have access to it."
// - Volcengine: "The model or endpoint doubao-seed-2.0-pro does not exist..."
// Correctly ignores:
// - "Your API key does not exist"
// - "The deployment for this resource does not exist"
// - "The model is fine. Your account does not exist." (different sentences)
const MODEL_DOES_NOT_EXIST_REGEX = /\bmodel\b(?:[^!.?\n]|\.(?=\d))+?\bdoes not exist\b/i;
export const isModelNotFoundError = (message?: string): boolean => {
if (!message) return false;
const lower = message.toLowerCase();
if (MODEL_NOT_FOUND_PATTERNS.some((p) => lower.includes(p))) return true;
return MODEL_DOES_NOT_EXIST_REGEX.test(message);
};
+9 -3
View File
@@ -9,11 +9,12 @@ export type AgentMode = 'auto' | 'plan' | 'ask' | 'implement';
/**
* Runtime environment mode
* - local: Access local files and commands (desktop only)
* - cloud: Run in cloud sandbox
* - local: Run on a specific device (desktop only, requires deviceId)
* - sandbox: Run in isolated cloud sandbox
* - cloud: @deprecated Use 'sandbox' instead, kept for backward compatibility
* - none: No runtime environment
*/
export type RuntimeEnvMode = 'cloud' | 'local' | 'none';
export type RuntimeEnvMode = 'cloud' | 'local' | 'none' | 'sandbox';
export type RuntimePlatform = 'desktop' | 'web';
@@ -21,6 +22,11 @@ export type RuntimePlatform = 'desktop' | 'web';
* Runtime environment configuration
*/
export interface RuntimeEnvConfig {
/**
* Device ID when runtimeMode is 'local' (desktop only).
* Identifies which bound device to run on.
*/
deviceId?: string;
/**
* Runtime environment mode per platform
*/
+2 -1
View File
@@ -170,9 +170,10 @@ export interface LobeAgentChatConfig extends AgentMemoryChatConfig, AgentSelfIte
/**
* Zod schema for RuntimeEnvConfig
*/
const runtimeEnvModeEnum = z.enum(['local', 'cloud', 'none']);
const runtimeEnvModeEnum = z.enum(['local', 'cloud', 'none', 'sandbox']);
export const RuntimeEnvConfigSchema = z.object({
deviceId: z.string().optional(),
runtimeMode: z.record(z.string(), runtimeEnvModeEnum).optional(),
workingDirectory: z.string().optional(),
});
@@ -0,0 +1,109 @@
import { type DeviceAttachment } from '@lobechat/builtin-tool-remote-device';
import { Flexbox, Icon } from '@lobehub/ui';
import { createStaticStyles, cssVar, cx } from 'antd-style';
import { LaptopIcon, MonitorIcon, ServerIcon } from 'lucide-react';
import { memo } from 'react';
const styles = createStaticStyles(({ css }) => ({
deviceName: css`
font-size: 13px;
font-weight: 500;
color: ${cssVar.colorText};
`,
deviceOption: css`
cursor: pointer;
width: 100%;
padding-block: 8px;
padding-inline: 8px;
border-radius: ${cssVar.borderRadius};
transition: background-color 0.2s;
&:hover {
background: ${cssVar.colorFillTertiary};
}
`,
deviceOptionActive: css`
background: ${cssVar.colorFillTertiary};
`,
deviceOptionDesc: css`
font-size: 12px;
color: ${cssVar.colorTextDescription};
`,
deviceOptionIcon: css`
flex-shrink: 0;
border: 1px solid ${cssVar.colorFillTertiary};
border-radius: ${cssVar.borderRadius};
background: ${cssVar.colorBgElevated};
`,
sectionTitle: css`
padding-block: 6px 2px;
padding-inline: 8px;
font-size: 11px;
font-weight: 500;
color: ${cssVar.colorTextQuaternary};
text-transform: uppercase;
letter-spacing: 0.5px;
`,
}));
const PLATFORM_ICONS: Record<string, typeof LaptopIcon> = {
darwin: LaptopIcon,
linux: MonitorIcon,
win32: MonitorIcon,
};
interface DeviceSelectorProps {
activeDeviceId?: string;
devices: DeviceAttachment[];
onSelect: (deviceId: string) => void;
}
export const DeviceSelector = memo<DeviceSelectorProps>(
({ activeDeviceId, devices, onSelect }) => {
return (
<>
{devices.map((device) => {
const IconComp = PLATFORM_ICONS[device.platform] || ServerIcon;
const isActive = activeDeviceId === device.deviceId;
return (
<Flexbox
horizontal
align={'flex-start'}
className={cx(styles.deviceOption, isActive && styles.deviceOptionActive)}
gap={12}
key={device.deviceId}
onClick={() => onSelect(device.deviceId)}
>
<Flexbox
align={'center'}
className={styles.deviceOptionIcon}
height={32}
justify={'center'}
width={32}
>
<Icon icon={IconComp} size={16} />
</Flexbox>
<Flexbox flex={1}>
<div className={styles.deviceName}>{device.hostname}</div>
<div className={styles.deviceOptionDesc}>{device.platform}</div>
</Flexbox>
</Flexbox>
);
})}
</>
);
},
);
DeviceSelector.displayName = 'DeviceSelector';
/** Section header for device/sandbox/none groups */
export const SectionHeader = memo<{ label: string }>(({ label }) => (
<div className={styles.sectionTitle}>{label}</div>
));
SectionHeader.displayName = 'SectionHeader';
+142 -65
View File
@@ -4,6 +4,7 @@ import { Github } from '@lobehub/icons';
import { Flexbox, Icon, Popover, Skeleton, Tooltip } from '@lobehub/ui';
import { createStaticStyles, cssVar, cx } from 'antd-style';
import {
BoxIcon,
ChevronDownIcon,
CloudIcon,
FolderIcon,
@@ -12,9 +13,10 @@ import {
MonitorOffIcon,
SquircleDashed,
} from 'lucide-react';
import { memo, type ReactNode, useCallback, useMemo, useState } from 'react';
import { memo, type ReactNode, useCallback, useEffect, useMemo, useState } from 'react';
import { useTranslation } from 'react-i18next';
import { deviceService } from '@/services/device';
import { useAgentStore } from '@/store/agent';
import { agentByIdSelectors, chatConfigByIdSelectors } from '@/store/agent/selectors';
import { useChatStore } from '@/store/chat';
@@ -26,6 +28,7 @@ import { useUpdateAgentConfig } from '../hooks/useUpdateAgentConfig';
import { useChatInputStore } from '../store';
import ApprovalMode from './ApprovalMode';
import CloudRepoSwitcher from './CloudRepoSwitcher';
import { DeviceSelector, SectionHeader } from './DeviceSelector';
import GitStatus from './GitStatus';
import ModeSelector from './ModeSelector';
import { useRepoType } from './useRepoType';
@@ -35,6 +38,7 @@ const MODE_ICONS: Record<RuntimeEnvMode, typeof LaptopIcon> = {
cloud: CloudIcon,
local: LaptopIcon,
none: MonitorOffIcon,
sandbox: BoxIcon,
};
const styles = createStaticStyles(({ css }) => ({
@@ -63,6 +67,11 @@ const styles = createStaticStyles(({ css }) => ({
background: ${cssVar.colorFillSecondary};
}
`,
divider: css`
height: 1px;
margin-block: 4px;
background: ${cssVar.colorBorderSecondary};
`,
modeDesc: css`
font-size: 12px;
color: ${cssVar.colorTextTertiary};
@@ -107,16 +116,21 @@ const RuntimeConfig = memo(() => {
const { updateAgentChatConfig } = useUpdateAgentConfig();
const [dirPopoverOpen, setDirPopoverOpen] = useState(false);
const [modePopoverOpen, setModePopoverOpen] = useState(false);
const [devices, setDevices] = useState<Awaited<ReturnType<typeof deviceService.listDevices>>>([]);
const [devicesLoading, setDevicesLoading] = useState(false);
const showContextWindow = useChatInputStore((s) =>
s.rightActions.flat().includes('contextWindow'),
);
const [isLoading, runtimeMode, isHeterogeneous, enableAgentMode] = useAgentStore((s) => [
agentByIdSelectors.isAgentConfigLoadingById(agentId)(s),
chatConfigByIdSelectors.getRuntimeModeById(agentId)(s),
agentId ? agentByIdSelectors.isAgentHeterogeneousById(agentId)(s) : false,
agentByIdSelectors.getAgentEnableModeById(agentId)(s),
]);
const [isLoading, runtimeMode, isHeterogeneous, enableAgentMode, deviceId] = useAgentStore(
(s) => [
agentByIdSelectors.isAgentConfigLoadingById(agentId)(s),
chatConfigByIdSelectors.getRuntimeModeById(agentId)(s),
agentId ? agentByIdSelectors.isAgentHeterogeneousById(agentId)(s) : false,
agentByIdSelectors.getAgentEnableModeById(agentId)(s),
chatConfigByIdSelectors.getDeviceIdById(agentId)(s),
],
);
const topicWorkingDirectory = useChatStore(topicSelectors.currentTopicWorkingDirectory);
const agentWorkingDirectory = useAgentStore((s) =>
@@ -126,6 +140,17 @@ const RuntimeConfig = memo(() => {
const repoType = useRepoType(effectiveWorkingDirectory);
// Fetch device list when popover opens (desktop only)
useEffect(() => {
if (modePopoverOpen && isDesktop) {
setDevicesLoading(true);
deviceService.listDevices().then((list) => {
setDevices(list);
setDevicesLoading(false);
});
}
}, [modePopoverOpen]);
const dirIconNode = useMemo((): ReactNode => {
if (!effectiveWorkingDirectory) return <Icon icon={SquircleDashed} size={14} />;
if (repoType === 'github') return <Github size={14} />;
@@ -134,18 +159,43 @@ const RuntimeConfig = memo(() => {
}, [effectiveWorkingDirectory, repoType]);
const switchMode = useCallback(
async (mode: RuntimeEnvMode) => {
if (mode === runtimeMode) return;
async (mode: RuntimeEnvMode, opts?: { deviceId?: string }) => {
if (mode === runtimeMode && opts?.deviceId === deviceId) return;
const platform = isDesktop ? 'desktop' : 'web';
await updateAgentChatConfig({
runtimeEnv: { runtimeMode: { [platform]: mode } },
runtimeEnv: {
deviceId: opts?.deviceId,
runtimeMode: { [platform]: mode },
},
});
},
[runtimeMode, updateAgentChatConfig],
[runtimeMode, deviceId, updateAgentChatConfig],
);
// Compute the display label for the mode button
const activeDevice = useMemo(
() => (deviceId ? devices.find((d) => d.deviceId === deviceId) : undefined),
[deviceId, devices],
);
const ModeIcon = MODE_ICONS[runtimeMode] || LaptopIcon;
const modeLabel = useMemo(() => {
// When running on a specific device, show device hostname
if (runtimeMode === 'local' && activeDevice) {
return activeDevice.hostname;
}
return t(`runtimeEnv.mode.${runtimeMode}`);
}, [runtimeMode, activeDevice, t]);
const displayName = effectiveWorkingDirectory
? effectiveWorkingDirectory.split('/').findLast(Boolean) || effectiveWorkingDirectory
: tPlugin('localSystem.workingDirectory.notSet');
const hasDevices = devices.length > 0;
// Skeleton placeholder to prevent layout jump during loading
if (!agentId || isLoading) {
return (
@@ -156,66 +206,93 @@ const RuntimeConfig = memo(() => {
);
}
const ModeIcon = MODE_ICONS[runtimeMode];
const modeLabel = t(`runtimeEnv.mode.${runtimeMode}`);
const displayName = effectiveWorkingDirectory
? effectiveWorkingDirectory.split('/').findLast(Boolean) || effectiveWorkingDirectory
: tPlugin('localSystem.workingDirectory.notSet');
const modes: { desc: string; icon: typeof LaptopIcon; label: string; mode: RuntimeEnvMode }[] = [
// Local mode is desktop-only
...(isDesktop
? [
{
desc: t('runtimeEnv.mode.localDesc'),
icon: LaptopIcon,
label: t('runtimeEnv.mode.local'),
mode: 'local' as RuntimeEnvMode,
},
]
: []),
{
desc: t('runtimeEnv.mode.cloudDesc'),
icon: CloudIcon,
label: t('runtimeEnv.mode.cloud'),
mode: 'cloud',
},
{
desc: t('runtimeEnv.mode.noneDesc'),
icon: MonitorOffIcon,
label: t('runtimeEnv.mode.none'),
mode: 'none',
},
];
// ─── Popover Content ───
const modeContent = (
<Flexbox gap={4} style={{ minWidth: 280 }}>
{modes.map(({ mode, icon, label, desc }) => (
{/* ── Device section (desktop only) ── */}
{isDesktop && (
<>
<SectionHeader label={t('runtimeEnv.section.device')} />
{devicesLoading ? (
<Flexbox paddingBlock={12} paddingInline={8}>
<Skeleton.Button
active
size="small"
style={{ height: 16, marginBottom: 4, width: '60%' }}
/>
<Skeleton.Button active size="small" style={{ height: 12, width: '40%' }} />
</Flexbox>
) : hasDevices ? (
<DeviceSelector
activeDeviceId={deviceId}
devices={devices}
onSelect={(id) => switchMode('local', { deviceId: id })}
/>
) : (
<Flexbox
className={styles.modeOptionDesc}
paddingBlock={8}
paddingInline={8}
>
{t('runtimeEnv.device.empty')}
</Flexbox>
)}
<div className={styles.divider} />
</>
)}
{/* ── Sandbox ── */}
<Flexbox
horizontal
align={'flex-start'}
gap={12}
className={cx(
styles.modeOption,
(runtimeMode === 'sandbox' || runtimeMode === 'cloud') && styles.modeOptionActive,
)}
onClick={() => switchMode('sandbox')}
>
<Flexbox
horizontal
align={'flex-start'}
className={cx(styles.modeOption, runtimeMode === mode && styles.modeOptionActive)}
gap={12}
key={mode}
onClick={() => switchMode(mode)}
align={'center'}
className={styles.modeOptionIcon}
flex={'none'}
height={32}
justify={'center'}
width={32}
>
<Flexbox
align={'center'}
className={styles.modeOptionIcon}
flex={'none'}
height={32}
justify={'center'}
width={32}
>
<Icon icon={icon} />
</Flexbox>
<Flexbox flex={1}>
<div className={styles.modeOptionTitle}>{label}</div>
<div className={styles.modeOptionDesc}>{desc}</div>
</Flexbox>
<Icon icon={BoxIcon} />
</Flexbox>
))}
<Flexbox flex={1}>
<div className={styles.modeOptionTitle}>{t('runtimeEnv.mode.sandbox')}</div>
<div className={styles.modeOptionDesc}>{t('runtimeEnv.mode.sandboxDesc')}</div>
</Flexbox>
</Flexbox>
{/* ── Disabled ── */}
<Flexbox
horizontal
align={'flex-start'}
className={cx(styles.modeOption, runtimeMode === 'none' && styles.modeOptionActive)}
gap={12}
onClick={() => switchMode('none')}
>
<Flexbox
align={'center'}
className={styles.modeOptionIcon}
flex={'none'}
height={32}
justify={'center'}
width={32}
>
<Icon icon={MonitorOffIcon} />
</Flexbox>
<Flexbox flex={1}>
<div className={styles.modeOptionTitle}>{t('runtimeEnv.mode.none')}</div>
<div className={styles.modeOptionDesc}>{t('runtimeEnv.mode.noneDesc')}</div>
</Flexbox>
</Flexbox>
</Flexbox>
);
@@ -1,4 +1,5 @@
import { type AgentState } from '@lobechat/agent-runtime';
import { type UIChatMessage } from '@lobechat/types';
import debug from 'debug';
import { type AgentOperationMetadata, type StepResult } from './AgentStateManager';
@@ -43,6 +44,17 @@ export interface AgentRuntimeCoordinatorOptions {
* Defaults to automatic selection based on Redis availability
*/
streamEventManager?: IStreamEventManager;
/**
* Resolve the canonical UIChatMessage[] snapshot for a terminal-state
* agent run, attached to `agent_runtime_end` events so the client can
* use the pushed payload as Source of Truth instead of refetching from
* DB.
*
* Optional: when omitted (e.g. tests, embedded usage without DB access)
* the coordinator falls back to publishing without `uiMessages` and the
* client behaves as before.
*/
uiMessagesResolver?: (state: AgentState) => Promise<UIChatMessage[] | undefined>;
}
/**
@@ -59,10 +71,12 @@ export interface AgentRuntimeCoordinatorOptions {
export class AgentRuntimeCoordinator {
private stateManager: IAgentStateManager;
private streamEventManager: IStreamEventManager;
private uiMessagesResolver?: (state: AgentState) => Promise<UIChatMessage[] | undefined>;
constructor(options?: AgentRuntimeCoordinatorOptions) {
this.stateManager = options?.stateManager ?? createAgentStateManager();
this.streamEventManager = options?.streamEventManager ?? createStreamEventManager();
this.uiMessagesResolver = options?.uiMessagesResolver;
}
/**
@@ -94,6 +108,22 @@ export class AgentRuntimeCoordinator {
}
}
/**
* Invoke the optional uiMessagesResolver and shield callers from its
* failures — stream-event publishing must never fail the surrounding
* save. Errors are logged and surfaced to the client as a missing field,
* which falls back to the legacy refresh path.
*/
private async resolveUiMessages(state: AgentState): Promise<UIChatMessage[] | undefined> {
if (!this.uiMessagesResolver) return undefined;
try {
return await this.uiMessagesResolver(state);
} catch (error) {
console.error('Failed to resolve uiMessages for agent_runtime_end:', error);
return undefined;
}
}
/**
* Save Agent state and handle corresponding events
*/
@@ -106,12 +136,13 @@ export class AgentRuntimeCoordinator {
// Send a terminal event once the operation first enters a terminal state.
if (hasEnteredStreamEndState(previousState?.status, state.status)) {
await this.streamEventManager.publishAgentRuntimeEnd(
await this.streamEventManager.publishAgentRuntimeEnd({
finalState: state,
operationId,
state.stepCount ?? previousState?.stepCount ?? 0,
state,
state.status,
);
reason: state.status,
stepIndex: state.stepCount ?? previousState?.stepCount ?? 0,
uiMessages: await this.resolveUiMessages(state),
});
log('[%s] Agent runtime reached terminal state: %s', operationId, state.status);
}
} catch (error) {
@@ -133,12 +164,14 @@ export class AgentRuntimeCoordinator {
// This ensures agent_runtime_end is sent after all step events.
if (hasEnteredStreamEndState(previousState?.status, stepResult.newState.status)) {
await this.streamEventManager.publishAgentRuntimeEnd(
await this.streamEventManager.publishAgentRuntimeEnd({
finalState: stepResult.newState,
operationId,
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,
stepResult.newState,
stepResult.newState.status,
);
reason: stepResult.newState.status,
stepIndex:
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,
uiMessages: await this.resolveUiMessages(stepResult.newState),
});
log(
'[%s] Agent runtime reached terminal state after step result: %s',
operationId,
@@ -7,7 +7,7 @@ import {
type StreamChunkData,
type StreamEvent,
} from './StreamEventManager';
import type { IStreamEventManager } from './types';
import type { IStreamEventManager, PublishAgentRuntimeEndParams } from './types';
const log = debug('lobe-server:agent-runtime:gateway-notifier');
@@ -78,26 +78,25 @@ export class GatewayStreamNotifier implements IStreamEventManager {
return result;
}
async publishAgentRuntimeEnd(
operationId: string,
stepIndex: number,
finalState: any,
reason?: string,
reasonDetail?: string,
): Promise<string> {
const result = await this.inner.publishAgentRuntimeEnd(
operationId,
stepIndex,
finalState,
reason,
reasonDetail,
);
async publishAgentRuntimeEnd(params: PublishAgentRuntimeEndParams): Promise<string> {
const { operationId, stepIndex, finalState, reason, reasonDetail, uiMessages } = params;
const result = await this.inner.publishAgentRuntimeEnd(params);
const effectiveReasonDetail = reasonDetail || getDefaultReasonDetail(finalState, reason);
const errorType = finalState?.error?.type || finalState?.error?.errorType;
this.pushEvent(operationId, {
data: { errorType, finalState, reason, reasonDetail: effectiveReasonDetail },
// Forward `uiMessages` to the gateway push channel so terminal-state
// clients consuming /push-event get the canonical UIChatMessage[]
// snapshot — the final step has no later step_start to carry a fresh
// snapshot, so dropping it here would break the SoT contract.
data: {
errorType,
finalState,
reason,
reasonDetail: effectiveReasonDetail,
...(uiMessages !== undefined && { uiMessages }),
},
operationId,
stepIndex,
timestamp: Date.now(),
@@ -1,7 +1,7 @@
import debug from 'debug';
import { type StreamChunkData, type StreamEvent } from './StreamEventManager';
import { type IStreamEventManager } from './types';
import { type IStreamEventManager, type PublishAgentRuntimeEndParams } from './types';
const log = debug('lobe-server:agent-runtime:in-memory-stream-event-manager');
@@ -97,13 +97,14 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
});
}
async publishAgentRuntimeEnd(
operationId: string,
stepIndex: number,
finalState: any,
reason?: string,
reasonDetail?: string,
): Promise<string> {
async publishAgentRuntimeEnd({
operationId,
stepIndex,
finalState,
reason,
reasonDetail,
uiMessages,
}: PublishAgentRuntimeEndParams): Promise<string> {
return this.publishStreamEvent(operationId, {
data: {
finalState,
@@ -111,6 +112,7 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
phase: 'execution_complete',
reason: reason || 'completed',
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
...(uiMessages !== undefined && { uiMessages }),
},
stepIndex,
type: 'agent_runtime_end',
@@ -272,6 +272,14 @@ export interface RuntimeExecutorContext {
streamManager: IStreamEventManager;
toolExecutionService: ToolExecutionService;
topicId?: string;
/**
* Trace-pipeline sink for context engine input/output. Wired by
* AgentRuntimeService so the trace recorder can pick CE data up
* out-of-band, keeping the heavy CE payload (agentDocuments, systemRole, …)
* out of the `events` array and therefore out of the Redis state pipeline.
* See LOBE-9110.
*/
tracingContextEngine?: (input: unknown, output: unknown) => void;
userId?: string;
userTimezone?: string;
}
@@ -714,7 +722,7 @@ export const createRuntimeExecutors = (
processedMessages = await serverMessagesEngine(contextEngineInput);
// Emit context engine event for tracing
// Hand context engine input/output to the trace sink out-of-band.
// Omit large/redundant fields to reduce snapshot size:
// - input.messages: reconstructible from step's messagesBaseline + messagesDelta
// - input.toolsConfig: static per operation, ~47KB of manifests repeated every call_llm step
@@ -724,14 +732,10 @@ export const createRuntimeExecutors = (
toolsConfig: _toolsConfig,
...contextEngineInputLite
} = contextEngineInput;
events.push({
input: {
...contextEngineInputLite,
toolCount: _toolsConfig?.tools?.length ?? 0,
},
output: processedMessages,
type: 'context_engine_result',
} as any);
ctx.tracingContextEngine?.(
{ ...contextEngineInputLite, toolCount: _toolsConfig?.tools?.length ?? 0 },
processedMessages,
);
} else {
processedMessages = llmPayload.messages;
}
@@ -4,6 +4,7 @@ import debug from 'debug';
import { type Redis } from 'ioredis';
import { getAgentRuntimeRedisClient } from './redis';
import { type PublishAgentRuntimeEndParams } from './types';
const log = debug('lobe-server:agent-runtime:stream-event-manager');
const timing = debug('lobe-server:agent-runtime:timing');
@@ -182,13 +183,14 @@ export class StreamEventManager {
/**
* Publish Agent runtime end event
*/
async publishAgentRuntimeEnd(
operationId: string,
stepIndex: number,
finalState: any,
reason?: string,
reasonDetail?: string,
): Promise<string> {
async publishAgentRuntimeEnd({
operationId,
stepIndex,
finalState,
reason,
reasonDetail,
uiMessages,
}: PublishAgentRuntimeEndParams): Promise<string> {
return this.publishStreamEvent(operationId, {
data: {
finalState,
@@ -196,6 +198,7 @@ export class StreamEventManager {
phase: 'execution_complete',
reason: reason || 'completed',
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
...(uiMessages !== undefined && { uiMessages }),
},
stepIndex,
type: 'agent_runtime_end',
@@ -95,12 +95,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStateManager.saveAgentState).toHaveBeenCalledWith(operationId, newState);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
newState.stepCount,
newState,
'done',
);
reason: 'done',
stepIndex: newState.stepCount,
uiMessages: undefined,
});
});
it('should publish end event when status changes to error', async () => {
@@ -112,12 +113,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
newState.stepCount,
newState,
'error',
);
reason: 'error',
stepIndex: newState.stepCount,
uiMessages: undefined,
});
});
it('should fallback to previous stepCount when terminal state is missing stepCount', async () => {
@@ -129,12 +131,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
previousState.stepCount,
newState,
'error',
);
reason: 'error',
stepIndex: previousState.stepCount,
uiMessages: undefined,
});
});
it('should publish end event when status changes to interrupted', async () => {
@@ -146,12 +149,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
newState.stepCount,
newState,
'interrupted',
);
reason: 'interrupted',
stepIndex: newState.stepCount,
uiMessages: undefined,
});
});
it('should publish end event when status changes to waiting_for_human so the client releases its loading state', async () => {
@@ -163,12 +167,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveAgentState(operationId, newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId,
newState.stepCount,
newState,
'waiting_for_human',
);
reason: 'waiting_for_human',
stepIndex: newState.stepCount,
uiMessages: undefined,
});
});
it('should not publish end event when status was already done', async () => {
@@ -214,12 +219,13 @@ describe('AgentRuntimeCoordinator', () => {
expect(mockStateManager.loadAgentState).toHaveBeenCalledWith(operationId);
expect(mockStateManager.saveStepResult).toHaveBeenCalledWith(operationId, stepResult);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
5,
stepResult.newState,
'done',
);
reason: 'done',
stepIndex: 5,
uiMessages: undefined,
});
});
it('should publish end event when status becomes error', async () => {
@@ -234,12 +240,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
5,
stepResult.newState,
'error',
);
reason: 'error',
stepIndex: 5,
uiMessages: undefined,
});
});
it('should fallback to stepResult.stepIndex when terminal step result state is missing stepCount', async () => {
@@ -254,12 +261,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
stepResult.stepIndex,
stepResult.newState,
'error',
);
reason: 'error',
stepIndex: stepResult.stepIndex,
uiMessages: undefined,
});
});
it('should publish end event when status becomes waiting_for_human (paused awaiting approval)', async () => {
@@ -274,12 +282,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
4,
stepResult.newState,
'waiting_for_human',
);
reason: 'waiting_for_human',
stepIndex: 4,
uiMessages: undefined,
});
});
it('should publish end event when status becomes interrupted', async () => {
@@ -294,12 +303,13 @@ describe('AgentRuntimeCoordinator', () => {
await coordinator.saveStepResult(operationId, stepResult as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId,
5,
stepResult.newState,
'interrupted',
);
reason: 'interrupted',
stepIndex: 5,
uiMessages: undefined,
});
});
it('should not publish end event when status is not done', async () => {
@@ -395,4 +405,105 @@ describe('AgentRuntimeCoordinator', () => {
expect(result).toBe(expectedHistory);
});
});
// Terminal events should carry the canonical UIChatMessage[] snapshot
// when a resolver is wired so the client can use the pushed payload as
// Source of Truth instead of refetching from DB.
describe('uiMessagesResolver on agent_runtime_end', () => {
it('passes resolver result through saveAgentState terminal publish', async () => {
const uiMessages = [{ id: 'msg_1', role: 'user' }] as any[];
const resolver = vi.fn().mockResolvedValue(uiMessages);
const coordinatorWithResolver = new AgentRuntimeCoordinator({
stateManager: mockStateManager,
streamEventManager: mockStreamManager,
uiMessagesResolver: resolver,
});
const previousState = { status: 'running', stepCount: 3 };
const newState = { status: 'done', stepCount: 5 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinatorWithResolver.saveAgentState('op-1', newState as any);
expect(resolver).toHaveBeenCalledWith(newState);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId: 'op-1',
reason: 'done',
stepIndex: 5,
uiMessages,
});
});
it('passes resolver result through saveStepResult terminal publish', async () => {
const uiMessages = [{ id: 'msg_a', role: 'assistantGroup' }] as any[];
const resolver = vi.fn().mockResolvedValue(uiMessages);
const coordinatorWithResolver = new AgentRuntimeCoordinator({
stateManager: mockStateManager,
streamEventManager: mockStreamManager,
uiMessagesResolver: resolver,
});
const stepResult = {
executionTime: 100,
newState: { status: 'done', stepCount: 4 },
stepIndex: 4,
};
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 3 });
await coordinatorWithResolver.saveStepResult('op-2', stepResult as any);
expect(resolver).toHaveBeenCalledWith(stepResult.newState);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: stepResult.newState,
operationId: 'op-2',
reason: 'done',
stepIndex: 4,
uiMessages,
});
});
it('publishes with uiMessages=undefined when resolver rejects (must never fail the surrounding save)', async () => {
const resolver = vi.fn().mockRejectedValue(new Error('db down'));
const coordinatorWithResolver = new AgentRuntimeCoordinator({
stateManager: mockStateManager,
streamEventManager: mockStreamManager,
uiMessagesResolver: resolver,
});
const previousState = { status: 'running', stepCount: 3 };
const newState = { status: 'error', stepCount: 5 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinatorWithResolver.saveAgentState('op-3', newState as any);
expect(resolver).toHaveBeenCalled();
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId: 'op-3',
reason: 'error',
stepIndex: 5,
uiMessages: undefined,
});
});
it('publishes with uiMessages=undefined when no resolver is wired (default constructor)', async () => {
// The default `coordinator` from the outer beforeEach is constructed
// without a resolver — proves the field is genuinely optional and
// legacy call sites stay unaffected.
const previousState = { status: 'running', stepCount: 3 };
const newState = { status: 'done', stepCount: 5 };
mockStateManager.loadAgentState.mockResolvedValue(previousState);
await coordinator.saveAgentState('op-4', newState as any);
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
finalState: newState,
operationId: 'op-4',
reason: 'done',
stepIndex: 5,
uiMessages: undefined,
});
});
});
});
@@ -131,21 +131,27 @@ describe('GatewayStreamNotifier', () => {
it('delegates to inner and returns its result', async () => {
const finalState = { status: 'done' };
const result = await notifier.publishAgentRuntimeEnd('op-1', 2, finalState, 'completed');
const params = {
finalState,
operationId: 'op-1',
reason: 'completed',
stepIndex: 2,
};
const result = await notifier.publishAgentRuntimeEnd(params);
expect(result).toBe('publishAgentRuntimeEnd-result');
expect(inner.calls.publishAgentRuntimeEnd).toHaveLength(1);
expect(inner.calls.publishAgentRuntimeEnd[0]).toEqual([
'op-1',
2,
finalState,
'completed',
undefined,
]);
expect(inner.calls.publishAgentRuntimeEnd[0]).toEqual([params]);
});
it('calls gateway push-event endpoint only (no update-status)', async () => {
await notifier.publishAgentRuntimeEnd('op-1', 2, {}, 'completed', 'All done');
await notifier.publishAgentRuntimeEnd({
finalState: {},
operationId: 'op-1',
reason: 'completed',
reasonDetail: 'All done',
stepIndex: 2,
});
await new Promise((r) => setTimeout(r, 50));
@@ -163,7 +169,12 @@ describe('GatewayStreamNotifier', () => {
},
};
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
await notifier.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'error',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
@@ -176,7 +187,13 @@ describe('GatewayStreamNotifier', () => {
error: { message: 'Some error' },
};
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error', 'Custom detail');
await notifier.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'error',
reasonDetail: 'Custom detail',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
@@ -189,7 +206,12 @@ describe('GatewayStreamNotifier', () => {
error: { message: 'Budget exceeded', type: 'InsufficientBudgetForModel' },
};
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
await notifier.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'error',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
@@ -205,7 +227,12 @@ describe('GatewayStreamNotifier', () => {
},
};
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
await notifier.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'error',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
@@ -214,13 +241,49 @@ describe('GatewayStreamNotifier', () => {
});
it('errorType is undefined when no error in finalState', async () => {
await notifier.publishAgentRuntimeEnd('op-1', 0, { status: 'done' }, 'completed');
await notifier.publishAgentRuntimeEnd({
finalState: { status: 'done' },
operationId: 'op-1',
reason: 'completed',
stepIndex: 0,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
const body = JSON.parse(pushCall![1].body);
expect(body.event.data.errorType).toBeUndefined();
});
it('forwards uiMessages to the gateway push payload when provided', async () => {
const uiMessages = [{ id: 'msg_z', role: 'assistantGroup' }] as any;
await notifier.publishAgentRuntimeEnd({
finalState: { status: 'done' },
operationId: 'op-1',
reason: 'completed',
stepIndex: 4,
uiMessages,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
const body = JSON.parse(pushCall![1].body);
expect(body.event.data.uiMessages).toEqual(uiMessages);
});
it('omits uiMessages from the gateway push payload when not provided', async () => {
await notifier.publishAgentRuntimeEnd({
finalState: { status: 'done' },
operationId: 'op-1',
reason: 'completed',
stepIndex: 4,
});
await new Promise((r) => setTimeout(r, 50));
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
const body = JSON.parse(pushCall![1].body);
expect(body.event.data).not.toHaveProperty('uiMessages');
});
});
// ─── Read/subscribe methods: must delegate directly to inner ───
@@ -306,7 +369,12 @@ describe('GatewayStreamNotifier', () => {
() => new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), 10)),
);
const result = await notifier.publishAgentRuntimeEnd('op-1', 0, {}, 'completed');
const result = await notifier.publishAgentRuntimeEnd({
finalState: {},
operationId: 'op-1',
reason: 'completed',
stepIndex: 0,
});
expect(result).toBe('publishAgentRuntimeEnd-result');
expect(inner.calls.publishAgentRuntimeEnd).toHaveLength(1);
@@ -157,4 +157,46 @@ describe('InMemoryStreamEventManager', () => {
expect(manager.getAllEvents('op-1')).toHaveLength(0);
});
});
// agent_runtime_end optionally carries the canonical UIChatMessage[]
// snapshot so the client can use the pushed payload as Source of Truth
// instead of refetching from DB.
describe('publishAgentRuntimeEnd uiMessages', () => {
it('includes uiMessages in event data when provided', async () => {
const uiMessages = [
{ id: 'msg_u', role: 'user' },
{ id: 'msg_a', role: 'assistantGroup' },
] as any[];
const finalState = { status: 'done', stepCount: 3 };
await manager.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'done',
stepIndex: 3,
uiMessages,
});
const events = manager.getAllEvents('op-1');
expect(events).toHaveLength(1);
expect(events[0].type).toBe('agent_runtime_end');
expect(events[0].data.uiMessages).toEqual(uiMessages);
expect(events[0].data.finalState).toEqual(finalState);
});
it('omits uiMessages when not provided (legacy callers stay unaffected)', async () => {
const finalState = { status: 'done', stepCount: 3 };
await manager.publishAgentRuntimeEnd({
finalState,
operationId: 'op-1',
reason: 'done',
stepIndex: 3,
});
const events = manager.getAllEvents('op-1');
expect(events[0].data).not.toHaveProperty('uiMessages');
expect(events[0].data.finalState).toEqual(finalState);
});
});
});
@@ -75,7 +75,11 @@ describe('StreamEventManager', () => {
mockRedis.xadd.mockResolvedValue('event-id-456');
const result = await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState);
const result = await streamManager.publishAgentRuntimeEnd({
finalState,
operationId,
stepIndex,
});
expect(result).toBe('event-id-456');
expect(mockRedis.xadd).toHaveBeenCalledWith(
@@ -103,6 +107,50 @@ describe('StreamEventManager', () => {
);
});
// agent_runtime_end optionally carries the canonical UIChatMessage[]
// snapshot so the client can use the pushed payload as Source of Truth
// instead of refetching from DB.
it('should include uiMessages in serialized data when provided', async () => {
const operationId = 'test-operation-id';
const stepIndex = 5;
const finalState = { status: 'done', stepCount: 5 };
const uiMessages = [{ id: 'msg_a', role: 'assistantGroup' }] as any;
mockRedis.xadd.mockResolvedValue('event-id-ui');
await streamManager.publishAgentRuntimeEnd({
finalState,
operationId,
reason: 'done',
stepIndex,
uiMessages,
});
// Find the serialized `data` argument inline so this test stays robust
// if other positional args shift around.
const dataArg = mockRedis.xadd.mock.calls[0]?.find(
(a: any) => typeof a === 'string' && a.startsWith('{'),
);
const parsed = JSON.parse(dataArg);
expect(parsed.uiMessages).toEqual(uiMessages);
expect(parsed.finalState).toEqual(finalState);
});
it('should omit uiMessages from serialized data when not provided', async () => {
const operationId = 'test-operation-id';
const finalState = { status: 'done', stepCount: 3 };
mockRedis.xadd.mockResolvedValue('event-id-noui');
await streamManager.publishAgentRuntimeEnd({ finalState, operationId, stepIndex: 3 });
const dataArg = mockRedis.xadd.mock.calls[0]?.find(
(a: any) => typeof a === 'string' && a.startsWith('{'),
);
const parsed = JSON.parse(dataArg);
expect(parsed).not.toHaveProperty('uiMessages');
});
it('should accept custom reason and reasonDetail', async () => {
const operationId = 'test-operation-id';
const stepIndex = 3;
@@ -112,13 +160,13 @@ describe('StreamEventManager', () => {
mockRedis.xadd.mockResolvedValue('event-id-789');
await streamManager.publishAgentRuntimeEnd(
operationId,
stepIndex,
await streamManager.publishAgentRuntimeEnd({
finalState,
operationId,
reason,
reasonDetail,
);
stepIndex,
});
expect(mockRedis.xadd).toHaveBeenCalledWith(
expect.any(String),
@@ -158,7 +206,12 @@ describe('StreamEventManager', () => {
mockRedis.xadd.mockResolvedValue('event-id-790');
await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState, 'error');
await streamManager.publishAgentRuntimeEnd({
finalState,
operationId,
reason: 'error',
stepIndex,
});
expect(mockRedis.xadd).toHaveBeenCalledWith(
expect.any(String),
+22 -8
View File
@@ -1,9 +1,24 @@
import type { ToolExecuteData } from '@lobechat/agent-gateway-client';
import { type AgentState } from '@lobechat/agent-runtime';
import { type UIChatMessage } from '@lobechat/types';
import { type AgentOperationMetadata, type StepResult } from './AgentStateManager';
import { type StreamChunkData, type StreamEvent } from './StreamEventManager';
export interface PublishAgentRuntimeEndParams {
finalState: any;
operationId: string;
reason?: string;
reasonDetail?: string;
stepIndex: number;
/**
* Canonical UIChatMessage[] snapshot of the topic at terminal-state time.
* When present, the client uses this directly as Source of Truth instead
* of refetching from DB.
*/
uiMessages?: UIChatMessage[];
}
/**
* Agent State Manager Interface
* Abstract interface for state persistence, supports Redis and in-memory implementations
@@ -114,15 +129,14 @@ export interface IStreamEventManager {
getStreamHistory: (operationId: string, count?: number) => Promise<StreamEvent[]>;
/**
* Publish Agent runtime end event
* Publish Agent runtime end event.
*
* `uiMessages` is the canonical UIChatMessage[] snapshot of the topic at
* terminal-state time so the client can use the pushed payload as Source
* of Truth instead of refetching from DB. Optional: callers without DB
* access may omit it and the client falls back to its existing behaviour.
*/
publishAgentRuntimeEnd: (
operationId: string,
stepIndex: number,
finalState: any,
reason?: string,
reasonDetail?: string,
) => Promise<string>;
publishAgentRuntimeEnd: (params: PublishAgentRuntimeEndParams) => Promise<string>;
/**
* Publish Agent runtime initialization event
@@ -201,7 +201,7 @@ export const createServerAgentToolsEngine = (
// Always-on builtin tools
...Object.fromEntries(alwaysOnToolIds.map((id) => [id, true])),
// System-level rules (may override user selection for specific tools)
[CloudSandboxManifest.identifier]: runtimeMode === 'cloud',
[CloudSandboxManifest.identifier]: runtimeMode === 'cloud' || runtimeMode === 'sandbox',
[KnowledgeBaseManifest.identifier]: hasEnabledKnowledgeBases,
// Local-system: gated by `canUseDevice` (resolveDeviceAccessPolicy)
// first — keeps external bot senders out before runtime checks even
+7 -7
View File
@@ -132,13 +132,13 @@ export const agentNotifyRouter = router({
const stream = getStreamManager();
if (done) {
// Signal task completion — frontend gateway WS subscription closes.
await stream.publishAgentRuntimeEnd(
remoteOperationId,
0,
{ reason: 'success' },
'success',
'Remote hetero agent task completed',
);
await stream.publishAgentRuntimeEnd({
finalState: { reason: 'success' },
operationId: remoteOperationId,
reason: 'success',
reasonDetail: 'Remote hetero agent task completed',
stepIndex: 0,
});
} else {
// Lightweight invalidation — frontend calls fetchAndReplaceMessages.
await stream.publishStreamEvent(remoteOperationId, {
@@ -1478,4 +1478,55 @@ describe('AgentRuntimeService', () => {
expect(mockCoordinator.saveAgentState).not.toHaveBeenCalled();
});
});
// Stream events at step / operation boundaries should carry the canonical
// UIChatMessage[] snapshot so the client can use the pushed payload as
// Source of Truth instead of refetching from DB.
describe('queryUiMessages', () => {
const stubMessageService = (svc: any, queryMessages: ReturnType<typeof vi.fn>) => {
svc.messageServiceInstance = { queryMessages };
};
it('returns messageService.queryMessages result when agentId + topicId are present', async () => {
const stubMessages = [{ id: 'msg_x', role: 'user' }];
const queryMessages = vi.fn().mockResolvedValue(stubMessages);
stubMessageService(service, queryMessages);
const result = await service.queryUiMessages({
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
} as any);
expect(queryMessages).toHaveBeenCalledWith({ agentId: 'agt_1', topicId: 'tpc_1' });
expect(result).toEqual(stubMessages);
});
it('returns undefined when agentId or topicId is missing (skips empty-array push)', async () => {
const queryMessages = vi.fn();
stubMessageService(service, queryMessages);
const noAgent = await service.queryUiMessages({
metadata: { topicId: 'tpc_1' },
} as any);
const noTopic = await service.queryUiMessages({
metadata: { agentId: 'agt_1' },
} as any);
const noMeta = await service.queryUiMessages({} as any);
expect(noAgent).toBeUndefined();
expect(noTopic).toBeUndefined();
expect(noMeta).toBeUndefined();
expect(queryMessages).not.toHaveBeenCalled();
});
it('returns undefined and never throws when DB query fails (stream events must not fail the step)', async () => {
const queryMessages = vi.fn().mockRejectedValue(new Error('db down'));
stubMessageService(service, queryMessages);
const result = await service.queryUiMessages({
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
} as any);
expect(result).toBeUndefined();
});
});
});
@@ -13,6 +13,7 @@ import {
ChatErrorType,
type ChatMessageError,
type ExecSubAgentTaskParams,
type UIChatMessage,
} from '@lobechat/types';
import debug from 'debug';
import urlJoin from 'url-join';
@@ -30,6 +31,7 @@ import { type IStreamEventManager } from '@/server/modules/AgentRuntime/types';
import { emitAgentSignalSourceEvent } from '@/server/services/agentSignal';
import { toAgentSignalTraceEvents } from '@/server/services/agentSignal/observability/traceEvents';
import { mcpService } from '@/server/services/mcp';
import { MessageService } from '@/server/services/message';
import { QueueService } from '@/server/services/queue';
import { LocalQueueServiceImpl } from '@/server/services/queue/impls';
import { ToolExecutionService } from '@/server/services/toolExecution';
@@ -182,6 +184,17 @@ export class AgentRuntimeService {
private serverDB: LobeChatDatabase;
private userId: string;
private messageModel: MessageModel;
// Lazily constructed because MessageService instantiates a FileService
// which eagerly creates the S3 client and throws when S3 env vars are
// missing — eager construction would break every test that builds an
// AgentRuntimeService without mocking the file backend.
private messageServiceInstance?: MessageService;
private get messageService(): MessageService {
if (!this.messageServiceInstance) {
this.messageServiceInstance = new MessageService(this.serverDB, this.userId);
}
return this.messageServiceInstance;
}
constructor(db: LobeChatDatabase, userId: string, options?: AgentRuntimeServiceOptions) {
// Use factory function to auto-select Redis or InMemory implementation
@@ -192,6 +205,10 @@ export class AgentRuntimeService {
this.coordinator = new AgentRuntimeCoordinator({
...options?.coordinatorOptions,
streamEventManager: this.streamManager,
// Provide the canonical UIChatMessage[] for terminal-state events so
// the client can use the pushed payload directly instead of refetching
// from DB. Falls back gracefully when topicId isn't set.
uiMessagesResolver: (state) => this.queryUiMessages(state),
});
this.queueService =
options?.queueService === null ? null : (options?.queueService ?? new QueueService());
@@ -474,6 +491,31 @@ export class AgentRuntimeService {
}
}
/**
* Query the canonical UIChatMessage[] snapshot for the active topic — the
* same shape the `message.getMessages` trpc lambda returns to the client.
* Attached to step_start / agent_runtime_end stream events so the client
* can use the pushed payload directly instead of refetching from DB.
*
* Returns undefined when the topic isn't known yet (e.g. very early in
* bootstrap before the topic row has been committed) so callers can skip
* the `uiMessages` field entirely instead of pushing an empty array.
*/
async queryUiMessages(agentState: AgentState): Promise<UIChatMessage[] | undefined> {
const agentId: string | undefined = agentState?.metadata?.agentId;
const topicId: string | undefined = agentState?.metadata?.topicId;
if (!agentId || !topicId) return undefined;
try {
return await this.messageService.queryMessages({ agentId, topicId });
} catch (error) {
// Stream events must never fail the step. If the DB hiccups, fall back
// to letting the client refresh as before.
console.error('[queryUiMessages] Failed to load uiMessages snapshot: %O', error);
return undefined;
}
}
/**
* Execute Agent step
*/
@@ -515,20 +557,27 @@ export class AgentRuntimeService {
try {
log('[%s][%d] Start step executing...', operationId, stepIndex);
// Publish step start event
await this.streamManager.publishStreamEvent(operationId, {
data: {},
stepIndex,
type: 'step_start',
});
// Get operation state and metadata
// Load agent state BEFORE publishing step_start so we can attach the
// canonical UIChatMessage snapshot to the event payload. step_start
// fires after the previous step's DB writes are awaited durable, so
// the snapshot query here reflects strongly-consistent state — that's
// the contract that lets the client treat the pushed uiMessages as
// the source of truth instead of doing its own refetch.
const agentState = await this.coordinator.loadAgentState(operationId);
if (!agentState) {
throw new Error(`Agent state not found for operation ${operationId}`);
}
const stepStartUiMessages = await this.queryUiMessages(agentState);
await this.streamManager.publishStreamEvent(operationId, {
data: {
...(stepStartUiMessages !== undefined && { uiMessages: stepStartUiMessages }),
},
stepIndex,
type: 'step_start',
});
agentState.metadata = {
...agentState.metadata,
externalRetryCount,
@@ -622,6 +671,14 @@ export class AgentRuntimeService {
log('[%s] beforeStep hook dispatch error: %O', operationId, hookError);
}
// Per-step buffer for context engine input/output. Populated by the
// `tracingContextEngine` callback passed into the executor context;
// consumed by traceRecorder.appendStep below. Routing CE this way keeps
// its heavy payload (agentDocuments, systemRole, …) out of
// `stepResult.events` and therefore out of the Redis state pipeline.
// See LOBE-9110.
let contextEnginePayload: { input: unknown; output: unknown } | undefined;
// Create Agent and Runtime instances
// Use agentState.metadata which contains the full app context (topicId, agentId, etc.)
// operationMetadata only contains basic fields (agentConfig, modelRuntimeConfig, userId)
@@ -629,6 +686,9 @@ export class AgentRuntimeService {
metadata: agentState?.metadata,
operationId,
stepIndex,
tracingContextEngine: (input, output) => {
contextEnginePayload = { input, output };
},
});
// Handle human intervention
@@ -800,6 +860,7 @@ export class AgentRuntimeService {
afterStepSignalEvents,
agentState,
beforeStepSignalEvents,
contextEngine: contextEnginePayload,
currentContext,
externalRetryCount,
presentation: stepPresentationData,
@@ -1337,10 +1398,12 @@ export class AgentRuntimeService {
metadata,
operationId,
stepIndex,
tracingContextEngine,
}: {
metadata?: any;
operationId: string;
stepIndex: number;
tracingContextEngine?: (input: unknown, output: unknown) => void;
}) {
const contextWindowTokens =
metadata?.modelRuntimeConfig?.model && metadata?.modelRuntimeConfig?.provider
@@ -1386,6 +1449,7 @@ export class AgentRuntimeService {
streamManager: this.streamManager,
toolExecutionService: this.toolExecutionService,
topicId: metadata?.topicId,
tracingContextEngine,
userId: metadata?.userId,
};
@@ -15,6 +15,13 @@ export interface AppendStepParams {
*/
agentState: any;
beforeStepSignalEvents: SignalEvent[];
/**
* Context engine input/output captured for this step. Delivered via
* `RuntimeExecutorContext.tracingContextEngine` rather than through the
* `events` array, so CE payloads (agentDocuments, systemRole, …) stay out
* of the Redis state pipeline. See LOBE-9110.
*/
contextEngine?: { input?: unknown; output?: unknown };
currentContext?: { payload?: unknown; phase?: string; stepContext?: unknown };
externalRetryCount: number;
presentation: StepPresentationData;
@@ -220,6 +227,7 @@ export class OperationTraceRecorder {
agentState,
afterStepSignalEvents,
beforeStepSignalEvents,
contextEngine: ceInput,
currentContext,
externalRetryCount,
presentation,
@@ -237,21 +245,20 @@ export class OperationTraceRecorder {
const isBaseline = stepIndex === 0 || isCompression;
const messagesDelta = afterMessages.slice(prevMessages.length);
// Extract context_engine_result into contextEngine (dedicated typed field).
// CE data is structural state, not a streaming event — it lives separately
// from events and uses the same delta pattern as messagesBaseline/messagesDelta.
const rawEvents = (stepResult.events as any[]) ?? [];
const ceEvent = rawEvents.find((e: any) => e.type === 'context_engine_result') as any;
const contextEngine: StepSnapshot['contextEngine'] = ceEvent
? { input: ceEvent.input, output: ceEvent.output }
// CE data is structural state, not a streaming event — delivered via the
// typed `contextEngine` field on AppendStepParams (sourced from
// RuntimeExecutorContext.tracingContextEngine). Uses the same delta
// pattern as messagesBaseline/messagesDelta.
const contextEngine: StepSnapshot['contextEngine'] = ceInput
? { input: ceInput.input, output: ceInput.output }
: undefined;
// Strip heavy/redundant data from events before persisting to snapshot.
// context_engine_result is excluded — stored in contextEngine instead.
const rawEvents = (stepResult.events as any[]) ?? [];
const snapshotEvents = [
...beforeStepSignalEvents,
...rawEvents
.filter((e) => e.type !== 'llm_stream' && e.type !== 'context_engine_result')
.filter((e) => e.type !== 'llm_stream')
.map((e) => {
if (e.type === 'done' && e.finalState) {
// Remove reconstructible fields from finalState:
@@ -358,26 +358,29 @@ describe('OperationTraceRecorder', () => {
recorder = new OperationTraceRecorder(store as any);
});
const buildCeEvent = (overrides: Record<string, unknown> = {}) => ({
const buildCe = (overrides: { input?: unknown; output?: unknown } = {}) => ({
input: { messages: ['hello'] },
output: { tokens: 42 },
type: 'context_engine_result',
...overrides,
});
const appendStepWithCe = (ceEvent: Record<string, unknown>, prevStepsInStore: any[]) => {
const appendStepWithCe = (
ce: { input?: unknown; output?: unknown },
prevStepsInStore: any[],
) => {
store.loadPartial.mockResolvedValue({ startedAt: 1, steps: prevStepsInStore });
return recorder.appendStep('op-ce', {
afterStepSignalEvents: [],
agentState: { messages: [] },
beforeStepSignalEvents: [],
contextEngine: ce,
currentContext: { phase: 'user_input' },
externalRetryCount: 0,
presentation: buildPresentation(),
startedAt: 1000,
stepIndex: prevStepsInStore.length,
stepResult: {
events: [ceEvent],
events: [],
newState: { activatedStepTools: [], messages: [] },
},
});
@@ -388,8 +391,8 @@ describe('OperationTraceRecorder', () => {
return saved.steps.find((s: any) => s.stepIndex === newStepIndex);
};
it('extracts CE event into contextEngine, not in events', async () => {
await appendStepWithCe(buildCeEvent(), []);
it('routes CE input/output into contextEngine, never into events', async () => {
await appendStepWithCe(buildCe(), []);
const step = getSavedStep(0);
// CE data lives in contextEngine, not events
@@ -398,7 +401,7 @@ describe('OperationTraceRecorder', () => {
});
it('keeps both input and output on the first step (no previous CE to compare against)', async () => {
await appendStepWithCe(buildCeEvent(), []);
await appendStepWithCe(buildCe(), []);
const step = getSavedStep(0);
expect(step.contextEngine.input).toEqual({ messages: ['hello'] });
@@ -411,7 +414,7 @@ describe('OperationTraceRecorder', () => {
stepIndex: 0,
stepType: 'call_llm',
};
await appendStepWithCe(buildCeEvent(), [prevStep]);
await appendStepWithCe(buildCe(), [prevStep]);
const step = getSavedStep(1);
expect(step.contextEngine.input).toBeUndefined();
@@ -424,10 +427,9 @@ describe('OperationTraceRecorder', () => {
stepIndex: 0,
stepType: 'call_llm',
};
await appendStepWithCe(
buildCeEvent({ input: { messages: ['hello'] }, output: { tokens: 99 } }),
[prevStep],
);
await appendStepWithCe(buildCe({ input: { messages: ['hello'] }, output: { tokens: 99 } }), [
prevStep,
]);
const step = getSavedStep(1);
expect(step.contextEngine.input).toBeUndefined();
@@ -440,10 +442,9 @@ describe('OperationTraceRecorder', () => {
stepIndex: 0,
stepType: 'call_llm',
};
await appendStepWithCe(
buildCeEvent({ input: { messages: ['world'] }, output: { tokens: 42 } }),
[prevStep],
);
await appendStepWithCe(buildCe({ input: { messages: ['world'] }, output: { tokens: 42 } }), [
prevStep,
]);
const step = getSavedStep(1);
expect(step.contextEngine.input).toEqual({ messages: ['world'] });
@@ -456,10 +457,9 @@ describe('OperationTraceRecorder', () => {
stepIndex: 0,
stepType: 'call_llm',
};
await appendStepWithCe(
buildCeEvent({ input: { messages: ['new'] }, output: { tokens: 20 } }),
[prevStep],
);
await appendStepWithCe(buildCe({ input: { messages: ['new'] }, output: { tokens: 20 } }), [
prevStep,
]);
const step = getSavedStep(1);
expect(step.contextEngine.input).toEqual({ messages: ['new'] });
@@ -476,7 +476,7 @@ describe('OperationTraceRecorder', () => {
// step 1 has no contextEngine — dedup must skip it and walk back to step 0
{ stepIndex: 1, stepType: 'call_tool' },
];
await appendStepWithCe(buildCeEvent(), prevSteps);
await appendStepWithCe(buildCe(), prevSteps);
const step = getSavedStep(2);
expect(step.contextEngine.input).toBeUndefined();
@@ -492,7 +492,7 @@ describe('OperationTraceRecorder', () => {
{ contextEngine: { output: { tokens: 42 } }, stepIndex: 1, stepType: 'call_llm' },
];
await appendStepWithCe(
buildCeEvent({ input: { messages: ['hello'] }, output: { tokens: 42 } }),
buildCe({ input: { messages: ['hello'] }, output: { tokens: 42 } }),
prevSteps,
);
@@ -501,7 +501,7 @@ describe('OperationTraceRecorder', () => {
expect(step.contextEngine.output).toBeUndefined();
});
it('sets contextEngine to undefined when the step has no context_engine_result event', async () => {
it('sets contextEngine to undefined when no CE was recorded for the step', async () => {
const prevStep = {
contextEngine: { input: { messages: ['hello'] }, output: { tokens: 42 } },
stepIndex: 0,
@@ -898,3 +898,82 @@ describe('AgentRuntimeService.executeStep - error-path snapshot finalize ()', ()
dispatchSpy.mockRestore();
});
});
// step_start event should carry the canonical UIChatMessage[] so the
// client can use the pushed payload as Source of Truth.
describe('AgentRuntimeService.executeStep - step_start uiMessages payload', () => {
const createService = () => {
return new AgentRuntimeService({} as any, 'user-1', { queueService: null });
};
it('attaches uiMessages to step_start data when topic context is known', async () => {
const service = createService();
const coordinator = (service as any).coordinator;
const streamManager = (service as any).streamManager;
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
// Force early-exit path so we don't need to mock the entire runtime
// execution surface — terminal-state short-circuits right after
// step_start publishes.
coordinator.loadAgentState = vi.fn().mockResolvedValue({
status: 'done',
stepCount: 3,
lastModified: new Date().toISOString(),
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
});
streamManager.publishStreamEvent = vi.fn().mockResolvedValue(undefined);
// Inject a uiMessages-returning messageService — the runtime queries
// through MessageService (not the bare messageModel) so that file URLs
// go through FileService postProcessUrl.
const stubMessages = [{ id: 'msg_1', role: 'user' }];
(service as any).messageServiceInstance = {
queryMessages: vi.fn().mockResolvedValue(stubMessages),
};
await service.executeStep({
operationId: 'op-uimsg',
stepIndex: 5,
context: { phase: 'user_input' } as any,
});
// First publish call is step_start; assert its payload carries uiMessages.
const stepStartCall = streamManager.publishStreamEvent.mock.calls.find(
([, evt]: any) => evt?.type === 'step_start',
);
expect(stepStartCall).toBeDefined();
expect(stepStartCall[1].data.uiMessages).toEqual(stubMessages);
});
it('omits uiMessages from step_start data when topic context is unknown', async () => {
const service = createService();
const coordinator = (service as any).coordinator;
const streamManager = (service as any).streamManager;
coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
coordinator.loadAgentState = vi.fn().mockResolvedValue({
status: 'done',
stepCount: 3,
lastModified: new Date().toISOString(),
metadata: {}, // no agentId/topicId
});
streamManager.publishStreamEvent = vi.fn().mockResolvedValue(undefined);
const queryMock = vi.fn();
(service as any).messageServiceInstance = { queryMessages: queryMock };
await service.executeStep({
operationId: 'op-noctx',
stepIndex: 5,
context: { phase: 'user_input' } as any,
});
const stepStartCall = streamManager.publishStreamEvent.mock.calls.find(
([, evt]: any) => evt?.type === 'step_start',
);
expect(stepStartCall).toBeDefined();
expect(stepStartCall[1].data).not.toHaveProperty('uiMessages');
// Did not even attempt the DB query when context is missing.
expect(queryMock).not.toHaveBeenCalled();
});
});
@@ -256,7 +256,11 @@ describe('AgentRuntimeService.executeSync', () => {
// Publish event after a small delay
setTimeout(async () => {
await streamEventManager.publishAgentRuntimeEnd(operationId, 1, { status: 'done' });
await streamEventManager.publishAgentRuntimeEnd({
finalState: { status: 'done' },
operationId,
stepIndex: 1,
});
}, 10);
const event = await streamEventManager.waitForEvent(operationId, 'agent_runtime_end', 1000);
+7 -1
View File
@@ -873,7 +873,13 @@ export class AiAgentService {
if (!result.success) {
log('execAgent: remote hetero dispatch failed: %s', result.error);
await streamManager
.publishAgentRuntimeEnd(operationId, 0, { error: result.error }, 'error', result.error)
.publishAgentRuntimeEnd({
finalState: { error: result.error },
operationId,
reason: 'error',
reasonDetail: result.error,
stepIndex: 0,
})
.catch(() => {});
await this.messageModel.update(assistantMsg.id, {
content: '',
+13
View File
@@ -2,6 +2,7 @@ import { type LobeChatDatabase } from '@lobechat/database';
import { CompressionRepository } from '@lobechat/database';
import {
type CreateMessageParams,
type QueryMessageParams,
type UIChatMessage,
type UpdateMessageParams,
} from '@lobechat/types';
@@ -111,6 +112,18 @@ export class MessageService {
return { messages, success: true };
}
/**
* Fetch the canonical message list for an agent / topic scope, using the
* standard UIChatMessage shape (file URLs resolved through FileService).
*
* Mirrors the read path exposed by the `message.getMessages` trpc lambda
* so server-internal callers (e.g. agent runtime stream events) can push
* the same payload the client would otherwise fetch.
*/
async queryMessages(params: QueryMessageParams): Promise<UIChatMessage[]> {
return this.messageModel.query(params, this.getQueryOptions());
}
/**
* Create a new message and return the complete message list
* Pattern: create + query
+28
View File
@@ -0,0 +1,28 @@
import { type DeviceAttachment } from '@lobechat/builtin-tool-remote-device';
import { lambdaClient } from '@/libs/trpc/client';
export const deviceService = {
/**
* List all online devices bound to the current user.
* Returns devices from the device-gateway via tRPC.
*/
listDevices: async (): Promise<DeviceAttachment[]> => {
try {
return await lambdaClient.device.listDevices.query();
} catch {
return [];
}
},
/**
* Check if the user has any online devices.
*/
getStatus: async (): Promise<{ deviceCount: number; online: boolean }> => {
try {
return await lambdaClient.device.status.query();
} catch {
return { deviceCount: 0, online: false };
}
},
};
@@ -58,17 +58,26 @@ const getRuntimeEnvConfigById = (agentId: string) => (s: AgentStoreState) =>
const isLocalSystemEnabledById = (agentId: string) => (s: AgentStoreState) =>
getRuntimeModeById(agentId)(s) === 'local';
/** Get the selected device ID for the agent (desktop only) */
const getDeviceIdById =
(agentId: string) =>
(s: AgentStoreState): string | undefined =>
getChatConfigById(agentId)(s).runtimeEnv?.deviceId;
/**
* Get runtime environment mode by agent ID.
* Reads from `runtimeMode[platform]`, defaults to 'local' on desktop, 'none' on web.
* Legacy 'cloud' values are normalized to 'sandbox' for backward compatibility.
*/
const getRuntimeModeById =
(agentId: string) =>
(s: AgentStoreState): RuntimeEnvMode => {
const runtimeEnv = getChatConfigById(agentId)(s).runtimeEnv;
const platform = isDesktop ? 'desktop' : 'web';
const mode = runtimeEnv?.runtimeMode?.[platform] ?? (isDesktop ? 'local' : 'none');
return runtimeEnv?.runtimeMode?.[platform] ?? (isDesktop ? 'local' : 'none');
// Legacy backward compatibility: map 'cloud' to 'sandbox'
return mode === 'cloud' ? 'sandbox' : mode;
};
const getSkillActivateModeById =
@@ -78,6 +87,7 @@ const getSkillActivateModeById =
export const chatConfigByIdSelectors = {
getChatConfigById,
getDeviceIdById,
getEnableHistoryCountById,
getHistoryCountById,
getRuntimeEnvConfigById,
@@ -34,8 +34,10 @@ const isMemoryToolEnabled = (s: AgentStoreState) =>
const isLocalSystemEnabled = (s: AgentStoreState) =>
chatConfigByIdSelectors.isLocalSystemEnabledById(s.activeAgentId || '')(s);
const isCloudSandboxEnabled = (s: AgentStoreState) =>
chatConfigByIdSelectors.getRuntimeModeById(s.activeAgentId || '')(s) === 'cloud';
const isCloudSandboxEnabled = (s: AgentStoreState) => {
const mode = chatConfigByIdSelectors.getRuntimeModeById(s.activeAgentId || '')(s);
return mode === 'cloud' || mode === 'sandbox';
};
const skillActivateMode = (s: AgentStoreState) =>
chatConfigByIdSelectors.getSkillActivateModeById(s.activeAgentId || '')(s);