mirror of
https://github.com/lobehub/lobe-chat.git
synced 2026-06-19 22:00:34 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e56dcf9538 | |||
| 0fa2e2349c | |||
| 930344ae23 | |||
| 538195dfb4 |
@@ -14,7 +14,7 @@ In `NODE_ENV=development`, `AgentRuntimeService.executeStep()` automatically rec
|
||||
|
||||
**Data flow**: executeStep loop -> build `StepPresentationData` -> write partial snapshot to disk -> on completion, finalize to `.agent-tracing/{timestamp}_{traceId}.json`
|
||||
|
||||
**Context engine capture**: In `RuntimeExecutors.ts`, the `call_llm` executor emits a `context_engine_result` event after `serverMessagesEngine()` processes messages. This event carries the full `contextEngineInput` (DB messages, systemRole, model, knowledge, tools, userMemory, etc.) and the processed `output` messages (the final LLM payload).
|
||||
**Context engine capture**: In `RuntimeExecutors.ts`, the `call_llm` executor calls `ctx.tracingContextEngine(input, output)` after `serverMessagesEngine()` processes messages. `AgentRuntimeService.executeStep` buffers the call per step and forwards it to `OperationTraceRecorder.appendStep` as the typed `contextEngine` field. CE flows through this side channel rather than the `events` array so its heavy payload (agentDocuments, systemRole, …) never enters the Redis state pipeline (LOBE-9110).
|
||||
|
||||
## Package Location
|
||||
|
||||
@@ -199,9 +199,10 @@ interface StepSnapshot {
|
||||
messages?: any[]; // DB messages before step
|
||||
context?: { phase: string; payload?: unknown; stepContext?: unknown };
|
||||
events?: Array<{ type: string; [key: string]: unknown }>;
|
||||
// context_engine_result event contains:
|
||||
// input: full contextEngineInput (messages, systemRole, model, knowledge, tools, userMemory, ...)
|
||||
// output: processed messages array (final LLM payload)
|
||||
contextEngine?: {
|
||||
input?: unknown; // contextEngineInput minus messages + toolsConfig (reconstructible from baseline)
|
||||
output?: unknown; // processed messages array (final LLM payload)
|
||||
};
|
||||
}
|
||||
```
|
||||
|
||||
@@ -216,5 +217,5 @@ When using `--messages`, the output shows three sections (if context engine data
|
||||
## Integration Points
|
||||
|
||||
- **Recording**: `src/server/services/agentRuntime/AgentRuntimeService.ts` — in the `executeStep()` method, after building `stepPresentationData`, writes partial snapshot in dev mode
|
||||
- **Context engine event**: `src/server/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, emits `context_engine_result` event
|
||||
- **Context engine capture**: `src/server/modules/AgentRuntime/RuntimeExecutors.ts` — in `call_llm` executor, after `serverMessagesEngine()` returns, calls `ctx.tracingContextEngine(input, output)`. `AgentRuntimeService.executeStep` buffers it per step and passes it to `traceRecorder.appendStep` as the typed `contextEngine` field (kept off the `events` array to stay out of Redis state).
|
||||
- **Store**: `FileSnapshotStore` reads/writes to `.agent-tracing/` relative to `process.cwd()`
|
||||
|
||||
@@ -0,0 +1,93 @@
|
||||
# LobeHub gateway streaming + tab-switch test harness
|
||||
|
||||
Captures store + DOM state at 200ms intervals so we can prove or disprove
|
||||
claims like "切回 tab 后消息回到了很早以前". Built for gateway-mode chat but
|
||||
works for any LobeHub streaming session.
|
||||
|
||||
## Files
|
||||
|
||||
`scripts/agent-gateway/`
|
||||
|
||||
| File | Role |
|
||||
| --------------- | ---------------------------------------------------------------- |
|
||||
| `probe.js` | Injects a 200ms sampler + `__PROBE_EVENT` marker + `__switchTab` |
|
||||
| `probe-dump.js` | Stops the sampler and returns `{events, samples}` as JSON string |
|
||||
| `tab-switch.js` | Runs N round-trip switches between two tabs, marks each step |
|
||||
| `analyze.mjs` | Node post-processor: timeline + regression detection |
|
||||
|
||||
## Standard workflow
|
||||
|
||||
```bash
|
||||
# 1. Start Electron with CDP
|
||||
./.agents/skills/local-testing/scripts/electron-dev.sh start
|
||||
|
||||
# 2. Navigate to a chat, switch runtime to Cloud Sandbox (gateway mode)
|
||||
|
||||
# 3. Install the probe + helpers
|
||||
agent-browser --cdp 9222 eval --stdin \
|
||||
< .agents/skills/local-testing/scripts/agent-gateway/probe.js
|
||||
|
||||
# 4. Send a tool-call message — manually or via type+press
|
||||
agent-browser --cdp 9222 eval "window.__PROBE_EVENT('SENT')"
|
||||
|
||||
# 5. Run the multi-switch driver (auto-picks active tab as BACK and the
|
||||
# rightmost inactive tab as AWAY — edit ROUND_TRIPS / DWELL_MS in the
|
||||
# file if you want different timing)
|
||||
agent-browser --cdp 9222 eval --stdin \
|
||||
< .agents/skills/local-testing/scripts/agent-gateway/tab-switch.js
|
||||
|
||||
# 6. Wait for streaming to finish, then dump
|
||||
agent-browser --cdp 9222 eval --stdin \
|
||||
< .agents/skills/local-testing/scripts/agent-gateway/probe-dump.js \
|
||||
> /tmp/probe.json
|
||||
|
||||
# 7. Analyze
|
||||
node .agents/skills/local-testing/scripts/agent-gateway/analyze.mjs /tmp/probe.json
|
||||
```
|
||||
|
||||
The analyzer prints four sections: EVENTS, TIMELINE, REGRESSIONS, SUMMARY. If
|
||||
REGRESSIONS is non-empty it means content/reasoning/childN dropped on the
|
||||
same topic — the symptom users describe.
|
||||
|
||||
## What the probe tracks (and why)
|
||||
|
||||
`chat.messagesMap` only stores the top-level `assistantGroup` shell. The
|
||||
actual streamed content, reasoning, and tool calls live in
|
||||
`assistantGroup.children: AssistantContentBlock[]`. Any probe that only
|
||||
reads `m.content` / `m.reasoning` will see zeros throughout streaming and
|
||||
miss everything that matters. probe.js walks both levels and sums:
|
||||
|
||||
- `cT` total content length
|
||||
- `rT` total reasoning length
|
||||
- `toolT` total tool-call count
|
||||
- `childN` number of content blocks
|
||||
|
||||
Plus DOM-side signals (`domLen`, search/crawl indicator counts) so you can
|
||||
tell store-side regressions apart from render-side regressions.
|
||||
|
||||
## Gotchas
|
||||
|
||||
- **Optimistic new-topic state.** Before the first chunk lands, messages
|
||||
live under the `<scope>_new` key with `tmp_*` ids and no `topicId` field.
|
||||
probe.js falls back to those when `activeTopicId` is null.
|
||||
- **Reasoning resets to 0 are not bugs.** When the assistant finishes
|
||||
thinking and starts tool-use or text, the streaming reasoning buffer
|
||||
empties and the finalised reasoning gets sealed into a completed block.
|
||||
Filter these out manually if needed.
|
||||
- **DOM length jitters by a handful of chars** because counters like "(10)"
|
||||
in tool-call labels change as results arrive. analyze.mjs only flags
|
||||
`domLen` drops greater than 100 chars to ignore that noise.
|
||||
- **Never identify tabs by innerText.** The active tab's text embeds a
|
||||
` · <agent name>` suffix, so a search like `'LobeHub Growth'` matches the
|
||||
active tab when the active agent happens to be LobeHub Growth — and you
|
||||
end up clicking the tab you're already on. probe.js uses the stable
|
||||
`data-contextmenu-trigger` attribute (a React `useId()` value that's set
|
||||
per-tab and survives focus changes) plus `data-active="true"` to mark
|
||||
the active one. Helpers exposed:
|
||||
`__listTabs()` / `__clickTabByKey(key)` / `__clickTabByIndex(i)` /
|
||||
`__activeTabKey()`.
|
||||
- **`tab-switch.js` fires-and-forgets.** The IIFE kicks off an async loop
|
||||
and returns immediately so the agent-browser CLI eval doesn't blow past
|
||||
its default 25 s timeout. Wait on the `SWITCH_LOOP_DONE` event marker
|
||||
before dumping. Re-running while a loop is in flight is refused — the
|
||||
chaotic data from overlapping runs is not worth debugging.
|
||||
@@ -0,0 +1,119 @@
|
||||
#!/usr/bin/env node
|
||||
// Analyze a probe dump captured by probe.js + probe-dump.js.
|
||||
//
|
||||
// node analyze.mjs /tmp/probe.json
|
||||
//
|
||||
// Prints:
|
||||
// 1. EVENTS — user-action markers with their relative timestamps
|
||||
// 2. TIMELINE — periodic samples (~1 per second + event-adjacent samples)
|
||||
// showing every interesting field; columns:
|
||||
// t(ms) | runOps | msgN | childN | content | reasoning | tools | domLen | search | crawl | topic | event
|
||||
// 3. REGRESSIONS — every place a tracked counter *dropped* on the same
|
||||
// topic between adjacent samples. A "true" UI rollback shows up as a
|
||||
// drop in content/reasoning/tools/childN/domLen without a topic change.
|
||||
//
|
||||
// Whitelisted transitions (not flagged):
|
||||
// - topic change → all drops expected (focus moved away)
|
||||
// - (expected, but NOT filtered — still listed under REGRESSIONS, ignore by
// hand) reasoning length 0 after content starts → reasoning gets sealed into
// a completed sub-block; the parent's running reasoning resets to ''.
|
||||
// - msgN drop when topic transitions from `_new` placeholder to a real id.
|
||||
|
||||
import fs from 'node:fs';
|
||||
|
||||
// Path of the probe dump to analyze (first CLI argument).
const file = process.argv[2];
if (!file) {
  console.error('usage: node analyze.mjs <probe.json>');
  process.exit(1);
}

const raw = JSON.parse(fs.readFileSync(file, 'utf8'));
// probe-dump.js wraps the payload in JSON.stringify so agent-browser returns
// it as a single quoted string. Unwrap.
const data = typeof raw === 'string' ? JSON.parse(raw) : raw;

// A truncated or hand-edited dump may lack one of the arrays; treat a missing
// array as empty instead of crashing with a TypeError further down.
const events = Array.isArray(data?.events) ? data.events : [];
const samples = Array.isArray(data?.samples) ? data.samples : [];

// probe.js records `{ t, err }` (no counters) when a snapshot throws. Keep
// those rows out of counter math so they neither print as "undefined" columns
// nor break the adjacent-sample comparison used for regression detection.
const okSamples = samples.filter((s) => s.err === undefined);
const errorCount = samples.length - okSamples.length;

// A sample counts as "event-adjacent" when it lies within this many ms of an
// event marker.
const NEAR_EVENT_MS = 110;
// Minimum spacing between printed timeline rows that are not event-adjacent.
const TIMELINE_CADENCE_MS = 1000;
// Events within this many ms of a regression are listed next to it.
const REGRESSION_EVENT_WINDOW_MS = 600;
// domLen jitters by a few chars from counter labels — only flag big drops.
const DOM_LEN_TOLERANCE = 100;

const fmt = {
  // Right-align `v` in a column of width `n`.
  pad(v, n) {
    return String(v).padStart(n);
  },
};

console.log('=== EVENTS ===');
for (const e of events) console.log(` t=${fmt.pad(e.t, 7)} ${e.name}`);

console.log(
  '\n=== TIMELINE (~1s cadence, plus event-adjacent samples) ===\n' +
    ' t(ms) runOps msgN childN content reasoning tools domLen search crawl topic event',
);

let lastSampledAt = -1e9;
for (const s of samples) {
  // First event marker close enough to this sample, if any.
  const ev = events.find((e) => Math.abs(e.t - s.t) < NEAR_EVENT_MS);
  if (!ev && s.t - lastSampledAt < TIMELINE_CADENCE_MS) continue;
  lastSampledAt = s.t;

  const evMarker = ev ? ` ◀ ${ev.name}` : '';

  if (s.err !== undefined) {
    // Failed snapshot: there are no counters to show, so print the error.
    console.log(` ${fmt.pad(s.t, 6)} !! snapshot failed: ${s.err}${evMarker}`);
    continue;
  }

  const topicSuffix = s.topicId ? s.topicId.slice(-6) : '(none)';
  const search = s.ind?.search ?? 0;
  const crawl = s.ind?.crawl ?? 0;
  console.log(
    ` ${fmt.pad(s.t, 6)} ` +
      `${fmt.pad(s.runOps, 6)} ` +
      `${fmt.pad(s.msgN, 4)} ` +
      `${fmt.pad(s.childN ?? 0, 5)} ` +
      `${fmt.pad(s.cT ?? 0, 8)} ` +
      `${fmt.pad(s.rT ?? 0, 9)} ` +
      `${fmt.pad(s.toolT ?? 0, 5)} ` +
      `${fmt.pad(s.domLen ?? 0, 7)} ` +
      `${fmt.pad(search, 6)} ` +
      `${fmt.pad(crawl, 5)} ` +
      `${topicSuffix.padEnd(8)}${evMarker}`,
  );
}

console.log('\n=== REGRESSIONS (same topic, value dropped) ===');

// Counters compared with a missing value treated as 0: [sample field, label].
const TRACKED_COUNTERS = [
  ['childN', 'childN'],
  ['cT', 'content'],
  ['rT', 'reasoning'],
  ['toolT', 'tools'],
];

const regressions = [];
for (let i = 1; i < okSamples.length; i++) {
  const prev = okSamples[i - 1];
  const cur = okSamples[i];
  // Only compare within one committed topic: a topic change (including the
  // `_new` placeholder → real id transition, where topicId was null) is an
  // expected reset.
  if (!cur.topicId || prev.topicId !== cur.topicId) continue;

  const drops = [];
  if (cur.msgN < prev.msgN) drops.push(`msgN: ${prev.msgN}→${cur.msgN}`);
  for (const [field, label] of TRACKED_COUNTERS) {
    if ((cur[field] ?? 0) < (prev[field] ?? 0)) {
      drops.push(`${label}: ${prev[field]}→${cur[field]}`);
    }
  }
  if ((cur.domLen ?? 0) < (prev.domLen ?? 0) - DOM_LEN_TOLERANCE) {
    drops.push(`domLen: ${prev.domLen}→${cur.domLen}`);
  }
  if (drops.length === 0) continue;

  const nearbyEv = events
    .filter((e) => Math.abs(e.t - cur.t) < REGRESSION_EVENT_WINDOW_MS)
    .map((e) => e.name);
  regressions.push({ t: cur.t, topic: cur.topicId.slice(-6), drops, nearbyEv });
}

if (regressions.length === 0) {
  console.log(' (none)');
} else {
  for (const r of regressions) {
    const evStr = r.nearbyEv.length ? ` near:[${r.nearbyEv.join(',')}]` : '';
    console.log(` t=${fmt.pad(r.t, 7)} topic=${r.topic} ${r.drops.join(' | ')}${evStr}`);
  }
}

console.log(`\n=== SUMMARY ===`);
console.log(` samples: ${samples.length}`);
console.log(` events: ${events.length}`);
console.log(` regressions: ${regressions.length}`);
// Only mention failed snapshots when there were any, so clean dumps print the
// same summary as before.
if (errorCount > 0) console.log(` snapshot errors: ${errorCount}`);
if (okSamples.length) {
  // Report the last sample that actually carries counters.
  const last = okSamples.at(-1);
  console.log(
    ` final: msgN=${last.msgN} childN=${last.childN ?? 0} content=${last.cT ?? 0} ` +
      `reasoning=${last.rT ?? 0} tools=${last.toolT ?? 0} runOps=${last.runOps}`,
  );
}
|
||||
@@ -0,0 +1,17 @@
|
||||
// Stop the probe and serialize collected data.
|
||||
//
|
||||
// agent-browser --cdp 9222 eval --stdin < probe-dump.js > /tmp/probe.json
|
||||
//
|
||||
// The whole thing is wrapped in a JSON.stringify so agent-browser returns it
|
||||
// as a single quoted string — the analyzer double-parses to handle that.
|
||||
|
||||
(function () {
  // Halt the sampler installed by probe.js, if one is still ticking.
  const timer = window.__PROBE_TIMER;
  if (timer) {
    clearInterval(timer);
    window.__PROBE_TIMER = null;
  }

  // Fall back to empty arrays when the probe was never installed.
  const payload = {
    events: window.__PROBE_EVENTS || [],
    samples: window.__PROBE_SAMPLES || [],
  };

  // The IIFE's value is what the eval returns: one JSON string.
  return JSON.stringify(payload);
})();
|
||||
@@ -0,0 +1,204 @@
|
||||
// LobeHub chat streaming time-series probe.
|
||||
//
|
||||
// Inject into the renderer (via agent-browser eval) to record store + DOM
|
||||
// snapshots every 200ms during a streaming session. Designed to surface
|
||||
// "UI rolled back to an earlier state" symptoms — especially around
|
||||
// gateway-mode tab switches that happen while the assistant is still writing.
|
||||
//
|
||||
// Usage:
|
||||
// agent-browser --cdp 9222 eval --stdin < probe.js
|
||||
// # ...do test interactions, call window.__PROBE_EVENT('LABEL') to mark moments...
|
||||
// agent-browser --cdp 9222 eval --stdin < probe-dump.js > /tmp/probe.json
|
||||
// node analyze.mjs /tmp/probe.json
|
||||
//
|
||||
// What it captures per sample:
|
||||
// - activeTopicId
|
||||
// - msgN: top-level messages in chat.messagesMap for this topic
|
||||
// - childN: total assistantGroup.children blocks across all msgs (THIS is
|
||||
// where streaming content actually lives — top-level assistantGroup stays empty)
|
||||
// - cT / rT / toolT: totals across messages AND their children
|
||||
// (content, reasoning, tool-call count)
|
||||
// - perMsg: per-message breakdown so regressions can be located precisely
|
||||
// - runOps: number of running operations (execServerAgentRuntime etc.)
|
||||
// - domLen: total innerText length of the rendered chat list area
|
||||
// - ind: visible UI indicators (Search pages, Crawled pages, Deeply Thought, Sending)
|
||||
//
|
||||
// Event markers: window.__PROBE_EVENT('NAME') records {t, name} into
|
||||
// __PROBE_EVENTS, used by the analyzer to align state changes with
|
||||
// user-driven actions (SENT, AWAY_1, BACK_1, ...).
|
||||
|
||||
(function () {
  // Installing again replaces any earlier sampler and starts with empty buffers.
  if (window.__PROBE_TIMER) clearInterval(window.__PROBE_TIMER);
  window.__PROBE_SAMPLES = [];
  window.__PROBE_EVENTS = [];
  // All sample / event timestamps are milliseconds relative to install time.
  const t0 = Date.now();

  // Length helpers shared by top-level messages and assistantGroup children.
  const contentLen = (node) => (node.content || '').length;
  const reasoningLen = (node) => ((node.reasoning && node.reasoning.content) || '').length;
  const toolCount = (node) => (node.tools || []).length;

  /**
   * Take one store + DOM sample and append it to window.__PROBE_SAMPLES.
   * Any exception is recorded as a `{ t, err }` row instead of propagating,
   * so a transient failure never stops the interval.
   */
  function snapshot() {
    try {
      const chat = window.__LOBE_STORES.chat();
      const topicId = chat.activeTopicId;
      const idTail = topicId ? topicId.replace('tpc_', '') : null;
      const messagesMap = chat.messagesMap || {};

      // Which messagesMap keys belong to the current view: keys containing the
      // active topic id, or — before a topic is committed — the optimistic
      // `<agentScope>_new` keys.
      const belongsToView = idTail ? (k) => k.includes(idTail) : (k) => k.endsWith('_new');

      let msgs = [];
      for (const k of Object.keys(messagesMap)) {
        if (belongsToView(k)) msgs = msgs.concat(messagesMap[k] || []);
      }

      // Totals span both levels: the top-level message and its
      // assistantGroup.children, which carry the actual streamed content /
      // reasoning / tool calls while the parent stays a placeholder.
      const totals = { content: 0, reasoning: 0, tools: 0, children: 0 };

      const perMsg = msgs.map((m) => {
        const cLen = contentLen(m);
        const rLen = reasoningLen(m);
        const tools = toolCount(m);

        const children = m.children || [];
        let chC = 0;
        let chR = 0;
        let chT = 0;
        children.forEach((c) => {
          chC += contentLen(c);
          chR += reasoningLen(c);
          chT += toolCount(c);
        });

        totals.content += cLen + chC;
        totals.reasoning += rLen + chR;
        totals.tools += tools + chT;
        totals.children += children.length;

        return {
          id: (m.id || '').slice(-8),
          role: m.role,
          cLen,
          rLen,
          tools,
          chCount: children.length,
          chC,
          chR,
          chT,
        };
      });

      const runningOps = Object.values(chat.operations || {}).filter(
        (o) => o.status === 'running',
      );

      // DOM probe: rendered text of the chat scroll area, as a proxy for how
      // much is actually visible. Selectors are tried from most to least specific.
      const convScroll =
        document.querySelector(
          '[data-chat-list], [class*="ChatList"], [class*="ConversationList"]',
        ) ||
        document.querySelector('main [class*="scroll"]') ||
        document.querySelector('main');
      const domTxt = convScroll ? convScroll.innerText || '' : '';

      // Indicator counts are taken over the whole page text.
      const bodyTxt = document.body.innerText || '';
      const countMatches = (re) => (bodyTxt.match(re) || []).length;
      const searchMatches = countMatches(/Search pages?:|Searched the web/g);
      const crawlMatches = countMatches(/Crawl(ed|ing) pages?/g);

      window.__PROBE_SAMPLES.push({
        t: Date.now() - t0,
        topicId,
        msgN: msgs.length,
        childN: totals.children,
        cT: totals.content,
        rT: totals.reasoning,
        toolT: totals.tools,
        perMsg,
        runOps: runningOps.length,
        runOpTypes: runningOps.map((o) => o.type),
        domLen: domTxt.length,
        ind: {
          search: searchMatches,
          crawl: crawlMatches,
          sending: bodyTxt.includes('Sending message'),
          deeplyThinking: bodyTxt.includes('Deeply Thinking'),
          deeplyThought: bodyTxt.includes('Deeply Thought'),
        },
      });
    } catch (e) {
      window.__PROBE_SAMPLES.push({ t: Date.now() - t0, err: e.message });
    }
  }

  // One sample right away, then one every 200ms.
  snapshot();
  window.__PROBE_TIMER = setInterval(snapshot, 200);

  // Record a named marker so the analyzer can align samples with user actions.
  window.__PROBE_EVENT = function (name) {
    window.__PROBE_EVENTS.push({ t: Date.now() - t0, name });
  };

  // ---- Tab-switch helpers installed alongside the probe ----
  //
  // Tabs are located by their data-insp-path (containing `TabItem.tsx`) plus a
  // data-contextmenu-trigger attribute; the active one carries
  // data-active="true". Do NOT match tabs by innerText — the active tab's text
  // includes a ` · <agent name>` suffix that yields false matches when the
  // search string overlaps with the agent name.

  const isActive = (el) => el.getAttribute('data-active') === 'true';

  // Key used to address a tab across focus changes.
  function tabKey(el) {
    return el.getAttribute('data-contextmenu-trigger');
  }

  // Tab elements in DOM order, keeping only those whose top edge is above
  // y=30 (presumably the tab strip at the top of the window — confirm).
  function listTabs() {
    const candidates = document.querySelectorAll(
      '[data-insp-path*="TabItem.tsx"][data-contextmenu-trigger]',
    );
    return Array.from(candidates).filter((t) => t.getBoundingClientRect().top < 30);
  }

  function findActiveTab() {
    return listTabs().find(isActive) || null;
  }

  // Click the tab with a previously captured key (preferred for round-trips).
  // Returns a short status string; never clicks the already-active tab.
  window.__clickTabByKey = function (key) {
    const target = listTabs().find((t) => tabKey(t) === key);
    if (!target) return 'not found: key=' + key;
    if (isActive(target)) return 'already active: ' + key;
    target.click();
    return 'clicked key=' + key;
  };

  // Click the tab at a 0-based position in the strip (left-to-right).
  window.__clickTabByIndex = function (i) {
    const tabs = listTabs();
    if (i < 0 || i >= tabs.length) return 'index out of range: ' + i + '/' + tabs.length;
    const target = tabs[i];
    if (isActive(target)) return 'already active: i=' + i;
    target.click();
    return 'clicked i=' + i + ' key=' + tabKey(target);
  };

  // Describe every tab in order: index, key, active flag, first 60 chars of text.
  window.__listTabs = function () {
    return listTabs().map((t, i) => ({
      i,
      key: tabKey(t),
      active: isActive(t),
      title: (t.innerText || '').slice(0, 60),
    }));
  };

  // Key of the active tab, or null when none is marked active.
  window.__activeTabKey = function () {
    const active = findActiveTab();
    return active ? tabKey(active) : null;
  };

  return 'probe installed';
})();
|
||||
@@ -0,0 +1,72 @@
|
||||
// Run N round-trip tab switches with event markers timed against the probe.
|
||||
//
|
||||
// agent-browser --cdp 9222 eval --stdin < tab-switch.js
|
||||
//
|
||||
// Captures the currently-active tab as the BACK target and the rightmost
|
||||
// inactive tab as the AWAY target. Both are addressed by their stable
|
||||
// data-contextmenu-trigger key (NOT by visible title — the active tab's
|
||||
// innerText embeds a ` · <agent name>` suffix that breaks text matching).
|
||||
//
|
||||
// Fires the loop in the background and returns immediately so the
|
||||
// agent-browser eval doesn't have to await the full ROUND_TRIPS × DWELL_MS
|
||||
// duration. Wait on the `SWITCH_LOOP_DONE` event before dumping.
|
||||
//
|
||||
// Refuses to launch if a previous loop is still in flight.
|
||||
//
|
||||
// Requires probe.js to have been installed first (provides
|
||||
// window.__PROBE_EVENT / __listTabs / __clickTabByKey / __activeTabKey).
|
||||
|
||||
(function () {
  // Number of AWAY→BACK round trips, and how long to stay on each tab.
  const ROUND_TRIPS = 4;
  const DWELL_MS = 10_000;

  if (!window.__PROBE_EVENT || !window.__listTabs || !window.__clickTabByKey) {
    return 'probe not installed — eval probe.js first';
  }
  // Overlapping loops would interleave their markers; refuse a second launch.
  if (window.__SWITCH_LOOP_RUNNING) {
    return 'switch loop already running — wait for SWITCH_LOOP_DONE first';
  }

  const tabs = window.__listTabs();
  const activeTab = tabs.find((t) => t.active);
  if (!activeTab) return 'no active tab — abort';

  // AWAY target: the rightmost inactive tab. With several inactive tabs you
  // want one that stays put for the whole test — swap to inactives[0] if the
  // leftmost inactive tab suits your layout better.
  const inactives = tabs.filter((t) => !t.active);
  if (inactives.length === 0) return 'no inactive tab to switch to — abort';
  const awayTab = inactives.at(-1);

  const BACK_KEY = activeTab.key;
  const AWAY_KEY = awayTab.key;

  window.__SWITCH_LOOP_RUNNING = true;
  window.__PROBE_EVENT('SWITCH_LOOP_CONFIG:back=' + BACK_KEY + ',away=' + AWAY_KEY);

  // Background loop: deliberately not awaited so the eval returns immediately.
  (async function () {
    function sleep(ms) {
      return new Promise((r) => setTimeout(r, ms));
    }

    try {
      window.__PROBE_EVENT('SWITCH_LOOP_START');
      for (let i = 1; i <= ROUND_TRIPS; i++) {
        // Marker first, then the click, then the click's status string, so
        // the analyzer can tell intent from outcome.
        window.__PROBE_EVENT('AWAY_' + i);
        const awayResult = window.__clickTabByKey(AWAY_KEY);
        window.__PROBE_EVENT('AWAY_' + i + '_RES:' + awayResult.slice(0, 50));
        await sleep(DWELL_MS);

        window.__PROBE_EVENT('BACK_' + i);
        const backResult = window.__clickTabByKey(BACK_KEY);
        window.__PROBE_EVENT('BACK_' + i + '_RES:' + backResult.slice(0, 50));
        await sleep(DWELL_MS);
      }
      window.__PROBE_EVENT('SWITCH_LOOP_DONE');
    } catch (err) {
      // Nothing awaits this promise, so without a catch a failure would only
      // be an unhandled rejection and anyone polling for SWITCH_LOOP_DONE
      // would get no signal. Leave a distinct marker instead (DONE is never
      // emitted on failure) and keep the error visible in the console.
      const detail = String(err && err.message ? err.message : err).slice(0, 80);
      console.error('[tab-switch] loop aborted:', err);
      window.__PROBE_EVENT('SWITCH_LOOP_FAILED:' + detail);
    } finally {
      // Always release the guard so a later run is not refused.
      window.__SWITCH_LOOP_RUNNING = false;
    }
  })();

  return 'switch loop kicked off (BACK=' + BACK_KEY + ', AWAY=' + AWAY_KEY + ')';
})();
|
||||
@@ -172,36 +172,25 @@ export default class GatewayConnectionCtr extends ControllerModule {
|
||||
request: AgentRunRequestMessage,
|
||||
): Promise<{ reason?: string; status: 'accepted' | 'rejected' }> {
|
||||
try {
|
||||
const ctr = this.heterogeneousAgentCtr;
|
||||
const serverUrl = await this.remoteServerConfigCtr.getRemoteServerUrl();
|
||||
if (!serverUrl) {
|
||||
return { reason: 'Remote server URL not configured', status: 'rejected' };
|
||||
}
|
||||
|
||||
// Map agentType to binary name.
|
||||
// claude-code → `claude` CLI; all other platforms use their type name as the binary.
|
||||
const command = request.agentType === 'claude-code' ? 'claude' : request.agentType;
|
||||
|
||||
// Create a session for the hetero agent.
|
||||
const { sessionId } = await ctr.startSession({
|
||||
// Fire-and-forget: lh hetero exec handles spawn -> adapt ->
|
||||
// BatchIngester -> heteroIngest/heteroFinish -> server -> Gateway -> clients.
|
||||
// Same command as spawnHeteroSandbox() on the server side.
|
||||
this.heterogeneousAgentCtr.spawnLhHeteroExec({
|
||||
agentType: request.agentType,
|
||||
args: [],
|
||||
command,
|
||||
cwd: request.cwd,
|
||||
// Inject LOBEHUB_JWT so the CLI authenticates against heteroIngest.
|
||||
env: { LOBEHUB_JWT: request.jwt },
|
||||
jwt: request.jwt,
|
||||
operationId: request.operationId,
|
||||
prompt: request.prompt,
|
||||
resumeSessionId: request.resumeSessionId,
|
||||
serverUrl,
|
||||
topicId: request.topicId,
|
||||
});
|
||||
|
||||
// Fire-and-forget: sendPrompt runs the CLI until completion.
|
||||
ctr
|
||||
.sendPrompt({
|
||||
operationId: request.operationId,
|
||||
prompt: request.prompt,
|
||||
sessionId,
|
||||
})
|
||||
.catch((err: Error) => {
|
||||
// Errors are surfaced via heteroFinish on the server side.
|
||||
// Log locally for desktop debugging only.
|
||||
console.error('[GatewayConnectionCtr] agent run failed:', err.message);
|
||||
});
|
||||
|
||||
return { status: 'accepted' };
|
||||
} catch (err) {
|
||||
const reason = err instanceof Error ? err.message : String(err);
|
||||
|
||||
@@ -1251,4 +1251,69 @@ export default class HeterogeneousAgentCtr extends ControllerModule {
|
||||
process.on('SIGTERM', onSignal);
|
||||
process.on('SIGINT', onSignal);
|
||||
}
|
||||
|
||||
/**
 * Spawn `lh hetero exec` for gateway-driven agent runs.
 * The `lh` CLI handles everything downstream — no local
 * AgentStreamPipeline or IPC broadcast needed. Mirrors
 * `spawnHeteroSandbox()` on the server side.
 *
 * Fire-and-forget: the method returns as soon as the child has been started
 * and the prompt handed to its stdin. Spawn failures, stdin failures and the
 * exit status are only logged here.
 *
 * @param params.agentType - Passed to the CLI as `--type`.
 * @param params.cwd - Working directory for the child and `--cwd`; defaults to
 *   `process.cwd()`.
 * @param params.jwt - Exposed to the child as `LOBEHUB_JWT`.
 * @param params.operationId - Passed as `--operation-id`.
 * @param params.prompt - Sent JSON-encoded on the child's stdin
 *   (`--input-json -`).
 * @param params.resumeSessionId - When set, adds `--resume <id>`.
 * @param params.serverUrl - Exposed to the child as `LOBEHUB_SERVER`.
 * @param params.topicId - Passed as `--topic`.
 */
spawnLhHeteroExec(params: {
  agentType: string;
  cwd?: string;
  jwt: string;
  operationId: string;
  prompt: string;
  resumeSessionId?: string;
  serverUrl: string;
  topicId: string;
}): void {
  const { agentType, cwd, jwt, operationId, prompt, resumeSessionId, serverUrl, topicId } =
    params;
  const workDir = cwd ?? process.cwd();

  const args = [
    'hetero',
    'exec',
    '--type',
    agentType,
    '--operation-id',
    operationId,
    '--topic',
    topicId,
    '--render',
    'none',
    '--input-json',
    '-',
    '--cwd',
    workDir,
    ...(resumeSessionId ? ['--resume', resumeSessionId] : []),
  ];

  // Later spreads win: proxy settings override inherited variables, and the
  // auth / server values override both.
  const env = {
    ...process.env,
    ...buildProxyEnv(this.app.storeManager.get('networkProxy')),
    LOBEHUB_JWT: jwt,
    LOBEHUB_SERVER: serverUrl,
  };

  logger.info('spawnLhHeteroExec: type=%s op=%s topic=%s', agentType, operationId, topicId);

  // stdin is a pipe (carries the prompt); stdout / stderr are inherited.
  const child = spawn('lh', args, {
    cwd: workDir,
    env,
    stdio: ['pipe', 'inherit', 'inherit'],
  });

  // Attach every listener before touching stdin so no event can fire unobserved.
  child.on('error', (err) => {
    logger.error('spawnLhHeteroExec: spawn failed — %s', err.message);
  });

  child.on('exit', (code, signal) => {
    logger.info('spawnLhHeteroExec: exited — op=%s code=%s signal=%s', operationId, code, signal);
  });

  // The stdin pipe is its own stream: if the child is gone before the prompt
  // is consumed (failed spawn, early exit) the write fails with e.g. EPIPE,
  // and a stream 'error' without a listener is thrown as an uncaught
  // exception in this process.
  child.stdin.on('error', (err) => {
    logger.error('spawnLhHeteroExec: stdin write failed — op=%s %s', operationId, err.message);
  });

  // Write the JSON-encoded prompt and close stdin in one step.
  child.stdin.end(JSON.stringify(prompt));
}
|
||||
}
|
||||
|
||||
@@ -411,12 +411,16 @@
|
||||
"rag.userQuery.actions.regenerate": "Regenerate Query",
|
||||
"regenerate": "Regenerate",
|
||||
"roleAndArchive": "Agent Profile & History",
|
||||
"runtimeEnv.device.empty": "No devices available. Connect to gateway from the desktop app first.",
|
||||
"runtimeEnv.mode.cloud": "Cloud Sandbox",
|
||||
"runtimeEnv.mode.cloudDesc": "Run in a secure cloud sandbox",
|
||||
"runtimeEnv.mode.local": "Local",
|
||||
"runtimeEnv.mode.localDesc": "Access local files and commands",
|
||||
"runtimeEnv.mode.none": "Off",
|
||||
"runtimeEnv.mode.noneDesc": "Disable runtime environment",
|
||||
"runtimeEnv.mode.sandbox": "Sandbox",
|
||||
"runtimeEnv.mode.sandboxDesc": "Run in an isolated cloud sandbox",
|
||||
"runtimeEnv.section.device": "Device",
|
||||
"runtimeEnv.selectMode": "Select Runtime Environment",
|
||||
"runtimeEnv.title": "Runtime Environment",
|
||||
"search.grounding.imageSearchQueries": "Image Search Keywords",
|
||||
|
||||
@@ -411,12 +411,16 @@
|
||||
"rag.userQuery.actions.regenerate": "重新生成 Query",
|
||||
"regenerate": "重新生成",
|
||||
"roleAndArchive": "助理档案与记录",
|
||||
"runtimeEnv.device.empty": "暂无可用设备,请先在桌面端连接到网关",
|
||||
"runtimeEnv.mode.cloud": "云端沙箱",
|
||||
"runtimeEnv.mode.cloudDesc": "在安全的云端沙箱中运行",
|
||||
"runtimeEnv.mode.local": "本地",
|
||||
"runtimeEnv.mode.localDesc": "访问本地文件和命令",
|
||||
"runtimeEnv.mode.none": "关闭",
|
||||
"runtimeEnv.mode.noneDesc": "禁用运行时环境",
|
||||
"runtimeEnv.mode.sandbox": "沙箱",
|
||||
"runtimeEnv.mode.sandboxDesc": "在隔离的云端沙箱中运行",
|
||||
"runtimeEnv.section.device": "设备",
|
||||
"runtimeEnv.selectMode": "选择运行环境",
|
||||
"runtimeEnv.title": "运行环境",
|
||||
"search.grounding.imageSearchQueries": "图片搜索关键词",
|
||||
|
||||
@@ -22,7 +22,6 @@ import { desensitizeUrl } from '../../utils/desensitizeUrl';
|
||||
import { getModelPricing } from '../../utils/getModelPricing';
|
||||
import { isAccountDeactivatedError } from '../../utils/isAccountDeactivatedError';
|
||||
import { isExceededContextWindowError } from '../../utils/isExceededContextWindowError';
|
||||
import { isModelNotFoundError } from '../../utils/isModelNotFoundError';
|
||||
import { isQuotaLimitError } from '../../utils/isQuotaLimitError';
|
||||
import { MODEL_LIST_CONFIGS, processModelList } from '../../utils/modelParse';
|
||||
import { StreamingResponse } from '../../utils/response';
|
||||
@@ -333,15 +332,6 @@ export const handleDefaultAnthropicError = <T extends Record<string, any> = any>
|
||||
};
|
||||
}
|
||||
|
||||
if (isModelNotFoundError(errorMsg)) {
|
||||
return {
|
||||
endpoint: desensitizedEndpoint,
|
||||
error: errorResult,
|
||||
errorType: AgentRuntimeErrorType.ModelNotFound,
|
||||
message,
|
||||
};
|
||||
}
|
||||
|
||||
if (isExceededContextWindowError(errorMsg)) {
|
||||
return {
|
||||
endpoint: desensitizedEndpoint,
|
||||
@@ -746,16 +736,6 @@ export const createAnthropicCompatibleRuntime = <T extends Record<string, any> =
|
||||
});
|
||||
}
|
||||
|
||||
if (isModelNotFoundError(errorMsg)) {
|
||||
return AgentRuntimeError.chat({
|
||||
endpoint: desensitizedEndpoint,
|
||||
error: errorResult,
|
||||
errorType: AgentRuntimeErrorType.ModelNotFound,
|
||||
message,
|
||||
provider: this.id,
|
||||
});
|
||||
}
|
||||
|
||||
if (isExceededContextWindowError(errorMsg)) {
|
||||
return AgentRuntimeError.chat({
|
||||
endpoint: desensitizedEndpoint,
|
||||
|
||||
@@ -42,7 +42,6 @@ import { handleOpenAIError } from '../../utils/handleOpenAIError';
|
||||
import { isAccountDeactivatedError } from '../../utils/isAccountDeactivatedError';
|
||||
import { isExceededContextWindowError } from '../../utils/isExceededContextWindowError';
|
||||
import { isInsufficientQuotaError } from '../../utils/isInsufficientQuotaError';
|
||||
import { isModelNotFoundError } from '../../utils/isModelNotFoundError';
|
||||
import { isQuotaLimitError } from '../../utils/isQuotaLimitError';
|
||||
import { postProcessModelList } from '../../utils/postProcessModelList';
|
||||
import {
|
||||
@@ -1157,17 +1156,6 @@ export const createOpenAICompatibleRuntime = <T extends Record<string, any> = an
|
||||
});
|
||||
}
|
||||
|
||||
if (isModelNotFoundError(errorMsg)) {
|
||||
log('model not found error detected from message');
|
||||
return AgentRuntimeError.chat({
|
||||
endpoint: desensitizedEndpoint,
|
||||
error: errorResult,
|
||||
errorType: AgentRuntimeErrorType.ModelNotFound,
|
||||
message,
|
||||
provider: this.id,
|
||||
});
|
||||
}
|
||||
|
||||
if (isInsufficientQuotaError(errorMsg)) {
|
||||
log('insufficient quota error detected from message');
|
||||
return AgentRuntimeError.chat({
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import type { ILobeAgentRuntimeErrorType } from '../types/error';
|
||||
import { AgentRuntimeErrorType } from '../types/error';
|
||||
import { isExceededContextWindowError } from './isExceededContextWindowError';
|
||||
import { isModelNotFoundError } from './isModelNotFoundError';
|
||||
import { isQuotaLimitError } from './isQuotaLimitError';
|
||||
|
||||
export interface ParsedError {
|
||||
@@ -113,10 +112,6 @@ export function parseGoogleErrorMessage(message: string): ParsedError {
|
||||
return { error: { message }, errorType: AgentRuntimeErrorType.ProviderNoImageGenerated };
|
||||
}
|
||||
|
||||
if (isModelNotFoundError(message)) {
|
||||
return { error: { message }, errorType: AgentRuntimeErrorType.ModelNotFound };
|
||||
}
|
||||
|
||||
if (isExceededContextWindowError(message)) {
|
||||
return { error: { message }, errorType: AgentRuntimeErrorType.ExceededContextWindow };
|
||||
}
|
||||
|
||||
@@ -1,84 +0,0 @@
|
||||
import { describe, expect, it } from 'vitest';
|
||||
|
||||
import { isModelNotFoundError } from './isModelNotFoundError';
|
||||
|
||||
describe('isModelNotFoundError', () => {
  it('should return false for undefined/empty input', () => {
    for (const input of [undefined, '']) {
      expect(isModelNotFoundError(input)).toBe(false);
    }
  });

  it('should detect "model not found" errors', () => {
    // The literal phrase must be contiguous: a model name wedged between
    // "model" and "not found" does not match the substring pattern.
    const interrupted = 'The model gpt-5 was not found';
    const contiguous = 'model not found: gpt-5';

    expect(isModelNotFoundError(interrupted)).toBe(false);
    expect(isModelNotFoundError(contiguous)).toBe(true);
  });

  it('should detect "model_not_found" code in message', () => {
    const message = 'Error: model_not_found';

    expect(isModelNotFoundError(message)).toBe(true);
  });

  it('should detect "model ... does not exist" (OpenAI)', () => {
    const message = 'The model `gpt-5` does not exist or you do not have access to it.';

    expect(isModelNotFoundError(message)).toBe(true);
  });

  it('should detect "model or endpoint ... does not exist" (Volcengine/doubao)', () => {
    // The dotted version number inside the model id must not end the match.
    const message =
      'The model or endpoint doubao-seed-2.0-pro does not exist or you do not have access to it.';

    expect(isModelNotFoundError(message)).toBe(true);
  });

  it('should NOT match "does not exist" without model context', () => {
    const unrelatedMessages = [
      // API key errors that incidentally say "does not exist"
      'Your API key does not exist',
      // Deployment/endpoint errors
      'The deployment for this resource does not exist',
      // Generic resource errors
      'This user does not exist',
      'The organization does not exist',
    ];

    for (const message of unrelatedMessages) {
      expect(isModelNotFoundError(message)).toBe(false);
    }
  });

  it('should NOT match when "model" and "does not exist" are in different sentences', () => {
    const crossSentenceMessages = [
      'This feature does not exist in your plan. Contact support to enable the model.',
      'The model is fine. Your account does not exist.',
    ];

    for (const message of crossSentenceMessages) {
      expect(isModelNotFoundError(message)).toBe(false);
    }
  });

  it('should detect "no such model" errors', () => {
    const message = 'no such model: custom-model-v1';

    expect(isModelNotFoundError(message)).toBe(true);
  });

  it('should detect "not found model" errors', () => {
    const message = 'not found model abc-123';

    expect(isModelNotFoundError(message)).toBe(true);
  });

  it('should detect "model is not accessible" errors', () => {
    const message = 'The model is not accessible with your current plan';

    expect(isModelNotFoundError(message)).toBe(true);
  });

  it('should detect "model is not available" errors', () => {
    const message = 'The requested model is not available in this region';

    expect(isModelNotFoundError(message)).toBe(true);
  });

  it('should detect "invalid model" errors', () => {
    const message = 'invalid model: test-model';

    expect(isModelNotFoundError(message)).toBe(true);
  });

  it('should be case-insensitive', () => {
    for (const message of ['MODEL NOT FOUND', 'The Model Does Not Exist']) {
      expect(isModelNotFoundError(message)).toBe(true);
    }
  });

  it('should return false for unrelated error messages', () => {
    const unrelatedMessages = [
      'Insufficient Balance',
      'Invalid API key',
      'Rate limit reached',
      'context length exceeded',
    ];

    for (const message of unrelatedMessages) {
      expect(isModelNotFoundError(message)).toBe(false);
    }
  });
});
|
||||
@@ -1,32 +0,0 @@
|
||||
/**
 * Lower-case phrases whose presence anywhere in an error message marks it as
 * a "model not found" failure. They are compared against the lower-cased
 * message, so every entry must itself be lower-case.
 */
const MODEL_NOT_FOUND_PATTERNS = [
  'model not found', // OpenAI / generic
  'model_not_found', // OpenAI (code in message)
  'no such model', // generic
  'not found model', // some providers
  'model is not accessible', // access-related model errors
  'model is not available', // generic
  'invalid model', // generic
];

// A bare "does not exist" is too broad: API key, deployment, and other
// unrelated errors use the same wording. This regex therefore demands the
// word "model" earlier in the SAME sentence. The character class rejects
// sentence terminators (. ! ?) and newlines, with one exception: a period
// directly followed by a digit is allowed, so dotted version numbers such as
// "doubao-seed-2.0-pro" do not cut the match short.
//
// Matches:
//   - OpenAI:     "The model `gpt-5` does not exist or you do not have access to it."
//   - Volcengine: "The model or endpoint doubao-seed-2.0-pro does not exist..."
// Deliberately does not match:
//   - "Your API key does not exist"
//   - "The deployment for this resource does not exist"
//   - "The model is fine. Your account does not exist."  (different sentences)
const MODEL_DOES_NOT_EXIST_REGEX = /\bmodel\b(?:[^!.?\n]|\.(?=\d))+?\bdoes not exist\b/i;

/**
 * Decide whether a provider error message reports a missing or unusable model.
 *
 * @param message - Raw error text from the provider; may be absent.
 * @returns `true` when the message contains one of the known phrases
 *   (case-insensitively) or matches the same-sentence "model ... does not
 *   exist" regex; `false` otherwise, including for undefined or empty input.
 */
export const isModelNotFoundError = (message?: string): boolean => {
  if (!message) return false;

  const normalized = message.toLowerCase();
  for (const phrase of MODEL_NOT_FOUND_PATTERNS) {
    if (normalized.includes(phrase)) return true;
  }

  // No fixed phrase matched; fall back to the sentence-scoped regex.
  return MODEL_DOES_NOT_EXIST_REGEX.test(message);
};
|
||||
@@ -9,11 +9,12 @@ export type AgentMode = 'auto' | 'plan' | 'ask' | 'implement';
|
||||
|
||||
/**
|
||||
* Runtime environment mode
|
||||
* - local: Access local files and commands (desktop only)
|
||||
* - cloud: Run in cloud sandbox
|
||||
* - local: Run on a specific device (desktop only, requires deviceId)
|
||||
* - sandbox: Run in isolated cloud sandbox
|
||||
* - cloud: @deprecated Use 'sandbox' instead, kept for backward compatibility
|
||||
* - none: No runtime environment
|
||||
*/
|
||||
export type RuntimeEnvMode = 'cloud' | 'local' | 'none';
|
||||
export type RuntimeEnvMode = 'cloud' | 'local' | 'none' | 'sandbox';
|
||||
|
||||
export type RuntimePlatform = 'desktop' | 'web';
|
||||
|
||||
@@ -21,6 +22,11 @@ export type RuntimePlatform = 'desktop' | 'web';
|
||||
* Runtime environment configuration
|
||||
*/
|
||||
export interface RuntimeEnvConfig {
|
||||
/**
|
||||
* Device ID when runtimeMode is 'local' (desktop only).
|
||||
* Identifies which bound device to run on.
|
||||
*/
|
||||
deviceId?: string;
|
||||
/**
|
||||
* Runtime environment mode per platform
|
||||
*/
|
||||
|
||||
@@ -170,9 +170,10 @@ export interface LobeAgentChatConfig extends AgentMemoryChatConfig, AgentSelfIte
|
||||
/**
|
||||
* Zod schema for RuntimeEnvConfig
|
||||
*/
|
||||
const runtimeEnvModeEnum = z.enum(['local', 'cloud', 'none']);
|
||||
const runtimeEnvModeEnum = z.enum(['local', 'cloud', 'none', 'sandbox']);
|
||||
|
||||
export const RuntimeEnvConfigSchema = z.object({
|
||||
deviceId: z.string().optional(),
|
||||
runtimeMode: z.record(z.string(), runtimeEnvModeEnum).optional(),
|
||||
workingDirectory: z.string().optional(),
|
||||
});
|
||||
|
||||
@@ -0,0 +1,109 @@
|
||||
import { type DeviceAttachment } from '@lobechat/builtin-tool-remote-device';
|
||||
import { Flexbox, Icon } from '@lobehub/ui';
|
||||
import { createStaticStyles, cssVar, cx } from 'antd-style';
|
||||
import { LaptopIcon, MonitorIcon, ServerIcon } from 'lucide-react';
|
||||
import { memo } from 'react';
|
||||
|
||||
const styles = createStaticStyles(({ css }) => ({
|
||||
deviceName: css`
|
||||
font-size: 13px;
|
||||
font-weight: 500;
|
||||
color: ${cssVar.colorText};
|
||||
`,
|
||||
deviceOption: css`
|
||||
cursor: pointer;
|
||||
|
||||
width: 100%;
|
||||
padding-block: 8px;
|
||||
padding-inline: 8px;
|
||||
border-radius: ${cssVar.borderRadius};
|
||||
|
||||
transition: background-color 0.2s;
|
||||
|
||||
&:hover {
|
||||
background: ${cssVar.colorFillTertiary};
|
||||
}
|
||||
`,
|
||||
deviceOptionActive: css`
|
||||
background: ${cssVar.colorFillTertiary};
|
||||
`,
|
||||
deviceOptionDesc: css`
|
||||
font-size: 12px;
|
||||
color: ${cssVar.colorTextDescription};
|
||||
`,
|
||||
deviceOptionIcon: css`
|
||||
flex-shrink: 0;
|
||||
border: 1px solid ${cssVar.colorFillTertiary};
|
||||
border-radius: ${cssVar.borderRadius};
|
||||
background: ${cssVar.colorBgElevated};
|
||||
`,
|
||||
sectionTitle: css`
|
||||
padding-block: 6px 2px;
|
||||
padding-inline: 8px;
|
||||
|
||||
font-size: 11px;
|
||||
font-weight: 500;
|
||||
color: ${cssVar.colorTextQuaternary};
|
||||
text-transform: uppercase;
|
||||
letter-spacing: 0.5px;
|
||||
`,
|
||||
}));
|
||||
|
||||
const PLATFORM_ICONS: Record<string, typeof LaptopIcon> = {
|
||||
darwin: LaptopIcon,
|
||||
linux: MonitorIcon,
|
||||
win32: MonitorIcon,
|
||||
};
|
||||
|
||||
interface DeviceSelectorProps {
|
||||
activeDeviceId?: string;
|
||||
devices: DeviceAttachment[];
|
||||
onSelect: (deviceId: string) => void;
|
||||
}
|
||||
|
||||
/**
 * Renders one clickable row per device: a platform icon followed by the
 * device hostname and platform string. The row whose `deviceId` equals
 * `activeDeviceId` gets the active style; clicking a row calls `onSelect`
 * with that row's `deviceId`.
 */
export const DeviceSelector = memo<DeviceSelectorProps>(
  ({ activeDeviceId, devices, onSelect }) => {
    const rows = devices.map((device) => {
      // Platforms without a dedicated icon fall back to the generic server icon.
      const PlatformIcon = PLATFORM_ICONS[device.platform] || ServerIcon;
      const rowClassName = cx(
        styles.deviceOption,
        activeDeviceId === device.deviceId && styles.deviceOptionActive,
      );

      return (
        <Flexbox
          horizontal
          align={'flex-start'}
          className={rowClassName}
          gap={12}
          key={device.deviceId}
          onClick={() => onSelect(device.deviceId)}
        >
          <Flexbox
            align={'center'}
            className={styles.deviceOptionIcon}
            height={32}
            justify={'center'}
            width={32}
          >
            <Icon icon={PlatformIcon} size={16} />
          </Flexbox>
          <Flexbox flex={1}>
            <div className={styles.deviceName}>{device.hostname}</div>
            <div className={styles.deviceOptionDesc}>{device.platform}</div>
          </Flexbox>
        </Flexbox>
      );
    });

    return <>{rows}</>;
  },
);

DeviceSelector.displayName = 'DeviceSelector';
|
||||
|
||||
interface SectionHeaderProps {
  label: string;
}

/**
 * Small title shown above a group of options in the runtime mode popover
 * (device / sandbox / none). Renders `label` with the `sectionTitle` style.
 */
export const SectionHeader = memo<SectionHeaderProps>((props) => {
  return <div className={styles.sectionTitle}>{props.label}</div>;
});

SectionHeader.displayName = 'SectionHeader';
|
||||
@@ -4,6 +4,7 @@ import { Github } from '@lobehub/icons';
|
||||
import { Flexbox, Icon, Popover, Skeleton, Tooltip } from '@lobehub/ui';
|
||||
import { createStaticStyles, cssVar, cx } from 'antd-style';
|
||||
import {
|
||||
BoxIcon,
|
||||
ChevronDownIcon,
|
||||
CloudIcon,
|
||||
FolderIcon,
|
||||
@@ -12,9 +13,10 @@ import {
|
||||
MonitorOffIcon,
|
||||
SquircleDashed,
|
||||
} from 'lucide-react';
|
||||
import { memo, type ReactNode, useCallback, useMemo, useState } from 'react';
|
||||
import { memo, type ReactNode, useCallback, useEffect, useMemo, useState } from 'react';
|
||||
import { useTranslation } from 'react-i18next';
|
||||
|
||||
import { deviceService } from '@/services/device';
|
||||
import { useAgentStore } from '@/store/agent';
|
||||
import { agentByIdSelectors, chatConfigByIdSelectors } from '@/store/agent/selectors';
|
||||
import { useChatStore } from '@/store/chat';
|
||||
@@ -26,6 +28,7 @@ import { useUpdateAgentConfig } from '../hooks/useUpdateAgentConfig';
|
||||
import { useChatInputStore } from '../store';
|
||||
import ApprovalMode from './ApprovalMode';
|
||||
import CloudRepoSwitcher from './CloudRepoSwitcher';
|
||||
import { DeviceSelector, SectionHeader } from './DeviceSelector';
|
||||
import GitStatus from './GitStatus';
|
||||
import ModeSelector from './ModeSelector';
|
||||
import { useRepoType } from './useRepoType';
|
||||
@@ -35,6 +38,7 @@ const MODE_ICONS: Record<RuntimeEnvMode, typeof LaptopIcon> = {
|
||||
cloud: CloudIcon,
|
||||
local: LaptopIcon,
|
||||
none: MonitorOffIcon,
|
||||
sandbox: BoxIcon,
|
||||
};
|
||||
|
||||
const styles = createStaticStyles(({ css }) => ({
|
||||
@@ -63,6 +67,11 @@ const styles = createStaticStyles(({ css }) => ({
|
||||
background: ${cssVar.colorFillSecondary};
|
||||
}
|
||||
`,
|
||||
divider: css`
|
||||
height: 1px;
|
||||
margin-block: 4px;
|
||||
background: ${cssVar.colorBorderSecondary};
|
||||
`,
|
||||
modeDesc: css`
|
||||
font-size: 12px;
|
||||
color: ${cssVar.colorTextTertiary};
|
||||
@@ -107,16 +116,21 @@ const RuntimeConfig = memo(() => {
|
||||
const { updateAgentChatConfig } = useUpdateAgentConfig();
|
||||
const [dirPopoverOpen, setDirPopoverOpen] = useState(false);
|
||||
const [modePopoverOpen, setModePopoverOpen] = useState(false);
|
||||
const [devices, setDevices] = useState<Awaited<ReturnType<typeof deviceService.listDevices>>>([]);
|
||||
const [devicesLoading, setDevicesLoading] = useState(false);
|
||||
const showContextWindow = useChatInputStore((s) =>
|
||||
s.rightActions.flat().includes('contextWindow'),
|
||||
);
|
||||
|
||||
const [isLoading, runtimeMode, isHeterogeneous, enableAgentMode] = useAgentStore((s) => [
|
||||
agentByIdSelectors.isAgentConfigLoadingById(agentId)(s),
|
||||
chatConfigByIdSelectors.getRuntimeModeById(agentId)(s),
|
||||
agentId ? agentByIdSelectors.isAgentHeterogeneousById(agentId)(s) : false,
|
||||
agentByIdSelectors.getAgentEnableModeById(agentId)(s),
|
||||
]);
|
||||
const [isLoading, runtimeMode, isHeterogeneous, enableAgentMode, deviceId] = useAgentStore(
|
||||
(s) => [
|
||||
agentByIdSelectors.isAgentConfigLoadingById(agentId)(s),
|
||||
chatConfigByIdSelectors.getRuntimeModeById(agentId)(s),
|
||||
agentId ? agentByIdSelectors.isAgentHeterogeneousById(agentId)(s) : false,
|
||||
agentByIdSelectors.getAgentEnableModeById(agentId)(s),
|
||||
chatConfigByIdSelectors.getDeviceIdById(agentId)(s),
|
||||
],
|
||||
);
|
||||
|
||||
const topicWorkingDirectory = useChatStore(topicSelectors.currentTopicWorkingDirectory);
|
||||
const agentWorkingDirectory = useAgentStore((s) =>
|
||||
@@ -126,6 +140,17 @@ const RuntimeConfig = memo(() => {
|
||||
|
||||
const repoType = useRepoType(effectiveWorkingDirectory);
|
||||
|
||||
// Fetch the device list each time the mode popover opens (desktop only).
//
// The loading flag is cleared in `finally`, so a rejected `listDevices()` call
// cannot leave the popover stuck on the skeleton. The `cancelled` guard drops
// the result of a request whose popover has since closed (or whose component
// unmounted), so a stale response never overwrites newer state.
useEffect(() => {
  if (!modePopoverOpen || !isDesktop) return;

  let cancelled = false;
  setDevicesLoading(true);

  deviceService
    .listDevices()
    .then((list) => {
      if (!cancelled) setDevices(list);
    })
    .catch((error: unknown) => {
      // Keep whatever list was shown before; the popover stays usable for
      // the sandbox / none options even when the device lookup fails.
      console.error('Failed to fetch device list:', error);
    })
    .finally(() => {
      if (!cancelled) setDevicesLoading(false);
    });

  return () => {
    cancelled = true;
  };
}, [modePopoverOpen]);
|
||||
|
||||
const dirIconNode = useMemo((): ReactNode => {
|
||||
if (!effectiveWorkingDirectory) return <Icon icon={SquircleDashed} size={14} />;
|
||||
if (repoType === 'github') return <Github size={14} />;
|
||||
@@ -134,18 +159,43 @@ const RuntimeConfig = memo(() => {
|
||||
}, [effectiveWorkingDirectory, repoType]);
|
||||
|
||||
const switchMode = useCallback(
|
||||
async (mode: RuntimeEnvMode) => {
|
||||
if (mode === runtimeMode) return;
|
||||
async (mode: RuntimeEnvMode, opts?: { deviceId?: string }) => {
|
||||
if (mode === runtimeMode && opts?.deviceId === deviceId) return;
|
||||
|
||||
const platform = isDesktop ? 'desktop' : 'web';
|
||||
|
||||
await updateAgentChatConfig({
|
||||
runtimeEnv: { runtimeMode: { [platform]: mode } },
|
||||
runtimeEnv: {
|
||||
deviceId: opts?.deviceId,
|
||||
runtimeMode: { [platform]: mode },
|
||||
},
|
||||
});
|
||||
},
|
||||
[runtimeMode, updateAgentChatConfig],
|
||||
[runtimeMode, deviceId, updateAgentChatConfig],
|
||||
);
|
||||
|
||||
// Compute the display label for the mode button
|
||||
const activeDevice = useMemo(
|
||||
() => (deviceId ? devices.find((d) => d.deviceId === deviceId) : undefined),
|
||||
[deviceId, devices],
|
||||
);
|
||||
|
||||
const ModeIcon = MODE_ICONS[runtimeMode] || LaptopIcon;
|
||||
|
||||
const modeLabel = useMemo(() => {
|
||||
// When running on a specific device, show device hostname
|
||||
if (runtimeMode === 'local' && activeDevice) {
|
||||
return activeDevice.hostname;
|
||||
}
|
||||
return t(`runtimeEnv.mode.${runtimeMode}`);
|
||||
}, [runtimeMode, activeDevice, t]);
|
||||
|
||||
const displayName = effectiveWorkingDirectory
|
||||
? effectiveWorkingDirectory.split('/').findLast(Boolean) || effectiveWorkingDirectory
|
||||
: tPlugin('localSystem.workingDirectory.notSet');
|
||||
|
||||
const hasDevices = devices.length > 0;
|
||||
|
||||
// Skeleton placeholder to prevent layout jump during loading
|
||||
if (!agentId || isLoading) {
|
||||
return (
|
||||
@@ -156,66 +206,93 @@ const RuntimeConfig = memo(() => {
|
||||
);
|
||||
}
|
||||
|
||||
const ModeIcon = MODE_ICONS[runtimeMode];
|
||||
const modeLabel = t(`runtimeEnv.mode.${runtimeMode}`);
|
||||
|
||||
const displayName = effectiveWorkingDirectory
|
||||
? effectiveWorkingDirectory.split('/').findLast(Boolean) || effectiveWorkingDirectory
|
||||
: tPlugin('localSystem.workingDirectory.notSet');
|
||||
|
||||
const modes: { desc: string; icon: typeof LaptopIcon; label: string; mode: RuntimeEnvMode }[] = [
|
||||
// Local mode is desktop-only
|
||||
...(isDesktop
|
||||
? [
|
||||
{
|
||||
desc: t('runtimeEnv.mode.localDesc'),
|
||||
icon: LaptopIcon,
|
||||
label: t('runtimeEnv.mode.local'),
|
||||
mode: 'local' as RuntimeEnvMode,
|
||||
},
|
||||
]
|
||||
: []),
|
||||
{
|
||||
desc: t('runtimeEnv.mode.cloudDesc'),
|
||||
icon: CloudIcon,
|
||||
label: t('runtimeEnv.mode.cloud'),
|
||||
mode: 'cloud',
|
||||
},
|
||||
{
|
||||
desc: t('runtimeEnv.mode.noneDesc'),
|
||||
icon: MonitorOffIcon,
|
||||
label: t('runtimeEnv.mode.none'),
|
||||
mode: 'none',
|
||||
},
|
||||
];
|
||||
// ─── Popover Content ───
|
||||
|
||||
const modeContent = (
|
||||
<Flexbox gap={4} style={{ minWidth: 280 }}>
|
||||
{modes.map(({ mode, icon, label, desc }) => (
|
||||
{/* ── Device section (desktop only) ── */}
|
||||
{isDesktop && (
|
||||
<>
|
||||
<SectionHeader label={t('runtimeEnv.section.device')} />
|
||||
{devicesLoading ? (
|
||||
<Flexbox paddingBlock={12} paddingInline={8}>
|
||||
<Skeleton.Button
|
||||
active
|
||||
size="small"
|
||||
style={{ height: 16, marginBottom: 4, width: '60%' }}
|
||||
/>
|
||||
<Skeleton.Button active size="small" style={{ height: 12, width: '40%' }} />
|
||||
</Flexbox>
|
||||
) : hasDevices ? (
|
||||
<DeviceSelector
|
||||
activeDeviceId={deviceId}
|
||||
devices={devices}
|
||||
onSelect={(id) => switchMode('local', { deviceId: id })}
|
||||
/>
|
||||
) : (
|
||||
<Flexbox
|
||||
className={styles.modeOptionDesc}
|
||||
paddingBlock={8}
|
||||
paddingInline={8}
|
||||
>
|
||||
{t('runtimeEnv.device.empty')}
|
||||
</Flexbox>
|
||||
)}
|
||||
|
||||
<div className={styles.divider} />
|
||||
</>
|
||||
)}
|
||||
|
||||
{/* ── Sandbox ── */}
|
||||
<Flexbox
|
||||
horizontal
|
||||
align={'flex-start'}
|
||||
gap={12}
|
||||
className={cx(
|
||||
styles.modeOption,
|
||||
(runtimeMode === 'sandbox' || runtimeMode === 'cloud') && styles.modeOptionActive,
|
||||
)}
|
||||
onClick={() => switchMode('sandbox')}
|
||||
>
|
||||
<Flexbox
|
||||
horizontal
|
||||
align={'flex-start'}
|
||||
className={cx(styles.modeOption, runtimeMode === mode && styles.modeOptionActive)}
|
||||
gap={12}
|
||||
key={mode}
|
||||
onClick={() => switchMode(mode)}
|
||||
align={'center'}
|
||||
className={styles.modeOptionIcon}
|
||||
flex={'none'}
|
||||
height={32}
|
||||
justify={'center'}
|
||||
width={32}
|
||||
>
|
||||
<Flexbox
|
||||
align={'center'}
|
||||
className={styles.modeOptionIcon}
|
||||
flex={'none'}
|
||||
height={32}
|
||||
justify={'center'}
|
||||
width={32}
|
||||
>
|
||||
<Icon icon={icon} />
|
||||
</Flexbox>
|
||||
<Flexbox flex={1}>
|
||||
<div className={styles.modeOptionTitle}>{label}</div>
|
||||
<div className={styles.modeOptionDesc}>{desc}</div>
|
||||
</Flexbox>
|
||||
<Icon icon={BoxIcon} />
|
||||
</Flexbox>
|
||||
))}
|
||||
<Flexbox flex={1}>
|
||||
<div className={styles.modeOptionTitle}>{t('runtimeEnv.mode.sandbox')}</div>
|
||||
<div className={styles.modeOptionDesc}>{t('runtimeEnv.mode.sandboxDesc')}</div>
|
||||
</Flexbox>
|
||||
</Flexbox>
|
||||
|
||||
{/* ── Disabled ── */}
|
||||
<Flexbox
|
||||
horizontal
|
||||
align={'flex-start'}
|
||||
className={cx(styles.modeOption, runtimeMode === 'none' && styles.modeOptionActive)}
|
||||
gap={12}
|
||||
onClick={() => switchMode('none')}
|
||||
>
|
||||
<Flexbox
|
||||
align={'center'}
|
||||
className={styles.modeOptionIcon}
|
||||
flex={'none'}
|
||||
height={32}
|
||||
justify={'center'}
|
||||
width={32}
|
||||
>
|
||||
<Icon icon={MonitorOffIcon} />
|
||||
</Flexbox>
|
||||
<Flexbox flex={1}>
|
||||
<div className={styles.modeOptionTitle}>{t('runtimeEnv.mode.none')}</div>
|
||||
<div className={styles.modeOptionDesc}>{t('runtimeEnv.mode.noneDesc')}</div>
|
||||
</Flexbox>
|
||||
</Flexbox>
|
||||
</Flexbox>
|
||||
);
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
import { type AgentState } from '@lobechat/agent-runtime';
|
||||
import { type UIChatMessage } from '@lobechat/types';
|
||||
import debug from 'debug';
|
||||
|
||||
import { type AgentOperationMetadata, type StepResult } from './AgentStateManager';
|
||||
@@ -43,6 +44,17 @@ export interface AgentRuntimeCoordinatorOptions {
|
||||
* Defaults to automatic selection based on Redis availability
|
||||
*/
|
||||
streamEventManager?: IStreamEventManager;
|
||||
/**
|
||||
* Resolve the canonical UIChatMessage[] snapshot for a terminal-state
|
||||
* agent run, attached to `agent_runtime_end` events so the client can
|
||||
* use the pushed payload as Source of Truth instead of refetching from
|
||||
* DB.
|
||||
*
|
||||
* Optional: when omitted (e.g. tests, embedded usage without DB access)
|
||||
* the coordinator falls back to publishing without `uiMessages` and the
|
||||
* client behaves as before.
|
||||
*/
|
||||
uiMessagesResolver?: (state: AgentState) => Promise<UIChatMessage[] | undefined>;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -59,10 +71,12 @@ export interface AgentRuntimeCoordinatorOptions {
|
||||
export class AgentRuntimeCoordinator {
|
||||
private stateManager: IAgentStateManager;
|
||||
private streamEventManager: IStreamEventManager;
|
||||
private uiMessagesResolver?: (state: AgentState) => Promise<UIChatMessage[] | undefined>;
|
||||
|
||||
constructor(options?: AgentRuntimeCoordinatorOptions) {
|
||||
this.stateManager = options?.stateManager ?? createAgentStateManager();
|
||||
this.streamEventManager = options?.streamEventManager ?? createStreamEventManager();
|
||||
this.uiMessagesResolver = options?.uiMessagesResolver;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -94,6 +108,22 @@ export class AgentRuntimeCoordinator {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Best-effort lookup of the UIChatMessage[] snapshot for `state` through the
 * optional `uiMessagesResolver`.
 *
 * Never throws: a resolver failure is logged with `console.error` and
 * reported as `undefined`, so publishing the stream event cannot fail the
 * surrounding save. A missing field lets the client fall back to the legacy
 * refresh path.
 *
 * @param state - Agent state handed to the resolver.
 * @returns The resolver's result, or `undefined` when no resolver is
 *   configured or the resolver throws.
 */
private async resolveUiMessages(state: AgentState): Promise<UIChatMessage[] | undefined> {
  if (this.uiMessagesResolver) {
    try {
      const snapshot = await this.uiMessagesResolver(state);
      return snapshot;
    } catch (error) {
      console.error('Failed to resolve uiMessages for agent_runtime_end:', error);
    }
  }

  // Either no resolver is configured or it failed above.
  return undefined;
}
|
||||
|
||||
/**
|
||||
* Save Agent state and handle corresponding events
|
||||
*/
|
||||
@@ -106,12 +136,13 @@ export class AgentRuntimeCoordinator {
|
||||
|
||||
// Send a terminal event once the operation first enters a terminal state.
|
||||
if (hasEnteredStreamEndState(previousState?.status, state.status)) {
|
||||
await this.streamEventManager.publishAgentRuntimeEnd(
|
||||
await this.streamEventManager.publishAgentRuntimeEnd({
|
||||
finalState: state,
|
||||
operationId,
|
||||
state.stepCount ?? previousState?.stepCount ?? 0,
|
||||
state,
|
||||
state.status,
|
||||
);
|
||||
reason: state.status,
|
||||
stepIndex: state.stepCount ?? previousState?.stepCount ?? 0,
|
||||
uiMessages: await this.resolveUiMessages(state),
|
||||
});
|
||||
log('[%s] Agent runtime reached terminal state: %s', operationId, state.status);
|
||||
}
|
||||
} catch (error) {
|
||||
@@ -133,12 +164,14 @@ export class AgentRuntimeCoordinator {
|
||||
|
||||
// This ensures agent_runtime_end is sent after all step events.
|
||||
if (hasEnteredStreamEndState(previousState?.status, stepResult.newState.status)) {
|
||||
await this.streamEventManager.publishAgentRuntimeEnd(
|
||||
await this.streamEventManager.publishAgentRuntimeEnd({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,
|
||||
stepResult.newState,
|
||||
stepResult.newState.status,
|
||||
);
|
||||
reason: stepResult.newState.status,
|
||||
stepIndex:
|
||||
stepResult.newState.stepCount ?? stepResult.stepIndex ?? previousState?.stepCount ?? 0,
|
||||
uiMessages: await this.resolveUiMessages(stepResult.newState),
|
||||
});
|
||||
log(
|
||||
'[%s] Agent runtime reached terminal state after step result: %s',
|
||||
operationId,
|
||||
|
||||
@@ -7,7 +7,7 @@ import {
|
||||
type StreamChunkData,
|
||||
type StreamEvent,
|
||||
} from './StreamEventManager';
|
||||
import type { IStreamEventManager } from './types';
|
||||
import type { IStreamEventManager, PublishAgentRuntimeEndParams } from './types';
|
||||
|
||||
const log = debug('lobe-server:agent-runtime:gateway-notifier');
|
||||
|
||||
@@ -78,26 +78,25 @@ export class GatewayStreamNotifier implements IStreamEventManager {
|
||||
return result;
|
||||
}
|
||||
|
||||
async publishAgentRuntimeEnd(
|
||||
operationId: string,
|
||||
stepIndex: number,
|
||||
finalState: any,
|
||||
reason?: string,
|
||||
reasonDetail?: string,
|
||||
): Promise<string> {
|
||||
const result = await this.inner.publishAgentRuntimeEnd(
|
||||
operationId,
|
||||
stepIndex,
|
||||
finalState,
|
||||
reason,
|
||||
reasonDetail,
|
||||
);
|
||||
async publishAgentRuntimeEnd(params: PublishAgentRuntimeEndParams): Promise<string> {
|
||||
const { operationId, stepIndex, finalState, reason, reasonDetail, uiMessages } = params;
|
||||
const result = await this.inner.publishAgentRuntimeEnd(params);
|
||||
|
||||
const effectiveReasonDetail = reasonDetail || getDefaultReasonDetail(finalState, reason);
|
||||
const errorType = finalState?.error?.type || finalState?.error?.errorType;
|
||||
|
||||
this.pushEvent(operationId, {
|
||||
data: { errorType, finalState, reason, reasonDetail: effectiveReasonDetail },
|
||||
// Forward `uiMessages` to the gateway push channel so terminal-state
|
||||
// clients consuming /push-event get the canonical UIChatMessage[]
|
||||
// snapshot — the final step has no later step_start to carry a fresh
|
||||
// snapshot, so dropping it here would break the SoT contract.
|
||||
data: {
|
||||
errorType,
|
||||
finalState,
|
||||
reason,
|
||||
reasonDetail: effectiveReasonDetail,
|
||||
...(uiMessages !== undefined && { uiMessages }),
|
||||
},
|
||||
operationId,
|
||||
stepIndex,
|
||||
timestamp: Date.now(),
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import debug from 'debug';
|
||||
|
||||
import { type StreamChunkData, type StreamEvent } from './StreamEventManager';
|
||||
import { type IStreamEventManager } from './types';
|
||||
import { type IStreamEventManager, type PublishAgentRuntimeEndParams } from './types';
|
||||
|
||||
const log = debug('lobe-server:agent-runtime:in-memory-stream-event-manager');
|
||||
|
||||
@@ -97,13 +97,14 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
|
||||
});
|
||||
}
|
||||
|
||||
async publishAgentRuntimeEnd(
|
||||
operationId: string,
|
||||
stepIndex: number,
|
||||
finalState: any,
|
||||
reason?: string,
|
||||
reasonDetail?: string,
|
||||
): Promise<string> {
|
||||
async publishAgentRuntimeEnd({
|
||||
operationId,
|
||||
stepIndex,
|
||||
finalState,
|
||||
reason,
|
||||
reasonDetail,
|
||||
uiMessages,
|
||||
}: PublishAgentRuntimeEndParams): Promise<string> {
|
||||
return this.publishStreamEvent(operationId, {
|
||||
data: {
|
||||
finalState,
|
||||
@@ -111,6 +112,7 @@ export class InMemoryStreamEventManager implements IStreamEventManager {
|
||||
phase: 'execution_complete',
|
||||
reason: reason || 'completed',
|
||||
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
|
||||
...(uiMessages !== undefined && { uiMessages }),
|
||||
},
|
||||
stepIndex,
|
||||
type: 'agent_runtime_end',
|
||||
|
||||
@@ -272,6 +272,14 @@ export interface RuntimeExecutorContext {
|
||||
streamManager: IStreamEventManager;
|
||||
toolExecutionService: ToolExecutionService;
|
||||
topicId?: string;
|
||||
/**
|
||||
* Trace-pipeline sink for context engine input/output. Wired by
|
||||
* AgentRuntimeService so the trace recorder can pick CE data up
|
||||
* out-of-band, keeping the heavy CE payload (agentDocuments, systemRole, …)
|
||||
* out of the `events` array and therefore out of the Redis state pipeline.
|
||||
* See LOBE-9110.
|
||||
*/
|
||||
tracingContextEngine?: (input: unknown, output: unknown) => void;
|
||||
userId?: string;
|
||||
userTimezone?: string;
|
||||
}
|
||||
@@ -714,7 +722,7 @@ export const createRuntimeExecutors = (
|
||||
|
||||
processedMessages = await serverMessagesEngine(contextEngineInput);
|
||||
|
||||
// Emit context engine event for tracing
|
||||
// Hand context engine input/output to the trace sink out-of-band.
|
||||
// Omit large/redundant fields to reduce snapshot size:
|
||||
// - input.messages: reconstructible from step's messagesBaseline + messagesDelta
|
||||
// - input.toolsConfig: static per operation, ~47KB of manifests repeated every call_llm step
|
||||
@@ -724,14 +732,10 @@ export const createRuntimeExecutors = (
|
||||
toolsConfig: _toolsConfig,
|
||||
...contextEngineInputLite
|
||||
} = contextEngineInput;
|
||||
events.push({
|
||||
input: {
|
||||
...contextEngineInputLite,
|
||||
toolCount: _toolsConfig?.tools?.length ?? 0,
|
||||
},
|
||||
output: processedMessages,
|
||||
type: 'context_engine_result',
|
||||
} as any);
|
||||
ctx.tracingContextEngine?.(
|
||||
{ ...contextEngineInputLite, toolCount: _toolsConfig?.tools?.length ?? 0 },
|
||||
processedMessages,
|
||||
);
|
||||
} else {
|
||||
processedMessages = llmPayload.messages;
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import debug from 'debug';
|
||||
import { type Redis } from 'ioredis';
|
||||
|
||||
import { getAgentRuntimeRedisClient } from './redis';
|
||||
import { type PublishAgentRuntimeEndParams } from './types';
|
||||
|
||||
const log = debug('lobe-server:agent-runtime:stream-event-manager');
|
||||
const timing = debug('lobe-server:agent-runtime:timing');
|
||||
@@ -182,13 +183,14 @@ export class StreamEventManager {
|
||||
/**
|
||||
* Publish Agent runtime end event
|
||||
*/
|
||||
async publishAgentRuntimeEnd(
|
||||
operationId: string,
|
||||
stepIndex: number,
|
||||
finalState: any,
|
||||
reason?: string,
|
||||
reasonDetail?: string,
|
||||
): Promise<string> {
|
||||
async publishAgentRuntimeEnd({
|
||||
operationId,
|
||||
stepIndex,
|
||||
finalState,
|
||||
reason,
|
||||
reasonDetail,
|
||||
uiMessages,
|
||||
}: PublishAgentRuntimeEndParams): Promise<string> {
|
||||
return this.publishStreamEvent(operationId, {
|
||||
data: {
|
||||
finalState,
|
||||
@@ -196,6 +198,7 @@ export class StreamEventManager {
|
||||
phase: 'execution_complete',
|
||||
reason: reason || 'completed',
|
||||
reasonDetail: reasonDetail || getDefaultReasonDetail(finalState, reason),
|
||||
...(uiMessages !== undefined && { uiMessages }),
|
||||
},
|
||||
stepIndex,
|
||||
type: 'agent_runtime_end',
|
||||
|
||||
@@ -95,12 +95,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStateManager.saveAgentState).toHaveBeenCalledWith(operationId, newState);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
newState.stepCount,
|
||||
newState,
|
||||
'done',
|
||||
);
|
||||
reason: 'done',
|
||||
stepIndex: newState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status changes to error', async () => {
|
||||
@@ -112,12 +113,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
newState.stepCount,
|
||||
newState,
|
||||
'error',
|
||||
);
|
||||
reason: 'error',
|
||||
stepIndex: newState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should fallback to previous stepCount when terminal state is missing stepCount', async () => {
|
||||
@@ -129,12 +131,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
previousState.stepCount,
|
||||
newState,
|
||||
'error',
|
||||
);
|
||||
reason: 'error',
|
||||
stepIndex: previousState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status changes to interrupted', async () => {
|
||||
@@ -146,12 +149,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
newState.stepCount,
|
||||
newState,
|
||||
'interrupted',
|
||||
);
|
||||
reason: 'interrupted',
|
||||
stepIndex: newState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status changes to waiting_for_human so the client releases its loading state', async () => {
|
||||
@@ -163,12 +167,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveAgentState(operationId, newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId,
|
||||
newState.stepCount,
|
||||
newState,
|
||||
'waiting_for_human',
|
||||
);
|
||||
reason: 'waiting_for_human',
|
||||
stepIndex: newState.stepCount,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should not publish end event when status was already done', async () => {
|
||||
@@ -214,12 +219,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
expect(mockStateManager.loadAgentState).toHaveBeenCalledWith(operationId);
|
||||
expect(mockStateManager.saveStepResult).toHaveBeenCalledWith(operationId, stepResult);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
5,
|
||||
stepResult.newState,
|
||||
'done',
|
||||
);
|
||||
reason: 'done',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status becomes error', async () => {
|
||||
@@ -234,12 +240,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveStepResult(operationId, stepResult as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
5,
|
||||
stepResult.newState,
|
||||
'error',
|
||||
);
|
||||
reason: 'error',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should fallback to stepResult.stepIndex when terminal step result state is missing stepCount', async () => {
|
||||
@@ -254,12 +261,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveStepResult(operationId, stepResult as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
stepResult.stepIndex,
|
||||
stepResult.newState,
|
||||
'error',
|
||||
);
|
||||
reason: 'error',
|
||||
stepIndex: stepResult.stepIndex,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status becomes waiting_for_human (paused awaiting approval)', async () => {
|
||||
@@ -274,12 +282,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveStepResult(operationId, stepResult as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
4,
|
||||
stepResult.newState,
|
||||
'waiting_for_human',
|
||||
);
|
||||
reason: 'waiting_for_human',
|
||||
stepIndex: 4,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should publish end event when status becomes interrupted', async () => {
|
||||
@@ -294,12 +303,13 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
|
||||
await coordinator.saveStepResult(operationId, stepResult as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith(
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId,
|
||||
5,
|
||||
stepResult.newState,
|
||||
'interrupted',
|
||||
);
|
||||
reason: 'interrupted',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('should not publish end event when status is not done', async () => {
|
||||
@@ -395,4 +405,105 @@ describe('AgentRuntimeCoordinator', () => {
|
||||
expect(result).toBe(expectedHistory);
|
||||
});
|
||||
});
|
||||
|
||||
// Terminal events should carry the canonical UIChatMessage[] snapshot
|
||||
// when a resolver is wired so the client can use the pushed payload as
|
||||
// Source of Truth instead of refetching from DB.
|
||||
describe('uiMessagesResolver on agent_runtime_end', () => {
|
||||
it('passes resolver result through saveAgentState terminal publish', async () => {
|
||||
const uiMessages = [{ id: 'msg_1', role: 'user' }] as any[];
|
||||
const resolver = vi.fn().mockResolvedValue(uiMessages);
|
||||
const coordinatorWithResolver = new AgentRuntimeCoordinator({
|
||||
stateManager: mockStateManager,
|
||||
streamEventManager: mockStreamManager,
|
||||
uiMessagesResolver: resolver,
|
||||
});
|
||||
|
||||
const previousState = { status: 'running', stepCount: 3 };
|
||||
const newState = { status: 'done', stepCount: 5 };
|
||||
mockStateManager.loadAgentState.mockResolvedValue(previousState);
|
||||
|
||||
await coordinatorWithResolver.saveAgentState('op-1', newState as any);
|
||||
|
||||
expect(resolver).toHaveBeenCalledWith(newState);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId: 'op-1',
|
||||
reason: 'done',
|
||||
stepIndex: 5,
|
||||
uiMessages,
|
||||
});
|
||||
});
|
||||
|
||||
it('passes resolver result through saveStepResult terminal publish', async () => {
|
||||
const uiMessages = [{ id: 'msg_a', role: 'assistantGroup' }] as any[];
|
||||
const resolver = vi.fn().mockResolvedValue(uiMessages);
|
||||
const coordinatorWithResolver = new AgentRuntimeCoordinator({
|
||||
stateManager: mockStateManager,
|
||||
streamEventManager: mockStreamManager,
|
||||
uiMessagesResolver: resolver,
|
||||
});
|
||||
|
||||
const stepResult = {
|
||||
executionTime: 100,
|
||||
newState: { status: 'done', stepCount: 4 },
|
||||
stepIndex: 4,
|
||||
};
|
||||
mockStateManager.loadAgentState.mockResolvedValue({ status: 'running', stepCount: 3 });
|
||||
|
||||
await coordinatorWithResolver.saveStepResult('op-2', stepResult as any);
|
||||
|
||||
expect(resolver).toHaveBeenCalledWith(stepResult.newState);
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: stepResult.newState,
|
||||
operationId: 'op-2',
|
||||
reason: 'done',
|
||||
stepIndex: 4,
|
||||
uiMessages,
|
||||
});
|
||||
});
|
||||
|
||||
it('publishes with uiMessages=undefined when resolver rejects (must never fail the surrounding save)', async () => {
|
||||
const resolver = vi.fn().mockRejectedValue(new Error('db down'));
|
||||
const coordinatorWithResolver = new AgentRuntimeCoordinator({
|
||||
stateManager: mockStateManager,
|
||||
streamEventManager: mockStreamManager,
|
||||
uiMessagesResolver: resolver,
|
||||
});
|
||||
|
||||
const previousState = { status: 'running', stepCount: 3 };
|
||||
const newState = { status: 'error', stepCount: 5 };
|
||||
mockStateManager.loadAgentState.mockResolvedValue(previousState);
|
||||
|
||||
await coordinatorWithResolver.saveAgentState('op-3', newState as any);
|
||||
|
||||
expect(resolver).toHaveBeenCalled();
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId: 'op-3',
|
||||
reason: 'error',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
|
||||
it('publishes with uiMessages=undefined when no resolver is wired (default constructor)', async () => {
|
||||
// The default `coordinator` from the outer beforeEach is constructed
|
||||
// without a resolver — proves the field is genuinely optional and
|
||||
// legacy call sites stay unaffected.
|
||||
const previousState = { status: 'running', stepCount: 3 };
|
||||
const newState = { status: 'done', stepCount: 5 };
|
||||
mockStateManager.loadAgentState.mockResolvedValue(previousState);
|
||||
|
||||
await coordinator.saveAgentState('op-4', newState as any);
|
||||
|
||||
expect(mockStreamManager.publishAgentRuntimeEnd).toHaveBeenCalledWith({
|
||||
finalState: newState,
|
||||
operationId: 'op-4',
|
||||
reason: 'done',
|
||||
stepIndex: 5,
|
||||
uiMessages: undefined,
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -131,21 +131,27 @@ describe('GatewayStreamNotifier', () => {
|
||||
it('delegates to inner and returns its result', async () => {
|
||||
const finalState = { status: 'done' };
|
||||
|
||||
const result = await notifier.publishAgentRuntimeEnd('op-1', 2, finalState, 'completed');
|
||||
const params = {
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 2,
|
||||
};
|
||||
const result = await notifier.publishAgentRuntimeEnd(params);
|
||||
|
||||
expect(result).toBe('publishAgentRuntimeEnd-result');
|
||||
expect(inner.calls.publishAgentRuntimeEnd).toHaveLength(1);
|
||||
expect(inner.calls.publishAgentRuntimeEnd[0]).toEqual([
|
||||
'op-1',
|
||||
2,
|
||||
finalState,
|
||||
'completed',
|
||||
undefined,
|
||||
]);
|
||||
expect(inner.calls.publishAgentRuntimeEnd[0]).toEqual([params]);
|
||||
});
|
||||
|
||||
it('calls gateway push-event endpoint only (no update-status)', async () => {
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 2, {}, 'completed', 'All done');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState: {},
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
reasonDetail: 'All done',
|
||||
stepIndex: 2,
|
||||
});
|
||||
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
@@ -163,7 +169,12 @@ describe('GatewayStreamNotifier', () => {
|
||||
},
|
||||
};
|
||||
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'error',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
@@ -176,7 +187,13 @@ describe('GatewayStreamNotifier', () => {
|
||||
error: { message: 'Some error' },
|
||||
};
|
||||
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error', 'Custom detail');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'error',
|
||||
reasonDetail: 'Custom detail',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
@@ -189,7 +206,12 @@ describe('GatewayStreamNotifier', () => {
|
||||
error: { message: 'Budget exceeded', type: 'InsufficientBudgetForModel' },
|
||||
};
|
||||
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'error',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
@@ -205,7 +227,12 @@ describe('GatewayStreamNotifier', () => {
|
||||
},
|
||||
};
|
||||
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, finalState, 'error');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'error',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
@@ -214,13 +241,49 @@ describe('GatewayStreamNotifier', () => {
|
||||
});
|
||||
|
||||
it('errorType is undefined when no error in finalState', async () => {
|
||||
await notifier.publishAgentRuntimeEnd('op-1', 0, { status: 'done' }, 'completed');
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState: { status: 'done' },
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 0,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
const body = JSON.parse(pushCall![1].body);
|
||||
expect(body.event.data.errorType).toBeUndefined();
|
||||
});
|
||||
|
||||
it('forwards uiMessages to the gateway push payload when provided', async () => {
|
||||
const uiMessages = [{ id: 'msg_z', role: 'assistantGroup' }] as any;
|
||||
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState: { status: 'done' },
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 4,
|
||||
uiMessages,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
const body = JSON.parse(pushCall![1].body);
|
||||
expect(body.event.data.uiMessages).toEqual(uiMessages);
|
||||
});
|
||||
|
||||
it('omits uiMessages from the gateway push payload when not provided', async () => {
|
||||
await notifier.publishAgentRuntimeEnd({
|
||||
finalState: { status: 'done' },
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 4,
|
||||
});
|
||||
await new Promise((r) => setTimeout(r, 50));
|
||||
|
||||
const pushCall = mockFetch.mock.calls.find((c: any[]) => c[0].includes('push-event'));
|
||||
const body = JSON.parse(pushCall![1].body);
|
||||
expect(body.event.data).not.toHaveProperty('uiMessages');
|
||||
});
|
||||
});
|
||||
|
||||
// ─── Read/subscribe methods: must delegate directly to inner ───
|
||||
@@ -306,7 +369,12 @@ describe('GatewayStreamNotifier', () => {
|
||||
() => new Promise((_, reject) => setTimeout(() => reject(new Error('timeout')), 10)),
|
||||
);
|
||||
|
||||
const result = await notifier.publishAgentRuntimeEnd('op-1', 0, {}, 'completed');
|
||||
const result = await notifier.publishAgentRuntimeEnd({
|
||||
finalState: {},
|
||||
operationId: 'op-1',
|
||||
reason: 'completed',
|
||||
stepIndex: 0,
|
||||
});
|
||||
|
||||
expect(result).toBe('publishAgentRuntimeEnd-result');
|
||||
expect(inner.calls.publishAgentRuntimeEnd).toHaveLength(1);
|
||||
|
||||
@@ -157,4 +157,46 @@ describe('InMemoryStreamEventManager', () => {
|
||||
expect(manager.getAllEvents('op-1')).toHaveLength(0);
|
||||
});
|
||||
});
|
||||
|
||||
// agent_runtime_end optionally carries the canonical UIChatMessage[]
|
||||
// snapshot so the client can use the pushed payload as Source of Truth
|
||||
// instead of refetching from DB.
|
||||
describe('publishAgentRuntimeEnd uiMessages', () => {
|
||||
it('includes uiMessages in event data when provided', async () => {
|
||||
const uiMessages = [
|
||||
{ id: 'msg_u', role: 'user' },
|
||||
{ id: 'msg_a', role: 'assistantGroup' },
|
||||
] as any[];
|
||||
const finalState = { status: 'done', stepCount: 3 };
|
||||
|
||||
await manager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'done',
|
||||
stepIndex: 3,
|
||||
uiMessages,
|
||||
});
|
||||
|
||||
const events = manager.getAllEvents('op-1');
|
||||
expect(events).toHaveLength(1);
|
||||
expect(events[0].type).toBe('agent_runtime_end');
|
||||
expect(events[0].data.uiMessages).toEqual(uiMessages);
|
||||
expect(events[0].data.finalState).toEqual(finalState);
|
||||
});
|
||||
|
||||
it('omits uiMessages when not provided (legacy callers stay unaffected)', async () => {
|
||||
const finalState = { status: 'done', stepCount: 3 };
|
||||
|
||||
await manager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId: 'op-1',
|
||||
reason: 'done',
|
||||
stepIndex: 3,
|
||||
});
|
||||
|
||||
const events = manager.getAllEvents('op-1');
|
||||
expect(events[0].data).not.toHaveProperty('uiMessages');
|
||||
expect(events[0].data.finalState).toEqual(finalState);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -75,7 +75,11 @@ describe('StreamEventManager', () => {
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-456');
|
||||
|
||||
const result = await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState);
|
||||
const result = await streamManager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId,
|
||||
stepIndex,
|
||||
});
|
||||
|
||||
expect(result).toBe('event-id-456');
|
||||
expect(mockRedis.xadd).toHaveBeenCalledWith(
|
||||
@@ -103,6 +107,50 @@ describe('StreamEventManager', () => {
|
||||
);
|
||||
});
|
||||
|
||||
// agent_runtime_end optionally carries the canonical UIChatMessage[]
|
||||
// snapshot so the client can use the pushed payload as Source of Truth
|
||||
// instead of refetching from DB.
|
||||
it('should include uiMessages in serialized data when provided', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const stepIndex = 5;
|
||||
const finalState = { status: 'done', stepCount: 5 };
|
||||
const uiMessages = [{ id: 'msg_a', role: 'assistantGroup' }] as any;
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-ui');
|
||||
|
||||
await streamManager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId,
|
||||
reason: 'done',
|
||||
stepIndex,
|
||||
uiMessages,
|
||||
});
|
||||
|
||||
// Find the serialized `data` argument inline so this test stays robust
|
||||
// if other positional args shift around.
|
||||
const dataArg = mockRedis.xadd.mock.calls[0]?.find(
|
||||
(a: any) => typeof a === 'string' && a.startsWith('{'),
|
||||
);
|
||||
const parsed = JSON.parse(dataArg);
|
||||
expect(parsed.uiMessages).toEqual(uiMessages);
|
||||
expect(parsed.finalState).toEqual(finalState);
|
||||
});
|
||||
|
||||
it('should omit uiMessages from serialized data when not provided', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const finalState = { status: 'done', stepCount: 3 };
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-noui');
|
||||
|
||||
await streamManager.publishAgentRuntimeEnd({ finalState, operationId, stepIndex: 3 });
|
||||
|
||||
const dataArg = mockRedis.xadd.mock.calls[0]?.find(
|
||||
(a: any) => typeof a === 'string' && a.startsWith('{'),
|
||||
);
|
||||
const parsed = JSON.parse(dataArg);
|
||||
expect(parsed).not.toHaveProperty('uiMessages');
|
||||
});
|
||||
|
||||
it('should accept custom reason and reasonDetail', async () => {
|
||||
const operationId = 'test-operation-id';
|
||||
const stepIndex = 3;
|
||||
@@ -112,13 +160,13 @@ describe('StreamEventManager', () => {
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-789');
|
||||
|
||||
await streamManager.publishAgentRuntimeEnd(
|
||||
operationId,
|
||||
stepIndex,
|
||||
await streamManager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId,
|
||||
reason,
|
||||
reasonDetail,
|
||||
);
|
||||
stepIndex,
|
||||
});
|
||||
|
||||
expect(mockRedis.xadd).toHaveBeenCalledWith(
|
||||
expect.any(String),
|
||||
@@ -158,7 +206,12 @@ describe('StreamEventManager', () => {
|
||||
|
||||
mockRedis.xadd.mockResolvedValue('event-id-790');
|
||||
|
||||
await streamManager.publishAgentRuntimeEnd(operationId, stepIndex, finalState, 'error');
|
||||
await streamManager.publishAgentRuntimeEnd({
|
||||
finalState,
|
||||
operationId,
|
||||
reason: 'error',
|
||||
stepIndex,
|
||||
});
|
||||
|
||||
expect(mockRedis.xadd).toHaveBeenCalledWith(
|
||||
expect.any(String),
|
||||
|
||||
@@ -1,9 +1,24 @@
|
||||
import type { ToolExecuteData } from '@lobechat/agent-gateway-client';
|
||||
import { type AgentState } from '@lobechat/agent-runtime';
|
||||
import { type UIChatMessage } from '@lobechat/types';
|
||||
|
||||
import { type AgentOperationMetadata, type StepResult } from './AgentStateManager';
|
||||
import { type StreamChunkData, type StreamEvent } from './StreamEventManager';
|
||||
|
||||
export interface PublishAgentRuntimeEndParams {
|
||||
finalState: any;
|
||||
operationId: string;
|
||||
reason?: string;
|
||||
reasonDetail?: string;
|
||||
stepIndex: number;
|
||||
/**
|
||||
* Canonical UIChatMessage[] snapshot of the topic at terminal-state time.
|
||||
* When present, the client uses this directly as Source of Truth instead
|
||||
* of refetching from DB.
|
||||
*/
|
||||
uiMessages?: UIChatMessage[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Agent State Manager Interface
|
||||
* Abstract interface for state persistence, supports Redis and in-memory implementations
|
||||
@@ -114,15 +129,14 @@ export interface IStreamEventManager {
|
||||
getStreamHistory: (operationId: string, count?: number) => Promise<StreamEvent[]>;
|
||||
|
||||
/**
|
||||
* Publish Agent runtime end event
|
||||
* Publish Agent runtime end event.
|
||||
*
|
||||
* `uiMessages` is the canonical UIChatMessage[] snapshot of the topic at
|
||||
* terminal-state time so the client can use the pushed payload as Source
|
||||
* of Truth instead of refetching from DB. Optional: callers without DB
|
||||
* access may omit it and the client falls back to its existing behaviour.
|
||||
*/
|
||||
publishAgentRuntimeEnd: (
|
||||
operationId: string,
|
||||
stepIndex: number,
|
||||
finalState: any,
|
||||
reason?: string,
|
||||
reasonDetail?: string,
|
||||
) => Promise<string>;
|
||||
publishAgentRuntimeEnd: (params: PublishAgentRuntimeEndParams) => Promise<string>;
|
||||
|
||||
/**
|
||||
* Publish Agent runtime initialization event
|
||||
|
||||
@@ -201,7 +201,7 @@ export const createServerAgentToolsEngine = (
|
||||
// Always-on builtin tools
|
||||
...Object.fromEntries(alwaysOnToolIds.map((id) => [id, true])),
|
||||
// System-level rules (may override user selection for specific tools)
|
||||
[CloudSandboxManifest.identifier]: runtimeMode === 'cloud',
|
||||
[CloudSandboxManifest.identifier]: runtimeMode === 'cloud' || runtimeMode === 'sandbox',
|
||||
[KnowledgeBaseManifest.identifier]: hasEnabledKnowledgeBases,
|
||||
// Local-system: gated by `canUseDevice` (resolveDeviceAccessPolicy)
|
||||
// first — keeps external bot senders out before runtime checks even
|
||||
|
||||
@@ -132,13 +132,13 @@ export const agentNotifyRouter = router({
|
||||
const stream = getStreamManager();
|
||||
if (done) {
|
||||
// Signal task completion — frontend gateway WS subscription closes.
|
||||
await stream.publishAgentRuntimeEnd(
|
||||
remoteOperationId,
|
||||
0,
|
||||
{ reason: 'success' },
|
||||
'success',
|
||||
'Remote hetero agent task completed',
|
||||
);
|
||||
await stream.publishAgentRuntimeEnd({
|
||||
finalState: { reason: 'success' },
|
||||
operationId: remoteOperationId,
|
||||
reason: 'success',
|
||||
reasonDetail: 'Remote hetero agent task completed',
|
||||
stepIndex: 0,
|
||||
});
|
||||
} else {
|
||||
// Lightweight invalidation — frontend calls fetchAndReplaceMessages.
|
||||
await stream.publishStreamEvent(remoteOperationId, {
|
||||
|
||||
@@ -1478,4 +1478,55 @@ describe('AgentRuntimeService', () => {
|
||||
expect(mockCoordinator.saveAgentState).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
// Stream events at step / operation boundaries should carry the canonical
|
||||
// UIChatMessage[] snapshot so the client can use the pushed payload as
|
||||
// Source of Truth instead of refetching from DB.
|
||||
describe('queryUiMessages', () => {
|
||||
const stubMessageService = (svc: any, queryMessages: ReturnType<typeof vi.fn>) => {
|
||||
svc.messageServiceInstance = { queryMessages };
|
||||
};
|
||||
|
||||
it('returns messageService.queryMessages result when agentId + topicId are present', async () => {
|
||||
const stubMessages = [{ id: 'msg_x', role: 'user' }];
|
||||
const queryMessages = vi.fn().mockResolvedValue(stubMessages);
|
||||
stubMessageService(service, queryMessages);
|
||||
|
||||
const result = await service.queryUiMessages({
|
||||
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
|
||||
} as any);
|
||||
|
||||
expect(queryMessages).toHaveBeenCalledWith({ agentId: 'agt_1', topicId: 'tpc_1' });
|
||||
expect(result).toEqual(stubMessages);
|
||||
});
|
||||
|
||||
it('returns undefined when agentId or topicId is missing (skips empty-array push)', async () => {
|
||||
const queryMessages = vi.fn();
|
||||
stubMessageService(service, queryMessages);
|
||||
|
||||
const noAgent = await service.queryUiMessages({
|
||||
metadata: { topicId: 'tpc_1' },
|
||||
} as any);
|
||||
const noTopic = await service.queryUiMessages({
|
||||
metadata: { agentId: 'agt_1' },
|
||||
} as any);
|
||||
const noMeta = await service.queryUiMessages({} as any);
|
||||
|
||||
expect(noAgent).toBeUndefined();
|
||||
expect(noTopic).toBeUndefined();
|
||||
expect(noMeta).toBeUndefined();
|
||||
expect(queryMessages).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns undefined and never throws when DB query fails (stream events must not fail the step)', async () => {
|
||||
const queryMessages = vi.fn().mockRejectedValue(new Error('db down'));
|
||||
stubMessageService(service, queryMessages);
|
||||
|
||||
const result = await service.queryUiMessages({
|
||||
metadata: { agentId: 'agt_1', topicId: 'tpc_1' },
|
||||
} as any);
|
||||
|
||||
expect(result).toBeUndefined();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -13,6 +13,7 @@ import {
|
||||
ChatErrorType,
|
||||
type ChatMessageError,
|
||||
type ExecSubAgentTaskParams,
|
||||
type UIChatMessage,
|
||||
} from '@lobechat/types';
|
||||
import debug from 'debug';
|
||||
import urlJoin from 'url-join';
|
||||
@@ -30,6 +31,7 @@ import { type IStreamEventManager } from '@/server/modules/AgentRuntime/types';
|
||||
import { emitAgentSignalSourceEvent } from '@/server/services/agentSignal';
|
||||
import { toAgentSignalTraceEvents } from '@/server/services/agentSignal/observability/traceEvents';
|
||||
import { mcpService } from '@/server/services/mcp';
|
||||
import { MessageService } from '@/server/services/message';
|
||||
import { QueueService } from '@/server/services/queue';
|
||||
import { LocalQueueServiceImpl } from '@/server/services/queue/impls';
|
||||
import { ToolExecutionService } from '@/server/services/toolExecution';
|
||||
@@ -182,6 +184,17 @@ export class AgentRuntimeService {
|
||||
private serverDB: LobeChatDatabase;
|
||||
private userId: string;
|
||||
private messageModel: MessageModel;
|
||||
// Lazily constructed because MessageService instantiates a FileService
|
||||
// which eagerly creates the S3 client and throws when S3 env vars are
|
||||
// missing — eager construction would break every test that builds an
|
||||
// AgentRuntimeService without mocking the file backend.
|
||||
private messageServiceInstance?: MessageService;
|
||||
private get messageService(): MessageService {
|
||||
if (!this.messageServiceInstance) {
|
||||
this.messageServiceInstance = new MessageService(this.serverDB, this.userId);
|
||||
}
|
||||
return this.messageServiceInstance;
|
||||
}
|
||||
|
||||
constructor(db: LobeChatDatabase, userId: string, options?: AgentRuntimeServiceOptions) {
|
||||
// Use factory function to auto-select Redis or InMemory implementation
|
||||
@@ -192,6 +205,10 @@ export class AgentRuntimeService {
|
||||
this.coordinator = new AgentRuntimeCoordinator({
|
||||
...options?.coordinatorOptions,
|
||||
streamEventManager: this.streamManager,
|
||||
// Provide the canonical UIChatMessage[] for terminal-state events so
|
||||
// the client can use the pushed payload directly instead of refetching
|
||||
// from DB. Falls back gracefully when topicId isn't set.
|
||||
uiMessagesResolver: (state) => this.queryUiMessages(state),
|
||||
});
|
||||
this.queueService =
|
||||
options?.queueService === null ? null : (options?.queueService ?? new QueueService());
|
||||
@@ -474,6 +491,31 @@ export class AgentRuntimeService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
 * Load the UIChatMessage[] snapshot for the topic named in the agent
 * state's metadata, through `messageService.queryMessages` — the same shape
 * the `message.getMessages` trpc lambda returns to the client. The result is
 * attached to stream events so the client can use the pushed payload
 * directly instead of refetching from DB.
 *
 * Resolves to `undefined` instead of an empty array when `metadata.agentId`
 * or `metadata.topicId` is missing (e.g. very early in bootstrap), so callers
 * can omit the `uiMessages` field entirely. A failing query is logged and
 * also resolves to `undefined`; this method does not reject.
 */
async queryUiMessages(agentState: AgentState): Promise<UIChatMessage[] | undefined> {
  const scope = agentState?.metadata;
  const agentId: string | undefined = scope?.agentId;
  const topicId: string | undefined = scope?.topicId;

  // Both identifiers are required to scope the query.
  if (!(agentId && topicId)) return undefined;

  let snapshot: UIChatMessage[] | undefined;
  try {
    snapshot = await this.messageService.queryMessages({ agentId, topicId });
  } catch (error) {
    // Stream events must never fail the step. If the DB hiccups, leave the
    // snapshot undefined so the client falls back to refreshing as before.
    console.error('[queryUiMessages] Failed to load uiMessages snapshot: %O', error);
  }

  return snapshot;
}
|
||||
|
||||
/**
|
||||
* Execute Agent step
|
||||
*/
|
||||
@@ -515,20 +557,27 @@ export class AgentRuntimeService {
|
||||
try {
|
||||
log('[%s][%d] Start step executing...', operationId, stepIndex);
|
||||
|
||||
// Publish step start event
|
||||
await this.streamManager.publishStreamEvent(operationId, {
|
||||
data: {},
|
||||
stepIndex,
|
||||
type: 'step_start',
|
||||
});
|
||||
|
||||
// Get operation state and metadata
|
||||
// Load agent state BEFORE publishing step_start so we can attach the
|
||||
// canonical UIChatMessage snapshot to the event payload. step_start
|
||||
// fires only after the previous step's DB writes have been awaited, so
|
||||
// the snapshot query here reflects strongly-consistent state — that's
|
||||
// the contract that lets the client treat the pushed uiMessages as
|
||||
// the source of truth instead of doing its own refetch.
|
||||
const agentState = await this.coordinator.loadAgentState(operationId);
|
||||
|
||||
if (!agentState) {
|
||||
throw new Error(`Agent state not found for operation ${operationId}`);
|
||||
}
|
||||
|
||||
const stepStartUiMessages = await this.queryUiMessages(agentState);
|
||||
await this.streamManager.publishStreamEvent(operationId, {
|
||||
data: {
|
||||
...(stepStartUiMessages !== undefined && { uiMessages: stepStartUiMessages }),
|
||||
},
|
||||
stepIndex,
|
||||
type: 'step_start',
|
||||
});
|
||||
|
||||
agentState.metadata = {
|
||||
...agentState.metadata,
|
||||
externalRetryCount,
|
||||
@@ -622,6 +671,14 @@ export class AgentRuntimeService {
|
||||
log('[%s] beforeStep hook dispatch error: %O', operationId, hookError);
|
||||
}
|
||||
|
||||
// Per-step buffer for context engine input/output. Populated by the
|
||||
// `tracingContextEngine` callback passed into the executor context;
|
||||
// consumed by traceRecorder.appendStep below. Routing CE this way keeps
|
||||
// its heavy payload (agentDocuments, systemRole, …) out of
|
||||
// `stepResult.events` and therefore out of the Redis state pipeline.
|
||||
// See LOBE-9110.
|
||||
let contextEnginePayload: { input: unknown; output: unknown } | undefined;
|
||||
|
||||
// Create Agent and Runtime instances
|
||||
// Use agentState.metadata which contains the full app context (topicId, agentId, etc.)
|
||||
// operationMetadata only contains basic fields (agentConfig, modelRuntimeConfig, userId)
|
||||
@@ -629,6 +686,9 @@ export class AgentRuntimeService {
|
||||
metadata: agentState?.metadata,
|
||||
operationId,
|
||||
stepIndex,
|
||||
tracingContextEngine: (input, output) => {
|
||||
contextEnginePayload = { input, output };
|
||||
},
|
||||
});
|
||||
|
||||
// Handle human intervention
|
||||
@@ -800,6 +860,7 @@ export class AgentRuntimeService {
|
||||
afterStepSignalEvents,
|
||||
agentState,
|
||||
beforeStepSignalEvents,
|
||||
contextEngine: contextEnginePayload,
|
||||
currentContext,
|
||||
externalRetryCount,
|
||||
presentation: stepPresentationData,
|
||||
@@ -1337,10 +1398,12 @@ export class AgentRuntimeService {
|
||||
metadata,
|
||||
operationId,
|
||||
stepIndex,
|
||||
tracingContextEngine,
|
||||
}: {
|
||||
metadata?: any;
|
||||
operationId: string;
|
||||
stepIndex: number;
|
||||
tracingContextEngine?: (input: unknown, output: unknown) => void;
|
||||
}) {
|
||||
const contextWindowTokens =
|
||||
metadata?.modelRuntimeConfig?.model && metadata?.modelRuntimeConfig?.provider
|
||||
@@ -1386,6 +1449,7 @@ export class AgentRuntimeService {
|
||||
streamManager: this.streamManager,
|
||||
toolExecutionService: this.toolExecutionService,
|
||||
topicId: metadata?.topicId,
|
||||
tracingContextEngine,
|
||||
userId: metadata?.userId,
|
||||
};
|
||||
|
||||
|
||||
@@ -15,6 +15,13 @@ export interface AppendStepParams {
|
||||
*/
|
||||
agentState: any;
|
||||
beforeStepSignalEvents: SignalEvent[];
|
||||
/**
|
||||
* Context engine input/output captured for this step. Delivered via
|
||||
* `RuntimeExecutorContext.tracingContextEngine` rather than through the
|
||||
* `events` array, so CE payloads (agentDocuments, systemRole, …) stay out
|
||||
* of the Redis state pipeline. See LOBE-9110.
|
||||
*/
|
||||
contextEngine?: { input?: unknown; output?: unknown };
|
||||
currentContext?: { payload?: unknown; phase?: string; stepContext?: unknown };
|
||||
externalRetryCount: number;
|
||||
presentation: StepPresentationData;
|
||||
@@ -220,6 +227,7 @@ export class OperationTraceRecorder {
|
||||
agentState,
|
||||
afterStepSignalEvents,
|
||||
beforeStepSignalEvents,
|
||||
contextEngine: ceInput,
|
||||
currentContext,
|
||||
externalRetryCount,
|
||||
presentation,
|
||||
@@ -237,21 +245,20 @@ export class OperationTraceRecorder {
|
||||
const isBaseline = stepIndex === 0 || isCompression;
|
||||
const messagesDelta = afterMessages.slice(prevMessages.length);
|
||||
|
||||
// Extract context_engine_result into contextEngine (dedicated typed field).
|
||||
// CE data is structural state, not a streaming event — it lives separately
|
||||
// from events and uses the same delta pattern as messagesBaseline/messagesDelta.
|
||||
const rawEvents = (stepResult.events as any[]) ?? [];
|
||||
const ceEvent = rawEvents.find((e: any) => e.type === 'context_engine_result') as any;
|
||||
const contextEngine: StepSnapshot['contextEngine'] = ceEvent
|
||||
? { input: ceEvent.input, output: ceEvent.output }
|
||||
// CE data is structural state, not a streaming event — delivered via the
|
||||
// typed `contextEngine` field on AppendStepParams (sourced from
|
||||
// RuntimeExecutorContext.tracingContextEngine). Uses the same delta
|
||||
// pattern as messagesBaseline/messagesDelta.
|
||||
const contextEngine: StepSnapshot['contextEngine'] = ceInput
|
||||
? { input: ceInput.input, output: ceInput.output }
|
||||
: undefined;
|
||||
|
||||
// Strip heavy/redundant data from events before persisting to snapshot.
|
||||
// context_engine_result is excluded — stored in contextEngine instead.
|
||||
const rawEvents = (stepResult.events as any[]) ?? [];
|
||||
const snapshotEvents = [
|
||||
...beforeStepSignalEvents,
|
||||
...rawEvents
|
||||
.filter((e) => e.type !== 'llm_stream' && e.type !== 'context_engine_result')
|
||||
.filter((e) => e.type !== 'llm_stream')
|
||||
.map((e) => {
|
||||
if (e.type === 'done' && e.finalState) {
|
||||
// Remove reconstructible fields from finalState:
|
||||
|
||||
@@ -358,26 +358,29 @@ describe('OperationTraceRecorder', () => {
|
||||
recorder = new OperationTraceRecorder(store as any);
|
||||
});
|
||||
|
||||
const buildCeEvent = (overrides: Record<string, unknown> = {}) => ({
|
||||
const buildCe = (overrides: { input?: unknown; output?: unknown } = {}) => ({
|
||||
input: { messages: ['hello'] },
|
||||
output: { tokens: 42 },
|
||||
type: 'context_engine_result',
|
||||
...overrides,
|
||||
});
|
||||
|
||||
const appendStepWithCe = (ceEvent: Record<string, unknown>, prevStepsInStore: any[]) => {
|
||||
const appendStepWithCe = (
|
||||
ce: { input?: unknown; output?: unknown },
|
||||
prevStepsInStore: any[],
|
||||
) => {
|
||||
store.loadPartial.mockResolvedValue({ startedAt: 1, steps: prevStepsInStore });
|
||||
return recorder.appendStep('op-ce', {
|
||||
afterStepSignalEvents: [],
|
||||
agentState: { messages: [] },
|
||||
beforeStepSignalEvents: [],
|
||||
contextEngine: ce,
|
||||
currentContext: { phase: 'user_input' },
|
||||
externalRetryCount: 0,
|
||||
presentation: buildPresentation(),
|
||||
startedAt: 1000,
|
||||
stepIndex: prevStepsInStore.length,
|
||||
stepResult: {
|
||||
events: [ceEvent],
|
||||
events: [],
|
||||
newState: { activatedStepTools: [], messages: [] },
|
||||
},
|
||||
});
|
||||
@@ -388,8 +391,8 @@ describe('OperationTraceRecorder', () => {
|
||||
return saved.steps.find((s: any) => s.stepIndex === newStepIndex);
|
||||
};
|
||||
|
||||
it('extracts CE event into contextEngine, not in events', async () => {
|
||||
await appendStepWithCe(buildCeEvent(), []);
|
||||
it('routes CE input/output into contextEngine, never into events', async () => {
|
||||
await appendStepWithCe(buildCe(), []);
|
||||
|
||||
const step = getSavedStep(0);
|
||||
// CE data lives in contextEngine, not events
|
||||
@@ -398,7 +401,7 @@ describe('OperationTraceRecorder', () => {
|
||||
});
|
||||
|
||||
it('keeps both input and output on the first step (no previous CE to compare against)', async () => {
|
||||
await appendStepWithCe(buildCeEvent(), []);
|
||||
await appendStepWithCe(buildCe(), []);
|
||||
|
||||
const step = getSavedStep(0);
|
||||
expect(step.contextEngine.input).toEqual({ messages: ['hello'] });
|
||||
@@ -411,7 +414,7 @@ describe('OperationTraceRecorder', () => {
|
||||
stepIndex: 0,
|
||||
stepType: 'call_llm',
|
||||
};
|
||||
await appendStepWithCe(buildCeEvent(), [prevStep]);
|
||||
await appendStepWithCe(buildCe(), [prevStep]);
|
||||
|
||||
const step = getSavedStep(1);
|
||||
expect(step.contextEngine.input).toBeUndefined();
|
||||
@@ -424,10 +427,9 @@ describe('OperationTraceRecorder', () => {
|
||||
stepIndex: 0,
|
||||
stepType: 'call_llm',
|
||||
};
|
||||
await appendStepWithCe(
|
||||
buildCeEvent({ input: { messages: ['hello'] }, output: { tokens: 99 } }),
|
||||
[prevStep],
|
||||
);
|
||||
await appendStepWithCe(buildCe({ input: { messages: ['hello'] }, output: { tokens: 99 } }), [
|
||||
prevStep,
|
||||
]);
|
||||
|
||||
const step = getSavedStep(1);
|
||||
expect(step.contextEngine.input).toBeUndefined();
|
||||
@@ -440,10 +442,9 @@ describe('OperationTraceRecorder', () => {
|
||||
stepIndex: 0,
|
||||
stepType: 'call_llm',
|
||||
};
|
||||
await appendStepWithCe(
|
||||
buildCeEvent({ input: { messages: ['world'] }, output: { tokens: 42 } }),
|
||||
[prevStep],
|
||||
);
|
||||
await appendStepWithCe(buildCe({ input: { messages: ['world'] }, output: { tokens: 42 } }), [
|
||||
prevStep,
|
||||
]);
|
||||
|
||||
const step = getSavedStep(1);
|
||||
expect(step.contextEngine.input).toEqual({ messages: ['world'] });
|
||||
@@ -456,10 +457,9 @@ describe('OperationTraceRecorder', () => {
|
||||
stepIndex: 0,
|
||||
stepType: 'call_llm',
|
||||
};
|
||||
await appendStepWithCe(
|
||||
buildCeEvent({ input: { messages: ['new'] }, output: { tokens: 20 } }),
|
||||
[prevStep],
|
||||
);
|
||||
await appendStepWithCe(buildCe({ input: { messages: ['new'] }, output: { tokens: 20 } }), [
|
||||
prevStep,
|
||||
]);
|
||||
|
||||
const step = getSavedStep(1);
|
||||
expect(step.contextEngine.input).toEqual({ messages: ['new'] });
|
||||
@@ -476,7 +476,7 @@ describe('OperationTraceRecorder', () => {
|
||||
// step 1 has no contextEngine — dedup must skip it and walk back to step 0
|
||||
{ stepIndex: 1, stepType: 'call_tool' },
|
||||
];
|
||||
await appendStepWithCe(buildCeEvent(), prevSteps);
|
||||
await appendStepWithCe(buildCe(), prevSteps);
|
||||
|
||||
const step = getSavedStep(2);
|
||||
expect(step.contextEngine.input).toBeUndefined();
|
||||
@@ -492,7 +492,7 @@ describe('OperationTraceRecorder', () => {
|
||||
{ contextEngine: { output: { tokens: 42 } }, stepIndex: 1, stepType: 'call_llm' },
|
||||
];
|
||||
await appendStepWithCe(
|
||||
buildCeEvent({ input: { messages: ['hello'] }, output: { tokens: 42 } }),
|
||||
buildCe({ input: { messages: ['hello'] }, output: { tokens: 42 } }),
|
||||
prevSteps,
|
||||
);
|
||||
|
||||
@@ -501,7 +501,7 @@ describe('OperationTraceRecorder', () => {
|
||||
expect(step.contextEngine.output).toBeUndefined();
|
||||
});
|
||||
|
||||
it('sets contextEngine to undefined when the step has no context_engine_result event', async () => {
|
||||
it('sets contextEngine to undefined when no CE was recorded for the step', async () => {
|
||||
const prevStep = {
|
||||
contextEngine: { input: { messages: ['hello'] }, output: { tokens: 42 } },
|
||||
stepIndex: 0,
|
||||
|
||||
@@ -898,3 +898,82 @@ describe('AgentRuntimeService.executeStep - error-path snapshot finalize ()', ()
|
||||
dispatchSpy.mockRestore();
|
||||
});
|
||||
});
|
||||
|
||||
// The step_start event should carry the canonical UIChatMessage[] so the
// client can use the pushed payload as Source of Truth.
describe('AgentRuntimeService.executeStep - step_start uiMessages payload', () => {
  /**
   * Build a service whose coordinator reports a terminal ('done') state with
   * the given metadata. Forcing the early-exit path means the rest of the
   * runtime execution surface does not need mocking — the terminal state
   * short-circuits right after step_start publishes.
   *
   * The runtime queries through MessageService (not the bare messageModel)
   * so that file URLs go through FileService postProcessUrl; the stub is
   * therefore injected as `messageServiceInstance`.
   */
  const arrange = (metadata: Record<string, unknown>, queryMessages: any) => {
    const service = new AgentRuntimeService({} as any, 'user-1', { queueService: null });
    const coordinator = (service as any).coordinator;
    const streamManager = (service as any).streamManager;

    coordinator.tryClaimStep = vi.fn().mockResolvedValue(true);
    coordinator.loadAgentState = vi.fn().mockResolvedValue({
      status: 'done',
      stepCount: 3,
      lastModified: new Date().toISOString(),
      metadata,
    });
    streamManager.publishStreamEvent = vi.fn().mockResolvedValue(undefined);
    (service as any).messageServiceInstance = { queryMessages };

    return { service, streamManager };
  };

  /** Locate the publishStreamEvent call whose event is of type step_start. */
  const findStepStartCall = (streamManager: any) =>
    streamManager.publishStreamEvent.mock.calls.find(
      ([, evt]: any) => evt?.type === 'step_start',
    );

  it('attaches uiMessages to step_start data when topic context is known', async () => {
    const stubMessages = [{ id: 'msg_1', role: 'user' }];
    const { service, streamManager } = arrange(
      { agentId: 'agt_1', topicId: 'tpc_1' },
      vi.fn().mockResolvedValue(stubMessages),
    );

    await service.executeStep({
      operationId: 'op-uimsg',
      stepIndex: 5,
      context: { phase: 'user_input' } as any,
    });

    // The step_start publish must carry the stubbed snapshot.
    const stepStartCall = findStepStartCall(streamManager);
    expect(stepStartCall).toBeDefined();
    expect(stepStartCall[1].data.uiMessages).toEqual(stubMessages);
  });

  it('omits uiMessages from step_start data when topic context is unknown', async () => {
    const queryMock = vi.fn();
    // Empty metadata: no agentId/topicId.
    const { service, streamManager } = arrange({}, queryMock);

    await service.executeStep({
      operationId: 'op-noctx',
      stepIndex: 5,
      context: { phase: 'user_input' } as any,
    });

    const stepStartCall = findStepStartCall(streamManager);
    expect(stepStartCall).toBeDefined();
    expect(stepStartCall[1].data).not.toHaveProperty('uiMessages');
    // Did not even attempt the DB query when context is missing.
    expect(queryMock).not.toHaveBeenCalled();
  });
});
|
||||
|
||||
@@ -256,7 +256,11 @@ describe('AgentRuntimeService.executeSync', () => {
|
||||
|
||||
// Publish event after a small delay
|
||||
setTimeout(async () => {
|
||||
await streamEventManager.publishAgentRuntimeEnd(operationId, 1, { status: 'done' });
|
||||
await streamEventManager.publishAgentRuntimeEnd({
|
||||
finalState: { status: 'done' },
|
||||
operationId,
|
||||
stepIndex: 1,
|
||||
});
|
||||
}, 10);
|
||||
|
||||
const event = await streamEventManager.waitForEvent(operationId, 'agent_runtime_end', 1000);
|
||||
|
||||
@@ -873,7 +873,13 @@ export class AiAgentService {
|
||||
if (!result.success) {
|
||||
log('execAgent: remote hetero dispatch failed: %s', result.error);
|
||||
await streamManager
|
||||
.publishAgentRuntimeEnd(operationId, 0, { error: result.error }, 'error', result.error)
|
||||
.publishAgentRuntimeEnd({
|
||||
finalState: { error: result.error },
|
||||
operationId,
|
||||
reason: 'error',
|
||||
reasonDetail: result.error,
|
||||
stepIndex: 0,
|
||||
})
|
||||
.catch(() => {});
|
||||
await this.messageModel.update(assistantMsg.id, {
|
||||
content: '',
|
||||
|
||||
@@ -2,6 +2,7 @@ import { type LobeChatDatabase } from '@lobechat/database';
|
||||
import { CompressionRepository } from '@lobechat/database';
|
||||
import {
|
||||
type CreateMessageParams,
|
||||
type QueryMessageParams,
|
||||
type UIChatMessage,
|
||||
type UpdateMessageParams,
|
||||
} from '@lobechat/types';
|
||||
@@ -111,6 +112,18 @@ export class MessageService {
|
||||
return { messages, success: true };
|
||||
}
|
||||
|
||||
/**
 * Read the message list for an agent / topic scope as UIChatMessage[].
 *
 * Delegates to `messageModel.query`, passing this service's query options
 * (per the original note, these resolve file URLs through FileService).
 * Mirrors the read path exposed by the `message.getMessages` trpc lambda so
 * server-internal callers (e.g. agent runtime stream events) can push the
 * same payload the client would otherwise fetch.
 */
async queryMessages(params: QueryMessageParams): Promise<UIChatMessage[]> {
  const model = this.messageModel;
  const queryOptions = this.getQueryOptions();

  return model.query(params, queryOptions);
}
|
||||
|
||||
/**
|
||||
* Create a new message and return the complete message list
|
||||
* Pattern: create + query
|
||||
|
||||
@@ -0,0 +1,28 @@
|
||||
import { type DeviceAttachment } from '@lobechat/builtin-tool-remote-device';
|
||||
|
||||
import { lambdaClient } from '@/libs/trpc/client';
|
||||
|
||||
export const deviceService = {
  /**
   * List all online devices bound to the current user, fetched from the
   * device-gateway via tRPC. Best-effort: any failure yields an empty list.
   */
  async listDevices(): Promise<DeviceAttachment[]> {
    let devices: DeviceAttachment[] = [];

    try {
      devices = await lambdaClient.device.listDevices.query();
    } catch {
      // Deliberately ignored: fall back to the empty list.
    }

    return devices;
  },

  /**
   * Check if the user has any online devices. Best-effort: any failure is
   * reported as zero devices / offline.
   */
  async getStatus(): Promise<{ deviceCount: number; online: boolean }> {
    let status = { deviceCount: 0, online: false };

    try {
      status = await lambdaClient.device.status.query();
    } catch {
      // Deliberately ignored: fall back to the offline status.
    }

    return status;
  },
};
|
||||
@@ -58,17 +58,26 @@ const getRuntimeEnvConfigById = (agentId: string) => (s: AgentStoreState) =>
|
||||
/** Whether the runtime mode resolved for the given agent is 'local'. */
const isLocalSystemEnabledById = (agentId: string) => (s: AgentStoreState) => {
  const runtimeMode = getRuntimeModeById(agentId)(s);

  return runtimeMode === 'local';
};
|
||||
|
||||
/**
 * Selected device ID for the agent (desktop only), read from the agent chat
 * config's `runtimeEnv.deviceId`; `undefined` when no runtimeEnv is set.
 */
const getDeviceIdById =
  (agentId: string) =>
  (s: AgentStoreState): string | undefined => {
    const { runtimeEnv } = getChatConfigById(agentId)(s);

    return runtimeEnv?.deviceId;
  };
|
||||
|
||||
/**
 * Get runtime environment mode by agent ID.
 *
 * Reads `runtimeEnv.runtimeMode[platform]` from the agent's chat config,
 * where platform is 'desktop' or 'web' depending on `isDesktop`. When no
 * mode is stored, defaults to 'local' on desktop and 'none' on web.
 * Legacy 'cloud' values are normalized to 'sandbox' for backward compatibility.
 */
const getRuntimeModeById =
  (agentId: string) =>
  (s: AgentStoreState): RuntimeEnvMode => {
    const runtimeEnv = getChatConfigById(agentId)(s).runtimeEnv;
    const platform = isDesktop ? 'desktop' : 'web';
    const mode = runtimeEnv?.runtimeMode?.[platform] ?? (isDesktop ? 'local' : 'none');

    // Legacy backward compatibility: map 'cloud' to 'sandbox'. This must be
    // the only return so the normalization is actually applied.
    return mode === 'cloud' ? 'sandbox' : mode;
  };
|
||||
|
||||
const getSkillActivateModeById =
|
||||
@@ -78,6 +87,7 @@ const getSkillActivateModeById =
|
||||
|
||||
export const chatConfigByIdSelectors = {
|
||||
getChatConfigById,
|
||||
getDeviceIdById,
|
||||
getEnableHistoryCountById,
|
||||
getHistoryCountById,
|
||||
getRuntimeEnvConfigById,
|
||||
|
||||
@@ -34,8 +34,10 @@ const isMemoryToolEnabled = (s: AgentStoreState) =>
|
||||
/** Whether the local system runtime is enabled for the currently active agent. */
const isLocalSystemEnabled = (s: AgentStoreState) => {
  const activeId = s.activeAgentId || '';

  return chatConfigByIdSelectors.isLocalSystemEnabledById(activeId)(s);
};
|
||||
|
||||
/**
 * Whether the active agent runs in the cloud sandbox.
 *
 * Accepts both 'sandbox' and the legacy 'cloud' mode value, so the check is
 * correct whether or not the mode was already normalized upstream.
 */
const isCloudSandboxEnabled = (s: AgentStoreState) => {
  const mode = chatConfigByIdSelectors.getRuntimeModeById(s.activeAgentId || '')(s);

  return mode === 'cloud' || mode === 'sandbox';
};
|
||||
|
||||
const skillActivateMode = (s: AgentStoreState) =>
|
||||
chatConfigByIdSelectors.getSkillActivateModeById(s.activeAgentId || '')(s);
|
||||
|
||||
Reference in New Issue
Block a user